Permalink
Browse files

Nice optimizations for the BatchPlanner: We now cache the StmtPartiti…

…ons and FragPartitions for single-partition plans so that we can swap them in place without having to reconstruct new sets every time. This cuts down the overall time spent in the batch planner by half for single-partition txns. Also moved some unnecessary work that we were performing on every call in the Histogram into the internal methods so that it is lazily calculated.
  • Loading branch information...
1 parent 893f20b commit 0425253c4382ba7fb9544f045445734493d59f2a @apavlo committed Feb 5, 2012
View
@@ -422,7 +422,7 @@ def setup_env():
# Bash Aliases
code_dir = os.path.join("$HOME", "hstore", env["hstore.git_branch"])
- log_dir = env.get("site.log_dir", os.path.join(code_dir, "obj/logs"))
+ log_dir = env.get("site.log_dir", os.path.join(code_dir, "obj/logs/sites"))
aliases = {
# H-Store Home
'hh': 'cd ' + code_dir,
@@ -79,6 +79,10 @@
* @see HStoreConf.SiteConf.planner_unique_dependency_ids
*/
private static final AtomicInteger NEXT_DEPENDENCY_ID = new AtomicInteger(FIRST_DEPENDENCY_ID);
+
+ private static Set<Integer> CACHED_SINGLE_PARTITION_SETS[];
+
+ private static Map<Statement, Map<PlanFragment, Set<Integer>>> CACHED_FRAGMENT_PARTITION_MAPS[];
// ----------------------------------------------------------------------------
// GLOBAL DATA MEMBERS
@@ -114,6 +118,8 @@
private final int cache_fastLookups[][];
private final BatchPlan cache_singlePartitionPlans[];
+ private Map<Statement, Map<PlanFragment, Set<Integer>>> cache_singlePartitionFragmentPartitions;
+
// PROFILING
private final ProfileMeasurement time_plan;
private final ProfileMeasurement time_partitionEstimator;
@@ -231,11 +237,13 @@ public PlanGraph() {
* StmtIndex -> Target Partition Ids
*/
private final Set<Integer>[] stmt_partitions;
+ private final Set<Integer>[] stmt_partitions_swap;
/**
* StmtIndex -> Map{PlanFragment, Set<PartitionIds>}
*/
private final Map<PlanFragment, Set<Integer>> frag_partitions[];
+ private final Map<PlanFragment, Set<Integer>> frag_partitions_swap[];
/**
* A bitmap of whether each query at the given index in the batch was single-partitioned or not
@@ -275,7 +283,9 @@ public BatchPlan(int max_round_size) {
// Batch Data
this.frag_list = (List<PlanFragment>[])new List<?>[batch_size];
this.stmt_partitions = (Set<Integer>[])new Set<?>[batch_size];
+ this.stmt_partitions_swap = (Set<Integer>[])new Set<?>[batch_size];
this.frag_partitions = (Map<PlanFragment, Set<Integer>>[])new HashMap<?, ?>[batch_size];
+ this.frag_partitions_swap = (Map<PlanFragment, Set<Integer>>[])new HashMap<?, ?>[batch_size];
this.singlepartition_bitmap = new boolean[batch_size];
for (int i = 0; i < batch_size; i++) {
this.stmt_partitions[i] = new HashSet<Integer>();
@@ -301,7 +311,7 @@ private BatchPlan init(long client_handle, Integer base_partition) {
for (int i = 0; i < this.frag_list.length; i++) {
if (this.frag_list[i] != null) this.frag_list[i] = null;
- if (this.stmt_partitions[i] != null) this.stmt_partitions[i].clear();
+ if (this.stmt_partitions[i] != null && this.stmt_partitions_swap[i] == null) this.stmt_partitions[i].clear();
// if (this.frag_partitions[i] != null) {
// for (Set<Integer> s : this.frag_partitions[i].values()) {
// s.clear();
@@ -422,14 +432,14 @@ public BatchPlanner(SQLStmt[] batchStmts, int batchSize, Procedure catalog_proc,
assert(p_estimator != null);
HStoreConf hstore_conf = HStoreConf.singleton();
-
+
+ this.num_partitions = CatalogUtil.getNumberOfPartitions(catalog_proc);
this.batchSize = batchSize;
this.maxRoundSize = hstore_conf.site.planner_max_round_size;
this.catalog_proc = catalog_proc;
this.catalog = catalog_proc.getCatalog();
this.p_estimator = p_estimator;
this.hasher = p_estimator.getHasher();
- this.num_partitions = CatalogUtil.getNumberOfPartitions(catalog_proc);
this.plan = new BatchPlan(this.maxRoundSize);
this.enable_profiling = hstore_conf.site.planner_profiling;
this.enable_caching = hstore_conf.site.planner_caching;
@@ -450,8 +460,7 @@ public BatchPlanner(SQLStmt[] batchStmts, int batchSize, Procedure catalog_proc,
this.stmt_is_readonly[i] = batchStmts[i].getStatement().getReadonly();
this.stmt_is_replicatedonly[i] = batchStmts[i].getStatement().getReplicatedonly() ||
batchStmts[i].getStatement().getSecondaryindex();
- if (t)
- LOG.trace(batchStmts[i].getStatement().fullName() + " -> " + this.stmt_is_replicatedonly[i]);
+ if (t) LOG.trace(batchStmts[i].getStatement().fullName() + " -> " + this.stmt_is_replicatedonly[i]);
// CACHING
// Since most batches are going to be single-partition, we will cache
@@ -477,6 +486,21 @@ public BatchPlanner(SQLStmt[] batchStmts, int batchSize, Procedure catalog_proc,
this.time_planGraph = null;
this.time_partitionEstimator = null;
}
+
+ // Static Members
+ if (CACHED_SINGLE_PARTITION_SETS == null) {
+ synchronized (BatchPlanner.class) {
+ if (CACHED_SINGLE_PARTITION_SETS == null) {
+ CACHED_SINGLE_PARTITION_SETS = (Set<Integer>[])new Set<?>[this.num_partitions];
+ CACHED_FRAGMENT_PARTITION_MAPS = (Map<Statement, Map<PlanFragment, Set<Integer>>>[])new Map<?, ?>[this.num_partitions];
+
+ for (int i = 0; i < num_partitions; i++) {
+ CACHED_SINGLE_PARTITION_SETS[i] = Collections.unmodifiableSet(Collections.singleton(i));
+ CACHED_FRAGMENT_PARTITION_MAPS[i] = new HashMap<Statement, Map<PlanFragment,Set<Integer>>>();
+ } // FOR
+ }
+ } // SYNCH
+ }
}
public BatchPlan getBatchPlan() {
@@ -527,9 +551,8 @@ public BatchPlan plan(Long txn_id, long client_handle, Integer base_partition,
Histogram<Integer> touched_partitions,
ParameterSet[] batchArgs) {
if (this.enable_profiling) time_plan.start();
- if (d)
- LOG.debug(String.format("Constructing a new %s BatchPlan for %s txn #%d",
- this.catalog_proc.getName(), (predict_singlepartitioned ? "single-partition" : "distributed"), txn_id));
+ if (d) LOG.debug(String.format("Constructing a new %s BatchPlan for %s txn #%d",
+ this.catalog_proc.getName(), (predict_singlepartitioned ? "single-partition" : "distributed"), txn_id));
boolean cache_isSinglePartition[] = null;
@@ -541,19 +564,16 @@ public BatchPlan plan(Long txn_id, long client_handle, Integer base_partition,
// OPTIMIZATION: Skip all of this if we know that we're always suppose to be single-partitioned
if (this.force_singlePartition == false) {
for (int stmt_index = 0; stmt_index < this.batchSize; stmt_index++) {
- Object params[] = batchArgs[stmt_index].toArray();
-
if (cache_fastLookups[stmt_index] == null) {
- if (d)
- LOG.debug(String.format("[#%d-%02d] No fast look-ups for %s. Cache is marked as not single-partitioned",
- txn_id, stmt_index, this.catalog_stmts[stmt_index].fullName()));
+ if (d) LOG.debug(String.format("[#%d-%02d] No fast look-ups for %s. Cache is marked as not single-partitioned",
+ txn_id, stmt_index, this.catalog_stmts[stmt_index].fullName()));
cache_isSinglePartition[stmt_index] = false;
} else {
- if (d)
- LOG.debug(String.format("[#%d-%02d] Using fast-lookup caching for %s: %s",
- txn_id, stmt_index,
- this.catalog_stmts[stmt_index].fullName(),
- Arrays.toString(cache_fastLookups[stmt_index])));
+ if (d) LOG.debug(String.format("[#%d-%02d] Using fast-lookup caching for %s: %s",
+ txn_id, stmt_index,
+ this.catalog_stmts[stmt_index].fullName(),
+ Arrays.toString(cache_fastLookups[stmt_index])));
+ Object params[] = batchArgs[stmt_index].toArray();
cache_isSinglePartition[stmt_index] = true;
for (int idx : cache_fastLookups[stmt_index]) {
if (hasher.hash(params[idx]) != base_partition.intValue()) {
@@ -562,21 +582,18 @@ public BatchPlan plan(Long txn_id, long client_handle, Integer base_partition,
}
} // FOR
}
- if (d)
- LOG.debug(String.format("[#%d-%02d] cache_isSinglePartition[%s] = %s",
- txn_id, stmt_index, this.catalog_stmts[stmt_index].fullName(), cache_isSinglePartition[stmt_index]));
+ if (d) LOG.debug(String.format("[#%d-%02d] cache_isSinglePartition[%s] = %s",
+ txn_id, stmt_index, this.catalog_stmts[stmt_index].fullName(), cache_isSinglePartition[stmt_index]));
is_allSinglePartition = is_allSinglePartition && cache_isSinglePartition[stmt_index];
} // FOR (Statement)
}
- if (t)
- LOG.trace(String.format("[#%d] is_allSinglePartition=%s", txn_id, is_allSinglePartition));
+ if (t) LOG.trace(String.format("[#%d] is_allSinglePartition=%s", txn_id, is_allSinglePartition));
// If all of the Statements are single-partition, then we can use the cached BatchPlan
// if we already have one. This saves a lot of trouble
if (is_allSinglePartition && cache_singlePartitionPlans[base_partition.intValue()] != null) {
- if (d)
- LOG.debug(String.format("[#%d] Using cached BatchPlan at partition #%02d: %s",
- txn_id, base_partition, Arrays.toString(this.catalog_stmts)));
+ if (d) LOG.debug(String.format("[#%d] Using cached BatchPlan at partition #%02d: %s",
+ txn_id, base_partition, Arrays.toString(this.catalog_stmts)));
if (this.enable_profiling) time_plan.stop();
return (cache_singlePartitionPlans[base_partition.intValue()]);
}
@@ -606,9 +623,8 @@ public BatchPlan plan(Long txn_id, long client_handle, Integer base_partition,
final Statement catalog_stmt = this.catalog_stmts[stmt_index];
assert(catalog_stmt != null) : "The Statement at index " + stmt_index + " is null for " + this.catalog_proc;
final Object params[] = batchArgs[stmt_index].toArray();
- if (t)
- LOG.trace(String.format("[#%d-%02d] Calculating touched partitions plans for %s",
- txn_id, stmt_index, catalog_stmt.fullName()));
+ if (t) LOG.trace(String.format("[#%d-%02d] Calculating touched partitions plans for %s",
+ txn_id, stmt_index, catalog_stmt.fullName()));
Map<PlanFragment, Set<Integer>> frag_partitions = plan.frag_partitions[stmt_index];
Set<Integer> stmt_all_partitions = plan.stmt_partitions[stmt_index];
@@ -638,24 +654,40 @@ public BatchPlan plan(Long txn_id, long client_handle, Integer base_partition,
}
}
assert(has_singlepartition_plan);
- for (PlanFragment catalog_frag : catalog_stmt.getFragments().values()) {
- Set<Integer> p = frag_partitions.get(catalog_frag);
- if (p == null) {
- p = new HashSet<Integer>();
- frag_partitions.put(catalog_frag, p);
- } else {
- p.clear();
- }
- p.add(base_partition);
- } // FOR
- stmt_all_partitions.add(base_partition);
+
+ if (this.cache_singlePartitionFragmentPartitions == null) {
+ this.cache_singlePartitionFragmentPartitions = CACHED_FRAGMENT_PARTITION_MAPS[base_partition.intValue()];
+ }
+ Map<PlanFragment, Set<Integer>> cached_frag_partitions = this.cache_singlePartitionFragmentPartitions.get(catalog_stmt);
+ if (cached_frag_partitions == null) {
+ cached_frag_partitions = new HashMap<PlanFragment, Set<Integer>>();
+ Set<Integer> p = CACHED_SINGLE_PARTITION_SETS[base_partition.intValue()];
+ for (PlanFragment catalog_frag : catalog_stmt.getFragments().values()) {
+ cached_frag_partitions.put(catalog_frag, p);
+ } // FOR
+ this.cache_singlePartitionFragmentPartitions.put(catalog_stmt, cached_frag_partitions);
+ }
+ if (plan.stmt_partitions_swap[stmt_index] == null) {
+ plan.stmt_partitions_swap[stmt_index] = plan.stmt_partitions[stmt_index];
+ plan.frag_partitions_swap[stmt_index] = plan.frag_partitions[stmt_index];
+ }
+ stmt_all_partitions = plan.stmt_partitions[stmt_index] = CACHED_SINGLE_PARTITION_SETS[base_partition.intValue()];
+ frag_partitions = plan.frag_partitions[stmt_index] = cached_frag_partitions;
}
// Otherwise figure out whether the query can execute as single-partitioned or not
else {
- if (t)
- LOG.trace(String.format("[#%d-%02d] Computing touched partitions %s in txn #%d with the PartitionEstimator",
- txn_id, stmt_index, catalog_stmt.fullName(), txn_id));
+ if (t) LOG.trace(String.format("[#%d-%02d] Computing touched partitions %s in txn #%d with the PartitionEstimator",
+ txn_id, stmt_index, catalog_stmt.fullName(), txn_id));
+
+ if (plan.stmt_partitions_swap[stmt_index] != null) {
+ stmt_all_partitions = plan.stmt_partitions[stmt_index] = plan.stmt_partitions_swap[stmt_index];
+ plan.stmt_partitions_swap[stmt_index] = null;
+ stmt_all_partitions.clear();
+
+ frag_partitions = plan.frag_partitions[stmt_index] = plan.frag_partitions_swap[stmt_index];
+ plan.frag_partitions_swap[stmt_index] = null;
+ }
try {
// OPTIMIZATION: If we were told that the transaction is suppose to be single-partitioned, then we will
@@ -666,10 +698,8 @@ public BatchPlan plan(Long txn_id, long client_handle, Integer base_partition,
// whether the query should be single-partitioned or not. This is because a query may actually just want
// to execute on just one partition (note that it could be a local partition or the remote partition).
// We'll assume that it's single-partition <<--- Can we cache that??
- boolean first = true;
while (true) {
- if (first == false) stmt_all_partitions.clear();
- first = false;
+ if (is_singlepartition == false) stmt_all_partitions.clear();
fragments = (is_singlepartition ? catalog_stmt.getFragments() : catalog_stmt.getMs_fragments());
// PARTITION ESTIMATOR
@@ -678,7 +708,7 @@ public BatchPlan plan(Long txn_id, long client_handle, Integer base_partition,
stmt_all_partitions,
fragments.values(),
params,
- plan.base_partition);
+ base_partition);
if (this.enable_profiling) ProfileMeasurement.swap(this.time_partitionEstimator, this.time_plan);
int stmt_all_partitions_size = stmt_all_partitions.size();
@@ -694,7 +724,7 @@ public BatchPlan plan(Long txn_id, long client_handle, Integer base_partition,
is_singlepartition = false;
continue;
}
- is_local = (stmt_all_partitions_size == 1 && stmt_all_partitions.contains(plan.base_partition));
+ is_local = (stmt_all_partitions_size == 1 && stmt_all_partitions.contains(base_partition));
if (is_local == false && predict_singlepartitioned) {
// Again, this is not what was suppose to happen!
if (t) LOG.trace(String.format("Mispredicted txn #%d - Remote Partitions %s",
@@ -725,9 +755,8 @@ public BatchPlan plan(Long txn_id, long client_handle, Integer base_partition,
throw new RuntimeException("Unexpected error when planning " + catalog_stmt.fullName(), ex);
}
}
- if (d)
- LOG.debug(String.format("[#%d-%02d] is_singlepartition=%s, partitions=%s",
- txn_id, stmt_index, is_singlepartition, stmt_all_partitions));
+ if (d) LOG.debug(String.format("[#%d-%02d] is_singlepartition=%s, partitions=%s",
+ txn_id, stmt_index, is_singlepartition, stmt_all_partitions));
// Get a sorted list of the PlanFragments that we need to execute for this query
if (is_singlepartition) {
Oops, something went wrong.

0 comments on commit 0425253

Please sign in to comment.