Skip to content

Commit

Permalink
Various fixes to pubber and sequencer (#519)
Browse files Browse the repository at this point in the history
This PR resolves a few things discovered while writing endpoint tests
  • Loading branch information
johnrandolph authored Dec 5, 2022
1 parent d6efa05 commit 1b5a350
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 60 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ jobs:
redirect:
name: Endpoint Redirection
runs-on: ubuntu-latest
needs: udmi # Access to UDMI-REFLECTOR is mutually exclusive
timeout-minutes: 10
steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -97,8 +98,7 @@ jobs:
udmi:
name: Integration Tests
runs-on: ubuntu-latest
needs: redirect # Access to UDMI-REFLECTOR is mutually exclusive
timeout-minutes: 30
timeout-minutes: 40
steps:
- uses: actions/checkout@v2
- uses: actions/setup-java@v1
Expand Down
13 changes: 6 additions & 7 deletions docs/specs/sequences/generated.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ Check that the device correctly handles a broken (non-json) config message.
1. Update config before no interesting status:
* Set `system.min_loglevel` = `100`
1. Wait for no interesting status
1. Wait for clean config/state synced
1. Wait for state synchronized
1. Check that initial stable_config matches last_config
1. Wait for log category `system.config.receive` level `DEBUG`
Expand All @@ -75,15 +74,15 @@ Check that the device MQTT-acknowledges a sent config.
Push endpoint config message to device that results in a connection error.

1. Update config before blobset entry config status is error:
* Add `blobset` = { "blobs": { "_iot_endpoint_config": { "phase": `final`, "content_type": `application/json`, "base64": _endpoint_base64_payload_ } } }
* Add `blobset` = { "blobs": { "_iot_endpoint_config": { "phase": `final`, "content_type": `application/json`, "base64": `endpoint_base64_payload` } } }
1. Wait for blobset entry config status is error

## endpoint_config_connection_success_reconnect

Push endpoint config message to device that results in successful reconnect to the same endpoint.

1. Update config before blobset entry config status is success:
* Add `blobset` = { "blobs": { "_iot_endpoint_config": { "phase": `final`, "content_type": `application/json`, "base64": _endpoint_base64_payload_, "nonce": _endpoint_nonce_ } } }
* Add `blobset` = { "blobs": { "_iot_endpoint_config": { "phase": `final`, "content_type": `application/json`, "base64": `endpoint_base64_payload`, "nonce": `endpoint_nonce` } } }
1. Wait for blobset entry config status is success

## extra_config
Expand Down Expand Up @@ -114,14 +113,14 @@ Check that the device correctly handles an extra out-of-schema field
* Add `discovery` = { "families": { } }
1. Wait for all scans not active
1. Update config before scan iterations:
* Add `discovery.families.virtual` = { "generation": _family generation_, "scan_interval_sec": `10`, "enumerate": `true` }
* Add `discovery.families.virtual` = { "generation": `family generation`, "scan_interval_sec": `10`, "enumerate": `true` }
1. Wait for scan iterations

## self_enumeration

1. Wait for enumeration not active
1. Update config before enumeration generation:
* Add `discovery` = { "enumeration": { "generation": _generation start time_ } }
* Add `discovery` = { "enumeration": { "generation": `generation start time` } }
1. Wait for enumeration generation
1. Wait for enumeration still not active

Expand All @@ -131,7 +130,7 @@ Check that the device correctly handles an extra out-of-schema field
* Add `discovery` = { "families": { } }
1. Wait for all scans not active
1. Update config before scheduled scan start:
* Add `discovery.families.virtual` = { "generation": _family generation_, "enumerate": `true` }
* Add `discovery.families.virtual` = { "generation": `family generation`, "enumerate": `true` }
1. Wait for scheduled scan start
1. Wait for scan activation
1. Wait for scan completed
Expand All @@ -155,7 +154,7 @@ Check that the min log-level config is honored by the device.

## valid_serial_no

1. Check that received serial no matches
1. Wait for received serial no matches

## writeback_failure_state

Expand Down
23 changes: 15 additions & 8 deletions pubber/src/main/java/daq/pubber/Pubber.java
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ private void initializeDevice() {
deviceState.system.hardware = null;
}

markStateDirty(0);
markStateDirty();
}

private void initializePersistentStore() {
Expand All @@ -423,13 +423,10 @@ private File getPersistentStore() {
new File(siteModel.getDeviceWorkingDir(deviceId), PERSISTENT_STORE_FILE);
}

private void publishDirtyState() {
if (stateDirty.get()) {
System.err.println("Publishing dirty state block");
markStateDirty(0);
}
private void markStateDirty() {
markStateDirty(0);
}

private void markStateDirty(long delayMs) {
stateDirty.set(true);
if (delayMs >= 0) {
Expand All @@ -441,6 +438,13 @@ private void markStateDirty(long delayMs) {
}
}

private void publishDirtyState() {
if (stateDirty.get()) {
debug("Publishing dirty state block");
markStateDirty(0);
}
}

private void pullDeviceMessage() {
while (true) {
try {
Expand Down Expand Up @@ -579,12 +583,15 @@ private void maybeRestartSystem() {
if (systemConfig == null) {
return;
}
debug("maybeRestartSystem " + deviceState.system.mode + " " + systemConfig.mode
+ " " + persistentData.restart_count);
if (SystemMode.ACTIVE.equals(deviceState.system.mode)
&& SystemMode.RESTART.equals(systemConfig.mode)) {
restartSystem(true);
}
if (SystemMode.ACTIVE.equals(systemConfig.mode)) {
deviceState.system.mode = SystemMode.ACTIVE;
markStateDirty();
}
if (systemConfig.last_start != null && DEVICE_START_TIME.before(systemConfig.last_start)) {
System.err.printf("Device start time %s before last config start %s, terminating.",
Expand Down Expand Up @@ -906,7 +913,7 @@ private void removeBlobsetBlobState(SystemBlobsets blobId) {
if (deviceState.blobset.blobs.isEmpty()) {
deviceState.blobset = null;
}
markStateDirty(0);
markStateDirty();
}

private void maybeRedirectEndpoint() {
Expand Down
2 changes: 1 addition & 1 deletion validator/sequences/periodic_scan/sequence.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
* Add `discovery` = { "families": { } }
1. Wait for all scans not active
1. Update config before scan iterations:
* Add `discovery.families.virtual` = { "generation": _family generation_, "scan_interval_sec": `10`, "enumerate": `true` }
* Add `discovery.families.virtual` = { "generation": `family generation`, "scan_interval_sec": `10`, "enumerate": `true` }
1. Wait for scan iterations
2 changes: 1 addition & 1 deletion validator/sequences/self_enumeration/sequence.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@

1. Wait for enumeration not active
1. Update config before enumeration generation:
* Add `discovery` = { "enumeration": { "generation": _generation start time_ } }
* Add `discovery` = { "enumeration": { "generation": `generation start time` } }
1. Wait for enumeration generation
1. Wait for enumeration still not active
2 changes: 1 addition & 1 deletion validator/sequences/valid_serial_no/sequence.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@

## valid_serial_no

1. Check that received serial no matches
1. Wait for received serial no matches
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ public abstract class SequenceBase {
public static final String SEQUENCER_CATEGORY = "sequencer";
public static final String EVENT_PREFIX = "event_";
public static final String SYSTEM_EVENT_MESSAGE_BASE = "event_system";
public static final int CONFIG_UPDATE_DELAY_MS = 2000;
public static final int NORM_TIMEOUT_MS = 120 * 1000;
public static final int CONFIG_UPDATE_DELAY_MS = 8 * 1000;
public static final int NORM_TIMEOUT_MS = 180 * 1000;
private static final String EMPTY_MESSAGE = "{}";
private static final String CLOUD_IOT_CONFIG_FILE = "cloud_iot_config.json";
private static final String RESULT_LOG_FILE = "RESULT.log";
Expand Down Expand Up @@ -401,7 +401,7 @@ private Config readGeneratedConfig() {
public void setUp() {
// Old messages can sometimes take a while to clear out, so need some delay for stability.
// TODO: Minimize time, or better yet find deterministic way to flush messages.
safeSleep(CLEAN_START_DELAY_MS);
safeSleep(CONFIG_UPDATE_DELAY_MS);

deviceState = new State();
configAcked = false;
Expand All @@ -417,7 +417,7 @@ public void setUp() {
clearLogs();
queryState();

syncConfig();
updateConfig();

untilTrue("device state update", () -> deviceState != null);
recordSequence = true;
Expand All @@ -426,37 +426,32 @@ public void setUp() {
protected void resetConfig() {
recordSequence("Force reset config");
withRecordSequence(false, () -> {
recordSequence = false;
debug("Starting reset_config");
resetDeviceConfig(true);
extraField = "reset_config";
deviceConfig.system.testing.sequence_name = extraField;
waitForConfigSync();
updateConfig();
clearLogs();
extraField = null;
resetDeviceConfig();
waitForConfigSync();
updateConfig();
debug("Done with reset_config");
});
}

private void waitForConfigSync() {
untilTrue("device config sync", this::configUpdateComplete);
}

private Date syncConfig() {
updateConfig();
waitForConfigSync();
debug("config synced to " + JsonUtil.getTimestamp(deviceConfig.timestamp));
return CleanDateFormat.cleanDate(deviceConfig.timestamp);
// TODO: Cleanup delay, which is a workaround for cloud-based race-conditions.
info("waiting for config sync...");
safeSleep(CONFIG_UPDATE_DELAY_MS);
messageEvaluateLoop(this::configNotReady);
}

@Test
public void valid_serial_no() {
if (serialNo == null) {
throw new SkipTest("No test serial number provided");
}
checkThat("received serial no matches", () -> serialNo.equals(lastSerialNo));
untilTrue("received serial no matches", () -> serialNo.equals(lastSerialNo));
}

private void recordResult(String result, String methodName, String message) {
Expand Down Expand Up @@ -626,10 +621,12 @@ protected void updateConfig(String reason) {
updateConfig(SubFolder.LOCALNET, deviceConfig.localnet);
updateConfig(SubFolder.BLOBSET, deviceConfig.blobset);
updateConfig(SubFolder.DISCOVERY, deviceConfig.discovery);
localConfigChange(reason);
if (localConfigChange(reason)) {
waitForConfigSync();
}
}

private void updateConfig(SubFolder subBlock, Object data) {
private boolean updateConfig(SubFolder subBlock, Object data) {
try {
String messageData = stringify(data);
String sentBlockConfig = sentConfig.computeIfAbsent(subBlock, key -> "null");
Expand All @@ -642,9 +639,8 @@ private void updateConfig(SubFolder subBlock, Object data) {
debug(String.format("update %s_%s", "config", subBlock));
recordRawMessage(tracedObject, LOCAL_PREFIX + subBlock.value());
sentConfig.put(subBlock, messageData);
// Delay so the backend can process the update before others arrive.
Thread.sleep(CONFIG_UPDATE_DELAY_MS);
}
return updated;
} catch (Exception e) {
throw new RuntimeException("While updating config block " + subBlock, e);
}
Expand All @@ -664,19 +660,20 @@ private Object augmentConfigTrace(Object data) {
}
}

private void localConfigChange(String reason) {
private boolean localConfigChange(String reason) {
try {
String suffix = reason == null ? "" : (" " + reason);
String header = String.format("Update config%s:", suffix);
debug(header + " " + getTimestamp(deviceConfig.timestamp));
recordRawMessage(deviceConfig, LOCAL_CONFIG_UPDATE);
List<String> configUpdates = configDiffEngine.computeChanges(deviceConfig);
if (configUpdates.isEmpty()) {
return;
return false;
}
recordSequence(header);
configUpdates.forEach(this::recordBullet);
sequenceMd.flush();
return true;
} catch (Exception e) {
throw new RuntimeException("While recording device config", e);
}
Expand Down Expand Up @@ -785,6 +782,12 @@ private void untilLoop(Supplier<Boolean> evaluator, String description) {
info(String.format("start %s after %s", waitingCondition, timeSinceStart()));
updateConfig("before " + description);
recordSequence("Wait for " + description);
messageEvaluateLoop(evaluator);
info(String.format("finished %s after %s", waitingCondition, timeSinceStart()));
waitingCondition = "nothing";
}

private void messageEvaluateLoop(Supplier<Boolean> evaluator) {
while (evaluator.get()) {
processMessage();
}
Expand Down Expand Up @@ -959,17 +962,20 @@ private void handleEventMessage(SubFolder subFolder, Map<String, Object> message
}
}

protected boolean configUpdateComplete() {
private boolean configNotReady() {
Object receivedConfig = receivedUpdates.get("config");
if (!(receivedConfig instanceof Config)) {
return false;
trace("no valid received config");
return true;
}
List<String> differences = configDiffEngine.diff(deviceConfig,
sanitizeConfig((Config) receivedConfig));
if (traceLogLevel() && !differences.isEmpty()) {
List<String> differences = configDiffEngine.diff(
sanitizeConfig((Config) receivedConfig), deviceConfig);
boolean configNotReady = !differences.isEmpty();
trace("testing valid received config " + configNotReady);
if (traceLogLevel() && configNotReady) {
trace("\n+- " + Joiner.on("\n+- ").join(differences));
}
return differences.isEmpty();
return configNotReady;
}

protected void trace(String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,28 @@ static String getDescription(Object target) {
*/
String getDescription();

/**
* Convert the potentially semantic object to just the value component.
*
* @param target input string to actualize
* @return actual string
*/
static Object getValue(Object target) {
return target instanceof String ? stringValue((String) target) : target;
}

/**
* Convert the potentially semantic string to just the value of the string.
*
* @param target target string to convert
* @return value part of the string
*/
static String stringValue(String target) {
return isSemanticString(target)
? target.substring(target.indexOf(AFTER_MARKER) + AFTER_MARKER.length())
: target;
}

/**
* Remove the description tag from any embedded strings, returning the actual string to be used
* (w/o semantic descriptions).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void device_config_acked() {
public void broken_config() {
deviceConfig.system.min_loglevel = Level.DEBUG.value();
untilFalse("no interesting status", this::hasInterestingStatus);
untilTrue("clean config/state synced", this::configUpdateComplete);
updateConfig();
Date stableConfig = deviceConfig.timestamp;
info("initial stable_config " + getTimestamp(stableConfig));
untilTrue("state synchronized", () -> dateEquals(stableConfig, deviceState.system.last_config));
Expand Down
Loading

0 comments on commit 1b5a350

Please sign in to comment.