Skip to content

Commit

Permalink
fix(#1956): Check Pulsar URL in consul migration runner
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikriemer committed Sep 22, 2023
1 parent 747e656 commit b3751df
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 3 deletions.
Expand Up @@ -19,10 +19,13 @@
package org.apache.streampipes.service.core.migrations.v093;

import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.config.backend.BackendConfigKeys;
import org.apache.streampipes.model.configuration.DefaultMessagingSettings;
import org.apache.streampipes.model.configuration.SpCoreConfiguration;
import org.apache.streampipes.service.core.migrations.Migration;
import org.apache.streampipes.storage.api.ISpCoreConfigurationStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;

public class ConsulConfigMigration implements Migration {

Expand All @@ -34,14 +37,20 @@ public ConsulConfigMigration() {

@Override
public boolean shouldExecute() {
return storage.getAll().size() == 0;
return storage.getAll().isEmpty();
}

@Override
public void executeMigration() {
var currConf = BackendConfig.INSTANCE;
var newConf = new SpCoreConfiguration();

var pulsarUrlExists = SpServiceDiscovery
.getSpConfig("backend")
.exists(BackendConfigKeys.PULSAR_URL);

var pulsarUrl = pulsarUrlExists ? currConf.getPulsarUrl() : new DefaultMessagingSettings().make().getPulsarUrl();

var messagingSettings = currConf.getMessagingSettings();
newConf.setLocalAuthConfig(currConf.getLocalAuthConfig());
newConf.setEmailConfig(currConf.getEmailConfig());
Expand All @@ -55,7 +64,7 @@ public void executeMigration() {
messagingSettings.setNatsPort(currConf.getNatsPort());
messagingSettings.setKafkaHost(currConf.getKafkaHost());
messagingSettings.setKafkaPort(currConf.getKafkaPort());
messagingSettings.setPulsarUrl(currConf.getPulsarUrl());
messagingSettings.setPulsarUrl(pulsarUrl);
messagingSettings.setZookeeperHost(currConf.getZookeeperHost());
messagingSettings.setZookeeperPort(currConf.getZookeeperPort());
newConf.setAssetDir(currConf.getAssetDir());
Expand Down
Expand Up @@ -47,6 +47,8 @@ public interface SpConfig {

String getString(String key);

boolean exists(String key);

<T> T getObject(String key, Class<T> clazz, T defaultValue);

ConfigItem getConfigItem(String key);
Expand Down
Expand Up @@ -135,6 +135,12 @@ public String getString(String key) {
return getConfigItem(key).getValue();
}

@Override
public boolean exists(String key) {
var os = kvClient.getKVValue(addSn(key)).getValue();
return os != null;
}

@Override
public <T> T getObject(String key, Class<T> clazz, T defaultValue) {
var os = kvClient.getKVValue(addSn(key));
Expand All @@ -153,7 +159,6 @@ public <T> T getObject(String key, Class<T> clazz, T defaultValue) {
@Override
public ConfigItem getConfigItem(String key) {
var os = kvClient.getKVValue(addSn(key)).getValue();

return fromJson(os.getDecodedValue());
}

Expand Down

0 comments on commit b3751df

Please sign in to comment.