Skip to content

Commit

Permalink
pool: http-mover refactor file release on channel close code
Browse files Browse the repository at this point in the history
Motivation:

There are a few ways in which a channel can close while transfers are
underway.  There is code in each case that handles closing all the files
currently being transferred.  This code is largely copy-n-pasted, with
some minor changes, which makes the code non-DRY.

This patch is in preparation for a subsequent patch that distinguishes
between a failed upload because the the client disconnected and a failed
upload because the connection was idle for too long.

Modification:

Handle file releasing in a single place.

Result:

No user- or admin observable changes.

Target: master
Requires-notes: no
Requires-book: no
Patch: https://rb.dcache.org/r/12471/
Acked-by: Tigran Mkrtchyan
  • Loading branch information
paulmillar committed Jul 16, 2020
1 parent 6716389 commit cda8dc2
Showing 1 changed file with 47 additions and 15 deletions.
Expand Up @@ -64,6 +64,7 @@
import static io.netty.handler.codec.http.HttpHeaders.is100ContinueExpected;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static java.util.Objects.requireNonNull;
import static org.dcache.util.Checksums.TO_RFC3230;
import static org.dcache.util.StringMarkup.percentEncode;
import static org.dcache.util.StringMarkup.quotedString;
Expand Down Expand Up @@ -113,6 +114,28 @@ public class HttpPoolRequestHandler extends HttpRequestHandler

private Optional<ChecksumType> _wantedDigest;

/**
* A simple data class to encapsulate the errors to return by the mover to
* the pool for file uploads and downloads, should transfers be aborted.
*/
private static class FileReleaseErrors
{
private Optional<? extends Exception> errorForDownload = Optional.empty();
private Optional<? extends Exception> errorForUpload = Optional.empty();

public FileReleaseErrors downloadsSeeError(Exception e)
{
errorForDownload = Optional.of(requireNonNull(e));
return this;
}

public FileReleaseErrors uploadsSeeError(Exception e)
{
errorForUpload = Optional.of(requireNonNull(e));
return this;
}
}

public HttpPoolRequestHandler(NettyTransferService<HttpProtocolInfo> server, int chunkSize)
{
_server = server;
Expand Down Expand Up @@ -238,35 +261,44 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception
_logger.debug("HTTP connection from {} established", ctx.channel().remoteAddress());
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
private static FileReleaseErrors uploadsSeeError(Exception e)
{
return new FileReleaseErrors().uploadsSeeError(e);
}

private static FileReleaseErrors downloadsSeeError(Exception e)
{
return new FileReleaseErrors().downloadsSeeError(e);
}

private void releaseAllFiles(FileReleaseErrors errors)
{
_logger.debug("HTTP connection from {} closed", ctx.channel().remoteAddress());
for (NettyTransferService<HttpProtocolInfo>.NettyMoverChannel file: _files) {
if (file == _writeChannel) {
file.release(new FileCorruptedCacheException("Connection lost before end of file."));
Optional<? extends Exception> possibleError = file == _writeChannel
? errors.errorForUpload : errors.errorForDownload;
if (possibleError.isPresent()) {
file.release(possibleError.get());
} else {
file.release();
}
}
_files.clear();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
{
_logger.debug("HTTP connection from {} closed", ctx.channel().remoteAddress());
releaseAllFiles(uploadsSeeError(new FileCorruptedCacheException("Connection lost before end of file.")));
}

public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
{
if (t instanceof ClosedChannelException) {
_logger.info("Connection {} unexpectedly closed.", ctx.channel());
} else if (t instanceof Exception) {
for (NettyTransferService<HttpProtocolInfo>.NettyMoverChannel file : _files) {
CacheException cause;
if (file == _writeChannel) {
cause = new FileCorruptedCacheException("Connection lost before end of file: " + t, t);
} else {
cause = new CacheException(t.toString(), t);
}
file.release(cause);
}
_files.clear();
releaseAllFiles(downloadsSeeError(new CacheException(t.toString(), t))
.uploadsSeeError(new FileCorruptedCacheException("Connection lost before end of file: " + t, t)));
ctx.close();
} else {
Thread me = Thread.currentThread();
Expand Down

0 comments on commit cda8dc2

Please sign in to comment.