HIVE-18679 : create/replicate open transaction event #305
maheshk114 wants to merge 3 commits into apache:master from
Conversation
    String name = testName.getMethodName();
    String dbName = createDB(name, driver);
    run("CREATE TABLE " + dbName + ".unptned(a int) PARTITIONED BY (load_date date) CLUSTERED BY(a) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
The table name says unptned, but it is partitioned. Probably use a better name, since it is an ACID table.
    advanceDumpDir();
    run("REPL DUMP " + dbName, driver);
    String replDumpLocn = getResult(0,0,driver);
    String replDumpId = getResult(0,1,true,driver);
You can use utility methods like bootstrapLoadAndVerify and incrementalLoadAndVerify to avoid so many lines of code.
    LOG.info("Dumped to {} with id {}", replDumpLocn, replDumpId);
    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
    run("INSERT INTO " + dbName + ".unptned values (1)", driver);
This line will fail, as unptned has 2 columns and is partitioned, but of course it does start a txn. Change the test to use just a SELECT query, which starts a txn. In fact, DDL also starts a txn. Also, need to check whether an explicit "START TRANSACTION" gets replicated as well.
    }

    @Test
    public void testOpenTxnRelication() throws IOException {
I propose moving all ACID table replication test cases to a new test file that uses the WarehouseInstance class, as it is simple and scalable.
    @@ -0,0 +1,6 @@
    CREATE TABLE TXN_MAP (
We should use a REPL_ prefix for all tables specific to replication: REPL_TXN_MAP.
    @JsonProperty
    Long txnid;

    Long timestamp;
timestamp should also be a @JsonProperty.
Why do we need to send timestamp if it's not used?
    }

    @Override
    public String getServicePrincipal() {
Need to set/get MS_SERVER_URL and MS_SERVICE_PRINCIPAL here. Only Db can be null.
    if (rqst.isSetReplPolicy()) {
      List<String> rowsRepl = new ArrayList<>();
      String selectRepl = "select target_txn_id from TXN_MAP where repl_policy = " + quoteString(rqst.getReplPolicy()) + "and src_txn_id = " + rqst.getReplSrcTxnId();
      for (long i = first; i < first + numTxns; i++) {
This is misleading, as one srcId cannot map to many txns. Either assert that numTxns is 1 here or use a different method.
    List<String> queriesRepl = sqlGenerator.createInsertValuesStmt(
        "TXN_MAP (repl_policy, src_txn_id, target_txn_id)", rowsRepl);
    for (String q : queriesRepl) {
      rs = stmt.executeQuery(selectRepl);
I think this query is there to ensure the open_txn event is idempotent. Need to see if this can be checked in the server itself.
If checked at the server, then we would need to send an extra message from HiveServer to the metastore to check for txn validity.
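The idempotency requirement being discussed here can be sketched in isolation: before opening a target txn for a replicated source txn, check whether a mapping row for (repl_policy, src_txn_id) already exists and skip if so. This is a minimal in-memory illustration using a set in place of the TXN_MAP metastore table; the class and method names are hypothetical, not from the patch.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the idempotency check behind the TXN_MAP lookup:
// an EVENT_OPEN_TXN replay must not open a second target txn for a source
// txn that was already replayed under the same replication policy.
public class OpenTxnIdempotencyCheck {
    // Stand-in for the (repl_policy, src_txn_id) rows in the metastore table.
    private final Set<String> existingMappings = new HashSet<>();

    // Returns true if a target txn should be opened, false if this event
    // was already replayed (the mapping row exists).
    public boolean shouldOpenTargetTxn(String replPolicy, long srcTxnId) {
        // Set.add returns false when the key is already present.
        return existingMappings.add(replPolicy + "#" + srcTxnId);
    }

    public static void main(String[] args) {
        OpenTxnIdempotencyCheck check = new OpenTxnIdempotencyCheck();
        System.out.println(check.shouldOpenTargetTxn("db1.*", 42)); // true: first replay
        System.out.println(check.shouldOpenTargetTxn("db1.*", 42)); // false: duplicate event
    }
}
```

Whether this check lives in HiveServer or in the metastore only changes where the lookup runs; doing it in the metastore avoids the extra round trip mentioned above.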
    import org.apache.hadoop.hive.ql.plan.api.StageType;
    import org.apache.hadoop.util.StringUtils;

    public class OpenTxnTask extends Task<OpenTxnWork> {
Does it make sense to have one task type, ReplTxnTask, that takes the operation type as input, with the work also defined per operation type? It could be reused for all txn events.
Force-pushed from a593043 to a24d3a7, and again from a24d3a7 to 63ed7fc.
EVENT_OPEN_TXN:
Source warehouse:
Create a new event type EVENT_OPEN_TXN with the related message format, etc.
When any transaction is opened, either in auto-commit mode or in multi-statement mode, capture this event.
REPL DUMP should read this event from the event notification table and dump the message.
Target warehouse:
REPL LOAD should read the event from the dump and get the message.
Open a txn in the target warehouse.
Create a map of source txn ID to target txn ID and persist it in the metastore. There should be one map per replication policy (DBName.* in case of DB-level replication, DBName.TableName in case of table-level replication).
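The target-warehouse mapping step above can be sketched as follows. This is a minimal in-memory illustration, assuming a per-policy map keyed by source txn ID; in the actual design the map is persisted in the metastore (the TXN_MAP table from the patch) and target txn IDs come from the target warehouse's txn manager. All names here are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one source-txn-id -> target-txn-id map per
// replication policy ("db1.*" for DB-level, "db1.t1" for table-level).
public class ReplTxnMapSketch {
    // policy -> (source txn id -> target txn id)
    private final Map<String, Map<Long, Long>> byPolicy = new HashMap<>();
    // Stands in for the target warehouse's txn id allocator.
    private long nextTargetTxnId = 100;

    // Replay of EVENT_OPEN_TXN: look up or open a target txn for the given
    // source txn. Replaying the same event yields the same target txn.
    public long replayOpenTxn(String replPolicy, long srcTxnId) {
        Map<Long, Long> map = byPolicy.computeIfAbsent(replPolicy, p -> new HashMap<>());
        return map.computeIfAbsent(srcTxnId, s -> nextTargetTxnId++);
    }

    public static void main(String[] args) {
        ReplTxnMapSketch sketch = new ReplTxnMapSketch();
        long t1 = sketch.replayOpenTxn("db1.*", 7);
        long t2 = sketch.replayOpenTxn("db1.*", 7); // replayed event: same target txn
        long t3 = sketch.replayOpenTxn("db1.*", 8); // new source txn: new target txn
        System.out.println(t1 == t2); // true
        System.out.println(t1 != t3); // true
    }
}
```

Keying the map by replication policy is what lets the same source txn ID be replicated independently under a DB-level policy and a table-level policy without collisions.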