Skip to content

Commit

Permalink
Fix Netty leaks (#1397)
Browse files Browse the repository at this point in the history
  • Loading branch information
bsideup committed May 24, 2020
1 parent 37ddf09 commit b8d5e37
Showing 1 changed file with 46 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,20 @@ public void delete() {

HttpRequestProvider requestProvider = httpDeleteRequestProvider();

ResponseCallback<Void> callback = new ResponseCallback<>();
try (ResponseCallback<Void> callback = new ResponseCallback<>()) {

HttpResponseHandler responseHandler = new HttpResponseHandler(requestProvider, callback);
HttpResponseHandler responseHandler = new HttpResponseHandler(requestProvider, callback);

Channel channel = getChannel();
Channel channel = getChannel();

channel.pipeline().addLast(responseHandler);
channel.pipeline().addLast(responseHandler);

sendRequest(requestProvider, channel);
sendRequest(requestProvider, channel);

callback.awaitResult();
callback.awaitResult();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public void get(ResultCallback<Frame> resultCallback) {
Expand All @@ -141,12 +144,13 @@ public void get(ResultCallback<Frame> resultCallback) {
}

public <T> T get(TypeReference<T> typeReference) {
try (ResponseCallback<T> callback = new ResponseCallback<>()) {
get(typeReference, callback);

ResponseCallback<T> callback = new ResponseCallback<>();

get(typeReference, callback);

return callback.awaitResult();
return callback.awaitResult();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public <T> void get(TypeReference<T> typeReference, ResultCallback<T> resultCallback) {
Expand Down Expand Up @@ -267,12 +271,13 @@ public void run() {
}

public <T> T post(final Object entity, TypeReference<T> typeReference) {
try (ResponseCallback<T> callback = new ResponseCallback<>()) {
post(entity, typeReference, callback);

ResponseCallback<T> callback = new ResponseCallback<>();

post(entity, typeReference, callback);

return callback.awaitResult();
return callback.awaitResult();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public <T> void post(final Object entity, TypeReference<T> typeReference, final ResultCallback<T> resultCallback) {
Expand Down Expand Up @@ -372,12 +377,13 @@ private void setDefaultHeaders(HttpRequest request) {
}

public <T> T post(TypeReference<T> typeReference, InputStream body) {
try (ResponseCallback<T> callback = new ResponseCallback<>()) {
post(typeReference, callback, body);

ResponseCallback<T> callback = new ResponseCallback<>();

post(typeReference, callback, body);

return callback.awaitResult();
return callback.awaitResult();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public <T> void post(TypeReference<T> typeReference, ResultCallback<T> resultCallback, InputStream body) {
Expand Down Expand Up @@ -464,28 +470,30 @@ public void put(InputStream body, com.github.dockerjava.core.MediaType mediaType

Channel channel = getChannel();

ResponseCallback<Void> resultCallback = new ResponseCallback<>();
try (ResponseCallback<Void> resultCallback = new ResponseCallback<>()) {
HttpResponseHandler responseHandler = new HttpResponseHandler(requestProvider, resultCallback);

HttpResponseHandler responseHandler = new HttpResponseHandler(requestProvider, resultCallback);
channel.pipeline().addLast(new ChunkedWriteHandler());
channel.pipeline().addLast(responseHandler);

channel.pipeline().addLast(new ChunkedWriteHandler());
channel.pipeline().addLast(responseHandler);

HttpRequest request = requestProvider.getHttpRequest(resource);
HttpRequest request = requestProvider.getHttpRequest(resource);

// don't accept FullHttpRequest here
if (request instanceof FullHttpRequest) {
throw new DockerClientException("fatal: request is instance of FullHttpRequest");
}
// don't accept FullHttpRequest here
if (request instanceof FullHttpRequest) {
throw new DockerClientException("fatal: request is instance of FullHttpRequest");
}

request.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
request.headers().remove(HttpHeaderNames.CONTENT_LENGTH);
request.headers().set(HttpHeaderNames.CONTENT_TYPE, mediaType.getMediaType());
request.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
request.headers().remove(HttpHeaderNames.CONTENT_LENGTH);
request.headers().set(HttpHeaderNames.CONTENT_TYPE, mediaType.getMediaType());

channel.write(request);
channel.write(new ChunkedStream(new BufferedInputStream(body, 1024 * 1024)));
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
channel.write(request);
channel.write(new ChunkedStream(new BufferedInputStream(body, 1024 * 1024)));
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);

resultCallback.awaitResult();
resultCallback.awaitResult();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

0 comments on commit b8d5e37

Please sign in to comment.