Skip to content

Commit

Permalink
Merge pull request #2362 from guusdk/OF-2752_StreamManagement-try-to-…
Browse files Browse the repository at this point in the history
…send-data-on-expected-network-disconnect

OF-2752: Refactor solution
  • Loading branch information
akrherz committed Dec 1, 2023
2 parents a10ffad + d01f83d commit 90f2665
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 182 deletions.
62 changes: 38 additions & 24 deletions xmppserver/src/main/java/org/jivesoftware/openfire/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,43 +156,62 @@ public interface Connection extends Closeable {
boolean isUsingSelfSignedCertificate();

/**
* Processes an event where the network connection between Openfire and the remote peer has been disconnected.
* Close this connection including associated session. The events for closing
* the connection are:
*
* Handling is comparable to that of an invocation of {@link #close()} or {@link #close(StreamError)}, with two
* differences:
* <ul>
* <li>As the network connection is assumed to be broken, no data is send to the peer</li>
* <li>When stream management is active, the session will be resumable (under the applicable configuration restrictions)</li>
* <li>Set closing flag to prevent redundant shutdowns.
* <li>Close the socket.
* <li>Notify all listeners that the channel is shut down.
* </ul>
*
* Not all implementations use the same order of events.
*
* Invocation of this method is expected to occur when a coordinated, 'clean' disconnect occurs. Such disconnects
* are expected to be user (or server) initiated. As a result, a session closed by this method is not resumable,
* even if Stream Management was activated for this session. Refer to {@link #close(StreamError, boolean)} for
* processing of unexpected disconnects (that <em>are</em> potentially resumable).
*/
void onUnexpectedDisconnect();
@Override
default void close() {
close(null, false);
}

/**
* Close this connection including associated session. The order of
* events for closing the connection is:
* Close this connection including associated session, optionally citing a stream error. The events for closing
* the connection are:
*
* <ul>
* <li>Set closing flag to prevent redundant shutdowns.
* <li>Call notifyEvent all listeners that the channel is shutting down.
* <li>Close the socket.
* <li>Notify all listeners that the channel is shut down.
* </ul>
*
* Not all implementations use the same order of events.
*
* Invocation of this method is expected to occur when a coordinated, 'clean' disconnect occurs. Such disconnects
* are expected to be user (or server) initiated. As a result, a session closed by this method is not resumable,
* even if Stream Management was activated for this session. Refer to {@link #onUnexpectedDisconnect()} for processing
* of unexpected disconnects (that <em>are</em> potentially resumable).
* even if Stream Management was activated for this session. Refer to {@link #close(StreamError, boolean)} for
* processing of unexpected disconnects (that <em>are</em> potentially resumable).
*
* Note this method overrides the base interface to suppress exceptions. However,
* it otherwise fulfills the requirements of the {@link Closeable#close()} contract
* (idempotent, try-with-resources, etc.)
* @param error If non-null, the end-stream tag will be preceded with this error.
*/
@Override
void close();
default void close(@Nullable final StreamError error) {
close(error, false);
}

/**
* Close this connection including associated session, optionally citing a
* stream error. The events for closing the connection are:
* Close this connection including associated session, optionally citing a stream error.
*
* The 'networkInterruption' argument should be set to 'true' if the connection is being closed because it is known
* or assumed that the network connection between Openfire and the peer was unexpectedly terminated (eg: due to a
* networking failure). These typically are scenarios where a peer becomes unresponsive (without having terminated
* its session with a <tt></stream:stream></tt> or comparable message).
*
* When the 'networkInterruption' argument is set to 'true', then a session is eligible for resumption (if Stream
* Management was activiated for the session).
*
* The events for closing the connection are:
* <ul>
* <li>Set closing flag to prevent redundant shutdowns.
* <li>Close the socket.
Expand All @@ -201,14 +220,9 @@ public interface Connection extends Closeable {
*
* Not all implementations use the same order of events.
*
* Invocation of this method is expected to occur when a coordinated, 'clean' disconnect occurs. Such disconnects
* are expected to be user (or server) initiated. As a result, a session closed by this method is not resumable,
* even if Stream Management was activated for this session. Refer to {@link #onUnexpectedDisconnect()} for processing
* of unexpected disconnects (that <em>are</em> potentially resumable).
*
* @param error If non-null, the end-stream tag will be preceded with this error.
*/
void close(@Nullable final StreamError error);
void close(@Nullable final StreamError error, final boolean networkInterruption);

/**
* Notification message indicating that the server is being shutdown. Implementors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1250,11 +1250,6 @@ public HttpVirtualConnection(@Nonnull final InetAddress address, @Nullable final
this.connectionType = connectionType;
}

@Override
public void onVirtualUnexpectedDisconnect() {
((HttpSession) session).closeSession(null, false);
}

@Override
public void closeVirtualConnection(@Nullable final StreamError error) {
((HttpSession) session).closeSession(error, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,4 @@ public void closeVirtualConnection(@Nullable final StreamError error) {
}
}
}

@Override
public void onVirtualUnexpectedDisconnect() {
throw new IllegalStateException("Unable to process disconnect event.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import com.jcraft.jzlib.JZlib;
import com.jcraft.jzlib.ZOutputStream;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.jivesoftware.openfire.*;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.session.IncomingServerSession;
Expand Down Expand Up @@ -226,7 +224,7 @@ public boolean validate() {
}
catch (Exception e) {
Log.warn("Closing no longer valid connection" + "\n" + this, e);
close(new StreamError(StreamError.Condition.internal_server_error, "An error occurred while trying to validate this connection."));
close(new StreamError(StreamError.Condition.internal_server_error, "An error occurred while trying to validate this connection."), true);
}
finally {
// Register that we finished sending data on the connection
Expand Down Expand Up @@ -376,61 +374,30 @@ public PacketDeliverer getPacketDeliverer() {
return backupDeliverer;
}

@Override
public void onUnexpectedDisconnect()
{
if (state.compareAndSet(State.OPEN, State.CLOSED)) {
Log.trace("Remote peer unexpectedly disconnected: {}, cleaning up connection.", this);

if (session != null) {
// Ensure that the state of this connection, its session and the Netty Channel are eventually closed.
session.setStatus(Session.Status.CLOSED);
}

closeConnection();
notifyCloseListeners();
closeListeners.clear();
}
}

/**
* Closes the connection without sending any data (not even a stream end-tag).
*
* @deprecated replaced by #onUnexpectedDisconnect
* @deprecated replaced by {@link #close(StreamError, boolean)}
*/
@Deprecated // Remove in or after Openfire 4.9.0
public void forceClose() {
close(true, null);
close(null,false, true);
}

/**
* Closes the connection after trying to send a stream end tag.
*/
@Override
public void close() {
close(false, null);
}

/**
* Closes the connection after trying to send an optional error and stream end tag.
*
* @param error If non-null, the end-stream tag will be preceded with this error.
*/
@Override
public void close(@Nullable final StreamError error) {
close(false, error);
public void close(@Nullable final StreamError error, final boolean networkInterruption) {
close(error, networkInterruption, false);
}

/**
* Normal connection close will attempt to write the stream end tag. Otherwise this method
* forces the connection closed immediately. This method will be called when we need to close the socket, discard
* the connection and its session.
*/
private void close(boolean force, @Nullable final StreamError error) {
private void close(@Nullable final StreamError error, final boolean networkInterruption, final boolean force) {
if (state.compareAndSet(State.OPEN, State.CLOSED)) {

if (session != null) {
if (!force) {
if (!force && !networkInterruption) {
// A 'clean' closure should never be resumed (see #onRemoteDisconnect for handling of unclean disconnects). OF-2752
session.getStreamManager().formalClose();
}
Expand Down Expand Up @@ -502,7 +469,7 @@ boolean checkHealth() {
Log.debug("Closing connection: " + this + " that started sending data at: " +
new Date(writeTimestamp));
}
onUnexpectedDisconnect();
close(new StreamError(StreamError.Condition.connection_timeout, "Unable to validate the connection. Connection has been idle long enough for it to be considered 'unhealthy'."), true);
return true;
}
else {
Expand All @@ -515,7 +482,7 @@ boolean checkHealth() {
if (Log.isDebugEnabled()) {
Log.debug("Closing connection that has been idle: " + this);
}
onUnexpectedDisconnect();
close(new StreamError(StreamError.Condition.connection_timeout, "Not received data recently. Connection has been idle long enough for it to be considered 'unhealthy'."), true);
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,62 +118,21 @@ public boolean isInitialized() {
return session != null && !isClosed();
}

@Override
public void onUnexpectedDisconnect()
{
if (state.compareAndSet(State.OPEN, State.CLOSED)) {
Log.trace("Remote peer unexpectedly disconnected: {}, cleaning up connection.", this);

if (session != null) {
// Ensure that the state of this connection, its session and the Netty Channel are eventually closed.
session.setStatus(Session.Status.CLOSED);
}

// See OF-1596
// The notification will trigger some shutdown procedures that, amongst other things,
// check what type of session (eg: anonymous) is being closed. This check depends on the
// session still being available.
//
// For that reason, it's important to first notify the listeners, and then close the
// session - not the other way around.
//
// This fixes a very visible bug where MUC users would remain in the MUC room long after
// their session was closed. Effectively, the bug prevents the MUC room from getting a
// presence update to notify it that the user logged off.
notifyCloseListeners();
closeListeners.clear();

try {
onVirtualUnexpectedDisconnect();
} catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error.close") + "\n" + toString(), e);
}
}
}

/**
* Closes the session, the virtual connection and notifies listeners that the connection
* has been closed.
*/
@Override
public void close() {
close(null);
}

/**
* Closes the session, the virtual connection and notifies listeners that the connection
* has been closed.
*
* @param error If non-null, the end-stream tag will be preceded with this error.
*/
@Override
public void close(@Nullable final StreamError error) {
public void close(@Nullable final StreamError error, final boolean networkInterruption) {
if (state.compareAndSet(State.OPEN, State.CLOSED)) {

if (session != null) {
// A 'clean' closure should never be resumed (see #onRemoteDisconnect for handling of unclean disconnects). OF-2752
session.getStreamManager().formalClose();

if (!networkInterruption) {
// A 'clean' closure should never be resumed (see #onRemoteDisconnect for handling of unclean disconnects). OF-2752
session.getStreamManager().formalClose();
}
session.setStatus(Session.Status.CLOSED);
}

Expand Down Expand Up @@ -206,12 +165,4 @@ public void close(@Nullable final StreamError error) {
* @param error If non-null, this error will be sent to the peer before the connection is disconnected.
*/
public abstract void closeVirtualConnection(@Nullable final StreamError error);

/**
* Processes an event where the network connection between Openfire and the remote peer has been disconnected.
* At this point the session has a CLOSED state.
*
* Implementations should free up resources without attempting to send data to the peer.
*/
public abstract void onVirtualUnexpectedDisconnect();
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,47 +186,18 @@ public PacketDeliverer getPacketDeliverer() {
}

@Override
public void onUnexpectedDisconnect()
{
if (state.compareAndSet(State.OPEN, State.CLOSED)) {
Log.trace("Remote peer unexpectedly disconnected: {}, cleaning up connection.", this);

if (session != null) {
// Ensure that the state of this connection, its session and the Netty Channel are eventually closed.
session.setStatus(Session.Status.CLOSED);
}

try {
final ChannelFuture f = channelHandlerContext.newSucceededFuture();
f.addListener(ChannelFutureListener.CLOSE)
.addListener(e -> {
Log.trace("Notifying close listeners.");
notifyCloseListeners();
closeListeners.clear();
})
.addListener(e -> Log.trace("Finished closing connection."))
.sync();
} catch (Exception e) {
Log.error("Problem during connection close or cleanup", e);
}
}
}

@Override
public void close() {
close(null);
}

@Override
public void close(@Nullable final StreamError error) {
public void close(@Nullable final StreamError error, final boolean networkInterruption) {
if (state.compareAndSet(State.OPEN, State.CLOSED)) {
Log.trace("Closing {} with optional error: {}", this, error);

ChannelFuture f;

if (session != null) {
// A 'clean' closure should never be resumed (see #onRemoteDisconnect for handling of unclean disconnects). OF-2752
session.getStreamManager().formalClose();

if (!networkInterruption) {
// A 'clean' closure should never be resumed (OF-2752).
session.getStreamManager().formalClose();
}

// Ensure that the state of this connection, its session and the Netty Channel are eventually closed.
session.setStatus(Session.Status.CLOSED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
// Idle flag already present. Connection has been idle for a while. Close it.
final NettyConnection connection = channel.attr(CONNECTION).get();
Log.debug("Closing connection because of inactivity: {}", connection);
connection.close(new StreamError(StreamError.Condition.connection_timeout, doPing ? "Connection has been idle and did not respond to a keep-alive check." : "Connection has been idle."));
connection.close(new StreamError(StreamError.Condition.connection_timeout, doPing ? "Connection has been idle and did not respond to a keep-alive check." : "Connection has been idle."), doPing);
}
}
super.userEventTriggered(ctx, evt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,8 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
NettyConnection connection = ctx.channel().attr(CONNECTION).get();
if (cause instanceof IOException) {
Log.warn("IOException caught by XMPP decoder. Marking connection as 'closed' (but potentially resumable): {}", connection, cause);
connection.onUnexpectedDisconnect();
} else {
Log.warn("Error occurred while decoding XMPP stanza, closing connection: {}", connection, cause);
connection.close(new StreamError(StreamError.Condition.internal_server_error, "An error occurred in XMPP Decoder"));
}
final NettyConnection connection = ctx.channel().attr(CONNECTION).get();
Log.warn("Error occurred while decoding XMPP stanza, closing connection: {}", connection, cause);
connection.close(new StreamError(StreamError.Condition.internal_server_error, "An error occurred in XMPP Decoder"), cause instanceof IOException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ public void onError(Throwable error)
try {
if (isWebSocketOpen()) {
Log.warn("Attempting to close connection on which an error occurred: {}", wsConnection, error);
wsConnection.close(new StreamError(StreamError.Condition.internal_server_error));
wsConnection.close(new StreamError(StreamError.Condition.internal_server_error), !isWebSocketOpen());
} else {
Log.debug("Error detected on websocket that isn't open (any more):", error);
wsConnection.onVirtualUnexpectedDisconnect();
wsConnection.close(null, !isWebSocketOpen());
}
} catch (Exception e) {
Log.error("Error disconnecting websocket", e);
Expand Down
Loading

0 comments on commit 90f2665

Please sign in to comment.