@@ -74,6 +74,9 @@ public class EventHub implements Serializable {
7474 private Channel channel ;
7575 private transient TransactionContext transactionContext ;
7676 private transient byte [] clientTLSCertificateDigest ;
77+ private transient long reconnectCount ;
78+ private transient long lastBlockNumber ;
79+ private transient BlockEvent lastBlockEvent ;
7780
7881 /**
7982 * Get disconnected time.
@@ -167,19 +170,13 @@ public Properties getProperties() {
167170 return properties == null ? null : (Properties ) properties .clone ();
168171 }
169172
170- boolean connect () throws EventHubException {
171-
172- if (transactionContext == null ) {
173- throw new EventHubException ("Eventhub reconnect failed with no user context" );
174- }
175-
176- return connect (transactionContext );
177-
178- }
179-
180173 private transient StreamObserver <PeerEvents .Event > eventStream = null ; // Saved here to avoid potential garbage collection
181174
182175 synchronized boolean connect (final TransactionContext transactionContext ) throws EventHubException {
176+ return connect (transactionContext , false );
177+ }
178+
179+ synchronized boolean connect (final TransactionContext transactionContext , final boolean reconnection ) throws EventHubException {
183180 if (connected ) {
184181 logger .warn (format ("%s already connected." , toString ()));
185182 return true ;
@@ -209,72 +206,81 @@ public void onNext(PeerEvents.Event event) {
209206
210207 if (event .getEventCase () == PeerEvents .Event .EventCase .BLOCK ) {
211208 try {
212- eventQue .addBEvent (new BlockEvent (EventHub .this , event )); //add to channel queue
209+
210+ BlockEvent blockEvent = new BlockEvent (EventHub .this , event );
211+ setLastBlockSeen (blockEvent );
212+
213+ eventQue .addBEvent (blockEvent ); //add to channel queue
213214 } catch (InvalidProtocolBufferException e ) {
214215 EventHubException eventHubException = new EventHubException (format ("%s onNext error %s" , this , e .getMessage ()), e );
215216 logger .error (eventHubException .getMessage ());
216217 threw .add (eventHubException );
217218 }
218219 } else if (event .getEventCase () == PeerEvents .Event .EventCase .REGISTER ) {
219220
221+ if (reconnectCount > 1 ) {
222+ logger .info (format ("Eventhub %s has reconnecting after %d attempts" , name , reconnectCount ));
223+ }
224+
220225 connected = true ;
221226 connectedTime = System .currentTimeMillis ();
227+ reconnectCount = 0L ;
228+
222229 finishLatch .countDown ();
223230 }
224231 }
225232
226233 @ Override
227234 public void onError (Throwable t ) {
235+ connected = false ;
236+ eventStream = null ;
237+ disconnectedTime = System .currentTimeMillis ();
228238 if (shutdown ) { //IF we're shutdown don't try anything more.
229239 logger .trace (format ("%s was shutdown." , EventHub .this .toString ()));
230- connected = false ;
231- eventStream = null ;
240+
232241 finishLatch .countDown ();
233242 return ;
234243 }
235244
236- final boolean isTerminated = managedChannel .isTerminated ();
237- final boolean isChannelShutdown = managedChannel .isShutdown ();
245+ final ManagedChannel lmanagedChannel = managedChannel ;
246+
247+ final boolean isTerminated = lmanagedChannel == null ? true : lmanagedChannel .isTerminated ();
248+ final boolean isChannelShutdown = lmanagedChannel == null ? true : lmanagedChannel .isShutdown ();
249+
250+ if (reconnectCount % 50 == 1 ) {
251+ logger .warn (format ("%s terminated is %b shutdown is %b, retry count %d has error %s." , EventHub .this .toString (), isTerminated , isChannelShutdown ,
252+ reconnectCount , t .getMessage ()));
253+ } else {
254+ logger .trace (format ("%s terminated is %b shutdown is %b, retry count %d has error %s." , EventHub .this .toString (), isTerminated , isChannelShutdown ,
255+ reconnectCount , t .getMessage ()));
256+ }
238257
239- logger .error (format ("%s terminated is %b shutdown is %b has error %s " , EventHub .this .toString (), isTerminated , isChannelShutdown ,
240- t .getMessage ()), new EventHubException (t ));
241- threw .add (t );
242258 finishLatch .countDown ();
243259
244260 // logger.error("Error in stream: " + t.getMessage(), new EventHubException(t));
245261 if (t instanceof StatusRuntimeException ) {
246262 StatusRuntimeException sre = (StatusRuntimeException ) t ;
247263 Status sreStatus = sre .getStatus ();
248- logger .error (format ("%s :StatusRuntimeException Status %s. Description %s " , EventHub .this , sreStatus + "" , sreStatus .getDescription ()));
249- if (sre .getStatus ().getCode () == Status .Code .INTERNAL || sre .getStatus ().getCode () == Status .Code .UNAVAILABLE ) {
250-
251- connected = false ;
252- eventStream = null ;
253- disconnectedTime = System .currentTimeMillis ();
254- try {
255- if (!isChannelShutdown ) {
256- managedChannel .shutdownNow ();
257- }
258- if (null != disconnectedHandler ) {
259- try {
260- disconnectedHandler .disconnected (EventHub .this );
261- } catch (Exception e ) {
262- logger .warn (format ("Eventhub %s %s" , EventHub .this .name , e .getMessage ()), e );
263- eventQue .eventError (e );
264- }
265- }
266- } catch (Exception e ) {
267- logger .warn (format ("Eventhub %s Failed shutdown msg: %s" , EventHub .this .name , e .getMessage ()), e );
268- }
264+ if (reconnectCount % 50 == 1 ) {
265+ logger .warn (format ("%s :StatusRuntimeException Status %s. Description %s " , EventHub .this , sreStatus + "" , sreStatus .getDescription ()));
266+ } else {
267+ logger .trace (format ("%s :StatusRuntimeException Status %s. Description %s " , EventHub .this , sreStatus + "" , sreStatus .getDescription ()));
269268 }
269+
270+ try {
271+ reconnect ();
272+ } catch (Exception e ) {
273+ logger .warn (format ("Eventhub %s Failed shutdown msg: %s" , EventHub .this .name , e .getMessage ()));
274+ }
275+
270276 }
271277
272278 }
273279
274280 @ Override
275281 public void onCompleted () {
276282
277- logger .warn (format ("Stream completed %s" , EventHub .this .toString ()));
283+ logger .debug (format ("Stream completed %s" , EventHub .this .toString ()));
278284 finishLatch .countDown ();
279285
280286 }
@@ -288,27 +294,19 @@ public void onCompleted() {
288294 }
289295
290296 try {
291- if (!finishLatch .await (EVENTHUB_CONNECTION_WAIT_TIME , TimeUnit .MILLISECONDS )) {
292- EventHubException evh = new EventHubException (format ("EventHub %s failed to connect in %s ms." , name , EVENTHUB_CONNECTION_WAIT_TIME ));
293- logger .debug (evh .getMessage (), evh );
294297
295- throw evh ;
296- }
297- logger .trace (format ("Eventhub %s Done waiting for reply!" , name ));
298+ //On reconnection don't wait here.
298299
299- } catch (InterruptedException e ) {
300- logger .error (e );
301- }
300+ if (!reconnection && !finishLatch .await (EVENTHUB_CONNECTION_WAIT_TIME , TimeUnit .MILLISECONDS )) {
302301
303- if (!threw .isEmpty ()) {
304- eventStream = null ;
305- connected = false ;
306- Throwable t = threw .iterator ().next ();
302+ logger .warn (format ("EventHub %s failed to connect in %s ms." , name , EVENTHUB_CONNECTION_WAIT_TIME ));
307303
308- EventHubException evh = new EventHubException ( t . getMessage (), t );
309- logger .error (format ("EventHub %s Error in stream. error: " + t . getMessage (), toString ()), evh );
310- throw evh ;
304+ } else {
305+ logger .trace (format ("Eventhub %s Done waiting for reply!" , name ) );
306+ }
311307
308+ } catch (InterruptedException e ) {
309+ logger .error (e );
312310 }
313311
314312 logger .debug (format ("Eventhub %s connect is done with connect status: %b " , name , connected ));
@@ -321,6 +319,24 @@ public void onCompleted() {
321319
322320 }
323321
322+ private void reconnect () throws EventHubException {
323+
324+ final ManagedChannel lmanagedChannel = managedChannel ;
325+
326+ if (lmanagedChannel != null ) {
327+ managedChannel = null ;
328+ lmanagedChannel .shutdownNow ();
329+ }
330+
331+ EventHubDisconnected ldisconnectedHandler = disconnectedHandler ;
332+ if (!shutdown && null != ldisconnectedHandler ) {
333+ ++reconnectCount ;
334+ ldisconnectedHandler .disconnected (this );
335+
336+ }
337+
338+ }
339+
324340 private void blockListen (TransactionContext transactionContext ) throws CryptoException {
325341
326342 this .transactionContext = transactionContext ;
@@ -371,11 +387,17 @@ public String toString() {
371387
372388 public void shutdown () {
373389 shutdown = true ;
390+ lastBlockEvent = null ;
391+ lastBlockNumber = 0 ;
374392 connected = false ;
375393 disconnectedHandler = null ;
376394 channel = null ;
377395 eventStream = null ;
378- managedChannel .shutdownNow ();
396+ final ManagedChannel lmanagedChannel = managedChannel ;
397+ managedChannel = null ;
398+ if (lmanagedChannel != null ) {
399+ lmanagedChannel .shutdownNow ();
400+ }
379401 }
380402
381403 void setChannel (Channel channel ) throws InvalidArgumentException {
@@ -391,6 +413,15 @@ void setChannel(Channel channel) throws InvalidArgumentException {
391413 this .channel = channel ;
392414 }
393415
416+ synchronized void setLastBlockSeen (BlockEvent lastBlockSeen ) {
417+ long newLastBlockNumber = lastBlockSeen .getBlockNumber ();
418+ // overkill but make sure.
419+ if (lastBlockNumber < newLastBlockNumber ) {
420+ lastBlockNumber = newLastBlockNumber ;
421+ this .lastBlockEvent = lastBlockSeen ;
422+ }
423+ }
424+
394425 /**
395426 * Eventhub disconnection notification interface
396427 */
@@ -412,32 +443,26 @@ public interface EventHubDisconnected {
412443
413444 protected transient EventHubDisconnected disconnectedHandler = new EventHub .EventHubDisconnected () {
414445 @ Override
415- public synchronized void disconnected (final EventHub eventHub ) throws EventHubException {
416- logger .info (format ("Detected disconnect %s" , eventHub .toString ()));
417-
418- if (eventHub .connectedTime == 0 ) { //means event hub never connected
419- logger .error (format ("%s failed on first connect no retries" , eventHub .toString ()));
420-
421- eventHub .setEventHubDisconnectedHandler (null ); //don't try again
422-
423- //event hub never connected.
424- throw new EventHubException (format ("%s never connected." , eventHub .toString ()));
446+ public synchronized void disconnected (final EventHub eventHub ) {
447+ if (reconnectCount == 1 ) {
448+ logger .warn (format ("Channel %s detected disconnect on event hub %s (%s)" , channel .getName (), eventHub .toString (), url ));
425449 }
426450
427451 executorService .execute (() -> {
428452
429453 try {
430454 Thread .sleep (500 );
431455
432- if (eventHub .connect ()) {
433- logger .info (format ("Successful reconnect %s" , eventHub .toString ()));
434- } else {
435- logger .info (format ("Failed reconnect %s" , eventHub .toString ()));
456+ if (transactionContext == null ) {
457+ logger .warn ("Eventhub reconnect failed with no user context" );
458+ return ;
436459 }
437460
461+ eventHub .connect (transactionContext , true );
462+
438463 } catch (Exception e ) {
439464
440- logger .debug (format ("Failed %s to reconnect. %s" , toString (), e .getMessage ()));
465+ logger .warn (format ("Failed %s to reconnect. %s" , toString (), e .getMessage ()));
441466
442467 }
443468
0 commit comments