forked from k3s-io/kine
/
endpoint.go
258 lines (227 loc) · 7.46 KB
/
endpoint.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
package endpoint
import (
"context"
"fmt"
"net"
"os"
"strings"
"github.com/AdamShannag/ora-kine/pkg/drivers/dqlite"
"github.com/AdamShannag/ora-kine/pkg/drivers/generic"
"github.com/AdamShannag/ora-kine/pkg/drivers/mysql"
"github.com/AdamShannag/ora-kine/pkg/drivers/nats"
"github.com/AdamShannag/ora-kine/pkg/drivers/oracle"
"github.com/AdamShannag/ora-kine/pkg/drivers/pgsql"
"github.com/AdamShannag/ora-kine/pkg/drivers/sqlite"
"github.com/AdamShannag/ora-kine/pkg/metrics"
"github.com/AdamShannag/ora-kine/pkg/server"
"github.com/AdamShannag/ora-kine/pkg/tls"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/server/v3/embed"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)
const (
	// KineSocket is the listener address used when Config.Listener is empty.
	KineSocket = "unix://kine.sock"
	// SQLiteBackend is the driver chosen by ParseStorageEndpoint when the
	// endpoint has no scheme. It is the only driver reported as not
	// requiring leader election.
	SQLiteBackend = "sqlite"
	// DQLiteBackend selects the dqlite driver.
	DQLiteBackend = "dqlite"
	// ETCDBackend is the driver chosen for "http" and "https" endpoints.
	// Listen starts nothing locally for it and returns the endpoints as-is.
	ETCDBackend = "etcd3"
	// JetStreamBackend selects the legacy constructor of the nats driver.
	JetStreamBackend = "jetstream"
	// NATSBackend selects the nats driver; the driver receives the full
	// endpoint string, scheme included.
	NATSBackend = "nats"
	// MySQLBackend selects the mysql driver.
	MySQLBackend = "mysql"
	// PostgresBackend selects the pgsql driver.
	PostgresBackend = "postgres"
	// OracleBackend selects the oracle driver.
	OracleBackend = "oracle"
)
// Config holds the settings consumed by Listen.
type Config struct {
	// GRPCServer is an optional preconfigured server. When nil, a new server
	// is built with keepalive settings and, if configured, server TLS.
	GRPCServer *grpc.Server
	// Listener is the "scheme://address" to listen on. An empty value means
	// KineSocket. A "unix" scheme binds a unix socket; anything else binds TCP.
	Listener string
	// Endpoint is the datastore endpoint, interpreted by ParseStorageEndpoint.
	Endpoint string
	// ConnectionPoolConfig is passed to the sqlite, dqlite, pgsql, mysql and
	// oracle drivers.
	ConnectionPoolConfig generic.ConnectionPoolConfig
	// ServerTLSConfig enables TLS on the listener when both CertFile and
	// KeyFile are set.
	ServerTLSConfig tls.Config
	// BackendTLSConfig is passed to the pgsql, mysql, nats and oracle drivers,
	// and is returned unchanged to the caller for the etcd3 driver.
	BackendTLSConfig tls.Config
	// MetricsRegisterer, when non-nil, has the kine metrics registered on it.
	// It is also passed to the sqlite, dqlite, pgsql, mysql and oracle drivers.
	MetricsRegisterer prometheus.Registerer
}
// ETCDConfig is the result of Listen: the connection details a client should
// use to reach the datastore.
type ETCDConfig struct {
	// Endpoints is the comma-split Config.Endpoint for the etcd3 driver, or
	// the single URL of the local kine listener for every other driver.
	Endpoints []string
	// TLSConfig is Config.BackendTLSConfig for the etcd3 driver and the zero
	// value for every other driver.
	TLSConfig tls.Config
	// LeaderElect is false only for the sqlite driver.
	LeaderElect bool
}
// Listen builds and starts the storage backend selected by config.Endpoint,
// registers the kine server on a GRPC server, and begins serving it on the
// configured listener in a background goroutine. It returns the ETCDConfig
// that points at the resulting endpoint.
//
// When the endpoint resolves to the etcd3 driver nothing is started locally:
// the comma-separated endpoint list and the backend TLS configuration are
// returned unchanged, with leader election enabled.
func Listen(ctx context.Context, config Config) (ETCDConfig, error) {
	driver, dsn := ParseStorageEndpoint(config.Endpoint)
	if driver == ETCDBackend {
		// External etcd: hand the endpoints straight back to the caller.
		passthrough := ETCDConfig{
			Endpoints:   strings.Split(config.Endpoint, ","),
			TLSConfig:   config.BackendTLSConfig,
			LeaderElect: true,
		}
		return passthrough, nil
	}

	needsElection, store, err := getKineStorageBackend(ctx, driver, dsn, config)
	if err != nil {
		return ETCDConfig{}, errors.Wrap(err, "building kine")
	}

	// MustRegister panics if registration fails, e.g. on a duplicate collector.
	if registerer := config.MetricsRegisterer; registerer != nil {
		registerer.MustRegister(metrics.SQLTotal, metrics.SQLTime, metrics.CompactTotal)
	}

	if startErr := store.Start(ctx); startErr != nil {
		return ETCDConfig{}, errors.Wrap(startErr, "starting kine backend")
	}

	// Wire the backend into the GRPC server.
	bridge := server.New(store, endpointScheme(config))
	srv, err := grpcServer(config)
	if err != nil {
		return ETCDConfig{}, errors.Wrap(err, "creating GRPC server")
	}
	bridge.Register(srv)

	// Bind the listener and serve in the background.
	ln, err := createListener(config)
	if err != nil {
		return ETCDConfig{}, errors.Wrap(err, "creating listener")
	}
	go func() {
		if serveErr := srv.Serve(ln); serveErr != nil {
			logrus.Errorf("Kine GPRC server exited: %v", serveErr)
		}
	}()

	url := endpointURL(config, ln)
	logrus.Infof("Kine available at %s", url)

	return ETCDConfig{
		LeaderElect: needsElection,
		Endpoints:   []string{url},
		TLSConfig:   tls.Config{},
	}, nil
}
// endpointURL builds the URI at which the given listener can be reached
// locally. Unix sockets use the listener address verbatim. For any other
// scheme the host is replaced with the IPv4 loopback address, on the
// assumption that the port is reachable there; if the port cannot be
// extracted from the listener address, 2379 is used.
func endpointURL(config Config, listener net.Listener) string {
	scheme := endpointScheme(config)
	bound := listener.Addr().String()
	if strings.HasPrefix(scheme, "unix") {
		return scheme + "://" + bound
	}

	_, port, err := net.SplitHostPort(bound)
	if err != nil {
		logrus.Warnf("failed to get listener port: %v", err)
		port = "2379"
	}
	return scheme + "://127.0.0.1:" + port
}
// endpointScheme derives the URI scheme for the configured listener: "unix"
// for unix sockets and "http" for everything else, with an "s" suffix when
// both a server certificate and key are configured. An empty listener is
// treated as KineSocket.
func endpointScheme(config Config) string {
	listen := config.Listener
	if listen == "" {
		listen = KineSocket
	}

	scheme := "http"
	if network, _ := networkAndAddress(listen); network == "unix" {
		scheme = "unix"
	}

	if config.ServerTLSConfig.CertFile == "" || config.ServerTLSConfig.KeyFile == "" {
		return scheme
	}
	// etcd accepts "unixs" for TLS over unix sockets, so the suffix applies
	// to both schemes.
	return scheme + "s"
}
// createListener returns a listener bound to the protocol and address given
// by config.Listener, falling back to KineSocket when that is empty.
//
// For unix sockets any existing socket file is removed first (a failure to
// remove it is only logged), and the new socket is restricted to mode 0600
// once it has been created. Any scheme other than "unix" is bound as TCP.
//
// On error the returned listener is always nil: a failed bind is reported
// with its own error, and a listener whose socket permissions cannot be set
// is closed before the chmod error is returned.
func createListener(config Config) (ret net.Listener, rerr error) {
	if config.Listener == "" {
		config.Listener = KineSocket
	}
	network, address := networkAndAddress(config.Listener)

	isUnix := network == "unix"
	if isUnix {
		if err := os.Remove(address); err != nil && !os.IsNotExist(err) {
			logrus.Warnf("failed to remove socket %s: %v", address, err)
		}
	} else {
		network = "tcp"
	}

	listener, err := net.Listen(network, address)
	if err != nil {
		// Return the bind error itself; the socket file does not exist, so
		// there is nothing to chmod.
		return nil, err
	}

	if isUnix {
		if err := os.Chmod(address, 0600); err != nil {
			// Do not hand back (or leak) a listener with unexpected permissions.
			_ = listener.Close()
			return nil, err
		}
	}
	return listener, nil
}
// grpcServer returns config.GRPCServer when one is supplied. Otherwise it
// builds a new GRPC server using the keepalive defaults from the etcd embed
// package, adding TLS credentials when both a server certificate and key
// file are configured. An error is returned only if the key pair cannot be
// loaded.
func grpcServer(config Config) (*grpc.Server, error) {
	if preconfigured := config.GRPCServer; preconfigured != nil {
		return preconfigured, nil
	}

	enforcement := keepalive.EnforcementPolicy{
		MinTime:             embed.DefaultGRPCKeepAliveMinTime,
		PermitWithoutStream: false,
	}
	params := keepalive.ServerParameters{
		Time:    embed.DefaultGRPCKeepAliveInterval,
		Timeout: embed.DefaultGRPCKeepAliveTimeout,
	}
	opts := []grpc.ServerOption{
		grpc.KeepaliveEnforcementPolicy(enforcement),
		grpc.KeepaliveParams(params),
	}

	certFile, keyFile := config.ServerTLSConfig.CertFile, config.ServerTLSConfig.KeyFile
	if certFile != "" && keyFile != "" {
		creds, err := credentials.NewServerTLSFromFile(certFile, keyFile)
		if err != nil {
			return nil, err
		}
		opts = append(opts, grpc.Creds(creds))
	}

	return grpc.NewServer(opts...), nil
}
// getKineStorageBackend constructs the backend for the named driver. It
// returns whether the backend requires leader election (false only for
// sqlite), the backend itself, and any error from the driver constructor.
// An unrecognized driver name yields an error and no backend.
func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config) (bool, server.Backend, error) {
	var (
		store server.Backend
		err   error
	)

	pool, backendTLS, registerer := cfg.ConnectionPoolConfig, cfg.BackendTLSConfig, cfg.MetricsRegisterer

	switch driver {
	case SQLiteBackend:
		store, err = sqlite.New(ctx, dsn, pool, registerer)
	case DQLiteBackend:
		store, err = dqlite.New(ctx, dsn, pool, registerer)
	case PostgresBackend:
		store, err = pgsql.New(ctx, dsn, backendTLS, pool, registerer)
	case MySQLBackend:
		store, err = mysql.New(ctx, dsn, backendTLS, pool, registerer)
	case JetStreamBackend:
		store, err = nats.NewLegacy(ctx, dsn, backendTLS)
	case NATSBackend:
		store, err = nats.New(ctx, dsn, backendTLS)
	case OracleBackend:
		store, err = oracle.New(ctx, dsn, backendTLS, pool, registerer)
	default:
		return false, nil, fmt.Errorf("storage backend is not defined")
	}

	// sqlite is the one recognized driver reported without leader election.
	return driver != SQLiteBackend, store, err
}
// ParseStorageEndpoint maps a datastore endpoint URL to a driver name and the
// endpoint string to give that driver.
//
//   - No scheme: the sqlite driver with an empty endpoint.
//   - "nats": the nats driver with the full, unmodified endpoint.
//   - "http" or "https": the etcd3 driver with the part after "://".
//   - Anything else: the scheme itself as the driver name, with the part
//     after "://".
func ParseStorageEndpoint(storageEndpoint string) (string, string) {
	scheme, rest := networkAndAddress(storageEndpoint)
	switch scheme {
	case "":
		return SQLiteBackend, ""
	case "nats":
		return NATSBackend, storageEndpoint
	case "http", "https":
		return ETCDBackend, rest
	default:
		return scheme, rest
	}
}
// networkAndAddress splits str at the first "://" into a network (scheme) and
// an address (everything after the separator). It does no URL validation.
// When str has no "://", the network is empty and the address is str itself.
func networkAndAddress(str string) (string, string) {
	network, address, found := strings.Cut(str, "://")
	if !found {
		return "", str
	}
	return network, address
}