Skip to content

Commit

Permalink
This closes #1586
Browse files Browse the repository at this point in the history
  • Loading branch information
tgroh committed Dec 13, 2016
2 parents d9657ff + 47cc2dc commit 91cc606
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 11 deletions.
7 changes: 7 additions & 0 deletions runners/core-java/pom.xml
Expand Up @@ -152,6 +152,13 @@

<!-- test dependencies -->

<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<version>2.21</version>
<scope>test</scope>
</dependency>

<!-- Utilities such as WindowMatchers -->
<dependency>
<groupId>org.apache.beam</groupId>
Expand Down
Expand Up @@ -118,7 +118,12 @@ public void populateDisplayData(DisplayData.Builder builder) {
public static class BoundedToUnboundedSourceAdapter<T>
extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> {

private BoundedSource<T> boundedSource;
@SuppressWarnings("unused") // for Kryo
private BoundedToUnboundedSourceAdapter() {
this.boundedSource = null;
}

private final BoundedSource<T> boundedSource;

public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
this.boundedSource = boundedSource;
Expand Down
Expand Up @@ -17,19 +17,28 @@
*/
package org.apache.beam.runners.core;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
Expand All @@ -44,11 +53,13 @@
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.FileBasedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
Expand All @@ -65,6 +76,7 @@
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.objenesis.strategy.StdInstantiatorStrategy;

/**
* Unit tests for {@link UnboundedReadFromBoundedSource}.
Expand Down Expand Up @@ -101,27 +113,92 @@ public void testBoundedToUnboundedSourceAdapter() throws Exception {

PCollection<Long> output =
p.apply(Read.from(unboundedSource).withMaxNumRecords(numElements));

// Count == numElements
PAssert
.thatSingleton(output.apply("Count", Count.<Long>globally()))
.isEqualTo(numElements);
.thatSingleton(output.apply("Count", Count.<Long>globally()))
.isEqualTo(numElements);
// Unique count == numElements
PAssert
.thatSingleton(output.apply(Distinct.<Long>create())
.apply("UniqueCount", Count.<Long>globally()))
.isEqualTo(numElements);
.thatSingleton(output.apply(Distinct.<Long>create())
.apply("UniqueCount", Count.<Long>globally()))
.isEqualTo(numElements);
// Min == 0
PAssert
.thatSingleton(output.apply("Min", Min.<Long>globally()))
.isEqualTo(0L);
.thatSingleton(output.apply("Min", Min.<Long>globally()))
.isEqualTo(0L);
// Max == numElements-1
PAssert
.thatSingleton(output.apply("Max", Max.<Long>globally()))
.isEqualTo(numElements - 1);
.thatSingleton(output.apply("Max", Max.<Long>globally()))
.isEqualTo(numElements - 1);

p.run();
}

@Test
public void testAdapterKryoSerializationNoMemoization() throws IOException {
long numElements = 100;
BoundedSource<Long> boundedSource = CountingSource.upTo(numElements);
UnboundedSource<Long, Checkpoint<Long>> unboundedSource =
new BoundedToUnboundedSourceAdapter<>(boundedSource);

//Kryo instantiation
Kryo kryo = new Kryo();
kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());

//Serialization of object without any memoization
ByteArrayOutputStream adapterWithoutMemoizationBos = new ByteArrayOutputStream();
try (Output output = new Output(adapterWithoutMemoizationBos)) {
kryo.writeObject(output, unboundedSource);
}

// Copy empty and memoized variants of the Adapater
ByteArrayInputStream bisWithoutMemoization =
new ByteArrayInputStream(adapterWithoutMemoizationBos.toByteArray());
BoundedToUnboundedSourceAdapter<Long> copiedWithoutMemoization =
kryo.readObject(new Input(bisWithoutMemoization), BoundedToUnboundedSourceAdapter.class);

Source.Reader<Long> reader =
copiedWithoutMemoization.createReader(TestPipeline.testingPipelineOptions(), null);
List<Long> readLongs = SourceTestUtils.readFromUnstartedReader(reader);
assertThat(readLongs, hasSize((int) numElements));
List<Long> expectedLongs = new ArrayList<>();
for (int i = 0; i < numElements; i++) {
expectedLongs.add((long) i);
}
assertThat(readLongs, containsInAnyOrder(expectedLongs.toArray()));
}

@Test
public void testAdapterKryoSerializationWithMemoization() throws IOException {
long numElements = 100;
BoundedSource<Long> boundedSource = CountingSource.upTo(numElements);
UnboundedSource<Long, Checkpoint<Long>> unboundedSource =
new BoundedToUnboundedSourceAdapter<>(boundedSource);

//Kryo instantiation
Kryo kryo = new Kryo();
kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
// Serialization of object with memoized fields
ByteArrayOutputStream sourceWithMemoizationsOutStream = new ByteArrayOutputStream();
try (Output output = new Output(sourceWithMemoizationsOutStream)) {
kryo.writeObject(output, unboundedSource);
}

ByteArrayInputStream bisWithMemoization =
new ByteArrayInputStream(sourceWithMemoizationsOutStream.toByteArray());
BoundedToUnboundedSourceAdapter<Long> copiedWithMemoization =
kryo.readObject(new Input(bisWithMemoization), BoundedToUnboundedSourceAdapter.class);
Source.Reader<Long> reader =
copiedWithMemoization.createReader(TestPipeline.testingPipelineOptions(), null);
List<Long> readLongs = SourceTestUtils.readFromUnstartedReader(reader);
assertThat(readLongs, hasSize((int) numElements));
List<Long> expectedLongs = new ArrayList<>();
for (int i = 0; i < numElements; i++) {
expectedLongs.add((long) i);
}
assertThat(readLongs, containsInAnyOrder(expectedLongs.toArray()));
}

@Test
public void testCountingSourceToUnboundedCheckpoint() throws Exception {
long numElements = 100;
Expand Down

0 comments on commit 91cc606

Please sign in to comment.