HIVE-23539: Optimize data copy during repl load operation for HDFS based staging location - bootstrap: fixed tests
Pravin Sinha committed Jun 18, 2020
1 parent df7d14c commit 2f63f13
Showing 3 changed files with 22 additions and 35 deletions.
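Background for the change, before the diffs: a REPL LOAD normally copies bootstrap data from the repl staging directory ('hive.repl.rootdir') into a tmp dir and then moves it into the warehouse. When the staging directory already lives on the same HDFS filesystem as the target table or partition location, that copy is redundant: a rename on the same NameNode relocates the data without rewriting any bytes. Below is a minimal sketch of the idea using only the stock Hadoop FileSystem API; the helper names onSameHdfs and loadData are illustrative, not Hive's (the commit's actual check is Utils.onSameHDFSFileSystem).

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public final class MoveVsCopySketch {
  /**
   * True when both paths resolve to the same HDFS filesystem,
   * i.e. a rename can replace a cross-filesystem copy.
   */
  static boolean onSameHdfs(Path src, Path dst, Configuration conf) throws IOException {
    FileSystem srcFs = src.getFileSystem(conf);
    FileSystem dstFs = dst.getFileSystem(conf);
    return "hdfs".equalsIgnoreCase(srcFs.getScheme())
        && srcFs.getUri().equals(dstFs.getUri());
  }

  /** Move when possible (metadata-only rename on HDFS), otherwise copy. */
  static void loadData(Path stagedData, Path warehouseLocation, Configuration conf)
      throws IOException {
    FileSystem srcFs = stagedData.getFileSystem(conf);
    if (onSameHdfs(stagedData, warehouseLocation, conf)) {
      // Same NameNode: rename is a metadata operation; no bytes are rewritten.
      if (!srcFs.rename(stagedData, warehouseLocation)) {
        throw new IOException("rename failed: " + stagedData + " -> " + warehouseLocation);
      }
    } else {
      // Different filesystems (e.g. s3a -> hdfs): a byte-level copy is unavoidable.
      FileUtil.copy(srcFs, stagedData, warehouseLocation.getFileSystem(conf),
          warehouseLocation, /* deleteSource= */ true, conf);
    }
  }
}
```

The performOnlyMove flag threaded through the diffs below encodes exactly this same-filesystem test.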
File 1 of 3
@@ -1427,27 +1427,6 @@ public Boolean apply(NotificationEvent entry) {

     primary.run("use " + primaryDb)
             .run("drop table " + tbl);
-
-    //delete load ack to reuse the dump
-    new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation
-            + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR
-            + LOAD_ACKNOWLEDGEMENT.toString()), true);
-
-
-    InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
-    try {
-      replica.loadFailure(replicatedDbName_CM, primaryDbName, withConfigs);
-    } finally {
-      InjectableBehaviourObjectStore.resetAddNotificationModifier();
-    }
-
-    callerVerifier.assertInjectionsPerformed(true, false);
-    replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
-
-    replica.run("use " + replicatedDbName_CM)
-            .run("select country from " + tbl + " where country == 'india'")
-            .verifyResults(Arrays.asList("india"))
-            .run(" drop database if exists " + replicatedDbName_CM + " cascade");
   }
 
   // This requires the tables are loaded in a fixed sorted order.
File 2 of 3
@@ -45,6 +45,7 @@
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.HiveTableName;
 import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
@@ -269,32 +270,29 @@ private Task<?> tasksForAddPartition(Table table, AlterTableAddPartitionDesc add
     }
 
     Path loadTmpDir = replicaWarehousePartitionLocation;
+
+    boolean performOnlyMove = event.replicationSpec().isInReplicationScope()
+            && Utils.onSameHDFSFileSystem(event.dataPath(), replicaWarehousePartitionLocation);
     // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
     LoadFileType loadFileType;
     if (event.replicationSpec().isInReplicationScope() &&
             context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
-      loadFileType = LoadFileType.IGNORE;
+      loadFileType = performOnlyMove ? getLoadFileType(event.replicationSpec()) : LoadFileType.IGNORE;
       if (event.replicationSpec().isMigratingToTxnTable()) {
         // Migrating to transactional tables in bootstrap load phase.
         // It is enough to copy all the original files under the base_1 dir, so the write-id is hardcoded to 1.
         // The ReplTxnTask added earlier in the DAG ensures that write-id=1 is made valid in HMS metadata.
         loadTmpDir = new Path(loadTmpDir, AcidUtils.baseDir(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID));
       }
     } else {
-      loadFileType = event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL :
-          (event.replicationSpec().isMigratingToTxnTable()
-              ? LoadFileType.KEEP_EXISTING
-              : LoadFileType.OVERWRITE_EXISTING);
+      loadFileType = getLoadFileType(event.replicationSpec());
       loadTmpDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
     }
     Path partDataSrc = new Path(event.dataPath(), getPartitionName(sourceWarehousePartitionLocation));
     /**
      * If the Repl staging directory ('hive.repl.rootdir') is on the target cluster itself and the FS scheme is hdfs,
      * data is moved directly from the Repl staging data dir of the partition to the partition's location on the target warehouse.
      */
-    boolean performOnlyMove = event.replicationSpec().isInReplicationScope()
-            && Utils.onSameHDFSFileSystem(event.dataPath(), replicaWarehousePartitionLocation);
-
     Path moveSource = performOnlyMove ? partDataSrc : loadTmpDir;
     Task<?> movePartitionTask = null;
     if (loadFileType != LoadFileType.IGNORE) {
@@ -331,6 +329,13 @@ private Task<?> tasksForAddPartition(Table table, AlterTableAddPartitionDesc add
     return ptnRootTask;
   }
 
+  private LoadFileType getLoadFileType(ReplicationSpec replicationSpec) {
+    return replicationSpec.isReplace()
+            ? LoadFileType.REPLACE_ALL
+            : (replicationSpec.isMigratingToTxnTable()
+                ? LoadFileType.KEEP_EXISTING
+                : LoadFileType.OVERWRITE_EXISTING);
+  }
   private String getPartitionName(Path partitionMetadataFullPath) {
     //Get partition name by removing the metadata base path.
     //Needed for getting the data path
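The restructured branch in tasksForAddPartition distinguishes three cases, which is easiest to see flattened into one function. A condensed, illustrative restatement (not a copy of Hive's code): LoadFileType.IGNORE suppresses the follow-up move task (the `loadFileType != LoadFileType.IGNORE` guard above), while performOnlyMove keeps the move task but points it at the staging data path instead of a tmp dir.

```java
import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;

final class MoveOptimizationSketch {
  // Illustrative condensation of the branch above; not a copy of Hive's code.
  // "moveOptimizationEnabled" stands for the REPL_ENABLE_MOVE_OPTIMIZATION flag,
  // and "regularType" is whatever getLoadFileType(replicationSpec) returns.
  static LoadFileType choose(boolean inReplicationScope, boolean moveOptimizationEnabled,
                             boolean performOnlyMove, LoadFileType regularType) {
    if (inReplicationScope && moveOptimizationEnabled) {
      // Files land at (or are renamed into) the final location. A same-HDFS
      // move keeps the normal load semantics; otherwise IGNORE suppresses the
      // follow-up move task.
      return performOnlyMove ? regularType : LoadFileType.IGNORE;
    }
    // Staged load: data is copied to a tmp dir first, then moved into place.
    return regularType;
  }
}
```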
File 3 of 3
@@ -272,25 +272,24 @@ static TableLocationTuple tableLocation(ImportTableDesc tblDesc, Database parent
     return new TableLocationTuple(path.toString(), false);
   }
 
-  private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, Path tgtPath,
-                                Path fromURI) {
+  private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, Path tgtPath, Path fromURI) {
     Path dataPath = fromURI;
     Path loadTmpDir = tgtPath;
+    boolean performOnlyMove = replicationSpec.isInReplicationScope() && Utils.onSameHDFSFileSystem(dataPath, tgtPath);
 
     // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
     LoadFileType loadFileType;
     if (replicationSpec.isInReplicationScope() &&
             context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
-      loadFileType = LoadFileType.IGNORE;
+      loadFileType = performOnlyMove ? getLoadFileType(replicationSpec) : LoadFileType.IGNORE;
       if (event.replicationSpec().isMigratingToTxnTable()) {
         // Migrating to transactional tables in bootstrap load phase.
         // It is enough to copy all the original files under the base_1 dir, so the write-id is hardcoded to 1.
         // The ReplTxnTask added earlier in the DAG ensures that write-id=1 is made valid in HMS metadata.
         loadTmpDir = new Path(loadTmpDir, AcidUtils.baseDir(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID));
       }
     } else {
-      loadFileType = (replicationSpec.isReplace() || replicationSpec.isMigratingToTxnTable())
-          ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
+      loadFileType = getLoadFileType(replicationSpec);
       loadTmpDir = PathUtils.getExternalTmpPath(tgtPath, context.pathInfo);
     }

@@ -302,7 +301,6 @@ private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, Path
      * data is moved directly from the Repl staging data dir of the partition to the partition's location on the
      * target warehouse.
      */
-    boolean performOnlyMove = replicationSpec.isInReplicationScope() && Utils.onSameHDFSFileSystem(dataPath, tgtPath);
     Path moveSrcPath = performOnlyMove ? dataPath : loadTmpDir;
 
     MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false);
@@ -345,6 +343,11 @@ private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, Path
     return copyTask;
   }
 
+  private LoadFileType getLoadFileType(ReplicationSpec replicationSpec) {
+    return (replicationSpec.isReplace() || replicationSpec.isMigratingToTxnTable())
+            ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
+  }
+
   private Task<?> dropTableTask(Table table) {
     assert(table != null);
     DropTableDesc dropTblDesc = new DropTableDesc(table.getFullyQualifiedName(), true, false, event.replicationSpec());
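Note that the two getLoadFileType helpers introduced by this commit are deliberately not identical: the partition-side helper maps a bootstrap migration to a transactional table to KEEP_EXISTING, while the table-side helper folds it into REPLACE_ALL. A side-by-side restatement for reference; the enclosing class is illustrative, and LoadFileType is assumed to be the enum from LoadTableDesc.

```java
import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;

final class LoadFileTypeChoice {
  // Mirrors the partition-side helper (the tasksForAddPartition file above).
  static LoadFileType forPartition(boolean isReplace, boolean isMigratingToTxnTable) {
    if (isReplace) {
      return LoadFileType.REPLACE_ALL;
    }
    return isMigratingToTxnTable ? LoadFileType.KEEP_EXISTING
                                 : LoadFileType.OVERWRITE_EXISTING;
  }

  // Mirrors the table-side helper (the loadTableTask file above).
  static LoadFileType forTable(boolean isReplace, boolean isMigratingToTxnTable) {
    return (isReplace || isMigratingToTxnTable)
        ? LoadFileType.REPLACE_ALL
        : LoadFileType.OVERWRITE_EXISTING;
  }
}
```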
