diff --git a/internal/certs/certs.go b/internal/certs/certs.go index b0b901e3e7..977a99af6f 100644 --- a/internal/certs/certs.go +++ b/internal/certs/certs.go @@ -100,7 +100,7 @@ func LoadCA(certFile, keyFile string) (*Issuer, error) { } func newCA(parent *Issuer) (*Issuer, error) { - cert, err := New(true, parent) + cert, err := New(true, false, parent) if err != nil { return nil, err } @@ -115,13 +115,19 @@ func (i *Issuer) IssueIntermediate() (*Issuer, error) { // Issue issues a certificate with the given options. This certificate // can be used to configure a TLS server. func (i *Issuer) Issue(opts ...Option) (*Certificate, error) { - return New(false, i, opts...) + return New(false, false, i, opts...) +} + +// IssueClient issues a certificate with the given options. This certificate +// can be used to configure a TLS client. +func (i *Issuer) IssueClient(opts ...Option) (*Certificate, error) { + return New(false, true, i, opts...) } // NewSelfSignedCert issues a self-signed certificate with the given options. // This certificate can be used to configure a TLS server. func NewSelfSignedCert(opts ...Option) (*Certificate, error) { - return New(false, nil, opts...) + return New(false, false, nil, opts...) } // Option is a function that can modify a certificate template. To be used @@ -140,7 +146,7 @@ func WithName(name string) Option { // New is the main helper to create a certificate, it is recommended to // use the more specific ones for specific use cases. 
-func New(isCA bool, issuer *Issuer, opts ...Option) (*Certificate, error) { +func New(isCA, isClient bool, issuer *Issuer, opts ...Option) (*Certificate, error) { key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) if err != nil { return nil, fmt.Errorf("failed to generate key: %w", err) } @@ -172,6 +178,15 @@ func New(isCA bool, issuer *Issuer, opts ...Option) (*Certificate, error) { } else { template.Subject.CommonName = "intermediate elastic-package CA" } + // If the requester is a client we set clientAuth instead + } else if isClient { + template.ExtKeyUsage = []x509.ExtKeyUsage{ + x509.ExtKeyUsageClientAuth, + } + + // Include local hostname and ips as alternates in client certificates. + template.DNSNames = []string{"localhost"} + template.IPAddresses = []net.IP{net.ParseIP("127.0.0.1"), net.ParseIP("::1")} } else { template.ExtKeyUsage = []x509.ExtKeyUsage{ // Required for Chrome in OSX to show the "Proceed anyway" link. 
// https://stackoverflow.com/a/64309893/28855 - if !containsExtKeyUsage(cert.ExtKeyUsage, x509.ExtKeyUsageServerAuth) { - return fmt.Errorf("missing server auth key usage in certificate") + if !(containsExtKeyUsage(cert.ExtKeyUsage, x509.ExtKeyUsageServerAuth) || containsExtKeyUsage(cert.ExtKeyUsage, x509.ExtKeyUsageClientAuth)) { + return fmt.Errorf("missing either of server/client auth key usage in certificate") } } diff --git a/internal/kibana/fleet.go b/internal/kibana/fleet.go index 244b606085..869044bb12 100644 --- a/internal/kibana/fleet.go +++ b/internal/kibana/fleet.go @@ -12,6 +12,20 @@ import ( "time" ) +type FleetOutput struct { + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Hosts []string `json:"hosts,omitempty"` + Type string `json:"type,omitempty"` + SSL *AgentSSL `json:"ssl,omitempty"` +} + +type AgentSSL struct { + CertificateAuthorities []string `json:"certificate_authorities,omitempty"` + Certificate string `json:"certificate,omitempty"` + Key string `json:"key,omitempty"` +} + // DefaultFleetServerURL returns the default Fleet server configured in Kibana func (c *Client) DefaultFleetServerURL() (string, error) { path := fmt.Sprintf("%s/fleet_server_hosts", FleetAPI) @@ -45,6 +59,45 @@ func (c *Client) DefaultFleetServerURL() (string, error) { return "", errors.New("could not find the fleet server URL for this project") } +// UpdateFleetOutput updates an existing output to fleet +// For example, to update ssl certificates etc., +func (c *Client) UpdateFleetOutput(fo FleetOutput, outputId string) error { + reqBody, err := json.Marshal(fo) + if err != nil { + return fmt.Errorf("could not convert fleetOutput (request) to JSON: %w", err) + } + + statusCode, respBody, err := c.put(fmt.Sprintf("%s/outputs/%s", FleetAPI, outputId), reqBody) + if err != nil { + return fmt.Errorf("could not update fleet output: %w", err) + } + + if statusCode != http.StatusOK { + return fmt.Errorf("could not update fleet output; API status code 
= %d; response body = %s", statusCode, respBody) + } + + return nil +} + +// AddFleetOutput adds an additional output to fleet eg., logstash +func (c *Client) AddFleetOutput(fo FleetOutput) error { + reqBody, err := json.Marshal(fo) + if err != nil { + return fmt.Errorf("could not convert fleetOutput (request) to JSON: %w", err) + } + + statusCode, respBody, err := c.post(fmt.Sprintf("%s/outputs", FleetAPI), reqBody) + if err != nil { + return fmt.Errorf("could not create fleet output: %w", err) + } + + if statusCode != http.StatusOK { + return fmt.Errorf("could not add fleet output; API status code = %d; response body = %s", statusCode, respBody) + } + + return nil +} + func (c *Client) SetAgentLogLevel(agentID, level string) error { path := fmt.Sprintf("%s/agents/%s/actions", FleetAPI, agentID) diff --git a/internal/serverless/project.go b/internal/serverless/project.go index 2b911e3e37..6d5e3940b8 100644 --- a/internal/serverless/project.go +++ b/internal/serverless/project.go @@ -11,15 +11,22 @@ import ( "io" "net/http" "net/url" + "os" + "path/filepath" "strings" "time" "github.com/elastic/elastic-package/internal/elasticsearch" "github.com/elastic/elastic-package/internal/kibana" "github.com/elastic/elastic-package/internal/logger" + "github.com/elastic/elastic-package/internal/profile" "github.com/elastic/elastic-package/internal/registry" ) +const ( + FleetLogstashOutput = "fleet-logstash-output" +) + // Project represents a serverless project type Project struct { url string @@ -131,6 +138,54 @@ func (p *Project) DefaultFleetServerURL(kibanaClient *kibana.Client) (string, er return fleetURL, nil } +func (p *Project) AddLogstashFleetOutput(profile *profile.Profile, kibanaClient *kibana.Client) error { + logstashFleetOutput := kibana.FleetOutput{ + Name: "logstash-output", + ID: FleetLogstashOutput, + Type: "logstash", + Hosts: []string{"logstash:5044"}, + } + + if err := kibanaClient.AddFleetOutput(logstashFleetOutput); err != nil { + return 
fmt.Errorf("failed to add logstash fleet output: %w", err) + } + + return nil +} + +func (p *Project) UpdateLogstashFleetOutput(profile *profile.Profile, kibanaClient *kibana.Client) error { + certsDir := filepath.Join(profile.ProfilePath, "certs", "elastic-agent") + + caFile, err := os.ReadFile(filepath.Join(certsDir, "ca-cert.pem")) + if err != nil { + return fmt.Errorf("failed to read ca certificate: %w", err) + } + + certFile, err := os.ReadFile(filepath.Join(certsDir, "cert.pem")) + if err != nil { + return fmt.Errorf("failed to read client certificate: %w", err) + } + + keyFile, err := os.ReadFile(filepath.Join(certsDir, "key.pem")) + if err != nil { + return fmt.Errorf("failed to read client certificate private key: %w", err) + } + + logstashFleetOutput := kibana.FleetOutput{ + SSL: &kibana.AgentSSL{ + CertificateAuthorities: []string{string(caFile)}, + Certificate: string(certFile), + Key: string(keyFile), + }, + } + + if err := kibanaClient.UpdateFleetOutput(logstashFleetOutput, FleetLogstashOutput); err != nil { + return fmt.Errorf("failed to update logstash fleet output: %w", err) + } + + return nil +} + func (p *Project) getESHealth(ctx context.Context, elasticsearchClient *elasticsearch.Client) error { return elasticsearchClient.CheckHealth(ctx) } @@ -177,7 +232,7 @@ func (p *Project) getFleetHealth(ctx context.Context) error { return nil } -func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Client) error { +func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Client, outputId string) error { systemPackages, err := registry.Production.Revisions("system", registry.SearchOptions{ KibanaVersion: strings.TrimSuffix(stackVersion, kibana.SNAPSHOT_SUFFIX), }) @@ -195,7 +250,9 @@ func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Cl Description: "Policy created by elastic-package", Namespace: "default", MonitoringEnabled: []string{"logs", "metrics"}, + DataOutputID: outputId, } + 
newPolicy, err := kibanaClient.CreatePolicy(policy) if err != nil { return fmt.Errorf("error while creating agent policy: %w", err) diff --git a/internal/stack/_static/serverless-docker-compose.yml.tmpl b/internal/stack/_static/serverless-docker-compose.yml.tmpl new file mode 100644 index 0000000000..2ec85a1577 --- /dev/null +++ b/internal/stack/_static/serverless-docker-compose.yml.tmpl @@ -0,0 +1,59 @@ +version: '2.3' +services: + elastic-agent: + image: "{{ fact "agent_image" }}" + healthcheck: + test: "elastic-agent status" + timeout: 2s + start_period: 360s + retries: 180 + interval: 5s + hostname: docker-fleet-agent + env_file: "./elastic-agent.env" + volumes: + - type: bind + source: ../../../tmp/service_logs/ + target: /tmp/service_logs/ + # Mount service_logs under /run too as a testing workaround for the journald input (see elastic-package#1235). + - type: bind + source: ../../../tmp/service_logs/ + target: /run/service_logs/ + - "../certs/ca-cert.pem:/etc/ssl/certs/elastic-package.pem" + + elastic-agent_is_ready: + image: tianon/true + depends_on: + elastic-agent: + condition: service_healthy + +{{ $logstash_enabled := fact "logstash_enabled" }} +{{ if eq $logstash_enabled "true" }} + logstash: + image: "{{ fact "logstash_image" }}" + healthcheck: + test: bin/logstash -t + interval: 60s + timeout: 50s + retries: 5 + # logstash expects the key in pkcs8 format. Hence converting the key.pem to pkcs8 format using openssl. + # Also logstash-filter-elastic_integration plugin is installed by default to run ingest pipelines in logstash. + # elastic-package#1637 made improvements to enable logstash stats through port 9600. + command: bash -c 'openssl pkcs8 -inform PEM -in /usr/share/logstash/config/certs/key.pem -topk8 -nocrypt -outform PEM -out /usr/share/logstash/config/certs/logstash.pkcs8.key && chmod 777 /usr/share/logstash/config/certs/logstash.pkcs8.key && if [[ ! 
$(bin/logstash-plugin list) == *"logstash-filter-elastic_integration"* ]]; then echo "Missing plugin logstash-filter-elastic_integration, installing now" && bin/logstash-plugin install logstash-filter-elastic_integration; fi && bin/logstash -f /usr/share/logstash/pipeline/logstash.conf' + volumes: + - "../certs/logstash:/usr/share/logstash/config/certs" + - "./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro" + ports: + - "127.0.0.1:5044:5044" + - "127.0.0.1:9600:9600" + environment: + - xpack.monitoring.enabled=false + - ELASTIC_USER={{ fact "username" }} + - ELASTIC_PASSWORD={{ fact "password" }} + - ELASTIC_HOSTS={{ fact "elasticsearch_host" }} + + logstash_is_ready: + image: tianon/true + depends_on: + logstash: + condition: service_healthy +{{ end }} diff --git a/internal/stack/_static/serverless-elastic-agent.yml.tmpl b/internal/stack/_static/serverless-elastic-agent.yml.tmpl deleted file mode 100644 index 94744bb2b7..0000000000 --- a/internal/stack/_static/serverless-elastic-agent.yml.tmpl +++ /dev/null @@ -1,26 +0,0 @@ -version: '2.3' -services: - elastic-agent: - image: "{{ fact "agent_image" }}" - healthcheck: - test: "elastic-agent status" - timeout: 2s - start_period: 360s - retries: 180 - interval: 5s - hostname: docker-fleet-agent - env_file: "./elastic-agent.env" - volumes: - - type: bind - source: ../../../tmp/service_logs/ - target: /tmp/service_logs/ - # Mount service_logs under /run too as a testing workaround for the journald input (see elastic-package#1235). 
- - type: bind - source: ../../../tmp/service_logs/ - target: /run/service_logs/ - - elastic-agent_is_ready: - image: tianon/true - depends_on: - elastic-agent: - condition: service_healthy diff --git a/internal/stack/_static/serverless-logstash.conf.tmpl b/internal/stack/_static/serverless-logstash.conf.tmpl new file mode 100644 index 0000000000..d42f1d493e --- /dev/null +++ b/internal/stack/_static/serverless-logstash.conf.tmpl @@ -0,0 +1,32 @@ +input { + elastic_agent { + port => 5044 + ssl_enabled => true + ssl_certificate_authorities => ["/usr/share/logstash/config/certs/ca-cert.pem"] + ssl_certificate => "/usr/share/logstash/config/certs/cert.pem" + ssl_key => "/usr/share/logstash/config/certs/logstash.pkcs8.key" + } +} + + +filter { + elastic_integration { + remove_field => ['@version'] + hosts => ["{{ fact "elasticsearch_host" }}"] + username => '{{ fact "username" }}' + password => '{{ fact "password" }}' + ssl_enabled => true + ssl_verification_mode => "full" + } +} + + +output { + elasticsearch { + hosts => ["{{ fact "elasticsearch_host" }}"] + user => '{{ fact "username" }}' + password => '{{ fact "password" }}' + ssl_enabled => true + data_stream => "true" + } +} diff --git a/internal/stack/certs.go b/internal/stack/certs.go index 7416d6e072..8c2c264ae6 100644 --- a/internal/stack/certs.go +++ b/internal/stack/certs.go @@ -16,14 +16,24 @@ import ( "github.com/elastic/elastic-package/internal/certs" ) +type tlsService struct { + Name string + IsClient bool +} + // tlsServices is the list of server TLS certificates that will be // created in the given path. 
-var tlsServices = []string{ - "elasticsearch", - "kibana", - "package-registry", - "fleet-server", - "logstash", +var tlsServices = []tlsService{ + {Name: "elasticsearch"}, + {Name: "kibana"}, + {Name: "package-registry"}, + {Name: "fleet-server"}, + {Name: "logstash"}, +} + +var tlsServicesServerless = []tlsService{ + {Name: "logstash"}, + {Name: "elastic-agent", IsClient: true}, } var ( @@ -43,7 +53,7 @@ var ( // initTLSCertificates initializes all the certificates needed to run the services // managed by elastic-package stack. It includes a CA, and a pair of keys and // certificates for each service. -func initTLSCertificates(fileProvider string, profilePath string) ([]resource.Resource, error) { +func initTLSCertificates(fileProvider string, profilePath string, tlsServices []tlsService) ([]resource.Resource, error) { certsDir := filepath.Join(profilePath, CertificatesDirectory) caCertFile := filepath.Join(profilePath, string(CACertificateFile)) caKeyFile := filepath.Join(profilePath, string(CAKeyFile)) @@ -69,7 +79,7 @@ func initTLSCertificates(fileProvider string, profilePath string) ([]resource.Re } for _, service := range tlsServices { - certsDir := filepath.Join(certsDir, service) + certsDir := filepath.Join(certsDir, service.Name) caFile := filepath.Join(certsDir, "ca-cert.pem") certFile := filepath.Join(certsDir, "cert.pem") keyFile := filepath.Join(certsDir, "key.pem") @@ -119,7 +129,7 @@ func certWriteToResource(resources []resource.Resource, fileProvider string, pro } func initCA(certFile, keyFile string) (*certs.Issuer, error) { - if err := verifyTLSCertificates(certFile, certFile, keyFile, ""); err == nil { + if err := verifyTLSCertificates(certFile, certFile, keyFile, tlsService{}); err == nil { // Valid CA is already present, load it to check service certificates. 
ca, err := certs.LoadCA(certFile, keyFile) if err != nil { @@ -134,21 +144,30 @@ return ca, nil } -func initServiceTLSCertificates(ca *certs.Issuer, caCertFile string, certFile, keyFile, service string) (*certs.Certificate, error) { +func initServiceTLSCertificates(ca *certs.Issuer, caCertFile string, certFile, keyFile string, service tlsService) (*certs.Certificate, error) { if err := verifyTLSCertificates(caCertFile, certFile, keyFile, service); err == nil { // Certificate already present and valid, load it. return certs.LoadCertificate(certFile, keyFile) } - cert, err := ca.Issue(certs.WithName(service)) - if err != nil { - return nil, fmt.Errorf("error initializing certificate for %q", service) + var cert *certs.Certificate + var err error + if service.IsClient { + cert, err = ca.IssueClient(certs.WithName(service.Name)) + if err != nil { + return nil, fmt.Errorf("error initializing certificate for %q", service.Name) + } + } else { + cert, err = ca.Issue(certs.WithName(service.Name)) + if err != nil { + return nil, fmt.Errorf("error initializing certificate for %q", service.Name) + } } return cert, nil } -func verifyTLSCertificates(caFile, certFile, keyFile, name string) error { +func verifyTLSCertificates(caFile, certFile, keyFile string, service tlsService) error { cert, err := certs.LoadCertificate(certFile, keyFile) if err != nil { return err @@ -161,9 +180,16 @@ options := x509.VerifyOptions{ Roots: certPool, } - if name != "" { - options.DNSName = name + if service.Name != "" { + options.DNSName = service.Name } + + // By default ExtKeyUsageServerAuth is added to KeyUsages + // See https://github.com/golang/go/blob/master/src/crypto/x509/verify.go#L193-L195 + if service.IsClient { + options.KeyUsages = []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth} + } + err = cert.Verify(options) if err != nil { return err diff --git 
a/internal/stack/certs_test.go b/internal/stack/certs_test.go index 2d7daf2eba..07b6183a7c 100644 --- a/internal/stack/certs_test.go +++ b/internal/stack/certs_test.go @@ -15,50 +15,62 @@ import ( ) func TestTLSCertsInitialization(t *testing.T) { + + tests := []struct { + name string + services []tlsService + }{ + {"tlsServices", tlsServices}, + {"tlsServicesServerless", tlsServicesServerless}, + } profilePath := t.TempDir() caCertFile := filepath.Join(profilePath, "certs", "ca-cert.pem") caKeyFile := filepath.Join(profilePath, "certs", "ca-key.pem") - assert.Error(t, verifyTLSCertificates(caCertFile, caCertFile, caKeyFile, "")) + assert.Error(t, verifyTLSCertificates(caCertFile, caCertFile, caKeyFile, tlsService{})) providerName := "test-file" - resources, err := initTLSCertificates(providerName, profilePath) - require.NoError(t, err) - resourceManager := resource.NewManager() - resourceManager.RegisterProvider(providerName, &resource.FileProvider{ - Prefix: profilePath, - }) - _, err = resourceManager.Apply(resources) - require.NoError(t, err) - assert.NoError(t, verifyTLSCertificates(caCertFile, caCertFile, caKeyFile, "")) + for _, tt := range tests { + resources, err := initTLSCertificates(providerName, profilePath, tt.services) + require.NoError(t, err) - for _, service := range tlsServices { - t.Run(service, func(t *testing.T) { - serviceCertFile := filepath.Join(profilePath, "certs", service, "cert.pem") - serviceKeyFile := filepath.Join(profilePath, "certs", service, "key.pem") - assert.NoError(t, verifyTLSCertificates(caCertFile, serviceCertFile, serviceKeyFile, service)) + resourceManager.RegisterProvider(providerName, &resource.FileProvider{ + Prefix: profilePath, }) + _, err = resourceManager.Apply(resources) + require.NoError(t, err) + + assert.NoError(t, verifyTLSCertificates(caCertFile, caCertFile, caKeyFile, tlsService{})) + for _, service := range tt.services { + t.Run(service.Name, func(t *testing.T) { + serviceCertFile := 
filepath.Join(profilePath, "certs", service.Name, "cert.pem") + serviceKeyFile := filepath.Join(profilePath, "certs", service.Name, "key.pem") + assert.NoError(t, verifyTLSCertificates(caCertFile, serviceCertFile, serviceKeyFile, service)) + }) + } } - t.Run("service certificate individually recreated", func(t *testing.T) { - service := tlsServices[0] - serviceCertFile := filepath.Join(profilePath, "certs", service, "cert.pem") - serviceKeyFile := filepath.Join(profilePath, "certs", service, "key.pem") - assert.NoError(t, verifyTLSCertificates(caCertFile, serviceCertFile, serviceKeyFile, service)) + for _, tt := range tests { + t.Run("service certificate individually recreated", func(t *testing.T) { + service := tt.services[0] + serviceCertFile := filepath.Join(profilePath, "certs", service.Name, "cert.pem") + serviceKeyFile := filepath.Join(profilePath, "certs", service.Name, "key.pem") + assert.NoError(t, verifyTLSCertificates(caCertFile, serviceCertFile, serviceKeyFile, service)) - // Remove the certificate. - os.Remove(serviceCertFile) - os.Remove(serviceKeyFile) - assert.Error(t, verifyTLSCertificates(caCertFile, serviceCertFile, serviceKeyFile, service)) + // Remove the certificate. + os.Remove(serviceCertFile) + os.Remove(serviceKeyFile) + assert.Error(t, verifyTLSCertificates(caCertFile, serviceCertFile, serviceKeyFile, service)) - // Check it is created again and is validated by the same CA. - resources, err := initTLSCertificates(providerName, profilePath) - require.NoError(t, err) - _, err = resourceManager.Apply(resources) - require.NoError(t, err) + // Check it is created again and is validated by the same CA. 
+ resources, err := initTLSCertificates(providerName, profilePath, tt.services) + require.NoError(t, err) + _, err = resourceManager.Apply(resources) + require.NoError(t, err) - assert.NoError(t, verifyTLSCertificates(caCertFile, serviceCertFile, serviceKeyFile, service)) - }) + assert.NoError(t, verifyTLSCertificates(caCertFile, serviceCertFile, serviceKeyFile, service)) + }) + } } diff --git a/internal/stack/resources.go b/internal/stack/resources.go index c0a8b10a32..d34d0b660a 100644 --- a/internal/stack/resources.go +++ b/internal/stack/resources.go @@ -123,8 +123,9 @@ func applyResources(profile *profile.Profile, stackVersion string) error { "kibana_version": stackVersion, "agent_version": stackVersion, - "kibana_host": "https://kibana:5601", - "fleet_url": "https://fleet-server:8220", + "kibana_host": "https://kibana:5601", + "fleet_url": "https://fleet-server:8220", + "elasticsearch_host": "https://elasticsearch:9200", "username": elasticsearchUsername, "password": elasticsearchPassword, @@ -144,7 +145,7 @@ func applyResources(profile *profile.Profile, stackVersion string) error { resourceManager.RegisterProvider("certs", &resource.FileProvider{ Prefix: profile.ProfilePath, }) - certResources, err := initTLSCertificates("certs", profile.ProfilePath) + certResources, err := initTLSCertificates("certs", profile.ProfilePath, tlsServices) if err != nil { return fmt.Errorf("failed to create TLS files: %w", err) } diff --git a/internal/stack/serverless.go b/internal/stack/serverless.go index 02197a753d..f58204c23d 100644 --- a/internal/stack/serverless.go +++ b/internal/stack/serverless.go @@ -29,6 +29,7 @@ const ( configRegion = "stack.serverless.region" configProjectType = "stack.serverless.type" configElasticCloudURL = "stack.elastic_cloud.host" + configLogstashEnabled = "stack.logstash_enabled" defaultRegion = "aws-us-east-1" defaultProjectType = "observability" @@ -54,7 +55,8 @@ type projectSettings struct { Region string Type string - StackVersion string + 
StackVersion string + LogstashEnabled bool } func (sp *serverlessProvider) createProject(settings projectSettings, options Options, conf Config) (Config, error) { @@ -75,7 +77,6 @@ func (sp *serverlessProvider) createProject(settings projectSettings, options Op paramServerlessProjectID: project.ID, paramServerlessProjectType: project.Type, } - config.ElasticsearchHost = project.Endpoints.Elasticsearch config.KibanaHost = project.Endpoints.Kibana config.ElasticsearchUsername = project.Credentials.Username @@ -118,6 +119,13 @@ func (sp *serverlessProvider) createProject(settings projectSettings, options Op return Config{}, fmt.Errorf("not all services are healthy: %w", err) } + if settings.LogstashEnabled { + err = project.AddLogstashFleetOutput(sp.profile, sp.kibanaClient) + if err != nil { + return Config{}, err + } + } + return config, nil } @@ -198,10 +206,11 @@ func (sp *serverlessProvider) createClients(project *serverless.Project) error { func getProjectSettings(options Options) (projectSettings, error) { s := projectSettings{ - Name: createProjectName(options), - Type: options.Profile.Config(configProjectType, defaultProjectType), - Region: options.Profile.Config(configRegion, defaultRegion), - StackVersion: options.StackVersion, + Name: createProjectName(options), + Type: options.Profile.Config(configProjectType, defaultProjectType), + Region: options.Profile.Config(configRegion, defaultRegion), + StackVersion: options.StackVersion, + LogstashEnabled: options.Profile.Config(configLogstashEnabled, "false") == "true", } return s, nil @@ -244,6 +253,7 @@ func (sp *serverlessProvider) BootUp(options Options) error { var project *serverless.Project + isNewProject := false project, err = sp.currentProject(config) switch err { default: @@ -260,11 +270,18 @@ func (sp *serverlessProvider) BootUp(options Options) error { return fmt.Errorf("failed to retrieve latest project created: %w", err) } + outputID := "" + if settings.LogstashEnabled { + outputID = 
serverless.FleetLogstashOutput + } + logger.Infof("Creating agent policy") - err = project.CreateAgentPolicy(options.StackVersion, sp.kibanaClient) + err = project.CreateAgentPolicy(options.StackVersion, sp.kibanaClient, outputID) + if err != nil { return fmt.Errorf("failed to create agent policy: %w", err) } + isNewProject = true // TODO: Ensuring a specific GeoIP database would make tests reproducible // Currently geo ip files would be ignored when running pipeline tests @@ -273,10 +290,19 @@ func (sp *serverlessProvider) BootUp(options Options) error { printUserConfig(options.Printer, config) } - logger.Infof("Starting local agent") - err = sp.startLocalAgent(options, config) + logger.Infof("Starting local services") + err = sp.startLocalServices(options, config) if err != nil { - return fmt.Errorf("failed to start local agent: %w", err) + return fmt.Errorf("failed to start local services: %w", err) + } + + // Updating the output with ssl certificates created in startLocalServices + // The certificates are updated only when a new project is created and logstash is enabled + if isNewProject && settings.LogstashEnabled { + err = project.UpdateLogstashFleetOutput(sp.profile, sp.kibanaClient) + if err != nil { + return err + } } return nil @@ -286,25 +312,25 @@ func (sp *serverlessProvider) composeProjectName() string { return DockerComposeProjectName(sp.profile) } -func (sp *serverlessProvider) localAgentComposeProject() (*compose.Project, error) { +func (sp *serverlessProvider) localServicesComposeProject() (*compose.Project, error) { composeFile := sp.profile.Path(profileStackPath, SnapshotFile) return compose.NewProject(sp.composeProjectName(), composeFile) } -func (sp *serverlessProvider) startLocalAgent(options Options, config Config) error { +func (sp *serverlessProvider) startLocalServices(options Options, config Config) error { err := applyServerlessResources(sp.profile, options.StackVersion, config) if err != nil { - return fmt.Errorf("could not initialize 
compose files for local agent: %w", err) + return fmt.Errorf("could not initialize compose files for local services: %w", err) } - project, err := sp.localAgentComposeProject() + project, err := sp.localServicesComposeProject() if err != nil { - return fmt.Errorf("could not initialize local agent compose project") + return fmt.Errorf("could not initialize local services compose project") } err = project.Build(compose.CommandOptions{}) if err != nil { - return fmt.Errorf("failed to build images for local agent: %w", err) + return fmt.Errorf("failed to build images for local services: %w", err) } err = project.Up(compose.CommandOptions{ExtraArgs: []string{"-d"}}) @@ -333,10 +359,10 @@ func (sp *serverlessProvider) TearDown(options Options) error { var errs error - err = sp.destroyLocalAgent() + err = sp.destroyLocalServices() if err != nil { - logger.Errorf("failed to destroy local agent: %v", err) - errs = fmt.Errorf("failed to destroy local agent: %w", err) + logger.Errorf("failed to destroy local services: %v", err) + errs = fmt.Errorf("failed to destroy local services: %w", err) } project, err := sp.currentProject(config) @@ -357,15 +383,15 @@ func (sp *serverlessProvider) TearDown(options Options) error { return errs } -func (sp *serverlessProvider) destroyLocalAgent() error { - project, err := sp.localAgentComposeProject() +func (sp *serverlessProvider) destroyLocalServices() error { + project, err := sp.localServicesComposeProject() if err != nil { - return fmt.Errorf("could not initialize local agent compose project") + return fmt.Errorf("could not initialize local services compose project") } err = project.Down(compose.CommandOptions{}) if err != nil { - return fmt.Errorf("failed to destroy local agent: %w", err) + return fmt.Errorf("failed to destroy local services: %w", err) } return nil diff --git a/internal/stack/serverlessresources.go b/internal/stack/serverlessresources.go index 76c1630d3d..8eb9b1896a 100644 --- a/internal/stack/serverlessresources.go 
+++ b/internal/stack/serverlessresources.go @@ -6,6 +6,8 @@ package stack import ( "fmt" + "net" + "net/url" "os" "path/filepath" "strings" @@ -20,12 +22,16 @@ var ( serverlessStackResources = []resource.Resource{ &resource.File{ Path: SnapshotFile, - Content: staticSource.Template("_static/serverless-elastic-agent.yml.tmpl"), + Content: staticSource.Template("_static/serverless-docker-compose.yml.tmpl"), }, &resource.File{ Path: ElasticAgentEnvFile, Content: staticSource.Template("_static/elastic-agent.env.tmpl"), }, + &resource.File{ + Path: LogstashConfigFile, + Content: staticSource.Template("_static/serverless-logstash.conf.tmpl"), + }, } ) @@ -39,12 +45,15 @@ func applyServerlessResources(profile *profile.Profile, stackVersion string, con resourceManager := resource.NewManager() resourceManager.AddFacter(resource.StaticFacter{ - "agent_version": stackVersion, - "agent_image": appConfig.StackImageRefs(stackVersion).ElasticAgent, - "username": config.ElasticsearchUsername, - "password": config.ElasticsearchPassword, - "kibana_host": config.KibanaHost, - "fleet_url": config.Parameters[paramServerlessFleetURL], + "agent_version": stackVersion, + "agent_image": appConfig.StackImageRefs(stackVersion).ElasticAgent, + "logstash_image": appConfig.StackImageRefs(stackVersion).Logstash, + "elasticsearch_host": esHostWithPort(config.ElasticsearchHost), + "username": config.ElasticsearchUsername, + "password": config.ElasticsearchPassword, + "kibana_host": config.KibanaHost, + "fleet_url": config.Parameters[paramServerlessFleetURL], + "logstash_enabled": profile.Config("stack.logstash_enabled", "false"), }) os.MkdirAll(stackDir, 0755) @@ -52,7 +61,19 @@ func applyServerlessResources(profile *profile.Profile, stackVersion string, con Prefix: stackDir, }) - results, err := resourceManager.Apply(serverlessStackResources) + resources := append([]resource.Resource{}, serverlessStackResources...) 
+ + // Keeping certificates in the profile directory for backwards compatibility reasons. + resourceManager.RegisterProvider("certs", &resource.FileProvider{ + Prefix: profile.ProfilePath, + }) + certResources, err := initTLSCertificates("certs", profile.ProfilePath, tlsServicesServerless) + if err != nil { + return fmt.Errorf("failed to create TLS files: %w", err) + } + resources = append(resources, certResources...) + + results, err := resourceManager.Apply(resources) if err != nil { var errors []string for _, result := range results { @@ -65,3 +86,19 @@ return nil } + +// esHostWithPort checks if the es host has a port already added in the string, else adds 443 +// This is to mitigate a known issue in logstash - https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-serverless +func esHostWithPort(host string) string { + url, err := url.Parse(host) + if err != nil { + return host + } + + if url.Port() == "" { + url.Host = net.JoinHostPort(url.Hostname(), "443") + return url.String() + } + + return host +} diff --git a/internal/stack/serverlessresources_test.go b/internal/stack/serverlessresources_test.go new file mode 100644 index 0000000000..707ea97eb2 --- /dev/null +++ b/internal/stack/serverlessresources_test.go @@ -0,0 +1,33 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package stack + +import ( + "testing" +) + +func TestEsHostWithPort(t *testing.T) { + tests := []struct { + name string + host string + want string + }{ + {"host without port", "https://hostname", "https://hostname:443"}, + {"host with port", "https://hostname:443", "https://hostname:443"}, + {"host with different port", "https://hostname:9200", "https://hostname:9200"}, + {"ipv6 host", "http://[2001:db8:1f70::999:de8:7648:6e8]:100/", "http://[2001:db8:1f70::999:de8:7648:6e8]:100/"}, + {"ipv6 host without port", "http://[2001:db8:1f70::999:de8:7648:6e8]", "http://[2001:db8:1f70::999:de8:7648:6e8]:443"}, + {"host with path", "https://hostname/xyz", "https://hostname:443/xyz"}, + {"ipv6 host with path", "https://[::1]/xyz", "https://[::1]:443/xyz"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := esHostWithPort(tt.host); got != tt.want { + t.Errorf("esHostWithPort() = %v, want %v", got, tt.want) + } + }) + } +}