[HUDI-5884] Support bulk_insert for insert_overwrite and insert_overwrite_table #8076
Conversation
@hudi-bot run azure
Hey @alexeykudinkin @nsivabalan, could you please take a look?
mode = SaveMode.Overwrite
isOverWriteTable = true
val mode = if (overwrite) {
  SaveMode.Overwrite
Given that Overwrite mode doesn't care about the old data, do we need to enable bulk_insert by default if it's Overwrite mode?
I think that's a good suggestion. cc @nsivabalan @yihua
} else if (mode == SaveMode.Overwrite && tableExists && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) {
  // When user set operation as INSERT_OVERWRITE_TABLE,
  // overwrite will use INSERT_OVERWRITE_TABLE operator in doWriteOperation
} else if (mode == SaveMode.Overwrite && tableExists &&
Not sure why we need to explicitly delete the old data if it's Overwrite mode; this behavior actually makes Hudi not ACID-compliant (I keep it here to make the tests pass). Maybe we should only delete old data when using the drop table command?
Do you mean that, for Overwrite mode, we should not delete the basePath and should just overwrite the existing data? If so, I agree with you. Probably something to tackle in another PR.
Do you mean that, for Overwrite mode, we should not delete the basePath and should just overwrite the existing data?
Yea.
Probably something to tackle in another PR.
Sure, will fix it in another PR.
Please file a JIRA to track this change.
Sure, created: HUDI-6286
Hi @stream2000, could you also please review this? It fixes PR #8015.
+ " To use row writer please switch to spark 2 or spark 3"); | ||
} | ||
|
||
records.write().format(targetFormat) |
This still keeps the old behavior for bulk_insert here; maybe we should also use HoodieDatasetBulkInsertHelper.bulkInsert to perform the write operation? We could remove a lot of code for handling commit behavior (this path adds a completed commit, while HoodieDatasetBulkInsertHelper.bulkInsert doesn't, so we need to handle this differently in bulkInsertAsRow).
Gentle ping @alexeykudinkin @xushiyan @danny0405 @yihua
@boneanxs I am yet to review fully, but have taken one pass. Can you break it down into two PRs - a) don't delete the table location if using SaveMode.Overwrite for bulk_insert, insert_overwrite, b) add support for bulk_insert for insert_overwrite and insert_overwrite_table.
Also, I want to understand the use case when we need this. If you can elaborate a bit more on why we need this, that would be great.
public static final ConfigProperty<String> BULKINSERT_INPUT_DATA_SCHEMA_DDL = ConfigProperty
    .key("hoodie.bulkinsert.schema.ddl")
    .noDefaultValue()
    .withDocumentation("Schema set for row writer/bulk insert.");

public static final ConfigProperty<String> BULKINSERT_OVERWRITE_MODE = ConfigProperty
    .key("hoodie.bulkinsert.overwrite.mode")
The value for this config is a write operation type. So, its key should be named accordingly.
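For illustration, a minimal sketch of what the renamed property could look like, following the ConfigProperty pattern quoted above. The field name BULKINSERT_OVERWRITE_OPERATION_TYPE and the key string below are assumptions based on the tests referenced later in this thread, not the PR's final code.

import org.apache.hudi.common.config.ConfigProperty;

public class HoodieInternalConfigSketch {
  // Hypothetical rename of BULKINSERT_OVERWRITE_MODE; the field name and key string
  // are assumptions illustrating the reviewer's suggestion.
  public static final ConfigProperty<String> BULKINSERT_OVERWRITE_OPERATION_TYPE = ConfigProperty
      .key("hoodie.bulkinsert.overwrite.operation.type")
      .noDefaultValue()
      .withDocumentation("Write operation type to use when bulk_insert runs in overwrite mode, "
          + "e.g. insert_overwrite or insert_overwrite_table.");
}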
import java.util.List;
import java.util.Map;

public abstract class BaseDatasetBulkCommitActionExecutor implements Serializable {
Do we need this abstraction at a higher layer, i.e. in hudi-client-common? And then maybe extend it in hudi-spark-common for Dataset?
Yes, at first I tried to put this in hudi-client-common, but since BaseDatasetBulkCommitActionExecutor needs to access DataSourceUtils and DataSourceWriteOptions, I'm not sure it's reasonable to move those classes there; I'm also afraid those two classes have other dependencies that we would need to change as well.
public abstract class BaseDatasetBulkCommitActionExecutor implements Serializable {

  protected final HoodieWriteConfig writeConfig;
Do we need to serialize write config too or can it be transient?
import java.util.List;
import java.util.Map;

public abstract class BaseDatasetBulkCommitActionExecutor implements Serializable {
Suggested change:
- public abstract class BaseDatasetBulkCommitActionExecutor implements Serializable {
+ public abstract class BaseDatasetBulkInsertCommitActionExecutor implements Serializable {
val writeConfig = DataSourceUtils.createHoodieConfig(writerSchemaStr, basePath.toString, tblName, opts)
val executor = mode match {
  case SaveMode.Append =>
    new DatasetBulkInsertActionExecutor(writeConfig, writeClient, instantTime)
Could we use writeClient to do the insert overwrite instead of calling the xxxActionExecutor directly?
writeClient is specifically for RDD[HoodieRecord]; since all the xxxActionExecutors here are Dataset[Row] based, I didn't put this logic there.
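As a rough Java sketch of the per-operation executor selection being discussed: the constructor arguments (writeConfig, writeClient, instantTime) mirror the quoted Scala snippet, and the exact signatures of these PR-introduced executor classes are assumptions, not the final API.

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;

public class ExecutorSelectionSketch {
  // Hypothetical helper: pick the Dataset-based bulk-insert executor by operation type.
  // The executor classes come from this PR; their constructors are assumed here.
  static BaseDatasetBulkInsertCommitActionExecutor chooseExecutor(WriteOperationType operation,
                                                                  HoodieWriteConfig writeConfig,
                                                                  SparkRDDWriteClient writeClient,
                                                                  String instantTime) {
    switch (operation) {
      case INSERT_OVERWRITE:
        return new DatasetBulkInsertOverwriteCommitActionExecutor(writeConfig, writeClient, instantTime);
      case INSERT_OVERWRITE_TABLE:
        return new DatasetBulkInsertOverwriteTableCommitActionExecutor(writeConfig, writeClient, instantTime);
      default:
        return new DatasetBulkInsertCommitActionExecutor(writeConfig, writeClient, instantTime);
    }
  }
}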
import java.util.Map;
import java.util.stream.Collectors;

public class DatasetBulkInsertActionExecutor extends BaseDatasetBulkCommitActionExecutor {
Maybe we should change DatasetBulkInsertActionExecutor -> DatasetBulkInsertCommitActionExecutor since it is a subclass of BaseCommitActionExecutor.
Makes sense, will change.
import java.util.Map;
import java.util.stream.Collectors;

public class DatasetBulkInsertOverwriteActionExecutor extends BaseDatasetBulkCommitActionExecutor {
Ditto, DatasetBulkInsertOverwriteCommitActionExecutor
import java.util.List;
import java.util.Map;

public class DatasetBulkInsertOverwriteTableActionExecutor extends DatasetBulkInsertOverwriteActionExecutor {
Ditto, DatasetBulkInsertOverwriteTableCommitActionExecutor
Yea, sure, will do so
Currently, we want to migrate all existing Hive tables to Hudi tables, given many Hive tables
Hi @codope @stream2000, gentle ping... could you please take a look again?
     | partitioned by (dt, hh)
     | location '${tmp.getCanonicalPath}/$tableMultiPartition'
   """.stripMargin)

test("Test bulk insert with insert into for non partitioned table")
These tests only test the default value of BULKINSERT_OVERWRITE_OPERATION_TYPE, right? Can we also test the other possible value?
"Test bulk insert with insert overwrite table" tests INSERT_OVERWRITE_TABLE, and "Test bulk insert with insert overwrite partition" tests INSERT_OVERWRITE. These two tests cover all values of BULKINSERT_OVERWRITE_OPERATION_TYPE.
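For context, a small sketch (table and column names are made up) of the two SQL shapes those tests exercise: overwriting the whole table maps to INSERT_OVERWRITE_TABLE, while overwriting a static partition maps to INSERT_OVERWRITE, assuming the Hudi Spark SQL extensions are enabled and the table already exists.

import org.apache.spark.sql.SparkSession;

public class InsertOverwriteSqlSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("insert-overwrite-sketch")
        .master("local[2]")
        .getOrCreate();

    // Overwrites all data in the (hypothetical) table -> INSERT_OVERWRITE_TABLE.
    spark.sql("insert overwrite table h0 select 1 as id, 'a1' as name, 10.0 as price, '2021-01-05' as dt");

    // Overwrites only the matching partition -> INSERT_OVERWRITE.
    spark.sql("insert overwrite table h0 partition (dt = '2021-01-05') select 2, 'a2', 12.0");

    spark.stop();
  }
}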
@@ -106,8 +106,14 @@ private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap,
override def toInsertableRelation: InsertableRelation = {
  new InsertableRelation {
    override def insert(data: DataFrame, overwrite: Boolean): Unit = {
      val mode = if (overwriteTable || overwritePartition) {
Can you confirm whether, if it's insert_overwrite_table, the table basePath will still be removed?
With this PR, it won't delete the basePath:
// HoodieSparkSqlWriter#handleSaveModes
// won't delete the path if it's Overwrite mode and INSERT_OVERWRITE_TABLE, INSERT_OVERWRITE
else if (mode == SaveMode.Overwrite && tableExists &&
(operation != WriteOperationType.INSERT_OVERWRITE_TABLE
&& operation != WriteOperationType.INSERT_OVERWRITE
&& operation != WriteOperationType.BULK_INSERT)) {
// For INSERT_OVERWRITE_TABLE, INSERT_OVERWRITE and BULK_INSERT with Overwrite mode,
// we'll use replacecommit to overwrite the old data.
log.warn(s"hoodie table at $tablePath already exists. Deleting existing data & overwriting with new data.")
fs.delete(tablePath, true)
tableExists = false
}
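As a usage illustration of the path guarded by the snippet above, here is a minimal DataFrame-write sketch (paths, table name, and columns are made up) combining SaveMode.Overwrite with the insert_overwrite operation, which this version of the PR handles via a replacecommit instead of deleting the basePath.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class InsertOverwriteDataFrameSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("hudi-insert-overwrite-sketch")
        .master("local[2]")
        .getOrCreate();

    // Hypothetical source data; any Dataset<Row> with the expected columns works.
    Dataset<Row> df = spark.read().parquet("/tmp/source_parquet");

    df.write().format("hudi")
        .option("hoodie.table.name", "h0")                               // made-up table name
        .option("hoodie.datasource.write.operation", "insert_overwrite") // or insert_overwrite_table
        .option("hoodie.datasource.write.recordkey.field", "id")         // assumed columns
        .option("hoodie.datasource.write.partitionpath.field", "dt")
        .option("hoodie.datasource.write.precombine.field", "ts")
        .mode(SaveMode.Overwrite)
        .save("/tmp/hudi/h0");                                           // made-up basePath

    spark.stop();
  }
}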
    .options(optsOverrides)
    .mode(SaveMode.Append)
    .save();
return null;
why return null here?
BULK_INSERT doesn't need to return WriteStatus (it doesn't need to execute the afterExecute method), since it calls the DataFrame API records.write() to perform the write operation; the commit data is written after the write operation is done (in HoodieDataSourceInternalBatchWrite#commit, via dataSourceInternalWriterHelper.commit).
Then how about returning Option<HoodieData<WriteStatus>>, or maybe an empty HoodieData if the return is not needed at the call site? Returning null can be potentially dangerous if another author adds a change assuming that WriteStatus will always be present.
Makes sense, let me change it.
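A minimal sketch of the reviewer's suggestion; the class and method names here are assumptions, not the PR's actual code.

import java.util.Map;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.util.Option;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public abstract class RowWriterExecuteSketch {
  // Hypothetical shape of the execute step: the row-writer path performs the write via
  // the DataFrame API and returns Option.empty() instead of null.
  protected Option<HoodieData<WriteStatus>> writeAsRows(Dataset<Row> records,
                                                        String targetFormat,
                                                        Map<String, String> optsOverrides) {
    records.write().format(targetFormat)
        .options(optsOverrides)
        .mode(SaveMode.Append)
        .save();
    // No WriteStatus objects are produced on this path; the commit is completed by the
    // datasource's internal batch-write commit, as discussed above.
    return Option.empty();
  }
}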
@hudi-bot run azure
Hey @codope, all comments are addressed, could you please review it again?
@boneanxs Can you please rebase?
Looks good to me. @yihua @nsivabalan If you can take one pass, that would be great.
@yihua Gentle ping... could you please help review it?
Sorry for the delay. I will review this PR this week.
@boneanxs meanwhile, could you rebase the PR on the latest master?
@boneanxs Since this is a breaking change for users who rely on SaveMode.Overwrite, can we just keep the bulk insert part and extract the behavior change (i.e. not deleting the table location when using SaveMode.Overwrite for bulk_insert, insert_overwrite) into a separate PR? We intend to do any behavior changes in the 1.0 release while keeping 0.14.0 compatible with previous releases.
@codope removed the breaking changes
Thanks @boneanxs for extracting out the breaking change. Left one minor comment for the config. Can you also squash all commits to one?
Change Logs
Add bulk_insert support for insert_overwrite and insert_overwrite_table.
Impact
In order to keep consistent behavior, this PR still deletes old data completely when overwriting the whole table with BULK_INSERT enabled. Here's a summary of current Hudi behaviors under different SaveMode and Operation combinations (especially those related to BULK_INSERT and INSERT_OVERWRITE):
dataframe: replaceCommit to overwrite all table data; replaceCommit to overwrite old partitions
sql: replaceCommit to overwrite old partitions; replaceCommit to overwrite all table data; replaceCommit to overwrite old partitions
There are still some issues to be addressed: INSERT_OVERWRITE with Overwrite mode should not delete the whole table data, it only needs to overwrite the old partitions; currently Overwrite mode will delete all data first. Created an issue to track this: HUDI-6286.
Risk level (write none, low, medium or high below)
low
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change; attach the ticket number here and follow the instruction to make changes to the website.
Contributor's checklist