2828import io .grpc .stub .StreamObserver ;
2929import org .apache .commons .logging .Log ;
3030import org .apache .commons .logging .LogFactory ;
31+ import org .hyperledger .fabric .protos .common .Common ;
3132import org .hyperledger .fabric .protos .common .Common .Envelope ;
3233import org .hyperledger .fabric .protos .orderer .Ab ;
3334import org .hyperledger .fabric .protos .orderer .Ab .SeekInfo ;
4546import static org .hyperledger .fabric .protos .peer .PeerEvents .DeliverResponse .TypeCase .STATUS ;
4647import static org .hyperledger .fabric .sdk .transaction .ProtoUtils .createSeekInfoEnvelope ;
4748
48-
4949/**
5050 * Sample client code that makes gRPC calls to the server.
5151 */
5252class PeerEventServiceClient {
5353 private static final Config config = Config .getConfig ();
54- private static final long ORDERER_WAIT_TIME = config .getOrdererWaitTime ();
54+ private static final long PEER_EVENT_REGISTRATION_WAIT_TIME = config .getPeerEventRegistrationWaitTime ();
5555 private static final Log logger = LogFactory .getLog (PeerEventServiceClient .class );
5656 private final String channelName ;
5757 private final ManagedChannelBuilder channelBuilder ;
5858 private final String name ;
5959 private final String url ;
60- private final long ordererWaitTimeMilliSecs ;
60+ private final long peerEventRegistrationWaitTimeMilliSecs ;
6161 private final PeerOptions peerOptions ;
6262 private final boolean filterBlock ;
6363 Properties properties = new Properties ();
@@ -70,7 +70,7 @@ class PeerEventServiceClient {
7070 private transient Peer peer ;
7171
7272 /**
73- * Construct client for accessing Orderer server using the existing managedChannel.
73+ * Construct client for accessing Peer eventing service using the existing managedChannel.
7474 */
7575 PeerEventServiceClient (Peer peer , ManagedChannelBuilder <?> channelBuilder , Properties properties , PeerOptions peerOptions ) {
7676
@@ -86,22 +86,22 @@ class PeerEventServiceClient {
8686
8787 if (null == properties ) {
8888
89- ordererWaitTimeMilliSecs = ORDERER_WAIT_TIME ;
89+ peerEventRegistrationWaitTimeMilliSecs = PEER_EVENT_REGISTRATION_WAIT_TIME ;
9090
9191 } else {
9292 this .properties = properties ;
9393
94- String ordererWaitTimeMilliSecsString = properties .getProperty ("ordererWaitTimeMilliSecs " , Long .toString (ORDERER_WAIT_TIME ));
94+ String peerEventRegistrationWaitTime = properties .getProperty ("peerEventRegistrationWaitTime " , Long .toString (PEER_EVENT_REGISTRATION_WAIT_TIME ));
9595
96- long tempOrdererWaitTimeMilliSecs = ORDERER_WAIT_TIME ;
96+ long tempPeerWaitTimeMilliSecs = PEER_EVENT_REGISTRATION_WAIT_TIME ;
9797
9898 try {
99- tempOrdererWaitTimeMilliSecs = Long .parseLong (ordererWaitTimeMilliSecsString );
99+ tempPeerWaitTimeMilliSecs = Long .parseLong (peerEventRegistrationWaitTime );
100100 } catch (NumberFormatException e ) {
101- logger .warn (format ("Orderer %s wait time %s not parsable." , name , ordererWaitTimeMilliSecsString ), e );
101+ logger .warn (format ("Peer event service registration %s wait time %s not parsable." , name , peerEventRegistrationWaitTime ), e );
102102 }
103103
104- ordererWaitTimeMilliSecs = tempOrdererWaitTimeMilliSecs ;
104+ peerEventRegistrationWaitTimeMilliSecs = tempPeerWaitTimeMilliSecs ;
105105 }
106106
107107 }
@@ -202,21 +202,30 @@ public void onNext(DeliverResponse resp) {
202202 done = true ;
203203 logger .debug (format ("DeliverResponse channel %s peer %s setting done." ,
204204 channelName , peer .getName ()));
205- retList .add (0 , resp );
206205
207- finishLatch .countDown ();
206+ if (resp .getStatus () == Common .Status .SUCCESS ) {
207+ retList .add (0 , resp );
208+ } else {
209+
210+ throwableList .add (new TransactionException (format ("Channel %s peer %s Status returned failure code %d (%s) during peer service event registration" ,
211+ channelName , peer .getName (), resp .getStatusValue (), resp .getStatus ().name ())));
212+ }
208213
209214 } else if (typeCase == FILTERED_BLOCK || typeCase == BLOCK ) {
210215 logger .trace (format ("Channel %s peer %s got event block hex hashcode: %016x, block number: %d" ,
211216 channelName , peer .getName (), resp .getBlock ().hashCode (), resp .getBlock ().getHeader ().getNumber ()));
212217 retList .add (resp );
213- finishLatch . countDown ();
218+
214219 channelEventQue .addBEvent (new BlockEvent (peer , resp ));
215220 } else {
216221 logger .error (format ("Channel %s peer %s got event block with unknown type: %s, %d" ,
217- channelName , peer .getName (), typeCase .name (), typeCase .getNumber ())
218- );
222+ channelName , peer .getName (), typeCase .name (), typeCase .getNumber ()));
223+
224+ throwableList .add (new TransactionException (format ("Channel %s peer %s Status got unknown type %s, %d" ,
225+ channelName , peer .getName (), typeCase .name (), typeCase .getNumber ())));
226+
219227 }
228+ finishLatch .countDown ();
220229
221230 }
222231
@@ -229,7 +238,6 @@ public void onError(Throwable t) {
229238 if (!shutdown ) {
230239 logger .error (format ("Received error on channel %s, peer %s, url %s, %s" ,
231240 channelName , name , url , t .getMessage ()), t );
232-
233241 done = true ;
234242 throwableList .add (t );
235243 finishLatch .countDown ();
@@ -259,10 +267,9 @@ public void onCompleted() {
259267 //nso.onCompleted();
260268
261269 try {
262- // if (!finishLatch.await(ordererWaitTimeMilliSecs, TimeUnit.MILLISECONDS)) {
263- if (!finishLatch .await (9999999 , TimeUnit .MILLISECONDS )) {
270+ if (!finishLatch .await (peerEventRegistrationWaitTimeMilliSecs , TimeUnit .MILLISECONDS )) {
264271 TransactionException ex = new TransactionException (format (
265- "Channel %s connect time exceeded for peer eventing service %s, timed out at %d ms." , channelName , name , ordererWaitTimeMilliSecs ));
272+ "Channel %s connect time exceeded for peer eventing service %s, timed out at %d ms." , channelName , name , peerEventRegistrationWaitTimeMilliSecs ));
266273 logger .error (ex .getMessage (), ex );
267274 throw ex ;
268275 }
0 commit comments