Skip to content

Commit

Permalink
New RequestScopeHelper class to track request scopes across threads (#…
Browse files Browse the repository at this point in the history
…2856)

* Run concurrent requests for each test.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* New test.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* New helper class to manage request scope logic across threads. Update of thread local variable in Jersey to make sure correct InjectionManager is used in non-request threads.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* Updated test to show the problem described in #2632 using multiple applications.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* Fixed copyright.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>
  • Loading branch information
spericas authored and paulparkinson committed Mar 29, 2021
1 parent 9ca87ed commit bf9f256
Show file tree
Hide file tree
Showing 13 changed files with 421 additions and 106 deletions.
4 changes: 4 additions & 0 deletions microprofile/fault-tolerance/pom.xml
Expand Up @@ -89,6 +89,10 @@
<groupId>io.helidon.microprofile.server</groupId>
<artifactId>helidon-microprofile-server</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.ext.cdi</groupId>
<artifactId>jersey-weld2-se</artifactId>
</dependency>
<!-- To keep HK2 from complaining when run with Java9+ -->
<dependency>
<groupId>jakarta.activation</groupId>
Expand Down
Expand Up @@ -18,7 +18,6 @@
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand All @@ -28,10 +27,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.logging.Logger;

import javax.enterprise.context.control.RequestContextController;
import javax.enterprise.inject.spi.CDI;
import javax.interceptor.InvocationContext;

import io.helidon.common.context.Context;
Expand All @@ -51,8 +47,6 @@
import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException;
import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException;
import org.eclipse.microprofile.metrics.Counter;
import org.glassfish.jersey.process.internal.RequestContext;
import org.glassfish.jersey.process.internal.RequestScope;

import static io.helidon.microprofile.faulttolerance.FaultToleranceExtension.isFaultToleranceMetricsEnabled;
import static io.helidon.microprofile.faulttolerance.FaultToleranceMetrics.BREAKER_CALLS_FAILED_TOTAL;
Expand Down Expand Up @@ -91,7 +85,6 @@
* all invocations of a method, including for circuit breakers and bulkheads.
*/
class MethodInvoker implements FtSupplier<Object> {
private static final Logger LOGGER = Logger.getLogger(MethodInvoker.class.getName());

/**
* The method being intercepted.
Expand Down Expand Up @@ -130,21 +123,6 @@ class MethodInvoker implements FtSupplier<Object> {
*/
private final Context helidonContext;

/**
* Jersey's request scope object. Will be non-null if request scope is active.
*/
private RequestScope requestScope;

/**
* Jersey's request scope object.
*/
private RequestContext requestContext;

/**
* CDI's request scope controller used for activation/deactivation.
*/
private RequestContextController requestController;

/**
* Record thread interruption request for later use.
*/
Expand All @@ -156,6 +134,11 @@ class MethodInvoker implements FtSupplier<Object> {
*/
private Thread asyncInterruptThread;

/**
* Helper to properly propagate active request scope to other threads.
*/
private final RequestScopeHelper requestScopeHelper;

/**
* State associated with a method in {@code METHOD_STATES}. This include the
* FT handler created for the method.
Expand Down Expand Up @@ -315,15 +298,8 @@ public boolean cancel(boolean mayInterruptIfRunning) {
handler = createMethodHandler(methodState);

// Gather information about current request scope if active
try {
requestController = CDI.current().select(RequestContextController.class).get();
requestScope = CDI.current().select(RequestScope.class).get();
requestContext = requestScope.referenceCurrent();
} catch (Exception e) {
requestScope = null;
LOGGER.fine(() -> "Request context not active for method " + method
+ " on thread " + Thread.currentThread().getName());
}
requestScopeHelper = new RequestScopeHelper();
requestScopeHelper.saveScope();

// Gauges and other metrics for bulkhead and circuit breakers
if (isFaultToleranceMetricsEnabled()) {
Expand Down Expand Up @@ -409,10 +385,8 @@ public Object get() throws Throwable {

// Update resultFuture based on outcome of asyncFuture
asyncFuture.whenComplete((result, throwable) -> {
// Release request context if referenced
if (requestContext != null) {
requestContext.release();
}
// Release request context
requestScopeHelper.clearScope();

if (throwable != null) {
if (throwable instanceof CancellationException) {
Expand Down Expand Up @@ -455,10 +429,8 @@ public Object get() throws Throwable {
} catch (Throwable t) {
cause = map(t);
} finally {
// Release request context if referenced
if (requestContext != null) {
requestContext.release();
}
// Release request context
requestScopeHelper.clearScope();
}
updateMetricsAfter(cause);
if (cause != null) {
Expand All @@ -468,41 +440,6 @@ public Object get() throws Throwable {
}
}

/**
* Wraps a supplier with additional code to preserve request context (if active)
* when running in a different thread. This is required for {@code @Inject} and
* {@code @Context} to work properly. Note that it is possible for only CDI's
* request scope to be active at this time (e.g. in TCKs).
*/
private FtSupplier<Object> requestContextSupplier(FtSupplier<Object> supplier) {
FtSupplier<Object> wrappedSupplier;
if (requestScope != null) { // Jersey and CDI
wrappedSupplier = () -> requestScope.runInScope(requestContext,
(Callable<?>) (() -> {
try {
requestController.activate();
return supplier.get();
} catch (Throwable t) {
throw t instanceof Exception ? ((Exception) t) : new RuntimeException(t);
} finally {
requestController.deactivate();
}
}));
} else if (requestController != null) { // CDI only
wrappedSupplier = () -> {
try {
requestController.activate();
return supplier.get();
} finally {
requestController.deactivate();
}
};
} else {
wrappedSupplier = supplier;
}
return wrappedSupplier;
}

/**
* Initializes method state by creating handlers for all FT annotations
* except fallbacks. A fallback can reference the current invocation context
Expand Down Expand Up @@ -612,7 +549,7 @@ Supplier<? extends CompletionStage<Object>> toCompletionStageSupplier(FtSupplier
invocationStartNanos = System.nanoTime();

// Wrap supplier with request context setup
FtSupplier wrappedSupplier = requestContextSupplier(supplier);
FtSupplier<Object> wrappedSupplier = requestScopeHelper.wrapInScope(supplier);

CompletableFuture<Object> resultFuture = new CompletableFuture<>();
if (introspector.isAsynchronous()) {
Expand Down
@@ -0,0 +1,137 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.microprofile.faulttolerance;

import java.util.concurrent.Callable;

import javax.enterprise.context.control.RequestContextController;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.spi.CDI;

import org.glassfish.jersey.internal.inject.InjectionManager;
import org.glassfish.jersey.process.internal.RequestContext;
import org.glassfish.jersey.process.internal.RequestScope;
import org.glassfish.jersey.weld.se.WeldRequestScope;

class RequestScopeHelper {

enum State {
CLEARED,
STORED
}

private State state = State.CLEARED;

/**
* CDI's request scope controller used for activation/deactivation.
*/
private RequestContextController requestController;

/**
* Jersey's request scope object. Will be non-null if request scope is active.
*/
private RequestScope requestScope;

/**
* Jersey's request scope object.
*/
private RequestContext requestContext;

/**
* Jersey's injection manager.
*/
private InjectionManager injectionManager;

/**
* Store request context information from the current thread. State
* related to Jersey and CDI to handle {@code @Context} and {@code @Inject}
* injections.
*/
void saveScope() {
if (state == State.STORED) {
throw new IllegalStateException("Request scope state already stored");
}
// CDI scope
Instance<RequestContextController> rcc = CDI.current().select(RequestContextController.class);
if (rcc.isResolvable()) {
requestController = rcc.get();
}
// Jersey scope
injectionManager = WeldRequestScope.actualInjectorManager.get(); // thread local
try {
requestScope = CDI.current().select(RequestScope.class).get();
requestContext = requestScope.referenceCurrent();
} catch (Exception e) {
// Ignored, Jersey request scope not active
} finally {
state = State.STORED;
}
}

/**
* Wraps a supplier into another supplier that actives the request scope
* before calling it.
*
* @param supplier supplier to wrap
* @return wrapped supplier
*/
FtSupplier<Object> wrapInScope(FtSupplier<Object> supplier) {
if (state != State.STORED) {
throw new IllegalStateException("Request scope state never stored");
}
if (requestScope != null) { // Jersey and CDI
return () -> requestScope.runInScope(requestContext,
(Callable<?>) (() -> {
InjectionManager old = WeldRequestScope.actualInjectorManager.get();
try {
requestController.activate();
WeldRequestScope.actualInjectorManager.set(injectionManager);
return supplier.get();
} catch (Throwable t) {
throw t instanceof Exception ? ((Exception) t) : new RuntimeException(t);
} finally {
requestController.deactivate();
WeldRequestScope.actualInjectorManager.set(old);
}
}));
} else if (requestController != null) { // CDI only
return () -> {
try {
requestController.activate();
return supplier.get();
} finally {
requestController.deactivate();
}
};
} else {
return supplier;
}
}

/**
* Clears internal state saved by calling {@link #saveScope()}.
*/
void clearScope() {
if (requestContext != null) {
requestContext.release();
requestContext = null;
}
requestScope = null;
requestController = null;
injectionManager = null;
state = State.CLEARED;
}
}
4 changes: 3 additions & 1 deletion microprofile/fault-tolerance/src/main/java/module-info.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2020 Oracle and/or its affiliates.
* Copyright (c) 2018, 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,8 @@
requires microprofile.metrics.api;
requires microprofile.fault.tolerance.api;

requires jersey.weld2.se;

exports io.helidon.microprofile.faulttolerance;

// needed when running with modules - to make private methods accessible
Expand Down
4 changes: 4 additions & 0 deletions tests/functional/request-scope/pom.xml
Expand Up @@ -65,5 +65,9 @@
<artifactId>helidon-microprofile-tests-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.microprofile.opentracing</groupId>
<artifactId>microprofile-opentracing-api</artifactId>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.tests.functional.requestscope;

import java.util.Set;

import javax.enterprise.context.ApplicationScoped;
import javax.ws.rs.core.Application;

/**
* This functional test requires having two application subclasses.
* See: https://github.com/oracle/helidon/issues/2632#issuecomment-796831904
*/
@ApplicationScoped
class Application1 extends Application {

@Override
public Set<Class<?>> getClasses() {
return Set.of(Service1.class, Service2.class, Service3.class);
}
}

0 comments on commit bf9f256

Please sign in to comment.