
Commit

Merge f02850c into 628dace
ssisk committed Jul 6, 2017
2 parents 628dace + f02850c commit eaba092
Showing 4 changed files with 187 additions and 123 deletions.
6 changes: 6 additions & 0 deletions sdks/java/io/jdbc/pom.xml
@@ -168,5 +168,11 @@
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
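<!-- beam-sdks-java-io-common provides shared test helpers (e.g. HashingFn and
     IOTestPipelineOptions) that the updated JdbcIOIT below relies on. -->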
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-common</artifactId>
<scope>test</scope>
</dependency>

</dependencies>
</project>
sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -17,32 +17,37 @@
*/
package org.apache.beam.sdk.io.jdbc;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.values.PCollection;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.postgresql.ds.PGSimpleDataSource;

/**
@@ -67,112 +72,169 @@
*/
@RunWith(JUnit4.class)
public class JdbcIOIT {
private static PGSimpleDataSource dataSource;
private static String tableName;

@Rule
public TestPipeline pipelineWrite = TestPipeline.create();
@Rule
public TestPipeline pipelineRead = TestPipeline.create();

@BeforeClass
public static void setup() throws SQLException {
PipelineOptionsFactory.register(IOTestPipelineOptions.class);
IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
.as(IOTestPipelineOptions.class);

// We set up the dataSource in BeforeClass rather than Before since we don't need to create
// a new dataSource for each test.
dataSource = JdbcTestDataSet.getDataSource(options);

tableName = JdbcTestDataSet.getWriteTableName();
JdbcTestDataSet.createDataTable(dataSource, tableName);
}

@AfterClass
public static void tearDown() throws SQLException {
JdbcTestDataSet.cleanUpDataTable(dataSource, tableName);
}

/**
* Tests writing then reading data for a postgres database.
*/
@Test
public void testWriteThenRead() {
runWrite();
runRead();
}

/**
* Writes the test dataset to postgres.
*
* <p>This method does not attempt to validate the data - that happens in the read phase. This
* makes it harder to tell whether a failure happened during the write or the read, but the
* tests are much easier to maintain: no separate code is needed to load test data into the
* database for the read tests.
*/
private void runWrite() {
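// Generate EXPECTED_ROW_COUNT sequential ids, deterministically map each one to a TestRow,
// and write the rows to postgres via JdbcIO.write().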
pipelineWrite.apply(GenerateSequence.from(0).to(JdbcTestDataSet.EXPECTED_ROW_COUNT))
.apply(ParDo.of(new DeterministicallyConstructTestRowFn()))
.apply(JdbcIO.<TestRow>write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
.withStatement(String.format("insert into %s values(?, ?)", tableName))
.withPreparedStatementSetter(new PrepareStatementFromTestRow()));

pipelineWrite.run().waitUntilFinish();
}

/**
* Read the test dataset from postgres and validate its contents.
*
* <p>When doing the validation, we wish to:
* 1. Ensure *all* the rows are correct.
* 2. Provide enough information in assertions that it is easy to spot obvious errors (e.g.
* all elements have a similar mistake, or "only 5 elements were generated" and the user
* wants to see what those elements look like).
*
* <p>We do not wish to generate and compare all of the expected values, so this method uses
* hashing to ensure that all expected data is present. However, hashing does not provide easy
* debugging information (failures like "every element was the empty string" are hard to see),
* so we also:
* 1. Generate expected values for the first and last 500 rows.
* 2. Use containsInAnyOrder to verify that their values are correct.
* "First" and "last" 500 rows are well defined because we know all rows have a unique id, so
* we can use the natural ordering of that key.
*/
private void runRead() {
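// Read every row back through JdbcIO.read() and map each result to a TestRow.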
PCollection<TestRow> namesAndIds =
pipelineRead.apply(JdbcIO.<TestRow>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
.withQuery(String.format("select name,id from %s;", tableName))
.withRowMapper(new CreateTestRowOfNameAndId())
.withCoder(SerializableCoder.of(TestRow.class)));

PAssert.thatSingleton(
namesAndIds.apply("Count All", Count.<TestRow>globally()))
.isEqualTo(JdbcTestDataSet.EXPECTED_ROW_COUNT);

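// Combine a hash of every row's name into a single value; a mismatch with the known-good
// hash means at least one row is missing or corrupted.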
PCollection<String> consolidatedHashcode = namesAndIds
.apply(ParDo.of(new SelectNameFn()))
.apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
PAssert.that(consolidatedHashcode).containsInAnyOrder(JdbcTestDataSet.EXPECTED_HASH_CODE);

PCollection<List<TestRow>> frontOfList =
namesAndIds.apply(Top.<TestRow>smallest(500));
Iterable<TestRow> expectedFrontOfList = getExpectedValues(0, 500);
PAssert.thatSingletonIterable(frontOfList).containsInAnyOrder(expectedFrontOfList);

PCollection<List<TestRow>> backOfList =
namesAndIds.apply(Top.<TestRow>largest(500));
Iterable<TestRow> expectedBackOfList =
getExpectedValues((int) (JdbcTestDataSet.EXPECTED_ROW_COUNT - 500),
(int) JdbcTestDataSet.EXPECTED_ROW_COUNT);
PAssert.thatSingletonIterable(backOfList).containsInAnyOrder(expectedBackOfList);

pipelineRead.run().waitUntilFinish();
}

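/** One row of the test table. Ordered by id, so the first/last rows are well defined. */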
@AutoValue
abstract static class TestRow implements Serializable, Comparable<TestRow> {
static TestRow create(Integer id, String name) {
return new AutoValue_JdbcIOIT_TestRow(id, name);
}
abstract Integer id();
abstract String name();

@Override
public int compareTo(TestRow other) {
return id().compareTo(other.id());
}
}

private static class CreateTestRowOfNameAndId implements JdbcIO.RowMapper<TestRow> {
@Override
public TestRow mapRow(ResultSet resultSet) throws Exception {
return TestRow.create(
resultSet.getInt("id"), resultSet.getString("name"));
}
}

private static class PrepareStatementFromTestRow
implements JdbcIO.PreparedStatementSetter<TestRow> {
@Override
public void setParameters(TestRow element, PreparedStatement statement)
throws SQLException {
statement.setLong(1, element.id());
statement.setString(2, element.name());
}
}

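/**
* Deterministically derives the TestRow for a given seed; used by both the write phase and
* the read validation.
*/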
private static TestRow expectedValueTestRow(Long seed) {
return TestRow.create(seed.intValue(), "Testval" + seed);
}

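/** Generates the expected TestRows for seeds in the half-open range [rangeStart, rangeEnd). */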
private Iterable<TestRow> getExpectedValues(int rangeStart, int rangeEnd) {
List<TestRow> ret = new ArrayList<TestRow>(rangeEnd - rangeStart + 1);
for (int i = rangeStart; i < rangeEnd; i++) {
ret.add(expectedValueTestRow((long) i));
}
return ret;
}

/**
* Given a Long as a seed value, constructs a test data row used by the IT for testing writes.
*/
private static class DeterministicallyConstructTestRowFn extends DoFn<Long, TestRow> {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(expectedValueTestRow(c.element()));
}
}

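/** Extracts the name column from a TestRow, feeding the hash-based validation above. */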
private static class SelectNameFn extends DoFn<TestRow, String> {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().name());
}
}
}
sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -109,7 +109,7 @@ public static void startDatabase() throws Exception {
dataSource.setPortNumber(port);


JdbcTestDataSet.createReadDataTableAndAddInitialData(dataSource);
}

@AfterClass
@@ -197,15 +197,17 @@ public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception {

PAssert.thatSingleton(
output.apply("Count All", Count.<KV<String, Integer>>globally()))
.isEqualTo(JdbcTestDataSet.EXPECTED_ROW_COUNT);

PAssert.that(output
.apply("Count Scientist", Count.<String, Integer>perKey())
).satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
@Override
public Void apply(Iterable<KV<String, Long>> input) {
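// The test dataset distributes rows evenly across scientist names, so each name must
// appear exactly EXPECTED_ROW_COUNT / SCIENTISTS.length times.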
for (KV<String, Long> element : input) {
assertEquals(element.getKey(),
JdbcTestDataSet.EXPECTED_ROW_COUNT / JdbcTestDataSet.SCIENTISTS.length,
element.getValue().longValue());
}
return null;
}
@@ -242,19 +244,20 @@ public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception {

PAssert.thatSingleton(
output.apply("Count One Scientist", Count.<KV<String, Integer>>globally()))
.isEqualTo(JdbcTestDataSet.EXPECTED_ROW_COUNT / JdbcTestDataSet.SCIENTISTS.length);

pipeline.run();
}

@Test
@Category(NeedsRunner.class)
public void testWrite() throws Exception {
final long rowsToAdd = 1000L;

String tableName = JdbcTestDataSet.createWriteDataTableAndAddInitialData(dataSource);
try {
ArrayList<KV<Integer, String>> data = new ArrayList<>();
for (int i = 0; i < rowsToAdd; i++) {
KV<Integer, String> kv = KV.of(i, "Test");
data.add(kv);
}
@@ -282,7 +285,7 @@ public void setParameters(
resultSet.next();
int count = resultSet.getInt(1);

Assert.assertEquals(rowsToAdd + JdbcTestDataSet.EXPECTED_ROW_COUNT, count);
}
}
}
