/
input_docker_stdout.go
439 lines (391 loc) · 18.2 KB
/
input_docker_stdout.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stdout
import (
"fmt"
"regexp"
"sync"
"time"
"github.com/docker/docker/api/types"
"github.com/alibaba/ilogtail/pkg/helper"
"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/pkg/util"
"github.com/alibaba/ilogtail/plugins/input"
)
// serviceDockerStdoutKey is the checkpoint-storage key under which this
// plugin persists its reader checkpoints between agent restarts.
const serviceDockerStdoutKey = "service_docker_stdout_v2"
// logDriverSupported reports whether the container's log driver produces a
// log file this plugin can tail. Only the docker "json-file" driver is
// supported; containers without a HostConfig (containerd) are accepted.
func logDriverSupported(container types.ContainerJSON) bool {
	// containerd has no HostConfig, return true
	if container.HostConfig == nil {
		return true
	}
	return container.HostConfig.LogConfig.Type == "json-file"
}
// DockerFileSyner bundles the per-container objects used to tail one
// container's stdout log file: the file reader, the line processor, and the
// container metadata the pair was created from.
type DockerFileSyner struct {
	dockerFileReader    *helper.LogFileReader   // tails the on-disk log file
	dockerFileProcessor *DockerStdoutProcessor  // parses read blocks into pipeline logs
	info                *helper.DockerInfoDetail // container metadata snapshot
}
// NewDockerFileSynerByFile builds a DockerFileSyner for a plain log file path
// by wrapping it in a synthetic container description, so the normal
// container construction path can be reused.
// NOTE(review): this mutates sds as a side effect (LogtailInDocker=false and
// a 10GB StartLogMaxOffset) — confirm callers expect the shared config to
// change.
func NewDockerFileSynerByFile(sds *ServiceDockerStdout, filePath string) *DockerFileSyner {
	// Empty container info with only LogPath set is enough for the file path.
	dockerInfoDetail := &helper.DockerInfoDetail{}
	dockerInfoDetail.ContainerInfo = types.ContainerJSON{}
	dockerInfoDetail.ContainerInfo.LogPath = filePath
	sds.LogtailInDocker = false
	sds.StartLogMaxOffset = 10 * 1024 * 1024 * 1024
	return NewDockerFileSyner(sds, dockerInfoDetail, sds.checkpointMap)
}
// NewDockerFileSyner builds the reader/processor pair for one container's
// stdout log file. The initial read position is restored from checkpointMap
// when present; otherwise it is derived from the current file size so that at
// most sds.StartLogMaxOffset bytes of history are replayed.
func NewDockerFileSyner(sds *ServiceDockerStdout,
	info *helper.DockerInfoDetail,
	checkpointMap map[string]helper.LogFileReaderCheckPoint) *DockerFileSyner {
	var reg *regexp.Regexp
	var err error
	if len(sds.BeginLineRegex) > 0 {
		// An invalid regex is tolerated: reg stays nil and multi-line
		// aggregation is effectively disabled.
		if reg, err = regexp.Compile(sds.BeginLineRegex); err != nil {
			logger.Warning(sds.context.GetRuntimeContext(), "DOCKER_REGEX_COMPILE_ALARM", "compile begin line regex error, regex", sds.BeginLineRegex, "error", err)
		}
	}
	source := util.NewPackIDPrefix(info.ContainerInfo.ID + sds.context.GetConfigName())
	tags := info.GetExternalTags(sds.ExternalEnvTag, sds.ExternalK8sLabelTag)
	processor := NewDockerStdoutProcessor(reg, time.Duration(sds.BeginLineTimeoutMs)*time.Millisecond, sds.BeginLineCheckLength, sds.MaxLogSize, sds.Stdout, sds.Stderr, sds.context, sds.collector, tags, source)
	checkpoint, ok := checkpointMap[info.ContainerInfo.ID]
	if !ok {
		if sds.LogtailInDocker {
			checkpoint.Path = helper.GetMountedFilePath(info.ContainerInfo.LogPath)
		} else {
			checkpoint.Path = info.ContainerInfo.LogPath
		}
		// first watch this container
		realPath, stat := helper.TryGetRealPath(checkpoint.Path)
		if realPath == "" {
			logger.Warning(sds.context.GetRuntimeContext(), "DOCKER_STDOUT_STAT_ALARM", "stat log file error, path", checkpoint.Path, "error", "path not found")
		} else {
			checkpoint.Offset = stat.Size()
			if checkpoint.Offset > sds.StartLogMaxOffset {
				// Too much history: skip ahead so only the last
				// StartLogMaxOffset bytes are replayed.
				logger.Warning(sds.context.GetRuntimeContext(), "DOCKER_STDOUT_START_ALARM", "log file too big, path", checkpoint.Path, "offset", checkpoint.Offset)
				checkpoint.Offset -= sds.StartLogMaxOffset
			} else {
				checkpoint.Offset = 0
			}
			checkpoint.State = helper.GetOSState(stat)
			checkpoint.Path = realPath
		}
	}
	// Enforce a sane lower bound for the close-idle-file interval.
	if sds.CloseUnChangedSec < 10 {
		sds.CloseUnChangedSec = 10
	}
	logger.Info(sds.context.GetRuntimeContext(), "new stdout reader id", info.IDPrefix(),
		"name", info.ContainerInfo.Name, "created", info.ContainerInfo.Created, "status", info.Status(),
		"checkpoint_logpath", checkpoint.Path,
		"in_docker", sds.LogtailInDocker)
	config := helper.LogFileReaderConfig{
		ReadIntervalMs:   sds.ReadIntervalMs,
		MaxReadBlockSize: sds.MaxLogSize,
		CloseFileSec:     sds.CloseUnChangedSec,
		Tracker:          sds.tracker,
	}
	reader, err := helper.NewLogFileReader(sds.context.GetRuntimeContext(), checkpoint, config, processor)
	if err != nil {
		// Previously this error was silently discarded; surface it so a
		// broken reader setup is visible in the logs.
		logger.Warning(sds.context.GetRuntimeContext(), "DOCKER_STDOUT_READER_ALARM", "create log file reader error, path", checkpoint.Path, "error", err)
	}
	return &DockerFileSyner{
		dockerFileReader:    reader,
		info:                info,
		dockerFileProcessor: processor,
	}
}
// ServiceDockerStdout is a service input plugin that discovers containers
// matching the configured label/env/k8s filters and tails their stdout log
// files. Exported fields are user-facing configuration (documented by their
// `comment` tags); unexported fields are runtime state.
type ServiceDockerStdout struct {
	IncludeLabel          map[string]string `comment:"include container label for selector. [Deprecated: use IncludeContainerLabel and IncludeK8sLabel instead]"`
	ExcludeLabel          map[string]string `comment:"exclude container label for selector. [Deprecated: use ExcludeContainerLabel and ExcludeK8sLabel instead]"`
	IncludeEnv            map[string]string `comment:"the container would be selected when it is matched by any environment rules. Furthermore, the regular expression starts with '^' is supported as the env value, such as 'ENVA:^DE.*$'' would hit all containers having any envs starts with DE."`
	ExcludeEnv            map[string]string `comment:"the container would be excluded when it is matched by any environment rules. Furthermore, the regular expression starts with '^' is supported as the env value, such as 'ENVA:^DE.*$'' would hit all containers having any envs starts with DE."`
	IncludeContainerLabel map[string]string `comment:"the container would be selected when it is matched by any container labels. Furthermore, the regular expression starts with '^' is supported as the label value, such as 'LABEL:^DE.*$'' would hit all containers having any labels starts with DE."`
	ExcludeContainerLabel map[string]string `comment:"the container would be excluded when it is matched by any container labels. Furthermore, the regular expression starts with '^' is supported as the label value, such as 'LABEL:^DE.*$'' would hit all containers having any labels starts with DE."`
	IncludeK8sLabel       map[string]string `comment:"the container of pod would be selected when it is matched by any include k8s label rules. Furthermore, the regular expression starts with '^' is supported as the value to match pods."`
	ExcludeK8sLabel       map[string]string `comment:"the container of pod would be excluded when it is matched by any exclude k8s label rules. Furthermore, the regular expression starts with '^' is supported as the value to exclude pods."`
	ExternalEnvTag        map[string]string `comment:"extract the env value as the log tags for one container, such as the value of ENVA would be appended to the 'taga' of log tags when configured 'ENVA:taga' pair."`
	ExternalK8sLabelTag   map[string]string `comment:"extract the pod label value as the log tags for one container, such as the value of LABELA would be appended to the 'taga' of log tags when configured 'LABELA:taga' pair."`
	FlushIntervalMs       int               `comment:"the interval of container discovery, and the timeunit is millisecond. Default value is 3000."`
	ReadIntervalMs        int               `comment:"the interval of read stdout log, and the timeunit is millisecond. Default value is 1000."`
	SaveCheckPointSec     int               `comment:"the interval of save checkpoint, and the timeunit is second. Default value is 60."`
	BeginLineRegex        string            `comment:"the regular expression of begin line for the multi line log."`
	BeginLineTimeoutMs    int               `comment:"the maximum timeout milliseconds for begin line match. Default value is 3000."`
	BeginLineCheckLength  int               `comment:"the prefix length of log line to match the first line. Default value is 10240."`
	MaxLogSize            int               `comment:"the maximum log size. Default value is 512*1024, a.k.a 512K."`
	CloseUnChangedSec     int               `comment:"the reading file would be close when the interval between last read operation is over {CloseUnChangedSec} seconds. Default value is 60."`
	StartLogMaxOffset     int64             `comment:"the first read operation would read {StartLogMaxOffset} size history logs. Default value is 128*1024, a.k.a 128K."`
	Stdout                bool              `comment:"collect stdout log. Default is true."`
	Stderr                bool              `comment:"collect stderr log. Default is true."`
	LogtailInDocker       bool              `comment:"the logtail running mode. Default is true."`
	K8sNamespaceRegex     string            `comment:"the regular expression of kubernetes namespace to match containers."`
	K8sPodRegex           string            `comment:"the regular expression of kubernetes pod to match containers."`
	K8sContainerRegex     string            `comment:"the regular expression of kubernetes container to match containers."`
	// export from ilogtail-trace component
	// Regex halves of the Include/Exclude maps, split out by Init via
	// helper.SplitRegexFromMap.
	IncludeLabelRegex map[string]*regexp.Regexp
	ExcludeLabelRegex map[string]*regexp.Regexp
	IncludeEnvRegex   map[string]*regexp.Regexp
	ExcludeEnvRegex   map[string]*regexp.Regexp
	K8sFilter         *helper.K8SFilter
	// for tracker
	tracker           *helper.ReaderMetricTracker // shared read metrics for all file readers
	metricRecord      *pipeline.MetricsRecord
	avgInstanceMetric pipeline.CounterMetric // average number of tailed containers
	addMetric         pipeline.CounterMetric // count of readers started
	deleteMetric      pipeline.CounterMetric // count of readers stopped
	synerMap          map[string]*DockerFileSyner // active readers, keyed by container ID
	checkpointMap     map[string]helper.LogFileReaderCheckPoint // persisted read positions, keyed by container ID
	shutdown          chan struct {
	} // closed by Stop to end the Start loop
	waitGroup sync.WaitGroup
	context   pipeline.Context
	collector pipeline.Collector
	// Last return of GetAllAcceptedInfoV2
	fullList              map[string]bool
	matchList             map[string]*helper.DockerInfoDetail
	lastUpdateTime        int64 // container-center timestamp of the last sync
	CollectContainersFlag bool
}
// Init prepares the plugin: it initializes the container center, clamps
// MaxLogSize into [1K, 20M], registers metrics, splits the include/exclude
// maps into plain-value and regex halves, merges the deprecated *Label fields
// with their replacements, and builds the k8s filter.
// Returns 0 (no special init flags) and the last filter/regex error, if any.
func (sds *ServiceDockerStdout) Init(context pipeline.Context) (int, error) {
	sds.context = context
	helper.ContainerCenterInit()
	sds.fullList = make(map[string]bool)
	sds.matchList = make(map[string]*helper.DockerInfoDetail)
	sds.synerMap = make(map[string]*DockerFileSyner)
	// Clamp MaxLogSize to a sane range.
	if sds.MaxLogSize < 1024 {
		sds.MaxLogSize = 1024
	}
	if sds.MaxLogSize > 1024*1024*20 {
		sds.MaxLogSize = 1024 * 1024 * 20
	}
	// Register reader metrics on this config's metric record.
	sds.tracker = helper.NewReaderMetricTracker()
	sds.metricRecord = sds.context.GetMetricRecord()
	sds.metricRecord.RegisterCounterMetric(sds.tracker.CloseCounter)
	sds.metricRecord.RegisterCounterMetric(sds.tracker.OpenCounter)
	sds.metricRecord.RegisterCounterMetric(sds.tracker.ReadSizeCounter)
	sds.metricRecord.RegisterCounterMetric(sds.tracker.ReadCounter)
	sds.metricRecord.RegisterCounterMetric(sds.tracker.FileSizeCounter)
	sds.metricRecord.RegisterCounterMetric(sds.tracker.FileRotatorCounter)
	sds.metricRecord.RegisterLatencyMetric(sds.tracker.ProcessLatency)
	sds.avgInstanceMetric = helper.NewAverageMetric("container_count")
	sds.addMetric = helper.NewCounterMetric("add_container")
	sds.deleteMetric = helper.NewCounterMetric("remove_container")
	sds.metricRecord.RegisterCounterMetric(sds.avgInstanceMetric)
	sds.metricRecord.RegisterCounterMetric(sds.addMetric)
	sds.metricRecord.RegisterCounterMetric(sds.deleteMetric)
	var err error
	// Split "KEY:^regex$" style values out of the env maps into regex maps.
	sds.IncludeEnv, sds.IncludeEnvRegex, err = helper.SplitRegexFromMap(sds.IncludeEnv)
	if err != nil {
		logger.Warning(sds.context.GetRuntimeContext(), "INVALID_REGEX_ALARM", "init include env regex error", err)
	}
	sds.ExcludeEnv, sds.ExcludeEnvRegex, err = helper.SplitRegexFromMap(sds.ExcludeEnv)
	if err != nil {
		logger.Warning(sds.context.GetRuntimeContext(), "INVALID_REGEX_ALARM", "init exclude env regex error", err)
	}
	// Fold the non-deprecated *ContainerLabel maps into the deprecated
	// *Label maps so both config styles keep working.
	if sds.IncludeLabel != nil {
		for k, v := range sds.IncludeContainerLabel {
			sds.IncludeLabel[k] = v
		}
	} else {
		sds.IncludeLabel = sds.IncludeContainerLabel
	}
	if sds.ExcludeLabel != nil {
		for k, v := range sds.ExcludeContainerLabel {
			sds.ExcludeLabel[k] = v
		}
	} else {
		sds.ExcludeLabel = sds.ExcludeContainerLabel
	}
	sds.IncludeLabel, sds.IncludeLabelRegex, err = helper.SplitRegexFromMap(sds.IncludeLabel)
	if err != nil {
		logger.Warning(sds.context.GetRuntimeContext(), "INVALID_REGEX_ALARM", "init include label regex error", err)
	}
	sds.ExcludeLabel, sds.ExcludeLabelRegex, err = helper.SplitRegexFromMap(sds.ExcludeLabel)
	if err != nil {
		logger.Warning(sds.context.GetRuntimeContext(), "INVALID_REGEX_ALARM", "init exclude label regex error", err)
	}
	sds.K8sFilter, err = helper.CreateK8SFilter(sds.K8sNamespaceRegex, sds.K8sPodRegex, sds.K8sContainerRegex, sds.IncludeK8sLabel, sds.ExcludeK8sLabel)
	return 0, err
}
// Description returns the one-line human-readable description of the plugin.
func (sds *ServiceDockerStdout) Description() string {
	return "the container stdout input plugin for iLogtail, which supports docker and containerd."
}
// Collect is a no-op: as a service input, this plugin pushes data through the
// collector handed to Start instead of being polled.
func (sds *ServiceDockerStdout) Collect(pipeline.Collector) error {
	return nil
}
// FlushAll re-evaluates the container match list and reconciles the running
// reader set with it: readers are started for newly matched containers and
// stopped for containers that left the match list. When firstStart is true,
// readers are (re)started for every matched container regardless of the diff
// counts. Always returns nil (the original declared an err that was never
// assigned).
func (sds *ServiceDockerStdout) FlushAll(c pipeline.Collector, firstStart bool) error {
	newUpdateTime := helper.GetContainersLastUpdateTime()
	// Fast path: the container center has not changed since the last sync.
	if sds.lastUpdateTime != 0 && sds.lastUpdateTime >= newUpdateTime {
		return nil
	}
	newCount, delCount, addResultList, deleteResultList := helper.GetContainerByAcceptedInfoV2(
		sds.fullList, sds.matchList,
		sds.IncludeLabel, sds.ExcludeLabel,
		sds.IncludeLabelRegex, sds.ExcludeLabelRegex,
		sds.IncludeEnv, sds.ExcludeEnv,
		sds.IncludeEnvRegex, sds.ExcludeEnvRegex,
		sds.K8sFilter)
	sds.lastUpdateTime = newUpdateTime
	if sds.CollectContainersFlag {
		// record config result
		keys := make([]string, 0, len(sds.matchList))
		for k := range sds.matchList {
			if len(k) > 0 {
				keys = append(keys, helper.GetShortID(k))
			}
		}
		configResult := &helper.ContainerConfigResult{
			DataType:                   "container_config_result",
			Project:                    sds.context.GetProject(),
			Logstore:                   sds.context.GetLogstore(),
			ConfigName:                 sds.context.GetConfigName(),
			PathExistInputContainerIDs: helper.GetStringFromList(keys),
			SourceAddress:              "stdout",
			InputType:                  input.ServiceDockerStdoutPluginName,
			FlusherType:                "flusher_sls",
			FlusherTargetAddress:       fmt.Sprintf("%s/%s", sds.context.GetProject(), sds.context.GetLogstore()),
		}
		helper.RecordContainerConfigResultMap(configResult)
		if newCount != 0 || delCount != 0 || firstStart {
			helper.RecordContainerConfigResultIncrement(configResult)
		}
		logger.Debugf(sds.context.GetRuntimeContext(), "update match list, addResultList: %v, deleteResultList: %v", addResultList, deleteResultList)
	}
	// Nothing to reconcile unless something changed or this is the first run.
	if !firstStart && newCount == 0 && delCount == 0 {
		logger.Debugf(sds.context.GetRuntimeContext(), "update match list, firstStart: %v, new: %v, delete: %v",
			firstStart, newCount, delCount)
		return nil
	}
	logger.Infof(sds.context.GetRuntimeContext(), "update match list, firstStart: %v, new: %v, delete: %v",
		firstStart, newCount, delCount)
	dockerInfos := sds.matchList
	logger.Debug(sds.context.GetRuntimeContext(), "match list length", len(dockerInfos))
	sds.avgInstanceMetric.Add(int64(len(dockerInfos)))
	// Start readers for matched containers we are not yet tailing.
	for id, info := range dockerInfos {
		if !logDriverSupported(info.ContainerInfo) {
			continue
		}
		if _, ok := sds.synerMap[id]; !ok || firstStart {
			syner := NewDockerFileSyner(sds, info, sds.checkpointMap)
			logger.Info(sds.context.GetRuntimeContext(), "docker stdout", "added", "source host path", info.ContainerInfo.LogPath,
				"id", info.IDPrefix(), "name", info.ContainerInfo.Name, "created", info.ContainerInfo.Created, "status", info.Status())
			sds.addMetric.Add(1)
			sds.synerMap[id] = syner
			syner.dockerFileReader.Start()
		}
	}
	// Stop readers whose containers dropped out of the match list.
	for id, syner := range sds.synerMap {
		if _, ok := dockerInfos[id]; !ok {
			logger.Info(sds.context.GetRuntimeContext(), "docker stdout", "deleted", "id", helper.GetShortID(id), "name", syner.info.ContainerInfo.Name)
			syner.dockerFileReader.Stop()
			delete(sds.synerMap, id)
			sds.deleteMetric.Add(1)
		}
	}
	return nil
}
// SaveCheckPoint refreshes the in-memory checkpoint map from every active
// reader and persists it. Unless force is set, persistence is skipped when no
// reader reported a changed checkpoint.
func (sds *ServiceDockerStdout) SaveCheckPoint(force bool) error {
	dirty := false
	for id, syner := range sds.synerMap {
		cp, changed := syner.dockerFileReader.GetCheckpoint()
		dirty = dirty || changed
		sds.checkpointMap[id] = cp
	}
	if !force && !dirty {
		logger.Debug(sds.context.GetRuntimeContext(), "no need to save checkpoint, checkpoint size", len(sds.checkpointMap))
		return nil
	}
	logger.Debug(sds.context.GetRuntimeContext(), "save checkpoint, checkpoint size", len(sds.checkpointMap))
	return sds.context.SaveCheckPointObject(serviceDockerStdoutKey, sds.checkpointMap)
}
// LoadCheckPoint restores previously persisted reader checkpoints into
// sds.checkpointMap. Calling it again after the map exists is a no-op.
func (sds *ServiceDockerStdout) LoadCheckPoint() {
	if sds.checkpointMap != nil {
		return
	}
	sds.checkpointMap = map[string]helper.LogFileReaderCheckPoint{}
	sds.context.GetCheckPointObject(serviceDockerStdoutKey, &sds.checkpointMap)
}
// ClearUselessCheckpoint drops checkpoints for containers that no longer
// have an active reader, keeping the persisted map from growing forever.
func (sds *ServiceDockerStdout) ClearUselessCheckpoint() {
	if sds.checkpointMap == nil {
		return
	}
	for id := range sds.checkpointMap {
		_, active := sds.synerMap[id]
		if active {
			continue
		}
		logger.Info(sds.context.GetRuntimeContext(), "delete checkpoint, id", id)
		delete(sds.checkpointMap, id)
	}
}
// Start starts the ServiceInput's service, whatever that may be.
// It loads checkpoints, performs an initial container sync, then loops:
// every FlushIntervalMs it re-syncs the container list, and every
// SaveCheckPointSec it saves checkpoints and prunes stale ones. The loop
// exits when Stop closes the shutdown channel.
func (sds *ServiceDockerStdout) Start(c pipeline.Collector) error {
	sds.collector = c
	sds.shutdown = make(chan struct{})
	sds.waitGroup.Add(1)
	defer sds.waitGroup.Done()
	sds.LoadCheckPoint()
	lastSaveCheckPointTime := time.Now()
	_ = sds.FlushAll(c, true)
	interval := time.Duration(sds.FlushIntervalMs) * time.Millisecond
	// Reuse one timer across iterations instead of allocating a new one per
	// loop (the original leaked an unstopped timer on shutdown as well).
	timer := time.NewTimer(interval)
	defer timer.Stop()
	for {
		select {
		case <-sds.shutdown:
			logger.Info(sds.context.GetRuntimeContext(), "docker stdout main runtime stop", "begin")
			for _, syner := range sds.synerMap {
				syner.dockerFileReader.Stop()
			}
			logger.Info(sds.context.GetRuntimeContext(), "docker stdout main runtime stop", "success")
			return nil
		case <-timer.C:
			if nowTime := time.Now(); nowTime.Sub(lastSaveCheckPointTime) > time.Second*time.Duration(sds.SaveCheckPointSec) {
				_ = sds.SaveCheckPoint(false)
				lastSaveCheckPointTime = nowTime
				sds.ClearUselessCheckpoint()
			}
			_ = sds.FlushAll(c, false)
			// Safe to Reset directly: the timer has fired and its channel
			// has been drained by this case.
			timer.Reset(interval)
		}
	}
}
// Stop stops the services and closes any necessary channels and connections.
// It signals the Start loop to exit, waits for it to finish, then persists
// all checkpoints one final time.
func (sds *ServiceDockerStdout) Stop() error {
	close(sds.shutdown)
	sds.waitGroup.Wait()
	_ = sds.SaveCheckPoint(true) // force save checkpoint
	return nil
}
// init registers the plugin factory under its service-input name with the
// documented default configuration values.
func init() {
	pipeline.ServiceInputs[input.ServiceDockerStdoutPluginName] = func() pipeline.ServiceInput {
		return &ServiceDockerStdout{
			FlushIntervalMs:      3000,       // container discovery interval (ms)
			SaveCheckPointSec:    60,         // checkpoint save interval (s)
			ReadIntervalMs:       1000,       // log read interval (ms)
			Stdout:               true,       // collect stdout by default
			Stderr:               true,       // collect stderr by default
			BeginLineTimeoutMs:   3000,       // multi-line begin match timeout (ms)
			LogtailInDocker:      true,       // assume agent runs inside a container
			CloseUnChangedSec:    60,         // close idle files after 60s
			BeginLineCheckLength: 10 * 1024,  // prefix length checked for begin-line match
			MaxLogSize:           512 * 1024, // max single log size (512K)
			StartLogMaxOffset:    128 * 1024, // max history replayed on first read (128K)
		}
	}
}