From 436d5fe0c0cdb87d61fd34aab0ef2a26bd2f1dde Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Tue, 21 Feb 2017 10:57:04 -0800 Subject: [PATCH 1/4] Adds a few roverloads to ValueProvider, and a ValueProviders class to accompany the interface. --- .../beam/sdk/options/ValueProviders.java | 20 +++++++++ .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 43 ++++++++++--------- 2 files changed, 43 insertions(+), 20 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java new file mode 100644 index 000000000000..7b37a11ffcea --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java @@ -0,0 +1,20 @@ +package org.apache.beam.sdk.options; + +import javax.annotation.Nullable; + +/** + * Created by rfernand on 2/21/17. + */ +public final class ValueProviders { + + // Prevent instantiation. + private ValueProviders() {} + + /** + * Null-safe version of {@link ValueProvider#get()}. + */ + @Nullable + public static T getValueOrNull(@Nullable ValueProvider provider) { + return provider == null ? null : provider.get(); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 2d6cbba41b41..4cc8c4bd1e73 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -54,6 +54,9 @@ import org.apache.beam.sdk.io.range.ByteKeyRangeTracker; import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.options.ValueProviders; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -166,7 +169,7 @@ public class BigtableIO { */ @Experimental public static Read read() { - return new Read(null, "", ByteKeyRange.ALL_KEYS, null, null); + return new Read(null, null, StaticValueProvider.of(ByteKeyRange.ALL_KEYS), null, null); } /** @@ -240,7 +243,7 @@ public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) { */ public Read withRowFilter(RowFilter filter) { checkNotNull(filter, "filter"); - return new Read(options, tableId, keyRange, filter, bigtableService); + return new Read(options, tableId, keyRange, StaticValueProvider.of(filter), bigtableService); } /** @@ -250,7 +253,7 @@ public Read withRowFilter(RowFilter filter) { */ public Read withKeyRange(ByteKeyRange keyRange) { checkNotNull(keyRange, "keyRange"); - return new Read(options, tableId, keyRange, filter, bigtableService); + return new Read(options, tableId, StaticValueProvider.of(keyRange), filter, bigtableService); } /** @@ -260,7 +263,7 @@ public Read withKeyRange(ByteKeyRange keyRange) { */ public Read withTableId(String tableId) { checkNotNull(tableId, "tableId"); - return new Read(options, tableId, keyRange, filter, bigtableService); + return new Read(options, StaticValueProvider.of(tableId), keyRange, filter, bigtableService); } /** @@ -275,14 +278,14 @@ public BigtableOptions getBigtableOptions() { * {@link ByteKeyRange#ALL_KEYS} to scan the entire table. */ public ByteKeyRange getKeyRange() { - return keyRange; + return ValueProviders.getValueOrNull(keyRange); } /** * Returns the table being read from. */ public String getTableId() { - return tableId; + return tableId.get(); } @Override @@ -349,20 +352,20 @@ public String toString() { * source is being built. */ @Nullable private final BigtableOptions options; - private final String tableId; - private final ByteKeyRange keyRange; - @Nullable private final RowFilter filter; + @Nullable private final ValueProvider tableId; + private final ValueProvider keyRange; + @Nullable private final ValueProvider filter; @Nullable private final BigtableService bigtableService; private Read( @Nullable BigtableOptions options, - String tableId, - ByteKeyRange keyRange, - @Nullable RowFilter filter, + @Nullable ValueProvider tableId, + ValueProvider keyRange, + @Nullable ValueProvider filter, @Nullable BigtableService bigtableService) { this.options = options; - this.tableId = checkNotNull(tableId, "tableId"); - this.keyRange = checkNotNull(keyRange, "keyRange"); + this.tableId = tableId; + this.keyRange = keyRange; this.filter = filter; this.bigtableService = bigtableService; } @@ -687,9 +690,9 @@ private BigtableIO() {} static class BigtableSource extends BoundedSource { public BigtableSource( SerializableFunction serviceFactory, - String tableId, - @Nullable RowFilter filter, - ByteKeyRange range, + ValueProvider tableId, + @Nullable ValueProvider filter, + ValueProvider range, @Nullable Long estimatedSizeBytes) { this.serviceFactory = serviceFactory; this.tableId = tableId; @@ -710,9 +713,9 @@ public String toString() { ////// Private state and internal implementation details ////// private final SerializableFunction serviceFactory; - private final String tableId; - @Nullable private final RowFilter filter; - private final ByteKeyRange range; + private final ValueProvider tableId; + @Nullable private final ValueProvider filter; + private final ValueProvider range; @Nullable private Long estimatedSizeBytes; @Nullable private transient List sampleRowKeys; From f34e05f6ca0f00412dfee8e6d1fac9375711bf5b Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Tue, 21 Feb 2017 11:30:41 -0800 Subject: [PATCH 2/4] Finishes adding ValueProvider for key range, filter, and tableId. --- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 77 +++++++++++++------ 1 file changed, 53 insertions(+), 24 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 4cc8c4bd1e73..386d5ca89252 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -303,14 +303,22 @@ public BigtableService apply(PipelineOptions options) { @Override public void validate(PBegin input) { checkArgument(options != null, "BigtableOptions not specified"); - checkArgument(!tableId.isEmpty(), "Table ID not specified"); - try { - checkArgument( - getBigtableService(input.getPipeline().getOptions()).tableExists(tableId), - "Table %s does not exist", - tableId); - } catch (IOException e) { - LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); + + if (tableId == null) { + throw new IllegalArgumentException("tableId not specified"); + } else if (!tableId.isAccessible()) { + LOG.warn("Skipping validation for tableId"); + } else if (tableId.get().isEmpty()) { + throw new IllegalArgumentException("tableId not specified"); + } else { + try { + checkArgument( + getBigtableService(input.getPipeline().getOptions()).tableExists(tableId.get()), + "Table %s does not exist", + tableId); + } catch (IOException e) { + LOG.warn("Error checking whether table {} exists; proceeding.", tableId.get(), e); + } } } @@ -719,16 +727,28 @@ public String toString() { @Nullable private Long estimatedSizeBytes; @Nullable private transient List sampleRowKeys; - protected BigtableSource withStartKey(ByteKey startKey) { + protected BigtableSource withStartKey(final ByteKey startKey) { checkNotNull(startKey, "startKey"); return new BigtableSource( - serviceFactory, tableId, filter, range.withStartKey(startKey), estimatedSizeBytes); + serviceFactory, tableId, filter, ValueProvider.NestedValueProvider.of(range, + new SerializableFunction() { + @Override + public ByteKeyRange apply(ByteKeyRange input) { + return input.withStartKey(startKey); + } + }), estimatedSizeBytes); } - protected BigtableSource withEndKey(ByteKey endKey) { + protected BigtableSource withEndKey(final ByteKey endKey) { checkNotNull(endKey, "endKey"); return new BigtableSource( - serviceFactory, tableId, filter, range.withEndKey(endKey), estimatedSizeBytes); + serviceFactory, tableId, filter, ValueProvider.NestedValueProvider.of(range, + new SerializableFunction() { + @Override + public ByteKeyRange apply(ByteKeyRange input) { + return input.withEndKey(endKey); + } + }), estimatedSizeBytes); } protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) { @@ -790,7 +810,7 @@ private List splitIntoBundlesBasedOnSamples( responseOffset, lastOffset); - if (!range.overlaps(ByteKeyRange.of(lastEndKey, responseEndKey))) { + if (!range.get().overlaps(ByteKeyRange.of(lastEndKey, responseEndKey))) { // This region does not overlap the scan, so skip it. lastOffset = responseOffset; lastEndKey = responseEndKey; @@ -800,15 +820,15 @@ private List splitIntoBundlesBasedOnSamples( // Calculate the beginning of the split as the larger of startKey and the end of the last // split. Unspecified start is smallest key so is correctly treated as earliest key. ByteKey splitStartKey = lastEndKey; - if (splitStartKey.compareTo(range.getStartKey()) < 0) { - splitStartKey = range.getStartKey(); + if (splitStartKey.compareTo(range.get().getStartKey()) < 0) { + splitStartKey = range.get().getStartKey(); } // Calculate the end of the split as the smaller of endKey and the end of this sample. Note // that range.containsKey handles the case when range.getEndKey() is empty. ByteKey splitEndKey = responseEndKey; - if (!range.containsKey(splitEndKey)) { - splitEndKey = range.getEndKey(); + if (!range.get().containsKey(splitEndKey)) { + splitEndKey = range.get().getEndKey(); } // We know this region overlaps the desired key range, and we know a rough estimate of its @@ -830,8 +850,8 @@ private List splitIntoBundlesBasedOnSamples( // 1. we did not scan to the end yet (lastEndKey is concrete, not 0-length). // 2. we want to scan to the end (endKey is empty) or farther (lastEndKey < endKey). if (!lastEndKey.isEmpty() - && (range.getEndKey().isEmpty() || lastEndKey.compareTo(range.getEndKey()) < 0)) { - splits.add(this.withStartKey(lastEndKey).withEndKey(range.getEndKey())); + && (range.get().getEndKey().isEmpty() || lastEndKey.compareTo(range.get().getEndKey()) < 0)) { + splits.add(this.withStartKey(lastEndKey).withEndKey(range.get().getEndKey())); } List ret = splits.build(); @@ -866,7 +886,7 @@ private long getEstimatedSizeBytesBasedOnSamples(List sam // Skip an empty region. lastOffset = currentOffset; continue; - } else if (range.overlaps(ByteKeyRange.of(currentStartKey, currentEndKey))) { + } else if (range.get().overlaps(ByteKeyRange.of(currentStartKey, currentEndKey))) { estimatedSizeBytes += currentOffset - lastOffset; } currentStartKey = currentEndKey; @@ -882,7 +902,13 @@ public BoundedReader createReader(PipelineOptions options) throws IOExcepti @Override public void validate() { - checkArgument(!tableId.isEmpty(), "tableId cannot be empty"); + if (tableId == null) { + throw new IllegalArgumentException("tableId not specified"); + } else if (!tableId.isAccessible()) { + LOG.warn("Skipping validation for tableId"); + } else if (tableId.get().isEmpty()) { + throw new IllegalArgumentException("tableId not specified"); + } } @Override @@ -940,16 +966,19 @@ private List splitKeyRangeIntoBundleSizedSubranges( return splits.build(); } + @Nullable public ByteKeyRange getRange() { - return range; + return ValueProviders.getValueOrNull(range); } + @Nullable public RowFilter getRowFilter() { - return filter; + return ValueProviders.getValueOrNull(filter); } + @Nullable public String getTableId() { - return tableId; + return ValueProviders.getValueOrNull(tableId); } } From 10c5371197c0b13d47b4170f228797a5661b2270 Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Wed, 12 Apr 2017 10:47:28 -0700 Subject: [PATCH 3/4] Adds support for runtime projectId and InstanceID ValueProviders for each are now in Read and Write. When the service is being retrieved at runtime, we will override those two values (if provided) in the BigtableOptions. --- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 48 +++++++++++++++---- 1 file changed, 38 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 316ac0336557..ee232dab4b07 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -167,7 +167,7 @@ public class BigtableIO { */ @Experimental public static Read read() { - return new Read(null, null, StaticValueProvider.of(ByteKeyRange.ALL_KEYS), null, null); + return new Read(null, null, StaticValueProvider.of(ByteKeyRange.ALL_KEYS), null, null, null, null); } /** @@ -179,7 +179,7 @@ public static Read read() { */ @Experimental public static Write write() { - return new Write(null, "", null); + return new Write(null, "", null, null, null); } /** @@ -220,7 +220,7 @@ public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) { BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getBeamSdkPartOfUserAgent()).build(); - return new Read(optionsWithAgent, tableId, keyRange, filter, bigtableService); + return new Read(optionsWithAgent, tableId, keyRange, filter, null, null, bigtableService); } /** @@ -231,7 +231,7 @@ public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) { */ public Read withRowFilter(RowFilter filter) { checkNotNull(filter, "filter"); - return new Read(options, tableId, keyRange, StaticValueProvider.of(filter), bigtableService); + return new Read(options, tableId, keyRange, StaticValueProvider.of(filter), null, null, bigtableService); } /** @@ -241,7 +241,7 @@ public Read withRowFilter(RowFilter filter) { */ public Read withKeyRange(ByteKeyRange keyRange) { checkNotNull(keyRange, "keyRange"); - return new Read(options, tableId, StaticValueProvider.of(keyRange), filter, bigtableService); + return new Read(options, tableId, StaticValueProvider.of(keyRange), filter, null, null, bigtableService); } /** @@ -251,7 +251,7 @@ public Read withKeyRange(ByteKeyRange keyRange) { */ public Read withTableId(String tableId) { checkNotNull(tableId, "tableId"); - return new Read(options, StaticValueProvider.of(tableId), keyRange, filter, bigtableService); + return new Read(options, StaticValueProvider.of(tableId), keyRange, filter,null, null, bigtableService); } /** @@ -351,6 +351,8 @@ public String toString() { @Nullable private final ValueProvider tableId; private final ValueProvider keyRange; @Nullable private final ValueProvider filter; + @Nullable private final ValueProvider projectId; + @Nullable private final ValueProvider instanceId; @Nullable private final BigtableService bigtableService; private Read( @@ -358,11 +360,15 @@ private Read( @Nullable ValueProvider tableId, ValueProvider keyRange, @Nullable ValueProvider filter, + @Nullable ValueProvider projectId, + @Nullable ValueProvider instanceId, @Nullable BigtableService bigtableService) { this.options = options; this.tableId = tableId; this.keyRange = keyRange; this.filter = filter; + this.projectId = projectId; + this.instanceId = instanceId; this.bigtableService = bigtableService; } @@ -376,7 +382,7 @@ private Read( */ Read withBigtableService(BigtableService bigtableService) { checkNotNull(bigtableService, "bigtableService"); - return new Read(options, tableId, keyRange, filter, bigtableService); + return new Read(options, tableId, keyRange, filter, null, null, bigtableService); } /** @@ -393,6 +399,14 @@ BigtableService getBigtableService(PipelineOptions pipelineOptions) { return bigtableService; } BigtableOptions.Builder clonedOptions = options.toBuilder(); + // Override the projectId in the options with the runtime-provided projectId. + if(projectId != null) { + clonedOptions.setProjectId(projectId.get()); + } + // Override the instanceId in the options with the runtime-provided projectId. + if(instanceId != null) { + clonedOptions.setInstanceId(instanceId.get()); + } if (options.getCredentialOptions().getCredentialType() == CredentialType.DefaultCredentials) { clonedOptions.setCredentialOptions( CredentialOptions.credential( @@ -418,14 +432,20 @@ public static class Write */ @Nullable private final BigtableOptions options; private final String tableId; + @Nullable ValueProvider projectId; + @Nullable ValueProvider instanceId; @Nullable private final BigtableService bigtableService; private Write( @Nullable BigtableOptions options, String tableId, + @Nullable ValueProvider projectId, + @Nullable ValueProvider instanceId, @Nullable BigtableService bigtableService) { this.options = options; this.tableId = checkNotNull(tableId, "tableId"); + this.projectId = projectId; + this.instanceId = instanceId; this.bigtableService = bigtableService; } @@ -463,7 +483,7 @@ public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) { .setUseCachedDataPool(true); BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getBeamSdkPartOfUserAgent()).build(); - return new Write(optionsWithAgent, tableId, bigtableService); + return new Write(optionsWithAgent, tableId, null, null, bigtableService); } /** @@ -473,7 +493,7 @@ public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) { */ public Write withTableId(String tableId) { checkNotNull(tableId, "tableId"); - return new Write(options, tableId, bigtableService); + return new Write(options, tableId, null, null, bigtableService); } /** @@ -526,7 +546,7 @@ public void validate(PCollection>> input) { */ Write withBigtableService(BigtableService bigtableService) { checkNotNull(bigtableService, "bigtableService"); - return new Write(options, tableId, bigtableService); + return new Write(options, tableId, null, null, bigtableService); } @Override @@ -564,6 +584,14 @@ BigtableService getBigtableService(PipelineOptions pipelineOptions) { return bigtableService; } BigtableOptions.Builder clonedOptions = options.toBuilder(); + // Override the projectId in the options with the runtime-provided projectId. + if(projectId != null) { + clonedOptions.setProjectId(projectId.get()); + } + // Override the instanceId in the options with the runtime-provided projectId. + if(instanceId != null) { + clonedOptions.setInstanceId(instanceId.get()); + } if (options.getCredentialOptions().getCredentialType() == CredentialType.DefaultCredentials) { clonedOptions.setCredentialOptions( CredentialOptions.credential( From 0070393f1941243077429653cfc8e2333ef07302 Mon Sep 17 00:00:00 2001 From: Rafael Fernandez Date: Wed, 12 Apr 2017 13:05:13 -0700 Subject: [PATCH 4/4] Make tests green --- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 21 +++++----- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 39 ++++++++++++------- 2 files changed, 36 insertions(+), 24 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index ee232dab4b07..d5b533ebf9f7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -179,7 +179,7 @@ public static Read read() { */ @Experimental public static Write write() { - return new Write(null, "", null, null, null); + return new Write(null, null, null, null, null); } /** @@ -431,19 +431,19 @@ public static class Write * source is being built. */ @Nullable private final BigtableOptions options; - private final String tableId; + @Nullable private final ValueProvider tableId; @Nullable ValueProvider projectId; @Nullable ValueProvider instanceId; @Nullable private final BigtableService bigtableService; private Write( @Nullable BigtableOptions options, - String tableId, + @Nullable ValueProvider tableId, @Nullable ValueProvider projectId, @Nullable ValueProvider instanceId, @Nullable BigtableService bigtableService) { this.options = options; - this.tableId = checkNotNull(tableId, "tableId"); + this.tableId = tableId; this.projectId = projectId; this.instanceId = instanceId; this.bigtableService = bigtableService; @@ -493,7 +493,7 @@ public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) { */ public Write withTableId(String tableId) { checkNotNull(tableId, "tableId"); - return new Write(options, tableId, null, null, bigtableService); + return new Write(options, ValueProvider.StaticValueProvider.of(tableId), null, null, bigtableService); } /** @@ -507,12 +507,12 @@ public BigtableOptions getBigtableOptions() { * Returns the table being written to. */ public String getTableId() { - return tableId; + return tableId.get(); } @Override public PDone expand(PCollection>> input) { - input.apply(ParDo.of(new BigtableWriterFn(tableId, + input.apply(ParDo.of(new BigtableWriterFn(tableId.get(), new SerializableFunction() { @Override public BigtableService apply(PipelineOptions options) { @@ -525,14 +525,15 @@ public BigtableService apply(PipelineOptions options) { @Override public void validate(PCollection>> input) { checkArgument(options != null, "BigtableOptions not specified"); - checkArgument(!tableId.isEmpty(), "Table ID not specified"); + if(tableId == null) throw new IllegalArgumentException("Table ID not specified"); + checkArgument(!tableId.get().isEmpty(), "Table ID not specified"); try { checkArgument( - getBigtableService(input.getPipeline().getOptions()).tableExists(tableId), + getBigtableService(input.getPipeline().getOptions()).tableExists(tableId.get()), "Table %s does not exist", tableId); } catch (IOException e) { - LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); + LOG.warn("Error checking whether table {} exists; proceeding.", ValueProvider.StaticValueProvider.of(tableId), e); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index 1c770a2e2958..63ebccd7dbf4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -86,6 +86,7 @@ import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -278,7 +279,7 @@ public void testReadingFailsTableDoesNotExist() throws Exception { // Exception will be thrown by read.validate() when read is applied. thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(String.format("Table %s does not exist", table)); + thrown.expectMessage(String.format("Table %s does not exist", ValueProvider.StaticValueProvider.of(table))); p.apply(read); } @@ -423,7 +424,8 @@ public void testReadingSplitAtFractionExhaustive() throws Exception { service.setupSampleRowKeys(table, numSamples, bytesPerRow); BigtableSource source = - new BigtableSource(serviceFactory, table, null, service.getTableRange(table), null); + new BigtableSource(serviceFactory, ValueProvider.StaticValueProvider.of(table), null, + ValueProvider.StaticValueProvider.of(service.getTableRange(table)), null); assertSplitAtFractionExhaustive(source, null); } @@ -440,7 +442,8 @@ public void testReadingSplitAtFraction() throws Exception { service.setupSampleRowKeys(table, numSamples, bytesPerRow); BigtableSource source = - new BigtableSource(serviceFactory, table, null, service.getTableRange(table), null); + new BigtableSource(serviceFactory, ValueProvider.StaticValueProvider.of(table), null, + ValueProvider.StaticValueProvider.of(service.getTableRange(table)), null); // With 0 items read, all split requests will fail. assertSplitAtFractionFails(source, 0, 0.1, null /* options */); assertSplitAtFractionFails(source, 0, 1.0, null /* options */); @@ -471,10 +474,10 @@ public void testReadingWithSplits() throws Exception { // Generate source and split it. BigtableSource source = new BigtableSource(serviceFactory, - table, - null /*filter*/, - ByteKeyRange.ALL_KEYS, - null /*size*/); + ValueProvider.StaticValueProvider.of(table), + null, + ValueProvider.StaticValueProvider.of(ByteKeyRange.ALL_KEYS), + null); List splits = source.splitIntoBundles(numRows * bytesPerRow / numSamples, null /* options */); @@ -499,9 +502,9 @@ public void testReadingWithSubSplits() throws Exception { // Generate source and split it. BigtableSource source = new BigtableSource(serviceFactory, - table, + ValueProvider.StaticValueProvider.of(table), null /*filter*/, - ByteKeyRange.ALL_KEYS, + ValueProvider.StaticValueProvider.of(ByteKeyRange.ALL_KEYS), null /*size*/); List splits = source.splitIntoBundles(numRows * bytesPerRow / numSplits, null); @@ -527,7 +530,11 @@ public void testReadingWithFilterAndSubSplits() throws Exception { RowFilter filter = RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(".*17.*")).build(); BigtableSource source = - new BigtableSource(serviceFactory, table, filter, ByteKeyRange.ALL_KEYS, null /*size*/); + new BigtableSource(serviceFactory, + ValueProvider.StaticValueProvider.of(table), + ValueProvider.StaticValueProvider.of(filter), + ValueProvider.StaticValueProvider.of(ByteKeyRange.ALL_KEYS), + null); List splits = source.splitIntoBundles(numRows * bytesPerRow / numSplits, null); // Test num splits and split equality. @@ -555,9 +562,9 @@ public void testReadingDisplayData() { hasLabel("Table ID"), hasValue("fooTable")))); - assertThat(displayData, hasDisplayItem("rowFilter", rowFilter.toString())); + assertThat(displayData, hasDisplayItem("rowFilter", ValueProvider.StaticValueProvider.of(rowFilter).toString())); - assertThat(displayData, hasDisplayItem("keyRange", keyRange.toString())); + assertThat(displayData, hasDisplayItem("keyRange", ValueProvider.StaticValueProvider.of(keyRange).toString())); // BigtableIO adds user-agent to options; assert only on key and not value. assertThat(displayData, hasDisplayItem("bigtableOptions")); @@ -622,7 +629,7 @@ public void testWritingFailsTableDoesNotExist() throws Exception { // Exception will be thrown by write.validate() when write is applied. thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(String.format("Table %s does not exist", table)); + thrown.expectMessage(String.format("Table %s does not exist", ValueProvider.StaticValueProvider.of(table))); emptyInput.apply("write", defaultWrite.withTableId(table)); } @@ -663,7 +670,11 @@ public void testGetSplitPointsConsumed() throws Exception { makeTableData(table, numRows); BigtableSource source = - new BigtableSource(serviceFactory, table, null, ByteKeyRange.ALL_KEYS, null); + new BigtableSource(serviceFactory, + ValueProvider.StaticValueProvider.of(table), + null, + ValueProvider.StaticValueProvider.of(ByteKeyRange.ALL_KEYS), + null); BoundedReader reader = source.createReader(TestPipeline.testingPipelineOptions());