HIVE-19089 : Create/Replicate AllocWriteId Event #329
| return 0; | ||
| case REPL_ALLOC_WRITE_ID: | ||
| List<Long> targetTxnIds = txnManager.replGetTargetTxnIds(replPolicy, work.getTxnIds()); | ||
| txnManager.allocateTableWriteIdsBatch(targetTxnIds, work.getDbName(), work.getTableName()); |
There was a problem hiding this comment.
Instead of 2 metastore calls, this can be a single method, replAllocateTableWriteIdsBatch, which takes replPolicy, srcTxnIds and writeIds as input. If the input is valid, take the repl flow: get the targetTxnId and allocate the writeId. In fact, AllocateTableWriteIdsRequest can be modified to take optional repl arguments.
There was a problem hiding this comment.
remove the get-target-txn-id call and pass replPolicy and the TxnToWriteId list in the request
| this.tableName = tableName; | ||
| } | ||
|
|
||
| public String getReplPolicy() { |
There was a problem hiding this comment.
getReplPolicy can be a utility function which takes db and table name as input. This can be kept in a separate ql/parse/load/Utils.java class. This makes ReplTxnWork class very clean where we set db/table name only in constructor and replPolicy is just an input.
There was a problem hiding this comment.
move the construction of repl policy to a utility class
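A minimal sketch of what such a utility could look like. The class name, the lower-casing, and the "db.table" form for a table-level policy are assumptions for illustration; only the "db.*" form appears in the tests in this PR.

```java
import java.util.Locale;

// Hypothetical helper, not the PR's code: builds a repl policy string from db/table names.
final class ReplPolicyUtils {
    private ReplPolicyUtils() {}

    static String getReplPolicy(String dbName, String tableName) {
        if (dbName == null || dbName.isEmpty()) {
            throw new IllegalArgumentException("dbName is required to build a repl policy");
        }
        String db = dbName.toLowerCase(Locale.ROOT);
        if (tableName == null || tableName.isEmpty()) {
            // Database-level policy, e.g. "default.*" as used in the tests.
            return db + ".*";
        }
        // Assumed format for a table-level policy.
        return db + "." + tableName.toLowerCase(Locale.ROOT);
    }
}
```

With this, ReplTxnWork would only receive the resulting string, e.g. `ReplPolicyUtils.getReplPolicy("default", null)`.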
| @@ -250,7 +250,8 @@ private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType) throws Sema | |||
| // repl id stored in the database/table is valid. | |||
There was a problem hiding this comment.
Do we really need the shouldReplayEvent method, given that all txn events are idempotent once they reach the metastore, which can look them up in the REPL_TO_TXN_MAP table?
Yes, this may help for the case where we repeat OPEN-ABORT/COMMIT events. But even with this method we cannot guarantee idempotent behaviour, as the db/table name can be null here. So it is better to push this check to a lower level, before allocating the write ID or copying data files during commit. It also needs to be checked on each table if a particular commit operates on multiple tables.
There was a problem hiding this comment.
remove this if check; let it filter for all operations
There was a problem hiding this comment.
check if some checks can be done at a lower layer based on the table/partition object, especially for write-with-commit and alloc write id events
| if (txnIds != null && listeners != null && !listeners.isEmpty()) { | ||
| MetaStoreListenerNotifier.notifyEvent(listeners, EventType.OPEN_TXN, | ||
| new OpenTxnEvent(txnIds, this)); | ||
| MetaStoreListenerNotifier.notifyTxnEvent(listeners, EventType.OPEN_TXN, |
There was a problem hiding this comment.
Should use the original notify() method itself if the last 2 parameters are null. Also, add an implementation for the same.
There was a problem hiding this comment.
revert this call to notifyEvent. Rename notifyTxnEvent to notifyEventDirectSql and change the interface name as well
| AllocateTableWriteIdsResponse response = getTxnHandler().allocateTableWriteIds(rqst); | ||
| if (listeners != null && !listeners.isEmpty()) { | ||
| MetaStoreListenerNotifier.notifyTxnEvent(listeners, EventType.ALLOC_WRITE_ID, | ||
| new AllocWriteIdEvent(rqst.getTxnIds(), rqst.getTableName(), this), null, null); |
There was a problem hiding this comment.
AllocWriteIdEvent should take the write IDs as well, to ensure idempotent behaviour on the target when the event is applied. Better to pass a TxnToWriteId list.
Also, the target shouldn't use the regular allocate write ID flow. For the repl flow, check whether the latest allocated write ID > requested write ID; if so, it is a noop. If the allocated write ID < requested write ID, throw an exception, as it is invalid usage where some events were skipped.
There was a problem hiding this comment.
ignore if it's greater and fail the load if it's smaller
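The check discussed above could be sketched roughly as follows. This is one reading of the comment, not the PR's code: all names are hypothetical, and it assumes the event's write IDs are replayed as a whole batch, so only the first requested write ID is compared against the latest one allocated on the target.

```java
// Sketch only: class, enum and method names are hypothetical.
final class ReplWriteIdCheck {
    enum Action { ALLOCATE, NOOP }

    /**
     * @param latestAllocatedWriteId highest write id already allocated for the table on the target
     * @param firstRequestedWriteId  first write id carried by the replicated event
     */
    static Action decide(long latestAllocatedWriteId, long firstRequestedWriteId) {
        if (latestAllocatedWriteId >= firstRequestedWriteId) {
            // Already applied earlier: replaying the event must not allocate again.
            return Action.NOOP;
        }
        if (firstRequestedWriteId == latestAllocatedWriteId + 1) {
            // The request continues exactly where the target left off.
            return Action.ALLOCATE;
        }
        // A gap means some earlier alloc-write-id events were skipped: fail the load.
        throw new IllegalStateException("Requested write id " + firstRequestedWriteId
            + " skips ids after " + latestAllocatedWriteId);
    }
}
```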
| * map if no parameters were updated or if no listeners were notified. | ||
| * @throws MetaException If an error occurred while calling the listeners. | ||
| */ | ||
| public static Map<String, String> notifyTxnEvent(List<? extends MetaStoreEventListener> listeners, |
There was a problem hiding this comment.
The name should be more generic, to denote the directSQL implementation. I would suggest notifyEventWithDirectSql()
| public class AllocWriteIdEvent extends ListenerEvent { | ||
|
|
||
| private final List<Long> txnIds; | ||
| private final String tableName; |
There was a problem hiding this comment.
The database name should also be part of the event; otherwise, how can we filter this event for a given database during REPL DUMP? Also, it may end up allocating a write ID on a table it was not intended for. For reference, the create table event takes the DB name as input.
There was a problem hiding this comment.
change it to txntowriteid list and add dbname also
| (event.getEventType().equals(MessageFactory.COMMIT_TXN_EVENT)) || | ||
| (event.getEventType().equals(MessageFactory.ABORT_TXN_EVENT)) | ||
| (event.getEventType().equals(MessageFactory.ABORT_TXN_EVENT)) || | ||
| (event.getEventType().equals(MessageFactory.ALLOC_WRITE_ID_EVENT)) |
There was a problem hiding this comment.
Allocate write ID is a DB/table-related event and should be ignored when it does not belong to the DB/table being looked up.
There was a problem hiding this comment.
remove this check for alloc write id
22b3335 to 09a5a19
| tbl = context.db.getTable(dbName, tableName); | ||
| } catch (InvalidTableException e) { | ||
| // return a empty list if table does not exist. | ||
| return new ArrayList<>(); |
There was a problem hiding this comment.
If the current dump has both create table and allocate write ID events, then this table object would be missing from the metastore. But this is a valid scenario and the write ID still needs to be allocated. Should refer to the last repl ID of the database object to decide this.
There was a problem hiding this comment.
During load, by the time alloc write id is replayed ..create table should have been done.
There was a problem hiding this comment.
move all these checks to ReplTxnTask
| //context table name is passed to ReplTxnWork , as the repl policy will be created based | ||
| //on this table name. ReplPolicy is used to extract the target transaction id based on source | ||
| //transaction id. | ||
| ReplTxnWork work = new ReplTxnWork(dbName, context.tableName, ReplTxnWork.OperationType.REPL_ALLOC_WRITE_ID, |
There was a problem hiding this comment.
We shall directly set tableName instead of setting it later. replPolicy can be passed as input.
| int numTxn = 50000; | ||
| List<Long> txnList = openTxnForTest(1, numTxn, "default.*"); | ||
| assert(txnList.size() == numTxn); | ||
| txnHandler.abortTxns(new AbortTxnsRequest(txnList)); |
There was a problem hiding this comment.
What are we testing here? No verification after abort.
There was a problem hiding this comment.
the assert is added in the common function openTxnForTest
There was a problem hiding this comment.
change the assert; just count the number of entries added to the repl-to-txn map
| } | ||
| allocMsg = new AllocateTableWriteIdsRequest("default1", "tbl1"); | ||
| allocMsg.setReplPolicy("default1.*"); | ||
| allocMsg.setTxnToWriteIdList(txnToWriteId); |
There was a problem hiding this comment.
Here, the input txnid should be that of source (default). Not target(default1) txn ids.
There was a problem hiding this comment.
here the transactions are started as target only ..and then those txn ids are passed to alloc the write id
There was a problem hiding this comment.
change the txn list to source and update the asserts
| allocMsg.setReplPolicy("default1.*"); | ||
| allocMsg.setTxnToWriteIdList(txnToWriteId); | ||
| txnToWriteId1 = txnHandler.allocateTableWriteIds(allocMsg).getTxnToWriteIds(); | ||
| assert (txnToWriteId.equals(txnToWriteId1)); |
There was a problem hiding this comment.
The result may not match, as the txn IDs of source and target don't match. However, you can validate whether each txn ID is mapped to the proper write ID.
There was a problem hiding this comment.
The txns are always target txn ids only
| } | ||
| } | ||
|
|
||
| private List<Long> getValidTxnIds(AllocateTableWriteIdsRequest rqst, Statement stmt, ResultSet rs) |
There was a problem hiding this comment.
The method name can be better such as checkAndGetTargetTxnIds.
|
|
||
| int size = txnToWriteIds.size(); | ||
| for (TxnToWriteId txnToWriteId : txnToWriteIds) { | ||
| Long targetTxnId = getTargetTxnId(rqst.getReplPolicy(), txnToWriteId.getTxnId(), stmt); |
There was a problem hiding this comment.
The implementation of getTargetTxnId can be changed to take list of source txn ids and use IN condition for queries.
There was a problem hiding this comment.
IN cannot guarantee the input-to-output mapping; we may not get the target txn IDs in the correct order
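One possible way around the ordering concern, sketched under the assumption that the IN query selects both the source and the target txn ID columns: the rows can then be keyed by source ID and re-ordered to match the input. The names below are hypothetical, and how a missing mapping should be handled (here an exception) is also an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Sketch only: not the PR's getTargetTxnId implementation.
final class TargetTxnLookup {
    /**
     * @param srcTxnIds source txn ids in the order the caller needs
     * @param rows      {srcTxnId, targetTxnId} pairs as returned by the query, in any order
     */
    static List<Long> toTargetTxnIds(List<Long> srcTxnIds, List<long[]> rows) {
        Map<Long, Long> srcToTarget = new HashMap<>();
        for (long[] row : rows) {
            srcToTarget.put(row[0], row[1]);
        }
        List<Long> targetTxnIds = new ArrayList<>(srcTxnIds.size());
        for (Long src : srcTxnIds) {
            Long target = srcToTarget.get(src);
            if (target == null) {
                throw new NoSuchElementException("No target txn id mapped for source txn " + src);
            }
            targetTxnIds.add(target);
        }
        return targetTxnIds;
    }
}
```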
| // Request msg to allocate table write ids for the given list of txns | ||
| struct AllocateTableWriteIdsRequest { | ||
| 1: required list<i64> txnIds, | ||
| 1: optional list<i64> txnIds, |
There was a problem hiding this comment.
Optional members can be moved to bottom and keep repl related members together.
There was a problem hiding this comment.
txnIds is not replication related ..it will be used by normal flow ..so have kept it at the starting
| 2: required string dbName, | ||
| 3: required string tableName, | ||
| 4: optional string replPolicy, | ||
| 5: optional list<TxnToWriteId> txnToWriteIdList, |
There was a problem hiding this comment.
rename as srcTxnToWriteIdList. Also, add a comment that txn and write ids in this list are assumed to be sorted in ascending order and are in continuous sequence.
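The assumption requested in that comment could also be checked in code. A rough sketch, reading "ascending and continuous" as: txn IDs strictly increasing and write IDs increasing by exactly one. `Pair` is a stand-in for the Thrift TxnToWriteId type, and all names here are hypothetical.

```java
import java.util.List;

// Sketch only: validates the ordering assumption on a source txn-to-write-id list.
final class SrcTxnToWriteIdValidator {
    // Stand-in for TxnToWriteId so the example is self-contained.
    static final class Pair {
        final long txnId;
        final long writeId;

        Pair(long txnId, long writeId) {
            this.txnId = txnId;
            this.writeId = writeId;
        }
    }

    static boolean isAscendingAndContiguous(List<Pair> srcTxnToWriteIdList) {
        for (int i = 1; i < srcTxnToWriteIdList.size(); i++) {
            Pair prev = srcTxnToWriteIdList.get(i - 1);
            Pair cur = srcTxnToWriteIdList.get(i);
            // Txn ids must increase; write ids must follow each other without gaps.
            if (cur.txnId <= prev.txnId || cur.writeId != prev.writeId + 1) {
                return false;
            }
        }
        return true;
    }
}
```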
|
|
||
| // Get the next write id for the table and check if its in consistent with the input txntowriteids. | ||
| String s = sqlGenerator.addForUpdateClause( | ||
| "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName) |
There was a problem hiding this comment.
Next_write_id is queried twice in this allocate write id flow. One here and other by caller after this call.
Add a comment, that this query is run under read-committed connection but still it is guaranteed that repeated query on next_write_id within this connection is consistent as this is repl flow and we assume that only repl task would allocate write id.
Also, check if we can refactor and avoid this repeated call.
627b08c to 6f993ea
| long txnId = txnList.get((int) i); | ||
| assertEquals(i + startId, txnId); | ||
| } | ||
| int numTxnPresentNow = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXNS"); |
There was a problem hiding this comment.
Instead of 2 queries on TXNS table, shall add where clause for range of txnids allocated.
| " and RTM_REPL_POLICY = \'" + replPolicy + "\'"); | ||
| assertEquals(numReplTxns, targetTxnId.size()); | ||
|
|
||
| String[] output = TxnDbUtil.queryToString(conf, "select RTM_TARGET_TXN_ID from REPL_TXN_MAP where " + |
There was a problem hiding this comment.
Instead of 2 queries, can't we just compare the size of "output" for count?
| public void testReplOpenTxn() throws Exception { | ||
| int numTxn = 50000; | ||
| List<Long> txnList = openTxnForTest(1, numTxn, "default.*"); | ||
| int numTxnPresentStart = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXNS"); |
There was a problem hiding this comment.
Unused query result. Redundant code.
| allocMsg = new AllocateTableWriteIdsRequest("default", "tbl"); | ||
| allocMsg.setReplPolicy("default.*"); | ||
| allocMsg.setTxnToWriteIdList(txnToWriteId); | ||
| allocMsg.setSrcTxnToWriteIdList(txnToWriteId); |
There was a problem hiding this comment.
This is an invalid usage where write ids are allocated using normal flow first and then through repl flow. Also, the input txnToWriteId should be that of source but here it is that of target itself.
For idempotent case, we call same method twice with same input parameters.
I think, we shall remove this code here as idempotent case is verified somewhere below this test.
|
|
||
| for (int idx = 0; idx < txnList.size(); idx++) { | ||
| txnToWriteId.add(new TxnToWriteId(txnList.get(idx), idx+1)); | ||
| txnToWriteId.add(new TxnToWriteId(startTxnId+idx, idx+1)); |
There was a problem hiding this comment.
Better to use appropriate variable names such as srcTxnToWriteId for input and targetTxnToWriteId for output.
| } | ||
|
|
||
| Collections.sort(txnIds); //easier to read logs | ||
| Collections.sort(txnIds); //easier to read logs and for assumption done in replication flow |
There was a problem hiding this comment.
What assumption in replication flow?
| * @throws LockException | ||
| */ | ||
| void replAllocateTableWriteIdsBatch(String dbName, String tableName, String replPolicy, | ||
| List<TxnToWriteId> txnToWriteIdList) throws LockException; |
There was a problem hiding this comment.
rename to srcTxnToWriteIdList.
| } | ||
| @Override | ||
| public void replAllocateTableWriteIdsBatch(String dbName, String tableName, | ||
| String replPolicy, List<TxnToWriteId> txnToWriteIdList) throws LockException { |
There was a problem hiding this comment.
rename to srcTxnToWriteIdList.
| } | ||
|
|
||
| public void replAllocateTableWriteIdsBatch(String dbName, String tableName, | ||
| String replPolicy, List<TxnToWriteId> txnToWriteIdList) throws LockException { |
There was a problem hiding this comment.
rename to srcTxnToWriteIdList.
| String tableName = (context.tableName != null && !context.tableName.isEmpty() ? context.tableName : msg | ||
| .getTableName()); | ||
|
|
||
| //context table name is passed to ReplTxnWork , as the repl policy will be created based |
| LOG.debug("Table does not exist so, ignoring the operation"); | ||
| //TODO : need to return if table does not exist. Will be done once acid table replication is implemented. | ||
| //return 0; | ||
| LOG.info("Table does not exist so, ignoring the operation"); |
There was a problem hiding this comment.
Edit the comment to include that this could be idempotent case.
| ReplicationSpec replicationSpec = work.getReplicationSpec(); | ||
| if (replicationSpec != null && !replicationSpec.allowReplacementInto(tbl.getParameters())) { | ||
| // if the event is already replayed, then no need to replay it again. | ||
| LOG.debug("ReplTxnTask: Event is skipped as it is already replayed"); |
There was a problem hiding this comment.
Shall log the event type and event id.
| return true; | ||
| } | ||
| if (tbl != null) { | ||
| Map<String, String> params = tbl.getParameters(); |
There was a problem hiding this comment.
This code snippet to validate repl state is duplicated for database case as well. Can we have a static method in ReplicationSpec which takes current event id and parameters map as arguments?
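A sketch of the shared static helper suggested here, so the table and database paths can call the same code. The parameter key name and the strict "newer than" comparison are assumptions for illustration; the real ReplicationSpec logic may differ.

```java
import java.util.Map;

// Sketch only: hypothetical helper, not the actual ReplicationSpec API.
final class ReplStateCheck {
    // Assumed name of the parameter that stores the last replicated event id.
    static final String LAST_REPL_ID_KEY = "repl.last.id";

    /** Returns true when the event is newer than the state recorded in the object's parameters. */
    static boolean allowReplacement(long currentEventId, Map<String, String> params) {
        if (params == null) {
            return true;
        }
        String lastReplId = params.get(LAST_REPL_ID_KEY);
        if (lastReplId == null || lastReplId.isEmpty()) {
            // Nothing replicated into this object yet.
            return true;
        }
        return currentEventId > Long.parseLong(lastReplId);
    }
}
```

Both callers would then pass `tbl.getParameters()` or the database parameters map along with the current event ID.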
| private String server, servicePrincipal, dbName, tableName; | ||
|
|
||
| @JsonProperty | ||
| private List<Long> txnIdList, writeIdList; |
There was a problem hiding this comment.
Can't we make a list of TxnToWriteId in JSON format too, instead of 2 more lists?
There was a problem hiding this comment.
JSON does not support TxnToWriteId
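If the message keeps two parallel lists, the reader side can pair them back up. A minimal sketch, using `Map.Entry<Long, Long>` as a stand-in for TxnToWriteId; the class and method names are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch only: rebuilds (txnId, writeId) pairs from the two serialized lists.
final class TxnWriteIdLists {
    static List<Map.Entry<Long, Long>> zip(List<Long> txnIdList, List<Long> writeIdList) {
        if (txnIdList.size() != writeIdList.size()) {
            throw new IllegalArgumentException("txnIdList and writeIdList must have the same size");
        }
        List<Map.Entry<Long, Long>> pairs = new ArrayList<>(txnIdList.size());
        for (int i = 0; i < txnIdList.size(); i++) {
            // Position i in both lists describes the same allocation.
            pairs.add(new AbstractMap.SimpleImmutableEntry<>(txnIdList.get(i), writeIdList.get(i)));
        }
        return pairs;
    }
}
```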
| } | ||
|
|
||
| @Override | ||
| public String getDbName() { |
There was a problem hiding this comment.
Why 2 methods to get dbName?
There was a problem hiding this comment.
removed the autogenerated one
|
|
||
| import javax.sql.DataSource; | ||
|
|
||
| import com.google.common.collect.Lists; |
There was a problem hiding this comment.
Unused import as we changed all to Collections.singletonList, right?
| s = sqlGenerator.addForUpdateClause( | ||
| "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName) | ||
| + " and nwi_table = " + quoteString(tblName)); | ||
| LOG.debug("Going to execute query <" + s + ">"); |
There was a problem hiding this comment.
why do we need to move this select from bottom?
There was a problem hiding this comment.
code refactoring to have one select per flow
|
|
||
| if (txnIds.size() == 0) { | ||
| // Idempotent case for replication flow. | ||
| return new AllocateTableWriteIdsResponse(new ArrayList<>()); |
There was a problem hiding this comment.
txn leak. should rollback.
There was a problem hiding this comment.
connection close will rollback the transaction
| @@ -1287,27 +1391,16 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds | |||
| return new AllocateTableWriteIdsResponse(txnToWriteIds); | |||
There was a problem hiding this comment.
txn leak here as well. need rollback.
There was a problem hiding this comment.
connection close will rollback the transaction
| 3: required string tableName, | ||
| 1: required string dbName, | ||
| 2: required string tableName, | ||
| 3: optional list<i64> txnIds, |
There was a problem hiding this comment.
add a comment saying either txnIds or replPolicy+srcTxnToWriteIdList can exist in a call.
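Beyond a comment, the either/or rule could be enforced when the request is handled. A sketch with hypothetical names that takes the three request fields directly rather than the Thrift request object:

```java
import java.util.List;

// Sketch only: decides which flow a request belongs to and rejects mixed input.
final class AllocRequestCheck {
    /** Returns true for the repl flow, false for the normal flow. */
    static boolean isReplFlow(List<Long> txnIds, String replPolicy, List<?> srcTxnToWriteIdList) {
        boolean hasTxnIds = txnIds != null && !txnIds.isEmpty();
        boolean hasPolicy = replPolicy != null && !replPolicy.isEmpty();
        boolean hasSrcList = srcTxnToWriteIdList != null && !srcTxnToWriteIdList.isEmpty();

        if (hasTxnIds && !hasPolicy && !hasSrcList) {
            return false; // normal flow: only txnIds is set
        }
        if (!hasTxnIds && hasPolicy && hasSrcList) {
            return true; // repl flow: replPolicy and srcTxnToWriteIdList are both set
        }
        throw new IllegalArgumentException(
            "Set either txnIds, or replPolicy together with srcTxnToWriteIdList");
    }
}
```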
| return txnIds; | ||
| public ReplTxnWork(String replPolicy, String dbName, String tableName, Long txnId, | ||
| OperationType type, ReplicationSpec replicationSpec) { | ||
| this(replPolicy, dbName, tableName, Lists.newArrayList(txnId), type, null, replicationSpec); |
There was a problem hiding this comment.
Singleton list can be used.
28442c3 to bf712f7
bf712f7 to d7f7b75
EVENT_ALLOCATE_WRITE_ID
Source Warehouse:
Create new event type EVENT_ALLOCATE_WRITE_ID with related message format etc.
Capture this event when an ACID operation allocates a table write ID from the sequence table.
Repl dump should read this event from EventNotificationTable and dump the message.
Target Warehouse:
Repl load should read the event from the dump and get the message.
Validate that the source txn ID from the event is present in the source-target txn ID map. If it is not, just noop the event.
If valid, allocate the table write ID from the sequence table.
Extend the listener notify event API with two new parameters, dbconn and sqlgenerator, so that events are added to the notification_log table within the same transaction.