Skip to content
This repository has been archived by the owner on Nov 18, 2018. It is now read-only.

Commit

Permalink
Fixing SockJS session handling for xhr/jsonp polling.
Browse files Browse the repository at this point in the history
The is a re-opening of AGSMPLPUSH-42 which intended to fix this issue
but the fix was just plain wrong.

The original issue was that the polling transports did to reply with any
data when messages had been sent to them. The fix then was to add an
explicit empty reply to trigger SockJS to repoll and it was then able
to retrieve data.
But the polling transports are "long polling" in that they will remain
until either data is made available by the server side or the session
times out.

This commit fixes the session handling by introducing a field in the
SockJSSession class which is the ChannelHandlerContext used for an
open connection. This 'openContext' is then available to be used
by SessionState implementations, for example the PollingSessionState.
PollingSessionState sometimes needs to use the ChannelHandlerContext
used to connect which is needed to send the SockJS OpenFrame, but
successive calls needs to use the ChannelHandlerContext for the
open session.

Additional changes:
- SockJS test server can now be started with mvn exec:java
- sockjs-client.html contains examples of specifying transports types

[https://issues.jboss.org/browse/AGSMPLPUSH-42]
  • Loading branch information
danbev committed Mar 12, 2014
1 parent 32a2136 commit 6b9bf8c
Show file tree
Hide file tree
Showing 14 changed files with 76 additions and 192 deletions.
19 changes: 18 additions & 1 deletion netty-codec-sockjs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,24 @@
<scope>test</scope>
</dependency>

</dependencies>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<configuration>
<mainClass>org.jboss.aerogear.io.netty.handler.codec.sockjs.NettySockJsServer</mainClass>
<classpathScope>test</classpathScope>
<arguments>
<!-- Port -->
<argument>8081</argument>
</arguments>
</configuration>
</plugin>
</plugins>
</build>

</project>

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import org.jboss.aerogear.io.netty.handler.codec.sockjs.SockJsConfig;
import org.jboss.aerogear.io.netty.handler.codec.sockjs.protocol.MessageFrame;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import org.jboss.aerogear.io.netty.handler.codec.sockjs.protocol.MessageFrame;

import java.util.concurrent.ConcurrentMap;

Expand All @@ -43,45 +41,29 @@
* in the response.
*
*/
abstract class PollingSessionState extends AbstractTimersSessionState {
class PollingSessionState extends AbstractTimersSessionState {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PollingSessionState.class);
private final ConcurrentMap<String, SockJsSession> sessions;
private final HttpRequest request;
private final SockJsConfig config;

public PollingSessionState(final ConcurrentMap<String, SockJsSession> sessions,
final HttpRequest request,
final SockJsConfig config) {
PollingSessionState(final ConcurrentMap<String, SockJsSession> sessions) {
super(sessions);
this.sessions = sessions;
this.request = request;
this.config = config;
}

/**
* Gives implementations the ability to decide what a response should look like and
* also how it should be written back to the client.
*
* @param request the polling HttpRequest.
* @param config the SockJsConfig.
* @param ctx {@code ChannelHandlerContext} the context.
*/
public abstract void sendNoMessagesResponse(HttpRequest request, SockJsConfig config, ChannelHandlerContext ctx);

@Override
public void onOpen(final SockJsSession session, final ChannelHandlerContext ctx) {
flushMessages(ctx, session);
}

@Override
public ChannelHandlerContext getSendingContext(SockJsSession session) {
return session.connectionContext();
final ChannelHandlerContext openContext = session.openContext();
return openContext == null ? session.connectionContext() : openContext;
}

private void flushMessages(final ChannelHandlerContext ctx, final SockJsSession session) {
final String[] allMessages = session.getAllMessages();
if (allMessages.length == 0) {
sendNoMessagesResponse(request, config, ctx);
return;
}
final MessageFrame messageFrame = new MessageFrame(allMessages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class SendingSessionState implements SessionState {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(SendingSessionState.class);
private final ConcurrentMap<String, SockJsSession> sessions;

public SendingSessionState(final ConcurrentMap<String, SockJsSession> sessions) {
SendingSessionState(final ConcurrentMap<String, SockJsSession> sessions) {
ArgumentUtil.checkNotNull(sessions, "sessions");
this.sessions = sessions;
}
Expand All @@ -48,7 +48,7 @@ public void onOpen(final SockJsSession session, final ChannelHandlerContext ctx)

@Override
public ChannelHandlerContext getSendingContext(SockJsSession session) {
return session.currentContext();
return session.openContext();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ private void handleSession(final ChannelHandlerContext ctx) {
session.setState(States.INTERRUPTED);
} else {
session.setInuse();
session.setOpenContext(ctx);
sessionState.onOpen(session, ctx);
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ private static void handleSession(final SockJsServiceFactory factory,
switch (pathParams.transport()) {
case XHR:
addTransportHandler(new XhrPollingTransport(factory.config(), request), ctx);
addSessionHandler(new XhrPollingSessionState(sessions, request, factory.config()), getSession(factory, pathParams.sessionId()), ctx);
addSessionHandler(new PollingSessionState(sessions), getSession(factory, pathParams.sessionId()), ctx);
break;
case JSONP:
addTransportHandler(new JsonpPollingTransport(factory.config(), request), ctx);
addSessionHandler(new JsonpPollingSessionState(sessions, request, factory.config()), getSession(factory, pathParams.sessionId()), ctx);
addSessionHandler(new PollingSessionState(sessions), getSession(factory, pathParams.sessionId()), ctx);
break;
case XHR_SEND:
checkSessionExists(pathParams.sessionId(), request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

class SockJsSession {


enum States { CONNECTING, OPEN, CLOSED, INTERRUPTED }

private States state = States.CONNECTING;
Expand All @@ -38,24 +39,63 @@ enum States { CONNECTING, OPEN, CLOSED, INTERRUPTED }
private final AtomicBoolean inuse = new AtomicBoolean();
private ChannelHandlerContext connectionContext;
private ChannelHandlerContext currentContext;
private ChannelHandlerContext openContext;

public SockJsSession(final String sessionId, final SockJsService service) {
this.sessionId = sessionId;
this.service = service;
}

/**
* Returns the ChannelHandlerContext used to initially connect.
*
* @return {@code ChannelHandlerContext} the ChannelHandlerContext used establishing a connection.
*/
public synchronized ChannelHandlerContext connectionContext() {
return connectionContext;
}

/**
* Sets the ChannelHandlerContext used to initially connect.
*
* @param ctx the ChannelHandlerContext used establishing a connection.
*/
public synchronized void setConnectionContext(final ChannelHandlerContext ctx) {
connectionContext = ctx;
}

/**
* Returns the ChannelHandlerContext used on an open session.
*
* @return {@code ChannelHandlerContext} the ChannelHandlerContext used establishing a connection.
*/
public synchronized ChannelHandlerContext openContext() {
return openContext;
}

/**
* Sets the ChannelHandlerContext used to initially connect.
*
* @param ctx the ChannelHandlerContext used when the session is open.
*/
public synchronized void setOpenContext(final ChannelHandlerContext ctx) {
openContext = ctx;
}

/**
* Returns the ChannelHandlerContext for the current connection.
*
* @return {@code ChannelHandlerContext} the ChannelHandlerContext for the current connection
*/
public synchronized ChannelHandlerContext currentContext() {
return currentContext;
}

/**
* Sets the ChannelHandlerContext for the current connection.
*
* @param ctx the ChannelHandlerContext for the current connection.
*/
public synchronized void setCurrentContext(final ChannelHandlerContext ctx) {
currentContext = ctx;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
class StreamingSessionState extends AbstractTimersSessionState {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(StreamingSessionState.class);

public StreamingSessionState(final ConcurrentMap<String, SockJsSession> sessions) {
StreamingSessionState(final ConcurrentMap<String, SockJsSession> sessions) {
super(sessions);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public SockJsService create() {
}

public static void main(final String[] args) throws Exception {
final int port = args.length > 0 ? Integer.parseInt(args[0]) : 8090;
final int port = args.length > 0 ? Integer.parseInt(args[0]) : 8081;
new NettySockJsServer(port).run();
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1053,25 +1053,6 @@ public void jsonpPollingTestTransport() throws Exception {
verifyNotCached(pollResponse);
}

@Test
public void jsonpPollingNoData() throws Exception {
final String serviceName = "/echo";
final String sessionUrl = serviceName + "/222/" + UUID.randomUUID().toString();
final SockJsServiceFactory echoService = echoService();

final FullHttpResponse openResponse = jsonpRequest(sessionUrl + "/jsonp?c=%63allback", echoService);
assertThat(openResponse.getStatus(), is(HttpResponseStatus.OK));
assertThat(openResponse.content().toString(UTF_8), equalTo("callback(\"o\");\r\n"));
assertThat(openResponse.headers().get(CONTENT_TYPE), equalTo(Transports.CONTENT_TYPE_JAVASCRIPT));
verifyNotCached(openResponse);

final FullHttpResponse pollResponse = jsonpRequest(sessionUrl + "/jsonp?c=callback", echoService);
assertThat(pollResponse.getStatus(), is(HttpResponseStatus.OK));
assertThat(pollResponse.headers().get(CONTENT_TYPE), equalTo(Transports.CONTENT_TYPE_JAVASCRIPT));
assertThat(pollResponse.content().toString(UTF_8), equalTo("callback(\"h\");\r\n"));
verifyNotCached(pollResponse);
}

/*
* Equivalent to JsonPolling.test_no_callback in sockjs-protocol-0.3.3.py.
*/
Expand Down

0 comments on commit 6b9bf8c

Please sign in to comment.