diff --git a/.gencode_hash.txt b/.gencode_hash.txt index 1e020d6a4..c778b0b58 100644 --- a/.gencode_hash.txt +++ b/.gencode_hash.txt @@ -2,9 +2,9 @@ b35646a2f3d7c30fd34e3ea6ab6a1070b39c469fd1de49cab1093ff32faaf06d gencode/docs/c e1f62882576d0f4332360f2aedb79b37a43973fb31d862c829af924f8f310ec1 gencode/docs/command_mapping.html 692ea694fbd8c88ebc1d7fed26363bd3e6b4e52061431a4d8c76bf0eddfaf2ff gencode/docs/config.html 30c7f39e50e290b0a36a3414cf1c90277ebd249f126f85f0a307bc7b74164e08 gencode/docs/config_mapping.html -51633513052c10d16fdc1bada9860c75fc0d1d5a9b8d6f9457e2845cfe4e750e gencode/docs/configuration_endpoint.html +e6b9cc7798e71237416b092801356f51c5178f3aac35811a8cee8a0045ad96d8 gencode/docs/configuration_endpoint.html b7ca70e3176f997f254c32bd8d53dff1462ac8a71c0e166aaa85193d67bff60b gencode/docs/configuration_execution.html -7b7d42d19e15aa64f5e57dd3c8867e72279365c7f39405c512aa572f7e97f527 gencode/docs/configuration_pubber.html +d7fa593d3ec205465b220b72a6358a870bd52d6e61bd1426c9e5fb54cbbbdd56 gencode/docs/configuration_pubber.html 0f0355e343cb8197dfbae1481c0eca3a0566d73181bc968e33fd469a1c94d1e6 gencode/docs/envelope.html f1fde870638541bea6de16acd64365c89a7d56ee3270d6fd1bcdbb5bdb141887 gencode/docs/event.html a8800ab8384de3b846af0d72cf310acf01644842b557e42eec20b69a0bfa1868 gencode/docs/event_discovery.html @@ -13,7 +13,7 @@ a8800ab8384de3b846af0d72cf310acf01644842b557e42eec20b69a0bfa1868 gencode/docs/e 35238ac2be3b2860bf74d291342897ef53a537a86aeb37d0eab55e3c8f8baecb gencode/docs/event_system.html 43025f61381de050eae9680c3193a1d878a397d0b5515dd1063a1a993b0c41ea gencode/docs/event_validation.html 9ae793a641f2e053c5af734bf813cd2637a675330acb8d9e903ef03b4286e007 gencode/docs/metadata.html -035be07b11b35920280a88c370ab34f600775216af10981aa212dfc3c9d3ddc5 gencode/docs/persistent_device.html +5856ee0b42512f6a1031f1799af2d9a436ed57a5d2f9265b95082b78cb1473fd gencode/docs/persistent_device.html 5d039d607af9ec75ee552dfe36b16c702687ea16f5663f41fc49b4533b86e00d gencode/docs/properties.html 392f272c1c8cd047eb6906f2f11ffacc7083c32f639da2437d6a9ca861062ecc gencode/docs/readme.md 690d56e96c4e180e49e0abcba358ab1842ea06dd3f693918852a9de370c994d5 gencode/docs/reflect_config.html @@ -45,7 +45,7 @@ d8a80ab3180d33bfa17564c969018e1d4350a47dbc70c4ae8a5abbfb25cfedc9 gencode/java/u 4793b2370b1f3b8fd89941586b779dffdde83d321a2621907f67d4d297d19b17 gencode/java/udmi/schema/DiscoveryEvent.java 04112dd47b0f761131c276c67d3cd8b789d25e6716b5732be9fef14fc6831f1d gencode/java/udmi/schema/DiscoveryModel.java 9962b0eb7d5adf52af6160e9f3131f8eeb52ae9e518954dbb6aead1bcad0245e gencode/java/udmi/schema/DiscoveryState.java -c5ab32c7412c0cb3a8e48b4adb0a924c238dea655fddd581979c553c32f908d8 gencode/java/udmi/schema/EndpointConfiguration.java +caa0d61cb96d33ce4436af152f7189181b549d237ead953a805fdb078e5776dc gencode/java/udmi/schema/EndpointConfiguration.java 6fea7710d4eb64bfc568820f7063a89ce7808853ef7a56763fa90f323e09362f gencode/java/udmi/schema/Entry.java 017f8e237efa959b81d72f3dd2e78b915856198ceef02c041c657b30df93b7c2 gencode/java/udmi/schema/Envelope.java e9f5c77be81486b6b8c6d88f70f2d50583d8c3fafa2ac09ead80f44b8d5e751e gencode/java/udmi/schema/Event.java @@ -122,7 +122,7 @@ ac3facbd96f7cb2f7e387e7497d6a36af379a2687329571f250c5670f9933244 gencode/python 33ce7ee99051d56dd3c4d69edf92583d0bb7fd751417948b71292ac2285d3c19 gencode/python/udmi/schema/config_pointset_point.py ba37d2d54df565aba42ef055a0ec961175d180c2e8092a914e62029bc5388857 gencode/python/udmi/schema/config_system.py 97c2e5fadc6da0d84660f3296de885ab59f4b04154179b6717f77ec366f1544b gencode/python/udmi/schema/config_system_testing.py -82297b35fe77ab67c3ef87baf23fdf934e06b566be9eb9737125cef4276be8bf gencode/python/udmi/schema/configuration_endpoint.py +a1f527558554393623eb7f14a0ff6b3ce67e23578e2e716c206c6aa9a29ccee3 gencode/python/udmi/schema/configuration_endpoint.py e8a1574074554b6144b178d2adedc76a1c7be5ae911b253deff4460d8d82c6ee gencode/python/udmi/schema/configuration_execution.py 7f4f8c1a6ead34bf72b9134b97a39001752eb0f9647fafaac82ced4bb2f46b88 gencode/python/udmi/schema/configuration_pubber.py 998ce105f88686f27b85f3630a396ed04b106f830c133a684ea5c505ca95b1c3 gencode/python/udmi/schema/envelope.py diff --git a/bin/test_mosquitto b/bin/test_mosquitto index 3c5db6790..a0f199d3f 100755 --- a/bin/test_mosquitto +++ b/bin/test_mosquitto @@ -34,7 +34,7 @@ systemctl status mosquitto killall mosquitto_sub || true echo Subscribing to mqtt/test in the background... -mosquitto_sub -t \# -u ${USERNAME} -P ${PASSWORD} | tee out/mosquitto.sub & +mosquitto_sub -F "%t %p" -t \# -u ${USERNAME} -P ${PASSWORD} | tee out/mosquitto.sub & sleep 1 @@ -50,6 +50,7 @@ cat < /tmp/pubber_config.json "transport": "tcp", "client_id": "$registry_id/$device_id", "hostname": "127.0.0.1", + "topic_prefix": "/$serial_no/$device_id", "config_sync_sec": -1, "port": 1883, "auth_provider": { @@ -71,7 +72,14 @@ $ROOT_DIR/pubber/bin/build timeout 10s $ROOT_DIR/pubber/bin/run /tmp/pubber_config.json || true -received_no=$(fgrep operational out/mosquitto.sub | jq -r .system.serial_no) +received_no=$(fgrep operational out/mosquitto.sub | sed -E 's/^[^{]+//' | jq -r .system.serial_no) +received_topic=$(fgrep operational out/mosquitto.sub | awk '{ print $1 }') + +expected_topic=/$serial_no/$device_id/state +if [[ $received_topic != $expected_topic ]]; then + echo Unexpected received topic $received_topic != $expected_topic + false +fi if [[ $received_no != $serial_no ]]; then echo Mismatched/missing serial no: $received_no != $serial_no diff --git a/bin/test_redirect b/bin/test_redirect index 61fa2e348..af4beb28f 100755 --- a/bin/test_redirect +++ b/bin/test_redirect @@ -54,6 +54,7 @@ cat < out/endpoint.json { "protocol": "mqtt", "client_id": "projects/$project_id/locations/$cloud_region/registries/reconfigure/devices/$device_id", + "topic_prefix": "/devices/$device_id", "hostname": "mqtt.googleapis.com", "auth_provider": { "jwt": { diff --git a/gencode/docs/configuration_endpoint.html b/gencode/docs/configuration_endpoint.html index 50bb1cefb..97d1d0ca9 100644 --- a/gencode/docs/configuration_endpoint.html +++ b/gencode/docs/configuration_endpoint.html @@ -230,6 +230,40 @@

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Prefix for message topics

+
Must match regular expression: ^[-_/a-zA-Z0-9]+$ + + + + + +
diff --git a/gencode/docs/configuration_pubber.html b/gencode/docs/configuration_pubber.html index 5aef4e5a0..b7ebb8bb3 100644 --- a/gencode/docs/configuration_pubber.html +++ b/gencode/docs/configuration_pubber.html @@ -304,6 +304,47 @@

+

+ + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Prefix for message topics

+
Must match regular expression: ^[-_/a-zA-Z0-9]+$ + + + + + +
diff --git a/gencode/docs/persistent_device.html b/gencode/docs/persistent_device.html index ae4d575b5..f114c9748 100644 --- a/gencode/docs/persistent_device.html +++ b/gencode/docs/persistent_device.html @@ -304,6 +304,47 @@

+

+ + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Prefix for message topics

+
Must match regular expression: ^[-_/a-zA-Z0-9]+$ + + + + + +
diff --git a/gencode/java/udmi/schema/EndpointConfiguration.java b/gencode/java/udmi/schema/EndpointConfiguration.java index 0ef8437db..39e3b0848 100644 --- a/gencode/java/udmi/schema/EndpointConfiguration.java +++ b/gencode/java/udmi/schema/EndpointConfiguration.java @@ -26,6 +26,7 @@ "port", "config_sync_sec", "client_id", + "topic_prefix", "auth_provider", "nonce" }) @@ -64,6 +65,13 @@ public class EndpointConfiguration { */ @JsonProperty("client_id") public String client_id; + /** + * Prefix for message topics + * + */ + @JsonProperty("topic_prefix") + @JsonPropertyDescription("Prefix for message topics") + public String topic_prefix; @JsonProperty("auth_provider") public Auth_provider auth_provider; @JsonProperty("nonce") @@ -75,6 +83,7 @@ public int hashCode() { result = ((result* 31)+((this.protocol == null)? 0 :this.protocol.hashCode())); result = ((result* 31)+((this.hostname == null)? 0 :this.hostname.hashCode())); result = ((result* 31)+((this.port == null)? 0 :this.port.hashCode())); + result = ((result* 31)+((this.topic_prefix == null)? 0 :this.topic_prefix.hashCode())); result = ((result* 31)+((this.transport == null)? 0 :this.transport.hashCode())); result = ((result* 31)+((this.config_sync_sec == null)? 0 :this.config_sync_sec.hashCode())); result = ((result* 31)+((this.nonce == null)? 0 :this.nonce.hashCode())); @@ -92,7 +101,7 @@ public boolean equals(Object other) { return false; } EndpointConfiguration rhs = ((EndpointConfiguration) other); - return (((((((((this.protocol == rhs.protocol)||((this.protocol!= null)&&this.protocol.equals(rhs.protocol)))&&((this.hostname == rhs.hostname)||((this.hostname!= null)&&this.hostname.equals(rhs.hostname))))&&((this.port == rhs.port)||((this.port!= null)&&this.port.equals(rhs.port))))&&((this.transport == rhs.transport)||((this.transport!= null)&&this.transport.equals(rhs.transport))))&&((this.config_sync_sec == rhs.config_sync_sec)||((this.config_sync_sec!= null)&&this.config_sync_sec.equals(rhs.config_sync_sec))))&&((this.nonce == rhs.nonce)||((this.nonce!= null)&&this.nonce.equals(rhs.nonce))))&&((this.client_id == rhs.client_id)||((this.client_id!= null)&&this.client_id.equals(rhs.client_id))))&&((this.auth_provider == rhs.auth_provider)||((this.auth_provider!= null)&&this.auth_provider.equals(rhs.auth_provider)))); + return ((((((((((this.protocol == rhs.protocol)||((this.protocol!= null)&&this.protocol.equals(rhs.protocol)))&&((this.hostname == rhs.hostname)||((this.hostname!= null)&&this.hostname.equals(rhs.hostname))))&&((this.port == rhs.port)||((this.port!= null)&&this.port.equals(rhs.port))))&&((this.topic_prefix == rhs.topic_prefix)||((this.topic_prefix!= null)&&this.topic_prefix.equals(rhs.topic_prefix))))&&((this.transport == rhs.transport)||((this.transport!= null)&&this.transport.equals(rhs.transport))))&&((this.config_sync_sec == rhs.config_sync_sec)||((this.config_sync_sec!= null)&&this.config_sync_sec.equals(rhs.config_sync_sec))))&&((this.nonce == rhs.nonce)||((this.nonce!= null)&&this.nonce.equals(rhs.nonce))))&&((this.client_id == rhs.client_id)||((this.client_id!= null)&&this.client_id.equals(rhs.client_id))))&&((this.auth_provider == rhs.auth_provider)||((this.auth_provider!= null)&&this.auth_provider.equals(rhs.auth_provider)))); } @Generated("jsonschema2pojo") diff --git a/gencode/python/udmi/schema/configuration_endpoint.py b/gencode/python/udmi/schema/configuration_endpoint.py index 99d1668ea..b5f1e96f5 100644 --- a/gencode/python/udmi/schema/configuration_endpoint.py +++ b/gencode/python/udmi/schema/configuration_endpoint.py @@ -52,6 +52,7 @@ def __init__(self): self.port = None self.config_sync_sec = None self.client_id = None + self.topic_prefix = None self.auth_provider = None self.nonce = None @@ -66,6 +67,7 @@ def from_dict(source): result.port = source.get('port') result.config_sync_sec = source.get('config_sync_sec') result.client_id = source.get('client_id') + result.topic_prefix = source.get('topic_prefix') result.auth_provider = ObjectA90DCC28.from_dict(source.get('auth_provider')) result.nonce = source.get('nonce') return result @@ -100,6 +102,8 @@ def to_dict(self): result['config_sync_sec'] = self.config_sync_sec # 5 if self.client_id: result['client_id'] = self.client_id # 5 + if self.topic_prefix: + result['topic_prefix'] = self.topic_prefix # 5 if self.auth_provider: result['auth_provider'] = self.auth_provider.to_dict() # 4 if self.nonce: diff --git a/pubber/build.gradle b/pubber/build.gradle index e9f0a8b46..143464553 100644 --- a/pubber/build.gradle +++ b/pubber/build.gradle @@ -12,6 +12,7 @@ buildscript { plugins { id 'com.github.johnrengelman.shadow' version '7.1.2' + id 'com.adarshr.test-logger' version '3.2.0' id 'java' id 'checkstyle' } @@ -28,6 +29,11 @@ sourceSets { srcDirs '../common/src/main/java' } } + test { + java { + srcDirs '../common/src/test/java' + } + } } checkstyle { diff --git a/pubber/pubber.iml b/pubber/pubber.iml index af7508084..a5ac2ece3 100644 --- a/pubber/pubber.iml +++ b/pubber/pubber.iml @@ -4,15 +4,19 @@ - + + + + + @@ -53,6 +57,7 @@ + @@ -100,16 +105,15 @@ + - - - - - + + + diff --git a/pubber/src/main/java/daq/pubber/ListPublisher.java b/pubber/src/main/java/daq/pubber/ListPublisher.java new file mode 100644 index 000000000..f8bd1ee59 --- /dev/null +++ b/pubber/src/main/java/daq/pubber/ListPublisher.java @@ -0,0 +1,79 @@ +package daq.pubber; + +import com.google.udmi.util.JsonUtil; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; +import udmi.schema.PubberConfiguration; + +/** + * Publishes message to an in-memory list. + */ +public class ListPublisher implements Publisher { + + private final ExecutorService publisherExecutor = Executors.newFixedThreadPool(1); + private final PubberConfiguration configuration; + private List messages = new ArrayList<>(); + private String usePrefix; + + public ListPublisher(PubberConfiguration configuration, Consumer onError) { + this.configuration = configuration; + usePrefix = configuration.endpoint.topic_prefix; + } + + /** + * Get messages that have been mocked-published. + * + * @return list of published messages + */ + public synchronized List getMessages() { + List returnMessages = messages; + messages = new ArrayList<>(); + return returnMessages; + } + + @Override + public void setDeviceTopicPrefix(String deviceId, String topicPrefix) { + usePrefix = topicPrefix + "/" + deviceId; + } + + @Override + public void registerHandler(String deviceId, String topicSuffix, Consumer handler, + Class messageType) { + + } + + @Override + public void connect(String deviceId) { + + } + + @Override + public void publish(String deviceId, String topicSuffix, Object message, Runnable callback) { + String useTopic = usePrefix + "/" + topicSuffix; + messages.add(getMessageString(deviceId, useTopic, message)); + publisherExecutor.submit(callback); + } + + static String getMessageString(String deviceId, String topic, Object message) { + return String.format("%s/%s/%s", deviceId, topic, JsonUtil.stringify(message)); + } + + @Override + public boolean isActive() { + return false; + } + + @Override + public void startupLatchWait(CountDownLatch configLatch, String message) { + + } + + @Override + public void close() { + + } +} diff --git a/pubber/src/main/java/daq/pubber/MqttDevice.java b/pubber/src/main/java/daq/pubber/MqttDevice.java new file mode 100644 index 000000000..a8b53b9ce --- /dev/null +++ b/pubber/src/main/java/daq/pubber/MqttDevice.java @@ -0,0 +1,68 @@ +package daq.pubber; + +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; +import udmi.schema.PubberConfiguration; + +/** + * Encapsulation of a device connection. + */ +public class MqttDevice { + + public static final String TEST_PROJECT = "test-project"; + static final String ATTACH_TOPIC = "attach"; + static final String CONFIG_TOPIC = "config"; + static final String ERRORS_TOPIC = "errors"; + static final String EVENTS_TOPIC = "events"; + static final String STATE_TOPIC = "state"; + + private final String deviceId; + private final Publisher publisher; + + MqttDevice(PubberConfiguration configuration, Consumer onError) { + deviceId = configuration.deviceId; + publisher = getPublisher(configuration, onError); + if (configuration.endpoint.topic_prefix != null) { + publisher.setDeviceTopicPrefix(deviceId, configuration.endpoint.topic_prefix); + } + } + + MqttDevice(String deviceId, MqttDevice target) { + this.deviceId = deviceId; + publisher = target.publisher; + } + + Publisher getPublisher(PubberConfiguration configuration, + Consumer onError) { + return TEST_PROJECT.equals(configuration.projectId) ? new ListPublisher(configuration, onError) + : new MqttPublisher(configuration, onError); + } + + public void registerHandler(String topicSuffix, Consumer handler, Class messageType) { + publisher.registerHandler(deviceId, topicSuffix, handler, messageType); + } + + public void connect() { + publisher.connect(deviceId); + } + + public void publish(String topicSuffix, Object message, Runnable callback) { + publisher.publish(deviceId, topicSuffix, message, callback); + } + + public boolean isActive() { + return publisher.isActive(); + } + + public void startupLatchWait(CountDownLatch configLatch, String message) { + publisher.startupLatchWait(configLatch, message); + } + + public void close() { + publisher.close(); + } + + public ListPublisher getMockPublisher() { + return (ListPublisher) publisher; + } +} diff --git a/pubber/src/main/java/daq/pubber/MqttPublisher.java b/pubber/src/main/java/daq/pubber/MqttPublisher.java index 5d07f56fc..22f05d5af 100644 --- a/pubber/src/main/java/daq/pubber/MqttPublisher.java +++ b/pubber/src/main/java/daq/pubber/MqttPublisher.java @@ -52,8 +52,9 @@ /** * Handle publishing sensor data to a Cloud IoT MQTT endpoint. */ -public class MqttPublisher { +public class MqttPublisher implements Publisher { + private static final String TOPIC_PREFIX_FMT = "/devices/%s"; private static final Logger LOG = LoggerFactory.getLogger(MqttPublisher.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) @@ -62,11 +63,8 @@ public class MqttPublisher { .setSerializationInclusion(JsonInclude.Include.NON_NULL); // Indicate if this message should be a MQTT 'retained' message. private static final boolean SHOULD_RETAIN = false; - private static final String CONFIG_UPDATE_TOPIC_FMT = "/devices/%s/config"; - private static final String ERRORS_TOPIC_FMT = "/devices/%s/errors"; private static final String UNUSED_ACCOUNT_NAME = "unused"; private static final int INITIALIZE_TIME_MS = 20000; - private static final String MESSAGE_TOPIC_FORMAT = "/devices/%s/%s"; private static final String BROKER_URL_FORMAT = "%s://%s:%s"; private static final int PUBLISH_THREAD_COUNT = 10; private static final String HANDLER_KEY_FORMAT = "%s/%s"; @@ -81,6 +79,7 @@ public class MqttPublisher { private final Map mqttClients = new ConcurrentHashMap<>(); private final Map reauthTimes = new ConcurrentHashMap<>(); + private final Map topicPrefixMap = new HashMap<>(); private final ExecutorService publisherExecutor = Executors.newFixedThreadPool(PUBLISH_THREAD_COUNT); @@ -131,18 +130,21 @@ private String getClientId(String deviceId) { return SiteModel.getClientId(projectId, cloudRegion, registryId, deviceId); } - boolean isActive() { + @Override + public boolean isActive() { return !publisherExecutor.isShutdown(); } - void publish(String deviceId, String topic, Object data, Runnable callback) { + @Override + public void publish(String deviceId, String topicSuffix, Object data, Runnable callback) { Preconditions.checkNotNull(deviceId, "publish deviceId"); - debug("Publishing in background " + topic); - Object marked = topic.startsWith(EVENT_MARK_PREFIX) ? decorateMessage(topic, data) : data; + debug("Publishing in background " + topicSuffix); + Object marked = + topicSuffix.startsWith(EVENT_MARK_PREFIX) ? decorateMessage(topicSuffix, data) : data; try { - publisherExecutor.submit(() -> publishCore(deviceId, topic, marked, callback)); + publisherExecutor.submit(() -> publishCore(deviceId, topicSuffix, marked, callback)); } catch (Exception e) { - throw new RuntimeException("While publishing to topic " + topic, e); + throw new RuntimeException("While publishing to topic suffix " + topicSuffix, e); } } @@ -160,12 +162,23 @@ private Object decorateMessage(String topic, Object data) { } } - private void publishCore(String deviceId, String topic, Object data, + @Override + public void setDeviceTopicPrefix(String deviceId, String topicPrefix) { + topicPrefixMap.put(deviceId, topicPrefix); + } + + private String getMessageTopic(String deviceId, String topic) { + return + topicPrefixMap.computeIfAbsent(deviceId, key -> String.format(TOPIC_PREFIX_FMT, deviceId)) + + "/" + topic; + } + + private void publishCore(String deviceId, String topicSuffix, Object data, Runnable callback) { try { String payload = OBJECT_MAPPER.writeValueAsString(data); - debug("Sending message to " + topic); - sendMessage(deviceId, getMessageTopic(deviceId, topic), payload.getBytes()); + debug("Sending message to " + topicSuffix); + sendMessage(deviceId, getMessageTopic(deviceId, topicSuffix), payload.getBytes()); if (callback != null) { callback.run(); } @@ -198,7 +211,8 @@ private void closeMqttClient(String deviceId) { } } - void close() { + @Override + public void close() { try { warn("Closing publisher connection"); publisherExecutor.shutdown(); @@ -227,7 +241,7 @@ private MqttClient newBoundClient(String deviceId) { gatewayLatch = new CountDownLatch(1); MqttClient mqttClient = getConnectedClient(gatewayId); startupLatchWait(gatewayLatch, "gateway startup exchange"); - String topic = String.format("/devices/%s/attach", deviceId); + String topic = getMessageTopic(deviceId, MqttDevice.ATTACH_TOPIC); String payload = ""; info("Publishing attach message " + topic); mqttClient.publish(topic, payload.getBytes(StandardCharsets.UTF_8.name()), QOS_AT_LEAST_ONCE, @@ -239,7 +253,8 @@ private MqttClient newBoundClient(String deviceId) { } } - void startupLatchWait(CountDownLatch gatewayLatch, String designator) { + @Override + public void startupLatchWait(CountDownLatch gatewayLatch, String designator) { try { int waitTimeSec = Optional.ofNullable(configuration.endpoint.config_sync_sec) .orElse(DEFAULT_CONFIG_WAIT_SEC); @@ -257,7 +272,7 @@ private MqttClient newMqttClient(String deviceId) { Preconditions.checkNotNull(deviceId, "deviceId is null"); String clientId = getClientId(deviceId); String brokerUrl = getBrokerUrl(); - MqttClient mqttClient = new MqttClient(brokerUrl, clientId, new MemoryPersistence()); + MqttClient mqttClient = getMqttClient(clientId, brokerUrl); info("Creating new client to " + brokerUrl + " as " + clientId); return mqttClient; } catch (Exception e) { @@ -266,6 +281,10 @@ private MqttClient newMqttClient(String deviceId) { } } + MqttClient getMqttClient(String clientId, String brokerUrl) throws MqttException { + return new MqttClient(brokerUrl, clientId, new MemoryPersistence()); + } + private MqttClient connectMqttClient(String deviceId) { try { if (!connectionLock.tryAcquire(INITIALIZE_TIME_MS, TimeUnit.MILLISECONDS)) { @@ -359,16 +378,12 @@ private String getBrokerUrl() { configuration.endpoint.port); } - private String getMessageTopic(String deviceId, String topic) { - return String.format(MESSAGE_TOPIC_FORMAT, deviceId, topic); - } - private void subscribeToUpdates(MqttClient client, String deviceId) { boolean noConfigAck = (configuration.options.noConfigAck != null && configuration.options.noConfigAck); int configQos = noConfigAck ? QOS_AT_MOST_ONCE : QOS_AT_LEAST_ONCE; - subscribeTopic(client, String.format(CONFIG_UPDATE_TOPIC_FMT, deviceId), configQos); - subscribeTopic(client, String.format(ERRORS_TOPIC_FMT, deviceId), QOS_AT_MOST_ONCE); + subscribeTopic(client, getMessageTopic(deviceId, MqttDevice.CONFIG_TOPIC), configQos); + subscribeTopic(client, getMessageTopic(deviceId, MqttDevice.ERRORS_TOPIC), QOS_AT_MOST_ONCE); info("Updates subscribed"); } @@ -383,23 +398,24 @@ private void subscribeTopic(MqttClient client, String updateTopic, int mqttQos) /** * Register a message handler. * - * @param deviceId Device id to register with - * @param mqttTopic Mqtt topic + * @param Param of the message type + * @param deviceId Sending device id + * @param mqttSuffix Mqtt topic suffix * @param handler Message received handler * @param messageType Type of the message for this handler - * @param Param of the message type */ @SuppressWarnings("unchecked") - public void registerHandler(String deviceId, String mqttTopic, + public void registerHandler(String deviceId, String mqttSuffix, Consumer handler, Class messageType) { - String key = getHandlerKey(getMessageTopic(deviceId, mqttTopic)); + String mqttTopic = getMessageTopic(deviceId, mqttSuffix); + String handlerKey = getHandlerKey(mqttTopic); if (handler == null) { - handlers.remove(key); - handlersType.remove(key); - } else if (handlers.put(key, (Consumer) handler) == null) { - handlersType.put(key, (Class) messageType); + handlers.remove(handlerKey); + handlersType.remove(handlerKey); + } else if (handlers.put(handlerKey, (Consumer) handler) == null) { + handlersType.put(handlerKey, (Class) messageType); } else { - throw new IllegalStateException("Overwriting existing handler for " + key); + throw new IllegalStateException("Overwriting existing handler " + handlerKey); } } diff --git a/pubber/src/main/java/daq/pubber/Pubber.java b/pubber/src/main/java/daq/pubber/Pubber.java index 04a2210f9..f1766ec03 100644 --- a/pubber/src/main/java/daq/pubber/Pubber.java +++ b/pubber/src/main/java/daq/pubber/Pubber.java @@ -8,6 +8,8 @@ import static com.google.udmi.util.GeneralUtils.optionsString; import static com.google.udmi.util.GeneralUtils.toJsonFile; import static com.google.udmi.util.GeneralUtils.toJsonString; +import static daq.pubber.MqttDevice.CONFIG_TOPIC; +import static daq.pubber.MqttDevice.ERRORS_TOPIC; import static java.lang.Boolean.TRUE; import static java.util.Optional.ofNullable; import static java.util.stream.Collectors.toMap; @@ -99,11 +101,10 @@ public class Pubber { public static final String PUBBER_OUT = "pubber/out"; public static final String PERSISTENT_STORE_FILE = "persistent_data.json"; public static final String PERSISTENT_TMP_FORMAT = "/tmp/pubber_%s_" + PERSISTENT_STORE_FILE; + public static final String PUBBER_LOG_CATEGORY = "device.log"; private static final String UDMI_VERSION = "1.4.0"; private static final Logger LOG = LoggerFactory.getLogger(Pubber.class); private static final String HOSTNAME = System.getenv("HOSTNAME"); - private static final String CONFIG_TOPIC = "config"; - private static final String ERROR_TOPIC = "errors"; private static final int MIN_REPORT_MS = 200; private static final int DEFAULT_REPORT_SEC = 10; private static final int WAIT_TIME_SEC = 10; @@ -112,13 +113,14 @@ public class Pubber { private static final Set BOOLEAN_UNITS = ImmutableSet.of("No-units"); private static final double DEFAULT_BASELINE_VALUE = 50; private static final String MESSAGE_CATEGORY_FORMAT = "system.%s.%s"; - private static final Map, String> MESSAGE_TOPIC_MAP = ImmutableMap.of( - State.class, "state", - SystemEvent.class, "events/system", - PointsetEvent.class, "events/pointset", - ExtraPointsetEvent.class, "events/pointset", - DiscoveryEvent.class, "events/discovery" + private static final Map, String> MESSAGE_TOPIC_SUFFIX_MAP = ImmutableMap.of( + State.class, MqttDevice.STATE_TOPIC, + SystemEvent.class, getEventsSuffix("system"), + PointsetEvent.class, getEventsSuffix("pointset"), + ExtraPointsetEvent.class, getEventsSuffix("pointset"), + DiscoveryEvent.class, getEventsSuffix("discovery") ); + private static final int MESSAGE_REPORT_INTERVAL = 10; private static final Map> LOG_MAP = ImmutableMap.>builder() @@ -142,7 +144,6 @@ public class Pubber { private static final AtomicInteger retriesRemaining = new AtomicInteger(CONNECT_RETRIES); private static final long RESTART_DELAY_MS = 1000; private static final long BYTES_PER_MEGABYTE = 1024 * 1024; - public static final String PUBBER_LOG_CATEGORY = "device.log"; private final File outDir; private final ScheduledExecutorService executor = new CatchingScheduledThreadPoolExecutor(1); private final PubberConfiguration configuration; @@ -156,7 +157,7 @@ public class Pubber { private final String deviceId; private Config deviceConfig = new Config(); private int deviceUpdateCount = -1; - private MqttPublisher mqttPublisher; + private MqttDevice deviceTarget; private ScheduledFuture periodicSender; private long lastStateTimeMs; private PubSubClient pubSubClient; @@ -168,6 +169,7 @@ public class Pubber { private SiteModel siteModel; private PrintStream logPrintWriter; private DevicePersistent persistentData; + private MqttDevice gatewayTarget; /** * Start an instance from a configuration file. @@ -209,6 +211,10 @@ public Pubber(String projectId, String sitePath, String deviceId, String serialN } } + private static String getEventsSuffix(String suffixSuffix) { + return MqttDevice.EVENTS_TOPIC + "/" + suffixSuffix; + } + private static Date getRoundedStartTime() { long timestamp = getCurrentTimestamp().getTime(); // Remove ms so that rounded conversions preserve equality. @@ -311,6 +317,10 @@ private static PubberConfiguration sanitizeConfiguration(PubberConfiguration con return configuration; } + private static Date getCurrentTimestamp() { + return new Date(); + } + private AbstractPoint makePoint(String name, PointPointsetModel point) { boolean writable = point.writable != null && point.writable; if (BOOLEAN_UNITS.contains(point.units)) { @@ -409,6 +419,13 @@ private File getPersistentStore() { new File(siteModel.getDeviceWorkingDir(deviceId), PERSISTENT_STORE_FILE); } + private void publishDirtyState() { + if (stateDirty.get()) { + System.err.println("Publishing dirty state block"); + markStateDirty(0); + } + } + private void markStateDirty(long delayMs) { stateDirty.set(true); if (delayMs >= 0) { @@ -654,11 +671,11 @@ private void startConnection(Function connectionDone) { private boolean attemptConnection() { try { - if (mqttPublisher == null) { + if (deviceTarget == null) { throw new RuntimeException("Mqtt publisher not initialized"); } connect(); - mqttPublisher.startupLatchWait(configLatch, "initial config sync"); + deviceTarget.startupLatchWait(configLatch, "initial config sync"); return true; } catch (Exception e) { error("While waiting for connection start", e); @@ -711,9 +728,9 @@ private void closeLogWriter() { } private void disconnectMqtt() { - if (mqttPublisher != null) { - captureExceptions("closing mqtt publisher", mqttPublisher::close); - mqttPublisher = null; + if (deviceTarget != null) { + captureExceptions("closing mqtt publisher", deviceTarget::close); + deviceTarget = null; } } @@ -722,17 +739,16 @@ private void initializeMqtt() { if (siteModel != null && configuration.keyFile != null) { configuration.keyFile = siteModel.getDeviceKeyFile(configuration.deviceId); } - Preconditions.checkState(mqttPublisher == null, "mqttPublisher already defined"); + Preconditions.checkState(deviceTarget == null, "mqttPublisher already defined"); ensureKeyBytes(); - mqttPublisher = new MqttPublisher(configuration, this::publisherException); + deviceTarget = new MqttDevice(configuration, this::publisherException); if (configuration.gatewayId != null) { - mqttPublisher.registerHandler(configuration.gatewayId, CONFIG_TOPIC, - this::gatewayHandler, Config.class); - mqttPublisher.registerHandler(configuration.gatewayId, ERROR_TOPIC, - this::errorHandler, GatewayError.class); + gatewayTarget = new MqttDevice(configuration.gatewayId, deviceTarget); + gatewayTarget.registerHandler(CONFIG_TOPIC, this::gatewayHandler, Config.class); + gatewayTarget.registerHandler(ERRORS_TOPIC, this::errorHandler, GatewayError.class); } - mqttPublisher.registerHandler(configuration.deviceId, CONFIG_TOPIC, - this::configHandler, Config.class); + deviceTarget.registerHandler(CONFIG_TOPIC, this::configHandler, Config.class); + publishDirtyState(); } private void ensureKeyBytes() { @@ -747,7 +763,7 @@ private void ensureKeyBytes() { private void connect() { try { - mqttPublisher.connect(configuration.deviceId); + deviceTarget.connect(); info("Connection complete."); workingEndpoint = toJsonString(configuration.endpoint); } catch (Exception e) { @@ -812,10 +828,6 @@ private Entry entryFromException(String category, Throwable e) { return entry; } - private static Date getCurrentTimestamp() { - return new Date(); - } - private String exceptionDetail(Throwable e) { StringBuilder buffer = new StringBuilder(); while (e != null) { @@ -1363,6 +1375,11 @@ private void publishStateMessage() { } catch (Exception e) { throw new RuntimeException("While converting new device state", e); } + + if (deviceTarget == null) { + markStateDirty(-1); + return; + } stateDirty.set(false); lastStateTimeMs = System.currentTimeMillis(); CountDownLatch latch = new CountDownLatch(1); @@ -1384,15 +1401,20 @@ private void publishDeviceMessage(Object message) { } private void publishDeviceMessage(Object message, Runnable callback) { - String topic = MESSAGE_TOPIC_MAP.get(message.getClass()); - if (topic == null) { + String topicSuffix = MESSAGE_TOPIC_SUFFIX_MAP.get(message.getClass()); + if (topicSuffix == null) { error("Unknown message class " + message.getClass()); return; } + if (deviceTarget == null) { + error("publisher not active"); + return; + } + augmentDeviceMessage(message); - mqttPublisher.publish(configuration.deviceId, topic, message, callback); - String messageBase = topic.replace("/", "_"); + deviceTarget.publish(topicSuffix, message, callback); + String messageBase = topicSuffix.replace("/", "_"); String fileName = traceTimestamp(messageBase) + ".json"; File messageOut = new File(outDir, fileName); try { @@ -1423,7 +1445,7 @@ private void augmentDeviceMessage(Object message) { } private boolean publisherActive() { - return mqttPublisher != null && mqttPublisher.isActive(); + return deviceTarget != null && deviceTarget.isActive(); } private void cloudLog(String message, Level level) { diff --git a/pubber/src/main/java/daq/pubber/Publisher.java b/pubber/src/main/java/daq/pubber/Publisher.java new file mode 100644 index 000000000..2289d11e2 --- /dev/null +++ b/pubber/src/main/java/daq/pubber/Publisher.java @@ -0,0 +1,42 @@ +package daq.pubber; + +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; + +interface Publisher { + + /** + * Sets the device and prefix for published messages. + * + * @param deviceId the device id to use + * @param topicPrefix the topic prefix to use + */ + void setDeviceTopicPrefix(String deviceId, String topicPrefix); + + /** + * Register a handler for receiving a message. + * + * @param deviceId device to handle + * @param topicSuffix topic to handle + * @param handler handler to consume messages + * @param messageType the type of messages + * @param type of message + */ + void registerHandler(String deviceId, String topicSuffix, Consumer handler, + Class messageType); + + /** + * Connect the given device id. + * + * @param deviceId device to connect + */ + void connect(String deviceId); + + void publish(String deviceId, String topicSuffix, Object message, Runnable callback); + + boolean isActive(); + + void startupLatchWait(CountDownLatch configLatch, String message); + + void close(); +} diff --git a/pubber/src/test/java/daq/pubber/ListPublisherTest.java b/pubber/src/test/java/daq/pubber/ListPublisherTest.java new file mode 100644 index 000000000..c398fd797 --- /dev/null +++ b/pubber/src/test/java/daq/pubber/ListPublisherTest.java @@ -0,0 +1,27 @@ +package daq.pubber; + +import static org.junit.Assert.assertEquals; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import org.junit.Test; + +/** + * Tests for simple string list publisher. + */ +public class ListPublisherTest extends TestBase { + + @Test + public void testPublish() throws InterruptedException { + ListPublisher listPublisher = new ListPublisher(getTestConfiguration(), null); + listPublisher.setDeviceTopicPrefix(TEST_DEVICE, TEST_PREFIX); + final CountDownLatch sent = new CountDownLatch(1); + listPublisher.publish(TEST_DEVICE, TEST_TOPIC, TEST_MESSAGE, sent::countDown); + sent.await(); + List messages = listPublisher.getMessages(); + assertEquals("published message count", 1, messages.size()); + + String publishedMessage = messages.get(0); + assertEquals("published message", EXPECTED_MESSAGE_STRING, publishedMessage); + } +} \ No newline at end of file diff --git a/pubber/src/test/java/daq/pubber/MqttDeviceTest.java b/pubber/src/test/java/daq/pubber/MqttDeviceTest.java new file mode 100644 index 000000000..ce22f1636 --- /dev/null +++ b/pubber/src/test/java/daq/pubber/MqttDeviceTest.java @@ -0,0 +1,30 @@ +package daq.pubber; + +import static org.junit.Assert.assertEquals; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import org.junit.Test; + +/** + * Unit tests for a mqtt device abstraction. + */ +public class MqttDeviceTest extends TestBase { + + @Test + public void publishTopicPrefix() throws InterruptedException { + final CountDownLatch sent = new CountDownLatch(1); + MqttDevice mqttDevice = new MqttDevice(getTestConfiguration(), exception -> sent.countDown()); + + mqttDevice.publish(TEST_TOPIC, TEST_MESSAGE, sent::countDown); + sent.await(); + + ListPublisher mockPublisher = mqttDevice.getMockPublisher(); + List messages = mockPublisher.getMessages(); + assertEquals("published message count", 1, messages.size()); + + String publishedMessage = messages.get(0); + assertEquals("published message", EXPECTED_MESSAGE_STRING, publishedMessage); + } + +} \ No newline at end of file diff --git a/pubber/src/test/java/daq/pubber/MqttPublisherTest.java b/pubber/src/test/java/daq/pubber/MqttPublisherTest.java new file mode 100644 index 000000000..47558f490 --- /dev/null +++ b/pubber/src/test/java/daq/pubber/MqttPublisherTest.java @@ -0,0 +1,70 @@ +package daq.pubber; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.junit.Test; +import org.mockito.Mockito; +import udmi.schema.Auth_provider; +import udmi.schema.Basic; +import udmi.schema.PubberConfiguration; + +/** + * Test cases for MqttPublisher. + */ +public class MqttPublisherTest extends TestBase { + + private static final String EXPECTED_TOPIC = TEST_PREFIX + "/" + TEST_TOPIC; + private static final String EXPECTED_MESSAGE = "{}"; + private String publishedTopic; + private String publishedData; + + @Test + public void testPublish() throws InterruptedException { + PubberConfiguration configuration = getEndpointConfiguration(); + MqttPublisher mqttPublisher = new MockPublisher(configuration, null); + mqttPublisher.setDeviceTopicPrefix(TEST_DEVICE, TEST_PREFIX); + final CountDownLatch sent = new CountDownLatch(1); + mqttPublisher.publish(TEST_DEVICE, TEST_TOPIC, TEST_MESSAGE, sent::countDown); + sent.await(); + assertEquals("published topic", EXPECTED_TOPIC, publishedTopic); + assertEquals("published message", EXPECTED_MESSAGE, publishedData); + } + + private PubberConfiguration getEndpointConfiguration() { + PubberConfiguration configuration = getTestConfiguration(); + configuration.endpoint.auth_provider = new Auth_provider(); + configuration.endpoint.auth_provider.basic = new Basic(); + configuration.endpoint.auth_provider.basic.username = "username"; + configuration.endpoint.auth_provider.basic.password = "username"; + configuration.endpoint.hostname = "endpoint hostname"; + configuration.endpoint.port = 9217312; + configuration.endpoint.client_id = "endpoint client_id"; + configuration.keyBytes = new byte[10]; + configuration.algorithm = "algorithm"; + return configuration; + } + + class MockPublisher extends MqttPublisher { + + public MockPublisher(PubberConfiguration configuration, + Consumer onError) { + super(configuration, onError); + } + + MqttClient getMqttClient(String clientId, String brokerUrl) throws MqttException { + MqttClient mocked = Mockito.mock(MqttClient.class); + Mockito.when(mocked.isConnected()).thenReturn(true); + Mockito.doAnswer(invocation -> { + publishedTopic = invocation.getArgument(0, String.class); + publishedData = new String((byte[]) invocation.getArgument(1, Object.class)); + return null; + }).when(mocked) + .publish(Mockito.anyString(), Mockito.any(), Mockito.anyInt(), Mockito.anyBoolean()); + return mocked; + } + } +} \ No newline at end of file diff --git a/pubber/src/test/java/daq/pubber/TestBase.java b/pubber/src/test/java/daq/pubber/TestBase.java new file mode 100644 index 000000000..51263ad0b --- /dev/null +++ b/pubber/src/test/java/daq/pubber/TestBase.java @@ -0,0 +1,36 @@ +package daq.pubber; + +import static daq.pubber.ListPublisher.getMessageString; + +import org.checkerframework.checker.units.qual.A; +import udmi.schema.Auth_provider; +import udmi.schema.DiscoveryCommand; +import udmi.schema.EndpointConfiguration; +import udmi.schema.PubberConfiguration; +import udmi.schema.PubberOptions; + +/** + * Test base for common constants, methods, etc... + */ +public class TestBase { + + protected static final String TEST_TOPIC = "test_topic"; + protected static final Object TEST_MESSAGE = new DiscoveryCommand(); + protected static final String TEST_PREFIX = "test_prefix"; + protected static final String TEST_DEVICE = "test_device"; + protected static final String EXPECTED_TOPIC = String.format( + "%s/%s/%s", TEST_PREFIX, TEST_DEVICE, TEST_TOPIC); + + protected static String EXPECTED_MESSAGE_STRING = getMessageString( + TEST_DEVICE, EXPECTED_TOPIC, TEST_MESSAGE); + + protected PubberConfiguration getTestConfiguration() { + PubberConfiguration configuration = new PubberConfiguration(); + configuration.projectId = MqttDevice.TEST_PROJECT; + configuration.deviceId = TEST_DEVICE; + configuration.endpoint = new EndpointConfiguration(); + configuration.endpoint.topic_prefix = TEST_PREFIX; + configuration.options = new PubberOptions(); + return configuration; + } +} diff --git a/schema/configuration_endpoint.json b/schema/configuration_endpoint.json index e3cc7bc56..e30a6cc98 100644 --- a/schema/configuration_endpoint.json +++ b/schema/configuration_endpoint.json @@ -32,6 +32,11 @@ "client_id": { "type": "string" }, + "topic_prefix": { + "description": "Prefix for message topics", + "type": "string", + "pattern": "^[-_/a-zA-Z0-9]+$" + }, "auth_provider": { "type": "object", "additionalProperties": false, diff --git a/tests/configuration_endpoint.tests/mosquitto.json b/tests/configuration_endpoint.tests/mosquitto.json index 2cbdc81d6..429b92838 100644 --- a/tests/configuration_endpoint.tests/mosquitto.json +++ b/tests/configuration_endpoint.tests/mosquitto.json @@ -5,6 +5,7 @@ "port": 8883, "config_sync_sec": -1, "client_id": "any_client_id", + "topic_prefix": "/devices/AHU-2", "auth_provider": { "basic": { "username": "username", diff --git a/tests/configuration_endpoint.tests/multiple.json b/tests/configuration_endpoint.tests/multiple.json index 101b23437..e4ac3d21b 100644 --- a/tests/configuration_endpoint.tests/multiple.json +++ b/tests/configuration_endpoint.tests/multiple.json @@ -2,6 +2,7 @@ "protocol": "mqtt", "hostname": "mqtt.googleapis.com", "port": 8883, + "topic_prefix": "/devices/#", "client_id": "prasjdhsadas", "auth_provider": { "jwt": { diff --git a/tests/configuration_endpoint.tests/multiple.out b/tests/configuration_endpoint.tests/multiple.out index e69de29bb..efdc9e303 100644 --- a/tests/configuration_endpoint.tests/multiple.out +++ b/tests/configuration_endpoint.tests/multiple.out @@ -0,0 +1,2 @@ +1 schema violations found + ECMA 262 regex "^[-_/a-zA-Z0-9]+$" does not match input string "/devices/#" diff --git a/validator/sequences/broken_config/sequence.md b/validator/sequences/broken_config/sequence.md index baf705245..f4143f52d 100644 --- a/validator/sequences/broken_config/sequence.md +++ b/validator/sequences/broken_config/sequence.md @@ -14,8 +14,6 @@ Check that the device correctly handles a broken (non-json) config message. 1. Wait for log category `system.config.parse` level `ERROR` 1. Check has not logged category `system.config.apply` level `NOTICE` (**incomplete!**) 1. Force reset config -1. Update config before log category `system.config.receive` level `DEBUG`: - * Add `system.last_start` = _device reported_ 1. Wait for log category `system.config.receive` level `DEBUG` 1. Wait for no interesting status 1. Wait for last_config updated