From 3471cc4d021a173fc3a955e3cd0a2b4c409aff9b Mon Sep 17 00:00:00 2001 From: Y Wikander Date: Tue, 19 Sep 2017 03:06:38 +0900 Subject: [PATCH 1/3] NIFI-4169: websocket; PutWebSocket; enhance broadcast support --- .../processors/websocket/PutWebSocket.java | 251 ++++++++-- .../WebSocketProcessorAttributes.java | 1 + .../websocket/TestPutWebSocket.java | 472 ++++++++++++++++++ .../websocket/AbstractWebSocketService.java | 5 + .../websocket/WebSocketMessageRouter.java | 24 +- .../websocket/WebSocketMessageRouters.java | 5 + .../nifi/websocket/WebSocketService.java | 2 + 7 files changed, 701 insertions(+), 59 deletions(-) diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java index 0cb698dd88fa..140f022f57a1 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java @@ -24,6 +24,7 @@ import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE; import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS; import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_SESSION_ID; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_BROADCAST; import static org.apache.nifi.websocket.WebSocketMessage.CHARSET_NAME; import java.io.IOException; @@ -44,6 +45,8 @@ import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; @@ -66,14 +69,30 @@ @TriggerSerially @CapabilityDescription("Sends messages to a WebSocket remote endpoint" + " using a WebSocket session that is established by either ListenWebSocket or ConnectWebSocket.") +@ReadsAttributes({ + @ReadsAttribute(attribute = ATTR_WS_BROADCAST, description = "Broadcast related. Defined when the flowfile was forked due to a send failure to the WebSocket session id. " + + "Used to control success behavior; on success, this causes the flowfile to be dropped. Since this is a forked flowfile, the parent " + + "flowfile already transfered to success (because at least one client received the broadcast message). Transfering any forked flowfile to " + + "success would effectivly duplicate success for the same data."), +}) @WritesAttributes({ @WritesAttribute(attribute = ATTR_WS_CS_ID, description = "WebSocket Controller Service id."), @WritesAttribute(attribute = ATTR_WS_SESSION_ID, description = "Established WebSocket session id."), @WritesAttribute(attribute = ATTR_WS_ENDPOINT_ID, description = "WebSocket endpoint id."), @WritesAttribute(attribute = ATTR_WS_MESSAGE_TYPE, description = "TEXT or BINARY."), - @WritesAttribute(attribute = ATTR_WS_LOCAL_ADDRESS, description = "WebSocket server address."), - @WritesAttribute(attribute = ATTR_WS_REMOTE_ADDRESS, description = "WebSocket client address."), + @WritesAttribute(attribute = ATTR_WS_LOCAL_ADDRESS, description = "WebSocket server address. If the message is sent to more than one client (e.g. broadcast) " + + "then it will be a comma separated list of addresses; where any address that could not be sent to on the first try will end with an '*'. " + + "This maintains an audit trail of addresses; because if the client's message is actually sent to the client later, that (forked) flowfile is dropped " + + "(it does not transfer to success). Only the parent flowfile transfers to success."), + @WritesAttribute(attribute = ATTR_WS_REMOTE_ADDRESS, description = "WebSocket client address. If the message is sent to more than one client (e.g. broadcast) " + + "then it will be a comma separated list of addresses; where any address that could not be sent to on the first try will end with an '*'. " + + "This maintains an audit trail of addresses; because if the client's message is actually sent to the client later, that (forked) flowfile is dropped " + + "(it does not transfer to success). Only the parent flowfile transfers to success."), @WritesAttribute(attribute = ATTR_WS_FAILURE_DETAIL, description = "Detail of the failure."), + @WritesAttribute(attribute = ATTR_WS_BROADCAST, description = "Broadcast related. Defined when the flowfile was forked due to a send failure to the WebSocket session id. " + + "Used to control success behavior; on success, this causes the flowfile to be dropped. Since this is a forked flowfile, the parent " + + "flowfile already transfered to success (because at least one client received the broadcast message). Transfering any forked flowfile to " + + "success would effectivly duplicate success for the same data."), }) @SystemResourceConsideration(resource = SystemResource.MEMORY) public class PutWebSocket extends AbstractProcessor { @@ -82,8 +101,17 @@ public class PutWebSocket extends AbstractProcessor { .name("websocket-session-id") .displayName("WebSocket Session Id") .description("A NiFi Expression to retrieve the session id. If not specified, a message will be " + - "sent to all connected WebSocket peers for the WebSocket controller service endpoint.") - .required(true) + "sent (e.g. broadcast) to all connected WebSocket clients for the WebSocket controller service id and endpoint. " + + "Under the condition where some clients can be sent to and others cannot, the flowfile will transfer to success " + + "and each failed client will get a forked copy of the flowfile with ${" + ATTR_WS_SESSION_ID + "} set (to the client's session id) and " + + "${" + ATTR_WS_BROADCAST + "} defined. Then it is routed to failure; such that if failure is routed back " + + "into this processor it can be retried (as a non-broadcast message). For this reason, using a value other than the default can " + + "interfere with forked message processing. To handle this potential issue, any message that does not have a session id but " + + "has ${" + ATTR_WS_BROADCAST + "} defined will be dropped (so that it is not broadcast to all clients). " + + "If this processor is being used in a broadcast capacity and flowfiles come from a processor that adds the ${" + ATTR_WS_SESSION_ID + "} " + + "attribute (ex. ListenWebSocket), the ${" + ATTR_WS_SESSION_ID + "} attribute needs to be removed from the flowfile before it gets to this " + + "processor. Another option is to get creative with Expression Language referencing ${" + ATTR_WS_SESSION_ID + "} and ${" + ATTR_WS_BROADCAST + "}.") + .required(false) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .defaultValue("${" + ATTR_WS_SESSION_ID + "}") @@ -92,7 +120,15 @@ public class PutWebSocket extends AbstractProcessor { public static final PropertyDescriptor PROP_WS_CONTROLLER_SERVICE_ID = new PropertyDescriptor.Builder() .name("websocket-controller-service-id") .displayName("WebSocket ControllerService Id") - .description("A NiFi Expression to retrieve the id of a WebSocket ControllerService.") + .description("A NiFi Expression to retrieve the id of a WebSocket ControllerService. " + + "Setting this value to the Id of a ConnectWebSocket processor allows this processor to send messages via the ConnectWebSocket's " + + "WebSocket controller service id and endpoint; as well as the session id of the client it is connected to (at the time the message " + + "is sent). Under this configuration, do not set this processor's WebSocket Session Id (this allows the session id to be obtained from " + + "the referenced ConnectWebSocket processor). Similarly, if the flowfile comes from a processor that writes this value to an attribute " + + "(ex. ListenWebSocket) it can be used; and, if this processor's WebSocket Session Id is not set then this processor will send the " + + "message to every client connected to it (at the time the message is sent). The creation of forked copies of the flowfile " + + "can only occur when more than one client is in the list of clients (to send to)." + + "See also WebSocket Endpoint Id.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) @@ -102,7 +138,9 @@ public class PutWebSocket extends AbstractProcessor { public static final PropertyDescriptor PROP_WS_CONTROLLER_SERVICE_ENDPOINT = new PropertyDescriptor.Builder() .name("websocket-endpoint-id") .displayName("WebSocket Endpoint Id") - .description("A NiFi Expression to retrieve the endpoint id of a WebSocket ControllerService.") + .description("A NiFi Expression to retrieve the endpoint id of a WebSocket ControllerService. " + + "Setting this value to a ConnectWebSocket processor's WebSocket Endpoint Id value allows this processor to send messages via the " + + "ConnectWebSocket's connection. See also WebSocket ControllerService Id.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) @@ -127,6 +165,10 @@ public class PutWebSocket extends AbstractProcessor { .name("failure") .description("FlowFiles that failed to send to the destination are transferred to this relationship.") .build(); + public static final Relationship REL_SESSION_UNKNOWN = new Relationship.Builder() + .name("session unknown") + .description("FlowFiles that failed to send to the destination because the session id does not exist are transferred to this relationship.") + .build(); private static final List descriptors; private static final Set relationships; @@ -142,9 +184,21 @@ public class PutWebSocket extends AbstractProcessor { final Set innerRelationshipsSet = new HashSet<>(); innerRelationshipsSet.add(REL_SUCCESS); innerRelationshipsSet.add(REL_FAILURE); + innerRelationshipsSet.add(REL_SESSION_UNKNOWN); relationships = Collections.unmodifiableSet(innerRelationshipsSet); } + @SuppressWarnings("unused") + private class sessionIdInfo { + String sessionId; + long startSending; + long stopSending; + final AtomicReference transitUri = new AtomicReference<>(); + final Map attrs = new HashMap<>(); + boolean success = false; + Exception e = null; + } + @Override public Set getRelationships() { return relationships; @@ -162,7 +216,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession process return; } - final String sessionId = context.getProperty(PROP_WS_SESSION_ID) + final List sessionIdInfoList = new ArrayList(); + + final String sessionIdProperty = context.getProperty(PROP_WS_SESSION_ID) .evaluateAttributeExpressions(flowfile).getValue(); final String webSocketServiceId = context.getProperty(PROP_WS_CONTROLLER_SERVICE_ID) .evaluateAttributeExpressions(flowfile).getValue(); @@ -171,8 +227,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession process final String messageTypeStr = context.getProperty(PROP_WS_MESSAGE_TYPE) .evaluateAttributeExpressions(flowfile).getValue(); final WebSocketMessage.Type messageType = WebSocketMessage.Type.valueOf(messageTypeStr); + final boolean flowfileIsBroadcast = (null != flowfile.getAttribute(ATTR_WS_BROADCAST)); - if (StringUtils.isEmpty(sessionId)) { + if (StringUtils.isEmpty(sessionIdProperty)) { getLogger().debug("Specific SessionID not specified. Message will be broadcast to all connected clients."); } @@ -196,52 +253,156 @@ public void onTrigger(final ProcessContext context, final ProcessSession process final byte[] messageContent = new byte[(int) flowfile.getSize()]; final long startSending = System.currentTimeMillis(); - final AtomicReference transitUri = new AtomicReference<>(); - final Map attrs = new HashMap<>(); - attrs.put(ATTR_WS_CS_ID, webSocketService.getIdentifier()); - - if (!StringUtils.isEmpty(sessionId)) { - attrs.put(ATTR_WS_SESSION_ID, sessionId); - } - - attrs.put(ATTR_WS_ENDPOINT_ID, webSocketServiceEndpoint); - attrs.put(ATTR_WS_MESSAGE_TYPE, messageTypeStr); - processSession.read(flowfile, in -> { StreamUtils.fillBuffer(in, messageContent, true); }); - try { - - webSocketService.sendMessage(webSocketServiceEndpoint, sessionId, sender -> { - switch (messageType) { - case TEXT: - sender.sendString(new String(messageContent, CHARSET_NAME)); - break; - case BINARY: - sender.sendBinary(ByteBuffer.wrap(messageContent)); - break; - } + final boolean broadcastMessage = StringUtils.isEmpty(sessionIdProperty); - attrs.put(ATTR_WS_LOCAL_ADDRESS, sender.getLocalAddress().toString()); - attrs.put(ATTR_WS_REMOTE_ADDRESS, sender.getRemoteAddress().toString()); - transitUri.set(sender.getTransitUri()); - }); - - final FlowFile updatedFlowFile = processSession.putAllAttributes(flowfile, attrs); - final long transmissionMillis = System.currentTimeMillis() - startSending; - processSession.getProvenanceReporter().send(updatedFlowFile, transitUri.get(), transmissionMillis); - - processSession.transfer(updatedFlowFile, REL_SUCCESS); + if (broadcastMessage && flowfileIsBroadcast) { + // message is a broadcast (no websocket.session.id) *and* id'd as a FORKed message (websocket.broadcast) - which isn't good. + // Sending it on would cause a duplicate, so just drop it. + final FlowFile updatedFlowFile = processSession.penalize(processSession.putAttribute(flowfile, ATTR_WS_FAILURE_DETAIL, + "No session id; message was FORKed from a broadcast message")); + processSession.remove(updatedFlowFile); // terminate relationship + return; + } - } catch (WebSocketConfigurationException|IllegalStateException|IOException e) { + final List sessionIds = new ArrayList(); + final HashSet currentSessionIds = new HashSet(); + WebSocketConfigurationException currentSessionIdsEx = null; + try { + currentSessionIds.addAll(webSocketService.getSessionIds(webSocketServiceEndpoint)); + } catch (WebSocketConfigurationException e) { // WebSocketConfigurationException: If the corresponding WebSocketGatewayProcessor has been stopped. - // IllegalStateException: Session is already closed or not found. - // IOException: other IO error. - getLogger().error("Failed to send message via WebSocket due to " + e, e); - transferToFailure(processSession, flowfile, e.toString()); + currentSessionIdsEx = e; + } + + if (broadcastMessage) { + if (currentSessionIdsEx != null) { + getLogger().error("Failed to obtain list of SessionIds via WebSocket due to " + currentSessionIdsEx, currentSessionIdsEx); + transferToFailure(processSession, flowfile, currentSessionIdsEx.toString()); + return; + } + else if (currentSessionIds.isEmpty()) { + getLogger().error("Failed to obtain list of SessionIds via WebSocket due to no Sessions connected"); + transferToFailure(processSession, flowfile, "No connected WebSocket Sessions"); + return; + } + sessionIds.addAll(currentSessionIds); + } + else { + if (currentSessionIds.contains(sessionIdProperty)) { + sessionIds.add(sessionIdProperty); + } + else { + final FlowFile updatedFlowFile = processSession.penalize(processSession.putAttribute(flowfile, ATTR_WS_FAILURE_DETAIL, "No such session id")); + if( flowfileIsBroadcast ) { + // flowfile was created earlier by the below FORK; don't need it anymore. Sending it to SESSION UNKNOWN would cause a duplicate. + processSession.remove(updatedFlowFile); // terminate relationship + } + else { + processSession.transfer(updatedFlowFile, REL_SESSION_UNKNOWN); + } + return; + } } + final HashSet ws_local_addresses = new HashSet(); + final HashSet ws_remote_addresses = new HashSet(); + Integer sessionCount = 0, successCount = 0, failureCount = 0; + for (String sessionId : sessionIds) { + sessionCount ++; + boolean sendSuccess = false; + final sessionIdInfo sessionIdInfoEntry = new sessionIdInfo(); + sessionIdInfoEntry.sessionId = sessionId; + sessionIdInfoEntry.attrs.put(ATTR_WS_CS_ID, webSocketService.getIdentifier()); + sessionIdInfoEntry.attrs.put(ATTR_WS_ENDPOINT_ID, webSocketServiceEndpoint); + sessionIdInfoEntry.attrs.put(ATTR_WS_MESSAGE_TYPE, messageTypeStr); + if (!broadcastMessage) { + sessionIdInfoEntry.attrs.put(ATTR_WS_SESSION_ID, sessionId); + } + sessionIdInfoEntry.startSending = System.currentTimeMillis(); + try { + webSocketService.sendMessage(webSocketServiceEndpoint, sessionId, sender -> { + switch (messageType) { + case TEXT: + sender.sendString(new String(messageContent, CHARSET_NAME)); + break; + case BINARY: + sender.sendBinary(ByteBuffer.wrap(messageContent)); + break; + } + sessionIdInfoEntry.attrs.put(ATTR_WS_LOCAL_ADDRESS, sender.getLocalAddress().toString()); + sessionIdInfoEntry.attrs.put(ATTR_WS_REMOTE_ADDRESS, sender.getRemoteAddress().toString()); + sessionIdInfoEntry.transitUri.set(sender.getTransitUri()); + }); + sendSuccess = true; + } catch (WebSocketConfigurationException|IllegalStateException|IOException e) { + // WebSocketConfigurationException: If the corresponding WebSocketGatewayProcessor has been stopped. + // IllegalStateException: Session is already closed or not found. + // IOException: other IO error. + sessionIdInfoEntry.e = e; + getLogger().error("Failed to send message (sessionId '" + sessionId + "') via WebSocket due to " + e, e); + } + if (sendSuccess) { + successCount ++; + sessionIdInfoEntry.success = true; + } + else { + failureCount ++; + } + String address = sessionIdInfoEntry.attrs.get(ATTR_WS_LOCAL_ADDRESS); + if (address != null) + ws_local_addresses.add(address + (sendSuccess ? "" : "*")); + address = sessionIdInfoEntry.attrs.get(ATTR_WS_REMOTE_ADDRESS); + if (address != null) + ws_remote_addresses.add(address + (sendSuccess ? "" : "*")); + + sessionIdInfoEntry.stopSending = System.currentTimeMillis(); + sessionIdInfoList.add(sessionIdInfoEntry); + } + final long transmissionMillis = System.currentTimeMillis() - startSending; + + if (sessionCount > 1 && failureCount > 0) { + // FORK every failed broadcast (sessionCount >1) message; adding the session id attribute to make it a non-broadcast message + for (sessionIdInfo sessionIdInfoEntry : sessionIdInfoList) { + if (sessionIdInfoEntry.success) + continue; + FlowFile forkFlowFile = processSession.create(flowfile); + forkFlowFile = processSession.putAllAttributes(forkFlowFile, sessionIdInfoEntry.attrs); + forkFlowFile = processSession.putAttribute(forkFlowFile, ATTR_WS_SESSION_ID, sessionIdInfoEntry.sessionId); + forkFlowFile = processSession.putAttribute(forkFlowFile, ATTR_WS_BROADCAST, ""); // so we know it was a FORK; used later on + transferToFailure(processSession, forkFlowFile, sessionIdInfoEntry.e.toString()); + } + } + // decide fate of the flowfile + final sessionIdInfo sessionIdInfoEntry = sessionIdInfoList.get(0); // use info from the first (and maybe only) entry + if (successCount > 0) { // SUCCESS sending to at least one destination + if (sessionIds.size() > 1) { // when applicable, retain list of local and remote address that were sent to successfully + String addresses; + if (!ws_local_addresses.isEmpty()) { + addresses = ws_local_addresses.toString(); + sessionIdInfoEntry.attrs.put(ATTR_WS_LOCAL_ADDRESS, addresses.substring(1, addresses.length()-1)); + } + if (!ws_remote_addresses.isEmpty()) { + addresses = ws_remote_addresses.toString(); + sessionIdInfoEntry.attrs.put(ATTR_WS_REMOTE_ADDRESS, addresses.substring(1, addresses.length()-1)); + } + } + final FlowFile updatedFlowFile = processSession.putAllAttributes(flowfile, sessionIdInfoEntry.attrs); + processSession.getProvenanceReporter().send(updatedFlowFile, sessionIdInfoEntry.transitUri.get(), transmissionMillis); + if( flowfileIsBroadcast ) { + // flowfile was created earlier by the above FORK; don't need it anymore. Sending it to SUCCESS would cause a duplicate. + processSession.remove(updatedFlowFile); // terminate relationship + } + else { + processSession.transfer(updatedFlowFile, REL_SUCCESS); + } + } + else { // FAILURE sending to at least one destination + transferToFailure(processSession, flowfile, sessionIdInfoEntry.e.toString()); + } } private FlowFile transferToFailure(final ProcessSession processSession, FlowFile flowfile, final String value) { diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/WebSocketProcessorAttributes.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/WebSocketProcessorAttributes.java index 931ec2be5679..2d82cde3d94c 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/WebSocketProcessorAttributes.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/WebSocketProcessorAttributes.java @@ -26,6 +26,7 @@ public final class WebSocketProcessorAttributes{ public static final String ATTR_WS_MESSAGE_TYPE = "websocket.message.type"; public static final String ATTR_WS_LOCAL_ADDRESS = "websocket.local.address"; public static final String ATTR_WS_REMOTE_ADDRESS = "websocket.remote.address"; + public static final String ATTR_WS_BROADCAST = "websocket.broadcast"; private WebSocketProcessorAttributes() { } diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java index 52f6f2a28b09..f76ea971d7a7 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java @@ -23,10 +23,14 @@ import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE; import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS; import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_SESSION_ID; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_BROADCAST; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; @@ -35,11 +39,13 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -76,10 +82,15 @@ public void testSessionIsNotSpecified() throws Exception { final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); final WebSocketService service = spy(WebSocketService.class); + final WebSocketSession webSocketSession = getWebSocketSession(); + final String serviceId = "ws-service"; final String endpointId = "client-1"; final String textMessageFromServer = "message from server."; + final HashSet sessionIds = new HashSet(); + sessionIds.add(webSocketSession.getSessionId()); when(service.getIdentifier()).thenReturn(serviceId); + when(service.getSessionIds(endpointId)).thenReturn(sessionIds); runner.addControllerService(serviceId, service); runner.enableControllerService(service); @@ -99,6 +110,9 @@ public void testSessionIsNotSpecified() throws Exception { //assertEquals(1, failedFlowFiles.size()); //No longer valid test after NIFI-3318 assertEquals(0, failedFlowFiles.size()); + final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); + assertEquals(0, unknownFlowFiles.size()); + final List provenanceEvents = runner.getProvenanceEvents(); assertEquals(0, provenanceEvents.size()); @@ -135,6 +149,9 @@ public void testServiceIsNotFound() throws Exception { final MockFlowFile failedFlowFile = failedFlowFiles.iterator().next(); assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL)); + final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); + assertEquals(0, unknownFlowFiles.size()); + final List provenanceEvents = runner.getProvenanceEvents(); assertEquals(0, provenanceEvents.size()); @@ -171,6 +188,9 @@ public void testServiceIsNotWebSocketService() throws Exception { final MockFlowFile failedFlowFile = failedFlowFiles.iterator().next(); assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL)); + final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); + assertEquals(0, unknownFlowFiles.size()); + final List provenanceEvents = runner.getProvenanceEvents(); assertEquals(0, provenanceEvents.size()); @@ -186,7 +206,10 @@ public void testSendFailure() throws Exception { final String serviceId = "ws-service"; final String endpointId = "client-1"; final String textMessageFromServer = "message from server."; + final HashSet sessionIds = new HashSet(); + sessionIds.add(webSocketSession.getSessionId()); when(service.getIdentifier()).thenReturn(serviceId); + when(service.getSessionIds(endpointId)).thenReturn(sessionIds); doAnswer(invocation -> { final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); sendMessage.send(webSocketSession); @@ -213,6 +236,9 @@ public void testSendFailure() throws Exception { final MockFlowFile failedFlowFile = failedFlowFiles.iterator().next(); assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL)); + final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); + assertEquals(0, unknownFlowFiles.size()); + final List provenanceEvents = runner.getProvenanceEvents(); assertEquals(0, provenanceEvents.size()); @@ -228,7 +254,10 @@ public void testSuccess() throws Exception { final String serviceId = "ws-service"; final String endpointId = "client-1"; final String textMessageFromServer = "message from server."; + final HashSet sessionIds = new HashSet(); + sessionIds.add(webSocketSession.getSessionId()); when(service.getIdentifier()).thenReturn(serviceId); + when(service.getSessionIds(endpointId)).thenReturn(sessionIds); doAnswer(invocation -> { final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); sendMessage.send(webSocketSession); @@ -262,9 +291,452 @@ public void testSuccess() throws Exception { final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); assertEquals(0, failedFlowFiles.size()); + final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); + assertEquals(0, unknownFlowFiles.size()); + final List provenanceEvents = runner.getProvenanceEvents(); assertEquals(2, provenanceEvents.size()); } + @Test + public void testSessionUnknown() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); + final WebSocketService service = spy(WebSocketService.class); + + final WebSocketSession webSocketSession = getWebSocketSession(); + + final String serviceId = "ws-service"; + final String endpointId = "client-1"; + final String textMessageFromServer = "message from server."; + when(service.getIdentifier()).thenReturn(serviceId); + when(service.getSessionIds(endpointId)).thenReturn(new HashSet()); + runner.addControllerService(serviceId, service); + + runner.enableControllerService(service); + + final Map attributes = new HashMap<>(); + attributes.put(ATTR_WS_CS_ID, serviceId); + attributes.put(ATTR_WS_ENDPOINT_ID, endpointId); + attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.TEXT.name()); + attributes.put(ATTR_WS_SESSION_ID, webSocketSession.getSessionId()); + runner.enqueue(textMessageFromServer, attributes); + + runner.run(); + + final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); + assertEquals(0, succeededFlowFiles.size()); + + final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); + assertEquals(0, failedFlowFiles.size()); + + final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); + assertEquals(1, unknownFlowFiles.size()); + + final List provenanceEvents = runner.getProvenanceEvents(); + assertEquals(0, provenanceEvents.size()); + } + + @Test + public void testBroadcast_FailureAll() throws Exception { + // test failed sending to all broadcast client + final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); + final WebSocketService service = spy(WebSocketService.class); + + final WebSocketSession webSocketSession = getWebSocketSession(); + + final String serviceId = "ws-service"; + final String endpointId = "client-1"; + final String textMessageFromServer = "message from server."; + final HashSet sessionIds = new HashSet(); + sessionIds.add("ws-session-id-1"); + sessionIds.add("ws-session-id-2"); + sessionIds.add("ws-session-id-3"); + when(service.getIdentifier()).thenReturn(serviceId); + when(service.getSessionIds(endpointId)).thenReturn(sessionIds); + doAnswer(invocation -> { + final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); + sendMessage.send(webSocketSession); + return null; + }).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class)); + doThrow(new IOException("Sending message failed.")).when(webSocketSession).sendString(anyString()); + runner.addControllerService(serviceId, service); + + runner.enableControllerService(service); + + final Map attributes = new HashMap<>(); + attributes.put(ATTR_WS_CS_ID, serviceId); + attributes.put(ATTR_WS_ENDPOINT_ID, endpointId); + attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.TEXT.name()); + runner.enqueue(textMessageFromServer, attributes); + + runner.run(); + + final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); + assertEquals(0, succeededFlowFiles.size()); + + final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); + assertEquals(4, failedFlowFiles.size()); + + // sequence the flowfiles not important + int broadcastCnt = 0, sessionIdCnt = 0; + for (int i = 0; i < failedFlowFiles.size(); i++ ) { + MockFlowFile failedFlowFile = failedFlowFiles.get(i); + assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL)); + if (failedFlowFile.getAttribute(ATTR_WS_SESSION_ID) == null || failedFlowFile.getAttribute(ATTR_WS_SESSION_ID).isEmpty()) { + sessionIdCnt ++; + assertNull(failedFlowFile.getAttribute(ATTR_WS_BROADCAST)); + } + else { + broadcastCnt ++; + assertNotNull(failedFlowFile.getAttribute(ATTR_WS_BROADCAST)); + } + } + assertEquals(1, sessionIdCnt); + assertEquals(3, broadcastCnt); + + final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); + assertEquals(0, unknownFlowFiles.size()); + + final List provenanceEvents = runner.getProvenanceEvents(); + assertEquals(1, provenanceEvents.size()); // logging the FORK + assertEquals(provenanceEvents.get(0).getEventType(), ProvenanceEventType.FORK); // verify EventType + } + + @Test + public void testBroadcast_SuccessAll() throws Exception { + // test success sending to all broadcast clients + final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); + final WebSocketService service = spy(WebSocketService.class); + + final WebSocketSession webSocketSession = getWebSocketSession(); + + final String serviceId = "ws-service"; + final String endpointId = "client-1"; + final String textMessageFromServer = "message from server."; + final HashSet sessionIds = new HashSet(); + sessionIds.add("ws-session-id-1"); + sessionIds.add("ws-session-id-2"); + sessionIds.add("ws-session-id-3"); + when(service.getIdentifier()).thenReturn(serviceId); + when(service.getSessionIds(endpointId)).thenReturn(sessionIds); + doAnswer(invocation -> { + final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); + sendMessage.send(webSocketSession); + return null; + }).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class)); + + runner.addControllerService(serviceId, service); + + runner.enableControllerService(service); + + runner.setProperty(PutWebSocket.PROP_WS_MESSAGE_TYPE, "${" + ATTR_WS_MESSAGE_TYPE + "}"); + + final Map attributes = new HashMap<>(); + attributes.put(ATTR_WS_CS_ID, serviceId); + attributes.put(ATTR_WS_ENDPOINT_ID, endpointId); + attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.TEXT.name()); + runner.enqueue(textMessageFromServer, attributes); + + runner.run(); + + final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); + assertEquals(1, succeededFlowFiles.size()); + assertNull(succeededFlowFiles.get(0).getAttribute(ATTR_WS_BROADCAST)); + + final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); + assertEquals(0, failedFlowFiles.size()); + + final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); + assertEquals(0, unknownFlowFiles.size()); + + final List provenanceEvents = runner.getProvenanceEvents(); + assertEquals(1, provenanceEvents.size()); + assertEquals(provenanceEvents.get(0).getEventType(), ProvenanceEventType.SEND); // verify EventType + } + + @Test + public void testBroadcast_1Success_2Failure() throws Exception { + // test success and failure sending to all broadcast clients + final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); + final WebSocketService service = spy(WebSocketService.class); + + final WebSocketSession webSocketSession = getWebSocketSession(); + + final String serviceId = "ws-service"; + final String endpointId = "client-1"; + final String textMessageFromServer = "message from server."; + final HashSet sessionIds = new HashSet(); + sessionIds.add("ws-session-id-1"); + sessionIds.add("ws-session-id-2"); + sessionIds.add("ws-session-id-3"); + when(service.getIdentifier()).thenReturn(serviceId); + when(service.getSessionIds(endpointId)).thenReturn(sessionIds); + + doAnswer(invocation -> { + final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); + sendMessage.send(webSocketSession); + return null; + }).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class)); + doAnswer(invocation -> { + final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); + sendMessage.send(webSocketSession); + throw new IOException("Sending message failed."); + }).when(service).sendMessage(anyString(), eq("ws-session-id-2"), any(SendMessage.class)); + doAnswer(invocation -> { + final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); + sendMessage.send(webSocketSession); + throw new IOException("Sending message failed."); + }).when(service).sendMessage(anyString(), eq("ws-session-id-3"), any(SendMessage.class)); + + runner.addControllerService(serviceId, service); + + runner.enableControllerService(service); + + final Map attributes = new HashMap<>(); + attributes.put(ATTR_WS_CS_ID, serviceId); + attributes.put(ATTR_WS_ENDPOINT_ID, endpointId); + attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.TEXT.name()); + runner.enqueue(textMessageFromServer, attributes); + + runner.run(); + + final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); + assertEquals(1, succeededFlowFiles.size()); + assertNull(succeededFlowFiles.get(0).getAttribute(ATTR_WS_BROADCAST)); + + final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); + assertEquals(2, failedFlowFiles.size()); + + // sequence the flowfiles not important + int broadcastCnt = 0, sessionIdCnt = 0; + for (int i = 0; i < failedFlowFiles.size(); i++ ) { + MockFlowFile failedFlowFile = failedFlowFiles.get(i); + assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL)); + if (failedFlowFile.getAttribute(ATTR_WS_SESSION_ID) == null || failedFlowFile.getAttribute(ATTR_WS_SESSION_ID).isEmpty()) { + sessionIdCnt ++; + assertNull(failedFlowFile.getAttribute(ATTR_WS_BROADCAST)); + } + else { + broadcastCnt ++; + assertNotNull(failedFlowFile.getAttribute(ATTR_WS_BROADCAST)); + } + } + assertEquals(0, sessionIdCnt); + assertEquals(2, broadcastCnt); + + final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); + assertEquals(0, unknownFlowFiles.size()); + + final List provenanceEvents = runner.getProvenanceEvents(); + assertEquals(2, provenanceEvents.size()); // logging SEND and FORK + int sendCnt = 0, forkCnt = 0; + for (ProvenanceEventRecord provEvent : provenanceEvents ) { + if (provEvent.getEventType() == ProvenanceEventType.SEND) + sendCnt ++; + else if (provEvent.getEventType() == ProvenanceEventType.FORK) + forkCnt ++; + } + assertEquals(1, sendCnt); // verify EventType SEND (due to success) + assertEquals(1, forkCnt); // verify EventType FORK (due to failure) + } + + @Test + public void testBroadcast_SessionIdSet_BroadcastFlagSet_Success() throws Exception { + // test failed FORKed flowfile rerouted back into processor; to status: Success + final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); + final WebSocketService service = spy(WebSocketService.class); + + final WebSocketSession webSocketSession = getWebSocketSession(); + + final String serviceId = "ws-service"; + final String endpointId = "client-1"; + final String textMessageFromServer = "message from server."; + final HashSet sessionIds = new HashSet(); + sessionIds.add(webSocketSession.getSessionId()); + when(service.getIdentifier()).thenReturn(serviceId); + when(service.getSessionIds(endpointId)).thenReturn(sessionIds); + doAnswer(invocation -> { + final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); + sendMessage.send(webSocketSession); + return null; + }).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class)); + runner.addControllerService(serviceId, service); + + runner.enableControllerService(service); + + runner.setProperty(PutWebSocket.PROP_WS_MESSAGE_TYPE, "${" + ATTR_WS_MESSAGE_TYPE + "}"); + + final Map attributes = new HashMap<>(); + attributes.put(ATTR_WS_CS_ID, serviceId); + attributes.put(ATTR_WS_ENDPOINT_ID, endpointId); + attributes.put(ATTR_WS_SESSION_ID, webSocketSession.getSessionId()); + attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.TEXT.name()); + attributes.put(ATTR_WS_BROADCAST, ""); + runner.enqueue(textMessageFromServer, attributes); + + runner.run(); + + final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); + assertEquals(0, succeededFlowFiles.size()); // zero because it was DROPped (like a Terminated Relationship) + + final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); + assertEquals(0, failedFlowFiles.size()); + + final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); + assertEquals(0, unknownFlowFiles.size()); + + final List provenanceEvents = runner.getProvenanceEvents(); + assertEquals(1, provenanceEvents.size()); + assertEquals(provenanceEvents.get(0).getEventType(), ProvenanceEventType.SEND); // verify EventType + } + + @Test + public void testBroadcast_SessionIdSet_BroadcastFlagSet_Failure() throws Exception { + // test failed FORKed flowfile rerouted back into processor; to status: Failure + final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); + final WebSocketService service = spy(WebSocketService.class); + + final WebSocketSession webSocketSession = getWebSocketSession(); + + final String serviceId = "ws-service"; + final String endpointId = "client-1"; + final String textMessageFromServer = "message from server."; + final HashSet sessionIds = new HashSet(); + sessionIds.add(webSocketSession.getSessionId()); + when(service.getIdentifier()).thenReturn(serviceId); + when(service.getSessionIds(endpointId)).thenReturn(sessionIds); + doAnswer(invocation -> { + final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); + sendMessage.send(webSocketSession); + return null; + }).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class)); + doThrow(new IOException("Sending message failed.")).when(webSocketSession).sendString(anyString()); + runner.addControllerService(serviceId, service); + + runner.enableControllerService(service); + + runner.setProperty(PutWebSocket.PROP_WS_MESSAGE_TYPE, "${" + ATTR_WS_MESSAGE_TYPE + "}"); + + final Map attributes = new HashMap<>(); + attributes.put(ATTR_WS_CS_ID, serviceId); + attributes.put(ATTR_WS_ENDPOINT_ID, endpointId); + attributes.put(ATTR_WS_SESSION_ID, webSocketSession.getSessionId()); + attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.TEXT.name()); + attributes.put(ATTR_WS_BROADCAST, ""); + runner.enqueue(textMessageFromServer, attributes); + + runner.run(); + + final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); + assertEquals(0, succeededFlowFiles.size()); + + final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); + assertEquals(1, failedFlowFiles.size()); + + final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); + assertEquals(0, unknownFlowFiles.size()); + + final List provenanceEvents = runner.getProvenanceEvents(); + assertEquals(0, provenanceEvents.size()); + } + + @Test + public void testBroadcast_SessionIdSet_BroadcastFlagSet_SessionUnknown() throws Exception { + // test failed FORKed flowfile rerouted back into processor; to status: Session Unknown; which is DROPed + final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); + final WebSocketService service = spy(WebSocketService.class); + + final WebSocketSession webSocketSession = getWebSocketSession(); + + final String serviceId = "ws-service"; + final String endpointId = "client-1"; + final String textMessageFromServer = "message from server."; + final HashSet sessionIds = new HashSet(); + sessionIds.add("a-bogus-session-id"); + when(service.getIdentifier()).thenReturn(serviceId); + when(service.getSessionIds(endpointId)).thenReturn(sessionIds); + doAnswer(invocation -> { + final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); + sendMessage.send(webSocketSession); + return null; + }).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class)); + doThrow(new IOException("Sending message failed.")).when(webSocketSession).sendString(anyString()); + runner.addControllerService(serviceId, service); + + runner.enableControllerService(service); + + runner.setProperty(PutWebSocket.PROP_WS_MESSAGE_TYPE, "${" + ATTR_WS_MESSAGE_TYPE + "}"); + + final Map attributes = new HashMap<>(); + attributes.put(ATTR_WS_CS_ID, serviceId); + attributes.put(ATTR_WS_ENDPOINT_ID, endpointId); + attributes.put(ATTR_WS_SESSION_ID, webSocketSession.getSessionId()); + attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.TEXT.name()); + attributes.put(ATTR_WS_BROADCAST, ""); + runner.enqueue(textMessageFromServer, attributes); + + runner.run(); + + final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); + assertEquals(0, succeededFlowFiles.size()); + + final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); + assertEquals(0, failedFlowFiles.size()); + + final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); + assertEquals(0, unknownFlowFiles.size()); // zero because it was DROPped (like a Terminated Relationship) + + final List provenanceEvents = runner.getProvenanceEvents(); + assertEquals(0, provenanceEvents.size()); + } + + @Test + public void testBroadcast_SessionIdNotSet_BroadcastFlagSet() throws Exception { + // test of a theoretical bad processor config (session id being blank); catches a flowfile that looks like a + // broadcast, but is actually a FORKed flowfile (because websocket.broadcast attrib exists) -- where it is just DROPped + final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); + final WebSocketService service = spy(WebSocketService.class); + + final WebSocketSession webSocketSession = getWebSocketSession(); + + final String serviceId = "ws-service"; + final String endpointId = "client-1"; + final String textMessageFromServer = "message from server."; + final HashSet sessionIds = new HashSet(); + sessionIds.add(webSocketSession.getSessionId()); + when(service.getIdentifier()).thenReturn(serviceId); + when(service.getSessionIds(endpointId)).thenReturn(sessionIds); + doAnswer(invocation -> { + final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); + sendMessage.send(webSocketSession); + return null; + }).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class)); + runner.addControllerService(serviceId, service); + + runner.enableControllerService(service); + + runner.setProperty(PutWebSocket.PROP_WS_MESSAGE_TYPE, "${" + ATTR_WS_MESSAGE_TYPE + "}"); + + final Map attributes = new HashMap<>(); + attributes.put(ATTR_WS_CS_ID, serviceId); + attributes.put(ATTR_WS_ENDPOINT_ID, endpointId); + attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.TEXT.name()); + attributes.put(ATTR_WS_BROADCAST, ""); + runner.enqueue(textMessageFromServer, attributes); + + runner.run(); + + final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); + assertEquals(0, succeededFlowFiles.size()); + + final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); + assertEquals(0, failedFlowFiles.size()); + + final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); + assertEquals(0, unknownFlowFiles.size()); + + final List provenanceEvents = runner.getProvenanceEvents(); + assertEquals(0, provenanceEvents.size()); + } } diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java index 36deb55c5b10..90586d0df81f 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java @@ -20,6 +20,7 @@ import org.apache.nifi.processor.Processor; import java.io.IOException; +import java.util.HashSet; public abstract class AbstractWebSocketService extends AbstractControllerService implements WebSocketService { @@ -45,4 +46,8 @@ public void sendMessage(final String endpointId, final String sessionId, final S routers.sendMessage(endpointId, sessionId, sendMessage); } + @Override + public HashSet getSessionIds(final String endpointId) throws WebSocketConfigurationException { + return routers.getSessionIds(endpointId); + } } diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java index 0e8737a8f683..038a435e538c 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.HashSet; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.processor.Processor; @@ -94,6 +95,9 @@ public void onWebSocketBinary(final String sessionId, final byte[] payload, fina } private WebSocketSession getSessionOrFail(final String sessionId) { + if (StringUtils.isEmpty(sessionId)) { + throw new IllegalStateException("SessionId is not set"); + } final WebSocketSession session = sessions.get(sessionId); if (session == null) { throw new IllegalStateException("Session was not found for the sessionId: " + sessionId); @@ -102,20 +106,12 @@ private WebSocketSession getSessionOrFail(final String sessionId) { } public void sendMessage(final String sessionId, final SendMessage sendMessage) throws IOException { - if (!StringUtils.isEmpty(sessionId)) { - final WebSocketSession session = getSessionOrFail(sessionId); - sendMessage.send(session); - } else { - //The sessionID is not specified so broadcast the message to all connected client sessions. - sessions.keySet().forEach(itrSessionId -> { - try { - final WebSocketSession session = getSessionOrFail(itrSessionId); - sendMessage.send(session); - } catch (IOException e) { - logger.warn("Failed to send message to session {} due to {}", itrSessionId, e, e); - } - }); - } + final WebSocketSession session = getSessionOrFail(sessionId); + sendMessage.send(session); + } + + public HashSet getSessionIds() { + return new HashSet(sessions.keySet()); } public void disconnect(final String sessionId, final String reason) throws IOException { diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java index ae70ae57d011..56ee82eb38d1 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java @@ -19,6 +19,7 @@ import org.apache.nifi.processor.Processor; import java.io.IOException; +import java.util.HashSet; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -68,4 +69,8 @@ public void sendMessage(final String endpointId, final String sessionId, final S router.sendMessage(sessionId, sendMessage); } + public HashSet getSessionIds(final String endpointId) throws WebSocketConfigurationException { + final WebSocketMessageRouter router = getRouterOrFail(endpointId); + return router.getSessionIds(); + } } diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java index 90a3a3805f33..d7a4e1f08122 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java @@ -22,6 +22,7 @@ import org.apache.nifi.ssl.RestrictedSSLContextService; import java.io.IOException; +import java.util.HashSet; /** * Control an embedded WebSocket service instance. @@ -45,4 +46,5 @@ public interface WebSocketService extends ControllerService { void sendMessage(final String endpointId, final String sessionId, final SendMessage sendMessage) throws IOException, WebSocketConfigurationException; + HashSet getSessionIds(final String endpointId) throws WebSocketConfigurationException; } From 01c68930bb4173fae376af500cbbbe3b17d798c9 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Tue, 22 Aug 2017 07:47:23 +0900 Subject: [PATCH 2/3] NIFI-4169: Enhance PutWebSocket error handling - Added "Fork Failed Broadcast Sessions" property to specify whether failed FlowFiles should be forked and transferred individually so that user can construct a error handling flow using each session id. - Added "Enable Detailed Failure Relationships" property to separate failure routes into more detailed ones, such as 'no connected sessions' or 'communication failure' - Added "websocket.broadcast.succeeded" and "websocket.broadcast.failed" attributes to outgoing FlowFiles when a message is broadcast. --- .../processors/websocket/PutWebSocket.java | 386 ++++++++-------- .../WebSocketProcessorAttributes.java | 3 +- .../websocket/TestPutWebSocket.java | 437 +++++++++--------- .../websocket/AbstractWebSocketService.java | 6 +- .../websocket/SessionNotFoundException.java | 23 + .../websocket/WebSocketMessageRouter.java | 19 +- .../websocket/WebSocketMessageRouters.java | 6 +- .../nifi/websocket/WebSocketService.java | 6 +- .../websocket/TestWebSocketMessageRouter.java | 2 +- 9 files changed, 452 insertions(+), 436 deletions(-) create mode 100644 nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/SessionNotFoundException.java diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java index 140f022f57a1..ed5e9b3f2cc4 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java @@ -24,7 +24,8 @@ import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE; import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS; import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_SESSION_ID; -import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_BROADCAST; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_BROADCAST_SUCCEEDED; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_BROADCAST_FAILED; import static org.apache.nifi.websocket.WebSocketMessage.CHARSET_NAME; import java.io.IOException; @@ -45,11 +46,10 @@ import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.behavior.ReadsAttributes; -import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -60,6 +60,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.websocket.SessionNotFoundException; import org.apache.nifi.websocket.WebSocketConfigurationException; import org.apache.nifi.websocket.WebSocketMessage; import org.apache.nifi.websocket.WebSocketService; @@ -68,31 +69,21 @@ @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @TriggerSerially @CapabilityDescription("Sends messages to a WebSocket remote endpoint" + - " using a WebSocket session that is established by either ListenWebSocket or ConnectWebSocket.") -@ReadsAttributes({ - @ReadsAttribute(attribute = ATTR_WS_BROADCAST, description = "Broadcast related. Defined when the flowfile was forked due to a send failure to the WebSocket session id. " + - "Used to control success behavior; on success, this causes the flowfile to be dropped. Since this is a forked flowfile, the parent " + - "flowfile already transfered to success (because at least one client received the broadcast message). Transfering any forked flowfile to " + - "success would effectivly duplicate success for the same data."), -}) + " using a WebSocket session that is established by either ListenWebSocket or ConnectWebSocket." + + " This processor can be configured to send a message to a specific peer by providing a WebSocket session id," + + " or broadcasting to all connected peers, see description of 'WebSocket Session Id' for detail.") @WritesAttributes({ @WritesAttribute(attribute = ATTR_WS_CS_ID, description = "WebSocket Controller Service id."), @WritesAttribute(attribute = ATTR_WS_SESSION_ID, description = "Established WebSocket session id."), @WritesAttribute(attribute = ATTR_WS_ENDPOINT_ID, description = "WebSocket endpoint id."), @WritesAttribute(attribute = ATTR_WS_MESSAGE_TYPE, description = "TEXT or BINARY."), - @WritesAttribute(attribute = ATTR_WS_LOCAL_ADDRESS, description = "WebSocket server address. If the message is sent to more than one client (e.g. broadcast) " + - "then it will be a comma separated list of addresses; where any address that could not be sent to on the first try will end with an '*'. " + - "This maintains an audit trail of addresses; because if the client's message is actually sent to the client later, that (forked) flowfile is dropped " + - "(it does not transfer to success). Only the parent flowfile transfers to success."), - @WritesAttribute(attribute = ATTR_WS_REMOTE_ADDRESS, description = "WebSocket client address. If the message is sent to more than one client (e.g. broadcast) " + - "then it will be a comma separated list of addresses; where any address that could not be sent to on the first try will end with an '*'. " + - "This maintains an audit trail of addresses; because if the client's message is actually sent to the client later, that (forked) flowfile is dropped " + - "(it does not transfer to success). Only the parent flowfile transfers to success."), - @WritesAttribute(attribute = ATTR_WS_FAILURE_DETAIL, description = "Detail of the failure."), - @WritesAttribute(attribute = ATTR_WS_BROADCAST, description = "Broadcast related. Defined when the flowfile was forked due to a send failure to the WebSocket session id. " + - "Used to control success behavior; on success, this causes the flowfile to be dropped. Since this is a forked flowfile, the parent " + - "flowfile already transfered to success (because at least one client received the broadcast message). Transfering any forked flowfile to " + - "success would effectivly duplicate success for the same data."), + @WritesAttribute(attribute = ATTR_WS_LOCAL_ADDRESS, description = "The address of sending peer."), + @WritesAttribute(attribute = ATTR_WS_REMOTE_ADDRESS, description = "The address of receiving peer. " + + "If the message is sent to more than one peers (i.e. broadcast) then only the last peer address is captured."), + @WritesAttribute(attribute = ATTR_WS_FAILURE_DETAIL, description = "Detail of the failure. " + + "If the message fails with more than one peers (i.e. broadcast) then only the last peer address is captured."), + @WritesAttribute(attribute = ATTR_WS_BROADCAST_SUCCEEDED, description = "The number of messages sent successfully. Only available when a specific session id is not provided."), + @WritesAttribute(attribute = ATTR_WS_BROADCAST_FAILED, description = "The number of messages failed to be sent. Only available when a specific session id is not provided.") }) @SystemResourceConsideration(resource = SystemResource.MEMORY) public class PutWebSocket extends AbstractProcessor { @@ -101,18 +92,13 @@ public class PutWebSocket extends AbstractProcessor { .name("websocket-session-id") .displayName("WebSocket Session Id") .description("A NiFi Expression to retrieve the session id. If not specified, a message will be " + - "sent (e.g. broadcast) to all connected WebSocket clients for the WebSocket controller service id and endpoint. " + - "Under the condition where some clients can be sent to and others cannot, the flowfile will transfer to success " + - "and each failed client will get a forked copy of the flowfile with ${" + ATTR_WS_SESSION_ID + "} set (to the client's session id) and " + - "${" + ATTR_WS_BROADCAST + "} defined. Then it is routed to failure; such that if failure is routed back " + - "into this processor it can be retried (as a non-broadcast message). For this reason, using a value other than the default can " + - "interfere with forked message processing. To handle this potential issue, any message that does not have a session id but " + - "has ${" + ATTR_WS_BROADCAST + "} defined will be dropped (so that it is not broadcast to all clients). " + - "If this processor is being used in a broadcast capacity and flowfiles come from a processor that adds the ${" + ATTR_WS_SESSION_ID + "} " + - "attribute (ex. ListenWebSocket), the ${" + ATTR_WS_SESSION_ID + "} attribute needs to be removed from the flowfile before it gets to this " + - "processor. Another option is to get creative with Expression Language referencing ${" + ATTR_WS_SESSION_ID + "} and ${" + ATTR_WS_BROADCAST + "}.") + "sent (i.e. broadcast) to all connected WebSocket peers for the specified endpoint of the WebSocket controller service. " + + "Under the condition where some peers can be sent to and others cannot, the FlowFile will be transferred to success " + + "and if 'Fork Failed Broadcast Sessions' is enabled, for each failed peer a forked copy of the FlowFile " + + "with ${" + ATTR_WS_SESSION_ID + "} set (to the peer's session id) is routed to failure. " + + "So that if failure is routed back into this processor it can be retried (as a non-broadcast message).") .required(false) - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .addValidator(Validator.VALID) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .defaultValue("${" + ATTR_WS_SESSION_ID + "}") .build(); @@ -120,15 +106,7 @@ public class PutWebSocket extends AbstractProcessor { public static final PropertyDescriptor PROP_WS_CONTROLLER_SERVICE_ID = new PropertyDescriptor.Builder() .name("websocket-controller-service-id") .displayName("WebSocket ControllerService Id") - .description("A NiFi Expression to retrieve the id of a WebSocket ControllerService. " + - "Setting this value to the Id of a ConnectWebSocket processor allows this processor to send messages via the ConnectWebSocket's " + - "WebSocket controller service id and endpoint; as well as the session id of the client it is connected to (at the time the message " + - "is sent). Under this configuration, do not set this processor's WebSocket Session Id (this allows the session id to be obtained from " + - "the referenced ConnectWebSocket processor). Similarly, if the flowfile comes from a processor that writes this value to an attribute " + - "(ex. ListenWebSocket) it can be used; and, if this processor's WebSocket Session Id is not set then this processor will send the " + - "message to every client connected to it (at the time the message is sent). The creation of forked copies of the flowfile " + - "can only occur when more than one client is in the list of clients (to send to)." + - "See also WebSocket Endpoint Id.") + .description("A NiFi Expression to retrieve one from incoming FlowFile , or a specific WebSocket ControllerService ID.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) @@ -138,9 +116,9 @@ public class PutWebSocket extends AbstractProcessor { public static final PropertyDescriptor PROP_WS_CONTROLLER_SERVICE_ENDPOINT = new PropertyDescriptor.Builder() .name("websocket-endpoint-id") .displayName("WebSocket Endpoint Id") - .description("A NiFi Expression to retrieve the endpoint id of a WebSocket ControllerService. " + - "Setting this value to a ConnectWebSocket processor's WebSocket Endpoint Id value allows this processor to send messages via the " + - "ConnectWebSocket's connection. See also WebSocket ControllerService Id.") + .description("A NiFi Expression to retrieve one from incoming FlowFile, or a specific 'endpoint id' of a WebSocket ControllerService. " + + "An 'endpoint id' is managed differently by different WebSocket ControllerServices to group one or more WebSocket sessions. " + + "'WebSocket Client Id' for ConnectWebSocket, and 'Server URL Path' for ListenWebSocket respectively. ") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) @@ -157,21 +135,51 @@ public class PutWebSocket extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); + public static final PropertyDescriptor PROP_FORK_FAILED_BROADCAST_SESSIONS = new PropertyDescriptor.Builder() + .name("fork-failed-broadcast-sessions") + .displayName("Fork Failed Broadcast Sessions") + .description("Whether to create forked copies of incoming FlowFile with individual WebSocket Session ID." + + " Useful when specific error handling or retry flow is needed per failed session.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + public static final PropertyDescriptor PROP_ENABLE_DETAILED_FAILURE_RELATIONSHIPS = new PropertyDescriptor.Builder() + .name("enable-detailed-failure-relationships") + .displayName("Enable Detailed Failure Relationships") + .description("If enabled, detailed failure relationships: 'no connected sessions' and 'communication failure' are activated in addition to general 'failure' relationship.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") - .description("FlowFiles that are sent successfully to the destination are transferred to this relationship.") + .description("FlowFiles sent successfully to the destination are transferred to this relationship.") .build(); public static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") - .description("FlowFiles that failed to send to the destination are transferred to this relationship.") + .description("FlowFiles failed to be sent to the destination are transferred to this relationship.") .build(); - public static final Relationship REL_SESSION_UNKNOWN = new Relationship.Builder() - .name("session unknown") - .description("FlowFiles that failed to send to the destination because the session id does not exist are transferred to this relationship.") + public static final Relationship REL_NO_CONNECTED_SESSIONS = new Relationship.Builder() + .name("no connected sessions") + .description("FlowFiles intended to be broadcast to any connected sessions," + + " but no connected session is available at that time, are transferred to this relationship.") .build(); + public static final Relationship REL_COMMUNICATION_FAILURE = new Relationship.Builder() + .name("communication failure") + .description("FlowFiles failed to be sent to the destination due to communication failure are transferred to this relationship." + + " Connecting this relationship back to PutWebSocket may recover from a temporal communication error.") + .build(); + + public static final String NO_CONNECTED_WEB_SOCKET_SESSIONS = "No connected WebSocket Sessions"; private static final List descriptors; private static final Set relationships; + private static final Set detailedRelationships; static{ final List innerDescriptorsList = new ArrayList<>(); @@ -179,29 +187,36 @@ public class PutWebSocket extends AbstractProcessor { innerDescriptorsList.add(PROP_WS_CONTROLLER_SERVICE_ID); innerDescriptorsList.add(PROP_WS_CONTROLLER_SERVICE_ENDPOINT); innerDescriptorsList.add(PROP_WS_MESSAGE_TYPE); + innerDescriptorsList.add(PROP_FORK_FAILED_BROADCAST_SESSIONS); + innerDescriptorsList.add(PROP_ENABLE_DETAILED_FAILURE_RELATIONSHIPS); descriptors = Collections.unmodifiableList(innerDescriptorsList); - final Set innerRelationshipsSet = new HashSet<>(); - innerRelationshipsSet.add(REL_SUCCESS); - innerRelationshipsSet.add(REL_FAILURE); - innerRelationshipsSet.add(REL_SESSION_UNKNOWN); - relationships = Collections.unmodifiableSet(innerRelationshipsSet); + final Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(rels); + + final Set detailedRels = new HashSet<>(rels); + detailedRels.add(REL_COMMUNICATION_FAILURE); + detailedRels.add(REL_NO_CONNECTED_SESSIONS); + detailedRelationships = Collections.unmodifiableSet(detailedRels); } - @SuppressWarnings("unused") - private class sessionIdInfo { + /** + * Contains information per session ID. + */ + private class SessionIdInfo { String sessionId; - long startSending; - long stopSending; - final AtomicReference transitUri = new AtomicReference<>(); - final Map attrs = new HashMap<>(); - boolean success = false; + String localAddress; + String remoteAddress; Exception e = null; } + private volatile boolean enableDetailedFailureRelationships = false; + @Override public Set getRelationships() { - return relationships; + return enableDetailedFailureRelationships ? detailedRelationships : relationships; } @Override @@ -209,6 +224,13 @@ public final List getSupportedPropertyDescriptors() { return descriptors; } + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + if (PROP_ENABLE_DETAILED_FAILURE_RELATIONSHIPS.equals(descriptor)) { + enableDetailedFailureRelationships = Boolean.valueOf(newValue); + } + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession processSession) throws ProcessException { final FlowFile flowfile = processSession.get(); @@ -216,9 +238,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession process return; } - final List sessionIdInfoList = new ArrayList(); - - final String sessionIdProperty = context.getProperty(PROP_WS_SESSION_ID) + final String specifiedSessionId = context.getProperty(PROP_WS_SESSION_ID) .evaluateAttributeExpressions(flowfile).getValue(); final String webSocketServiceId = context.getProperty(PROP_WS_CONTROLLER_SERVICE_ID) .evaluateAttributeExpressions(flowfile).getValue(); @@ -227,25 +247,20 @@ public void onTrigger(final ProcessContext context, final ProcessSession process final String messageTypeStr = context.getProperty(PROP_WS_MESSAGE_TYPE) .evaluateAttributeExpressions(flowfile).getValue(); final WebSocketMessage.Type messageType = WebSocketMessage.Type.valueOf(messageTypeStr); - final boolean flowfileIsBroadcast = (null != flowfile.getAttribute(ATTR_WS_BROADCAST)); - - if (StringUtils.isEmpty(sessionIdProperty)) { - getLogger().debug("Specific SessionID not specified. Message will be broadcast to all connected clients."); - } if (StringUtils.isEmpty(webSocketServiceId) || StringUtils.isEmpty(webSocketServiceEndpoint)) { - transferToFailure(processSession, flowfile, "Required WebSocket attribute was not found."); + transferToFailure(processSession, flowfile, "Required WebSocket attribute was not found.", null); return; } final ControllerService controllerService = context.getControllerServiceLookup().getControllerService(webSocketServiceId); if (controllerService == null) { - transferToFailure(processSession, flowfile, "WebSocket ControllerService was not found."); + transferToFailure(processSession, flowfile, "WebSocket ControllerService was not found.", null); return; } else if (!(controllerService instanceof WebSocketService)) { transferToFailure(processSession, flowfile, "The ControllerService found was not a WebSocket ControllerService but a " - + controllerService.getClass().getName()); + + controllerService.getClass().getName(), null); return; } @@ -253,76 +268,56 @@ public void onTrigger(final ProcessContext context, final ProcessSession process final byte[] messageContent = new byte[(int) flowfile.getSize()]; final long startSending = System.currentTimeMillis(); + final AtomicReference transitUri = new AtomicReference<>(); + final Map attrs = new HashMap<>(); + attrs.put(ATTR_WS_CS_ID, webSocketService.getIdentifier()); + + boolean isEmptySessionId = StringUtils.isEmpty(specifiedSessionId); + if (!isEmptySessionId) { + attrs.put(ATTR_WS_SESSION_ID, specifiedSessionId); + } + + attrs.put(ATTR_WS_ENDPOINT_ID, webSocketServiceEndpoint); + attrs.put(ATTR_WS_MESSAGE_TYPE, messageTypeStr); + processSession.read(flowfile, in -> { StreamUtils.fillBuffer(in, messageContent, true); }); - final boolean broadcastMessage = StringUtils.isEmpty(sessionIdProperty); - - if (broadcastMessage && flowfileIsBroadcast) { - // message is a broadcast (no websocket.session.id) *and* id'd as a FORKed message (websocket.broadcast) - which isn't good. - // Sending it on would cause a duplicate, so just drop it. - final FlowFile updatedFlowFile = processSession.penalize(processSession.putAttribute(flowfile, ATTR_WS_FAILURE_DETAIL, - "No session id; message was FORKed from a broadcast message")); - processSession.remove(updatedFlowFile); // terminate relationship - return; - } - - final List sessionIds = new ArrayList(); - final HashSet currentSessionIds = new HashSet(); - WebSocketConfigurationException currentSessionIdsEx = null; - try { - currentSessionIds.addAll(webSocketService.getSessionIds(webSocketServiceEndpoint)); - } catch (WebSocketConfigurationException e) { - // WebSocketConfigurationException: If the corresponding WebSocketGatewayProcessor has been stopped. - currentSessionIdsEx = e; - } - if (broadcastMessage) { - if (currentSessionIdsEx != null) { - getLogger().error("Failed to obtain list of SessionIds via WebSocket due to " + currentSessionIdsEx, currentSessionIdsEx); - transferToFailure(processSession, flowfile, currentSessionIdsEx.toString()); - return; - } - else if (currentSessionIds.isEmpty()) { - getLogger().error("Failed to obtain list of SessionIds via WebSocket due to no Sessions connected"); - transferToFailure(processSession, flowfile, "No connected WebSocket Sessions"); - return; - } - sessionIds.addAll(currentSessionIds); - } - else { - if (currentSessionIds.contains(sessionIdProperty)) { - sessionIds.add(sessionIdProperty); - } - else { - final FlowFile updatedFlowFile = processSession.penalize(processSession.putAttribute(flowfile, ATTR_WS_FAILURE_DETAIL, "No such session id")); - if( flowfileIsBroadcast ) { - // flowfile was created earlier by the below FORK; don't need it anymore. Sending it to SESSION UNKNOWN would cause a duplicate. - processSession.remove(updatedFlowFile); // terminate relationship - } - else { - processSession.transfer(updatedFlowFile, REL_SESSION_UNKNOWN); + final List sessionIds = new ArrayList<>(); + if (isEmptySessionId) { + try { + // If session id is not specified, broadcast message to all connected peers. + final Set currentSessionIds = webSocketService.getSessionIds(webSocketServiceEndpoint); + if (currentSessionIds.isEmpty()) { + getLogger().error("Failed to obtain list of SessionIds via WebSocket, no Sessions connected"); + transferToFailure(processSession, flowfile, NO_CONNECTED_WEB_SOCKET_SESSIONS, null); + return; } + sessionIds.addAll(currentSessionIds); + + } catch (WebSocketConfigurationException e) { + // WebSocketConfigurationException: If the corresponding WebSocketGatewayProcessor has been stopped. + getLogger().error("Failed to obtain list of SessionIds via WebSocket due to " + e, e); + transferToFailure(processSession, flowfile, e.toString(), e); return; } + } else { + sessionIds.add(specifiedSessionId); } - final HashSet ws_local_addresses = new HashSet(); - final HashSet ws_remote_addresses = new HashSet(); - Integer sessionCount = 0, successCount = 0, failureCount = 0; + // Only if there are more than 1 session Ids to send. + // Even if session id is not provided, if there is only one session id, then it's not broadcasting. + final boolean shouldBroadcast = sessionIds.size() > 1; + + final List succeededSessionIds = new ArrayList<>(); + final List failedSessionIds = new ArrayList<>(); + SessionIdInfo lastFailedSessionId = null; + for (String sessionId : sessionIds) { - sessionCount ++; - boolean sendSuccess = false; - final sessionIdInfo sessionIdInfoEntry = new sessionIdInfo(); - sessionIdInfoEntry.sessionId = sessionId; - sessionIdInfoEntry.attrs.put(ATTR_WS_CS_ID, webSocketService.getIdentifier()); - sessionIdInfoEntry.attrs.put(ATTR_WS_ENDPOINT_ID, webSocketServiceEndpoint); - sessionIdInfoEntry.attrs.put(ATTR_WS_MESSAGE_TYPE, messageTypeStr); - if (!broadcastMessage) { - sessionIdInfoEntry.attrs.put(ATTR_WS_SESSION_ID, sessionId); - } - sessionIdInfoEntry.startSending = System.currentTimeMillis(); + final SessionIdInfo sessionIdInfo = new SessionIdInfo(); + sessionIdInfo.sessionId = sessionId; try { webSocketService.sendMessage(webSocketServiceEndpoint, sessionId, sender -> { switch (messageType) { @@ -333,81 +328,84 @@ else if (currentSessionIds.isEmpty()) { sender.sendBinary(ByteBuffer.wrap(messageContent)); break; } - sessionIdInfoEntry.attrs.put(ATTR_WS_LOCAL_ADDRESS, sender.getLocalAddress().toString()); - sessionIdInfoEntry.attrs.put(ATTR_WS_REMOTE_ADDRESS, sender.getRemoteAddress().toString()); - sessionIdInfoEntry.transitUri.set(sender.getTransitUri()); + + final String localAddress = sender.getLocalAddress().toString(); + final String remoteAddress = sender.getRemoteAddress().toString(); + sessionIdInfo.localAddress = localAddress; + sessionIdInfo.remoteAddress = remoteAddress; + + // When there are multiple peers to send, we keep the only last peer's information + attrs.put(ATTR_WS_LOCAL_ADDRESS, localAddress); + attrs.put(ATTR_WS_REMOTE_ADDRESS, remoteAddress); + transitUri.set(sender.getTransitUri()); }); - sendSuccess = true; - } catch (WebSocketConfigurationException|IllegalStateException|IOException e) { + succeededSessionIds.add(sessionIdInfo); + + } catch (WebSocketConfigurationException | SessionNotFoundException | IOException e) { // WebSocketConfigurationException: If the corresponding WebSocketGatewayProcessor has been stopped. // IllegalStateException: Session is already closed or not found. // IOException: other IO error. - sessionIdInfoEntry.e = e; getLogger().error("Failed to send message (sessionId '" + sessionId + "') via WebSocket due to " + e, e); + sessionIdInfo.e = e; + failedSessionIds.add(sessionIdInfo); + lastFailedSessionId = sessionIdInfo; } - if (sendSuccess) { - successCount ++; - sessionIdInfoEntry.success = true; - } - else { - failureCount ++; - } - String address = sessionIdInfoEntry.attrs.get(ATTR_WS_LOCAL_ADDRESS); - if (address != null) - ws_local_addresses.add(address + (sendSuccess ? "" : "*")); - address = sessionIdInfoEntry.attrs.get(ATTR_WS_REMOTE_ADDRESS); - if (address != null) - ws_remote_addresses.add(address + (sendSuccess ? "" : "*")); - - sessionIdInfoEntry.stopSending = System.currentTimeMillis(); - sessionIdInfoList.add(sessionIdInfoEntry); } - final long transmissionMillis = System.currentTimeMillis() - startSending; - if (sessionCount > 1 && failureCount > 0) { - // FORK every failed broadcast (sessionCount >1) message; adding the session id attribute to make it a non-broadcast message - for (sessionIdInfo sessionIdInfoEntry : sessionIdInfoList) { - if (sessionIdInfoEntry.success) - continue; + if (isEmptySessionId) { + // If a session id is not specified, user would like to know how many messages are sent or failed. + attrs.put(ATTR_WS_BROADCAST_SUCCEEDED, String.valueOf(succeededSessionIds.size())); + attrs.put(ATTR_WS_BROADCAST_FAILED, String.valueOf(failedSessionIds.size())); + } + + // FORK every failed broadcast message; adding the session id attribute so that it can be used as a non-broadcast message in subsequent flow + final boolean forkFailedBroadcastSessions = shouldBroadcast && context.getProperty(PROP_FORK_FAILED_BROADCAST_SESSIONS).asBoolean(); + if (forkFailedBroadcastSessions) { + Map parSessionIdAttrs = new HashMap<>(); + for (SessionIdInfo sessionIdInfoEntry : failedSessionIds) { FlowFile forkFlowFile = processSession.create(flowfile); - forkFlowFile = processSession.putAllAttributes(forkFlowFile, sessionIdInfoEntry.attrs); - forkFlowFile = processSession.putAttribute(forkFlowFile, ATTR_WS_SESSION_ID, sessionIdInfoEntry.sessionId); - forkFlowFile = processSession.putAttribute(forkFlowFile, ATTR_WS_BROADCAST, ""); // so we know it was a FORK; used later on - transferToFailure(processSession, forkFlowFile, sessionIdInfoEntry.e.toString()); + forkFlowFile = processSession.putAllAttributes(forkFlowFile, attrs); + parSessionIdAttrs.put(ATTR_WS_SESSION_ID, sessionIdInfoEntry.sessionId); + parSessionIdAttrs.put(ATTR_WS_LOCAL_ADDRESS, sessionIdInfoEntry.localAddress); + parSessionIdAttrs.put(ATTR_WS_REMOTE_ADDRESS, sessionIdInfoEntry.remoteAddress); + forkFlowFile = processSession.putAllAttributes(forkFlowFile, parSessionIdAttrs); + transferToFailure(processSession, forkFlowFile, sessionIdInfoEntry.e.toString(), sessionIdInfoEntry.e); } } - // decide fate of the flowfile - final sessionIdInfo sessionIdInfoEntry = sessionIdInfoList.get(0); // use info from the first (and maybe only) entry - if (successCount > 0) { // SUCCESS sending to at least one destination - if (sessionIds.size() > 1) { // when applicable, retain list of local and remote address that were sent to successfully - String addresses; - if (!ws_local_addresses.isEmpty()) { - addresses = ws_local_addresses.toString(); - sessionIdInfoEntry.attrs.put(ATTR_WS_LOCAL_ADDRESS, addresses.substring(1, addresses.length()-1)); - } - if (!ws_remote_addresses.isEmpty()) { - addresses = ws_remote_addresses.toString(); - sessionIdInfoEntry.attrs.put(ATTR_WS_REMOTE_ADDRESS, addresses.substring(1, addresses.length()-1)); - } - } - final FlowFile updatedFlowFile = processSession.putAllAttributes(flowfile, sessionIdInfoEntry.attrs); - processSession.getProvenanceReporter().send(updatedFlowFile, sessionIdInfoEntry.transitUri.get(), transmissionMillis); - if( flowfileIsBroadcast ) { - // flowfile was created earlier by the above FORK; don't need it anymore. Sending it to SUCCESS would cause a duplicate. - processSession.remove(updatedFlowFile); // terminate relationship - } - else { - processSession.transfer(updatedFlowFile, REL_SUCCESS); + + if (succeededSessionIds.size() > 0) { + // SUCCESS sending to at least one destination + final FlowFile updatedFlowFile = processSession.putAllAttributes(flowfile, attrs); + final long transmissionMillis = System.currentTimeMillis() - startSending; + processSession.getProvenanceReporter().send(updatedFlowFile, transitUri.get(), transmissionMillis); + processSession.transfer(updatedFlowFile, REL_SUCCESS); + + } else if (lastFailedSessionId != null) { + // Every session has failed to send the message. + if (forkFailedBroadcastSessions) { + // When all broadcast messages are failed, just remove the original FlowFile, as failed messages are forked and sent to failure. + processSession.remove(flowfile); + } else { + // If not creating forked FlowFiles, send the original FlowFile to failure. + final FlowFile failedOriginalFlowFile = processSession.putAllAttributes(flowfile, attrs); + transferToFailure(processSession, failedOriginalFlowFile, lastFailedSessionId.e.toString(), lastFailedSessionId.e); } } - else { // FAILURE sending to at least one destination - transferToFailure(processSession, flowfile, sessionIdInfoEntry.e.toString()); - } } - private FlowFile transferToFailure(final ProcessSession processSession, FlowFile flowfile, final String value) { - flowfile = processSession.putAttribute(flowfile, ATTR_WS_FAILURE_DETAIL, value); - processSession.transfer(flowfile, REL_FAILURE); + private FlowFile transferToFailure(final ProcessSession processSession, FlowFile flowfile, final String failureDetail, final Exception e) { + flowfile = processSession.putAttribute(flowfile, ATTR_WS_FAILURE_DETAIL, failureDetail); + if (enableDetailedFailureRelationships) { + if (NO_CONNECTED_WEB_SOCKET_SESSIONS.equals(failureDetail)) { + processSession.transfer(flowfile, REL_NO_CONNECTED_SESSIONS); + } else if (e instanceof IOException) { + processSession.transfer(flowfile, REL_COMMUNICATION_FAILURE); + } else { + processSession.transfer(flowfile, REL_FAILURE); + } + } else { + processSession.transfer(flowfile, REL_FAILURE); + } return flowfile; } diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/WebSocketProcessorAttributes.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/WebSocketProcessorAttributes.java index 2d82cde3d94c..593d65ec632e 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/WebSocketProcessorAttributes.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/WebSocketProcessorAttributes.java @@ -26,7 +26,8 @@ public final class WebSocketProcessorAttributes{ public static final String ATTR_WS_MESSAGE_TYPE = "websocket.message.type"; public static final String ATTR_WS_LOCAL_ADDRESS = "websocket.local.address"; public static final String ATTR_WS_REMOTE_ADDRESS = "websocket.remote.address"; - public static final String ATTR_WS_BROADCAST = "websocket.broadcast"; + public static final String ATTR_WS_BROADCAST_SUCCEEDED = "websocket.broadcast.succeeded"; + public static final String ATTR_WS_BROADCAST_FAILED = "websocket.broadcast.failed"; private WebSocketProcessorAttributes() { } diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java index f76ea971d7a7..2e857601c6b4 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.processors.websocket; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_BROADCAST_FAILED; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_BROADCAST_SUCCEEDED; import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID; import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID; import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_FAILURE_DETAIL; @@ -23,11 +25,9 @@ import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE; import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS; import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_SESSION_ID; -import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_BROADCAST; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; @@ -40,8 +40,10 @@ import java.net.InetSocketAddress; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.provenance.ProvenanceEventRecord; @@ -51,6 +53,7 @@ import org.apache.nifi.util.TestRunners; import org.apache.nifi.websocket.AbstractWebSocketSession; import org.apache.nifi.websocket.SendMessage; +import org.apache.nifi.websocket.SessionNotFoundException; import org.apache.nifi.websocket.WebSocketMessage; import org.apache.nifi.websocket.WebSocketService; import org.apache.nifi.websocket.WebSocketSession; @@ -68,15 +71,19 @@ private void assertFlowFile(WebSocketSession webSocketSession, String serviceId, assertEquals(messageType != null ? messageType.name() : null, ff.getAttribute(ATTR_WS_MESSAGE_TYPE)); } - private WebSocketSession getWebSocketSession() { + private WebSocketSession getWebSocketSession(String sessionId) { final WebSocketSession webSocketSession = spy(AbstractWebSocketSession.class); - when(webSocketSession.getSessionId()).thenReturn("ws-session-id"); + when(webSocketSession.getSessionId()).thenReturn(sessionId); when(webSocketSession.getLocalAddress()).thenReturn(new InetSocketAddress("localhost", 12345)); when(webSocketSession.getRemoteAddress()).thenReturn(new InetSocketAddress("example.com", 80)); when(webSocketSession.getTransitUri()).thenReturn("ws://example.com/web-socket"); return webSocketSession; } + private WebSocketSession getWebSocketSession() { + return getWebSocketSession("ws-session-id"); + } + @Test public void testSessionIsNotSpecified() throws Exception { final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); @@ -87,7 +94,7 @@ public void testSessionIsNotSpecified() throws Exception { final String serviceId = "ws-service"; final String endpointId = "client-1"; final String textMessageFromServer = "message from server."; - final HashSet sessionIds = new HashSet(); + final HashSet sessionIds = new HashSet<>(); sessionIds.add(webSocketSession.getSessionId()); when(service.getIdentifier()).thenReturn(serviceId); when(service.getSessionIds(endpointId)).thenReturn(sessionIds); @@ -110,9 +117,6 @@ public void testSessionIsNotSpecified() throws Exception { //assertEquals(1, failedFlowFiles.size()); //No longer valid test after NIFI-3318 assertEquals(0, failedFlowFiles.size()); - final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); - assertEquals(0, unknownFlowFiles.size()); - final List provenanceEvents = runner.getProvenanceEvents(); assertEquals(0, provenanceEvents.size()); @@ -149,9 +153,6 @@ public void testServiceIsNotFound() throws Exception { final MockFlowFile failedFlowFile = failedFlowFiles.iterator().next(); assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL)); - final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); - assertEquals(0, unknownFlowFiles.size()); - final List provenanceEvents = runner.getProvenanceEvents(); assertEquals(0, provenanceEvents.size()); @@ -188,34 +189,23 @@ public void testServiceIsNotWebSocketService() throws Exception { final MockFlowFile failedFlowFile = failedFlowFiles.iterator().next(); assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL)); - final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); - assertEquals(0, unknownFlowFiles.size()); - final List provenanceEvents = runner.getProvenanceEvents(); assertEquals(0, provenanceEvents.size()); } @Test - public void testSendFailure() throws Exception { + public void testNoConnectedSessions() throws Exception { final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); + runner.setProperty(PutWebSocket.PROP_ENABLE_DETAILED_FAILURE_RELATIONSHIPS, "true"); final WebSocketService service = spy(WebSocketService.class); - final WebSocketSession webSocketSession = getWebSocketSession(); - final String serviceId = "ws-service"; final String endpointId = "client-1"; final String textMessageFromServer = "message from server."; - final HashSet sessionIds = new HashSet(); - sessionIds.add(webSocketSession.getSessionId()); + final HashSet sessionIds = new HashSet<>(); when(service.getIdentifier()).thenReturn(serviceId); when(service.getSessionIds(endpointId)).thenReturn(sessionIds); - doAnswer(invocation -> { - final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); - sendMessage.send(webSocketSession); - return null; - }).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class)); - doThrow(new IOException("Sending message failed.")).when(webSocketSession).sendString(anyString()); runner.addControllerService(serviceId, service); runner.enableControllerService(service); @@ -223,7 +213,6 @@ public void testSendFailure() throws Exception { final Map attributes = new HashMap<>(); attributes.put(ATTR_WS_CS_ID, serviceId); attributes.put(ATTR_WS_ENDPOINT_ID, endpointId); - attributes.put(ATTR_WS_SESSION_ID, webSocketSession.getSessionId()); runner.enqueue(textMessageFromServer, attributes); runner.run(); @@ -231,21 +220,17 @@ public void testSendFailure() throws Exception { final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); assertEquals(0, succeededFlowFiles.size()); - final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); + final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_NO_CONNECTED_SESSIONS); assertEquals(1, failedFlowFiles.size()); final MockFlowFile failedFlowFile = failedFlowFiles.iterator().next(); - assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL)); - - final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); - assertEquals(0, unknownFlowFiles.size()); + assertEquals(PutWebSocket.NO_CONNECTED_WEB_SOCKET_SESSIONS, failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL)); final List provenanceEvents = runner.getProvenanceEvents(); assertEquals(0, provenanceEvents.size()); - } @Test - public void testSuccess() throws Exception { + public void testSendFailure() throws Exception { final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); final WebSocketService service = spy(WebSocketService.class); @@ -254,7 +239,7 @@ public void testSuccess() throws Exception { final String serviceId = "ws-service"; final String endpointId = "client-1"; final String textMessageFromServer = "message from server."; - final HashSet sessionIds = new HashSet(); + final HashSet sessionIds = new HashSet<>(); sessionIds.add(webSocketSession.getSessionId()); when(service.getIdentifier()).thenReturn(serviceId); when(service.getSessionIds(endpointId)).thenReturn(sessionIds); @@ -263,44 +248,36 @@ public void testSuccess() throws Exception { sendMessage.send(webSocketSession); return null; }).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class)); + doThrow(new IOException("Sending message failed.")).when(webSocketSession).sendString(anyString()); runner.addControllerService(serviceId, service); runner.enableControllerService(service); - runner.setProperty(PutWebSocket.PROP_WS_MESSAGE_TYPE, "${" + ATTR_WS_MESSAGE_TYPE + "}"); - - // Enqueue 1st file as Text. final Map attributes = new HashMap<>(); attributes.put(ATTR_WS_CS_ID, serviceId); attributes.put(ATTR_WS_ENDPOINT_ID, endpointId); attributes.put(ATTR_WS_SESSION_ID, webSocketSession.getSessionId()); - attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.TEXT.name()); runner.enqueue(textMessageFromServer, attributes); - // Enqueue 2nd file as Binary. - attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.BINARY.name()); - runner.enqueue(textMessageFromServer.getBytes(), attributes); - - runner.run(2); + runner.run(); final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); - assertEquals(2, succeededFlowFiles.size()); - assertFlowFile(webSocketSession, serviceId, endpointId, succeededFlowFiles.get(0), WebSocketMessage.Type.TEXT); - assertFlowFile(webSocketSession, serviceId, endpointId, succeededFlowFiles.get(1), WebSocketMessage.Type.BINARY); + assertEquals(0, succeededFlowFiles.size()); final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); - assertEquals(0, failedFlowFiles.size()); - - final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); - assertEquals(0, unknownFlowFiles.size()); + assertEquals(1, failedFlowFiles.size()); + final MockFlowFile failedFlowFile = failedFlowFiles.iterator().next(); + assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL)); final List provenanceEvents = runner.getProvenanceEvents(); - assertEquals(2, provenanceEvents.size()); + assertEquals(0, provenanceEvents.size()); + } @Test - public void testSessionUnknown() throws Exception { + public void testSendFailureDetailedRelationship() throws Exception { final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); + runner.setProperty(PutWebSocket.PROP_ENABLE_DETAILED_FAILURE_RELATIONSHIPS, "true"); final WebSocketService service = spy(WebSocketService.class); final WebSocketSession webSocketSession = getWebSocketSession(); @@ -308,8 +285,16 @@ public void testSessionUnknown() throws Exception { final String serviceId = "ws-service"; final String endpointId = "client-1"; final String textMessageFromServer = "message from server."; + final HashSet sessionIds = new HashSet<>(); + sessionIds.add(webSocketSession.getSessionId()); when(service.getIdentifier()).thenReturn(serviceId); - when(service.getSessionIds(endpointId)).thenReturn(new HashSet()); + when(service.getSessionIds(endpointId)).thenReturn(sessionIds); + doAnswer(invocation -> { + final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); + sendMessage.send(webSocketSession); + return null; + }).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class)); + doThrow(new IOException("Sending message failed.")).when(webSocketSession).sendString(anyString()); runner.addControllerService(serviceId, service); runner.enableControllerService(service); @@ -317,7 +302,6 @@ public void testSessionUnknown() throws Exception { final Map attributes = new HashMap<>(); attributes.put(ATTR_WS_CS_ID, serviceId); attributes.put(ATTR_WS_ENDPOINT_ID, endpointId); - attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.TEXT.name()); attributes.put(ATTR_WS_SESSION_ID, webSocketSession.getSessionId()); runner.enqueue(textMessageFromServer, attributes); @@ -326,19 +310,18 @@ public void testSessionUnknown() throws Exception { final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); assertEquals(0, succeededFlowFiles.size()); - final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); - assertEquals(0, failedFlowFiles.size()); - - final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); - assertEquals(1, unknownFlowFiles.size()); + final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_COMMUNICATION_FAILURE); + assertEquals(1, failedFlowFiles.size()); + final MockFlowFile failedFlowFile = failedFlowFiles.iterator().next(); + assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL)); final List provenanceEvents = runner.getProvenanceEvents(); assertEquals(0, provenanceEvents.size()); + } @Test - public void testBroadcast_FailureAll() throws Exception { - // test failed sending to all broadcast client + public void testSuccess() throws Exception { final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); final WebSocketService service = spy(WebSocketService.class); @@ -347,10 +330,8 @@ public void testBroadcast_FailureAll() throws Exception { final String serviceId = "ws-service"; final String endpointId = "client-1"; final String textMessageFromServer = "message from server."; - final HashSet sessionIds = new HashSet(); - sessionIds.add("ws-session-id-1"); - sessionIds.add("ws-session-id-2"); - sessionIds.add("ws-session-id-3"); + final HashSet sessionIds = new HashSet<>(); + sessionIds.add(webSocketSession.getSessionId()); when(service.getIdentifier()).thenReturn(serviceId); when(service.getSessionIds(endpointId)).thenReturn(sessionIds); doAnswer(invocation -> { @@ -358,53 +339,40 @@ public void testBroadcast_FailureAll() throws Exception { sendMessage.send(webSocketSession); return null; }).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class)); - doThrow(new IOException("Sending message failed.")).when(webSocketSession).sendString(anyString()); runner.addControllerService(serviceId, service); runner.enableControllerService(service); + runner.setProperty(PutWebSocket.PROP_WS_MESSAGE_TYPE, "${" + ATTR_WS_MESSAGE_TYPE + "}"); + + // Enqueue 1st file as Text. final Map attributes = new HashMap<>(); attributes.put(ATTR_WS_CS_ID, serviceId); attributes.put(ATTR_WS_ENDPOINT_ID, endpointId); + attributes.put(ATTR_WS_SESSION_ID, webSocketSession.getSessionId()); attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.TEXT.name()); runner.enqueue(textMessageFromServer, attributes); - runner.run(); + // Enqueue 2nd file as Binary. + attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.BINARY.name()); + runner.enqueue(textMessageFromServer.getBytes(), attributes); + + runner.run(2); final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); - assertEquals(0, succeededFlowFiles.size()); + assertEquals(2, succeededFlowFiles.size()); + assertFlowFile(webSocketSession, serviceId, endpointId, succeededFlowFiles.get(0), WebSocketMessage.Type.TEXT); + assertFlowFile(webSocketSession, serviceId, endpointId, succeededFlowFiles.get(1), WebSocketMessage.Type.BINARY); final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); - assertEquals(4, failedFlowFiles.size()); - - // sequence the flowfiles not important - int broadcastCnt = 0, sessionIdCnt = 0; - for (int i = 0; i < failedFlowFiles.size(); i++ ) { - MockFlowFile failedFlowFile = failedFlowFiles.get(i); - assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL)); - if (failedFlowFile.getAttribute(ATTR_WS_SESSION_ID) == null || failedFlowFile.getAttribute(ATTR_WS_SESSION_ID).isEmpty()) { - sessionIdCnt ++; - assertNull(failedFlowFile.getAttribute(ATTR_WS_BROADCAST)); - } - else { - broadcastCnt ++; - assertNotNull(failedFlowFile.getAttribute(ATTR_WS_BROADCAST)); - } - } - assertEquals(1, sessionIdCnt); - assertEquals(3, broadcastCnt); - - final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); - assertEquals(0, unknownFlowFiles.size()); + assertEquals(0, failedFlowFiles.size()); final List provenanceEvents = runner.getProvenanceEvents(); - assertEquals(1, provenanceEvents.size()); // logging the FORK - assertEquals(provenanceEvents.get(0).getEventType(), ProvenanceEventType.FORK); // verify EventType + assertEquals(2, provenanceEvents.size()); } @Test - public void testBroadcast_SuccessAll() throws Exception { - // test success sending to all broadcast clients + public void testSessionUnknown() throws Exception { final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); final WebSocketService service = spy(WebSocketService.class); @@ -413,83 +381,76 @@ public void testBroadcast_SuccessAll() throws Exception { final String serviceId = "ws-service"; final String endpointId = "client-1"; final String textMessageFromServer = "message from server."; - final HashSet sessionIds = new HashSet(); - sessionIds.add("ws-session-id-1"); - sessionIds.add("ws-session-id-2"); - sessionIds.add("ws-session-id-3"); when(service.getIdentifier()).thenReturn(serviceId); - when(service.getSessionIds(endpointId)).thenReturn(sessionIds); - doAnswer(invocation -> { - final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); - sendMessage.send(webSocketSession); - return null; - }).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class)); - + when(service.getSessionIds(endpointId)).thenReturn(new HashSet<>()); + doThrow(new SessionNotFoundException("Not found")).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class)); runner.addControllerService(serviceId, service); runner.enableControllerService(service); - runner.setProperty(PutWebSocket.PROP_WS_MESSAGE_TYPE, "${" + ATTR_WS_MESSAGE_TYPE + "}"); - final Map attributes = new HashMap<>(); attributes.put(ATTR_WS_CS_ID, serviceId); attributes.put(ATTR_WS_ENDPOINT_ID, endpointId); attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.TEXT.name()); + attributes.put(ATTR_WS_SESSION_ID, webSocketSession.getSessionId()); runner.enqueue(textMessageFromServer, attributes); runner.run(); final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); - assertEquals(1, succeededFlowFiles.size()); - assertNull(succeededFlowFiles.get(0).getAttribute(ATTR_WS_BROADCAST)); + assertEquals(0, succeededFlowFiles.size()); final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); - assertEquals(0, failedFlowFiles.size()); - - final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); - assertEquals(0, unknownFlowFiles.size()); + assertEquals(1, failedFlowFiles.size()); final List provenanceEvents = runner.getProvenanceEvents(); - assertEquals(1, provenanceEvents.size()); - assertEquals(provenanceEvents.get(0).getEventType(), ProvenanceEventType.SEND); // verify EventType + assertEquals(0, provenanceEvents.size()); } @Test - public void testBroadcast_1Success_2Failure() throws Exception { - // test success and failure sending to all broadcast clients + public void testBroadcast_FailureAll() throws Exception { + // test failed sending to all broadcast client final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); + runner.setProperty(PutWebSocket.PROP_ENABLE_DETAILED_FAILURE_RELATIONSHIPS, "true"); final WebSocketService service = spy(WebSocketService.class); - final WebSocketSession webSocketSession = getWebSocketSession(); + final WebSocketSession webSocketSession1 = getWebSocketSession("ws-session-id-1"); + final WebSocketSession webSocketSession3 = getWebSocketSession("ws-session-id-3"); final String serviceId = "ws-service"; final String endpointId = "client-1"; final String textMessageFromServer = "message from server."; - final HashSet sessionIds = new HashSet(); + final HashSet sessionIds = new HashSet<>(); sessionIds.add("ws-session-id-1"); sessionIds.add("ws-session-id-2"); sessionIds.add("ws-session-id-3"); when(service.getIdentifier()).thenReturn(serviceId); when(service.getSessionIds(endpointId)).thenReturn(sessionIds); + // For session 1 doAnswer(invocation -> { final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); - sendMessage.send(webSocketSession); + sendMessage.send(webSocketSession1); return null; - }).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class)); - doAnswer(invocation -> { - final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); - sendMessage.send(webSocketSession); - throw new IOException("Sending message failed."); - }).when(service).sendMessage(anyString(), eq("ws-session-id-2"), any(SendMessage.class)); + }).when(service).sendMessage(anyString(), eq("ws-session-id-1"), any(SendMessage.class)); + doThrow(new IOException("Sending message failed.")) + .when(webSocketSession1).sendString(anyString()); + + // For session 2 + doThrow(new SessionNotFoundException("Simulate the second session is removed")) + .when(service).sendMessage(anyString(), eq("ws-session-id-2"), any(SendMessage.class)); + + // For session 3 doAnswer(invocation -> { final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); - sendMessage.send(webSocketSession); - throw new IOException("Sending message failed."); + sendMessage.send(webSocketSession3); + return null; }).when(service).sendMessage(anyString(), eq("ws-session-id-3"), any(SendMessage.class)); + doThrow(new IOException("Sending message failed.")) + .when(webSocketSession3).sendString(anyString()); - runner.addControllerService(serviceId, service); + runner.addControllerService(serviceId, service); runner.enableControllerService(service); final Map attributes = new HashMap<>(); @@ -501,98 +462,107 @@ public void testBroadcast_1Success_2Failure() throws Exception { runner.run(); final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); - assertEquals(1, succeededFlowFiles.size()); - assertNull(succeededFlowFiles.get(0).getAttribute(ATTR_WS_BROADCAST)); - - final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); - assertEquals(2, failedFlowFiles.size()); + assertEquals(0, succeededFlowFiles.size()); - // sequence the flowfiles not important - int broadcastCnt = 0, sessionIdCnt = 0; - for (int i = 0; i < failedFlowFiles.size(); i++ ) { - MockFlowFile failedFlowFile = failedFlowFiles.get(i); - assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL)); - if (failedFlowFile.getAttribute(ATTR_WS_SESSION_ID) == null || failedFlowFile.getAttribute(ATTR_WS_SESSION_ID).isEmpty()) { - sessionIdCnt ++; - assertNull(failedFlowFile.getAttribute(ATTR_WS_BROADCAST)); - } - else { - broadcastCnt ++; - assertNotNull(failedFlowFile.getAttribute(ATTR_WS_BROADCAST)); - } - } - assertEquals(0, sessionIdCnt); - assertEquals(2, broadcastCnt); + // The original FlowFile should be transferred when configured not to fork. + final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_COMMUNICATION_FAILURE); + assertEquals(1, failedFlowFiles.size()); + MockFlowFile failedFlowFile = failedFlowFiles.get(0); + failedFlowFile.assertAttributeNotExists(ATTR_WS_SESSION_ID); + failedFlowFile.assertAttributeEquals(ATTR_WS_BROADCAST_SUCCEEDED, "0"); + failedFlowFile.assertAttributeEquals(ATTR_WS_BROADCAST_FAILED, "3"); - final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); - assertEquals(0, unknownFlowFiles.size()); final List provenanceEvents = runner.getProvenanceEvents(); - assertEquals(2, provenanceEvents.size()); // logging SEND and FORK - int sendCnt = 0, forkCnt = 0; - for (ProvenanceEventRecord provEvent : provenanceEvents ) { - if (provEvent.getEventType() == ProvenanceEventType.SEND) - sendCnt ++; - else if (provEvent.getEventType() == ProvenanceEventType.FORK) - forkCnt ++; - } - assertEquals(1, sendCnt); // verify EventType SEND (due to success) - assertEquals(1, forkCnt); // verify EventType FORK (due to failure) + assertEquals(0, provenanceEvents.size()); } @Test - public void testBroadcast_SessionIdSet_BroadcastFlagSet_Success() throws Exception { - // test failed FORKed flowfile rerouted back into processor; to status: Success + public void testBroadcast_FailureAll_Fork() throws Exception { + // test failed sending to all broadcast client final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); + runner.setProperty(PutWebSocket.PROP_ENABLE_DETAILED_FAILURE_RELATIONSHIPS, "true"); + runner.setProperty(PutWebSocket.PROP_FORK_FAILED_BROADCAST_SESSIONS, "true"); final WebSocketService service = spy(WebSocketService.class); - final WebSocketSession webSocketSession = getWebSocketSession(); + final WebSocketSession webSocketSession1 = getWebSocketSession("ws-session-id-1"); + final WebSocketSession webSocketSession3 = getWebSocketSession("ws-session-id-3"); final String serviceId = "ws-service"; final String endpointId = "client-1"; final String textMessageFromServer = "message from server."; - final HashSet sessionIds = new HashSet(); - sessionIds.add(webSocketSession.getSessionId()); + final Set sessionIds = new LinkedHashSet<>(); + sessionIds.add("ws-session-id-1"); + sessionIds.add("ws-session-id-2"); + sessionIds.add("ws-session-id-3"); when(service.getIdentifier()).thenReturn(serviceId); when(service.getSessionIds(endpointId)).thenReturn(sessionIds); + + // For session 1 doAnswer(invocation -> { final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); - sendMessage.send(webSocketSession); + sendMessage.send(webSocketSession1); return null; - }).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class)); - runner.addControllerService(serviceId, service); + }).when(service).sendMessage(anyString(), eq("ws-session-id-1"), any(SendMessage.class)); + doThrow(new IOException("Sending message failed.")) + .when(webSocketSession1).sendString(anyString()); - runner.enableControllerService(service); + // For session 2 + doThrow(new SessionNotFoundException("Simulate the second session is removed")) + .when(service).sendMessage(anyString(), eq("ws-session-id-2"), any(SendMessage.class)); - runner.setProperty(PutWebSocket.PROP_WS_MESSAGE_TYPE, "${" + ATTR_WS_MESSAGE_TYPE + "}"); + // For session 3 + doAnswer(invocation -> { + final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); + sendMessage.send(webSocketSession3); + return null; + }).when(service).sendMessage(anyString(), eq("ws-session-id-3"), any(SendMessage.class)); + doThrow(new IOException("Sending message failed.")) + .when(webSocketSession3).sendString(anyString()); + + + runner.addControllerService(serviceId, service); + runner.enableControllerService(service); final Map attributes = new HashMap<>(); attributes.put(ATTR_WS_CS_ID, serviceId); attributes.put(ATTR_WS_ENDPOINT_ID, endpointId); - attributes.put(ATTR_WS_SESSION_ID, webSocketSession.getSessionId()); attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.TEXT.name()); - attributes.put(ATTR_WS_BROADCAST, ""); runner.enqueue(textMessageFromServer, attributes); runner.run(); final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); - assertEquals(0, succeededFlowFiles.size()); // zero because it was DROPped (like a Terminated Relationship) + assertEquals(0, succeededFlowFiles.size()); - final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); - assertEquals(0, failedFlowFiles.size()); + final List communicationErrors = runner.getFlowFilesForRelationship(PutWebSocket.REL_COMMUNICATION_FAILURE); + assertEquals(2, communicationErrors.size()); - final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); - assertEquals(0, unknownFlowFiles.size()); + MockFlowFile commError1 = communicationErrors.get(0); + commError1.assertAttributeEquals(ATTR_WS_SESSION_ID, "ws-session-id-1"); + commError1.assertAttributeEquals(ATTR_WS_BROADCAST_SUCCEEDED, "0"); + commError1.assertAttributeEquals(ATTR_WS_BROADCAST_FAILED, "3"); + + MockFlowFile commError2 = communicationErrors.get(1); + commError2.assertAttributeEquals(ATTR_WS_SESSION_ID, "ws-session-id-3"); + commError2.assertAttributeEquals(ATTR_WS_BROADCAST_SUCCEEDED, "0"); + commError2.assertAttributeEquals(ATTR_WS_BROADCAST_FAILED, "3"); + + final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); + assertEquals(1, failedFlowFiles.size()); + MockFlowFile failed = failedFlowFiles.get(0); + failed.assertAttributeEquals(ATTR_WS_SESSION_ID, "ws-session-id-2"); + failed.assertAttributeEquals(ATTR_WS_BROADCAST_SUCCEEDED, "0"); + failed.assertAttributeEquals(ATTR_WS_BROADCAST_FAILED, "3"); final List provenanceEvents = runner.getProvenanceEvents(); - assertEquals(1, provenanceEvents.size()); - assertEquals(provenanceEvents.get(0).getEventType(), ProvenanceEventType.SEND); // verify EventType + assertEquals(1, provenanceEvents.size()); // logging the FORK + assertEquals(provenanceEvents.get(0).getEventType(), ProvenanceEventType.FORK); // verify EventType } @Test - public void testBroadcast_SessionIdSet_BroadcastFlagSet_Failure() throws Exception { - // test failed FORKed flowfile rerouted back into processor; to status: Failure + public void testBroadcast_SuccessAll() throws Exception { + // test success sending to all broadcast clients final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); final WebSocketService service = spy(WebSocketService.class); @@ -601,8 +571,10 @@ public void testBroadcast_SessionIdSet_BroadcastFlagSet_Failure() throws Excepti final String serviceId = "ws-service"; final String endpointId = "client-1"; final String textMessageFromServer = "message from server."; - final HashSet sessionIds = new HashSet(); - sessionIds.add(webSocketSession.getSessionId()); + final HashSet sessionIds = new HashSet<>(); + sessionIds.add("ws-session-id-1"); + sessionIds.add("ws-session-id-2"); + sessionIds.add("ws-session-id-3"); when(service.getIdentifier()).thenReturn(serviceId); when(service.getSessionIds(endpointId)).thenReturn(sessionIds); doAnswer(invocation -> { @@ -610,7 +582,7 @@ public void testBroadcast_SessionIdSet_BroadcastFlagSet_Failure() throws Excepti sendMessage.send(webSocketSession); return null; }).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class)); - doThrow(new IOException("Sending message failed.")).when(webSocketSession).sendString(anyString()); + runner.addControllerService(serviceId, service); runner.enableControllerService(service); @@ -620,29 +592,25 @@ public void testBroadcast_SessionIdSet_BroadcastFlagSet_Failure() throws Excepti final Map attributes = new HashMap<>(); attributes.put(ATTR_WS_CS_ID, serviceId); attributes.put(ATTR_WS_ENDPOINT_ID, endpointId); - attributes.put(ATTR_WS_SESSION_ID, webSocketSession.getSessionId()); attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.TEXT.name()); - attributes.put(ATTR_WS_BROADCAST, ""); runner.enqueue(textMessageFromServer, attributes); runner.run(); final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); - assertEquals(0, succeededFlowFiles.size()); + assertEquals(1, succeededFlowFiles.size()); final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); - assertEquals(1, failedFlowFiles.size()); - - final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); - assertEquals(0, unknownFlowFiles.size()); + assertEquals(0, failedFlowFiles.size()); final List provenanceEvents = runner.getProvenanceEvents(); - assertEquals(0, provenanceEvents.size()); + assertEquals(1, provenanceEvents.size()); + assertEquals(provenanceEvents.get(0).getEventType(), ProvenanceEventType.SEND); // verify EventType } @Test - public void testBroadcast_SessionIdSet_BroadcastFlagSet_SessionUnknown() throws Exception { - // test failed FORKed flowfile rerouted back into processor; to status: Session Unknown; which is DROPed + public void testBroadcast_1Success_2Failure() throws Exception { + // test success and failure sending to some broadcast clients final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); final WebSocketService service = spy(WebSocketService.class); @@ -651,50 +619,56 @@ public void testBroadcast_SessionIdSet_BroadcastFlagSet_SessionUnknown() throws final String serviceId = "ws-service"; final String endpointId = "client-1"; final String textMessageFromServer = "message from server."; - final HashSet sessionIds = new HashSet(); - sessionIds.add("a-bogus-session-id"); + final HashSet sessionIds = new HashSet<>(); + sessionIds.add("ws-session-id-1"); + sessionIds.add("ws-session-id-2"); + sessionIds.add("ws-session-id-3"); when(service.getIdentifier()).thenReturn(serviceId); when(service.getSessionIds(endpointId)).thenReturn(sessionIds); + doAnswer(invocation -> { final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); sendMessage.send(webSocketSession); return null; }).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class)); - doThrow(new IOException("Sending message failed.")).when(webSocketSession).sendString(anyString()); + doAnswer(invocation -> { + final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); + sendMessage.send(webSocketSession); + throw new IOException("Sending message failed."); + }).when(service).sendMessage(anyString(), eq("ws-session-id-2"), any(SendMessage.class)); + doAnswer(invocation -> { + final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); + sendMessage.send(webSocketSession); + throw new IOException("Sending message failed."); + }).when(service).sendMessage(anyString(), eq("ws-session-id-3"), any(SendMessage.class)); + runner.addControllerService(serviceId, service); runner.enableControllerService(service); - runner.setProperty(PutWebSocket.PROP_WS_MESSAGE_TYPE, "${" + ATTR_WS_MESSAGE_TYPE + "}"); - final Map attributes = new HashMap<>(); attributes.put(ATTR_WS_CS_ID, serviceId); attributes.put(ATTR_WS_ENDPOINT_ID, endpointId); - attributes.put(ATTR_WS_SESSION_ID, webSocketSession.getSessionId()); attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.TEXT.name()); - attributes.put(ATTR_WS_BROADCAST, ""); runner.enqueue(textMessageFromServer, attributes); runner.run(); final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); - assertEquals(0, succeededFlowFiles.size()); - - final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); - assertEquals(0, failedFlowFiles.size()); + assertEquals(1, succeededFlowFiles.size()); - final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); - assertEquals(0, unknownFlowFiles.size()); // zero because it was DROPped (like a Terminated Relationship) + runner.assertTransferCount(PutWebSocket.REL_FAILURE, 0); + runner.assertTransferCount(PutWebSocket.REL_COMMUNICATION_FAILURE, 0); final List provenanceEvents = runner.getProvenanceEvents(); - assertEquals(0, provenanceEvents.size()); + assertEquals(1, provenanceEvents.size()); // logging SEND } @Test - public void testBroadcast_SessionIdNotSet_BroadcastFlagSet() throws Exception { - // test of a theoretical bad processor config (session id being blank); catches a flowfile that looks like a - // broadcast, but is actually a FORKed flowfile (because websocket.broadcast attrib exists) -- where it is just DROPped + public void testBroadcast_1Success_2Failure_Fork() throws Exception { + // test success and failure sending to some broadcast clients final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class); + runner.setProperty(PutWebSocket.PROP_FORK_FAILED_BROADCAST_SESSIONS, "true"); final WebSocketService service = spy(WebSocketService.class); final WebSocketSession webSocketSession = getWebSocketSession(); @@ -702,41 +676,58 @@ public void testBroadcast_SessionIdNotSet_BroadcastFlagSet() throws Exception { final String serviceId = "ws-service"; final String endpointId = "client-1"; final String textMessageFromServer = "message from server."; - final HashSet sessionIds = new HashSet(); - sessionIds.add(webSocketSession.getSessionId()); + final HashSet sessionIds = new HashSet<>(); + sessionIds.add("ws-session-id-1"); + sessionIds.add("ws-session-id-2"); + sessionIds.add("ws-session-id-3"); when(service.getIdentifier()).thenReturn(serviceId); when(service.getSessionIds(endpointId)).thenReturn(sessionIds); + doAnswer(invocation -> { final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); sendMessage.send(webSocketSession); return null; }).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class)); + doAnswer(invocation -> { + final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); + sendMessage.send(webSocketSession); + throw new IOException("Sending message failed."); + }).when(service).sendMessage(anyString(), eq("ws-session-id-2"), any(SendMessage.class)); + doAnswer(invocation -> { + final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class); + sendMessage.send(webSocketSession); + throw new IOException("Sending message failed."); + }).when(service).sendMessage(anyString(), eq("ws-session-id-3"), any(SendMessage.class)); + runner.addControllerService(serviceId, service); runner.enableControllerService(service); - runner.setProperty(PutWebSocket.PROP_WS_MESSAGE_TYPE, "${" + ATTR_WS_MESSAGE_TYPE + "}"); - final Map attributes = new HashMap<>(); attributes.put(ATTR_WS_CS_ID, serviceId); attributes.put(ATTR_WS_ENDPOINT_ID, endpointId); attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.TEXT.name()); - attributes.put(ATTR_WS_BROADCAST, ""); runner.enqueue(textMessageFromServer, attributes); runner.run(); final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); - assertEquals(0, succeededFlowFiles.size()); + assertEquals(1, succeededFlowFiles.size()); final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); - assertEquals(0, failedFlowFiles.size()); - - final List unknownFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SESSION_UNKNOWN); - assertEquals(0, unknownFlowFiles.size()); + assertEquals(2, failedFlowFiles.size()); final List provenanceEvents = runner.getProvenanceEvents(); - assertEquals(0, provenanceEvents.size()); + assertEquals(2, provenanceEvents.size()); // logging SEND and FORK + int sendCnt = 0, forkCnt = 0; + for (ProvenanceEventRecord provEvent : provenanceEvents ) { + if (provEvent.getEventType() == ProvenanceEventType.SEND) + sendCnt ++; + else if (provEvent.getEventType() == ProvenanceEventType.FORK) + forkCnt ++; + } + assertEquals(1, sendCnt); // verify EventType SEND (due to success) + assertEquals(1, forkCnt); // verify EventType FORK (due to failure) } } diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java index 90586d0df81f..d7083921ad7c 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/AbstractWebSocketService.java @@ -20,7 +20,7 @@ import org.apache.nifi.processor.Processor; import java.io.IOException; -import java.util.HashSet; +import java.util.Set; public abstract class AbstractWebSocketService extends AbstractControllerService implements WebSocketService { @@ -42,12 +42,12 @@ public void deregisterProcessor(final String endpointId, final Processor process } @Override - public void sendMessage(final String endpointId, final String sessionId, final SendMessage sendMessage) throws IOException, WebSocketConfigurationException { + public void sendMessage(final String endpointId, final String sessionId, final SendMessage sendMessage) throws IOException, WebSocketConfigurationException, SessionNotFoundException { routers.sendMessage(endpointId, sessionId, sendMessage); } @Override - public HashSet getSessionIds(final String endpointId) throws WebSocketConfigurationException { + public Set getSessionIds(final String endpointId) throws WebSocketConfigurationException { return routers.getSessionIds(endpointId); } } diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/SessionNotFoundException.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/SessionNotFoundException.java new file mode 100644 index 000000000000..973f79f29b90 --- /dev/null +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/SessionNotFoundException.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +public class SessionNotFoundException extends RuntimeException { + public SessionNotFoundException(String message) { + super(message); + } +} diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java index 038a435e538c..495fa9d3b695 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.HashSet; @@ -63,6 +64,8 @@ public synchronized void deregisterProcessor(final Processor processor) { sessions.keySet().forEach(sessionId -> { try { disconnect(sessionId, "Processing has stopped."); + } catch (SessionNotFoundException e) { + logger.debug("Session was not found. " + e.getMessage()); } catch (IOException e) { logger.warn("Failed to disconnect session {} due to {}", sessionId, e, e); } @@ -82,39 +85,39 @@ public void onWebSocketClose(final String sessionId, final int statusCode, final sessions.remove(sessionId); } - public void onWebSocketText(final String sessionId, final String message) { + public void onWebSocketText(final String sessionId, final String message) throws SessionNotFoundException { if (processor != null && processor instanceof TextMessageConsumer) { ((TextMessageConsumer)processor).consume(getSessionOrFail(sessionId), message); } } - public void onWebSocketBinary(final String sessionId, final byte[] payload, final int offset, final int length) { + public void onWebSocketBinary(final String sessionId, final byte[] payload, final int offset, final int length) throws SessionNotFoundException { if (processor != null && processor instanceof BinaryMessageConsumer) { ((BinaryMessageConsumer)processor).consume(getSessionOrFail(sessionId), payload, offset, length); } } - private WebSocketSession getSessionOrFail(final String sessionId) { + private WebSocketSession getSessionOrFail(final String sessionId) throws SessionNotFoundException { if (StringUtils.isEmpty(sessionId)) { throw new IllegalStateException("SessionId is not set"); } final WebSocketSession session = sessions.get(sessionId); if (session == null) { - throw new IllegalStateException("Session was not found for the sessionId: " + sessionId); + throw new SessionNotFoundException("Session was not found for the sessionId: " + sessionId); } return session; } - public void sendMessage(final String sessionId, final SendMessage sendMessage) throws IOException { + public void sendMessage(final String sessionId, final SendMessage sendMessage) throws IOException, SessionNotFoundException { final WebSocketSession session = getSessionOrFail(sessionId); sendMessage.send(session); } - public HashSet getSessionIds() { - return new HashSet(sessions.keySet()); + public Set getSessionIds() { + return new HashSet<>(sessions.keySet()); } - public void disconnect(final String sessionId, final String reason) throws IOException { + public void disconnect(final String sessionId, final String reason) throws IOException, SessionNotFoundException { final WebSocketSession session = getSessionOrFail(sessionId); session.close(reason); sessions.remove(sessionId); diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java index 56ee82eb38d1..8a48e35723ba 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouters.java @@ -19,8 +19,8 @@ import org.apache.nifi.processor.Processor; import java.io.IOException; -import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; public class WebSocketMessageRouters { @@ -64,12 +64,12 @@ public synchronized void deregisterProcessor(final String endpointId, final Proc router.deregisterProcessor(processor); } - public void sendMessage(final String endpointId, final String sessionId, final SendMessage sendMessage) throws IOException, WebSocketConfigurationException { + public void sendMessage(final String endpointId, final String sessionId, final SendMessage sendMessage) throws IOException, WebSocketConfigurationException, SessionNotFoundException { final WebSocketMessageRouter router = getRouterOrFail(endpointId); router.sendMessage(sessionId, sendMessage); } - public HashSet getSessionIds(final String endpointId) throws WebSocketConfigurationException { + public Set getSessionIds(final String endpointId) throws WebSocketConfigurationException { final WebSocketMessageRouter router = getRouterOrFail(endpointId); return router.getSessionIds(); } diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java index d7a4e1f08122..0676c80e4ceb 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketService.java @@ -22,7 +22,7 @@ import org.apache.nifi.ssl.RestrictedSSLContextService; import java.io.IOException; -import java.util.HashSet; +import java.util.Set; /** * Control an embedded WebSocket service instance. @@ -44,7 +44,7 @@ public interface WebSocketService extends ControllerService { void deregisterProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException; - void sendMessage(final String endpointId, final String sessionId, final SendMessage sendMessage) throws IOException, WebSocketConfigurationException; + void sendMessage(final String endpointId, final String sessionId, final SendMessage sendMessage) throws IOException, WebSocketConfigurationException, SessionNotFoundException; - HashSet getSessionIds(final String endpointId) throws WebSocketConfigurationException; + Set getSessionIds(final String endpointId) throws WebSocketConfigurationException; } diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouter.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouter.java index 208fe5bbf8bf..494ca9435b48 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouter.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/test/java/org/apache/nifi/websocket/TestWebSocketMessageRouter.java @@ -80,7 +80,7 @@ public void testSendMessage() throws Exception { try { router.sendMessage("session-2", sender -> sender.sendString("message")); fail("Should fail because there's no session with id session-2."); - } catch (IllegalStateException e) { + } catch (SessionNotFoundException e) { } } From 421453d9496843121511fbbc60e96cb2f42a1e86 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Fri, 15 Mar 2019 12:38:32 +0900 Subject: [PATCH 3/3] Fixed test failure. --- .../apache/nifi/processors/websocket/TestPutWebSocket.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java index 2e857601c6b4..9250f8295b6b 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java @@ -556,7 +556,7 @@ public void testBroadcast_FailureAll_Fork() throws Exception { failed.assertAttributeEquals(ATTR_WS_BROADCAST_FAILED, "3"); final List provenanceEvents = runner.getProvenanceEvents(); - assertEquals(1, provenanceEvents.size()); // logging the FORK + assertEquals(3, provenanceEvents.size()); // logging the FORK assertEquals(provenanceEvents.get(0).getEventType(), ProvenanceEventType.FORK); // verify EventType } @@ -718,7 +718,7 @@ public void testBroadcast_1Success_2Failure_Fork() throws Exception { assertEquals(2, failedFlowFiles.size()); final List provenanceEvents = runner.getProvenanceEvents(); - assertEquals(2, provenanceEvents.size()); // logging SEND and FORK + assertEquals(3, provenanceEvents.size()); // logging SEND and FORK int sendCnt = 0, forkCnt = 0; for (ProvenanceEventRecord provEvent : provenanceEvents ) { if (provEvent.getEventType() == ProvenanceEventType.SEND) @@ -727,7 +727,7 @@ else if (provEvent.getEventType() == ProvenanceEventType.FORK) forkCnt ++; } assertEquals(1, sendCnt); // verify EventType SEND (due to success) - assertEquals(1, forkCnt); // verify EventType FORK (due to failure) + assertEquals(2, forkCnt); // verify EventType FORK (due to failure) } }