-
Notifications
You must be signed in to change notification settings - Fork 394
/
logstream.go
71 lines (68 loc) · 1.6 KB
/
logstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package cloud
import (
"context"
"sync"
"github.com/earthly/cloud-api/logstream"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
// StreamLogs forwards every batch of log deltas read from deltasCh to the
// log stream service, tagging each request with buildID.
//
// The stream is opened with gRPC retries disabled and is driven by two
// goroutines in an errgroup:
//
//   - The sender forwards each batch received on deltasCh. When deltasCh is
//     closed it sends one final request with Eof set and exits. It also exits
//     with ctx.Err() if the group context is cancelled.
//   - The receiver reads responses until one carries an EOF ack, then closes
//     the send side of the stream and exits. An EOF ack that arrives before
//     the sender has sent its Eof request is reported as an error.
//
// The returned error is the first non-nil error produced by either goroutine,
// or nil when the EOF handshake completed and the stream was closed cleanly.
func (c *client) StreamLogs(ctx context.Context, buildID string, deltasCh chan []*logstream.Delta) error {
	streamClient, err := c.logstream.StreamLogs(c.withAuth(ctx), grpc_retry.Disable())
	if err != nil {
		return errors.Wrap(err, "failed to create log stream client")
	}

	eg, ctx := errgroup.WithContext(ctx)

	// mu guards finished. It is also held for the whole EOF Send so that the
	// receiver can neither observe a stale finished flag nor call CloseSend
	// while that Send is still in flight.
	var mu sync.Mutex
	finished := false

	// Receiver: wait for the server to acknowledge our EOF.
	eg.Go(func() error {
		for {
			resp, err := streamClient.Recv()
			if err != nil {
				return errors.Wrap(err, "failed to read log stream response")
			}
			if !resp.GetEofAck() {
				continue
			}

			// Every path below returns, so the deferred unlock runs promptly
			// even though it sits inside the loop.
			mu.Lock()
			defer mu.Unlock()
			if !finished {
				return errors.New("unexpected EOF ack")
			}
			if err := streamClient.CloseSend(); err != nil {
				return errors.Wrap(err, "failed to close log stream")
			}
			return nil
		}
	})

	// Sender: forward deltas until the channel is closed, then send EOF.
	eg.Go(func() error {
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case deltas, ok := <-deltasCh:
				if !ok {
					// Send EOF and record that fact atomically with respect
					// to the receiver. Setting finished only after releasing
					// the lock would let a fast EOF ack be checked first and
					// misreported as unexpected.
					mu.Lock()
					err := streamClient.Send(&logstream.StreamLogRequest{
						BuildId: buildID,
						Eof:     true,
					})
					if err == nil {
						finished = true
					}
					mu.Unlock()
					if err != nil {
						return errors.Wrap(err, "failed to send EOF to log stream")
					}
					return nil
				}
				err := streamClient.Send(&logstream.StreamLogRequest{
					BuildId: buildID,
					Deltas:  deltas,
				})
				if err != nil {
					return errors.Wrap(err, "failed to send log delta")
				}
			}
		}
	})

	return eg.Wait()
}