Skip to content

Commit

Permalink
Merge branch 'jetty-9.4.x' of github.com:eclipse/jetty.project into j…
Browse files Browse the repository at this point in the history
…etty-9.4.x
  • Loading branch information
joakime committed Sep 25, 2019
2 parents f6ca4f1 + 3edc6c9 commit 39ee316
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public enum Result
FLUSH, // The buffers previously generated should be flushed
CONTINUE, // Continue generating the message
SHUTDOWN_OUT, // Need EOF to be signaled
DONE // Message generation complete
DONE // The current phase of generation is complete
}

// other statics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,9 +732,9 @@ public Action process() throws Exception
{
HttpGenerator.Result result = _generator.generateResponse(_info, _head, _header, chunk, _content, _lastContent);
if (LOG.isDebugEnabled())
LOG.debug("{} generate: {} ({},{},{})@{}",
this,
LOG.debug("generate: {} for {} ({},{},{})@{}",
result,
this,
BufferUtil.toSummaryString(_header),
BufferUtil.toSummaryString(_content),
_lastContent,
Expand Down Expand Up @@ -826,8 +826,10 @@ public Action process() throws Exception
}
case DONE:
{
// If shutdown after commit, we can still close here.
if (getConnector().isShutdown())
// If this is the end of the response and the connector was shutdown after response was committed,
// we can't add the Connection:close header, but we are still allowed to close the connection
// by shutting down the output.
if (getConnector().isShutdown() && _generator.isEnd() && _generator.isPersistent())
_shutdownOut = true;

return Action.SUCCEEDED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,17 +591,18 @@ public void write(byte[] b, int off, int len) throws IOException
// handle blocking write

// Should we aggregate?
int capacity = getBufferSize();
// Yes - if the write is smaller than the commitSize (==aggregate buffer size)
// and the write is not the last one, or is last but will fit in an already allocated aggregate buffer.
boolean last = isLastContentToWrite(len);
if (!last && len <= _commitSize)
if (len <= _commitSize && (!last || len <= BufferUtil.space(_aggregate)))
{
acquireBuffer();

// YES - fill the aggregate with content from the buffer
int filled = BufferUtil.fill(_aggregate, b, off, len);

// return if we are not complete, not full and filled all the content
if (filled == len && !BufferUtil.isFull(_aggregate))
// return if we are not the last write and have aggregated all of the content
if (!last && filled == len && !BufferUtil.isFull(_aggregate))
return;

// adjust offset/length
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -166,6 +168,66 @@ public void testGracefulTimeout() throws Exception
client.close();
}


/**
* Test completed writes during shutdown do not close output
* @throws Exception on test failure
*/
@Test
public void testWriteDuringShutdown() throws Exception
{
Server server = new Server();
server.setStopTimeout(1000);

ServerConnector connector = new ServerConnector(server);
connector.setPort(0);
server.addConnector(connector);

ABHandler handler = new ABHandler();
StatisticsHandler stats = new StatisticsHandler();
server.setHandler(stats);
stats.setHandler(handler);

server.start();

Thread stopper = new Thread(() ->
{
try
{
handler.latchA.await();
server.stop();
}
catch (Exception e)
{
e.printStackTrace();
}
});
stopper.start();

final int port = connector.getLocalPort();
try(Socket client = new Socket("127.0.0.1", port))
{
client.getOutputStream().write((
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + port + "\r\n" +
"\r\n"
).getBytes());
client.getOutputStream().flush();

while (!connector.isShutdown())
Thread.sleep(10);

handler.latchB.countDown();

String response = IO.toString(client.getInputStream());
assertThat(response, startsWith("HTTP/1.1 200 "));
assertThat(response, containsString("Content-Length: 2"));
assertThat(response, containsString("Connection: close"));
assertThat(response, endsWith("ab"));
}
stopper.join();
}

/**
* Test of standard graceful timeout mechanism when a block request does
* complete. Note that even though the request completes after 100ms, the
Expand Down Expand Up @@ -736,6 +798,30 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
}
}

static class ABHandler extends AbstractHandler
{
final CountDownLatch latchA = new CountDownLatch(1);
final CountDownLatch latchB = new CountDownLatch(1);

@Override
public void handle(String s, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.setContentLength(2);
response.getOutputStream().write("a".getBytes());
try
{
latchA.countDown();
latchB.await();
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
response.flushBuffer();
response.getOutputStream().write("b".getBytes());
}
}

static class TestHandler extends AbstractHandler
{
final CountDownLatch latch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,38 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;

/* A Lifecycle that can be gracefully shutdown.
/**
* <p>Jetty components that wish to be part of a Graceful shutdown implement this interface so that
* the {@link Graceful#shutdown()} method will be called to initiate a shutdown. Shutdown operations
* can fall into the following categories:</p>
* <ul>
* <li>Preventing new load from being accepted (eg connectors stop accepting connections)</li>
* <li>Preventing existing load expanding (eg stopping existing connections accepting new requests)</li>
* <li>Waiting for existing load to complete (eg waiting for active request count to reduce to 0)</li>
* <li>Performing cleanup operations that may take time (eg closing an SSL connection)</li>
* </ul>
* <p>The {@link Future} returned by the the shutdown call will be completed to indicate the shutdown operation is completed.
* Some shutdown operations may be instantaneous and always return a completed future.
* </p><p>
* Graceful shutdown is typically orchestrated by the doStop methods of Server or ContextHandler (for a full or partial
* shutdown respectively).
* </p>
*/
public interface Graceful
{
Future<Void> shutdown();

boolean isShutdown();

/**
* A utility Graceful that uses a {@link FutureCallback} to indicate if shutdown is completed.
* By default the {@link FutureCallback} is returned as already completed, but the {@link #newShutdownCallback()} method
* can be overloaded to return a non-completed callback that will require a {@link Callback#succeeded()} or
* {@link Callback#failed(Throwable)} call to be completed.
*/
class Shutdown implements Graceful
{
private final AtomicReference<FutureCallback> _shutdown = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class SuspendResumeTest
Expand Down Expand Up @@ -195,8 +196,7 @@ public void testSuspendAfterClose() throws Exception
assertThat(clientSocket.closeCode, is(StatusCode.NORMAL));
assertThat(serverSocket.closeCode, is(StatusCode.NORMAL));

// suspend the client so that no read events occur
SuspendToken suspendToken = clientSocket.session.suspend();
suspendToken.resume();
// suspend after closed throws ISE
assertThrows(IllegalStateException.class, () -> clientSocket.session.suspend());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
private final EventDriver websocket;
private final Executor executor;
private final WebSocketPolicy policy;
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicBoolean onCloseCalled = new AtomicBoolean(false);
private ClassLoader classLoader;
private ExtensionFactory extensionFactory;
private RemoteEndpointFactory remoteEndpointFactory;
Expand All @@ -80,7 +80,6 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
private UpgradeRequest upgradeRequest;
private UpgradeResponse upgradeResponse;
private CompletableFuture<Session> openFuture;
private AtomicBoolean onCloseCalled = new AtomicBoolean(false);

public WebSocketSession(WebSocketContainerScope containerScope, URI requestURI, EventDriver websocket, LogicalConnection connection)
{
Expand Down Expand Up @@ -338,10 +337,9 @@ public void incomingFrame(Frame frame)
public boolean isOpen()
{
if (this.connection == null)
{
return false;
}
return !closed.get() && this.connection.isOpen();

return !onCloseCalled.get() && this.connection.isOpen();
}

@Override
Expand Down Expand Up @@ -546,6 +544,9 @@ public void setUpgradeResponse(UpgradeResponse response)
@Override
public SuspendToken suspend()
{
if (onCloseCalled.get())
throw new IllegalStateException("Not open");

return connection.suspend();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,8 @@ Action getAction(ByteBuffer buffer)

/**
* Requests that reads from the connection be suspended.
*
* @return whether the suspending was successful
*/
boolean suspending()
void suspending()
{
synchronized (this)
{
Expand All @@ -101,9 +99,7 @@ boolean suspending()
{
case READING:
state = State.SUSPENDING;
return true;
case EOF:
return false;
break;
default:
throw new IllegalStateException(toString(state));
}
Expand Down Expand Up @@ -131,8 +127,6 @@ ByteBuffer resume()
ByteBuffer bb = buffer;
buffer = null;
return bb;
case EOF:
return null;
default:
throw new IllegalStateException(toString(state));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ReadStateTest
{
Expand All @@ -50,7 +49,7 @@ public void testSuspendingThenResume()
ReadState readState = new ReadState();
assertThat("Initially reading", readState.isReading(), is(true));

assertTrue(readState.suspending());
readState.suspending();
assertThat("Suspending doesn't take effect immediately", readState.isSuspended(), is(false));

assertNull(readState.resume());
Expand All @@ -64,7 +63,7 @@ public void testSuspendingThenSuspendThenResume()
ReadState readState = new ReadState();
assertThat("Initially reading", readState.isReading(), is(true));

assertThat(readState.suspending(), is(true));
readState.suspending();
assertThat("Suspending doesn't take effect immediately", readState.isSuspended(), is(false));

ByteBuffer content = BufferUtil.toBuffer("content");
Expand All @@ -84,8 +83,8 @@ public void testEof()

assertThat(readState.isReading(), is(false));
assertThat(readState.isSuspended(), is(true));
assertThat(readState.suspending(), is(false));
assertThrows(IllegalStateException.class, readState::suspending);
assertThat(readState.getAction(content), is(ReadState.Action.EOF));
assertNull(readState.resume());
assertThrows(IllegalStateException.class, readState::resume);
}
}

0 comments on commit 39ee316

Please sign in to comment.