-
Notifications
You must be signed in to change notification settings - Fork 4.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HIVE-23539 : Optimize data copy during repl load operation for HDFS b… #1084
Conversation
@@ -367,6 +369,17 @@ public static boolean shouldReplicate(NotificationEvent tableForEvent, | |||
} | |||
} | |||
|
|||
public static boolean onSameHDFSFileSystem(Path dataPath, Path tgtPath) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unit tests for this method
context.hiveConf, false, false | ||
); | ||
|
||
Path partDataSrc = new Path(event.dataPath() + File.separator + getPartitionName(sourceWarehousePartitionLocation)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can create a path directly from the constructor of Path instead of using File Separator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Normally yes, but in this case it will not as getPartitionName(sourceWarehousePartitionLocation) returns value with / attached. Path will take that itself as final path.
Task<?> movePartitionTask = null; | ||
if (loadFileType != LoadFileType.IGNORE) { | ||
// no need to create move task, if file is moved directly to target location. | ||
movePartitionTask = movePartitionTask(table, partSpec, stagingDir, loadFileType); | ||
movePartitionTask = movePartitionTask(table, partSpec, moveSource, loadFileType); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we moving from source here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Path moveSource = performOnlyMove ? partDataSrc : stagingDir;
Move src is either partDataSrc(which is repl Staging dir) or load's own local staging dir which is used to do data copy operation.
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
Outdated
Show resolved
Hide resolved
ptnRootTask = copyTask; | ||
if (performOnlyMove) { | ||
if (ptnRootTask == null) { | ||
ptnRootTask = addPartTask; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will the addPartTask not be added already as part of DDL operation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, that happens only when isOnlyDDLOperation is true. It doesn't come down in to this line in that case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How was addPartTask added before your change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not changing anything with respect to addPartTask. Just the copy-move is getting replaced with single move.
|
||
Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf, | ||
false, false); | ||
Path moveSrcPath = performOnlyMove ? dataPath : tmpPath; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is moveSrcPath ? What is tmpPath? dataPath is on source or target? It would be good if we could rename these variables for readability
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moveSrcPath is the path used to perform move data to final warehouse dir on target.
@@ -452,7 +453,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, | |||
boolean isAutoPurge = false; | |||
boolean needRecycle = false; | |||
boolean copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable(); | |||
|
|||
boolean performOnlyMove = replicationSpec.isInReplicationScope() && Utils.onSameHDFSFileSystem(dataPath, tgtPath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need this extra condition replicationSpec.isInReplicationScope()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we would like to use it only in replication cases.
addPartTask.addDependentTask(loadPartTask); | ||
} | ||
x.getTasks().add(loadPartTask); | ||
return addPartTask == null ? loadPartTask : addPartTask; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be simplified. There is already a not null check above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but then we will have to have two return statements which I wanted to avoid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are already multiple return statements in the same method.
@@ -179,6 +179,63 @@ public void externalTableReplicationWithLocalStaging() throws Throwable { | |||
.verifyResult("800"); | |||
} | |||
|
|||
@Test | |||
public void testHdfsMoveOptimizationOnTargetStaging() throws Throwable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check for empty staging as the data is moved.
@@ -179,6 +179,63 @@ public void externalTableReplicationWithLocalStaging() throws Throwable { | |||
.verifyResult("800"); | |||
} | |||
|
|||
@Test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Negative test for different staging when the data is not moved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment not clear
@@ -1539,15 +1539,14 @@ public void testCheckPointingWithSourceTableDataInserted() throws Throwable { | |||
.run("insert into t2 values (24)") | |||
.run("insert into t1 values (4)") | |||
.dump(primaryDbName, dumpClause); | |||
|
|||
assertEquals(modifiedTimeTable1CopyFile, fs.listStatus(tablet1Path)[0].getModificationTime()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this changed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because data is moved now. We are checking this late
@@ -1596,6 +1595,10 @@ public void testCheckPointingWithNewTablesAdded() throws Throwable { | |||
.run("insert into t3 values (3)") | |||
.dump(primaryDbName, dumpClause); | |||
|
|||
assertEquals(modifiedTimeTable1, fs.getFileStatus(tablet1Path).getModificationTime()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this changed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above
ptnRootTask = copyTask; | ||
if (performOnlyMove) { | ||
if (ptnRootTask == null) { | ||
ptnRootTask = addPartTask; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How was addPartTask added before your change?
@@ -320,7 +334,7 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc | |||
private String getPartitionName(Path partitionMetadataFullPath) { | |||
//Get partition name by removing the metadata base path. | |||
//Needed for getting the data path | |||
return partitionMetadataFullPath.toString().substring(event.metadataPath().toString().length()); | |||
return partitionMetadataFullPath.toString().substring(event.metadataPath().toString().length() + 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we do a split instead at '/'. This may be error prone
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Existing method. just changing the index
LoadFileType lft; | ||
boolean isAutoPurge = false; | ||
boolean needRecycle = false; | ||
boolean copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable(); | ||
|
||
boolean performOnlyMove = replicationSpec.isInReplicationScope() && Utils.onSameHDFSFileSystem(dataPath, tgtPath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be added as a separate method.
addPartTask.addDependentTask(loadPartTask); | ||
} | ||
x.getTasks().add(loadPartTask); | ||
return addPartTask == null ? loadPartTask : addPartTask; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are already multiple return statements in the same method.
* If the Repl staging directory ('hive.repl.rootdir') is on the target cluster itself and the FS scheme is hdfs, | ||
* data is moved directly from Repl staging data dir of the table to the table's location on target warehouse. | ||
*/ | ||
Path moveDataSrc = performOnlyMove ? dataPath : destPath; | ||
if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(table)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replicationSpec.isInReplicationScope( is already done here
} | ||
|
||
Task<?> copyTask = null; | ||
if (replicationSpec.isInReplicationScope()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check is already done before. Can be simplified
…ased staging location - bootstrap
…ased staging location - bootstrap: fixed tests
…ased staging location - Incremental - Single Move
…ased staging location - Incremental - Single Move - exim
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. |
…ased staging location
NOTICE
Please create an issue in ASF JIRA before opening a pull request,
and you need to set the title of the pull request which starts with
the corresponding JIRA issue number. (e.g. HIVE-XXXXX: Fix a typo in YYY)
For more details, please see https://cwiki.apache.org/confluence/display/Hive/HowToContribute