-
Notifications
You must be signed in to change notification settings - Fork 10
/
consumer.go
72 lines (59 loc) · 1.44 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package hammer
import (
"errors"
"log"
"sync/atomic"
"time"
"github.com/prometheus/client_golang/prometheus"
)
type (
	// consumer pairs a queue operation name with the function that handles it.
	consumer struct {
		name string       // operation name the handler is registered under
		f    consumerFunc // handler invoked for work items carrying that operation
	}

	// consumerFunc is the signature of an operation handler. It is called
	// with the Hammer that dequeued the work and the work item's parameter.
	consumerFunc func(*Hammer, interface{})
)
// registerConsumer records f as the handler for the queue operation called
// name. The (name, f) pair is appended to the package-level consumers list and
// f is stored in consumerMap under name, which is where runConsumers looks
// handlers up.
func registerConsumer(name string, f consumerFunc) {
	entry := consumer{name: name, f: f}
	consumers = append(consumers, entry)
	consumerMap[name] = f
}
// wait runs the handler f with parameter p and accounts for it as an active
// worker for as long as it runs.
//
// On entry it adds one to s.wg and atomically increments s.activeWorkers.
// When f returns (or panics) the deferred cleanup receives one value from
// s.sem, calls s.wg.Done and atomically decrements s.activeWorkers, in that
// order. runConsumers sends a value on s.sem before starting wait, so the
// receive here hands that slot back.
//
// NOTE(review): runConsumers starts wait with `go`, so s.wg.Add(1) executes
// after the goroutine has already been launched. Confirm that no s.wg.Wait
// can run in that window.
func (s *Hammer) wait(f consumerFunc, p interface{}) {
	s.wg.Add(1)
	atomic.AddUint64(&s.activeWorkers, 1)

	release := func() {
		// Give the concurrency slot back first.
		<-s.sem
		s.wg.Done()
		// Adding ^uint64(0) (all bits set) wraps around, i.e. subtracts one.
		atomic.AddUint64(&s.activeWorkers, ^uint64(0))
	}
	defer release()

	f(s, p)
}
// runConsumers takes one work item from s.inQueue and dispatches it to the
// handler registered for its operation.
//
// It returns the error from GetWork unchanged, or an "Unknown operation"
// error (after logging the item) when consumerMap has no handler for the
// operation. Otherwise it sends on s.sem, starts the handler in a new
// goroutine via s.wait and returns nil without waiting for the handler.
//
// The value observed on consumerMetrics is the time in milliseconds from just
// after GetWork returned until the goroutine was started. That includes any
// time spent blocked sending on s.sem, but not the handler's own run time.
func (s *Hammer) runConsumers() error {
	work, err := s.inQueue.GetWork()
	if err != nil {
		return err
	}

	begin := time.Now()

	handler := consumerMap[work.Operation]
	if handler == nil {
		log.Printf("unknown operation %s %+v\n", work.Operation, work.Parameter)
		return errors.New("Unknown operation")
	}

	// Claim a slot before spawning; the matching receive is in wait's cleanup.
	s.sem <- true
	go s.wait(handler, work.Parameter)

	// A Duration is an int64 nanosecond count, so this is nanoseconds / 1e6.
	elapsedMs := float64(time.Since(begin)) / float64(time.Millisecond)
	labels := prometheus.Labels{"operation": work.Operation}
	consumerMetrics.With(labels).Observe(elapsedMs)

	return nil
}
// consumerMetrics is the histogram that runConsumers feeds with one
// observation (in milliseconds) per dispatched work item, labelled by
// operation name. Its 20 exponential buckets start at 10 and grow by a factor
// of 1.45 per bucket.
var consumerMetrics = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "evedata",
		Subsystem: "hammer",
		Name:      "ticks",
		Help:      "API call statistics.",
		Buckets:   prometheus.ExponentialBuckets(10, 1.45, 20),
	},
	[]string{"operation"},
)
// init registers consumerMetrics with the default Prometheus registry at
// package load time. MustRegister panics if the registration fails.
func init() {
	prometheus.MustRegister(consumerMetrics)
}