HIVE-21403: Incorrect error code returned when retry bootstrap with different dump. #559

Closed · 2 commits
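In summary (as the test and code changes below show): a bootstrap REPL LOAD that is retried from a different dump location must fail with the canonical REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID error code. Before this change, the bootstrap load classes wrapped every exception in a fresh SemanticException, so the driver reported a generic error code instead. A minimal sketch of the expected behavior, using the test helper names that appear in this diff:

  // Sketch: a retry of bootstrap REPL LOAD from a different dump directory must
  // surface the canonical error code, not a generic failure code.
  CommandProcessorResponse ret = replica.runCommand(
      "REPL LOAD " + replicatedDbName + " FROM '" + tuple2.dumpLocation + "'");
  assert ret.getException() != null;
  assert ret.getResponseCode() == ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode();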
@@ -1237,9 +1237,8 @@ public Boolean apply(CallerArguments args) {
assertEquals(0, replica.getForeignKeyList(replicatedDbName, "t2").size());

// Retry with different dump should fail.
-    CommandProcessorResponse ret = replica.runCommand("REPL LOAD " + replicatedDbName +
-        " FROM '" + tuple2.dumpLocation + "'");
-    Assert.assertEquals(ret.getResponseCode(), ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());
+    replica.loadFailure(replicatedDbName, tuple2.dumpLocation, null,
+        ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());

// Verify if create table is not called on table t1 but called for t2 and t3.
// Also, allow constraint creation only on t1 and t3. Foreign key creation on t2 fails.

@@ -24,6 +24,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -548,6 +549,35 @@ private List<String> externalTableBasePathWithClause() throws IOException, SemanticException {
);
}

+  @Test
+  public void retryIncBootstrapExternalTablesFromDifferentDumpWithoutCleanTablesConfig() throws Throwable {
+    List<String> dumpWithClause = Collections.singletonList(
+        "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"
+    );
+    List<String> loadWithClause = externalTableBasePathWithClause();
+
+    WarehouseInstance.Tuple tupleBootstrapWithoutExternal = primary
+        .dump(primaryDbName, null, dumpWithClause);
+
+    replica.load(replicatedDbName, tupleBootstrapWithoutExternal.dumpLocation, loadWithClause);

+    dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
+        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
+    WarehouseInstance.Tuple tupleIncWithExternalBootstrap = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("create table t2 as select * from t1")
+        .dump(primaryDbName, tupleBootstrapWithoutExternal.lastReplicationId, dumpWithClause);
+    WarehouseInstance.Tuple tupleNewIncWithExternalBootstrap
+        = primary.dump(primaryDbName, tupleBootstrapWithoutExternal.lastReplicationId, dumpWithClause);
+
+    replica.load(replicatedDbName, tupleIncWithExternalBootstrap.dumpLocation, loadWithClause);
+
+    // Re-bootstrapping from a different bootstrap dump without the clean-tables config should fail.
+    replica.loadFailure(replicatedDbName, tupleNewIncWithExternalBootstrap.dumpLocation, loadWithClause,
+        ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());
+  }

private void assertExternalFileInfo(List<String> expected, Path externalTableInfoFile)
throws IOException {
DistributedFileSystem fileSystem = primary.miniDFSCluster.getFileSystem();
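Assuming the usual HiveConf definitions (REPL_INCLUDE_EXTERNAL_TABLES resolves to hive.repl.include.external.tables and REPL_BOOTSTRAP_EXTERNAL_TABLES to hive.repl.bootstrap.external.tables), the incremental dump the test takes twice expands to roughly:

  // Hypothetical expansion of the dump command the test issues twice; both runs
  // start from the same lastReplicationId but produce distinct dump directories.
  String dumpCmd = "REPL DUMP " + primaryDbName
      + " FROM " + tupleBootstrapWithoutExternal.lastReplicationId
      + " WITH ('hive.repl.include.external.tables'='true', "
      + "'hive.repl.bootstrap.external.tables'='true')";
  // Loading the first dump and then retrying with the second must fail with
  // ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.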

WarehouseInstance.java
@@ -243,6 +243,18 @@ WarehouseInstance runFailure(String command) throws Throwable {
return this;
}

+  WarehouseInstance runFailure(String command, int errorCode) throws Throwable {
+    CommandProcessorResponse ret = driver.run(command);
+    if (ret.getException() == null) {
+      throw new RuntimeException("Command execution passed for an invalid command: " + command);
+    }
+    if (ret.getResponseCode() != errorCode) {
+      throw new RuntimeException("Command: " + command + " returned incorrect error code: "
+          + ret.getResponseCode() + " instead of " + errorCode);
+    }
+    return this;
+  }

Tuple dump(String dbName, String lastReplicationId, List<String> withClauseOptions)
throws Throwable {
String dumpCommand =
@@ -288,7 +300,7 @@ WarehouseInstance loadWithoutExplain(String replicatedDbName, String dumpLocation)
WarehouseInstance load(String replicatedDbName, String dumpLocation, List<String> withClauseOptions)
throws Throwable {
String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";
-    if (!withClauseOptions.isEmpty()) {
+    if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
}
run("EXPLAIN " + replLoadCmd);
@@ -303,26 +315,35 @@ WarehouseInstance status(String replicatedDbName) throws Throwable {

WarehouseInstance status(String replicatedDbName, List<String> withClauseOptions) throws Throwable {
String replStatusCmd = "REPL STATUS " + replicatedDbName;
-    if (!withClauseOptions.isEmpty()) {
+    if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
replStatusCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
}
return run(replStatusCmd);
}

WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation) throws Throwable {
runFailure("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
loadFailure(replicatedDbName, dumpLocation, null);
return this;
}

WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation, List<String> withClauseOptions)
throws Throwable {
String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";
-    if (!withClauseOptions.isEmpty()) {
+    if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
}
return runFailure(replLoadCmd);
}

+  WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation, List<String> withClauseOptions,
+      int errorCode) throws Throwable {
+    String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";
+    if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
+      replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
+    }
+    return runFailure(replLoadCmd, errorCode);
+  }

WarehouseInstance verifyResult(String data) throws IOException {
verifyResults(data == null ? new String[] {} : new String[] { data });
return this;
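Usage, as in the updated tests: the four-argument loadFailure asserts both that the REPL LOAD fails and that the expected canonical code is returned; passing null for the WITH clause is safe now that the helpers null-check withClauseOptions.

  replica.loadFailure(replicatedDbName, tuple2.dumpLocation, null,
      ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());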

LoadDatabase.java
@@ -60,30 +60,26 @@ public LoadDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn,
isTableLevelLoad = tblNameToLoadIn != null && !tblNameToLoadIn.isEmpty();
}

-  public TaskTracker tasks() throws SemanticException {
-    try {
-      Database dbInMetadata = readDbMetadata();
-      String dbName = dbInMetadata.getName();
-      Task<? extends Serializable> dbRootTask = null;
-      ReplLoadOpType loadDbType = getLoadDbType(dbName);
-      switch (loadDbType) {
-        case LOAD_NEW:
-          dbRootTask = createDbTask(dbInMetadata);
-          break;
-        case LOAD_REPLACE:
-          dbRootTask = alterDbTask(dbInMetadata);
-          break;
-        default:
-          break;
-      }
-      if (dbRootTask != null) {
-        dbRootTask.addDependentTask(setOwnerInfoTask(dbInMetadata));
-        tracker.addTask(dbRootTask);
-      }
-      return tracker;
-    } catch (Exception e) {
-      throw new SemanticException(e.getMessage(), e);
-    }
+  public TaskTracker tasks() throws Exception {
+    Database dbInMetadata = readDbMetadata();
+    String dbName = dbInMetadata.getName();
+    Task<? extends Serializable> dbRootTask = null;
+    ReplLoadOpType loadDbType = getLoadDbType(dbName);
+    switch (loadDbType) {
+      case LOAD_NEW:
+        dbRootTask = createDbTask(dbInMetadata);
+        break;
+      case LOAD_REPLACE:
+        dbRootTask = alterDbTask(dbInMetadata);
+        break;
+      default:
+        break;
+    }
+    if (dbRootTask != null) {
+      dbRootTask.addDependentTask(setOwnerInfoTask(dbInMetadata));
+      tracker.addTask(dbRootTask);
+    }
+    return tracker;
}

Database readDbMetadata() throws SemanticException {
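Dropping the try/catch here (and in LoadPartitions and LoadTable below) is the heart of the fix: the driver derives the response code by matching the failure message against canonical ErrorMsg patterns, and re-wrapping an in-flight SemanticException replaces that message with the wrapper's. A sketch of the effect, assuming ErrorMsg.getErrorMsg(String) is the message-based lookup the driver performs:

  // The original exception's message still matches its canonical ErrorMsg.
  SemanticException original = new SemanticException(
      ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getMsg());
  int canonical = ErrorMsg.getErrorMsg(original.getMessage()).getErrorCode();

  // new SemanticException(e) sets the message to e.toString(), which prefixes the
  // class name, so the lookup no longer matches and falls back to the generic code.
  SemanticException rewrapped = new SemanticException(original);
  int generic = ErrorMsg.getErrorMsg(rewrapped.getMessage()).getErrorCode();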

LoadPartitions.java
@@ -101,49 +101,45 @@ public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext,
this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
}

-  public TaskTracker tasks() throws SemanticException {
-    try {
-      /*
-      We are doing this both in load table and load partitions
-       */
-      Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
-      LoadTable.TableLocationTuple tableLocationTuple =
-          LoadTable.tableLocation(tableDesc, parentDb, tableContext, context);
-      tableDesc.setLocation(tableLocationTuple.location);
-
-      if (table == null) {
-        //new table
-        table = tableDesc.toTable(context.hiveConf);
-        if (isPartitioned(tableDesc)) {
-          updateReplicationState(initialReplicationState());
-          if (!forNewTable().hasReplicationState()) {
-            // Add ReplStateLogTask only if no pending table load tasks left for next cycle
-            Task<? extends Serializable> replLogTask
-                    = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
-            tracker.addDependentTask(replLogTask);
-          }
-          return tracker;
-        }
-      } else {
-        // existing
-        if (table.isPartitioned()) {
-          List<AddPartitionDesc> partitionDescs = event.partitionDescriptions(tableDesc);
-          if (!event.replicationSpec().isMetadataOnly() && !partitionDescs.isEmpty()) {
-            updateReplicationState(initialReplicationState());
-            if (!forExistingTable(lastReplicatedPartition).hasReplicationState()) {
-              // Add ReplStateLogTask only if no pending table load tasks left for next cycle
-              Task<? extends Serializable> replLogTask
-                      = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
-              tracker.addDependentTask(replLogTask);
-            }
-            return tracker;
-          }
-        }
-      }
-      return tracker;
-    } catch (Exception e) {
-      throw new SemanticException(e);
-    }
+  public TaskTracker tasks() throws Exception {
+    /*
+    We are doing this both in load table and load partitions
+     */
+    Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
+    LoadTable.TableLocationTuple tableLocationTuple =
+        LoadTable.tableLocation(tableDesc, parentDb, tableContext, context);
+    tableDesc.setLocation(tableLocationTuple.location);
+
+    if (table == null) {
+      //new table
+      table = tableDesc.toTable(context.hiveConf);
+      if (isPartitioned(tableDesc)) {
+        updateReplicationState(initialReplicationState());
+        if (!forNewTable().hasReplicationState()) {
+          // Add ReplStateLogTask only if no pending table load tasks left for next cycle
+          Task<? extends Serializable> replLogTask
+                  = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
+          tracker.addDependentTask(replLogTask);
+        }
+        return tracker;
+      }
+    } else {
+      // existing
+      if (table.isPartitioned()) {
+        List<AddPartitionDesc> partitionDescs = event.partitionDescriptions(tableDesc);
+        if (!event.replicationSpec().isMetadataOnly() && !partitionDescs.isEmpty()) {
+          updateReplicationState(initialReplicationState());
+          if (!forExistingTable(lastReplicatedPartition).hasReplicationState()) {
+            // Add ReplStateLogTask only if no pending table load tasks left for next cycle
+            Task<? extends Serializable> replLogTask
+                    = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
+            tracker.addDependentTask(replLogTask);
+          }
+          return tracker;
+        }
+      }
+    }
+    return tracker;
}

private void updateReplicationState(ReplicationState replicationState) {

LoadTable.java
@@ -84,83 +84,79 @@ public LoadTable(TableEvent event, Context context, ReplLogger replLogger,
this.tracker = new TaskTracker(limiter);
}

-  public TaskTracker tasks() throws SemanticException {
+  public TaskTracker tasks() throws Exception {
// Path being passed to us is a table dump location. We go ahead and load it in as needed.
// If tblName is null, then we default to the table name specified in _metadata, which is good.
// or are both specified, in which case, that's what we are intended to create the new table as.
-    try {
-      if (event.shouldNotReplicate()) {
-        return tracker;
-      }
-      String dbName = tableContext.dbNameToLoadIn; //this can never be null or empty;
-      // Create table associated with the import
-      // Executed if relevant, and used to contain all the other details about the table if not.
-      ImportTableDesc tableDesc = tableContext.overrideProperties(event.tableDesc(dbName));
-      Table table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
-
-      // Normally, on import, trying to create a table or a partition in a db that does not yet exist
-      // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying
-      // to create tasks to create a table inside a db that as-of-now does not exist, but there is
-      // a precursor Task waiting that will create it before this is encountered. Thus, we instantiate
-      // defaults and do not error out in that case.
-      // the above will change now since we are going to split replication load in multiple execution
-      // tasks and hence we could have created the database earlier in which case the waitOnPrecursor will
-      // be false and hence if db Not found we should error out.
-      Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
-      if (parentDb == null) {
-        if (!tableContext.waitOnPrecursor()) {
-          throw new SemanticException(
-              ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tableDesc.getDatabaseName()));
-        }
-      }
-
-      Task<?> tblRootTask = null;
-      ReplLoadOpType loadTblType = getLoadTableType(table);
-      switch (loadTblType) {
-        case LOAD_NEW:
-          break;
-        case LOAD_REPLACE:
-          tblRootTask = dropTableTask(table);
-          break;
-        case LOAD_SKIP:
-          return tracker;
-        default:
-          break;
-      }
-
-      TableLocationTuple
-          tableLocationTuple = tableLocation(tableDesc, parentDb, tableContext, context);
-      tableDesc.setLocation(tableLocationTuple.location);
-
-      /* Note: In the following section, Metadata-only import handling logic is
-         interleaved with regular repl-import logic. The rule of thumb being
-         followed here is that MD-only imports are essentially ALTERs. They do
-         not load data, and should not be "creating" any metadata - they should
-         be replacing instead. The only place it makes sense for a MD-only import
-         to create is in the case of a table that's been dropped and recreated,
-         or in the case of an unpartitioned table. In all other cases, it should
-         behave like a noop or a pure MD alter.
-      */
-      newTableTasks(tableDesc, tblRootTask, tableLocationTuple);
-
-      // Set Checkpoint task as dependant to create table task. So, if same dump is retried for
-      // bootstrap, we skip current table update.
-      Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
-          tableDesc,
-          null,
-          context.dumpDirectory,
-          context.hiveConf
-      );
-      if (!isPartitioned(tableDesc)) {
-        Task<? extends Serializable> replLogTask
-            = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
-        ckptTask.addDependentTask(replLogTask);
-      }
-      tracker.addDependentTask(ckptTask);
-      return tracker;
-    } catch (Exception e) {
-      throw new SemanticException(e);
-    }
+    if (event.shouldNotReplicate()) {
+      return tracker;
+    }
+    String dbName = tableContext.dbNameToLoadIn; //this can never be null or empty;
+    // Create table associated with the import
+    // Executed if relevant, and used to contain all the other details about the table if not.
+    ImportTableDesc tableDesc = tableContext.overrideProperties(event.tableDesc(dbName));
+    Table table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
+
+    // Normally, on import, trying to create a table or a partition in a db that does not yet exist
+    // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying
+    // to create tasks to create a table inside a db that as-of-now does not exist, but there is
+    // a precursor Task waiting that will create it before this is encountered. Thus, we instantiate
+    // defaults and do not error out in that case.
+    // the above will change now since we are going to split replication load in multiple execution
+    // tasks and hence we could have created the database earlier in which case the waitOnPrecursor will
+    // be false and hence if db Not found we should error out.
+    Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
+    if (parentDb == null) {
+      if (!tableContext.waitOnPrecursor()) {
+        throw new SemanticException(
+            ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tableDesc.getDatabaseName()));
+      }
+    }
+
+    Task<?> tblRootTask = null;
+    ReplLoadOpType loadTblType = getLoadTableType(table);
+    switch (loadTblType) {
+      case LOAD_NEW:
+        break;
+      case LOAD_REPLACE:
+        tblRootTask = dropTableTask(table);
+        break;
+      case LOAD_SKIP:
+        return tracker;
+      default:
+        break;
+    }
+
+    TableLocationTuple
+        tableLocationTuple = tableLocation(tableDesc, parentDb, tableContext, context);
+    tableDesc.setLocation(tableLocationTuple.location);
+
+    /* Note: In the following section, Metadata-only import handling logic is
+       interleaved with regular repl-import logic. The rule of thumb being
+       followed here is that MD-only imports are essentially ALTERs. They do
+       not load data, and should not be "creating" any metadata - they should
+       be replacing instead. The only place it makes sense for a MD-only import
+       to create is in the case of a table that's been dropped and recreated,
+       or in the case of an unpartitioned table. In all other cases, it should
+       behave like a noop or a pure MD alter.
+    */
+    newTableTasks(tableDesc, tblRootTask, tableLocationTuple);
+
+    // Set Checkpoint task as dependant to create table task. So, if same dump is retried for
+    // bootstrap, we skip current table update.
+    Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
+        tableDesc,
+        null,
+        context.dumpDirectory,
+        context.hiveConf
+    );
+    if (!isPartitioned(tableDesc)) {
+      Task<? extends Serializable> replLogTask
+          = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
+      ckptTask.addDependentTask(replLogTask);
+    }
+    tracker.addDependentTask(ckptTask);
+    return tracker;
}

private ReplLoadOpType getLoadTableType(Table table) throws InvalidOperationException, HiveException {
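With tasks() now declared as throws Exception in all three bootstrap load classes, the translation to SemanticException can happen once, at the repl load entry point, instead of in every class. A hypothetical caller-side pattern (the entry point itself is not part of this diff):

  try {
    taskTracker = loadTable.tasks();
  } catch (SemanticException e) {
    throw e;  // already carries the canonical message and error code
  } catch (Exception e) {
    throw new SemanticException(e);  // wrap only truly unexpected failures
  }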