-
Notifications
You must be signed in to change notification settings - Fork 4
/
instance.go
105 lines (93 loc) · 2.87 KB
/
instance.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// Copyright © 2019 Circonus, Inc. <support@circonus.com>
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//
package gcpservice
import (
"context"
"fmt"
"sync"
"time"
"github.com/circonus-labs/circonus-cloud-agent/internal/circonus"
"github.com/circonus-labs/circonus-cloud-agent/internal/services/gcpservice/collectors"
"github.com/pkg/errors"
"github.com/rs/zerolog"
)
// Instance defines a specific gcp service instance for collecting metrics.
type Instance struct {
	logger     zerolog.Logger         // instance-scoped structured logger
	ctx        context.Context        // NOTE(review): ctx stored in a struct (Go guidance prefers passing it per-call); presumably injected by the service manager — confirm at construction site
	cfg        *Config                // instance configuration (GCP interval, project ID, credentials)
	check      *circonus.Check        // destination Circonus check; also used for error reporting
	lastStart  *time.Time             // wall-clock start of the most recent collection run (nil until the first run)
	collectors []collectors.Collector // metric collectors executed on each collection pass
	baseTags   circonus.Tags          // tags applied to every metric emitted by this instance
	sync.Mutex                        // guards lastStart and running
	running    bool                   // true while a collection goroutine is in flight
}
// Start runs the instance based on the configured interval. It blocks until
// the instance context is canceled (returning nil) and launches at most one
// collection goroutine at a time.
func (inst *Instance) Start() error {
	// Desired spacing between collection runs, from config (minutes).
	interval := time.Duration(inst.cfg.GCP.Interval) * time.Minute
	inst.logger.Info().Str("collection_interval", interval.String()).Time("next_collection", time.Now().Add(interval)).Msg("client started")
	// Tick every minute and decide per-tick whether a full interval has
	// elapsed, rather than ticking at `interval` directly — this keeps the
	// wake-up cadence fixed regardless of the configured interval.
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-inst.ctx.Done():
			// NOTE(review): returns immediately without waiting for any
			// in-flight collection goroutine; collectors are expected to
			// observe ctx via inst.done() — confirm they do.
			return nil
		case <-ticker.C:
			inst.Lock()
			// Not yet time for the next run.
			if inst.lastStart != nil && time.Since(*inst.lastStart) < interval {
				inst.Unlock()
				continue
			}
			// Previous run still executing; skip this tick instead of
			// overlapping collections.
			if inst.running {
				inst.Unlock()
				inst.logger.Warn().Msg("collection in progress, not starting another")
				continue
			}
			// calculate one timeseries range for all requests from collectors
			start := time.Now()
			var delta time.Duration
			if inst.lastStart == nil {
				// First run: look back two intervals to backfill.
				delta = interval * 2
			} else {
				// Cover the gap since the last run plus a 2m overlap pad
				// (collections can be skipped/late, and this avoids edge gaps).
				delta = start.Sub(*inst.lastStart) + 2*time.Minute
			}
			tsEnd := start
			tsStart := tsEnd.Add(-delta)
			inst.logger.Info().Time("start", tsStart).Time("end", tsEnd).Str("delta", delta.String()).Msg("collection timeseries range")
			inst.lastStart = &start
			inst.running = true
			inst.Unlock()
			// Run the collectors asynchronously so the ticker loop keeps
			// servicing ctx cancellation; `running` prevents overlap.
			go func() {
				for _, c := range inst.collectors {
					if err := c.Collect(tsStart, tsEnd, inst.cfg.GCP.projectID, inst.cfg.GCP.credentialData, inst.baseTags); err != nil {
						// Report to the check and log, then continue with the
						// remaining collectors — one failing collector should
						// not block the others.
						inst.check.ReportError(errors.WithMessage(err, fmt.Sprintf("id: %s, collector: %s", inst.cfg.ID, c.ID())))
						inst.logger.Warn().Err(err).Str("collector", c.ID()).Msg("collecting telemetry")
						// need to determine which errors from the various
						// cloud service providers are fatal vs retry vs
						// wait for next iteration
					}
					// Bail out between collectors if the context was canceled.
					if inst.done() {
						break
					}
				}
				inst.Lock()
				inst.running = false
				inst.Unlock()
				// NOTE(review): logged even when the loop exited early via
				// ctx cancellation — "complete" may be a partial run.
				inst.logger.Debug().Str("duration", time.Since(start).String()).Msg("collection complete")
			}()
		}
	}
}
// done reports whether the instance context has been canceled. It never
// blocks: a non-blocking poll of ctx.Done() returns false immediately when
// the context is still live, and logs before reporting true otherwise.
func (inst *Instance) done() bool {
	select {
	case <-inst.ctx.Done():
	default:
		return false
	}
	inst.logger.Debug().Msg("context done, exiting")
	return true
}