diff --git a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/SockJsSessionContext.java b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/SockJsSessionContext.java index 0aab8bd..c7f032a 100644 --- a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/SockJsSessionContext.java +++ b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/SockJsSessionContext.java @@ -38,5 +38,10 @@ public interface SockJsSessionContext { /** * Get the underlying ChannelHandlerContext. */ - ChannelHandlerContext getContext(); + ChannelHandlerContext getConnectionContext(); + + /** + * Get the underlying ChannelHandlerContext. + */ + ChannelHandlerContext getCurrentContext(); } diff --git a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/AbstractTimersSessionState.java b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/AbstractTimersSessionState.java index c3f329d..a54c87e 100644 --- a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/AbstractTimersSessionState.java +++ b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/AbstractTimersSessionState.java @@ -61,7 +61,7 @@ public void run() { } if (session.timestamp() + session.config().sessionTimeout() < now) { final SockJsSession removed = sessions.remove(session.sessionId()); - session.context().close(); + session.connectionContext().close(); sessionTimer.cancel(true); heartbeatFuture.cancel(true); if (logger.isDebugEnabled()) { diff --git a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/JsonpPollingSessionState.java b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/JsonpPollingSessionState.java new file mode 100644 index 0000000..0315c39 --- /dev/null +++ b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/JsonpPollingSessionState.java @@ -0,0 +1,37 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.jboss.aerogear.io.netty.handler.codec.sockjs.handler; + +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.HttpRequest; +import org.jboss.aerogear.io.netty.handler.codec.sockjs.SockJsConfig; +import org.jboss.aerogear.io.netty.handler.codec.sockjs.protocol.HeartbeatFrame; + +import java.util.concurrent.ConcurrentMap; + +class JsonpPollingSessionState extends PollingSessionState { + + public JsonpPollingSessionState(ConcurrentMap sessions, HttpRequest request, SockJsConfig config) { + super(sessions, request, config); + } + + @Override + public void sendNoMessagesResponse(final HttpRequest request, final SockJsConfig config, final ChannelHandlerContext ctx) { + ctx.writeAndFlush(new HeartbeatFrame()).addListener(ChannelFutureListener.CLOSE); + } + +} diff --git a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/PollingSessionState.java b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/PollingSessionState.java index 570c626..e586985 100644 --- a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/PollingSessionState.java +++ b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/PollingSessionState.java @@ -18,6 +18,8 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.*; +import org.jboss.aerogear.io.netty.handler.codec.sockjs.SockJsConfig; import org.jboss.aerogear.io.netty.handler.codec.sockjs.protocol.MessageFrame; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; @@ -25,6 +27,7 @@ import java.util.concurrent.ConcurrentMap; + /** * A polling state does not have a persistent connection to the client, instead a client * will connect, poll, to request data. @@ -40,27 +43,50 @@ * in the response. * */ -class PollingSessionState extends AbstractTimersSessionState { +abstract class PollingSessionState extends AbstractTimersSessionState { private static final InternalLogger logger = InternalLoggerFactory.getInstance(PollingSessionState.class); private final ConcurrentMap sessions; + private final HttpRequest request; + private final SockJsConfig config; - public PollingSessionState(final ConcurrentMap sessions) { + public PollingSessionState(final ConcurrentMap sessions, + final HttpRequest request, + final SockJsConfig config) { super(sessions); this.sessions = sessions; + this.request = request; + this.config = config; } + /** + * Gives implementations the ability to decide what a response should look like and + * also how it should be written back to the client. + * + * @param request the polling HttpRequest. + * @param config the SockJsConfig. + * @param ctx {@code ChannelHandlerContext} the context. + */ + public abstract void sendNoMessagesResponse(HttpRequest request, SockJsConfig config, ChannelHandlerContext ctx); + @Override public void onOpen(final SockJsSession session, final ChannelHandlerContext ctx) { flushMessages(ctx, session); } - private static void flushMessages(final ChannelHandlerContext ctx, final SockJsSession session) { + @Override + public ChannelHandlerContext getSendingContext(SockJsSession session) { + return session.connectionContext(); + } + + private void flushMessages(final ChannelHandlerContext ctx, final SockJsSession session) { final String[] allMessages = session.getAllMessages(); if (allMessages.length == 0) { + sendNoMessagesResponse(request, config, ctx); return; } final MessageFrame messageFrame = new MessageFrame(allMessages); - ctx.channel().writeAndFlush(messageFrame).addListener(new ChannelFutureListener() { + ChannelFuture channelFuture = ctx.channel().writeAndFlush(messageFrame); + channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(final ChannelFuture future) throws Exception { if (!future.isSuccess()) { @@ -68,19 +94,20 @@ public void operationComplete(final ChannelFuture future) throws Exception { } } }); + channelFuture.addListener(ChannelFutureListener.CLOSE); } @Override public boolean isInUse(final SockJsSession session) { - return session.context().channel().isActive() || session.inuse(); + return session.connectionContext().channel().isActive() || session.inuse(); } @Override public void onSockJSServerInitiatedClose(final SockJsSession session) { - final ChannelHandlerContext context = session.context(); + final ChannelHandlerContext context = session.connectionContext(); if (context != null) { //could be null if the request is aborted, for example due to missing callback. if (logger.isDebugEnabled()) { - logger.debug("Will close session context {}", session.context()); + logger.debug("Will close session connectionContext {}", session.connectionContext()); } context.close(); } diff --git a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SendingSessionState.java b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SendingSessionState.java index 103fe66..1b4d3cf 100644 --- a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SendingSessionState.java +++ b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SendingSessionState.java @@ -46,12 +46,17 @@ public void onConnect(final SockJsSession session, final ChannelHandlerContext c public void onOpen(final SockJsSession session, final ChannelHandlerContext ctx) { } + @Override + public ChannelHandlerContext getSendingContext(SockJsSession session) { + return session.currentContext(); + } + @Override public void onSockJSServerInitiatedClose(final SockJsSession session) { if (logger.isDebugEnabled()) { - logger.debug("Will close session context {}", session.context()); + logger.debug("Will close session connectionContext {}", session.connectionContext()); } - session.context().close(); + session.connectionContext().close(); sessions.remove(session.sessionId()); } diff --git a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SessionHandler.java b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SessionHandler.java index d5df1df..798e5dd 100644 --- a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SessionHandler.java +++ b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SessionHandler.java @@ -32,7 +32,7 @@ * * For every connection received a new SessionHandler will be created * and added to the pipeline - * Depending on the type of connection (polling, streaming, send, or websocket) + * Depending on the type of connection (polling, streaming, send, or WebSocket) * the type of {@link SessionState} that this session handles will differ. * */ @@ -43,7 +43,6 @@ public enum Events { CLOSE_SESSION, HANDLE_SESSION } private final SessionState sessionState; private final SockJsSession session; - private ChannelHandlerContext currentContext; public SessionHandler(final SessionState sessionState, final SockJsSession session) { ArgumentUtil.checkNotNull(sessionState, "sessionState"); @@ -53,6 +52,7 @@ public SessionHandler(final SessionState sessionState, final SockJsSession sessi @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { + session.setCurrentContext(ctx); if (msg instanceof HttpRequest) { handleSession(ctx); } else if (msg instanceof String) { @@ -63,7 +63,6 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throw } private void handleSession(final ChannelHandlerContext ctx) throws Exception { - currentContext = ctx; if (logger.isDebugEnabled()) { logger.debug("handleSession {}", sessionState); } @@ -71,7 +70,7 @@ private void handleSession(final ChannelHandlerContext ctx) throws Exception { case CONNECTING: logger.debug("State.CONNECTING sending open frame"); ctx.channel().writeAndFlush(new OpenFrame()); - session.setContext(ctx); + session.setConnectionContext(ctx); session.onOpen(this); sessionState.onConnect(session, ctx); break; @@ -103,7 +102,7 @@ private void handleMessage(final String message) throws Exception { @Override public void send(String message) { - final Channel channel = getActiveChannel(); + final Channel channel = sessionState.getSendingContext(session).channel(); if (isWritable(channel)) { channel.writeAndFlush(new MessageFrame(message)); } else { @@ -111,11 +110,6 @@ public void send(String message) { } } - private Channel getActiveChannel() { - final Channel sessionChannel = session.context().channel(); - return sessionChannel.isActive() && sessionChannel.isRegistered() ? sessionChannel : currentContext.channel(); - } - @Override public void channelInactive(final ChannelHandlerContext ctx) throws Exception { session.resetInuse(); @@ -130,7 +124,7 @@ private static boolean isWritable(final Channel channel) { public void close() { session.onClose(); sessionState.onClose(); - final Channel channel = getActiveChannel(); + final Channel channel = sessionState.getSendingContext(session).channel(); if (isWritable(channel)) { final CloseFrame closeFrame = new CloseFrame(3000, "Go away!"); if (logger.isDebugEnabled()) { @@ -150,8 +144,13 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object eve } @Override - public ChannelHandlerContext getContext() { - return currentContext; + public ChannelHandlerContext getConnectionContext() { + return session.connectionContext(); + } + + @Override + public ChannelHandlerContext getCurrentContext() { + return session.currentContext(); } } diff --git a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SessionState.java b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SessionState.java index 60bef58..5df7c30 100644 --- a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SessionState.java +++ b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SessionState.java @@ -45,6 +45,16 @@ interface SessionState { */ void onOpen(SockJsSession session, ChannelHandlerContext ctx); + /** + * Returns the ChannelHandlerContext that should be used to communicate with the client. + * This may be different for different transports. For some transports this will be the + * context that opened the connection and others it will be the current context. + * + * @param session the {@link SockJsSession}. + * @return {@code ChannelHandlerContext} the context to be used for sending. + */ + ChannelHandlerContext getSendingContext(SockJsSession session); + /** * Called after the {@link SockJsSession#onClose()} method has been called enabling * this SessionState to perform any clean up actions requried. diff --git a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SockJsHandler.java b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SockJsHandler.java index 2b20a77..b554c39 100644 --- a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SockJsHandler.java +++ b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SockJsHandler.java @@ -128,11 +128,11 @@ private static void handleSession(final SockJsServiceFactory factory, switch (pathParams.transport()) { case XHR: addTransportHandler(new XhrPollingTransport(factory.config(), request), ctx); - addSessionHandler(new PollingSessionState(sessions), getSession(factory, pathParams.sessionId()), ctx); + addSessionHandler(new XhrPollingSessionState(sessions, request, factory.config()), getSession(factory, pathParams.sessionId()), ctx); break; case JSONP: addTransportHandler(new JsonpPollingTransport(factory.config(), request), ctx); - addSessionHandler(new PollingSessionState(sessions), getSession(factory, pathParams.sessionId()), ctx); + addSessionHandler(new JsonpPollingSessionState(sessions, request, factory.config()), getSession(factory, pathParams.sessionId()), ctx); break; case XHR_SEND: checkSessionExists(pathParams.sessionId(), request); diff --git a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SockJsSession.java b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SockJsSession.java index e75252d..e74ffff 100644 --- a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SockJsSession.java +++ b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/SockJsSession.java @@ -36,21 +36,28 @@ enum States { CONNECTING, OPEN, CLOSED, INTERRUPTED } private final LinkedList messages = new LinkedList(); private final AtomicLong timestamp = new AtomicLong(); private final AtomicBoolean inuse = new AtomicBoolean(); - private ChannelHandlerContext ctx; + private ChannelHandlerContext connectionContext; + private ChannelHandlerContext currentContext; public SockJsSession(final String sessionId, final SockJsService service) { this.sessionId = sessionId; this.service = service; } - public synchronized ChannelHandlerContext context() { - return ctx; + public synchronized ChannelHandlerContext connectionContext() { + return connectionContext; } - public synchronized void setContext(final ChannelHandlerContext ctx) { - if (this.ctx == null) { - this.ctx = ctx; - } + public synchronized void setConnectionContext(final ChannelHandlerContext ctx) { + connectionContext = ctx; + } + + public synchronized ChannelHandlerContext currentContext() { + return currentContext; + } + + public synchronized void setCurrentContext(final ChannelHandlerContext ctx) { + currentContext = ctx; } public synchronized void setState(States state) { diff --git a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/StreamingSessionState.java b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/StreamingSessionState.java index 6f80b3c..cbc543d 100644 --- a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/StreamingSessionState.java +++ b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/StreamingSessionState.java @@ -48,8 +48,13 @@ public void onOpen(final SockJsSession session, final ChannelHandlerContext ctx) flushMessages(ctx, session); } + @Override + public ChannelHandlerContext getSendingContext(SockJsSession session) { + return session.connectionContext(); + } + private static void flushMessages(final ChannelHandlerContext ignored, final SockJsSession session) { - final Channel channel = session.context().channel(); + final Channel channel = session.connectionContext().channel(); if (channel.isActive() && channel.isRegistered()) { final String[] allMessages = session.getAllMessages(); if (allMessages.length == 0) { @@ -71,16 +76,16 @@ public void operationComplete(final ChannelFuture future) throws Exception { @Override public void onSockJSServerInitiatedClose(final SockJsSession session) { - final ChannelHandlerContext context = session.context(); + final ChannelHandlerContext context = session.connectionContext(); if (context != null) { //could be null if the request is aborted, for example due to missing callback. - logger.debug("Will close session context " + session.context()); + logger.debug("Will close session connectionContext " + session.connectionContext()); context.close(); } } @Override public boolean isInUse(final SockJsSession session) { - return session.context().channel().isActive(); + return session.connectionContext().channel().isActive(); } @Override diff --git a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/WebSocketSessionState.java b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/WebSocketSessionState.java index 75e99c5..f06471f 100644 --- a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/WebSocketSessionState.java +++ b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/WebSocketSessionState.java @@ -64,9 +64,14 @@ public void run() { public void onOpen(final SockJsSession session, final ChannelHandlerContext ctx) { } + @Override + public ChannelHandlerContext getSendingContext(SockJsSession session) { + return session.connectionContext(); + } + @Override public boolean isInUse(final SockJsSession session) { - return session.context().channel().isActive(); + return session.connectionContext().channel().isActive(); } @Override diff --git a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/XhrPollingSessionState.java b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/XhrPollingSessionState.java new file mode 100644 index 0000000..d2439ba --- /dev/null +++ b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/XhrPollingSessionState.java @@ -0,0 +1,44 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.jboss.aerogear.io.netty.handler.codec.sockjs.handler; + +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.HttpRequest; +import org.jboss.aerogear.io.netty.handler.codec.sockjs.SockJsConfig; + +import java.util.concurrent.ConcurrentMap; + +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.jboss.aerogear.io.netty.handler.codec.sockjs.transport.Transports.setDefaultHeaders; + +class XhrPollingSessionState extends PollingSessionState { + + public XhrPollingSessionState(ConcurrentMap sessions, HttpRequest request, SockJsConfig config) { + super(sessions, request, config); + } + + @Override + public void sendNoMessagesResponse(final HttpRequest request, final SockJsConfig config, final ChannelHandlerContext ctx) { + final DefaultFullHttpResponse response = new DefaultFullHttpResponse(request.getProtocolVersion(), OK); + setDefaultHeaders(response, config); + if (ctx.channel().isActive() && ctx.channel().isRegistered()) { + ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + } + } + +} diff --git a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/transport/JsonpPollingTransport.java b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/transport/JsonpPollingTransport.java index 3321e5f..afde972 100644 --- a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/transport/JsonpPollingTransport.java +++ b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/transport/JsonpPollingTransport.java @@ -15,12 +15,6 @@ */ package org.jboss.aerogear.io.netty.handler.codec.sockjs.transport; -import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; -import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; -import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; -import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; -import static io.netty.handler.codec.http.HttpResponseStatus.OK; -import static io.netty.util.CharsetUtil.UTF_8; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; @@ -34,13 +28,17 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.QueryStringDecoder; +import io.netty.util.ReferenceCountUtil; import org.jboss.aerogear.io.netty.handler.codec.sockjs.SockJsConfig; import org.jboss.aerogear.io.netty.handler.codec.sockjs.handler.SessionHandler.Events; import org.jboss.aerogear.io.netty.handler.codec.sockjs.protocol.Frame; -import io.netty.util.ReferenceCountUtil; import java.util.List; +import static io.netty.handler.codec.http.HttpHeaders.Names.*; +import static io.netty.handler.codec.http.HttpResponseStatus.*; +import static io.netty.util.CharsetUtil.*; + /** * JSON Padding (JSONP) Polling is a transport where there is no open connection between * the client and the server. Instead the client will issue a new request for polling from diff --git a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/transport/JsonpSendTransport.java b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/transport/JsonpSendTransport.java index b910016..9aee360 100644 --- a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/transport/JsonpSendTransport.java +++ b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/transport/JsonpSendTransport.java @@ -16,10 +16,18 @@ package org.jboss.aerogear.io.netty.handler.codec.sockjs.transport; import static io.netty.handler.codec.http.HttpResponseStatus.OK; + +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; import org.jboss.aerogear.io.netty.handler.codec.sockjs.SockJsConfig; import io.netty.util.internal.StringUtil; +import static org.jboss.aerogear.io.netty.handler.codec.sockjs.transport.Transports.CONTENT_TYPE_PLAIN; +import static org.jboss.aerogear.io.netty.handler.codec.sockjs.transport.Transports.responseWithContent; +import static org.jboss.aerogear.io.netty.handler.codec.sockjs.transport.Transports.setDefaultHeaders; /** * JSON Padding (JSONP) Polling is a transport where there is no open connection between @@ -32,13 +40,20 @@ */ public class JsonpSendTransport extends AbstractSendTransport { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(JsonpSendTransport.class); + public JsonpSendTransport(final SockJsConfig config) { super(config); } @Override public void respond(final ChannelHandlerContext ctx, final FullHttpRequest request) throws Exception { - respond(ctx, request.getProtocolVersion(), OK, "ok"); + final FullHttpResponse response = responseWithContent(request.getProtocolVersion(), OK, CONTENT_TYPE_PLAIN, "ok"); + logger.info("Responding=" + response.getStatus() + ", request.uri=" + request.getUri()); + setDefaultHeaders(response, config); + if (ctx.channel().isActive() && ctx.channel().isRegistered()) { + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + } } @Override diff --git a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/transport/RawWebSocketTransport.java b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/transport/RawWebSocketTransport.java index e9566fb..8abe1d1 100644 --- a/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/transport/RawWebSocketTransport.java +++ b/netty-codec-sockjs/src/main/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/transport/RawWebSocketTransport.java @@ -129,7 +129,11 @@ public void close() { ctx.close(); } @Override - public ChannelHandlerContext getContext() { + public ChannelHandlerContext getConnectionContext() { + return ctx; + } + @Override + public ChannelHandlerContext getCurrentContext() { return ctx; } }); diff --git a/netty-codec-sockjs/src/test/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/XhrPollingSessionStateTest.java b/netty-codec-sockjs/src/test/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/XhrPollingSessionStateTest.java new file mode 100644 index 0000000..b549ccf --- /dev/null +++ b/netty-codec-sockjs/src/test/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/handler/XhrPollingSessionStateTest.java @@ -0,0 +1,59 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.jboss.aerogear.io.netty.handler.codec.sockjs.handler; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.http.*; +import org.jboss.aerogear.io.netty.handler.codec.sockjs.SockJsConfig; +import org.jboss.aerogear.io.netty.handler.codec.sockjs.SockJsService; +import org.junit.Test; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static io.netty.handler.codec.http.HttpResponseStatus.*; +import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.MatcherAssert.*; +import static org.mockito.Mockito.*; + +public class XhrPollingSessionStateTest { + + @Test + public void flushMessagedRepsondNoContent() { + final EmbeddedChannel channel = new EmbeddedChannel(); + final PollingSessionState pollingSessionState = createPollingSessionState(); + final ChannelHandlerContext ctx = contextForChannel(channel); + final SockJsSession sockJsSession = new SockJsSession("1234", mock(SockJsService.class)); + pollingSessionState.onOpen(sockJsSession, ctx); + final HttpResponse response = channel.readOutbound(); + assertThat(response.getStatus(), is(OK)); + } + + private PollingSessionState createPollingSessionState() { + final ConcurrentMap sessions = new ConcurrentHashMap(); + final HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/123/456/xhr"); + final SockJsConfig config = SockJsConfig.withPrefix("serviceName").build(); + final PollingSessionState pollingSessionState = new XhrPollingSessionState(sessions, request, config); + return pollingSessionState; + } + + private ChannelHandlerContext contextForChannel(final EmbeddedChannel ch) { + final ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + when(ctx.channel()).thenReturn(ch); + return ctx; + } + +} diff --git a/netty-codec-sockjs/src/test/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/protocol/SockJsProtocolTest.java b/netty-codec-sockjs/src/test/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/protocol/SockJsProtocolTest.java index d8c0bef..2974190 100644 --- a/netty-codec-sockjs/src/test/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/protocol/SockJsProtocolTest.java +++ b/netty-codec-sockjs/src/test/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/protocol/SockJsProtocolTest.java @@ -1070,6 +1070,25 @@ public void jsonpPollingTestTransport() throws Exception { verifyNotCached(pollResponse); } + @Test + public void jsonpPollingNoData() throws Exception { + final String serviceName = "/echo"; + final String sessionUrl = serviceName + "/222/" + UUID.randomUUID().toString(); + final SockJsServiceFactory echoService = echoService(); + + final FullHttpResponse openResponse = jsonpRequest(sessionUrl + "/jsonp?c=%63allback", echoService); + assertThat(openResponse.getStatus(), is(HttpResponseStatus.OK)); + assertThat(openResponse.content().toString(UTF_8), equalTo("callback(\"o\");\r\n")); + assertThat(openResponse.headers().get(CONTENT_TYPE), equalTo(Transports.CONTENT_TYPE_JAVASCRIPT)); + verifyNotCached(openResponse); + + final FullHttpResponse pollResponse = jsonpRequest(sessionUrl + "/jsonp?c=callback", echoService); + assertThat(pollResponse.getStatus(), is(HttpResponseStatus.OK)); + assertThat(pollResponse.headers().get(CONTENT_TYPE), equalTo(Transports.CONTENT_TYPE_JAVASCRIPT)); + assertThat(pollResponse.content().toString(UTF_8), equalTo("callback(\"h\");\r\n")); + verifyNotCached(pollResponse); + } + /* * Equivalent to JsonPolling.test_no_callback in sockjs-protocol-0.3.3.py. */ diff --git a/netty-codec-sockjs/src/test/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/transport/WebSocketTransportTest.java b/netty-codec-sockjs/src/test/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/transport/WebSocketTransportTest.java index 13328ec..268f1f4 100644 --- a/netty-codec-sockjs/src/test/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/transport/WebSocketTransportTest.java +++ b/netty-codec-sockjs/src/test/java/org/jboss/aerogear/io/netty/handler/codec/sockjs/transport/WebSocketTransportTest.java @@ -27,7 +27,6 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; import static org.jboss.aerogear.io.netty.handler.codec.sockjs.util.ChannelUtil.webSocketChannel; import static org.jboss.aerogear.io.netty.handler.codec.sockjs.util.HttpUtil.decode; -import static org.jboss.aerogear.io.netty.handler.codec.sockjs.util.HttpUtil.decodeFullResponse; import static org.jboss.aerogear.io.netty.handler.codec.sockjs.util.HttpUtil.decodeFullHttpResponse; import static org.jboss.aerogear.io.netty.handler.codec.sockjs.util.HttpUtil.webSocketUpgradeRequest; import static org.hamcrest.CoreMatchers.nullValue; diff --git a/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/NotificationHandler.java b/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/NotificationHandler.java index 5d1e155..be57cdf 100644 --- a/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/NotificationHandler.java +++ b/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/NotificationHandler.java @@ -71,7 +71,7 @@ public void messageReceived(final ChannelHandlerContext ctx, final Object msg) t if (msg instanceof FullHttpRequest) { final FullHttpRequest request = (FullHttpRequest) msg; final String requestUri = request.getUri(); - logger.info(requestUri); + logger.debug(requestUri); if (requestUri.startsWith(simplePushServer.config().endpointPrefix())) { handleHttpRequest(ctx, request); } else { diff --git a/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/SimplePushServiceFactory.java b/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/SimplePushServiceFactory.java index c328a97..5c373d0 100644 --- a/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/SimplePushServiceFactory.java +++ b/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/SimplePushServiceFactory.java @@ -19,11 +19,7 @@ import org.jboss.aerogear.io.netty.handler.codec.sockjs.AbstractSockJsServiceFactory; import org.jboss.aerogear.io.netty.handler.codec.sockjs.SockJsConfig; import org.jboss.aerogear.io.netty.handler.codec.sockjs.SockJsService; - -import org.jboss.aerogear.simplepush.server.DefaultSimplePushServer; import org.jboss.aerogear.simplepush.server.SimplePushServer; -import org.jboss.aerogear.simplepush.server.SimplePushServerConfig; -import org.jboss.aerogear.simplepush.server.datastore.DataStore; /** * Factory class that creates instances of {@link SimplePushSockJSService}. diff --git a/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/SimplePushSockJSService.java b/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/SimplePushSockJSService.java index 462985d..a91689a 100644 --- a/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/SimplePushSockJSService.java +++ b/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/SimplePushSockJSService.java @@ -135,7 +135,7 @@ private void processUnacked(final String uaid, final SockJsSessionContext sessio logger.info("Cancelled Re-Acknowledger job"); } } else if (ackJobFuture == null) { - ackJobFuture = session.getContext().executor().scheduleAtFixedRate(new Runnable() { + ackJobFuture = session.getConnectionContext().executor().scheduleAtFixedRate(new Runnable() { @Override public void run() { final Set unacked = simplePushServer.getUnacknowledged(uaid); diff --git a/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/UserAgentReaper.java b/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/UserAgentReaper.java index ed27757..4aeef27 100644 --- a/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/UserAgentReaper.java +++ b/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/UserAgentReaper.java @@ -61,14 +61,14 @@ public void run() { // remove from database simplePushServer.removeAllChannels(userAgent.uaid()); - // close the user agent context + // close the user agent connectContext userAgent.context().close(); } } } private boolean isChannelInactive(final UserAgent userAgent) { - final Channel ch = userAgent.context().getContext().channel(); + final Channel ch = userAgent.context().getConnectionContext().channel(); return !ch.isActive() && !ch.isRegistered(); } } diff --git a/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/WebSocketSslServerSslContext.java b/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/WebSocketSslServerSslContext.java index 7df0f81..8a026a7 100644 --- a/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/WebSocketSslServerSslContext.java +++ b/server-netty/src/main/java/org/jboss/aerogear/simplepush/server/netty/WebSocketSslServerSslContext.java @@ -43,7 +43,7 @@ public final class WebSocketSslServerSslContext { /** * Creates a new {@link SSLContext}. This is an expensive operation and should only be done - * once and then the SSL context can be reused. + * once and then the SSL connectContext can be reused. * * @return {@link SSLContext} the SSLContext. */ diff --git a/server-netty/src/test/java/org/jboss/aerogear/simplepush/server/netty/NotificationHandlerTest.java b/server-netty/src/test/java/org/jboss/aerogear/simplepush/server/netty/NotificationHandlerTest.java index 0dcbb45..12f593d 100644 --- a/server-netty/src/test/java/org/jboss/aerogear/simplepush/server/netty/NotificationHandlerTest.java +++ b/server-netty/src/test/java/org/jboss/aerogear/simplepush/server/netty/NotificationHandlerTest.java @@ -194,7 +194,12 @@ public void close() { } @Override - public ChannelHandlerContext getContext() { + public ChannelHandlerContext getConnectionContext() { + return null; + } + + @Override + public ChannelHandlerContext getCurrentContext() { return null; } }; diff --git a/server-netty/src/test/java/org/jboss/aerogear/simplepush/server/netty/UserAgentReaperTest.java b/server-netty/src/test/java/org/jboss/aerogear/simplepush/server/netty/UserAgentReaperTest.java index 7ead062..4770cee 100644 --- a/server-netty/src/test/java/org/jboss/aerogear/simplepush/server/netty/UserAgentReaperTest.java +++ b/server-netty/src/test/java/org/jboss/aerogear/simplepush/server/netty/UserAgentReaperTest.java @@ -83,7 +83,7 @@ private SockJsSessionContext newSessionContext(final boolean active) { final ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); when(ctx.channel()).thenReturn(channel); final SockJsSessionContext sessionContext = mock(SockJsSessionContext.class); - when(sessionContext.getContext()).thenReturn(ctx); + when(sessionContext.getConnectionContext()).thenReturn(ctx); return sessionContext; } diff --git a/server-netty/src/test/resources/sockjs-client.html b/server-netty/src/test/resources/sockjs-client.html index 4f1f9ca..0d226b9 100644 --- a/server-netty/src/test/resources/sockjs-client.html +++ b/server-netty/src/test/resources/sockjs-client.html @@ -65,7 +65,7 @@

SockJS SimplePush example

div.scrollTop(div.scrollTop()+10000); }; - sockjs.onopen = function() { + sockjs.onopen = function() { print('[*] open', sockjs.protocol); }; @@ -73,7 +73,7 @@

SockJS SimplePush example

print('[.] message', e.data); }; - sockjs.onclose = function() { + sockjs.onclose = function() { print('[*] close'); }; diff --git a/wildfly-module/src/main/resources/wildfly-config.cli b/wildfly-module/src/main/resources/wildfly-config.cli index 2459ac3..4decea3 100644 --- a/wildfly-module/src/main/resources/wildfly-config.cli +++ b/wildfly-module/src/main/resources/wildfly-config.cli @@ -7,7 +7,7 @@ batch ## Add SimplePush Netty Server /socket-binding-group=standard-sockets/socket-binding=simplepush:add(port=7777) -/:composite(steps=[{"operation" => "add", "address" => [("subsystem" => "simplepush")]}, {"operation" => "add", "address" => [("subsystem" => "simplepush"), ("server" => "simplepush")], "socket-binding" => "simplepush", "datasource-jndi-name" => "java:jboss/datasources/SimplePushDS", "password" => "changeMe!", "notification-tls" => false, "sockjs-tls" => false}] +/:composite(steps=[{"operation" => "add", "address" => [("subsystem" => "simplepush")]}, {"operation" => "add", "address" => [("subsystem" => "simplepush"), ("server" => "simplepush")], "socket-binding" => "simplepush", "datasource-jndi-name" => "java:jboss/datasources/SimplePushDS", "password" => "changeMe!", "notification-tls" => false, "sockjs-tls" => false, "sockjs-session-timeout" => 604800000L}] /:composite(steps=[{"operation" => "add", "address" => [("subsystem" => "simplepush"), ("server" => "simplepush"), ("datastore" => "in-memory")]}] run-batch