HIVE-23539 : Optimize data copy during repl load operation for HDFS based staging location - Incremental - Single Move
Pravin Sinha committed Jun 22, 2020
1 parent 15cf549 commit 3429e27
Showing 4 changed files with 87 additions and 37 deletions.
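The gist of the change: when the REPL LOAD staging directory ('hive.repl.rootdir') already sits on the target cluster's HDFS, the staged data can be moved (renamed) straight into the table or partition location instead of first being copied to a temporary path and then moved. The diff gates this on Utils.onSameHDFSFileSystem; the standalone sketch below only illustrates the kind of same-filesystem test such a gate needs to make, and the helper name and exact comparison rules are assumptions for illustration, not Hive's implementation.

import java.io.IOException;
import java.net.URI;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class SameHdfsCheckSketch {
  /**
   * True when both paths resolve to the same HDFS instance (same scheme and
   * authority), in which case a rename can stand in for a copy.
   */
  static boolean onSameHdfs(Path src, Path dst, Configuration conf) throws IOException {
    URI srcUri = src.getFileSystem(conf).getUri();
    URI dstUri = dst.getFileSystem(conf).getUri();
    return "hdfs".equalsIgnoreCase(srcUri.getScheme())
        && "hdfs".equalsIgnoreCase(dstUri.getScheme())
        && Objects.equals(srcUri.getAuthority(), dstUri.getAuthority());
  }
}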
2 changes: 1 addition & 1 deletion ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -145,7 +145,7 @@ private void moveFileInDfs (Path sourcePath, Path targetPath, HiveConf conf)
Hive.clearDestForSubDirSrc(conf, targetPath, sourcePath, false);
}
// Set isManaged to false as this is not load data operation for which it is needed.
if (!Hive.moveFile(conf, sourcePath, targetPath, true, false, false)) {
if (!Hive.moveFile(conf, sourcePath, targetPath, true, false, false, work.isPerformOnlyMove())) {
try {
if (deletePath != null) {
tgtFs.delete(deletePath, true);
10 changes: 5 additions & 5 deletions ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -4402,7 +4402,7 @@ private static void deleteAndRename(FileSystem destFs, Path destFile, FileStatus
//from mv command if the destf is a directory, it replaces the destf instead of moving under
//the destf. in this case, the replaced destf still preserves the original destf's permission
public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, boolean replace,
boolean isSrcLocal, boolean isManaged) throws HiveException {
boolean isSrcLocal, boolean isManaged, boolean replOnlyMoveOp) throws HiveException {
final FileSystem srcFs, destFs;
try {
destFs = destf.getFileSystem(conf);
@@ -4436,7 +4436,7 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf,
//if replace is false, rename (mv) actually move the src under dest dir
//if destf is an existing file, rename is actually a replace, and do not need
// to delete the file first
if (replace && !srcIsSubDirOfDest) {
if (replace && !(srcIsSubDirOfDest||replOnlyMoveOp)) {
destFs.delete(destf, true);
LOG.debug("The path " + destf.toString() + " is deleted");
}
@@ -4457,7 +4457,7 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf,
replace, // overwrite destination
conf);
} else {
if (srcIsSubDirOfDest || destIsSubDirOfSrc) {
if (srcIsSubDirOfDest || destIsSubDirOfSrc || replOnlyMoveOp) {
FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER);

List<Future<Void>> futures = new LinkedList<>();
@@ -4886,7 +4886,7 @@ private void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, H
// 2. srcs must be a list of files -- ensured by LoadSemanticAnalyzer
// in both cases, we move the file under destf
if (srcs.length == 1 && srcs[0].isDirectory()) {
if (!moveFile(conf, srcs[0].getPath(), destf, true, isSrcLocal, isManaged)) {
if (!moveFile(conf, srcs[0].getPath(), destf, true, isSrcLocal, isManaged, false)) {
throw new IOException("Error moving: " + srcf + " into: " + destf);
}

@@ -4913,7 +4913,7 @@ private void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, H
public Boolean call() throws Exception {
SessionState.setCurrentSessionState(parentSession);
return moveFile(
conf, src.getPath(), destFile, true, isSrcLocal, isManaged);
conf, src.getPath(), destFile, true, isSrcLocal, isManaged, false);
}
}),
destFile);
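The behavioural change in Hive.moveFile: a replication-only move (replOnlyMoveOp) no longer deletes an existing destination directory, and it takes the branch that renames each visible staged file into the destination alongside whatever is already there. Below is a simplified, sequential sketch of that per-file rename path, assuming a plain rename within one HDFS; the real method runs the renames through an executor and also decides between rename and copy in cases this sketch ignores. The hidden-file filter here is only an assumed equivalent of FileUtils.HIDDEN_FILES_PATH_FILTER.

import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

final class ReplOnlyMoveSketch {
  // Assumed equivalent of Hive's hidden-files filter: skip names starting with '.' or '_'.
  private static final PathFilter HIDDEN_FILES =
      p -> !p.getName().startsWith(".") && !p.getName().startsWith("_");

  /**
   * Move every visible file under srcDir directly beneath destDir without
   * deleting destDir first, so files already present at the destination survive.
   */
  static void moveIntoExistingDir(FileSystem fs, Path srcDir, Path destDir) throws IOException {
    if (!fs.exists(destDir) && !fs.mkdirs(destDir)) {
      throw new IOException("Unable to create target directory " + destDir);
    }
    for (FileStatus src : fs.listStatus(srcDir, HIDDEN_FILES)) {
      Path target = new Path(destDir, src.getPath().getName());
      if (!fs.rename(src.getPath(), target)) {
        throw new IOException("Failed to move " + src.getPath() + " to " + target);
      }
    }
  }
}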
102 changes: 71 additions & 31 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
import org.apache.hadoop.hive.ql.plan.CopyWork;
@@ -447,16 +448,16 @@ private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path
assert table != null;
assert table.getParameters() != null;
Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
Path destPath = null, loadPath = null;
Path destPath = null;
LoadFileType lft;
boolean isAutoPurge = false;
boolean needRecycle = false;
boolean copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable();

boolean performOnlyMove = replicationSpec.isInReplicationScope() && Utils.onSameHDFSFileSystem(dataPath, tgtPath);
if (replicationSpec.isInReplicationScope() && (copyToMigratedTxnTable ||
x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) {
lft = LoadFileType.IGNORE;
destPath = loadPath = tgtPath;
lft = performOnlyMove ? getLoadFileType(replicationSpec) : LoadFileType.IGNORE;
destPath = tgtPath;
isAutoPurge = "true".equalsIgnoreCase(table.getProperty("auto.purge"));
if (table.isTemporary()) {
needRecycle = false;
@@ -478,10 +479,9 @@ private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path
* boolean, Long, int)}
* skip the unnecessary file (rename) operation but it will perform other things.
*/
loadPath = tgtPath;
lft = LoadFileType.KEEP_EXISTING;
} else {
destPath = loadPath = x.getCtx().getExternalTmpPath(tgtPath);
destPath = x.getCtx().getExternalTmpPath(tgtPath);
lft = replace ? LoadFileType.REPLACE_ALL :
replicationSpec.isMigratingToTxnTable() ? LoadFileType.KEEP_EXISTING : LoadFileType.OVERWRITE_EXISTING;
}
@@ -498,37 +498,44 @@ private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path
);
}

Task<?> copyTask = null;
if (replicationSpec.isInReplicationScope()) {
copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(),
isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
} else {
copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
}

MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(), null, null, false);


/**
* If the Repl staging directory ('hive.repl.rootdir') is on the target cluster itself and the FS scheme is hdfs,
* data is moved directly from Repl staging data dir of the table to the table's location on target warehouse.
*/
Path moveDataSrc = performOnlyMove ? dataPath : destPath;
if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(table)) {
LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
Collections.singletonList(destPath),
Collections.singletonList(moveDataSrc),
Collections.singletonList(tgtPath),
true, null, null);
moveWork.setMultiFilesDesc(loadFilesWork);
moveWork.setNeedCleanTarget(replace);
} else {
LoadTableDesc loadTableWork = new LoadTableDesc(
loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId);
moveDataSrc, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId);
if (replicationSpec.isMigratingToTxnTable()) {
loadTableWork.setInsertOverwrite(replace);
}
loadTableWork.setStmtId(stmtId);
moveWork.setLoadTableWork(loadTableWork);
}
Task<?> loadTableTask = TaskFactory.get(moveWork, x.getConf());
if (performOnlyMove) {
moveWork.setPerformOnlyMove(true);
x.getTasks().add(loadTableTask);
return loadTableTask;
}

Task<?> copyTask = null;
if (replicationSpec.isInReplicationScope()) {
copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(),
isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
} else {
copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
}
//if Importing into existing table, FileFormat is checked by
// ImportSemanticAnalzyer.checked checkTable()
Task<?> loadTableTask = TaskFactory.get(moveWork, x.getConf());
copyTask.addDependentTask(loadTableTask);
x.getTasks().add(copyTask);
return loadTableTask;
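The effect of performOnlyMove on the task graph built for a table is easier to see in isolation: with it set, the MoveWork task reads straight from the staging data path and is scheduled as the root task; without it, a copy task (ReplCopyTask or CopyWork) into destPath is the root and the move depends on it. The Task class below is only a stand-in used to show those two plan shapes; it is not Hive's Task API, though addDependentTask mirrors the call used in the diff.

import java.util.ArrayList;
import java.util.List;

final class TableLoadPlanShapeSketch {
  /** Minimal stand-in for a task DAG node; shapes only. */
  static final class Task {
    final String name;
    final List<Task> dependents = new ArrayList<>();
    Task(String name) { this.name = name; }
    void addDependentTask(Task t) { dependents.add(t); }
  }

  /** Root task REPL LOAD would schedule for one table under each plan shape. */
  static Task wireTableLoad(boolean performOnlyMove) {
    Task move = new Task("MoveWork(staging data path -> table location)");
    if (performOnlyMove) {
      // Same-HDFS staging: no intermediate copy, the move task is the root.
      return move;
    }
    // Otherwise: copy into destPath first, then move from there.
    Task copy = new Task("ReplCopyTask/CopyWork(dataPath -> destPath)");
    copy.addDependentTask(move);
    return copy;
  }

  public static void main(String[] args) {
    System.out.println(wireTableLoad(true).name);
    System.out.println(wireTableLoad(false).name);
  }
}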
@@ -591,6 +598,7 @@ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
return addPartTask;
} else {
String srcLocation = partSpec.getLocation();
boolean performOnlyMove = false;
if (replicationSpec.isInReplicationScope()
&& !ReplicationSpec.Type.IMPORT.equals(replicationSpec.getReplSpecType())) {
Path partLocation = new Path(partSpec.getLocation());
@@ -602,8 +610,14 @@ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
}
String relativePartDataPath = EximUtil.DATA_PATH_NAME + File.separator + bucketDir;
srcLocation = new Path(dataDirBase, relativePartDataPath).toString();
/**
* If the Repl staging directory ('hive.repl.rootdir') is on the target cluster itself and the FS scheme is
* hdfs, data is moved directly from Repl staging data dir of the partition to the partition's location on
* target warehouse.
*/
}
fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x);
performOnlyMove = Utils.onSameHDFSFileSystem(new Path(srcLocation), new Path(partSpec.getLocation()));
x.getLOG().debug("adding dependent CopyWork/AddPart/MoveWork for partition "
+ partSpecToString(partSpec.getPartSpec())
+ " with source location: " + srcLocation);
@@ -613,7 +627,7 @@ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
Path destPath;
if (replicationSpec.isInReplicationScope() && (copyToMigratedTxnTable ||
x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) {
loadFileType = LoadFileType.IGNORE;
loadFileType = performOnlyMove ? getLoadFileType(replicationSpec) : LoadFileType.IGNORE;
destPath = tgtLocation;
isAutoPurge = "true".equalsIgnoreCase(table.getProperty("auto.purge"));
if (table.isTemporary()) {
@@ -644,15 +658,18 @@ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
)
);
}

Task<?> copyTask = null;
if (replicationSpec.isInReplicationScope()) {
copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath,
x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
} else {
copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
/**
* If the Repl staging directory ('hive.repl.rootdir') is on the target cluster itself and the FS scheme is hdfs,
* data is moved directly from Repl staging data dir for partition to the partition's location on target
* warehouse.
*/
if (performOnlyMove) {
moveTaskSrc = new Path(srcLocation);
} else if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(tblDesc.getTblProps())) {
moveTaskSrc = destPath;
}


Task<?> addPartTask = null;
if (x.getEventType() != DumpType.EVENT_COMMIT_TXN) {
// During replication, by the time we are applying commit transaction event, we expect
@@ -661,15 +678,14 @@ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
addPartTask = TaskFactory.get(
new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf());
}

MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(),
null, null, false);

// Note: this sets LoadFileType incorrectly for ACID; is that relevant for import?
// See setLoadFileType and setIsAcidIow calls elsewhere for an example.
if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(tblDesc.getTblProps())) {
LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
Collections.singletonList(destPath),
Collections.singletonList(moveTaskSrc),
Collections.singletonList(tgtLocation),
true, null, null);
moveWork.setMultiFilesDesc(loadFilesWork);
@@ -686,9 +702,26 @@ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
loadTableWork.setInheritTableSpecs(false);
moveWork.setLoadTableWork(loadTableWork);
}
Task<?> loadPartTask = TaskFactory.get(moveWork, x.getConf());
if (performOnlyMove) {
moveWork.setPerformOnlyMove(true);
if (addPartTask != null) {
addPartTask.addDependentTask(loadPartTask);
}
x.getTasks().add(loadPartTask);
return addPartTask == null ? loadPartTask : addPartTask;
}

Task<?> copyTask = null;
if (replicationSpec.isInReplicationScope()) {
copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath,
x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
} else {
copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
}

if (loadFileType == LoadFileType.IGNORE) {
// if file is coped directly to the target location, then no need of move task in case the operation getting
// if file is copied directly to the target location, then no need of move task in case the operation getting
// replayed is add partition. As add partition will add the event for create partition. Even the statics are
// updated properly in create partition flow as the copy is done directly to the partition location. For insert
// operations, add partition task is anyways a no-op as alter partition operation does just some statistics
@@ -702,7 +735,6 @@ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
}
return copyTask;
}
Task<?> loadPartTask = TaskFactory.get(moveWork, x.getConf());
copyTask.addDependentTask(loadPartTask);
if (addPartTask != null) {
addPartTask.addDependentTask(loadPartTask);
@@ -713,6 +745,14 @@ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
}
}

private static LoadFileType getLoadFileType(ReplicationSpec replicationSpec) {
return (replicationSpec.isReplace()
? LoadFileType.REPLACE_ALL
: replicationSpec.isMigratingToTxnTable()
? LoadFileType.KEEP_EXISTING
: LoadFileType.OVERWRITE_EXISTING);
}

/**
* In REPL LOAD flow, the data copy is done separately for external tables using data locations
* dumped in file {@link ReplExternalTables#FILE_NAME}. So, we can skip copying it here.
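The partition path (addSinglePartition) applies the same idea with one extra node: when the event is not a commit-txn there is also an ADD PARTITION DDL task, and on the move-only path the copy task disappears while the move is chained directly after that DDL task. The sketch below shows only the dependency edges under those assumptions; the real method's root-task registration and the LoadFileType.IGNORE early-return are omitted, and the Task type is the same kind of stand-in as in the table sketch, not Hive's class.

import java.util.ArrayList;
import java.util.List;

final class PartitionLoadPlanShapeSketch {
  /** Minimal stand-in for a task DAG node; shapes only. */
  static final class Task {
    final String name;
    final List<Task> dependents = new ArrayList<>();
    Task(String name) { this.name = name; }
    void addDependentTask(Task t) { dependents.add(t); }
  }

  /** Entry task for one partition under each plan shape. */
  static Task wirePartitionLoad(boolean performOnlyMove, boolean hasAddPartTask) {
    Task move = new Task("MoveWork(partition data -> partition location)");
    Task addPart = hasAddPartTask ? new Task("DDLWork(ADD PARTITION)") : null;
    if (performOnlyMove) {
      // Same-HDFS staging: no copy task; the move runs straight off the staged data.
      if (addPart != null) {
        addPart.addDependentTask(move);
        return addPart;
      }
      return move;
    }
    // Otherwise: copy into destPath first; both the copy and the ADD PARTITION
    // task (if present) chain the move after themselves.
    Task copy = new Task("ReplCopyTask/CopyWork(srcLocation -> destPath)");
    copy.addDependentTask(move);
    if (addPart != null) {
      addPart.addDependentTask(move);
    }
    return copy;
  }
}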
10 changes: 10 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -41,6 +41,7 @@ public class MoveWork implements Serializable {
private boolean checkFileFormat;
private boolean srcLocal;
private boolean needCleanTarget;
private boolean performOnlyMove;

/**
* ReadEntitites that are passed to the hooks.
@@ -96,6 +97,7 @@ public MoveWork(final MoveWork o) {
inputs = o.getInputs();
outputs = o.getOutputs();
needCleanTarget = o.needCleanTarget;
setPerformOnlyMove(o.isPerformOnlyMove());
}

@Explain(displayName = "tables", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -172,4 +174,12 @@ public void setIsInReplicationScope(boolean isInReplicationScope) {
public boolean getIsInReplicationScope() {
return this.isInReplicationScope;
}

public boolean isPerformOnlyMove() {
return performOnlyMove;
}

public void setPerformOnlyMove(boolean performOnlyMove) {
this.performOnlyMove = performOnlyMove;
}
}
