Skip to content

Commit

Permalink
Basic Pubber Redirect (#376)
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu committed Jun 27, 2022
1 parent b553a3d commit 645b3fb
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
1 change: 1 addition & 0 deletions pubber/src/main/java/daq/pubber/ConfigurationOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class ConfigurationOptions {
public Boolean noHardware;
public String extraPoint;
public String extraField;
public String redirectRegistry;

/**
* Returns a string of enabled options and values.
Expand Down
28 changes: 28 additions & 0 deletions pubber/src/main/java/daq/pubber/Pubber.java
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,8 @@ private void sendMessages() {
updatePoints();
sendDeviceMessage();
flushDirtyState();
// Some things can't be done from a on-message callback, so do them here instead.
maybeRedirectEndpoint();
} catch (Exception e) {
error("Fatal error during execution", e);
terminate();
Expand Down Expand Up @@ -547,6 +549,16 @@ private void initialize() {
throw new RuntimeException("While creating out dir " + outDir.getPath(), e);
}

initializeMqtt();
}

private void disconnectMqtt() {
Preconditions.checkState(mqttPublisher != null, "mqttPublisher not defined");
mqttPublisher.close();
mqttPublisher = null;
}

private void initializeMqtt() {
Preconditions.checkNotNull(configuration.deviceId, "configuration deviceId not defined");
if (configuration.sitePath != null && configuration.keyFile != null) {
String keyDevice =
Expand Down Expand Up @@ -692,6 +704,22 @@ private void processConfigUpdate(Config config) {
maybeRestartExecutor(actualInterval);
}

private void maybeRedirectEndpoint() {
String redirectRegistry = configuration.options.redirectRegistry;
if (redirectRegistry == null || redirectRegistry.equals(configuration.registryId)
|| configLatch.getCount() > 0) {
return;
}
try {
disconnectMqtt();
configuration.registryId = redirectRegistry;
initializeMqtt();
startConnection(onDone);
} catch (Exception e) {
throw new RuntimeException("While redirecting connection endpoint", e);
}
}

private void updateDiscoveryConfig(DiscoveryConfig discovery) {
DiscoveryConfig discoveryConfig = discovery == null ? new DiscoveryConfig() : discovery;
if (deviceState.discovery == null) {
Expand Down

0 comments on commit 645b3fb

Please sign in to comment.