Skip to content

Commit

Permalink
fix: several StreamWriter issues (#213)
Browse files Browse the repository at this point in the history
* fix: several StreamWriter issues

- Make messages wait in flow controll to be delivered in order
- Avoid recreating the BigQueryWriteClient stub during reconnection, which is not necessary.
- Allow user to pass in BigQueryWriteClient stub so that it can be shared with other API calls.

	modified:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java
	modified:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java

* Wait doesn't need to be synchronized

* fix: unlock is not called in exception case

	modified:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java
	modified:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java
  • Loading branch information
yirutang committed Apr 24, 2020
1 parent e3314f4 commit b803863
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public class StreamWriter implements AutoCloseable {

private final BatchingSettings batchingSettings;
private final RetrySettings retrySettings;
private final BigQueryWriteSettings stubSettings;
private BigQueryWriteSettings stubSettings;

private final Lock messagesBatchLock;
private final MessagesBatch messagesBatch;
Expand Down Expand Up @@ -142,13 +142,21 @@ private StreamWriter(Builder builder)
messagesWaiter = new Waiter(this.batchingSettings.getFlowControlSettings());
responseObserver = new AppendResponseObserver(this);

stubSettings =
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(builder.credentialsProvider)
.setExecutorProvider(builder.executorProvider)
.setTransportChannelProvider(builder.channelProvider)
.setEndpoint(builder.endpoint)
.build();
if (builder.client == null) {
stubSettings =
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(builder.credentialsProvider)
.setExecutorProvider(builder.executorProvider)
.setTransportChannelProvider(builder.channelProvider)
.setEndpoint(builder.endpoint)
.build();
stub = BigQueryWriteClient.create(stubSettings);
backgroundResourceList.add(stub);
} else {
stub = builder.client;
}
backgroundResources = new BackgroundResourceAggregation(backgroundResourceList);

shutdown = new AtomicBoolean(false);
refreshAppend();
Stream.WriteStream stream =
Expand Down Expand Up @@ -240,15 +248,10 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
public void refreshAppend() throws IOException, InterruptedException {
synchronized (this) {
Preconditions.checkState(!shutdown.get(), "Cannot append on a shut-down writer.");
if (stub != null) {
// There could be a moment, stub is not yet initialized.
if (clientStream != null) {
clientStream.closeSend();
stub.shutdown();
stub.awaitTermination(1, TimeUnit.MINUTES);
}
backgroundResourceList.remove(stub);
stub = BigQueryWriteClient.create(stubSettings);
backgroundResourceList.add(stub);
backgroundResources = new BackgroundResourceAggregation(backgroundResourceList);
messagesBatch.resetAttachSchema();
bidiStreamingCallable = stub.appendRowsCallable();
clientStream = bidiStreamingCallable.splitCall(responseObserver);
Expand Down Expand Up @@ -314,14 +317,12 @@ public void writeAllOutstanding() {
private void writeBatch(final InflightBatch inflightBatch) {
if (inflightBatch != null) {
AppendRowsRequest request = inflightBatch.getMergedRequest();
messagesWaiter.waitOnElementCount();
messagesWaiter.waitOnSizeLimit(inflightBatch.getByteSize());
responseObserver.addInflightBatch(inflightBatch);
clientStream.send(request);

synchronized (messagesWaiter) {
messagesWaiter.incrementPendingCount(1);
messagesWaiter.incrementPendingSize(inflightBatch.getByteSize());
try {
messagesWaiter.acquire(inflightBatch.getByteSize());
responseObserver.addInflightBatch(inflightBatch);
clientStream.send(request);
} catch (FlowController.FlowControlException ex) {
inflightBatch.onFailure(ex);
}
}
}
Expand All @@ -346,14 +347,14 @@ private static final class InflightBatch {
final ArrayList<Long> offsetList;
final long creationTime;
int attempt;
int batchSizeBytes;
long batchSizeBytes;
long expectedOffset;
Boolean attachSchema;
String streamName;

InflightBatch(
List<AppendRequestAndFutureResponse> inflightRequests,
int batchSizeBytes,
long batchSizeBytes,
String streamName,
Boolean attachSchema) {
this.inflightRequests = inflightRequests;
Expand All @@ -377,7 +378,7 @@ int count() {
return inflightRequests.size();
}

int getByteSize() {
long getByteSize() {
return this.batchSizeBytes;
}

Expand Down Expand Up @@ -478,7 +479,9 @@ public void shutdown() {
currentAlarmFuture.cancel(false);
}
writeAllOutstanding();
messagesWaiter.waitComplete();
synchronized (messagesWaiter) {
messagesWaiter.waitComplete();
}
if (clientStream.isSendReady()) {
clientStream.closeSend();
}
Expand All @@ -496,7 +499,7 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted
}

/**
* Constructs a new {@link Builder} using the given topic.
* Constructs a new {@link Builder} using the given stream.
*
* <p>Example of creating a {@code WriteStream}.
*
Expand All @@ -514,7 +517,15 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted
* }</pre>
*/
public static Builder newBuilder(String streamName) {
return new Builder(streamName);
return new Builder(streamName, null);
}

/**
* Constructs a new {@link Builder} using the given stream and an existing BigQueryWriteClient.
*/
public static Builder newBuilder(String streamName, BigQueryWriteClient client) {
Preconditions.checkArgument(client != null);
return new Builder(streamName, client);
}

/** A builder of {@link StreamWriter}s. */
Expand All @@ -523,9 +534,6 @@ public static final class Builder {
static final Duration MIN_RPC_TIMEOUT = Duration.ofMillis(10);

// Meaningful defaults.
static final long DEFAULT_ELEMENT_COUNT_THRESHOLD = 100L;
static final long DEFAULT_REQUEST_BYTES_THRESHOLD = 100 * 1024L; // 100 kB
static final Duration DEFAULT_DELAY_THRESHOLD = Duration.ofMillis(10);
static final FlowControlSettings DEFAULT_FLOW_CONTROL_SETTINGS =
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
Expand All @@ -534,9 +542,9 @@ public static final class Builder {
.build();
public static final BatchingSettings DEFAULT_BATCHING_SETTINGS =
BatchingSettings.newBuilder()
.setDelayThreshold(DEFAULT_DELAY_THRESHOLD)
.setRequestByteThreshold(DEFAULT_REQUEST_BYTES_THRESHOLD)
.setElementCountThreshold(DEFAULT_ELEMENT_COUNT_THRESHOLD)
.setDelayThreshold(Duration.ofMillis(10))
.setRequestByteThreshold(100 * 1024L) // 100 kb
.setElementCountThreshold(100L)
.setFlowControlSettings(DEFAULT_FLOW_CONTROL_SETTINGS)
.build();
public static final RetrySettings DEFAULT_RETRY_SETTINGS =
Expand All @@ -555,6 +563,8 @@ public static final class Builder {
private String streamName;
private String endpoint = BigQueryWriteSettings.getDefaultEndpoint();

private BigQueryWriteClient client = null;

// Batching options
BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS;

Expand All @@ -569,8 +579,9 @@ public static final class Builder {
private CredentialsProvider credentialsProvider =
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();

private Builder(String stream) {
private Builder(String stream, BigQueryWriteClient client) {
this.streamName = Preconditions.checkNotNull(stream);
this.client = client;
}

/**
Expand Down Expand Up @@ -771,11 +782,7 @@ public void onResponse(AppendRowsResponse response) {
inflightBatch.onSuccess(response);
}
} finally {
synchronized (streamWriter.messagesWaiter) {
streamWriter.messagesWaiter.incrementPendingCount(-1);
streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize());
streamWriter.messagesWaiter.notifyAll();
}
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
}
}

Expand Down Expand Up @@ -805,11 +812,11 @@ public void onError(Throwable t) {
&& !streamWriter.shutdown.get()) {
streamWriter.refreshAppend();
// Currently there is a bug that it took reconnected stream 5 seconds to pick up
// stream count. So wait at least 5 seconds before sending a new request.
// stream count. So wait at least 7 seconds before sending a new request.
Thread.sleep(
Math.min(
streamWriter.getRetrySettings().getInitialRetryDelay().toMillis(),
Duration.ofSeconds(5).toMillis()));
Duration.ofSeconds(7).toMillis()));
streamWriter.writeBatch(inflightBatch);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries++;
Expand Down Expand Up @@ -837,19 +844,15 @@ public void onError(Throwable t) {
}
}
} finally {
synchronized (streamWriter.messagesWaiter) {
streamWriter.messagesWaiter.incrementPendingCount(-1);
streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize());
streamWriter.messagesWaiter.notifyAll();
}
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
}
}
};

// This class controls how many messages are going to be sent out in a batch.
private static class MessagesBatch {
private List<AppendRequestAndFutureResponse> messages;
private int batchedBytes;
private long batchedBytes;
private final BatchingSettings batchingSettings;
private Boolean attachSchema = true;
private final String streamName;
Expand Down Expand Up @@ -882,7 +885,7 @@ private boolean isEmpty() {
return messages.isEmpty();
}

private int getBatchedBytes() {
private long getBatchedBytes() {
return batchedBytes;
}

Expand Down
Loading

0 comments on commit b803863

Please sign in to comment.