From c0c1b021df9405e411fb4fa58159ab476f0241eb Mon Sep 17 00:00:00 2001 From: Andy Pavlo Date: Wed, 5 Oct 2011 21:33:12 +0000 Subject: [PATCH] Further optimizations and progress on the new dtxn protocol: + Reduced the amount of data that is allocated per LocalTransaction and AbstractTransaction handle. + Implemented the LocalTransactionInitCallback that is used to initialize distributed transactions on remote HStoreSites + Moved more things into HStoreObjectPools + Removed the intialization latch from LocalTransaction. I'm going to head home to start laundry and eat dinner. My goal tonight is to get single-partition txns working again... git-svn-id: https://database.cs.brown.edu/svn/hstore/branches/newdtxn-branch@2379 7a8f3d5b-8c5f-44cb-9125-2414a256df87 --- src/frontend/edu/mit/hstore/HStoreConf.java | 7 + .../edu/mit/hstore/HStoreMessenger.java | 30 ++-- .../edu/mit/hstore/HStoreObjectPools.java | 33 +++-- src/frontend/edu/mit/hstore/HStoreSite.java | 116 +++++++-------- .../hstore/callbacks/BlockingCallback.java | 11 +- .../hstore/callbacks/InitiateCallback.java | 38 ----- .../LocalTransactionInitCallback.java | 76 ++++++++++ .../RemoteTransactionInitCallback.java | 5 + .../callbacks/TransactionPrepareCallback.java | 8 +- .../callbacks/TransactionWorkCallback.java | 4 +- .../mit/hstore/dtxn/AbstractTransaction.java | 133 ++++++------------ .../edu/mit/hstore/dtxn/LocalTransaction.java | 19 +-- .../mit/hstore/dtxn/RemoteTransaction.java | 7 - src/frontend/org/voltdb/ExecutionSite.java | 45 +++--- src/protorpc/edu/brown/hstore/hstore.proto | 3 +- 15 files changed, 273 insertions(+), 262 deletions(-) delete mode 100644 src/frontend/edu/mit/hstore/callbacks/InitiateCallback.java create mode 100644 src/frontend/edu/mit/hstore/callbacks/LocalTransactionInitCallback.java diff --git a/src/frontend/edu/mit/hstore/HStoreConf.java b/src/frontend/edu/mit/hstore/HStoreConf.java index 0b11b4e482..fbc621cb75 100644 --- a/src/frontend/edu/mit/hstore/HStoreConf.java +++ 
b/src/frontend/edu/mit/hstore/HStoreConf.java @@ -565,6 +565,13 @@ public final class SiteConf extends Conf { experimental=false ) public int pool_forwardtxnresponses_idle; + + @ConfigProperty( + description="The max number of LocalTransactionInitCallbacks to keep idle in the pool.", + defaultInt=2500, + experimental=false + ) + public int pool_localtxninit_idle; } // ============================================================================ diff --git a/src/frontend/edu/mit/hstore/HStoreMessenger.java b/src/frontend/edu/mit/hstore/HStoreMessenger.java index d30a38bb96..a83c8514ee 100644 --- a/src/frontend/edu/mit/hstore/HStoreMessenger.java +++ b/src/frontend/edu/mit/hstore/HStoreMessenger.java @@ -452,7 +452,6 @@ protected ProtoRpcController getProtoRpcController(LocalTransaction ts, int site private final MessageRouter router_transactionFinish = new MessageRouter() { protected void sendLocal(long txn_id, TransactionFinishRequest msg, Collection partitions) { hstore_site.transactionFinish(txn_id, msg.getStatus(), partitions); - hstore_site.completeTransaction(txn_id, msg.getStatus()); } protected void sendRemote(HStoreService channel, ProtoRpcController controller, TransactionFinishRequest msg, RpcCallback callback) { channel.transactionFinish(controller, msg, callback); @@ -620,10 +619,18 @@ public void shutdown(RpcController controller, ShutdownRequest request, // ---------------------------------------------------------------------------- // TRANSACTION METHODS // ---------------------------------------------------------------------------- - - public void transactionInit(Hstore.TransactionInitRequest request, RpcCallback callback) { - - // TODO + + /** + * + * @param ts + * @param callback + */ + public void transactionInit(LocalTransaction ts, RpcCallback callback) { + Hstore.TransactionInitRequest request = Hstore.TransactionInitRequest.newBuilder() + .setTransactionId(ts.getTransactionId()) + .addAllPartitions(ts.getPredictTouchedPartitions()) + .build(); 
+ this.router_transactionInit.sendMessages(ts, request, callback, request.getPartitionsList()); } @@ -650,15 +657,12 @@ public void transactionWork(LocalTransaction ts, Map partitions) { + public void transactionPrepare(LocalTransaction ts, RpcCallback callback, Collection partitions) { Hstore.TransactionPrepareRequest request = Hstore.TransactionPrepareRequest.newBuilder() .setTransactionId(ts.getTransactionId()) .addAllPartitions(ts.getDonePartitions()) .build(); - this.router_transactionPrepare.sendMessages(ts, - request, - ts.getPrepareCallback(), - partitions); + this.router_transactionPrepare.sendMessages(ts, request, callback, partitions); } /** @@ -683,10 +687,10 @@ public void transactionFinish(LocalTransaction ts, Hstore.Status status, RpcCall /** * Forward a StoredProcedureInvocation request to a remote site for execution * @param serializedRequest - * @param done + * @param callback * @param partition */ - public void transactionRedirect(byte[] serializedRequest, RpcCallback done, int partition) { + public void transactionRedirect(byte[] serializedRequest, RpcCallback callback, int partition) { int dest_site_id = hstore_site.getSiteIdForPartitionId(partition); if (debug.get()) LOG.debug("Redirecting transaction request to partition #" + partition + " on " + HStoreSite.formatSiteName(dest_site_id)); ByteString bs = ByteString.copyFrom(serializedRequest); @@ -694,7 +698,7 @@ public void transactionRedirect(byte[] serializedRequest, RpcCallback POOL_TXNPREPARE; + /** + * + */ + public static TypedStackObjectPool POOL_TXN_LOCALINIT; + /** * RemoteTransaction Object Pool */ @@ -52,23 +58,26 @@ public synchronized static void initialize(HStoreSite hstore_site) { if (POOL_TXNREDIRECT_REQUEST == null) { HStoreConf hstore_conf = hstore_site.getHStoreConf(); POOL_TXNREDIRECT_REQUEST = TypedStackObjectPool.factory(TransactionRedirectCallback.class, - hstore_conf.site.pool_forwardtxnrequests_idle, - hstore_conf.site.pool_profiling); + 
hstore_conf.site.pool_forwardtxnrequests_idle, + hstore_conf.site.pool_profiling); POOL_FORWARDTXN_RESPONSE = TypedStackObjectPool.factory(TransactionRedirectResponseCallback.class, - hstore_conf.site.pool_forwardtxnresponses_idle, - hstore_conf.site.pool_profiling); + hstore_conf.site.pool_forwardtxnresponses_idle, + hstore_conf.site.pool_profiling); POOL_TXNWORK = TypedStackObjectPool.factory(TransactionWorkCallback.class, - hstore_conf.site.pool_forwardtxnresponses_idle, - hstore_conf.site.pool_profiling); + hstore_conf.site.pool_forwardtxnresponses_idle, + hstore_conf.site.pool_profiling); POOL_TXNPREPARE = TypedStackObjectPool.factory(TransactionPrepareCallback.class, - hstore_conf.site.pool_forwardtxnresponses_idle, - hstore_conf.site.pool_profiling, hstore_site); + hstore_conf.site.pool_forwardtxnresponses_idle, + hstore_conf.site.pool_profiling, hstore_site); + POOL_TXN_LOCALINIT = TypedStackObjectPool.factory(LocalTransactionInitCallback.class, + hstore_conf.site.pool_localtxninit_idle, + hstore_conf.site.pool_profiling, hstore_site); remoteTxnPool = TypedStackObjectPool.factory(RemoteTransaction.class, - hstore_conf.site.pool_remotetxnstate_idle, - hstore_conf.site.pool_profiling); + hstore_conf.site.pool_remotetxnstate_idle, + hstore_conf.site.pool_profiling); localTxnPool = TypedStackObjectPool.factory(LocalTransaction.class, - hstore_conf.site.pool_localtxnstate_idle, - hstore_conf.site.pool_profiling); + hstore_conf.site.pool_localtxnstate_idle, + hstore_conf.site.pool_profiling); } } } diff --git a/src/frontend/edu/mit/hstore/HStoreSite.java b/src/frontend/edu/mit/hstore/HStoreSite.java index a736846a37..e447700d4f 100644 --- a/src/frontend/edu/mit/hstore/HStoreSite.java +++ b/src/frontend/edu/mit/hstore/HStoreSite.java @@ -27,9 +27,16 @@ import java.io.File; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import 
java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Observer; +import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -38,9 +45,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.collections15.CollectionUtils; import org.apache.commons.collections15.set.ListOrderedSet; -import org.apache.commons.pool.impl.StackObjectPool; import org.apache.log4j.Logger; import org.voltdb.BackendTarget; import org.voltdb.ClientResponseImpl; @@ -55,29 +60,19 @@ import org.voltdb.catalog.Partition; import org.voltdb.catalog.Procedure; import org.voltdb.catalog.Site; -import org.voltdb.client.ClientResponse; import org.voltdb.messaging.FastDeserializer; import org.voltdb.messaging.FastSerializer; import org.voltdb.messaging.FragmentTaskMessage; import org.voltdb.messaging.InitiateTaskMessage; -import org.voltdb.messaging.TransactionInfoBaseMessage; -import org.voltdb.messaging.VoltMessage; -import org.voltdb.utils.DBBPool; import ca.evanjones.protorpc.NIOEventLoop; -import ca.evanjones.protorpc.ProtoRpcChannel; -import ca.evanjones.protorpc.ProtoRpcController; -import ca.evanjones.protorpc.ProtoServer; -import com.google.protobuf.ByteString; import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; import edu.brown.catalog.CatalogUtil; import edu.brown.graphs.GraphvizExport; import edu.brown.hashing.AbstractHasher; import edu.brown.hstore.Hstore; -import edu.brown.hstore.Hstore.TransactionWorkRequest.PartitionFragment; import edu.brown.markov.EstimationThresholds; import edu.brown.markov.MarkovEdge; import edu.brown.markov.MarkovEstimate; @@ -92,14 +87,11 @@ import edu.brown.utils.*; import edu.brown.utils.LoggerUtil.LoggerBoolean; import edu.brown.workload.Workload; -import 
edu.mit.hstore.callbacks.ClientResponseFinalCallback; +import edu.mit.hstore.callbacks.LocalTransactionInitCallback; import edu.mit.hstore.callbacks.TransactionRedirectCallback; -import edu.mit.hstore.callbacks.TransactionRedirectResponseCallback; -import edu.mit.hstore.callbacks.InitiateCallback; -import edu.mit.hstore.callbacks.TransactionPrepareCallback; import edu.mit.hstore.callbacks.TransactionWorkCallback; -import edu.mit.hstore.dtxn.LocalTransaction; import edu.mit.hstore.dtxn.AbstractTransaction; +import edu.mit.hstore.dtxn.LocalTransaction; import edu.mit.hstore.dtxn.RemoteTransaction; import edu.mit.hstore.interfaces.Loggable; import edu.mit.hstore.interfaces.Shutdownable; @@ -166,6 +158,12 @@ public static final String formatPartitionName(int site_id, int partition_id) { */ private static HStoreSite SHUTDOWN_HANDLE = null; + + /** + * + */ + public static int LOCAL_PARTITION_OFFSETS[]; + // ---------------------------------------------------------------------------- // OBJECT POOLS // ---------------------------------------------------------------------------- @@ -177,12 +175,8 @@ public static final String formatPartitionName(int site_id, int partition_id) { */ private final TransactionIdManager txnid_managers[]; - private final DBBPool buffer_pool = new DBBPool(true, false); private final HStoreMessenger messenger; - /** ProtoServer EventLoop **/ - private final NIOEventLoop protoEventLoop = new NIOEventLoop(); - /** * Local ExecutionSite Stuff */ @@ -234,6 +228,7 @@ public static final String formatPartitionName(int site_id, int partition_id) { private final Collection single_partition_sets[]; private final int num_local_partitions; + /** PartitionId -> SiteId */ private final Map partition_site_xref = new HashMap(); @@ -262,7 +257,7 @@ public static final String formatPartitionName(int site_id, int partition_id) { /** * Keep track of which txns that we have in-flight right now */ - private final ConcurrentHashMap inflight_txns = new 
ConcurrentHashMap(); + private final ConcurrentHashMap local_txns = new ConcurrentHashMap(); private final ConcurrentHashMap remote_txns = new ConcurrentHashMap(); private final AtomicInteger inflight_txns_ctr[]; @@ -332,6 +327,13 @@ public HStoreSite(Site catalog_site, Map executors, Part } // FOR final int num_partitions = this.all_partitions.size(); + + LOCAL_PARTITION_OFFSETS = new int[num_partitions]; + int offset = 0; + for (Integer p : executors.keySet()) { + LOCAL_PARTITION_OFFSETS[p.intValue()] = offset++; + } // FOR + this.executors = new ExecutionSite[num_partitions]; this.executor_threads = new Thread[num_partitions]; this.txnid_managers = new TransactionIdManager[num_partitions]; @@ -477,19 +479,18 @@ public Integer getSiteIdForPartitionId(Integer partition_id) { } public LocalTransaction getLocalTransaction(long txn_id) { - return (this.inflight_txns.get(txn_id)); + return (this.local_txns.get(txn_id)); } public RemoteTransaction getRemoteTransaction(long txn_id) { - // FIXME - return (null); + return (this.remote_txns.get(txn_id)); } /** * Get the total number of transactions inflight for all partitions */ protected int getInflightTxnCount() { - return (this.inflight_txns.size()); + return (this.local_txns.size()); } /** * Get the number of transactions inflight for this partition @@ -710,7 +711,7 @@ private final class ShutdownHook implements Runnable { @Override public void run() { // Dump out our status - int num_inflight = inflight_txns.size(); + int num_inflight = local_txns.size(); if (num_inflight > 0) { System.err.println("Shutdown [" + num_inflight + " txns inflight]"); } @@ -1074,10 +1075,10 @@ private void initializeInvocation(LocalTransaction ts) { // For some odd reason we sometimes get duplicate transaction ids from the VoltDB id generator // So we'll just double check to make sure that it's unique, and if not, we'll just ask for a new one - LocalTransaction dupe = this.inflight_txns.put(txn_id, ts); + LocalTransaction dupe = 
this.local_txns.put(txn_id, ts); if (dupe != null) { // HACK! - this.inflight_txns.put(txn_id, dupe); + this.local_txns.put(txn_id, dupe); long new_txn_id = this.txnid_managers[base_partition].getNextUniqueTransactionId(); if (new_txn_id == txn_id) { String msg = "Duplicate transaction id #" + txn_id; @@ -1088,7 +1089,7 @@ private void initializeInvocation(LocalTransaction ts) { LOG.warn(String.format("Had to fix duplicate txn ids: %d -> %d", txn_id, new_txn_id)); txn_id = new_txn_id; ts.setTransactionId(txn_id); - this.inflight_txns.put(txn_id, ts); + this.local_txns.put(txn_id, ts); } // We have to wrap the StoredProcedureInvocation object into an InitiateTaskMessage so that it can be put @@ -1099,15 +1100,12 @@ private void initializeInvocation(LocalTransaction ts) { // SINGLE-PARTITION TRANSACTION // ------------------------------- if (hstore_conf.site.exec_avoid_coordinator && single_partitioned) { - ts.ignore_dtxn = true; - ts.init_wrapper = wrapper; - - // Always execute this mofo right away and let each ExecutionSite figure out what it needs to do + // Always execute this mofo right away and let each ExecutionSite figure out what it needs to do ExecutionSite executor = this.executors[base_partition]; assert(executor != null) : "No ExecutionSite exists for partition #" + base_partition + " at HStoreSite " + this.site_id; if (hstore_conf.site.txn_profiling) ts.profiler.startQueue(); - executor.doWork(wrapper, ts); + executor.doWork(ts, wrapper); if (hstore_conf.site.status_show_txn_info) { assert(ts.getProcedure() != null) : "Null Procedure for txn #" + txn_id; @@ -1132,9 +1130,6 @@ private void initializeInvocation(LocalTransaction ts) { // if (d && dtxn_txns.isEmpty()) LOG.debug(String.format("Enabling CANADIAN mode [txn=#%d]", txn_id)); // dtxn_txns.add(txn_id); - Hstore.TransactionInitRequest.Builder requestBuilder = Hstore.TransactionInitRequest.newBuilder(); - requestBuilder.setTransactionId(txn_id); - // Partitions // Figure out what partitions we plan 
on touching for this transaction Set done_partitions = ts.getDonePartitions(); @@ -1154,22 +1149,21 @@ private void initializeInvocation(LocalTransaction ts) { if (touched_partitions.contains(p) == false && p.intValue() != base_partition) done_partitions.add(p); } // FOR } - for (Integer p : this.all_partitions) { - if (done_partitions.contains(p) == false) - requestBuilder.addPartitions(p.intValue()); - } // FOR assert(done_partitions.size() != this.all_partitions.size()) : "Trying to mark " + ts + " as done at EVERY partition!"; - if (d && requestBuilder.getPartitionsCount() > 0) { - LOG.debug(String.format("Marked %s as done at %d partitions: %s", ts, requestBuilder.getPartitionsCount(), requestBuilder.getPartitionsList())); - } // This callback prevents us from making additional requests to the Dtxn.Coordinator until // we get hear back about our our initialization request if (t) LOG.trace("Using InitiateCallback for " + ts); - RpcCallback callback = new InitiateCallback(this, txn_id, ts.init_latch); + LocalTransactionInitCallback callback = null; + try { + callback = HStoreObjectPools.POOL_TXN_LOCALINIT.borrowObject(); + callback.init(ts); + } catch (Exception ex) { + throw new RuntimeException(ex); + } if (hstore_conf.site.txn_profiling) ts.profiler.startCoordinatorBlocked(); - this.messenger.transactionInit(requestBuilder.build(), callback); // txn_info.rpc_request_init + this.messenger.transactionInit(ts, callback); // txn_info.rpc_request_init } // Look at the number of inflight transactions and see whether we should block and wait for the @@ -1193,8 +1187,8 @@ private void initializeInvocation(LocalTransaction ts) { // tell the Dtxn.Coordinator to prune its queue. 
if (hstore_conf.site.txn_enable_queue_pruning && rand.nextBoolean() == true) { int ctr = 0; - for (Long dtxn_id : this.inflight_txns.keySet()) { - LocalTransaction _ts = this.inflight_txns.get(dtxn_id); + for (Long dtxn_id : this.local_txns.keySet()) { + LocalTransaction _ts = this.local_txns.get(dtxn_id); if (_ts == null) continue; if (_ts.isPredictSinglePartition() == false && _ts.hasStarted() == false && rand.nextInt(10) == 0) { _ts.markAsRejected(); @@ -1285,6 +1279,8 @@ public void transactionPrepare(long txn_id, Collection partitions, Coll if (this.local_partitions.contains(p) == false) continue; if (updated != null) updated.add(p); + // TODO(cjl16): Always tell the queue stuff that the transaction is finished at this partition + // If speculative execution is enabled, then we'll turn it on at the ExecutionSite // for this partition if (hstore_conf.site.exec_speculative_execution) { @@ -1314,12 +1310,20 @@ public void transactionFinish(long txn_id, Hstore.Status status, Collection { - private static final Logger LOG = Logger.getLogger(InitiateCallback.class); - private final CountDownLatch latch; - - public InitiateCallback(HStoreSite hstore_coordinator, long txnId, CountDownLatch latch) { - super(hstore_coordinator, txnId, null); - this.latch = latch; - } - - @Override - public void run(Hstore.TransactionInitRequest parameter) { - if (LOG.isTraceEnabled()) - LOG.trace("Got initialization callback for txn #" + this.txn_id + ". 
" + - "Releasing latch!"); - - // FIXME - this.latch.countDown(); - } -} \ No newline at end of file diff --git a/src/frontend/edu/mit/hstore/callbacks/LocalTransactionInitCallback.java b/src/frontend/edu/mit/hstore/callbacks/LocalTransactionInitCallback.java new file mode 100644 index 0000000000..f555b40bd4 --- /dev/null +++ b/src/frontend/edu/mit/hstore/callbacks/LocalTransactionInitCallback.java @@ -0,0 +1,76 @@ +package edu.mit.hstore.callbacks; + +import org.apache.log4j.Logger; + +import com.google.protobuf.RpcCallback; + +import edu.brown.hstore.Hstore; +import edu.brown.hstore.Hstore.Status; +import edu.mit.hstore.HStoreSite; +import edu.mit.hstore.dtxn.LocalTransaction; + +/** + * This callback is meant to block a transaction from executing until all of the + * partitions that it needs come back and say they're ready to execute it + * Currently we use a CountDownLatch, but this is probably not what we want to actually + * do because that means a thread will have to block on it. So we need a better way of notifying + * ourselves that we can now execute a transaction. + * @author pavlo + */ +public class LocalTransactionInitCallback extends BlockingCallback { + private static final Logger LOG = Logger.getLogger(LocalTransactionInitCallback.class); + + private static final RpcCallback abort_callback = new RpcCallback() { + @Override + public void run(Hstore.TransactionFinishResponse parameter) { + // Ignore!
+ } + }; + + private final HStoreSite hstore_site; + private LocalTransaction ts; + + public LocalTransactionInitCallback(HStoreSite hstore_site) { + this.hstore_site = hstore_site; + } + + public void init(LocalTransaction ts) { + this.ts = ts; + super.init(ts.getPredictTouchedPartitions().size(), null); + } + + @Override + protected void finishImpl() { + this.ts = null; + } + + @Override + protected void unblockCallback() { + // TODO: Queue this LocalTransaction at the HStoreSite + } + + @Override + protected void abortCallback(Status status) { + assert(status == Hstore.Status.ABORT_REJECT); + + // If we abort, then we have to send out an ABORT_REJECT to + // all of the partitions that we originally sent INIT requests to + // Note that we do this *even* if we haven't heard back from the remote + // HStoreSite that they've acknowledged our transaction + // We don't care when we get the response for this + this.hstore_site.getMessenger().transactionFinish(this.ts, status, abort_callback); + + // Then re-queue the transaction.
We want to make sure that + // we use a new LocalTransaction handle because this one is going to get freed + this.hstore_site.transactionMispredict(this.ts, this.ts.getClientCallback()); + this.hstore_site.completeTransaction(this.ts.getTransactionId(), status); + } + + @Override + protected int runImpl(Hstore.TransactionInitResponse parameter) { + if (parameter.getStatus() != Hstore.Status.OK) { + this.abort(parameter.getStatus()); + } + return (parameter.getPartitionsCount()); + } +} \ No newline at end of file diff --git a/src/frontend/edu/mit/hstore/callbacks/RemoteTransactionInitCallback.java b/src/frontend/edu/mit/hstore/callbacks/RemoteTransactionInitCallback.java index 5b9887a0d0..498200aac6 100644 --- a/src/frontend/edu/mit/hstore/callbacks/RemoteTransactionInitCallback.java +++ b/src/frontend/edu/mit/hstore/callbacks/RemoteTransactionInitCallback.java @@ -48,4 +48,9 @@ protected void abortCallback(Hstore.Status status) { this.builder.setStatus(status); this.unblockCallback(); } + + @Override + protected int runImpl(Integer parameter) { + return 1; + } } diff --git a/src/frontend/edu/mit/hstore/callbacks/TransactionPrepareCallback.java b/src/frontend/edu/mit/hstore/callbacks/TransactionPrepareCallback.java index 313d84086d..69acc7ec45 100644 --- a/src/frontend/edu/mit/hstore/callbacks/TransactionPrepareCallback.java +++ b/src/frontend/edu/mit/hstore/callbacks/TransactionPrepareCallback.java @@ -90,10 +90,11 @@ protected void abortCallback(Status status) { // Change the response's status and send back the result to the client this.cresponse.setStatus(status); this.hstore_site.sendClientResponse(this.ts, this.cresponse); + this.hstore_site.completeTransaction(this.ts.getTransactionId(), status); } @Override - public void run(Hstore.TransactionPrepareResponse response) { + protected int runImpl(Hstore.TransactionPrepareResponse response) { final Hstore.Status status = response.getStatus(); // If any TransactionPrepareResponse comes back with anything but an 
OK, @@ -101,10 +102,9 @@ public void run(Hstore.TransactionPrepareResponse response) { if (status != Hstore.Status.OK) { this.abort(status); } + // Otherwise we need to update our counter to keep track of how many OKs that we got // back. We'll ignore anything that comes in after we've aborted - else if (this.isAborted() == false && this.getCounter().addAndGet(-1 * response.getPartitionsCount()) == 0) { - this.unblockCallback(); - } + return response.getPartitionsCount(); } } // END CLASS \ No newline at end of file diff --git a/src/frontend/edu/mit/hstore/callbacks/TransactionWorkCallback.java b/src/frontend/edu/mit/hstore/callbacks/TransactionWorkCallback.java index faf5bec39d..32ee69c359 100644 --- a/src/frontend/edu/mit/hstore/callbacks/TransactionWorkCallback.java +++ b/src/frontend/edu/mit/hstore/callbacks/TransactionWorkCallback.java @@ -50,9 +50,9 @@ protected void abortCallback(Status status) { } @Override - public void run(Hstore.TransactionWorkResponse.PartitionResult parameter) { + protected int runImpl(Hstore.TransactionWorkResponse.PartitionResult parameter) { this.builder.addResults(parameter); if (parameter.hasError()) this.builder.setStatus(Hstore.Status.ABORT_UNEXPECTED); - super.run(parameter); + return (1); } } \ No newline at end of file diff --git a/src/frontend/edu/mit/hstore/dtxn/AbstractTransaction.java b/src/frontend/edu/mit/hstore/dtxn/AbstractTransaction.java index 8b0bcbfe0c..473320cfe7 100644 --- a/src/frontend/edu/mit/hstore/dtxn/AbstractTransaction.java +++ b/src/frontend/edu/mit/hstore/dtxn/AbstractTransaction.java @@ -25,19 +25,17 @@ ***************************************************************************/ package edu.mit.hstore.dtxn; -import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import org.apache.commons.collections15.map.ListOrderedMap; import org.apache.log4j.Logger; import org.voltdb.ExecutionSite; -import org.voltdb.VoltTable; import 
org.voltdb.catalog.Procedure; import edu.brown.utils.Poolable; +import edu.mit.hstore.HStoreSite; /** * @author pavlo @@ -57,27 +55,16 @@ protected enum RoundState { FINISHED; } - public static String formatTxnName(Procedure catalog_proc, long txn_id) { - if (catalog_proc != null) { - return (catalog_proc.getName() + " #" + txn_id); - } - return ("#" + txn_id); - } - // ---------------------------------------------------------------------------- // GLOBAL DATA MEMBERS // ---------------------------------------------------------------------------- - /** - * - */ - public final Map> ee_dependencies = new HashMap>(); + protected long txn_id = -1; + protected long client_handle; + protected int base_partition; + protected final Set touched_partitions = new HashSet(); + protected boolean rejected; - /** - * A simple flag that lets us know that the HStoreSite is done with this guy - */ - private boolean hstoresite_finished = false; - // ---------------------------------------------------------------------------- // PREDICTIONS FLAGS // ---------------------------------------------------------------------------- @@ -89,38 +76,27 @@ public static String formatTxnName(Procedure catalog_proc, long txn_id) { private boolean predict_readOnly = false; // ---------------------------------------------------------------------------- - // EXECUTION FLAGS + // PER PARTITION EXECUTION FLAGS // ---------------------------------------------------------------------------- - - /** Whether this transaction has been read-only so far */ - protected boolean exec_readOnly[]; - /** Whether this Transaction has submitted work to the EE that may need to be rolled back */ - protected boolean exec_eeWork[]; - - /** This is set to true if the transaction did some work without an undo buffer **/ - private boolean exec_noUndoBuffer[]; - - /** - * Whether this transaction's control code is executing at this partition - */ - protected boolean exec_local; + protected RuntimeException pending_error; - // 
---------------------------------------------------------------------------- - // INVOCATION DATA MEMBERS - // ---------------------------------------------------------------------------- + protected Long ee_finished_timestamp; - protected long txn_id = -1; - protected long client_handle; - protected int base_partition; - protected final Set touched_partitions = new HashSet(); protected Long last_undo_token; protected RoundState round_state; protected int round_ctr = 0; - protected Long ee_finished_timestamp; - protected boolean rejected; - protected RuntimeException pending_error; + /** Whether this transaction has been read-only so far */ + protected final boolean exec_readOnly[]; + + /** Whether this Transaction has submitted work to the EE that may need to be rolled back */ + protected final boolean exec_eeWork[]; + + /** This is set to true if the transaction did some work without an undo buffer **/ + private final boolean exec_noUndoBuffer[]; + + /** * PartitionDependencyKey @@ -140,7 +116,9 @@ public static String formatTxnName(Procedure catalog_proc, long txn_id) { * @param executor */ public AbstractTransaction() { - // FIXME: Allocate execute arrays + this.exec_readOnly = new boolean[HStoreSite.LOCAL_PARTITION_OFFSETS.length]; + this.exec_eeWork = new boolean[HStoreSite.LOCAL_PARTITION_OFFSETS.length]; + this.exec_noUndoBuffer = new boolean[HStoreSite.LOCAL_PARTITION_OFFSETS.length]; } /** @@ -162,7 +140,6 @@ protected final AbstractTransaction init(long txn_id, long client_handle, int ba this.predict_readOnly = predict_readOnly; this.predict_abortable = predict_abortable; - this.exec_local = exec_local; return (this); } @@ -179,7 +156,6 @@ public boolean isInitialized() { @Override public void finish() { this.txn_id = -1; - this.hstoresite_finished = false; this.pending_error = null; this.ee_finished_timestamp = null; this.last_undo_token = null; @@ -188,11 +164,11 @@ public void finish() { this.predict_abortable = true; for (int i = 0; i < 
this.exec_readOnly.length; i++) { - this.exec_readOnly[i] = true; - this.exec_eeWork[i] = false; - this.exec_noUndoBuffer[i] = false; - } - + int p = HStoreSite.LOCAL_PARTITION_OFFSETS[i]; + this.exec_readOnly[p] = true; + this.exec_eeWork[p] = false; + this.exec_noUndoBuffer[p] = false; + } // FOR this.touched_partitions.clear(); } @@ -233,8 +209,7 @@ public void initRound(long undoToken) { this.round_state = RoundState.INITIALIZED; // this.pending_error = null; - if (d) LOG.debug(String.format("Initializing new round information for %slocal txn #%d [undoToken=%d]", - (this.exec_local ? "" : "non-"), this.txn_id, undoToken)); + if (d) LOG.debug(String.format("Initializing new round information for %s [undoToken=%d]", this, undoToken)); } /** @@ -287,28 +262,28 @@ public boolean isPredictReadOnly() { * Mark this transaction as have performed some modification on this partition */ public void markExecNotReadOnly(int partition) { - this.exec_readOnly[partition] = false; + this.exec_readOnly[HStoreSite.LOCAL_PARTITION_OFFSETS[partition]] = false; } /** * Returns true if this transaction has not executed any modifying work at this partition */ public boolean isExecReadOnly(int partition) { - return (this.exec_readOnly[partition]); + return (this.exec_readOnly[HStoreSite.LOCAL_PARTITION_OFFSETS[partition]]); } /** * Returns true if this transaction executed without undo buffers at some point */ public boolean isExecNoUndoBuffer(int partition) { - return (this.exec_noUndoBuffer[partition]); + return (this.exec_noUndoBuffer[HStoreSite.LOCAL_PARTITION_OFFSETS[partition]]); } public void markExecNoUndoBuffer(int partition) { - this.exec_noUndoBuffer[partition] = true; + this.exec_noUndoBuffer[HStoreSite.LOCAL_PARTITION_OFFSETS[partition]] = true; } /** * Returns true if this transaction's control code running at this partition */ - public boolean isExecLocal() { - return this.exec_local; + public boolean isExecLocal(int partition) { + return (this.base_partition == 
partition); } // ---------------------------------------------------------------------------- @@ -375,18 +350,18 @@ public synchronized void setPendingError(RuntimeException error) { * Should be called whenever the txn submits work to the EE */ public void setSubmittedEE(int partition) { - this.exec_eeWork[partition] = true; + this.exec_eeWork[HStoreSite.LOCAL_PARTITION_OFFSETS[partition]] = true; } public void unsetSubmittedEE(int partition) { - this.exec_eeWork[partition] = false; + this.exec_eeWork[HStoreSite.LOCAL_PARTITION_OFFSETS[partition]] = false; } /** * Returns true if this txn has submitted work to the EE that needs to be rolled back * @return */ public boolean hasSubmittedEE(int partition) { - return (this.exec_eeWork[partition]); + return (this.exec_eeWork[HStoreSite.LOCAL_PARTITION_OFFSETS[partition]]); } // ---------------------------------------------------------------------------- @@ -418,30 +393,8 @@ public long getEE_FinishedTimestamp() { return (this.ee_finished_timestamp); } - // ---------------------------------------------------------------------------- - // Whether the HStoreSite is finished with the transaction - // This assumes that all of the ExecutionSites are finished with it too - // ---------------------------------------------------------------------------- - - - /** - * Returns true if this transaction is finished at this HStoreSite - * @return - */ - public boolean isHStoreSite_Finished() { - if (t) LOG.trace(String.format("%s - Returning HStoreSite done [val=%s, hash=%d]", this, this.hstoresite_finished, this.hashCode())); - return (this.hstoresite_finished); - } - public void setHStoreSite_Finished(boolean val) { - this.hstoresite_finished = val; - if (t) LOG.trace(String.format("%s - Setting HStoreSite done [val=%s, hash=%d]", this, this.hstoresite_finished, this.hashCode())); - } - - - /** * Get this state's transaction id - * @return */ public long getTransactionId() { return this.txn_id; @@ -449,14 +402,12 @@ public long 
getTransactionId() { /** * Get the current Round that this TransactionState is in * Used only for testing - * @return */ protected RoundState getCurrentRoundState() { return (this.round_state); } /** - * - * @return + * Get the last undo token used for this transaction */ public Long getLastUndoToken() { return this.last_undo_token; @@ -503,10 +454,16 @@ protected Map getDebugMap() { m.put("Transaction #", this.txn_id); m.put("Current Round State", this.round_state); m.put("Read-Only", this.exec_readOnly); - m.put("Executing Locally", this.exec_local); m.put("Last UndoToken", this.last_undo_token); m.put("# of Rounds", this.round_ctr); m.put("Pending Error", (this.pending_error != null ? this.pending_error.toString() : null)); return (m); } + + public static String formatTxnName(Procedure catalog_proc, long txn_id) { + if (catalog_proc != null) { + return (catalog_proc.getName() + " #" + txn_id); + } + return ("#" + txn_id); + } } \ No newline at end of file diff --git a/src/frontend/edu/mit/hstore/dtxn/LocalTransaction.java b/src/frontend/edu/mit/hstore/dtxn/LocalTransaction.java index 3c40b362ba..c972fe39cb 100644 --- a/src/frontend/edu/mit/hstore/dtxn/LocalTransaction.java +++ b/src/frontend/edu/mit/hstore/dtxn/LocalTransaction.java @@ -46,12 +46,12 @@ import edu.brown.markov.TransactionEstimator; import edu.brown.statistics.Histogram; -import edu.brown.utils.CountingPoolableObjectFactory; import edu.brown.utils.LoggerUtil; import edu.brown.utils.StringUtil; import edu.brown.utils.LoggerUtil.LoggerBoolean; import edu.mit.hstore.HStoreConf; import edu.mit.hstore.HStoreObjectPools; +import edu.mit.hstore.HStoreSite; import edu.mit.hstore.callbacks.TransactionPrepareCallback; /** @@ -115,10 +115,6 @@ public class LocalTransaction extends AbstractTransaction { * Whether this is a sysproc */ public boolean sysproc; - /** - * Whether this txn isn't use the Dtxn.Coordinator - */ - public boolean ignore_dtxn = false; /** Whether this txn is being executed specutatively 
*/ private boolean exec_speculative = false; @@ -285,12 +281,10 @@ public void finish() { this.rpc_request_finish.reset(); this.state = null; - this.init_wrapper = null; this.orig_txn_id = null; this.catalog_proc = null; this.sysproc = false; this.exec_speculative = false; - this.ignore_dtxn = false; if (this.profiler != null) this.profiler.finish(); } @@ -367,7 +361,7 @@ public void finishRound() { assert(this.state.dependency_ctr == this.state.received_ctr) : "Trying to finish round for txn #" + this.txn_id + " before it was started"; assert(this.state.queued_results.isEmpty()) : "Trying to finish round for txn #" + this.txn_id + " but there are " + this.state.queued_results.size() + " queued results"; - if (d) LOG.debug("Finishing " + (this.exec_local ? "" : "non-") + "local round for txn #" + this.txn_id); + if (d) LOG.debug(String.format("Finishing round #%d for %s", this.round_ctr, this)); synchronized (this.state) { super.finishRound(); @@ -377,7 +371,7 @@ public void finishRound() { if (t) LOG.debug("Setting CountDownLatch to null for txn #" + this.txn_id); this.state.dependency_latch = null; } - } // SYNCHRONIZED + } // SYNCH } /** @@ -431,9 +425,7 @@ public TransactionPrepareCallback getPrepareCallback() { public RpcCallback getClientCallback() { return (this.client_callback); } - public CountDownLatch getInitializationLatch() { - return (this.init_latch); - } + /** * Return the original txn id that this txn was restarted for (after a mispredict) * @return @@ -723,7 +715,7 @@ private void processResultResponse(final int partition, final int dependency_id, assert(result != null); assert(this.round_state == RoundState.INITIALIZED || this.round_state == RoundState.STARTED) : "Invalid round state " + this.round_state + " for txn #" + this.txn_id; - assert(this.exec_local) : + assert(this.isExecLocal(partition) == false) : "Trying to store result for txn #" + this.txn_id + " but it is not executing locally!"; DependencyInfo dinfo = null; @@ -863,7 +855,6 @@ 
public String debug() { m = new ListOrderedMap(); m.put("Exec Single-Partitioned", this.isExecSinglePartition()); m.put("Exec Read Only", this.exec_readOnly); - m.put("Exec Locally", this.exec_local); m.put("Speculative Execution", this.exec_speculative); m.put("Touched Partitions", this.state.exec_touchedPartitions); maps.add(m); diff --git a/src/frontend/edu/mit/hstore/dtxn/RemoteTransaction.java b/src/frontend/edu/mit/hstore/dtxn/RemoteTransaction.java index 9179a0e968..30b1ed30a0 100644 --- a/src/frontend/edu/mit/hstore/dtxn/RemoteTransaction.java +++ b/src/frontend/edu/mit/hstore/dtxn/RemoteTransaction.java @@ -26,12 +26,10 @@ package edu.mit.hstore.dtxn; import org.apache.log4j.Logger; -import org.voltdb.ExecutionSite; import com.google.protobuf.RpcCallback; import edu.brown.hstore.Hstore; -import edu.brown.utils.CountingPoolableObjectFactory; import edu.brown.utils.LoggerUtil; import edu.brown.utils.StringUtil; import edu.brown.utils.LoggerUtil.LoggerBoolean; @@ -86,11 +84,6 @@ public void finishRound() { super.finishRound(); } - @Override - public boolean isHStoreSite_Finished() { - return (true); - } - /** * Return the previously stored callback for a FragmentTaskMessage * @return diff --git a/src/frontend/org/voltdb/ExecutionSite.java b/src/frontend/org/voltdb/ExecutionSite.java index 787050eef5..1de5f181d5 100644 --- a/src/frontend/org/voltdb/ExecutionSite.java +++ b/src/frontend/org/voltdb/ExecutionSite.java @@ -347,6 +347,11 @@ public VoltProcedure makeObjectImpl() throws Exception { */ private final Map tmp_transactionRequestBuildersMap = new HashMap(); + /** + * PartitionId -> List + */ + private final Map> ee_dependencies = new HashMap>(); + // ---------------------------------------------------------------------------- // PROFILING OBJECTS // ---------------------------------------------------------------------------- @@ -924,17 +929,15 @@ protected void processFragmentResponseMessage(LocalTransaction ts, FragmentRespo * @param itask */ protected 
void processInitiateTaskMessage(LocalTransaction ts, InitiateTaskMessage itask) { - final long txn_id = ts.getTransactionId(); - final boolean predict_singlePartition = ts.isPredictSinglePartition(); + if (hstore_conf.site.txn_profiling) ts.profiler.startExec(); // Always reset the ExecutionState this.execState.clear(); ts.setExecutionState(this.execState); - if (hstore_conf.site.txn_profiling) ts.profiler.startExec(); - ExecutionMode spec_exec = ExecutionMode.COMMIT_ALL; boolean release_latch = false; + boolean predict_singlePartition = ts.isPredictSinglePartition(); synchronized (this.exec_mode) { // If this is going to be a multi-partition transaction, then we will mark it as the current dtxn // for this ExecutionSite. There should be no other dtxn running right now that this partition @@ -1115,7 +1118,7 @@ private boolean canProcessClientResponseNow(LocalTransaction ts, Hstore.Status s */ private void processFragmentTaskMessage(AbstractTransaction ts, FragmentTaskMessage ftask) { // A txn is "local" if the Java is executing at the same site as we are - boolean is_local = ts.isExecLocal(); + boolean is_local = ts.isExecLocal(this.partitionId); boolean is_dtxn = ftask.isUsingDtxnCoordinator(); if (t) LOG.trace(String.format("Executing FragmentTaskMessage %s [partition=%d, is_local=%s, is_dtxn=%s, fragments=%s]", ts, ftask.getSourcePartitionId(), is_local, is_dtxn, Arrays.toString(ftask.getFragmentIds()))); @@ -1253,18 +1256,18 @@ private DependencySet executeFragmentTaskMessage(AbstractTransaction ts, Fragmen } } // FOR - ts.ee_dependencies.clear(); + this.ee_dependencies.clear(); if (ftask.hasAttachedResults()) { if (t) LOG.trace("Retrieving internal dependency results attached to FragmentTaskMessage for " + ts); - ts.ee_dependencies.putAll(ftask.getAttachedResults()); + this.ee_dependencies.putAll(ftask.getAttachedResults()); } LocalTransaction local_ts = null; - if (ftask.hasInputDependencies() && ts != null && ts.isExecLocal() == true) { + if 
(ftask.hasInputDependencies() && ts != null && ts.isExecLocal(this.partitionId) == true) { local_ts = (LocalTransaction)ts; if (local_ts.getInternalDependencyIds().isEmpty() == false) { if (t) LOG.trace("Retrieving internal dependency results from TransactionState for " + ts); - local_ts.removeInternalDependencies(ftask, local_ts.ee_dependencies); + local_ts.removeInternalDependencies(ftask, this.ee_dependencies); } } @@ -1284,7 +1287,7 @@ private DependencySet executeFragmentTaskMessage(AbstractTransaction ts, Fragmen // HACK: We have to set the TransactionState for sysprocs manually volt_proc.setTransactionState(ts); ts.markExecNotReadOnly(this.partitionId); - result = volt_proc.executePlanFragment(ts.getTransactionId(), ts.ee_dependencies, (int)fragmentIds[0], parameterSets[0], this.m_systemProcedureContext); + result = volt_proc.executePlanFragment(ts.getTransactionId(), this.ee_dependencies, (int)fragmentIds[0], parameterSets[0], this.m_systemProcedureContext); if (t) LOG.trace("Finished executing sysproc fragments for " + volt_proc.getClass().getSimpleName()); // ------------------------------- // REGULAR FRAGMENTS @@ -1390,10 +1393,10 @@ private DependencySet executePlanFragments(AbstractTransaction ts, long undoToke } // pass attached dependencies to the EE (for non-sysproc work). 
- if (ts.ee_dependencies.isEmpty() == false) { - if (t) LOG.trace("Stashing Dependencies: " + ts.ee_dependencies.keySet()); + if (this.ee_dependencies.isEmpty() == false) { + if (t) LOG.trace("Stashing Dependencies: " + this.ee_dependencies.keySet()); // assert(dependencies.size() == input_depIds.length) : "Expected " + input_depIds.length + " dependencies but we have " + dependencies.size(); - ee.stashWorkUnitDependencies(ts.ee_dependencies); + ee.stashWorkUnitDependencies(this.ee_dependencies); } ts.setSubmittedEE(this.partitionId); @@ -1481,10 +1484,8 @@ public void loadTable(AbstractTransaction ts, String clusterName, String databas * @param callback the RPC handle to send the response to */ public void doWork(RemoteTransaction ts, FragmentTaskMessage task, RpcCallback callback) { - long txn_id = task.getTxnId(); - boolean read_only = task.isReadOnly(); assert(ts.isInitialized()); - + if (ts != this.current_dtxn) { synchronized (this.exec_mode) { assert(this.current_dtxn_blocked.isEmpty()) : @@ -1495,7 +1496,7 @@ public void doWork(RemoteTransaction ts, FragmentTaskMessage task, RpcCallback partitions = CollectionUtils.subtract(ts.getPredictTouchedPartitions(), ts.getDonePartitions()); - this.hstore_messenger.transactionPrepare(ts, partitions); + this.hstore_messenger.transactionPrepare(ts, ts.getPrepareCallback(), partitions); } // ABORT: Distributed Transaction else { diff --git a/src/protorpc/edu/brown/hstore/hstore.proto b/src/protorpc/edu/brown/hstore/hstore.proto index 7571143b83..c41c4dc117 100644 --- a/src/protorpc/edu/brown/hstore/hstore.proto +++ b/src/protorpc/edu/brown/hstore/hstore.proto @@ -46,7 +46,8 @@ message TransactionInitRequest { message TransactionInitResponse { required int64 transaction_id = 1; - required Status status = 2; + repeated int32 partitions = 2 [packed=true]; + required Status status = 3; } // -----------------------------------