-
-
Notifications
You must be signed in to change notification settings - Fork 531
/
Copy pathpid.go
154 lines (128 loc) · 3.71 KB
/
pid.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package actor
import (
// "fmt"
// "github.com/gogo/protobuf/jsonpb"
"strings"
"sync/atomic"
"time"
"unsafe"
)
type PID struct {
Address string `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"`
Id string `protobuf:"bytes,2,opt,name=Id,proto3" json:"Id,omitempty"`
p *Process
}
/*
func (m *PID) MarshalJSONPB(*jsonpb.Marshaler) ([]byte, error) {
str := fmt.Sprintf("{\"Address\":\"%v\", \"Id\":\"%v\"}", m.Address, m.Id)
return []byte(str), nil
}*/
func (pid *PID) ref() Process {
p := (*Process)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&pid.p))))
if p != nil {
if l, ok := (*p).(*localProcess); ok && atomic.LoadInt32(&l.dead) == 1 {
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&pid.p)), nil)
} else {
return *p
}
}
ref, exists := ProcessRegistry.Get(pid)
if exists {
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&pid.p)), unsafe.Pointer(&ref))
}
return ref
}
// Tell sends a messages asynchronously to the PID
func (pid *PID) Tell(message interface{}) {
pid.ref().SendUserMessage(pid, message)
}
// Request sends a messages asynchronously to the PID. The actor may send a response back via respondTo, which is
// available to the receiving actor via Context.Sender
func (pid *PID) Request(message interface{}, respondTo *PID) {
env := &MessageEnvelope{
Message: message,
Header: nil,
Sender: respondTo,
}
pid.ref().SendUserMessage(pid, env)
}
// RequestFuture sends a message to a given PID and returns a Future
func (pid *PID) RequestFuture(message interface{}, timeout time.Duration) *Future {
future := NewFuture(timeout)
env := &MessageEnvelope{
Message: message,
Header: nil,
Sender: future.PID(),
}
pid.ref().SendUserMessage(pid, env)
return future
}
func (pid *PID) sendSystemMessage(message interface{}) {
pid.ref().SendSystemMessage(pid, message)
}
// StopFuture will stop actor immediately regardless of existing user messages in mailbox, and return its future.
func (pid *PID) StopFuture() *Future {
future := NewFuture(10 * time.Second)
pid.sendSystemMessage(&Watch{Watcher: future.pid})
pid.Stop()
return future
}
// GracefulStop will wait actor to stop immediately regardless of existing user messages in mailbox
func (pid *PID) GracefulStop() {
pid.StopFuture().Wait()
}
// Stop will stop actor immediately regardless of existing user messages in mailbox.
func (pid *PID) Stop() {
pid.ref().Stop(pid)
}
// PoisonFuture will tell actor to stop after processing current user messages in mailbox, and return its future.
func (pid *PID) PoisonFuture() *Future {
future := NewFuture(10 * time.Second)
pid.sendSystemMessage(&Watch{Watcher: future.pid})
pid.Poison()
return future
}
// GracefulPoison will tell and wait actor to stop after processing current user messages in mailbox.
func (pid *PID) GracefulPoison() {
pid.PoisonFuture().Wait()
}
// Poison will tell actor to stop after processing current user messages in mailbox.
func (pid *PID) Poison() {
pid.Tell(&PoisonPill{})
}
func pidFromKey(key string, p *PID) {
i := strings.IndexByte(key, '#')
if i == -1 {
p.Address = ProcessRegistry.Address
p.Id = key
} else {
p.Address = key[:i]
p.Id = key[i+1:]
}
}
func (pid *PID) key() string {
if pid.Address == ProcessRegistry.Address {
return pid.Id
}
return pid.Address + "#" + pid.Id
}
func (pid *PID) String() string {
if pid == nil {
return "nil"
}
return pid.Address + "/" + pid.Id
}
//NewPID returns a new instance of the PID struct
func NewPID(address, id string) *PID {
return &PID{
Address: address,
Id: id,
}
}
//NewLocalPID returns a new instance of the PID struct with the address preset
func NewLocalPID(id string) *PID {
return &PID{
Address: ProcessRegistry.Address,
Id: id,
}
}