Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add failure mode to Pubber for not-gateway-proxy #768

Merged
merged 1 commit into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/src/main/java/com/google/udmi/util/GeneralUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,14 @@ public static <T> T ifTrueGet(Object conditional, Supplier<T> action, Supplier<T
}

public static <T> void ifTrueThen(Object conditional, Runnable action) {
ifTrueThen(conditional, action, null);
}

public static <T> void ifTrueThen(Object conditional, Runnable action, Runnable alternative) {
if (isTrue(conditional)) {
action.run();
} else if (alternative != null) {
alternative.run();
}
}

Expand Down
4 changes: 2 additions & 2 deletions etc/test_itemized.out
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ RESULT fail system config_logging BETA 5 Bad version string number format
RESULT skip system device_config_acked BETA 5 No config check for proxy device
RESULT fail system device_config_acked BETA 5 timeout waiting for config acked
RESULT fail enumeration.features feature_enumeration PREVIEW 5 Failed check that feature enumeration matches metadata; missing { enumeration }, extra { unknown }
RESULT fail gateway gateway_proxy_events ALPHA 5 timeout waiting for All proxy devices received data
RESULT fail gateway gateway_proxy_events ALPHA 5 timeout waiting for All proxy devices received data
RESULT pass gateway gateway_proxy_events ALPHA 5 Sequence complete
RESULT fail gateway gateway_proxy_events ALPHA 5 timeout waiting for Missing data from AHU-22
RESULT fail pointset pointset_remove_point BETA 5 timeout waiting for pointset state reports same points as defined in config
RESULT fail pointset pointset_remove_point BETA 5 timeout waiting for pointset status does not contain removed point
RESULT fail pointset pointset_request_extraneous BETA 5 timeout waiting for pointset event contains correct points with present_value
Expand Down
3 changes: 3 additions & 0 deletions pubber/.idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pubber/pubber.iml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
<orderEntry type="library" name="Gradle: io.moquette:moquette-broker:0.15" level="project" />
<orderEntry type="library" name="Gradle: com.librato.metrics:metrics-librato:5.1.0" level="project" />
<orderEntry type="library" name="Gradle: com.librato.metrics:librato-java:2.1.0" level="project" />
<orderEntry type="library" name="Gradle: com.bugsnag:bugsnag:3.7.0" level="project" />
<orderEntry type="library" name="Gradle: com.bugsnag:bugsnag:3.7.1" level="project" />
<orderEntry type="library" name="Gradle: com.fasterxml.jackson.core:jackson-annotations:2.14.1" level="project" />
<orderEntry type="library" name="Gradle: com.fasterxml.jackson.core:jackson-core:2.14.1" level="project" />
<orderEntry type="library" name="Gradle: com.fasterxml.jackson.core:jackson-databind:2.14.1" level="project" />
Expand Down
15 changes: 5 additions & 10 deletions pubber/src/main/java/daq/pubber/DeviceManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import udmi.schema.Level;
import udmi.schema.Metadata;
import udmi.schema.Operation.SystemMode;
import udmi.schema.PubberOptions;
import udmi.schema.PubberConfiguration;

/**
* Uber-manager for a complete device.
Expand All @@ -19,15 +19,10 @@ public class DeviceManager extends ManagerBase {
/**
* Create a new instance.
*/
public DeviceManager(ManagerHost host, PubberOptions options, String serialNo) {
super(host, options);
systemManager = new SystemManager(host, options, serialNo);
pointsetManager = new PointsetManager(host, options);
}

@Override
protected void periodicUpdate() {

public DeviceManager(ManagerHost host, PubberConfiguration configuration) {
super(host, configuration);
systemManager = new SystemManager(host, configuration);
pointsetManager = new PointsetManager(host, configuration);
}

public void setPersistentData(DevicePersistent persistentData) {
Expand Down
13 changes: 7 additions & 6 deletions pubber/src/main/java/daq/pubber/ListPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import udmi.schema.PubberConfiguration;

Expand All @@ -24,6 +25,10 @@ public ListPublisher(PubberConfiguration configuration, Consumer<Exception> onEr
usePrefix = configuration.endpoint.msg_prefix;
}

static String getMessageString(String deviceId, String topic, Object message) {
return String.format("%s/%s/%s", deviceId, topic, JsonUtil.stringify(message));
}

/**
* Get messages that have been mocked-published.
*
Expand All @@ -41,8 +46,8 @@ public void setDeviceTopicPrefix(String deviceId, String topicPrefix) {
}

@Override
public <T> void registerHandler(String deviceId, String topicSuffix, Consumer<T> handler,
Class<T> messageType) {
public <T> void registerHandler(String deviceId, String topicSuffix,
Consumer<T> handler, Class<T> messageType) {

}

Expand All @@ -58,10 +63,6 @@ public void publish(String deviceId, String topicSuffix, Object message, Runnabl
publisherExecutor.submit(callback);
}

static String getMessageString(String deviceId, String topic, Object message) {
return String.format("%s/%s/%s", deviceId, topic, JsonUtil.stringify(message));
}

@Override
public boolean isActive() {
return false;
Expand Down
26 changes: 18 additions & 8 deletions pubber/src/main/java/daq/pubber/ManagerBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.google.common.base.Preconditions.checkState;
import static com.google.udmi.util.GeneralUtils.getNow;
import static daq.pubber.Pubber.configuration;
import static java.lang.String.format;
import static java.util.Optional.ofNullable;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand All @@ -12,6 +13,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import udmi.schema.PubberConfiguration;
import udmi.schema.PubberOptions;

/**
Expand All @@ -26,10 +28,15 @@ public abstract class ManagerBase {
protected final PubberOptions options;
protected final ManagerHost host;
private final ScheduledExecutorService executor = new CatchingScheduledThreadPoolExecutor(1);
final String deviceId;
protected ScheduledFuture<?> periodicSender;

public ManagerBase(ManagerHost host, PubberOptions pubberOptions) {
this.options = pubberOptions;
/**
* New instance.
*/
public ManagerBase(ManagerHost host, PubberConfiguration configuration) {
options = configuration.options;
deviceId = configuration.deviceId;
this.host = host;
}

Expand All @@ -46,23 +53,23 @@ protected ScheduledFuture<?> scheduleFuture(Date futureTime, Runnable futureTask
return executor.schedule(futureTask, delay, TimeUnit.MILLISECONDS);
}

protected void debug(String message) {
public void debug(String message) {
host.debug(message);
}

protected void info(String message) {
public void info(String message) {
host.info(message);
}

protected void warn(String message) {
public void warn(String message) {
host.warn(message);
}

protected void error(String message, Throwable e) {
public void error(String message, Throwable e) {
host.error(message, e);
}

protected void error(String message) {
public void error(String message) {
host.error(message, null);
}

Expand All @@ -76,13 +83,16 @@ protected void updateInterval(Integer sampleRateSec) {
}
}

protected abstract void periodicUpdate();
protected void periodicUpdate() {
throw new IllegalStateException("No periodic update handler defined");
}

protected synchronized void startPeriodicSend() {
checkState(periodicSender == null);
int sec = sendRateSec.get();
warn(format("Starting %s sender with delay %ds", this.getClass().getSimpleName(), sec));
if (sec != 0) {
periodicUpdate(); // To this now to synchronously raise any obvious exceptions.
periodicSender = executor.scheduleAtFixedRate(this::periodicUpdate, sec, sec, SECONDS);
}
}
Expand Down
6 changes: 5 additions & 1 deletion pubber/src/main/java/daq/pubber/MqttDevice.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ public void connect() {
publisher.connect(deviceId);
}

public void publish(String topicSuffix, Object message, Runnable callback) {
public void connect(String targetId) {
publisher.connect(targetId);
}

public void publish(String deviceId, String topicSuffix, Object message, Runnable callback) {
publisher.publish(deviceId, topicSuffix, message, callback);
}

Expand Down
Loading
Loading