Skip to content

Commit

Permalink
Add config data processing mode support
Browse files Browse the repository at this point in the history
Added support for configuring the data processing mode
specified to Cumulocity for data payloads (standard or
aggregated JSON). This required the differentiation
between types of payloads being sent, thus the reason
for the previously added data processing mode enum-like
class.

Because the sendMessageWithChildDeviceRouting method
was being adapted to require a message type input, the
JSON-specific method was also combined to reduce the
amount of unnecessary/duplicate code present.
  • Loading branch information
alexjhawk committed Jan 12, 2024
1 parent c1aa17d commit 8421ab5
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,8 @@ public void runOnMqttLoop(int currentMqttStatus) {
retryPayload.incrementRetryCount();
String childDevice = (String) retryPayload.getChildDevice();
String payloadString = (String) retryPayload.getMessagePayload();
if (retryPayload.getMessageType() == CConnectorMessageType.JSON_DATA) {
sendJsonMeasurementMessageWithChildDeviceRouting(payloadString, childDevice);
} else {
sendMessageWithChildDeviceRouting(payloadString, childDevice);
}
sendMessageWithChildDeviceRouting(
payloadString, childDevice, retryPayload.getMessageType());
pendingRetryMessages.pop();
Logger.LOG_DEBUG(
"Successfully sent payload to Cumulocity after "
Expand Down Expand Up @@ -428,32 +425,6 @@ public void addMessageToRetryPending(
pendingRetryMessages.push(cConnectorRetryMessage);
}

/**
* Sends the specified JSON measurement message to Cumulocity with the proper topic for routing to
* a child device, if not null. If the child device has not been registered using the {@link
* CConnectorApiMessageBuilder#childDeviceCreation_101(String, String)} static template, it will
* be registered. Registered child devices are added to a list to ensure that they are not
* registered more than once per session.
*
* @param jsonPayload the JSON message payload to send
* @param childDevice the child device to route the message to (if not null)
* @throws EWException if an Ewon exception occurs, check the Ewon event log for more details
* @throws UnsupportedEncodingException if the character encoding is not supported
*/
public void sendJsonMeasurementMessageWithChildDeviceRouting(
String jsonPayload, String childDevice) throws EWException, UnsupportedEncodingException {
// Register child device if not already registered
verifyChildDeviceRegistration(childDevice);

// Send message to Cumulocity
mqttPublish(CUMULOCITY_MQTT_TOPIC_MEASUREMENT_JSON, jsonPayload, MQTT_QOS_LEVEL, MQTT_RETAIN);
Logger.LOG_DEBUG(
"Sent message to Cumulocity on topic ["
+ CUMULOCITY_MQTT_TOPIC_MEASUREMENT_JSON
+ "]: "
+ jsonPayload);
}

/**
* Verifies that the specified child device has been registered with Cumulocity. If the child
* device has not been registered, it will be registered using the {@link
Expand Down Expand Up @@ -489,19 +460,40 @@ public void verifyChildDeviceRegistration(String childDevice)
*
* @param messagePayload the message payload to send
* @param childDevice the child device to route the message to (if not null)
* @param messageType the value indicating the type of the message
* @throws EWException if an Ewon exception occurs, check the Ewon event log for more details
* @throws UnsupportedEncodingException if the character encoding is not supported
*/
public void sendMessageWithChildDeviceRouting(String messagePayload, String childDevice)
public void sendMessageWithChildDeviceRouting(
String messagePayload, String childDevice, CConnectorMessageType messageType)
throws EWException, UnsupportedEncodingException {
// Register child device if not already registered
verifyChildDeviceRegistration(childDevice);

// Build topic name to publish method (append child device, if not null)
// Get child device Cumulocity ID
String childDeviceCumulocityId = getMqttId() + "_" + childDevice;
String messageTopic = CUMULOCITY_MQTT_TOPIC_SUS;
if (childDevice != null) {
messageTopic += "/" + childDeviceCumulocityId;

// Get data processing mode
CConnectorDataProcessingMode dataProcessingMode =
CConnectorMain.getConnectorConfig().getCumulocityDataProcessingMode();

// Determine message type and build topic name
String messageTopic;
if (messageType == CConnectorMessageType.DATA) {
final String messageTopicBase =
dataProcessingMode.getValue() + "/" + CUMULOCITY_MQTT_UPSTREAM;
messageTopic =
childDevice == null ? messageTopicBase : messageTopicBase + "/" + childDeviceCumulocityId;
} else if (messageType == CConnectorMessageType.JSON_DATA) {
messageTopic =
dataProcessingMode == CConnectorDataProcessingMode.PERSISTENT
? CUMULOCITY_MQTT_TOPIC_MEASUREMENT_JSON
: dataProcessingMode.getValue() + "/" + CUMULOCITY_MQTT_TOPIC_MEASUREMENT_JSON;
} else {
messageTopic =
childDevice == null
? CUMULOCITY_MQTT_TOPIC_SUS
: CUMULOCITY_MQTT_TOPIC_SUS + "/" + childDeviceCumulocityId;
}

// Send message to Cumulocity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ public void onTagAlarm(
CConnectorMqttMgr mqttMgr = CConnectorMain.getMqttMgr();
if (mqttMgr != null) {
try {
mqttMgr.sendMessageWithChildDeviceRouting(alarmMessage, childDevice);
mqttMgr.sendMessageWithChildDeviceRouting(
alarmMessage, childDevice, CConnectorMessageType.OTHER);
} catch (Exception e) {
Logger.LOG_SERIOUS(
"An alarm was unable to be sent to Cumulocity because of an exception: ["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ public static void processDataPointsAndSend(
(CConnectorJsonDataPayload) payloadListIterator.next();
String payloadString = payload.getJsonString();
try {
mqttMgr.sendJsonMeasurementMessageWithChildDeviceRouting(payloadString, childDevice);
mqttMgr.sendMessageWithChildDeviceRouting(
payloadString, childDevice, CConnectorMessageType.JSON_DATA);
} catch (Exception e) {
Logger.LOG_CRITICAL("Unable to send payload to MQTT broker.");
Logger.LOG_EXCEPTION(e);
Expand Down Expand Up @@ -399,7 +400,8 @@ public static void processDataPointsAndSend(
String childDevice = (String) childDeviceMessageMapKeysArray[x];
String payloadString = (String) childDeviceMessageMap.get(childDevice);
try {
mqttMgr.sendMessageWithChildDeviceRouting(payloadString, childDevice);
mqttMgr.sendMessageWithChildDeviceRouting(
payloadString, childDevice, CConnectorMessageType.DATA);
} catch (Exception e) {
Logger.LOG_CRITICAL("Unable to send data point to MQTT broker.");
Logger.LOG_EXCEPTION(e);
Expand Down

0 comments on commit 8421ab5

Please sign in to comment.