Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
7a8862e
Control cancelation with contexts
jsoriano Feb 13, 2024
b2ebe82
Use the correct context on testrunner commands
jsoriano Feb 14, 2024
e30b7b7
More contexts
jsoriano Feb 14, 2024
7fe9eea
Context for cleanup handlers
jsoriano Feb 14, 2024
3fc76d5
Linting
jsoriano Feb 14, 2024
2ac9918
Reuse waitUntilTrue
jsoriano Feb 14, 2024
3a20101
Adjust default retry period
jsoriano Feb 15, 2024
8e50588
More contexts
jsoriano Feb 15, 2024
b081e44
Merge remote-tracking branch 'origin/main' into context-all-the-things
jsoriano Feb 15, 2024
cccecc1
Rename ServiceContext
jsoriano Feb 15, 2024
5d1c58f
Add context to more handlers
jsoriano Feb 15, 2024
39a718f
Merge remote-tracking branch 'origin/main' into context-all-the-things
jsoriano Feb 23, 2024
dfa337a
Handle returned error
jsoriano Feb 23, 2024
5822449
Reduce sync primitives in streamer
jsoriano Feb 23, 2024
854149a
Refactor data streamer
jsoriano Feb 23, 2024
638cadd
Simplify empty method
jsoriano Feb 23, 2024
f948a38
Merge remote-tracking branch 'origin/main' into context-all-the-things
jsoriano Feb 27, 2024
6ee40a0
Merge remote-tracking branch 'origin/main' into context-all-the-things
jsoriano Mar 5, 2024
4f746a4
Recover previous periods on wait loops
jsoriano Mar 5, 2024
ae9ca78
Pass contexts to ES requests in system test runner
jsoriano Mar 5, 2024
0f7f608
Add contexts to kibana
jsoriano Mar 5, 2024
f0a56fc
Recover previous error format
jsoriano Mar 5, 2024
3adbddf
Add common function to get count of documents in data stream
jsoriano Mar 5, 2024
9898505
Print new line after errors
jsoriano Mar 6, 2024
cfa1075
Log the message about signal caught when the signal is caught
jsoriano Mar 6, 2024
ce50c22
Ensure that service container logs are always written
jsoriano Mar 6, 2024
e12b4e9
Add signal handling per command
jsoriano Mar 6, 2024
438fc2a
Fix enablement of signal handling
jsoriano Mar 6, 2024
761e6d6
Fix panic
jsoriano Mar 6, 2024
168d878
Handle interruption of subcommands
jsoriano Mar 6, 2024
b1ef92a
Merge remote-tracking branch 'origin/main' into context-all-the-things
jsoriano Mar 6, 2024
ee84be7
Merge branch 'context-all-the-things' into context-all-the-things-kibana
jsoriano Mar 6, 2024
72c7b10
Move signal enablement back to where it was
jsoriano Mar 6, 2024
362a263
Fix imports
jsoriano Mar 6, 2024
f900edb
Define context parameters so we don't forget later
jsoriano Mar 7, 2024
1f6bc21
Merge branch 'context-all-the-things' into context-all-the-things-kibana
jsoriano Mar 7, 2024
85bc840
Avoid cancelation of log dumps
jsoriano Mar 7, 2024
cc71f31
Merge remote-tracking branch 'origin/main' into context-all-the-things
jsoriano Mar 7, 2024
387ce80
Merge branch 'context-all-the-things' into context-all-the-things-kibana
jsoriano Mar 7, 2024
149a243
Merge remote-tracking branch 'origin/main' into context-all-the-thing…
jsoriano Mar 21, 2024
ecd4600
Remove package if installation interrupted during asset tests
jsoriano Mar 25, 2024
0051f33
Remove package if installation interrupted during system tests
jsoriano Mar 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions cmd/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,6 @@ func pipelineCommandAction(cmd *cobra.Command, args []string) error {
results = append(results, r)
}

if err != nil {
return fmt.Errorf("error running package pipeline benchmarks: %w", err)
}

for _, report := range results {
if err := reporters.WriteReportable(reporters.Output(reportOutput), report); err != nil {
return fmt.Errorf("error writing benchmark report: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func dumpInstalledObjectsCmdAction(cmd *cobra.Command, args []string) error {
if err != nil {
return fmt.Errorf("failed to initialize Kibana client: %w", err)
}
installedPackage, err := kibanaClient.GetPackage(packageName)
installedPackage, err := kibanaClient.GetPackage(cmd.Context(), packageName)
if err != nil {
return fmt.Errorf("failed to get package status: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func editDashboardsCmd(cmd *cobra.Command, args []string) error {
}

if len(dashboardIDs) == 0 {
dashboardIDs, err = promptDashboardIDs(kibanaClient)
dashboardIDs, err = promptDashboardIDs(cmd.Context(), kibanaClient)
if err != nil {
return fmt.Errorf("prompt for dashboard selection failed: %w", err)
}
Expand All @@ -107,7 +107,7 @@ func editDashboardsCmd(cmd *cobra.Command, args []string) error {
updatedDashboardIDs := make([]string, 0, len(dashboardIDs))
failedDashboardUpdates := make(map[string]error, len(dashboardIDs))
for _, dashboardID := range dashboardIDs {
err = kibanaClient.SetManagedSavedObject("dashboard", dashboardID, false)
err = kibanaClient.SetManagedSavedObject(cmd.Context(), "dashboard", dashboardID, false)
if err != nil {
failedDashboardUpdates[dashboardID] = err
} else {
Expand Down
9 changes: 5 additions & 4 deletions cmd/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package cmd

import (
"context"
"fmt"

"github.com/AlecAivazis/survey/v2"
Expand Down Expand Up @@ -93,7 +94,7 @@ func exportDashboardsCmd(cmd *cobra.Command, args []string) error {
}

if len(dashboardIDs) == 0 {
dashboardIDs, err = promptDashboardIDs(kibanaClient)
dashboardIDs, err = promptDashboardIDs(cmd.Context(), kibanaClient)
if err != nil {
return fmt.Errorf("prompt for dashboard selection failed: %w", err)
}
Expand All @@ -104,7 +105,7 @@ func exportDashboardsCmd(cmd *cobra.Command, args []string) error {
}
}

err = export.Dashboards(kibanaClient, dashboardIDs)
err = export.Dashboards(cmd.Context(), kibanaClient, dashboardIDs)
if err != nil {
return fmt.Errorf("dashboards export failed: %w", err)
}
Expand All @@ -113,8 +114,8 @@ func exportDashboardsCmd(cmd *cobra.Command, args []string) error {
return nil
}

func promptDashboardIDs(kibanaClient *kibana.Client) ([]string, error) {
savedDashboards, err := kibanaClient.FindDashboards()
func promptDashboardIDs(ctx context.Context, kibanaClient *kibana.Client) ([]string, error) {
savedDashboards, err := kibanaClient.FindDashboards(ctx)
if err != nil {
return nil, fmt.Errorf("finding dashboards failed: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func installCommandAction(cmd *cobra.Command, _ []string) error {
return fmt.Errorf("can't process check-condition flag: %w", err)
}
if len(keyValuePairs) > 0 {
manifest, err := installer.Manifest()
manifest, err := installer.Manifest(cmd.Context())
if err != nil {
return err
}
Expand All @@ -105,6 +105,6 @@ func installCommandAction(cmd *cobra.Command, _ []string) error {
return nil
}

_, err = installer.Install()
_, err = installer.Install(cmd.Context())
return err
}
2 changes: 1 addition & 1 deletion cmd/uninstall.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func uninstallCommandAction(cmd *cobra.Command, args []string) error {

// Uninstall the package
cmd.Println("Uninstall the package")
err = packageInstaller.Uninstall()
err = packageInstaller.Uninstall(cmd.Context())
if err != nil {
return fmt.Errorf("can't uninstall the package: %w", err)
}
Expand Down
20 changes: 10 additions & 10 deletions internal/benchrunner/runners/rally/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (r *runner) setUp(ctx context.Context) error {
}
r.scenario = scenario

if err = r.installPackage(); err != nil {
if err = r.installPackage(ctx); err != nil {
return fmt.Errorf("error installing package: %w", err)
}

Expand Down Expand Up @@ -450,28 +450,28 @@ func (r *runner) run(ctx context.Context) (report reporters.Reportable, err erro
return createReport(r.options.BenchName, r.corpusFile, r.scenario, msum, rallyStats)
}

func (r *runner) installPackage() error {
func (r *runner) installPackage(ctx context.Context) error {
if len(r.options.PackageVersion) > 0 {
r.scenario.Package = r.options.PackageName
r.scenario.Version = r.options.PackageVersion
return r.installPackageFromRegistry(r.options.PackageName, r.options.PackageVersion)
return r.installPackageFromRegistry(ctx, r.options.PackageName, r.options.PackageVersion)
}

return r.installPackageFromPackageRoot()
return r.installPackageFromPackageRoot(ctx)
}

func (r *runner) installPackageFromRegistry(packageName, packageVersion string) error {
func (r *runner) installPackageFromRegistry(ctx context.Context, packageName, packageVersion string) error {
// POST /epm/packages/{pkgName}/{pkgVersion}
// Configure package (single data stream) via Ingest Manager APIs.
logger.Debug("installing package...")
_, err := r.options.KibanaClient.InstallPackage(packageName, packageVersion)
_, err := r.options.KibanaClient.InstallPackage(ctx, packageName, packageVersion)
if err != nil {
return fmt.Errorf("cannot install package %s@%s: %w", packageName, packageVersion, err)
}

r.removePackageHandler = func(ctx context.Context) error {
logger.Debug("removing benchmark package...")
if _, err := r.options.KibanaClient.RemovePackage(packageName, packageVersion); err != nil {
if _, err := r.options.KibanaClient.RemovePackage(ctx, packageName, packageVersion); err != nil {
return fmt.Errorf("error removing benchmark package: %w", err)
}
return nil
Expand All @@ -480,7 +480,7 @@ func (r *runner) installPackageFromRegistry(packageName, packageVersion string)
return nil
}

func (r *runner) installPackageFromPackageRoot() error {
func (r *runner) installPackageFromPackageRoot(ctx context.Context) error {
logger.Debug("Installing package...")
installer, err := installer.NewForPackage(installer.Options{
Kibana: r.options.KibanaClient,
Expand All @@ -492,13 +492,13 @@ func (r *runner) installPackageFromPackageRoot() error {
return fmt.Errorf("failed to initialize package installer: %w", err)
}

_, err = installer.Install()
_, err = installer.Install(ctx)
if err != nil {
return fmt.Errorf("failed to install package: %w", err)
}

r.removePackageHandler = func(ctx context.Context) error {
if err := installer.Uninstall(); err != nil {
if err := installer.Uninstall(ctx); err != nil {
return fmt.Errorf("error removing benchmark package: %w", err)
}

Expand Down
12 changes: 6 additions & 6 deletions internal/benchrunner/runners/stream/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (r *runner) setUp(ctx context.Context) error {
}
r.scenarios = scenarios

if err = r.installPackage(); err != nil {
if err = r.installPackage(ctx); err != nil {
return fmt.Errorf("error installing package: %w", err)
}

Expand Down Expand Up @@ -192,11 +192,11 @@ func (r *runner) wipeDataStreamsOnSetup() error {
return nil
}

func (r *runner) installPackage() error {
return r.installPackageFromPackageRoot()
func (r *runner) installPackage(ctx context.Context) error {
return r.installPackageFromPackageRoot(ctx)
}

func (r *runner) installPackageFromPackageRoot() error {
func (r *runner) installPackageFromPackageRoot(ctx context.Context) error {
logger.Debug("Installing package...")
installer, err := installer.NewForPackage(installer.Options{
Kibana: r.options.KibanaClient,
Expand All @@ -208,13 +208,13 @@ func (r *runner) installPackageFromPackageRoot() error {
return fmt.Errorf("failed to initialize package installer: %w", err)
}

_, err = installer.Install()
_, err = installer.Install(ctx)
if err != nil {
return fmt.Errorf("failed to install package: %w", err)
}

r.removePackageHandler = func(ctx context.Context) error {
if err := installer.Uninstall(); err != nil {
if err := installer.Uninstall(ctx); err != nil {
return fmt.Errorf("error removing benchmark package: %w", err)
}

Expand Down
20 changes: 10 additions & 10 deletions internal/benchrunner/runners/system/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (r *runner) setUp(ctx context.Context) error {
return fmt.Errorf("reading package manifest failed: %w", err)
}

policy, err := r.createBenchmarkPolicy(pkgManifest)
policy, err := r.createBenchmarkPolicy(ctx, pkgManifest)
if err != nil {
return err
}
Expand Down Expand Up @@ -353,7 +353,7 @@ func (r *runner) deleteDataStreamDocs(dataStream string) error {
return nil
}

func (r *runner) createBenchmarkPolicy(pkgManifest *packages.PackageManifest) (*kibana.Policy, error) {
func (r *runner) createBenchmarkPolicy(ctx context.Context, pkgManifest *packages.PackageManifest) (*kibana.Policy, error) {
// Configure package (single data stream) via Ingest Manager APIs.
logger.Debug("creating benchmark policy...")
benchTime := time.Now().Format("20060102T15:04:05Z")
Expand All @@ -369,12 +369,12 @@ func (r *runner) createBenchmarkPolicy(pkgManifest *packages.PackageManifest) (*
p.DataOutputID = "fleet-logstash-output"
}

policy, err := r.options.KibanaClient.CreatePolicy(p)
policy, err := r.options.KibanaClient.CreatePolicy(ctx, p)
if err != nil {
return nil, err
}

packagePolicy, err := r.createPackagePolicy(pkgManifest, policy)
packagePolicy, err := r.createPackagePolicy(ctx, pkgManifest, policy)
if err != nil {
return nil, err
}
Expand All @@ -383,12 +383,12 @@ func (r *runner) createBenchmarkPolicy(pkgManifest *packages.PackageManifest) (*
var merr multierror.Error

logger.Debug("deleting benchmark package policy...")
if err := r.options.KibanaClient.DeletePackagePolicy(*packagePolicy); err != nil {
if err := r.options.KibanaClient.DeletePackagePolicy(ctx, *packagePolicy); err != nil {
merr = append(merr, fmt.Errorf("error cleaning up benchmark package policy: %w", err))
}

logger.Debug("deleting benchmark policy...")
if err := r.options.KibanaClient.DeletePolicy(*policy); err != nil {
if err := r.options.KibanaClient.DeletePolicy(ctx, *policy); err != nil {
merr = append(merr, fmt.Errorf("error cleaning up benchmark policy: %w", err))
}

Expand All @@ -402,7 +402,7 @@ func (r *runner) createBenchmarkPolicy(pkgManifest *packages.PackageManifest) (*
return policy, nil
}

func (r *runner) createPackagePolicy(pkgManifest *packages.PackageManifest, p *kibana.Policy) (*kibana.PackagePolicy, error) {
func (r *runner) createPackagePolicy(ctx context.Context, pkgManifest *packages.PackageManifest, p *kibana.Policy) (*kibana.PackagePolicy, error) {
logger.Debug("creating package policy...")

if r.scenario.Version == "" {
Expand Down Expand Up @@ -437,7 +437,7 @@ func (r *runner) createPackagePolicy(pkgManifest *packages.PackageManifest, p *k
pp.Package.Name = pkgManifest.Name
pp.Package.Version = r.scenario.Version

policy, err := r.options.KibanaClient.CreatePackagePolicy(pp)
policy, err := r.options.KibanaClient.CreatePackagePolicy(ctx, pp)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -620,7 +620,7 @@ func (r *runner) runGenerator(destDir string) error {
func (r *runner) checkEnrolledAgents(ctx context.Context) ([]kibana.Agent, error) {
var agents []kibana.Agent
enrolled, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) {
allAgents, err := r.options.KibanaClient.ListAgents()
allAgents, err := r.options.KibanaClient.ListAgents(ctx)
if err != nil {
return false, fmt.Errorf("could not list agents: %w", err)
}
Expand Down Expand Up @@ -696,7 +696,7 @@ func (r *runner) enrollAgents(ctx context.Context) error {
return nil
}

policyWithDataStream, err := r.options.KibanaClient.GetPolicy(r.benchPolicy.ID)
policyWithDataStream, err := r.options.KibanaClient.GetPolicy(ctx, r.benchPolicy.ID)
if err != nil {
return fmt.Errorf("could not read the policy with data stream: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/dump/agentpolicies.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewAgentPoliciesDumper(client *kibana.Client) *AgentPoliciesDumper {
}

func (d *AgentPoliciesDumper) getAgentPolicy(ctx context.Context, name string) (*AgentPolicy, error) {
policy, err := d.client.GetRawPolicy(name)
policy, err := d.client.GetRawPolicy(ctx, name)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -86,7 +86,7 @@ func getPackagesUsingAgentPolicy(packagePolicies []packagePolicy) []string {
}

func (d *AgentPoliciesDumper) getAgentPoliciesFilteredByPackage(ctx context.Context, packageName string) ([]AgentPolicy, error) {
rawPolicies, err := d.client.ListRawPolicies()
rawPolicies, err := d.client.ListRawPolicies(ctx)

if err != nil {
return nil, err
Expand Down
9 changes: 5 additions & 4 deletions internal/export/dashboards.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package export

import (
"context"
"encoding/json"
"fmt"
"os"
Expand All @@ -20,7 +21,7 @@ import (

// Dashboards method exports selected dashboards with references objects. All Kibana objects are saved to local files
// in appropriate directories.
func Dashboards(kibanaClient *kibana.Client, dashboardsIDs []string) error {
func Dashboards(ctx context.Context, kibanaClient *kibana.Client, dashboardsIDs []string) error {
packageRoot, err := packages.MustFindPackageRoot()
if err != nil {
return fmt.Errorf("locating package root failed: %w", err)
Expand All @@ -40,16 +41,16 @@ func Dashboards(kibanaClient *kibana.Client, dashboardsIDs []string) error {
return fmt.Errorf("cannot import from this Kibana version: %w", err)
}

objects, err := kibanaClient.Export(dashboardsIDs)
objects, err := kibanaClient.Export(ctx, dashboardsIDs)
if err != nil {
return fmt.Errorf("exporting dashboards using Kibana client failed: %w", err)
}

ctx := &transformationContext{
transformContext := &transformationContext{
packageName: m.Name,
}

objects, err = applyTransformations(ctx, objects)
objects, err = applyTransformations(transformContext, objects)
if err != nil {
return fmt.Errorf("can't transform Kibana objects: %w", err)
}
Expand Down
12 changes: 6 additions & 6 deletions internal/kibana/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func (a *Agent) String() string {
}

// ListAgents returns the list of agents enrolled with Fleet.
func (c *Client) ListAgents() ([]Agent, error) {
statusCode, respBody, err := c.get(fmt.Sprintf("%s/agents", FleetAPI))
func (c *Client) ListAgents(ctx context.Context) ([]Agent, error) {
statusCode, respBody, err := c.get(ctx, fmt.Sprintf("%s/agents", FleetAPI))
if err != nil {
return nil, fmt.Errorf("could not list agents: %w", err)
}
Expand All @@ -73,7 +73,7 @@ func (c *Client) AssignPolicyToAgent(ctx context.Context, a Agent, p Policy) err
reqBody := `{ "policy_id": "` + p.ID + `" }`

path := fmt.Sprintf("%s/agents/%s/reassign", FleetAPI, a.ID)
statusCode, respBody, err := c.put(path, []byte(reqBody))
statusCode, respBody, err := c.put(ctx, path, []byte(reqBody))
if err != nil {
return fmt.Errorf("could not assign policy to agent: %w", err)
}
Expand All @@ -96,7 +96,7 @@ func (c *Client) waitUntilPolicyAssigned(ctx context.Context, a Agent, p Policy)
defer ticker.Stop()

for {
agent, err := c.getAgent(a.ID)
agent, err := c.getAgent(ctx, a.ID)
if err != nil {
return fmt.Errorf("can't get the agent: %w", err)
}
Expand All @@ -122,8 +122,8 @@ func (c *Client) waitUntilPolicyAssigned(ctx context.Context, a Agent, p Policy)
return nil
}

func (c *Client) getAgent(agentID string) (*Agent, error) {
statusCode, respBody, err := c.get(fmt.Sprintf("%s/agents/%s", FleetAPI, agentID))
func (c *Client) getAgent(ctx context.Context, agentID string) (*Agent, error) {
statusCode, respBody, err := c.get(ctx, fmt.Sprintf("%s/agents/%s", FleetAPI, agentID))
if err != nil {
return nil, fmt.Errorf("could not list agents: %w", err)
}
Expand Down
Loading