Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-34172] Add support for altering a distribution via ALTER TABLE #24886

Merged
merged 6 commits into from
Jun 13, 2024

Conversation

jnh5y
Copy link
Contributor

@jnh5y jnh5y commented Jun 4, 2024

What is the purpose of the change

This PR implements the SQL parser changes for ALTER TABLE to support ADD, MODIFY, and DROP DISTRIBUTION statements.

Brief change log

The SQL Parser has been updated.
The AlterSchemaConverter has been updated to pass the changes in a DISTRIBUTION through to the Operation.

Verifying this change

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (100MB)
  • Extended integration test for recovery after master (JobManager) failure
  • Added test that validates that TaskInfo is transferred only once across recoveries
  • Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)

@flinkbot
Copy link
Collaborator

flinkbot commented Jun 4, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some initial feedback.

@@ -40,7 +41,8 @@ public static void unparseTableSchema(
int rightPrec,
SqlNodeList columnList,
List<SqlTableConstraint> constraints,
@Nullable SqlWatermark watermark) {
@Nullable SqlWatermark watermark,
@Nullable SqlDistribution distribution) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem right. A distribution is not part of the table schema. A table schema is enclosed in ( ) like in CREATE TABLE <name> (<table schema>). The distribution comes after the schema.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

.partitionKeys(oldTable.getPartitionKeys())
.options(oldTable.getOptions());

if (alterTable instanceof SqlAlterTableSchema) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't we simply call oldTable.getDistribution?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I ended up adding oldTable towards the end of my work. I can try to use it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After looking at this again, I figured out that we need to create the new table. Since that table needs the added/modified distribution, we have to take it from the SqlAlterTableSchema rather than the oldTable.

@@ -110,6 +113,7 @@ public Operation convertAlterSchema(
SchemaConverter converter = createSchemaConverter(alterTableSchema, oldTable);
converter.updateColumn(alterTableSchema.getColumnPositions().getList());
alterTableSchema.getWatermark().ifPresent(converter::updateWatermark);
alterTableSchema.getDistribution().ifPresent(converter::updateDistribution);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the main change in this class is only to update the distribution in case of a column name change similar to buildUpdatedPrimaryKey.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have to pass the distribution around in order to create the new table objects. I may be missing some alternative here.

Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. I only had some minor comments. The PR should be good to go in the next iteration.

.isNotPresent();

prepareNonManagedTableWithDistribution("tb3");
// rename column used as distribution key
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment seems to be wrong, also isn't this tested above already, at least is used as a distribution key occurs two times in this clas

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this case of dropping the distribution. The other two cases are for a drop and modify, so I think both of those make sense.

@jnh5y jnh5y marked this pull request as ready for review June 12, 2024 14:32
Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update @jnh5y. LGTM.

@twalthr twalthr merged commit d0f9bb4 into apache:master Jun 13, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants