-
Notifications
You must be signed in to change notification settings - Fork 7
/
service.go
146 lines (119 loc) · 4.24 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// Copyright (c) 2016 Timo Savola. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package runtime
import (
"context"
"encoding/binary"
"gate.computer/gate/internal/error/badprogram"
"gate.computer/gate/packet"
"gate.computer/gate/snapshot"
)
// Service discovery limits and flag bits.
const (
	// maxServices is the upper bound on the total number of services a
	// program instance may have discovered; handlePacket clamps requests
	// so that this bound is not exceeded.
	maxServices = 256

	// serviceStateAvail is the ServiceState flag bit set by SetAvail.
	serviceStateAvail uint8 = 1 << 0
)
// ErrDuplicateService is a bad-program error with the message "duplicate
// service". It is declared with the error interface type so that it can be
// compared against returned errors.
//
// NOTE(review): presumably reported when a program requests the same service
// name more than once — confirm against the code that returns it.
var ErrDuplicateService error = badprogram.Error("duplicate service")
// ServiceState is used to respond to a service discovery request.
// makeServicesPacket writes one flags byte per ServiceState into the
// response packet.
type ServiceState struct {
flags uint8 // Bit set; serviceStateAvail is the only bit defined in this file.
}
// SetAvail marks the service as available by turning on the
// serviceStateAvail bit in the state's flags. Other bits are left as they
// are.
func (s *ServiceState) SetAvail() {
	s.flags = s.flags | serviceStateAvail
}
// ServiceConfig carries the program instance specific settings passed to
// ServiceRegistry.CreateServer.
type ServiceConfig struct {
MaxSendSize int // Maximum size which the program is prepared to receive (presumably in bytes — TODO confirm).
}
// ServiceRegistry is a collection of configured services.
//
// CreateServer is called once for each program instance.
//
// NOTE(review): this comment used to call the method StartServing and stated
// that "the receive channel is closed when the program is being shut down".
// CreateServer has no receive channel parameter (only a send-only
// packet.Thunk channel) and InstanceServer has an explicit Shutdown method,
// so that sentence looks stale — confirm the shutdown contract.
//
// config.MaxSendSize may be used when buffering data.
//
// The snapshot buffers must not be mutated, and references to them shouldn't
// be retained for long as they may be parts of a large memory allocation.
//
// The returned channel will deliver up to one error if one occurs after
// initialization.
//
// The service package contains an implementation of this interface.
type ServiceRegistry interface {
CreateServer(context.Context, ServiceConfig, []snapshot.Service, chan<- packet.Thunk) (InstanceServer, []ServiceState, <-chan error, error)
}
// InstanceServer is the per-instance server returned by
// ServiceRegistry.CreateServer.
//
// Discover receives the service names newly requested by the program and
// returns the states of all services; serviceDiscoverer treats the length of
// that slice as the number of valid service codes.
//
// NOTE(review): the exact contracts of Start, Handle and Shutdown are not
// visible in this file. Judging by the signatures, Shutdown returns the
// snapshot.Service state that CreateServer accepts — confirm against the
// service package implementation.
type InstanceServer interface {
Start(context.Context, chan<- packet.Thunk) error
Discover(ctx context.Context, newNames []string) (all []ServiceState, err error)
Handle(context.Context, chan<- packet.Thunk, packet.Buf) (packet.Buf, error)
Shutdown(ctx context.Context, suspend bool) ([]snapshot.Service, error)
}
// serviceDiscoverer handles service discovery packets on behalf of an
// InstanceServer and validates the service codes of other packets.
type serviceDiscoverer struct {
server InstanceServer // Resolves the service names requested in discovery packets.
numServices int // Set by handlePacket to the length of the latest Discover result; checkPacket rejects codes >= numServices.
}
// handlePacket processes a service discovery request packet and builds the
// response packet.
//
// The request must be in the call domain and at least
// packet.ServicesHeaderSize long. It carries a little-endian 16-bit count
// followed by that many names, each encoded as a length byte (1..127) and
// the name bytes. The number of names read is clamped so that the total
// number of discovered services does not exceed maxServices; surplus names
// are ignored.
//
// The parsed names are passed to the server's Discover method. On success
// numServices is updated to the number of returned service states and a
// call-domain services packet describing them is returned. Malformed input
// yields a badprogram error; a Discover error is returned unchanged.
func (discoverer *serviceDiscoverer) handlePacket(ctx context.Context, req packet.Buf) (resp packet.Buf, err error) {
	if domain := req.Domain(); domain != packet.DomainCall {
		return nil, badprogram.Errorf("service discovery packet has wrong domain: %d", domain)
	}

	if size := len(req); size < packet.ServicesHeaderSize {
		return nil, badprogram.Errorf("service discovery packet is too short: %d bytes", size)
	}

	// Read only as many new names as still fit under the maxServices limit.
	wanted := int(binary.LittleEndian.Uint16(req[packet.OffsetServicesCount:]))
	if room := maxServices - discoverer.numServices; wanted > room {
		wanted = room
	}

	payload := req[packet.ServicesHeaderSize:]
	names := make([]string, 0, wanted)

	for len(names) < wanted {
		if len(payload) == 0 {
			return nil, badprogram.Error("name data is truncated in service discovery packet")
		}

		// Each name is prefixed with a single length byte.
		n := int(payload[0])
		payload = payload[1:]

		switch {
		case n == 0 || n > 127:
			return nil, badprogram.Error("service name length in discovery packet is out of bounds")
		case len(payload) < n:
			return nil, badprogram.Error("name data is truncated in service discovery packet")
		}

		names = append(names, string(payload[:n]))
		payload = payload[n:]
	}

	states, err := discoverer.server.Discover(ctx, names)
	if err != nil {
		return nil, err
	}

	discoverer.numServices = len(states)
	return makeServicesPacket(packet.DomainCall, states), nil
}
// checkPacket validates a packet's service code and domain.
//
// The code must be below numServices. The domain must be call, info, flow
// or data, and a data packet must be at least packet.DataHeaderSize long.
// The packet is returned unchanged when it is valid; otherwise nil and a
// badprogram error are returned.
//
// NOTE(review): only the upper bound of the code is checked here. If
// packet.Code is a signed type, negative codes are assumed to be handled by
// the caller — TODO confirm.
func (discoverer *serviceDiscoverer) checkPacket(p packet.Buf) (packet.Buf, error) {
	code := p.Code()
	if int(code) >= discoverer.numServices {
		return nil, badprogram.Errorf("invalid service code in packet: %d", code)
	}

	domain := p.Domain()
	switch domain {
	case packet.DomainData:
		// Data packets carry an extra header which must be present.
		if size := len(p); size < packet.DataHeaderSize {
			return nil, badprogram.Errorf("data packet is too short: %d bytes", size)
		}
		return p, nil

	case packet.DomainCall, packet.DomainInfo, packet.DomainFlow:
		return p, nil
	}

	return nil, badprogram.Errorf("invalid domain in packet: %d", domain)
}
// makeServicesPacket builds a packet with code packet.CodeServices in the
// given domain. After the services header, which holds the little-endian
// 16-bit service count, the packet contains one flags byte per entry of
// services, in order.
func makeServicesPacket(domain packet.Domain, services []ServiceState) (resp packet.Buf) {
	count := len(services)

	resp = packet.Make(packet.CodeServices, domain, packet.ServicesHeaderSize+count)
	resp.SetSize()
	binary.LittleEndian.PutUint16(resp[packet.OffsetServicesCount:], uint16(count))

	// The per-service flag bytes follow the header directly.
	for i := 0; i < count; i++ {
		resp[packet.ServicesHeaderSize+i] = services[i].flags
	}

	return resp
}