1616
1717package com .google .cloud .pubsublite .internal .wire ;
1818
19+ import static java .util .concurrent .TimeUnit .MILLISECONDS ;
1920import static java .util .concurrent .TimeUnit .SECONDS ;
2021
2122import com .google .api .core .AbstractApiService ;
2728import io .grpc .StatusException ;
2829import io .grpc .stub .StreamObserver ;
2930import java .util .Optional ;
31+ import java .util .concurrent .Executors ;
32+ import java .util .concurrent .ScheduledExecutorService ;
33+ import java .util .concurrent .ScheduledFuture ;
3034import javax .annotation .concurrent .GuardedBy ;
35+ import org .threeten .bp .Duration ;
3136
3237/**
3338 * A connection which recreates an underlying stream on retryable errors.
@@ -43,16 +48,23 @@ class RetryingConnectionImpl<
4348 implements RetryingConnection <ConnectionT >, StreamObserver <ClientResponseT > {
4449 private static final GoogleLogger logger = GoogleLogger .forEnclosingClass ();
4550
51+ private static final Duration INITIAL_RECONNECT_BACKOFF_TIME = Duration .ofMillis (10 );
52+ private static final Duration MAX_RECONNECT_BACKOFF_TIME = Duration .ofSeconds (10 );
53+
4654 private final StreamFactory <StreamRequestT , StreamResponseT > streamFactory ;
4755 private final SingleConnectionFactory <
4856 StreamRequestT , StreamResponseT , ClientResponseT , ConnectionT >
4957 connectionFactory ;
5058 private final StreamRequestT initialRequest ;
5159 private final RetryingConnectionObserver <ClientResponseT > observer ;
60+ private final ScheduledExecutorService systemExecutor ;
5261
5362 // connectionMonitor will not be held in any upcalls.
5463 private final CloseableMonitor connectionMonitor = new CloseableMonitor ();
5564
65+ @ GuardedBy ("connectionMonitor.monitor" )
66+ private long nextRetryBackoffDuration = INITIAL_RECONNECT_BACKOFF_TIME .toMillis ();
67+
5668 @ GuardedBy ("connectionMonitor.monitor" )
5769 private ConnectionT currentConnection ;
5870
@@ -69,6 +81,7 @@ class RetryingConnectionImpl<
6981 this .connectionFactory = connectionFactory ;
7082 this .initialRequest = initialRequest ;
7183 this .observer = observer ;
84+ this .systemExecutor = Executors .newSingleThreadScheduledExecutor ();
7285 }
7386
7487 @ Override
@@ -95,6 +108,7 @@ protected void doStop() {
95108 notifyFailed (e );
96109 return ;
97110 }
111+ systemExecutor .shutdownNow ();
98112 notifyStopped ();
99113 }
100114
@@ -124,6 +138,7 @@ public final void onNext(ClientResponseT value) {
124138 Status status ;
125139 try (CloseableMonitor .Hold h = connectionMonitor .enter ()) {
126140 if (completed ) return ;
141+ nextRetryBackoffDuration = INITIAL_RECONNECT_BACKOFF_TIME .toMillis ();
127142 }
128143 status = observer .onClientResponse (value );
129144 if (!status .isOk ()) {
@@ -144,8 +159,11 @@ public final void onError(Throwable t) {
144159 return ;
145160 }
146161 Optional <Throwable > throwable = Optional .empty ();
162+ long backoffTime = 0 ;
147163 try (CloseableMonitor .Hold h = connectionMonitor .enter ()) {
148164 currentConnection .close ();
165+ backoffTime = nextRetryBackoffDuration ;
166+ nextRetryBackoffDuration = Math .min (backoffTime * 2 , MAX_RECONNECT_BACKOFF_TIME .toMillis ());
149167 } catch (Exception e ) {
150168 throwable = Optional .of (e );
151169 }
@@ -157,8 +175,10 @@ public final void onError(Throwable t) {
157175 .asRuntimeException ());
158176 return ;
159177 }
160- logger .atInfo ().atMostEvery (30 , SECONDS ).log ("Stream disconnected, attempting retry" );
161- observer .triggerReinitialize ();
178+ logger .atInfo ().withCause (t ).atMostEvery (30 , SECONDS ).log (
179+ "Stream disconnected attempting retry, after %s milliseconds" , backoffTime );
180+ ScheduledFuture <?> retry =
181+ systemExecutor .schedule (observer ::triggerReinitialize , backoffTime , MILLISECONDS );
162182 }
163183
164184 @ Override
0 commit comments