Skip to content

Commit

Permalink
Issue #272 - Attempting to centralize close logic in Session.close()
Browse files Browse the repository at this point in the history
  • Loading branch information
joakime committed Oct 3, 2017
1 parent bb51951 commit c0dfa1d
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 211 deletions.
Expand Up @@ -157,7 +157,7 @@ public interface Session extends Closeable
*
* @return whether the session is open
*/
abstract boolean isOpen();
boolean isOpen();

/**
* Return true if and only if the underlying socket is using a secure transport.
Expand Down
Expand Up @@ -31,27 +31,10 @@
public interface LogicalConnection extends OutgoingFrames, SuspendToken
{
/**
* Send a websocket Close frame, without a status code or reason.
* <p>
* Basic usage: results in an non-blocking async write, then connection close.
*
* @see org.eclipse.jetty.websocket.api.StatusCode
* @see #close(int, String)
*/
public void close();

/**
* Send a websocket Close frame, with status code.
* <p>
* Advanced usage: results in an non-blocking async write, then connection close.
*
* @param statusCode
* the status code
* @param reason
* the (optional) reason. (can be null for no reason)
* @see org.eclipse.jetty.websocket.api.StatusCode
* Called to indicate a close frame was successfully sent to the remote.
* @param close the close details
*/
public void close(int statusCode, String reason);
void onLocalClose(CloseInfo close);

/**
* Terminate the connection (no close frame sent)
Expand All @@ -75,7 +58,7 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
*
* @return the idle timeout in milliseconds
*/
public long getIdleTimeout();
long getIdleTimeout();

/**
* Get the IOState of the connection.
Expand Down Expand Up @@ -119,7 +102,7 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
*
* @return true if connection is open
*/
public boolean isOpen();
boolean isOpen();

/**
* Tests if the connection is actively reading.
Expand Down Expand Up @@ -149,6 +132,13 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
*/
void setNextIncomingFrames(IncomingFrames incoming);

/**
* Associate the Active Session with the connection.
*
* @param session the session for this connection
*/
void setSession(WebSocketSession session);

/**
* Suspend a the incoming read events on the connection.
* @return the suspend token
Expand All @@ -159,5 +149,5 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
* Get Unique ID for the Connection
* @return the unique ID for the connection
*/
public String getId();
String getId();
}
Expand Up @@ -29,9 +29,11 @@
import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
Expand All @@ -51,11 +53,13 @@
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.frames.CloseFrame;
import org.eclipse.jetty.websocket.common.io.IOState;
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
Expand All @@ -64,6 +68,68 @@
@ManagedObject("A Jetty WebSocket Session")
public class WebSocketSession extends ContainerLifeCycle implements Session, RemoteEndpointFactory, WebSocketSessionScope, IncomingFrames, Connection.Listener, ConnectionStateListener
{
public static class OnCloseLocalCallback implements WriteCallback
{
private final Callback callback;
private final LogicalConnection connection;
private final CloseInfo close;

public OnCloseLocalCallback(Callback callback, LogicalConnection connection, CloseInfo close)
{
this.callback = callback;
this.connection = connection;
this.close = close;
}

@Override
public void writeSuccess()
{
try
{
if (callback != null)
{
callback.succeeded();
}
}
finally
{
connection.onLocalClose(close);
}
}

@Override
public void writeFailed(Throwable x)
{
try
{
if (callback != null)
{
callback.failed(x);
}
}
finally
{
connection.onLocalClose(close);
}
}
}

public class DisconnectCallback implements Callback
{
@Override
public void failed(Throwable x)
{
disconnect();
}

@Override
public void succeeded()
{
disconnect();
}
}


private static final Logger LOG = Log.getLogger(WebSocketSession.class);
private static final Logger LOG_OPEN = Log.getLogger(WebSocketSession.class.getName() + "_OPEN");
private final WebSocketContainerScope containerScope;
Expand All @@ -72,6 +138,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
private final EventDriver websocket;
private final Executor executor;
private final WebSocketPolicy policy;
private final AtomicBoolean closed = new AtomicBoolean();
private ClassLoader classLoader;
private ExtensionFactory extensionFactory;
private RemoteEndpointFactory remoteEndpointFactory;
Expand Down Expand Up @@ -100,27 +167,59 @@ public WebSocketSession(WebSocketContainerScope containerScope, URI requestURI,
this.connection.getIOState().addListener(this);
this.policy = websocket.getPolicy();

this.connection.setSession(this);

addBean(this.connection);
addBean(this.websocket);
}

/**
* Aborts the active session abruptly.
*/
public void abort(int statusCode, String reason)
{
close(new CloseInfo(statusCode, reason), new DisconnectCallback());
}

@Override
public void close()
{
/* This is assumed to always be a NORMAL closure, no reason phrase */
close(StatusCode.NORMAL, null);
close(new CloseInfo(StatusCode.NORMAL), null);
}

@Override
public void close(CloseStatus closeStatus)
{
close(closeStatus.getCode(),closeStatus.getPhrase());
close(new CloseInfo(closeStatus.getCode(),closeStatus.getPhrase()), null);
}

@Override
public void close(int statusCode, String reason)
{
connection.close(statusCode,reason);
close(new CloseInfo(statusCode, reason), null);
}

/**
* CLOSE Primary Entry Point.
*
* <ul>
* <li>atomically enqueue CLOSE frame + flip flag to reject more frames</li>
* <li>setup CLOSE frame callback: must close flusher</li>
* </ul>
*
* @param closeInfo the close details
*/
private void close(CloseInfo closeInfo, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("close({})", closeInfo);

if (closed.compareAndSet(false, true))
{
CloseFrame frame = closeInfo.asFrame();
connection.outgoingFrame(frame, new OnCloseLocalCallback(callback, connection, closeInfo), BatchMode.OFF);
}
}

/**
Expand Down Expand Up @@ -388,7 +487,7 @@ public boolean isOpen()
{
return false;
}
return this.connection.isOpen();
return !closed.get() && this.connection.isOpen();
}

@Override
Expand Down Expand Up @@ -420,11 +519,21 @@ public void notifyError(Throwable cause)
incomingError(cause);
}

/**
* Jetty Connection onClosed event
*
* @param connection the connection that was closed
*/
@Override
public void onClosed(Connection connection)
{
}

/**
* Jetty Connection onOpen event
*
* @param connection the connection that was opened
*/
@Override
public void onOpened(Connection connection)
{
Expand Down

0 comments on commit c0dfa1d

Please sign in to comment.