Skip to content

Commit

Permalink
Ok well now that was easy. PartitionExecutor.executeSQLStmtBatch() wi…
Browse files Browse the repository at this point in the history
…ll notify remote sites that we are done with their partitions whenever we're at the final task of a dtxn. We can do this even if the markov model estimations are not enabled
  • Loading branch information
apavlo committed Jun 28, 2013
1 parent 2ef0f01 commit 151802b
Showing 1 changed file with 55 additions and 35 deletions.
90 changes: 55 additions & 35 deletions src/frontend/edu/brown/hstore/PartitionExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,12 @@ private class DonePartitionNotification {
*/
private PartitionSet[] notificationsPerSite;

public void addSiteNotification(Site remoteSite, int partitionId) {
/**
* Sites that we need to notify separately about the done partitions.
*/
private Collection<Site> _sitesToNotify;

public void addSiteNotification(Site remoteSite, int partitionId, boolean noQueriesInBatch) {
int remoteSiteId = remoteSite.getId();
if (this.notificationsPerSite == null) {
this.notificationsPerSite = new PartitionSet[catalogContext.numberOfSites];
Expand All @@ -525,6 +530,16 @@ public void addSiteNotification(Site remoteSite, int partitionId) {
this.notificationsPerSite[remoteSiteId] = new PartitionSet();
}
this.notificationsPerSite[remoteSiteId].add(partitionId);
if (noQueriesInBatch) {
if (this._sitesToNotify == null) {
this._sitesToNotify = new HashSet<Site>();
}
this._sitesToNotify.add(remoteSite);
}
}

public boolean hasSitesToNotify() {
return (this._sitesToNotify != null && this._sitesToNotify.isEmpty() == false);
}
}

Expand Down Expand Up @@ -3325,10 +3340,13 @@ else if (debug.val) {
LOG.trace(String.format("%s - Sending %s directly to the ExecutionEngine at partition %d",
ts, plan.getClass().getSimpleName(), this.partitionId));

// TODO: If this the finalTask flag is set to true, and we're only executing queries at this
// partition, then we need to notify the other partitions that we're done with them.
// If this the finalTask flag is set to true, and we're only executing queries at this
// partition, then we need to notify the other partitions that we're done with them.
if (finalTask && ts.isPredictSinglePartition() == false) {

tmp_fragmentsPerPartition.clearValues();
tmp_fragmentsPerPartition.put(this.partitionId, batchSize);
DonePartitionNotification notify = this.computeDonePartitions(ts, null, tmp_fragmentsPerPartition, finalTask);
if (notify.hasSitesToNotify()) this.notifyDonePartitions(ts, notify);
}

// Execute the queries right away.
Expand Down Expand Up @@ -3617,14 +3635,18 @@ private DonePartitionNotification computeDonePartitions(final LocalTransaction t
}
// Otherwise, we'll rely on the transaction's current estimate to figure it out.
else {
assert(estimate != null) :
String.format("Unexpected null %s for %s when finalTask is not true", ts);
estDonePartitions = estimate.getDonePartitions(this.thresholds);
if (estDonePartitions == null || estDonePartitions.isEmpty()) {
if (debug.val)
LOG.debug(String.format("%s - There are no new done partitions identified by %s",
ts, estimate.getClass().getSimpleName()));
return (null);
}
}
if (estDonePartitions == null || estDonePartitions.isEmpty()) {
if (debug.val)
LOG.debug(String.format("%s - There are no new done partitions identified by %s",
ts, estimate.getClass().getSimpleName()));
return (null);
}
assert(estDonePartitions != null) : "Null done partitions for " + ts;
assert(estDonePartitions.isEmpty() == false) : "Empty done partitions for " + ts;

if (debug.val)
LOG.debug(String.format("%s - New estimated done partitions %s%s",
Expand All @@ -3639,7 +3661,6 @@ private DonePartitionNotification computeDonePartitions(final LocalTransaction t

// Make sure that we only tell partitions that we actually touched, otherwise they will
// be stuck waiting for a finish request that will never come!
final Collection<Site> toNotify = new HashSet<Site>();
DonePartitionNotification notify = new DonePartitionNotification();
for (int partition : estDonePartitions.values()) {
// Only mark the txn done at this partition if the Estimate says we were done
Expand Down Expand Up @@ -3668,45 +3689,43 @@ private DonePartitionNotification computeDonePartitions(final LocalTransaction t
// the same site
else {
Site remoteSite = catalogContext.getSiteForPartition(partition);
notify.addSiteNotification(remoteSite, partition);

boolean found = false;
for (Partition remotePartition : remoteSite.getPartitions().values()) {
if (fragmentsPerPartition.get(remotePartition.getId(), 0) != 0) {
found = true;
break;
}
} // FOR
if (found == false) toNotify.add(remoteSite);
notify.addSiteNotification(remoteSite, partition, (found == false));
}
}
} // FOR

// BLAST OUT NOTIFICATIONS!
if (toNotify.isEmpty() == false) {
for (Site remoteSite : toNotify) {
int remoteSiteId = remoteSite.getId();
if (debug.val)
LOG.info(String.format("%s - Notifying %s that txn is finished with partitions %s",
ts, HStoreThreadManager.formatSiteName(remoteSiteId),
notify.notificationsPerSite[remoteSite.getId()]));
hstore_coordinator.transactionPrepare(ts, ts.getPrepareCallback(),
notify.notificationsPerSite[remoteSiteId]);
notify.notificationsPerSite[remoteSiteId] = null; // play it safe!
} // FOR
}

return (notify);
}

/**
*
* @param ts
* @param parameters
* @param allFragmentBuilders
* @param finalTask Whether the txn has marked this as the last batch that they will ever execute
* @return
* Send asynchronous notification messages to any remote site to tell them that we
* are done with partitions that they have.
* @param ts
* @param notify
*/
private void notifyDonePartitions(LocalTransaction ts, DonePartitionNotification notify) {
// BLAST OUT NOTIFICATIONS!
for (Site remoteSite : notify._sitesToNotify) {
int remoteSiteId = remoteSite.getId();
if (debug.val)
LOG.info(String.format("%s - Notifying %s that txn is finished with partitions %s",
ts, HStoreThreadManager.formatSiteName(remoteSiteId),
notify.notificationsPerSite[remoteSite.getId()]));
hstore_coordinator.transactionPrepare(ts, ts.getPrepareCallback(),
notify.notificationsPerSite[remoteSiteId]);

// Make sure that we remove the PartitionSet for this site so that we don't
// try to send the notifications again.
notify.notificationsPerSite[remoteSiteId] = null;
} // FOR
}


/**
* Execute the given tasks and then block the current thread waiting for the list of dependency_ids to come
Expand Down Expand Up @@ -3820,6 +3839,7 @@ public VoltTable[] dispatchWorkFragments(final LocalTransaction ts,
lastEstimate != null &&
lastEstimate.isValid()) {
notify = this.computeDonePartitions(ts, lastEstimate, tmp_fragmentsPerPartition, finalTask);
if (notify.hasSitesToNotify()) this.notifyDonePartitions(ts, notify);
}

// Attach the ParameterSets to our transaction handle so that anybody on this HStoreSite
Expand Down

0 comments on commit 151802b

Please sign in to comment.