[FLINK-8866][Table API & SQL] Add support for unified table sink instantiation #6201
Conversation
Force-pushed from a7ce69b to 239aba3 (compare)
Thank you for this PR @suez1224. I had a first look at the change and added some feedback. I might have a second look tomorrow. In general, it would be great if we could split the SQL Client and flink-table changes and get the core feature in first.
One thing that we need to define more clearly is how to map the time attributes from the query to the sink. Proctime can be ignored, but how do we map rowtime? We could in theory use information from the rowtime descriptor: if it is 'from-source', we don't have to worry about it; if it is 'from-field', we should write the rowtime exactly into that field. But I don't know yet how we will handle multiple rowtimes in the future.
What do you think?
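The 'from-source' vs. 'from-field' distinction discussed above can be sketched roughly as follows. This is an illustrative stand-alone example, not Flink API: the names RowtimeKind and deriveSinkFields are hypothetical. The idea is that a 'from-field' rowtime keeps its declared data field in the sink schema, while 'from-source' rowtimes and proctime attributes carry no data field and are dropped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the rowtime-to-sink mapping discussed above.
// RowtimeKind and deriveSinkFields are illustrative names, not Flink API.
public class RowtimeSinkMapping {

    enum RowtimeKind { FROM_SOURCE, FROM_FIELD, PROCTIME }

    /**
     * Derives the sink field list: a 'from-field' rowtime is kept under its
     * declared field name; 'from-source' rowtimes and proctime attributes are
     * dropped because they do not correspond to a data field in the sink.
     */
    static List<String> deriveSinkFields(
            List<String> queryFields, String timeAttribute, RowtimeKind kind) {
        List<String> sinkFields = new ArrayList<>(queryFields);
        if (kind != RowtimeKind.FROM_FIELD) {
            sinkFields.remove(timeAttribute);
        }
        return sinkFields;
    }

    public static void main(String[] args) {
        List<String> query = Arrays.asList("a", "b", "rowtime");
        System.out.println(deriveSinkFields(query, "rowtime", RowtimeKind.FROM_FIELD));
        System.out.println(deriveSinkFields(query, "rowtime", RowtimeKind.FROM_SOURCE));
    }
}
```

How multiple rowtimes would interact with such a mapping is exactly the open question raised above.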
import java.util

/**
Add the updated comment again.
@@ -16,21 +16,18 @@
 * limitations under the License.
 */

-package org.apache.flink.table.sources
+package org.apache.flink.table.connector
Use plural connectors
Done
 * the current classpath to be found.
 */
-trait TableSourceFactory[T] {
+trait TableConnectorFactory[T] {
Actually, we could also simplify this and call it TableFactory. What do you think? We also call it CREATE TABLE, not CREATE TABLE CONNECTOR, in SQL.
Sounds good. Also, I've updated the DDL design doc to call it TABLE CONNECTOR, which I think is clearer.
@suez1224 Actually, I liked CREATE TABLE because it is closer to SQL. The reason why I proposed TableFactory was that the factory does much more than just constructing a connector. It also performs schema validation, format discovery and so on.
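A minimal sketch of what such a factory contract could look like. This is illustrative Java, not the actual Flink interface: the factory advertises the properties it matches on, and a discovery step selects it before anything is built, which is also where schema validation and format discovery would hook in.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; the real Flink TableFactory interface differs.
public class FactorySketch {

    /** A factory does more than construct a connector: it first declares what it matches. */
    interface TableFactory {
        Map<String, String> requiredContext();          // properties this factory matches on
        Object create(Map<String, String> properties);  // would also validate schema, discover formats, ...
    }

    static class TestFactory implements TableFactory {
        public Map<String, String> requiredContext() {
            Map<String, String> context = new HashMap<>();
            context.put("connector.type", "test");
            return context;
        }
        public Object create(Map<String, String> properties) {
            return "test-connector";
        }
    }

    /** Picks the first factory whose required context is contained in the given properties. */
    static TableFactory find(Iterable<? extends TableFactory> factories,
                             Map<String, String> properties) {
        for (TableFactory factory : factories) {
            if (properties.entrySet().containsAll(factory.requiredContext().entrySet())) {
                return factory;
            }
        }
        throw new IllegalArgumentException("No matching factory for " + properties);
    }
}
```

Under this reading, the name TableFactory fits because matching, validation and discovery all live in the factory, not in the connector it eventually produces.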
+1 I think the most baffling point I have read until this point was the *Connector* part of TableConnectorFactory :-)
 * Specify the type of the table connector, check
 * [[org.apache.flink.table.descriptors.TableDescriptorValidator]] for all values.
 *
 * @return the table connector type,.
remove comma
done
 *
 * @return the table connector type,.
 */
def tableType() : String
Rename to getType()?
sounds good.
val properties = mutable.Map[String, String]()
properties.put(TableDescriptorValidator.TABLE_TYPE,
  TableDescriptorValidator.TABLE_TYPE_VALUE_SINK)
properties.put(CONNECTOR_TYPE, "test")
I would use string literals here for everything (not the constants). This allows tests to fail if we refactor one of the properties.
good point, done
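The suggested test style can be sketched like this (a hypothetical stand-alone snippet, not the actual test class): the property keys and values are hard-coded as string literals rather than referencing the constants, so renaming a constant such as CONNECTOR_TYPE breaks the test and forces a conscious update.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the suggested test style: literals instead of constants, so that
// refactoring a property key is caught by the test rather than silently absorbed.
public class PropertyLiteralSketch {
    static Map<String, String> sinkProperties() {
        Map<String, String> properties = new HashMap<>();
        properties.put("type", "sink");            // literal, not TableDescriptorValidator.TABLE_TYPE
        properties.put("connector.type", "test");  // literal, not CONNECTOR_TYPE
        return properties;
    }
}
```

If the constant were used on both sides, a rename would update the producing and asserting code at once and the regression in the serialized property format would go unnoticed.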
new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink().configure(fieldNames, fieldTypes))

tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable")
tEnv.sqlQuery("SELECT a, e, f, t, rowtime from targetTable")
I think we need more test cases for how we handle the time attributes for both table types. Maybe not only ITCases but also unit tests. The configure method is an internal method that should not be called here.
}

class UnsafeMemoryTableSource(tableSchema: TableSchema,
    returnType: TypeInformation[Row],
We usually indent differently. Take org.apache.flink.table.codegen.CodeGenerator as an example.
done
private <T> void executeUpdateInternal(ExecutionContext<T> context, String query) {
    final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();

    envInst.getTableEnvironment().sqlUpdate(query);
Wrap it into a try-catch similar to org.apache.flink.table.client.gateway.local.LocalExecutor#createTable.
We also need to ship the query config here.
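The suggested wrapping could look roughly like this. A self-contained sketch modeled on the pattern in LocalExecutor#createTable; the nested SqlExecutionException here is a stand-in for the SQL Client's exception type, and the Runnable parameter stands in for the sqlUpdate call:

```java
// Sketch of wrapping a sqlUpdate call so parser/validation failures surface
// to the CLI user as a client exception instead of an unhandled crash.
public class UpdateWrapper {

    static class SqlExecutionException extends RuntimeException {
        SqlExecutionException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static void executeUpdateSafely(Runnable sqlUpdate, String query) {
        try {
            sqlUpdate.run();
        } catch (Exception e) {
            // keep the offending statement in the message and the root cause attached
            throw new SqlExecutionException("Invalid SQL update statement: " + query, e);
        }
    }
}
```

The query config mentioned above would be an additional parameter threaded through to the actual sqlUpdate call.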
final JobGraph jobGraph = envInst.createJobGraph(jobName);

// create execution
new Thread(new ProgramDeployer<>(context, jobName, jobGraph, null)).start();
I think even a detached job needs to return a result. Otherwise you cannot be sure whether the job has been submitted or not; e.g., the cluster might not be reachable. In any case, every created thread should be managed by the result store. So we should have a similar architecture as for queries, maybe with a StatusResult instead of a CollectStreamResult. Maybe we should do the SQL Client changes in a separate PR?
Yes, let me put it into a separate PR. StatusResult makes sense to me.
@twalthr, for a sink-only table, I don't think the user needs to define any rowtimes on it, since it will never be used as a source. For a table that is both source and sink, when registering it as a sink, I think we only need to take care of the 'from-field' columns, since they map to actual data fields in the table. For proctime and 'from-source' columns, we can just ignore them when building the sink schema. Maybe we should have some helper methods for building the schema for source and sink separately. Please correct me if I missed something here. What do you think?
@suez1224 Yes, sounds good to me. We should also think of the opposite of a timestamp extractor (a timestamp inserter) for cases where the rowtime needs some preprocessing (e.g. concatenation of a DATE and a TIME column), but we can deal with such cases in a follow-up issue. A helper method would be useful. We already have something similar in
Hi, I think timestamp fields of source-sink tables should be handled as follows when emitting the table:
@fhueske @twalthr thanks for the comments. In
Please correct me if I missed something here. What do you think?
Hi @suez1224, that sounds good overall. :-) A few comments:
What do you think?
I agree with @fhueske. Let's do the
Thanks @suez1224 for the PR. It looks really good!
I think @twalthr also had similar comments: it seems like some unification can be done to reduce duplication between source and sink.
I will also try directly patching this onto https://github.com/walterddr/AthenaX/tree/upgrade_1.5 to eliminate the need to create a sink provider.
throw new SqlClientException(
-    "Invalid table 'type' attribute value, only 'source' is supported");
+    "Invalid table 'type' attribute value, only 'source' or 'sink' is supported");
missing 'both'?
good catch. thanks
"Invalid table 'type' attribute value, only 'source' or 'sink' is supported");
}
if (this.tables.containsKey(tableName)) {
    throw new SqlClientException("Duplicate table name '" + tableName + "'.");
If only "source" and "sink" are allowed, should we allow the same name with different types, e.g. {"name": "t1", "type": "source"} and {"name": "t1", "type": "sink"} co-existing? This is actually following up on the previous comment. I think we just need one; either should work.
The current implementation allows only source and sink in one table.
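Assuming the value set settles on 'source', 'sink' and 'both' as discussed above, the 'type' attribute validation could be sketched like this (names and message text are illustrative, not the actual SQL Client code):

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the 'type' attribute validation under discussion; the allowed
// value set ('source', 'sink', 'both') follows the thread above.
public class TableTypeValidation {
    private static final List<String> ALLOWED = Arrays.asList("source", "sink", "both");

    static String validateTableType(String type) {
        if (!ALLOWED.contains(type)) {
            throw new IllegalArgumentException(
                "Invalid table 'type' attribute value, only 'source', 'sink' or 'both' is supported");
        }
        return type;
    }
}
```

With a 'both' value, the duplicate-name check stays simple: one entry per table name, and a single entry can serve as source and sink at once.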
registerTableSinkInternal(name, configuredSink)
}

def registerTableSink(name: String, configuredSink: TableSink[_]): Unit = {
We could probably move this to the base class TableEnvironment?
import org.apache.calcite.schema.Statistic
import org.apache.calcite.schema.impl.AbstractTable

class TableSourceSinkTable[T1, T2](val tableSourceTableOpt: Option[TableSourceTable[T1]],
Huge +1. My understanding is this will be the overall class to hold a table source, a sink, or both. The name TableSourceSinkTable seems redundant.
 */
-object TableSourceFactoryService extends Logging {
+class TableConnectorFactoryService[T] extends Logging {
+1
Also just TableFactoryService?
 * Common class for all descriptors describing a table sink.
 */
abstract class TableSinkDescriptor extends TableDescriptor {
  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+1 Should be able to unify
- Consolidate table sink and table source instantiation.
- Add support to register a Calcite table with both tableSource and tableSink.
- Add Insert command support in SQL client.
- Add CsvTableSinkFactory.
Force-pushed from 4324efd to a709787 (compare)
…tiate TableSinks This closes apache#6201.
What is the purpose of the change
Add interfaces to support unified table sink configuration and instantiation. Consolidate table source and table sink configuration and instantiation.
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes)
Documentation