From 03bf9b7de04f53c8c307d5e9d8da2a735fe65fb8 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Tue, 13 Dec 2016 16:59:32 -0800 Subject: [PATCH 01/21] Increase KafkaIO version to 0.2.0 Recent PR #491 changes how KafkaIO splits. This makes it incompatible with Dataflow update across these two versions. --- contrib/kafka/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/kafka/pom.xml b/contrib/kafka/pom.xml index 4beecd7ba5..29f64d5577 100644 --- a/contrib/kafka/pom.xml +++ b/contrib/kafka/pom.xml @@ -25,7 +25,7 @@ google-cloud-dataflow-java-contrib-kafka Google Cloud Dataflow Kafka IO Library Library to read Kafka topics. - 0.1.0-SNAPSHOT + 0.2.0-SNAPSHOT [1.6.0, 2.0.0) From 8fd6661267d254b2089f35649f9f1102eeecda57 Mon Sep 17 00:00:00 2001 From: Davor Bonaci Date: Mon, 9 Jan 2017 10:30:55 -0800 Subject: [PATCH 02/21] Update version range dependency to exclude 2.0.0-betaX versions (#528) * Update version range dependency to exclude 2.0.0-betaX versions * fixup --- contrib/firebaseio/pom.xml | 2 +- contrib/hadoop/pom.xml | 2 +- contrib/join-library/pom.xml | 2 +- contrib/kafka/pom.xml | 2 +- contrib/sorter/pom.xml | 2 +- .../examples/src/main/resources/archetype-resources/pom.xml | 2 +- .../starter/src/main/resources/archetype-resources/pom.xml | 2 +- .../starter/src/test/resources/projects/basic/reference/pom.xml | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/contrib/firebaseio/pom.xml b/contrib/firebaseio/pom.xml index b50ef89167..bf5414e24f 100644 --- a/contrib/firebaseio/pom.xml +++ b/contrib/firebaseio/pom.xml @@ -21,7 +21,7 @@ UTF-8 - [1.2.0, 2.0.0) + [1.2.0, 1.99) diff --git a/contrib/hadoop/pom.xml b/contrib/hadoop/pom.xml index 0659e3ebaa..60327b7fda 100644 --- a/contrib/hadoop/pom.xml +++ b/contrib/hadoop/pom.xml @@ -36,7 +36,7 @@ UTF-8 - [1.2.0,2.0.0) + [1.2.0, 1.99) diff --git a/contrib/join-library/pom.xml b/contrib/join-library/pom.xml index f15ef9794c..3f5675de4d 100644 --- a/contrib/join-library/pom.xml +++ b/contrib/join-library/pom.xml @@ -50,7 +50,7 @@ UTF-8 - [1.0.0, 2.0.0) + [1.0.0, 1.99) diff --git a/contrib/kafka/pom.xml b/contrib/kafka/pom.xml index 4beecd7ba5..c13946cc64 100644 --- a/contrib/kafka/pom.xml +++ b/contrib/kafka/pom.xml @@ -28,7 +28,7 @@ 0.1.0-SNAPSHOT - [1.6.0, 2.0.0) + [1.6.0, 1.99) 0.9.0.1 19.0 diff --git a/contrib/sorter/pom.xml b/contrib/sorter/pom.xml index d9ffb65bd9..4580ea5bab 100644 --- a/contrib/sorter/pom.xml +++ b/contrib/sorter/pom.xml @@ -36,7 +36,7 @@ UTF-8 - [1.2.0,2.0.0) + [1.2.0, 1.99) 2.7.1 diff --git a/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml index 486a82b602..f58aafce9c 100644 --- a/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml +++ b/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml @@ -107,7 +107,7 @@ com.google.cloud.dataflow google-cloud-dataflow-java-sdk-all - [1.0.0, 2.0.0) + [1.0.0, 1.99) diff --git a/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml b/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml index ab2bcf9a70..70bf1e53d3 100644 --- a/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml +++ b/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml @@ -38,7 +38,7 @@ com.google.cloud.dataflow google-cloud-dataflow-java-sdk-all - [1.0.0, 2.0.0) + [1.0.0, 1.99) diff --git 
a/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml b/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml index b94fb39e12..db136756b3 100644 --- a/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml +++ b/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml @@ -38,7 +38,7 @@ com.google.cloud.dataflow google-cloud-dataflow-java-sdk-all - [1.0.0, 2.0.0) + [1.0.0, 1.99) From faa4c2e9af1c3c28a6ba78ebeabeced4866f82fc Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Mon, 9 Jan 2017 11:13:31 -0800 Subject: [PATCH 03/21] README.md for contrib/kafka --- contrib/kafka/README.md | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 contrib/kafka/README.md diff --git a/contrib/kafka/README.md b/contrib/kafka/README.md new file mode 100644 index 0000000000..e69de29bb2 From 2829f00fa224e56d832d8a2e7174b5d5d05da223 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Mon, 9 Jan 2017 11:16:13 -0800 Subject: [PATCH 04/21] README.md for contrib/kafka --- contrib/kafka/README.md | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/contrib/kafka/README.md b/contrib/kafka/README.md index e69de29bb2..386072317f 100644 --- a/contrib/kafka/README.md +++ b/contrib/kafka/README.md @@ -0,0 +1,36 @@ +# KafkaIO : Dataflow Unbounded Source and Sink for Kafka Topics + +KafkaIO provides unbounded sources and sinks for [Kafka](https://www.firebase.com/) +topics. Kafka version 0.9 and above are supported. + +## Basic Usage + * Read from a topic with 8 byte long keys and string values: + ```java + PCollection> kafkaRecords = + pipeline + .applY(KafkaIO.read() + .withBootstrapServers("broker_1:9092,broker_2:9092") + .withTopics(ImmutableList.of("topic_a")) + .withKeyCoder(BigEndianLongCoder.of()) + .withValueCoder(StringUtf8Coder.of()) + .withoutMetadata() + ); + ``` + * Write the same PCollection to a Kafka topic: + ```java + kafkaRecords.apply(KafkaIO.write() + .withBootstrapServers("broker_1:9092,broker_2:9092") + .withTopic("results") + .withKeyCoder(BigEndianLongCoder.of()) + .withValueCoder(StringUtf8Coder.of()) + ``` + +Please see JavaDoc for KafkaIO in +[KafkaIO.java](https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java#L100) +for complete documentation and a more descriptive usage example. + +## Release Notes + * **0.2.0** : Assign one split for each of the Kafka topic partitions. This makes Dataflow + [Update](https://cloud.google.com/dataflow/pipelines/updating-a-pipeline) + from previous version incompatible. + * **0.1.0** : KafkaIO with support for Unbounded Source and Sink. From 3e6116294f53085dab04dba9094b5a462f00104e Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Mon, 9 Jan 2017 13:55:49 -0800 Subject: [PATCH 05/21] Fix formatting --- contrib/kafka/README.md | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/contrib/kafka/README.md b/contrib/kafka/README.md index 386072317f..40b6c00ddf 100644 --- a/contrib/kafka/README.md +++ b/contrib/kafka/README.md @@ -4,26 +4,28 @@ KafkaIO provides unbounded sources and sinks for [Kafka](https://www.firebase.co topics. Kafka version 0.9 and above are supported. 
## Basic Usage - * Read from a topic with 8 byte long keys and string values: - ```java - PCollection> kafkaRecords = - pipeline - .applY(KafkaIO.read() - .withBootstrapServers("broker_1:9092,broker_2:9092") - .withTopics(ImmutableList.of("topic_a")) - .withKeyCoder(BigEndianLongCoder.of()) - .withValueCoder(StringUtf8Coder.of()) - .withoutMetadata() - ); - ``` - * Write the same PCollection to a Kafka topic: - ```java - kafkaRecords.apply(KafkaIO.write() + +* Read from a topic with 8 byte long keys and string values: +```java + PCollection> kafkaRecords = + pipeline + .applY(KafkaIO.read() .withBootstrapServers("broker_1:9092,broker_2:9092") - .withTopic("results") + .withTopics(ImmutableList.of("topic_a")) .withKeyCoder(BigEndianLongCoder.of()) .withValueCoder(StringUtf8Coder.of()) - ``` + .withoutMetadata() + ); +``` + +* Write the same PCollection to a Kafka topic: +```java + kafkaRecords.apply(KafkaIO.write() + .withBootstrapServers("broker_1:9092,broker_2:9092") + .withTopic("results") + .withKeyCoder(BigEndianLongCoder.of()) + .withValueCoder(StringUtf8Coder.of()) +``` Please see JavaDoc for KafkaIO in [KafkaIO.java](https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java#L100) From 40d174f955e3f999b35388deea47fcd6c250c273 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Mon, 9 Jan 2017 15:04:07 -0800 Subject: [PATCH 06/21] review comments --- contrib/kafka/README.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/contrib/kafka/README.md b/contrib/kafka/README.md index 40b6c00ddf..a84a493a03 100644 --- a/contrib/kafka/README.md +++ b/contrib/kafka/README.md @@ -1,7 +1,7 @@ # KafkaIO : Dataflow Unbounded Source and Sink for Kafka Topics -KafkaIO provides unbounded sources and sinks for [Kafka](https://www.firebase.com/) -topics. Kafka version 0.9 and above are supported. +KafkaIO provides unbounded source and sink for [Kafka](http://kafka.apache.org/) +topics. Kafka versions 0.9 and above are supported. ## Basic Usage @@ -9,7 +9,7 @@ topics. Kafka version 0.9 and above are supported. ```java PCollection> kafkaRecords = pipeline - .applY(KafkaIO.read() + .apply(KafkaIO.read() .withBootstrapServers("broker_1:9092,broker_2:9092") .withTopics(ImmutableList.of("topic_a")) .withKeyCoder(BigEndianLongCoder.of()) @@ -25,6 +25,7 @@ topics. Kafka version 0.9 and above are supported. 
    .withTopic("results")
    .withKeyCoder(BigEndianLongCoder.of())
    .withValueCoder(StringUtf8Coder.of())
+   );
 ```
 Please see JavaDoc for KafkaIO in
 [KafkaIO.java](https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java#L100)

From efd33cc43061e54d8fe1e16415157de21b904d90 Mon Sep 17 00:00:00 2001
From: igorbernstein2
Date: Thu, 26 Jan 2017 14:57:48 -0500
Subject: [PATCH 07/21] =?UTF-8?q?Fix=20HadoopFileSource=E2=80=99s=20split?=
 =?UTF-8?q?=20size=20estimate=20(#534)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Fix HadoopFileSource’s split size estimate

* Properly set interrupted state
---
 .../contrib/hadoop/HadoopFileSource.java     | 11 ++++++++-
 .../contrib/hadoop/HadoopFileSourceTest.java | 23 +++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java b/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java
index e8981d2d6a..cffc475d71 100644
--- a/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java
+++ b/contrib/hadoop/src/main/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSource.java
@@ -239,12 +239,21 @@ private <T> Coder<T> getDefaultCoder(Class<T> c) {
   public long getEstimatedSizeBytes(PipelineOptions options) {
     long size = 0;
     try {
+      // If this source represents a split from splitIntoBundles, then return the size of the split,
+      // rather than the entire input
+      if (serializableSplit != null) {
+        return serializableSplit.getSplit().getLength();
+      }
+
       Job job = Job.getInstance(); // new instance
       for (FileStatus st : listStatus(createFormat(job), job)) {
         size += st.getLen();
       }
     } catch (IOException | NoSuchMethodException | InvocationTargetException
-        | IllegalAccessException | InstantiationException | InterruptedException e) {
+        | IllegalAccessException | InstantiationException e) {
+      // ignore, and return 0
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
       // ignore, and return 0
     }
     return size;

diff --git a/contrib/hadoop/src/test/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSourceTest.java b/contrib/hadoop/src/test/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSourceTest.java
index cef3c08348..eac54a1e31 100644
--- a/contrib/hadoop/src/test/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSourceTest.java
+++ b/contrib/hadoop/src/test/java/com/google/cloud/dataflow/contrib/hadoop/HadoopFileSourceTest.java
@@ -152,6 +152,29 @@ public void testSplits() throws Exception {
     assertTrue(nonEmptySplits > 2);
   }

+  @Test
+  public void testSplitEstimatedSize() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
+    File file = createFileWithData("tmp.avro", expectedResults);
+
+    HadoopFileSource<IntWritable, Text> source = HadoopFileSource.from(
+        file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class
+    );
+
+    long originalSize = source.getEstimatedSizeBytes(options);
+    long splitTotalSize = 0;
+    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source.splitIntoBundles(
+        SequenceFile.SYNC_INTERVAL, options
+    );
+    for (BoundedSource<KV<IntWritable, Text>> splitSource : splits) {
+      splitTotalSize += splitSource.getEstimatedSizeBytes(options);
+    }
+    // Assert that the estimated size of the whole is the sum of its parts
+    assertEquals(originalSize, splitTotalSize);
+  }
+
   private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
       throws IOException {
     File tmpFile = tmpFolder.newFile(filename);

From dbe464440011decfc6d1a11ff6af9cc7b5ee3b68 Mon Sep 17 00:00:00 2001
From: Dan
Halperin Date: Mon, 30 Jan 2017 12:25:27 -0800 Subject: [PATCH 08/21] BigQuery: fix an issue with option propagation and refactor to future-proof * We created a helper in BigQueryIO to create a JobConfigurationQuery capturing all options, but we had not yet propagated this cleanup into the Services abstraction or helper classes. Refactor BigQueryServices and BigQueryTableRowIterator to propagate the same configuration. Adds a new deprecated constructor to BigQueryTableRowIterator for backwards-compatibility. This fixes GoogleCloudPlatform/DataflowJavaSDK#539. --- .../cloud/dataflow/sdk/io/BigQueryIO.java | 10 ++- .../dataflow/sdk/util/BigQueryServices.java | 3 +- .../sdk/util/BigQueryServicesImpl.java | 17 ++--- .../sdk/util/BigQueryTableRowIterator.java | 67 ++++++++++--------- .../cloud/dataflow/sdk/io/BigQueryIOTest.java | 8 +-- 5 files changed, 52 insertions(+), 53 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java index ace18eff5e..3472a8afed 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java @@ -1075,7 +1075,7 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { public BoundedReader createReader(PipelineOptions options) throws IOException { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); return new BigQueryReader(this, bqServices.getReaderFromQuery( - bqOptions, query.get(), executingProject.get(), flattenResults, useLegacySql)); + bqOptions, createBasicQueryConfig(), executingProject.get())); } @Override @@ -1152,11 +1152,12 @@ private void executeQuery( .setProjectId(executingProject) .setJobId(jobId); + // When changing options here, consider whether to change the defaults from + // #createBasicQueryConfig instead. JobConfigurationQuery queryConfig = createBasicQueryConfig() .setAllowLargeResults(true) .setCreateDisposition("CREATE_IF_NEEDED") .setDestinationTable(destinationTable) - .setPriority("BATCH") .setWriteDisposition("WRITE_EMPTY"); jobService.startQueryJob(jobRef, queryConfig); @@ -1167,9 +1168,12 @@ private void executeQuery( } private JobConfigurationQuery createBasicQueryConfig() { + // Due to deprecated functionality, if this function is updated + // then the similar code in BigQueryTableRowIterator#fromQuery should be updated. return new JobConfigurationQuery() - .setQuery(query.get()) .setFlattenResults(flattenResults) + .setPriority("BATCH") + .setQuery(query.get()) .setUseLegacySql(useLegacySql); } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java index ec96009494..df247629f5 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java @@ -58,8 +58,7 @@ public interface BigQueryServices extends Serializable { * Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables. */ BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten, - @Nullable Boolean useLegacySql); + BigQueryOptions bqOptions, JobConfigurationQuery queryConfig, String projectId); /** * An interface for the Cloud BigQuery load service. 
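For orientation, here is a rough usage sketch of the refactored call path above. It is not part of the patch: it assumes a `BigQueryServices` instance (`bqServices`) and a `BigQueryOptions` object (`bqOptions`) are in scope, and shows the `JobConfigurationQuery` being built once and handed through the abstraction.

```java
import com.google.api.services.bigquery.model.JobConfigurationQuery;

// Build the complete query configuration in one place (this is what
// BigQueryIO#createBasicQueryConfig centralizes in this patch)...
JobConfigurationQuery queryConfig = new JobConfigurationQuery()
    .setQuery("SELECT name, count FROM [project:dataset.table]")
    .setFlattenResults(true)
    .setPriority("BATCH")
    .setUseLegacySql(true);

// ...then pass the whole object through the services abstraction, instead of
// threading query/flattenResults/useLegacySql along as separate parameters.
BigQueryServices.BigQueryJsonReader reader =
    bqServices.getReaderFromQuery(bqOptions, queryConfig, "my-project");
```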
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java index 1a37e01375..af2ed9e73b 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java @@ -39,14 +39,11 @@ import com.google.cloud.dataflow.sdk.options.BigQueryOptions; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; - import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.NoSuchElementException; - import javax.annotation.Nullable; /** @@ -83,9 +80,8 @@ public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableRef @Override public BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten, - @Nullable Boolean useLegacySql) { - return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, flatten, useLegacySql); + BigQueryOptions bqOptions, JobConfigurationQuery queryConfig, String projectId) { + return BigQueryJsonReaderImpl.fromQuery(bqOptions, queryConfig, projectId); } @VisibleForTesting @@ -521,14 +517,11 @@ private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) { private static BigQueryJsonReader fromQuery( BigQueryOptions bqOptions, - String query, - String projectId, - @Nullable Boolean flattenResults, - @Nullable Boolean useLegacySql) { + JobConfigurationQuery queryConfig, + String projectId) { return new BigQueryJsonReaderImpl( BigQueryTableRowIterator.fromQuery( - query, projectId, Transport.newBigQueryClient(bqOptions).build(), flattenResults, - useLegacySql)); + queryConfig, projectId, Transport.newBigQueryClient(bqOptions).build())); } private static BigQueryJsonReader fromTable( diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java index 8f4ff793dc..63bd025099 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java @@ -46,11 +46,9 @@ import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Uninterruptibles; - import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -61,7 +59,6 @@ import java.util.Objects; import java.util.Random; import java.util.concurrent.TimeUnit; - import javax.annotation.Nullable; /** @@ -73,6 +70,7 @@ public class BigQueryTableRowIterator implements AutoCloseable { @Nullable private TableReference ref; @Nullable private final String projectId; @Nullable private TableSchema schema; + @Nullable private final JobConfigurationQuery queryConfig; private final Bigquery client; private String pageToken; private Iterator iteratorOverCurrentBatch; @@ -89,25 +87,18 @@ public class BigQueryTableRowIterator implements AutoCloseable { // following interval to check the status of query execution job private static final Duration QUERY_COMPLETION_POLL_TIME = Duration.standardSeconds(1); - private final String query; - // Whether to flatten query results. 
- private final boolean flattenResults; - // Whether to use the BigQuery legacy SQL dialect.. - private final boolean useLegacySql; // Temporary dataset used to store query results. private String temporaryDatasetId = null; // Temporary table used to store query results. private String temporaryTableId = null; private BigQueryTableRowIterator( - @Nullable TableReference ref, @Nullable String query, @Nullable String projectId, - Bigquery client, boolean flattenResults, boolean useLegacySql) { + @Nullable TableReference ref, @Nullable JobConfigurationQuery queryConfig, + @Nullable String projectId, Bigquery client) { this.ref = ref; - this.query = query; + this.queryConfig = queryConfig; this.projectId = projectId; this.client = checkNotNull(client, "client"); - this.flattenResults = flattenResults; - this.useLegacySql = useLegacySql; } /** @@ -116,7 +107,7 @@ private BigQueryTableRowIterator( public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) { checkNotNull(ref, "ref"); checkNotNull(client, "client"); - return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true, true); + return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client); } /** @@ -135,15 +126,31 @@ public static BigQueryTableRowIterator fromQuery( * Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the * specified query in the specified project. */ + @Deprecated public static BigQueryTableRowIterator fromQuery( String query, String projectId, Bigquery client, @Nullable Boolean flattenResults, @Nullable Boolean useLegacySql) { checkNotNull(query, "query"); checkNotNull(projectId, "projectId"); checkNotNull(client, "client"); - return new BigQueryTableRowIterator(null, query, projectId, client, - MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), - MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE)); + JobConfigurationQuery queryConfig = new JobConfigurationQuery() + .setFlattenResults(MoreObjects.firstNonNull(flattenResults, Boolean.TRUE)) + .setPriority("BATCH") + .setQuery(query) + .setUseLegacySql(MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE)); + return new BigQueryTableRowIterator(null, queryConfig, projectId, client); + } + + /** + * Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the + * specified query in the specified project. + */ + public static BigQueryTableRowIterator fromQuery( + JobConfigurationQuery queryConfig, String projectId, Bigquery client) { + checkNotNull(queryConfig, "queryConfig"); + checkNotNull(projectId, "projectId"); + checkNotNull(client, "client"); + return new BigQueryTableRowIterator(null, queryConfig, projectId, client); } /** @@ -151,7 +158,7 @@ public static BigQueryTableRowIterator fromQuery( * @throws IOException on failure */ public void open() throws IOException, InterruptedException { - if (query != null) { + if (queryConfig != null) { ref = executeQueryAndWaitForCompletion(); } // Get table schema. 
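To see the deprecated and preferred `fromQuery` entry points side by side, a hedged caller sketch follows. It is not taken from the patch; the `client` (a `Bigquery` instance) and the project id are assumed:

```java
// Deprecated overload: loose parameters are wrapped into a
// JobConfigurationQuery internally, with null flags defaulting to TRUE.
BigQueryTableRowIterator legacy = BigQueryTableRowIterator.fromQuery(
    "SELECT word FROM [publicdata:samples.shakespeare]", "my-project", client,
    null /* flattenResults */, null /* useLegacySql */);

// Preferred overload after this patch: the caller owns the configuration.
JobConfigurationQuery config = new JobConfigurationQuery()
    .setQuery("SELECT word FROM [publicdata:samples.shakespeare]")
    .setFlattenResults(true)
    .setPriority("BATCH")
    .setUseLegacySql(true);
BigQueryTableRowIterator iterator =
    BigQueryTableRowIterator.fromQuery(config, "my-project", client);
```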
@@ -401,15 +408,17 @@ private void deleteDataset(String datasetId) throws IOException, InterruptedExce */ private TableReference executeQueryAndWaitForCompletion() throws IOException, InterruptedException { + checkState(projectId != null, "Cannot dryRun a query in unknown (null) project"); + checkState(queryConfig != null, "Cannot dryRun a null query"); // Dry run query to get source table location Job dryRunJob = new Job() .setConfiguration(new JobConfiguration() - .setQuery(new JobConfigurationQuery() - .setQuery(query)) + .setQuery(queryConfig) .setDryRun(true)); JobStatistics jobStats = executeWithBackOff( client.jobs().insert(projectId, dryRunJob), - String.format("Error when trying to dry run query %s.", query)).getStatistics(); + String.format("Error when trying to dry run query %s.", + queryConfig.toPrettyString())).getStatistics(); // Let BigQuery to pick default location if the query does not read any tables. String location = null; @@ -428,14 +437,8 @@ private TableReference executeQueryAndWaitForCompletion() createDataset(temporaryDatasetId, location); Job job = new Job(); JobConfiguration config = new JobConfiguration(); - JobConfigurationQuery queryConfig = new JobConfigurationQuery(); config.setQuery(queryConfig); job.setConfiguration(config); - queryConfig.setQuery(query); - queryConfig.setAllowLargeResults(true); - queryConfig.setFlattenResults(flattenResults); - queryConfig.setUseLegacySql(useLegacySql); - TableReference destinationTable = new TableReference(); destinationTable.setProjectId(projectId); @@ -445,13 +448,15 @@ private TableReference executeQueryAndWaitForCompletion() Job queryJob = executeWithBackOff( client.jobs().insert(projectId, job), - String.format("Error when trying to execute the job for query %s.", query)); + String.format("Error when trying to execute the job for query %s.", + queryConfig.toPrettyString())); JobReference jobId = queryJob.getJobReference(); while (true) { Job pollJob = executeWithBackOff( client.jobs().get(projectId, jobId.getJobId()), - String.format("Error when trying to get status of the job for query %s.", query)); + String.format("Error when trying to get status of the job for query %s.", + queryConfig.toPrettyString())); JobStatus status = pollJob.getStatus(); if (status.getState().equals("DONE")) { // Job is DONE, but did not necessarily succeed. @@ -461,7 +466,9 @@ private TableReference executeQueryAndWaitForCompletion() } else { // There will be no temporary table to delete, so null out the reference. 
temporaryTableId = null; - throw new IOException("Executing query " + query + " failed: " + error.getMessage()); + throw new IOException( + String.format("Executing query %s failed: %s", + queryConfig.toPrettyString(), error.getMessage())); } } Uninterruptibles.sleepUninterruptibly( diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java index a5bddec315..61356e18d0 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java @@ -106,7 +106,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; - import org.hamcrest.CoreMatchers; import org.hamcrest.Matchers; import org.junit.Assert; @@ -122,7 +121,6 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; - import java.io.File; import java.io.FileFilter; import java.io.IOException; @@ -135,8 +133,6 @@ import java.util.NoSuchElementException; import java.util.Set; -import javax.annotation.Nullable; - /** * Tests for BigQueryIO. */ @@ -187,8 +183,7 @@ public BigQueryJsonReader getReaderFromTable( @Override public BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten, - @Nullable Boolean useLegacySql) { + BigQueryOptions bqOptions, JobConfigurationQuery queryConfig, String projectId) { return new FakeBigQueryReader(jsonTableRowReturns); } @@ -1749,3 +1744,4 @@ public boolean accept(File pathname) { }}).length); } } + From b4e391ee3fd3fd7cbd4b7e499601ae201f6dcf6d Mon Sep 17 00:00:00 2001 From: Eugene Hlyzov Date: Mon, 30 Jan 2017 23:45:34 +0300 Subject: [PATCH 09/21] [BEAM-359] Treat erased type variables as non-deterministic in AvroCoder (#531) --- .../cloud/dataflow/sdk/coders/AvroCoder.java | 9 +++++-- .../dataflow/sdk/coders/AvroCoderTest.java | 26 +++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/AvroCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/AvroCoder.java index c5aa029531..d85eb9b937 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/AvroCoder.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/AvroCoder.java @@ -474,6 +474,10 @@ private void doCheck(String context, TypeDescriptor type, Schema schema) { checkMap(context, type, schema); break; case RECORD: + if (!(type.getType() instanceof Class)) { + reportError(context, "Cannot determine type from generic %s due to erasure", type); + return; + } checkRecord(type, schema); break; case UNION: @@ -694,7 +698,8 @@ private void checkArray(String context, TypeDescriptor type, Schema schema) { * Extract a field from a class. We need to look at the declared fields so that we can * see private fields. We may need to walk up to the parent to get classes from the parent. 
*/ - private static Field getField(Class clazz, String name) { + private static Field getField(Class originalClazz, String name) { + Class clazz = originalClazz; while (clazz != null) { for (Field field : clazz.getDeclaredFields()) { AvroName avroName = field.getAnnotation(AvroName.class); @@ -708,7 +713,7 @@ private static Field getField(Class clazz, String name) { } throw new IllegalArgumentException( - "Unable to get field " + name + " from class " + clazz); + "Unable to get field " + name + " from class " + originalClazz); } } } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/AvroCoderTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/AvroCoderTest.java index 3ed055bc4a..d6a2a172c9 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/AvroCoderTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/AvroCoderTest.java @@ -751,4 +751,30 @@ public int hashCode() { return Objects.hash(getClass(), onlySomeTypesAllowed); } } + + @Test + public void testAvroCoderForGenerics() throws Exception { + Schema fooSchema = AvroCoder.of(Foo.class).getSchema(); + Schema schema = new Schema.Parser().parse("{" + + "\"type\":\"record\"," + + "\"name\":\"SomeGeneric\"," + + "\"namespace\":\"ns\"," + + "\"fields\":[" + + " {\"name\":\"foo\", \"type\":" + fooSchema.toString() + "}" + + "]}"); + @SuppressWarnings("rawtypes") + AvroCoder coder = AvroCoder.of(SomeGeneric.class, schema); + + assertNonDeterministic(coder, + reasonField(SomeGeneric.class, "foo", "erasure")); + } + + private static class SomeGeneric { + @SuppressWarnings("unused") + private T foo; + } + private static class Foo { + @SuppressWarnings("unused") + String id; + } } From 502f99f6dfb233b2681ea6be55b8f63d900c4e67 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 30 Jan 2017 14:20:15 -0800 Subject: [PATCH 10/21] fixups --- .../sdk/util/BigQueryTableRowIterator.java | 1 + .../util/BigQueryTableRowIteratorTest.java | 37 ++++++++++--------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java index 63bd025099..a518032228 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java @@ -445,6 +445,7 @@ private TableReference executeQueryAndWaitForCompletion() destinationTable.setDatasetId(temporaryDatasetId); destinationTable.setTableId(temporaryTableId); queryConfig.setDestinationTable(destinationTable); + queryConfig.setAllowLargeResults(Boolean.TRUE); Job queryJob = executeWithBackOff( client.jobs().insert(projectId, job), diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java index d6ac5b36ba..94d858aebb 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java @@ -258,14 +258,18 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException // Mock job polling. 
JobStatus status = new JobStatus().setState("DONE"); - TableReference tableRef = - new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); - JobConfigurationQuery queryConfig = new JobConfigurationQuery().setDestinationTable(tableRef); + JobConfigurationQuery resultQueryConfig = + new JobConfigurationQuery().setDestinationTable( + new TableReference() + .setProjectId("project") + .setDatasetId("tempdataset") + .setTableId("temptable") + ); Job getJob = new Job() .setJobReference(new JobReference()) .setStatus(status) - .setConfiguration(new JobConfiguration().setQuery(queryConfig)); + .setConfiguration(new JobConfiguration().setQuery(resultQueryConfig)); when(mockJobsGet.execute()).thenReturn(getJob); // Mock table schema fetch. @@ -281,8 +285,9 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException String query = String.format( "SELECT \"Arthur\" as name, 42 as count, \"%s\" as photo", photoBytesEncoded); + JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query); try (BigQueryTableRowIterator iterator = - BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, null)) { + BigQueryTableRowIterator.fromQuery(queryConfig, "project", mockClient)) { iterator.open(); assertTrue(iterator.advance()); TableRow row = iterator.getCurrent(); @@ -317,7 +322,7 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException verify(mockTablesDelete).execute(); // Table data read. verify(mockClient).tabledata(); - verify(mockTabledata).list("project", "dataset", "table"); + verify(mockTabledata).list("project", "tempdataset", "temptable"); verify(mockTabledataList).execute(); } @@ -334,18 +339,16 @@ public void testQueryFailed() throws IOException { when(mockJobsInsert.execute()).thenThrow(exception, exception, exception, exception); String query = "NOT A QUERY"; + JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query); try (BigQueryTableRowIterator iterator = - BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, null)) { - - try { - iterator.open(); - fail(); - } catch (Exception expected) { - // Verify message explains cause and reports the query. - assertThat(expected.getMessage(), containsString("Error")); - assertThat(expected.getMessage(), containsString(query)); - assertThat(expected.getCause().getMessage(), containsString(errorReason)); - } + BigQueryTableRowIterator.fromQuery(queryConfig, "project", mockClient)) { + iterator.open(); + fail(); + } catch (Exception expected) { + // Verify message explains cause and reports the query. + assertThat(expected.getMessage(), containsString("Error")); + assertThat(expected.getMessage(), containsString(query)); + assertThat(expected.getCause().getMessage(), containsString(errorReason)); } // Job inserted to run the query, then polled once. 
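The mocks in this test stand in for the poll-until-DONE loop inside `executeQueryAndWaitForCompletion`. Stripped of the retry machinery, that loop is essentially the following sketch; the real code uses `executeWithBackOff` and `QUERY_COMPLETION_POLL_TIME` rather than a bare one-second sleep, and the names here are assumed:

```java
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobStatus;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

// Poll a started query job until it reaches DONE, then check for errors.
// DONE does not imply success: a non-null errorResult means the job failed.
static void waitForQueryJob(Bigquery client, String projectId, String jobId)
    throws IOException, InterruptedException {
  while (true) {
    Job pollJob = client.jobs().get(projectId, jobId).execute();
    JobStatus status = pollJob.getStatus();
    if ("DONE".equals(status.getState())) {
      ErrorProto error = status.getErrorResult();
      if (error != null) {
        throw new IOException("Executing query failed: " + error.getMessage());
      }
      return;
    }
    TimeUnit.SECONDS.sleep(1); // simplified stand-in for the backoff helper
  }
}
```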
From 2ea5a233e1e895cf4f8babf9924d2b88cffbc5e9 Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Thu, 2 Feb 2017 13:24:23 -0800 Subject: [PATCH 11/21] Fix InProcessPipelineRunner to handle a null subscription --- .../dataflow/sdk/io/PubsubUnboundedSource.java | 11 ++++++----- .../dataflow/sdk/util/PubsubTestClient.java | 2 +- .../sdk/io/PubsubUnboundedSourceTest.java | 17 +++++++++++++++++ 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java index 575fe39771..4da8ad1bad 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSource.java @@ -30,6 +30,7 @@ import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.options.ValueProvider; +import com.google.cloud.dataflow.sdk.options.ValueProvider.StaticValueProvider; import com.google.cloud.dataflow.sdk.transforms.Aggregator; import com.google.cloud.dataflow.sdk.transforms.Combine; import com.google.cloud.dataflow.sdk.transforms.DoFn; @@ -1290,6 +1291,7 @@ public String getIdLabel() { @Override public PCollection apply(PBegin input) { + ValueProvider subscriptionPath = subscription; if (subscription == null) { try { try (PubsubClient pubsubClient = @@ -1299,9 +1301,8 @@ public PCollection apply(PBegin input) { .as(DataflowPipelineOptions.class))) { checkState(project.isAccessible(), "createRandomSubscription must be called at runtime."); checkState(topic.isAccessible(), "createRandomSubscription must be called at runtime."); - SubscriptionPath subscriptionPath = - pubsubClient.createRandomSubscription( - project.get(), topic.get(), DEAULT_ACK_TIMEOUT_SEC); + subscriptionPath = StaticValueProvider.of(pubsubClient.createRandomSubscription( + project.get(), topic.get(), DEAULT_ACK_TIMEOUT_SEC)); LOG.warn("Created subscription {} to topic {}." 
+ " Note this subscription WILL NOT be deleted when the pipeline terminates", subscription, topic); @@ -1314,7 +1315,7 @@ public PCollection apply(PBegin input) { return input.getPipeline().begin() .apply(Read.from(new PubsubSource(this))) .apply(ParDo.named("PubsubUnboundedSource.Stats") - .of(new StatsFn(pubsubFactory, subscription, - timestampLabel, idLabel))); + .of(new StatsFn(pubsubFactory, checkNotNull(subscriptionPath), + timestampLabel, idLabel))); } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PubsubTestClient.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PubsubTestClient.java index c3a5a4e959..2f8a1db18c 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PubsubTestClient.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PubsubTestClient.java @@ -372,7 +372,7 @@ public List listTopics(ProjectPath project) throws IOException { @Override public void createSubscription( TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException { - throw new UnsupportedOperationException(); + return; } @Override diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java index f7e4f863de..dc48bab878 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java @@ -36,7 +36,10 @@ import com.google.cloud.dataflow.sdk.util.CoderUtils; import com.google.cloud.dataflow.sdk.util.PubsubClient; import com.google.cloud.dataflow.sdk.util.PubsubClient.IncomingMessage; +import com.google.cloud.dataflow.sdk.util.PubsubClient.OutgoingMessage; +import com.google.cloud.dataflow.sdk.util.PubsubClient.ProjectPath; import com.google.cloud.dataflow.sdk.util.PubsubClient.SubscriptionPath; +import com.google.cloud.dataflow.sdk.util.PubsubClient.TopicPath; import com.google.cloud.dataflow.sdk.util.PubsubTestClient; import com.google.cloud.dataflow.sdk.util.PubsubTestClient.PubsubTestClientFactory; @@ -60,8 +63,12 @@ */ @RunWith(JUnit4.class) public class PubsubUnboundedSourceTest { + private static final ProjectPath PROJECT = + PubsubClient.projectPathFromId("testProject"); private static final SubscriptionPath SUBSCRIPTION = PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); + private static final TopicPath TOPIC = + PubsubClient.topicPathFromName("testProject", "testTopic"); private static final String DATA = "testData"; private static final long TIMESTAMP = 1234L; private static final long REQ_TIME = 6373L; @@ -320,4 +327,14 @@ public void readManyMessages() throws IOException { assertTrue(dataToMessageNum.isEmpty()); reader.close(); } + + @Test + public void testNullTopic() throws Exception { + factory = PubsubTestClient.createFactoryForPublish( + TOPIC, ImmutableList.of(), ImmutableList.of()); + TestPipeline p = TestPipeline.create(); + p.apply(new PubsubUnboundedSource<>( + clock, factory, StaticValueProvider.of(PROJECT), StaticValueProvider.of(TOPIC), + null, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL)); + } } From 5625ffbf5ab9cfee4debbc38afc6a1fe5c69c892 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 6 Feb 2017 14:48:43 -0800 Subject: [PATCH 12/21] fixups --- .../main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java index 3472a8afed..a6d9871287 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java @@ -1158,6 +1158,7 @@ private void executeQuery( .setAllowLargeResults(true) .setCreateDisposition("CREATE_IF_NEEDED") .setDestinationTable(destinationTable) + .setPriority("BATCH") .setWriteDisposition("WRITE_EMPTY"); jobService.startQueryJob(jobRef, queryConfig); @@ -1172,7 +1173,6 @@ private JobConfigurationQuery createBasicQueryConfig() { // then the similar code in BigQueryTableRowIterator#fromQuery should be updated. return new JobConfigurationQuery() .setFlattenResults(flattenResults) - .setPriority("BATCH") .setQuery(query.get()) .setUseLegacySql(useLegacySql); } From 9c59d78112a51451eb9e6ae64a966b0ee2677fb8 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 6 Feb 2017 14:56:50 -0800 Subject: [PATCH 13/21] fixups --- .../java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java | 2 +- .../google/cloud/dataflow/sdk/util/BigQueryServices.java | 2 +- .../cloud/dataflow/sdk/util/BigQueryServicesImpl.java | 8 +++----- .../cloud/dataflow/sdk/util/BigQueryTableRowIterator.java | 4 ++-- .../com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java | 2 +- 5 files changed, 8 insertions(+), 10 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java index a6d9871287..f844f49aed 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java @@ -1075,7 +1075,7 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { public BoundedReader createReader(PipelineOptions options) throws IOException { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); return new BigQueryReader(this, bqServices.getReaderFromQuery( - bqOptions, createBasicQueryConfig(), executingProject.get())); + bqOptions, executingProject.get(), createBasicQueryConfig())); } @Override diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java index df247629f5..43232f699a 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java @@ -58,7 +58,7 @@ public interface BigQueryServices extends Serializable { * Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables. */ BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, JobConfigurationQuery queryConfig, String projectId); + BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig); /** * An interface for the Cloud BigQuery load service. 
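The API-visible effect of this hunk is only the parameter order. Continuing the sketch from PATCH 08 (with `bqOptions` and `queryConfig` as before), a call site now reads:

```java
// After PATCH 13, projectId precedes the JobConfigurationQuery.
BigQueryServices.BigQueryJsonReader reader =
    bqServices.getReaderFromQuery(bqOptions, "my-project", queryConfig);
```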
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java index af2ed9e73b..84e718addd 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java @@ -80,8 +80,8 @@ public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableRef @Override public BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, JobConfigurationQuery queryConfig, String projectId) { - return BigQueryJsonReaderImpl.fromQuery(bqOptions, queryConfig, projectId); + BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) { + return BigQueryJsonReaderImpl.fromQuery(bqOptions, projectId, queryConfig); } @VisibleForTesting @@ -516,9 +516,7 @@ private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) { } private static BigQueryJsonReader fromQuery( - BigQueryOptions bqOptions, - JobConfigurationQuery queryConfig, - String projectId) { + BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) { return new BigQueryJsonReaderImpl( BigQueryTableRowIterator.fromQuery( queryConfig, projectId, Transport.newBigQueryClient(bqOptions).build())); diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java index a518032228..5ab5c897ed 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java @@ -408,8 +408,8 @@ private void deleteDataset(String datasetId) throws IOException, InterruptedExce */ private TableReference executeQueryAndWaitForCompletion() throws IOException, InterruptedException { - checkState(projectId != null, "Cannot dryRun a query in unknown (null) project"); - checkState(queryConfig != null, "Cannot dryRun a null query"); + checkState(projectId != null, "Unable to execute a query without a configured project id"); + checkState(queryConfig != null, "Unable to execute a query without a configured query"); // Dry run query to get source table location Job dryRunJob = new Job() .setConfiguration(new JobConfiguration() diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java index 61356e18d0..7f9d2e95ce 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java @@ -183,7 +183,7 @@ public BigQueryJsonReader getReaderFromTable( @Override public BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, JobConfigurationQuery queryConfig, String projectId) { + BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) { return new FakeBigQueryReader(jsonTableRowReturns); } From 20862aa7b9a690e8025ac5c1a6756eaffb05794c Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Fri, 10 Feb 2017 17:09:34 -0800 Subject: [PATCH 14/21] Fixups --- .../google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java index dc48bab878..06979315d8 
100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java @@ -329,7 +329,7 @@ public void readManyMessages() throws IOException { } @Test - public void testNullTopic() throws Exception { + public void testNullSubscription() throws Exception { factory = PubsubTestClient.createFactoryForPublish( TOPIC, ImmutableList.of(), ImmutableList.of()); TestPipeline p = TestPipeline.create(); From d7a70fef5f68074fd175dabce9870d55369a6dc0 Mon Sep 17 00:00:00 2001 From: Sam McVeety Date: Tue, 14 Feb 2017 16:57:33 -0800 Subject: [PATCH 15/21] Fixups --- .../dataflow/sdk/util/PubsubTestClient.java | 39 +++++++++++++++++++ .../sdk/io/PubsubUnboundedSourceTest.java | 2 +- 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PubsubTestClient.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PubsubTestClient.java index 2f8a1db18c..01831218d4 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PubsubTestClient.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PubsubTestClient.java @@ -107,6 +107,11 @@ private static class State { */ @Nullable Map ackDeadline; + + /** + * Whether a subscription has been created. + */ + boolean createdSubscription; } private static final State STATE = new State(); @@ -124,12 +129,40 @@ public static PubsubTestClientFactory createFactoryForPublish( final TopicPath expectedTopic, final Iterable expectedOutgoingMessages, final Iterable failingOutgoingMessages) { + return createFactoryForPublishInternal( + expectedTopic, expectedOutgoingMessages, failingOutgoingMessages, false); + } + + /** + * Return a factory for testing publishers. Only one factory may be in-flight at a time. + * The factory must be closed when the test is complete, at which point final validation will + * occur. Additionally, verify that createSubscription was called. + */ + public static PubsubTestClientFactory createFactoryForPublishVerifySubscription( + final TopicPath expectedTopic, + final Iterable expectedOutgoingMessages, + final Iterable failingOutgoingMessages) { + return createFactoryForPublishInternal( + expectedTopic, expectedOutgoingMessages, failingOutgoingMessages, true); + } + + /** + * Return a factory for testing publishers. Only one factory may be in-flight at a time. + * The factory must be closed when the test is complete, at which point final validation will + * occur. 
+ */ + public static PubsubTestClientFactory createFactoryForPublishInternal( + final TopicPath expectedTopic, + final Iterable expectedOutgoingMessages, + final Iterable failingOutgoingMessages, + final boolean verifySubscriptionCreated) { synchronized (STATE) { checkState(!STATE.isActive, "Test still in flight"); STATE.expectedTopic = expectedTopic; STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages); STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages); STATE.isActive = true; + STATE.createdSubscription = false; } return new PubsubTestClientFactory() { @Override @@ -148,6 +181,9 @@ public String getKind() { @Override public void close() { synchronized (STATE) { + if (verifySubscriptionCreated) { + checkState(STATE.createdSubscription, "Did not call create subscription"); + } checkState(STATE.isActive, "No test still in flight"); checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(), "Still waiting for %s messages to be published", @@ -372,6 +408,9 @@ public List listTopics(ProjectPath project) throws IOException { @Override public void createSubscription( TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException { + synchronized (STATE) { + STATE.createdSubscription = true; + } return; } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java index 06979315d8..65fdf737af 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubUnboundedSourceTest.java @@ -330,7 +330,7 @@ public void readManyMessages() throws IOException { @Test public void testNullSubscription() throws Exception { - factory = PubsubTestClient.createFactoryForPublish( + factory = PubsubTestClient.createFactoryForPublishVerifySubscription( TOPIC, ImmutableList.of(), ImmutableList.of()); TestPipeline p = TestPipeline.create(); p.apply(new PubsubUnboundedSource<>( From 4a9f16469fec467cf54efaa98a658413da231a00 Mon Sep 17 00:00:00 2001 From: gsgalloway Date: Wed, 1 Mar 2017 15:22:22 -0800 Subject: [PATCH 16/21] Small fix for BigtableIO.WriteOperation.finalize --- .../com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java index 3751d160f5..62f3b25d57 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java @@ -976,9 +976,9 @@ public void initialize(PipelineOptions options) {} public void finalize(Iterable writerResults, PipelineOptions options) { long count = 0; for (Long value : writerResults) { - value += count; + count += value; } - logger.debug("Wrote {} elements to BigtableIO.Sink {}", sink); + logger.debug("Wrote {} elements to BigtableIO.Sink {}", count, sink); } @Override From 4ede2806b898157e381995242384e3c515e8795f Mon Sep 17 00:00:00 2001 From: Anil Muppalla Date: Mon, 3 Apr 2017 22:05:16 -0400 Subject: [PATCH 17/21] edited doc --- .../cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java index 5fd8554c90..4b4184063d 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java @@ -58,7 +58,7 @@ * *
<p>
Additionaly firings before or after the watermark can be requested by calling * {@code AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)} or - * {@code AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)}. + * {@code AfterWatermark.pastEndOfWindow.withLateFirings(OnceTrigger)}. * * @param {@link BoundedWindow} subclass used to represent the windows used. */ From 1cb04a638781943af15975590c4cabb6995fafd7 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Fri, 7 Apr 2017 15:35:25 -0700 Subject: [PATCH 18/21] DataflowPipelineJob: gracefully handle cancellatoin concurrent with termination This is a backport of BEAM-1880 https://github.com/apache/beam/pull/2428 --- .../sdk/runners/DataflowPipelineJob.java | 32 ++++++++++++++++-- .../sdk/runners/DataflowPipelineJobTest.java | 33 +++++++++++++++++++ 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java index 4a68755565..ced2759561 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java @@ -292,9 +292,35 @@ public void cancel() throws IOException { content.setProjectId(projectId); content.setId(jobId); content.setRequestedState("JOB_STATE_CANCELLED"); - dataflowClient.projects().jobs() - .update(projectId, jobId, content) - .execute(); + try { + dataflowClient.projects().jobs() + .update(projectId, jobId, content) + .execute(); + } catch (IOException e) { + State state = getState(); + if (state.isTerminal()) { + LOG.warn("Cancel failed because job {} is already terminated in state {}.", jobId, state); + } else if (e.getMessage().contains("has terminated")) { + // This handles the case where the getState() call above returns RUNNING but the cancel + // was rejected because the job is in fact done. Hopefully, someday we can delete this + // code if there is better consistency between the State and whether Cancel succeeds. + // + // Example message: + // Workflow modification failed. Causes: (7603adc9e9bff51e): Cannot perform + // operation 'cancel' on Job: 2017-04-01_22_50_59-9269855660514862348. Job has + // terminated in state SUCCESS: Workflow job: 2017-04-01_22_50_59-9269855660514862348 + // succeeded. 
+ LOG.warn("Cancel failed because job {} is already terminated.", jobId, e); + } else { + String errorMsg = String.format( + "Failed to cancel job in state %s, " + + "please go to the Developers Console to cancel it manually: %s", + state, + MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId())); + LOG.warn(errorMsg); + throw new IOException(errorMsg, e); + } + } } @Override diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJobTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJobTest.java index 1d6ccc66ab..9d1172bb7b 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJobTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJobTest.java @@ -26,6 +26,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -42,6 +43,7 @@ import com.google.api.services.dataflow.model.MetricUpdate; import com.google.cloud.dataflow.sdk.PipelineResult.State; import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms; +import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; import com.google.cloud.dataflow.sdk.testing.FastNanoClockAndSleeper; import com.google.cloud.dataflow.sdk.transforms.Aggregator; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; @@ -91,6 +93,9 @@ public class DataflowPipelineJobTest { @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule + public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowPipelineJob.class); + @Before public void setup() { MockitoAnnotations.initMocks(this); @@ -193,6 +198,34 @@ public void testWaitToFinishCancelled() throws Exception { assertEquals(State.CANCELLED, mockWaitToFinishInState(State.CANCELLED)); } + /** + * Test that {@link DataflowPipelineJob#cancel} doesn't throw if the Dataflow service returns + * non-terminal state even though the cancel API call failed, which can happen in practice. + * + *

TODO: delete this code if the API calls become consistent. + */ + @Test + public void testCancelTerminatedJobWithStaleState() throws IOException { + Dataflow.Projects.Jobs.Get statusRequest = + mock(Dataflow.Projects.Jobs.Get.class); + + Job statusResponse = new Job(); + statusResponse.setCurrentState("JOB_STATE_RUNNING"); + when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(statusRequest); + when(statusRequest.execute()).thenReturn(statusResponse); + + Dataflow.Projects.Jobs.Update update = mock( + Dataflow.Projects.Jobs.Update.class); + when(mockJobs.update(eq(PROJECT_ID), eq(JOB_ID), any(Job.class))) + .thenReturn(update); + when(update.execute()).thenThrow(new IOException("Job has terminated in state SUCCESS")); + + DataflowPipelineJob job = new DataflowPipelineJob( + PROJECT_ID, JOB_ID, mockWorkflowClient, null); + job.cancel(); + expectedLogs.verifyWarn("Cancel failed because job " + JOB_ID + " is already terminated."); + } + /** * Tests that the {@link DataflowPipelineJob} understands that the {@link State#FAILED FAILED} * state is terminal. From 7cecf6e249015dac02a1a0e66b9225b03077611e Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Wed, 19 Apr 2017 10:27:12 -0700 Subject: [PATCH 19/21] Cache result of BigQuerySourceBase.split --- .../cloud/dataflow/sdk/io/BigQueryIO.java | 39 ++++++++++++------- .../cloud/dataflow/sdk/io/BigQueryIOTest.java | 7 +++- 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java index f844f49aed..ba5fc3bb4a 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java @@ -1211,6 +1211,8 @@ private abstract static class BigQuerySourceBase extends BoundedSource protected final BigQueryServices bqServices; protected final ValueProvider executingProject; + private List> cachedSplitResult; + private BigQuerySourceBase( String jobIdToken, String extractDestinationDir, @@ -1225,19 +1227,30 @@ private BigQuerySourceBase( @Override public List> splitIntoBundles( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { - BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - TableReference tableToExtract = getTableToExtract(bqOptions); - JobService jobService = bqServices.getJobService(bqOptions); - String extractJobId = getExtractJobId(jobIdToken); - List tempFiles = executeExtract(extractJobId, tableToExtract, jobService); - - TableSchema tableSchema = bqServices.getDatasetService(bqOptions).getTable( - tableToExtract.getProjectId(), - tableToExtract.getDatasetId(), - tableToExtract.getTableId()).getSchema(); - - cleanupTempResource(bqOptions); - return createSources(tempFiles, tableSchema); + // splitIntoBundles() can be called multiple times, e.g. Dataflow runner may call it multiple + // times with different desiredBundleSizeBytes in case the splitIntoBundles() call produces + // too many sources. We ignore desiredBundleSizeBytes anyway, however in any case, we should + // not initiate another BigQuery extract job for the repeated splitIntoBundles() calls. 
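The shape of the fix in PATCH 18 (issue the cancel RPC and, on failure, re-check the job's actual state so a benign cancel/termination race is not reported as an error) is easier to see outside the diff. Below is a minimal standalone sketch of that pattern; `JobClient`, `requestCancel`, and `getState` are hypothetical names for illustration, not the real Dataflow client API.

```java
import java.io.IOException;

/** Hypothetical job-service client, for illustration only. */
interface JobClient {
  enum State {
    RUNNING, SUCCESS, FAILED, CANCELLED;

    boolean isTerminal() {
      return this != RUNNING;
    }
  }

  State getState(String jobId) throws IOException;

  void requestCancel(String jobId) throws IOException;
}

/** Attempts a cancel, reconciling an apparent RPC failure against the job's real state. */
class GracefulCanceller {
  private final JobClient client;

  GracefulCanceller(JobClient client) {
    this.client = client;
  }

  void cancel(String jobId) throws IOException {
    try {
      client.requestCancel(jobId);
    } catch (IOException e) {
      // The cancel RPC failed; the job may simply have terminated first.
      JobClient.State state = client.getState(jobId);
      if (state.isTerminal()) {
        // Benign race: there is nothing left to cancel, so log and swallow the error.
        System.err.printf("Cancel unnecessary: job %s already terminal in state %s%n", jobId, state);
      } else {
        // Genuine failure: rethrow, keeping the original exception as the cause.
        throw new IOException("Failed to cancel job " + jobId + " in state " + state, e);
      }
    }
  }
}
```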
From 7cecf6e249015dac02a1a0e66b9225b03077611e Mon Sep 17 00:00:00 2001
From: Eugene Kirpichov
Date: Wed, 19 Apr 2017 10:27:12 -0700
Subject: [PATCH 19/21] Cache result of BigQuerySourceBase.split

---
 .../cloud/dataflow/sdk/io/BigQueryIO.java     | 39 ++++++++++++-------
 .../cloud/dataflow/sdk/io/BigQueryIOTest.java |  7 +++-
 2 files changed, 32 insertions(+), 14 deletions(-)

diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
index f844f49aed..ba5fc3bb4a 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
@@ -1211,6 +1211,8 @@ private abstract static class BigQuerySourceBase extends BoundedSource<TableRow>
     protected final BigQueryServices bqServices;
     protected final ValueProvider<String> executingProject;
 
+    private List<BoundedSource<TableRow>> cachedSplitResult;
+
     private BigQuerySourceBase(
         String jobIdToken,
         String extractDestinationDir,
@@ -1225,19 +1227,30 @@ private BigQuerySourceBase(
     @Override
     public List<BoundedSource<TableRow>> splitIntoBundles(
         long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-      TableReference tableToExtract = getTableToExtract(bqOptions);
-      JobService jobService = bqServices.getJobService(bqOptions);
-      String extractJobId = getExtractJobId(jobIdToken);
-      List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
-
-      TableSchema tableSchema = bqServices.getDatasetService(bqOptions).getTable(
-          tableToExtract.getProjectId(),
-          tableToExtract.getDatasetId(),
-          tableToExtract.getTableId()).getSchema();
-
-      cleanupTempResource(bqOptions);
-      return createSources(tempFiles, tableSchema);
+      // splitIntoBundles() can be called multiple times, e.g. Dataflow runner may call it multiple
+      // times with different desiredBundleSizeBytes in case the splitIntoBundles() call produces
+      // too many sources. We ignore desiredBundleSizeBytes anyway, however in any case, we should
+      // not initiate another BigQuery extract job for the repeated splitIntoBundles() calls.
+      if (cachedSplitResult == null) {
+        BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+        TableReference tableToExtract = getTableToExtract(bqOptions);
+        JobService jobService = bqServices.getJobService(bqOptions);
+        String extractJobId = getExtractJobId(jobIdToken);
+        List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
+
+        TableSchema tableSchema =
+            bqServices
+                .getDatasetService(bqOptions)
+                .getTable(
+                    tableToExtract.getProjectId(),
+                    tableToExtract.getDatasetId(),
+                    tableToExtract.getTableId())
+                .getSchema();
+
+        cleanupTempResource(bqOptions);
+        cachedSplitResult = createSources(tempFiles, tableSchema);
+      }
+      return cachedSplitResult;
     }
 
     protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception;
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java
index 7f9d2e95ce..1e4f733096 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java
@@ -30,6 +30,7 @@
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 
 import com.google.api.client.util.Data;
@@ -1130,10 +1131,14 @@ public void testBigQueryTableSourceInitSplit() throws Exception {
 
     List<BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options);
     assertEquals(1, sources.size());
+    // Simulate a repeated call to splitIntoBundles(), like a Dataflow worker will sometimes do.
+    sources = bqSource.splitIntoBundles(200, options);
+    assertEquals(1, sources.size());
     BoundedSource<TableRow> actual = sources.get(0);
     assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
 
-    Mockito.verify(mockJobService)
+    // A repeated call to splitIntoBundles() should not have caused a duplicate extract job.
+    Mockito.verify(mockJobService, times(1))
         .startExtractJob(Mockito.any(), Mockito.any());
   }
 
From 52e593a18b92ead25341244922b5334307daa70f Mon Sep 17 00:00:00 2001
From: Eugene Kirpichov
Date: Fri, 21 Apr 2017 13:25:58 -0700
Subject: [PATCH 20/21] Makes cachedSplitResult transient in BigQuerySourceBase

---
 .../main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
index ba5fc3bb4a..d55dfd5910 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
@@ -1211,7 +1211,7 @@ private abstract static class BigQuerySourceBase extends BoundedSource<TableRow>
     protected final BigQueryServices bqServices;
     protected final ValueProvider<String> executingProject;
 
-    private List<BoundedSource<TableRow>> cachedSplitResult;
+    private transient List<BoundedSource<TableRow>> cachedSplitResult;
 
     private BigQuerySourceBase(
         String jobIdToken,
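Patches 19 and 20 together form a common idiom for serializable sources: memoize an expensive computation in a `transient` field so that repeated calls within one JVM reuse the result, while serializing the source to workers deliberately drops the cache. A generic sketch of that idiom follows; `CachedSplitSource`, `computeSplits()`, and the `List<String>` payload are illustrative stand-ins, not real SDK names.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

/** Illustrative stand-in for a splittable source; not the real SDK API. */
abstract class CachedSplitSource implements Serializable {
  // transient: the cache is a per-JVM optimization and is deliberately
  // dropped when this object is serialized and shipped to a worker.
  private transient List<String> cachedSplits;

  /** The expensive step (e.g. launching an export job); runs at most once per instance. */
  protected abstract List<String> computeSplits() throws Exception;

  /**
   * May be called repeatedly, possibly with different size hints (which this
   * sketch, like the patch above, ignores); only the first call on a given
   * (deserialized) instance pays the cost.
   */
  public List<String> split(long desiredBundleSizeBytes) throws Exception {
    if (cachedSplits == null) {
      // After deserialization the transient field is null again, so each
      // worker computes the result at most once.
      cachedSplits = computeSplits();
    }
    return Collections.unmodifiableList(cachedSplits);
  }
}
```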
From f89d619f68c94ac54971265b7ecf0b7778f072fe Mon Sep 17 00:00:00 2001
From: Daniel Halperin
Date: Fri, 28 Apr 2017 16:10:25 -0700
Subject: [PATCH 21/21] Fix a typo in Count.java

---
 .../java/com/google/cloud/dataflow/sdk/transforms/Count.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java
index ffa11d13a3..99d5fb8c86 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Count.java
@@ -21,7 +21,7 @@
 import com.google.cloud.dataflow.sdk.values.PCollection;
 
 /**
- * {@code PTransorm}s to count the elements in a {@link PCollection}.
+ * {@code PTransform}s to count the elements in a {@link PCollection}.
  *
  * <p>{@link Count#perElement()} can be used to count the number of occurrences of each
 * distinct element in the PCollection, {@link Count#perKey()} can be used to count the
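For completeness, a minimal usage sketch of the `Count` transform whose javadoc the final patch touches, assuming the 1.x SDK packages seen throughout these patches:

```java
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class CountExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> words = p.apply(Create.of("a", "b", "a", "c", "a"));

    // One KV per distinct element, valued with its number of occurrences:
    // ("a", 3), ("b", 1), ("c", 1).
    PCollection<KV<String, Long>> occurrences = words.apply(Count.<String>perElement());

    p.run();
  }
}
```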