From a34f06089d4b085d18da5f72be4378807827176c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Mon, 22 May 2017 07:51:13 +0200 Subject: [PATCH] [BEAM-1531] Add dynamic work rebalancing support for HBaseIO --- .../org/apache/beam/sdk/io/hbase/HBaseIO.java | 81 ++++++++++++++++++- .../apache/beam/sdk/io/hbase/HBaseIOTest.java | 56 ++++++++++++- 2 files changed, 132 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index 2ba682639ab7..7f58cef30310 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; import org.apache.beam.sdk.io.range.ByteKey; import org.apache.beam.sdk.io.range.ByteKeyRange; +import org.apache.beam.sdk.io.range.ByteKeyRangeTracker; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -303,6 +304,22 @@ static class HBaseSource extends BoundedSource { this.estimatedSizeBytes = estimatedSizeBytes; } + HBaseSource withStartKey(ByteKey startKey) throws IOException { + checkNotNull(startKey, "startKey"); + Read newRead = new Read(read.serializableConfiguration, read.tableId, + new SerializableScan( + new Scan(read.serializableScan.get()).setStartRow(startKey.getBytes()))); + return new HBaseSource(newRead, estimatedSizeBytes); + } + + HBaseSource withEndKey(ByteKey endKey) throws IOException { + checkNotNull(endKey, "endKey"); + Read newRead = new Read(read.serializableConfiguration, read.tableId, + new SerializableScan( + new Scan(read.serializableScan.get()).setStopRow(endKey.getBytes()))); + return new HBaseSource(newRead, estimatedSizeBytes); + } + @Override public long 
getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception { if (estimatedSizeBytes == null) { @@ -463,19 +480,25 @@ public Coder getOutputCoder() { } private static class HBaseReader extends BoundedSource.BoundedReader { - private final HBaseSource source; + private HBaseSource source; private Connection connection; private ResultScanner scanner; private Iterator iter; private Result current; + private final ByteKeyRangeTracker rangeTracker; private long recordsReturned; HBaseReader(HBaseSource source) { this.source = source; + Scan scan = source.read.serializableScan.get(); + ByteKeyRange range = ByteKeyRange + .of(ByteKey.copyFrom(scan.getStartRow()), ByteKey.copyFrom(scan.getStopRow())); + rangeTracker = ByteKeyRangeTracker.of(range); } @Override public boolean start() throws IOException { + HBaseSource source = getCurrentSource(); Configuration configuration = source.read.serializableConfiguration.get(); String tableId = source.read.tableId; connection = ConnectionFactory.createConnection(configuration); @@ -495,9 +518,15 @@ public Result getCurrent() throws NoSuchElementException { @Override public boolean advance() throws IOException { - boolean hasRecord = iter.hasNext(); + if (!iter.hasNext()) { + return rangeTracker.markDone(); + } + final Result next = iter.next(); + boolean hasRecord = + rangeTracker.tryReturnRecordAt(true, ByteKey.copyFrom(next.getRow())) + || rangeTracker.markDone(); if (hasRecord) { - current = iter.next(); + current = next; ++recordsReturned; } return hasRecord; @@ -517,9 +546,53 @@ public void close() throws IOException { } @Override - public BoundedSource getCurrentSource() { + public synchronized HBaseSource getCurrentSource() { return source; } + + @Override + public final Double getFractionConsumed() { + return rangeTracker.getFractionConsumed(); + } + + @Override + public final long getSplitPointsConsumed() { + return rangeTracker.getSplitPointsConsumed(); + } + + @Override + @Nullable + public final synchronized 
HBaseSource splitAtFraction(double fraction) { + ByteKey splitKey; + try { + splitKey = rangeTracker.getRange().interpolateKey(fraction); + } catch (RuntimeException e) { + LOG.info("{}: Failed to interpolate key for fraction {}.", rangeTracker.getRange(), + fraction, e); + return null; + } + LOG.info( + "Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey); + HBaseSource primary; + HBaseSource residual; + try { + primary = source.withEndKey(splitKey); + residual = source.withStartKey(splitKey); + } catch (Exception e) { + LOG.info( + "{}: Interpolating for fraction {} yielded invalid split key {}.", + rangeTracker.getRange(), + fraction, + splitKey, + e); + return null; + } + if (!rangeTracker.trySplitAtPosition(splitKey)) { + return null; + } + this.source = primary; + return residual; + } } /** diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java index 806a27f72284..0b7f203b1c5f 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java @@ -18,6 +18,9 @@ package org.apache.beam.sdk.io.hbase; import static org.apache.beam.sdk.testing.SourceTestUtils.assertSourcesEqualReferenceSource; +import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive; +import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails; +import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertEquals; @@ -83,7 +86,7 @@ public class HBaseIOTest { private static HBaseTestingUtility htu; private static HBaseAdmin admin; - private static Configuration conf = 
HBaseConfiguration.create(); + private static final Configuration conf = HBaseConfiguration.create(); private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info"); private static final byte[] COLUMN_NAME = Bytes.toBytes("name"); private static final byte[] COLUMN_EMAIL = Bytes.toBytes("email"); @@ -277,6 +280,57 @@ public void testReadingWithKeyRange() throws Exception { .withKeyRange(startRow, stopRow), 441); } + /** + * Tests dynamic work rebalancing exhaustively. + */ + @Test + public void testReadingSplitAtFractionExhaustive() throws Exception { + final String table = "TEST-FEW-ROWS-SPLIT-EXHAUSTIVE-TABLE"; + final int numRows = 7; + + createTable(table); + writeData(table, numRows); + + HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table); + HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */) + .withStartKey(ByteKey.of(48)).withEndKey(ByteKey.of(58)); + + assertSplitAtFractionExhaustive(source, null); + } + + /** + * Unit tests of splitAtFraction. + */ + @Test + public void testReadingSplitAtFraction() throws Exception { + final String table = "TEST-SPLIT-AT-FRACTION"; + final int numRows = 10; + + createTable(table); + writeData(table, numRows); + + HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table); + HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */); + + // The value k is based on the partitioning schema for the data, in this test case, + // the partitioning is HEX-based, so we start from 1/16 and the value k will be + // around 1/256, so the tests are done in approximately k ~= 0.003922 steps + double k = 0.003922; + + assertSplitAtFractionFails(source, 0, k, null /* options */); + assertSplitAtFractionFails(source, 0, 1.0, null /* options */); + // With 1 item read, all split requests past k will succeed. 
+ assertSplitAtFractionSucceedsAndConsistent(source, 1, k, null /* options */); + assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.666, null /* options */); + // With 3 items read, all split requests past 3k will succeed. + assertSplitAtFractionFails(source, 3, 2 * k, null /* options */); + assertSplitAtFractionSucceedsAndConsistent(source, 3, 3 * k, null /* options */); + assertSplitAtFractionSucceedsAndConsistent(source, 3, 4 * k, null /* options */); + // With 6 items read, all split requests past 6k will succeed. + assertSplitAtFractionFails(source, 6, 5 * k, null /* options */); + assertSplitAtFractionSucceedsAndConsistent(source, 6, 0.7, null /* options */); + } + @Test public void testReadingDisplayData() { HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId("fooTable");