From 2e33e6e456839011ee6e0ef056be2c46465fd3cb Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Thu, 7 Jan 2021 10:15:44 -0500 Subject: [PATCH 1/2] NIFI-8120 Added RuntimeException handling on HttpContextMap.complete() --- .../standard/HandleHttpResponse.java | 10 +- .../standard/TestHandleHttpResponse.java | 171 ++++++++++-------- 2 files changed, 106 insertions(+), 75 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java index 41c6ecdec521..7f34ffd6bc8f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java @@ -65,8 +65,6 @@ @SeeAlso(value = {HandleHttpRequest.class}, classNames = {"org.apache.nifi.http.StandardHttpContextMap", "org.apache.nifi.ssl.StandardSSLContextService"}) public class HandleHttpResponse extends AbstractProcessor { - public static final Pattern NUMBER_PATTERN = Pattern.compile("[0-9]+"); - public static final PropertyDescriptor STATUS_CODE = new PropertyDescriptor.Builder() .name("HTTP Status Code") .description("The HTTP Status Code to use when responding to the HTTP Request. See Section 10 of RFC 2616 for more information.") @@ -194,7 +192,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } catch (final ProcessException e) { session.transfer(flowFile, REL_FAILURE); getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, e}); - contextMap.complete(contextIdentifier); + try { + contextMap.complete(contextIdentifier); + } catch (final RuntimeException ce) { + getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[]{flowFile, ce}); + } return; } catch (final Exception e) { session.transfer(flowFile, REL_FAILURE); @@ -204,7 +206,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session try { contextMap.complete(contextIdentifier); - } catch (final IllegalStateException ise) { + } catch (final RuntimeException ise) { getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[]{flowFile, ise}); session.transfer(flowFile, REL_FAILURE); return; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java index 02b1d1c8dff6..a2d6b2816aae 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java @@ -49,46 +49,52 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; public class TestHandleHttpResponse { + private static final String CONTEXT_MAP_ID = MockHttpContextMap.class.getSimpleName(); + + private static final String HTTP_REQUEST_ID = "HTTP-Request-Identifier"; + + private static final int HTTP_STATUS_CREATED = HttpServletResponse.SC_CREATED; + + private static final String FLOW_FILE_CONTENT = "TESTING"; + @Test public void testEnsureCompleted() throws InitializationException { final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class); - final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", ""); - runner.addControllerService("http-context-map", contextMap); + final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, null, null); + runner.addControllerService(CONTEXT_MAP_ID, contextMap); runner.enableControllerService(contextMap); - runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map"); + runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID); runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}"); runner.setProperty("my-attr", "${my-attr}"); runner.setProperty("no-valid-attr", "${no-valid-attr}"); final Map attributes = new HashMap<>(); - attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id"); + attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID); attributes.put(HTTPUtils.HTTP_REQUEST_URI, "/test"); attributes.put(HTTPUtils.HTTP_LOCAL_NAME, "server"); attributes.put(HTTPUtils.HTTP_PORT, "8443"); attributes.put(HTTPUtils.HTTP_REMOTE_HOST, "client"); attributes.put(HTTPUtils.HTTP_SSL_CERT, "sslDN"); attributes.put("my-attr", "hello"); - attributes.put("status.code", "201"); + attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED)); - runner.enqueue("hello".getBytes(), attributes); + runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes); runner.run(); runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 1); - assertTrue(runner.getProvenanceEvents().size() == 1); + assertEquals(1, runner.getProvenanceEvents().size()); assertEquals(ProvenanceEventType.SEND, runner.getProvenanceEvents().get(0).getEventType()); assertEquals("https://client@server:8443/test", runner.getProvenanceEvents().get(0).getTransitUri()); - assertEquals("hello", contextMap.baos.toString()); + assertEquals(FLOW_FILE_CONTENT, contextMap.outputStream.toString()); assertEquals("hello", contextMap.headersSent.get("my-attr")); assertNull(contextMap.headersSent.get("no-valid-attr")); - assertEquals(201, contextMap.statusCode); + assertEquals(HTTP_STATUS_CREATED, contextMap.statusCode); assertEquals(1, contextMap.getCompletionCount()); assertTrue(contextMap.headersWithNoValue.isEmpty()); } @@ -97,15 +103,15 @@ public void testEnsureCompleted() throws InitializationException { public void testRegexHeaders() throws InitializationException { final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class); - final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", ""); - runner.addControllerService("http-context-map", contextMap); + final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, null, null); + runner.addControllerService(CONTEXT_MAP_ID, contextMap); runner.enableControllerService(contextMap); - runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map"); + runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID); runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}"); runner.setProperty(HandleHttpResponse.ATTRIBUTES_AS_HEADERS_REGEX, "^(my.*)$"); final Map attributes = new HashMap<>(); - attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id"); + attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID); attributes.put(HTTPUtils.HTTP_REQUEST_URI, "/test"); attributes.put(HTTPUtils.HTTP_LOCAL_NAME, "server"); attributes.put(HTTPUtils.HTTP_PORT, "8443"); @@ -113,43 +119,43 @@ public void testRegexHeaders() throws InitializationException { attributes.put(HTTPUtils.HTTP_SSL_CERT, "sslDN"); attributes.put("my-attr", "hello"); attributes.put("my-blank-attr", ""); - attributes.put("status.code", "201"); + attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED)); - runner.enqueue("hello".getBytes(), attributes); + runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes); runner.run(); runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 1); - assertTrue(runner.getProvenanceEvents().size() == 1); + assertEquals(1, runner.getProvenanceEvents().size()); assertEquals(ProvenanceEventType.SEND, runner.getProvenanceEvents().get(0).getEventType()); assertEquals("https://client@server:8443/test", runner.getProvenanceEvents().get(0).getTransitUri()); - assertEquals("hello", contextMap.baos.toString()); + assertEquals(FLOW_FILE_CONTENT, contextMap.outputStream.toString()); assertEquals("hello", contextMap.headersSent.get("my-attr")); assertNull(contextMap.headersSent.get("my-blank-attr")); - assertEquals(201, contextMap.statusCode); + assertEquals(HTTP_STATUS_CREATED, contextMap.statusCode); assertEquals(1, contextMap.getCompletionCount()); assertTrue(contextMap.headersWithNoValue.isEmpty()); } @Test - public void testWithExceptionThrown() throws InitializationException { + public void testResponseFlowFileAccessException() throws InitializationException { final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class); - final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "FlowFileAccessException"); - runner.addControllerService("http-context-map", contextMap); + final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, new FlowFileAccessException("Access Problem"), null); + runner.addControllerService(CONTEXT_MAP_ID, contextMap); runner.enableControllerService(contextMap); - runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map"); + runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID); runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}"); runner.setProperty("my-attr", "${my-attr}"); runner.setProperty("no-valid-attr", "${no-valid-attr}"); final Map attributes = new HashMap<>(); - attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id"); + attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID); attributes.put("my-attr", "hello"); - attributes.put("status.code", "201"); + attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED)); - runner.enqueue("hello".getBytes(), attributes); + runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes); runner.run(); @@ -158,23 +164,23 @@ public void testWithExceptionThrown() throws InitializationException { } @Test - public void testCannotWriteResponse() throws InitializationException { + public void testResponseProcessException() throws InitializationException { final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class); - final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "ProcessException"); - runner.addControllerService("http-context-map", contextMap); + final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, new ProcessException(), null); + runner.addControllerService(CONTEXT_MAP_ID, contextMap); runner.enableControllerService(contextMap); - runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map"); + runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID); runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}"); runner.setProperty("my-attr", "${my-attr}"); runner.setProperty("no-valid-attr", "${no-valid-attr}"); final Map attributes = new HashMap<>(); - attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id"); + attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID); attributes.put("my-attr", "hello"); - attributes.put("status.code", "201"); + attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED)); - runner.enqueue("hello".getBytes(), attributes); + runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes); runner.run(); @@ -182,21 +188,46 @@ public void testCannotWriteResponse() throws InitializationException { assertEquals(1, contextMap.getCompletionCount()); } + @Test + public void testResponseProcessExceptionThenIllegalStateException() throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class); + + final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, new ProcessException(), new IllegalStateException()); + runner.addControllerService(CONTEXT_MAP_ID, contextMap); + runner.enableControllerService(contextMap); + runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID); + runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}"); + runner.setProperty("my-attr", "${my-attr}"); + runner.setProperty("no-valid-attr", "${no-valid-attr}"); + + final Map attributes = new HashMap<>(); + attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID); + attributes.put("my-attr", "hello"); + attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED)); + + runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes); + + runner.run(); + + runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_FAILURE, 1); + assertEquals(0, contextMap.getCompletionCount()); + } + @Test public void testStatusCodeEmpty() throws InitializationException { final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class); - final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", ""); - runner.addControllerService("http-context-map", contextMap); + final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, null, null); + runner.addControllerService(CONTEXT_MAP_ID, contextMap); runner.enableControllerService(contextMap); - runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map"); + runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID); runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}"); final Map attributes = new HashMap<>(); - attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id"); + attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID); attributes.put("my-attr", "hello"); - runner.enqueue("hello".getBytes(), attributes); + runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes); runner.run(); @@ -208,16 +239,18 @@ private static class MockHttpContextMap extends AbstractControllerService implem private final String id; private final AtomicInteger completedCount = new AtomicInteger(0); - private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); private final ConcurrentMap headersSent = new ConcurrentHashMap<>(); - private final String shouldThrowExceptionClass; + private final Exception responseException; + private final RuntimeException completeException; private volatile int statusCode = -1; private final List headersWithNoValue = new CopyOnWriteArrayList<>(); - public MockHttpContextMap(final String expectedIdentifier, final String shouldThrowExceptionClass) { + public MockHttpContextMap(final String expectedIdentifier, final Exception responseException, final RuntimeException completeException) { this.id = expectedIdentifier; - this.shouldThrowExceptionClass = shouldThrowExceptionClass; + this.responseException = responseException; + this.completeException = completeException; } @Override @@ -233,11 +266,7 @@ public HttpServletResponse getResponse(final String identifier) { try { final HttpServletResponse response = Mockito.mock(HttpServletResponse.class); - if(shouldThrowExceptionClass != null && shouldThrowExceptionClass.equals("FlowFileAccessException")) { - Mockito.when(response.getOutputStream()).thenThrow(new FlowFileAccessException("exception")); - } else if(shouldThrowExceptionClass != null && shouldThrowExceptionClass.equals("ProcessException")) { - Mockito.when(response.getOutputStream()).thenThrow(new ProcessException("exception")); - } else { + if (responseException == null) { Mockito.when(response.getOutputStream()).thenReturn(new ServletOutputStream() { @Override public boolean isReady() { @@ -249,43 +278,39 @@ public void setWriteListener(WriteListener writeListener) { } @Override - public void write(int b) throws IOException { - baos.write(b); + public void write(int b) { + outputStream.write(b); } @Override public void write(byte[] b) throws IOException { - baos.write(b); + outputStream.write(b); } @Override - public void write(byte[] b, int off, int len) throws IOException { - baos.write(b, off, len); + public void write(byte[] b, int off, int len) { + outputStream.write(b, off, len); } }); + } else { + Mockito.when(response.getOutputStream()).thenThrow(responseException); } - Mockito.doAnswer(new Answer() { - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable { - final String key = invocation.getArgument(0); - final String value = invocation.getArgument(1); - if (value == null) { - headersWithNoValue.add(key); - } else { - headersSent.put(key, value); - } - - return null; + Mockito.doAnswer(invocation -> { + final String key = invocation.getArgument(0); + final String value = invocation.getArgument(1); + if (value == null) { + headersWithNoValue.add(key); + } else { + headersSent.put(key, value); } + + return null; }).when(response).setHeader(Mockito.any(String.class), Mockito.any(String.class)); - Mockito.doAnswer(new Answer() { - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable { - statusCode = invocation.getArgument(0); - return null; - } + Mockito.doAnswer(invocation -> { + statusCode = invocation.getArgument(0); + return null; }).when(response).setStatus(Mockito.anyInt()); return response; @@ -302,6 +327,10 @@ public void complete(final String identifier) { Assert.fail("attempting to respond to wrong request; should have been " + id + " but was " + identifier); } + if (completeException != null) { + throw completeException; + } + completedCount.incrementAndGet(); } From 9af2719f70c78ce1ff66cf3b8bd92dea0f54f483 Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Fri, 8 Jan 2021 10:35:05 -0500 Subject: [PATCH 2/2] NIFI-8120 Renamed exception variable and reordered log statements --- .../processors/standard/HandleHttpResponse.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java index 7f34ffd6bc8f..a0d3f4f6b19b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java @@ -134,25 +134,25 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final String contextIdentifier = flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID); if (contextIdentifier == null) { - session.transfer(flowFile, REL_FAILURE); getLogger().warn("Failed to respond to HTTP request for {} because FlowFile did not have an '" + HTTPUtils.HTTP_CONTEXT_ID + "' attribute", new Object[]{flowFile}); + session.transfer(flowFile, REL_FAILURE); return; } final String statusCodeValue = context.getProperty(STATUS_CODE).evaluateAttributeExpressions(flowFile).getValue(); if (!isNumber(statusCodeValue)) { - session.transfer(flowFile, REL_FAILURE); getLogger().error("Failed to respond to HTTP request for {} because status code was '{}', which is not a valid number", new Object[]{flowFile, statusCodeValue}); + session.transfer(flowFile, REL_FAILURE); return; } final HttpContextMap contextMap = context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class); final HttpServletResponse response = contextMap.getResponse(contextIdentifier); if (response == null) { - session.transfer(flowFile, REL_FAILURE); getLogger().error("Failed to respond to HTTP request for {} because FlowFile had an '{}' attribute of {} but could not find an HTTP Response Object for this identifier", new Object[]{flowFile, HTTPUtils.HTTP_CONTEXT_ID, contextIdentifier}); + session.transfer(flowFile, REL_FAILURE); return; } @@ -190,31 +190,31 @@ public void onTrigger(final ProcessContext context, final ProcessSession session session.exportTo(flowFile, response.getOutputStream()); response.flushBuffer(); } catch (final ProcessException e) { - session.transfer(flowFile, REL_FAILURE); getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, e}); try { contextMap.complete(contextIdentifier); } catch (final RuntimeException ce) { getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[]{flowFile, ce}); } + session.transfer(flowFile, REL_FAILURE); return; } catch (final Exception e) { - session.transfer(flowFile, REL_FAILURE); getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, e}); + session.transfer(flowFile, REL_FAILURE); return; } try { contextMap.complete(contextIdentifier); - } catch (final RuntimeException ise) { - getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[]{flowFile, ise}); + } catch (final RuntimeException ce) { + getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[]{flowFile, ce}); session.transfer(flowFile, REL_FAILURE); return; } session.getProvenanceReporter().send(flowFile, HTTPUtils.getURI(flowFile.getAttributes()), stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - session.transfer(flowFile, REL_SUCCESS); getLogger().info("Successfully responded to HTTP Request for {} with status code {}", new Object[]{flowFile, statusCode}); + session.transfer(flowFile, REL_SUCCESS); } private static boolean isNumber(final String value) {