From e9cc0c0cc7c9736e83354d3ad14a440068c3f8a0 Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Sat, 29 Jun 2019 12:30:29 +0300 Subject: [PATCH 01/19] Add output option for csv format --- cmd/collectors.go | 4 ++ stats/csv/collector.go | 145 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 149 insertions(+) create mode 100644 stats/csv/collector.go diff --git a/cmd/collectors.go b/cmd/collectors.go index 93979b9bb64..cc5d36efa63 100644 --- a/cmd/collectors.go +++ b/cmd/collectors.go @@ -30,6 +30,7 @@ import ( "github.com/loadimpact/k6/lib" "github.com/loadimpact/k6/lib/consts" "github.com/loadimpact/k6/stats/cloud" + "github.com/loadimpact/k6/stats/csv" "github.com/loadimpact/k6/stats/datadog" "github.com/loadimpact/k6/stats/influxdb" jsonc "github.com/loadimpact/k6/stats/json" @@ -47,6 +48,7 @@ const ( collectorCloud = "cloud" collectorStatsD = "statsd" collectorDatadog = "datadog" + collectorCSV = "csv" ) func parseCollector(s string) (t, arg string) { @@ -111,6 +113,8 @@ func newCollector(collectorName, arg string, src *lib.SourceData, conf Config) ( return nil, err } return datadog.New(config) + case collectorCSV: + return csv.New(afero.NewOsFs(), arg, conf.SystemTags) default: return nil, errors.Errorf("unknown output type: %s", collectorName) } diff --git a/stats/csv/collector.go b/stats/csv/collector.go new file mode 100644 index 00000000000..f6198071630 --- /dev/null +++ b/stats/csv/collector.go @@ -0,0 +1,145 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2016 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +package csv + +import ( + "context" + "encoding/csv" + "fmt" + "io" + "os" + + "github.com/loadimpact/k6/lib" + "github.com/loadimpact/k6/stats" + log "github.com/sirupsen/logrus" + "github.com/spf13/afero" +) + +type Collector struct { + outfile io.WriteCloser + fname string + restags []string + header bool +} + +// Verify that Collector implements lib.Collector +var _ lib.Collector = &Collector{} + +// Similar to ioutil.NopCloser, but for writers +type nopCloser struct { + io.Writer +} + +func (nopCloser) Close() error { return nil } + +func New(fs afero.Fs, fname string, tags lib.TagSet) (*Collector, error) { + if fname == "" || fname == "-" { + return &Collector{ + outfile: nopCloser{os.Stdout}, + fname: "-", + }, nil + } + + logfile, err := fs.Create(fname) + if err != nil { + return nil, err + } + + restags := []string{} + for tag, flag := range tags { + if flag { + restags = append(restags, tag) + } + } + + return &Collector{ + outfile: logfile, + fname: fname, + restags: restags, + header: true, + }, nil +} + +func (c *Collector) Init() error { + return nil +} + +func (c *Collector) SetRunStatus(status lib.RunStatus) {} + +func (c *Collector) Run(ctx context.Context) { + log.WithField("filename", c.fname).Debug("CSV: Writing CSV metrics") + <-ctx.Done() + _ = c.outfile.Close() +} + +func (c *Collector) Collect(scs []stats.SampleContainer) { + if c.header { + header := MakeHeader(c.restags) + c.WriteToCSV(header) + c.header = false + } + for _, sc := range scs { + for _, sample := range sc.GetSamples() { + row := SampleToRow(&sample, c.restags) + c.WriteToCSV(row) + } + } +} + +func (c *Collector) Link() string { + return "" +} + +func (c *Collector) WriteToCSV(row []string) { + writer := csv.NewWriter(c.outfile) + defer writer.Flush() + err := writer.Write(row) + if err != nil { + log.WithField("filename", c.fname).Error("CSV: Error writing to file") + } +} + +func MakeHeader(tags []string) []string { + return append([]string{"metric_name", "timestamp", "metric_value"}, tags...) 
+} + +func SampleToRow(sample *stats.Sample, restags []string) []string { + if sample == nil { + return nil + } + + row := []string{} + row = append(row, sample.Metric.Name) + row = append(row, fmt.Sprintf("%d", sample.Time.Unix())) + row = append(row, fmt.Sprintf("%f", sample.Value)) + sample_tags := sample.Tags.CloneTags() + + for _, tag := range restags { + row = append(row, sample_tags[tag]) + } + + return row +} + +// GetRequiredSystemTags returns which sample tags are needed by this collector +func (c *Collector) GetRequiredSystemTags() lib.TagSet { + return lib.TagSet{} // There are no required tags for this collector +} From 63ede6a78f6bc83652f81be9ee993bb1eb942d7e Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Sat, 29 Jun 2019 12:52:02 +0300 Subject: [PATCH 02/19] Fix naming, add comments, fix writing column names --- stats/csv/collector.go | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/stats/csv/collector.go b/stats/csv/collector.go index f6198071630..690c35942f8 100644 --- a/stats/csv/collector.go +++ b/stats/csv/collector.go @@ -33,11 +33,11 @@ import ( "github.com/spf13/afero" ) +// Collector saving output to csv implements the lib.Collector interface type Collector struct { outfile io.WriteCloser fname string - restags []string - header bool + resTags []string } // Verify that Collector implements lib.Collector @@ -50,6 +50,7 @@ type nopCloser struct { func (nopCloser) Close() error { return nil } +// New Creates new instance of CSV collector func New(fs afero.Fs, fname string, tags lib.TagSet) (*Collector, error) { if fname == "" || fname == "-" { return &Collector{ @@ -63,51 +64,53 @@ func New(fs afero.Fs, fname string, tags lib.TagSet) (*Collector, error) { return nil, err } - restags := []string{} + resTags := []string{} for tag, flag := range tags { if flag { - restags = append(restags, tag) + resTags = append(resTags, tag) } } return &Collector{ outfile: logfile, fname: fname, - restags: restags, - header: true, + resTags: resTags, }, nil } +// Init writes column names to csv file func (c *Collector) Init() error { + header := MakeHeader(c.resTags) + c.WriteToCSV(header) return nil } +// SetRunStatus does nothing func (c *Collector) SetRunStatus(status lib.RunStatus) {} +// Run just blocks until the context is done func (c *Collector) Run(ctx context.Context) { log.WithField("filename", c.fname).Debug("CSV: Writing CSV metrics") <-ctx.Done() _ = c.outfile.Close() } +// Collect Writes samples to the csv file func (c *Collector) Collect(scs []stats.SampleContainer) { - if c.header { - header := MakeHeader(c.restags) - c.WriteToCSV(header) - c.header = false - } for _, sc := range scs { for _, sample := range sc.GetSamples() { - row := SampleToRow(&sample, c.restags) + row := SampleToRow(&sample, c.resTags) c.WriteToCSV(row) } } } +// Link returns a dummy string, it's only included to satisfy the lib.Collector interface func (c *Collector) Link() string { return "" } +// WriteToCSV writes row to csv file func (c *Collector) WriteToCSV(row []string) { writer := csv.NewWriter(c.outfile) defer writer.Flush() @@ -117,11 +120,13 @@ func (c *Collector) WriteToCSV(row []string) { } } +// MakeHeader creates list of column names for csv file func MakeHeader(tags []string) []string { return append([]string{"metric_name", "timestamp", "metric_value"}, tags...) 
} -func SampleToRow(sample *stats.Sample, restags []string) []string { +// SampleToRow converts sample into array of strings +func SampleToRow(sample *stats.Sample, resTags []string) []string { if sample == nil { return nil } @@ -130,10 +135,10 @@ func SampleToRow(sample *stats.Sample, restags []string) []string { row = append(row, sample.Metric.Name) row = append(row, fmt.Sprintf("%d", sample.Time.Unix())) row = append(row, fmt.Sprintf("%f", sample.Value)) - sample_tags := sample.Tags.CloneTags() + sampleTags := sample.Tags.CloneTags() - for _, tag := range restags { - row = append(row, sample_tags[tag]) + for _, tag := range resTags { + row = append(row, sampleTags[tag]) } return row From d881ca74662c287b82518c351e65780d1dddfcd5 Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Thu, 4 Jul 2019 11:05:21 +0300 Subject: [PATCH 03/19] Save extra tags, flush writer after multiple rows --- stats/csv/collector.go | 101 +++++++++++++++++++++++++++++------------ 1 file changed, 72 insertions(+), 29 deletions(-) diff --git a/stats/csv/collector.go b/stats/csv/collector.go index 690c35942f8..76a81146807 100644 --- a/stats/csv/collector.go +++ b/stats/csv/collector.go @@ -21,6 +21,7 @@ package csv import ( + "bytes" "context" "encoding/csv" "fmt" @@ -35,9 +36,11 @@ import ( // Collector saving output to csv implements the lib.Collector interface type Collector struct { - outfile io.WriteCloser - fname string - resTags []string + outfile io.WriteCloser + fname string + resTags []string + ignoredTags []string + csvWriter *csv.Writer } // Verify that Collector implements lib.Collector @@ -52,10 +55,24 @@ func (nopCloser) Close() error { return nil } // New Creates new instance of CSV collector func New(fs afero.Fs, fname string, tags lib.TagSet) (*Collector, error) { + resTags := []string{} + ignoredTags := []string{} + for tag, flag := range tags { + if flag { + resTags = append(resTags, tag) + } else { + ignoredTags = append(ignoredTags, tag) + } + } + if fname == "" || fname == "-" { + logfile := nopCloser{os.Stdout} return &Collector{ - outfile: nopCloser{os.Stdout}, - fname: "-", + outfile: logfile, + fname: "-", + resTags: resTags, + ignoredTags: ignoredTags, + csvWriter: csv.NewWriter(logfile), }, nil } @@ -64,24 +81,23 @@ func New(fs afero.Fs, fname string, tags lib.TagSet) (*Collector, error) { return nil, err } - resTags := []string{} - for tag, flag := range tags { - if flag { - resTags = append(resTags, tag) - } - } - return &Collector{ - outfile: logfile, - fname: fname, - resTags: resTags, + outfile: logfile, + fname: fname, + resTags: resTags, + ignoredTags: ignoredTags, + csvWriter: csv.NewWriter(logfile), }, nil } // Init writes column names to csv file func (c *Collector) Init() error { header := MakeHeader(c.resTags) - c.WriteToCSV(header) + err := c.csvWriter.Write(header) + if err != nil { + log.WithField("filename", c.fname).Error("CSV: Error writing column names to file") + } + c.csvWriter.Flush() return nil } @@ -92,6 +108,7 @@ func (c *Collector) SetRunStatus(status lib.RunStatus) {} func (c *Collector) Run(ctx context.Context) { log.WithField("filename", c.fname).Debug("CSV: Writing CSV metrics") <-ctx.Done() + c.csvWriter.Flush() _ = c.outfile.Close() } @@ -99,10 +116,14 @@ func (c *Collector) Run(ctx context.Context) { func (c *Collector) Collect(scs []stats.SampleContainer) { for _, sc := range scs { for _, sample := range sc.GetSamples() { - row := SampleToRow(&sample, c.resTags) - c.WriteToCSV(row) + row := SampleToRow(&sample, c.resTags, c.ignoredTags) + err := 
c.csvWriter.Write(row) + if err != nil { + log.WithField("filename", c.fname).Error("CSV: Error writing to file") + } } } + c.csvWriter.Flush() } // Link returns a dummy string, it's only included to satisfy the lib.Collector interface @@ -110,23 +131,14 @@ func (c *Collector) Link() string { return "" } -// WriteToCSV writes row to csv file -func (c *Collector) WriteToCSV(row []string) { - writer := csv.NewWriter(c.outfile) - defer writer.Flush() - err := writer.Write(row) - if err != nil { - log.WithField("filename", c.fname).Error("CSV: Error writing to file") - } -} - // MakeHeader creates list of column names for csv file func MakeHeader(tags []string) []string { + tags = append(tags, "extra_tags") return append([]string{"metric_name", "timestamp", "metric_value"}, tags...) } // SampleToRow converts sample into array of strings -func SampleToRow(sample *stats.Sample, resTags []string) []string { +func SampleToRow(sample *stats.Sample, resTags []string, ignoredTags []string) []string { if sample == nil { return nil } @@ -141,6 +153,37 @@ func SampleToRow(sample *stats.Sample, resTags []string) []string { row = append(row, sampleTags[tag]) } + extraTags := bytes.Buffer{} + prev := false + for tag, val := range sampleTags { + extra := true + for _, resTag := range resTags { + if tag == resTag { + extra = false + break + } + } + if extra { + for _, ignoredTag := range ignoredTags { + if tag == ignoredTag { + extra = false + break + } + } + } + + if extra { + if prev { + extraTags.WriteString("&") + } + extraTags.WriteString(tag) + extraTags.WriteString("=") + extraTags.WriteString(val) + prev = true + } + } + row = append(row, extraTags.String()) + return row } From f55b4ebf581130975d3c13c82b80851b47081150 Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Fri, 5 Jul 2019 16:00:35 +0300 Subject: [PATCH 04/19] Add test for csv collector --- stats/csv/collector_test.go | 153 ++++++++++++++++++++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 stats/csv/collector_test.go diff --git a/stats/csv/collector_test.go b/stats/csv/collector_test.go new file mode 100644 index 00000000000..337c157816c --- /dev/null +++ b/stats/csv/collector_test.go @@ -0,0 +1,153 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2016 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +package csv + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/loadimpact/k6/stats" + + "github.com/loadimpact/k6/lib" + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" +) + +func TestMakeHeader(t *testing.T) { + testdata := map[string][]string{ + "One tag": []string{ + "tag1", + }, + "Two tags": []string{ + "tag1", "tag2", + }, + } + + for testname, tags := range testdata { + t.Run(testname, func(t *testing.T) { + header := MakeHeader(tags) + assert.Equal(t, len(tags)+4, len(header)) + assert.Equal(t, "metric_name", header[0]) + assert.Equal(t, "timestamp", header[1]) + assert.Equal(t, "metric_value", header[2]) + assert.Equal(t, "extra_tags", header[len(header)-1]) + }) + } +} + +func TestSampleToRow(t *testing.T) { + testSamples := []stats.Sample{ + stats.Sample{ + Time: time.Now(), + Metric: stats.New("my_metric", stats.Gauge), + Value: 1, + Tags: stats.NewSampleTags(map[string]string{ + "tag1": "val1", + "tag2": "val2", + "tag3": "val3", + }), + }, + stats.Sample{ + Time: time.Now(), + Metric: stats.New("my_metric", stats.Gauge), + Value: 1, + Tags: stats.NewSampleTags(map[string]string{ + "tag1": "val1", + "tag2": "val2", + "tag3": "val3", + "tag4": "val4", + "tag5": "val5", + }), + }, + } + + enabledTags := map[string][][]string{ + "One tag": [][]string{ + []string{"tag1"}, + []string{"tag2"}, + }, + "Two tags": [][]string{ + []string{"tag1", "tag2"}, + []string{}, + }, + "Two tags, one ignored": [][]string{ + []string{"tag1", "tag2"}, + []string{"tag3"}, + }, + } + + for testname, tags := range enabledTags { + for _, sample := range testSamples { + t.Run(testname, func(t *testing.T) { + row := SampleToRow(&sample, tags[0], tags[1]) + assert.Equal(t, len(tags[0])+4, len(row)) + for _, tag := range tags[1] { + assert.False(t, strings.Contains(row[len(row)-1], tag)) + } + }) + } + } +} + +func TestCollect(t *testing.T) { + testSamples := []stats.SampleContainer{ + stats.Sample{ + Time: time.Unix(1562324643, 0), + Metric: stats.New("my_metric", stats.Gauge), + Value: 1, + Tags: stats.NewSampleTags(map[string]string{ + "tag1": "val1", + "tag2": "val2", + "tag3": "val3", + }), + }, + stats.Sample{ + Time: time.Unix(1562324644, 0), + Metric: stats.New("my_metric", stats.Gauge), + Value: 1, + Tags: stats.NewSampleTags(map[string]string{ + "tag1": "val1", + "tag2": "val2", + "tag3": "val3", + "tag4": "val4", + }), + }, + } + + t.Run("Collect", func(t *testing.T) { + mem := afero.NewMemMapFs() + collector, err := New(mem, "path", lib.TagSet{"tag1": true, "tag2": false, "tag3": true}) + assert.NoError(t, err) + assert.NotNil(t, collector) + + err = collector.Init() + assert.NoError(t, err) + + collector.Collect(testSamples) + csvbytes, _ := afero.ReadFile(mem, "path") + csvstr := fmt.Sprintf("%s", csvbytes) + assert.Equal(t, + "metric_name,timestamp,metric_value,tag1,tag3,extra_tags\nmy_metric,1562324643,1.000000,val1,val3,\nmy_metric,1562324644,1.000000,val1,val3,tag4=val4\n", + csvstr) + }) +} From 0dc9707fa2498cb16e5ea733cf8816bdd4fe9395 Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Fri, 5 Jul 2019 16:50:24 +0300 Subject: [PATCH 05/19] Write data to buffer to avoid blocks on slow disk operations --- stats/csv/collector.go | 56 +++++++++++++++++----- stats/csv/collector_test.go | 96 ++++++++++++++++++++++++++++++------- 2 files changed, 125 insertions(+), 27 deletions(-) diff --git a/stats/csv/collector.go b/stats/csv/collector.go index 76a81146807..6f07b0b7c29 100644 --- a/stats/csv/collector.go +++ b/stats/csv/collector.go @@ -27,6 
+27,8 @@ import ( "fmt" "io" "os" + "sync" + "time" "github.com/loadimpact/k6/lib" "github.com/loadimpact/k6/stats" @@ -34,6 +36,10 @@ import ( "github.com/spf13/afero" ) +const ( + saveInterval = 1 * time.Second +) + // Collector saving output to csv implements the lib.Collector interface type Collector struct { outfile io.WriteCloser @@ -41,6 +47,9 @@ type Collector struct { resTags []string ignoredTags []string csvWriter *csv.Writer + csvLock sync.Mutex + buffer []stats.Sample + bufferLock sync.Mutex } // Verify that Collector implements lib.Collector @@ -106,24 +115,49 @@ func (c *Collector) SetRunStatus(status lib.RunStatus) {} // Run just blocks until the context is done func (c *Collector) Run(ctx context.Context) { - log.WithField("filename", c.fname).Debug("CSV: Writing CSV metrics") - <-ctx.Done() - c.csvWriter.Flush() - _ = c.outfile.Close() + ticker := time.NewTicker(saveInterval) + for { + select { + case <-ticker.C: + c.WriteToFile() + case <-ctx.Done(): + c.WriteToFile() + c.outfile.Close() + return + } + } } -// Collect Writes samples to the csv file +// Collect Saves samples to buffer func (c *Collector) Collect(scs []stats.SampleContainer) { + c.bufferLock.Lock() + defer c.bufferLock.Unlock() for _, sc := range scs { - for _, sample := range sc.GetSamples() { - row := SampleToRow(&sample, c.resTags, c.ignoredTags) - err := c.csvWriter.Write(row) - if err != nil { - log.WithField("filename", c.fname).Error("CSV: Error writing to file") + c.buffer = append(c.buffer, sc.GetSamples()...) + } +} + +// WriteToFile Writes samples to the csv file +func (c *Collector) WriteToFile() { + c.bufferLock.Lock() + samples := c.buffer + c.buffer = nil + c.bufferLock.Unlock() + + if len(samples) > 0 { + c.csvLock.Lock() + defer c.csvLock.Unlock() + for _, sc := range samples { + for _, sample := range sc.GetSamples() { + row := SampleToRow(&sample, c.resTags, c.ignoredTags) + err := c.csvWriter.Write(row) + if err != nil { + log.WithField("filename", c.fname).Error("CSV: Error writing to file") + } } } + c.csvWriter.Flush() } - c.csvWriter.Flush() } // Link returns a dummy string, it's only included to satisfy the lib.Collector interface diff --git a/stats/csv/collector_test.go b/stats/csv/collector_test.go index 337c157816c..c1cc2679801 100644 --- a/stats/csv/collector_test.go +++ b/stats/csv/collector_test.go @@ -21,8 +21,10 @@ package csv import ( + "context" "fmt" "strings" + "sync" "testing" "time" @@ -35,10 +37,10 @@ import ( func TestMakeHeader(t *testing.T) { testdata := map[string][]string{ - "One tag": []string{ + "One tag": { "tag1", }, - "Two tags": []string{ + "Two tags": { "tag1", "tag2", }, } @@ -57,7 +59,7 @@ func TestMakeHeader(t *testing.T) { func TestSampleToRow(t *testing.T) { testSamples := []stats.Sample{ - stats.Sample{ + { Time: time.Now(), Metric: stats.New("my_metric", stats.Gauge), Value: 1, @@ -67,7 +69,7 @@ func TestSampleToRow(t *testing.T) { "tag3": "val3", }), }, - stats.Sample{ + { Time: time.Now(), Metric: stats.New("my_metric", stats.Gauge), Value: 1, @@ -82,17 +84,17 @@ func TestSampleToRow(t *testing.T) { } enabledTags := map[string][][]string{ - "One tag": [][]string{ - []string{"tag1"}, - []string{"tag2"}, + "One tag": { + {"tag1"}, + {"tag2"}, }, - "Two tags": [][]string{ - []string{"tag1", "tag2"}, - []string{}, + "Two tags": { + {"tag1", "tag2"}, + {}, }, - "Two tags, one ignored": [][]string{ - []string{"tag1", "tag2"}, - []string{"tag3"}, + "Two tags, one ignored": { + {"tag1", "tag2"}, + {"tag3"}, }, } @@ -108,7 +110,6 @@ func TestSampleToRow(t 
*testing.T) { } } } - func TestCollect(t *testing.T) { testSamples := []stats.SampleContainer{ stats.Sample{ @@ -133,17 +134,80 @@ func TestCollect(t *testing.T) { }), }, } - t.Run("Collect", func(t *testing.T) { mem := afero.NewMemMapFs() collector, err := New(mem, "path", lib.TagSet{"tag1": true, "tag2": false, "tag3": true}) assert.NoError(t, err) assert.NotNil(t, collector) - err = collector.Init() + collector.Collect(testSamples) + + assert.Equal(t, len(testSamples), len(collector.buffer)) + }) +} + +func TestRun(t *testing.T) { + t.Run("Run", func(t *testing.T) { + collector, err := New(afero.NewMemMapFs(), "path", lib.TagSet{"tag1": true, "tag2": false, "tag3": true}) + assert.NoError(t, err) + assert.NotNil(t, collector) + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + collector.Init() + collector.Run(ctx) + }() + cancel() + wg.Wait() + }) +} + +func TestRunCollect(t *testing.T) { + testSamples := []stats.SampleContainer{ + stats.Sample{ + Time: time.Unix(1562324643, 0), + Metric: stats.New("my_metric", stats.Gauge), + Value: 1, + Tags: stats.NewSampleTags(map[string]string{ + "tag1": "val1", + "tag2": "val2", + "tag3": "val3", + }), + }, + stats.Sample{ + Time: time.Unix(1562324644, 0), + Metric: stats.New("my_metric", stats.Gauge), + Value: 1, + Tags: stats.NewSampleTags(map[string]string{ + "tag1": "val1", + "tag2": "val2", + "tag3": "val3", + "tag4": "val4", + }), + }, + } + + t.Run("Run and Collect", func(t *testing.T) { + mem := afero.NewMemMapFs() + collector, err := New(mem, "path", lib.TagSet{"tag1": true, "tag2": false, "tag3": true}) assert.NoError(t, err) + assert.NotNil(t, collector) + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + collector.Run(ctx) + wg.Done() + }() + collector.Init() collector.Collect(testSamples) + time.Sleep(1 * time.Second) + cancel() + wg.Wait() csvbytes, _ := afero.ReadFile(mem, "path") csvstr := fmt.Sprintf("%s", csvbytes) assert.Equal(t, From 2efe635e10d91358740991a4e50e57f6c0e05cae Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Tue, 9 Jul 2019 09:57:16 +0300 Subject: [PATCH 06/19] Attempt to fix scopelint and gosec warnings --- stats/csv/collector.go | 28 +++++++++++++++++++++++----- stats/csv/collector_test.go | 16 +++++++++++----- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/stats/csv/collector.go b/stats/csv/collector.go index 6f07b0b7c29..1a4e00c691a 100644 --- a/stats/csv/collector.go +++ b/stats/csv/collector.go @@ -27,6 +27,7 @@ import ( "fmt" "io" "os" + "sort" "sync" "time" @@ -73,6 +74,7 @@ func New(fs afero.Fs, fname string, tags lib.TagSet) (*Collector, error) { ignoredTags = append(ignoredTags, tag) } } + sort.Strings(resTags) if fname == "" || fname == "-" { logfile := nopCloser{os.Stdout} @@ -122,7 +124,10 @@ func (c *Collector) Run(ctx context.Context) { c.WriteToFile() case <-ctx.Done(): c.WriteToFile() - c.outfile.Close() + err := c.outfile.Close() + if err != nil { + log.WithField("filename", c.fname).Error("CSV: Error closing the file") + } return } } @@ -149,6 +154,7 @@ func (c *Collector) WriteToFile() { defer c.csvLock.Unlock() for _, sc := range samples { for _, sample := range sc.GetSamples() { + sample := sample row := SampleToRow(&sample, c.resTags, c.ignoredTags) err := c.csvWriter.Write(row) if err != nil { @@ -208,11 +214,23 @@ func SampleToRow(sample *stats.Sample, resTags []string, ignoredTags []string) [ if extra { if prev { - 
extraTags.WriteString("&") + _, err := extraTags.WriteString("&") + if err != nil { + break + } + } + _, err := extraTags.WriteString(tag) + if err != nil { + break + } + _, err = extraTags.WriteString("=") + if err != nil { + break + } + _, err = extraTags.WriteString(val) + if err != nil { + break } - extraTags.WriteString(tag) - extraTags.WriteString("=") - extraTags.WriteString(val) prev = true } } diff --git a/stats/csv/collector_test.go b/stats/csv/collector_test.go index c1cc2679801..55b93ecc5a7 100644 --- a/stats/csv/collector_test.go +++ b/stats/csv/collector_test.go @@ -47,6 +47,7 @@ func TestMakeHeader(t *testing.T) { for testname, tags := range testdata { t.Run(testname, func(t *testing.T) { + tags := tags header := MakeHeader(tags) assert.Equal(t, len(tags)+4, len(header)) assert.Equal(t, "metric_name", header[0]) @@ -99,11 +100,14 @@ func TestSampleToRow(t *testing.T) { } for testname, tags := range enabledTags { + eTags := tags[0] + iTags := tags[1] for _, sample := range testSamples { t.Run(testname, func(t *testing.T) { - row := SampleToRow(&sample, tags[0], tags[1]) - assert.Equal(t, len(tags[0])+4, len(row)) - for _, tag := range tags[1] { + sample := sample + row := SampleToRow(&sample, eTags, iTags) + assert.Equal(t, len(eTags)+4, len(row)) + for _, tag := range iTags { assert.False(t, strings.Contains(row[len(row)-1], tag)) } }) @@ -157,7 +161,8 @@ func TestRun(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - collector.Init() + err := collector.Init() + assert.NoError(t, err) collector.Run(ctx) }() cancel() @@ -203,7 +208,8 @@ func TestRunCollect(t *testing.T) { collector.Run(ctx) wg.Done() }() - collector.Init() + err = collector.Init() + assert.NoError(t, err) collector.Collect(testSamples) time.Sleep(1 * time.Second) cancel() From 7a1b4b7d5f61db548d38fbc545ac1d9ad737fab6 Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Tue, 9 Jul 2019 10:03:07 +0300 Subject: [PATCH 07/19] Fix incorrect variable pin --- stats/csv/collector_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stats/csv/collector_test.go b/stats/csv/collector_test.go index 55b93ecc5a7..f5d713f7a9e 100644 --- a/stats/csv/collector_test.go +++ b/stats/csv/collector_test.go @@ -46,8 +46,8 @@ func TestMakeHeader(t *testing.T) { } for testname, tags := range testdata { + tags := tags t.Run(testname, func(t *testing.T) { - tags := tags header := MakeHeader(tags) assert.Equal(t, len(tags)+4, len(header)) assert.Equal(t, "metric_name", header[0]) @@ -103,8 +103,8 @@ func TestSampleToRow(t *testing.T) { eTags := tags[0] iTags := tags[1] for _, sample := range testSamples { + sample := sample t.Run(testname, func(t *testing.T) { - sample := sample row := SampleToRow(&sample, eTags, iTags) assert.Equal(t, len(eTags)+4, len(row)) for _, tag := range iTags { From 2d39019bafd17957f54b71b266efaf2480de4435 Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Wed, 10 Jul 2019 10:17:14 +0300 Subject: [PATCH 08/19] Fix scopelint issues by using lambda functions --- stats/csv/collector.go | 13 +++++++------ stats/csv/collector_test.go | 38 ++++++++++++++++++------------------- 2 files changed, 26 insertions(+), 25 deletions(-) diff --git a/stats/csv/collector.go b/stats/csv/collector.go index 1a4e00c691a..1d6024f653c 100644 --- a/stats/csv/collector.go +++ b/stats/csv/collector.go @@ -154,12 +154,13 @@ func (c *Collector) WriteToFile() { defer c.csvLock.Unlock() for _, sc := range samples { for _, sample := range sc.GetSamples() { - sample := sample - row := SampleToRow(&sample, 
c.resTags, c.ignoredTags) - err := c.csvWriter.Write(row) - if err != nil { - log.WithField("filename", c.fname).Error("CSV: Error writing to file") - } + func(sample stats.Sample) { + row := SampleToRow(&sample, c.resTags, c.ignoredTags) + err := c.csvWriter.Write(row) + if err != nil { + log.WithField("filename", c.fname).Error("CSV: Error writing to file") + } + }(sample) } } c.csvWriter.Flush() diff --git a/stats/csv/collector_test.go b/stats/csv/collector_test.go index f5d713f7a9e..732863cb0b1 100644 --- a/stats/csv/collector_test.go +++ b/stats/csv/collector_test.go @@ -46,15 +46,16 @@ func TestMakeHeader(t *testing.T) { } for testname, tags := range testdata { - tags := tags - t.Run(testname, func(t *testing.T) { - header := MakeHeader(tags) - assert.Equal(t, len(tags)+4, len(header)) - assert.Equal(t, "metric_name", header[0]) - assert.Equal(t, "timestamp", header[1]) - assert.Equal(t, "metric_value", header[2]) - assert.Equal(t, "extra_tags", header[len(header)-1]) - }) + func(testname string, tags []string) { + t.Run(testname, func(t *testing.T) { + header := MakeHeader(tags) + assert.Equal(t, len(tags)+4, len(header)) + assert.Equal(t, "metric_name", header[0]) + assert.Equal(t, "timestamp", header[1]) + assert.Equal(t, "metric_value", header[2]) + assert.Equal(t, "extra_tags", header[len(header)-1]) + }) + }(testname, tags) } } @@ -100,17 +101,16 @@ func TestSampleToRow(t *testing.T) { } for testname, tags := range enabledTags { - eTags := tags[0] - iTags := tags[1] for _, sample := range testSamples { - sample := sample - t.Run(testname, func(t *testing.T) { - row := SampleToRow(&sample, eTags, iTags) - assert.Equal(t, len(eTags)+4, len(row)) - for _, tag := range iTags { - assert.False(t, strings.Contains(row[len(row)-1], tag)) - } - }) + func(testname string, sample stats.Sample, resTags []string, ignoredTags []string) { + t.Run(testname, func(t *testing.T) { + row := SampleToRow(&sample, resTags, ignoredTags) + assert.Equal(t, len(resTags)+4, len(row)) + for _, tag := range ignoredTags { + assert.False(t, strings.Contains(row[len(row)-1], tag)) + } + }) + }(testname, sample, tags[0], tags[1]) } } } From 53644009ee199da8e530a326969df46daad8680e Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Wed, 10 Jul 2019 10:35:58 +0300 Subject: [PATCH 09/19] Break up the long string into smaller ones to avoid lint errors --- stats/csv/collector_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/stats/csv/collector_test.go b/stats/csv/collector_test.go index 732863cb0b1..eecbf563f73 100644 --- a/stats/csv/collector_test.go +++ b/stats/csv/collector_test.go @@ -217,7 +217,9 @@ func TestRunCollect(t *testing.T) { csvbytes, _ := afero.ReadFile(mem, "path") csvstr := fmt.Sprintf("%s", csvbytes) assert.Equal(t, - "metric_name,timestamp,metric_value,tag1,tag3,extra_tags\nmy_metric,1562324643,1.000000,val1,val3,\nmy_metric,1562324644,1.000000,val1,val3,tag4=val4\n", + "metric_name,timestamp,metric_value,tag1,tag3,extra_tags\n"+ + "my_metric,1562324643,1.000000,val1,val3,\n"+ + "my_metric,1562324644,1.000000,val1,val3,tag4=val4\n", csvstr) }) } From c7862b24d5de3f8bd60533299bb504e3a194ae47 Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Wed, 10 Jul 2019 11:46:54 +0300 Subject: [PATCH 10/19] Add more test coverage --- stats/csv/collector_test.go | 110 ++++++++++++++++++++++++++++++++++-- 1 file changed, 104 insertions(+), 6 deletions(-) diff --git a/stats/csv/collector_test.go b/stats/csv/collector_test.go index eecbf563f73..67fb8e28da9 100644 --- 
a/stats/csv/collector_test.go +++ b/stats/csv/collector_test.go @@ -60,7 +60,7 @@ func TestMakeHeader(t *testing.T) { } func TestSampleToRow(t *testing.T) { - testSamples := []stats.Sample{ + testSamples := []*stats.Sample{ { Time: time.Now(), Metric: stats.New("my_metric", stats.Gauge), @@ -83,6 +83,7 @@ func TestSampleToRow(t *testing.T) { "tag5": "val5", }), }, + nil, } enabledTags := map[string][][]string{ @@ -102,12 +103,17 @@ func TestSampleToRow(t *testing.T) { for testname, tags := range enabledTags { for _, sample := range testSamples { - func(testname string, sample stats.Sample, resTags []string, ignoredTags []string) { + func(testname string, sample *stats.Sample, resTags []string, ignoredTags []string) { t.Run(testname, func(t *testing.T) { - row := SampleToRow(&sample, resTags, ignoredTags) - assert.Equal(t, len(resTags)+4, len(row)) - for _, tag := range ignoredTags { - assert.False(t, strings.Contains(row[len(row)-1], tag)) + row := SampleToRow(sample, resTags, ignoredTags) + if row != nil { + assert.Equal(t, len(resTags)+4, len(row)) + for _, tag := range ignoredTags { + assert.False(t, strings.Contains(row[len(row)-1], tag)) + } + } else { + assert.Nil(t, row) + assert.Nil(t, sample) } }) }(testname, sample, tags[0], tags[1]) @@ -223,3 +229,95 @@ func TestRunCollect(t *testing.T) { csvstr) }) } + +func TestNew(t *testing.T) { + configs := []struct { + fname string + tags lib.TagSet + }{ + { + fname: "name", + tags: lib.TagSet{ + "tag1": true, + "tag2": false, + "tag3": true, + }, + }, + { + fname: "-", + tags: lib.TagSet{ + "tag1": true, + }, + }, + { + fname: "", + tags: lib.TagSet{ + "tag1": false, + "tag2": false, + }, + }, + } + expected := []struct { + fname string + resTags []string + ignoredTags []string + }{ + { + fname: "name", + resTags: []string{ + "tag1", "tag3", + }, + ignoredTags: []string{ + "tag2", + }, + }, + { + fname: "-", + resTags: []string{ + "tag1", + }, + ignoredTags: []string{}, + }, + { + fname: "-", + resTags: []string{}, + ignoredTags: []string{ + "tag1", "tag2", + }, + }, + } + + for i := range configs { + func(config struct { + fname string + tags lib.TagSet + }, expected struct { + fname string + resTags []string + ignoredTags []string + }) { + t.Run(config.fname, func(t *testing.T) { + collector, err := New(afero.NewMemMapFs(), config.fname, config.tags) + assert.NoError(t, err) + assert.NotNil(t, collector) + assert.Equal(t, expected.fname, collector.fname) + assert.Equal(t, expected.resTags, collector.resTags) + assert.Equal(t, expected.ignoredTags, collector.ignoredTags) + }) + }(configs[i], expected[i]) + } +} + +func TestGetRequiredSystemTags(t *testing.T) { + collector, err := New(afero.NewMemMapFs(), "path", lib.TagSet{"tag1": true, "tag2": false, "tag3": true}) + assert.NoError(t, err) + assert.NotNil(t, collector) + assert.Equal(t, lib.TagSet{}, collector.GetRequiredSystemTags()) +} + +func TestLink(t *testing.T) { + collector, err := New(afero.NewMemMapFs(), "path", lib.TagSet{"tag1": true, "tag2": false, "tag3": true}) + assert.NoError(t, err) + assert.NotNil(t, collector) + assert.Equal(t, "", collector.Link()) +} From f5b82d2c3d0e6df6f10c006416f362b46fa2a976 Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Wed, 10 Jul 2019 12:22:04 +0300 Subject: [PATCH 11/19] Fix inconsistent test results due to the nature of a set --- stats/csv/collector_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/stats/csv/collector_test.go b/stats/csv/collector_test.go index 67fb8e28da9..ced635bcaf8 100644 --- 
a/stats/csv/collector_test.go +++ b/stats/csv/collector_test.go @@ -23,6 +23,7 @@ package csv import ( "context" "fmt" + "sort" "strings" "sync" "testing" @@ -301,7 +302,11 @@ func TestNew(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, collector) assert.Equal(t, expected.fname, collector.fname) + sort.Strings(expected.resTags) + sort.Strings(collector.resTags) assert.Equal(t, expected.resTags, collector.resTags) + sort.Strings(expected.ignoredTags) + sort.Strings(collector.ignoredTags) assert.Equal(t, expected.ignoredTags, collector.ignoredTags) }) }(configs[i], expected[i]) From a7f5a4956e4d6ca1c3c3551b2309880e76786920 Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Mon, 12 Aug 2019 12:19:40 +0300 Subject: [PATCH 12/19] Resolve code review issues --- stats/csv/collector.go | 47 ++++------ stats/csv/collector_test.go | 176 ++++++++++++++++-------------------- 2 files changed, 96 insertions(+), 127 deletions(-) diff --git a/stats/csv/collector.go b/stats/csv/collector.go index 1d6024f653c..fae9262636b 100644 --- a/stats/csv/collector.go +++ b/stats/csv/collector.go @@ -33,7 +33,7 @@ import ( "github.com/loadimpact/k6/lib" "github.com/loadimpact/k6/stats" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" "github.com/spf13/afero" ) @@ -51,6 +51,7 @@ type Collector struct { csvLock sync.Mutex buffer []stats.Sample bufferLock sync.Mutex + row []string } // Verify that Collector implements lib.Collector @@ -75,6 +76,7 @@ func New(fs afero.Fs, fname string, tags lib.TagSet) (*Collector, error) { } } sort.Strings(resTags) + sort.Strings(ignoredTags) if fname == "" || fname == "-" { logfile := nopCloser{os.Stdout} @@ -84,6 +86,7 @@ func New(fs afero.Fs, fname string, tags lib.TagSet) (*Collector, error) { resTags: resTags, ignoredTags: ignoredTags, csvWriter: csv.NewWriter(logfile), + row: make([]string, 0, 3+len(resTags)+1), }, nil } @@ -98,6 +101,7 @@ func New(fs afero.Fs, fname string, tags lib.TagSet) (*Collector, error) { resTags: resTags, ignoredTags: ignoredTags, csvWriter: csv.NewWriter(logfile), + row: make([]string, 0, 3+len(resTags)+1), }, nil } @@ -106,7 +110,7 @@ func (c *Collector) Init() error { header := MakeHeader(c.resTags) err := c.csvWriter.Write(header) if err != nil { - log.WithField("filename", c.fname).Error("CSV: Error writing column names to file") + logrus.WithField("filename", c.fname).Error("CSV: Error writing column names to file") } c.csvWriter.Flush() return nil @@ -126,7 +130,7 @@ func (c *Collector) Run(ctx context.Context) { c.WriteToFile() err := c.outfile.Close() if err != nil { - log.WithField("filename", c.fname).Error("CSV: Error closing the file") + logrus.WithField("filename", c.fname).Error("CSV: Error closing the file") } return } @@ -154,13 +158,15 @@ func (c *Collector) WriteToFile() { defer c.csvLock.Unlock() for _, sc := range samples { for _, sample := range sc.GetSamples() { - func(sample stats.Sample) { - row := SampleToRow(&sample, c.resTags, c.ignoredTags) + sample := sample + if &sample != nil { + row := c.row[:0] + row = SampleToRow(&sample, c.resTags, c.ignoredTags, row) err := c.csvWriter.Write(row) if err != nil { - log.WithField("filename", c.fname).Error("CSV: Error writing to file") + logrus.WithField("filename", c.fname).Error("CSV: Error writing to file") } - }(sample) + } } } c.csvWriter.Flush() @@ -169,7 +175,7 @@ func (c *Collector) WriteToFile() { // Link returns a dummy string, it's only included to satisfy the lib.Collector interface func (c *Collector) Link() string { - return "" + return c.fname } 
// MakeHeader creates list of column names for csv file @@ -179,12 +185,7 @@ func MakeHeader(tags []string) []string { } // SampleToRow converts sample into array of strings -func SampleToRow(sample *stats.Sample, resTags []string, ignoredTags []string) []string { - if sample == nil { - return nil - } - - row := []string{} +func SampleToRow(sample *stats.Sample, resTags []string, ignoredTags []string, row []string) []string { row = append(row, sample.Metric.Name) row = append(row, fmt.Sprintf("%d", sample.Time.Unix())) row = append(row, fmt.Sprintf("%f", sample.Value)) @@ -197,23 +198,7 @@ func SampleToRow(sample *stats.Sample, resTags []string, ignoredTags []string) [ extraTags := bytes.Buffer{} prev := false for tag, val := range sampleTags { - extra := true - for _, resTag := range resTags { - if tag == resTag { - extra = false - break - } - } - if extra { - for _, ignoredTag := range ignoredTags { - if tag == ignoredTag { - extra = false - break - } - } - } - - if extra { + if sort.SearchStrings(resTags, tag) == len(resTags) && sort.SearchStrings(ignoredTags, tag) == len(ignoredTags) { if prev { _, err := extraTags.WriteString("&") if err != nil { diff --git a/stats/csv/collector_test.go b/stats/csv/collector_test.go index ced635bcaf8..d36df32951d 100644 --- a/stats/csv/collector_test.go +++ b/stats/csv/collector_test.go @@ -47,16 +47,15 @@ func TestMakeHeader(t *testing.T) { } for testname, tags := range testdata { - func(testname string, tags []string) { - t.Run(testname, func(t *testing.T) { - header := MakeHeader(tags) - assert.Equal(t, len(tags)+4, len(header)) - assert.Equal(t, "metric_name", header[0]) - assert.Equal(t, "timestamp", header[1]) - assert.Equal(t, "metric_value", header[2]) - assert.Equal(t, "extra_tags", header[len(header)-1]) - }) - }(testname, tags) + testname, tags := testname, tags + t.Run(testname, func(t *testing.T) { + header := MakeHeader(tags) + assert.Equal(t, len(tags)+4, len(header)) + assert.Equal(t, "metric_name", header[0]) + assert.Equal(t, "timestamp", header[1]) + assert.Equal(t, "metric_value", header[2]) + assert.Equal(t, "extra_tags", header[len(header)-1]) + }) } } @@ -84,7 +83,6 @@ func TestSampleToRow(t *testing.T) { "tag5": "val5", }), }, - nil, } enabledTags := map[string][][]string{ @@ -104,20 +102,19 @@ func TestSampleToRow(t *testing.T) { for testname, tags := range enabledTags { for _, sample := range testSamples { - func(testname string, sample *stats.Sample, resTags []string, ignoredTags []string) { - t.Run(testname, func(t *testing.T) { - row := SampleToRow(sample, resTags, ignoredTags) - if row != nil { - assert.Equal(t, len(resTags)+4, len(row)) - for _, tag := range ignoredTags { - assert.False(t, strings.Contains(row[len(row)-1], tag)) - } - } else { - assert.Nil(t, row) - assert.Nil(t, sample) + testname, sample, resTags, ignoredTags := testname, sample, tags[0], tags[1] + t.Run(testname, func(t *testing.T) { + row := SampleToRow(sample, resTags, ignoredTags, make([]string, 0, 3+len(resTags)+1)) + if row != nil { + assert.Equal(t, len(resTags)+4, len(row)) + for _, tag := range ignoredTags { + assert.False(t, strings.Contains(row[len(row)-1], tag)) } - }) - }(testname, sample, tags[0], tags[1]) + } else { + assert.Nil(t, row) + assert.Nil(t, sample) + } + }) } } } @@ -145,36 +142,33 @@ func TestCollect(t *testing.T) { }), }, } - t.Run("Collect", func(t *testing.T) { - mem := afero.NewMemMapFs() - collector, err := New(mem, "path", lib.TagSet{"tag1": true, "tag2": false, "tag3": true}) - assert.NoError(t, err) - 
assert.NotNil(t, collector) - collector.Collect(testSamples) + mem := afero.NewMemMapFs() + collector, err := New(mem, "path", lib.TagSet{"tag1": true, "tag2": false, "tag3": true}) + assert.NoError(t, err) + assert.NotNil(t, collector) + + collector.Collect(testSamples) - assert.Equal(t, len(testSamples), len(collector.buffer)) - }) + assert.Equal(t, len(testSamples), len(collector.buffer)) } func TestRun(t *testing.T) { - t.Run("Run", func(t *testing.T) { - collector, err := New(afero.NewMemMapFs(), "path", lib.TagSet{"tag1": true, "tag2": false, "tag3": true}) - assert.NoError(t, err) - assert.NotNil(t, collector) + collector, err := New(afero.NewMemMapFs(), "path", lib.TagSet{"tag1": true, "tag2": false, "tag3": true}) + assert.NoError(t, err) + assert.NotNil(t, collector) - ctx, cancel := context.WithCancel(context.Background()) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - err := collector.Init() - assert.NoError(t, err) - collector.Run(ctx) - }() - cancel() - wg.Wait() - }) + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := collector.Init() + assert.NoError(t, err) + collector.Run(ctx) + }() + cancel() + wg.Wait() } func TestRunCollect(t *testing.T) { @@ -202,33 +196,31 @@ func TestRunCollect(t *testing.T) { }, } - t.Run("Run and Collect", func(t *testing.T) { - mem := afero.NewMemMapFs() - collector, err := New(mem, "path", lib.TagSet{"tag1": true, "tag2": false, "tag3": true}) - assert.NoError(t, err) - assert.NotNil(t, collector) + mem := afero.NewMemMapFs() + collector, err := New(mem, "path", lib.TagSet{"tag1": true, "tag2": false, "tag3": true}) + assert.NoError(t, err) + assert.NotNil(t, collector) - ctx, cancel := context.WithCancel(context.Background()) - var wg sync.WaitGroup - wg.Add(1) - go func() { - collector.Run(ctx) - wg.Done() - }() - err = collector.Init() - assert.NoError(t, err) - collector.Collect(testSamples) - time.Sleep(1 * time.Second) - cancel() - wg.Wait() - csvbytes, _ := afero.ReadFile(mem, "path") - csvstr := fmt.Sprintf("%s", csvbytes) - assert.Equal(t, - "metric_name,timestamp,metric_value,tag1,tag3,extra_tags\n"+ - "my_metric,1562324643,1.000000,val1,val3,\n"+ - "my_metric,1562324644,1.000000,val1,val3,tag4=val4\n", - csvstr) - }) + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + collector.Run(ctx) + wg.Done() + }() + err = collector.Init() + assert.NoError(t, err) + collector.Collect(testSamples) + time.Sleep(1 * time.Second) + cancel() + wg.Wait() + csvbytes, _ := afero.ReadFile(mem, "path") + csvstr := fmt.Sprintf("%s", csvbytes) + assert.Equal(t, + "metric_name,timestamp,metric_value,tag1,tag3,extra_tags\n"+ + "my_metric,1562324643,1.000000,val1,val3,\n"+ + "my_metric,1562324644,1.000000,val1,val3,tag4=val4\n", + csvstr) } func TestNew(t *testing.T) { @@ -289,27 +281,19 @@ func TestNew(t *testing.T) { } for i := range configs { - func(config struct { - fname string - tags lib.TagSet - }, expected struct { - fname string - resTags []string - ignoredTags []string - }) { - t.Run(config.fname, func(t *testing.T) { - collector, err := New(afero.NewMemMapFs(), config.fname, config.tags) - assert.NoError(t, err) - assert.NotNil(t, collector) - assert.Equal(t, expected.fname, collector.fname) - sort.Strings(expected.resTags) - sort.Strings(collector.resTags) - assert.Equal(t, expected.resTags, collector.resTags) - sort.Strings(expected.ignoredTags) - sort.Strings(collector.ignoredTags) - 
assert.Equal(t, expected.ignoredTags, collector.ignoredTags) - }) - }(configs[i], expected[i]) + config, expected := configs[i], expected[i] + t.Run(config.fname, func(t *testing.T) { + collector, err := New(afero.NewMemMapFs(), config.fname, config.tags) + assert.NoError(t, err) + assert.NotNil(t, collector) + assert.Equal(t, expected.fname, collector.fname) + sort.Strings(expected.resTags) + sort.Strings(collector.resTags) + assert.Equal(t, expected.resTags, collector.resTags) + sort.Strings(expected.ignoredTags) + sort.Strings(collector.ignoredTags) + assert.Equal(t, expected.ignoredTags, collector.ignoredTags) + }) } } @@ -324,5 +308,5 @@ func TestLink(t *testing.T) { collector, err := New(afero.NewMemMapFs(), "path", lib.TagSet{"tag1": true, "tag2": false, "tag3": true}) assert.NoError(t, err) assert.NotNil(t, collector) - assert.Equal(t, "", collector.Link()) + assert.Equal(t, "path", collector.Link()) } From e137f59ac366c89917ae3423f057e31a7f4a881f Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Wed, 14 Aug 2019 11:14:52 +0300 Subject: [PATCH 13/19] Make CSV collector more configurable --- cmd/collectors.go | 13 ++++- cmd/config.go | 3 + stats/csv/collector.go | 56 ++++++++++--------- stats/csv/collector_test.go | 27 +++++---- stats/csv/config.go | 97 +++++++++++++++++++++++++++++++++ stats/csv/config_test.go | 106 ++++++++++++++++++++++++++++++++++++ 6 files changed, 262 insertions(+), 40 deletions(-) create mode 100644 stats/csv/config.go create mode 100644 stats/csv/config_test.go diff --git a/cmd/collectors.go b/cmd/collectors.go index cc5d36efa63..8b244d7781c 100644 --- a/cmd/collectors.go +++ b/cmd/collectors.go @@ -114,7 +114,18 @@ func newCollector(collectorName, arg string, src *lib.SourceData, conf Config) ( } return datadog.New(config) case collectorCSV: - return csv.New(afero.NewOsFs(), arg, conf.SystemTags) + config := csv.NewConfig().Apply(conf.Collectors.CSV) + if err := envconfig.Process("k6", &config); err != nil { + return nil, err + } + if arg != "" { + cmdConfig, err := csv.ParseArg(arg) + if err != nil { + return nil, err + } + config = config.Apply(cmdConfig) + } + return csv.New(afero.NewOsFs(), conf.SystemTags, config) default: return nil, errors.Errorf("unknown output type: %s", collectorName) } diff --git a/cmd/config.go b/cmd/config.go index be20ae574e0..9eeb64016a6 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -35,6 +35,7 @@ import ( "github.com/loadimpact/k6/lib/scheduler" "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats/cloud" + "github.com/loadimpact/k6/stats/csv" "github.com/loadimpact/k6/stats/datadog" "github.com/loadimpact/k6/stats/influxdb" "github.com/loadimpact/k6/stats/kafka" @@ -72,6 +73,7 @@ type Config struct { Cloud cloud.Config `json:"cloud"` StatsD common.Config `json:"statsd"` Datadog datadog.Config `json:"datadog"` + CSV csv.Config `json:"csv"` } `json:"collectors"` } @@ -97,6 +99,7 @@ func (c Config) Apply(cfg Config) Config { c.Collectors.Kafka = c.Collectors.Kafka.Apply(cfg.Collectors.Kafka) c.Collectors.StatsD = c.Collectors.StatsD.Apply(cfg.Collectors.StatsD) c.Collectors.Datadog = c.Collectors.Datadog.Apply(cfg.Collectors.Datadog) + c.Collectors.CSV = c.Collectors.CSV.Apply(cfg.Collectors.CSV) return c } diff --git a/stats/csv/collector.go b/stats/csv/collector.go index fae9262636b..9050c4b0ff2 100644 --- a/stats/csv/collector.go +++ b/stats/csv/collector.go @@ -37,21 +37,18 @@ import ( "github.com/spf13/afero" ) -const ( - saveInterval = 1 * time.Second -) - // Collector saving output to csv 
implements the lib.Collector interface type Collector struct { - outfile io.WriteCloser - fname string - resTags []string - ignoredTags []string - csvWriter *csv.Writer - csvLock sync.Mutex - buffer []stats.Sample - bufferLock sync.Mutex - row []string + outfile io.WriteCloser + fname string + resTags []string + ignoredTags []string + csvWriter *csv.Writer + csvLock sync.Mutex + buffer []stats.Sample + bufferLock sync.Mutex + row []string + saveInterval time.Duration } // Verify that Collector implements lib.Collector @@ -65,7 +62,7 @@ type nopCloser struct { func (nopCloser) Close() error { return nil } // New Creates new instance of CSV collector -func New(fs afero.Fs, fname string, tags lib.TagSet) (*Collector, error) { +func New(fs afero.Fs, tags lib.TagSet, config Config) (*Collector, error) { resTags := []string{} ignoredTags := []string{} for tag, flag := range tags { @@ -78,15 +75,19 @@ func New(fs afero.Fs, fname string, tags lib.TagSet) (*Collector, error) { sort.Strings(resTags) sort.Strings(ignoredTags) + saveInterval := time.Duration(config.SaveInterval.Duration) + fname := config.FileName.String + if fname == "" || fname == "-" { logfile := nopCloser{os.Stdout} return &Collector{ - outfile: logfile, - fname: "-", - resTags: resTags, - ignoredTags: ignoredTags, - csvWriter: csv.NewWriter(logfile), - row: make([]string, 0, 3+len(resTags)+1), + outfile: logfile, + fname: "-", + resTags: resTags, + ignoredTags: ignoredTags, + csvWriter: csv.NewWriter(logfile), + row: make([]string, 0, 3+len(resTags)+1), + saveInterval: saveInterval, }, nil } @@ -96,12 +97,13 @@ func New(fs afero.Fs, fname string, tags lib.TagSet) (*Collector, error) { } return &Collector{ - outfile: logfile, - fname: fname, - resTags: resTags, - ignoredTags: ignoredTags, - csvWriter: csv.NewWriter(logfile), - row: make([]string, 0, 3+len(resTags)+1), + outfile: logfile, + fname: fname, + resTags: resTags, + ignoredTags: ignoredTags, + csvWriter: csv.NewWriter(logfile), + row: make([]string, 0, 3+len(resTags)+1), + saveInterval: saveInterval, }, nil } @@ -121,7 +123,7 @@ func (c *Collector) SetRunStatus(status lib.RunStatus) {} // Run just blocks until the context is done func (c *Collector) Run(ctx context.Context) { - ticker := time.NewTicker(saveInterval) + ticker := time.NewTicker(c.saveInterval) for { select { case <-ticker.C: diff --git a/stats/csv/collector_test.go b/stats/csv/collector_test.go index d36df32951d..5064d4e31bd 100644 --- a/stats/csv/collector_test.go +++ b/stats/csv/collector_test.go @@ -29,6 +29,9 @@ import ( "testing" "time" + "gopkg.in/guregu/null.v3" + + "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats" "github.com/loadimpact/k6/lib" @@ -144,7 +147,7 @@ func TestCollect(t *testing.T) { } mem := afero.NewMemMapFs() - collector, err := New(mem, "path", lib.TagSet{"tag1": true, "tag2": false, "tag3": true}) + collector, err := New(mem, lib.TagSet{"tag1": true, "tag2": false, "tag3": true}, Config{FileName: null.StringFrom("name"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}) assert.NoError(t, err) assert.NotNil(t, collector) @@ -154,7 +157,7 @@ func TestCollect(t *testing.T) { } func TestRun(t *testing.T) { - collector, err := New(afero.NewMemMapFs(), "path", lib.TagSet{"tag1": true, "tag2": false, "tag3": true}) + collector, err := New(afero.NewMemMapFs(), lib.TagSet{"tag1": true, "tag2": false, "tag3": true}, Config{FileName: null.StringFrom("name"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}) assert.NoError(t, err) assert.NotNil(t, 
collector) @@ -197,7 +200,7 @@ func TestRunCollect(t *testing.T) { } mem := afero.NewMemMapFs() - collector, err := New(mem, "path", lib.TagSet{"tag1": true, "tag2": false, "tag3": true}) + collector, err := New(mem, lib.TagSet{"tag1": true, "tag2": false, "tag3": true}, Config{FileName: null.StringFrom("path"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}) assert.NoError(t, err) assert.NotNil(t, collector) @@ -225,11 +228,11 @@ func TestRunCollect(t *testing.T) { func TestNew(t *testing.T) { configs := []struct { - fname string - tags lib.TagSet + cfg Config + tags lib.TagSet }{ { - fname: "name", + cfg: Config{FileName: null.StringFrom("name"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}, tags: lib.TagSet{ "tag1": true, "tag2": false, @@ -237,13 +240,13 @@ func TestNew(t *testing.T) { }, }, { - fname: "-", + cfg: Config{FileName: null.StringFrom("-"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}, tags: lib.TagSet{ "tag1": true, }, }, { - fname: "", + cfg: Config{FileName: null.StringFrom(""), SaveInterval: types.NewNullDuration(time.Duration(1), true)}, tags: lib.TagSet{ "tag1": false, "tag2": false, @@ -282,8 +285,8 @@ func TestNew(t *testing.T) { for i := range configs { config, expected := configs[i], expected[i] - t.Run(config.fname, func(t *testing.T) { - collector, err := New(afero.NewMemMapFs(), config.fname, config.tags) + t.Run(config.cfg.FileName.String, func(t *testing.T) { + collector, err := New(afero.NewMemMapFs(), config.tags, config.cfg) assert.NoError(t, err) assert.NotNil(t, collector) assert.Equal(t, expected.fname, collector.fname) @@ -298,14 +301,14 @@ func TestNew(t *testing.T) { } func TestGetRequiredSystemTags(t *testing.T) { - collector, err := New(afero.NewMemMapFs(), "path", lib.TagSet{"tag1": true, "tag2": false, "tag3": true}) + collector, err := New(afero.NewMemMapFs(), lib.TagSet{"tag1": true, "tag2": false, "tag3": true}, Config{FileName: null.StringFrom("name"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}) assert.NoError(t, err) assert.NotNil(t, collector) assert.Equal(t, lib.TagSet{}, collector.GetRequiredSystemTags()) } func TestLink(t *testing.T) { - collector, err := New(afero.NewMemMapFs(), "path", lib.TagSet{"tag1": true, "tag2": false, "tag3": true}) + collector, err := New(afero.NewMemMapFs(), lib.TagSet{"tag1": true, "tag2": false, "tag3": true}, Config{FileName: null.StringFrom("path"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}) assert.NoError(t, err) assert.NotNil(t, collector) assert.Equal(t, "path", collector.Link()) diff --git a/stats/csv/config.go b/stats/csv/config.go new file mode 100644 index 00000000000..a4267532237 --- /dev/null +++ b/stats/csv/config.go @@ -0,0 +1,97 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2016 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +package csv + +import ( + "strings" + "time" + + "github.com/kubernetes/helm/pkg/strvals" + "github.com/loadimpact/k6/lib/types" + "github.com/mitchellh/mapstructure" + "gopkg.in/guregu/null.v3" +) + +// Config is the config for the csv collector +type Config struct { + // Samples. + FileName null.String `json:"file_name" envconfig:"CSV_FILENAME"` + SaveInterval types.NullDuration `json:"save_interval" envconfig:"CSV_SAVE_INTERVAL"` +} + +// config is a duplicate of ConfigFields as we can not mapstructure.Decode into +// null types so we duplicate the struct with primitive types to Decode into +type config struct { + FileName string `json:"file_name" mapstructure:"file_name" envconfig:"CSV_FILENAME"` + SaveInterval string `json:"save_interval" mapstructure:"save_interval" envconfig:"CSV_SAVE_INTERVAL"` +} + +// NewConfig creates a new Config instance with default values for some fields. +func NewConfig() Config { + return Config{ + FileName: null.StringFrom("file.csv"), + SaveInterval: types.NullDurationFrom(1 * time.Second), + } +} + +func (c Config) Apply(cfg Config) Config { + if cfg.FileName.Valid { + c.FileName = cfg.FileName + } + if cfg.SaveInterval.Valid { + c.SaveInterval = cfg.SaveInterval + } + return c +} + +// ParseArg takes an arg string and converts it to a config +func ParseArg(arg string) (Config, error) { + c := Config{} + + if !strings.Contains(arg, "=") { + c.FileName = null.StringFrom(arg) + c.SaveInterval = types.NullDurationFrom(1 * time.Second) + return c, nil + } + + params, err := strvals.Parse(arg) + + if err != nil { + return c, err + } + + if v, ok := params["save_interval"].(string); ok { + err := c.SaveInterval.UnmarshalText([]byte(v)) + if err != nil { + return c, err + } + } + + var cfg config + err = mapstructure.Decode(params, &cfg) + if err != nil { + return c, err + } + + c.FileName = null.StringFrom(cfg.FileName) + + return c, nil +} diff --git a/stats/csv/config_test.go b/stats/csv/config_test.go new file mode 100644 index 00000000000..48590995d05 --- /dev/null +++ b/stats/csv/config_test.go @@ -0,0 +1,106 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2016 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +package csv + +import ( + "testing" + "time" + + "gopkg.in/guregu/null.v3" + + "github.com/loadimpact/k6/lib/types" + "github.com/stretchr/testify/assert" +) + +func TestNewConfig(t *testing.T) { + config := NewConfig() + assert.Equal(t, "file.csv", config.FileName.String) + assert.Equal(t, "1s", config.SaveInterval.String()) +} + +func TestApply(t *testing.T) { + configs := []Config{ + Config{ + FileName: null.StringFrom(""), + SaveInterval: types.NullDurationFrom(2 * time.Second), + }, + Config{ + FileName: null.StringFrom("newPath"), + SaveInterval: types.NewNullDuration(time.Duration(1), false), + }, + } + expected := []struct { + FileName string + SaveInterval string + }{ + { + FileName: "", + SaveInterval: "2s", + }, + { + FileName: "newPath", + SaveInterval: "1s", + }, + } + + for i := range configs { + config := configs[i] + expected := expected[i] + t.Run(expected.FileName+"_"+expected.SaveInterval, func(t *testing.T) { + baseConfig := NewConfig() + baseConfig = baseConfig.Apply(config) + + assert.Equal(t, expected.FileName, baseConfig.FileName.String) + assert.Equal(t, expected.SaveInterval, baseConfig.SaveInterval.String()) + }) + } +} + +func TestParseArg(t *testing.T) { + args := []string{ + "test_file.csv", + "file_name=test.csv,save_interval=5s", + } + + expected := []Config{ + Config{ + FileName: null.StringFrom("test_file.csv"), + SaveInterval: types.NullDurationFrom(1 * time.Second), + }, + Config{ + FileName: null.StringFrom("test.csv"), + SaveInterval: types.NullDurationFrom(5 * time.Second), + }, + } + + for i := range args { + arg := args[i] + expected := expected[i] + + t.Run(expected.FileName.String+"_"+expected.SaveInterval.String(), func(t *testing.T) { + config, err := ParseArg(arg) + + assert.Nil(t, err) + assert.Equal(t, expected.FileName.String, config.FileName.String) + assert.Equal(t, expected.SaveInterval.String(), config.SaveInterval.String()) + }) + } +} From 4720f1664ebb0589cbbe2db5378a817312a57509 Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Wed, 14 Aug 2019 11:29:46 +0300 Subject: [PATCH 14/19] Fix GolangCI issues --- stats/csv/collector.go | 12 +-- stats/csv/collector_test.go | 34 +++++-- stats/csv/config.go | 195 ++++++++++++++++++------------------ stats/csv/config_test.go | 8 +- 4 files changed, 135 insertions(+), 114 deletions(-) diff --git a/stats/csv/collector.go b/stats/csv/collector.go index 9050c4b0ff2..cb6cd070fbd 100644 --- a/stats/csv/collector.go +++ b/stats/csv/collector.go @@ -161,13 +161,11 @@ func (c *Collector) WriteToFile() { for _, sc := range samples { for _, sample := range sc.GetSamples() { sample := sample - if &sample != nil { - row := c.row[:0] - row = SampleToRow(&sample, c.resTags, c.ignoredTags, row) - err := c.csvWriter.Write(row) - if err != nil { - logrus.WithField("filename", c.fname).Error("CSV: Error writing to file") - } + row := c.row[:0] + row = SampleToRow(&sample, c.resTags, c.ignoredTags, row) + err := c.csvWriter.Write(row) + if err != nil { + logrus.WithField("filename", c.fname).Error("CSV: Error writing to file") } } } diff --git a/stats/csv/collector_test.go b/stats/csv/collector_test.go index 5064d4e31bd..947f300de13 100644 --- a/stats/csv/collector_test.go +++ b/stats/csv/collector_test.go @@ -104,8 +104,10 @@ func TestSampleToRow(t *testing.T) { } for testname, tags := range enabledTags { + testname := testname + resTags, ignoredTags := tags[0], tags[1] for _, sample := range testSamples { - testname, sample, resTags, ignoredTags := testname, sample, tags[0], tags[1] + sample := 
sample t.Run(testname, func(t *testing.T) { row := SampleToRow(sample, resTags, ignoredTags, make([]string, 0, 3+len(resTags)+1)) if row != nil { @@ -147,7 +149,11 @@ func TestCollect(t *testing.T) { } mem := afero.NewMemMapFs() - collector, err := New(mem, lib.TagSet{"tag1": true, "tag2": false, "tag3": true}, Config{FileName: null.StringFrom("name"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}) + collector, err := New( + mem, + lib.TagSet{"tag1": true, "tag2": false, "tag3": true}, + Config{FileName: null.StringFrom("name"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}, + ) assert.NoError(t, err) assert.NotNil(t, collector) @@ -157,7 +163,11 @@ func TestCollect(t *testing.T) { } func TestRun(t *testing.T) { - collector, err := New(afero.NewMemMapFs(), lib.TagSet{"tag1": true, "tag2": false, "tag3": true}, Config{FileName: null.StringFrom("name"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}) + collector, err := New( + afero.NewMemMapFs(), + lib.TagSet{"tag1": true, "tag2": false, "tag3": true}, + Config{FileName: null.StringFrom("name"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}, + ) assert.NoError(t, err) assert.NotNil(t, collector) @@ -200,7 +210,11 @@ func TestRunCollect(t *testing.T) { } mem := afero.NewMemMapFs() - collector, err := New(mem, lib.TagSet{"tag1": true, "tag2": false, "tag3": true}, Config{FileName: null.StringFrom("path"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}) + collector, err := New( + mem, + lib.TagSet{"tag1": true, "tag2": false, "tag3": true}, + Config{FileName: null.StringFrom("path"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}, + ) assert.NoError(t, err) assert.NotNil(t, collector) @@ -301,14 +315,22 @@ func TestNew(t *testing.T) { } func TestGetRequiredSystemTags(t *testing.T) { - collector, err := New(afero.NewMemMapFs(), lib.TagSet{"tag1": true, "tag2": false, "tag3": true}, Config{FileName: null.StringFrom("name"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}) + collector, err := New( + afero.NewMemMapFs(), + lib.TagSet{"tag1": true, "tag2": false, "tag3": true}, + Config{FileName: null.StringFrom("name"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}, + ) assert.NoError(t, err) assert.NotNil(t, collector) assert.Equal(t, lib.TagSet{}, collector.GetRequiredSystemTags()) } func TestLink(t *testing.T) { - collector, err := New(afero.NewMemMapFs(), lib.TagSet{"tag1": true, "tag2": false, "tag3": true}, Config{FileName: null.StringFrom("path"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}) + collector, err := New( + afero.NewMemMapFs(), + lib.TagSet{"tag1": true, "tag2": false, "tag3": true}, + Config{FileName: null.StringFrom("path"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}, + ) assert.NoError(t, err) assert.NotNil(t, collector) assert.Equal(t, "path", collector.Link()) diff --git a/stats/csv/config.go b/stats/csv/config.go index a4267532237..859bbee70cc 100644 --- a/stats/csv/config.go +++ b/stats/csv/config.go @@ -1,97 +1,98 @@ -/* - * - * k6 - a next-generation load testing tool - * Copyright (C) 2016 Load Impact - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. 
- * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -package csv - -import ( - "strings" - "time" - - "github.com/kubernetes/helm/pkg/strvals" - "github.com/loadimpact/k6/lib/types" - "github.com/mitchellh/mapstructure" - "gopkg.in/guregu/null.v3" -) - -// Config is the config for the csv collector -type Config struct { - // Samples. - FileName null.String `json:"file_name" envconfig:"CSV_FILENAME"` - SaveInterval types.NullDuration `json:"save_interval" envconfig:"CSV_SAVE_INTERVAL"` -} - -// config is a duplicate of ConfigFields as we can not mapstructure.Decode into -// null types so we duplicate the struct with primitive types to Decode into -type config struct { - FileName string `json:"file_name" mapstructure:"file_name" envconfig:"CSV_FILENAME"` - SaveInterval string `json:"save_interval" mapstructure:"save_interval" envconfig:"CSV_SAVE_INTERVAL"` -} - -// NewConfig creates a new Config instance with default values for some fields. -func NewConfig() Config { - return Config{ - FileName: null.StringFrom("file.csv"), - SaveInterval: types.NullDurationFrom(1 * time.Second), - } -} - -func (c Config) Apply(cfg Config) Config { - if cfg.FileName.Valid { - c.FileName = cfg.FileName - } - if cfg.SaveInterval.Valid { - c.SaveInterval = cfg.SaveInterval - } - return c -} - -// ParseArg takes an arg string and converts it to a config -func ParseArg(arg string) (Config, error) { - c := Config{} - - if !strings.Contains(arg, "=") { - c.FileName = null.StringFrom(arg) - c.SaveInterval = types.NullDurationFrom(1 * time.Second) - return c, nil - } - - params, err := strvals.Parse(arg) - - if err != nil { - return c, err - } - - if v, ok := params["save_interval"].(string); ok { - err := c.SaveInterval.UnmarshalText([]byte(v)) - if err != nil { - return c, err - } - } - - var cfg config - err = mapstructure.Decode(params, &cfg) - if err != nil { - return c, err - } - - c.FileName = null.StringFrom(cfg.FileName) - - return c, nil -} +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2016 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package csv + +import ( + "strings" + "time" + + "github.com/kubernetes/helm/pkg/strvals" + "github.com/loadimpact/k6/lib/types" + "github.com/mitchellh/mapstructure" + "gopkg.in/guregu/null.v3" +) + +// Config is the config for the csv collector +type Config struct { + // Samples. 
+ FileName null.String `json:"file_name" envconfig:"CSV_FILENAME"` + SaveInterval types.NullDuration `json:"save_interval" envconfig:"CSV_SAVE_INTERVAL"` +} + +// config is a duplicate of ConfigFields as we can not mapstructure.Decode into +// null types so we duplicate the struct with primitive types to Decode into +type config struct { + FileName string `json:"file_name" mapstructure:"file_name" envconfig:"CSV_FILENAME"` + SaveInterval string `json:"save_interval" mapstructure:"save_interval" envconfig:"CSV_SAVE_INTERVAL"` +} + +// NewConfig creates a new Config instance with default values for some fields. +func NewConfig() Config { + return Config{ + FileName: null.StringFrom("file.csv"), + SaveInterval: types.NullDurationFrom(1 * time.Second), + } +} + +// Apply merges two configs by overwriting properties in the old config +func (c Config) Apply(cfg Config) Config { + if cfg.FileName.Valid { + c.FileName = cfg.FileName + } + if cfg.SaveInterval.Valid { + c.SaveInterval = cfg.SaveInterval + } + return c +} + +// ParseArg takes an arg string and converts it to a config +func ParseArg(arg string) (Config, error) { + c := Config{} + + if !strings.Contains(arg, "=") { + c.FileName = null.StringFrom(arg) + c.SaveInterval = types.NullDurationFrom(1 * time.Second) + return c, nil + } + + params, err := strvals.Parse(arg) + + if err != nil { + return c, err + } + + if v, ok := params["save_interval"].(string); ok { + err = c.SaveInterval.UnmarshalText([]byte(v)) + if err != nil { + return c, err + } + } + + var cfg config + err = mapstructure.Decode(params, &cfg) + if err != nil { + return c, err + } + + c.FileName = null.StringFrom(cfg.FileName) + + return c, nil +} diff --git a/stats/csv/config_test.go b/stats/csv/config_test.go index 48590995d05..1ed37a54f72 100644 --- a/stats/csv/config_test.go +++ b/stats/csv/config_test.go @@ -38,11 +38,11 @@ func TestNewConfig(t *testing.T) { func TestApply(t *testing.T) { configs := []Config{ - Config{ + { FileName: null.StringFrom(""), SaveInterval: types.NullDurationFrom(2 * time.Second), }, - Config{ + { FileName: null.StringFrom("newPath"), SaveInterval: types.NewNullDuration(time.Duration(1), false), }, @@ -81,11 +81,11 @@ func TestParseArg(t *testing.T) { } expected := []Config{ - Config{ + { FileName: null.StringFrom("test_file.csv"), SaveInterval: types.NullDurationFrom(1 * time.Second), }, - Config{ + { FileName: null.StringFrom("test.csv"), SaveInterval: types.NullDurationFrom(5 * time.Second), }, From e43bcf91f779649723c44989e742f92c01801afe Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Mon, 19 Aug 2019 14:07:33 +0300 Subject: [PATCH 15/19] Add better unit tests, fix error with sort.SearchStrings --- stats/csv/collector.go | 31 +++++--- stats/csv/collector_test.go | 153 +++++++++++++++++++++++++----------- 2 files changed, 127 insertions(+), 57 deletions(-) diff --git a/stats/csv/collector.go b/stats/csv/collector.go index cb6cd070fbd..0c6a2bd985d 100644 --- a/stats/csv/collector.go +++ b/stats/csv/collector.go @@ -198,23 +198,22 @@ func SampleToRow(sample *stats.Sample, resTags []string, ignoredTags []string, r extraTags := bytes.Buffer{} prev := false for tag, val := range sampleTags { - if sort.SearchStrings(resTags, tag) == len(resTags) && sort.SearchStrings(ignoredTags, tag) == len(ignoredTags) { + if !IsStringInSlice(resTags, tag) && !IsStringInSlice(ignoredTags, tag) { if prev { - _, err := extraTags.WriteString("&") - if err != nil { + if _, err := extraTags.WriteString("&"); err != nil { break } } - _, err := 
extraTags.WriteString(tag) - if err != nil { + + if _, err := extraTags.WriteString(tag); err != nil { break } - _, err = extraTags.WriteString("=") - if err != nil { + + if _, err := extraTags.WriteString("="); err != nil { break } - _, err = extraTags.WriteString(val) - if err != nil { + + if _, err := extraTags.WriteString(val); err != nil { break } prev = true @@ -225,6 +224,20 @@ func SampleToRow(sample *stats.Sample, resTags []string, ignoredTags []string, r return row } +// IsStringInSlice returns whether the string is contained within a string slice +func IsStringInSlice(slice []string, str string) bool { + if sort.SearchStrings(slice, str) == len(slice) { + return false + } + + for _, item := range slice { + if item == str { + return true + } + } + return false +} + // GetRequiredSystemTags returns which sample tags are needed by this collector func (c *Collector) GetRequiredSystemTags() lib.TagSet { return lib.TagSet{} // There are no required tags for this collector diff --git a/stats/csv/collector_test.go b/stats/csv/collector_test.go index 947f300de13..592bf4d4401 100644 --- a/stats/csv/collector_test.go +++ b/stats/csv/collector_test.go @@ -24,7 +24,6 @@ import ( "context" "fmt" "sort" - "strings" "sync" "testing" "time" @@ -63,64 +62,122 @@ func TestMakeHeader(t *testing.T) { } func TestSampleToRow(t *testing.T) { - testSamples := []*stats.Sample{ + testData := []struct { + testname string + sample *stats.Sample + resTags []string + ignoredTags []string + }{ { - Time: time.Now(), - Metric: stats.New("my_metric", stats.Gauge), - Value: 1, - Tags: stats.NewSampleTags(map[string]string{ - "tag1": "val1", - "tag2": "val2", - "tag3": "val3", - }), + testname: "One res tag, one ignored tag, one extra tag", + sample: &stats.Sample{ + Time: time.Now(), + Metric: stats.New("my_metric", stats.Gauge), + Value: 1, + Tags: stats.NewSampleTags(map[string]string{ + "tag1": "val1", + "tag2": "val2", + "tag3": "val3", + }), + }, + resTags: []string{"tag1"}, + ignoredTags: []string{"tag2"}, }, { - Time: time.Now(), - Metric: stats.New("my_metric", stats.Gauge), - Value: 1, - Tags: stats.NewSampleTags(map[string]string{ - "tag1": "val1", - "tag2": "val2", - "tag3": "val3", - "tag4": "val4", - "tag5": "val5", - }), + testname: "Two res tags, three extra tags", + sample: &stats.Sample{ + Time: time.Now(), + Metric: stats.New("my_metric", stats.Gauge), + Value: 1, + Tags: stats.NewSampleTags(map[string]string{ + "tag1": "val1", + "tag2": "val2", + "tag3": "val3", + "tag4": "val4", + "tag5": "val5", + }), + }, + resTags: []string{"tag1", "tag2"}, + ignoredTags: []string{}, + }, + { + testname: "Two res tags, two ignored", + sample: &stats.Sample{ + Time: time.Now(), + Metric: stats.New("my_metric", stats.Gauge), + Value: 1, + Tags: stats.NewSampleTags(map[string]string{ + "tag1": "val1", + "tag2": "val2", + "tag3": "val3", + "tag4": "val4", + "tag5": "val5", + "tag6": "val6", + }), + }, + resTags: []string{"tag1", "tag3"}, + ignoredTags: []string{"tag4", "tag6"}, }, } - enabledTags := map[string][][]string{ - "One tag": { - {"tag1"}, - {"tag2"}, + expected := []struct { + baseRow []string + extraRow []string + }{ + { + baseRow: []string{ + "my_metric", + fmt.Sprintf("%d", time.Now().Unix()), + "1.000000", + "val1", + }, + extraRow: []string{ + "tag3=val3", + }, }, - "Two tags": { - {"tag1", "tag2"}, - {}, + { + baseRow: []string{ + "my_metric", + fmt.Sprintf("%d", time.Now().Unix()), + "1.000000", + "val1", + "val2", + }, + extraRow: []string{ + "tag3=val3", + "tag4=val4", + "tag5=val5", + }, }, - 
"Two tags, one ignored": { - {"tag1", "tag2"}, - {"tag3"}, + { + baseRow: []string{ + "my_metric", + fmt.Sprintf("%d", time.Now().Unix()), + "1.000000", + "val1", + "val3", + }, + extraRow: []string{ + "tag2=val2", + "tag5=val5", + }, }, } - for testname, tags := range enabledTags { - testname := testname - resTags, ignoredTags := tags[0], tags[1] - for _, sample := range testSamples { - sample := sample - t.Run(testname, func(t *testing.T) { - row := SampleToRow(sample, resTags, ignoredTags, make([]string, 0, 3+len(resTags)+1)) - if row != nil { - assert.Equal(t, len(resTags)+4, len(row)) - for _, tag := range ignoredTags { - assert.False(t, strings.Contains(row[len(row)-1], tag)) - } - } else { - assert.Nil(t, row) - assert.Nil(t, sample) - } - }) - } + for i := range testData { + testname, sample := testData[i].testname, testData[i].sample + resTags, ignoredTags := testData[i].resTags, testData[i].ignoredTags + expectedRow := expected[i] + + t.Run(testname, func(t *testing.T) { + row := SampleToRow(sample, resTags, ignoredTags, make([]string, 0, 3+len(resTags)+1)) + for ind, cell := range expectedRow.baseRow { + assert.Equal(t, cell, row[ind]) + } + for _, cell := range expectedRow.extraRow { + assert.Contains(t, row[len(row)-1], cell) + } + }) } } func TestCollect(t *testing.T) { From 12754c22d89bf9cf55f55b261b1f3f26c59d123c Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Mon, 19 Aug 2019 14:30:42 +0300 Subject: [PATCH 16/19] Optimize function for checking if string is in slice --- stats/csv/collector.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/stats/csv/collector.go b/stats/csv/collector.go index 0c6a2bd985d..533e80d7818 100644 --- a/stats/csv/collector.go +++ b/stats/csv/collector.go @@ -226,16 +226,10 @@ func SampleToRow(sample *stats.Sample, resTags []string, ignoredTags []string, r // IsStringInSlice returns whether the string is contained within a string slice func IsStringInSlice(slice []string, str string) bool { - if sort.SearchStrings(slice, str) == len(slice) { + if index := sort.SearchStrings(slice, str); index == len(slice) || slice[index] != str { return false } - - for _, item := range slice { - if item == str { - return true - } - } - return false + return true } // GetRequiredSystemTags returns which sample tags are needed by this collector From 9363a2bc5ea367d58531faa847b5dbc65db0b0f1 Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Mon, 19 Aug 2019 14:41:17 +0300 Subject: [PATCH 17/19] Fix tests being dependent on current time --- stats/csv/collector_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/stats/csv/collector_test.go b/stats/csv/collector_test.go index 592bf4d4401..dbc43b91cce 100644 --- a/stats/csv/collector_test.go +++ b/stats/csv/collector_test.go @@ -71,7 +71,7 @@ func TestSampleToRow(t *testing.T) { { testname: "One res tag, one ignored tag, one extra tag", sample: &stats.Sample{ - Time: time.Now(), + Time: time.Unix(1562324644, 0), Metric: stats.New("my_metric", stats.Gauge), Value: 1, Tags: stats.NewSampleTags(map[string]string{ @@ -86,7 +86,7 @@ func TestSampleToRow(t *testing.T) { { testname: "Two res tags, three extra tags", sample: &stats.Sample{ - Time: time.Now(), + Time: time.Unix(1562324644, 0), Metric: stats.New("my_metric", stats.Gauge), Value: 1, Tags: stats.NewSampleTags(map[string]string{ @@ -103,7 +103,7 @@ func TestSampleToRow(t *testing.T) { { testname: "Two res tags, two ignored", sample: &stats.Sample{ - Time: time.Now(), + Time: 
time.Unix(1562324644, 0), Metric: stats.New("my_metric", stats.Gauge), Value: 1, Tags: stats.NewSampleTags(map[string]string{ @@ -127,7 +127,7 @@ func TestSampleToRow(t *testing.T) { { baseRow: []string{ "my_metric", - fmt.Sprintf("%d", time.Now().Unix()), + "1562324644", "1.000000", "val1", }, @@ -138,7 +138,7 @@ func TestSampleToRow(t *testing.T) { { baseRow: []string{ "my_metric", - fmt.Sprintf("%d", time.Now().Unix()), + "1562324644", "1.000000", "val1", "val2", @@ -152,7 +152,7 @@ func TestSampleToRow(t *testing.T) { { baseRow: []string{ "my_metric", - fmt.Sprintf("%d", time.Now().Unix()), + "1562324644", "1.000000", "val1", "val3", From 5c8a8784a360f4e45abff7134af7699537944af2 Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Tue, 20 Aug 2019 13:46:59 +0300 Subject: [PATCH 18/19] Instantiate slice with length and use indexes instead of append --- stats/csv/collector.go | 19 +++++++++---------- stats/csv/collector_test.go | 2 +- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/stats/csv/collector.go b/stats/csv/collector.go index 533e80d7818..b67c49cd3d2 100644 --- a/stats/csv/collector.go +++ b/stats/csv/collector.go @@ -86,7 +86,7 @@ func New(fs afero.Fs, tags lib.TagSet, config Config) (*Collector, error) { resTags: resTags, ignoredTags: ignoredTags, csvWriter: csv.NewWriter(logfile), - row: make([]string, 0, 3+len(resTags)+1), + row: make([]string, 3+len(resTags)+1, 3+len(resTags)+1), saveInterval: saveInterval, }, nil } @@ -102,7 +102,7 @@ func New(fs afero.Fs, tags lib.TagSet, config Config) (*Collector, error) { resTags: resTags, ignoredTags: ignoredTags, csvWriter: csv.NewWriter(logfile), - row: make([]string, 0, 3+len(resTags)+1), + row: make([]string, 3+len(resTags)+1, 3+len(resTags)+1), saveInterval: saveInterval, }, nil } @@ -161,8 +161,7 @@ func (c *Collector) WriteToFile() { for _, sc := range samples { for _, sample := range sc.GetSamples() { sample := sample - row := c.row[:0] - row = SampleToRow(&sample, c.resTags, c.ignoredTags, row) + row := SampleToRow(&sample, c.resTags, c.ignoredTags, c.row) err := c.csvWriter.Write(row) if err != nil { logrus.WithField("filename", c.fname).Error("CSV: Error writing to file") @@ -186,13 +185,13 @@ func MakeHeader(tags []string) []string { // SampleToRow converts sample into array of strings func SampleToRow(sample *stats.Sample, resTags []string, ignoredTags []string, row []string) []string { - row = append(row, sample.Metric.Name) - row = append(row, fmt.Sprintf("%d", sample.Time.Unix())) - row = append(row, fmt.Sprintf("%f", sample.Value)) + row[0] = sample.Metric.Name + row[1] = fmt.Sprintf("%d", sample.Time.Unix()) + row[2] = fmt.Sprintf("%f", sample.Value) sampleTags := sample.Tags.CloneTags() - for _, tag := range resTags { - row = append(row, sampleTags[tag]) + for ind, tag := range resTags { + row[ind+3] = sampleTags[tag] } extraTags := bytes.Buffer{} @@ -219,7 +218,7 @@ func SampleToRow(sample *stats.Sample, resTags []string, ignoredTags []string, r prev = true } } - row = append(row, extraTags.String()) + row[len(row)-1] = extraTags.String() return row } diff --git a/stats/csv/collector_test.go b/stats/csv/collector_test.go index dbc43b91cce..3dca1f4a4b6 100644 --- a/stats/csv/collector_test.go +++ b/stats/csv/collector_test.go @@ -170,7 +170,7 @@ func TestSampleToRow(t *testing.T) { expectedRow := expected[i] t.Run(testname, func(t *testing.T) { - row := SampleToRow(sample, resTags, ignoredTags, make([]string, 0, 3+len(resTags)+1)) + row := SampleToRow(sample, resTags, ignoredTags, make([]string, 
3+len(resTags)+1, 3+len(resTags)+1)) for ind, cell := range expectedRow.baseRow { assert.Equal(t, cell, row[ind]) } From bd1997ae1f4bb34fedcb9c2ab018b30b315a83ae Mon Sep 17 00:00:00 2001 From: Sergey Chernyaev Date: Tue, 20 Aug 2019 13:50:23 +0300 Subject: [PATCH 19/19] Fixed golang-ci errors --- stats/csv/collector.go | 4 ++-- stats/csv/collector_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/stats/csv/collector.go b/stats/csv/collector.go index b67c49cd3d2..1232fe3c298 100644 --- a/stats/csv/collector.go +++ b/stats/csv/collector.go @@ -86,7 +86,7 @@ func New(fs afero.Fs, tags lib.TagSet, config Config) (*Collector, error) { resTags: resTags, ignoredTags: ignoredTags, csvWriter: csv.NewWriter(logfile), - row: make([]string, 3+len(resTags)+1, 3+len(resTags)+1), + row: make([]string, 3+len(resTags)+1), saveInterval: saveInterval, }, nil } @@ -102,7 +102,7 @@ func New(fs afero.Fs, tags lib.TagSet, config Config) (*Collector, error) { resTags: resTags, ignoredTags: ignoredTags, csvWriter: csv.NewWriter(logfile), - row: make([]string, 3+len(resTags)+1, 3+len(resTags)+1), + row: make([]string, 3+len(resTags)+1), saveInterval: saveInterval, }, nil } diff --git a/stats/csv/collector_test.go b/stats/csv/collector_test.go index 3dca1f4a4b6..70793b8deda 100644 --- a/stats/csv/collector_test.go +++ b/stats/csv/collector_test.go @@ -170,7 +170,7 @@ func TestSampleToRow(t *testing.T) { expectedRow := expected[i] t.Run(testname, func(t *testing.T) { - row := SampleToRow(sample, resTags, ignoredTags, make([]string, 3+len(resTags)+1, 3+len(resTags)+1)) + row := SampleToRow(sample, resTags, ignoredTags, make([]string, 3+len(resTags)+1)) for ind, cell := range expectedRow.baseRow { assert.Equal(t, cell, row[ind]) }
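
Editor's note on the IsStringInSlice helper introduced in PATCH 15 and simplified in PATCH 16: because it relies on sort.SearchStrings (a binary search), it only returns correct answers when the slice it receives is already sorted — presumably why the PATCH 15 version kept a linear fallback loop and why its subject line mentions fixing an error with sort.SearchStrings. The standalone sketch below is not part of the patch series; the package main wrapper and the sample tag values are illustrative only, and it assumes the resTags/ignoredTags slices handed to SampleToRow are kept sorted (not shown in these hunks). It copies the final helper verbatim and contrasts its behaviour on unsorted versus sorted input.

package main

import (
	"fmt"
	"sort"
)

// IsStringInSlice is copied from the patched stats/csv/collector.go: it
// binary-searches the slice, so the caller must keep the slice sorted for
// the result to be valid.
func IsStringInSlice(slice []string, str string) bool {
	if index := sort.SearchStrings(slice, str); index == len(slice) || slice[index] != str {
		return false
	}
	return true
}

func main() {
	tags := []string{"tag3", "tag1"} // deliberately unsorted

	// Binary search on unsorted data misses the entry.
	fmt.Println(IsStringInSlice(tags, "tag1")) // false

	sort.Strings(tags)

	// Once the slice is sorted, the lookup behaves as intended.
	fmt.Println(IsStringInSlice(tags, "tag1")) // true
}

Under that sorted-input assumption, dropping the linear scan in PATCH 16 is a straightforward optimization: each tag lookup in SampleToRow goes from O(n) to O(log n) without changing the rows that end up in the CSV file.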