Permalink
Browse files

remote transactions are handled similar way as local transaction

  • Loading branch information...
Atreyee committed Apr 22, 2014
1 parent 25637ac commit 4fdf0cfa9332e04366aa8a6f93a8ba9f700ca92f
@@ -317,25 +317,26 @@ protected void processingCallback(QueueEntry next) {
LOG.info("anticache block removal done");
Long oldTxnId = next.ts.getTransactionId();
// HACK HACK HACK HACK HACK HACK
// We need to get a new txnId for ourselves, since the one that we
// were given before is now probably too far in the past
Long newTxnId = this.hstore_site.getTransactionInitializer().resetTransactionId(next.ts, next.partition);
// Now go ahead and requeue our transaction
// if(merge_needed)
next.ts.setAntiCacheMergeTable(next.catalog_tbl);
if (next.ts instanceof LocalTransaction){
// HACK HACK HACK HACK HACK HACK
// We need to get a new txnId for ourselves, since the one that we
// were given before is now probably too far in the past
Long newTxnId = this.hstore_site.getTransactionInitializer().resetTransactionId(next.ts, next.partition);
LOG.info("restartin on local");
this.hstore_site.transactionInit(next.ts);
}else{
RemoteTransaction ts = (RemoteTransaction) next.ts;
RpcCallback<UnevictDataResponse> callback = ts.getUnevictCallback();
UnevictDataResponse.Builder builder = UnevictDataResponse.newBuilder()
.setSenderSite(this.hstore_site.getSiteId())
.setOldTransactionId(oldTxnId)
.setNewTransactionId(newTxnId)
.setTransactionId(oldTxnId)
.setPartitionId(next.partition)
.setStatus(Status.OK);
callback.run(builder.build());
@@ -264,15 +264,15 @@ public void run(UnevictDataResponse response) {
HStoreThreadManager.formatSiteName(response.getSenderSite()),
HStoreThreadManager.formatSiteName(local_site_id),
response.getStatus()));
long oldTxnId = response.getOldTransactionId();
long newTxnId = response.getNewTransactionId();
long oldTxnId = response.getTransactionId();
int partition = response.getPartitionId();
LocalTransaction ts = hstore_site.getTransaction(oldTxnId);
hstore_site.getTransactionInitializer().registerTransactionRestartWithId(ts, oldTxnId, newTxnId);
assert(response.getSenderSite() != local_site_id);
// we need needs to initiate a HstoreCoordinator transactionInit for this transaction
LocalInitQueueCallback initCallback = (LocalInitQueueCallback)ts.getInitCallback();
hstore_site.getCoordinator().transactionInit(ts, initCallback);
hstore_site.getTransactionInitializer().resetTransactionId(ts, partition);
LOG.info("restartin on local");
hstore_site.transactionInit(ts);
}
}
};

Some generated files are not rendered by default. Learn more.

Oops, something went wrong.
@@ -476,8 +476,8 @@ message UnevictDataRequest {
message UnevictDataResponse {
required int32 sender_site = 1;
required Status status = 2;
required int64 old_transaction_id = 3;
required int64 new_transaction_id = 4;
required int64 transaction_id = 3;
required int32 partition_id = 4;
}
// -----------------------------------

0 comments on commit 4fdf0cf

Please sign in to comment.