diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/pom.xml b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/pom.xml
index 91a030f4e15a..1aeccb4b8131 100644
--- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/pom.xml
@@ -34,31 +34,18 @@
org.apache.nifinifi-event-listen1.19.0-SNAPSHOT
-
-
- org.apache.nifi
- nifi-security-socket-ssl
- 1.19.0-SNAPSHOT
-
-
- com.google.code.gson
- gson
-
-
- org.apache.nifi
- nifi-socket-utils
- 1.19.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-security-socket-ssl
+
+ org.apache.nifinifi-utils1.19.0-SNAPSHOT
-
- org.apache.nifi
- nifi-flowfile-packager
- 1.19.0-SNAPSHOT
- org.apache.nifinifi-ssl-context-service-api
@@ -71,25 +58,4 @@
test
-
-
-
-
- jigsaw
-
- (1.8,)
-
-
-
- jakarta.xml.bind
- jakarta.xml.bind-api
-
-
- org.glassfish.jaxb
- jaxb-runtime
-
-
-
-
diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
index 86cc2df339ec..4248c31dcf5c 100644
--- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
+++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
@@ -16,20 +16,18 @@
*/
package org.apache.nifi.processors.beats;
-import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
+import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -38,14 +36,13 @@
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.listen.EventBatcher;
import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
import org.apache.nifi.processor.util.listen.ListenerProperties;
-import org.apache.nifi.processors.beats.netty.BeatsMessage;
-import org.apache.nifi.processors.beats.netty.BeatsMessageServerFactory;
+import org.apache.nifi.processors.beats.protocol.BatchMessage;
+import org.apache.nifi.processors.beats.server.BeatsMessageServerFactory;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
@@ -55,11 +52,9 @@
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -67,34 +62,30 @@
import java.util.concurrent.LinkedBlockingQueue;
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
-@Tags({"listen", "beats", "tcp", "logs"})
-@CapabilityDescription("Listens for messages sent by libbeat compatible clients (e.g. filebeats, metricbeats, etc) using Libbeat's 'output.logstash', writing its JSON formatted payload " +
- "to the content of a FlowFile." +
- "This processor replaces the now deprecated/removed ListenLumberjack")
+@Tags({"beats", "logstash", "elasticsearch", "log"})
+@CapabilityDescription("Receive messages encoded using the Elasticsearch Beats protocol and write decoded JSON")
@WritesAttributes({
- @WritesAttribute(attribute = "beats.sender", description = "The sending host of the messages."),
- @WritesAttribute(attribute = "beats.port", description = "The sending port the messages were received over."),
- @WritesAttribute(attribute = "beats.sequencenumber", description = "The sequence number of the message. Only included if is 1."),
+ @WritesAttribute(attribute = "beats.sender", description = "Internet Protocol address of the message sender"),
+ @WritesAttribute(attribute = "beats.port", description = "TCP port on which the Processor received messages"),
+ @WritesAttribute(attribute = "beats.sequencenumber", description = "The sequence number of the message included for batches containing single messages"),
@WritesAttribute(attribute = "mime.type", description = "The mime.type of the content which is application/json")
})
-@SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"})
public class ListenBeats extends AbstractProcessor {
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL_CONTEXT_SERVICE")
.displayName("SSL Context Service")
- .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
- "messages will be received over a secure connection.")
- // Nearly all Lumberjack v1 implementations require TLS to work. v2 implementations (i.e. beats) have TLS as optional
+ .description("SSL Context Service is required to enable TLS for socket connections")
.required(false)
.identifiesControllerService(RestrictedSSLContextService.class)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("Client Auth")
- .displayName("Client Auth")
- .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
+ .displayName("Client Authentication")
+ .description("Client authentication policy when TLS is enabled")
.required(false)
+ .dependsOn(SSL_CONTEXT_SERVICE)
.allowableValues(ClientAuth.values())
.defaultValue(ClientAuth.REQUIRED.name())
.build();
@@ -104,73 +95,43 @@ public class ListenBeats extends AbstractProcessor {
.description("Messages received successfully will be sent out this relationship.")
.build();
- protected List descriptors;
- protected Set relationships;
+ private static final List DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+ ListenerProperties.NETWORK_INTF_NAME,
+ ListenerProperties.PORT,
+ ListenerProperties.RECV_BUFFER_SIZE,
+ ListenerProperties.MAX_MESSAGE_QUEUE_SIZE,
+ ListenerProperties.MAX_SOCKET_BUFFER_SIZE,
+ ListenerProperties.CHARSET,
+ ListenerProperties.MAX_BATCH_SIZE,
+ ListenerProperties.MESSAGE_DELIMITER,
+ ListenerProperties.WORKER_THREADS,
+ SSL_CONTEXT_SERVICE,
+ CLIENT_AUTH
+ ));
+
+ private static final Set RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
+
protected volatile int port;
- protected volatile BlockingQueue events;
- protected volatile BlockingQueue errorEvents;
+ protected volatile BlockingQueue events;
+ protected volatile BlockingQueue errorEvents;
protected volatile EventServer eventServer;
protected volatile byte[] messageDemarcatorBytes;
- protected volatile EventBatcher eventBatcher;
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final List descriptors = new ArrayList<>();
- descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
- descriptors.add(ListenerProperties.PORT);
- descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
- descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
- // Deprecated
- descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
- descriptors.add(ListenerProperties.CHARSET);
- descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
- descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
- descriptors.add(ListenerProperties.WORKER_THREADS);
- descriptors.add(SSL_CONTEXT_SERVICE);
- descriptors.add(CLIENT_AUTH);
- this.descriptors = Collections.unmodifiableList(descriptors);
-
- final Set relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- this.relationships = Collections.unmodifiableSet(relationships);
- }
-
- @Override
- protected Collection customValidate(final ValidationContext validationContext) {
- final List results = new ArrayList<>();
-
- final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-
- if (sslContextService != null && !sslContextService.isTrustStoreConfigured()) {
- results.add(new ValidationResult.Builder()
- .explanation("SSL Context Service requires a truststore for the Beats forwarder client to work correctly")
- .valid(false).subject(SSL_CONTEXT_SERVICE.getName()).build());
- }
-
- final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
- if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
- results.add(new ValidationResult.Builder()
- .explanation("Client Auth must be provided when using TLS/SSL")
- .valid(false).subject("Client Auth").build());
- }
-
- return results;
- }
+ protected volatile EventBatcher eventBatcher;
@Override
public final Set getRelationships() {
- return this.relationships;
+ return RELATIONSHIPS;
}
@Override
public List getSupportedPropertyDescriptors() {
- return descriptors;
+ return DESCRIPTORS;
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
final int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
- final int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ final int socketBufferSize = context.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
final InetAddress address = NetworkUtils.getInterfaceAddress(networkInterface);
final Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
@@ -180,7 +141,7 @@ public void onScheduled(final ProcessContext context) throws IOException {
final String msgDemarcator = getMessageDemarcator(context);
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
- final NettyEventServerFactory eventFactory = new BeatsMessageServerFactory(getLogger(), address, port, charset, events);
+ final NettyEventServerFactory eventFactory = new BeatsMessageServerFactory(getLogger(), address, port, events);
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
@@ -191,80 +152,75 @@ public void onScheduled(final ProcessContext context) throws IOException {
eventFactory.setClientAuth(clientAuth);
}
- eventFactory.setSocketReceiveBuffer(bufferSize);
+ eventFactory.setSocketReceiveBuffer(socketBufferSize);
eventFactory.setWorkerThreads(workerThreads);
eventFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
+ eventFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
+ eventFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
try {
eventServer = eventFactory.getEventServer();
- } catch (EventException e) {
+ } catch (final EventException e) {
getLogger().error("Failed to bind to [{}:{}]", address, port, e);
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
- EventBatcher eventBatcher = getEventBatcher();
+ EventBatcher eventBatcher = getEventBatcher();
final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
- Map> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
+ Map> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
processEvents(session, batches);
}
@OnStopped
- public void stopped() {
- if (eventServer != null) {
+ public void shutdown() {
+ if (eventServer == null) {
+ getLogger().warn("Event Server not configured");
+ } else {
eventServer.shutdown();
}
eventBatcher = null;
}
- private void processEvents(final ProcessSession session, final Map> batches) {
- for (Map.Entry> entry : batches.entrySet()) {
+ private void processEvents(final ProcessSession session, final Map> batches) {
+ for (final Map.Entry> entry : batches.entrySet()) {
FlowFile flowFile = entry.getValue().getFlowFile();
- final List events = entry.getValue().getEvents();
+ final List events = entry.getValue().getEvents();
if (flowFile.getSize() == 0L || events.size() == 0) {
session.remove(flowFile);
- getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", entry.getKey());
continue;
}
final Map attributes = getAttributes(entry.getValue());
flowFile = session.putAllAttributes(flowFile, attributes);
- getLogger().debug("Transferring {} to success", flowFile);
session.transfer(flowFile, REL_SUCCESS);
- session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
- // the sender and command will be the same for all events based on the batch key
final String transitUri = getTransitUri(entry.getValue());
session.getProvenanceReporter().receive(flowFile, transitUri);
-
}
- session.commitAsync();
}
- protected String getTransitUri(FlowFileEventBatch batch) {
- final List events = batch.getEvents();
+ private String getTransitUri(final FlowFileEventBatch batch) {
+ final List events = batch.getEvents();
final String sender = events.get(0).getSender();
- final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
- return String.format("beats://%s:%d", senderHost, port);
+ return String.format("beats://%s:%d", sender, port);
}
- protected Map getAttributes(FlowFileEventBatch batch) {
- final List events = batch.getEvents();
- // the sender and command will be the same for all events based on the batch key
+ private Map getAttributes(final FlowFileEventBatch batch) {
+ final List events = batch.getEvents();
+
final String sender = events.get(0).getSender();
- final int numAttributes = events.size() == 1 ? 5 : 4;
- final Map attributes = new HashMap<>(numAttributes);
- attributes.put(beatsAttributes.SENDER.key(), sender);
- attributes.put(beatsAttributes.PORT.key(), String.valueOf(port));
+ final Map attributes = new LinkedHashMap<>();
+ attributes.put(BeatsAttributes.SENDER.key(), sender);
+ attributes.put(BeatsAttributes.PORT.key(), String.valueOf(port));
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
- // if there was only one event then we can pass on the transaction
- // NOTE: we could pass on all the transaction ids joined together
+
if (events.size() == 1) {
- attributes.put(beatsAttributes.SEQNUMBER.key(), String.valueOf(events.get(0).getSeqNumber()));
+ attributes.put(BeatsAttributes.SEQUENCE_NUMBER.key(), String.valueOf(events.get(0).getSequenceNumber()));
}
return attributes;
}
@@ -275,11 +231,11 @@ private String getMessageDemarcator(final ProcessContext context) {
.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
}
- private EventBatcher getEventBatcher() {
+ private EventBatcher getEventBatcher() {
if (eventBatcher == null) {
- eventBatcher = new EventBatcher(getLogger(), events, errorEvents) {
+ eventBatcher = new EventBatcher(getLogger(), events, errorEvents) {
@Override
- protected String getBatchKey(BeatsMessage event) {
+ protected String getBatchKey(final BatchMessage event) {
return event.getSender();
}
};
@@ -287,14 +243,14 @@ protected String getBatchKey(BeatsMessage event) {
return eventBatcher;
}
- public enum beatsAttributes implements FlowFileAttributeKey {
+ private enum BeatsAttributes implements FlowFileAttributeKey {
SENDER("beats.sender"),
PORT("beats.port"),
- SEQNUMBER("beats.sequencenumber");
+ SEQUENCE_NUMBER("beats.sequencenumber");
private final String key;
- beatsAttributes(String key) {
+ BeatsAttributes(String key) {
this.key = key;
}
diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsDecoder.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsDecoder.java
deleted file mode 100644
index 2fa20ca856d7..000000000000
--- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsDecoder.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.frame;
-
-import org.apache.nifi.logging.ComponentLog;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.zip.InflaterInputStream;
-
-/**
- * Decodes a Beats frame by maintaining a state based on each byte that has been processed. This class
- * should not be shared by multiple threads.
- */
-public class BeatsDecoder {
-
-
- final ComponentLog logger;
-
- private BeatsFrame.Builder frameBuilder;
- private BeatsState currState = BeatsState.VERSION;
- private byte decodedFrameType;
-
- private byte[] unprocessedData;
-
- private final Charset charset;
- private final ByteArrayOutputStream currBytes;
-
- private long windowSize;
-
- static final int MIN_FRAME_HEADER_LENGTH = 2; // Version + Type
- static final int WINDOWSIZE_LENGTH = MIN_FRAME_HEADER_LENGTH + 4; // 32bit unsigned window size
- static final int COMPRESSED_MIN_LENGTH = MIN_FRAME_HEADER_LENGTH + 4; // 32 bit unsigned + payload
- static final int JSON_MIN_LENGTH = MIN_FRAME_HEADER_LENGTH + 8; // 32 bit unsigned sequence number + 32 bit unsigned payload length
-
- /**
- * @param charset the charset to decode bytes from the frame
- */
- public BeatsDecoder(final Charset charset, final ComponentLog logger) {
- this(charset, new ByteArrayOutputStream(4096), logger);
- }
-
- /**
- * @param charset the charset to decode bytes from the frame
- * @param buffer a buffer to use while processing the bytes
- */
- public BeatsDecoder(final Charset charset, final ByteArrayOutputStream buffer, final ComponentLog logger) {
- this.logger = logger;
- this.charset = charset;
- this.currBytes = buffer;
- this.frameBuilder = new BeatsFrame.Builder();
- this.decodedFrameType = 0x00;
- }
-
- /**
- * Resets this decoder back to its initial state.
- */
- public void reset() {
- frameBuilder = new BeatsFrame.Builder();
- currState = BeatsState.VERSION;
- decodedFrameType = 0x00;
- currBytes.reset();
- }
-
- /**
- * Process the next byte from the channel, updating the builder and state accordingly.
- *
- * @param currByte the next byte to process
- * @preturn true if a frame is ready to be retrieved, false otherwise
- */
- public boolean process(final byte currByte) throws BeatsFrameException {
- try {
- switch (currState) {
- case VERSION: // Just enough data to process the version
- processVERSION(currByte);
- break;
- case FRAMETYPE: // Also able to process the frametype
- processFRAMETYPE(currByte);
- break;
- case PAYLOAD: // Initial bytes with version and Frame Type have already been received, start iteration over payload
- processPAYLOAD(currByte);
-
- // At one stage, the data sent to processPAYLOAD will be represente a complete frame, so we check before returning true
-
- if (frameBuilder.frameType == BeatsFrameType.WINDOWSIZE && currState == BeatsState.COMPLETE) {
- return true;
- } else if (frameBuilder.frameType == BeatsFrameType.COMPRESSED && currState == BeatsState.COMPLETE) {
- return true;
- } else if (frameBuilder.frameType == BeatsFrameType.JSON && currState == BeatsState.COMPLETE) {
- return true;
- } else {
- break;
- }
- case COMPLETE:
- return true;
- default:
- break;
- }
- return false;
- } catch (Exception e) {
- throw new BeatsFrameException("Error decoding Beats frame: " + e.getMessage(), e);
- }
- }
-
-
- /**
- * Returns the decoded frame and resets the decoder for the next frame.
- * This method should be called after checking isComplete().
- *
- * @return the BeatsFrame that was decoded
- */
- public List getFrames() throws BeatsFrameException {
- List frames = new LinkedList<>();
-
- if (currState != BeatsState.COMPLETE) {
- throw new BeatsFrameException("Must be at the trailer of a frame");
- }
- try {
- // Once compressed frames are expanded, they must be devided into individual frames
- if (currState == BeatsState.COMPLETE && frameBuilder.frameType == BeatsFrameType.COMPRESSED) {
- logger.debug("Frame is compressed, will iterate to decode", new Object[]{});
-
- // Zero currBytes, currState and frameBuilder prior to iteration over
- // decompressed bytes
- currBytes.reset();
- frameBuilder.reset();
- currState = BeatsState.VERSION;
-
- // Run over decompressed data and split frames
- frames = splitCompressedFrames(unprocessedData);
-
- // In case of V or wired D and J frames we just ship them across the List
- } else {
- final BeatsFrame frame = frameBuilder.build();
- currBytes.reset();
- frameBuilder.reset();
- currState = BeatsState.VERSION;
- frames.add(frame);
- }
- return frames;
-
- } catch (Exception e) {
- throw new BeatsFrameException("Error decoding Beats frame: " + e.getMessage(), e);
- }
- }
-
- private List splitCompressedFrames(byte[] decompressedData) {
- List frames = new LinkedList<>();
- BeatsFrame.Builder internalFrameBuilder = new BeatsFrame.Builder();
- ByteBuffer currentData = ByteBuffer.wrap(decompressedData);
-
- // Both Lumberjack v1 and Beats (LJ v2) has a weird approach to frames, where compressed frames embed D(ata) or J(SON) frames.
- // inside a compressed input.
- // Or as stated in the documentation:
- //
- // "As an example, you could have 3 data frames compressed into a single
- // 'compressed' frame type: 1D{k,v}{k,v}1D{k,v}{k,v}1D{k,v}{k,v}"
- //
- // Therefore, instead of calling process method again, just iterate over each of
- // the frames and split them so they can be processed
-
- while (currentData.hasRemaining()) {
-
- int payloadLength = 0;
-
- internalFrameBuilder.version = currentData.get();
- internalFrameBuilder.frameType = currentData.get();
- switch (internalFrameBuilder.frameType) {
- case BeatsFrameType.JSON:
-
- internalFrameBuilder.seqNumber = (int) (currentData.getInt() & 0x00000000ffffffffL);
- currentData.mark();
-
- internalFrameBuilder.dataSize = currentData.getInt() & 0x00000000ffffffffL;
- currentData.mark();
-
- // Define how much data to chomp
- payloadLength = Math.toIntExact(internalFrameBuilder.dataSize);
- byte[] jsonBytes = new byte[payloadLength];
-
- currentData.get(jsonBytes, 0, payloadLength);
- currentData.mark();
-
- // Add payload to frame
- internalFrameBuilder.payload(jsonBytes);
- break;
- }
-
- // data frame is created
- BeatsFrame frame = internalFrameBuilder.build();
- frames.add(frame);
- internalFrameBuilder.reset();
- }
-
- return frames;
- }
-
-
- private void processVERSION(final byte b) {
- byte version = b;
- frameBuilder.version(version);
- logger.debug("Version number is {}", new Object[]{version});
- currBytes.write(b);
- currState = BeatsState.FRAMETYPE;
- }
-
- private void processFRAMETYPE(final byte b) {
- decodedFrameType = b;
- frameBuilder.frameType(decodedFrameType);
- logger.debug("Frame type is {}", new Object[]{decodedFrameType});
- currBytes.write(b);
- currState = BeatsState.PAYLOAD;
- }
-
-
- /** Process the outer PAYLOAD byte by byte. Once data is read state is set to COMPLETE so that the data payload
- * can be processed fully using {@link #splitCompressedFrames(byte[])}
- * */
- private void processPAYLOAD(final byte b) {
- currBytes.write(b);
- switch (decodedFrameType) {
- case BeatsFrameType.WINDOWSIZE: //'W'
- if (currBytes.size() < WINDOWSIZE_LENGTH ) {
- logger.trace("Beats currBytes contents are {}", new Object[] {currBytes.toString()});
- break;
- } else if (currBytes.size() == WINDOWSIZE_LENGTH) {
- frameBuilder.dataSize = ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 6)).getInt() & 0x00000000ffffffffL;
- logger.debug("Data size is {}", new Object[]{frameBuilder.dataSize});
- // Sets payload to empty as frame contains no data
- frameBuilder.payload(new byte[]{});
- currBytes.reset();
- currState = BeatsState.COMPLETE;
- windowSize = frameBuilder.dataSize;
- break;
- } else { // Should never be here to be honest...
- logger.debug("Saw a packet I should not have seen. Packet contents were {}", new Object[] {currBytes.toString()});
- break;
- }
- case BeatsFrameType.COMPRESSED: //'C'
- if (currBytes.size() < COMPRESSED_MIN_LENGTH) {
- if (logger.isTraceEnabled()) {
- logger.trace("Beats currBytes contents are {}", new Object[] {currBytes.toString()});
- }
- break;
- } else if (currBytes.size() >= COMPRESSED_MIN_LENGTH) {
- // If data contains more thant the minimum data size
- frameBuilder.dataSize = ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 6)).getInt() & 0x00000000ffffffffL;
- if (currBytes.size() - 6 == frameBuilder.dataSize) {
- try {
- byte[] buf = java.util.Arrays.copyOfRange(currBytes.toByteArray(), 6, currBytes.size());
- InputStream in = new InflaterInputStream(new ByteArrayInputStream(buf));
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- byte[] buffer = new byte[1024];
- int len;
- while ((len = in.read(buffer)) > 0) {
- out.write(buffer, 0, len);
- }
- in.close();
- out.close();
- unprocessedData = out.toByteArray();
- // buf is no longer needed
- buf = null;
- logger.debug("Finished decompressing data");
- // Decompression is complete, we should be able to proceed with resetting currBytes and curSrtate and iterating them
- // as type 'D' frames
- frameBuilder.dataSize(unprocessedData.length);
- currState = BeatsState.COMPLETE;
-
- } catch (IOException e) {
- throw new BeatsFrameException("Error decompressing frame: " + e.getMessage(), e);
- }
-
- }
- break;
- // If currentByte.size is not lower than six and also not equal or great than 6...
- } else { // Should never be here to be honest...
- if (logger.isDebugEnabled()) {
- logger.debug("Received a compressed frame with partial data or invalid content. The packet contents were {}", new Object[] {currBytes.toString()});
- }
- break;
- }
- case BeatsFrameType.JSON: // 'J́'
- // Because Beats can disable compression, sometimes, JSON data will be received outside a compressed
- // stream (i.e. 0x43). Instead of processing it here, we defer its processing to went getFrames is
- // called
- if (currBytes.size() < JSON_MIN_LENGTH) {
- if (logger.isTraceEnabled()) {
- logger.trace("Beats currBytes contents are {}", new Object[] {currBytes.toString()});
- }
- break;
- } else if (currBytes.size() == JSON_MIN_LENGTH) {
- // Read the sequence number from bytes
- frameBuilder.seqNumber = (int) (ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 6)).getInt() & 0x00000000ffffffffL);
- // Read the JSON payload length
- frameBuilder.dataSize = ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 6, 10)).getInt() & 0x00000000ffffffffL;
- } else if (currBytes.size() > JSON_MIN_LENGTH) {
- // Wait for payload to be fully read and then complete processing
- if (currBytes.size() - 10 == frameBuilder.dataSize) {
- // Transfer the current payload so it can be processed by {@link #splitCompressedFrames} method.
- frameBuilder.payload = java.util.Arrays.copyOfRange(currBytes.toByteArray(), 10, currBytes.size());
- currState = BeatsState.COMPLETE;
- }
- break;
- }
- }
- }
-
-}
diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsEncoder.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsEncoder.java
deleted file mode 100644
index 8463d48ed2b2..000000000000
--- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsEncoder.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.frame;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-/**
- * Encodes a BeatsFrame into raw bytes using the given charset.
- */
-public class BeatsEncoder {
-
-
- public byte[] encode(final BeatsFrame frame) {
- final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-
- // Writes the version
- buffer.write(frame.getVersion());
-
- // Writes the frameType
- buffer.write(frame.getFrameType());
-
- // Writes the sequence number
- try {
- buffer.write(frame.getPayload());
- } catch (IOException e) {
- throw new BeatsFrameException("Error decoding Beats frame: " + e.getMessage(), e);
- }
-
- return buffer.toByteArray();
- }
-
-}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrame.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrame.java
deleted file mode 100644
index ccb3bba3660b..000000000000
--- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrame.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.frame;
-
-
-/**
- * A frame received from a channel.
- */
-public class BeatsFrame {
-
- public static final byte DELIMITER = 10;
-
- private final byte version;
- private final byte frameType;
- private final byte[] payload;
- private final long dataSize;
- private final long seqNumber;
-
- private BeatsFrame(final Builder builder) {
- this.version = builder.version;
- this.frameType = builder.frameType;
- this.payload = builder.payload;
- this.dataSize = builder.dataSize;
- this.seqNumber = builder.seqNumber;
-
- if (version < 2 || payload.length < 0 ) {
- throw new BeatsFrameException("Invalid Frame");
- }
- }
-
- public long getSeqNumber() {
- return seqNumber;
- }
-
- public byte getVersion() {
- return version;
- }
-
- public byte getFrameType() {
- return frameType;
- }
-
- public byte [] getPayload() {
- return payload;
- }
-
- /**
- * Builder for a BeatsFrame.
- */
- public static class Builder {
-
- byte version;
- byte frameType;
- byte [] payload;
- long dataSize;
- int seqNumber;
-
- public Builder() {
- reset();
- }
-
- public void reset() {
- version = -1;
- seqNumber = -1;
- frameType = -1;
- payload = null;
- }
-
- public Builder version(final byte version) {
- this.version = version;
- return this;
- }
-
- public Builder seqNumber(final int seqNumber) {
- this.seqNumber = seqNumber;
- return this;
- }
-
- public Builder frameType(final byte frameType) {
- this.frameType = frameType;
- return this;
- }
-
- public Builder dataSize(final long dataSize) {
- this.dataSize = dataSize;
- return this;
- }
-
- public Builder payload(final byte [] payload) {
- this.payload = payload;
- return this;
- }
-
-
- public BeatsFrame build() {
- return new BeatsFrame(this);
- }
-
- }
-
-}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchChannelInboundHandler.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchChannelInboundHandler.java
new file mode 100644
index 000000000000..c6204564316a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchChannelInboundHandler.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.handler;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.beats.protocol.Batch;
+import org.apache.nifi.processors.beats.protocol.BatchMessage;
+import org.apache.nifi.processors.beats.protocol.MessageAck;
+
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Batch Channel Inbound Handler processes a batch of messages and sends an acknowledgement for the last sequence number
+ */
+@ChannelHandler.Sharable
+public class BatchChannelInboundHandler extends SimpleChannelInboundHandler {
+ private final ComponentLog log;
+
+ private final BlockingQueue messages;
+
+ /**
+ * Batch Channel Inbound Handler with required arguments
+ *
+ * @param log Processor Log
+ * @param messages Queue of messages
+ */
+ public BatchChannelInboundHandler(final ComponentLog log, final BlockingQueue messages) {
+ this.log = Objects.requireNonNull(log, "Component Log required");
+ this.messages = Objects.requireNonNull(messages, "Message Queue required");
+ }
+
+ /**
+ * Channel Read processes a batch of messages and sends an acknowledgement for the last sequence number
+ *
+ * @param context Channel Handler Context
+ * @param batch Batch of messages
+ */
+ @Override
+ protected void channelRead0(final ChannelHandlerContext context, final Batch batch) {
+ Integer lastSequenceNumber = null;
+
+ final Collection batchMessages = batch.getMessages();
+ int queued = 0;
+ for (final BatchMessage batchMessage : batchMessages) {
+ final int sequenceNumber = batchMessage.getSequenceNumber();
+ final String sender = batchMessage.getSender();
+ if (messages.offer(batchMessage)) {
+ log.debug("Message Sequence Number [{}] Sender [{}] queued", sequenceNumber, sender);
+ lastSequenceNumber = batchMessage.getSequenceNumber();
+ queued++;
+ } else {
+ log.warn("Message Sequence Number [{}] Sender [{}] queuing failed: Queued [{}] of [{}]", sequenceNumber, sender, queued, batchMessages.size());
+ break;
+ }
+ }
+
+ if (lastSequenceNumber == null) {
+ log.warn("Batch Messages [{}] queuing failed", batch.getMessages().size());
+ } else {
+ final MessageAck messageAck = new MessageAck(lastSequenceNumber);
+ context.writeAndFlush(messageAck);
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchDecoder.java b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchDecoder.java
new file mode 100644
index 000000000000..4aec512c2253
--- /dev/null
+++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchDecoder.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.beats.protocol.Batch;
+import org.apache.nifi.processors.beats.protocol.BatchMessage;
+import org.apache.nifi.processors.beats.protocol.FrameType;
+import org.apache.nifi.processors.beats.protocol.FrameTypeDecoder;
+import org.apache.nifi.processors.beats.protocol.ProtocolCodeDecoder;
+import org.apache.nifi.processors.beats.protocol.ProtocolException;
+import org.apache.nifi.processors.beats.protocol.ProtocolVersion;
+import org.apache.nifi.processors.beats.protocol.ProtocolVersionDecoder;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterOutputStream;
+
+/**
+ * Byte Buffer to Batch Decoder parses bytes to batches of Beats messages
+ */
+public class BatchDecoder extends ByteToMessageDecoder {
+ private static final int INITIAL_WINDOW_SIZE = 1;
+
+ private static final int INITIAL_QUEUE_SIZE = 1;
+
+ private static final int CODE_READABLE_BYTES = 1;
+
+ private static final int INT_READABLE_BYTES = 4;
+
+ private static final ProtocolCodeDecoder VERSION_DECODER = new ProtocolVersionDecoder();
+
+ private static final ProtocolCodeDecoder FRAME_TYPE_DECODER = new FrameTypeDecoder();
+
+ private final ComponentLog log;
+
+ private final AtomicReference versionRef = new AtomicReference<>();
+
+ private final AtomicReference frameTypeRef = new AtomicReference<>();
+
+ private final AtomicInteger windowSize = new AtomicInteger(INITIAL_WINDOW_SIZE);
+
+ private final AtomicReference sequenceNumberRef = new AtomicReference<>();
+
+ private final AtomicReference payloadSizeRef = new AtomicReference<>();
+
+ private final AtomicReference compressedSizeRef = new AtomicReference<>();
+
+ private Queue batchMessages = new ArrayBlockingQueue<>(INITIAL_QUEUE_SIZE);
+
+ /**
+ * Beats Batch Decoder with required arguments
+ *
+ * @param log Processor Log
+ */
+ public BatchDecoder(final ComponentLog log) {
+ this.log = Objects.requireNonNull(log, "Component Log required");
+ }
+
+ /**
+ * Decode Batch of Beats Messages from Byte Buffer
+ *
+ * @param context Channel Handler Context
+ * @param buffer Byte Buffer
+ * @param objects List of Batch objects
+ */
+ @Override
+ protected void decode(final ChannelHandlerContext context, final ByteBuf buffer, final List