/
server.go
155 lines (132 loc) · 4.54 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
/*
Package api implements the agent IPC api. Using HTTP
calls, it's possible to communicate with the agent,
sending commands and receiving infos.
*/
package api
import (
"context"
"crypto/tls"
"fmt"
stdLog "log"
"net"
"net/http"
"strings"
"time"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"github.com/DataDog/datadog-agent/cmd/agent/api/agent"
"github.com/DataDog/datadog-agent/cmd/agent/api/check"
"github.com/DataDog/datadog-agent/pkg/api/util"
"github.com/DataDog/datadog-agent/pkg/config"
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo"
gorilla "github.com/gorilla/mux"
)
var (
listener net.Listener
)
// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
// connections or otherHandler otherwise. Copied from cockroachdb.
func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// This is a partial recreation of gRPC's internal checks https://github.com/grpc/grpc-go/pull/514/files#diff-95e9a25b738459a2d3030e1e6fa2a718R61
if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
grpcServer.ServeHTTP(w, r)
} else {
deadline := time.Now().Add(config.Datadog.GetDuration("server_timeout") * time.Second)
conn := agent.GetConnection(r)
_ = conn.SetWriteDeadline(deadline)
otherHandler.ServeHTTP(w, r)
}
})
}
// StartServer creates the router and starts the HTTP server
func StartServer() error {
initializeTLS()
// get the transport we're going to use under HTTP
var err error
listener, err = getListener()
if err != nil {
// we use the listener to handle commands for the Agent, there's
// no way we can recover from this error
return fmt.Errorf("Unable to create the api server: %v", err)
}
err = util.CreateAndSetAuthToken()
if err != nil {
return err
}
// gRPC server
mux := http.NewServeMux()
opts := []grpc.ServerOption{
grpc.Creds(credentials.NewClientTLSFromCert(tlsCertPool, tlsAddr)),
grpc.StreamInterceptor(grpc_auth.StreamServerInterceptor(grpcAuth)),
grpc.UnaryInterceptor(grpc_auth.UnaryServerInterceptor(grpcAuth)),
}
s := grpc.NewServer(opts...)
pb.RegisterAgentServer(s, &server{})
pb.RegisterAgentSecureServer(s, &serverSecure{})
dcreds := credentials.NewTLS(&tls.Config{
ServerName: tlsAddr,
RootCAs: tlsCertPool,
})
dopts := []grpc.DialOption{grpc.WithTransportCredentials(dcreds)}
// starting grpc gateway
ctx := context.Background()
gwmux := runtime.NewServeMux()
err = pb.RegisterAgentHandlerFromEndpoint(
ctx, gwmux, tlsAddr, dopts)
if err != nil {
panic(err)
}
err = pb.RegisterAgentSecureHandlerFromEndpoint(
ctx, gwmux, tlsAddr, dopts)
if err != nil {
panic(err)
}
// Setup multiplexer
// create the REST HTTP router
agentMux := gorilla.NewRouter()
checkMux := gorilla.NewRouter()
// Validate token for every request
agentMux.Use(validateToken)
checkMux.Use(validateToken)
mux.Handle("/agent/", http.StripPrefix("/agent", agent.SetupHandlers(agentMux)))
mux.Handle("/check/", http.StripPrefix("/check", check.SetupHandlers(checkMux)))
mux.Handle("/", gwmux)
srv := &http.Server{
Addr: tlsAddr,
Handler: grpcHandlerFunc(s, mux),
// Handler: grpcHandlerFunc(s, r),
TLSConfig: &tls.Config{
Certificates: []tls.Certificate{*tlsKeyPair},
NextProtos: []string{"h2"},
},
ErrorLog: stdLog.New(&config.ErrorLogWriter{
AdditionalDepth: 5, // Use a stack depth of 5 on top of the default one to get a relevant filename in the stdlib
}, "Error from the agent http API server: ", 0), // log errors to seelog,
ConnContext: func(ctx context.Context, c net.Conn) context.Context {
// Store the connection in the context so requests can reference it if needed
return context.WithValue(ctx, agent.ConnContextKey, c)
},
}
tlsListener := tls.NewListener(listener, srv.TLSConfig)
go srv.Serve(tlsListener) //nolint:errcheck
return nil
}
// StopServer closes the connection and the server
// stops listening to new commands.
func StopServer() {
if listener != nil {
listener.Close()
}
}
// ServerAddress retruns the server address.
func ServerAddress() *net.TCPAddr {
return listener.Addr().(*net.TCPAddr)
}