Skip to content

Commit

Permalink
Cleanup shutdown/startup process (#342)
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu committed May 27, 2022
1 parent 7a9dbcf commit 68b3278
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 30 deletions.
2 changes: 1 addition & 1 deletion pubber/.idea/runConfigurations/Pubber.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 22 additions & 21 deletions pubber/src/main/java/daq/pubber/MqttPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ public class MqttPublisher {
void publish(String deviceId, String topic, Object data, Runnable callback) {
Preconditions.checkNotNull(deviceId, "publish deviceId");
if (publisherExecutor.isShutdown()) {
warn("Publisher is shutdown, dropping message");
return;
throw new RuntimeException("Publisher shutdown.");
}
debug("Publishing in background " + topic);
publisherExecutor.submit(() -> publishCore(deviceId, topic, data, callback));
Expand Down Expand Up @@ -150,12 +149,9 @@ void close() {
try {
warn("Closing publisher connection");
publisherExecutor.shutdown();
if (!publisherExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
throw new RuntimeException("Could not terminate executor");
}
mqttClients.keySet().forEach(this::closeMqttClient);
} catch (Exception e) {
throw new RuntimeException("While closing publisher", e);
error("While closing publisher", null, "close", e);
}
}

Expand Down Expand Up @@ -381,27 +377,32 @@ private void checkAuthentication(String deviceId) {
}
warn("Authentication retry time reached for " + authId);
reauthTimes.remove(authId);
MqttClient client = mqttClients.remove(authId);
if (client == null) {
return;
}
Set<String> removeSet = mqttClients.entrySet().stream()
.filter(entry -> entry.getValue() == client).map(Entry::getKey).collect(Collectors.toSet());
removeSet.forEach(mqttClients::remove);
try {
client.disconnect();
client.close();
} catch (Exception e) {
throw new RuntimeException("While trying to reconnect mqtt client", e);
synchronized (mqttClients) {
MqttClient client = mqttClients.remove(authId);
if (client == null) {
return;
}
Set<String> removeSet = mqttClients.entrySet().stream()
.filter(entry -> entry.getValue() == client).map(Entry::getKey)
.collect(Collectors.toSet());
removeSet.forEach(mqttClients::remove);
try {
client.disconnect();
client.close();
} catch (Exception e) {
throw new RuntimeException("While trying to reconnect mqtt client", e);
}
}
}

private MqttClient getConnectedClient(String deviceId) {
try {
if (isProxyDevice(deviceId)) {
return mqttClients.computeIfAbsent(deviceId, this::newBoundClient);
synchronized (mqttClients) {
if (isProxyDevice(deviceId)) {
return mqttClients.computeIfAbsent(deviceId, this::newBoundClient);
}
return mqttClients.computeIfAbsent(deviceId, this::connectMqttClient);
}
return mqttClients.computeIfAbsent(deviceId, this::connectMqttClient);
} catch (Exception e) {
throw new RuntimeException("While getting mqtt client " + deviceId + ": " + e, e);
}
Expand Down
22 changes: 14 additions & 8 deletions pubber/src/main/java/daq/pubber/Pubber.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public class Pubber {
ExtraPointsetEvent.class, "events/pointset",
DiscoveryEvent.class, "events/discovery"
);
private static final int MESSAGE_REPORT_INTERVAL = 100;
private static final int MESSAGE_REPORT_INTERVAL = 10;
private static final Map<Level, Consumer<String>> LOG_MAP = ImmutableMap.of(
Level.TRACE, LOG::info, // TODO: Make debug/trace programmatically visible.
Level.DEBUG, LOG::info,
Expand Down Expand Up @@ -436,21 +436,21 @@ private CloudIotConfig makeCloudIotConfig(Attributes attributes) {

private synchronized void maybeRestartExecutor(int intervalMs) {
if (scheduledFuture == null || intervalMs != messageDelayMs.get()) {
cancelExecutor();
cancelPeriodicSend();
messageDelayMs.set(intervalMs);
startExecutor();
startPeriodicSend();
}
}

private synchronized void startExecutor() {
private synchronized void startPeriodicSend() {
Preconditions.checkState(scheduledFuture == null);
int delay = messageDelayMs.get();
info("Starting executor with send message delay " + delay);
scheduledFuture = executor
.scheduleAtFixedRate(this::sendMessages, delay, delay, TimeUnit.MILLISECONDS);
}

private synchronized void cancelExecutor() {
private synchronized void cancelPeriodicSend() {
if (scheduledFuture != null) {
try {
scheduledFuture.cancel(false);
Expand Down Expand Up @@ -496,8 +496,11 @@ private void updateState(AbstractPoint point) {
private void terminate() {
try {
info("Terminating");
mqttPublisher.close();
cancelExecutor();
if (mqttPublisher != null) {
mqttPublisher.close();
mqttPublisher = null;
}
cancelPeriodicSend();
executor.shutdown();
} catch (Exception e) {
info("Error terminating: " + e.getMessage());
Expand All @@ -509,7 +512,7 @@ private void startConnection(Consumer<String> onDone) throws InterruptedExceptio
connect();
boolean result = configLatch.await(CONFIG_WAIT_TIME_MS, TimeUnit.MILLISECONDS);
info("synchronized start config result " + result);
if (!result) {
if (!result && mqttPublisher != null) {
mqttPublisher.close();
}
}
Expand Down Expand Up @@ -1050,6 +1053,9 @@ private void cloudLog(String message, Level level) {
try {
publishingLog = true;
pubberLogMessage(message, level, timestamp);
} catch (Exception e) {
mqttPublisher = null;
localLog("Error publishing log message: " + e, Level.ERROR, timestamp);
} finally {
publishingLog = false;
}
Expand Down

0 comments on commit 68b3278

Please sign in to comment.