/
web_server_log_watcher.go
87 lines (70 loc) · 1.89 KB
/
web_server_log_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package transport
import (
"context"
"io"
grpctransport "github.com/go-kit/kit/transport/grpc"
"github.com/marmotedu/errors"
"google.golang.org/grpc"
v1 "github.com/ClessLi/bifrost/api/bifrost/v1"
pbv1 "github.com/ClessLi/bifrost/api/protobuf-spec/bifrostpb/v1"
)
// WebServerLogWatcherTransport is the transport-level entry point for the web
// server log watcher service.
type WebServerLogWatcherTransport interface {
// Watch returns the Client used to invoke the Watch endpoint.
Watch() Client
}
// webServerLogWatcherTransport is the package-private implementation of
// WebServerLogWatcherTransport.
type webServerLogWatcherTransport struct {
// watchClient is the Client handed out by Watch.
watchClient Client
}
// Watch returns the Client stored on the transport for the Watch endpoint.
func (t *webServerLogWatcherTransport) Watch() Client {
	client := t.watchClient

	return client
}
// newWebServerLogWatcherClient builds the Client for the WebServerLogWatcher
// Watch streaming RPC on conn.
//
// requestFunc encodes the caller's request; its result must be a
// *pbv1.LogWatchRequest, otherwise the call fails with an error. responseFunc
// receives a *v1.WebServerLog whose Lines channel carries the streamed data and
// produces the value returned to the caller.
//
// Every call opens a Watch stream and starts one goroutine that forwards each
// value returned by recvWatcherResponse into the Lines channel. The goroutine
// closes the channel when it exits, which happens after the value that marks
// the end of the stream (nil on io.EOF, the error text on any other error) has
// been delivered, or as soon as ctx is done while a send is pending.
func newWebServerLogWatcherClient(
	conn *grpc.ClientConn,
	requestFunc grpctransport.EncodeRequestFunc,
	responseFunc grpctransport.DecodeResponseFunc,
) Client {
	cli := pbv1.NewWebServerLogWatcherClient(conn)

	return newClient(func(ctx context.Context, request interface{}) (response interface{}, err error) {
		encoded, err := requestFunc(ctx, request)
		if err != nil {
			return nil, err
		}

		// Use a checked assertion: a misconfigured encoder must surface as an
		// error to the caller rather than panic inside the transport.
		watchReq, ok := encoded.(*pbv1.LogWatchRequest)
		if !ok {
			return nil, errors.New("encoded request is not a *pbv1.LogWatchRequest")
		}

		stream, err := cli.Watch(ctx, watchReq)
		if err != nil {
			return nil, err
		}

		outputC := make(chan []byte)
		go func() {
			// Only this goroutine sends on outputC, so it is the one to close it.
			defer close(outputC)

			needClose := false
			for !needClose {
				// Receive before entering the select: the blocking Recv is the
				// operand of the send, so it has to complete first anyway.
				msg := recvWatcherResponse(stream, &needClose)

				select {
				case outputC <- msg:
				case <-ctx.Done():
					// Nobody is guaranteed to be reading any more; drop msg.
					return
				}
			}
		}()

		return responseFunc(ctx, &v1.WebServerLog{Lines: outputC})
	})
}
// recvWatcherResponse reads the next message from stream and returns its payload.
//
// On a successful receive it returns resp.GetMsg() and leaves *needClose
// untouched. When the stream reports io.EOF it sets *needClose to true and
// returns nil. On any other error it sets *needClose to true and returns the
// error text as bytes.
func recvWatcherResponse(stream pbv1.WebServerLogWatcher_WatchClient, needClose *bool) []byte {
	resp, recvErr := stream.Recv()

	switch {
	case recvErr == nil:
		return resp.GetMsg()
	case errors.Is(recvErr, io.EOF):
		// Normal end of stream: nothing to forward.
		*needClose = true

		return nil
	default:
		// Any other failure ends the loop; its text is passed on as data.
		*needClose = true

		return []byte(recvErr.Error())
	}
}
// newWebServerLogWatcherTransport wires a WebServerLogWatcherTransport from the
// shared transport: its connection plus the WebServerLogWatcher request encoder
// and response decoder obtained from the transport's factories.
func newWebServerLogWatcherTransport(transport *transport) WebServerLogWatcherTransport {
	encodeRequest := transport.encoderFactory.WebServerLogWatcher().EncodeRequest
	decodeResponse := transport.decoderFactory.WebServerLogWatcher().DecodeResponse

	watcher := &webServerLogWatcherTransport{}
	watcher.watchClient = newWebServerLogWatcherClient(transport.conn, encodeRequest, decodeResponse)

	return watcher
}