Skip to content

Commit

Permalink
HIVE-16171: Support replication of truncate table
Browse files — browse the repository at this point in the history
  • Loading branch information
sankarh committed Mar 31, 2017
1 parent 89c02dd commit 1f250f8
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,132 @@ public void testDumpLimit() throws IOException {
verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
}

@Test
public void testTruncateTable() throws IOException {
    String testName = "truncateTable";
    LOG.info("Testing " + testName);
    String dbName = testName + "_" + tid;
    // Name of the replica database that REPL LOAD materializes.
    String replDbName = dbName + "_dupe";

    run("CREATE DATABASE " + dbName);
    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");

    // Bootstrap dump of the empty table, loaded into the replica.
    advanceDumpDir();
    run("REPL DUMP " + dbName);
    String bootstrapDumpLocn = getResult(0, 0);
    String lastReplId = getResult(0, 1, true);
    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", bootstrapDumpLocn, lastReplId);
    run("REPL LOAD " + replDbName + " FROM '" + bootstrapDumpLocn + "'");

    String[] unptnData = new String[] { "eleven", "twelve" };
    String[] empty = new String[] {};
    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptnData[0] + "')");
    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptnData[1] + "')");
    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptnData);

    // First incremental cycle: replicate the two inserts.
    advanceDumpDir();
    run("REPL DUMP " + dbName + " FROM " + lastReplId);
    String incrDumpLocn = getResult(0, 0);
    String incrDumpId = getResult(0, 1, true);
    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrDumpLocn, incrDumpId, lastReplId);
    lastReplId = incrDumpId;
    run("EXPLAIN REPL LOAD " + replDbName + " FROM '" + incrDumpLocn + "'");
    printOutput();
    run("REPL LOAD " + replDbName + " FROM '" + incrDumpLocn + "'");
    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptnData);
    verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptnData);

    // Truncate on the primary, then replicate the truncation event.
    run("TRUNCATE TABLE " + dbName + ".unptned");
    verifySetup("SELECT a from " + dbName + ".unptned", empty);

    advanceDumpDir();
    run("REPL DUMP " + dbName + " FROM " + lastReplId);
    incrDumpLocn = getResult(0, 0);
    incrDumpId = getResult(0, 1, true);
    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrDumpLocn, incrDumpId, lastReplId);
    lastReplId = incrDumpId;
    run("REPL LOAD " + replDbName + " FROM '" + incrDumpLocn + "'");
    verifyRun("SELECT a from " + dbName + ".unptned", empty);
    verifyRun("SELECT a from " + replDbName + ".unptned", empty);

    // Insert after the truncate and replicate once more, proving the table is usable again.
    String[] unptnDataAfterIns = new String[] { "thirteen" };
    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptnDataAfterIns[0] + "')");
    verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptnDataAfterIns);

    advanceDumpDir();
    run("REPL DUMP " + dbName + " FROM " + lastReplId);
    incrDumpLocn = getResult(0, 0);
    incrDumpId = getResult(0, 1, true);
    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrDumpLocn, incrDumpId, lastReplId);
    lastReplId = incrDumpId;
    run("REPL LOAD " + replDbName + " FROM '" + incrDumpLocn + "'");
    verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptnDataAfterIns);
    verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptnDataAfterIns);
}

@Test
public void testTruncatePartitionedTable() throws IOException {
    String testName = "truncatePartitionedTable";
    LOG.info("Testing " + testName);
    String dbName = testName + "_" + tid;

    run("CREATE DATABASE " + dbName);
    run("CREATE TABLE " + dbName + ".ptned_1(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
    run("CREATE TABLE " + dbName + ".ptned_2(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");

    String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
    String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
    String[] empty = new String[] {};
    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[0] + "')");
    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[1] + "')");
    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[2] + "')");
    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[0] + "')");
    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[1] + "')");
    run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[2] + "')");

    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[0] + "')");
    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[1] + "')");
    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[2] + "')");
    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[0] + "')");
    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[1] + "')");
    run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[2] + "')");

    verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1);
    verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=2) ORDER BY a", ptn_data_2);
    verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=10) ORDER BY a", ptn_data_1);
    verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=20) ORDER BY a", ptn_data_2);

    // Bootstrap dump/load; verify the replica got all four partitions.
    advanceDumpDir();
    run("REPL DUMP " + dbName);
    String replDumpLocn = getResult(0, 0);
    String replDumpId = getResult(0, 1, true);
    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
    verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1);
    verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2) ORDER BY a", ptn_data_2);
    verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10) ORDER BY a", ptn_data_1);
    verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20) ORDER BY a", ptn_data_2);

    // Truncate a single partition of ptned_1, and the whole of ptned_2.
    run("TRUNCATE TABLE " + dbName + ".ptned_1 PARTITION(b=2)");
    verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1);
    verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=2)", empty);

    run("TRUNCATE TABLE " + dbName + ".ptned_2");
    verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=10)", empty);
    verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=20)", empty);

    advanceDumpDir();
    run("REPL DUMP " + dbName + " FROM " + replDumpId);
    String incrementalDumpLocn = getResult(0, 0);
    String incrementalDumpId = getResult(0, 1, true);
    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
    replDumpId = incrementalDumpId;
    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
    // FIX: these are the assertions the test exists for, so use verifyRun (always
    // executed) instead of verifySetup, which is skipped unless setup-step
    // verification is enabled — matching how the bootstrap load is verified above.
    verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1);
    verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2)", empty);
    verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10)", empty);
    verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20)", empty);
}

@Test
public void testStatus() throws IOException {
// first test ReplStateMap functionality
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,18 @@ public void onCreateTable (CreateTableEvent tableEvent) throws MetaException {
/**
 * Default no-op handler for drop-table events; listener implementations override as needed.
 *
 * @param tableEvent drop table event
 * @throws MetaException
 */
public void onDropTable (DropTableEvent tableEvent) throws MetaException {
}

/**
* @param add partition event
* @throws MetaException
*/

/**
 * Default no-op handler for alter-table events; listener implementations override as needed.
 *
 * @param tableEvent alter table event
 * @throws MetaException
 */
public void onAlterTable (AlterTableEvent tableEvent) throws MetaException {
}

public void onAddPartition (AddPartitionEvent partitionEvent)
throws MetaException {
/**
 * Default no-op handler for add-partition events; listener implementations override as needed.
 *
 * @param partitionEvent add partition event
 * @throws MetaException
 */
public void onAddPartition (AddPartitionEvent partitionEvent) throws MetaException {
}

/**
Expand Down
47 changes: 30 additions & 17 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -4687,6 +4687,9 @@ private int truncateTable(Hive db, TruncateTableDesc truncateTableDesc) throws H
}
}
}

// Update the table stats using alterTable/Partition operations
updateTableStats(db, table, partSpec);
} catch (Exception e) {
throw new HiveException(e, ErrorMsg.GENERIC_ERROR);
}
Expand Down Expand Up @@ -4724,39 +4727,49 @@ private List<Path> getLocations(Hive db, Table table, Map<String, String> partSp
if (table.isPartitioned()) {
for (Partition partition : db.getPartitions(table)) {
locations.add(partition.getDataLocation());
EnvironmentContext environmentContext = new EnvironmentContext();
if (needToUpdateStats(partition.getParameters(), environmentContext)) {
db.alterPartition(table.getDbName(), table.getTableName(), partition, environmentContext);
}
}
} else {
locations.add(table.getPath());
EnvironmentContext environmentContext = new EnvironmentContext();
if (needToUpdateStats(table.getParameters(), environmentContext)) {
db.alterTable(table.getDbName()+"."+table.getTableName(), table, environmentContext);
}
}
} else {
for (Partition partition : db.getPartitionsByNames(table, partSpec)) {
locations.add(partition.getDataLocation());
EnvironmentContext environmentContext = new EnvironmentContext();
if (needToUpdateStats(partition.getParameters(), environmentContext)) {
}
}
return locations;
}

/**
 * Resets the basic stats of a truncated table (or its truncated partitions) to zero
 * via alterTable/alterPartition, so the metastore stats reflect the now-empty data.
 *
 * @param db       Hive client used to fetch partitions and push metadata updates
 * @param table    the table that was truncated
 * @param partSpec the partition spec that was truncated, or null when the whole
 *                 table was truncated
 * @throws HiveException
 * @throws InvalidOperationException
 */
private void updateTableStats(Hive db, Table table, Map<String, String> partSpec)
    throws HiveException, InvalidOperationException {
  if (partSpec == null) {
    if (table.isPartitioned()) {
      // Whole partitioned table truncated: zero the stats of every partition.
      for (Partition partition : db.getPartitions(table)) {
        EnvironmentContext environmentContext = new EnvironmentContext();
        updateStatsForTruncate(partition.getParameters(), environmentContext);
        db.alterPartition(table.getDbName(), table.getTableName(), partition, environmentContext);
      }
    } else {
      // Non-partitioned table: zero the table-level stats.
      EnvironmentContext environmentContext = new EnvironmentContext();
      updateStatsForTruncate(table.getParameters(), environmentContext);
      db.alterTable(table.getDbName() + "." + table.getTableName(), table, environmentContext);
    }
  } else {
    // Only specific partitions were truncated: zero just those partitions' stats.
    // FIX: removed a stray "return locations;" diff/merge artifact here — the
    // "locations" variable belongs to getLocations() and is out of scope in this
    // method — and dropped the redundant trailing "return;".
    for (Partition partition : db.getPartitionsByNames(table, partSpec)) {
      EnvironmentContext environmentContext = new EnvironmentContext();
      updateStatsForTruncate(partition.getParameters(), environmentContext);
      db.alterPartition(table.getDbName(), table.getTableName(), partition, environmentContext);
    }
  }
}

private boolean needToUpdateStats(Map<String,String> props, EnvironmentContext environmentContext) {
private void updateStatsForTruncate(Map<String,String> props, EnvironmentContext environmentContext) {
if (null == props) {
return false;
return;
}
boolean statsPresent = false;
for (String stat : StatsSetupConst.supportedStats) {
String statVal = props.get(stat);
if (statVal != null && Long.parseLong(statVal) > 0) {
statsPresent = true;
if (statVal != null) {
//In the case of truncate table, we set the stats to be 0.
props.put(stat, "0");
}
Expand All @@ -4766,7 +4779,7 @@ private boolean needToUpdateStats(Map<String,String> props, EnvironmentContext e
environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
//then invalidate column stats
StatsSetupConst.clearColumnStatsState(props);
return statsPresent;
return;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ private static void checkTable(Table table, ImportTableDesc tableDesc, Replicati
}
}

// Next, we verify that the destination table is not offline, a view, or a non-native table
// Next, we verify that the destination table is not offline, or a non-native table
EximUtil.validateTable(table);

// If the import statement specified that we're importing to an external
Expand Down
Loading

0 comments on commit 1f250f8

Please sign in to comment.