diff --git a/sdk/node/src/hfc.ts b/sdk/node/src/hfc.ts index 00e1f97447e..440b865f5ab 100644 --- a/sdk/node/src/hfc.ts +++ b/sdk/node/src/hfc.ts @@ -2842,15 +2842,16 @@ export function newFileKeyValStore(dir:string):KeyValStore { /** * The ChainCodeCBE is used internal to the EventHub to hold chaincode event registration callbacks. */ -class ChainCodeCBE { +export class ChainCodeCBE { + // chaincode id ccid: string; - eventname: string; - payload: Uint8Array; + // event name regex filter + eventNameFilter: RegExp; + // callback function to invoke on successful filter match cb: Function; - constructor(ccid: string,eventname: string,payload: Uint8Array, cb: Function) { + constructor(ccid: string, eventNameFilter: string, cb: Function) { this.ccid = ccid; - this.eventname = eventname; - this.payload = payload; + this.eventNameFilter = new RegExp(eventNameFilter); this.cb = cb; } } @@ -2879,7 +2880,7 @@ export class EventHub { this.chaincodeRegistrants = new HashTable(); this.blockRegistrants = new Set(); this.txRegistrants = new HashTable(); - this.peeraddr = "localhost:7053"; + this.peeraddr = null; this.connected = false; } @@ -2893,6 +2894,7 @@ export class EventHub { public connect() { if (this.connected) return; + if (!this.peeraddr) throw Error("Must set peer address before connecting."); this.events = grpc.load(__dirname + "/protos/events.proto" ).protos; this.client = new this.events.Events(this.peeraddr,grpc.credentials.createInsecure()); this.call = this.client.chat(); @@ -2902,11 +2904,15 @@ export class EventHub { let eh = this; // for callback context this.call.on('data', function(event) { if ( event.Event == "chaincodeEvent" ) { - var cbe = eh.chaincodeRegistrants.get(event.chaincodeEvent.chaincodeID + "/" + event.chaincodeEvent.eventName); - if ( cbe ) { - cbe.payload = event.chaincodeEvent.payload; - cbe.cb(cbe); - } + var cbtable = eh.chaincodeRegistrants.get(event.chaincodeEvent.chaincodeID); + if( !cbtable ) { + return; + } + cbtable.forEach(function (cbe) { + if ( cbe.eventNameFilter.test(event.chaincodeEvent.eventName)) { + cbe.cb(event.chaincodeEvent); + } + }); } else if ( event.Event == "block") { eh.blockRegistrants.forEach(function(cb){ cb(event.block); @@ -2928,19 +2934,35 @@ export class EventHub { this.connected = false; } - public registerChaincodeEvent(ccid: string, eventname: string, callback: Function){ + public registerChaincodeEvent(ccid: string, eventname: string, callback: Function): ChainCodeCBE { if (!this.connected) return; - let cb = new ChainCodeCBE(ccid, eventname, null, callback); - let register = { register: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: ccid , eventName: eventname }} ] }}; - this.chaincodeRegistrants.put(ccid + "/" + eventname, cb); - this.call.write(register); + let cb = new ChainCodeCBE(ccid, eventname, callback); + let cbtable = this.chaincodeRegistrants.get(ccid); + if ( !cbtable ) { + cbtable = new Set(); + this.chaincodeRegistrants.put(ccid, cbtable); + cbtable.add(cb); + let register = { register: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: ccid , eventName: "" }} ] }}; + this.call.write(register); + } else { + cbtable.add(cb); + } + return cb; } - public unregisterChaincodeEvent(ccid: string, eventname: string){ + public unregisterChaincodeEvent(cbe: ChainCodeCBE){ if (!this.connected) return; - var unregister = { unregister: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: ccid, eventName: eventname }} ] }}; - this.chaincodeRegistrants.remove(ccid + "/" + eventname); - this.call.write(unregister); + let cbtable = this.chaincodeRegistrants.get(cbe.ccid); + if ( !cbtable ) { + debug("No event registration for ccid %s ", cbe.ccid); + return; + } + cbtable.delete(cbe); + if( cbtable.size <= 0 ) { + var unregister = { unregister: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: cbe.ccid, eventName: "" }} ] }}; + this.chaincodeRegistrants.remove(cbe.ccid); + this.call.write(unregister); + } } public registerBlockEvent(callback:Function){