From 565e5a4daffa1309efe023055822791ab44e980c Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 5 Oct 2016 16:19:55 -0400 Subject: [PATCH 1/2] NIFI-2836: Ensure that we wait until a request is completed before unlocking the lock for request replication --- .../ThreadPoolRequestReplicator.java | 305 ++++++++++-------- .../TestThreadPoolRequestReplicator.java | 198 ++++++++++-- 2 files changed, 343 insertions(+), 160 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index 258588dd27c7..df49e984fa85 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -249,12 +249,27 @@ public AsyncClusterResponse replicate(Set nodeIds, String method lock.lock(); try { logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri); - return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true); + + // Unlocking of the lock is performed within the replicate method, as we need to ensure that it is unlocked only after + // the entire request has completed. + final Object monitor = new Object(); + synchronized (monitor) { + final AsyncClusterResponse response = replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true, monitor); + + try { + monitor.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + return response; + } } finally { lock.unlock(); + logger.debug("Unlocked {} after replication completed for {} {}", lock, method, uri); } } else { - return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true); + return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true, null); } } @@ -269,7 +284,7 @@ public AsyncClusterResponse forwardToCoordinator(final NodeIdentifier coordinato updatedHeaders.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, proxiedEntitiesChain); } - return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false, false); + return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false, false, null); } /** @@ -283,107 +298,133 @@ public AsyncClusterResponse forwardToCoordinator(final NodeIdentifier coordinato * @param performVerification whether or not to verify that all nodes in the cluster are connected and that all nodes can perform request. Ignored if request is not mutable. * @param response the response to update with the results * @param executionPhase true if this is the execution phase, false otherwise + * @param monitor a monitor that will be notified when the request completes (successfully or otherwise) * @return an AsyncClusterResponse that can be used to obtain the result */ - private AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean performVerification, - StandardAsyncClusterResponse response, boolean executionPhase, boolean merge) { - - // state validation - Objects.requireNonNull(nodeIds); - Objects.requireNonNull(method); - Objects.requireNonNull(uri); - Objects.requireNonNull(entity); - Objects.requireNonNull(headers); + AsyncClusterResponse replicate(final Set nodeIds, final String method, final URI uri, final Object entity, final Map headers, + final boolean performVerification, StandardAsyncClusterResponse response, final boolean executionPhase, final boolean merge, final Object monitor) { + try { + // state validation + Objects.requireNonNull(nodeIds); + Objects.requireNonNull(method); + Objects.requireNonNull(uri); + Objects.requireNonNull(entity); + Objects.requireNonNull(headers); + + if (nodeIds.isEmpty()) { + throw new IllegalArgumentException("Cannot replicate request to 0 nodes"); + } - if (nodeIds.isEmpty()) { - throw new IllegalArgumentException("Cannot replicate request to 0 nodes"); - } + // verify all of the nodes exist and are in the proper state + for (final NodeIdentifier nodeId : nodeIds) { + final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId); + if (status == null) { + throw new UnknownNodeException("Node " + nodeId + " does not exist in this cluster"); + } - // verify all of the nodes exist and are in the proper state - for (final NodeIdentifier nodeId : nodeIds) { - final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId); - if (status == null) { - throw new UnknownNodeException("Node " + nodeId + " does not exist in this cluster"); + if (status.getState() != NodeConnectionState.CONNECTED) { + throw new IllegalClusterStateException("Cannot replicate request to Node " + nodeId + " because the node is not connected"); + } } - if (status.getState() != NodeConnectionState.CONNECTED) { - throw new IllegalClusterStateException("Cannot replicate request to Node " + nodeId + " because the node is not connected"); - } - } + logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response); - logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response); + // Update headers to indicate the current revision so that we can + // prevent multiple users changing the flow at the same time + final Map updatedHeaders = new HashMap<>(headers); + final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString()); - // Update headers to indicate the current revision so that we can - // prevent multiple users changing the flow at the same time - final Map updatedHeaders = new HashMap<>(headers); - final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString()); + if (performVerification) { + verifyClusterState(method, uri.getPath()); + } - if (performVerification) { - verifyClusterState(method, uri.getPath()); - } + int numRequests = responseMap.size(); + if (numRequests >= MAX_CONCURRENT_REQUESTS) { + numRequests = purgeExpiredRequests(); + } - int numRequests = responseMap.size(); - if (numRequests >= MAX_CONCURRENT_REQUESTS) { - numRequests = purgeExpiredRequests(); - } + if (numRequests >= MAX_CONCURRENT_REQUESTS) { + final Map countsByUri = responseMap.values().stream().collect( + Collectors.groupingBy( + StandardAsyncClusterResponse::getURIPath, + Collectors.counting())); - if (numRequests >= MAX_CONCURRENT_REQUESTS) { - final Map countsByUri = responseMap.values().stream().collect( - Collectors.groupingBy( - StandardAsyncClusterResponse::getURIPath, - Collectors.counting())); + logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", method, uri.getPath(), numRequests, countsByUri); + throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests"); + } - logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", method, uri.getPath(), numRequests, countsByUri); - throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests"); - } + // create the request objects and replicate to all nodes. + // When the request has completed, we need to ensure that we notify the monitor, if there is one. + final CompletionCallback completionCallback = clusterResponse -> { + try { + onCompletedResponse(requestId); + } finally { + if (monitor != null) { + synchronized (monitor) { + monitor.notify(); + } - // create the request objects and replicate to all nodes - final CompletionCallback completionCallback = clusterResponse -> onCompletedResponse(requestId); - final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId); + logger.debug("Notified monitor {} because request {} {} has completed", monitor, method, uri); + } + } + }; - // create a response object if one was not already passed to us - if (response == null) { - response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds, + final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId); + + // create a response object if one was not already passed to us + if (response == null) { + response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds, responseMapper, completionCallback, responseConsumedCallback, merge); - responseMap.put(requestId, response); - } + responseMap.put(requestId, response); + } - logger.debug("For Request ID {}, response object is {}", requestId, response); - - // if mutable request, we have to do a two-phase commit where we ask each node to verify - // that the request can take place and then, if all nodes agree that it can, we can actually - // issue the request. This is all handled by calling performVerification, which will replicate - // the 'vote' request to all nodes and then if successful will call back into this method to - // replicate the actual request. - final boolean mutableRequest = isMutableRequest(method, uri.getPath()); - if (mutableRequest && performVerification) { - logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId); - performVerification(nodeIds, method, uri, entity, updatedHeaders, response, merge); - return response; - } + logger.debug("For Request ID {}, response object is {}", requestId, response); + + // if mutable request, we have to do a two-phase commit where we ask each node to verify + // that the request can take place and then, if all nodes agree that it can, we can actually + // issue the request. This is all handled by calling performVerification, which will replicate + // the 'vote' request to all nodes and then if successful will call back into this method to + // replicate the actual request. + final boolean mutableRequest = isMutableRequest(method, uri.getPath()); + if (mutableRequest && performVerification) { + logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId); + performVerification(nodeIds, method, uri, entity, updatedHeaders, response, merge, monitor); + return response; + } - // Callback function for generating a NodeHttpRequestCallable that can be used to perform the work - final StandardAsyncClusterResponse finalResponse = response; - NodeRequestCompletionCallback nodeCompletionCallback = nodeResponse -> { - logger.debug("Received response from {} for {} {}", nodeResponse.getNodeId(), method, uri.getPath()); - finalResponse.add(nodeResponse); - }; + // Callback function for generating a NodeHttpRequestCallable that can be used to perform the work + final StandardAsyncClusterResponse finalResponse = response; + NodeRequestCompletionCallback nodeCompletionCallback = nodeResponse -> { + logger.debug("Received response from {} for {} {}", nodeResponse.getNodeId(), method, uri.getPath()); + finalResponse.add(nodeResponse); + }; - // instruct the node to actually perform the underlying action - if (mutableRequest && executionPhase) { - updatedHeaders.put(REQUEST_EXECUTION_HTTP_HEADER, "true"); - } + // instruct the node to actually perform the underlying action + if (mutableRequest && executionPhase) { + updatedHeaders.put(REQUEST_EXECUTION_HTTP_HEADER, "true"); + } - // replicate the request to all nodes - final Function requestFactory = - nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback); - replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders); + // replicate the request to all nodes + final Function requestFactory = + nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback); + replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders); - return response; + return response; + } catch (final Throwable t) { + if (monitor != null) { + synchronized (monitor) { + monitor.notify(); + } + logger.debug("Notified monitor {} because request {} {} has failed with Throwable {}", monitor, method, uri, t); + } + + throw t; + } } - private void performVerification(Set nodeIds, String method, URI uri, Object entity, Map headers, StandardAsyncClusterResponse clusterResponse, boolean merge) { + private void performVerification(final Set nodeIds, final String method, final URI uri, final Object entity, final Map headers, + final StandardAsyncClusterResponse clusterResponse, final boolean merge, final Object monitor) { logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath()); final Map validationHeaders = new HashMap<>(headers); @@ -418,64 +459,72 @@ public void onCompletion(final NodeResponse nodeResponse) { // to all nodes and we are finished. if (dissentingCount == 0) { logger.debug("Received verification from all {} nodes that mutable request {} {} can be made", numNodes, method, uri.getPath()); - replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true, merge); + replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true, merge, monitor); return; } - final Map cancelLockHeaders = new HashMap<>(headers); - cancelLockHeaders.put(REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER, "true"); - final Thread cancelLockThread = new Thread(new Runnable() { - @Override - public void run() { - logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", dissentingCount, method, uri.getPath()); + try { + final Map cancelLockHeaders = new HashMap<>(headers); + cancelLockHeaders.put(REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER, "true"); + final Thread cancelLockThread = new Thread(new Runnable() { + @Override + public void run() { + logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", dissentingCount, method, uri.getPath()); - final Function requestFactory = - nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null); + final Function requestFactory = + nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null); - replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders); - } - }); - cancelLockThread.setName("Cancel Flow Locks"); - cancelLockThread.start(); - - // Add a NodeResponse for each node to the Cluster Response - // Check that all nodes responded successfully. - for (final NodeResponse response : nodeResponses) { - if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) { - final ClientResponse clientResponse = response.getClientResponse(); - - final String message; - if (clientResponse == null) { - message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus(); - - logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur", - response.getStatus(), response.getNodeId(), method, uri.getPath()); - } else { - final String nodeExplanation = clientResponse.getEntity(String.class); - message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: " + nodeExplanation; - - logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur. Node explanation: {}", - response.getStatus(), response.getNodeId(), method, uri.getPath(), nodeExplanation); + replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders); } - - // if a node reports forbidden, use that as the response failure - final RuntimeException failure; - if (response.getStatus() == Status.FORBIDDEN.getStatusCode()) { - if (response.hasThrowable()) { - failure = new AccessDeniedException(message, response.getThrowable()); + }); + cancelLockThread.setName("Cancel Flow Locks"); + cancelLockThread.start(); + + // Add a NodeResponse for each node to the Cluster Response + // Check that all nodes responded successfully. + for (final NodeResponse response : nodeResponses) { + if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) { + final ClientResponse clientResponse = response.getClientResponse(); + + final String message; + if (clientResponse == null) { + message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus(); + + logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur", + response.getStatus(), response.getNodeId(), method, uri.getPath()); } else { - failure = new AccessDeniedException(message); + final String nodeExplanation = clientResponse.getEntity(String.class); + message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: " + nodeExplanation; + + logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. " + + "The action will not occur. Node explanation: {}", response.getStatus(), response.getNodeId(), method, uri.getPath(), nodeExplanation); } - } else { - if (response.hasThrowable()) { - failure = new IllegalClusterStateException(message, response.getThrowable()); + + // if a node reports forbidden, use that as the response failure + final RuntimeException failure; + if (response.getStatus() == Status.FORBIDDEN.getStatusCode()) { + if (response.hasThrowable()) { + failure = new AccessDeniedException(message, response.getThrowable()); + } else { + failure = new AccessDeniedException(message); + } } else { - failure = new IllegalClusterStateException(message); + if (response.hasThrowable()) { + failure = new IllegalClusterStateException(message, response.getThrowable()); + } else { + failure = new IllegalClusterStateException(message); + } } - } - clusterResponse.setFailure(failure); - break; + clusterResponse.setFailure(failure); + break; + } + } + } finally { + if (monitor != null) { + synchronized (monitor) { + monitor.notify(); + } } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java index edbc05b2da1d..02578a5b277a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@ -16,13 +16,28 @@ */ package org.apache.nifi.cluster.coordination.http.replication; -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientHandlerException; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.ClientResponse.Status; -import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.core.header.InBoundHeaders; -import com.sun.jersey.core.header.OutBoundHeaders; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.ws.rs.HttpMethod; + import org.apache.commons.collections4.map.MultiValueMap; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; @@ -43,25 +58,13 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import javax.ws.rs.HttpMethod; -import java.io.ByteArrayInputStream; -import java.net.SocketTimeoutException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.ClientResponse.Status; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.core.header.InBoundHeaders; +import com.sun.jersey.core.header.OutBoundHeaders; public class TestThreadPoolRequestReplicator { @@ -115,10 +118,6 @@ public void testLongWaitForResponse() { // We should get back the same response object assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier())); - assertFalse(response.isComplete()); - - final NodeResponse nodeResponse = response.getNodeResponse(nodeId); - assertNull(nodeResponse); final NodeResponse completedNodeResponse = response.awaitMergedResponse(2, TimeUnit.SECONDS); assertNotNull(completedNodeResponse); @@ -321,14 +320,149 @@ protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilde } } + @Test(timeout = 5000) + public void testMonitorNotifiedOnException() { + withReplicator(replicator -> { + final Object monitor = new Object(); + + final CountDownLatch preNotifyLatch = new CountDownLatch(1); + final CountDownLatch postNotifyLatch = new CountDownLatch(1); + + new Thread(new Runnable() { + @Override + public void run() { + synchronized (monitor) { + while (true) { + // If monitor is not notified, this will block indefinitely, and the test will timeout + try { + preNotifyLatch.countDown(); + monitor.wait(); + break; + } catch (InterruptedException e) { + continue; + } + } + + postNotifyLatch.countDown(); + } + } + }).start(); + + // wait for the background thread to notify that it is synchronized on monitor. + preNotifyLatch.await(); + + try { + // Pass in Collections.emptySet() for the node ID's so that an Exception is thrown + replicator.replicate(Collections.emptySet(), "GET", new URI("localhost:8080/nifi"), Collections.emptyMap(), + Collections.emptyMap(), true, null, true, true, monitor); + Assert.fail("replicate did not throw IllegalArgumentException"); + } catch (final IllegalArgumentException iae) { + // expected + } + + // wait for monitor to be notified. + postNotifyLatch.await(); + }); + } + + @Test(timeout = 5000) + public void testMonitorNotifiedOnSuccessfulCompletion() { + withReplicator(replicator -> { + final Object monitor = new Object(); + + final CountDownLatch preNotifyLatch = new CountDownLatch(1); + final CountDownLatch postNotifyLatch = new CountDownLatch(1); + + new Thread(new Runnable() { + @Override + public void run() { + synchronized (monitor) { + while (true) { + // If monitor is not notified, this will block indefinitely, and the test will timeout + try { + preNotifyLatch.countDown(); + monitor.wait(); + break; + } catch (InterruptedException e) { + continue; + } + } + + postNotifyLatch.countDown(); + } + } + }).start(); + + // wait for the background thread to notify that it is synchronized on monitor. + preNotifyLatch.await(); + + final Set nodeIds = new HashSet<>(); + final NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false); + nodeIds.add(nodeId); + final URI uri = new URI("http://localhost:8080/processors/1"); + final Entity entity = new ProcessorEntity(); + + replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, null, true, true, monitor); + + // wait for monitor to be notified. + postNotifyLatch.await(); + }); + } + + + @Test(timeout = 5000) + public void testMonitorNotifiedOnFailureResponse() { + withReplicator(replicator -> { + final Object monitor = new Object(); + + final CountDownLatch preNotifyLatch = new CountDownLatch(1); + final CountDownLatch postNotifyLatch = new CountDownLatch(1); + + new Thread(new Runnable() { + @Override + public void run() { + synchronized (monitor) { + while (true) { + // If monitor is not notified, this will block indefinitely, and the test will timeout + try { + preNotifyLatch.countDown(); + monitor.wait(); + break; + } catch (InterruptedException e) { + continue; + } + } + + postNotifyLatch.countDown(); + } + } + }).start(); + + // wait for the background thread to notify that it is synchronized on monitor. + preNotifyLatch.await(); + + final Set nodeIds = new HashSet<>(); + final NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false); + nodeIds.add(nodeId); + final URI uri = new URI("http://localhost:8080/processors/1"); + final Entity entity = new ProcessorEntity(); + + replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, null, true, true, monitor); + + // wait for monitor to be notified. + postNotifyLatch.await(); + }, Status.INTERNAL_SERVER_ERROR, 0L, null); + } + + private void withReplicator(final WithReplicator function) { withReplicator(function, ClientResponse.Status.OK, 0L, null); } private void withReplicator(final WithReplicator function, final Status status, final long delayMillis, final RuntimeException failure) { final ClusterCoordinator coordinator = createClusterCoordinator(); - final ThreadPoolRequestReplicator replicator - = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) { + final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null, null); + final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) { @Override protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId, Map givenHeaders) { From 79b6ba66c180f7d74601c3675cb2e078a7ce041c Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 6 Oct 2016 13:34:53 -0400 Subject: [PATCH 2/2] NIFI-2836: Ensure that failures do not trigger request completion logic unless the failure is the last node to report its status --- .../replication/StandardAsyncClusterResponse.java | 14 ++++++++++---- .../replication/ThreadPoolRequestReplicator.java | 5 +++-- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java index 926151e56edd..1d4ea69460f2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java @@ -223,12 +223,18 @@ void add(final NodeResponse nodeResponse) { } } - synchronized void setFailure(final RuntimeException failure) { + synchronized void setFailure(final RuntimeException failure, final NodeIdentifier nodeId) { this.failure = failure; - notifyAll(); - if (completionCallback != null) { - completionCallback.onCompletion(this); + final int completedCount = requestsCompleted.incrementAndGet(); + logger.debug("Notified of failure for {} from {}", id, nodeId); + + if (completedCount == responseMap.size()) { + + notifyAll(); + if (completionCallback != null) { + completionCallback.onCompletion(this); + } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index df49e984fa85..ff7900096d91 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -516,8 +516,7 @@ public void run() { } } - clusterResponse.setFailure(failure); - break; + clusterResponse.setFailure(failure, response.getNodeId()); } } } finally { @@ -525,6 +524,8 @@ public void run() { synchronized (monitor) { monitor.notify(); } + + logger.debug("Notified monitor {} because request {} {} has failed due to at least 1 dissenting node", monitor, method, uri); } } }