Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Virtual Threads in Executor Services #5648

Merged
merged 3 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -88,6 +87,7 @@
import org.glassfish.jersey.client.innate.http.SSLParamConfigurator;
import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
import org.glassfish.jersey.client.spi.Connector;
import org.glassfish.jersey.innate.VirtualThreadUtil;
import org.glassfish.jersey.message.internal.OutboundMessageContext;
import org.glassfish.jersey.netty.connector.internal.NettyEntityWriter;

Expand Down Expand Up @@ -129,14 +129,15 @@ class NettyConnector implements Connector {

NettyConnector(Client client) {

final Map<String, Object> properties = client.getConfiguration().getProperties();
final Configuration configuration = client.getConfiguration();
final Map<String, Object> properties = configuration.getProperties();
final Object threadPoolSize = properties.get(ClientProperties.ASYNC_THREADPOOL_SIZE);

if (threadPoolSize != null && threadPoolSize instanceof Integer && (Integer) threadPoolSize > 0) {
executorService = Executors.newFixedThreadPool((Integer) threadPoolSize);
executorService = VirtualThreadUtil.withConfig(configuration).newFixedThreadPool((Integer) threadPoolSize);
this.group = new NioEventLoopGroup((Integer) threadPoolSize);
} else {
executorService = Executors.newCachedThreadPool();
executorService = VirtualThreadUtil.withConfig(configuration).newCachedThreadPool();
this.group = new NioEventLoopGroup();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010, 2022 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2010, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -18,10 +18,15 @@

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.ThreadFactory;

import javax.ws.rs.ProcessingException;
import javax.ws.rs.core.Configuration;

import org.glassfish.jersey.grizzly2.httpserver.internal.LocalizationMessages;
import org.glassfish.jersey.innate.VirtualThreadSupport;
import org.glassfish.jersey.innate.VirtualThreadUtil;
import org.glassfish.jersey.innate.virtual.LoomishExecutors;
import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder;
import org.glassfish.jersey.process.JerseyProcessingUncaughtExceptionHandler;
import org.glassfish.jersey.server.ApplicationHandler;
Expand Down Expand Up @@ -281,11 +286,20 @@ public static HttpServer createHttpServer(final URI uri,
: uri.getPort();

final NetworkListener listener = new NetworkListener("grizzly", host, port);
final Configuration configuration = handler != null ? handler.getConfiguration().getConfiguration() : null;

listener.getTransport().getWorkerThreadPoolConfig().setThreadFactory(new ThreadFactoryBuilder()
final LoomishExecutors executors = VirtualThreadUtil.withConfig(configuration, false);
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("grizzly-http-server-%d")
.setUncaughtExceptionHandler(new JerseyProcessingUncaughtExceptionHandler())
.build());
.setThreadFactory(executors.getThreadFactory())
.build();

if (executors.isVirtual()) {
listener.getTransport().setWorkerThreadPool(executors.newCachedThreadPool());
} else {
listener.getTransport().getWorkerThreadPoolConfig().setThreadFactory(threadFactory);
}

listener.setSecure(secure);
if (sslEngineConfigurator != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -23,6 +23,7 @@
import javax.servlet.Servlet;

import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.glassfish.jersey.uri.UriComponent;

Expand Down Expand Up @@ -251,11 +252,13 @@ private static HttpServer create(URI u, Class<? extends Servlet> c, Servlet serv
}
}

ResourceConfig configuration = new ResourceConfig();
if (initParams != null) {
registration.setInitParameters(initParams);
configuration.addProperties((Map) initParams);
}

HttpServer server = GrizzlyHttpServerFactory.createHttpServer(u);
HttpServer server = GrizzlyHttpServerFactory.createHttpServer(u, configuration);
context.deploy(server);
return server;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -20,7 +20,10 @@
import java.util.concurrent.ThreadFactory;

import javax.ws.rs.ProcessingException;
import javax.ws.rs.core.Configuration;

import org.glassfish.jersey.innate.VirtualThreadUtil;
import org.glassfish.jersey.innate.virtual.LoomishExecutors;
import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder;
import org.glassfish.jersey.jetty.internal.LocalizationMessages;
import org.glassfish.jersey.process.JerseyProcessingUncaughtExceptionHandler;
Expand Down Expand Up @@ -253,7 +256,8 @@ public static Server createServer(final URI uri,
}
final int port = (uri.getPort() == -1) ? defaultPort : uri.getPort();

final Server server = new Server(new JettyConnectorThreadPool());
final Configuration configuration = handler != null ? handler.getConfiguration() : null;
final Server server = new Server(new JettyConnectorThreadPool(configuration));
final HttpConfiguration config = new HttpConfiguration();
if (sslContextFactory != null) {
config.setSecureScheme("https");
Expand Down Expand Up @@ -291,10 +295,20 @@ public static Server createServer(final URI uri,
//
// Keeping this for backwards compatibility for the time being
private static final class JettyConnectorThreadPool extends QueuedThreadPool {
private final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("jetty-http-server-%d")
.setUncaughtExceptionHandler(new JerseyProcessingUncaughtExceptionHandler())
.build();
private final ThreadFactory threadFactory;

private JettyConnectorThreadPool(Configuration configuration) {
final LoomishExecutors executors = VirtualThreadUtil.withConfig(configuration, false);
if (executors.isVirtual()) {
super.setMaxThreads(Integer.MAX_VALUE - 1);
}

this.threadFactory = new ThreadFactoryBuilder()
.setNameFormat("jetty-http-server-%d")
.setUncaughtExceptionHandler(new JerseyProcessingUncaughtExceptionHandler())
.setThreadFactory(executors.getThreadFactory())
.build();
}

@Override
public Thread newThread(Runnable runnable) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2021 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -32,11 +32,12 @@
import org.glassfish.jersey.internal.util.collection.Value;
import org.glassfish.jersey.internal.util.collection.Values;
import org.glassfish.jersey.model.internal.ComponentBag;
import org.glassfish.jersey.model.internal.ManagedObjectsFinalizer;
import org.glassfish.jersey.process.internal.AbstractExecutorProvidersConfigurator;
import org.glassfish.jersey.spi.ExecutorServiceProvider;
import org.glassfish.jersey.spi.ScheduledExecutorServiceProvider;

import javax.ws.rs.core.Configuration;

/**
* Configurator which initializes and register {@link ExecutorServiceProvider} and
* {@link ScheduledExecutorServiceProvider}.
Expand Down Expand Up @@ -64,7 +65,8 @@ class ClientExecutorProvidersConfigurator extends AbstractExecutorProvidersConfi

@Override
public void init(InjectionManager injectionManager, BootstrapBag bootstrapBag) {
Map<String, Object> runtimeProperties = bootstrapBag.getConfiguration().getProperties();
final Configuration configuration = bootstrapBag.getConfiguration();
Map<String, Object> runtimeProperties = configuration.getProperties();

ExecutorServiceProvider defaultAsyncExecutorProvider;
ScheduledExecutorServiceProvider defaultScheduledExecutorProvider;
Expand Down Expand Up @@ -94,12 +96,12 @@ public void init(InjectionManager injectionManager, BootstrapBag bootstrapBag) {
.named("ClientAsyncThreadPoolSize");
injectionManager.register(asyncThreadPoolSizeBinding);

defaultAsyncExecutorProvider = new DefaultClientAsyncExecutorProvider(asyncThreadPoolSize);
defaultAsyncExecutorProvider = new DefaultClientAsyncExecutorProvider(asyncThreadPoolSize, configuration);
} else {
if (MANAGED_EXECUTOR_SERVICE != null) {
defaultAsyncExecutorProvider = new ClientExecutorServiceProvider(MANAGED_EXECUTOR_SERVICE);
} else {
defaultAsyncExecutorProvider = new DefaultClientAsyncExecutorProvider(0);
defaultAsyncExecutorProvider = new DefaultClientAsyncExecutorProvider(0, configuration);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -20,6 +20,8 @@

import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.core.Configuration;
import javax.ws.rs.core.Context;

import org.glassfish.jersey.client.internal.LocalizationMessages;
import org.glassfish.jersey.internal.util.collection.LazyValue;
Expand All @@ -46,8 +48,9 @@ class DefaultClientAsyncExecutorProvider extends ThreadPoolExecutorProvider {
* See also {@link org.glassfish.jersey.client.ClientProperties#ASYNC_THREADPOOL_SIZE}.
*/
@Inject
public DefaultClientAsyncExecutorProvider(@Named("ClientAsyncThreadPoolSize") final int poolSize) {
super("jersey-client-async-executor");
public DefaultClientAsyncExecutorProvider(@Named("ClientAsyncThreadPoolSize") final int poolSize,
@Context Configuration configuration) {
super("jersey-client-async-executor", configuration);

this.asyncThreadPoolSize = Values.lazy(new Value<Integer>() {
@Override
Expand Down
2 changes: 1 addition & 1 deletion core-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@
<configuration>
<target>
<mkdir dir="${java21.build.outputDirectory}" />
<javac srcdir="${java21.sourceDirectory}" destdir="${java21.build.outputDirectory}"
<javac srcdir="${java21.sourceDirectory}${path.separator}${project.basedir}/src/main/java/org/glassfish/jersey/innate/virtual" destdir="${java21.build.outputDirectory}"
classpath="${project.build.outputDirectory}" includeantruntime="false" release="21"/>
</target>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013, 2023 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -320,6 +320,30 @@ public final class CommonProperties {
*/
public static final String PARAM_CONVERTERS_THROW_IAE = "jersey.config.paramconverters.throw.iae";

/**
* <p>
* Defines the {@link java.util.concurrent.ThreadFactory} to be used by internal default Executor Services.
* </p>
* <p>
* The default is {@link java.util.concurrent.Executors#defaultThreadFactory()} on platform threads and
* {@code Thread.ofVirtual().factory()} on virtual threads.
* </p>
* @since 2.44
*/
public static String THREAD_FACTORY = "jersey.config.threads.factory";

/**
* <p>
* Defines whether the virtual threads should be used by Jersey on JDK 21+ when not using an exact number
* of threads by {@code FixedThreadPool}.
* </p>
* <p>
* The default is {@code false} for this version of Jersey, and {@code true} for Jersey 3.1+.
* </p>
* @since 2.44
*/
public static String USE_VIRTUAL_THREADS = "jersey.config.threads.use.virtual";

/**
* Prevent instantiation.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.jersey.innate;

import org.glassfish.jersey.CommonProperties;
import org.glassfish.jersey.innate.virtual.LoomishExecutors;

import javax.ws.rs.core.Configuration;
import java.util.concurrent.ThreadFactory;

/**
* Factory class to provide JDK specific implementation of bits related to the virtual thread support.
*/
public final class VirtualThreadUtil {

private static final boolean USE_VIRTUAL_THREADS_BY_DEFAULT = false;

/**
* Do not instantiate.
*/
private VirtualThreadUtil() {
throw new IllegalStateException();
}

/**
* Return an instance of {@link LoomishExecutors} based on a configuration property.
* @param config the {@link Configuration}
* @return the {@link LoomishExecutors} instance.
*/
public static LoomishExecutors withConfig(Configuration config) {
return withConfig(config, USE_VIRTUAL_THREADS_BY_DEFAULT);
}

/**
* Return an instance of {@link LoomishExecutors} based on a configuration property.
* @param config the {@link Configuration}
* @param useVirtualByDefault the default use if not said otherwise by property
* @return the {@link LoomishExecutors} instance.
*/
public static LoomishExecutors withConfig(Configuration config, boolean useVirtualByDefault) {
ThreadFactory tfThreadFactory = null;
boolean useVirtualThreads = useVirtualThreads(config, useVirtualByDefault);

if (config != null) {
Object threadFactory = config.getProperty(CommonProperties.THREAD_FACTORY);
if (threadFactory != null && ThreadFactory.class.isInstance(threadFactory)) {
tfThreadFactory = (ThreadFactory) threadFactory;
}
}

return tfThreadFactory == null
? VirtualThreadSupport.allowVirtual(useVirtualThreads)
: VirtualThreadSupport.allowVirtual(useVirtualThreads, tfThreadFactory);
}

/**
* Check configuration if the use of the virtual threads is expected or return the default value if not.
* @param config the {@link Configuration}
* @param useByDefault the default expectation
* @return the expected
*/
private static boolean useVirtualThreads(Configuration config, boolean useByDefault) {
boolean bUseVirtualThreads = useByDefault;
if (config != null) {
Object useVirtualThread = config.getProperty(CommonProperties.USE_VIRTUAL_THREADS);
if (useVirtualThread != null && Boolean.class.isInstance(useVirtualThread)) {
bUseVirtualThreads = (boolean) useVirtualThread;
}
if (useVirtualThread != null && String.class.isInstance(useVirtualThread)) {
bUseVirtualThreads = Boolean.parseBoolean(useVirtualThread.toString());
}
}
return bUseVirtualThreads;
}
}
Loading