Skip to content

Commit

Permalink
Renamed to RunOnThread. Separated config for virtual and platform.
Browse files Browse the repository at this point in the history
  • Loading branch information
spericas committed May 7, 2024
1 parent a2187a4 commit f1f4cae
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
@Target({ElementType.METHOD, ElementType.TYPE})
@Inherited
@InterceptorBinding
public @interface OnNewThread {
public @interface RunOnThead {

/**
* Type of thread to use for invocation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
import jakarta.enterprise.inject.spi.ProcessSyntheticBean;

/**
* CDI extension to support the {@link OnNewThread} annotation.
* CDI extension to support the {@link RunOnThead} annotation.
*/
public class OnNewThreadExtension implements Extension {
public class RunOnThreadExtension implements Extension {

private final LazyValue<Map<Method, AnnotatedMethod<?>>> methodMap = LazyValue.create(ConcurrentHashMap::new);

Expand All @@ -53,7 +53,7 @@ void registerMethods(@Observes ProcessManagedBean<?> event) {

private void registerMethods(AnnotatedType<?> type) {
for (AnnotatedMethod<?> annotatedMethod : type.getMethods()) {
if (annotatedMethod.isAnnotationPresent(OnNewThread.class)) {
if (annotatedMethod.isAnnotationPresent(RunOnThead.class)) {
methodMap.get().put(annotatedMethod.getJavaMember(), annotatedMethod);
}
}
Expand All @@ -64,30 +64,30 @@ void validateAnnotations(BeanManager bm, @Observes @Initialized(ApplicationScope
}

private static void validateExecutor(BeanManager bm, AnnotatedMethod<?> method) {
OnNewThread onNewThread = method.getAnnotation(OnNewThread.class);
if (onNewThread.value() == OnNewThread.ThreadType.EXECUTOR) {
String executorName = onNewThread.executorName();
RunOnThead runOnThread = method.getAnnotation(RunOnThead.class);
if (runOnThread.value() == RunOnThead.ThreadType.EXECUTOR) {
String executorName = runOnThread.executorName();
Set<Bean<?>> beans = bm.getBeans(ExecutorService.class, NamedLiteral.of(executorName));
if (beans.isEmpty()) {
throw new IllegalArgumentException("Unable to resolve named executor service '"
+ onNewThread.value() + "' at "
+ runOnThread.value() + "' at "
+ method.getJavaMember().getDeclaringClass().getName() + "::"
+ method.getJavaMember().getName());
}
}
}

OnNewThread getAnnotation(Method method) {
RunOnThead getAnnotation(Method method) {
AnnotatedMethod<?> annotatedMethod = methodMap.get().get(method);
if (annotatedMethod != null) {
return annotatedMethod.getAnnotation(OnNewThread.class);
return annotatedMethod.getAnnotation(RunOnThead.class);
}
throw new IllegalArgumentException("Unable to map method " + method);
}

void registerInterceptors(@Observes BeforeBeanDiscovery discovery, BeanManager bm) {
discovery.addAnnotatedType(bm.createAnnotatedType(OnNewThreadInterceptor.class),
OnNewThreadInterceptor.class.getName());
discovery.addAnnotatedType(bm.createAnnotatedType(RunOnThreadInterceptor.class),
RunOnThreadInterceptor.class.getName());
}

void clearMethodMap() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,21 @@
* Intercepts calls to bean methods to be executed on a new thread.
*/
@Interceptor
@OnNewThread
@RunOnThead
@Priority(Interceptor.Priority.PLATFORM_AFTER + 10)
class OnNewThreadInterceptor {
class RunOnThreadInterceptor {

private static final String ON_NEW_THREAD = "on-new-thread";
private static final String EXECUTOR_SERVICE_CONFIG = "mp.on-new-thread.executor-service";
private static final String RUN_ON_THREAD = "run-on-thread";
private static final String RUN_ON_VIRTUAL_THREAD = "mp.run-on-thread.virtual";
private static final String RUN_ON_PLATFORM_THREAD = "mp.run-on-thread.platform";

private static final LazyValue<ExecutorService> PLATFORM_EXECUTOR_SERVICE
= LazyValue.create(() -> {
Config mpConfig = ConfigProvider.getConfig();
io.helidon.config.Config config = MpConfig.toHelidonConfig(mpConfig);
return ThreadPoolSupplier.builder()
.threadNamePrefix(ON_NEW_THREAD)
.config(config.get(EXECUTOR_SERVICE_CONFIG))
.threadNamePrefix(RUN_ON_THREAD)
.config(config.get(RUN_ON_PLATFORM_THREAD))
.virtualThreads(false) // overrides to platform threads
.build()
.get();
Expand All @@ -60,16 +61,20 @@ class OnNewThreadInterceptor {
= LazyValue.create(() -> {
Config mpConfig = ConfigProvider.getConfig();
io.helidon.config.Config config = MpConfig.toHelidonConfig(mpConfig);
String threadNamePrefix = config.get(RUN_ON_VIRTUAL_THREAD)
.get("thread-name-prefix")
.asString()
.asOptional()
.orElse(RUN_ON_THREAD);
return ThreadPoolSupplier.builder()
.threadNamePrefix(ON_NEW_THREAD)
.config(config.get(EXECUTOR_SERVICE_CONFIG))
.virtualThreads(true) // overrides to virtual threads
.threadNamePrefix(threadNamePrefix)
.virtualThreads(true)
.build()
.get();
});

@Inject
private OnNewThreadExtension extension;
private RunOnThreadExtension extension;

/**
* Intercepts a call to bean method annotated by {@code @OnNewThread}.
Expand All @@ -79,23 +84,23 @@ class OnNewThreadInterceptor {
* @throws Throwable If a problem occurs.
*/
@AroundInvoke
public Object runOnNewThread(InvocationContext context) throws Throwable {
OnNewThread onNewThread = extension.getAnnotation(context.getMethod());
return switch (onNewThread.value()) {
public Object runOnThread(InvocationContext context) throws Throwable {
RunOnThead runOnThread = extension.getAnnotation(context.getMethod());
return switch (runOnThread.value()) {
case PLATFORM -> PLATFORM_EXECUTOR_SERVICE.get()
.submit(context::proceed)
.get(onNewThread.timeout(), onNewThread.unit());
.get(runOnThread.timeout(), runOnThread.unit());
case VIRTUAL -> VIRTUAL_EXECUTOR_SERVICE.get()
.submit(context::proceed)
.get(onNewThread.timeout(), onNewThread.unit());
case EXECUTOR -> findExecutor(onNewThread.executorName())
.get(runOnThread.timeout(), runOnThread.unit());
case EXECUTOR -> findExecutor(runOnThread.executorName())
.submit(context::proceed)
.get(onNewThread.timeout(), onNewThread.unit());
.get(runOnThread.timeout(), runOnThread.unit());
};
}

/**
* Find executor by name. Validation in {@link OnNewThreadExtension#validateAnnotations(BeanManager, Object)}.
* Find executor by name. Validation in {@link RunOnThreadExtension#validateAnnotations(BeanManager, Object)}.
*
* @param executorName name of executor
* @return executor instance looked up via CDI
Expand Down
3 changes: 1 addition & 2 deletions microprofile/cdi/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import io.helidon.common.features.api.Feature;
import io.helidon.common.features.api.HelidonFlavor;
import io.helidon.microprofile.cdi.OnNewThreadExtension;

/**
* CDI implementation enhancements for Helidon MP.
Expand Down Expand Up @@ -72,7 +71,7 @@
with io.helidon.microprofile.cdi.ExecutorServices;

provides jakarta.enterprise.inject.spi.Extension
with OnNewThreadExtension;
with io.helidon.microprofile.cdi.RunOnThreadExtension;

opens io.helidon.microprofile.cdi to weld.core.impl;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static io.helidon.microprofile.cdi.OnNewThread.ThreadType;
import static io.helidon.microprofile.cdi.RunOnThead.ThreadType;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

class OnNewThreadTest {
class RunOnThreadTest {

static SeContainer seContainer;
static OnNewThreadBean bean;
Expand All @@ -44,7 +44,7 @@ class OnNewThreadTest {
static void startCdi() {
seContainer = SeContainerInitializer.newInstance()
.disableDiscovery()
.addExtensions(OnNewThreadExtension.class)
.addExtensions(RunOnThreadExtension.class)
.addBeanClasses(OnNewThreadBean.class)
.initialize();
bean = CDI.current().select(OnNewThreadBean.class).get();
Expand All @@ -57,37 +57,37 @@ static void stopCdi() {

static class OnNewThreadBean {

@OnNewThread
@RunOnThead
Thread cpuIntensive() {
return Thread.currentThread();
}

@OnNewThread(ThreadType.PLATFORM)
@RunOnThead(ThreadType.PLATFORM)
Thread cpuIntensiveWithType() {
return Thread.currentThread();
}

@OnNewThread(timeout = 10000)
@RunOnThead(timeout = 10000)
Thread evenMoreCpuIntensive() {
return Thread.currentThread();
}

@OnNewThread(ThreadType.VIRTUAL)
@RunOnThead(ThreadType.VIRTUAL)
Thread onVirtualThread() {
return Thread.currentThread();
}

@OnNewThread(value = ThreadType.EXECUTOR, executorName = "my-executor")
@RunOnThead(value = ThreadType.EXECUTOR, executorName = "my-executor")
Thread onMyExecutor() {
return Thread.currentThread();
}

@OnNewThread
@RunOnThead
Optional<String> verifyContextVirtual() {
return Contexts.context().flatMap(context -> context.get("hello", String.class));
}

@OnNewThread(ThreadType.PLATFORM)
@RunOnThead(ThreadType.PLATFORM)
Optional<String> verifyContextPlatform() {
return Contexts.context().flatMap(context -> context.get("hello", String.class));
}
Expand All @@ -103,28 +103,28 @@ ExecutorService myExecutor() {
void cpuIntensiveTest() {
Thread thread = bean.cpuIntensive();
assertThat(thread.isVirtual(), is(false));
assertThat(thread.getName().startsWith("my-thread"), is(true));
assertThat(thread.getName().startsWith("my-platform-thread"), is(true));
}

@Test
void cpuIntensiveWithTypeTest() {
Thread thread = bean.cpuIntensiveWithType();
assertThat(thread.isVirtual(), is(false));
assertThat(thread.getName().startsWith("my-thread"), is(true));
assertThat(thread.getName().startsWith("my-platform-thread"), is(true));
}

@Test
void evenMoreCpuIntensiveTest() {
Thread thread = bean.evenMoreCpuIntensive();
assertThat(thread.isVirtual(), is(false));
assertThat(thread.getName().startsWith("my-thread"), is(true));
assertThat(thread.getName().startsWith("my-platform-thread"), is(true));
}

@Test
void onVirtualThread() {
Thread thread = bean.onVirtualThread();
assertThat(thread.isVirtual(), is(true));
assertThat(thread.getName().startsWith("my-thread"), is(true));
assertThat(thread.getName().startsWith("my-virtual-thread"), is(true));
}

@Test
Expand Down
8 changes: 5 additions & 3 deletions microprofile/cdi/src/test/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
#

mp:
on-new-thread:
executor-service:
thread-name-prefix: "my-thread"
run-on-thread:
platform:
thread-name-prefix: "my-platform-thread"
core-pool-size: 1
max-pool-size: 2
queue-capacity: 10
virtual:
thread-name-prefix: "my-virtual-thread"

0 comments on commit f1f4cae

Please sign in to comment.