-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
initcontainer.go
146 lines (128 loc) · 3.99 KB
/
initcontainer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
//go:build !windows
//nolint:revive // TODO(SERV) Fix revive linter
package initcontainer
import (
"context"
"io"
"os"
"os/exec"
"os/signal"
"strings"
"sync"
"syscall"
"time"
"github.com/DataDog/datadog-agent/cmd/serverless-init/metric"
"go.uber.org/atomic"
"github.com/DataDog/datadog-agent/cmd/serverless-init/cloudservice"
serverlessLog "github.com/DataDog/datadog-agent/cmd/serverless-init/log"
logsAgent "github.com/DataDog/datadog-agent/comp/logs/agent"
"github.com/DataDog/datadog-agent/pkg/serverless"
"github.com/DataDog/datadog-agent/pkg/serverless/metrics"
"github.com/DataDog/datadog-agent/pkg/serverless/trace"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/spf13/afero"
)
// Run is the entrypoint of the init process. It spawns the customer
// process from args, emits a shutdown metric when it exits, and then
// flushes the metric, trace, and logs agents before returning.
func Run(
	cloudService cloudservice.CloudService,
	logConfig *serverlessLog.Config,
	metricAgent *metrics.ServerlessMetricAgent,
	traceAgent *trace.ServerlessTraceAgent,
	logsAgent logsAgent.ServerlessLogsAgent,
	args []string,
) {
	log.Debugf("Launching subprocess %v\n", args)
	if err := execute(logConfig, args); err != nil {
		log.Debugf("Error exiting: %v\n", err)
	}
	metric.AddShutdownMetric(cloudService.GetPrefix(), metricAgent.GetExtraTags(), time.Now(), metricAgent.Demux)
	flush(logConfig.FlushTimeout, metricAgent, traceAgent, logsAgent)
}
// execute builds and starts the customer command, mirrors its
// stdout/stderr to the parent's (and, when log collection is enabled,
// into the log channel as well), forwards incoming signals to the
// child, and waits for it to exit. Returns the error from starting or
// waiting on the child, if any.
func execute(logConfig *serverlessLog.Config, args []string) error {
	commandName, commandArgs := buildCommandParam(args)

	// Add our tracer settings
	fs := afero.NewOsFs()
	AutoInstrumentTracer(fs)

	cmd := exec.Command(commandName, commandArgs...)

	cmd.Stdout = io.Writer(os.Stdout)
	cmd.Stderr = io.Writer(os.Stderr)
	if logConfig.IsEnabled {
		cmd.Stdout = io.MultiWriter(os.Stdout, serverlessLog.NewChannelWriter(logConfig.Channel, false))
		cmd.Stderr = io.MultiWriter(os.Stderr, serverlessLog.NewChannelWriter(logConfig.Channel, true))
	}

	// Register for signals BEFORE starting the child: previously Notify
	// ran after Start, so a signal arriving during child startup took the
	// default disposition instead of being forwarded.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs)

	if err := cmd.Start(); err != nil {
		// Restore default signal handling; nothing to forward to.
		signal.Stop(sigs)
		return err
	}
	go forwardSignals(cmd.Process, sigs)
	return cmd.Wait()
}
// buildCommandParam splits the launch arguments into a command name and
// its argument list. A single argument is treated as a full command line
// and split on whitespace; multiple arguments are used as-is.
// Returns empty values when no command can be derived (empty input or an
// all-whitespace single argument) instead of panicking on fields[0].
func buildCommandParam(cmdArg []string) (string, []string) {
	fields := cmdArg
	if len(cmdArg) == 1 {
		fields = strings.Fields(cmdArg[0])
	}
	if len(fields) == 0 {
		// No runnable command; let the caller surface the exec error.
		return "", []string{}
	}
	return fields[0], fields[1:]
}
func forwardSignals(process *os.Process, sigs chan os.Signal) {
for sig := range sigs {
if sig != syscall.SIGCHLD {
if process != nil {
_ = syscall.Kill(process.Pid, sig.(syscall.Signal))
}
}
}
}
// flush concurrently flushes the metric, trace, and logs agents, each
// bounded by flushTimeout, and blocks until all three attempts finish.
// It reports whether the metric or trace flush hit its timeout.
func flush(flushTimeout time.Duration, metricAgent serverless.FlushableAgent, traceAgent serverless.FlushableAgent, logsAgent logsAgent.ServerlessLogsAgent) bool {
	timeouts := atomic.NewInt32(0)
	var wg sync.WaitGroup
	wg.Add(3)

	go flushAndWait(flushTimeout, &wg, metricAgent, timeouts)
	go flushAndWait(flushTimeout, &wg, traceAgent, timeouts)

	logsCtx, cancel := context.WithTimeout(context.Background(), flushTimeout)
	defer cancel()
	go func() {
		defer wg.Done()
		if logsAgent != nil {
			logsAgent.Flush(logsCtx)
		}
	}()

	wg.Wait()
	return timeouts.Load() > 0
}
// flushWithContext runs flushFunction to completion, then signals success
// on timeoutchan — unless ctx expires while that send is pending, in
// which case the timeout is logged and the signal is dropped.
func flushWithContext(ctx context.Context, timeoutchan chan struct{}, flushFunction func()) {
	flushFunction()
	select {
	case timeoutchan <- struct{}{}:
		log.Debug("finished flushing")
	case <-ctx.Done():
		log.Error("timed out while flushing")
	}
}
// flushAndWait flushes a single agent, incrementing hasTimeout if the
// flush does not complete within flushTimeout, and releases the wait
// group when done. The flush goroutine itself observes the same context,
// so a timed-out flush does not block it forever.
func flushAndWait(flushTimeout time.Duration, wg *sync.WaitGroup, agent serverless.FlushableAgent, hasTimeout *atomic.Int32) {
	defer wg.Done()

	ctx, cancel := context.WithTimeout(context.Background(), flushTimeout)
	defer cancel()

	done := make(chan struct{}, 1)
	go flushWithContext(ctx, done, agent.Flush)

	select {
	case <-ctx.Done():
		hasTimeout.Inc()
	case <-done:
	}
}