From c434fa220c8d809285fd520e5bf5751019515ccc Mon Sep 17 00:00:00 2001 From: Andy Pavlo Date: Fri, 9 Mar 2012 15:00:13 -0500 Subject: [PATCH] Oh man this a good one. We're getting closer to getting the LocalCluster regression suite working again. We fixed up HStoreSite + HStoreCoordinator shutdown() methods to make sure that we kill everything from one test case to the other --- .../edu/brown/hstore/HStoreCoordinator.java | 11 ++++ src/frontend/edu/brown/hstore/HStoreSite.java | 54 ++++++++----------- .../edu/brown/hstore/PartitionExecutor.java | 3 +- .../hstore/dtxn/TransactionQueueManager.java | 23 +++++++- src/frontend/org/voltdb/VoltDB.java | 13 +++-- .../voltdb/regressionsuites/LocalCluster.java | 36 ++++++------- 6 files changed, 81 insertions(+), 59 deletions(-) diff --git a/src/frontend/edu/brown/hstore/HStoreCoordinator.java b/src/frontend/edu/brown/hstore/HStoreCoordinator.java index 19c42f741d..262c9bb492 100644 --- a/src/frontend/edu/brown/hstore/HStoreCoordinator.java +++ b/src/frontend/edu/brown/hstore/HStoreCoordinator.java @@ -121,6 +121,8 @@ public class HStoreCoordinator implements Shutdownable { private final TransactionFinishDispatcher transactionFinish_dispatcher; private final TransactionRedirectDispatcher transactionRedirect_dispatcher; + private final List dispatcherThreads = new ArrayList(); + private Shutdownable.ShutdownState state = ShutdownState.INITIALIZED; private final EventObservable ready_observable = new EventObservable(); @@ -230,18 +232,21 @@ public synchronized void start() { Thread t = new Thread(transactionInit_dispatcher, HStoreSite.getThreadName(this.hstore_site, "init")); t.setDaemon(true); t.start(); + this.dispatcherThreads.add(t); } if (this.transactionFinish_dispatcher != null) { if (debug.get()) LOG.debug("Starting FinishTransaction dispatcher thread"); Thread t = new Thread(transactionFinish_dispatcher, HStoreSite.getThreadName(this.hstore_site, "finish")); t.setDaemon(true); t.start(); + this.dispatcherThreads.add(t); } if (this.transactionRedirect_dispatcher != null) { if (debug.get()) LOG.debug("Starting ForwardTxn dispatcher thread"); Thread t = new Thread(transactionRedirect_dispatcher, HStoreSite.getThreadName(this.hstore_site, "frwd")); t.setDaemon(true); t.start(); + this.dispatcherThreads.add(t); } if (debug.get()) LOG.debug("Starting listener thread"); @@ -283,6 +288,12 @@ public synchronized void shutdown() { this.state = ShutdownState.SHUTDOWN; try { + // Kill all of our dispatchers + for (Thread t : this.dispatcherThreads) { + if (trace.get()) LOG.trace("Stopping dispatcher thread " + t.getName()); + t.interrupt(); + } // FOR + if (trace.get()) LOG.trace("Stopping eventLoop for Site #" + this.getLocalSiteId()); this.eventLoop.exitLoop(); diff --git a/src/frontend/edu/brown/hstore/HStoreSite.java b/src/frontend/edu/brown/hstore/HStoreSite.java index 93f56de81f..54c934088c 100644 --- a/src/frontend/edu/brown/hstore/HStoreSite.java +++ b/src/frontend/edu/brown/hstore/HStoreSite.java @@ -158,11 +158,6 @@ public static final String formatPartitionName(int site_id, int partition_id) { return (HStoreSite.getThreadName(site_id, null, partition_id)); } - /** - * TODO - */ - private static HStoreSite SHUTDOWN_HANDLE = null; - // ---------------------------------------------------------------------------- // OBJECT POOLS // ---------------------------------------------------------------------------- @@ -351,6 +346,10 @@ public static final String formatPartitionName(int site_id, int partition_id) { protected HStoreSite(Site catalog_site, HStoreConf hstore_conf) { assert(catalog_site != null); + // **IMPORTANT** + // Always clear out the CatalogUtil cache before we start our new HStoreSite + CatalogUtil.clearCache(catalog_site); + this.hstore_conf = hstore_conf; this.catalog_site = catalog_site; this.catalog_db = CatalogUtil.getDatabase(this.catalog_site); @@ -460,6 +459,9 @@ protected HStoreSite(Site catalog_site, HStoreConf hstore_conf) { * @return */ public int getLocalPartitionOffset(int partition) { + assert(partition < this.local_partition_offsets.length) : + String.format("Unable to get offset of local partition %d %s [hashCode=%d]", + partition, Arrays.toString(this.local_partition_offsets), this.hashCode()); return this.local_partition_offsets[partition]; } @@ -781,13 +783,7 @@ public void update(EventObservable> o, Pair 0) { - System.err.println("Shutdown [" + num_inflight + " txns inflight]"); - } - } - } // END CLASS +// private final class ShutdownHook implements Runnable { +// @Override +// public void run() { +// // Dump out our status +// int num_inflight = inflight_txns.size(); +// if (num_inflight > 0) { +// System.err.println("Shutdown [" + num_inflight + " txns inflight]"); +// } +// } +// } // END CLASS @Override public void prepareShutdown(boolean error) { @@ -898,6 +883,9 @@ public synchronized void shutdown() { // Stop the monitor thread if (this.status_monitor != null) this.status_monitor.shutdown(); + // Kill the queue manager + this.txnQueueManager.shutdown(); + // Tell our local boys to go down too for (PartitionExecutorPostProcessor p : this.processors) { p.shutdown(); diff --git a/src/frontend/edu/brown/hstore/PartitionExecutor.java b/src/frontend/edu/brown/hstore/PartitionExecutor.java index abb04ad5ab..cd4bbd4bd4 100644 --- a/src/frontend/edu/brown/hstore/PartitionExecutor.java +++ b/src/frontend/edu/brown/hstore/PartitionExecutor.java @@ -603,8 +603,7 @@ else if (target == BackendTarget.NATIVE_EE_JNI) { } // just print error info an bail if we run into an error here catch (final Exception ex) { - LOG.fatal("Failed to initialize PartitionExecutor", ex); - VoltDB.crashVoltDB(); + throw new RuntimeException("Failed to initialize PartitionExecutor", ex); } this.ee = eeTemp; this.hsql = hsqlTemp; diff --git a/src/frontend/edu/brown/hstore/dtxn/TransactionQueueManager.java b/src/frontend/edu/brown/hstore/dtxn/TransactionQueueManager.java index 0a16f494b2..328282e2ef 100644 --- a/src/frontend/edu/brown/hstore/dtxn/TransactionQueueManager.java +++ b/src/frontend/edu/brown/hstore/dtxn/TransactionQueueManager.java @@ -14,13 +14,14 @@ import edu.brown.hstore.Hstoreservice.Status; import edu.brown.hstore.callbacks.TransactionInitWrapperCallback; import edu.brown.hstore.interfaces.Loggable; +import edu.brown.hstore.interfaces.Shutdownable; import edu.brown.hstore.util.TxnCounter; import edu.brown.logging.LoggerUtil; import edu.brown.logging.LoggerUtil.LoggerBoolean; import edu.brown.statistics.Histogram; import edu.brown.utils.CollectionUtil; -public class TransactionQueueManager implements Runnable, Loggable { +public class TransactionQueueManager implements Runnable, Loggable, Shutdownable { private static final Logger LOG = Logger.getLogger(TransactionQueueManager.class); private final static LoggerBoolean debug = new LoggerBoolean(LOG.isDebugEnabled()); private final static LoggerBoolean trace = new LoggerBoolean(LOG.isTraceEnabled()); @@ -38,6 +39,8 @@ public class TransactionQueueManager implements Runnable, Loggable { private final Collection localPartitions; private final int localPartitionsArray[]; + private boolean stop = false; + /** * */ @@ -155,7 +158,7 @@ public void run() { long txn_id = -1; long last_id = -1; - while (true) { + while (this.stop == false) { synchronized (this) { try { this.wait(this.wait_time * 10); // FIXME @@ -471,4 +474,20 @@ public TransactionInitPriorityQueue getQueue(int partition) { public TransactionInitWrapperCallback getCallback(long txn_id) { return (this.txn_callbacks.get(txn_id)); } + + @Override + public void prepareShutdown(boolean error) { + // Nothing for now + // Probably should abort all queued txns. + } + + @Override + public void shutdown() { + this.stop = true; + } + + @Override + public boolean isShuttingDown() { + return (this.stop); + } } diff --git a/src/frontend/org/voltdb/VoltDB.java b/src/frontend/org/voltdb/VoltDB.java index effdf9ce66..932e7654be 100644 --- a/src/frontend/org/voltdb/VoltDB.java +++ b/src/frontend/org/voltdb/VoltDB.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.TimeZone; +import edu.brown.hstore.HStore; import edu.brown.hstore.HStoreConstants; import edu.brown.hstore.HStoreSite; @@ -212,10 +213,14 @@ public static void crashVoltDB() { System.err.println(t.toString()); } - HStoreSite.crash(); -// System.err.println("VoltDB has encountered an unrecoverable error and is exiting."); -// System.err.println("The log may contain additional information."); -// System.exit(-1); + HStoreSite handle = HStore.instance(); + if (handle != null) { + handle.getCoordinator().shutdownCluster(); + } else { + System.err.println("H-Store has encountered an unrecoverable error and is exiting."); + System.err.println("The log may contain additional information."); + System.exit(-1); + } } /** diff --git a/tests/frontend/org/voltdb/regressionsuites/LocalCluster.java b/tests/frontend/org/voltdb/regressionsuites/LocalCluster.java index 860f62e9dc..75ff249271 100644 --- a/tests/frontend/org/voltdb/regressionsuites/LocalCluster.java +++ b/tests/frontend/org/voltdb/regressionsuites/LocalCluster.java @@ -58,8 +58,8 @@ public class LocalCluster implements VoltServerConfig { // configuration data final String m_jarFileName; + final int m_partitionPerSite; final int m_siteCount; - final int m_hostCount; final int m_replication; final BackendTarget m_target; final String m_buildDir; @@ -147,15 +147,15 @@ public void run() { } public LocalCluster(String jarFileName, int siteCount, - int hostCount, int replication, BackendTarget target) + int partitionsPerSite, int replication, BackendTarget target) { System.out.println("Instantiating LocalCluster for " + jarFileName); - System.out.println("Sites: " + siteCount + " hosts: " + hostCount + System.out.println("Sites: " + siteCount + " hosts: " + partitionsPerSite + " replication factor: " + replication); assert (jarFileName != null); assert (siteCount > 0); - assert (hostCount > 0); + assert (partitionsPerSite > 0); assert (replication >= 0); /*// (1) Load catalog from Jar @@ -187,9 +187,9 @@ public LocalCluster(String jarFileName, int siteCount, System.err.println("XXXXXXXXXXXXXXXXXXXXX\n" + CatalogInfo.getInfo(this.catalog, new File(jarFileName)));*/ m_jarFileName = VoltDB.Configuration.getPathToCatalogForTest(jarFileName); - m_siteCount = siteCount; + m_partitionPerSite = siteCount; m_target = target; - m_hostCount = hostCount; + m_siteCount = partitionsPerSite; m_replication = replication; String buildDir = System.getenv("VOLTDB_BUILD_DIR"); // via build.xml if (buildDir == null) @@ -242,7 +242,7 @@ public boolean compile(VoltProjectBuilder builder) { if (m_compiled) { return true; } - m_compiled = builder.compile(m_jarFileName, m_siteCount, m_hostCount, + m_compiled = builder.compile(m_jarFileName, m_partitionPerSite, m_siteCount, m_replication, "localhost"); // (1) Load catalog from Jar @@ -251,8 +251,8 @@ public boolean compile(VoltProjectBuilder builder) { // (2) Update catalog to include target cluster configuration ClusterConfiguration cc = new ClusterConfiguration(); // Update cc with a bunch of hosts/sites/partitions - for (int site = 0, currentPartition = 0; site < m_hostCount; ++site) { - for (int partition = 0; partition < m_siteCount; ++partition, ++currentPartition) { + for (int site = 0, currentPartition = 0; site < m_siteCount; ++site) { + for (int partition = 0; partition < m_partitionPerSite; ++partition, ++currentPartition) { cc.addPartition("localhost", site, currentPartition); } } @@ -262,12 +262,11 @@ public boolean compile(VoltProjectBuilder builder) { try { CatalogUtil.updateCatalogInJar(m_jarFileName, catalog, "catalog.txt"); } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); + throw new RuntimeException(e); } tmpCatalog = CatalogUtil.loadCatalogFromJar(m_jarFileName); - System.err.println("XXXXXXXXXXXXXXXXXXXXX\n" + CatalogInfo.getInfo(this.catalog, new File(m_jarFileName))); + System.err.println(CatalogInfo.getInfo(this.catalog, new File(m_jarFileName))); return m_compiled; } @@ -280,7 +279,7 @@ public void startUp() { } // set to true to spew startup timing data - boolean logtime = false; + boolean logtime = true; long startTime = 0; if (logtime) { startTime = System.currentTimeMillis(); @@ -288,7 +287,7 @@ public void startUp() { } // create the in-process server - Configuration config = new Configuration(); +// Configuration config = new Configuration(); // config.m_backend = m_target; // config.m_noLoadLibVOLTDB = (m_target == BackendTarget.HSQLDB_BACKEND); // config.m_pathToCatalog = m_jarFileName; @@ -326,7 +325,8 @@ public void startUp() { } PipeToFile ptf = new PipeToFile(testoutputdir + File.separator + - getName() + "-" + site_id + ".txt", proc.getInputStream()); + getName() + "-" + site_id + ".txt", proc.getInputStream()); + ptf.m_writer.write(m_procBuilder.command().toString() + "\n"); m_pipes.add(ptf); Thread t = new Thread(ptf); t.setName("ClusterPipe:" + String.valueOf(site_id)); @@ -421,14 +421,14 @@ public List getListenerAddresses() { @Override public String getName() { - return "localCluster-" + String.valueOf(m_siteCount) + - "-" + String.valueOf(m_hostCount) + "-" + m_target.display.toUpperCase(); + return "localCluster-" + String.valueOf(m_partitionPerSite) + + "-" + String.valueOf(m_siteCount) + "-" + m_target.display.toUpperCase(); } @Override public int getNodeCount() { - return m_hostCount; + return m_siteCount; } @Override