feat(integration/nifi): Various improvements for NiFi integration
* Add tag validation

* Major changes

* Rewrite processors to make them more consistent
* Split processors into smaller functions
* Add timestamp field name
* Allow expression language in connection string

* Update readme

* Add file address access strategy

* Set a better exception attribute name in case of failure

* Update readme

* Move duplicated code to base processor

* Add testing for tag validation on properties and address text

* Add testing for file address access validation

* Clean up

* Add file address strategy test for all processors
QuanticPony committed Oct 2, 2023
1 parent 9369e7d commit 4156cc9
Showing 24 changed files with 1,196 additions and 492 deletions.
27 changes: 20 additions & 7 deletions plc4j/integrations/apache-nifi/README.md
@@ -20,8 +20,9 @@ under the License.

## Common properties
The following properties apply to all Plc4x Processors:
* Connection String: A constant connection string such as `s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200`.
* Read/Write/Subscribe timeout (milliseconds): Specifies the time in milliseconds for the connection to return a timeout. In case of subscription, the timeout is used to renew connections.
* Connection String: A constant connection string such as `s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200` or a valid Expression Language ([Expression Language NiFi documentation](https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html)) such as `${plc4x.connection_string}`.
* Timeout (milliseconds): Specifies the time in milliseconds for the connection to return a timeout. It is also used to renew connections. It can be set with Expression Language.
* Timestamp field name: Defines the name of the field that holds the time when the response from the PLC was received. It is added to the attributes or to the record, depending on the processor used.
* Address Access Strategy: Defines how the processor obtains the PLC addresses. It can take 3 values:
  * **Properties as Addresses:**
For each variable, add a new property to the processor where the property name matches the variable name, and the variable value corresponds to the address tag.
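  For example (hypothetical variable names), adding the dynamic properties *var1* = `%DB1:DBX0.0:BOOL` and *var2* = `%DB1:DBX0.1:BOOL` makes the processor use `var1` and `var2` as field names for those two address tags.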
@@ -48,6 +49,21 @@ The following properties apply to all Plc4x Processors:
}
```
If this JSON is in an attribute `plc4x.addresses` it can be accessed with *Address Text*=`${plc4x.addresses}`.

* **Address File:**
The *Address File* property must be supplied with a path to a JSON file that contains the variable names and address tags. Expression Language is supported.

For example, a file at
- *Address File*: ```/home/nifi/s7addresses.json```
with the following content:
```json
{
"var1" : "%DB1:DBX0.0:BOOL",
"var2" : "%DB1:DBX0.1:BOOL"
}
```
If the file path is in an attribute `plc4x.addresses_file`, it can be accessed with *Address File*=`${plc4x.addresses_file}`.



When reading from a PLC, the response is used to create a mapping from PLC types to Avro types. The mapping is done as follows:
@@ -87,10 +103,6 @@ Table of data mapping between PLC data and Avro types (as specified in the [Avro specification])
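
As a sketch of this mapping (the tag names and the record name here are hypothetical; the authoritative types are in the table above), a read response for a `BOOL` tag `var1` and an `INT` tag `var5`, with *Timestamp field name* set to `timestamp`, could produce an Avro schema along these lines:

```json
{
  "type" : "record",
  "name" : "PlcReadResponse",
  "fields" : [
    { "name" : "var1", "type" : "boolean" },
    { "name" : "var5", "type" : "int" },
    { "name" : "timestamp", "type" : "long" }
  ]
}
```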
Also, it is important to keep in mind the Processor Scheduling Configuration. The reading frequency can be set using the **Run Schedule** parameter (for example, *1 sec*). Note that by default this value is 0 sec (as fast as possible).


## Plc4xSinkProcessor

## Plc4xSourceProcessor

## Plc4xSinkRecordProcessor

This processor is <ins>record oriented</ins>: it reads formatted input FlowFile content using a Record Reader (for further information, see the [NiFi Documentation](https://nifi.apache.org/docs/nifi-docs/html/record-path-guide.html#overview)).
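
For example (a hypothetical input; any configured Record Reader works), a FlowFile read with a JSON record reader could carry the values to be written to the PLC:

```json
[ {
  "var1" : true,
  "var2" : false
} ]
```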
@@ -124,6 +136,7 @@ An *example* for reading values from an S7-1200:
- *PLC connection String:* *s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200*
- *Record Writer:* *PLC4x Embedded - AvroRecordSetWriter*
- *Read timeout (milliseconds):* *10000*
- *Timestamp field name:* *timestamp*
- *var1:* *%DB1:DBX0.0:BOOL*
- *var2:* *%DB1:DBX0.1:BOOL*
- *var3:* *%DB1:DBB01:BYTE*
@@ -162,6 +175,6 @@ The output flowfile will contain the PLC read values. This information is included as shown below:
"var3" : "\u0005",
"var5" : 1992,
"var4" : "4",
"ts" : 1628783058433
"timestamp" : 1628783058433
} ]
```


@@ -30,7 +30,6 @@ Licensed to the Apache Software Foundation (ASF) under one
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -85,6 +84,7 @@ public class Plc4xListenRecordProcessor extends BasePlc4xProcessor {
protected Plc4xListenerDispatcher dispatcher;
protected RecordSchema recordSchema;
protected Thread readerThread;
protected Map<String, String> addressMap;
final StopWatch executeTime = new StopWatch(false);

public static final PropertyDescriptor PLC_RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
@@ -112,21 +112,7 @@ public class Plc4xListenRecordProcessor extends BasePlc4xProcessor {
.dependsOn(PLC_SUBSCRIPTION_TYPE, Plc4xSubscriptionType.CYCLIC.name())
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.addValidator(new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
if (context.getProperty(PLC_FUTURE_TIMEOUT_MILISECONDS).asLong() > Long.valueOf(input)) {
return new ValidationResult.Builder().valid(true).build();
} else {
return new ValidationResult.Builder()
.valid(false)
.input(input)
.subject(PLC_SUBSCRIPTION_CYCLIC_POLLING_INTERVAL.getDisplayName())
.explanation(String.format("it must me smaller than the value of %s", PLC_FUTURE_TIMEOUT_MILISECONDS.getDisplayName()))
.build();
}
}
})
.addValidator(new CyclycPollingIntervalValidator())
.defaultValue("10000")
.build();

@@ -151,20 +137,19 @@ public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
subscriptionType = Plc4xSubscriptionType.valueOf(context.getProperty(PLC_SUBSCRIPTION_TYPE).getValue());
cyclingPollingInterval = context.getProperty(PLC_SUBSCRIPTION_CYCLIC_POLLING_INTERVAL).asLong();
addressMap = getPlcAddressMap(context, null);

createDispatcher(events);
createDispatcher(context, events);
}

protected void createDispatcher(final BlockingQueue<PlcSubscriptionEvent> events) {
protected void createDispatcher(final ProcessContext context, final BlockingQueue<PlcSubscriptionEvent> events) {
if (readerThread != null) {
return;
}

// creates the dispatcher and calls open() to start listening to the PLC subscription
dispatcher = new Plc4xListenerDispatcher(timeout, subscriptionType, cyclingPollingInterval, getLogger(), events);
dispatcher = new Plc4xListenerDispatcher(getTimeout(context, null), subscriptionType, cyclingPollingInterval, getLogger(), events);
try {
dispatcher.open(getConnectionString(), addressMap);
addressMap = getPlcAddressMap(context, null);
dispatcher.open(getConnectionString(context, null), addressMap);
} catch (Exception e) {
if (debugEnabled) {
getLogger().debug("Error creating a the subscription event dispatcher");
@@ -198,7 +183,7 @@ public void closeDispatcher() throws ProcessException {
}
}

protected PlcSubscriptionEvent getMessage() {
protected PlcSubscriptionEvent getMessage(final ProcessContext context) {
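        // Poll for an event while the reader thread is alive; otherwise rebuild the dispatcher and report the broken connection.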
if (readerThread != null && readerThread.isAlive()) {
return events.poll();

@@ -208,14 +193,14 @@ protected PlcSubscriptionEvent getMessage() {
getLogger().debug("Connection to Plc broke. Trying to restart connection");
}
closeDispatcher();
createDispatcher(events);
createDispatcher(context, events);
throw new ProcessException("Connection to Plc broke. Trying to restart connection");
}

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

DefaultPlcSubscriptionEvent event = (DefaultPlcSubscriptionEvent) getMessage();
DefaultPlcSubscriptionEvent event = (DefaultPlcSubscriptionEvent) getMessage(context);

if (event == null) {
return;
@@ -226,70 +211,25 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

final AtomicLong nrOfRows = new AtomicLong(0L);

FlowFile resultSetFF;
resultSetFF = session.create();
FlowFile resultSetFF = session.create();

Plc4xWriter plc4xWriter = new RecordPlc4xWriter(context.getProperty(PLC_RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class), Collections.emptyMap());

try {
resultSetFF = session.write(resultSetFF, out -> {
session.write(resultSetFF, out -> {
try {
nrOfRows.set(plc4xWriter.writePlcReadResponse(event, out, getLogger(), null, recordSchema));
nrOfRows.set(plc4xWriter.writePlcReadResponse(event, out, getLogger(), null, recordSchema, getTimestampField(context)));
} catch (Exception e) {
getLogger().error("Exception reading the data from PLC", e);
throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
}

if (recordSchema == null){
if (debugEnabled)
getLogger().debug("Adding Plc-Avro schema and PlcTypes resolution into cache with key: " + addressMap.toString());

// Add schema to the cache
LinkedHashSet<String> addressNames = new LinkedHashSet<String>();
addressNames.addAll(event.getTagNames());

List<PlcTag> addressTags = addressNames.stream().map(
new Function<String,PlcTag>() {
@Override
public PlcTag apply(String addr) {
return new PlcTag() {
@Override
public String getAddressString() {
return addr;
}

@Override
public PlcValueType getPlcValueType() {
return event.getPlcValue(addr).getPlcValueType();
}
};
}
}).collect(Collectors.toList());

getSchemaCache().addSchema(
addressMap,
addressNames,
addressTags,
plc4xWriter.getRecordSchema()
);
recordSchema = getSchemaCache().retrieveSchema(addressMap);
addTagsToCache(event, plc4xWriter);
}
});
long executionTimeElapsed = executeTime.getElapsed(TimeUnit.MILLISECONDS);
executeTime.stop();

final Map<String, String> attributesToAdd = new HashMap<>();
attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
attributesToAdd.put(RESULT_LAST_EVENT, String.valueOf(executionTimeElapsed));

attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());
resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
plc4xWriter.updateCounters(session);
getLogger().info("{} contains {} records; transferring to 'success'", resultSetFF, nrOfRows.get());

session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows from subscription", executionTimeElapsed);
resultSetFF = completeResultFlowFile(session, nrOfRows, resultSetFF, plc4xWriter);
session.transfer(resultSetFF, REL_SUCCESS);
session.commitAsync();

executeTime.start();

@@ -298,4 +238,71 @@ public PlcValueType getPlcValueType() {
throw new ProcessException("Got an error while trying to get a subscription event", e);
}
}

private void addTagsToCache(DefaultPlcSubscriptionEvent event, Plc4xWriter plc4xWriter) {
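        // Wraps the event's tag names as PlcTag instances and caches the writer's Avro schema under the address map key.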
if (debugEnabled)
getLogger().debug("Adding Plc-Avro schema and PlcTypes resolution into cache with key: " + addressMap.toString());

// Add schema to the cache
LinkedHashSet<String> addressNames = new LinkedHashSet<>();
addressNames.addAll(event.getTagNames());

List<PlcTag> addressTags = addressNames.stream().map(addr ->
new PlcTag() {
@Override
public String getAddressString() {
return addr;
}

@Override
public PlcValueType getPlcValueType() {
return event.getPlcValue(addr).getPlcValueType();
}
}
).collect(Collectors.toList());

getSchemaCache().addSchema(
addressMap,
addressNames,
addressTags,
plc4xWriter.getRecordSchema()
);
recordSchema = getSchemaCache().retrieveSchema(addressMap);
}

private FlowFile completeResultFlowFile(final ProcessSession session, final AtomicLong nrOfRows, FlowFile resultSetFF,
Plc4xWriter plc4xWriter) {

long executionTimeElapsed = executeTime.getElapsed(TimeUnit.MILLISECONDS);
executeTime.stop();

final Map<String, String> attributesToAdd = new HashMap<>();
attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
attributesToAdd.put(RESULT_LAST_EVENT, String.valueOf(executionTimeElapsed));

attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());
resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
plc4xWriter.updateCounters(session);
getLogger().info("{} contains {} records; transferring to 'success'", resultSetFF, nrOfRows.get());

session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows from subscription", executionTimeElapsed);
return resultSetFF;
}


protected static class CyclycPollingIntervalValidator implements Validator {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
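            // The cyclic polling interval is valid only if it is strictly smaller than the configured future timeout.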
if (context.getProperty(PLC_FUTURE_TIMEOUT_MILISECONDS).asLong() > Long.valueOf(input)) {
return new ValidationResult.Builder().valid(true).build();
} else {
return new ValidationResult.Builder()
.valid(false)
.input(input)
.subject(PLC_SUBSCRIPTION_CYCLIC_POLLING_INTERVAL.getDisplayName())
.explanation(String.format("it must me smaller than the value of %s", PLC_FUTURE_TIMEOUT_MILISECONDS.getDisplayName()))
.build();
}
}
}
}
