feat: provide sample code for row-level error handling
agrawal-siddharth committed Oct 31, 2022
1 parent 5699122 commit 18e97b8
Showing 3 changed files with 46 additions and 6 deletions.
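For orientation, the pattern this commit adds to the sample is: append a JSONArray through a JsonStreamWriter, and when an append fails with Exceptions.AppendSerializtionError, read getRowIndexToErrorMessage() to drop only the rejected rows and retry the remainder. The following is a minimal sketch of that pattern, not the sample itself; the names RowLevelRetrySketch, appendWithRowLevelRetry, and dropFaultyRows are illustrative, and the blocking get() stands in for the callback-based flow the sample uses.

import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import java.util.Map;
import org.json.JSONArray;

public class RowLevelRetrySketch {

  // Returns a copy of `rows` without the rows flagged in the error, logging the reason for each.
  static JSONArray dropFaultyRows(JSONArray rows, AppendSerializtionError error) {
    Map<Integer, String> rowErrors = error.getRowIndexToErrorMessage();
    JSONArray filtered = new JSONArray();
    for (int i = 0; i < rows.length(); i++) {
      if (rowErrors.containsKey(i)) {
        // Row i was rejected; rowErrors.get(i) explains why. Route it to a dead-letter sink.
        System.out.println("Dropping row " + i + ": " + rowErrors.get(i));
      } else {
        filtered.put(rows.get(i));
      }
    }
    return filtered;
  }

  // Appends a batch and, on a row-level serialization error, retries only the clean rows.
  static void appendWithRowLevelRetry(JsonStreamWriter writer, JSONArray rows) throws Exception {
    try {
      writer.append(rows).get();
    } catch (Exception e) {
      Throwable cause = (e.getCause() != null) ? e.getCause() : e;
      if (cause instanceof AppendSerializtionError) {
        JSONArray remaining = dropFaultyRows(rows, (AppendSerializtionError) cause);
        if (remaining.length() > 0) {
          writer.append(remaining).get();
        }
        return;
      }
      throw e;
    }
  }
}

In the sample itself this logic lives in the onFailure callback shown in the second file's diff below.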
2 changes: 1 addition & 1 deletion samples/snippets/pom.xml
@@ -31,7 +31,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
- <version>25.4.0</version>
+ <version>26.1.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
@@ -27,6 +27,7 @@
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
+ import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
@@ -38,6 +39,7 @@
import io.grpc.Status.Code;
import java.io.IOException;
import java.util.concurrent.Phaser;
+ import java.util.Map;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
@@ -69,7 +71,11 @@ public static void writeToDefaultStream(String projectId, String datasetName, St
JSONArray jsonArr = new JSONArray();
for (int j = 0; j < 10; j++) {
JSONObject record = new JSONObject();
record.put("test_string", String.format("record %03d-%03d", i, j));
StringBuilder sbSuffix = new StringBuilder();
for (int k = 0; k < j; k++) {
sbSuffix.append(k);
}
record.put("test_string", String.format("record %03d-%03d %s", i, j, sbSuffix.toString()));
jsonArr.put(record);
}

@@ -170,7 +176,7 @@ public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {
}

public void onSuccess(AppendRowsResponse response) {
System.out.format("Append success\n");
System.out.format("Append success%n");
done();
}

@@ -191,7 +197,37 @@ public void onFailure(Throwable throwable) {
return;
} catch (Exception e) {
// Fall through to return error.
System.out.format("Failed to retry append: %s\n", e);
System.out.format("Failed to retry append: %s%n", e);
}
}

+ if (throwable instanceof AppendSerializtionError) {
+   AppendSerializtionError ase = (AppendSerializtionError) throwable;
+   Map<Integer, String> rowIndexToErrorMessage = ase.getRowIndexToErrorMessage();
+   if (rowIndexToErrorMessage.size() > 0) {
+     // Omit the faulty rows
+     JSONArray dataNew = new JSONArray();
+     for (int i = 0; i < appendContext.data.length(); i++) {
+       if (!rowIndexToErrorMessage.containsKey(i)) {
+         dataNew.put(appendContext.data.get(i));
+       } else {
+         // process faulty rows by placing them on a dead-letter-queue, for instance
+       }
+     }
+
+     // Mark the existing attempt as done since we got a response for it
+     done();
+
+     // Retry the remaining valid rows.
+     if (dataNew.length() > 0) {
+       try {
+         this.parent.append(new AppendContext(dataNew, 0));
+         return;
+       } catch (Exception e2) {
+         // Fall through to return error.
+         System.out.format("Failed to retry append with filtered rows: %s%n", e2);
+       }
+     }
+   }
+ }

@@ -202,7 +238,7 @@ public void onFailure(Throwable throwable) {
(storageException != null) ? storageException : new RuntimeException(throwable);
}
}
System.out.format("Error: %s\n", throwable);
System.out.format("Error that arrived: %s%n", throwable);
done();
}

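The callback above leaves dead-letter handling as a comment ("process faulty rows by placing them on a dead-letter-queue, for instance"). One hedged way to flesh that out is to keep each rejected row together with the per-row message from getRowIndexToErrorMessage() so it can be inspected or replayed later; FaultyRow and routeFaultyRows below are illustrative names, and the in-memory list stands in for a real destination such as a Pub/Sub topic or a separate table.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.json.JSONArray;
import org.json.JSONObject;

public class DeadLetterSketch {

  // A rejected row paired with the per-row error message the Storage Write API returned.
  static final class FaultyRow {
    final int index;
    final JSONObject row;
    final String errorMessage;

    FaultyRow(int index, JSONObject row, String errorMessage) {
      this.index = index;
      this.row = row;
      this.errorMessage = errorMessage;
    }
  }

  // Splits a batch: rows without an entry in rowIndexToErrorMessage are returned for retry,
  // flagged rows are collected into deadLetters for later inspection or replay.
  static JSONArray routeFaultyRows(
      JSONArray batch, Map<Integer, String> rowIndexToErrorMessage, List<FaultyRow> deadLetters) {
    JSONArray retryable = new JSONArray();
    for (int i = 0; i < batch.length(); i++) {
      String error = rowIndexToErrorMessage.get(i);
      if (error == null) {
        retryable.put(batch.get(i));
      } else {
        deadLetters.add(new FaultyRow(i, batch.getJSONObject(i), error));
      }
    }
    return retryable;
  }

  public static void main(String[] args) {
    // Tiny in-memory usage example; the error message below is a placeholder.
    JSONArray batch = new JSONArray();
    batch.put(new JSONObject().put("test_string", "short enough"));
    batch.put(new JSONObject().put("test_string", "this value is far longer than twenty characters"));
    Map<Integer, String> errors = new HashMap<>();
    errors.put(1, "example per-row error message");
    List<FaultyRow> deadLetters = new ArrayList<>();
    JSONArray retry = routeFaultyRows(batch, errors, deadLetters);
    System.out.println("retry " + retry.length() + " row(s), dead-letter " + deadLetters.size());
  }
}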
@@ -73,7 +73,11 @@ public void setUp() {
// Create a new dataset and table for each test.
datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
- Schema schema = Schema.of(Field.of("test_string", StandardSQLTypeName.STRING));
+ Schema schema = Schema.of(
+     com.google.cloud.bigquery.Field.newBuilder(
+         "test_string", StandardSQLTypeName.STRING)
+         .setMaxLength(20L)
+         .build());
bigquery.create(DatasetInfo.newBuilder(datasetName).build());
TableInfo tableInfo =
TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema))
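For context on the test change just above: the sample now writes strings whose length grows with the row index (a fixed 15-character prefix plus up to nine suffix digits), while the test schema caps test_string at 20 characters, so the last rows of each ten-row batch are expected to be rejected individually and exercise the new row-level handling. A small, illustrative check of the generated lengths, assuming the format string shown in the sample diff:

public class LengthCheck {
  public static void main(String[] args) {
    for (int j = 0; j < 10; j++) {
      StringBuilder sbSuffix = new StringBuilder();
      for (int k = 0; k < j; k++) {
        sbSuffix.append(k);
      }
      String value = String.format("record %03d-%03d %s", 0, j, sbSuffix);
      // The fixed prefix is 15 characters, so rows with j >= 6 exceed setMaxLength(20L).
      System.out.printf("j=%d -> \"%s\" (length %d)%n", j, value, value.length());
    }
  }
}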
