Skip to content

Commit

Permalink
x-pack/filebeat/input/cel: make now global static per evaluation (#36107
Browse files Browse the repository at this point in the history
)

Previously now was static for the life of the program, which
corresponded to the life of the input. This could lead to incorrect and
surprising times being provided when the global was used. Obtain a now
value before starting each evaluation and use it to shadow the CEL now
global (and share it with the Go logging to allow correlation between
these).
  • Loading branch information
efd6 committed Jul 21, 2023
1 parent 2aba00a commit b6c377c
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix handling of NUL-terminated log lines in Fortinet Firewall module. {issue}36026[36026] {pull}36027[36027]
- Make redact field configuration recommended in CEL input and log warning if missing. {pull}36008[36008]
- Fix handling of region name configuration in awss3 input {pull}36034[36034]
- Make CEL input's `now` global variable static for evaluation lifetime. {pull}36107[36107]

*Heartbeat*

Expand Down
34 changes: 28 additions & 6 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,17 @@ func Plugin(log *logp.Logger, store inputcursor.StateStore) v2.Plugin {
}
}

type input struct{}
type input struct {
time func() time.Time
}

// now is time.Now with a modifiable time source.
func (i input) now() time.Time {
if i.time == nil {
return time.Now()
}
return i.time()
}

func (input) Name() string { return inputName }

Expand Down Expand Up @@ -107,7 +117,7 @@ func sanitizeFileName(name string) string {
return strings.ReplaceAll(name, string(filepath.Separator), "_")
}

func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub inputcursor.Publisher) error {
func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, pub inputcursor.Publisher) error {
cfg := src.cfg
log := env.Logger.With("input_url", cfg.Resource.URL)

Expand Down Expand Up @@ -218,8 +228,8 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub
// Process a set of event requests.
log.Debugw("request state", logp.Namespace("cel"), "state", redactor{state: state, mask: cfg.Redact.Fields, delete: cfg.Redact.Delete})
metrics.executions.Add(1)
start := time.Now()
state, err = evalWith(ctx, prg, state)
start := i.now()
state, err = evalWith(ctx, prg, state, start)
log.Debugw("response state", logp.Namespace("cel"), "state", redactor{state: state, mask: cfg.Redact.Fields, delete: cfg.Redact.Delete})
if err != nil {
switch {
Expand Down Expand Up @@ -896,8 +906,20 @@ func newProgram(ctx context.Context, src, root string, client *http.Client, limi
return prg, nil
}

func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{}) (map[string]interface{}, error) {
out, _, err := prg.ContextEval(ctx, map[string]interface{}{root: state})
func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{}, now time.Time) (map[string]interface{}, error) {
out, _, err := prg.ContextEval(ctx, map[string]interface{}{
// Replace global program "now" with current time. This is necessary
// as the lib.Time now global is static at program instantiation time
// which will persist over multiple evaluations. The lib.Time behaviour
// is correct for mito where CEL program instances live for only a
// single evaluation. Rather than incurring the cost of creating a new
// cel.Program for each evaluation, shadow lib.Time's now with a new
// value for each eval. We retain the lib.Time now global for
// compatibility between CEL programs developed in mito with programs
// run in the input.
"now": now,
root: state,
})
if e := ctx.Err(); e != nil {
err = e
}
Expand Down
20 changes: 19 additions & 1 deletion x-pack/filebeat/input/cel/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var inputTests = []struct {
server func(*testing.T, http.HandlerFunc, map[string]interface{})
handler http.HandlerFunc
config map[string]interface{}
time func() time.Time
persistCursor map[string]interface{}
want []map[string]interface{}
wantCursor []map[string]interface{}
Expand All @@ -57,6 +58,23 @@ var inputTests = []struct {
{"message": "Hello, World!"},
},
},
{
name: "hello_world_time",
config: map[string]interface{}{
"interval": 1,
"program": `{"events":[{"message":{"Hello, World!": now}}]}`,
"state": nil,
"resource": map[string]interface{}{
"url": "",
},
},
time: func() time.Time { return time.Date(2010, 2, 8, 0, 0, 0, 0, time.UTC) },
want: []map[string]interface{}{{
"message": map[string]interface{}{
"Hello, World!": "2010-02-08T00:00:00Z",
},
}},
},
{
name: "bad_events_type",
config: map[string]interface{}{
Expand Down Expand Up @@ -1238,7 +1256,7 @@ func TestInput(t *testing.T) {
cancel()
}
}
err = input{}.run(v2Ctx, src, test.persistCursor, &client)
err = input{test.time}.run(v2Ctx, src, test.persistCursor, &client)
if fmt.Sprint(err) != fmt.Sprint(test.wantErr) {
t.Errorf("unexpected error from running input: got:%v want:%v", err, test.wantErr)
}
Expand Down

0 comments on commit b6c377c

Please sign in to comment.