From f1aa390d1989543f2848dae6b26596ffd1a5d8db Mon Sep 17 00:00:00 2001 From: mingmxu Date: Thu, 20 Jul 2017 14:32:42 -0700 Subject: [PATCH] remove README.md and update usages in BeamSqlExample --- dsls/sql/README.md | 24 ------------------- .../beam/dsls/sql/example/BeamSqlExample.java | 23 ++++++++++-------- 2 files changed, 13 insertions(+), 34 deletions(-) delete mode 100644 dsls/sql/README.md diff --git a/dsls/sql/README.md b/dsls/sql/README.md deleted file mode 100644 index ae9e0f3f6fd5..000000000000 --- a/dsls/sql/README.md +++ /dev/null @@ -1,24 +0,0 @@ - - -# Beam SQL - -Beam SQL provides a new interface, to execute a SQL query as a Beam pipeline. - -*It's working in progress...* diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java index 91df2bebecc1..4e364e1dd0ad 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java @@ -34,16 +34,19 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This is a quick example, which uses Beam SQL DSL to create a data pipeline. * + *

Run the example with + *

+ * mvn -pl dsls/sql compile exec:java \
+ *  -Dexec.mainClass=org.apache.beam.dsls.sql.example.BeamSqlExample \
+ *   -Dexec.args="--runner=DirectRunner" -Pdirect-runner
+ * 
+ * */ class BeamSqlExample { - private static final Logger LOG = LoggerFactory.getLogger(BeamSqlExample.class); - public static void main(String[] args) throws Exception { PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class); Pipeline p = Pipeline.create(options); @@ -63,9 +66,9 @@ public static void main(String[] args) throws Exception { //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery; PCollection outputStream = inputTable.apply( - BeamSql.simpleQuery("select c2, c3 from PCOLLECTION where c1=1")); + BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1")); - //log out the output record; + //print the output record of case 1; outputStream.apply("log_result", MapElements.via(new SimpleFunction() { public Void apply(BeamSqlRow input) { @@ -74,12 +77,12 @@ public Void apply(BeamSqlRow input) { } })); - //Case 2. run the query with BeamSql.query + //Case 2. run the query with BeamSql.query over result PCollection of case 1. PCollection outputStream2 = - PCollectionTuple.of(new TupleTag("TABLE_B"), inputTable) - .apply(BeamSql.query("select c2, c3 from TABLE_B where c1=1")); + PCollectionTuple.of(new TupleTag("CASE1_RESULT"), outputStream) + .apply(BeamSql.query("select c2, c3 from CASE1_RESULT where c1=1")); - //log out the output record; + //print the output record of case 2; outputStream2.apply("log_result", MapElements.via(new SimpleFunction() { @Override