Skip to content

Commit

Permalink
node-SDK: add multi-callback registrations
Browse files Browse the repository at this point in the history
This patch allows the sdk to handle multiple registrations
on the same event criteria. It also enables registrations for
all events for a particular chaincode.

Change-Id: I2caa60e0080a8b41e9a8bf5d61ae671eaf09814a
Signed-off-by: Patrick Mullaney <pm.mullaney@gmail.com>
  • Loading branch information
pmullaney committed Sep 27, 2016
1 parent f28d3d6 commit cc31c23
Showing 1 changed file with 43 additions and 21 deletions.
64 changes: 43 additions & 21 deletions sdk/node/src/hfc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2842,15 +2842,16 @@ export function newFileKeyValStore(dir:string):KeyValStore {
/**
* The ChainCodeCBE is used internal to the EventHub to hold chaincode event registration callbacks.
*/
class ChainCodeCBE {
export class ChainCodeCBE {
// chaincode id
ccid: string;
eventname: string;
payload: Uint8Array;
// event name regex filter
eventNameFilter: RegExp;
// callback function to invoke on successful filter match
cb: Function;
constructor(ccid: string,eventname: string,payload: Uint8Array, cb: Function) {
constructor(ccid: string, eventNameFilter: string, cb: Function) {
this.ccid = ccid;
this.eventname = eventname;
this.payload = payload;
this.eventNameFilter = new RegExp(eventNameFilter);
this.cb = cb;
}
}
Expand Down Expand Up @@ -2879,7 +2880,7 @@ export class EventHub {
this.chaincodeRegistrants = new HashTable();
this.blockRegistrants = new Set();
this.txRegistrants = new HashTable();
this.peeraddr = "localhost:7053";
this.peeraddr = null;
this.connected = false;
}

Expand All @@ -2893,6 +2894,7 @@ export class EventHub {

public connect() {
if (this.connected) return;
if (!this.peeraddr) throw Error("Must set peer address before connecting.");
this.events = grpc.load(__dirname + "/protos/events.proto" ).protos;
this.client = new this.events.Events(this.peeraddr,grpc.credentials.createInsecure());
this.call = this.client.chat();
Expand All @@ -2902,11 +2904,15 @@ export class EventHub {
let eh = this; // for callback context
this.call.on('data', function(event) {
if ( event.Event == "chaincodeEvent" ) {
var cbe = eh.chaincodeRegistrants.get(event.chaincodeEvent.chaincodeID + "/" + event.chaincodeEvent.eventName);
if ( cbe ) {
cbe.payload = event.chaincodeEvent.payload;
cbe.cb(cbe);
}
var cbtable = eh.chaincodeRegistrants.get(event.chaincodeEvent.chaincodeID);
if( !cbtable ) {
return;
}
cbtable.forEach(function (cbe) {
if ( cbe.eventNameFilter.test(event.chaincodeEvent.eventName)) {
cbe.cb(event.chaincodeEvent);
}
});
} else if ( event.Event == "block") {
eh.blockRegistrants.forEach(function(cb){
cb(event.block);
Expand All @@ -2928,19 +2934,35 @@ export class EventHub {
this.connected = false;
}

public registerChaincodeEvent(ccid: string, eventname: string, callback: Function){
public registerChaincodeEvent(ccid: string, eventname: string, callback: Function): ChainCodeCBE {
if (!this.connected) return;
let cb = new ChainCodeCBE(ccid, eventname, null, callback);
let register = { register: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: ccid , eventName: eventname }} ] }};
this.chaincodeRegistrants.put(ccid + "/" + eventname, cb);
this.call.write(register);
let cb = new ChainCodeCBE(ccid, eventname, callback);
let cbtable = this.chaincodeRegistrants.get(ccid);
if ( !cbtable ) {
cbtable = new Set();
this.chaincodeRegistrants.put(ccid, cbtable);
cbtable.add(cb);
let register = { register: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: ccid , eventName: "" }} ] }};
this.call.write(register);
} else {
cbtable.add(cb);
}
return cb;
}

public unregisterChaincodeEvent(ccid: string, eventname: string){
public unregisterChaincodeEvent(cbe: ChainCodeCBE){
if (!this.connected) return;
var unregister = { unregister: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: ccid, eventName: eventname }} ] }};
this.chaincodeRegistrants.remove(ccid + "/" + eventname);
this.call.write(unregister);
let cbtable = this.chaincodeRegistrants.get(cbe.ccid);
if ( !cbtable ) {
debug("No event registration for ccid %s ", cbe.ccid);
return;
}
cbtable.delete(cbe);
if( cbtable.size <= 0 ) {
var unregister = { unregister: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: cbe.ccid, eventName: "" }} ] }};
this.chaincodeRegistrants.remove(cbe.ccid);
this.call.write(unregister);
}
}

public registerBlockEvent(callback:Function){
Expand Down

0 comments on commit cc31c23

Please sign in to comment.