This repository has been archived by the owner on Sep 21, 2023. It is now read-only.
/
inputhandler.go
126 lines (107 loc) · 3.52 KB
/
inputhandler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package grpcserver
import (
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"github.com/docker/go-units"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"github.com/elastic/elastic-agent-libs/logp"
pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto"
"github.com/elastic/elastic-agent-shipper/tools"
)
// InputHandler wraps the input side of the shipper, and is responsible for starting and stopping the gRPC endpoint
type InputHandler struct {
Shipper ShipperServer
publisher Publisher
log *logp.Logger
server *grpc.Server
startMutex sync.Mutex
}
// NewGRPCServer returns a new gRPC handler with a reference to the output publisher
func NewGRPCServer(publisher Publisher) *InputHandler {
log := logp.NewLogger("input-handler")
srv := &InputHandler{log: log, publisher: publisher}
return srv
}
// Start runs the shipper server according to the set configuration. This is a non-blocking call.
func (srv *InputHandler) Start(grpcTLS credentials.TransportCredentials, endpoint string) error {
// TODO: only needed while we deal with the fact that we have our config coming from multiple input units
if srv.server != nil {
srv.log.Debugf("shipper gRPC already started, continuing...")
return nil
}
listenAddr := strings.TrimPrefix(endpoint, "unix://")
var err error
srv.log.Debugf("initializing the gRPC server...")
opts := []grpc.ServerOption{
grpc.Creds(grpcTLS),
grpc.MaxRecvMsgSize(64 * units.MiB),
}
srv.server = grpc.NewServer(opts...)
srv.Shipper, err = NewShipperServer(srv.publisher)
if err != nil {
return fmt.Errorf("could not initialize gRPC: %w", err)
}
pb.RegisterProducerServer(srv.server, srv.Shipper)
srv.startMutex.Lock()
// paranoid checking, make sure we have the base directory.
dir := filepath.Dir(listenAddr)
if _, err := os.Stat(dir); os.IsNotExist(err) {
err = os.MkdirAll(dir, 0o755)
if err != nil {
srv.startMutex.Unlock()
return fmt.Errorf("could not create directory for unix socket %s: %w", dir, err)
}
}
lis, err := newListener(srv.log, listenAddr)
if err != nil {
srv.startMutex.Unlock()
return fmt.Errorf("failed to listen on %s: %w", listenAddr, err)
}
go func() {
err = srv.server.Serve(lis)
if err != nil {
srv.log.Errorf("gRPC server shut down with error: %s", err)
}
}()
srv.log.Debugf("gRPC started")
// Testing that the server is running and only then unlock the mutex.
// Otherwise if `Close` is called at the same time as `Start` it causes race condition.
defer srv.startMutex.Unlock()
con, err := tools.DialTestAddr(listenAddr)
if err != nil {
// this will stop the other go routine in the wait group
srv.server.Stop()
return fmt.Errorf("failed to test connection with the gRPC server on %s: %w", listenAddr, err)
}
_ = con.Close()
return nil
}
// InitHasFailed reports an error elsewhere to the gRPC server
func (srv *InputHandler) InitHasFailed(e string) {
if srv.Shipper != nil {
srv.Shipper.SetInitError(e)
}
}
// Stop stops the shipper gRPC endpoint
func (srv *InputHandler) Stop() {
srv.startMutex.Lock()
defer srv.startMutex.Unlock()
if srv.Shipper != nil {
err := srv.Shipper.Close()
if err != nil {
srv.log.Debugf("Error stopping shipper input: %s", err)
}
srv.Shipper = nil
}
if srv.server != nil {
srv.server.GracefulStop()
srv.server = nil
}
}