Skip to content

Commit

Permalink
Using Future instead of ACT; moving towards proper DamperMultiplexer (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed Aug 11, 2018
1 parent 8297afd commit 72d4b6d
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 196 deletions.
3 changes: 2 additions & 1 deletion dz3-master/dz3-model/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
dependencies {

compile project(':dz3-sensors')
compile 'net.sf.servomaster:servomaster-common:2.0.0-SNAPSHOT'

compile project(':dz3-sensors')
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package net.sf.dz3.device.actuator;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import net.sf.jukebox.datastream.signal.model.DataSink;
import net.sf.jukebox.datastream.signal.model.DataSource;
import net.sf.jukebox.jmx.JmxAttribute;
import net.sf.jukebox.jmx.JmxAware;
import net.sf.jukebox.sem.ACT;
import net.sf.servomaster.device.model.TransitionStatus;

/**
* The damper abstraction.
Expand All @@ -33,12 +35,12 @@ public interface Damper extends DataSink<Double>, DataSource<Double>, JmxAware {
* @param position 0 is fully closed, 1 is fully open, 0...1 corresponds
* to partially open position.
*
* @exception IOException if there was a problem communicating with the
* hardware.
* @return A token that allows to track the completion of the damper
* movement.
*
* @exception IllegalArgumentException if {@code position} is outside of 0...1 range.
*/
public void set(double position) throws IOException;
public Future<TransitionStatus> set(double position);

/**
* Get current damper position.
Expand Down Expand Up @@ -104,5 +106,26 @@ public interface Damper extends DataSink<Double>, DataSource<Double>, JmxAware {
* may take a while if the damper is configured with a transition
* controller).
*/
public ACT park();
public Future<TransitionStatus> park();

/**
* Synchronous wrapper for {@link Damper#set(double)}.
*/
public static class Move implements Callable<TransitionStatus> {

private final Damper target;
private final double position;

public Move(Damper target, double position) {

this.target = target;
this.position = position;
}

@Override
public TransitionStatus call() throws Exception {

return target.set(position).get();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package net.sf.dz3.device.actuator.impl;

import java.io.IOException;
import java.util.concurrent.Future;

import org.apache.logging.log4j.ThreadContext;

Expand All @@ -10,7 +11,7 @@
import net.sf.jukebox.datastream.signal.model.DataSample;
import net.sf.jukebox.datastream.signal.model.DataSink;
import net.sf.jukebox.logger.LogAware;
import net.sf.jukebox.sem.ACT;
import net.sf.servomaster.device.model.TransitionStatus;

/**
* @author Copyright &copy; <a href="mailto:vt@freehold.crocodile.org"> Vadim Tkachenko</a> 2001-2018
Expand Down Expand Up @@ -85,7 +86,7 @@ public final double getParkPosition() {
* {@inheritDoc}
*/
@Override
public final void set(double throttle) {
public final Future<TransitionStatus> set(double throttle) {

ThreadContext.push("set");

Expand All @@ -112,6 +113,11 @@ public final void set(double throttle) {
stateChanged();
}

// VT: FIXME: Not possible to all things in one commit.
// https://github.com/home-climate-control/dz/issues/48

return null;

} finally {
ThreadContext.pop();
}
Expand All @@ -130,34 +136,9 @@ public final void set(double throttle) {
* {@inheritDoc}
*/
@Override
public ACT park() {

ThreadContext.push("park");

try {

// VT: NOTE: Careful here. This implementation will work correctly only if
// moveDamper() works synchronously. For others (ServoDamper being a good example)
// you will have to provide your own implementation (again, ServoDamper is an
// example of how this is done).

ACT done = new ACT();

try {

moveDamper(parkPosition);
done.complete(true);

} catch (Throwable t) {

done.complete(false);
}
public Future<TransitionStatus> park() {

return done;

} finally {
ThreadContext.pop();
}
return set(getParkPosition());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package net.sf.dz3.device.actuator.impl;

import java.io.IOException;
import java.security.SecureRandom;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.logging.log4j.ThreadContext;

import net.sf.dz3.device.actuator.Damper;
import net.sf.jukebox.jmx.JmxDescriptor;
import net.sf.jukebox.sem.ACT;
import net.sf.jukebox.sem.SemaphoreGroup;
import net.sf.jukebox.service.Messenger;
import net.sf.servomaster.device.model.TransitionStatus;

/**
* Damper multiplexer.
Expand All @@ -23,6 +28,15 @@
*/
public class DamperMultiplexer extends AbstractDamper {

private static final Random rg = new SecureRandom();

/**
* Thread pool for parking assistants.
*
* This pool requires exactly one thread.
*/
private final ExecutorService parkingExecutor = Executors.newFixedThreadPool(1);

/**
* Dampers to control.
*/
Expand All @@ -47,20 +61,10 @@ protected synchronized void moveDamper(double position) throws IOException {

Damper d = i.next();

try {

d.set(position);

} catch (IOException ex) {

// VT: NOTE: Multiplexer is less prone to errors than a regular damper,
// because different dampers may be controlled by different controllers and
// not fail all at once. However, low probability of this happening
// makes it impractical to handle such errors separately. If you feel otherwise,
// feel free to interfere (i.e. not throw an exception if not all dampers failed).

throw new IOException("One of controlled dampers failed", ex);
}
// VT: FIXME: Not possible to all things in one commit.
// https://github.com/home-climate-control/dz/issues/49

d.set(position);
}
}

Expand Down Expand Up @@ -89,54 +93,95 @@ public JmxDescriptor getJmxDescriptor() {
/**
* {@inheritDoc}
*/
public ACT park() {
public Future<TransitionStatus> park() {

// VT: This implementation is similar to the one used in ServoDamper,
// but abstractions are different.
ThreadContext.push("park");

logger.info(getName() + ": parking at " + getParkPosition());
try {

return new ParkingAssistant().start();
}
final long authToken = rg.nextLong();
final TransitionStatus status = new TransitionStatus(authToken);

/**
* Commands the {@link ServoDamper#servo} to move to {@link ServoDamper#getParkPosition
* parked position} and waits until the servo has done so.
*/
private class ParkingAssistant extends Messenger {
Runnable parkingAssistant = new Runnable() {

/**
* Move the {@link ServoDamper#servo} and wait until it gets there.
*/
@Override
protected final Object execute() throws Throwable {
@Override
public void run() {

ThreadContext.push("execute");

try {

SemaphoreGroup parked = new SemaphoreGroup();

for (Iterator<Damper> i = dampers.iterator(); i.hasNext(); ) {

Damper d = i.next();

parked.add(d.park());
}
ThreadContext.push("run");

try {

logger.info(getName() + ": parking at " + getParkPosition());

// VT: NOTE: Ignoring state consistency of the damper collection

int count = dampers.size();
CompletionService<TransitionStatus> cs = new ExecutorCompletionService<>(Executors.newFixedThreadPool(count));

for (Iterator<Damper> i = dampers.iterator(); i.hasNext(); ) {

Damper d = i.next();

// VT: FIXME: https://github.com/home-climate-control/dz/issues/41
//
// Need to park dampers at their individual positions if they were specified,
// or to the multiplexer parking position if they weren't.
//
// At this point, it is not known whether the parking positions were specified or not
// (the parking position is double, not Double); need to fix this.

parked.waitForAll();
cs.submit(new Damper.Move(d, getParkPosition()));
}

logger.info(getName() + ": parked at " + getParkPosition());
boolean ok = true;

} catch (Throwable t) {
while (count-- > 0) {

Future<TransitionStatus> result = cs.take();
TransitionStatus s = result.get();

// This will cause the whole park() call to report failure

ok = s.isOK();

if (!ok) {

// This is potentially expensive - may slug the HVAC if the dampers are
// left closed while it is running, hence fatal level

logger.fatal("can't park one of the dampers", s.getCause());
}
}

if (ok) {

status.complete(authToken, null);
return;
}

status.complete(authToken, new IllegalStateException("one or more dampers failed to park"));

} catch (Throwable t) {

// This is potentially expensive - may slug the HVAC if the dampers are
// left closed while it is running, hence fatal level

logger.fatal("can't park", t);

status.complete(authToken, t);

} finally {

ThreadContext.pop();
ThreadContext.clearStack();
}
}
};

logger.error(getName() + ": failed to park at " + getParkPosition(), t);

} finally {
ThreadContext.pop();
}
return parkingExecutor.submit(parkingAssistant, status);

return null;
} finally {
ThreadContext.pop();
}
}
}
Loading

0 comments on commit 72d4b6d

Please sign in to comment.