diff --git a/README.md b/README.md index ebb94c6dfa..306c5bd928 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,12 @@ These benchmarks allow you to benchmark any Ingest Node Pipelines defined by you For details on how to configure pipeline benchmarks for a package, review the [HOWTO guide](./docs/howto/pipeline_benchmarking.md). +#### Rally Benchmarks + +These benchmarks allow you to benchmark an integration corpus with rally. + +For details on how to configure rally benchmarks for a package, review the [HOWTO guide](./docs/howto/rally_benchmarking.md). + #### System Benchmarks These benchmarks allow you to benchmark an integration end to end. @@ -175,6 +181,12 @@ _Context: package_ Run pipeline benchmarks for the package. +### `elastic-package benchmark rally` + +_Context: package_ + +Run rally benchmarks for the package (esrally needs to be installed in the path of the system). + ### `elastic-package benchmark system` _Context: package_ diff --git a/cmd/benchmark.go b/cmd/benchmark.go index 7849f25a60..402ec29c79 100644 --- a/cmd/benchmark.go +++ b/cmd/benchmark.go @@ -25,7 +25,9 @@ import ( "github.com/elastic/elastic-package/internal/benchrunner" "github.com/elastic/elastic-package/internal/benchrunner/reporters" "github.com/elastic/elastic-package/internal/benchrunner/reporters/outputs" + benchcommon "github.com/elastic/elastic-package/internal/benchrunner/runners/common" "github.com/elastic/elastic-package/internal/benchrunner/runners/pipeline" + "github.com/elastic/elastic-package/internal/benchrunner/runners/rally" "github.com/elastic/elastic-package/internal/benchrunner/runners/system" "github.com/elastic/elastic-package/internal/cobraext" "github.com/elastic/elastic-package/internal/common" @@ -48,6 +50,12 @@ These benchmarks allow you to benchmark any Ingest Node Pipelines defined by you For details on how to configure pipeline benchmarks for a package, review the [HOWTO guide](./docs/howto/pipeline_benchmarking.md). +#### Rally Benchmarks + +These benchmarks allow you to benchmark an integration corpus with rally. + +For details on how to configure rally benchmarks for a package, review the [HOWTO guide](./docs/howto/rally_benchmarking.md). + #### System Benchmarks These benchmarks allow you to benchmark an integration end to end. 
@@ -66,6 +74,9 @@ func setupBenchmarkCommand() *cobraext.Command { pipelineCmd := getPipelineCommand() cmd.AddCommand(pipelineCmd) + rallyCmd := getRallyCommand() + cmd.AddCommand(rallyCmd) + systemCmd := getSystemCommand() cmd.AddCommand(systemCmd) @@ -213,6 +224,151 @@ func pipelineCommandAction(cmd *cobra.Command, args []string) error { return nil } +func getRallyCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "rally", + Short: "Run rally benchmarks", + Long: "Run rally benchmarks for the package (esrally needs to be installed in the path of the system)", + Args: cobra.NoArgs, + RunE: rallyCommandAction, + } + + cmd.Flags().StringP(cobraext.BenchNameFlagName, "", "", cobraext.BenchNameFlagDescription) + cmd.Flags().BoolP(cobraext.BenchReindexToMetricstoreFlagName, "", false, cobraext.BenchReindexToMetricstoreFlagDescription) + cmd.Flags().DurationP(cobraext.BenchMetricsIntervalFlagName, "", time.Second, cobraext.BenchMetricsIntervalFlagDescription) + cmd.Flags().DurationP(cobraext.DeferCleanupFlagName, "", 0, cobraext.DeferCleanupFlagDescription) + cmd.Flags().String(cobraext.VariantFlagName, "", cobraext.VariantFlagDescription) + cmd.Flags().StringP(cobraext.BenchCorpusRallyTrackOutputDirFlagName, "", "", cobraext.BenchCorpusRallyTrackOutputDirFlagDescription) + cmd.Flags().BoolP(cobraext.BenchCorpusRallyDryRunFlagName, "", false, cobraext.BenchCorpusRallyDryRunFlagDescription) + + return cmd +} + +func rallyCommandAction(cmd *cobra.Command, args []string) error { + cmd.Println("Run rally benchmarks for the package") + + variant, err := cmd.Flags().GetString(cobraext.VariantFlagName) + if err != nil { + return cobraext.FlagParsingError(err, cobraext.VariantFlagName) + } + + benchName, err := cmd.Flags().GetString(cobraext.BenchNameFlagName) + if err != nil { + return cobraext.FlagParsingError(err, cobraext.BenchNameFlagName) + } + + deferCleanup, err := cmd.Flags().GetDuration(cobraext.DeferCleanupFlagName) + if err != nil { + return cobraext.FlagParsingError(err, cobraext.DeferCleanupFlagName) + } + + metricsInterval, err := cmd.Flags().GetDuration(cobraext.BenchMetricsIntervalFlagName) + if err != nil { + return cobraext.FlagParsingError(err, cobraext.BenchMetricsIntervalFlagName) + } + + dataReindex, err := cmd.Flags().GetBool(cobraext.BenchReindexToMetricstoreFlagName) + if err != nil { + return cobraext.FlagParsingError(err, cobraext.BenchReindexToMetricstoreFlagName) + } + + rallyTrackOutputDir, err := cmd.Flags().GetString(cobraext.BenchCorpusRallyTrackOutputDirFlagName) + if err != nil { + return cobraext.FlagParsingError(err, cobraext.BenchCorpusRallyTrackOutputDirFlagName) + } + + rallyDryRun, err := cmd.Flags().GetBool(cobraext.BenchCorpusRallyDryRunFlagName) + if err != nil { + return cobraext.FlagParsingError(err, cobraext.BenchCorpusRallyDryRunFlagName) + } + + packageRootPath, found, err := packages.FindPackageRoot() + if !found { + return errors.New("package root not found") + } + if err != nil { + return fmt.Errorf("locating package root failed: %w", err) + } + + profile, err := cobraext.GetProfileFlag(cmd) + if err != nil { + return err + } + + signal.Enable() + + esClient, err := stack.NewElasticsearchClientFromProfile(profile) + if err != nil { + return fmt.Errorf("can't create Elasticsearch client: %w", err) + } + err = esClient.CheckHealth(cmd.Context()) + if err != nil { + return err + } + + kc, err := stack.NewKibanaClientFromProfile(profile) + if err != nil { + return fmt.Errorf("can't create Kibana client: %w", err) + } + + withOpts := 
[]rally.OptionFunc{ + rally.WithVariant(variant), + rally.WithBenchmarkName(benchName), + rally.WithDeferCleanup(deferCleanup), + rally.WithMetricsInterval(metricsInterval), + rally.WithDataReindexing(dataReindex), + rally.WithPackageRootPath(packageRootPath), + rally.WithESAPI(esClient.API), + rally.WithKibanaClient(kc), + rally.WithProfile(profile), + rally.WithRallyTrackOutputDir(rallyTrackOutputDir), + rally.WithRallyDryRun(rallyDryRun), + } + + esMetricsClient, err := initializeESMetricsClient(cmd.Context()) + if err != nil { + return fmt.Errorf("can't create Elasticsearch metrics client: %w", err) + } + if esMetricsClient != nil { + withOpts = append(withOpts, rally.WithESMetricsAPI(esMetricsClient.API)) + } + + runner := rally.NewRallyBenchmark(rally.NewOptions(withOpts...)) + + r, err := benchrunner.Run(runner) + if errors.Is(err, rally.ErrDryRun) { + return nil + } + + if err != nil { + return fmt.Errorf("error running package rally benchmarks: %w", err) + } + + multiReport, ok := r.(reporters.MultiReportable) + if !ok { + return fmt.Errorf("rally benchmark is expected to return multiple reports") + } + + reports := multiReport.Split() + if len(reports) != 2 { + return fmt.Errorf("rally benchmark is expected to return a human and a file report") + } + + // human report will always be the first + human := reports[0] + if err := reporters.WriteReportable(reporters.Output(outputs.ReportOutputSTDOUT), human); err != nil { + return fmt.Errorf("error writing benchmark report: %w", err) + } + + // file report will always be the second + file := reports[1] + if err := reporters.WriteReportable(reporters.Output(outputs.ReportOutputFile), file); err != nil { + return fmt.Errorf("error writing benchmark report: %w", err) + } + + return nil +} + func getSystemCommand() *cobra.Command { cmd := &cobra.Command{ Use: "system", @@ -410,10 +566,10 @@ func generateDataStreamCorpusCommandAction(cmd *cobra.Command, _ []string) error } func initializeESMetricsClient(ctx context.Context) (*elasticsearch.Client, error) { - address := os.Getenv(system.ESMetricstoreHostEnv) - user := os.Getenv(system.ESMetricstoreUsernameEnv) - pass := os.Getenv(system.ESMetricstorePasswordEnv) - cacert := os.Getenv(system.ESMetricstoreCACertificateEnv) + address := os.Getenv(benchcommon.ESMetricstoreHostEnv) + user := os.Getenv(benchcommon.ESMetricstoreUsernameEnv) + pass := os.Getenv(benchcommon.ESMetricstorePasswordEnv) + cacert := os.Getenv(benchcommon.ESMetricstoreCACertificateEnv) if address == "" || user == "" || pass == "" { logger.Debugf("can't initialize metricstore, missing environment configuration") return nil, nil diff --git a/docs/howto/rally_benchmarking.md b/docs/howto/rally_benchmarking.md new file mode 100644 index 0000000000..c60500440c --- /dev/null +++ b/docs/howto/rally_benchmarking.md @@ -0,0 +1,282 @@ +# HOWTO: Writing rally benchmarks for a package + +## Introduction +Elastic Packages are composed of data streams. A rally benchmark runs an `esrally` track that ingests a corpus of data into an Elasticsearch data stream, and reports rally stats as well as performance metrics retrieved from the Elasticsearch nodes. + +## Conceptual process + +Conceptually, running a rally benchmark involves the following steps: + +1. Deploy the Elastic Stack, including Elasticsearch, Kibana, and the Elastic Agent(s). This step takes time, so it should typically be done once as a prerequisite to running a rally benchmark scenario. +1. Install a package that configures its assets for every data stream in the package. +1. 
Metrics collection from the cluster starts. (**TODO**: record metrics from all Elastic Agents involved using the `system` integration.) +1. Send the collected metrics to the ES Metricstore if set. +1. Generate data (using the [corpus-generator-tool](https://github.com/elastic/elastic-integration-corpus-generator-tool)). +1. Run an `esrally` track with the corpus of generated data. `esrally` must be installed on the system where `elastic-package` is run and available in the `PATH`. +1. Wait for the `esrally` track to be executed. +1. Metrics collection ends and a summary report is created. +1. Delete test artifacts. +1. Optionally reindex all ingested data into the ES Metricstore for further analysis. +1. **TODO**: Optionally compare results against another benchmark run. + +### Benchmark scenario definition + +We must define at least one configuration for the package that we +want to benchmark. There can be multiple scenarios defined for the same package. + +``` +<package root>/ + _dev/ + benchmark/ + rally/ + <scenario>.yml +``` + +The `<scenario>.yml` files allow you to define various settings for the benchmark scenario +along with values for package and data stream-level variables. These are the available configuration options for rally benchmarks. + +| Option | Type | Required | Description | |---------------------------------|------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------| | package | string | | The name of the package. If omitted, the current package is used; this allows for future definition of benchmarks outside of the package folders. | | description | string | | A description for the scenario. | | version | string | | The version of the package to benchmark. If omitted, the current version of the package is used. | | data_stream.name | string | yes | The data stream to benchmark. | | warmup_time_period | duration | | Warmup time period. All data prior to this period will be ignored in the benchmark results. | | corpora.generator.total_events | uint64 | | Number of total events to generate. Example: `20000` | | corpora.generator.template.raw | string | | Raw template for the corpus generator. | | corpora.generator.template.path | string | | Path to the template for the corpus generator. If a `path` is defined, it will override any `raw` template definition. | | corpora.generator.template.type | string | | Type of the template for the corpus generator. Default `placeholder`. | | corpora.generator.config.raw | dictionary | | Raw config for the corpus generator. | | corpora.generator.config.path | string | | Path to the config for the corpus generator. If a `path` is defined, it will override any `raw` config definition. | | corpora.generator.fields.raw | dictionary | | Raw fields for the corpus generator. | | corpora.generator.fields.path | string | | Path to the fields for the corpus generator. If a `path` is defined, it will override any `raw` fields definition. 
| + +Example: + +`logs-benchmark.yml` +```yaml +--- +--- +description: Benchmark 20000 events ingested +data_stream: + name: testds +warmup_time_period: 10s +corpora: + generator: + total_events: 900000 + template: + type: gotext + path: ./logs-benchmark/template.ndjson + config: + path: ./logs-benchmark/config.yml + fields: + path: ./logs-benchmark/fields.yml +``` + +There is no need to define an `input` and `vars` for the package, +since we don't create any agent policy, and we don't enroll any agent. + +## Running a rally benchmark + +Once the configuration is defined as described in the previous section, you are ready to run rally benchmarks for a package. + +First you must deploy the Elastic Stack. + +``` +elastic-package stack up -d +``` + +For a complete listing of options available for this command, run `elastic-package stack up -h` or `elastic-package help stack up`. + +Next, you must invoke the system benchmark runner. + +``` +elastic-package benchmark rally --benchmark logs-benchmark -v +# ... debug output +--- Benchmark results for package: rally_benchmarks - START --- +╭──────────────────────────────────────────────────────────────────────────────────────────────────╮ +│ info │ +├────────────────────────┬─────────────────────────────────────────────────────────────────────────┤ +│ benchmark │ logs-benchmark │ +│ description │ Benchmark 20000 events ingested │ +│ run ID │ cb62ba92-14b9-4562-98ce-9251e0936a5e │ +│ package │ rally_benchmarks │ +│ start ts (s) │ 1698892087 │ +│ end ts (s) │ 1698892134 │ +│ duration │ 47s │ +│ generated corpora file │ /Users/andreaspacca/.elastic-package/tmp/rally_corpus/corpus-3633691003 │ +╰────────────────────────┴─────────────────────────────────────────────────────────────────────────╯ +╭────────────────────────────────────────────────────────────────────╮ +│ parameters │ +├─────────────────────────────────┬──────────────────────────────────┤ +│ package version │ 999.999.999 │ +│ data_stream.name │ testds │ +│ warmup time period │ 10s │ +│ corpora.generator.total_events │ 900000 │ +│ corpora.generator.template.path │ ./logs-benchmark/template.ndjson │ +│ corpora.generator.template.raw │ │ +│ corpora.generator.template.type │ gotext │ +│ corpora.generator.config.path │ ./logs-benchmark/config.yml │ +│ corpora.generator.config.raw │ map[] │ +│ corpora.generator.fields.path │ ./logs-benchmark/fields.yml │ +│ corpora.generator.fields.raw │ map[] │ +╰─────────────────────────────────┴──────────────────────────────────╯ +╭───────────────────────╮ +│ cluster info │ +├───────┬───────────────┤ +│ name │ elasticsearch │ +│ nodes │ 1 │ +╰───────┴───────────────╯ +╭──────────────────────────────────────────────────────────────╮ +│ data stream stats │ +├────────────────────────────┬─────────────────────────────────┤ +│ data stream │ logs-rally_benchmarks.testds-ep │ +│ approx total docs ingested │ 900000 │ +│ backing indices │ 1 │ +│ store size bytes │ 440617124 │ +│ maximum ts (ms) │ 1698892076196 │ +╰────────────────────────────┴─────────────────────────────────╯ +╭───────────────────────────────────────╮ +│ disk usage for index .ds-logs-rally_b │ +│ enchmarks.testds-ep-2023.11.01-000001 │ +│ (for all fields) │ +├──────────────────────────────┬────────┤ +│ total │ 262 MB │ +│ inverted_index.total │ 84 MB │ +│ inverted_index.stored_fields │ 96 MB │ +│ inverted_index.doc_values │ 72 MB │ +│ inverted_index.points │ 9.7 MB │ +│ inverted_index.norms │ 0 B │ +│ inverted_index.term_vectors │ 0 B │ +│ inverted_index.knn_vectors │ 0 B │ 
+╰──────────────────────────────┴────────╯ +╭────────────────────────────────────────────────────────────────────────────────────────────╮ +│ pipeline logs-rally_benchmarks.testds-999.999.999 stats in node 7AYCd2EXQaCSOf-0fKxFBg │ +├────────────────────────────────────────────────┬───────────────────────────────────────────┤ +│ Totals │ Count: 900000 | Failed: 0 | Time: 38.993s │ +│ grok () │ Count: 900000 | Failed: 0 | Time: 36.646s │ +│ user_agent () │ Count: 900000 | Failed: 0 | Time: 1.368s │ +│ pipeline (logs-rally_benchmarks.testds@custom) │ Count: 900000 | Failed: 0 | Time: 102ms │ +╰────────────────────────────────────────────────┴───────────────────────────────────────────╯ +╭────────────────────────────────────────────────────────────────────────────────────────────╮ +│ rally stats │ +├────────────────────────────────────────────────────────────────┬───────────────────────────┤ +│ Cumulative indexing time of primary shards │ 3.3734666666666664 min │ +│ Min cumulative indexing time across primary shards │ 0 min │ +│ Median cumulative indexing time across primary shards │ 0.046950000000000006 min │ +│ Max cumulative indexing time across primary shards │ 1.7421333333333335 min │ +│ Cumulative indexing throttle time of primary shards │ 0 min │ +│ Min cumulative indexing throttle time across primary shards │ 0 min │ +│ Median cumulative indexing throttle time across primary shards │ 0.0 min │ +│ Max cumulative indexing throttle time across primary shards │ 0 min │ +│ Cumulative merge time of primary shards │ 0.8019666666666667 min │ +│ Cumulative merge count of primary shards │ 449 │ +│ Min cumulative merge time across primary shards │ 0 min │ +│ Median cumulative merge time across primary shards │ 0.009525 min │ +│ Max cumulative merge time across primary shards │ 0.5432 min │ +│ Cumulative merge throttle time of primary shards │ 0.21998333333333334 min │ +│ Min cumulative merge throttle time across primary shards │ 0 min │ +│ Median cumulative merge throttle time across primary shards │ 0.0 min │ +│ Max cumulative merge throttle time across primary shards │ 0.21998333333333334 min │ +│ Cumulative refresh time of primary shards │ 0.41008333333333336 min │ +│ Cumulative refresh count of primary shards │ 14095 │ +│ Min cumulative refresh time across primary shards │ 0 min │ +│ Median cumulative refresh time across primary shards │ 0.011966666666666665 min │ +│ Max cumulative refresh time across primary shards │ 0.2056333333333333 min │ +│ Cumulative flush time of primary shards │ 8.79825 min │ +│ Cumulative flush count of primary shards │ 13859 │ +│ Min cumulative flush time across primary shards │ 6.666666666666667e-05 min │ +│ Median cumulative flush time across primary shards │ 0.45098333333333335 min │ +│ Max cumulative flush time across primary shards │ 0.6979833333333333 min │ +│ Total Young Gen GC time │ 0.646 s │ +│ Total Young Gen GC count │ 147 │ +│ Total Old Gen GC time │ 0 s │ +│ Total Old Gen GC count │ 0 │ +│ Store size │ 0.23110682144761086 GB │ +│ Translog size │ 0.5021391455084085 GB │ +│ Heap used for segments │ 0 MB │ +│ Heap used for doc values │ 0 MB │ +│ Heap used for terms │ 0 MB │ +│ Heap used for norms │ 0 MB │ +│ Heap used for points │ 0 MB │ +│ Heap used for stored fields │ 0 MB │ +│ Segment count │ 325 │ +│ Total Ingest Pipeline count │ 900063 │ +│ Total Ingest Pipeline time │ 51.994 s │ +│ Total Ingest Pipeline failed │ 0 │ +│ Min Throughput │ 23987.18 docs/s │ +│ Mean Throughput │ 46511.27 docs/s │ +│ Median Throughput │ 49360.50 docs/s │ +│ Max Throughput 
│ 51832.58 docs/s │ +│ 50th percentile latency │ 645.8979995000007 ms │ +│ 90th percentile latency │ 896.532670700001 ms │ +│ 99th percentile latency │ 1050.0004142499988 ms │ +│ 100th percentile latency │ 1064.915250000002 ms │ +│ 50th percentile service time │ 645.8979995000007 ms │ +│ 90th percentile service time │ 896.532670700001 ms │ +│ 99th percentile service time │ 1050.0004142499988 ms │ +│ 100th percentile service time │ 1064.915250000002 ms │ +│ error rate │ 0.00 % │ +╰────────────────────────────────────────────────────────────────┴───────────────────────────╯ + +--- Benchmark results for package: rally_benchmarks - END --- +Done +``` + +Finally, when you are done running the benchmark, bring down the Elastic Stack. + +``` +elastic-package stack down +``` + +## Setting up an external metricstore + +A metricstore can be set up to receive the metrics collected during the benchmark execution. + +An external metricstore might be useful for: + +- Storing monitoring data of the benchmark scenario for its entire execution time. +- Analysing the data generated during a benchmark. This is possible when using the `reindex-to-metricstore` flag. +- **TODO**: Storing benchmark results for various benchmark runs permanently for later comparison. + +In order to initialize it, you need to set the following environment variables: + +```bash +export ELASTIC_PACKAGE_ESMETRICSTORE_HOST=https://127.0.0.1:9200 +export ELASTIC_PACKAGE_ESMETRICSTORE_USERNAME=elastic +export ELASTIC_PACKAGE_ESMETRICSTORE_PASSWORD=changeme +export ELASTIC_PACKAGE_ESMETRICSTORE_CA_CERT="$HOME/.elastic-package/profiles/default/certs/ca-cert.pem" +``` + +Only `ELASTIC_PACKAGE_ESMETRICSTORE_CA_CERT` is optional. + +When these are detected, metrics will be automatically collected every second and sent to a new index called `bench-metrics-{dataset}-{testRunID}`. + +The collected metrics include the following node stats: `nodes.*.breakers`, `nodes.*.indices`, `nodes.*.jvm.mem`, `nodes.*.jvm.gc`, `nodes.*.jvm.buffer_pools`, `nodes.*.os.mem`, `nodes.*.process.cpu`, `nodes.*.thread_pool`, and `nodes.*.transport`. + +Ingest pipeline metrics are only collected at the end, since collecting them continuously would affect the benchmark results. + +You can see a sample collected metric [here](./sample_metric.json). + +Additionally, if the `reindex-to-metricstore` flag is used, the data generated during the benchmark will be sent to the metricstore into an index called `bench-reindex-{datastream}-{testRunID}` for further analysis. The events will be enriched with metadata related to the benchmark run. + +## Persisting rally tracks and dry-run + +If the `rally-track-output-dir` flag is used, the track and the corpus generated during the benchmark will be saved in the directory passed as the value of the flag. +Additionally, if the `dry-run` flag is used, the command will exit before running `esrally`. +If both flags are used at the same time, the command will just generate and save the track and corpus, without running any benchmark. +If only the `dry-run` flag is used, the command will just wipe the data stream and return no report. + +## Replaying a persisted rally track +Two files are saved in the directory specified by the `rally-track-output-dir` flag: +1. The rally track: `track-%data_stream.type%-%data_stream.dataset%-%data_stream.namespace%.json` +2. The track corpus: `corpus-%unix_timestamp%` + +Both files are required to replay the rally benchmark. The first file references the second in its content. 
+The command to run for replaying the track is the following: +```shell +rally --target-hosts='{"default":["%es_cluster_host:es_cluster_port%"]}' --track-path=%path/to/saved-track-json% --client-options='{"default":{"basic_auth_user":"%es_user%","basic_auth_password":"%es_user%","use_ssl":true,"verify_certs":false}}' --pipeline=benchmark-only +``` + +Please refer to [esrally CLI reference](https://esrally.readthedocs.io/en/stable/command_line_reference.html) for more details. + diff --git a/go.mod b/go.mod index 81d9e1a560..375285962d 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21.0 require ( github.com/AlecAivazis/survey/v2 v2.3.7 github.com/Masterminds/semver/v3 v3.2.1 - github.com/ProtonMail/gopenpgp/v2 v2.7.4 + github.com/ProtonMail/gopenpgp/v2 v2.7.3 github.com/aymerick/raymond v2.0.2+incompatible github.com/boumenot/gocover-cobertura v1.2.0 github.com/cbroglie/mustache v1.4.0 @@ -17,20 +17,20 @@ require ( github.com/elastic/go-resource v0.1.1 github.com/elastic/go-ucfg v0.8.6 github.com/elastic/package-spec/v3 v3.0.1 - github.com/fatih/color v1.16.0 + github.com/fatih/color v1.15.0 github.com/go-git/go-billy/v5 v5.5.0 - github.com/go-git/go-git/v5 v5.10.0 + github.com/go-git/go-git/v5 v5.9.0 github.com/google/go-cmp v0.6.0 github.com/google/go-github/v32 v32.1.0 github.com/google/go-querystring v1.1.0 - github.com/google/uuid v1.4.0 + github.com/google/uuid v1.3.1 github.com/jedib0t/go-pretty v4.3.0+incompatible github.com/magefile/mage v1.15.0 github.com/mholt/archiver/v3 v3.5.1 github.com/olekukonko/tablewriter v0.0.5 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 - github.com/shirou/gopsutil/v3 v3.23.10 - github.com/spf13/cobra v1.8.0 + github.com/shirou/gopsutil/v3 v3.23.9 + github.com/spf13/cobra v1.7.0 github.com/stretchr/testify v1.8.4 golang.org/x/tools v0.14.0 gopkg.in/yaml.v3 v3.0.1 @@ -110,7 +110,7 @@ require ( github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect github.com/mitchellh/copystructure v1.2.0 // indirect @@ -157,7 +157,7 @@ require ( golang.org/x/net v0.17.0 // indirect golang.org/x/oauth2 v0.12.0 // indirect golang.org/x/sync v0.4.0 // indirect - golang.org/x/sys v0.14.0 // indirect + golang.org/x/sys v0.13.0 // indirect golang.org/x/term v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect golang.org/x/time v0.3.0 // indirect diff --git a/go.sum b/go.sum index 566f1c6aff..cd50996da2 100644 --- a/go.sum +++ b/go.sum @@ -73,8 +73,8 @@ github.com/ProtonMail/go-crypto v0.0.0-20230828082145-3c4c8a2d2371 h1:kkhsdkhsCv github.com/ProtonMail/go-crypto v0.0.0-20230828082145-3c4c8a2d2371/go.mod h1:EjAoLdwvbIOoOQr3ihjnSoLZRtE8azugULFRteWMNc0= github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f h1:tCbYj7/299ekTTXpdwKYF8eBlsYsDVoggDAuAjoK66k= github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f/go.mod h1:gcr0kNtGBqin9zDW9GOHcVntrwnjrK+qdJ06mWYBybw= -github.com/ProtonMail/gopenpgp/v2 v2.7.4 h1:Vz/8+HViFFnf2A6XX8JOvZMrA6F5puwNvvF21O1mRlo= -github.com/ProtonMail/gopenpgp/v2 v2.7.4/go.mod h1:IhkNEDaxec6NyzSI0PlxapinnwPVIESk8/76da3Ct3g= +github.com/ProtonMail/gopenpgp/v2 v2.7.3 h1:AJu1OI/1UWVYZl6QcCLKGu9OTngS2r52618uGlje84I= +github.com/ProtonMail/gopenpgp/v2 v2.7.3/go.mod 
h1:IhkNEDaxec6NyzSI0PlxapinnwPVIESk8/76da3Ct3g= github.com/acomagu/bufpipe v1.0.4 h1:e3H4WUzM3npvo5uv95QuJM3cQspFNtFBzvJ2oNjKIDQ= github.com/acomagu/bufpipe v1.0.4/go.mod h1:mxdxdup/WdsKVreO5GpW4+M/1CE2sMG4jeGJ2sYmHc4= github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= @@ -110,7 +110,7 @@ github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUK github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= -github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.17/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= @@ -164,9 +164,8 @@ github.com/evanphx/json-patch/v5 v5.7.0 h1:nJqP7uwL84RJInrohHfW0Fx3awjbm8qZeFv0n github.com/evanphx/json-patch/v5 v5.7.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f h1:Wl78ApPPB2Wvf/TIe2xdyJxTlb6obmF18d8QdkxNDu4= github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f/go.mod h1:OSYXu++VVOHnXeitef/D8n/6y4QV8uLHSFXX4NeXMGc= +github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= -github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= -github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY= github.com/frankban/quicktest v1.14.4/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= @@ -180,10 +179,10 @@ github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 h1:+zs/tPmkDkHx3U66D github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376/go.mod h1:an3vInlBmSxCcxctByoQdvwPiA7DTK7jaaFDBTtu0ic= github.com/go-git/go-billy/v5 v5.5.0 h1:yEY4yhzCDuMGSv83oGxiBotRzhwhNr8VZyphhiu+mTU= github.com/go-git/go-billy/v5 v5.5.0/go.mod h1:hmexnoNsr2SJU1Ju67OaNz5ASJY3+sHgFRpCtpDCKow= -github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399 h1:eMje31YglSBqCdIqdhKBW8lokaMrL3uTkpGYlE2OOT4= -github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399/go.mod h1:1OCfN199q1Jm3HZlxleg+Dw/mwps2Wbk9frAWm+4FII= -github.com/go-git/go-git/v5 v5.10.0 h1:F0x3xXrAWmhwtzoCokU4IMPcBdncG+HAAqi9FcOOjbQ= -github.com/go-git/go-git/v5 v5.10.0/go.mod h1:1FOZ/pQnqw24ghP2n7cunVl0ON55BsjPYvhWHvZGhoo= +github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20230305113008-0c11038e723f h1:Pz0DHeFij3XFhoBRGUDPzSJ+w2UcK5/0JvF8DRI58r8= +github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20230305113008-0c11038e723f/go.mod h1:8LHG1a3SRW71ettAD/jW13h8c6AqjVSeL11RAdgaqpo= +github.com/go-git/go-git/v5 v5.9.0 h1:cD9SFA7sHVRdJ7AYck1ZaAa/yeuBvGPxwXDL8cxrObY= +github.com/go-git/go-git/v5 v5.9.0/go.mod h1:RKIqga24sWdMGZF+1Ekv9kylsDz6LzdTSI2s/OsZWE0= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod 
h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -290,8 +289,8 @@ github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3 github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= -github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= @@ -370,9 +369,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= -github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= @@ -446,8 +444,8 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= -github.com/shirou/gopsutil/v3 v3.23.10 h1:/N42opWlYzegYaVkWejXWJpbzKv2JDy3mrgGzKsh9hM= -github.com/shirou/gopsutil/v3 v3.23.10/go.mod h1:JIE26kpucQi+innVlAUnIEOSBhBUkirr5b44yr55+WE= +github.com/shirou/gopsutil/v3 v3.23.9 h1:ZI5bWVeu2ep4/DIxB4U9okeYJ7zp/QLTO4auRb/ty/E= +github.com/shirou/gopsutil/v3 v3.23.9/go.mod h1:x/NWSb71eMcjFIO0vhyGW5nZ7oSIgVjrCnADckb85GA= github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= @@ -463,8 +461,8 @@ github.com/spf13/afero v1.10.0/go.mod h1:UBogFpq8E9Hx+xc5CNTTEpTnuHVmXDwZcZcE1eb github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cast v1.5.1 h1:R+kOtfhWQE6TVQzY+4D7wJLBgkdVasCEFxSUBYBYIlA= 
github.com/spf13/cast v1.5.1/go.mod h1:b9PdjNptOpzXr7Rq1q9gJML/2cdGQAo69NKzQ10KN48= -github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= -github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= +github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= +github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= @@ -712,9 +710,9 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= -golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20220526004731-065cf7ba2467/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/internal/benchrunner/runners/common/env.go b/internal/benchrunner/runners/common/env.go new file mode 100644 index 0000000000..b084c3ed92 --- /dev/null +++ b/internal/benchrunner/runners/common/env.go @@ -0,0 +1,14 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package common + +import "github.com/elastic/elastic-package/internal/environment" + +var ( + ESMetricstoreHostEnv = environment.WithElasticPackagePrefix("ESMETRICSTORE_HOST") + ESMetricstoreUsernameEnv = environment.WithElasticPackagePrefix("ESMETRICSTORE_USERNAME") + ESMetricstorePasswordEnv = environment.WithElasticPackagePrefix("ESMETRICSTORE_PASSWORD") + ESMetricstoreCACertificateEnv = environment.WithElasticPackagePrefix("ESMETRICSTORE_CA_CERT") +) diff --git a/internal/benchrunner/runners/rally/metrics.go b/internal/benchrunner/runners/rally/metrics.go new file mode 100644 index 0000000000..11f5d405f6 --- /dev/null +++ b/internal/benchrunner/runners/rally/metrics.go @@ -0,0 +1,353 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
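+// The collector takes snapshots of node stats, data stream stats, ingest pipeline stats and disk usage before and after the rally run and, when a metricstore is configured, publishes them to a dedicated metrics index.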
+ +package rally + +import ( + "bytes" + _ "embed" + "encoding/json" + "fmt" + "io" + "sync" + "sync/atomic" + "time" + + "github.com/elastic/elastic-package/internal/elasticsearch" + "github.com/elastic/elastic-package/internal/elasticsearch/ingest" + "github.com/elastic/elastic-package/internal/logger" + "github.com/elastic/elastic-package/internal/servicedeployer" +) + +type collector struct { + ctxt servicedeployer.ServiceContext + metadata benchMeta + scenario scenario + + interval time.Duration + esAPI *elasticsearch.API + metricsAPI *elasticsearch.API + datastream string + pipelinePrefix string + + wg sync.WaitGroup + stopped atomic.Bool + stopC chan struct{} + + startIngestMetrics map[string]ingest.PipelineStatsMap + endIngestMetrics map[string]ingest.PipelineStatsMap + startMetrics metrics + endMetrics metrics + diskUsage map[string]ingest.DiskUsage + startTotalHits int + endTotalHits int +} + +type metrics struct { + ts int64 + dsMetrics *ingest.DataStreamStats + nMetrics *ingest.NodesStats +} + +type metricsSummary struct { + ClusterName string + Nodes int + RunID string + CollectionStartTs int64 + CollectionEndTs int64 + DataStreamStats *ingest.DataStreamStats + IngestPipelineStats map[string]ingest.PipelineStatsMap + DiskUsage map[string]ingest.DiskUsage + TotalHits int + NodesStats map[string]ingest.NodeStats +} + +func newCollector( + ctxt servicedeployer.ServiceContext, + benchName string, + scenario scenario, + esAPI, metricsAPI *elasticsearch.API, + interval time.Duration, + datastream, pipelinePrefix string, +) *collector { + meta := benchMeta{Parameters: scenario} + meta.Info.Benchmark = benchName + meta.Info.RunID = ctxt.Test.RunID + return &collector{ + ctxt: ctxt, + interval: interval, + scenario: scenario, + metadata: meta, + esAPI: esAPI, + metricsAPI: metricsAPI, + datastream: datastream, + pipelinePrefix: pipelinePrefix, + stopC: make(chan struct{}), + } +} + +func (c *collector) start() { + c.createMetricsIndex() + + c.collectMetricsBeforeRallyRun() + + c.wg.Add(1) + go func() { + defer c.wg.Done() + + <-c.stopC + // last collect before stopping + c.collectMetricsAfterRallyRun() + c.publish(c.createEventsFromMetrics(c.endMetrics)) + }() +} + +func (c *collector) stop() { + if !c.stopped.CompareAndSwap(false, true) { + return + } + close(c.stopC) + c.wg.Wait() +} + +func (c *collector) collectMetricsBeforeRallyRun() { + _, err := c.esAPI.Indices.Refresh(c.esAPI.Indices.Refresh.WithIndex(c.datastream)) + if err != nil { + logger.Errorf("unable to refresh data stream at the beginning of rally run") + return + } + + c.startTotalHits = c.collectTotalHits() + c.startMetrics = c.collect() + c.startIngestMetrics = c.collectIngestMetrics() + c.publish(c.createEventsFromMetrics(c.startMetrics)) +} + +func (c *collector) collect() metrics { + m := metrics{ + ts: time.Now().Unix(), + } + + nstats, err := ingest.GetNodesStats(c.esAPI) + if err != nil { + logger.Debug(err) + } else { + m.nMetrics = nstats + } + + dsstats, err := ingest.GetDataStreamStats(c.esAPI, c.datastream) + if err != nil { + logger.Debug(err) + } else { + m.dsMetrics = dsstats + } + + return m +} + +func (c *collector) publish(events [][]byte) { + if c.metricsAPI == nil { + return + } + eventsForBulk := bytes.Join(events, []byte("\n")) + reqBody := bytes.NewReader(eventsForBulk) + resp, err := c.metricsAPI.Bulk(reqBody, c.metricsAPI.Bulk.WithIndex(c.indexName())) + if err != nil { + logger.Errorf("error indexing event in metricstore: %w", err) + return + } + + if resp.Body == nil { + logger.Errorf("empty 
index response body from metricstore: %w", err) + return + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + logger.Errorf("failed to read index response body from metricstore: %w", err) + } + + resp.Body.Close() + + if resp.StatusCode != 201 { + logger.Errorf("error indexing event in metricstore (%d): %s: %v", resp.StatusCode, resp.Status(), elasticsearch.NewError(body)) + } +} + +//go:embed metrics_index.json +var metricsIndexBytes []byte + +func (c *collector) createMetricsIndex() { + if c.metricsAPI == nil { + return + } + + reader := bytes.NewReader(metricsIndexBytes) + + logger.Debugf("creating %s index in metricstore...", c.indexName()) + + createRes, err := c.metricsAPI.Indices.Create( + c.indexName(), + c.metricsAPI.Indices.Create.WithBody(reader), + ) + if err != nil { + logger.Errorf("could not create index: %w", err) + return + } + createRes.Body.Close() + + if createRes.IsError() { + logger.Errorf("got a response error while creating index") + } +} + +func (c *collector) indexName() string { + return fmt.Sprintf("bench-metrics-%s-%s", c.datastream, c.ctxt.Test.RunID) +} + +func (c *collector) summarize() (*metricsSummary, error) { + sum := metricsSummary{ + RunID: c.ctxt.Test.RunID, + IngestPipelineStats: make(map[string]ingest.PipelineStatsMap), + NodesStats: make(map[string]ingest.NodeStats), + DiskUsage: c.diskUsage, + TotalHits: c.endTotalHits - c.startTotalHits, + } + + if c.startMetrics.nMetrics != nil { + sum.ClusterName = c.startMetrics.nMetrics.ClusterName + } + sum.CollectionStartTs = c.startMetrics.ts + sum.CollectionEndTs = c.endMetrics.ts + if c.endMetrics.dsMetrics != nil { + sum.DataStreamStats = c.endMetrics.dsMetrics + } + if c.endMetrics.nMetrics != nil { + sum.Nodes = len(c.endMetrics.nMetrics.Nodes) + } + + for node, endPStats := range c.endIngestMetrics { + startPStats, found := c.startIngestMetrics[node] + if !found { + logger.Debugf("node %s not found in initial metrics", node) + continue + } + sumStats := make(ingest.PipelineStatsMap) + for pname, endStats := range endPStats { + startStats, found := startPStats[pname] + if !found { + logger.Debugf("pipeline %s not found in node %s initial metrics", pname, node) + continue + } + sumStats[pname] = ingest.PipelineStats{ + StatsRecord: ingest.StatsRecord{ + Count: endStats.Count - startStats.Count, + Failed: endStats.Failed - startStats.Failed, + TimeInMillis: endStats.TimeInMillis - startStats.TimeInMillis, + }, + Processors: make([]ingest.ProcessorStats, len(endStats.Processors)), + } + for i, endPr := range endStats.Processors { + startPr := startStats.Processors[i] + sumStats[pname].Processors[i] = ingest.ProcessorStats{ + Type: endPr.Type, + Extra: endPr.Extra, + Conditional: endPr.Conditional, + Stats: ingest.StatsRecord{ + Count: endPr.Stats.Count - startPr.Stats.Count, + Failed: endPr.Stats.Failed - startPr.Stats.Failed, + TimeInMillis: endPr.Stats.TimeInMillis - startPr.Stats.TimeInMillis, + }, + } + } + } + sum.IngestPipelineStats[node] = sumStats + } + + return &sum, nil +} + +func (c *collector) collectIngestMetrics() map[string]ingest.PipelineStatsMap { + ipMetrics, err := ingest.GetPipelineStatsByPrefix(c.esAPI, c.pipelinePrefix) + if err != nil { + logger.Debugf("could not get ingest pipeline metrics: %v", err) + return nil + } + return ipMetrics +} + +func (c *collector) collectDiskUsage() map[string]ingest.DiskUsage { + du, err := ingest.GetDiskUsage(c.esAPI, c.datastream) + if err != nil { + logger.Debugf("could not get disk usage metrics: %v", err) + return nil + } + return du 
+} + +func (c *collector) collectMetricsAfterRallyRun() { + _, err := c.esAPI.Indices.Refresh(c.esAPI.Indices.Refresh.WithIndex(c.datastream)) + if err != nil { + logger.Errorf("unable to refresh data stream at the end of rally run") + return + } + + c.diskUsage = c.collectDiskUsage() + c.endMetrics = c.collect() + c.endIngestMetrics = c.collectIngestMetrics() + c.endTotalHits = c.collectTotalHits() + + c.publish(c.createEventsFromMetrics(c.endMetrics)) +} + +func (c *collector) collectTotalHits() int { + totalHits, err := getTotalHits(c.esAPI, c.datastream) + if err != nil { + logger.Debugf("could not get total hits: %v", err) + } + return totalHits +} + +func (c *collector) createEventsFromMetrics(m metrics) [][]byte { + dsEvent := struct { + Timestamp int64 `json:"@timestamp"` + *ingest.DataStreamStats + Meta benchMeta `json:"benchmark_metadata"` + }{ + Timestamp: m.ts * 1000, // s to ms + DataStreamStats: m.dsMetrics, + Meta: c.metadata, + } + + type nEvent struct { + Ts int64 `json:"@timestamp"` + ClusterName string `json:"cluster_name"` + NodeName string `json:"node_name"` + *ingest.NodeStats + Meta benchMeta `json:"benchmark_metadata"` + } + + var nEvents []interface{} + + for node, stats := range m.nMetrics.Nodes { + nEvents = append(nEvents, nEvent{ + Ts: m.ts * 1000, // s to ms + ClusterName: m.nMetrics.ClusterName, + NodeName: node, + NodeStats: &stats, + Meta: c.metadata, + }) + } + + var events [][]byte + for _, e := range append(nEvents, dsEvent) { + b, err := json.Marshal(e) + if err != nil { + logger.Debugf("error marshalling metrics event: %v", err) + continue + } + events = append(events, b) + } + return events +} diff --git a/internal/benchrunner/runners/rally/metrics_index.json b/internal/benchrunner/runners/rally/metrics_index.json new file mode 100644 index 0000000000..5d4e724da0 --- /dev/null +++ b/internal/benchrunner/runners/rally/metrics_index.json @@ -0,0 +1,24 @@ +{ + "settings": { + "number_of_replicas": 0 + }, + "mappings": { + "dynamic_templates": [ + { + "strings_as_keyword": { + "match_mapping_type": "string", + "mapping": { + "ignore_above": 1024, + "type": "keyword" + } + } + } + ], + "date_detection": false, + "properties": { + "@timestamp": { + "type": "date" + } + } + } +} \ No newline at end of file diff --git a/internal/benchrunner/runners/rally/options.go b/internal/benchrunner/runners/rally/options.go new file mode 100644 index 0000000000..02aa8e6d7d --- /dev/null +++ b/internal/benchrunner/runners/rally/options.go @@ -0,0 +1,116 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package rally + +import ( + "time" + + "github.com/elastic/elastic-package/internal/elasticsearch" + "github.com/elastic/elastic-package/internal/kibana" + "github.com/elastic/elastic-package/internal/profile" +) + +// Options contains benchmark runner options. 
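+// Options are built with NewOptions and the With* functional options below, and are passed to NewRallyBenchmark.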
+type Options struct { + ESAPI *elasticsearch.API + KibanaClient *kibana.Client + DeferCleanup time.Duration + MetricsInterval time.Duration + ReindexData bool + ESMetricsAPI *elasticsearch.API + BenchName string + PackageRootPath string + Variant string + Profile *profile.Profile + RallyTrackOutputDir string + DryRun bool +} + +type ClientOptions struct { + Host string + Username string + Password string +} +type OptionFunc func(*Options) + +func NewOptions(fns ...OptionFunc) Options { + var opts Options + for _, fn := range fns { + fn(&opts) + } + return opts +} + +func WithESAPI(api *elasticsearch.API) OptionFunc { + return func(opts *Options) { + opts.ESAPI = api + } +} + +func WithKibanaClient(c *kibana.Client) OptionFunc { + return func(opts *Options) { + opts.KibanaClient = c + } +} + +func WithPackageRootPath(path string) OptionFunc { + return func(opts *Options) { + opts.PackageRootPath = path + } +} + +func WithBenchmarkName(name string) OptionFunc { + return func(opts *Options) { + opts.BenchName = name + } +} + +func WithDeferCleanup(d time.Duration) OptionFunc { + return func(opts *Options) { + opts.DeferCleanup = d + } +} + +func WithMetricsInterval(d time.Duration) OptionFunc { + return func(opts *Options) { + opts.MetricsInterval = d + } +} + +func WithDataReindexing(b bool) OptionFunc { + return func(opts *Options) { + opts.ReindexData = b + } +} + +func WithESMetricsAPI(api *elasticsearch.API) OptionFunc { + return func(opts *Options) { + opts.ESMetricsAPI = api + } +} + +func WithVariant(name string) OptionFunc { + return func(opts *Options) { + opts.Variant = name + } +} + +func WithProfile(p *profile.Profile) OptionFunc { + return func(opts *Options) { + opts.Profile = p + } +} + +func WithRallyTrackOutputDir(r string) OptionFunc { + return func(opts *Options) { + opts.RallyTrackOutputDir = r + } +} + +func WithRallyDryRun(d bool) OptionFunc { + return func(opts *Options) { + opts.DryRun = d + } +} diff --git a/internal/benchrunner/runners/rally/report.go b/internal/benchrunner/runners/rally/report.go new file mode 100644 index 0000000000..2e3bbc811c --- /dev/null +++ b/internal/benchrunner/runners/rally/report.go @@ -0,0 +1,220 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
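+// The report combines the scenario parameters, the collected metrics summary and the parsed rally stats, and is rendered both as human-readable tables and as a JSON file report.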
+ +package rally + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/dustin/go-humanize" + "github.com/jedib0t/go-pretty/table" + "github.com/jedib0t/go-pretty/text" + + "github.com/elastic/elastic-package/internal/benchrunner/reporters" + "github.com/elastic/elastic-package/internal/elasticsearch/ingest" +) + +type report struct { + Info struct { + Benchmark string + Description string + RunID string + Package string + StartTs int64 + EndTs int64 + Duration time.Duration + GeneratedCorporaFile string + } + Parameters struct { + PackageVersion string + DataStream dataStream + Corpora corpora + } + ClusterName string + Nodes int + DataStreamStats *ingest.DataStreamStats + IngestPipelineStats map[string]ingest.PipelineStatsMap + DiskUsage map[string]ingest.DiskUsage + TotalHits int + RallyStats []rallyStat +} + +func createReport(benchName, corporaFile string, s *scenario, sum *metricsSummary, stats []rallyStat) (reporters.Reportable, error) { + r := newReport(benchName, corporaFile, s, sum, stats) + human := reporters.NewReport(s.Package, reportHumanFormat(r)) + + jsonBytes, err := reportJSONFormat(r) + if err != nil { + return nil, fmt.Errorf("rendering JSON report: %w", err) + } + + jsonFile := reporters.NewFileReport(s.Package, fmt.Sprintf("system/%s/report.json", sum.RunID), jsonBytes) + + mr := reporters.NewMultiReport(s.Package, []reporters.Reportable{human, jsonFile}) + + return mr, nil +} + +func newReport(benchName, corporaFile string, s *scenario, sum *metricsSummary, stats []rallyStat) *report { + var report report + report.Info.Benchmark = benchName + report.Info.Description = s.Description + report.Info.RunID = sum.RunID + report.Info.Package = s.Package + report.Info.StartTs = sum.CollectionStartTs + report.Info.EndTs = sum.CollectionEndTs + report.Info.Duration = time.Duration(sum.CollectionEndTs-sum.CollectionStartTs) * time.Second + report.Info.GeneratedCorporaFile = corporaFile + report.Parameters.PackageVersion = s.Version + report.Parameters.DataStream = s.DataStream + report.Parameters.Corpora = s.Corpora + report.ClusterName = sum.ClusterName + report.Nodes = sum.Nodes + report.DataStreamStats = sum.DataStreamStats + report.IngestPipelineStats = sum.IngestPipelineStats + report.DiskUsage = sum.DiskUsage + report.TotalHits = sum.TotalHits + report.RallyStats = stats + return &report +} + +func reportJSONFormat(r *report) ([]byte, error) { + b, err := json.MarshalIndent(r, "", "\t") + if err != nil { + return nil, err + } + return b, nil +} + +func reportHumanFormat(r *report) []byte { + var report strings.Builder + report.WriteString(renderBenchmarkTable( + "info", + "benchmark", r.Info.Benchmark, + "description", r.Info.Description, + "run ID", r.Info.RunID, + "package", r.Info.Package, + "start ts (s)", r.Info.StartTs, + "end ts (s)", r.Info.EndTs, + "duration", r.Info.Duration, + "generated corpora file", r.Info.GeneratedCorporaFile, + ) + "\n") + + pkvs := []interface{}{ + "package version", r.Parameters.PackageVersion, + } + + pkvs = append(pkvs, "data_stream.name", r.Parameters.DataStream.Name) + + if r.Parameters.Corpora.Generator != nil { + pkvs = append(pkvs, + "corpora.generator.total_events", r.Parameters.Corpora.Generator.TotalEvents, + "corpora.generator.template.path", r.Parameters.Corpora.Generator.Template.Path, + "corpora.generator.template.raw", r.Parameters.Corpora.Generator.Template.Raw, + "corpora.generator.template.type", r.Parameters.Corpora.Generator.Template.Type, + "corpora.generator.config.path", 
r.Parameters.Corpora.Generator.Config.Path, + "corpora.generator.config.raw", r.Parameters.Corpora.Generator.Config.Raw, + "corpora.generator.fields.path", r.Parameters.Corpora.Generator.Fields.Path, + "corpora.generator.fields.raw", r.Parameters.Corpora.Generator.Fields.Raw, + ) + } + + report.WriteString(renderBenchmarkTable("parameters", pkvs...) + "\n") + + report.WriteString(renderBenchmarkTable( + "cluster info", + "name", r.ClusterName, + "nodes", r.Nodes, + ) + "\n") + + report.WriteString(renderBenchmarkTable( + "data stream stats", + "data stream", r.DataStreamStats.DataStream, + "approx total docs ingested", r.TotalHits, + "backing indices", r.DataStreamStats.BackingIndices, + "store size bytes", r.DataStreamStats.StoreSizeBytes, + "maximum ts (ms)", r.DataStreamStats.MaximumTimestamp, + ) + "\n") + + for index, du := range r.DiskUsage { + adu := du.AllFields + report.WriteString(renderBenchmarkTable( + fmt.Sprintf("disk usage for index %s (for all fields)", index), + "total", humanize.Bytes(adu.TotalInBytes), + "inverted_index.total", humanize.Bytes(adu.InvertedIndex.TotalInBytes), + "inverted_index.stored_fields", humanize.Bytes(adu.StoredFieldsInBytes), + "inverted_index.doc_values", humanize.Bytes(adu.DocValuesInBytes), + "inverted_index.points", humanize.Bytes(adu.PointsInBytes), + "inverted_index.norms", humanize.Bytes(adu.NormsInBytes), + "inverted_index.term_vectors", humanize.Bytes(adu.TermVectorsInBytes), + "inverted_index.knn_vectors", humanize.Bytes(adu.KnnVectorsInBytes), + ) + "\n") + } + + for node, pStats := range r.IngestPipelineStats { + for pipeline, stats := range pStats { + if stats.Count == 0 { + continue + } + kvs := []interface{}{ + "Totals", + fmt.Sprintf( + "Count: %d | Failed: %d | Time: %s", + stats.Count, + stats.Failed, + time.Duration(stats.TimeInMillis)*time.Millisecond, + ), + } + for _, procStats := range stats.Processors { + str := fmt.Sprintf( + "Count: %d | Failed: %d | Time: %s", + procStats.Stats.Count, + procStats.Stats.Failed, + time.Duration(procStats.Stats.TimeInMillis)*time.Millisecond, + ) + kvs = append(kvs, fmt.Sprintf("%s (%s)", procStats.Type, procStats.Extra), str) + } + report.WriteString(renderBenchmarkTable( + fmt.Sprintf("pipeline %s stats in node %s", pipeline, node), + kvs..., + ) + "\n") + } + } + + rsvs := make([]interface{}, 0, len(r.RallyStats)) + for _, rs := range r.RallyStats { + if rs.Metric == "Metric" { // let's skip the header + continue + } + + value := rs.Value + if len(rs.Unit) > 0 { + value = fmt.Sprintf("%v %s", rs.Value, rs.Unit) + } + rsvs = append(rsvs, rs.Metric, value) + } + + report.WriteString(renderBenchmarkTable("rally stats", rsvs...) + "\n") + + return []byte(report.String()) +} + +func renderBenchmarkTable(title string, kv ...interface{}) string { + t := table.NewWriter() + t.SetStyle(table.StyleRounded) + t.SetTitle(title) + t.SetColumnConfigs([]table.ColumnConfig{ + { + Number: 2, + Align: text.AlignRight, + }, + }) + for i := 0; i < len(kv)-1; i += 2 { + t.AppendRow(table.Row{kv[i], kv[i+1]}) + } + return t.Render() +} diff --git a/internal/benchrunner/runners/rally/runner.go b/internal/benchrunner/runners/rally/runner.go new file mode 100644 index 0000000000..1fae90bc92 --- /dev/null +++ b/internal/benchrunner/runners/rally/runner.go @@ -0,0 +1,873 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
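+// The rally runner installs the package, generates a corpus for the configured data stream, runs an esrally track against it, and collects the metrics used to build the final report.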
+ +package rally + +import ( + "bytes" + "context" + "encoding/csv" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + "github.com/elastic/elastic-package/internal/packages/installer" + + "github.com/magefile/mage/sh" + + "github.com/elastic/elastic-package/internal/corpusgenerator" + "github.com/elastic/elastic-package/internal/stack" + + "github.com/google/uuid" + "gopkg.in/yaml.v3" + + "github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib" + "github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib/config" + "github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib/fields" + + "github.com/elastic/elastic-package/internal/benchrunner" + "github.com/elastic/elastic-package/internal/benchrunner/reporters" + "github.com/elastic/elastic-package/internal/configuration/locations" + "github.com/elastic/elastic-package/internal/elasticsearch" + "github.com/elastic/elastic-package/internal/logger" + "github.com/elastic/elastic-package/internal/multierror" + "github.com/elastic/elastic-package/internal/packages" + "github.com/elastic/elastic-package/internal/servicedeployer" + "github.com/elastic/elastic-package/internal/signal" +) + +const ( + // RallyCorpusAgentDir is folder path where rally corporsa files produced by the service + // are stored on the Rally container's filesystem. + RallyCorpusAgentDir = "/tmp/rally_corpus" + + // BenchType defining rally benchmark + BenchType benchrunner.Type = "rally" +) + +var ErrDryRun = errors.New("dry run: rally benchmark not executed") + +type rallyStat struct { + Metric string + Task string + Value any + Unit string +} + +type runner struct { + options Options + scenario *scenario + + ctxt servicedeployer.ServiceContext + runtimeDataStream string + pipelinePrefix string + generator genlib.Generator + mcollector *collector + + corpusFile string + trackFile string + reportFile string + + // Execution order of following handlers is defined in runner.TearDown() method. 
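+	// A handler left nil is skipped; TearDown resets each handler to nil after it has been executed.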
+ persistRallyTrackHandler func() error + removePackageHandler func() error + wipeDataStreamHandler func() error + clearCorporaHandler func() error +} + +func NewRallyBenchmark(opts Options) benchrunner.Runner { + return &runner{options: opts} +} + +func (r *runner) SetUp() error { + return r.setUp() +} + +// Run runs the rally benchmarks defined under the given folder +func (r *runner) Run() (reporters.Reportable, error) { + return r.run() +} + +func (r *runner) TearDown() error { + if r.options.DeferCleanup > 0 { + logger.Debugf("waiting for %s before tearing down...", r.options.DeferCleanup) + signal.Sleep(r.options.DeferCleanup) + } + + var merr multierror.Error + + if r.persistRallyTrackHandler != nil { + if err := r.persistRallyTrackHandler(); err != nil { + merr = append(merr, err) + } + r.persistRallyTrackHandler = nil + } + + if r.removePackageHandler != nil { + if err := r.removePackageHandler(); err != nil { + merr = append(merr, err) + } + r.removePackageHandler = nil + } + + if r.wipeDataStreamHandler != nil { + if err := r.wipeDataStreamHandler(); err != nil { + merr = append(merr, err) + } + r.wipeDataStreamHandler = nil + } + + if r.clearCorporaHandler != nil { + if err := r.clearCorporaHandler(); err != nil { + merr = append(merr, err) + } + r.clearCorporaHandler = nil + } + + if len(merr) == 0 { + return nil + } + return merr +} + +func (r *runner) createRallyTrackDir(locationManager *locations.LocationManager) error { + outputDir := filepath.Join(locationManager.RallyCorpusDir(), r.ctxt.Test.RunID) + if err := os.MkdirAll(outputDir, 0755); err != nil { + return fmt.Errorf("failed to create output directory: %w", err) + } + return nil +} + +func (r *runner) setUp() error { + locationManager, err := locations.NewLocationManager() + if err != nil { + return fmt.Errorf("reading service logs directory failed: %w", err) + } + + rallyCorpusDir := locationManager.RallyCorpusDir() + r.ctxt.Logs.Folder.Local = rallyCorpusDir + r.ctxt.Logs.Folder.Agent = RallyCorpusAgentDir + r.ctxt.Test.RunID = createRunID() + + outputDir, err := servicedeployer.CreateOutputDir(locationManager, r.ctxt.Test.RunID) + if err != nil { + return fmt.Errorf("could not create output dir for terraform deployer: %w", err) + } + r.ctxt.OutputDir = outputDir + + err = r.createRallyTrackDir(locationManager) + if err != nil { + return fmt.Errorf("could not create local rally track dir: %w", err) + } + + pkgManifest, err := packages.ReadPackageManifestFromPackageRoot(r.options.PackageRootPath) + if err != nil { + return fmt.Errorf("reading package manifest failed: %w", err) + } + + scenario, err := readConfig(r.options.PackageRootPath, r.options.BenchName, pkgManifest.Name, pkgManifest.Version) + if err != nil { + return err + } + r.scenario = scenario + + err = r.installPackage() + if err != nil { + return fmt.Errorf("error installing package: %w", err) + } + + if r.scenario.Corpora.Generator != nil { + var err error + r.generator, err = r.initializeGenerator() + if err != nil { + return fmt.Errorf("can't initialize generator: %w", err) + } + } + + dataStreamManifest, err := packages.ReadDataStreamManifest( + filepath.Join( + getDataStreamPath(r.options.PackageRootPath, r.scenario.DataStream.Name), + packages.DataStreamManifestFile, + ), + ) + if err != nil { + return fmt.Errorf("reading data stream manifest failed: %w", err) + } + + r.runtimeDataStream = fmt.Sprintf( + "%s-%s.%s-ep", + dataStreamManifest.Type, + pkgManifest.Name, + dataStreamManifest.Name, + ) + r.pipelinePrefix = fmt.Sprintf( + "%s-%s.%s-%s",
+ dataStreamManifest.Type, + pkgManifest.Name, + dataStreamManifest.Name, + r.scenario.Version, + ) + + if err := r.wipeDataStreamOnSetup(); err != nil { + return fmt.Errorf("error deleting old data in data stream: %s: %w", r.runtimeDataStream, err) + } + + cleared, err := waitUntilTrue(func() (bool, error) { + if signal.SIGINT() { + return true, errors.New("SIGINT: cancel clearing data") + } + + hits, err := getTotalHits(r.options.ESAPI, r.runtimeDataStream) + return hits == 0, err + }, 2*time.Minute) + if err != nil || !cleared { + if err == nil { + err = errors.New("unable to clear previous data") + } + return err + } + + return nil +} + +func (r *runner) wipeDataStreamOnSetup() error { + // Delete old data + logger.Debug("deleting old data in data stream...") + r.wipeDataStreamHandler = func() error { + logger.Debugf("deleting data in data stream...") + if err := r.deleteDataStreamDocs(r.runtimeDataStream); err != nil { + return fmt.Errorf("error deleting data in data stream: %w", err) + } + return nil + } + + return r.deleteDataStreamDocs(r.runtimeDataStream) +} + +func (r *runner) run() (report reporters.Reportable, err error) { + r.startMetricsCollection() + defer r.mcollector.stop() + + // if there is a generator config, generate the data + if r.generator != nil { + logger.Debugf("generating corpus data to %s...", r.ctxt.Logs.Folder.Local) + if err := r.runGenerator(r.ctxt.Logs.Folder.Local); err != nil { + return nil, fmt.Errorf("can't generate benchmarks data corpus for data stream: %w", err) + } + } + + if r.options.DryRun { + dummy := reporters.NewReport(r.scenario.Package, nil) + return dummy, ErrDryRun + } + + rallyStats, err := r.runRally() + if err != nil { + return nil, err + } + + msum, err := r.collectAndSummarizeMetrics() + if err != nil { + return nil, fmt.Errorf("can't summarize metrics: %w", err) + } + + if err := r.reindexData(); err != nil { + return nil, fmt.Errorf("can't reindex data: %w", err) + } + + return createReport(r.options.BenchName, r.corpusFile, r.scenario, msum, rallyStats) +} + +func (r *runner) installPackage() error { + logger.Debug("Installing package...") + installer, err := installer.NewForPackage(installer.Options{ + Kibana: r.options.KibanaClient, + RootPath: r.options.PackageRootPath, + SkipValidation: true, + }) + + if err != nil { + return fmt.Errorf("failed to initialize package installer: %w", err) + } + + _, err = installer.Install() + if err != nil { + return fmt.Errorf("failed to install package: %w", err) + } + + r.removePackageHandler = func() error { + if err := installer.Uninstall(); err != nil { + return fmt.Errorf("error removing benchmark package: %w", err) + } + + return nil + } + + return nil +} + +func (r *runner) startMetricsCollection() { + // TODO collect agent hosts metrics using system integration + r.mcollector = newCollector( + r.ctxt, + r.options.BenchName, + *r.scenario, + r.options.ESAPI, + r.options.ESMetricsAPI, + r.options.MetricsInterval, + r.runtimeDataStream, + r.pipelinePrefix, + ) + r.mcollector.start() +} + +func (r *runner) collectAndSummarizeMetrics() (*metricsSummary, error) { + r.mcollector.stop() + sum, err := r.mcollector.summarize() + return sum, err +} + +func (r *runner) deleteDataStreamDocs(dataStream string) error { + body := strings.NewReader(`{ "query": { "match_all": {} } }`) + _, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body) + if err != nil { + return err + } + return nil +} + +func (r *runner) initializeGenerator() (genlib.Generator, error) { + totEvents :=
r.scenario.Corpora.Generator.TotalEvents + + config, err := r.getGeneratorConfig() + if err != nil { + return nil, err + } + + fields, err := r.getGeneratorFields() + if err != nil { + return nil, err + } + + tpl, err := r.getGeneratorTemplate() + if err != nil { + return nil, err + } + + genlib.InitGeneratorTimeNow(time.Now()) + genlib.InitGeneratorRandSeed(time.Now().UnixNano()) + + var generator genlib.Generator + switch r.scenario.Corpora.Generator.Template.Type { + default: + logger.Debugf("unknown generator template type %q, defaulting to \"placeholder\"", r.scenario.Corpora.Generator.Template.Type) + fallthrough + case "", "placeholder": + generator, err = genlib.NewGeneratorWithCustomTemplate(tpl, *config, fields, totEvents) + case "gotext": + generator, err = genlib.NewGeneratorWithTextTemplate(tpl, *config, fields, totEvents) + } + + if err != nil { + return nil, err + } + + return generator, nil +} + +func (r *runner) getGeneratorConfig() (*config.Config, error) { + var ( + data []byte + err error + ) + + if r.scenario.Corpora.Generator.Config.Path != "" { + configPath := filepath.Clean(filepath.Join(devPath, r.scenario.Corpora.Generator.Config.Path)) + configPath = os.ExpandEnv(configPath) + if _, err := os.Stat(configPath); err != nil { + return nil, fmt.Errorf("can't find config file %s: %w", configPath, err) + } + data, err = os.ReadFile(configPath) + if err != nil { + return nil, fmt.Errorf("can't read config file %s: %w", configPath, err) + } + } else if len(r.scenario.Corpora.Generator.Config.Raw) > 0 { + data, err = yaml.Marshal(r.scenario.Corpora.Generator.Config.Raw) + if err != nil { + return nil, fmt.Errorf("can't parse raw generator config: %w", err) + } + } + + cfg, err := config.LoadConfigFromYaml(data) + if err != nil { + return nil, fmt.Errorf("can't get generator config: %w", err) + } + + return &cfg, nil +} + +func (r *runner) getGeneratorFields() (fields.Fields, error) { + var ( + data []byte + err error + ) + + if r.scenario.Corpora.Generator.Fields.Path != "" { + fieldsPath := filepath.Clean(filepath.Join(devPath, r.scenario.Corpora.Generator.Fields.Path)) + fieldsPath = os.ExpandEnv(fieldsPath) + if _, err := os.Stat(fieldsPath); err != nil { + return nil, fmt.Errorf("can't find fields file %s: %w", fieldsPath, err) + } + + data, err = os.ReadFile(fieldsPath) + if err != nil { + return nil, fmt.Errorf("can't read fields file %s: %w", fieldsPath, err) + } + } else if len(r.scenario.Corpora.Generator.Fields.Raw) > 0 { + data, err = yaml.Marshal(r.scenario.Corpora.Generator.Fields.Raw) + if err != nil { + return nil, fmt.Errorf("can't parse raw generator fields: %w", err) + } + } + + fields, err := fields.LoadFieldsWithTemplateFromString(context.Background(), string(data)) + if err != nil { + return nil, fmt.Errorf("could not load fields yaml: %w", err) + } + + return fields, nil +} + +func (r *runner) getGeneratorTemplate() ([]byte, error) { + var ( + data []byte + err error + ) + + if r.scenario.Corpora.Generator.Template.Path != "" { + tplPath := filepath.Clean(filepath.Join(devPath, r.scenario.Corpora.Generator.Template.Path)) + tplPath = os.ExpandEnv(tplPath) + if _, err := os.Stat(tplPath); err != nil { + return nil, fmt.Errorf("can't find template file %s: %w", tplPath, err) + } + + data, err = os.ReadFile(tplPath) + if err != nil { + return nil, fmt.Errorf("can't read template file %s: %w", tplPath, err) + } + } else if len(r.scenario.Corpora.Generator.Template.Raw) > 0 { + data = []byte(r.scenario.Corpora.Generator.Template.Raw) + } + + return data, 
nil +} + +func (r *runner) runGenerator(destDir string) error { + corpusFile, err := os.CreateTemp(destDir, "corpus-*") + if err != nil { + return fmt.Errorf("cannot create rally corpus file: %w", err) + } + defer corpusFile.Close() + + if err := corpusFile.Chmod(os.ModePerm); err != nil { + return fmt.Errorf("cannot set permissions on rally corpus file: %w", err) + } + + buf := bytes.NewBufferString("") + var corpusDocsCount uint64 + for { + err := r.generator.Emit(buf) + if err == io.EOF { + break + } + + if err != nil { + return fmt.Errorf("error while generating content for the rally corpus file: %w", err) + } + + // TODO: this should be taken care of by the corpus generator tool; once it is, remove this + event := strings.Replace(buf.String(), "\n", "", -1) + if _, err = corpusFile.Write([]byte(event)); err != nil { + return fmt.Errorf("error while saving content to the rally corpus file: %w", err) + } + + if _, err = corpusFile.Write([]byte("\n")); err != nil { + return fmt.Errorf("error while saving newline to the rally corpus file: %w", err) + } + + buf.Reset() + corpusDocsCount += 1 + } + + r.corpusFile = corpusFile.Name() + + trackFile, err := os.CreateTemp(destDir, "track-*.json") + if err != nil { + return fmt.Errorf("cannot create rally track file: %w", err) + } + r.trackFile = trackFile.Name() + rallyTrackContent, err := corpusgenerator.GenerateRallyTrack(r.runtimeDataStream, corpusFile, corpusDocsCount) + if err != nil { + return fmt.Errorf("cannot generate rally track content: %w", err) + } + err = os.WriteFile(r.trackFile, rallyTrackContent, os.ModePerm) + if err != nil { + return fmt.Errorf("cannot save rally track content to file: %w", err) + } + defer trackFile.Close() + + reportFile, err := os.CreateTemp(destDir, "report-*.csv") + if err != nil { + return fmt.Errorf("cannot create rally report file: %w", err) + } + defer reportFile.Close() + + r.reportFile = reportFile.Name() + + if r.options.RallyTrackOutputDir != "" { + r.persistRallyTrackHandler = func() error { + err := os.MkdirAll(r.options.RallyTrackOutputDir, os.ModeDir) + if err != nil { + return fmt.Errorf("cannot create rally track output dir: %w", err) + } + + persistedRallyTrack := filepath.Join(r.options.RallyTrackOutputDir, fmt.Sprintf("track-%s.json", r.runtimeDataStream)) + err = sh.Copy(persistedRallyTrack, trackFile.Name()) + if err != nil { + return fmt.Errorf("cannot copy rally track to file in output dir: %w", err) + } + + persistedCorpus := filepath.Join(r.options.RallyTrackOutputDir, filepath.Base(corpusFile.Name())) + err = sh.Copy(persistedCorpus, corpusFile.Name()) + if err != nil { + err = fmt.Errorf("cannot copy rally corpus to file in output dir: %w", err) + return errors.Join(os.Remove(persistedRallyTrack), err) + } + + logger.Infof("rally track and corpus saved at: %s", r.options.RallyTrackOutputDir) + return nil + } + } + + r.clearCorporaHandler = func() error { + return errors.Join( + os.Remove(r.corpusFile), + os.Remove(r.reportFile), + os.Remove(r.trackFile), + ) + } + + return r.generator.Close() +} + +func (r *runner) runRally() ([]rallyStat, error) { + logger.Debug("running rally...") + profileConfig, err := stack.StackInitConfig(r.options.Profile) + if err != nil { + return nil, fmt.Errorf("failed to load config from profile: %w", err) + } + + elasticsearchHost, found := os.LookupEnv(stack.ElasticsearchHostEnv) + if !found { + status, err := stack.Status(stack.Options{Profile: r.options.Profile}) + if err != nil { + return nil,
fmt.Errorf("failed to check status of stack in current profile: %w", err) + } + if len(status) == 0 { + return nil, stack.ErrUnavailableStack + } + + elasticsearchHost = profileConfig.ElasticsearchHostPort + logger.Debugf("Configuring rally with Elasticsearch host from current profile (profile: %s, host: %q)", r.options.Profile.ProfileName, elasticsearchHost) + } + + elasticsearchPassword, found := os.LookupEnv(stack.ElasticsearchPasswordEnv) + if !found { + elasticsearchPassword = profileConfig.ElasticsearchPassword + } + elasticsearchUsername, found := os.LookupEnv(stack.ElasticsearchUsernameEnv) + if !found { + elasticsearchUsername = profileConfig.ElasticsearchUsername + } + + _, err = exec.LookPath("esrally") + if err != nil { + return nil, errors.New("could not run esrally: esrally not found in PATH, please follow the instructions at https://esrally.readthedocs.io/en/stable/install.html") + } + + cmd := exec.Command( + "esrally", + "race", + "--race-id="+r.ctxt.Test.RunID, + "--report-format=csv", + fmt.Sprintf(`--report-file=%s`, r.reportFile), + fmt.Sprintf(`--target-hosts={"default":["%s"]}`, elasticsearchHost), + fmt.Sprintf(`--track-path=%s`, r.trackFile), + fmt.Sprintf(`--client-options={"default":{"basic_auth_user":"%s","basic_auth_password":"%s","use_ssl":true,"verify_certs":false}}`, elasticsearchUsername, elasticsearchPassword), + "--pipeline=benchmark-only", + ) + errOutput := new(bytes.Buffer) + cmd.Stderr = errOutput + + logger.Debugf("running command: %s", cmd) + output, err := cmd.Output() + if err != nil { + return nil, fmt.Errorf("could not run esrally track in path: %s (stdout=%s): %w", r.ctxt.Logs.Folder.Local, output, err) + } + + reportCSV, err := os.Open(r.reportFile) + if err != nil { + return nil, fmt.Errorf("could not open esrally report in path: %s (stderr=%q): %w", r.ctxt.Logs.Folder.Local, errOutput.String(), err) + } + + reader := csv.NewReader(reportCSV) + + stats := make([]rallyStat, 0) + for { + record, err := reader.Read() + if err == io.EOF { + break + } + if err != nil { + return nil, fmt.Errorf("could not read esrally report in path: %s (stderr=%q): %w", r.ctxt.Logs.Folder.Local, errOutput.String(), err) + } + + stats = append(stats, rallyStat{Metric: record[0], Task: record[1], Value: record[2], Unit: record[3]}) + } + + return stats, nil +} + +// reindexData will read all data generated during the benchmark and will reindex it to the metricstore +func (r *runner) reindexData() error { + if !r.options.ReindexData { + return nil + } + if r.options.ESMetricsAPI == nil { + return errors.New("the option to reindex data is set, but the metricstore was not initialized") + } + + logger.Debug("starting reindexing of data...") + + logger.Debug("getting original mappings...") + // Get the mapping from the source data stream + mappingRes, err := r.options.ESAPI.Indices.GetMapping( + r.options.ESAPI.Indices.GetMapping.WithIndex(r.runtimeDataStream), + ) + if err != nil { + return fmt.Errorf("error getting mapping: %w", err) + } + defer mappingRes.Body.Close() + + body, err := io.ReadAll(mappingRes.Body) + if err != nil { + return fmt.Errorf("error reading mapping body: %w", err) + } + + mappings := map[string]struct { + Mappings json.RawMessage + }{} + + if err := json.Unmarshal(body, &mappings); err != nil { + return fmt.Errorf("error unmarshaling mappings: %w", err) + } + + if len(mappings) != 1 { + return fmt.Errorf("exactly 1 mapping was expected, got %d", len(mappings)) + } + + var mapping string + for _, v := range mappings { + mapping =
string(v.Mappings) + } + + reader := bytes.NewReader( + []byte(fmt.Sprintf(`{ + "settings": {"number_of_replicas":0}, + "mappings": %s + }`, mapping)), + ) + + indexName := fmt.Sprintf("bench-reindex-%s-%s", r.runtimeDataStream, r.ctxt.Test.RunID) + + logger.Debugf("creating %s index in metricstore...", indexName) + + createRes, err := r.options.ESMetricsAPI.Indices.Create( + indexName, + r.options.ESMetricsAPI.Indices.Create.WithBody(reader), + ) + if err != nil { + return fmt.Errorf("could not create index: %w", err) + } + defer createRes.Body.Close() + + if createRes.IsError() { + return errors.New("got a response error while creating index") + } + + bodyReader := strings.NewReader(`{"query":{"match_all":{}}}`) + + logger.Debug("starting scrolling of events...") + res, err := r.options.ESAPI.Search( + r.options.ESAPI.Search.WithIndex(r.runtimeDataStream), + r.options.ESAPI.Search.WithBody(bodyReader), + r.options.ESAPI.Search.WithScroll(time.Minute), + r.options.ESAPI.Search.WithSize(10000), + ) + if err != nil { + return fmt.Errorf("error executing search: %w", err) + } + defer res.Body.Close() + + type searchRes struct { + Error *struct { + Reason string `json:"reason"` + } `json:"error"` + ScrollID string `json:"_scroll_id"` + Hits []struct { + ID string `json:"_id"` + Source map[string]interface{} `json:"_source"` + } `json:"hits"` + } + + // Iterate through the search results using the Scroll API + for { + var sr searchRes + if err := json.NewDecoder(res.Body).Decode(&sr); err != nil { + return fmt.Errorf("error decoding search response: %w", err) + } + + if sr.Error != nil { + return fmt.Errorf("error searching for documents: %s", sr.Error.Reason) + } + + if len(sr.Hits) == 0 { + break + } + + var bulkBodyBuilder strings.Builder + for _, hit := range sr.Hits { + bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID)) + enriched := r.enrichEventWithBenchmarkMetadata(hit.Source) + src, err := json.Marshal(enriched) + if err != nil { + return fmt.Errorf("error encoding _source: %w", err) + } + bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src))) + } + + logger.Debugf("bulk request of %d events...", len(sr.Hits)) + + bulkRes, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String())) + if err != nil { + return fmt.Errorf("error performing the bulk index request: %w", err) + } + bulkRes.Body.Close() + + if sr.ScrollID == "" { + return errors.New("error getting scroll ID") + } + + res, err = r.options.ESAPI.Scroll( + r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID), + r.options.ESAPI.Scroll.WithScroll(time.Minute), + ) + if err != nil { + return fmt.Errorf("error executing scroll: %w", err) + } + res.Body.Close() + } + + logger.Debug("reindexing operation finished") + return nil +} + +type benchMeta struct { + Info struct { + Benchmark string `json:"benchmark"` + RunID string `json:"run_id"` + } `json:"info"` + Parameters scenario `json:"parameter"` +} + +func (r *runner) enrichEventWithBenchmarkMetadata(e map[string]interface{}) map[string]interface{} { + var m benchMeta + m.Info.Benchmark = r.options.BenchName + m.Info.RunID = r.ctxt.Test.RunID + m.Parameters = *r.scenario + e["benchmark_metadata"] = m + return e +} + +func getTotalHits(esapi *elasticsearch.API, dataStream string) (int, error) { + resp, err := esapi.Count( + esapi.Count.WithIndex(dataStream), + ) + if err != nil { + return 0, fmt.Errorf("could not search data stream: %w", err) + } + defer resp.Body.Close() + + var results struct {
Count int + Error *struct { + Type string + Reason string + } + Status int + } + + if err := json.NewDecoder(resp.Body).Decode(&results); err != nil { + return 0, fmt.Errorf("could not decode search results response: %w", err) + } + + numHits := results.Count + if results.Error != nil { + logger.Debugf("found %d hits in %s data stream: %s: %s Status=%d", + numHits, dataStream, results.Error.Type, results.Error.Reason, results.Status) + } else { + logger.Debugf("found %d hits in %s data stream", numHits, dataStream) + } + + return numHits, nil +} + +func waitUntilTrue(fn func() (bool, error), timeout time.Duration) (bool, error) { + timeoutTimer := time.NewTimer(timeout) + defer timeoutTimer.Stop() + + retryTicker := time.NewTicker(5 * time.Second) + defer retryTicker.Stop() + + for { + result, err := fn() + if err != nil { + return false, err + } + if result { + return true, nil + } + + select { + case <-retryTicker.C: + continue + case <-timeoutTimer.C: + return false, nil + } + } +} + +func createRunID() string { + return uuid.New().String() +} + +func getDataStreamPath(packageRoot, dataStream string) string { + return filepath.Join(packageRoot, "data_stream", dataStream) +} diff --git a/internal/benchrunner/runners/rally/scenario.go b/internal/benchrunner/runners/rally/scenario.go new file mode 100644 index 0000000000..89186575dd --- /dev/null +++ b/internal/benchrunner/runners/rally/scenario.go @@ -0,0 +1,73 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package rally + +import ( + "errors" + "fmt" + "os" + "path/filepath" + + "github.com/elastic/go-ucfg/yaml" +) + +const devPath = "_dev/benchmark/rally" + +type scenario struct { + Package string `config:"package" json:"package"` + Description string `config:"description" json:"description"` + Version string `config:"version" json:"version"` + DataStream dataStream `config:"data_stream" json:"data_stream"` + Corpora corpora `config:"corpora" json:"corpora"` +} + +type dataStream struct { + Name string `config:"name" json:"name"` +} + +type corpora struct { + Generator *generator `config:"generator" json:"generator"` +} + +type generator struct { + TotalEvents uint64 `config:"total_events" json:"total_events"` + Template corporaTemplate `config:"template" json:"template"` + Config corporaAsset `config:"config" json:"config"` + Fields corporaAsset `config:"fields" json:"fields"` +} + +type corporaAsset struct { + Raw map[string]interface{} `config:"raw" json:"raw"` + Path string `config:"path" json:"path"` +} +type corporaTemplate struct { + Raw string `config:"raw" json:"raw"` + Path string `config:"path" json:"path"` + Type string `config:"type" json:"type"` +} + +func defaultConfig() *scenario { + return &scenario{} +} + +func readConfig(path, scenario, packageName, packageVersion string) (*scenario, error) { + configPath := filepath.Join(path, devPath, fmt.Sprintf("%s.yml", scenario)) + c := defaultConfig() + cfg, err := yaml.NewConfigWithFile(configPath) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, fmt.Errorf("can't load benchmark configuration: %s: %w", configPath, err) + } + + if err == nil { + if err := cfg.Unpack(c); err != nil { + return nil, fmt.Errorf("can't unpack benchmark configuration: %s: %w", configPath, err) + } + } + + c.Package = packageName + c.Version = packageVersion + + return c, nil +} diff --git 
a/internal/benchrunner/runners/system/metrics.go b/internal/benchrunner/runners/system/metrics.go index 3743d10d99..2b48f227d7 100644 --- a/internal/benchrunner/runners/system/metrics.go +++ b/internal/benchrunner/runners/system/metrics.go @@ -16,18 +16,10 @@ import ( "github.com/elastic/elastic-package/internal/elasticsearch" "github.com/elastic/elastic-package/internal/elasticsearch/ingest" - "github.com/elastic/elastic-package/internal/environment" "github.com/elastic/elastic-package/internal/logger" "github.com/elastic/elastic-package/internal/servicedeployer" ) -var ( - ESMetricstoreHostEnv = environment.WithElasticPackagePrefix("ESMETRICSTORE_HOST") - ESMetricstoreUsernameEnv = environment.WithElasticPackagePrefix("ESMETRICSTORE_USERNAME") - ESMetricstorePasswordEnv = environment.WithElasticPackagePrefix("ESMETRICSTORE_PASSWORD") - ESMetricstoreCACertificateEnv = environment.WithElasticPackagePrefix("ESMETRICSTORE_CA_CERT") -) - type collector struct { ctxt servicedeployer.ServiceContext metadata benchMeta diff --git a/internal/cobraext/flags.go b/internal/cobraext/flags.go index 277598a435..f02bf6e661 100644 --- a/internal/cobraext/flags.go +++ b/internal/cobraext/flags.go @@ -53,6 +53,12 @@ const ( BenchWithTestSamplesFlagName = "use-test-samples" BenchWithTestSamplesFlagDescription = "use test samples for the benchmarks" + BenchCorpusRallyTrackOutputDirFlagName = "rally-track-output-dir" + BenchCorpusRallyTrackOutputDirFlagDescription = "output dir of the rally track: if present the command will save the generated rally track" + + BenchCorpusRallyDryRunFlagName = "dry-run" + BenchCorpusRallyDryRunFlagDescription = "Do not run rally but just generate the rally track" + BuildSkipValidationFlagName = "skip-validation" BuildSkipValidationFlagDescription = "skip validation of the built package, use only if all validation issues have been acknowledged" diff --git a/internal/configuration/locations/locations.go b/internal/configuration/locations/locations.go index e498f8d910..3f4fba0fbd 100644 --- a/internal/configuration/locations/locations.go +++ b/internal/configuration/locations/locations.go @@ -32,6 +32,7 @@ var ( elasticPackageDataHome = environment.WithElasticPackagePrefix("DATA_HOME") serviceLogsDir = filepath.Join(temporaryDir, "service_logs") + rallyCorpusDir = filepath.Join(temporaryDir, "rally_corpus") kubernetesDeployerDir = filepath.Join(deployerDir, "kubernetes") serviceOutputDir = filepath.Join(temporaryDir, "output") ) @@ -87,6 +88,11 @@ func (loc LocationManager) KubernetesDeployerDir() string { return filepath.Join(loc.stackPath, kubernetesDeployerDir) } +// RallyCorpusDir returns the rally coprus directory +func (loc LocationManager) RallyCorpusDir() string { + return filepath.Join(loc.stackPath, rallyCorpusDir) +} + // ServiceLogDir returns the log directory func (loc LocationManager) ServiceLogDir() string { return filepath.Join(loc.stackPath, serviceLogsDir) diff --git a/internal/corpusgenerator/rally.go b/internal/corpusgenerator/rally.go index ae23432bf4..c50e55bf43 100644 --- a/internal/corpusgenerator/rally.go +++ b/internal/corpusgenerator/rally.go @@ -6,6 +6,7 @@ package corpusgenerator import ( "bytes" + "fmt" "os" "path/filepath" "text/template" @@ -49,17 +50,17 @@ const ( ` ) -func generateRallyTrack(dataStream string, corpusFile *os.File, corpusDocsCount uint64) ([]byte, error) { +func GenerateRallyTrack(dataStream string, corpusFile *os.File, corpusDocsCount uint64) ([]byte, error) { t := template.New("rallytrack") parsedTpl, err := t.Delims("[[", 
"]]").Parse(rallyTrackTemplate) if err != nil { - return nil, err + return nil, fmt.Errorf("error while parsing rally track template: %w", err) } fi, err := corpusFile.Stat() if err != nil { - return nil, err + return nil, fmt.Errorf("error with stat on rally corpus file: %w", err) } corpusSizeInBytes := fi.Size() @@ -74,7 +75,7 @@ func generateRallyTrack(dataStream string, corpusFile *os.File, corpusDocsCount err = parsedTpl.Execute(buf, templateData) if err != nil { - return nil, err + return nil, fmt.Errorf("error on parsin on rally track template: %w", err) } return buf.Bytes(), nil diff --git a/internal/corpusgenerator/utils.go b/internal/corpusgenerator/utils.go index 511f47954d..f55b3c8dab 100644 --- a/internal/corpusgenerator/utils.go +++ b/internal/corpusgenerator/utils.go @@ -56,7 +56,7 @@ func RunGenerator(generator genlib.Generator, dataStream, rallyTrackOutputDir st if len(rallyTrackOutputDir) > 0 { corpusFile := f.(*os.File) - rallyTrackContent, err := generateRallyTrack(dataStream, corpusFile, corpusDocsCount) + rallyTrackContent, err := GenerateRallyTrack(dataStream, corpusFile, corpusDocsCount) if err != nil { return err } diff --git a/internal/elasticsearch/ingest/nodestats.go b/internal/elasticsearch/ingest/nodestats.go index 4b9da276bf..30bb0ce79d 100644 --- a/internal/elasticsearch/ingest/nodestats.go +++ b/internal/elasticsearch/ingest/nodestats.go @@ -118,8 +118,10 @@ func GetPipelineStatsByPrefix(esClient *elasticsearch.API, pipelinePrefix string } func requestPipelineStats(esClient *elasticsearch.API) ([]byte, error) { - statsReq := esClient.Nodes.Stats.WithFilterPath("nodes.*.ingest.pipelines") - resp, err := esClient.Nodes.Stats(statsReq) + filterPathReq := esClient.Nodes.Stats.WithFilterPath("nodes.*.ingest.pipelines") + includeUnloadedSegmentReq := esClient.Nodes.Stats.WithIncludeUnloadedSegments(true) + + resp, err := esClient.Nodes.Stats(filterPathReq, includeUnloadedSegmentReq) if err != nil { return nil, fmt.Errorf("node stats API call failed: %w", err) } diff --git a/test/packages/benchmarks/rally_benchmark/_dev/benchmark/rally/logs-benchmark.yml b/test/packages/benchmarks/rally_benchmark/_dev/benchmark/rally/logs-benchmark.yml new file mode 100644 index 0000000000..16e45f7782 --- /dev/null +++ b/test/packages/benchmarks/rally_benchmark/_dev/benchmark/rally/logs-benchmark.yml @@ -0,0 +1,14 @@ +--- +description: Benchmark 20000 events ingested +data_stream: + name: testds +corpora: + generator: + total_events: 20000 + template: + type: gotext + path: ./logs-benchmark/template.ndjson + config: + path: ./logs-benchmark/config.yml + fields: + path: ./logs-benchmark/fields.yml diff --git a/test/packages/benchmarks/rally_benchmark/_dev/benchmark/rally/logs-benchmark/config.yml b/test/packages/benchmarks/rally_benchmark/_dev/benchmark/rally/logs-benchmark/config.yml new file mode 100644 index 0000000000..03b8ad10a3 --- /dev/null +++ b/test/packages/benchmarks/rally_benchmark/_dev/benchmark/rally/logs-benchmark/config.yml @@ -0,0 +1,22 @@ +fields: + - name: '@timestamp' + period: 1s + - name: container.id + - name: log.flags + type: keyword + - name: log.offset + cardinality: 10000 + - name: tags + enum: ["production", "env2"] + - name: IP + cardinality: 100 + - name: StatusCode + enum: ["200", "400", "404"] + - name: Size + range: + min: 1 + max: 1000 + - name: Port + range: + min: 8000 + max: 8080 diff --git a/test/packages/benchmarks/rally_benchmark/_dev/benchmark/rally/logs-benchmark/fields.yml 
b/test/packages/benchmarks/rally_benchmark/_dev/benchmark/rally/logs-benchmark/fields.yml new file mode 100644 index 0000000000..32a82b632d --- /dev/null +++ b/test/packages/benchmarks/rally_benchmark/_dev/benchmark/rally/logs-benchmark/fields.yml @@ -0,0 +1,23 @@ +- name: timestamp + type: date +- name: container.id + type: keyword +- name: log.file.path + example: /var/log/fun-times.log + type: keyword +- name: log.flags + type: keyword +- name: log.offset + type: long +- name: tags + type: keyword +- name: IP + type: ip +- name: StatusCode + type: keyword +- name: Size + type: long +- name: Hostname + type: keyword +- name: Port + type: long diff --git a/test/packages/benchmarks/rally_benchmark/_dev/benchmark/rally/logs-benchmark/template.ndjson b/test/packages/benchmarks/rally_benchmark/_dev/benchmark/rally/logs-benchmark/template.ndjson new file mode 100644 index 0000000000..3567b2f28a --- /dev/null +++ b/test/packages/benchmarks/rally_benchmark/_dev/benchmark/rally/logs-benchmark/template.ndjson @@ -0,0 +1,14 @@ +{{- $timestamp := generate "timestamp" -}} +{ +"@timestamp": "{{ $timestamp.Format "2006-01-02T15:04:05.999999Z07:00" }}", +"data_stream.type": "logs", +"data_stream.dataset": "rally_benchmarks.testds", +"data_stream.namespace": "ep", +"container.id": "{{ generate "container.id" }}", +"input.type": "filestream", +"log.file.path": "{{ generate "log.file.path" }}", +"log.flags": "{{ generate "log.flags" }}", +"log.offset": {{ generate "log.offset" }}, +"tags": ["rally_benchmark.testds", "forwarded", "{{ generate "tags" }}" ], +"message": "{{ generate "IP" }} - - [{{ $timestamp.Format "02/Jan/2006:15:04:05.999999 -0700" }}] \"GET /favicon.ico HTTP/1.1\" {{ generate "StatusCode" }} {{ generate "Size" }} \"http://{{ generate "Hostname" }}:{{ generate "Port" }}/\" \"skip-this-one/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"" +} \ No newline at end of file diff --git a/test/packages/benchmarks/rally_benchmark/changelog.yml b/test/packages/benchmarks/rally_benchmark/changelog.yml new file mode 100644 index 0000000000..dde678acef --- /dev/null +++ b/test/packages/benchmarks/rally_benchmark/changelog.yml @@ -0,0 +1,6 @@ +# newer versions go on top +- version: "999.999.999" + changes: + - description: initial release + type: enhancement # can be one of: enhancement, bugfix, breaking-change + link: https://github.com/elastic/elastic-package/pull/1522 diff --git a/test/packages/benchmarks/rally_benchmark/data_stream/testds/_dev/benchmark/pipeline/access-raw.log b/test/packages/benchmarks/rally_benchmark/data_stream/testds/_dev/benchmark/pipeline/access-raw.log new file mode 100644 index 0000000000..c8c9ffe960 --- /dev/null +++ b/test/packages/benchmarks/rally_benchmark/data_stream/testds/_dev/benchmark/pipeline/access-raw.log @@ -0,0 +1 @@ +1.2.3.4 - - [25/Oct/2016:14:49:34 +0200] "GET /favicon.ico HTTP/1.1" 404 571 "http://localhost:8080/" "skip-this-one/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36" \ No newline at end of file diff --git a/test/packages/benchmarks/rally_benchmark/data_stream/testds/_dev/benchmark/pipeline/config.yml b/test/packages/benchmarks/rally_benchmark/data_stream/testds/_dev/benchmark/pipeline/config.yml new file mode 100644 index 0000000000..30a2b50cf6 --- /dev/null +++ b/test/packages/benchmarks/rally_benchmark/data_stream/testds/_dev/benchmark/pipeline/config.yml @@ -0,0 +1 @@ +num_docs: 10000 diff --git 
a/test/packages/benchmarks/rally_benchmark/data_stream/testds/elasticsearch/ingest_pipeline/default.yml b/test/packages/benchmarks/rally_benchmark/data_stream/testds/elasticsearch/ingest_pipeline/default.yml new file mode 100644 index 0000000000..f39b8ee231 --- /dev/null +++ b/test/packages/benchmarks/rally_benchmark/data_stream/testds/elasticsearch/ingest_pipeline/default.yml @@ -0,0 +1,23 @@ +--- +description: Pipeline for parsing Nginx access logs. Requires the geoip and user_agent + plugins. +processors: + - grok: + field: message + patterns: + - (%{NGINX_HOST} )?"?(?:%{NGINX_ADDRESS_LIST:nginx.access.remote_ip_list}|%{NOTSPACE:source.address}) + - (-|%{DATA:user.name}) \[%{HTTPDATE:nginx.access.time}\] "%{DATA:nginx.access.info}" + %{NUMBER:http.response.status_code:long} %{NUMBER:http.response.body.bytes:long} + "(-|%{DATA:http.request.referrer})" "(-|%{DATA:user_agent.original})" + pattern_definitions: + NGINX_HOST: (?:%{IP:destination.ip}|%{NGINX_NOTSEPARATOR:destination.domain})(:%{NUMBER:destination.port})? + NGINX_NOTSEPARATOR: "[^\t ,:]+" + NGINX_ADDRESS_LIST: (?:%{IP}|%{WORD})("?,?\s*(?:%{IP}|%{WORD}))* + ignore_missing: true + - user_agent: + field: user_agent.original + ignore_missing: true +on_failure: + - set: + field: error.message + value: '{{ _ingest.on_failure_message }}' \ No newline at end of file diff --git a/test/packages/benchmarks/rally_benchmark/data_stream/testds/fields/base-fields.yml b/test/packages/benchmarks/rally_benchmark/data_stream/testds/fields/base-fields.yml new file mode 100644 index 0000000000..0ec2cc7e01 --- /dev/null +++ b/test/packages/benchmarks/rally_benchmark/data_stream/testds/fields/base-fields.yml @@ -0,0 +1,38 @@ +- name: data_stream.type + type: constant_keyword + description: Data stream type. +- name: data_stream.dataset + type: constant_keyword + description: Data stream dataset. +- name: data_stream.namespace + type: constant_keyword + description: Data stream namespace. +- name: '@timestamp' + type: date + description: Event timestamp. +- name: container.id + description: Unique container id. + ignore_above: 1024 + type: keyword +- name: input.type + description: Type of Filebeat input. + type: keyword +- name: log.file.path + description: Full path to the log file this event came from. + example: /var/log/fun-times.log + ignore_above: 1024 + type: keyword +- name: log.source.address + description: Source address from which the log event was read / sent from. + type: keyword +- name: log.flags + description: Flags for the log file. + type: keyword +- name: log.offset + description: Offset of the entry in the log file. + type: long +- name: tags + description: List of keywords used to tag each event. 
+ example: '["production", "env2"]' + ignore_above: 1024 + type: keyword diff --git a/test/packages/benchmarks/rally_benchmark/data_stream/testds/manifest.yml b/test/packages/benchmarks/rally_benchmark/data_stream/testds/manifest.yml new file mode 100644 index 0000000000..250726a37b --- /dev/null +++ b/test/packages/benchmarks/rally_benchmark/data_stream/testds/manifest.yml @@ -0,0 +1,3 @@ +title: Test +release: experimental +type: logs diff --git a/test/packages/benchmarks/rally_benchmark/docs/README.md b/test/packages/benchmarks/rally_benchmark/docs/README.md new file mode 100644 index 0000000000..e0ef7b4a18 --- /dev/null +++ b/test/packages/benchmarks/rally_benchmark/docs/README.md @@ -0,0 +1,2 @@ +# Test integration + diff --git a/test/packages/benchmarks/rally_benchmark/manifest.yml b/test/packages/benchmarks/rally_benchmark/manifest.yml new file mode 100644 index 0000000000..7cbe79f7a0 --- /dev/null +++ b/test/packages/benchmarks/rally_benchmark/manifest.yml @@ -0,0 +1,13 @@ +format_version: 3.0.1 +name: rally_benchmarks +title: Rally benchmarks +version: 999.999.999 +description: Test for rally benchmark runner +categories: ["network"] +type: integration +conditions: + kibana: + version: '^8.0.0' +owner: + github: elastic/integrations + type: elastic diff --git a/test/packages/benchmarks/system_benchmark/_dev/benchmark/system/logs-benchmark/config.yml b/test/packages/benchmarks/system_benchmark/_dev/benchmark/system/logs-benchmark/config.yml index af365d3c94..bbceb2681c 100644 --- a/test/packages/benchmarks/system_benchmark/_dev/benchmark/system/logs-benchmark/config.yml +++ b/test/packages/benchmarks/system_benchmark/_dev/benchmark/system/logs-benchmark/config.yml @@ -1,40 +1,39 @@ -- name: IP - cardinality: - numerator: 1 - denominator: 100 -- name: Day - range: - min: 1 - max: 28 -- name: H - range: - min: 10 - max: 23 -- name: MS - range: - min: 10 - max: 59 -- name: Mon - enum: - - "Jan" - - "Feb" - - "Mar" - - "Apr" - - "May" - - "Jun" - - "Jul" - - "Aug" - - "Sep" - - "Oct" - - "Nov" - - "Dec" -- name: StatusCode - enum: ["200", "400", "404"] -- name: Size - range: - min: 1 - max: 1000 -- name: Port - range: - min: 8000 - max: 8080 +fields: + - name: IP + cardinality: 100 + - name: Day + range: + min: 1 + max: 28 + - name: H + range: + min: 10 + max: 23 + - name: MS + range: + min: 10 + max: 59 + - name: Mon + enum: + - "Jan" + - "Feb" + - "Mar" + - "Apr" + - "May" + - "Jun" + - "Jul" + - "Aug" + - "Sep" + - "Oct" + - "Nov" + - "Dec" + - name: StatusCode + enum: ["200", "400", "404"] + - name: Size + range: + min: 1 + max: 1000 + - name: Port + range: + min: 8000 + max: 8080
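The scenario files above wire the corpus generator assets (config, fields, and template) into the rally runner through initializeGenerator. As a rough, hedged sketch of how those assets flow through the corpus generator library, the example below uses only calls that already appear in runner.go (LoadConfigFromYaml, LoadFieldsWithTemplateFromString, NewGeneratorWithTextTemplate, Emit); the inline YAML and template snippets are illustrative and are not taken from the test package.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"time"

	"github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib"
	"github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib/config"
	"github.com/elastic/elastic-integration-corpus-generator-tool/pkg/genlib/fields"
)

func main() {
	// Illustrative assets; a real benchmark reads these from the
	// _dev/benchmark/rally files shown in the diff above.
	cfgYAML := []byte("fields:\n  - name: StatusCode\n    enum: [\"200\", \"404\"]\n")
	fieldsYAML := "- name: StatusCode\n  type: keyword\n"
	tpl := []byte(`{"status": "{{ generate "StatusCode" }}"}`)

	cfg, err := config.LoadConfigFromYaml(cfgYAML)
	if err != nil {
		panic(err)
	}
	flds, err := fields.LoadFieldsWithTemplateFromString(context.Background(), fieldsYAML)
	if err != nil {
		panic(err)
	}

	genlib.InitGeneratorTimeNow(time.Now())
	genlib.InitGeneratorRandSeed(time.Now().UnixNano())

	// 5 events rendered through a gotext template, as in initializeGenerator.
	g, err := genlib.NewGeneratorWithTextTemplate(tpl, cfg, flds, 5)
	if err != nil {
		panic(err)
	}
	defer g.Close()

	// Emit writes one event per call until the configured total is reached.
	buf := bytes.NewBufferString("")
	for {
		if err := g.Emit(buf); err == io.EOF {
			break
		} else if err != nil {
			panic(err)
		}
		fmt.Println(buf.String())
		buf.Reset()
	}
}
```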