diff --git a/cmd/benchmark.go b/cmd/benchmark.go index 7dbf5e4a16..37fc4ae512 100644 --- a/cmd/benchmark.go +++ b/cmd/benchmark.go @@ -213,10 +213,6 @@ func pipelineCommandAction(cmd *cobra.Command, args []string) error { results = append(results, r) } - if err != nil { - return fmt.Errorf("error running package pipeline benchmarks: %w", err) - } - for _, report := range results { if err := reporters.WriteReportable(reporters.Output(reportOutput), report); err != nil { return fmt.Errorf("error writing benchmark report: %w", err) diff --git a/cmd/dump.go b/cmd/dump.go index 75ae705242..4515ef6652 100644 --- a/cmd/dump.go +++ b/cmd/dump.go @@ -96,7 +96,7 @@ func dumpInstalledObjectsCmdAction(cmd *cobra.Command, args []string) error { if err != nil { return fmt.Errorf("failed to initialize Kibana client: %w", err) } - installedPackage, err := kibanaClient.GetPackage(packageName) + installedPackage, err := kibanaClient.GetPackage(cmd.Context(), packageName) if err != nil { return fmt.Errorf("failed to get package status: %w", err) } diff --git a/cmd/edit.go b/cmd/edit.go index 066edc6787..38fc7d0168 100644 --- a/cmd/edit.go +++ b/cmd/edit.go @@ -93,7 +93,7 @@ func editDashboardsCmd(cmd *cobra.Command, args []string) error { } if len(dashboardIDs) == 0 { - dashboardIDs, err = promptDashboardIDs(kibanaClient) + dashboardIDs, err = promptDashboardIDs(cmd.Context(), kibanaClient) if err != nil { return fmt.Errorf("prompt for dashboard selection failed: %w", err) } @@ -107,7 +107,7 @@ func editDashboardsCmd(cmd *cobra.Command, args []string) error { updatedDashboardIDs := make([]string, 0, len(dashboardIDs)) failedDashboardUpdates := make(map[string]error, len(dashboardIDs)) for _, dashboardID := range dashboardIDs { - err = kibanaClient.SetManagedSavedObject("dashboard", dashboardID, false) + err = kibanaClient.SetManagedSavedObject(cmd.Context(), "dashboard", dashboardID, false) if err != nil { failedDashboardUpdates[dashboardID] = err } else { diff --git a/cmd/export.go b/cmd/export.go index b3c1bbd58c..da2adb56db 100644 --- a/cmd/export.go +++ b/cmd/export.go @@ -5,6 +5,7 @@ package cmd import ( + "context" "fmt" "github.com/AlecAivazis/survey/v2" @@ -93,7 +94,7 @@ func exportDashboardsCmd(cmd *cobra.Command, args []string) error { } if len(dashboardIDs) == 0 { - dashboardIDs, err = promptDashboardIDs(kibanaClient) + dashboardIDs, err = promptDashboardIDs(cmd.Context(), kibanaClient) if err != nil { return fmt.Errorf("prompt for dashboard selection failed: %w", err) } @@ -104,7 +105,7 @@ func exportDashboardsCmd(cmd *cobra.Command, args []string) error { } } - err = export.Dashboards(kibanaClient, dashboardIDs) + err = export.Dashboards(cmd.Context(), kibanaClient, dashboardIDs) if err != nil { return fmt.Errorf("dashboards export failed: %w", err) } @@ -113,8 +114,8 @@ func exportDashboardsCmd(cmd *cobra.Command, args []string) error { return nil } -func promptDashboardIDs(kibanaClient *kibana.Client) ([]string, error) { - savedDashboards, err := kibanaClient.FindDashboards() +func promptDashboardIDs(ctx context.Context, kibanaClient *kibana.Client) ([]string, error) { + savedDashboards, err := kibanaClient.FindDashboards(ctx) if err != nil { return nil, fmt.Errorf("finding dashboards failed: %w", err) } diff --git a/cmd/install.go b/cmd/install.go index 7dc781560d..b8ebcb03a5 100644 --- a/cmd/install.go +++ b/cmd/install.go @@ -90,7 +90,7 @@ func installCommandAction(cmd *cobra.Command, _ []string) error { return fmt.Errorf("can't process check-condition flag: %w", err) } if len(keyValuePairs) > 0 { - manifest, err := installer.Manifest() + manifest, err := installer.Manifest(cmd.Context()) if err != nil { return err } @@ -105,6 +105,6 @@ func installCommandAction(cmd *cobra.Command, _ []string) error { return nil } - _, err = installer.Install() + _, err = installer.Install(cmd.Context()) return err } diff --git a/cmd/uninstall.go b/cmd/uninstall.go index 7e44522fd2..1624d93c30 100644 --- a/cmd/uninstall.go +++ b/cmd/uninstall.go @@ -59,7 +59,7 @@ func uninstallCommandAction(cmd *cobra.Command, args []string) error { // Uninstall the package cmd.Println("Uninstall the package") - err = packageInstaller.Uninstall() + err = packageInstaller.Uninstall(cmd.Context()) if err != nil { return fmt.Errorf("can't uninstall the package: %w", err) } diff --git a/internal/benchrunner/runners/rally/runner.go b/internal/benchrunner/runners/rally/runner.go index 0a9c0fffdc..e936b402aa 100644 --- a/internal/benchrunner/runners/rally/runner.go +++ b/internal/benchrunner/runners/rally/runner.go @@ -270,7 +270,7 @@ func (r *runner) setUp(ctx context.Context) error { } r.scenario = scenario - if err = r.installPackage(); err != nil { + if err = r.installPackage(ctx); err != nil { return fmt.Errorf("error installing package: %w", err) } @@ -450,28 +450,28 @@ func (r *runner) run(ctx context.Context) (report reporters.Reportable, err erro return createReport(r.options.BenchName, r.corpusFile, r.scenario, msum, rallyStats) } -func (r *runner) installPackage() error { +func (r *runner) installPackage(ctx context.Context) error { if len(r.options.PackageVersion) > 0 { r.scenario.Package = r.options.PackageName r.scenario.Version = r.options.PackageVersion - return r.installPackageFromRegistry(r.options.PackageName, r.options.PackageVersion) + return r.installPackageFromRegistry(ctx, r.options.PackageName, r.options.PackageVersion) } - return r.installPackageFromPackageRoot() + return r.installPackageFromPackageRoot(ctx) } -func (r *runner) installPackageFromRegistry(packageName, packageVersion string) error { +func (r *runner) installPackageFromRegistry(ctx context.Context, packageName, packageVersion string) error { // POST /epm/packages/{pkgName}/{pkgVersion} // Configure package (single data stream) via Ingest Manager APIs. logger.Debug("installing package...") - _, err := r.options.KibanaClient.InstallPackage(packageName, packageVersion) + _, err := r.options.KibanaClient.InstallPackage(ctx, packageName, packageVersion) if err != nil { return fmt.Errorf("cannot install package %s@%s: %w", packageName, packageVersion, err) } r.removePackageHandler = func(ctx context.Context) error { logger.Debug("removing benchmark package...") - if _, err := r.options.KibanaClient.RemovePackage(packageName, packageVersion); err != nil { + if _, err := r.options.KibanaClient.RemovePackage(ctx, packageName, packageVersion); err != nil { return fmt.Errorf("error removing benchmark package: %w", err) } return nil @@ -480,7 +480,7 @@ func (r *runner) installPackageFromRegistry(packageName, packageVersion string) return nil } -func (r *runner) installPackageFromPackageRoot() error { +func (r *runner) installPackageFromPackageRoot(ctx context.Context) error { logger.Debug("Installing package...") installer, err := installer.NewForPackage(installer.Options{ Kibana: r.options.KibanaClient, @@ -492,13 +492,13 @@ func (r *runner) installPackageFromPackageRoot() error { return fmt.Errorf("failed to initialize package installer: %w", err) } - _, err = installer.Install() + _, err = installer.Install(ctx) if err != nil { return fmt.Errorf("failed to install package: %w", err) } r.removePackageHandler = func(ctx context.Context) error { - if err := installer.Uninstall(); err != nil { + if err := installer.Uninstall(ctx); err != nil { return fmt.Errorf("error removing benchmark package: %w", err) } diff --git a/internal/benchrunner/runners/stream/runner.go b/internal/benchrunner/runners/stream/runner.go index d20769ff86..974522e7a9 100644 --- a/internal/benchrunner/runners/stream/runner.go +++ b/internal/benchrunner/runners/stream/runner.go @@ -113,7 +113,7 @@ func (r *runner) setUp(ctx context.Context) error { } r.scenarios = scenarios - if err = r.installPackage(); err != nil { + if err = r.installPackage(ctx); err != nil { return fmt.Errorf("error installing package: %w", err) } @@ -192,11 +192,11 @@ func (r *runner) wipeDataStreamsOnSetup() error { return nil } -func (r *runner) installPackage() error { - return r.installPackageFromPackageRoot() +func (r *runner) installPackage(ctx context.Context) error { + return r.installPackageFromPackageRoot(ctx) } -func (r *runner) installPackageFromPackageRoot() error { +func (r *runner) installPackageFromPackageRoot(ctx context.Context) error { logger.Debug("Installing package...") installer, err := installer.NewForPackage(installer.Options{ Kibana: r.options.KibanaClient, @@ -208,13 +208,13 @@ func (r *runner) installPackageFromPackageRoot() error { return fmt.Errorf("failed to initialize package installer: %w", err) } - _, err = installer.Install() + _, err = installer.Install(ctx) if err != nil { return fmt.Errorf("failed to install package: %w", err) } r.removePackageHandler = func(ctx context.Context) error { - if err := installer.Uninstall(); err != nil { + if err := installer.Uninstall(ctx); err != nil { return fmt.Errorf("error removing benchmark package: %w", err) } diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go index 587509a013..723979da58 100644 --- a/internal/benchrunner/runners/system/runner.go +++ b/internal/benchrunner/runners/system/runner.go @@ -169,7 +169,7 @@ func (r *runner) setUp(ctx context.Context) error { return fmt.Errorf("reading package manifest failed: %w", err) } - policy, err := r.createBenchmarkPolicy(pkgManifest) + policy, err := r.createBenchmarkPolicy(ctx, pkgManifest) if err != nil { return err } @@ -353,7 +353,7 @@ func (r *runner) deleteDataStreamDocs(dataStream string) error { return nil } -func (r *runner) createBenchmarkPolicy(pkgManifest *packages.PackageManifest) (*kibana.Policy, error) { +func (r *runner) createBenchmarkPolicy(ctx context.Context, pkgManifest *packages.PackageManifest) (*kibana.Policy, error) { // Configure package (single data stream) via Ingest Manager APIs. logger.Debug("creating benchmark policy...") benchTime := time.Now().Format("20060102T15:04:05Z") @@ -369,12 +369,12 @@ func (r *runner) createBenchmarkPolicy(pkgManifest *packages.PackageManifest) (* p.DataOutputID = "fleet-logstash-output" } - policy, err := r.options.KibanaClient.CreatePolicy(p) + policy, err := r.options.KibanaClient.CreatePolicy(ctx, p) if err != nil { return nil, err } - packagePolicy, err := r.createPackagePolicy(pkgManifest, policy) + packagePolicy, err := r.createPackagePolicy(ctx, pkgManifest, policy) if err != nil { return nil, err } @@ -383,12 +383,12 @@ func (r *runner) createBenchmarkPolicy(pkgManifest *packages.PackageManifest) (* var merr multierror.Error logger.Debug("deleting benchmark package policy...") - if err := r.options.KibanaClient.DeletePackagePolicy(*packagePolicy); err != nil { + if err := r.options.KibanaClient.DeletePackagePolicy(ctx, *packagePolicy); err != nil { merr = append(merr, fmt.Errorf("error cleaning up benchmark package policy: %w", err)) } logger.Debug("deleting benchmark policy...") - if err := r.options.KibanaClient.DeletePolicy(*policy); err != nil { + if err := r.options.KibanaClient.DeletePolicy(ctx, *policy); err != nil { merr = append(merr, fmt.Errorf("error cleaning up benchmark policy: %w", err)) } @@ -402,7 +402,7 @@ func (r *runner) createBenchmarkPolicy(pkgManifest *packages.PackageManifest) (* return policy, nil } -func (r *runner) createPackagePolicy(pkgManifest *packages.PackageManifest, p *kibana.Policy) (*kibana.PackagePolicy, error) { +func (r *runner) createPackagePolicy(ctx context.Context, pkgManifest *packages.PackageManifest, p *kibana.Policy) (*kibana.PackagePolicy, error) { logger.Debug("creating package policy...") if r.scenario.Version == "" { @@ -437,7 +437,7 @@ func (r *runner) createPackagePolicy(pkgManifest *packages.PackageManifest, p *k pp.Package.Name = pkgManifest.Name pp.Package.Version = r.scenario.Version - policy, err := r.options.KibanaClient.CreatePackagePolicy(pp) + policy, err := r.options.KibanaClient.CreatePackagePolicy(ctx, pp) if err != nil { return nil, err } @@ -620,7 +620,7 @@ func (r *runner) runGenerator(destDir string) error { func (r *runner) checkEnrolledAgents(ctx context.Context) ([]kibana.Agent, error) { var agents []kibana.Agent enrolled, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) { - allAgents, err := r.options.KibanaClient.ListAgents() + allAgents, err := r.options.KibanaClient.ListAgents(ctx) if err != nil { return false, fmt.Errorf("could not list agents: %w", err) } @@ -696,7 +696,7 @@ func (r *runner) enrollAgents(ctx context.Context) error { return nil } - policyWithDataStream, err := r.options.KibanaClient.GetPolicy(r.benchPolicy.ID) + policyWithDataStream, err := r.options.KibanaClient.GetPolicy(ctx, r.benchPolicy.ID) if err != nil { return fmt.Errorf("could not read the policy with data stream: %w", err) } diff --git a/internal/dump/agentpolicies.go b/internal/dump/agentpolicies.go index b2746c13a6..b56fd46870 100644 --- a/internal/dump/agentpolicies.go +++ b/internal/dump/agentpolicies.go @@ -42,7 +42,7 @@ func NewAgentPoliciesDumper(client *kibana.Client) *AgentPoliciesDumper { } func (d *AgentPoliciesDumper) getAgentPolicy(ctx context.Context, name string) (*AgentPolicy, error) { - policy, err := d.client.GetRawPolicy(name) + policy, err := d.client.GetRawPolicy(ctx, name) if err != nil { return nil, err } @@ -86,7 +86,7 @@ func getPackagesUsingAgentPolicy(packagePolicies []packagePolicy) []string { } func (d *AgentPoliciesDumper) getAgentPoliciesFilteredByPackage(ctx context.Context, packageName string) ([]AgentPolicy, error) { - rawPolicies, err := d.client.ListRawPolicies() + rawPolicies, err := d.client.ListRawPolicies(ctx) if err != nil { return nil, err diff --git a/internal/export/dashboards.go b/internal/export/dashboards.go index 9cca9b2164..df57b85ac4 100644 --- a/internal/export/dashboards.go +++ b/internal/export/dashboards.go @@ -5,6 +5,7 @@ package export import ( + "context" "encoding/json" "fmt" "os" @@ -20,7 +21,7 @@ import ( // Dashboards method exports selected dashboards with references objects. All Kibana objects are saved to local files // in appropriate directories. -func Dashboards(kibanaClient *kibana.Client, dashboardsIDs []string) error { +func Dashboards(ctx context.Context, kibanaClient *kibana.Client, dashboardsIDs []string) error { packageRoot, err := packages.MustFindPackageRoot() if err != nil { return fmt.Errorf("locating package root failed: %w", err) @@ -40,16 +41,16 @@ func Dashboards(kibanaClient *kibana.Client, dashboardsIDs []string) error { return fmt.Errorf("cannot import from this Kibana version: %w", err) } - objects, err := kibanaClient.Export(dashboardsIDs) + objects, err := kibanaClient.Export(ctx, dashboardsIDs) if err != nil { return fmt.Errorf("exporting dashboards using Kibana client failed: %w", err) } - ctx := &transformationContext{ + transformContext := &transformationContext{ packageName: m.Name, } - objects, err = applyTransformations(ctx, objects) + objects, err = applyTransformations(transformContext, objects) if err != nil { return fmt.Errorf("can't transform Kibana objects: %w", err) } diff --git a/internal/kibana/agents.go b/internal/kibana/agents.go index 2fa77771b6..07b949563d 100644 --- a/internal/kibana/agents.go +++ b/internal/kibana/agents.go @@ -47,8 +47,8 @@ func (a *Agent) String() string { } // ListAgents returns the list of agents enrolled with Fleet. -func (c *Client) ListAgents() ([]Agent, error) { - statusCode, respBody, err := c.get(fmt.Sprintf("%s/agents", FleetAPI)) +func (c *Client) ListAgents(ctx context.Context) ([]Agent, error) { + statusCode, respBody, err := c.get(ctx, fmt.Sprintf("%s/agents", FleetAPI)) if err != nil { return nil, fmt.Errorf("could not list agents: %w", err) } @@ -73,7 +73,7 @@ func (c *Client) AssignPolicyToAgent(ctx context.Context, a Agent, p Policy) err reqBody := `{ "policy_id": "` + p.ID + `" }` path := fmt.Sprintf("%s/agents/%s/reassign", FleetAPI, a.ID) - statusCode, respBody, err := c.put(path, []byte(reqBody)) + statusCode, respBody, err := c.put(ctx, path, []byte(reqBody)) if err != nil { return fmt.Errorf("could not assign policy to agent: %w", err) } @@ -96,7 +96,7 @@ func (c *Client) waitUntilPolicyAssigned(ctx context.Context, a Agent, p Policy) defer ticker.Stop() for { - agent, err := c.getAgent(a.ID) + agent, err := c.getAgent(ctx, a.ID) if err != nil { return fmt.Errorf("can't get the agent: %w", err) } @@ -122,8 +122,8 @@ func (c *Client) waitUntilPolicyAssigned(ctx context.Context, a Agent, p Policy) return nil } -func (c *Client) getAgent(agentID string) (*Agent, error) { - statusCode, respBody, err := c.get(fmt.Sprintf("%s/agents/%s", FleetAPI, agentID)) +func (c *Client) getAgent(ctx context.Context, agentID string) (*Agent, error) { + statusCode, respBody, err := c.get(ctx, fmt.Sprintf("%s/agents/%s", FleetAPI, agentID)) if err != nil { return nil, fmt.Errorf("could not list agents: %w", err) } diff --git a/internal/kibana/client.go b/internal/kibana/client.go index b02ebe707b..6d4db93ce7 100644 --- a/internal/kibana/client.go +++ b/internal/kibana/client.go @@ -6,6 +6,7 @@ package kibana import ( "bytes" + "context" "crypto/tls" "errors" "fmt" @@ -65,7 +66,8 @@ func NewClient(opts ...ClientOption) (*Client, error) { // Allow to initialize version from tests. var zeroVersion VersionInfo if c.semver == nil || c.versionInfo == zeroVersion { - v, err := c.requestStatus() + // Passing a nil context here because we are on initialization. + v, err := c.requestStatus(context.Background()) if err != nil { return nil, fmt.Errorf("failed to get Kibana version: %w", err) } @@ -134,24 +136,24 @@ func HTTPClientSetup(setup func(*http.Client) *http.Client) ClientOption { } } -func (c *Client) get(resourcePath string) (int, []byte, error) { - return c.SendRequest(http.MethodGet, resourcePath, nil) +func (c *Client) get(ctx context.Context, resourcePath string) (int, []byte, error) { + return c.SendRequest(ctx, http.MethodGet, resourcePath, nil) } -func (c *Client) post(resourcePath string, body []byte) (int, []byte, error) { - return c.SendRequest(http.MethodPost, resourcePath, body) +func (c *Client) post(ctx context.Context, resourcePath string, body []byte) (int, []byte, error) { + return c.SendRequest(ctx, http.MethodPost, resourcePath, body) } -func (c *Client) put(resourcePath string, body []byte) (int, []byte, error) { - return c.SendRequest(http.MethodPut, resourcePath, body) +func (c *Client) put(ctx context.Context, resourcePath string, body []byte) (int, []byte, error) { + return c.SendRequest(ctx, http.MethodPut, resourcePath, body) } -func (c *Client) delete(resourcePath string) (int, []byte, error) { - return c.SendRequest(http.MethodDelete, resourcePath, nil) +func (c *Client) delete(ctx context.Context, resourcePath string) (int, []byte, error) { + return c.SendRequest(ctx, http.MethodDelete, resourcePath, nil) } -func (c *Client) SendRequest(method, resourcePath string, body []byte) (int, []byte, error) { - request, err := c.newRequest(method, resourcePath, bytes.NewReader(body)) +func (c *Client) SendRequest(ctx context.Context, method, resourcePath string, body []byte) (int, []byte, error) { + request, err := c.newRequest(ctx, method, resourcePath, bytes.NewReader(body)) if err != nil { return 0, nil, err } @@ -159,7 +161,7 @@ func (c *Client) SendRequest(method, resourcePath string, body []byte) (int, []b return c.doRequest(request) } -func (c *Client) newRequest(method, resourcePath string, reqBody io.Reader) (*http.Request, error) { +func (c *Client) newRequest(ctx context.Context, method, resourcePath string, reqBody io.Reader) (*http.Request, error) { base, err := url.Parse(c.host) if err != nil { return nil, fmt.Errorf("could not create base URL from host: %v: %w", c.host, err) @@ -175,7 +177,7 @@ func (c *Client) newRequest(method, resourcePath string, reqBody io.Reader) (*ht logger.Debugf("%s %s", method, u) - req, err := http.NewRequest(method, u.String(), reqBody) + req, err := http.NewRequestWithContext(ctx, method, u.String(), reqBody) if err != nil { return nil, fmt.Errorf("could not create %v request to Kibana API resource: %s: %w", method, resourcePath, err) } diff --git a/internal/kibana/client_test.go b/internal/kibana/client_test.go index 7a63ea57fd..820289853a 100644 --- a/internal/kibana/client_test.go +++ b/internal/kibana/client_test.go @@ -6,6 +6,7 @@ package kibana import ( "bytes" + "context" "crypto/x509" "encoding/pem" "fmt" @@ -37,7 +38,7 @@ func TestClientWithTLS(t *testing.T) { client, err := NewClient(version, Address(server.URL)) require.NoError(t, err) - _, _, err = client.get("/") + _, _, err = client.get(context.Background(), "/") assert.Error(t, err) }) @@ -45,7 +46,7 @@ func TestClientWithTLS(t *testing.T) { client, err := NewClient(version, Address(server.URL), CertificateAuthority(caCertFile)) require.NoError(t, err) - _, _, err = client.get("/") + _, _, err = client.get(context.Background(), "/") assert.NoError(t, err) }) @@ -53,7 +54,7 @@ func TestClientWithTLS(t *testing.T) { client, err := NewClient(version, Address(server.URL), TLSSkipVerify()) require.NoError(t, err) - _, _, err = client.get("/") + _, _, err = client.get(context.Background(), "/") assert.NoError(t, err) }) } diff --git a/internal/kibana/dashboards.go b/internal/kibana/dashboards.go index 38988a94ce..87a7aeebb4 100644 --- a/internal/kibana/dashboards.go +++ b/internal/kibana/dashboards.go @@ -5,6 +5,7 @@ package kibana import ( + "context" "encoding/json" "errors" "fmt" @@ -21,7 +22,7 @@ type exportedType struct { } // Export method exports selected dashboards using the Kibana Export API. -func (c *Client) Export(dashboardIDs []string) ([]common.MapStr, error) { +func (c *Client) Export(ctx context.Context, dashboardIDs []string) ([]common.MapStr, error) { logger.Debug("Export dashboards using the Kibana Export API") var query strings.Builder @@ -33,7 +34,7 @@ func (c *Client) Export(dashboardIDs []string) ([]common.MapStr, error) { } path := fmt.Sprintf("%s/dashboards/export%s", CoreAPI, query.String()) - statusCode, respBody, err := c.get(path) + statusCode, respBody, err := c.get(ctx, path) if err != nil { return nil, fmt.Errorf("could not export dashboards; API status code = %d; response body = %s: %w", statusCode, respBody, err) } diff --git a/internal/kibana/fleet.go b/internal/kibana/fleet.go index 869044bb12..5b81e52dfa 100644 --- a/internal/kibana/fleet.go +++ b/internal/kibana/fleet.go @@ -5,6 +5,7 @@ package kibana import ( + "context" "encoding/json" "errors" "fmt" @@ -27,10 +28,10 @@ type AgentSSL struct { } // DefaultFleetServerURL returns the default Fleet server configured in Kibana -func (c *Client) DefaultFleetServerURL() (string, error) { +func (c *Client) DefaultFleetServerURL(ctx context.Context) (string, error) { path := fmt.Sprintf("%s/fleet_server_hosts", FleetAPI) - statusCode, respBody, err := c.get(path) + statusCode, respBody, err := c.get(ctx, path) if err != nil { return "", fmt.Errorf("could not reach fleet server hosts endpoint: %w", err) } @@ -61,13 +62,13 @@ func (c *Client) DefaultFleetServerURL() (string, error) { // UpdateFleetOutput updates an existing output to fleet // For example, to update ssl certificates etc., -func (c *Client) UpdateFleetOutput(fo FleetOutput, outputId string) error { +func (c *Client) UpdateFleetOutput(ctx context.Context, fo FleetOutput, outputId string) error { reqBody, err := json.Marshal(fo) if err != nil { return fmt.Errorf("could not convert fleetOutput (request) to JSON: %w", err) } - statusCode, respBody, err := c.put(fmt.Sprintf("%s/outputs/%s", FleetAPI, outputId), reqBody) + statusCode, respBody, err := c.put(ctx, fmt.Sprintf("%s/outputs/%s", FleetAPI, outputId), reqBody) if err != nil { return fmt.Errorf("could not update fleet output: %w", err) } @@ -80,13 +81,13 @@ func (c *Client) UpdateFleetOutput(fo FleetOutput, outputId string) error { } // AddFleetOutput adds an additional output to fleet eg., logstash -func (c *Client) AddFleetOutput(fo FleetOutput) error { +func (c *Client) AddFleetOutput(ctx context.Context, fo FleetOutput) error { reqBody, err := json.Marshal(fo) if err != nil { return fmt.Errorf("could not convert fleetOutput (request) to JSON: %w", err) } - statusCode, respBody, err := c.post(fmt.Sprintf("%s/outputs", FleetAPI), reqBody) + statusCode, respBody, err := c.post(ctx, fmt.Sprintf("%s/outputs", FleetAPI), reqBody) if err != nil { return fmt.Errorf("could not create fleet output: %w", err) } @@ -98,7 +99,7 @@ func (c *Client) AddFleetOutput(fo FleetOutput) error { return nil } -func (c *Client) SetAgentLogLevel(agentID, level string) error { +func (c *Client) SetAgentLogLevel(ctx context.Context, agentID, level string) error { path := fmt.Sprintf("%s/agents/%s/actions", FleetAPI, agentID) type fleetAction struct { @@ -119,7 +120,7 @@ func (c *Client) SetAgentLogLevel(agentID, level string) error { return fmt.Errorf("could not convert action settingr (request) to JSON: %w", err) } - statusCode, respBody, err := c.post(path, reqBody) + statusCode, respBody, err := c.post(ctx, path, reqBody) if err != nil { return fmt.Errorf("could not update agent settings: %w", err) } diff --git a/internal/kibana/packages.go b/internal/kibana/packages.go index ba6d2d5735..4d900dca54 100644 --- a/internal/kibana/packages.go +++ b/internal/kibana/packages.go @@ -5,6 +5,7 @@ package kibana import ( + "context" "encoding/json" "fmt" "net/http" @@ -14,11 +15,11 @@ import ( ) // InstallPackage installs the given package in Fleet. -func (c *Client) InstallPackage(name, version string) ([]packages.Asset, error) { +func (c *Client) InstallPackage(ctx context.Context, name, version string) ([]packages.Asset, error) { path := c.epmPackageUrl(name, version) reqBody := []byte(`{"force":true}`) // allows installing older versions of the package being tested - statusCode, respBody, err := c.post(path, reqBody) + statusCode, respBody, err := c.post(ctx, path, reqBody) if err != nil { return nil, fmt.Errorf("could not install package: %w", err) } @@ -27,7 +28,7 @@ func (c *Client) InstallPackage(name, version string) ([]packages.Asset, error) } // InstallZipPackage installs the local zip package in Fleet. -func (c *Client) InstallZipPackage(zipFile string) ([]packages.Asset, error) { +func (c *Client) InstallZipPackage(ctx context.Context, zipFile string) ([]packages.Asset, error) { path := fmt.Sprintf("%s/epm/packages", FleetAPI) body, err := os.Open(zipFile) @@ -36,7 +37,7 @@ func (c *Client) InstallZipPackage(zipFile string) ([]packages.Asset, error) { } defer body.Close() - req, err := c.newRequest(http.MethodPost, path, body) + req, err := c.newRequest(ctx, http.MethodPost, path, body) if err != nil { return nil, err } @@ -52,9 +53,9 @@ func (c *Client) InstallZipPackage(zipFile string) ([]packages.Asset, error) { } // RemovePackage removes the given package from Fleet. -func (c *Client) RemovePackage(name, version string) ([]packages.Asset, error) { +func (c *Client) RemovePackage(ctx context.Context, name, version string) ([]packages.Asset, error) { path := c.epmPackageUrl(name, version) - statusCode, respBody, err := c.delete(path) + statusCode, respBody, err := c.delete(ctx, path) if err != nil { return nil, fmt.Errorf("could not delete package: %w", err) } @@ -70,9 +71,9 @@ type FleetPackage struct { } // GetPackage obtains information about a package from Fleet. -func (c *Client) GetPackage(name string) (*FleetPackage, error) { +func (c *Client) GetPackage(ctx context.Context, name string) (*FleetPackage, error) { path := c.epmPackageUrl(name, "") - statusCode, respBody, err := c.get(path) + statusCode, respBody, err := c.get(ctx, path) if err != nil { return nil, fmt.Errorf("could not get package: %w", err) } diff --git a/internal/kibana/policies.go b/internal/kibana/policies.go index 8e48019c02..897c0e2037 100644 --- a/internal/kibana/policies.go +++ b/internal/kibana/policies.go @@ -5,6 +5,7 @@ package kibana import ( + "context" "encoding/json" "fmt" "net/http" @@ -25,13 +26,13 @@ type Policy struct { } // CreatePolicy persists the given Policy in Fleet. -func (c *Client) CreatePolicy(p Policy) (*Policy, error) { +func (c *Client) CreatePolicy(ctx context.Context, p Policy) (*Policy, error) { reqBody, err := json.Marshal(p) if err != nil { return nil, fmt.Errorf("could not convert policy (request) to JSON: %w", err) } - statusCode, respBody, err := c.post(fmt.Sprintf("%s/agent_policies", FleetAPI), reqBody) + statusCode, respBody, err := c.post(ctx, fmt.Sprintf("%s/agent_policies", FleetAPI), reqBody) if err != nil { return nil, fmt.Errorf("could not create policy: %w", err) } @@ -52,8 +53,8 @@ func (c *Client) CreatePolicy(p Policy) (*Policy, error) { } // GetPolicy fetches the given Policy in Fleet. -func (c *Client) GetPolicy(policyID string) (*Policy, error) { - statusCode, respBody, err := c.get(fmt.Sprintf("%s/agent_policies/%s", FleetAPI, policyID)) +func (c *Client) GetPolicy(ctx context.Context, policyID string) (*Policy, error) { + statusCode, respBody, err := c.get(ctx, fmt.Sprintf("%s/agent_policies/%s", FleetAPI, policyID)) if err != nil { return nil, fmt.Errorf("could not get policy: %w", err) } @@ -74,8 +75,8 @@ func (c *Client) GetPolicy(policyID string) (*Policy, error) { } // GetRawPolicy fetches the given Policy with all the fields in Fleet. -func (c *Client) GetRawPolicy(policyID string) (json.RawMessage, error) { - statusCode, respBody, err := c.get(fmt.Sprintf("%s/agent_policies/%s", FleetAPI, policyID)) +func (c *Client) GetRawPolicy(ctx context.Context, policyID string) (json.RawMessage, error) { + statusCode, respBody, err := c.get(ctx, fmt.Sprintf("%s/agent_policies/%s", FleetAPI, policyID)) if err != nil { return nil, fmt.Errorf("could not get policy: %w", err) } @@ -96,7 +97,7 @@ func (c *Client) GetRawPolicy(policyID string) (json.RawMessage, error) { } // ListRawPolicies fetches all the Policies in Fleet. -func (c *Client) ListRawPolicies() ([]json.RawMessage, error) { +func (c *Client) ListRawPolicies(ctx context.Context) ([]json.RawMessage, error) { itemsRetrieved := 0 currentPage := 1 var items []json.RawMessage @@ -108,7 +109,7 @@ func (c *Client) ListRawPolicies() ([]json.RawMessage, error) { } for finished := false; !finished; finished = itemsRetrieved == resp.Total { - statusCode, respBody, err := c.get(fmt.Sprintf("%s/agent_policies?full=true&page=%d", FleetAPI, currentPage)) + statusCode, respBody, err := c.get(ctx, fmt.Sprintf("%s/agent_policies?full=true&page=%d", FleetAPI, currentPage)) if err != nil { return nil, fmt.Errorf("could not get policies: %w", err) } @@ -130,10 +131,10 @@ func (c *Client) ListRawPolicies() ([]json.RawMessage, error) { } // DeletePolicy removes the given Policy from Fleet. -func (c *Client) DeletePolicy(p Policy) error { +func (c *Client) DeletePolicy(ctx context.Context, p Policy) error { reqBody := `{ "agentPolicyId": "` + p.ID + `" }` - statusCode, respBody, err := c.post(fmt.Sprintf("%s/agent_policies/delete", FleetAPI), []byte(reqBody)) + statusCode, respBody, err := c.post(ctx, fmt.Sprintf("%s/agent_policies/delete", FleetAPI), []byte(reqBody)) if err != nil { return fmt.Errorf("could not delete policy: %w", err) } @@ -199,13 +200,13 @@ type PackageDataStream struct { } // AddPackageDataStreamToPolicy adds a PackageDataStream to a Policy in Fleet. -func (c *Client) AddPackageDataStreamToPolicy(r PackageDataStream) error { +func (c *Client) AddPackageDataStreamToPolicy(ctx context.Context, r PackageDataStream) error { reqBody, err := json.Marshal(r) if err != nil { return fmt.Errorf("could not convert policy-package (request) to JSON: %w", err) } - statusCode, respBody, err := c.post(fmt.Sprintf("%s/package_policies", FleetAPI), reqBody) + statusCode, respBody, err := c.post(ctx, fmt.Sprintf("%s/package_policies", FleetAPI), reqBody) if err != nil { return fmt.Errorf("could not add package to policy: %w", err) } @@ -244,13 +245,13 @@ type PackagePolicyStream struct { } // CreatePackagePolicy persists the given Package Policy in Fleet. -func (c *Client) CreatePackagePolicy(p PackagePolicy) (*PackagePolicy, error) { +func (c *Client) CreatePackagePolicy(ctx context.Context, p PackagePolicy) (*PackagePolicy, error) { reqBody, err := json.Marshal(p) if err != nil { return nil, fmt.Errorf("could not convert package policy (request) to JSON: %w", err) } - statusCode, respBody, err := c.post(fmt.Sprintf("%s/package_policies", FleetAPI), reqBody) + statusCode, respBody, err := c.post(ctx, fmt.Sprintf("%s/package_policies", FleetAPI), reqBody) if err != nil { return nil, fmt.Errorf("could not create package policy (req %s): %w", string(reqBody), err) } @@ -277,8 +278,8 @@ func (c *Client) CreatePackagePolicy(p PackagePolicy) (*PackagePolicy, error) { } // DeletePackagePolicy removes the given Package Policy from Fleet. -func (c *Client) DeletePackagePolicy(p PackagePolicy) error { - statusCode, respBody, err := c.delete(fmt.Sprintf("%s/package_policies/%s", FleetAPI, p.ID)) +func (c *Client) DeletePackagePolicy(ctx context.Context, p PackagePolicy) error { + statusCode, respBody, err := c.delete(ctx, fmt.Sprintf("%s/package_policies/%s", FleetAPI, p.ID)) if err != nil { return fmt.Errorf("could not delete package policy: %w", err) } diff --git a/internal/kibana/savedobjects.go b/internal/kibana/savedobjects.go index d43eca33c7..1f243b5389 100644 --- a/internal/kibana/savedobjects.go +++ b/internal/kibana/savedobjects.go @@ -6,6 +6,7 @@ package kibana import ( "bytes" + "context" "encoding/json" "fmt" "mime/multipart" @@ -57,14 +58,14 @@ func (dso *DashboardSavedObject) String() string { } // FindDashboards method returns dashboards available in the Kibana instance. -func (c *Client) FindDashboards() (DashboardSavedObjects, error) { +func (c *Client) FindDashboards(ctx context.Context) (DashboardSavedObjects, error) { logger.Debug("Find dashboards using the Saved Objects API") var foundObjects DashboardSavedObjects page := 1 for { - r, err := c.findDashboardsNextPage(page) + r, err := c.findDashboardsNextPage(ctx, page) if err != nil { return nil, fmt.Errorf("can't fetch page with results: %w", err) } @@ -91,9 +92,9 @@ func (c *Client) FindDashboards() (DashboardSavedObjects, error) { return foundObjects, nil } -func (c *Client) findDashboardsNextPage(page int) (*savedObjectsResponse, error) { +func (c *Client) findDashboardsNextPage(ctx context.Context, page int) (*savedObjectsResponse, error) { path := fmt.Sprintf("%s/_find?type=dashboard&fields=title&per_page=%d&page=%d", SavedObjectsAPI, findDashboardsPerPage, page) - statusCode, respBody, err := c.get(path) + statusCode, respBody, err := c.get(ctx, path) if err != nil { return nil, fmt.Errorf("could not find dashboards; API status code = %d; response body = %s: %w", statusCode, string(respBody), err) } @@ -115,7 +116,7 @@ func (c *Client) findDashboardsNextPage(page int) (*savedObjectsResponse, error) // allow to edit them. // Managed property cannot be directly changed, so we modify it by exporting the // saved object and importing it again, overwriting the original one. -func (c *Client) SetManagedSavedObject(savedObjectType string, id string, managed bool) error { +func (c *Client) SetManagedSavedObject(ctx context.Context, savedObjectType string, id string, managed bool) error { exportRequest := ExportSavedObjectsRequest{ ExcludeExportDetails: true, IncludeReferencesDeep: false, @@ -126,7 +127,7 @@ func (c *Client) SetManagedSavedObject(savedObjectType string, id string, manage }, }, } - objects, err := c.ExportSavedObjects(exportRequest) + objects, err := c.ExportSavedObjects(ctx, exportRequest) if err != nil { return fmt.Errorf("failed to export %s %s: %w", savedObjectType, id, err) } @@ -139,7 +140,7 @@ func (c *Client) SetManagedSavedObject(savedObjectType string, id string, manage Overwrite: true, Objects: objects, } - _, err = c.ImportSavedObjects(importRequest) + _, err = c.ImportSavedObjects(ctx, importRequest) if err != nil { return fmt.Errorf("failed to import %s %s: %w", savedObjectType, id, err) } @@ -158,14 +159,14 @@ type ExportSavedObjectsRequestObject struct { Type string `json:"type"` } -func (c *Client) ExportSavedObjects(request ExportSavedObjectsRequest) ([]map[string]any, error) { +func (c *Client) ExportSavedObjects(ctx context.Context, request ExportSavedObjectsRequest) ([]map[string]any, error) { body, err := json.Marshal(request) if err != nil { return nil, fmt.Errorf("failed to encode request: %w", err) } path := SavedObjectsAPI + "/_export" - statusCode, respBody, err := c.SendRequest(http.MethodPost, path, body) + statusCode, respBody, err := c.SendRequest(ctx, http.MethodPost, path, body) if err != nil { return nil, fmt.Errorf("could not export saved objects; API status code = %d; response body = %s: %w", statusCode, string(respBody), err) } @@ -208,7 +209,7 @@ type ImportResult struct { Meta map[string]any `json:"meta"` } -func (c *Client) ImportSavedObjects(importRequest ImportSavedObjectsRequest) (*ImportSavedObjectsResponse, error) { +func (c *Client) ImportSavedObjects(ctx context.Context, importRequest ImportSavedObjectsRequest) (*ImportSavedObjectsResponse, error) { var body bytes.Buffer multipartWriter := multipart.NewWriter(&body) fileWriter, err := multipartWriter.CreateFormFile("file", "file.ndjson") @@ -229,7 +230,7 @@ func (c *Client) ImportSavedObjects(importRequest ImportSavedObjectsRequest) (*I } path := SavedObjectsAPI + "/_import" - request, err := c.newRequest(http.MethodPost, path, &body) + request, err := c.newRequest(ctx, http.MethodPost, path, &body) if err != nil { return nil, fmt.Errorf("cannot create new request: %w", err) } diff --git a/internal/kibana/savedobjects_test.go b/internal/kibana/savedobjects_test.go index c678a70b8f..9f7cc1de0f 100644 --- a/internal/kibana/savedobjects_test.go +++ b/internal/kibana/savedobjects_test.go @@ -5,6 +5,7 @@ package kibana_test import ( + "context" "net/http" "testing" @@ -25,7 +26,7 @@ func TestSetManagedSavedObject(t *testing.T) { id := preloadDashboard(t, client) require.True(t, getManagedSavedObject(t, client, "dashboard", id)) - err := client.SetManagedSavedObject("dashboard", id, false) + err := client.SetManagedSavedObject(context.Background(), "dashboard", id, false) require.NoError(t, err) assert.False(t, getManagedSavedObject(t, client, "dashboard", id)) } @@ -45,11 +46,11 @@ func preloadDashboard(t *testing.T, client *kibana.Client) string { }, }, } - _, err := client.ImportSavedObjects(importRequest) + _, err := client.ImportSavedObjects(context.Background(), importRequest) require.NoError(t, err) t.Cleanup(func() { - statusCode, _, err := client.SendRequest(http.MethodDelete, kibana.SavedObjectsAPI+"/dashboard/"+id, nil) + statusCode, _, err := client.SendRequest(context.Background(), http.MethodDelete, kibana.SavedObjectsAPI+"/dashboard/"+id, nil) require.NoError(t, err) require.Equal(t, http.StatusOK, statusCode) }) @@ -67,7 +68,7 @@ func getManagedSavedObject(t *testing.T, client *kibana.Client, savedObjectType }, }, } - export, err := client.ExportSavedObjects(exportRequest) + export, err := client.ExportSavedObjects(context.Background(), exportRequest) require.NoError(t, err) require.Len(t, export, 1) diff --git a/internal/kibana/status.go b/internal/kibana/status.go index 4edbf428d6..2e16afb9da 100644 --- a/internal/kibana/status.go +++ b/internal/kibana/status.go @@ -5,6 +5,7 @@ package kibana import ( + "context" "encoding/json" "fmt" "net/http" @@ -42,9 +43,9 @@ func (c *Client) Version() (VersionInfo, error) { return c.versionInfo, nil } -func (c *Client) requestStatus() (statusType, error) { +func (c *Client) requestStatus(ctx context.Context) (statusType, error) { var status statusType - statusCode, respBody, err := c.get(StatusAPI) + statusCode, respBody, err := c.get(ctx, StatusAPI) if err != nil { return status, fmt.Errorf("could not reach status endpoint: %w", err) } @@ -63,8 +64,8 @@ func (c *Client) requestStatus() (statusType, error) { } // CheckHealth checks the Kibana health -func (c *Client) CheckHealth() error { - status, err := c.requestStatus() +func (c *Client) CheckHealth(ctx context.Context) error { + status, err := c.requestStatus(ctx) if err != nil { return fmt.Errorf("could not reach status endpoint: %w", err) } diff --git a/internal/packages/installer/factory.go b/internal/packages/installer/factory.go index 5309438d2c..29819f43a0 100644 --- a/internal/packages/installer/factory.go +++ b/internal/packages/installer/factory.go @@ -5,6 +5,7 @@ package installer import ( + "context" "errors" "fmt" @@ -21,10 +22,10 @@ var semver8_7_0 = semver.MustParse("8.7.0") // Installer is responsible for installation/uninstallation of the package. type Installer interface { - Install() (*InstalledPackage, error) - Uninstall() error + Install(context.Context) (*InstalledPackage, error) + Uninstall(context.Context) error - Manifest() (*packages.PackageManifest, error) + Manifest(context.Context) (*packages.PackageManifest, error) } // Options are the parameters used to build an installer. diff --git a/internal/packages/installer/installer.go b/internal/packages/installer/installer.go index dcf90ec736..7a54259ee7 100644 --- a/internal/packages/installer/installer.go +++ b/internal/packages/installer/installer.go @@ -5,6 +5,7 @@ package installer import ( + "context" "fmt" "github.com/elastic/elastic-package/internal/kibana" @@ -36,8 +37,8 @@ func CreateForManifest(kibanaClient *kibana.Client, packageRoot string) (*manife } // Install method installs the package using Kibana API. -func (i *manifestInstaller) Install() (*InstalledPackage, error) { - assets, err := i.kibanaClient.InstallPackage(i.manifest.Name, i.manifest.Version) +func (i *manifestInstaller) Install(ctx context.Context) (*InstalledPackage, error) { + assets, err := i.kibanaClient.InstallPackage(ctx, i.manifest.Name, i.manifest.Version) if err != nil { return nil, fmt.Errorf("can't install the package: %w", err) } @@ -50,8 +51,8 @@ func (i *manifestInstaller) Install() (*InstalledPackage, error) { } // Uninstall method uninstalls the package using Kibana API. -func (i *manifestInstaller) Uninstall() error { - _, err := i.kibanaClient.RemovePackage(i.manifest.Name, i.manifest.Version) +func (i *manifestInstaller) Uninstall(ctx context.Context) error { + _, err := i.kibanaClient.RemovePackage(ctx, i.manifest.Name, i.manifest.Version) if err != nil { return fmt.Errorf("can't remove the package: %w", err) } @@ -59,6 +60,6 @@ func (i *manifestInstaller) Uninstall() error { } // Manifest method returns the package manifest. -func (i *manifestInstaller) Manifest() (*packages.PackageManifest, error) { +func (i *manifestInstaller) Manifest(context.Context) (*packages.PackageManifest, error) { return i.manifest, nil } diff --git a/internal/packages/installer/zip_installer.go b/internal/packages/installer/zip_installer.go index df062bfcc9..39672c8a1d 100644 --- a/internal/packages/installer/zip_installer.go +++ b/internal/packages/installer/zip_installer.go @@ -5,6 +5,7 @@ package installer import ( + "context" "fmt" "github.com/elastic/elastic-package/internal/kibana" @@ -32,8 +33,8 @@ func CreateForZip(kibanaClient *kibana.Client, zipPath string) (*zipInstaller, e } // Install method installs the package using Kibana API. -func (i *zipInstaller) Install() (*InstalledPackage, error) { - assets, err := i.kibanaClient.InstallZipPackage(i.zipPath) +func (i *zipInstaller) Install(ctx context.Context) (*InstalledPackage, error) { + assets, err := i.kibanaClient.InstallZipPackage(ctx, i.zipPath) if err != nil { return nil, fmt.Errorf("can't install the package: %w", err) } @@ -46,8 +47,8 @@ func (i *zipInstaller) Install() (*InstalledPackage, error) { } // Uninstall method uninstalls the package using Kibana API. -func (i *zipInstaller) Uninstall() error { - _, err := i.kibanaClient.RemovePackage(i.manifest.Name, i.manifest.Version) +func (i *zipInstaller) Uninstall(ctx context.Context) error { + _, err := i.kibanaClient.RemovePackage(ctx, i.manifest.Name, i.manifest.Version) if err != nil { return fmt.Errorf("can't remove the package: %w", err) } @@ -55,6 +56,6 @@ func (i *zipInstaller) Uninstall() error { } // Manifest method returns the package manifest. -func (i *zipInstaller) Manifest() (*packages.PackageManifest, error) { +func (i *zipInstaller) Manifest(context.Context) (*packages.PackageManifest, error) { return i.manifest, nil } diff --git a/internal/serverless/project.go b/internal/serverless/project.go index 1c2a92f7bf..d95cd2806c 100644 --- a/internal/serverless/project.go +++ b/internal/serverless/project.go @@ -75,7 +75,7 @@ func (p *Project) Status(ctx context.Context, elasticsearchClient *elasticsearch status = map[string]string{ "elasticsearch": healthStatus(p.getESHealth(ctx, elasticsearchClient)), - "kibana": healthStatus(p.getKibanaHealth(kibanaClient)), + "kibana": healthStatus(p.getKibanaHealth(ctx, kibanaClient)), "fleet": healthStatus(p.getFleetHealth(ctx)), } return status, nil @@ -99,7 +99,7 @@ func (p *Project) ensureElasticsearchHealthy(ctx context.Context, elasticsearchC func (p *Project) ensureKibanaHealthy(ctx context.Context, kibanaClient *kibana.Client) error { for { - err := kibanaClient.CheckHealth() + err := kibanaClient.CheckHealth(ctx) if err == nil { return nil } @@ -129,8 +129,8 @@ func (p *Project) ensureFleetHealthy(ctx context.Context) error { } } -func (p *Project) DefaultFleetServerURL(kibanaClient *kibana.Client) (string, error) { - fleetURL, err := kibanaClient.DefaultFleetServerURL() +func (p *Project) DefaultFleetServerURL(ctx context.Context, kibanaClient *kibana.Client) (string, error) { + fleetURL, err := kibanaClient.DefaultFleetServerURL(ctx) if err != nil { return "", fmt.Errorf("failed to query fleet server hosts: %w", err) } @@ -138,7 +138,7 @@ func (p *Project) DefaultFleetServerURL(kibanaClient *kibana.Client) (string, er return fleetURL, nil } -func (p *Project) AddLogstashFleetOutput(profile *profile.Profile, kibanaClient *kibana.Client) error { +func (p *Project) AddLogstashFleetOutput(ctx context.Context, profile *profile.Profile, kibanaClient *kibana.Client) error { logstashFleetOutput := kibana.FleetOutput{ Name: "logstash-output", ID: FleetLogstashOutput, @@ -146,14 +146,14 @@ func (p *Project) AddLogstashFleetOutput(profile *profile.Profile, kibanaClient Hosts: []string{"logstash:5044"}, } - if err := kibanaClient.AddFleetOutput(logstashFleetOutput); err != nil { + if err := kibanaClient.AddFleetOutput(ctx, logstashFleetOutput); err != nil { return fmt.Errorf("failed to add logstash fleet output: %w", err) } return nil } -func (p *Project) UpdateLogstashFleetOutput(profile *profile.Profile, kibanaClient *kibana.Client) error { +func (p *Project) UpdateLogstashFleetOutput(ctx context.Context, profile *profile.Profile, kibanaClient *kibana.Client) error { certsDir := filepath.Join(profile.ProfilePath, "certs", "elastic-agent") caFile, err := os.ReadFile(filepath.Join(certsDir, "ca-cert.pem")) @@ -179,7 +179,7 @@ func (p *Project) UpdateLogstashFleetOutput(profile *profile.Profile, kibanaClie }, } - if err := kibanaClient.UpdateFleetOutput(logstashFleetOutput, FleetLogstashOutput); err != nil { + if err := kibanaClient.UpdateFleetOutput(ctx, logstashFleetOutput, FleetLogstashOutput); err != nil { return fmt.Errorf("failed to update logstash fleet output: %w", err) } @@ -190,8 +190,8 @@ func (p *Project) getESHealth(ctx context.Context, elasticsearchClient *elastics return elasticsearchClient.CheckHealth(ctx) } -func (p *Project) getKibanaHealth(kibanaClient *kibana.Client) error { - return kibanaClient.CheckHealth() +func (p *Project) getKibanaHealth(ctx context.Context, kibanaClient *kibana.Client) error { + return kibanaClient.CheckHealth(ctx) } func (p *Project) getFleetHealth(ctx context.Context) error { @@ -232,7 +232,7 @@ func (p *Project) getFleetHealth(ctx context.Context) error { return nil } -func (p *Project) CreateAgentPolicy(kibanaClient *kibana.Client, stackVersion string, outputId string, selfMonitor bool) error { +func (p *Project) CreateAgentPolicy(ctx context.Context, kibanaClient *kibana.Client, stackVersion string, outputId string, selfMonitor bool) error { policy := kibana.Policy{ ID: "elastic-agent-managed-ep", Name: "Elastic-Agent (elastic-package)", @@ -245,13 +245,13 @@ func (p *Project) CreateAgentPolicy(kibanaClient *kibana.Client, stackVersion st policy.MonitoringEnabled = []string{"logs", "metrics"} } - newPolicy, err := kibanaClient.CreatePolicy(policy) + newPolicy, err := kibanaClient.CreatePolicy(ctx, policy) if err != nil { return fmt.Errorf("error while creating agent policy: %w", err) } if selfMonitor { - err := p.createSystemPackagePolicy(kibanaClient, stackVersion, newPolicy.ID, newPolicy.Namespace) + err := p.createSystemPackagePolicy(ctx, kibanaClient, stackVersion, newPolicy.ID, newPolicy.Namespace) if err != nil { return err } @@ -260,7 +260,7 @@ func (p *Project) CreateAgentPolicy(kibanaClient *kibana.Client, stackVersion st return nil } -func (p *Project) createSystemPackagePolicy(kibanaClient *kibana.Client, stackVersion, agentPolicyID, namespace string) error { +func (p *Project) createSystemPackagePolicy(ctx context.Context, kibanaClient *kibana.Client, stackVersion, agentPolicyID, namespace string) error { systemPackages, err := registry.Production.Revisions("system", registry.SearchOptions{ KibanaVersion: strings.TrimSuffix(stackVersion, kibana.SNAPSHOT_SUFFIX), }) @@ -279,7 +279,7 @@ func (p *Project) createSystemPackagePolicy(kibanaClient *kibana.Client, stackVe packagePolicy.Package.Name = "system" packagePolicy.Package.Version = systemPackages[0].Version - _, err = kibanaClient.CreatePackagePolicy(packagePolicy) + _, err = kibanaClient.CreatePackagePolicy(ctx, packagePolicy) if err != nil { return fmt.Errorf("error while creating package policy: %w", err) } diff --git a/internal/stack/serverless.go b/internal/stack/serverless.go index 44cc0a934c..8153c490fa 100644 --- a/internal/stack/serverless.go +++ b/internal/stack/serverless.go @@ -98,7 +98,7 @@ func (sp *serverlessProvider) createProject(ctx context.Context, settings projec return Config{}, err } - config.Parameters[paramServerlessFleetURL], err = project.DefaultFleetServerURL(sp.kibanaClient) + config.Parameters[paramServerlessFleetURL], err = project.DefaultFleetServerURL(ctx, sp.kibanaClient) if err != nil { return Config{}, fmt.Errorf("failed to get fleet URL: %w", err) } @@ -118,7 +118,7 @@ func (sp *serverlessProvider) createProject(ctx context.Context, settings projec } if settings.LogstashEnabled { - err = project.AddLogstashFleetOutput(sp.profile, sp.kibanaClient) + err = project.AddLogstashFleetOutput(ctx, sp.profile, sp.kibanaClient) if err != nil { return Config{}, err } @@ -144,7 +144,7 @@ func (sp *serverlessProvider) currentProjectWithClientsAndFleetEndpoint(ctx cont fleetURL, found := config.Parameters[paramServerlessFleetURL] if !found { - fleetURL, err = project.DefaultFleetServerURL(sp.kibanaClient) + fleetURL, err = project.DefaultFleetServerURL(ctx, sp.kibanaClient) if err != nil { return nil, fmt.Errorf("failed to get fleet URL: %w", err) } @@ -275,7 +275,7 @@ func (sp *serverlessProvider) BootUp(ctx context.Context, options Options) error } logger.Infof("Creating agent policy") - err = project.CreateAgentPolicy(sp.kibanaClient, options.StackVersion, outputID, settings.SelfMonitor) + err = project.CreateAgentPolicy(ctx, sp.kibanaClient, options.StackVersion, outputID, settings.SelfMonitor) if err != nil { return fmt.Errorf("failed to create agent policy: %w", err) @@ -298,7 +298,7 @@ func (sp *serverlessProvider) BootUp(ctx context.Context, options Options) error // Updating the output with ssl certificates created in startLocalServices // The certificates are updated only when a new project is created and logstash is enabled if isNewProject && settings.LogstashEnabled { - err = project.UpdateLogstashFleetOutput(sp.profile, sp.kibanaClient) + err = project.UpdateLogstashFleetOutput(ctx, sp.profile, sp.kibanaClient) if err != nil { return err } diff --git a/internal/testrunner/runners/asset/runner.go b/internal/testrunner/runners/asset/runner.go index 247cb4bd4c..c87bd76dc6 100644 --- a/internal/testrunner/runners/asset/runner.go +++ b/internal/testrunner/runners/asset/runner.go @@ -68,10 +68,10 @@ func (r *runner) Run(ctx context.Context, options testrunner.TestOptions) ([]tes r.packageRootPath = options.PackageRootPath r.kibanaClient = options.KibanaClient - return r.run() + return r.run(ctx) } -func (r *runner) run() ([]testrunner.TestResult, error) { +func (r *runner) run(ctx context.Context) ([]testrunner.TestResult, error) { result := testrunner.NewResultComposer(testrunner.TestResult{ TestType: TestType, Package: r.testFolder.Package, @@ -102,12 +102,8 @@ func (r *runner) run() ([]testrunner.TestResult, error) { if err != nil { return result.WithError(fmt.Errorf("can't create the package installer: %w", err)) } - installedPackage, err := packageInstaller.Install() - if err != nil { - return result.WithError(fmt.Errorf("can't install the package: %w", err)) - } - r.removePackageHandler = func(ctx context.Context) error { + removePackageHandler := func(ctx context.Context) error { pkgManifest, err := packages.ReadPackageManifestFromPackageRoot(r.packageRootPath) if err != nil { return fmt.Errorf("reading package manifest failed: %w", err) @@ -130,7 +126,7 @@ func (r *runner) run() ([]testrunner.TestResult, error) { } logger.Debug("removing package...") - err = packageInstaller.Uninstall() + err = packageInstaller.Uninstall(ctx) if err != nil { // logging the error as a warning and not returning it since there could be other reasons that could make fail this process // for instance being defined a test agent policy where this package is used for debugging purposes @@ -139,6 +135,20 @@ func (r *runner) run() ([]testrunner.TestResult, error) { return nil } + installedPackage, err := packageInstaller.Install(ctx) + if errors.Is(err, context.Canceled) { + // Installation interrupted, at this point the package may have been installed, try to remove it for cleanup. + err := removePackageHandler(context.WithoutCancel(ctx)) + if err != nil { + logger.Debugf("error while removing package after installation interrupted: %s", err) + } + } + if err != nil { + return result.WithError(fmt.Errorf("can't install the package: %w", err)) + } + + r.removePackageHandler = removePackageHandler + // No Elasticsearch asset is created when an Input package is installed through the API. // This would require to create a Agent policy and add that input package to the Agent policy. // As those input packages could have some required fields, it would also require to add @@ -177,10 +187,14 @@ func (r *runner) run() ([]testrunner.TestResult, error) { } func (r *runner) TearDown(ctx context.Context) error { + // Avoid cancellations during cleanup. + cleanupCtx := context.WithoutCancel(ctx) + if r.removePackageHandler != nil { - if err := r.removePackageHandler(ctx); err != nil { + if err := r.removePackageHandler(cleanupCtx); err != nil { return err } + r.removePackageHandler = nil } return nil diff --git a/internal/testrunner/runners/system/runner.go b/internal/testrunner/runners/system/runner.go index cfab74a285..4938b88ab9 100644 --- a/internal/testrunner/runners/system/runner.go +++ b/internal/testrunner/runners/system/runner.go @@ -734,18 +734,7 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, servic return nil, fmt.Errorf("failed to initialize package installer: %v", err) } - if r.options.RunTearDown { - logger.Debug("Skip installing package") - } else { - // Allowed to re-install the package in RunTestsOnly to be able to - // test new changes introduced in the package - logger.Debug("Installing package...") - _, err = installer.Install() - if err != nil { - return nil, fmt.Errorf("failed to install package: %v", err) - } - } - r.deletePackageHandler = func(ctx context.Context) error { + deletePackageHandler := func(ctx context.Context) error { stackVersion, err := semver.NewVersion(serviceOptions.StackVersion) if err != nil { return fmt.Errorf("failed to parse stack version: %w", err) @@ -759,7 +748,7 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, servic } logger.Debug("removing package...") - err = installer.Uninstall() + err = installer.Uninstall(ctx) if err != nil { // logging the error as a warning and not returning it since there could be other reasons that could make fail this process // for instance being defined a test agent policy where this package is used for debugging purposes @@ -768,6 +757,26 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, servic return nil } + if r.options.RunTearDown { + logger.Debug("Skip installing package") + } else { + // Allowed to re-install the package in RunTestsOnly to be able to + // test new changes introduced in the package + logger.Debug("Installing package...") + _, err = installer.Install(ctx) + if errors.Is(err, context.Canceled) { + // Installation interrupted, at this point the package may have been installed, try to remove it for cleanup. + err := deletePackageHandler(context.WithoutCancel(ctx)) + if err != nil { + logger.Debugf("error while removing package after installation interrupted: %s", err) + } + } + if err != nil { + return nil, fmt.Errorf("failed to install package: %v", err) + } + } + r.deletePackageHandler = deletePackageHandler + // Configure package (single data stream) via Fleet APIs. var policy *kibana.Policy if r.options.RunTearDown || r.options.RunTestsOnly { @@ -786,14 +795,14 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, servic if r.options.Profile.Config("stack.logstash_enabled", "false") == "true" { p.DataOutputID = "fleet-logstash-output" } - policy, err = r.options.KibanaClient.CreatePolicy(p) + policy, err = r.options.KibanaClient.CreatePolicy(ctx, p) if err != nil { return nil, fmt.Errorf("could not create test policy: %w", err) } } r.deleteTestPolicyHandler = func(ctx context.Context) error { logger.Debug("deleting test policy...") - if err := r.options.KibanaClient.DeletePolicy(*policy); err != nil { + if err := r.options.KibanaClient.DeletePolicy(ctx, *policy); err != nil { return fmt.Errorf("error cleaning up test policy: %w", err) } return nil @@ -804,7 +813,7 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, servic if r.options.RunTearDown || r.options.RunTestsOnly { logger.Debug("Skip adding data stream config to policy") } else { - if err := r.options.KibanaClient.AddPackageDataStreamToPolicy(ds); err != nil { + if err := r.options.KibanaClient.AddPackageDataStreamToPolicy(ctx, ds); err != nil { return nil, fmt.Errorf("could not add data stream config to policy: %w", err) } } @@ -885,7 +894,7 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, servic default: logger.Debug("Set Debug log level to agent") origLogLevel = agent.LocalMetadata.Elastic.Agent.LogLevel - err = r.options.KibanaClient.SetAgentLogLevel(agent.ID, "debug") + err = r.options.KibanaClient.SetAgentLogLevel(ctx, agent.ID, "debug") if err != nil { return nil, fmt.Errorf("error setting log level debug for agent %s: %w", agent.ID, err) } @@ -893,7 +902,7 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, servic r.resetAgentLogLevelHandler = func(ctx context.Context) error { logger.Debugf("reassigning original log level %q back to agent...", origLogLevel) - if err := r.options.KibanaClient.SetAgentLogLevel(agent.ID, origLogLevel); err != nil { + if err := r.options.KibanaClient.SetAgentLogLevel(ctx, agent.ID, origLogLevel); err != nil { return fmt.Errorf("error reassigning original log level to agent: %w", err) } return nil @@ -903,7 +912,7 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, servic case r.options.RunTearDown || r.options.RunTestsOnly: logger.Debug("Skip assiging package data stream to agent") default: - policyWithDataStream, err := r.options.KibanaClient.GetPolicy(policy.ID) + policyWithDataStream, err := r.options.KibanaClient.GetPolicy(ctx, policy.ID) if err != nil { return nil, fmt.Errorf("could not read the policy with data stream: %w", err) } @@ -1186,7 +1195,7 @@ func (r *runner) runTest(ctx context.Context, config *testConfig, serviceContext func checkEnrolledAgents(ctx context.Context, client *kibana.Client, serviceContext servicedeployer.ServiceInfo) ([]kibana.Agent, error) { var agents []kibana.Agent enrolled, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) { - allAgents, err := client.ListAgents() + allAgents, err := client.ListAgents(ctx) if err != nil { return false, fmt.Errorf("could not list agents: %w", err) }