Skip to content

Commit

Permalink
[filebeat][gcs] - Fixed log format issues (#34659)
Browse files Browse the repository at this point in the history
* updated logs to more structured logging methods

* updated asciidoc
  • Loading branch information
ShourieG authored and chrisberkhout committed Jun 1, 2023
1 parent f1e8bbd commit 15f21d7
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -216,6 +216,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Allow user configuration of timezone offset in Checkpoint module. {pull}34472[34472]
- Add support for Okta debug attributes, `risk_reasons`, `risk_behaviors` and `factor`. {issue}33677[33677] {pull}34508[34508]
- Fill okta.request.ip_chain.* as a flattened object in Okta module. {pull}34621[34621]
- Fixed GCS log format issues. {pull}34659[34659]

*Auditbeat*

Expand Down
12 changes: 5 additions & 7 deletions x-pack/filebeat/input/gcs/job.go
Expand Up @@ -79,8 +79,6 @@ func gcsObjectHash(src *Source, object *storage.ObjectAttrs) string {
return hex.EncodeToString(h.Sum(nil)[:5])
}

const jobErrString = "job with jobId %s encountered an error : %w"

func (j *job) do(ctx context.Context, id string) {
var fields mapstr.M

Expand All @@ -94,7 +92,7 @@ func (j *job) do(ctx context.Context, id string) {
err := j.processAndPublishData(ctx, id)
if err != nil {
j.state.updateFailedJobs(j.object.Name)
j.log.Errorf(jobErrString, id, err)
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
return
}

Expand All @@ -110,7 +108,7 @@ func (j *job) do(ctx context.Context, id string) {
event.SetID(objectID(j.hash, 0))
j.state.save(j.object.Name, j.object.Updated)
if err := j.publisher.Publish(event, j.state.checkpoint()); err != nil {
j.log.Errorf(jobErrString, id, err)
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
}
}
}
Expand Down Expand Up @@ -145,7 +143,7 @@ func (j *job) processAndPublishData(ctx context.Context, id string) error {
defer func() {
err = reader.Close()
if err != nil {
j.log.Errorf("failed to close reader for object: %s, with error: %w", j.object.Name, err)
j.log.Errorw("failed to close reader for object", "objectName", j.object.Name, "error", err)
}
}()

Expand Down Expand Up @@ -205,7 +203,7 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er
if j.src.ParseJSON {
parsedData, err = decodeJSON(bytes.NewReader(item))
if err != nil {
j.log.Errorf(jobErrString, id, err)
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
}
}
evt := j.createEvent(item, parsedData, offset+relativeOffset)
Expand All @@ -220,7 +218,7 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er
j.state.savePartial(j.object.Name, offset+relativeOffset)
}
if err := j.publisher.Publish(evt, j.state.checkpoint()); err != nil {
j.log.Errorf(jobErrString, id, err)
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
}
}
return nil
Expand Down

0 comments on commit 15f21d7

Please sign in to comment.