Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for Pubber startup errors #772

Merged
merged 17 commits into from
Nov 21, 2023
2 changes: 2 additions & 0 deletions .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ jobs:
java-version: '17'
- name: base setup
run: bin/run_tests install_dependencies
- name: stagger startup
run: sleep $(($MATRIX_SHARD_INDEX * 20 + 20))
- name: registrar clean
run: bin/test_regclean $TARGET_PROJECT
- name: sequence tests clean
Expand Down
4 changes: 4 additions & 0 deletions bin/test_validator
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ fi
project_id=$1
shift

echo "export TARGET_PROJECT=$project_id"
echo "export UDMI_REGISTRY_SUFFIX=$UDMI_REGISTRY_SUFFIX"
echo "export UDMI_ALT_REGISTRY=$UDMI_ALT_REGISTRY"

[[ -n $GITHUB_RUN_NUMBER ]] && echo "Workflow run number $GITHUB_RUN_NUMBER" || true
echo 'Using target project:' $project_id

Expand Down
52 changes: 26 additions & 26 deletions etc/validator.out
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ sites/udmi_site_model/out/devices/AHU-22/event_pointset.json
sites/udmi_site_model/out/devices/AHU-22/event_system.json
sites/udmi_site_model/out/devices/AHU-22/persistent_data.json
sites/udmi_site_model/out/devices/AHU-22/state.json
sites/udmi_site_model/out/devices/AHU-22/state_error.json
sites/udmi_site_model/out/devices/AHU-22/state_localnet.json
sites/udmi_site_model/out/devices/AHU-22/state_pointset.json
sites/udmi_site_model/out/devices/AHU-22/state_system.json
Expand Down Expand Up @@ -325,9 +324,9 @@ sites/udmi_site_model/out/devices/AHU-22/state.out
"sub_folder" : "update",
"sub_type" : "state",
"status" : {
"message" : "Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold",
"detail" : "state_update: Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold REDACTED_ERROR",
"category" : "validation.device.schema",
"message" : "Multiple validation errors",
"detail" : "Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold; Device has missing points: filter_alarm_pressure_status, filter_differential_pressure, filter_differential_pressure_sensor; Device has extra points: globulating_globar",
"category" : "validation.device.multiple",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
},
Expand All @@ -337,26 +336,15 @@ sites/udmi_site_model/out/devices/AHU-22/state.out
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
} ]
}
::::::::::::::
sites/udmi_site_model/out/devices/AHU-22/state_error.out
::::::::::::::
{
"timestamp" : "REDACTED_TIMESTAMP",
"version" : "1.4.2",
"sub_folder" : "error",
"sub_type" : "state",
"status" : {
"message" : "Error in message pipeline: While parsing json object, Unexpected character ('!' (code 33)): expected a valid value (JSON String, Number (or 'NaN'/'INF'/'+INF'), Array, Object or token 'null', 'true' or 'false') at [Source: (String)\"!&*@(!*&@!\"; line: 1, column: 2]",
"detail" : "state_error: Error in message pipeline: While parsing json object, Unexpected character ('!' (code 33)): expected a valid value (JSON String, Number (or 'NaN'/'INF'/'+INF'), Array, Object or token 'null', 'true' or 'false') at [Source: (String)\"!&*REDACTED_ERROR",
}, {
"message" : "Device has missing points: filter_alarm_pressure_status, filter_differential_pressure, filter_differential_pressure_sensor",
"detail" : "state_update: Device has missing points: filter_alarm_pressure_status, filter_differential_pressure, filter_differential_pressure_sensor",
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
},
"errors" : [ {
"message" : "Error in message pipeline: While parsing json object, Unexpected character ('!' (code 33)): expected a valid value (JSON String, Number (or 'NaN'/'INF'/'+INF'), Array, Object or token 'null', 'true' or 'false') at [Source: (String)\"!&*@(!*&@!\"; line: 1, column: 2]",
"detail" : "state_error: Error in message pipeline: While parsing json object, Unexpected character ('!' (code 33)): expected a valid value (JSON String, Number (or 'NaN'/'INF'/'+INF'), Array, Object or token 'null', 'true' or 'false') at [Source: (String)\"!&*REDACTED_ERROR",
}, {
"message" : "Device has extra points: globulating_globar",
"detail" : "state_update: Device has extra points: globulating_globar",
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
Expand Down Expand Up @@ -394,22 +382,34 @@ sites/udmi_site_model/out/devices/AHU-22/state_pointset.out
"sub_folder" : "pointset",
"sub_type" : "state",
"status" : {
"message" : "Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold",
"detail" : "state_pointset: Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold REDACTED_ERROR",
"category" : "validation.device.schema",
"message" : "Multiple validation errors",
"detail" : "Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold; Device has missing points: filter_alarm_pressure_status, filter_differential_pressure, filter_differential_pressure_sensor; Device has extra points: globulating_globar",
"category" : "validation.device.multiple",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
},
"pointset" : {
"missing" : [ ],
"extra" : [ ]
"missing" : [ "filter_alarm_pressure_status", "filter_differential_pressure", "filter_differential_pressure_sensor" ],
"extra" : [ "globulating_globar" ]
},
"errors" : [ {
"message" : "Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold",
"detail" : "state_pointset: Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold REDACTED_ERROR",
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
}, {
"message" : "Device has missing points: filter_alarm_pressure_status, filter_differential_pressure, filter_differential_pressure_sensor",
"detail" : "state_pointset: Device has missing points: filter_alarm_pressure_status, filter_differential_pressure, filter_differential_pressure_sensor",
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
}, {
"message" : "Device has extra points: globulating_globar",
"detail" : "state_pointset: Device has extra points: globulating_globar",
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
} ]
}
::::::::::::::
Expand Down
28 changes: 17 additions & 11 deletions pubber/src/main/java/daq/pubber/ListPublisher.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
package daq.pubber;

import static com.google.udmi.util.GeneralUtils.ifNotNullThen;

import com.google.api.client.util.ArrayMap;
import com.google.udmi.util.JsonUtil;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import udmi.schema.Config;
import udmi.schema.PubberConfiguration;
import udmi.schema.SystemConfig;

/**
* Publishes message to an in-memory list.
Expand All @@ -19,6 +26,7 @@ public class ListPublisher implements Publisher {
private final PubberConfiguration configuration;
private List<String> messages = new ArrayList<>();
private String usePrefix;
private final Map<String, Entry<Consumer<Object>, Class<Object>>> handlers = new HashMap<>();

public ListPublisher(PubberConfiguration configuration, Consumer<Exception> onError) {
this.configuration = configuration;
Expand Down Expand Up @@ -48,31 +56,29 @@ public void setDeviceTopicPrefix(String deviceId, String topicPrefix) {
@Override
public <T> void registerHandler(String deviceId, String topicSuffix,
Consumer<T> handler, Class<T> messageType) {

Consumer<Object> foo = (Consumer<Object>) handler;
Class<Object> clazz = (Class<Object>) messageType;
handlers.put(topicSuffix, new SimpleEntry<>(foo, clazz));
}

@Override
public void connect(String deviceId) {

public void connect(String deviceId, boolean clean) {
Consumer<Object> handler = handlers.get("config").getKey();
handler.accept(new Config());
}

@Override
public void publish(String deviceId, String topicSuffix, Object message, Runnable callback) {
String useTopic = usePrefix + "/" + topicSuffix;
messages.add(getMessageString(deviceId, useTopic, message));
publisherExecutor.submit(callback);
ifNotNullThen(callback, () -> publisherExecutor.submit(callback));
}

@Override
public boolean isActive() {
return false;
}

@Override
public void startupLatchWait(CountDownLatch configLatch, String message) {

}

@Override
public void close() {

Expand Down
9 changes: 2 additions & 7 deletions pubber/src/main/java/daq/pubber/MqttDevice.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package daq.pubber;

import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import udmi.schema.PubberConfiguration;

Expand Down Expand Up @@ -43,11 +42,11 @@ public <T> void registerHandler(String topicSuffix, Consumer<T> handler, Class<T
}

public void connect() {
publisher.connect(deviceId);
publisher.connect(deviceId, true);
}

public void connect(String targetId) {
publisher.connect(targetId);
publisher.connect(targetId, false);
}

public void publish(String deviceId, String topicSuffix, Object message, Runnable callback) {
Expand All @@ -58,10 +57,6 @@ public boolean isActive() {
return publisher.isActive();
}

public void startupLatchWait(CountDownLatch configLatch, String message) {
publisher.startupLatchWait(configLatch, message);
}

public void close() {
publisher.close();
}
Expand Down
45 changes: 19 additions & 26 deletions pubber/src/main/java/daq/pubber/MqttPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.udmi.util.GeneralUtils.ifTrueGet;
import static com.google.udmi.util.GeneralUtils.ifTrueThen;
import static com.google.udmi.util.GeneralUtils.isTrue;
import static java.lang.String.format;
import static java.util.Optional.ofNullable;
Expand Down Expand Up @@ -38,7 +39,6 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.http.ConnectionClosedException;
Expand Down Expand Up @@ -81,10 +81,11 @@ public class MqttPublisher implements Publisher {
private static final int TOKEN_EXPIRY_MINUTES = 60;
private static final int QOS_AT_MOST_ONCE = 0;
private static final int QOS_AT_LEAST_ONCE = 1;
private static final int DEFAULT_CONFIG_WAIT_SEC = 10;
static final int DEFAULT_CONFIG_WAIT_SEC = 10;
private static final String EVENT_MARK_PREFIX = "events/";
private static final Map<String, AtomicInteger> EVENT_SERIAL = new HashMap<>();
private static final String GCP_CLIENT_PREFIX = "projects/";
public static final String EMPTY_STRING = "";

private final Semaphore connectionLock = new Semaphore(1);

Expand Down Expand Up @@ -235,15 +236,17 @@ private String getSendTopic(String deviceId, String topicSuffix) {
}

private void closeMqttClient(String deviceId) {
MqttClient removed = cleanClients(deviceId);
if (removed != null) {
try {
if (removed.isConnected()) {
removed.disconnect();
synchronized (mqttClients) {
MqttClient removed = cleanClients(deviceId);
if (removed != null) {
try {
if (removed.isConnected()) {
removed.disconnect();
}
removed.close();
} catch (Exception e) {
error("Error closing MQTT client: " + e, null, "stop", e);
}
removed.close();
} catch (Exception e) {
error("Error closing MQTT client: " + e, null, "stop", e);
}
}
}
Expand Down Expand Up @@ -282,12 +285,11 @@ private MqttClient newBoundClient(String deviceId) {
try {
String gatewayId = getGatewayId(deviceId);
debug(format("Connecting device %s through gateway %s", deviceId, gatewayId));
MqttClient mqttClient = getConnectedClient(gatewayId);
final MqttClient mqttClient = getConnectedClient(gatewayId);
startupLatchWait(connectionLatch, "gateway startup exchange");
String topic = getMessageTopic(deviceId, MqttDevice.ATTACH_TOPIC);
String payload = "";
info("Publishing attach message " + topic);
mqttClient.publish(topic, payload.getBytes(StandardCharsets.UTF_8), QOS_AT_LEAST_ONCE,
mqttClient.publish(topic, EMPTY_STRING.getBytes(StandardCharsets.UTF_8), QOS_AT_LEAST_ONCE,
SHOULD_RETAIN);
subscribeToUpdates(mqttClient, deviceId);
return mqttClient;
Expand All @@ -296,8 +298,7 @@ private MqttClient newBoundClient(String deviceId) {
}
}

@Override
public void startupLatchWait(CountDownLatch gatewayLatch, String designator) {
private void startupLatchWait(CountDownLatch gatewayLatch, String designator) {
try {
int waitTimeSec = ofNullable(configuration.endpoint.config_sync_sec)
.orElse(DEFAULT_CONFIG_WAIT_SEC);
Expand Down Expand Up @@ -480,7 +481,8 @@ private String getDeviceId(String topic) {
return topic.split("/")[2];
}

public void connect(String targetId) {
public void connect(String targetId, boolean clean) {
ifTrueThen(clean, () -> closeMqttClient(targetId));
getConnectedClient(targetId);
}

Expand Down Expand Up @@ -542,13 +544,6 @@ private void checkAuthentication(String targetId) {
reauthTimes.remove(authId);
synchronized (mqttClients) {
MqttClient client = cleanClients(authId);
if (client == null) {
return;
}
Set<String> removeSet = mqttClients.entrySet().stream()
.filter(entry -> entry.getValue() == client).map(Entry::getKey)
.collect(Collectors.toSet());
removeSet.forEach(mqttClients::remove);
try {
client.disconnect();
client.close();
Expand Down Expand Up @@ -659,9 +654,7 @@ public void messageArrived(String topic, MqttMessage message) {
String messageType = getMessageType(topic);
String handlerKey = getHandlerKey(topic);
String deviceId = getDeviceId(topic);
if (getGatewayId(deviceId) == null) {
connectionLatch.countDown();
}
connectionLatch.countDown();
Consumer<Object> handler = handlers.get(handlerKey);
Class<Object> type = handlersType.get(handlerKey);
if (handler == null) {
Expand Down
Loading
Loading