diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index 53e4b738e..f9d305a1d 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -152,9 +152,9 @@ jobs: - name: registrar clean run: bin/test_regclean $TARGET_PROJECT - name: sequence tests clean - run: bin/test_sequencer nocheck $TARGET_PROJECT + run: bin/test_sequencer clean nocheck $TARGET_PROJECT - name: sequence tests alpha - run: bin/test_sequencer noclean alpha nocheck $TARGET_PROJECT + run: bin/test_sequencer alpha nocheck $TARGET_PROJECT - name: sequence test post-process if: ${{ always() }} run: egrep ' test .* after .*s ' out/sequencer.log > out/timing_sequencer.out diff --git a/bin/test_sequencer b/bin/test_sequencer index f658fa354..37cf40aec 100755 --- a/bin/test_sequencer +++ b/bin/test_sequencer @@ -10,14 +10,14 @@ source etc/shell_common.sh debug_opts=-v seq_opts= -clean_cache=y +clean_cache= pubber_opts= suffix= out_check=y -if [[ $1 == 'noclean' ]]; then +if [[ $1 == 'clean' ]]; then shift - clean_cache= + clean_cache=y fi if [[ $1 == 'alpha' ]]; then @@ -42,16 +42,13 @@ if [[ $1 == 'full' ]]; then fi if [[ $# -lt 1 ]]; then - echo Usage: $0 [noclean] [alpha] [nostate] [full] PROJECT_ID [tests...] + echo Usage: $0 [clean] [alpha] [nostate] [full] PROJECT_ID [tests...] false fi project_id=$1 shift targets=$* -if [[ -n $targets ]]; then - clean_cache= -fi [[ -n $GITHUB_RUN_NUMBER ]] && echo "Workflow run number $GITHUB_RUN_NUMBER" || true echo "export TARGET_PROJECT=$project_id" diff --git a/pubber/src/main/java/daq/pubber/DeviceManager.java b/pubber/src/main/java/daq/pubber/DeviceManager.java new file mode 100644 index 000000000..0020baf1a --- /dev/null +++ b/pubber/src/main/java/daq/pubber/DeviceManager.java @@ -0,0 +1,86 @@ +package daq.pubber; + +import udmi.schema.Config; +import udmi.schema.DevicePersistent; +import udmi.schema.Entry; +import udmi.schema.Level; +import udmi.schema.Metadata; +import udmi.schema.Operation.SystemMode; +import udmi.schema.PubberOptions; + +/** + * Uber-manager for a complete device. + */ +public class DeviceManager extends ManagerBase { + + private PointsetManager pointsetManager; + private SystemManager systemManager; + + /** + * Create a new instance. + */ + public DeviceManager(ManagerHost host, PubberOptions options, String serialNo) { + super(host, options); + systemManager = new SystemManager(host, options, serialNo); + pointsetManager = new PointsetManager(host, options); + } + + @Override + protected void periodicUpdate() { + + } + + public void setPersistentData(DevicePersistent persistentData) { + systemManager.setPersistentData(persistentData); + } + + public void setMetadata(Metadata metadata) { + pointsetManager.setPointsetModel(metadata.pointset); + systemManager.setMetadata(metadata); + } + + @Override + public void cancelPeriodicSend() { + super.cancelPeriodicSend(); + pointsetManager.cancelPeriodicSend(); + systemManager.cancelPeriodicSend(); + } + + public void systemLifecycle(SystemMode mode) { + systemManager.systemLifecycle(mode); + } + + public void maybeRestartSystem() { + systemManager.maybeRestartSystem(); + } + + public void localLog(Entry report) { + systemManager.localLog(report); + } + + public void localLog(String message, Level trace, String timestamp, String detail) { + systemManager.localLog(message, trace, timestamp, detail); + } + + public String getTestingTag() { + return systemManager.getTestingTag(); + } + + public void updateConfig(Config config) { + pointsetManager.updateConfig(config.pointset); + systemManager.updateConfig(config.system, config.timestamp); + } + + public void publishLogMessage(Entry logEntry) { + systemManager.publishLogMessage(logEntry); + } + + public void cloudLog(String message, Level level, String detail) { + systemManager.cloudLog(message, level, detail); + } + + public void shutdown() { + systemManager.shutdown(); + pointsetManager.shutdown(); + } +} diff --git a/pubber/src/main/java/daq/pubber/ManagerBase.java b/pubber/src/main/java/daq/pubber/ManagerBase.java index 2e804b5a7..df5208263 100644 --- a/pubber/src/main/java/daq/pubber/ManagerBase.java +++ b/pubber/src/main/java/daq/pubber/ManagerBase.java @@ -1,14 +1,16 @@ package daq.pubber; import static com.google.common.base.Preconditions.checkState; -import static com.google.udmi.util.GeneralUtils.ifNotNullGet; +import static com.google.udmi.util.GeneralUtils.getNow; import static java.lang.String.format; import static java.util.Optional.ofNullable; import static java.util.concurrent.TimeUnit.SECONDS; import com.google.daq.mqtt.util.CatchingScheduledThreadPoolExecutor; +import java.util.Date; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import udmi.schema.PubberOptions; @@ -19,6 +21,7 @@ public abstract class ManagerBase { public static final int DISABLED_INTERVAL = 0; protected static final int DEFAULT_REPORT_SEC = 10; + protected static final int WAIT_TIME_SEC = 10; protected final AtomicInteger sendRateSec = new AtomicInteger(DEFAULT_REPORT_SEC); protected final PubberOptions options; protected final ManagerHost host; @@ -34,6 +37,15 @@ protected void updateState(Object state) { host.update(state); } + protected ScheduledFuture scheduleFuture(Date futureTime, Runnable futureTask) { + if (executor.isShutdown() || executor.isTerminated()) { + throw new RuntimeException("Executor shutdown/terminated, not scheduling"); + } + long delay = futureTime.getTime() - getNow().getTime(); + debug(format("Scheduling future in %dms", delay)); + return executor.schedule(futureTask, delay, TimeUnit.MILLISECONDS); + } + protected void debug(String message) { host.debug(message); } @@ -69,8 +81,7 @@ protected void updateInterval(Integer sampleRateSec) { protected synchronized void startPeriodicSend() { checkState(periodicSender == null); int sec = sendRateSec.get(); - String simpleName = this.getClass().getSimpleName(); - info(format("Setting %s sender with delay %ds", simpleName, sec)); + warn(format("Starting %s sender with delay %ds", this.getClass().getSimpleName(), sec)); if (sec != 0) { periodicSender = executor.scheduleAtFixedRate(this::periodicUpdate, sec, sec, SECONDS); } @@ -79,6 +90,7 @@ protected synchronized void startPeriodicSend() { protected synchronized void cancelPeriodicSend() { if (periodicSender != null) { try { + warn(format("Terminating %s sender", this.getClass().getSimpleName())); periodicSender.cancel(false); } catch (Exception e) { throw new RuntimeException("While cancelling executor", e); @@ -87,4 +99,21 @@ protected synchronized void cancelPeriodicSend() { } } } + + private void stopExecutor() { + try { + executor.shutdown(); + if (!executor.awaitTermination(WAIT_TIME_SEC, TimeUnit.SECONDS)) { + throw new RuntimeException("Failed to shutdown scheduled tasks"); + } + } catch (Exception e) { + throw new RuntimeException("While stopping executor", e); + } + } + + + protected void shutdown() { + cancelPeriodicSend(); + stopExecutor(); + } } diff --git a/pubber/src/main/java/daq/pubber/PointsetManager.java b/pubber/src/main/java/daq/pubber/PointsetManager.java index 109ea726e..868c76b7f 100644 --- a/pubber/src/main/java/daq/pubber/PointsetManager.java +++ b/pubber/src/main/java/daq/pubber/PointsetManager.java @@ -54,6 +54,7 @@ public class PointsetManager extends ManagerBase { */ public PointsetManager(ManagerHost host, PubberOptions options) { super(host, options); + setExtraField(options.extraField); updateState(); } diff --git a/pubber/src/main/java/daq/pubber/Pubber.java b/pubber/src/main/java/daq/pubber/Pubber.java index 84abda72b..e6773d74f 100644 --- a/pubber/src/main/java/daq/pubber/Pubber.java +++ b/pubber/src/main/java/daq/pubber/Pubber.java @@ -23,6 +23,7 @@ import static daq.pubber.MqttDevice.STATE_TOPIC; import static java.lang.String.format; import static java.util.Objects.requireNonNull; +import static java.util.Objects.requireNonNullElse; import static java.util.Optional.ofNullable; import static java.util.stream.Collectors.toMap; import static udmi.schema.BlobsetConfig.SystemBlobsets.IOT_ENDPOINT_CONFIG; @@ -59,11 +60,10 @@ import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -109,7 +109,7 @@ /** * IoT Core UDMI Device Emulator. */ -public class Pubber implements ManagerHost { +public class Pubber extends ManagerBase implements ManagerHost { public static final int SCAN_DURATION_SEC = 10; public static final String PUBBER_OUT = "pubber/out"; @@ -122,7 +122,6 @@ public class Pubber implements ManagerHost { static final int MESSAGE_REPORT_INTERVAL = 10; private static final String BROKEN_VERSION = "1.4."; private static final String HOSTNAME = System.getenv("HOSTNAME"); - private static final int WAIT_TIME_SEC = 10; private static final int STATE_THROTTLE_MS = 2000; private static final String PUBSUB_SITE = "PubSub"; private static final int DEFAULT_REPORT_SEC = 10; @@ -152,20 +151,17 @@ public class Pubber implements ManagerHost { private static final int FORCED_STATE_TIME_MS = 10000; private static final Duration CLOCK_SKEW = Duration.ofMinutes(30); private static final Duration SMOKE_CHECK_TIME = Duration.ofMinutes(5); - private static PubberOptions pubberOptions; - protected final PubberConfiguration configuration; + static PubberConfiguration configuration; final State deviceState = new State(); final Config deviceConfig = new Config(); private final File outDir; private final ScheduledExecutorService executor = new CatchingScheduledThreadPoolExecutor(1); - private final AtomicInteger messageDelaySec = new AtomicInteger(DEFAULT_REPORT_SEC); private final CountDownLatch configLatch = new CountDownLatch(1); private final AtomicBoolean stateDirty = new AtomicBoolean(); - private final Semaphore stateLock = new Semaphore(1); + private final ReentrantLock stateLock = new ReentrantLock(); private final String deviceId; protected DevicePersistent persistentData; private MqttDevice deviceTarget; - private ScheduledFuture periodicSender; private long lastStateTimeMs; private PubSubClient pubSubClient; private Function connectionDone; @@ -176,9 +172,8 @@ public class Pubber implements ManagerHost { private MqttDevice gatewayTarget; private LocalnetManager localnetManager; private SchemaVersion targetSchema; - private PointsetManager pointsetManager; private int deviceUpdateCount = -1; - private SystemManager systemManager; + private DeviceManager deviceManager; /** * Start an instance from a configuration file. @@ -186,19 +181,22 @@ public class Pubber implements ManagerHost { * @param configPath Path to configuration file. */ public Pubber(String configPath) { + super(null, loadConfigurationReturnOptions(configPath)); + setClockSkew(isTrue(options.skewClock) ? CLOCK_SKEW : Duration.ZERO); + Protocol protocol = requireNonNullElse( + ifNotNullGet(configuration.endpoint, endpoint -> endpoint.protocol), MQTT); + checkArgument(MQTT.equals(protocol), "protocol mismatch"); + deviceId = requireNonNull(configuration.deviceId, "device id not defined"); + outDir = new File(PUBBER_OUT); + } + + private static PubberOptions loadConfigurationReturnOptions(String configPath) { File configFile = new File(configPath); try { configuration = sanitizeConfiguration(fromJsonFile(configFile, PubberConfiguration.class)); - pubberOptions = configuration.options; - setClockSkew(isTrue(pubberOptions.skewClock) ? CLOCK_SKEW : Duration.ZERO); - Protocol protocol = ofNullable( - ifNotNullGet(configuration.endpoint, endpoint -> endpoint.protocol)).orElse(MQTT); - checkArgument(MQTT.equals(protocol), "protocol mismatch"); - deviceId = requireNonNull(configuration.deviceId, "device id not defined"); - outDir = new File(PUBBER_OUT); + return configuration.options; } catch (Exception e) { - throw new RuntimeException("While configuring instance from " + configFile.getAbsolutePath(), - e); + throw new RuntimeException("While configuring from " + configFile.getAbsolutePath(), e); } } @@ -211,20 +209,25 @@ public Pubber(String configPath) { * @param serialNo Serial number of the device */ public Pubber(String iotProject, String sitePath, String deviceId, String serialNo) { + super(null, getExplicitOptions(iotProject, sitePath, deviceId, serialNo)); this.deviceId = deviceId; outDir = new File(PUBBER_OUT + "/" + serialNo); - configuration = sanitizeConfiguration(new PubberConfiguration()); - pubberOptions = configuration.options; - configuration.deviceId = deviceId; - configuration.iotProject = iotProject; - configuration.serialNo = serialNo; if (PUBSUB_SITE.equals(sitePath)) { pubSubClient = new PubSubClient(iotProject, deviceId); - } else { - configuration.sitePath = sitePath; } } + private static PubberOptions getExplicitOptions(String iotProject, String sitePath, + String deviceId, String serialNo) { + configuration = new PubberConfiguration(); + configuration.iotProject = iotProject; + configuration.sitePath = sitePath; + configuration.deviceId = deviceId; + configuration.serialNo = serialNo; + configuration.options = new PubberOptions(); + return configuration.options; + } + private static String getEventsSuffix(String suffixSuffix) { return MqttDevice.EVENTS_TOPIC + "/" + suffixSuffix; } @@ -275,7 +278,7 @@ static Pubber singularPubber(String[] args) { }); } catch (Exception e) { if (pubber != null) { - pubber.terminate(); + pubber.shutdown(); } throw new RuntimeException("While starting singular pubber", e); } @@ -301,17 +304,19 @@ private static void swarmPubber(String[] args) throws InterruptedException { private static void startFeedListener(String projectId, String siteName, String feedName, String serialNo) { + Pubber pubber = new Pubber(projectId, siteName, feedName, serialNo); try { LOG.info("Starting feed listener " + serialNo); - Pubber pubber = new Pubber(projectId, siteName, feedName, serialNo); pubber.initialize(); pubber.startConnection(deviceId -> { LOG.error("Connection terminated, restarting listener"); startFeedListener(projectId, siteName, feedName, serialNo); return false; }); + pubber.shutdown(); } catch (Exception e) { LOG.error("Exception starting instance " + serialNo, e); + pubber.shutdown(); startFeedListener(projectId, siteName, feedName, serialNo); } } @@ -335,10 +340,10 @@ static String acquireBlobData(String url, String sha256) { return new String(dataBytes); } - static void augmentDeviceMessage(Object message, Date now) { + static void augmentDeviceMessage(Object message, Date now, boolean useBadVersion) { try { Field version = message.getClass().getField("version"); - version.set(message, isTrue(pubberOptions.badVersion) ? BROKEN_VERSION : UDMI_VERSION); + version.set(message, useBadVersion ? BROKEN_VERSION : UDMI_VERSION); Field timestamp = message.getClass().getField("timestamp"); timestamp.set(message, now); } catch (Throwable e) { @@ -346,13 +351,11 @@ static void augmentDeviceMessage(Object message, Date now) { } } - private void initializeDevice() { ifNotNullThen(configuration.sitePath, SupportedFeatures::writeFeatureFile); SupportedFeatures.setFeatureSwap(configuration.options.featureEnableSwap); - systemManager = new SystemManager(this, configuration.options, configuration.serialNo); - pointsetManager = new PointsetManager(this, configuration.options); + deviceManager = new DeviceManager(this, configuration.options, configuration.serialNo); if (configuration.sitePath != null) { siteModel = new SiteModel(configuration.sitePath); @@ -375,8 +378,6 @@ private void initializeDevice() { configuration.deviceId, configuration.serialNo, configuration.macAddr, configuration.gatewayId, optionsString(configuration.options))); - pointsetManager.setExtraField(configuration.options.extraField); - localnetManager = new LocalnetManager(this); markStateDirty(); } @@ -399,8 +400,7 @@ protected void initializePersistentStore() { : newDevicePersistent(); } - persistentData.restart_count = Objects.requireNonNullElse(persistentData.restart_count, 0) + 1; - systemManager.setPersistentData(persistentData); + persistentData.restart_count = requireNonNullElse(persistentData.restart_count, 0) + 1; // If the persistentData contains endpoint configuration, prioritize using that. // Otherwise, use the endpoint configuration that came from the Pubber config file on start. @@ -421,6 +421,7 @@ protected void initializePersistentStore() { private void writePersistentStore() { checkState(persistentData != null, "persistent data not defined"); toJsonFile(getPersistentStore(), persistentData); + deviceManager.setPersistentData(persistentData); } private File getPersistentStore() { @@ -541,41 +542,11 @@ private void processDeviceMetadata(Metadata metadata) { info("Configured with auth_type " + configuration.algorithm); - pointsetManager.setPointsetModel(metadata.pointset); - systemManager.setSystemMetadata(metadata); - } - - private synchronized void maybeRestartExecutor(int intervalSec) { - if (periodicSender == null || intervalSec != messageDelaySec.get()) { - cancelPeriodicSend(); - messageDelaySec.set(intervalSec); - startPeriodicSend(); - } - } - - private synchronized void startPeriodicSend() { - checkState(periodicSender == null); - int delay = messageDelaySec.get(); - info(format("Starting executor with send message delay %ds", delay)); - periodicSender = executor.scheduleAtFixedRate(this::periodicUpdate, delay, delay, - TimeUnit.SECONDS); - } - - private synchronized void cancelPeriodicSend() { - systemManager.cancelPeriodicSend(); - pointsetManager.cancelPeriodicSend(); - if (periodicSender != null) { - try { - periodicSender.cancel(false); - } catch (Exception e) { - throw new RuntimeException("While cancelling executor", e); - } finally { - periodicSender = null; - } - } + deviceManager.setMetadata(metadata); } - private void periodicUpdate() { + @Override + public void periodicUpdate() { try { deviceUpdateCount++; checkSmokyFailure(); @@ -592,7 +563,7 @@ private void checkSmokyFailure() { && Instant.now().minus(SMOKE_CHECK_TIME).isAfter(deviceStartTime.toInstant())) { error(format("Smoke check failed after %sm, terminating run.", SMOKE_CHECK_TIME.getSeconds() / 60)); - systemManager.systemLifecycle(SystemMode.TERMINATE); + deviceManager.systemLifecycle(SystemMode.TERMINATE); } } @@ -600,7 +571,7 @@ private void checkSmokyFailure() { * For testing, if configured, send a slate of bad messages for testing by the message handling * infrastructure. Uses the sekrit REPLACE_MESSAGE_WITH field to sneak bad output into the pipe. * E.g., Will send a message with "{ INVALID JSON!" as a message payload. Inserts a delay before - * each message sent to stabelize the output order for testing purposes. + * each message sent to stabilize the output order for testing purposes. */ private void sendEmptyMissingBadEvents() { int phase = deviceUpdateCount % MESSAGE_REPORT_INTERVAL; @@ -634,7 +605,7 @@ private void sendEmptyMissingBadEvents() { } private void deferredConfigActions() { - systemManager.maybeRestartSystem(); + deviceManager.maybeRestartSystem(); // Do redirect after restart system check, since this might take a long time. maybeRedirectEndpoint(); @@ -654,28 +625,6 @@ private void captureExceptions(String action, Runnable runnable) { } } - void terminate() { - warn("Terminating"); - if (deviceState.system != null && deviceState.system.operation != null) { - deviceState.system.operation.mode = SystemMode.SHUTDOWN; - } - captureExceptions("publishing shutdown state", this::publishSynchronousState); - stop(); - captureExceptions("executor flush", this::stopExecutor); - } - - private void stopExecutor() { - try { - cancelPeriodicSend(); - executor.shutdown(); - if (!executor.awaitTermination(WAIT_TIME_SEC, TimeUnit.SECONDS)) { - throw new RuntimeException("Failed to shutdown scheduled tasks"); - } - } catch (Exception e) { - throw new RuntimeException("While stopping executor", e); - } - } - protected void startConnection(Function connectionDone) { try { this.connectionDone = connectionDone; @@ -686,7 +635,6 @@ protected void startConnection(Function connectionDone) { } throw new RuntimeException("Failed connection attempt after retries"); } catch (Exception e) { - stop(); throw new RuntimeException("While attempting to start connection", e); } } @@ -712,15 +660,24 @@ protected void initialize() { initializeDevice(); initializeMqtt(); } catch (Exception e) { - terminate(); + shutdown(); throw new RuntimeException("While initializing main pubber class", e); } } - private void stop() { + @Override + public void shutdown() { + warn("Initiating shutdown"); + new RuntimeException("Hello").printStackTrace(); + + if (deviceState.system != null && deviceState.system.operation != null) { + deviceState.system.operation.mode = SystemMode.SHUTDOWN; + } + + super.shutdown(); + captureExceptions("device manager shutdown", deviceManager::shutdown); + captureExceptions("publishing shutdown state", this::publishSynchronousState); captureExceptions("disconnecting mqtt", this::disconnectMqtt); - captureExceptions("closing log", systemManager::closeLogWriter); - captureExceptions("stopping periodic send", this::cancelPeriodicSend); } private void disconnectMqtt() { @@ -777,14 +734,14 @@ private void publisherException(Exception toReport) { toReport.getCause()); } else if (toReport instanceof ConnectionClosedException) { error("Connection closed, attempting reconnect..."); - stop(); while (retriesRemaining.getAndDecrement() > 0) { + error("TAP2"); if (attemptConnection()) { return; } } - terminate(); - systemManager.systemLifecycle(SystemMode.TERMINATE); + error("Connection retry failed, giving up."); + deviceManager.systemLifecycle(SystemMode.TERMINATE); } else { error("Unknown exception type " + toReport.getClass(), toReport); } @@ -795,13 +752,13 @@ private void publisherHandler(String type, String phase, Throwable cause) { error("Error receiving message " + type, cause); if (isTrue(configuration.options.barfConfig)) { error("Restarting system because of restart-on-error configuration setting"); - systemManager.systemLifecycle(SystemMode.RESTART); + deviceManager.systemLifecycle(SystemMode.RESTART); } } - String usePhase = isTrue(pubberOptions.badCategory) ? "apply" : phase; + String usePhase = isTrue(options.badCategory) ? "apply" : phase; String category = format(SYSTEM_CATEGORY_FORMAT, type, usePhase); Entry report = entryFromException(category, cause); - systemManager.localLog(report); + deviceManager.localLog(report); publishLogMessage(report); registerSystemStatus(report); } @@ -861,7 +818,7 @@ private void configHandler(Config config) { info("Config handler"); File configOut = new File(outDir, traceTimestamp("config") + ".json"); toJsonFile(configOut, config); - debug(format("Config update%s", systemManager.getTestingTag()), toJsonString(config)); + debug(format("Config update%s", deviceManager.getTestingTag()), toJsonString(config)); processConfigUpdate(config); configLatch.countDown(); publisherConfigLog("apply", null); @@ -874,7 +831,7 @@ private void configHandler(Config config) { private void processConfigUpdate(Config config) { try { // Grab this to make state-after-config updates monolithic. - stateLock.acquire(); + stateLock.lock(); } catch (Exception e) { throw new RuntimeException("While acquiting state lock", e); } @@ -883,20 +840,19 @@ private void processConfigUpdate(Config config) { if (config != null) { if (config.system == null && isTrue(configuration.options.barfConfig)) { error("Empty config system block and configured to restart on bad config!"); - systemManager.systemLifecycle(SystemMode.RESTART); + deviceManager.systemLifecycle(SystemMode.RESTART); } GeneralUtils.copyFields(config, deviceConfig, true); info(format("%s received config %s", getTimestamp(), isoConvert(config.timestamp))); - pointsetManager.updateConfig(config.pointset); - systemManager.updateConfig(config.system, config.timestamp); + deviceManager.updateConfig(config); updateDiscoveryConfig(config.discovery); extractEndpointBlobConfig(); } else { info(getTimestamp() + " defaulting empty config"); } - maybeRestartExecutor(DEFAULT_REPORT_SEC); + updateInterval(DEFAULT_REPORT_SEC); } finally { - stateLock.release(); + stateLock.unlock(); } } @@ -1019,7 +975,6 @@ private void resetConnection(String targetEndpoint) { retriesRemaining.set(CONNECT_RETRIES); startConnection(connectionDone); } catch (Exception e) { - stop(); throw new RuntimeException("While resetting connection", e); } } @@ -1178,16 +1133,6 @@ private FamilyDiscoveryState getFamilyDiscoveryState(String family) { family, key -> new FamilyDiscoveryState()); } - private long scheduleFuture(Date futureTime, Runnable futureTask) { - if (executor.isShutdown() || executor.isTerminated()) { - throw new RuntimeException("Executor shutdown/terminated, not scheduling"); - } - long delay = futureTime.getTime() - getNow().getTime(); - debug(format("Scheduling future in %dms", delay)); - executor.schedule(futureTask, delay, TimeUnit.MILLISECONDS); - return delay; - } - private void checkDiscoveryScan(String family, Date scanGeneration) { try { FamilyDiscoveryState familyDiscoveryState = getFamilyDiscoveryState(family); @@ -1329,11 +1274,11 @@ private byte[] getFileBytes(String dataFile) { } private void publishLogMessage(Entry logEntry) { - systemManager.publishLogMessage(logEntry); + deviceManager.publishLogMessage(logEntry); } private void publishAsynchronousState() { - if (stateLock.tryAcquire()) { + if (stateLock.tryLock()) { try { long soonestAllowedStateUpdate = lastStateTimeMs + STATE_THROTTLE_MS; long delay = soonestAllowedStateUpdate - System.currentTimeMillis(); @@ -1344,7 +1289,7 @@ private void publishAsynchronousState() { publishStateMessage(); } } finally { - stateLock.release(); + stateLock.unlock(); } } else { markStateDirty(-1); @@ -1353,12 +1298,12 @@ private void publishAsynchronousState() { void publishSynchronousState() { try { - stateLock.acquire(); + stateLock.lock(); publishStateMessage(); } catch (Exception e) { throw new RuntimeException("While sending synchronous state", e); } finally { - stateLock.release(); + stateLock.unlock(); } } @@ -1394,7 +1339,7 @@ private void publishStateMessage(Object stateToSend) { CountDownLatch latch = new CountDownLatch(1); try { - debug(format("State update%s", systemManager.getTestingTag()), + debug(format("State update%s", deviceManager.getTestingTag()), toJsonString(stateToSend)); } catch (Exception e) { throw new RuntimeException("While converting new device state", e); @@ -1438,7 +1383,7 @@ private void publishDeviceMessage(Object message, Runnable callback) { return; } - augmentDeviceMessage(message, getNow()); + augmentDeviceMessage(message, getNow(), isTrue(options.badVersion)); Object downgraded = downgradeMessage(message); deviceTarget.publish(topicSuffix, downgraded, callback); String messageBase = topicSuffix.replace("/", "_"); @@ -1464,11 +1409,11 @@ private String traceTimestamp(String messageBase) { } private void cloudLog(String message, Level level) { - systemManager.cloudLog(message, level, null); + deviceManager.cloudLog(message, level, null); } private void cloudLog(String message, Level level, String detail) { - systemManager.cloudLog(message, level, detail); + deviceManager.cloudLog(message, level, detail); } private void trace(String message) { @@ -1493,11 +1438,13 @@ private void notice(String message) { cloudLog(message, Level.NOTICE); } + @Override public void warn(String message) { cloudLog(message, Level.WARNING); } - private void error(String message) { + @Override + public void error(String message) { cloudLog(message, Level.ERROR); } @@ -1509,7 +1456,7 @@ public void error(String message, Throwable e) { } String longMessage = message + ": " + e.getMessage(); cloudLog(longMessage, Level.ERROR); - systemManager.localLog(message, Level.TRACE, getTimestamp(), stackTraceString(e)); + deviceManager.localLog(message, Level.TRACE, getTimestamp(), stackTraceString(e)); } static class ExtraPointsetEvent extends PointsetEvent { diff --git a/pubber/src/main/java/daq/pubber/SystemManager.java b/pubber/src/main/java/daq/pubber/SystemManager.java index 5c61f3440..f15e62b36 100644 --- a/pubber/src/main/java/daq/pubber/SystemManager.java +++ b/pubber/src/main/java/daq/pubber/SystemManager.java @@ -19,6 +19,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.Consumer; import org.slf4j.Logger; import udmi.schema.DevicePersistent; @@ -105,7 +106,9 @@ public SystemManager(ManagerHost host, PubberOptions options, String serialNo) { updateState(); } - void closeLogWriter() { + @Override + public void shutdown() { + super.shutdown(); if (logPrintWriter != null) { logPrintWriter.close(); logPrintWriter = null; @@ -147,6 +150,7 @@ void maybeRestartSystem() { if (SystemMode.ACTIVE.equals(stateMode) && SystemMode.RESTART.equals(configMode)) { + error("System mode requesting device restart"); systemLifecycle(SystemMode.RESTART); } @@ -158,12 +162,12 @@ void maybeRestartSystem() { Date configLastStart = operation.last_start; if (configLastStart != null) { if (DEVICE_START_TIME.before(configLastStart)) { - warn(format("Device start time %s before last config start %s, terminating.", + error(format("Device start time %s before last config start %s, terminating.", getTimestamp(DEVICE_START_TIME), getTimestamp(configLastStart))); systemLifecycle(SystemMode.TERMINATE); } else if (isTrue(options.smokeCheck) && CleanDateFormat.dateEquals(DEVICE_START_TIME, configLastStart)) { - warn(format("Device start time %s matches, smoke check indicating success!", + error(format("Device start time %s matches, smoke check indicating success!", getTimestamp(configLastStart))); systemLifecycle(SystemMode.SHUTDOWN); } @@ -204,7 +208,7 @@ void systemLifecycle(SystemMode mode) { System.exit(exitCode); } - public void setSystemMetadata(Metadata metadata) { + public void setMetadata(Metadata metadata) { setHardwareSoftware(metadata); } @@ -217,6 +221,7 @@ void updateConfig(SystemConfig system, Date timestamp) { systemState.last_config = timestamp; updateInterval(ifNotNullGet(system, config -> config.metrics_rate_sec)); updateState(); + maybeRestartSystem(); } void publishLogMessage(Entry report) { @@ -231,7 +236,7 @@ private boolean shouldLogLevel(int level) { } Integer minLoglevel = ifNotNullGet(systemConfig, config -> systemConfig.min_loglevel); - return level >= ofNullable(minLoglevel).orElse(Level.INFO.value()); + return level >= Objects.requireNonNullElse(minLoglevel, Level.INFO.value()); } void cloudLog(String message, Level level, String detail) { diff --git a/pubber/src/test/java/daq/pubber/PubberTest.java b/pubber/src/test/java/daq/pubber/PubberTest.java index 07cd7b293..ee7bd49e5 100644 --- a/pubber/src/test/java/daq/pubber/PubberTest.java +++ b/pubber/src/test/java/daq/pubber/PubberTest.java @@ -143,7 +143,7 @@ private EndpointConfiguration configurePubberRedirect() { @After public void terminatePubber() { if (pubber != null) { - pubber.terminate(); + pubber.shutdown(); pubber = null; } } @@ -211,12 +211,12 @@ public void augmentDeviceMessageTest() { State testMessage = new State(); assertNull(testMessage.timestamp); - Pubber.augmentDeviceMessage(testMessage, new Date()); + Pubber.augmentDeviceMessage(testMessage, new Date(), false); assertEquals(testMessage.version, Pubber.UDMI_VERSION); assertNotEquals(testMessage.timestamp, null); testMessage.timestamp = new Date(1241); - Pubber.augmentDeviceMessage(testMessage, new Date()); + Pubber.augmentDeviceMessage(testMessage, new Date(), false); assertEquals(testMessage.version, Pubber.UDMI_VERSION); assertNotEquals(testMessage.timestamp, new Date(1241)); } @@ -235,7 +235,7 @@ public void initializePersistentStoreNullTest() { // Prepare test. testPersistentData.endpoint = null; - pubber.configuration.endpoint = null; + Pubber.configuration.endpoint = null; // Now test. testFeatures.put(PubberUnderTestFeatures.noInitializePersistentStore, false); @@ -256,7 +256,7 @@ public void initializePersistentStoreFromConfigTest() { // Prepare test. testPersistentData.endpoint = null; - pubber.configuration.endpoint = getEndpointConfiguration("from_config"); + Pubber.configuration.endpoint = getEndpointConfiguration("from_config"); // Now test. testFeatures.put(PubberUnderTestFeatures.noInitializePersistentStore, false); @@ -276,12 +276,12 @@ public void initializePersistentStoreFromPersistentDataTest() { // Prepare test. testPersistentData.endpoint = getEndpointConfiguration("persistent"); - pubber.configuration.endpoint = null; + Pubber.configuration.endpoint = null; // Now test. testFeatures.put(PubberUnderTestFeatures.noInitializePersistentStore, false); pubber.initializePersistentStore(); assertEquals(pubber.persistentData.endpoint.hostname, "persistent"); - assertEquals(pubber.configuration.endpoint.hostname, "persistent"); + assertEquals(Pubber.configuration.endpoint.hostname, "persistent"); } } diff --git a/validator/src/main/java/com/google/daq/mqtt/validator/Reflector.java b/validator/src/main/java/com/google/daq/mqtt/validator/Reflector.java index ec3249075..96202bc1a 100644 --- a/validator/src/main/java/com/google/daq/mqtt/validator/Reflector.java +++ b/validator/src/main/java/com/google/daq/mqtt/validator/Reflector.java @@ -50,8 +50,11 @@ public Reflector(List argsList) { public static void main(String[] args) { Reflector reflector = new Reflector(Arrays.asList(args)); reflector.initialize(); - reflector.reflect(); - reflector.shutdown(); + try { + reflector.reflect(); + } finally { + reflector.shutdown(); + } } private void shutdown() {