Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Async TestKit] Modernize TcpStage and TcpListener #5989

Merged
merged 5 commits into from
Jun 10, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
90 changes: 45 additions & 45 deletions src/core/Akka.Streams/Implementation/IO/TcpStages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,55 +126,55 @@ private void Receive((IActorRef, object) args)
{
var sender = args.Item1;
var msg = args.Item2;
if (msg is Tcp.Bound)
switch (msg)
{
var bound = (Tcp.Bound)msg;
_listener = sender;
StageActor.Watch(_listener);
case Tcp.Bound bound:
_listener = sender;
StageActor.Watch(_listener);

if (IsAvailable(_stage._out))
_listener.Tell(new Tcp.ResumeAccepting(1), StageActor.Ref);
if (IsAvailable(_stage._out))
_listener.Tell(new Tcp.ResumeAccepting(1), StageActor.Ref);

var thisStage = StageActor.Ref;
var binding = new StreamTcp.ServerBinding(bound.LocalAddress, () =>
{
// To allow unbind() to be invoked multiple times with minimal chance of dead letters, we check if
// it's already unbound before sending the message.
if (!_unbindPromise.Task.IsCompleted)
var thisStage = StageActor.Ref;
var binding = new StreamTcp.ServerBinding(bound.LocalAddress, () =>
{
// Beware, sender must be explicit since stageActor.ref will be invalid to access after the stage stopped
thisStage.Tell(Tcp.Unbind.Instance, thisStage);
}
return _unbindPromise.Task;
});

_bindingPromise.NonBlockingTrySetResult(binding);
}
else if (msg is Tcp.CommandFailed)
{
var ex = BindFailedException.Instance;
_bindingPromise.NonBlockingTrySetException(ex);
_unbindPromise.TrySetResult(NotUsed.Instance);
FailStage(ex);
}
else if (msg is Tcp.Connected)
{
var connected = (Tcp.Connected)msg;
Push(_stage._out, ConnectionFor(connected, sender));
}
else if (msg is Tcp.Unbind)
{
if (!IsClosed(_stage._out) && !ReferenceEquals(_listener, null))
TryUnbind();
}
else if (msg is Tcp.Unbound)
{
UnbindCompleted();
}
else if (msg is Terminated)
{
if (_unbindStarted) UnbindCompleted();
else FailStage(new IllegalStateException("IO Listener actor terminated unexpectedly"));
// To allow unbind() to be invoked multiple times with minimal chance of dead letters, we check if
// it's already unbound before sending the message.
if (!_unbindPromise.Task.IsCompleted)
{
// Beware, sender must be explicit since stageActor.ref will be invalid to access after the stage stopped
thisStage.Tell(Tcp.Unbind.Instance, thisStage);
}
return _unbindPromise.Task;
});

_bindingPromise.NonBlockingTrySetResult(binding);
break;

case Tcp.CommandFailed _:
var ex = BindFailedException.Instance;
_bindingPromise.NonBlockingTrySetException(ex);
_unbindPromise.TrySetResult(NotUsed.Instance);
FailStage(ex);
break;

case Tcp.Connected connected:
Push(_stage._out, ConnectionFor(connected, sender));
break;

case Tcp.Unbind _:
if (!(_unbindStarted || IsClosed(_stage._out) || ReferenceEquals(_listener, null)))
TryUnbind();
break;

case Tcp.Unbound _:
case Terminated _ when _unbindStarted:
UnbindCompleted();
break;

case Terminated _:
FailStage(new IllegalStateException("IO Listener actor terminated unexpectedly"));
break;
}
}

Expand Down
20 changes: 16 additions & 4 deletions src/core/Akka/IO/Tcp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,10 @@ public class ResumeWriting : Command
/// TBD
/// </summary>
public static readonly ResumeWriting Instance = new ResumeWriting();

private ResumeWriting()
{
}
}

/// <summary>
Expand All @@ -679,7 +683,7 @@ public class SuspendReading : Command
/// <summary>
/// TBD
/// </summary>
public static SuspendReading Instance = new SuspendReading();
public static readonly SuspendReading Instance = new SuspendReading();

private SuspendReading()
{
Expand All @@ -695,7 +699,7 @@ public class ResumeReading : Command
/// <summary>
/// TBD
/// </summary>
public static ResumeReading Instance = new ResumeReading();
public static readonly ResumeReading Instance = new ResumeReading();

private ResumeReading()
{
Expand Down Expand Up @@ -830,7 +834,11 @@ public class WritingResumed : Event
/// <summary>
/// TBD
/// </summary>
public static WritingResumed Instance = new WritingResumed();
public static readonly WritingResumed Instance = new WritingResumed();

private WritingResumed()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are breaking API changes, but no big deal. Need to be able to do stuff like this in Akka.NET v1.5.

{
}
}

/// <summary>
Expand Down Expand Up @@ -867,7 +875,11 @@ public class Unbound : Event
/// <summary>
/// Singleton instance
/// </summary>
public static Unbound Instance = new Unbound();
public static readonly Unbound Instance = new Unbound();

private Unbound()
{
}
}

/// <summary>
Expand Down
155 changes: 106 additions & 49 deletions src/core/Akka/IO/TcpListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@
using Akka.Util.Internal;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Akka.IO
{
partial class TcpListener : ActorBase, IRequiresMessageQueue<IUnboundedMessageQueueSemantics>
class TcpListener : ActorBase, IRequiresMessageQueue<IUnboundedMessageQueueSemantics>
{
private readonly TcpExt _tcp;
private readonly IActorRef _bindCommander;
private readonly Tcp.Bind _bind;
private readonly Socket _socket;
private Tcp.Bind _bind;
private Socket _socket;
private readonly ILoggingAdapter _log = Context.GetLogger();
private int _acceptLimit;
private SocketAsyncEventArgs[] _saeas;

private int acceptLimit;
private bool _binding;

/// <summary>
/// TBD
Expand All @@ -40,79 +40,136 @@ partial class TcpListener : ActorBase, IRequiresMessageQueue<IUnboundedMessageQu
{
_tcp = tcp;
_bindCommander = bindCommander;
_bind = bind;

Context.Watch(bind.Handler);

_socket = new Socket(_bind.LocalAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp) { Blocking = false };

_acceptLimit = bind.PullMode ? 0 : _tcp.Settings.BatchAcceptLimit;

try
{
bind.Options.ForEach(x => x.BeforeServerSocketBind(_socket));
_socket.Bind(bind.LocalAddress);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, moving this out of the constructor

_socket.Listen(bind.Backlog);
_saeas = Accept(_acceptLimit).ToArray();
}
catch (Exception e)
{
_bindCommander.Tell(bind.FailureMessage);
_log.Error(e, "Bind failed for TCP channel on endpoint [{0}]", bind.LocalAddress);
Context.Stop(Self);
}

bindCommander.Tell(new Tcp.Bound(_socket.LocalEndPoint));
Become(Initializing());
Self.Tell(bind);
}
private IEnumerable<SocketAsyncEventArgs> Accept(int limit)

private Receive Initializing() => message =>
{
for(var i = 0; i < _acceptLimit; i++)
switch (message)
{
var self = Self;
var saea = new SocketAsyncEventArgs();
saea.Completed += (s, e) => self.Tell(new SocketEvent(e));
if (!_socket.AcceptAsync(saea))
Self.Tell(new SocketEvent(saea));
yield return saea;
case Tcp.Bind bind:
if (_binding)
{
_log.Warning("Already trying to bind to TCP channel on endpoint [{0}]", _bind.LocalAddress);
return true;
}
_binding = true;
_bind = bind;
_acceptLimit = bind.PullMode ? 0 : _tcp.Settings.BatchAcceptLimit;
BindAsync().PipeTo(Self);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

return true;

case Status.Failure fail:
_bindCommander.Tell(_bind.FailureMessage);
_log.Error(fail.Cause, "Bind failed for TCP channel on endpoint [{0}]", _bind.LocalAddress);
Context.Stop(Self);
_binding = false;
return true;

case Tcp.Bound bound:
Context.Watch(_bind.Handler);
_bindCommander.Tell(bound);
Become(Bound());
_binding = false;
return true;

default:
return false;
}
}
};

protected override SupervisorStrategy SupervisorStrategy()
{
return Tcp.ConnectionSupervisorStrategy;
}

protected override bool Receive(object message)
private Receive Bound() => message =>
{
switch (message)
{
case SocketEvent evt:
var saea = evt.Args;
if (saea.SocketError == SocketError.Success)
Context.ActorOf(Props.Create<TcpIncomingConnection>(_tcp, saea.AcceptSocket, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local));

saea.AcceptSocket = null;

if (!_socket.AcceptAsync(saea))
Self.Tell(new SocketEvent(saea));
return true;

case Tcp.ResumeAccepting resumeAccepting:
_acceptLimit = resumeAccepting.BatchSize;
// TODO: this is dangerous, previous async args are not disposed and there's no guarantee that they're not still receiving data
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previous socket async event args?

Copy link
Contributor Author

@Arkatufus Arkatufus Jun 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_saeas is an array of SocketAsyncEventArgs, assigned by the Accept method.
In this case, the old code actually discarded all of the old args while most of them still active accepting connections, we would lose all reference to those event args.

  1. They cause a memory leak, or
  2. They can get finalized and disposed and something breaks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aren't these SAEA only handling bind events though, which have no buffers?

_saeas = Accept(_acceptLimit).ToArray();
return true;

case Tcp.Unbind _:
_log.Debug("Unbinding endpoint {0}", _bind.LocalAddress);
_socket.Dispose();
Sender.Tell(Tcp.Unbound.Instance);
Become(Unbinding(Sender));
UnbindAsync().PipeTo(Self);
return true;

default:
return false;
}
};

private Receive Unbinding(IActorRef requester) => message =>
{
switch (message)
{
case Tcp.Unbound unbound:
requester.Tell(unbound);
_log.Debug("Unbound endpoint {0}, stopping listener", _bind.LocalAddress);
Context.Stop(Self);
return true;


case Status.Failure fail:
_log.Error(fail.Cause, "Failed to unbind TCP listener for address [{0}]", _bind.LocalAddress);
Context.Stop(Self);
return true;

default:
return false;
}
};

private IEnumerable<SocketAsyncEventArgs> Accept(int limit)
{
for(var i = 0; i < limit; i++)
{
var self = Self;
var saea = new SocketAsyncEventArgs();
saea.Completed += (s, e) => self.Tell(new SocketEvent(e));
if (!_socket.AcceptAsync(saea))
Self.Tell(new SocketEvent(saea));
yield return saea;
}
}

private async Task<Tcp.Bound> BindAsync()
{
_socket = new Socket(_bind.LocalAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp) { Blocking = false };

_bind.Options.ForEach(x => x.BeforeServerSocketBind(_socket));
_socket.Bind(_bind.LocalAddress);
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
_socket.Listen(_bind.Backlog);
_saeas = Accept(_acceptLimit).ToArray();

return new Tcp.Bound(_socket.LocalEndPoint);
}

private async Task<Tcp.Unbound> UnbindAsync()
{
_log.Debug("Unbinding endpoint {0}", _bind.LocalAddress);
_socket.Close();
return Tcp.Unbound.Instance;
}

protected override SupervisorStrategy SupervisorStrategy()
{
return Tcp.ConnectionSupervisorStrategy;
}

protected override bool Receive(object message)
{
throw new NotImplementedException();
}

/// <summary>
Expand All @@ -122,7 +179,7 @@ protected override void PostStop()
{
try
{
_socket.Dispose();
_socket?.Dispose();
_saeas?.ForEach(x => x.Dispose());
}
catch (Exception e)
Expand Down