Skip to content

Commit

Permalink
Moved asynchronous operations from moveDamper() to set() (#131)
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed May 18, 2021
1 parent 8c73060 commit e2a1432
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 95 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package net.sf.dz3.device.actuator.impl;

import java.util.concurrent.CompletableFuture;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import net.sf.servomaster.device.model.TransitionStatus;
Expand All @@ -21,6 +25,14 @@ public abstract class AbstractDamper implements Damper {

protected final Logger logger = LogManager.getLogger(getClass());

/**
* Completion service for asynchronous transitions.
*
* This pool requires exactly one thread, to honor the happened-before relation
* between the series of commands sent to this damper.
*/
CompletionService<TransitionStatus> transitionCompletionService = new ExecutorCompletionService<>(Executors.newFixedThreadPool(1));

/**
* Damper name.
*
Expand Down Expand Up @@ -99,55 +111,54 @@ public boolean isCustomParkPosition() {
@Override
public final Future<TransitionStatus> set(double throttle) {

ThreadContext.push("set");

try {

logger.info("{}: position={}", getName(), throttle);
// VT: NOTE: This object is bogus - the whole concept needs to be revisited; see #132

if ( throttle < 0 || throttle > 1.0 || Double.compare(throttle, Double.NaN) == 0) {
int authToken = hashCode();
TransitionStatus result = new TransitionStatus(authToken);

throw new IllegalArgumentException("Throttle out of 0...1 range: " + throttle);
}
Callable<TransitionStatus> c = () -> {

this.position = throttle;
ThreadContext.push("set/" + getName());

try {

Future<TransitionStatus> done = moveDamper(throttle);
stateChanged();
logger.info("position={}", throttle);

return done;
if ( throttle < 0 || throttle > 1.0 || Double.compare(throttle, Double.NaN) == 0) {

} catch (Throwable t) {
throw new IllegalArgumentException("Throttle out of 0...1 range: " + throttle);
}

logger.fatal("Failed to move damper to position " + throttle, t);
this.position = throttle;

// VT: FIXME: Need to change Damper to be a producer of DataSample<Double>, not Double
moveDamper(throttle);
stateChanged();

TransitionStatus done = new TransitionStatus(t.hashCode());
done.complete(t.hashCode(), t);
result.complete(authToken, null);

return CompletableFuture.completedFuture(done);
return result;

} finally {
ThreadContext.pop();
}
};

} finally {
ThreadContext.pop();
}
return transitionCompletionService.submit(c);
}

/**
* Move the actual damper.
*
* @param position Position to set.
*
* @exception IOException if there was a problem moving the damper.
*/
protected abstract Future<TransitionStatus> moveDamper(double position);
protected abstract void moveDamper(double position) throws IOException;

@Override
public Future<TransitionStatus> park() {

ThreadContext.push("park");
ThreadContext.push("park/" + getName());

try {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.logging.log4j.ThreadContext;

import net.sf.dz3.device.actuator.Damper;
import com.homeclimatecontrol.jukebox.jmx.JmxDescriptor;
import net.sf.dz3.instrumentation.Marker;
import net.sf.servomaster.device.model.TransitionStatus;

/**
Expand All @@ -27,13 +27,6 @@
*/
public class DamperMultiplexer extends AbstractDamper {

/**
* Completion service for asynchronous transitions.
*
* This pool requires exactly one thread.
*/
CompletionService<TransitionStatus> transitionCompletionService = new ExecutorCompletionService<>(Executors.newSingleThreadExecutor());

/**
* Dampers to control.
*/
Expand Down Expand Up @@ -77,7 +70,7 @@ public DamperMultiplexer(String name, Set<Damper> dampers) {
}

@Override
protected synchronized Future<TransitionStatus> moveDamper(double position) {
protected void moveDamper(double position) throws IOException {

multiPosition = position;

Expand All @@ -88,27 +81,19 @@ protected synchronized Future<TransitionStatus> moveDamper(double position) {
targetPosition.put(i.next(), position);
}

return moveDampers(targetPosition);
moveDampers(targetPosition);
}

private Future<TransitionStatus> moveDampers(Map<Damper, Double> targetPosition) {
private void moveDampers(Map<Damper, Double> targetPosition) throws IOException {

transitionCompletionService.submit(new Damper.MoveGroup(targetPosition, false));

try {

// VT: NOTE: The following line unwraps one level of Future. The first Future
// is completed when the transitions have been fired, and the second is
// when they all complete.
transitionCompletionService.take().get();

return transitionCompletionService.take();

} catch (InterruptedException ex) {

// VT: FIXME: Oops... Really don't know what to do with this, will have to collect stats
// before this can be reasonably handled

throw new IllegalStateException("Unhandled exception", ex);
} catch (InterruptedException | ExecutionException ex) {
throw new IOException("failed to move dampers?", ex);
}
}

Expand All @@ -133,39 +118,54 @@ public JmxDescriptor getJmxDescriptor() {
*/
public Future<TransitionStatus> park() {

ThreadContext.push("park");
// VT: NOTE: This object is bogus - the whole concept needs to be revisited; see #132

try {
int authToken = hashCode();
TransitionStatus result = new TransitionStatus(authToken);

multiPosition = getParkPosition();
Callable<TransitionStatus> c = () -> {

// Need to park dampers at their individual positions if they were specified,
// or to the multiplexer parking position if they weren't.

Map<Damper, Double> targetPosition = new HashMap<>();
Marker m = new Marker("park/multi");
ThreadContext.push("park/multi");

for (Iterator<Damper> i = dampers.iterator(); i.hasNext(); ) {
try {

Damper d = i.next();
boolean custom = d.isCustomParkPosition();
double position = custom ? d.getParkPosition() : getParkPosition();
multiPosition = getParkPosition();

targetPosition.put(d, position);
// Need to park dampers at their individual positions if they were specified,
// or to the multiplexer parking position if they weren't.

logger.debug("{}: {} ({})", d.getName(), position, custom ? "custom" : "multiplexer");
Map<Damper, Double> targetPosition = new HashMap<>();

if (custom) {
for (Iterator<Damper> i = dampers.iterator(); i.hasNext(); ) {

// Dude, you better know what you're doing
Damper d = i.next();
boolean custom = d.isCustomParkPosition();
double position = custom ? d.getParkPosition() : getParkPosition();

logger.warn("{} will be parked at custom position {}, consider changing hardware layout so override is not necessary", d.getName(), position);
targetPosition.put(d, position);

logger.debug("{}: {} ({})", d.getName(), position, custom ? "custom" : "multiplexer");

if (custom) {
logger.warn("{} will be parked at custom position {}, consider changing hardware layout so override is not necessary", d.getName(), position);
}
}
}

return moveDampers(targetPosition);
moveDampers(targetPosition);

} finally {
ThreadContext.pop();
}
result.complete(authToken, null);
return result;

} finally {

m.close();
ThreadContext.pop();
ThreadContext.clearStack();
}
};

return transitionCompletionService.submit(c);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package net.sf.dz3.device.actuator.impl;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

import org.apache.logging.log4j.ThreadContext;

Expand Down Expand Up @@ -33,21 +31,15 @@ public NullDamper(String name) {
}

@Override
public Future<TransitionStatus> moveDamper(double throttle) {
public void moveDamper(double throttle) throws IOException {

ThreadContext.push("moveDamper");

try {

logger.debug("new position: " + throttle);
this.throttle = throttle;

TransitionStatus status = new TransitionStatus(0);

status.complete(0, null);

return CompletableFuture.completedFuture(status);

} finally {
ThreadContext.pop();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package net.sf.dz3.device.actuator.impl;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

import org.apache.logging.log4j.ThreadContext;

import net.sf.dz3.device.sensor.Switch;
import com.homeclimatecontrol.jukebox.jmx.JmxDescriptor;
import net.sf.dz3.instrumentation.Marker;
import net.sf.servomaster.device.model.TransitionStatus;

/**
* Damper controlled by a switch.
Expand Down Expand Up @@ -145,16 +142,10 @@ private void check(double threshold) {
}

@Override
public Future<TransitionStatus> moveDamper(double position) {
public void moveDamper(double position) throws IOException {

ThreadContext.push("moveDamper");

// VT: NOTE: For now, let's assume that transition is instantaneous.
// However, let's keep an eye on it - there may be slow acting switches
// such as ShellSwitch.

Marker m = new Marker("moveDamper");
TransitionStatus status = new TransitionStatus(m.hashCode());

try {

Expand Down Expand Up @@ -185,23 +176,20 @@ public Future<TransitionStatus> moveDamper(double position) {

this.position = position;

status.complete(m.hashCode(), null);

} catch (Throwable t) {

// This is pretty serious, closed damper may cause the compressor to slug
// or the boiler to blow up
logger.fatal("Failed to set the damper state", t);
// or the boiler to blow up - so no harm in logging this multiple times, hopefully

status.complete(m.hashCode(), t);
logger.fatal("failed to set state for {}", target.getAddress(), t);

throw new IOException("failed to set state for " + target.getAddress(), t);

} finally {

m.close();
ThreadContext.pop();
}

return CompletableFuture.completedFuture(status);
}

@Override
Expand Down

0 comments on commit e2a1432

Please sign in to comment.