forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
docker.go
137 lines (115 loc) · 3.28 KB
/
docker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package docker
import (
"sync"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"time"
"github.com/fsouza/go-dockerclient"
)
var HostParser = parse.URLHostParserBuilder{DefaultScheme: "tcp"}.Build()
func init() {
// Register the ModuleFactory function for the "docker" module.
if err := mb.Registry.AddModule("docker", NewModule); err != nil {
panic(err)
}
}
func NewModule(base mb.BaseModule) (mb.Module, error) {
// Validate that at least one host has been specified.
config := struct {
Hosts []string `config:"hosts" validate:"nonzero,required"`
}{}
if err := base.UnpackConfig(&config); err != nil {
return nil, err
}
return &base, nil
}
type Stat struct {
Container docker.APIContainers
Stats docker.Stats
}
func NewDockerClient(endpoint string, config Config) (*docker.Client, error) {
var err error
var client *docker.Client
if !config.TLS.IsEnabled() {
client, err = docker.NewClient(endpoint)
} else {
client, err = docker.NewTLSClient(
endpoint,
config.TLS.Certificate,
config.TLS.Key,
config.TLS.CA,
)
}
if err != nil {
return nil, err
}
return client, nil
}
// FetchStats returns a list of running containers with all related stats inside
func FetchStats(client *docker.Client, timeout time.Duration) ([]Stat, error) {
containers, err := client.ListContainers(docker.ListContainersOptions{})
if err != nil {
return nil, err
}
var wg sync.WaitGroup
containersList := make([]Stat, 0, len(containers))
statsQueue := make(chan Stat, 1)
wg.Add(len(containers))
for _, container := range containers {
go func(container docker.APIContainers) {
defer wg.Done()
statsQueue <- exportContainerStats(client, &container, timeout)
}(container)
}
go func() {
wg.Wait()
close(statsQueue)
}()
// This will break after the queue has been drained and queue is closed.
for stat := range statsQueue {
// If names is empty, there is not data inside
if len(stat.Container.Names) != 0 {
containersList = append(containersList, stat)
}
}
return containersList, err
}
// exportContainerStats loads stats for the given container
//
// This is currently very inefficient as docker calculates the average for each request,
// means each request will take at least 2s: https://github.com/docker/docker/blob/master/cli/command/container/stats_helpers.go#L148
// Getting all stats at once is implemented here: https://github.com/docker/docker/pull/25361
func exportContainerStats(client *docker.Client, container *docker.APIContainers, timeout time.Duration) Stat {
var wg sync.WaitGroup
var event Stat
statsC := make(chan *docker.Stats)
errC := make(chan error, 1)
statsOptions := docker.StatsOptions{
ID: container.ID,
Stats: statsC,
Stream: false,
Timeout: timeout,
}
wg.Add(2)
go func() {
defer wg.Done()
errC <- client.Stats(statsOptions)
close(errC)
}()
go func() {
defer wg.Done()
stats := <-statsC
err := <-errC
if stats != nil && err == nil {
event.Stats = *stats
event.Container = *container
} else if err == nil && stats == nil {
logp.Warn("Container stopped when recovering stats: %v", container.ID)
} else {
logp.Err("An error occurred while getting docker stats: %v", err)
}
}()
wg.Wait()
return event
}