Skip to content

Commit

Permalink
Merge "node-SDK: add multi-callback registrations" into v0.6
Browse files Browse the repository at this point in the history
  • Loading branch information
mastersingh24 authored and Gerrit Code Review committed Oct 6, 2016
2 parents adc1600 + cc31c23 commit a80038d
Showing 1 changed file with 43 additions and 21 deletions.
64 changes: 43 additions & 21 deletions sdk/node/src/hfc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2842,15 +2842,16 @@ export function newFileKeyValStore(dir:string):KeyValStore {
/**
* The ChainCodeCBE is used internal to the EventHub to hold chaincode event registration callbacks.
*/
class ChainCodeCBE {
export class ChainCodeCBE {
// chaincode id
ccid: string;
eventname: string;
payload: Uint8Array;
// event name regex filter
eventNameFilter: RegExp;
// callback function to invoke on successful filter match
cb: Function;
constructor(ccid: string,eventname: string,payload: Uint8Array, cb: Function) {
constructor(ccid: string, eventNameFilter: string, cb: Function) {
this.ccid = ccid;
this.eventname = eventname;
this.payload = payload;
this.eventNameFilter = new RegExp(eventNameFilter);
this.cb = cb;
}
}
Expand Down Expand Up @@ -2879,7 +2880,7 @@ export class EventHub {
this.chaincodeRegistrants = new HashTable();
this.blockRegistrants = new Set();
this.txRegistrants = new HashTable();
this.peeraddr = "localhost:7053";
this.peeraddr = null;
this.connected = false;
}

Expand All @@ -2893,6 +2894,7 @@ export class EventHub {

public connect() {
if (this.connected) return;
if (!this.peeraddr) throw Error("Must set peer address before connecting.");
this.events = grpc.load(__dirname + "/protos/events.proto" ).protos;
this.client = new this.events.Events(this.peeraddr,grpc.credentials.createInsecure());
this.call = this.client.chat();
Expand All @@ -2902,11 +2904,15 @@ export class EventHub {
let eh = this; // for callback context
this.call.on('data', function(event) {
if ( event.Event == "chaincodeEvent" ) {
var cbe = eh.chaincodeRegistrants.get(event.chaincodeEvent.chaincodeID + "/" + event.chaincodeEvent.eventName);
if ( cbe ) {
cbe.payload = event.chaincodeEvent.payload;
cbe.cb(cbe);
}
var cbtable = eh.chaincodeRegistrants.get(event.chaincodeEvent.chaincodeID);
if( !cbtable ) {
return;
}
cbtable.forEach(function (cbe) {
if ( cbe.eventNameFilter.test(event.chaincodeEvent.eventName)) {
cbe.cb(event.chaincodeEvent);
}
});
} else if ( event.Event == "block") {
eh.blockRegistrants.forEach(function(cb){
cb(event.block);
Expand All @@ -2928,19 +2934,35 @@ export class EventHub {
this.connected = false;
}

public registerChaincodeEvent(ccid: string, eventname: string, callback: Function){
public registerChaincodeEvent(ccid: string, eventname: string, callback: Function): ChainCodeCBE {
if (!this.connected) return;
let cb = new ChainCodeCBE(ccid, eventname, null, callback);
let register = { register: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: ccid , eventName: eventname }} ] }};
this.chaincodeRegistrants.put(ccid + "/" + eventname, cb);
this.call.write(register);
let cb = new ChainCodeCBE(ccid, eventname, callback);
let cbtable = this.chaincodeRegistrants.get(ccid);
if ( !cbtable ) {
cbtable = new Set();
this.chaincodeRegistrants.put(ccid, cbtable);
cbtable.add(cb);
let register = { register: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: ccid , eventName: "" }} ] }};
this.call.write(register);
} else {
cbtable.add(cb);
}
return cb;
}

public unregisterChaincodeEvent(ccid: string, eventname: string){
public unregisterChaincodeEvent(cbe: ChainCodeCBE){
if (!this.connected) return;
var unregister = { unregister: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: ccid, eventName: eventname }} ] }};
this.chaincodeRegistrants.remove(ccid + "/" + eventname);
this.call.write(unregister);
let cbtable = this.chaincodeRegistrants.get(cbe.ccid);
if ( !cbtable ) {
debug("No event registration for ccid %s ", cbe.ccid);
return;
}
cbtable.delete(cbe);
if( cbtable.size <= 0 ) {
var unregister = { unregister: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: cbe.ccid, eventName: "" }} ] }};
this.chaincodeRegistrants.remove(cbe.ccid);
this.call.write(unregister);
}
}

public registerBlockEvent(callback:Function){
Expand Down

0 comments on commit a80038d

Please sign in to comment.