[BEAM-8307] NPE in Calcite dialect when input PCollection has logical… #11581

Closed
wants to merge 2 commits into from

Conversation

Contributor

@rahul8383 rahul8383 commented Apr 30, 2020

… type in schema, from JdbcIO Transform

When the SqlTransform.query() PTransform is applied to the output of JdbcIO and the schema contains JdbcIO logical types, Beam SQL throws an NPE while converting the Beam Schema to a Calcite RelDataType.

This PR adds standard logical types with URNs to the schemas.logicaltypes package and adds Calcite RelDataType mappings for these logical types.

JdbcIO is modified to use these standard logical types to convert JDBC Schema to Beam Schema.

Standard logical types for DATE, TIME, TIME_WITH_TIMEZONE, TIMESTAMP_WITH_TIMEZONE are yet to be handled.


@rahul8383
Contributor Author

rahul8383 commented Apr 30, 2020

R: @reuvenlax @amaliujia

@aaltay
Member

aaltay commented May 1, 2020

retest this please

@reuvenlax
Contributor

I'm wondering if this is a scalable solution.

In general, SQL is supposed to be able to handle unknown logical types and simply treat them as the base type. If you're seeing an NPE, maybe we need to fix that. Where do you see the NPE?

@rahul8383
Contributor Author

@reuvenlax
I have provided the source table schema and attached the NPE stack trace in BEAM-8307.

The NPE is thrown because no Calcite RelDataType can be found for the JdbcIO logical type.

@rahul8383
Contributor Author

I agree that this is not a scalable solution. Providing a Calcite RelDataType mapping for every logical type defined by every IO (which is the solution presented in this PR) is not scalable.

Another approach would be to provide the Calcite RelDataType mapping based on the base type of the logical type.
However, because the CHAR and VARCHAR logical types both have STRING as their base type, we would have to choose a default Calcite RelDataType mapping for STRING. The same applies to the BINARY and VARBINARY logical types.
We would also have to choose a default RelDataType mapping when the base type is DATETIME.
With this approach, I think we might miss out on some of the built-in aggregation functions that Calcite provides for specific types.
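The base-type fallback described above can be sketched roughly as follows. This is a minimal model in plain Java, not Beam or Calcite code: `BaseType`, `SqlType`, `LogicalType`, and both lookup tables are hypothetical stand-ins, and the chosen defaults (STRING to VARCHAR, BYTES to VARBINARY, DATETIME to TIMESTAMP) are assumptions made only for illustration.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model; BaseType, SqlType and LogicalType are not Beam or Calcite classes. */
final class LogicalTypeFallbackSketch {

  enum BaseType { STRING, BYTES, DECIMAL, DATETIME }

  enum SqlType { CHAR, VARCHAR, BINARY, VARBINARY, DECIMAL, TIMESTAMP }

  /** A logical type is identified by a free-form string and wraps a base type. */
  static final class LogicalType {
    final String identifier;
    final BaseType baseType;

    LogicalType(String identifier, BaseType baseType) {
      this.identifier = identifier;
      this.baseType = baseType;
    }
  }

  // Exact mappings for the identifiers the SQL layer already knows about.
  private static final Map<String, SqlType> BY_IDENTIFIER = new HashMap<>();
  // Assumed defaults, used only when the identifier is unknown.
  private static final Map<BaseType, SqlType> DEFAULT_BY_BASE_TYPE =
      new EnumMap<>(BaseType.class);

  static {
    BY_IDENTIFIER.put("CHAR", SqlType.CHAR);
    BY_IDENTIFIER.put("BINARY", SqlType.BINARY);
    DEFAULT_BY_BASE_TYPE.put(BaseType.STRING, SqlType.VARCHAR);
    DEFAULT_BY_BASE_TYPE.put(BaseType.BYTES, SqlType.VARBINARY);
    DEFAULT_BY_BASE_TYPE.put(BaseType.DECIMAL, SqlType.DECIMAL);
    DEFAULT_BY_BASE_TYPE.put(BaseType.DATETIME, SqlType.TIMESTAMP);
  }

  /** Identifier-only lookup: returns null for unknown types, which is where an NPE can start. */
  static SqlType lookupByIdentifierOnly(LogicalType type) {
    return BY_IDENTIFIER.get(type.identifier);
  }

  /** Lookup with fallback: unknown identifiers resolve to the default for their base type. */
  static SqlType toSqlType(LogicalType type) {
    SqlType exact = BY_IDENTIFIER.get(type.identifier);
    return exact != null ? exact : DEFAULT_BY_BASE_TYPE.get(type.baseType);
  }
}
```

In this sketch an unknown STRING-based logical type always resolves to VARCHAR, so any distinction the IO intended (for example, fixed versus variable length) is lost, which is the trade-off raised in this comment.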

I also considered using the IDENTIFIER of the logical type to determine the corresponding Calcite RelDataType. But because the IDENTIFIER is a String and not an enum, it cannot be used. For example, all the logical types defined by JdbcIO use the java.sql.JDBCType name as the IDENTIFIER.

Please correct me if my understanding is incorrect.

@reuvenlax
Contributor

Where do you see the NPE?

@rahul8383
Contributor Author

rahul8383 commented May 2, 2020

JdbcIO.Read -> SqlTransform.query(SELECT COUNT(*) FROM PCOLLECTION /* Any query */) throws an NPE if the input PCollection to SqlTransform has JdbcIO-specific logical types (defined in org.apache.beam.sdk.io.jdbc.LogicalTypes) in its schema.

Please find the Source Table Schema and the attached exception stack trace in the JIRA ticket: BEAM-8307.

@amaliujia
Contributor

retest this please

@iemejia iemejia requested a review from amaliujia May 6, 2020 23:30
@rahul8383
Contributor Author

R: @TheNeuralBit

I found a bug while implementing this feature and raised PR #11609 to fix it separately, as I thought the bug fix could be cherry-picked into the 2.21.0 release.

@rahul8383 rahul8383 force-pushed the using_BeamSql_with_JdbcIO branch 2 times, most recently from b9d9353 to 7098248 Compare May 12, 2020 01:55
@iemejia
Member

iemejia commented May 12, 2020

retest this please

@rahul8383
Contributor Author

The failing test org.apache.beam.runners.fnexecution.control.RemoteExecutionTest.testSplit should be unrelated to this change.

@TheNeuralBit
Member

Run SQL PostCommit

Member

@TheNeuralBit TheNeuralBit left a comment

Thanks @rahul8383! This is looking really good. I just have an ask around testing the logical types in SchemaCoder/RowCoder.

cc: @apilloud @robinyqiu for SQL type changes

    } else {
      return Arrays.copyOf(base, byteArraySize);
    }
  }
Member

FYI @reuvenlax, this PR removes the ability to convert smaller byte arrays. As noted in #11609 (comment), it seems this logic is inaccessible anyway.
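For readers without the diff at hand, the two behaviours being contrasted can be sketched in plain Java as below. This is an illustration under assumptions, not the PR's code: the class and method names are hypothetical, and only the `Arrays.copyOf` padding call is taken from the snippet above.

```java
import java.util.Arrays;

/** Hypothetical helper contrasting lenient and strict handling of fixed-length byte arrays. */
final class FixedBytesSketch {

  /** Lenient: zero-pads a shorter array up to {@code length}; rejects a longer one. */
  static byte[] padToLength(byte[] base, int length) {
    if (base.length > length) {
      throw new IllegalArgumentException(
          "Expected at most " + length + " bytes, got " + base.length);
    }
    // Arrays.copyOf fills the extra positions with zero bytes.
    return base.length == length ? base : Arrays.copyOf(base, length);
  }

  /** Strict: accepts only arrays whose length matches exactly. */
  static byte[] requireExactLength(byte[] base, int length) {
    if (base.length != length) {
      throw new IllegalArgumentException(
          "Expected exactly " + length + " bytes, got " + base.length);
    }
    return base;
  }
}
```

With the strict variant, a two-byte value offered to a four-byte fixed-length field is rejected instead of being silently padded.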

@TheNeuralBit
Member

Run SQL PostCommit

@rahul8383
Contributor Author

@apilloud @amaliujia @robinyqiu
JdbcIO was using logical types that were defined specifically in the JdbcIO package. When SqlTransform.query() was used with JdbcIO, an NPE was thrown because no Calcite RelDataType mapping could be found for these logical types.
As all of these are standard logical types, this PR moves them from the JdbcIO package to the schemas.logicaltypes package so that other IOs that need them can use them.
If BeamSql supports these standard logical types, BeamSql can be used with all the IOs that use these types.

To close this bug, some functional tests need to be added that use the standard logical types defined in the schemas.logicaltypes package. Can you please provide some pointers as to what types of queries can be used to test these types?
Also, CalciteUtils.CALCITE_TO_BEAM_TYPE_MAPPING doesn't specify these logical types. Does this mean the conversion of types is lossy? For example, will we lose the information about the maximum length of the String if the VariableLengthString logical type is used in BeamSql?

@apilloud
Member

Beam SQL should be rejecting unknown logical types. We expect users to put their data into supported logical types before passing it in. This conversion is up to the user to implement and will likely be lossy. It would be significant work to make it do something different.

@TheNeuralBit
Member

Beam SQL should be rejecting unknown logical types.

The types that @rahul8383 is making into standard logical types here are well known SQL types. I don't think Beam SQL should consider them unknown.

@TheNeuralBit TheNeuralBit self-requested a review May 21, 2020 22:10
Member

@TheNeuralBit TheNeuralBit left a comment

Sorry for letting this languish, @rahul8383. I got busy with the 2.22.0 release and neglected it :(

I have a few more comments now.


/** A base class for LogicalTypes that use the same input type as the underlying base type. */
@Experimental(Experimental.Kind.SCHEMAS)
public abstract class IdenticalBaseTAndInputTLogicalType<T> implements Schema.LogicalType<T, T> {
Member

Can you make this package-private?

.addField("variableBytes", Schema.FieldType.logicalType(VariableLengthBytes.of(100)))
.addField("fixedString", Schema.FieldType.logicalType(FixedLengthString.of(10)))
.addField("variableString", Schema.FieldType.logicalType(VariableLengthString.of(100)))
.addField("customDecimal", Schema.FieldType.logicalType(LogicalDecimal.of(10, 5)))
Member

Could you add tests that use some or all of these new types in a SQL query? Right now I think this is the only test that exercises the new code in CalciteUtils, and it's not actually verifying the types will work in a SQL statement.

    checkArgument(base == null || base.length() == length);
    return base;
  }
}
Member

Could you add constants for these new logical types in SqlTypes?

cc: @robinyqiu


/** A base class for LogicalTypes that use the same input type as the underlying base type. */
@Experimental(Experimental.Kind.SCHEMAS)
public abstract class IdenticalBaseTAndInputTLogicalType<T> implements Schema.LogicalType<T, T> {
Member

Could you make this package-private? I think it's only used in schemas.logicaltypes right now.
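As a rough picture of what a package-private identity base class could look like, here is a self-contained sketch. It does not use Beam: `SketchLogicalType` is a simplified, hypothetical stand-in for Schema.LogicalType, the class names are invented for illustration, and the length check mirrors the `checkArgument(base == null || base.length() == length)` line quoted earlier in this review.

```java
/** Simplified, hypothetical stand-in for a logical type with an input type and a base type. */
interface SketchLogicalType<InputT, BaseT> {
  String identifier();

  BaseT toBaseType(InputT input);

  InputT toInputType(BaseT base);
}

/** Package-private base class for types whose input type equals the base type. */
abstract class IdentityLogicalTypeSketch<T> implements SketchLogicalType<T, T> {
  private final String identifier;

  IdentityLogicalTypeSketch(String identifier) {
    this.identifier = identifier;
  }

  @Override
  public String identifier() {
    return identifier;
  }

  // No conversion is needed in either direction, so both methods return the value unchanged.
  @Override
  public T toBaseType(T input) {
    return input;
  }

  @Override
  public T toInputType(T base) {
    return base;
  }
}

/** A fixed-length string type: identity conversion plus a length check on the way in. */
final class FixedLengthStringSketch extends IdentityLogicalTypeSketch<String> {
  private final int length;

  FixedLengthStringSketch(int length) {
    super("FixedLengthString");
    this.length = length;
  }

  @Override
  public String toInputType(String base) {
    // Null passes through; any other value must have exactly the declared length.
    if (base != null && base.length() != length) {
      throw new IllegalArgumentException(
          "Expected length " + length + ", got " + base.length());
    }
    return base;
  }
}
```

Because the base class has no `public` modifier, only types in the same package can extend it, while the concrete subclasses can still be exposed publicly.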


/** A LogicalType representing a Decimal type with custom precision and scale. */
@Experimental(Experimental.Kind.SCHEMAS)
public class LogicalDecimal extends IdenticalBaseTAndInputTLogicalType<BigDecimal> {
Member

Can we just call this Decimal?

@stale

stale bot commented Aug 22, 2020

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Aug 22, 2020
@stale

stale bot commented Aug 29, 2020

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@stale stale bot closed this Aug 29, 2020