From d79448990371f216065748282d74583e70be08bf Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Thu, 11 Nov 2021 20:03:11 +0100 Subject: [PATCH 1/7] Report ingest pipeline processor coverage This improves the coverage reporting (--test-coverage flag) to include detailed coverage for ingest pipelines in the pipeline test. --- cmd/testrunner.go | 5 +- .../elasticsearch/node_stats/stats_api.go | 144 ++++++++++++++ .../pipeline/coverage/coverage.go | 123 ++++++++++++ internal/elasticsearch/pipeline/processors.go | 186 ++++++++++++++++++ internal/elasticsearch/pipeline/resource.go | 53 +++++ internal/testrunner/coverageoutput.go | 173 +++++++++++++--- .../runners/pipeline/ingest_pipeline.go | 90 +++------ .../testrunner/runners/pipeline/runner.go | 10 +- internal/testrunner/testrunner.go | 4 + 9 files changed, 691 insertions(+), 97 deletions(-) create mode 100644 internal/elasticsearch/node_stats/stats_api.go create mode 100644 internal/elasticsearch/pipeline/coverage/coverage.go create mode 100644 internal/elasticsearch/pipeline/processors.go create mode 100644 internal/elasticsearch/pipeline/resource.go diff --git a/cmd/testrunner.go b/cmd/testrunner.go index 8e54b06723..12ff1e45c6 100644 --- a/cmd/testrunner.go +++ b/cmd/testrunner.go @@ -149,10 +149,6 @@ func testTypeCommandActionFactory(runner testrunner.TestRunner) cobraext.Command if err != nil { return cobraext.FlagParsingError(err, cobraext.DataStreamsFlagName) } - - if testCoverage && len(dataStreams) > 0 { - return cobraext.FlagParsingError(errors.New("test coverage can be calculated only if all data streams are selected"), cobraext.DataStreamsFlagName) - } } if runner.TestFolderRequired() { @@ -203,6 +199,7 @@ func testTypeCommandActionFactory(runner testrunner.TestRunner) cobraext.Command API: esClient.API, DeferCleanup: deferCleanup, ServiceVariant: variantFlag, + WithCoverage: testCoverage, }) results = append(results, r...) diff --git a/internal/elasticsearch/node_stats/stats_api.go b/internal/elasticsearch/node_stats/stats_api.go new file mode 100644 index 0000000000..18302a5653 --- /dev/null +++ b/internal/elasticsearch/node_stats/stats_api.go @@ -0,0 +1,144 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package node_stats + +import ( + "encoding/json" + "io" + "strings" + + "github.com/pkg/errors" + + "github.com/elastic/elastic-package/internal/elasticsearch" + "github.com/elastic/elastic-package/internal/elasticsearch/pipeline" +) + +// StatsRecord contains stats for a measurable entity (pipeline, processor, etc.) +type StatsRecord struct { + Count, Current, Failed int64 + TimeInMillis int64 `json:"time_in_millis"` +} + +// ProcessorStats contains a processor's stats and some metadata. +type ProcessorStats struct { + Type, Extra string + Conditional bool + Stats StatsRecord +} + +// PipelineStats contains stats for a pipeline. +type PipelineStats struct { + StatsRecord + Processors []ProcessorStats +} + +// PipelineStatsMap holds the stats for a set of pipelines. +type PipelineStatsMap map[string]*PipelineStats + +type wrappedProcessor map[string]ProcessorStats + +func (p wrappedProcessor) extract() (stats ProcessorStats, err error) { + if len(p) != 1 { + return stats, errors.Errorf("can't extract processor stats. 
Need a single processor, got %d: %+v", len(p), p)
+	}
+	for k, v := range p {
+		stats = v
+		if off := strings.Index(k, ":"); off != -1 {
+			stats.Extra = k[off+1:]
+			k = k[:off]
+		}
+		switch v.Type {
+		case "conditional":
+			stats.Conditional = true
+		case k:
+		default:
+			return stats, errors.Errorf("can't understand processor identifier '%s' in %+v", k, p)
+		}
+		stats.Type = k
+	}
+	return stats, nil
+}
+
+type pipelineStatsRecord struct {
+	StatsRecord
+	Processors []wrappedProcessor
+}
+
+type pipelineStatsRecordMap map[string]pipelineStatsRecord
+
+func (r pipelineStatsRecord) extract() (stats *PipelineStats, err error) {
+	stats = &PipelineStats{
+		StatsRecord: r.StatsRecord,
+		Processors:  make([]ProcessorStats, len(r.Processors)),
+	}
+	for idx, wrapped := range r.Processors {
+		if stats.Processors[idx], err = wrapped.extract(); err != nil {
+			return stats, errors.Wrapf(err, "converting processor %d", idx)
+		}
+	}
+	return stats, nil
+}
+
+type pipelinesStatsNode struct {
+	Ingest struct {
+		Pipelines pipelineStatsRecordMap
+	}
+}
+
+type pipelinesStatsResponse struct {
+	Nodes map[string]pipelinesStatsNode
+}
+
+// GetPipelineStats fetches the ingest stats for the given pipelines from the
+// single Elasticsearch node.
+func GetPipelineStats(esClient *elasticsearch.API, pipelines []pipeline.Resource) (stats PipelineStatsMap, err error) {
+	statsReq := esClient.Nodes.Stats.WithFilterPath("nodes.*.ingest.pipelines")
+	resp, err := esClient.Nodes.Stats(statsReq)
+	if err != nil {
+		return nil, errors.Wrapf(err, "Node Stats API call failed")
+	}
+	defer resp.Body.Close()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to read Stats API response body")
+	}
+
+	if resp.StatusCode != 200 {
+		return nil, errors.Wrapf(elasticsearch.NewError(body), "unexpected response status for Node Stats (%d): %s", resp.StatusCode, resp.Status())
+	}
+
+	var statsResponse pipelinesStatsResponse
+	if err = json.Unmarshal(body, &statsResponse); err != nil {
+		return nil, errors.Wrap(err, "error decoding Node Stats response")
+	}
+
+	if nodeCount := len(statsResponse.Nodes); nodeCount != 1 {
+		return nil, errors.Errorf("expected exactly 1 ES node in stats response, got %d", nodeCount)
+	}
+	var nodePipelines map[string]pipelineStatsRecord
+	for _, node := range statsResponse.Nodes {
+		nodePipelines = node.Ingest.Pipelines
+	}
+	stats = make(PipelineStatsMap, len(pipelines))
+	for _, requested := range pipelines {
+		for pName, pStats := range nodePipelines {
+			if requested.Name == pName {
+				if stats[pName], err = pStats.extract(); err != nil {
+					return stats, errors.Wrapf(err, "converting pipeline %s", pName)
+				}
+			}
+		}
+	}
+	if len(stats) != len(pipelines) {
+		var missing []string
+		for _, requested := range pipelines {
+			if _, found := stats[requested.Name]; !found {
+				missing = append(missing, requested.Name)
+			}
+		}
+		// err is always nil at this point, so build a new error instead of wrapping it.
+		return stats, errors.Errorf("Node Stats response is missing some expected pipelines: %v", missing)
+	}
+
+	return stats, nil
+}
diff --git a/internal/elasticsearch/pipeline/coverage/coverage.go b/internal/elasticsearch/pipeline/coverage/coverage.go
new file mode 100644
index 0000000000..205d229320
--- /dev/null
+++ b/internal/elasticsearch/pipeline/coverage/coverage.go
@@ -0,0 +1,123 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
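+//
+// Package coverage builds Cobertura coverage reports for ingest pipelines
+// from the processor stats returned by the Elasticsearch Node Stats API.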
+
+package coverage
+
+import (
+	"path/filepath"
+	"strings"
+	"time"
+
+	"github.com/pkg/errors"
+
+	"github.com/elastic/elastic-package/internal/elasticsearch/node_stats"
+	"github.com/elastic/elastic-package/internal/elasticsearch/pipeline"
+	"github.com/elastic/elastic-package/internal/packages"
+	"github.com/elastic/elastic-package/internal/testrunner"
+)
+
+// Get returns a coverage report for the provided set of ingest pipelines.
+func Get(options testrunner.TestOptions, pipelines []pipeline.Resource) (*testrunner.CoberturaCoverage, error) {
+	packagePath, err := packages.MustFindPackageRoot()
+	if err != nil {
+		return nil, errors.Wrap(err, "error finding package root")
+	}
+
+	dataStreamPath, found, err := packages.FindDataStreamRootForPath(options.TestFolder.Path)
+	if err != nil {
+		return nil, errors.Wrap(err, "locating data_stream root failed")
+	}
+	if !found {
+		return nil, errors.New("data stream root not found")
+	}
+
+	// Use the Node Stats API to get stats for all installed pipelines.
+	// These stats contain hit counts for all main processors in a pipeline.
+	stats, err := node_stats.GetPipelineStats(options.API, pipelines)
+	if err != nil {
+		return nil, errors.Wrap(err, "error fetching pipeline stats for code coverage calculations")
+	}
+
+	// Construct the Cobertura report.
+	pkg := &testrunner.CoberturaPackage{
+		Name: options.TestFolder.Package + "." + options.TestFolder.DataStream,
+	}
+
+	coverage := &testrunner.CoberturaCoverage{
+		Sources: []*testrunner.CoberturaSource{
+			{
+				Path: packagePath,
+			},
+		},
+		Packages:  []*testrunner.CoberturaPackage{pkg},
+		Timestamp: time.Now().UnixNano(),
+	}
+
+	for _, p := range pipelines {
+		// Load the list of main processors from the pipeline source code, annotated with line numbers.
+		src, err := p.Processors()
+		if err != nil {
+			return nil, err
+		}
+
+		pstats := stats[p.Name]
+		if pstats == nil {
+			return nil, errors.Errorf("pipeline '%s' not installed in Elasticsearch", p.Name)
+		}
+
+		// Ensure there is no inconsistency in the list of processors in stats vs obtained from source.
+		if len(src) != len(pstats.Processors) {
+			return nil, errors.Errorf("processor count mismatch for %s (src:%d stats:%d)", p.FileName(), len(src), len(pstats.Processors))
+		}
+		for idx, st := range pstats.Processors {
+			// Elasticsearch will return a `compound` processor in the case of `foreach` and
+			// any processor that defines `on_failure` processors.
+			if st.Type == "compound" {
+				continue
+			}
+			if st.Type != src[idx].Type {
+				return nil, errors.Errorf("processor type mismatch for %s processor %d (src:%s stats:%s)", p.FileName(), idx, src[idx].Type, st.Type)
+			}
+		}
+		// Tests install pipelines as `filename-<nonce>` (without original extension).
+		// Use the filename part for the report.
+		pipelineName := p.Name
+		if nameEnd := strings.LastIndexByte(pipelineName, '-'); nameEnd != -1 {
+			pipelineName = pipelineName[:nameEnd]
+		}
+		// File path has to be relative to the packagePath added to the cobertura Sources list.
+		pipelinePath := filepath.Join(dataStreamPath, "elasticsearch", "ingest_pipeline", p.FileName())
+		pipelineRelPath, err := filepath.Rel(packagePath, pipelinePath)
+		if err != nil {
+			return nil, errors.Wrapf(err, "cannot create relative path to pipeline file. Package root: '%s', pipeline path: '%s'", packagePath, pipelinePath)
+		}
+		// Report every pipeline as a "class".
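+		// Its processors become the class methods, with hit counts taken from
+		// the pipeline stats.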
+ classCov := testrunner.CoberturaClass{ + Name: pipelineName, + Filename: pipelineRelPath, + } + + // Calculate covered and total processors (reported as both lines and methods). + covered := 0 + for idx, srcProc := range src { + if pstats.Processors[idx].Stats.Count > 0 { + covered++ + } + line := &testrunner.CoberturaLine{ + Number: srcProc.Line, + Hits: pstats.Processors[idx].Stats.Count, + } + classCov.Methods = append(classCov.Methods, &testrunner.CoberturaMethod{ + Name: srcProc.Type, + Lines: []*testrunner.CoberturaLine{line}, + Hits: pstats.Processors[idx].Stats.Count, + }) + classCov.Lines = append(classCov.Lines, line) + } + pkg.Classes = append(pkg.Classes, &classCov) + coverage.LinesValid += int64(len(src)) + coverage.LinesCovered += int64(covered) + } + return coverage, nil +} diff --git a/internal/elasticsearch/pipeline/processors.go b/internal/elasticsearch/pipeline/processors.go new file mode 100644 index 0000000000..efa5439fca --- /dev/null +++ b/internal/elasticsearch/pipeline/processors.go @@ -0,0 +1,186 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package pipeline + +import ( + "bytes" + "encoding/json" + "io" + + "github.com/pkg/errors" + "gopkg.in/yaml.v3" +) + +type Processor struct { + Type string `yaml:"-"` + Tag string + Description string + Line int `yaml:"-"` +} + +func (p Resource) Processors() (procs []Processor, err error) { + switch p.Format { + case "yaml", "yml": + procs, err = ProcessorsFromYAMLPipeline(p.Content) + case "json": + procs, err = ProcessorsFromJSONPipeline(p.Content) + default: + return nil, errors.Errorf("unsupported pipeline Format: %s", p.Format) + } + return procs, errors.Wrapf(err, "failure processing %s pipeline '%s'", p.Format, p.FileName()) +} + +func ProcessorsFromYAMLPipeline(content []byte) (procs []Processor, err error) { + type pipeline struct { + Processors []yaml.Node + } + var p pipeline + if err = yaml.Unmarshal(content, &p); err != nil { + return nil, err + } + for idx, entry := range p.Processors { + if entry.Kind != yaml.MappingNode || len(entry.Content) != 2 { + return nil, errors.Errorf("processor#%d is not a single key map (kind:%v Content:%d)", idx, entry.Kind, len(entry.Content)) + } + var proc Processor + if err := entry.Content[1].Decode(&proc); err != nil { + return nil, errors.Wrapf(err, "error decoding processor#%d configuration", idx) + } + if err := entry.Content[0].Decode(&proc.Type); err != nil { + return nil, errors.Wrapf(err, "error decoding processor#%d type", idx) + } + proc.Line = entry.Line + procs = append(procs, proc) + } + return procs, nil +} + +type tokenStack []json.Token + +func (s *tokenStack) Push(t json.Token) { + *s = append(*s, t) +} + +func (s *tokenStack) Pop() json.Token { + top := s.Top() + if n := len(*s); n > 0 { + *s = (*s)[:n-1] + } + return top +} + +func (s *tokenStack) PopUntil(d json.Delim) { + for { + switch s.Pop() { + case d, io.EOF: + return + } + } +} + +func (s *tokenStack) Top() json.Token { + if n := len(*s); n > 0 { + return (*s)[n-1] + } + return io.EOF // ??? 
+} + +func (s *tokenStack) TopIsString() bool { + if n := len(*s); n > 0 { + _, ok := (*s)[n-1].(string) + return ok + } + return false +} + +func (s *tokenStack) Equals(b tokenStack) bool { + if len(*s) != len(b) { + return false + } + for idx, tk := range *s { + if b[idx] != tk { + return false + } + } + return true +} + +var processorJSONPath = tokenStack{json.Delim('{'), "processors", json.Delim('['), json.Delim('{')} + +func ProcessorsFromJSONPipeline(content []byte) (procs []Processor, err error) { + var processors []string + var offsets []int + decoder := json.NewDecoder(bytes.NewReader(content)) + var stack tokenStack + + for { + off := int(decoder.InputOffset()) + tk, err := decoder.Token() + if err == io.EOF { + break + } + delim, isDelim := tk.(json.Delim) + if isDelim && (delim == '}' || delim == ']') { + stack.PopUntil(delim - 2) // `}`-2 = `{` and `]`-2 = `[` + if stack.TopIsString() { + stack.Pop() + } + continue + } + if !isDelim && stack.TopIsString() { + stack.Pop() + continue + } + if str, ok := tk.(string); ok && stack.Equals(processorJSONPath) { + processors = append(processors, str) + offsets = append(offsets, off) + } + stack.Push(tk) + } + lines, err := offsetsToLineNumbers(offsets, content) + if err != nil { + return nil, err + } + + procs = make([]Processor, len(processors)) + for idx, proc := range processors { + procs[idx] = Processor{ + Type: proc, + Line: lines[idx], + } + } + return procs, nil +} + +func offsetsToLineNumbers(offsets []int, content []byte) (lines []int, err error) { + nextNewline := func(r []byte, offset int) int { + n := len(r) + if offset >= n { + return n + } + if delta := bytes.IndexByte(r[offset+1:], '\n'); delta > -1 { + return offset + delta + 1 + } + return n + } + lineEnd := nextNewline(content, -1) + line := 1 + lines = make([]int, len(offsets)) + for i := 0; i < len(offsets); { + if offsets[i] < lineEnd { + lines[i] = line + i++ + continue + } + for offsets[i] >= lineEnd { + if lineEnd == len(content) { + return nil, io.ErrUnexpectedEOF + } + line++ + lineEnd = nextNewline(content, lineEnd) + } + } + return lines, nil +} diff --git a/internal/elasticsearch/pipeline/resource.go b/internal/elasticsearch/pipeline/resource.go new file mode 100644 index 0000000000..2b7e643692 --- /dev/null +++ b/internal/elasticsearch/pipeline/resource.go @@ -0,0 +1,53 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package pipeline + +import ( + "encoding/json" + "strings" + + "github.com/pkg/errors" + "gopkg.in/yaml.v3" +) + +type Resource struct { + Name string + Format string + Content []byte + asJSON []byte +} + +func (p *Resource) FileName() string { + pos := strings.LastIndexByte(p.Name, '-') + return p.Name[:pos] + "." 
+ p.Format
+}
+
+func (p *Resource) JSON() ([]byte, error) {
+	if len(p.asJSON) == 0 {
+		if err := p.toJSON(); err != nil {
+			return nil, err
+		}
+	}
+	return p.asJSON, nil
+}
+
+func (p *Resource) toJSON() error {
+	switch p.Format {
+	case "json":
+		p.asJSON = p.Content
+	case "yaml", "yml":
+		var node map[string]interface{}
+		err := yaml.Unmarshal(p.Content, &node)
+		if err != nil {
+			return errors.Wrapf(err, "unmarshalling pipeline Content failed (pipeline: %s)", p.Name)
+		}
+		if p.asJSON, err = json.Marshal(&node); err != nil {
+			return errors.Wrapf(err, "marshalling pipeline Content failed (pipeline: %s)", p.Name)
+		}
+	default:
+		return errors.Errorf("unsupported pipeline Format '%s' for pipeline %s", p.Format, p.Name)
+	}
+	return nil
+}
diff --git a/internal/testrunner/coverageoutput.go b/internal/testrunner/coverageoutput.go
index 6bfce1f921..73c4061650 100644
--- a/internal/testrunner/coverageoutput.go
+++ b/internal/testrunner/coverageoutput.go
@@ -23,6 +23,7 @@ type testCoverageDetails struct {
 	packageName string
 	testType    TestType
 	dataStreams map[string][]string // <data stream>: <test case names>
+	cobertura   *CoberturaCoverage  // For tests to provide custom Cobertura results.
 }
 
 func newTestCoverageDetails(packageName string, testType TestType) *testCoverageDetails {
@@ -42,11 +43,17 @@ func (tcd *testCoverageDetails) withTestResults(results []TestResult) *testCover
 			tcd.dataStreams[result.DataStream] = []string{}
 		}
 		tcd.dataStreams[result.DataStream] = append(tcd.dataStreams[result.DataStream], result.Name)
+		if tcd.cobertura != nil && result.Coverage != nil {
+			tcd.cobertura.Merge(result.Coverage)
+		} else {
+			tcd.cobertura = result.Coverage
+		}
 	}
 	return tcd
 }
 
-type coberturaCoverage struct {
+// CoberturaCoverage is the root element for a Cobertura XML report.
+type CoberturaCoverage struct {
 	XMLName         xml.Name            `xml:"coverage"`
 	LineRate        float32             `xml:"line-rate,attr"`
 	BranchRate      float32             `xml:"branch-rate,attr"`
@@ -57,48 +64,53 @@ type coberturaCoverage struct {
 	BranchesCovered int64               `xml:"branches-covered,attr"`
 	BranchesValid   int64               `xml:"branches-valid,attr"`
 	Complexity      float32             `xml:"complexity,attr"`
-	Sources         []*coberturaSource  `xml:"sources>source"`
-	Packages        []*coberturaPackage `xml:"packages>package"`
+	Sources         []*CoberturaSource  `xml:"sources>source"`
+	Packages        []*CoberturaPackage `xml:"packages>package"`
 }
 
-type coberturaSource struct {
+// CoberturaSource represents a base path to the covered source code.
+type CoberturaSource struct {
 	Path string `xml:",chardata"`
 }
 
-type coberturaPackage struct {
+// CoberturaPackage represents a package in a Cobertura XML report.
+type CoberturaPackage struct {
 	Name       string            `xml:"name,attr"`
 	LineRate   float32           `xml:"line-rate,attr"`
 	BranchRate float32           `xml:"branch-rate,attr"`
 	Complexity float32           `xml:"complexity,attr"`
-	Classes    []*coberturaClass `xml:"classes>class"`
+	Classes    []*CoberturaClass `xml:"classes>class"`
 }
 
-type coberturaClass struct {
+// CoberturaClass represents a class in a Cobertura XML report.
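+// For ingest pipeline tests, each pipeline file is reported as one class.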
+type CoberturaClass struct { Name string `xml:"name,attr"` Filename string `xml:"filename,attr"` LineRate float32 `xml:"line-rate,attr"` BranchRate float32 `xml:"branch-rate,attr"` Complexity float32 `xml:"complexity,attr"` - Methods []*coberturaMethod `xml:"methods>method"` + Methods []*CoberturaMethod `xml:"methods>method"` + Lines []*CoberturaLine `xml:"lines>line"` } -type coberturaMethod struct { - Name string `xml:"name,attr"` - Signature string `xml:"signature,attr"` - LineRate float32 `xml:"line-rate,attr"` - BranchRate float32 `xml:"branch-rate,attr"` - Complexity float32 `xml:"complexity,attr"` - Lines coberturaLines `xml:"lines>line"` +// CoberturaMethod represents a method in a Cobertura XML report. +type CoberturaMethod struct { + Name string `xml:"name,attr"` + Signature string `xml:"signature,attr"` + LineRate float32 `xml:"line-rate,attr"` + BranchRate float32 `xml:"branch-rate,attr"` + Complexity float32 `xml:"complexity,attr"` + Hits int64 `xml:"hits,attr"` + Lines []*CoberturaLine `xml:"lines>line"` } -type coberturaLine struct { +// CoberturaLine represents a source line in a Cobertura XML report. +type CoberturaLine struct { Number int `xml:"number,attr"` Hits int64 `xml:"hits,attr"` } -type coberturaLines []*coberturaLine - -func (c *coberturaCoverage) bytes() ([]byte, error) { +func (c *CoberturaCoverage) bytes() ([]byte, error) { out, err := xml.MarshalIndent(&c, "", " ") if err != nil { return nil, errors.Wrap(err, "unable to format test results as xUnit") @@ -113,6 +125,101 @@ func (c *coberturaCoverage) bytes() ([]byte, error) { return buffer.Bytes(), nil } +// Merge merges two coverage reports for a given class. +func (c *CoberturaClass) Merge(b *CoberturaClass) { + // Invariants + equal := c.Name == b.Name && + c.Filename == b.Filename && + len(c.Lines) == len(b.Lines) && + len(c.Methods) == len(b.Methods) && + len(c.Lines) == len(c.Methods) + for idx := range c.Lines { + equal = equal && c.Lines[idx].Number == b.Lines[idx].Number && + c.Methods[idx].Name == b.Methods[idx].Name && + len(c.Methods[idx].Lines) == 1 && + len(b.Methods[idx].Lines) == 1 && + c.Methods[idx].Lines[0].Number == b.Methods[idx].Lines[0].Number + } + if !equal { + panic(fmt.Sprintf("differing classes at Merge: %+v != %+v", *c, *b)) + } + // Update methods + for idx := range b.Methods { + c.Methods[idx].Hits += b.Methods[idx].Hits + c.Methods[idx].Lines[0].Hits += b.Methods[idx].Lines[0].Hits + } + // Rebuild lines + c.Lines = nil + for _, m := range c.Methods { + c.Lines = append(c.Lines, m.Lines...) + } +} + +// Merge merges two coverage reports for a given package. +func (p *CoberturaPackage) Merge(b *CoberturaPackage) { + // Merge classes + for _, class := range b.Classes { + var target *CoberturaClass + for _, existing := range p.Classes { + if existing.Name == class.Name { + target = existing + break + } + } + if target != nil { + target.Merge(class) + } else { + p.Classes = append(p.Classes, class) + } + } +} + +// Merge merges two coverage reports. 
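+// Sources and packages present in b but missing from c are appended, matching
+// packages and classes are merged by name, and the global line counters are
+// recalculated.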
+func (c *CoberturaCoverage) Merge(b *CoberturaCoverage) { + // Merge source paths + for _, path := range b.Sources { + found := false + for _, existing := range c.Sources { + if found = existing.Path == path.Path; found { + break + } + } + if !found { + c.Sources = append(c.Sources, path) + } + } + + // Merge packages + for _, pkg := range b.Packages { + var target *CoberturaPackage + for _, existing := range c.Packages { + if existing.Name == pkg.Name { + target = existing + break + } + } + if target != nil { + target.Merge(pkg) + } else { + c.Packages = append(c.Packages, pkg) + } + } + + // Recalculate global line coverage count + c.LinesValid = 0 + c.LinesCovered = 0 + for _, pkg := range c.Packages { + for _, cls := range pkg.Classes { + for _, line := range cls.Lines { + c.LinesValid++ + if line.Hits > 0 { + c.LinesCovered++ + } + } + } + } +} + // WriteCoverage function calculates test coverage for the given package. // It requires to execute tests for all data streams (same test type), so the coverage can be calculated properly. func WriteCoverage(packageRootPath, packageName string, testType TestType, results []TestResult) error { @@ -121,7 +228,11 @@ func WriteCoverage(packageRootPath, packageName string, testType TestType, resul return errors.Wrap(err, "can't collect test coverage details") } - report := transformToCoberturaReport(details) + // Use provided cobertura report, or generate a custom report if not available. + report := details.cobertura + if report == nil { + report = transformToCoberturaReport(details) + } err = writeCoverageReportFile(report, packageName) if err != nil { @@ -197,28 +308,28 @@ func verifyTestExpected(packageRootPath string, dataStreamName string, testType return true, nil } -func transformToCoberturaReport(details *testCoverageDetails) *coberturaCoverage { - var classes []*coberturaClass +func transformToCoberturaReport(details *testCoverageDetails) *CoberturaCoverage { + var classes []*CoberturaClass for dataStream, testCases := range details.dataStreams { if dataStream == "" { continue // ignore tests running in the package context (not data stream), mostly referring to installed assets } - var methods []*coberturaMethod + var methods []*CoberturaMethod if len(testCases) == 0 { - methods = append(methods, &coberturaMethod{ + methods = append(methods, &CoberturaMethod{ Name: "Missing", - Lines: []*coberturaLine{{Number: 1, Hits: 0}}, + Lines: []*CoberturaLine{{Number: 1, Hits: 0}}, }) } else { - methods = append(methods, &coberturaMethod{ + methods = append(methods, &CoberturaMethod{ Name: "OK", - Lines: []*coberturaLine{{Number: 1, Hits: 1}}, + Lines: []*CoberturaLine{{Number: 1, Hits: 1}}, }) } - aClass := &coberturaClass{ + aClass := &CoberturaClass{ Name: string(details.testType), Filename: details.packageName + "/" + dataStream, Methods: methods, @@ -226,9 +337,9 @@ func transformToCoberturaReport(details *testCoverageDetails) *coberturaCoverage classes = append(classes, aClass) } - return &coberturaCoverage{ + return &CoberturaCoverage{ Timestamp: time.Now().UnixNano(), - Packages: []*coberturaPackage{ + Packages: []*CoberturaPackage{ { Name: details.packageName, Classes: classes, @@ -237,7 +348,7 @@ func transformToCoberturaReport(details *testCoverageDetails) *coberturaCoverage } } -func writeCoverageReportFile(report *coberturaCoverage, packageName string) error { +func writeCoverageReportFile(report *CoberturaCoverage, packageName string) error { dest, err := testCoverageReportsDir() if err != nil { return errors.Wrap(err, "could not 
determine test coverage reports folder") diff --git a/internal/testrunner/runners/pipeline/ingest_pipeline.go b/internal/testrunner/runners/pipeline/ingest_pipeline.go index 14041f1be5..f1c8435a15 100644 --- a/internal/testrunner/runners/pipeline/ingest_pipeline.go +++ b/internal/testrunner/runners/pipeline/ingest_pipeline.go @@ -18,20 +18,14 @@ import ( "time" "github.com/pkg/errors" - "gopkg.in/yaml.v3" "github.com/elastic/elastic-package/internal/elasticsearch" + "github.com/elastic/elastic-package/internal/elasticsearch/pipeline" "github.com/elastic/elastic-package/internal/packages" ) var ingestPipelineTag = regexp.MustCompile(`{{\s*IngestPipeline.+}}`) -type pipelineResource struct { - name string - format string - content []byte -} - type simulatePipelineRequest struct { Docs []pipelineDocument `json:"docs"` } @@ -48,7 +42,7 @@ type pipelineIngestedDocument struct { Doc pipelineDocument `json:"doc"` } -func installIngestPipelines(api *elasticsearch.API, dataStreamPath string) (string, []pipelineResource, error) { +func installIngestPipelines(api *elasticsearch.API, dataStreamPath string) (string, []pipeline.Resource, error) { dataStreamManifest, err := packages.ReadDataStreamManifest(filepath.Join(dataStreamPath, packages.DataStreamManifestFile)) if err != nil { return "", nil, errors.Wrap(err, "reading data stream manifest failed") @@ -62,19 +56,15 @@ func installIngestPipelines(api *elasticsearch.API, dataStreamPath string) (stri return "", nil, errors.Wrap(err, "loading ingest pipeline files failed") } - jsonPipelines, err := convertPipelineToJSON(pipelines) - if err != nil { - return "", nil, errors.Wrap(err, "converting pipelines failed") - } + err = installPipelinesInElasticsearch(api, pipelines) - err = installPipelinesInElasticsearch(api, jsonPipelines) if err != nil { return "", nil, errors.Wrap(err, "installing pipelines failed") } - return mainPipeline, jsonPipelines, nil + return mainPipeline, pipelines, nil } -func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]pipelineResource, error) { +func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]pipeline.Resource, error) { elasticsearchPath := filepath.Join(dataStreamPath, "elasticsearch", "ingest_pipeline") var pipelineFiles []string @@ -86,7 +76,7 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]pipelineReso pipelineFiles = append(pipelineFiles, files...) 
} - var pipelines []pipelineResource + var pipelines []pipeline.Resource for _, path := range pipelineFiles { c, err := os.ReadFile(path) if err != nil { @@ -102,75 +92,52 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]pipelineReso return []byte(getWithPipelineNameWithNonce(pipelineTag, nonce)) }) name := filepath.Base(path) - pipelines = append(pipelines, pipelineResource{ - name: getWithPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce), - format: filepath.Ext(path)[1:], - content: c, + pipelines = append(pipelines, pipeline.Resource{ + Name: getWithPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce), + Format: filepath.Ext(path)[1:], + Content: c, }) } return pipelines, nil } -func convertPipelineToJSON(pipelines []pipelineResource) ([]pipelineResource, error) { - var jsonPipelines []pipelineResource - for _, pipeline := range pipelines { - if pipeline.format == "json" { - jsonPipelines = append(jsonPipelines, pipeline) - continue - } - - var node map[string]interface{} - err := yaml.Unmarshal(pipeline.content, &node) - if err != nil { - return nil, errors.Wrapf(err, "unmarshalling pipeline content failed (pipeline: %s)", pipeline.name) - } - - c, err := json.Marshal(&node) - if err != nil { - return nil, errors.Wrapf(err, "marshalling pipeline content failed (pipeline: %s)", pipeline.name) - } - - jsonPipelines = append(jsonPipelines, pipelineResource{ - name: pipeline.name, - format: "json", - content: c, - }) - } - return jsonPipelines, nil -} - -func installPipelinesInElasticsearch(api *elasticsearch.API, pipelines []pipelineResource) error { - for _, pipeline := range pipelines { - if err := installPipeline(api, pipeline); err != nil { +func installPipelinesInElasticsearch(api *elasticsearch.API, pipelines []pipeline.Resource) error { + for _, p := range pipelines { + if err := installPipeline(api, p); err != nil { return err } } return nil } -func installPipeline(api *elasticsearch.API, pipeline pipelineResource) error { +func installPipeline(api *elasticsearch.API, pipeline pipeline.Resource) error { if err := putIngestPipeline(api, pipeline); err != nil { return err } // Just to be sure the pipeline has been uploaded. 
- return getIngestPipeline(api, pipeline.name) + return getIngestPipeline(api, pipeline.Name) } -func putIngestPipeline(api *elasticsearch.API, pipeline pipelineResource) error { - r, err := api.Ingest.PutPipeline(pipeline.name, bytes.NewReader(pipeline.content)) +func putIngestPipeline(api *elasticsearch.API, pipeline pipeline.Resource) error { + source, err := pipeline.JSON() if err != nil { - return errors.Wrapf(err, "PutPipeline API call failed (pipelineName: %s)", pipeline.name) + return err + } + r, err := api.Ingest.PutPipeline(pipeline.Name, bytes.NewReader(source)) + if err != nil { + return errors.Wrapf(err, "PutPipeline API call failed (pipelineName: %s)", pipeline.Name) } defer r.Body.Close() body, err := io.ReadAll(r.Body) if err != nil { - return errors.Wrapf(err, "failed to read PutPipeline API response body (pipelineName: %s)", pipeline.name) + return errors.Wrapf(err, "failed to read PutPipeline API response body (pipelineName: %s)", pipeline.Name) } if r.StatusCode != http.StatusOK { + return errors.Wrapf(elasticsearch.NewError(body), "unexpected response status for PutPipeline (%d): %s (pipelineName: %s)", - r.StatusCode, r.Status(), pipeline.name) + r.StatusCode, r.Status(), pipeline.Name) } return nil } @@ -196,12 +163,13 @@ func getIngestPipeline(api *elasticsearch.API, pipelineName string) error { return nil } -func uninstallIngestPipelines(api *elasticsearch.API, pipelines []pipelineResource) error { +func uninstallIngestPipelines(api *elasticsearch.API, pipelines []pipeline.Resource) error { for _, pipeline := range pipelines { - _, err := api.Ingest.DeletePipeline(pipeline.name) + resp, err := api.Ingest.DeletePipeline(pipeline.Name) if err != nil { - return errors.Wrapf(err, "DeletePipeline API call failed (pipelineName: %s)", pipeline.name) + return errors.Wrapf(err, "DeletePipeline API call failed (pipelineName: %s)", pipeline.Name) } + resp.Body.Close() } return nil } diff --git a/internal/testrunner/runners/pipeline/runner.go b/internal/testrunner/runners/pipeline/runner.go index 9861aef4fa..599a4cb6bd 100644 --- a/internal/testrunner/runners/pipeline/runner.go +++ b/internal/testrunner/runners/pipeline/runner.go @@ -16,6 +16,8 @@ import ( "github.com/pkg/errors" "github.com/elastic/elastic-package/internal/common" + "github.com/elastic/elastic-package/internal/elasticsearch/pipeline" + "github.com/elastic/elastic-package/internal/elasticsearch/pipeline/coverage" "github.com/elastic/elastic-package/internal/fields" "github.com/elastic/elastic-package/internal/logger" "github.com/elastic/elastic-package/internal/multierror" @@ -30,7 +32,7 @@ const ( type runner struct { options testrunner.TestOptions - pipelines []pipelineResource + pipelines []pipeline.Resource } func (r *runner) TestFolderRequired() bool { @@ -155,6 +157,12 @@ func (r *runner) run() ([]testrunner.TestResult, error) { continue } + if r.options.WithCoverage { + tr.Coverage, err = coverage.Get(r.options, r.pipelines) + if err != nil { + return nil, errors.Wrap(err, "error calculating pipeline coverage") + } + } results = append(results, tr) } return results, nil diff --git a/internal/testrunner/testrunner.go b/internal/testrunner/testrunner.go index 66358b306d..3c6fb4f1d8 100644 --- a/internal/testrunner/testrunner.go +++ b/internal/testrunner/testrunner.go @@ -29,6 +29,7 @@ type TestOptions struct { DeferCleanup time.Duration ServiceVariant string + WithCoverage bool } // TestRunner is the interface all test runners must implement. 
@@ -86,6 +87,9 @@ type TestResult struct {
 	// If the test was skipped, the reason it was skipped and a link for more
 	// details.
 	Skipped *SkipConfig
+
+	// Coverage details in Cobertura format (optional).
+	Coverage *CoberturaCoverage
 }
 
 // ResultComposer wraps a TestResult and provides convenience methods for

From ef9a24c9aa41c98e4fd1644e6bc226ced462eea1 Mon Sep 17 00:00:00 2001
From: Adrian Serrano
Date: Mon, 15 Nov 2021 19:20:43 +0100
Subject: [PATCH 2/7] Propagate error from merge

---
 internal/testrunner/coverageoutput.go | 30 ++++++++++++++++++++-------
 1 file changed, 22 insertions(+), 8 deletions(-)

diff --git a/internal/testrunner/coverageoutput.go b/internal/testrunner/coverageoutput.go
index 73c4061650..3fb12bada4 100644
--- a/internal/testrunner/coverageoutput.go
+++ b/internal/testrunner/coverageoutput.go
@@ -15,6 +15,7 @@ import (
 	"github.com/pkg/errors"
 
 	"github.com/elastic/elastic-package/internal/builder"
+	"github.com/elastic/elastic-package/internal/multierror"
 )
 
 const coverageDtd = `<!DOCTYPE coverage SYSTEM "http://cobertura.sourceforge.net/xml/coverage-04.dtd">`
 
@@ -24,6 +25,7 @@ type testCoverageDetails struct {
 	testType    TestType
 	dataStreams map[string][]string // <data stream>: <test case names>
 	cobertura   *CoberturaCoverage  // For tests to provide custom Cobertura results.
+	errors      multierror.Error
 }
 
 func newTestCoverageDetails(packageName string, testType TestType) *testCoverageDetails {
@@ -44,7 +46,9 @@ func (tcd *testCoverageDetails) withTestResults(results []TestResult) *testCover
 		}
 		tcd.dataStreams[result.DataStream] = append(tcd.dataStreams[result.DataStream], result.Name)
 		if tcd.cobertura != nil && result.Coverage != nil {
-			tcd.cobertura.Merge(result.Coverage)
+			if err := tcd.cobertura.Merge(result.Coverage); err != nil {
+				tcd.errors = append(tcd.errors, errors.Wrapf(err, "can't merge Cobertura coverage for test `%s`", result.Name))
+			}
 		} else {
 			tcd.cobertura = result.Coverage
 		}
@@ -126,8 +130,8 @@ func (c *CoberturaCoverage) bytes() ([]byte, error) {
 }
 
 // Merge merges two coverage reports for a given class.
-func (c *CoberturaClass) Merge(b *CoberturaClass) {
-	// Invariants
+func (c *CoberturaClass) Merge(b *CoberturaClass) error {
+	// Check preconditions: classes should be the same.
 	equal := c.Name == b.Name &&
 		c.Filename == b.Filename &&
 		len(c.Lines) == len(b.Lines) &&
@@ -141,7 +145,7 @@
 			c.Methods[idx].Lines[0].Number == b.Methods[idx].Lines[0].Number
 	}
 	if !equal {
-		panic(fmt.Sprintf("differing classes at Merge: %+v != %+v", *c, *b))
+		return errors.Errorf("merging incompatible classes: %+v != %+v", *c, *b)
 	}
 	// Update methods
 	for idx := range b.Methods {
@@ -153,10 +157,11 @@
 	for _, m := range c.Methods {
 		c.Lines = append(c.Lines, m.Lines...)
 	}
+	return nil
 }
 
 // Merge merges two coverage reports for a given package.
-func (p *CoberturaPackage) Merge(b *CoberturaPackage) {
+func (p *CoberturaPackage) Merge(b *CoberturaPackage) error {
 	// Merge classes
 	for _, class := range b.Classes {
 		var target *CoberturaClass
@@ -167,15 +172,18 @@
 			}
 		}
 		if target != nil {
-			target.Merge(class)
+			if err := target.Merge(class); err != nil {
+				return err
+			}
 		} else {
 			p.Classes = append(p.Classes, class)
 		}
 	}
+	return nil
 }
 
 // Merge merges two coverage reports.
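+// It returns an error when the reports contain incompatible classes.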
-func (c *CoberturaCoverage) Merge(b *CoberturaCoverage) { +func (c *CoberturaCoverage) Merge(b *CoberturaCoverage) error { // Merge source paths for _, path := range b.Sources { found := false @@ -199,7 +207,9 @@ func (c *CoberturaCoverage) Merge(b *CoberturaCoverage) { } } if target != nil { - target.Merge(pkg) + if err := target.Merge(pkg); err != nil { + return err + } } else { c.Packages = append(c.Packages, pkg) } @@ -218,6 +228,7 @@ func (c *CoberturaCoverage) Merge(b *CoberturaCoverage) { } } } + return nil } // WriteCoverage function calculates test coverage for the given package. @@ -250,6 +261,9 @@ func collectTestCoverageDetails(packageRootPath, packageName string, testType Te details := newTestCoverageDetails(packageName, testType). withUncoveredDataStreams(withoutTests). withTestResults(results) + if len(details.errors) > 0 { + return nil, details.errors + } return details, nil } From 517bc74726c4443857d44d4b54cbc19febca8440 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Wed, 17 Nov 2021 12:30:47 +0100 Subject: [PATCH 3/7] Fix logic bug --- internal/testrunner/coverageoutput.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/testrunner/coverageoutput.go b/internal/testrunner/coverageoutput.go index 3fb12bada4..756c43a564 100644 --- a/internal/testrunner/coverageoutput.go +++ b/internal/testrunner/coverageoutput.go @@ -49,7 +49,7 @@ func (tcd *testCoverageDetails) withTestResults(results []TestResult) *testCover if err := tcd.cobertura.Merge(result.Coverage); err != nil { tcd.errors = append(tcd.errors, errors.Wrapf(err, "can't merge Cobertura coverage for test `%s`", result.Name)) } - } else { + } else if tcd.cobertura == nil { tcd.cobertura = result.Coverage } } From 1b406e2d1aa23c90e3826d84263242d272e15d9e Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Wed, 17 Nov 2021 16:43:42 +0100 Subject: [PATCH 4/7] Tests and fixes for Processors method --- internal/elasticsearch/pipeline/processors.go | 24 +- .../elasticsearch/pipeline/processors_test.go | 255 ++++++++++++++++++ internal/elasticsearch/pipeline/resource.go | 3 + 3 files changed, 274 insertions(+), 8 deletions(-) create mode 100644 internal/elasticsearch/pipeline/processors_test.go diff --git a/internal/elasticsearch/pipeline/processors.go b/internal/elasticsearch/pipeline/processors.go index efa5439fca..a3f9329556 100644 --- a/internal/elasticsearch/pipeline/processors.go +++ b/internal/elasticsearch/pipeline/processors.go @@ -8,6 +8,7 @@ import ( "bytes" "encoding/json" "io" + "sort" "github.com/pkg/errors" "gopkg.in/yaml.v3" @@ -116,10 +117,13 @@ func ProcessorsFromJSONPipeline(content []byte) (procs []Processor, err error) { var stack tokenStack for { - off := int(decoder.InputOffset()) tk, err := decoder.Token() - if err == io.EOF { - break + off := int(decoder.InputOffset()) + if err != nil { + if err == io.EOF { + break + } + return nil, err } delim, isDelim := tk.(json.Delim) if isDelim && (delim == '}' || delim == ']') { @@ -144,28 +148,32 @@ func ProcessorsFromJSONPipeline(content []byte) (procs []Processor, err error) { return nil, err } - procs = make([]Processor, len(processors)) for idx, proc := range processors { - procs[idx] = Processor{ + procs = append(procs, Processor{ Type: proc, Line: lines[idx], - } + }) } return procs, nil } func offsetsToLineNumbers(offsets []int, content []byte) (lines []int, err error) { + if !sort.SliceIsSorted(offsets, func(i, j int) bool { + return offsets[i] < offsets[j] + }) { + return nil, errors.New("input offsets must be sorted") + } 
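+	// nextNewline returns the offset just past the next '\n' found at or
+	// after offset, or len(r) when no newline remains.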
nextNewline := func(r []byte, offset int) int { n := len(r) if offset >= n { return n } - if delta := bytes.IndexByte(r[offset+1:], '\n'); delta > -1 { + if delta := bytes.IndexByte(r[offset:], '\n'); delta > -1 { return offset + delta + 1 } return n } - lineEnd := nextNewline(content, -1) + lineEnd := nextNewline(content, 0) line := 1 lines = make([]int, len(offsets)) for i := 0; i < len(offsets); { diff --git a/internal/elasticsearch/pipeline/processors_test.go b/internal/elasticsearch/pipeline/processors_test.go new file mode 100644 index 0000000000..80eeeb9bcb --- /dev/null +++ b/internal/elasticsearch/pipeline/processors_test.go @@ -0,0 +1,255 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package pipeline + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestResource_Processors(t *testing.T) { + tests := []struct { + name string + format string + content []byte + expected []Processor + wantErr bool + }{ + { + name: "Yaml pipeline", + format: "yml", + content: []byte(`--- +description: Made up pipeline +processors: +# First processor. +- grok: + tag: Extract header + field: message + patterns: + - \[%{APACHE_TIME:apache.error.timestamp}\] \[%{LOGLEVEL:log.level}\]( \[client + %{IPORHOST:source.address}(:%{POSINT:source.port})?\])? %{GREEDYDATA:message} + - \[%{APACHE_TIME:apache.error.timestamp}\] \[%{DATA:apache.error.module}:%{LOGLEVEL:log.level}\] + \[pid %{NUMBER:process.pid:long}(:tid %{NUMBER:process.thread.id:long})?\]( + \[client %{IPORHOST:source.address}(:%{POSINT:source.port})?\])? %{GREEDYDATA:message} + pattern_definitions: + APACHE_TIME: '%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}' + ignore_missing: true + +- date: + field: apache.error.timestamp + target_field: '@timestamp' + formats: + - EEE MMM dd H:m:s yyyy + - EEE MMM dd H:m:s.SSSSSS yyyy + on_failure: + - append: + field: error.message + value: '{{ _ingest.on_failure_message }}' +- set: + description: Set event category + field: event.category + value: web +# Some script +- script: + lang: painless + source: >- + [...] 
+ +- grok: + field: source.address + ignore_missing: true + patterns: + - ^(%{IP:source.ip}|%{HOSTNAME:source.domain})$ +- rename: + field: source.as.organization_name + target_field: source.as.organization.name + ignore_missing: true +on_failure: +- set: + field: error.message + value: '{{ _ingest.on_failure_message }}' +`), + expected: []Processor{ + {Type: "grok", Line: 5, Tag: "Extract header"}, + {Type: "date", Line: 18}, + {Type: "set", Line: 28, Description: "Set event category"}, + {Type: "script", Line: 33}, + {Type: "grok", Line: 38}, + {Type: "rename", Line: 43}, + }, + }, + { + name: "Json pipeline", + format: "json", + content: []byte(`{ + "description": "Pipeline for parsing silly logs.", + "processors": [{"drop": {"if":"ctx.drop!=null"}}, + { + "set": { + "field": "event.ingested", + "value": "{{_ingest.timestamp}}" + } + }, {"remove":{"field": "message"}}, {"set": {"field": "temp.duration","value":1234}}, + { + "set":{ + "field": "event.kind", + "value": "event" + } + }], + "on_failure" : [{ + "set" : { + "field" : "error.message", + "value" : "{{ _ingest.on_failure_message }}" + } + }] +} +`), + expected: []Processor{ + {Type: "drop", Line: 3}, + {Type: "set", Line: 5}, + {Type: "remove", Line: 9}, + {Type: "set", Line: 9}, + {Type: "set", Line: 11}, + }, + }, + { + name: "Empty Yaml pipeline", + format: "yml", + content: []byte(``), + expected: nil, + }, + { + name: "Empty Json pipeline", + format: "json", + content: []byte(``), + expected: nil, + }, + { + name: "Json pipeline one liner", + format: "json", + content: []byte(`{"processors": [{"drop": {}}]}`), + expected: []Processor{ + {Type: "drop", Line: 1}, + }, + }, + { + name: "Malformed Json pipeline", + format: "json", + content: []byte(`{"processors": {"drop": {}}}`), + expected: nil, + }, + { + name: "Broken Json", + format: "json", + content: []byte(`{"processors": {"drop": {}},"`), + wantErr: true, + }, + { + name: "Malformed Yaml pipeline", + format: "yml", + content: []byte(`--- +processors: + foo: + bar: baz`), + wantErr: true, + }, + { + name: "Malformed Yaml", + format: "yml", + content: []byte(`foo123"`), + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := Resource{ + Name: tt.name, + Format: tt.format, + Content: tt.content, + } + procs, err := p.Processors() + if !tt.wantErr { + if !assert.NoError(t, err) { + t.Fatal(err) + } + } else { + if !assert.Error(t, err) { + t.Fatal("error expected") + } + } + if !assert.Equal(t, tt.expected, procs) { + t.Errorf("Processors() gotProcs = %v, want %v", procs, tt.expected) + } + }) + } +} + +func Test_offsetsToLineNumbers(t *testing.T) { + raw := []byte(`1111 +2 +3333 + +555`) + tests := []struct { + name string + content []byte + offsets []int + expected []int + wantErr bool + }{ + { + name: "valid", + content: raw, + offsets: []int{0, 3, 5, 8, 14}, + expected: []int{1, 1, 2, 3, 5}, + }, + { + name: "first char", + content: raw, + offsets: []int{0, 5, 7, 12, 13}, + expected: []int{1, 2, 3, 4, 5}, + }, + { + name: "newline belongs to current line", + content: raw, + offsets: []int{4, 6, 11, 12}, + expected: []int{1, 2, 3, 4}, + }, + { + name: "out of range offset", + content: raw, + offsets: []int{0, 1, 2, 3, 9000}, + wantErr: true, + }, + { + name: "unordered", + content: raw, + offsets: []int{3, 0, 14, 8, 12}, + wantErr: true, + }, + { + name: "empty", + content: raw, + offsets: []int{}, + expected: []int{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + lines, err := 
offsetsToLineNumbers(tt.offsets, tt.content) + if !tt.wantErr { + if !assert.NoError(t, err) { + t.Fatal(err) + } + } else { + if !assert.Error(t, err) { + t.Fatal("error expected") + } + } + assert.Equal(t, tt.expected, lines) + }) + } +} diff --git a/internal/elasticsearch/pipeline/resource.go b/internal/elasticsearch/pipeline/resource.go index 2b7e643692..c7ea1c8b47 100644 --- a/internal/elasticsearch/pipeline/resource.go +++ b/internal/elasticsearch/pipeline/resource.go @@ -21,6 +21,9 @@ type Resource struct { func (p *Resource) FileName() string { pos := strings.LastIndexByte(p.Name, '-') + if pos == -1 { + pos = len(p.Name) + } return p.Name[:pos] + "." + p.Format } From e0e0ea25388d311c37d83af9909954ea95f94ff1 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Wed, 17 Nov 2021 19:58:54 +0100 Subject: [PATCH 5/7] Test for Cobertura classes merge --- internal/testrunner/coverageoutput_test.go | 251 +++++++++++++++++++++ 1 file changed, 251 insertions(+) create mode 100644 internal/testrunner/coverageoutput_test.go diff --git a/internal/testrunner/coverageoutput_test.go b/internal/testrunner/coverageoutput_test.go new file mode 100644 index 0000000000..3f3131bddf --- /dev/null +++ b/internal/testrunner/coverageoutput_test.go @@ -0,0 +1,251 @@ +package testrunner + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCoberturaCoverage_Merge(t *testing.T) { + tests := []struct { + name string + rhs, lhs, expected CoberturaCoverage + wantErr bool + }{ + { + name: "merge sources", + rhs: CoberturaCoverage{ + Sources: []*CoberturaSource{ + {Path: "/a"}, + {Path: "/c"}, + }, + }, + lhs: CoberturaCoverage{ + Sources: []*CoberturaSource{ + {Path: "/b"}, + {Path: "/c"}, + }, + }, + expected: CoberturaCoverage{ + Sources: []*CoberturaSource{ + {Path: "/a"}, + {Path: "/c"}, + {Path: "/b"}, + }, + }, + }, + { + name: "merge packages and classes", + rhs: CoberturaCoverage{ + Packages: []*CoberturaPackage{ + { + Name: "a", + Classes: []*CoberturaClass{ + {Name: "a.a"}, + {Name: "a.b"}, + }, + }, + { + Name: "b", + Classes: []*CoberturaClass{ + {Name: "b.a"}, + }, + }, + }, + }, + lhs: CoberturaCoverage{ + Packages: []*CoberturaPackage{ + { + Name: "c", + Classes: []*CoberturaClass{ + {Name: "a.a"}, + }, + }, + { + Name: "b", + Classes: []*CoberturaClass{ + {Name: "b.a"}, + {Name: "b.b"}, + }, + }, + }, + }, + expected: CoberturaCoverage{ + Packages: []*CoberturaPackage{ + { + Name: "a", + Classes: []*CoberturaClass{ + {Name: "a.a"}, + {Name: "a.b"}, + }, + }, + { + Name: "b", + Classes: []*CoberturaClass{ + {Name: "b.a"}, + {Name: "b.b"}, + }, + }, + { + Name: "c", + Classes: []*CoberturaClass{ + {Name: "a.a"}, + }, + }, + }, + }, + }, + { + name: "merge methods and lines", + rhs: CoberturaCoverage{ + Packages: []*CoberturaPackage{ + { + Name: "a", + Classes: []*CoberturaClass{ + { + Name: "a.a", + Methods: []*CoberturaMethod{ + { + Name: "foo", + Hits: 2, + Lines: []*CoberturaLine{ + { + Number: 13, + Hits: 2, + }, + }, + }, + { + Name: "bar", + Hits: 1, + Lines: []*CoberturaLine{ + { + Number: 24, + Hits: 1, + }, + }, + }, + }, + Lines: []*CoberturaLine{ + { + Number: 13, + Hits: 2, + }, + { + Number: 24, + Hits: 1, + }, + }, + }, + }, + }, + }, + }, + lhs: CoberturaCoverage{ + Packages: []*CoberturaPackage{ + { + Name: "a", + Classes: []*CoberturaClass{ + { + Name: "a.a", + Methods: []*CoberturaMethod{ + { + Name: "foo", + Hits: 3, + Lines: []*CoberturaLine{ + { + Number: 13, + Hits: 1, + }, + }, + }, + { + Name: "bar", + Hits: 1, + Lines: []*CoberturaLine{ + { + Number: 
24, + Hits: 1, + }, + }, + }, + }, + Lines: []*CoberturaLine{ + { + Number: 13, + Hits: 1, + }, + { + Number: 24, + Hits: 2, + }, + }, + }, + }, + }, + }, + }, + expected: CoberturaCoverage{ + LinesCovered: 2, + LinesValid: 2, + Packages: []*CoberturaPackage{ + { + Name: "a", + Classes: []*CoberturaClass{ + { + Name: "a.a", + Methods: []*CoberturaMethod{ + { + Name: "foo", + Hits: 5, + Lines: []*CoberturaLine{ + { + Number: 13, + Hits: 3, + }, + }, + }, + { + Name: "bar", + Hits: 2, + Lines: []*CoberturaLine{ + { + Number: 24, + Hits: 2, + }, + }, + }, + }, + Lines: []*CoberturaLine{ + { + Number: 13, + Hits: 3, + }, + { + Number: 24, + Hits: 2, + }, + }, + }, + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.rhs.Merge(&tt.lhs) + if !tt.wantErr { + if !assert.NoError(t, err) { + t.Fatal(err) + } + } else { + if !assert.Error(t, err) { + t.Fatal("error expected") + } + } + assert.Equal(t, tt.expected, tt.rhs) + }) + } +} From f33d0fa76baeb45467ff7e4bfa14ce7d940de2cd Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Thu, 18 Nov 2021 11:46:54 +0100 Subject: [PATCH 6/7] Missing license --- internal/testrunner/coverageoutput_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/testrunner/coverageoutput_test.go b/internal/testrunner/coverageoutput_test.go index 3f3131bddf..d66b770c5a 100644 --- a/internal/testrunner/coverageoutput_test.go +++ b/internal/testrunner/coverageoutput_test.go @@ -1,3 +1,7 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + package testrunner import ( From 627102ce113bc38f7b18151282bc0439ac2c8234 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Wed, 22 Dec 2021 14:57:51 +0700 Subject: [PATCH 7/7] Take all processor lines into account --- .../pipeline/coverage/coverage.go | 21 ++++++---- internal/elasticsearch/pipeline/processors.go | 42 +++++++++++++++---- .../elasticsearch/pipeline/processors_test.go | 24 +++++------ internal/testrunner/coverageoutput.go | 17 ++++---- internal/testrunner/coverageoutput_test.go | 34 ++++++++++++--- 5 files changed, 96 insertions(+), 42 deletions(-) diff --git a/internal/elasticsearch/pipeline/coverage/coverage.go b/internal/elasticsearch/pipeline/coverage/coverage.go index 205d229320..e927e5ea67 100644 --- a/internal/elasticsearch/pipeline/coverage/coverage.go +++ b/internal/elasticsearch/pipeline/coverage/coverage.go @@ -104,16 +104,19 @@ func Get(options testrunner.TestOptions, pipelines []pipeline.Resource) (*testru if pstats.Processors[idx].Stats.Count > 0 { covered++ } - line := &testrunner.CoberturaLine{ - Number: srcProc.Line, - Hits: pstats.Processors[idx].Stats.Count, + method := testrunner.CoberturaMethod{ + Name: srcProc.Type, + Hits: pstats.Processors[idx].Stats.Count, } - classCov.Methods = append(classCov.Methods, &testrunner.CoberturaMethod{ - Name: srcProc.Type, - Lines: []*testrunner.CoberturaLine{line}, - Hits: pstats.Processors[idx].Stats.Count, - }) - classCov.Lines = append(classCov.Lines, line) + for num := srcProc.FirstLine; num <= srcProc.LastLine; num++ { + line := &testrunner.CoberturaLine{ + Number: num, + Hits: pstats.Processors[idx].Stats.Count, + } + classCov.Lines = append(classCov.Lines, line) + method.Lines = append(method.Lines, line) + } + classCov.Methods = append(classCov.Methods, &method) } pkg.Classes = append(pkg.Classes, 
&classCov) coverage.LinesValid += int64(len(src)) diff --git a/internal/elasticsearch/pipeline/processors.go b/internal/elasticsearch/pipeline/processors.go index a3f9329556..7801ca26b8 100644 --- a/internal/elasticsearch/pipeline/processors.go +++ b/internal/elasticsearch/pipeline/processors.go @@ -18,7 +18,8 @@ type Processor struct { Type string `yaml:"-"` Tag string Description string - Line int `yaml:"-"` + FirstLine int `yaml:"-"` + LastLine int `yaml:"-"` } func (p Resource) Processors() (procs []Processor, err error) { @@ -33,6 +34,19 @@ func (p Resource) Processors() (procs []Processor, err error) { return procs, errors.Wrapf(err, "failure processing %s pipeline '%s'", p.Format, p.FileName()) } +func latestLine(node *yaml.Node) int { + if node == nil { + return 0 + } + last := node.Line + for _, inner := range node.Content { + if line := latestLine(inner); line > last { + last = line + } + } + return last +} + func ProcessorsFromYAMLPipeline(content []byte) (procs []Processor, err error) { type pipeline struct { Processors []yaml.Node @@ -52,7 +66,8 @@ func ProcessorsFromYAMLPipeline(content []byte) (procs []Processor, err error) { if err := entry.Content[0].Decode(&proc.Type); err != nil { return nil, errors.Wrapf(err, "error decoding processor#%d type", idx) } - proc.Line = entry.Line + proc.FirstLine = entry.Line + proc.LastLine = latestLine(&entry) procs = append(procs, proc) } return procs, nil @@ -85,7 +100,7 @@ func (s *tokenStack) Top() json.Token { if n := len(*s); n > 0 { return (*s)[n-1] } - return io.EOF // ??? + return io.EOF } func (s *tokenStack) TopIsString() bool { @@ -112,7 +127,7 @@ var processorJSONPath = tokenStack{json.Delim('{'), "processors", json.Delim('[' func ProcessorsFromJSONPipeline(content []byte) (procs []Processor, err error) { var processors []string - var offsets []int + var startOff, endOff []int decoder := json.NewDecoder(bytes.NewReader(content)) var stack tokenStack @@ -131,6 +146,9 @@ func ProcessorsFromJSONPipeline(content []byte) (procs []Processor, err error) { if stack.TopIsString() { stack.Pop() } + if stack.Equals(processorJSONPath) { + endOff = append(endOff, off) + } continue } if !isDelim && stack.TopIsString() { @@ -139,19 +157,27 @@ func ProcessorsFromJSONPipeline(content []byte) (procs []Processor, err error) { } if str, ok := tk.(string); ok && stack.Equals(processorJSONPath) { processors = append(processors, str) - offsets = append(offsets, off) + startOff = append(startOff, off) } stack.Push(tk) } - lines, err := offsetsToLineNumbers(offsets, content) + if len(processors) != len(endOff) || len(processors) != len(startOff) { + return nil, errors.New("malformed JSON") + } + startLines, err := offsetsToLineNumbers(startOff, content) + if err != nil { + return nil, err + } + endLines, err := offsetsToLineNumbers(endOff, content) if err != nil { return nil, err } for idx, proc := range processors { procs = append(procs, Processor{ - Type: proc, - Line: lines[idx], + Type: proc, + FirstLine: startLines[idx], + LastLine: endLines[idx], }) } return procs, nil diff --git a/internal/elasticsearch/pipeline/processors_test.go b/internal/elasticsearch/pipeline/processors_test.go index 80eeeb9bcb..bc21707fa9 100644 --- a/internal/elasticsearch/pipeline/processors_test.go +++ b/internal/elasticsearch/pipeline/processors_test.go @@ -73,12 +73,12 @@ on_failure: value: '{{ _ingest.on_failure_message }}' `), expected: []Processor{ - {Type: "grok", Line: 5, Tag: "Extract header"}, - {Type: "date", Line: 18}, - {Type: "set", Line: 28, Description: 
"Set event category"}, - {Type: "script", Line: 33}, - {Type: "grok", Line: 38}, - {Type: "rename", Line: 43}, + {Type: "grok", FirstLine: 5, LastLine: 16, Tag: "Extract header"}, + {Type: "date", FirstLine: 18, LastLine: 27}, + {Type: "set", FirstLine: 28, LastLine: 31, Description: "Set event category"}, + {Type: "script", FirstLine: 33, LastLine: 35}, + {Type: "grok", FirstLine: 38, LastLine: 42}, + {Type: "rename", FirstLine: 43, LastLine: 46}, }, }, { @@ -108,11 +108,11 @@ on_failure: } `), expected: []Processor{ - {Type: "drop", Line: 3}, - {Type: "set", Line: 5}, - {Type: "remove", Line: 9}, - {Type: "set", Line: 9}, - {Type: "set", Line: 11}, + {Type: "drop", FirstLine: 3, LastLine: 3}, + {Type: "set", FirstLine: 5, LastLine: 8}, + {Type: "remove", FirstLine: 9, LastLine: 9}, + {Type: "set", FirstLine: 9, LastLine: 9}, + {Type: "set", FirstLine: 11, LastLine: 14}, }, }, { @@ -132,7 +132,7 @@ on_failure: format: "json", content: []byte(`{"processors": [{"drop": {}}]}`), expected: []Processor{ - {Type: "drop", Line: 1}, + {Type: "drop", FirstLine: 1, LastLine: 1}, }, }, { diff --git a/internal/testrunner/coverageoutput.go b/internal/testrunner/coverageoutput.go index 756c43a564..abec1d31ed 100644 --- a/internal/testrunner/coverageoutput.go +++ b/internal/testrunner/coverageoutput.go @@ -135,14 +135,13 @@ func (c *CoberturaClass) Merge(b *CoberturaClass) error { equal := c.Name == b.Name && c.Filename == b.Filename && len(c.Lines) == len(b.Lines) && - len(c.Methods) == len(b.Methods) && - len(c.Lines) == len(c.Methods) + len(c.Methods) == len(b.Methods) for idx := range c.Lines { - equal = equal && c.Lines[idx].Number == b.Lines[idx].Number && - c.Methods[idx].Name == b.Methods[idx].Name && - len(c.Methods[idx].Lines) == 1 && - len(b.Methods[idx].Lines) == 1 && - c.Methods[idx].Lines[0].Number == b.Methods[idx].Lines[0].Number + equal = equal && c.Lines[idx].Number == b.Lines[idx].Number + } + for idx := range c.Methods { + equal = equal && c.Methods[idx].Name == b.Methods[idx].Name && + len(c.Methods[idx].Lines) == len(b.Methods[idx].Lines) } if !equal { return errors.Errorf("merging incompatible classes: %+v != %+v", *c, *b) @@ -150,7 +149,9 @@ func (c *CoberturaClass) Merge(b *CoberturaClass) error { // Update methods for idx := range b.Methods { c.Methods[idx].Hits += b.Methods[idx].Hits - c.Methods[idx].Lines[0].Hits += b.Methods[idx].Lines[0].Hits + for l := range b.Methods[idx].Lines { + c.Methods[idx].Lines[l].Hits += b.Methods[idx].Lines[l].Hits + } } // Rebuild lines c.Lines = nil diff --git a/internal/testrunner/coverageoutput_test.go b/internal/testrunner/coverageoutput_test.go index d66b770c5a..799fa6c481 100644 --- a/internal/testrunner/coverageoutput_test.go +++ b/internal/testrunner/coverageoutput_test.go @@ -117,6 +117,10 @@ func TestCoberturaCoverage_Merge(t *testing.T) { Number: 13, Hits: 2, }, + { + Number: 14, + Hits: 2, + }, }, }, { @@ -135,6 +139,10 @@ func TestCoberturaCoverage_Merge(t *testing.T) { Number: 13, Hits: 2, }, + { + Number: 14, + Hits: 2, + }, { Number: 24, Hits: 1, @@ -155,12 +163,16 @@ func TestCoberturaCoverage_Merge(t *testing.T) { Methods: []*CoberturaMethod{ { Name: "foo", - Hits: 3, + Hits: 1, Lines: []*CoberturaLine{ { Number: 13, Hits: 1, }, + { + Number: 14, + Hits: 1, + }, }, }, { @@ -179,9 +191,13 @@ func TestCoberturaCoverage_Merge(t *testing.T) { Number: 13, Hits: 1, }, + { + Number: 14, + Hits: 1, + }, { Number: 24, - Hits: 2, + Hits: 1, }, }, }, @@ -190,8 +206,8 @@ func TestCoberturaCoverage_Merge(t *testing.T) { }, }, expected: 
CoberturaCoverage{ - LinesCovered: 2, - LinesValid: 2, + LinesCovered: 3, + LinesValid: 3, Packages: []*CoberturaPackage{ { Name: "a", @@ -201,12 +217,16 @@ func TestCoberturaCoverage_Merge(t *testing.T) { Methods: []*CoberturaMethod{ { Name: "foo", - Hits: 5, + Hits: 3, Lines: []*CoberturaLine{ { Number: 13, Hits: 3, }, + { + Number: 14, + Hits: 3, + }, }, }, { @@ -225,6 +245,10 @@ func TestCoberturaCoverage_Merge(t *testing.T) { Number: 13, Hits: 3, }, + { + Number: 14, + Hits: 3, + }, { Number: 24, Hits: 2,