From 2e38cf9d1b5d53adaabb4d3cd42f5f03fef58ff2 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Mon, 7 Mar 2016 17:21:11 -0500 Subject: [PATCH] NIFI-1599 Changing DatagramChannelDispatcher, socket handlers, and RELP handler to use offer() when queueing instead of put(), and log an error if the offer failed --- .../dispatcher/DatagramChannelDispatcher.java | 8 +-- .../util/listen/event/EventQueue.java | 67 +++++++++++++++++++ .../util/listen/handler/ChannelHandler.java | 5 +- .../socket/SSLSocketChannelHandler.java | 4 +- .../socket/StandardSocketChannelHandler.java | 4 +- .../relp/handler/RELPFrameHandler.java | 14 ++-- .../handler/RELPSSLSocketChannelHandler.java | 2 +- .../handler/RELPSocketChannelHandler.java | 2 +- .../relp/handler/TestRELPFrameHandler.java | 5 +- 9 files changed, 91 insertions(+), 20 deletions(-) create mode 100644 nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java index e362e1592352..e72f5f8d19e8 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java @@ -21,6 +21,7 @@ import org.apache.nifi.processor.util.listen.event.Event; import org.apache.nifi.processor.util.listen.event.EventFactory; import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; +import org.apache.nifi.processor.util.listen.event.EventQueue; import java.io.IOException; import java.net.InetSocketAddress; @@ -42,7 +43,7 @@ public class DatagramChannelDispatcher> impleme private final EventFactory eventFactory; private final BlockingQueue bufferPool; - private final BlockingQueue events; + private final EventQueue events; private final ProcessorLog logger; private Selector selector; @@ -55,8 +56,8 @@ public DatagramChannelDispatcher(final EventFactory eventFactory, final ProcessorLog logger) { this.eventFactory = eventFactory; this.bufferPool = bufferPool; - this.events = events; this.logger = logger; + this.events = new EventQueue<>(events, logger); if (bufferPool == null || bufferPool.size() == 0) { throw new IllegalArgumentException("A pool of available ByteBuffers is required"); @@ -115,9 +116,8 @@ public void run() { final Map metadata = EventFactoryUtil.createMapWithSender(sender); final E event = eventFactory.create(bytes, metadata, null); + events.offer(event); - // queue the raw message with the sender, block until space is available - events.put(event); buffer.clear(); } } diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java new file mode 100644 index 000000000000..4afaf40e8a45 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.event; + +import org.apache.commons.lang3.Validate; +import org.apache.nifi.logging.ProcessorLog; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Wraps a BlockingQueue to centralize logic for offering events across UDP, TCP, and SSL. + * + * @param the type of event + */ +public class EventQueue { + + /** + * The default number of milliseconds to wait when offering new events to the queue. + */ + public static final long DEFAULT_OFFER_WAIT_MS = 100; + + private final long offerWaitMs; + private final BlockingQueue events; + private final ProcessorLog logger; + + public EventQueue(final BlockingQueue events, final ProcessorLog logger) { + this(events, DEFAULT_OFFER_WAIT_MS, logger); + } + + public EventQueue(final BlockingQueue events, final long offerWaitMs, final ProcessorLog logger) { + this.events = events; + this.offerWaitMs = offerWaitMs; + this.logger = logger; + Validate.notNull(this.events); + Validate.notNull(this.logger); + } + + /** + * Offers the given event to the events queue with a wait time, if the offer fails the event + * is dropped an error is logged. + * + * @param event the event to offer + * @throws InterruptedException if interrupted while waiting to offer + */ + public void offer(final E event) throws InterruptedException { + boolean queued = events.offer(event, offerWaitMs, TimeUnit.MILLISECONDS); + if (!queued) { + logger.error("Internal queue at maximum capacity, could not queue event"); + } + } + +} diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java index ab4346f9835a..6307c7635aa7 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java @@ -20,6 +20,7 @@ import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.event.Event; import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.event.EventQueue; import java.nio.channels.SelectionKey; import java.nio.charset.Charset; @@ -34,7 +35,7 @@ public abstract class ChannelHandler eventFactory; - protected final BlockingQueue events; + protected final EventQueue events; protected final ProcessorLog logger; @@ -48,8 +49,8 @@ public ChannelHandler(final SelectionKey key, this.dispatcher = dispatcher; this.charset = charset; this.eventFactory = eventFactory; - this.events = events; this.logger = logger; + this.events = new EventQueue(events, logger); } } diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java index 460ef08b2f54..d11cb02a90de 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java @@ -135,10 +135,8 @@ protected void processBuffer(final SSLSocketChannel sslSocketChannel, final Sock if (currBytes.size() > 0) { final SSLSocketChannelResponder response = new SSLSocketChannelResponder(socketChannel, sslSocketChannel); final Map metadata = EventFactoryUtil.createMapWithSender(sender.toString()); - - // queue the raw event blocking until space is available, reset the temporary buffer final E event = eventFactory.create(currBytes.toByteArray(), metadata, response); - events.put(event); + events.offer(event); currBytes.reset(); } } else { diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java index e2fd3a84de30..94e41eba052a 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java @@ -137,10 +137,8 @@ protected void processBuffer(final SocketChannel socketChannel, final ByteBuffer if (currBytes.size() > 0) { final SocketChannelResponder response = new SocketChannelResponder(socketChannel); final Map metadata = EventFactoryUtil.createMapWithSender(sender.toString()); - - // queue the raw event blocking until space is available, reset the buffer final E event = eventFactory.create(currBytes.toByteArray(), metadata, response); - events.put(event); + events.offer(event); currBytes.reset(); // Mark this as the start of the next message diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java index 5af316e55162..7b879f3afa1a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java @@ -16,10 +16,12 @@ */ package org.apache.nifi.processors.standard.relp.handler; +import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; import org.apache.nifi.processor.util.listen.event.Event; import org.apache.nifi.processor.util.listen.event.EventFactory; import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; +import org.apache.nifi.processor.util.listen.event.EventQueue; import org.apache.nifi.processor.util.listen.response.ChannelResponder; import org.apache.nifi.processor.util.listen.response.ChannelResponse; import org.apache.nifi.processors.standard.relp.event.RELPMetadata; @@ -45,21 +47,24 @@ public class RELPFrameHandler> { private final Charset charset; private final EventFactory eventFactory; - private final BlockingQueue events; + private final EventQueue events; private final SelectionKey key; private final AsyncChannelDispatcher dispatcher; + private final ProcessorLog logger; private final RELPEncoder encoder; public RELPFrameHandler(final SelectionKey selectionKey, final Charset charset, final EventFactory eventFactory, final BlockingQueue events, - final AsyncChannelDispatcher dispatcher) { + final AsyncChannelDispatcher dispatcher, + final ProcessorLog logger) { this.key = selectionKey; this.charset = charset; this.eventFactory = eventFactory; - this.events = events; this.dispatcher = dispatcher; + this.logger = logger; + this.events = new EventQueue<>(events, logger); this.encoder = new RELPEncoder(charset); } @@ -82,9 +87,8 @@ public void handle(final RELPFrame frame, final ChannelResponder metadata.put(RELPMetadata.TXNR_KEY, String.valueOf(frame.getTxnr())); metadata.put(RELPMetadata.COMMAND_KEY, frame.getCommand()); - // queue the raw event blocking until space is available, reset the buffer final E event = eventFactory.create(frame.getData(), metadata, responder); - events.put(event); + events.offer(event); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java index 8bbd11638fc8..30697ed8d654 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java @@ -50,7 +50,7 @@ public RELPSSLSocketChannelHandler(final SelectionKey key, final ProcessorLog logger) { super(key, dispatcher, charset, eventFactory, events, logger); this.decoder = new RELPDecoder(charset); - this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher); + this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger); } @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java index 0bb81850fc56..e3e84cfd19f7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java @@ -50,7 +50,7 @@ public RELPSocketChannelHandler(final SelectionKey key, final ProcessorLog logger) { super(key, dispatcher, charset, eventFactory, events, logger); this.decoder = new RELPDecoder(charset); - this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher); + this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger); } @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java index 38b0572ef492..892fb8e1f663 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.standard.relp.handler; +import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; import org.apache.nifi.processor.util.listen.event.EventFactory; import org.apache.nifi.processor.util.listen.response.ChannelResponder; @@ -45,6 +46,7 @@ public class TestRELPFrameHandler { private BlockingQueue events; private SelectionKey key; private AsyncChannelDispatcher dispatcher; + private ProcessorLog logger; private RELPFrameHandler frameHandler; @@ -55,8 +57,9 @@ public void setup() { this.events = new LinkedBlockingQueue<>(); this.key = Mockito.mock(SelectionKey.class); this.dispatcher = Mockito.mock(AsyncChannelDispatcher.class); + this.logger = Mockito.mock(ProcessorLog.class); - this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher); + this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger); } @Test