forked from juju/juju
/
manifold.go
143 lines (128 loc) · 4.04 KB
/
manifold.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Copyright 2018 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package modelcache
import (
"github.com/juju/errors"
"github.com/juju/pubsub/v2"
"github.com/juju/worker/v3"
"github.com/juju/worker/v3/dependency"
"github.com/prometheus/client_golang/prometheus"
"github.com/DavinZhang/juju/core/cache"
"github.com/DavinZhang/juju/core/multiwatcher"
"github.com/DavinZhang/juju/worker/gate"
workerstate "github.com/DavinZhang/juju/worker/state"
)
// Logger is the set of logging methods that the worker in this package
// requires from whatever logger it is configured with.
type Logger interface {
	Criticalf(message string, args ...interface{})
	Errorf(message string, args ...interface{})
	Tracef(message string, args ...interface{})

	// IsTraceEnabled reports whether trace-level output is switched on.
	IsTraceEnabled() bool
}
// ManifoldConfig holds the information necessary to run a model cache worker in
// a dependency.Engine. Validate rejects the config unless every field is set.
type ManifoldConfig struct {
	// StateName is the name of the input from which start obtains a
	// workerstate.StateTracker.
	StateName string

	// CentralHubName is the name of the input from which start obtains a
	// *pubsub.StructuredHub.
	CentralHubName string

	// MultiwatcherName is the name of the input from which start obtains a
	// multiwatcher.Factory.
	MultiwatcherName string

	// InitializedGateName is the name of the input from which start obtains
	// a gate.Unlocker.
	InitializedGateName string

	// Logger is used by start for trace output and is passed on to the
	// worker through Config.
	Logger Logger

	// PrometheusRegisterer is passed on to the worker through Config.
	PrometheusRegisterer prometheus.Registerer

	// NewWorker is called by start with the assembled Config to construct
	// the worker that the manifold runs.
	NewWorker func(Config) (worker.Worker, error)
}
// Validate checks that every field of the manifold configuration has been
// populated. Fields are checked in declaration order and the first one found
// unset is reported through errors.NotValidf; nil is returned when nothing
// is missing.
func (config ManifoldConfig) Validate() error {
	switch {
	case config.StateName == "":
		return errors.NotValidf("missing StateName")
	case config.CentralHubName == "":
		return errors.NotValidf("missing CentralHubName")
	case config.MultiwatcherName == "":
		return errors.NotValidf("missing MultiwatcherName")
	case config.InitializedGateName == "":
		return errors.NotValidf("missing InitializedGateName")
	case config.Logger == nil:
		return errors.NotValidf("missing Logger")
	case config.PrometheusRegisterer == nil:
		return errors.NotValidf("missing PrometheusRegisterer")
	case config.NewWorker == nil:
		return errors.NotValidf("missing NewWorker func")
	}
	return nil
}
// Manifold builds the dependency.Manifold that runs a model cache worker.
// Its inputs are the four dependency names held in config, its start
// function is config.start, and its output function is
// ExtractCacheController, which exposes a *cache.Controller (primarily for
// the apiserver to depend on and use).
func Manifold(config ManifoldConfig) dependency.Manifold {
	inputs := []string{
		config.StateName,
		config.CentralHubName,
		config.MultiwatcherName,
		config.InitializedGateName,
	}
	return dependency.Manifold{
		Inputs: inputs,
		Start:  config.start,
		Output: ExtractCacheController,
	}
}
// start is a method on ManifoldConfig because it's more readable than a closure.
//
// It validates the configuration, then fetches the central hub, the
// initialized gate unlocker, the multiwatcher factory and the state tracker
// from the dependency context, in that order. A state pool is taken from the
// tracker and everything is handed to config.NewWorker in a Config that has
// had WithDefaultRestartStrategy applied.
//
// Every failure is returned wrapped with errors.Trace. If NewWorker fails,
// the state tracker's Done is called before returning; otherwise the same
// call is installed as the worker's Cleanup hook.
func (config ManifoldConfig) start(context dependency.Context) (worker.Worker, error) {
	if err := config.Validate(); err != nil {
		return nil, errors.Trace(err)
	}

	// Get the hub.
	var hub *pubsub.StructuredHub
	if err := context.Get(config.CentralHubName, &hub); err != nil {
		config.Logger.Tracef("hub dependency not available")
		// Traced like every other lookup in this function so that all
		// failure paths report errors in the same shape.
		return nil, errors.Trace(err)
	}

	var unlocker gate.Unlocker
	if err := context.Get(config.InitializedGateName, &unlocker); err != nil {
		return nil, errors.Trace(err)
	}

	var factory multiwatcher.Factory
	if err := context.Get(config.MultiwatcherName, &factory); err != nil {
		return nil, errors.Trace(err)
	}

	var stTracker workerstate.StateTracker
	if err := context.Get(config.StateName, &stTracker); err != nil {
		return nil, errors.Trace(err)
	}

	pool, err := stTracker.Use()
	if err != nil {
		return nil, errors.Trace(err)
	}

	// releaseState balances the successful Use above. The error from Done
	// is deliberately discarded: neither caller has anything to do with it.
	releaseState := func() { _ = stTracker.Done() }

	w, err := config.NewWorker(Config{
		StatePool:            pool,
		Hub:                  hub,
		InitializedGate:      unlocker,
		Logger:               config.Logger,
		WatcherFactory:       factory.WatchController,
		PrometheusRegisterer: config.PrometheusRegisterer,
		Cleanup:              releaseState,
	}.WithDefaultRestartStrategy())
	if err != nil {
		// No worker exists to run Cleanup, so release the tracker here.
		releaseState()
		return nil, errors.Trace(err)
	}
	return w, nil
}
// ExtractCacheController is the manifold's output function. It expects in to
// be a non-nil *cacheWorker and out to be a **cache.Controller, and stores
// the worker's controller through out.
//
// An error is returned when either argument has the wrong type. Note that
// the message for a bad out names "*cache.Controller" even though the type
// actually accepted is **cache.Controller.
func ExtractCacheController(in worker.Worker, out interface{}) error {
	// A failed assertion leaves source nil, so a single nil check covers
	// both a wrong type and a nil *cacheWorker.
	source, _ := in.(*cacheWorker)
	if source == nil {
		return errors.Errorf("in should be a %T; got %T", source, in)
	}

	target, ok := out.(**cache.Controller)
	if !ok {
		return errors.Errorf("out should be *cache.Controller; got %T", out)
	}

	*target = source.controller
	return nil
}