Skip to content

Commit

Permalink
#1296 Port EventBusUnsubscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
maxim.salamatko committed Oct 8, 2015
1 parent 99b370b commit bd91bcd
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/core/Akka.Tests/Event/EventStreamSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ private class CCATBT : CC, ATT, BTT { }
[Fact]
public void ManageSubscriptions()
{

var bus = new EventStream(true);
bus.StartUnsubscriber(Sys);
bus.Subscribe(TestActor, typeof(M));

bus.Publish(new M { Value = 42 });
Expand Down
4 changes: 3 additions & 1 deletion src/core/Akka/Actor/Internal/ActorSystemImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ public void Start()
if(_settings.LogDeadLetters > 0)
_logDeadLetterListener = SystemActorOf<DeadLetterListener>("deadLetterListener");

_eventStream.StartUnsubscriber(this);

if(_settings.LogConfigOnStart)

if (_settings.LogConfigOnStart)
{
_log.Warning(Settings.ToString());
}
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka/Akka.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@
<Compile Include="Event\DefaultLogMessageFormatter.cs" />
<Compile Include="Event\Error.cs" />
<Compile Include="Event\EventBus.cs" />
<Compile Include="Event\EventBusUnsubscriber.cs" />
<Compile Include="Event\EventStream.cs" />
<Compile Include="Event\EventStreamExtensions.cs" />
<Compile Include="Event\ILogMessageFormatter.cs" />
Expand Down
132 changes: 132 additions & 0 deletions src/core/Akka/Event/EventBusUnsubscriber.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
//-----------------------------------------------------------------------
// <copyright file="EventBusUnsubscribers.cs" company="Akka.NET Project">
// Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
// Copyright (C) 2013-2015 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Util.Internal;

namespace Akka.Event
{
/// <summary>
/// INTERNAL API
///
/// Watches all actors which subscribe on the given eventStream, and unsubscribes them from it when they are Terminated.
///
/// Assumptions note:
/// We do not guarantee happens-before in the EventStream when 2 threads subscribe(a) / unsubscribe(a) on the same actor,
/// thus the messages sent to this actor may appear to be reordered - this is fine, because the worst-case is starting to
/// needlessly watch the actor which will not cause trouble for the stream. This is a trade-off between slowing down
/// subscribe calls * because of the need of linearizing the history message sequence and the possibility of sometimes
/// watching a few actors too much - we opt for the 2nd choice here.
/// </summary>
class EventStreamUnsubscriber : ActorBase
{
private readonly EventStream _eventStream;
private readonly bool _debug;
private readonly ActorSystem _system;

public EventStreamUnsubscriber(EventStream eventStream, ActorSystem system, bool debug)
{
_eventStream = eventStream;
_system = system;
_debug = debug;

}

protected override bool Receive(object message)
{
return message.Match().With<Register>(register =>
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("watching {0} in order to unsubscribe from EventStream when it terminates", register.Actor)));
Context.Watch(register.Actor);
}).With<UnregisterIfNoMoreSubscribedChannels>(unregister =>
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("unwatching {0} since has no subscriptions", unregister.Actor)));
Context.Unwatch(unregister.Actor);
}).With<Terminated>(terminated =>
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("unsubscribe {0} from {1}, because it was terminated", terminated.Actor , _eventStream )));
_eventStream.Unsubscribe(terminated.Actor);
})
.WasHandled;
}

protected override void PreStart()
{
if (_debug)
_eventStream.Publish(new Debug(this.GetType().Name, GetType(),
string.Format("registering unsubscriber with {0}", _eventStream)));
_eventStream.InitUnsubscriber(Self, _system);
}

internal class Register
{
public Register(IActorRef actor)
{
Actor = actor;
}

public IActorRef Actor { get; private set; }
}


internal class Terminated
{
public Terminated(IActorRef actor)
{
Actor = actor;
}

public IActorRef Actor { get; private set; }
}

internal class UnregisterIfNoMoreSubscribedChannels
{
public UnregisterIfNoMoreSubscribedChannels(IActorRef actor)
{
Actor = actor;
}

public IActorRef Actor { get; private set; }
}
}



/// <summary>
/// Provides factory for Akka.Event.EventStreamUnsubscriber actors with unique names.
/// This is needed if someone spins up more EventStreams using the same ActorSystem,
/// each stream gets it's own unsubscriber.
/// </summary>
class EventStreamUnsubscribersProvider
{
private readonly AtomicCounter _unsubscribersCounter = new AtomicCounter(0);
private static readonly EventStreamUnsubscribersProvider _instance = new EventStreamUnsubscribersProvider();


public static EventStreamUnsubscribersProvider Instance
{
get { return _instance; }
}

public void Start(ActorSystem system, EventStream eventStream, bool debug)
{
system.ActorOf(Props.Create<EventStreamUnsubscriber>(eventStream, system, debug),
string.Format("EventStreamUnsubscriber-{0}", _unsubscribersCounter.IncrementAndGet()));
}
}
}
89 changes: 86 additions & 3 deletions src/core/Akka/Event/EventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Util;
using Akka.Util.Internal;
using Akka.Util.Internal.Collections;

namespace Akka.Event
{
Expand All @@ -25,6 +31,9 @@ public class EventStream : LoggingBus
/// </summary>
private readonly bool _debug;


private readonly AtomicReference<Either<IImmutableSet<IActorRef>, IActorRef>> _initiallySubscribedOrUnsubscriber =
new AtomicReference<Either<IImmutableSet<IActorRef>, IActorRef>>();
/// <summary>
/// Initializes a new instance of the <see cref="EventStream"/> class.
/// </summary>
Expand All @@ -50,7 +59,7 @@ public override bool Subscribe(IActorRef subscriber, Type channel)
{
Publish(new Debug(SimpleName(this), GetType(), "subscribing " + subscriber + " to channel " + channel));
}

RegisterWithUnsubscriber(subscriber);
return base.Subscribe(subscriber, channel);
}

Expand All @@ -70,7 +79,7 @@ public override bool Unsubscribe(IActorRef subscriber, Type channel)
{
Publish(new Debug(SimpleName(this), GetType(), "unsubscribing " + subscriber + " from channel " + channel));
}

UnregisterIfNoMoreSubscribedChannels(subscriber);
return base.Unsubscribe(subscriber, channel);
}

Expand All @@ -89,9 +98,83 @@ public override bool Unsubscribe(IActorRef subscriber)
{
Publish(new Debug(SimpleName(this), GetType(), "unsubscribing " + subscriber + " from all channels"));
}

UnregisterIfNoMoreSubscribedChannels(subscriber);
return base.Unsubscribe(subscriber);
}

public void StartUnsubscriber(ActorSystem system)
{
EventStreamUnsubscribersProvider.Instance.Start(system, this, _debug);
}

public bool InitUnsubscriber(IActorRef unsubscriber, ActorSystem system)
{
if (system == null)
{
return false;
}
return _initiallySubscribedOrUnsubscriber.Match().With<Left<IImmutableSet<IActorRef>>>(v =>
{
if (_initiallySubscribedOrUnsubscriber.CompareAndSet(v, Either.Right(unsubscriber)))
{
if (_debug)
{
Publish(new Debug(SimpleName(this), GetType(),
string.Format("initialized unsubscriber to: {0} registering {1} initial subscribers with it", unsubscriber, v.Value.Count)));
}
v.Value.ForEach(RegisterWithUnsubscriber);
}
else
{
InitUnsubscriber(unsubscriber, system);
}
}).With<Right<IActorRef>>(presentUnsubscriber =>
{
if (_debug)
{
Publish(new Debug(SimpleName(this), GetType(),
string.Format("not using unsubscriber {0}, because already initialized with {1}", unsubscriber, presentUnsubscriber)));
}
}).WasHandled;
}

private void RegisterWithUnsubscriber(IActorRef subscriber)
{
_initiallySubscribedOrUnsubscriber.Match().With<Left<IImmutableSet<IActorRef>>>(v =>
{
if (_initiallySubscribedOrUnsubscriber.CompareAndSet(v,
Either.Left<IImmutableSet<IActorRef>>(v.Value.Add(subscriber))))
{
RegisterWithUnsubscriber(subscriber);
}
}).With<Right<IActorRef>>(unsubscriber =>
{
unsubscriber.Value.Tell( new EventStreamUnsubscriber.Register(subscriber));
});
}

private void UnregisterIfNoMoreSubscribedChannels(IActorRef subscriber)
{
_initiallySubscribedOrUnsubscriber.Match().With<Left<IImmutableSet<IActorRef>>>(v =>
{
if (_initiallySubscribedOrUnsubscriber.CompareAndSet(v,
Either.Left<IImmutableSet<IActorRef>>(v.Value.Remove(subscriber))))
{
UnregisterIfNoMoreSubscribedChannels(subscriber);
}
}).With<Right<IActorRef>>(unsubscriber =>
{
unsubscriber.Value.Tell(new EventStreamUnsubscriber.UnregisterIfNoMoreSubscribedChannels(subscriber));
});
}
}
}

0 comments on commit bd91bcd

Please sign in to comment.