From 114b9cb8df9e23f0155364fe1bec143f45817318 Mon Sep 17 00:00:00 2001 From: Mahesh Kumar Behera Date: Wed, 3 Oct 2018 14:06:02 +0530 Subject: [PATCH] HIVE-20680 : hive bootstrap missing partitions in replicated db --- .../org/apache/hadoop/hive/conf/HiveConf.java | 3 ++ ...stReplicationScenariosAcrossInstances.java | 51 +++++++++++++++++++ .../hive/ql/exec/repl/ReplLoadTask.java | 9 ++++ .../filesystem/DatabaseEventsIterator.java | 49 +++++++++++++++++- .../InjectableBehaviourObjectStore.java | 24 +++++++++ 5 files changed, 135 insertions(+), 1 deletion(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index d1e6631975e7..ee9c5297e416 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -612,6 +612,9 @@ public static enum ConfVars { HIVE_MAPJOIN_TESTING_NO_HASH_TABLE_LOAD("hive.mapjoin.testing.no.hash.table.load", false, "internal use only, true when in testing map join", true), + HIVE_IN_REPL_TEST_FILES_SORTED("hive.in.repl.test.files.sorted", false, + "internal usage only, set to true if the file listing is required in sorted order during bootstrap load", true), + LOCALMODEAUTO("hive.exec.mode.local.auto", false, "Let Hive determine whether to run in local mode automatically"), LOCALMODEMAXBYTES("hive.exec.mode.local.auto.inputbytes.max", 134217728L, diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index fb797ff3e32c..1ea85ebee389 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -1570,4 +1570,55 @@ public void 
testDumpExternalTableWithAddPartitionEvent() throws Throwable { .run("show partitions t1") .verifyResults(new String[] { "country=india", "country=us" }); } + + // This requires the tables are loaded in a fixed sorted order. + @Test + public void testBootstrapLoadRetryAfterFailureForAlterTable() throws Throwable { + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1 (place string)") + .run("insert into table t1 values ('testCheck')") + .run("create table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='china') values ('shenzhen')") + .run("insert into table t2 partition(country='india') values ('banaglore')") + .dump(primaryDbName, null); + + // fail setting ckpt directory property for table t1. + BehaviourInjection callerVerifier + = new BehaviourInjection() { + @Nullable + @Override + public Boolean apply(@Nullable CallerArguments args) { + if (args.tblName.equalsIgnoreCase("t1") && args.dbName.equalsIgnoreCase(replicatedDbName)) { + injectionPathCalled = true; + LOG.warn("Verifier - DB : " + args.dbName + " TABLE : " + args.tblName); + return false; + } + return true; + } + }; + + // Fail repl load before the ckpt property is set for t1 and after it is set for t2. So in the next run, for + // t2 it goes directly to partition load with no task for table tracker and for t1 it loads the table + // again from start. + InjectableBehaviourObjectStore.setAlterTableModifier(callerVerifier); + try { + replica.loadFailure(replicatedDbName, tuple.dumpLocation); + callerVerifier.assertInjectionsPerformed(true, false); + } finally { + InjectableBehaviourObjectStore.resetAlterTableModifier(); + } + + // Retry with same dump with which it was already loaded should resume the bootstrap load. Make sure that table t1, + // is loaded before t2. So that scope is set to table in first iteration for table t1. 
In the next iteration, it + // loads only remaining partitions of t2, so that the table tracker has no tasks. + List withConfigs = Arrays.asList("'hive.in.repl.test.files.sorted'='true'"); + replica.load(replicatedDbName, tuple.dumpLocation, withConfigs); + + replica.run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("select country from t2 order by country") + .verifyResults(Arrays.asList("china", "india")); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index aa41301ae448..270670d41ecf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -126,6 +126,10 @@ a database ( directory ) loadTaskTracker.update(dbTracker); if (work.hasDbState()) { loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope)); + } else { + // Scope might have been set to database in some previous iteration of loop, so reset it to false if database + // tracker has no tasks. + scope.database = false; } work.updateDbEventState(dbEvent.toState()); if (dbTracker.hasTasks()) { @@ -151,7 +155,12 @@ a database ( directory ) if (!scope.database && tableTracker.hasTasks()) { scope.rootTasks.addAll(tableTracker.tasks()); scope.table = true; + } else { + // Scope might have been set to table in some previous iteration of loop, so reset it to false if table + // tracker has no tasks. 
+ scope.table = false; + } + /* for table replication if we reach the max number of tasks then for the next run we will try to reload the same table again, this is mainly for ease of understanding the code diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java index f778cb42f79d..d909945b80e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java @@ -22,10 +22,12 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +36,7 @@ import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; +import java.util.ArrayList; import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.FUNCTIONS_ROOT_DIR_NAME; @@ -55,7 +58,51 @@ class DatabaseEventsIterator implements Iterator { if (!fileSystem.exists(new Path(dbLevelPath + Path.SEPARATOR + EximUtil.METADATA_NAME))) { databaseEventProcessed = true; } - remoteIterator = fileSystem.listFiles(dbLevelPath, true); + + if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_REPL_TEST_FILES_SORTED)) { + LOG.info(" file sorting is enabled in DatabaseEventsIterator"); + List fileStatuses = new ArrayList<>(); + + // Sort the directories in the path and then add the files recursively 
. + getSortedFileList(dbLevelPath, fileStatuses, fileSystem); + remoteIterator = new RemoteIterator() { + private int idx = 0; + private final int numEntry = fileStatuses.size(); + private final List fileStatusesLocal = fileStatuses; + public boolean hasNext() throws IOException { + return idx < numEntry; + } + public LocatedFileStatus next() throws IOException { + LOG.info(" file in next is " + fileStatusesLocal.get(idx)); + return fileStatusesLocal.get(idx++); + } + }; + } else { + remoteIterator = fileSystem.listFiles(dbLevelPath, true); + } + } + + private void getSortedFileList(Path eventPath, List fileStatuses, + FileSystem fileSystem) throws IOException { + //Add all the files in this directory. No need to sort. + RemoteIterator iteratorNext = fileSystem.listFiles(eventPath, false); + while (iteratorNext.hasNext()) { + LocatedFileStatus status = iteratorNext.next(); + LOG.info(" files added at getSortedFileList" + status.getPath()); + fileStatuses.add(status); + } + + // get all the directories in this path and sort them + FileStatus[] eventDirs = fileSystem.listStatus(eventPath, EximUtil.getDirectoryFilter(fileSystem)); + if (eventDirs.length == 0) { + return; + } + Arrays.sort(eventDirs, new EventDumpDirComparator()); + + // add files recursively for each directory + for (FileStatus fs : eventDirs) { + getSortedFileList(fs.getPath(), fileStatuses, fileSystem); + } } public Path dbLevelPath() { diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java index 3d5395adde3c..9daff370ef61 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java @@ -85,6 +85,8 @@ 
public CallerArguments(String dbName) { private static com.google.common.base.Function addNotificationEventModifier = null; + private static com.google.common.base.Function alterTableModifier = null; + // Methods to set/reset getTable modifier public static void setGetTableBehaviour(com.google.common.base.Function modifier){ getTableModifier = (modifier == null) ? com.google.common.base.Functions.identity() : modifier; @@ -139,6 +141,13 @@ public static void resetCallerVerifier(){ setCallerVerifier(null); } + public static void setAlterTableModifier(com.google.common.base.Function modifier) { + alterTableModifier = modifier; + } + public static void resetAlterTableModifier() { + setAlterTableModifier(null); + } + // ObjectStore methods to be overridden with injected behavior @Override public Table getTable(String catName, String dbName, String tableName) throws MetaException { @@ -167,6 +176,21 @@ public NotificationEventResponse getNextNotification(NotificationEventRequest rq return getNextNotificationModifier.apply(super.getNextNotification(rqst)); } + @Override + public Table alterTable(String catName, String dbname, String name, Table newTable, String queryValidWriteIds) + throws InvalidObjectException, MetaException { + if (alterTableModifier != null) { + CallerArguments args = new CallerArguments(dbname); + args.tblName = name; + Boolean success = alterTableModifier.apply(args); + if ((success != null) && !success) { + throw new MetaException("InjectableBehaviourObjectStore: Invalid alterTable operation on Catalog : " + catName + + " DB: " + dbname + " table: " + name); + } + } + return super.alterTable(catName, dbname, name, newTable, queryValidWriteIds); + } + @Override public void addNotificationEvent(NotificationEvent entry) throws MetaException { if (addNotificationEventModifier != null) {