Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase test stability and cleanup some logging/debug #478

Merged
merged 15 commits into from
Oct 11, 2022
30 changes: 20 additions & 10 deletions dashboard/functions/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ function process_state_update(attributes, msgObject) {

const commandFolder = `devices/${deviceId}/${STATE_TYPE}/${UPDATE_FOLDER}`;
promises.push(sendCommand(REFLECT_REGISTRY, registryId, commandFolder, msgObject));

attributes.subFolder = UPDATE_FOLDER;
attributes.subType = STATE_TYPE;
promises.push(publishPubsubMessage('udmi_target', attributes, msgObject));
Expand Down Expand Up @@ -330,18 +330,28 @@ exports.udmi_config = functions.pubsub.topic('udmi_config').onPublish((event) =>
return Promise.all(promises);
});

function parse_old_config(oldConfig, resetConfig) {
if (!oldConfig || resetConfig) {
console.warn('Resetting config bock, explicit=' + resetConfig);
return {};
}

function parse_old_config(configStr, resetConfig) {
let config = {};
try {
return JSON.parse(oldConfig);
config = JSON.parse(configStr || "{}");
} catch(e) {
console.warn('Previous config parse error, ignoring update');
return null;
if (!resetConfig) {
console.warn('Previous config parse error without reset, ignoring update');
return null;
}
config = {};
}

if (resetConfig) {
const configLastStart = config.system && config.system.last_start;
console.warn('Resetting config bock', configLastStart);
config = {
system: {
last_start: configLastStart
}
}
}
return config;
}

function update_last_start(config, stateStart) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static com.google.daq.mqtt.registrar.Registrar.GENERATED_CONFIG_JSON;
import static com.google.daq.mqtt.registrar.Registrar.METADATA_JSON;
import static com.google.daq.mqtt.registrar.Registrar.NORMALIZED_JSON;
import static com.google.daq.mqtt.util.Common.VERSION_PROPERTY_KEY;

import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.core.JsonGenerator;
Expand Down Expand Up @@ -309,7 +310,7 @@ private Metadata readMetadataWithValidation(boolean validate) {
final JsonNode instance;
try (InputStream targetStream = new FileInputStream(metadataFile)) {
instance = OBJECT_MAPPER.readTree(targetStream);
baseVersion = instance.get("version");
baseVersion = instance.get(VERSION_PROPERTY_KEY);
new MessageUpgrader("metadata", instance).upgrade();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be MessageUpgrader.METADATA_SCHEMA

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

} catch (IOException ioException) {
exceptionMap.put(EXCEPTION_LOADING, ioException);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.google.daq.mqtt.registrar;

import static com.google.daq.mqtt.util.Common.VERSION_PROPERTY_KEY;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

Expand Down Expand Up @@ -49,7 +51,7 @@ public void downgrade(JsonNode versionNode) {

downgradeLocalnet();

message.set("version", versionNode);
message.set(VERSION_PROPERTY_KEY, versionNode);
}

private String convertVersion(JsonNode versionNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.daq.mqtt.sequencer.semantic.SemanticValue.actualize;
import static com.google.daq.mqtt.util.JsonUtil.getTimestamp;
import static com.google.daq.mqtt.util.JsonUtil.safeSleep;
import static com.google.daq.mqtt.util.JsonUtil.stringify;
import static java.util.Optional.ofNullable;

Expand Down Expand Up @@ -38,7 +39,6 @@
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -81,7 +81,6 @@ public abstract class SequenceBase {
public static final String SYSTEM_EVENT_MESSAGE_BASE = "event_system";
public static final int CONFIG_UPDATE_DELAY_MS = 2000;
public static final int NORM_TIMEOUT_MS = 120 * 1000;
public static final String LOCAL_CONFIG = "local_config";
private static final String EMPTY_MESSAGE = "{}";
private static final String CLOUD_IOT_CONFIG_FILE = "cloud_iot_config.json";
private static final String RESULT_LOG_FILE = "RESULT.log";
Expand All @@ -103,10 +102,12 @@ public abstract class SequenceBase {
private static final Map<String, AtomicInteger> UPDATE_COUNTS = new HashMap<>();
private static final long LOG_CLEAR_TIME_MS = 1000;
private static final String LOCAL_PREFIX = "local_";
private static final String LOCAL_CONFIG_UPDATE = LOCAL_PREFIX + "update";
private static final String SEQUENCER_LOG = "sequencer.log";
private static final String SYSTEM_LOG = "system.log";
private static final String SEQUENCE_MD = "sequence.md";
private static final String CONFIG_NONCE_KEY = "debug_config_nonce";
private static final long CLEAN_START_DELAY_MS = 20 * 1000;
protected static Metadata deviceMetadata;
protected static String projectId;
protected static String deviceId;
Expand Down Expand Up @@ -392,6 +393,9 @@ private Config readGeneratedConfig() {
*/
@Before
public void setUp() {
// Old messages can sometimes take a while to clear out, so need some delay for stability.
safeSleep(CLEAN_START_DELAY_MS);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a TODO to minimize this time or change the strategy?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


deviceState = new State();
configAcked = false;
receivedState.clear();
Expand Down Expand Up @@ -502,7 +506,7 @@ private void recordRawMessage(Map<String, Object> message, String messageBase) {
try {
JsonUtil.OBJECT_MAPPER.writeValue(messageFile, message);
boolean traceMessage =
traceLogLevel() || (debugLogLevel() && messageBase.equals(LOCAL_CONFIG));
traceLogLevel() || (debugLogLevel() && messageBase.equals(LOCAL_CONFIG_UPDATE));
String postfix =
traceMessage ? (message == null ? ": (null)" : ":\n" + stringify(message)) : "";
if (messageBase.equals(SYSTEM_EVENT_MESSAGE_BASE)) {
Expand Down Expand Up @@ -629,17 +633,12 @@ private void updateConfig(SubFolder subBlock, Object data) {
@SuppressWarnings("unchecked")
private Object augmentConfigTrace(Object data) {
try {
if (data == null) {
return null;
}
if (traceLogLevel()) {
String messageData = stringify(data);
Map<String, Long> map = JsonUtil.OBJECT_MAPPER.readValue(messageData, Map.class);
map.put(CONFIG_NONCE_KEY, System.currentTimeMillis());
return map;
} else {
if (data == null || !traceLogLevel()) {
return data;
}
Map<String, Object> map = JsonUtil.convertTo(Map.class, data);
map.put(CONFIG_NONCE_KEY, System.currentTimeMillis());
return map;
} catch (Exception e) {
throw new RuntimeException("While augmenting data message", e);
}
Expand All @@ -650,7 +649,7 @@ private void localConfigChange(String reason) {
String suffix = reason == null ? "" : (" " + reason);
String header = String.format("Update config%s:", suffix);
debug(header + " " + getTimestamp(deviceConfig.timestamp));
recordRawMessage(deviceConfig, LOCAL_PREFIX + "config");
recordRawMessage(deviceConfig, LOCAL_CONFIG_UPDATE);
List<String> configUpdates = configDiffEngine.computeChanges(deviceConfig);
if (configUpdates.isEmpty()) {
return;
Expand All @@ -676,27 +675,6 @@ private AugmentedSystemConfig augmentConfig(SystemConfig system) {
}
}

private <T> boolean updateState(SubFolder subFolder, SubFolder expected, Class<T> target,
Map<String, Object> message, Consumer<T> handler) {
try {
if (!expected.equals(subFolder)) {
return false;
}
message.remove("timestamp");
message.remove("version");
String messageString = stringify(message);
boolean updated = !messageString.equals(receivedState.get(subFolder));
if (updated) {
debug(String.format("updating %s state", subFolder));
T state = JsonUtil.OBJECT_MAPPER.readValue(messageString, target);
handler.accept(state);
}
return updated;
} catch (Exception e) {
throw new RuntimeException("While converting state type " + subFolder, e);
}
}

protected boolean validSerialNo() {
String deviceSerial = deviceState == null ? null
: deviceState.system == null ? null : deviceState.system.serial_no;
Expand Down Expand Up @@ -745,7 +723,7 @@ protected void checkThat(String description, Supplier<Boolean> condition) {

protected List<Map<String, Object>> clearLogs() {
info("clearing system logs...");
JsonUtil.safeSleep(LOG_CLEAR_TIME_MS);
safeSleep(LOG_CLEAR_TIME_MS);
lastLog = null;
return receivedEvents.remove(SubFolder.SYSTEM);
}
Expand Down Expand Up @@ -905,7 +883,7 @@ private synchronized void handleReflectorMessage(String subFolderRaw,
}
Config config = (Config) converted;
updateDeviceConfig(config);
info("Updated config with timestamp " + JsonUtil.getTimestamp(config.timestamp));
debug("Updated config with timestamp " + JsonUtil.getTimestamp(config.timestamp));
debug(String.format("Updated config #%03d:\n%s", updateCount,
stringify(converted)));
} else if (converted instanceof AugmentedState) {
Expand All @@ -914,7 +892,7 @@ private synchronized void handleReflectorMessage(String subFolderRaw,
deviceState = (State) converted;
updateConfigAcked((AugmentedState) converted);
validSerialNo();
info("Updated state has last_config " + JsonUtil.getTimestamp(
debug("Updated state has last_config " + JsonUtil.getTimestamp(
deviceState.system.last_config));
} else {
error("Unknown update type " + converted.getClass().getSimpleName());
Expand Down Expand Up @@ -970,7 +948,7 @@ protected boolean configUpdateComplete() {
List<String> differences = configDiffEngine.diff(deviceConfig,
sanitizeConfig((Config) receivedConfig));
if (traceLogLevel() && !differences.isEmpty()) {
trace("+- " + Joiner.on("\n+- ").join(differences));
trace("\n+- " + Joiner.on("\n+- ").join(differences));
}
return differences.isEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@ public void self_enumeration() {
() -> deviceState.discovery.enumeration.generation.equals(startTime)
);
untilUntrue("enumeration still not active", () -> deviceState.discovery.enumeration.active);
List<DiscoveryEvent> events = getReceivedEvents(DiscoveryEvent.class);
assertTrue("a few events received", events.size() >= 1 && events.size() <= 2);
DiscoveryEvent discoveryEvent = events.get(0);
info("Received discovery generation " + JsonUtil.getTimestamp(discoveryEvent.generation));
assertEquals("matching event generation", startTime, discoveryEvent.generation);
int discoveredPoints = discoveryEvent.uniqs == null ? 0 : discoveryEvent.uniqs.size();
List<DiscoveryEvent> allEvents = getReceivedEvents(DiscoveryEvent.class);
// Filter for enumeration events, since there will sometimes be lingering scan events.
List<DiscoveryEvent> enumEvents = allEvents.stream().filter(event -> event.scan_id == null)
.collect(Collectors.toList());
assertEquals("a single discovery event received", enumEvents.size(), 1);
DiscoveryEvent event = enumEvents.get(0);
info("Received discovery generation " + JsonUtil.getTimestamp(event.generation));
assertEquals("matching event generation", startTime, event.generation);
int discoveredPoints = event.uniqs == null ? 0 : event.uniqs.size();
assertEquals("discovered points count", deviceMetadata.pointset.points.size(),
discoveredPoints);
}
Expand Down
3 changes: 2 additions & 1 deletion validator/src/main/java/com/google/daq/mqtt/util/Common.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
public abstract class Common {

public static final String STATE_QUERY_TOPIC = "query/state";
public static final String TIMESTAMP_ATTRIBUTE = "timestamp";
public static final String TIMESTAMP_PROPERTY_KEY = "timestamp";
public static final String VERSION_PROPERTY_KEY = "version";
public static final String NO_SITE = "--";
public static final String GCP_REFLECT_KEY_PKCS8 = "validator/rsa_private.pkcs8";
private static final String UDMI_VERSION_KEY = "UDMI_VERSION";
Expand Down
14 changes: 13 additions & 1 deletion validator/src/main/java/com/google/daq/mqtt/util/JsonUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.io.File;
import java.time.Instant;
import java.util.Date;
import java.util.Map;
import java.util.TreeMap;

/**
* Collection of utilities for working with json things.
Expand Down Expand Up @@ -87,6 +89,17 @@ public static <T> T convertTo(Class<T> targetClass, Object message) {
return message == null ? null : convertTo(targetClass, stringify(message));
}

/**
* Convert the pojo to a mapped representaiton.
*
* @param message input object to convert
* @return object-as-map
*/
@SuppressWarnings("unchecked")
public static Map<String, Object> toMap(Object message) {
return convertTo(TreeMap.class, message);
}

/**
* Convert an object to a json string.
*
Expand All @@ -107,7 +120,6 @@ public static String stringify(Object target) {
* @param clazz class of result
* @param file file to load
* @param <T> type of result
*
* @return loaded object
*/
public static <T> T loadFile(Class<T> clazz, File file) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.google.daq.mqtt.util;

import static com.google.daq.mqtt.util.Common.VERSION_PROPERTY_KEY;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand All @@ -10,9 +12,9 @@
public class MessageUpgrader {

public static final JsonNodeFactory NODE_FACTORY = JsonNodeFactory.instance;
private static final String TARGET_FORMAT = "%d.%d.%d";
public static final String STATE_SCHEMA = "state";
public static final String METADATA_SCHEMA = "metadata";
private static final String TARGET_FORMAT = "%d.%d.%d";
private final JsonNode message;
private final String schemaName;
private final int major;
Expand All @@ -29,7 +31,7 @@ public MessageUpgrader(String schemaName, JsonNode message) {
this.message = message;
this.schemaName = schemaName;

JsonNode version = message.get("version");
JsonNode version = message.get(VERSION_PROPERTY_KEY);
String verStr =
version != null ? version.isNumber() ? Integer.toString(version.asInt()) : version.asText()
: "1";
Expand Down Expand Up @@ -58,8 +60,9 @@ public void upgrade() {
if (patch < 14) {
upgrade_1_3_14();
}
if (message.has("version")) {
((ObjectNode) message).put("version", String.format(TARGET_FORMAT, major, minor, patch));
if (message.has(VERSION_PROPERTY_KEY)) {
((ObjectNode) message).put(VERSION_PROPERTY_KEY,
String.format(TARGET_FORMAT, major, minor, patch));
}
}

Expand Down Expand Up @@ -120,7 +123,7 @@ private void upgradeFirmware(ObjectNode system) {
if (system.has("software")) {
throw new IllegalStateException("Node already has software field");
}
JsonNode version = ((ObjectNode) firmware).remove("version");
JsonNode version = ((ObjectNode) firmware).remove(VERSION_PROPERTY_KEY);
if (version != null) {
ObjectNode softwareNode = new ObjectNode(NODE_FACTORY);
softwareNode.put("firmware", version.asText());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import static com.google.daq.mqtt.util.Common.GCP_REFLECT_KEY_PKCS8;
import static com.google.daq.mqtt.util.Common.NO_SITE;
import static com.google.daq.mqtt.util.Common.STATE_QUERY_TOPIC;
import static com.google.daq.mqtt.util.Common.TIMESTAMP_ATTRIBUTE;
import static com.google.daq.mqtt.util.Common.TIMESTAMP_PROPERTY_KEY;
import static com.google.daq.mqtt.util.Common.removeNextArg;
import static com.google.daq.mqtt.util.ConfigUtil.UDMI_VERSION;
import static com.google.daq.mqtt.util.JsonUtil.JSON_SUFFIX;
Expand Down Expand Up @@ -420,7 +420,7 @@ private void validateMessage(JsonSchema schema, Object message) {

private void sanitizeMessage(String schemaName, Map<String, Object> message) {
if (schemaName.startsWith(CONFIG_PREFIX) || schemaName.startsWith(STATE_PREFIX)) {
message.remove(TIMESTAMP_ATTRIBUTE);
message.remove(TIMESTAMP_PROPERTY_KEY);
}
}

Expand Down