Skip to content

Commit

Permalink
Building out the new infrastructure for tracking prefetched queries. #…
Browse files Browse the repository at this point in the history
  • Loading branch information
apavlo committed May 11, 2013
1 parent e3a429f commit 859972b
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 67 deletions.
6 changes: 5 additions & 1 deletion src/frontend/edu/brown/hstore/HStoreCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -771,10 +771,14 @@ public void transactionInit(LocalTransaction ts, RpcCallback<TransactionInitResp
// Make sure that we initialize our internal PrefetchState for this txn
ts.initializePrefetch();
TransactionInitRequest[] requests = this.queryPrefetchPlanner.generateWorkFragments(ts);

// If the PrefetchQueryPlanner returns a null array, then there is nothing
// that we can actually prefetch, so we'll just send the normal txn init requests
if (requests == null) {
this.sendDefaultTransactionInitRequests(ts, callback);
return;
}

TransactionCounter.PREFETCH_LOCAL.inc(ts.getProcedure());
int sent_ctr = 0;
int prefetch_ctr = 0;
Expand All @@ -783,7 +787,7 @@ public void transactionInit(LocalTransaction ts, RpcCallback<TransactionInitResp
this.num_sites, TransactionInitRequest.class.getSimpleName(), requests.length);
for (int site_id = 0; site_id < this.num_sites; site_id++) {
if (requests[site_id] == null) continue;

if (site_id == this.local_site_id) {
this.hstore_site.transactionInit(ts);
// this.transactionInit_handler.sendLocal(ts.getTransactionId(),
Expand Down
56 changes: 43 additions & 13 deletions src/frontend/edu/brown/hstore/PartitionExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1762,7 +1762,13 @@ else if (ts.getEstimatorState() != null && singlePartition && ts.isSpeculative()
return (undoToken);
}


/**
*
* @param ts
* @param fragment
* @param allParams
* @return
*/
private ParameterSet[] getFragmentParameters(AbstractTransaction ts, WorkFragment fragment, ParameterSet allParams[]) {
int num_fragments = fragment.getFragmentIdCount();
ParameterSet fragmentParams[] = tmp_fragmentParams.getParameterSet(num_fragments);
Expand All @@ -1779,6 +1785,13 @@ private ParameterSet[] getFragmentParameters(AbstractTransaction ts, WorkFragmen
return (fragmentParams);
}

/**
*
* @param ts
* @param input_dep_ids
* @param inputs
* @return
*/
private Map<Integer, List<VoltTable>> getFragmentInputs(AbstractTransaction ts,
List<Integer> input_dep_ids,
Map<Integer, List<VoltTable>> inputs) {
Expand Down Expand Up @@ -3118,19 +3131,40 @@ else if (debug.val) {

// If we prefetched queries for this txn, we need to check whether we
// have already submit a request for one of the queries in our batch
if (ts.hasPrefetchQueries()) {
if (hstore_conf.site.exec_prefetch_queries && ts.hasPrefetchQueries()) {
PartitionSet stmtPartitions[] = plan.getStatementPartitions();
PrefetchState prefetchState = ts.getPrefetchState();
assert(prefetchState != null);
for (int i = 0; i < batchSize; i++) {
Statement stmt = batchStmts[i].getStatement();
// We only need to maintain counters for the prefetchable queries.
if (stmt.getPrefetchable()) {
int stmtCnt = ts.updateStatementCounter(stmt, stmtPartitions[i]);
if (stmt.getPrefetchable() == false) continue;

// Always increase the counter so that we know how many times
// we have executed this query in the past.
int stmtCnt = prefetchState.updateStatementCounter(stmt, stmtPartitions[i]);

// Check whether we have already sent a request to execute this query
if (ts.isMarkedPrefetched(stmt, stmtCnt)) {
// We have... so that means that we don't want to send it
// again and should expect the result to come from somewhere else.

// Check whether we have already sent a request to execute this query
PrefetchState.PrefetchedQuery pq = prefetchState.findPrefetchedQuery(stmt, stmtCnt);
if (pq != null) {
// We have... so that means that we don't want to send it
// again and should expect the result to come from somewhere else.
// But we need to check whether we sent it to the same partitions
boolean samePartitions = pq.partitions.containsAll(stmtPartitions[i]);
boolean sameParams = (pq.paramsHash == batchParams[i].hashCode());

// Everything is the same, so we need to remove this
// statement from the batch.
if (samePartitions && sameParams) {
// TODO
}
// The parameters are different
else if (sameParams == false) {
// TODO
}
// The partitions are different
else if (samePartitions == false) {
// TODO
}
}
} // FOR
Expand All @@ -3150,10 +3184,6 @@ else if (debug.val) {
// Otherwise, we need to generate WorkFragments and then send the messages out
// to our remote partitions using the HStoreCoordinator
else {




if (trace.val)
LOG.trace(ts + " - Using PartitionExecutor.dispatchWorkFragments() to execute distributed queries");
ExecutionState execState = ts.getExecutionState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ public void finish() {

@Override
public void run(ClientResponseImpl parameter) {
if (debug.val) LOG.debug(String.format("Got ClientResponse callback for txn #%d! Sending back to %s",
parameter.getTransactionId(), HStoreThreadManager.formatSiteName(this.destSiteId)));
if (debug.val)
LOG.debug(String.format("Got ClientResponse callback for txn #%d! Sending back to %s",
parameter.getTransactionId(), HStoreThreadManager.formatSiteName(this.destSiteId)));
FastSerializer fs = new FastSerializer();
try {
parameter.writeExternal(fs);
Expand All @@ -80,9 +81,10 @@ public void run(ClientResponseImpl parameter) {
.setOutput(bs)
.build();
this.orig_callback.run(response);
if (debug.val) LOG.debug(String.format("Sent back ClientResponse for txn #%d to %s [bytes=%d]",
parameter.getTransactionId(), HStoreThreadManager.formatSiteName(this.destSiteId),
bs.size()));
if (debug.val)
LOG.debug(String.format("Sent back ClientResponse for txn #%d to %s [bytes=%d]",
parameter.getTransactionId(), HStoreThreadManager.formatSiteName(this.destSiteId),
bs.size()));

// IMPORTANT: Since we're the only one that knows that we're finished (and actually even
// cares), we need to be polite and clean-up after ourselves...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ public void remoteHandler(RpcController controller, TransactionInitRequest reque
// our state when pre-fetching queries.
if (request.getPrefetchFragmentsCount() > 0) {
// Stick the prefetch information into the transaction
if (debug.val) LOG.debug(String.format("%s - Attaching %d prefetch WorkFragments at %s",
ts, request.getPrefetchFragmentsCount(), hstore_site.getSiteName()));
if (debug.val)
LOG.debug(String.format("%s - Attaching %d prefetch WorkFragments at %s",
ts, request.getPrefetchFragmentsCount(), hstore_site.getSiteName()));
ts.initializePrefetch();
ts.attachPrefetchQueries(request.getPrefetchFragmentsList(),
request.getPrefetchParamsList());
Expand Down
36 changes: 8 additions & 28 deletions src/frontend/edu/brown/hstore/txns/AbstractTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -1089,29 +1089,17 @@ public final void initializePrefetch() {
}
}

public final PrefetchState getPrefetchState() {
return (this.prefetch);
}

/**
* Returns true if this transaction has prefetched queries
*/
public final boolean hasPrefetchQueries() {
// return (this.prefetch.fragments != null && this.prefetch.fragments.isEmpty() == false);
return (this.prefetch != null);
}

public final void markPrefetchQuery(Statement stmt, int counter) {

}

/**
* Returns true if this query
* @param stmt
* @param counter
* @return
*/
public final boolean isMarkedPrefetched(Statement stmt, int counter) {

return (false);
}

/**
* Attach prefetchable WorkFragments for this transaction
* This should be invoked on the remote side of the initialization request.
Expand All @@ -1131,7 +1119,8 @@ public void attachPrefetchQueries(List<WorkFragment> fragments, List<ByteString>

public void attachPrefetchParameters(ParameterSet params[]) {
assert(this.prefetch.params == null) :
"Trying to attach Prefetch ParameterSets more than once!";
String.format("Trying to attach Prefetch %s more than once for %s",
ParameterSet.class.getSimpleName(), this);
this.prefetch.params = params;
}

Expand All @@ -1150,18 +1139,9 @@ public final ParameterSet[] getPrefetchParameterSets() {

public final void markExecPrefetchQuery(int partition) {
assert(this.prefetch != null);
this.prefetch.partitions.set(partition);
}

/**
* Update an internal counter for the number of times that we've invoked queries
* @param stmt
* @return
*/
public final int updateStatementCounter(Statement stmt, PartitionSet partitions) {
assert(this.prefetch != null);
return (int)this.prefetch.stmtCounters.put(stmt);
this.prefetch.partitions.add(partition);
}


// ----------------------------------------------------------------------------
// DEBUG METHODS
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/edu/brown/hstore/txns/ExecutionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -19,16 +17,18 @@
import org.voltdb.utils.Pair;

import edu.brown.hstore.Hstoreservice.WorkFragment;
import edu.brown.hstore.util.ParameterSetArrayCache;
import edu.brown.hstore.PartitionExecutor;
import edu.brown.hstore.util.ParameterSetArrayCache;
import edu.brown.interfaces.DebugContext;
import edu.brown.logging.LoggerUtil;
import edu.brown.logging.LoggerUtil.LoggerBoolean;
import edu.brown.pools.Poolable;

/**
* The internal state of a transaction while it is running at a PartitionExecutor
* This will be removed from the LocalTransaction once its control code is finished executing
* This will be removed from the LocalTransaction once its control code is finished executing.
* If you need to track anything that may occur *before* the txn starts running, then you don't
* want to put those data structures in here.
* @author pavlo
*/
public class ExecutionState implements Poolable {
Expand Down Expand Up @@ -102,7 +102,7 @@ public class ExecutionState implements Poolable {
* Internal cache of the result queues that were used by the txn in this round.
* This is so that we don't have to clear all of the queues in the entire results_dependency_stmt_ctr cache.
*/
private final Set<Queue<Integer>> results_queue_cache = new HashSet<Queue<Integer>>();
private final Collection<Queue<Integer>> results_queue_cache = new ArrayList<Queue<Integer>>();

/**
* Sometimes we will get results back while we are still queuing up the rest of the tasks and
Expand Down
1 change: 1 addition & 0 deletions src/frontend/edu/brown/hstore/txns/LocalTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.voltdb.catalog.CatalogType;
import org.voltdb.catalog.PlanFragment;
import org.voltdb.catalog.Procedure;
import org.voltdb.catalog.Statement;
import org.voltdb.catalog.Table;
import org.voltdb.exceptions.SerializableException;
import org.voltdb.types.SpeculationType;
Expand Down
88 changes: 83 additions & 5 deletions src/frontend/edu/brown/hstore/txns/PrefetchState.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.voltdb.ParameterSet;
import org.voltdb.catalog.Statement;
Expand All @@ -15,22 +18,40 @@
import edu.brown.pools.Poolable;
import edu.brown.statistics.Histogram;
import edu.brown.statistics.ObjectHistogram;
import edu.brown.utils.PartitionSet;

/**
* Special internal state information for when the txn requests prefetch queries
* @author pavlo
*/
public class PrefetchState implements Poolable {

public class PrefetchedQuery {
public final Statement stmt;
public final int counter;
public final PartitionSet partitions;
public final int paramsHash;

public PrefetchedQuery(Statement stmt, int counter, PartitionSet partitions, int paramsHash) {
this.stmt = stmt;
this.counter = counter;
this.partitions = partitions;
this.paramsHash = paramsHash;
}
}

private final Collection<PrefetchedQuery> prefetchQueries = new ArrayList<>();

/**
* Internal counter for the number of times that we've executed this query in the past.
* Internal counter for the number of times that we've executed queries in the past.
* If a Statement is not in this list, then we know that it wasn't prefetched.
*/
protected final Histogram<Statement> stmtCounters = new ObjectHistogram<Statement>();
private final Histogram<Statement> stmtCounters = new ObjectHistogram<Statement>(true);

/**
* Which partitions have received prefetch WorkFragments
*/
protected final BitSet partitions;
protected final PartitionSet partitions = new PartitionSet();

/**
* The list of the FragmentIds that were sent out in a prefetch request
Expand Down Expand Up @@ -59,9 +80,12 @@ public class PrefetchState implements Poolable {
*/
protected final List<WorkResult> results = new ArrayList<WorkResult>();

// ----------------------------------------------------------------------------
// INITIALIZATION
// ----------------------------------------------------------------------------

public PrefetchState(HStoreSite hstore_site) {
int num_partitions = hstore_site.getLocalPartitionIds().size();
this.partitions = new BitSet(num_partitions);
// int num_partitions = hstore_site.getLocalPartitionIds().size();
}

public void init(AbstractTransaction ts) {
Expand All @@ -82,6 +106,60 @@ public void finish() {
this.params = null;
this.results.clear();
}

// ----------------------------------------------------------------------------
// INTERNAL METHODS
// ----------------------------------------------------------------------------


public PrefetchedQuery findPrefetchedQuery(Statement stmt, int counter) {
for (PrefetchedQuery pq : this.prefetchQueries) {
if (pq.stmt.equals(stmt) && pq.counter == counter) {
return (pq);
}
} // FOR
return (null);
}


// ----------------------------------------------------------------------------
// API METHODS
// ----------------------------------------------------------------------------

/**
* Mark the given query instance as being prefetched. This doesn't keep track
* of whether the result has returned, only that we sent the prefetch request
* out to the given partitions.
* @param stmt
* @param counter
* @param partitions
* @param stmtParams
*/
public void markPrefetchedQuery(Statement stmt, int counter,
PartitionSet partitions, ParameterSet stmtParams) {
PrefetchedQuery pq = new PrefetchedQuery(stmt, counter, partitions, stmtParams.hashCode());
this.prefetchQueries.add(pq);
}


/**
* Update an internal counter for the number of times that we've invoked queries
* @param stmt
* @return
*/
public final int updateStatementCounter(Statement stmt, PartitionSet partitions) {
return (int)this.stmtCounters.put(stmt);
}


/**
* Returns true if this query
* @param stmt
* @param counter
* @return
*/
public final boolean isMarkedPrefetched(Statement stmt, int counter) {
return (this.findPrefetchedQuery(stmt, counter) != null);
}

}
Loading

0 comments on commit 859972b

Please sign in to comment.