-
-
Notifications
You must be signed in to change notification settings - Fork 514
/
activator_actor.go
143 lines (124 loc) · 3.75 KB
/
activator_actor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package remote
import (
"errors"
"fmt"
"time"
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/log"
)
// Register stores a copy of props under the given kind name so that it can
// later be looked up by kind. Registering the same kind again replaces the
// previous entry.
func (r *Remote) Register(kind string, props *actor.Props) {
	// Dereference first: the lookup table keeps its own value, not the caller's pointer.
	registered := *props
	r.nameLookup[kind] = registered
}
// GetKnownKinds returns the names of all kinds currently registered on this
// Remote. The result is a fresh slice; its order follows map iteration and is
// therefore unspecified.
func (r *Remote) GetKnownKinds() []string {
	kinds := make([]string, len(r.nameLookup))
	i := 0
	for kind := range r.nameLookup {
		kinds[i] = kind
		i++
	}
	return kinds
}
// activator is the actor that answers *ActorPidRequest messages by spawning
// an actor of the requested kind from the props registered on its Remote.
type activator struct {
	remote *Remote // Remote whose nameLookup is consulted for the requested kind
}
// ErrActivatorUnavailable reports that this Activator cannot serve a spawn
// request right now. It carries DoNotPanic, so the Activator answers with the
// UNAVAILABLE status code instead of panicking; Partition is then expected to
// try the next available Activator.
var ErrActivatorUnavailable = &ActivatorError{
	Code:       ResponseStatusCodeUNAVAILABLE.ToInt32(),
	DoNotPanic: true,
}
// ActivatorError is an error carrying a response status code for the
// activator to send back to the requester.
type ActivatorError struct {
	// Code is the status code placed in the ActorPidResponse.
	Code int32
	// DoNotPanic, when true, makes the activator only reply with Code
	// instead of replying and then panicking.
	DoNotPanic bool
}

// Error implements the error interface; the message is the decimal form of Code.
func (e *ActivatorError) Error() string {
	return fmt.Sprintf("%d", e.Code)
}
// ActivatorForAddress builds the PID that addresses the actor with id
// "activator" on the node at the given address.
func (r *Remote) ActivatorForAddress(address string) *actor.PID {
	return actor.NewPID(address, "activator")
}
// SpawnFuture sends an ActorPidRequest for the given name and kind to the
// activator at address and returns the Future for the activator's reply,
// bounded by timeout.
func (r *Remote) SpawnFuture(address, name, kind string, timeout time.Duration) *actor.Future {
	request := &ActorPidRequest{Name: name, Kind: kind}
	target := r.ActivatorForAddress(address)
	return r.actorSystem.Root.RequestFuture(target, request, timeout)
}
// Spawn requests an actor of the given kind at the given address without
// choosing a name for it. It is SpawnNamed with an empty name.
func (r *Remote) Spawn(address, kind string, timeout time.Duration) (*ActorPidResponse, error) {
	const unnamed = ""
	return r.SpawnNamed(address, unnamed, kind, timeout)
}
// SpawnNamed requests an actor of the given kind and name at the given
// address and blocks until the activator replies or the future fails.
//
// It returns the future's error as-is, or a generic error when the reply is
// not an *ActorPidResponse.
func (r *Remote) SpawnNamed(address, name, kind string, timeout time.Duration) (*ActorPidResponse, error) {
	future := r.SpawnFuture(address, name, kind, timeout)
	reply, err := future.Result()
	if err != nil {
		return nil, err
	}
	if resp, ok := reply.(*ActorPidResponse); ok {
		return resp, nil
	}
	return nil, errors.New("remote: Unknown response when remote activating")
}
// newActivatorActor returns a producer that creates activator actors bound
// to the given Remote. Every invocation of the producer yields a new instance.
func newActivatorActor(remote *Remote) actor.Producer {
	produce := func() actor.Actor {
		a := new(activator)
		a.remote = remote
		return a
	}
	return produce
}
// Receive is the activator's message handler.
//
//   - *actor.Started: logs that the activator started.
//   - *Ping: replies with *Pong.
//   - *ActorPidRequest: looks up the props registered for msg.Kind, spawns a
//     child named "Remote$"+name (an auto-generated id is used when msg.Name
//     is empty) and always replies with an *ActorPidResponse before returning
//     or panicking.
//   - system and auto-receive messages are ignored; anything else is logged
//     as an error.
//
// For *ActorPidRequest the reply and follow-up depend on the outcome:
//
//   - success: Pid set, no status code.
//   - name already exists: Pid as returned by SpawnNamed plus
//     PROCESSNAMEALREADYEXIST.
//   - *ActivatorError: its Code; panics afterwards unless DoNotPanic is set.
//   - unknown kind or any other spawn error: ERROR status, then panic.
//
// Spawn errors are classified with errors.Is / errors.As so that wrapped
// errors are still recognised.
func (a *activator) Receive(context actor.Context) {
	switch msg := context.Message().(type) {
	case *actor.Started:
		plog.Info("Started Activator")
	case *Ping:
		context.Respond(&Pong{})
	case *ActorPidRequest:
		props, exist := a.remote.nameLookup[msg.Kind]
		// Unknown kind: tell the requester first, then panic.
		if !exist {
			context.Respond(&ActorPidResponse{
				StatusCode: ResponseStatusCodeERROR.ToInt32(),
			})
			panic(fmt.Errorf("no Props found for kind %s", msg.Kind))
		}

		name := msg.Name
		// Unnamed actor: assign an auto-generated id.
		if name == "" {
			name = context.ActorSystem().ProcessRegistry.NextId()
		}

		pid, err := context.SpawnNamed(&props, "Remote$"+name)

		// In every failure branch the requester is answered before any panic,
		// so it receives a status code rather than waiting for its timeout.
		var aErr *ActivatorError
		switch {
		case err == nil:
			context.Respond(&ActorPidResponse{Pid: pid})
		case errors.Is(err, actor.ErrNameExists):
			context.Respond(&ActorPidResponse{
				Pid:        pid,
				StatusCode: ResponseStatusCodePROCESSNAMEALREADYEXIST.ToInt32(),
			})
		case errors.As(err, &aErr):
			context.Respond(&ActorPidResponse{
				StatusCode: aErr.Code,
			})
			if !aErr.DoNotPanic {
				panic(err)
			}
		default:
			context.Respond(&ActorPidResponse{
				StatusCode: ResponseStatusCodeERROR.ToInt32(),
			})
			panic(err)
		}
	case actor.SystemMessage, actor.AutoReceiveMessage:
		// ignore
	default:
		plog.Error("Activator received unknown message", log.TypeOf("type", msg), log.Message(msg))
	}
}