Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Influxdb tags as fields #585

Merged
merged 6 commits into from Apr 20, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/collectors.go
Expand Up @@ -68,7 +68,7 @@ func newCollector(collectorName, arg string, src *lib.SourceData, conf Config) (
case collectorJSON:
return jsonc.New(afero.NewOsFs(), arg)
case collectorInfluxDB:
config := conf.Collectors.InfluxDB
config := influxdb.NewConfig().Apply(conf.Collectors.InfluxDB)
if err := loadConfig(&config); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/login_influxdb.go
Expand Up @@ -46,7 +46,7 @@ This will set the default server used when just "-o influxdb" is passed.`,
return err
}

conf := config.Collectors.InfluxDB
conf := influxdb.NewConfig().Apply(config.Collectors.InfluxDB)
if len(args) > 0 {
if err := conf.UnmarshalText([]byte(args[0])); err != nil {
return err
Expand Down
7 changes: 6 additions & 1 deletion samples/config.json
@@ -1,3 +1,8 @@
{
"vus": 100
"vus": 100,
"collectors": {
"influxdb": {
"tagsAsFields": ["vu","iter", "url", "name"]
}
}
}
18 changes: 16 additions & 2 deletions stats/influxdb/collector.go
Expand Up @@ -108,10 +108,12 @@ func (c *Collector) commit() {
}

for _, sample := range samples {
tags := sample.Tags.CloneTags() //TODO: optimize when implementing https://github.com/loadimpact/k6/issues/569
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea with this TODO is that since this pull request, sample.Tags is now comparable, it could be used as a map key for creating a simple cache. If you had something like this outside of the for loop:

cache := map[sample.Tags]struct {
	tags   map[string]string
	values map[string]interface{}
}{}

In the for loop you'd be able to check if certain tags are already in the cache with

if cached, ok := cache[sample.Tags]; ok {
	// Reuse the cached.tags, create a new map with {"value": sample.Value} and copy all elements from cached.values to it
} else {
	// Do what you currently do and then save the resulting tags and values as a new entry in the cache
}

I'm fine with merging this as it is, if you don't want to bother with this probably premature optimization. If it turns out that it's an issue, we'll optimize it in the future.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm looking at this now so we can hold off with the merge.

values := c.extractFields(tags, sample.Value)
p, err := client.NewPoint(
sample.Metric.Name,
sample.Tags.CloneTags(), //TODO: optimize when implementing https://github.com/loadimpact/k6/issues/569
map[string]interface{}{"value": sample.Value},
tags,
values,
sample.Time,
)
if err != nil {
Expand All @@ -130,6 +132,18 @@ func (c *Collector) commit() {
log.WithField("t", t).Debug("InfluxDB: Batch written!")
}

func (c *Collector) extractFields(tags map[string]string, value interface{}) map[string]interface{} {
fields := make(map[string]interface{})
fields["value"] = value
for _, tag := range c.Config.TagsAsFields {
if val, ok := tags[tag]; ok {
fields[tag] = val
delete(tags, tag)
}
}
return fields
}

// GetRequiredSystemTags returns which sample tags are needed by this collector
func (c *Collector) GetRequiredSystemTags() lib.TagSet {
return lib.TagSet{} // There are no required tags for this collector
Expand Down
19 changes: 15 additions & 4 deletions stats/influxdb/config.go
Expand Up @@ -38,14 +38,20 @@ type ConfigFields struct {
PayloadSize int `json:"payload_size,omitempty" envconfig:"INFLUXDB_PAYLOAD_SIZE"`

// Samples.
DB string `json:"db" envconfig:"INFLUXDB_DB"`
Precision string `json:"precision,omitempty" envconfig:"INFLUXDB_PRECISION"`
Retention string `json:"retention,omitempty" envconfig:"INFLUXDB_RETENTION"`
Consistency string `json:"consistency,omitempty" envconfig:"INFLUXDB_CONSISTENCY"`
DB string `json:"db" envconfig:"INFLUXDB_DB"`
Precision string `json:"precision,omitempty" envconfig:"INFLUXDB_PRECISION"`
Retention string `json:"retention,omitempty" envconfig:"INFLUXDB_RETENTION"`
Consistency string `json:"consistency,omitempty" envconfig:"INFLUXDB_CONSISTENCY"`
TagsAsFields []string `json:"tagsAsFields,omitempty" envconfig:"INFLUXDB_TAGS_AS_FIELDS"`
}

type Config ConfigFields

func NewConfig() *Config {
c := &Config{TagsAsFields: []string{"vu", "iter", "url"}}
return c
}

func (c Config) Apply(cfg Config) Config {
if cfg.Addr != "" {
c.Addr = cfg.Addr
Expand Down Expand Up @@ -74,6 +80,9 @@ func (c Config) Apply(cfg Config) Config {
if cfg.Consistency != "" {
c.Consistency = cfg.Consistency
}
if len(cfg.TagsAsFields) > 0 {
c.TagsAsFields = cfg.TagsAsFields
}
return c
}

Expand Down Expand Up @@ -112,6 +121,8 @@ func (c *Config) UnmarshalText(text []byte) error {
c.Retention = vs[0]
case "consistency":
c.Consistency = vs[0]
case "tagsAsFields":
c.TagsAsFields = vs
default:
return errors.Errorf("unknown query parameter: %s", k)
}
Expand Down