/
service.go
189 lines (156 loc) · 5.22 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package service
import (
"context"
"github.com/ONSdigital/dp-image-importer/config"
"github.com/ONSdigital/dp-image-importer/event"
kafka "github.com/ONSdigital/dp-kafka/v2"
"github.com/ONSdigital/log.go/v2/log"
"github.com/gorilla/mux"
"github.com/pkg/errors"
)
// Service contains all the configs, server and clients to run the Image API
type Service struct {
config *config.Config
server HTTPServer
router *mux.Router
serviceList *ExternalServiceList
healthCheck HealthChecker
consumer kafka.IConsumerGroup
}
// Run the service
func Run(ctx context.Context, serviceList *ExternalServiceList, buildTime, gitCommit, version string, svcErrors chan error) (*Service, error) {
log.Info(ctx, "running service")
// Read config
cfg, err := config.Get()
if err != nil {
return nil, errors.Wrap(err, "unable to retrieve service configuration")
}
log.Info(ctx, "got service configuration", log.Data{"config": cfg})
// Get HTTP Server with collectionID checkHeader middleware
r := mux.NewRouter()
s := serviceList.GetHTTPServer(cfg.BindAddr, r)
// Get S3 Clients
s3Uploaded, s3Private, err := serviceList.GetS3Clients(cfg)
if err != nil {
log.Fatal(ctx, "could not instantiate S3 clients", err)
return nil, err
}
// Get Image API Client
imageAPI := serviceList.GetImageAPI(ctx, cfg)
// Get Kafka consumer
consumer, err := serviceList.GetKafkaConsumer(ctx, cfg)
if err != nil {
log.Fatal(ctx, "failed to initialise kafka consumer", err)
return nil, err
}
// Event Handler for Kafka Consumer with the created S3 Clients
event.Consume(ctx, consumer, &event.ImageUploadedHandler{
AuthToken: cfg.ServiceAuthToken,
S3Private: s3Private,
S3Upload: s3Uploaded,
ImageCli: imageAPI,
DownloadServiceURL: cfg.DownloadServiceURL,
}, cfg.KafkaConsumerWorkers)
// Get HealthCheck
hc, err := serviceList.GetHealthCheck(cfg, buildTime, gitCommit, version)
if err != nil {
log.Fatal(ctx, "could not instantiate healthcheck", err)
return nil, err
}
if err := registerCheckers(ctx, hc, s3Private, s3Uploaded, imageAPI, consumer); err != nil {
return nil, errors.Wrap(err, "unable to register checkers")
}
r.StrictSlash(true).Path("/health").HandlerFunc(hc.Handler)
hc.Start(ctx)
// Run the http server in a new go-routine
go func() {
if err := s.ListenAndServe(); err != nil {
svcErrors <- errors.Wrap(err, "failure in http listen and serve")
}
}()
return &Service{
config: cfg,
server: s,
router: r,
serviceList: serviceList,
healthCheck: hc,
consumer: consumer,
}, nil
}
// Close gracefully shuts the service down in the required order, with timeout
func (svc *Service) Close(ctx context.Context) error {
timeout := svc.config.GracefulShutdownTimeout
log.Info(ctx, "commencing graceful shutdown", log.Data{"graceful_shutdown_timeout": timeout})
ctx, cancel := context.WithTimeout(ctx, timeout)
// track shutown gracefully closes up
var gracefulShutdown bool
go func() {
defer cancel()
var hasShutdownError bool
// stop healthcheck, as it depends on everything else
if svc.serviceList.HealthCheck {
svc.healthCheck.Stop()
}
// If kafka consumer exists, stop listening to it. (Will close later)
if svc.serviceList.KafkaConsumer {
if err := svc.consumer.StopListeningToConsumer(ctx); err != nil {
log.Error(ctx, "error stopping kafka consumer listener", err)
hasShutdownError = true
}
log.Info(ctx, "stopped kafka consumer listener")
}
// stop any incoming requests before closing any outbound connections
if err := svc.server.Shutdown(ctx); err != nil {
log.Error(ctx, "failed to shutdown http server", err)
hasShutdownError = true
}
// If kafka consumer exists, close it.
if svc.serviceList.KafkaConsumer {
if err := svc.consumer.Close(ctx); err != nil {
log.Error(ctx, "error closing kafka consumer", err)
hasShutdownError = true
}
log.Info(ctx, "closed kafka consumer")
}
if !hasShutdownError {
gracefulShutdown = true
}
}()
// wait for shutdown success (via cancel) or failure (timeout)
<-ctx.Done()
if !gracefulShutdown {
err := errors.New("failed to shutdown gracefully")
log.Error(ctx, "failed to shutdown gracefully ", err)
return err
}
log.Info(ctx, "graceful shutdown was successful")
return nil
}
func registerCheckers(ctx context.Context,
hc HealthChecker,
s3Private event.S3Writer,
s3Uploaded event.S3Reader,
imageAPI event.ImageAPIClient,
consumer kafka.IConsumerGroup) (err error) {
hasErrors := false
if err := hc.AddCheck("S3 private bucket", s3Private.Checker); err != nil {
hasErrors = true
log.Error(ctx, "error adding check for s3Private private bucket", err)
}
if err := hc.AddCheck("S3 uploaded bucket", s3Uploaded.Checker); err != nil {
hasErrors = true
log.Error(ctx, "error adding check for s3Private uploaded bucket", err)
}
if err := hc.AddCheck("Image API client", imageAPI.Checker); err != nil {
hasErrors = true
log.Error(ctx, "error adding check for Image API", err)
}
if err := hc.AddCheck("Kafka consumer", consumer.Checker); err != nil {
hasErrors = true
log.Error(ctx, "error adding check for Image API", err)
}
if hasErrors {
return errors.New("Error(s) registering checkers for healthcheck")
}
return nil
}