/
app.go
147 lines (129 loc) · 4.77 KB
/
app.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright 2023 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package app
import (
	"context"
	"errors"
	"syscall"
	"time"

	"github.com/gardener/etcd-wrapper/internal/bootstrap"
	"github.com/gardener/etcd-wrapper/internal/types"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/embed"
	"go.uber.org/zap"
)
// Application is the top-level type of this program and serves as its entry
// point. It holds the configuration, the etcd initializer, the embedded etcd
// server and the etcd client that the methods of this type operate on.
type Application struct {
	// ctx is the application context; Start returns once it is done.
	ctx context.Context
	// cancelFn is the cancel function invoked by cancelContext while ctx is
	// still live.
	cancelFn context.CancelFunc

	// Config is the application config
	Config types.Config

	// etcdInitializer is run by Setup to obtain the embedded etcd configuration.
	etcdInitializer bootstrap.EtcdInitializer
	// cfg is the embedded etcd configuration produced by Setup and consumed by
	// startEtcd.
	cfg *embed.Config
	// etcdClient is the client created in Start for the readiness probe and
	// closed in Close.
	etcdClient *clientv3.Client
	// etcd is the embedded etcd instance assigned by startEtcd.
	etcd *embed.Etcd
	// waitReadyTimeout bounds how long startEtcd waits for the etcd server to
	// signal readiness.
	waitReadyTimeout time.Duration
	// logger is the logger used by all methods of Application.
	logger *zap.Logger
	// etcdReady must be updated by a single actor only:
	// queryAndUpdateEtcdReadiness().
	etcdReady bool
}
// NewApplication creates an Application from the given context, cancel
// function, configuration, etcd readiness timeout and logger.
//
// The configuration is logged and an etcd initializer is created from
// config.BackupRestore. If creating the initializer fails, that error is
// returned and no Application is created.
func NewApplication(ctx context.Context, cancelFn context.CancelFunc, config types.Config, waitReadyTimeout time.Duration, logger *zap.Logger) (*Application, error) {
	logger.Info("Initializing application", zap.Any("config", config))

	initializer, initErr := bootstrap.NewEtcdInitializer(&config.BackupRestore, logger)
	if initErr != nil {
		return nil, initErr
	}

	app := new(Application)
	app.ctx = ctx
	app.cancelFn = cancelFn
	app.Config = config
	app.etcdInitializer = initializer
	app.waitReadyTimeout = waitReadyTimeout
	app.logger = logger
	return app, nil
}
// Setup prepares etcd for start-up. It runs the etcd initializer with the
// application context and stores the resulting embedded etcd configuration on
// the Application. On success it also sets the process file mode creation
// mask to 0077.
//
// An error from the initializer is returned as is; in that case neither the
// configuration nor the umask is touched.
func (a *Application) Setup() error {
	etcdCfg, initErr := a.etcdInitializer.Run(a.ctx)
	if initErr != nil {
		return initErr
	}
	a.cfg = etcdCfg

	// The previous mask returned by Umask is not needed.
	syscall.Umask(0077)
	return nil
}
// Start runs the application. It creates the etcd client used by the
// readiness probe, launches the readiness probe in its own goroutine, starts
// the embedded etcd and then blocks until one of the following happens:
//   - the application context is done,
//   - the etcd server signals on its StopNotify channel,
//   - an error arrives on the etcd Err channel.
//
// Close is deferred as soon as the client exists, so it runs on every return
// path after that point.
//
// An error is returned only when the etcd client cannot be created or when
// startEtcd fails. Whatever ends the blocking phase is logged and Start
// returns nil.
func (a *Application) Start() error {
	// The readiness probe needs a client, so create it first.
	client, clientErr := a.createEtcdClient()
	if clientErr != nil {
		return clientErr
	}
	a.etcdClient = client
	defer a.Close()

	// The probe runs concurrently with the start-up of etcd.
	go a.SetupReadinessProbe()

	if startErr := a.startEtcd(); startErr != nil {
		return startErr
	}

	// Remove the validation marker now that etcd has started; a failure here
	// is only worth a warning.
	if cleanupErr := bootstrap.CleanupExitCode(types.DefaultExitCodeFilePath); cleanupErr != nil {
		a.logger.Warn("failed to clean-up last captured exit code", zap.Error(cleanupErr))
	}

	// Block until the context is done or etcd reports a stop or an error.
	select {
	case <-a.ctx.Done():
		a.logger.Error("application context has been cancelled", zap.Error(a.ctx.Err()))
	case <-a.etcd.Server.StopNotify():
		a.logger.Error("etcd server has been aborted, received notification on StopNotify channel")
	case etcdErr := <-a.etcd.Err():
		a.logger.Error("error received on etcd Err channel", zap.Error(etcdErr))
	}
	return nil
}
// Close releases the resources held by the application: it closes the etcd
// client, if one has been created, and cancels the application context if
// that has not already happened.
//
// A failure to close the client is logged and does not prevent the context
// from being cancelled. Close may be called before Start has created the
// client; the client step is skipped in that case.
func (a *Application) Close() {
	// etcdClient is only assigned in Start, so it is still nil when Close is
	// called earlier; closing a nil client would panic.
	if a.etcdClient != nil {
		if err := a.etcdClient.Close(); err != nil {
			a.logger.Error("failed to close etcd client", zap.Error(err))
		}
	}
	a.cancelContext()
}
// cancelContext calls the stored cancel function unless the application
// context already reports an error, in which case there is nothing to cancel.
func (a *Application) cancelContext() {
	if a.ctx.Err() != nil {
		// The context is already done.
		return
	}
	a.cancelFn()
}
// startEtcd starts the embedded etcd with the configuration produced by Setup
// and waits until the server reports that it is ready.
//
// It returns an error when
//   - embed.StartEtcd fails,
//   - the server signals on StopNotify before it becomes ready, or
//   - readiness is not signalled within waitReadyTimeout; the server is then
//     stopped so that the start is actually aborted.
//
// a.etcd is assigned only when the server became ready.
func (a *Application) startEtcd() error {
	// TODO StartEtcd returns an Etcd object. In future we should use that to listen on leadership change notifications (when we move to a version of etcd which exposes the channel).
	etcd, err := embed.StartEtcd(a.cfg)
	if err != nil {
		return err
	}

	// Wait until the etcd server notifies that it is ready, an abrupt stop is
	// notified via etcd.Server.StopNotify, or the wait times out.
	select {
	case <-etcd.Server.ReadyNotify():
		a.logger.Info("etcd server is now ready to serve client requests")
		a.etcd = etcd
		return nil
	case <-etcd.Server.StopNotify():
		a.logger.Error("etcd server has been aborted, received notification on StopNotify channel")
		return errors.New("etcd server stopped before it became ready")
	case <-time.After(a.waitReadyTimeout):
		a.logger.Error("timeout waiting for ReadyNotify signal, aborting start of etcd")
		// Trigger a shutdown so that the half-started server does not keep
		// running after the failure has been reported to the caller.
		etcd.Server.Stop()
		return errors.New("timed out waiting for etcd server to become ready")
	}
}