Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup shutdown/startup process #342

Merged
merged 1 commit into from
May 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pubber/.idea/runConfigurations/Pubber.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 22 additions & 21 deletions pubber/src/main/java/daq/pubber/MqttPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ public class MqttPublisher {
void publish(String deviceId, String topic, Object data, Runnable callback) {
Preconditions.checkNotNull(deviceId, "publish deviceId");
if (publisherExecutor.isShutdown()) {
warn("Publisher is shutdown, dropping message");
return;
throw new RuntimeException("Publisher shutdown.");
}
debug("Publishing in background " + topic);
publisherExecutor.submit(() -> publishCore(deviceId, topic, data, callback));
Expand Down Expand Up @@ -150,12 +149,9 @@ void close() {
try {
warn("Closing publisher connection");
publisherExecutor.shutdown();
if (!publisherExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
throw new RuntimeException("Could not terminate executor");
}
mqttClients.keySet().forEach(this::closeMqttClient);
} catch (Exception e) {
throw new RuntimeException("While closing publisher", e);
error("While closing publisher", null, "close", e);
}
}

Expand Down Expand Up @@ -381,27 +377,32 @@ private void checkAuthentication(String deviceId) {
}
warn("Authentication retry time reached for " + authId);
reauthTimes.remove(authId);
MqttClient client = mqttClients.remove(authId);
if (client == null) {
return;
}
Set<String> removeSet = mqttClients.entrySet().stream()
.filter(entry -> entry.getValue() == client).map(Entry::getKey).collect(Collectors.toSet());
removeSet.forEach(mqttClients::remove);
try {
client.disconnect();
client.close();
} catch (Exception e) {
throw new RuntimeException("While trying to reconnect mqtt client", e);
synchronized (mqttClients) {
MqttClient client = mqttClients.remove(authId);
if (client == null) {
return;
}
Set<String> removeSet = mqttClients.entrySet().stream()
.filter(entry -> entry.getValue() == client).map(Entry::getKey)
.collect(Collectors.toSet());
removeSet.forEach(mqttClients::remove);
try {
client.disconnect();
client.close();
} catch (Exception e) {
throw new RuntimeException("While trying to reconnect mqtt client", e);
}
}
}

private MqttClient getConnectedClient(String deviceId) {
try {
if (isProxyDevice(deviceId)) {
return mqttClients.computeIfAbsent(deviceId, this::newBoundClient);
synchronized (mqttClients) {
if (isProxyDevice(deviceId)) {
return mqttClients.computeIfAbsent(deviceId, this::newBoundClient);
}
return mqttClients.computeIfAbsent(deviceId, this::connectMqttClient);
}
return mqttClients.computeIfAbsent(deviceId, this::connectMqttClient);
} catch (Exception e) {
throw new RuntimeException("While getting mqtt client " + deviceId + ": " + e, e);
}
Expand Down
22 changes: 14 additions & 8 deletions pubber/src/main/java/daq/pubber/Pubber.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public class Pubber {
ExtraPointsetEvent.class, "events/pointset",
DiscoveryEvent.class, "events/discovery"
);
private static final int MESSAGE_REPORT_INTERVAL = 100;
private static final int MESSAGE_REPORT_INTERVAL = 10;
private static final Map<Level, Consumer<String>> LOG_MAP = ImmutableMap.of(
Level.TRACE, LOG::info, // TODO: Make debug/trace programmatically visible.
Level.DEBUG, LOG::info,
Expand Down Expand Up @@ -436,21 +436,21 @@ private CloudIotConfig makeCloudIotConfig(Attributes attributes) {

private synchronized void maybeRestartExecutor(int intervalMs) {
if (scheduledFuture == null || intervalMs != messageDelayMs.get()) {
cancelExecutor();
cancelPeriodicSend();
messageDelayMs.set(intervalMs);
startExecutor();
startPeriodicSend();
}
}

private synchronized void startExecutor() {
private synchronized void startPeriodicSend() {
Preconditions.checkState(scheduledFuture == null);
int delay = messageDelayMs.get();
info("Starting executor with send message delay " + delay);
scheduledFuture = executor
.scheduleAtFixedRate(this::sendMessages, delay, delay, TimeUnit.MILLISECONDS);
}

private synchronized void cancelExecutor() {
private synchronized void cancelPeriodicSend() {
if (scheduledFuture != null) {
try {
scheduledFuture.cancel(false);
Expand Down Expand Up @@ -496,8 +496,11 @@ private void updateState(AbstractPoint point) {
private void terminate() {
try {
info("Terminating");
mqttPublisher.close();
cancelExecutor();
if (mqttPublisher != null) {
mqttPublisher.close();
mqttPublisher = null;
}
cancelPeriodicSend();
executor.shutdown();
} catch (Exception e) {
info("Error terminating: " + e.getMessage());
Expand All @@ -509,7 +512,7 @@ private void startConnection(Consumer<String> onDone) throws InterruptedExceptio
connect();
boolean result = configLatch.await(CONFIG_WAIT_TIME_MS, TimeUnit.MILLISECONDS);
info("synchronized start config result " + result);
if (!result) {
if (!result && mqttPublisher != null) {
mqttPublisher.close();
}
}
Expand Down Expand Up @@ -1050,6 +1053,9 @@ private void cloudLog(String message, Level level) {
try {
publishingLog = true;
pubberLogMessage(message, level, timestamp);
} catch (Exception e) {
mqttPublisher = null;
localLog("Error publishing log message: " + e, Level.ERROR, timestamp);
} finally {
publishingLog = false;
}
Expand Down