Skip to content

Commit

Permalink
fix: add client id and update trace id population for StreamWriter an…
Browse files Browse the repository at this point in the history
…d JsonWriter (#2389)

* fix: add a bit more message so customers are not going to be scaried by load shedding errors

* .

* fix:change the client id string, adding client id for stream writer

* .

* .

* .

* .

* .

* .

* .

* .

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
yirutang and gcf-owl-bot[bot] committed Feb 7, 2024
1 parent e5913cf commit 4258af4
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 37 deletions.
5 changes: 5 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Expand Up @@ -188,5 +188,10 @@
<className>com/google/cloud/bigquery/storage/v1/StreamWriter$Builder</className>
<method>com.google.cloud.bigquery.storage.v1.StreamWriter$Builder setRetryFirstDelay(org.threeten.bp.Duration)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter$Builder</className>
<method>com.google.cloud.bigquery.storage.v1.SchemaAwareStreamWriter$Builder setTraceIdBase(java.lang.String)</method>
</difference>
</differences>

Expand Up @@ -37,6 +37,7 @@
*/
public class JsonStreamWriter implements AutoCloseable {
private final SchemaAwareStreamWriter<Object> schemaAwareStreamWriter;
private static final String CLIENT_ID = "java-jsonwriter";

/**
* Constructs the JsonStreamWriter
Expand Down Expand Up @@ -227,8 +228,7 @@ public static final class Builder {
private final SchemaAwareStreamWriter.Builder<Object> schemaAwareStreamWriterBuilder;

private Builder(SchemaAwareStreamWriter.Builder<Object> schemaAwareStreamWriterBuilder) {
this.schemaAwareStreamWriterBuilder =
schemaAwareStreamWriterBuilder.setTraceIdBase("JsonWriter");
this.schemaAwareStreamWriterBuilder = schemaAwareStreamWriterBuilder.setClientId(CLIENT_ID);
}

/**
Expand Down
Expand Up @@ -94,14 +94,14 @@ private SchemaAwareStreamWriter(Builder<T> builder)
builder.executorProvider,
builder.endpoint,
builder.flowControlSettings,
builder.traceIdBase,
builder.traceId,
builder.compressorName,
builder.retrySettings);
streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool);
streamWriterBuilder.setLocation(builder.location);
streamWriterBuilder.setDefaultMissingValueInterpretation(
builder.defaultMissingValueInterpretation);
streamWriterBuilder.setClientId(builder.clientId);
this.streamWriter = streamWriterBuilder.build();
this.streamName = builder.streamName;
this.tableSchema = builder.tableSchema;
Expand Down Expand Up @@ -282,7 +282,6 @@ private void setStreamWriterSettings(
@Nullable ExecutorProvider executorProvider,
@Nullable String endpoint,
@Nullable FlowControlSettings flowControlSettings,
@Nullable String traceIdBase,
@Nullable String traceId,
@Nullable String compressorName,
@Nullable RetrySettings retrySettings) {
Expand All @@ -298,18 +297,8 @@ private void setStreamWriterSettings(
if (endpoint != null) {
streamWriterBuilder.setEndpoint(endpoint);
}
if (traceIdBase != null) {
if (traceId != null) {
streamWriterBuilder.setTraceId(traceIdBase + "_" + traceId);
} else {
streamWriterBuilder.setTraceId(traceIdBase + ":null");
}
} else {
if (traceId != null) {
streamWriterBuilder.setTraceId("SchemaAwareStreamWriter_" + traceId);
} else {
streamWriterBuilder.setTraceId("SchemaAwareStreamWriter:null");
}
if (traceId != null) {
streamWriterBuilder.setTraceId(traceId);
}
if (flowControlSettings != null) {
if (flowControlSettings.getMaxOutstandingRequestBytes() != null) {
Expand Down Expand Up @@ -445,6 +434,7 @@ public static final class Builder<T> {

private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;
private String clientId;

private static final String streamPatternString =
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
Expand Down Expand Up @@ -581,14 +571,8 @@ public Builder<T> setTraceId(String traceId) {
return this;
}

/**
* Setter for a traceIdBase to help identify traffic origin.
*
* @param traceIdBase
* @return Builder
*/
public Builder<T> setTraceIdBase(String traceIdBase) {
this.traceIdBase = Preconditions.checkNotNull(traceIdBase, "TraceIdBase is null.");
Builder<T> setClientId(String clientId) {
this.clientId = Preconditions.checkNotNull(clientId, "ClientId is null.");
return this;
}

Expand Down
Expand Up @@ -229,7 +229,7 @@ private StreamWriter(Builder builder) throws IOException {
builder.maxInflightBytes,
builder.maxRetryDuration,
builder.limitExceededBehavior,
builder.traceId,
builder.getFullTraceId(),
builder.compressorName,
clientSettings,
builder.retrySettings));
Expand Down Expand Up @@ -295,7 +295,7 @@ private StreamWriter(Builder builder) throws IOException {
builder.maxInflightBytes,
builder.maxRetryDuration,
builder.limitExceededBehavior,
builder.traceId,
builder.getFullTraceId(),
builder.compressorName,
client.getSettings(),
builder.retrySettings);
Expand Down Expand Up @@ -376,12 +376,12 @@ static BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IO
private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
String storedTraceId =
this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId();
if (!Objects.equals(storedTraceId, builder.traceId)) {
if (!Objects.equals(storedTraceId, builder.getFullTraceId())) {
throw new IllegalArgumentException(
String.format(
"Trace id used for the same connection pool for the same location must be the same, "
+ "however stored trace id is %s, and expected trace id is %s.",
storedTraceId, builder.traceId));
storedTraceId, builder.getFullTraceId()));
}
FlowController.LimitExceededBehavior storedLimitExceededBehavior =
singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior();
Expand Down Expand Up @@ -629,6 +629,8 @@ public static final class Builder {

private String traceId = null;

private String clientId = "java-streamwriter";

private TableSchema updatedTableSchema = null;

private String location = null;
Expand Down Expand Up @@ -730,6 +732,15 @@ public Builder setTraceId(String traceId) {
return this;
}

/**
* Sets the client id of the writer, for example, JsonStreamWriter has the client id of
* "java-jsonwriter".
*/
Builder setClientId(String clientId) {
this.clientId = clientId;
return this;
}

/** Location of the table this stream writer is targeting. */
public Builder setLocation(String location) {
this.location = location;
Expand Down Expand Up @@ -811,5 +822,13 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
public StreamWriter build() throws IOException {
return new StreamWriter(this);
}

String getFullTraceId() {
if (traceId == null) {
return clientId;
} else {
return clientId + " " + traceId;
}
}
}
}
Expand Up @@ -237,7 +237,7 @@ public void testSingleAppendSimpleJson() throws Exception {
.getSerializedRows(0),
expectedProto.toByteString());
assertEquals(
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriter_test:empty");
"java-jsonwriter test:empty", testBigQueryWrite.getAppendRequests().get(0).getTraceId());
}
}

Expand All @@ -257,7 +257,7 @@ public void testFlexibleColumnAppend() throws Exception {
jsonArr.put(flexible);

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).setTraceId("test:empty").build()) {
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {

testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
Expand All @@ -284,8 +284,7 @@ public void testFlexibleColumnAppend() throws Exception {
.getRows()
.getSerializedRows(0),
expectedProto.toByteString());
assertEquals(
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriter_test:empty");
assertEquals("java-jsonwriter", testBigQueryWrite.getAppendRequests().get(0).getTraceId());
}
}

Expand Down Expand Up @@ -507,7 +506,7 @@ public void testSingleAppendMultipleSimpleJson() throws Exception {
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriter:null");
assertEquals("java-jsonwriter", testBigQueryWrite.getAppendRequests().get(0).getTraceId());
for (int i = 0; i < 4; i++) {
assertEquals(
testBigQueryWrite
Expand Down
Expand Up @@ -286,13 +286,13 @@ private void verifyAppendRequests(long appendCount) {
if (i == 0) {
// First request received by server should have schema and stream name.
assertTrue(serverRequest.getProtoRows().hasWriterSchema());
assertEquals(serverRequest.getWriteStream(), TEST_STREAM_1);
assertEquals(serverRequest.getTraceId(), TEST_TRACE_ID);
assertEquals(TEST_STREAM_1, serverRequest.getWriteStream());
assertEquals("java-streamwriter " + TEST_TRACE_ID, serverRequest.getTraceId());
} else {
// Following request should not have schema and stream name.
assertFalse(serverRequest.getProtoRows().hasWriterSchema());
assertEquals(serverRequest.getWriteStream(), "");
assertEquals(serverRequest.getTraceId(), "");
assertEquals("", serverRequest.getWriteStream());
assertEquals("", serverRequest.getTraceId());
}
}
}
Expand Down

0 comments on commit 4258af4

Please sign in to comment.