Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2543] feat(spark-connector): support row-level operations to iceberg Table #2642

Closed
wants to merge 345 commits into from

Conversation

caican00
Copy link
Contributor

@caican00 caican00 commented Mar 22, 2024

What changes were proposed in this pull request?

support row-level operations to iceberg Table

1. update tableName set c1=v1, c2=v2, ...

2. merge into targetTable t
   using sourceTable s
   on s.key=t.key
   when matched then ...
   when not matched then ...

3. delete from table where xxx

Why are the changes needed?

support row-level operations to iceberg Table

Fix: #2543

Does this PR introduce any user-facing change?

Yes, support update ... , merge into ..., delete from ...

How was this patch tested?

New ITs.
And tested locally.

@caican00 caican00 marked this pull request as draft March 22, 2024 07:33
@caican00 caican00 changed the title [WIP][#2543] feat(spark-connector): support row-level operations to iceberg Table [#2543] feat(spark-connector): support row-level operations to iceberg Table Mar 27, 2024
@caican00 caican00 changed the title [#2543] feat(spark-connector): support row-level operations to iceberg Table [WIP][#2543] feat(spark-connector): support row-level operations to iceberg Table Mar 27, 2024
@caican00 caican00 changed the title [WIP][#2543] feat(spark-connector): support row-level operations to iceberg Table [#2543] feat(spark-connector): support row-level operations to iceberg Table Mar 27, 2024
@FANNG1
Copy link
Contributor

FANNG1 commented Apr 30, 2024

Another thought, how about upgrading Iceberg to a newer version since the problem only exists in older versions? @caican00 @qqqttt123 @jerryshao WDYT?

@caican00
Copy link
Contributor Author

caican00 commented Apr 30, 2024

Another thought, how about upgrading Iceberg to a newer version since the problem only exists in older versions? @caican00 @qqqttt123 @jerryshao WDYT?

@FANNG1

  1. IMO, we have to support spark multi-version, such as spark3.1, spark3.3, spark3.4, spark3.5, etc.
  2. Iceberg parser only has this problem before spark3.5, but some physical plans in Iceberg spark-connector, such as AddPartitionFieldExec, SetWriteDistributionAndOrderingExec, have this problem in all versions.

such as spark3.5:
https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetWriteDistributionAndOrderingExec.scala#L47

https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala#L166

@caican00
Copy link
Contributor Author

caican00 commented Apr 30, 2024

Another thought, how about upgrading Iceberg to a newer version since the problem only exists in older versions? @caican00 @qqqttt123 @jerryshao WDYT?

@FANNG1

  1. IMO, we have to support spark multi-version, such as spark3.1, spark3.3, spark3.4, spark3.5, etc.
  2. Iceberg parser only has this problem before spark3.5, but some physical plans in Iceberg spark-connector, such as AddPartitionFieldExec, SetWriteDistributionAndOrderingExec, have this problem in all versions.

such as spark3.5: https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetWriteDistributionAndOrderingExec.scala#L47

https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala#L166

Unless SparkIcebergTable extended SparkTable and GravitinoIcebergCatalog extended SparkCatalog.
This solution, discussed earlier, also needs to override some of scala's methods, such as productElement, productArity, canEqual. WDYT? cc @FANNG1 @qqqttt123 @jerryshao

@FANNG1
Copy link
Contributor

FANNG1 commented Apr 30, 2024

My concern is current implementation seems hard to maintain especially for different versions of the Spark and Iceberg. If there is no simple solution, I prefer the original implement which returns a GravitinoIcebergTable extends SparkTable.

@caican00
Copy link
Contributor Author

My concern is current implementation seems hard to maintain especially for different versions of the Spark and Iceberg. If there is no simple solution, I prefer the original implement which returns a GravitinoIcebergTable extends SparkTable.

I prefer this solution too.

@caican00
Copy link
Contributor Author

caican00 commented Apr 30, 2024

My concern is current implementation seems hard to maintain especially for different versions of the Spark and Iceberg. If there is no simple solution, I prefer the original implement which returns a GravitinoIcebergTable extends SparkTable.

I prefer this solution too.

@FANNG1 should i fallback to the original implementation? I would like to finish it today.
And another thought, could we only make SparkIcebergTable extend SparkTable?
hive, jdbc and so on can be inconsistent?
for example, it seems unnecessary to make SparkHiveTable also extend kyuubi HiveTable.

@FANNG1
Copy link
Contributor

FANNG1 commented Apr 30, 2024

My concern is current implementation seems hard to maintain especially for different versions of the Spark and Iceberg. If there is no simple solution, I prefer the original implement which returns a GravitinoIcebergTable extends SparkTable.

I prefer this solution too.

@FANNG1 should i fallback to the original implementation? I would like to finish it today. And another thought, could we only make SparkIcebergTable inherit SparkTable? hive, jdbc and so on can be inconsistent? for example, it seems unnecessary to make SparkHiveTable also inherit kyuubi HiveTable.

@qqqttt123 @jerryshao WDYT?

@qqqttt123
Copy link
Contributor

Some questions.

  1. How the Trino to solve the issue?
  2. Is it necessary to support parser if we support row-level operations?

@FANNG1
Copy link
Contributor

FANNG1 commented Apr 30, 2024

Some questions.

  1. How the Trino to solve the issue?

seems there are no relationship about How Trino solve the issue ? Spark and Trino had different frameworks.

@caican00
Copy link
Contributor Author

caican00 commented Apr 30, 2024

Some questions.

  1. How the Trino to solve the issue?
  2. Is it necessary to support parser if we support row-level operations?

@qqqttt123

  1. Trino sqlParser does not have this issue.
  2. for spark-connector in Iceberg, it explicitly uses SparkTable to identify whether it is an Iceberg table, so we have to rewrite the parser or inherit SparkTable to make rowLevelCommands recognizable.

https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala#L127-L186
image

@caican00
Copy link
Contributor Author

Some questions.

  1. How the Trino to solve the issue?

seems there are no relationship about How Trino solve the issue ? Spark and Trino had different frameworks.

Yes, it actually doesn't have the same problem with spark. So is flink.

@qqqttt123
Copy link
Contributor

What's the problem to extend the SparkTable?

@wForget
Copy link
Contributor

wForget commented Apr 30, 2024

Why not just return org.apache.iceberg.spark.source.SparkTable but wrap it? Did we do any extra work?

@caican00
Copy link
Contributor Author

caican00 commented Apr 30, 2024

What's the problem to extend the SparkTable?

@qqqttt123

  1. we have to make SparkBaseTable as an interface, as java can only extend one class.
    https://github.com/datastrato/gravitino/blob/main/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java
    if we use combined interface,it seems that we can no longer extract a common baseTable.
    And Then each data source needs to implement Table, SupportsRead, and SupportsWrite interfaces separately, it will cause redundant code and the readability of the code becomes terrible.

  2. some physical rules also have same problem, such as ExtendedDataSourceV2Strategy, it explicitly uses org.apache.iceberg.spark.SparkCatalog to identify whether it is an Iceberg Catalog. https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala#L166
    therefore, we also have to make GravitinoIcebergCatalog extend SparkCatalog of Iceberg, and then we have to make BaseCatalog of Gravitino as an interface. It will cause the same problem as above.
    https://github.com/datastrato/gravitino/blob/main/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java

@qqqttt123
Copy link
Contributor

What's the problem to extend the SparkTable?

@qqqttt123

  1. we have to make SparkBaseTable as an interface, as java can only extend one class.
    https://github.com/datastrato/gravitino/blob/main/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java
    if we use combined interface,it seems that we can no longer extract a common baseTable.
    And Then each data source needs to implement Table, SupportsRead, and SupportsWrite interfaces separately, it will cause redundant code and the readability of the code becomes terrible.
  2. some physical rules also have same problem, such as ExtendedDataSourceV2Strategy, it explicitly uses SparkCatalog to identify whether it is an Iceberg Catalog. https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala#L166
    therefore, we also have to make GravitinoIcebergCatalog extend SparkCatalog of Iceberg, and then we have to make BaseCatalog of Gravitino as an interface. It will cause the same problem as above.
    https://github.com/datastrato/gravitino/blob/main/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java

This is not a problem. We can extend the interface. We can extract a common a class as new class field. It's ok.

@caican00
Copy link
Contributor Author

Why not just return org.apache.iceberg.spark.source.SparkTable but wrap it? Did we do any extra work?

we have wrapped org.apache.iceberg.spark.source.SparkTable in com.datastrato.gravitino.spark.connector.iceberg.SparkIcebergTable, but the parent class of com.datastrato.gravitino.spark.connector.iceberg.SparkIcebergTable is not org.apache.iceberg.spark.source.SparkTable.

@caican00
Copy link
Contributor Author

What's the problem to extend the SparkTable?

@qqqttt123

  1. we have to make SparkBaseTable as an interface, as java can only extend one class.
    https://github.com/datastrato/gravitino/blob/main/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java
    if we use combined interface,it seems that we can no longer extract a common baseTable.
    And Then each data source needs to implement Table, SupportsRead, and SupportsWrite interfaces separately, it will cause redundant code and the readability of the code becomes terrible.
  2. some physical rules also have same problem, such as ExtendedDataSourceV2Strategy, it explicitly uses SparkCatalog to identify whether it is an Iceberg Catalog. https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala#L166
    therefore, we also have to make GravitinoIcebergCatalog extend SparkCatalog of Iceberg, and then we have to make BaseCatalog of Gravitino as an interface. It will cause the same problem as above.
    https://github.com/datastrato/gravitino/blob/main/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java

This is not a problem. We can extend the interface. We can extract a common a class as new class field. It's ok.

ok for me, @FANNG1 WDYT?

@caican00
Copy link
Contributor Author

What's the problem to extend the SparkTable?

@qqqttt123

  1. we have to make SparkBaseTable as an interface, as java can only extend one class.
    https://github.com/datastrato/gravitino/blob/main/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java
    if we use combined interface,it seems that we can no longer extract a common baseTable.
    And Then each data source needs to implement Table, SupportsRead, and SupportsWrite interfaces separately, it will cause redundant code and the readability of the code becomes terrible.
  2. some physical rules also have same problem, such as ExtendedDataSourceV2Strategy, it explicitly uses SparkCatalog to identify whether it is an Iceberg Catalog. https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala#L166
    therefore, we also have to make GravitinoIcebergCatalog extend SparkCatalog of Iceberg, and then we have to make BaseCatalog of Gravitino as an interface. It will cause the same problem as above.
    https://github.com/datastrato/gravitino/blob/main/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java

This is not a problem. We can extend the interface. We can extract a common a class as new class field. It's ok.

ok for me, @FANNG1 WDYT?

if we all think it is ok, i will go ahead, thank you.

@FANNG1
Copy link
Contributor

FANNG1 commented Apr 30, 2024

if we all think it is ok, i will go ahead, thank you.

It's ok for me, something to keep consistent before refactor,

  1. please keep consistent of the implement of HiveTable and IcebergTable.
  2. refacor the common logic in SparkBaseTable as a common helper class, not interface.
  3. close current PR and propose a new PR.

WDYT? @caican00

@caican00
Copy link
Contributor Author

caican00 commented Apr 30, 2024

  1. please keep consistent of the implement of HiveTable and IcebergTable.

@FANNG1 I'm a little confused,is it necessary to keep consistent?
if do it like this,why not just get the implementation of the parent class directly? In this case, why implement SparkBaseTable, which seems a little redundant?

@caican00
Copy link
Contributor Author

  1. please keep consistent of the implement of HiveTable and IcebergTable.

@FANNG1 I'm a little confused,is it necessary to keep consistent? if do it like this,why not just get the implementation of the parent class? In this case, why implement SparkBaseTable, which seems a bit redundant?

and in addition, kyuubi's HiveTable is a scala class, and extending it in java requires overriding some of scala's methods

@FANNG1
Copy link
Contributor

FANNG1 commented Apr 30, 2024

  1. please keep consistent of the implement of HiveTable and IcebergTable.

@FANNG1 I'm a little confused,is it necessary to keep consistent? if do it like this,why not just get the implementation of the parent class? In this case, why implement SparkBaseTable, which seems a bit redundant?

because keep consistent is easy to maintenance, SparkIcebergTable extends SparkTable while SparkHiveTable compose KyubbyHiveTable is really confusing to new developers, when implementing new features should consider the two cases, if not, may encounter bugs which is unmaintainable.

@caican00
Copy link
Contributor Author

  1. please keep consistent of the implement of HiveTable and IcebergTable.

@FANNG1 I'm a little confused,is it necessary to keep consistent? if do it like this,why not just get the implementation of the parent class? In this case, why implement SparkBaseTable, which seems a bit redundant?

because keep consistent is easy to maintenance, SparkIcebergTable extends SparkTable while SparkHiveTable compose KyubbyHiveTable is really confusing to new developers, when implementing new features should consider the two cases, if not, may encounter bugs which is unmaintainable.

@FANNG1 got it, should I submit a separate pr to refactor the table implementation? This does not include row level operations.
In this new pr we can explain the reasons for the refactoring and then submit the row level pr

@FANNG1
Copy link
Contributor

FANNG1 commented Apr 30, 2024

  1. please keep consistent of the implement of HiveTable and IcebergTable.

@FANNG1 I'm a little confused,is it necessary to keep consistent? if do it like this,why not just get the implementation of the parent class? In this case, why implement SparkBaseTable, which seems a bit redundant?

because keep consistent is easy to maintenance, SparkIcebergTable extends SparkTable while SparkHiveTable compose KyubbyHiveTable is really confusing to new developers, when implementing new features should consider the two cases, if not, may encounter bugs which is unmaintainable.

@FANNG1 got it, should I submit a separate pr to refactor the table implementation? This does not include row level operations. In this new pr we can explain the reasons for the refactoring and then submit the row level pr

both are ok for me.

@caican00
Copy link
Contributor Author

  1. please keep consistent of the implement of HiveTable and IcebergTable.

@FANNG1 I'm a little confused,is it necessary to keep consistent? if do it like this,why not just get the implementation of the parent class? In this case, why implement SparkBaseTable, which seems a bit redundant?

because keep consistent is easy to maintenance, SparkIcebergTable extends SparkTable while SparkHiveTable compose KyubbyHiveTable is really confusing to new developers, when implementing new features should consider the two cases, if not, may encounter bugs which is unmaintainable.

@FANNG1 got it, should I submit a separate pr to refactor the table implementation? This does not include row level operations. In this new pr we can explain the reasons for the refactoring and then submit the row level pr

both are ok for me.

ok

@caican00
Copy link
Contributor Author

caican00 commented May 1, 2024

close this pr and create a new pr to refactor table implementation and support row-level operations feature.

@caican00 caican00 closed this May 1, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Subtask] support row-level operations to iceberg Table
5 participants