@@ -50,12 +50,14 @@
import java .util .concurrent .ScheduledFuture ;
import java .util .concurrent .TimeUnit ;
import java .util .concurrent .atomic .AtomicBoolean ;
import java .util .concurrent .atomic .AtomicReference ;
import java .util .concurrent .locks .Lock ;
import java .util .concurrent .locks .ReentrantLock ;
import java .util .logging .Level ;
import java .util .logging .Logger ;
import java .util .regex .Matcher ;
import java .util .regex .Pattern ;
import javax .annotation .concurrent .GuardedBy ;
import org .threeten .bp .Duration ;
/**
@@ -100,25 +102,34 @@ public class StreamWriter implements AutoCloseable {
private final Lock messagesBatchLock ;
private final Lock appendAndRefreshAppendLock ;
@ GuardedBy ("appendAndRefreshAppendLock" )
private final MessagesBatch messagesBatch ;
// Indicates if a stream has some non recoverable exception happened.
private final Lock exceptionLock ;
private Throwable streamException ;
private AtomicReference <Throwable > streamException ;
private BackgroundResource backgroundResources ;
private List <BackgroundResource > backgroundResourceList ;
private BigQueryWriteClient stub ;
BidiStreamingCallable <AppendRowsRequest , AppendRowsResponse > bidiStreamingCallable ;
@ GuardedBy ("appendAndRefreshAppendLock" )
ClientStream <AppendRowsRequest > clientStream ;
private final AppendResponseObserver responseObserver ;
private final ScheduledExecutorService executor ;
private final AtomicBoolean shutdown ;
@ GuardedBy ("appendAndRefreshAppendLock" )
private boolean shutdown ;
private final Waiter messagesWaiter ;
private final AtomicBoolean activeAlarm ;
@ GuardedBy ("appendAndRefreshAppendLock" )
private boolean activeAlarm ;
private ScheduledFuture <?> currentAlarmFuture ;
private Integer currentRetries = 0 ;
@@ -160,9 +171,8 @@ private StreamWriter(Builder builder)
this .messagesBatch = new MessagesBatch (batchingSettings , this .streamName , this );
messagesBatchLock = new ReentrantLock ();
appendAndRefreshAppendLock = new ReentrantLock ();
activeAlarm = new AtomicBoolean (false );
this .exceptionLock = new ReentrantLock ();
this .streamException = null ;
activeAlarm = false ;
this .streamException = new AtomicReference <Throwable >(null );
executor = builder .executorProvider .getExecutor ();
backgroundResourceList = new ArrayList <>();
@@ -185,7 +195,7 @@ private StreamWriter(Builder builder)
stub = builder .client ;
}
backgroundResources = new BackgroundResourceAggregation (backgroundResourceList );
shutdown = new AtomicBoolean ( false ) ;
shutdown = false ;
if (builder .onSchemaUpdateRunnable != null ) {
this .onSchemaUpdateRunnable = builder .onSchemaUpdateRunnable ;
this .onSchemaUpdateRunnable .setStreamWriter (this );
@@ -216,14 +226,6 @@ OnSchemaUpdateRunnable getOnSchemaUpdateRunnable() {
return this .onSchemaUpdateRunnable ;
}
private void setException (Throwable t ) {
exceptionLock .lock ();
if (this .streamException == null ) {
this .streamException = t ;
}
exceptionLock .unlock ();
}
/**
* Schedules the writing of a message. The write of the message may occur immediately or be
* delayed based on the writer batching options.
@@ -253,27 +255,27 @@ private void setException(Throwable t) {
*/
public ApiFuture <AppendRowsResponse > append (AppendRowsRequest message ) {
appendAndRefreshAppendLock .lock ();
Preconditions .checkState (!shutdown .get (), "Cannot append on a shut-down writer." );
Preconditions .checkNotNull (message , "Message is null." );
final AppendRequestAndFutureResponse outstandingAppend =
new AppendRequestAndFutureResponse (message );
List <InflightBatch > batchesToSend ;
messagesBatchLock .lock ();
try {
Preconditions .checkState (!shutdown , "Cannot append on a shut-down writer." );
Preconditions .checkNotNull (message , "Message is null." );
Preconditions .checkState (streamException .get () == null , "Stream already failed." );
final AppendRequestAndFutureResponse outstandingAppend =
new AppendRequestAndFutureResponse (message );
List <InflightBatch > batchesToSend ;
batchesToSend = messagesBatch .add (outstandingAppend );
// Setup the next duration based delivery alarm if there are messages batched.
setupAlarm ();
if (!batchesToSend .isEmpty ()) {
for (final InflightBatch batch : batchesToSend ) {
LOG .fine ("Scheduling a batch for immediate sending. " );
LOG .fine ("Scheduling a batch for immediate sending" );
writeBatch (batch );
}
}
return outstandingAppend .appendResult ;
} finally {
messagesBatchLock .unlock ();
appendAndRefreshAppendLock .unlock ();
}
return outstandingAppend .appendResult ;
}
/**
@@ -285,9 +287,10 @@ public void refreshAppend() throws InterruptedException {
throw new UnimplementedException (null , GrpcStatusCode .of (Status .Code .UNIMPLEMENTED ), false );
}
@ GuardedBy ("appendAndRefreshAppendLock" )
private void setupAlarm () {
if (!messagesBatch .isEmpty ()) {
if (!activeAlarm . getAndSet ( true ) ) {
if (!activeAlarm ) {
long delayThresholdMs = getBatchingSettings ().getDelayThreshold ().toMillis ();
LOG .log (Level .FINE , "Setting up alarm for the next {0} ms." , delayThresholdMs );
currentAlarmFuture =
@@ -296,12 +299,12 @@ private void setupAlarm() {
@ Override
public void run () {
LOG .fine ("Sending messages based on schedule" );
activeAlarm . getAndSet ( false );
messagesBatchLock . lock () ;
appendAndRefreshAppendLock . lock ( );
activeAlarm = false ;
try {
writeBatch (messagesBatch .popBatch ());
} finally {
messagesBatchLock .unlock ();
appendAndRefreshAppendLock .unlock ();
}
}
},
@@ -310,9 +313,8 @@ public void run() {
}
} else if (currentAlarmFuture != null ) {
LOG .log (Level .FINER , "Cancelling alarm, no more messages" );
if (activeAlarm .getAndSet (false )) {
currentAlarmFuture .cancel (false );
}
currentAlarmFuture .cancel (false );
activeAlarm = false ;
}
}
@@ -321,27 +323,41 @@ public void run() {
* wait for the send operations to complete. To wait for messages to send, call {@code get} on the
* futures returned from {@code append}.
*/
@ GuardedBy ("appendAndRefreshAppendLock" )
public void writeAllOutstanding () {
InflightBatch unorderedOutstandingBatch = null ;
messagesBatchLock .lock ();
try {
if (!messagesBatch .isEmpty ()) {
writeBatch (messagesBatch .popBatch ());
}
messagesBatch .reset ();
} finally {
messagesBatchLock .unlock ();
if (!messagesBatch .isEmpty ()) {
writeBatch (messagesBatch .popBatch ());
}
messagesBatch .reset ();
}
@ GuardedBy ("appendAndRefreshAppendLock" )
private void writeBatch (final InflightBatch inflightBatch ) {
if (inflightBatch != null ) {
AppendRowsRequest request = inflightBatch .getMergedRequest ();
try {
appendAndRefreshAppendLock .unlock ();
messagesWaiter .acquire (inflightBatch .getByteSize ());
appendAndRefreshAppendLock .lock ();
if (shutdown || streamException .get () != null ) {
appendAndRefreshAppendLock .unlock ();
messagesWaiter .release (inflightBatch .getByteSize ());
appendAndRefreshAppendLock .lock ();
inflightBatch .onFailure (
new AbortedException (
shutdown
? "Stream closed, abort append."
: "Stream has previous errors, abort append." ,
null ,
GrpcStatusCode .of (Status .Code .ABORTED ),
true ));
return ;
}
responseObserver .addInflightBatch (inflightBatch );
clientStream .send (request );
} catch (FlowController .FlowControlException ex ) {
appendAndRefreshAppendLock .lock ();
inflightBatch .onFailure (ex );
}
}
@@ -447,9 +463,6 @@ private void onFailure(Throwable t) {
// Error has been set already.
LOG .warning ("Ignore " + t .toString () + " since error has already been set" );
return ;
} else {
LOG .info ("Setting " + t .toString () + " on response" );
this .streamWriter .setException (t );
}
for (AppendRequestAndFutureResponse request : inflightRequests ) {
@@ -511,26 +524,68 @@ public RetrySettings getRetrySettings() {
* pending messages are lost.
*/
protected void shutdown () {
if (shutdown .getAndSet (true )) {
LOG .fine ("Already shutdown." );
return ;
}
LOG .fine ("Shutdown called on writer" );
if (currentAlarmFuture != null && activeAlarm .getAndSet (false )) {
currentAlarmFuture .cancel (false );
}
writeAllOutstanding ();
appendAndRefreshAppendLock .lock ();
try {
synchronized (messagesWaiter ) {
if (shutdown ) {
LOG .fine ("Already shutdown." );
return ;
}
shutdown = true ;
LOG .info ("Shutdown called on writer: " + streamName );
if (currentAlarmFuture != null && activeAlarm ) {
currentAlarmFuture .cancel (false );
activeAlarm = false ;
}
// Wait for current inflight to drain.
try {
appendAndRefreshAppendLock .unlock ();
messagesWaiter .waitComplete (0 );
} catch (InterruptedException e ) {
LOG .warning ("Failed to wait for messages to return " + e .toString ());
}
} catch (InterruptedException e ) {
LOG .warning ("Failed to wait for messages to return " + e .toString ());
}
if (clientStream .isSendReady ()) {
clientStream .closeSend ();
appendAndRefreshAppendLock .lock ();
// Try to send out what's left in batch.
if (!messagesBatch .isEmpty ()) {
InflightBatch inflightBatch = messagesBatch .popBatch ();
AppendRowsRequest request = inflightBatch .getMergedRequest ();
if (streamException .get () != null ) {
inflightBatch .onFailure (
new AbortedException (
shutdown
? "Stream closed, abort append."
: "Stream has previous errors, abort append." ,
null ,
GrpcStatusCode .of (Status .Code .ABORTED ),
true ));
} else {
try {
appendAndRefreshAppendLock .unlock ();
messagesWaiter .acquire (inflightBatch .getByteSize ());
appendAndRefreshAppendLock .lock ();
responseObserver .addInflightBatch (inflightBatch );
clientStream .send (request );
} catch (FlowController .FlowControlException ex ) {
appendAndRefreshAppendLock .lock ();
LOG .warning (
"Unexpected flow control exception when sending batch leftover: " + ex .toString ());
}
}
}
// Close the stream.
try {
appendAndRefreshAppendLock .unlock ();
messagesWaiter .waitComplete (0 );
} catch (InterruptedException e ) {
LOG .warning ("Failed to wait for messages to return " + e .toString ());
}
appendAndRefreshAppendLock .lock ();
if (clientStream .isSendReady ()) {
clientStream .closeSend ();
}
backgroundResources .shutdown ();
} finally {
appendAndRefreshAppendLock .unlock ();
}
backgroundResources .shutdown ();
}
/**
@@ -815,11 +870,12 @@ public void onStart(StreamController controller) {
}
private void abortInflightRequests (Throwable t ) {
LOG .fine ("Aborting all inflight requests" );
synchronized (this .inflightBatches ) {
boolean first_error = true ;
while (!this .inflightBatches .isEmpty ()) {
InflightBatch inflightBatch = this .inflightBatches .poll ();
if (first_error ) {
if (first_error || t . getCause (). getClass () == AbortedException . class ) {
inflightBatch .onFailure (t );
first_error = false ;
} else {
@@ -894,7 +950,8 @@ public void onComplete() {
@ Override
public void onError (Throwable t ) {
LOG .fine ("OnError called" );
LOG .info ("OnError called: " + t .toString ());
streamWriter .streamException .set (t );
abortInflightRequests (t );
}
};
@@ -917,6 +974,7 @@ private MessagesBatch(
}
// Get all the messages out in a batch.
@ GuardedBy ("appendAndRefreshAppendLock" )
private InflightBatch popBatch () {
InflightBatch batch =
new InflightBatch (
@@ -958,6 +1016,7 @@ private long getMaxBatchBytes() {
// The message batch returned could contain the previous batch of messages plus the current
// message.
// if the message is too large.
@ GuardedBy ("appendAndRefreshAppendLock" )
private List <InflightBatch > add (AppendRequestAndFutureResponse outstandingAppend ) {
List <InflightBatch > batchesToSend = new ArrayList <>();
// Check if the next message makes the current batch exceed the max batch byte size.
@@ -978,7 +1037,6 @@ && getBatchedBytes() + outstandingAppend.messageSize >= getMaxBatchBytes()) {
|| getMessagesCount () == batchingSettings .getElementCountThreshold ()) {
batchesToSend .add (popBatch ());
}
return batchesToSend ;
}
}