Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions internal/elasticsearch/ingest/pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package ingest

import (
"encoding/json"
"strings"

"github.com/pkg/errors"
"gopkg.in/yaml.v3"
)

// Pipeline represents a pipeline resource loaded from a file
type Pipeline struct {
Name string // Name of the pipeline
Format string // Format (extension) of the pipeline
Content []byte // Content is the original file contents.
}

// Filename returns the original filename associated with the pipeline.
func (p *Pipeline) Filename() string {
pos := strings.LastIndexByte(p.Name, '-')
if pos == -1 {
pos = len(p.Name)
}
return p.Name[:pos] + "." + p.Format
}

// MarshalJSON returns the pipeline contents in JSON format.
func (p *Pipeline) MarshalJSON() (asJSON []byte, err error) {
switch p.Format {
case "json":
asJSON = p.Content
case "yaml", "yml":
var node map[string]interface{}
err = yaml.Unmarshal(p.Content, &node)
if err != nil {
return nil, errors.Wrapf(err, "unmarshalling pipeline content failed (pipeline: %s)", p.Name)
}
if asJSON, err = json.Marshal(node); err != nil {
return nil, errors.Wrapf(err, "marshalling pipeline content failed (pipeline: %s)", p.Name)
}
default:
return nil, errors.Errorf("unsupported pipeline format '%s' (pipeline: %s)", p.Format, p.Name)
}
return asJSON, nil
}
88 changes: 88 additions & 0 deletions internal/elasticsearch/ingest/pipeline_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package ingest

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestPipelineFileName(t *testing.T) {
for _, tt := range []struct {
title string
pipeline Pipeline
expected string
}{
{
title: "name with nonce",
pipeline: Pipeline{
Name: "default-1234",
Format: "yml",
},
expected: "default.yml",
},
{
title: "name without nonce",
pipeline: Pipeline{
Name: "mypipeline",
Format: "json",
},
expected: "mypipeline.json",
},
{
title: "empty resource",
expected: ".",
},
} {
t.Run(tt.title, func(t *testing.T) {
assert.Equal(t, tt.expected, tt.pipeline.Filename())
})
}
}

func TestPipelineMarshalJSON(t *testing.T) {
for _, tt := range []struct {
title string
pipeline Pipeline
expected string
isErr bool
}{
{
title: "JSON source",
pipeline: Pipeline{
Format: "json",
Content: []byte(`{"foo":["bar"]}`),
},
expected: `{"foo":["bar"]}`,
},
{
title: "Yaml source",
pipeline: Pipeline{
Format: "yaml",
Content: []byte(`foo: ["bar"]`),
},
expected: `{"foo":["bar"]}`,
},
{
title: "bad Yaml",
pipeline: Pipeline{
Format: "yaml",
Content: []byte(`broken"`),
},
isErr: true,
},
} {
t.Run(tt.title, func(t *testing.T) {
got, err := tt.pipeline.MarshalJSON()
if tt.isErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, []byte(tt.expected), got)
}
})
}
}
90 changes: 29 additions & 61 deletions internal/testrunner/runners/pipeline/ingest_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,14 @@ import (
"time"

"github.com/pkg/errors"
"gopkg.in/yaml.v3"

"github.com/elastic/elastic-package/internal/elasticsearch"
"github.com/elastic/elastic-package/internal/elasticsearch/ingest"
"github.com/elastic/elastic-package/internal/packages"
)

var ingestPipelineTag = regexp.MustCompile(`{{\s*IngestPipeline.+}}`)

type pipelineResource struct {
name string
format string
content []byte
}

type simulatePipelineRequest struct {
Docs []pipelineDocument `json:"docs"`
}
Expand All @@ -48,7 +42,7 @@ type pipelineIngestedDocument struct {
Doc pipelineDocument `json:"doc"`
}

func installIngestPipelines(api *elasticsearch.API, dataStreamPath string) (string, []pipelineResource, error) {
func installIngestPipelines(api *elasticsearch.API, dataStreamPath string) (string, []ingest.Pipeline, error) {
dataStreamManifest, err := packages.ReadDataStreamManifest(filepath.Join(dataStreamPath, packages.DataStreamManifestFile))
if err != nil {
return "", nil, errors.Wrap(err, "reading data stream manifest failed")
Expand All @@ -62,19 +56,15 @@ func installIngestPipelines(api *elasticsearch.API, dataStreamPath string) (stri
return "", nil, errors.Wrap(err, "loading ingest pipeline files failed")
}

jsonPipelines, err := convertPipelineToJSON(pipelines)
if err != nil {
return "", nil, errors.Wrap(err, "converting pipelines failed")
}
err = installPipelinesInElasticsearch(api, pipelines)

err = installPipelinesInElasticsearch(api, jsonPipelines)
if err != nil {
return "", nil, errors.Wrap(err, "installing pipelines failed")
}
return mainPipeline, jsonPipelines, nil
return mainPipeline, pipelines, nil
}

func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]pipelineResource, error) {
func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]ingest.Pipeline, error) {
elasticsearchPath := filepath.Join(dataStreamPath, "elasticsearch", "ingest_pipeline")

var pipelineFiles []string
Expand All @@ -86,7 +76,7 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]pipelineReso
pipelineFiles = append(pipelineFiles, files...)
}

var pipelines []pipelineResource
var pipelines []ingest.Pipeline
for _, path := range pipelineFiles {
c, err := os.ReadFile(path)
if err != nil {
Expand All @@ -102,75 +92,52 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]pipelineReso
return []byte(getWithPipelineNameWithNonce(pipelineTag, nonce))
})
name := filepath.Base(path)
pipelines = append(pipelines, pipelineResource{
name: getWithPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce),
format: filepath.Ext(path)[1:],
content: c,
pipelines = append(pipelines, ingest.Pipeline{
Name: getWithPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce),
Format: filepath.Ext(path)[1:],
Content: c,
})
}
return pipelines, nil
}

func convertPipelineToJSON(pipelines []pipelineResource) ([]pipelineResource, error) {
var jsonPipelines []pipelineResource
for _, pipeline := range pipelines {
if pipeline.format == "json" {
jsonPipelines = append(jsonPipelines, pipeline)
continue
}

var node map[string]interface{}
err := yaml.Unmarshal(pipeline.content, &node)
if err != nil {
return nil, errors.Wrapf(err, "unmarshalling pipeline content failed (pipeline: %s)", pipeline.name)
}

c, err := json.Marshal(&node)
if err != nil {
return nil, errors.Wrapf(err, "marshalling pipeline content failed (pipeline: %s)", pipeline.name)
}

jsonPipelines = append(jsonPipelines, pipelineResource{
name: pipeline.name,
format: "json",
content: c,
})
}
return jsonPipelines, nil
}

func installPipelinesInElasticsearch(api *elasticsearch.API, pipelines []pipelineResource) error {
for _, pipeline := range pipelines {
if err := installPipeline(api, pipeline); err != nil {
func installPipelinesInElasticsearch(api *elasticsearch.API, pipelines []ingest.Pipeline) error {
for _, p := range pipelines {
if err := installPipeline(api, p); err != nil {
return err
}
}
return nil
}

func installPipeline(api *elasticsearch.API, pipeline pipelineResource) error {
func installPipeline(api *elasticsearch.API, pipeline ingest.Pipeline) error {
if err := putIngestPipeline(api, pipeline); err != nil {
return err
}
// Just to be sure the pipeline has been uploaded.
return getIngestPipeline(api, pipeline.name)
return getIngestPipeline(api, pipeline.Name)
}

func putIngestPipeline(api *elasticsearch.API, pipeline pipelineResource) error {
r, err := api.Ingest.PutPipeline(pipeline.name, bytes.NewReader(pipeline.content))
func putIngestPipeline(api *elasticsearch.API, pipeline ingest.Pipeline) error {
source, err := pipeline.MarshalJSON()
if err != nil {
return errors.Wrapf(err, "PutPipeline API call failed (pipelineName: %s)", pipeline.name)
return err
}
r, err := api.Ingest.PutPipeline(pipeline.Name, bytes.NewReader(source))
if err != nil {
return errors.Wrapf(err, "PutPipeline API call failed (pipelineName: %s)", pipeline.Name)
}
defer r.Body.Close()

body, err := io.ReadAll(r.Body)
if err != nil {
return errors.Wrapf(err, "failed to read PutPipeline API response body (pipelineName: %s)", pipeline.name)
return errors.Wrapf(err, "failed to read PutPipeline API response body (pipelineName: %s)", pipeline.Name)
}

if r.StatusCode != http.StatusOK {

return errors.Wrapf(elasticsearch.NewError(body), "unexpected response status for PutPipeline (%d): %s (pipelineName: %s)",
r.StatusCode, r.Status(), pipeline.name)
r.StatusCode, r.Status(), pipeline.Name)
}
return nil
}
Expand All @@ -196,12 +163,13 @@ func getIngestPipeline(api *elasticsearch.API, pipelineName string) error {
return nil
}

func uninstallIngestPipelines(api *elasticsearch.API, pipelines []pipelineResource) error {
func uninstallIngestPipelines(api *elasticsearch.API, pipelines []ingest.Pipeline) error {
for _, pipeline := range pipelines {
_, err := api.Ingest.DeletePipeline(pipeline.name)
resp, err := api.Ingest.DeletePipeline(pipeline.Name)
if err != nil {
return errors.Wrapf(err, "DeletePipeline API call failed (pipelineName: %s)", pipeline.name)
return errors.Wrapf(err, "DeletePipeline API call failed (pipelineName: %s)", pipeline.Name)
}
resp.Body.Close()
}
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion internal/testrunner/runners/pipeline/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/pkg/errors"

"github.com/elastic/elastic-package/internal/common"
"github.com/elastic/elastic-package/internal/elasticsearch/ingest"
"github.com/elastic/elastic-package/internal/fields"
"github.com/elastic/elastic-package/internal/logger"
"github.com/elastic/elastic-package/internal/multierror"
Expand All @@ -30,7 +31,7 @@ const (

type runner struct {
options testrunner.TestOptions
pipelines []pipelineResource
pipelines []ingest.Pipeline
}

func (r *runner) TestFolderRequired() bool {
Expand Down