Skip to content

Commit

Permalink
Add ability to configure the queue capacity for ChunkedOutput (#5621)
Browse files Browse the repository at this point in the history
  • Loading branch information
rutterpaul-personal committed Apr 25, 2024
1 parent 75a9755 commit be13798
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 15 deletions.
175 changes: 163 additions & 12 deletions core-server/src/main/java/org/glassfish/jersey/server/ChunkedOutput.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2023 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -27,12 +27,11 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.inject.Provider;
import javax.ws.rs.container.ConnectionCallback;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.ext.WriterInterceptor;

import javax.inject.Provider;

import org.glassfish.jersey.process.internal.RequestContext;
import org.glassfish.jersey.process.internal.RequestScope;
import org.glassfish.jersey.server.internal.LocalizationMessages;
Expand All @@ -51,7 +50,7 @@
public class ChunkedOutput<T> extends GenericType<T> implements Closeable {
private static final byte[] ZERO_LENGTH_DELIMITER = new byte[0];

private final BlockingDeque<T> queue = new LinkedBlockingDeque<>();
private final BlockingDeque<T> queue;
private final byte[] chunkDelimiter;
private final AtomicBoolean resumed = new AtomicBoolean(false);
private final Object lock = new Object();
Expand All @@ -70,12 +69,59 @@ public class ChunkedOutput<T> extends GenericType<T> implements Closeable {
private volatile ContainerResponse responseContext;
private volatile ConnectionCallback connectionCallback;


/**
* Create new {@code ChunkedOutput}.
*/
protected ChunkedOutput() {
this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
queue = new LinkedBlockingDeque<>();
}

/**
* Create new {@code ChunkedOutput} based on builder.
*
* @param builder the builder to use
*/
protected ChunkedOutput(Builder<T> builder) {
super();
if (builder.queueCapacity > 0) {
queue = new LinkedBlockingDeque<>(builder.queueCapacity);
} else {
queue = new LinkedBlockingDeque<>();
}
if (builder.chunkDelimiter != null) {
this.chunkDelimiter = new byte[builder.chunkDelimiter.length];
System.arraycopy(builder.chunkDelimiter, 0, this.chunkDelimiter, 0, builder.chunkDelimiter.length);
} else {
this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
}
if (builder.asyncContextProvider != null) {
this.asyncContext = builder.asyncContextProvider.get();
}
}

/**
* Create new {@code ChunkedOutput} based on builder.
*
* @param builder the builder to use
*/
private ChunkedOutput(TypedBuilder<T> builder) {
super(builder.chunkType);

if (builder.queueCapacity > 0) {
queue = new LinkedBlockingDeque<>(builder.queueCapacity);
} else {
queue = new LinkedBlockingDeque<>();
}
if (builder.chunkDelimiter != null) {
this.chunkDelimiter = new byte[builder.chunkDelimiter.length];
System.arraycopy(builder.chunkDelimiter, 0, this.chunkDelimiter, 0, builder.chunkDelimiter.length);
} else {
this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
}
if (builder.asyncContextProvider != null) {
this.asyncContext = builder.asyncContextProvider.get();
}
}

/**
Expand All @@ -86,6 +132,7 @@ protected ChunkedOutput() {
public ChunkedOutput(final Type chunkType) {
super(chunkType);
this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
queue = new LinkedBlockingDeque<>();
}

/**
Expand All @@ -101,6 +148,7 @@ protected ChunkedOutput(final byte[] chunkDelimiter) {
} else {
this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
}
queue = new LinkedBlockingDeque<>();
}

/**
Expand All @@ -118,6 +166,7 @@ protected ChunkedOutput(final byte[] chunkDelimiter, Provider<AsyncContext> asyn
}

this.asyncContext = asyncContextProvider == null ? null : asyncContextProvider.get();
queue = new LinkedBlockingDeque<>();
}

/**
Expand All @@ -135,6 +184,7 @@ public ChunkedOutput(final Type chunkType, final byte[] chunkDelimiter) {
} else {
this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
}
queue = new LinkedBlockingDeque<>();
}

/**
Expand All @@ -149,6 +199,7 @@ protected ChunkedOutput(final String chunkDelimiter) {
} else {
this.chunkDelimiter = chunkDelimiter.getBytes();
}
queue = new LinkedBlockingDeque<>();
}

/**
Expand All @@ -165,6 +216,26 @@ public ChunkedOutput(final Type chunkType, final String chunkDelimiter) {
} else {
this.chunkDelimiter = chunkDelimiter.getBytes();
}
queue = new LinkedBlockingDeque<>();
}

/**
* Returns a builder to create a ChunkedOutput with custom configuration.
*
* @return builder
*/
public static <T> Builder<T> builder() {
return new Builder<>();
}

/**
* Returns a builder to create a ChunkedOutput with custom configuration.
*
* @param chunkType chunk type. Must not be {code null}.
* @return builder
*/
public static <T> TypedBuilder<T> builder(Type chunkType) {
return new TypedBuilder<>(chunkType);
}

/**
Expand All @@ -179,7 +250,12 @@ public void write(final T chunk) throws IOException {
}

if (chunk != null) {
queue.add(chunk);
try {
queue.put(chunk);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}

flushQueue();
Expand Down Expand Up @@ -265,9 +341,9 @@ public Void call() throws IOException {
}
throw mpe;
} finally {
synchronized (lock) {
touchingEntityStream = false;
}
synchronized (lock) {
touchingEntityStream = false;
}
}

t = queue.poll();
Expand Down Expand Up @@ -341,7 +417,6 @@ public void close() throws IOException {

/**
* Get state information.
*
* Please note that {@code ChunkedOutput} can be closed by the client side - client can close connection
* from its side.
*
Expand All @@ -353,10 +428,12 @@ public boolean isClosed() {

/**
* Executed only in case of close being triggered by client.
*
* @param e Exception causing the close
*/
protected void onClose(Exception e){

protected void onClose(Exception e) {
// drain queue when an exception occurs to prevent deadlocks
queue.clear();
}

@SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
Expand Down Expand Up @@ -399,4 +476,78 @@ void setContext(final RequestScope requestScope,
this.connectionCallback = connectionCallbackRunner;
flushQueue();
}

/**
* Builder that allows to create a new ChunkedOutput based on the given configuration options.
*
* @param <Y>
*/
public static class Builder<Y> {
byte[] chunkDelimiter;
int queueCapacity = -1;
Provider<AsyncContext> asyncContextProvider;

private Builder() {
// hide constructor
}

/**
* Set the chunk delimiter, in bytes.
* @param chunkDelimiter the chunk delimiter in bytes
* @return builder
*/
public Builder<Y> chunkDelimiter(byte[] chunkDelimiter) {
this.chunkDelimiter = chunkDelimiter;
return this;
}

/**
* Set the queue capacity. If greater than 0, the queue is bounded and will block when full.
* @param queueCapacity the queue capacity
* @return builder
*/
public Builder<Y> queueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
return this;
}

/**
* Set the async context provider.
* @param asyncContextProvider the async context provider
* @return builder
*/
public Builder<Y> asyncContextProvider(Provider<AsyncContext> asyncContextProvider) {
this.asyncContextProvider = asyncContextProvider;
return this;
}

/**
* Build the ChunkedOutput based on the given configuration.
* @return the ChunkedOutput
*/
public ChunkedOutput<Y> build() {
return new ChunkedOutput<>(this);
}
}

/**
* Builder that allows to create a new ChunkedOutput based on the given configuration options.
*
* @param <Y>
*/
public static class TypedBuilder<Y> extends Builder<Y> {
private Type chunkType;

private TypedBuilder(Type chunkType) {
this.chunkType = chunkType;
}

/**
* Build the ChunkedOutput based on the given configuration.
* @return the ChunkedOutput
*/
public ChunkedOutput<Y> build() {
return new ChunkedOutput<>(this);
}
}
}
5 changes: 4 additions & 1 deletion docs/src/main/docbook/async.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0"?>
<!--
Copyright (c) 2012, 2021 Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
This program and the accompanying materials are made available under the
terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -270,6 +270,9 @@ public class AsyncResource {
public ChunkedOutput<String> getChunkedResponse() {
final ChunkedOutput<String> output = new ChunkedOutput<String>(String.class);
// Or use the builder pattern instead, which also allows to configure the queue capacity
// final ChunkedOutput<String> output = ChunkedOutput.<String>builder(String.class).queueCapacity(10).build();
new Thread() {
public void run() {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2022 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -60,14 +60,34 @@ public class ChunkedInputOutputTest extends JerseyTest {
*/
@Path("/test")
public static class TestResource {
/**
* Get chunk stream with a queue capacity of 2.
*
* @return chunk stream.
*/
@GET
@Path("/testWithBuilder")
public ChunkedOutput<String> getWithBuilder() {
return getOutput(ChunkedOutput.<String>builder(String.class).queueCapacity(2)
.chunkDelimiter("\r\n".getBytes()).build());
}

/**
* Get chunk stream.
*
* @return chunk stream.
*/
@GET
public ChunkedOutput<String> get() {
final ChunkedOutput<String> output = new ChunkedOutput<>(String.class, "\r\n");
return getOutput(new ChunkedOutput<>(String.class, "\r\n"));
}

/**
* Get chunk stream.
*
* @return chunk stream.
*/
private ChunkedOutput<String> getOutput(ChunkedOutput<String> output) {

new Thread() {
@Override
Expand Down Expand Up @@ -182,6 +202,19 @@ public void testChunkedOutputToSingleString() throws Exception {
"Unexpected value of chunked response unmarshalled as a single string.");
}

/**
* Test retrieving chunked response stream as a single response string, when a builder with capacity is used.
*
* @throws Exception in case of a failure during the test execution.
*/
@Test
public void testChunkedOutputToSingleStringWithBuilder() throws Exception {
final String response = target().path("test/testWithBuilder").request().get(String.class);

assertEquals("test\r\ntest\r\ntest\r\n", response,
"Unexpected value of chunked response unmarshalled as a single string.");
}

/**
* Test retrieving chunked response stream sequentially as individual chunks using chunked input.
*
Expand Down

0 comments on commit be13798

Please sign in to comment.