From f4dab2c60df6f113367c8a93064f1c974dbc0a9c Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Wed, 3 May 2017 15:42:50 -0700 Subject: [PATCH 1/2] BigQueryIO: Remove tempLocation usage at pipeline construction time --- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 45 ++++++++++--------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 22 +++++---- .../io/gcp/bigquery/BigQueryQuerySource.java | 5 +-- .../io/gcp/bigquery/BigQuerySourceBase.java | 22 ++++++--- .../io/gcp/bigquery/BigQueryTableSource.java | 6 +-- .../io/gcp/bigquery/WriteBundlesToFiles.java | 28 +++++++----- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 27 ++++++----- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 24 ++++------ 8 files changed, 95 insertions(+), 84 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 4e14696fc90a..34b7b3f6fd94 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -106,26 +107,30 @@ public void validate(PipelineOptions options) { @Override public WriteResult expand(PCollection> input) { Pipeline p = input.getPipeline(); - BigQueryOptions options = p.getOptions().as(BigQueryOptions.class); - - validate(p.getOptions()); - final String stepUuid = BigQueryHelpers.randomUUIDString(); - String tempLocation = options.getTempLocation(); - String tempFilePrefix; - try { - IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); - tempFilePrefix = - factory.resolve(factory.resolve(tempLocation, "BigQueryWriteTemp"), stepUuid); - } catch (IOException e) { - throw new RuntimeException( - String.format("Failed to resolve BigQuery temp location in %s", tempLocation), e); - } - // Create a singleton job ID token at execution time. This will be used as the base for all - // load jobs issued from this instance of the transfomr. - PCollection singleton = p.apply("Create", Create.of(tempFilePrefix)); + // load jobs issued from this instance of the transform. + PCollection singleton = p + .apply("Create", Create.of((Void) null)) + .apply("GetTempFilePrefix", ParDo.of(new DoFn() { + @ProcessElement + public void getTempFilePrefix(ProcessContext c) { + String tempLocation = c.getPipelineOptions().getTempLocation(); + String tempFilePrefix; + try { + IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); + tempFilePrefix = + factory.resolve( + factory.resolve(tempLocation, "BigQueryWriteTemp"), stepUuid); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to resolve BigQuery temp location in %s", tempLocation), e); + } + c.output(tempFilePrefix); + } + })); + PCollectionView jobIdTokenView = p.apply("TriggerIdCreation", Create.of("ignored")) .apply( @@ -152,7 +157,7 @@ public String apply(String input) { PCollection> results = inputInGlobalWindow .apply("WriteBundlesToFiles", ParDo.of( - new WriteBundlesToFiles(tempFilePrefix))) + new WriteBundlesToFiles(stepUuid))) .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); TupleTag, List>> multiPartitionsTag = @@ -209,7 +214,7 @@ public String apply(String input) { bigQueryServices, jobIdTokenView, schemasView, - tempFilePrefix, + stepUuid, WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, dynamicDestinations)) @@ -247,7 +252,7 @@ public String apply(String input) { bigQueryServices, jobIdTokenView, schemasView, - tempFilePrefix, + stepUuid, writeDisposition, createDisposition, dynamicDestinations)) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 29491d8bc5de..583c57a98a2b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -482,17 +482,7 @@ public void validate(PipelineOptions options) { @Override public PCollection expand(PBegin input) { final String stepUuid = BigQueryHelpers.randomUUIDString(); - BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); BoundedSource source; - final String extractDestinationDir; - String tempLocation = bqOptions.getTempLocation(); - try { - IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); - extractDestinationDir = factory.resolve(tempLocation, stepUuid); - } catch (IOException e) { - throw new RuntimeException( - String.format("Failed to resolve extract destination directory in %s", tempLocation)); - } if (getQuery() != null && (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get()))) { @@ -502,14 +492,12 @@ public PCollection expand(PBegin input) { getQuery(), getFlattenResults(), getUseLegacySql(), - extractDestinationDir, getBigQueryServices()); } else { source = BigQueryTableSource.create( stepUuid, getTableProvider(), - extractDestinationDir, getBigQueryServices()); } PassThroughThenCleanup.CleanupOperation cleanupOperation = @@ -517,6 +505,16 @@ public PCollection expand(PBegin input) { @Override void cleanup(PipelineOptions options) throws Exception { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + final String extractDestinationDir; + String tempLocation = bqOptions.getTempLocation(); + try { + IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); + extractDestinationDir = factory.resolve(tempLocation, stepUuid); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to resolve extract destination directory in %s", + tempLocation)); + } JobReference jobRef = new JobReference() diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java index 205f9cc2686a..710c934a70b6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java @@ -53,14 +53,12 @@ static BigQueryQuerySource create( ValueProvider query, Boolean flattenResults, Boolean useLegacySql, - String extractDestinationDir, BigQueryServices bqServices) { return new BigQueryQuerySource( stepUuid, query, flattenResults, useLegacySql, - extractDestinationDir, bqServices); } @@ -74,9 +72,8 @@ private BigQueryQuerySource( ValueProvider query, Boolean flattenResults, Boolean useLegacySql, - String extractDestinationDir, BigQueryServices bqServices) { - super(stepUuid, extractDestinationDir, bqServices); + super(stepUuid, bqServices); this.query = checkNotNull(query, "query"); this.flattenResults = checkNotNull(flattenResults, "flattenResults"); this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql"); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index 0171046513c6..fb9abd7ffe16 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -41,6 +41,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.IOChannelFactory; +import org.apache.beam.sdk.util.IOChannelUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,14 +66,12 @@ abstract class BigQuerySourceBase extends BoundedSource { protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; protected final String stepUuid; - protected final String extractDestinationDir; protected final BigQueryServices bqServices; private transient List> cachedSplitResult; - BigQuerySourceBase(String stepUuid, String extractDestinationDir, BigQueryServices bqServices) { + BigQuerySourceBase(String stepUuid, BigQueryServices bqServices) { this.stepUuid = checkNotNull(stepUuid, "stepUuid"); - this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir"); this.bqServices = checkNotNull(bqServices, "bqServices"); } @@ -86,9 +86,20 @@ public List> split( BigQueryOptions bqOptions = options.as(BigQueryOptions.class); TableReference tableToExtract = getTableToExtract(bqOptions); JobService jobService = bqServices.getJobService(bqOptions); + + final String extractDestinationDir; + String tempLocation = bqOptions.getTempLocation(); + try { + IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); + extractDestinationDir = factory.resolve(tempLocation, stepUuid); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to resolve extract destination directory in %s", tempLocation)); + } + String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid)); List tempFiles = executeExtract( - extractJobId, tableToExtract, jobService, bqOptions.getProject()); + extractJobId, tableToExtract, jobService, bqOptions.getProject(), extractDestinationDir); TableSchema tableSchema = bqServices.getDatasetService(bqOptions) .getTable(tableToExtract).getSchema(); @@ -114,7 +125,8 @@ public Coder getDefaultOutputCoder() { } private List executeExtract( - String jobId, TableReference table, JobService jobService, String executingProject) + String jobId, TableReference table, JobService jobService, String executingProject, + String extractDestinationDir) throws InterruptedException, IOException { JobReference jobRef = new JobReference() .setProjectId(executingProject) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java index e754bd279aba..1d45641f978a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java @@ -45,9 +45,8 @@ class BigQueryTableSource extends BigQuerySourceBase { static BigQueryTableSource create( String stepUuid, ValueProvider table, - String extractDestinationDir, BigQueryServices bqServices) { - return new BigQueryTableSource(stepUuid, table, extractDestinationDir, bqServices); + return new BigQueryTableSource(stepUuid, table, bqServices); } private final ValueProvider jsonTable; @@ -56,9 +55,8 @@ static BigQueryTableSource create( private BigQueryTableSource( String stepUuid, ValueProvider table, - String extractDestinationDir, BigQueryServices bqServices) { - super(stepUuid, extractDestinationDir, bqServices); + super(stepUuid, bqServices); this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson()); this.tableSizeBytes = new AtomicReference<>(); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index 4f609b22033e..03cdde521a0a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -32,7 +32,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.IOChannelFactory; +import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.values.KV; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +48,7 @@ class WriteBundlesToFiles // Map from tablespec to a writer for that table. private transient Map writers; - private final String tempFilePrefix; + private final String stepUuid; /** * The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file, @@ -104,8 +105,8 @@ public Result decode(InputStream inStream, Context context) throws public void verifyDeterministic() {} } - WriteBundlesToFiles(String tempFilePrefix) { - this.tempFilePrefix = tempFilePrefix; + WriteBundlesToFiles(String stepUuid) { + this.stepUuid = stepUuid; } @StartBundle @@ -117,6 +118,17 @@ public void startBundle(Context c) { @ProcessElement public void processElement(ProcessContext c) throws Exception { + String tempLocation = c.getPipelineOptions().getTempLocation(); + String tempFilePrefix; + try { + IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); + tempFilePrefix = + factory.resolve( + factory.resolve(tempLocation, "BigQueryWriteTemp"), stepUuid); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to resolve BigQuery temp location in %s", tempLocation), e); + } TableRowWriter writer = writers.get(c.element().getKey()); if (writer == null) { writer = new TableRowWriter(tempFilePrefix); @@ -147,12 +159,4 @@ public void finishBundle(Context c) throws Exception { } writers.clear(); } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - - builder.addIfNotNull( - DisplayData.item("tempFilePrefix", tempFilePrefix).withLabel("Temporary File Prefix")); - } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index b2992447c4cd..8157b66fccd1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -41,7 +41,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.FileIOChannelFactory; import org.apache.beam.sdk.util.GcsIOChannelFactory; import org.apache.beam.sdk.util.GcsUtil; @@ -73,7 +72,7 @@ class WriteTables private final BigQueryServices bqServices; private final PCollectionView jobIdToken; private final PCollectionView> schemasView; - private final String tempFilePrefix; + private final String stepUuid; private final WriteDisposition writeDisposition; private final CreateDisposition createDisposition; private final DynamicDestinations dynamicDestinations; @@ -83,7 +82,7 @@ public WriteTables( BigQueryServices bqServices, PCollectionView jobIdToken, PCollectionView> schemasView, - String tempFilePrefix, + String stepUuid, WriteDisposition writeDisposition, CreateDisposition createDisposition, DynamicDestinations dynamicDestinations) { @@ -91,7 +90,7 @@ public WriteTables( this.bqServices = bqServices; this.jobIdToken = jobIdToken; this.schemasView = schemasView; - this.tempFilePrefix = tempFilePrefix; + this.stepUuid = stepUuid; this.writeDisposition = writeDisposition; this.createDisposition = createDisposition; this.dynamicDestinations = dynamicDestinations; @@ -113,6 +112,18 @@ public void processElement(ProcessContext c) throws Exception { tableReference, tableDestination.getTableDescription()); } + String tempLocation = c.getPipelineOptions().getTempLocation(); + String tempFilePrefix; + try { + IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); + tempFilePrefix = + factory.resolve( + factory.resolve(tempLocation, "BigQueryWriteTemp"), stepUuid); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to resolve BigQuery temp location in %s", tempLocation), e); + } + Integer partition = c.element().getKey().getShardNumber(); List partitionFiles = Lists.newArrayList(c.element().getValue()); String jobIdPrefix = @@ -213,12 +224,4 @@ static void removeTemporaryFiles( throw new IOException("Unrecognized file system."); } } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - - builder.addIfNotNull( - DisplayData.item("tempFilePrefix", tempFilePrefix).withLabel("Temporary File Prefix")); - } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index e267dab6d03b..f6fb67e5c526 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -136,7 +136,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -946,7 +945,6 @@ public void testBuildSourceDisplayDataQuery() { } @Test - @Ignore("[BEAM-436] DirectRunner tempLocation configuration insufficient") public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); BigQueryIO.Read read = BigQueryIO.read() @@ -962,7 +960,6 @@ public void testTableSourcePrimitiveDisplayData() throws IOException, Interrupte } @Test - @Ignore("[BEAM-436] DirectRunner tempLocation configuration insufficient") public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); BigQueryIO.Read read = BigQueryIO.read() @@ -988,13 +985,11 @@ public void testBuildWrite() { } @Test - @Ignore("[BEAM-436] DirectRunner tempLocation configuration insufficient") public void testBatchWritePrimitiveDisplayData() throws IOException, InterruptedException { testWritePrimitiveDisplayData(/* streaming: */ false); } @Test - @Ignore("[BEAM-436] DirectRunner tempLocation configuration insufficient") public void testStreamingWritePrimitiveDisplayData() throws IOException, InterruptedException { testWritePrimitiveDisplayData(/* streaming: */ true); } @@ -1360,9 +1355,10 @@ public void testBigQueryTableSourceThroughJsonAPI() throws Exception { Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceThroughJsonAPI"); String stepUuid = "testStepUuid"; BoundedSource bqSource = BigQueryTableSource.create( - stepUuid, StaticValueProvider.of(table), baseDir.toString(), fakeBqServices); + stepUuid, StaticValueProvider.of(table), fakeBqServices); PipelineOptions options = PipelineOptionsFactory.create(); + options.setTempLocation(baseDir.toString()); Assert.assertThat( SourceTestUtils.readFromSource(bqSource, options), CoreMatchers.is(expected)); @@ -1399,9 +1395,8 @@ public void testBigQueryTableSourceInitSplit() throws Exception { Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceInitSplit"); String stepUuid = "testStepUuid"; - String extractDestinationDir = baseDir.toString(); BoundedSource bqSource = BigQueryTableSource.create( - stepUuid, StaticValueProvider.of(table), extractDestinationDir, fakeBqServices); + stepUuid, StaticValueProvider.of(table), fakeBqServices); PipelineOptions options = PipelineOptionsFactory.create(); options.setTempLocation(baseDir.toString()); @@ -1479,12 +1474,10 @@ public void testBigQueryQuerySourceInitSplit() throws Exception { String query = FakeBigQueryServices.encodeQuery(expected); - String extractDestinationDir = baseDir.toString(); BoundedSource bqSource = BigQueryQuerySource.create( stepUuid, StaticValueProvider.of(query), - true /* flattenResults */, true /* useLegacySql */, - extractDestinationDir, fakeBqServices); - options.setTempLocation(extractDestinationDir); + true /* flattenResults */, true /* useLegacySql */, fakeBqServices); + options.setTempLocation(baseDir.toString()); TableReference queryTable = new TableReference() .setProjectId(bqOptions.getProject()) @@ -1571,7 +1564,7 @@ public void testBigQueryNoTableQuerySourceInitSplit() throws Exception { BoundedSource bqSource = BigQueryQuerySource.create( stepUuid, StaticValueProvider.of(query), - true /* flattenResults */, true /* useLegacySql */, baseDir.toString(), fakeBqServices); + true /* flattenResults */, true /* useLegacySql */, fakeBqServices); options.setTempLocation(baseDir.toString()); @@ -1845,7 +1838,7 @@ public void testWriteTables() throws Exception { long numPartitions = 3; long numFilesPerPartition = 10; String jobIdToken = "jobIdToken"; - String tempFilePrefix = "tempFilePrefix"; + String stepUuid = "stepUuid"; Map> expectedTempTables = Maps.newHashMap(); Path baseDir = Files.createTempDirectory(tempFolder, "testWriteTables"); @@ -1898,7 +1891,7 @@ public void testWriteTables() throws Exception { fakeBqServices, jobIdTokenView, schemaMapView, - tempFilePrefix, + stepUuid, WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, new IdentityDynamicTables()); @@ -1907,6 +1900,7 @@ public void testWriteTables() throws Exception { KV> tester = DoFnTester.of(writeTables); tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken); tester.setSideInput(schemaMapView, GlobalWindow.INSTANCE, ImmutableMap.of()); + tester.getPipelineOptions().setTempLocation("tempLocation"); for (KV, List> partition : partitions) { tester.processElement(partition); } From c318ec6e3ebe276eea9c7e5c03db18ddebf93515 Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Wed, 3 May 2017 17:57:50 -0700 Subject: [PATCH 2/2] Address comments --- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 21 ++++-------- .../sdk/io/gcp/bigquery/BigQueryHelpers.java | 15 +++++++++ .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 16 ++++----- .../io/gcp/bigquery/BigQuerySourceBase.java | 14 ++------ .../io/gcp/bigquery/WriteBundlesToFiles.java | 17 +++------- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 16 +++------ .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 33 ------------------- 7 files changed, 38 insertions(+), 94 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 34b7b3f6fd94..78d39b5090fc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -19,11 +19,11 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; import com.google.api.services.bigquery.model.TableRow; import com.google.common.base.Strings; import com.google.common.collect.Lists; -import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.beam.sdk.Pipeline; @@ -45,8 +45,6 @@ import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.IOChannelFactory; -import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.KV; @@ -116,18 +114,11 @@ public WriteResult expand(PCollection> input) { .apply("GetTempFilePrefix", ParDo.of(new DoFn() { @ProcessElement public void getTempFilePrefix(ProcessContext c) { - String tempLocation = c.getPipelineOptions().getTempLocation(); - String tempFilePrefix; - try { - IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); - tempFilePrefix = - factory.resolve( - factory.resolve(tempLocation, "BigQueryWriteTemp"), stepUuid); - } catch (IOException e) { - throw new RuntimeException( - String.format("Failed to resolve BigQuery temp location in %s", tempLocation), e); - } - c.output(tempFilePrefix); + c.output( + resolveTempLocation( + c.getPipelineOptions().getTempLocation(), + "BigQueryWriteTemp", + stepUuid)); } })); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 70e7a5ffd977..6b4e518a5c50 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -40,6 +40,8 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.IOChannelFactory; +import org.apache.beam.sdk.util.IOChannelUtils; /** A set of helper functions and classes used by {@link BigQueryIO}. */ public class BigQueryHelpers { @@ -304,4 +306,17 @@ static TableReference createTempTableReference(String projectId, String jobUuid) .setTableId(queryTempTableId); return queryTempTableRef; } + + static String resolveTempLocation( + String tempLocationDir, String bigQueryOperationName, String stepUuid) { + try { + IOChannelFactory factory = IOChannelUtils.getFactory(tempLocationDir); + return factory.resolve( + factory.resolve(tempLocationDir, bigQueryOperationName), stepUuid); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to resolve temp destination directory in %s", + tempLocationDir), e); + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 583c57a98a2b..c76ee86948d1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkState; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; import com.google.api.client.json.JsonFactory; import com.google.api.services.bigquery.model.Job; @@ -505,16 +506,11 @@ public PCollection expand(PBegin input) { @Override void cleanup(PipelineOptions options) throws Exception { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - final String extractDestinationDir; - String tempLocation = bqOptions.getTempLocation(); - try { - IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); - extractDestinationDir = factory.resolve(tempLocation, stepUuid); - } catch (IOException e) { - throw new RuntimeException( - String.format("Failed to resolve extract destination directory in %s", - tempLocation)); - } + final String extractDestinationDir = + resolveTempLocation( + bqOptions.getTempLocation(), + "BigQueryExtractTemp", + stepUuid); JobReference jobRef = new JobReference() diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index fb9abd7ffe16..41e298c56fb9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationExtract; @@ -41,8 +42,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.util.IOChannelFactory; -import org.apache.beam.sdk.util.IOChannelUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,15 +86,8 @@ public List> split( TableReference tableToExtract = getTableToExtract(bqOptions); JobService jobService = bqServices.getJobService(bqOptions); - final String extractDestinationDir; - String tempLocation = bqOptions.getTempLocation(); - try { - IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); - extractDestinationDir = factory.resolve(tempLocation, stepUuid); - } catch (IOException e) { - throw new RuntimeException( - String.format("Failed to resolve extract destination directory in %s", tempLocation)); - } + final String extractDestinationDir = + resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid); String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid)); List tempFiles = executeExtract( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index 03cdde521a0a..e90b974d973f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; + import com.google.api.services.bigquery.model.TableRow; import com.google.common.collect.Maps; import java.io.IOException; @@ -32,8 +34,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.util.IOChannelFactory; -import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.values.KV; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,17 +118,8 @@ public void startBundle(Context c) { @ProcessElement public void processElement(ProcessContext c) throws Exception { - String tempLocation = c.getPipelineOptions().getTempLocation(); - String tempFilePrefix; - try { - IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); - tempFilePrefix = - factory.resolve( - factory.resolve(tempLocation, "BigQueryWriteTemp"), stepUuid); - } catch (IOException e) { - throw new RuntimeException( - String.format("Failed to resolve BigQuery temp location in %s", tempLocation), e); - } + String tempFilePrefix = resolveTempLocation( + c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid); TableRowWriter writer = writers.get(c.element().getKey()); if (writer == null) { writer = new TableRowWriter(tempFilePrefix); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index 8157b66fccd1..c480b421c6b3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; + import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobReference; @@ -112,18 +114,8 @@ public void processElement(ProcessContext c) throws Exception { tableReference, tableDestination.getTableDescription()); } - String tempLocation = c.getPipelineOptions().getTempLocation(); - String tempFilePrefix; - try { - IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); - tempFilePrefix = - factory.resolve( - factory.resolve(tempLocation, "BigQueryWriteTemp"), stepUuid); - } catch (IOException e) { - throw new RuntimeException( - String.format("Failed to resolve BigQuery temp location in %s", tempLocation), e); - } - + String tempFilePrefix = resolveTempLocation( + c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid); Integer partition = c.element().getKey().getShardNumber(); List partitionFiles = Lists.newArrayList(c.element().getValue()); String jobIdPrefix = diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index f6fb67e5c526..026afce4b3a6 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -91,7 +91,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; @@ -984,38 +983,6 @@ public void testBuildWrite() { null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, ""); } - @Test - public void testBatchWritePrimitiveDisplayData() throws IOException, InterruptedException { - testWritePrimitiveDisplayData(/* streaming: */ false); - } - - @Test - public void testStreamingWritePrimitiveDisplayData() throws IOException, InterruptedException { - testWritePrimitiveDisplayData(/* streaming: */ true); - } - - private void testWritePrimitiveDisplayData(boolean streaming) throws IOException, - InterruptedException { - PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.as(StreamingOptions.class).setStreaming(streaming); - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options); - - BigQueryIO.Write write = BigQueryIO.writeTableRows() - .to("project:dataset.table") - .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2")) - .withTestServices(new FakeBigQueryServices() - .withDatasetService(new FakeDatasetService()) - .withJobService(new FakeJobService())) - .withoutValidation(); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat("BigQueryIO.Write should include the table spec in its primitive display data", - displayData, hasItem(hasDisplayItem("tableSpec"))); - - assertThat("BigQueryIO.Write should include the table schema in its primitive display data", - displayData, hasItem(hasDisplayItem("schema"))); - } - @Test public void testBuildWriteWithoutValidation() { // This test just checks that using withoutValidation will not trigger object