-
Notifications
You must be signed in to change notification settings - Fork 113
/
mentix.go
335 lines (272 loc) · 9.65 KB
/
mentix.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package mentix
import (
"fmt"
"net/http"
"strings"
"time"
"github.com/rs/zerolog"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/mentix/accservice"
"github.com/cs3org/reva/v2/pkg/mentix/config"
"github.com/cs3org/reva/v2/pkg/mentix/connectors"
"github.com/cs3org/reva/v2/pkg/mentix/entity"
"github.com/cs3org/reva/v2/pkg/mentix/exchangers"
"github.com/cs3org/reva/v2/pkg/mentix/exchangers/exporters"
"github.com/cs3org/reva/v2/pkg/mentix/exchangers/importers"
"github.com/cs3org/reva/v2/pkg/mentix/meshdata"
)
// Mentix represents the main Mentix service object.
type Mentix struct {
conf *config.Configuration
log *zerolog.Logger
connectors *connectors.Collection
importers *importers.Collection
exporters *exporters.Collection
meshDataSet meshdata.Map
updateInterval time.Duration
}
const (
runLoopSleeptime = time.Millisecond * 1000
)
func (mntx *Mentix) initialize(conf *config.Configuration, log *zerolog.Logger) error {
if conf == nil {
return fmt.Errorf("no configuration provided")
}
mntx.conf = conf
if log == nil {
return fmt.Errorf("no logger provided")
}
mntx.log = log
// Initialize the connectors that will be used to gather the mesh data
if err := mntx.initConnectors(); err != nil {
return fmt.Errorf("unable to initialize connector: %v", err)
}
// Initialize the exchangers
if err := mntx.initExchangers(); err != nil {
return fmt.Errorf("unable to initialize exchangers: %v", err)
}
// Get the update interval
duration, err := time.ParseDuration(mntx.conf.UpdateInterval)
if err != nil {
// If the duration can't be parsed, default to one hour
duration = time.Hour
}
mntx.updateInterval = duration
// Create empty mesh data set
mntx.meshDataSet = make(meshdata.Map)
// Log some infos
connectorNames := entity.GetNames(mntx.connectors)
importerNames := entity.GetNames(mntx.importers)
exporterNames := entity.GetNames(mntx.exporters)
log.Info().Msgf("mentix started with connectors: %v; importers: %v; exporters: %v; update interval: %v",
strings.Join(connectorNames, ", "),
strings.Join(importerNames, ", "),
strings.Join(exporterNames, ", "),
duration,
)
return nil
}
func (mntx *Mentix) initConnectors() error {
// Use all connectors exposed by the connectors package
conns, err := connectors.AvailableConnectors(mntx.conf)
if err != nil {
return fmt.Errorf("unable to get registered conns: %v", err)
}
mntx.connectors = conns
if err := mntx.connectors.ActivateAll(mntx.conf, mntx.log); err != nil {
return fmt.Errorf("unable to activate connectors: %v", err)
}
return nil
}
func (mntx *Mentix) initExchangers() error {
// Use all importers exposed by the importers package
imps, err := importers.AvailableImporters(mntx.conf)
if err != nil {
return fmt.Errorf("unable to get registered importers: %v", err)
}
mntx.importers = imps
if err := mntx.importers.ActivateAll(mntx.conf, mntx.log); err != nil {
return fmt.Errorf("unable to activate importers: %v", err)
}
// Use all exporters exposed by the exporters package
exps, err := exporters.AvailableExporters(mntx.conf)
if err != nil {
return fmt.Errorf("unable to get registered exporters: %v", err)
}
mntx.exporters = exps
if err := mntx.exporters.ActivateAll(mntx.conf, mntx.log); err != nil {
return fmt.Errorf("unable to activate exporters: %v", err)
}
return nil
}
func (mntx *Mentix) startExchangers() error {
// Start all importers
if err := mntx.importers.StartAll(); err != nil {
return fmt.Errorf("unable to start importers: %v", err)
}
// Start all exporters
if err := mntx.exporters.StartAll(); err != nil {
return fmt.Errorf("unable to start exporters: %v", err)
}
return nil
}
func (mntx *Mentix) stopExchangers() {
mntx.exporters.StopAll()
mntx.importers.StopAll()
}
func (mntx *Mentix) destroy() {
mntx.stopExchangers()
}
// Run starts the Mentix service that will periodically pull the configured data source and publish this data
// through the enabled exporters.
func (mntx *Mentix) Run(stopSignal <-chan struct{}) error {
defer mntx.destroy()
// Start all im- & exporters; they will be stopped in mntx.destroy
if err := mntx.startExchangers(); err != nil {
return fmt.Errorf("unable to start exchangers: %v", err)
}
updateTimestamp := time.Time{}
loop:
for {
if stopSignal != nil {
// Poll the stopSignal channel; if a signal was received, break the loop, terminating Mentix gracefully
select {
case <-stopSignal:
break loop
default:
}
}
// Perform all regular actions
mntx.tick(&updateTimestamp)
time.Sleep(runLoopSleeptime)
}
return nil
}
func (mntx *Mentix) tick(updateTimestamp *time.Time) {
// Let all importers do their work first
meshDataUpdated, err := mntx.processImporters()
if err != nil {
mntx.log.Err(err).Msgf("an error occurred while processing the importers: %v", err)
}
// If mesh data has been imported or enough time has passed, update the stored mesh data and all exporters
if meshDataUpdated || time.Since(*updateTimestamp) >= mntx.updateInterval {
// Retrieve and update the mesh data; if the importers modified any data, these changes will
// be reflected automatically here
if meshDataSet, err := mntx.retrieveMeshDataSet(); err == nil {
if err := mntx.applyMeshDataSet(meshDataSet); err != nil {
mntx.log.Err(err).Msg("failed to apply mesh data")
}
} else {
mntx.log.Err(err).Msg("failed to retrieve mesh data")
}
*updateTimestamp = time.Now()
}
}
func (mntx *Mentix) processImporters() (bool, error) {
meshDataUpdated := false
for _, importer := range mntx.importers.Importers {
updated, err := importer.Process(mntx.connectors)
if err != nil {
return false, fmt.Errorf("unable to process importer '%v': %v", importer.GetName(), err)
}
meshDataUpdated = meshDataUpdated || updated
if updated {
mntx.log.Debug().Msgf("mesh data imported from '%v'", importer.GetName())
}
}
return meshDataUpdated, nil
}
func (mntx *Mentix) retrieveMeshDataSet() (meshdata.Map, error) {
meshDataSet := make(meshdata.Map)
for _, connector := range mntx.connectors.Connectors {
meshData, err := connector.RetrieveMeshData()
if err == nil {
meshDataSet[connector.GetID()] = meshData
} else {
mntx.log.Err(err).Msgf("retrieving mesh data from connector '%v' failed", connector.GetName())
}
}
return meshDataSet, nil
}
func (mntx *Mentix) applyMeshDataSet(meshDataSet meshdata.Map) error {
// Check if mesh data from any connector has changed
meshDataChanged := false
for connectorID, meshData := range meshDataSet {
if !meshData.Compare(mntx.meshDataSet[connectorID]) {
meshDataChanged = true
break
}
}
if meshDataChanged {
mntx.log.Debug().Msg("mesh data changed, applying")
mntx.meshDataSet = meshDataSet
exchangers := make([]exchangers.Exchanger, 0, len(mntx.exporters.Exporters)+len(mntx.importers.Importers))
exchangers = append(exchangers, mntx.exporters.Exchangers()...)
exchangers = append(exchangers, mntx.importers.Exchangers()...)
for _, exchanger := range exchangers {
if err := exchanger.Update(mntx.meshDataSet); err != nil {
return fmt.Errorf("unable to update mesh data on exchanger '%v': %v", exchanger.GetName(), err)
}
}
}
return nil
}
// GetRequestImporters returns all exporters that can handle HTTP requests.
func (mntx *Mentix) GetRequestImporters() []exchangers.RequestExchanger {
return mntx.importers.GetRequestImporters()
}
// GetRequestExporters returns all exporters that can handle HTTP requests.
func (mntx *Mentix) GetRequestExporters() []exchangers.RequestExchanger {
return mntx.exporters.GetRequestExporters()
}
// RequestHandler handles any incoming HTTP requests by asking each RequestExchanger whether it wants to
// handle the request (usually based on the relative URL path).
func (mntx *Mentix) RequestHandler(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
log := appctx.GetLogger(r.Context())
switch r.Method {
case http.MethodGet:
mntx.handleRequest(mntx.GetRequestExporters(), w, r, log)
case http.MethodPost:
mntx.handleRequest(mntx.GetRequestImporters(), w, r, log)
default:
log.Err(fmt.Errorf("unsupported method")).Msg("error handling incoming request")
}
}
func (mntx *Mentix) handleRequest(exchangers []exchangers.RequestExchanger, w http.ResponseWriter, r *http.Request, log *zerolog.Logger) {
// Ask each RequestExchanger if it wants to handle the request
for _, exchanger := range exchangers {
if exchanger.WantsRequest(r) {
exchanger.HandleRequest(w, r, mntx.conf, log)
}
}
}
// New creates a new Mentix service instance.
func New(conf *config.Configuration, log *zerolog.Logger) (*Mentix, error) {
// Configure the accounts service upfront
if err := accservice.InitAccountsService(conf); err != nil {
return nil, fmt.Errorf("unable to initialize the accounts service: %v", err)
}
mntx := new(Mentix)
if err := mntx.initialize(conf, log); err != nil {
return nil, fmt.Errorf("unable to initialize Mentix: %v", err)
}
return mntx, nil
}