Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
*/
public interface ProvenanceEventRecord {

String REMOTE_INPUT_PORT_TYPE = "Remote Input Port";
String REMOTE_OUTPUT_PORT_TYPE = "Remote Output Port";

/**
* @return a unique ID for this Provenance Event. Depending on the
* implementation, the Event ID may be set to -1 until the event has been
Expand Down Expand Up @@ -100,6 +103,14 @@ public interface ProvenanceEventRecord {
*/
String getComponentType();

/**
* @return whether this event originated from a remote group port
*/
default boolean isRemotePortType() {
final String componentType = getComponentType();
return REMOTE_INPUT_PORT_TYPE.equals(componentType) || REMOTE_OUTPUT_PORT_TYPE.equals(componentType);
}

/**
* @return a URI that provides information about the System and Protocol
* information over which the transfer occurred. The intent of this field is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ public interface ProvenanceAuthorizableFactory {
* @return the Authorizable that can be use to authorize access to provenance events
* @throws ResourceNotFoundException if no component can be found with the given ID
*/
Authorizable createDataAuthorizable(String componentId);
Authorizable createLocalDataAuthorizable(String componentId);

/**
* Generates an Authorizable object for the Data of the remote group port with the given ID.
*
* @param remoteGroupPortId the ID of the remote group port to which the data belongs
* @return the Authorizable that can be used to authorize access to provenance events
* @throws ResourceNotFoundException if no component can be found with the given ID
*/
Authorizable createRemoteDataAuthorizable(String remoteGroupPortId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
*/
package org.apache.nifi.connectable;

import java.util.Collection;
import java.util.List;
import java.util.Set;

import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.Relationship;

import java.util.Collection;
import java.util.List;
import java.util.Set;

public interface Connection extends Authorizable {

void enqueue(FlowFileRecord flowFile);
Expand All @@ -35,6 +35,8 @@ public interface Connection extends Authorizable {

Connectable getDestination();

Authorizable getDestinationAuthorizable();

Collection<Relationship> getRelationships();

FlowFileQueue getFlowFileQueue();
Expand All @@ -59,6 +61,8 @@ public interface Connection extends Authorizable {

Connectable getSource();

Authorizable getSourceAuthorizable();

void setRelationships(Collection<Relationship> newRelationships);

void setDestination(final Connectable newDestination);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.remote.RemoteGroupPort;

import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -731,9 +732,16 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable {
* @param identifier of connectable
* @return the Connectable with the given ID, if it exists; otherwise
* returns null. This performs a recursive search of all ProcessGroups'
* input ports, output ports, funnels, processors, and remote process groups
* input ports, output ports, funnels, processors
*/
Connectable findConnectable(String identifier);
Connectable findLocalConnectable(String identifier);

/**
* @param identifier of remote group port
* @return the RemoteGroupPort with the given ID, if it exists; otherwise
* returns null.
*/
RemoteGroupPort findRemoteGroupPort(String identifier);

/**
* @return a Set of all {@link org.apache.nifi.connectable.Positionable}s contained within this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,6 @@
*/
package org.apache.nifi.connectable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.EqualsBuilder;
Expand All @@ -51,6 +40,19 @@
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.remote.RemoteGroupPort;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
* Models a connection between connectable components. A connection may contain
Expand Down Expand Up @@ -137,20 +139,50 @@ public String getName() {
};
}

@Override
public Authorizable getSourceAuthorizable() {
final Connectable sourceConnectable = getSource();
final Authorizable sourceAuthorizable;

// if the source is a remote group port, authorize according to the RPG
if (sourceConnectable instanceof RemoteGroupPort) {
sourceAuthorizable = ((RemoteGroupPort) sourceConnectable).getRemoteProcessGroup();
} else {
sourceAuthorizable = sourceConnectable;
}

return sourceAuthorizable;
}

@Override
public Authorizable getDestinationAuthorizable() {
final Connectable destinationConnectable = getDestination();
final Authorizable destinationAuthorizable;

// if the destination is a remote group port, authorize according to the RPG
if (destinationConnectable instanceof RemoteGroupPort) {
destinationAuthorizable = ((RemoteGroupPort) destinationConnectable).getRemoteProcessGroup();
} else {
destinationAuthorizable = destinationConnectable;
}

return destinationAuthorizable;
}

@Override
public AuthorizationResult checkAuthorization(Authorizer authorizer, RequestAction action, NiFiUser user, Map<String, String> resourceContext) {
if (user == null) {
return AuthorizationResult.denied("Unknown user");
}

// check the source
final AuthorizationResult sourceResult = getSource().checkAuthorization(authorizer, action, user, resourceContext);
final AuthorizationResult sourceResult = getSourceAuthorizable().checkAuthorization(authorizer, action, user, resourceContext);
if (Result.Denied.equals(sourceResult.getResult())) {
return sourceResult;
}

// check the destination
return getDestination().checkAuthorization(authorizer, action, user, resourceContext);
return getDestinationAuthorizable().checkAuthorization(authorizer, action, user, resourceContext);
}

@Override
Expand All @@ -159,8 +191,8 @@ public void authorize(Authorizer authorizer, RequestAction action, NiFiUser user
throw new AccessDeniedException("Unknown user");
}

getSource().authorize(authorizer, action, user, resourceContext);
getDestination().authorize(authorizer, action, user, resourceContext);
getSourceAuthorizable().authorize(authorizer, action, user, resourceContext);
getDestinationAuthorizable().authorize(authorizer, action, user, resourceContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4010,7 +4010,7 @@ public List<ProvenanceEventRecord> getProvenanceEvents(final long firstEventId,
}

@Override
public Authorizable createDataAuthorizable(final String componentId) {
public Authorizable createLocalDataAuthorizable(final String componentId) {
final String rootGroupId = getRootGroupId();

// Provenance Events are generated only by connectable components, with the exception of DOWNLOAD events,
Expand All @@ -4022,7 +4022,7 @@ public Authorizable createDataAuthorizable(final String componentId) {
authorizable = new DataAuthorizable(getRootGroup());
} else {
// check if the component is a connectable, this should be the case most often
final Connectable connectable = getRootGroup().findConnectable(componentId);
final Connectable connectable = getRootGroup().findLocalConnectable(componentId);
if (connectable == null) {
// if the component id is not a connectable then consider a connection
final Connection connection = getRootGroup().findConnection(componentId);
Expand All @@ -4041,6 +4041,21 @@ public Authorizable createDataAuthorizable(final String componentId) {
return authorizable;
}

@Override
public Authorizable createRemoteDataAuthorizable(String remoteGroupPortId) {
final DataAuthorizable authorizable;

final RemoteGroupPort remoteGroupPort = getRootGroup().findRemoteGroupPort(remoteGroupPortId);
if (remoteGroupPort == null) {
throw new ResourceNotFoundException("The component that generated this event is no longer part of the data flow.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a bit of copy paste error with this Exception message. My understanding is that this method is not limited to an "event".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the refactoring it is just events. However, there is still one spot that is unused that can now be removed. I'll update.

} else {
// authorizable for remote group ports should be the remote process group
authorizable = new DataAuthorizable(remoteGroupPort.getRemoteProcessGroup());
}

return authorizable;
}

@Override
public List<Action> getFlowChanges(final int firstActionId, final int maxActions) {
final History history = auditService.getActions(firstActionId, maxActions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@
*/
package org.apache.nifi.controller.reporting;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
Expand All @@ -43,6 +38,11 @@
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.reporting.Severity;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class StandardReportingContext implements ReportingContext, ControllerServiceLookup {

private final FlowController flowController;
Expand Down Expand Up @@ -95,7 +95,7 @@ public Bulletin createBulletin(final String category, final Severity severity, f
@Override
public Bulletin createBulletin(final String componentId, final String category, final Severity severity, final String message) {
final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
final Connectable connectable = rootGroup.findConnectable(componentId);
final Connectable connectable = rootGroup.findLocalConnectable(componentId);
if (connectable == null) {
throw new IllegalStateException("Cannot create Component-Level Bulletin because no component can be found with ID " + componentId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,6 @@
*/
package org.apache.nifi.controller.repository;

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
Expand Down Expand Up @@ -79,6 +54,31 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* <p>
* Provides a ProcessSession that ensures all accesses, changes and transfers
Expand Down Expand Up @@ -164,10 +164,10 @@ public StandardProcessSession(final ProcessContext context) {
componentType = "Output Port";
break;
case REMOTE_INPUT_PORT:
componentType = "Remote Input Port";
componentType = ProvenanceEventRecord.REMOTE_INPUT_PORT_TYPE;
break;
case REMOTE_OUTPUT_PORT:
componentType = "Remote Output Port";
componentType = ProvenanceEventRecord.REMOTE_OUTPUT_PORT_TYPE;
break;
case FUNNEL:
componentType = "Funnel";
Expand Down
Loading