Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions nifi-docs/src/main/asciidoc/administration-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,9 @@ To enable authentication via OpenId Connect the following properties must be con
|`nifi.security.user.oidc.read.timeout` | Read timeout when communicating with the OpenId Connect Provider.
|`nifi.security.user.oidc.client.id` | The client id for NiFi after registration with the OpenId Connect Provider.
|`nifi.security.user.oidc.client.secret` | The client secret for NiFi after registration with the OpenId Connect Provider.
|`nifi.security.user.oidc.preferred.jwsalgorithm` | The preferred algorithm for for validating identity tokens. If this value is blank, it will default to `RS256` which is required to be supported
|`nifi.security.user.oidc.preferred.jwsalgorithm` | The preferred algorithm for validating identity tokens. If this value is blank, it will default to `RS256` which is required to be supported
|`nifi.security.user.oidc.additional.scopes` | Comma separated scopes that are sent to OpenId Connect Provider in addition to `openid` and `email`.
|`nifi.security.user.oidc.claim.identifying.user` | Claim that identifies the user to be logged in; default is `email`. May need to be requested via the `nifi.security.user.oidc.additional.scopes` before usage.
by the OpenId Connect Provider according to the specification. If this value is `HS256`, `HS384`, or `HS512`, NiFi will attempt to validate HMAC protected tokens using the specified client secret.
|`nifi.security.user.oidc.claim.identifying.user` | Claim that identifies the user to be logged in; default is `email`. May need to be requested via the `nifi.security.user.oidc.additional.scopes` before usage by the OpenId Connect Provider according to the specification. If this value is `HS256`, `HS384`, or `HS512`, NiFi will attempt to validate HMAC protected tokens using the specified client secret.
If this value is `none`, NiFi will attempt to validate unsecured/plain tokens. Other values for this algorithm will attempt to parse as an RSA or EC algorithm to be used in conjunction with the
JSON Web Key (JWK) provided through the jwks_uri in the metadata found at the discovery URL.
|==================================================================================================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ public interface ClusterCoordinator {
* Removes the given disconnected node from the cluster
*
* @param nodeId the node to remove
* @param userDn the DN of the user requesting that the node be removed
* @param event the reason for the node to be removed
*/
void removeNode(NodeIdentifier nodeId, String userDn);
void removeNode(NodeIdentifier nodeId, String event);

/**
* Returns the current status of the node with the given identifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -574,8 +575,8 @@ public void disconnectionRequestedByNode(final NodeIdentifier nodeId, final Disc
}

@Override
public void removeNode(final NodeIdentifier nodeId, final String userDn) {
reportEvent(nodeId, Severity.INFO, "User " + userDn + " requested that node be removed from cluster");
public void removeNode(final NodeIdentifier nodeId, final String event) {
reportEvent(nodeId, Severity.INFO, event);
notifyOthersOfNodeStatusChange(new NodeConnectionStatus(nodeId, NodeConnectionState.REMOVED));
removeNode(nodeId);

Expand Down Expand Up @@ -1157,6 +1158,10 @@ private ConnectionResponseMessage handleConnectionRequest(final ConnectionReques
// Resolve Node identifier.
registerNodeId(nodeIdentifier);

// Remove obsolete identifier. In case the node's identifier has changed, after restart the obsolete version remains in the
// local storage in DISCONNECTED state. This prevents cluster members from properly syncing. The obsolete nodes must be removed.
removeObsoleteNodeVersion(nodeIdentifier);

if (isBlockedByFirewall(nodeIdentities)) {
// if the socket address is not listed in the firewall, then return a null response
logger.info("Firewall blocked connection request from node " + nodeIdentifier + " with Node Identities " + nodeIdentities);
Expand All @@ -1181,6 +1186,20 @@ private ConnectionResponseMessage handleConnectionRequest(final ConnectionReques
return createConnectionResponse(requestWithNodeIdentities, nodeIdentifier);
}

private void removeObsoleteNodeVersion(final NodeIdentifier resolvedNodeId) {
// The identification of the change is depending on the full description. Equation of the class id defined by using ID.
final Optional<NodeIdentifier> candidate = nodeStatuses.values()
.stream()
.map(s -> s.getNodeIdentifier())
.filter(i -> resolvedNodeId.getId().equals(i.getId()) && !resolvedNodeId.getFullDescription().equals(i.getFullDescription()))
.findFirst();

if (candidate.isPresent()) {
logger.warn("Node {} is removed due to it is an obsolete version of a joining node", candidate.get().getFullDescription());
removeNode(candidate.get());
}
}

private ConnectionResponseMessage createFlowElectionInProgressResponse() {
final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
final String statusDescription = flowElection.getStatusDescription();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -111,6 +112,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {

private static final String EVENT_CATEGORY = "Controller";
private static final String CLUSTER_NODE_CONFIG = "Cluster Node Configuration";
private static final String MESSAGE_CLEANUP_NODE = "Cleaning up node was not part of the connection response and disconnected!";

// state keys
private static final String NODE_UUID = "Node UUID";
Expand Down Expand Up @@ -967,8 +969,22 @@ private void loadFromConnectionResponse(final ConnectionResponse response) throw
writeLock.lock();
try {
if (response.getNodeConnectionStatuses() != null) {
clusterCoordinator.resetNodeStatuses(response.getNodeConnectionStatuses().stream()
.collect(Collectors.toMap(NodeConnectionStatus::getNodeIdentifier, status -> status)));
final Set<NodeIdentifier> localNodes = clusterCoordinator.getNodeIdentifiers();
final Map<NodeIdentifier, NodeConnectionStatus> responseNodeStatuses = response
.getNodeConnectionStatuses()
.stream()
.collect(Collectors.toMap(NodeConnectionStatus::getNodeIdentifier, status -> status));

for (final NodeIdentifier localNode : localNodes) {
final Optional<NodeIdentifier> candidate = responseNodeStatuses.keySet().stream().filter(i -> i.getId().equals(localNode.getId())).findFirst();

if (candidate.isPresent() && !candidate.get().getFullDescription().equals(localNode.getFullDescription())) {
logger.warn(MESSAGE_CLEANUP_NODE + " " + localNode.getFullDescription());
responseNodeStatuses.put(localNode, new NodeConnectionStatus(localNode, NodeConnectionState.REMOVED));
}
}

clusterCoordinator.resetNodeStatuses(responseNodeStatuses);
}

// get the dataflow from the response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5485,7 +5485,7 @@ public void deleteNode(final String nodeId) {
" because it is not disconnected or offloaded, current state = " + nodeConnectionStatus.getState());
}

clusterCoordinator.removeNode(nodeIdentifier, userDn);
clusterCoordinator.removeNode(nodeIdentifier, "User " + userDn + " requested that node be removed from cluster");
heartbeatMonitor.removeHeartbeat(nodeIdentifier);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@
import com.nimbusds.oauth2.sdk.AuthorizationGrant;
import com.nimbusds.oauth2.sdk.Scope;
import com.nimbusds.oauth2.sdk.id.ClientID;

import java.io.IOException;
import java.net.URI;

public interface OidcIdentityProvider {

String OPEN_ID_CONNECT_SUPPORT_IS_NOT_CONFIGURED = "OpenId Connect support is not configured";

/**
* Initializes the provider.
*/
void initializeProvider();

/**
* Returns whether OIDC support is enabled.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import com.nimbusds.oauth2.sdk.AuthorizationGrant;
import com.nimbusds.oauth2.sdk.Scope;
import com.nimbusds.oauth2.sdk.id.State;
import org.apache.nifi.web.security.util.CacheKey;

import java.io.IOException;
import java.math.BigInteger;
import java.net.URI;
Expand All @@ -31,6 +29,7 @@
import java.security.SecureRandom;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.web.security.util.CacheKey;

import static org.apache.nifi.web.security.oidc.StandardOidcIdentityProvider.OPEN_ID_CONNECT_SUPPORT_IS_NOT_CONFIGURED;

Expand Down Expand Up @@ -66,6 +65,7 @@ public OidcService(final OidcIdentityProvider identityProvider, final int durati
throw new RuntimeException("The OidcIdentityProvider must be specified.");
}

identityProvider.initializeProvider();
this.identityProvider = identityProvider;
this.stateLookupForPendingRequests = CacheBuilder.newBuilder().expireAfterWrite(duration, units).build();
this.jwtLookupForCompletedRequests = CacheBuilder.newBuilder().expireAfterWrite(duration, units).build();
Expand Down Expand Up @@ -198,7 +198,7 @@ public void exchangeAuthorizationCode(final String oidcRequestIdentifier, final
}

final CacheKey oidcRequestIdentifierKey = new CacheKey(oidcRequestIdentifier);
final String nifiJwt = identityProvider.exchangeAuthorizationCode(authorizationGrant);
final String nifiJwt = retrieveNifiJwt(authorizationGrant);

try {
// cache the jwt for later retrieval
Expand All @@ -213,6 +213,17 @@ public void exchangeAuthorizationCode(final String oidcRequestIdentifier, final
}
}

/**
* Exchange the authorization code to retrieve a NiFi JWT.
*
* @param authorizationGrant authorization grant
* @return NiFi JWT
* @throws IOException exceptional case for communication error with the OpenId Connect provider
*/
public String retrieveNifiJwt(final AuthorizationGrant authorizationGrant) throws IOException {
return identityProvider.exchangeAuthorizationCode(authorizationGrant);
}

/**
* Returns the resulting JWT for the given request identifier. Will return null if the request
* identifier is not associated with a JWT or if the login sequence was not completed before
Expand Down
Loading