Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Last seen timestamp validator fix #524

Merged
merged 5 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added add
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.daq.mqtt.sequencer.semantic.SemanticValue.actualize;
import static com.google.daq.mqtt.validator.Validator.EXCEPTION_KEY;
import static com.google.daq.mqtt.util.Common.EXCEPTION_KEY;
import static com.google.udmi.util.JsonUtil.getTimestamp;
import static com.google.udmi.util.JsonUtil.safeSleep;
import static com.google.udmi.util.JsonUtil.stringify;
Expand Down
3 changes: 3 additions & 0 deletions validator/src/main/java/com/google/daq/mqtt/util/Common.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ public abstract class Common {
public static final String STATE_QUERY_TOPIC = "query/state";
public static final String TIMESTAMP_PROPERTY_KEY = "timestamp";
public static final String VERSION_PROPERTY_KEY = "version";
public static final String SUBTYPE_PROPERTY_KEY = "subType";
public static final String SUBFOLDER_PROPERTY_KEY = "subFolder";
public static final String NO_SITE = "--";
public static final String GCP_REFLECT_KEY_PKCS8 = "validator/rsa_private.pkcs8";
public static final String EXCEPTION_KEY = "exception";
private static final String UDMI_VERSION_KEY = "UDMI_VERSION";

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.google.daq.mqtt.validator;

import static com.google.daq.mqtt.util.Common.SUBFOLDER_PROPERTY_KEY;
import static com.google.daq.mqtt.util.Common.SUBTYPE_PROPERTY_KEY;

import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.core.JsonParser.Feature;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -35,6 +38,7 @@ public class MessageReadingClient implements MessagePublisher {
.setSerializationInclusion(Include.NON_NULL);
private static final Pattern filenamePattern = Pattern.compile("[0-9]+_([a-z]+)_([a-z]+)\\.json");
private static final String TRACE_FILE_SUFFIX = ".json";
public static final String MSG_SOURCE = "msgSource";
private final File messageDir;
private final String registryId;
private final Map<String, List<String>> deviceMessageLists = new HashMap<>();
Expand Down Expand Up @@ -107,15 +111,16 @@ private Map<String, Object> getMessageObject(String deviceId, String msgName) {
private Map<String, String> makeAttributes(String deviceId, String msgName) {
try {
Map<String, String> attributes = new HashMap<>();
attributes.put(MSG_SOURCE, msgName);
attributes.put("deviceId", deviceId);
attributes.put("deviceNumId", getNumId(deviceId));
attributes.put("projectId", PLAYBACK_PROJECT_ID);
attributes.put("deviceRegistryId", registryId);

Matcher matcher = filenamePattern.matcher(msgName);
if (matcher.matches()) {
attributes.put("subType", matcher.group(1));
attributes.put("subFolder", matcher.group(2));
attributes.put(SUBTYPE_PROPERTY_KEY, matcher.group(1));
attributes.put(SUBFOLDER_PROPERTY_KEY, matcher.group(2));
} else {
throw new RuntimeException("Malformed filename " + msgName);
}
Expand Down Expand Up @@ -170,7 +175,8 @@ public Validator.MessageBundle takeNextMessage() {
lastValidTimestamp = deviceNextTimestamp.remove(deviceId);
deviceLastTimestamp.put(deviceId, lastValidTimestamp);
prepNextMessage(deviceId);
System.out.printf("Replay %s for %s%n", lastValidTimestamp, deviceId);
String messageName = attributes.get(MSG_SOURCE);
System.out.printf("Replay %s %s for %s%n", messageName, lastValidTimestamp, deviceId);
messageCount++;
MessageBundle bundle = new MessageBundle();
bundle.message = message;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.google.daq.mqtt.validator;

import static com.google.daq.mqtt.util.Common.SUBFOLDER_PROPERTY_KEY;
import static com.google.daq.mqtt.util.Common.SUBTYPE_PROPERTY_KEY;
import static org.junit.Assert.assertTrue;

import com.google.common.base.Joiner;
Expand Down Expand Up @@ -181,7 +183,6 @@ public boolean hasErrors() {
* @param attributes message attributes
*/
public void validateMessageType(Object message, Date timestamp, Map<String, String> attributes) {
lastSeen = (timestamp != null && timestamp.after(lastSeen)) ? timestamp : lastSeen;
if (reportingPointset == null) {
return;
}
Expand All @@ -207,6 +208,15 @@ public void validateMessageType(Object message, Date timestamp, Map<String, Stri
}
}

/**
* Update the last seen timestamp for this device.
*
* @param timestamp timestamp for last seen update
*/
public void updateLastSeen(Date timestamp) {
lastSeen = (timestamp != null && timestamp.after(lastSeen)) ? timestamp : lastSeen;
}

private Exception pointValidationError(String description, Set<String> points) {
return new ValidationException(
String.format("Device has %s: %s", description, Joiner.on(", ").join(points)));
Expand All @@ -228,8 +238,8 @@ private void addEntry(Entry entry) {
* @param category error category
*/
void addError(Exception error, Map<String, String> attributes, String category) {
String subFolder = attributes.get("subFolder");
String subType = attributes.get("subType");
String subFolder = attributes.get(SUBFOLDER_PROPERTY_KEY);
String subType = attributes.get(SUBTYPE_PROPERTY_KEY);
addError(error, category,
String.format("%s_%s: %s", subType, subFolder, getExceptionDetail(error)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import static com.google.daq.mqtt.util.Common.GCP_REFLECT_KEY_PKCS8;
import static com.google.daq.mqtt.util.Common.NO_SITE;
import static com.google.daq.mqtt.util.Common.STATE_QUERY_TOPIC;
import static com.google.daq.mqtt.util.Common.SUBFOLDER_PROPERTY_KEY;
import static com.google.daq.mqtt.util.Common.SUBTYPE_PROPERTY_KEY;
import static com.google.daq.mqtt.util.Common.TIMESTAMP_PROPERTY_KEY;
import static com.google.daq.mqtt.util.Common.VERSION_PROPERTY_KEY;
import static com.google.daq.mqtt.util.Common.removeNextArg;
Expand All @@ -28,6 +30,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.daq.mqtt.util.CloudIotManager;
import com.google.daq.mqtt.util.Common;
import com.google.daq.mqtt.util.ConfigUtil;
import com.google.daq.mqtt.util.ExceptionMap;
import com.google.daq.mqtt.util.ExceptionMap.ErrorTree;
Expand Down Expand Up @@ -57,6 +60,7 @@
import java.util.List;
import java.util.Map;
import java.util.MissingFormatArgumentException;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
Expand Down Expand Up @@ -110,13 +114,14 @@ public class Validator {
EVENT_POINTSET, PointsetEvent.class,
STATE_POINTSET, PointsetState.class
);
private static final Set<SubType> LAST_SEEN_SUBTYPES = ImmutableSet.of(SubType.EVENT,
SubType.STATE);
private static final long REPORT_INTERVAL_SEC = 15;
private static final String EXCLUDE_DEVICE_PREFIX = "_";
private static final String VALIDATION_REPORT_DEVICE = "_validator";
private static final String VALIDATION_EVENT_TOPIC = "validation/event";
private static final String VALIDATION_STATE_TOPIC = "validation/state";
private static final String POINTSET_SUBFOLDER = "pointset";
public static final String EXCEPTION_KEY = "exception";
private final Map<String, ReportingDevice> expectedDevices = new TreeMap<>();
private final Set<String> extraDevices = new TreeSet<>();
private final Set<String> processedDevices = new TreeSet<>();
Expand Down Expand Up @@ -449,7 +454,6 @@ private ReportingDevice validateUpdate(
Map<String, Object> message,
Map<String, String> attributes) {


String deviceId = attributes.get("deviceId");
ReportingDevice device = expectedDevices.computeIfAbsent(deviceId, ReportingDevice::new);
device.clearMessageEntries();
Expand All @@ -468,8 +472,8 @@ private ReportingDevice validateUpdate(
base64Devices.add(deviceId);
}

if (message.containsKey(EXCEPTION_KEY)) {
device.addError((Exception) message.get(EXCEPTION_KEY), attributes,
if (message.containsKey(Common.EXCEPTION_KEY)) {
device.addError((Exception) message.get(Common.EXCEPTION_KEY), attributes,
Category.VALIDATION_DEVICE_RECEIVE);
return device;
}
Expand All @@ -478,6 +482,13 @@ private ReportingDevice validateUpdate(
upgradeMessage(schemaName, message);
prepareDeviceOutDir(message, attributes, deviceId, schemaName);

String timeString = (String) message.get(TIMESTAMP_PROPERTY_KEY);
String subTypeRaw = Optional.ofNullable(attributes.get(SUBTYPE_PROPERTY_KEY))
.orElse(UNKNOWN_TYPE_DEFAULT);
if (timeString != null && LAST_SEEN_SUBTYPES.contains(SubType.fromValue(subTypeRaw))) {
device.updateLastSeen(Date.from(Instant.parse(timeString)));
}

try {
if (!schemaMap.containsKey(schemaName)) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -511,8 +522,7 @@ private ReportingDevice validateUpdate(
if (CONTENT_VALIDATORS.containsKey(schemaName)) {
Class<?> targetClass = CONTENT_VALIDATORS.get(schemaName);
Object messageObject = OBJECT_MAPPER.convertValue(message, targetClass);
Date timestamp = JsonUtil.getDate((String) message.get("timestamp"));
device.validateMessageType(messageObject, timestamp, attributes);
device.validateMessageType(messageObject, JsonUtil.getDate(timeString), attributes);
}
} catch (Exception e) {
System.err.println("Error validating contents: " + e.getMessage());
Expand All @@ -538,11 +548,10 @@ private void sendValidationResult(Map<String, String> origAttributes,
ValidationEvent event = new ValidationEvent();
event.version = UDMI_VERSION;
event.timestamp = new Date();
String subFolder = origAttributes.get("subFolder");
String subFolder = origAttributes.get(SUBFOLDER_PROPERTY_KEY);
event.sub_folder = subFolder;
event.sub_type = origAttributes.getOrDefault("subType", UNKNOWN_TYPE_DEFAULT);
event.sub_type = origAttributes.getOrDefault(SUBTYPE_PROPERTY_KEY, UNKNOWN_TYPE_DEFAULT);
event.status = ReportingDevice.getSummaryEntry(reportingDevice.getMessageEntries());

event.errors = reportingDevice.getErrors(now);
if (POINTSET_SUBFOLDER.equals(subFolder)) {
PointsetSummary pointsSummary = new PointsetSummary();
Expand Down Expand Up @@ -575,8 +584,8 @@ private void sendValidationMessage(String deviceId, Object message, String topic

private void writeMessageCapture(Map<String, Object> message, Map<String, String> attributes) {
String deviceId = attributes.get("deviceId");
String type = attributes.getOrDefault("subType", UNKNOWN_TYPE_DEFAULT);
String folder = attributes.get("subFolder");
String type = attributes.getOrDefault(SUBTYPE_PROPERTY_KEY, UNKNOWN_TYPE_DEFAULT);
String folder = attributes.get(SUBFOLDER_PROPERTY_KEY);
AtomicInteger messageIndex = deviceMessageIndex.computeIfAbsent(deviceId,
key -> new AtomicInteger());
int index = messageIndex.incrementAndGet();
Expand Down Expand Up @@ -612,8 +621,8 @@ private boolean shouldConsiderMessage(Map<String, String> attributes) {
return false;
}

String subType = attributes.get("subType");
String subFolder = attributes.get("subFolder");
String subType = attributes.get(SUBTYPE_PROPERTY_KEY);
String subFolder = attributes.get(SUBFOLDER_PROPERTY_KEY);
String category = attributes.get("category");
boolean isInteresting = subType == null
|| INTERESTING_TYPES.contains(subType)
Expand Down Expand Up @@ -644,8 +653,8 @@ private File makeDeviceDir(String deviceId) {
}

private String messageSchema(Map<String, String> attributes) {
String subFolder = attributes.get("subFolder");
String subType = attributes.get("subType");
String subFolder = attributes.get(SUBFOLDER_PROPERTY_KEY);
String subType = attributes.get(SUBTYPE_PROPERTY_KEY);

if (SubFolder.UPDATE.value().equals(subFolder)) {
return subType;
Expand Down Expand Up @@ -892,7 +901,7 @@ public static class ErrorContainer extends TreeMap<String, Object> {
* @param timestamp timestamp of generating message
*/
public ErrorContainer(Exception exception, String message, String timestamp) {
put(EXCEPTION_KEY, exception);
put(Common.EXCEPTION_KEY, exception);
put("message", message);
put("timestamp", timestamp);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
package com.google.daq.mqtt.validator;

import static com.google.daq.mqtt.util.Common.TIMESTAMP_PROPERTY_KEY;
import static com.google.udmi.util.JsonUtil.getTimestamp;
import static com.google.udmi.util.JsonUtil.safeSleep;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.api.client.json.Json;
import com.google.common.collect.ImmutableList;
import com.google.daq.mqtt.TestCommon;
import com.google.daq.mqtt.validator.Validator.MessageBundle;
import com.google.udmi.util.GeneralUtils;
import com.google.udmi.util.JsonUtil;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import org.junit.Test;
import udmi.schema.Config;
import udmi.schema.DeviceValidationEvent;
import udmi.schema.DiscoveryEvent;
import udmi.schema.DiscoveryState;
import udmi.schema.PointPointsetEvent;
import udmi.schema.PointsetEvent;
import udmi.schema.PointsetState;
import udmi.schema.PointsetSummary;
import udmi.schema.State;
import udmi.schema.ValidationEvent;
import udmi.schema.ValidationState;

Expand All @@ -26,14 +32,18 @@
public class BasicTest extends TestBase {

private static final String EVENT_SUBTYPE = "event";
private static final String CONFIG_SUBTYPE = "config";
private static final String STATE_SUBTYPE = "state";
private static final String POINTSET_SUBFOLDER = "pointset";
private static final String DISCOVERY_SUBFOLDER = "discovery";
private static final String UPDATE_SUBFOLDER = "update";
private static final List<String> TEST_ARGS = ImmutableList.of(
"-n",
"-p", TestCommon.PROJECT_ID,
"-a", TestCommon.SCHEMA_SPEC,
"-s", TestCommon.SITE_DIR);
private static final String FLUX_READING = "FLUX_READING";
private static final long TWO_SECONDS_MS = 1000 * 2;
private final Validator validator = new Validator(TEST_ARGS).prepForMock();

@Test
Expand Down Expand Up @@ -65,8 +75,8 @@ public void validPointsetEvent() {
assertEquals("No error devices", 1, report.devices.size());
DeviceValidationEvent deviceValidationEvent = report.devices.get(TestCommon.DEVICE_ID);
assertEquals("no report status", null, deviceValidationEvent.status);
String expected = JsonUtil.getTimestamp(messageObject.timestamp);
String lastSeen = JsonUtil.getTimestamp(deviceValidationEvent.last_seen);
String expected = getTimestamp(messageObject.timestamp);
String lastSeen = getTimestamp(deviceValidationEvent.last_seen);
assertEquals("status last_seen", expected, lastSeen);
}

Expand Down Expand Up @@ -117,5 +127,24 @@ public void missingPointsetState() {
assertTrue("Missing correct point", missingPoints.contains(FILTER_ALARM_PRESSURE_STATUS));
}

@Test
public void lastSeenUpdate() {
Validator.MessageBundle eventBundle = getMessageBundle(EVENT_SUBTYPE, DISCOVERY_SUBFOLDER,
new DiscoveryEvent());
validator.validateMessage(eventBundle);

// Add enough of a delay to ensure that a seconds-based timestamp is different.
safeSleep(TWO_SECONDS_MS);
Validator.MessageBundle configBundle = getMessageBundle(CONFIG_SUBTYPE, UPDATE_SUBFOLDER,
new Config());
validator.validateMessage(configBundle);

// Only the event should update the last seen, since config is not from the device.
ValidationState report = getValidationReport();
DeviceValidationEvent deviceValidationEvent = report.devices.get(TestCommon.DEVICE_ID);
Date lastSeen = deviceValidationEvent.last_seen;
Instant parse = Instant.parse((String) eventBundle.message.get(TIMESTAMP_PROPERTY_KEY));
assertEquals("device last seen", getTimestamp(Date.from(parse)), getTimestamp(lastSeen));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ public void simpleTraceReport() {
OutputBundle lastBundle = outputMessages.get(outputMessages.size() - 1);
ValidationState finalReport = asValidationState(lastBundle.message);
try {
assertEquals("correct devices", 1, finalReport.summary.correct_devices.size());
assertEquals("correct devices", 2, finalReport.summary.correct_devices.size());
assertEquals("extra devices", 0, finalReport.summary.extra_devices.size());
assertEquals("missing devices", 1, finalReport.summary.missing_devices.size());
assertEquals("missing devices", 0, finalReport.summary.missing_devices.size());
assertEquals("error devices", 2, finalReport.summary.error_devices.size());
assertEquals("device summaries", 3, finalReport.devices.size());
assertEquals("device summaries", 4, finalReport.devices.size());
assertNull("no AHU-1 status", finalReport.devices.get("AHU-1").status);
assertEquals("AHU-22 status", Category.VALIDATION_DEVICE_SCHEMA,
finalReport.devices.get("AHU-22").status.category);
Expand Down
1 change: 1 addition & 0 deletions validator/traces/simple.in/SNS-4/001_event_pointset.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"present_value" : "yes"
}
},
"monkey": true,
"timestamp" : "2022-07-19T05:04:26Z",
"version" : "1.3.14"
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"subFolder" : "pointset",
"deviceRegistryId" : "ZZ-TRI-FECTA",
"subType" : "event",
"msgSource" : "003_event_pointset.json",
"deviceId" : "AHU-1",
"projectId" : "playback-project"
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"subFolder" : "system",
"deviceRegistryId" : "ZZ-TRI-FECTA",
"subType" : "event",
"msgSource" : "002_event_system.json",
"deviceId" : "AHU-1",
"projectId" : "playback-project"
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"subFolder" : "pointset",
"deviceRegistryId" : "ZZ-TRI-FECTA",
"subType" : "event",
"msgSource" : "001_event_pointset.json",
"deviceId" : "AHU-22",
"projectId" : "playback-project"
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"subFolder" : "system",
"deviceRegistryId" : "ZZ-TRI-FECTA",
"subType" : "event",
"msgSource" : "003_event_system.json",
"deviceId" : "AHU-22",
"projectId" : "playback-project"
}
Loading