Skip to content

Commit

Permalink
Allowing null setpoints; detailed review and followup required (#314)
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed Jun 6, 2024
1 parent 34ebb1d commit 93657bb
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import reactor.core.publisher.FluxSink;

import java.time.Duration;
import java.util.Optional;

/**
* Base class for reactive process controllers.
Expand All @@ -22,10 +23,10 @@ public abstract class AbstractProcessController<I, O, P> implements ProcessContr
/**
* The process setpoint.
*/
private double setpoint;
private Double setpoint;

private final Flux<Double> setpointFlux;
private FluxSink<Double> setpointSink;
private final Flux<Optional<Double>> setpointFlux;
private FluxSink<Optional<Double>> setpointSink;

/**
* The current process variable value.
Expand All @@ -43,26 +44,26 @@ public abstract class AbstractProcessController<I, O, P> implements ProcessContr
* @param jmxName This controller's JMX name.
* @param setpoint Initial setpoint.
*/
protected AbstractProcessController(String jmxName, double setpoint) {
protected AbstractProcessController(String jmxName, Double setpoint) {
this.jmxName = jmxName;

setpointFlux = Flux.create(this::connectSetpoint);
setpointFlux.subscribe(s -> this.setpoint = s);
setpointFlux.subscribe(s -> this.setpoint = s.orElse(null));

setSetpoint(setpoint);
}

private void connectSetpoint(FluxSink<Double> setpointSink) {
private void connectSetpoint(FluxSink<Optional<Double>> setpointSink) {
this.setpointSink = setpointSink;
}

@Override
public void setSetpoint(double setpoint) {
setpointSink.next(setpoint);
public void setSetpoint(Double setpoint) {
setpointSink.next(Optional.ofNullable(setpoint));
}

@Override
public double getSetpoint() {
public Double getSetpoint() {
return setpoint;
}

Expand All @@ -72,17 +73,16 @@ public Signal<I, P> getProcessVariable() {
}

@Override
public final synchronized double getError() {
public final synchronized Double getError() {

if (pv == null) {
// No sample, no error
return 0;
if (setpoint == null || pv == null) {
return null;
}

return getError(pv, setpoint);
}

protected abstract double getError(Signal<I, P> pv, double setpoint);
protected abstract double getError(Signal<I, P> pv, Double setpoint);

/**
* Get last output signal value.
Expand All @@ -100,22 +100,22 @@ public final Flux<Signal<Status<O>, P>> compute(Flux<Signal<I, P>> pv) {
// Need to re-inject it.
return Flux.combineLatest(
Flux.concat(
Flux.just(setpoint),
Flux.just(Optional.ofNullable(setpoint)),
setpointFlux
),
pv.doOnComplete(() -> setpointSink.complete()), // or it will hang forever
this::compute);
}

private Signal<Status<O>, P> compute(Double setpoint, Signal<I, P> pv) {
private Signal<Status<O>, P> compute(Optional<Double> setpoint, Signal<I, P> pv) {

if (pv.isError()) {

// VT: FIXME: Ideally, even the error signal must be passed to wrapCompute() in case it needs to
// recalculate the state. In practice, this will have to wait.

// For now, let's throw them a NaN, they better pay attention.
return new Signal<>(pv.timestamp, new Status(setpoint, null, Double.NaN), pv.payload, pv.status, pv.error);
return new Signal<>(pv.timestamp, new Status(setpoint.orElse(null), null, Double.NaN), pv.payload, pv.status, pv.error);
}

if (lastOutputSignal != null && lastOutputSignal.timestamp.isAfter(pv.timestamp)) {
Expand All @@ -127,7 +127,7 @@ private Signal<Status<O>, P> compute(Double setpoint, Signal<I, P> pv) {
}

this.pv = pv;
lastOutputSignal = wrapCompute(setpoint, pv);
lastOutputSignal = wrapCompute(setpoint.orElse(null), pv);

return lastOutputSignal;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public double getThresholdHigh() {
}

@Override
protected double getError(Signal<Double, P> pv, double setpoint) {
protected double getError(Signal<Double, P> pv, Double setpoint) {
return pv.getValue() - setpoint;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ public interface ProcessController<I, O, P> extends SignalProcessor<I, ProcessCo
/**
* Set the setpoint.
*
* @param setpoint Setpoint to set.
* @param setpoint Setpoint to set. {@code null} value is allowed.
*/
void setSetpoint(double setpoint);
void setSetpoint(Double setpoint);

/**
* Get the setpoint.
*
* @return Current setpoint value.
*/
double getSetpoint();
Double getSetpoint();

/**
* Get the process variable.
Expand All @@ -43,13 +43,15 @@ public interface ProcessController<I, O, P> extends SignalProcessor<I, ProcessCo
/**
* Get the current value of the error.
*
* @return Current error value.
* @return Current error value. {@code null} indicates either {@link #getSetpoint() no setpoint}, or no signal.
*/
double getError();
Double getError();

/**
* Compute the output signal.
*
* {@code null} {@link #getSetpoint()} or signal will result in an error status.
*
* @param pv Process variable flux.
*
* @return Output signal flux. The end of this flux indicates the need for the subscriber to shut down.
Expand All @@ -59,11 +61,11 @@ public interface ProcessController<I, O, P> extends SignalProcessor<I, ProcessCo

class Status<T> {

public final double setpoint;
public final Double setpoint;
public final Double error;
public final T signal;

public Status(double setpoint, Double error, T signal) {
public Status(Double setpoint, Double error, T signal) {
this.setpoint = setpoint;
this.error = error;
this.signal = signal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public abstract class AbstractPidController<P> extends AbstractProcessController
*/
private double lastD = 0;

protected AbstractPidController(String jmxName, final double setpoint, final double P, final double I, final double D, double saturationLimit) {
protected AbstractPidController(String jmxName, final Double setpoint, final double P, final double I, final double D, double saturationLimit) {

super(jmxName, setpoint);

Expand All @@ -71,7 +71,7 @@ protected AbstractPidController(String jmxName, final double setpoint, final dou
}

@Override
protected double getError(Signal<Double, P> pv, double setpoint) {
protected double getError(Signal<Double, P> pv, Double setpoint) {
return pv.getValue() - setpoint;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class SimplePidController<P> extends AbstractPidController<P> {
*/
private double lastError = 0;

public SimplePidController(String jmxName, double setpoint, double P, double I, double D, double saturationLimit) {
public SimplePidController(String jmxName, Double setpoint, double P, double I, double D, double saturationLimit) {
super(jmxName, setpoint, P, I, D, saturationLimit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public PidEconomizer(

super(clock, name, settings, device, timeout);

controller = new SimplePidController<>("(controller) " + getAddress(), 0, settings.P, settings.I, 0, settings.saturationLimit);
controller = new SimplePidController<>("(controller) " + getAddress(), 0d, settings.P, settings.I, 0, settings.saturationLimit);
signalRenderer = new HysteresisController<>("(signalRenderer) " + getAddress(), 0, HYSTERESIS);

initFluxes(ambientFlux);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void setpointChangeEmitsSignalPid() {
.create(this::connectSetpoint)
.map(v -> new Signal<Double, Void>(Instant.now(), v));

var pc = new SimplePidController<Void>("pc", 20, 1, 0, 0, 1.1);
var pc = new SimplePidController<Void>("pc", 20d, 1, 0, 0, 1.1);

var accumulator = new ArrayList<Signal<ProcessController.Status<Double>, Void>>();
var out = pc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
import java.util.concurrent.atomic.AtomicLong;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;

class SimplePidControllerTest {

private final Random rg = new Random();

@Test
void testPSimple() throws InterruptedException {
testP(new SimplePidController<>("simple", 0, 1, 0, 0, 0));
testP(new SimplePidController<>("simple", 0d, 1, 0, 0, 0));
}

private void testP(ProcessController<Double, Double, Void> pc) {
Expand Down Expand Up @@ -68,4 +69,11 @@ private void testP(ProcessController<Double, Double, Void> pc) {
ThreadContext.pop();
}
}

@Test
void nullSetpointAtStart() {
assertThatCode(() -> {
new SimplePidController<Double>("simple", null, 1, 0, 0, 1);
}).doesNotThrowAnyException();
}
}

0 comments on commit 93657bb

Please sign in to comment.