Skip to content
Permalink
Browse files
fix: remove reconnection feature from client library (#849)
* fix: Remove reconnection logic

* .

* .

* .

* .

* .

* .

* .

* .
  • Loading branch information
yirutang committed Feb 18, 2021
1 parent 687d48a commit 86dfc3a1b1e3429a1e0932919a300a3bbbcb1ceb
@@ -65,8 +65,8 @@
* without offset, please use a simpler writer {@code DirectWriter}.
*
* <p>A {@link StreamWrier} provides built-in capabilities to: handle batching of messages;
* controlling memory utilization (through flow control); automatic connection re-establishment and
* request cleanup (only keeps write schema on first request in the stream).
* controlling memory utilization (through flow control) and request cleanup (only keeps write
* schema on first request in the stream).
*
* <p>With customizable options that control:
*
@@ -863,14 +863,20 @@ public void onStart(StreamController controller) {

private void abortInflightRequests(Throwable t) {
synchronized (this.inflightBatches) {
boolean first_error = true;
while (!this.inflightBatches.isEmpty()) {
InflightBatch inflightBatch = this.inflightBatches.poll();
inflightBatch.onFailure(
new AbortedException(
"Request aborted due to previous failures",
t,
GrpcStatusCode.of(Status.Code.ABORTED),
true));
if (first_error) {
inflightBatch.onFailure(t);
first_error = false;
} else {
inflightBatch.onFailure(
new AbortedException(
"Request aborted due to previous failures",
t,
GrpcStatusCode.of(Status.Code.ABORTED),
true));
}
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
}
}
@@ -913,7 +919,12 @@ public void onResponse(AppendRowsResponse response) {
response.getAppendResult().getOffset().getValue(),
inflightBatch.getExpectedOffset()));
inflightBatch.onFailure(exception);
abortInflightRequests(exception);
abortInflightRequests(
new AbortedException(
"Request aborted due to previous failures",
exception,
GrpcStatusCode.of(Status.Code.ABORTED),
true));
} else {
inflightBatch.onSuccess(response);
}
@@ -931,56 +942,7 @@ public void onComplete() {
@Override
public void onError(Throwable t) {
LOG.fine("OnError called");
if (streamWriter.shutdown.get()) {
abortInflightRequests(t);
return;
}
InflightBatch inflightBatch = null;
synchronized (this.inflightBatches) {
if (inflightBatches.isEmpty()) {
// The batches could have been aborted.
return;
}
inflightBatch = this.inflightBatches.poll();
}
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
if (isRecoverableError(t)) {
try {
if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts()
&& !streamWriter.shutdown.get()) {
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries++;
}
LOG.info(
"Try to reestablish connection due to transient error: "
+ t.toString()
+ " retry times: "
+ streamWriter.currentRetries);
streamWriter.refreshAppend();
LOG.info("Resending requests on after connection established");
streamWriter.writeBatch(inflightBatch);
} else {
inflightBatch.onFailure(t);
abortInflightRequests(t);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
}
} catch (InterruptedException e) {
LOG.info("Got exception while retrying: " + e.toString());
inflightBatch.onFailure(new StatusRuntimeException(Status.ABORTED));
abortInflightRequests(new StatusRuntimeException(Status.ABORTED));
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
}
} else {
inflightBatch.onFailure(t);
abortInflightRequests(t);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
}
abortInflightRequests(t);
}
};

@@ -64,10 +64,16 @@ private void notifyNextAcquires() {
}
}

public synchronized void release(long messageSize) {
public synchronized void release(long messageSize) throws IllegalStateException {
lock.lock();
--pendingCount;
if (pendingCount < 0) {
throw new IllegalStateException("pendingCount cannot be less than 0");
}
pendingSize -= messageSize;
if (pendingSize < 0) {
throw new IllegalStateException("pendingSize cannot be less than 0");
}
notifyNextAcquires();
lock.unlock();
notifyAll();
@@ -32,7 +32,7 @@
import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.AbortedException;
import com.google.api.gax.rpc.DataLossException;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.common.base.Strings;
@@ -510,42 +510,6 @@ public void testFlowControlBehaviorException() throws Exception {
}
}

@Test
public void testStreamReconnectionTransient() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setDelayThreshold(Duration.ofSeconds(100000))
.setElementCountThreshold(1L)
.setFlowControlSettings(
StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS
.toBuilder()
.setMaxOutstandingElementCount(1L)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
.build())
.build())
.build();

testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());
testBigQueryWrite.addException(new StatusRuntimeException(Status.UNAVAILABLE));
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build())
.build());
ApiFuture<AppendRowsResponse> future1 = sendTestMessage(writer, new String[] {"m1"});
ApiFuture<AppendRowsResponse> future2 = sendTestMessage(writer, new String[] {"m1"});
assertEquals(0L, future1.get().getAppendResult().getOffset().getValue());
assertEquals(1L, future2.get().getAppendResult().getOffset().getValue());
writer.close();
}

@Test
public void testStreamReconnectionPermanant() throws Exception {
StreamWriter writer =
@@ -569,36 +533,6 @@ public void testStreamReconnectionPermanant() throws Exception {
writer.close();
}

@Test
public void testStreamReconnectionExceedRetry() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setDelayThreshold(Duration.ofSeconds(100000))
.setElementCountThreshold(1L)
.build())
.setRetrySettings(
RetrySettings.newBuilder()
.setMaxRetryDelay(Duration.ofMillis(100))
.setMaxAttempts(1)
.build())
.build();
assertEquals(1, writer.getRetrySettings().getMaxAttempts());
StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE);
testBigQueryWrite.addException(transientError);
testBigQueryWrite.addException(transientError);
ApiFuture<AppendRowsResponse> future3 = sendTestMessage(writer, new String[] {"toomanyretry"});
try {
future3.get();
Assert.fail("This should fail.");
} catch (ExecutionException e) {
assertEquals(transientError.toString(), e.getCause().getCause().toString());
}
writer.close();
}

@Test
public void testOffset() throws Exception {
try (StreamWriter writer =
@@ -665,7 +599,7 @@ public void testOffsetMismatch() throws Exception {

@Test
public void testErrorPropagation() throws Exception {
try (StreamWriter writer =
StreamWriter writer =
getTestStreamWriterBuilder()
.setExecutorProvider(SINGLE_THREAD_EXECUTOR)
.setBatchingSettings(
@@ -674,13 +608,23 @@ public void testErrorPropagation() throws Exception {
.setElementCountThreshold(1L)
.setDelayThreshold(Duration.ofSeconds(5))
.build())
.build()) {
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
sendTestMessage(writer, new String[] {"A"}).get();
.build();
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
ApiFuture<AppendRowsResponse> future1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> future2 = sendTestMessage(writer, new String[] {"B"});
try {
future1.get();
fail("should throw exception");
} catch (ExecutionException e) {
assertThat(e.getCause()).isInstanceOf(DataLossException.class);
}
try {
future2.get();
fail("should throw exception");
} catch (ExecutionException e) {
assertThat(e.getCause()).isInstanceOf(AbortedException.class);
}
}

@Test
@@ -957,43 +901,6 @@ public void testFlushAll() throws Exception {
writer.close();
}

@Test
public void testFlushAllFailed() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(2L)
.setDelayThreshold(Duration.ofSeconds(100000))
.build())
.build();

testBigQueryWrite.addException(Status.DATA_LOSS.asException());
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
testBigQueryWrite.addException(Status.DATA_LOSS.asException());

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});

assertFalse(appendFuture3.isDone());
try {
writer.flushAll(100000);
fail("Should have thrown an Exception");
} catch (Exception expected) {
if (expected.getCause() instanceof com.google.api.gax.rpc.DataLossException) {
LOG.info("got: " + expected.toString());
} else {
fail("Unexpected exception:" + expected.toString());
}
}

assertTrue(appendFuture3.isDone());

writer.close();
}

@Test
public void testDatasetTraceId() throws Exception {
StreamWriter writer =
@@ -1032,10 +939,12 @@ public void testShutdownWithConnectionError() throws Exception {
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build())
.build());
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10));

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"B"});
Thread.sleep(5000L);
// Move the needle for responses to be sent.
fakeExecutor.advanceTime(Duration.ofSeconds(20));
@@ -1044,9 +953,15 @@ public void testShutdownWithConnectionError() throws Exception {
assertEquals(1, appendFuture1.get().getAppendResult().getOffset().getValue());
try {
appendFuture2.get();
fail("Should fail with exception");
} catch (java.util.concurrent.ExecutionException e) {
assertEquals("Request aborted due to previous failures", e.getCause().getMessage());
fail("Should fail with exception future2");
} catch (ExecutionException e) {
assertThat(e.getCause()).isInstanceOf(DataLossException.class);
}
try {
appendFuture3.get();
fail("Should fail with exception future3");
} catch (ExecutionException e) {
assertThat(e.getCause()).isInstanceOf(AbortedException.class);
}
}
}

0 comments on commit 86dfc3a

Please sign in to comment.