Skip to content

Commit

Permalink
clean code
Browse files Browse the repository at this point in the history
  • Loading branch information
shenfeng committed Apr 13, 2013
1 parent db43b6f commit d789b6f
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 73 deletions.
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
[org.clojure/tools.logging "0.2.6"]
[ch.qos.logback/logback-classic "1.0.9"]
[clj-http "0.6.5"]
[io.netty/netty "3.6.2.Final"]
[io.netty/netty "3.6.5.Final"]
[org.clojure/data.json "0.2.1"]
[http.async.client "0.5.2"]
[compojure "1.1.5"]
Expand Down
File renamed without changes.
63 changes: 29 additions & 34 deletions src/java/org/httpkit/server/AsyncChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import clojure.lang.IFn;
import clojure.lang.Keyword;

@SuppressWarnings({ "unchecked" })
@SuppressWarnings({"unchecked"})
public class AsyncChannel {
private final SelectionKey key;
private final HttpServer server;
Expand Down Expand Up @@ -75,22 +75,22 @@ private void firstWrite(Object data, boolean close) throws IOException {
headers.put("Content-Type", "text/html; charset=utf-8");
}

if (close) { // normal response
if (close) { // normal response, Content-Length. Every http client understand it
buffers = encode(status, headers, body);
} else {
headers.put("Transfer-Encoding", "chunked"); // first chunk
ByteBuffer[] bb = encode(status, headers, body);
if (body == null) {
buffers = bb;
} else {
buffers = new ByteBuffer[] { bb[0], chunkSize(bb[1].remaining()), bb[1],
ByteBuffer.wrap(newLineBytes) };
buffers = new ByteBuffer[]{bb[0], chunkSize(bb[1].remaining()), bb[1],
ByteBuffer.wrap(newLineBytes)};
}
}
if (close) {
onClose(0);
}
write(buffers);
server.tryWrite(key, buffers);
}

private void writeChunk(Object body, boolean close) throws IOException {
Expand All @@ -102,8 +102,8 @@ private void writeChunk(Object body, boolean close) throws IOException {
ByteBuffer t = bodyBuffer(body);
if (t.hasRemaining()) {
ByteBuffer size = chunkSize(t.remaining());
buffers = new ByteBuffer[] { size, t, ByteBuffer.wrap(newLineBytes) };
write(buffers);
buffers = new ByteBuffer[]{size, t, ByteBuffer.wrap(newLineBytes)};
server.tryWrite(key, buffers);
}
}
if (close) {
Expand All @@ -125,7 +125,7 @@ public void messageReceived(final Object mesg) {
}

public void sendHandshake(Map<String, Object> headers) {
write(encode(101, headers, null));
server.tryWrite(key, encode(101, headers, null));
}

public void setCloseHandler(IFn fn) {
Expand All @@ -152,10 +152,10 @@ public boolean serverClose(int status) {
return false; // already closed
}
if (isWebSocket()) {
write(WSEncoder.encode(OPCODE_CLOSE, ByteBuffer.allocate(2)
.putShort((short) status).array()));
server.tryWrite(key, WSEncoder.encode(OPCODE_CLOSE, ByteBuffer.allocate(2)
.putShort((short) status).array()));
} else {
write(ByteBuffer.wrap(finalChunkBytes));
server.tryWrite(key, ByteBuffer.wrap(finalChunkBytes));
}
IFn f = closeHandler.get();
if (f != null) {
Expand All @@ -178,12 +178,12 @@ public boolean send(Object data, boolean close) throws IOException {
}

if (data instanceof String) { // null is not allowed
write(WSEncoder.encode(OPCODE_TEXT, ((String) data).getBytes(UTF_8)));
server.tryWrite(key, WSEncoder.encode(OPCODE_TEXT, ((String) data).getBytes(UTF_8)));
} else if (data instanceof byte[]) {
write(WSEncoder.encode(OPCODE_BINARY, (byte[]) data));
server.tryWrite(key, WSEncoder.encode(OPCODE_BINARY, (byte[]) data));
} else if (data instanceof InputStream) {
DynamicBytes bytes = readAll((InputStream) data);
write(WSEncoder.encode(OPCODE_BINARY, bytes.get(), bytes.length()));
server.tryWrite(key, WSEncoder.encode(OPCODE_BINARY, bytes.get(), bytes.length()));
} else if (data != null) { // ignore null
throw new IllegalArgumentException(
"only accept string, byte[], InputStream, get" + data);
Expand All @@ -193,7 +193,7 @@ public boolean send(Object data, boolean close) throws IOException {
serverClose(1000);
}
} else {
if (isHeaderSent) {
if (isHeaderSent) { // HTTP Streaming
writeChunk(data, close);
} else {
isHeaderSent = true;
Expand All @@ -208,11 +208,6 @@ public String toString() {
return s.getLocalSocketAddress() + "<->" + s.getRemoteSocketAddress();
}

private void write(ByteBuffer... buffers) {
((ServerAtta) key.attachment()).addBuffer(buffers);
server.queueWrite(key);
}

public boolean isWebSocket() {
return key.attachment() instanceof WsServerAtta;
}
Expand All @@ -234,20 +229,20 @@ public boolean isClosed() {

private static Keyword readable(int status) {
switch (status) {
case 0:
return K_BY_SERVER;
case -1:
return K_CLIENT_CLOSED;
case 1000:
return K_WS_1000;
case 1001:
return K_WS_1001;
case 1002:
return K_WS_1002;
case 1003:
return K_WS_1003;
default:
return K_UNKNOWN;
case 0:
return K_BY_SERVER;
case -1:
return K_CLIENT_CLOSED;
case 1000:
return K_WS_1000;
case 1001:
return K_WS_1001;
case 1002:
return K_WS_1002;
case 1003:
return K_WS_1003;
default:
return K_UNKNOWN;
}
}
}
53 changes: 27 additions & 26 deletions src/java/org/httpkit/server/HttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,11 @@ private void decodeHttp(HttpServerAtta atta, SelectionKey key, SocketChannel ch)
} catch (ProtocolException e) {
closeKey(key, -1);
} catch (RequestTooLargeException e) {
ByteBuffer[] buffers = ClojureRing.encode(413, null, e.getMessage());
atta.addBuffer(buffers);
atta.keepalive = false; // close after write
key.interestOps(OP_WRITE);
atta.keepalive = false;
tryWrite(key, ClojureRing.encode(413, null, e.getMessage()));
} catch (LineTooLargeException e) {
ByteBuffer[] buffers = ClojureRing.encode(414, null, e.getMessage());
atta.keepalive = false; // close after write
atta.addBuffer(buffers);
key.interestOps(OP_WRITE);
tryWrite(key, ClojureRing.encode(414, null, e.getMessage()));
}
}

Expand All @@ -120,16 +116,14 @@ private void decodeWs(WsServerAtta atta, SelectionKey key) {
handler.handle(atta.asycChannel, frame);
atta.decoder.reset();
} else if (frame instanceof PingFrame) {
atta.addBuffer(WSEncoder.encode(WSDecoder.OPCODE_PONG, frame.data));
atta.decoder.reset();
key.interestOps(OP_WRITE);
tryWrite(key, WSEncoder.encode(WSDecoder.OPCODE_PONG, frame.data));
} else if (frame instanceof CloseFrame) {
// even though the logic connection is closed. the socket
// did not, if client willing to reuse it, http-kit is more
// than happy
handler.clientClose(atta.asycChannel, ((CloseFrame) frame).getStatus());
atta.addBuffer(WSEncoder.encode(WSDecoder.OPCODE_CLOSE, frame.data));
key.interestOps(OP_WRITE);
tryWrite(key, WSEncoder.encode(WSDecoder.OPCODE_CLOSE, frame.data));
}
} while (buffer.hasRemaining()); // consume all
} catch (ProtocolException e) {
Expand Down Expand Up @@ -164,10 +158,10 @@ private void doWrite(SelectionKey key) {
ServerAtta atta = (ServerAtta) key.attachment();
SocketChannel ch = (SocketChannel) key.channel();
try {
LinkedList<ByteBuffer> toWrites = atta.toWrites;
// the sync is per socket (per client). virtually, no contention
// 1. keep byte data order, 2. ensure visibility
synchronized (atta.toWrites) {
synchronized (atta) {
LinkedList<ByteBuffer> toWrites = atta.toWrites;
int size = toWrites.size();
if (size == 1) {
ch.write(toWrites.get(0));
Expand All @@ -176,7 +170,7 @@ private void doWrite(SelectionKey key) {
} else if (size > 0) {
ByteBuffer buffers[] = new ByteBuffer[size];
toWrites.toArray(buffers);
ch.write(buffers);
ch.write(buffers, 0, buffers.length);
}
Iterator<ByteBuffer> ite = toWrites.iterator();
while (ite.hasNext()) {
Expand All @@ -199,23 +193,35 @@ private void doWrite(SelectionKey key) {
}

public void tryWrite(final SelectionKey key, ByteBuffer... buffers) {
ServerAtta atta = (ServerAtta) key.attachment();
synchronized (atta) {
// If has pending write, order should be maintained. (WebSocket)
if (!atta.toWrites.isEmpty()) {
for (ByteBuffer b : buffers) {
atta.toWrites.add(b);
}
pendings.add(key);
selector.wakeup();
return;
}
}
SocketChannel ch = (SocketChannel) key.channel();
try {
// TCP buffer most of time is empty, writable(8K ~ 16K)
// One IO thread => One thread reading + Many thread writing
ch.write(buffers);
ch.write(buffers, 0, buffers.length);
if (buffers[buffers.length - 1].hasRemaining()) {
ServerAtta atta = (ServerAtta) key.attachment();
// enqueue
for (ByteBuffer b : buffers) {
if (b.hasRemaining()) {
atta.addBuffer(b);
synchronized (atta) {
for (ByteBuffer b : buffers) {
if (b.hasRemaining()) {
atta.toWrites.add(b);
}
}
}
pendings.add(key);
selector.wakeup();
} else {
if (!((ServerAtta) key.attachment()).isKeepAlive()) {
if (!atta.isKeepAlive()) {
closeKey(key, CLOSE_NORMAL);
}
}
Expand All @@ -224,11 +230,6 @@ public void tryWrite(final SelectionKey key, ByteBuffer... buffers) {
}
}

public void queueWrite(final SelectionKey key) {
pendings.add(key);
selector.wakeup(); // JVM is smart enough: only once per loop
}

public void run() {
while (true) {
try {
Expand Down
10 changes: 0 additions & 10 deletions src/java/org/httpkit/server/ServerAtta.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,6 @@
public abstract class ServerAtta {
final LinkedList<ByteBuffer> toWrites = new LinkedList<ByteBuffer>();

public void addBuffer(ByteBuffer... buffer) {
synchronized (toWrites) {
for (ByteBuffer b : buffer) {
if (b != null) {
toWrites.add(b);
}
}
}
}

protected AsyncChannel asycChannel;

public abstract boolean isKeepAlive();
Expand Down
11 changes: 11 additions & 0 deletions test/java/org/httpkit/ws/WebSocketClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ public class WebSocketClient {
Channel ch = null;
private BlockingQueue<WebSocketFrame> queue = new ArrayBlockingQueue<WebSocketFrame>(10);

public static void main(String[] args) throws Exception {
WebSocketClient client = new WebSocketClient("ws://localhost:9090/ws2/ws");
client.sendMessage("{:length 3145728, :times 1}");
for (int i = 0; i < 1; i++) {
Object message = client.getMessage();
if(message instanceof String) {
System.out.println(((String) message).length());
}
}
}

public WebSocketClient(String url) throws Exception {
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
Executors.newFixedThreadPool(1), Executors.newFixedThreadPool(1)));
Expand Down
6 changes: 4 additions & 2 deletions test/org/httpkit/server_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
ring.middleware.file-info
org.httpkit.test-util
[clojure.java.io :only [input-stream]]
(compojure [core :only [defroutes GET POST HEAD DELETE ANY]]
(compojure [core :only [defroutes GET POST HEAD DELETE ANY context]]
[handler :only [site]])
org.httpkit.server
org.httpkit.timer)
(:require [clj-http.client :as http]
[org.httpkit.ws-test :as ws]
[org.httpkit.client :as client]
[clj-http.util :as u])
(:import [java.io File FileOutputStream FileInputStream]
Expand Down Expand Up @@ -103,6 +104,7 @@
(GET "/ws" [] (fn [req]
(with-channel req con
(on-receive con (fn [mesg] (send! con mesg))))))
(context "/ws2" [] ws/test-routes)
(GET "/inputstream" [] inputstream-handler)
(POST "/multipart" [] multipart-handler)
(POST "/chunked-input" [] (fn [req] {:status 200
Expand Down Expand Up @@ -159,7 +161,7 @@
(is (= (:body resp) "Hello World"))))

(deftest test-body-file
(doseq [length (range 1 (* 1024 1024 5) 1439987)]
(doseq [length (range 1 (* 1024 1024 8) 1439987)]
(let [resp (http/get "http://localhost:4347/file?l=" length)]
(is (= (:status resp) 200))
(is (= (get-in resp [:headers "content-type"]) "text/plain"))
Expand Down

0 comments on commit d789b6f

Please sign in to comment.