Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Actor does not receive "Terminated" message if remoting is used and it is not the monitored actor's parent #1646

Closed
akoshelev opened this issue Jan 14, 2016 · 8 comments

Comments

@akoshelev
Copy link

Hi,

I am trying to implement the Reaper of Souls pattern in Akka.NET. When remoting is enabled, my Reaper actor does not receive the Terminated message from its watchees if the watchee is already dead at the time the Reaper receives the WatchMe message. The direct parent of the watchees, however, receives these messages regardless of the watchee's state. Here is the code to reproduce this problem (you'll need three separate C# projects):

/* system1.csproj */
namespace System1
{
    /// <summary>
    /// Entry point of the "system1" node. Its configuration enables the remote actor ref
    /// provider, listens on localhost:8090 and deploys the "/coordinator" actor to
    /// akka.tcp://system2@localhost:8080.
    /// </summary>
    internal class Program
    {
        private static void Main(string[] args)
        {
            // HOCON configuration for this node, parsed before the actor system is created.
            var config = ConfigurationFactory.ParseString(@"
                akka {  
                    log-config-on-start = on
                    stdout-loglevel = INFO
                    loglevel = INFO
                    actor {
                        provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""

                        debug {  
                          receive = on 
                          autoreceive = on
                          lifecycle = on
                          event-stream = on
                          unhandled = on
                        }

                        deployment {
                            /coordinator {
                               remote = ""akka.tcp://system2@localhost:8080""
                            }
                        }
                    }
                    remote {
                       helios.tcp {
                           port = 8090
                           hostname = localhost
                       }
                    }
                }
                ");

            using (var system = ActorSystem.Create("system1", config))
            {
                // Create the coordinator under the name matched by the deployment section
                // and kick off the reproduction scenario.
                var coordinator = system.ActorOf(Props.Create(() => new Coordinator()), "coordinator");
                coordinator.Tell("Start");

                // Keep the actor system alive until a line is read from the console.
                Console.ReadLine();
            }
        }
    }
}

/* system2.csproj */
namespace System2
{
    /// <summary>
    /// Entry point of the "system2" node. Its configuration enables the remote actor ref
    /// provider and listens on localhost:8080; the process creates no actors of its own.
    /// </summary>
    internal class Program
    {
        private static void Main(string[] args)
        {
            // HOCON configuration for this node, parsed before the actor system is created.
            var config = ConfigurationFactory.ParseString(@"
                               akka {  
                    log-config-on-start = on
                    stdout-loglevel = DEBUG
                    loglevel = INFO
                    actor {
                        provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""

                        debug {  
                          receive = on 
                          autoreceive = on
                          lifecycle = on
                          event-stream = on
                          unhandled = on
                        }
                    }
                    remote {
                        helios.tcp {
                            port = 8080
                            hostname = localhost
                        }
                    }
                }
                ");

            using (var system = ActorSystem.Create("system2", config))
            {
                // Keep the actor system alive until a line is read from the console.
                Console.ReadLine();
            }
        }
    }
}

/* ClassLibrary.csproj */
namespace ClassLibrary1
{
    /// <summary>
    /// Actor that stops itself as soon as it receives the string "Start".
    /// </summary>
    public class Watchee : ReceiveActor
    {
        public Watchee()
        {
            // Only the exact string "Start" is handled; the handler ignores its payload.
            Receive<string>(command => command == "Start", command => StopSelf());
        }

        /// <summary>
        /// Prints a "stopping" line for this actor and then asks the context to stop it.
        /// </summary>
        private void StopSelf()
        {
            Console.WriteLine("{0} stopping", Self);
            Context.Stop(Self);
        }
    }

    /// <summary>
    /// Watches every actor announced through a <see cref="WatchMe"/> message and tells the
    /// actor passed to the constructor <see cref="AllSoulsReaped"/> whenever a
    /// <see cref="Terminated"/> message leaves the set of watched actors empty.
    /// </summary>
    public class Reaper : ReceiveActor
    {
        // Actors this reaper has been asked to watch and has not yet seen terminate.
        private readonly HashSet<IActorRef> _souls = new HashSet<IActorRef>();

        // Recipient of the AllSoulsReaped notification.
        private readonly IActorRef _parent;

        /// <param name="parent">Actor that is told <see cref="AllSoulsReaped"/>.</param>
        public Reaper(IActorRef parent)
        {
            _parent = parent;

            Receive<WatchMe>(request => StartWatching(request.Watchee));
            Receive<Terminated>(notice => OnTerminated(notice.ActorRef));
        }

        /// <summary>
        /// Logs the request, remembers the watchee and registers a death watch on it.
        /// </summary>
        private void StartWatching(IActorRef soul)
        {
            Console.WriteLine("{0} watching", soul);
            _souls.Add(soul);
            Context.Watch(soul);
        }

        /// <summary>
        /// Forgets the terminated actor, logs how many are left and notifies the parent
        /// when none remain.
        /// </summary>
        private void OnTerminated(IActorRef soul)
        {
            _souls.Remove(soul);
            Console.WriteLine("{0} terminated, {1} still watching", soul, _souls.Count);

            if (_souls.Count != 0)
            {
                return;
            }

            _parent.Tell(new AllSoulsReaped());
        }
    }

    /// <summary>
    /// Marker message without payload; the Reaper sends it once its set of watched
    /// actors is empty.
    /// </summary>
    public sealed class AllSoulsReaped { }

    /// <summary>
    /// Message asking its recipient to watch the given actor.
    /// </summary>
    public sealed class WatchMe
    {
        /// <summary>The actor that should be watched.</summary>
        public IActorRef Watchee { get; private set; }

        /// <param name="watchee">The actor that should be watched.</param>
        public WatchMe(IActorRef watchee)
        {
            this.Watchee = watchee;
        }
    }

    /// <summary>
    /// Drives the reproduction: on "Start" it creates two <see cref="Watchee"/> children,
    /// tells them to stop, waits, and then watches them both directly and through a
    /// <see cref="Reaper"/> child. It prints every <see cref="Terminated"/> and
    /// <see cref="AllSoulsReaped"/> message it receives.
    /// </summary>
    public class Coordinator : ReceiveActor
    {
        // Child reaper that is asked to watch the watchees on this actor's behalf.
        private readonly IActorRef _reaper;

        public Coordinator()
        {
            _reaper = Context.ActorOf(Props.Create(() => new Reaper(Self)));

            Receive<string>(text => text == "Start", text => RunScenario());

            Receive<AllSoulsReaped>(reaped => Console.WriteLine("All souls reaped"));

            Receive<Terminated>(notice =>
                Console.WriteLine("{0} Receive terminated from {1}", Self, notice.ActorRef));
        }

        /// <summary>
        /// Creates two watchees, tells each to stop, blocks for two seconds and only then
        /// registers the death watches.
        /// </summary>
        private void RunScenario()
        {
            var first = Context.ActorOf(Props.Create<Watchee>());
            first.Tell("Start");

            var second = Context.ActorOf(Props.Create<Watchee>());
            second.Tell("Start");

            // Deliberate blocking wait: by the time it ends, both watchees are
            // expected to have stopped already.
            Thread.Sleep(TimeSpan.FromSeconds(2));

            // Watching the dead children directly from their parent seems to work.
            Context.Watch(first);
            Context.Watch(second);

            // When another actor tries to monitor them, it never receives "Terminated".
            _reaper.Tell(new WatchMe(first));
            _reaper.Tell(new WatchMe(second));
        }
    }
}

I'm receiving the following output

[akka://system2/remote/akka.tcp/system1@localhost:8090/user/coordinator/$c] stopping
[akka://system2/remote/akka.tcp/system1@localhost:8090/user/coordinator/$d] stopping
[akka://system2/remote/akka.tcp/system1@localhost:8090/user/coordinator/$c] watching
[akka://system2/remote/akka.tcp/system1@localhost:8090/user/coordinator/$d] watching
[akka://system2/remote/akka.tcp/system1@localhost:8090/user/coordinator] Receive terminated from [akka://system2/remote/akka.tcp/system1@localhost:8090/user/coordinator/$d]
[akka://system2/remote/akka.tcp/system1@localhost:8090/user/coordinator] Receive terminated from [akka://system2/remote/akka.tcp/system1@localhost:8090/user/coordinator/$c]

so Coordinator receives Terminated messages while Reaper does not. But if I disable remoting, I'm getting the following:

[akka://system1/user/coordinator/$c] stopping
[akka://system1/user/coordinator/$d] stopping
[akka://system1/user/coordinator/$c] watching
[akka://system1/user/coordinator/$d] watching
[akka://system1/user/coordinator/$c] terminated, 1 still watching
[akka://system1/user/coordinator/$d] terminated, 0 still watching
[akka://system1/user/coordinator] Receive terminated from [akka://system1/user/coordinator/$c]
All souls reaped
[akka://system1/user/coordinator] Receive terminated from [akka://system1/user/coordinator/$d]

So in that case, Reaper correctly receives Terminated messages.

According to the Akka docs, it does not matter whether the actor is already dead at the moment we call Context.Watch (link):

It should be noted that the Terminated message is generated independent of the order in which registration and termination occur. In particular, the watching actor will receive a Terminated message even if the watched actor has already been terminated at the time of registration.

@rogeralsing
Copy link
Contributor

I have tried your example and can verify that this occurs.
I don't know whether JVM Akka behaves the same way or whether this is an actual bug; I'm digging into it.

@rogeralsing rogeralsing self-assigned this Jan 17, 2016
@rogeralsing
Copy link
Contributor

The issue is somewhere in the RemoteActorRefProvider.
Even if I remove the remote deployment but still use the remote ActorRef provider, the problem still exists.

Disabling the remote actorref provider and using the local provider, the system behaves as expected.

@rogeralsing
Copy link
Contributor

Marking this as critical: it looks like actors cannot watch other actors (except for their own children) when using the remote ActorRef provider.

@maxim-s
Copy link
Contributor

maxim-s commented Jan 17, 2016

It looks like I have the same problem with the spec implementation. I've already found some bugs, but I can investigate this case.

@rogeralsing
Copy link
Contributor

I've managed to track the issue somewhat.

        /// <summary>
        /// Starts watching <paramref name="subject"/> unless it is this actor itself or is
        /// already being watched.
        /// </summary>
        /// <param name="subject">The actor to watch; it is cast to <see cref="IInternalActorRef"/>.</param>
        /// <returns>The subject, as the internal actor reference it was cast to.</returns>
        public IActorRef Watch(IActorRef subject)
        {
            var watchee = (IInternalActorRef)subject;

            // Nothing to register when watching ourselves or an actor we already watch.
            if (watchee.Equals(Self) || WatchingContains(watchee))
            {
                return watchee;
            }

            MaintainAddressTerminatedSubscription(() =>
            {
                // Investigation note: for an already terminated watchee, the Watch message
                // sent here is forwarded to the dead-letter queue. With the local actor ref
                // provider that somehow triggers a termination message; with the remote
                // actor ref provider it does not.
                watchee.Tell(new Watch(watchee, Self));
                _state = _state.AddWatching(watchee);
            }, watchee);

            return watchee;
        }

If I comment out the a.Tell(new Watch(a, Self)); when the LocalActorRefProvider is used, I get the same issue as when the RemoteActorRefProvider is used.

In this case, the target actor is already terminated, so the Tell forwards the Watch message to deadletter.

This happens with both the local and the remote ActorRef provider, but I suspect there is some difference between the two, e.g. that a dead-letter subscription which does something with the Watch message on the local provider is missing on the remote one.

I'm continuing to dig through this; any pointers or ideas are welcome.

@rogeralsing
Copy link
Contributor

/// <summary>
/// Dead-letter actor reference: publishes dead letters to the event stream, except for
/// <see cref="Watch"/> messages, which are answered with a <see cref="DeathWatchNotification"/>.
/// </summary>
public class DeadLetterActorRef : EmptyLocalActorRef
{
    private readonly EventStream _eventStream;

    public DeadLetterActorRef(IActorRefProvider provider, ActorPath path, EventStream eventStream)
        : base(provider, path, eventStream)
    {
        _eventStream = eventStream;
    }

    //TODO: Since this isn't overriding SendUserMessage it doesn't handle all messages as Akka JVM does

    /// <summary>
    /// Publishes the dead letter to the event stream unless its message was consumed by
    /// <see cref="SpecialHandle"/>.
    /// </summary>
    protected override void HandleDeadLetter(DeadLetter deadLetter)
    {
        var consumed = SpecialHandle(deadLetter.Message, deadLetter.Sender);
        if (consumed)
        {
            return;
        }

        _eventStream.Publish(deadLetter);
    }

    /// <summary>
    /// Dead-letter handling of <see cref="Watch"/> messages on the local system; every
    /// other message is delegated to the base implementation.
    /// </summary>
    /// <returns>
    /// <c>true</c> for any <see cref="Watch"/> message; otherwise the result of the base
    /// implementation.
    /// </returns>
    protected override bool SpecialHandle(object message, IActorRef sender)
    {
        var watch = message as Watch;
        if (watch == null)
        {
            return base.SpecialHandle(message, sender);
        }

        // A watch that involves this dead-letter reference on either side gets no reply.
        var involvesThisRef = watch.Watchee == this || watch.Watcher == this;
        if (!involvesThisRef)
        {
            // Trigger death watch for already dead actors.
            watch.Watcher.Tell(new DeathWatchNotification(watch.Watchee, existenceConfirmed: false, addressTerminated: false));
        }

        return true;
    }
}

@rogeralsing
Copy link
Contributor

I found the cause for this, see #1649

Aaronontheweb added a commit that referenced this issue Jan 18, 2016
@rogeralsing
Copy link
Contributor

Fixed by #1649

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants