HIVE-23539 : Optimize data copy during repl load operation for HDFS based staging location
Pravin Sinha committed Jun 9, 2020
1 parent 59abbff commit 92054da
Showing 5 changed files with 146 additions and 47 deletions.
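
The new code below keys the optimization off a helper, Utils.onSameHDFSFileSystem(...), whose implementation sits in one of the changed files not rendered on this page. As a rough, hypothetical sketch (class and method names here are assumptions, not the committed code), such a check only needs to confirm that the source and target paths resolve to the same HDFS instance, in which case data can be relocated with a rename instead of a byte copy:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class HdfsMoveCheckSketch {
  // Hypothetical stand-in for Utils.onSameHDFSFileSystem: true only when both
  // paths live on the same HDFS instance, so a move (rename) can replace a copy.
  public static boolean onSameHdfs(Path src, Path dst, Configuration conf) throws java.io.IOException {
    FileSystem srcFs = src.getFileSystem(conf);
    FileSystem dstFs = dst.getFileSystem(conf);
    boolean bothHdfs = "hdfs".equalsIgnoreCase(srcFs.getUri().getScheme())
        && "hdfs".equalsIgnoreCase(dstFs.getUri().getScheme());
    // Matching filesystem URIs (scheme plus namenode authority) mean the two
    // paths are served by the same filesystem instance.
    return bothHdfs && srcFs.getUri().equals(dstFs.getUri());
  }
}
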
@@ -179,6 +179,63 @@ public void externalTableReplicationWithLocalStaging() throws Throwable {
.verifyResult("800");
}

@Test
public void testHdfsMoveOptimizationOnTargetStaging() throws Throwable {
List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir);
withClauseOptions.addAll(externalTableBasePathWithClause());
primary.run("use " + primaryDbName)
.run("create table t1 (id int)")
.run("insert into table t1 values (1)")
.run("create table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='india') values ('ranchi')")
.run("insert into table t2 partition(country='us') values ('austin')")
.run("insert into table t2 partition(country='france') values ('paris')")
.dump(primaryDbName, withClauseOptions);
replica.load(replicatedDbName, primaryDbName, withClauseOptions)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("select id from t1")
.verifyResult("1")
.run("show tables like 't2'")
.verifyResult("t2")
.run("select place from t2 where country = 'india'")
.verifyResult("ranchi")
.run("select place from t2 where country = 'us'")
.verifyResult("austin")
.run("select place from t2 where country = 'france'")
.verifyResult("paris");

primary.run("use " + primaryDbName)
.run("insert into table t1 values (2)")
.run("insert into table t2 partition(country='india') values ('mysore')")
.run("insert into table t2 partition(country='us') values ('decorah')")
.run("insert into table t2 partition(country='france') values ('yvoire')")
.run("create table t4 (id int)")
.run("insert into table t4 values (4)")
.dump(primaryDbName, withClauseOptions);

replica.load(replicatedDbName, primaryDbName, withClauseOptions)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("select id from t1")
.verifyResults(new String[] {"1", "2"})
.run("show tables like 't2'")
.verifyResult("t2")
.run("show tables like 't4'")
.verifyResult("t4")
.run("select place from t2 where country = 'india'")
.verifyResults(new String[] {"ranchi", "mysore"})
.run("select place from t2 where country = 'us'")
.verifyResults(new String[]{"austin", "decorah"})
.run("select place from t2 where country = 'france'")
.verifyResults(new String[]{"paris", "yvoire"})
.run("select id from t4")
.verifyResult("4");

}

private List<String> getStagingLocationConfig(String stagingLoc) {
List<String> confList = new ArrayList<>();
confList.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + stagingLoc + "'");
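
For context, the test above drives one bootstrap and one incremental replication cycle in which the with-clause points the dump/staging root at the replica's HDFS. Roughly, the options assembled by getStagingLocationConfig and externalTableBasePathWithClause end up as a WITH clause on the REPL statements; the sketch below is illustrative only (the literal values are placeholders and the exact REPL grammar varies across Hive versions):

public class ReplWithClauseSketch {
  public static void main(String[] args) {
    String primaryDbName = "primary_db";       // placeholder
    String replicatedDbName = "replicated_db"; // placeholder
    // 'hive.repl.rootdir' is HiveConf.ConfVars.REPLDIR, set here to a
    // directory on the replica cluster's HDFS so the staging data lands there.
    String withClause = "WITH ('hive.repl.rootdir'='hdfs://replica-nn:8020/user/hive/repl-staging', "
        + "'hive.repl.replica.external.table.base.dir'='hdfs://replica-nn:8020/repl-external-base')";
    System.out.println("REPL DUMP " + primaryDbName + " " + withClause);
    System.out.println("REPL LOAD " + primaryDbName + " INTO " + replicatedDbName + " " + withClause);
  }
}
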
@@ -47,6 +47,7 @@
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
@@ -57,6 +58,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -225,6 +227,7 @@ private Task<?> tasksForAddPartition(Table table, AlterTableAddPartitionDesc add
}

Path stagingDir = replicaWarehousePartitionLocation;
boolean performOnlyMove = Utils.onSameHDFSFileSystem(event.dataPath(), replicaWarehousePartitionLocation);
// if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
LoadFileType loadFileType;
if (event.replicationSpec().isInReplicationScope() &&
@@ -243,29 +246,34 @@ private Task<?> tasksForAddPartition(Table table, AlterTableAddPartitionDesc add
: LoadFileType.OVERWRITE_EXISTING);
stagingDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
}

Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
event.replicationSpec(),
new Path(event.dataPath() + Path.SEPARATOR + getPartitionName(sourceWarehousePartitionLocation)),
stagingDir,
context.hiveConf, false, false
);

Path partDataSrc = new Path(event.dataPath() + File.separator + getPartitionName(sourceWarehousePartitionLocation));
Path moveSource = performOnlyMove ? partDataSrc : stagingDir;
Task<?> movePartitionTask = null;
if (loadFileType != LoadFileType.IGNORE) {
// no need to create move task, if file is moved directly to target location.
movePartitionTask = movePartitionTask(table, partSpec, stagingDir, loadFileType);
movePartitionTask = movePartitionTask(table, partSpec, moveSource, loadFileType);
}

if (ptnRootTask == null) {
ptnRootTask = copyTask;
if (performOnlyMove) {
if (ptnRootTask == null) {
ptnRootTask = addPartTask;
} else {
ptnRootTask.addDependentTask(addPartTask);
}
} else {
ptnRootTask.addDependentTask(copyTask);
Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
event.replicationSpec(),
partDataSrc,
stagingDir, context.hiveConf, false, false
);
if (ptnRootTask == null) {
ptnRootTask = copyTask;
} else {
ptnRootTask.addDependentTask(copyTask);
}
copyTask.addDependentTask(addPartTask);
}

// Set Checkpoint task as dependant to the tail of add partition tasks. So, if same dump is
// retried for bootstrap, we skip current partition update.
copyTask.addDependentTask(addPartTask);
if (movePartitionTask != null) {
addPartTask.addDependentTask(movePartitionTask);
movePartitionTask.addDependentTask(ckptTask);
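
Net effect of the hunk above: when performOnlyMove is true the staging copy is skipped entirely, the add-partition DDL task becomes the root, and the MoveTask consumes the dump's partition data path directly; otherwise the original copy, add-partition, move chain is preserved. A toy model of the two DAG shapes (simplified names, not Hive's Task machinery):

import java.util.ArrayList;
import java.util.List;

public class PartitionLoadDagSketch {
  // Minimal stand-in for Hive's Task<?>; only parent -> child wiring is modelled.
  static final class ToyTask {
    final String name;
    final List<ToyTask> children = new ArrayList<>();
    ToyTask(String name) { this.name = name; }
    ToyTask then(ToyTask child) { children.add(child); return child; }
  }

  static ToyTask buildPartitionLoadDag(boolean performOnlyMove) {
    ToyTask addPart = new ToyTask("addPartition");
    ToyTask move = new ToyTask("moveToPartitionLocation");
    ToyTask checkpoint = new ToyTask("checkpoint");
    addPart.then(move).then(checkpoint);
    if (performOnlyMove) {
      // Same HDFS: no staging copy, the move reads the dump's partition data path.
      return addPart;
    }
    // Cross-filesystem: copy to the staging dir first, then add the partition
    // and move the staged files into the partition location.
    ToyTask copy = new ToyTask("copyToStagingDir");
    copy.then(addPart);
    return copy;
  }
}
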
@@ -47,6 +47,7 @@
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -56,7 +57,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashSet;
@@ -276,6 +276,7 @@ private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, Path
Path fromURI) {
Path dataPath = fromURI;
Path tmpPath = tgtPath;
boolean performOnlyMove = Utils.onSameHDFSFileSystem(dataPath, tgtPath);

// if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
LoadFileType loadFileType;
@@ -297,17 +298,15 @@ private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, Path
LOG.debug("adding dependent CopyWork/AddPart/MoveWork for table "
+ table.getCompleteName() + " with source location: "
+ dataPath.toString() + " and target location " + tgtPath.toString());

Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf,
false, false);
Path moveSrcPath = performOnlyMove ? dataPath : tmpPath;

MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false);
if (AcidUtils.isTransactionalTable(table)) {
if (replicationSpec.isMigratingToTxnTable()) {
// Write-id is hardcoded to 1 so that for migration, we just move all original files under base_1 dir.
// ReplTxnTask added earlier in the DAG ensure that the write-id is made valid in HMS metadata.
LoadTableDesc loadTableWork = new LoadTableDesc(
tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
moveSrcPath, Utilities.getTableDesc(table), new TreeMap<>(),
loadFileType, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID
);
loadTableWork.setStmtId(0);
@@ -317,20 +316,26 @@ private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, Path
moveWork.setLoadTableWork(loadTableWork);
} else {
LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
Collections.singletonList(tmpPath),
Collections.singletonList(moveSrcPath),
Collections.singletonList(tgtPath),
true, null, null);
moveWork.setMultiFilesDesc(loadFilesWork);
}
} else {
LoadTableDesc loadTableWork = new LoadTableDesc(
tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
moveSrcPath, Utilities.getTableDesc(table), new TreeMap<>(),
loadFileType, 0L
);
moveWork.setLoadTableWork(loadTableWork);
}
moveWork.setIsInReplicationScope(replicationSpec.isInReplicationScope());
Task<?> loadTableTask = TaskFactory.get(moveWork, context.hiveConf);
if (performOnlyMove) {
//If the staging directory is on the target cluster and on HDFS, use just a move operation for data copy.
return loadTableTask;
}
Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf,
false, false);
copyTask.addDependentTask(loadTableTask);
return copyTask;
}
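
The comment added above ("use just a move operation for data copy") is the core of the change: when source and target are the same HDFS instance, relocating the load data is a namenode rename rather than a byte-for-byte copy. A minimal, standalone illustration of that distinction with the Hadoop FileSystem API (not the committed code):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class MoveVsCopySketch {
  public static void relocate(Path src, Path dst, Configuration conf) throws Exception {
    FileSystem srcFs = src.getFileSystem(conf);
    FileSystem dstFs = dst.getFileSystem(conf);
    if (srcFs.getUri().equals(dstFs.getUri())) {
      // Same filesystem: rename is a metadata-only operation, no data blocks
      // are read or rewritten.
      srcFs.rename(src, dst);
    } else {
      // Different filesystems: every byte has to be read from the source and
      // written to the target.
      FileUtil.copy(srcFs, src, dstFs, dst, /* deleteSource */ true, conf);
    }
  }
}
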
@@ -56,6 +56,7 @@
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
import org.apache.hadoop.hive.ql.plan.CopyWork;
@@ -452,7 +453,7 @@ private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path
boolean isAutoPurge = false;
boolean needRecycle = false;
boolean copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable();

boolean performOnlyMove = replicationSpec.isInReplicationScope() && Utils.onSameHDFSFileSystem(dataPath, tgtPath);
if (replicationSpec.isInReplicationScope() && (copyToMigratedTxnTable ||
x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) {
lft = LoadFileType.IGNORE;
@@ -498,37 +499,39 @@ private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path
);
}

Task<?> copyTask = null;
if (replicationSpec.isInReplicationScope()) {
copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(),
isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
} else {
copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
}

MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(), null, null, false);


Path dataSrc = performOnlyMove ? dataPath : destPath;
if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(table)) {
LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
Collections.singletonList(destPath),
Collections.singletonList(dataSrc),
Collections.singletonList(tgtPath),
true, null, null);
moveWork.setMultiFilesDesc(loadFilesWork);
moveWork.setNeedCleanTarget(replace);
} else {
LoadTableDesc loadTableWork = new LoadTableDesc(
loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId);
dataSrc, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId);
if (replicationSpec.isMigratingToTxnTable()) {
loadTableWork.setInsertOverwrite(replace);
}
loadTableWork.setStmtId(stmtId);
moveWork.setLoadTableWork(loadTableWork);
}
Task<?> loadTableTask = TaskFactory.get(moveWork, x.getConf());
if (performOnlyMove) {
x.getTasks().add(loadTableTask);
return loadTableTask;
}

Task<?> copyTask = null;
if (replicationSpec.isInReplicationScope()) {
copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(),
isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
} else {
copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
}
//if importing into an existing table, FileFormat is checked by
// ImportSemanticAnalyzer.checkTable()
Task<?> loadTableTask = TaskFactory.get(moveWork, x.getConf());
copyTask.addDependentTask(loadTableTask);
x.getTasks().add(copyTask);
return loadTableTask;
@@ -591,6 +594,7 @@ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
return addPartTask;
} else {
String srcLocation = partSpec.getLocation();
boolean performOnlyMove = false;
if (replicationSpec.isInReplicationScope()
&& !ReplicationSpec.Type.IMPORT.equals(replicationSpec.getReplSpecType())) {
Path partLocation = new Path(partSpec.getLocation());
@@ -602,6 +606,7 @@ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
}
String relativePartDataPath = EximUtil.DATA_PATH_NAME + File.separator + bucketDir;
srcLocation = new Path(dataDirBase, relativePartDataPath).toString();
performOnlyMove = Utils.onSameHDFSFileSystem(new Path(srcLocation), new Path(partSpec.getLocation()));
}
fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x);
x.getLOG().debug("adding dependent CopyWork/AddPart/MoveWork for partition "
@@ -644,13 +649,10 @@ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
)
);
}

Task<?> copyTask = null;
if (replicationSpec.isInReplicationScope()) {
copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath,
x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
} else {
copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
if (performOnlyMove) {
moveTaskSrc = new Path(srcLocation);
} else if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(tblDesc.getTblProps())) {
moveTaskSrc = destPath;
}

Task<?> addPartTask = null;
@@ -661,15 +663,14 @@ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
addPartTask = TaskFactory.get(
new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf());
}

MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(),
null, null, false);

// Note: this sets LoadFileType incorrectly for ACID; is that relevant for import?
// See setLoadFileType and setIsAcidIow calls elsewhere for an example.
if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(tblDesc.getTblProps())) {
LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
Collections.singletonList(destPath),
Collections.singletonList(moveTaskSrc),
Collections.singletonList(tgtLocation),
true, null, null);
moveWork.setMultiFilesDesc(loadFilesWork);
@@ -686,9 +687,25 @@ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
loadTableWork.setInheritTableSpecs(false);
moveWork.setLoadTableWork(loadTableWork);
}
Task<?> loadPartTask = TaskFactory.get(moveWork, x.getConf());
if (performOnlyMove) {
if (addPartTask != null) {
addPartTask.addDependentTask(loadPartTask);
}
x.getTasks().add(loadPartTask);
return addPartTask == null ? loadPartTask : addPartTask;
}

Task<?> copyTask = null;
if (replicationSpec.isInReplicationScope()) {
copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath,
x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
} else {
copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
}

if (loadFileType == LoadFileType.IGNORE) {
// if file is coped directly to the target location, then no need of move task in case the operation getting
// if file is copied directly to the target location, then no need of move task in case the operation getting
// replayed is add partition. As add partition will add the event for create partition. Even the statistics are
// updated properly in create partition flow as the copy is done directly to the partition location. For insert
// operations, add partition task is anyways a no-op as alter partition operation does just some statistics
@@ -702,7 +719,6 @@ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
}
return copyTask;
}
Task<?> loadPartTask = TaskFactory.get(moveWork, x.getConf());
copyTask.addDependentTask(loadPartTask);
if (addPartTask != null) {
addPartTask.addDependentTask(loadPartTask);
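
In the addSinglePartition() hunks above, the same idea is applied by redirecting the MoveWork source: with performOnlyMove it points straight at the dump's srcLocation, for transactional tables in replication scope it stays on the staged destPath, and otherwise the pre-existing source is kept. A tiny restatement of that choice (simplified; flags such as isAutoPurge and needRecycle are not modelled here):

import org.apache.hadoop.fs.Path;

public class MoveSourceChoiceSketch {
  static Path chooseMoveSource(boolean performOnlyMove, boolean replScopedAcidTable,
      Path srcLocation, Path stagedDestPath, Path currentMoveSrc) {
    if (performOnlyMove) {
      // Same HDFS: the MoveWork consumes the dump data path directly.
      return srcLocation;
    }
    if (replScopedAcidTable) {
      // Transactional tables in replication scope move from the staged copy.
      return stagedDestPath;
    }
    // Otherwise the previously chosen move source is left unchanged.
    return currentMoveSrc;
  }
}
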