Skip to content

Commit

Permalink
Change from using aonymous inner classes to static classes to avoid h…
Browse files Browse the repository at this point in the history
…olding onto "this" longer than needed.

Fixes CXF-8946
  • Loading branch information
dkulp committed Oct 19, 2023
1 parent f35fd78 commit 5ffdb90
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,13 @@ public Client reset() {
return this;
}

@SuppressWarnings("deprecation")
@Override
protected void finalize() throws Throwable {
close();
super.finalize();
}

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,14 +353,149 @@ public void connectFailed(URI uri, SocketAddress sa, IOException ioe) {
}
}

class HttpClientWrappedOutputStream extends WrappedOutputStream {
static class HttpClientPipedOutputStream extends PipedOutputStream {
HttpClientWrappedOutputStream stream;
HTTPClientPolicy csPolicy;
HttpClientBodyPublisher publisher;
HttpClientPipedOutputStream(HttpClientWrappedOutputStream s,
PipedInputStream pin,
HTTPClientPolicy cp,
HttpClientBodyPublisher bp) throws IOException {
super(pin);
stream = s;
csPolicy = cp;
publisher = bp;
}
public void close() throws IOException {
super.close();
csPolicy = null;
stream = null;
if (publisher != null) {
publisher.close();
publisher = null;
}
}
synchronized boolean canWrite() throws IOException {
return stream.isConnectionAttemptCompleted(csPolicy, this);
}
@Override
public void write(int b) throws IOException {
if (stream != null && (stream.connectionComplete || canWrite())) {
super.write(b);
}
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
if (stream != null && (stream.connectionComplete || canWrite())) {
super.write(b, off, len);
}
}

};
private static final class HttpClientFilteredInputStream extends FilterInputStream {
boolean closed;

private HttpClientFilteredInputStream(InputStream in) {
super(in);
}
@Override
public int read() throws IOException {
if (closed) {
throw new IOException("stream is closed");
}
return super.read();
}

@Override
public int read(byte[] b) throws IOException {
if (closed) {
throw new IOException("stream is closed");
}
return super.read(b);
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (closed) {
throw new IOException("stream is closed");
}
return super.read(b, off, len);
}

@Override
public void close() throws IOException {
if (!closed) {
closed = true;
super.close();
in = null;
}
}
}
private static final class InputStreamSupplier implements Supplier<InputStream> {
final InputStream in;
InputStreamSupplier(InputStream i) {
in = i;
}

public InputStream get() {
return in;
}
}
private static final class HttpClientBodyPublisher implements BodyPublisher {
PipedInputStream pin;
HttpClientWrappedOutputStream stream;
long contentLen;

private HttpClientBodyPublisher(HttpClientWrappedOutputStream s, PipedInputStream pin) {
this.stream = s;
this.pin = pin;
}
synchronized void close() {
if (stream != null) {
contentLen = stream.contentLen;
stream = null;
}
}

@Override
public synchronized void subscribe(Subscriber<? super ByteBuffer> subscriber) {
if (stream != null) {
stream.connectionComplete = true;
contentLen = stream.contentLen;
if (stream.pout != null) {
synchronized (stream.pout) {
stream.pout.notifyAll();
}
if (stream != null) {
contentLen = stream.contentLen;
}
BodyPublishers.ofInputStream(new InputStreamSupplier(pin)).subscribe(subscriber);
stream = null;
pin = null;
return;
}
}
BodyPublishers.noBody().subscribe(subscriber);
}

@Override
public long contentLength() {
if (stream != null) {
contentLen = stream.contentLen;
}
return contentLen;
}
}
class HttpClientWrappedOutputStream extends WrappedOutputStream {

List<Flow.Subscriber<? super ByteBuffer>> subscribers = new LinkedList<>();
CompletableFuture<HttpResponse<InputStream>> future;
long contentLen = -1;
int rtimeout;
volatile Throwable exception;
volatile boolean connectionComplete;
PipedOutputStream pout;
HttpClientBodyPublisher publisher;
HttpRequest request;


Expand All @@ -371,7 +506,20 @@ class HttpClientWrappedOutputStream extends WrappedOutputStream {
chunkThreshold, conduitName, ((Address)message.get(KEY_HTTP_CONNECTION_ADDRESS)).getURI());
}


@Override
public void close() throws IOException {
super.close();
if (pout != null) {
pout.close();
pout = null;
}
if (publisher != null) {
publisher.close();
publisher = null;
}
request = null;
subscribers = null;
}
void addSubscriber(Flow.Subscriber<? super ByteBuffer> subscriber) {
subscribers.add(subscriber);
}
Expand Down Expand Up @@ -460,6 +608,7 @@ private boolean isConnectionAttemptCompleted(HTTPClientPolicy csPolicy, PipedOut
return true;
}


@Override
protected void setProtocolHeaders() throws IOException {
HttpClient cl = outMessage.get(HttpClient.class);
Expand All @@ -476,52 +625,14 @@ protected void setProtocolHeaders() throws IOException {

final PipedInputStream pin = new PipedInputStream(csPolicy.getChunkLength() <= 0
? 4096 : csPolicy.getChunkLength());

this.publisher = new HttpClientBodyPublisher(this, pin);
if (contentLen != 0) {
pout = new PipedOutputStream(pin) {
synchronized boolean canWrite() throws IOException {
return isConnectionAttemptCompleted(csPolicy, this);
}
@Override
public void write(int b) throws IOException {
if (connectionComplete || canWrite()) {
super.write(b);
}
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
if (connectionComplete || canWrite()) {
super.write(b, off, len);
}
}
};
pout = new HttpClientPipedOutputStream(this, pin, csPolicy, publisher);
}

BodyPublisher bp = new BodyPublisher() {
@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
connectionComplete = true;
if (pout != null) {
synchronized (pout) {
pout.notifyAll();
}
BodyPublishers.ofInputStream(new Supplier<InputStream>() {
public InputStream get() {
return pin;
}
}).subscribe(subscriber);
} else {
BodyPublishers.noBody().subscribe(subscriber);
}
}

@Override
public long contentLength() {
return contentLen;
}
};

HttpRequest.Builder rb = HttpRequest.newBuilder()
.method(httpRequestMethod, bp);
.method(httpRequestMethod, publisher);
String verc = (String)outMessage.getContextualProperty(FORCE_HTTP_VERSION);
if (verc == null) {
verc = csPolicy.getVersion();
Expand Down Expand Up @@ -684,38 +795,7 @@ protected InputStream getInputStream() throws IOException {
}
}
}
return new FilterInputStream(resp.body()) {
boolean closed;
@Override
public int read() throws IOException {
if (closed) {
throw new IOException("stream is closed");
}
return super.read();
}

@Override
public int read(byte[] b) throws IOException {
if (closed) {
throw new IOException("stream is closed");
}
return super.read(b);
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (closed) {
throw new IOException("stream is closed");
}
return super.read(b, off, len);
}

@Override
public void close() throws IOException {
closed = true;
super.close();
}
};
return new HttpClientFilteredInputStream(resp.body());
}

@Override
Expand Down

0 comments on commit 5ffdb90

Please sign in to comment.