Permalink
Browse files

rename ActorRef to Process for consistency with ProcessRegistry

  • Loading branch information...
stuartcarnie committed Jan 12, 2017
1 parent cc7e4a5 commit 06d598dc6f9d7771b5b47c7aeaedd77fcdac26f6
View
@@ -165,22 +165,22 @@ func (m *mockContext) Actor() Actor {
return args.Get(0).(Actor)
}
type mockActorRef struct {
type mockProcess struct {
mock.Mock
}
func (m *mockActorRef) SendUserMessage(pid *PID, message interface{}, sender *PID) {
func (m *mockProcess) SendUserMessage(pid *PID, message interface{}, sender *PID) {
m.Called(pid, message, sender)
}
func (m *mockActorRef) SendSystemMessage(pid *PID, message SystemMessage) {
func (m *mockProcess) SendSystemMessage(pid *PID, message SystemMessage) {
m.Called(pid, message)
}
func (m *mockActorRef) Stop(pid *PID) {
func (m *mockProcess) Stop(pid *PID) {
m.Called(pid)
}
func (m *mockActorRef) Watch(pid *PID) {
func (m *mockProcess) Watch(pid *PID) {
m.Called(pid)
}
func (m *mockActorRef) Unwatch(pid *PID) {
func (m *mockProcess) Unwatch(pid *PID) {
m.Called(pid)
}
View
@@ -1,9 +1,9 @@
package actor
type deadLetterActorRef struct{}
type deadLetterProcess struct{}
var (
deadLetter ActorRef = &deadLetterActorRef{}
deadLetter Process = &deadLetterProcess{}
)
type DeadLetter struct {
@@ -12,29 +12,29 @@ type DeadLetter struct {
Sender *PID
}
func (*deadLetterActorRef) SendUserMessage(pid *PID, message interface{}, sender *PID) {
func (*deadLetterProcess) SendUserMessage(pid *PID, message interface{}, sender *PID) {
EventStream.Publish(&DeadLetter{
PID: pid,
Message: message,
Sender: sender,
})
}
func (*deadLetterActorRef) SendSystemMessage(pid *PID, message SystemMessage) {
func (*deadLetterProcess) SendSystemMessage(pid *PID, message SystemMessage) {
EventStream.Publish(&DeadLetter{
PID: pid,
Message: message,
})
}
func (ref *deadLetterActorRef) Stop(pid *PID) {
func (ref *deadLetterProcess) Stop(pid *PID) {
ref.SendSystemMessage(pid, stopMessage)
}
func (ref *deadLetterActorRef) Watch(pid *PID) {
func (ref *deadLetterProcess) Watch(pid *PID) {
ref.SendSystemMessage(pid, &Watch{Watcher: pid})
}
func (ref *deadLetterActorRef) Unwatch(pid *PID) {
func (ref *deadLetterProcess) Unwatch(pid *PID) {
ref.SendSystemMessage(pid, &Unwatch{Watcher: pid})
}
View
@@ -14,7 +14,7 @@ var (
func NewFuture(timeout time.Duration) *Future {
fut := &Future{cond: sync.NewCond(&sync.Mutex{})}
ref := &FutureActorRef{f: fut}
ref := &futureProcess{f: fut}
id := ProcessRegistry.getAutoId()
pid, ok := ProcessRegistry.add(ref, id)
@@ -84,22 +84,22 @@ func (f *Future) Wait() error {
return f.err
}
// FutureActorRef is a struct carrying a response PID and a channel where the response is placed
type FutureActorRef struct {
// futureProcess is a struct carrying a response PID and a channel where the response is placed
type futureProcess struct {
f *Future
}
func (ref *FutureActorRef) SendUserMessage(pid *PID, message interface{}, sender *PID) {
func (ref *futureProcess) SendUserMessage(pid *PID, message interface{}, sender *PID) {
ref.f.result = message
ref.Stop(pid)
}
func (ref *FutureActorRef) SendSystemMessage(pid *PID, message SystemMessage) {
func (ref *futureProcess) SendSystemMessage(pid *PID, message SystemMessage) {
ref.f.result = message
ref.Stop(pid)
}
func (ref *FutureActorRef) Stop(pid *PID) {
func (ref *futureProcess) Stop(pid *PID) {
ref.f.cond.L.Lock()
if ref.f.done {
ref.f.cond.L.Unlock()
@@ -114,5 +114,5 @@ func (ref *FutureActorRef) Stop(pid *PID) {
ref.f.cond.Signal()
}
func (ref *FutureActorRef) Watch(pid *PID) {}
func (ref *FutureActorRef) Unwatch(pid *PID) {}
func (ref *futureProcess) Watch(pid *PID) {}
func (ref *futureProcess) Unwatch(pid *PID) {}
@@ -18,7 +18,7 @@ func NewBlackHoleActor() Actor {
return &BlackHoleActor{}
}
func TestSpawnProducesActorRef(t *testing.T) {
func TestSpawnProducesProcess(t *testing.T) {
actor := Spawn(FromProducer(NewBlackHoleActor))
defer actor.Stop()
assert.NotNil(t, actor)
View

This file was deleted.

Oops, something went wrong.
View
@@ -0,0 +1,35 @@
package actor
type localProcess struct {
mailbox Mailbox
}
func newLocalProcess(mailbox Mailbox) *localProcess {
return &localProcess{
mailbox: mailbox,
}
}
func (ref *localProcess) SendUserMessage(pid *PID, message interface{}, sender *PID) {
if sender != nil {
ref.mailbox.PostUserMessage(&Request{Message: message, Sender: sender})
} else {
ref.mailbox.PostUserMessage(message)
}
}
func (ref *localProcess) SendSystemMessage(pid *PID, message SystemMessage) {
ref.mailbox.PostSystemMessage(message)
}
func (ref *localProcess) Stop(pid *PID) {
ref.SendSystemMessage(pid, stopMessage)
}
func (ref *localProcess) Watch(pid *PID) {
ref.SendSystemMessage(pid, &Watch{Watcher: pid})
}
func (ref *localProcess) Unwatch(pid *PID) {
ref.SendSystemMessage(pid, &Unwatch{Watcher: pid})
}
View
@@ -41,7 +41,7 @@ func (pid *PID) StopFuture() *Future {
future := NewFuture(10 * time.Second)
ref, ok := ref.(*localActorRef)
ref, ok := ref.(*localProcess)
if !ok {
log.Fatalf("[ACTOR] Trying to stop non local actorref %s", reflect.TypeOf(ref))
}
@@ -1,7 +1,7 @@
package actor
//ActorRef is an interface that defines the base contract for interaction of actors
type ActorRef interface {
// Process is an interface that defines the base contract for interaction of actors
type Process interface {
SendUserMessage(pid *PID, message interface{}, sender *PID)
SendSystemMessage(pid *PID, message SystemMessage)
Stop(pid *PID)
View
@@ -22,7 +22,7 @@ var (
}
)
type AddressResolver func(*PID) (ActorRef, bool)
type AddressResolver func(*PID) (Process, bool)
func (pr *ProcessRegistryValue) RegisterAddressResolver(handler AddressResolver) {
pr.RemoteHandlers = append(pr.RemoteHandlers, handler)
@@ -55,7 +55,7 @@ func (pr *ProcessRegistryValue) getAutoId() string {
return uint64ToId(counter)
}
func (pr *ProcessRegistryValue) add(actorRef ActorRef, id string) (*PID, bool) {
func (pr *ProcessRegistryValue) add(actorRef Process, id string) (*PID, bool) {
pid := PID{
Address: pr.Address,
@@ -70,7 +70,7 @@ func (pr *ProcessRegistryValue) remove(pid *PID) {
pr.LocalPids.Remove(pid.Id)
}
func (pr *ProcessRegistryValue) get(pid *PID) (ActorRef, bool) {
func (pr *ProcessRegistryValue) get(pid *PID) (Process, bool) {
if pid == nil {
panic("Pid may not be nil")
}
@@ -87,13 +87,13 @@ func (pr *ProcessRegistryValue) get(pid *PID) (ActorRef, bool) {
if !ok {
return deadLetter, false
}
return ref.(ActorRef), true
return ref.(Process), true
}
func (pr *ProcessRegistryValue) GetLocal(id string) (ActorRef, bool) {
func (pr *ProcessRegistryValue) GetLocal(id string) (Process, bool) {
ref, ok := pr.LocalPids.Get(id)
if !ok {
return deadLetter, false
}
return ref.(ActorRef), true
return ref.(Process), true
}
View
@@ -52,20 +52,20 @@ func spawnRouter(id string, config RouterConfig, props Props, parent *PID) *PID
routerId := ProcessRegistry.getAutoId()
router := spawn(routerId, routerProps, parent)
ref := &RouterActorRef{
ref := &routerProcess{
router: router,
state: routerState,
}
proxy, _ := ProcessRegistry.add(ref, id)
return proxy
}
type RouterActorRef struct {
type routerProcess struct {
router *PID
state RouterState
}
func (ref *RouterActorRef) SendUserMessage(pid *PID, message interface{}, sender *PID) {
func (ref *routerProcess) SendUserMessage(pid *PID, message interface{}, sender *PID) {
if _, ok := message.(RouterManagementMessage); ok {
r, _ := ProcessRegistry.get(ref.router)
r.SendUserMessage(pid, message, sender)
@@ -74,20 +74,20 @@ func (ref *RouterActorRef) SendUserMessage(pid *PID, message interface{}, sender
}
}
func (ref *RouterActorRef) Watch(pid *PID) {
func (ref *routerProcess) Watch(pid *PID) {
ref.SendSystemMessage(pid, &Watch{Watcher: pid})
}
func (ref *RouterActorRef) Unwatch(pid *PID) {
func (ref *routerProcess) Unwatch(pid *PID) {
ref.SendSystemMessage(pid, &Unwatch{Watcher: pid})
}
func (ref *RouterActorRef) SendSystemMessage(pid *PID, message SystemMessage) {
func (ref *routerProcess) SendSystemMessage(pid *PID, message SystemMessage) {
r, _ := ProcessRegistry.get(ref.router)
r.SendSystemMessage(pid, message)
}
func (ref *RouterActorRef) Stop(pid *PID) {
func (ref *routerProcess) Stop(pid *PID) {
ref.SendSystemMessage(pid, stopMessage)
}
@@ -63,7 +63,7 @@ func TestRouterActor_Receive_BroadcastMessage(t *testing.T) {
p1 := NewLocalPID("p1")
p2 := NewLocalPID("p2")
child := new(mockActorRef)
child := new(mockProcess)
child.On("SendUserMessage", mock.Anything, mock.Anything, mock.Anything).Times(2)
ProcessRegistry.add(child, "p1")
View
@@ -20,7 +20,7 @@ func spawn(id string, props Props, parent *PID) *PID {
cell := NewActorCell(props, parent)
mailbox := props.ProduceMailbox()
ref := newLocalActorRef(mailbox)
ref := newLocalProcess(mailbox)
pid, new := ProcessRegistry.add(ref, id)
if new {
@@ -2,7 +2,7 @@ package remoting
import "github.com/AsynkronIT/protoactor-go/actor"
func remoteHandler(pid *actor.PID) (actor.ActorRef, bool) {
ref := newRemoteActorRef(pid)
func remoteHandler(pid *actor.PID) (actor.Process, bool) {
ref := newRemoteProcess(pid)
return ref, true
}
@@ -8,17 +8,17 @@ import (
"github.com/gogo/protobuf/proto"
)
type remoteActorRef struct {
type remoteProcess struct {
pid *actor.PID
}
func newRemoteActorRef(pid *actor.PID) actor.ActorRef {
return &remoteActorRef{
func newRemoteProcess(pid *actor.PID) actor.Process {
return &remoteProcess{
pid: pid,
}
}
func (ref *remoteActorRef) SendUserMessage(pid *actor.PID, message interface{}, sender *actor.PID) {
func (ref *remoteProcess) SendUserMessage(pid *actor.PID, message interface{}, sender *actor.PID) {
sendRemoteMessage(pid, message, sender)
}
@@ -32,7 +32,7 @@ func sendRemoteMessage(pid *actor.PID, message interface{}, sender *actor.PID) {
}
}
func (ref *remoteActorRef) SendSystemMessage(pid *actor.PID, message actor.SystemMessage) {
func (ref *remoteProcess) SendSystemMessage(pid *actor.PID, message actor.SystemMessage) {
//intercept any Watch messages and direct them to the endpoint manager
switch msg := message.(type) {
@@ -53,14 +53,14 @@ func (ref *remoteActorRef) SendSystemMessage(pid *actor.PID, message actor.Syste
}
}
func (ref *remoteActorRef) Stop(pid *actor.PID) {
func (ref *remoteProcess) Stop(pid *actor.PID) {
ref.SendSystemMessage(pid, &actor.Stop{})
}
func (ref *remoteActorRef) Watch(pid *actor.PID) {
func (ref *remoteProcess) Watch(pid *actor.PID) {
ref.SendSystemMessage(pid, &actor.Watch{Watcher: pid})
}
func (ref *remoteActorRef) Unwatch(pid *actor.PID) {
func (ref *remoteProcess) Unwatch(pid *actor.PID) {
ref.SendSystemMessage(pid, &actor.Unwatch{Watcher: pid})
}

0 comments on commit 06d598d

Please sign in to comment.