
Conversation

@godfreyhe
Contributor

What is the purpose of the change

Currently, DDL + Table API (with window) does not work, because the source table's TableSchema (from the TableEnvironment#scan/from methods) does not convert the watermark spec to a rowtime attribute. This PR aims to fix that.
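
For illustration, a minimal sketch of the failing scenario (the table name, columns, and connector placeholder are made up for this example, not taken from the PR; the calls follow the 1.11-era Table API):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Tumble;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

public class WindowOnDdlTableExample {
	public static void main(String[] args) {
		TableEnvironment tEnv = TableEnvironment.create(
			EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

		// DDL-defined source with a watermark; connector options are placeholders
		tEnv.executeSql(
			"CREATE TABLE Orders (" +
			"  amount INT," +
			"  ts TIMESTAMP(3)," +
			"  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
			") WITH ('connector' = '...')");

		// Before the fix, the schema returned by from()/scan() still exposed `ts` as a plain
		// TIMESTAMP(3) instead of a rowtime attribute, so the event-time window below failed.
		Table result = tEnv.from("Orders")
			.window(Tumble.over(lit(10).minutes()).on($("ts")).as("w"))
			.groupBy($("w"))
			.select($("amount").sum());
	}
}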

Brief change log

  • Convert the watermark spec to rowtime attributes when executing TableEnvironment#scan/from

Verifying this change

This change added tests and can be verified as follows:

  • Extended TableSourceTest to validate the fix

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@godfreyhe
Contributor Author

cc @wuchong

@flinkbot
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 8f86268 (Tue May 26 09:24:07 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!
  • This pull request references an unassigned Jira ticket. According to the code contribution guide, tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented May 26, 2020

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

Contributor

@dawidwys dawidwys left a comment

Thank you for the PR. It is a really important fix.

Nevertheless, I have some concerns about whether this is the right place to apply that transformation.

return catalogManager.getTable(tableIdentifier)
.map(t -> new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema()));
return catalogManager.getTable(tableIdentifier).map(t -> {
CatalogTableSchemaResolver resolver = new CatalogTableSchemaResolver(parser);
Contributor

Shouldn't this resolution be part of the TableSchema itself? IMO, when a watermarkSpec is applied to a TableSchema, it should adjust the type of the rowtime attribute field. I think it's too late in the TableEnvironment to apply that transformation.

Contributor Author

We can fix the rowtime type in TableSchema based on the WatermarkSpec, but we can't fix the proctime type unless we just use a string comparison (proctime()). As discussed in #12260, we should use SqlExprToRexConverter to get the correct type. For the Table API, this is the entry point to create a Flink Table, and for SQL, CatalogSchemaTable is the entry point to create a Calcite Table.

Contributor

I had a look at #12260 and my feeling is even stronger that we are fixing it everywhere but where it actually should be fixed. IMO, if we fixed it when the Table is created and had the proper types from the very beginning, we would not need to patch it everywhere.

I have the feeling that the problem here is that the type we store in the catalog is wrong. I looked into it and my take is that the TimestampType#kind should also be stored in the catalog, and this would fix most of the problems. Right now we are basically stripping that information when storing it in the catalog. I am not 100% sure if we should store the time attribute property, but I think it would, imo, fix the hacks in this PR and #12260. I'd like to know what @twalthr thinks about it.

Member

We had a long discussion in FLINK-17189 and concluded that the TimestampType#kind shouldn't be stored in the catalog. IMO, there are several reasons:

  1. The metadata stored in the catalog should simply reflect the information of the DDL. In other words, we should only store the pure DDL in the catalog.
  2. The result data type of a computed column cannot be trusted; it may change across versions, including PROCTIME() and user-defined functions. Some users have reported that the result type of PROCTIME() should be TIMESTAMP WITH LOCAL TIME ZONE, not TIMESTAMP WITHOUT TIME ZONE. Users may also change their UDF implementations. If we just use the result type stored in the catalog, there will of course be an exception during code generation or at runtime because of a type mismatch.
  3. I don't want to treat the proctime and rowtime attributes specially when storing them in the catalog. They are no different from other computed columns and regular columns.

Thus, we have to resolve the result type of computed columns again when we get the CatalogTable from the catalog. Actually, the initial CatalogTable fetched from the catalog is an unresolved table (like UnresolvedExpression). That's why we need a CatalogTableSchemaResolver to resolve the CatalogTableSchema.
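
To make the discussion concrete, here is a rough sketch of what such a resolution step does for the rowtime case (illustrative only, not the actual CatalogTableSchemaResolver; the proctime case additionally needs the Parser/SqlExprToRexConverter mentioned above):

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions;

public final class RowtimeResolutionSketch {

	/** Patches the rowtime column's type kind based on the watermark spec. */
	public static TableSchema resolveRowtime(TableSchema unresolved) {
		if (unresolved.getWatermarkSpecs().isEmpty()) {
			return unresolved;
		}
		// only a single, top-level watermark spec is considered in this sketch
		WatermarkSpec spec = unresolved.getWatermarkSpecs().get(0);
		String rowtime = spec.getRowtimeAttribute();

		String[] names = unresolved.getFieldNames();
		DataType[] types = unresolved.getFieldDataTypes();
		for (int i = 0; i < names.length; i++) {
			if (names[i].equals(rowtime)) {
				// assumes the rowtime column is declared as a TIMESTAMP type
				TimestampType original = (TimestampType) types[i].getLogicalType();
				// keep nullability and precision, only switch the kind to ROWTIME
				types[i] = TypeConversions.fromLogicalToDataType(new TimestampType(
					original.isNullable(), TimestampKind.ROWTIME, original.getPrecision()));
			}
		}
		return TableSchema.builder()
			.fields(names, types)
			.watermark(rowtime, spec.getWatermarkExpr(), spec.getWatermarkExprOutputType())
			.build();
	}
}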

Contributor

I am 90% on the same page with the resolution. The remaining 10 percent still thinks that the stored type should be correct, but let's leave that out for now and assume I agree with the additional resolution.

I still think we can place the resolution at a better location to reuse it between Table API & SQL. Would it work if we put it in the CatalogManager#getTable? That way it would be adjusted both for the SQL & Table API.

Contributor

In the end, my reasoning is that this logic should be pushed as close to the catalog as possible. If we cannot push it all the way to the creation of the table, maybe at least we can push it into the single place where we look up that table.

Member

@wuchong wuchong Jun 5, 2020

Putting it in CatalogManager#getTable sounds good to me. We thought about having a single util to resolve the schema, but I think your idea of resolving the schema before the CatalogTable is exposed out of the CatalogManager is better; it avoids forgetting to resolve the schema.

Contributor Author

Thanks for the suggestion. I also agree we should move the resolution logic into CatalogManager#getTable as a unified place for the Table API and SQL.

TimestampKind.PROCTIME,
originalType.getPrecision());
fieldTypes[i] = TypeConversions.fromLogicalToDataType(proctimeType);
} else if (isStreamingMode && fieldNames[i].equals(rowtime)) {
Contributor

Do we really need to differentiate between streaming and non-streaming mode? IMO we shouldn't. The kind should just be extra meta-information in the schema that can be ignored in batch mode.

Contributor Author

Yes, we need this, because LegacyCatalogSourceTable will erase time indicator types and create a watermark node if isStreamingMode is true and the watermarkSpec is not empty (see LegacyCatalogSourceTable); otherwise we will get a type mismatch error in batch mode.

Contributor

Can't we do what we are doing here with the flag in the LegacyCatalogSourceTable? My reasoning is that we should push the isStreamingMode to the planner as much as possible, so that it can make such decisions based on properties of the source.

Making that distinction in the API moves us away from that goal, binding the API to a particular mode. IMO, in the end there should be no distinction between stream and batch in the API.

Contributor Author

@godfreyhe godfreyhe Jun 7, 2020

I agree with you that we should push isStreamingMode into the planner as much as possible; many classes have an isStreamingMode flag now. I also tried to remove isStreamingMode from CatalogTableSchemaResolver, but I found we would have to handle the "erase the rowtime type for batch" logic in at least three places (TableEnvironmentImpl#scanInternal for the Table API, CatalogSchemaTable#getRowType for CatalogTable, DatabaseCalciteSchema#getTable for QueryOperationCatalogView), because we must make sure the type of the Table from the catalog and the type of the RelNode expanded from the Table (e.g. adding a projection node for computed columns, adding a watermark assigner node for the watermark, expanding different kinds of views) are the same. In the long term, I think we should also keep the rowtime type for batch (e.g. to support rowtime temporal joins); then we can remove isStreamingMode from CatalogTableSchemaResolver and much of the related logic can be simplified. (It's too complex to handle the different kinds of tables and views for both batch and streaming.)

Contributor

Yes, that is also my thinking that a Table, including schema, should be the same in both cases, and possibly handled differently in the planner.

Do you think we could create a JIRA ticket to track this effort, if we cannot remove it currently?

Contributor Author

I created a JIRA ticket to track this: https://issues.apache.org/jira/browse/FLINK-18180

Contributor

@dawidwys dawidwys left a comment

Thanks for the update. I put in some suggestions on how we could improve it a little bit.

}
}

public void setCatalogTableSchemaResolver(CatalogTableSchemaResolver schemaResolver) {
Contributor

How about we pass it in the ctor? It is not an optional mutable parameter.

Contributor Author

I also wanted to pass it in the ctor, but Parser is constructed in Planner, while Planner needs a CatalogManager to be constructed, so we can't get a Parser instance when creating the CatalogManager. See TableEnvironmentImpl#create.

A workaround approach is to use an AtomicReference to hold the parser instance; the code would look like:

public static TableEnvironmentImpl create(EnvironmentSettings settings) {
	// temporary solution until FLINK-15635 is fixed
	ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

	TableConfig tableConfig = new TableConfig();
	ModuleManager moduleManager = new ModuleManager();

	// create a parser reference first
	AtomicReference<Parser> parserRef = new AtomicReference<>();
	CatalogManager catalogManager = CatalogManager.newBuilder()
		.classLoader(classLoader)
		.config(tableConfig.getConfiguration())
		.defaultCatalog(
			settings.getBuiltInCatalogName(),
			new GenericInMemoryCatalog(
				settings.getBuiltInCatalogName(),
				settings.getBuiltInDatabaseName()))
		// set a CatalogTableSchemaResolver Supplier instead of a CatalogTableSchemaResolver
		.schemaResolverSupplier(() ->
			new CatalogTableSchemaResolver(parserRef.get(), settings.isStreamingMode()))
		.build();

	FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);

	Map<String, String> executorProperties = settings.toExecutorProperties();
	Executor executor = ComponentFactoryService.find(ExecutorFactory.class, executorProperties)
		.create(executorProperties);

	Map<String, String> plannerProperties = settings.toPlannerProperties();
	Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
		.create(
			plannerProperties,
			executor,
			tableConfig,
			functionCatalog,
			catalogManager);

	// set the parser reference
	parserRef.set(planner.getParser());

	return new TableEnvironmentImpl(
		catalogManager,
		moduleManager,
		tableConfig,
		executor,
		functionCatalog,
		planner,
		settings.isStreamingMode());
}

Users are already very confused about how to create a StreamTableEnvironmentImpl (I know some users use StreamTableEnvironmentImpl's ctor to create a StreamTableEnvironment instead of using the create method).

Contributor

@dawidwys dawidwys Jun 8, 2020

Argh, I missed that dependency. I can't think of a better solution now, so let's go with the setter approach. Could you add a short comment on the setter explaining why we have it instead of passing it in the ctor?

I think the first paragraph from your answer would work:

/**
 * We do not pass it in the ctor, because we need a {@link Parser} that is constructed in a
 * {@link Planner}. At the same time, a {@link Planner} needs a {@link CatalogManager} to
 * be constructed. Thus we can't get a {@link Parser} instance when creating a
 * {@link CatalogManager}. See {@link TableEnvironmentImpl#create}.
 */

UnresolvedIdentifier parseIdentifier(String identifier);

/**
* Entry point for parse sql expression expressed as a String.
Contributor

Suggested change
* Entry point for parse sql expression expressed as a String.
* Entry point for parsing SQL expressions expressed as a String.

.map(LogicalTypeDataTypeConverter::toLogicalType)
.collect(Collectors.toList());
RelDataType inputType = typeFactory.buildRelNodeRowType(
JavaScalaConversionUtil$.MODULE$.toScala(fieldNames),
Contributor

Could we rather add a method to the FlinkTypeFactory that could work with Java? This hack really does not look nice.

List<LogicalType> fieldTypes = Arrays.stream(inputSchema.getFieldDataTypes())
.map(LogicalTypeDataTypeConverter::toLogicalType)
.collect(Collectors.toList());
RelDataType inputType = typeFactory.buildRelNodeRowType(
Contributor

Could we change this method so that we do not need the typeFactory? Could we, e.g., instead of a Supplier<SqlExprToRexConverterFactory> pass a Function<TableSchema, SqlExprToRexConverter>?

I'd prefer to limit the number of different cross dependencies unless strictly necessary.
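
Roughly, the dependency shape being suggested looks like this (names are made up for illustration; ExprConverter stands in for the planner's SqlExprToRexConverter, and this is not the actual Flink code):

import java.util.function.Function;

import org.apache.flink.table.api.TableSchema;

// The caller supplies a function that builds a converter for a given input schema,
// so this class no longer needs a FlinkTypeFactory to assemble the input row type itself.
public final class ResolverDependencySketch {

	interface ExprConverter { /* stand-in for SqlExprToRexConverter */ }

	private final Function<TableSchema, ExprConverter> converterCreator;

	public ResolverDependencySketch(Function<TableSchema, ExprConverter> converterCreator) {
		this.converterCreator = converterCreator;
	}

	public ExprConverter converterFor(TableSchema inputSchema) {
		// no type factory needed here anymore; the provider of the function owns row-type construction
		return converterCreator.apply(inputSchema);
	}
}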

FlinkTypeFactory typeFactory = typeFactorySupplier.get();
List<String> fieldNames = Arrays.asList(inputSchema.getFieldNames());
List<LogicalType> fieldTypes = Arrays.stream(inputSchema.getFieldDataTypes())
.map(LogicalTypeDataTypeConverter::toLogicalType)
Contributor

Why not DataType::getLogicalType? Why do we even need the LogicalTypeDataTypeConverter::toLogicalType?

def testProcTimeTableSourceSimple(): Unit = {
def testProctimeOnWatermarkSpec(): Unit = {
thrown.expect(classOf[ValidationException])
thrown.expectMessage("proctime can't be defined on watermark spec.")
Contributor

Shouldn't the message rather be: "Watermark can not be defined for a processing time attribute column"?

@Override
public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
// do not support for old planner
return null;
Contributor

Let's throw an exception instead.
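
For example, something along these lines (the exception type and message are only a guess at what is meant here):

@Override
public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
	// the old planner does not support parsing SQL expressions, so fail loudly instead of returning null
	throw new TableException("Parsing SQL expressions is not supported by the legacy planner.");
}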


public TableSchema resolve(TableSchema tableSchema) {
final String rowtime;
if (!tableSchema.getWatermarkSpecs().isEmpty()) {
// TODO: [FLINK-14473] we only support top-level rowtime attribute right now
Contributor

Not a comment for this issue/PR but rather for the WatermarkSpec: it is very error-prone, imo, to use a single string for a rowtime attribute if it is supposed to handle nested columns.

Just as an example: this will break if the dot is escaped.
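
A tiny illustration of the ambiguity (column names are made up):

// The watermark spec stores the rowtime attribute as one flat string.
String rowtimeAttribute = "a.b";

// Reading 1: nested field `b` inside a top-level ROW column `a`,
//            e.g. CREATE TABLE t (a ROW<b TIMESTAMP(3)>, ...)
// Reading 2: a single top-level column whose name contains a dot,
//            e.g. CREATE TABLE t (`a.b` TIMESTAMP(3), ...)
// A naive split cannot tell the two apart and silently picks reading 1:
String[] parts = rowtimeAttribute.split("\\.");   // ["a", "b"]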

Contributor

@dawidwys dawidwys left a comment

LGTM, let's wait for a green build


/**
* Entry point for parse sql expression expressed as a String.
* Entry point for parsing sql expression expressed as a String.
Contributor

Suggested change
* Entry point for parsing sql expression expressed as a String.
* Entry point for parsing SQL expressions expressed as a String.

dawidwys pushed a commit to dawidwys/flink that referenced this pull request Jun 9, 2020
@dawidwys dawidwys closed this in 8a8d8a9 Jun 9, 2020
dawidwys pushed a commit that referenced this pull request Jun 9, 2020
zhangjun0x01 pushed a commit to zhangjun0x01/flink that referenced this pull request Jul 8, 2020
@godfreyhe godfreyhe deleted the FLINK-17753 branch October 28, 2020 07:45