From 3ae49a0fedca8926a7a99df06403f65d827f1f8e Mon Sep 17 00:00:00 2001 From: mingmxu Date: Wed, 9 Aug 2017 11:07:31 -0700 Subject: [PATCH] [rebased] update example. --- .../sql/example/BeamSqlExample.java | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java index 3a46accfe875..73cd534633d4 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java @@ -39,11 +39,10 @@ * *

Run the example with *

- * mvn -pl dsls/sql compile exec:java \
- *  -Dexec.mainClass=BeamSqlExample \
+ * mvn -pl sdks/java/extensions/sql \
+ *   compile exec:java -Dexec.mainClass=org.apache.beam.sdk.extensions.sql.example.BeamSqlExample \
  *   -Dexec.args="--runner=DirectRunner" -Pdirect-runner
  * 
- * */ class BeamSqlExample { public static void main(String[] args) throws Exception { @@ -54,21 +53,26 @@ public static void main(String[] args) throws Exception { List fieldNames = Arrays.asList("c1", "c2", "c3"); List fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE); BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes); - BeamRecord row = new BeamRecord(type, 1, "row", 1.0); + BeamRecord row1 = new BeamRecord(type, Arrays.asList(1, "row", 1.0)); + BeamRecord row2 = new BeamRecord(type, Arrays.asList(2, "row", 2.0)); + BeamRecord row3 = new BeamRecord(type, Arrays.asList(3, "row", 3.0)); //create a source PCollection with Create.of(); - PCollection inputTable = PBegin.in(p).apply(Create.of(row) + PCollection inputTable = PBegin.in(p).apply(Create.of(row1, row2, row3) .withCoder(type.getRecordCoder())); //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery; PCollection outputStream = inputTable.apply( - BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1")); + BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1 > 1")); //print the output record of case 1; outputStream.apply("log_result", MapElements.via(new SimpleFunction() { public Void apply(BeamRecord input) { - System.out.println("PCOLLECTION: " + input); + //expect output: + // PCOLLECTION: [3, row, 3.0] + // PCOLLECTION: [2, row, 2.0] + System.out.println("PCOLLECTION: " + input.getDataValues()); return null; } })); @@ -76,14 +80,16 @@ public Void apply(BeamRecord input) { //Case 2. run the query with BeamSql.query over result PCollection of case 1. PCollection outputStream2 = PCollectionTuple.of(new TupleTag("CASE1_RESULT"), outputStream) - .apply(BeamSql.query("select c2, c3 from CASE1_RESULT where c1=1")); + .apply(BeamSql.query("select c2, sum(c3) from CASE1_RESULT group by c2")); //print the output record of case 2; outputStream2.apply("log_result", MapElements.via(new SimpleFunction() { @Override public Void apply(BeamRecord input) { - System.out.println("TABLE_B: " + input); + //expect output: + // CASE1_RESULT: [row, 5.0] + System.out.println("CASE1_RESULT: " + input.getDataValues()); return null; } }));