Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions cmd/testrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,6 @@ func testTypeCommandActionFactory(runner testrunner.TestRunner) cobraext.Command
if err != nil {
return cobraext.FlagParsingError(err, cobraext.DataStreamsFlagName)
}

if testCoverage && len(dataStreams) > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition was placed here to prevent coverage miscalculation. The runner is not aware of data stream presence (or being skipped) so it always assumes that all data streams have been selected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that. But given that (as far as I know) the CI never tests a single data stream, it would be very useful for pipeline developers to be able to run tests for a single data stream locally and get coverage results.

return cobraext.FlagParsingError(errors.New("test coverage can be calculated only if all data streams are selected"), cobraext.DataStreamsFlagName)
}
}

if runner.TestFolderRequired() {
Expand Down Expand Up @@ -203,6 +199,7 @@ func testTypeCommandActionFactory(runner testrunner.TestRunner) cobraext.Command
API: esClient.API,
DeferCleanup: deferCleanup,
ServiceVariant: variantFlag,
WithCoverage: testCoverage,
})

results = append(results, r...)
Expand Down
144 changes: 144 additions & 0 deletions internal/elasticsearch/node_stats/stats_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package node_stats
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename it to nodestats.


import (
"encoding/json"
"io"
"strings"

"github.com/pkg/errors"

"github.com/elastic/elastic-package/internal/elasticsearch"
"github.com/elastic/elastic-package/internal/elasticsearch/pipeline"
)

// StatsRecord contains stats for a measurable entity (pipeline, processor, etc.)
type StatsRecord struct {
	Count, Current, Failed int64 // total, in-flight and failed executions
	TimeInMillis           int64 `json:"time_in_millis"` // cumulative execution time, in milliseconds
}

// ProcessorStats contains a processor's stats and some metadata.
type ProcessorStats struct {
	Type, Extra string // processor type and the optional suffix from the stats key ("<type>:<extra>")
	Conditional bool   // set when the processor is reported with type "conditional"
	Stats       StatsRecord
}

// PipelineStats contains stats for a pipeline.
type PipelineStats struct {
	StatsRecord
	Processors []ProcessorStats // stats for each of the pipeline's processors
}

// PipelineStatsMap holds the stats for a set of pipelines, keyed by pipeline name.
type PipelineStatsMap map[string]*PipelineStats

// wrappedProcessor is the raw JSON form of a processor's stats: a single-entry
// map keyed by the processor identifier ("<type>" or "<type>:<extra>").
type wrappedProcessor map[string]ProcessorStats

// extract unwraps the single processor entry contained in the map and
// normalizes it into a ProcessorStats value.
//
// The map key has the form "<type>" or "<type>:<extra>"; the optional suffix
// is stored in the Extra field. A processor reported with type "conditional"
// has its Conditional flag set. An error is returned when the map does not
// hold exactly one entry or the key does not match the reported type.
func (p wrappedProcessor) extract() (stats ProcessorStats, err error) {
	if len(p) != 1 {
		return stats, errors.Errorf("can't extract processor stats. Need a single processor, got %d: %+v", len(p), p)
	}
	for key, procStats := range p {
		stats = procStats
		// Split an optional ":extra" suffix off the identifier.
		ident := key
		if sep := strings.IndexByte(ident, ':'); sep >= 0 {
			stats.Extra = ident[sep+1:]
			ident = ident[:sep]
		}
		if procStats.Type == "conditional" {
			stats.Conditional = true
		} else if procStats.Type != ident {
			return stats, errors.Errorf("can't understand processor identifier '%s' in %+v", ident, p)
		}
		stats.Type = ident
	}
	return stats, nil
}

// pipelineStatsRecord is the raw JSON form of a pipeline's stats: overall
// counters plus the wrapped stats of each of its processors.
type pipelineStatsRecord struct {
	StatsRecord
	Processors []wrappedProcessor
}

// pipelineStatsRecordMap maps pipeline names to their raw stats records.
type pipelineStatsRecordMap map[string]pipelineStatsRecord

// extract converts the raw record into an exported PipelineStats value by
// unwrapping the stats of every processor in the pipeline.
func (r pipelineStatsRecord) extract() (stats *PipelineStats, err error) {
	stats = &PipelineStats{
		StatsRecord: r.StatsRecord,
		Processors:  make([]ProcessorStats, len(r.Processors)),
	}
	for i, wp := range r.Processors {
		stats.Processors[i], err = wp.extract()
		if err != nil {
			return stats, errors.Wrapf(err, "converting processor %d", i)
		}
	}
	return stats, nil
}

// pipelinesStatsNode mirrors the per-node section of the Node Stats API
// response, reduced to the ingest pipeline stats selected by the request's
// filter_path.
type pipelinesStatsNode struct {
	Ingest struct {
		Pipelines pipelineStatsRecordMap
	}
}

// pipelinesStatsResponse mirrors the top-level Node Stats API response,
// with one entry per node.
type pipelinesStatsResponse struct {
	Nodes map[string]pipelinesStatsNode
}

// GetPipelineStats returns the ingest stats for the given pipelines, obtained
// from the Elasticsearch Node Stats API.
//
// It expects stats from exactly one Elasticsearch node; a response covering
// any other number of nodes is an error. An error is also returned when any
// of the requested pipelines is missing from the response.
func GetPipelineStats(esClient *elasticsearch.API, pipelines []pipeline.Resource) (stats PipelineStatsMap, err error) {
	// Limit the response to the ingest pipeline stats of each node.
	statsReq := esClient.Nodes.Stats.WithFilterPath("nodes.*.ingest.pipelines")
	resp, err := esClient.Nodes.Stats(statsReq)
	if err != nil {
		return nil, errors.Wrap(err, "Node Stats API call failed")
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, errors.Wrap(err, "failed to read Stats API response body")
	}

	if resp.StatusCode != 200 {
		return nil, errors.Wrapf(elasticsearch.NewError(body), "unexpected response status for Node Stats (%d): %s", resp.StatusCode, resp.Status())
	}

	var statsResponse pipelinesStatsResponse
	if err = json.Unmarshal(body, &statsResponse); err != nil {
		return nil, errors.Wrap(err, "error decoding Node Stats response")
	}

	// Stats from several nodes can't be attributed to a single test run.
	if nodeCount := len(statsResponse.Nodes); nodeCount != 1 {
		return nil, errors.Errorf("expected exactly 1 ES node in stats response, got %d", nodeCount)
	}
	var nodePipelines map[string]pipelineStatsRecord
	for _, node := range statsResponse.Nodes {
		nodePipelines = node.Ingest.Pipelines
	}

	stats = make(PipelineStatsMap, len(pipelines))
	var missing []string
	for _, requested := range pipelines {
		// Direct map lookup instead of scanning all returned pipelines.
		pStats, found := nodePipelines[requested.Name]
		if !found {
			missing = append(missing, requested.Name)
			continue
		}
		if stats[requested.Name], err = pStats.extract(); err != nil {
			return stats, errors.Wrapf(err, "converting pipeline %s", requested.Name)
		}
	}
	if len(missing) > 0 {
		// Must be Errorf: err is nil here, and errors.Wrapf(nil, ...) returns
		// nil, which would silently report success despite missing pipelines.
		return stats, errors.Errorf("Node Stats response is missing some expected pipelines: %v", missing)
	}

	return stats, nil
}
126 changes: 126 additions & 0 deletions internal/elasticsearch/pipeline/coverage/coverage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package coverage

import (
"path/filepath"
"strings"
"time"

"github.com/pkg/errors"

"github.com/elastic/elastic-package/internal/elasticsearch/node_stats"
"github.com/elastic/elastic-package/internal/elasticsearch/pipeline"
"github.com/elastic/elastic-package/internal/packages"
"github.com/elastic/elastic-package/internal/testrunner"
)

// Get returns a coverage report for the provided set of ingest pipelines.
func Get(options testrunner.TestOptions, pipelines []pipeline.Resource) (*testrunner.CoberturaCoverage, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a bit of spaghetti code that has to be untangled first. Code coverage isn't the responsibility of Elasticsearch client or Pipeline, but test runners and it should stay there.

I suggest refactoring (moving around) this dependency first, then we can continue reviewing as this PR is relatively big.

packagePath, err := packages.MustFindPackageRoot()
if err != nil {
return nil, errors.Wrap(err, "error finding package root")
}

dataStreamPath, found, err := packages.FindDataStreamRootForPath(options.TestFolder.Path)
if err != nil {
return nil, errors.Wrap(err, "locating data_stream root failed")
}
if !found {
return nil, errors.New("data stream root not found")
}

// Use the Node Stats API to get stats for all installed pipelines.
// These stats contain hit counts for all main processors in a pipeline.
stats, err := node_stats.GetPipelineStats(options.API, pipelines)
if err != nil {
return nil, errors.Wrap(err, "error fetching pipeline stats for code coverage calculations")
}

// Construct the Cobertura report.
pkg := &testrunner.CoberturaPackage{
Name: options.TestFolder.Package + "." + options.TestFolder.DataStream,
}

coverage := &testrunner.CoberturaCoverage{
Sources: []*testrunner.CoberturaSource{
{
Path: packagePath,
},
},
Packages: []*testrunner.CoberturaPackage{pkg},
Timestamp: time.Now().UnixNano(),
}

for _, p := range pipelines {
// Load the list of main processors from the pipeline source code, annotated with line numbers.
src, err := p.Processors()
if err != nil {
return nil, err
}

pstats := stats[p.Name]
if pstats == nil {
return nil, errors.Errorf("pipeline '%s' not installed in Elasticsearch", p.Name)
}

// Ensure there is no inconsistency in the list of processors in stats vs obtained from source.
if len(src) != len(pstats.Processors) {
return nil, errors.Errorf("processor count mismatch for %s (src:%d stats:%d)", p.FileName(), len(src), len(pstats.Processors))
}
for idx, st := range pstats.Processors {
// Elasticsearch will return a `compound` processor in the case of `foreach` and
// any processor that defines `on_failure` processors.
if st.Type == "compound" {
continue
}
if st.Type != src[idx].Type {
return nil, errors.Errorf("processor type mismatch for %s processor %d (src:%s stats:%s)", p.FileName(), idx, src[idx].Type, st.Type)
}
}
// Tests install pipelines as `filename-<nonce>` (without original extension).
// Use the filename part for the report.
pipelineName := p.Name
if nameEnd := strings.LastIndexByte(pipelineName, '-'); nameEnd != -1 {
pipelineName = pipelineName[:nameEnd]
}
// File path has to be relative to the packagePath added to the cobertura Sources list.
pipelinePath := filepath.Join(dataStreamPath, "elasticsearch", "ingest_pipeline", p.FileName())
pipelineRelPath, err := filepath.Rel(packagePath, pipelinePath)
if err != nil {
return nil, errors.Wrapf(err, "cannot create relative path to pipeline file. Package root: '%s', pipeline path: '%s'", packagePath, pipelinePath)
}
// Report every pipeline as a "class".
classCov := testrunner.CoberturaClass{
Name: pipelineName,
Filename: pipelineRelPath,
}

// Calculate covered and total processors (reported as both lines and methods).
covered := 0
for idx, srcProc := range src {
if pstats.Processors[idx].Stats.Count > 0 {
covered++
}
method := testrunner.CoberturaMethod{
Name: srcProc.Type,
Hits: pstats.Processors[idx].Stats.Count,
}
for num := srcProc.FirstLine; num <= srcProc.LastLine; num++ {
line := &testrunner.CoberturaLine{
Number: num,
Hits: pstats.Processors[idx].Stats.Count,
}
classCov.Lines = append(classCov.Lines, line)
method.Lines = append(method.Lines, line)
}
classCov.Methods = append(classCov.Methods, &method)
}
pkg.Classes = append(pkg.Classes, &classCov)
coverage.LinesValid += int64(len(src))
coverage.LinesCovered += int64(covered)
}
return coverage, nil
}
Loading