HIVE-19970 : Replication dump has an NPE when the table is empty
maheshk114 committed Jun 24, 2018
1 parent 4b7f88a commit a1ad4c1
Showing 12 changed files with 71 additions and 15 deletions.
@@ -91,6 +91,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
import org.junit.Assert;

public class TestReplicationScenarios {

@@ -3185,6 +3186,27 @@ public void testLoadCmPathMissing() throws IOException {
fs.create(path, false);
}

@Test
public void testDumpFileMissing() throws IOException {
String dbName = createDB(testName.getMethodName(), driver);
run("CREATE TABLE " + dbName + ".normal(a int)", driver);
run("INSERT INTO " + dbName + ".normal values (1)", driver);

Path path = new Path(System.getProperty("test.warehouse.dir",""));
path = new Path(path, dbName.toLowerCase()+".db");
path = new Path(path, "normal");
FileSystem fs = path.getFileSystem(hconf);
fs.delete(path);

advanceDumpDir();
CommandProcessorResponse ret = driver.run("REPL DUMP " + dbName);
Assert.assertEquals(ret.getResponseCode(), ErrorMsg.FILE_NOT_FOUND.getErrorCode());

run("DROP TABLE " + dbName + ".normal", driver);
run("drop database " + dbName, true, driver);
}


@Test
public void testDumpNonReplDatabase() throws IOException {
String dbName = createDBNonRepl(testName.getMethodName(), driver);
@@ -66,6 +66,9 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.junit.Assert;

public class TestReplicationScenariosAcrossInstances {
@Rule
@@ -1097,6 +1100,8 @@ public Boolean apply(@Nullable CallerArguments args) {

// Retry with different dump should fail.
replica.loadFailure(replicatedDbName, tuple2.dumpLocation);
CommandProcessorResponse ret = replica.runCommand("REPL LOAD " + replicatedDbName + " FROM '" + tuple2.dumpLocation + "'");
Assert.assertEquals(ret.getResponseCode(), ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());

// Verify if create table is not called on table t1 but called for t2 and t3.
// Also, allow constraint creation only on t1 and t3. Foreign key creation on t2 fails.
@@ -199,6 +199,10 @@ public WarehouseInstance run(String command) throws Throwable {
return this;
}

public CommandProcessorResponse runCommand(String command) throws Throwable {
return driver.run(command);
}

WarehouseInstance runFailure(String command) throws Throwable {
CommandProcessorResponse ret = driver.run(command);
if (ret.getException() == null) {
3 changes: 2 additions & 1 deletion ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -502,7 +502,8 @@ public enum ErrorMsg {
//if the error message is changed for REPL_EVENTS_MISSING_IN_METASTORE, then need modification in getNextNotification
//method in HiveMetaStoreClient
REPL_EVENTS_MISSING_IN_METASTORE(20016, "Notification events are missing in the meta store."),
REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID(20017, "Target database is bootstrapped from some other path."),
REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID(20017, "Load path {0} not valid as target database is bootstrapped " +
"from some other path : {1}."),
REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH(20018, "File is missing from both source and cm path."),
REPL_LOAD_PATH_NOT_FOUND(20019, "Load path does not exist."),
REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION(20020,
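The REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID entry is now a parameterized template, so callers (see the replCkptStatus change below) can report both the dump path being loaded and the path the target database was originally bootstrapped from. A minimal sketch of the substitution, assuming the {0}/{1} placeholders behave like java.text.MessageFormat patterns; the paths below are made up for illustration:

import java.text.MessageFormat;

public class ReplLoadErrorMessageSketch {
    // Template copied from the new REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID entry.
    private static final String TEMPLATE =
        "Load path {0} not valid as target database is bootstrapped from some other path : {1}.";

    public static void main(String[] args) {
        String attemptedDump = "hdfs://nn:8020/repl/dump_2";   // hypothetical dump locations
        String bootstrapDump = "hdfs://nn:8020/repl/dump_1";
        // ErrorMsg.format(...) is assumed to perform this kind of positional substitution.
        System.out.println(MessageFormat.format(TEMPLATE, attemptedDump, bootstrapDump));
    }
}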
@@ -121,6 +121,10 @@ protected int execute(DriverContext driverContext) {
lastReplId = incrementalDump(dumpRoot, dmd, cmRoot);
}
prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema);
} catch (RuntimeException e) {
LOG.error("failed", e);
setException(e);
return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
} catch (Exception e) {
LOG.error("failed", e);
setException(e);
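The new RuntimeException branch in the dump task's execute() lets a failure raised on a dump worker thread be translated back into a specific return code instead of a generic failure: the exception message is matched against the known ErrorMsg templates via ErrorMsg.getErrorMsg(String). A self-contained stand-in for that lookup; the codes and message texts here are illustrative, not Hive's real values:

// Stand-in for the lookup ErrorMsg.getErrorMsg(String) is assumed to perform:
// recover the canonical error code from the message carried by a RuntimeException.
public class ErrorCodeLookupSketch {

    enum Msg {
        FILE_NOT_FOUND(20001, "File not found"),            // illustrative code/text only
        GENERIC_ERROR(40000, "Exception while processing");

        final int code;
        final String text;

        Msg(int code, String text) {
            this.code = code;
            this.text = text;
        }

        static Msg fromMessage(String message) {
            for (Msg m : values()) {
                if (message != null && message.startsWith(m.text)) {
                    return m;
                }
            }
            return GENERIC_ERROR;
        }
    }

    public static void main(String[] args) {
        // A worker thread wraps the formatted error text in a RuntimeException ...
        RuntimeException fromWorker =
            new RuntimeException("File not found: hdfs://nn:8020/warehouse/db/normal/000000_0");
        // ... and the task maps it back to a return code for the client.
        System.out.println(Msg.fromMessage(fromWorker.getMessage()).code);   // prints 20001
    }
}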
@@ -19,6 +19,7 @@

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -114,9 +115,8 @@ public static boolean replCkptStatus(String dbName, Map<String, String> props, S
if (props.get(REPL_CHECKPOINT_KEY).equals(dumpRoot)) {
return true;
}
throw new InvalidOperationException("REPL LOAD with Dump: " + dumpRoot
+ " is not allowed as the target DB: " + dbName
+ " is already bootstrap loaded by another Dump " + props.get(REPL_CHECKPOINT_KEY));
throw new InvalidOperationException(ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.format(dumpRoot,
props.get(REPL_CHECKPOINT_KEY)));
}
return false;
}
@@ -78,7 +78,7 @@ public TaskTracker tasks() throws SemanticException {
}
return tracker;
} catch (Exception e) {
throw new SemanticException(e);
throw new SemanticException(e.getMessage(), e);
}
}

@@ -20,6 +20,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
@@ -29,13 +30,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Future;

import static org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.Paths;

@@ -70,7 +73,7 @@ class PartitionExport {
this.callersSession = SessionState.get();
}

void write(final ReplicationSpec forReplicationSpec) throws InterruptedException {
void write(final ReplicationSpec forReplicationSpec) throws InterruptedException, HiveException {
ExecutorService producer = Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder().setNameFormat("partition-submitter-thread-%d").build());
producer.submit(() -> {
@@ -89,6 +92,7 @@ void write(final ReplicationSpec forReplicationSpec) throws InterruptedException
ThreadFactory namingThreadFactory =
new ThreadFactoryBuilder().setNameFormat("partition-dump-thread-%d").build();
ExecutorService consumer = Executors.newFixedThreadPool(nThreads, namingThreadFactory);
List<Future<?>> futures = new LinkedList<>();

while (!producer.isTerminated() || !queue.isEmpty()) {
/*
@@ -102,7 +106,7 @@ void write(final ReplicationSpec forReplicationSpec) throws InterruptedException
continue;
}
LOG.debug("scheduling partition dump {}", partition.getName());
consumer.submit(() -> {
futures.add(consumer.submit(() -> {
String partitionName = partition.getName();
String threadName = Thread.currentThread().getName();
LOG.debug("Thread: {}, start partition dump {}", threadName, partitionName);
@@ -115,11 +119,19 @@ void write(final ReplicationSpec forReplicationSpec) throws InterruptedException
.export(forReplicationSpec);
LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
} catch (Exception e) {
throw new RuntimeException("Error while export of data files", e);
throw new RuntimeException(e.getMessage(), e);
}
});
}));
}
consumer.shutdown();
for (Future<?> future : futures) {
try {
future.get();
} catch (Exception e) {
LOG.error("failed", e.getCause());
throw new HiveException(e.getCause().getMessage(), e.getCause());
}
}
// may be drive this via configuration as well.
consumer.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
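This is the core of the fix: previously the partition-dump tasks were submitted to the consumer pool and any exception they threw was swallowed by the executor, so a failed export only surfaced later as an NPE. Keeping the returned Futures and calling get() on each surfaces the first worker failure on the coordinating thread, where PartitionExport.write() rethrows it as a HiveException. A runnable sketch of that pattern in isolation; the task body, pool size, and messages are placeholders:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FuturePropagationSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService consumer = Executors.newFixedThreadPool(2);
        List<Future<?>> futures = new ArrayList<>();

        for (int i = 0; i < 4; i++) {
            final int partition = i;
            // Keep the Future; without it, an exception thrown in the task is simply lost.
            futures.add(consumer.submit(() -> {
                if (partition == 2) {
                    throw new RuntimeException("export failed for partition " + partition);
                }
            }));
        }
        consumer.shutdown();

        for (Future<?> future : futures) {
            try {
                future.get();   // rethrows the worker's failure wrapped in an ExecutionException
            } catch (Exception e) {
                // PartitionExport.write() rethrows e.getCause() as a HiveException at this point.
                System.err.println("worker failed: " + e.getCause().getMessage());
            }
        }
        consumer.awaitTermination(1, TimeUnit.MINUTES);
    }
}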
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.io;

import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
@@ -46,6 +47,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hive.ql.ErrorMsg.FILE_NOT_FOUND;

//TODO: this object is created once to call one method and then immediately destroyed.
//So it's basically just a roundabout way to pass arguments to a static method. Simplify?
public class FileOperations {
@@ -156,6 +159,10 @@ private void exportFilesAsList() throws SemanticException, IOException, LoginExc
}
done = true;
} catch (IOException e) {
if (e instanceof FileNotFoundException) {
logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
}
repeat++;
logger.info("writeFilesList failed", e);
if (repeat >= FileUtils.MAX_IO_ERROR_RETRY) {
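exportFilesAsList() already retries transient I/O errors up to FileUtils.MAX_IO_ERROR_RETRY; the new branch treats FileNotFoundException as non-recoverable and fails fast, rewrapping the message with the FILE_NOT_FOUND text that REPL DUMP ultimately reports for the missing data directory. A simplified sketch of that retry-with-fail-fast shape; the interface, retry limit, and message text are illustrative:

import java.io.FileNotFoundException;
import java.io.IOException;

public class RetryWithFailFastSketch {

    interface IoAction { void run() throws IOException; }   // stand-in for the file-list write step

    private static final int MAX_IO_ERROR_RETRY = 5;        // stand-in for FileUtils.MAX_IO_ERROR_RETRY

    static void runWithRetry(IoAction writeFilesList) throws IOException {
        int repeat = 0;
        while (true) {
            try {
                writeFilesList.run();        // the real code streams the data-file list here
                return;
            } catch (FileNotFoundException e) {
                // A missing source file will not reappear on retry: fail fast and keep the
                // message so the caller can map it to the FILE_NOT_FOUND error code.
                throw new FileNotFoundException("File not found: " + e.getMessage());
            } catch (IOException e) {
                // Other I/O errors are treated as transient and retried a bounded number of times.
                if (++repeat >= MAX_IO_ERROR_RETRY) {
                    throw e;
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            runWithRetry(() -> {
                throw new FileNotFoundException("hdfs://nn:8020/warehouse/db/normal/000000_0");
            });
        } catch (IOException e) {
            System.err.println(e.getMessage());
        }
    }
}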
@@ -125,13 +125,10 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam

Table oldt = null;

List<TransactionalMetaStoreEventListener> transactionalListeners = null;
List<MetaStoreEventListener> listeners = null;
List<TransactionalMetaStoreEventListener> transactionalListeners = handler.getTransactionalListeners();
List<MetaStoreEventListener> listeners = handler.getListeners();
Map<String, String> txnAlterTableEventResponses = Collections.emptyMap();

transactionalListeners = handler.getTransactionalListeners();
listeners = handler.getListeners();

try {
boolean rename = false;
List<Partition> parts;
@@ -4986,6 +4986,10 @@ private void alter_table_core(final String catName, final String dbname, final S
try {
Table oldt = get_table_core(catName, dbname, name);
firePreEvent(new PreAlterTableEvent(oldt, newTable, this));
if (newTable.getDbName() == null) {
// This check is done here to support backward compatibility of exception thrown.
throw new InvalidOperationException("Unable to alter table" + dbname + "." + name + " as new db name is null");
}
alterHandler.alterTable(getMS(), wh, catName, dbname, name, newTable,
envContext, this);
success = true;
@@ -907,7 +907,7 @@ public void testAlterTableCascade() throws Exception {
}
}

@Test(expected = MetaException.class)
@Test(expected = InvalidOperationException.class)
public void testAlterTableNullDatabaseInNew() throws Exception {
Table originalTable = testTables[0];
Table newTable = originalTable.deepCopy();