Skip to content

Commit

Permalink
[BEAM-961] This closes #1505
Browse files Browse the repository at this point in the history
  • Loading branch information
jbonofre committed Dec 6, 2016
2 parents 1efda59 + 41ae08b commit 493c04f
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,26 @@
/**
* A {@link PTransform} that produces longs. When used to produce a
* {@link IsBounded#BOUNDED bounded} {@link PCollection}, {@link CountingInput} starts at {@code 0}
* and counts up to a specified maximum. When used to produce an
* or starting value, and counts up to a specified maximum. When used to produce an
* {@link IsBounded#UNBOUNDED unbounded} {@link PCollection}, it counts up to {@link Long#MAX_VALUE}
* and then never produces more output. (In practice, this limit should never be reached.)
*
* <p>The bounded {@link CountingInput} is implemented based on {@link OffsetBasedSource} and
* {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it
* supports dynamic work rebalancing.
*
* <p>To produce a bounded {@code PCollection<Long>}, use {@link CountingInput#upTo(long)}:
* <p>To produce a bounded {@code PCollection<Long>} starting from {@code 0},
* use {@link CountingInput#upTo(long)}:
*
* <pre>{@code
* Pipeline p = ...
* PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000);
* PCollection<Long> bounded = p.apply(producer);
* }</pre>
*
* <p>To produce a bounded {@code PCollection<Long>} starting from {@code startIndex},
* use {@link CountingInput#forSubrange(long, long)} instead.
*
* <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingInput#unbounded()},
* calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values
* with timestamps other than {@link Instant#now}.
Expand All @@ -75,6 +79,16 @@ public static BoundedCountingInput upTo(long numElements) {
return new BoundedCountingInput(numElements);
}

/**
 * Returns a {@link BoundedCountingInput} whose output begins at {@code startIndex} and ends at
 * {@code endIndex - 1}.
 *
 * @param startIndex first value to produce
 * @param endIndex one past the last value to produce; must be strictly greater than
 *     {@code startIndex}, otherwise the argument check below rejects the call
 */
public static BoundedCountingInput forSubrange(long startIndex, long endIndex) {
  // An empty or inverted range is not accepted.
  final boolean rangeIsNonEmpty = startIndex < endIndex;
  checkArgument(
      rangeIsNonEmpty,
      "endIndex (%s) must be greater than startIndex (%s)",
      endIndex,
      startIndex);

  return new BoundedCountingInput(startIndex, endIndex);
}

/**
* Creates an {@link UnboundedCountingInput} that will produce numbers starting from {@code 0} up
* to {@link Long#MAX_VALUE}.
Expand Down Expand Up @@ -102,23 +116,35 @@ public static UnboundedCountingInput unbounded() {
* 0.
*/
public static class BoundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
// NOTE(review): this listing is a rendered diff without +/- markers, so removed and added
// lines appear side by side. `numElements` looks like the removed pre-commit field that
// `startIndex`/`endIndex` replace -- confirm against the repository before relying on it.
private final long numElements;
// First value of the range; the single-argument constructor fixes it at 0.
private final long startIndex;
// End of the range as handed to CountingSource.createSourceForSubrange in apply().
private final long endIndex;

/** Builds a transform counting from 0 with {@code numElements} as the end index. */
private BoundedCountingInput(long numElements) {
// NOTE(review): presumably the removed pre-commit assignment; the two lines below supersede it.
this.numElements = numElements;
this.endIndex = numElements;
this.startIndex = 0;
}

/** Builds a transform over an explicit {@code startIndex}/{@code endIndex} pair. */
private BoundedCountingInput(long startIndex, long endIndex) {
this.endIndex = endIndex;
this.startIndex = startIndex;
}

/** Applies a {@code Read} from the bounded {@code CountingSource} to {@code begin}. */
@SuppressWarnings("deprecation")
@Override
public PCollection<Long> apply(PBegin begin) {
// NOTE(review): two consecutive returns -- the first looks like the removed diff line
// (source created via upTo(numElements)); the second is the one that honors startIndex.
return begin.apply(Read.from(CountingSource.upTo(numElements)));
return begin.apply(Read.from(CountingSource.createSourceForSubrange(startIndex, endIndex)));
}

/** Registers the "upTo" display item, plus "startAt" when the range does not start at 0. */
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
// NOTE(review): presumably the removed pre-commit registration; the branch below replaces it.
builder.add(DisplayData.item("upTo", numElements)
.withLabel("Count Up To"));

// A zero start reports only the upper bound; any other start also reports "startAt".
if (startIndex == 0) {
builder.add(DisplayData.item("upTo", endIndex)
.withLabel("Count Up To"));
} else {
builder.add(DisplayData.item("startAt", startIndex).withLabel("Count Starting At"))
.add(DisplayData.item("upTo", endIndex).withLabel("Count Up To"));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ public static BoundedSource<Long> upTo(long numElements) {
return new BoundedCountingSource(0, numElements);
}

/**
 * Builds the {@link BoundedSource} backing a sub-range count: its elements run from
 * {@code startIndex} through {@code endIndex - 1}.
 *
 * @param startIndex first element of the range
 * @param endIndex one past the last element; must be strictly greater than {@code startIndex},
 *     otherwise the argument check below rejects the call
 */
static BoundedSource<Long> createSourceForSubrange(long startIndex, long endIndex) {
  final boolean rangeIsNonEmpty = startIndex < endIndex;
  checkArgument(
      rangeIsNonEmpty,
      "endIndex (%s) must be greater than startIndex (%s)",
      endIndex,
      startIndex);
  return new BoundedCountingSource(startIndex, endIndex);
}

/**
* Create a new {@link UnboundedCountingSource}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,21 @@
*/
@RunWith(JUnit4.class)
public class CountingInputTest {
/**
 * Attaches {@code PAssert} checks to {@code input}: the element count and the distinct-element
 * count must both equal {@code end - start}, the minimum must be {@code start} and the maximum
 * must be {@code end - 1}.
 *
 * <p>NOTE(review): this listing is a rendered diff without +/- markers. The
 * {@code numElements}-based signature and assertions appear to be the removed pre-commit
 * lines, each followed by its {@code start}/{@code end}-based replacement -- confirm against
 * the repository.
 */
public static void addCountingAsserts(PCollection<Long> input, long numElements) {
public static void addCountingAsserts(PCollection<Long> input, long start, long end) {
// Count == end - start (numElements in the pre-change line)
PAssert.thatSingleton(input.apply("Count", Count.<Long>globally()))
.isEqualTo(numElements);
.isEqualTo(end - start);
// Unique count == end - start (numElements in the pre-change line)
PAssert.thatSingleton(
input
.apply(Distinct.<Long>create())
.apply("UniqueCount", Count.<Long>globally()))
.isEqualTo(numElements);
// Min == 0 (pre-change expectation)
PAssert.thatSingleton(input.apply("Min", Min.<Long>globally())).isEqualTo(0L);
// Max == numElements-1 (pre-change expectation)
.isEqualTo(end - start);
// Min == start
PAssert.thatSingleton(input.apply("Min", Min.<Long>globally())).isEqualTo(start);
// Max == end-1
PAssert.thatSingleton(input.apply("Max", Max.<Long>globally()))
.isEqualTo(numElements - 1);
.isEqualTo(end - 1);
}

@Test
Expand All @@ -73,7 +73,19 @@ public void testBoundedInput() {
long numElements = 1000;
PCollection<Long> input = p.apply(CountingInput.upTo(numElements));

addCountingAsserts(input, numElements);
addCountingAsserts(input, 0, numElements);
p.run();
}

/**
 * Reads the sub-range from 10 up to (but excluding) 1000 via {@code CountingInput.forSubrange}
 * and checks it with {@code addCountingAsserts}.
 */
@Test
@Category(RunnableOnService.class)
public void testBoundedInputSubrange() {
  Pipeline pipeline = TestPipeline.create();
  final long first = 10;
  final long limit = 1000;

  PCollection<Long> counted = pipeline.apply(CountingInput.forSubrange(first, limit));
  addCountingAsserts(counted, first, limit);

  pipeline.run();
}

Expand All @@ -84,6 +96,14 @@ public void testBoundedDisplayData() {
assertThat(displayData, hasDisplayItem("upTo", 1234));
}

/**
 * Checks that a sub-range transform with a non-zero start exposes both the "startAt" and the
 * "upTo" display items.
 */
@Test
public void testBoundedDisplayDataSubrange() {
  PTransform<?, ?> subrange = CountingInput.forSubrange(12, 1234);

  DisplayData shown = DisplayData.from(subrange);

  assertThat(shown, hasDisplayItem("startAt", 12));
  assertThat(shown, hasDisplayItem("upTo", 1234));
}

@Test
@Category(RunnableOnService.class)
public void testUnboundedInput() {
Expand All @@ -92,7 +112,7 @@ public void testUnboundedInput() {

PCollection<Long> input = p.apply(CountingInput.unbounded().withMaxNumRecords(numElements));

addCountingAsserts(input, numElements);
addCountingAsserts(input, 0, numElements);
p.run();
}

Expand All @@ -110,7 +130,7 @@ public void testUnboundedInputRate() {
.withRate(elemsPerPeriod, periodLength)
.withMaxNumRecords(numElements));

addCountingAsserts(input, numElements);
addCountingAsserts(input, 0, numElements);
long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / elemsPerPeriod;
Instant startTime = Instant.now();
p.run();
Expand All @@ -136,7 +156,7 @@ public void testUnboundedInputTimestamps() {
CountingInput.unbounded()
.withTimestampFn(new ValueAsTimestampFn())
.withMaxNumRecords(numElements));
addCountingAsserts(input, numElements);
addCountingAsserts(input, 0, numElements);

PCollection<Long> diffs =
input
Expand Down

0 comments on commit 493c04f

Please sign in to comment.