Skip to content
This repository has been archived by the owner on Nov 18, 2018. It is now read-only.

Fixing SockJS session handling for xhr/jsonp polling. #72

Merged
merged 1 commit into from
Mar 12, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 18 additions & 1 deletion netty-codec-sockjs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,24 @@
<scope>test</scope>
</dependency>

</dependencies>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<configuration>
<mainClass>org.jboss.aerogear.io.netty.handler.codec.sockjs.NettySockJsServer</mainClass>
<classpathScope>test</classpathScope>
<arguments>
<!-- Port -->
<argument>8081</argument>
</arguments>
</configuration>
</plugin>
</plugins>
</build>

</project>

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import org.jboss.aerogear.io.netty.handler.codec.sockjs.SockJsConfig;
import org.jboss.aerogear.io.netty.handler.codec.sockjs.protocol.MessageFrame;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import org.jboss.aerogear.io.netty.handler.codec.sockjs.protocol.MessageFrame;

import java.util.concurrent.ConcurrentMap;

Expand All @@ -43,45 +41,29 @@
* in the response.
*
*/
abstract class PollingSessionState extends AbstractTimersSessionState {
class PollingSessionState extends AbstractTimersSessionState {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PollingSessionState.class);
private final ConcurrentMap<String, SockJsSession> sessions;
private final HttpRequest request;
private final SockJsConfig config;

public PollingSessionState(final ConcurrentMap<String, SockJsSession> sessions,
final HttpRequest request,
final SockJsConfig config) {
PollingSessionState(final ConcurrentMap<String, SockJsSession> sessions) {
super(sessions);
this.sessions = sessions;
this.request = request;
this.config = config;
}

/**
* Gives implementations the ability to decide what a response should look like and
* also how it should be written back to the client.
*
* @param request the polling HttpRequest.
* @param config the SockJsConfig.
* @param ctx {@code ChannelHandlerContext} the context.
*/
public abstract void sendNoMessagesResponse(HttpRequest request, SockJsConfig config, ChannelHandlerContext ctx);

@Override
public void onOpen(final SockJsSession session, final ChannelHandlerContext ctx) {
flushMessages(ctx, session);
}

@Override
public ChannelHandlerContext getSendingContext(SockJsSession session) {
return session.connectionContext();
final ChannelHandlerContext openContext = session.openContext();
return openContext == null ? session.connectionContext() : openContext;
}

private void flushMessages(final ChannelHandlerContext ctx, final SockJsSession session) {
final String[] allMessages = session.getAllMessages();
if (allMessages.length == 0) {
sendNoMessagesResponse(request, config, ctx);
return;
}
final MessageFrame messageFrame = new MessageFrame(allMessages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class SendingSessionState implements SessionState {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(SendingSessionState.class);
private final ConcurrentMap<String, SockJsSession> sessions;

public SendingSessionState(final ConcurrentMap<String, SockJsSession> sessions) {
SendingSessionState(final ConcurrentMap<String, SockJsSession> sessions) {
ArgumentUtil.checkNotNull(sessions, "sessions");
this.sessions = sessions;
}
Expand All @@ -48,7 +48,7 @@ public void onOpen(final SockJsSession session, final ChannelHandlerContext ctx)

@Override
public ChannelHandlerContext getSendingContext(SockJsSession session) {
return session.currentContext();
return session.openContext();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ private void handleSession(final ChannelHandlerContext ctx) {
session.setState(States.INTERRUPTED);
} else {
session.setInuse();
session.setOpenContext(ctx);
sessionState.onOpen(session, ctx);
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ private static void handleSession(final SockJsServiceFactory factory,
switch (pathParams.transport()) {
case XHR:
addTransportHandler(new XhrPollingTransport(factory.config(), request), ctx);
addSessionHandler(new XhrPollingSessionState(sessions, request, factory.config()), getSession(factory, pathParams.sessionId()), ctx);
addSessionHandler(new PollingSessionState(sessions), getSession(factory, pathParams.sessionId()), ctx);
break;
case JSONP:
addTransportHandler(new JsonpPollingTransport(factory.config(), request), ctx);
addSessionHandler(new JsonpPollingSessionState(sessions, request, factory.config()), getSession(factory, pathParams.sessionId()), ctx);
addSessionHandler(new PollingSessionState(sessions), getSession(factory, pathParams.sessionId()), ctx);
break;
case XHR_SEND:
checkSessionExists(pathParams.sessionId(), request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

class SockJsSession {


enum States { CONNECTING, OPEN, CLOSED, INTERRUPTED }

private States state = States.CONNECTING;
Expand All @@ -38,24 +39,63 @@ enum States { CONNECTING, OPEN, CLOSED, INTERRUPTED }
private final AtomicBoolean inuse = new AtomicBoolean();
private ChannelHandlerContext connectionContext;
private ChannelHandlerContext currentContext;
private ChannelHandlerContext openContext;

public SockJsSession(final String sessionId, final SockJsService service) {
this.sessionId = sessionId;
this.service = service;
}

/**
* Returns the ChannelHandlerContext used to initially connect.
*
* @return {@code ChannelHandlerContext} the ChannelHandlerContext used establishing a connection.
*/
public synchronized ChannelHandlerContext connectionContext() {
return connectionContext;
}

/**
* Sets the ChannelHandlerContext used to initially connect.
*
* @param ctx the ChannelHandlerContext used establishing a connection.
*/
public synchronized void setConnectionContext(final ChannelHandlerContext ctx) {
connectionContext = ctx;
}

/**
* Returns the ChannelHandlerContext used on an open session.
*
* @return {@code ChannelHandlerContext} the ChannelHandlerContext used establishing a connection.
*/
public synchronized ChannelHandlerContext openContext() {
return openContext;
}

/**
* Sets the ChannelHandlerContext used to initially connect.
*
* @param ctx the ChannelHandlerContext used when the session is open.
*/
public synchronized void setOpenContext(final ChannelHandlerContext ctx) {
openContext = ctx;
}

/**
* Returns the ChannelHandlerContext for the current connection.
*
* @return {@code ChannelHandlerContext} the ChannelHandlerContext for the current connection
*/
public synchronized ChannelHandlerContext currentContext() {
return currentContext;
}

/**
* Sets the ChannelHandlerContext for the current connection.
*
* @param ctx the ChannelHandlerContext for the current connection.
*/
public synchronized void setCurrentContext(final ChannelHandlerContext ctx) {
currentContext = ctx;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
class StreamingSessionState extends AbstractTimersSessionState {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(StreamingSessionState.class);

public StreamingSessionState(final ConcurrentMap<String, SockJsSession> sessions) {
StreamingSessionState(final ConcurrentMap<String, SockJsSession> sessions) {
super(sessions);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public SockJsService create() {
}

public static void main(final String[] args) throws Exception {
final int port = args.length > 0 ? Integer.parseInt(args[0]) : 8090;
final int port = args.length > 0 ? Integer.parseInt(args[0]) : 8081;
new NettySockJsServer(port).run();
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1053,25 +1053,6 @@ public void jsonpPollingTestTransport() throws Exception {
verifyNotCached(pollResponse);
}

@Test
public void jsonpPollingNoData() throws Exception {
final String serviceName = "/echo";
final String sessionUrl = serviceName + "/222/" + UUID.randomUUID().toString();
final SockJsServiceFactory echoService = echoService();

final FullHttpResponse openResponse = jsonpRequest(sessionUrl + "/jsonp?c=%63allback", echoService);
assertThat(openResponse.getStatus(), is(HttpResponseStatus.OK));
assertThat(openResponse.content().toString(UTF_8), equalTo("callback(\"o\");\r\n"));
assertThat(openResponse.headers().get(CONTENT_TYPE), equalTo(Transports.CONTENT_TYPE_JAVASCRIPT));
verifyNotCached(openResponse);

final FullHttpResponse pollResponse = jsonpRequest(sessionUrl + "/jsonp?c=callback", echoService);
assertThat(pollResponse.getStatus(), is(HttpResponseStatus.OK));
assertThat(pollResponse.headers().get(CONTENT_TYPE), equalTo(Transports.CONTENT_TYPE_JAVASCRIPT));
assertThat(pollResponse.content().toString(UTF_8), equalTo("callback(\"h\");\r\n"));
verifyNotCached(pollResponse);
}

/*
* Equivalent to JsonPolling.test_no_callback in sockjs-protocol-0.3.3.py.
*/
Expand Down