@@ -99,6 +99,10 @@ public class StreamWriter implements AutoCloseable {
private final Lock appendAndRefreshAppendLock ;
private final MessagesBatch messagesBatch ;
// Indicates if a stream has some non recoverable exception happened.
private final Lock exceptionLock ;
private Throwable streamException ;
private BackgroundResource backgroundResources ;
private List <BackgroundResource > backgroundResourceList ;
@@ -145,10 +149,13 @@ private StreamWriter(Builder builder)
this .batchingSettings = builder .batchingSettings ;
this .retrySettings = builder .retrySettings ;
this .messagesBatch = new MessagesBatch (batchingSettings , this .streamName );
this .messagesBatch = new MessagesBatch (batchingSettings , this .streamName , this );
messagesBatchLock = new ReentrantLock ();
appendAndRefreshAppendLock = new ReentrantLock ();
activeAlarm = new AtomicBoolean (false );
this .exceptionLock = new ReentrantLock ();
this .streamException = null ;
executor = builder .executorProvider .getExecutor ();
backgroundResourceList = new ArrayList <>();
if (builder .executorProvider .shouldAutoClose ()) {
@@ -212,6 +219,14 @@ public Boolean expired() {
return createTime .plus (streamTTL ).compareTo (Instant .now ()) < 0 ;
}
private void setException (Throwable t ) {
exceptionLock .lock ();
if (this .streamException == null ) {
this .streamException = t ;
}
exceptionLock .unlock ();
}
/**
* Schedules the writing of a message. The write of the message may occur immediately or be
* delayed based on the writer batching options.
@@ -265,6 +280,33 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
return outstandingAppend .appendResult ;
}
/**
* This is the general flush method for asynchronise append operation. When you have outstanding
* append requests, calling flush will make sure all outstanding append requests completed and
* successful. Otherwise there will be an exception thrown.
*
* @throws Exception
*/
public void flushAll (long timeoutMillis ) throws Exception {
appendAndRefreshAppendLock .lock ();
try {
writeAllOutstanding ();
synchronized (messagesWaiter ) {
messagesWaiter .waitComplete (timeoutMillis );
}
} finally {
appendAndRefreshAppendLock .unlock ();
}
exceptionLock .lock ();
try {
if (streamException != null ) {
throw new Exception (streamException );
}
} finally {
exceptionLock .unlock ();
}
}
/**
* Flush the rows on a BUFFERED stream, up to the specified offset. After flush, rows will be
* available for read. If no exception is thrown, it means the flush happened.
@@ -411,14 +453,15 @@ private static final class InflightBatch {
private long expectedOffset ;
private Boolean attachSchema ;
private String streamName ;
private final AtomicBoolean failed ;
private final StreamWriter streamWriter ;
InflightBatch (
List <AppendRequestAndFutureResponse > inflightRequests ,
long batchSizeBytes ,
String streamName ,
Boolean attachSchema ) {
Boolean attachSchema ,
StreamWriter streamWriter ) {
this .inflightRequests = inflightRequests ;
this .offsetList = new ArrayList <Long >(inflightRequests .size ());
for (AppendRequestAndFutureResponse request : inflightRequests ) {
@@ -435,6 +478,7 @@ private static final class InflightBatch {
this .attachSchema = attachSchema ;
this .streamName = streamName ;
this .failed = new AtomicBoolean (false );
this .streamWriter = streamWriter ;
}
int count () {
@@ -482,7 +526,9 @@ private void onFailure(Throwable t) {
return ;
} else {
LOG .info ("Setting " + t .toString () + " on response" );
this .streamWriter .setException (t );
}
for (AppendRequestAndFutureResponse request : inflightRequests ) {
request .appendResult .setException (t );
}
@@ -552,8 +598,12 @@ protected void shutdown() {
currentAlarmFuture .cancel (false );
}
writeAllOutstanding ();
synchronized (messagesWaiter ) {
messagesWaiter .waitComplete ();
try {
synchronized (messagesWaiter ) {
messagesWaiter .waitComplete (0 );
}
} catch (InterruptedException e ) {
LOG .warning ("Failed to wait for messages to return " + e .toString ());
}
if (clientStream .isSendReady ()) {
clientStream .closeSend ();
@@ -820,14 +870,14 @@ public void onStart(StreamController controller) {
private void abortInflightRequests (Throwable t ) {
synchronized (this .inflightBatches ) {
while (!this .inflightBatches .isEmpty ()) {
this .inflightBatches
. poll ()
. onFailure (
new AbortedException (
"Request aborted due to previous failures" ,
t ,
GrpcStatusCode . of ( Status . Code . ABORTED ),
true ));
InflightBatch inflightBatch = this .inflightBatches . poll ();
inflightBatch . onFailure (
new AbortedException (
"Request aborted due to previous failures" ,
t ,
GrpcStatusCode . of ( Status . Code . ABORTED ) ,
true ));
streamWriter . messagesWaiter . release ( inflightBatch . getByteSize ( ));
}
}
}
@@ -850,13 +900,15 @@ public void onResponse(AppendRowsResponse response) {
streamWriter .getOnSchemaUpdateRunnable (), 0L , TimeUnit .MILLISECONDS );
}
}
// TODO: Deal with in stream errors .
// Currently there is nothing retryable. If the error is already exists, then ignore it .
if (response .hasError ()) {
StatusRuntimeException exception =
new StatusRuntimeException (
Status .fromCodeValue (response .getError ().getCode ())
.withDescription (response .getError ().getMessage ()));
inflightBatch .onFailure (exception );
if (response .getError ().getCode () != 6 /* ALREADY_EXISTS */ ) {
StatusRuntimeException exception =
new StatusRuntimeException (
Status .fromCodeValue (response .getError ().getCode ())
.withDescription (response .getError ().getMessage ()));
inflightBatch .onFailure (exception );
}
}
if (inflightBatch .getExpectedOffset () > 0
&& response .getOffset () != inflightBatch .getExpectedOffset ()) {
@@ -907,30 +959,25 @@ public void onError(Throwable t) {
}
} else {
inflightBatch .onFailure (t );
abortInflightRequests (t );
synchronized (streamWriter .currentRetries ) {
streamWriter .currentRetries = 0 ;
}
}
} catch (IOException | InterruptedException e ) {
LOG .info ("Got exception while retrying." );
inflightBatch .onFailure (e );
abortInflightRequests (e );
synchronized (streamWriter .currentRetries ) {
streamWriter .currentRetries = 0 ;
}
}
} else {
inflightBatch .onFailure (t );
abortInflightRequests (t );
synchronized (streamWriter .currentRetries ) {
streamWriter .currentRetries = 0 ;
}
try {
if (!streamWriter .shutdown .get ()) {
// Establish a new connection.
streamWriter .refreshAppend ();
}
} catch (IOException | InterruptedException e ) {
LOG .info ("Failed to establish a new connection" );
}
}
} finally {
streamWriter .messagesWaiter .release (inflightBatch .getByteSize ());
@@ -945,17 +992,21 @@ private static class MessagesBatch {
private final BatchingSettings batchingSettings ;
private Boolean attachSchema = true ;
private final String streamName ;
private final StreamWriter streamWriter ;
private MessagesBatch (BatchingSettings batchingSettings , String streamName ) {
private MessagesBatch (
BatchingSettings batchingSettings , String streamName , StreamWriter streamWriter ) {
this .batchingSettings = batchingSettings ;
this .streamName = streamName ;
this .streamWriter = streamWriter ;
reset ();
}
// Get all the messages out in a batch.
private InflightBatch popBatch () {
InflightBatch batch =
new InflightBatch (messages , batchedBytes , this .streamName , this .attachSchema );
new InflightBatch (
messages , batchedBytes , this .streamName , this .attachSchema , this .streamWriter );
this .attachSchema = false ;
reset ();
return batch ;