/
inbox.go
58 lines (48 loc) · 1004 Bytes
/
inbox.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package actor
import (
"runtime"
"github.com/charlesderek/actor-model/ggq"
"github.com/charlesderek/actor-model/log"
)
// LOCK_OS_THREAD controls whether the goroutine launched by Inbox.Start calls
// runtime.LockOSThread before it starts reading. Start copies the value into a
// local before spawning the goroutine, so changing it afterwards does not
// affect goroutines that are already running.
var LOCK_OS_THREAD = true
// Inboxer is the set of operations an inbox exposes. *Inbox in this file
// satisfies it.
type Inboxer interface {
	// Send hands a single envelope to the inbox.
	Send(msg Envelope)
	// Start associates the inbox with the Processor it should work with.
	Start(proc Processor)
	// Stop shuts the inbox down and reports any error from doing so.
	Stop() error
}
// Inbox couples a ggq.GGQ of Envelope values with the Processor that handles
// them. Instances are expected to be built with NewInbox, which wires the GGQ
// back to the Inbox.
type Inbox struct {
// ggq is written to by Send, read by the goroutine launched in Start, and
// closed by Stop.
ggq *ggq.GGQ[Envelope]
// proc is assigned by Start; Consume calls its Invoke method and Stop reads
// its PID for logging.
proc Processor
}
// NewInbox allocates an Inbox and creates its GGQ with the given size
// (converted to uint32). The Inbox itself is passed to ggq.New as the second
// argument — presumably so the GGQ can call back into Consume; confirm against
// the ggq package. No Processor is attached until Start is called.
func NewInbox(size int) *Inbox {
	inbox := new(Inbox)
	// The GGQ needs a reference to the inbox, so the inbox must exist first.
	inbox.ggq = ggq.New[Envelope](uint32(size), inbox)
	return inbox
}
// Consume passes the given batch of envelopes to the Invoke method of the
// Processor stored by Start. It does not check that a Processor has been set.
func (in *Inbox) Consume(msgs []Envelope) {
in.proc.Invoke(msgs)
}
// Start stores proc as this inbox's Processor and launches a goroutine that
// calls ReadN on the underlying GGQ.
//
// LOCK_OS_THREAD is sampled once, before the goroutine is spawned; when the
// sampled value is true the goroutine calls runtime.LockOSThread before
// reading. A trace log entry carrying proc's PID is emitted after the
// goroutine has been launched.
func (in *Inbox) Start(proc Processor) {
	in.proc = proc

	// Prevent a race condition by copying the package-level flag into a local
	// before the goroutine starts, so the goroutine never reads the global.
	pinThread := LOCK_OS_THREAD

	go func() {
		if pinThread {
			runtime.LockOSThread()
		}
		in.ggq.ReadN()
	}()

	log.Tracew("[INBOX] started", log.M{"pid": proc.PID()})
}
// Stop closes the underlying GGQ and then emits a trace log entry carrying the
// PID of the Processor stored by Start. It always returns nil.
func (in *Inbox) Stop() error {
	in.ggq.Close()

	// Build the log fields only after Close, matching the original call order.
	fields := log.M{"pid": in.proc.PID()}
	log.Tracew("[INBOX] closed", fields)

	return nil
}
// Send calls Awake on the underlying GGQ and then writes msg to it, in that
// order.
func (in *Inbox) Send(msg Envelope) {
	q := in.ggq
	q.Awake()
	q.Write(msg)
}