/
grpc.go
167 lines (140 loc) · 6.19 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package apiimpl
import (
"context"
"fmt"
"time"
"github.com/DataDog/datadog-agent/comp/remote-config/rcservice"
"github.com/DataDog/datadog-agent/comp/remote-config/rcservicemrf"
"github.com/DataDog/datadog-agent/pkg/util/optional"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
workloadmetaServer "github.com/DataDog/datadog-agent/comp/core/workloadmeta/server"
"github.com/DataDog/datadog-agent/comp/core/tagger"
"github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl/replay"
taggerserver "github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl/server"
"github.com/DataDog/datadog-agent/comp/dogstatsd/pidmap"
dsdReplay "github.com/DataDog/datadog-agent/comp/dogstatsd/replay"
dogstatsdServer "github.com/DataDog/datadog-agent/comp/dogstatsd/server"
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core"
"github.com/DataDog/datadog-agent/pkg/util/grpc"
"github.com/DataDog/datadog-agent/pkg/util/hostname"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
// server is the receiver for the GetHostname handler and for
// AuthFuncOverride defined in this file. It embeds
// pb.UnimplementedAgentServer (presumably to supply default implementations
// for Agent RPCs not defined here — confirm against the generated code).
type server struct {
pb.UnimplementedAgentServer
}
// serverSecure is the receiver for the AgentSecure handlers in this file
// (tagger, dogstatsd capture, remote configuration and workloadmeta
// endpoints). It embeds pb.UnimplementedAgentSecureServer and holds the
// components those handlers delegate to.
type serverSecure struct {
pb.UnimplementedAgentSecureServer
// taggerServer handles TaggerStreamEntities and TaggerFetchEntity.
taggerServer *taggerserver.Server
// workloadmetaServer handles WorkloadmetaStreamEntities.
workloadmetaServer *workloadmetaServer.Server
// configService is the optional remote configuration service used by
// ClientGetConfigs and GetConfigState.
configService optional.Option[rcservice.Component]
// configServiceMRF is the optional MRF remote configuration service used by
// ClientGetConfigsHA and GetConfigStateHA.
configServiceMRF optional.Option[rcservicemrf.Component]
// dogstatsdServer is not referenced by the handlers visible in this file.
dogstatsdServer dogstatsdServer.Component
// capture starts dogstatsd traffic captures for DogstatsdCaptureTrigger.
capture dsdReplay.Component
// pidMap receives the PID map set or cleared by DogstatsdSetTaggerState.
pidMap pidmap.Component
}
// GetHostname replies with the hostname reported by hostname.Get for ctx.
// If the lookup fails, the error is returned together with an empty reply.
func (s *server) GetHostname(ctx context.Context, _ *pb.HostnameRequest) (*pb.HostnameReply, error) {
	reply := &pb.HostnameReply{}

	name, err := hostname.Get(ctx)
	if err == nil {
		reply.Hostname = name
	}
	return reply, err
}
// AuthFuncOverride implements the `grpc_auth.ServiceAuthFuncOverride`
// interface, which lets a service replace the AuthFunc registered with the
// unary interceptor.
//
// This implementation returns ctx unchanged with a nil error, so it performs
// no authentication check of its own. The second parameter is ignored.
//
// see: https://godoc.org/github.com/grpc-ecosystem/go-grpc-middleware/auth#ServiceAuthFuncOverride
func (s *server) AuthFuncOverride(ctx context.Context, _ string) (context.Context, error) {
return ctx, nil
}
// TaggerStreamEntities hands the streaming request and its server stream to
// the tagger server held by s and returns the error that call produces.
func (s *serverSecure) TaggerStreamEntities(req *pb.StreamTagsRequest, srv pb.AgentSecure_TaggerStreamEntitiesServer) error {
	delegate := s.taggerServer
	return delegate.TaggerStreamEntities(req, srv)
}
// TaggerFetchEntity forwards the fetch request to the tagger server held by s
// and returns its response and error unchanged.
func (s *serverSecure) TaggerFetchEntity(ctx context.Context, req *pb.FetchEntityRequest) (*pb.FetchEntityResponse, error) {
	resp, err := s.taggerServer.TaggerFetchEntity(ctx, req)
	return resp, err
}
// DogstatsdCaptureTrigger starts a dogstatsd traffic capture using the
// duration, path and compression flag carried by the request.
//
// The duration string is parsed with time.ParseDuration. A parse failure, or
// any error reported by StartCapture (per the original note, e.g. a capture
// already being in progress), is returned along with an empty response. On
// success the response carries the path returned by StartCapture.
func (s *serverSecure) DogstatsdCaptureTrigger(_ context.Context, req *pb.CaptureTriggerRequest) (*pb.CaptureTriggerResponse, error) {
	resp := &pb.CaptureTriggerResponse{}

	duration, err := time.ParseDuration(req.GetDuration())
	if err != nil {
		return resp, err
	}

	capturePath, err := s.capture.StartCapture(req.GetPath(), duration, req.GetCompressed())
	if err != nil {
		return resp, err
	}

	resp.Path = capturePath
	return resp, nil
}
// DogstatsdSetTaggerState installs a captured tagger state, as used while a
// traffic replay is in progress.
//
// A nil request or a request without state resets the capture tagger, clears
// the PID map and reports Loaded: false with no error. Otherwise a replay
// tagger is created, loaded with the request state and installed as the
// capture tagger, the PID map from the request is applied, and Loaded: true
// is reported. If the replay tagger cannot be created, an error is returned
// with Loaded: false.
func (s *serverSecure) DogstatsdSetTaggerState(_ context.Context, req *pb.TaggerState) (*pb.TaggerStateResponse, error) {
	// Nothing to load: drop whatever capture state was installed before.
	if hasState := req != nil && req.State != nil; !hasState {
		log.Debugf("API: empty request or state")
		tagger.ResetCaptureTagger()
		s.pidMap.SetPidMap(nil)
		return &pb.TaggerStateResponse{Loaded: false}, nil
	}

	// FIXME: we should perhaps lock the capture processing while doing this...
	replayTagger := replay.NewTagger()
	if replayTagger == nil {
		return &pb.TaggerStateResponse{Loaded: false}, fmt.Errorf("unable to instantiate state")
	}
	replayTagger.LoadState(req.State)

	log.Debugf("API: setting capture state tagger")
	tagger.SetNewCaptureTagger(replayTagger)
	s.pidMap.SetPidMap(req.PidMap)

	log.Debugf("API: loaded state successfully")
	return &pb.TaggerStateResponse{Loaded: true}, nil
}
// Errors returned by the remote configuration handlers when the corresponding
// service is unavailable. Both carry the gRPC Unimplemented status code.
var (
	rcNotInitializedErr    = status.Error(codes.Unimplemented, "remote configuration service not initialized")
	mrfRcNotInitializedErr = status.Error(codes.Unimplemented, "MRF remote configuration service not initialized")
)
// ClientGetConfigs forwards the request to the remote configuration service.
// When that service is unset or nil, rcNotInitializedErr is logged at debug
// level and returned with a nil response.
func (s *serverSecure) ClientGetConfigs(ctx context.Context, in *pb.ClientGetConfigsRequest) (*pb.ClientGetConfigsResponse, error) {
	if svc, ok := s.configService.Get(); ok && svc != nil {
		return svc.ClientGetConfigs(ctx, in)
	}

	log.Debug(rcNotInitializedErr.Error())
	return nil, rcNotInitializedErr
}
// GetConfigState returns the result of ConfigGetState on the remote
// configuration service. When that service is unset or nil,
// rcNotInitializedErr is logged at debug level and returned with a nil
// response.
func (s *serverSecure) GetConfigState(_ context.Context, _ *emptypb.Empty) (*pb.GetStateConfigResponse, error) {
	if svc, ok := s.configService.Get(); ok && svc != nil {
		return svc.ConfigGetState()
	}

	log.Debug(rcNotInitializedErr.Error())
	return nil, rcNotInitializedErr
}
// ClientGetConfigsHA forwards the request to the MRF remote configuration
// service. When that service is unset or nil, mrfRcNotInitializedErr is
// logged at debug level and returned with a nil response.
func (s *serverSecure) ClientGetConfigsHA(ctx context.Context, in *pb.ClientGetConfigsRequest) (*pb.ClientGetConfigsResponse, error) {
	if svc, ok := s.configServiceMRF.Get(); ok && svc != nil {
		return svc.ClientGetConfigs(ctx, in)
	}

	log.Debug(mrfRcNotInitializedErr.Error())
	return nil, mrfRcNotInitializedErr
}
// GetConfigStateHA returns the result of ConfigGetState on the MRF remote
// configuration service. When that service is unset or nil,
// mrfRcNotInitializedErr is logged at debug level and returned with a nil
// response.
func (s *serverSecure) GetConfigStateHA(_ context.Context, _ *emptypb.Empty) (*pb.GetStateConfigResponse, error) {
	if svc, ok := s.configServiceMRF.Get(); ok && svc != nil {
		return svc.ConfigGetState()
	}

	log.Debug(mrfRcNotInitializedErr.Error())
	return nil, mrfRcNotInitializedErr
}
// WorkloadmetaStreamEntities hands the stream request (which carries the
// filter to apply) and the outgoing stream to the workloadmeta server held by
// s, and returns the error that call produces.
func (s *serverSecure) WorkloadmetaStreamEntities(in *pb.WorkloadmetaStreamRequest, out pb.AgentSecure_WorkloadmetaStreamEntitiesServer) error {
	delegate := s.workloadmetaServer
	return delegate.StreamEntities(in, out)
}
func init() {
grpclog.SetLoggerV2(grpc.NewLogger())
}