HIVE-25872: Skip tracking of alterDatabase events for replication specific properties (#2950). (Haymant Mangla, reviewed by Ayush Saxena and Rajesh Balamohan)
hmangla98 committed May 17, 2022
1 parent 74e3f29 commit 8077bb118cb418cae24a46f7155c481bf893da99
Showing 37 changed files with 216 additions and 139 deletions.
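
For context on what the diff below implements: the metastore stops logging an ALTER_DATABASE notification when an alter only touches replication-internal database properties. A minimal sketch of that kind of guard, assuming a hypothetical helper class and property list (the real key names live in ReplConst; this is not the committed implementation):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class ReplPropertyGuard {
  // Assumed replication-internal keys; the committed code reads these from ReplConst.
  private static final List<String> REPL_INTERNAL_KEYS = Arrays.asList(
      "repl.source.for", "repl.target.for", "hive.repl.ckpt.key",
      "hive.repl.first.inc.pending", "repl.last.id");

  /**
   * True when the old and new database parameters differ only in
   * replication-internal keys, in which case the alter-database
   * notification can be skipped.
   */
  public static boolean onlyReplPropsChanged(Map<String, String> oldParams,
                                             Map<String, String> newParams) {
    return Objects.equals(stripReplKeys(oldParams), stripReplKeys(newParams));
  }

  private static Map<String, String> stripReplKeys(Map<String, String> params) {
    Map<String, String> copy = params == null ? new HashMap<>() : new HashMap<>(params);
    REPL_INTERNAL_KEYS.forEach(copy::remove);
    return copy;
  }

  public static void main(String[] args) {
    Map<String, String> before = new HashMap<>();
    Map<String, String> after = new HashMap<>(before);
    after.put("hive.repl.ckpt.key", "/warehouse/dump/1");
    // Only a replication-internal key changed, so no event would be logged.
    System.out.println(onlyReplPropsChanged(before, after)); // true
  }
}
```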
@@ -39,7 +39,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager.RecycleType;

-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;

import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -36,7 +36,7 @@
import java.util.HashMap;
import java.util.Map;

-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;

public class BaseReplicationAcrossInstances {
@Rule
@@ -59,7 +59,7 @@
import java.util.Collections;
import java.util.Map;

-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;

/**
* TestReplicationScenariosAcidTablesBase - base class for replication for ACID tables tests
@@ -43,7 +43,7 @@

import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.HadoopShims.MiniMrShim;
-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;

public class TestCopyUtils {
@Rule
@@ -44,7 +44,7 @@
import java.util.Map;
import java.util.Set;

-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;

/**
* TestMetaStoreEventListenerInRepl - Test metastore events created by replication.
@@ -122,9 +122,6 @@ private Map<String, Set<String>> prepareBootstrapData(String primaryDbName) thro
// Add expected events with associated tables, if any.
Map<String, Set<String>> eventsMap = new HashMap<>();
eventsMap.put(CreateDatabaseEvent.class.getName(), null);
-// Replication causes many implicit alter database operations, so metastore will see some
-// alter table events as well.
-eventsMap.put(AlterDatabaseEvent.class.getName(), null);
eventsMap.put(CreateTableEvent.class.getName(), new HashSet<>(Arrays.asList("t1", "t2", "t4")));
eventsMap.put(AlterTableEvent.class.getName(), new HashSet<>(Arrays.asList("t1", "t2", "t4")));
return eventsMap;
@@ -142,9 +139,6 @@ Map<String, Set<String>> prepareIncData(String dbName) throws Throwable {

// Add expected events with associated tables, if any.
Map<String, Set<String>> eventsMap = new HashMap<>();
-// Replication causes many implicit alter database operations, so metastore will see some
-// alter table events as well.
-eventsMap.put(AlterDatabaseEvent.class.getName(), null);
eventsMap.put(CreateTableEvent.class.getName(), new HashSet<>(Arrays.asList("t6")));
eventsMap.put(AlterTableEvent.class.getName(), new HashSet<>(Arrays.asList("t1", "t2", "t6")));
eventsMap.put(DropTableEvent.class.getName(), new HashSet<>(Arrays.asList("t2")));
@@ -160,9 +154,6 @@ Map<String, Set<String>> prepareInc2Data(String dbName) throws Throwable {
.run("drop table t1");
// Add expected events with associated tables, if any.
Map<String, Set<String>> eventsMap = new HashMap<>();
-// Replication causes many implicit alter database operations, so metastore will see some
-// alter table events as well.
-eventsMap.put(AlterDatabaseEvent.class.getName(), null);
eventsMap.put(CreateTableEvent.class.getName(), new HashSet<>(Arrays.asList("t7")));
eventsMap.put(AlterTableEvent.class.getName(), new HashSet<>(Arrays.asList("t4", "t7")));
eventsMap.put(DropTableEvent.class.getName(), new HashSet<>(Arrays.asList("t1")));
@@ -41,7 +41,7 @@
import java.util.HashMap;
import java.util.Map;

-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;

/**
* TestReplicationOfHiveStreaming - test replication for streaming ingest on ACID tables.
@@ -43,7 +43,7 @@
import java.util.List;

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_ENABLED;
-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;

public class TestReplicationOnHDFSEncryptedZones {
private static String jksFile = System.getProperty("java.io.tmpdir") + "/test.jks";
@@ -550,9 +550,8 @@ public void testTargetEventIdGenerationAfterFirstIncremental() throws Throwable
.getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), -1,
new DatabaseAndTableFilter(replicatedDbName, null));

-// There should be 4 events, one for alter db, second to remove first incremental pending and then two custom
-// alter operations.
-assertEquals(4, nl.getEvents().size());
+// There should be 2 events: the two custom alter operations.
+assertEquals(2, nl.getEvents().size());
}

@Test
@@ -660,7 +659,7 @@ public void testTargetEventIdGeneration() throws Throwable {
.getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), 10,
new DatabaseAndTableFilter(replicatedDbName, null));

-assertEquals(1, nl.getEvents().size());
+assertEquals(0, nl.getEventsSize());
}

@Test
@@ -134,7 +134,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL;
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL;
@@ -77,7 +77,7 @@
import java.util.Map;


-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
import static org.junit.Assert.assertEquals;
@@ -156,6 +156,50 @@ public void tearDown() throws Throwable {
primary.run("drop database if exists " + primaryDbName + "_extra cascade");
}

+@Test
+public void testReplAlterDbEventsNotCapturedInNotificationLog() throws Throwable {
+  String srcDbName = "srcDb";
+  String replicaDb = "tgtDb";
+  try {
+    //Perform empty bootstrap dump and load.
+    primary.run("CREATE DATABASE " + srcDbName);
+    long lastEventId = primary.getCurrentNotificationEventId().getEventId();
+    //Assert that repl.source.for is not captured in the notification log.
+    WarehouseInstance.Tuple dumpData = primary.dump(srcDbName);
+    long latestEventId = primary.getCurrentNotificationEventId().getEventId();
+    assertEquals(lastEventId, latestEventId);
+
+    replica.run("REPL LOAD " + srcDbName + " INTO " + replicaDb);
+    latestEventId = replica.getCurrentNotificationEventId().getEventId();
+    //Assert that repl.target.id, hive.repl.ckpt.key and hive.repl.first.inc.pending are not captured in the notification log.
+    assertEquals(latestEventId, lastEventId + 1); //This load will generate only 1 event, i.e. CREATE_DATABASE.
+
+    WarehouseInstance.Tuple incDump = primary.run("use " + srcDbName)
+      .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+        "tblproperties (\"transactional\"=\"true\")")
+      .run("insert into t1 values(1)")
+      .dump(srcDbName);
+
+    //Assert that repl.last.id is not captured in the notification log.
+    long noOfEventsInDumpDir = primary.getNoOfEventsDumped(incDump.dumpLocation, conf);
+    lastEventId = primary.getCurrentNotificationEventId().getEventId();
+    replica.run("REPL LOAD " + srcDbName + " INTO " + replicaDb);
+
+    latestEventId = replica.getCurrentNotificationEventId().getEventId();
+
+    //Validate that no additional events are generated in the notification log table apart from the replayed ones.
+    assertEquals(latestEventId, lastEventId + noOfEventsInDumpDir);
+
+    long targetDbReplId = Long.parseLong(replica.getDatabase(replicaDb)
+      .getParameters().get(ReplConst.REPL_TARGET_TABLE_PROPERTY));
+    //Validate that the repl.last.id db property has been updated successfully.
+    assertEquals(targetDbReplId, lastEventId);
+  } finally {
+    primary.run("DROP DATABASE IF EXISTS " + srcDbName + " CASCADE");
+    replica.run("DROP DATABASE IF EXISTS " + replicaDb + " CASCADE");
+  }
+}
+
@Test
public void testTargetDbReplIncompatibleWithNoPropSet() throws Throwable {
testTargetDbReplIncompatible(false);
@@ -930,8 +974,7 @@ public void testReplOperationsNotCapturedInNotificationLog() throws Throwable {
lastEventId = replica.getCurrentNotificationEventId().getEventId();
replica.run("REPL LOAD " + primaryDbName + " INTO " + replicatedDbName);
currentEventId = replica.getCurrentNotificationEventId().getEventId();
-//This iteration of repl load will have only one event i.e ALTER_DATABASE to update repl.last.id for the target db.
-assert currentEventId == lastEventId + 1;
+assert currentEventId == lastEventId;

primary.run("ALTER DATABASE " + primaryDbName +
" SET DBPROPERTIES('" + ReplConst.TARGET_OF_REPLICATION + "'='')");
@@ -1729,7 +1772,7 @@ public void testOpenTxnEvent() throws Throwable {
primary.dump(primaryDbName);

long lastReplId = Long.parseLong(bootStrapDump.lastReplicationId);
-primary.testEventCounts(primaryDbName, lastReplId, null, null, 12);
+primary.testEventCounts(primaryDbName, lastReplId, null, null, 10);

// Test load
replica.load(replicatedDbName, primaryDbName)
@@ -2278,14 +2321,18 @@ public void testCheckpointingOnFirstEventDump() throws Throwable {
fs.delete(ackLastEventID, false);
fs.delete(dumpMetaData, false);
//delete all the event folder except first one.
-long firstIncEventID = Long.parseLong(bootstrapDump.lastReplicationId) + 1;
+long firstIncEventID = -1;
long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId);
assertTrue(lastIncEventID > (firstIncEventID + 1));

-for (long eventId=firstIncEventID + 1; eventId<=lastIncEventID; eventId++) {
+for (long eventId=Long.parseLong(bootstrapDump.lastReplicationId) + 1; eventId<=lastIncEventID; eventId++) {
Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId));
if (fs.exists(eventRoot)) {
-fs.delete(eventRoot, true);
+if (firstIncEventID == -1){
+  firstIncEventID = eventId;
+} else {
+  fs.delete(eventRoot, true);
+}
}
}

@@ -74,7 +74,7 @@

import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_DONT_SET;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -967,13 +967,13 @@ public void testShouldDumpMetaDataForNonNativeTableIfSetMeataDataOnly() throws T
}

private void verifyIfCkptSet(Map<String, String> props, String dumpDir) {
-assertTrue(props.containsKey(ReplUtils.REPL_CHECKPOINT_KEY));
+assertTrue(props.containsKey(ReplConst.REPL_TARGET_DB_PROPERTY));
String hiveDumpDir = dumpDir + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
-assertTrue(props.get(ReplUtils.REPL_CHECKPOINT_KEY).equals(hiveDumpDir));
+assertTrue(props.get(ReplConst.REPL_TARGET_DB_PROPERTY).equals(hiveDumpDir));
}

private void verifyIfCkptPropMissing(Map<String, String> props) {
-assertFalse(props.containsKey(ReplUtils.REPL_CHECKPOINT_KEY));
+assertFalse(props.containsKey(ReplConst.REPL_TARGET_DB_PROPERTY));
}

private void verifyIfSrcOfReplPropMissing(Map<String, String> props) {
@@ -46,7 +46,7 @@
import java.util.Map;
import java.util.Set;

-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;

/**
* Test replication scenarios with staging on replica.
@@ -83,7 +83,7 @@

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK_PATHS;
-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -728,20 +728,24 @@ public void externalTableIncrementalCheckpointing() throws Throwable {
fs.delete(ackFile, false);
fs.delete(ackLastEventID, false);
//delete all the event folders except first event
-long startEvent = Long.valueOf(tuple.lastReplicationId) + 1;
+long startEvent = -1;
+long endEvent = Long.valueOf(incrementalDump1.lastReplicationId);
+for (long eventDir = Long.valueOf(tuple.lastReplicationId) + 1; eventDir <= endEvent; eventDir++) {
+  Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventDir));
+  if (fs.exists(eventRoot)) {
+    if (startEvent == -1){
+      startEvent = eventDir;
+    } else {
+      fs.delete(eventRoot, true);
+    }
+  }
+}
Path startEventRoot = new Path(hiveDumpDir, String.valueOf(startEvent));
Map<Path, Long> firstEventModTimeMap = new HashMap<>();
for (FileStatus fileStatus: fs.listStatus(startEventRoot)) {
firstEventModTimeMap.put(fileStatus.getPath(), fileStatus.getModificationTime());
}
-long endEvent = Long.valueOf(incrementalDump1.lastReplicationId);
assertTrue(endEvent - startEvent > 1);
-for (long eventDir = startEvent + 1; eventDir <= endEvent; eventDir++) {
-  Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventDir));
-  if (fs.exists(eventRoot)) {
-    fs.delete(eventRoot, true);
-  }
-}
Utils.writeOutput(String.valueOf(startEvent), ackLastEventID, primary.hiveConf);
WarehouseInstance.Tuple incrementalDump2 = primary.dump(primaryDbName, withClause);
assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation);
@@ -48,7 +48,7 @@
import java.util.Set;
import java.util.stream.Collectors;

-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME;
import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.REPL_HIVE_BASE_DIR;
import static org.junit.Assert.assertFalse;
@@ -27,7 +27,7 @@
import org.apache.hadoop.hive.ql.metadata.HiveMetaStoreClientWithLocalCache;
import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
import org.apache.hadoop.hive.shims.Utils;
-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
import org.junit.rules.TestName;

import org.slf4j.Logger;
@@ -57,7 +57,7 @@
import java.util.List;
import java.util.ArrayList;

-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

@@ -58,7 +58,7 @@
import java.util.Map;
import javax.annotation.Nullable;

-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;

/**
* Tests for statistics replication.
@@ -432,8 +432,8 @@ private void printOutput() throws IOException {
}

private void verifyIfCkptSet(Map<String, String> props, String dumpDir) {
-assertTrue(props.containsKey(ReplUtils.REPL_CHECKPOINT_KEY));
-assertTrue(props.get(ReplUtils.REPL_CHECKPOINT_KEY).equals(dumpDir));
+assertTrue(props.containsKey(ReplConst.REPL_TARGET_DB_PROPERTY));
+assertTrue(props.get(ReplConst.REPL_TARGET_DB_PROPERTY).equals(dumpDir));
}

public void verifyIfCkptSet(String dbName, String dumpDir) throws Exception {
@@ -18,7 +18,7 @@

package org.apache.hadoop.hive.ql.security.authorization.plugin;

-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -29,7 +29,7 @@
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.Table;
-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -87,7 +87,7 @@
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;

public class TestJdbcWithMiniHS2 {
private static MiniHS2 miniHS2 = null;
@@ -18,8 +18,8 @@

package org.apache.hadoop.hive.ql.ddl.misc.flags;

+import org.apache.hadoop.hive.common.repl.ReplConst;
import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
-import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;

import java.util.Map;

@@ -43,9 +43,9 @@ public int execute() throws Exception {
for (String dbName : Utils.matchesDb(context.getDb(), dbNameOrPattern)) {
Database database = context.getDb().getMSC().getDatabase(dbName);
Map<String, String> parameters = database.getParameters();
-String incPendPara = parameters != null ? parameters.get(ReplUtils.REPL_FIRST_INC_PENDING_FLAG) : null;
+String incPendPara = parameters != null ? parameters.get(ReplConst.REPL_FIRST_INC_PENDING_FLAG) : null;
if (incPendPara != null) {
-parameters.remove(ReplUtils.REPL_FIRST_INC_PENDING_FLAG);
+parameters.remove(ReplConst.REPL_FIRST_INC_PENDING_FLAG);
context.getDb().getMSC().alterDatabase(dbName, database);
}
}
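
For reference when reading the hunks above: the constants migrated from ReplUtils and ReplChangeManager into ReplConst. A sketch of the shape this diff assumes — the constant names appear in the diff itself, but the string values shown here are assumptions, not copied from the real class:

```java
package org.apache.hadoop.hive.common.repl;

// Hypothetical sketch of ReplConst; the actual class may define more
// constants and different values.
public class ReplConst {
  public static final String SOURCE_OF_REPLICATION = "repl.source.for";
  public static final String TARGET_OF_REPLICATION = "repl.target.for";
  // Checkpoint key written on the target db during load (assumed value).
  public static final String REPL_TARGET_DB_PROPERTY = "hive.repl.ckpt.key";
  public static final String REPL_FIRST_INC_PENDING_FLAG = "hive.repl.first.inc.pending";
  // Last replicated event id tracked on the target (assumed value).
  public static final String REPL_TARGET_TABLE_PROPERTY = "repl.last.id";
}
```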
