-
Notifications
You must be signed in to change notification settings - Fork 686
/
server.go
131 lines (109 loc) · 3.66 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Package test contains test utilities
package test
import (
"context"
"fmt"
"log"
"net"
"net/http"
serverv2 "github.com/datawire/ambassador/pkg/envoy-control-plane/server/v2"
serverv3 "github.com/datawire/ambassador/pkg/envoy-control-plane/server/v3"
testv2 "github.com/datawire/ambassador/pkg/envoy-control-plane/test/v2"
testv3 "github.com/datawire/ambassador/pkg/envoy-control-plane/test/v3"
"google.golang.org/grpc"
gcplogger "github.com/datawire/ambassador/pkg/envoy-control-plane/log"
)
const (
// Hello is the echo message
Hello = "Hi, there!\n"
grpcMaxConcurrentStreams = 1000000
)
type echo struct{}
// HTTPGateway is a custom implementation of [gRPC gateway](https://github.com/grpc-ecosystem/grpc-gateway)
// specialized to Envoy xDS API.
type HTTPGateway struct {
// Log is an optional log for errors in response write
Log gcplogger.Logger
GatewayV2 serverv2.HTTPGateway
GatewayV3 serverv3.HTTPGateway
}
// RunAccessLogServer starts an accesslog server.
func RunAccessLogServer(ctx context.Context, alsv2 *testv2.AccessLogService, alsv3 *testv3.AccessLogService, alsPort uint) {
grpcServer := grpc.NewServer()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", alsPort))
if err != nil {
log.Fatal(err)
}
testv2.RegisterAccessLogServer(grpcServer, alsv2)
log.Printf("access log server listening on %d\n", alsPort)
go func() {
if err = grpcServer.Serve(lis); err != nil {
log.Println(err)
}
}()
<-ctx.Done()
grpcServer.GracefulStop()
}
// RunManagementServer starts an xDS server at the given port.
func RunManagementServer(ctx context.Context, srv2 serverv2.Server, srv3 serverv3.Server, port uint) {
// gRPC golang library sets a very small upper bound for the number gRPC/h2
// streams over a single TCP connection. If a proxy multiplexes requests over
// a single connection to the management server, then it might lead to
// availability problems.
var grpcOptions []grpc.ServerOption
grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams))
grpcServer := grpc.NewServer(grpcOptions...)
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Fatal(err)
}
testv2.RegisterServer(grpcServer, srv2)
testv3.RegisterServer(grpcServer, srv3)
log.Printf("management server listening on %d\n", port)
go func() {
if err = grpcServer.Serve(lis); err != nil {
log.Println(err)
}
}()
<-ctx.Done()
grpcServer.GracefulStop()
}
// RunManagementGateway starts an HTTP gateway to an xDS server.
func RunManagementGateway(ctx context.Context, srv2 serverv2.Server, srv3 serverv3.Server, port uint, lg gcplogger.Logger) {
log.Printf("gateway listening HTTP/1.1 on %d\n", port)
server := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: &HTTPGateway{
GatewayV2: serverv2.HTTPGateway{Log: lg, Server: srv2},
GatewayV3: serverv3.HTTPGateway{Log: lg, Server: srv3},
Log: lg,
},
}
go func() {
if err := server.ListenAndServe(); err != nil {
log.Println(err)
}
}()
}
func (h *HTTPGateway) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
err := h.GatewayV2.ServeHTTP(resp, req)
if err != nil {
h.GatewayV3.ServeHTTP(resp, req)
}
}
func (h echo) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/text")
if _, err := w.Write([]byte(Hello)); err != nil {
log.Println(err)
}
}
// RunHTTP opens a simple listener on the port.
func RunHTTP(ctx context.Context, upstreamPort uint) {
log.Printf("upstream listening HTTP/1.1 on %d\n", upstreamPort)
server := &http.Server{Addr: fmt.Sprintf(":%d", upstreamPort), Handler: echo{}}
go func() {
if err := server.ListenAndServe(); err != nil {
log.Println(err)
}
}()
}