From 5ffdb906b560458ad8df5a892554b6e806d98d9e Mon Sep 17 00:00:00 2001 From: Daniel Kulp Date: Thu, 19 Oct 2023 16:03:36 -0400 Subject: [PATCH] Change from using aonymous inner classes to static classes to avoid holding onto "this" longer than needed. Fixes CXF-8946 --- .../cxf/jaxrs/client/AbstractClient.java | 7 + .../transport/http/HttpClientHTTPConduit.java | 232 ++++++++++++------ 2 files changed, 163 insertions(+), 76 deletions(-) diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java index a8356ce0026..3b839638047 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java @@ -336,6 +336,13 @@ public Client reset() { return this; } + @SuppressWarnings("deprecation") + @Override + protected void finalize() throws Throwable { + close(); + super.finalize(); + } + @Override public void close() { if (closed.compareAndSet(false, true)) { diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java index 86284ba0931..10a65c6e508 100644 --- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java +++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java @@ -353,7 +353,141 @@ public void connectFailed(URI uri, SocketAddress sa, IOException ioe) { } } - class HttpClientWrappedOutputStream extends WrappedOutputStream { + static class HttpClientPipedOutputStream extends PipedOutputStream { + HttpClientWrappedOutputStream stream; + HTTPClientPolicy csPolicy; + HttpClientBodyPublisher publisher; + HttpClientPipedOutputStream(HttpClientWrappedOutputStream s, + PipedInputStream pin, + HTTPClientPolicy cp, + HttpClientBodyPublisher bp) throws IOException { + super(pin); + stream = s; + csPolicy = cp; + publisher = bp; + } + public void close() throws IOException { + super.close(); + csPolicy = null; + stream = null; + if (publisher != null) { + publisher.close(); + publisher = null; + } + } + synchronized boolean canWrite() throws IOException { + return stream.isConnectionAttemptCompleted(csPolicy, this); + } + @Override + public void write(int b) throws IOException { + if (stream != null && (stream.connectionComplete || canWrite())) { + super.write(b); + } + } + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (stream != null && (stream.connectionComplete || canWrite())) { + super.write(b, off, len); + } + } + + }; + private static final class HttpClientFilteredInputStream extends FilterInputStream { + boolean closed; + + private HttpClientFilteredInputStream(InputStream in) { + super(in); + } + @Override + public int read() throws IOException { + if (closed) { + throw new IOException("stream is closed"); + } + return super.read(); + } + + @Override + public int read(byte[] b) throws IOException { + if (closed) { + throw new IOException("stream is closed"); + } + return super.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (closed) { + throw new IOException("stream is closed"); + } + return super.read(b, off, len); + } + + @Override + public void close() throws IOException { + if (!closed) { + closed = true; + super.close(); + in = null; + } + } + } + private static final class InputStreamSupplier implements Supplier { + final InputStream in; + InputStreamSupplier(InputStream i) { + in = i; + } + + public InputStream get() { + return in; + } + } + private static final class HttpClientBodyPublisher implements BodyPublisher { + PipedInputStream pin; + HttpClientWrappedOutputStream stream; + long contentLen; + + private HttpClientBodyPublisher(HttpClientWrappedOutputStream s, PipedInputStream pin) { + this.stream = s; + this.pin = pin; + } + synchronized void close() { + if (stream != null) { + contentLen = stream.contentLen; + stream = null; + } + } + + @Override + public synchronized void subscribe(Subscriber subscriber) { + if (stream != null) { + stream.connectionComplete = true; + contentLen = stream.contentLen; + if (stream.pout != null) { + synchronized (stream.pout) { + stream.pout.notifyAll(); + } + if (stream != null) { + contentLen = stream.contentLen; + } + BodyPublishers.ofInputStream(new InputStreamSupplier(pin)).subscribe(subscriber); + stream = null; + pin = null; + return; + } + } + BodyPublishers.noBody().subscribe(subscriber); + } + + @Override + public long contentLength() { + if (stream != null) { + contentLen = stream.contentLen; + } + return contentLen; + } + } + class HttpClientWrappedOutputStream extends WrappedOutputStream { + List> subscribers = new LinkedList<>(); CompletableFuture> future; long contentLen = -1; @@ -361,6 +495,7 @@ class HttpClientWrappedOutputStream extends WrappedOutputStream { volatile Throwable exception; volatile boolean connectionComplete; PipedOutputStream pout; + HttpClientBodyPublisher publisher; HttpRequest request; @@ -371,7 +506,20 @@ class HttpClientWrappedOutputStream extends WrappedOutputStream { chunkThreshold, conduitName, ((Address)message.get(KEY_HTTP_CONNECTION_ADDRESS)).getURI()); } - + @Override + public void close() throws IOException { + super.close(); + if (pout != null) { + pout.close(); + pout = null; + } + if (publisher != null) { + publisher.close(); + publisher = null; + } + request = null; + subscribers = null; + } void addSubscriber(Flow.Subscriber subscriber) { subscribers.add(subscriber); } @@ -460,6 +608,7 @@ private boolean isConnectionAttemptCompleted(HTTPClientPolicy csPolicy, PipedOut return true; } + @Override protected void setProtocolHeaders() throws IOException { HttpClient cl = outMessage.get(HttpClient.class); @@ -476,52 +625,14 @@ protected void setProtocolHeaders() throws IOException { final PipedInputStream pin = new PipedInputStream(csPolicy.getChunkLength() <= 0 ? 4096 : csPolicy.getChunkLength()); + + this.publisher = new HttpClientBodyPublisher(this, pin); if (contentLen != 0) { - pout = new PipedOutputStream(pin) { - synchronized boolean canWrite() throws IOException { - return isConnectionAttemptCompleted(csPolicy, this); - } - @Override - public void write(int b) throws IOException { - if (connectionComplete || canWrite()) { - super.write(b); - } - } - @Override - public void write(byte[] b, int off, int len) throws IOException { - if (connectionComplete || canWrite()) { - super.write(b, off, len); - } - } - }; + pout = new HttpClientPipedOutputStream(this, pin, csPolicy, publisher); } - - BodyPublisher bp = new BodyPublisher() { - @Override - public void subscribe(Subscriber subscriber) { - connectionComplete = true; - if (pout != null) { - synchronized (pout) { - pout.notifyAll(); - } - BodyPublishers.ofInputStream(new Supplier() { - public InputStream get() { - return pin; - } - }).subscribe(subscriber); - } else { - BodyPublishers.noBody().subscribe(subscriber); - } - } - - @Override - public long contentLength() { - return contentLen; - } - }; HttpRequest.Builder rb = HttpRequest.newBuilder() - .method(httpRequestMethod, bp); + .method(httpRequestMethod, publisher); String verc = (String)outMessage.getContextualProperty(FORCE_HTTP_VERSION); if (verc == null) { verc = csPolicy.getVersion(); @@ -684,38 +795,7 @@ protected InputStream getInputStream() throws IOException { } } } - return new FilterInputStream(resp.body()) { - boolean closed; - @Override - public int read() throws IOException { - if (closed) { - throw new IOException("stream is closed"); - } - return super.read(); - } - - @Override - public int read(byte[] b) throws IOException { - if (closed) { - throw new IOException("stream is closed"); - } - return super.read(b); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - if (closed) { - throw new IOException("stream is closed"); - } - return super.read(b, off, len); - } - - @Override - public void close() throws IOException { - closed = true; - super.close(); - } - }; + return new HttpClientFilteredInputStream(resp.body()); } @Override