Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class MockProcessSession implements ProcessSession {
private final MockFlowFileQueue processorQueue;
private final Set<Long> beingProcessed = new HashSet<>();
private final List<MockFlowFile> penalized = new ArrayList<>();
private final Processor processor;

private final Map<Long, MockFlowFile> currentVersions = new HashMap<>();
private final Map<Long, MockFlowFile> originalVersions = new HashMap<>();
Expand All @@ -77,6 +78,7 @@ public class MockProcessSession implements ProcessSession {
private int removedCount = 0;

public MockProcessSession(final SharedSessionState sharedState, final Processor processor) {
this.processor = processor;
this.sharedState = sharedState;
this.processorQueue = sharedState.getFlowFileQueue();
provenanceReporter = new MockProvenanceReporter(this, sharedState, processor.getIdentifier(), processor.getClass().getSimpleName());
Expand Down Expand Up @@ -650,6 +652,9 @@ public void transfer(final FlowFile flowFile, final Relationship relationship) {
transfer(flowFile);
return;
}
if(!processor.getRelationships().contains(relationship)){
throw new IllegalArgumentException("this relationship " + relationship.getName() + " is not known");
}

validateState(flowFile);
List<MockFlowFile> list = transferMap.get(relationship);
Expand All @@ -668,6 +673,9 @@ public void transfer(final Collection<FlowFile> flowFiles, final Relationship re
transfer(flowFiles);
return;
}
if(!processor.getRelationships().contains(relationship)){
throw new IllegalArgumentException("this relationship " + relationship.getName() + " is not known");
}

for (final FlowFile flowFile : flowFiles) {
validateState(flowFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,27 @@ public void testReadWithoutCloseThrowsExceptionOnCommit() throws IOException {
}
}

@Test
public void testTransferUnknownRelationship() {
final Processor processor = new PoorlyBehavedProcessor();
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor);
FlowFile ff1 = session.createFlowFile("hello, world".getBytes());
final Relationship fakeRel = new Relationship.Builder().name("FAKE").build();
try {
session.transfer(ff1, fakeRel);
Assert.fail("Should have thrown IllegalArgumentException");
} catch (final IllegalArgumentException ie) {

}
try {
session.transfer(Collections.singleton(ff1), fakeRel);
Assert.fail("Should have thrown IllegalArgumentException");
} catch (final IllegalArgumentException ie) {

}

}

protected static class PoorlyBehavedProcessor extends AbstractProcessor {

private static final Relationship REL_FAILURE = new Relationship.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1562,9 +1562,6 @@ private void updateLastQueuedDate(final StandardRepositoryRecord record) {
@Override
public void transfer(final FlowFile flowFile, final Relationship relationship) {
validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
record.setTransferRelationship(relationship);
updateLastQueuedDate(record);
final int numDestinations = context.getConnections(relationship).size();
final int multiplier = Math.max(1, numDestinations);

Expand All @@ -1575,7 +1572,13 @@ public void transfer(final FlowFile flowFile, final Relationship relationship) {
autoTerminated = true;
} else if (numDestinations == 0 && relationship == Relationship.SELF) {
selfRelationship = true;
} else if (numDestinations == 0) {
// the relationship specified is not known in this session/context
throw new IllegalArgumentException("Relationship '" + relationship.getName() + "' is not known");
}
final StandardRepositoryRecord record = records.get(flowFile);
record.setTransferRelationship(relationship);
updateLastQueuedDate(record);

if (autoTerminated) {
removedCount += multiplier;
Expand Down Expand Up @@ -1616,6 +1619,9 @@ public void transfer(final Collection<FlowFile> flowFiles, final Relationship re
autoTerminated = true;
} else if (numDestinations == 0 && relationship == Relationship.SELF) {
selfRelationship = true;
} else if (numDestinations == 0) {
// the relationship specified is not known in this session/context
throw new IllegalArgumentException("Relationship '" + relationship.getName() + "' is not known");
}

final int multiplier = Math.max(1, numDestinations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class TestStandardProcessSession {

private ProvenanceEventRepository provenanceRepo;
private MockFlowFileRepository flowFileRepo;
private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();

@After
public void cleanup() {
Expand Down Expand Up @@ -187,11 +188,14 @@ public Set<Connection> answer(final InvocationOnMock invocation) throws Throwabl
final Relationship relationship = (Relationship) arguments[0];
if (relationship == Relationship.SELF) {
return Collections.emptySet();
} else {
} else if (relationship == FAKE_RELATIONSHIP || relationship.equals(FAKE_RELATIONSHIP) ){
return null;
}else {
return new HashSet<>(connList);
}
}
}).when(connectable).getConnections(Mockito.any(Relationship.class));

when(connectable.getConnections()).thenReturn(new HashSet<>(connList));

contentRepo = new MockContentRepository();
Expand Down Expand Up @@ -1250,6 +1254,34 @@ public void testOpenMultipleInputStreamsToFlowFile() throws IOException {
session.commit();
}

@Test
public void testTransferUnknownRelationship() {
final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()
.id(1L)
.addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
.entryDate(System.currentTimeMillis())
.build();

flowFileQueue.put(flowFileRecord1);

FlowFile ff1 = session.get();
ff1 = session.putAttribute(ff1, "index", "1");

try {
session.transfer(ff1, FAKE_RELATIONSHIP);
Assert.fail("Expected IllegalArgumentException");
} catch (IllegalArgumentException iae) {
}

try {
final Collection<FlowFile> collection = new HashSet<>();
collection.add(ff1);
session.transfer(collection, FAKE_RELATIONSHIP);
Assert.fail("Expected IllegalArgumentException");
} catch (IllegalArgumentException iae) {
}
}

private static class MockFlowFileRepository implements FlowFileRepository {
private boolean failOnUpdate = false;
private final AtomicLong idGenerator = new AtomicLong(0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,26 +63,34 @@
public class InvokeScriptedProcessor extends AbstractScriptProcessor {

private final AtomicReference<Processor> processor = new AtomicReference<>();
private final AtomicReference<Collection<ValidationResult>> validationResults =
new AtomicReference<>((Collection<ValidationResult>) new ArrayList<ValidationResult>());
private final AtomicReference<Collection<ValidationResult>> validationResults
= new AtomicReference<>((Collection<ValidationResult>) new ArrayList<ValidationResult>());

private AtomicBoolean scriptNeedsReload = new AtomicBoolean(true);

private ScriptEngine scriptEngine = null;

/**
* Returns the valid relationships for this processor. SUCCESS and FAILURE are always returned, and if the script
* processor has defined additional relationships, those will be added as well.
* Returns the valid relationships for this processor. The supported
* relationships are first determined by checking what the script supports
* and if it does not return any relationships then a default of
* SUCCESS and FAILURE is provided.
*
* @return a Set of Relationships supported by this processor
*/
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
final Processor instance = processor.get();
boolean useDefaultRelationships = false;
if (instance != null) {
try {
relationships.addAll(instance.getRelationships());
final Set<Relationship> rels = instance.getRelationships();
if (rels == null || rels.isEmpty()) {
useDefaultRelationships = true;
} else {
relationships.addAll(rels);
}
} catch (final Throwable t) {
final ComponentLog logger = getLogger();
final String message = "Unable to get relationships from scripted Processor: " + t;
Expand All @@ -93,17 +101,23 @@ public Set<Relationship> getRelationships() {
}
}
} else {
useDefaultRelationships = true;
}
if (useDefaultRelationships) {
// Return defaults for now
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);

}
return Collections.unmodifiableSet(relationships);
}

/**
* Returns a list of property descriptors supported by this processor. The list always includes properties such as
* script engine name, script file name, script body name, script arguments, and an external module path. If the
* scripted processor also defines supported properties, those are added to the list as well.
* Returns a list of property descriptors supported by this processor. The
* list always includes properties such as script engine name, script file
* name, script body name, script arguments, and an external module path. If
* the scripted processor also defines supported properties, those are added
* to the list as well.
*
* @return a List of PropertyDescriptor objects supported by this processor
*/
Expand Down Expand Up @@ -140,11 +154,14 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
}

/**
* Returns a PropertyDescriptor for the given name. This is for the user to be able to define their own properties
* which will be available as variables in the script
* Returns a PropertyDescriptor for the given name. This is for the user to
* be able to define their own properties which will be available as
* variables in the script
*
* @param propertyDescriptorName used to lookup if any property descriptors exist for that name
* @return a PropertyDescriptor object corresponding to the specified dynamic property name
* @param propertyDescriptorName used to lookup if any property descriptors
* exist for that name
* @return a PropertyDescriptor object corresponding to the specified
* dynamic property name
*/
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
Expand All @@ -158,8 +175,9 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String
}

/**
* Performs setup operations when the processor is scheduled to run. This includes evaluating the processor's
* properties, as well as reloading the script (from file or the "Script Body" property)
* Performs setup operations when the processor is scheduled to run. This
* includes evaluating the processor's properties, as well as reloading the
* script (from file or the "Script Body" property)
*
* @param context the context in which to perform the setup operations
*/
Expand Down Expand Up @@ -195,14 +213,13 @@ public void setup() {
}
}


/**
* Handles changes to this processor's properties. If changes are made to script- or engine-related properties,
* the script will be reloaded.
* Handles changes to this processor's properties. If changes are made to
* script- or engine-related properties, the script will be reloaded.
*
* @param descriptor of the modified property
* @param oldValue non-null property value (previous)
* @param newValue the new property value or if null indicates the property
* @param oldValue non-null property value (previous)
* @param newValue the new property value or if null indicates the property
*/
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
Expand All @@ -214,15 +231,13 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String
|| MODULES.equals(descriptor)
|| SCRIPT_ENGINE.equals(descriptor)) {
scriptNeedsReload.set(true);
} else {
if (instance != null) {
// If the script provides a Processor, call its onPropertyModified() method
try {
instance.onPropertyModified(descriptor, oldValue, newValue);
} catch (final Exception e) {
final String message = "Unable to invoke onPropertyModified from script Processor: " + e;
logger.error(message, e);
}
} else if (instance != null) {
// If the script provides a Processor, call its onPropertyModified() method
try {
instance.onPropertyModified(descriptor, oldValue, newValue);
} catch (final Exception e) {
final String message = "Unable to invoke onPropertyModified from script Processor: " + e;
logger.error(message, e);
}
}
}
Expand Down Expand Up @@ -383,12 +398,15 @@ public ControllerServiceLookup getControllerServiceLookup() {
}

/**
* Invokes the validate() routine provided by the script, allowing for custom validation code.
* This method assumes there is a valid Processor defined in the script and it has been loaded
* by the InvokeScriptedProcessor processor
* Invokes the validate() routine provided by the script, allowing for
* custom validation code. This method assumes there is a valid Processor
* defined in the script and it has been loaded by the
* InvokeScriptedProcessor processor
*
* @param context The validation context to be passed into the custom validate method
* @return A collection of ValidationResults returned by the custom validate method
* @param context The validation context to be passed into the custom
* validate method
* @return A collection of ValidationResults returned by the custom validate
* method
*/
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
Expand Down Expand Up @@ -443,17 +461,19 @@ protected Collection<ValidationResult> customValidate(final ValidationContext co
}

/**
* Invokes the onTrigger() method of the scripted processor. If the script failed to reload, the processor yields
* until the script can be reloaded successfully. If the scripted processor's onTrigger() method throws an
* exception, a ProcessException will be thrown. If no processor is defined by the script, an error is logged
* with the system.
* Invokes the onTrigger() method of the scripted processor. If the script
* failed to reload, the processor yields until the script can be reloaded
* successfully. If the scripted processor's onTrigger() method throws an
* exception, a ProcessException will be thrown. If no processor is defined
* by the script, an error is logged with the system.
*
* @param context provides access to convenience methods for obtaining
* property values, delaying the scheduling of the processor, provides
* access to Controller Services, etc.
* @param sessionFactory provides access to a {@link ProcessSessionFactory}, which
* can be used for accessing FlowFiles, etc.
* @throws ProcessException if the scripted processor's onTrigger() method throws an exception
* @param context provides access to convenience methods for obtaining
* property values, delaying the scheduling of the processor, provides
* access to Controller Services, etc.
* @param sessionFactory provides access to a {@link ProcessSessionFactory},
* which can be used for accessing FlowFiles, etc.
* @throws ProcessException if the scripted processor's onTrigger() method
* throws an exception
*/
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public void testReadFlowFileContentAndStoreInFlowFileAttribute() throws Exceptio
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
runner.run();

runner.assertAllFlowFilesTransferred("success", 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship("success");
runner.assertAllFlowFilesTransferred("test", 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship("test");
result.get(0).assertAttributeEquals("from-content", "test content");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class GroovyProcessor implements Processor {
}
flowFile = session.putAttribute(flowFile, "from-content", "test content")
// transfer
session.transfer(flowFile, InvokeScriptedProcessor.REL_SUCCESS)
session.transfer(flowFile, REL_TEST)
session.commit()
}

Expand Down