From 65346dec74e0667bd3c56e0fa4e9e06aea041279 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Unai=20Ler=C3=ADa=20Fortea?= <67756626+QuanticPony@users.noreply.github.com>
Date: Thu, 23 Feb 2023 10:36:09 +0100
Subject: [PATCH] Feature/nifi integration sink record processor and minor fixes (#809)

* Add Plc4xSinkRecordProcessor
* Add unit test
* Add response code check for Plc4xSinkProcessor
* Change input requirement to required
* Remove boolean conversion when writing values
* Add timeout to write request
* Update README
* Added Common properties entry to readme
* Added Plc4xSinkRecordProcessor entry to readme
* Fix merge issues with new cache connection
* Change timeout validators to positive integers
* Update README
* Added comment to commented lines in common tests
* Make sink processors work with schema cache
* Improve sink tests
* Add unit test for address and property address access strategies
* Fix unit tests. Address text was blank
* Small fixes
* Add a check if debug logger is enabled
* Update tags of all processors
* Check if record sink response is not null
* Simplify record reader creation
* Update README
* Cache schema on write success
* Added exists check to flowfile attribute in Plc4xSinkProcessor
* Fix build
---
 plc4j/integrations/apache-nifi/README.md      |  99 ++++----
 .../apache/plc4x/nifi/BasePlc4xProcessor.java |   8 +-
 .../apache/plc4x/nifi/Plc4xSinkProcessor.java |  36 ++-
 .../plc4x/nifi/Plc4xSinkRecordProcessor.java  | 228 ++++++++++++++++++
 .../plc4x/nifi/Plc4xSourceProcessor.java      |  12 +-
 .../nifi/Plc4xSourceRecordProcessor.java      |  36 +--
 .../record/Plc4xReadResponseRecordSet.java    |  14 +-
 .../org.apache.nifi.processor.Processor       |   1 +
 .../plc4x/nifi/Plc4xSinkProcessorTest.java    |  40 +++-
 .../nifi/Plc4xSinkRecordProcessorTest.java    |  94 ++++++++
 .../plc4x/nifi/Plc4xSourceProcessorTest.java  |   7 +-
 .../nifi/Plc4xSourceRecordProcessorTest.java  |   7 +-
 .../plc4x/nifi/util/Plc4xCommonTest.java      |  87 ++++++-
 13 files changed, 581 insertions(+), 88 deletions(-)
 create mode 100644 plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.java
 create mode 100644 plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessorTest.java

diff --git a/plc4j/integrations/apache-nifi/README.md b/plc4j/integrations/apache-nifi/README.md
index 4be71723df0..4aaaf362003 100644
--- a/plc4j/integrations/apache-nifi/README.md
+++ b/plc4j/integrations/apache-nifi/README.md
@@ -1,4 +1,4 @@
-!--
+<!--
   Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements. See the NOTICE file
  -->

 # PLC4X Apache NiFi Integration

-# Common properties
-This applies to all Plc4x processors:
-
+## Common properties
+The following properties apply to all Plc4x processors:
+* Connection String: A constant connection string such as `s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200`.
+* Read/Write timeout (milliseconds): the time in milliseconds after which a pending read or write request times out.
 * Address Access Strategy: defines how the processor obtains the PLC addresses. It can take 2 values:
 	* **Properties as Addresses:** For each variable, add a new property to the processor where the property name matches the variable name, and the property value is the PLC address of the tag.
@@ -48,19 +49,66 @@ This applies to all Plc4x processors:
 ```
 If this JSON is in an attribute `plc4x.addresses` it can be accessed with *Address Text*=`${plc4x.addresses}`.
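+
+For reference, this is roughly what the processors do internally with these common properties. A minimal standalone sketch (the connection string and address are illustrative values in the same style as the examples above; `CachedPlcConnectionManager` is the same connection cache the processors themselves use):
+
+```java
+import java.util.concurrent.TimeUnit;
+
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;
+
+public class ReadOnce {
+    public static void main(String[] args) throws Exception {
+        // Build the same kind of cached connection manager the processors use internally.
+        CachedPlcConnectionManager manager = CachedPlcConnectionManager.getBuilder().build();
+        try (PlcConnection connection = manager.getConnection(
+                "s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200")) {
+            // One dynamic property: name "var1", value = the PLC address tag.
+            PlcReadRequest request = connection.readRequestBuilder()
+                .addTagAddress("var1", "%DB1.DBX0.0:BOOL")
+                .build();
+            // Mirrors the Read/Write timeout property semantics.
+            PlcReadResponse response = request.execute().get(10000, TimeUnit.MILLISECONDS);
+            System.out.println("var1 = " + response.getObject("var1"));
+        }
+    }
+}
+```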
+
+When reading from a PLC, the response is used to create a mapping from PLC types to Avro types.
+
+Table of the data mapping between PLC data and Avro types (as specified in the [Avro specification](https://avro.apache.org/docs/1.11.1/specification/#primitive-types)):
+
+| PLC type | Avro Type |
+|----------:|-----------|
+| PlcBOOL | boolean |
+| PlcBYTE | bytes |
+| PlcSINT | int |
+| PlcINT | int |
+| PlcLINT | long |
+| PlcREAL | float |
+| PlcLREAL | double |
+| PlcCHAR | string |
+| PlcDATE_AND_TIME | string |
+| PlcDATE | string |
+| PlcDINT | string |
+| PlcDWORD | string |
+| PlcLTIME | string |
+| PlcLWORD | string |
+| PlcNull | string |
+| PlcSTRING | string |
+| PlcTIME_OF_DAY | string |
+| PlcTIME | string |
+| PlcUDINT | string |
+| PlcUINT | string |
+| PlcULINT | string |
+| PlcUSINT | string |
+| PlcWCHAR | string |
+| PlcWORD | string |
+| ELSE | string |
+
+Also, keep in mind the Processor Scheduling Configuration: the reading frequency is set with the **Run Schedule** parameter (for example, *1 sec*). Note that by default this value is 0 sec, i.e. as fast as possible.
+
+
 ## Plc4xSinkProcessor
 
 ## Plc4xSourceProcessor
 
+## Plc4xSinkRecordProcessor
+
+This processor is record oriented: incoming flowfile content is parsed with a Record Reader and each record is written to the PLC (for further information see the [NiFi Documentation](https://nifi.apache.org/docs/nifi-docs/html/record-path-guide.html#overview)).
+
+The Plc4xSinkRecord Processor can be configured using the common properties defined above and the following property:
+- *Record Reader:* Specifies the Controller Service to use for reading records from a FlowFile. The Record Reader may use Inherit Schema to emulate the inferred schema behavior, i.e. an explicit schema need not be defined in the reader, and will be supplied by the same logic used to infer the schema from the column types.
+
+For the **Record Reader** property, any reader included in NiFi can be used, such as JSON, CSV or Avro (custom readers can also be created).
+
+
 ## Plc4xSourceRecordProcessor
 
 This processor is record oriented, formatting output flowfile content using a Record Writer (for further information see the [NiFi Documentation](https://nifi.apache.org/docs/nifi-docs/html/record-path-guide.html#overview)).
 
-The Plc4xSourceRecord Processor can be configured using the following **properties**:
+The Plc4xSourceRecord Processor can be configured using the common properties defined above and the following **properties**:
 
-- *PLC connection String:* PLC4X connection string used to connect to a given PLC device.
 - *Record Writer:* Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.
-- *Read timeout (miliseconds):* Specifies the time in milliseconds for the connection to return a timeout
 
 Then, the PLC variables to be accessed are specified using NiFi processor **Dynamic Properties**. For each variable, add a new property to the processor where the property name matches the variable name, and the variable value corresponds to the address tag.
 
@@ -109,37 +157,4 @@ The output flowfile will contain the PLC read values. This information is included
    "var4" : "4",
    "ts" : 1628783058433
 } ]
-```
-
-Also, it is important to keep in mind the Processor Scheduling Configuration. Using the parameter **Run Schedule** (for example to *1 sec*), the reading frequency can be set. 
Note that by default, this value is defined to 0 sec (as fast as possible). - -Table of data mapping between plc data and Avro types (as specified in [Avro specification](https://avro.apache.org/docs/1.11.1/specification/#primitive-types)). - - -| PLC type | Avro Type | -|----------:|-----------| -| PlcBOOL | boolean | -| PlcBYTE | bytes | -| PlcSINT | int | -| PlcINT | int | -| PlcLINT | long | -| PlcREAL | float | -| PlcLREAL | double | -| PlcCHAR | string | -| PlcDATE_AND_TIME | string | -| PlcDATE | string | -| PlcDINT | string | -| PlcDWORD | string | -| PlcLTIME | string | -| PlcLWORD | string | -| PlcNull | string | -| PlcSTRING | string | -| PlcTIME_OF_DAY | string | -| PlcTIME | string | -| PlcUDINT | string | -| PlcUINT | string | -| PlcULINT | string | -| PlcUSINT | string | -| PlcWCHAR | string | -| PlcWORD | string | -| ELSE | string | +``` \ No newline at end of file diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java index 158d08a0a5c..039d9917c67 100644 --- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java +++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java @@ -18,6 +18,7 @@ */ package org.apache.plc4x.nifi; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -53,13 +54,17 @@ public abstract class BasePlc4xProcessor extends AbstractProcessor { protected List properties; protected Set relationships; + protected volatile boolean debugEnabled; protected String connectionString; protected Map addressMap; protected final SchemaCache schemaCache = new SchemaCache(0); - private final PlcConnectionManager connectionManager = CachedPlcConnectionManager.getBuilder().build(); + private final PlcConnectionManager connectionManager = CachedPlcConnectionManager.getBuilder() + .withMaxLeaseTime(Duration.ofSeconds(1L)) + .withMaxWaitTime(Duration.ofSeconds(1L)) + .build(); protected static final List addressAccessStrategy = Collections.unmodifiableList(Arrays.asList( AddressesAccessUtils.ADDRESS_PROPERTY, @@ -151,6 +156,7 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String public void onScheduled(final ProcessContext context) { connectionString = context.getProperty(PLC_CONNECTION_STRING.getName()).getValue(); schemaCache.restartCache(context.getProperty(PLC_SCHEMA_CACHE_SIZE).asInteger()); + debugEnabled = getLogger().isDebugEnabled(); } @Override diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java index a13e6c3dff8..11bfa74e894 100644 --- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java +++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java @@ -27,16 +27,18 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; 
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.api.model.PlcTag;
 
 @TriggerSerially
-@Tags({"plc4x-sink"})
+@Tags({"plc4x", "put", "sink"})
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Processor able to write data to industrial PLCs using Apache PLC4X")
 @ReadsAttributes({@ReadsAttribute(attribute="value", description="some value")})
@@ -45,6 +47,7 @@ public class Plc4xSinkProcessor extends BasePlc4xProcessor {
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         FlowFile flowFile = session.get();
+        final ComponentLog logger = getLogger();
 
         // Abort if there's nothing to do.
         if (flowFile == null) {
@@ -64,13 +67,21 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
 
         if (tags != null){
             for (Map.Entry<String, PlcTag> tag : tags.entrySet()){
-                builder.addTag(tag.getKey(), tag.getValue());
+                if (flowFile.getAttributes().containsKey(tag.getKey())) {
+                    builder.addTag(tag.getKey(), tag.getValue(), flowFile.getAttribute(tag.getKey()));
+                } else {
+                    if (debugEnabled)
+                        logger.debug("PlcTag " + tag + " is declared as address but was not found on input record.");
+                }
             }
         } else {
-            getLogger().debug("PlcTypes resolution not found in cache and will be added with key: " + addressMap.toString());
             for (Map.Entry<String, String> entry: addressMap.entrySet()){
-                builder.addTagAddress(entry.getKey(), entry.getValue());
+                if (flowFile.getAttributes().containsKey(entry.getKey())) {
+                    builder.addTagAddress(entry.getKey(), entry.getValue(), flowFile.getAttribute(entry.getKey()));
+                }
             }
+            if (debugEnabled)
+                logger.debug("PlcTypes resolution not found in cache and will be added with key: " + addressMap.toString());
         }
 
         PlcWriteRequest writeRequest = builder.build();
 
@@ -78,11 +89,22 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         // Send the request to the PLC.
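+        // The write below is synchronous: execute().get() blocks until the PLC
+        // answers, and each tag's PlcResponseCode is then checked so that a
+        // single non-OK tag routes the FlowFile to REL_FAILURE.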
         try {
             final PlcWriteResponse plcWriteResponse = writeRequest.execute().get();
-            // TODO: Evaluate the response and create flow files for successful and unsuccessful updates
+            PlcResponseCode code = null;
+
+            for (String tag : plcWriteResponse.getTagNames()) {
+                code = plcWriteResponse.getResponseCode(tag);
+                if (!code.equals(PlcResponseCode.OK)) {
+                    logger.error("Not OK code when writing the data to PLC for tag " + tag
+                        + " with value " + flowFile.getAttribute(tag)
+                        + " in address " + plcWriteResponse.getTag(tag).getAddressString());
+                    throw new Exception(code.toString());
+                }
+            }
             session.transfer(flowFile, REL_SUCCESS);
 
             if (tags == null){
-                getLogger().debug("Adding PlcTypes resolution into cache with key: " + addressMap.toString());
+                if (debugEnabled)
+                    logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap.toString());
                 getSchemaCache().addSchema(
                     addressMap,
                     writeRequest.getTagNames(),
@@ -90,10 +112,12 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                     null
                 );
             }
+
         } catch (Exception e) {
             flowFile = session.putAttribute(flowFile, "exception", e.getLocalizedMessage());
             session.transfer(flowFile, REL_FAILURE);
         }
+
     } catch (ProcessException e) {
         throw e;
     } catch (Exception e) {
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.java
new file mode 100644
index 00000000000..0ea2b2c44d4
--- /dev/null
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.java
@@ -0,0 +1,228 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+ */ +package org.apache.plc4x.nifi; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.util.StopWatch; +import org.apache.plc4x.java.api.PlcConnection; +import org.apache.plc4x.java.api.messages.PlcWriteRequest; +import org.apache.plc4x.java.api.messages.PlcWriteResponse; +import org.apache.plc4x.java.api.model.PlcTag; +import org.apache.plc4x.java.api.types.PlcResponseCode; + +@TriggerSerially +@Tags({"plc4x", "put", "sink", "record"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Processor able to write data to industrial PLCs using Apache PLC4X") +@WritesAttributes({ @WritesAttribute(attribute = "value", description = "some value") }) +public class Plc4xSinkRecordProcessor extends BasePlc4xProcessor { + + public static final String RESULT_ROW_COUNT = "plc4x.write.row.count"; + public static final String RESULT_QUERY_DURATION = "plc4x.write.query.duration"; + public static final String RESULT_QUERY_EXECUTION_TIME = "plc4x.write.query.executiontime"; + public static final String RESULT_QUERY_FETCH_TIME = "plc4x.write.query.fetchtime"; + public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid"; + public static final String RESULT_ERROR_MESSAGE = "plc4x.write.error.message"; + + public static final PropertyDescriptor PLC_RECORD_READER_FACTORY = new PropertyDescriptor.Builder() + .name("record-reader").displayName("Record Reader") + .description( + "Specifies the Controller Service to use for reading record from a FlowFile. The Record Reader may use Inherit Schema to emulate the inferred schema behavior, i.e. 
" + + "an explicit schema need not be defined in the reader, and will be supplied by the same logic used to infer the schema from the column types.") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + public static final PropertyDescriptor PLC_WRITE_FUTURE_TIMEOUT_MILISECONDS = new PropertyDescriptor.Builder().name("plc4x-record-write-timeout").displayName("Write timeout (miliseconds)") + .description("Write timeout in miliseconds") + .defaultValue("10000") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + + public Plc4xSinkRecordProcessor() { + } + + @Override + protected void init(final ProcessorInitializationContext context) { + super.init(context); + final Set r = new HashSet<>(); + r.addAll(super.getRelationships()); + this.relationships = Collections.unmodifiableSet(r); + + final List pds = new ArrayList<>(); + pds.addAll(super.getSupportedPropertyDescriptors()); + pds.add(PLC_RECORD_READER_FACTORY); + pds.add(PLC_WRITE_FUTURE_TIMEOUT_MILISECONDS); + this.properties = Collections.unmodifiableList(pds); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile fileToProcess = session.get(); + + // Abort if there's nothing to do. + if (fileToProcess == null) { + return; + } + + final ComponentLog logger = getLogger(); + // Get an instance of a component able to read from a PLC. + final AtomicLong nrOfRows = new AtomicLong(0L); + final StopWatch executeTime = new StopWatch(true); + + final FlowFile originalFlowFile = fileToProcess; + + InputStream in = session.read(originalFlowFile); + + Record record = null; + + try (RecordReader recordReader = context.getProperty(PLC_RECORD_READER_FACTORY) + .asControllerService(RecordReaderFactory.class) + .createRecordReader(originalFlowFile, in, logger)){ + + while ((record = recordReader.nextRecord()) != null) { + long nrOfRowsHere = 0L; + PlcWriteResponse plcWriteResponse; + PlcWriteRequest writeRequest; + + Map addressMap = getPlcAddressMap(context, fileToProcess); + final Map tags = getSchemaCache().retrieveTags(addressMap); + + try (PlcConnection connection = getConnectionManager().getConnection(getConnectionString())) { + PlcWriteRequest.Builder builder = connection.writeRequestBuilder(); + + + if (tags != null){ + for (Map.Entry tag : tags.entrySet()){ + if (record.toMap().containsKey(tag.getKey())) { + builder.addTag(tag.getKey(), tag.getValue(), record.getValue(tag.getKey())); + nrOfRowsHere++; + } else { + if (debugEnabled) + logger.debug("PlcTag " + tag + " is declared as address but was not found on input record."); + } + } + } else { + if (debugEnabled) + logger.debug("Plc-Avro schema and PlcTypes resolution not found in cache and will be added with key: " + addressMap.toString()); + for (Map.Entry entry: addressMap.entrySet()){ + if (record.toMap().containsKey(entry.getKey())) { + builder.addTagAddress(entry.getKey(), entry.getValue(), record.getValue(entry.getKey())); + nrOfRowsHere++; + } else { + if (debugEnabled) + logger.debug("PlcTag " + entry.getKey() + " with address " + entry.getValue() + " was not found on input record."); + } + } + } + writeRequest = builder.build(); + + plcWriteResponse = writeRequest.execute().get( + context.getProperty(PLC_WRITE_FUTURE_TIMEOUT_MILISECONDS.getName()).asInteger(), TimeUnit.MILLISECONDS + ); + } catch (Exception e) { + System.out.println(e.getMessage()); + in.close(); + logger.error("Exception writing the data to PLC", e); + 
+                    session.transfer(originalFlowFile, REL_FAILURE);
+                    session.commitAsync();
+                    throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
+                }
+
+                // Response check if values were written
+                if (plcWriteResponse != null){
+                    PlcResponseCode code = null;
+
+                    for (String tag : plcWriteResponse.getTagNames()) {
+                        code = plcWriteResponse.getResponseCode(tag);
+                        if (!code.equals(PlcResponseCode.OK)) {
+                            logger.error("Not OK code when writing the data to PLC for tag " + tag
+                                + " with value " + record.getValue(tag).toString()
+                                + " in address " + plcWriteResponse.getTag(tag).getAddressString());
+                            throw new ProcessException("Writing response code for " + plcWriteResponse.getTag(tag).getAddressString() + " was " + code.name() + ", expected OK");
+                        }
+                    }
+                    if (tags == null && writeRequest != null){
+                        if (debugEnabled)
+                            logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap.toString());
+                        getSchemaCache().addSchema(
+                            addressMap,
+                            writeRequest.getTagNames(),
+                            writeRequest.getTags(),
+                            null
+                        );
+                    }
+                    nrOfRows.getAndAdd(nrOfRowsHere);
+                }
+            }
+            in.close();
+        } catch (Exception e) {
+            throw new ProcessException(e);
+        }
+
+        long executionTimeElapsed = executeTime.getElapsed(TimeUnit.MILLISECONDS);
+        final Map<String, String> attributesToAdd = new HashMap<>();
+        attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+        attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
+
+        FlowFile resultSetFF = session.putAllAttributes(originalFlowFile, attributesToAdd);
+
+        logger.info("Wrote {} fields; transferring {} to 'success'", new Object[] { nrOfRows.get(), resultSetFF });
+        // Report a FETCH event if there was an incoming flow file, or a RECEIVE event otherwise
+        if (context.hasIncomingConnection()) {
+            session.getProvenanceReporter().fetch(resultSetFF, "Wrote " + nrOfRows.get() + " rows", executionTimeElapsed);
+        } else {
+            session.getProvenanceReporter().receive(resultSetFF, "Wrote " + nrOfRows.get() + " rows", executionTimeElapsed);
+        }
+
+        session.transfer(resultSetFF, BasePlc4xProcessor.REL_SUCCESS);
+        session.commitAsync();
+    }
+}
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
index 1a88aa47217..026291246a9 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
@@ -28,16 +28,16 @@
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.plc4x.java.api.PlcConnection;
-import org.apache.plc4x.java.api.PlcDriver;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.api.model.PlcTag;
 
-@Tags({"plc4x-source"})
+@Tags({"plc4x", "get", "input", "source", "attributes"})
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Processor able to read data from industrial PLCs
using Apache PLC4X") @WritesAttributes({@WritesAttribute(attribute="value", description="some value")}) @@ -45,6 +45,8 @@ public class Plc4xSourceProcessor extends BasePlc4xProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + + final ComponentLog logger = getLogger(); // Get an instance of a component able to read from a PLC. try(PlcConnection connection = getConnectionManager().getConnection(getConnectionString())) { @@ -64,7 +66,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session builder.addTag(tag.getKey(), tag.getValue()); } } else { - getLogger().debug("PlcTypes resolution not found in cache and will be added with key: " + addressMap.toString()); + if (debugEnabled) + logger.debug("PlcTypes resolution not found in cache and will be added with key: " + addressMap.toString()); for (Map.Entry entry: addressMap.entrySet()){ builder.addTagAddress(entry.getKey(), entry.getValue()); } @@ -82,7 +85,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session flowFile = session.putAllAttributes(flowFile, attributes); if (tags == null){ - getLogger().debug("Adding PlcTypes resolution into cache with key: " + addressMap.toString()); + if (debugEnabled) + logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap.toString()); getSchemaCache().addSchema( addressMap, readRequest.getTagNames(), diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java index 1af45e6bae7..3a1eabcf452 100644 --- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java +++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java @@ -56,7 +56,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.plc4x.nifi.record.Plc4xWriter; import org.apache.plc4x.nifi.record.RecordPlc4xWriter; -@Tags({ "plc4x-source" }) +@Tags({"plc4x", "get", "input", "source", "record"}) @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) @CapabilityDescription("Processor able to read data from industrial PLCs using Apache PLC4X") @WritesAttributes({ @WritesAttribute(attribute = "value", description = "some value") }) @@ -80,7 +80,7 @@ public class Plc4xSourceRecordProcessor extends BasePlc4xProcessor { .description("Read timeout in miliseconds") .defaultValue("10000") .required(true) - .addValidator(StandardValidators.INTEGER_VALIDATOR) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); Integer readTimeout; @@ -131,20 +131,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final AtomicLong nrOfRows = new AtomicLong(0L); final StopWatch executeTime = new StopWatch(true); - try (PlcConnection connection = getConnectionManager().getConnection(getConnectionString())) { - - String inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key()); - Map inputFileAttrMap = fileToProcess == null ? 
null : fileToProcess.getAttributes(); - FlowFile resultSetFF; - if (fileToProcess == null) { - resultSetFF = session.create(); - } else { - resultSetFF = session.create(fileToProcess); - } - if (inputFileAttrMap != null) { - resultSetFF = session.putAllAttributes(resultSetFF, inputFileAttrMap); - } + String inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key()); + Map inputFileAttrMap = fileToProcess == null ? null : fileToProcess.getAttributes(); + FlowFile resultSetFF; + if (fileToProcess == null) { + resultSetFF = session.create(); + } else { + resultSetFF = session.create(fileToProcess); + } + if (inputFileAttrMap != null) { + resultSetFF = session.putAllAttributes(resultSetFF, inputFileAttrMap); + } + try (PlcConnection connection = getConnectionManager().getConnection(getConnectionString())) { PlcReadRequest.Builder builder = connection.readRequestBuilder(); Map addressMap = getPlcAddressMap(context, fileToProcess); final RecordSchema recordSchema = getSchemaCache().retrieveSchema(addressMap); @@ -155,7 +154,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session builder.addTag(tag.getKey(), tag.getValue()); } } else { - logger.debug("Plc-Avro schema and PlcTypes resolution not found in cache and will be added with key: " + addressMap.toString()); + if (debugEnabled) + logger.debug("Plc-Avro schema and PlcTypes resolution not found in cache and will be added with key: " + addressMap.toString()); for (Map.Entry entry: addressMap.entrySet()){ builder.addTagAddress(entry.getKey(), entry.getValue()); } @@ -185,7 +185,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session }); if (recordSchema == null){ - logger.debug("Adding Plc-Avro schema and PlcTypes resolution into cache with key: " + addressMap.toString()); + if (debugEnabled) + logger.debug("Adding Plc-Avro schema and PlcTypes resolution into cache with key: " + addressMap.toString()); getSchemaCache().addSchema( addressMap, readRequest.getTagNames(), @@ -228,5 +229,4 @@ public void onTrigger(final ProcessContext context, final ProcessSession session throw new ProcessException("Got an error while trying to get a connection", e); } } - } diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java index 3e038c0c50a..c1c24440e5e 100644 --- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java +++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java @@ -21,7 +21,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -44,6 +43,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet, Closeable { private final PlcReadResponse readResponse; private final Set rsColumnNames; private boolean moreRows; + private boolean debugEnabled = logger.isDebugEnabled(); private AtomicReference recordSchema = new AtomicReference(null); @@ -51,7 +51,8 @@ public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse, RecordSche this.readResponse = readResponse; moreRows = true; - logger.debug("Creating record schema from 
PlcReadResponse"); + if (debugEnabled) + logger.debug("Creating record schema from PlcReadResponse"); Map responseDataStructure = readResponse.getAsPlcValue().getStruct(); rsColumnNames = responseDataStructure.keySet(); @@ -61,7 +62,8 @@ public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse, RecordSche } else { this.recordSchema.set(recordSchema); } - logger.debug("Record schema from PlcReadResponse successfuly created."); + if (debugEnabled) + logger.debug("Record schema from PlcReadResponse successfuly created."); } @@ -103,7 +105,8 @@ public void close() { protected Record createRecord(final PlcReadResponse readResponse) throws IOException{ final Map values = new HashMap<>(getSchema().getFieldCount()); - logger.debug("creating record."); + if (debugEnabled) + logger.debug("creating record."); for (final RecordField tag : getSchema().getFields()) { final String tagName = tag.getFieldName(); @@ -122,7 +125,8 @@ protected Record createRecord(final PlcReadResponse readResponse) throws IOExcep //add timestamp tag to schema values.put(Plc4xCommon.PLC4X_RECORD_TIMESTAMP_FIELD_NAME, System.currentTimeMillis()); - logger.debug("added timestamp tag to record."); + if (debugEnabled) + logger.debug("added timestamp tag to record."); return new MapRecord(getSchema(), values); diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 8f55535aee5..868e1281125 100644 --- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -13,5 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
 org.apache.plc4x.nifi.Plc4xSinkProcessor
+org.apache.plc4x.nifi.Plc4xSinkRecordProcessor
 org.apache.plc4x.nifi.Plc4xSourceProcessor
 org.apache.plc4x.nifi.Plc4xSourceRecordProcessor
\ No newline at end of file
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java
index e9ce18eb2fc..6e16763505d 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java
@@ -16,24 +16,60 @@
  */
 package org.apache.plc4x.nifi;
 
+import java.util.Map;
+import java.util.stream.Collectors;
+
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.apache.plc4x.nifi.Plc4xSinkProcessor;
+import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.util.Plc4xCommonTest;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 public class Plc4xSinkProcessorTest {
 
     private TestRunner testRunner;
+    private static int NUMBER_OF_CALLS = 5;
 
     @BeforeEach
     public void init() {
         testRunner = TestRunners.newTestRunner(Plc4xSinkProcessor.class);
+        testRunner.setIncomingConnection(false);
+        testRunner.setValidateExpressionUsage(false);
+
+        testRunner.setProperty(Plc4xSinkProcessor.PLC_CONNECTION_STRING, "simulated://127.0.0.1");
+
+        testRunner.addConnection(Plc4xSinkProcessor.REL_SUCCESS);
+        testRunner.addConnection(Plc4xSinkProcessor.REL_FAILURE);
+
+        for (int i = 0; i < NUMBER_OF_CALLS; i++)
+            testRunner.enqueue("", Plc4xCommonTest.originalMap.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> String.valueOf(e.getValue()))));
     }
 
-    @Test
     public void testProcessor() {
+        testRunner.run(NUMBER_OF_CALLS);
+        testRunner.assertTransferCount(Plc4xSinkProcessor.REL_FAILURE, 0);
+        testRunner.assertTransferCount(Plc4xSinkProcessor.REL_SUCCESS, NUMBER_OF_CALLS);
+    }
+
+    @Test
+    public void testWithAddressProperties() {
+        testRunner.setProperty(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY, AddressesAccessUtils.ADDRESS_PROPERTY);
+        Plc4xCommonTest.getAddressMap().forEach((k,v) -> testRunner.setProperty(k, v));
+        testProcessor();
+    }
+
+    // Test addresses text property access strategy
+    @Test
+    public void testWithAddressText() throws JsonProcessingException {
+        testRunner.setProperty(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY, AddressesAccessUtils.ADDRESS_TEXT);
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY, new ObjectMapper().writeValueAsString(Plc4xCommonTest.getAddressMap()));
+        testProcessor();
+    }
 }
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessorTest.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessorTest.java
new file mode 100644
index 00000000000..673dd5bba9e
--- /dev/null
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessorTest.java
@@ -0,0 +1,94 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.
The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+      https://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+ */
+package org.apache.plc4x.nifi;
+
+import java.util.Map;
+
+import org.apache.nifi.avro.AvroReader;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.util.Plc4xCommonTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class Plc4xSinkRecordProcessorTest {
+
+    private TestRunner testRunner;
+    private static int NUMBER_OF_CALLS = 5;
+
+    private final AvroReader readerService = new AvroReader();
+
+    @BeforeEach
+    public void init() throws InitializationException {
+        testRunner = TestRunners.newTestRunner(Plc4xSinkRecordProcessor.class);
+        testRunner.setIncomingConnection(false);
+        testRunner.setValidateExpressionUsage(false);
+
+        testRunner.setProperty(Plc4xSinkRecordProcessor.PLC_CONNECTION_STRING, "simulated://127.0.0.1");
+        testRunner.setProperty(Plc4xSinkRecordProcessor.PLC_WRITE_FUTURE_TIMEOUT_MILISECONDS, "1000");
+
+        testRunner.addConnection(Plc4xSinkRecordProcessor.REL_SUCCESS);
+        testRunner.addConnection(Plc4xSinkRecordProcessor.REL_FAILURE);
+
+        testRunner.addControllerService("reader", readerService);
+        testRunner.enableControllerService(readerService);
+        testRunner.setProperty(Plc4xSinkRecordProcessor.PLC_RECORD_READER_FACTORY.getName(), "reader");
+
+        for (Map.Entry<String, String> address : Plc4xCommonTest.addressMap.entrySet()) {
+            // TODO: Random generation not working with these types
+            if (address.getValue().startsWith("RANDOM/")) {
+                if (address.getValue().endsWith("WORD"))
+                    continue;
+            }
+            testRunner.setProperty(address.getKey(), address.getValue());
+        }
+
+        for (int i = 0; i < NUMBER_OF_CALLS; i++) {
+            testRunner.enqueue(Plc4xCommonTest.encodeRecord(Plc4xCommonTest.getTestRecord()));
+        }
+    }
+
+    public void testAvroRecordReaderProcessor() throws InitializationException {
+        testRunner.run(NUMBER_OF_CALLS);
+        testRunner.assertTransferCount(Plc4xSinkRecordProcessor.REL_FAILURE, 0);
+        testRunner.assertTransferCount(Plc4xSinkRecordProcessor.REL_SUCCESS, NUMBER_OF_CALLS);
+    }
+
+    @Test
+    public void testWithAddressProperties() throws InitializationException {
+        testRunner.setProperty(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY, AddressesAccessUtils.ADDRESS_PROPERTY);
+        Plc4xCommonTest.getAddressMap().forEach((k,v) -> testRunner.setProperty(k, v));
+        testAvroRecordReaderProcessor();
+    }
+
+    // Test addresses text property access strategy
+    @Test
+    public void testWithAddressText() throws InitializationException, JsonProcessingException {
+        testRunner.setProperty(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY, AddressesAccessUtils.ADDRESS_TEXT);
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY, new ObjectMapper().writeValueAsString(Plc4xCommonTest.getAddressMap()).toString());
+        testAvroRecordReaderProcessor();
+    }
+}
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
index bf742a15cf4..030d431f1b1 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
@@ -25,6 +25,9 @@
 import org.junit.jupiter.api.BeforeEach;
 import
org.junit.jupiter.api.Test; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + public class Plc4xSourceProcessorTest { private TestRunner testRunner; @@ -59,9 +62,9 @@ public void testWithAddressProperties() { // Test addressess text property access strategy @Test - public void testWithAddressText() { + public void testWithAddressText() throws JsonProcessingException { testRunner.setProperty(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY, AddressesAccessUtils.ADDRESS_TEXT); - testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY, Plc4xCommonTest.getAddressMap().toString()); + testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY, new ObjectMapper().writeValueAsString(Plc4xCommonTest.getAddressMap()).toString()); testProcessor(); } diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java index 434d22a4729..19953649f95 100644 --- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java +++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java @@ -27,6 +27,9 @@ Licensed to the Apache Software Foundation (ASF) under one import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + public class Plc4xSourceRecordProcessorTest { private TestRunner testRunner; @@ -71,9 +74,9 @@ public void testWithAddressProperties() throws InitializationException { // Test addressess text property access strategy @Test - public void testWithAddressText() throws InitializationException { + public void testWithAddressText() throws InitializationException, JsonProcessingException { testRunner.setProperty(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY, AddressesAccessUtils.ADDRESS_TEXT); - testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY, Plc4xCommonTest.getAddressMap().toString()); + testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY, new ObjectMapper().writeValueAsString(Plc4xCommonTest.getAddressMap()).toString()); testAvroRecordWriterProcessor(); } } diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/util/Plc4xCommonTest.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/util/Plc4xCommonTest.java index eb532ea2687..bff0c2dda3f 100644 --- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/util/Plc4xCommonTest.java +++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/util/Plc4xCommonTest.java @@ -17,6 +17,7 @@ package org.apache.plc4x.nifi.util; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.HashMap; @@ -24,34 +25,65 @@ import java.util.Map; import java.util.function.Consumer; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileWriter; import org.apache.avro.file.SeekableByteArrayInput; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; 
+import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; import org.apache.avro.util.Utf8; import org.apache.nifi.util.MockFlowFile; public class Plc4xCommonTest { public static Map originalMap = new HashMap<>(); public static Map addressMap = new HashMap<>(); - public static Map typeMap = new HashMap<>(); + public static Map> typeMap = new HashMap<>(); + + + // TODO: BOOL, WORD; DWORD and LWORD are commented because random generation is not working with this types + // or a because a reverse type mapping between avro and PlcTypes is not implemented + public static Schema schema = SchemaBuilder.builder() + .record("tests").fields() + .nullableBoolean("BOOL", true) + // .nullableBytes("BYTE", new byte[] {1,2}) + // .nullableString("WORD", "4") + .nullableInt("SINT", -5) + .nullableString("USINT", "6") + .nullableInt("INT", 2000) + .nullableString("UINT", "3000") + .nullableString("DINT", "4000") + .nullableString("UDINT", "5000") + // .nullableString("DWORD", "0") + .nullableLong("LINT", 6000L) + .nullableString("ULINT", "7000") + // .nullableString("LWORD", "0") + .nullableFloat("REAL", 1.23456F) + .nullableDouble("LREAL", 2.34567) + .nullableString("CHAR", "c") + .nullableString("WCHAR", "d") + .nullableString("STRING", "this is a string") + .endRecord(); static { // originalMap values are in the type needed to check type mapping between PlcType and Avro originalMap.put("BOOL", true); originalMap.put("BYTE", "\u0001"); originalMap.put("WORD", "4"); - originalMap.put("SINT", Short.valueOf((short)-5)); + originalMap.put("SINT", -5); originalMap.put("USINT", "6"); originalMap.put("INT", 2000); originalMap.put("UINT", "3000"); originalMap.put("DINT", "4000"); originalMap.put("UDINT", "5000"); - originalMap.put("DWORD", "0"); + originalMap.put("DWORD", Long.valueOf("0")); originalMap.put("LINT", 6000L); originalMap.put("ULINT", "7000"); - originalMap.put("LWORD", "ab"); + originalMap.put("LWORD", Long.valueOf("0")); originalMap.put("REAL", 1.23456F); originalMap.put("LREAL", 2.34567); originalMap.put("CHAR", "c"); @@ -101,12 +133,16 @@ public static Map getAddressMap(){ Map result = new HashMap<>(); addressMap.forEach((k,v) -> { - if (!v.startsWith("RANDOM/")) { + if (v.startsWith("RANDOM/")) { if (!v.endsWith("BYTE") && !v.endsWith("CHAR") && + !v.endsWith("WORD") && !v.endsWith("STRING")) result.put(k, v); - } + } else { + result.put(k, v); + } + }); return result; } @@ -141,4 +177,43 @@ public void accept(MockFlowFile t) { } }); } + + public static GenericRecord getTestRecord() { + GenericRecord record = new GenericData.Record(schema); + record.put("BOOL", true); + // record.put("BYTE", "\u0001"); + // record.put("WORD", "4"); + record.put("SINT", -5); + record.put("USINT", "6"); + record.put("INT", 2000); + record.put("UINT", "3000"); + record.put("DINT", "4000"); + record.put("UDINT", "5000"); + // record.put("DWORD", "0"); + record.put("LINT", 6000L); + record.put("ULINT", "7000"); + // record.put("LWORD", "0"); + record.put("REAL", 1.23456F); + record.put("LREAL", 2.34567); + record.put("CHAR", "c"); + record.put("WCHAR", "d"); + record.put("STRING", "this is a string"); + return record; + } + + public static byte[] encodeRecord(GenericRecord record){ + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DatumWriter writer = new GenericDatumWriter(schema); + DataFileWriter fileWriter = new DataFileWriter(writer); + + try { + fileWriter.create(schema, out); + 
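+            // Write the test record into the in-memory Avro container;
+            // close() flushes the container header and data blocks to the stream.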
+            fileWriter.append(record);
+            fileWriter.close();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        return out.toByteArray();
+    }
 }
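
A quick way to sanity-check the bytes produced by `encodeRecord` is to decode them again with the reader-side Avro classes this test utility already imports (`DataFileReader`, `SeekableByteArrayInput`, `GenericDatumReader`). A minimal sketch (the `decodeFirstRecord` helper is illustrative and not part of the patch):

```java
// Hypothetical complement to encodeRecord: reads the first record back
// from the Avro container bytes. The writer schema is read from the container.
public static GenericRecord decodeFirstRecord(byte[] avroBytes) {
    try (DataFileReader<GenericRecord> reader = new DataFileReader<>(
            new SeekableByteArrayInput(avroBytes), new GenericDatumReader<GenericRecord>())) {
        return reader.hasNext() ? reader.next() : null;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
```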