Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@
*
* <p>Run the example with
* <pre>
* mvn -pl dsls/sql compile exec:java \
* -Dexec.mainClass=BeamSqlExample \
* mvn -pl sdks/java/extensions/sql \
* compile exec:java -Dexec.mainClass=org.apache.beam.sdk.extensions.sql.example.BeamSqlExample \
* -Dexec.args="--runner=DirectRunner" -Pdirect-runner
* </pre>
*
*/
class BeamSqlExample {
public static void main(String[] args) throws Exception {
Expand All @@ -54,36 +53,43 @@ public static void main(String[] args) throws Exception {
List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes);
BeamRecord row = new BeamRecord(type, 1, "row", 1.0);
BeamRecord row1 = new BeamRecord(type, Arrays.<Object>asList(1, "row", 1.0));
BeamRecord row2 = new BeamRecord(type, Arrays.<Object>asList(2, "row", 2.0));
BeamRecord row3 = new BeamRecord(type, Arrays.<Object>asList(3, "row", 3.0));

//create a source PCollection with Create.of();
PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row)
PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row1, row2, row3)
.withCoder(type.getRecordCoder()));

//Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
PCollection<BeamRecord> outputStream = inputTable.apply(
BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1"));
BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1 > 1"));

//print the output record of case 1;
outputStream.apply("log_result",
MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, Void>() {
public Void apply(BeamRecord input) {
System.out.println("PCOLLECTION: " + input);
//expect output:
// PCOLLECTION: [3, row, 3.0]
// PCOLLECTION: [2, row, 2.0]
System.out.println("PCOLLECTION: " + input.getDataValues());
return null;
}
}));

//Case 2. run the query with BeamSql.query over result PCollection of case 1.
PCollection<BeamRecord> outputStream2 =
PCollectionTuple.of(new TupleTag<BeamRecord>("CASE1_RESULT"), outputStream)
.apply(BeamSql.query("select c2, c3 from CASE1_RESULT where c1=1"));
.apply(BeamSql.query("select c2, sum(c3) from CASE1_RESULT group by c2"));

//print the output record of case 2;
outputStream2.apply("log_result",
MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, Void>() {
@Override
public Void apply(BeamRecord input) {
System.out.println("TABLE_B: " + input);
//expect output:
// CASE1_RESULT: [row, 5.0]
System.out.println("CASE1_RESULT: " + input.getDataValues());
return null;
}
}));
Expand Down