Merge pull request #2 from hivemq/Refinement
added separate logging for json output
Anja Helmbrecht-Schaar committed Jul 25, 2022
2 parents bf7ee27 + 4f66565 commit 807b86d
Showing 7 changed files with 94 additions and 115 deletions.
44 changes: 31 additions & 13 deletions README.adoc
@@ -9,43 +9,61 @@
= HiveMQ Sparkplug Aware Extension

image:https://img.shields.io/badge/Extension_Type-Public-orange?style=for-the-badge[Extension Type]
image:https://img.shields.io/github/v/release/hivemq/hivemq-sparkplug-aware-extension?style=for-the-badge[GitHub release (latest by date),link=https://github.com/hivemq/hivemq-sparkplug-aware-extension/releases/latest]
image:https://img.shields.io/github/license/hivemq/hivemq-sparkplug-aware-extension?style=for-the-badge&color=brightgreen[GitHub,link=LICENSE]
image:https://img.shields.io/github/workflow/status/hivemq/hivemq-sparkplug-aware-extension/CI%20Check/master?style=for-the-badge[GitHub Workflow Status (branch),link=https://github.com/hivemq/hivemq-sparkplug-aware-extension/actions/workflows/check.yml?query=branch%3Amaster]
image:https://img.shields.io/github/v/release/hivemq/hivemq-sparkplug-compatible-extension?style=for-the-badge[GitHub release (latest by date),link=https://github.com/hivemq/hivemq-sparkplug-compatible-extension/releases/latest]
image:https://img.shields.io/github/license/hivemq/hivemq-sparkplug-compatible-extension?style=for-the-badge&color=brightgreen[GitHub,link=LICENSE]
image:https://img.shields.io/github/workflow/status/hivemq/hivemq-sparkplug-compatible-extension/CI%20Check/master?style=for-the-badge[GitHub Workflow Status (branch),link=https://github.com/hivemq/hivemq-sparkplug-compatible-extension/actions/workflows/check.yml?query=branch%3Amaster]

== Purpose

This HiveMQ Sparkplug aware extension fulfils the new link:https://github.com/eclipse/sparkplug[Sparkplug Release] of the Sparkplug specification and extends HiveMQ with the functionality of a Sparkplug Aware Broker.
This HiveMQ Sparkplug aware extension fulfils the new link:https://github.com/eclipse/sparkplug[Sparkplug Release] of the Sparkplug specification and extends HiveMQ with the functionality of a Sparkplug Aware Broker, which is an optional requirement.

On top of providing the native HiveMQ MQTT features such as Retained Message handling and all QoS levels, this includes the following:
On top of providing the necessary native HiveMQ MQTT features such as Retained Message handling and all QoS levels, this includes the following:

_REQUIREMENTS:_

* A Sparkplug Aware MQTT Server MUST store NBIRTH and DBIRTH messages as they pass through the MQTT Server
* A Sparkplug Aware MQTT Server MUST make NBIRTH messages available on a topic of the form: `$sparkplug/certificates/namespace/group_id/NBIRTH/edge_node_id`
* A Sparkplug Aware MQTT Server MUST make NBIRTH messages available on the topic `$sparkplug/certificates/namespace/group_id/NBIRTH/edge_node_id` with the MQTT retain flag set to true
* A Sparkplug Aware MQTT Server MUST make DBIRTH messages available on a topic of the form: `$sparkplug/certificates/namespace/group_id/DBIRTH/edge_node_id/device_id`
* A Sparkplug Aware MQTT Server MUST make DBIRTH messages available on this topic with the MQTT retain flag set to true
* A Sparkplug Aware MQTT Server MAY replace the timestamp of NDEATH messages. If it does, it MUST set the timestamp to the UTC time at which it attempts to deliver the NDEATH to subscribing clients.
* A Sparkplug Aware MQTT Server MAY replace the timestamp of NDEATH messages.
If it does, it MUST set the timestamp to the UTC time at which it attempts to deliver the NDEATH to subscribing clients.
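The system-topic mirroring required above can be sketched in a few lines. This is an illustrative sketch only: `sysTopicFor` and the hard-coded root are assumptions for this example (matching the extension's default `$sparkplug/certificates/` systopic), not the extension's actual API.

```java
// Illustrative sketch: derive the retained $sparkplug system topic for a
// BIRTH message from its original topic. Method name and hard-coded root
// are assumptions for this example, not the extension's real code.
public class SysTopicSketch {

    static final String SYSTOPIC_ROOT = "$sparkplug/certificates/";

    // "spBv1.0/group1/NBIRTH/edge1" -> "$sparkplug/certificates/spBv1.0/group1/NBIRTH/edge1"
    static String sysTopicFor(String originTopic) {
        return SYSTOPIC_ROOT + originTopic;
    }

    public static void main(String[] args) {
        System.out.println(sysTopicFor("spBv1.0/group1/NBIRTH/edge1"));
    }
}
```

The broker would publish the copied BIRTH payload to this derived topic with the retain flag set, so late-joining consumers can recover the last known BIRTH.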

== Configuration

The topic structure and the Sparkplug version can be configured.
The extension expects the configuration file `sparkplug.properties` in the extension home folder.
This configuration file contains a set of properties that control the behavior of the extension.

The topic structure root for the systopic and the Sparkplug version can be configured.
Compression is switched off by default.
An optional log output of the payload as formatted JSON can be configured.
The JSON output is available from a separate logger.
The message expiry of the retained messages sent to the systopic can be configured to a shorter lifetime.
If not set, the standard expiry of 4,294,967,296 seconds (2^32) is used.

*Default properties are:*

```
sparkplug.version:spBv1.0
sparkplug.systopic:$sparkplug/certificates/
sparkplug.compression:false
sparkplug.json.log:false
sparkplug.version=spBv1.0
sparkplug.systopic=$sparkplug/certificates/
sparkplug.compression=false
sparkplug.json.log=false
sparkplug.systopic.msgExpiry=4294967296
```
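The defaults above are plain `key=value` pairs, so standard `java.util.Properties` syntax applies. A minimal, self-contained sketch of parsing them follows; the extension uses its own `PropertiesReader`, so this only illustrates the file format, and `loadDefaults` is a hypothetical helper.

```java
import java.io.StringReader;
import java.util.Properties;

// Minimal sketch: parse the extension's default properties with the
// standard java.util.Properties class. loadDefaults is a hypothetical
// helper for illustration, not part of the extension.
public class SparkplugPropsSketch {

    static Properties loadDefaults() throws Exception {
        String defaults = String.join("\n",
                "sparkplug.version=spBv1.0",
                "sparkplug.systopic=$sparkplug/certificates/",
                "sparkplug.compression=false",
                "sparkplug.json.log=false",
                "sparkplug.systopic.msgExpiry=4294967296");
        Properties props = new Properties();
        props.load(new StringReader(defaults));
        return props;
    }

    public static void main(String[] args) throws Exception {
        Properties props = loadDefaults();
        // 4294967296 is 2^32 seconds, the maximum message expiry
        long msgExpiry = Long.parseLong(props.getProperty("sparkplug.systopic.msgExpiry"));
        System.out.println(props.getProperty("sparkplug.systopic") + " expiry=" + msgExpiry);
    }
}
```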

== JSON Logging configuration

To use the JSON logger (`sparkplug.json.log=true`), add the `appender` and `logger` configuration settings from the `src/hivemq-extension/logback-example.xml` file to your `hivemq/conf/logback.xml` configuration.
With this, the Sparkplug payload is written as JSON to the separate file `hivemq/log/sparkplug-json-payload.log`.
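For reference, the `logger` element that routes the JSON output to its own appender uses the logger name `com.hivemq.extensions.sparkplug.jsonLog` with additivity disabled, so the payload lines do not also appear in the main HiveMQ log (excerpt from `logback-example.xml`):

```xml
<logger name="com.hivemq.extensions.sparkplug.jsonLog" additivity="false">
    <appender-ref ref="SPARKPLUG-JSON-LOG-FILE"/>
</logger>
```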

== Installation

. Clone this repository into a Java 11 Gradle project.
. Execute the Gradle task `hivemqExtensionZip` to build the extension.
. Move the file: `build/hivemq-extension/hivemq-sparkplug-aware-extension-4.X.X.zip` to the directory: `HIVEMQ_HOME/extensions`
. Move the file: `build/hivemq-extension/hivemq-sparkplug-compatible-extension-4.X.X.zip` to the directory: `HIVEMQ_HOME/extensions`
. Unzip the file.
. Start HiveMQ.


== Contributing

If you want to contribute to HiveMQ Sparkplug Aware Extension, see the link:CONTRIBUTING.md[contribution guidelines].
21 changes: 21 additions & 0 deletions src/hivemq-extension/logback-example.xml
@@ -0,0 +1,21 @@
<configuration>
<appender name="SPARKPLUG-JSON-LOG-FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${hivemq.log.folder}/sparkplug-json-payload.log</file>
<append>true</append>
<encoder>
<pattern>%-24(%d)- %msg%n%ex</pattern>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${hivemq.log.folder}/sparkplug-json-payload-%i.log.gz</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>5</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
</appender>

<logger name="com.hivemq.extensions.sparkplug.jsonLog" additivity="false">
<appender-ref ref="SPARKPLUG-JSON-LOG-FILE"/>
</logger>
</configuration>
4 changes: 2 additions & 2 deletions src/hivemq-extension/sparkplug.properties
@@ -7,5 +7,5 @@
sparkplug.version=spBv1.0
sparkplug.systopic=$sparkplug/certificates/
sparkplug.compression=false
sparkplug.json.log=false
sparkplug.systopic.msgExpiry=600
sparkplug.json.log=true
sparkplug.systopic.msgExpiry=4294967296

This file was deleted.

@@ -121,6 +121,7 @@ public void onInboundPublish(
log.warn("No payload present in the sparkplug message");
}
}

if (jsonLogEnabled) {
logFormattedPayload(clientId, origin, publishPacket, topicStructure);
}
@@ -47,7 +47,7 @@ public class SparkplugConfiguration extends PropertiesReader {


private static final @NotNull String SPARKPLUG_SYSTOPIC_MSGEXPIRY = "sparkplug.systopic.msgExpiry";
private static final @NotNull Long SPARKPLUG_SYSTOPIC_MSGEXPIRY_DEFAULT = Long.MAX_VALUE;
private static final @NotNull Long SPARKPLUG_SYSTOPIC_MSGEXPIRY_DEFAULT = 4294967296L;

public SparkplugConfiguration(@NotNull final File configFilePath) {
super(configFilePath);
@@ -25,6 +25,7 @@
import org.eclipse.tahu.message.SparkplugBPayloadEncoder;
import org.eclipse.tahu.message.model.SparkplugBPayload;
import org.eclipse.tahu.util.CompressionAlgorithm;
import org.jetbrains.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,11 +35,42 @@

public class PayloadUtil {
private static final @NotNull Logger log = LoggerFactory.getLogger(PayloadUtil.class);
private static final @NotNull Logger jsonLog = LoggerFactory.getLogger("com.hivemq.extensions.sparkplug.jsonLog");
private static final CompressionAlgorithm compressionAlgorithm = CompressionAlgorithm.GZIP;

public static ByteBuffer modifySparkplugTimestamp(Boolean useCompression, ByteBuffer byteBuffer) throws Exception {
SparkplugBPayload inboundPayload = getSparkplugBPayload(byteBuffer);
//create the same payload with a new timestamp.
SparkplugBPayload payload =
new SparkplugBPayload(new Date(),
inboundPayload.getMetrics(),
inboundPayload.getSeq(),
inboundPayload.getUuid(), inboundPayload.getBody());

SparkplugBPayloadEncoder encoder = new SparkplugBPayloadEncoder();
byte[] bytes = null;
// Compress payload (optional)
if (useCompression) {
bytes = encoder.getBytes(org.eclipse.tahu.util.PayloadUtil.compress(payload, compressionAlgorithm));
} else {
bytes = encoder.getBytes(payload);
}
return ByteBuffer.wrap(bytes);
}

public static void logFormattedPayload(String clientId, String origin, PublishPacket publishPacket, TopicStructure topicStructure) {
if (publishPacket.getPayload().isPresent() && topicStructure.getMessageType() != MessageType.STATE) {
jsonLog.info("JSON Sparkplug MSG: clientId={}, topic={} payload={}",
clientId,
origin,
asJSONFormatted(getPayloadAsJSON(publishPacket.getPayload().get())));
}
}

@VisibleForTesting
public static String asJSONFormatted(String jsonObject) {
ObjectMapper mapper = new ObjectMapper();
String result;
final @NotNull ObjectMapper mapper = new ObjectMapper();
@NotNull String result;
try {
Object json = mapper.readValue(jsonObject, Object.class);
result = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
Expand All @@ -48,24 +80,24 @@ public static String asJSONFormatted(String jsonObject) {
return result;
}

@VisibleForTesting
public static String getPayloadAsJSON(@NotNull ByteBuffer payload) {
try {
byte[] bytes = getBytesFromBuffer(payload);
PayloadDecoder<SparkplugBPayload> decoder = new SparkplugBPayloadDecoder();
SparkplugBPayload sparkplugPayload = decoder.buildFromByteArray(bytes);
return org.eclipse.tahu.util.PayloadUtil.toJsonString(sparkplugPayload);
} catch (Exception e) {
log.error("Failed to parse the sparkplug payload - reason:", e);
jsonLog.error("Failed to parse the sparkplug payload - reason:", e);
}
return "";
}

public static SparkplugBPayload getSparkplugBPayload(@NotNull ByteBuffer payload) {
private static SparkplugBPayload getSparkplugBPayload(@NotNull ByteBuffer payload) {
try {
byte[] bytes = getBytesFromBuffer(payload);
PayloadDecoder<SparkplugBPayload> decoder = new SparkplugBPayloadDecoder();
SparkplugBPayload sparkplugPayload = decoder.buildFromByteArray(bytes);
return sparkplugPayload;
return decoder.buildFromByteArray(bytes);
} catch (Exception e) {
log.error("Failed to parse the sparkplug payload - reason:", e);
}
Expand All @@ -78,34 +110,4 @@ private static byte[] getBytesFromBuffer(ByteBuffer byteBuffer) {
return bytes;
}

public static void logFormattedPayload(String clientId, String origin, PublishPacket publishPacket, TopicStructure topicStructure) {
if (publishPacket.getPayload().isPresent()
&& topicStructure.getMessageType() != MessageType.STATE) {

log.info("JSON Sparkplug MSG: clientId={}, topic={} payload={}",
clientId,
origin,
asJSONFormatted(getPayloadAsJSON(publishPacket.getPayload().get())));
}
}

public static ByteBuffer modifySparkplugTimestamp(Boolean useCompression, ByteBuffer byteBuffer) throws Exception {
SparkplugBPayload inboundPayload = getSparkplugBPayload(byteBuffer);
//create the same payload with a new timestamp.
SparkplugBPayload payload =
new SparkplugBPayload(new Date(),
inboundPayload.getMetrics(),
inboundPayload.getSeq(),
inboundPayload.getUuid(), inboundPayload.getBody());

SparkplugBPayloadEncoder encoder = new SparkplugBPayloadEncoder();
byte[] bytes = null;
// Compress payload (optional)
if (useCompression) {
bytes = encoder.getBytes(org.eclipse.tahu.util.PayloadUtil.compress(payload, compressionAlgorithm));
} else {
bytes = encoder.getBytes(payload);
}
return ByteBuffer.wrap(bytes);
}
}
