common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (2 additions, 1 deletion)
@@ -629,7 +629,8 @@ public enum ErrorMsg {
REPL_PERMISSION_DENIED(40014, "{0}org.apache.hadoop.security.AccessControlException{1}", true),
REPL_DISTCP_SNAPSHOT_EXCEPTION(40015, "SNAPSHOT_ERROR", true),
RANGER_AUTHORIZATION_FAILED(40016, "Authorization Failure while communicating to Ranger admin", true),
-  RANGER_AUTHENTICATION_FAILED(40017, "Authentication Failure while communicating to Ranger admin", true)
+  RANGER_AUTHENTICATION_FAILED(40017, "Authentication Failure while communicating to Ranger admin", true),
+  REPL_INCOMPATIBLE_EXCEPTION(40018, "Cannot load into database {0} as it is replication incompatible.", true)
;

private int errorCode;
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.repl.ReplConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
@@ -33,12 +34,15 @@
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.Utils;
@@ -142,6 +146,60 @@ public void tearDown() throws Throwable {
primary.run("drop database if exists " + primaryDbName + "_extra cascade");
}

@Test
public void testTargetDbReplIncompatibleWithNoPropSet() throws Throwable {
testTargetDbReplIncompatible(false);
}

@Test
public void testTargetDbReplIncompatibleWithPropSet() throws Throwable {
testTargetDbReplIncompatible(true);
}

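  /**
   * Scenario: aborting a repl-created txn on the replica must mark the target db as
   * replication incompatible (repl.incompatible = 'true'), after which REPL LOAD fails
   * with a non-recoverable error. Aborts on the source side must not trip the flag.
   */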
private void testTargetDbReplIncompatible(boolean setReplIncompProp) throws Throwable {
HiveConf primaryConf = primary.getConf();
TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());

primary.run("use " + primaryDbName)
.run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName);

if (setReplIncompProp) {
replica.run("ALTER DATABASE " + replicatedDbName +
" SET DBPROPERTIES('" + ReplConst.REPL_INCOMPATIBLE + "'='false')");
assert "false".equals(replica.getDatabase(replicatedDbName).getParameters().get(ReplConst.REPL_INCOMPATIBLE));
}

assertFalse(MetaStoreUtils.isDbReplIncompatible(replica.getDatabase(replicatedDbName)));

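    // An abort of a plain source-side txn must not affect the replica's compatibility.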
Long sourceTxnId = openTxns(1, txnHandler, primaryConf).get(0);
txnHandler.abortTxn(new AbortTxnRequest(sourceTxnId));

try {
sourceTxnId = openTxns(1, txnHandler, primaryConf).get(0);

primary.dump(primaryDbName);
replica.load(replicatedDbName, primaryDbName);
assertFalse(MetaStoreUtils.isDbReplIncompatible(replica.getDatabase(replicatedDbName)));

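      // Abort the repl-created txn on the target directly; this is what trips the incompatibility flag.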
Long targetTxnId = txnHandler.getTargetTxnId(HiveUtils.getReplPolicy(replicatedDbName), sourceTxnId);
txnHandler.abortTxn(new AbortTxnRequest(targetTxnId));
assertTrue(MetaStoreUtils.isDbReplIncompatible(replica.getDatabase(replicatedDbName)));

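      // Once flagged, the next load must fail and leave a non-recoverable error marker at the dump location.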
WarehouseInstance.Tuple dumpData = primary.dump(primaryDbName);

assertFalse(ReplUtils.failedWithNonRecoverableError(new Path(dumpData.dumpLocation), conf));
replica.loadFailure(replicatedDbName, primaryDbName, null, ErrorMsg.REPL_INCOMPATIBLE_EXCEPTION.getErrorCode());
assertTrue(ReplUtils.failedWithNonRecoverableError(new Path(dumpData.dumpLocation), conf));

primary.dumpFailure(primaryDbName);
assertTrue(ReplUtils.failedWithNonRecoverableError(new Path(dumpData.dumpLocation), conf));
} finally {
txnHandler.abortTxn(new AbortTxnRequest(sourceTxnId));
}
}

@Test
public void testReplOperationsNotCapturedInNotificationLog() throws Throwable {
//Perform empty bootstrap dump and load
Expand Down Expand Up @@ -1139,8 +1197,9 @@ public void testMultiDBTxn() throws Throwable {
//End of additional steps
try {
replica.loadWithoutExplain("", "`*`");
-    } catch (SemanticException e) {
-      assertEquals("REPL LOAD * is not supported", e.getMessage());
+      fail();
+    } catch (HiveException e) {
+      assertEquals("MetaException(message:Database name cannot be null.)", e.getMessage());
}
}

@@ -38,6 +38,7 @@
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.WarehouseInstance.Tuple;
import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
@@ -692,8 +693,9 @@ public void testBootStrapDumpOfWarehouse() throws Throwable {
.run("alter database default set dbproperties ('hive.repl.ckpt.key'='', 'repl.last.id'='')");
try {
replica.load("", "`*`");
-    } catch (SemanticException e) {
-      assertEquals("REPL LOAD * is not supported", e.getMessage());
+      Assert.fail();
+    } catch (HiveException e) {
+      assertEquals("MetaException(message:Database name cannot be null.)", e.getMessage());
}
}

@@ -124,6 +124,10 @@ public int execute() {
try {
long loadTaskStartTime = System.currentTimeMillis();
SecurityUtils.reloginExpiringKeytabUser();
//Don't proceed if target db is replication incompatible.
if (MetaStoreUtils.isDbReplIncompatible(getHive().getDatabase(work.getTargetDatabase()))) {
throw new SemanticException(ErrorMsg.REPL_INCOMPATIBLE_EXCEPTION, work.dbNameToLoadIn);
}
Task<?> rootTask = work.getRootTask();
if (rootTask != null) {
rootTask.setChildTasks(null);
@@ -60,6 +60,7 @@
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.logging.log4j.Level;
@@ -78,6 +79,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -241,12 +243,23 @@ public void testAbortTxn() throws Exception {

@Test
public void testAbortTxns() throws Exception {
createDatabaseForReplTests("default", MetaStoreUtils.getDefaultCatalog(conf));
OpenTxnsResponse openedTxns = txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost"));
List<Long> txnList = openedTxns.getTxn_ids();
txnHandler.abortTxns(new AbortTxnsRequest(txnList));

OpenTxnRequest replRqst = new OpenTxnRequest(2, "me", "localhost");
replRqst.setReplPolicy("default.*");
replRqst.setTxn_type(TxnType.REPL_CREATED);
replRqst.setReplSrcTxnIds(Arrays.asList(1L, 2L));
List<Long> targetTxns = txnHandler.openTxns(replRqst).getTxn_ids();

assertTrue(targetTxnsPresentInReplTxnMap(1L, 2L, targetTxns));
txnHandler.abortTxns(new AbortTxnsRequest(targetTxns));
assertFalse(targetTxnsPresentInReplTxnMap(1L, 2L, targetTxns));

GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
-    assertEquals(3, txnsInfo.getOpen_txns().size());
+    assertEquals(5, txnsInfo.getOpen_txns().size());
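    // 3 user txns plus the 2 repl-created txns above are aborted, hence 5.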
txnsInfo.getOpen_txns().forEach(txn ->
assertEquals(TxnState.ABORTED, txn.getState())
);
@@ -1240,6 +1253,7 @@ public void testRecoverManyTimeouts() throws Exception {

@Test
public void testReplTimeouts() throws Exception {
createDatabaseForReplTests("default", MetaStoreUtils.getDefaultCatalog(conf));
long timeout = txnHandler.setTimeout(1);
try {
OpenTxnRequest request = new OpenTxnRequest(3, "me", "localhost");
@@ -1691,8 +1705,28 @@ private void checkReplTxnForTest(Long startTxnId, Long endTxnId, String replPolicy
}
}

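  // Verifies that REPL_TXN_MAP maps the source txn range [startTxnId, endTxnId] to exactly these target txns.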
private boolean targetTxnsPresentInReplTxnMap(Long startTxnId, Long endTxnId, List<Long> targetTxnId) throws Exception {
    String[] output = TestTxnDbUtil.queryToString(conf, "SELECT \"RTM_TARGET_TXN_ID\" FROM \"REPL_TXN_MAP\" WHERE " +
        " \"RTM_SRC_TXN_ID\" >= " + startTxnId + " AND \"RTM_SRC_TXN_ID\" <= " + endTxnId).split("\n");
List<Long> replayedTxns = new ArrayList<>();
for (int idx = 1; idx < output.length; idx++) {
replayedTxns.add(Long.parseLong(output[idx].trim()));
}
return replayedTxns.equals(targetTxnId);
}

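  // The repl abort path now resolves the target database (to flag it repl incompatible), so these
  // TxnHandler-level tests must first make sure a row for the db exists in DBS.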
private void createDatabaseForReplTests(String dbName, String catalog) throws Exception {
String query = "select \"DB_ID\" from \"DBS\" where \"NAME\" = '" + dbName + "' and \"CTLG_NAME\" = '" + catalog + "'";
String[] output = TestTxnDbUtil.queryToString(conf, query).split("\n");
if (output.length == 1) {
query = "INSERT INTO \"DBS\"(\"DB_ID\", \"NAME\", \"CTLG_NAME\", \"DB_LOCATION_URI\") VALUES (1, '" + dbName + "','" + catalog + "','dummy')";
TestTxnDbUtil.executeUpdate(conf, query);
}
}

@Test
public void testReplOpenTxn() throws Exception {
createDatabaseForReplTests("default", MetaStoreUtils.getDefaultCatalog(conf));
int numTxn = 50000;
String[] output = TestTxnDbUtil.queryToString(conf, "SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\"").split("\n");
long startTxnId = Long.parseLong(output[1].trim());
@@ -516,10 +516,15 @@ public void testHeartbeaterReplicationTxn() throws Exception {
} catch (LockException e) {
exception = e;
}
-    Assert.assertNotNull("Txn should have been aborted", exception);
-    Assert.assertEquals(ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
+    Assert.assertNull("This CommitTxnRequest is no op since transaction is already aborted by reaper.", exception);
+    try {
+      txnMgr.replRollbackTxn(replPolicy, 1L);
+    } catch (LockException e) {
+      exception = e;
+    }
+    Assert.assertNull("This AbortTxnRequest is no op since transaction is already aborted by reaper.", exception);
Assert.assertEquals(1, Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_TIMED_OUT_TXNS).getCount());
};
}

@Before
public void setUp() throws Exception {
@@ -149,6 +149,7 @@ public void testReplLoadFailure() throws Exception {
IncrementalLoadMetricCollector metricCollector =
new IncrementalLoadMetricCollector(null, TEST_PATH, 0, conf);
ReplLoadWork replLoadWork = Mockito.mock(ReplLoadWork.class);
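    // execute() now fetches the target database for the repl-incompatibility check, so the mock
    // must return a db name.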
Mockito.when(replLoadWork.getTargetDatabase()).thenReturn("dummy");
Mockito.when(replLoadWork.getDumpDirectory()).thenReturn(
new Path(dumpDir + Path.SEPARATOR + "test").toString());
Mockito.when(replLoadWork.getMetricCollector()).thenReturn(metricCollector);
@@ -172,6 +173,7 @@ public void testReplLoadRecoverableMissingStage() throws Exception {
BootstrapLoadMetricCollector metricCollector =
new BootstrapLoadMetricCollector(null, TEST_PATH, 0, conf);
ReplLoadWork replLoadWork = Mockito.mock(ReplLoadWork.class);
Mockito.when(replLoadWork.getTargetDatabase()).thenReturn("dummy");
Mockito.when(replLoadWork.getDumpDirectory()).thenReturn(
new Path(dumpDir + Path.SEPARATOR + "test").toString());
Mockito.when(replLoadWork.getMetricCollector()).thenReturn(metricCollector);
@@ -192,6 +194,7 @@ public void testReplLoadNonRecoverableMissingStage() throws Exception {
IncrementalLoadMetricCollector metricCollector =
new IncrementalLoadMetricCollector(null, TEST_PATH, 0, conf);
ReplLoadWork replLoadWork = Mockito.mock(ReplLoadWork.class);
Mockito.when(replLoadWork.getTargetDatabase()).thenReturn("dummy");
Mockito.when(replLoadWork.getDumpDirectory()).thenReturn(
new Path(dumpDir + Path.SEPARATOR + "test").toString());
Mockito.when(replLoadWork.getMetricCollector()).thenReturn(metricCollector);
@@ -51,4 +51,6 @@ public class ReplConst {
public static final String REPL_FAILOVER_ENABLED = "repl.failover.enabled";

public static final String TARGET_OF_REPLICATION = "repl.target.for";

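  /** Database property set on a replica that can no longer be safely replicated into,
   *  e.g. because a repl-created txn was aborted; REPL LOAD refuses a db carrying this flag. */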
public static final String REPL_INCOMPATIBLE = "repl.incompatible";
}
@@ -233,6 +233,19 @@ public static boolean isExternalTable(Table table) {
return isExternal(params);
}

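  // A repl policy is of the form <dbName>.<tableSpec> (e.g. "default.*"); the db name is the
  // token before the first '.'.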
public static String getDbNameFromReplPolicy(String replPolicy) {
assert replPolicy != null;
return replPolicy.split(Pattern.quote("."))[0];
}

public static boolean isDbReplIncompatible(Database db) {
if (db == null) {
return false;
}
Map<String, String> dbParameters = db.getParameters();
return dbParameters != null && ReplConst.TRUE.equalsIgnoreCase(dbParameters.get(ReplConst.REPL_INCOMPATIBLE));
}

public static boolean isDbBeingFailedOver(Database db) {
assert (db != null);
Map<String, String> dbParameters = db.getParameters();