/
manifold.go
132 lines (113 loc) · 3.93 KB
/
manifold.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright 2015 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
// Package spool contains the implementation of a
// worker that extracts the spool directory path from the agent
// config and enables other workers to write and read
// metrics to and from the spool directory using a writer
// and a reader.
package spool
import (
"time"
"github.com/juju/errors"
corecharm "gopkg.in/juju/charm.v6-unstable"
"gopkg.in/tomb.v1"
"github.com/juju/juju/agent"
"github.com/juju/juju/cmd/jujud/agent/engine"
"github.com/juju/juju/worker"
"github.com/juju/juju/worker/dependency"
)
// MetricRecorder is the write side of the metrics spool: implementations
// record metrics to a spool directory.
type MetricRecorder interface {
	// AddMetric records a metric with the given key, value and creation
	// time to a spool directory.
	AddMetric(key, value string, created time.Time) error

	// IsDeclaredMetric reports whether the metric recorder is permitted
	// to store metrics with the given key.
	IsDeclaredMetric(key string) bool

	// Close implements io.Closer.
	Close() error
}
// MetricReader reads metrics from a spool directory.
// Instances are handed out by MetricFactory.Reader.
type MetricReader interface {
	// Read returns all metric batches stored in the spool directory,
	// or an error.
	Read() ([]MetricBatch, error)
	// Remove removes the metric batch with the specified uuid
	// from the spool directory.
	Remove(uuid string) error
	// Close implements io.Closer.
	Close() error
}
// MetricFactory contains the metrics reader and recorder factories.
type MetricFactory interface {
// Recorder returns a new MetricRecorder.
Recorder(metrics map[string]corecharm.Metric, charmURL, unitTag string) (MetricRecorder, error)
// Reader returns a new MetricReader.
Reader() (MetricReader, error)
}
// factory is the MetricFactory implementation used by this package; its
// Reader and Recorder methods both operate on a single spool directory.
type factory struct {
	// spoolDir is the directory handed to NewJSONMetricReader and set as
	// MetricRecorderConfig.SpoolDir.
	spoolDir string
}
// Reader implements the MetricFactory interface.
// It returns the result of NewJSONMetricReader for the factory's spool
// directory, including any error, unchanged.
func (f *factory) Reader() (MetricReader, error) {
	return NewJSONMetricReader(f.spoolDir)
}
// Recorder implements the MetricFactory interface.
// It assembles a MetricRecorderConfig from the factory's spool directory
// and the supplied arguments, then returns whatever NewJSONMetricRecorder
// produces for that configuration.
func (f *factory) Recorder(declaredMetrics map[string]corecharm.Metric, charmURL, unitTag string) (MetricRecorder, error) {
	cfg := MetricRecorderConfig{
		SpoolDir: f.spoolDir,
		Metrics:  declaredMetrics,
		CharmURL: charmURL,
		UnitTag:  unitTag,
	}
	return NewJSONMetricRecorder(cfg)
}
// newFactory returns a MetricFactory backed by the given spool directory.
// It is a package-level variable rather than a plain function, presumably
// so that tests can substitute a different factory (TODO confirm against
// the package's tests).
var newFactory = func(spoolDir string) MetricFactory {
	f := factory{spoolDir: spoolDir}
	return &f
}
// ManifoldConfig specifies the names a spool directory manifold should use
// to address its dependencies. It is defined in terms of
// engine.AgentManifoldConfig, and Manifold converts it back to that type
// before use.
type ManifoldConfig engine.AgentManifoldConfig
// Manifold returns a dependency.Manifold built by engine.AgentManifold with
// newWorker as its worker constructor. Its Output is set to outputFunc, so
// dependents can obtain a MetricFactory for the agent's metrics spool
// directory.
func Manifold(config ManifoldConfig) dependency.Manifold {
	agentConfig := engine.AgentManifoldConfig(config)
	m := engine.AgentManifold(agentConfig, newWorker)
	m.Output = outputFunc
	return m
}
// newWorker builds a degenerate worker whose only job is to hold a
// MetricFactory for the agent's metrics spool directory. The directory is
// read from the agent's current config and validated with checkSpoolDir;
// a validation failure is returned annotated with the offending path.
func newWorker(a agent.Agent) (worker.Worker, error) {
	dir := a.CurrentConfig().MetricsSpoolDir()
	if err := checkSpoolDir(dir); err != nil {
		return nil, errors.Annotatef(err, "error checking spool directory %q", dir)
	}

	w := new(spoolWorker)
	w.factory = newFactory(dir)

	// The worker performs no work of its own: the goroutine just parks
	// until the tomb starts dying and then marks it done.
	go func() {
		defer w.tomb.Done()
		<-w.tomb.Dying()
	}()
	return w, nil
}
// outputFunc exposes the MetricFactory held by a *spoolWorker. The in
// argument must be a non-nil *spoolWorker and out a non-nil *MetricFactory;
// any other combination yields an error describing the expected and the
// actual types. On success the worker's factory is stored through out.
func outputFunc(in worker.Worker, out interface{}) error {
	// Failed assertions leave typed nil pointers behind, which both feed
	// the nil check and supply the "expected" types for the error text.
	source, _ := in.(*spoolWorker)
	target, _ := out.(*MetricFactory)
	if source == nil || target == nil {
		return errors.Errorf("expected %T->%T; got %T->%T", source, target, in, out)
	}

	*target = source.factory
	return nil
}
// spoolWorker is a worker that provides a MetricFactory.
type spoolWorker struct {
	// tomb backs the worker's Kill and Wait methods; newWorker starts the
	// goroutine that marks it done once it begins dying.
	tomb tomb.Tomb
	// factory is the value outputFunc hands to dependents.
	factory MetricFactory
}
// Kill is part of the worker.Worker interface.
// It kills the worker's tomb with a nil reason.
func (w *spoolWorker) Kill() {
	w.tomb.Kill(nil)
}
// Wait is part of the worker.Worker interface.
// It returns the result of waiting on the worker's tomb.
func (w *spoolWorker) Wait() error {
	return w.tomb.Wait()
}