Skip to content

Commit

Permalink
Addressed review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
sankarh committed Jun 18, 2019
1 parent a4ff345 commit aba61fe
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 65 deletions.
Expand Up @@ -478,8 +478,8 @@ public void testBasicReplaceReplPolicy() throws Throwable {

@Test
public void testReplacePolicyOnBootstrapAcidTablesIncrementalPhase() throws Throwable {
String[] originalNonAcidTables = new String[] {"a1", "b2" };
String[] originalFullAcidTables = new String[] {"a2", "b1" };
String[] originalNonAcidTables = new String[] {"a1", "b1", "c1" };
String[] originalFullAcidTables = new String[] {"a2", "b2" };
String[] originalMMAcidTables = new String[] {"a3", "a4" };
createTables(originalNonAcidTables, CreateTableType.NON_ACID);
createTables(originalFullAcidTables, CreateTableType.FULL_ACID);
Expand All @@ -488,28 +488,54 @@ public void testReplacePolicyOnBootstrapAcidTablesIncrementalPhase() throws Thro
// Replicate and verify if only non-acid tables are replicated to target.
List<String> dumpWithoutAcidClause = Collections.singletonList(
"'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='false'");
String replPolicy = primaryDbName + ".['a[0-9]+', 'b[0-9]+'].['a4']";
String[] bootstrapReplicatedTables = new String[] {"a1", "b2" };
String replPolicy = primaryDbName + ".['a[0-9]+', 'b[0-9]+'].['b1']";
String[] bootstrapReplicatedTables = new String[] {"a1" };
String lastReplId = replicateAndVerify(replPolicy, null,
dumpWithoutAcidClause, null, bootstrapReplicatedTables);

// Enable acid tables for replication. Also, replace, replication policy to exclude "a3"
// instead of "a4" and also "b2".
// Enable acid tables for replication. Also, replace, replication policy to exclude "b1" and "a3"
// instead of "a1" alone.
String oldReplPolicy = replPolicy;
replPolicy = primaryDbName + ".['a[0-9]+', 'b[0-9]+'].['a3', 'b2']";
replPolicy = primaryDbName + ".['[a-z]+[0-9]+'].['a3', 'b1']";
List<String> dumpWithAcidBootstrapClause = Arrays.asList(
"'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'",
"'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES + "'='true'");
String[] incrementalReplicatedTables = new String[] {"a1", "a2", "a4", "b1" };
String[] bootstrappedTables = new String[] {"a2", "a4", "b1" };
String[] incrementalReplicatedTables = new String[] {"a1", "a2", "a4", "b2", "c1" };
String[] bootstrappedTables = new String[] {"a2", "a4", "b2", "c1" };
replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
dumpWithAcidBootstrapClause, null, bootstrappedTables, incrementalReplicatedTables);
}

@Test
public void testReplacePolicyWhenAcidTablesDisabledForRepl() throws Throwable {
String[] originalNonAcidTables = new String[] {"a1", "b1", "c1" };
String[] originalFullAcidTables = new String[] {"a2" };
createTables(originalNonAcidTables, CreateTableType.NON_ACID);
createTables(originalFullAcidTables, CreateTableType.FULL_ACID);

// Replicate and verify if only non-acid tables are replicated to target.
List<String> dumpWithoutAcidClause = Collections.singletonList(
"'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='false'");
String replPolicy = primaryDbName + ".['a[0-9]+', 'b[0-9]+'].['b1']";
String[] bootstrapReplicatedTables = new String[] {"a1" };
String lastReplId = replicateAndVerify(replPolicy, null,
dumpWithoutAcidClause, null, bootstrapReplicatedTables);

// Continue to disable ACID tables for replication. Also, replace, replication policy to include
// "a2" but exclude "a1" and "b1". Still ACID tables shouldn't be bootstrapped. Only non-ACID
// table "b1" should be bootstrapped.
String oldReplPolicy = replPolicy;
replPolicy = primaryDbName + ".['a[0-9]+', 'b[0-9]+'].['a2']";
String[] incrementalReplicatedTables = new String[] {"a1", "b1" };
String[] bootstrappedTables = new String[] {"b1" };
lastReplId = replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
dumpWithoutAcidClause, null, bootstrappedTables, incrementalReplicatedTables);
}

@Test
public void testReplacePolicyOnBootstrapExternalTablesIncrementalPhase() throws Throwable {
String[] originalAcidTables = new String[] {"a1", "b2" };
String[] originalExternalTables = new String[] {"a2", "b1", "c3" };
String[] originalAcidTables = new String[] {"a1", "b1" };
String[] originalExternalTables = new String[] {"a2", "b2", "c2" };
createTables(originalAcidTables, CreateTableType.FULL_ACID);
createTables(originalExternalTables, CreateTableType.EXTERNAL);

Expand All @@ -518,17 +544,26 @@ public void testReplacePolicyOnBootstrapExternalTablesIncrementalPhase() throws
List<String> dumpWithClause = Collections.singletonList(
"'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"
);
String replPolicy = primaryDbName + ".['a[0-9]+', 'b2'].['a1']";
String[] bootstrapReplicatedTables = new String[] {"b2" };
String replPolicy = primaryDbName + ".['a[0-9]+', 'b1'].['a1']";
String[] bootstrapReplicatedTables = new String[] {"b1" };
String lastReplId = replicateAndVerify(replPolicy, null,
dumpWithClause, loadWithClause, bootstrapReplicatedTables);

// Continue to disable external tables for replication. Also, replace, replication policy to exclude
// "b1" and include "a1".
String oldReplPolicy = replPolicy;
replPolicy = primaryDbName + ".['a[0-9]+', 'b[0-9]+'].['a2', 'b1']";
String[] incrementalReplicatedTables = new String[] {"a1" };
String[] bootstrappedTables = new String[] {"a1" };
lastReplId = replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
dumpWithClause, loadWithClause, bootstrappedTables, incrementalReplicatedTables);

// Enable external tables replication and bootstrap in incremental phase. Also, replace,
// replication policy to exclude tables with prefix "b".
String oldReplPolicy = replPolicy;
oldReplPolicy = replPolicy;
replPolicy = primaryDbName + ".['[a-z]+[0-9]+'].['b[0-9]+']";
String[] incrementalReplicatedTables = new String[] {"a1", "a2", "c3" };
String[] bootstrappedTables = new String[] {"a1", "a2", "c3" };
incrementalReplicatedTables = new String[] {"a1", "a2", "c2" };
bootstrappedTables = new String[] {"a2", "c2" };
dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
"'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
Expand All @@ -538,8 +573,8 @@ public void testReplacePolicyOnBootstrapExternalTablesIncrementalPhase() throws
Assert.assertTrue(primary.miniDFSCluster.getFileSystem()
.exists(new Path(tuple.dumpLocation, FILE_NAME)));

// Verify that the external table info contains table "a2" and "c3".
ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2", "c3"),
// Verify that the external table info contains table "a2" and "c2".
ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2", "c2"),
new Path(tuple.dumpLocation, FILE_NAME));

// Verify if the expected tables are bootstrapped.
Expand Down
68 changes: 50 additions & 18 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
Expand Up @@ -143,35 +143,68 @@ private void prepareReturnValues(List<String> values) throws SemanticException {
* locations.
* 2. External or ACID tables are being bootstrapped for the first time : so that we can dump
* those tables as a whole.
* @return
* 3. If replication policy is changed/replaced, then need to examine all the tables to see if
* any of them need to be bootstrapped as old policy doesn't include it but new one does.
* @return true if need to examine tables for dump and false if not.
*/
private boolean shouldExamineTablesToDump() {
return (work.oldReplScope != null)
|| conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)
|| conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES);
}

/**
* Decide whether to dump external tables data. If external tables are enabled for replication,
* then need to dump it's data in all the incremental dumps.
* @return true if need to dump external table data and false if not.
*/
private boolean shouldDumpExternalTableLocation() {
return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
&& (!conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)
|| conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES));
&& !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY);
}

private boolean shouldExamineTablesToDump() {
return (work.oldReplScope != null)
|| shouldDumpExternalTableLocation()
|| conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES);
/**
* Decide whether to dump external tables.
* @param tableName - Name of external table to be replicated
* @return true if need to bootstrap dump external table and false if not.
*/
private boolean shouldBootstrapDumpExternalTable(String tableName) {
// Note: If repl policy is replaced, then need to dump external tables if table is getting replicated
// for the first time in current dump. So, need to check if table is included in old policy.
return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
&& (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES)
|| !ReplUtils.tableIncludedInReplScope(work.oldReplScope, tableName));
}

/**
* Decide whether to dump ACID tables.
* @param tableName - Name of ACID table to be replicated
* @return true if need to bootstrap dump ACID table and false if not.
*/
private boolean shouldBootstrapDumpAcidTable(String tableName) {
// Note: If repl policy is replaced, then need to dump ACID tables if table is getting replicated
// for the first time in current dump. So, need to check if table is included in old policy.
return ReplUtils.includeAcidTableInDump(conf)
&& (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)
|| !ReplUtils.tableIncludedInReplScope(work.oldReplScope, tableName));
}

private boolean shouldBootstrapDumpTable(Table table) {
if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES)
&& TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
// Note: If control reaches here, it means, table is already included in new replication policy.
if (TableType.EXTERNAL_TABLE.equals(table.getTableType())
&& shouldBootstrapDumpExternalTable(table.getTableName())) {
return true;
}

if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)
&& AcidUtils.isTransactionalTable(table)) {
if (AcidUtils.isTransactionalTable(table)
&& shouldBootstrapDumpAcidTable(table.getTableName())) {
return true;
}

// If replication policy is replaced with new included/excluded tables list, then tables which
// If replication policy is changed with new included/excluded tables list, then tables which
// are not included in old policy but included in new policy should be bootstrapped along with
// the current incremental replication dump.
// Note: If control reaches here, it means, table is included in new replication policy.
// Control reaches for Non-ACID tables.
return !ReplUtils.tableIncludedInReplScope(work.oldReplScope, table.getTableName());
}

Expand Down Expand Up @@ -263,10 +296,8 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
Table table = hiveDb.getTable(dbName, tableName);

// Dump external table locations if required.
// Note: If repl policy is replaced, then need to dump external tables if table is getting replicated
// for the first time in current dump. So, need to check if table is included in old policy.
if ((shouldDumpExternalTableLocation() || !ReplUtils.tableIncludedInReplScope(work.oldReplScope, tableName))
&& TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
if (TableType.EXTERNAL_TABLE.equals(table.getTableType())
&& shouldDumpExternalTableLocation()) {
writer.dataLocationDump(table);
}

Expand All @@ -291,7 +322,8 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
private boolean needBootstrapAcidTablesDuringIncrementalDump() {
// If old replication policy is available, then it is possible some of the ACID tables might be
// included for bootstrap during incremental dump.
return (work.oldReplScope != null) || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES);
return (ReplUtils.includeAcidTableInDump(conf)
&& ((work.oldReplScope != null) || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)));
}

private Path getBootstrapDbRoot(Path dumpRoot, String dbName, boolean isIncrementalPhase) {
Expand Down
Expand Up @@ -491,7 +491,7 @@ private int executeIncrementalLoad(DriverContext driverContext) {

// If replication policy is changed between previous and current repl load, then drop the tables
// that are excluded in the new replication policy.
dropTablesExcludedInReplScope(work.changedReplScope);
dropTablesExcludedInReplScope(work.currentReplScope);

IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();

Expand Down
Expand Up @@ -43,7 +43,7 @@
Explain.Level.EXTENDED })
public class ReplLoadWork implements Serializable {
final String dbNameToLoadIn;
final ReplScope changedReplScope;
final ReplScope currentReplScope;
final String dumpDirectory;
final String bootstrapDumpToCleanTables;
boolean needCleanTablesFromBootstrap;
Expand All @@ -64,17 +64,17 @@ public class ReplLoadWork implements Serializable {
final LineageState sessionStateLineageState;

public ReplLoadWork(HiveConf hiveConf, String dumpDirectory,
String dbNameToLoadIn, ReplScope changedReplScope,
String dbNameToLoadIn, ReplScope currentReplScope,
LineageState lineageState, boolean isIncrementalDump, Long eventTo,
List<DirCopyWork> pathsToCopyIterator) throws IOException {
sessionStateLineageState = lineageState;
this.dumpDirectory = dumpDirectory;
this.dbNameToLoadIn = dbNameToLoadIn;
this.changedReplScope = changedReplScope;
this.currentReplScope = currentReplScope;

// If DB name is changed during REPL LOAD, then set it instead of referring to source DB name.
if ((changedReplScope != null) && StringUtils.isNotBlank(dbNameToLoadIn)) {
changedReplScope.setDbName(dbNameToLoadIn);
if ((currentReplScope != null) && StringUtils.isNotBlank(dbNameToLoadIn)) {
currentReplScope.setDbName(dbNameToLoadIn);
}
this.bootstrapDumpToCleanTables = hiveConf.get(ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG);
this.needCleanTablesFromBootstrap = StringUtils.isNotBlank(this.bootstrapDumpToCleanTables);
Expand Down
22 changes: 11 additions & 11 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
Expand Up @@ -890,28 +890,28 @@ importStatement
;
replDumpStatement
@init { pushMsg("replication dump statement", state); }
@init { pushMsg("Replication dump statement", state); }
@after { popMsg(state); }
: KW_REPL KW_DUMP
(dbName=identifier) (DOT tablePolicy=replTableLevelPolicy)?
(KW_REPLACE replacePolicy=replReplacePolicy)?
(dbPolicy=replDbPolicy)
(KW_REPLACE oldDbPolicy=replDbPolicy)?
(KW_FROM (eventId=Number)
(KW_TO (rangeEnd=Number))?
(KW_LIMIT (batchSize=Number))?
)?
(KW_WITH replConf=replConfigs)?
-> ^(TOK_REPL_DUMP $dbName $tablePolicy? $replacePolicy? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?)
-> ^(TOK_REPL_DUMP $dbPolicy ^(TOK_REPLACE $oldDbPolicy)? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?)
;
replReplacePolicy
@init { pushMsg("repl dump replaces replication policy", state); }
replDbPolicy
@init { pushMsg("Repl dump DB replication policy", state); }
@after { popMsg(state); }
:
(dbName=identifier) (DOT tablePolicy=replTableLevelPolicy)? -> ^(TOK_REPLACE $dbName $tablePolicy?)
(dbName=identifier) (DOT tablePolicy=replTableLevelPolicy)? -> $dbName $tablePolicy?
;
replLoadStatement
@init { pushMsg("replication load statement", state); }
@init { pushMsg("Replication load statement", state); }
@after { popMsg(state); }
: KW_REPL KW_LOAD
(dbName=identifier)?
Expand All @@ -921,21 +921,21 @@ replLoadStatement
;
replConfigs
@init { pushMsg("repl configurations", state); }
@init { pushMsg("Repl configurations", state); }
@after { popMsg(state); }
:
LPAREN replConfigsList RPAREN -> ^(TOK_REPL_CONFIG replConfigsList)
;
replConfigsList
@init { pushMsg("repl configurations list", state); }
@init { pushMsg("Repl configurations list", state); }
@after { popMsg(state); }
:
keyValueProperty (COMMA keyValueProperty)* -> ^(TOK_REPL_CONFIG_LIST keyValueProperty+)
;
replTableLevelPolicy
@init { pushMsg("replication table level policy definition", state); }
@init { pushMsg("Replication table level policy definition", state); }
@after { popMsg(state); }
:
((replTablesIncludeList=replTablesList) (DOT replTablesExcludeList=replTablesList)?)
Expand Down

0 comments on commit aba61fe

Please sign in to comment.