Skip to content

Commit

Permalink
update error codes
Browse files Browse the repository at this point in the history
  • Loading branch information
reuvenlax committed Mar 1, 2022
1 parent 7535015 commit 85268dd
Showing 1 changed file with 12 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.protobuf.Descriptors.Descriptor;
Expand Down Expand Up @@ -422,12 +423,15 @@ public String toString() {
// The first context is always the one that fails.
AppendRowsContext failedContext =
Preconditions.checkNotNull(Iterables.getFirst(failedContexts, null));
Status.Code statusCode = Status.fromThrowable(failedContext.getError()).getCode();
// Invalidate the StreamWriter and force a new one to be created.
LOG.error(
"Got error " + failedContext.getError() + " closing " + failedContext.streamName);
clearClients.accept(contexts);
appendFailures.inc();

boolean explicitStreamFinalized =
failedContext.getError() instanceof StreamFinalizedException;
Status.Code statusCode = Status.fromThrowable(failedContext.getError()).getCode();
// This means that the offset we have stored does not match the current end of
// the stream in the Storage API. Usually this happens because a crash or a bundle
// failure
Expand All @@ -438,10 +442,13 @@ public String toString() {
boolean offsetMismatch =
statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS);
// This implies that the stream doesn't exist or has already been finalized. In this
// case we have no
// choice but to create a new stream.
boolean streamDoesntExist = statusCode.equals(Code.INVALID_ARGUMENT);
if (offsetMismatch || streamDoesntExist) {
// case we have no choice but to create a new stream.
boolean streamDoesNotExist =
explicitStreamFinalized
|| statusCode.equals(Code.INVALID_ARGUMENT)
|| statusCode.equals(Code.NOT_FOUND)
|| statusCode.equals(Code.FAILED_PRECONDITION);
if (offsetMismatch || streamDoesNotExist) {
appendOffsetFailures.inc();
LOG.warn(
"Append to "
Expand Down

0 comments on commit 85268dd

Please sign in to comment.