Skip to content
This repository has been archived by the owner on Nov 18, 2018. It is now read-only.

AGSMPLPUSH-42 "xhr_streaming/xhr_polling protocols are not working" #67

Merged
merged 1 commit into from
Mar 7, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,10 @@ public interface SockJsSessionContext {
/**
* Get the underlying ChannelHandlerContext.
*/
ChannelHandlerContext getContext();
ChannelHandlerContext getConnectionContext();

/**
* Get the underlying ChannelHandlerContext.
*/
ChannelHandlerContext getCurrentContext();
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void run() {
}
if (session.timestamp() + session.config().sessionTimeout() < now) {
final SockJsSession removed = sessions.remove(session.sessionId());
session.context().close();
session.connectionContext().close();
sessionTimer.cancel(true);
heartbeatFuture.cancel(true);
if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.jboss.aerogear.io.netty.handler.codec.sockjs.handler;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpRequest;
import org.jboss.aerogear.io.netty.handler.codec.sockjs.SockJsConfig;
import org.jboss.aerogear.io.netty.handler.codec.sockjs.protocol.HeartbeatFrame;

import java.util.concurrent.ConcurrentMap;

class JsonpPollingSessionState extends PollingSessionState {

public JsonpPollingSessionState(ConcurrentMap<String, SockJsSession> sessions, HttpRequest request, SockJsConfig config) {
super(sessions, request, config);
}

@Override
public void sendNoMessagesResponse(final HttpRequest request, final SockJsConfig config, final ChannelHandlerContext ctx) {
ctx.writeAndFlush(new HeartbeatFrame()).addListener(ChannelFutureListener.CLOSE);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import org.jboss.aerogear.io.netty.handler.codec.sockjs.SockJsConfig;
import org.jboss.aerogear.io.netty.handler.codec.sockjs.protocol.MessageFrame;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.concurrent.ConcurrentMap;


/**
* A polling state does not have a persistent connection to the client, instead a client
* will connect, poll, to request data.
Expand All @@ -40,47 +43,71 @@
* in the response.
*
*/
class PollingSessionState extends AbstractTimersSessionState {
abstract class PollingSessionState extends AbstractTimersSessionState {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PollingSessionState.class);
private final ConcurrentMap<String, SockJsSession> sessions;
private final HttpRequest request;
private final SockJsConfig config;

public PollingSessionState(final ConcurrentMap<String, SockJsSession> sessions) {
public PollingSessionState(final ConcurrentMap<String, SockJsSession> sessions,
final HttpRequest request,
final SockJsConfig config) {
super(sessions);
this.sessions = sessions;
this.request = request;
this.config = config;
}

/**
* Gives implementations the ability to decide what a response should look like and
* also how it should be written back to the client.
*
* @param request the polling HttpRequest.
* @param config the SockJsConfig.
* @param ctx {@code ChannelHandlerContext} the context.
*/
public abstract void sendNoMessagesResponse(HttpRequest request, SockJsConfig config, ChannelHandlerContext ctx);

@Override
public void onOpen(final SockJsSession session, final ChannelHandlerContext ctx) {
flushMessages(ctx, session);
}

private static void flushMessages(final ChannelHandlerContext ctx, final SockJsSession session) {
@Override
public ChannelHandlerContext getSendingContext(SockJsSession session) {
return session.connectionContext();
}

private void flushMessages(final ChannelHandlerContext ctx, final SockJsSession session) {
final String[] allMessages = session.getAllMessages();
if (allMessages.length == 0) {
sendNoMessagesResponse(request, config, ctx);
return;
}
final MessageFrame messageFrame = new MessageFrame(allMessages);
ctx.channel().writeAndFlush(messageFrame).addListener(new ChannelFutureListener() {
ChannelFuture channelFuture = ctx.channel().writeAndFlush(messageFrame);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
session.addMessages(allMessages);
}
}
});
channelFuture.addListener(ChannelFutureListener.CLOSE);
}

@Override
public boolean isInUse(final SockJsSession session) {
return session.context().channel().isActive() || session.inuse();
return session.connectionContext().channel().isActive() || session.inuse();
}

@Override
public void onSockJSServerInitiatedClose(final SockJsSession session) {
final ChannelHandlerContext context = session.context();
final ChannelHandlerContext context = session.connectionContext();
if (context != null) { //could be null if the request is aborted, for example due to missing callback.
if (logger.isDebugEnabled()) {
logger.debug("Will close session context {}", session.context());
logger.debug("Will close session connectionContext {}", session.connectionContext());
}
context.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,17 @@ public void onConnect(final SockJsSession session, final ChannelHandlerContext c
public void onOpen(final SockJsSession session, final ChannelHandlerContext ctx) {
}

@Override
public ChannelHandlerContext getSendingContext(SockJsSession session) {
return session.currentContext();
}

@Override
public void onSockJSServerInitiatedClose(final SockJsSession session) {
if (logger.isDebugEnabled()) {
logger.debug("Will close session context {}", session.context());
logger.debug("Will close session connectionContext {}", session.connectionContext());
}
session.context().close();
session.connectionContext().close();
sessions.remove(session.sessionId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
*
* For every connection received a new SessionHandler will be created
* and added to the pipeline
* Depending on the type of connection (polling, streaming, send, or websocket)
* Depending on the type of connection (polling, streaming, send, or WebSocket)
* the type of {@link SessionState} that this session handles will differ.
*
*/
Expand All @@ -43,7 +43,6 @@ public enum Events { CLOSE_SESSION, HANDLE_SESSION }

private final SessionState sessionState;
private final SockJsSession session;
private ChannelHandlerContext currentContext;

public SessionHandler(final SessionState sessionState, final SockJsSession session) {
ArgumentUtil.checkNotNull(sessionState, "sessionState");
Expand All @@ -53,6 +52,7 @@ public SessionHandler(final SessionState sessionState, final SockJsSession sessi

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
session.setCurrentContext(ctx);
if (msg instanceof HttpRequest) {
handleSession(ctx);
} else if (msg instanceof String) {
Expand All @@ -63,15 +63,14 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throw
}

private void handleSession(final ChannelHandlerContext ctx) throws Exception {
currentContext = ctx;
if (logger.isDebugEnabled()) {
logger.debug("handleSession {}", sessionState);
}
switch (session.getState()) {
case CONNECTING:
logger.debug("State.CONNECTING sending open frame");
ctx.channel().writeAndFlush(new OpenFrame());
session.setContext(ctx);
session.setConnectionContext(ctx);
session.onOpen(this);
sessionState.onConnect(session, ctx);
break;
Expand Down Expand Up @@ -103,19 +102,14 @@ private void handleMessage(final String message) throws Exception {

@Override
public void send(String message) {
final Channel channel = getActiveChannel();
final Channel channel = sessionState.getSendingContext(session).channel();
if (isWritable(channel)) {
channel.writeAndFlush(new MessageFrame(message));
} else {
session.addMessage(message);
}
}

private Channel getActiveChannel() {
final Channel sessionChannel = session.context().channel();
return sessionChannel.isActive() && sessionChannel.isRegistered() ? sessionChannel : currentContext.channel();
}

@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
session.resetInuse();
Expand All @@ -130,7 +124,7 @@ private static boolean isWritable(final Channel channel) {
public void close() {
session.onClose();
sessionState.onClose();
final Channel channel = getActiveChannel();
final Channel channel = sessionState.getSendingContext(session).channel();
if (isWritable(channel)) {
final CloseFrame closeFrame = new CloseFrame(3000, "Go away!");
if (logger.isDebugEnabled()) {
Expand All @@ -150,8 +144,13 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object eve
}

@Override
public ChannelHandlerContext getContext() {
return currentContext;
public ChannelHandlerContext getConnectionContext() {
return session.connectionContext();
}

@Override
public ChannelHandlerContext getCurrentContext() {
return session.currentContext();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ interface SessionState {
*/
void onOpen(SockJsSession session, ChannelHandlerContext ctx);

/**
* Returns the ChannelHandlerContext that should be used to communicate with the client.
* This may be different for different transports. For some transports this will be the
* context that opened the connection and others it will be the current context.
*
* @param session the {@link SockJsSession}.
* @return {@code ChannelHandlerContext} the context to be used for sending.
*/
ChannelHandlerContext getSendingContext(SockJsSession session);

/**
* Called after the {@link SockJsSession#onClose()} method has been called enabling
* this SessionState to perform any clean up actions requried.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ private static void handleSession(final SockJsServiceFactory factory,
switch (pathParams.transport()) {
case XHR:
addTransportHandler(new XhrPollingTransport(factory.config(), request), ctx);
addSessionHandler(new PollingSessionState(sessions), getSession(factory, pathParams.sessionId()), ctx);
addSessionHandler(new XhrPollingSessionState(sessions, request, factory.config()), getSession(factory, pathParams.sessionId()), ctx);
break;
case JSONP:
addTransportHandler(new JsonpPollingTransport(factory.config(), request), ctx);
addSessionHandler(new PollingSessionState(sessions), getSession(factory, pathParams.sessionId()), ctx);
addSessionHandler(new JsonpPollingSessionState(sessions, request, factory.config()), getSession(factory, pathParams.sessionId()), ctx);
break;
case XHR_SEND:
checkSessionExists(pathParams.sessionId(), request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,28 @@ enum States { CONNECTING, OPEN, CLOSED, INTERRUPTED }
private final LinkedList<String> messages = new LinkedList<String>();
private final AtomicLong timestamp = new AtomicLong();
private final AtomicBoolean inuse = new AtomicBoolean();
private ChannelHandlerContext ctx;
private ChannelHandlerContext connectionContext;
private ChannelHandlerContext currentContext;

public SockJsSession(final String sessionId, final SockJsService service) {
this.sessionId = sessionId;
this.service = service;
}

public synchronized ChannelHandlerContext context() {
return ctx;
public synchronized ChannelHandlerContext connectionContext() {
return connectionContext;
}

public synchronized void setContext(final ChannelHandlerContext ctx) {
if (this.ctx == null) {
this.ctx = ctx;
}
public synchronized void setConnectionContext(final ChannelHandlerContext ctx) {
connectionContext = ctx;
}

public synchronized ChannelHandlerContext currentContext() {
return currentContext;
}

public synchronized void setCurrentContext(final ChannelHandlerContext ctx) {
currentContext = ctx;
}

public synchronized void setState(States state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,13 @@ public void onOpen(final SockJsSession session, final ChannelHandlerContext ctx)
flushMessages(ctx, session);
}

@Override
public ChannelHandlerContext getSendingContext(SockJsSession session) {
return session.connectionContext();
}

private static void flushMessages(final ChannelHandlerContext ignored, final SockJsSession session) {
final Channel channel = session.context().channel();
final Channel channel = session.connectionContext().channel();
if (channel.isActive() && channel.isRegistered()) {
final String[] allMessages = session.getAllMessages();
if (allMessages.length == 0) {
Expand All @@ -71,16 +76,16 @@ public void operationComplete(final ChannelFuture future) throws Exception {

@Override
public void onSockJSServerInitiatedClose(final SockJsSession session) {
final ChannelHandlerContext context = session.context();
final ChannelHandlerContext context = session.connectionContext();
if (context != null) { //could be null if the request is aborted, for example due to missing callback.
logger.debug("Will close session context " + session.context());
logger.debug("Will close session connectionContext " + session.connectionContext());
context.close();
}
}

@Override
public boolean isInUse(final SockJsSession session) {
return session.context().channel().isActive();
return session.connectionContext().channel().isActive();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,14 @@ public void run() {
public void onOpen(final SockJsSession session, final ChannelHandlerContext ctx) {
}

@Override
public ChannelHandlerContext getSendingContext(SockJsSession session) {
return session.connectionContext();
}

@Override
public boolean isInUse(final SockJsSession session) {
return session.context().channel().isActive();
return session.connectionContext().channel().isActive();
}

@Override
Expand Down
Loading