Permalink
Browse files

Multi-site distributed txn works now. Yay :D :D :D

  • Loading branch information...
Atreyee committed Apr 24, 2014
1 parent a0f7e4f commit 13539d00a3c50e9b9d6787215792c150f5059cf1
@@ -334,11 +334,12 @@ protected void processingCallback(QueueEntry next) {
LOG.info("restartin on local");
this.hstore_site.transactionInit(next.ts);
}else{
ee.antiCacheMergeBlocks(next.catalog_tbl);
RemoteTransaction ts = (RemoteTransaction) next.ts;
RpcCallback<UnevictDataResponse> callback = ts.getUnevictCallback();
UnevictDataResponse.Builder builder = UnevictDataResponse.newBuilder()
.setSenderSite(this.hstore_site.getSiteId())
.setTransactionId(oldTxnId)
.setTransactionId(ts.getNewTransactionId())
.setPartitionId(next.partition)
.setStatus(Status.OK);
callback.run(builder.build());
@@ -266,15 +266,15 @@ public void run(UnevictDataResponse response) {
response.getStatus()));
long oldTxnId = response.getTransactionId();
int partition = response.getPartitionId();
LocalTransaction ts = hstore_site.getTransaction(oldTxnId);
System.out.println(ts+"***************");
assert(response.getSenderSite() != local_site_id);
hstore_site.getTransactionInitializer().resetTransactionId(ts, ts.getBasePartition());
LOG.info("restartin on local");
hstore_site.transactionInit(ts);
//LocalInitQueueCallback initCallback = (LocalInitQueueCallback)ts.getInitCallback();
//hstore_site.getCoordinator().transactionInit(ts, initCallback);
LocalInitQueueCallback initCallback = (LocalInitQueueCallback)ts.getInitCallback();
hstore_site.getCoordinator().transactionInit(ts, initCallback);
}
}
};
@@ -843,6 +843,9 @@ public void unevictData(RpcController controller,
System.out.println(request.getTransactionId());
assert(ts!=null);
ts.setUnevictCallback(done);
ts.setNewTransactionId(request.getNewTransactionId());
int partition = request.getPartitionId();
Table catalog_tbl = hstore_site.getCatalogContext().getTableById(request.getTableId());
short[] block_ids = new short[request.getBlockIdsList().size()];
@@ -1349,7 +1352,8 @@ public void sendHeartbeat() {
public void sendUnevictDataMessage(int remote_site_id, LocalTransaction txn, int partition_id, Table catalog_tbl, short[] block_ids, int[] tuple_offsets) {
Builder builder = UnevictDataRequest.newBuilder()
.setSenderSite(this.local_site_id)
.setTransactionId(txn.getTransactionId())
.setTransactionId(txn.getOldTransactionId())
.setNewTransactionId(txn.getTransactionId())
.setPartitionId(partition_id)
.setTableId(catalog_tbl.getRelativeIndex());
@@ -2417,10 +2417,10 @@ else if (orig_ts.getRestartCounter() <= 2) { // FIXME
LOG.info(String.format("error has partition id %d", error.getPartitionId()));
if(orig_ts.getBasePartition()!=error.getPartitionId() && !this.isLocalPartition(error.getPartitionId())){
this.anticacheManager.queue(orig_ts, error.getPartitionId(), evicted_table, block_ids, tuple_offsets);
}else{
this.anticacheManager.queue(new_ts, error.getPartitionId(), evicted_table, block_ids, tuple_offsets);
new_ts.setOldTransactionId(orig_ts.getTransactionId());
}
this.anticacheManager.queue(new_ts, error.getPartitionId(), evicted_table, block_ids, tuple_offsets);
}
@@ -4261,7 +4261,6 @@ else if (ts.isPredictSinglePartition()) {
if (hstore_conf.site.exec_profiling) this.profiler.network_time.start();
this.hstore_site.responseSend(ts, cresponse);
if (hstore_conf.site.exec_profiling) this.profiler.network_time.stopIfStarted();
LOG.info("deleted here!!!!");
this.hstore_site.queueDeleteTransaction(ts.getTransactionId(), status);
}
// -------------------------------
@@ -240,7 +240,6 @@ public void runImpl() {
LOG.trace(String.format("%s - Got return result %s after restarting", ts, ret));
ts.unmarkNeedsRestart();
LOG.info("deleted here!!!!");
hstore_site.queueDeleteTransaction(ts.getTransactionId(), status);
} // WHILE
}
@@ -374,7 +373,6 @@ protected void queueTransactionInit(AbstractTransaction ts) {
if (localTxn.profiler != null) localTxn.profiler.startInitQueue();
}
this.initQueue.add(ts);
LOG.info("put in our txn queue");
}
/**
@@ -727,7 +725,6 @@ public void restartTransaction(LocalTransaction ts, Status status) {
LOG.debug(String.format("%s - Unable to add txn to restart queue. Rejecting...", ts));
this.hstore_site.transactionReject(ts, Status.ABORT_REJECT);
ts.unmarkNeedsRestart();
LOG.info("deleted here!!!!");
this.hstore_site.queueDeleteTransaction(ts.getTransactionId(), Status.ABORT_REJECT);
return;
}
@@ -753,7 +750,6 @@ private void checkRestartQueue() {
LOG.trace(String.format("%s - Got return result %s after restarting", ts, ret));
ts.unmarkNeedsRestart();
LOG.info("deleted here!!!!");
this.hstore_site.queueDeleteTransaction(ts.getTransactionId(), status);
if (limit-- == 0) break;
} // WHILE
@@ -119,7 +119,6 @@ protected final void abortCallback(Status status) {
}
}
this.abortFinished = true;
LOG.info("deleted here!!!!");
this.hstore_site.queueDeleteTransaction(this.txn_id, status);
}
@@ -76,7 +76,6 @@ protected void unblockCallback() {
this.ts.getInitCallback().cancel();
try {
LOG.info("deleted here!!!!");
this.hstore_site.queueDeleteTransaction(this.ts.getTransactionId(), this.status);
} catch (Throwable ex) {
String msg = String.format("Failed to queue %s for deletion from %s",
@@ -339,7 +339,6 @@ public final void abort(int partition, Status status) {
// FIXME
}
} else {
LOG.info("deleted here!!!!");
this.hstore_site.queueDeleteTransaction(this.ts.getTransactionId(), status);
}
this.abortFinished = true;
@@ -1367,6 +1367,7 @@ public final void clearReadWriteSets() {
private Debug cachedDebugContext;
private RpcCallback<UnevictDataResponse> unevict_callback;
private long new_transaction_id;
public Debug getDebugContext() {
if (this.cachedDebugContext == null) {
// We don't care if we're thread-safe here...
@@ -1382,4 +1383,12 @@ public void setUnevictCallback(RpcCallback<UnevictDataResponse> done) {
return this.unevict_callback;
}
public void setNewTransactionId(long newTransactionId) {
this.new_transaction_id = newTransactionId;
}
public long getNewTransactionId(){
return this.new_transaction_id;
}
}
@@ -186,6 +186,7 @@
* each query in this transaction.
*/
private final Histogram<Statement> exec_stmtCounters = new ObjectHistogram<Statement>();
private Long old_transaction_id;
// ----------------------------------------------------------------------------
// INITIALIZATION
@@ -948,5 +949,13 @@ else if (d.hasTasksReleased()) {
return (sb.toString());
}
public void setOldTransactionId(Long transactionId) {
this.old_transaction_id = transactionId;
}
public Long getOldTransactionId(){
return this.old_transaction_id;
}
}

Some generated files are not rendered by default. Learn more.

Oops, something went wrong.
Oops, something went wrong.

0 comments on commit 13539d0

Please sign in to comment.