diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java index 988f5b207c..b446e8a944 100644 --- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java +++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java @@ -85,7 +85,10 @@ public void execute(final IOSession session) { }))) { if (status.compareAndSet(Status.READY, Status.RUNNING)) { - ioReactorRef.get().start(); + if(this instanceof AsyncServer){ + //start DefaultListeningIOReactor + ioReactorRef.get().start(0); + } } } else { throw new IllegalStateException("I/O reactor has already been started"); diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1AuthenticationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1AuthenticationTest.java index 0126ee398a..d814289443 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1AuthenticationTest.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1AuthenticationTest.java @@ -65,8 +65,6 @@ import org.apache.hc.core5.testing.classic.LoggingConnPoolListener; import org.apache.hc.core5.testing.classic.LoggingHttp1StreamListener; import org.apache.hc.core5.util.Timeout; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Rule; @@ -74,6 +72,8 @@ import org.junit.rules.ExternalResource; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @RunWith(Parameterized.class) public class Http1AuthenticationTest { @@ -202,11 +202,10 @@ protected void after() { @Test public void testGetRequestAuthentication() throws Exception { - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); final HttpHost target = new HttpHost("localhost", address.getPort()); @@ -236,11 +235,10 @@ public void testGetRequestAuthentication() throws Exception { @Test public void testPostRequestAuthentication() throws Exception { - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); final HttpHost target = new HttpHost("localhost", address.getPort()); final Random rnd = new Random(); @@ -274,11 +272,10 @@ public void testPostRequestAuthentication() throws Exception { @Test public void testPostRequestAuthenticationNoExpectContinue() throws Exception { - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); final HttpHost target = new HttpHost("localhost", address.getPort()); final Random rnd = new Random(); diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1ServerAndRequesterTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1ServerAndRequesterTest.java index 9d1065c1ac..069e7e19d3 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1ServerAndRequesterTest.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1ServerAndRequesterTest.java @@ -76,8 +76,6 @@ import org.apache.hc.core5.testing.SSLTestContexts; import org.apache.hc.core5.testing.classic.LoggingConnPoolListener; import org.apache.hc.core5.util.Timeout; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Rule; @@ -85,6 +83,8 @@ import org.junit.rules.ExternalResource; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @RunWith(Parameterized.class) public class Http1ServerAndRequesterTest { @@ -244,11 +244,10 @@ protected void after() { @Test public void testSequentialRequests() throws Exception { - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); final HttpHost target = new HttpHost("localhost", address.getPort(), scheme.id); final Future> resultFuture1 = requester.execute( @@ -287,11 +286,10 @@ public void testSequentialRequests() throws Exception { @Test public void testSequentialRequestsNonPersistentConnection() throws Exception { - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); final HttpHost target = new HttpHost("localhost", address.getPort(), scheme.id); final Future> resultFuture1 = requester.execute( @@ -330,11 +328,10 @@ public void testSequentialRequestsNonPersistentConnection() throws Exception { @Test public void testSequentialRequestsSameEndpoint() throws Exception { - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); final HttpHost target = new HttpHost("localhost", address.getPort(), scheme.id); final Future endpointFuture = requester.connect(target, Timeout.ofSeconds(5)); @@ -381,11 +378,10 @@ public void testSequentialRequestsSameEndpoint() throws Exception { @Test public void testPipelinedRequests() throws Exception { - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); final HttpHost target = new HttpHost("localhost", address.getPort(), scheme.id); final Future endpointFuture = requester.connect(target, Timeout.ofSeconds(5)); diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ALPNTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ALPNTest.java index 835ec1dba8..644920f6fe 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ALPNTest.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ALPNTest.java @@ -60,8 +60,6 @@ import org.apache.hc.core5.testing.SSLTestContexts; import org.apache.hc.core5.testing.TestingSupport; import org.apache.hc.core5.util.Timeout; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Assume; @@ -71,6 +69,8 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class Http2ALPNTest { @@ -191,11 +191,10 @@ public void testALPNLax() throws Exception { .setStreamListener(LoggingHttp2StreamListener.INSTANCE) .create(); - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); final HttpHost target = new HttpHost("localhost", address.getPort(), URIScheme.HTTPS.id); final Future> resultFuture1 = requester.execute( @@ -228,11 +227,10 @@ public void testALPNStrict() throws Exception { .setStreamListener(LoggingHttp2StreamListener.INSTANCE) .create(); - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); final HttpHost target = new HttpHost("localhost", address.getPort(), URIScheme.HTTPS.id); final Future> resultFuture1 = requester.execute( diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ProtocolNegotiationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ProtocolNegotiationTest.java index cf67abf52b..c5c1561e66 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ProtocolNegotiationTest.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ProtocolNegotiationTest.java @@ -63,8 +63,6 @@ import org.apache.hc.core5.testing.classic.LoggingConnPoolListener; import org.apache.hc.core5.testing.classic.LoggingHttp1StreamListener; import org.apache.hc.core5.util.Timeout; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Assume; @@ -73,6 +71,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class Http2ProtocolNegotiationTest { @@ -206,11 +206,10 @@ public void checkVersion() { @Test public void testForceHttp1() throws Exception { - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); final HttpHost target = new HttpHost("localhost", address.getPort(), "https"); final Future connectFuture = requester.connect(target, TIMEOUT, HttpVersionPolicy.FORCE_HTTP_1, null); @@ -229,11 +228,10 @@ public void testForceHttp1() throws Exception { @Test public void testForceHttp2() throws Exception { - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); final HttpHost target = new HttpHost("localhost", address.getPort(), "https"); final Future connectFuture = requester.connect(target, TIMEOUT, HttpVersionPolicy.FORCE_HTTP_2, null); @@ -252,11 +250,10 @@ public void testForceHttp2() throws Exception { @Test public void testNegotiateProtocol() throws Exception { - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); final HttpHost target = new HttpHost("localhost", address.getPort(), "https"); final Future connectFuture = requester.connect(target, TIMEOUT, HttpVersionPolicy.NEGOTIATE, null); diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java index 72aae99c57..258ab4830a 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java @@ -68,8 +68,6 @@ import org.apache.hc.core5.testing.TestingSupport; import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.Timeout; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Assume; @@ -80,6 +78,8 @@ import org.junit.rules.ExternalResource; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @RunWith(Parameterized.class) public class Http2ServerAndMultiplexingRequesterTest { @@ -215,11 +215,10 @@ public void checkVersion() { @Test public void testSequentialRequests() throws Exception { - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); final HttpHost target = new HttpHost("localhost", address.getPort(), scheme.id); final Future> resultFuture1 = requester.execute( @@ -258,11 +257,10 @@ public void testSequentialRequests() throws Exception { @Test public void testMultiplexedRequests() throws Exception { - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); final HttpHost target = new HttpHost("localhost", address.getPort(), scheme.id); final Queue>> queue = new LinkedList<>(); @@ -293,11 +291,10 @@ public void testMultiplexedRequests() throws Exception { @Test public void testValidityCheck() throws Exception { - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); requester.setValidateAfterInactivity(TimeValue.ofMillis(10)); final HttpHost target = new HttpHost("localhost", address.getPort(), scheme.id); @@ -341,11 +338,10 @@ public void testValidityCheck() throws Exception { @Test public void testMultiplexedRequestCancellation() throws Exception { - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); final int reqNo = 20; diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndRequesterTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndRequesterTest.java index c5eb965331..474d7b60e8 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndRequesterTest.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndRequesterTest.java @@ -65,8 +65,6 @@ import org.apache.hc.core5.testing.TestingSupport; import org.apache.hc.core5.testing.classic.LoggingConnPoolListener; import org.apache.hc.core5.util.Timeout; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Assume; @@ -77,6 +75,8 @@ import org.junit.rules.ExternalResource; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @RunWith(Parameterized.class) public class Http2ServerAndRequesterTest { @@ -217,11 +217,10 @@ public void checkVersion() { @Test public void testSequentialRequests() throws Exception { - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); final HttpHost target = new HttpHost("localhost", address.getPort(), scheme.id); final Future> resultFuture1 = requester.execute( @@ -260,11 +259,10 @@ public void testSequentialRequests() throws Exception { @Test public void testSequentialRequestsSameEndpoint() throws Exception { - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); final HttpHost target = new HttpHost("localhost", address.getPort(), scheme.id); final Future endpointFuture = requester.connect(target, Timeout.ofSeconds(5)); @@ -311,11 +309,10 @@ public void testSequentialRequestsSameEndpoint() throws Exception { @Test public void testPipelinedRequests() throws Exception { - server.start(); + server.serverStart(); final Future future = server.listen(new InetSocketAddress(0)); final ListenerEndpoint listener = future.get(); final InetSocketAddress address = (InetSocketAddress) listener.getAddress(); - requester.start(); final HttpHost target = new HttpHost("localhost", address.getPort(), scheme.id); final Future endpointFuture = requester.connect(target, Timeout.ofSeconds(5)); diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java index d0e5c126d3..56b8650005 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java @@ -104,7 +104,7 @@ public void cleanup() throws Exception { @Test public void testEndpointUpAndDown() throws Exception { - ioreactor.start(); + ioreactor.start(0); Set endpoints = ioreactor.getEndpoints(); Assert.assertNotNull(endpoints); @@ -138,7 +138,7 @@ public void testEndpointUpAndDown() throws Exception { @Test public void testEndpointAlreadyBound() throws Exception { - ioreactor.start(); + ioreactor.start(0); final Future future1 = ioreactor.listen(new InetSocketAddress(0)); final ListenerEndpoint endpoint1 = future1.get(); diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java index 5336ca518f..baaa624292 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java @@ -98,8 +98,8 @@ public Future requestSession( } @Override - public void start() { - ioReactor.start(); + public void start(final int i) { + ioReactor.start(i); } @Override diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java index 1ed225a341..d416e239c6 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java @@ -73,8 +73,12 @@ public AsyncServer( } @Override - public void start() { - ioReactor.start(); + public void start(final int i) { + throw new RuntimeException("can not call start(i) method with AsyncServer class,maybe call serverStart method?"); + } + + public void serverStart(){ + ioReactor.start(0); } @Override diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractSingleCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractSingleCoreIOReactor.java index 44c0e3896f..9e5f11d9f7 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractSingleCoreIOReactor.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractSingleCoreIOReactor.java @@ -35,6 +35,7 @@ import java.util.Date; import java.util.Queue; import java.util.Set; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicReference; import org.apache.hc.core5.io.ShutdownType; @@ -110,6 +111,8 @@ public void execute() { } } } + }else if (this.status.get().compareTo(IOReactorStatus.ACTIVE) > 0 ){ + throw new RejectedExecutionException("SingleCoreIOReactor terminated"); } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java index 69741c2b97..1a7efbbc20 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java @@ -110,8 +110,8 @@ public DefaultConnectingIOReactor(final IOEventHandlerFactory eventHandlerFactor } @Override - public void start() { - ioReactor.start(); + public void start(final int i) { + ioReactor.start(i); } @Override @@ -138,6 +138,7 @@ public Future connect( } final int i = Math.abs(currentWorker.incrementAndGet() % workerCount); try { + start(i); return dispatchers[i].connect(remoteEndpoint, remoteAddress, localAddress, timeout, attachment, callback); } catch (final IOReactorShutdownException ex) { initiateShutdown(); diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java index eb6e908c76..121c29b5dc 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java @@ -149,8 +149,8 @@ public DefaultListeningIOReactor(final IOEventHandlerFactory eventHandlerFactory } @Override - public void start() { - ioReactor.start(); + public void start(final int i) { + ioReactor.start(i); } @Override @@ -190,6 +190,7 @@ public List getExceptionLog() { private void enqueueChannel(final SocketChannel socketChannel) { final int i = Math.abs(currentWorker.incrementAndGet() % workerCount); try { + ioReactor.start(i + 1); dispatchers[i].enqueueChannel(socketChannel); } catch (final IOReactorShutdownException ex) { initiateShutdown(); diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorService.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorService.java index 5ae125ea28..58966d6ff5 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorService.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorService.java @@ -36,7 +36,7 @@ */ public interface IOReactorService extends IOReactor { - void start(); + void start(int i ); List getExceptionLog(); diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/MultiCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/MultiCoreIOReactor.java index 8811ed2ed3..8fc05ded32 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/MultiCoreIOReactor.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/MultiCoreIOReactor.java @@ -28,6 +28,7 @@ package org.apache.hc.core5.reactor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.hc.core5.io.ShutdownType; @@ -39,12 +40,17 @@ class MultiCoreIOReactor implements IOReactor { private final IOReactor[] ioReactors; private final Thread[] threads; private final AtomicReference status; + private final AtomicBoolean[] threadsStart; MultiCoreIOReactor(final IOReactor[] ioReactors, final Thread[] threads) { super(); this.ioReactors = ioReactors.clone(); this.threads = threads.clone(); this.status = new AtomicReference<>(IOReactorStatus.INACTIVE); + threadsStart = new AtomicBoolean[threads.length]; + for(int i = 0;i < threads.length;++i){ + threadsStart[i] = new AtomicBoolean(false); + } } @Override @@ -59,17 +65,16 @@ public IOReactorStatus getStatus() { * reacting to I/O events and dispatch I/O event notifications to the * {@link IOEventHandler} associated with the given I/O session. */ - public final void start() { - if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.ACTIVE)) { - for (int i = 0; i < this.threads.length; i++) { - this.threads[i].start(); - } - } + public final void start(final int i) { + this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.ACTIVE); + if(threadsStart[i].compareAndSet(false, true)){ + this.threads[i].start(); + } } @Override public final void initiateShutdown() { - if (this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN)) { + if(this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN)){ for (int i = 0; i < this.ioReactors.length; i++) { final IOReactor ioReactor = this.ioReactors[i]; ioReactor.initiateShutdown(); diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java index 979910ad7b..10df978d70 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java @@ -66,6 +66,7 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect private volatile long lastTimeoutCheck; + SingleCoreIOReactor( final Queue auditLog, final IOEventHandlerFactory eventHandlerFactory,