
[FLINK-3640] Add support for SQL in DataSet programs #1862

Closed
wants to merge 3 commits

Conversation

@vasia (Contributor) commented Apr 7, 2016

This PR adds basic support for batch SQL queries embedded in Table API programs.
In order to run a SQL query, a DataSet or Table first needs to be registered in the TableEnvironment; the query is then executed with the sql() method:

val tEnv = getScalaTableEnvironment
val t = getDataSet(env).toTable
tEnv.registerTable("MyTable", t)
val sqlQuery = "SELECT * FROM MyTable"
val result = tEnv.sql(sqlQuery)

The result of the sql() method is a Table, which can be used in subsequent Table API or SQL queries.
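For illustration, the example could continue like this (a sketch only; the field name a and the table name FilteredTable are illustrative and not part of the PR):

// Sketch: feed the SQL result into the Table API and back into SQL.
// "a" is an assumed field name of MyTable; "FilteredTable" is an illustrative name.
val filtered = result.filter("a > 10")         // Table API call on the SQL result
tEnv.registerTable("FilteredTable", filtered)  // register it so SQL can see it again
val result2 = tEnv.sql("SELECT a FROM FilteredTable")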

- add EnumerableToLogicalScan rule
- in order to be able to mix TableAPI and SQL, we need our own copy of PlannerImpl
@rmetzger (Contributor) commented Apr 7, 2016

Awesome! Really cool to see this coming to Flink.
I played around with it a bit and it seems to work ;)

Table table = tableEnv.fromDataSet(input);
tableEnv.registerTable("tab", table);
tableEnv.registerTable("tab1", table);

// self-join of the two registered views of the same table
Table res = tableEnv.sql("SELECT COUNT(tab1.acount) AS acount, tab.word " +
    "FROM tab, tab1 " +
    "WHERE tab.word = tab1.word GROUP BY tab.word");
res = res.filter("acount > 2");

// a second query over a nested SELECT
res = tableEnv.sql("SELECT COUNT(acount) AS acount, word " +
    "FROM (SELECT * FROM tab WHERE acount = 1) " +
    "GROUP BY word");

@@ -83,4 +84,17 @@ class AbstractTableEnvironment {
)
TranslationContext.registerTable(dataSetTable, name)
}

/**
* Execute a SQL query on a batch [[Table]].

Keep the description of this method more generic as it will be the entry point for stream SQL queries as well.
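A more generic phrasing might look roughly like this (just a sketch of possible wording; the parameter name and the sentences are assumptions, not the doc comment that was merged):

/**
 * Evaluates a SQL query on registered tables and returns the result as a [[Table]].
 * Intended as the single entry point for SQL queries, covering batch tables now
 * and streaming tables in the future.
 */
def sql(query: String): Table = ???  // body omitted in this sketch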

import scala.collection.JavaConverters._

@RunWith(classOf[Parameterized])
class AggregationsITCase(

Do we need each test for DataSet and Table? Wouldn't it be sufficient to test for Table and have one or two tests for DataSet?

@fhueske (Contributor) commented Apr 8, 2016

Thanks for the PR. I had a few minor comments but otherwise it looks really good.

There are a few follow-up issues, IMO:

  • Check if we can somehow get around the EnumerableToLogicalTableScan. Maybe the Calcite community can help. I will open a JIRA for this once the PR is merged.
  • Check how we can exclude unsupported SQL features such as outer joins, intersection, etc. Here too, the Calcite community should be able to help (one possible direction is sketched below). I will open a JIRA for this once the PR is merged.
  • Refactor TranslationContext and TableEnvironment to prevent the same planner from being used several times. I'll start a discussion about this soon.
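On the second point, one conceivable direction (purely a sketch, not what this PR or Calcite actually does; the checker class, its name, and the exception type are invented for illustration) is to walk the logical plan and reject operators that the DataSet translation cannot handle yet:

import org.apache.calcite.rel.{RelNode, RelVisitor}
import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rel.logical.LogicalJoin

// Hypothetical checker: traverses the plan and fails on non-inner joins.
class UnsupportedFeatureChecker extends RelVisitor {
  override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
    node match {
      case join: LogicalJoin if join.getJoinType != JoinRelType.INNER =>
        throw new IllegalArgumentException(
          "Unsupported SQL feature: " + join.getJoinType + " join")
      case _ => // all other operators are accepted in this sketch
    }
    super.visit(node, ordinal, parent) // continue walking the children
  }
}

// Usage, assuming logicalPlan is the RelNode produced by parsing and validating the query:
// new UnsupportedFeatureChecker().go(logicalPlan)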

@vasia (Contributor, Author) commented Apr 9, 2016

Thanks for the review @fhueske. I've addressed your comments :)


// initialize RelBuilder
frameworkConfig = Frameworks
.newConfigBuilder
.defaultSchema(tables)
.parserConfig(parserConfig)
.costFactory(new DataSetCostFactory)
.traitDefs(ConventionTraitDef.INSTANCE)
.programs(Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES))

I think this line can be removed because we set the rules explicitly before calling the optimizer.
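For context, the suggested change amounts to building the FrameworkConfig without the programs(...) call and handing the rule set to the optimizer directly. A rough sketch of that shape (variable names and the trailing build() are assumptions, not the merged code):

// Sketch: config without programs(...), since the rules are applied explicitly later.
frameworkConfig = Frameworks
  .newConfigBuilder
  .defaultSchema(tables)
  .parserConfig(parserConfig)
  .costFactory(new DataSetCostFactory)
  .traitDefs(ConventionTraitDef.INSTANCE)
  .build()

// At optimization time the same rule set is supplied directly to a Calcite program,
// which the planner then runs over the logical plan with the required output traits.
val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)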

@fhueske (Contributor) commented Apr 11, 2016

+1 to merge after resolving one last minor comment.

@vasia (Contributor, Author) commented Apr 11, 2016

Thanks, I will make the change and merge.

@asfgit closed this in ed1e52a on Apr 11, 2016
fijolekProjects pushed a commit to fijolekProjects/flink that referenced this pull request May 1, 2016
- add EnumerableToLogicalScan rule
- in order to be able to mix TableAPI and SQL, we need our own copy of PlannerImpl
- create a dummy RelNode in the reset() method, in order to retrieve the RelOptPlanner

This closes apache#1862
hequn8128 pushed a commit to hequn8128/flink that referenced this pull request Jun 22, 2017
- add EnumerableToLogicalScan rule
- in order to be able to mix TableAPI and SQL, we need our own copy of PlannerImpl
- create a dummy RelNode in the reset() method, in order to retrieve the RelOptPlanner

This closes apache#1862