Permalink
Browse files

Fix to asynch I/O

  • Loading branch information...
boybeater authored and TooTallNate committed Aug 24, 2010
1 parent eaba3cf commit b6d0a83305a7d027b2be9bb3eb86a674988dc078
@@ -3,9 +3,12 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.BlockingQueue;
/**
* Represents one end (client or server) of a single WebSocket connection.
@@ -76,6 +79,15 @@
* The bytes that make up the current text frame being read.
*/
private ByteBuffer currentFrame;
/**
* Queue of buffers that need to be sent to the client.
*/
private BlockingQueue<ByteBuffer> bufferQueue;
/**
* Lock object to ensure that data is sent from the bufferQueue in
* the proper order
*/
private Object bufferQueueMutex = new Object();
// CONSTRUCTOR /////////////////////////////////////////////////////////////
@@ -84,11 +96,16 @@
* @param socketChannel The <tt>SocketChannel</tt> instance to read and
* write to. The channel should already be registered
* with a Selector before construction of this object.
* @param bufferQueue The Queue that we should use to buffer data that
* hasn't been sent to the client yet.
* @param listener The {@link WebSocketListener} to notify of events when
* they occur.
*/
WebSocket(SocketChannel socketChannel, WebSocketListener listener) {
WebSocket(SocketChannel socketChannel, BlockingQueue<ByteBuffer> bufferQueue,
WebSocketListener listener)
{
this.socketChannel = socketChannel;
this.bufferQueue = bufferQueue;
this.handshakeComplete = false;
this.remoteHandshake = this.currentFrame = null;
this.buffer = ByteBuffer.allocate(1);
@@ -133,7 +150,11 @@ public void close() throws IOException {
this.wsl.onClose(this);
}
public void send(String text) throws IOException {
/**
* @return True if all of the text was sent to the client by this thread.
* False if some of the text had to be buffered to be sent later.
*/
public boolean send(String text) throws IOException {
if (!this.handshakeComplete) throw new NotYetConnectedException();
if (text == null) throw new NullPointerException("Cannot send 'null' data to a WebSocket.");
@@ -143,12 +164,50 @@ public void send(String text) throws IOException {
b.put(START_OF_FRAME);
b.put(textBytes);
b.put(END_OF_FRAME);
b.rewind();
// Write the ByteBuffer to the socket
b.rewind();
this.socketChannel.write(b);
// See if we have any backlog that needs to be sent first
if (handleWrite()) {
// Write the ByteBuffer to the socket
this.socketChannel.write(b);
}
// If we didn't get it all sent, add it to the buffer of buffers
if (b.remaining() > 0) {
if (!this.bufferQueue.offer(b)) {
throw new IOException("Buffers are full, message could not be sent to" +
this.socketChannel.socket().getRemoteSocketAddress());
}
return false;
}
return true;
}
boolean hasBufferedData() {
return !this.bufferQueue.isEmpty();
}
/**
* @return True if all data has been sent to the client, false if there
* is still some buffered.
*/
boolean handleWrite() throws IOException {
synchronized (this.bufferQueueMutex) {
ByteBuffer buffer = this.bufferQueue.peek();
while (buffer != null) {
this.socketChannel.write(buffer);
if (buffer.remaining() > 0) {
return false; // Didn't finish this buffer. There's more to send.
} else {
this.bufferQueue.poll(); // Buffer finished. Remove it.
buffer = this.bufferQueue.peek();
}
}
return true;
}
}
public SocketChannel socketChannel() {
return this.socketChannel;
}
@@ -255,4 +314,5 @@ private void completeHandshake(byte[] handShakeBody) throws IOException, NoSuchA
close();
}
}
}
@@ -12,6 +12,7 @@
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import net.tootallnate.websocket.WebSocketListener.Draft;
@@ -119,7 +120,7 @@ public void run() {
Selector selector = Selector.open();
this.conn = new WebSocket(client, this);
this.conn = new WebSocket(client, new LinkedBlockingQueue<ByteBuffer>(), this);
client.register(selector, client.validOps());
// Continuous loop that is only supposed to end when close is called
@@ -11,7 +11,9 @@
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
/**
* <tt>WebSocketServer</tt> is an abstract class that only takes care of the
@@ -167,6 +169,18 @@ public Draft getDraft() {
return draft;
}
/**
* @return A BlockingQueue that should be used by a WebSocket
* to hold data that is waiting to be sent to the client.
* The default implementation returns an unbounded
* LinkedBlockingQueue, but you may choose to override
* this to return a bounded queue to protect against
* running out of memory.
*/
protected BlockingQueue<ByteBuffer> newBufferQueue() {
return new LinkedBlockingQueue<ByteBuffer>();
}
// Runnable IMPLEMENTATION /////////////////////////////////////////////////
public void run() {
@@ -179,33 +193,61 @@ public void run() {
server.register(selector, server.validOps());
while(true) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> i = keys.iterator();
while(i.hasNext()) {
SelectionKey key = i.next();
// Remove the current key
i.remove();
// if isAccetable == true
// then a client required a connection
if (key.isAcceptable()) {
SocketChannel client = server.accept();
client.configureBlocking(false);
WebSocket c = new WebSocket(client, this);
client.register(selector, SelectionKey.OP_READ, c);
}
// if isReadable == true
// then the server is ready to read
if (key.isReadable()) {
WebSocket conn = (WebSocket)key.attachment();
conn.handleRead();
}
}
}
try {
selector.select(100L);
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> i = keys.iterator();
while(i.hasNext()) {
SelectionKey key = i.next();
// Remove the current key
i.remove();
// if isAcceptable == true
// then a client required a connection
if (key.isAcceptable()) {
SocketChannel client = server.accept();
client.configureBlocking(false);
WebSocket c = new WebSocket(client, newBufferQueue(), this);
client.register(selector, SelectionKey.OP_READ, c);
}
// if isReadable == true
// then the server is ready to read
if (key.isReadable()) {
WebSocket conn = (WebSocket)key.attachment();
conn.handleRead();
}
// if isWritable == true
// then we need to send the rest of the data to the client
if (key.isWritable()) {
WebSocket conn = (WebSocket)key.attachment();
if (conn.handleWrite()) {
conn.socketChannel().register(selector,
SelectionKey.OP_READ, conn);
}
}
}
for (WebSocket conn : this.connections) {
// We have to do this check here, and not in the thread that
// adds the buffered data to the WebSocket, because the
// Selector is not thread-safe, and can only be accessed
// by this thread.
if (conn.hasBufferedData()) {
conn.socketChannel().register(selector,
SelectionKey.OP_READ | SelectionKey.OP_WRITE, conn);
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (RuntimeException e) {
e.printStackTrace();
}
}
} catch (IOException ex) {
ex.printStackTrace();
} catch (NoSuchAlgorithmException e) {

0 comments on commit b6d0a83

Please sign in to comment.