Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 98 additions & 33 deletions internal/benchrunner/runners/stream/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"github.com/elastic/elastic-package/internal/wait"
)

// numberOfEvents is how many sample events are generated per scenario when a
// benchmark configuration is validated statically (see validateScenario).
const numberOfEvents = 100

type runner struct {
options Options
scenarios map[string]*scenario
Expand All @@ -57,6 +59,16 @@ func (r *runner) SetUp(ctx context.Context) error {
return r.setUp(ctx)
}

// StaticValidation checks the benchmark scenarios of a package that target
// dataStreamName without setting up any benchmark run.
//
// It builds a runner from opts, initializes it, and validates the scenario
// for the given data stream. The boolean result reports whether a benchmark
// scenario for that data stream was found; the error is non-nil when
// initialization or validation fails.
func StaticValidation(ctx context.Context, opts Options, dataStreamName string) (bool, error) {
	r := runner{options: opts}
	if err := r.initialize(); err != nil {
		return false, err
	}
	return r.validateScenario(ctx, dataStreamName)
}

// Run runs the system benchmarks defined under the given folder
func (r *runner) Run(ctx context.Context) (reporters.Reportable, error) {
return nil, r.run(ctx)
Expand Down Expand Up @@ -94,14 +106,10 @@ func (r *runner) TearDown(ctx context.Context) error {
return merr
}

func (r *runner) setUp(ctx context.Context) error {
func (r *runner) initialize() error {
r.generators = make(map[string]genlib.Generator)
r.backFillGenerators = make(map[string]genlib.Generator)

r.runtimeDataStreams = make(map[string]string)

r.svcInfo.Test.RunID = common.NewRunID()

pkgManifest, err := packages.ReadPackageManifestFromPackageRoot(r.options.PackageRootPath)
if err != nil {
return fmt.Errorf("reading package manifest failed: %w", err)
Expand All @@ -113,15 +121,61 @@ func (r *runner) setUp(ctx context.Context) error {
}
r.scenarios = scenarios

if err = r.installPackage(ctx); err != nil {
return fmt.Errorf("error installing package: %w", err)
return nil
}

// validateScenario generates numberOfEvents sample events for every scenario
// whose data stream is dataStreamName and checks that each generated event is
// valid JSON.
//
// The boolean result reports whether at least one scenario targets the data
// stream. It is true whenever an error is returned, because errors can only
// occur while validating a matching scenario.
//
// All matching scenarios are validated. r.scenarios is a map, so its
// iteration order is unspecified; stopping after the first match would make
// the outcome depend on which scenario happened to be visited first.
func (r *runner) validateScenario(ctx context.Context, dataStreamName string) (bool, error) {
	found := false

	// A single buffer is reused for every event; it is reset before each Emit.
	var buf bytes.Buffer

	for scenarioName, scenario := range r.scenarios {
		if scenario.DataStream.Name != dataStreamName {
			continue
		}
		found = true

		// Only the regular generator is exercised; the backfill one is ignored.
		generator, _, err := r.createGenerator(ctx, scenarioName, scenario)
		if err != nil {
			return true, err
		}

		for i := 0; i < numberOfEvents; i++ {
			buf.Reset()
			if err := generator.Emit(&buf); err != nil {
				return true, fmt.Errorf("[%s] error while generating event: %w", scenarioName, err)
			}

			// check whether the generated event is valid json
			var event map[string]any
			if err := json.Unmarshal(buf.Bytes(), &event); err != nil {
				return true, fmt.Errorf("[%s] failed to unmarshal json event: %w, generated output: %s", scenarioName, err, buf.String())
			}
		}
	}

	return found, nil
}

func (r *runner) setUp(ctx context.Context) error {
err := r.initialize()
if err != nil {
return err
}

err = r.collectGenerators(ctx)
if err != nil {
return fmt.Errorf("can't initialize generator: %w", err)
}

r.runtimeDataStreams = make(map[string]string)

r.svcInfo.Test.RunID = common.NewRunID()

pkgManifest, err := packages.ReadPackageManifestFromPackageRoot(r.options.PackageRootPath)
if err != nil {
return fmt.Errorf("reading package manifest failed: %w", err)
}

if err = r.installPackage(ctx); err != nil {
return fmt.Errorf("error installing package: %w", err)
}

for scenarioName, scenario := range r.scenarios {
var err error
dataStreamManifest, err := packages.ReadDataStreamManifest(
Expand Down Expand Up @@ -263,47 +317,58 @@ func (r *runner) initializeGenerator(tpl []byte, config genlib.Config, fields ge
}
func (r *runner) collectGenerators(ctx context.Context) error {
for scenarioName, scenario := range r.scenarios {
config, err := r.getGeneratorConfig(scenario)
generator, backfillGenerator, err := r.createGenerator(ctx, scenarioName, scenario)
if err != nil {
return err
}

fields, err := r.getGeneratorFields(ctx, scenario)
if err != nil {
return err
}
r.generators[scenarioName] = generator

tpl, err := r.getGeneratorTemplate(scenario)
if err != nil {
return err
if backfillGenerator != nil {
r.backFillGenerators[scenarioName] = backfillGenerator
}
}

genlib.InitGeneratorTimeNow(time.Now())
genlib.InitGeneratorRandSeed(time.Now().UnixNano())
return nil
}

generator, err := r.initializeGenerator(tpl, *config, fields, scenario, 0, 0)
if err != nil {
return err
}
func (r *runner) createGenerator(ctx context.Context, scenarioName string, scenario *scenario) (genlib.Generator, genlib.Generator, error) {
config, err := r.getGeneratorConfig(scenario)
if err != nil {
return nil, nil, fmt.Errorf("failed to obtain generator config for scenario %q: %w", scenarioName, err)
}

r.generators[scenarioName] = generator
fields, err := r.getGeneratorFields(ctx, scenario)
if err != nil {
return nil, nil, fmt.Errorf("failed to obtain fields from generator for scenario %q: %w", scenarioName, err)
}

if r.options.BackFill >= 0 {
continue
}
tpl, err := r.getGeneratorTemplate(scenario)
if err != nil {
return nil, nil, fmt.Errorf("failed to obtain template from for scenario %q: %w", scenarioName, err)
}

// backfill is a negative duration, make it positive, find how many periods in the backfill and multiply by events for periodk
totEvents := uint64((-1*r.options.BackFill)/r.options.PeriodDuration) * r.options.EventsPerPeriod
genlib.InitGeneratorTimeNow(time.Now())
genlib.InitGeneratorRandSeed(time.Now().UnixNano())

generator, err = r.initializeGenerator(tpl, *config, fields, scenario, r.options.BackFill, totEvents)
if err != nil {
return err
}
generator, err := r.initializeGenerator(tpl, *config, fields, scenario, 0, 0)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialize backfill generator for scenario %q: %w", scenarioName, err)
}

r.backFillGenerators[scenarioName] = generator
if r.options.BackFill >= 0 {
return generator, nil, nil
}

return nil
// backfill is a negative duration: make it positive, find how many periods fit in the backfill and multiply by the events per period
totEvents := uint64((-1*r.options.BackFill)/r.options.PeriodDuration) * r.options.EventsPerPeriod

backFillGenerator, err := r.initializeGenerator(tpl, *config, fields, scenario, r.options.BackFill, totEvents)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialize backfill generator for scenario %q: %w", scenarioName, err)
}

return generator, backFillGenerator, nil
}

func (r *runner) getGeneratorConfig(scenario *scenario) (*config.Config, error) {
Expand Down
4 changes: 4 additions & 0 deletions internal/benchrunner/runners/stream/scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func readScenarios(path, scenarioName, packageName, packageVersion string) (map[
}
scenarios[scenarioName] = scenario
} else {
if _, err := os.Stat(filepath.Join(path, devPath)); os.IsNotExist(err) {
// if the dev path doesn't exist, treat that as no scenarios found
return nil, nil
}
err := filepath.Walk(filepath.Join(path, devPath), func(_ string, info os.FileInfo, err error) error {
if err != nil {
return err
Expand Down
34 changes: 33 additions & 1 deletion internal/testrunner/runners/static/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"os"
"path/filepath"

"github.com/elastic/elastic-package/internal/benchrunner/runners/stream"
"github.com/elastic/elastic-package/internal/fields"
"github.com/elastic/elastic-package/internal/logger"
"github.com/elastic/elastic-package/internal/packages"
"github.com/elastic/elastic-package/internal/signal"
"github.com/elastic/elastic-package/internal/testrunner"
)

Expand Down Expand Up @@ -72,7 +74,37 @@ func (r runner) run(ctx context.Context) ([]testrunner.TestResult, error) {
return result.WithError(fmt.Errorf("failed to read manifest: %w", err))
}

return r.verifySampleEvent(pkgManifest), nil
// join together results from verifyStreamConfig and verifySampleEvent
return append(r.verifyStreamConfig(ctx, r.options.PackageRootPath), r.verifySampleEvent(pkgManifest)...), nil
}

// verifyStreamConfig statically validates the stream benchmark configuration
// of the data stream under test, for the package rooted at packageRootPath.
//
// It returns a failed result when validation reports an error, an empty
// result list when the data stream has no benchmark scenario, and a
// successful result otherwise.
func (r runner) verifyStreamConfig(ctx context.Context, packageRootPath string) []testrunner.TestResult {
	composer := testrunner.NewResultComposer(testrunner.TestResult{
		Name:       "Verify benchmark config",
		TestType:   TestType,
		Package:    r.options.TestFolder.Package,
		DataStream: r.options.TestFolder.DataStream,
	})

	rootPathOpt := stream.WithPackageRootPath(packageRootPath)

	ctx, stop := signal.Enable(ctx, logger.Info)
	defer stop()

	benchOpts := stream.NewOptions(rootPathOpt)
	hasBenchmark, err := stream.StaticValidation(ctx, benchOpts, r.options.TestFolder.DataStream)

	switch {
	case err != nil:
		// The composer's own error return is intentionally discarded, as before.
		failed, _ := composer.WithError(err)
		return failed
	case !hasBenchmark:
		// Nothing to report when the data stream defines no benchmark.
		return []testrunner.TestResult{}
	default:
		passed, _ := composer.WithSuccess()
		return passed
	}
}

func (r runner) verifySampleEvent(pkgManifest *packages.PackageManifest) []testrunner.TestResult {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
description: Benchmark 20000 events ingested
data_stream:
name: testds
corpora:
generator:
total_events: 20000
template:
type: gotext
path: ./logs-benchmark/template.ndjson
config:
path: ./logs-benchmark/config.yml
fields:
path: ./logs-benchmark/fields.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
fields:
- name: '@timestamp'
period: 1s
- name: container.id
- name: log.flags
type: keyword
- name: log.offset
cardinality: 10000
- name: tags
enum: ["production", "env2"]
- name: IP
cardinality: 100
- name: StatusCode
enum: ["200", "400", "404"]
- name: Size
range:
min: 1
max: 1000
- name: Port
range:
min: 8000
max: 8080
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
- name: timestamp
type: date
- name: container.id
type: keyword
- name: log.file.path
example: /var/log/fun-times.log
type: keyword
- name: log.flags
type: keyword
- name: log.offset
type: long
- name: tags
type: keyword
- name: IP
type: ip
- name: StatusCode
type: keyword
- name: Size
type: long
- name: Hostname
type: keyword
- name: Port
type: long
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{{- $timestamp := generate "timestamp" -}}
{
"@timestamp": "{{ $timestamp.Format "2006-01-02T15:04:05.999999Z07:00" }}",
"data_stream.type": "logs",
"data_stream.dataset": "rally_benchmarks.testds",
"data_stream.namespace": "ep",
"container.id": "{{ generate "container.id" }}",
"input.type": "filestream",
"log.file.path": "{{ generate "log.file.path" }}",
"log.flags": "{{ generate "log.flags" }}",
"log.offset": {{ generate "log.offset" }},
"tags": ["rally_benchmark.testds", "forwarded", "{{ generate "tags" }" ],
"message": "{{ generate "IP" }} - - [{{ $timestamp.Format "02/Jan/2006:15:04:05.999999 -0700" }}] \"GET /favicon.ico HTTP/1.1\" {{ generate "StatusCode" }} {{ generate "Size" }} \"http://{{ generate "Hostname" }}:{{ generate "Port" }}/\" \"skip-this-one/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# newer versions go on top
- version: "999.999.999"
changes:
- description: initial release
type: enhancement # can be one of: enhancement, bugfix, breaking-change
link: https://github.com/elastic/elastic-package/pull/1522
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1.2.3.4 - - [25/Oct/2016:14:49:34 +0200] "GET /favicon.ico HTTP/1.1" 404 571 "http://localhost:8080/" "skip-this-one/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
num_docs: 10000
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
description: Pipeline for parsing Nginx access logs. Requires the geoip and user_agent
plugins.
processors:
- grok:
field: message
patterns:
- (%{NGINX_HOST} )?"?(?:%{NGINX_ADDRESS_LIST:nginx.access.remote_ip_list}|%{NOTSPACE:source.address})
- (-|%{DATA:user.name}) \[%{HTTPDATE:nginx.access.time}\] "%{DATA:nginx.access.info}"
%{NUMBER:http.response.status_code:long} %{NUMBER:http.response.body.bytes:long}
"(-|%{DATA:http.request.referrer})" "(-|%{DATA:user_agent.original})"
pattern_definitions:
NGINX_HOST: (?:%{IP:destination.ip}|%{NGINX_NOTSEPARATOR:destination.domain})(:%{NUMBER:destination.port})?
NGINX_NOTSEPARATOR: "[^\t ,:]+"
NGINX_ADDRESS_LIST: (?:%{IP}|%{WORD})("?,?\s*(?:%{IP}|%{WORD}))*
ignore_missing: true
- user_agent:
field: user_agent.original
ignore_missing: true
on_failure:
- set:
field: error.message
value: '{{ _ingest.on_failure_message }}'
Loading