-
Notifications
You must be signed in to change notification settings - Fork 0
/
serviceServer.go
157 lines (130 loc) · 3.86 KB
/
serviceServer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package serviceServer
import (
"context"
"sync"
"github.com/influx6/npkg/njson"
"github.com/ewe-studios/sabuhp/actions"
"github.com/ewe-studios/sabuhp/injectors"
"golang.org/x/sync/errgroup"
"github.com/ewe-studios/sabuhp"
"github.com/ewe-studios/sabuhp/bus/redispub"
)
// Mod is a functional option that mutates a ServiceServer. New invokes every
// supplied Mod, in order, after it has populated the server's defaults.
type Mod func(cs *ServiceServer)
// WithWorkerRegistry returns a Mod that stores the given worker template
// registry on the server's Registry field.
func WithWorkerRegistry(registry *actions.WorkerTemplateRegistry) Mod {
	var apply Mod = func(srv *ServiceServer) {
		srv.Registry = registry
	}
	return apply
}
// WithInjector returns a Mod that replaces the server's Injector with the
// one supplied.
func WithInjector(injector *injectors.Injector) Mod {
	var apply Mod = func(srv *ServiceServer) {
		srv.Injector = injector
	}
	return apply
}
// WithCtx returns a Mod that derives a cancellable context from the supplied
// parent and stores both the derived context and its cancel function on the
// server, overwriting whatever Ctx and CancelFunc were set before.
func WithCtx(this context.Context) Mod {
	return func(srv *ServiceServer) {
		derived, cancel := context.WithCancel(this)
		srv.Ctx = derived
		srv.CancelFunc = cancel
	}
}
// WithRedisPubSub returns a Mod that builds a message bus with
// redispub.PubSub from the given config and installs it as the server's Bus.
//
// The returned Mod panics if redispub.PubSub reports an error.
func WithRedisPubSub(config redispub.Config) Mod {
	return func(srv *ServiceServer) {
		bus, err := redispub.PubSub(config)
		if err != nil {
			panic(err)
		}
		srv.Bus = bus
	}
}
// WithRedisStreams returns a Mod that builds a message bus with
// redispub.Stream from the given config and installs it as the server's Bus.
//
// The returned Mod panics if redispub.Stream reports an error.
func WithRedisStreams(config redispub.Config) Mod {
	return func(srv *ServiceServer) {
		bus, err := redispub.Stream(config)
		if err != nil {
			panic(err)
		}
		srv.Bus = bus
	}
}
// ServiceServer exists to provide a central connection point to the message bus
// for the client (browser, CLI app, etc.).
// It will never host any functions or processing providers but exists to provide
// a direct and distributed entry point (by creating horizontally scaled replicas)
// that allows clients to deliver requests into the underlying pubsub bus, which
// will deliver these to service servers that host nothing else but functions
// and processors.
//
// NOTE(review): initializeComponents builds an actions.ActionHub (Workers) from
// Registry, so the "never host any functions" statement above may be out of
// date for this type — confirm against the intended design.
type ServiceServer struct {
// initer guards initializeComponents so Init only runs it once.
initer sync.Once
// Ctx is the server's context; New derives it from the errgroup context and
// WithCtx may replace it.
Ctx context.Context
// CancelFunc is the cancel function paired with Ctx.
CancelFunc context.CancelFunc
// Logger is the logger handed to New; it is passed on to the bus relay and
// the action hub, and used by the default escalation handler.
Logger sabuhp.Logger
// ErrGroup is created in New; Start registers a goroutine on it and Wait
// blocks on it.
ErrGroup *errgroup.Group
// BusRelay is created in New from Ctx, Logger and Bus.
BusRelay *sabuhp.BusRelay
// Injector defaults to injectors.NewInjector() and may be replaced with
// WithInjector.
Injector *injectors.Injector
// Registry is the worker template registry handed to the action hub; it is
// only set through WithWorkerRegistry (or by the caller directly).
Registry *actions.WorkerTemplateRegistry
// Escalations is the escalation callback handed to the action hub. When nil
// at initialization time, a default that logs the escalation is installed.
Escalations actions.EscalationNotification
// Workers is the action hub built by initializeComponents and started by
// Start.
Workers *actions.ActionHub
// Bus is the message bus supplied to New, or installed by WithRedisPubSub /
// WithRedisStreams.
Bus sabuhp.MessageBus
}
// New builds a ServiceServer around the given logger and message bus.
//
// It installs a default injector, creates an errgroup bound to ctx, derives a
// cancellable context (Ctx / CancelFunc) from the errgroup's context, and
// constructs the bus relay from that context, the logger and the bus. Each
// non-nil Mod in mods is then applied in order; nil entries are skipped.
//
// The components created lazily (Workers and the default Escalations handler)
// are not built here; see Init and Start.
func New(ctx context.Context, logger sabuhp.Logger, bus sabuhp.MessageBus, mods ...Mod) *ServiceServer {
	var cs = new(ServiceServer)
	cs.Bus = bus
	cs.Logger = logger
	cs.Injector = injectors.NewInjector()

	var errCtx context.Context
	cs.ErrGroup, errCtx = errgroup.WithContext(ctx)
	cs.Ctx, cs.CancelFunc = context.WithCancel(errCtx)
	cs.BusRelay = sabuhp.NewBusRelay(cs.Ctx, cs.Logger, cs.Bus)

	// NOTE(review): the relay above is built before the mods run, so a Mod that
	// replaces Bus or Ctx (WithRedisPubSub, WithRedisStreams, WithCtx) does not
	// change the arguments the relay was constructed with — confirm whether the
	// relay should be rebuilt in that case.
	for _, mod := range mods {
		if mod == nil {
			// Calling a nil Mod would panic; treat it as "no option".
			continue
		}
		mod(cs)
	}
	return cs
}
// Init initializes all components of the server. ServiceServer.Start calls
// it as well, so Start will both initialize and start everything.
//
// If you wish to use the default setup but customize it to fit your needs,
// call ServiceServer.Init first. The initialization runs at most once, no
// matter how many times Init is called.
func (c *ServiceServer) Init() {
	c.initer.Do(func() {
		c.initializeComponents()
	})
}
// Start calls ServiceServer.Init first, then starts the worker hub and
// registers a goroutine on ErrGroup that blocks until the hub's Wait returns.
func (c *ServiceServer) Start() {
	c.Init()

	// Bring the worker hub up.
	c.Workers.Start()

	// Tie the hub's lifetime to the errgroup so ServiceServer.Wait also waits
	// for the workers.
	waitForWorkers := func() error {
		c.Workers.Wait()
		return nil
	}
	c.ErrGroup.Go(waitForWorkers)
}
// Wait blocks on the server's ErrGroup and returns whatever error its Wait
// reports.
func (c *ServiceServer) Wait() error {
	var groupErr = c.ErrGroup.Wait()
	return groupErr
}
// initializeComponents installs a default escalation handler when none was
// provided and then builds the worker hub from the server's current fields.
// It is run through Init, which guards it with a sync.Once.
func (c *ServiceServer) initializeComponents() {
	if c.Escalations == nil {
		// Default handler: emit a debug log entry describing the escalation.
		c.Escalations = func(esc actions.Escalation, workers *actions.ActionHub) {
			entry := njson.Log(c.Logger).New().
				LDebug().
				Message("worker escalation occurred").
				Formatted("data", "%+v", esc.Data)

			// The offending message is optional; only log it when present.
			if esc.OffendingMessage != nil {
				entry.Object("offending_message", esc.OffendingMessage)
			}

			entry.
				Formatted("worker_protocol", "%+v", esc.WorkerProtocol).
				Formatted("group_protocol", "%+v", esc.GroupProtocol).
				List("stats", actions.WorkerStats(workers.Stats())).
				Int("pending_messages", len(esc.PendingMessages)).
				End()
		}
	}

	c.Workers = actions.NewActionHub(
		c.Ctx,
		c.Escalations,
		c.Registry,
		c.Injector,
		c.Bus,
		c.BusRelay,
		c.Logger,
	)
}