From 06197f6d1b9b0ec3433ce2951f3a249a87dd32df Mon Sep 17 00:00:00 2001 From: Mahesh Kumar Behera Date: Tue, 25 Sep 2018 09:04:48 +0530 Subject: [PATCH 1/3] HIVE-20629 : Hive incremental replication fails with events missing error if database is kept idle for more than an hour --- ...stReplicationScenariosAcrossInstances.java | 43 +++++++++++++++++++ .../IncrementalLoadEventsIterator.java | 4 +- .../IncrementalLoadTasksBuilder.java | 17 ++++++++ .../ql/parse/ReplicationSemanticAnalyzer.java | 1 - 4 files changed, 63 insertions(+), 2 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index fae1a2fa100b..1028a83b287a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -877,6 +877,49 @@ private void verifyIfSrcOfReplPropMissing(Map props) { assertFalse(props.containsKey(SOURCE_OF_REPLICATION)); } + @Test + public void testIncrementalDumpEmptyDumpDirectory() throws Throwable { + WarehouseInstance.Tuple tuple = primary.dump(primaryDbName, null); + + replica.load(replicatedDbName, tuple.dumpLocation) + .status(replicatedDbName) + .verifyResult(tuple.lastReplicationId); + + tuple = primary.dump(primaryDbName, tuple.lastReplicationId); + + replica.load(replicatedDbName, tuple.dumpLocation) + .status(replicatedDbName) + .verifyResult(tuple.lastReplicationId); + + // create events for some other database and then dump the primaryDbName to dump an empty directory. + String testDbName = primaryDbName + "_test"; + tuple = primary.run(" create database " + testDbName) + .run("create table " + testDbName + ".tbl (fld int)") + .dump(primaryDbName, tuple.lastReplicationId); + + // Incremental load to existing database with empty dump directory should set the repl id to the last event at src. + replica.load(replicatedDbName, tuple.dumpLocation) + .status(replicatedDbName) + .verifyResult(tuple.lastReplicationId); + Database replicaDb = replica.getDatabase(replicatedDbName); + assertTrue(replicaDb.getParameters().get("repl.last.id").equalsIgnoreCase(tuple.lastReplicationId)); + + // Incremental load to non existing db should return database not exist error. + tuple = primary.dump("someJunkDB", tuple.lastReplicationId); + CommandProcessorResponse response = replica.runCommand("REPL LOAD someJunkDB from " + tuple.dumpLocation); + response.getErrorMessage().toLowerCase().contains("org.apache.hadoop.hive.ql.metadata.hiveException: " + + "database does not exist"); + + // Bootstrap load from an empty dump directory should return empty load directory error. + tuple = primary.dump("someJunkDB", null); + response = replica.runCommand("REPL LOAD someJunkDB from " + tuple.dumpLocation); + response.getErrorMessage().toLowerCase().contains("org.apache.hadoop.hive.ql.parse.semanticException:" + + " no data to load in path"); + + primary.run(" drop database if exists " + testDbName + " cascade"); + replica.run(" drop database if exists " + testDbName + " cascade"); + } + @Test public void testIncrementalDumpMultiIteration() throws Throwable { WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName, null); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java index 4b37c8dd9891..5638ace714db 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java @@ -44,7 +44,9 @@ public IncrementalLoadEventsIterator(String loadPath, HiveConf conf) throws IOEx FileSystem fs = eventPath.getFileSystem(conf); eventDirs = fs.listStatus(eventPath, EximUtil.getDirectoryFilter(fs)); if ((eventDirs == null) || (eventDirs.length == 0)) { - throw new IllegalArgumentException("No data to load in path " + loadPath); + currentIndex = 0; + numEvents = 0; + return; } // For event dump, each sub-dir is an individual event dump. // We need to guarantee that the directory listing we got is in order of event id. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java index 2aefb574ca10..4a8779ee9e75 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java @@ -70,6 +70,7 @@ public class IncrementalLoadTasksBuilder { private final HiveConf conf; private final ReplLogger replLogger; private static long numIteration; + private String loadPath; public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadPath, IncrementalLoadEventsIterator iterator, HiveConf conf) { @@ -83,6 +84,7 @@ public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadP replLogger = new IncrementalLoadLogger(dbName, loadPath, iterator.getNumEvents()); numIteration = 0; replLogger.startLog(); + this.loadPath = loadPath; } public Task build(DriverContext driverContext, Hive hive, Logger log, @@ -95,6 +97,21 @@ public Task build(DriverContext driverContext, Hive hive this.log.debug("Iteration num " + numIteration); TaskTracker tracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS)); + // if no events are there to replay, then update the last repl id of the database/ table to last event id. + if (!iterator.hasNext()) { + DumpMetaData eventDmd = new DumpMetaData(new Path(loadPath), conf); + if (StringUtils.isEmpty(tableName)) { + taskChainTail = dbUpdateReplStateTask(dbName, eventDmd.getEventTo().toString(), taskChainTail); + this.log.debug("no events to replay, set last repl id of db " + dbName + " to " + + eventDmd.getEventTo().toString()); + } else { + taskChainTail = tableUpdateReplStateTask(dbName, tableName, null, + eventDmd.getEventTo().toString(), taskChainTail); + this.log.debug("no events to replay, set last repl id of table " + dbName + "." + tableName + " to " + + eventDmd.getEventTo().toString()); + } + } + while (iterator.hasNext() && tracker.canAddMoreTasks()) { FileStatus dir = iterator.next(); String location = dir.getPath().toUri().toString(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index e4a128182c34..726c9d5ee063 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -369,7 +369,6 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(fs, loadPath); if (srcs == null || (srcs.length == 0)) { LOG.warn("Nothing to load at {}", loadPath.toUri().toString()); - return; } ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, From 630510d57169440cce82fac569f45ab3da979f94 Mon Sep 17 00:00:00 2001 From: Mahesh Kumar Behera Date: Wed, 26 Sep 2018 09:14:09 +0530 Subject: [PATCH 2/3] HIVE-20629 : Hive incremental replication fails with events missing error if database is kept idle for more than an hour : review comment fix --- ...estReplicationScenariosAcrossInstances.java | 3 --- .../hadoop/hive/ql/exec/repl/ReplLoadWork.java | 9 ++------- .../IncrementalLoadTasksBuilder.java | 18 ++++++++---------- .../ql/parse/ReplicationSemanticAnalyzer.java | 14 +------------- 4 files changed, 11 insertions(+), 33 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 1028a83b287a..fb797ff3e32c 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -901,8 +901,6 @@ public void testIncrementalDumpEmptyDumpDirectory() throws Throwable { replica.load(replicatedDbName, tuple.dumpLocation) .status(replicatedDbName) .verifyResult(tuple.lastReplicationId); - Database replicaDb = replica.getDatabase(replicatedDbName); - assertTrue(replicaDb.getParameters().get("repl.last.id").equalsIgnoreCase(tuple.lastReplicationId)); // Incremental load to non existing db should return database not exist error. tuple = primary.dump("someJunkDB", tuple.lastReplicationId); @@ -917,7 +915,6 @@ public void testIncrementalDumpEmptyDumpDirectory() throws Throwable { " no data to load in path"); primary.run(" drop database if exists " + testDbName + " cascade"); - replica.run(" drop database if exists " + testDbName + " cascade"); } @Test diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java index fdbcb15c72d5..ff21b6a601d0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -53,7 +53,7 @@ public class ReplLoadWork implements Serializable { final LineageState sessionStateLineageState; public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn, - String tableNameToLoadIn, LineageState lineageState, boolean isIncrementalDump) throws IOException { + String tableNameToLoadIn, LineageState lineageState, boolean isIncrementalDump, Long eventTo) throws IOException { this.tableNameToLoadIn = tableNameToLoadIn; sessionStateLineageState = lineageState; this.dumpDirectory = dumpDirectory; @@ -64,7 +64,7 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoad this.bootstrapIterator = null; this.constraintsIterator = null; incrementalLoad = new IncrementalLoadTasksBuilder(dbNameToLoadIn, tableNameToLoadIn, dumpDirectory, - incrementalIterator, hiveConf); + incrementalIterator, hiveConf, eventTo); } else { this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf); this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); @@ -73,11 +73,6 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoad } } - public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern, - LineageState lineageState) throws IOException { - this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState, false); - } - public BootstrapEventsIterator iterator() { return bootstrapIterator; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java index 4a8779ee9e75..c62604786776 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java @@ -70,10 +70,10 @@ public class IncrementalLoadTasksBuilder { private final HiveConf conf; private final ReplLogger replLogger; private static long numIteration; - private String loadPath; + private Long eventTo; public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadPath, - IncrementalLoadEventsIterator iterator, HiveConf conf) { + IncrementalLoadEventsIterator iterator, HiveConf conf, Long eventTo) { this.dbName = dbName; this.tableName = tableName; this.iterator = iterator; @@ -84,7 +84,7 @@ public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadP replLogger = new IncrementalLoadLogger(dbName, loadPath, iterator.getNumEvents()); numIteration = 0; replLogger.startLog(); - this.loadPath = loadPath; + this.eventTo = eventTo; } public Task build(DriverContext driverContext, Hive hive, Logger log, @@ -99,16 +99,14 @@ public Task build(DriverContext driverContext, Hive hive // if no events are there to replay, then update the last repl id of the database/ table to last event id. if (!iterator.hasNext()) { - DumpMetaData eventDmd = new DumpMetaData(new Path(loadPath), conf); + String lastEventid = eventTo.toString(); if (StringUtils.isEmpty(tableName)) { - taskChainTail = dbUpdateReplStateTask(dbName, eventDmd.getEventTo().toString(), taskChainTail); - this.log.debug("no events to replay, set last repl id of db " + dbName + " to " + - eventDmd.getEventTo().toString()); + taskChainTail = dbUpdateReplStateTask(dbName, lastEventid, taskChainTail); + this.log.debug("no events to replay, set last repl id of db " + dbName + " to " + lastEventid); } else { - taskChainTail = tableUpdateReplStateTask(dbName, tableName, null, - eventDmd.getEventTo().toString(), taskChainTail); + taskChainTail = tableUpdateReplStateTask(dbName, tableName, null, lastEventid, taskChainTail); this.log.debug("no events to replay, set last repl id of table " + dbName + "." + tableName + " to " + - eventDmd.getEventTo().toString()); + lastEventid); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 726c9d5ee063..cfeb31a24fb1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -359,20 +359,8 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { LOG.debug("{} contains an bootstrap dump", loadPath); } - if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) { - ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, - tblNameOrPattern, queryState.getLineageState(), false); - rootTasks.add(TaskFactory.get(replLoadWork, conf)); - return; - } - - FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(fs, loadPath); - if (srcs == null || (srcs.length == 0)) { - LOG.warn("Nothing to load at {}", loadPath.toUri().toString()); - } - ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, - tblNameOrPattern, queryState.getLineageState(), evDump); + tblNameOrPattern, queryState.getLineageState(), evDump, dmd.getEventTo()); rootTasks.add(TaskFactory.get(replLoadWork, conf)); } catch (Exception e) { // TODO : simple wrap & rethrow for now, clean up with error codes From 56a357f2ee0e0f60fb837b07780dc761b20e3ab3 Mon Sep 17 00:00:00 2001 From: Mahesh Kumar Behera Date: Wed, 26 Sep 2018 15:24:14 +0530 Subject: [PATCH 3/3] HIVE-20629 : Hive incremental replication fails with events missing error if database is kept idle for more than an hour : review comment fix --- .../IncrementalLoadTasksBuilder.java | 31 +++++++++---------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java index c62604786776..5f159981183d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java @@ -64,13 +64,13 @@ public class IncrementalLoadTasksBuilder { private final String dbName, tableName; private final IncrementalLoadEventsIterator iterator; - private HashSet inputs; - private HashSet outputs; + private final HashSet inputs; + private final HashSet outputs; private Logger log; private final HiveConf conf; private final ReplLogger replLogger; private static long numIteration; - private Long eventTo; + private final Long eventTo; public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadPath, IncrementalLoadEventsIterator iterator, HiveConf conf, Long eventTo) { @@ -97,19 +97,6 @@ public Task build(DriverContext driverContext, Hive hive this.log.debug("Iteration num " + numIteration); TaskTracker tracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS)); - // if no events are there to replay, then update the last repl id of the database/ table to last event id. - if (!iterator.hasNext()) { - String lastEventid = eventTo.toString(); - if (StringUtils.isEmpty(tableName)) { - taskChainTail = dbUpdateReplStateTask(dbName, lastEventid, taskChainTail); - this.log.debug("no events to replay, set last repl id of db " + dbName + " to " + lastEventid); - } else { - taskChainTail = tableUpdateReplStateTask(dbName, tableName, null, lastEventid, taskChainTail); - this.log.debug("no events to replay, set last repl id of table " + dbName + "." + tableName + " to " + - lastEventid); - } - } - while (iterator.hasNext() && tracker.canAddMoreTasks()) { FileStatus dir = iterator.next(); String location = dir.getPath().toUri().toString(); @@ -166,6 +153,18 @@ public Task build(DriverContext driverContext, Hive hive // add load task to start the next iteration taskChainTail.addDependentTask(TaskFactory.get(loadWork, conf)); } else { + // if no events were replayed, then add a task to update the last repl id of the database/table to last event id. + if (taskChainTail == evTaskRoot) { + String lastEventid = eventTo.toString(); + if (StringUtils.isEmpty(tableName)) { + taskChainTail = dbUpdateReplStateTask(dbName, lastEventid, taskChainTail); + this.log.debug("no events to replay, set last repl id of db " + dbName + " to " + lastEventid); + } else { + taskChainTail = tableUpdateReplStateTask(dbName, tableName, null, lastEventid, taskChainTail); + this.log.debug("no events to replay, set last repl id of table " + dbName + "." + tableName + " to " + + lastEventid); + } + } Map dbProps = new HashMap<>(); dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(lastReplayedEvent)); ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps);