Skip to content

Commit

Permalink
Merge pull request #6527: [BEAM-3652] Port WriteWithShardingFactoryTe…
Browse files Browse the repository at this point in the history
…st off DoFnTester
  • Loading branch information
kennknowles committed Oct 1, 2018
2 parents a9d0d66 + 488fb5f commit f219bb1
Showing 1 changed file with 44 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -163,74 +163,81 @@ public WriteOperation<Void, Object> createWriteOperation() {
}

@Test
public void keyBasedOnCountFnWithNoElements() throws Exception {
@Category(ValidatesRunner.class)
public void keyBasedOnCountFnWithNoElements() {
CalculateShardsFn fn = new CalculateShardsFn(0);
DoFnTester<Long, Integer> fnTester = DoFnTester.of(fn);

List<Integer> outputs = fnTester.processBundle(0L);
assertThat(outputs, containsInAnyOrder(1));
long input = 0L;
int output = 1;
PAssert.that(p.apply(Create.of(input)).apply(ParDo.of(fn))).containsInAnyOrder(output);
p.run().waitUntilFinish();
}

@Test
public void keyBasedOnCountFnWithOneElement() throws Exception {
@Category(ValidatesRunner.class)
public void keyBasedOnCountFnWithOneElement() {
CalculateShardsFn fn = new CalculateShardsFn(0);
DoFnTester<Long, Integer> fnTester = DoFnTester.of(fn);

List<Integer> outputs = fnTester.processBundle(1L);
assertThat(outputs, containsInAnyOrder(1));
long input = 1L;
int output = 1;
PAssert.that(p.apply(Create.of(input)).apply(ParDo.of(fn))).containsInAnyOrder(output);
p.run().waitUntilFinish();
}

@Test
public void keyBasedOnCountFnWithTwoElements() throws Exception {
@Category(ValidatesRunner.class)
public void keyBasedOnCountFnWithTwoElements() {
CalculateShardsFn fn = new CalculateShardsFn(0);
DoFnTester<Long, Integer> fnTester = DoFnTester.of(fn);

List<Integer> outputs = fnTester.processBundle(2L);
assertThat(outputs, containsInAnyOrder(2));
long input = 2L;
int output = 2;
PAssert.that(p.apply(Create.of(input)).apply(ParDo.of(fn))).containsInAnyOrder(output);
p.run().waitUntilFinish();
}

@Test
public void keyBasedOnCountFnFewElementsThreeShards() throws Exception {
@Category(ValidatesRunner.class)
public void keyBasedOnCountFnFewElementsThreeShards() {
CalculateShardsFn fn = new CalculateShardsFn(0);
DoFnTester<Long, Integer> fnTester = DoFnTester.of(fn);

List<Integer> outputs = fnTester.processBundle(5L);
assertThat(outputs, containsInAnyOrder(3));
long input = 5L;
int output = 3;
PAssert.that(p.apply(Create.of(input)).apply(ParDo.of(fn))).containsInAnyOrder(output);
p.run().waitUntilFinish();
}

@Test
public void keyBasedOnCountFnManyElements() throws Exception {
@Category(ValidatesRunner.class)
public void keyBasedOnCountFnManyElements() {
DoFn<Long, Integer> fn = new CalculateShardsFn(0);
DoFnTester<Long, Integer> fnTester = DoFnTester.of(fn);

List<Integer> shard = fnTester.processBundle((long) Math.pow(10, 10));
assertThat(shard, containsInAnyOrder(10));
long input = (long) Math.pow(10, 10);
int output = 10;
PAssert.that(p.apply(Create.of(input)).apply(ParDo.of(fn))).containsInAnyOrder(output);
p.run().waitUntilFinish();
}

@Test
public void keyBasedOnCountFnFewElementsExtraShards() throws Exception {
@Category(ValidatesRunner.class)
public void keyBasedOnCountFnFewElementsExtraShards() {
long countValue = (long) WriteWithShardingFactory.MIN_SHARDS_FOR_LOG + 3;
PCollection<Long> inputCount = p.apply(Create.of(countValue));
PCollectionView<Long> elementCountView =
inputCount.apply(View.<Long>asSingleton().withDefaultValue(countValue));
CalculateShardsFn fn = new CalculateShardsFn(3);
DoFnTester<Long, Integer> fnTester = DoFnTester.of(fn);

fnTester.setSideInput(elementCountView, GlobalWindow.INSTANCE, countValue);

List<Integer> kvs = fnTester.processBundle(10L);
assertThat(kvs, containsInAnyOrder(6));
int output = 6;
PAssert.that(p.apply(Create.of(countValue)).apply(ParDo.of(fn))).containsInAnyOrder(output);
p.run().waitUntilFinish();
}

@Test
public void keyBasedOnCountFnManyElementsExtraShards() throws Exception {
@Category(ValidatesRunner.class)
public void keyBasedOnCountFnManyElementsExtraShards() {
CalculateShardsFn fn = new CalculateShardsFn(3);
DoFnTester<Long, Integer> fnTester = DoFnTester.of(fn);

double count = Math.pow(10, 10);
int output = 13;

List<Integer> shards = fnTester.processBundle((long) count);
assertThat(shards, containsInAnyOrder(13));
PAssert.that(p.apply(Create.of((long) count)).apply(ParDo.of(fn))).containsInAnyOrder(output);
p.run().waitUntilFinish();
}

private static class FakeFilenamePolicy extends FileBasedSink.FilenamePolicy {
Expand Down

0 comments on commit f219bb1

Please sign in to comment.