Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: refactor only, add StreamWriter to AppendRowsRequestResponse #1981

Merged
merged 6 commits into from Feb 10, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback;
import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
Expand Down Expand Up @@ -267,16 +268,19 @@ public void run(Throwable finalStatus) {
}

/** Schedules the writing of rows at given offset. */
ApiFuture<AppendRowsResponse> append(
String streamName, ProtoSchema writerSchema, ProtoRows rows, long offset) {
ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows, long offset) {
Preconditions.checkNotNull(streamWriter);
AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
requestBuilder.setProtoRows(
ProtoData.newBuilder().setWriterSchema(writerSchema).setRows(rows).build());
ProtoData.newBuilder()
.setWriterSchema(streamWriter.getProtoSchema())
.setRows(rows)
.build());
if (offset >= 0) {
requestBuilder.setOffset(Int64Value.of(offset));
}
requestBuilder.setWriteStream(streamName);
return appendInternal(requestBuilder.build());
requestBuilder.setWriteStream(streamWriter.getStreamName());
return appendInternal(streamWriter, requestBuilder.build());
}

Boolean isUserClosed() {
Expand All @@ -288,8 +292,9 @@ Boolean isUserClosed() {
}
}

private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message) {
AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message);
private ApiFuture<AppendRowsResponse> appendInternal(
StreamWriter streamWriter, AppendRowsRequest message) {
AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message, streamWriter);
if (requestWrapper.messageSize > getApiMaxRequestBytes()) {
requestWrapper.appendResult.setException(
new StatusRuntimeException(
Expand Down Expand Up @@ -840,10 +845,14 @@ static final class AppendRequestAndResponse {
final AppendRowsRequest message;
final long messageSize;

AppendRequestAndResponse(AppendRowsRequest message) {
// The writer that issues the call of the request.
final StreamWriter streamWriter;

AppendRequestAndResponse(AppendRowsRequest message, StreamWriter streamWriter) {
this.appendResult = SettableApiFuture.create();
this.message = message;
this.messageSize = message.getProtoRows().getSerializedSize();
this.streamWriter = streamWriter;
}
}

Expand Down
Expand Up @@ -264,8 +264,7 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
}
Stopwatch stopwatch = Stopwatch.createStarted();
ApiFuture<AppendRowsResponse> responseFuture =
connectionWorker.append(
streamWriter.getStreamName(), streamWriter.getProtoSchema(), rows, offset);
connectionWorker.append(streamWriter, rows, offset);
return ApiFutures.transform(
responseFuture,
// Add callback for update schema
Expand Down
Expand Up @@ -145,8 +145,7 @@ public enum Kind {
public ApiFuture<AppendRowsResponse> append(
StreamWriter streamWriter, ProtoRows protoRows, long offset) {
if (getKind() == Kind.CONNECTION_WORKER) {
return connectionWorker()
.append(streamWriter.getStreamName(), streamWriter.getProtoSchema(), protoRows, offset);
return connectionWorker().append(streamWriter, protoRows, offset);
} else {
return connectionWorkerPool().append(streamWriter, protoRows, offset);
}
Expand Down Expand Up @@ -376,7 +375,7 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows) {
public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
if (userClosed.get()) {
AppendRequestAndResponse requestWrapper =
new AppendRequestAndResponse(AppendRowsRequest.newBuilder().build());
new AppendRequestAndResponse(AppendRowsRequest.newBuilder().build(), this);
requestWrapper.appendResult.setException(
new Exceptions.StreamWriterClosedException(
Status.fromCode(Status.Code.FAILED_PRECONDITION)
Expand Down
Expand Up @@ -80,6 +80,14 @@ public void testMultiplexedAppendSuccess() throws Exception {
testBigQueryWrite.addResponse(createAppendResponse(i));
}
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
StreamWriter sw1 =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema("foo"))
.build();
StreamWriter sw2 =
StreamWriter.newBuilder(TEST_STREAM_2, client)
.setWriterSchema(createProtoSchema("complicate"))
.build();
// We do a pattern of:
// send to stream1, string1
// send to stream1, string2
Expand All @@ -95,8 +103,7 @@ public void testMultiplexedAppendSuccess() throws Exception {
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
createProtoSchema("foo"),
sw1,
createFooProtoRows(new String[] {String.valueOf(i)}),
i));
break;
Expand All @@ -105,8 +112,7 @@ public void testMultiplexedAppendSuccess() throws Exception {
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_2,
createProtoSchema("complicate"),
sw2,
createComplicateTypeProtoRows(new String[] {String.valueOf(i)}),
i));
break;
Expand Down Expand Up @@ -197,23 +203,27 @@ public void testAppendInSameStream_switchSchema() throws Exception {
// send to stream1, schema3
// send to stream1, schema1
// ...
StreamWriter sw1 =
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
StreamWriter sw2 =
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema2).build();
StreamWriter sw3 =
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema3).build();
for (long i = 0; i < appendCount; i++) {
switch ((int) i % 4) {
case 0:
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
schema1,
sw1,
createFooProtoRows(new String[] {String.valueOf(i)}),
i));
break;
case 1:
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
schema2,
sw2,
createFooProtoRows(new String[] {String.valueOf(i)}),
i));
break;
Expand All @@ -222,8 +232,7 @@ public void testAppendInSameStream_switchSchema() throws Exception {
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
schema3,
sw3,
createFooProtoRows(new String[] {String.valueOf(i)}),
i));
break;
Expand Down Expand Up @@ -293,6 +302,9 @@ public void testAppendInSameStream_switchSchema() throws Exception {

@Test
public void testAppendButInflightQueueFull() throws Exception {
ProtoSchema schema1 = createProtoSchema("foo");
StreamWriter sw1 =
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
ConnectionWorker connectionWorker =
new ConnectionWorker(
TEST_STREAM_1,
Expand All @@ -305,7 +317,6 @@ public void testAppendButInflightQueueFull() throws Exception {
client.getSettings());
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
ConnectionWorker.setMaxInflightQueueWaitTime(500);
ProtoSchema schema1 = createProtoSchema("foo");

long appendCount = 6;
for (int i = 0; i < appendCount; i++) {
Expand All @@ -322,23 +333,15 @@ public void testAppendButInflightQueueFull() throws Exception {
StatusRuntimeException.class,
() -> {
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
schema1,
createFooProtoRows(new String[] {String.valueOf(5)}),
5);
connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(5)}), 5);
});
long timeDiff = System.currentTimeMillis() - startTime;
assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), 5);
assertTrue(timeDiff > 500);
} else {
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
schema1,
createFooProtoRows(new String[] {String.valueOf(i)}),
i));
connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i));
assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), i + 1);
}
}
Expand Down Expand Up @@ -396,11 +399,10 @@ private ProtoSchema createProtoSchema(String protoName) {

private ApiFuture<AppendRowsResponse> sendTestMessage(
ConnectionWorker connectionWorker,
String streamName,
ProtoSchema protoSchema,
StreamWriter streamWriter,
ProtoRows protoRows,
long offset) {
return connectionWorker.append(streamName, protoSchema, protoRows, offset);
return connectionWorker.append(streamWriter, protoRows, offset);
}

private ProtoRows createFooProtoRows(String[] messages) {
Expand Down