Skip to content

Commit

Permalink
Get basic response body writing working.
Browse files Browse the repository at this point in the history
Lots of TODOs remain (flow control is currently ignored, stream state is not stracked etc.) but it is in a state where at least some of the examples app can be used.

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1680867 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
markt-asf committed May 21, 2015
1 parent d550e85 commit 754c5db
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 22 deletions.
30 changes: 28 additions & 2 deletions java/org/apache/coyote/http2/Http2UpgradeHandler.java
Expand Up @@ -66,6 +66,7 @@ public class Http2UpgradeHandler extends AbstractStream implements InternalHttpU
private static final int FLAG_END_OF_STREAM = 1;
private static final int FLAG_END_OF_HEADERS = 4;

private static final int FRAME_TYPE_DATA = 0;
private static final int FRAME_TYPE_HEADERS = 1;
private static final int FRAME_TYPE_PRIORITY = 2;
private static final int FRAME_TYPE_SETTINGS = 4;
Expand Down Expand Up @@ -701,12 +702,14 @@ void writeHeaders(Stream stream, Response coyoteResponse) throws IOException {
ByteUtil.setThreeBytes(header, 0, target.limit());
if (first) {
header[3] = FRAME_TYPE_HEADERS;
if (stream.getOutputBuffer().hasNoBody()) {
header[4] = FLAG_END_OF_STREAM;
}
} else {
header[3] = FRAME_TYPE_CONTINUATION;
}
if (state == State.COMPLETE) {
// TODO Determine end of stream correctly
header[4] = FLAG_END_OF_HEADERS + FLAG_END_OF_STREAM;
header[4] += FLAG_END_OF_HEADERS;
}
if (log.isDebugEnabled()) {
log.debug(target.limit() + " bytes");
Expand All @@ -719,6 +722,29 @@ void writeHeaders(Stream stream, Response coyoteResponse) throws IOException {
}
}


void writeBody(Stream stream, ByteBuffer data) throws IOException {
data.flip();
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.writeBody", Integer.toString(connectionId),
stream.getIdentifier(), Integer.toString(data.remaining())));
}
synchronized (socketWrapper) {
// TODO Manage window sizes
byte[] header = new byte[9];
ByteUtil.setThreeBytes(header, 0, data.remaining());
header[3] = FRAME_TYPE_DATA;
if (stream.getOutputBuffer().isFinished()) {
header[4] = FLAG_END_OF_STREAM;
}
ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue());
socketWrapper.write(true, header, 0, header.length);
socketWrapper.write(true, data.array(), data.arrayOffset(), data.limit());
socketWrapper.flush(true);
}
}


private void processWrites() throws IOException {
if (socketWrapper.flush(false)) {
socketWrapper.registerWriteInterest();
Expand Down
71 changes: 61 additions & 10 deletions java/org/apache/coyote/http2/Stream.java
Expand Up @@ -17,6 +17,7 @@
package org.apache.coyote.http2;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.coyote.OutputBuffer;
import org.apache.coyote.Request;
Expand All @@ -35,6 +36,7 @@ public class Stream extends AbstractStream implements HeaderEmitter {
private final Http2UpgradeHandler handler;
private final Request coyoteRequest = new Request();
private final Response coyoteResponse = new Response();
private final StreamOutputBuffer outputBuffer = new StreamOutputBuffer();

private volatile long flowControlWindowSize;

Expand All @@ -44,7 +46,7 @@ public Stream(Integer identifier, Http2UpgradeHandler handler) {
this.handler = handler;
setParentStream(handler);
flowControlWindowSize = handler.getRemoteSettings().getInitialWindowSize();
coyoteResponse.setOutputBuffer(new StreamOutputBuffer());
coyoteResponse.setOutputBuffer(outputBuffer);
}


Expand Down Expand Up @@ -110,8 +112,12 @@ void flushData() {
log.debug(sm.getString("stream.write",
Long.toString(getConnectionId()), getIdentifier()));
}
// TODO
handler.addWrite("DATA");
try {
outputBuffer.flush();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}


Expand All @@ -127,31 +133,76 @@ protected final int getConnectionId() {
}


public Request getCoyoteRequest() {
Request getCoyoteRequest() {
return coyoteRequest;
}


public Response getCoyoteResponse() {
Response getCoyoteResponse() {
return coyoteResponse;
}


private class StreamOutputBuffer implements OutputBuffer {
StreamOutputBuffer getOutputBuffer() {
return outputBuffer;
}


class StreamOutputBuffer implements OutputBuffer {

private volatile ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
private volatile long written = 0;
private volatile boolean finished = false;

@Override
public int doWrite(ByteChunk chunk) throws IOException {
// TODO Blocking. Write to buffer. flushData() if full.
log.debug("Write [" + chunk.getLength() + "] bytes");
written += chunk.getLength();
return chunk.getLength();
if (finished) {
// TODO i18n
throw new IllegalStateException();
}
int len = chunk.getLength();
int offset = 0;
while (len > 0) {
int thisTime = Math.min(buffer.remaining(), len);
buffer.put(chunk.getBytes(), chunk.getOffset() + offset, thisTime);
offset += thisTime;
len -= thisTime;
if (!buffer.hasRemaining()) {
flush();
}
}
written += offset;
return offset;
}

public void flush() throws IOException {
if (buffer.position() == 0) {
// Buffer is empty. Nothing to do.
return;
}
handler.writeBody(Stream.this, buffer);
buffer.clear();
}

@Override
public long getBytesWritten() {
return written;
}

public void finished() {
finished = true;
}

public boolean isFinished() {
return finished;
}

/**
* @return <code>true</code> if it is certain that the associated
* response has no body.
*/
public boolean hasNoBody() {
return ((written == 0) && finished);
}
}
}
27 changes: 17 additions & 10 deletions java/org/apache/coyote/http2/StreamProcessor.java
Expand Up @@ -56,7 +56,7 @@ public void run() {
try {
adapter.service(request, response);
// Ensure the response is complete
response.action(ActionCode.CLIENT_FLUSH, null);
response.action(ActionCode.CLOSE, null);
} catch (Exception e) {
// TODO
e.printStackTrace();
Expand All @@ -67,24 +67,31 @@ public void run() {
@Override
public void action(ActionCode actionCode, Object param) {
switch (actionCode) {
case REQ_HOST_ADDR_ATTRIBUTE: {
request.remoteAddr().setString(socketWrapper.getRemoteAddr());
case COMMIT: {
if (!response.isCommitted()) {
response.setCommitted(true);
stream.writeHeaders();
}
break;
}
case IS_ERROR: {
((AtomicBoolean) param).set(getErrorState().isError());
case CLOSE: {
// Tell the output buffer there will be no more data
stream.getOutputBuffer().finished();
// Then flush it
action(ActionCode.CLIENT_FLUSH, null);
break;
}
case CLIENT_FLUSH: {
action(ActionCode.COMMIT, null);
stream.flushData();
break;
}
case COMMIT: {
if (!response.isCommitted()) {
response.setCommitted(true);
stream.writeHeaders();
}
case REQ_HOST_ADDR_ATTRIBUTE: {
request.remoteAddr().setString(socketWrapper.getRemoteAddr());
break;
}
case IS_ERROR: {
((AtomicBoolean) param).set(getErrorState().isError());
break;
}

Expand Down

0 comments on commit 754c5db

Please sign in to comment.