Skip to content

Commit

Permalink
[EJBCLIENT-361] DiscoveryEJBClientInterceptor: static blacklisting de…
Browse files Browse the repository at this point in the history
…stinations after RequestSendFailedException
  • Loading branch information
tadamski committed Jan 21, 2020
1 parent 1d41329 commit 1539c2c
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 24 deletions.
Expand Up @@ -30,6 +30,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -39,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand Down Expand Up @@ -82,6 +84,16 @@ public final class DiscoveryEJBClientInterceptor implements EJBClientInterceptor
public static final int PRIORITY = ClientInterceptorPriority.JBOSS_AFTER + 100;

private static final AttachmentKey<Set<URI>> BL_KEY = new AttachmentKey<>();
private static final Map<URI, Long> blacklist = new ConcurrentHashMap<>();
private static final long BLACKLIST_TIMEOUT =
AccessController.doPrivileged((PrivilegedAction<Long>) () -> {
String val = System.getProperty("org.jboss.ejb.client.discovery.blacklist.timeout");
try {
return TimeUnit.MILLISECONDS.toNanos(Long.valueOf(val));
} catch (NumberFormatException e) {
return TimeUnit.MILLISECONDS.toNanos(5000L);
}
});

/**
* Construct a new instance.
Expand Down Expand Up @@ -109,7 +121,7 @@ public void handleInvocation(final EJBClientInvocationContext context) throws Ex
try {
context.sendRequest();
} catch (NoSuchEJBException | RequestSendFailedException e) {
processMissingTarget(context);
processMissingTarget(context, e);
throw e;
} finally {
if (problems != null) for (Throwable problem : problems) {
Expand All @@ -123,7 +135,7 @@ public Object handleInvocationResult(final EJBClientInvocationContext context) t
try {
result = context.getResult();
} catch (NoSuchEJBException | RequestSendFailedException e) {
processMissingTarget(context);
processMissingTarget(context, e);
throw e;
}
final EJBLocator<?> locator = context.getLocator();
Expand Down Expand Up @@ -171,7 +183,7 @@ public SessionID handleSessionCreation(final EJBSessionCreationInvocationContext
try {
sessionID = context.proceed();
} catch (NoSuchEJBException | RequestSendFailedException e) {
processMissingTarget(context);
processMissingTarget(context, e);
throw withSuppressed(e, problems);
} catch (Exception t) {
throw withSuppressed(t, problems);
Expand Down Expand Up @@ -235,7 +247,7 @@ static void setupSessionAffinities(EJBSessionCreationInvocationContext context)

}

private void processMissingTarget(final AbstractInvocationContext context) {
private void processMissingTarget(final AbstractInvocationContext context, final Exception cause) {
final URI destination = context.getDestination();

if (destination == null || context.getTargetAffinity() == Affinity.LOCAL) {
Expand All @@ -244,7 +256,12 @@ private void processMissingTarget(final AbstractInvocationContext context) {
}

// Oops, we got some wrong information!
addBlackListedDestination(context, destination);
if(cause instanceof NoSuchEJBException){
addInvocationBlackListedDestination(context, destination);
} else {
addBlackListedDestination(destination);
}


// clear the weak affinity so that cluster invocations can be re-targeted.
context.setWeakAffinity(Affinity.NONE);
Expand All @@ -253,7 +270,7 @@ private void processMissingTarget(final AbstractInvocationContext context) {
context.requestRetry();
}

static boolean addBlackListedDestination(AbstractInvocationContext context, URI destination) {
static void addInvocationBlackListedDestination(AbstractInvocationContext context, URI destination) {
Assert.checkNotNullParam("context", context);
if (destination != null) {
Set<URI> set = context.getAttachment(BL_KEY);
Expand All @@ -263,21 +280,50 @@ static boolean addBlackListedDestination(AbstractInvocationContext context, URI
set = appearing;
}
}
set.add(destination);
if (Logs.INVOCATION.isDebugEnabled()) {
Logs.INVOCATION.debugf("DiscoveryEJBClientInterceptor: blacklisting destination for this invocation (locator = %s, weak affinity = %s, missing target = %s)", context.getLocator(), context.getWeakAffinity(), destination);
}
}
}

static void addBlackListedDestination(URI destination) {
if (destination != null) {
if (Logs.INVOCATION.isDebugEnabled()) {
Logs.INVOCATION.debugf("DiscoveryEJBClientInterceptor: blacklisting destination (locator = %s, weak affinity = %s, missing target = %s)", context.getLocator(), context.getWeakAffinity(), destination);
Logs.INVOCATION.debugf("DiscoveryEJBClientInterceptor: blacklisting destination %s", destination);
}
blacklist.put(destination, System.nanoTime());
}
}

return set.add(destination);
static boolean isBlackListed(AbstractInvocationContext context, URI destination) {
final Set<URI> invocationBlacklist = context.getAttachment(BL_KEY);
if (invocationBlacklist != null && invocationBlacklist.contains(destination)) {
return true;
}
if (!blacklist.containsKey(destination)) {
return false;
}
final long blacklistedTimestamp = blacklist.get(destination);
final long delta = System.nanoTime() - blacklistedTimestamp;
if (delta < BLACKLIST_TIMEOUT) {
return true;
} else {
blacklist.remove(destination);
return false;
}
}

static boolean isBlackListed(AbstractInvocationContext context, URI destination) {
final Set<URI> blacklist = context.getAttachment(BL_KEY);
return blacklist != null && blacklist.contains(destination);
private static Set<URI> getBlacklist(){
blacklist.entrySet().removeIf(e ->
{
final long delta = System.nanoTime() - e.getValue();
return delta < BLACKLIST_TIMEOUT;
});
return blacklist.keySet();
}


ServicesQueue discover(final FilterSpec filterSpec) {
return getDiscovery().discover(EJB_SERVICE_TYPE, filterSpec);
}
Expand Down Expand Up @@ -369,12 +415,12 @@ private List<Throwable> doFirstMatchDiscovery(AbstractInvocationContext context,
Logs.INVOCATION.debugf("DiscoveryEJBClientInterceptor: performing first-match discovery(locator = %s, weak affinity = %s, filter spec = %s)", context.getLocator(), context.getWeakAffinity(), filterSpec);
}
final List<Throwable> problems;
final Set<URI> set = context.getAttachment(BL_KEY);
final Set<URI> blacklist = getBlacklist();
try (final ServicesQueue queue = discover(filterSpec)) {
ServiceURL serviceURL;
while ((serviceURL = queue.takeService(DISCOVERY_TIMEOUT, TimeUnit.SECONDS)) != null) {
final URI location = serviceURL.getLocationURI();
if (set == null || ! set.contains(location)) {
if (!blacklist.contains(location)) {
// Got a match! See if there's a node affinity to set for the invocation.
final AttributeValue nodeValue = serviceURL.getFirstAttributeValue(FILTER_ATTR_NODE);
if (nodeValue != null) {
Expand Down Expand Up @@ -428,7 +474,7 @@ private List<Throwable> doAnyDiscovery(AbstractInvocationContext context, final
Logs.INVOCATION.tracef("DiscoveryEJBClientInterceptor: performing any discovery(locator = %s, weak affinity = %s, filter spec = %s)", context.getLocator(), context.getWeakAffinity(), filterSpec);
final List<Throwable> problems;
// blacklist
final Set<URI> blacklist = context.getAttachment(BL_KEY);
final Set<URI> blacklist = getBlacklist();
final Map<URI, String> nodes = new HashMap<>();
final Map<String, URI> uris = new HashMap<>();
final Map<URI, List<String>> clusterAssociations = new HashMap<>();
Expand All @@ -438,7 +484,7 @@ private List<Throwable> doAnyDiscovery(AbstractInvocationContext context, final
ServiceURL serviceURL;
while ((serviceURL = queue.takeService(DISCOVERY_TIMEOUT, TimeUnit.SECONDS)) != null) {
final URI location = serviceURL.getLocationURI();
if (blacklist == null || ! blacklist.contains(location)) {
if (!blacklist.contains(location)) {
// Got a match! See if there's a node affinity to set for the invocation.
final AttributeValue nodeValue = serviceURL.getFirstAttributeValue(FILTER_ATTR_NODE);
if (nodeValue != null) {
Expand Down Expand Up @@ -556,12 +602,12 @@ private List<Throwable> doClusterDiscovery(AbstractInvocationContext context, fi
Map<String, URI> nodes = new HashMap<>();
final EJBClientContext clientContext = context.getClientContext();
final List<Throwable> problems;
final Set<URI> set = context.getAttachment(BL_KEY);
final Set<URI> blacklist = getBlacklist();
try (final ServicesQueue queue = discover(filterSpec)) {
ServiceURL serviceURL;
while ((serviceURL = queue.takeService()) != null) {
final URI location = serviceURL.getLocationURI();
if (set == null || ! set.contains(location)) {
if (!blacklist.contains(location)) {
final EJBReceiver transportProvider = clientContext.getTransportProvider(location.getScheme());
if (transportProvider != null && satisfiesSourceAddress(serviceURL, transportProvider)) {
final AttributeValue nodeNameValue = serviceURL.getFirstAttributeValue(FILTER_ATTR_NODE);
Expand Down
19 changes: 12 additions & 7 deletions src/main/java/org/jboss/ejb/client/NamingEJBClientInterceptor.java
Expand Up @@ -19,6 +19,7 @@
package org.jboss.ejb.client;

import static org.jboss.ejb.client.DiscoveryEJBClientInterceptor.addBlackListedDestination;
import static org.jboss.ejb.client.DiscoveryEJBClientInterceptor.addInvocationBlackListedDestination;
import static org.jboss.ejb.client.DiscoveryEJBClientInterceptor.isBlackListed;

import java.net.URI;
Expand Down Expand Up @@ -71,8 +72,8 @@ public void handleInvocation(final EJBClientInvocationContext context) throws Ex
}
if (setDestination(context, namingProvider)) try {
context.sendRequest();
} catch (NoSuchEJBException | RequestSendFailedException e) {
processMissingTarget(context);
} catch (NoSuchEJBException | RequestSendFailedException e){
processMissingTarget(context, e);
throw e;
} else {
throw Logs.INVOCATION.noMoreDestinations();
Expand All @@ -83,9 +84,9 @@ public void handleInvocation(final EJBClientInvocationContext context) throws Ex
public Object handleInvocationResult(final EJBClientInvocationContext context) throws Exception {
try {
return context.getResult();
} catch (NoSuchEJBException | RequestSendFailedException e) {
} catch (NoSuchEJBException | RequestSendFailedException e){
if (context.getAttachment(SKIP_MISSING_TARGET) != Boolean.TRUE) {
processMissingTarget(context);
processMissingTarget(context, e);
}
throw e;
} finally {
Expand Down Expand Up @@ -117,7 +118,7 @@ public SessionID handleSessionCreation(final EJBSessionCreationInvocationContext
}
return theSessionID;
} catch (NoSuchEJBException | RequestSendFailedException e) {
processMissingTarget(context);
processMissingTarget(context, e);
throw e;
} else {
throw Logs.INVOCATION.noMoreDestinations();
Expand Down Expand Up @@ -206,7 +207,7 @@ private static List<URI> findPreferredURIs(AbstractInvocationContext context, Li
return result;
}

private void processMissingTarget(final AbstractInvocationContext context) {
private void processMissingTarget(final AbstractInvocationContext context, Exception cause) {
final URI destination = context.getDestination();
if (destination == null || context.getTargetAffinity() == Affinity.LOCAL) {
// some later interceptor cleared it out on us
Expand All @@ -218,7 +219,11 @@ private void processMissingTarget(final AbstractInvocationContext context) {
}

// Oops, we got some wrong information!
addBlackListedDestination(context, destination);
if (cause instanceof NoSuchEJBException) {
addInvocationBlackListedDestination(context, destination);
} else {
addBlackListedDestination(destination);
}

final EJBLocator<?> locator = context.getLocator();
if (! (locator.getAffinity() instanceof ClusterAffinity)) {
Expand Down

0 comments on commit 1539c2c

Please sign in to comment.