Skip to content

Commit

Permalink
NIFI-12590 Add options to add a prefix on Kubernetes resources for Cl…
Browse files Browse the repository at this point in the history
…uster Leader Election and State Management
  • Loading branch information
juldrixx committed Jan 16, 2024
1 parent 31d04c8 commit dd3f3f0
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ public class NiFiProperties extends ApplicationProperties {
public static final String PYTHON_CONTROLLER_DEBUGPY_HOST = "nifi.python.controller.debugpy.host";
public static final String PYTHON_CONTROLLER_DEBUGPY_LOGS_DIR = "nifi.python.controller.debugpy.logs.directory";

// kubernetes properties
public static final String CLUSTER_LEADER_ELECTION_KUBERNETES_LEASE_PREFIX = "nifi.cluster.leader.election.kubernetes.lease.prefix";

public static final String DEFAULT_PYTHON_WORKING_DIRECTORY = "./work/python";

// automatic diagnostic defaults
Expand Down Expand Up @@ -398,6 +401,7 @@ public class NiFiProperties extends ApplicationProperties {
private static final String DEFAULT_SECURITY_USER_JWS_KEY_ROTATION_PERIOD = "PT1H";
public static final String DEFAULT_WEB_SHOULD_SEND_SERVER_VERSION = "true";
public static final int DEFAULT_LISTENER_BOOTSTRAP_PORT = 0;
public static final String DEFAULT_CLUSTER_LEADER_ELECTION_KUBERNETES_LEASE_PREFIX = null;

// cluster common defaults
public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec";
Expand Down
3 changes: 3 additions & 0 deletions nifi-docker/dockerhub/sh/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ prop_replace 'nifi.analytics.connection.model.implementation' "${NIFI_ANALYTIC
prop_replace 'nifi.analytics.connection.model.score.name' "${NIFI_ANALYTICS_MODEL_SCORE_NAME:-rSquared}"
prop_replace 'nifi.analytics.connection.model.score.threshold' "${NIFI_ANALYTICS_MODEL_SCORE_THRESHOLD:-.90}"

# Set kubernetes properties
prop_replace 'nifi.cluster.leader.election.kubernetes.lease.prefix' "${NIFI_CLUSTER_LEADER_ELECTION_KUBERNETES_LEASE_PREFIX:-}"

# Add NAR provider properties
# nifi-registry NAR provider
if [ -n "${NIFI_NAR_LIBRARY_PROVIDER_NIFI_REGISTRY_URL}" ]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-leader-election-shared</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.nifi.kubernetes.client.StandardKubernetesClientProvider;
import org.apache.nifi.kubernetes.leader.election.command.LeaderElectionCommandProvider;
import org.apache.nifi.kubernetes.leader.election.command.StandardLeaderElectionCommandProvider;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -73,10 +74,13 @@ public class KubernetesLeaderElectionManager extends TrackedLeaderElectionManage

private final LeaderElectionCommandProvider leaderElectionCommandProvider;

private final String roleIdPrefix;

/**
* Kubernetes Leader Election Manager default constructor
* Kubernetes Leader Election Manager constructor with NiFi Properties
*/
public KubernetesLeaderElectionManager() {
public KubernetesLeaderElectionManager(final NiFiProperties nifiProperties) {
this.roleIdPrefix = nifiProperties.getProperty(NiFiProperties.CLUSTER_LEADER_ELECTION_KUBERNETES_LEASE_PREFIX, NiFiProperties.DEFAULT_CLUSTER_LEADER_ELECTION_KUBERNETES_LEASE_PREFIX);
executorService = createExecutorService();
leaderElectionCommandProvider = createLeaderElectionCommandProvider();
}
Expand Down Expand Up @@ -285,7 +289,7 @@ private String getRoleId(final String roleName) {
if (roleId == null) {
throw new IllegalArgumentException(String.format("Role Name [%s] not supported", roleName));
}
return roleId;
return roleIdPrefix == null ? roleId : String.format("%s-%s", roleIdPrefix, roleId);
}

private static class ParticipantRegistration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.nifi.controller.leader.election.LeaderElectionRole;
import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
import org.apache.nifi.kubernetes.leader.election.command.LeaderElectionCommandProvider;
import org.apache.nifi.util.NiFiProperties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -28,6 +29,7 @@
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;
Expand All @@ -49,6 +51,8 @@ class KubernetesLeaderElectionManagerTest {

private static final String PARTICIPANT_ID = "Node-0";

private static final String PREFIX = "mynifi";

@Mock
LeaderElectionStateChangeListener changeListener;

Expand All @@ -64,11 +68,16 @@ class KubernetesLeaderElectionManagerTest {
ManagedLeaderElectionCommandProvider leaderElectionCommandProvider;

KubernetesLeaderElectionManager manager;
KubernetesLeaderElectionManager managerWithProperties;

@BeforeEach
void setManager() {
leaderElectionCommandProvider = new ManagedLeaderElectionCommandProvider();
manager = new MockKubernetesLeaderElectionManager();
manager = new MockKubernetesLeaderElectionManager(new NiFiProperties());

final Properties properties = new Properties();
properties.setProperty(NiFiProperties.CLUSTER_LEADER_ELECTION_KUBERNETES_LEASE_PREFIX, PREFIX);
managerWithProperties = new MockKubernetesLeaderElectionManager(new NiFiProperties(properties));
}

@Test
Expand Down Expand Up @@ -184,6 +193,19 @@ void testUnregisterNotRegistered() {
assertFalse(unregisteredActiveParticipant);
}

@Test
void testRoleIdWithPrefix() {
managerWithProperties.start();

setSubmitStartLeading();

managerWithProperties.register(ROLE, changeListener, PARTICIPANT_ID);

captureRunCommand();

assertEquals(PREFIX + "-" + LEADER_ELECTION_ROLE.getRoleId(), leaderElectionCommandProvider.name);
}

private void setSubmitStartLeading() {
doReturn(future).when(executorService).submit(isA(Runnable.class));
leaderElectionCommandProvider.runStartLeading = true;
Expand Down Expand Up @@ -223,6 +245,10 @@ private void assertNotActiveParticipantNotLeader() {
}

private class MockKubernetesLeaderElectionManager extends KubernetesLeaderElectionManager {
public MockKubernetesLeaderElectionManager(NiFiProperties nifiProperties) {
super(nifiProperties);
}

@Override
protected ExecutorService createExecutorService() {
return executorService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,10 @@
<artifactId>kubernetes-server-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-expression-language</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.net.HttpURLConnection;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -40,26 +41,33 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.nifi.components.AbstractConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StateProvider;
import org.apache.nifi.components.state.StateProviderInitializationContext;
import org.apache.nifi.kubernetes.client.ServiceAccountNamespaceProvider;
import org.apache.nifi.kubernetes.client.StandardKubernetesClientProvider;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;

/**
* State Provider implementation based on Kubernetes ConfigMaps with Base64 encoded keys to meet Kubernetes constraints
*/
public class KubernetesConfigMapStateProvider extends AbstractConfigurableComponent implements StateProvider {
static final PropertyDescriptor CONFIG_MAP_NAME_PREFIX = new PropertyDescriptor.Builder()
.name("ConfigMap Name Prefix")
.description("Optional prefix that the Provider will prepend to Kubernetes ConfigMap names. The resulting ConfigMap name will contain nifi-component and the component identifier.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.required(false)
.build();

private static final int MAX_UPDATE_ATTEMPTS = 5;
private static final Scope[] SUPPORTED_SCOPES = { Scope.CLUSTER };

private static final Charset KEY_CHARACTER_SET = StandardCharsets.UTF_8;

private static final String CONFIG_MAP_NAME_FORMAT = "nifi-component-%s";

private static final Pattern CONFIG_MAP_NAME_PATTERN = Pattern.compile("^nifi-component-(.+)$");
private static final String CONFIG_MAP_CORE_NAME = "nifi-component";

private static final int COMPONENT_ID_GROUP = 1;

Expand All @@ -70,6 +78,10 @@ public class KubernetesConfigMapStateProvider extends AbstractConfigurableCompon

private final AtomicBoolean enabled = new AtomicBoolean();

private String CONFIG_MAP_NAME_FORMAT;

private Pattern CONFIG_MAP_NAME_PATTERN;

private KubernetesClient kubernetesClient;

private String namespace;
Expand All @@ -78,6 +90,13 @@ public class KubernetesConfigMapStateProvider extends AbstractConfigurableCompon

private ComponentLog logger;

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(CONFIG_MAP_NAME_PREFIX);
return properties;
}

/**
* Get configured component identifier
*
Expand All @@ -99,6 +118,12 @@ public void initialize(final StateProviderInitializationContext context) {
this.logger = context.getLogger();
this.kubernetesClient = getKubernetesClient();
this.namespace = new ServiceAccountNamespaceProvider().getNamespace();

String configMapNamePrefix = context.getProperty(CONFIG_MAP_NAME_PREFIX).isSet() ? context.getProperty(CONFIG_MAP_NAME_PREFIX).getValue() : null;
CONFIG_MAP_NAME_FORMAT = configMapNamePrefix != null
? String.format("%s-%s-%%s", configMapNamePrefix, CONFIG_MAP_CORE_NAME) : String.format("%s-%%s", CONFIG_MAP_CORE_NAME);
CONFIG_MAP_NAME_PATTERN = Pattern.compile(configMapNamePrefix != null
? String.format("^%s-%s-(.+)$", configMapNamePrefix, CONFIG_MAP_CORE_NAME) : String.format("^%s-(.+)$", CONFIG_MAP_CORE_NAME));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StateProviderInitializationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.parameter.ParameterLookup;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -67,6 +69,7 @@ class KubernetesConfigMapStateProviderTest {
private static final String STATE_PROPERTY_ENCODED = "c3RhcnRlZA";

private static final String STATE_VALUE = "now";
private static final String CONFIG_MAP_NAME_PREFIX_VALUE = "mynifi";

@Mock
StateProviderInitializationContext context;
Expand Down Expand Up @@ -293,9 +296,53 @@ void testReplaceConcurrentUpdate() throws IOException {
assertFalse(replaced2);
}

@Test
void testSetStateGetStateWithPrefix() throws IOException {
setContextWithProperties();
provider.initialize(context);

final Map<String, String> state = Collections.singletonMap(STATE_PROPERTY, STATE_VALUE);

provider.setState(state, COMPONENT_ID);

final StateMap stateMap = provider.getState(COMPONENT_ID);

assertNotNull(stateMap);
final Map<String, String> stateRetrieved = stateMap.toMap();
assertEquals(state, stateRetrieved);

assertConfigMapFound();
}

@Test
void testSetStateGetStoredComponentIdsWithPrefix() throws IOException {
setContextWithProperties();
provider.initialize(context);

final Collection<String> initialStoredComponentIds = provider.getStoredComponentIds();
assertTrue(initialStoredComponentIds.isEmpty());

final Map<String, String> state = Collections.singletonMap(STATE_PROPERTY, STATE_VALUE);
provider.setState(state, COMPONENT_ID);

final Collection<String> storedComponentIds = provider.getStoredComponentIds();
final Iterator<String> componentIds = storedComponentIds.iterator();

assertTrue(componentIds.hasNext());
assertEquals(COMPONENT_ID, componentIds.next());
}

private void setContext() {
when(context.getIdentifier()).thenReturn(IDENTIFIER);
when(context.getLogger()).thenReturn(logger);
when(context.getProperty(KubernetesConfigMapStateProvider.CONFIG_MAP_NAME_PREFIX))
.thenReturn(new StandardPropertyValue(null, null, ParameterLookup.EMPTY));
}

private void setContextWithProperties() {
setContext();
when(context.getProperty(KubernetesConfigMapStateProvider.CONFIG_MAP_NAME_PREFIX))
.thenReturn(new StandardPropertyValue(CONFIG_MAP_NAME_PREFIX_VALUE, null, ParameterLookup.EMPTY));
}

private void assertStateEquals(final Map<String, String> expected, final StateMap stateMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@
<nifi.python.max.processes>100</nifi.python.max.processes>
<nifi.python.max.processes.per.extension.type>10</nifi.python.max.processes.per.extension.type>
<nifi.python.logs.dir>./logs</nifi.python.logs.dir>

<!-- nifi.properties: kubernetes properties -->
<nifi.cluster.leader.election.kubernetes.lease.prefix />

<nifi.performance.tracking.percentage>0</nifi.performance.tracking.percentage>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,9 @@ nifi.analytics.connection.model.implementation=${nifi.analytics.connection.model
nifi.analytics.connection.model.score.name=${nifi.analytics.connection.model.score.name}
nifi.analytics.connection.model.score.threshold=${nifi.analytics.connection.model.score.threshold}

# kubernetes #
nifi.cluster.leader.election.kubernetes.lease.prefix=${nifi.cluster.leader.election.kubernetes.lease.prefix}

# flow analysis properties
nifi.flow.analysis.background.task.schedule=${nifi.flow.analysis.background.task.schedule}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,15 @@
<property name="Access Control">Open</property>
</cluster-provider>

<!-- Kubernetes ConfigMap implementation of State Provider -->
<!--
Kubernetes ConfigMap implementation of State Provider. This Provider has the following optional properties:
ConfigMap Name Prefix - Optional prefix that the Provider will prepend to Kubernetes ConfigMap names. The resulting ConfigMap name will contain nifi-component and the component identifier.
-->
<cluster-provider>
<id>kubernetes-provider</id>
<class>org.apache.nifi.kubernetes.state.provider.KubernetesConfigMapStateProvider</class>
<property name="ConfigMap Name Prefix"></property>
</cluster-provider>

<!--
Expand Down

0 comments on commit dd3f3f0

Please sign in to comment.