Skip to content

Commit

Permalink
Implement Device Manager (#767)
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu committed Nov 16, 2023
1 parent dfa4441 commit d384f96
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 162 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ jobs:
- name: registrar clean
run: bin/test_regclean $TARGET_PROJECT
- name: sequence tests clean
run: bin/test_sequencer nocheck $TARGET_PROJECT
run: bin/test_sequencer clean nocheck $TARGET_PROJECT
- name: sequence tests alpha
run: bin/test_sequencer noclean alpha nocheck $TARGET_PROJECT
run: bin/test_sequencer alpha nocheck $TARGET_PROJECT
- name: sequence test post-process
if: ${{ always() }}
run: egrep ' test .* after .*s ' out/sequencer.log > out/timing_sequencer.out
Expand Down
11 changes: 4 additions & 7 deletions bin/test_sequencer
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ source etc/shell_common.sh

debug_opts=-v
seq_opts=
clean_cache=y
clean_cache=
pubber_opts=
suffix=
out_check=y

if [[ $1 == 'noclean' ]]; then
if [[ $1 == 'clean' ]]; then
shift
clean_cache=
clean_cache=y
fi

if [[ $1 == 'alpha' ]]; then
Expand All @@ -42,16 +42,13 @@ if [[ $1 == 'full' ]]; then
fi

if [[ $# -lt 1 ]]; then
echo Usage: $0 [noclean] [alpha] [nostate] [full] PROJECT_ID [tests...]
echo Usage: $0 [clean] [alpha] [nostate] [full] PROJECT_ID [tests...]
false
fi

project_id=$1
shift
targets=$*
if [[ -n $targets ]]; then
clean_cache=
fi

[[ -n $GITHUB_RUN_NUMBER ]] && echo "Workflow run number $GITHUB_RUN_NUMBER" || true
echo "export TARGET_PROJECT=$project_id"
Expand Down
86 changes: 86 additions & 0 deletions pubber/src/main/java/daq/pubber/DeviceManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package daq.pubber;

import udmi.schema.Config;
import udmi.schema.DevicePersistent;
import udmi.schema.Entry;
import udmi.schema.Level;
import udmi.schema.Metadata;
import udmi.schema.Operation.SystemMode;
import udmi.schema.PubberOptions;

/**
* Uber-manager for a complete device.
*/
public class DeviceManager extends ManagerBase {

private PointsetManager pointsetManager;
private SystemManager systemManager;

/**
* Create a new instance.
*/
public DeviceManager(ManagerHost host, PubberOptions options, String serialNo) {
super(host, options);
systemManager = new SystemManager(host, options, serialNo);
pointsetManager = new PointsetManager(host, options);
}

@Override
protected void periodicUpdate() {

}

public void setPersistentData(DevicePersistent persistentData) {
systemManager.setPersistentData(persistentData);
}

public void setMetadata(Metadata metadata) {
pointsetManager.setPointsetModel(metadata.pointset);
systemManager.setMetadata(metadata);
}

@Override
public void cancelPeriodicSend() {
super.cancelPeriodicSend();
pointsetManager.cancelPeriodicSend();
systemManager.cancelPeriodicSend();
}

public void systemLifecycle(SystemMode mode) {
systemManager.systemLifecycle(mode);
}

public void maybeRestartSystem() {
systemManager.maybeRestartSystem();
}

public void localLog(Entry report) {
systemManager.localLog(report);
}

public void localLog(String message, Level trace, String timestamp, String detail) {
systemManager.localLog(message, trace, timestamp, detail);
}

public String getTestingTag() {
return systemManager.getTestingTag();
}

public void updateConfig(Config config) {
pointsetManager.updateConfig(config.pointset);
systemManager.updateConfig(config.system, config.timestamp);
}

public void publishLogMessage(Entry logEntry) {
systemManager.publishLogMessage(logEntry);
}

public void cloudLog(String message, Level level, String detail) {
systemManager.cloudLog(message, level, detail);
}

public void shutdown() {
systemManager.shutdown();
pointsetManager.shutdown();
}
}
35 changes: 32 additions & 3 deletions pubber/src/main/java/daq/pubber/ManagerBase.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package daq.pubber;

import static com.google.common.base.Preconditions.checkState;
import static com.google.udmi.util.GeneralUtils.ifNotNullGet;
import static com.google.udmi.util.GeneralUtils.getNow;
import static java.lang.String.format;
import static java.util.Optional.ofNullable;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.google.daq.mqtt.util.CatchingScheduledThreadPoolExecutor;
import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import udmi.schema.PubberOptions;

Expand All @@ -19,6 +21,7 @@ public abstract class ManagerBase {

public static final int DISABLED_INTERVAL = 0;
protected static final int DEFAULT_REPORT_SEC = 10;
protected static final int WAIT_TIME_SEC = 10;
protected final AtomicInteger sendRateSec = new AtomicInteger(DEFAULT_REPORT_SEC);
protected final PubberOptions options;
protected final ManagerHost host;
Expand All @@ -34,6 +37,15 @@ protected void updateState(Object state) {
host.update(state);
}

protected ScheduledFuture<?> scheduleFuture(Date futureTime, Runnable futureTask) {
if (executor.isShutdown() || executor.isTerminated()) {
throw new RuntimeException("Executor shutdown/terminated, not scheduling");
}
long delay = futureTime.getTime() - getNow().getTime();
debug(format("Scheduling future in %dms", delay));
return executor.schedule(futureTask, delay, TimeUnit.MILLISECONDS);
}

protected void debug(String message) {
host.debug(message);
}
Expand Down Expand Up @@ -69,8 +81,7 @@ protected void updateInterval(Integer sampleRateSec) {
protected synchronized void startPeriodicSend() {
checkState(periodicSender == null);
int sec = sendRateSec.get();
String simpleName = this.getClass().getSimpleName();
info(format("Setting %s sender with delay %ds", simpleName, sec));
warn(format("Starting %s sender with delay %ds", this.getClass().getSimpleName(), sec));
if (sec != 0) {
periodicSender = executor.scheduleAtFixedRate(this::periodicUpdate, sec, sec, SECONDS);
}
Expand All @@ -79,6 +90,7 @@ protected synchronized void startPeriodicSend() {
protected synchronized void cancelPeriodicSend() {
if (periodicSender != null) {
try {
warn(format("Terminating %s sender", this.getClass().getSimpleName()));
periodicSender.cancel(false);
} catch (Exception e) {
throw new RuntimeException("While cancelling executor", e);
Expand All @@ -87,4 +99,21 @@ protected synchronized void cancelPeriodicSend() {
}
}
}

private void stopExecutor() {
try {
executor.shutdown();
if (!executor.awaitTermination(WAIT_TIME_SEC, TimeUnit.SECONDS)) {
throw new RuntimeException("Failed to shutdown scheduled tasks");
}
} catch (Exception e) {
throw new RuntimeException("While stopping executor", e);
}
}


protected void shutdown() {
cancelPeriodicSend();
stopExecutor();
}
}
1 change: 1 addition & 0 deletions pubber/src/main/java/daq/pubber/PointsetManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class PointsetManager extends ManagerBase {
*/
public PointsetManager(ManagerHost host, PubberOptions options) {
super(host, options);
setExtraField(options.extraField);
updateState();
}

Expand Down
Loading

0 comments on commit d384f96

Please sign in to comment.