Skip to content

Commit

Permalink
Minor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
dimabarbul committed Nov 17, 2023
1 parent ccee5d1 commit 75b60b2
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 7 deletions.
Expand Up @@ -59,7 +59,10 @@
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.NoMqttConnectionException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.consuming.MqttConsumerActor;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.consuming.ReconnectConsumerClient;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.AcknowledgementUnsupportedException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.GenericMqttPublish;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.ManualAcknowledgementDisabledException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.MessageAlreadyAcknowledgedException;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.publishing.MqttPublisherActor;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.subscribing.MqttSubscriber;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.subscribing.SubscribeResult;
Expand Down Expand Up @@ -486,7 +489,7 @@ private void subscribeToAcknowledgeUnsolicitedPublishes() {

unsolicitedPublishesAutoAckSubscription = genericMqttClient.consumeSubscribedPublishesWithManualAcknowledgement()
.filter(p -> messageHasNoMatchingSubscription(p, subscribedTopics))
.subscribe(GenericMqttPublish::acknowledge,
.subscribe(this::tryToAcknowledgePublish,
p -> logger.info("Failed to read unsolicited publish: <{}>", p));
}

Expand All @@ -495,6 +498,27 @@ private boolean messageHasNoMatchingSubscription(final GenericMqttPublish generi
return topicFilters.stream().noneMatch(filter -> filter.matches(genericMqttPublish.getTopic()));
}

private void tryToAcknowledgePublish(final GenericMqttPublish mqttPublish) {
try {
mqttPublish.acknowledge();
} catch (final ManualAcknowledgementDisabledException e) {
logger.warning("""
Manual acknowledgement of unsolicited incoming message at topic <{0}> failed because manual acknowledgement \
is disabled.\
""", mqttPublish.getTopic());
} catch (final MessageAlreadyAcknowledgedException e) {
logger.warning("""
Acknowledgement of unsolicited incoming message at topic <{0}> failed because it was acknowledged already by \
another source.\
""", mqttPublish.getTopic());
} catch (final AcknowledgementUnsupportedException e) {
logger.warning(
"Manual acknowledgement of unsolicited incoming message at topic <{0}> failed: {1}",
mqttPublish.getTopic(),
e.getMessage());
}
}

private CompletionStage<List<ActorRef>> handleSourceSubscribeResults(
final Source<SubscribeResult, NotUsed> sourceSubscribeResults
) {
Expand Down
Expand Up @@ -53,7 +53,7 @@ static BaseGenericMqttConsumingClient<Mqtt5RxClient> ofMqtt5RxClient(
public String toString() {
final var mqttClientConfig = mqttClient.getConfig();
final var clientIdentifier = mqttClientConfig.getClientIdentifier();
return clientIdentifier.toString();
return clientIdentifier.map(String::valueOf).orElse("");
}

private static final class Mqtt3ConsumingClient extends BaseGenericMqttConsumingClient<Mqtt3RxClient> {
Expand Down
Expand Up @@ -15,7 +15,7 @@
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;

/**
* Wrapper around flowable that buffers the items until it is told to stop.
Expand All @@ -26,17 +26,17 @@
*/
public class BufferingFlowableWrapper<T> implements Disposable {
private final Flowable<T> originalFlowable;
private final BehaviorSubject<T> buffered;
private final BehaviorSubject<T> unbuffered;
private final PublishSubject<T> buffered;
private final PublishSubject<T> unbuffered;
private final Disposable originalSubscription;
private final Flowable<T> flowable;
private final Disposable subscription;
private boolean isBuffering = true;

private BufferingFlowableWrapper(final Flowable<T> flowable) {
this.originalFlowable = flowable;
this.buffered = BehaviorSubject.<T>create();
this.unbuffered = BehaviorSubject.<T>create();
this.buffered = PublishSubject.<T>create();
this.unbuffered = PublishSubject.<T>create();

this.flowable = buffered
.replay()
Expand Down
@@ -0,0 +1,251 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNullPointerException;

import java.util.ArrayList;
import java.util.Optional;

import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.GenericMqttPublish;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttClientIdentifier;
import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientConfig;
import com.hivemq.client.mqtt.mqtt3.Mqtt3RxClient;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientConfig;
import com.hivemq.client.mqtt.mqtt5.Mqtt5RxClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.subjects.PublishSubject;

/**
* Unit test for {@link BaseGenericMqttConsumingClient}.
*/
@RunWith(Enclosed.class)
public final class BaseGenericMqttConsumingClientTest {

@RunWith(MockitoJUnitRunner.class)
public static final class Mqtt3RxClientTest {

private static final Mqtt3Publish MQTT_3_PUBLISH = Mockito.mock(Mqtt3Publish.class);
private static final GenericMqttPublish GENERIC_MQTT_PUBLISH = GenericMqttPublish.ofMqtt3Publish(MQTT_3_PUBLISH);

@Mock
private Mqtt3RxClient mqtt3RxClient;

@Before
public void before() {
Mockito.when(mqtt3RxClient.publishes(Mockito.eq(MqttGlobalPublishFilter.ALL), Mockito.eq(true)))
.thenReturn(Flowable.never());
}

@Test
public void ofMqtt3RxClientWithNullMqtt3RxClientThrowsException() {
assertThatNullPointerException()
.isThrownBy(() -> BaseGenericMqttConsumingClient.ofMqtt3RxClient(null))
.withMessage("The mqtt3RxClient must not be null!")
.withNoCause();
}

@Test
public void consumePublishesIsDelegatedToMqtt3RxClient() {
Mockito.when(mqtt3RxClient.publishes(Mockito.eq(MqttGlobalPublishFilter.ALL), Mockito.eq(true)))
.thenReturn(Flowable.just(MQTT_3_PUBLISH));
final var underTest = BaseGenericMqttConsumingClient.ofMqtt3RxClient(mqtt3RxClient);

final var publishes = new ArrayList<GenericMqttPublish>();
underTest.consumePublishes().blockingSubscribe(publishes::add);

assertThat(publishes).containsExactly(GENERIC_MQTT_PUBLISH);
}

@Test
public void consumePublishesEmitsError() {
final var error = new Exception();
Mockito.when(mqtt3RxClient.publishes(Mockito.eq(MqttGlobalPublishFilter.ALL), Mockito.eq(true)))
.thenReturn(Flowable.error(error));
final var underTest = BaseGenericMqttConsumingClient.ofMqtt3RxClient(mqtt3RxClient);

final var errors = new ArrayList<Throwable>();
underTest.consumePublishes().blockingSubscribe(
ignored -> {},
errors::add);

assertThat(errors).containsExactly(error);
}

@Test
public void consumePublishesEmitsCompletion() {
Mockito.when(mqtt3RxClient.publishes(Mockito.eq(MqttGlobalPublishFilter.ALL), Mockito.eq(true)))
.thenReturn(Flowable.empty());
final var underTest = BaseGenericMqttConsumingClient.ofMqtt3RxClient(mqtt3RxClient);

final var completed = new ArrayList<Boolean>();
underTest.consumePublishes().blockingSubscribe(
ignored -> {},
ignored -> {},
() -> completed.add(true));

assertThat(completed).containsExactly(true);
}

@Test
public void consumePublishesEmitsOnlyNewItemsWhenBufferingIsStopped() {
final var subject = PublishSubject.<Mqtt3Publish>create();
Mockito.when(mqtt3RxClient.publishes(Mockito.eq(MqttGlobalPublishFilter.ALL), Mockito.eq(true)))
.thenReturn(subject.toFlowable(BackpressureStrategy.DROP));
final var underTest = BaseGenericMqttConsumingClient.ofMqtt3RxClient(mqtt3RxClient);

final var newMqtt3Publish = Mockito.mock(Mqtt3Publish.class);
final var newGenericMqttPublish = GenericMqttPublish.ofMqtt3Publish(newMqtt3Publish);

final var publishes = new ArrayList<GenericMqttPublish>();
subject.onNext(MQTT_3_PUBLISH);
underTest.stopBufferingPublishes();
final var subscription = underTest.consumePublishes().subscribe(publishes::add);
subject.onNext(newMqtt3Publish);
subject.onComplete();
subscription.dispose();

assertThat(publishes).containsExactly(newGenericMqttPublish);
}

@Test
public void toStringReturnsExpected() {
final var mqttClientId = "Anneliese";
final var mqtt3ClientConfig = Mockito.mock(Mqtt3ClientConfig.class);
Mockito.when(mqtt3ClientConfig.getClientIdentifier())
.thenReturn(Optional.of(MqttClientIdentifier.of(mqttClientId)));
Mockito.when(mqtt3RxClient.getConfig()).thenReturn(mqtt3ClientConfig);
final var underTest =
BaseGenericMqttConsumingClient.ofMqtt3RxClient(mqtt3RxClient);

assertThat(underTest).hasToString(mqttClientId);
}

}

@RunWith(MockitoJUnitRunner.class)
public static final class Mqtt5RxClientTest {

private static final Mqtt5Publish MQTT_5_PUBLISH = Mockito.mock(Mqtt5Publish.class);
private static final GenericMqttPublish GENERIC_MQTT_PUBLISH = GenericMqttPublish.ofMqtt5Publish(MQTT_5_PUBLISH);

@Mock
private Mqtt5RxClient mqtt5RxClient;

@Before
public void before() {
Mockito.when(mqtt5RxClient.publishes(Mockito.eq(MqttGlobalPublishFilter.ALL), Mockito.eq(true)))
.thenReturn(Flowable.never());
}

@Test
public void ofMqtt5RxClientWithNullMqtt5RxClientThrowsException() {
assertThatNullPointerException()
.isThrownBy(() -> BaseGenericMqttConsumingClient.ofMqtt5RxClient(null))
.withMessage("The mqtt5RxClient must not be null!")
.withNoCause();
}

@Test
public void consumePublishesIsDelegatedToMqtt5RxClient() {
Mockito.when(mqtt5RxClient.publishes(Mockito.eq(MqttGlobalPublishFilter.ALL), Mockito.eq(true)))
.thenReturn(Flowable.just(MQTT_5_PUBLISH));
final var underTest = BaseGenericMqttConsumingClient.ofMqtt5RxClient(mqtt5RxClient);

final var publishes = new ArrayList<GenericMqttPublish>();
underTest.consumePublishes().blockingSubscribe(publishes::add);

assertThat(publishes).containsExactly(GENERIC_MQTT_PUBLISH);
}

@Test
public void consumePublishesEmitsError() {
final var error = new Exception();
Mockito.when(mqtt5RxClient.publishes(Mockito.eq(MqttGlobalPublishFilter.ALL), Mockito.eq(true)))
.thenReturn(Flowable.error(error));
final var underTest = BaseGenericMqttConsumingClient.ofMqtt5RxClient(mqtt5RxClient);

final var errors = new ArrayList<Throwable>();
underTest.consumePublishes().blockingSubscribe(
ignored -> {},
errors::add);

assertThat(errors).containsExactly(error);
}

@Test
public void consumePublishesEmitsCompletion() {
Mockito.when(mqtt5RxClient.publishes(Mockito.eq(MqttGlobalPublishFilter.ALL), Mockito.eq(true)))
.thenReturn(Flowable.empty());
final var underTest = BaseGenericMqttConsumingClient.ofMqtt5RxClient(mqtt5RxClient);

final var completed = new ArrayList<Boolean>();
underTest.consumePublishes().blockingSubscribe(
ignored -> {},
ignored -> {},
() -> completed.add(true));

assertThat(completed).containsExactly(true);
}

@Test
public void consumePublishesEmitsOnlyNewItemsWhenBufferingIsStopped() {
final var subject = PublishSubject.<Mqtt5Publish>create();
Mockito.when(mqtt5RxClient.publishes(Mockito.eq(MqttGlobalPublishFilter.ALL), Mockito.eq(true)))
.thenReturn(subject.toFlowable(BackpressureStrategy.DROP));
final var underTest = BaseGenericMqttConsumingClient.ofMqtt5RxClient(mqtt5RxClient);

final var newMqtt5Publish = Mockito.mock(Mqtt5Publish.class);
final var newGenericMqttPublish = GenericMqttPublish.ofMqtt5Publish(newMqtt5Publish);

final var publishes = new ArrayList<GenericMqttPublish>();
subject.onNext(MQTT_5_PUBLISH);
underTest.stopBufferingPublishes();
final var subscription = underTest.consumePublishes().subscribe(publishes::add);
subject.onNext(newMqtt5Publish);
subject.onComplete();
subscription.dispose();

assertThat(publishes).containsExactly(newGenericMqttPublish);
}

@Test
public void toStringReturnsExpected() {
final var mqttClientId = "Anneliese";
final var mqtt5ClientConfig = Mockito.mock(Mqtt5ClientConfig.class);
Mockito.when(mqtt5ClientConfig.getClientIdentifier())
.thenReturn(Optional.of(MqttClientIdentifier.of(mqttClientId)));
Mockito.when(mqtt5RxClient.getConfig()).thenReturn(mqtt5ClientConfig);
final var underTest =
BaseGenericMqttConsumingClient.ofMqtt5RxClient(mqtt5RxClient);

assertThat(underTest).hasToString(mqttClientId);
}

}

}

0 comments on commit 75b60b2

Please sign in to comment.