Skip to content

Commit

Permalink
[BACKPORT #6221] Report cause for Akka/IO TCP CommandFailed events (#…
Browse files Browse the repository at this point in the history
…6224)

* Port [#22954](akka/akka#22954)

## Changes

> I think we should finally fix this once and forever. We have so many failed test jobs where we cannot be sure why "Connect failed" and we have no way of finding out without enabling DEBUG log level everywhere. That same pain is felt by every user.

(cherry-picked from c9c0b44)

* Update API Verify list

Co-authored-by: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com>
  • Loading branch information
Arkatufus and ismaelhamed committed Nov 3, 2022
1 parent dca908b commit eeb156c
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 28 deletions.
10 changes: 10 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.verified.txt
Expand Up @@ -3328,6 +3328,11 @@ namespace Akka.IO
public static byte[] op_Explicit(Akka.IO.ByteString byteString) { }
public static bool !=(Akka.IO.ByteString x, Akka.IO.ByteString y) { }
}
[Akka.Annotations.InternalApiAttribute()]
public class ConnectException : System.Exception
{
public ConnectException(string message) { }
}
public class Dns : Akka.Actor.ExtensionIdProvider<Akka.IO.DnsExt>
{
public static readonly Akka.IO.Dns Instance;
Expand Down Expand Up @@ -3542,8 +3547,13 @@ namespace Akka.IO
public sealed class CommandFailed : Akka.IO.Tcp.Event
{
public CommandFailed(Akka.IO.Tcp.Command cmd) { }
public Akka.Util.Option<System.Exception> Cause { get; }
[Akka.Annotations.InternalApiAttribute()]
public string CauseString { get; }
public Akka.IO.Tcp.Command Cmd { get; }
public override string ToString() { }
[Akka.Annotations.InternalApiAttribute()]
public Akka.IO.Tcp.CommandFailed WithCause(System.Exception cause) { }
}
public class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable<Akka.IO.Tcp.SimpleWriteCommand>, System.Collections.IEnumerable
{
Expand Down
32 changes: 24 additions & 8 deletions src/core/Akka/IO/Tcp.cs
Expand Up @@ -11,10 +11,12 @@
using System.Linq;
using System.Net;
using Akka.Actor;
using Akka.Annotations;
using Akka.Configuration;
using Akka.Dispatch;
using Akka.Event;
using Akka.IO.Buffers;
using Akka.Util;

namespace Akka.IO
{
Expand Down Expand Up @@ -85,7 +87,7 @@ internal sealed class SocketConnected : SocketCompleted
public class Message : INoSerializationVerificationNeeded { }

#region user commands

// COMMANDS
/// <summary>
/// TBD
Expand Down Expand Up @@ -729,7 +731,7 @@ public ResumeAccepting(int batchSize)
#endregion

#region user events

/// <summary>
/// Common interface for all events generated by the TCP layer actors.
/// </summary>
Expand Down Expand Up @@ -804,18 +806,32 @@ public sealed class CommandFailed : Event
/// TBD
/// </summary>
/// <param name="cmd">TBD</param>
public CommandFailed(Command cmd)
{
Cmd = cmd;
}
public CommandFailed(Command cmd) => Cmd = cmd;

/// <summary>
/// TBD
/// </summary>
public Command Cmd { get; }

public override string ToString() =>
$"CommandFailed({Cmd})";
/// <summary>
/// Optionally contains the cause why the command failed.
/// </summary>
public Option<Exception> Cause { get; private set; } = Option<Exception>.None;

/// <summary>
/// Creates a copy of this object with a new cause set.
/// </summary>
[InternalApi]
public CommandFailed WithCause(Exception cause)
{
// Needs to be added with a mutable property for compatibility reasons
return new CommandFailed(Cmd) { Cause = cause };
}

[InternalApi]
public string CauseString => Cause.HasValue ? $" because of {Cause.Value.Message}" : "";

public override string ToString() => $"CommandFailed({Cmd}){CauseString}";
}

/// <summary>
Expand Down
10 changes: 8 additions & 2 deletions src/core/Akka/IO/TcpConnection.cs
Expand Up @@ -104,6 +104,12 @@ enum ConnectionStatus

private IActorRef _watchedActor = Context.System.DeadLetters;

private readonly IOException droppingWriteBecauseWritingIsSuspendedException =
new IOException("Dropping write because writing is suspended");

private readonly IOException droppingWriteBecauseQueueIsFullException =
new IOException("Dropping write because queue is full");

protected TcpConnection(TcpExt tcp, Socket socket, bool pullMode, Option<int> writeCommandsBufferMaxSize)
{
if (socket == null) throw new ArgumentNullException(nameof(socket));
Expand Down Expand Up @@ -328,7 +334,7 @@ private Receive HandleWriteMessages(ConnectionInfo info)
if (HasStatus(ConnectionStatus.WritingSuspended))
{
if (_traceLogging) Log.Debug("Dropping write because writing is suspended");
Sender.Tell(write.FailureMessage);
Sender.Tell(write.FailureMessage.WithCause(droppingWriteBecauseWritingIsSuspendedException));
}
if (HasStatus(ConnectionStatus.Sending))
Expand Down Expand Up @@ -405,7 +411,7 @@ private Receive HandleWriteMessages(ConnectionInfo info)
private void DropWrite(ConnectionInfo info, WriteCommand write)
{
if (_traceLogging) Log.Debug("Dropping write because queue is full");
Sender.Tell(write.FailureMessage);
Sender.Tell(write.FailureMessage.WithCause(droppingWriteBecauseQueueIsFullException));
if (info.UseResumeWriting) SetStatus(ConnectionStatus.WritingSuspended);
}

Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/IO/TcpListener.cs
Expand Up @@ -57,7 +57,7 @@ partial class TcpListener : ActorBase, IRequiresMessageQueue<IUnboundedMessageQu
}
catch (Exception e)
{
_bindCommander.Tell(bind.FailureMessage);
_bindCommander.Tell(_bind.FailureMessage.WithCause(e));
_log.Error(e, "Bind failed for TCP channel on endpoint [{0}]", bind.LocalAddress);
Context.Stop(Self);
}
Expand Down
44 changes: 27 additions & 17 deletions src/core/Akka/IO/TcpOutgoingConnection.cs
Expand Up @@ -12,12 +12,14 @@
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using Akka.Actor;
using Akka.Annotations;
using Akka.Util;

namespace Akka.IO
{
/// <summary>
/// TBD
/// An actor handling the connection state machine for an outgoing connection
/// to be established.
/// </summary>
internal sealed class TcpOutgoingConnection : TcpConnection
{
Expand All @@ -26,6 +28,9 @@ internal sealed class TcpOutgoingConnection : TcpConnection

private SocketAsyncEventArgs _connectArgs;

private readonly ConnectException finishConnectNeverReturnedTrueException =
new ConnectException("Could not establish connection because finishConnect never returned true");

public TcpOutgoingConnection(TcpExt tcp, IActorRef commander, Tcp.Connect connect)
: base(
tcp,
Expand Down Expand Up @@ -61,11 +66,11 @@ private void ReleaseConnectionSocketArgs()
}
}

private void Stop()
private void Stop(Exception cause)
{
ReleaseConnectionSocketArgs();

StopWith(new CloseInformation(new HashSet<IActorRef>(new[] {_commander}), _connect.FailureMessage));
StopWith(new CloseInformation(new HashSet<IActorRef>(new[] {_commander}), _connect.FailureMessage.WithCause(cause)));
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand All @@ -77,30 +82,29 @@ private void ReportConnectFailure(Action thunk)
}
catch (Exception e)
{
Log.Error(e, "Could not establish connection to [{0}].", _connect.RemoteAddress);
Stop();
Log.Debug(e, "Could not establish connection to [{0}] due to {1}", _connect.RemoteAddress, e.Message);
Stop(e);
}
}

protected override void PreStart()
{
ReportConnectFailure(() =>
{
if (_connect.RemoteAddress is DnsEndPoint)
if (_connect.RemoteAddress is DnsEndPoint remoteAddress)
{
var remoteAddress = (DnsEndPoint) _connect.RemoteAddress;
Log.Debug("Resolving {0} before connecting", remoteAddress.Host);
var resolved = Dns.ResolveName(remoteAddress.Host, Context.System, Self);
if (resolved == null)
Become(Resolving(remoteAddress));
else if(resolved.Ipv4.Any() && resolved.Ipv6.Any()) // one of both families
else if (resolved.Ipv4.Any() && resolved.Ipv6.Any()) // one of both families
Register(new IPEndPoint(resolved.Ipv4.FirstOrDefault(), remoteAddress.Port), new IPEndPoint(resolved.Ipv6.FirstOrDefault(), remoteAddress.Port));
else // one or the other
Register(new IPEndPoint(resolved.Addr, remoteAddress.Port), null);
}
else if(_connect.RemoteAddress is IPEndPoint)
else if (_connect.RemoteAddress is IPEndPoint point)
{
Register((IPEndPoint)_connect.RemoteAddress, null);
Register(point, null);
}
else throw new NotSupportedException($"Couldn't connect to [{_connect.RemoteAddress}]: only IP and DNS-based endpoints are supported");
});
Expand All @@ -123,8 +127,7 @@ private Receive Resolving(DnsEndPoint remoteAddress)
{
return message =>
{
var resolved = message as Dns.Resolved;
if (resolved != null)
if (message is Dns.Resolved resolved)
{
if (resolved.Ipv4.Any() && resolved.Ipv6.Any()) // multiple addresses
{
Expand All @@ -144,7 +147,6 @@ private Receive Resolving(DnsEndPoint remoteAddress)
};
}


private void Register(IPEndPoint address, IPEndPoint fallbackAddress)
{
ReportConnectFailure(() =>
Expand All @@ -165,7 +167,7 @@ private Receive Connecting(int remainingFinishConnectRetries, SocketAsyncEventAr
{
return message =>
{
if (message is IO.Tcp.SocketConnected)
if (message is Tcp.SocketConnected)
{
if (args.SocketError == SocketError.Success)
{
Expand Down Expand Up @@ -202,19 +204,27 @@ private Receive Connecting(int remainingFinishConnectRetries, SocketAsyncEventAr
else
{
Log.Debug("Could not establish connection because finishConnect never returned true (consider increasing akka.io.tcp.finish-connect-retries)");
Stop();
Stop(finishConnectNeverReturnedTrueException);
}
return true;
}
if (message is ReceiveTimeout)
{
if (_connect.Timeout.HasValue) Context.SetReceiveTimeout(null); // Clear the timeout
Log.Error("Connect timeout expired, could not establish connection to [{0}]", _connect.RemoteAddress);
Stop();
Log.Debug("Connect timeout expired, could not establish connection to [{0}]", _connect.RemoteAddress);
Stop(new ConnectException($"Connect timeout of {_connect.Timeout} expired"));
return true;
}
return false;
};
}
}

[InternalApi]
public class ConnectException : Exception
{
public ConnectException(string message)
: base(message)
{ }
}
}

0 comments on commit eeb156c

Please sign in to comment.