[BEAM-244] Add JDBC IO #942

Closed · wants to merge 18 commits

Changes from 1 commit (18 commits total)
14f431d
[BEAM-244] Add JDBC IO
jbonofre Sep 5, 2016
4d4be1f
[BEAM-244] Refactor to use first simple composite DoFn instead of so…
jbonofre Sep 13, 2016
cecf39e
[BEAM-244] Directly use JdbcOptions in Read and use JdbcOptions as st…
jbonofre Sep 14, 2016
089ef5c
[BEAM-244] Fix checkstyle
jbonofre Sep 14, 2016
89cf44e
[BEAM-244] Add RowMapper and ElementInserter to allow user customize …
jbonofre Sep 23, 2016
4aed92a
[BEAM-244] Javadoc fixes
jbonofre Sep 23, 2016
d26353f
[BEAM-244] Add batching for write, use preparedStatementSetter
jbonofre Sep 25, 2016
d3227dd
[BEAM-244] Rename query to statement, rename JdbcWriter to WriteFn, u…
jbonofre Sep 26, 2016
23c6e19
[BEAM-244] Minor fix on main comment section
jbonofre Sep 26, 2016
5a8a21e
[BEAM-244] Use preparedStatement batch, simplify test
jbonofre Sep 26, 2016
a2b94e1
[BEAM-244] Introduce DataSourceConfiguration POJO to bootstrap DataSo…
jbonofre Sep 28, 2016
3a8b613
[BEAM-244] Add connection pool configuration properties, use try-on-r…
jbonofre Sep 29, 2016
d7669c4
[BEAM-244] Finishing touches on JdbcIO, using AutoValue, abstracts Da…
jkff Sep 29, 2016
759650f
[BEAM-244] Add connection pool configuration
jbonofre Sep 30, 2016
fd15a44
[BEAM-244] Remove connection pool properties as they don't make lot o…
jbonofre Sep 30, 2016
7f734cf
[BEAM-244] Add Fn to prevent fusion in the Read PTransform
jbonofre Sep 30, 2016
0f07d39
[BEAM-244] Move Random in @Setup and use nextInt in @ProcessElement
jbonofre Sep 30, 2016
ba2c4bf
[BEAM-244] Prevent executeBatch() in WriteFn if the batch is empty
jbonofre Sep 30, 2016
@@ -21,17 +21,26 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.auto.value.AutoValue;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Random;

import javax.annotation.Nullable;
import javax.sql.DataSource;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -193,6 +202,7 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>
@Nullable abstract DataSourceConfiguration getDataSourceConfiguration();
@Nullable abstract String getQuery();
@Nullable abstract RowMapper<T> getRowMapper();
@Nullable abstract Coder<T> getCoder();

abstract Builder<T> toBuilder();

@@ -201,6 +211,7 @@ abstract static class Builder<T> {
abstract Builder<T> setDataSourceConfiguration(DataSourceConfiguration config);
abstract Builder<T> setQuery(String query);
abstract Builder<T> setRowMapper(RowMapper<T> rowMapper);
abstract Builder<T> setCoder(Coder<T> coder);
abstract Read<T> build();
}

@@ -219,17 +230,44 @@ public Read<T> withRowMapper(RowMapper<T> rowMapper) {
return toBuilder().setRowMapper(rowMapper).build();
}

public Read<T> withCoder(Coder<T> coder) {
checkNotNull(coder, "coder");
return toBuilder().setCoder(coder).build();
}

@Override
public PCollection<T> apply(PBegin input) {
return input
.apply(Create.of(getQuery()))
.apply(ParDo.of(new ReadFn<>(this)));
.apply(ParDo.of(new ReadFn<>(this))).setCoder(getCoder())
// generate a random key followed by a GroupByKey and then ungroup
// to prevent fusion
// see https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
// for details
.apply(ParDo.of(new DoFn<T, KV<Integer, T>>() {
private int randInt;
@Setup
public void setup() {
Random random = new Random();
randInt = random.nextInt();
@jkff (Contributor) commented, Sep 30, 2016:
This uses the same key for the whole bundle, which is equivalent to not doing this at all. The random key should be generated in processElement so that different elements get distributed onto different keys.

Ideally (but not required): can you try running an example pipeline against, e.g., a local spark or flink cluster with a local database and confirm that it speeds up as you add more workers? (I wish we had some infrastructure for performance testing...) I'd suggest a scenario where the database contains a small number of records (e.g. 100), but processing of every record is, e.g., a 10-second sleep. It should take less than 1000 seconds to complete and should scale linearly with number of workers; but it'd be sufficient to test with, e.g., 10 workers and confirm that it takes about 100 seconds (modulo per-worker startup time).

@jbonofre (Member, Author) replied:

My bad, I wanted to do new Random in @Setup, but nextInt in @ProcessElement. I'm fixing that.

@jbonofre (Member, Author) replied:

And yes for the test, I have my dockerized cluster. I will test on it.

}
@ProcessElement
public void processElement(ProcessContext context) {
T record = context.element();
@jkff (Contributor) commented, Sep 30, 2016:

Not necessary to extract all the subexpressions into variables, this can be a one-liner:
context.output(KV.of(random.nextInt(), context.element()));

KV<Integer, T> kvRecord = KV.of(randInt, record);
context.output(kvRecord);
}
}))
.apply(GroupByKey.<Integer, T>create())
.apply(Values.<Iterable<T>>create())
.apply(Flatten.<T>iterables());
}
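jkff's point in the review thread above is that a key drawn once in @Setup collapses every element in the bundle onto a single key, so the GroupByKey no longer spreads work across workers. A self-contained sketch contrasting per-bundle and per-element key assignment (plain Java, no Beam dependency; the class and method names are hypothetical, and distinct-key count is used here as a stand-in for achievable parallelism):

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

public class KeyDistributionDemo {
    // Buggy pattern: key drawn once (as in @Setup), reused for every element.
    static Set<Integer> perBundleKeys(int elements) {
        Random random = new Random(42); // fixed seed keeps the demo deterministic
        int bundleKey = random.nextInt(); // drawn once for the whole bundle
        Set<Integer> keys = new HashSet<>();
        for (int i = 0; i < elements; i++) {
            keys.add(bundleKey); // every element lands on the same key
        }
        return keys;
    }

    // Fixed pattern: Random created once, nextInt() called per element.
    static Set<Integer> perElementKeys(int elements) {
        Random random = new Random(42);
        Set<Integer> keys = new HashSet<>();
        for (int i = 0; i < elements; i++) {
            keys.add(random.nextInt()); // fresh key per element spreads the work
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(perBundleKeys(100).size()); // 1
        System.out.println(perElementKeys(100).size()); // typically 100 distinct keys
    }
}
```

With one key, the GroupByKey routes the entire bundle to a single worker; per-element keys distribute elements across the keyspace, which is the point of the fusion break.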

@Override
public void validate(PBegin input) {
checkNotNull(getQuery(), "query");
checkNotNull(getRowMapper(), "rowMapper");
checkNotNull(getCoder(), "coder");
checkNotNull(getDataSourceConfiguration());
}

Expand All @@ -238,6 +276,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.add(DisplayData.item("query", getQuery()));
builder.add(DisplayData.item("rowMapper", getRowMapper().getClass().getName()));
builder.add(DisplayData.item("coder", getCoder().getClass().getName()));
getDataSourceConfiguration().populateDisplayData(builder);
}

@@ -27,8 +27,9 @@
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;

import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
@@ -151,9 +152,8 @@ public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception {
KV<String, Integer> kv = KV.of(resultSet.getString("name"), resultSet.getInt("id"));
return kv;
}
}))
.setCoder(KvCoder.of(
StringUtf8Coder.of(), SerializableCoder.of(Integer.class)));
})
.withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));

PAssert.thatSingleton(
output.apply("Count All", Count.<KV<String, Integer>>globally()))
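The test diff swaps SerializableCoder.of(Integer.class) for BigEndianIntegerCoder.of(). A plausible motivation (an assumption; the diff does not state it) is that big-endian encoding is deterministic and compact, which matters for coders used on keys fed into GroupByKey. A minimal plain-Java sketch of the 4-byte big-endian wire format such a coder produces (the class name BigEndianIntDemo is hypothetical):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class BigEndianIntDemo {
    // Encode an int as 4 big-endian bytes; ByteBuffer defaults to big-endian order.
    static byte[] encode(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Decode 4 big-endian bytes back into an int.
    static int decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        byte[] encoded = encode(3);
        System.out.println(Arrays.toString(encoded)); // [0, 0, 0, 3]
        System.out.println(decode(encoded)); // 3
    }
}
```

Every int maps to exactly one byte sequence here, whereas Java serialization output can vary with class metadata, making it unsuitable where byte-level equality of encoded keys is required.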