diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/ConsulConfigMigration.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/ConsulConfigMigration.java index 899c9a5caf..5da0d0bef7 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/ConsulConfigMigration.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/ConsulConfigMigration.java @@ -19,10 +19,13 @@ package org.apache.streampipes.service.core.migrations.v093; import org.apache.streampipes.config.backend.BackendConfig; +import org.apache.streampipes.config.backend.BackendConfigKeys; +import org.apache.streampipes.model.configuration.DefaultMessagingSettings; import org.apache.streampipes.model.configuration.SpCoreConfiguration; import org.apache.streampipes.service.core.migrations.Migration; import org.apache.streampipes.storage.api.ISpCoreConfigurationStorage; import org.apache.streampipes.storage.management.StorageDispatcher; +import org.apache.streampipes.svcdiscovery.SpServiceDiscovery; public class ConsulConfigMigration implements Migration { @@ -34,7 +37,7 @@ public ConsulConfigMigration() { @Override public boolean shouldExecute() { - return storage.getAll().size() == 0; + return storage.getAll().isEmpty(); } @Override @@ -42,6 +45,12 @@ public void executeMigration() { var currConf = BackendConfig.INSTANCE; var newConf = new SpCoreConfiguration(); + var pulsarUrlExists = SpServiceDiscovery + .getSpConfig("backend") + .exists(BackendConfigKeys.PULSAR_URL); + + var pulsarUrl = pulsarUrlExists ? currConf.getPulsarUrl() : new DefaultMessagingSettings().make().getPulsarUrl(); + var messagingSettings = currConf.getMessagingSettings(); newConf.setLocalAuthConfig(currConf.getLocalAuthConfig()); newConf.setEmailConfig(currConf.getEmailConfig()); @@ -55,7 +64,7 @@ public void executeMigration() { messagingSettings.setNatsPort(currConf.getNatsPort()); messagingSettings.setKafkaHost(currConf.getKafkaHost()); messagingSettings.setKafkaPort(currConf.getKafkaPort()); - messagingSettings.setPulsarUrl(currConf.getPulsarUrl()); + messagingSettings.setPulsarUrl(pulsarUrl); messagingSettings.setZookeeperHost(currConf.getZookeeperHost()); messagingSettings.setZookeeperPort(currConf.getZookeeperPort()); newConf.setAssetDir(currConf.getAssetDir()); diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/SpConfig.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/SpConfig.java index e6b0f8fcf4..6ba4a2acd5 100644 --- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/SpConfig.java +++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/SpConfig.java @@ -47,6 +47,8 @@ public interface SpConfig { String getString(String key); + boolean exists(String key); + T getObject(String key, Class clazz, T defaultValue); ConfigItem getConfigItem(String key); diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java index ad67689c8f..48b3a93ed9 100644 --- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java +++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulSpConfig.java @@ -135,6 +135,12 @@ public String getString(String key) { return getConfigItem(key).getValue(); } + @Override + public boolean exists(String key) { + var os = kvClient.getKVValue(addSn(key)).getValue(); + return os != null; + } + @Override public T getObject(String key, Class clazz, T defaultValue) { var os = kvClient.getKVValue(addSn(key)); @@ -153,7 +159,6 @@ public T getObject(String key, Class clazz, T defaultValue) { @Override public ConfigItem getConfigItem(String key) { var os = kvClient.getKVValue(addSn(key)).getValue(); - return fromJson(os.getDecodedValue()); }