Skip to content

Commit

Permalink
Added new implementation of a MQTT publisher actor.
Browse files Browse the repository at this point in the history
The new actor operates on a generic MQTT Publish message and thus is suitable for MQTT version 3 and 5.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Apr 14, 2022
1 parent c08484d commit acdbc67
Show file tree
Hide file tree
Showing 16 changed files with 2,583 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.publishing;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.common.CharsetDeterminer;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.MqttHeader;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.publish.GenericMqttPublish;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.publish.MqttPublishTransformationException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.publish.TransformationFailure;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.publish.TransformationResult;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.publish.TransformationSuccess;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.publish.UserProperty;

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.datatypes.MqttTopic;

/**
* Transforms an {@link org.eclipse.ditto.connectivity.api.ExternalMessage} to a {@link GenericMqttPublish}.
*/
@Immutable
final class ExternalMessageToMqttPublishTransformer {

/*
* Actually it would be correct to also include MqttHeader.MQTT_TOPIC,
* MqttHeader.MQTT_QOS and MqttHeader.MQTT_RETAIN in the set of the known
* MQTT header names because they are already dedicated properties in the
* MQTT Publish.
* However, as the named headers were included in user properties
* up to the present, there might be users who rely on their presence.
* Excluding the headers from user properties would then break
* functionality.
*/
private static final Set<String> KNOWN_MQTT_HEADER_NAMES = Set.of(DittoHeaderDefinition.CORRELATION_ID.getKey(),
ExternalMessage.REPLY_TO_HEADER,
ExternalMessage.CONTENT_TYPE_HEADER);

private ExternalMessageToMqttPublishTransformer() {
throw new AssertionError();
}

/**
* Transforms the specified {@code ExternalMessage} argument into a {@code GenericMqttPublish}.
*
* @param externalMessage the ExternalMessage to be transformed.
* @param mqttPublishTarget provides fall-back values for MQTT topic and MQTT QoS.
* @return the result of the transformation.
* @throws NullPointerException if any argument is {@code null}.
*/
public static TransformationResult<ExternalMessage, GenericMqttPublish> transform(
final ExternalMessage externalMessage,
final MqttPublishTarget mqttPublishTarget
) {
ConditionChecker.checkNotNull(externalMessage, "externalMessage");
ConditionChecker.checkNotNull(mqttPublishTarget, "mqttPublishTarget");

try {
return TransformationSuccess.of(externalMessage, getGenericMqttPublish(externalMessage, mqttPublishTarget));
} catch (final Exception e) {
return TransformationFailure.of(
externalMessage,
new MqttPublishTransformationException(
MessageFormat.format("Failed to transform {0} to {1}: {2}",
ExternalMessage.class.getSimpleName(),
GenericMqttPublish.class.getSimpleName(),
e.getMessage()),
e,
externalMessage.getInternalHeaders()
)
);
}
}

private static GenericMqttPublish getGenericMqttPublish(final ExternalMessage externalMessage,
final MqttPublishTarget mqttPublishTarget) {

final var externalMessageHeaders = externalMessage.getHeaders();
final var charset = getCharsetOrUtf8(externalMessage);
final var mqttTopic = getMqttTopic(externalMessageHeaders, mqttPublishTarget);
final var mqttQos = getMqttQosOrThrow(externalMessageHeaders, mqttPublishTarget);
final var retain = isRetainOrThrow(externalMessageHeaders);

return GenericMqttPublish.builder(mqttTopic, mqttQos)
.retain(retain)
.payload(getPayloadOrNull(externalMessage, charset))
.correlationData(getCorrelationDataOrNull(externalMessageHeaders, charset))
.responseTopic(getResponseTopicOrNull(externalMessageHeaders))
.contentType(getContentTypeOrNull(externalMessageHeaders))
.userProperties(getUserPropertiesOrEmptySet(externalMessageHeaders.entrySet(),
mqttTopic,
mqttQos,
retain))
.build();
}

private static Charset getCharsetOrUtf8(final ExternalMessage externalMessage) {
return externalMessage.findContentType().map(CharsetDeterminer.getInstance()).orElse(StandardCharsets.UTF_8);
}

private static MqttTopic getMqttTopic(final Map<String, String> externalMessageHeaders,
final MqttPublishTarget mqttPublishTarget) {

final MqttTopic result;
@Nullable final var mqttTopicHeaderValue = externalMessageHeaders.get(MqttHeader.MQTT_TOPIC.getName());
if (null == mqttTopicHeaderValue) {
result = mqttPublishTarget.getTopic();
} else {
result = MqttTopic.of(mqttTopicHeaderValue);
}
return result;
}

private static MqttQos getMqttQosOrThrow(final Map<String, String> externalMessageHeaders,
final MqttPublishTarget mqttPublishTarget) {

final MqttQos result;
@Nullable final var mqttQosHeaderValue = externalMessageHeaders.get(MqttHeader.MQTT_QOS.getName());
if (null == mqttQosHeaderValue) {
result = mqttPublishTarget.getQos();
} else {
result = getMqttQosFromCodeOrThrow(parseMqttQosCodeFromHeaderValueOrThrow(mqttQosHeaderValue));
}
return result;
}

private static int parseMqttQosCodeFromHeaderValueOrThrow(final String mqttQosHeaderValue) {
try {
return Integer.parseInt(mqttQosHeaderValue);
} catch (final NumberFormatException e) {
throw new InvalidHeaderValueException(MqttHeader.MQTT_QOS.getName(),
new InvalidMqttQosCodeException(mqttQosHeaderValue, e));
}
}

private static MqttQos getMqttQosFromCodeOrThrow(final int mqttQosCode) {
@Nullable final var mqttQosFromCode = MqttQos.fromCode(mqttQosCode);
if (null != mqttQosFromCode) {
return mqttQosFromCode;
} else {
throw new InvalidHeaderValueException(MqttHeader.MQTT_QOS.getName(),
new InvalidMqttQosCodeException(mqttQosCode));
}
}

private static boolean isRetainOrThrow(final Map<String, String> externalMessageHeaders) {
final boolean result;
final var retainHeaderName = MqttHeader.MQTT_RETAIN.getName();
@Nullable final var retainHeaderValue = externalMessageHeaders.get(retainHeaderName);
if (null == retainHeaderValue) {
result = false;
} else {
final var trimmedRetainHeaderValue = retainHeaderValue.trim();
if (trimmedRetainHeaderValue.equalsIgnoreCase("true")) {
result = true;
} else if (trimmedRetainHeaderValue.equalsIgnoreCase("false")) {
result = false;
} else {
throw new InvalidHeaderValueException(retainHeaderName,
MessageFormat.format("<{0}> is not a boolean.", retainHeaderValue));
}
}
return result;
}

@Nullable
private static ByteBuffer getPayloadOrNull(final ExternalMessage externalMessage, final Charset charset) {
return externalMessage.getTextPayload()
.map(s -> s.getBytes(charset))
.map(ByteBuffer::wrap)
.or(externalMessage::getBytePayload)
.orElse(null);
}

@Nullable
private static ByteBuffer getCorrelationDataOrNull(final Map<String, String> externalMessageHeaders,
final Charset charset) {

final ByteBuffer result;
@Nullable final var correlationId = externalMessageHeaders.get(DittoHeaderDefinition.CORRELATION_ID.getKey());
if (null != correlationId) {
result = ByteBuffer.wrap(correlationId.getBytes(charset));
} else {
result = null;
}
return result;
}

@Nullable
private static MqttTopic getResponseTopicOrNull(final Map<String, String> externalMessageHeaders) {
final MqttTopic result;
@Nullable final var replyToValue = externalMessageHeaders.get(ExternalMessage.REPLY_TO_HEADER);
if (null != replyToValue) {
result = MqttTopic.of(replyToValue);
} else {
result = null;
}
return result;
}

@Nullable
private static String getContentTypeOrNull(final Map<String, String> externalMessageHeaders) {
return externalMessageHeaders.get(ExternalMessage.CONTENT_TYPE_HEADER);
}

private static Set<UserProperty> getUserPropertiesOrEmptySet(
final Collection<Map.Entry<String, String>> externalMessageHeaders,
final MqttTopic actualMqttTopic,
final MqttQos actualMqttQos,
final boolean actualRetain
) {
return externalMessageHeaders.stream()
.filter(header -> !KNOWN_MQTT_HEADER_NAMES.contains(header.getKey()))
.map(header -> {
final var headerKey = header.getKey();
final String headerValue;
if (headerKey.equals(MqttHeader.MQTT_TOPIC.getName())) {
headerValue = actualMqttTopic.toString();
} else if (headerKey.equals(MqttHeader.MQTT_QOS.getName())) {
headerValue = String.valueOf(actualMqttQos.getCode());
} else if (headerKey.equals(MqttHeader.MQTT_RETAIN.getName())) {
headerValue = String.valueOf(actualRetain);
} else {
headerValue = header.getValue();
}
return new UserProperty(headerKey, headerValue);
})
.collect(Collectors.toCollection(LinkedHashSet::new));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.publishing;

import java.util.Objects;
import java.util.Optional;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.publish.GenericMqttPublish;

/**
* Result for a {@code GenericMqttPublish} sent by the client.
*/
final class GenericMqttPublishResult {

private final GenericMqttPublish genericMqttPublish;
@Nullable private final Throwable error;

private GenericMqttPublishResult(final GenericMqttPublish genericMqttPublish, @Nullable final Throwable error) {
this.genericMqttPublish = ConditionChecker.checkNotNull(genericMqttPublish, "genericMqttPublish");
this.error = error;
}

/**
* Returns an instance of {@code GenericMqttPublishResult} that represents the successful delivery of a Publish
* message.
*
* @param genericMqttPublish the successfully delivered Publish message.
* @return the instance of the result.
* @throws NullPointerException if {@code genericMqttPublish} is {@code null}.
*/
static GenericMqttPublishResult success(final GenericMqttPublish genericMqttPublish) {
return new GenericMqttPublishResult(genericMqttPublish, null);
}

/**
* Returns an instance of {@code GenericMqttPublishResult} that represents the failed delivery of a Publish
* message.
*
* @param genericMqttPublish the Publish message that could not be delivered.
* @param error the error that caused the delivery to fail.
* @return the instance of the result.
* @throws NullPointerException if any argument is {@code null}.
*/
static GenericMqttPublishResult failure(final GenericMqttPublish genericMqttPublish, final Throwable error) {
return new GenericMqttPublishResult(genericMqttPublish, ConditionChecker.checkNotNull(error, "error"));
}

/**
* Indicates whether this result represents the successful delivery of a Publish message.
*
* @return {@code true} if this result represents the successful delivery of a Publish message, {@code false} if
* delivery failed.
* @see #isFailure()
*/
boolean isSuccess() {
return null == error;
}

/**
* Indicates whether this result represents the failed delivery of a Publish message.
*
* @return {@code true} if this result represents the failed delivery of a Publish message, {@code false} if
* delivery succeeded.
* @see #isSuccess()
*/
boolean isFailure() {
return !isSuccess();
}

/**
* Returns the {@code GenericMqttPublish} this result is for.
*
* @return the Publish message this result is for.
*/
public GenericMqttPublish getGenericMqttPublish() {return genericMqttPublish;}

/**
* Returns the optional error that is present if the Publish message was not successfully delivered.
*
* @return the optional error.
*/
public Optional<Throwable> getError() {
return Optional.ofNullable(error);
}

/**
* Returns the error if this is result is a failure.
* Throws an {@code IllegalStateException} else.
*
* @return the error.
* @throws IllegalStateException if this result is a success.
*/
public Throwable getErrorOrThrow() {
if (isFailure()) {
return error;
} else {
throw new IllegalStateException("Success cannot provide an error.");
}
}

@Override
public boolean equals(@Nullable final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final var that = (GenericMqttPublishResult) o;
return Objects.equals(genericMqttPublish, that.genericMqttPublish) &&
Objects.equals(error, that.error);
}

@Override
public int hashCode() {
return Objects.hash(genericMqttPublish, error);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"genericMqttPublish=" + genericMqttPublish +
", error=" + error +
"]";
}

}
Loading

0 comments on commit acdbc67

Please sign in to comment.