Skip to content

Commit

Permalink
Reorganized the PrefetchQueryPlanner so that it returns builders inst…
Browse files Browse the repository at this point in the history
…ead of actual messages. It is now responsible for updating the txn's information in the DependencyTracker. This is necessary because this planner has all of the parameter information.
  • Loading branch information
apavlo committed May 16, 2013
1 parent 9980a97 commit 51cf9d8
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 380 deletions.
36 changes: 17 additions & 19 deletions src/frontend/edu/brown/hstore/HStoreCoordinator.java
Expand Up @@ -32,6 +32,7 @@
import com.google.protobuf.RpcController;

import edu.brown.catalog.CatalogUtil;
import edu.brown.catalog.special.CountedStatement;
import edu.brown.hstore.Hstoreservice.HStoreService;
import edu.brown.hstore.Hstoreservice.InitializeRequest;
import edu.brown.hstore.Hstoreservice.InitializeResponse;
Expand Down Expand Up @@ -62,6 +63,7 @@
import edu.brown.hstore.Hstoreservice.TransactionReduceResponse;
import edu.brown.hstore.Hstoreservice.TransactionWorkRequest;
import edu.brown.hstore.Hstoreservice.TransactionWorkResponse;
import edu.brown.hstore.Hstoreservice.WorkFragment;
import edu.brown.hstore.callbacks.ShutdownPrepareCallback;
import edu.brown.hstore.callbacks.LocalFinishCallback;
import edu.brown.hstore.callbacks.TransactionPrefetchCallback;
Expand All @@ -80,7 +82,9 @@
import edu.brown.hstore.handlers.TransactionReduceHandler;
import edu.brown.hstore.handlers.TransactionWorkHandler;
import edu.brown.hstore.txns.AbstractTransaction;
import edu.brown.hstore.txns.DependencyTracker;
import edu.brown.hstore.txns.LocalTransaction;
import edu.brown.hstore.txns.PrefetchState;
import edu.brown.hstore.txns.RemoteTransaction;
import edu.brown.hstore.util.PrefetchQueryPlanner;
import edu.brown.hstore.util.TransactionCounter;
Expand Down Expand Up @@ -772,14 +776,12 @@ public void transactionInit(LocalTransaction ts, RpcCallback<TransactionInitResp
LOG.debug(String.format("%s - Generating %s with prefetchable queries",
ts, TransactionInitRequest.class.getSimpleName()));

// Make sure that we initialize our internal PrefetchState for this txn
ts.initializePrefetch();

// We also need to add our boy to its base partition's DependencyTracker
// This is so that we can store the prefetch results when they come back
hstore_site.getDependencyTracker(ts.getBasePartition()).addTransaction(ts);

TransactionInitRequest[] requests = this.queryPrefetchPlanner.generateWorkFragments(ts);
DependencyTracker depTracker = hstore_site.getDependencyTracker(ts.getBasePartition());
TransactionInitRequest.Builder[] requests = this.queryPrefetchPlanner.plan(ts,
ts.getProcedureParameters(),
depTracker);

// If the PrefetchQueryPlanner returns a null array, then there is nothing
// that we can actually prefetch, so we'll just send the normal txn init requests
Expand All @@ -796,23 +798,19 @@ public void transactionInit(LocalTransaction ts, RpcCallback<TransactionInitResp
this.num_sites, TransactionInitRequest.class.getSimpleName(), requests.length);
for (int site_id = 0; site_id < this.num_sites; site_id++) {
if (requests[site_id] == null) continue;

if (site_id == this.local_site_id) {
this.hstore_site.transactionInit(ts);
// this.transactionInit_handler.sendLocal(ts.getTransactionId(),
// requests[site_id],
// ts.getPredictTouchedPartitions(),
// callback);
// Blast out this mofo. Tell them Rico sent you...
if (site_id != this.local_site_id) {
TransactionInitRequest initRequest = requests[site_id].build();
ProtoRpcController controller = ts.getTransactionInitController(site_id);
this.channels[site_id].transactionInit(controller, initRequest, callback);
prefetch_ctr += initRequest.getPrefetchFragmentsCount();
}
// Send the default message for the local site
else {
ProtoRpcController controller = ts.getTransactionInitController(site_id);
this.channels[site_id].transactionInit(controller,
requests[site_id],
callback);
assert(site_id == this.local_site_id);
this.hstore_site.transactionInit(ts);
}

sent_ctr++;
prefetch_ctr += requests[site_id].getPrefetchFragmentsCount();
} // FOR
assert(sent_ctr > 0) : "No TransactionInitRequests available for " + ts;
if (debug.val)
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/edu/brown/hstore/txns/DependencyTracker.java
Expand Up @@ -336,9 +336,9 @@ protected void finishRound(LocalTransaction ts) {

private TransactionState getState(LocalTransaction ts) {
TransactionState state = this.txnStates.get(ts.getTransactionId());
// assert(state != null) :
// String.format("Unexpected null %s handle for %s at %s",
// TransactionState.class.getSimpleName(), ts, this);
assert(state != null) :
String.format("Unexpected null %s handle for %s at %s",
TransactionState.class.getSimpleName(), ts, this);
return (state);
}

Expand Down
83 changes: 33 additions & 50 deletions src/frontend/edu/brown/hstore/util/PrefetchQueryPlanner.java
Expand Up @@ -26,8 +26,8 @@
import edu.brown.hstore.BatchPlanner.BatchPlan;
import edu.brown.hstore.Hstoreservice.TransactionInitRequest;
import edu.brown.hstore.Hstoreservice.WorkFragment;
import edu.brown.hstore.txns.DependencyTracker;
import edu.brown.hstore.txns.LocalTransaction;
import edu.brown.hstore.txns.PrefetchState;
import edu.brown.logging.LoggerUtil;
import edu.brown.logging.LoggerUtil.LoggerBoolean;
import edu.brown.mappings.ParameterMapping;
Expand Down Expand Up @@ -149,7 +149,7 @@ protected BatchPlanner addPlanner(Procedure catalog_proc,
* @param ts
* @return
*/
public TransactionInitRequest[] generateWorkFragments(LocalTransaction ts) {
public TransactionInitRequest.Builder[] plan(LocalTransaction ts, ParameterSet procParams, DependencyTracker depTracker) {
// We can't do this without a ParameterMappingSet
if (this.catalogContext.paramMappings == null) {
return (null);
Expand All @@ -160,11 +160,9 @@ public TransactionInitRequest[] generateWorkFragments(LocalTransaction ts) {
return (null);
}

assert (ts.getProcedureParameters() != null) :
"Unexpected null ParameterSet for " + ts;
if (debug.val)
LOG.debug(String.format("%s - Generating prefetch WorkFragments using %s",
ts, ts.getProcedureParameters()));
ts, procParams));

// Create a SQLStmt batch as if it was created during the normal txn execution process
SQLStmt[] prefetchStmts = new SQLStmt[prefetchable.size()];
Expand Down Expand Up @@ -192,7 +190,7 @@ public TransactionInitRequest[] generateWorkFragments(LocalTransaction ts) {
// Makes a list of ByteStrings containing the ParameterSets that we need
// to send over to the remote sites so that they can execute our
// prefetchable queries
Object proc_params[] = ts.getProcedureParameters().toArray();
Object proc_params[] = procParams.toArray();
for (int i = 0; i < prefetchParams.length; i++) {
CountedStatement counted_stmt = prefetchable.get(i);
if (debug.val)
Expand All @@ -211,7 +209,7 @@ public TransactionInitRequest[] generateWorkFragments(LocalTransaction ts) {
catalog_param.fullName(), counted_stmt);

if (pm.procedure_parameter.getIsarray()) {
stmt_params[catalog_param.getIndex()] = ParametersUtil.getValue(ts.getProcedureParameters(), pm);
stmt_params[catalog_param.getIndex()] = ParametersUtil.getValue(procParams, pm);
}
else {
ProcParameter catalog_proc_param = pm.procedure_parameter;
Expand All @@ -236,7 +234,6 @@ public TransactionInitRequest[] generateWorkFragments(LocalTransaction ts) {
throw new RuntimeException("Failed to serialize ParameterSet " + i + " for " + ts, ex);
}


} // FOR (Statement)

// Generate the WorkFragments that we will need to send in our TransactionInitRequest
Expand All @@ -247,19 +244,6 @@ public TransactionInitRequest[] generateWorkFragments(LocalTransaction ts) {
prefetchParams);
List<WorkFragment.Builder> fragmentBuilders = new ArrayList<WorkFragment.Builder>();
plan.getWorkFragmentsBuilders(ts.getTransactionId(), prefetchCounters, fragmentBuilders);

// IMPORTANT: Make sure that we tell the PrefetchState handle that
// we have marked this Statement as prefetched. We have to do this here
// because we need to have the BatchPlanner tell us what partitions the
// query is going to be executed on.
PrefetchState prefetchState = ts.getPrefetchState();
for (int i = 0; i < prefetchParams.length; i++) {
CountedStatement counted_stmt = prefetchable.get(i);
prefetchState.markPrefetchedQuery(counted_stmt.statement,
counted_stmt.counter,
plan.getStatementPartitions()[i],
prefetchParams[i]);
} // FOR

// Loop through the fragments and check whether at least one of
// them needs to be executed at the base (local) partition. If so, we need a
Expand All @@ -269,8 +253,9 @@ public TransactionInitRequest[] generateWorkFragments(LocalTransaction ts) {
// PartitionExecutor is idle. That means, we don't want to serialize all this
// if it's only going to the base partition.
TransactionInitRequest.Builder[] builders = new TransactionInitRequest.Builder[this.catalogContext.numberOfSites];
for (WorkFragment.Builder fragmentBuilder : fragmentBuilders) {
int site_id = this.partitionSiteXref[fragmentBuilder.getPartitionId()];
boolean first = true;
for (WorkFragment.Builder fragment : fragmentBuilders) {
int site_id = this.partitionSiteXref[fragment.getPartitionId()];
if (builders[site_id] == null) {
builders[site_id] = TransactionInitRequest.newBuilder()
.setTransactionId(ts.getTransactionId().longValue())
Expand All @@ -281,7 +266,16 @@ public TransactionInitRequest[] generateWorkFragments(LocalTransaction ts) {
builders[site_id].addPrefetchParams(bs);
} // FOR
}
builders[site_id].addPrefetchFragments(fragmentBuilder);
// Update DependencyTracker
// This has to be done *before* you add it to the TransactionInitRequest
if (first) {
// Make sure that we initialize our internal PrefetchState for this txn
ts.initializePrefetch();
depTracker.addTransaction(ts);
}
depTracker.addPrefetchWorkFragment(ts, fragment, prefetchParams);

builders[site_id].addPrefetchFragments(fragment);
} // FOR (WorkFragment)

PartitionSet touched_partitions = ts.getPredictTouchedPartitions();
Expand All @@ -290,37 +284,26 @@ public TransactionInitRequest[] generateWorkFragments(LocalTransaction ts) {
for (int partition : touched_partitions) {
touched_sites[this.partitionSiteXref[partition]] = true;
} // FOR
TransactionInitRequest[] init_requests = new TransactionInitRequest[this.catalogContext.numberOfSites];
TransactionInitRequest default_request = null;

TransactionInitRequest.Builder default_request = null;
for (int site_id = 0; site_id < this.catalogContext.numberOfSites; ++site_id) {
// If this site has no prefetched fragments ...
if (builders[site_id] == null) {
// but it has other non-prefetched WorkFragments, create a
// default TransactionInitRequest.
if (touched_sites[site_id]) {
if (default_request == null) {
default_request = TransactionInitRequest.newBuilder()
.setTransactionId(ts.getTransactionId())
.setProcedureId(ts.getProcedure().getId())
.setBasePartition(ts.getBasePartition())
.addAllPartitions(ts.getPredictTouchedPartitions()).build();
}
init_requests[site_id] = default_request;
if (debug.val) LOG.debug(ts + " - Sending default TransactionInitRequest to site " + site_id);
}
// And no other WorkFragments, set the TransactionInitRequest to null.
else {
init_requests[site_id] = null;
// but it has other non-prefetched WorkFragments, create a
// default TransactionInitRequest.
if (builders[site_id] == null && touched_sites[site_id]) {
if (default_request == null) {
default_request = TransactionInitRequest.newBuilder()
.setTransactionId(ts.getTransactionId())
.setProcedureId(ts.getProcedure().getId())
.setBasePartition(ts.getBasePartition())
.addAllPartitions(ts.getPredictTouchedPartitions());
}
}
// Otherwise, just build it.
else {
init_requests[site_id] = builders[site_id].build();
if (debug.val) LOG.debug(ts + " - Sending prefetch WorkFragments to site " + site_id);
builders[site_id] = default_request;
if (debug.val) LOG.debug(ts + " - Sending default TransactionInitRequest to site " + site_id);
}
} // FOR (Site)

if (debug.val) LOG.debug(ts + " - TransactionInitRequests\n" + StringUtil.join("\n", init_requests));
return (init_requests);
if (debug.val) LOG.debug(ts + " - TransactionInitRequests\n" + StringUtil.join("\n", builders));
return (builders);
}
}
Expand Up @@ -24,6 +24,7 @@
import edu.brown.hstore.conf.HStoreConf;
import edu.brown.hstore.estimators.EstimatorState;
import edu.brown.hstore.estimators.markov.MarkovEstimatorState;
import edu.brown.hstore.txns.DependencyTracker;
import edu.brown.hstore.txns.LocalTransaction;
import edu.brown.utils.PartitionSet;
import edu.brown.utils.ProjectType;
Expand All @@ -43,6 +44,7 @@ public class TestPrefetchQueryPlanner extends BaseTestCase {

private final MockHStoreSite[] hstore_sites = new MockHStoreSite[NUM_SITES];
private final HStoreCoordinator[] coordinators = new HStoreCoordinator[NUM_SITES];
private DependencyTracker depTracker;

private LocalTransaction ts;

Expand Down Expand Up @@ -137,6 +139,8 @@ public edu.brown.hstore.estimators.EstimatorState getEstimatorState() {
for (Partition catalog_part : catalogContext.getAllPartitions()) {
this.partition_site_xref[catalog_part.getId()] = ((Site) catalog_part.getParent()).getId();
} // FOR
this.depTracker = this.hstore_sites[0].getDependencyTracker(LOCAL_PARTITION);
assertNotNull(this.depTracker);
}

/**
Expand All @@ -146,7 +150,13 @@ public void testGenerateWorkFragments() throws Exception {
int num_sites = catalogContext.numberOfSites;

this.ts.setTransactionId(TXN_ID);
TransactionInitRequest[] requests = this.prefetcher.generateWorkFragments(this.ts);
TransactionInitRequest.Builder[] builders = this.prefetcher.plan(this.ts,
this.ts.getProcedureParameters(),
this.depTracker);
TransactionInitRequest[] requests = new TransactionInitRequest[builders.length];
for (int i = 0; i < builders.length; i++) {
if (builders[i] != null) requests[i] = builders[i].build();
}
assertEquals(num_sites, requests.length);

// The TransactionInitRequest for the local partition will be the
Expand Down

0 comments on commit 51cf9d8

Please sign in to comment.