Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(logtail): replace log.Fatal usage with channel #1082

Merged
merged 6 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/commands/compute/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (c *DeployCommand) Exec(in io.Reader, out io.Writer) (err error) {
return err
}
if c.Globals.Manifest.File.Setup.Defined() && !c.Globals.Flags.Quiet {
text.Info(out, "\nProcessing of the %s [setup] configuration happens only for a new service. Once a service is created, any further changes to the service or its resources must be made manually.", manifestFilename)
text.Info(out, "\nProcessing of the %s [setup] configuration happens only for a new service. Once a service is created, any further changes to the service or its resources must be made manually.\n\n", manifestFilename)
}
}

Expand Down
63 changes: 38 additions & 25 deletions pkg/commands/logtail/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,31 +101,44 @@ func (c *RootCommand) Exec(_ io.Reader, out io.Writer) error {
return err
}

failure := make(chan error)
sigs := make(chan os.Signal, 2)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)

// Start the output loop.
go c.outputLoop(out)

// Start tailing the logs.
go c.tail(out)
go func() {
failure <- c.tail(out)
}()

<-sigs
close(c.dieCh)
select {
case asyncErr := <-failure:
close(c.dieCh)
return asyncErr
case <-c.doneCh:
return nil
case <-sigs:
close(c.dieCh)
}

return nil
}

// Tail starts the virtual tail process. Tail fetches data from the eventbuffer
// API. It hands off the requested logs to the outputloop for the actual
// printing.
func (c *RootCommand) tail(out io.Writer) {
func (c *RootCommand) tail(out io.Writer) error {
// Start this with --from and --to if set.
curWindow := c.cfg.from
toWindow := c.cfg.to

// Start the loop with an initial address to query.
path := makeNewPath(out, c.cfg.path, curWindow, "")
path, err := makeNewPath(c.cfg.path, curWindow, "")
if err != nil {
return err
}

// lastBatchID keeps the last successfully read Batch.ID in case we need
// re-request on failure.
Expand All @@ -145,16 +158,14 @@ func (c *RootCommand) tail(out io.Writer) {
c.Globals.ErrLog.AddWithContext(err, map[string]any{
"GET": path,
})
text.Error(out, "unable to create new request: %v", err)
os.Exit(1)
return fmt.Errorf("unable to create new request: %w", err)
}
req.Header.Add("Fastly-Key", c.token)

resp, err := c.doReq(req)
if err != nil {
c.Globals.ErrLog.Add(err)
text.Error(out, "unable to execute request: %v", err)
os.Exit(1)
return fmt.Errorf("unable to execute request: %w", err)
}

// Check that our request was successful. If the server is
Expand All @@ -164,8 +175,7 @@ func (c *RootCommand) tail(out io.Writer) {
// not valid, give them an error stating this and exit.
if resp.StatusCode == http.StatusNotFound &&
c.cfg.from != 0 {
text.Error(out, "specified 'from' time %d not found, either too far in the past or future", c.cfg.from)
os.Exit(1)
return fmt.Errorf("specified 'from' time %d not found, either too far in the past or future", c.cfg.from)
}

// In an effort to clean up the output, do not print on
Expand All @@ -190,8 +200,7 @@ func (c *RootCommand) tail(out io.Writer) {
}

// Failing at this point is unrecoverable.
text.Error(out, "unrecoverable error, response code: %d", resp.StatusCode)
os.Exit(1)
return fmt.Errorf("unrecoverable error, response code: %d", resp.StatusCode)
}

// Read and parse response, send batches to the output loop.
Expand All @@ -213,7 +222,10 @@ func (c *RootCommand) tail(out io.Writer) {
// We can't parse the response, attempt to
// re-request from the last window & batch.
text.Warning(out, "unable to parse response body: %v", err)
path = makeNewPath(out, path, curWindow, lastBatchID)
path, err = makeNewPath(path, curWindow, lastBatchID)
if err != nil {
return err
}
continue
}

Expand Down Expand Up @@ -242,7 +254,10 @@ func (c *RootCommand) tail(out io.Writer) {

// Something happened in the scanner, re-request the
// current batchID.
path = makeNewPath(out, path, curWindow, lastBatchID)
path, err = makeNewPath(path, curWindow, lastBatchID)
if err != nil {
return err
}
continue
}

Expand All @@ -259,8 +274,12 @@ func (c *RootCommand) tail(out io.Writer) {
// We do NOT want to specify a batchID, as this
// request was successful.
lastBatchID = ""
path = makeNewPath(out, path, curWindow, lastBatchID)
path, err = makeNewPath(path, curWindow, lastBatchID)
if err != nil {
return err
}
}
return nil
}

// adjustTimes adjusts the passed in from and to flags based on the
Expand Down Expand Up @@ -418,9 +437,6 @@ func (c *RootCommand) outputLoop(out io.Writer) {
// Set the new log and receive info back to the
// logmap for this RequestID.
logmap[reqID] = reqLogs

case <-c.doneCh:
os.Exit(0)
}
}
}
Expand Down Expand Up @@ -525,13 +541,10 @@ func (l *Log) String() string {

// makeNewPath generates a new request path based on current
// path, window, and batchID.
func makeNewPath(out io.Writer, path string, window int64, batchID string) string {
func makeNewPath(path string, window int64, batchID string) (string, error) {
basePath, err := url.Parse(path)
if err != nil {
// No reasonable way to carry on from an error at this point
// and it should never happen, so error & exit.
text.Error(out, "error generating request URL: %v", err)
os.Exit(1)
return "", fmt.Errorf("error generating request URL: %w", err)
}

// Unset anything in the query parameters that might already exist.
Expand All @@ -547,7 +560,7 @@ func makeNewPath(out io.Writer, path string, window int64, batchID string) strin
}

basePath.RawQuery = q.Encode()
return basePath.String()
return basePath.String(), nil
}

// splitByReqID splits slices of logs based on RequestID.
Expand Down