HIVE-23539 : Optimize data copy during repl load operation for HDFS b… #1084

Closed
wants to merge 4 commits into from
@@ -1539,15 +1539,14 @@ public void testCheckPointingWithSourceTableDataInserted() throws Throwable {
.run("insert into t2 values (24)")
.run("insert into t1 values (4)")
.dump(primaryDbName, dumpClause);

assertEquals(modifiedTimeTable1CopyFile, fs.listStatus(tablet1Path)[0].getModificationTime());
Contributor:
Why is this changed?

Contributor Author:
Because the data is moved now; we check this later, after the load.

assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime());
replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select * from t1")
.verifyResults(new String[]{"1", "2", "3", "4"})
.run("select * from t2")
.verifyResults(new String[]{"11", "21", "13", "24"});
assertEquals(modifiedTimeTable1CopyFile, fs.listStatus(tablet1Path)[0].getModificationTime());
assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime());
}

@Test
@@ -1596,6 +1595,13 @@ public void testCheckPointingWithNewTablesAdded() throws Throwable {
.run("insert into t3 values (3)")
.dump(primaryDbName, dumpClause);

Path tablet3Path = new Path(dbPath, "t3");
assertEquals(modifiedTimeTable1, fs.getFileStatus(tablet1Path).getModificationTime());
Contributor:
Why is this changed?

Contributor Author:
Same as above.

assertEquals(modifiedTimeTable1CopyFile, fs.listStatus(tablet1Path)[0].getModificationTime());
assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime());
assertTrue(fs.exists(tablet3Path));


replica.load(replicatedDbName, primaryDbName)
.run("use " + replicatedDbName)
.run("select * from t1")
@@ -1606,9 +1612,10 @@ public void testCheckPointingWithNewTablesAdded() throws Throwable {
.verifyResults(new String[]{"t1", "t2", "t3"})
.run("select * from t3")
.verifyResults(new String[]{"1", "2", "3"});
assertEquals(modifiedTimeTable1, fs.getFileStatus(tablet1Path).getModificationTime());
assertEquals(modifiedTimeTable1CopyFile, fs.listStatus(tablet1Path)[0].getModificationTime());
assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime());

assertFalse(fs.exists(tablet1Path));
assertFalse(fs.exists(tablet2Path));
assertFalse(fs.exists(tablet3Path));
}

@Test
@@ -1427,27 +1427,6 @@ public Boolean apply(NotificationEvent entry) {

primary.run("use " + primaryDb)
.run("drop table " + tbl);

//delete load ack to reuse the dump
new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation
+ Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR
+ LOAD_ACKNOWLEDGEMENT.toString()), true);


InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
try {
replica.loadFailure(replicatedDbName_CM, primaryDbName, withConfigs);
} finally {
InjectableBehaviourObjectStore.resetAddNotificationModifier();
}

callerVerifier.assertInjectionsPerformed(true, false);
replica.load(replicatedDbName_CM, primaryDbName, withConfigs);

replica.run("use " + replicatedDbName_CM)
.run("select country from " + tbl + " where country == 'india'")
.verifyResults(Arrays.asList("india"))
.run(" drop database if exists " + replicatedDbName_CM + " cascade");
}

// This requires the tables are loaded in a fixed sorted order.
@@ -179,6 +179,63 @@ public void externalTableReplicationWithLocalStaging() throws Throwable {
.verifyResult("800");
}

@Test
Contributor:
Negative test for different staging when the data is not moved.

Contributor Author:
Comment not clear.

public void testHdfsMoveOptimizationOnTargetStaging() throws Throwable {
Contributor:
Check for empty staging as the data is moved.

List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir);
withClauseOptions.addAll(externalTableBasePathWithClause());
primary.run("use " + primaryDbName)
.run("create table t1 (id int)")
.run("insert into table t1 values (1)")
.run("create table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='india') values ('ranchi')")
.run("insert into table t2 partition(country='us') values ('austin')")
.run("insert into table t2 partition(country='france') values ('paris')")
.dump(primaryDbName, withClauseOptions);
replica.load(replicatedDbName, primaryDbName, withClauseOptions)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("select id from t1")
.verifyResult("1")
.run("show tables like 't2'")
.verifyResult("t2")
.run("select place from t2 where country = 'india'")
.verifyResult("ranchi")
.run("select place from t2 where country = 'us'")
.verifyResult("austin")
.run("select place from t2 where country = 'france'")
.verifyResult("paris");

primary.run("use " + primaryDbName)
.run("insert into table t1 values (2)")
.run("insert into table t2 partition(country='india') values ('mysore')")
.run("insert into table t2 partition(country='us') values ('decorah')")
.run("insert into table t2 partition(country='france') values ('yvoire')")
.run("create table t4 (id int)")
.run("insert into table t4 values (4)")
.dump(primaryDbName, withClauseOptions);

replica.load(replicatedDbName, primaryDbName, withClauseOptions)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("select id from t1")
.verifyResults(new String[] {"1", "2"})
.run("show tables like 't2'")
.verifyResult("t2")
.run("show tables like 't4'")
.verifyResult("t4")
.run("select place from t2 where country = 'india'")
.verifyResults(new String[] {"ranchi", "mysore"})
.run("select place from t2 where country = 'us'")
.verifyResults(new String[]{"austin", "decorah"})
.run("select place from t2 where country = 'france'")
.verifyResults(new String[]{"paris", "yvoire"})
.run("select id from t4")
.verifyResult("4");

}
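
A reviewer above asked to check for empty staging once the data is moved. The test does not assert that yet; below is a minimal sketch of such a check, assuming the test can reach the replica FileSystem and the staging directory that held the dumped data (method and variable names are illustrative, not part of the patch; requires org.apache.hadoop.fs and org.junit.Assert imports).

// Sketch only, not part of the patch: after a load that used the move
// optimization, the dumped data files should no longer sit under staging.
private void assertStagingDataMoved(FileSystem fs, Path stagedDataDir) throws IOException {
  if (!fs.exists(stagedDataDir)) {
    return; // the staging data dir itself may already have been cleaned up
  }
  RemoteIterator<LocatedFileStatus> files = fs.listFiles(stagedDataDir, true);
  while (files.hasNext()) {
    Assert.fail("Unexpected file left in staging after move: " + files.next().getPath());
  }
}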

private List<String> getStagingLocationConfig(String stagingLoc) {
List<String> confList = new ArrayList<>();
confList.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + stagingLoc + "'");
2 changes: 1 addition & 1 deletion ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -145,7 +145,7 @@ private void moveFileInDfs (Path sourcePath, Path targetPath, HiveConf conf)
Hive.clearDestForSubDirSrc(conf, targetPath, sourcePath, false);
}
// Set isManaged to false as this is not load data operation for which it is needed.
if (!Hive.moveFile(conf, sourcePath, targetPath, true, false, false)) {
if (!Hive.moveFile(conf, sourcePath, targetPath, true, false, false, work.isPerformOnlyMove())) {
try {
if (deletePath != null) {
tgtFs.delete(deletePath, true);
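The extra boolean passed to Hive.moveFile above is read from the MoveWork via work.isPerformOnlyMove(); the getter itself is not shown in this hunk. The following is only an assumed sketch of its shape, inferred from the call site (field and method names may differ in the actual patch).

// Assumed sketch, not Hive's actual MoveWork: a flag on the work object telling
// MoveTask that repl load wants a pure move of the staged data instead of the
// usual copy-then-move.
public class MoveWorkSketch implements java.io.Serializable {
  private boolean performOnlyMove;

  public boolean isPerformOnlyMove() {
    return performOnlyMove;
  }

  public void setPerformOnlyMove(boolean performOnlyMove) {
    this.performOnlyMove = performOnlyMove;
  }
}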
@@ -45,9 +45,11 @@
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.HiveTableName;
import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
@@ -58,6 +60,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -266,48 +269,57 @@ private Task<?> tasksForAddPartition(Table table, AlterTableAddPartitionDesc add
return ptnRootTask;
}

Path stagingDir = replicaWarehousePartitionLocation;
Path loadTmpDir = replicaWarehousePartitionLocation;
boolean performOnlyMove = event.replicationSpec().isInReplicationScope()
&& Utils.onSameHDFSFileSystem(event.dataPath(), replicaWarehousePartitionLocation);
// if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
LoadFileType loadFileType;
if (event.replicationSpec().isInReplicationScope() &&
context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
loadFileType = LoadFileType.IGNORE;
loadFileType = performOnlyMove ? getLoadFileType(event.replicationSpec()) : LoadFileType.IGNORE;
if (event.replicationSpec().isMigratingToTxnTable()) {
// Migrating to transactional tables in bootstrap load phase.
// It is enough to copy all the original files under base_1 dir and so write-id is hardcoded to 1.
// ReplTxnTask added earlier in the DAG ensure that the write-id=1 is made valid in HMS metadata.
stagingDir = new Path(stagingDir, AcidUtils.baseDir(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID));
loadTmpDir = new Path(loadTmpDir, AcidUtils.baseDir(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID));
}
} else {
loadFileType = event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL :
(event.replicationSpec().isMigratingToTxnTable()
? LoadFileType.KEEP_EXISTING
: LoadFileType.OVERWRITE_EXISTING);
stagingDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
loadFileType = getLoadFileType(event.replicationSpec());
loadTmpDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
}
Path partDataSrc = new Path(event.dataPath(), getPartitionName(sourceWarehousePartitionLocation));
/**
* If the Repl staging directory ('hive.repl.rootdir') is on the target cluster itself and the FS scheme is hdfs,
* data is moved directly from Repl staging data dir of partition to the partition's location on target warehouse.
*/

Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
event.replicationSpec(),
new Path(event.dataPath() + Path.SEPARATOR + getPartitionName(sourceWarehousePartitionLocation)),
stagingDir,
context.hiveConf, false, false
);

Path moveSource = performOnlyMove ? partDataSrc : loadTmpDir;
Task<?> movePartitionTask = null;
if (loadFileType != LoadFileType.IGNORE) {
// no need to create move task, if file is moved directly to target location.
movePartitionTask = movePartitionTask(table, partSpec, stagingDir, loadFileType);
movePartitionTask = movePartitionTask(table, partSpec, moveSource, loadFileType);
Contributor:
Why are we moving from source here?

Contributor Author (@pkumarsinha, Jun 11, 2020):
Path moveSource = performOnlyMove ? partDataSrc : stagingDir;
The move source is either partDataSrc (the repl staging dir) or load's own local staging dir, which is used for the data copy operation.

}

if (ptnRootTask == null) {
ptnRootTask = copyTask;
if (performOnlyMove) {
if (ptnRootTask == null) {
ptnRootTask = addPartTask;
Contributor:
Will the addPartTask not be added already as part of the DDL operation?

Contributor Author:
No, that happens only when isOnlyDDLOperation is true. It doesn't come down to this line in that case.

Contributor:
How was addPartTask added before your change?

Contributor Author:
Nothing changes with respect to addPartTask; just the copy-then-move is getting replaced with a single move.

} else {
ptnRootTask.addDependentTask(addPartTask);
}
} else {
ptnRootTask.addDependentTask(copyTask);
Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
event.replicationSpec(),
partDataSrc,
loadTmpDir, context.hiveConf, false, false
);
if (ptnRootTask == null) {
ptnRootTask = copyTask;
} else {
ptnRootTask.addDependentTask(copyTask);
}
copyTask.addDependentTask(addPartTask);
}

// Set Checkpoint task as dependant to the tail of add partition tasks. So, if same dump is
// retried for bootstrap, we skip current partition update.
copyTask.addDependentTask(addPartTask);
if (movePartitionTask != null) {
addPartTask.addDependentTask(movePartitionTask);
movePartitionTask.addDependentTask(ckptTask);
@@ -317,10 +329,17 @@ private Task<?> tasksForAddPartition(Table table, AlterTableAddPartitionDesc add
return ptnRootTask;
}
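
The performOnlyMove flag above hinges on Utils.onSameHDFSFileSystem, whose body is not part of this diff. A rough standalone sketch of what such a check could look like (illustrative only; the real helper may first qualify the paths against the default filesystem):

// Illustrative sketch, not the actual repl dump Utils implementation: a rename
// (move) is only possible when source and target live on the same HDFS
// filesystem, i.e. the same scheme and the same namenode authority.
public final class SameHdfsCheckSketch {
  public static boolean onSameHdfs(org.apache.hadoop.fs.Path src, org.apache.hadoop.fs.Path dst) {
    java.net.URI srcUri = src.toUri();
    java.net.URI dstUri = dst.toUri();
    boolean bothHdfs = "hdfs".equalsIgnoreCase(srcUri.getScheme())
        && "hdfs".equalsIgnoreCase(dstUri.getScheme());
    return bothHdfs && java.util.Objects.equals(srcUri.getAuthority(), dstUri.getAuthority());
  }
}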

private LoadFileType getLoadFileType(ReplicationSpec replicationSpec) {
return replicationSpec.isReplace()
? LoadFileType.REPLACE_ALL
: (replicationSpec.isMigratingToTxnTable()
? LoadFileType.KEEP_EXISTING
: LoadFileType.OVERWRITE_EXISTING);
}
private String getPartitionName(Path partitionMetadataFullPath) {
//Get partition name by removing the metadata base path.
//Needed for getting the data path
return partitionMetadataFullPath.toString().substring(event.metadataPath().toString().length());
return partitionMetadataFullPath.toString().substring(event.metadataPath().toString().length() + 1);
Contributor:
Can we do a split at '/' instead? This may be error-prone.

Contributor Author:
Existing method; just changing the index.

}
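
On the exchange above: the + 1 also strips the path separator that follows the metadata base path, so only the bare partition sub-path (for example country=india) is returned, which matches how partDataSrc is now built with new Path(event.dataPath(), getPartitionName(...)). A small standalone illustration of the index change (not the patched method):

public class PartitionNameIndexDemo {
  public static void main(String[] args) {
    String metadataBase = "/staging/hive_repl_dump/db/t2";   // illustrative path
    String partitionMetadata = metadataBase + "/" + "country=india";

    // Old index: the leading separator is kept -> "/country=india"
    System.out.println(partitionMetadata.substring(metadataBase.length()));
    // New index (+ 1): only the partition sub-path remains -> "country=india"
    System.out.println(partitionMetadata.substring(metadataBase.length() + 1));
  }
}

The reviewer's split-at-'/' alternative would avoid the index arithmetic; the author kept the existing substring approach and only adjusted the index.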

/**
@@ -47,6 +47,7 @@
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -56,7 +57,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashSet;
@@ -272,42 +272,44 @@ static TableLocationTuple tableLocation(ImportTableDesc tblDesc, Database parent
return new TableLocationTuple(path.toString(), false);
}

private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, Path tgtPath,
Path fromURI) {
private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, Path tgtPath, Path fromURI) {
Path dataPath = fromURI;
Path tmpPath = tgtPath;
Path loadTmpDir = tgtPath;
boolean performOnlyMove = replicationSpec.isInReplicationScope() && Utils.onSameHDFSFileSystem(dataPath, tgtPath);

// if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
LoadFileType loadFileType;
if (replicationSpec.isInReplicationScope() &&
context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
loadFileType = LoadFileType.IGNORE;
loadFileType = performOnlyMove ? getLoadFileType(replicationSpec) : LoadFileType.IGNORE;
if (event.replicationSpec().isMigratingToTxnTable()) {
// Migrating to transactional tables in bootstrap load phase.
// It is enough to copy all the original files under base_1 dir and so write-id is hardcoded to 1.
// ReplTxnTask added earlier in the DAG ensure that the write-id=1 is made valid in HMS metadata.
tmpPath = new Path(tmpPath, AcidUtils.baseDir(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID));
loadTmpDir = new Path(loadTmpDir, AcidUtils.baseDir(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID));
}
} else {
loadFileType = (replicationSpec.isReplace() || replicationSpec.isMigratingToTxnTable())
? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
tmpPath = PathUtils.getExternalTmpPath(tgtPath, context.pathInfo);
loadFileType = getLoadFileType(replicationSpec);
loadTmpDir = PathUtils.getExternalTmpPath(tgtPath, context.pathInfo);
}

LOG.debug("adding dependent CopyWork/AddPart/MoveWork for table "
+ table.getCompleteName() + " with source location: "
+ dataPath.toString() + " and target location " + tgtPath.toString());

Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf,
false, false);
/**
* If the Repl staging directory ('hive.repl.rootdir') is on the target cluster itself and the FS scheme is hdfs,
* data is moved directly from Repl staging data dir of the partition to the partition's location on target
* warehouse.
*/
Path moveSrcPath = performOnlyMove ? dataPath : loadTmpDir;

MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false);
if (AcidUtils.isTransactionalTable(table)) {
if (replicationSpec.isMigratingToTxnTable()) {
// Write-id is hardcoded to 1 so that for migration, we just move all original files under base_1 dir.
// ReplTxnTask added earlier in the DAG ensure that the write-id is made valid in HMS metadata.
LoadTableDesc loadTableWork = new LoadTableDesc(
tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
moveSrcPath, Utilities.getTableDesc(table), new TreeMap<>(),
loadFileType, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID
);
loadTableWork.setStmtId(0);
@@ -317,24 +319,35 @@ private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, Path
moveWork.setLoadTableWork(loadTableWork);
} else {
LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
Collections.singletonList(tmpPath),
Collections.singletonList(moveSrcPath),
Collections.singletonList(tgtPath),
true, null, null);
moveWork.setMultiFilesDesc(loadFilesWork);
}
} else {
LoadTableDesc loadTableWork = new LoadTableDesc(
tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
moveSrcPath, Utilities.getTableDesc(table), new TreeMap<>(),
loadFileType, 0L
);
moveWork.setLoadTableWork(loadTableWork);
}
moveWork.setIsInReplicationScope(replicationSpec.isInReplicationScope());
Task<?> loadTableTask = TaskFactory.get(moveWork, context.hiveConf);
if (performOnlyMove) {
// If the staging directory is on the target cluster and on HDFS, use just the move operation for data copy.
return loadTableTask;
}
Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, loadTmpDir, context.hiveConf,
false, false);
copyTask.addDependentTask(loadTableTask);
return copyTask;
}

private LoadFileType getLoadFileType(ReplicationSpec replicationSpec) {
return (replicationSpec.isReplace() || replicationSpec.isMigratingToTxnTable())
? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
}

private Task<?> dropTableTask(Table table) {
assert(table != null);
DropTableDesc dropTblDesc = new DropTableDesc(table.getFullyQualifiedName(), true, false, event.replicationSpec());