Skip to content

Commit

Permalink
Added method ws.closeWithReason()
Browse files Browse the repository at this point in the history
Added test for ws.closeWithReason()
Cleaned my messy

Signed-off-by: francesco <francescoguard@gmail.com>
  • Loading branch information
slinkydeveloper committed Jan 17, 2018
1 parent c75d7cc commit dc1ab18
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 39 deletions.
5 changes: 5 additions & 0 deletions src/main/java/io/vertx/core/http/WebSocketBase.java
Expand Up @@ -239,6 +239,11 @@ public interface WebSocketBase extends ReadStream<Buffer>, WriteStream<Buffer> {
*/
void close();

/*
* Close sending a close frame with specified status code and reason
*/
void closeWithReason(short statusCode, String reason);

/**
* @return the remote address for this socket
*/
Expand Down
19 changes: 16 additions & 3 deletions src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java
Expand Up @@ -23,7 +23,7 @@
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.impl.pool.ConnectionListener;
import io.vertx.core.http.impl.ws.WebSocketCloseFrameCodes;
import io.vertx.core.http.impl.ws.WebSocketCloseFrameCode;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.impl.ContextImpl;
import io.vertx.core.logging.Logger;
Expand Down Expand Up @@ -711,8 +711,21 @@ public synchronized void close() {
} else {
// make sure everything is flushed out on close
endReadAndFlush();
// close the websocket connection by sending a close frame.
handshaker.close(chctx.channel(), new CloseWebSocketFrame(true, 0, WebSocketCloseFrameCodes.CLOSE_1000));
// close the websocket connection by sending a close frame with specified payload.
handshaker.close(chctx.channel(), new CloseWebSocketFrame(true, 0, 1000, null));
}
}

@Override
public void closeWithPayload(ByteBuf byteBuf) {
listener.onDiscard();
if (handshaker == null) {
super.close();
} else {
// make sure everything is flushed out on close
endReadAndFlush();
// close the websocket connection by sending a close frame with specified payload.
handshaker.close(chctx.channel(), new CloseWebSocketFrame(true, 0, byteBuf));
}
}

Expand Down
Expand Up @@ -79,6 +79,8 @@ private WebSocketFrame encodeFrame(Object obj) {
}
}

abstract public void closeWithPayload(ByteBuf byteBuf);

@Override
public Http1xConnectionBase closeHandler(Handler<Void> handler) {
return (Http1xConnectionBase) super.closeHandler(handler);
Expand Down
14 changes: 12 additions & 2 deletions src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java
Expand Up @@ -45,7 +45,7 @@
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.http.WebSocketFrame;
import io.vertx.core.http.impl.ws.WebSocketCloseFrameCodes;
import io.vertx.core.http.impl.ws.WebSocketCloseFrameCode;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.impl.ContextImpl;
import io.vertx.core.impl.VertxInternal;
Expand Down Expand Up @@ -311,7 +311,17 @@ public void close() {
super.close();
} else {
endReadAndFlush();
handshaker.close(chctx.channel(), new CloseWebSocketFrame(true, 0, WebSocketCloseFrameCodes.CLOSE_1000));
handshaker.close(chctx.channel(), new CloseWebSocketFrame(true, 0, 1000, null));
}
}

@Override
public void closeWithPayload(ByteBuf byteBuf) {
if (handshaker == null) {
super.close();
} else {
endReadAndFlush();
handshaker.close(chctx.channel(), new CloseWebSocketFrame(true, 0, byteBuf));
}
}

Expand Down
Expand Up @@ -48,7 +48,7 @@ public class ServerWebSocketImpl extends WebSocketImplBase<ServerWebSocket> impl
private HttpResponseStatus rejectedStatus;

public ServerWebSocketImpl(VertxInternal vertx, String uri, String path, String query, MultiMap headers,
ConnectionBase conn, boolean supportsContinuation, Supplier<String> connectRunnable,
Http1xConnectionBase conn, boolean supportsContinuation, Supplier<String> connectRunnable,
int maxWebSocketFrameSize, int maxWebSocketMessageSize) {
super(vertx, conn, supportsContinuation, maxWebSocketFrameSize, maxWebSocketMessageSize);
this.uri = uri;
Expand Down
18 changes: 13 additions & 5 deletions src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java
Expand Up @@ -18,7 +18,7 @@
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.http.WebSocketBase;
import io.vertx.core.http.WebSocketFrame;
import io.vertx.core.http.impl.ws.WebSocketCloseFrameCodes;
import io.vertx.core.http.impl.ws.WebSocketCloseFrameCode;
import io.vertx.core.http.impl.ws.WebSocketFrameImpl;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.impl.VertxInternal;
Expand Down Expand Up @@ -57,11 +57,11 @@ public abstract class WebSocketImplBase<S extends WebSocketBase> implements WebS
private Handler<Throwable> exceptionHandler;
private Handler<Void> closeHandler;
private Handler<Void> endHandler;
protected final ConnectionBase conn;
protected final Http1xConnectionBase conn;
protected boolean closed;


WebSocketImplBase(VertxInternal vertx, ConnectionBase conn, boolean supportsContinuation,
WebSocketImplBase(VertxInternal vertx, Http1xConnectionBase conn, boolean supportsContinuation,
int maxWebSocketFrameSize, int maxWebSocketMessageSize) {
this.supportsContinuation = supportsContinuation;
this.textHandlerID = UUID.randomUUID().toString();
Expand Down Expand Up @@ -93,7 +93,15 @@ public boolean writeQueueFull() {
public void close() {
synchronized (conn) {
checkClosed();
writeFrame(WebSocketFrame.factory.closeFrame(WebSocketCloseFrameCodes.CLOSE_1000));
conn.close();
cleanupHandlers();
}
}

public void closeWithReason(short statusCode, String reason) {
synchronized (conn) {
checkClosed();
conn.closeWithPayload(WebSocketCloseFrameCode.generateByteBuffer(statusCode, reason));
cleanupHandlers();
}
}
Expand Down Expand Up @@ -253,7 +261,7 @@ void checkClosed() {
void handleFrame(WebSocketFrameInternal frame) {
synchronized (conn) {
conn.reportBytesRead(frame.binaryData().length());
if (dataHandler != null) {
if (dataHandler != null && frame.type() != FrameType.CLOSE) {
Buffer buff = Buffer.buffer(frame.getBinaryData());
dataHandler.handle(buff);
}
Expand Down
@@ -0,0 +1,37 @@
package io.vertx.core.http.impl.ws;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.nio.charset.Charset;

/**
* @author Francesco Guardiani @slinkydeveloper
*/
public enum WebSocketCloseFrameCode {

NORMAL_CLOSE((short)1000, "Connection Closed");

private short statusCode;
private String reason;

WebSocketCloseFrameCode(short statusCode, String reason) {
this.statusCode = statusCode;
this.reason = reason;
}

public ByteBuf byteBuf() {
return generateByteBuffer(this.statusCode, this.reason);
}

public static ByteBuf generateByteBuffer(short statusCode, String reason) {
if (reason != null)
return Unpooled.copiedBuffer(
Unpooled.copyShort(statusCode), // First two bytes are reserved for status code
Unpooled.copiedBuffer(reason, Charset.forName("UTF-8"))
);
else
return Unpooled.copyShort(statusCode);
}

}

This file was deleted.

69 changes: 59 additions & 10 deletions src/test/java/io/vertx/test/core/WebsocketTest.java
Expand Up @@ -12,6 +12,8 @@
package io.vertx.test.core;


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
Expand Down Expand Up @@ -1110,7 +1112,7 @@ private void testWriteMessage(int size, WebsocketVersion version) {
Buffer actual = Buffer.buffer();
ws.handler(actual::appendBuffer);
ws.closeHandler(v -> {
assertArrayEquals(expected, actual.getBytes());
assertArrayEquals(expected, actual.getBytes(0, actual.length()));
testComplete();
});
});
Expand Down Expand Up @@ -1459,7 +1461,8 @@ private void testUpgrade(boolean delayed) {
buff.appendBuffer(b);
});
ws.endHandler(v -> {
assertEquals("helloworld", buff.toString());
// Last two bytes are status code payload
assertEquals("helloworld", buff.toString("UTF-8"));
testComplete();
});
ws.write(Buffer.buffer("foo"));
Expand Down Expand Up @@ -2090,17 +2093,14 @@ public void testCloseStatusCodeFromServer() throws InterruptedException {
server = vertx.createHttpServer(new HttpServerOptions().setPort(HttpTestBase.DEFAULT_HTTP_PORT))
.websocketHandler(socket -> {
socket.closeHandler(a -> {
System.out.println("Closed");
latch.countDown();
});
vertx.setTimer(1000, (ar) -> socket.close());
})
.listen(ar -> {
client.websocket(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST, "/", ws -> {
ws.frameHandler(frame -> {
System.out.println("Frame Received");
System.out.println(frame.binaryData().getByteBuf().readerIndex(2).toString(Charset.forName("UTF-8")));
System.out.println(frame.binaryData().getByteBuf().getShort(0));
assertEquals(1000, frame.binaryData().getByteBuf().getShort(0));
latch.countDown();
});
});
Expand All @@ -2115,13 +2115,10 @@ public void testCloseStatusCodeFromClient() throws InterruptedException {
server = vertx.createHttpServer(new HttpServerOptions().setPort(HttpTestBase.DEFAULT_HTTP_PORT))
.websocketHandler(socket -> {
socket.closeHandler(a -> {
System.out.println("Closed");
latch.countDown();
});
socket.frameHandler(frame -> {
System.out.println("Frame Received");
System.out.println(frame.binaryData().getByteBuf().readerIndex(2).toString(Charset.forName("UTF-8")));
System.out.println(frame.binaryData().getByteBuf().getShort(0));
assertEquals(1000, frame.binaryData().getByteBuf().getShort(0));
latch.countDown();
});
})
Expand All @@ -2132,4 +2129,56 @@ public void testCloseStatusCodeFromClient() throws InterruptedException {
});
awaitLatch(latch);
}

@Test
public void testCloseCustomPayloadFromServer() throws InterruptedException {
final String REASON = "I'm moving away!";
final short STATUS_CODE = (short)1001;

CountDownLatch latch = new CountDownLatch(2);
client = vertx.createHttpClient();
server = vertx.createHttpServer(new HttpServerOptions().setPort(HttpTestBase.DEFAULT_HTTP_PORT))
.websocketHandler(socket -> {
socket.closeHandler(a -> {
latch.countDown();
});
vertx.setTimer(1000, (ar) -> socket.closeWithReason(STATUS_CODE, REASON));
})
.listen(ar -> {
client.websocket(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST, "/", ws -> {
ws.frameHandler(frame -> {
assertEquals(REASON, frame.binaryData().getByteBuf().readerIndex(2).toString(Charset.forName("UTF-8")));
assertEquals(STATUS_CODE, frame.binaryData().getByteBuf().getShort(0));
latch.countDown();
});
});
});
awaitLatch(latch);
}

@Test
public void testCloseCustomPayloadFromClient() throws InterruptedException {
final String REASON = "I'm moving away!";
final short STATUS_CODE = (short)1001;

CountDownLatch latch = new CountDownLatch(2);
client = vertx.createHttpClient();
server = vertx.createHttpServer(new HttpServerOptions().setPort(HttpTestBase.DEFAULT_HTTP_PORT))
.websocketHandler(socket -> {
socket.closeHandler(a -> {
latch.countDown();
});
socket.frameHandler(frame -> {
assertEquals(REASON, frame.binaryData().getByteBuf().readerIndex(2).toString(Charset.forName("UTF-8")));
assertEquals(STATUS_CODE, frame.binaryData().getByteBuf().getShort(0));
latch.countDown();
});
})
.listen(ar -> {
client.websocket(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST, "/", ws -> {
ws.closeWithReason(STATUS_CODE, REASON);
});
});
awaitLatch(latch);
}
}

0 comments on commit dc1ab18

Please sign in to comment.