[FLINK-9555] Support table api in scala shell #7121

Closed · wants to merge 6 commits
7 changes: 7 additions & 0 deletions flink-scala-shell/pom.xml
@@ -78,6 +78,13 @@ under the License.
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
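<!-- provided scope: start-scala-shell.sh (changed below in this PR) appends the flink-table jar from opt/ to the shell classpath at startup -->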
</dependency>

<!-- test dependencies -->

<dependency>
FlinkILoop.scala
@@ -23,6 +23,8 @@ import java.io.{BufferedReader, File, FileOutputStream}
import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment, ScalaShellRemoteStreamEnvironment}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
import org.apache.flink.util.AbstractID

import scala.tools.nsc.interpreter._
@@ -90,10 +92,17 @@ class FlinkILoop(
}

// local environment
val (scalaBenv: ExecutionEnvironment, scalaSenv: StreamExecutionEnvironment) = {
val (
scalaBenv: ExecutionEnvironment,
scalaSenv: StreamExecutionEnvironment,
scalaBTEnv: BatchTableEnvironment,
scalaSTEnv: StreamTableEnvironment
) = {
val scalaBenv = new ExecutionEnvironment(remoteBenv)
val scalaSenv = new StreamExecutionEnvironment(remoteSenv)
(scalaBenv,scalaSenv)
val scalaBTEnv = TableEnvironment.getTableEnvironment(scalaBenv)
val scalaSTEnv = TableEnvironment.getTableEnvironment(scalaSenv)
(scalaBenv,scalaSenv,scalaBTEnv,scalaSTEnv)
}
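
For reference, the two table environments bound here come from the standard factory method; a minimal standalone equivalent is sketched below (outside the shell, using local environments instead of the remote ones above):

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment

// Derive a batch and a stream table environment from the corresponding
// execution environments, analogous to what the shell pre-binds.
val benv = ExecutionEnvironment.getExecutionEnvironment
val btenv = TableEnvironment.getTableEnvironment(benv)   // BatchTableEnvironment
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val stenv = TableEnvironment.getTableEnvironment(senv)   // StreamTableEnvironment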

/**
@@ -139,7 +148,10 @@ class FlinkILoop(
"org.apache.flink.api.scala._",
"org.apache.flink.api.scala.utils._",
"org.apache.flink.streaming.api.scala._",
"org.apache.flink.streaming.api.windowing.time._"
Contributor: Why did you remove this line?

Contributor (author): Sorry, I misunderstood; it should be imported as it was initially, to support window operations.

"org.apache.flink.streaming.api.windowing.time._",
Member: Why should we import windowing.time?

Contributor: This is important for defining windows or interval joins in the DataStream API.
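
For illustration, a minimal sketch (not part of this PR, assuming the pre-bound senv) of a window definition that relies on that import; Time below comes from org.apache.flink.streaming.api.windowing.time:

import org.apache.flink.streaming.api.windowing.time.Time

// Without the windowing.time import, Time.seconds(5) does not resolve in
// the shell and window definitions like this fail to compile.
val counts = senv
  .fromElements(("a", 1), ("b", 2), ("a", 3))
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1)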

"org.apache.flink.table.api._",
"org.apache.flink.table.api.scala._",
"org.apache.flink.types.Row"
)

override def createInterpreter(): Unit = {
@@ -152,6 +164,8 @@
// set execution environment
intp.bind("benv", this.scalaBenv)
intp.bind("senv", this.scalaSenv)
intp.bind("btenv", this.scalaBTEnv)
intp.bind("stenv", this.scalaSTEnv)
}
}
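
With these bindings in place, a hypothetical shell session could look as follows (output abridged; column names are illustrative):

scala> val ds = benv.fromElements((1, "Hi"), (2, "Hello"))
scala> val t = btenv.fromDataSet(ds, 'id, 'greeting)
scala> t.toDataSet[Row].collect().foreach(println)
1,Hi
2,Hello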

@@ -243,22 +257,29 @@ class FlinkILoop(

F L I N K - S C A L A - S H E L L

NOTE: Use the prebound Execution Environments to implement batch or streaming programs.
NOTE: Use the prebound Execution Environments and Table Environments to implement batch or streaming programs.

Batch - Use the 'benv' variable
Batch - Use the 'benv' and 'btenv' variables

* val dataSet = benv.readTextFile("/path/to/data")
* dataSet.writeAsText("/path/to/output")
* benv.execute("My batch program")
*
* val batchTable = btenv.fromDataSet(dataSet)
* btenv.registerTable("tableName", batchTable)
* val result = btenv.sqlQuery("SELECT * FROM tableName").toDataSet[Row].collect()
HINT: You can use print() on a DataSet to print the contents and collect()
a sql query result back to the shell.
Member: contents and collect() -> contents or collect() ?
HINT: You can use print() on a DataSet to print the contents to the shell.

Streaming - Use the 'senv' variable
Streaming - Use the 'senv' and 'stenv' variables

* val dataStream = senv.fromElements(1, 2, 3, 4)
* dataStream.countWindowAll(2).sum(0).print()
*
* val streamTable = stenv.fromDataStream(dataStream, 'num)
* val resultTable = streamTable.select('num).where('num % 2 === 1)
* resultTable.toAppendStream[Row].print()
* senv.execute("My streaming program")

HINT: You can only print a DataStream to the shell in local mode.
Contributor: Add a hint on how to see table output in the scala-shell.
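
One way such a hint could read (a sketch only, reusing the names from the examples above; not part of this change):

// Batch: pull the rows back to the client and print them
btenv.sqlQuery("SELECT * FROM tableName").toDataSet[Row].collect().foreach(println)

// Streaming: convert the table back to a stream, print it, then execute
resultTable.toAppendStream[Row].print()
senv.execute("Show table output")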

"""
// scalastyle:on
ScalaShellITCase.scala
@@ -168,6 +168,61 @@ class ScalaShellITCase extends TestLogger {
Assert.assertTrue(output.contains("WC(world,10)"))
}

@Test
def testSimpleSelectWithFilterBatchTableAPIQuery: Unit = {
val input =
"""
|val data = Seq(
| (1, 1L, "Hi"),
| (2, 2L, "Hello"),
| (3, 2L, "Hello world"))
|val t = benv.fromCollection(data).toTable(btenv, 'a, 'b, 'c).select('a, 'c).where(
|'a % 2 === 1)
|val results = t.toDataSet[Row].collect()
|results.foreach(println)
|:q
""".stripMargin
val output = processInShell(input)
Assert.assertFalse(output.contains("failed"))
Assert.assertFalse(output.contains("error"))
Assert.assertFalse(output.contains("Exception"))
Contributor: We also need to verify that the output contains "Hi" and "Hello world", but not "Hello".

Assert.assertTrue(output.contains("1,Hi"))
Assert.assertTrue(output.contains("3,Hello world"))
}

@Test
def testGroupedAggregationStreamTableAPIQuery: Unit = {
val input =
"""
| val data = List(
| ("Hello", 1),
| ("word", 1),
| ("Hello", 1),
| ("bark", 1),
| ("bark", 1),
| ("bark", 1),
| ("bark", 1),
| ("bark", 1),
| ("bark", 1),
| ("flink", 1)
| )
| val stream = senv.fromCollection(data)
| val table = stream.toTable(stenv, 'word, 'num)
| val resultTable = table.groupBy('word).select('num.sum as 'count).groupBy('count).select(
| 'count, 'count.count as 'frequency)
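| // Expected: word counts are bark=6, Hello=2, word=1, flink=1, so grouping
| // by count yields (count, frequency) rows (6,1), (2,1) and (1,2); the
| // retract stream prints them as e.g. (true,6,1).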
| val results = resultTable.toRetractStream[Row]
| results.print
| senv.execute
""".stripMargin
val output = processInShell(input)
Assert.assertTrue(output.contains("6,1"))
Assert.assertTrue(output.contains("1,2"))
Assert.assertTrue(output.contains("2,1"))
Assert.assertFalse(output.contains("failed"))
Assert.assertFalse(output.contains("error"))
Assert.assertFalse(output.contains("Exception"))
Contributor: We also need to verify that the output contains the correct word count result.

Member: +1

}

/**
* Submit external library.
* Disabled due to FLINK-7111.
7 changes: 7 additions & 0 deletions flink-scala-shell/start-script/start-scala-shell.sh
@@ -52,6 +52,13 @@ bin=`cd "$bin"; pwd`

FLINK_CLASSPATH=`constructFlinkClassPath`

# Append the flink-table jar to the classpath
opt=`dirname "$0"`
opt=`cd ../"$opt"/opt; pwd`
FLINK_TABLE_LIB_PATH=$opt/`ls "$opt" | grep 'flink-table_'`
FLINK_CLASSPATH=$FLINK_CLASSPATH:$FLINK_TABLE_LIB_PATH
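# (illustrative) with a standard distribution layout this resolves to e.g.
#   FLINK_CLASSPATH=<dist jars>:<flink-home>/opt/flink-table_<scala-version>-<flink-version>.jar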


# https://issues.scala-lang.org/browse/SI-6502, cant load external jars interactively
# in scala shell since 2.10, has to be done at startup
# checks arguments for additional classpath and adds it to the "standard classpath"