From abb4c3fc44bf5a514833a635e59675a630b0e3dc Mon Sep 17 00:00:00 2001
From: Mahesh Kumar Behera
Date: Tue, 11 Sep 2018 16:55:26 +0530
Subject: [PATCH 1/5] HIVE-20531 : One of the tasks, either move or add
 partition, can be avoided in repl load flow

---
 .../org/apache/hadoop/hive/conf/HiveConf.java |  6 ++
 ...stReplicationScenariosAcrossInstances.java | 80 ++++++++++++++++++-
 .../bootstrap/load/table/LoadPartitions.java  | 17 +++-
 .../repl/bootstrap/load/table/LoadTable.java  |  2 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java | 10 +++
 .../ql/parse/ReplicationSemanticAnalyzer.java | 18 +++--
 6 files changed, 122 insertions(+), 11 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d1e6631975e7..465308ca1ab6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -495,6 +495,12 @@ public static enum ConfVars {
         + " the REPL LOAD on object data stores such as S3 or WASB where creating a directory and move \n"
         + " files are costly operations. In file system like HDFS where move operation is atomic, this \n"
         + " optimization should not be enabled as it may lead to inconsistent data read for non acid tables."),
+    REPL_MOVE_OPTIMIZED_FILE_SCHEMES("hive.repl.move.optimized.scheme", "s3a, wasb",
+        "Comma separated list of schemes for which move optimization will be enabled during repl load. \n"
+            + "This configuration overrides the value set using REPL_ENABLE_MOVE_OPTIMIZATION for the given schemes. \n"
+            + " Schemes of file systems which do not support atomic move (rename) can be specified here to \n"
+            + " speed up the repl load operation. In file systems like HDFS, where the move operation is atomic, this \n"
+            + " optimization should not be enabled as it may lead to inconsistent data reads for non-ACID tables."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
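The new setting is meant to override the global REPL_ENABLE_MOVE_OPTIMIZATION flag per file-system scheme: if the warehouse scheme appears in the list, move optimization is turned on regardless of the global flag. A minimal sketch of that decision, using a hypothetical helper name (the real wiring lands in ReplicationSemanticAnalyzer later in this patch):

    import org.apache.hadoop.hive.conf.HiveConf;

    class MoveOptimizationSketch {
      // Hypothetical helper, not the actual Hive method: returns true when
      // move optimization should be enabled for the given warehouse scheme.
      static boolean moveOptimizationEnabled(HiveConf conf, String whScheme) {
        String schemes = conf.get("hive.repl.move.optimized.scheme", "");
        for (String s : schemes.toLowerCase().split(",")) {
          if (s.trim().equals(whScheme.toLowerCase().trim())) {
            return true;  // per-scheme override wins
          }
        }
        // otherwise fall back to the global flag
        return conf.getBoolVar(HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION);
      }
    }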
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index fb797ff3e32c..e3bcb57e0ea2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -1393,7 +1393,7 @@ public void testMoveOptimizationIncrementalFailureAfterCopyReplace() throws Thro
         .run("insert overwrite table t1 select * from t2")
         .dump(primaryDbName, tuple.lastReplicationId);

-    testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t1", "INSERT", tuple);
+    testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t1", "ADD_PARTITION", tuple);
   }

   @Test
@@ -1470,6 +1470,84 @@ public Boolean apply(@Nullable NotificationEvent entry) {
         .run(" drop database if exists " + replicatedDbName_CM + " cascade");
   }

+  private void injectableForTaskCreation(String primarydb, String replicadb, String tbl, String eventType,
+                                         String eventTypeExpect, WarehouseInstance.Tuple tuple) throws Throwable {
+    List<String> withConfigs = Arrays.asList("'hive.repl.move.optimized.scheme'= ' hDfs , '");
+
+    // fail add notification for given event type.
+    BehaviourInjection<NotificationEvent, Boolean> callerVerifier
+            = new BehaviourInjection<NotificationEvent, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable NotificationEvent entry) {
+        if (entry.getEventType().equalsIgnoreCase(eventType) && entry.getTableName().equalsIgnoreCase(tbl)) {
+          injectionPathCalled = true;
+          LOG.warn("Verifier - DB: " + String.valueOf(entry.getDbName())
+                  + " Table: " + String.valueOf(entry.getTableName())
+                  + " Event: " + String.valueOf(entry.getEventType()));
+          return false;
+        }
+        if (entry.getEventType().equalsIgnoreCase(eventTypeExpect) && entry.getTableName().equalsIgnoreCase(tbl)) {
+          nonInjectedPathCalled = true;
+        }
+        return true;
+      }
+    };
+
+    InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
+    try {
+      replica.load(replicadb, tuple.dumpLocation, withConfigs);
+    } finally {
+      InjectableBehaviourObjectStore.resetAddNotificationModifier();
+    }
+
+    callerVerifier.assertInjectionsPerformed(false, true);
+  }
+
+  @Test
+  public void testTaskCreationOptimization() throws Throwable {
+
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create table t2 (place string) partitioned by (country string)")
+            .run("insert into table t2 partition(country='india') values ('bangalore')")
+            .dump(primaryDbName, null);
+
+    //no insert event should be added
+    injectableForTaskCreation(primaryDbName, replicatedDbName, "t2",
+            "INSERT", "ADD_PARTITION", tuple);
+
+    replica.run("use " + replicatedDbName)
+            .run("select country from t2 where country == 'india'")
+            .verifyResults(Arrays.asList("india"));
+
+    tuple = primary.run("use " + primaryDbName)
+            .run("insert into table t2 partition(country='india') values ('delhi')")
+            .dump(primaryDbName, tuple.lastReplicationId);
+
+    //no ADD_PARTITION event should be added
+    injectableForTaskCreation(primaryDbName, replicatedDbName, "t2", "ADD_PARTITION",
+            "INSERT", tuple);
+
+    replica.run("use " + replicatedDbName)
+            .run("select place from t2 where country == 'india'")
+            .verifyResults(Arrays.asList("bangalore", "delhi"));
+
+    tuple = primary.run("use " + primaryDbName)
+            .run("insert into table t2 partition(country='us') values ('sf')")
+            .dump(primaryDbName, tuple.lastReplicationId);
+
+    //no insert event should be added
+    injectableForTaskCreation(primaryDbName, replicatedDbName, "t2", "INSERT",
+            "ADD_PARTITION", tuple);
+
+    replica.run("use " + replicatedDbName)
+            .run("select place from t2 where country == 'india'")
+            .verifyResults(Arrays.asList("bangalore", "delhi"))
+            .run("select place from t2 where country == 'us'")
+            .verifyResults(Arrays.asList("sf"));
+  }
+
   @Test
   public void testDumpExternalTableSetFalse() throws Throwable {
     WarehouseInstance.Tuple tuple = primary
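The test hooks the metastore's add-notification path: returning false from the injected function suppresses the event for the targeted type, and the two flags record which branch fired, so assertInjectionsPerformed(false, true) proves the suppressed event type was never generated while the expected one was. A stripped-down illustration of that gating idea, independent of the real InjectableBehaviourObjectStore API (all names here are illustrative):

    import java.util.function.Function;

    class NotificationGate {
      // mutable hook consulted before an event is persisted
      static volatile Function<String, Boolean> hook = eventType -> true;

      static void addNotification(String eventType) {
        if (!hook.apply(eventType)) {
          return;  // injected behaviour: drop the event
        }
        // ... persist the event as usual ...
      }
    }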
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 45b674e2876e..172b4ac44629 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -244,7 +244,12 @@ private Task<?> tasksForAddPartition(Table table, AddPartitionDesc addPartitionD
         tmpPath, context.hiveConf, false, false
     );

-    Task<?> movePartitionTask = movePartitionTask(table, partSpec, tmpPath, loadFileType);
+
+    Task<?> movePartitionTask = null;
+    if (loadFileType != LoadFileType.IGNORE) {
+      // no need to create a move task if the file is copied directly to the target location.
+      movePartitionTask = movePartitionTask(table, partSpec, tmpPath, loadFileType);
+    }

     // Set Checkpoint task as dependant to add partition tasks. So, if same dump is retried for
     // bootstrap, we skip current partition update.
@@ -260,10 +265,14 @@
     } else {
       ptnRootTask.addDependentTask(copyTask);
     }
-    copyTask.addDependentTask(addPartTask);
-    addPartTask.addDependentTask(movePartitionTask);
-    movePartitionTask.addDependentTask(ckptTask);

+    copyTask.addDependentTask(addPartTask);
+    if (movePartitionTask != null) {
+      addPartTask.addDependentTask(movePartitionTask);
+      movePartitionTask.addDependentTask(ckptTask);
+    } else {
+      addPartTask.addDependentTask(ckptTask);
+    }
     return ptnRootTask;
   }

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 82f687b7d67a..8538463cc6a4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -234,7 +234,7 @@ private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, Path
     LOG.debug("adding dependent CopyWork/AddPart/MoveWork for table "
         + table.getCompleteName() + " with source location: "
-        + dataPath.toString() + " and target location " + tmpPath.toString());
+        + dataPath.toString() + " and target location " + tgtPath.toString());

     Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf, false, false);
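The rewiring above is the core of the optimization: when the copy already lands in the final partition directory (LoadFileType.IGNORE), the move task is never created and the checkpoint task is chained directly off the add-partition task. A self-contained sketch of that conditional chaining, with a trivial placeholder node type standing in for Hive's Task machinery:

    import java.util.ArrayList;
    import java.util.List;

    // Minimal stand-in for Hive's task DAG: each node keeps its dependents.
    class Node {
      final String name;
      final List<Node> dependents = new ArrayList<>();
      Node(String name) { this.name = name; }
      void addDependentTask(Node child) { dependents.add(child); }
    }

    class ChainSketch {
      // Mirrors the patched tasksForAddPartition wiring: the move node is
      // only created and linked when the file was NOT copied in place.
      static Node chain(boolean copiedInPlace) {
        Node copy = new Node("copy");
        Node addPart = new Node("addPartition");
        Node ckpt = new Node("checkpoint");
        copy.addDependentTask(addPart);
        if (!copiedInPlace) {
          Node move = new Node("move");
          addPart.addDependentTask(move);
          move.addDependentTask(ckpt);    // copy -> addPart -> move -> ckpt
        } else {
          addPart.addDependentTask(ckpt); // copy -> addPart -> ckpt
        }
        return copy;
      }
    }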
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index c99d9c1b1f94..1c3416e527b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -609,6 +609,16 @@ private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, ImportTabl
       moveWork.setLoadTableWork(loadTableWork);
     }

+    if (loadFileType == LoadFileType.IGNORE) {
+      // If the file is copied directly to the target location, then there is no need of a move task when the
+      // operation being replayed is add partition. For insert operations, the add partition task is anyway a no-op.
+      if (x.getEventType() == DumpType.EVENT_INSERT) {
+        copyTask.addDependentTask(TaskFactory.get(moveWork, x.getConf()));
+      } else {
+        copyTask.addDependentTask(addPartTask);
+      }
+      return copyTask;
+    }
     Task<?> loadPartTask = TaskFactory.get(moveWork, x.getConf());
     copyTask.addDependentTask(loadPartTask);
     addPartTask.addDependentTask(loadPartTask);

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index cfeb31a24fb1..87c69cfb2552 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -50,6 +50,7 @@
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_MOVE_OPTIMIZED_FILE_SCHEMES;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG;
@@ -84,8 +85,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
   public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";

-  private static final List<String> CLOUD_SCHEME_PREFIXES = Arrays.asList("s3a", "wasb");
-
   ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
     this.db = super.db;
@@ -222,7 +221,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
     }
   }

-  private boolean isCloudFS(Path filePath, org.apache.hadoop.conf.Configuration conf) throws Exception {
+  private boolean ifEnableMoveOptimization(Path filePath, org.apache.hadoop.conf.Configuration conf) throws Exception {
     if (filePath == null) {
       throw new HiveException("filePath cannot be null");
     }
@@ -233,7 +232,16 @@ private boolean isCloudFS(Path filePath, org.apache.hadoop.conf.Configuration co
     if (StringUtils.isBlank(scheme)) {
       throw new HiveException("Cannot get valid scheme for " + filePath);
     }
-    return CLOUD_SCHEME_PREFIXES.contains(scheme.toLowerCase().trim());
+
+    LOG.info("scheme is " + scheme);
+
+    String[] schemeList = conf.get(REPL_MOVE_OPTIMIZED_FILE_SCHEMES.varname).toLowerCase().split(",");
+    for (String schemeIter : schemeList) {
+      if (schemeIter.trim().equalsIgnoreCase(scheme.trim())) {
+        return true;
+      }
+    }
+    return false;
   }

   // REPL LOAD
@@ -326,7 +334,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
     try {
       Warehouse wh = new Warehouse(conf);
       Path filePath = wh.getWhRoot();
-      if (isCloudFS(filePath, conf)) {
+      if (ifEnableMoveOptimization(filePath, conf)) {
         conf.setBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION, true);
         LOG.info(" Set move optimization to true for warehouse " + filePath.toString());
       }
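A detail worth noting in ifEnableMoveOptimization: the configured list is lower-cased, split on commas, and trimmed, so matching is tolerant of case and stray whitespace — exactly what the test exercises by passing ' hDfs , '. A quick standalone check of that parsing behaviour (plain Java, no Hive dependencies; the helper name is ours):

    public class SchemeMatchCheck {
      static boolean matches(String configured, String scheme) {
        for (String s : configured.toLowerCase().split(",")) {
          if (s.trim().equalsIgnoreCase(scheme.trim())) {
            return true;
          }
        }
        return false;
      }

      public static void main(String[] args) {
        // mirrors the test's deliberately messy config value
        System.out.println(matches(" hDfs , ", "hdfs"));   // true
        System.out.println(matches("s3a, wasb", "hdfs"));  // false
      }
    }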
From 909eda279ccc25db9ea7653ee6d87e2b99e1c88a Mon Sep 17 00:00:00 2001
From: Mahesh Kumar Behera
Date: Wed, 26 Sep 2018 11:31:26 +0530
Subject: [PATCH 2/5] HIVE-20531 : One of the tasks, either move or add
 partition, can be avoided in repl load flow : review comment fix

---
 ...stReplicationScenariosAcrossInstances.java | 116 ++++++++++--------
 .../hive/ql/parse/ImportSemanticAnalyzer.java |   5 +-
 .../hadoop/hive/ql/plan/LoadTableDesc.java    |   4 +-
 3 files changed, 74 insertions(+), 51 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index e3bcb57e0ea2..c6501ce0fcc3 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -25,6 +25,13 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.DDLTask;
+import org.apache.hadoop.hive.ql.exec.MoveTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.util.DependencyResolver;
@@ -50,6 +57,7 @@
 import org.junit.Assert;

 import java.io.IOException;
+import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -1470,82 +1478,92 @@ public Boolean apply(@Nullable NotificationEvent entry) {
         .run(" drop database if exists " + replicatedDbName_CM + " cascade");
   }

-  private void injectableForTaskCreation(String primarydb, String replicadb, String tbl, String eventType,
-                                         String eventTypeExpect, WarehouseInstance.Tuple tuple) throws Throwable {
-    List<String> withConfigs = Arrays.asList("'hive.repl.move.optimized.scheme'= ' hDfs , '");
-
-    // fail add notification for given event type.
-    BehaviourInjection<NotificationEvent, Boolean> callerVerifier
-            = new BehaviourInjection<NotificationEvent, Boolean>() {
-      @Nullable
-      @Override
-      public Boolean apply(@Nullable NotificationEvent entry) {
-        if (entry.getEventType().equalsIgnoreCase(eventType) && entry.getTableName().equalsIgnoreCase(tbl)) {
-          injectionPathCalled = true;
-          LOG.warn("Verifier - DB: " + String.valueOf(entry.getDbName())
-                  + " Table: " + String.valueOf(entry.getTableName())
-                  + " Event: " + String.valueOf(entry.getEventType()));
-          return false;
-        }
-        if (entry.getEventType().equalsIgnoreCase(eventTypeExpect) && entry.getTableName().equalsIgnoreCase(tbl)) {
-          nonInjectedPathCalled = true;
-        }
+  private abstract class checkTaskPresent {
+    public boolean hasTask(Task<? extends Serializable> rootTask) {
+      if (validate(rootTask)) {
         return true;
       }
+      List<Task<? extends Serializable>> childTasks = rootTask.getChildTasks();
+      if (childTasks == null) {
+        return false;
+      }
+      for (Task<? extends Serializable> childTask : childTasks) {
+        if (hasTask(childTask)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public abstract boolean validate(Task<? extends Serializable> task);
+  }
+
+  private boolean hasMoveTask(Task<? extends Serializable> rootTask) {
+    checkTaskPresent validator = new checkTaskPresent() {
+      public boolean validate(Task<? extends Serializable> task) {
+        return (task instanceof MoveTask);
+      }
     };
+    return validator.hasTask(rootTask);
+  }

-    InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
-    try {
-      replica.load(replicadb, tuple.dumpLocation, withConfigs);
-    } finally {
-      InjectableBehaviourObjectStore.resetAddNotificationModifier();
-    }
+  private boolean hasPartitionTask(Task<? extends Serializable> rootTask) {
+    checkTaskPresent validator = new checkTaskPresent() {
+      public boolean validate(Task<? extends Serializable> task) {
+        if (task instanceof DDLTask) {
+          DDLTask ddlTask = (DDLTask)task;
+          if (ddlTask.getWork().getAddPartitionDesc() != null) {
+            return true;
+          }
+        }
+        return false;
+      }
+    };
+    return validator.hasTask(rootTask);
+  }

-    callerVerifier.assertInjectionsPerformed(false, true);
+  private void checkTaskCreation(String replicadb, boolean hasMoveTask, boolean hasPartitionTask, boolean isIncDump,
+      WarehouseInstance.Tuple tuple) throws Throwable {
+    HiveConf confTemp = new HiveConf(conf);
+    confTemp.set("hive.repl.enable.move.optimization", "true");
+    ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, tuple.dumpLocation, replicadb,
+        null, null, isIncDump);
+    Task<? extends Serializable> replLoadTask = TaskFactory.get(replLoadWork, confTemp);
+    replLoadTask.initialize(null, null, new DriverContext(new Context(confTemp)), null);
+    replLoadTask.executeTask(null);
+    Task<? extends Serializable> rootTask = replLoadWork.getRootTask();
+    assertEquals(hasMoveTask, hasMoveTask(rootTask));
+    assertEquals(hasPartitionTask, hasPartitionTask(rootTask));
   }

   @Test
   public void testTaskCreationOptimization() throws Throwable {
-
     WarehouseInstance.Tuple tuple = primary
             .run("use " + primaryDbName)
             .run("create table t2 (place string) partitioned by (country string)")
             .run("insert into table t2 partition(country='india') values ('bangalore')")
             .dump(primaryDbName, null);

-    //no insert event should be added
-    injectableForTaskCreation(primaryDbName, replicatedDbName, "t2",
-            "INSERT", "ADD_PARTITION", tuple);
+    //bootstrap load should not have move task
+    checkTaskCreation(replicatedDbName, false, true, false, tuple);

-    replica.run("use " + replicatedDbName)
-            .run("select country from t2 where country == 'india'")
-            .verifyResults(Arrays.asList("india"));
+    replica.load(replicatedDbName, tuple.dumpLocation);

     tuple = primary.run("use " + primaryDbName)
             .run("insert into table t2 partition(country='india') values ('delhi')")
             .dump(primaryDbName, tuple.lastReplicationId);

-    //no ADD_PARTITION event should be added
-    injectableForTaskCreation(primaryDbName, replicatedDbName, "t2", "ADD_PARTITION",
-            "INSERT", tuple);
+    //no partition task should be added as the operation is inserting into an existing partition
+    checkTaskCreation(replicatedDbName, true, false, true, tuple);

-    replica.run("use " + replicatedDbName)
-            .run("select place from t2 where country == 'india'")
-            .verifyResults(Arrays.asList("bangalore", "delhi"));
+    replica.load(replicatedDbName, tuple.dumpLocation);

     tuple = primary.run("use " + primaryDbName)
             .run("insert into table t2 partition(country='us') values ('sf')")
             .dump(primaryDbName, tuple.lastReplicationId);

-    //no insert event should be added
-    injectableForTaskCreation(primaryDbName, replicatedDbName, "t2", "INSERT",
-            "ADD_PARTITION", tuple);
-
-    replica.run("use " + replicatedDbName)
-            .run("select place from t2 where country == 'india'")
-            .verifyResults(Arrays.asList("bangalore", "delhi"))
-            .run("select place from t2 where country == 'us'")
-            .verifyResults(Arrays.asList("sf"));
+    //no move task should be added as the operation is adding a dynamic partition
+    checkTaskCreation(replicatedDbName, false, true, true, tuple);
   }

   @Test
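checkTaskPresent does a depth-first walk over the child-task graph until a validator matches. The abstract-class pattern used here requires an anonymous subclass per check; the same traversal could be phrased once with java.util.function.Predicate. A minimal alternative sketch, assuming only Hive's Task.getChildTasks() (which may return null) — offered as a design note, not part of the patch:

    import java.io.Serializable;
    import java.util.List;
    import java.util.function.Predicate;
    import org.apache.hadoop.hive.ql.exec.Task;

    // Hypothetical alternative to checkTaskPresent: one generic DFS, with
    // the per-check logic supplied as a predicate.
    final class TaskGraphs {
      static boolean hasTask(Task<? extends Serializable> root,
                             Predicate<Task<? extends Serializable>> check) {
        if (root == null) {
          return false;
        }
        if (check.test(root)) {
          return true;
        }
        List<Task<? extends Serializable>> children = root.getChildTasks();
        if (children == null) {
          return false;
        }
        for (Task<? extends Serializable> child : children) {
          if (hasTask(child, check)) {
            return true;
          }
        }
        return false;
      }
    }
    // usage: TaskGraphs.hasTask(rootTask, t -> t instanceof MoveTask)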
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 1c3416e527b2..16ce5d562db9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -611,7 +611,10 @@ private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, ImportTabl

     if (loadFileType == LoadFileType.IGNORE) {
       // If the file is copied directly to the target location, then there is no need of a move task when the
-      // operation being replayed is add partition. For insert operations, the add partition task is anyway a no-op.
+      // operation being replayed is add partition, since the add partition task itself generates the create
+      // partition event, and the statistics are updated correctly in the create partition flow because the copy
+      // is done directly to the partition location. For insert operations, the add partition task is anyway a
+      // no-op, as the alter partition operation only updates statistics, which the move task repeats during load.
       if (x.getEventType() == DumpType.EVENT_INSERT) {
         copyTask.addDependentTask(TaskFactory.get(moveWork, x.getConf()));
       } else {

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index 2267df934e0b..d9333b569292 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -67,7 +67,9 @@ public enum LoadFileType {
      */
     OVERWRITE_EXISTING,
     /**
-     * No need to move the file, used in case of replication to s3
+     * No need to move the file; used in case of replication to S3. If the load file type is set to IGNORE, then
+     * only the file operations (move/rename) are skipped by the load table/partition methods. Other operations,
+     * like statistics update and event notification, happen as usual.
      */
     IGNORE
   }
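The clarified javadoc pins down the IGNORE contract: only the final move/rename is skipped, while statistics updates and event notification still run. A toy sketch of a load step honouring that contract — illustrative only, not Hive's actual loadTable/loadPartition code, and assuming the copy already placed the files in finalDir when IGNORE is set:

    import java.nio.file.Files;
    import java.nio.file.Path;

    class LoadSketch {
      enum LoadFileType { OVERWRITE_EXISTING, IGNORE }

      // Simplified load step: IGNORE skips only the rename; the "metadata"
      // steps (stats, notification) run in every case.
      static void loadPartition(Path staging, Path finalDir, LoadFileType type) throws Exception {
        if (type != LoadFileType.IGNORE) {
          Files.move(staging, finalDir);  // the costly step on object stores
        }
        updateStats(finalDir);            // statistics update happens as usual
        fireInsertEvent(finalDir);        // event notification happens as usual
      }

      static void updateStats(Path p) { /* ... */ }
      static void fireInsertEvent(Path p) { /* ... */ }
    }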
From e5e12ce4b15c14579392e302c9bc5f6b5ae9d224 Mon Sep 17 00:00:00 2001
From: Mahesh Kumar Behera
Date: Wed, 26 Sep 2018 18:36:05 +0530
Subject: [PATCH 3/5] HIVE-20531 : One of the tasks, either move or add
 partition, can be avoided in repl load flow : review comment fix

---
 ...stReplicationScenariosAcrossInstances.java | 20 +++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index c6501ce0fcc3..52a7357fce94 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -1522,18 +1522,16 @@ public boolean validate(Task<? extends Serializable> task) {
     return validator.hasTask(rootTask);
   }

-  private void checkTaskCreation(String replicadb, boolean hasMoveTask, boolean hasPartitionTask, boolean isIncDump,
+  private Task<? extends Serializable> getReplLoadRootTask(String replicadb, boolean isIncrementalDump,
       WarehouseInstance.Tuple tuple) throws Throwable {
     HiveConf confTemp = new HiveConf(conf);
     confTemp.set("hive.repl.enable.move.optimization", "true");
     ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, tuple.dumpLocation, replicadb,
-        null, null, isIncDump);
+        null, null, isIncrementalDump);
     Task<? extends Serializable> replLoadTask = TaskFactory.get(replLoadWork, confTemp);
     replLoadTask.initialize(null, null, new DriverContext(new Context(confTemp)), null);
     replLoadTask.executeTask(null);
-    Task<? extends Serializable> rootTask = replLoadWork.getRootTask();
-    assertEquals(hasMoveTask, hasMoveTask(rootTask));
-    assertEquals(hasPartitionTask, hasPartitionTask(rootTask));
+    return replLoadWork.getRootTask();
   }

   @Test
@@ -1545,7 +1543,9 @@ public void testTaskCreationOptimization() throws Throwable {
         .dump(primaryDbName, null);

     //bootstrap load should not have move task
-    checkTaskCreation(replicatedDbName, false, true, false, tuple);
+    Task<? extends Serializable> task = getReplLoadRootTask(replicatedDbName, false, tuple);
+    assertEquals(false, hasMoveTask(task));
+    assertEquals(true, hasPartitionTask(task));

     replica.load(replicatedDbName, tuple.dumpLocation);

@@ -1554,7 +1554,9 @@ public void testTaskCreationOptimization() throws Throwable {
         .dump(primaryDbName, tuple.lastReplicationId);

     //no partition task should be added as the operation is inserting into an existing partition
-    checkTaskCreation(replicatedDbName, true, false, true, tuple);
+    task = getReplLoadRootTask(replicatedDbName, true, tuple);
+    assertEquals(true, hasMoveTask(task));
+    assertEquals(false, hasPartitionTask(task));

     replica.load(replicatedDbName, tuple.dumpLocation);

@@ -1563,7 +1565,9 @@ public void testTaskCreationOptimization() throws Throwable {
         .dump(primaryDbName, tuple.lastReplicationId);

     //no move task should be added as the operation is adding a dynamic partition
-    checkTaskCreation(replicatedDbName, false, true, true, tuple);
+    task = getReplLoadRootTask(replicatedDbName, true, tuple);
+    assertEquals(false, hasMoveTask(task));
+    assertEquals(true, hasPartitionTask(task));
   }

   @Test
From b39a7941edc3ee11f8b981b5665349eef37cebc5 Mon Sep 17 00:00:00 2001
From: Mahesh Kumar Behera
Date: Thu, 27 Sep 2018 15:26:07 +0530
Subject: [PATCH 4/5] HIVE-20531 : One of the tasks, either move or add
 partition, can be avoided in repl load flow : test failure fix

---
 .../ql/parse/TestReplicationScenarios.java    | 104 ++++++++++++++++++
 ...stReplicationScenariosAcrossInstances.java |  99 -----------------
 2 files changed, 104 insertions(+), 99 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index a9783abe10fe..0d6ad99e0e09 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -52,9 +52,17 @@
 import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
+import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.exec.DDLTask;
+import org.apache.hadoop.hive.ql.exec.MoveTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
+import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -80,6 +88,7 @@
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -315,6 +324,101 @@ public void testBasic() throws IOException {
     verifyRun("SELECT * from " + replicatedDbName + ".unptned_empty", empty, driverMirror);
   }

+  private abstract class checkTaskPresent {
+    public boolean hasTask(Task<? extends Serializable> rootTask) {
+      if (rootTask == null) {
+        return false;
+      }
+      if (validate(rootTask)) {
+        return true;
+      }
+      List<Task<? extends Serializable>> childTasks = rootTask.getChildTasks();
+      if (childTasks == null) {
+        return false;
+      }
+      for (Task<? extends Serializable> childTask : childTasks) {
+        if (hasTask(childTask)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public abstract boolean validate(Task<? extends Serializable> task);
+  }
+
+  private boolean hasMoveTask(Task<? extends Serializable> rootTask) {
+    checkTaskPresent validator = new checkTaskPresent() {
+      public boolean validate(Task<? extends Serializable> task) {
+        return (task instanceof MoveTask);
+      }
+    };
+    return validator.hasTask(rootTask);
+  }
+
+  private boolean hasPartitionTask(Task<? extends Serializable> rootTask) {
+    checkTaskPresent validator = new checkTaskPresent() {
+      public boolean validate(Task<? extends Serializable> task) {
+        if (task instanceof DDLTask) {
+          DDLTask ddlTask = (DDLTask)task;
+          if (ddlTask.getWork().getAddPartitionDesc() != null) {
+            return true;
+          }
+        }
+        return false;
+      }
+    };
+    return validator.hasTask(rootTask);
+  }
+
+  private Task<? extends Serializable> getReplLoadRootTask(String replicadb, boolean isIncrementalDump, Tuple tuple) throws Throwable {
+    HiveConf confTemp = new HiveConf();
+    confTemp.set("hive.repl.enable.move.optimization", "true");
+    ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, tuple.dumpLocation, replicadb,
+        null, null, isIncrementalDump);
+    Task<? extends Serializable> replLoadTask = TaskFactory.get(replLoadWork, confTemp);
+    replLoadTask.initialize(null, null, new DriverContext(driver.getContext()), null);
+    replLoadTask.executeTask(null);
+    Hive.getThreadLocal().closeCurrent();
+    return replLoadWork.getRootTask();
+  }
+
+  @Test
+  public void testTaskCreationOptimization() throws Throwable {
+    String name = testName.getMethodName();
+    String dbName = createDB(name, driver);
+    String dbNameReplica = dbName + "_replica";
+    run("create table " + dbName + ".t2 (place string) partitioned by (country string)", driver);
+    run("insert into table " + dbName + ".t2 partition(country='india') values ('bangalore')", driver);
+
+    Tuple dump = replDumpDb(dbName, null, null, null);
+
+    //bootstrap load should not have move task
+    Task<? extends Serializable> task = getReplLoadRootTask(dbNameReplica, false, dump);
+    assertEquals(false, hasMoveTask(task));
+    assertEquals(true, hasPartitionTask(task));
+
+    loadAndVerify(dbNameReplica, dump.dumpLocation, dump.lastReplId);
+
+    run("insert into table " + dbName + ".t2 partition(country='india') values ('delhi')", driver);
+    dump = replDumpDb(dbName, dump.lastReplId, null, null);
+
+    //no partition task should be added as the operation is inserting into an existing partition
+    task = getReplLoadRootTask(dbNameReplica, true, dump);
+    assertEquals(true, hasMoveTask(task));
+    assertEquals(false, hasPartitionTask(task));
+
+    loadAndVerify(dbNameReplica, dump.dumpLocation, dump.lastReplId);
+
+    run("insert into table " + dbName + ".t2 partition(country='us') values ('sf')", driver);
+    dump = replDumpDb(dbName, dump.lastReplId, null, null);
+
+    //no move task should be added as the operation is adding a dynamic partition
+    task = getReplLoadRootTask(dbNameReplica, true, dump);
+    assertEquals(false, hasMoveTask(task));
+    assertEquals(true, hasPartitionTask(task));
+  }
+
   @Test
   public void testBasicWithCM() throws Exception {
     String name = testName.getMethodName();
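Compared with the patch-2 version of the helper, this copy of hasTask adds a rootTask == null guard before validating. The test's three assertions encode which tasks survive each replay type; the toy program below re-creates the two chain shapes from the LoadPartitions sketch earlier and checks for the "move" node (all names illustrative, independent of Hive):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;

    // Self-contained check that the optimized chain really lacks a "move"
    // node while the unoptimized one has it.
    public class ChainCheck {
      static final class Node {
        final String name;
        final List<Node> children = new ArrayList<>();
        Node(String name) { this.name = name; }
        Node then(Node child) { children.add(child); return child; }
      }

      static boolean hasNode(Node root, Predicate<Node> p) {
        if (root == null) return false;          // same null guard as the test helper
        if (p.test(root)) return true;
        for (Node c : root.children) {
          if (hasNode(c, p)) return true;
        }
        return false;
      }

      public static void main(String[] args) {
        // copy -> addPartition -> checkpoint  (move optimization on)
        Node optimized = new Node("copy");
        optimized.then(new Node("addPartition")).then(new Node("checkpoint"));

        // copy -> addPartition -> move -> checkpoint  (optimization off)
        Node plain = new Node("copy");
        plain.then(new Node("addPartition")).then(new Node("move")).then(new Node("checkpoint"));

        System.out.println(hasNode(optimized, n -> n.name.equals("move"))); // false
        System.out.println(hasNode(plain, n -> n.name.equals("move")));     // true
      }
    }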
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 52a7357fce94..6cd1ac05d746 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -25,13 +25,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.exec.DDLTask;
-import org.apache.hadoop.hive.ql.exec.MoveTask;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.util.DependencyResolver;
@@ -1478,98 +1471,6 @@ public Boolean apply(@Nullable NotificationEvent entry) {
         .run(" drop database if exists " + replicatedDbName_CM + " cascade");
   }

-  private abstract class checkTaskPresent {
-    public boolean hasTask(Task<? extends Serializable> rootTask) {
-      if (validate(rootTask)) {
-        return true;
-      }
-      List<Task<? extends Serializable>> childTasks = rootTask.getChildTasks();
-      if (childTasks == null) {
-        return false;
-      }
-      for (Task<? extends Serializable> childTask : childTasks) {
-        if (hasTask(childTask)) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    public abstract boolean validate(Task<? extends Serializable> task);
-  }
-
-  private boolean hasMoveTask(Task<? extends Serializable> rootTask) {
-    checkTaskPresent validator = new checkTaskPresent() {
-      public boolean validate(Task<? extends Serializable> task) {
-        return (task instanceof MoveTask);
-      }
-    };
-    return validator.hasTask(rootTask);
-  }
-
-  private boolean hasPartitionTask(Task<? extends Serializable> rootTask) {
-    checkTaskPresent validator = new checkTaskPresent() {
-      public boolean validate(Task<? extends Serializable> task) {
-        if (task instanceof DDLTask) {
-          DDLTask ddlTask = (DDLTask)task;
-          if (ddlTask.getWork().getAddPartitionDesc() != null) {
-            return true;
-          }
-        }
-        return false;
-      }
-    };
-    return validator.hasTask(rootTask);
-  }
-
-  private Task<? extends Serializable> getReplLoadRootTask(String replicadb, boolean isIncrementalDump,
-      WarehouseInstance.Tuple tuple) throws Throwable {
-    HiveConf confTemp = new HiveConf(conf);
-    confTemp.set("hive.repl.enable.move.optimization", "true");
-    ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, tuple.dumpLocation, replicadb,
-        null, null, isIncrementalDump);
-    Task<? extends Serializable> replLoadTask = TaskFactory.get(replLoadWork, confTemp);
-    replLoadTask.initialize(null, null, new DriverContext(new Context(confTemp)), null);
-    replLoadTask.executeTask(null);
-    return replLoadWork.getRootTask();
-  }
-
-  @Test
-  public void testTaskCreationOptimization() throws Throwable {
-    WarehouseInstance.Tuple tuple = primary
-            .run("use " + primaryDbName)
-            .run("create table t2 (place string) partitioned by (country string)")
-            .run("insert into table t2 partition(country='india') values ('bangalore')")
-            .dump(primaryDbName, null);
-
-    //bootstrap load should not have move task
-    Task<? extends Serializable> task = getReplLoadRootTask(replicatedDbName, false, tuple);
-    assertEquals(false, hasMoveTask(task));
-    assertEquals(true, hasPartitionTask(task));
-
-    replica.load(replicatedDbName, tuple.dumpLocation);
-
-    tuple = primary.run("use " + primaryDbName)
-            .run("insert into table t2 partition(country='india') values ('delhi')")
-            .dump(primaryDbName, tuple.lastReplicationId);
-
-    //no partition task should be added as the operation is inserting into an existing partition
-    task = getReplLoadRootTask(replicatedDbName, true, tuple);
-    assertEquals(true, hasMoveTask(task));
-    assertEquals(false, hasPartitionTask(task));
-
-    replica.load(replicatedDbName, tuple.dumpLocation);
-
-    tuple = primary.run("use " + primaryDbName)
-            .run("insert into table t2 partition(country='us') values ('sf')")
-            .dump(primaryDbName, tuple.lastReplicationId);
-
-    //no move task should be added as the operation is adding a dynamic partition
-    task = getReplLoadRootTask(replicatedDbName, true, tuple);
-    assertEquals(false, hasMoveTask(task));
-    assertEquals(true, hasPartitionTask(task));
-  }
-
   @Test
   public void testDumpExternalTableSetFalse() throws Throwable {
     WarehouseInstance.Tuple tuple = primary

From 9fcd0ef3eaa10ee29014ed5dd64717c9bc7e7023 Mon Sep 17 00:00:00 2001
From: Mahesh Kumar Behera
Date: Mon, 1 Oct 2018 08:47:51 +0530
Subject: [PATCH 5/5] HIVE-20531 : Repl load on cloud storage file system can
 skip redundant move or add partition tasks

---
 .../apache/hadoop/hive/ql/parse/TestReplicationScenarios.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 0d6ad99e0e09..a6943f71db02 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -375,7 +375,7 @@ private Task<? extends Serializable> getReplLoadRootTask(String replicadb, boolean isIncrementalDump, Tu
     HiveConf confTemp = new HiveConf();
     confTemp.set("hive.repl.enable.move.optimization", "true");
     ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, tuple.dumpLocation, replicadb,
-        null, null, isIncrementalDump);
+        null, null, isIncrementalDump, Long.parseLong(tuple.lastReplId));
     Task<? extends Serializable> replLoadTask = TaskFactory.get(replLoadWork, confTemp);
     replLoadTask.initialize(null, null, new DriverContext(driver.getContext()), null);
     replLoadTask.executeTask(null);