generated from TBD54566975/tbd-project-template
-
Notifications
You must be signed in to change notification settings - Fork 7
/
serve.go
195 lines (164 loc) · 5.57 KB
/
serve.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package plugin
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"reflect"
"strconv"
"syscall"
"time"
"connectrpc.com/connect"
"connectrpc.com/grpcreflect"
"github.com/alecthomas/kong"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
_ "github.com/TBD54566975/ftl/internal/automaxprocs" // Set GOMAXPROCS to match Linux container CPU quota.
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/rpc"
)
type serveCli struct {
LogConfig log.Config `prefix:"log-" embed:"" group:"Logging:"`
Bind *url.URL `help:"URL to listen on." env:"FTL_BIND" required:""`
kong.Plugins
}
type serverRegister[Impl any] struct {
servicePath string
register func(i Impl, mux *http.ServeMux)
}
type handlerPath struct {
path string
handler http.Handler
}
type startOptions[Impl any] struct {
register []serverRegister[Impl]
handlers []handlerPath
}
// StartOption is an option for Start.
type StartOption[Impl any] func(*startOptions[Impl])
// ConnectHandlerFactory is a type alias for a function that creates a new Connect request handler.
//
// This will typically just be the generated NewXYZHandler function.
type ConnectHandlerFactory[Iface any] func(Iface, ...connect.HandlerOption) (string, http.Handler)
// RegisterAdditionalServer allows a plugin to serve additional gRPC services.
//
// "Impl" must be an implementation of "Iface.
func RegisterAdditionalServer[Impl any, Iface any](servicePath string, register ConnectHandlerFactory[Iface]) StartOption[Impl] {
return func(so *startOptions[Impl]) {
so.register = append(so.register, serverRegister[Impl]{
servicePath: servicePath,
register: func(i Impl, mux *http.ServeMux) {
mux.Handle(register(any(i).(Iface), rpc.DefaultHandlerOptions()...)) //nolint:forcetypeassert
}})
}
}
// RegisterAdditionalHandler allows a plugin to serve additional HTTP handlers.
func RegisterAdditionalHandler[Impl any](path string, handler http.Handler) StartOption[Impl] {
return func(so *startOptions[Impl]) {
so.handlers = append(so.handlers, handlerPath{path: path, handler: handler})
}
}
// Constructor is a function that creates a new plugin server implementation.
type Constructor[Impl any, Config any] func(context.Context, Config) (context.Context, Impl, error)
// Start a gRPC server plugin listening on the socket specified by the
// environment variable FTL_BIND.
//
// This function does not return.
//
// "Config" is Kong configuration to pass to "create".
// "create" is called to create the implementation of the service.
// "register" is called to register the service with the gRPC server and is typically a generated function.
func Start[Impl any, Iface any, Config any](
ctx context.Context,
name string,
create Constructor[Impl, Config],
servicePath string,
register ConnectHandlerFactory[Iface],
options ...StartOption[Impl],
) {
var config Config
cli := serveCli{Plugins: kong.Plugins{&config}}
kctx := kong.Parse(&cli, kong.Description(`FTL - Towards a 𝝺-calculus for large-scale systems`))
mux := http.NewServeMux()
so := &startOptions[Impl]{}
for _, option := range options {
option(so)
}
for _, handler := range so.handlers {
mux.Handle(handler.path, handler.handler)
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Configure logging to JSON on stdout. This will be read by the parent process.
logConfig := cli.LogConfig
logConfig.JSON = true
logger := log.Configure(os.Stderr, logConfig)
logger = logger.Scope(name)
ctx = log.ContextWithLogger(ctx, logger)
logger.Tracef("Starting on %s", cli.Bind)
// Signal handling.
sigch := make(chan os.Signal, 1)
signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigch
logger.Debugf("Terminated by signal %s", sig)
cancel()
_ = syscall.Kill(-syscall.Getpid(), sig.(syscall.Signal)) //nolint:forcetypeassert,errcheck // best effort
os.Exit(0)
}()
ctx, svc, err := create(ctx, config)
kctx.FatalIfErrorf(err)
if _, ok := any(svc).(Iface); !ok {
var iface Iface
panic(fmt.Sprintf("%s does not implement %s", reflect.TypeOf(svc), reflect.TypeOf(iface)))
}
l, err := net.Listen("tcp", cli.Bind.Host)
kctx.FatalIfErrorf(err)
servicePaths := []string{servicePath}
mux.Handle(register(any(svc).(Iface), rpc.DefaultHandlerOptions()...)) //nolint:forcetypeassert
for _, register := range so.register {
register.register(svc, mux)
servicePaths = append(servicePaths, register.servicePath)
}
reflector := grpcreflect.NewStaticReflector(servicePaths...)
mux.Handle(grpcreflect.NewHandlerV1(reflector))
mux.Handle(grpcreflect.NewHandlerV1Alpha(reflector))
// Start the server.
http1Server := &http.Server{
Handler: h2c.NewHandler(mux, &http2.Server{}),
ReadHeaderTimeout: time.Second * 30,
BaseContext: func(net.Listener) context.Context { return ctx },
}
err = http1Server.Serve(l)
kctx.FatalIfErrorf(err)
kctx.Exit(0)
}
func allocatePort() (*net.TCPAddr, error) {
l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
if err != nil {
return nil, fmt.Errorf("failed to allocate port: %w", err)
}
_ = l.Close()
return l.Addr().(*net.TCPAddr), nil //nolint:forcetypeassert
}
func cleanup(logger *log.Logger, pidFile string) error {
pidb, err := os.ReadFile(pidFile)
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
pid, err := strconv.Atoi(string(pidb))
if err != nil && !os.IsNotExist(err) {
return err
}
err = syscall.Kill(pid, syscall.SIGKILL)
if err != nil && !errors.Is(err, syscall.ESRCH) {
logger.Warnf("Failed to reap old plugin with pid %d: %s", pid, err)
}
return nil
}