diff --git a/.ci/Jenkinsfile b/.ci/Jenkinsfile index a3846901e5..b83d2fe49c 100644 --- a/.ci/Jenkinsfile +++ b/.ci/Jenkinsfile @@ -103,6 +103,7 @@ pipeline { 'check-packages-with-kind': generateTestCommandStage(command: 'test-check-packages-with-kind', artifacts: ['build/test-results/*.xml', 'build/kubectl-dump.txt', 'build/elastic-stack-dump/check-*/logs/*.log', 'build/elastic-stack-dump/check-*/logs/fleet-server-internal/*'], junitArtifacts: true, publishCoverage: true), 'check-packages-other': generateTestCommandStage(command: 'test-check-packages-other', artifacts: ['build/test-results/*.xml', 'build/elastic-stack-dump/check-*/logs/*.log', 'build/elastic-stack-dump/check-*/logs/fleet-server-internal/*'], junitArtifacts: true, publishCoverage: true), 'check-packages-with-custom-agent': generateTestCommandStage(command: 'test-check-packages-with-custom-agent', artifacts: ['build/test-results/*.xml', 'build/elastic-stack-dump/check-*/logs/*.log', 'build/elastic-stack-dump/check-*/logs/fleet-server-internal/*'], junitArtifacts: true, publishCoverage: true), + 'check-packages-benchmarks': generateTestCommandStage(command: 'test-check-packages-benchmarks', artifacts: ['build/test-results/*.xml', 'build/elastic-stack-dump/check-*/logs/*.log', 'build/elastic-stack-dump/check-*/logs/fleet-server-internal/*'], junitArtifacts: true, publishCoverage: false), 'build-zip': generateTestCommandStage(command: 'test-build-zip', artifacts: ['build/elastic-stack-dump/build-zip/logs/*.log', 'build/packages/*.sig']), 'profiles-command': generateTestCommandStage(command: 'test-profiles-command') ] diff --git a/Makefile b/Makefile index 031526ae3f..54dbdd8957 100644 --- a/Makefile +++ b/Makefile @@ -65,7 +65,7 @@ test-stack-command-8x: test-stack-command: test-stack-command-default test-stack-command-7x test-stack-command-800 test-stack-command-8x -test-check-packages: test-check-packages-with-kind test-check-packages-other test-check-packages-parallel test-check-packages-with-custom-agent +test-check-packages: test-check-packages-with-kind test-check-packages-other test-check-packages-parallel test-check-packages-with-custom-agent test-check-packages-benchmarks test-check-packages-with-kind: PACKAGE_TEST_TYPE=with-kind ./scripts/test-check-packages.sh @@ -73,6 +73,9 @@ test-check-packages-with-kind: test-check-packages-other: PACKAGE_TEST_TYPE=other ./scripts/test-check-packages.sh +test-check-packages-benchmarks: + PACKAGE_TEST_TYPE=benchmarks ./scripts/test-check-packages.sh + test-check-packages-parallel: PACKAGE_TEST_TYPE=parallel ./scripts/test-check-packages.sh diff --git a/README.md b/README.md index 1c938b7189..7e17dd336c 100644 --- a/README.md +++ b/README.md @@ -88,6 +88,18 @@ The command output shell completions information (for `bash`, `zsh`, `fish` and Run `elastic-package completion` and follow the instruction for your shell. +### `elastic-package benchmark` + +_Context: package_ + +Use this command to run benchmarks on a package. Currently, the following types of benchmarks are available: + +#### Pipeline Benchmarks + +These benchmarks allow you to benchmark any Ingest Node Pipelines defined by your packages. + +For details on how to configure pipeline benchmarks for a package, review the [HOWTO guide](./docs/howto/pipeline_benchmarking.md). + ### `elastic-package build` _Context: package_ diff --git a/cmd/benchmark.go b/cmd/benchmark.go new file mode 100644 index 0000000000..7b1d651dd9 --- /dev/null +++ b/cmd/benchmark.go @@ -0,0 +1,196 @@ +// Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cmd + +import ( + "fmt" + "strings" + + "github.com/pkg/errors" + "github.com/spf13/cobra" + + "github.com/elastic/elastic-package/internal/benchrunner" + "github.com/elastic/elastic-package/internal/benchrunner/reporters/formats" + "github.com/elastic/elastic-package/internal/benchrunner/reporters/outputs" + _ "github.com/elastic/elastic-package/internal/benchrunner/runners" // register all benchmark runners + "github.com/elastic/elastic-package/internal/cobraext" + "github.com/elastic/elastic-package/internal/common" + "github.com/elastic/elastic-package/internal/elasticsearch" + "github.com/elastic/elastic-package/internal/packages" + "github.com/elastic/elastic-package/internal/signal" + "github.com/elastic/elastic-package/internal/testrunner" +) + +const benchLongDescription = `Use this command to run benchmarks on a package. Currently, the following types of benchmarks are available: + +#### Pipeline Benchmarks + +These benchmarks allow you to benchmark any Ingest Node Pipelines defined by your packages. + +For details on how to configure pipeline benchmarks for a package, review the [HOWTO guide](./docs/howto/pipeline_benchmarking.md).` + +func setupBenchmarkCommand() *cobraext.Command { + var benchTypeCmdActions []cobraext.CommandAction + + cmd := &cobra.Command{ + Use: "benchmark", + Short: "Run benchmarks for the package", + Long: benchLongDescription, + RunE: func(cmd *cobra.Command, args []string) error { + cmd.Println("Run benchmarks for the package") + + if len(args) > 0 { + return fmt.Errorf("unsupported benchmark type: %s", args[0]) + } + + return cobraext.ComposeCommandActions(cmd, args, benchTypeCmdActions...) 
+ }} + + cmd.PersistentFlags().BoolP(cobraext.FailOnMissingFlagName, "m", false, cobraext.FailOnMissingFlagDescription) + cmd.PersistentFlags().StringP(cobraext.ReportFormatFlagName, "", string(formats.ReportFormatHuman), cobraext.ReportFormatFlagDescription) + cmd.PersistentFlags().StringP(cobraext.ReportOutputFlagName, "", string(outputs.ReportOutputSTDOUT), cobraext.ReportOutputFlagDescription) + cmd.PersistentFlags().BoolP(cobraext.BenchWithTestSamplesFlagName, "", true, cobraext.BenchWithTestSamplesFlagDescription) + cmd.PersistentFlags().IntP(cobraext.BenchNumTopProcsFlagName, "", 10, cobraext.BenchNumTopProcsFlagDescription) + cmd.PersistentFlags().StringSliceP(cobraext.DataStreamsFlagName, "", nil, cobraext.DataStreamsFlagDescription) + + for benchType, runner := range benchrunner.BenchRunners() { + action := benchTypeCommandActionFactory(runner) + benchTypeCmdActions = append(benchTypeCmdActions, action) + + benchTypeCmd := &cobra.Command{ + Use: string(benchType), + Short: fmt.Sprintf("Run %s benchmarks", runner.String()), + Long: fmt.Sprintf("Run %s benchmarks for the package.", runner.String()), + RunE: action, + } + + benchTypeCmd.Flags().StringSliceP(cobraext.DataStreamsFlagName, "d", nil, cobraext.DataStreamsFlagDescription) + + cmd.AddCommand(benchTypeCmd) + } + + return cobraext.NewCommand(cmd, cobraext.ContextPackage) +} + +func benchTypeCommandActionFactory(runner benchrunner.BenchRunner) cobraext.CommandAction { + benchType := runner.Type() + return func(cmd *cobra.Command, args []string) error { + cmd.Printf("Run %s benchmarks for the package\n", benchType) + + failOnMissing, err := cmd.Flags().GetBool(cobraext.FailOnMissingFlagName) + if err != nil { + return cobraext.FlagParsingError(err, cobraext.FailOnMissingFlagName) + } + + reportFormat, err := cmd.Flags().GetString(cobraext.ReportFormatFlagName) + if err != nil { + return cobraext.FlagParsingError(err, cobraext.ReportFormatFlagName) + } + + reportOutput, err := cmd.Flags().GetString(cobraext.ReportOutputFlagName) + if err != nil { + return cobraext.FlagParsingError(err, cobraext.ReportOutputFlagName) + } + + useTestSamples, err := cmd.Flags().GetBool(cobraext.BenchWithTestSamplesFlagName) + if err != nil { + return cobraext.FlagParsingError(err, cobraext.BenchWithTestSamplesFlagName) + } + + numTopProcs, err := cmd.Flags().GetInt(cobraext.BenchNumTopProcsFlagName) + if err != nil { + return cobraext.FlagParsingError(err, cobraext.BenchNumTopProcsFlagName) + } + + packageRootPath, found, err := packages.FindPackageRoot() + if !found { + return errors.New("package root not found") + } + if err != nil { + return errors.Wrap(err, "locating package root failed") + } + + dataStreams, err := cmd.Flags().GetStringSlice(cobraext.DataStreamsFlagName) + if err != nil { + return cobraext.FlagParsingError(err, cobraext.DataStreamsFlagName) + } + + if len(dataStreams) > 0 { + common.TrimStringSlice(dataStreams) + + if err := validateDataStreamsFlag(packageRootPath, dataStreams); err != nil { + return cobraext.FlagParsingError(err, cobraext.DataStreamsFlagName) + } + } + + signal.Enable() + + benchFolders, err := benchrunner.FindBenchmarkFolders(packageRootPath, dataStreams, benchType) + if err != nil { + return errors.Wrap(err, "unable to determine benchmark folder paths") + } + + if useTestSamples { + testFolders, err := testrunner.FindTestFolders(packageRootPath, dataStreams, testrunner.TestType(benchType)) + if err != nil { + return errors.Wrap(err, "unable to determine test folder paths") + } + benchFolders = 
append(benchFolders, testFolders...) + } + + if failOnMissing && len(benchFolders) == 0 { + if len(dataStreams) > 0 { + return fmt.Errorf("no %s benchmarks found for %s data stream(s)", benchType, strings.Join(dataStreams, ",")) + } + return fmt.Errorf("no %s benchmarks found", benchType) + } + + esClient, err := elasticsearch.Client() + if err != nil { + return errors.Wrap(err, "can't create Elasticsearch client") + } + + var results []*benchrunner.Result + for _, folder := range benchFolders { + r, err := benchrunner.Run(benchType, benchrunner.BenchOptions{ + Folder: folder, + PackageRootPath: packageRootPath, + API: esClient.API, + NumTopProcs: numTopProcs, + }) + + if err != nil { + return errors.Wrapf(err, "error running package %s benchmarks", benchType) + } + + results = append(results, r) + } + + format := benchrunner.BenchReportFormat(reportFormat) + benchReports, err := benchrunner.FormatReport(format, results) + if err != nil { + return errors.Wrap(err, "error formatting benchmark report") + } + + m, err := packages.ReadPackageManifestFromPackageRoot(packageRootPath) + if err != nil { + return errors.Wrapf(err, "reading package manifest failed (path: %s)", packageRootPath) + } + + for idx, report := range benchReports { + if err := benchrunner.WriteReport(fmt.Sprintf("%s-%d", m.Name, idx+1), benchrunner.BenchReportOutput(reportOutput), report, format); err != nil { + return errors.Wrap(err, "error writing benchmark report") + } + } + + // Check if there is any error or failure reported + for _, r := range results { + if r.ErrorMsg != "" { + return fmt.Errorf("one or more benchmarks failed: %v", r.ErrorMsg) + } + } + return nil + } +} diff --git a/cmd/root.go b/cmd/root.go index 0997b70eec..9cb7ba3902 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -15,6 +15,7 @@ import ( ) var commands = []*cobraext.Command{ + setupBenchmarkCommand(), setupBuildCommand(), setupChangelogCommand(), setupCheckCommand(), diff --git a/docs/howto/pipeline_benchmarking.md b/docs/howto/pipeline_benchmarking.md new file mode 100644 index 0000000000..362fa2f084 --- /dev/null +++ b/docs/howto/pipeline_benchmarking.md @@ -0,0 +1,182 @@ +# HOWTO: Writing pipeline benchmarks for a package + +## Introduction + +Elastic Packages are comprised of data streams. A pipeline benchmark exercises Elasticsearch Ingest Node pipelines defined for a package's data stream. + +## Conceptual process + +Conceptually, running a pipeline benchmark involves the following steps: + +1. Deploy the Elasticsearch instance (part of the Elastic Stack). This step takes time so it should typically be done once as a pre-requisite to running pipeline benchmarks on multiple data streams. +1. Upload ingest pipelines to be benchmarked. +1. Use [Simulate API](https://www.elastic.co/guide/en/elasticsearch/reference/master/simulate-pipeline-api.html) to process logs/metrics with the ingest pipeline. +1. Gather statistics of the involved processors and show them in a report. + +## Limitations + +At the moment pipeline benchmarks have limitations. The main ones are: +* As you're only benchmarking the ingest pipeline, you can prepare mocked documents with imaginary fields, different from ones collected in Beats. Also the other way round, you can skip most of the processors and as examples use tiny documents with minimal set of fields just to run the processing simulation. +* There might be integrations which transform data mostly using Beats processors instead of ingest pipelines. In such cases ingest pipeline benchmarks are rather plain. 
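+
+As an illustration of the first limitation above, a mocked sample can stay tiny: the sketch below (a hypothetical `minimal-event.json`, not taken from any real package) uses the input events format described in the next section and carries only a `message` field, which is enough to drive the processing simulation:
+
+```json
+{
+    "events": [
+        {
+            "message": "127.0.0.1 - - [07/Dec/2016:11:04:37 +0100] \"GET /test1 HTTP/1.1\" 404 571"
+        }
+    ]
+}
+```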
+
+## Defining a pipeline benchmark
+
+Packages have a specific folder structure (only relevant parts shown).
+
+```
+<package root>/
+  data_stream/
+    <data stream>/
+      manifest.yml
+  manifest.yml
+```
+
+To define a pipeline benchmark, we must define configuration at each data stream's level:
+
+```
+<package root>/
+  data_stream/
+    <data stream>/
+      _dev/
+        benchmark/
+          pipeline/
+            (benchmark sample definitions, both raw files and input events, and optional configuration)
+      manifest.yml
+  manifest.yml
+```
+
+### Benchmark definitions
+
+There are two types of benchmark sample definitions: **raw files** and **input events**.
+
+#### Raw files
+
+The raw files simplify preparing samples using real application `.log` files. A sample log file (e.g. `access-sample.log`) may look like the following one for Nginx:
+
+```
+127.0.0.1 - - [07/Dec/2016:11:04:37 +0100] "GET /test1 HTTP/1.1" 404 571 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.98 Safari/537.36"
+127.0.0.1 - - [07/Dec/2016:11:04:58 +0100] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:49.0) Gecko/20100101 Firefox/49.0"
+127.0.0.1 - - [07/Dec/2016:11:04:59 +0100] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:49.0) Gecko/20100101 Firefox/49.0"
+```
+
+#### Input events
+
+The input events contain mocked JSON events that are ready to be passed to the ingest pipeline as-is. Such events can be helpful in situations in which an input event can't be serialized to a standard log file, e.g. Redis input. A sample file with input events (e.g. `access-event.json`) looks like the following:
+
+```json
+{
+    "events": [
+        {
+            "@timestamp": "2016-10-25T12:49:34.000Z",
+            "message": "127.0.0.1 - - [07/Dec/2016:11:04:37 +0100] \"GET /test1 HTTP/1.1\" 404 571 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.98 Safari/537.36\"\n"
+        },
+        {
+            "@timestamp": "2016-10-25T12:49:34.000Z",
+            "message": "127.0.0.1 - - [07/Dec/2016:11:05:07 +0100] \"GET /taga HTTP/1.1\" 404 169 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:49.0) Gecko/20100101 Firefox/49.0\"\n"
+        }
+    ]
+}
+```
+
+#### Benchmark configuration
+
+The benchmark execution can be customized to some extent using an optional configuration stored as a YAML file with the name `config.yml`:
+
+```yml
+num_docs: 1000
+```
+
+The `num_docs` option tells the benchmark how many events should be sent with the simulation request. If not enough samples are provided, the events will be reused to generate a sufficient number of them. If not present, it defaults to `1000`.
+
+## Running a pipeline benchmark
+
+Once the configurations are defined as described in the previous section, you are ready to run pipeline benchmarks for a package's data streams.
+
+First, you must deploy the Elasticsearch instance. This corresponds to step 1 as described in the [_Conceptual process_](#Conceptual-process) section.
+
+```
+elastic-package stack up -d --services=elasticsearch
+```
+
+For a complete listing of options available for this command, run `elastic-package stack up -h` or `elastic-package help stack up`.
+
+Next, you must set the environment variables needed for further `elastic-package` commands.
+
+```
+$(elastic-package stack shellinit)
+```
+
+Next, you must invoke the pipeline benchmark runner. This corresponds to steps 2 through 4 as described in the [_Conceptual process_](#Conceptual-process) section.
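+
+The benchmark runner also accepts a `--num-top-procs` flag (10 by default) that controls how many processors are listed in the per-processor tables of the report shown below; for instance:
+
+```
+elastic-package benchmark pipeline --num-top-procs 5
+```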
+ +If you want to run pipeline benchmarks for **all data streams** in a package, navigate to the package's root folder (or any sub-folder under it) and run the following command. + +``` +elastic-package benchmark pipeline + +--- Benchmark results for package: windows-1 - START --- +╭───────────────────────────────╮ +│ parameters │ +├──────────────────┬────────────┤ +│ package │ windows │ +│ data_stream │ powershell │ +│ source doc count │ 6 │ +│ doc count │ 1000 │ +╰──────────────────┴────────────╯ +╭───────────────────────╮ +│ ingest performance │ +├─────────────┬─────────┤ +│ ingest time │ 0.23s │ +│ eps │ 4291.85 │ +╰─────────────┴─────────╯ +╭───────────────────────────────────╮ +│ processors by total time │ +├──────────────────────────┬────────┤ +│ kv @ default.yml:4 │ 12.02% │ +│ script @ default.yml:240 │ 7.73% │ +│ kv @ default.yml:13 │ 6.87% │ +│ set @ default.yml:44 │ 6.01% │ +│ script @ default.yml:318 │ 5.58% │ +│ date @ default.yml:34 │ 3.43% │ +│ script @ default.yml:397 │ 2.15% │ +│ remove @ default.yml:425 │ 2.15% │ +│ set @ default.yml:102 │ 1.72% │ +│ set @ default.yml:108 │ 1.29% │ +╰──────────────────────────┴────────╯ +╭─────────────────────────────────────╮ +│ processors by average time per doc │ +├──────────────────────────┬──────────┤ +│ kv @ default.yml:4 │ 56.112µs │ +│ script @ default.yml:240 │ 36.072µs │ +│ kv @ default.yml:13 │ 31.936µs │ +│ script @ default.yml:397 │ 29.94µs │ +│ set @ default.yml:44 │ 14µs │ +│ script @ default.yml:318 │ 13µs │ +│ date @ default.yml:34 │ 11.976µs │ +│ set @ default.yml:102 │ 8.016µs │ +│ append @ default.yml:114 │ 6.012µs │ +│ set @ default.yml:108 │ 6.012µs │ +╰──────────────────────────┴──────────╯ + +--- Benchmark results for package: windows-1 - END --- +Done + +``` + +If you want to run pipeline benchmarks for **specific data streams** in a package, navigate to the package's root folder (or any sub-folder under it) and run the following command. + +``` +elastic-package benchmark pipeline --data-streams [,,...] +``` + +By default, if the benchmark configuration is not present, it will run using any samples found in the data stream. You can disable this behavior disabling the `--use-test-samples` flag. + +``` +elastic-package benchmark pipeline -v --use-test-samples=false +``` + +Finally, when you are done running all benchmarks, bring down the Elastic Stack. This corresponds to step 4 as described in the [_Conceptual process_](#Conceptual-process) section. + +``` +elastic-package stack down +``` diff --git a/internal/benchrunner/benchmark.go b/internal/benchrunner/benchmark.go new file mode 100644 index 0000000000..7720bc419f --- /dev/null +++ b/internal/benchrunner/benchmark.go @@ -0,0 +1,76 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package benchrunner + +import ( + "fmt" +) + +// BenchmarkResult represents the result of a benchmark run. +// This is modeled after the xUnit benchmark schema. +// See https://github.com/Autodesk/jenkinsci-benchmark-plugin/blob/master/doc/EXAMPLE_SCHEMA_XML_DEFAULT.md +type BenchmarkResult struct { + // XMLName is a zero-length field used as an annotation for XML marshaling. + XMLName struct{} `xml:"group"` + + // Name of this benchmark run. + Name string `xml:"name,attr"` + + // Description of the benchmark run. 
+ Description string `xml:"description,omitempty"` + + // Parameters used for this benchmark. + Parameters []BenchmarkValue `xml:"parameter"` + + // Tests holds the results for the benchmark. + Tests []BenchmarkTest `xml:"test"` +} + +// BenchmarkTest models a particular test performed during a benchmark. +type BenchmarkTest struct { + // Name of this test. + Name string `xml:"name,attr"` + // Detailed benchmark tests will be printed to the output but not + // included in xUnit reports. + Detailed bool `xml:"-"` + // Description of this test. + Description string `xml:"description,omitempty"` + // Parameters for this test. + Parameters []BenchmarkValue `xml:"parameter"` + // Results of the test. + Results []BenchmarkValue `xml:"result"` +} + +// BenchmarkValue represents a value (result or parameter) +// with an optional associated unit. +type BenchmarkValue struct { + // Name of the value. + Name string `xml:"name,attr"` + + // Description of the value. + Description string `xml:"description,omitempty"` + + // Unit used for this value. + Unit string `xml:"unit,omitempty"` + + // Value is of any type, usually string or numeric. + Value interface{} `xml:"value,omitempty"` +} + +// String returns a BenchmarkValue's value nicely-formatted. +func (p BenchmarkValue) String() (r string) { + if str, ok := p.Value.(fmt.Stringer); ok { + return str.String() + } + if float, ok := p.Value.(float64); ok { + r = fmt.Sprintf("%.02f", float) + } else { + r = fmt.Sprintf("%v", p.Value) + } + if p.Unit != "" { + r += p.Unit + } + return r +} diff --git a/internal/benchrunner/benchrunner.go b/internal/benchrunner/benchrunner.go new file mode 100644 index 0000000000..1e0ef86e8c --- /dev/null +++ b/internal/benchrunner/benchrunner.go @@ -0,0 +1,205 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package benchrunner + +import ( + "fmt" + "path/filepath" + "sort" + "strings" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/elastic-package/internal/elasticsearch" + "github.com/elastic/elastic-package/internal/testrunner" +) + +// BenchType represents the various supported benchmark types +type BenchType string + +// BenchOptions contains benchmark runner options. +type BenchOptions struct { + Folder testrunner.TestFolder + PackageRootPath string + API *elasticsearch.API + NumTopProcs int +} + +// BenchRunner is the interface all benchmark runners must implement. +type BenchRunner interface { + // Type returns the benchmark runner's type. + Type() BenchType + + // String returns the human-friendly name of the benchmark runner. + String() string + + // Run executes the benchmark runner. + Run(BenchOptions) (*Result, error) + + // TearDown cleans up any benchmark runner resources. It must be called + // after the benchmark runner has finished executing. + TearDown() error +} + +var runners = map[BenchType]BenchRunner{} + +// Result contains a single benchmark's results +type Result struct { + // Package to which this benchmark result belongs. + Package string + + // BenchType indicates the type of benchmark. + BenchType BenchType + + // Data stream to which this benchmark result belongs. + DataStream string + + // Time elapsed from running a benchmark case to arriving at its result. + TimeElapsed time.Duration + + // If there was an error while running the benchmark case, description + // of the error. 
An error is when the benchmark cannot complete execution due + // to an unexpected runtime error in the benchmark execution. + ErrorMsg string + + // Benchmark results. + Benchmark *BenchmarkResult +} + +// ResultComposer wraps a Result and provides convenience methods for +// manipulating this Result. +type ResultComposer struct { + Result + StartTime time.Time +} + +// NewResultComposer returns a new ResultComposer with the StartTime +// initialized to now. +func NewResultComposer(tr Result) *ResultComposer { + return &ResultComposer{ + Result: tr, + StartTime: time.Now(), + } +} + +// WithError sets an error on the benchmark result wrapped by ResultComposer. +func (rc *ResultComposer) WithError(err error) ([]Result, error) { + rc.TimeElapsed = time.Since(rc.StartTime) + if err == nil { + return []Result{rc.Result}, nil + } + + rc.ErrorMsg += err.Error() + return []Result{rc.Result}, err +} + +// WithSuccess marks the benchmark result wrapped by ResultComposer as successful. +func (rc *ResultComposer) WithSuccess() ([]Result, error) { + return rc.WithError(nil) +} + +// FindBenchmarkFolders finds benchmark folders for the given package and, optionally, benchmark type and data streams +func FindBenchmarkFolders(packageRootPath string, dataStreams []string, benchType BenchType) ([]testrunner.TestFolder, error) { + // Expected folder structure: + // / + // data_stream/ + // / + // _dev/ + // benchmark/ + // / + + benchTypeGlob := "*" + if benchType != "" { + benchTypeGlob = string(benchType) + } + + var paths []string + if len(dataStreams) > 0 { + sort.Strings(dataStreams) + for _, dataStream := range dataStreams { + p, err := findBenchFolderPaths(packageRootPath, dataStream, benchTypeGlob) + if err != nil { + return nil, err + } + + paths = append(paths, p...) + } + } else { + p, err := findBenchFolderPaths(packageRootPath, "*", benchTypeGlob) + if err != nil { + return nil, err + } + + paths = p + } + + sort.Strings(dataStreams) + for _, dataStream := range dataStreams { + p, err := findBenchFolderPaths(packageRootPath, dataStream, benchTypeGlob) + if err != nil { + return nil, err + } + + paths = append(paths, p...) + } + + folders := make([]testrunner.TestFolder, len(paths)) + _, pkg := filepath.Split(packageRootPath) + for idx, p := range paths { + relP := strings.TrimPrefix(p, packageRootPath) + parts := strings.Split(relP, string(filepath.Separator)) + dataStream := parts[2] + + folder := testrunner.TestFolder{ + Path: p, + Package: pkg, + DataStream: dataStream, + } + + folders[idx] = folder + } + + return folders, nil +} + +// RegisterRunner method registers the benchmark runner. +func RegisterRunner(runner BenchRunner) { + runners[runner.Type()] = runner +} + +// Run method delegates execution to the registered benchmark runner, based on the benchmark type. +func Run(benchType BenchType, options BenchOptions) (*Result, error) { + runner, defined := runners[benchType] + if !defined { + return nil, fmt.Errorf("unregistered runner benchmark: %s", benchType) + } + + result, err := runner.Run(options) + tdErr := runner.TearDown() + if err != nil { + return nil, errors.Wrap(err, "could not complete benchmark run") + } + if tdErr != nil { + return result, errors.Wrap(err, "could not teardown benchmark runner") + } + return result, nil +} + +// BenchRunners returns registered benchmark runners. 
+func BenchRunners() map[BenchType]BenchRunner { + return runners +} + +// findBenchFoldersPaths can only be called for benchmark runners that require benchmarks to be defined +// at the data stream level. +func findBenchFolderPaths(packageRootPath, dataStreamGlob, benchTypeGlob string) ([]string, error) { + benchFoldersGlob := filepath.Join(packageRootPath, "data_stream", dataStreamGlob, "_dev", "benchmark", benchTypeGlob) + paths, err := filepath.Glob(benchFoldersGlob) + if err != nil { + return nil, errors.Wrap(err, "error finding benchmark folders") + } + return paths, err +} diff --git a/internal/benchrunner/report_format.go b/internal/benchrunner/report_format.go new file mode 100644 index 0000000000..816dcaa9ff --- /dev/null +++ b/internal/benchrunner/report_format.go @@ -0,0 +1,30 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package benchrunner + +import "fmt" + +// BenchReportFormat represents a benchmark report format +type BenchReportFormat string + +// ReportFormatFunc defines the report formatter function. +type ReportFormatFunc func(results []*Result) ([]string, error) + +var reportFormatters = map[BenchReportFormat]ReportFormatFunc{} + +// RegisterReporterFormat registers a benchmark report formatter. +func RegisterReporterFormat(name BenchReportFormat, formatFunc ReportFormatFunc) { + reportFormatters[name] = formatFunc +} + +// FormatReport delegates formatting of benchmark results to the registered benchmark report formatter. +func FormatReport(name BenchReportFormat, results []*Result) (benchmarkReports []string, err error) { + reportFunc, defined := reportFormatters[name] + if !defined { + return nil, fmt.Errorf("unregistered benchmark report format: %s", name) + } + + return reportFunc(results) +} diff --git a/internal/benchrunner/report_output.go b/internal/benchrunner/report_output.go new file mode 100644 index 0000000000..3f6db9b95f --- /dev/null +++ b/internal/benchrunner/report_output.go @@ -0,0 +1,31 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package benchrunner + +import ( + "fmt" +) + +// BenchReportOutput represents an output for a benchmark report +type BenchReportOutput string + +// ReportOutputFunc defines the report writer function. +type ReportOutputFunc func(pkg, report string, format BenchReportFormat) error + +var reportOutputs = map[BenchReportOutput]ReportOutputFunc{} + +// RegisterReporterOutput registers a benchmark report output. 
+func RegisterReporterOutput(name BenchReportOutput, outputFunc ReportOutputFunc) { + reportOutputs[name] = outputFunc +} + +// WriteReport delegates writing of benchmark results to the registered benchmark report output +func WriteReport(pkg string, name BenchReportOutput, report string, format BenchReportFormat) error { + outputFunc, defined := reportOutputs[name] + if !defined { + return fmt.Errorf("unregistered benchmark report output: %s", name) + } + return outputFunc(pkg, report, format) +} diff --git a/internal/benchrunner/reporters/formats/human.go b/internal/benchrunner/reporters/formats/human.go new file mode 100644 index 0000000000..272274bf99 --- /dev/null +++ b/internal/benchrunner/reporters/formats/human.go @@ -0,0 +1,73 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package formats + +import ( + "strings" + + "github.com/jedib0t/go-pretty/table" + "github.com/jedib0t/go-pretty/text" + + "github.com/elastic/elastic-package/internal/benchrunner" +) + +func init() { + benchrunner.RegisterReporterFormat(ReportFormatHuman, reportHumanFormat) +} + +const ( + // ReportFormatHuman reports benchmark results in a human-readable format + ReportFormatHuman benchrunner.BenchReportFormat = "human" +) + +func reportHumanFormat(results []*benchrunner.Result) ([]string, error) { + if len(results) == 0 { + return nil, nil + } + + var benchmarks []benchrunner.BenchmarkResult + for _, r := range results { + if r.Benchmark != nil { + benchmarks = append(benchmarks, *r.Benchmark) + } + } + + benchFormatted, err := reportHumanFormatBenchmark(benchmarks) + if err != nil { + return nil, err + } + return benchFormatted, nil +} + +func reportHumanFormatBenchmark(benchmarks []benchrunner.BenchmarkResult) ([]string, error) { + var textReports []string + for _, b := range benchmarks { + var report strings.Builder + if len(b.Parameters) > 0 { + report.WriteString(renderBenchmarkTable("parameters", b.Parameters) + "\n") + } + for _, t := range b.Tests { + report.WriteString(renderBenchmarkTable(t.Name, t.Results) + "\n") + } + textReports = append(textReports, report.String()) + } + return textReports, nil +} + +func renderBenchmarkTable(title string, values []benchrunner.BenchmarkValue) string { + t := table.NewWriter() + t.SetStyle(table.StyleRounded) + t.SetTitle(title) + t.SetColumnConfigs([]table.ColumnConfig{ + { + Number: 2, + Align: text.AlignRight, + }, + }) + for _, r := range values { + t.AppendRow(table.Row{r.Name, r.String()}) + } + return t.Render() +} diff --git a/internal/benchrunner/reporters/formats/xunit.go b/internal/benchrunner/reporters/formats/xunit.go new file mode 100644 index 0000000000..47cc2a9ef1 --- /dev/null +++ b/internal/benchrunner/reporters/formats/xunit.go @@ -0,0 +1,58 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package formats + +import ( + "encoding/xml" + + "github.com/pkg/errors" + + "github.com/elastic/elastic-package/internal/benchrunner" +) + +func init() { + benchrunner.RegisterReporterFormat(ReportFormatXUnit, reportXUnitFormat) +} + +const ( + // ReportFormatXUnit reports benchmark results in the xUnit format + ReportFormatXUnit benchrunner.BenchReportFormat = "xUnit" +) + +func reportXUnitFormat(results []*benchrunner.Result) ([]string, error) { + var benchmarks []*benchrunner.BenchmarkResult + for _, r := range results { + if r.Benchmark != nil { + benchmarks = append(benchmarks, r.Benchmark) + } + } + + benchFormatted, err := reportXUnitFormatBenchmark(benchmarks) + if err != nil { + return nil, err + } + return benchFormatted, nil +} + +func reportXUnitFormatBenchmark(benchmarks []*benchrunner.BenchmarkResult) ([]string, error) { + var reports []string + for _, b := range benchmarks { + // Filter out detailed benchmarks. These add too much information for the + // aggregated nature of xUnit reports, creating a lot of noise in Jenkins. + var benchmarks []benchrunner.BenchmarkTest + for _, t := range b.Tests { + if !t.Detailed { + benchmarks = append(benchmarks, t) + } + } + b.Tests = benchmarks + out, err := xml.MarshalIndent(b, "", " ") + if err != nil { + return nil, errors.Wrap(err, "unable to format benchmark results as xUnit") + } + reports = append(reports, xml.Header+string(out)) + } + return reports, nil +} diff --git a/internal/benchrunner/reporters/outputs/file.go b/internal/benchrunner/reporters/outputs/file.go new file mode 100644 index 0000000000..6e8656190f --- /dev/null +++ b/internal/benchrunner/reporters/outputs/file.go @@ -0,0 +1,65 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package outputs + +import ( + "fmt" + "os" + "path/filepath" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/elastic-package/internal/benchrunner" + "github.com/elastic/elastic-package/internal/benchrunner/reporters/formats" + "github.com/elastic/elastic-package/internal/builder" +) + +func init() { + benchrunner.RegisterReporterOutput(ReportOutputFile, reportToFile) +} + +const ( + // ReportOutputFile reports benchmark results to files in a folder + ReportOutputFile benchrunner.BenchReportOutput = "file" +) + +func reportToFile(pkg, report string, format benchrunner.BenchReportFormat) error { + dest, err := reportsDir() + if err != nil { + return errors.Wrap(err, "could not determine benchmark reports folder") + } + + // Create benchmark reports folder if it doesn't exist + _, err = os.Stat(dest) + if err != nil && errors.Is(err, os.ErrNotExist) { + if err := os.MkdirAll(dest, 0755); err != nil { + return errors.Wrapf(err, "could not create benchmark reports folder") + } + } + + ext := "txt" + if format == formats.ReportFormatXUnit { + ext = "xml" + } + fileName := fmt.Sprintf("%s_%d.%s", pkg, time.Now().UnixNano(), ext) + filePath := filepath.Join(dest, fileName) + + if err := os.WriteFile(filePath, []byte(report+"\n"), 0644); err != nil { + return errors.Wrapf(err, "could not write benchmark report file") + } + + return nil +} + +// reportsDir returns the location of the directory to store reports. 
+func reportsDir() (string, error) { + buildDir, err := builder.BuildDirectory() + if err != nil { + return "", errors.Wrap(err, "locating build directory failed") + } + const folder = "benchmark-results" + return filepath.Join(buildDir, folder), nil +} diff --git a/internal/benchrunner/reporters/outputs/stdout.go b/internal/benchrunner/reporters/outputs/stdout.go new file mode 100644 index 0000000000..b0fb25bf72 --- /dev/null +++ b/internal/benchrunner/reporters/outputs/stdout.go @@ -0,0 +1,28 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package outputs + +import ( + "fmt" + + "github.com/elastic/elastic-package/internal/benchrunner" +) + +func init() { + benchrunner.RegisterReporterOutput(ReportOutputSTDOUT, reportToSTDOUT) +} + +const ( + // ReportOutputSTDOUT reports benchmark results to STDOUT + ReportOutputSTDOUT benchrunner.BenchReportOutput = "stdout" +) + +func reportToSTDOUT(pkg, report string, _ benchrunner.BenchReportFormat) error { + fmt.Printf("--- Benchmark results for package: %s - START ---\n", pkg) + fmt.Println(report) + fmt.Printf("--- Benchmark results for package: %s - END ---\n", pkg) + fmt.Println("Done") + return nil +} diff --git a/internal/benchrunner/runners/pipeline/benchmark.go b/internal/benchrunner/runners/pipeline/benchmark.go new file mode 100644 index 0000000000..752fa528c8 --- /dev/null +++ b/internal/benchrunner/runners/pipeline/benchmark.go @@ -0,0 +1,287 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package pipeline + +import ( + "encoding/json" + "fmt" + "sort" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/elastic-package/internal/benchrunner" + "github.com/elastic/elastic-package/internal/elasticsearch/ingest" +) + +func (r *runner) benchmarkPipeline(b *benchmark, entryPipeline string) (*benchrunner.BenchmarkResult, error) { + // Run benchmark + bench, err := r.benchmarkIngest(b, entryPipeline) + if err != nil { + return nil, errors.Wrap(err, "failed running benchmark") + } + + // Extract performance measurements + processorKey := func(pipeline ingest.Pipeline, processor ingest.Processor) string { + // Don't want to use pipeline processors time in benchmark, as they + // aggregate the time of all the processors in their pipeline. 
+ if processor.Type == "pipeline" { + return "" + } + return fmt.Sprintf("%s @ %s:%d", processor.Type, pipeline.Filename(), processor.FirstLine) + } + byAbsoluteTime := func(record ingest.StatsRecord) int64 { + return record.TimeInMillis * int64(time.Millisecond) + } + byRelativeTime := func(record ingest.StatsRecord) int64 { + if record.Count == 0 { + return 0 + } + return record.TimeInMillis * int64(time.Millisecond) / record.Count + } + asPercentageOfTotalTime := func(perf processorPerformance) benchrunner.BenchmarkValue { + return benchrunner.BenchmarkValue{ + Name: perf.key, + Description: perf.key, + Unit: "%", + Value: time.Duration(perf.value).Seconds() * 100 / bench.elapsed.Seconds(), + } + } + asDuration := func(perf processorPerformance) benchrunner.BenchmarkValue { + return benchrunner.BenchmarkValue{ + Name: perf.key, + Description: perf.key, + Value: time.Duration(perf.value), + } + } + nonZero := func(p processorPerformance) bool { + // This removes pipeline processors (marked with key="") and zero values. + return p.key != "" && p.value != 0 + } + + topAbsProc, err := bench. + aggregate(processorKey, byAbsoluteTime). + filter(nonZero). + sort(descending). + top(r.options.NumTopProcs). + collect(asPercentageOfTotalTime) + if err != nil { + return nil, err + } + + topRelProcs, err := bench. + aggregate(processorKey, byRelativeTime). + filter(nonZero). + sort(descending). + top(r.options.NumTopProcs). + collect(asDuration) + if err != nil { + return nil, err + } + + // Build result + result := &benchrunner.BenchmarkResult{ + Name: fmt.Sprintf("pipeline benchmark for %s/%s", r.options.Folder.Package, r.options.Folder.DataStream), + Parameters: []benchrunner.BenchmarkValue{ + { + Name: "package", + Value: r.options.Folder.Package, + }, + { + Name: "data_stream", + Value: r.options.Folder.DataStream, + }, + { + Name: "source doc count", + Value: len(b.events), + }, + { + Name: "doc count", + Value: bench.numDocs, + }, + }, + Tests: []benchrunner.BenchmarkTest{ + { + Name: "ingest performance", + Results: []benchrunner.BenchmarkValue{ + { + Name: "ingest time", + Description: "time elapsed in ingest processors", + Value: bench.elapsed.Seconds(), + Unit: "s", + }, + { + Name: "eps", + Description: "ingested events per second", + Value: float64(bench.numDocs) / bench.elapsed.Seconds(), + }, + }, + }, + { + Name: "processors by total time", + Detailed: true, + Description: fmt.Sprintf("top %d processors by time spent", r.options.NumTopProcs), + Results: topAbsProc, + }, + { + Name: "processors by average time per doc", + Detailed: true, + Description: fmt.Sprintf("top %d processors by average time per document", r.options.NumTopProcs), + Results: topRelProcs, + }, + }, + } + + return result, nil +} + +type ingestResult struct { + pipelines []ingest.Pipeline + stats ingest.PipelineStatsMap + elapsed time.Duration + numDocs int +} + +func (r *runner) benchmarkIngest(b *benchmark, entryPipeline string) (ingestResult, error) { + baseDocs := resizeDocs(b.events, b.config.NumDocs) + return r.runSingleBenchmark(entryPipeline, baseDocs) +} + +type processorPerformance struct { + key string + value int64 +} + +type aggregation struct { + result []processorPerformance + err error +} + +type ( + keyFn func(ingest.Pipeline, ingest.Processor) string + valueFn func(record ingest.StatsRecord) int64 + mapFn func(processorPerformance) benchrunner.BenchmarkValue + compareFn func(a, b processorPerformance) bool + filterFn func(processorPerformance) bool +) + +func (ir ingestResult) aggregate(key keyFn, 
value valueFn) (agg aggregation) { + pipelines := make(map[string]ingest.Pipeline, len(ir.pipelines)) + for _, p := range ir.pipelines { + pipelines[p.Name] = p + } + + for pipelineName, pipelineStats := range ir.stats { + pipeline, ok := pipelines[pipelineName] + if !ok { + return aggregation{err: fmt.Errorf("unexpected pipeline '%s'", pipelineName)} + } + processors, err := pipeline.Processors() + if err != nil { + return aggregation{err: err} + } + if nSrc, nStats := len(processors), len(pipelineStats.Processors); nSrc != nStats { + return aggregation{err: fmt.Errorf("pipeline '%s' processor count mismatch. source=%d stats=%d", pipelineName, nSrc, nStats)} + } + for procId, procStats := range pipelineStats.Processors { + agg.result = append(agg.result, processorPerformance{ + key: key(pipeline, processors[procId]), + value: value(procStats.Stats), + }) + } + } + return agg +} + +func (agg aggregation) sort(compare compareFn) aggregation { + if agg.err != nil { + return agg + } + sort.Slice(agg.result, func(i, j int) bool { + return compare(agg.result[i], agg.result[j]) + }) + return agg +} + +func ascending(a, b processorPerformance) bool { + return a.value < b.value +} + +func descending(a, b processorPerformance) bool { + return !ascending(a, b) +} + +func (agg aggregation) top(n int) aggregation { + if n < len(agg.result) { + agg.result = agg.result[:n] + } + return agg +} + +func (agg aggregation) filter(keep filterFn) aggregation { + if agg.err != nil { + return agg + } + o := 0 + for _, entry := range agg.result { + if keep(entry) { + agg.result[o] = entry + o++ + } + } + agg.result = agg.result[:o] + return agg +} + +func (agg aggregation) collect(fn mapFn) ([]benchrunner.BenchmarkValue, error) { + if agg.err != nil { + return nil, agg.err + } + r := make([]benchrunner.BenchmarkValue, len(agg.result)) + for idx := range r { + r[idx] = fn(agg.result[idx]) + } + return r, nil +} + +func (r *runner) runSingleBenchmark(entryPipeline string, docs []json.RawMessage) (ingestResult, error) { + if len(docs) == 0 { + return ingestResult{}, errors.New("no docs supplied for benchmark") + } + + if _, err := ingest.SimulatePipeline(r.options.API, entryPipeline, docs); err != nil { + return ingestResult{}, errors.Wrap(err, "simulate failed") + } + + stats, err := ingest.GetPipelineStats(r.options.API, r.pipelines) + if err != nil { + return ingestResult{}, errors.Wrap(err, "error fetching pipeline stats") + } + var took time.Duration + for _, pSt := range stats { + took += time.Millisecond * time.Duration(pSt.TimeInMillis) + } + return ingestResult{ + pipelines: r.pipelines, + stats: stats, + elapsed: took, + numDocs: len(docs), + }, nil +} + +func resizeDocs(inputDocs []json.RawMessage, want int) []json.RawMessage { + n := len(inputDocs) + if n == 0 { + return nil + } + if want == 0 { + want = 1 + } + result := make([]json.RawMessage, want) + for i := 0; i < want; i++ { + result[i] = inputDocs[i%n] + } + return result +} diff --git a/internal/benchrunner/runners/pipeline/config.go b/internal/benchrunner/runners/pipeline/config.go new file mode 100644 index 0000000000..e980b61b50 --- /dev/null +++ b/internal/benchrunner/runners/pipeline/config.go @@ -0,0 +1,44 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package pipeline + +import ( + "os" + "path/filepath" + + "github.com/elastic/go-ucfg/yaml" + "github.com/pkg/errors" +) + +const ( + configYAML = "config.yml" +) + +type config struct { + NumDocs int `config:"num_docs"` +} + +func defaultConfig() *config { + return &config{ + NumDocs: 1000, + } +} + +func readConfig(path string) (*config, error) { + configPath := filepath.Join(path, configYAML) + c := defaultConfig() + cfg, err := yaml.NewConfigWithFile(configPath) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, errors.Wrapf(err, "can't load common configuration: %s", configPath) + } + + if err == nil { + if err := cfg.Unpack(c); err != nil { + return nil, errors.Wrapf(err, "can't unpack benchmark configuration: %s", configPath) + } + } + + return c, nil +} diff --git a/internal/benchrunner/runners/pipeline/runner.go b/internal/benchrunner/runners/pipeline/runner.go new file mode 100644 index 0000000000..3feb28e224 --- /dev/null +++ b/internal/benchrunner/runners/pipeline/runner.go @@ -0,0 +1,162 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package pipeline + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/elastic-package/internal/benchrunner" + "github.com/elastic/elastic-package/internal/elasticsearch/ingest" + "github.com/elastic/elastic-package/internal/packages" +) + +const ( + // BenchType defining pipeline benchmarks. + BenchType benchrunner.BenchType = "pipeline" + + expectedTestResultSuffix = "-expected.json" + configTestSuffixYAML = "-config.yml" +) + +type runner struct { + options benchrunner.BenchOptions + pipelines []ingest.Pipeline +} + +// Type returns the type of benchmark that can be run by this benchmark runner. +func (r *runner) Type() benchrunner.BenchType { + return BenchType +} + +// String returns the human-friendly name of the benchmark runner. +func (r *runner) String() string { + return "pipeline" +} + +// Run runs the pipeline benchmarks defined under the given folder +func (r *runner) Run(options benchrunner.BenchOptions) (*benchrunner.Result, error) { + r.options = options + return r.run() +} + +// TearDown shuts down the pipeline benchmark runner. 
+func (r *runner) TearDown() error { + if err := ingest.UninstallPipelines(r.options.API, r.pipelines); err != nil { + return errors.Wrap(err, "uninstalling ingest pipelines failed") + } + return nil +} + +func (r *runner) run() (*benchrunner.Result, error) { + dataStreamPath, found, err := packages.FindDataStreamRootForPath(r.options.Folder.Path) + if err != nil { + return nil, errors.Wrap(err, "locating data_stream root failed") + } + if !found { + return nil, errors.New("data stream root not found") + } + + var entryPipeline string + entryPipeline, r.pipelines, err = ingest.InstallDataStreamPipelines(r.options.API, dataStreamPath) + if err != nil { + return nil, errors.Wrap(err, "installing ingest pipelines failed") + } + + start := time.Now() + result := &benchrunner.Result{ + BenchType: BenchType + " benchmark", + Package: r.options.Folder.Package, + DataStream: r.options.Folder.DataStream, + } + + b, err := r.loadBenchmark() + if err != nil { + return nil, errors.Wrap(err, "loading benchmark failed") + } + + if result.Benchmark, err = r.benchmarkPipeline(b, entryPipeline); err != nil { + result.ErrorMsg = err.Error() + } + + result.TimeElapsed = time.Since(start) + + return result, nil +} + +func (r *runner) listBenchmarkFiles() ([]string, error) { + fis, err := os.ReadDir(r.options.Folder.Path) + if err != nil { + return nil, errors.Wrapf(err, "reading pipeline benchmarks failed (path: %s)", r.options.Folder.Path) + } + + var files []string + for _, fi := range fis { + if fi.Name() == configYAML || + // since pipeline tests might be included we need to + // exclude the expected and config files for them + strings.HasSuffix(fi.Name(), expectedTestResultSuffix) || + strings.HasSuffix(fi.Name(), configTestSuffixYAML) { + continue + } + files = append(files, fi.Name()) + } + return files, nil +} + +func (r *runner) loadBenchmark() (*benchmark, error) { + benchFiles, err := r.listBenchmarkFiles() + if err != nil { + return nil, err + } + + var allEntries []json.RawMessage + for _, benchFile := range benchFiles { + benchPath := filepath.Join(r.options.Folder.Path, benchFile) + benchData, err := os.ReadFile(benchPath) + if err != nil { + return nil, errors.Wrapf(err, "reading input file failed (benchPath: %s)", benchPath) + } + + ext := filepath.Ext(benchFile) + var entries []json.RawMessage + switch ext { + case ".json": + entries, err = readBenchmarkEntriesForEvents(benchData) + if err != nil { + return nil, errors.Wrapf(err, "reading benchmark case entries for events failed (benchmarkPath: %s)", benchPath) + } + case ".log": + entries, err = readBenchmarkEntriesForRawInput(benchData) + if err != nil { + return nil, errors.Wrapf(err, "creating benchmark case entries for raw input failed (benchmarkPath: %s)", benchPath) + } + default: + return nil, fmt.Errorf("unsupported extension for benchmark case file (ext: %s)", ext) + } + allEntries = append(allEntries, entries...) 
+ } + + config, err := readConfig(r.options.Folder.Path) + if err != nil { + return nil, errors.Wrapf(err, "reading config for benchmark failed (benchPath: %s)", r.options.Folder.Path) + } + + tc, err := createBenchmark(allEntries, config) + if err != nil { + return nil, errors.Wrapf(err, "can't create benchmark case (benchmarkPath: %s)", r.options.Folder.Path) + } + return tc, nil +} + +func init() { + benchrunner.RegisterRunner(&runner{}) +} diff --git a/internal/benchrunner/runners/pipeline/runner_test.go b/internal/benchrunner/runners/pipeline/runner_test.go new file mode 100644 index 0000000000..9900bc87b6 --- /dev/null +++ b/internal/benchrunner/runners/pipeline/runner_test.go @@ -0,0 +1,117 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package pipeline + +import ( + "encoding/json" + "fmt" + "strings" + "testing" +) + +var jsonUnmarshalUsingNumberTests = []struct { + name string + msg string +}{ + { + name: "empty", + msg: "", // Will error "unexpected end of JSON input". + }, + { + name: "string", + msg: `"message"`, + }, + { + name: "array", + msg: "[1,2,3,4,5]", + }, + { + name: "object", + msg: `{"key":42}`, + }, + { + name: "object", + msg: `{"key":42}answer`, // Will error "invalid character 'a' after top-level value". + }, + // Test extra data whitespace parity with json.Unmarshal for error parity. + { + name: "object", + msg: `{"key":42} `, + }, + { + name: "object", + msg: `{"key":42}` + "\t", + }, + { + name: "object", + msg: `{"key":42}` + "\r", + }, + { + name: "object", + msg: `{"key":42}` + "\n", + }, + { + name: "0x1p52+1", + msg: fmt.Sprint(uint64(0x1p52) + 1), + }, + { + name: "0x1p53-1", + msg: fmt.Sprint(uint64(0x1p53) - 1), + }, + // The following three cases will fail if json.Unmarshal is used in place + // of jsonUnmarshalUsingNumber, as they are past the cutover. + { + name: "0x1p53+1", + msg: fmt.Sprint(uint64(0x1p53) + 1), + }, + { + name: "0x1p54+1", + msg: fmt.Sprint(uint64(0x1p54) + 1), + }, + { + name: "long", + msg: "9223372036854773807", + }, +} + +func TestJsonUnmarshalUsingNumberRoundTrip(t *testing.T) { + // This tests that jsonUnmarshalUsingNumber behaves the same + // way as json.Unmarshal with the exception that numbers are + // not unmarshaled through float64. This is important to avoid + // low-bit truncation of long numeric values that are greater + // than or equal to 0x1p53, the limit of bijective equivalence + // with 64 bit-integers. + + for _, test := range jsonUnmarshalUsingNumberTests { + t.Run(test.name, func(t *testing.T) { + var val interface{} + err := jsonUnmarshalUsingNumber([]byte(test.msg), &val) + + // Confirm that we get the same errors with jsonUnmarshalUsingNumber + // as are returned by json.Unmarshal. + jerr := json.Unmarshal([]byte(test.msg), new(interface{})) + if (err == nil) != (jerr == nil) { + t.Errorf("unexpected error: got:%#v want:%#v", err, jerr) + } + if err != nil { + return + } + + // Confirm that we round-trip the message correctly without + // alteration beyond trailing whitespace. + got, err := json.Marshal(val) + if err != nil { + t.Errorf("unexpected error: got:%#v want:%#v", err, jerr) + } + // Truncate trailing whitespace from the input since it won't + // be rendered in the output. This set of space characters is + // defined in encoding/json/scanner.go as func isSpace. 
+ want := strings.TrimRight(test.msg, " \t\r\n") + if string(got) != want { + t.Errorf("unexpected result: got:%v want:%v", val, want) + } + }) + } +} diff --git a/internal/benchrunner/runners/pipeline/test_case.go b/internal/benchrunner/runners/pipeline/test_case.go new file mode 100644 index 0000000000..bb8117568c --- /dev/null +++ b/internal/benchrunner/runners/pipeline/test_case.go @@ -0,0 +1,120 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package pipeline + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io" + "strings" + + "github.com/elastic/elastic-package/internal/common" + + "github.com/pkg/errors" +) + +type benchmark struct { + events []json.RawMessage + config *config +} + +type benchmarkDefinition struct { + Events []json.RawMessage `json:"events"` +} + +func readBenchmarkEntriesForEvents(inputData []byte) ([]json.RawMessage, error) { + var tcd benchmarkDefinition + err := jsonUnmarshalUsingNumber(inputData, &tcd) + if err != nil { + return nil, errors.Wrap(err, "unmarshalling input data failed") + } + return tcd.Events, nil +} + +func readBenchmarkEntriesForRawInput(inputData []byte) ([]json.RawMessage, error) { + entries, err := readRawInputEntries(inputData) + if err != nil { + return nil, errors.Wrap(err, "reading raw input entries failed") + } + + var events []json.RawMessage + for _, entry := range entries { + event := map[string]interface{}{} + event["message"] = entry + + m, err := json.Marshal(&event) + if err != nil { + return nil, errors.Wrap(err, "marshalling mocked event failed") + } + events = append(events, m) + } + return events, nil +} + +func createBenchmark(entries []json.RawMessage, config *config) (*benchmark, error) { + var events []json.RawMessage + for _, entry := range entries { + var m common.MapStr + err := jsonUnmarshalUsingNumber(entry, &m) + if err != nil { + return nil, errors.Wrap(err, "can't unmarshal benchmark entry") + } + + event, err := json.Marshal(&m) + if err != nil { + return nil, errors.Wrap(err, "marshalling event failed") + } + events = append(events, event) + } + return &benchmark{ + events: events, + config: config, + }, nil +} + +func readRawInputEntries(inputData []byte) ([]string, error) { + var inputDataEntries []string + + var builder strings.Builder + scanner := bufio.NewScanner(bytes.NewReader(inputData)) + for scanner.Scan() { + line := scanner.Text() + inputDataEntries = append(inputDataEntries, line) + } + err := scanner.Err() + if err != nil { + return nil, errors.Wrap(err, "reading raw input benchmark file failed") + } + + lastEntry := builder.String() + if len(lastEntry) > 0 { + inputDataEntries = append(inputDataEntries, lastEntry) + } + return inputDataEntries, nil +} + +// jsonUnmarshalUsingNumber is a drop-in replacement for json.Unmarshal that +// does not default to unmarshaling numeric values to float64 in order to +// prevent low bit truncation of values greater than 1<<53. +// See https://golang.org/cl/6202068 for details. +func jsonUnmarshalUsingNumber(data []byte, v interface{}) error { + dec := json.NewDecoder(bytes.NewReader(data)) + dec.UseNumber() + err := dec.Decode(v) + if err != nil { + if err == io.EOF { + return errors.New("unexpected end of JSON input") + } + return err + } + // Make sure there is no more data after the message + // to approximate json.Unmarshal's behaviour. 
+ if dec.More() { + return fmt.Errorf("more data after top-level value") + } + return nil +} diff --git a/internal/benchrunner/runners/runners.go b/internal/benchrunner/runners/runners.go new file mode 100644 index 0000000000..1fc2f6b067 --- /dev/null +++ b/internal/benchrunner/runners/runners.go @@ -0,0 +1,10 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package runners + +import ( + // Registered benchmark runners + _ "github.com/elastic/elastic-package/internal/benchrunner/runners/pipeline" +) diff --git a/internal/cobraext/flags.go b/internal/cobraext/flags.go index abc1e1f0ed..0a19277119 100644 --- a/internal/cobraext/flags.go +++ b/internal/cobraext/flags.go @@ -26,6 +26,12 @@ const ( AgentPolicyFlagName = "agent-policy" AgentPolicyDescription = "name of the agent policy" + BenchNumTopProcsFlagName = "num-top-procs" + BenchNumTopProcsFlagDescription = "number of top processors to show in the benchmarks results" + + BenchWithTestSamplesFlagName = "use-test-samples" + BenchWithTestSamplesFlagDescription = "use test samples for the benchmarks" + BuildSkipValidationFlagName = "skip-validation" BuildSkipValidationFlagDescription = "skip validation of the built package, use only if all validation issues have been acknowledged" diff --git a/internal/testrunner/runners/pipeline/ingest_pipeline.go b/internal/elasticsearch/ingest/datastream.go similarity index 56% rename from internal/testrunner/runners/pipeline/ingest_pipeline.go rename to internal/elasticsearch/ingest/datastream.go index c70ebdc967..e2283978c5 100644 --- a/internal/testrunner/runners/pipeline/ingest_pipeline.go +++ b/internal/elasticsearch/ingest/datastream.go @@ -2,11 +2,10 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. 
-package pipeline +package ingest import ( "bytes" - "encoding/json" "fmt" "io" "log" @@ -20,29 +19,12 @@ import ( "github.com/pkg/errors" "github.com/elastic/elastic-package/internal/elasticsearch" - "github.com/elastic/elastic-package/internal/elasticsearch/ingest" "github.com/elastic/elastic-package/internal/packages" ) var ingestPipelineTag = regexp.MustCompile(`{{\s*IngestPipeline.+}}`) -type simulatePipelineRequest struct { - Docs []pipelineDocument `json:"docs"` -} - -type pipelineDocument struct { - Source json.RawMessage `json:"_source"` -} - -type simulatePipelineResponse struct { - Docs []pipelineIngestedDocument `json:"docs"` -} - -type pipelineIngestedDocument struct { - Doc pipelineDocument `json:"doc"` -} - -func installIngestPipelines(api *elasticsearch.API, dataStreamPath string) (string, []ingest.Pipeline, error) { +func InstallDataStreamPipelines(api *elasticsearch.API, dataStreamPath string) (string, []Pipeline, error) { dataStreamManifest, err := packages.ReadDataStreamManifest(filepath.Join(dataStreamPath, packages.DataStreamManifestFile)) if err != nil { return "", nil, errors.Wrap(err, "reading data stream manifest failed") @@ -50,7 +32,7 @@ func installIngestPipelines(api *elasticsearch.API, dataStreamPath string) (stri nonce := time.Now().UnixNano() - mainPipeline := getWithPipelineNameWithNonce(dataStreamManifest.GetPipelineNameOrDefault(), nonce) + mainPipeline := getPipelineNameWithNonce(dataStreamManifest.GetPipelineNameOrDefault(), nonce) pipelines, err := loadIngestPipelineFiles(dataStreamPath, nonce) if err != nil { return "", nil, errors.Wrap(err, "loading ingest pipeline files failed") @@ -63,7 +45,7 @@ func installIngestPipelines(api *elasticsearch.API, dataStreamPath string) (stri return mainPipeline, pipelines, nil } -func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]ingest.Pipeline, error) { +func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, error) { elasticsearchPath := filepath.Join(dataStreamPath, "elasticsearch", "ingest_pipeline") var pipelineFiles []string @@ -75,7 +57,7 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]ingest.Pipel pipelineFiles = append(pipelineFiles, files...) } - var pipelines []ingest.Pipeline + var pipelines []Pipeline for _, path := range pipelineFiles { c, err := os.ReadFile(path) if err != nil { @@ -88,12 +70,12 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]ingest.Pipel log.Fatalf("invalid IngestPipeline tag in template (path: %s)", path) } pipelineTag := s[1] - return []byte(getWithPipelineNameWithNonce(pipelineTag, nonce)) + return []byte(getPipelineNameWithNonce(pipelineTag, nonce)) }) name := filepath.Base(path) - pipelines = append(pipelines, ingest.Pipeline{ + pipelines = append(pipelines, Pipeline{ Path: path, - Name: getWithPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce), + Name: getPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce), Format: filepath.Ext(path)[1:], Content: c, }) @@ -101,7 +83,7 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]ingest.Pipel return pipelines, nil } -func installPipelinesInElasticsearch(api *elasticsearch.API, pipelines []ingest.Pipeline) error { +func installPipelinesInElasticsearch(api *elasticsearch.API, pipelines []Pipeline) error { for _, p := range pipelines { if err := installPipeline(api, p); err != nil { return err @@ -110,7 +92,7 @@ func installPipelinesInElasticsearch(api *elasticsearch.API, pipelines []ingest. 
return nil } -func pipelineError(err error, pipeline ingest.Pipeline, format string, args ...interface{}) error { +func pipelineError(err error, pipeline Pipeline, format string, args ...interface{}) error { context := "pipelineName: " + pipeline.Name if pipeline.Path != "" { context += ", path: " + pipeline.Path @@ -119,7 +101,7 @@ func pipelineError(err error, pipeline ingest.Pipeline, format string, args ...i return errors.Wrapf(err, format+" ("+context+")", args...) } -func installPipeline(api *elasticsearch.API, pipeline ingest.Pipeline) error { +func installPipeline(api *elasticsearch.API, pipeline Pipeline) error { if err := putIngestPipeline(api, pipeline); err != nil { return err } @@ -127,7 +109,7 @@ func installPipeline(api *elasticsearch.API, pipeline ingest.Pipeline) error { return getIngestPipeline(api, pipeline) } -func putIngestPipeline(api *elasticsearch.API, pipeline ingest.Pipeline) error { +func putIngestPipeline(api *elasticsearch.API, pipeline Pipeline) error { source, err := pipeline.MarshalJSON() if err != nil { return err @@ -151,7 +133,7 @@ func putIngestPipeline(api *elasticsearch.API, pipeline ingest.Pipeline) error { return nil } -func getIngestPipeline(api *elasticsearch.API, pipeline ingest.Pipeline) error { +func getIngestPipeline(api *elasticsearch.API, pipeline Pipeline) error { r, err := api.Ingest.GetPipeline(func(request *elasticsearch.IngestGetPipelineRequest) { request.PipelineID = pipeline.Name }) @@ -173,60 +155,6 @@ func getIngestPipeline(api *elasticsearch.API, pipeline ingest.Pipeline) error { return nil } -func uninstallIngestPipelines(api *elasticsearch.API, pipelines []ingest.Pipeline) error { - for _, pipeline := range pipelines { - resp, err := api.Ingest.DeletePipeline(pipeline.Name) - if err != nil { - return pipelineError(err, pipeline, "DeletePipeline API call failed") - } - resp.Body.Close() - } - return nil -} - -func getWithPipelineNameWithNonce(pipelineName string, nonce int64) string { +func getPipelineNameWithNonce(pipelineName string, nonce int64) string { return fmt.Sprintf("%s-%d", pipelineName, nonce) } - -func simulatePipelineProcessing(api *elasticsearch.API, pipelineName string, tc *testCase) (*testResult, error) { - var request simulatePipelineRequest - for _, event := range tc.events { - request.Docs = append(request.Docs, pipelineDocument{ - Source: event, - }) - } - - requestBody, err := json.Marshal(&request) - if err != nil { - return nil, errors.Wrap(err, "marshalling simulate request failed") - } - - r, err := api.Ingest.Simulate(bytes.NewReader(requestBody), func(request *elasticsearch.IngestSimulateRequest) { - request.PipelineID = pipelineName - }) - if err != nil { - return nil, errors.Wrapf(err, "Simulate API call failed (pipelineName: %s)", pipelineName) - } - defer r.Body.Close() - - body, err := io.ReadAll(r.Body) - if err != nil { - return nil, errors.Wrap(err, "failed to read Simulate API response body") - } - - if r.StatusCode != http.StatusOK { - return nil, errors.Wrapf(elasticsearch.NewError(body), "unexpected response status for Simulate (%d): %s", r.StatusCode, r.Status()) - } - - var response simulatePipelineResponse - err = json.Unmarshal(body, &response) - if err != nil { - return nil, errors.Wrap(err, "unmarshalling simulate request failed") - } - - var tr testResult - for _, doc := range response.Docs { - tr.events = append(tr.events, doc.Doc.Source) - } - return &tr, nil -} diff --git a/internal/elasticsearch/ingest/pipeline.go b/internal/elasticsearch/ingest/pipeline.go index 
b4612fa613..839bd6e52b 100644 --- a/internal/elasticsearch/ingest/pipeline.go +++ b/internal/elasticsearch/ingest/pipeline.go @@ -5,14 +5,35 @@ package ingest import ( + "bytes" "encoding/json" + "io" + "net/http" "strings" "github.com/pkg/errors" "gopkg.in/yaml.v3" + + "github.com/elastic/elastic-package/internal/elasticsearch" ) -// Pipeline represents a pipeline resource loaded from a file. +type simulatePipelineRequest struct { + Docs []pipelineDocument `json:"docs"` +} + +type simulatePipelineResponse struct { + Docs []pipelineIngestedDocument `json:"docs"` +} + +type pipelineDocument struct { + Source json.RawMessage `json:"_source"` +} + +type pipelineIngestedDocument struct { + Doc pipelineDocument `json:"doc"` +} + +// Pipeline represents a pipeline resource loaded from a file type Pipeline struct { Path string // Path of the file with the pipeline definition. Name string // Name of the pipeline. @@ -48,3 +69,57 @@ func (p *Pipeline) MarshalJSON() (asJSON []byte, err error) { } return asJSON, nil } + +func SimulatePipeline(api *elasticsearch.API, pipelineName string, events []json.RawMessage) ([]json.RawMessage, error) { + var request simulatePipelineRequest + for _, event := range events { + request.Docs = append(request.Docs, pipelineDocument{ + Source: event, + }) + } + + requestBody, err := json.Marshal(&request) + if err != nil { + return nil, errors.Wrap(err, "marshalling simulate request failed") + } + + r, err := api.Ingest.Simulate(bytes.NewReader(requestBody), func(request *elasticsearch.IngestSimulateRequest) { + request.PipelineID = pipelineName + }) + if err != nil { + return nil, errors.Wrapf(err, "Simulate API call failed (pipelineName: %s)", pipelineName) + } + defer r.Body.Close() + + body, err := io.ReadAll(r.Body) + if err != nil { + return nil, errors.Wrap(err, "failed to read Simulate API response body") + } + + if r.StatusCode != http.StatusOK { + return nil, errors.Wrapf(elasticsearch.NewError(body), "unexpected response status for Simulate (%d): %s", r.StatusCode, r.Status()) + } + + var response simulatePipelineResponse + err = json.Unmarshal(body, &response) + if err != nil { + return nil, errors.Wrap(err, "unmarshalling simulate request failed") + } + + processedEvents := make([]json.RawMessage, len(response.Docs)) + for i, doc := range response.Docs { + processedEvents[i] = doc.Doc.Source + } + return processedEvents, nil +} + +func UninstallPipelines(api *elasticsearch.API, pipelines []Pipeline) error { + for _, p := range pipelines { + resp, err := api.Ingest.DeletePipeline(p.Name) + if err != nil { + return errors.Wrapf(err, "DeletePipeline API call failed (pipelineName: %s)", p.Name) + } + resp.Body.Close() + } + return nil +} diff --git a/internal/testrunner/reporters/formats/human.go b/internal/testrunner/reporters/formats/human.go index fd83a4d2a3..fc0a01f97a 100644 --- a/internal/testrunner/reporters/formats/human.go +++ b/internal/testrunner/reporters/formats/human.go @@ -68,6 +68,5 @@ func reportHumanFormat(results []testrunner.TestResult) (string, error) { t.SetStyle(table.StyleRounded) report.WriteString(t.Render()) - return report.String(), nil } diff --git a/internal/testrunner/reporters/outputs/file.go b/internal/testrunner/reporters/outputs/file.go index ce589b87c9..232d714263 100644 --- a/internal/testrunner/reporters/outputs/file.go +++ b/internal/testrunner/reporters/outputs/file.go @@ -31,7 +31,6 @@ func reportToFile(pkg, report string, format testrunner.TestReportFormat) error if err != nil { return errors.Wrap(err, "could not 
determine test reports folder") } - // Create test reports folder if it doesn't exist _, err = os.Stat(dest) if err != nil && errors.Is(err, os.ErrNotExist) { diff --git a/internal/testrunner/runners/pipeline/runner.go b/internal/testrunner/runners/pipeline/runner.go index e3bb72d90a..117a99e615 100644 --- a/internal/testrunner/runners/pipeline/runner.go +++ b/internal/testrunner/runners/pipeline/runner.go @@ -62,8 +62,7 @@ func (r *runner) TearDown() error { signal.Sleep(r.options.DeferCleanup) } - err := uninstallIngestPipelines(r.options.API, r.pipelines) - if err != nil { + if err := ingest.UninstallPipelines(r.options.API, r.pipelines); err != nil { return errors.Wrap(err, "uninstalling ingest pipelines failed") } return nil @@ -90,7 +89,7 @@ func (r *runner) run() ([]testrunner.TestResult, error) { } var entryPipeline string - entryPipeline, r.pipelines, err = installIngestPipelines(r.options.API, dataStreamPath) + entryPipeline, r.pipelines, err = ingest.InstallDataStreamPipelines(r.options.API, dataStreamPath) if err != nil { return nil, errors.Wrap(err, "installing ingest pipelines failed") } @@ -130,7 +129,7 @@ func (r *runner) run() ([]testrunner.TestResult, error) { continue } - result, err := simulatePipelineProcessing(r.options.API, entryPipeline, tc) + processedEvents, err := ingest.SimulatePipeline(r.options.API, entryPipeline, tc.events) if err != nil { err := errors.Wrap(err, "simulating pipeline processing failed") tr.ErrorMsg = err.Error() @@ -138,6 +137,8 @@ func (r *runner) run() ([]testrunner.TestResult, error) { continue } + result := &testResult{events: processedEvents} + tr.TimeElapsed = time.Since(startTime) fieldsValidator, err := fields.CreateValidatorForDirectory(dataStreamPath, fields.WithSpecVersion(pkgManifest.SpecVersion), @@ -175,6 +176,7 @@ func (r *runner) run() ([]testrunner.TestResult, error) { } results = append(results, tr) } + return results, nil } diff --git a/scripts/test-check-packages.sh b/scripts/test-check-packages.sh index 596fdd93eb..70d9b820cb 100755 --- a/scripts/test-check-packages.sh +++ b/scripts/test-check-packages.sh @@ -66,8 +66,12 @@ for d in test/packages/${PACKAGE_TEST_TYPE:-other}/${PACKAGE_UNDER_TEST:-*}/; do cd $d elastic-package install -v - # defer-cleanup is set to a short period to verify that the option is available - elastic-package test -v --report-format xUnit --report-output file --defer-cleanup 1s --test-coverage + if [ "${PACKAGE_TEST_TYPE:-other}" == "benchmarks" ]; then + elastic-package benchmark -v --report-format xUnit --report-output file --fail-on-missing + else + # defer-cleanup is set to a short period to verify that the option is available + elastic-package test -v --report-format xUnit --report-output file --defer-cleanup 1s --test-coverage + fi ) cd - done diff --git a/test/packages/benchmarks/pipeline_benchmark/changelog.yml b/test/packages/benchmarks/pipeline_benchmark/changelog.yml new file mode 100644 index 0000000000..1ced0b8d36 --- /dev/null +++ b/test/packages/benchmarks/pipeline_benchmark/changelog.yml @@ -0,0 +1,6 @@ +# newer versions go on top +- version: "999.999.999" + changes: + - description: initial release + type: enhancement # can be one of: enhancement, bugfix, breaking-change + link: https://github.com/elastic/elastic-package/pull/906 diff --git a/test/packages/benchmarks/pipeline_benchmark/data_stream/test/_dev/benchmark/pipeline/access-raw.log b/test/packages/benchmarks/pipeline_benchmark/data_stream/test/_dev/benchmark/pipeline/access-raw.log new file mode 100644 index 
0000000000..c8c9ffe960 --- /dev/null +++ b/test/packages/benchmarks/pipeline_benchmark/data_stream/test/_dev/benchmark/pipeline/access-raw.log @@ -0,0 +1 @@ +1.2.3.4 - - [25/Oct/2016:14:49:34 +0200] "GET /favicon.ico HTTP/1.1" 404 571 "http://localhost:8080/" "skip-this-one/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36" \ No newline at end of file diff --git a/test/packages/benchmarks/pipeline_benchmark/data_stream/test/_dev/benchmark/pipeline/config.yml b/test/packages/benchmarks/pipeline_benchmark/data_stream/test/_dev/benchmark/pipeline/config.yml new file mode 100644 index 0000000000..30a2b50cf6 --- /dev/null +++ b/test/packages/benchmarks/pipeline_benchmark/data_stream/test/_dev/benchmark/pipeline/config.yml @@ -0,0 +1 @@ +num_docs: 10000 diff --git a/test/packages/benchmarks/pipeline_benchmark/data_stream/test/agent/stream/stream.yml.hbs b/test/packages/benchmarks/pipeline_benchmark/data_stream/test/agent/stream/stream.yml.hbs new file mode 100644 index 0000000000..2cdbbeb73e --- /dev/null +++ b/test/packages/benchmarks/pipeline_benchmark/data_stream/test/agent/stream/stream.yml.hbs @@ -0,0 +1,25 @@ +paths: +{{#each paths as |path i|}} + - {{path}} +{{/each}} +exclude_files: [".gz$"] +tags: +{{#each tags as |tag i|}} + - {{tag}} +{{/each}} +fields_under_root: true +fields: + observer: + vendor: Test + product: Test + type: test +{{#contains tags "forwarded"}} +publisher_pipeline.disable_host: true +{{/contains}} + +processors: +- add_locale: ~ +- add_fields: + target: '' + fields: + ecs.version: 1.6.0 diff --git a/test/packages/benchmarks/pipeline_benchmark/data_stream/test/elasticsearch/ingest_pipeline/default.yml b/test/packages/benchmarks/pipeline_benchmark/data_stream/test/elasticsearch/ingest_pipeline/default.yml new file mode 100644 index 0000000000..54a442eb2b --- /dev/null +++ b/test/packages/benchmarks/pipeline_benchmark/data_stream/test/elasticsearch/ingest_pipeline/default.yml @@ -0,0 +1,25 @@ +--- +description: Pipeline for parsing Nginx access logs. Requires the geoip and user_agent + plugins. +processors: + - grok: + field: message + patterns: + - (%{NGINX_HOST} )?"?(?:%{NGINX_ADDRESS_LIST:nginx.access.remote_ip_list}|%{NOTSPACE:source.address}) + - (-|%{DATA:user.name}) \[%{HTTPDATE:nginx.access.time}\] "%{DATA:nginx.access.info}" + %{NUMBER:http.response.status_code:long} %{NUMBER:http.response.body.bytes:long} + "(-|%{DATA:http.request.referrer})" "(-|%{DATA:user_agent.original})" + pattern_definitions: + NGINX_HOST: (?:%{IP:destination.ip}|%{NGINX_NOTSEPARATOR:destination.domain})(:%{NUMBER:destination.port})? + NGINX_NOTSEPARATOR: "[^\t ,:]+" + NGINX_ADDRESS_LIST: (?:%{IP}|%{WORD})("?,?\s*(?:%{IP}|%{WORD}))* + ignore_missing: true + - user_agent: + field: user_agent.original + ignore_missing: true + - drop: + if: "ctx?.user_agent?.original?.startsWith('skip-this-one')" +on_failure: + - set: + field: error.message + value: '{{ _ingest.on_failure_message }}' \ No newline at end of file diff --git a/test/packages/benchmarks/pipeline_benchmark/data_stream/test/fields/base-fields.yml b/test/packages/benchmarks/pipeline_benchmark/data_stream/test/fields/base-fields.yml new file mode 100644 index 0000000000..0ec2cc7e01 --- /dev/null +++ b/test/packages/benchmarks/pipeline_benchmark/data_stream/test/fields/base-fields.yml @@ -0,0 +1,38 @@ +- name: data_stream.type + type: constant_keyword + description: Data stream type. 
+- name: data_stream.dataset + type: constant_keyword + description: Data stream dataset. +- name: data_stream.namespace + type: constant_keyword + description: Data stream namespace. +- name: '@timestamp' + type: date + description: Event timestamp. +- name: container.id + description: Unique container id. + ignore_above: 1024 + type: keyword +- name: input.type + description: Type of Filebeat input. + type: keyword +- name: log.file.path + description: Full path to the log file this event came from. + example: /var/log/fun-times.log + ignore_above: 1024 + type: keyword +- name: log.source.address + description: Source address from which the log event was read / sent from. + type: keyword +- name: log.flags + description: Flags for the log file. + type: keyword +- name: log.offset + description: Offset of the entry in the log file. + type: long +- name: tags + description: List of keywords used to tag each event. + example: '["production", "env2"]' + ignore_above: 1024 + type: keyword diff --git a/test/packages/benchmarks/pipeline_benchmark/data_stream/test/manifest.yml b/test/packages/benchmarks/pipeline_benchmark/data_stream/test/manifest.yml new file mode 100644 index 0000000000..d922481164 --- /dev/null +++ b/test/packages/benchmarks/pipeline_benchmark/data_stream/test/manifest.yml @@ -0,0 +1,79 @@ +title: Test +release: experimental +type: logs +streams: + - input: udp + title: UDP logs + description: Collect UDP logs + template_path: udp.yml.hbs + vars: + - name: tags + type: text + title: Tags + multi: true + required: true + show_user: false + default: + - forwarded + - name: udp_host + type: text + title: UDP host to listen on + multi: false + required: true + show_user: true + default: localhost + - name: udp_port + type: integer + title: UDP port to listen on + multi: false + required: true + show_user: true + default: 9999 + - input: tcp + title: TCP logs + description: Collect TCP logs + template_path: tcp.yml.hbs + vars: + - name: tags + type: text + title: Tags + multi: true + required: true + show_user: false + default: + - forwarded + - name: tcp_host + type: text + title: TCP host to listen on + multi: false + required: true + show_user: true + default: localhost + - name: tcp_port + type: integer + title: TCP port to listen on + multi: false + required: true + show_user: true + default: 9511 + - input: file + title: File logs + description: Collect logs from file + enabled: false + vars: + - name: paths + type: text + title: Paths + multi: true + required: true + show_user: true + default: + - /var/log/file.log + - name: tags + type: text + title: Tags + multi: true + required: true + show_user: false + default: + - forwarded diff --git a/test/packages/benchmarks/pipeline_benchmark/docs/README.md b/test/packages/benchmarks/pipeline_benchmark/docs/README.md new file mode 100644 index 0000000000..e0ef7b4a18 --- /dev/null +++ b/test/packages/benchmarks/pipeline_benchmark/docs/README.md @@ -0,0 +1,2 @@ +# Test integration + diff --git a/test/packages/benchmarks/pipeline_benchmark/manifest.yml b/test/packages/benchmarks/pipeline_benchmark/manifest.yml new file mode 100644 index 0000000000..f7713fd52c --- /dev/null +++ b/test/packages/benchmarks/pipeline_benchmark/manifest.yml @@ -0,0 +1,24 @@ +format_version: 1.0.0 +name: pipeline_benchmarks +title: Pipeline benchmarks +# version is set to something very large to so this test package can +# be installed in the package registry regardless of the version of +# the actual apache package in the registry at any given time. 
+version: 999.999.999 +description: Test for pipeline test runner +categories: ["network"] +release: experimental +license: basic +type: integration +conditions: + kibana.version: '^8.0.0' +policy_templates: + - name: test + title: Test + description: Description + inputs: + - type: file + title: Foo bar + description: Foo bar +owner: + github: elastic/integrations diff --git a/test/packages/benchmarks/use_pipeline_tests/changelog.yml b/test/packages/benchmarks/use_pipeline_tests/changelog.yml new file mode 100644 index 0000000000..1ced0b8d36 --- /dev/null +++ b/test/packages/benchmarks/use_pipeline_tests/changelog.yml @@ -0,0 +1,6 @@ +# newer versions go on top +- version: "999.999.999" + changes: + - description: initial release + type: enhancement # can be one of: enhancement, bugfix, breaking-change + link: https://github.com/elastic/elastic-package/pull/906 diff --git a/test/packages/benchmarks/use_pipeline_tests/data_stream/test/_dev/test/pipeline/test-access-raw.log b/test/packages/benchmarks/use_pipeline_tests/data_stream/test/_dev/test/pipeline/test-access-raw.log new file mode 100644 index 0000000000..c8c9ffe960 --- /dev/null +++ b/test/packages/benchmarks/use_pipeline_tests/data_stream/test/_dev/test/pipeline/test-access-raw.log @@ -0,0 +1 @@ +1.2.3.4 - - [25/Oct/2016:14:49:34 +0200] "GET /favicon.ico HTTP/1.1" 404 571 "http://localhost:8080/" "skip-this-one/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36" \ No newline at end of file diff --git a/test/packages/benchmarks/use_pipeline_tests/data_stream/test/_dev/test/pipeline/test-access-raw.log-config.yml b/test/packages/benchmarks/use_pipeline_tests/data_stream/test/_dev/test/pipeline/test-access-raw.log-config.yml new file mode 100644 index 0000000000..958d74a23e --- /dev/null +++ b/test/packages/benchmarks/use_pipeline_tests/data_stream/test/_dev/test/pipeline/test-access-raw.log-config.yml @@ -0,0 +1,4 @@ +multiline: + first_line_pattern: "^(?:[0-9]{1,3}\\.){3}[0-9]{1,3}" +fields: + "@timestamp": "2020-04-28T11:07:58.223Z" diff --git a/test/packages/benchmarks/use_pipeline_tests/data_stream/test/_dev/test/pipeline/test-access-raw.log-expected.json b/test/packages/benchmarks/use_pipeline_tests/data_stream/test/_dev/test/pipeline/test-access-raw.log-expected.json new file mode 100644 index 0000000000..1c2f884a44 --- /dev/null +++ b/test/packages/benchmarks/use_pipeline_tests/data_stream/test/_dev/test/pipeline/test-access-raw.log-expected.json @@ -0,0 +1,5 @@ +{ + "expected": [ + null + ] +} \ No newline at end of file diff --git a/test/packages/benchmarks/use_pipeline_tests/data_stream/test/agent/stream/stream.yml.hbs b/test/packages/benchmarks/use_pipeline_tests/data_stream/test/agent/stream/stream.yml.hbs new file mode 100644 index 0000000000..2cdbbeb73e --- /dev/null +++ b/test/packages/benchmarks/use_pipeline_tests/data_stream/test/agent/stream/stream.yml.hbs @@ -0,0 +1,25 @@ +paths: +{{#each paths as |path i|}} + - {{path}} +{{/each}} +exclude_files: [".gz$"] +tags: +{{#each tags as |tag i|}} + - {{tag}} +{{/each}} +fields_under_root: true +fields: + observer: + vendor: Test + product: Test + type: test +{{#contains tags "forwarded"}} +publisher_pipeline.disable_host: true +{{/contains}} + +processors: +- add_locale: ~ +- add_fields: + target: '' + fields: + ecs.version: 1.6.0 diff --git a/test/packages/benchmarks/use_pipeline_tests/data_stream/test/elasticsearch/ingest_pipeline/default.yml 
b/test/packages/benchmarks/use_pipeline_tests/data_stream/test/elasticsearch/ingest_pipeline/default.yml new file mode 100644 index 0000000000..54a442eb2b --- /dev/null +++ b/test/packages/benchmarks/use_pipeline_tests/data_stream/test/elasticsearch/ingest_pipeline/default.yml @@ -0,0 +1,25 @@ +--- +description: Pipeline for parsing Nginx access logs. Requires the geoip and user_agent + plugins. +processors: + - grok: + field: message + patterns: + - (%{NGINX_HOST} )?"?(?:%{NGINX_ADDRESS_LIST:nginx.access.remote_ip_list}|%{NOTSPACE:source.address}) + - (-|%{DATA:user.name}) \[%{HTTPDATE:nginx.access.time}\] "%{DATA:nginx.access.info}" + %{NUMBER:http.response.status_code:long} %{NUMBER:http.response.body.bytes:long} + "(-|%{DATA:http.request.referrer})" "(-|%{DATA:user_agent.original})" + pattern_definitions: + NGINX_HOST: (?:%{IP:destination.ip}|%{NGINX_NOTSEPARATOR:destination.domain})(:%{NUMBER:destination.port})? + NGINX_NOTSEPARATOR: "[^\t ,:]+" + NGINX_ADDRESS_LIST: (?:%{IP}|%{WORD})("?,?\s*(?:%{IP}|%{WORD}))* + ignore_missing: true + - user_agent: + field: user_agent.original + ignore_missing: true + - drop: + if: "ctx?.user_agent?.original?.startsWith('skip-this-one')" +on_failure: + - set: + field: error.message + value: '{{ _ingest.on_failure_message }}' \ No newline at end of file diff --git a/test/packages/benchmarks/use_pipeline_tests/data_stream/test/fields/base-fields.yml b/test/packages/benchmarks/use_pipeline_tests/data_stream/test/fields/base-fields.yml new file mode 100644 index 0000000000..0ec2cc7e01 --- /dev/null +++ b/test/packages/benchmarks/use_pipeline_tests/data_stream/test/fields/base-fields.yml @@ -0,0 +1,38 @@ +- name: data_stream.type + type: constant_keyword + description: Data stream type. +- name: data_stream.dataset + type: constant_keyword + description: Data stream dataset. +- name: data_stream.namespace + type: constant_keyword + description: Data stream namespace. +- name: '@timestamp' + type: date + description: Event timestamp. +- name: container.id + description: Unique container id. + ignore_above: 1024 + type: keyword +- name: input.type + description: Type of Filebeat input. + type: keyword +- name: log.file.path + description: Full path to the log file this event came from. + example: /var/log/fun-times.log + ignore_above: 1024 + type: keyword +- name: log.source.address + description: Source address from which the log event was read / sent from. + type: keyword +- name: log.flags + description: Flags for the log file. + type: keyword +- name: log.offset + description: Offset of the entry in the log file. + type: long +- name: tags + description: List of keywords used to tag each event. 
+ example: '["production", "env2"]' + ignore_above: 1024 + type: keyword diff --git a/test/packages/benchmarks/use_pipeline_tests/data_stream/test/manifest.yml b/test/packages/benchmarks/use_pipeline_tests/data_stream/test/manifest.yml new file mode 100644 index 0000000000..d922481164 --- /dev/null +++ b/test/packages/benchmarks/use_pipeline_tests/data_stream/test/manifest.yml @@ -0,0 +1,79 @@ +title: Test +release: experimental +type: logs +streams: + - input: udp + title: UDP logs + description: Collect UDP logs + template_path: udp.yml.hbs + vars: + - name: tags + type: text + title: Tags + multi: true + required: true + show_user: false + default: + - forwarded + - name: udp_host + type: text + title: UDP host to listen on + multi: false + required: true + show_user: true + default: localhost + - name: udp_port + type: integer + title: UDP port to listen on + multi: false + required: true + show_user: true + default: 9999 + - input: tcp + title: TCP logs + description: Collect TCP logs + template_path: tcp.yml.hbs + vars: + - name: tags + type: text + title: Tags + multi: true + required: true + show_user: false + default: + - forwarded + - name: tcp_host + type: text + title: TCP host to listen on + multi: false + required: true + show_user: true + default: localhost + - name: tcp_port + type: integer + title: TCP port to listen on + multi: false + required: true + show_user: true + default: 9511 + - input: file + title: File logs + description: Collect logs from file + enabled: false + vars: + - name: paths + type: text + title: Paths + multi: true + required: true + show_user: true + default: + - /var/log/file.log + - name: tags + type: text + title: Tags + multi: true + required: true + show_user: false + default: + - forwarded diff --git a/test/packages/benchmarks/use_pipeline_tests/docs/README.md b/test/packages/benchmarks/use_pipeline_tests/docs/README.md new file mode 100644 index 0000000000..e0ef7b4a18 --- /dev/null +++ b/test/packages/benchmarks/use_pipeline_tests/docs/README.md @@ -0,0 +1,2 @@ +# Test integration + diff --git a/test/packages/benchmarks/use_pipeline_tests/manifest.yml b/test/packages/benchmarks/use_pipeline_tests/manifest.yml new file mode 100644 index 0000000000..b30a02942b --- /dev/null +++ b/test/packages/benchmarks/use_pipeline_tests/manifest.yml @@ -0,0 +1,24 @@ +format_version: 1.0.0 +name: use_pipeline_tests +title: Use pipeline tests for the benchmark +# version is set to something very large to so this test package can +# be installed in the package registry regardless of the version of +# the actual apache package in the registry at any given time. +version: 999.999.999 +description: Test for pipeline test runner +categories: ["network"] +release: experimental +license: basic +type: integration +conditions: + kibana.version: '^8.0.0' +policy_templates: + - name: test + title: Test + description: Description + inputs: + - type: file + title: Foo bar + description: Foo bar +owner: + github: elastic/integrations