forked from mantzas/patron
-
Notifications
You must be signed in to change notification settings - Fork 66
/
component.go
99 lines (83 loc) · 2.07 KB
/
component.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// Package grpc provides a gRPC component with included observability.
package grpc
import (
"context"
"fmt"
"net"
"github.com/beatlabs/patron/errors"
"github.com/beatlabs/patron/log"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
// Component of a gRPC service.
// It couples a gRPC server with the TCP port it is served on.
type Component struct {
	// port is the TCP port Run listens on.
	port int
	// srv is the gRPC server that Run serves and Server exposes.
	srv *grpc.Server
}
// Server exposes the underlying gRPC server held by the component.
func (c *Component) Server() *grpc.Server {
	return c.srv
}
// Run starts the gRPC server on the component's port and blocks until it stops.
//
// It listens on TCP ":<port>" and serves on that listener. When ctx is
// cancelled while the server is running, the server is stopped gracefully.
// It returns a wrapped error if the listener cannot be created, otherwise
// whatever Serve returns.
func (c *Component) Run(ctx context.Context) error {
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", c.port))
	if err != nil {
		return fmt.Errorf("failed to listen: %w", err)
	}

	// done is closed when Run returns so the shutdown watcher below never
	// outlives this call, even if ctx is never cancelled.
	done := make(chan struct{})
	defer close(done)

	go func() {
		select {
		case <-ctx.Done():
			c.srv.GracefulStop()
		case <-done:
			// Serve has already returned; nothing left for this watcher to stop.
		}
	}()

	log.Debugf("gRPC component listening on port %d", c.port)
	return c.srv.Serve(lis)
}
// Builder pattern for our gRPC service.
// Configuration is collected through New and the With* methods and turned
// into a Component by Create.
type Builder struct {
	// port is the validated port handed to the created Component.
	port int
	// serverOptions are the gRPC server options collected via WithOptions.
	serverOptions []grpc.ServerOption
	// enableReflection records whether WithReflection was called.
	enableReflection bool
	// errors collects validation failures; Create reports them in aggregate.
	errors []error
}
// New returns a builder for a gRPC component listening on the given port.
// A port outside the range 1-65535 is recorded as a builder error and the
// builder's port is left unset.
func New(port int) *Builder {
	if port > 0 && port <= 65535 {
		return &Builder{port: port}
	}
	return &Builder{
		errors: []error{fmt.Errorf("port is invalid: %d", port)},
	}
}
// WithOptions appends the given gRPC server options to the builder.
// The call is a no-op when the builder already holds errors.
func (b *Builder) WithOptions(oo ...grpc.ServerOption) *Builder {
	if len(b.errors) == 0 {
		b.serverOptions = append(b.serverOptions, oo...)
	}
	return b
}
// WithReflection opts in to gRPC reflection on the created server.
// Reflection could be considered a security risk if services are exposed to public internet.
// The call is a no-op when the builder already holds errors.
func (b *Builder) WithReflection() *Builder {
	if len(b.errors) == 0 {
		b.enableReflection = true
	}
	return b
}
// Create builds the gRPC component from the builder's configuration.
//
// If any error was recorded while configuring the builder, it returns nil and
// the aggregated errors. Otherwise it creates a gRPC server with the supplied
// options plus the observability unary and stream interceptors, registers
// reflection when it was enabled, and returns the component.
func (b *Builder) Create() (*Component, error) {
	if len(b.errors) != 0 {
		return nil, errors.Aggregate(b.errors...)
	}

	// Assemble the options in a fresh slice instead of appending to
	// b.serverOptions: gRPC panics when a unary or stream interceptor option is
	// set twice, which is what a second Create call on the same builder would
	// otherwise do.
	opts := make([]grpc.ServerOption, 0, len(b.serverOptions)+2)
	opts = append(opts, b.serverOptions...)
	opts = append(opts,
		grpc.UnaryInterceptor(observableUnaryInterceptor),
		grpc.StreamInterceptor(observableStreamInterceptor),
	)

	srv := grpc.NewServer(opts...)
	if b.enableReflection {
		reflection.Register(srv)
	}

	return &Component{
		port: b.port,
		srv:  srv,
	}, nil
}