forked from envoyproxy/go-control-plane
-
Notifications
You must be signed in to change notification settings - Fork 1
/
server.go
136 lines (115 loc) · 4.08 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// Copyright 2018 Envoyproxy Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package test contains test utilities
package test
import (
"context"
"fmt"
"net"
"net/http"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
accesslog "github.com/envoyproxy/go-control-plane/envoy/service/accesslog/v2"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
xds "github.com/envoyproxy/go-control-plane/pkg/server"
)
const (
// Hello is the echo message
Hello = "Hi, there!\n"
)
// Hasher returns node ID as an ID
type Hasher struct {
}
// ID function
func (h Hasher) ID(node *core.Node) string {
if node == nil {
return "unknown"
}
return node.Id
}
type echo struct{}
func (h echo) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/text")
if _, err := w.Write([]byte(Hello)); err != nil {
log.Error(err)
}
}
// RunHTTP opens a simple listener on the port.
func RunHTTP(ctx context.Context, upstreamPort uint) {
log.WithFields(log.Fields{"port": upstreamPort}).Info("upstream listening HTTP/1.1")
server := &http.Server{Addr: fmt.Sprintf(":%d", upstreamPort), Handler: echo{}}
go func() {
if err := server.ListenAndServe(); err != nil {
log.Error(err)
}
}()
}
// RunAccessLogServer starts an accesslog service.
func RunAccessLogServer(ctx context.Context, als *AccessLogService, port uint) {
grpcServer := grpc.NewServer()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.WithError(err).Fatal("failed to listen")
}
accesslog.RegisterAccessLogServiceServer(grpcServer, als)
log.WithFields(log.Fields{"port": port}).Info("access log server listening")
go func() {
if err = grpcServer.Serve(lis); err != nil {
log.Error(err)
}
}()
<-ctx.Done()
grpcServer.GracefulStop()
}
const grpcMaxConcurrentStreams = 1000000
// RunManagementServer starts an xDS server at the given port.
func RunManagementServer(ctx context.Context, server xds.Server, port uint) {
// gRPC golang library sets a very small upper bound for the number gRPC/h2
// streams over a single TCP connection. If a proxy multiplexes requests over
// a single connection to the management server, then it might lead to
// availability problems.
var grpcOptions []grpc.ServerOption
grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams))
grpcServer := grpc.NewServer(grpcOptions...)
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.WithError(err).Fatal("failed to listen")
}
// register services
discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, server)
v2.RegisterEndpointDiscoveryServiceServer(grpcServer, server)
v2.RegisterClusterDiscoveryServiceServer(grpcServer, server)
v2.RegisterRouteDiscoveryServiceServer(grpcServer, server)
v2.RegisterListenerDiscoveryServiceServer(grpcServer, server)
log.WithFields(log.Fields{"port": port}).Info("management server listening")
go func() {
if err = grpcServer.Serve(lis); err != nil {
log.Error(err)
}
}()
<-ctx.Done()
grpcServer.GracefulStop()
}
// RunManagementGateway starts an HTTP gateway to an xDS server.
func RunManagementGateway(ctx context.Context, srv xds.Server, port uint) {
log.WithFields(log.Fields{"port": port}).Info("gateway listening HTTP/1.1")
server := &http.Server{Addr: fmt.Sprintf(":%d", port), Handler: &xds.HTTPGateway{Server: srv}}
go func() {
if err := server.ListenAndServe(); err != nil {
log.Error(err)
}
}()
}