Skip to content

Commit

Permalink
Make script processor Run method concurrency safe (elastic#13857)
Browse files Browse the repository at this point in the history
This should make the script processor safe to be used across multiple goroutines.

It compiles the script once. And loads it into a pool of VMs. Each invocation of Run() grabs a VM instance from the pool and returns it when complete.

Fixes elastic#13690

```
$ benchcmp master.txt pool.txt
benchmark                                             old ns/op     new ns/op     delta
BenchmarkBeatEventV0/Put-12                           2271          2597          +14.35%
BenchmarkBeatEventV0/timeout_Put-12                   2449          2783          +13.64%
BenchmarkBeatEventV0/Object_Put_Key-12                2155          2374          +10.16%
BenchmarkBeatEventV0/timeout_Object_Put_Key-12        2365          2602          +10.02%
BenchmarkBeatEventV0/Get-12                           2315          2534          +9.46%
BenchmarkBeatEventV0/timeout_Get-12                   2480          2830          +14.11%
BenchmarkBeatEventV0/Get_Object-12                    2439          2872          +17.75%
BenchmarkBeatEventV0/timeout_Get_Object-12            2669          3079          +15.36%
BenchmarkBeatEventV0/Get_Undefined_Key-12             2588          2919          +12.79%
BenchmarkBeatEventV0/timeout_Get_Undefined_Key-12     2789          3128          +12.15%
BenchmarkBeatEventV0/fields_get_key-12                2382          2722          +14.27%
BenchmarkBeatEventV0/timeout_fields_get_key-12        2663          2933          +10.14%
BenchmarkBeatEventV0/Get_@metadata-12                 2291          2460          +7.38%
BenchmarkBeatEventV0/timeout_Get_@metadata-12         2515          2697          +7.24%
BenchmarkBeatEventV0/Put_@metadata-12                 2662          2492          -6.39%
BenchmarkBeatEventV0/timeout_Put_@metadata-12         3038          2722          -10.40%
BenchmarkBeatEventV0/Delete_@metadata-12              2229          2420          +8.57%
BenchmarkBeatEventV0/timeout_Delete_@metadata-12      2452          2653          +8.20%
BenchmarkBeatEventV0/Cancel-12                        2274          2319          +1.98%
BenchmarkBeatEventV0/timeout_Cancel-12                3198          2576          -19.45%
BenchmarkBeatEventV0/Tag-12                           2925          2668          -8.79%
BenchmarkBeatEventV0/timeout_Tag-12                   3107          2877          -7.40%
BenchmarkBeatEventV0/AppendTo-12                      2496          2428          -2.72%
BenchmarkBeatEventV0/timeout_AppendTo-12              2570          2721          +5.88%

benchmark                                             old allocs     new allocs     delta
BenchmarkBeatEventV0/Put-12                           30             30             +0.00%
BenchmarkBeatEventV0/timeout_Put-12                   32             32             +0.00%
BenchmarkBeatEventV0/Object_Put_Key-12                29             29             +0.00%
BenchmarkBeatEventV0/timeout_Object_Put_Key-12        31             31             +0.00%
BenchmarkBeatEventV0/Get-12                           29             29             +0.00%
BenchmarkBeatEventV0/timeout_Get-12                   31             31             +0.00%
BenchmarkBeatEventV0/Get_Object-12                    32             32             +0.00%
BenchmarkBeatEventV0/timeout_Get_Object-12            34             34             +0.00%
BenchmarkBeatEventV0/Get_Undefined_Key-12             35             35             +0.00%
BenchmarkBeatEventV0/timeout_Get_Undefined_Key-12     37             37             +0.00%
BenchmarkBeatEventV0/fields_get_key-12                32             32             +0.00%
BenchmarkBeatEventV0/timeout_fields_get_key-12        34             34             +0.00%
BenchmarkBeatEventV0/Get_@metadata-12                 29             29             +0.00%
BenchmarkBeatEventV0/timeout_Get_@metadata-12         31             31             +0.00%
BenchmarkBeatEventV0/Put_@metadata-12                 30             30             +0.00%
BenchmarkBeatEventV0/timeout_Put_@metadata-12         32             32             +0.00%
BenchmarkBeatEventV0/Delete_@metadata-12              28             28             +0.00%
BenchmarkBeatEventV0/timeout_Delete_@metadata-12      30             30             +0.00%
BenchmarkBeatEventV0/Cancel-12                        28             28             +0.00%
BenchmarkBeatEventV0/timeout_Cancel-12                30             30             +0.00%
BenchmarkBeatEventV0/Tag-12                           28             28             +0.00%
BenchmarkBeatEventV0/timeout_Tag-12                   30             30             +0.00%
BenchmarkBeatEventV0/AppendTo-12                      28             28             +0.00%
BenchmarkBeatEventV0/timeout_AppendTo-12              30             30             +0.00%

benchmark                                             old bytes     new bytes     delta
BenchmarkBeatEventV0/Put-12                           2112          2114          +0.09%
BenchmarkBeatEventV0/timeout_Put-12                   2208          2210          +0.09%
BenchmarkBeatEventV0/Object_Put_Key-12                2096          2098          +0.10%
BenchmarkBeatEventV0/timeout_Object_Put_Key-12        2192          2194          +0.09%
BenchmarkBeatEventV0/Get-12                           2096          2098          +0.10%
BenchmarkBeatEventV0/timeout_Get-12                   2192          2194          +0.09%
BenchmarkBeatEventV0/Get_Object-12                    2256          2258          +0.09%
BenchmarkBeatEventV0/timeout_Get_Object-12            2352          2354          +0.09%
BenchmarkBeatEventV0/Get_Undefined_Key-12             2416          2419          +0.12%
BenchmarkBeatEventV0/timeout_Get_Undefined_Key-12     2512          2514          +0.08%
BenchmarkBeatEventV0/fields_get_key-12                2256          2258          +0.09%
BenchmarkBeatEventV0/timeout_fields_get_key-12        2352          2354          +0.09%
BenchmarkBeatEventV0/Get_@metadata-12                 2096          2098          +0.10%
BenchmarkBeatEventV0/timeout_Get_@metadata-12         2192          2194          +0.09%
BenchmarkBeatEventV0/Put_@metadata-12                 2112          2114          +0.09%
BenchmarkBeatEventV0/timeout_Put_@metadata-12         2208          2210          +0.09%
BenchmarkBeatEventV0/Delete_@metadata-12              2080          2082          +0.10%
BenchmarkBeatEventV0/timeout_Delete_@metadata-12      2176          2178          +0.09%
BenchmarkBeatEventV0/Cancel-12                        2080          2082          +0.10%
BenchmarkBeatEventV0/timeout_Cancel-12                2176          2178          +0.09%
BenchmarkBeatEventV0/Tag-12                           2080          2082          +0.10%
BenchmarkBeatEventV0/timeout_Tag-12                   2176          2178          +0.09%
BenchmarkBeatEventV0/AppendTo-12                      2080          2082          +0.10%
BenchmarkBeatEventV0/timeout_AppendTo-12              2176          2178          +0.09%
```
  • Loading branch information
andrewkroh committed Oct 2, 2019
1 parent cda40ae commit 0338e66
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 28 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Disable `add_kubernetes_metadata` if no matchers found. {pull}13709[13709]
- Better wording for xpack beats when the _xpack endpoint is not reachable. {pull}13771[13771]
- Recover from panics in the javascript process and log details about the failure to aid in future debugging. {pull}13690[13690]
- Make the script processor concurrency-safe. {issue}13690[13690] {pull}13857[13857]

*Auditbeat*

- Process dataset: Fixed a memory leak under Windows. {pull}12100[12100]
Expand Down
43 changes: 29 additions & 14 deletions libbeat/processors/script/javascript/javascript.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strings"
"time"

"github.com/dop251/goja"
"github.com/pkg/errors"
"github.com/rcrowley/go-metrics"

Expand All @@ -39,9 +40,10 @@ import (

type jsProcessor struct {
Config
s *session
sourceFile string
stats *processorStats
sessionPool *sessionPool
sourceProg *goja.Program
sourceFile string
stats *processorStats
}

// New constructs a new Javascript processor.
Expand Down Expand Up @@ -78,16 +80,23 @@ func NewFromConfig(c Config, reg *monitoring.Registry) (processors.Processor, er
return nil, annotateError(c.Tag, err)
}

s, err := newSession(sourceFile, sourceCode, c)
// Validate processor source code.
prog, err := goja.Compile(sourceFile, string(sourceCode), true)
if err != nil {
return nil, err
}

pool, err := newSessionPool(prog, c)
if err != nil {
return nil, annotateError(c.Tag, err)
}

return &jsProcessor{
Config: c,
s: s,
sourceFile: sourceFile,
stats: getStats(c.Tag, reg),
Config: c,
sessionPool: pool,
sourceProg: prog,
sourceFile: sourceFile,
stats: getStats(c.Tag, reg),
}, nil
}

Expand Down Expand Up @@ -156,17 +165,23 @@ func annotateError(id string, err error) error {
// Run executes the processor on the given it event. It invokes the
// process function defined in the Javascript source.
func (p *jsProcessor) Run(event *beat.Event) (*beat.Event, error) {
run := p.s.runProcessFunc
if p.stats != nil {
run = p.runWithStats
s := p.sessionPool.Get()
defer p.sessionPool.Put(s)

var rtn *beat.Event
var err error

if p.stats == nil {
rtn, err = s.runProcessFunc(event)
} else {
rtn, err = p.runWithStats(s, event)
}
rtn, err := run(event)
return rtn, annotateError(p.Tag, err)
}

func (p *jsProcessor) runWithStats(event *beat.Event) (*beat.Event, error) {
func (p *jsProcessor) runWithStats(s *session, event *beat.Event) (*beat.Event, error) {
start := time.Now()
event, err := p.s.runProcessFunc(event)
event, err := s.runProcessFunc(event)
elapsed := time.Since(start)

p.stats.processTime.Update(int64(elapsed))
Expand Down
53 changes: 39 additions & 14 deletions libbeat/processors/script/javascript/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package javascript

import (
"reflect"
"sync"
"time"

"github.com/dop251/goja"
Expand Down Expand Up @@ -81,17 +82,7 @@ type session struct {
tagOnException string
}

func newSession(
name string,
src []byte,
conf Config,
) (*session, error) {
// Validate processor source code.
p, err := goja.Compile(name, string(src), true)
if err != nil {
return nil, err
}

func newSession(p *goja.Program, conf Config, test bool) (*session, error) {
// Setup JS runtime.
s := &session{
vm: goja.New(),
Expand All @@ -113,7 +104,7 @@ func newSession(
// Register constructor for 'new Event' to enable test() to create events.
s.vm.Set("Event", newBeatEventV0Constructor(s))

_, err = s.vm.RunProgram(p)
_, err := s.vm.RunProgram(p)
if err != nil {
return nil, err
}
Expand All @@ -128,8 +119,10 @@ func newSession(
}
}

if err = s.executeTestFunction(); err != nil {
return nil, err
if test {
if err = s.executeTestFunction(); err != nil {
return nil, err
}
}

return s, nil
Expand Down Expand Up @@ -272,3 +265,35 @@ func init() {
)
})
}

type sessionPool struct {
pool *sync.Pool
}

func newSessionPool(p *goja.Program, c Config) (*sessionPool, error) {
s, err := newSession(p, c, true)
if err != nil {
return nil, err
}

pool := &sync.Pool{
New: func() interface{} {
s, _ := newSession(p, c, false)
return s
},
}
pool.Put(s)

return &sessionPool{pool}, nil
}

func (p *sessionPool) Get() *session {
s, _ := p.pool.Get().(*session)
return s
}

func (p *sessionPool) Put(s *session) {
if s != nil {
p.pool.Put(s)
}
}
40 changes: 40 additions & 0 deletions libbeat/processors/script/javascript/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package javascript

import (
"context"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -184,3 +186,41 @@ func TestSessionTimeout(t *testing.T) {
_, err = p.Run(evt)
assert.NoError(t, err)
}

func TestSessionParallel(t *testing.T) {
const script = `
evt.Put("host.name", "workstation");
`

p, err := NewFromConfig(Config{
Source: header + script + footer,
TagOnException: "_js_exception",
}, nil)
if err != nil {
t.Fatal(err)
}

const goroutines = 10
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var wg sync.WaitGroup
wg.Add(goroutines)
for i := 0; i < goroutines; i++ {
go func() {
defer wg.Done()
for ctx.Err() == nil {
evt := &beat.Event{
Fields: common.MapStr{
"host": common.MapStr{"name": "computer"},
},
}
_, err := p.Run(evt)
assert.NoError(t, err)
}
}()
}

time.AfterFunc(time.Second, cancel)
wg.Wait()
}

0 comments on commit 0338e66

Please sign in to comment.