Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse DataTransferAuthorizable in DataTransferResource #971

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.AccessPolicy;
import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.AuthorizationRequest;
import org.apache.nifi.authorization.AuthorizationResult;
import org.apache.nifi.authorization.AuthorizationResult.Result;
import org.apache.nifi.authorization.AuthorizeAccess;
Expand All @@ -34,8 +35,8 @@
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.User;
import org.apache.nifi.authorization.UserContextKeys;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.DataTransferAuthorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
Expand Down Expand Up @@ -2449,7 +2450,10 @@ public List<ResourceDTO> getResources() {
}

/**
* Ensures the specified user has permission to access the specified port.
* Ensures the specified user has permission to access the specified port. This method does
* not utilize the DataTransferAuthorizable as that will enforce the entire chain is
* authorized for the transfer. This method is only invoked when obtaining the site to site
* details so the entire chain isn't necessary.
*/
private boolean isUserAuthorized(final NiFiUser user, final RootGroupPort port) {
final boolean isSiteToSiteSecure = Boolean.TRUE.equals(properties.isSiteToSiteSecure());
Expand All @@ -2459,9 +2463,24 @@ private boolean isUserAuthorized(final NiFiUser user, final RootGroupPort port)
return true;
}

// authorize this port for data transfer
final Authorizable dataTransferAuthorizable = new DataTransferAuthorizable(port);
final AuthorizationResult result = dataTransferAuthorizable.checkAuthorization(authorizer, RequestAction.WRITE, user);
final Map<String, String> userContext;
if (user.getClientAddress() != null && !user.getClientAddress().trim().isEmpty()) {
userContext = new HashMap<>();
userContext.put(UserContextKeys.CLIENT_ADDRESS.name(), user.getClientAddress());
} else {
userContext = null;
}

final AuthorizationRequest request = new AuthorizationRequest.Builder()
.resource(ResourceFactory.getDataTransferResource(port.getResource()))
.identity(user.getIdentity())
.anonymous(user.isAnonymous())
.accessAttempt(false)
.action(RequestAction.WRITE)
.userContext(userContext)
.build();

final AuthorizationResult result = authorizer.authorize(request);
return Result.Approved.equals(result.getResult());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,57 +16,17 @@
*/
package org.apache.nifi.web.api;

import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_COUNT;
import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_DURATION;
import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_SIZE;
import static org.apache.nifi.remote.protocol.HandshakeProperty.REQUEST_EXPIRATION_MILLIS;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;

import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.AuthorizationRequest;
import org.apache.nifi.authorization.AuthorizationResult;
import org.apache.nifi.authorization.AuthorizationResult.Result;
import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.UserContextKeys;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.DataTransferAuthorizable;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
Expand All @@ -89,10 +49,45 @@
import org.apache.nifi.remote.protocol.http.StandardHttpFlowFileServerProtocol;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.entity.TransactionResultEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_COUNT;
import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_DURATION;
import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_SIZE;
import static org.apache.nifi.remote.protocol.HandshakeProperty.REQUEST_EXPIRATION_MILLIS;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;

/**
* RESTful endpoint for managing a SiteToSite connection.
*/
Expand All @@ -113,6 +108,7 @@ public class DataTransferResource extends ApplicationResource {
private static final String PORT_TYPE_OUTPUT = "output-ports";

private Authorizer authorizer;
private NiFiServiceFacade serviceFacade;
private final ResponseCreator responseCreator = new ResponseCreator();
private final VersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1);
private final HttpRemoteSiteListener transactionManager;
Expand All @@ -128,36 +124,24 @@ public DataTransferResource(final NiFiProperties nifiProperties){
* <p>
* Note: Protected for testing purposes
*/
protected void authorizeDataTransfer(final ResourceType resourceType, final String identifier) {
protected void authorizeDataTransfer(final AuthorizableLookup lookup, final ResourceType resourceType, final String identifier) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();

// ensure the resource type is correct
if (!ResourceType.InputPort.equals(resourceType) && !ResourceType.OutputPort.equals(resourceType)) {
throw new IllegalArgumentException("The resource must be an Input or Output Port.");
}

final Map<String, String> userContext;
if (user.getClientAddress() != null && !user.getClientAddress().trim().isEmpty()) {
userContext = new HashMap<>();
userContext.put(UserContextKeys.CLIENT_ADDRESS.name(), user.getClientAddress());
// get the authorizable
final DataTransferAuthorizable authorizable;
if (ResourceType.InputPort.equals(resourceType)) {
authorizable = new DataTransferAuthorizable(lookup.getInputPort(identifier));
} else {
userContext = null;
authorizable = new DataTransferAuthorizable(lookup.getOutputPort(identifier));
}

final Resource resource = ResourceFactory.getComponentResource(resourceType, identifier, identifier);
final AuthorizationRequest request = new AuthorizationRequest.Builder()
.resource(ResourceFactory.getDataTransferResource(resource))
.identity(user.getIdentity())
.anonymous(user.isAnonymous())
.accessAttempt(true)
.action(RequestAction.WRITE)
.userContext(userContext)
.build();

final AuthorizationResult result = authorizer.authorize(request);
if (!Result.Approved.equals(result.getResult())) {
final String message = StringUtils.isNotBlank(result.getExplanation()) ? result.getExplanation() : "Access is denied";
throw new AccessDeniedException(message);
}
// perform the authorization
authorizable.authorize(authorizer, RequestAction.WRITE, user);
}

@POST
Expand Down Expand Up @@ -199,7 +183,9 @@ public Response createPortTransaction(
}

// authorize access
authorizeDataTransfer(PORT_TYPE_INPUT.equals(portType) ? ResourceType.InputPort : ResourceType.OutputPort, portId);
serviceFacade.authorizeAccess(lookup -> {
authorizeDataTransfer(lookup, PORT_TYPE_INPUT.equals(portType) ? ResourceType.InputPort : ResourceType.OutputPort, portId);
});

final ValidateRequestResult validationResult = validateResult(req, portId);
if (validationResult.errResponse != null) {
Expand Down Expand Up @@ -266,7 +252,9 @@ public Response receiveFlowFiles(
InputStream inputStream) {

// authorize access
authorizeDataTransfer(ResourceType.InputPort, portId);
serviceFacade.authorizeAccess(lookup -> {
authorizeDataTransfer(lookup, ResourceType.InputPort, portId);
});

final ValidateRequestResult validationResult = validateResult(req, portId, transactionId);
if (validationResult.errResponse != null) {
Expand Down Expand Up @@ -421,7 +409,9 @@ public Response commitOutputPortTransaction(
InputStream inputStream) {

// authorize access
authorizeDataTransfer(ResourceType.OutputPort, portId);
serviceFacade.authorizeAccess(lookup -> {
authorizeDataTransfer(lookup, ResourceType.OutputPort, portId);
});

final ValidateRequestResult validationResult = validateResult(req, portId, transactionId);
if (validationResult.errResponse != null) {
Expand Down Expand Up @@ -523,7 +513,9 @@ public Response commitInputPortTransaction(
InputStream inputStream) {

// authorize access
authorizeDataTransfer(ResourceType.InputPort, portId);
serviceFacade.authorizeAccess(lookup -> {
authorizeDataTransfer(lookup, ResourceType.InputPort, portId);
});

final ValidateRequestResult validationResult = validateResult(req, portId, transactionId);
if (validationResult.errResponse != null) {
Expand Down Expand Up @@ -634,7 +626,9 @@ public Response transferFlowFiles(
InputStream inputStream) {

// authorize access
authorizeDataTransfer(ResourceType.OutputPort, portId);
serviceFacade.authorizeAccess(lookup -> {
authorizeDataTransfer(lookup, ResourceType.OutputPort, portId);
});

final ValidateRequestResult validationResult = validateResult(req, portId, transactionId);
if (validationResult.errResponse != null) {
Expand Down Expand Up @@ -713,7 +707,9 @@ public Response extendInputPortTransactionTTL(
InputStream inputStream) {

// authorize access
authorizeDataTransfer(ResourceType.InputPort, portId);
serviceFacade.authorizeAccess(lookup -> {
authorizeDataTransfer(lookup, ResourceType.InputPort, portId);
});

return extendPortTransactionTTL(PORT_TYPE_INPUT, portId, transactionId, req, res, context, uriInfo, inputStream);
}
Expand Down Expand Up @@ -749,7 +745,9 @@ public Response extendOutputPortTransactionTTL(
InputStream inputStream) {

// authorize access
authorizeDataTransfer(ResourceType.OutputPort, portId);
serviceFacade.authorizeAccess(lookup -> {
authorizeDataTransfer(lookup, ResourceType.OutputPort, portId);
});

return extendPortTransactionTTL(PORT_TYPE_OUTPUT, portId, transactionId, req, res, context, uriInfo, inputStream);
}
Expand Down Expand Up @@ -837,4 +835,7 @@ public void setAuthorizer(Authorizer authorizer) {
this.authorizer = authorizer;
}

public void setServiceFacade(NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
package org.apache.nifi.web.api;


import java.net.InetAddress;
import java.net.UnknownHostException;

import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiResponse;
Expand All @@ -32,6 +29,7 @@
import org.apache.nifi.authorization.AuthorizationResult.Result;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.UserContextKeys;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
Expand All @@ -43,6 +41,7 @@
import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator;
import org.apache.nifi.remote.exception.BadRequestException;
import org.apache.nifi.remote.protocol.http.HttpHeaders;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.dto.remote.PeerDTO;
Expand All @@ -60,12 +59,15 @@
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.commons.lang3.StringUtils.isEmpty;
import org.apache.nifi.util.NiFiProperties;

/**
* RESTful endpoint for managing a SiteToSite connection.
Expand Down Expand Up @@ -99,12 +101,21 @@ public SiteToSiteResource(final NiFiProperties nifiProperties) {
protected void authorizeSiteToSite() {
final NiFiUser user = NiFiUserUtils.getNiFiUser();

final Map<String, String> userContext;
if (!StringUtils.isBlank(user.getClientAddress())) {
userContext = new HashMap<>();
userContext.put(UserContextKeys.CLIENT_ADDRESS.name(), user.getClientAddress());
} else {
userContext = null;
}

final AuthorizationRequest request = new AuthorizationRequest.Builder()
.resource(ResourceFactory.getSiteToSiteResource())
.identity(user.getIdentity())
.anonymous(user.isAnonymous())
.accessAttempt(true)
.action(RequestAction.READ)
.userContext(userContext)
.build();

final AuthorizationResult result = authorizer.authorize(request);
Expand Down Expand Up @@ -155,7 +166,7 @@ public Response getSiteToSiteDetails(@Context HttpServletRequest req) {
if (isEmpty(req.getHeader(HttpHeaders.PROTOCOL_VERSION))) {
// This indicates the client uses older NiFi version,
// which strictly read JSON properties and fail with unknown properties.
// Convert result entity so that old version clients can understance.
// Convert result entity so that old version clients can understand.
logger.debug("Converting result to provide backward compatibility...");
controller.setRemoteSiteHttpListeningPort(null);
}
Expand All @@ -173,7 +184,6 @@ public Response getSiteToSiteDetails(@Context HttpServletRequest req) {
@Path("/peers")
@Consumes(MediaType.WILDCARD)
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
// TODO: @PreAuthorize("hasRole('ROLE_NIFI')")
@ApiOperation(
value = "Returns the available Peers and its status of this NiFi",
response = PeersEntity.class,
Expand Down Expand Up @@ -251,6 +261,7 @@ public Response getPeers(@Context HttpServletRequest req) {
}

// setters

public void setServiceFacade(final NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade;
}
Expand Down
Loading