Feature/nifi integration sink record processor and minor fixes (#809)
* Add Plc4xSinkRecordProcessor

* Add unit test
* Add response code check for Plc4xSinkProcessor

* Change input requirement to required

* Remove boolean conversion when writing values

* Add timeout in writing request

* Update README

* Added Common properties entry to readme
* Added Plc4xSinkRecordProcessor entry to readme

* Fix merge issues with new cache connection

* Change timeout validators to positive integers

* Update README

* Added comment to commented lines in common tests

* Make sink processors work with schema cache

* Improve sink tests

* Add unit test for address and property address access strategies

* Fix unit tests. Address text was blank

* Small fixes

* Add a check if debug logger is enabled
* Update tags of all processors
* Check if record sink response is not null
* Simplify record reader creation

* Update README

* Cache schema when writing success

* Added exists check to flowfile attribute in Plc4xSinkProcessor

* Fix build
QuanticPony committed Feb 23, 2023
1 parent 795454c commit 65346de
Showing 13 changed files with 581 additions and 88 deletions.
99 changes: 57 additions & 42 deletions plc4j/integrations/apache-nifi/README.md
@@ -1,4 +1,4 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
@@ -15,12 +15,13 @@ software distributed under the License is distributed on an
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# PLC4X Apache NiFi Integration

## Common properties
The following properties apply to all Plc4x processors:
* Connection String: A constant connection string such as `s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200`.
* Read/Write timeout (milliseconds): Specifies the time in milliseconds after which a read/write request times out.
* Address Access Strategy: Defines how the processor obtains the PLC addresses. It can take two values:
* **Properties as Addresses:**
For each variable, add a new property to the processor where the property name matches the variable name, and the variable value corresponds to the address tag.
@@ -48,19 +49,66 @@ This applies to all Plc4x processors:
```
If this JSON is in an attribute `plc4x.addresses` it can be accessed with *Address Text*=`${plc4x.addresses}`.
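As a minimal illustration of what such a JSON boils down to (the variable names and S7-style addresses below are hypothetical, and this is not the integration's actual parsing code), each entry becomes one name-to-address pair in the request:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AddressMapSketch {
    // Hypothetical variable-name -> address-tag pairs, as they would appear
    // in the JSON supplied through the Address Text property.
    static Map<String, String> exampleAddresses() {
        Map<String, String> addresses = new LinkedHashMap<>();
        addresses.put("var1", "%DB1.DBX0.0:BOOL"); // hypothetical S7 address
        addresses.put("var2", "%DB1.DBW2:INT");    // hypothetical S7 address
        return addresses;
    }

    public static void main(String[] args) {
        // Each entry corresponds to one builder.addTagAddress(name, address)
        // call when the read/write request is assembled.
        exampleAddresses().forEach((name, address) ->
                System.out.println(name + " -> " + address));
    }
}
```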


When reading from a PLC, the response is used to create a mapping from PLC types to Avro types. The mapping is done as follows:

The table below shows the data mapping between PLC data and Avro types (as specified in the [Avro specification](https://avro.apache.org/docs/1.11.1/specification/#primitive-types)).


| PLC type | Avro Type |
|----------:|-----------|
| PlcBOOL | boolean |
| PlcBYTE | bytes |
| PlcSINT | int |
| PlcINT | int |
| PlcLINT | long |
| PlcREAL | float |
| PlcLREAL | double |
| PlcCHAR | string |
| PlcDATE_AND_TIME | string |
| PlcDATE | string |
| PlcDINT | string |
| PlcDWORD | string |
| PlcLTIME | string |
| PlcLWORD | string |
| PlcNull | string |
| PlcSTRING | string |
| PlcTIME_OF_DAY | string |
| PlcTIME | string |
| PlcUDINT | string |
| PlcUINT | string |
| PlcULINT | string |
| PlcUSINT | string |
| PlcWCHAR | string |
| PlcWORD | string |
| ELSE | string |
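The mapping above can be sketched as a simple lookup (a minimal illustration only, not the integration's actual implementation):

```java
public class PlcAvroTypeMapping {
    // Minimal sketch of the PLC-to-Avro mapping in the table above;
    // every type not listed explicitly falls back to "string".
    static String avroTypeFor(String plcType) {
        switch (plcType) {
            case "PlcBOOL":  return "boolean";
            case "PlcBYTE":  return "bytes";
            case "PlcSINT":
            case "PlcINT":   return "int";
            case "PlcLINT":  return "long";
            case "PlcREAL":  return "float";
            case "PlcLREAL": return "double";
            default:         return "string"; // PlcCHAR, PlcSTRING, dates, words, ...
        }
    }

    public static void main(String[] args) {
        System.out.println("PlcREAL -> " + avroTypeFor("PlcREAL"));
        System.out.println("PlcWORD -> " + avroTypeFor("PlcWORD"));
    }
}
```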


Also, it is important to keep in mind the Processor Scheduling Configuration. The **Run Schedule** parameter (for example, *1 sec*) sets the reading frequency. Note that by default this value is 0 sec, i.e. the processor runs as fast as possible.


## Plc4xSinkProcessor

## Plc4xSourceProcessor

## Plc4xSinkRecordProcessor

This processor is <ins>record oriented</ins>, formatting output flowfile content using a Record Writer (for further information see [NiFi Documentation](https://nifi.apache.org/docs/nifi-docs/html/record-path-guide.html#overview)).

The Plc4xSinkRecord Processor can be configured using the common properties defined above and the following property:
- *Record Writer:* Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.


For the **Record Writer** property, any Record Writer included in NiFi can be used, such as JSON or CSV (custom writers can also be created).


## Plc4xSourceRecordProcessor

This processor is <ins>record oriented</ins>, formatting output flowfile content using a Record Writer (for further information see [NiFi Documentation](https://nifi.apache.org/docs/nifi-docs/html/record-path-guide.html#overview)).

The Plc4xSourceRecord Processor can be configured using the common properties defined above and the following **properties**:

- *PLC connection String:* PLC4X connection string used to connect to a given PLC device.
- *Record Writer:* Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.
- *Read timeout (milliseconds):* Specifies the time in milliseconds after which a read request times out.

Then, the PLC variables to be accessed are specified using NiFi processor **Dynamic Properties**. For each variable, add a new property to the processor where the property name matches the variable name, and the variable value corresponds to the address tag.

@@ -109,37 +157,4 @@ The output flowfile will contain the PLC read values. This information is includ
"var4" : "4",
"ts" : 1628783058433
} ]
```

@@ -18,6 +18,7 @@
*/
package org.apache.plc4x.nifi;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -53,13 +54,17 @@ public abstract class BasePlc4xProcessor extends AbstractProcessor {

protected List<PropertyDescriptor> properties;
protected Set<Relationship> relationships;
protected volatile boolean debugEnabled;

protected String connectionString;
protected Map<String, String> addressMap;

protected final SchemaCache schemaCache = new SchemaCache(0);

private final PlcConnectionManager connectionManager = CachedPlcConnectionManager.getBuilder()
.withMaxLeaseTime(Duration.ofSeconds(1L))
.withMaxWaitTime(Duration.ofSeconds(1L))
.build();

protected static final List<AllowableValue> addressAccessStrategy = Collections.unmodifiableList(Arrays.asList(
AddressesAccessUtils.ADDRESS_PROPERTY,
@@ -151,6 +156,7 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String
public void onScheduled(final ProcessContext context) {
connectionString = context.getProperty(PLC_CONNECTION_STRING.getName()).getValue();
schemaCache.restartCache(context.getProperty(PLC_SCHEMA_CACHE_SIZE).asInteger());
debugEnabled = getLogger().isDebugEnabled();
}

@Override
@@ -27,16 +27,18 @@
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.api.messages.PlcWriteResponse;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.api.model.PlcTag;

@TriggerSerially
@Tags({"plc4x", "put", "sink"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Processor able to write data to industrial PLCs using Apache PLC4X")
@ReadsAttributes({@ReadsAttribute(attribute="value", description="some value")})
@@ -45,6 +47,7 @@ public class Plc4xSinkProcessor extends BasePlc4xProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
final ComponentLog logger = getLogger();

// Abort if there's nothing to do.
if (flowFile == null) {
@@ -64,36 +67,57 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

if (tags != null){
for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
if (flowFile.getAttributes().containsKey(tag.getKey())) {
builder.addTag(tag.getKey(), tag.getValue(), flowFile.getAttribute(tag.getKey()));
} else {
if (debugEnabled)
logger.debug("PlcTag " + tag + " is declared as address but was not found on input record.");
}
}
} else {
for (Map.Entry<String,String> entry: addressMap.entrySet()){
if (flowFile.getAttributes().containsKey(entry.getKey())) {
builder.addTagAddress(entry.getKey(), entry.getValue(), flowFile.getAttribute(entry.getKey()));
}
}
if (debugEnabled)
logger.debug("PlcTypes resolution not found in cache and will be added with key: " + addressMap.toString());
}

PlcWriteRequest writeRequest = builder.build();

// Send the request to the PLC.
try {
final PlcWriteResponse plcWriteResponse = writeRequest.execute().get();
PlcResponseCode code = null;

for (String tag : plcWriteResponse.getTagNames()) {
code = plcWriteResponse.getResponseCode(tag);
if (!code.equals(PlcResponseCode.OK)) {
logger.error("Not OK code when writing the data to PLC for tag " + tag
+ " with value " + flowFile.getAttribute(tag)
+ " in address " + plcWriteResponse.getTag(tag).getAddressString());
throw new Exception(code.toString());
}
}
session.transfer(flowFile, REL_SUCCESS);

if (tags == null){
if (debugEnabled)
logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap.toString());
getSchemaCache().addSchema(
addressMap,
writeRequest.getTagNames(),
writeRequest.getTags(),
null
);
}

} catch (Exception e) {
flowFile = session.putAttribute(flowFile, "exception", e.getLocalizedMessage());
session.transfer(flowFile, REL_FAILURE);
}

} catch (ProcessException e) {
throw e;
} catch (Exception e) {
