Skip to content

Commit

Permalink
Simplified MoveGroup (#48, #130, #131)
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed May 18, 2021
1 parent 680d251 commit d657e63
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 116 deletions.
146 changes: 37 additions & 109 deletions dz3-model/src/main/java/net/sf/dz3/device/actuator/Damper.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package net.sf.dz3.device.actuator;

import java.io.IOException;
import java.security.SecureRandom;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -156,16 +154,7 @@ public TransitionStatus call() throws Exception {
/**
* Utility class to move a set of dampers to given positions, synchronously or asynchronously.
*/
public static class MoveGroup implements Callable<Future<TransitionStatus>> {

private static final Random rg = new SecureRandom();

/**
* Thread pool for group transitions.
*
* This pool requires exactly two threads: one to scatter, one to gather.
*/
private final ExecutorService transitionExecutor = Executors.newFixedThreadPool(2);
public static class MoveGroup implements Callable<TransitionStatus> {

protected final Logger logger = LogManager.getLogger(getClass());

Expand All @@ -185,131 +174,70 @@ public MoveGroup(Map<Damper, Double> targetPosition, boolean async) {
}

@Override
public Future<TransitionStatus> call() throws Exception {

final long authTokenScatter = rg.nextLong();
final long authTokenGather = rg.nextLong();

TransitionStatus scatterStatus = new TransitionStatus(authTokenScatter);
TransitionStatus gatherStatus = new TransitionStatus(authTokenGather);
int count = targetPosition.size();
CompletionService<TransitionStatus> cs = new ExecutorCompletionService<>(Executors.newFixedThreadPool(count));

Runnable scatter = () -> {

ThreadContext.push("run/scatter");

try {


for (Iterator<Entry<Damper, Double>> i = targetPosition.entrySet().iterator(); i.hasNext(); ) {
public TransitionStatus call() throws Exception {

Entry<Damper, Double> entry = i.next();
Damper d = entry.getKey();
double position = entry.getValue();
ThreadContext.push("run/scatter");

logger.debug("{}: {}", d.getName(), position);
try {

cs.submit(new Damper.Move(d, position));
}
// VT: NOTE: This object is bogus - the whole concept needs to be revisited;
// see #132

logger.debug("fired transitions");
scatterStatus.complete(authTokenScatter, null);
TransitionStatus result = new TransitionStatus(hashCode());

} catch (Throwable t) {
result.complete(hashCode(), null);

// This is potentially expensive - may slug the HVAC if the dampers are
// left closed while it is running, hence fatal level
int count = targetPosition.size();
CompletionService<TransitionStatus> cs = new ExecutorCompletionService<>(Executors.newFixedThreadPool(count));

logger.fatal("can't set damper position", t);
for (Iterator<Entry<Damper, Double>> i = targetPosition.entrySet().iterator(); i.hasNext(); ) {

scatterStatus.complete(authTokenScatter, t);
Entry<Damper, Double> entry = i.next();
Damper d = entry.getKey();
double position = entry.getValue();

} finally {
logger.debug("{}: {}", d.getName(), position);

ThreadContext.pop();
ThreadContext.clearStack();
cs.submit(new Damper.Move(d, position));
}
};

Future<TransitionStatus> scatterDone = transitionExecutor.submit(scatter, scatterStatus);
logger.debug("fired transitions");

Runnable gather = () -> {
if (async) {
logger.debug("async call, bailing out");
return result;
}

ThreadContext.pop();
ThreadContext.push("run/gather");

try {

logger.debug("waiting for transitions to be fired...");

scatterDone.get();

logger.debug("gathering transition results");
int left = count;

boolean ok = true;
int left = count;
while (left-- > 0) {

while (left-- > 0) {
try {

Future<TransitionStatus> result = cs.take();
TransitionStatus s = result.get();
cs.take().get();

// This will cause the whole park() call to report failure
} catch (ExecutionException ex) {

ok = s.isOK();
// This is potentially expensive - may slug the HVAC if the dampers are
// left closed while it is running, hence fatal level, and rethrow
// to indicate the group move failure

if (!s.isOK()) {
logger.fatal("can't set damper position", ex);

// This is potentially expensive - may slug the HVAC if the dampers are
// left closed while it is running, hence fatal level

logger.fatal("can't set damper position", s.getCause());
}
}

logger.debug("transitions complete, ok={}", ok);

if (ok) {

gatherStatus.complete(authTokenGather, null);
return;
throw new IllegalStateException("group move failed", ex);
}

gatherStatus.complete(authTokenGather, new IllegalStateException("one or more dampers failed to set position"));

} catch (Throwable t) {

// This is potentially expensive - may slug the HVAC if the dampers are
// left closed while it is running, hence fatal level

logger.fatal("can't set damper position", t);

scatterStatus.complete(authTokenGather, t);

} finally {

ThreadContext.pop();
ThreadContext.clearStack();
}
};

Future<TransitionStatus> done = transitionExecutor.submit(gather, gatherStatus);
return result;

if (async) {
} finally {

logger.debug("async call, bailing out");
return done;
ThreadContext.pop();
ThreadContext.clearStack();
}

logger.debug("sync: waiting for completion...");

// We need to wait until all the transitions are complete

done.get();

logger.debug("sync: done");

return done;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -33,7 +32,7 @@ public class DamperMultiplexer extends AbstractDamper {
*
* This pool requires exactly one thread.
*/
CompletionService<Future<TransitionStatus>> transitionCompletionService = new ExecutorCompletionService<>(Executors.newSingleThreadExecutor());
CompletionService<TransitionStatus> transitionCompletionService = new ExecutorCompletionService<>(Executors.newSingleThreadExecutor());

/**
* Dampers to control.
Expand Down Expand Up @@ -102,9 +101,9 @@ private Future<TransitionStatus> moveDampers(Map<Damper, Double> targetPosition)
// is completed when the transitions have been fired, and the second is
// when they all complete.

return transitionCompletionService.take().get();
return transitionCompletionService.take();

} catch (InterruptedException | ExecutionException ex) {
} catch (InterruptedException ex) {

// VT: FIXME: Oops... Really don't know what to do with this, will have to collect stats
// before this can be reasonably handled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public abstract class AbstractDamperController implements DamperController, JmxA
*
* This pool requires exactly one thread.
*/
CompletionService<Future<TransitionStatus>> transitionCompletionService = new ExecutorCompletionService<>(Executors.newFixedThreadPool(1));
CompletionService<TransitionStatus> transitionCompletionService = new ExecutorCompletionService<>(Executors.newFixedThreadPool(1));

/**
* Association from a thermostat to a damper.
Expand Down Expand Up @@ -290,9 +290,9 @@ private Future<TransitionStatus> shuffle(Map<Damper, Double> damperMap, boolean
// is completed when the transitions have been fired, and the second is
// when they all complete.

return transitionCompletionService.take().get();
return transitionCompletionService.take();

} catch (InterruptedException | ExecutionException ex) {
} catch (InterruptedException ex) {

// VT: FIXME: Oops... Really don't know what to do with this, will have to collect stats
// before this can be reasonably handled
Expand Down

0 comments on commit d657e63

Please sign in to comment.