HIVE-16171: Support replication of truncate table
sankarh committed Apr 7, 2017
1 parent a01a6a3 commit ec3f2c5
Showing 18 changed files with 332 additions and 81 deletions.
@@ -1078,6 +1078,132 @@ public void testDumpLimit() throws IOException {
verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);
}

@Test
public void testTruncateTable() throws IOException {
String testName = "truncateTable";
LOG.info("Testing " + testName);
String dbName = testName + "_" + tid;

run("CREATE DATABASE " + dbName);
run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");

advanceDumpDir();
run("REPL DUMP " + dbName);
String replDumpLocn = getResult(0, 0);
String replDumpId = getResult(0, 1, true);
LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");

String[] unptn_data = new String[] { "eleven", "twelve" };
String[] empty = new String[] {};
run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);

advanceDumpDir();
run("REPL DUMP " + dbName + " FROM " + replDumpId);
String incrementalDumpLocn = getResult(0, 0);
String incrementalDumpId = getResult(0, 1, true);
LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
replDumpId = incrementalDumpId;
run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
printOutput();
run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data);
verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data);

run("TRUNCATE TABLE " + dbName + ".unptned");
verifySetup("SELECT a from " + dbName + ".unptned", empty);

advanceDumpDir();
run("REPL DUMP " + dbName + " FROM " + replDumpId);
incrementalDumpLocn = getResult(0, 0);
incrementalDumpId = getResult(0, 1, true);
LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
replDumpId = incrementalDumpId;
run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
verifyRun("SELECT a from " + dbName + ".unptned", empty);
verifyRun("SELECT a from " + dbName + "_dupe.unptned", empty);

String[] unptn_data_after_ins = new String[] { "thirteen" };
run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_after_ins[0] + "')");
verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins);

advanceDumpDir();
run("REPL DUMP " + dbName + " FROM " + replDumpId);
incrementalDumpLocn = getResult(0, 0);
incrementalDumpId = getResult(0, 1, true);
LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
replDumpId = incrementalDumpId;
run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins);
verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_after_ins);
}

@Test
public void testTruncatePartitionedTable() throws IOException {
String testName = "truncatePartitionedTable";
LOG.info("Testing " + testName);
String dbName = testName + "_" + tid;

run("CREATE DATABASE " + dbName);
run("CREATE TABLE " + dbName + ".ptned_1(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
run("CREATE TABLE " + dbName + ".ptned_2(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");

String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" };
String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" };
String[] empty = new String[] {};
run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[0] + "')");
run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[1] + "')");
run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[2] + "')");
run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[0] + "')");
run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[1] + "')");
run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[2] + "')");

run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[0] + "')");
run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[1] + "')");
run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[2] + "')");
run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[0] + "')");
run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[1] + "')");
run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[2] + "')");

verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1);
verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=2) ORDER BY a", ptn_data_2);
verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=10) ORDER BY a", ptn_data_1);
verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=20) ORDER BY a", ptn_data_2);

advanceDumpDir();
run("REPL DUMP " + dbName);
String replDumpLocn = getResult(0, 0);
String replDumpId = getResult(0, 1, true);
LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1);
verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2) ORDER BY a", ptn_data_2);
verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10) ORDER BY a", ptn_data_1);
verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20) ORDER BY a", ptn_data_2);

run("TRUNCATE TABLE " + dbName + ".ptned_1 PARTITION(b=2)");
verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1);
verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=2)", empty);

run("TRUNCATE TABLE " + dbName + ".ptned_2");
verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=10)", empty);
verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=20)", empty);

advanceDumpDir();
run("REPL DUMP " + dbName + " FROM " + replDumpId);
String incrementalDumpLocn = getResult(0, 0);
String incrementalDumpId = getResult(0, 1, true);
LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
replDumpId = incrementalDumpId;
run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
verifySetup("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1);
verifySetup("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2)", empty);
verifySetup("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10)", empty);
verifySetup("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20)", empty);
}
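
Both truncate tests above repeat the same incremental dump-and-load cycle several times. Purely as an illustration (not part of this commit), that cycle could be factored into a helper built from the run/getResult/advanceDumpDir utilities this test class already uses:

// Hypothetical helper: performs one incremental REPL DUMP starting from the
// given event id, loads it into <dbName>_dupe, and returns the new dump id
// so the caller can chain the next cycle off it.
private String incrementalDumpAndLoad(String dbName, String fromReplDumpId) throws IOException {
  advanceDumpDir();
  run("REPL DUMP " + dbName + " FROM " + fromReplDumpId);
  String dumpLocn = getResult(0, 0);
  String dumpId = getResult(0, 1, true);
  LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", dumpLocn, dumpId, fromReplDumpId);
  run("REPL LOAD " + dbName + "_dupe FROM '" + dumpLocn + "'");
  return dumpId;
}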

@Test
public void testStatus() throws IOException {
// first test ReplStateMap functionality
@@ -74,20 +74,18 @@ public void onCreateTable (CreateTableEvent tableEvent) throws MetaException {
public void onDropTable (DropTableEvent tableEvent) throws MetaException {
}

/**
* @param add partition event
* @throws MetaException
*/

/**
* @param tableEvent alter table event
* @throws MetaException
*/
public void onAlterTable (AlterTableEvent tableEvent) throws MetaException {
}

public void onAddPartition (AddPartitionEvent partitionEvent)
throws MetaException {
/**
* @param partitionEvent add partition event
* @throws MetaException
*/
public void onAddPartition (AddPartitionEvent partitionEvent) throws MetaException {
}

/**
47 changes: 30 additions & 17 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4687,6 +4687,9 @@ private int truncateTable(Hive db, TruncateTableDesc truncateTableDesc) throws H
}
}
}

// Update the table stats using alterTable/Partition operations
updateTableStats(db, table, partSpec);
} catch (Exception e) {
throw new HiveException(e, ErrorMsg.GENERIC_ERROR);
}
@@ -4724,39 +4727,49 @@ private List<Path> getLocations(Hive db, Table table, Map<String, String> partSp
if (table.isPartitioned()) {
for (Partition partition : db.getPartitions(table)) {
locations.add(partition.getDataLocation());
EnvironmentContext environmentContext = new EnvironmentContext();
if (needToUpdateStats(partition.getParameters(), environmentContext)) {
db.alterPartition(table.getDbName(), table.getTableName(), partition, environmentContext);
}
}
} else {
locations.add(table.getPath());
EnvironmentContext environmentContext = new EnvironmentContext();
if (needToUpdateStats(table.getParameters(), environmentContext)) {
db.alterTable(table.getDbName()+"."+table.getTableName(), table, environmentContext);
}
}
} else {
for (Partition partition : db.getPartitionsByNames(table, partSpec)) {
locations.add(partition.getDataLocation());
EnvironmentContext environmentContext = new EnvironmentContext();
if (needToUpdateStats(partition.getParameters(), environmentContext)) {
}
}
return locations;
}

private void updateTableStats(Hive db, Table table, Map<String, String> partSpec)
throws HiveException, InvalidOperationException {
if (partSpec == null) {
if (table.isPartitioned()) {
for (Partition partition : db.getPartitions(table)) {
EnvironmentContext environmentContext = new EnvironmentContext();
updateStatsForTruncate(partition.getParameters(), environmentContext);
db.alterPartition(table.getDbName(), table.getTableName(), partition, environmentContext);
}
} else {
EnvironmentContext environmentContext = new EnvironmentContext();
updateStatsForTruncate(table.getParameters(), environmentContext);
db.alterTable(table.getDbName()+"."+table.getTableName(), table, environmentContext);
}
} else {
for (Partition partition : db.getPartitionsByNames(table, partSpec)) {
EnvironmentContext environmentContext = new EnvironmentContext();
updateStatsForTruncate(partition.getParameters(), environmentContext);
db.alterPartition(table.getDbName(), table.getTableName(), partition, environmentContext);
}
}
return locations;
return;
}

private boolean needToUpdateStats(Map<String,String> props, EnvironmentContext environmentContext) {
private void updateStatsForTruncate(Map<String,String> props, EnvironmentContext environmentContext) {
if (null == props) {
return false;
return;
}
boolean statsPresent = false;
for (String stat : StatsSetupConst.supportedStats) {
String statVal = props.get(stat);
if (statVal != null && Long.parseLong(statVal) > 0) {
statsPresent = true;
if (statVal != null) {
//In the case of truncate table, we set the stats to be 0.
props.put(stat, "0");
}
@@ -4766,7 +4779,7 @@ private boolean needToUpdateStats(Map<String,String> props, EnvironmentContext e
environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
//then invalidate column stats
StatsSetupConst.clearColumnStatsState(props);
return statsPresent;
return;
}
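
Read in isolation, the stats handling added here does three things once the data files are removed: every basic stat that is present is forced to 0, the change is flagged as task-generated via the EnvironmentContext, and column statistics are invalidated. A self-contained sketch of that logic, using only the calls that appear in the hunk above (an illustration, not a drop-in replacement for updateStatsForTruncate):

// Sketch of the truncate-time stats reset.
private static void resetStatsAfterTruncate(Map<String, String> props,
    EnvironmentContext environmentContext) {
  if (props == null) {
    return;
  }
  for (String stat : StatsSetupConst.supportedStats) {
    if (props.get(stat) != null) {
      props.put(stat, "0"); // the table or partition is now empty
    }
  }
  // Flag the new values as generated by this task rather than by the user.
  environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
  // Column-level statistics no longer describe the (now empty) data.
  StatsSetupConst.clearColumnStatsState(props);
}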

@Override
@@ -505,7 +505,7 @@ private static void checkTable(Table table, ImportTableDesc tableDesc, Replicati
}
}

// Next, we verify that the destination table is not offline, a view, or a non-native table
// Next, we verify that the destination table is not offline, or a non-native table
EximUtil.validateTable(table);

// If the import statement specified that we're importing to an external
@@ -71,6 +71,7 @@
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
import org.apache.hadoop.hive.ql.plan.TruncateTableDesc;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.IOUtils;
@@ -128,8 +129,10 @@ public enum DUMPTYPE {
EVENT_DROP_PARTITION("EVENT_DROP_PARTITION"),
EVENT_ALTER_TABLE("EVENT_ALTER_TABLE"),
EVENT_RENAME_TABLE("EVENT_RENAME_TABLE"),
EVENT_TRUNCATE_TABLE("EVENT_TRUNCATE_TABLE"),
EVENT_ALTER_PARTITION("EVENT_ALTER_PARTITION"),
EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"),
EVENT_TRUNCATE_PARTITION("EVENT_TRUNCATE_PARTITION"),
EVENT_INSERT("EVENT_INSERT"),
EVENT_UNKNOWN("EVENT_UNKNOWN");

@@ -451,8 +454,7 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Ex
conf,
getNewEventOnlyReplicationSpec(ev.getEventId())
);
EventHandlerFactory.handlerFor(ev).handle(context);

EventHandlerFactory.handlerFor(ev, context).handle();
}

public static void injectNextDumpDirForTest(String dumpdir){
@@ -938,6 +940,26 @@ private List<Task<? extends Serializable>> analyzeEventLoad(
}
}
}
case EVENT_TRUNCATE_TABLE: {
AlterTableMessage truncateTableMessage = md.getAlterTableMessage(dmd.getPayload());
String actualDbName = ((dbName == null) || dbName.isEmpty() ? truncateTableMessage.getDB() : dbName);
String actualTblName = ((tblName == null) || tblName.isEmpty() ? truncateTableMessage.getTable() : tblName);

// First perform the alter table replay, in case the current alter happened after the truncate table
List<Task<? extends Serializable>> tasks =
analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);

TruncateTableDesc truncateTableDesc = new TruncateTableDesc(
actualDbName + "." + actualTblName, null);
Task<DDLWork> truncateTableTask = TaskFactory.get(new DDLWork(inputs, outputs, truncateTableDesc), conf);
if (precursor != null) {
precursor.addDependentTask(truncateTableTask);
}
tasks.add(truncateTableTask);
LOG.debug("Added truncate tbl task : {}:{}", truncateTableTask.getId(), truncateTableDesc.getTableName());
dbsUpdated.put(actualDbName,dmd.getEventTo());
return tasks;
}
case EVENT_ALTER_PARTITION: {
return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
}
@@ -979,6 +1001,42 @@ private List<Task<? extends Serializable>> analyzeEventLoad(
tablesUpdated.put(tableName, dmd.getEventTo());
return tasks;
}
case EVENT_TRUNCATE_PARTITION: {
AlterPartitionMessage truncatePtnMessage = md.getAlterPartitionMessage(dmd.getPayload());
String actualDbName = ((dbName == null) || dbName.isEmpty() ? truncatePtnMessage.getDB() : dbName);
String actualTblName = ((tblName == null) || tblName.isEmpty() ? truncatePtnMessage.getTable() : tblName);

Map<String, String> partSpec = new LinkedHashMap<String,String>();
try {
org.apache.hadoop.hive.metastore.api.Table tblObj = truncatePtnMessage.getTableObj();
org.apache.hadoop.hive.metastore.api.Partition pobjAfter = truncatePtnMessage.getPtnObjAfter();
Iterator<String> afterValIter = pobjAfter.getValuesIterator();
for (FieldSchema fs : tblObj.getPartitionKeys()){
partSpec.put(fs.getName(), afterValIter.next());
}
} catch (Exception e) {
if (!(e instanceof SemanticException)){
throw new SemanticException("Error reading message members", e);
} else {
throw (SemanticException)e;
}
}

// First perform the alter partition replay, in case the current alter happened after the truncate partition
List<Task<? extends Serializable>> tasks =
analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);

TruncateTableDesc truncateTableDesc = new TruncateTableDesc(
actualDbName + "." + actualTblName, partSpec);
Task<DDLWork> truncatePtnTask = TaskFactory.get(new DDLWork(inputs, outputs, truncateTableDesc), conf);
if (precursor != null) {
precursor.addDependentTask(truncatePtnTask);
}
tasks.add(truncatePtnTask);
LOG.debug("Added truncate ptn task : {}:{}", truncatePtnTask.getId(), truncateTableDesc.getTableName());
dbsUpdated.put(actualDbName,dmd.getEventTo());
return tasks;
}
case EVENT_INSERT: {
md = MessageFactory.getInstance().getDeserializer();
InsertMessage insertMessage = md.getInsertMessage(dmd.getPayload());
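
Both new cases end with the same replay step: build a TruncateTableDesc, wrap it in a DDLWork task, make it depend on the precursor if one exists, and append it to the task list. To make that shared step explicit, here is a hypothetical helper (not part of this commit) built only from the calls used in the cases above; a null partSpec truncates the whole table, a partition-key to value map truncates a single partition:

// Illustrative only: the replay step shared by EVENT_TRUNCATE_TABLE and
// EVENT_TRUNCATE_PARTITION, factored as a helper. inputs, outputs and conf
// are the analyzer fields already used in the cases above.
private Task<DDLWork> truncateReplayTask(String actualDbName, String actualTblName,
    Map<String, String> partSpec, Task<? extends Serializable> precursor) {
  TruncateTableDesc truncateTableDesc =
      new TruncateTableDesc(actualDbName + "." + actualTblName, partSpec);
  Task<DDLWork> truncateTask = TaskFactory.get(new DDLWork(inputs, outputs, truncateTableDesc), conf);
  if (precursor != null) {
    precursor.addDependentTask(truncateTask); // run only after the preceding replayed event
  }
  return truncateTask;
}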
@@ -27,10 +27,12 @@ public abstract class AbstractHandler implements EventHandler {
static final Logger LOG = LoggerFactory.getLogger(AbstractHandler.class);

final NotificationEvent event;
final Context withinContext;
final MessageDeserializer deserializer;

AbstractHandler(NotificationEvent event) {
AbstractHandler(NotificationEvent event, Context withinContext) {
this.event = event;
this.withinContext = withinContext;
deserializer = MessageFactory.getInstance().getDeserializer();
}

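
The constructor now receives the dump Context up front, which matches the call-site change earlier in this commit to EventHandlerFactory.handlerFor(ev, context).handle(). The factory itself is among the files not rendered on this page; the following is purely an assumed sketch of what that side of the refactor could look like (the event-type string and the unhandled-type behavior are guesses, not taken from the commit):

// Assumed shape only; the real EventHandlerFactory is not shown on this page
// and may use a handler registry instead of explicit branching.
public class EventHandlerFactory {
  public static EventHandler handlerFor(NotificationEvent event, Context withinContext) {
    if ("ADD_PARTITION".equals(event.getEventType())) { // event-type string assumed
      // Context is injected at construction time instead of being passed to handle().
      return new AddPartitionHandler(event, withinContext);
    }
    // ... other event types would resolve to their handlers the same way ...
    throw new IllegalArgumentException("Unhandled event type (sketch only): " + event.getEventType());
  }
}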
@@ -38,12 +38,12 @@
import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;

public class AddPartitionHandler extends AbstractHandler {
protected AddPartitionHandler(NotificationEvent notificationEvent) {
super(notificationEvent);
protected AddPartitionHandler(NotificationEvent notificationEvent, Context withinContext) {
super(notificationEvent, withinContext);
}

@Override
public void handle(Context withinContext) throws Exception {
public void handle() throws Exception {
AddPartitionMessage apm = deserializer.getAddPartitionMessage(event.getMessage());
LOG.info("Processing#{} ADD_PARTITION message : {}", fromEventId(), event.getMessage());
Iterable<org.apache.hadoop.hive.metastore.api.Partition> ptns = apm.getPartitionObjs();
@@ -89,7 +89,7 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition
Iterable<String> files = partitionFilesIter.next().getFiles();
if (files != null) {
// encoded filename/checksum of files, write into _files
try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) {
try (BufferedWriter fileListWriter = writer(qlPtn)) {
for (String file : files) {
fileListWriter.write(file + "\n");
}
@@ -99,7 +99,7 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition
withinContext.createDmd(this).write();
}

private BufferedWriter writer(Context withinContext, Partition qlPtn)
private BufferedWriter writer(Partition qlPtn)
throws IOException {
Path ptnDataPath = new Path(withinContext.eventRoot, qlPtn.getName());
FileSystem fs = ptnDataPath.getFileSystem(withinContext.hiveConf);
(Diffs for the remaining changed files are not rendered here.)
