Skip to content

Commit

Permalink
Properly implemented sync/async damper transitions (#48, #49)
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed May 18, 2021
1 parent 8176d88 commit 1f6dba1
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 20 deletions.
89 changes: 69 additions & 20 deletions dz3-model/src/main/java/net/sf/dz3/device/actuator/Damper.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,14 @@ public static class MoveGroup implements Callable<Future<TransitionStatus>> {
/**
* Thread pool for group transitions.
*
* This pool requires exactly one thread.
* This pool requires exactly two threads: one to scatter, one to gather.
*/
private final ExecutorService transitionExecutor = Executors.newFixedThreadPool(1);
private final ExecutorService transitionExecutor = Executors.newFixedThreadPool(2);

protected final Logger logger = LogManager.getLogger(getClass());

private final Map<Damper, Double> targetPosition;
private final boolean async;
private final long authToken;

/**
* @param targetPosition Map between damper and positions they're supposed to be set to.
Expand All @@ -173,30 +172,28 @@ public MoveGroup(Map<Damper, Double> targetPosition, boolean async) {

this.targetPosition = Collections.unmodifiableMap(targetPosition);
this.async = async;

this.authToken = rg.nextLong();

if (this.async) {
logger.fatal("FIXME: async not implemented");
}
}

@Override
public Future<TransitionStatus> call() throws Exception {

TransitionStatus status = new TransitionStatus(authToken);
final long authTokenScatter = rg.nextLong();
final long authTokenGather = rg.nextLong();

Runnable mover = new Runnable() {
TransitionStatus scatterStatus = new TransitionStatus(authTokenScatter);
TransitionStatus gatherStatus = new TransitionStatus(authTokenGather);
int count = targetPosition.size();
CompletionService<TransitionStatus> cs = new ExecutorCompletionService<>(Executors.newFixedThreadPool(count));

Runnable scatter = new Runnable() {

@Override
public void run() {

ThreadContext.push("run");
ThreadContext.push("run/scatter");

try {

int count = targetPosition.size();
CompletionService<TransitionStatus> cs = new ExecutorCompletionService<>(Executors.newFixedThreadPool(count));

for (Iterator<Entry<Damper, Double>> i = targetPosition.entrySet().iterator(); i.hasNext(); ) {

Expand All @@ -210,10 +207,46 @@ public void run() {
}

logger.debug("fired transitions");
scatterStatus.complete(authTokenScatter, null);

} catch (Throwable t) {

// This is potentially expensive - may slug the HVAC if the dampers are
// left closed while it is running, hence fatal level

logger.fatal("can't set damper position", t);

scatterStatus.complete(authTokenScatter, t);

} finally {

ThreadContext.pop();
ThreadContext.clearStack();
}
}
};

Future<TransitionStatus> scatterDone = transitionExecutor.submit(scatter, scatterStatus);

Runnable gather = new Runnable() {

@Override
public void run() {

ThreadContext.push("run/gather");

try {

logger.debug("waiting for transitions to be fired...");

scatterDone.get();

logger.debug("gathering transition results");

boolean ok = true;
int left = count;

while (count-- > 0) {
while (left-- > 0) {

Future<TransitionStatus> result = cs.take();
TransitionStatus s = result.get();
Expand All @@ -222,7 +255,7 @@ public void run() {

ok = s.isOK();

if (!ok) {
if (!s.isOK()) {

// This is potentially expensive - may slug the HVAC if the dampers are
// left closed while it is running, hence fatal level
Expand All @@ -235,11 +268,11 @@ public void run() {

if (ok) {

status.complete(authToken, null);
gatherStatus.complete(authTokenGather, null);
return;
}

status.complete(authToken, new IllegalStateException("one or more dampers failed to park"));
gatherStatus.complete(authTokenGather, new IllegalStateException("one or more dampers failed to set position"));

} catch (Throwable t) {

Expand All @@ -248,7 +281,7 @@ public void run() {

logger.fatal("can't set damper position", t);

status.complete(authToken, t);
scatterStatus.complete(authTokenGather, t);

} finally {

Expand All @@ -258,7 +291,23 @@ public void run() {
}
};

return transitionExecutor.submit(mover, status);
Future<TransitionStatus> done = transitionExecutor.submit(gather, gatherStatus);

if (async) {

logger.debug("async call, bailing out");
return done;
}

logger.debug("sync: waiting for completion...");

// We need to wait until all the transitions are complete

done.get();

logger.debug("sync: done");

return done;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,33 @@ public void testNaN() throws InterruptedException, ExecutionException {
}
}

/**
* Make sure that dampers are correctly parked on power off.
*/
public void testPowerOff() throws InterruptedException, ExecutionException {

ThreadContext.push("testPowerOff");

try {

Thermostat ts1 = new ThermostatModel("ts1", new NullSensor("address1", 0), new SimplePidController(20, 1, 0, 0, 0));

DummyDamper d1 = new DummyDamper("d1");

BalancingDamperController damperController = new BalancingDamperController();

damperController.put(ts1, d1);

damperController.powerOff();

// VT: FIXME: can't assert anything unless status is propagated here

} finally {

ThreadContext.pop();
}
}

private static class DummyDamper implements Damper {

private final String name;
Expand Down

0 comments on commit 1f6dba1

Please sign in to comment.