From 6509d9b236e9cf756618f52d971e95e31b4dcc83 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Tue, 7 Feb 2017 14:23:57 +0100 Subject: [PATCH] NIFI-3447 - PutSplunk - force connection close --- .../handler/socket/SSLSocketChannelHandler.java | 2 +- .../handler/socket/StandardSocketChannelHandler.java | 2 +- .../nifi/processor/util/put/sender/ChannelSender.java | 11 +++++++++-- .../org/apache/nifi/processors/splunk/GetSplunk.java | 2 +- .../org/apache/nifi/processors/splunk/PutSplunk.java | 4 ++-- .../apache/nifi/processors/splunk/TestGetSplunk.java | 2 +- 6 files changed, 15 insertions(+), 8 deletions(-) diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java index 951dc4b704a5..ef747e12bdfe 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java @@ -25,8 +25,8 @@ import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; import org.apache.nifi.processor.util.listen.response.socket.SSLSocketChannelResponder; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; -import org.apache.nifi.stream.io.ByteArrayOutputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.SocketTimeoutException; diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java index 1aac860de7f1..250168c40d3c 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java @@ -24,8 +24,8 @@ import org.apache.nifi.processor.util.listen.event.EventFactory; import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder; -import org.apache.nifi.stream.io.ByteArrayOutputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java index d2daeb941dec..278a9ab7e015 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java @@ -80,8 +80,15 @@ public void send(final String message, final Charset charset) throws IOException * @throws IOException if there was an error communicating over the channel */ public void send(final byte[] data) throws IOException { - write(data); - lastUsed = System.currentTimeMillis(); + try { + write(data); + lastUsed = System.currentTimeMillis(); + } catch (IOException e) { + // failed to send data over the channel, we close it to force + // the creation of a new one next time + close(); + throw e; + } } /** diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java index 4919e61a2c64..956a6538930f 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java @@ -49,8 +49,8 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.stream.io.BufferedOutputStream; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java index 56d3e2603743..57ea812aaef1 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java @@ -34,13 +34,13 @@ import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; import org.apache.nifi.processor.util.put.sender.ChannelSender; import org.apache.nifi.ssl.SSLContextService; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.stream.io.ByteCountingInputStream; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; import javax.net.ssl.SSLContext; +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java index 5ad78813f9be..b64ca6a672ff 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java @@ -23,7 +23,6 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; -import org.apache.nifi.stream.io.ByteArrayInputStream; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -34,6 +33,7 @@ import org.mockito.ArgumentMatcher; import org.mockito.Mockito; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.text.ParseException;