Skip to content

Commit

Permalink
461499 - ConnectionPool may leak connections.
Browse files Browse the repository at this point in the history
Made associate(), disassociate() and abort() atomic operations using
the HttpExchange state to coordinate atomicity.
In this way, it's not possible to associate a HttpChannel and a
HttpExchange if the latter has been aborted.
  • Loading branch information
sbordet committed Mar 24, 2015
1 parent 3b31d6a commit 1dc66b7
Show file tree
Hide file tree
Showing 12 changed files with 552 additions and 337 deletions.
106 changes: 79 additions & 27 deletions jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java
Expand Up @@ -18,74 +18,126 @@

package org.eclipse.jetty.client;

import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.SpinLock;

public abstract class HttpChannel
{
protected static final Logger LOG = Log.getLogger(HttpChannel.class);

private final AtomicReference<HttpExchange> exchange = new AtomicReference<>();
private final HttpDestination destination;
private final SpinLock _lock = new SpinLock();
private final HttpDestination _destination;
private HttpExchange _exchange;

protected HttpChannel(HttpDestination destination)
{
this.destination = destination;
this._destination = destination;
}

public HttpDestination getHttpDestination()
{
return destination;
return _destination;
}

public void associate(HttpExchange exchange)
/**
* <p>Associates the given {@code exchange} to this channel in order to be sent over the network.</p>
* <p>If the association is successful, the exchange can be sent. Otherwise, the channel must be
* disposed because whoever terminated the exchange did not do it - it did not have the channel yet.</p>
*
* @param exchange the exchange to associate
* @return true if the association was successful, false otherwise
*/
public boolean associate(HttpExchange exchange)
{
if (this.exchange.compareAndSet(null, exchange))
boolean result = false;
boolean abort = true;
try (SpinLock.Lock lock = _lock.lock())
{
exchange.associate(this);
if (LOG.isDebugEnabled())
LOG.debug("{} associated to {}", exchange, this);
if (_exchange == null)
{
abort = false;
result = exchange.associate(this);
if (result)
_exchange = exchange;
}
}
else
{

if (abort)
exchange.getRequest().abort(new UnsupportedOperationException("Pipelined requests not supported"));
}

if (LOG.isDebugEnabled())
LOG.debug("{} associated {} to {}", exchange, result, this);

return result;
}

public HttpExchange disassociate()
public boolean disassociate(HttpExchange exchange)
{
HttpExchange exchange = this.exchange.getAndSet(null);
if (exchange != null)
exchange.disassociate(this);
boolean result = false;
try (SpinLock.Lock lock = _lock.lock())
{
HttpExchange existing = _exchange;
_exchange = null;
if (existing == exchange)
{
existing.disassociate(this);
result = true;
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} disassociated from {}", exchange, this);
return exchange;
LOG.debug("{} disassociated {} from {}", exchange, result, this);
return result;
}

public HttpExchange getHttpExchange()
{
return exchange.get();
try (SpinLock.Lock lock = _lock.lock())
{
return _exchange;
}
}

protected abstract HttpSender getHttpSender();

protected abstract HttpReceiver getHttpReceiver();

public abstract void send();

public abstract void proceed(HttpExchange exchange, Throwable failure);
public abstract void release();

public void proceed(HttpExchange exchange, Throwable failure)
{
getHttpSender().proceed(exchange, failure);
}

public abstract boolean abort(Throwable cause);
public boolean abort(HttpExchange exchange, Throwable requestFailure, Throwable responseFailure)
{
boolean requestAborted = false;
if (requestFailure != null)
requestAborted = getHttpSender().abort(exchange, requestFailure);

public abstract boolean abortResponse(Throwable cause);
boolean responseAborted = false;
if (responseFailure != null)
responseAborted = abortResponse(exchange, responseFailure);

return requestAborted || responseAborted;
}

public boolean abortResponse(HttpExchange exchange, Throwable failure)
{
return getHttpReceiver().abort(exchange, failure);
}

public void exchangeTerminated(Result result)
public void exchangeTerminated(HttpExchange exchange, Result result)
{
disassociate();
disassociate(exchange);
}

@Override
public String toString()
{
return String.format("%s@%x(exchange=%s)", getClass().getSimpleName(), hashCode(), exchange);
return String.format("%s@%x(exchange=%s)", getClass().getSimpleName(), hashCode(), getHttpExchange());
}
}

0 comments on commit 1dc66b7

Please sign in to comment.