From 13523bd08a07534d130987cbe5cd612a55be7c9a Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Wed, 6 Apr 2016 15:09:39 -0400 Subject: [PATCH] NIFI-1630 Making TestPutUDP select an open port for testing to avoid conflicts --- .../nifi/processors/standard/TestPutUDP.java | 54 ++++++++----------- .../processors/standard/UDPTestServer.java | 4 ++ 2 files changed, 26 insertions(+), 32 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java index d35b7ea1719e..52aebe6fa207 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java @@ -34,12 +34,6 @@ public class TestPutUDP { private final static String UDP_SERVER_ADDRESS = "127.0.0.1"; private final static String UNKNOWN_HOST = "fgdsfgsdffd"; private final static String INVALID_IP_ADDRESS = "300.300.300.300"; - private final static int UDP_SERVER_PORT = 54674; - private final static int UDP_SERVER_PORT_ALT = 54675; - private final static int MIN_INVALID_PORT = 0; - private final static int MIN_VALID_PORT = 1; - private final static int MAX_VALID_PORT = 65535; - private final static int MAX_INVALID_PORT = 65536; private final static int BUFFER_SIZE = 1024; private final static int VALID_LARGE_FILE_SIZE = 32768; private final static int VALID_SMALL_FILE_SIZE = 64; @@ -63,7 +57,7 @@ public class TestPutUDP { @Before public void setup() throws Exception { - createTestServer(UDP_SERVER_ADDRESS, UDP_SERVER_PORT, BUFFER_SIZE); + createTestServer(UDP_SERVER_ADDRESS, 0, BUFFER_SIZE); runner = TestRunners.newTestRunner(PutUDP.class); } @@ -98,7 +92,7 @@ private byte[] getPacketData(final DatagramPacket packet) { @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) public void testValidFiles() throws Exception { - configureProperties(UDP_SERVER_ADDRESS, UDP_SERVER_PORT, true); + configureProperties(UDP_SERVER_ADDRESS, true); sendTestData(VALID_FILES); checkReceivedAllData(VALID_FILES); checkInputQueueIsEmpty(); @@ -106,7 +100,7 @@ public void testValidFiles() throws Exception { @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) public void testEmptyFile() throws Exception { - configureProperties(UDP_SERVER_ADDRESS, UDP_SERVER_PORT, true); + configureProperties(UDP_SERVER_ADDRESS, true); sendTestData(EMPTY_FILE); checkRelationships(EMPTY_FILE.length, 0); checkNoDataReceived(); @@ -115,7 +109,7 @@ public void testEmptyFile() throws Exception { @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) public void testlargeValidFile() throws Exception { - configureProperties(UDP_SERVER_ADDRESS, UDP_SERVER_PORT, true); + configureProperties(UDP_SERVER_ADDRESS, true); final String[] testData = createContent(VALID_LARGE_FILE_SIZE); sendTestData(testData); checkReceivedAllData(testData); @@ -124,7 +118,7 @@ public void testlargeValidFile() throws Exception { @Test(timeout = LONG_TEST_TIMEOUT_PERIOD) public void testlargeInvalidFile() throws Exception { - configureProperties(UDP_SERVER_ADDRESS, UDP_SERVER_PORT, true); + configureProperties(UDP_SERVER_ADDRESS, true); String[] testData = createContent(INVALID_LARGE_FILE_SIZE); sendTestData(testData); checkRelationships(0, testData.length); @@ -140,7 +134,7 @@ public void testlargeInvalidFile() throws Exception { @Test(timeout = LONG_TEST_TIMEOUT_PERIOD) public void testInvalidIPAddress() throws Exception { - configureProperties(INVALID_IP_ADDRESS, UDP_SERVER_PORT, true); + configureProperties(INVALID_IP_ADDRESS, true); sendTestData(VALID_FILES); checkNoDataReceived(); checkRelationships(0, VALID_FILES.length); @@ -149,32 +143,24 @@ public void testInvalidIPAddress() throws Exception { @Test(timeout = LONG_TEST_TIMEOUT_PERIOD) public void testUnknownHostname() throws Exception { - configureProperties(UNKNOWN_HOST, UDP_SERVER_PORT, true); + configureProperties(UNKNOWN_HOST, true); sendTestData(VALID_FILES); checkNoDataReceived(); checkRelationships(0, VALID_FILES.length); checkInputQueueIsEmpty(); } - @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) - public void testInvalidPort() throws Exception { - configureProperties(UDP_SERVER_ADDRESS, MIN_INVALID_PORT, false); - configureProperties(UDP_SERVER_ADDRESS, MIN_VALID_PORT, true); - configureProperties(UDP_SERVER_ADDRESS, MAX_VALID_PORT, true); - configureProperties(UDP_SERVER_ADDRESS, MAX_INVALID_PORT, false); - } - @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) public void testReconfiguration() throws Exception { - configureProperties(UDP_SERVER_ADDRESS, UDP_SERVER_PORT, true); + configureProperties(UDP_SERVER_ADDRESS, true); sendTestData(VALID_FILES); checkReceivedAllData(VALID_FILES); - reset(UDP_SERVER_ADDRESS, UDP_SERVER_PORT_ALT, BUFFER_SIZE); - configureProperties(UDP_SERVER_ADDRESS, UDP_SERVER_PORT_ALT, true); + reset(UDP_SERVER_ADDRESS, 0, BUFFER_SIZE); + configureProperties(UDP_SERVER_ADDRESS, true); sendTestData(VALID_FILES); checkReceivedAllData(VALID_FILES); - reset(UDP_SERVER_ADDRESS, UDP_SERVER_PORT, BUFFER_SIZE); - configureProperties(UDP_SERVER_ADDRESS, UDP_SERVER_PORT, true); + reset(UDP_SERVER_ADDRESS, 0, BUFFER_SIZE); + configureProperties(UDP_SERVER_ADDRESS, true); sendTestData(VALID_FILES); checkReceivedAllData(VALID_FILES); checkInputQueueIsEmpty(); @@ -183,7 +169,7 @@ public void testReconfiguration() throws Exception { @Test(timeout = LONG_TEST_TIMEOUT_PERIOD) public void testLoadTest() throws Exception { final String[] testData = createContent(VALID_SMALL_FILE_SIZE); - configureProperties(UDP_SERVER_ADDRESS, UDP_SERVER_PORT, true); + configureProperties(UDP_SERVER_ADDRESS, true); sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT); checkReceivedAllData(testData, LOAD_TEST_ITERATIONS); checkInputQueueIsEmpty(); @@ -195,9 +181,9 @@ private void reset(final String address, final int port, final int recvQueueSize createTestServer(address, port, recvQueueSize); } - private void configureProperties(final String host, final int port, final boolean expectValid) { + private void configureProperties(final String host, final boolean expectValid) { runner.setProperty(PutUDP.HOSTNAME, host); - runner.setProperty(PutUDP.PORT, Integer.toString(port)); + runner.setProperty(PutUDP.PORT, Integer.toString(server.getLocalPort())); if (expectValid) { runner.assertValid(); } else { @@ -205,18 +191,22 @@ private void configureProperties(final String host, final int port, final boolea } } - private void sendTestData(final String[] testData) { + private void sendTestData(final String[] testData) throws InterruptedException { sendTestData(testData, DEFAULT_ITERATIONS, DEFAULT_THREAD_COUNT); } - private void sendTestData(final String[] testData, final int iterations, final int threadCount) { + private void sendTestData(final String[] testData, final int iterations, final int threadCount) throws InterruptedException { for (String item : testData) { runner.setThreadCount(threadCount); for (int i = 0; i < iterations; i++) { runner.enqueue(item.getBytes()); + runner.run(1, false); + Thread.sleep(1); } - runner.run(iterations); } + + // ensure @OnStopped methods get called + runner.run(); } private void checkRelationships(final int successCount, final int failedCount) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/UDPTestServer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/UDPTestServer.java index 283568856ec1..8caa02827e47 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/UDPTestServer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/UDPTestServer.java @@ -68,6 +68,10 @@ public DatagramPacket getReceivedPacket() { return recvQueue.poll(); } + public int getLocalPort() { + return serverSocket == null ? 0 : serverSocket.getLocalPort(); + } + @Override public void run() { try {