Skip to content

Commit

Permalink
fix: add schema update back to json writer (#905)
Browse files Browse the repository at this point in the history
* .

* fix: Add schema update back to JsonWriter

* .

* .

* .

* .

* .
  • Loading branch information
yirutang committed Mar 3, 2021
1 parent 5a167bf commit a2adbf8
Show file tree
Hide file tree
Showing 6 changed files with 395 additions and 492 deletions.
15 changes: 15 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,19 @@
<differenceType>7002</differenceType>
<method>void flushAll(long)</method>
</difference>
<difference>
<className>com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter$Builder</className>
<differenceType>7002</differenceType>
<method>com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter$Builder setBatchingSettings(com.google.api.gax.batching.BatchingSettings)</method>
</difference>
<difference>
<className>com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter$Builder</className>
<differenceType>7002</differenceType>
<method>com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter$Builder setExecutorProvider(com.google.api.gax.core.ExecutorProvider)</method>
</difference>
<difference>
<className>com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter$Builder</className>
<differenceType>7002</differenceType>
<method>com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter$Builder setRetrySettings(com.google.api.gax.retrying.RetrySettings)</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.Schema;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -51,6 +50,7 @@ public class JsonStreamWriter implements AutoCloseable {
private BigQueryWriteClient client;
private String streamName;
private StreamWriter streamWriter;
private StreamWriter.Builder streamWriterBuilder;
private Descriptor descriptor;
private TableSchema tableSchema;

Expand All @@ -66,20 +66,16 @@ private JsonStreamWriter(Builder builder)
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema);

StreamWriter.Builder streamWriterBuilder;
if (this.client == null) {
streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName);
} else {
streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName, builder.client);
}
setStreamWriterSettings(
streamWriterBuilder,
builder.channelProvider,
builder.credentialsProvider,
builder.batchingSettings,
builder.retrySettings,
builder.executorProvider,
builder.endpoint,
builder.flowControlSettings,
builder.createDefaultStream);
this.streamWriter = streamWriterBuilder.build();
this.streamName = this.streamWriter.getStreamNameString();
Expand Down Expand Up @@ -134,17 +130,17 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
}

/**
* Refreshes connection for a JsonStreamWriter by first flushing all remaining rows, then calling
* refreshAppend(), and finally setting the descriptor. All of these actions need to be performed
* atomically to avoid having synchronization issues with append(). Flushing all rows first is
* necessary since if there are rows remaining when the connection refreshes, it will send out the
* old writer schema instead of the new one.
* Refreshes connection for a JsonStreamWriter by first flushing all remaining rows, then
* recreates stream writer, and finally setting the descriptor. All of these actions need to be
* performed atomically to avoid having synchronization issues with append(). Flushing all rows
* first is necessary since if there are rows remaining when the connection refreshes, it will
* send out the old writer schema instead of the new one.
*/
void refreshConnection()
throws IOException, InterruptedException, Descriptors.DescriptorValidationException {
synchronized (this) {
this.streamWriter.writeAllOutstanding();
this.streamWriter.refreshAppend();
this.streamWriter.shutdown();
this.streamWriter = streamWriterBuilder.build();
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(this.tableSchema);
}
Expand All @@ -170,39 +166,37 @@ public Descriptor getDescriptor() {

/** Sets all StreamWriter settings. */
private void setStreamWriterSettings(
StreamWriter.Builder builder,
@Nullable TransportChannelProvider channelProvider,
@Nullable CredentialsProvider credentialsProvider,
@Nullable BatchingSettings batchingSettings,
@Nullable RetrySettings retrySettings,
@Nullable ExecutorProvider executorProvider,
@Nullable String endpoint,
@Nullable FlowControlSettings flowControlSettings,
Boolean createDefaultStream) {
if (channelProvider != null) {
builder.setChannelProvider(channelProvider);
streamWriterBuilder.setChannelProvider(channelProvider);
}
if (credentialsProvider != null) {
builder.setCredentialsProvider(credentialsProvider);
streamWriterBuilder.setCredentialsProvider(credentialsProvider);
}
if (batchingSettings != null) {
builder.setBatchingSettings(batchingSettings);
}
if (retrySettings != null) {
builder.setRetrySettings(retrySettings);
}
if (executorProvider != null) {
builder.setExecutorProvider(executorProvider);
BatchingSettings.Builder batchSettingBuilder =
BatchingSettings.newBuilder()
.setElementCountThreshold(1L)
.setRequestByteThreshold(4 * 1024 * 1024L);
if (flowControlSettings != null) {
streamWriterBuilder.setBatchingSettings(
batchSettingBuilder.setFlowControlSettings(flowControlSettings).build());
} else {
streamWriterBuilder.setBatchingSettings(batchSettingBuilder.build());
}
if (endpoint != null) {
builder.setEndpoint(endpoint);
streamWriterBuilder.setEndpoint(endpoint);
}
if (createDefaultStream) {
builder.createDefaultStream();
streamWriterBuilder.createDefaultStream();
}
JsonStreamWriterOnSchemaUpdateRunnable jsonStreamWriterOnSchemaUpdateRunnable =
new JsonStreamWriterOnSchemaUpdateRunnable();
jsonStreamWriterOnSchemaUpdateRunnable.setJsonStreamWriter(this);
builder.setOnSchemaUpdateRunnable(jsonStreamWriterOnSchemaUpdateRunnable);
streamWriterBuilder.setOnSchemaUpdateRunnable(jsonStreamWriterOnSchemaUpdateRunnable);
}

/**
Expand Down Expand Up @@ -313,9 +307,7 @@ public static final class Builder {

private TransportChannelProvider channelProvider;
private CredentialsProvider credentialsProvider;
private BatchingSettings batchingSettings;
private RetrySettings retrySettings;
private ExecutorProvider executorProvider;
private FlowControlSettings flowControlSettings;
private String endpoint;
private boolean createDefaultStream = false;

Expand Down Expand Up @@ -359,37 +351,15 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
}

/**
* Setter for the underlying StreamWriter's BatchingSettings.
*
* @param batchingSettings
* @return Builder
*/
public Builder setBatchingSettings(BatchingSettings batchingSettings) {
this.batchingSettings =
Preconditions.checkNotNull(batchingSettings, "BatchingSettings is null.");
return this;
}

/**
* Setter for the underlying StreamWriter's RetrySettings.
*
* @param retrySettings
* @return Builder
*/
public Builder setRetrySettings(RetrySettings retrySettings) {
this.retrySettings = Preconditions.checkNotNull(retrySettings, "RetrySettings is null.");
return this;
}

/**
* Setter for the underlying StreamWriter's ExecutorProvider.
* Setter for the underlying StreamWriter's FlowControlSettings.
*
* @param executorProvider
* @param flowControlSettings
* @return Builder
*/
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider =
Preconditions.checkNotNull(executorProvider, "ExecutorProvider is null.");
public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
Preconditions.checkNotNull(flowControlSettings, "FlowControlSettings is null.");
this.flowControlSettings =
Preconditions.checkNotNull(flowControlSettings, "FlowControlSettings is null.");
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,9 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
List<InflightBatch> batchesToSend;
batchesToSend = messagesBatch.add(outstandingAppend);
// Setup the next duration based delivery alarm if there are messages batched.
setupAlarm();
if (batchingSettings.getDelayThreshold() != null) {
setupAlarm();
}
if (!batchesToSend.isEmpty()) {
for (final InflightBatch batch : batchesToSend) {
LOG.fine("Scheduling a batch for immediate sending");
Expand Down Expand Up @@ -738,58 +740,31 @@ public Builder setBatchingSettings(BatchingSettings batchingSettings) {
if (batchingSettings.getRequestByteThreshold() > getApiMaxRequestBytes()) {
builder.setRequestByteThreshold(getApiMaxRequestBytes());
}
Preconditions.checkNotNull(batchingSettings.getDelayThreshold());
Preconditions.checkArgument(batchingSettings.getDelayThreshold().toMillis() > 0);
LOG.info("here" + batchingSettings.getFlowControlSettings());
if (batchingSettings.getFlowControlSettings() == null) {
builder.setFlowControlSettings(DEFAULT_FLOW_CONTROL_SETTINGS);
} else {

if (batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount() == null) {
builder.setFlowControlSettings(
batchingSettings
.getFlowControlSettings()
.toBuilder()
.setMaxOutstandingElementCount(
DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingElementCount())
.build());
} else {
Preconditions.checkArgument(
batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount() > 0);
if (batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount()
> getApiMaxInflightRequests()) {
builder.setFlowControlSettings(
batchingSettings
.getFlowControlSettings()
.toBuilder()
.setMaxOutstandingElementCount(getApiMaxInflightRequests())
.build());
}
Long elementCount =
batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount();
if (elementCount == null || elementCount > getApiMaxInflightRequests()) {
elementCount = DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingElementCount();
}
if (batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes() == null) {
builder.setFlowControlSettings(
batchingSettings
.getFlowControlSettings()
.toBuilder()
.setMaxOutstandingRequestBytes(
DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingRequestBytes())
.build());
} else {
Preconditions.checkArgument(
batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes() > 0);
Long elementSize =
batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes();
if (elementSize == null || elementSize < 0) {
elementSize = DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingRequestBytes();
}
if (batchingSettings.getFlowControlSettings().getLimitExceededBehavior() == null) {
builder.setFlowControlSettings(
batchingSettings
.getFlowControlSettings()
.toBuilder()
.setLimitExceededBehavior(
DEFAULT_FLOW_CONTROL_SETTINGS.getLimitExceededBehavior())
.build());
} else {
Preconditions.checkArgument(
batchingSettings.getFlowControlSettings().getLimitExceededBehavior()
!= FlowController.LimitExceededBehavior.Ignore);
FlowController.LimitExceededBehavior behavior =
batchingSettings.getFlowControlSettings().getLimitExceededBehavior();
if (behavior == null || behavior == FlowController.LimitExceededBehavior.Ignore) {
behavior = DEFAULT_FLOW_CONTROL_SETTINGS.getLimitExceededBehavior();
}
builder.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(elementCount)
.setMaxOutstandingRequestBytes(elementSize)
.setLimitExceededBehavior(behavior)
.build());
}
this.batchingSettings = builder.build();
return this;
Expand Down
Loading

0 comments on commit a2adbf8

Please sign in to comment.