/
kdata.go
303 lines (253 loc) · 9.12 KB
/
kdata.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
package kernel
import (
"io"
"log/slog"
"sync"
"github.com/iansmith/parigot/api/shared/id"
"github.com/iansmith/parigot/g/syscall/v1"
)
// kdata is the concrete kernel state behind the Kernel interface. It keeps
// the receivers that were registered with the kernel, the subsystems that
// kernel calls are fanned out to (registrars, exporters, binders, the
// nameserver and the starter) and the call matcher that is consulted when a
// call is dispatched and when its result comes back.
type kdata struct {
	// lock is held while rawRecv is appended to (AddReceiver) and for the
	// whole of Launch and Exit.
	lock sync.Mutex
	// rawRecv holds every receiver handed to AddReceiver, in arrival order.
	rawRecv []GeneralReceiver

	// reg and exporter are computed in SetApproach from the values it is
	// actually passed: a value lands in a list when it implements that
	// list's interface.
	reg      []Registrar
	exporter []Exporter
	// bind is computed in SetApproach in the same way as reg and exporter.
	bind []Binder

	// ns and start are stored as given by SetApproach; match is created by
	// newKData.
	ns    Nameserver
	start Starter
	match callMatcher

	// cancel carries the signal sent by CancelRead; newKData creates it
	// unbuffered.
	cancel chan bool
}

// Compile-time proof that *kdata implements Kernel.
var _ Kernel = (*kdata)(nil)
// newKData builds a kernel whose cancel channel (unbuffered) and call
// matcher are ready for use. Every other field keeps its zero value until
// SetApproach or AddReceiver fills it in.
func newKData() *kdata {
	k := new(kdata)
	k.cancel = make(chan bool)
	k.match = newCallMatcher()
	return k
}
// AddReceiver appends r to the kernel's list of raw receivers. The kernel
// lock is held for the duration of the append, so concurrent callers are
// serialized.
func (k *kdata) AddReceiver(r GeneralReceiver) {
	k.lock.Lock()
	k.rawRecv = append(k.rawRecv, r)
	k.lock.Unlock()
}
// SetApproach installs the key kernel subsystems and should only be called
// while the kernel is in a fresh state: n becomes the nameserver, st becomes
// the starter, and the registrar, binder and exporter lists are rebuilt from
// scratch. Calling it again therefore replaces the previous configuration
// instead of appending to it (appending would make every registrar, binder
// and exporter run twice per request).
//
// n and st are added to each list whose interface they implement. The last
// three arguments, r, b and e, are simple "extra slots" for their respective
// lists and may be nil.
//
// No lock is taken here. The result is always syscall.KernelErr_NoError.
func (k *kdata) SetApproach(n Nameserver, st Starter, r Registrar, b Binder, e Exporter) syscall.KernelErr {
	k.ns = n
	k.start = st
	k.reg = implementersOf[Registrar](n, st, r)
	k.bind = implementersOf[Binder](n, st, b)
	k.exporter = implementersOf[Exporter](n, st, e)
	return syscall.KernelErr_NoError
}

// implementersOf returns, in argument order, the non-nil candidates that
// implement T. The result is nil when none of them do.
func implementersOf[T any](candidates ...any) []T {
	var found []T
	for _, candidate := range candidates {
		if candidate == nil {
			continue
		}
		if impl, ok := candidate.(T); ok {
			found = append(found, impl)
		}
	}
	return found
}
// Register mints a fresh service id for the service described by req and
// announces it, together with the request's host id and debug name, to every
// registrar the kernel holds. Every registrar is called even if an earlier
// one failed; each failure is logged, and the code of the last registrar that
// failed is the one returned. resp.ServiceId is filled in whether or not a
// registrar failed.
func (k *kdata) Register(req *syscall.RegisterRequest, resp *syscall.RegisterResponse) syscall.KernelErr {
	host := id.UnmarshalHostId(req.GetHostId())
	service := id.NewServiceId()
	name := req.GetDebugName()

	outcome := syscall.KernelErr_NoError
	for _, registrar := range k.reg {
		kerr := registrar.Register(host, service, name)
		if kerr == syscall.KernelErr_NoError {
			continue
		}
		klog.Errorf("unexpected failure in registrar: %s", syscall.KernelErr_name[int32(kerr)])
		outcome = kerr
	}

	resp.ServiceId = service.Marshal()
	return outcome
}
// func newService(hid id.HostId, sid id.ServiceId, debugName string) syscall.KernelErr {
// return syscall.KernelErr_NoError
// }
// matcher returns the call matcher that newKData installed on this kernel.
func (k *kdata) matcher() callMatcher {
	m := k.match
	return m
}
// Dispatch sends a call to a remote machine. It is the last step of the
// sequence that begins when guest-side code calls a remote method. A kernel
// error from this function means the dispatch itself could not be made; it
// does not mean that the call went through and the remote code returned an
// error. It is dispatchWithHostFunc with no host callback and no writer.
func (k *kdata) Dispatch(req *syscall.DispatchRequest, resp *syscall.DispatchResponse) syscall.KernelErr {
	kerr := k.dispatchWithHostFunc(req, resp, nil, nil)
	return kerr
}
// HostDispatch sends a call to a remote machine on behalf of host-side code
// that wants the result delivered through the hostFunc callback. hostFunc
// and w are passed through to dispatchWithHostFunc unchanged.
func (k *kdata) HostDispatch(req *syscall.DispatchRequest, resp *syscall.DispatchResponse, hostFunc func(*syscall.ResolvedCall), w io.Writer) syscall.KernelErr {
	kerr := k.dispatchWithHostFunc(req, resp, hostFunc, w)
	return kerr
}
// dispatchWithHostFunc is the shared implementation behind Dispatch and
// HostDispatch; the two differ only in the hostFunc and w they supply.
//
// It asks the nameserver which host serves the service id in the request
// bundle, records the call with the call matcher (passing hostFunc and w
// along) and then sends req on the channel the nameserver has for that host.
// On success resp carries the call id and the target host id.
//
// It returns syscall.KernelErr_BadId when the nameserver has no host for the
// service, and the matcher's own error when the call could not be recorded;
// in both cases nothing is sent. It panics if the nameserver knows the host
// but has no channel for it.
func (k *kdata) dispatchWithHostFunc(req *syscall.DispatchRequest, resp *syscall.DispatchResponse, hostFunc func(*syscall.ResolvedCall), w io.Writer) syscall.KernelErr {
	// we don't want to lock here because we could block somebody
	// else who is reading from the same channel
	bundle := req.GetBundle()
	sid := id.UnmarshalServiceId(bundle.GetServiceId())
	hid := id.UnmarshalHostId(bundle.GetHostId())
	mid := id.UnmarshalMethodId(bundle.GetMethodId())

	targetHid := k.Nameserver().FindHost(sid)
	if targetHid.IsZeroOrEmptyValue() {
		return syscall.KernelErr_BadId
	}

	cid := id.UnmarshalCallId(bundle.GetCallId())
	// If the matcher failed to record the call, sending the request anyway
	// would proceed without the matcher knowing about it, so the failure is
	// reported instead of being dropped.
	if kerr := k.matcher().Dispatch(targetHid, hid, cid, mid, hostFunc, w); kerr != syscall.KernelErr_NoError {
		return kerr
	}

	ch := k.Nameserver().FindHostChan(targetHid)
	if ch == nil {
		slog.Error("unable to find output net channel", "host", targetHid.Short())
		panic("failed to get network for dispatch from nameserver.FindHostChan")
	}
	ch <- req

	resp.CallId = cid.Marshal()
	resp.TargetHostId = targetHid.Marshal()
	return syscall.KernelErr_NoError
}
// ReturnValue hands the outcome of a finished call (req.Result and
// req.ResultError) to the call matcher, identified by the call id in the
// request bundle, so that the original caller can get its call completed.
// The matcher's verdict is returned unchanged. resp is not written here.
func (k *kdata) ReturnValue(req *syscall.ReturnValueRequest, resp *syscall.ReturnValueResponse) syscall.KernelErr {
	callID := id.UnmarshalCallId(req.GetBundle().GetCallId())
	return k.matcher().Response(callID, req.Result, req.ResultError)
}
// Launch logically causes a process to wait for all its dependencies to be
// ready. Here that means two steps: the launch call is recorded with the call
// matcher so it can be completed later, and the start-up work is handed to
// the kernel's Starter, whose result is returned.
//
// If the matcher cannot record the call, its error is returned and the
// Starter is not invoked. The kernel lock is held for the whole call. resp
// is not written here.
func (k *kdata) Launch(req *syscall.LaunchRequest, resp *syscall.LaunchResponse) syscall.KernelErr {
	k.lock.Lock()
	defer k.lock.Unlock()

	sid := id.UnmarshalServiceId(req.GetServiceId())
	cid := id.UnmarshalCallId(req.GetCallId())
	hid := id.UnmarshalHostId(req.GetHostId())
	mid := id.UnmarshalMethodId(req.GetMethodId())

	// save for later; the zero host id goes where a regular dispatch passes
	// the calling host, and there is no host callback or writer.
	if kerr := k.matcher().Dispatch(hid, id.HostIdZeroValue(), cid, mid, nil, nil); kerr != syscall.KernelErr_NoError {
		// Launching without the saved entry would proceed with nothing
		// recorded in the matcher for this call, so stop here.
		return kerr
	}
	klog.Infof("xxx saved as a dispatch for later:%s,%s,%s,%s", sid.Short(), cid.Short(), hid.Short(), mid.Short())

	return k.start.Launch(sid, cid, hid, mid)
}
// Export announces that the service in req implements each of the named
// interfaces listed in the request, so the service can be found under those
// names. Every (package path, service name) pair is offered to every
// exporter the kernel holds; the first exporter error stops the process and
// is returned. A zero or empty host id or service id is rejected with
// syscall.KernelErr_BadId before any exporter is called. resp is not
// written here.
func (k *kdata) Export(req *syscall.ExportRequest, resp *syscall.ExportResponse) syscall.KernelErr {
	service := id.UnmarshalServiceId(req.GetServiceId())
	host := id.UnmarshalHostId(req.GetHostId())
	if host.IsZeroOrEmptyValue() || service.IsZeroOrEmptyValue() {
		return syscall.KernelErr_BadId
	}

	for _, entry := range req.GetService() {
		name := FQName{Pkg: entry.GetPackagePath(), Name: entry.GetService()}
		for _, exporter := range k.exporter {
			if kerr := exporter.Export(host, service, name); kerr != syscall.KernelErr_NoError {
				return kerr
			}
		}
	}
	return syscall.KernelErr_NoError
}
// Locate forwards the request to the kernel's Starter and returns its result
// unchanged; the Starter is responsible for filling in resp.
func (k *kdata) Locate(req *syscall.LocateRequest, resp *syscall.LocateResponse) syscall.KernelErr {
	starter := k.start
	return starter.Locate(req, resp)
}
// Require establishes one or more dependencies from a source service to a
// destination. The work is delegated entirely to the kernel's Starter, whose
// result is returned unchanged.
func (k *kdata) Require(req *syscall.RequireRequest, resp *syscall.RequireResponse) syscall.KernelErr {
	starter := k.start
	return starter.Require(req, resp)
}
// BindMethod connects a method name to a newly created method id. The new
// id, along with the host id, service id and method name from req, is given
// to every binder the kernel holds; whatever Bind returns, if anything, is
// not inspected. The new id is written to resp.MethodId and the result is
// always syscall.KernelErr_NoError.
func (k *kdata) BindMethod(req *syscall.BindMethodRequest, resp *syscall.BindMethodResponse) syscall.KernelErr {
	service := id.UnmarshalServiceId(req.GetServiceId())
	host := id.UnmarshalHostId(req.GetHostId())
	method := id.NewMethodId()
	methodName := req.GetMethodName()

	for _, binder := range k.bind {
		binder.Bind(host, service, method, methodName)
	}

	resp.MethodId = method.Marshal()
	return syscall.KernelErr_NoError
}
// CancelRead should be called to make the kernel gracefully exit its read
// loop. It is meant for the case where you lock, make changes, unlock and
// then want the kernel to pick up your change. It sends true on the cancel
// channel, so it blocks until that value can be delivered.
func (k *kdata) CancelRead() {
	const stop = true
	k.cancel <- stop
}
// Nameserver returns the nameserver installed by SetApproach, or nil if none
// has been set.
func (k *kdata) Nameserver() Nameserver {
	ns := k.ns
	return ns
}
// func dumpRC(rc *syscall.ResolvedCall) string {
// hid := id.UnmarshalHostId(rc.GetHostId())
// cid := id.UnmarshalCallId(rc.GetCallId())
// mid := id.UnmarshalMethodId(rc.GetMethodId())
// return fmt.Sprintf("RC[%s,%s,%s]", hid.Short(), cid.Short(), mid.Short())
// }
// responseReady asks the call matcher whether a resolved call is ready for
// host hid and reports the answer through resp.
//
// A matcher error is returned as is, with resp untouched. When nothing is
// ready, only resp.Resolved is cleared. Otherwise resp is set up to carry
// just the resolved call: Timeout is false, Bundle and Exit are fresh empty
// values, ParamOrResult is nil and ResultErr is zero.
func (k *kdata) responseReady(hid id.HostId, resp *syscall.ReadOneResponse) syscall.KernelErr {
	resolved, kerr := k.matcher().Ready(hid)
	switch {
	case kerr != syscall.KernelErr_NoError:
		return kerr
	case resolved == nil:
		resp.Resolved = nil
		return syscall.KernelErr_NoError
	}

	resp.Timeout = false
	resp.Bundle = &syscall.MethodBundle{}
	resp.ParamOrResult = nil
	resp.ResultErr = 0
	resp.Resolved = resolved
	resp.Exit = &syscall.ExitPair{}
	return syscall.KernelErr_NoError
}
// Exit records an exit call with the call matcher for the host named in req.
// When req.ShutdownAll is set, it records the same call for every host the
// nameserver reports as well. The kernel lock is held for the whole call.
//
// It returns syscall.KernelErr_ExitFailed as soon as the matcher refuses to
// record any of these calls. On success req.Pair is copied into resp.Pair
// and syscall.KernelErr_NoError is returned.
func (k *kdata) Exit(req *syscall.ExitRequest, resp *syscall.ExitResponse) syscall.KernelErr {
	k.lock.Lock()
	defer k.lock.Unlock()

	cid := id.UnmarshalCallId(req.GetCallId())
	hid := id.UnmarshalHostId(req.GetHostId())
	mid := id.UnmarshalMethodId(req.GetMethodId())

	// save for later; a failure here is reported the same way as a failure
	// in the shutdown-all loop below instead of being silently dropped.
	if kerr := k.matcher().Dispatch(hid, id.HostIdZeroValue(), cid, mid, nil, nil); kerr != syscall.KernelErr_NoError {
		return syscall.KernelErr_ExitFailed
	}

	if req.ShutdownAll {
		// NOTE(review): if AllHosts includes hid, that host is recorded a
		// second time with the same call id; confirm the matcher accepts it.
		for _, host := range k.ns.AllHosts() {
			if kerr := k.matcher().Dispatch(host, id.HostIdZeroValue(), cid, mid, nil, nil); kerr != syscall.KernelErr_NoError {
				return syscall.KernelErr_ExitFailed
			}
		}
	}

	resp.Pair = req.Pair
	return syscall.KernelErr_NoError
}