[Lake/Paimon] Create datalake enabled table should also create in lake #640
<useTransitiveDependencies>true</useTransitiveDependencies>
<useTransitiveFiltering>true</useTransitiveFiltering>
<includes>
  <include>org.apache.flink:flink-shaded-hadoop-2-uber</include>
Paimon requires Hadoop bundled, so we include it in the Paimon plugin dir:
https://paimon.apache.org/docs/master/flink/quick-start/
Is it ready to review? @luoyuxia
}
// set pk
if (tableDescriptor.getSchema().getPrimaryKey().isPresent()) {
At first, I introduced additional offset and timestamp columns to enable Fluss to subscribe to the data in the lake from a given offset or timestamp via the Fluss client. But now, I feel we can remove these two columns, at least for now:
- Currently we mainly focus on Flink reading the historical data in Paimon and the real-time data in Fluss. The `offset` and `timestamp` columns are not used, and introducing them may bring unnecessary complexity at this early stage.
- Subscribing via the `offset` and `timestamp` columns only works for log tables, and only for Paimon with bucket-num specified. But in Paimon it's recommended not to set bucket-num, so the `offset` and `timestamp` columns become useless in most cases.

Still, we keep the possibility of supporting subscribing to the data in the lake from a given offset or timestamp in the future. We can then introduce an option to enable this feature for lake tables.
After discussion, we still keep the ability to subscribe via offset/timestamp, so let's introduce another `bucket` column to help us subscribe via bucket + offset.
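The resolution above leaves three fixed system metadata columns. As a rough sketch (the `__bucket` INT type appears in a later diff in this thread; treating `__offset` as BIGINT and `__timestamp` as TIMESTAMP is an assumption on my part, not confirmed by the PR):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SystemColumns {
    // Reserved metadata column names from the PR discussion
    public static final String BUCKET_COLUMN_NAME = "__bucket";
    public static final String OFFSET_COLUMN_NAME = "__offset";
    public static final String TIMESTAMP_COLUMN_NAME = "__timestamp";

    /** Name -> SQL type of each system column appended to lake log tables. */
    public static Map<String, String> systemColumns() {
        Map<String, String> cols = new LinkedHashMap<>();
        cols.put(BUCKET_COLUMN_NAME, "INT");          // bucket id, to subscribe per bucket
        cols.put(OFFSET_COLUMN_NAME, "BIGINT");       // log offset within the bucket (assumed type)
        cols.put(TIMESTAMP_COLUMN_NAME, "TIMESTAMP"); // record timestamp (assumed type)
        return cols;
    }

    public static void main(String[] args) {
        System.out.println(systemColumns());
    }
}
```

Keeping the set fixed (rather than configurable) is what makes the bucket + offset subscription contract easy to reason about.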
Yes, now it's ready to review.
@wuchong @leonardBang Could you please help review?
leonardBang left a comment
Thanks @luoyuxia for the contribution, the PR looks generally good to me; I only left some minor comments.
fluss-common/src/main/java/com/alibaba/fluss/lakehouse/lakestorage/LakeCatalog.java
public static final String OFFSET_COLUMN_NAME = "__offset";
public static final String TIMESTAMP_COLUMN_NAME = "__timestamp";
public static final String BUCKET_COLUMN_NAME = "__bucket";
For the system metadata columns, could we expose a system metadata column configuration for users, to avoid potential column conflicts?
Currently, these system metadata columns are used by the Fluss client to subscribe from a given timestamp/offset. I'd prefer them to be fixed for now, since most systems' metadata columns are fixed, and fixed columns are easier to understand.
I think we can make it configurable in the future if we find it helps; it's a compatible change.
.../fluss-lake-format-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeCatalog.java
String.format(
        "The table %s already exists in %s catalog, please "
                + "first drop the table in %s catalog.",
        tablePath, dataLakeFormat, dataLakeFormat));
Both dropping the existing table and suggesting a new table name make sense in this case; the latter may be better?
I suggest including both of them in the message.
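A sketch of an error message carrying both remedies, as suggested above (the wording here is illustrative, not the final text merged in the PR):

```java
public class TableExistsMessage {

    /** Builds an error message offering both remedies: drop the stale lake table, or rename. */
    static String build(String tablePath, String lakeFormat) {
        return String.format(
                "The table %s already exists in %s catalog. Please either first drop the table "
                        + "in the %s catalog, or create the Fluss table with a different name.",
                tablePath, lakeFormat, lakeFormat);
    }

    public static void main(String[] args) {
        // Hypothetical table path and lake format, for illustration only
        System.out.println(build("mydb.orders", "paimon"));
    }
}
```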
<!-- end exclude for lakehouse-paimon -->
<exclude>com.alibaba.fluss.lakehouse.cli.*</exclude>
<exclude>com.alibaba.fluss.kafka.*</exclude>
<exclude>com.alibaba.fluss.lake.paimon.FlussDataTypeToPaimonDataType</exclude>
Adding a full types test in LakeEnabledTableCreateITCase is better?
I added the full set of Fluss types, but we still have to keep the exclusion here since Fluss doesn't support array, map, or row types, so the maximum line coverage can only reach 65%.
makes sense to me
        CoreOptions.ChangelogProducer.INPUT.toString());
} else {
    // for log table, need to set bucket, offset and timestamp
    schemaBuilder.column(BUCKET_COLUMN_NAME, DataTypes.INT());
I mean we need to check whether the original schema contains the same system column names, like `__bucket`, to avoid conflicts with users' original columns.
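A minimal, standalone sketch of that pre-flight check (the real PR validates against Fluss's `TableDescriptor` and throws `InvalidTableException`, neither of which is reproduced here; plain strings and `IllegalArgumentException` stand in for them):

```java
import java.util.List;
import java.util.Set;

public class SystemColumnCheck {
    // Reserved lake system column names from the PR discussion
    private static final Set<String> SYSTEM_COLUMNS =
            Set.of("__bucket", "__offset", "__timestamp");

    /** Rejects user schemas that already contain a reserved system column name. */
    static void validate(Iterable<String> userColumnNames) {
        for (String name : userColumnNames) {
            if (SYSTEM_COLUMNS.contains(name)) {
                throw new IllegalArgumentException(
                        "Column " + name + " conflicts with a reserved lake system column.");
            }
        }
    }

    public static void main(String[] args) {
        validate(List.of("id", "amount")); // passes: no reserved names
        try {
            validate(List.of("id", "__bucket"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```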
wuchong left a comment
@luoyuxia the pull request looks good in general. I left some minor comments.
...-format-paimon/src/test/java/com/alibaba/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
.../fluss-lake-format-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeCatalog.java
fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java
...-format-paimon/src/test/java/com/alibaba/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
fluss-common/src/main/java/com/alibaba/fluss/lakehouse/lakestorage/LakeStoragePluginSetUp.java
        tableDescriptor.getSchema().getColumns()) {
    String columnName = column.getName();
    if (systemColumns.containsKey(columnName)) {
        throw new InvalidTableException(
I created an issue #810 to avoid creating tables with system columns even if the table is not lake-enabled.
Besides, could you rename the module
@wuchong Comments addressed
Purpose
Linked issue: close #430
Brief change log
- Introduce `LakeStoragePluginSetUp` that loads the `LakeStoragePlugin` by datalake format
- Introduce `LakeCatalog` to create the table in lake

Tests
- `LakeEnabledTableCreateITCase`
API and Format
Documentation