Skip to content
Browse files

Fix the broken MR transaction after merging from master branch

  • Loading branch information...
1 parent 54447e5 commit 4ade43332b40eccfb77749b2d75c90274818b504 Xin Jia committed
View
94 src/frontend/edu/brown/hstore/HStoreSite.java
@@ -1225,53 +1225,59 @@ private void dispatchInvocation(LocalTransaction ts) {
// DISTRIBUTED TRANSACTION
// -------------------------------
else {
- if (d) LOG.debug(String.format("%s - Queuing distributed transaction to execute at partition %d [handle=%d]",
- ts, base_partition, ts.getClientHandle()));
-
- // Partitions
- // Figure out what partitions we plan on touching for this transaction
- Collection<Integer> predict_touchedPartitions = ts.getPredictTouchedPartitions();
-
- if (ts.isMapReduce() == false) {
- // TransactionEstimator
- // If we know we're single-partitioned, then we *don't* want to tell the Dtxn.Coordinator
- // that we're done at any partitions because it will throw an error
- // Instead, if we're not single-partitioned then that's that only time that
- // we Tell the Dtxn.Coordinator that we are finished with partitions if we have an estimate
- TransactionEstimator.State s = ts.getEstimatorState();
- if (s != null && s.getInitialEstimate() != null) {
- MarkovEstimate est = s.getInitialEstimate();
- assert(est != null);
- predict_touchedPartitions.addAll(est.getTouchedPartitions(this.thresholds));
- }
- assert(predict_touchedPartitions.isEmpty() == false) :
- "Trying to mark " + ts + " as done at EVERY partition!\n" + ts.debug();
+ if (ts.isMapReduce() && !hstore_conf.site.mr_map_blocking) {
+ if (d) LOG.debug(String.format("%s - Doing MapReduce Transaction asynchronously, start on partition %d [handle=%d]",
+ ts, base_partition, ts.getClientHandle()));
+ this.transactionStart(ts, base_partition);
}
-
- // Check whether our transaction can't run right now because its id is less than
- // the last seen txnid from the remote partitions that it wants to touch
- for (int partition : predict_touchedPartitions) {
- Long last_txn_id = this.txnQueueManager.getLastLockTransaction(partition);
- if (txn_id.compareTo(last_txn_id) < 0) {
- // If we catch it here, then we can just block ourselves until
- // we generate a txn_id with a greater value and then re-add ourselves
- if (d) {
- LOG.warn(String.format("%s - Unable to queue transaction because the last txn id at partition %d is %d. Restarting...",
- ts, partition, last_txn_id));
- LOG.warn(String.format("LastTxnId:#%s / NewTxnId:#%s",
- TransactionIdManager.toString(last_txn_id),
- TransactionIdManager.toString(txn_id)));
+ else {
+ if (d) LOG.debug(String.format("%s - Queuing distributed transaction to execute at partition %d [handle=%d]",
+ ts, base_partition, ts.getClientHandle()));
+ // Partitions
+ // Figure out what partitions we plan on touching for this transaction
+ Collection<Integer> predict_touchedPartitions = ts.getPredictTouchedPartitions();
+
+ if (ts.isMapReduce() == false) {
+ // TransactionEstimator
+ // If we know we're single-partitioned, then we *don't* want to tell the Dtxn.Coordinator
+ // that we're done at any partitions because it will throw an error
+ // Instead, if we're not single-partitioned then that's that only time that
+ // we Tell the Dtxn.Coordinator that we are finished with partitions if we have an estimate
+ TransactionEstimator.State s = ts.getEstimatorState();
+ if (s != null && s.getInitialEstimate() != null) {
+ MarkovEstimate est = s.getInitialEstimate();
+ assert(est != null);
+ predict_touchedPartitions.addAll(est.getTouchedPartitions(this.thresholds));
}
- if (hstore_conf.site.status_show_txn_info && ts.getRestartCounter() == 1) TxnCounter.BLOCKED_LOCAL.inc(ts.getProcedure());
- this.txnQueueManager.blockTransaction(ts, partition, last_txn_id);
- return;
+ assert(predict_touchedPartitions.isEmpty() == false) :
+ "Trying to mark " + ts + " as done at EVERY partition!\n" + ts.debug();
}
- } // FOR
-
- // This callback prevents us from making additional requests to the Dtxn.Coordinator until
- // we get hear back about our our initialization request
- if (hstore_conf.site.txn_profiling) ts.profiler.startInitDtxn();
- this.txnQueueManager.initTransaction(ts);
+
+ // Check whether our transaction can't run right now because its id is less than
+ // the last seen txnid from the remote partitions that it wants to touch
+ for (int partition : predict_touchedPartitions) {
+ Long last_txn_id = this.txnQueueManager.getLastLockTransaction(partition);
+ if (txn_id.compareTo(last_txn_id) < 0) {
+ // If we catch it here, then we can just block ourselves until
+ // we generate a txn_id with a greater value and then re-add ourselves
+ if (d) {
+ LOG.warn(String.format("%s - Unable to queue transaction because the last txn id at partition %d is %d. Restarting...",
+ ts, partition, last_txn_id));
+ LOG.warn(String.format("LastTxnId:#%s / NewTxnId:#%s",
+ TransactionIdManager.toString(last_txn_id),
+ TransactionIdManager.toString(txn_id)));
+ }
+ if (hstore_conf.site.status_show_txn_info && ts.getRestartCounter() == 1) TxnCounter.BLOCKED_LOCAL.inc(ts.getProcedure());
+ this.txnQueueManager.blockTransaction(ts, partition, last_txn_id);
+ return;
+ }
+ } // FOR
+
+ // This callback prevents us from making additional requests to the Dtxn.Coordinator until
+ // we get hear back about our our initialization request
+ if (hstore_conf.site.txn_profiling) ts.profiler.startInitDtxn();
+ this.txnQueueManager.initTransaction(ts);
+ }
}
}
View
24 src/frontend/edu/brown/hstore/dtxn/MapReduceTransaction.java
@@ -157,14 +157,14 @@ public LocalTransaction init(Long txn_id, long clientHandle, int base_partition,
this.mapEmit = catalog_db.getTables().get(this.catalog_proc.getMapemittable());
this.reduceEmit = catalog_db.getTables().get(this.catalog_proc.getReduceemittable());
LOG.info(" CatalogUtil.getVoltTable(thisMapEmit): -> " + this.catalog_proc.getMapemittable());
+ LOG.info("MapReduce LocalPartitionIds: " + this.hstore_site.getLocalPartitionIds());
// Get the Table catalog object for the map/reduce outputs
// For each partition there should be a map/reduce output voltTable
- for (int partition : this.hstore_site.getAllPartitionIds()) {
+ for (int partition : this.hstore_site.getLocalPartitionIds()) {
int offset = hstore_site.getLocalPartitionOffset(partition);
//int offset = partition;
- if (trace.get()) LOG.trace(String.format("Partition[%d] -> Offset[%d]", partition, offset));
- // XXX: THIS IS BROKEN!
+ LOG.info(String.format("Partition[%d] -> Offset[%d]", partition, offset));
this.local_txns[offset].init(this.txn_id, this.client_handle, partition,
Collections.singleton(partition),
this.predict_readOnly, this.predict_abortable,
@@ -400,11 +400,12 @@ public String debug() {
return (StringUtil.formatMaps(this.getDebugMap()));
}
- @Override
- public boolean isPredictSinglePartition() {
- if (debug.get()) LOG.debug("Trying to do asynchronous map execution way, txs:" + this);
- return !this.hstore_site.getHStoreConf().site.mr_map_blocking;
- }
+// @Override
+// public boolean isPredictSinglePartition() {
+// if (debug.get() && !this.hstore_site.getHStoreConf().site.mr_map_blocking)
+// LOG.debug("Trying to do asynchronous map execution way, txs:" + this);
+// return !this.hstore_site.getHStoreConf().site.mr_map_blocking;
+// }
@Override
@@ -425,12 +426,6 @@ public void finishRound(int partition) {
public VoltTable getMapOutputByPartition( int partition ) {
if (debug.get()) LOG.debug("Trying to getMapOutputByPartition: [ " + partition + " ]");
return this.mapOutput[hstore_site.getLocalPartitionOffset(partition)];
- //return this.mapOutput[partition];
- }
-
- public void setMapOutputByPartition ( VoltTable vt,int partition ) {
- if (debug.get()) LOG.debug("Trying to setMapOutputByPartition: [ " + partition + " ]");
- this.mapOutput[hstore_site.getLocalPartitionOffset(partition)] = vt;
}
public VoltTable getReduceInputByPartition ( int partition ) {
@@ -440,6 +435,7 @@ public VoltTable getReduceInputByPartition ( int partition ) {
}
public VoltTable getReduceOutputByPartition ( int partition ) {
+ if (debug.get()) LOG.debug("Trying to getReduceOutputByPartition: [ " + partition + " ]");
return this.reduceOutput[hstore_site.getLocalPartitionOffset(partition)];
//return this.reduceOutput[partition];
}
View
2 src/frontend/edu/brown/hstore/util/MapReduceHelperThread.java
@@ -171,7 +171,7 @@ protected void shuffle(final MapReduceTransaction ts) {
VoltTable table = null;
int rp = -1;
- for (int partition : this.hstore_site.getAllPartitionIds()) {
+ for (int partition : this.hstore_site.getLocalPartitionIds()) {
table = ts.getMapOutputByPartition(partition);

0 comments on commit 4ade433

Please sign in to comment.
Something went wrong with that request. Please try again.