[HUDI-2045] Support Read Hoodie As DataSource Table For Flink And DeltaStreamer #3120
Conversation
CI report:
Bot commands: @hudi-bot supports the following commands.
Force-pushed from 532803d to aaca30f.
Codecov Report
Coverage Diff (master vs #3120):
Coverage: 47.72% -> 47.74% (+0.01%)
Complexity: 5528 -> 5555 (+27)
Files: 934 -> 935 (+1)
Lines: 41457 -> 41536 (+79)
Branches: 4166 -> 4180 (+14)
Hits: 19786 -> 19830 (+44)
Misses: 19914 -> 19943 (+29)
Partials: 1757 -> 1763 (+6)
Flags with carried-forward coverage won't be shown.
Continue to review the full report at Codecov.
Force-pushed from 716127b to 8d23e1d.
Hi @umehrot2, Currently …
@pengzhiwei2018: Looks like an integ test is failing. Can you please check that out?
Force-pushed from fcd06c8 to 5eff09c.
Hi @nsivabalan, the reason for the test failure is that we cannot support reading a bootstrap MOR rt table through the Spark datasource path. This PR enables the datasource table by default, which causes the failure. I disabled the datasource sync in HiveSyncTool for bootstrap MOR tables to avoid the problem. After https://issues.apache.org/jira/browse/HUDI-2071 is resolved, we can remove this limitation.
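The workaround described above amounts to forcing the flag off for bootstrap MOR tables. A hypothetical sketch (class and method names are illustrative, not the actual HiveSyncTool code):

```java
// Hypothetical sketch of the workaround: disable the Spark datasource
// sync for bootstrap MERGE_ON_READ tables until HUDI-2071 is resolved.
// Names are illustrative, not actual Hudi code.
public class SyncGuardSketch {
    public enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

    public static boolean shouldSyncAsDataSourceTable(boolean userEnabled,
                                                      boolean isBootstrap,
                                                      TableType tableType) {
        // Spark cannot yet read a bootstrap MOR rt table through the
        // datasource path, so force the flag off in that one case.
        if (isBootstrap && tableType == TableType.MERGE_ON_READ) {
            return false;
        }
        return userEnabled;
    }
}
```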
Force-pushed from adba2dc to dbcd6ae.
Force-pushed from 936f3c5 to 5497716.
@pengzhiwei2018 Left some comments.
@@ -110,6 +110,12 @@
@Parameter(names = {"--batch-sync-num"}, description = "The number of partitions one batch when synchronous partitions to hive")
public Integer batchSyncNum = 1000;

@Parameter(names = {"--sparkDataSource"}, description = "Whether save this table as spark data source table.")
Let's keep the style consistent? Use - to split words?
done!
@Parameter(names = {"--sparkDataSource"}, description = "Whether save this table as spark data source table.")
public Boolean saveAsSparkDataSourceTable = true;

@Parameter(names = {"--spark-schemaLengthThreshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore.")
ditto
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
packaging/hudi-flink-bundle/pom.xml (outdated)
@@ -141,6 +141,13 @@

<include>org.apache.hbase:hbase-common</include>
<include>commons-codec:commons-codec</include>
<include>org.apache.spark:spark-sql_${scala.binary.version}</include>
Must we include the Spark dependencies in Flink's bundle?
If we have enabled Hive sync for Flink, we have to include the Spark dependencies, because currently HiveSyncTool needs them to generate the Spark table properties.
One thing I can improve here is making the scope of the Spark dependencies ${flink.bundle.hive.scope}.
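The scoping suggested above might look like this in packaging/hudi-flink-bundle/pom.xml (a sketch; the ${spark.version} property name is an assumption, ${flink.bundle.hive.scope} is the property named in the comment):

```xml
<!-- Sketch: give spark-sql the same configurable scope as the Hive
     dependencies, so it is only bundled when Hive sync is wanted. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
  <scope>${flink.bundle.hive.scope}</scope>
</dependency>
```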
@danny0405 Any thoughts on how we can optimize this? IMO, it does not seem very graceful.
Well, I think I can remove the Spark dependencies by writing a util that converts the Parquet schema to Spark schema JSON without the Spark lib. It may need some more work.
packaging/hudi-flink-bundle/pom.xml (outdated)
</dependencies>
Useless empty line?
fixed!
Force-pushed from a2a3e94 to a7f17df.
Left some comments.
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_REA
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.hudi.hive.util.ConfigUtils
Split it off with an empty line.
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
wrong position
@@ -160,6 +168,8 @@ public String toString() {
+ ", supportTimestamp=" + supportTimestamp
+ ", decodePartition=" + decodePartition
+ ", createManagedTable=" + createManagedTable
+ ", saveAsSparkDataSourceTable=" + syncAsSparkDataSourceTable
Rename save to sync?
done!
hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java (outdated)
@leesf Would like to receive your input on this PR.
@yanghua We should not include the spark-sql dependency in …
+1 on @danny0405's comment on the deps.
I am writing a util to convert the Parquet schema to a Spark schema JSON string without the Spark dependencies. After this, we can drop the Spark dependency.
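The idea of such a util can be sketched without any Spark or Parquet imports: map Parquet primitive type names to the type names Spark uses in its schema JSON, and emit a struct JSON string. The class and method names below are illustrative, not the actual Hudi Parquet2SparkSchemaUtils; the real util walks the Parquet MessageType and also handles logical types, nested groups, decimals, etc.

```java
import java.util.Map;
import java.util.StringJoiner;

// Illustrative sketch only: maps a few Parquet primitive type names to the
// type names Spark uses in its schema JSON and emits a struct JSON string.
public class Parquet2SparkSketch {

    static String toSparkTypeName(String parquetPrimitive) {
        switch (parquetPrimitive) {
            case "int32":   return "integer";
            case "int64":   return "long";
            case "float":   return "float";
            case "double":  return "double";
            case "boolean": return "boolean";
            case "binary":  return "string"; // assumes a UTF8 logical annotation
            default: throw new IllegalArgumentException(parquetPrimitive);
        }
    }

    // Emit the JSON shape Spark stores for a flat, all-nullable struct.
    static String toSchemaJson(Map<String, String> nameToParquetType) {
        StringJoiner fields = new StringJoiner(",",
                "{\"type\":\"struct\",\"fields\":[", "]}");
        for (Map.Entry<String, String> f : nameToParquetType.entrySet()) {
            fields.add("{\"name\":\"" + f.getKey() + "\",\"type\":\""
                    + toSparkTypeName(f.getValue())
                    + "\",\"nullable\":true,\"metadata\":{}}");
        }
        return fields.toString();
    }
}
```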
Force-pushed from 4ed02dc to b5bf84a.
Hi @yanghua, I have written a util to convert the Parquet schema to Spark schema JSON without the Spark dependencies. Please take a look again~
OK, sounds good. Will review it soon.
Left some comments.
@@ -70,6 +69,10 @@
return Arrays.asList(new Object[][] {{true, true, true}, {true, false, false}, {false, true, true}, {false, false, false}});
}

private static Iterable<Object[]> useJdbcAndSchemaFromCommitMetadataAndSyncAsDataSource() {
IMO, this method name is too long and does not explain the purpose of the method body.
Yes, will rename it.
import static org.junit.jupiter.api.Assertions.assertEquals;

public class TestParquet2SparkSchemaUtils {
Can we add more test cases for the various integer types?
done!
partitionCols.add(column2Field.getOrDefault(partitionName,
    new PrimitiveType(Type.Repetition.REQUIRED, BINARY, partitionName, UTF8)));
}
for (Type field : originGroupType.getFields()) {
another one
fixed
public Boolean syncAsSparkDataSourceTable = true;

@Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore.")
public int sparkSchemaLengthThreshold = 4000;
Is 4000 a special value here?
It is the default value of the Spark conf spark.sql.sources.schemaStringLengthThreshold.
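For context on why a threshold exists at all: the Hive metastore limits the size of a single table-property value, so an over-long schema JSON has to be split into chunks of at most that many characters. A sketch of the chunking (the property keys follow Spark's spark.sql.sources.schema.* convention; the helper itself is illustrative, not Hudi code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: split a schema JSON into <= threshold-sized chunks, stored as
// spark.sql.sources.schema.part.N plus a numParts counter, so that no
// single Hive metastore cell exceeds the limit.
public class SchemaChunkSketch {

    static Map<String, String> chunkSchema(String schemaJson, int threshold) {
        Map<String, String> props = new LinkedHashMap<>();
        int parts = (schemaJson.length() + threshold - 1) / threshold;
        props.put("spark.sql.sources.schema.numParts", String.valueOf(parts));
        for (int i = 0; i < parts; i++) {
            int start = i * threshold;
            int end = Math.min(start + threshold, schemaJson.length());
            props.put("spark.sql.sources.schema.part." + i,
                    schemaJson.substring(start, end));
        }
        return props;
    }
}
```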
Force-pushed from 09020d6 to ec60b9c.
/**
 * Config stored in hive serde properties to tell query engine (spark/flink) to
 * read the table as a read-optimized table when this config is true.
 */
public static final String IS_QUERY_AS_RO_TABLE = "hoodie.query.as.ro.table";
What's the relationship between this key IS_QUERY_AS_RO_TABLE and SPARK_QUERY_AS_RO_KEY / SPARK_QUERY_AS_RT_KEY?
SPARK_QUERY_AS_RO_KEY was introduced by #2925 for the Spark SQL writer to pass some params; it can only be used by the Spark engine, and in this PR we do not need it anymore. We use IS_QUERY_AS_RO_TABLE, which can be used by both Spark and Flink.
Then we should add some comments and a strategy to deprecate (or delete) them.
PR #2925 has not been released yet, so we can just delete them.
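A sketch of how an engine-agnostic reader could honor the serde property discussed above (the key matches the PR; the reader method itself is hypothetical):

```java
import java.util.Map;

// Sketch: any engine can check the same serde property to decide between
// the read-optimized and real-time views. The key matches the PR; this
// helper is illustrative, not actual Hudi code.
public class RoTableCheckSketch {
    public static final String IS_QUERY_AS_RO_TABLE = "hoodie.query.as.ro.table";

    static boolean readAsReadOptimized(Map<String, String> serdeProperties) {
        // Default to the real-time (snapshot) view when the key is absent.
        return Boolean.parseBoolean(
                serdeProperties.getOrDefault(IS_QUERY_AS_RO_TABLE, "false"));
    }
}
```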
/**
 * Convert the parquet schema to Spark schema's JSON string.
 * This code refers to org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter
 * in the spark project.
 */
Why not just use ParquetToSparkSchemaConverter directly?
Using ParquetToSparkSchemaConverter directly would require the Spark dependencies for hive-sync, and the flink-bundle would then also need Spark. In order to remove the Spark dependencies, I wrote this util.
Okay, got your idea. It still looks weird, because this Spark schema can only be recognized by the Spark engine. Looks like a special case for the writer here.
Yes, Spark needs these schema configs to read the table as a datasource table. We have to sync these configs.
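As an illustration of what "these schema configs" involve: Spark resolves a datasource table from a provider name plus the serialized schema in the table properties. The keys follow Spark's HiveExternalCatalog convention; the helper and the single-part schema below are simplified sketches, not Hudi code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: the minimal table properties a Spark datasource table resolution
// keys off (provider + serialized schema). Simplified to a one-part schema.
public class SparkTablePropsSketch {

    static Map<String, String> dataSourceTableProps(String schemaJson) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("spark.sql.sources.provider", "hudi");
        props.put("spark.sql.sources.schema.numParts", "1");
        props.put("spark.sql.sources.schema.part.0", schemaJson);
        return props;
    }
}
```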
@@ -70,6 +69,10 @@
return Arrays.asList(new Object[][] {{true, true, true}, {true, false, false}, {false, true, true}, {false, false, false}});
}

private static Iterable<Object[]> syncDataSourceTableParams() {
return Arrays.asList(new Object[][] {{true, true, true}, {true, false, false}, {false, true, true}, {false, false, false}});
Can we add some comments on what each flag means?
done
Force-pushed from 0fb47c6 to ffa9341.
+1, LGTM.
What is the purpose of the pull request
Currently we only support reading a Hoodie table as a datasource table for Spark, since #2283. That PR only syncs the Spark table properties from HoodieSparkSqlWriter, which cannot be used by other engines such as Flink. In order to support this feature for Flink and DeltaStreamer, we need to sync the Spark table properties needed by the datasource table to the metastore in HiveSyncTool.
Brief change log
Sync the Spark table properties and serde properties needed by the Spark datasource table in HiveSyncTool, so that all engines can use this feature.
Verify this pull request
This change added tests and can be verified as follows:
Added TestParquet2SparkSchemaUtils to verify the Parquet-to-Spark schema conversion.
Extended TestHiveSyncTool to cover syncing as a Spark datasource table.
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.