From 8b3c468adb574f229c6d24da58d4e52c4a025cec Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Thu, 21 Jul 2016 00:31:01 +0800 Subject: [PATCH 1/3] [FLINK-4180] [FLINK-4181] [table] add Batch SQL and Stream SQL and Stream Table API examples --- .../flink/examples/java/JavaSQLExample.java | 70 +++++++++++++++++++ .../examples/scala/StreamSQLExample.scala | 60 ++++++++++++++++ .../examples/scala/StreamTableExample.scala | 56 +++++++++++++++ .../flink/examples/scala/WordCountSQL.scala | 43 ++++++++++++ 4 files changed, 229 insertions(+) create mode 100644 flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java new file mode 100644 index 0000000000000..a6844d7e7b993 --- /dev/null +++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.java; + +import org.apache.flink.api.table.Table; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.BatchTableEnvironment; +import org.apache.flink.api.table.TableEnvironment; + +/** + * Simple example that shows how Batch SQL is used in Java. + */ +public class JavaSQLExample { + + public static class WC { + public String word; + public long frequence; + + // Public constructor to make it a Flink POJO + public WC() { + + } + + public WC(String word, long frequence) { + this.word = word; + this.frequence = frequence; + } + + @Override + public String toString() { + return "WC " + word + " " + frequence; + } + } + + public static void main(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + + DataSet input = env.fromElements( + new WC("Hello", 1), + new WC("Ciao", 1), + new WC("Hello", 1)); + + // register the DataSet as table "WordCount" + tableEnv.registerDataSet("WordCount", input, "word, frequence"); + // run a SQL query on the Table and retrieve the result as a new Table + Table table = tableEnv.sql( + "SELECT word, SUM(frequence) as frequence FROM WordCount GROUP BY word"); + + DataSet result = tableEnv.toDataSet(table, WC.class); + + result.print(); + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala new file mode 100644 index 0000000000000..fa6264808f0ee --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.scala + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} + +/** + * Simple example for demonstrating the use of SQL on Stream Table. 
+ */ +object StreamSQLExample { + + case class Order(user: Long, product: String, amount: Int) + + def main(args: Array[String]): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val orderA: DataStream[Order] = env.fromCollection(Seq( + Order(1L, "beer", 3), + Order(1L, "diaper", 4), + Order(3L, "rubber", 2))) + + val orderB: DataStream[Order] = env.fromCollection(Seq( + Order(2L, "pen", 3), + Order(2L, "rubber", 3), + Order(4L, "beer", 1))) + + // register the DataStreams under the names "OrderA" and "OrderB" + tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount) + tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount) + + // union the two tables + val result = tEnv.sql( + "SELECT STREAM * FROM OrderA WHERE amount > 2 UNION ALL " + + "SELECT STREAM * FROM OrderB WHERE amount < 2") + + result.toDataStream[Order].print() + + env.execute() + } + +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala new file mode 100644 index 0000000000000..812ed1f5caf82 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.scala + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} + +/** + * Simple example for demonstrating the use of Table API on Stream Table. + */ +object StreamTableExample { + + case class Order(user: Long, product: String, amount: Int) + + def main(args: Array[String]): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val orderA = env.fromCollection(Seq( + Order(1L, "beer", 3), + Order(1L, "diaper", 4), + Order(3L, "rubber", 2))).toTable(tEnv) + + val orderB = env.fromCollection(Seq( + Order(2L, "pen", 3), + Order(2L, "rubber", 3), + Order(4L, "beer", 1))).toTable(tEnv) + + val result: DataStream[Order] = orderA.unionAll(orderB) + .select('user, 'product, 'amount) + .where('amount > 2) + .toDataStream[Order] + + result.print() + + env.execute() + } + +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala new file mode 100644 index 0000000000000..e969fb41872f4 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.scala + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.TableEnvironment + +/** + * Simple example that shows how Batch SQL is used in Scala. + */ +object WordCountSQL { + case class WC(word: String, count: Int) + + def main(args: Array[String]): Unit = { + + // set up execution environment + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1)) + tEnv.registerDataSet("WordCount", input, 'word, 'frequence) + + val table = tEnv.sql("SELECT word, SUM(frequence) FROM WordCount GROUP BY word") + + table.toDataSet[WC].print() + } +} From 8fabe7deb70f41b5075e52a869b969879d06e7b5 Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Sun, 24 Jul 2016 16:19:32 +0800 Subject: [PATCH 2/3] fix typo --- .../apache/flink/examples/java/JavaSQLExample.java | 12 ++++++------ .../apache/flink/examples/scala/WordCountSQL.scala | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java index a6844d7e7b993..418a3de81945d 100644 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java +++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java @@ -30,21 +30,21 @@ public class JavaSQLExample { public static class WC { public String word; - public long frequence; + public long frequency; // Public constructor to make it a Flink POJO public WC() { } - public WC(String word, long frequence) { + public WC(String word, long frequency) { this.word = word; - this.frequence = frequence; + this.frequency = frequency; } @Override public String toString() { - return "WC " + word + " " + frequence; + return "WC " + word + " " + frequency; } } @@ -58,10 +58,10 @@ public static void main(String[] args) throws Exception { new WC("Hello", 1)); // register the DataSet as table "WordCount" - tableEnv.registerDataSet("WordCount", input, "word, frequence"); + tableEnv.registerDataSet("WordCount", input, "word, frequency"); // run a SQL query on the Table and retrieve the result as a new Table Table table = tableEnv.sql( - "SELECT word, SUM(frequence) as frequence FROM WordCount GROUP BY word"); + "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word"); DataSet result = tableEnv.toDataSet(table, WC.class); diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala index e969fb41872f4..41efffc9ec633 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala @@ -34,9 +34,9 @@ object WordCountSQL { val tEnv = TableEnvironment.getTableEnvironment(env) val input = env.fromElements(WC("hello", 1), 
WC("hello", 1), WC("ciao", 1)) - tEnv.registerDataSet("WordCount", input, 'word, 'frequence) + tEnv.registerDataSet("WordCount", input, 'word, 'frequency) - val table = tEnv.sql("SELECT word, SUM(frequence) FROM WordCount GROUP BY word") + val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word") table.toDataSet[WC].print() } From 2ad61e6b62dcc24f3cf920e84b1a13ac69385b10 Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Mon, 25 Jul 2016 09:30:00 +0800 Subject: [PATCH 3/3] address comment --- .../java/org/apache/flink/examples/java/JavaSQLExample.java | 2 ++ .../org/apache/flink/examples/scala/StreamSQLExample.scala | 2 ++ .../org/apache/flink/examples/scala/StreamTableExample.scala | 2 ++ 3 files changed, 6 insertions(+) diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java index 418a3de81945d..bbac94ad00ee1 100644 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java +++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java @@ -49,6 +49,8 @@ public String toString() { } public static void main(String[] args) throws Exception { + + // set up execution environment ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala index fa6264808f0ee..8eed77d17e282 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala @@ -30,6 +30,8 @@ object StreamSQLExample { case class 
Order(user: Long, product: String, amount: Int) def main(args: Array[String]): Unit = { + + // set up execution environment val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala index 812ed1f5caf82..9081f508506c9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala @@ -30,6 +30,8 @@ object StreamTableExample { case class Order(user: Long, product: String, amount: Int) def main(args: Array[String]): Unit = { + + // set up execution environment val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env)