
[FLINK-9555] Support table api in scala shell #7121

Closed
wants to merge 6 commits

Conversation

shuiqiangchen (Contributor)

What is the purpose of the change

Make the Table API available in the Scala shell so that users can try out the Table API interactively.

Brief change log

  • Add the flink-table dependency to the flink-scala-shell module.
  • The Scala shell now provides a BatchTableEnvironment and a StreamTableEnvironment when it starts (see the sketch below).
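
As a rough illustration of what the new bindings enable, here is a hedged sketch of a shell session (benv/btenv are the names introduced by this PR; the table/column names are made up, and the Table API implicits are assumed to be pre-imported by the shell):

// assumes org.apache.flink.api.scala._ and org.apache.flink.table.api.scala._
// are already on the shell's import list, as this PR proposes
val orders = benv
  .fromElements((1, "beer", 3), (2, "diaper", 4), (3, "beer", 5))
  .toTable(btenv, 'id, 'product, 'amount)
btenv.registerTable("Orders", orders)
val totals = btenv.sqlQuery("SELECT product, SUM(amount) AS total FROM Orders GROUP BY product")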

Verifying this change

This change is already covered by existing tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no) yes
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no) no
  • The serializers: (yes / no / don't know) no
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know) no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) no
  • The S3 file system connector: (yes / no / don't know) no

Documentation

  • Does this pull request introduce a new feature? (yes / no) yes
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) not documented

@sunjincheng121 (Member) left a comment

Thanks for the PR, @shuiqiangchen!
In addition to a few comments I left, I have two suggestions as follows:

  1. Add a test case for Table/SQL in ScalaShellITCase.
  2. Add SQL/Table examples to the welcome message printed by the printWelcome method.

Best,
Jincheng

@@ -96,6 +97,12 @@ class FlinkILoop(
(scalaBenv,scalaSenv)
}

val (scalaBTEnv: BatchTableEnvironment, scalaSTEnv: StreamTableEnvironment) = {
Member:

I suggest merging the two local environment init blocks as follows:

// local environment
  val (
    scalaBenv: ExecutionEnvironment,
    scalaSenv: StreamExecutionEnvironment,
    scalaBTabEnv: BatchTableEnvironment,
    scalaSTabEnv: StreamTableEnvironment) = {

    val scalaBenv = new ExecutionEnvironment(remoteBenv)
    val scalaSenv = new StreamExecutionEnvironment(remoteSenv)
    val scalaBTabEnv = TableEnvironment.getTableEnvironment(scalaBenv)
    val scalaSTabEnv = TableEnvironment.getTableEnvironment(scalaSenv)
    (scalaBenv, scalaSenv, scalaBTabEnv, scalaSTabEnv)
  }

Contributor (author):

Thanks! That looks much better organized.

@@ -139,7 +146,9 @@ class FlinkILoop(
"org.apache.flink.api.scala._",
"org.apache.flink.api.scala.utils._",
"org.apache.flink.streaming.api.scala._",
"org.apache.flink.streaming.api.windowing.time._"
"org.apache.flink.streaming.api.windowing.time._",
Member:

Why should we import windowing.time?

Contributor:

This import is needed to define windows or interval joins in the DataStream API.
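For instance, a keyed time window will not compile without the Time class from that package. A minimal sketch, assuming senv is the shell's pre-bound StreamExecutionEnvironment and the DataStream Scala implicits are already imported:

import org.apache.flink.streaming.api.windowing.time.Time

// sum the second tuple field over 5-second processing-time windows per key
val windowedSum = senv
  .fromElements((1, 2), (1, 3), (2, 5))
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1)
windowedSum.print()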

@@ -319,7 +319,6 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
Member:

I suggest reverting this change, because flink-dist defines Flink's overall release packaging. At present, we package the flink-table module into the opt directory. The minimal change I suggest is to modify the scala-shell CLASSPATH to include opt/flink-tableXXX.jar, so that tables are supported in the scala-shell. What do you think?

Contributor:

I agree that this change should be reverted, but flink-table should remain in /opt. If a user wants to use flink-table (or any connector, for that matter), the required jars should be copied manually to /lib.

Contributor (author):

Thank you for your valuable suggestions! I have modified the Scala shell start script to add flink-table_*.jar to FLINK_CLASSPATH, so there is no need to change the flink-dist module.


* val dataSet = benv.readTextFile("/path/to/data")
* dataSet.writeAsText("/path/to/output")
* benv.execute("My batch program")

* val batchTable = btenv.sqlQuery("SELECT * FROM tableName")
Contributor:

Add an example of registering a table via benv and then querying it via btenv.
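Perhaps something along these lines (a hedged sketch; the table name and columns are made up, and the shell's implicit imports are assumed):

// build a DataSet with benv, register it as a table, then query it via btenv
val words = benv.fromElements((1, "Hi"), (2, "Hello"), (3, "Hello world"))
btenv.registerTable("Words", words.toTable(btenv, 'id, 'word))
val filtered = btenv.sqlQuery("SELECT word FROM Words WHERE id > 1")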


* val dataStream = senv.fromElements(1, 2, 3, 4)
* dataStream.countWindowAll(2).sum(0).print()
* senv.execute("My streaming program")

* val streamTable = stenv.sqlQuery("SELECT * FROM tableName")
Contributor:

Add an example of registering a table via senv and then querying it via stenv.
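Perhaps something like this (a hedged sketch; the table name and columns are made up, and the shell's implicit imports are assumed):

// build a DataStream with senv, register it, then query it via stenv
val clicks = senv.fromElements((1, "home"), (2, "cart"), (1, "checkout"))
stenv.registerDataStream("Clicks", clicks, 'userId, 'page)
val perUser = stenv.sqlQuery("SELECT userId, COUNT(page) AS cnt FROM Clicks GROUP BY userId")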


* val dataSet = benv.readTextFile("/path/to/data")
* dataSet.writeAsText("/path/to/output")
* benv.execute("My batch program")

* val batchTable = btenv.sqlQuery("SELECT * FROM tableName")
HINT: You can use print() on a DataSet to print the contents to the shell.
Contributor:

Add a hint on how to see the table output in the scala-shell.
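For the batch case, the hint could show converting the table back to a DataSet and printing it, roughly as follows (a sketch; it assumes org.apache.flink.types.Row is importable in the shell and reuses the placeholder tableName from the welcome text):

// convert the result table to a DataSet[Row] and print it in the shell
import org.apache.flink.types.Row
val batchTable = btenv.sqlQuery("SELECT * FROM tableName")
batchTable.toDataSet[Row].print()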


* val dataStream = senv.fromElements(1, 2, 3, 4)
* dataStream.countWindowAll(2).sum(0).print()
* senv.execute("My streaming program")

* val streamTable = stenv.sqlQuery("SELECT * FROM tableName")
HINT: You can only print a DataStream to the shell in local mode.
Contributor:

Add a hint on how to see the table output in the scala-shell.
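For the streaming case, the hint could show converting the table to an append stream and printing it, roughly as follows (a sketch; only valid for append-only queries, and it reuses the placeholder tableName from the welcome text):

// convert the result table to a DataStream[Row], print it, and trigger execution
import org.apache.flink.types.Row
val streamTable = stenv.sqlQuery("SELECT * FROM tableName")
streamTable.toAppendStream[Row].print()
senv.execute("print streamTable")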

@zjffdu (Contributor) commented Nov 27, 2018

Thanks @shuiqiangchen, I left a few comments. And I agree with @sunjincheng121: we need to add tests.

@@ -139,7 +147,8 @@ class FlinkILoop(
"org.apache.flink.api.scala._",
"org.apache.flink.api.scala.utils._",
"org.apache.flink.streaming.api.scala._",
"org.apache.flink.streaming.api.windowing.time._"
"org.apache.flink.table.api._",
"org.apache.flink.table.api.scala._"
Contributor:

Could we also import org.apache.flink.types.Row, which I believe is used very frequently?

Contributor (author):

Agree, +1. I will add it.

@shuiqiangchen (Contributor, author):

Hi all, thank you for your attention! I have pushed new code according to your suggestions. Looking forward to your guidance.

| data.+=((1, 1L, "Hi"))
| data.+=((2, 2L, "Hello"))
| data.+=((3, 2L, "Hello world"))
|val t = benv.fromCollection(data).toTable(btenv, 'a, 'b, 'c).select("*").where('a % 2 === 1 )
@zjffdu (Contributor), Nov 30, 2018:

MutableList is not necessary here. Use this instead

val data = Seq(
  (1, 1L, "Hi),
  (2, 2L, "Hello"),
  (3, 2L, "Hello world")
)

val output = processInShell(input)
Assert.assertFalse(output.contains("failed"))
Assert.assertFalse(output.contains("error"))
Assert.assertFalse(output.contains("Exception"))
Contributor:

We also need to verify the output contains "Hi", "Hello world" but not "Hello".

val output = processInShell(input)
Assert.assertFalse(output.contains("failed"))
Assert.assertFalse(output.contains("error"))
Assert.assertFalse(output.contains("Exception"))
Contributor:

We also need to verify that the output contains the correct word count result.

Member:

+1

@twalthr self-assigned this Nov 30, 2018
@@ -139,7 +148,9 @@ class FlinkILoop(
"org.apache.flink.api.scala._",
"org.apache.flink.api.scala.utils._",
"org.apache.flink.streaming.api.scala._",
"org.apache.flink.streaming.api.windowing.time._"
Contributor:

Why did you remove this line?

Contributor (author):

Sorry, I misunderstood; the import should be kept as it was initially, to support window operations.

@@ -52,6 +52,13 @@ bin=`cd "$bin"; pwd`

FLINK_CLASSPATH=`constructFlinkClassPath`

#adding flink table jar into class path
Member:

#adding -> # adding
# Append flink-table jar into class path ?

@zjffdu (Contributor) commented Dec 4, 2018

@shuiqiangchen Travis failed, could you check that? Thanks

@shuiqiangchen (Contributor, author):

@shuiqiangchen Travis failed, could you check that? Thanks

Thanks! It was a checkstyle issue. I have fixed it.

@shuiqiangchen (Contributor, author):

Hi all, I have updated the code following your suggestions. Please review it at your convenience.

@sunjincheng121 (Member) left a comment

Hi @shuiqiangchen, thanks for the update!
From my point of view, the PR is good to merge once you address the last comment.
+1 to merge
Best,
Jincheng

* btenv.registerTable("tableName", batchTable)
* val result = btenv.sqlQuery("SELECT * FROM tableName").collect
HINT: You can use print() on a DataSet to print the contents and collect()
a sql query result back to the shell.
Member:

contents and collect() -> contents or collect() ?

@sunjincheng121 (Member):

Thanks for the update! @shuiqiangchen
Merging...

sunjincheng121 pushed a commit to sunjincheng121/flink that referenced this pull request Dec 6, 2018
@asfgit closed this in b3a378a Dec 6, 2018
asfgit pushed a commit that referenced this pull request Dec 6, 2018
@sunjincheng121 self-assigned this Dec 6, 2018
zentol pushed a commit to zentol/flink that referenced this pull request Dec 6, 2018
tisonkun pushed a commit to tisonkun/flink that referenced this pull request Jan 17, 2019