Skip to content

Commit

Permalink
Make script processor Run method concurrency safe
Browse files Browse the repository at this point in the history
This should make the script processor safe to be used across multiple goroutines.

Fixes elastic#13690

```
$ benchcmp master.txt pool.txt
benchmark                                             old ns/op     new ns/op     delta
BenchmarkBeatEventV0/Put-12                           2271          2597          +14.35%
BenchmarkBeatEventV0/timeout_Put-12                   2449          2783          +13.64%
BenchmarkBeatEventV0/Object_Put_Key-12                2155          2374          +10.16%
BenchmarkBeatEventV0/timeout_Object_Put_Key-12        2365          2602          +10.02%
BenchmarkBeatEventV0/Get-12                           2315          2534          +9.46%
BenchmarkBeatEventV0/timeout_Get-12                   2480          2830          +14.11%
BenchmarkBeatEventV0/Get_Object-12                    2439          2872          +17.75%
BenchmarkBeatEventV0/timeout_Get_Object-12            2669          3079          +15.36%
BenchmarkBeatEventV0/Get_Undefined_Key-12             2588          2919          +12.79%
BenchmarkBeatEventV0/timeout_Get_Undefined_Key-12     2789          3128          +12.15%
BenchmarkBeatEventV0/fields_get_key-12                2382          2722          +14.27%
BenchmarkBeatEventV0/timeout_fields_get_key-12        2663          2933          +10.14%
BenchmarkBeatEventV0/Get_@metadata-12                 2291          2460          +7.38%
BenchmarkBeatEventV0/timeout_Get_@metadata-12         2515          2697          +7.24%
BenchmarkBeatEventV0/Put_@metadata-12                 2662          2492          -6.39%
BenchmarkBeatEventV0/timeout_Put_@metadata-12         3038          2722          -10.40%
BenchmarkBeatEventV0/Delete_@metadata-12              2229          2420          +8.57%
BenchmarkBeatEventV0/timeout_Delete_@metadata-12      2452          2653          +8.20%
BenchmarkBeatEventV0/Cancel-12                        2274          2319          +1.98%
BenchmarkBeatEventV0/timeout_Cancel-12                3198          2576          -19.45%
BenchmarkBeatEventV0/Tag-12                           2925          2668          -8.79%
BenchmarkBeatEventV0/timeout_Tag-12                   3107          2877          -7.40%
BenchmarkBeatEventV0/AppendTo-12                      2496          2428          -2.72%
BenchmarkBeatEventV0/timeout_AppendTo-12              2570          2721          +5.88%

benchmark                                             old allocs     new allocs     delta
BenchmarkBeatEventV0/Put-12                           30             30             +0.00%
BenchmarkBeatEventV0/timeout_Put-12                   32             32             +0.00%
BenchmarkBeatEventV0/Object_Put_Key-12                29             29             +0.00%
BenchmarkBeatEventV0/timeout_Object_Put_Key-12        31             31             +0.00%
BenchmarkBeatEventV0/Get-12                           29             29             +0.00%
BenchmarkBeatEventV0/timeout_Get-12                   31             31             +0.00%
BenchmarkBeatEventV0/Get_Object-12                    32             32             +0.00%
BenchmarkBeatEventV0/timeout_Get_Object-12            34             34             +0.00%
BenchmarkBeatEventV0/Get_Undefined_Key-12             35             35             +0.00%
BenchmarkBeatEventV0/timeout_Get_Undefined_Key-12     37             37             +0.00%
BenchmarkBeatEventV0/fields_get_key-12                32             32             +0.00%
BenchmarkBeatEventV0/timeout_fields_get_key-12        34             34             +0.00%
BenchmarkBeatEventV0/Get_@metadata-12                 29             29             +0.00%
BenchmarkBeatEventV0/timeout_Get_@metadata-12         31             31             +0.00%
BenchmarkBeatEventV0/Put_@metadata-12                 30             30             +0.00%
BenchmarkBeatEventV0/timeout_Put_@metadata-12         32             32             +0.00%
BenchmarkBeatEventV0/Delete_@metadata-12              28             28             +0.00%
BenchmarkBeatEventV0/timeout_Delete_@metadata-12      30             30             +0.00%
BenchmarkBeatEventV0/Cancel-12                        28             28             +0.00%
BenchmarkBeatEventV0/timeout_Cancel-12                30             30             +0.00%
BenchmarkBeatEventV0/Tag-12                           28             28             +0.00%
BenchmarkBeatEventV0/timeout_Tag-12                   30             30             +0.00%
BenchmarkBeatEventV0/AppendTo-12                      28             28             +0.00%
BenchmarkBeatEventV0/timeout_AppendTo-12              30             30             +0.00%

benchmark                                             old bytes     new bytes     delta
BenchmarkBeatEventV0/Put-12                           2112          2114          +0.09%
BenchmarkBeatEventV0/timeout_Put-12                   2208          2210          +0.09%
BenchmarkBeatEventV0/Object_Put_Key-12                2096          2098          +0.10%
BenchmarkBeatEventV0/timeout_Object_Put_Key-12        2192          2194          +0.09%
BenchmarkBeatEventV0/Get-12                           2096          2098          +0.10%
BenchmarkBeatEventV0/timeout_Get-12                   2192          2194          +0.09%
BenchmarkBeatEventV0/Get_Object-12                    2256          2258          +0.09%
BenchmarkBeatEventV0/timeout_Get_Object-12            2352          2354          +0.09%
BenchmarkBeatEventV0/Get_Undefined_Key-12             2416          2419          +0.12%
BenchmarkBeatEventV0/timeout_Get_Undefined_Key-12     2512          2514          +0.08%
BenchmarkBeatEventV0/fields_get_key-12                2256          2258          +0.09%
BenchmarkBeatEventV0/timeout_fields_get_key-12        2352          2354          +0.09%
BenchmarkBeatEventV0/Get_@metadata-12                 2096          2098          +0.10%
BenchmarkBeatEventV0/timeout_Get_@metadata-12         2192          2194          +0.09%
BenchmarkBeatEventV0/Put_@metadata-12                 2112          2114          +0.09%
BenchmarkBeatEventV0/timeout_Put_@metadata-12         2208          2210          +0.09%
BenchmarkBeatEventV0/Delete_@metadata-12              2080          2082          +0.10%
BenchmarkBeatEventV0/timeout_Delete_@metadata-12      2176          2178          +0.09%
BenchmarkBeatEventV0/Cancel-12                        2080          2082          +0.10%
BenchmarkBeatEventV0/timeout_Cancel-12                2176          2178          +0.09%
BenchmarkBeatEventV0/Tag-12                           2080          2082          +0.10%
BenchmarkBeatEventV0/timeout_Tag-12                   2176          2178          +0.09%
BenchmarkBeatEventV0/AppendTo-12                      2080          2082          +0.10%
BenchmarkBeatEventV0/timeout_AppendTo-12              2176          2178          +0.09%
```
  • Loading branch information
andrewkroh committed Oct 2, 2019
1 parent cda40ae commit b884b5a
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 28 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -105,6 +105,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Disable `add_kubernetes_metadata` if no matchers found. {pull}13709[13709]
- Better wording for xpack beats when the _xpack endpoint is not reachable. {pull}13771[13771]
- Recover from panics in the javascript process and log details about the failure to aid in future debugging. {pull}13690[13690]
- Make the script processor concurrency-safe. {issue}13690[13690] {pull}13857[13857]

*Auditbeat*

- Process dataset: Fixed a memory leak under Windows. {pull}12100[12100]
Expand Down
43 changes: 29 additions & 14 deletions libbeat/processors/script/javascript/javascript.go
Expand Up @@ -26,6 +26,7 @@ import (
"strings"
"time"

"github.com/dop251/goja"
"github.com/pkg/errors"
"github.com/rcrowley/go-metrics"

Expand All @@ -39,9 +40,10 @@ import (

type jsProcessor struct {
Config
s *session
sourceFile string
stats *processorStats
sessionPool *sessionPool
sourceProg *goja.Program
sourceFile string
stats *processorStats
}

// New constructs a new Javascript processor.
Expand Down Expand Up @@ -78,16 +80,23 @@ func NewFromConfig(c Config, reg *monitoring.Registry) (processors.Processor, er
return nil, annotateError(c.Tag, err)
}

s, err := newSession(sourceFile, sourceCode, c)
// Validate processor source code.
prog, err := goja.Compile(sourceFile, string(sourceCode), true)
if err != nil {
return nil, err
}

pool, err := newSessionPool(prog, c)
if err != nil {
return nil, annotateError(c.Tag, err)
}

return &jsProcessor{
Config: c,
s: s,
sourceFile: sourceFile,
stats: getStats(c.Tag, reg),
Config: c,
sessionPool: pool,
sourceProg: prog,
sourceFile: sourceFile,
stats: getStats(c.Tag, reg),
}, nil
}

Expand Down Expand Up @@ -156,17 +165,23 @@ func annotateError(id string, err error) error {
// Run executes the processor on the given it event. It invokes the
// process function defined in the Javascript source.
func (p *jsProcessor) Run(event *beat.Event) (*beat.Event, error) {
run := p.s.runProcessFunc
if p.stats != nil {
run = p.runWithStats
s := p.sessionPool.Get()
defer p.sessionPool.Put(s)

var rtn *beat.Event
var err error

if p.stats == nil {
rtn, err = s.runProcessFunc(event)
} else {
rtn, err = p.runWithStats(s, event)
}
rtn, err := run(event)
return rtn, annotateError(p.Tag, err)
}

func (p *jsProcessor) runWithStats(event *beat.Event) (*beat.Event, error) {
func (p *jsProcessor) runWithStats(s *session, event *beat.Event) (*beat.Event, error) {
start := time.Now()
event, err := p.s.runProcessFunc(event)
event, err := s.runProcessFunc(event)
elapsed := time.Since(start)

p.stats.processTime.Update(int64(elapsed))
Expand Down
53 changes: 39 additions & 14 deletions libbeat/processors/script/javascript/session.go
Expand Up @@ -19,6 +19,7 @@ package javascript

import (
"reflect"
"sync"
"time"

"github.com/dop251/goja"
Expand Down Expand Up @@ -81,17 +82,7 @@ type session struct {
tagOnException string
}

func newSession(
name string,
src []byte,
conf Config,
) (*session, error) {
// Validate processor source code.
p, err := goja.Compile(name, string(src), true)
if err != nil {
return nil, err
}

func newSession(p *goja.Program, conf Config, test bool) (*session, error) {
// Setup JS runtime.
s := &session{
vm: goja.New(),
Expand All @@ -113,7 +104,7 @@ func newSession(
// Register constructor for 'new Event' to enable test() to create events.
s.vm.Set("Event", newBeatEventV0Constructor(s))

_, err = s.vm.RunProgram(p)
_, err := s.vm.RunProgram(p)
if err != nil {
return nil, err
}
Expand All @@ -128,8 +119,10 @@ func newSession(
}
}

if err = s.executeTestFunction(); err != nil {
return nil, err
if test {
if err = s.executeTestFunction(); err != nil {
return nil, err
}
}

return s, nil
Expand Down Expand Up @@ -272,3 +265,35 @@ func init() {
)
})
}

type sessionPool struct {
pool *sync.Pool
}

func newSessionPool(p *goja.Program, c Config) (*sessionPool, error) {
s, err := newSession(p, c, true)
if err != nil {
return nil, err
}

pool := &sync.Pool{
New: func() interface{} {
s, _ := newSession(p, c, false)
return s
},
}
pool.Put(s)

return &sessionPool{pool}, nil
}

func (p *sessionPool) Get() *session {
s, _ := p.pool.Get().(*session)
return s
}

func (p *sessionPool) Put(s *session) {
if s != nil {
p.pool.Put(s)
}
}

0 comments on commit b884b5a

Please sign in to comment.