diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 29dc269f7fde..cb40acbe447b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -36,6 +36,7 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import java.io.IOException; import java.util.Collections; @@ -626,7 +627,9 @@ public void startBundle(StartBundleContext c) throws IOException { public void processElement(ProcessContext c) throws Exception { checkForFailures(); Futures.addCallback( - bigtableWriter.writeRecord(c.element()), new WriteExceptionCallback(c.element())); + bigtableWriter.writeRecord(c.element()), + new WriteExceptionCallback(c.element()), + MoreExecutors.directExecutor()); ++recordsWritten; }