Skip to content

Commit

Permalink
Add proxy device
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu committed Nov 17, 2023
1 parent d384f96 commit 7e29fec
Show file tree
Hide file tree
Showing 19 changed files with 331 additions and 119 deletions.
6 changes: 6 additions & 0 deletions common/src/main/java/com/google/udmi/util/GeneralUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,14 @@ public static <T> T ifTrueGet(Object conditional, Supplier<T> action, Supplier<T
}

public static <T> void ifTrueThen(Object conditional, Runnable action) {
ifTrueThen(conditional, action, null);
}

public static <T> void ifTrueThen(Object conditional, Runnable action, Runnable alternative) {
if (isTrue(conditional)) {
action.run();
} else if (alternative != null) {
alternative.run();
}
}

Expand Down
4 changes: 2 additions & 2 deletions etc/test_itemized.out
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ RESULT fail system config_logging BETA 5 Bad version string number format
RESULT skip system device_config_acked BETA 5 No config check for proxy device
RESULT fail system device_config_acked BETA 5 timeout waiting for config acked
RESULT fail enumeration.features feature_enumeration PREVIEW 5 Failed check that feature enumeration matches metadata; missing { enumeration }, extra { unknown }
RESULT fail gateway gateway_proxy_events ALPHA 5 timeout waiting for All proxy devices received data
RESULT fail gateway gateway_proxy_events ALPHA 5 timeout waiting for All proxy devices received data
RESULT pass gateway gateway_proxy_events ALPHA 5 Sequence complete
RESULT fail gateway gateway_proxy_events ALPHA 5 timeout waiting for Missing data from AHU-22
RESULT fail pointset pointset_remove_point BETA 5 timeout waiting for pointset state reports same points as defined in config
RESULT fail pointset pointset_remove_point BETA 5 timeout waiting for pointset status does not contain removed point
RESULT fail pointset pointset_request_extraneous BETA 5 timeout waiting for pointset event contains correct points with present_value
Expand Down
3 changes: 3 additions & 0 deletions pubber/.idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pubber/pubber.iml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
<orderEntry type="library" name="Gradle: io.moquette:moquette-broker:0.15" level="project" />
<orderEntry type="library" name="Gradle: com.librato.metrics:metrics-librato:5.1.0" level="project" />
<orderEntry type="library" name="Gradle: com.librato.metrics:librato-java:2.1.0" level="project" />
<orderEntry type="library" name="Gradle: com.bugsnag:bugsnag:3.7.0" level="project" />
<orderEntry type="library" name="Gradle: com.bugsnag:bugsnag:3.7.1" level="project" />
<orderEntry type="library" name="Gradle: com.fasterxml.jackson.core:jackson-annotations:2.14.1" level="project" />
<orderEntry type="library" name="Gradle: com.fasterxml.jackson.core:jackson-core:2.14.1" level="project" />
<orderEntry type="library" name="Gradle: com.fasterxml.jackson.core:jackson-databind:2.14.1" level="project" />
Expand Down
15 changes: 5 additions & 10 deletions pubber/src/main/java/daq/pubber/DeviceManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import udmi.schema.Level;
import udmi.schema.Metadata;
import udmi.schema.Operation.SystemMode;
import udmi.schema.PubberOptions;
import udmi.schema.PubberConfiguration;

/**
* Uber-manager for a complete device.
Expand All @@ -19,15 +19,10 @@ public class DeviceManager extends ManagerBase {
/**
* Create a new instance.
*/
public DeviceManager(ManagerHost host, PubberOptions options, String serialNo) {
super(host, options);
systemManager = new SystemManager(host, options, serialNo);
pointsetManager = new PointsetManager(host, options);
}

@Override
protected void periodicUpdate() {

public DeviceManager(ManagerHost host, PubberConfiguration configuration) {
super(host, configuration);
systemManager = new SystemManager(host, configuration);
pointsetManager = new PointsetManager(host, configuration);
}

public void setPersistentData(DevicePersistent persistentData) {
Expand Down
13 changes: 7 additions & 6 deletions pubber/src/main/java/daq/pubber/ListPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import udmi.schema.PubberConfiguration;

Expand All @@ -24,6 +25,10 @@ public ListPublisher(PubberConfiguration configuration, Consumer<Exception> onEr
usePrefix = configuration.endpoint.msg_prefix;
}

static String getMessageString(String deviceId, String topic, Object message) {
return String.format("%s/%s/%s", deviceId, topic, JsonUtil.stringify(message));
}

/**
* Get messages that have been mocked-published.
*
Expand All @@ -41,8 +46,8 @@ public void setDeviceTopicPrefix(String deviceId, String topicPrefix) {
}

@Override
public <T> void registerHandler(String deviceId, String topicSuffix, Consumer<T> handler,
Class<T> messageType) {
public <T> void registerHandler(String deviceId, String topicSuffix,
Consumer<T> handler, Class<T> messageType) {

}

Expand All @@ -58,10 +63,6 @@ public void publish(String deviceId, String topicSuffix, Object message, Runnabl
publisherExecutor.submit(callback);
}

static String getMessageString(String deviceId, String topic, Object message) {
return String.format("%s/%s/%s", deviceId, topic, JsonUtil.stringify(message));
}

@Override
public boolean isActive() {
return false;
Expand Down
26 changes: 18 additions & 8 deletions pubber/src/main/java/daq/pubber/ManagerBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.google.common.base.Preconditions.checkState;
import static com.google.udmi.util.GeneralUtils.getNow;
import static daq.pubber.Pubber.configuration;
import static java.lang.String.format;
import static java.util.Optional.ofNullable;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand All @@ -12,6 +13,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import udmi.schema.PubberConfiguration;
import udmi.schema.PubberOptions;

/**
Expand All @@ -26,10 +28,15 @@ public abstract class ManagerBase {
protected final PubberOptions options;
protected final ManagerHost host;
private final ScheduledExecutorService executor = new CatchingScheduledThreadPoolExecutor(1);
final String deviceId;
protected ScheduledFuture<?> periodicSender;

public ManagerBase(ManagerHost host, PubberOptions pubberOptions) {
this.options = pubberOptions;
/**
* New instance.
*/
public ManagerBase(ManagerHost host, PubberConfiguration configuration) {
options = configuration.options;
deviceId = configuration.deviceId;
this.host = host;
}

Expand All @@ -46,23 +53,23 @@ protected ScheduledFuture<?> scheduleFuture(Date futureTime, Runnable futureTask
return executor.schedule(futureTask, delay, TimeUnit.MILLISECONDS);
}

protected void debug(String message) {
public void debug(String message) {
host.debug(message);
}

protected void info(String message) {
public void info(String message) {
host.info(message);
}

protected void warn(String message) {
public void warn(String message) {
host.warn(message);
}

protected void error(String message, Throwable e) {
public void error(String message, Throwable e) {
host.error(message, e);
}

protected void error(String message) {
public void error(String message) {
host.error(message, null);
}

Expand All @@ -76,13 +83,16 @@ protected void updateInterval(Integer sampleRateSec) {
}
}

protected abstract void periodicUpdate();
protected void periodicUpdate() {
throw new IllegalStateException("No periodic update handler defined");
}

protected synchronized void startPeriodicSend() {
checkState(periodicSender == null);
int sec = sendRateSec.get();
warn(format("Starting %s sender with delay %ds", this.getClass().getSimpleName(), sec));
if (sec != 0) {
periodicUpdate(); // To this now to synchronously raise any obvious exceptions.
periodicSender = executor.scheduleAtFixedRate(this::periodicUpdate, sec, sec, SECONDS);
}
}
Expand Down
6 changes: 5 additions & 1 deletion pubber/src/main/java/daq/pubber/MqttDevice.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ public void connect() {
publisher.connect(deviceId);
}

public void publish(String topicSuffix, Object message, Runnable callback) {
public void connect(String targetId) {
publisher.connect(targetId);
}

public void publish(String deviceId, String topicSuffix, Object message, Runnable callback) {
publisher.publish(deviceId, topicSuffix, message, callback);
}

Expand Down
Loading

0 comments on commit 7e29fec

Please sign in to comment.