Skip to content

Commit

Permalink
Replaced reactive MQTT client with async v2, handled ripples (#296)
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed Nov 8, 2023
1 parent fa67007 commit bfdc029
Show file tree
Hide file tree
Showing 12 changed files with 34 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import net.sf.dz3r.device.mqtt.MqttAdapter;
import net.sf.dz3r.device.mqtt.v1.MqttEndpoint;
import net.sf.dz3r.device.mqtt.v2.AbstractMqttCqrsSwitch;
import net.sf.dz3r.device.mqtt.v2rx.MqttAdapterImpl;
import net.sf.dz3r.device.mqtt.v2async.MqttAdapterImpl;
import net.sf.dz3r.instrumentation.Marker;
import net.sf.dz3r.runtime.config.ConfigurationContext;
import net.sf.dz3r.runtime.config.ConfigurationContextAware;
Expand Down Expand Up @@ -103,6 +103,8 @@ List<Map.Entry<String, AbstractMqttCqrsSwitch>>>> parse(

// Step 3: for all the brokers, collect all their devices

// VT: FIXME: use zip() here

mqttConfigs
.doOnNext(MqttDeviceResolver::getSensorConfigs)
.blockLast();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import net.sf.dz3r.device.esphome.v2.ESPHomeCqrsSwitch;
import net.sf.dz3r.device.mqtt.v1.MqttEndpoint;
import net.sf.dz3r.device.mqtt.v2rx.MqttAdapterImpl;
import net.sf.dz3r.device.mqtt.v2async.MqttAdapterImpl;
import net.sf.dz3r.model.HvacMode;
import net.sf.dz3r.signal.Signal;
import net.sf.dz3r.signal.hvac.HvacCommand;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
import net.sf.dz3r.device.mqtt.v1.AbstractMqttSwitch;
import net.sf.dz3r.device.mqtt.v1.MqttEndpoint;
import net.sf.dz3r.device.mqtt.v1.MqttMessageAddress;
import net.sf.dz3r.device.mqtt.v2rx.MqttAdapterImpl;
import net.sf.dz3r.device.mqtt.v2async.MqttAdapterImpl;
import net.sf.dz3r.signal.Signal;
import reactor.core.scheduler.Scheduler;

import java.io.IOException;
import java.time.Instant;

import static net.sf.dz3r.device.mqtt.v2.AbstractMqttListener.DEFAULT_CACHE_AGE;

/**
* Implementation to control <a href="https://esphome.io/components/switch/">ESPHome Switch Component</a> via
* <a href="https://esphome.io/components/mqtt">ESPHome MQTT Client Component</a>.
Expand Down Expand Up @@ -50,7 +52,7 @@ public ESPHomeSwitch(String host, String deviceRootTopic) {
*
* @param deviceRootTopic Switch root topic. See the doc link at the top for the configuration reference.
*
* @deprecated Use {@link ESPHomeSwitch#ESPHomeSwitch(MqttAdapterImpl, String, boolean, Scheduler)} instead.
* @deprecated Use {@link ESPHomeSwitch#ESPHomeSwitch(MqttAdapter, String, boolean, Scheduler)} instead.
*/
@Deprecated(forRemoval = false)
public ESPHomeSwitch(String host, int port,
Expand All @@ -68,7 +70,7 @@ public ESPHomeSwitch(String host, int port,
*
* @param deviceRootTopic Switch root topic. See the doc link at the top for the configuration reference.
*
* @deprecated Use {@link ESPHomeSwitch#ESPHomeSwitch(MqttAdapterImpl, String, boolean, Scheduler)} instead.
* @deprecated Use {@link ESPHomeSwitch#ESPHomeSwitch(MqttAdapter, String, boolean, Scheduler)} instead.
*/
@Deprecated(forRemoval = false)
public ESPHomeSwitch(String host, int port,
Expand All @@ -80,7 +82,7 @@ public ESPHomeSwitch(String host, int port,

// VT: NOTE: ESPHome appears to not suffer from buffer overruns like Zigbee and Z-Wave do,
// so not providing the delay
this(new MqttAdapterImpl(new MqttEndpoint(host, port), username, password, reconnect),
this(new MqttAdapterImpl(new MqttEndpoint(host, port), username, password, reconnect, DEFAULT_CACHE_AGE),
deviceRootTopic,
optimistic,
scheduler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ public MqttAdapterImpl(MqttEndpoint address) {
super(address);
}

public MqttAdapterImpl(MqttEndpoint address, String username, String password, boolean autoReconnect) {
super(address, username, password, autoReconnect, DEFAULT_CACHE_AGE);
}

public MqttAdapterImpl(MqttEndpoint address, String username, String password, boolean autoReconnect, Duration cacheFor) {
super(address, username, password, autoReconnect, cacheFor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import net.sf.dz3r.device.mqtt.MqttListener;
import net.sf.dz3r.device.mqtt.v1.MqttEndpoint;
import net.sf.dz3r.device.mqtt.v1.MqttSignal;
import net.sf.dz3r.device.mqtt.v2rx.MqttListenerImpl;
import net.sf.dz3r.device.mqtt.v2async.MqttListenerImpl;
import net.sf.dz3r.signal.Signal;
import net.sf.dz3r.signal.SignalSource;
import org.apache.logging.log4j.LogManager;
Expand All @@ -13,6 +13,8 @@

import java.time.Instant;

import static net.sf.dz3r.device.mqtt.v2.AbstractMqttListener.DEFAULT_CACHE_AGE;

/**
* <a href="https://zigbee2mqtt.io">Zigbee2MQTT</a> sensor stream cold publisher.
*
Expand All @@ -34,7 +36,7 @@ public Z2MListener(String host, int port,
boolean reconnect,
String mqttRootTopicSub) {

mqttListener = new MqttListenerImpl(new MqttEndpoint(host, port), username, password, reconnect);
mqttListener = new MqttListenerImpl(new MqttEndpoint(host, port), username, password, reconnect, DEFAULT_CACHE_AGE);
this.mqttRootTopicSub = mqttRootTopicSub;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import net.sf.dz3r.device.mqtt.v1.AbstractMqttSwitch;
import net.sf.dz3r.device.mqtt.v1.MqttEndpoint;
import net.sf.dz3r.device.mqtt.v1.MqttMessageAddress;
import net.sf.dz3r.device.mqtt.v2rx.MqttAdapterImpl;
import net.sf.dz3r.device.mqtt.v2async.MqttAdapterImpl;
import net.sf.dz3r.signal.Signal;
import org.apache.logging.log4j.ThreadContext;
import reactor.core.scheduler.Scheduler;
Expand All @@ -17,6 +17,8 @@
import java.time.Instant;
import java.util.Map;

import static net.sf.dz3r.device.mqtt.v2.AbstractMqttListener.DEFAULT_CACHE_AGE;

/**
* Implementation for a Zigbee switch over <a href="https://zigbee2mqtt.io">Zigbee2MQTT</a>.
*
Expand Down Expand Up @@ -80,7 +82,7 @@ public Z2MSwitch(String host, int port,
Scheduler scheduler) {

this(
new MqttAdapterImpl(new MqttEndpoint(host, port), username, password, reconnect),
new MqttAdapterImpl(new MqttEndpoint(host, port), username, password, reconnect, DEFAULT_CACHE_AGE),
deviceRootTopic,
optimistic,
scheduler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import net.sf.dz3r.device.mqtt.v1.AbstractMqttSwitch;
import net.sf.dz3r.device.mqtt.v1.MqttEndpoint;
import net.sf.dz3r.device.mqtt.v1.MqttMessageAddress;
import net.sf.dz3r.device.mqtt.v2rx.MqttAdapterImpl;
import net.sf.dz3r.device.mqtt.v2async.MqttAdapterImpl;
import net.sf.dz3r.signal.Signal;
import org.apache.logging.log4j.ThreadContext;
import reactor.core.scheduler.Scheduler;
Expand All @@ -17,6 +17,8 @@
import java.time.Instant;
import java.util.Map;

import static net.sf.dz3r.device.mqtt.v2.AbstractMqttListener.DEFAULT_CACHE_AGE;

/**
* Implementation for Z-Wave Binary Switch Generic Device Class over MQTT.
*
Expand Down Expand Up @@ -76,7 +78,7 @@ public ZWaveBinarySwitch(String host, int port,
Scheduler scheduler) {

this(
new MqttAdapterImpl(new MqttEndpoint(host, port), username, password, reconnect),
new MqttAdapterImpl(new MqttEndpoint(host, port), username, password, reconnect, DEFAULT_CACHE_AGE),
deviceRootTopic,
false,
scheduler);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package net.sf.dz3r.device.esphome.v1;

import net.sf.dz3r.device.mqtt.v1.MqttEndpoint;
import net.sf.dz3r.device.mqtt.v2rx.MqttListenerImpl;
import net.sf.dz3r.device.mqtt.v2async.MqttListenerImpl;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;

import static net.sf.dz3r.device.mqtt.v2.AbstractMqttListener.DEFAULT_CACHE_AGE;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

Expand All @@ -22,7 +23,7 @@ class ESPHomeListenerTest {
@Test
void constructor() {

var mqttListener = new MqttListenerImpl(new MqttEndpoint("localhost"), null, null, false);
var mqttListener = new MqttListenerImpl(new MqttEndpoint("localhost"), null, null, false, DEFAULT_CACHE_AGE);

assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> {
Expand All @@ -34,7 +35,7 @@ void constructor() {
void sensorFlux() throws InterruptedException {

assertThatCode(() -> {
var mqttListener = new MqttListenerImpl(new MqttEndpoint("localhost"), null, null, false);
var mqttListener = new MqttListenerImpl(new MqttEndpoint("localhost"), null, null, false, DEFAULT_CACHE_AGE);
var esphomeListener = new ESPHomeListener(mqttListener, "/esphome");

var subscription = esphomeListener.getFlux("dining-room")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package net.sf.dz3r.device.esphome.v2;

import net.sf.dz3r.device.mqtt.v1.MqttEndpoint;
import net.sf.dz3r.device.mqtt.v2rx.MqttAdapterImpl;
import net.sf.dz3r.device.mqtt.v2async.MqttAdapterImpl;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.BeforeAll;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package net.sf.dz3r.device.esphome.v2;

import net.sf.dz3r.device.mqtt.v1.MqttEndpoint;
import net.sf.dz3r.device.mqtt.v2rx.MqttAdapterImpl;
import net.sf.dz3r.device.mqtt.v2async.MqttAdapterImpl;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package net.sf.dz3r.device.z2m.v2;

import net.sf.dz3r.device.mqtt.v1.MqttEndpoint;
import net.sf.dz3r.device.mqtt.v2rx.MqttAdapterImpl;
import net.sf.dz3r.device.mqtt.v2async.MqttAdapterImpl;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.BeforeAll;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package net.sf.dz3r.device.zwave.v2;

import net.sf.dz3r.device.mqtt.v1.MqttEndpoint;
import net.sf.dz3r.device.mqtt.v2rx.MqttAdapterImpl;
import net.sf.dz3r.device.mqtt.v2async.MqttAdapterImpl;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.BeforeAll;
Expand Down

0 comments on commit bfdc029

Please sign in to comment.