Skip to content

Commit

Permalink
refactor(logtail): have single chan messaging point
Browse files Browse the repository at this point in the history
  • Loading branch information
Integralist committed Nov 8, 2023
1 parent 94cfbeb commit fe0f6b9
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions pkg/commands/logtail/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ func (c *RootCommand) Exec(_ io.Reader, out io.Writer) error {
go c.outputLoop(out)

// Start tailing the logs.
go c.tail(out, failure)
go func() {
failure <- c.tail(out)
}()

select {
case asyncErr := <-failure:
Expand All @@ -127,13 +129,16 @@ func (c *RootCommand) Exec(_ io.Reader, out io.Writer) error {
// Tail starts the virtual tail process. Tail fetches data from the eventbuffer
// API. It hands off the requested logs to the outputloop for the actual
// printing.
func (c *RootCommand) tail(out io.Writer, failure chan<- error) {
func (c *RootCommand) tail(out io.Writer) error {
// Start this with --from and --to if set.
curWindow := c.cfg.from
toWindow := c.cfg.to

// Start the loop with an initial address to query.
path := makeNewPath(c.cfg.path, curWindow, "", failure)
path, err := makeNewPath(c.cfg.path, curWindow, "")
if err != nil {
return err
}

// lastBatchID keeps the last successfully read Batch.ID in case we need
// re-request on failure.
Expand All @@ -153,16 +158,14 @@ func (c *RootCommand) tail(out io.Writer, failure chan<- error) {
c.Globals.ErrLog.AddWithContext(err, map[string]any{
"GET": path,
})
failure <- fmt.Errorf("unable to create new request: %w", err)
return
return fmt.Errorf("unable to create new request: %w", err)
}
req.Header.Add("Fastly-Key", c.token)

resp, err := c.doReq(req)
if err != nil {
c.Globals.ErrLog.Add(err)
failure <- fmt.Errorf("unable to execute request: %w", err)
return
return fmt.Errorf("unable to execute request: %w", err)
}

// Check that our request was successful. If the server is
Expand All @@ -172,8 +175,7 @@ func (c *RootCommand) tail(out io.Writer, failure chan<- error) {
// not valid, give them an error stating this and exit.
if resp.StatusCode == http.StatusNotFound &&
c.cfg.from != 0 {
failure <- fmt.Errorf("specified 'from' time %d not found, either too far in the past or future", c.cfg.from)
return
return fmt.Errorf("specified 'from' time %d not found, either too far in the past or future", c.cfg.from)
}

// In an effort to clean up the output, do not print on
Expand All @@ -198,8 +200,7 @@ func (c *RootCommand) tail(out io.Writer, failure chan<- error) {
}

// Failing at this point is unrecoverable.
failure <- fmt.Errorf("unrecoverable error, response code: %d", resp.StatusCode)
return
return fmt.Errorf("unrecoverable error, response code: %d", resp.StatusCode)
}

// Read and parse response, send batches to the output loop.
Expand All @@ -221,7 +222,10 @@ func (c *RootCommand) tail(out io.Writer, failure chan<- error) {
// We can't parse the response, attempt to
// re-request from the last window & batch.
text.Warning(out, "unable to parse response body: %v", err)
path = makeNewPath(path, curWindow, lastBatchID, failure)
path, err = makeNewPath(path, curWindow, lastBatchID)
if err != nil {
return err
}
continue
}

Expand Down Expand Up @@ -250,7 +254,10 @@ func (c *RootCommand) tail(out io.Writer, failure chan<- error) {

// Something happened in the scanner, re-request the
// current batchID.
path = makeNewPath(path, curWindow, lastBatchID, failure)
path, err = makeNewPath(path, curWindow, lastBatchID)
if err != nil {
return err
}
continue
}

Expand All @@ -267,8 +274,12 @@ func (c *RootCommand) tail(out io.Writer, failure chan<- error) {
// We do NOT want to specify a batchID, as this
// request was successful.
lastBatchID = ""
path = makeNewPath(path, curWindow, lastBatchID, failure)
path, err = makeNewPath(path, curWindow, lastBatchID)
if err != nil {
return err
}
}
return nil
}

// adjustTimes adjusts the passed in from and to flags based on the
Expand Down Expand Up @@ -530,13 +541,10 @@ func (l *Log) String() string {

// makeNewPath generates a new request path based on current
// path, window, and batchID.
func makeNewPath(path string, window int64, batchID string, failure chan<- error) string {
func makeNewPath(path string, window int64, batchID string) (string, error) {
basePath, err := url.Parse(path)
if err != nil {
// No reasonable way to carry on from an error at this point
// and it should never happen, so error & exit.
failure <- fmt.Errorf("error generating request URL: %w", err)
return ""
return "", fmt.Errorf("error generating request URL: %w", err)
}

// Unset anything in the query parameters that might already exist.
Expand All @@ -552,7 +560,7 @@ func makeNewPath(path string, window int64, batchID string, failure chan<- error
}

basePath.RawQuery = q.Encode()
return basePath.String()
return basePath.String(), nil
}

// splitByReqID splits slices of logs based on RequestID.
Expand Down

0 comments on commit fe0f6b9

Please sign in to comment.