From 137d7e65f5940a7d4df895f9176a470ee060fe4e Mon Sep 17 00:00:00 2001 From: Kyle Weaver Date: Wed, 24 Nov 2021 16:56:28 -0800 Subject: [PATCH 1/3] Revert "Merge pull request #15810: [BEAM-2791] Support low-latency StorageApi sink with no exactly-once guarantees" This reverts commit fafe9d1db1bc6d9e29bf33bd29dff5509dadc600. --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 22 +- .../sdk/io/gcp/bigquery/BigQueryOptions.java | 7 - .../io/gcp/bigquery/SplittingIterable.java | 69 ------ .../sdk/io/gcp/bigquery/StorageApiLoads.java | 32 +-- .../StorageApiWriteRecordsInconsistent.java | 75 ------ .../StorageApiWriteUnshardedRecords.java | 221 ++++++------------ .../StorageApiWritesShardedRecords.java | 49 ++++ .../io/gcp/testing/FakeDatasetService.java | 38 +-- .../io/gcp/bigquery/BigQueryIOWriteTest.java | 41 +--- 9 files changed, 141 insertions(+), 413 deletions(-) delete mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java delete mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 1702e32bf6e6..3ea550b144b3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1757,14 +1757,8 @@ public enum Method { * BigQuery. */ STREAMING_INSERTS, - /** Use the new, exactly-once Storage Write API. */ - STORAGE_WRITE_API, - /** - * Use the new, Storage Write API without exactly once enabled. This will be cheaper and - * provide lower latency, however comes with the caveat that the output table may contain - * duplicates. - */ - STORAGE_API_AT_LEAST_ONCE + /** Use the new, experimental Storage Write API. */ + STORAGE_WRITE_API } abstract @Nullable ValueProvider getJsonTableRef(); @@ -2538,11 +2532,8 @@ private Write.Method resolveMethod(PCollection input) { if (getMethod() != Write.Method.DEFAULT) { return getMethod(); } - BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); - if (bqOptions.getUseStorageWriteApi()) { - return bqOptions.getUseStorageWriteApiAtLeastOnce() - ? Method.STORAGE_API_AT_LEAST_ONCE - : Method.STORAGE_WRITE_API; + if (input.getPipeline().getOptions().as(BigQueryOptions.class).getUseStorageWriteApi()) { + return Write.Method.STORAGE_WRITE_API; } // By default, when writing an Unbounded PCollection, we use StreamingInserts and // BigQuery's streaming import API. 
@@ -2892,7 +2883,7 @@ private WriteResult continueExpandTyped( batchLoads.setNumFileShards(getNumFileShards()); } return input.apply(batchLoads); - } else if (method == Method.STORAGE_WRITE_API || method == Method.STORAGE_API_AT_LEAST_ONCE) { + } else if (method == Write.Method.STORAGE_WRITE_API) { BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); StorageApiDynamicDestinations storageApiDynamicDestinations; if (getUseBeamSchema()) { @@ -2922,8 +2913,7 @@ private WriteResult continueExpandTyped( getKmsKey(), getStorageApiTriggeringFrequency(bqOptions), getBigQueryServices(), - getStorageApiNumStreams(bqOptions), - method == Method.STORAGE_API_AT_LEAST_ONCE); + getStorageApiNumStreams(bqOptions)); return input.apply("StorageApiLoads", storageApiLoads); } else { throw new RuntimeException("Unexpected write method " + method); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index e9df915f2769..165898599c8f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -85,13 +85,6 @@ public interface BigQueryOptions void setUseStorageWriteApi(Boolean value); - @Description( - "If set, then BigQueryIO.Write will default to using the approximate Storage Write API.") - @Default.Boolean(false) - Boolean getUseStorageWriteApiAtLeastOnce(); - - void setUseStorageWriteApiAtLeastOnce(Boolean value); - @Description( "If set, then BigQueryIO.Write will default to using this number of Storage Write API streams.") @Default.Integer(0) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java deleted file mode 100644 index cf367f3f427b..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.bigquery; - -import com.google.cloud.bigquery.storage.v1beta2.ProtoRows; -import com.google.protobuf.ByteString; -import java.util.Iterator; -import java.util.NoSuchElementException; - -/** - * Takes in an iterable and batches the results into multiple ProtoRows objects. The splitSize - * parameter controls how many rows are batched into a single ProtoRows object before we move on to - * the next one. 
- */ -class SplittingIterable implements Iterable { - private final Iterable underlying; - private final long splitSize; - - public SplittingIterable(Iterable underlying, long splitSize) { - this.underlying = underlying; - this.splitSize = splitSize; - } - - @Override - public Iterator iterator() { - return new Iterator() { - final Iterator underlyingIterator = underlying.iterator(); - - @Override - public boolean hasNext() { - return underlyingIterator.hasNext(); - } - - @Override - public ProtoRows next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - - ProtoRows.Builder inserts = ProtoRows.newBuilder(); - long bytesSize = 0; - while (underlyingIterator.hasNext()) { - ByteString byteString = ByteString.copyFrom(underlyingIterator.next()); - inserts.addSerializedRows(byteString); - bytesSize += byteString.size(); - if (bytesSize > splitSize) { - break; - } - } - return inserts.build(); - } - }; - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java index 2d079e3baa4f..3c27ddc8e42f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java @@ -54,7 +54,6 @@ public class StorageApiLoads private final Duration triggeringFrequency; private final BigQueryServices bqServices; private final int numShards; - private final boolean allowInconsistentWrites; public StorageApiLoads( Coder destinationCoder, @@ -63,8 +62,7 @@ public StorageApiLoads( String kmsKey, Duration triggeringFrequency, BigQueryServices bqServices, - int numShards, - boolean allowInconsistentWrites) { + int numShards) { this.destinationCoder = destinationCoder; this.dynamicDestinations = dynamicDestinations; this.createDisposition = createDisposition; @@ -72,31 +70,11 @@ public StorageApiLoads( this.triggeringFrequency = triggeringFrequency; this.bqServices = bqServices; this.numShards = numShards; - this.allowInconsistentWrites = allowInconsistentWrites; } @Override public WriteResult expand(PCollection> input) { - if (allowInconsistentWrites) { - return expandInconsistent(input); - } else { - return triggeringFrequency != null ? expandTriggered(input) : expandUntriggered(input); - } - } - - public WriteResult expandInconsistent(PCollection> input) { - PCollection> inputInGlobalWindow = - input.apply("rewindowIntoGlobal", Window.into(new GlobalWindows())); - PCollection> convertedRecords = - inputInGlobalWindow - .apply("Convert", new StorageApiConvertMessages<>(dynamicDestinations)) - .setCoder(KvCoder.of(destinationCoder, ByteArrayCoder.of())); - convertedRecords.apply( - "StorageApiWriteInconsistent", - new StorageApiWriteRecordsInconsistent<>( - dynamicDestinations, createDisposition, kmsKey, bqServices, destinationCoder)); - - return writeResult(input.getPipeline()); + return triggeringFrequency != null ? 
expandTriggered(input) : expandUntriggered(input); } public WriteResult expandTriggered(PCollection> input) { @@ -152,11 +130,7 @@ public WriteResult expandUntriggered(PCollection> inp PCollection> inputInGlobalWindow = input.apply( "rewindowIntoGlobal", Window.>into(new GlobalWindows())); - PCollection> convertedRecords = - inputInGlobalWindow - .apply("Convert", new StorageApiConvertMessages<>(dynamicDestinations)) - .setCoder(KvCoder.of(destinationCoder, ByteArrayCoder.of())); - convertedRecords.apply( + inputInGlobalWindow.apply( "StorageApiWriteUnsharded", new StorageApiWriteUnshardedRecords<>( dynamicDestinations, createDisposition, kmsKey, bqServices, destinationCoder)); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java deleted file mode 100644 index 5f65fd273607..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.bigquery; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -/** - * A transform to write sharded records to BigQuery using the Storage API. This transform uses the - * default stream to write the records. Records written will show up in BigQuery immediately, - * however exactly once is not guaranteed - duplicates may appear in the output. For exactly-once - * writes, use {@link StorageApiWritesShardedRecords} or {@link StorageApiWriteUnshardedRecords}. 
- */ -@SuppressWarnings("FutureReturnValueIgnored") -public class StorageApiWriteRecordsInconsistent - extends PTransform>, PCollection> { - private final StorageApiDynamicDestinations dynamicDestinations; - private final CreateDisposition createDisposition; - private final String kmsKey; - private final BigQueryServices bqServices; - private final Coder destinationCoder; - - public StorageApiWriteRecordsInconsistent( - StorageApiDynamicDestinations dynamicDestinations, - CreateDisposition createDisposition, - String kmsKey, - BigQueryServices bqServices, - Coder destinationCoder) { - this.dynamicDestinations = dynamicDestinations; - this.createDisposition = createDisposition; - this.kmsKey = kmsKey; - this.bqServices = bqServices; - this.destinationCoder = destinationCoder; - } - - @Override - public PCollection expand(PCollection> input) { - String operationName = input.getName() + "/" + getName(); - // Append records to the Storage API streams. - input.apply( - "Write Records", - ParDo.of( - new StorageApiWriteUnshardedRecords.WriteRecordsDoFn<>( - operationName, - dynamicDestinations, - bqServices, - destinationCoder, - createDisposition, - kmsKey, - true)) - .withSideInputs(dynamicDestinations.getSideInputs())); - return input.getPipeline().apply("voids", Create.empty(VoidCoder.of())); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 0d775a62ba8d..0b45b2010f5c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; @@ -40,8 +39,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context; import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType; import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -53,9 +50,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.RemovalNotification; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; @@ -63,7 +57,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@SuppressWarnings({"nullness", "FutureReturnValueIgnored"}) +@SuppressWarnings({"nullness"}) /** * Write records 
to the Storage API using a standard batch approach. PENDING streams are used, which * do not become visible until they are finalized and committed. Each input bundle to the DoFn @@ -71,7 +65,7 @@ * a finalize/commit operation at the end. */ public class StorageApiWriteUnshardedRecords - extends PTransform>, PCollection> { + extends PTransform>, PCollection> { private static final Logger LOG = LoggerFactory.getLogger(StorageApiWriteUnshardedRecords.class); private final StorageApiDynamicDestinations dynamicDestinations; @@ -79,34 +73,6 @@ public class StorageApiWriteUnshardedRecords private final String kmsKey; private final BigQueryServices bqServices; private final Coder destinationCoder; - private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool(); - - private static final Cache APPEND_CLIENTS = - CacheBuilder.newBuilder() - .expireAfterAccess(5, TimeUnit.MINUTES) - .removalListener( - (RemovalNotification removal) -> { - @Nullable final StreamAppendClient streamAppendClient = removal.getValue(); - // Close the writer in a different thread so as not to block the main one. - runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::close); - }) - .build(); - - // Run a closure asynchronously, ignoring failures. - private interface ThrowingRunnable { - void run() throws Exception; - } - - private static void runAsyncIgnoreFailure(ExecutorService executor, ThrowingRunnable task) { - executor.submit( - () -> { - try { - task.run(); - } catch (Exception e) { - // - } - }); - } public StorageApiWriteUnshardedRecords( StorageApiDynamicDestinations dynamicDestinations, @@ -122,20 +88,12 @@ public StorageApiWriteUnshardedRecords( } @Override - public PCollection expand(PCollection> input) { + public PCollection expand(PCollection> input) { String operationName = input.getName() + "/" + getName(); return input .apply( "Write Records", - ParDo.of( - new WriteRecordsDoFn<>( - operationName, - dynamicDestinations, - bqServices, - destinationCoder, - createDisposition, - kmsKey, - false)) + ParDo.of(new WriteRecordsDoFn(operationName)) .withSideInputs(dynamicDestinations.getSideInputs())) .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())) // Calling Reshuffle makes the output stable - once this completes, the append operations @@ -145,8 +103,25 @@ public PCollection expand(PCollection> input) { .apply("Finalize writes", ParDo.of(new StorageApiFinalizeWritesDoFn(bqServices))); } - static class WriteRecordsDoFn - extends DoFn, KV> { + private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool(); + // Run a closure asynchronously, ignoring failures. 
+ private interface ThrowingRunnable { + void run() throws Exception; + } + + @SuppressWarnings("FutureReturnValueIgnored") + private static void runAsyncIgnoreFailure(ExecutorService executor, ThrowingRunnable task) { + executor.submit( + () -> { + try { + task.run(); + } catch (Exception e) { + // + } + }); + } + + class WriteRecordsDoFn extends DoFn, KV> { class DestinationState { private final String tableUrn; private final MessageConverter messageConverter; @@ -154,23 +129,16 @@ class DestinationState { private @Nullable StreamAppendClient streamAppendClient = null; private long currentOffset = 0; private List pendingMessages; - private @Nullable DatasetService datasetService; - private final Counter recordsAppended = - Metrics.counter(WriteRecordsDoFn.class, "recordsAppended"); - private final Counter appendFailures = - Metrics.counter(WriteRecordsDoFn.class, "appendFailures"); - private final boolean useDefaultStream; + @Nullable private DatasetService datasetService; public DestinationState( String tableUrn, MessageConverter messageConverter, - DatasetService datasetService, - boolean useDefaultStream) { + DatasetService datasetService) { this.tableUrn = tableUrn; this.messageConverter = messageConverter; this.pendingMessages = Lists.newArrayList(); this.datasetService = datasetService; - this.useDefaultStream = useDefaultStream; } void close() { @@ -184,27 +152,16 @@ void close() { } } - String getDefaultStreamName() { - return BigQueryHelpers.stripPartitionDecorator(tableUrn) + "/streams/_default"; - } - StreamAppendClient getWriteStream() { try { if (streamAppendClient == null) { - if (!useDefaultStream) { - this.streamName = - Preconditions.checkNotNull(datasetService) - .createWriteStream(tableUrn, Type.PENDING) - .getName(); - } else { - this.streamName = getDefaultStreamName(); - } + this.streamName = + Preconditions.checkNotNull(datasetService) + .createWriteStream(tableUrn, Type.PENDING) + .getName(); this.streamAppendClient = - APPEND_CLIENTS.get( - streamName, - () -> - datasetService.getStreamAppendClient( - streamName, messageConverter.getSchemaDescriptor())); + Preconditions.checkNotNull(datasetService) + .getStreamAppendClient(streamName, messageConverter.getSchemaDescriptor()); this.currentOffset = 0; } return streamAppendClient; @@ -222,13 +179,21 @@ void invalidateWriteStream() { } } - void addMessage(byte[] message) throws Exception { - pendingMessages.add(ByteString.copyFrom(message)); + void addMessage(ElementT element) throws Exception { + ByteString message = messageConverter.toMessage(element).toByteString(); + pendingMessages.add(message); + if (shouldFlush()) { + flush(); + } + } + + boolean shouldFlush() { + // TODO: look at byte size too? 
+ return pendingMessages.size() > 100; } @SuppressWarnings({"nullness"}) - void flush(RetryManager> retryManager) - throws Exception { + void flush() throws Exception { if (pendingMessages.isEmpty()) { return; } @@ -240,16 +205,14 @@ void flush(RetryManager> retryMa ProtoRows protoRows = inserts.build(); pendingMessages.clear(); + RetryManager> retryManager = + new RetryManager<>(Duration.standardSeconds(1), Duration.standardMinutes(1), 5); retryManager.addOperation( c -> { try { - StreamAppendClient writeStream = getWriteStream(); - long offset = -1; - if (!this.useDefaultStream) { - offset = this.currentOffset; - this.currentOffset += inserts.getSerializedRowsCount(); - } - return writeStream.appendRows(offset, protoRows); + long offset = currentOffset; + currentOffset += inserts.getSerializedRowsCount(); + return getWriteStream().appendRows(offset, protoRows); } catch (Exception e) { throw new RuntimeException(e); } @@ -261,71 +224,23 @@ void flush(RetryManager> retryMa + " failed with error " + Iterables.getFirst(contexts, null).getError()); invalidateWriteStream(); - appendFailures.inc(); return RetryType.RETRY_ALL_OPERATIONS; }, response -> { - recordsAppended.inc(protoRows.getSerializedRowsCount()); + LOG.info("Append to stream {} succeeded.", streamName); }, new Context<>()); + // TODO: Do we have to wait on every append? + retryManager.run(true); } } private Map destinations = Maps.newHashMap(); private final TwoLevelMessageConverterCache messageConverters; private @Nullable DatasetService datasetService; - private int numPendingRecords = 0; - private int numPendingRecordBytes = 0; - private static final int FLUSH_THRESHOLD_RECORDS = 100; - private static final int FLUSH_THRESHOLD_RECORD_BYTES = 2 * 1024 * 1024; - private final StorageApiDynamicDestinations dynamicDestinations; - private final BigQueryServices bqServices; - private final Coder destinationCoder; - private final CreateDisposition createDisposition; - private final String kmsKey; - private final boolean useDefaultStream; - WriteRecordsDoFn( - String operationName, - StorageApiDynamicDestinations dynamicDestinations, - BigQueryServices bqServices, - Coder destinationCoder, - CreateDisposition createDisposition, - String kmsKey, - boolean useDefaultStream) { + WriteRecordsDoFn(String operationName) { this.messageConverters = new TwoLevelMessageConverterCache<>(operationName); - this.dynamicDestinations = dynamicDestinations; - this.bqServices = bqServices; - this.destinationCoder = destinationCoder; - this.createDisposition = createDisposition; - this.kmsKey = kmsKey; - this.useDefaultStream = useDefaultStream; - } - - boolean shouldFlush() { - return numPendingRecords > FLUSH_THRESHOLD_RECORDS - || numPendingRecordBytes > FLUSH_THRESHOLD_RECORD_BYTES; - } - - void flushIfNecessary() throws Exception { - if (shouldFlush()) { - // Too much memory being used. Flush the state and wait for it to drain out. - // TODO(reuvenlax): Consider waiting for memory usage to drop instead of waiting for all the - // appends to finish. 
- flushAll(); - } - } - - void flushAll() throws Exception { - RetryManager> - retryManager = - new RetryManager<>(Duration.standardSeconds(1), Duration.standardSeconds(10), 1000); - for (DestinationState destinationState : destinations.values()) { - destinationState.flush(retryManager); - } - retryManager.run(true); - numPendingRecords = 0; - numPendingRecordBytes = 0; } private void initializeDatasetService(PipelineOptions pipelineOptions) { @@ -337,8 +252,6 @@ private void initializeDatasetService(PipelineOptions pipelineOptions) { @StartBundle public void startBundle() throws IOException { destinations = Maps.newHashMap(); - numPendingRecords = 0; - numPendingRecordBytes = 0; } DestinationState createDestinationState(ProcessContext c, DestinationT destination) { @@ -366,36 +279,37 @@ DestinationState createDestinationState(ProcessContext c, DestinationT destinati } catch (Exception e) { throw new RuntimeException(e); } - return new DestinationState( - createdTable.getTableUrn(), messageConverter, datasetService, useDefaultStream); + return new DestinationState(createdTable.getTableUrn(), messageConverter, datasetService); } @ProcessElement public void process( ProcessContext c, PipelineOptions pipelineOptions, - @Element KV element) + @Element KV element) throws Exception { initializeDatasetService(pipelineOptions); dynamicDestinations.setSideInputAccessorFromProcessContext(c); DestinationState state = destinations.computeIfAbsent(element.getKey(), k -> createDestinationState(c, k)); - flushIfNecessary(); + + if (state.shouldFlush()) { + // Too much memory being used. Flush the state and wait for it to drain out. + // TODO(reuvenlax): Consider waiting for memory usage to drop instead of waiting for all the + // appends to finish. + state.flush(); + } state.addMessage(element.getValue()); - ++numPendingRecords; - numPendingRecordBytes += element.getValue().length; } @FinishBundle public void finishBundle(FinishBundleContext context) throws Exception { - flushAll(); - if (!useDefaultStream) { - for (DestinationState state : destinations.values()) { - context.output( - KV.of(state.tableUrn, state.streamName), - BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1)), - GlobalWindow.INSTANCE); - } + for (DestinationState state : destinations.values()) { + state.flush(); + context.output( + KV.of(state.tableUrn, state.streamName), + BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1)), + GlobalWindow.INSTANCE); } } @@ -404,7 +318,6 @@ public void teardown() { for (DestinationState state : destinations.values()) { state.close(); } - destinations.clear(); try { if (datasetService != null) { datasetService.close(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index c0009df792fd..52871845ad6f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -24,13 +24,16 @@ import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1beta2.ProtoRows; import com.google.cloud.bigquery.storage.v1beta2.WriteStream.Type; +import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors.Descriptor; import io.grpc.Status; 
import io.grpc.Status.Code; import java.io.IOException; import java.time.Instant; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -176,6 +179,52 @@ public PCollection expand( "Flush and finalize writes", ParDo.of(new StorageApiFlushAndFinalizeDoFn(bqServices))); } + /** + * Takes in an iterable and batches the results into multiple ProtoRows objects. The splitSize + * parameter controls how many rows are batched into a single ProtoRows object before we move on + * to the next one. + */ + static class SplittingIterable implements Iterable { + private final Iterable underlying; + private final long splitSize; + + public SplittingIterable(Iterable underlying, long splitSize) { + this.underlying = underlying; + this.splitSize = splitSize; + } + + @Override + public Iterator iterator() { + return new Iterator() { + final Iterator underlyingIterator = underlying.iterator(); + + @Override + public boolean hasNext() { + return underlyingIterator.hasNext(); + } + + @Override + public ProtoRows next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + ProtoRows.Builder inserts = ProtoRows.newBuilder(); + long bytesSize = 0; + while (underlyingIterator.hasNext()) { + ByteString byteString = ByteString.copyFrom(underlyingIterator.next()); + inserts.addSerializedRows(byteString); + bytesSize += byteString.size(); + if (bytesSize > splitSize) { + break; + } + } + return inserts.build(); + } + }; + } + } + class WriteRecordsDoFn extends DoFn, Iterable>, KV> { private final Counter recordsAppended = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index bb3ef9f0a630..eda8059ac2fb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -86,15 +86,13 @@ public class FakeDatasetService implements DatasetService, Serializable { public void close() throws Exception {} static class Stream { - final String streamName; final List stream; final TableContainer tableContainer; final Type type; long nextFlushPosition; boolean finalized; - Stream(String streamName, TableContainer tableContainer, Type type) { - this.streamName = streamName; + Stream(TableContainer tableContainer, Type type) { this.stream = Lists.newArrayList(); this.tableContainer = tableContainer; this.type = type; @@ -112,20 +110,9 @@ void appendRows(long position, List rowsToAppend) { throw new RuntimeException("Stream already finalized."); } if (position != -1 && position != stream.size()) { - throw new RuntimeException( - "Bad append: " - + position - + " + for stream " - + streamName - + " expected " - + stream.size()); + throw new RuntimeException("Bad append: " + position); } stream.addAll(rowsToAppend); - if (type == Type.COMMITTED) { - for (TableRow row : rowsToAppend) { - tableContainer.addRow(row, ""); - } - } } void flush(long position) { @@ -262,21 +249,7 @@ public void createTable(Table table) throws IOException { "Tried to get a dataset %s:%s, but no such table was set", tableReference.getProjectId(), tableReference.getDatasetId()); } - dataset.computeIfAbsent( - 
tableReference.getTableId(), - k -> { - TableContainer tableContainer = new TableContainer(table); - // Create the default stream. - String streamName = - String.format( - "projects/%s/datasets/%s/tables/%s/streams/_default", - tableReference.getProjectId(), - tableReference.getDatasetId(), - BigQueryHelpers.stripPartitionDecorator(tableReference.getTableId())); - writeStreams.put(streamName, new Stream(streamName, tableContainer, Type.COMMITTED)); - - return tableContainer; - }); + dataset.computeIfAbsent(tableReference.getTableId(), k -> new TableContainer(table)); } } @@ -439,6 +412,9 @@ public Table patchTableDescription( @Override public WriteStream createWriteStream(String tableUrn, Type type) throws IOException, InterruptedException { + if (type != Type.PENDING && type != Type.BUFFERED) { + throw new RuntimeException("We only support PENDING or BUFFERED streams."); + } TableReference tableReference = BigQueryHelpers.parseTableUrn(BigQueryHelpers.stripPartitionDecorator(tableUrn)); synchronized (tables) { @@ -448,7 +424,7 @@ public WriteStream createWriteStream(String tableUrn, Type type) tableReference.getDatasetId(), tableReference.getTableId()); String streamName = UUID.randomUUID().toString(); - writeStreams.put(streamName, new Stream(streamName, tableContainer, type)); + writeStreams.put(streamName, new Stream(tableContainer, type)); return WriteStream.newBuilder().setName(streamName).build(); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 3d8f0fac302e..c1fb6511c225 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -159,20 +159,16 @@ public class BigQueryIOWriteTest implements Serializable { @Parameters public static Iterable data() { return ImmutableList.of( - new Object[] {false, false, false}, - new Object[] {false, false, true}, - new Object[] {true, false, false}, - new Object[] {true, false, true}, - new Object[] {true, true, true}); + new Object[] {false, false}, + new Object[] {false, true}, + new Object[] {true, false}, + new Object[] {true, true}); } @Parameter(0) public boolean useStorageApi; @Parameter(1) - public boolean useStorageApiApproximate; - - @Parameter(2) public boolean useStreaming; @Rule @@ -198,9 +194,6 @@ public void evaluate() throws Throwable { bqOptions.setTempLocation(testFolder.getRoot().getAbsolutePath()); if (useStorageApi) { bqOptions.setUseStorageWriteApi(true); - if (useStorageApiApproximate) { - bqOptions.setUseStorageWriteApiAtLeastOnce(true); - } if (useStreaming) { bqOptions.setNumStorageWriteApiStreams(2); bqOptions.setStorageWriteApiTriggeringFrequencySec(1); @@ -484,8 +477,7 @@ void testClustering(BigQueryIO.Write.Method insertMethod) throws Exception { public void testTimePartitioning() throws Exception { BigQueryIO.Write.Method method; if (useStorageApi) { - method = - useStorageApiApproximate ? 
Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API; + method = Method.STORAGE_WRITE_API; } else if (useStreaming) { method = Method.STREAMING_INSERTS; } else { @@ -505,8 +497,7 @@ public void testTimePartitioningStorageApi() throws Exception { @Test public void testClusteringStorageApi() throws Exception { if (useStorageApi) { - testClustering( - useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API); + testClustering(Method.STORAGE_WRITE_API); } } @@ -535,12 +526,7 @@ public void testClusteringTableFunction() throws Exception { // withMethod overrides the pipeline option, so we need to explicitly request // STORAGE_API_WRITES. - BigQueryIO.Write.Method method = - useStorageApi - ? (useStorageApiApproximate - ? Method.STORAGE_API_AT_LEAST_ONCE - : Method.STORAGE_WRITE_API) - : Method.FILE_LOADS; + BigQueryIO.Write.Method method = useStorageApi ? Method.STORAGE_WRITE_API : Method.FILE_LOADS; p.apply(Create.of(row1, row2)) .apply( BigQueryIO.writeTableRows() @@ -1099,12 +1085,7 @@ static class SchemaPojo { public void testSchemaWriteLoads() throws Exception { // withMethod overrides the pipeline option, so we need to explicitly request // STORAGE_API_WRITES. - BigQueryIO.Write.Method method = - useStorageApi - ? (useStorageApiApproximate - ? Method.STORAGE_API_AT_LEAST_ONCE - : Method.STORAGE_WRITE_API) - : Method.FILE_LOADS; + BigQueryIO.Write.Method method = useStorageApi ? Method.STORAGE_WRITE_API : Method.FILE_LOADS; p.apply( Create.of( new SchemaPojo("a", 1), @@ -2129,11 +2110,7 @@ public void testWriteToTableDecorator() throws Exception { // withMethod overrides the pipeline option, so we need to explicitly requiest // STORAGE_API_WRITES. BigQueryIO.Write.Method method = - useStorageApi - ? (useStorageApiApproximate - ? Method.STORAGE_API_AT_LEAST_ONCE - : Method.STORAGE_WRITE_API) - : Method.STREAMING_INSERTS; + useStorageApi ? Method.STORAGE_WRITE_API : Method.STREAMING_INSERTS; TableSchema schema = new TableSchema() .setFields( From cb57aa790d1e69cc06edd67d289077d9abc767fb Mon Sep 17 00:00:00 2001 From: Kyle Weaver Date: Wed, 24 Nov 2021 17:27:29 -0800 Subject: [PATCH 2/3] Revert "for existing tables, no need to set a schema" This reverts commit 77ea3abc98fc0098fd1017502e5ce0de7e868dfe. --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 8 ++- ...StorageApiDynamicDestinationsTableRow.java | 49 ++++--------------- .../sdk/io/gcp/testing/FakeJobService.java | 6 +-- .../io/gcp/bigquery/BigQueryIOWriteTest.java | 31 ++++-------- 4 files changed, 22 insertions(+), 72 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 3ea550b144b3..cabac6b94f8e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -2891,7 +2891,7 @@ private WriteResult continueExpandTyped( // writes, with no // need to round trip through JSON TableRow objects. 
storageApiDynamicDestinations = - new StorageApiDynamicDestinationsBeamRow<>( + new StorageApiDynamicDestinationsBeamRow( dynamicDestinations, elementSchema, elementToRowFunction); } else { RowWriterFactory.TableRowWriterFactory tableRowWriterFactory = @@ -2899,12 +2899,10 @@ private WriteResult continueExpandTyped( // Fallback behavior: convert to JSON TableRows and convert those into Beam TableRows. storageApiDynamicDestinations = new StorageApiDynamicDestinationsTableRow<>( - dynamicDestinations, - tableRowWriterFactory.getToRowFn(), - getBigQueryServices().getDatasetService(bqOptions), - getCreateDisposition()); + dynamicDestinations, tableRowWriterFactory.getToRowFn()); } + BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); StorageApiLoads storageApiLoads = new StorageApiLoads( destinationCoder, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java index de4817b8ce87..0204488079e4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java @@ -17,16 +17,11 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import com.google.api.services.bigquery.model.Table; -import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Message; import java.time.Duration; -import javax.annotation.Nullable; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder; @@ -35,57 +30,31 @@ public class StorageApiDynamicDestinationsTableRow extends StorageApiDynamicDestinations { private final SerializableFunction formatFunction; - private final DatasetService datasetService; - private final CreateDisposition createDisposition; - // TODO: Is this cache needed? All callers of getMessageConverter are already caching the resullt. + // TODO: Make static! Or at least optimize the constant schema case. private final Cache destinationDescriptorCache = CacheBuilder.newBuilder().expireAfterAccess(Duration.ofMinutes(15)).build(); StorageApiDynamicDestinationsTableRow( DynamicDestinations inner, - SerializableFunction formatFunction, - DatasetService datasetService, - CreateDisposition createDisposition) { + SerializableFunction formatFunction) { super(inner); this.formatFunction = formatFunction; - this.datasetService = datasetService; - this.createDisposition = createDisposition; } @Override public MessageConverter getMessageConverter(DestinationT destination) throws Exception { + final TableSchema tableSchema = getSchema(destination); + if (tableSchema == null) { + throw new RuntimeException( + "Schema must be set when writing TableRows using Storage API. 
Use " + + "BigQueryIO.Write.withSchema to set the schema."); + } return new MessageConverter() { Descriptor descriptor = destinationDescriptorCache.get( destination, - () -> { - @Nullable TableSchema tableSchema = getSchema(destination); - if (tableSchema == null) { - // If the table already exists, then try and fetch the schema from the existing - // table. - TableReference tableReference = getTable(destination).getTableReference(); - @Nullable Table table = datasetService.getTable(tableReference); - if (table == null) { - if (createDisposition == CreateDisposition.CREATE_NEVER) { - throw new RuntimeException( - "BigQuery table " - + tableReference - + " not found. If you wanted to " - + "automatically create the table, set the create disposition to CREATE_IF_NEEDED and specify a " - + "schema."); - } else { - throw new RuntimeException( - "Schema must be set for table " - + tableReference - + " when writing TableRows using Storage API and " - + "using a create disposition of CREATE_IF_NEEDED."); - } - } - tableSchema = table.getSchema(); - } - return TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema); - }); + () -> TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema)); @Override public Descriptor getSchemaDescriptor() { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java index 768ecb1ae874..a891c484ad19 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java @@ -364,16 +364,12 @@ private JobStatus runLoadJob(JobReference jobRef, JobConfigurationLoad load) throws InterruptedException, IOException { TableReference destination = load.getDestinationTable(); TableSchema schema = load.getSchema(); + checkArgument(schema != null, "No schema specified"); List sourceFiles = filesForLoadJobs.get(jobRef.getProjectId(), jobRef.getJobId()); WriteDisposition writeDisposition = WriteDisposition.valueOf(load.getWriteDisposition()); CreateDisposition createDisposition = CreateDisposition.valueOf(load.getCreateDisposition()); Table existingTable = datasetService.getTable(destination); - if (schema == null) { - schema = existingTable.getSchema(); - } - checkArgument(schema != null, "No schema specified"); - if (!validateDispositions(existingTable, createDisposition, writeDisposition)) { return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto()); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index c1fb6511c225..9d9de39243c6 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -1545,45 +1545,32 @@ public TableRow apply(Long input) { } @Test - public void testCreateNever() throws Exception { - BigQueryIO.Write.Method method = - useStreaming - ? (useStorageApi - ? (useStorageApiApproximate - ? Method.STORAGE_API_AT_LEAST_ONCE - : Method.STORAGE_WRITE_API) - : Method.STREAMING_INSERTS) - : useStorageApi ? 
Method.STORAGE_WRITE_API : Method.FILE_LOADS; + public void testCreateNeverWithStreaming() throws Exception { + if (!useStreaming) { + return; + } p.enableAbandonedNodeEnforcement(false); - TableReference tableRef = BigQueryHelpers.parseTableSpec("project-id:dataset-id.table"); - TableSchema tableSchema = - new TableSchema() - .setFields( - ImmutableList.of( - new TableFieldSchema().setName("name").setType("STRING"), - new TableFieldSchema().setName("number").setType("INTEGER"))); - fakeDatasetService.createTable(new Table().setTableReference(tableRef).setSchema(tableSchema)); + TableReference tableRef = new TableReference(); + tableRef.setDatasetId("dataset"); + tableRef.setTableId("sometable"); PCollection tableRows = - p.apply(GenerateSequence.from(0).to(10)) + p.apply(GenerateSequence.from(0)) .apply( MapElements.via( new SimpleFunction() { @Override public TableRow apply(Long input) { - return new TableRow().set("name", "name " + input).set("number", input); + return null; } })) .setCoder(TableRowJsonCoder.of()); tableRows.apply( BigQueryIO.writeTableRows() .to(tableRef) - .withMethod(method) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER) - .withTestServices(fakeBqServices) .withoutValidation()); - p.run(); } @Test From e98a26ba7436dd5d5459961d25a0b1c1dcbb390e Mon Sep 17 00:00:00 2001 From: Kyle Weaver Date: Wed, 24 Nov 2021 18:27:21 -0800 Subject: [PATCH 3/3] fix merge conflict --- .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index cabac6b94f8e..c6e443a28bdd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -2884,7 +2884,6 @@ private WriteResult continueExpandTyped( } return input.apply(batchLoads); } else if (method == Write.Method.STORAGE_WRITE_API) { - BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); StorageApiDynamicDestinations storageApiDynamicDestinations; if (getUseBeamSchema()) { // This ensures that the Beam rows are directly translated into protos for Sorage API
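
For reference, the code paths deleted above (the `STORAGE_API_AT_LEAST_ONCE` write method, the `useStorageWriteApiAtLeastOnce` pipeline option, and the default-stream write path) were driven from user pipelines roughly as in the sketch below. This is a minimal illustration against the pre-revert API only, not part of the patch series: the table spec and schema are placeholders, and after this revert only `STORAGE_WRITE_API` remains available.

```java
import java.util.Arrays;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

/**
 * Sketch of the at-least-once Storage Write API path removed by this revert.
 * Assumes the pre-revert BigQueryIO API shown in the deleted hunks above;
 * the table spec is a placeholder.
 */
public class AtLeastOnceWriteSketch {
  public static void main(String[] args) {
    BigQueryOptions options = PipelineOptionsFactory.fromArgs(args).as(BigQueryOptions.class);
    // Pre-revert, these two flags made resolveMethod() pick STORAGE_API_AT_LEAST_ONCE
    // when no method was set explicitly. withMethod() below overrides them; they are
    // shown here only to mirror the deleted option.
    options.setUseStorageWriteApi(true);
    options.setUseStorageWriteApiAtLeastOnce(true); // option removed by patch 1/3

    TableSchema schema =
        new TableSchema()
            .setFields(
                Arrays.asList(
                    new TableFieldSchema().setName("name").setType("STRING"),
                    new TableFieldSchema().setName("number").setType("INTEGER")));

    Pipeline p = Pipeline.create(options);
    p.apply(Create.of(new TableRow().set("name", "a").set("number", 1L)))
        .setCoder(TableRowJsonCoder.of())
        .apply(
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // placeholder table spec
                .withSchema(schema)
                // Enum value removed by patch 1/3; writes went through the table's
                // _default stream, so duplicates were possible but latency was lower.
                .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
    p.run().waitUntilFinish();
  }
}
```

With the revert applied, the same pipeline would have to use `Method.STORAGE_WRITE_API` (exactly-once, PENDING streams plus finalize/commit) or fall back to `STREAMING_INSERTS`/`FILE_LOADS`.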