From e3cab8ea88473a83e8d02b34baf3239b7226f82d Mon Sep 17 00:00:00 2001 From: Ian Zhou Date: Wed, 29 Jun 2016 12:03:27 -0700 Subject: [PATCH 1/3] Expose getSplitPointsConsumed() in BigtableIO --- .../java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index f725a661fc15..4f2db0e35ac0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -807,6 +807,11 @@ public final Double getFractionConsumed() { return rangeTracker.getFractionConsumed(); } + @Override + public final long getSplitPointsConsumed() { + return rangeTracker.getSplitPointsConsumed(); + } + @Override public final synchronized BigtableSource splitAtFraction(double fraction) { ByteKey splitKey; From 85d1a7b5ecccf94062e2da5b36a4947b0a4c9d9f Mon Sep 17 00:00:00 2001 From: Ian Zhou Date: Wed, 29 Jun 2016 14:40:51 -0700 Subject: [PATCH 2/3] Added getSplitPointsConsumed test for BigtableIO --- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 4 +++ .../sdk/io/gcp/bigtable/BigtableIOTest.java | 30 ++++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 4f2db0e35ac0..158f3ee8f91d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -768,6 +768,8 @@ public boolean start() throws IOException { && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey())); if (hasRecord) { ++recordsReturned; + } else { + rangeTracker.markDone(); } return hasRecord; } @@ -784,6 +786,8 @@ public boolean advance() throws IOException { && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey())); if (hasRecord) { ++recordsReturned; + } else { + rangeTracker.markDone(); } return hasRecord; } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index c09943bbed83..1401960882a5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -25,7 +25,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verifyNotNull; - import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -33,6 +32,7 @@ import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource; import org.apache.beam.sdk.io.range.ByteKey; import org.apache.beam.sdk.io.range.ByteKeyRange; @@ -490,6 +490,34 @@ public void testWritingDisplayData() { assertThat(displayData, hasDisplayItem("tableId", "fooTable")); } + @Test + public void testGetSplitPointsConsumed() throws Exception { + final String table = "TEST-TABLE"; + final int numRows = 100; + int splitPointsConsumed = 0; + + makeTableData(table, numRows); + + BigtableSource source = + new BigtableSource(service, table, null, ByteKeyRange.ALL_KEYS, null); + + BoundedReader reader = source.createReader(TestPipeline.testingPipelineOptions()); + + reader.start(); + // Started, 0 split points consumed + assertEquals("splitPointsConsumed starting", splitPointsConsumed, reader.getSplitPointsConsumed()); + + // Split points consumed increases for each row read + while (reader.advance()) { + assertEquals("splitPointsConsumed advancing", ++splitPointsConsumed, reader.getSplitPointsConsumed()); + } + + // Reader marked as done, 100 split points consumed + assertEquals("splitPointsConsumed done", ++splitPointsConsumed, reader.getSplitPointsConsumed()); + + reader.close(); + } + //////////////////////////////////////////////////////////////////////////////////////////// private static final String COLUMN_FAMILY_NAME = "family"; private static final ByteString COLUMN_NAME = ByteString.copyFromUtf8("column"); From 324e1a831f758481c3907c8755ce8396af4c1752 Mon Sep 17 00:00:00 2001 From: Ian Zhou Date: Wed, 29 Jun 2016 14:54:40 -0700 Subject: [PATCH 3/3] Minor changes to style and test --- .../apache/beam/sdk/io/gcp/bigtable/BigtableIO.java | 10 ++++------ .../beam/sdk/io/gcp/bigtable/BigtableIOTest.java | 8 +++++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 158f3ee8f91d..cddb333ffd18 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -765,11 +765,10 @@ public boolean start() throws IOException { reader = service.createReader(getCurrentSource()); boolean hasRecord = reader.start() - && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey())); + && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey())) + || rangeTracker.markDone(); if (hasRecord) { ++recordsReturned; - } else { - rangeTracker.markDone(); } return hasRecord; } @@ -783,11 +782,10 @@ public synchronized BigtableSource getCurrentSource() { public boolean advance() throws IOException { boolean hasRecord = reader.advance() - && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey())); + && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey())) + || rangeTracker.markDone(); if (hasRecord) { ++recordsReturned; - } else { - rangeTracker.markDone(); } return hasRecord; } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index 1401960882a5..cdbaaac8fbba 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -505,15 +505,17 @@ public void testGetSplitPointsConsumed() throws Exception { reader.start(); // Started, 0 split points consumed - assertEquals("splitPointsConsumed starting", splitPointsConsumed, reader.getSplitPointsConsumed()); + assertEquals("splitPointsConsumed starting", + splitPointsConsumed, reader.getSplitPointsConsumed()); // Split points consumed increases for each row read while (reader.advance()) { - assertEquals("splitPointsConsumed advancing", ++splitPointsConsumed, reader.getSplitPointsConsumed()); + assertEquals("splitPointsConsumed advancing", + ++splitPointsConsumed, reader.getSplitPointsConsumed()); } // Reader marked as done, 100 split points consumed - assertEquals("splitPointsConsumed done", ++splitPointsConsumed, reader.getSplitPointsConsumed()); + assertEquals("splitPointsConsumed done", numRows, reader.getSplitPointsConsumed()); reader.close(); }