diff --git a/cmd/testrunner.go b/cmd/testrunner.go index 34546716d..b0834f3d0 100644 --- a/cmd/testrunner.go +++ b/cmd/testrunner.go @@ -12,6 +12,7 @@ import ( "github.com/spf13/cobra" "github.com/elastic/elastic-package/internal/cobraext" + "github.com/elastic/elastic-package/internal/elasticsearch" "github.com/elastic/elastic-package/internal/packages" "github.com/elastic/elastic-package/internal/testrunner" _ "github.com/elastic/elastic-package/internal/testrunner/runners" // register all test runners @@ -82,22 +83,29 @@ func testTypeCommandActionFactory(testType testrunner.TestType) cobraext.Command return errors.Wrap(err, "locating package root failed") } - testFolderPaths, err := testrunner.FindTestFolders(packageRootPath, testType, datasets) + testFolders, err := testrunner.FindTestFolders(packageRootPath, testType, datasets) if err != nil { return errors.Wrap(err, "unable to determine test folder paths") } - if failOnMissing && len(testFolderPaths) == 0 { + if failOnMissing && len(testFolders) == 0 { if len(datasets) > 0 { return fmt.Errorf("no %s tests found for %s dataset(s)", testType, strings.Join(datasets, ",")) } return fmt.Errorf("no %s tests found", testType) } - for _, path := range testFolderPaths { + esClient, err := elasticsearch.Client() + if err != nil { + return errors.Wrap(err, "fetching Elasticsearch client instance failed") + } + + for _, folder := range testFolders { if err := testrunner.Run(testType, testrunner.TestOptions{ - TestFolderPath: path, + TestFolder: folder, + PackageRootPath: packageRootPath, GenerateTestResult: generateTestResult, + ESClient: esClient, }); err != nil { return errors.Wrapf(err, "error running package %s tests", testType) } diff --git a/go.mod b/go.mod index b6b98d60e..8f7210162 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.14 require ( github.com/AlecAivazis/survey/v2 v2.1.0 github.com/Masterminds/semver v1.5.0 + github.com/aymerick/raymond v2.0.2+incompatible github.com/elastic/go-elasticsearch/v7 
v7.9.0 github.com/elastic/package-spec/code/go v0.0.0-20200826192647-f45b05c887b5 github.com/go-git/go-billy/v5 v5.0.0 @@ -14,7 +15,10 @@ require ( github.com/magefile/mage v1.10.0 github.com/pkg/errors v0.9.1 github.com/spf13/cobra v1.0.0 + golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de // indirect golang.org/x/net v0.0.0-20200822124328-c89045814202 // indirect golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d + golang.org/x/sys v0.0.0-20200817155316-9781c653f443 // indirect + golang.org/x/text v0.3.3 // indirect gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c ) diff --git a/go.sum b/go.sum index 708ff0068..66b547422 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,8 @@ github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYU github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= +github.com/aymerick/raymond v2.0.2+incompatible h1:VEp3GpgdAnv9B2GFyTvqgcKvY+mfKMjPOA3SbKLtnU0= +github.com/aymerick/raymond v2.0.2+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= @@ -189,6 +191,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod 
h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig= +golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 h1:XQyxROzUlZH+WIQwySDgnISgOivlhjIEwaQaJEJrrN0= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -206,7 +210,6 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200625001655-4c5254603344 h1:vGXIOMxbNfDTk/aXCmfdLgkrSV+Z2tcbze+pEc3v5W4= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202 h1:VvcQYSHwXgi7W+TpUR6A9g6Up98WAHf3f/ulnJ62IyA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= @@ -230,9 +233,13 @@ golang.org/x/sys v0.0.0-20190530182044-ad28b68e88f1/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200817155316-9781c653f443 h1:X18bCaipMcoJGm27Nv7zr4XYPKGUy92GtqboKC2Hxaw= +golang.org/x/sys v0.0.0-20200817155316-9781c653f443/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/internal/builder/dashboards.go b/internal/builder/dashboards.go index eb599b871..350618d48 100644 --- a/internal/builder/dashboards.go +++ b/internal/builder/dashboards.go @@ -10,6 +10,8 @@ import ( "path/filepath" "github.com/pkg/errors" + + "github.com/elastic/elastic-package/internal/common" ) var fieldsToEncode = []string{ @@ -53,7 +55,7 @@ func encodeDashboards(destinationDir string) error { // The reason is that for versioning it is much nicer to have the full // json so only on packaging this is changed. func encodedSavedObject(data []byte) ([]byte, bool, error) { - savedObject := mapStr{} + savedObject := common.MapStr{} err := json.Unmarshal(data, &savedObject) if err != nil { return nil, false, errors.Wrapf(err, "unmarshalling saved object failed") @@ -61,7 +63,7 @@ func encodedSavedObject(data []byte) ([]byte, bool, error) { var changed bool for _, v := range fieldsToEncode { - out, err := savedObject.getValue(v) + out, err := savedObject.GetValue(v) // This means the key did not exists, no conversion needed. 
if err != nil { continue @@ -79,11 +81,11 @@ func encodedSavedObject(data []byte) ([]byte, bool, error) { if err != nil { return nil, false, err } - _, err = savedObject.put(v, string(r)) + _, err = savedObject.Put(v, string(r)) if err != nil { return nil, false, errors.Wrapf(err, "can't put value to the saved object") } changed = true } - return []byte(savedObject.stringToPrint()), changed, nil + return []byte(savedObject.StringToPrint()), changed, nil } diff --git a/internal/builder/mapstr.go b/internal/common/mapstr.go similarity index 79% rename from internal/builder/mapstr.go rename to internal/common/mapstr.go index 2a529b838..216edde64 100644 --- a/internal/builder/mapstr.go +++ b/internal/common/mapstr.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package builder +package common // WARNING: This code is copied from https://github.com/elastic/beats/blob/master/libbeat/common/mapstr.go // This was done to not have to import the full common package and all its dependencies @@ -21,13 +21,13 @@ var ( errKeyNotFound = errors.New("key not found") ) -// mapStr is a map[string]interface{} wrapper with utility methods for common +// MapStr is a map[string]interface{} wrapper with utility methods for common // map operations like converting to JSON. -type mapStr map[string]interface{} +type MapStr map[string]interface{} // GetValue gets a value from the map. If the key does not exist then an error // is returned. -func (m mapStr) getValue(key string) (interface{}, error) { +func (m MapStr) GetValue(key string) (interface{}, error) { _, _, v, found, err := mapFind(key, m, false) if err != nil { return nil, err @@ -45,7 +45,7 @@ func (m mapStr) getValue(key string) (interface{}, error) { // // If you need insert keys containing dots then you must use bracket notation // to insert values (e.g. m[key] = value). 
-func (m mapStr) put(key string, value interface{}) (interface{}, error) { +func (m MapStr) Put(key string, value interface{}) (interface{}, error) { // XXX `safemapstr.Put` mimics this implementation, both should be updated to have similar behavior k, d, old, _, err := mapFind(key, m, true) if err != nil { @@ -56,19 +56,19 @@ func (m mapStr) put(key string, value interface{}) (interface{}, error) { return old, nil } -// StringToPrint returns the mapStr as pretty JSON. -func (m mapStr) stringToPrint() string { - json, err := json.MarshalIndent(m, "", " ") +// StringToPrint returns the MapStr as pretty JSON. +func (m MapStr) StringToPrint() string { + j, err := json.MarshalIndent(m, "", " ") if err != nil { return fmt.Sprintf("Not valid json: %v", err) } - return string(json) + return string(j) } -// tomapStr performs a type assertion on v and returns a mapStr. v can be either -// a mapStr or a map[string]interface{}. If it's any other type or nil then +// tomapStr performs a type assertion on v and returns a MapStr. v can be either +// a MapStr or a map[string]interface{}. If it's any other type or nil then // an error is returned. -func toMapStr(v interface{}) (mapStr, error) { +func toMapStr(v interface{}) (MapStr, error) { m, ok := tryToMapStr(v) if !ok { return nil, errors.Errorf("expected map but type is %T", v) @@ -76,18 +76,18 @@ func toMapStr(v interface{}) (mapStr, error) { return m, nil } -func tryToMapStr(v interface{}) (mapStr, bool) { +func tryToMapStr(v interface{}) (MapStr, bool) { switch m := v.(type) { - case mapStr: + case MapStr: return m, true case map[string]interface{}: - return mapStr(m), true + return MapStr(m), true default: return nil, false } } -// mapFind iterates a mapStr based on a the given dotted key, finding the final +// mapFind iterates a MapStr based on a the given dotted key, finding the final // subMap and subKey to operate on. // An error is returned if some intermediate is no map or the key doesn't exist. 
// If createMissing is set to true, intermediate maps are created. @@ -97,9 +97,9 @@ func tryToMapStr(v interface{}) (mapStr, bool) { // the original value. func mapFind( key string, - data mapStr, + data MapStr, createMissing bool, -) (subKey string, subMap mapStr, oldValue interface{}, present bool, err error) { +) (subKey string, subMap MapStr, oldValue interface{}, present bool, err error) { // XXX `safemapstr.mapFind` mimics this implementation, both should be updated to have similar behavior for { @@ -117,7 +117,7 @@ func mapFind( d, exists := data[k] if !exists { if createMissing { - d = mapStr{} + d = MapStr{} data[k] = d } else { return "", nil, nil, false, errKeyNotFound diff --git a/internal/compose/compose.go b/internal/compose/compose.go index 8a85f8c24..324db9ac9 100644 --- a/internal/compose/compose.go +++ b/internal/compose/compose.go @@ -5,11 +5,18 @@ package compose import ( + "bytes" "fmt" + "io" "os" "os/exec" + "strconv" + "strings" "github.com/pkg/errors" + "gopkg.in/yaml.v3" + + "github.com/elastic/elastic-package/internal/logger" ) // Project represents a Docker Compose project. @@ -18,6 +25,68 @@ type Project struct { composeFilePaths []string } +// Config represents a Docker Compose configuration file. +type Config struct { + Services map[string]service +} +type service struct { + Ports []portMapping +} + +type portMapping struct { + ExternalIP string + ExternalPort int + InternalPort int + Protocol string +} + +// UnmarshalYAML unmarshals a Docker Compose port mapping in YAML to +// a portMapping. +func (p *portMapping) UnmarshalYAML(node *yaml.Node) error { + var str string + if err := node.Decode(&str); err != nil { + return err + } + + // First, parse out the protocol. + parts := strings.Split(str, "/") + p.Protocol = parts[1] + + // Now, try to parse out external host, external IP, and internal port. 
+ parts = strings.Split(parts[0], ":") + var externalIP, internalPortStr, externalPortStr string + switch len(parts) { + case 1: + // All we have is an internal port. + internalPortStr = parts[0] + case 3: + // We have an external IP, external port, and an internal port. + externalIP = parts[0] + externalPortStr = parts[1] + internalPortStr = parts[2] + default: + return errors.New("could not parse port mapping") + } + + internalPort, err := strconv.Atoi(internalPortStr) + if err != nil { + return errors.Wrap(err, "error parsing internal port as integer") + } + p.InternalPort = internalPort + + if externalPortStr != "" { + externalPort, err := strconv.Atoi(externalPortStr) + if err != nil { + return errors.Wrap(err, "error parsing external port as integer") + } + p.ExternalPort = externalPort + } + + p.ExternalIP = externalIP + + return nil +} + // CommandOptions encapsulates the environment variables, extra arguments, and Docker Compose services // that can be passed to each Docker Compose command. type CommandOptions struct { @@ -54,7 +123,7 @@ func (p *Project) Up(opts CommandOptions) error { args = append(args, opts.ExtraArgs...) args = append(args, opts.Services...) - if err := runDockerComposeCmd(args, opts.Env); err != nil { + if err := p.runDockerComposeCmd(dockerComposeOptions{args: args, env: opts.Env}); err != nil { return errors.Wrap(err, "running Docker Compose up command failed") } @@ -67,7 +136,7 @@ func (p *Project) Down(opts CommandOptions) error { args = append(args, "down") args = append(args, opts.ExtraArgs...) - if err := runDockerComposeCmd(args, opts.Env); err != nil { + if err := p.runDockerComposeCmd(dockerComposeOptions{args: args, env: opts.Env}); err != nil { return errors.Wrap(err, "running Docker Compose down command failed") } @@ -81,13 +150,33 @@ func (p *Project) Build(opts CommandOptions) error { args = append(args, opts.ExtraArgs...) args = append(args, opts.Services...) 
- if err := runDockerComposeCmd(args, opts.Env); err != nil { + if err := p.runDockerComposeCmd(dockerComposeOptions{args: args, env: opts.Env}); err != nil { return errors.Wrap(err, "running Docker Compose build command failed") } return nil } +// Config returns the combined configuration for a Docker Compose project. +func (p *Project) Config(opts CommandOptions) (*Config, error) { + args := p.baseArgs() + args = append(args, "config") + args = append(args, opts.ExtraArgs...) + args = append(args, opts.Services...) + + var b bytes.Buffer + if err := p.runDockerComposeCmd(dockerComposeOptions{args: args, env: opts.Env, stdout: &b}); err != nil { + return nil, err + } + + var config Config + if err := yaml.Unmarshal(b.Bytes(), &config); err != nil { + return nil, err + } + + return &config, nil +} + // Pull pulls down images for a Docker Compose project. func (p *Project) Pull(opts CommandOptions) error { args := p.baseArgs() @@ -95,7 +184,7 @@ func (p *Project) Pull(opts CommandOptions) error { args = append(args, opts.ExtraArgs...) args = append(args, opts.Services...) - if err := runDockerComposeCmd(args, opts.Env); err != nil { + if err := p.runDockerComposeCmd(dockerComposeOptions{args: args, env: opts.Env}); err != nil { return errors.Wrap(err, "running Docker Compose pull command failed") } @@ -112,11 +201,20 @@ func (p *Project) baseArgs() []string { return args } -func runDockerComposeCmd(args, env []string) error { - cmd := exec.Command("docker-compose", args...) - cmd.Env = append(os.Environ(), env...) - cmd.Stderr = os.Stderr - cmd.Stdout = os.Stdout +type dockerComposeOptions struct { + args []string + env []string + stdout io.Writer +} + +func (p *Project) runDockerComposeCmd(opts dockerComposeOptions) error { + cmd := exec.Command("docker-compose", opts.args...) + cmd.Env = append(os.Environ(), opts.env...) 
+ if opts.stdout != nil { + cmd.Stdout = opts.stdout + } + + logger.Debugf("running command: %s", cmd) return cmd.Run() } diff --git a/internal/install/install.go b/internal/install/install.go index 2377de301..7c41e8e32 100644 --- a/internal/install/install.go +++ b/internal/install/install.go @@ -17,6 +17,7 @@ const ( elasticPackageDir = ".elastic-package" stackDir = "stack" packagesDir = "development" + serviceLogsDir = "tmp/service_logs" ) const versionFilename = "version" @@ -48,6 +49,10 @@ func EnsureInstalled() error { return errors.Wrap(err, "writing static resources failed") } + if err := createServiceLogsDir(elasticPackagePath); err != nil { + return errors.Wrap(err, "creating temp dir failed") + } + fmt.Fprintln(os.Stderr, "elastic-package has been installed.") return nil } @@ -70,6 +75,16 @@ func StackPackagesDir() (string, error) { return filepath.Join(stackDir, packagesDir), nil } +// ServiceLogsDir method returns the location of the directory to store service logs on the +// local filesystem, i.e. the same one where elastic-package is installed. 
+func ServiceLogsDir() (string, error) { + configurationDir, err := configurationDir() + if err != nil { + return "", errors.Wrap(err, "locating configuration directory failed") + } + return filepath.Join(configurationDir, serviceLogsDir), nil +} + func configurationDir() (string, error) { homeDir, err := os.UserHomeDir() if err != nil { @@ -131,3 +146,13 @@ func writeStaticResource(err error, path, content string) error { } return nil } + +func createServiceLogsDir(elasticPackagePath string) error { + dirPath := filepath.Join(elasticPackagePath, serviceLogsDir) + err := os.MkdirAll(dirPath, 0755) + if err != nil { + return errors.Wrapf(err, "creating service logs directory failed (path: %s)", dirPath) + } + + return nil +} diff --git a/internal/install/static_kibana_config_yml.go b/internal/install/static_kibana_config_yml.go index de2bab386..3c22de1f3 100644 --- a/internal/install/static_kibana_config_yml.go +++ b/internal/install/static_kibana_config_yml.go @@ -16,7 +16,7 @@ xpack.monitoring.ui.container.elasticsearch.enabled: true xpack.ingestManager.enabled: true xpack.ingestManager.registryUrl: "http://package-registry:8080" xpack.ingestManager.fleet.enabled: true -xpack.ingestManager.fleet.elasticsearch.host: "http://localhost:9200" +xpack.ingestManager.fleet.elasticsearch.host: "http://elasticsearch:9200" xpack.ingestManager.fleet.kibana.host: "http://localhost:5601" xpack.ingestManager.fleet.tlsCheckDisabled: true diff --git a/internal/install/static_snapshot_yml.go b/internal/install/static_snapshot_yml.go index e30be702a..6ceda19b0 100644 --- a/internal/install/static_snapshot_yml.go +++ b/internal/install/static_snapshot_yml.go @@ -69,4 +69,31 @@ services: depends_on: package-registry: condition: service_healthy + + elastic-agent: + image: docker.elastic.co/beats/elastic-agent:${STACK_VERSION} + depends_on: + elasticsearch: + condition: service_healthy + kibana: + condition: service_healthy + healthcheck: + test: "sh -c 'grep \"Agent is starting\" 
/usr/share/elastic-agent/elastic-agent.log*'" + retries: 30 + interval: 1s + environment: + - "FLEET_ENROLL=1" + - "FLEET_ENROLL_INSECURE=1" + - "FLEET_SETUP=1" + - "KIBANA_HOST=http://kibana:5601" + volumes: + - type: bind + source: ../tmp/service_logs/ + target: /tmp/service_logs/ + + elastic-agent_is_ready: + image: tianon/true + depends_on: + elastic-agent: + condition: service_healthy ` diff --git a/internal/kibana/ingestmanager/client.go b/internal/kibana/ingestmanager/client.go new file mode 100644 index 000000000..85830914e --- /dev/null +++ b/internal/kibana/ingestmanager/client.go @@ -0,0 +1,98 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ingestmanager + +import ( + "bytes" + "io/ioutil" + "net/http" + + "github.com/pkg/errors" + + "github.com/elastic/elastic-package/internal/logger" + "github.com/elastic/elastic-package/internal/stack" +) + +// Client represents an Ingest Manager API client. +type Client struct { + apiBaseURL string + + username string + password string +} + +// NewClient returns a new Ingest Manager API client. 
+func NewClient(baseURL, username, password string) (*Client, error) { + return &Client{ + baseURL + "/api/ingest_manager", + username, + password, + }, nil +} + +func (c *Client) get(resourcePath string) (int, []byte, error) { + url := c.apiBaseURL + "/" + resourcePath + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return 0, nil, errors.Wrapf(err, "could not create GET request to Ingest Manager resource: %s", resourcePath) + } + + req.SetBasicAuth(c.username, c.password) + + _, statusCode, respBody, err := sendRequest(req) + if err != nil { + return statusCode, respBody, errors.Wrapf(err, "error sending GET request to Ingest Manager resource: %s", resourcePath) + } + + return statusCode, respBody, nil +} + +func (c *Client) post(resourcePath string, body []byte) (int, []byte, error) { + return c.putOrPost(http.MethodPost, resourcePath, body) +} + +func (c *Client) put(resourcePath string, body []byte) (int, []byte, error) { + return c.putOrPost(http.MethodPut, resourcePath, body) +} + +func (c *Client) putOrPost(method, resourcePath string, body []byte) (int, []byte, error) { + reqBody := bytes.NewReader(body) + url := c.apiBaseURL + "/" + resourcePath + + logger.Debugf("%s %s", method, url) + logger.Debugf("%s", body) + + req, err := http.NewRequest(method, url, reqBody) + if err != nil { + return 0, nil, errors.Wrapf(err, "could not create %s request to Ingest Manager resource: %s", method, resourcePath) + } + + req.SetBasicAuth(c.username, c.password) + req.Header.Add("content-type", "application/json") + req.Header.Add("kbn-xsrf", stack.DefaultVersion) + + _, statusCode, respBody, err := sendRequest(req) + if err != nil { + return statusCode, respBody, errors.Wrapf(err, "error sending %s request to Ingest Manager resource: %s", method, resourcePath) + } + + return statusCode, respBody, nil +} + +func sendRequest(req *http.Request) (*http.Response, int, []byte, error) { + client := http.Client{} + resp, err := client.Do(req) + if err != nil { 
+ return nil, 0, nil, errors.Wrap(err, "could not send request to Kibana API") + } + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return resp, resp.StatusCode, nil, errors.Wrap(err, "could not read response body") + } + + return resp, resp.StatusCode, body, nil +} diff --git a/internal/kibana/ingestmanager/client_agents.go b/internal/kibana/ingestmanager/client_agents.go new file mode 100644 index 000000000..162b5fb70 --- /dev/null +++ b/internal/kibana/ingestmanager/client_agents.go @@ -0,0 +1,59 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ingestmanager + +import ( + "encoding/json" + "fmt" + + "github.com/pkg/errors" +) + +// Agent represents an Elastic Agent enrolled with fleet. +type Agent struct { + ID string `json:"id"` + PolicyID string `json:"policy_id"` +} + +// ListAgents returns the list of agents enrolled with Fleet. +func (c *Client) ListAgents() ([]Agent, error) { + statusCode, respBody, err := c.get("fleet/agents") + if err != nil { + return nil, errors.Wrap(err, "could not list agents") + } + + if statusCode != 200 { + return nil, fmt.Errorf("could not list agents; API status code = %d", statusCode) + } + + var resp struct { + List []Agent `json:"list"` + } + + if err := json.Unmarshal(respBody, &resp); err != nil { + return nil, errors.Wrap(err, "could not convert list agents (response) to JSON") + } + + return resp.List, nil + +} + +// AssignPolicyToAgent assigns the given Policy to the given Agent. 
+func (c *Client) AssignPolicyToAgent(a Agent, p Policy) error { + reqBody := `{ "policy_id": "` + p.ID + `" }` + + path := fmt.Sprintf("fleet/agents/%s/reassign", a.ID) + statusCode, respBody, err := c.put(path, []byte(reqBody)) + if err != nil { + return errors.Wrap(err, "could not assign policy to agent") + } + + if statusCode != 200 { + return fmt.Errorf("could not assign policy to agent; API status code = %d; response body = %s", statusCode, respBody) + } + + return nil + +} diff --git a/internal/kibana/ingestmanager/client_policies.go b/internal/kibana/ingestmanager/client_policies.go new file mode 100644 index 000000000..7c8fd6119 --- /dev/null +++ b/internal/kibana/ingestmanager/client_policies.go @@ -0,0 +1,135 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ingestmanager + +import ( + "encoding/json" + "fmt" + + "github.com/pkg/errors" + + "github.com/elastic/elastic-package/internal/packages" +) + +// Policy represents an Ingest Manager policy. +type Policy struct { + ID string `json:"id,omitempty"` + Name string `json:"name"` + Description string `json:"description"` + Namespace string `json:"namespace"` +} + +// CreatePolicy persists the given Policy in the Ingest Manager. 
+func (c *Client) CreatePolicy(p Policy) (*Policy, error) { + reqBody, err := json.Marshal(p) + if err != nil { + return nil, errors.Wrap(err, "could not convert policy (request) to JSON") + } + + statusCode, respBody, err := c.post("agent_policies", reqBody) + if err != nil { + return nil, errors.Wrap(err, "could not create policy") + } + + if statusCode != 200 { + return nil, fmt.Errorf("could not create policy; API status code = %d; response body = %s", statusCode, respBody) + } + + var resp struct { + Item Policy `json:"item"` + } + + if err := json.Unmarshal(respBody, &resp); err != nil { + return nil, errors.Wrap(err, "could not convert policy (response) to JSON") + } + + return &resp.Item, nil +} + +// DeletePolicy removes the given Policy from the Ingest Manager. +func (c *Client) DeletePolicy(p Policy) error { + reqBody := `{ "agentPolicyId": "` + p.ID + `" }` + + statusCode, respBody, err := c.post("agent_policies/delete", []byte(reqBody)) + if err != nil { + return errors.Wrap(err, "could not delete policy") + } + + if statusCode != 200 { + return fmt.Errorf("could not delete policy; API status code = %d; response body = %s", statusCode, respBody) + } + + return nil +} + +// Var represents a single variable at the package or +// data stream level, encapsulating the data type of the +// variable and it's value. +type Var struct { + Value packages.VarValue `json:"value"` + Type string `json:"type"` +} + +// Vars is a collection of variables either at the package or +// data stream level. +type Vars map[string]Var + +// DataStream represents a data stream within a package. +type DataStream struct { + Type string `json:"type"` + Dataset string `json:"dataset"` +} + +// Stream encapsulates a data stream and it's variables. +type Stream struct { + ID string `json:"id"` + Enabled bool `json:"enabled"` + DataStream DataStream `json:"data_stream"` + Vars Vars `json:"vars"` +} + +// Input represents a package-level input. 
+type Input struct { + Type string `json:"type"` + Enabled bool `json:"enabled"` + Streams []Stream `json:"streams"` + Vars Vars `json:"vars"` +} + +// PackageDataStream represents a request to add a single package's single data stream to a +// Policy in Ingest Manager. +type PackageDataStream struct { + Name string `json:"name"` + Description string `json:"description"` + Namespace string `json:"namespace"` + PolicyID string `json:"policy_id"` + Enabled bool `json:"enabled"` + OutputID string `json:"output_id"` + Inputs []Input `json:"inputs"` + Package struct { + Name string `json:"name"` + Title string `json:"title"` + Version string `json:"version"` + } `json:"package"` +} + +// AddPackageDataStreamToPolicy adds a PackageDataStream to a Policy in Ingest Manager. +func (c *Client) AddPackageDataStreamToPolicy(r PackageDataStream) error { + reqBody, err := json.Marshal(r) + if err != nil { + return errors.Wrap(err, "could not convert policy-package (request) to JSON") + } + + statusCode, respBody, err := c.post("package_policies", reqBody) + if err != nil { + return errors.Wrap(err, "could not add package to policy") + } + + if statusCode != 200 { + return fmt.Errorf("could not add package to policy; API status code = %d; response body = %s", statusCode, respBody) + } + + return nil +} diff --git a/internal/logger/logger.go b/internal/logger/logger.go index deabb158c..f821b713b 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -54,6 +54,16 @@ func Warnf(format string, a ...interface{}) { logMessagef("WARN", format, a...) } +// Error method logs message with "error" level. +func Error(a ...interface{}) { + logMessage("ERROR", a...) +} + +// Errorf method logs message with "error" level and formats it. +func Errorf(format string, a ...interface{}) { + logMessagef("ERROR", format, a...) 
+} + func logMessage(level string, a ...interface{}) { var all []interface{} all = append(all, fmt.Sprintf("%5s ", level)) diff --git a/internal/packages/packages.go b/internal/packages/packages.go index 6087d5ac8..f9a4d8706 100644 --- a/internal/packages/packages.go +++ b/internal/packages/packages.go @@ -5,6 +5,7 @@ package packages import ( + "encoding/json" "io/ioutil" "os" "path/filepath" @@ -21,20 +22,77 @@ const ( DatasetManifestFile = "manifest.yml" ) +// VarValue represents a variable value as defined in a package or dataset +// manifest file. +type VarValue struct { + scalar string + list []string +} + +// UnmarshalYAML knows how to parse a variable value from a package or dataset +// manifest file into a VarValue. +func (vv *VarValue) UnmarshalYAML(value *yaml.Node) error { + switch value.Kind { + case yaml.ScalarNode: + vv.scalar = value.Value + case yaml.SequenceNode: + vv.list = make([]string, len(value.Content)) + for idx, content := range value.Content { + vv.list[idx] = content.Value + } + default: + return errors.New("unknown variable value") + } + return nil +} + +// MarshalJSON knows how to serialize a VarValue into the appropriate +// JSON data type and value. 
+func (vv VarValue) MarshalJSON() ([]byte, error) { + if vv.scalar != "" { + return json.Marshal(vv.scalar) + } else if vv.list != nil { + return json.Marshal(vv.list) + } + return []byte("null"), nil +} + +type variable struct { + Name string `json:"name"` + Type string `json:"type"` + Default VarValue `json:"default"` +} + +type input struct { + Type string `json:"type"` + Vars []variable `json:"vars"` +} + +type configTemplate struct { + Inputs []input `json:"inputs"` +} + // PackageManifest represents the basic structure of a package's manifest type PackageManifest struct { - Name string `json:"name"` - Type string `json:"type"` - Version string `json:"version"` + Name string `json:"name"` + Title string `json:"title"` + Type string `json:"type"` + Version string `json:"version"` + ConfigTemplates []configTemplate `json:"config_templates" yaml:"config_templates"` } // DatasetManifest represents the structure of a dataset's manifest type DatasetManifest struct { + Name string `json:"name"` Title string `json:"title"` Type string `json:"type"` Elasticsearch *struct { IngestPipelineName string `json:"ingest_pipeline.name"` } `json:"elasticsearch"` + Streams []struct { + Input string `json:"input"` + Vars []variable `json:"vars"` + } `json:"streams"` } // FindPackageRoot finds and returns the path to the root folder of a package. 
@@ -117,9 +175,20 @@ func ReadDatasetManifest(path string) (*DatasetManifest, error) { if err != nil { return nil, errors.Wrapf(err, "unmarshalling dataset manifest failed (path: %s)", path) } + + m.Name = filepath.Base(filepath.Dir(path)) return &m, nil } +func (ct *configTemplate) FindInputByType(inputType string) *input { + for _, input := range ct.Inputs { + if input.Type == inputType { + return &input + } + } + return nil +} + func isPackageManifest(path string) (bool, error) { m, err := ReadPackageManifest(path) if err != nil { diff --git a/internal/stack/boot.go b/internal/stack/boot.go index c8e6c26c8..0ebb2cff8 100644 --- a/internal/stack/boot.go +++ b/internal/stack/boot.go @@ -24,7 +24,9 @@ type BootOptions struct { Services []string } -const dockerComposeProjectName = "elastic-package-stack" +// DockerComposeProjectName is the name of the Docker Compose project used to boot up +// Elastic Stack containers. +const DockerComposeProjectName = "elastic-package-stack" // BootUp method boots up the testing stack. 
func BootUp(options BootOptions) error { @@ -92,7 +94,7 @@ func dockerComposeBuild(options BootOptions) error { return errors.Wrap(err, "locating stack directory failed") } - c, err := compose.NewProject(dockerComposeProjectName, filepath.Join(stackDir, "snapshot.yml")) + c, err := compose.NewProject(DockerComposeProjectName, filepath.Join(stackDir, "snapshot.yml")) if err != nil { return errors.Wrap(err, "could not create docker compose project") } @@ -114,7 +116,7 @@ func dockerComposePull(options BootOptions) error { return errors.Wrap(err, "locating stack directory failed") } - c, err := compose.NewProject(dockerComposeProjectName, filepath.Join(stackDir, "snapshot.yml")) + c, err := compose.NewProject(DockerComposeProjectName, filepath.Join(stackDir, "snapshot.yml")) if err != nil { return errors.Wrap(err, "could not create docker compose project") } @@ -136,7 +138,7 @@ func dockerComposeUp(options BootOptions) error { return errors.Wrap(err, "locating stack directory failed") } - c, err := compose.NewProject(dockerComposeProjectName, filepath.Join(stackDir, "snapshot.yml")) + c, err := compose.NewProject(DockerComposeProjectName, filepath.Join(stackDir, "snapshot.yml")) if err != nil { return errors.Wrap(err, "could not create docker compose project") } @@ -164,7 +166,7 @@ func dockerComposeDown() error { return errors.Wrap(err, "locating stack directory failed") } - c, err := compose.NewProject(dockerComposeProjectName, filepath.Join(stackDir, "snapshot.yml")) + c, err := compose.NewProject(DockerComposeProjectName, filepath.Join(stackDir, "snapshot.yml")) if err != nil { return errors.Wrap(err, "could not create docker compose project") } @@ -183,8 +185,8 @@ func dockerComposeDown() error { func withDependentServices(services []string) []string { for _, aService := range services { - if aService == "kibana" { - return []string{} // kibana service requires to load all other services + if aService == "elastic-agent" { + return []string{} // elastic-agent 
service requires to load all other services } } return services diff --git a/internal/stack/shellinit.go b/internal/stack/shellinit.go index 19df8bab0..8046704cb 100644 --- a/internal/stack/shellinit.go +++ b/internal/stack/shellinit.go @@ -10,8 +10,9 @@ import ( "path/filepath" "github.com/pkg/errors" - yaml "gopkg.in/yaml.v3" + "gopkg.in/yaml.v3" + "github.com/elastic/elastic-package/internal/compose" "github.com/elastic/elastic-package/internal/install" ) @@ -31,10 +32,8 @@ var shellInitFormat = "export " + ElasticsearchHostEnv + "=%s\nexport " + Elasti ElasticsearchPasswordEnv + "=%s\nexport " + KibanaHostEnv + "=%s" type kibanaConfiguration struct { - ElasticsearchHost string `yaml:"xpack.ingestManager.fleet.elasticsearch.host"` ElasticsearchUsername string `yaml:"elasticsearch.username"` ElasticsearchPassword string `yaml:"elasticsearch.password"` - KibanaHost string `yaml:"xpack.ingestManager.fleet.kibana.host"` } // ShellInit method exposes environment variables that can be used for testing purposes. @@ -44,6 +43,7 @@ func ShellInit() (string, error) { return "", errors.Wrap(err, "locating stack directory failed") } + // Read Elasticsearch username and password from Kibana configuration file. kibanaConfigurationPath := filepath.Join(stackDir, "kibana.config.yml") body, err := ioutil.ReadFile(kibanaConfigurationPath) if err != nil { @@ -55,9 +55,27 @@ func ShellInit() (string, error) { if err != nil { return "", errors.Wrap(err, "unmarshalling Kibana configuration failed") } + + // Read Elasticsearch and Kibana hostnames from Elastic Stack Docker Compose configuration file. 
+ p, err := compose.NewProject(DockerComposeProjectName, filepath.Join(stackDir, "snapshot.yml")) + if err != nil { + return "", errors.Wrap(err, "could not create docker compose project") + } + + serviceComposeConfig, err := p.Config(compose.CommandOptions{}) + if err != nil { + return "", errors.Wrap(err, "could not get Docker Compose configuration for service") + } + + kib := serviceComposeConfig.Services["kibana"] + kibHostPort := fmt.Sprintf("http://%s:%d", kib.Ports[0].ExternalIP, kib.Ports[0].ExternalPort) + + es := serviceComposeConfig.Services["elasticsearch"] + esHostPort := fmt.Sprintf("http://%s:%d", es.Ports[0].ExternalIP, es.Ports[0].ExternalPort) + return fmt.Sprintf(shellInitFormat, - kibanaCfg.ElasticsearchHost, + esHostPort, kibanaCfg.ElasticsearchUsername, kibanaCfg.ElasticsearchPassword, - kibanaCfg.KibanaHost), nil + kibHostPort), nil } diff --git a/internal/stack/version.go b/internal/stack/version.go index 6bac6bc20..0c58e63b6 100644 --- a/internal/stack/version.go +++ b/internal/stack/version.go @@ -6,5 +6,5 @@ package stack const ( // DefaultVersion is the default version of the stack - DefaultVersion = "7.9.0" + DefaultVersion = "7.10.0-SNAPSHOT" ) diff --git a/internal/testrunner/runners/pipeline/runner.go b/internal/testrunner/runners/pipeline/runner.go index 912e50177..f51cf6543 100644 --- a/internal/testrunner/runners/pipeline/runner.go +++ b/internal/testrunner/runners/pipeline/runner.go @@ -6,14 +6,13 @@ package pipeline import ( "fmt" - "github.com/elastic/elastic-package/internal/logger" "io/ioutil" "path/filepath" "strings" "github.com/pkg/errors" - "github.com/elastic/elastic-package/internal/elasticsearch" + "github.com/elastic/elastic-package/internal/logger" "github.com/elastic/elastic-package/internal/packages" "github.com/elastic/elastic-package/internal/testrunner" ) @@ -39,7 +38,7 @@ func (r *runner) run() error { return errors.Wrap(err, "listing test case definitions failed") } - datasetPath, found, err := 
packages.FindDatasetRootForPath(r.options.TestFolderPath) + datasetPath, found, err := packages.FindDatasetRootForPath(r.options.TestFolder.Path) if err != nil { return errors.Wrap(err, "locating dataset root failed") } @@ -47,17 +46,12 @@ func (r *runner) run() error { return errors.New("dataset root not found") } - esClient, err := elasticsearch.Client() - if err != nil { - return errors.Wrap(err, "fetching Elasticsearch client instance failed") - } - - entryPipeline, pipelineIDs, err := installIngestPipelines(esClient, datasetPath) + entryPipeline, pipelineIDs, err := installIngestPipelines(r.options.ESClient, datasetPath) if err != nil { return errors.Wrap(err, "installing ingest pipelines failed") } defer func() { - err := uninstallIngestPipelines(esClient, pipelineIDs) + err := uninstallIngestPipelines(r.options.ESClient, pipelineIDs) if err != nil { logger.Warnf("uninstalling ingest pipelines failed: %v", err) } @@ -71,7 +65,7 @@ func (r *runner) run() error { } fmt.Printf("Test case: %s\n", tc.name) - result, err := simulatePipelineProcessing(esClient, entryPipeline, tc) + result, err := simulatePipelineProcessing(r.options.ESClient, entryPipeline, tc) if err != nil { return errors.Wrap(err, "simulating pipeline processing failed") } @@ -93,9 +87,9 @@ func (r *runner) run() error { } func (r *runner) listTestCaseFiles() ([]string, error) { - fis, err := ioutil.ReadDir(r.options.TestFolderPath) + fis, err := ioutil.ReadDir(r.options.TestFolder.Path) if err != nil { - return nil, errors.Wrapf(err, "reading pipeline tests failed (path: %s)", r.options.TestFolderPath) + return nil, errors.Wrapf(err, "reading pipeline tests failed (path: %s)", r.options.TestFolder.Path) } var files []string @@ -109,7 +103,7 @@ func (r *runner) listTestCaseFiles() ([]string, error) { } func (r *runner) loadTestCaseFile(testCaseFile string) (*testCase, error) { - testCasePath := filepath.Join(r.options.TestFolderPath, testCaseFile) + testCasePath := 
filepath.Join(r.options.TestFolder.Path, testCaseFile) testCaseData, err := ioutil.ReadFile(testCasePath) if err != nil { return nil, errors.Wrapf(err, "reading input file failed (testCasePath: %s)", testCasePath) @@ -139,7 +133,7 @@ func (r *runner) loadTestCaseFile(testCaseFile string) (*testCase, error) { } func (r *runner) verifyResults(testCaseFile string, result *testResult) error { - testCasePath := filepath.Join(r.options.TestFolderPath, testCaseFile) + testCasePath := filepath.Join(r.options.TestFolder.Path, testCaseFile) if r.options.GenerateTestResult { err := writeTestResult(testCasePath, result) diff --git a/internal/testrunner/runners/system/runner.go b/internal/testrunner/runners/system/runner.go new file mode 100644 index 000000000..e1e6457cb --- /dev/null +++ b/internal/testrunner/runners/system/runner.go @@ -0,0 +1,366 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package system + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + es "github.com/elastic/go-elasticsearch/v7" + "github.com/pkg/errors" + + "github.com/elastic/elastic-package/internal/install" + "github.com/elastic/elastic-package/internal/kibana/ingestmanager" + "github.com/elastic/elastic-package/internal/logger" + "github.com/elastic/elastic-package/internal/packages" + "github.com/elastic/elastic-package/internal/testrunner" + "github.com/elastic/elastic-package/internal/testrunner/runners/system/servicedeployer" +) + +func init() { + testrunner.RegisterRunner(TestType, Run) +} + +const ( + // TestType defining system tests + TestType testrunner.TestType = "system" +) + +type runner struct { + testFolder testrunner.TestFolder + packageRootPath string + stackSettings stackSettings + esClient *es.Client +} + +type stackSettings struct { + elasticsearch struct { + host string + username string + password string + } + kibana struct { + host string + } +} + +// Run runs the system tests defined under the given folder +func Run(options testrunner.TestOptions) error { + r := runner{ + options.TestFolder, + options.PackageRootPath, + getStackSettingsFromEnv(), + options.ESClient, + } + return r.run() +} + +func (r *runner) run() error { + pkgManifest, err := packages.ReadPackageManifest(filepath.Join(r.packageRootPath, packages.PackageManifestFile)) + if err != nil { + return errors.Wrap(err, "reading package manifest failed") + } + + datasetPath, found, err := packages.FindDatasetRootForPath(r.testFolder.Path) + if err != nil { + return errors.Wrap(err, "locating dataset root failed") + } + if !found { + return errors.New("dataset root not found") + } + + datasetManifest, err := packages.ReadDatasetManifest(filepath.Join(datasetPath, packages.DatasetManifestFile)) + if err != nil { + return errors.Wrap(err, "reading dataset manifest failed") + } + + // Step 1. Setup service. + // Step 1a. (Deferred) Tear down service. 
+ logger.Info("setting up service...") + serviceDeployer, err := servicedeployer.Factory(r.packageRootPath) + if err != nil { + return errors.Wrap(err, "could not create service runner") + } + + tempDir, err := install.ServiceLogsDir() + if err != nil { + return errors.Wrap(err, "could not get temporary folder") + } + + ctxt := servicedeployer.ServiceContext{ + Name: r.testFolder.Package, + } + ctxt.Logs.Folder.Local = tempDir + ctxt.Logs.Folder.Agent = "/tmp/service_logs/" + + service, err := serviceDeployer.SetUp(ctxt) + if err != nil { + return errors.Wrap(err, "could not setup service") + } + ctxt = service.Context() + defer func() { + logger.Info("tearing down service...") + if err := service.TearDown(); err != nil { + logger.Errorf("error tearing down service: %s", err) + } + }() + + // Step 2. Configure package (single data stream) via Ingest Manager APIs. + im, err := ingestmanager.NewClient(r.stackSettings.kibana.host, r.stackSettings.elasticsearch.username, r.stackSettings.elasticsearch.password) + if err != nil { + return errors.Wrap(err, "could not create ingest manager client") + } + + logger.Info("creating test policy...") + testTime := time.Now().Format("20060102T15:04:05Z") + p := ingestmanager.Policy{ + Name: fmt.Sprintf("ep-test-system-%s-%s-%s", r.testFolder.Package, r.testFolder.Dataset, testTime), + Description: fmt.Sprintf("test policy created by elastic-package test system for data stream %s/%s", r.testFolder.Package, r.testFolder.Dataset), + Namespace: "ep", + } + policy, err := im.CreatePolicy(p) + if err != nil { + return errors.Wrap(err, "could not create test policy") + } + defer func() { + logger.Debug("deleting test policy...") + if err := im.DeletePolicy(*policy); err != nil { + logger.Errorf("error cleaning up test policy: %s", err) + } + }() + + testConfig, err := newConfig(r.testFolder.Path, ctxt) + if err != nil { + return errors.Wrap(err, "unable to load system test configuration") + } + + logger.Info("adding package datastream 
to test policy...") + ds := createPackageDatastream(*policy, *pkgManifest, *datasetManifest, *testConfig) + if err := im.AddPackageDataStreamToPolicy(ds); err != nil { + return errors.Wrap(err, "could not add dataset Config to policy") + } + + // Get enrolled agent ID + agents, err := im.ListAgents() + if err != nil { + return errors.Wrap(err, "could not list agents") + } + if agents == nil || len(agents) == 0 { + return errors.New("no agents found") + } + agent := agents[0] + origPolicy := ingestmanager.Policy{ + ID: agent.PolicyID, + } + + // Delete old data + dataStream := fmt.Sprintf( + "%s-%s-%s", + ds.Inputs[0].Streams[0].DataStream.Type, + ds.Inputs[0].Streams[0].DataStream.Dataset, + ds.Namespace, + ) + + logger.Info("deleting old data in data stream...") + if err := deleteDataStreamDocs(r.esClient, dataStream); err != nil { + return errors.Wrapf(err, "error deleting old data in data stream: %s", dataStream) + } + + // Assign policy to agent + logger.Info("assigning package datastream to agent...") + if err := im.AssignPolicyToAgent(agent, *policy); err != nil { + return errors.Wrap(err, "could not assign policy to agent") + } + defer func() { + logger.Debug("reassigning original policy back to agent...") + if err := im.AssignPolicyToAgent(agent, origPolicy); err != nil { + logger.Errorf("error reassigning original policy to agent: %s", err) + } + }() + + // Step 4. (TODO in future) Optionally exercise service to generate load. 
+
+	logger.Info("checking for expected data in data stream...")
+	passed, err := waitUntilTrue(func() (bool, error) {
+		resp, err := r.esClient.Search(
+			r.esClient.Search.WithIndex(dataStream),
+		)
+		if err != nil {
+			return false, errors.Wrap(err, "could not search data stream")
+		}
+		defer resp.Body.Close()
+
+		var results struct {
+			Hits struct {
+				Total struct {
+					Value int
+				}
+			}
+		}
+
+		if err := json.NewDecoder(resp.Body).Decode(&results); err != nil {
+			return false, errors.Wrap(err, "could not decode search results response")
+		}
+
+		hits := results.Hits.Total.Value
+		logger.Debugf("found %d hits in %s data stream", hits, dataStream)
+		return hits > 0, nil
+	}, 2*time.Minute)
+
+	if err != nil {
+		return errors.Wrap(err, "could not check for expected data in data stream")
+	}
+
+	if passed {
+		fmt.Printf("System test for %s/%s dataset passed!\n", r.testFolder.Package, r.testFolder.Dataset)
+	} else {
+		fmt.Printf("System test for %s/%s dataset failed\n", r.testFolder.Package, r.testFolder.Dataset)
+		return fmt.Errorf("system test for %s/%s dataset failed", r.testFolder.Package, r.testFolder.Dataset)
+	}
+
+	defer func() {
+		logger.Debugf("deleting data in data stream...")
+		if err := deleteDataStreamDocs(r.esClient, dataStream); err != nil {
+			logger.Errorf("error deleting data in data stream: %s", err)
+		}
+	}()
+
+	defer func() {
+		sleepFor := 1 * time.Minute
+		logger.Debugf("waiting for %s before destructing...", sleepFor)
+		time.Sleep(sleepFor)
+	}()
+	return nil
+}
+
+func getStackSettingsFromEnv() stackSettings {
+	s := stackSettings{}
+
+	s.elasticsearch.host = os.Getenv("ELASTIC_PACKAGE_ELASTICSEARCH_HOST")
+	if s.elasticsearch.host == "" {
+		s.elasticsearch.host = "http://localhost:9200"
+	}
+
+	s.elasticsearch.username = os.Getenv("ELASTIC_PACKAGE_ELASTICSEARCH_USERNAME")
+	s.elasticsearch.password = os.Getenv("ELASTIC_PACKAGE_ELASTICSEARCH_PASSWORD")
+
+	s.kibana.host = os.Getenv("ELASTIC_PACKAGE_KIBANA_HOST")
+	if s.kibana.host == "" {
+		
s.kibana.host = "http://localhost:5601" + } + + return s +} + +func createPackageDatastream( + p ingestmanager.Policy, + pkg packages.PackageManifest, + ds packages.DatasetManifest, + c testConfig, +) ingestmanager.PackageDataStream { + streamInput := ds.Streams[0].Input + r := ingestmanager.PackageDataStream{ + Name: fmt.Sprintf("%s-%s", pkg.Name, ds.Name), + Namespace: "ep", + PolicyID: p.ID, + Enabled: true, + } + + r.Package.Name = pkg.Name + r.Package.Title = pkg.Title + r.Package.Version = pkg.Version + + r.Inputs = []ingestmanager.Input{ + { + Type: streamInput, + Enabled: true, + }, + } + + streams := []ingestmanager.Stream{ + { + ID: fmt.Sprintf("%s-%s.%s", streamInput, pkg.Name, ds.Name), + Enabled: true, + DataStream: ingestmanager.DataStream{ + Type: ds.Type, + Dataset: fmt.Sprintf("%s.%s", pkg.Name, ds.Name), + }, + }, + } + + // Add dataset-level vars + dsVars := ingestmanager.Vars{} + for _, dsVar := range ds.Streams[0].Vars { + val := dsVar.Default + + cfgVar, exists := c.Dataset.Vars[dsVar.Name] + if exists { + // overlay var value from test configuration + val = cfgVar + } + + dsVars[dsVar.Name] = ingestmanager.Var{ + Type: dsVar.Type, + Value: val, + } + } + streams[0].Vars = dsVars + r.Inputs[0].Streams = streams + + // Add package-level vars + pkgVars := ingestmanager.Vars{} + input := pkg.ConfigTemplates[0].FindInputByType(streamInput) + if input != nil { + for _, pkgVar := range input.Vars { + val := pkgVar.Default + + cfgVar, exists := c.Vars[pkgVar.Name] + if exists { + // overlay var value from test configuration + val = cfgVar + } + + pkgVars[pkgVar.Name] = ingestmanager.Var{ + Type: pkgVar.Type, + Value: val, + } + } + } + r.Inputs[0].Vars = pkgVars + + return r +} + +func deleteDataStreamDocs(esClient *es.Client, dataStream string) error { + body := strings.NewReader(`{ "query": { "match_all": {} } }`) + _, err := esClient.DeleteByQuery([]string{dataStream}, body) + if err != nil { + return err + } + + return nil +} + +func 
waitUntilTrue(fn func() (bool, error), timeout time.Duration) (bool, error) { + startTime := time.Now() + for time.Now().Sub(startTime) < timeout { + result, err := fn() + if err != nil { + return false, err + } + + if result { + return true, nil + } + + time.Sleep(1 * time.Second) + } + + return false, nil +} diff --git a/internal/testrunner/runners/system/servicedeployer/compose.go b/internal/testrunner/runners/system/servicedeployer/compose.go new file mode 100644 index 000000000..6458cca87 --- /dev/null +++ b/internal/testrunner/runners/system/servicedeployer/compose.go @@ -0,0 +1,174 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package servicedeployer + +import ( + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + + "github.com/pkg/errors" + + "github.com/elastic/elastic-package/internal/compose" + "github.com/elastic/elastic-package/internal/logger" + "github.com/elastic/elastic-package/internal/stack" +) + +const ( + stdoutFileName = "stdout" + stderrFileName = "stderr" +) + +// DockerComposeServiceDeployer knows how to deploy a service defined via +// a Docker Compose file. +type DockerComposeServiceDeployer struct { + ymlPath string +} + +type dockerComposeDeployedService struct { + ctxt ServiceContext + + ymlPath string + project string + + stdout io.WriteCloser + stderr io.WriteCloser + + stdoutFilePath string + stderrFilePath string +} + +// NewDockerComposeServiceDeployer returns a new instance of a DockerComposeServiceDeployer. +func NewDockerComposeServiceDeployer(ymlPath string) (*DockerComposeServiceDeployer, error) { + return &DockerComposeServiceDeployer{ + ymlPath: ymlPath, + }, nil +} + +// SetUp sets up the service and returns any relevant information. 
+func (r *DockerComposeServiceDeployer) SetUp(ctxt ServiceContext) (DeployedService, error) { + logger.Debug("setting up service using Docker Compose service deployer") + service := dockerComposeDeployedService{ + ymlPath: r.ymlPath, + ctxt: ctxt, + project: "elastic-package-service", + } + + p, err := compose.NewProject(service.project, service.ymlPath) + if err != nil { + return nil, errors.Wrap(err, "could not create docker compose project for service") + } + + // Boot up service + opts := compose.CommandOptions{ + ExtraArgs: []string{"-d"}, + } + if err := p.Up(opts); err != nil { + return nil, errors.Wrap(err, "could not boot up service using docker compose") + } + + // Build service container name + serviceName := ctxt.Name + serviceContainer := fmt.Sprintf("%s_%s_1", service.project, serviceName) + service.ctxt.Hostname = serviceContainer + + // Redirect service container's STDOUT and STDERR streams to files in local logs folder + localLogsFolder := ctxt.Logs.Folder.Local + agentLogsFolder := ctxt.Logs.Folder.Agent + + service.stdoutFilePath = filepath.Join(localLogsFolder, stdoutFileName) + logger.Debugf("creating temp file %s to hold service container %s STDOUT", service.stdoutFilePath, serviceContainer) + outFile, err := os.Create(service.stdoutFilePath) + if err != nil { + return nil, errors.Wrap(err, "could not create STDOUT file") + } + service.stdout = outFile + ctxt.STDOUT = agentLogsFolder + stdoutFileName + + service.stderrFilePath = filepath.Join(localLogsFolder, stderrFileName) + logger.Debugf("creating temp file %s to hold service container %s STDERR", service.stderrFilePath, serviceContainer) + errFile, err := os.Create(service.stderrFilePath) + if err != nil { + return nil, errors.Wrap(err, "could not create STDERR file") + } + service.stderr = errFile + ctxt.STDERR = agentLogsFolder + stderrFileName + + logger.Debugf("redirecting service container %s STDOUT and STDERR to temp files", serviceContainer) + cmd := exec.Command("docker", "attach", 
"--no-stdin", serviceContainer) + cmd.Stdout = service.stdout + cmd.Stderr = service.stderr + + if err := cmd.Start(); err != nil { + return nil, errors.Wrap(err, "could not redirect service container STDOUT and STDERR streams") + } + + stackNetwork := fmt.Sprintf("%s_default", stack.DockerComposeProjectName) + logger.Debugf("attaching service container %s to stack network %s", serviceContainer, stackNetwork) + + cmd = exec.Command("docker", "network", "connect", stackNetwork, serviceContainer) + if err := cmd.Run(); err != nil { + return nil, errors.Wrap(err, "could not attach service container to stack network") + } + + logger.Debugf("adding service container %s internal ports to context", serviceContainer) + serviceComposeConfig, err := p.Config(compose.CommandOptions{}) + if err != nil { + return nil, errors.Wrap(err, "could not get Docker Compose configuration for service") + } + + s := serviceComposeConfig.Services[serviceName] + service.ctxt.Ports = make([]int, len(s.Ports)) + for idx, port := range s.Ports { + service.ctxt.Ports[idx] = port.InternalPort + } + + return &service, nil +} + +// TearDown tears down the service. 
+func (s *dockerComposeDeployedService) TearDown() error { + logger.Infof("tearing down service using docker compose runner") + defer func() { + if err := s.stderr.Close(); err != nil { + logger.Errorf("could not close STDERR file: %s: %s", s.stderrFilePath, err) + } else if err := os.Remove(s.stderrFilePath); err != nil { + logger.Errorf("could not delete STDERR file: %s: %s", s.stderrFilePath, err) + } + }() + + defer func() { + if err := s.stdout.Close(); err != nil { + logger.Errorf("could not close STDOUT file: %s: %s", s.stdoutFilePath, err) + } else if err := os.Remove(s.stdoutFilePath); err != nil { + logger.Errorf("could not delete STDOUT file: %s: %s", s.stdoutFilePath, err) + } + }() + + p, err := compose.NewProject(s.project, s.ymlPath) + if err != nil { + return errors.Wrap(err, "could not create docker compose project for service") + } + + opts := compose.CommandOptions{} + if err := p.Down(opts); err != nil { + return errors.Wrap(err, "could not shut down service using docker compose") + } + + return nil +} + +// Context returns the current context for the service. +func (s *dockerComposeDeployedService) Context() ServiceContext { + return s.ctxt +} + +// SetContext sets the current context for the service. +func (s *dockerComposeDeployedService) SetContext(ctxt ServiceContext) error { + s.ctxt = ctxt + return nil +} diff --git a/internal/testrunner/runners/system/servicedeployer/context.go b/internal/testrunner/runners/system/servicedeployer/context.go new file mode 100644 index 000000000..d0b3625e9 --- /dev/null +++ b/internal/testrunner/runners/system/servicedeployer/context.go @@ -0,0 +1,45 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package servicedeployer + +// ServiceContext encapsulates context that is both available to a ServiceDeployer and +// populated by a DeployedService. The fields in ServiceContext may be used in handlebars +// templates in system test configuration files, for example: {{ Hostname }}. +type ServiceContext struct { + // Name is the name of the service. + Name string + + // Hostname is the host name of the service, as addressable from + // the Agent container. + Hostname string + + // Ports is a list of ports that the service listens on, as addressable + // from the Agent container. + Ports []int + + // Logs contains folder paths for log files produced by the service. + Logs struct { + Folder struct { + // Local contains the folder path where log files produced by + // the service are stored on the local filesystem, i.e. where + // elastic-package is running. + Local string + + // Agent contains the folder path where log files produced by + // the service are stored on the Agent container's filesystem. + Agent string + } + } + + // STDOUT is a path to a file on the Agent container, where the STDOUT + // stream of the service is available. This is generally only useful + // for services running in Docker containers. + STDOUT string + + // STDERR is a path to a file on the Agent container, where the STDERR + // stream of the service is available. This is generally only useful + // for services running in Docker containers. + STDERR string +} diff --git a/internal/testrunner/runners/system/servicedeployer/deployed_service.go b/internal/testrunner/runners/system/servicedeployer/deployed_service.go new file mode 100644 index 000000000..35b2e55ba --- /dev/null +++ b/internal/testrunner/runners/system/servicedeployer/deployed_service.go @@ -0,0 +1,17 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. 
Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package servicedeployer + +// DeployedService defines the interface for interacting with a service that has been deployed. +type DeployedService interface { + // TearDown implements the logic for tearing down a service. + TearDown() error + + // Context returns the current context from the service. + Context() ServiceContext + + // SetContext sets the current context for the service. + SetContext(str ServiceContext) error +} diff --git a/internal/testrunner/runners/system/servicedeployer/factory.go b/internal/testrunner/runners/system/servicedeployer/factory.go new file mode 100644 index 000000000..39fa1dd85 --- /dev/null +++ b/internal/testrunner/runners/system/servicedeployer/factory.go @@ -0,0 +1,31 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package servicedeployer + +import ( + "errors" + "os" + "path" +) + +var ( + // ErrNotFound is returned when the appropriate service runner for a package + // cannot be found. + ErrNotFound = errors.New("unable to find service runner") +) + +// Factory chooses the appropriate service runner for the given package, depending +// on service configuration files defined in the package. +func Factory(packageRootPath string) (ServiceDeployer, error) { + packageDevPath := path.Join(packageRootPath, "_dev") + + // Is the service defined using a docker compose configuration file? 
+ dockerComposeYMLPath := path.Join(packageDevPath, "docker-compose.yml") + if _, err := os.Stat(dockerComposeYMLPath); err == nil { + return NewDockerComposeServiceDeployer(dockerComposeYMLPath) + } + + return nil, ErrNotFound +} diff --git a/internal/testrunner/runners/system/servicedeployer/service_deployer.go b/internal/testrunner/runners/system/servicedeployer/service_deployer.go new file mode 100644 index 000000000..5e1cb93af --- /dev/null +++ b/internal/testrunner/runners/system/servicedeployer/service_deployer.go @@ -0,0 +1,13 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package servicedeployer + +// ServiceDeployer defines the interface for deploying a service. It defines methods for +// controlling the lifecycle of a service. +type ServiceDeployer interface { + // SetUp implements the logic for setting up a service. It takes a context and returns a + // ServiceHandler. + SetUp(ctxt ServiceContext) (DeployedService, error) +} diff --git a/internal/testrunner/runners/system/system.go b/internal/testrunner/runners/system/system.go deleted file mode 100644 index ea97f17d4..000000000 --- a/internal/testrunner/runners/system/system.go +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. 
- -package system - -import ( - "fmt" - - "github.com/elastic/elastic-package/internal/testrunner" -) - -const ( - // TestType defining system tests - TestType testrunner.TestType = "system" -) - -type runner struct { - testFolderPath string -} - -// Run runs the system tests defined under the given folder -func Run(options testrunner.TestOptions) error { - r := runner{options.TestFolderPath} - return r.run() -} - -func (r *runner) run() error { - fmt.Println("system run", r.testFolderPath) - return nil -} - -func init() { - testrunner.RegisterRunner(TestType, Run) -} diff --git a/internal/testrunner/runners/system/test_config.go b/internal/testrunner/runners/system/test_config.go new file mode 100644 index 000000000..3892e0058 --- /dev/null +++ b/internal/testrunner/runners/system/test_config.go @@ -0,0 +1,64 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package system + +import ( + "io/ioutil" + "os" + "path/filepath" + + "github.com/aymerick/raymond" + "github.com/pkg/errors" + "gopkg.in/yaml.v3" + + "github.com/elastic/elastic-package/internal/packages" + "github.com/elastic/elastic-package/internal/testrunner/runners/system/servicedeployer" +) + +const configFileName = "config.yml" + +type testConfig struct { + Vars map[string]packages.VarValue `yaml:"vars"` + Dataset struct { + Vars map[string]packages.VarValue `yaml:"vars"` + } `yaml:"dataset"` +} + +func newConfig(systemTestFolderPath string, ctxt servicedeployer.ServiceContext) (*testConfig, error) { + configFilePath := filepath.Join(systemTestFolderPath, configFileName) + data, err := ioutil.ReadFile(configFilePath) + if err != nil && os.IsNotExist(err) { + return nil, errors.Wrapf(err, "unable to find system test configuration file: %s", configFilePath) + } + + if err != nil { + return nil, errors.Wrapf(err, "could not load system test configuration file: %s", configFilePath) + } + + data, err = applyContext(data, ctxt) + if err != nil { + return nil, errors.Wrapf(err, "could not apply context to test configuration file: %s", configFilePath) + } + + var c testConfig + if err := yaml.Unmarshal(data, &c); err != nil { + return nil, errors.Wrapf(err, "unable to parse system test configuration file: %s", configFilePath) + } + + return &c, nil +} + +// applyContext takes the given system test configuration (data) and replaces any placeholder variables in +// it with values from the given context (ctxt). The context may be populated from various sources but usually the +// most interesting context values will be set by a ServiceDeployer in its SetUp method. 
+func applyContext(data []byte, ctxt servicedeployer.ServiceContext) ([]byte, error) { + tpl := string(data) + result, err := raymond.Render(tpl, ctxt) + if err != nil { + return data, errors.Wrap(err, "could not render data with context") + } + + return []byte(result), nil +} diff --git a/internal/testrunner/testrunner.go b/internal/testrunner/testrunner.go index 0be239f6a..e9dc32743 100644 --- a/internal/testrunner/testrunner.go +++ b/internal/testrunner/testrunner.go @@ -6,10 +6,10 @@ package testrunner import ( "fmt" - "path" "path/filepath" "strings" + "github.com/elastic/go-elasticsearch/v7" "github.com/pkg/errors" ) @@ -18,8 +18,10 @@ type TestType string // TestOptions contains test runner options. type TestOptions struct { - TestFolderPath string + TestFolder TestFolder + PackageRootPath string GenerateTestResult bool + ESClient *elasticsearch.Client } // RunFunc method defines main run function of a test runner. @@ -27,8 +29,16 @@ type RunFunc func(options TestOptions) error var runners = map[TestType]RunFunc{} +// TestFolder encapsulates the test folder path and names of the package + dataset +// to which the test folder belongs. 
+type TestFolder struct { + Path string + Package string + Dataset string +} + // FindTestFolders finds test folders for the given package and, optionally, test type and datasets -func FindTestFolders(packageRootPath string, testType TestType, datasets []string) ([]string, error) { +func FindTestFolders(packageRootPath string, testType TestType, datasets []string) ([]TestFolder, error) { // Expected folder structure: // / @@ -50,13 +60,29 @@ func FindTestFolders(packageRootPath string, testType TestType, datasets []strin datasetsGlob += ")" } - testFoldersGlob := path.Join(packageRootPath, "dataset", datasetsGlob, "_dev", "test", testTypeGlob) - matches, err := filepath.Glob(testFoldersGlob) + testFoldersGlob := filepath.Join(packageRootPath, "dataset", datasetsGlob, "_dev", "test", testTypeGlob) + paths, err := filepath.Glob(testFoldersGlob) if err != nil { return nil, errors.Wrap(err, "error finding test folders") } - return matches, nil + folders := make([]TestFolder, len(paths)) + _, pkg := filepath.Split(packageRootPath) + for idx, p := range paths { + relP := strings.TrimPrefix(p, packageRootPath) + parts := strings.Split(relP, string(filepath.Separator)) + dataset := parts[2] + + folder := TestFolder{ + p, + pkg, + dataset, + } + + folders[idx] = folder + } + + return folders, nil } // RegisterRunner method registers the test runner.