From 9a08f0e61f88bb89f4a0f94c7610aa721d440661 Mon Sep 17 00:00:00 2001 From: Ge Gao Date: Tue, 31 Dec 2013 22:15:45 +0800 Subject: [PATCH] code clean up 4 --- .../brown/hstore/TransactionQueueManager.java | 198 +++++++++--------- 1 file changed, 103 insertions(+), 95 deletions(-) diff --git a/src/frontend/edu/brown/hstore/TransactionQueueManager.java b/src/frontend/edu/brown/hstore/TransactionQueueManager.java index 2ddaa3cc71..b510171ada 100644 --- a/src/frontend/edu/brown/hstore/TransactionQueueManager.java +++ b/src/frontend/edu/brown/hstore/TransactionQueueManager.java @@ -1,6 +1,7 @@ package edu.brown.hstore; import java.util.ArrayList; +import java.util.Arrays; import java.util.ConcurrentModificationException; import java.util.HashSet; import java.util.LinkedHashMap; @@ -74,7 +75,7 @@ public class TransactionQueueManager extends ExceptionHandlingRunnable implement // ---------------------------------------------------------------------------- // TRANSACTION PARTITION LOCKS QUEUES // ---------------------------------------------------------------------------- - + /** * Contains one queue for every partition managed by this coordinator */ @@ -83,7 +84,7 @@ public class TransactionQueueManager extends ExceptionHandlingRunnable implement private final ReentrantLock lockQueueBarriers[]; /** - * The last txns that was executed for each partition + * The last txns that was executed for each partition * Our local partitions must be accurate, but we can be off for the remote ones. * This should be just the transaction id, since the AbstractTransaction handles * could have been cleaned up by the time we need this data. @@ -95,12 +96,13 @@ public class TransactionQueueManager extends ExceptionHandlingRunnable implement // ---------------------------------------------------------------------------- // TRANSACTIONS THAT NEED TO ADDED TO LOCK QUEUES // ---------------------------------------------------------------------------- - + /** - * A queue of transactions that need to be added to the lock queues at the partitions + * A queue of transactions that need to be added to the lock queues at the partitions * at this site. */ - private final BlockingQueue initQueue; + private final BlockingQueue initQueue; + // ---------------------------------------------------------------------------- // TRANSACTIONS THAT NEED TO BE REQUEUED @@ -111,8 +113,8 @@ public class TransactionQueueManager extends ExceptionHandlingRunnable implement * NOTE: Anything that shows up in this queue will be deleted by this manager */ private final BlockingQueue> restartQueue; - // new ConcurrentLinkedQueue>(); - + // new ConcurrentLinkedQueue>(); + // ---------------------------------------------------------------------------- // INTIALIZATION // ---------------------------------------------------------------------------- @@ -145,15 +147,14 @@ public TransactionQueueManager(HStoreSite hstore_site) { this.profilers[partition] = new TransactionQueueManagerProfiler(); } // FOR Arrays.fill(this.lockQueueLastTxns, Long.valueOf(-1l)); - + // Use updateConf() to initialize our internal values from the HStoreConf this.updateConf(this.hstore_conf, null); - + // Add a EventObservable that will tell us when the first non-sysproc // request arrives from a client. This will then tell the queues that its ok // to increase their limits if they're empty - if (hstore_site.getJvmSnapshotManager() != null && - hstore_site.getJvmSnapshotManager().isParent()) { + if (hstore_site.getJvmSnapshotManager() != null && hstore_site.getJvmSnapshotManager().isParent()) { hstore_site.getStartWorkloadObservable().addObserver(new EventObserver() { public void update(EventObservable o, HStoreSite arg) { for (PartitionLockQueue queue : lockQueues) { @@ -162,13 +163,13 @@ public void update(EventObservable o, HStoreSite arg) { }; }); } + if (debug.val) LOG.debug(String.format("Created %d %s for %s", - this.localPartitions.size(), - PartitionLockQueue.class.getSimpleName(), - hstore_site.getSiteName())); + this.localPartitions.size(), PartitionLockQueue.class.getSimpleName(), + hstore_site.getSiteName())); } - + @Override public void updateConf(HStoreConf hstore_conf, String[] changed) { this.initThrottleThreshold = (int)(hstore_conf.site.network_incoming_limit_txns * hstore_conf.site.queue_threshold_factor); @@ -186,13 +187,13 @@ public void updateConf(HStoreConf hstore_conf, String[] changed) { queue.reset(); } } // FOR - + } - + // ---------------------------------------------------------------------------- // RUN METHOD // ---------------------------------------------------------------------------- - + private class Initializer extends ExceptionHandlingRunnable { public void runImpl() { Thread self = Thread.currentThread(); @@ -202,7 +203,7 @@ public void runImpl() { self.setName(HStoreThreadManager.getThreadName(hstore_site, HStoreConstants.THREAD_NAME_QUEUE_INIT)); } hstore_site.getThreadManager().registerProcessingThread(); - + if (debug.val) LOG.debug(String.format("Starting %s thread", this.getClass().getSimpleName())); AbstractTransaction nextTxn = null; @@ -216,7 +217,7 @@ public void runImpl() { } // WHILE }; } - + private class Restarter extends ExceptionHandlingRunnable { @Override public void runImpl() { @@ -224,7 +225,7 @@ public void runImpl() { self.setName(HStoreThreadManager.getThreadName(hstore_site, HStoreConstants.THREAD_NAME_QUEUE_RESTART)); hstore_site.getThreadManager().registerProcessingThread(); - + if (debug.val) LOG.debug(String.format("Starting %s thread", this.getClass().getSimpleName())); Pair pair = null; @@ -236,19 +237,19 @@ public void runImpl() { } LocalTransaction ts = pair.getFirst(); Status status = pair.getSecond(); - + if (trace.val) LOG.trace(String.format("%s - Ready to restart transaction [status=%s]", ts, status)); Status ret = hstore_site.transactionRestart(ts, status); if (trace.val) - LOG.trace(String.format( "%s - Got return result %s after restarting", ts, ret)); - + LOG.trace(String.format("%s - Got return result %s after restarting", ts, ret)); + ts.unmarkNeedsRestart(); hstore_site.queueDeleteTransaction(ts.getTransactionId(), status); } // WHILE } } - + /** * Every time this thread gets woken up, it locks the queues, loops through the txn_queues, * and looks at the lowest id in each queue. If any id is lower than the last_txn id for @@ -262,7 +263,7 @@ public void runImpl() { public void runImpl() { int numInitialzers = 1; int numRestarters = 1; - + List threads = new ArrayList(); for (int i = 0; i < numInitialzers; i++) { Thread t = new Thread(new Initializer()); @@ -280,7 +281,7 @@ public void runImpl() { threads.add(t); } // FOR } - + for (Thread t : threads) { try { t.join(); @@ -290,14 +291,15 @@ public void runImpl() { } } // FOR } - + /** * Reject any and all transactions that are in our queues! */ public void clearQueues(int partition) { AbstractTransaction ts = null; // Long txnId = null; - + + if (debug.val) LOG.debug("Clearing out lock queue for partition " + partition); // LOCK QUEUES synchronized (this.lockQueues[partition]) { @@ -308,20 +310,20 @@ public void clearQueues(int partition) { this.lockQueueLastTxns[partition]); } // WHILE } // SYNCH - + // INIT QUEUE // while ((ts = this.initQueues[partition].poll()) != null) { // TransactionInitQueueCallback callback = ts.getTransactionInitQueueCallback(); // callback.abort(Status.ABORT_REJECT); // } // WHILE - + // RESTART QUEUE Pair pair = null; while ((pair = this.restartQueue.poll()) != null) { hstore_site.transactionReject(pair.getFirst(), Status.ABORT_REJECT); } // WHILE } - + // ---------------------------------------------------------------------------- // INIT QUEUES // ---------------------------------------------------------------------------- @@ -331,23 +333,23 @@ private boolean initTransaction(AbstractTransaction nextTxn) { LocalTransaction localTxn = (LocalTransaction)nextTxn; if (localTxn.profiler != null) localTxn.profiler.startQueueLock(); } - + PartitionCountingCallback callback = nextTxn.getInitCallback(); - assert (callback.isInitialized()) : + assert(callback.isInitialized()) : String.format("Unexpected uninitialized %s for %s\n%s", callback.getClass().getSimpleName(), - nextTxn, callback.toString()); + nextTxn, callback.toString()); boolean ret = (callback.isAborted() == false); Status status = null; - + if (trace.val) LOG.trace(String.format("Adding %s to lock queus for partitions %s\n%s", nextTxn, nextTxn.getPredictTouchedPartitions(), callback)); for (int partition : nextTxn.getPredictTouchedPartitions().values()) { // Skip any non-local partition if (this.lockQueues[partition] == null) continue; - - // If this txn gets rejected when we try to insert it, then we + + // If this txn gets rejected when we try to insert it, then we // just need to stop trying to add it to other partitions if (ret) { status = this.lockQueueInsert(nextTxn, partition, callback); @@ -364,7 +366,7 @@ private boolean initTransaction(AbstractTransaction nextTxn) { } return (ret); } - + /** * Queue a brand new transaction at this HStoreSite to be added into * the appropriate lock queues for the partitions that it needs to access. @@ -379,7 +381,7 @@ protected void queueTransactionInit(AbstractTransaction ts) { } this.initQueue.add(ts); } - + /** * Add a new transaction to this queue manager. * Returns true if the transaction was successfully inserted at all partitions. @@ -393,9 +395,9 @@ protected Status lockQueueInsert(AbstractTransaction ts, int partition, PartitionCountingCallback callback) { if (hstore_conf.site.queue_profiling) profilers[partition].init_time.start(); - assert (ts.isInitialized()) : + assert(ts.isInitialized()) : String.format("Unexpected uninitialized transaction %s [partition=%d]", ts, partition); - assert (this.hstore_site.isLocalPartition(partition)) : + assert(this.hstore_site.isLocalPartition(partition)) : String.format("Trying to add %s to non-local partition %d", ts, partition); // This is actually bad and should never happen. But for the sake of trying @@ -406,39 +408,39 @@ protected Status lockQueueInsert(AbstractTransaction ts, if (hstore_conf.site.queue_profiling) profilers[partition].init_time.stopIfStarted(); return (Status.ABORT_UNEXPECTED); } - + if (debug.val) LOG.debug(String.format("Adding %s into lockQueue for partition %d [allPartitions=%s]", ts, partition, ts.getPredictTouchedPartitions())); - + // We can preemptively check whether this txnId is greater than // the largest one that we know about at a partition - // We don't need to acquire the lock on last_txns at this partition because + // We don't need to acquire the lock on last_txns at this partition because // all that we care about is that whatever value is in there now is greater than // the what the transaction was trying to use. - // 2012-12-03 - There is a race condition here where we may get back the last txn that + // 2012-12-03 - There is a race condition here where we may get back the last txn that // was released but then it was deleted and cleaned-up. This means that its txn id // might be null. A better way to do this is to only have each PartitionExecutor - // insert the new transaction into its queue. + // insert the new transaction into its queue. Long txn_id = ts.getTransactionId(); Long next_safe_id = null; Status status = Status.OK; - + this.lockQueueBarriers[partition].lock(); try { next_safe_id = this.lockQueues[partition].noteTransactionRecievedAndReturnLastSafeTxnId(txn_id); } finally { this.lockQueueBarriers[partition].unlock(); } // SYNCH - + // The next txnId that we're going to try to execute is already greater // than this new txnId that we were given! Rejection! if (next_safe_id != null && next_safe_id.compareTo(txn_id) > 0) { - if (debug.val) + if (debug.val) LOG.warn(String.format("The next safe lockQueue txn for partition #%d is %s but this " + "is greater than our new txn %s. Rejecting...", partition, next_safe_id, ts)); - status = Status.ABORT_RESTART; + status = Status.ABORT_RESTART; } // Our queue is overloaded. We have to reject the txnId! else { @@ -457,6 +459,7 @@ protected Status lockQueueInsert(AbstractTransaction ts, status = Status.ABORT_REJECT; } } + // Reject the txn if (status != Status.OK) { @@ -467,10 +470,14 @@ protected Status lockQueueInsert(AbstractTransaction ts, profilers[partition].init_time.stopIfStarted(); } } + else if (trace.val) { + LOG.trace(String.format("Added %s to initQueue for partition %d [queueSize=%d]", + ts, partition, this.lockQueues[partition].size())); + } if (hstore_conf.site.queue_profiling) profilers[partition].init_time.stopIfStarted(); return (status); } - + /** * Check whether there are any transactions that need to be released for execution * at the partitions controlled by this queue manager @@ -481,7 +488,7 @@ protected AbstractTransaction checkLockQueue(int partition) throws InterruptedEx if (trace.val) LOG.trace(String.format("Checking lock queue for partition %d [queueSize=%d]", partition, this.lockQueues[partition].size())); - + // Poll the queue and get the next value. AbstractTransaction nextTxn = null; this.lockQueueBarriers[partition].lockInterruptibly(); @@ -490,17 +497,17 @@ protected AbstractTransaction checkLockQueue(int partition) throws InterruptedEx } finally { this.lockQueueBarriers[partition].unlock(); } // SYNCH - + if (nextTxn == null) { if (hstore_conf.site.queue_profiling) profilers[partition].lock_time.stopIfStarted(); return (nextTxn); } PartitionCountingCallback callback = nextTxn.getInitCallback(); - assert (callback.isInitialized()) : + assert(callback.isInitialized()) : String.format("Uninitialized %s callback for %s [hashCode=%d]", callback.getClass().getSimpleName(), nextTxn, callback.hashCode()); - + // HACK if (nextTxn.isAborted()) { if (debug.val) @@ -528,10 +535,11 @@ else if (callback.isAborted()) { partition, nextTxn, callback.getClass().getSimpleName())); this.lockQueueLastTxns[partition] = nextTxn.getTransactionId(); } - + + if (nextTxn != null) { // Send the init request for the specified partition - if (debug.val) + if (debug.val) LOG.debug(String.format("%s - Invoking %s.run() for partition %d", nextTxn, nextTxn.getInitCallback().getClass().getSimpleName(), partition)); try { @@ -548,10 +556,10 @@ else if (callback.isAborted()) { nextTxn, partition); throw new ServerFaultException(msg, ex, nextTxn.getTransactionId()); } - + // Mark the txn being released to the given partition nextTxn.markReleased(partition); - + if (trace.val && nextTxn != null) LOG.trace(String.format("Finished processing lock queue for partition %d [next=%s]", partition, nextTxn)); @@ -559,7 +567,8 @@ else if (callback.isAborted()) { if (hstore_conf.site.queue_profiling) profilers[partition].lock_time.stopIfStarted(); return (nextTxn); } - + + /** * Mark the transaction as being finished with the given local partition. This can be called * either before or after the transaction was initialized at all partitions. @@ -568,16 +577,16 @@ else if (callback.isAborted()) { * @param partition */ public void lockQueueFinished(AbstractTransaction ts, Status status, int partition) { - assert (ts.isInitialized()) : + assert(ts.isInitialized()) : String.format("Unexpected uninitialized transaction %s [status=%s, partition=%d]", ts, status, partition); - assert (ts.getPredictTouchedPartitions().contains(partition)) : + assert(ts.getPredictTouchedPartitions().contains(partition)) : String.format("Trying to remove %s from partition %d lock queue but it " + - "is not one of its original partitions: %s", + "is not one of its original partitions: %s", ts, partition, ts.getPredictTouchedPartitions()); - assert (this.hstore_site.isLocalPartition(partition)) : + assert(this.hstore_site.isLocalPartition(partition)) : "Trying to mark txn #" + ts + " as finished on remote partition #" + partition; - + // If the given txnId is the current transaction at this partition and still holds // the lock on the partition, then we want to make sure that we don't have to // look into the queue to see if it's in there. @@ -589,8 +598,8 @@ public void lockQueueFinished(AbstractTransaction ts, Status status, int partiti LOG.trace(String.format("%s is the last txn released at partition %d", ts, partition)); checkQueue = false; - - + } + // Always attempt to remove it from this partition's queue // If this remove() returns false, then we know that our transaction wasn't // sitting in the queue for that partition. @@ -605,16 +614,16 @@ public void lockQueueFinished(AbstractTransaction ts, Status status, int partiti if (debug.val && removed) LOG.warn(String.format("Removed %s from partition %d queue", ts, partition)); } - + // Calling contains() is super slow, so we'll only do this if we have tracing enabled if (trace.val) { - assert (this.lockQueues[partition].contains(ts) == false) : + assert(this.lockQueues[partition].contains(ts) == false) : String.format("The %s for partition %d contains %s even though it should not! " + - "[checkQueue=%s, removed=%s]", - this.lockQueues[partition].getClass().getSimpleName(), partition, - checkQueue, removed); + "[checkQueue=%s, removed=%s]", + this.lockQueues[partition].getClass().getSimpleName(), partition, + checkQueue, removed); } - + // Make sure that if this txn is being aborted, that everyone // that is part of it knows what's going on. PartitionCountingCallback callback = ts.getInitCallback(); @@ -622,7 +631,7 @@ public void lockQueueFinished(AbstractTransaction ts, Status status, int partiti } finally { // this.lockQueueBarriers[partition].unlock(); } // SYNCH - + if (debug.val) LOG.warn(String.format("%s is finished on partition %d " + "[status=%s, checkQueue=%s, removed=%s]", @@ -632,7 +641,7 @@ public void lockQueueFinished(AbstractTransaction ts, Status status, int partiti // ---------------------------------------------------------------------------- // INTERNAL METHODS // ---------------------------------------------------------------------------- - + /** * Reject the given transaction at this QueueManager. * @param ts @@ -645,17 +654,17 @@ private void rejectTransaction(AbstractTransaction ts, Status status, int reject_partition, Long reject_txnId) { - assert (ts.isInitialized()) : + assert(ts.isInitialized()) : String.format("Uninitialized transaction handle %s [status=%s, rejectPartition=%d]", ts, status, reject_partition); - assert (reject_txnId != null) : + assert(reject_txnId != null) : String.format("Null reject txn id for %s [status=%s, rejectPartition=%d]", ts, status, reject_partition); if (debug.val) { Long txnId = ts.getTransactionId(); boolean is_valid = (reject_txnId == null || txnId.compareTo(reject_txnId) > 0); LOG.debug(String.format("Rejecting %s on partition %d. Blocking until a txnId greater than #%d " + - "[status=%s, valid=%s]", + "[status=%s, valid=%s]", ts, reject_partition, reject_txnId, status, is_valid)); } @@ -669,11 +678,11 @@ private void rejectTransaction(AbstractTransaction ts, "[status=%s, rejectPartition=%d, rejectTxnId=%s]\n" + "Failed Callback: %s", ts, status, reject_partition, reject_txnId, callback); - if (debug.val) LOG.warn(msg, ex); + if (debug.val) LOG.warn(msg, ex); throw new RuntimeException(msg, ex); } } - + // ---------------------------------------------------------------------------- // BLOCKED DTXN QUEUE MANAGEMENT // ---------------------------------------------------------------------------- @@ -684,9 +693,9 @@ private void rejectTransaction(AbstractTransaction ts, * @param txn_id */ public void markLastTransaction(int partition, Long txn_id) { - assert (this.hstore_site.isLocalPartition(partition) == false) : + assert(this.hstore_site.isLocalPartition(partition) == false) : "Trying to mark the last seen txnId for local partition #" + partition; - + // This lock is low-contention because we don't update the last txnId seen // at partitions very often. synchronized (this.lockQueueLastTxns[partition]) { @@ -709,17 +718,15 @@ public void markLastTransaction(int partition, Long txn_id) { * @param status */ public void restartTransaction(LocalTransaction ts, Status status) { - assert (ts != null) : + assert(ts != null) : String.format("Unexpected null transaction %s [status=%s]", ts, status); - assert (ts.isInitialized()) : + assert(ts.isInitialized()) : String.format("Unexpected uninitialized transaction %s [status=%s]", ts, status); - + if (debug.val) - LOG.debug(String.format( - "%s - Requeing transaction for execution [status=%s]", ts, - status)); + LOG.debug(String.format("%s - Requeing transaction for execution [status=%s]", ts, status)); ts.markNeedsRestart(); - + if (this.restartQueue.offer(Pair.of(ts, status)) == false) { if (debug.val) LOG.debug(String.format("%s - Unable to add txn to restart queue. Rejecting...", ts)); @@ -731,12 +738,12 @@ public void restartTransaction(LocalTransaction ts, Status status) { if (debug.val) LOG.debug(String.format("%s - Successfully added txn to restart queue.", ts)); } - + private void checkRestartQueue() { if (debug.val && this.restartQueue.isEmpty() == false) LOG.trace(String.format("Checking whether we can restart %d held txns", this.restartQueue.size())); - + Pair pair = null; int limit = CHECK_RESTART_QUEUE_LIMIT; while ((pair = this.restartQueue.poll()) != null) { @@ -748,7 +755,7 @@ private void checkRestartQueue() { Status ret = this.hstore_site.transactionRestart(ts, status); if (trace.val) LOG.trace(String.format("%s - Got return result %s after restarting", ts, ret)); - + ts.unmarkNeedsRestart(); this.hstore_site.queueDeleteTransaction(ts.getTransactionId(), status); if (limit-- == 0) break; @@ -813,7 +820,7 @@ public String debug() { idx = 0; for (int partition : this.localPartitions.values()) { - debug[idx++] += line + StringUtil.join("\n", this.lockQueues[partition]); + debug[idx++] += line + StringUtil.join("\n", this.lockQueues[partition]); } // FOR return (StringUtil.columns(debug)); } @@ -867,7 +874,7 @@ public Long getCurrentTransaction(int partition) { return (lockQueueLastTxns[partition]); } } - + private TransactionQueueManager.Debug cachedDebugContext; public TransactionQueueManager.Debug getDebugContext() { if (cachedDebugContext == null) { @@ -877,3 +884,4 @@ public TransactionQueueManager.Debug getDebugContext() { return cachedDebugContext; } } +