Skip to content

Commit

Permalink
Use cancellations to ensure the receive worker returns
Browse files Browse the repository at this point in the history
  • Loading branch information
mikejholly committed Sep 16, 2023
1 parent 6b59f46 commit 5812dec
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 5 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci-docker-ubuntu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ jobs:
RUNS_ON: "ubuntu-latest"
BINARY: "docker"
SUDO: ""
EXTRA_ARGS: "--logstream=false"
EXTRA_ARGS: "--logstream=false --logstream-upload=false"
secrets: inherit

docker-tests-no-qemu-normal:
Expand All @@ -82,7 +82,7 @@ jobs:
RUNS_ON: "ubuntu-latest"
BINARY: "docker"
SUDO: ""
EXTRA_ARGS: "--logstream=false"
EXTRA_ARGS: "--logstream=false --logstream-upload=false"
secrets: inherit

docker-tests-no-qemu-slow:
Expand Down
8 changes: 6 additions & 2 deletions cloud/logstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ import (
"google.golang.org/grpc/status"
)

func (c *Client) StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta, _ bool) error {
verbose := true // Debug
func (c *Client) StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan *pb.Delta, verbose bool) error {
if man.GetResumeToken() == "" {
man.ResumeToken = stringutil.RandomAlphanumeric(40)
}
Expand Down Expand Up @@ -46,6 +45,9 @@ func (c *Client) StreamLogs(ctx context.Context, man *pb.RunManifest, ch <-chan
}

func (c *Client) streamLogsAttempt(ctx context.Context, buildID string, first *pb.Delta, ch <-chan *pb.Delta) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

stream, err := c.logstream.StreamLogs(c.withAuth(ctx))
if err != nil {
return errors.Wrap(err, "failed to create log stream client")
Expand Down Expand Up @@ -104,12 +106,14 @@ func (c *Client) streamLogsAttempt(ctx context.Context, buildID string, first *p
}
err := stream.Send(msg)
if err != nil {
cancel() // Force receive worker to exit.
return errors.Wrap(err, "failed to send EOF to log stream")
}
return nil
}
err := sendSingle(delta)
if err != nil {
cancel() // Force receive worker to exit.
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion logbus/ship/ship.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (l *LogShipper) Start(ctx context.Context) {
func (l *LogShipper) Close() {
close(l.ch)
// Graceful attempt to drain any in-flight logs then force-quit after delay.
t := time.NewTimer(30 * time.Second)
t := time.NewTimer(10 * time.Second)
defer t.Stop()
select {
case <-t.C:
Expand Down

0 comments on commit 5812dec

Please sign in to comment.