Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.ErrorMsg;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -853,7 +854,8 @@ public NotificationEventResponse apply(@Nullable NotificationEventResponse event
InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(eventIdSkipper);

advanceDumpDir();
verifyFail("REPL DUMP " + dbName + " FROM " + replDumpId, driver);
CommandProcessorResponse ret = driver.run("REPL DUMP " + dbName + " FROM " + replDumpId);
assertTrue(ret.getResponseCode() == ErrorMsg.REPL_EVENTS_MISSING_IN_METASTORE.getErrorCode());
eventIdSkipper.assertInjectionsPerformed(true,false);
InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
}
Expand Down Expand Up @@ -3157,6 +3159,31 @@ public void testRecycleFileDropTempTable() throws IOException {
assertTrue(fileCount == fileCountAfter);
}

@Test
public void testLoadCmPathMissing() throws IOException {
  // Verifies that REPL LOAD fails with REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH
  // when the data file is gone from both the source table (dropped) and the
  // change-management (CM) directory (deleted below).
  String dbName = createDB(testName.getMethodName(), driver);
  run("CREATE TABLE " + dbName + ".normal(a int)", driver);
  run("INSERT INTO " + dbName + ".normal values (1)", driver);

  advanceDumpDir();
  run("repl dump " + dbName, true, driver);
  String dumpLocation = getResult(0, 0, driver);

  // Dropping the table moves its files into the CM dir; the dump now depends on CM.
  run("DROP TABLE " + dbName + ".normal", driver);

  String cmDir = hconf.getVar(HiveConf.ConfVars.REPLCMDIR);
  Path path = new Path(cmDir);
  FileSystem fs = path.getFileSystem(hconf);
  ContentSummary cs = fs.getContentSummary(path);
  long fileCount = cs.getFileCount();
  assertTrue(fileCount != 0);
  // Remove the CM dir recursively so the file is missing from both source and CM.
  // Explicit recursive flag: the single-argument delete(Path) is deprecated.
  fs.delete(path, true);

  CommandProcessorResponse ret = driverMirror.run("REPL LOAD " + dbName + " FROM '" + dumpLocation + "'");
  assertTrue(ret.getResponseCode() == ErrorMsg.REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH.getErrorCode());
  run("drop database " + dbName, true, driver);
}

@Test
public void testDumpNonReplDatabase() throws IOException {
String dbName = createDBNonRepl(testName.getMethodName(), driver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.hadoop.hive.ql.ErrorMsg;

import static org.apache.hadoop.hive.conf.SystemVariables.SET_COLUMN_NAME;
import static org.apache.hadoop.hive.ql.exec.ExplainTask.EXPL_COLUMN_NAME;
Expand Down Expand Up @@ -2927,6 +2928,27 @@ public void testCreateTableExecAsync() throws Exception {
stmt.close();
}

@Test
public void testReplErrorScenarios() throws Exception {
  // Verifies that REPL DUMP / REPL LOAD surface the dedicated ErrorMsg codes
  // to JDBC clients via SQLException.getErrorCode().
  HiveStatement stmt = (HiveStatement) con.createStatement();

  // Defect fixed: the original test passed silently if execute() did NOT
  // throw. Track whether the expected exception was actually raised.
  boolean dumpFailed = false;
  try {
    // source of replication (repl.source.for) not set on the database
    stmt.execute("repl dump default");
  } catch (SQLException e) {
    dumpFailed = true;
    assertTrue(e.getErrorCode() == ErrorMsg.REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION.getErrorCode());
  }
  assertTrue(dumpFailed);

  boolean loadFailed = false;
  try {
    // invalid (non-existent) load path
    stmt.execute("repl load default1 from '/tmp/junk'");
  } catch (SQLException e) {
    loadFailed = true;
    assertTrue(e.getErrorCode() == ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getErrorCode());
  }
  assertTrue(loadFailed);

  stmt.close();
}

/**
* Test {@link HiveStatement#executeAsync(String)} for an insert overwrite into a table
* @throws Exception
Expand Down
11 changes: 11 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,15 @@ public enum ErrorMsg {
" queue: {1}. Please fix and try again.", true),
SPARK_RUNTIME_OOM(20015, "Spark job failed because of out of memory."),

//if the error message is changed for REPL_EVENTS_MISSING_IN_METASTORE, then need modification in getNextNotification
//method in HiveMetaStoreClient
REPL_EVENTS_MISSING_IN_METASTORE(20016, "Notification events are missing in the meta store."),
REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID(20017, "Target database is bootstrapped from some other path."),
REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH(20018, "File is missing from both source and cm path."),
REPL_LOAD_PATH_NOT_FOUND(20019, "Load path does not exist."),
REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION(20020,
"Source of replication (repl.source.for) is not set in the database properties."),

// An exception from runtime that will show the full stack to client
UNRESOLVED_RT_EXCEPTION(29999, "Runtime Error: {0}", "58004", true),

Expand Down Expand Up @@ -588,6 +597,8 @@ public enum ErrorMsg {
SPARK_GET_JOB_INFO_INTERRUPTED(30045, "Spark job was interrupted while getting job info"),
SPARK_GET_JOB_INFO_EXECUTIONERROR(30046, "Spark job failed in execution while getting job info due to exception {0}"),

REPL_FILE_SYSTEM_OPERATION_RETRY(30047, "Replication file system operation retry expired."),

//========================== 40000 range starts here ========================//

SPARK_JOB_RUNTIME_ERROR(40001, "Spark job failed due to: {0}", true),
Expand Down
3 changes: 2 additions & 1 deletion ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.plan.CopyWork;
Expand Down Expand Up @@ -165,7 +166,7 @@ protected int execute(DriverContext driverContext) {
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
setException(e);
return (1);
return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a few return statements inside the try block that do not throw an exception. Do we need to return an error code from those paths as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently only a few scenarios are covered; the remaining conditions will be handled in the follow-up JIRA.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
Expand Down Expand Up @@ -123,7 +124,7 @@ protected int execute(DriverContext driverContext) {
} catch (Exception e) {
LOG.error("failed", e);
setException(e);
return 1;
return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
}
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
Expand Down Expand Up @@ -221,7 +222,7 @@ a database ( directory )
} catch (Exception e) {
LOG.error("failed replication", e);
setException(e);
return 1;
return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
}
LOG.info("completed load task run : {}", work.executedLoadTask());
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ public boolean hasNext() {
return true;
} catch (Exception e) {
// may be do some retry logic here.
throw new RuntimeException("could not traverse the file via remote iterator " + dbLevelPath,
e);
LOG.error("could not traverse the file via remote iterator " + dbLevelPath, e);
throw new RuntimeException(e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
Expand Down Expand Up @@ -110,7 +109,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
try {
initReplDump(ast);
} catch (HiveException e) {
throw new SemanticException("repl dump failed " + e.getMessage());
throw new SemanticException(e.getMessage(), e);
}
analyzeReplDump(ast);
break;
Expand Down Expand Up @@ -141,8 +140,9 @@ private void initReplDump(ASTNode ast) throws HiveException {
Database database = db.getDatabase(dbName);
if (database != null) {
if (!ReplChangeManager.isSourceOfReplication(database)) {
throw new SemanticException("Cannot dump database " + dbNameOrPattern +
" as it is not a source of replication");
LOG.error("Cannot dump database " + dbNameOrPattern +
" as it is not a source of replication (repl.source.for)");
throw new SemanticException(ErrorMsg.REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION.getMsg());
}
} else {
throw new SemanticException("Cannot dump database " + dbNameOrPattern + " as it does not exist");
Expand Down Expand Up @@ -358,7 +358,8 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {

if (!fs.exists(loadPath)) {
// supposed dump path does not exist.
throw new FileNotFoundException(loadPath.toUri().toString());
LOG.error("File not found " + loadPath.toUri().toString());
throw new FileNotFoundException(ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getMsg());
}

// Now, the dumped path can be one of three things:
Expand Down Expand Up @@ -504,7 +505,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {

} catch (Exception e) {
// TODO : simple wrap & rethrow for now, clean up with error codes
throw new SemanticException(e);
throw new SemanticException(e.getMessage(), e);
}
}

Expand Down
16 changes: 9 additions & 7 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
Expand Down Expand Up @@ -68,7 +70,7 @@ public CopyUtils(String distCpDoAsUser, HiveConf hiveConf) {
// changed/removed during copy, so double check the checksum after copy,
// if not match, copy again from cm
public void copyAndVerify(FileSystem destinationFs, Path destRoot,
List<ReplChangeManager.FileInfo> srcFiles) throws IOException, LoginException {
List<ReplChangeManager.FileInfo> srcFiles) throws IOException, LoginException, HiveFatalException {
Map<FileSystem, Map< Path, List<ReplChangeManager.FileInfo>>> map = fsToFileMap(srcFiles, destRoot);
for (Map.Entry<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) {
FileSystem sourceFs = entry.getKey();
Expand All @@ -92,7 +94,7 @@ public void copyAndVerify(FileSystem destinationFs, Path destRoot,

private void doCopyRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> srcFileList,
FileSystem destinationFs, Path destination,
boolean useRegularCopy) throws IOException, LoginException {
boolean useRegularCopy) throws IOException, LoginException, HiveFatalException {
int repeat = 0;
boolean isCopyError = false;
List<Path> pathList = Lists.transform(srcFileList, ReplChangeManager.FileInfo::getEffectivePath);
Expand Down Expand Up @@ -145,7 +147,7 @@ private void doCopyRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> s
// If still files remains to be copied due to failure/checksum mismatch after several attempts, then throw error
if (!pathList.isEmpty()) {
LOG.error("File copy failed even after several attempts. Files list: " + pathList);
throw new IOException("File copy failed even after several attempts.");
throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
}
}

Expand All @@ -154,7 +156,7 @@ private void doCopyRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> s
// itself is missing, then throw error.
private List<Path> getFilesToRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> srcFileList,
FileSystem destinationFs, Path destination, boolean isCopyError)
throws IOException {
throws IOException, HiveFatalException {
List<Path> pathList = new ArrayList<Path>();

// Going through file list and make the retry list
Expand Down Expand Up @@ -190,9 +192,9 @@ private List<Path> getFilesToRetry(FileSystem sourceFs, List<ReplChangeManager.F
srcPath = srcFile.getEffectivePath();
if (null == srcPath) {
// This case possible if CM path is not enabled.
LOG.error("File copy failed and likely source file is deleted or modified. "
LOG.error("File copy failed and likely source file is deleted or modified."
+ "Source File: " + srcFile.getSourcePath());
throw new IOException("File copy failed and likely source file is deleted or modified.");
throw new HiveFatalException(ErrorMsg.REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH.getMsg());
}

if (!srcFile.isUseSourcePath() && !sourceFs.exists(srcFile.getCmPath())) {
Expand All @@ -201,7 +203,7 @@ private List<Path> getFilesToRetry(FileSystem sourceFs, List<ReplChangeManager.F
+ "Missing Source File: " + srcFile.getSourcePath() + ", CM File: " + srcFile.getCmPath() + ". "
+ "Try setting higher value for hive.repl.cm.retain in source warehouse. "
+ "Also, bootstrap the system again to get back the consistent replicated state.");
throw new IOException("Both source and CM path are missing from source.");
throw new HiveFatalException(ErrorMsg.REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH.getMsg());
}

pathList.add(srcPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private void writeData(PartitionIterable partitions) throws SemanticException {
.export(replicationSpec);
}
} catch (Exception e) {
throw new SemanticException(e);
throw new SemanticException(e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,18 @@
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import javax.security.auth.login.LoginException;

import org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
Expand Down Expand Up @@ -161,7 +160,7 @@ private void exportFilesAsList() throws SemanticException, IOException, LoginExc
logger.info("writeFilesList failed", e);
if (repeat >= FileUtils.MAX_IO_ERROR_RETRY) {
logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
throw e;
throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
}

int sleepTime = FileUtils.getSleepTime(repeat - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
private long retryDelaySeconds = 0;
private final ClientCapabilities version;

//copied from ErrorMsg.java
private static final String REPL_EVENTS_MISSING_IN_METASTORE = "Notification events are missing in the meta store.";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this msg same as from ErrorMsg?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as error code is hashed with error message


static final protected Logger LOG = LoggerFactory.getLogger(HiveMetaStoreClient.class);

public HiveMetaStoreClient(Configuration conf) throws MetaException {
Expand Down Expand Up @@ -2717,7 +2720,7 @@ public NotificationEventResponse getNextNotification(long lastEventId, int maxEv
+ "Try setting higher value for hive.metastore.event.db.listener.timetolive. "
+ "Also, bootstrap the system again to get back the consistent replicated state.",
nextEventId, e.getEventId());
throw new IllegalStateException("Notification events are missing.");
throw new IllegalStateException(REPL_EVENTS_MISSING_IN_METASTORE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even this is non-recoverable error. So, shouldn't this be FatalException?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need not be ..

}
if ((filter != null) && filter.accept(e)) {
filtered.addToEvents(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public List<NotificationEvent> getNextNotificationEvents(
try {
return msc.getNextNotification(pos,getBatchSize(), filter).getEvents();
} catch (TException e) {
throw new IOException(e);
throw new IOException(e.getMessage(), e);
}
}
}
Expand Down Expand Up @@ -179,7 +179,7 @@ public boolean hasNext() {
// but throwing the exception is the appropriate result here, and hasNext()
// signature will only allow RuntimeExceptions. Iterator.hasNext() really
// should have allowed IOExceptions
throw new RuntimeException(e);
throw new RuntimeException(e.getMessage(), e);
}
// New batch has been fetched. If it's not empty, we have more elements to process.
return !batch.isEmpty();
Expand Down