Commit

Oh man, this is a good one. We're getting closer to getting the LocalCluster regression suite working again. We fixed up the HStoreSite and HStoreCoordinator shutdown() methods to make sure that we kill everything from one test case to the next.
apavlo committed Mar 9, 2012
1 parent 44bbb8a commit c434fa2
Showing 6 changed files with 81 additions and 59 deletions.
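
The theme across all six files is the same: anything the site spawns or caches has to be reachable from shutdown(), because the regression suite now boots a fresh HStoreSite in the same JVM for each test case. A minimal sketch of that track-and-interrupt pattern, using illustrative names rather than the actual H-Store classes:

import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the track-and-interrupt pattern this commit applies;
// class and method names are illustrative, not actual H-Store classes.
public class RestartableServiceSketch {

    private final List<Thread> workerThreads = new ArrayList<Thread>();
    private volatile boolean shuttingDown = false;

    public synchronized void start(Runnable... tasks) {
        for (Runnable task : tasks) {
            Thread t = new Thread(task, "worker-" + this.workerThreads.size());
            t.setDaemon(true);              // never keep the JVM alive on its own
            t.start();
            this.workerThreads.add(t);      // remember it so shutdown() can reach it
        }
    }

    public synchronized void shutdown() {
        this.shuttingDown = true;           // cooperative flag for loops that poll it
        for (Thread t : this.workerThreads) {
            t.interrupt();                  // wake threads blocked on queues or waits
        }
        this.workerThreads.clear();
    }

    public boolean isShuttingDown() {
        return (this.shuttingDown);
    }
}
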
11 changes: 11 additions & 0 deletions src/frontend/edu/brown/hstore/HStoreCoordinator.java
@@ -121,6 +121,8 @@ public class HStoreCoordinator implements Shutdownable {
private final TransactionFinishDispatcher transactionFinish_dispatcher;
private final TransactionRedirectDispatcher transactionRedirect_dispatcher;

private final List<Thread> dispatcherThreads = new ArrayList<Thread>();

private Shutdownable.ShutdownState state = ShutdownState.INITIALIZED;

private final EventObservable<HStoreCoordinator> ready_observable = new EventObservable<HStoreCoordinator>();
@@ -230,18 +232,21 @@ public synchronized void start() {
Thread t = new Thread(transactionInit_dispatcher, HStoreSite.getThreadName(this.hstore_site, "init"));
t.setDaemon(true);
t.start();
this.dispatcherThreads.add(t);
}
if (this.transactionFinish_dispatcher != null) {
if (debug.get()) LOG.debug("Starting FinishTransaction dispatcher thread");
Thread t = new Thread(transactionFinish_dispatcher, HStoreSite.getThreadName(this.hstore_site, "finish"));
t.setDaemon(true);
t.start();
this.dispatcherThreads.add(t);
}
if (this.transactionRedirect_dispatcher != null) {
if (debug.get()) LOG.debug("Starting ForwardTxn dispatcher thread");
Thread t = new Thread(transactionRedirect_dispatcher, HStoreSite.getThreadName(this.hstore_site, "frwd"));
t.setDaemon(true);
t.start();
this.dispatcherThreads.add(t);
}

if (debug.get()) LOG.debug("Starting listener thread");
@@ -283,6 +288,12 @@ public synchronized void shutdown() {
this.state = ShutdownState.SHUTDOWN;

try {
// Kill all of our dispatchers
for (Thread t : this.dispatcherThreads) {
if (trace.get()) LOG.trace("Stopping dispatcher thread " + t.getName());
t.interrupt();
} // FOR

if (trace.get()) LOG.trace("Stopping eventLoop for Site #" + this.getLocalSiteId());
this.eventLoop.exitLoop();

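Interrupting the dispatcher threads only stops them if each dispatcher's run loop treats the interrupt as a signal to exit. The dispatcher classes themselves are not part of this diff, so the loop below is an assumption about their general shape rather than the actual implementation:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical dispatcher body; the real dispatcher classes are not shown in this diff.
public class DispatcherSketch implements Runnable {

    private final BlockingQueue<Runnable> work = new LinkedBlockingQueue<Runnable>();

    public void queue(Runnable task) {
        this.work.offer(task);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Runnable next = this.work.take();   // blocks until work arrives
                next.run();
            }
        } catch (InterruptedException ex) {
            // The interrupt() issued by HStoreCoordinator.shutdown() lands here,
            // so the daemon thread falls out of its loop and terminates.
            Thread.currentThread().interrupt();     // preserve the interrupt status
        }
    }
}
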
54 changes: 21 additions & 33 deletions src/frontend/edu/brown/hstore/HStoreSite.java
@@ -158,11 +158,6 @@ public static final String formatPartitionName(int site_id, int partition_id) {
return (HStoreSite.getThreadName(site_id, null, partition_id));
}

/**
* TODO
*/
private static HStoreSite SHUTDOWN_HANDLE = null;

// ----------------------------------------------------------------------------
// OBJECT POOLS
// ----------------------------------------------------------------------------
@@ -351,6 +346,10 @@ public static final String formatPartitionName(int site_id, int partition_id) {
protected HStoreSite(Site catalog_site, HStoreConf hstore_conf) {
assert(catalog_site != null);

// **IMPORTANT**
// Always clear out the CatalogUtil cache before we start our new HStoreSite
CatalogUtil.clearCache(catalog_site);

this.hstore_conf = hstore_conf;
this.catalog_site = catalog_site;
this.catalog_db = CatalogUtil.getDatabase(this.catalog_site);
@@ -460,6 +459,9 @@ protected HStoreSite(Site catalog_site, HStoreConf hstore_conf) {
* @return
*/
public int getLocalPartitionOffset(int partition) {
assert(partition < this.local_partition_offsets.length) :
String.format("Unable to get offset of local partition %d %s [hashCode=%d]",
partition, Arrays.toString(this.local_partition_offsets), this.hashCode());
return this.local_partition_offsets[partition];
}

@@ -781,13 +783,7 @@ public void update(EventObservable<Pair<Thread, Throwable>> o, Pair<Thread, Thro
}

// Add in our shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));

// And mark ourselves as the current HStoreSite in case somebody wants to take us down!
// TODO: Move to HStore.java
synchronized (HStoreSite.class) {
if (SHUTDOWN_HANDLE == null) SHUTDOWN_HANDLE = this;
} // SYNCH
// Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));

return (this);
}
@@ -839,30 +835,19 @@ public ProfileMeasurement getEmptyQueueTime() {
// HSTORESTITE SHUTDOWN STUFF
// ----------------------------------------------------------------------------

public static void crash() {
if (SHUTDOWN_HANDLE != null) {
SHUTDOWN_HANDLE.hstore_coordinator.shutdownCluster();
} else {
LOG.fatal("H-Store has encountered an unrecoverable error and is exiting.");
LOG.fatal("The log may contain additional information.");
System.exit(-1);
}

}

/**
* Shutdown Hook Thread
*/
private final class ShutdownHook implements Runnable {
@Override
public void run() {
// Dump out our status
int num_inflight = inflight_txns.size();
if (num_inflight > 0) {
System.err.println("Shutdown [" + num_inflight + " txns inflight]");
}
}
} // END CLASS
// private final class ShutdownHook implements Runnable {
// @Override
// public void run() {
// // Dump out our status
// int num_inflight = inflight_txns.size();
// if (num_inflight > 0) {
// System.err.println("Shutdown [" + num_inflight + " txns inflight]");
// }
// }
// } // END CLASS

@Override
public void prepareShutdown(boolean error) {
@@ -898,6 +883,9 @@ public synchronized void shutdown() {
// Stop the monitor thread
if (this.status_monitor != null) this.status_monitor.shutdown();

// Kill the queue manager
this.txnQueueManager.shutdown();

// Tell our local boys to go down too
for (PartitionExecutorPostProcessor p : this.processors) {
p.shutdown();
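The CatalogUtil.clearCache() call at the top of the constructor is what lets two test cases build independent HStoreSites in one JVM: any lookups memoized against the previous test's catalog objects are thrown away before the new site starts resolving its own. CatalogUtil's internals are not part of this diff, so the cache below is only a guess at its general shape:

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a static, catalog-keyed cache that has to be reset between tests.
public class CatalogCacheSketch {

    private static final Map<Object, Object> CACHE = new HashMap<Object, Object>();

    public static Object getDatabase(Object catalogSite) {
        Object db = CACHE.get(catalogSite);
        if (db == null) {
            db = lookupDatabase(catalogSite);   // expensive traversal, done once
            CACHE.put(catalogSite, db);
        }
        return (db);
    }

    // Mirrors the role of CatalogUtil.clearCache(catalog_site): forget everything
    // derived from an earlier catalog before a new HStoreSite starts up.
    public static void clearCache(Object catalogSite) {
        CACHE.clear();
    }

    private static Object lookupDatabase(Object catalogSite) {
        return (new Object());                  // placeholder for the real resolution
    }
}
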
3 changes: 1 addition & 2 deletions src/frontend/edu/brown/hstore/PartitionExecutor.java
@@ -603,8 +603,7 @@ else if (target == BackendTarget.NATIVE_EE_JNI) {
}
// just print error info an bail if we run into an error here
catch (final Exception ex) {
LOG.fatal("Failed to initialize PartitionExecutor", ex);
VoltDB.crashVoltDB();
throw new RuntimeException("Failed to initialize PartitionExecutor", ex);
}
this.ee = eeTemp;
this.hsql = hsqlTemp;
23 changes: 21 additions & 2 deletions src/frontend/edu/brown/hstore/dtxn/TransactionQueueManager.java
@@ -14,13 +14,14 @@
import edu.brown.hstore.Hstoreservice.Status;
import edu.brown.hstore.callbacks.TransactionInitWrapperCallback;
import edu.brown.hstore.interfaces.Loggable;
import edu.brown.hstore.interfaces.Shutdownable;
import edu.brown.hstore.util.TxnCounter;
import edu.brown.logging.LoggerUtil;
import edu.brown.logging.LoggerUtil.LoggerBoolean;
import edu.brown.statistics.Histogram;
import edu.brown.utils.CollectionUtil;

public class TransactionQueueManager implements Runnable, Loggable {
public class TransactionQueueManager implements Runnable, Loggable, Shutdownable {
private static final Logger LOG = Logger.getLogger(TransactionQueueManager.class);
private final static LoggerBoolean debug = new LoggerBoolean(LOG.isDebugEnabled());
private final static LoggerBoolean trace = new LoggerBoolean(LOG.isTraceEnabled());
@@ -38,6 +39,8 @@ public class TransactionQueueManager implements Runnable, Loggable {
private final Collection<Integer> localPartitions;
private final int localPartitionsArray[];

private boolean stop = false;

/**
*
*/
@@ -155,7 +158,7 @@ public void run() {
long txn_id = -1;
long last_id = -1;

while (true) {
while (this.stop == false) {
synchronized (this) {
try {
this.wait(this.wait_time * 10); // FIXME
@@ -471,4 +474,20 @@ public TransactionInitPriorityQueue getQueue(int partition) {
public TransactionInitWrapperCallback getCallback(long txn_id) {
return (this.txn_callbacks.get(txn_id));
}

@Override
public void prepareShutdown(boolean error) {
// Nothing for now
// Probably should abort all queued txns.
}

@Override
public void shutdown() {
this.stop = true;
}

@Override
public boolean isShuttingDown() {
return (this.stop);
}
}
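
TransactionQueueManager now participates in the same Shutdownable protocol: shutdown() flips a flag that the run() loop re-checks every time its timed wait expires. A self-contained sketch of that loop follows; the volatile modifier and the eager notifyAll() are assumptions of this sketch, not details taken from the diff:

// Cooperative stop-flag loop in the style of the new TransactionQueueManager shutdown path.
public class StoppableLoopSketch implements Runnable {

    private volatile boolean stop = false;

    @Override
    public void run() {
        while (this.stop == false) {
            synchronized (this) {
                try {
                    this.wait(100);     // wake up periodically, like wait(wait_time * 10)
                } catch (InterruptedException ex) {
                    break;              // an interrupt also ends the loop
                }
            }
            // ... poll the per-partition init queues here ...
        }
    }

    public void shutdown() {
        this.stop = true;
        synchronized (this) {
            this.notifyAll();           // spare the caller the remainder of the timeout
        }
    }

    public boolean isShuttingDown() {
        return (this.stop);
    }
}
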
13 changes: 9 additions & 4 deletions src/frontend/org/voltdb/VoltDB.java
@@ -21,6 +21,7 @@
import java.util.Map;
import java.util.TimeZone;

import edu.brown.hstore.HStore;
import edu.brown.hstore.HStoreConstants;
import edu.brown.hstore.HStoreSite;

@@ -212,10 +213,14 @@ public static void crashVoltDB() {
System.err.println(t.toString());
}

HStoreSite.crash();
// System.err.println("VoltDB has encountered an unrecoverable error and is exiting.");
// System.err.println("The log may contain additional information.");
// System.exit(-1);
HStoreSite handle = HStore.instance();
if (handle != null) {
handle.getCoordinator().shutdownCluster();
} else {
System.err.println("H-Store has encountered an unrecoverable error and is exiting.");
System.err.println("The log may contain additional information.");
System.exit(-1);
}
}

/**
36 changes: 18 additions & 18 deletions tests/frontend/org/voltdb/regressionsuites/LocalCluster.java
@@ -58,8 +58,8 @@ public class LocalCluster implements VoltServerConfig {

// configuration data
final String m_jarFileName;
final int m_partitionPerSite;
final int m_siteCount;
final int m_hostCount;
final int m_replication;
final BackendTarget m_target;
final String m_buildDir;
@@ -147,15 +147,15 @@ public void run() {
}

public LocalCluster(String jarFileName, int siteCount,
int hostCount, int replication, BackendTarget target)
int partitionsPerSite, int replication, BackendTarget target)
{
System.out.println("Instantiating LocalCluster for " + jarFileName);
System.out.println("Sites: " + siteCount + " hosts: " + hostCount
System.out.println("Sites: " + siteCount + " hosts: " + partitionsPerSite
+ " replication factor: " + replication);

assert (jarFileName != null);
assert (siteCount > 0);
assert (hostCount > 0);
assert (partitionsPerSite > 0);
assert (replication >= 0);

/*// (1) Load catalog from Jar
@@ -187,9 +187,9 @@ public LocalCluster(String jarFileName, int siteCount,
System.err.println("XXXXXXXXXXXXXXXXXXXXX\n" + CatalogInfo.getInfo(this.catalog, new File(jarFileName)));*/

m_jarFileName = VoltDB.Configuration.getPathToCatalogForTest(jarFileName);
m_siteCount = siteCount;
m_partitionPerSite = siteCount;
m_target = target;
m_hostCount = hostCount;
m_siteCount = partitionsPerSite;
m_replication = replication;
String buildDir = System.getenv("VOLTDB_BUILD_DIR"); // via build.xml
if (buildDir == null)
@@ -242,7 +242,7 @@ public boolean compile(VoltProjectBuilder builder) {
if (m_compiled) {
return true;
}
m_compiled = builder.compile(m_jarFileName, m_siteCount, m_hostCount,
m_compiled = builder.compile(m_jarFileName, m_partitionPerSite, m_siteCount,
m_replication, "localhost");

// (1) Load catalog from Jar
@@ -251,8 +251,8 @@ public boolean compile(VoltProjectBuilder builder) {
// (2) Update catalog to include target cluster configuration
ClusterConfiguration cc = new ClusterConfiguration();
// Update cc with a bunch of hosts/sites/partitions
for (int site = 0, currentPartition = 0; site < m_hostCount; ++site) {
for (int partition = 0; partition < m_siteCount; ++partition, ++currentPartition) {
for (int site = 0, currentPartition = 0; site < m_siteCount; ++site) {
for (int partition = 0; partition < m_partitionPerSite; ++partition, ++currentPartition) {
cc.addPartition("localhost", site, currentPartition);
}
}
@@ -262,12 +262,11 @@ public boolean compile(VoltProjectBuilder builder) {
try {
CatalogUtil.updateCatalogInJar(m_jarFileName, catalog, "catalog.txt");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
throw new RuntimeException(e);
}

tmpCatalog = CatalogUtil.loadCatalogFromJar(m_jarFileName);
System.err.println("XXXXXXXXXXXXXXXXXXXXX\n" + CatalogInfo.getInfo(this.catalog, new File(m_jarFileName)));
System.err.println(CatalogInfo.getInfo(this.catalog, new File(m_jarFileName)));

return m_compiled;
}
@@ -280,15 +279,15 @@ public void startUp() {
}

// set to true to spew startup timing data
boolean logtime = false;
boolean logtime = true;
long startTime = 0;
if (logtime) {
startTime = System.currentTimeMillis();
System.out.println("********** Starting cluster at: " + startTime);
}

// create the in-process server
Configuration config = new Configuration();
// Configuration config = new Configuration();
// config.m_backend = m_target;
// config.m_noLoadLibVOLTDB = (m_target == BackendTarget.HSQLDB_BACKEND);
// config.m_pathToCatalog = m_jarFileName;
@@ -326,7 +325,8 @@ public void startUp() {
}

PipeToFile ptf = new PipeToFile(testoutputdir + File.separator +
getName() + "-" + site_id + ".txt", proc.getInputStream());
getName() + "-" + site_id + ".txt", proc.getInputStream());
ptf.m_writer.write(m_procBuilder.command().toString() + "\n");
m_pipes.add(ptf);
Thread t = new Thread(ptf);
t.setName("ClusterPipe:" + String.valueOf(site_id));
@@ -421,14 +421,14 @@ public List<String> getListenerAddresses() {

@Override
public String getName() {
return "localCluster-" + String.valueOf(m_siteCount) +
"-" + String.valueOf(m_hostCount) + "-" + m_target.display.toUpperCase();
return "localCluster-" + String.valueOf(m_partitionPerSite) +
"-" + String.valueOf(m_siteCount) + "-" + m_target.display.toUpperCase();
}

@Override
public int getNodeCount()
{
return m_hostCount;
return m_siteCount;
}

@Override
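With the renamed fields, the nested loop in compile() lays out contiguous partition ids per site: for m_siteCount = 2 and m_partitionPerSite = 3, site 0 gets partitions 0-2 and site 1 gets partitions 3-5, all on localhost. A standalone version of that loop, with a printout standing in for ClusterConfiguration.addPartition():

// Standalone illustration of the partition-layout loop in LocalCluster.compile().
public class PartitionLayoutSketch {
    public static void main(String[] args) {
        int siteCount = 2;              // m_siteCount: number of sites launched on localhost
        int partitionsPerSite = 3;      // m_partitionPerSite: partitions hosted by each site
        for (int site = 0, currentPartition = 0; site < siteCount; ++site) {
            for (int partition = 0; partition < partitionsPerSite; ++partition, ++currentPartition) {
                // Stand-in for cc.addPartition("localhost", site, currentPartition)
                System.out.printf("localhost  site=%d  partition=%d%n", site, currentPartition);
            }
        }
    }
}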
