@@ -1757,14 +1757,8 @@ public enum Method {
      * BigQuery</a>.
      */
     STREAMING_INSERTS,
-    /** Use the new, exactly-once Storage Write API. */
-    STORAGE_WRITE_API,
-    /**
-     * Use the new, Storage Write API without exactly once enabled. This will be cheaper and
-     * provide lower latency, however comes with the caveat that the output table may contain
-     * duplicates.
-     */
-    STORAGE_API_AT_LEAST_ONCE
+    /** Use the new, experimental Storage Write API. */
+    STORAGE_WRITE_API
   }

   abstract @Nullable ValueProvider<String> getJsonTableRef();
@@ -2538,11 +2532,8 @@ private Write.Method resolveMethod(PCollection<T> input) {
     if (getMethod() != Write.Method.DEFAULT) {
       return getMethod();
     }
-    BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
-    if (bqOptions.getUseStorageWriteApi()) {
-      return bqOptions.getUseStorageWriteApiAtLeastOnce()
-          ? Method.STORAGE_API_AT_LEAST_ONCE
-          : Method.STORAGE_WRITE_API;
+    if (input.getPipeline().getOptions().as(BigQueryOptions.class).getUseStorageWriteApi()) {
+      return Write.Method.STORAGE_WRITE_API;
     }
     // By default, when writing an Unbounded PCollection, we use StreamingInserts and
     // BigQuery's streaming import API.
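For orientation, resolveMethod now keys off a single boolean option. A minimal sketch, not part of this diff, of driving that decision from BigQueryOptions instead of withMethod (the pipeline body is elided):

// Sketch only: enables the Storage Write API via pipeline options. With this set,
// resolveMethod(...) above returns Write.Method.STORAGE_WRITE_API for any
// BigQueryIO.Write left at Method.DEFAULT.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class UseStorageWriteApiExample {
  public static void main(String[] args) {
    BigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);
    options.setUseStorageWriteApi(true); // same effect as the --useStorageWriteApi flag
    Pipeline pipeline = Pipeline.create(options);
    // ... attach reads/writes here; pipeline.run() is left out of this sketch.
  }
}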
@@ -2892,28 +2883,25 @@ private <DestinationT> WriteResult continueExpandTyped(
         batchLoads.setNumFileShards(getNumFileShards());
       }
       return input.apply(batchLoads);
-    } else if (method == Method.STORAGE_WRITE_API || method == Method.STORAGE_API_AT_LEAST_ONCE) {
-      BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+    } else if (method == Write.Method.STORAGE_WRITE_API) {
       StorageApiDynamicDestinations<T, DestinationT> storageApiDynamicDestinations;
       if (getUseBeamSchema()) {
         // This ensures that the Beam rows are directly translated into protos for Storage API
         // writes, with no
         // need to round trip through JSON TableRow objects.
         storageApiDynamicDestinations =
-            new StorageApiDynamicDestinationsBeamRow<>(
+            new StorageApiDynamicDestinationsBeamRow<T, DestinationT>(
                 dynamicDestinations, elementSchema, elementToRowFunction);
       } else {
         RowWriterFactory.TableRowWriterFactory<T, DestinationT> tableRowWriterFactory =
             (RowWriterFactory.TableRowWriterFactory<T, DestinationT>) rowWriterFactory;
         // Fallback behavior: convert to JSON TableRows and convert those into Beam TableRows.
         storageApiDynamicDestinations =
             new StorageApiDynamicDestinationsTableRow<>(
-                dynamicDestinations,
-                tableRowWriterFactory.getToRowFn(),
-                getBigQueryServices().getDatasetService(bqOptions),
-                getCreateDisposition());
+                dynamicDestinations, tableRowWriterFactory.getToRowFn());
       }

+      BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
       StorageApiLoads<DestinationT, T> storageApiLoads =
           new StorageApiLoads<DestinationT, T>(
               destinationCoder,
@@ -2922,8 +2910,7 @@ private <DestinationT> WriteResult continueExpandTyped(
               getKmsKey(),
               getStorageApiTriggeringFrequency(bqOptions),
               getBigQueryServices(),
-              getStorageApiNumStreams(bqOptions),
-              method == Method.STORAGE_API_AT_LEAST_ONCE);
+              getStorageApiNumStreams(bqOptions));
       return input.apply("StorageApiLoads", storageApiLoads);
     } else {
       throw new RuntimeException("Unexpected write method " + method);
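The getUseBeamSchema() branch above is the schema-aware path, where Beam rows become Storage API protos directly. A hedged sketch of a write that takes that branch; the table spec and field names are placeholders, not from this PR:

// Sketch only: useBeamSchema() routes through StorageApiDynamicDestinationsBeamRow,
// so rows are translated straight to protos with no JSON TableRow round trip.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;

public class BeamSchemaWriteExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    Schema rowSchema = Schema.builder().addStringField("word").addInt64Field("count").build();
    pipeline
        .apply(
            Create.of(Row.withSchema(rowSchema).addValues("hello", 1L).build())
                .withRowSchema(rowSchema)) // gives the PCollection a Beam schema
        .apply(
            BigQueryIO.<Row>write()
                .to("my-project:my_dataset.my_table") // placeholder table spec
                .useBeamSchema() // schema-aware path; table schema derived from the Beam schema
                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}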
@@ -85,13 +85,6 @@ public interface BigQueryOptions

   void setUseStorageWriteApi(Boolean value);

-  @Description(
-      "If set, then BigQueryIO.Write will default to using the approximate Storage Write API.")
-  @Default.Boolean(false)
-  Boolean getUseStorageWriteApiAtLeastOnce();
-
-  void setUseStorageWriteApiAtLeastOnce(Boolean value);
-
   @Description(
       "If set, then BigQueryIO.Write will default to using this number of Storage Write API streams.")
   @Default.Integer(0)

This file was deleted.

@@ -17,16 +17,11 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;

-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.Message;
 import java.time.Duration;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
@@ -35,57 +30,31 @@
 public class StorageApiDynamicDestinationsTableRow<T, DestinationT>
     extends StorageApiDynamicDestinations<T, DestinationT> {
   private final SerializableFunction<T, TableRow> formatFunction;
-  private final DatasetService datasetService;
-  private final CreateDisposition createDisposition;

   // TODO: Is this cache needed? All callers of getMessageConverter are already caching the result.
   // TODO: Make static! Or at least optimize the constant schema case.
   private final Cache<DestinationT, Descriptor> destinationDescriptorCache =
       CacheBuilder.newBuilder().expireAfterAccess(Duration.ofMinutes(15)).build();

   StorageApiDynamicDestinationsTableRow(
       DynamicDestinations<T, DestinationT> inner,
-      SerializableFunction<T, TableRow> formatFunction,
-      DatasetService datasetService,
-      CreateDisposition createDisposition) {
+      SerializableFunction<T, TableRow> formatFunction) {
     super(inner);
     this.formatFunction = formatFunction;
-    this.datasetService = datasetService;
-    this.createDisposition = createDisposition;
   }

   @Override
   public MessageConverter<T> getMessageConverter(DestinationT destination) throws Exception {
+    final TableSchema tableSchema = getSchema(destination);
+    if (tableSchema == null) {
+      throw new RuntimeException(
+          "Schema must be set when writing TableRows using Storage API. Use "
+              + "BigQueryIO.Write.withSchema to set the schema.");
+    }
     return new MessageConverter<T>() {
       Descriptor descriptor =
           destinationDescriptorCache.get(
               destination,
-              () -> {
-                @Nullable TableSchema tableSchema = getSchema(destination);
-                if (tableSchema == null) {
-                  // If the table already exists, then try and fetch the schema from the existing
-                  // table.
-                  TableReference tableReference = getTable(destination).getTableReference();
-                  @Nullable Table table = datasetService.getTable(tableReference);
-                  if (table == null) {
-                    if (createDisposition == CreateDisposition.CREATE_NEVER) {
-                      throw new RuntimeException(
-                          "BigQuery table "
-                              + tableReference
-                              + " not found. If you wanted to "
-                              + "automatically create the table, set the create disposition to CREATE_IF_NEEDED and specify a "
-                              + "schema.");
-                    } else {
-                      throw new RuntimeException(
-                          "Schema must be set for table "
-                              + tableReference
-                              + " when writing TableRows using Storage API and "
-                              + "using a create disposition of CREATE_IF_NEEDED.");
-                    }
-                  }
-                  tableSchema = table.getSchema();
-                }
-                return TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
-              });
+              () -> TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema));

       @Override
       public Descriptor getSchemaDescriptor() {
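With the table-lookup fallback removed, the schema must now come from the transform itself. A minimal sketch of a compliant TableRow write; the table spec and field are illustrative, not from this PR:

// Sketch only: writing TableRows through the Storage Write API now requires
// withSchema(...); otherwise getMessageConverter(...) above throws.
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.transforms.Create;

public class StorageApiSchemaExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    TableSchema tableSchema =
        new TableSchema()
            .setFields(
                Collections.singletonList(
                    new TableFieldSchema().setName("word").setType("STRING")));
    pipeline
        .apply(Create.of(new TableRow().set("word", "hello")).withCoder(TableRowJsonCoder.of()))
        .apply(
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // placeholder table spec
                .withSchema(tableSchema) // now mandatory for Storage API TableRow writes
                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}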
@@ -54,7 +54,6 @@ public class StorageApiLoads<DestinationT, ElementT>
   private final Duration triggeringFrequency;
   private final BigQueryServices bqServices;
   private final int numShards;
-  private final boolean allowInconsistentWrites;

   public StorageApiLoads(
       Coder<DestinationT> destinationCoder,
@@ -63,40 +62,19 @@ public StorageApiLoads(
       String kmsKey,
       Duration triggeringFrequency,
       BigQueryServices bqServices,
-      int numShards,
-      boolean allowInconsistentWrites) {
+      int numShards) {
     this.destinationCoder = destinationCoder;
     this.dynamicDestinations = dynamicDestinations;
     this.createDisposition = createDisposition;
     this.kmsKey = kmsKey;
     this.triggeringFrequency = triggeringFrequency;
     this.bqServices = bqServices;
     this.numShards = numShards;
-    this.allowInconsistentWrites = allowInconsistentWrites;
   }

   @Override
   public WriteResult expand(PCollection<KV<DestinationT, ElementT>> input) {
-    if (allowInconsistentWrites) {
-      return expandInconsistent(input);
-    } else {
-      return triggeringFrequency != null ? expandTriggered(input) : expandUntriggered(input);
-    }
-  }
-
-  public WriteResult expandInconsistent(PCollection<KV<DestinationT, ElementT>> input) {
-    PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
-        input.apply("rewindowIntoGlobal", Window.into(new GlobalWindows()));
-    PCollection<KV<DestinationT, byte[]>> convertedRecords =
-        inputInGlobalWindow
-            .apply("Convert", new StorageApiConvertMessages<>(dynamicDestinations))
-            .setCoder(KvCoder.of(destinationCoder, ByteArrayCoder.of()));
-    convertedRecords.apply(
-        "StorageApiWriteInconsistent",
-        new StorageApiWriteRecordsInconsistent<>(
-            dynamicDestinations, createDisposition, kmsKey, bqServices, destinationCoder));
-
-    return writeResult(input.getPipeline());
+    return triggeringFrequency != null ? expandTriggered(input) : expandUntriggered(input);
   }

   public WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input) {
@@ -152,11 +130,7 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
     PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
         input.apply(
             "rewindowIntoGlobal", Window.<KV<DestinationT, ElementT>>into(new GlobalWindows()));
-    PCollection<KV<DestinationT, byte[]>> convertedRecords =
-        inputInGlobalWindow
-            .apply("Convert", new StorageApiConvertMessages<>(dynamicDestinations))
-            .setCoder(KvCoder.of(destinationCoder, ByteArrayCoder.of()));
-    convertedRecords.apply(
+    inputInGlobalWindow.apply(
         "StorageApiWriteUnsharded",
         new StorageApiWriteUnshardedRecords<>(
             dynamicDestinations, createDisposition, kmsKey, bqServices, destinationCoder));
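After this change, whether StorageApiLoads expands triggered or untriggered is decided solely by the triggering frequency. A sketch under stated assumptions: the table spec is a placeholder, and it assumes a frequency set via withTriggeringFrequency is what getStorageApiTriggeringFrequency resolves when constructing StorageApiLoads:

// Sketch only: a non-null triggering frequency reaching the StorageApiLoads
// constructor sends expansion down the expandTriggered() path.
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.joda.time.Duration;

public class TriggeredStorageWriteExample {
  static BigQueryIO.Write<TableRow> triggeredWrite(TableSchema tableSchema) {
    return BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table") // placeholder table spec
        .withSchema(tableSchema)
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .withTriggeringFrequency(Duration.standardSeconds(5)); // joda-time Duration
  }
}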

This file was deleted.
