-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix pubber for proxy device through gateway, and also periodic discovery timing #341
Changes from 17 commits
377a045
caf4945
89c8a36
42614d8
8ce59d8
806790d
80897c2
90b6b8b
1749e29
00d3f23
a439d48
488dd05
1aa91b3
f27f17f
d462231
459c027
cd7400a
6deadf9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,7 @@ | ||
#!/bin/bash | ||
|
||
sudo apt-get install hxtools | ||
|
||
python3 -m venv venv | ||
|
||
venv/bin/pip3 install -r etc/requirements.txt |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,7 +11,11 @@ fi | |
project_id=$1 | ||
shift | ||
|
||
echo 'Using target project (base64):' $(echo =$project_id | base64) | ||
if [[ -n `which rot13` ]]; then | ||
echo 'Using target project (rot13):' $(echo $project_id | rot13) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the goal here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added comment. Goal is to obfuscate so that it can be viewed in server-side logs. |
||
else | ||
echo 'Using target project:' $project_id | ||
fi | ||
|
||
site_path=sites/udmi_site_model | ||
device_id=AHU-1 | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,13 +18,17 @@ | |
import java.security.spec.PKCS8EncodedKeySpec; | ||
import java.time.Instant; | ||
import java.util.Map; | ||
import java.util.Map.Entry; | ||
import java.util.Set; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.Semaphore; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.function.Consumer; | ||
import java.util.stream.Collectors; | ||
import org.apache.http.ConnectionClosedException; | ||
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; | ||
import org.eclipse.paho.client.mqttv3.MqttCallback; | ||
|
@@ -53,7 +57,6 @@ public class MqttPublisher { | |
// Indicate if this message should be a MQTT 'retained' message. | ||
private static final boolean SHOULD_RETAIN = false; | ||
|
||
private static final int MQTT_QOS = 1; | ||
private static final String CONFIG_UPDATE_TOPIC_FMT = "/devices/%s/config"; | ||
private static final String ERRORS_TOPIC_FMT = "/devices/%s/errors"; | ||
private static final String UNUSED_ACCOUNT_NAME = "unused"; | ||
|
@@ -65,6 +68,9 @@ public class MqttPublisher { | |
private static final int PUBLISH_THREAD_COUNT = 10; | ||
private static final String HANDLER_KEY_FORMAT = "%s/%s"; | ||
private static final int TOKEN_EXPIRY_MINUTES = 60; | ||
private static final int QOS_AT_MOST_ONCE = 0; | ||
private static final int QOS_AT_LEAST_ONCE = 1; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (NOP) It's interesting the library doesn't make constants for this stuff. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know -- I had to actually go look at documentation somewhere to figure out what the values were. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (NOP) Yeah, me too, to confirm/understand your naming. 2X work! |
||
private static final long CONFIG_WAIT_TIME_MS = 10000; | ||
|
||
private final Semaphore connectionLock = new Semaphore(1); | ||
|
||
|
@@ -82,6 +88,7 @@ public class MqttPublisher { | |
private final Map<String, Consumer<Object>> handlers = new ConcurrentHashMap<>(); | ||
private final Map<String, Class<Object>> handlersType = new ConcurrentHashMap<>(); | ||
private final Consumer<Exception> onError; | ||
private CountDownLatch gatewayLatch; | ||
|
||
MqttPublisher(Configuration configuration, Consumer<Exception> onError) { | ||
this.configuration = configuration; | ||
|
@@ -100,7 +107,7 @@ void publish(String deviceId, String topic, Object data, Runnable callback) { | |
publisherExecutor.submit(() -> publishCore(deviceId, topic, data, callback)); | ||
} | ||
|
||
private synchronized void publishCore(String deviceId, String topic, Object data, | ||
private void publishCore(String deviceId, String topic, Object data, | ||
Runnable callback) { | ||
try { | ||
String payload = OBJECT_MAPPER.writeValueAsString(data); | ||
|
@@ -169,11 +176,15 @@ private MqttClient newBoundClient(String deviceId) { | |
try { | ||
String gatewayId = configuration.gatewayId; | ||
debug("Connecting through gateway " + gatewayId); | ||
gatewayLatch = new CountDownLatch(1); | ||
MqttClient mqttClient = getConnectedClient(gatewayId); | ||
if (!gatewayLatch.await(CONFIG_WAIT_TIME_MS, TimeUnit.MILLISECONDS)) { | ||
throw new RuntimeException("Timeout waiting for gateway startup exchange"); | ||
} | ||
String topic = String.format("/devices/%s/attach", deviceId); | ||
String payload = ""; | ||
info("Publishing attach message " + topic); | ||
mqttClient.publish(topic, payload.getBytes(StandardCharsets.UTF_8.name()), MQTT_QOS, | ||
mqttClient.publish(topic, payload.getBytes(StandardCharsets.UTF_8.name()), QOS_AT_LEAST_ONCE, | ||
SHOULD_RETAIN); | ||
subscribeToUpdates(mqttClient, deviceId); | ||
return mqttClient; | ||
|
@@ -279,14 +290,14 @@ private String getMessageTopic(String deviceId, String topic) { | |
} | ||
|
||
private void subscribeToUpdates(MqttClient client, String deviceId) { | ||
subscribeTopic(client, String.format(CONFIG_UPDATE_TOPIC_FMT, deviceId)); | ||
subscribeTopic(client, String.format(ERRORS_TOPIC_FMT, deviceId)); | ||
subscribeTopic(client, String.format(CONFIG_UPDATE_TOPIC_FMT, deviceId), QOS_AT_LEAST_ONCE); | ||
subscribeTopic(client, String.format(ERRORS_TOPIC_FMT, deviceId), QOS_AT_MOST_ONCE); | ||
info("Updates subscribed"); | ||
} | ||
|
||
private void subscribeTopic(MqttClient client, String updateTopic) { | ||
private void subscribeTopic(MqttClient client, String updateTopic, int mqttQos) { | ||
try { | ||
client.subscribe(updateTopic); | ||
client.subscribe(updateTopic, mqttQos); | ||
} catch (MqttException e) { | ||
throw new RuntimeException("While subscribing to MQTT topic " + updateTopic, e); | ||
} | ||
|
@@ -324,6 +335,11 @@ private String getMessageType(String topic) { | |
return topic.split("/")[3]; | ||
} | ||
|
||
private String getDeviceId(String topic) { | ||
// {site}/devices/{device}/{type} | ||
return topic.split("/")[2]; | ||
} | ||
|
||
public void connect(String deviceId) { | ||
getConnectedClient(deviceId); | ||
} | ||
|
@@ -349,22 +365,29 @@ private void debug(String message) { | |
LOG.debug(message); | ||
} | ||
|
||
private synchronized void sendMessage(String deviceId, String mqttTopic, | ||
private void sendMessage(String deviceId, String mqttTopic, | ||
byte[] mqttMessage) throws Exception { | ||
checkAuthentication(deviceId); | ||
MqttClient connectedClient = getConnectedClient(deviceId); | ||
connectedClient.publish(mqttTopic, mqttMessage, MQTT_QOS, SHOULD_RETAIN); | ||
connectedClient.publish(mqttTopic, mqttMessage, QOS_AT_LEAST_ONCE, SHOULD_RETAIN); | ||
publishCounter.incrementAndGet(); | ||
} | ||
|
||
private void checkAuthentication(String deviceId) { | ||
Instant reauthTime = reauthTimes.get(deviceId); | ||
String authId = isProxyDevice(deviceId) ? configuration.gatewayId : deviceId; | ||
Instant reauthTime = reauthTimes.get(authId); | ||
if (reauthTime != null && Instant.now().isBefore(reauthTime)) { | ||
return; | ||
} | ||
warn("Authentication retry time reached for " + deviceId); | ||
reauthTimes.remove(deviceId); | ||
MqttClient client = mqttClients.remove(deviceId); | ||
warn("Authentication retry time reached for " + authId); | ||
reauthTimes.remove(authId); | ||
MqttClient client = mqttClients.remove(authId); | ||
if (client == null) { | ||
return; | ||
} | ||
Set<String> removeSet = mqttClients.entrySet().stream() | ||
.filter(entry -> entry.getValue() == client).map(Entry::getKey).collect(Collectors.toSet()); | ||
removeSet.forEach(mqttClients::remove); | ||
try { | ||
client.disconnect(); | ||
client.close(); | ||
|
@@ -373,10 +396,9 @@ private void checkAuthentication(String deviceId) { | |
} | ||
} | ||
|
||
private synchronized MqttClient getConnectedClient(String deviceId) { | ||
private MqttClient getConnectedClient(String deviceId) { | ||
try { | ||
String gatewayId = configuration.gatewayId; | ||
if (gatewayId != null && !gatewayId.equals(deviceId)) { | ||
if (isProxyDevice(deviceId)) { | ||
return mqttClients.computeIfAbsent(deviceId, this::newBoundClient); | ||
} | ||
return mqttClients.computeIfAbsent(deviceId, this::connectMqttClient); | ||
|
@@ -385,6 +407,11 @@ private synchronized MqttClient getConnectedClient(String deviceId) { | |
} | ||
} | ||
|
||
private boolean isProxyDevice(String deviceId) { | ||
String gatewayId = configuration.gatewayId; | ||
return gatewayId != null && !gatewayId.equals(deviceId); | ||
} | ||
|
||
/** | ||
* Load a PKCS8 encoded keyfile from the given path. | ||
*/ | ||
|
@@ -412,7 +439,7 @@ public static class PublisherException extends RuntimeException { | |
* @param message Error message | ||
* @param type Type of message being published | ||
* @param phase Which phase of execution | ||
* @param cause Caouse of the exception | ||
* @param cause Cause of the exception | ||
*/ | ||
public PublisherException(String message, String type, String phase, Throwable cause) { | ||
super(message, cause); | ||
|
@@ -444,6 +471,10 @@ public void messageArrived(String topic, MqttMessage message) { | |
synchronized (MqttPublisher.this) { | ||
String messageType = getMessageType(topic); | ||
String handlerKey = getHandlerKey(topic); | ||
String deviceId = getDeviceId(topic); | ||
if (deviceId.equals(configuration.gatewayId)) { | ||
gatewayLatch.countDown(); | ||
} | ||
Consumer<Object> handler = handlers.get(handlerKey); | ||
Class<Object> type = handlersType.get(handlerKey); | ||
if (handler == null) { | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could generate a user prompt. If we going with rot13 how about:
sudo apt-get install -y hxtools
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done