Skip to content

Commit

Permalink
HIVE-23539 : Optimize data copy during repl load operation for HDFS based staging location - incremental
Browse files Browse the repository at this point in the history
  • Loading branch information
Pravin Sinha committed Jun 22, 2020
1 parent 86594d2 commit 22ee85b
Showing 1 changed file with 60 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
import org.apache.hadoop.hive.ql.plan.CopyWork;
Expand Down Expand Up @@ -452,11 +453,13 @@ private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path
boolean isAutoPurge = false;
boolean needRecycle = false;
boolean copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable();
boolean performOnlyMoveOps = performOnlyMoveOpsDuringDataCopy(replicationSpec, dataPath, tgtPath);

if (replicationSpec.isInReplicationScope() && (copyToMigratedTxnTable ||
x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) {
lft = LoadFileType.IGNORE;
destPath = loadPath = tgtPath;
lft = performOnlyMoveOps ? getLoadFileType(replicationSpec) : LoadFileType.IGNORE;
destPath = performOnlyMoveOps ? x.getCtx().getExternalTmpPath(tgtPath) : tgtPath;
loadPath = destPath;
isAutoPurge = "true".equalsIgnoreCase(table.getProperty("auto.purge"));
if (table.isTemporary()) {
needRecycle = false;
Expand All @@ -482,8 +485,7 @@ private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path
lft = LoadFileType.KEEP_EXISTING;
} else {
destPath = loadPath = x.getCtx().getExternalTmpPath(tgtPath);
lft = replace ? LoadFileType.REPLACE_ALL :
replicationSpec.isMigratingToTxnTable() ? LoadFileType.KEEP_EXISTING : LoadFileType.OVERWRITE_EXISTING;
lft = getLoadFileType(replicationSpec);
}
}

Expand All @@ -498,12 +500,23 @@ private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path
);
}

Task<?> copyTask = null;
Task<?> copyOrMoveTask = null;
if (replicationSpec.isInReplicationScope()) {
copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(),
isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
if (performOnlyMoveOps) {
MoveWork mvWork = new MoveWork(x.getInputs(), x.getOutputs(), null, null, false);
LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
Collections.singletonList(dataPath),
Collections.singletonList(destPath),
true, null, null);
mvWork.setMultiFilesDesc(loadFilesWork);
mvWork.setNeedCleanTarget(replace);
copyOrMoveTask = TaskFactory.get(mvWork, x.getConf());
} else {
copyOrMoveTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(),
isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
}
} else {
copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
copyOrMoveTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
}

MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(), null, null, false);
Expand All @@ -529,8 +542,8 @@ private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path
//if Importing into existing table, FileFormat is checked by
// ImportSemanticAnalzyer.checked checkTable()
Task<?> loadTableTask = TaskFactory.get(moveWork, x.getConf());
copyTask.addDependentTask(loadTableTask);
x.getTasks().add(copyTask);
copyOrMoveTask.addDependentTask(loadTableTask);
x.getTasks().add(copyOrMoveTask);
return loadTableTask;
}

Expand Down Expand Up @@ -604,17 +617,18 @@ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
srcLocation = new Path(dataDirBase, relativePartDataPath).toString();
}
fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x);
boolean performOnlyMoveOps = Utils.onSameHDFSFileSystem(new Path(srcLocation), new Path(partSpec.getLocation()));
x.getLOG().debug("adding dependent CopyWork/AddPart/MoveWork for partition "
+ partSpecToString(partSpec.getPartSpec())
+ " with source location: " + srcLocation);
Path tgtLocation = new Path(partSpec.getLocation());

LoadFileType loadFileType;
Path destPath;
Path destPath = null;
if (replicationSpec.isInReplicationScope() && (copyToMigratedTxnTable ||
x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) {
loadFileType = LoadFileType.IGNORE;
destPath = tgtLocation;
loadFileType = performOnlyMoveOps ? getLoadFileType(replicationSpec) : LoadFileType.IGNORE;
destPath = performOnlyMoveOps ? x.getCtx().getExternalTmpPath(tgtLocation) : tgtLocation;
isAutoPurge = "true".equalsIgnoreCase(table.getProperty("auto.purge"));
if (table.isTemporary()) {
needRecycle = false;
Expand All @@ -623,9 +637,7 @@ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
needRecycle = db != null && ReplChangeManager.shouldEnableCm(db, table.getTTable());
}
} else {
loadFileType = replicationSpec.isReplace() ?
LoadFileType.REPLACE_ALL :
replicationSpec.isMigratingToTxnTable() ? LoadFileType.KEEP_EXISTING : LoadFileType.OVERWRITE_EXISTING;
loadFileType = getLoadFileType(replicationSpec);
//Replication scope the write id will be invalid
boolean useStagingDirectory = !AcidUtils.isTransactionalTable(table.getParameters()) ||
replicationSpec.isInReplicationScope();
Expand All @@ -645,12 +657,22 @@ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
);
}

Task<?> copyTask = null;
Task<?> copyOrMoveTask = null;
if (replicationSpec.isInReplicationScope()) {
copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath,
x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
if (performOnlyMoveOps) {
MoveWork mvWork = new MoveWork(x.getInputs(), x.getOutputs(), null, null, false);
LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
Collections.singletonList(new Path(srcLocation)),
Collections.singletonList(destPath),
true, null, null);
mvWork.setMultiFilesDesc(loadFilesWork);
copyOrMoveTask = TaskFactory.get(mvWork, x.getConf());
} else {
copyOrMoveTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath,
x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
}
} else {
copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
copyOrMoveTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
}

Task<?> addPartTask = null;
Expand Down Expand Up @@ -694,25 +716,37 @@ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
// operations, add partition task is anyways a no-op as alter partition operation does just some statistics
// update which is again done in load operations as part of move task.
if (x.getEventType() == DumpType.EVENT_INSERT) {
copyTask.addDependentTask(TaskFactory.get(moveWork, x.getConf()));
copyOrMoveTask.addDependentTask(TaskFactory.get(moveWork, x.getConf()));
} else {
if (addPartTask != null) {
copyTask.addDependentTask(addPartTask);
copyOrMoveTask.addDependentTask(addPartTask);
}
}
return copyTask;
return copyOrMoveTask;
}
Task<?> loadPartTask = TaskFactory.get(moveWork, x.getConf());
copyTask.addDependentTask(loadPartTask);
copyOrMoveTask.addDependentTask(loadPartTask);
if (addPartTask != null) {
addPartTask.addDependentTask(loadPartTask);
x.getTasks().add(copyTask);
x.getTasks().add(copyOrMoveTask);
return addPartTask;
}
return copyTask;
return copyOrMoveTask;
}
}

/**
 * Decides whether the data copy phase of REPL LOAD can be replaced by a pure
 * filesystem move.
 *
 * @param replicationSpec the replication context for the current load event
 * @param fromPath        source data location
 * @param toPath          target data location
 * @return {@code true} only when the load runs in replication scope and both
 *         paths reside on the same HDFS filesystem (a rename is then cheap)
 */
private static boolean performOnlyMoveOpsDuringDataCopy(ReplicationSpec replicationSpec, Path fromPath, Path toPath) {
  // Outside replication scope the move optimization never applies.
  if (!replicationSpec.isInReplicationScope()) {
    return false;
  }
  // A move is only safe/cheap when source and target share one HDFS filesystem.
  return Utils.onSameHDFSFileSystem(fromPath, toPath);
}

/**
 * Maps the replication event's semantics onto a {@link LoadFileType}.
 *
 * @param replicationSpec the replication context for the current load event
 * @return {@code REPLACE_ALL} when the event is a replace; otherwise
 *         {@code KEEP_EXISTING} while migrating to a transactional table,
 *         falling back to {@code OVERWRITE_EXISTING}
 */
private static LoadFileType getLoadFileType(ReplicationSpec replicationSpec) {
  // Replace semantics take precedence over everything else.
  if (replicationSpec.isReplace()) {
    return LoadFileType.REPLACE_ALL;
  }
  // During migration to a transactional table, existing files must be kept.
  if (replicationSpec.isMigratingToTxnTable()) {
    return LoadFileType.KEEP_EXISTING;
  }
  return LoadFileType.OVERWRITE_EXISTING;
}

/**
* In REPL LOAD flow, the data copy is done separately for external tables using data locations
* dumped in file {@link ReplExternalTables#FILE_NAME}. So, we can skip copying it here.
Expand Down

0 comments on commit 22ee85b

Please sign in to comment.