Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
public class Http2MultiplexingRequester extends AsyncRequester{

private final H2ConnPool connPool;
private final Timeout handshakeTimeout;

/**
* Use {@link Http2MultiplexingRequesterBootstrap} to create instances of this class.
Expand All @@ -95,12 +94,10 @@ public Http2MultiplexingRequester(
final Decorator<IOSession> ioSessionDecorator,
final IOSessionListener sessionListener,
final Resolver<HttpHost, InetSocketAddress> addressResolver,
final TlsStrategy tlsStrategy,
final Timeout handshakeTimeout) {
final TlsStrategy tlsStrategy) {
super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, sessionListener,
ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE);
this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy);
this.handshakeTimeout = handshakeTimeout;
}

public void closeIdle(final TimeValue idleTime) {
Expand Down Expand Up @@ -161,7 +158,7 @@ public void sendRequest(
throw new ProtocolException("Request authority not specified");
}
final HttpHost target = new HttpHost(scheme, authority);
connPool.getSession(target, timeout, handshakeTimeout, new FutureCallback<IOSession>() {
connPool.getSession(target, timeout, new FutureCallback<IOSession>() {

@Override
public void completed(final IOSession ioSession) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.hc.core5.reactor.IOSessionListener;
import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Timeout;

/**
* {@link Http2MultiplexingRequester} bootstrap.
Expand All @@ -68,7 +67,6 @@ public class Http2MultiplexingRequesterBootstrap {
private CharCodingConfig charCodingConfig;
private H2Config h2Config;
private TlsStrategy tlsStrategy;
private Timeout handshakeTimeout;
private boolean strictALPNHandshake;
private Decorator<IOSession> ioSessionDecorator;
private IOSessionListener sessionListener;
Expand Down Expand Up @@ -122,11 +120,6 @@ public final Http2MultiplexingRequesterBootstrap setTlsStrategy(final TlsStrateg
return this;
}

public final Http2MultiplexingRequesterBootstrap setHandshakeTimeout(final Timeout handshakeTimeout) {
this.handshakeTimeout = handshakeTimeout;
return this;
}

public final Http2MultiplexingRequesterBootstrap setStrictALPNHandshake(final boolean strictALPNHandshake) {
this.strictALPNHandshake = strictALPNHandshake;
return this;
Expand Down Expand Up @@ -218,8 +211,7 @@ public IOEventHandler createHandler(final ProtocolIOSession ioSession, final Obj
ioSessionDecorator,
sessionListener,
DefaultAddressResolver.INSTANCE,
tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(),
handshakeTimeout);
tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,10 @@ protected void closeSession(
@Override
protected Future<IOSession> connectSession(
final HttpHost namedEndpoint,
final Timeout requestTimeout,
final Timeout handshakeTimeout,
final Timeout connectTimeout,
final FutureCallback<IOSession> callback) {
final InetSocketAddress remoteAddress = addressResolver.resolve(namedEndpoint);
return connectionInitiator.connect(namedEndpoint, remoteAddress, null, requestTimeout, null, new FutureCallback<IOSession>() {
return connectionInitiator.connect(namedEndpoint, remoteAddress, null, connectTimeout, null, new FutureCallback<IOSession>() {

@Override
public void completed(final IOSession ioSession) {
Expand All @@ -114,8 +113,8 @@ public void completed(final IOSession ioSession) {
ioSession.getLocalAddress(),
ioSession.getRemoteAddress(),
null,
handshakeTimeout);
ioSession.setSocketTimeout(requestTimeout);
connectTimeout);
ioSession.setSocketTimeout(connectTimeout);
}
callback.completed(ioSession);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ public AbstractIOSessionPool() {

protected abstract Future<IOSession> connectSession(
T namedEndpoint,
Timeout requestTimeout,
final Timeout handshakeTimeout,
Timeout connectTimeout,
FutureCallback<IOSession> callback);

protected abstract void validateSession(
Expand Down Expand Up @@ -123,15 +122,13 @@ PoolEntry getPoolEntry(final T endpoint) {

public final Future<IOSession> getSession(
final T endpoint,
final Timeout requestTimeout,
final Timeout handshakeTimeout,
final Timeout connectTimeout,
final FutureCallback<IOSession> callback) {
Args.notNull(endpoint, "Endpoint");
Asserts.check(!closed.get(), "Connection pool shut down");
final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
final PoolEntry poolEntry = getPoolEntry(endpoint);
getSessionInternal(poolEntry, false, endpoint, requestTimeout, handshakeTimeout,
new FutureCallback<IOSession>() {
getSessionInternal(poolEntry, false, endpoint, connectTimeout, new FutureCallback<IOSession>() {

@Override
public void completed(final IOSession ioSession) {
Expand All @@ -142,7 +139,7 @@ public void execute(final Boolean result) {
if (result) {
future.completed(ioSession);
} else {
getSessionInternal(poolEntry, true, endpoint, requestTimeout, handshakeTimeout,
getSessionInternal(poolEntry, true, endpoint, connectTimeout,
new FutureCallback<IOSession>() {

@Override
Expand Down Expand Up @@ -185,8 +182,7 @@ private void getSessionInternal(
final PoolEntry poolEntry,
final boolean requestNew,
final T namedEndpoint,
final Timeout requestTimeout,
final Timeout handshakeTimeout,
final Timeout connectTimeout,
final FutureCallback<IOSession> callback) {
synchronized (poolEntry) {
if (poolEntry.session != null && requestNew) {
Expand All @@ -203,8 +199,7 @@ private void getSessionInternal(
if (poolEntry.sessionFuture == null) {
poolEntry.sessionFuture = connectSession(
namedEndpoint,
requestTimeout,
handshakeTimeout,
connectTimeout,
new FutureCallback<IOSession>() {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ public void testGetSessions() throws Exception {
Mockito.when(impl.connectSession(
ArgumentMatchers.anyString(),
ArgumentMatchers.<Timeout>any(),
ArgumentMatchers.<Timeout>any(),
ArgumentMatchers.<FutureCallback<IOSession>>any())).thenReturn(connectFuture);

Mockito.doAnswer(new Answer() {
Expand All @@ -90,26 +89,24 @@ public Object answer(final InvocationOnMock invocation) throws Throwable {

}).when(impl).validateSession(ArgumentMatchers.<IOSession>any(), ArgumentMatchers.<Callback<Boolean>>any());

final Future<IOSession> future1 = impl.getSession("somehost", Timeout.ofSeconds(123L), null, null);
final Future<IOSession> future1 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
Assert.assertThat(future1, CoreMatchers.notNullValue());
Assert.assertThat(future1.isDone(), CoreMatchers.equalTo(false));
Assert.assertThat(impl.getRoutes(), CoreMatchers.hasItem("somehost"));

Mockito.verify(impl).connectSession(
ArgumentMatchers.eq("somehost"),
ArgumentMatchers.eq(Timeout.ofSeconds(123L)),
ArgumentMatchers.<Timeout>any(),
ArgumentMatchers.<FutureCallback<IOSession>>any());

final Future<IOSession> future2 = impl.getSession("somehost", Timeout.ofSeconds(123L), null, null);
final Future<IOSession> future2 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
Assert.assertThat(future2, CoreMatchers.notNullValue());
Assert.assertThat(future2.isDone(), CoreMatchers.equalTo(false));
Assert.assertThat(impl.getRoutes(), CoreMatchers.hasItem("somehost"));

Mockito.verify(impl, Mockito.times(1)).connectSession(
ArgumentMatchers.eq("somehost"),
ArgumentMatchers.<Timeout>any(),
ArgumentMatchers.<Timeout>any(),
ArgumentMatchers.argThat(new ArgumentMatcher<FutureCallback<IOSession>>() {

@Override
Expand All @@ -128,12 +125,11 @@ public boolean matches(final FutureCallback<IOSession> callback) {

Mockito.verify(impl, Mockito.times(2)).validateSession(ArgumentMatchers.<IOSession>any(), ArgumentMatchers.<Callback<Boolean>>any());

final Future<IOSession> future3 = impl.getSession("somehost", Timeout.ofSeconds(123L), null, null);
final Future<IOSession> future3 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);

Mockito.verify(impl, Mockito.times(1)).connectSession(
ArgumentMatchers.eq("somehost"),
ArgumentMatchers.<Timeout>any(),
ArgumentMatchers.<Timeout>any(),
ArgumentMatchers.<FutureCallback<IOSession>>any());

Mockito.verify(impl, Mockito.times(3)).validateSession(ArgumentMatchers.<IOSession>any(), ArgumentMatchers.<Callback<Boolean>>any());
Expand All @@ -148,29 +144,26 @@ public void testGetSessionFailure() throws Exception {
Mockito.when(impl.connectSession(
ArgumentMatchers.anyString(),
ArgumentMatchers.<Timeout>any(),
ArgumentMatchers.<Timeout>any(),
ArgumentMatchers.<FutureCallback<IOSession>>any())).thenReturn(connectFuture);

final Future<IOSession> future1 = impl.getSession("somehost", Timeout.ofSeconds(123L), null, null);
final Future<IOSession> future1 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
Assert.assertThat(future1, CoreMatchers.notNullValue());
Assert.assertThat(future1.isDone(), CoreMatchers.equalTo(false));
Assert.assertThat(impl.getRoutes(), CoreMatchers.hasItem("somehost"));

Mockito.verify(impl).connectSession(
ArgumentMatchers.eq("somehost"),
ArgumentMatchers.eq(Timeout.ofSeconds(123L)),
ArgumentMatchers.<Timeout>any(),
ArgumentMatchers.<FutureCallback<IOSession>>any());

final Future<IOSession> future2 = impl.getSession("somehost", Timeout.ofSeconds(123L), null, null);
final Future<IOSession> future2 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
Assert.assertThat(future2, CoreMatchers.notNullValue());
Assert.assertThat(future2.isDone(), CoreMatchers.equalTo(false));
Assert.assertThat(impl.getRoutes(), CoreMatchers.hasItem("somehost"));

Mockito.verify(impl, Mockito.times(1)).connectSession(
ArgumentMatchers.eq("somehost"),
ArgumentMatchers.<Timeout>any(),
ArgumentMatchers.<Timeout>any(),
ArgumentMatchers.argThat(new ArgumentMatcher<FutureCallback<IOSession>>() {

@Override
Expand Down Expand Up @@ -269,12 +262,11 @@ public Object answer(final InvocationOnMock invocation) throws Throwable {

}).when(impl).validateSession(ArgumentMatchers.<IOSession>any(), ArgumentMatchers.<Callback<Boolean>>any());

impl.getSession("somehost", Timeout.ofSeconds(123L), null, null);
impl.getSession("somehost", Timeout.ofSeconds(123L), null);

Mockito.verify(impl, Mockito.times(1)).connectSession(
ArgumentMatchers.eq("somehost"),
ArgumentMatchers.eq(Timeout.ofSeconds(123L)),
ArgumentMatchers.<Timeout>any(),
ArgumentMatchers.<FutureCallback<IOSession>>any());
}

Expand All @@ -286,12 +278,11 @@ public void testGetSessionReconnectIfClosed() throws Exception {

Mockito.when(ioSession1.isClosed()).thenReturn(true);

impl.getSession("somehost", Timeout.ofSeconds(123L), null, null);
impl.getSession("somehost", Timeout.ofSeconds(123L), null);

Mockito.verify(impl).connectSession(
ArgumentMatchers.eq("somehost"),
ArgumentMatchers.eq(Timeout.ofSeconds(123L)),
ArgumentMatchers.<Timeout>any(),
ArgumentMatchers.<FutureCallback<IOSession>>any());
}

Expand Down