generated from TBD54566975/tbd-project-template
-
Notifications
You must be signed in to change notification settings - Fork 7
/
server.go
148 lines (126 loc) · 4.02 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package rpc
import (
"context"
"errors"
"net"
"net/http"
"net/url"
"strings"
"time"
"connectrpc.com/connect"
"connectrpc.com/grpcreflect"
"github.com/alecthomas/concurrency"
"github.com/alecthomas/types/pubsub"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)
const ShutdownGracePeriod = time.Second * 5
type serverOptions struct {
mux *http.ServeMux
reflectionPaths []string
}
type Option func(*serverOptions)
type GRPCServerConstructor[Iface Pingable] func(svc Iface, opts ...connect.HandlerOption) (string, http.Handler)
type RawGRPCServerConstructor[Iface any] func(svc Iface, opts ...connect.HandlerOption) (string, http.Handler)
// GRPC is a convenience function for registering a GRPC server with default options.
// TODO(aat): Do we need pingable here?
func GRPC[Iface, Impl Pingable](constructor GRPCServerConstructor[Iface], impl Impl, options ...connect.HandlerOption) Option {
return func(o *serverOptions) {
options = append(options, DefaultHandlerOptions()...)
path, handler := constructor(any(impl).(Iface), options...)
o.reflectionPaths = append(o.reflectionPaths, strings.Trim(path, "/"))
o.mux.Handle(path, handler)
}
}
// RawGRPC is a convenience function for registering a GRPC server with default options without Pingable.
func RawGRPC[Iface, Impl any](constructor RawGRPCServerConstructor[Iface], impl Impl, options ...connect.HandlerOption) Option {
return func(o *serverOptions) {
options = append(options, DefaultHandlerOptions()...)
path, handler := constructor(any(impl).(Iface), options...)
o.reflectionPaths = append(o.reflectionPaths, strings.Trim(path, "/"))
o.mux.Handle(path, handler)
}
}
// HTTP adds a HTTP route to the server.
func HTTP(prefix string, handler http.Handler) Option {
return func(o *serverOptions) {
o.mux.Handle(prefix, handler)
}
}
type Server struct {
listen *url.URL
Bind *pubsub.Topic[*url.URL] // Will be updated with the actual bind address.
Server *http.Server
}
func NewServer(ctx context.Context, listen *url.URL, options ...Option) (*Server, error) {
opts := &serverOptions{
mux: http.NewServeMux(),
}
opts.mux.Handle("/healthz", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
for _, option := range options {
option(opts)
}
// Register reflection services.
reflector := grpcreflect.NewStaticReflector(opts.reflectionPaths...)
opts.mux.Handle(grpcreflect.NewHandlerV1(reflector))
opts.mux.Handle(grpcreflect.NewHandlerV1Alpha(reflector))
root := ContextValuesMiddleware(ctx, opts.mux)
http1Server := &http.Server{
Handler: h2c.NewHandler(root, &http2.Server{}),
ReadHeaderTimeout: time.Second * 30,
BaseContext: func(net.Listener) context.Context { return ctx },
}
return &Server{
listen: listen,
Bind: pubsub.New[*url.URL](),
Server: http1Server,
}, nil
}
// Serve runs the server, updating .Bind with the actual bind address.
func (s *Server) Serve(ctx context.Context) error {
listener, err := net.Listen("tcp", s.listen.Host)
if err != nil {
return err
}
if s.listen.Port() == "0" {
s.listen.Host = listener.Addr().String()
}
s.Bind.Publish(s.listen)
tree, _ := concurrency.New(ctx)
// Shutdown server on context cancellation.
tree.Go(func(ctx context.Context) error {
<-ctx.Done()
ctx, cancel := context.WithTimeout(context.Background(), ShutdownGracePeriod)
defer cancel()
err := s.Server.Shutdown(ctx)
if err == nil {
return nil
}
if errors.Is(err, context.Canceled) {
_ = s.Server.Close()
return err
}
return err
})
// Start server.
tree.Go(func(ctx context.Context) error {
err = s.Server.Serve(listener)
if errors.Is(err, http.ErrServerClosed) {
return nil
}
return err
})
return tree.Wait()
}
// Serve starts a HTTP and Connect gRPC server with sane defaults for FTL.
//
// Blocks until the context is cancelled.
func Serve(ctx context.Context, listen *url.URL, options ...Option) error {
server, err := NewServer(ctx, listen, options...)
if err != nil {
return err
}
return server.Serve(ctx)
}