Moderate data fetching refactoring
- WIP: implement max-data-points and internal-aggregation clickhouse
  parameters
- Move data fetching into render package
- Create Rules.RollupPoints function, use it right after data fetching
- Implement locking in alias.Map for concurrent usage
- render.FetchDataPoints: a universal function to get data points; uses
  carbonlink if specified and makes requests in goroutines
- prometheus and render: use FetchDataPoints
- data: use ReadUvarint from clickhouse package
Felixoid committed Aug 25, 2020
1 parent 37b91df commit 8a759b6
Showing 11 changed files with 378 additions and 342 deletions.
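The diffs below converge on a single entry point for data fetching in the render package. That package is among the changed files not shown on this page, so the following is only a sketch of how the new API looks from the prometheus call sites in this commit; the exact shapes of TimeFrame, MultiFetchRequest and FetchDataPoints are inferred, not confirmed.

// Sketch of the new fetching flow, inferred from the prometheus call sites below.
// Treat the render types and the reply shape as assumptions.
package example

import (
    "context"

    "github.com/lomik/graphite-clickhouse/config"
    "github.com/lomik/graphite-clickhouse/pkg/alias"
    "github.com/lomik/graphite-clickhouse/render"
)

func fetch(ctx context.Context, cfg *config.Config, am *alias.Map, from, until int64) (*render.Data, error) {
    // One TimeFrame per requested interval; MaxDataPoints caps how many points
    // are returned for it (see the new ClickHouse.MaxDataPoints option).
    requests := render.MultiFetchRequest{
        render.TimeFrame{
            From:          from,
            Until:         until,
            MaxDataPoints: int64(cfg.ClickHouse.MaxDataPoints),
        }: []string{}, // list of targets; empty in the prometheus call sites
    }

    reply, err := render.FetchDataPoints(ctx, cfg, am, requests)
    if err != nil {
        return nil, err
    }
    if len(reply) == 0 {
        return nil, nil
    }
    return reply[0].Data, nil
}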
8 changes: 7 additions & 1 deletion config/config.go
@@ -42,10 +42,10 @@ func (d *Duration) Value() time.Duration {
}

type Common struct {
Listen string `toml:"listen" json:"listen"`
// MetricPrefix string `toml:"metric-prefix"`
// MetricInterval *Duration `toml:"metric-interval"`
// MetricEndpoint string `toml:"metric-endpoint"`
Listen string `toml:"listen" json:"listen"`
MaxCPU int `toml:"max-cpu" json:"max-cpu"`
MaxMetricsInFindAnswer int `toml:"max-metrics-in-find-answer" json:"max-metrics-in-find-answer"` //zero means infinite
TargetBlacklist []string `toml:"target-blacklist" json:"target-blacklist"`
@@ -71,6 +71,10 @@ type ClickHouse struct {
ConnectTimeout *Duration `toml:"connect-timeout" json:"connect-timeout"`
DataTableLegacy string `toml:"data-table" json:"data-table"`
RollupConfLegacy string `toml:"rollup-conf" json:"-"`
// MaxDataPoints sets the maximum for the maxDataPoints parameter.
MaxDataPoints int `toml:"max-data-points" json:"max-data-points"`
// InternalAggregation controls whether ClickHouse itself or graphite-clickhouse aggregates points to the proper retention
InternalAggregation bool `toml:"internal-aggregation" json:"internal-aggregation"`
}

type Tags struct {
@@ -168,6 +172,8 @@ func New() *Config {
TagTable: "",
TaggedAutocompleDays: 7,
ConnectTimeout: &Duration{Duration: time.Second},
MaxDataPoints: 4096, // Default until https://github.com/ClickHouse/ClickHouse/pull/13947
InternalAggregation: false,
},
Tags: Tags{
Date: "2016-11-01",
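For reference, the two new options correspond to the max-data-points and internal-aggregation keys of the ClickHouse part of the TOML config, per the struct tags above. A minimal sketch of overriding them in code; config.New() and the field names come from this diff, while the cfg.ClickHouse path and the helper itself are assumptions:

// Illustrative only: override the ClickHouse options added in this commit.
package example

import "github.com/lomik/graphite-clickhouse/config"

func tuneClickHouse() *config.Config {
    cfg := config.New()
    // Upper bound for maxDataPoints; 4096 is the default until the ClickHouse
    // pull request referenced above is released.
    cfg.ClickHouse.MaxDataPoints = 4096
    // Ask ClickHouse to aggregate points to the proper retention itself instead
    // of doing it inside graphite-clickhouse (disabled by default in this commit).
    cfg.ClickHouse.InternalAggregation = true
    return cfg
}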
4 changes: 4 additions & 0 deletions helper/point/points.go
@@ -52,6 +52,10 @@ func (pp *Points) List() []Point {
return pp.list
}

func (pp *Points) ReplaceList(list []Point) {
pp.list = list
}

func (pp *Points) Len() int {
return len(pp.list)
}
41 changes: 41 additions & 0 deletions helper/rollup/rules.go
@@ -226,3 +226,44 @@ func (r *Rules) RollupMetric(metricName string, from uint32, points []point.Poin
}
return r.RollupMetricAge(metricName, age, points)
}

// RollupPoints groups sorted Points by metric name and applies rollup one metric at a time
func (r *Rules) RollupPoints(pp *point.Points, from uint32) error {
var i, n int
// i - current position of iterator
// n - position of the first record with current metric
l := pp.Len()
if l == 0 {
return nil
}
oldPoints := pp.List()
newPoints := make([]point.Point, 0)
rollup := func(p []point.Point) ([]point.Point, error) {
metricName := pp.MetricName(p[0].MetricID)
p, _, err := r.RollupMetric(metricName, from, p)
for i := range p {
p[i].MetricID = p[0].MetricID
}
return p, err
}

for i = 1; i < l; i++ {
if oldPoints[i].MetricID != oldPoints[n].MetricID {
points, err := rollup(oldPoints[n:i])
if err != nil {
return err
}
newPoints = append(newPoints, points...)
n = i
continue
}
}

points, err := rollup(oldPoints[n:i])
if err != nil {
return err
}
newPoints = append(newPoints, points...)
pp.ReplaceList(newPoints)
return nil
}
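Per the commit message, RollupPoints is meant to run right after data fetching. A minimal sketch of such a call site; the helper below is hypothetical, only Sort, Uniq and RollupPoints are taken from this repository:

// Hypothetical call site for the new Rules.RollupPoints.
package example

import (
    "github.com/lomik/graphite-clickhouse/helper/point"
    "github.com/lomik/graphite-clickhouse/helper/rollup"
)

func rollupFetched(rules *rollup.Rules, pp *point.Points, from uint32) error {
    // RollupPoints walks consecutive runs of the same MetricID, so the points
    // must be sorted (and deduplicated) before it is called.
    pp.Sort()
    pp.Uniq()
    return rules.RollupPoints(pp, from)
}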
10 changes: 9 additions & 1 deletion pkg/alias/map.go
@@ -1,6 +1,8 @@
package alias

import (
"sync"

"github.com/lomik/graphite-clickhouse/finder"
"github.com/lomik/graphite-clickhouse/pkg/reverse"
)
@@ -14,11 +16,15 @@ type Value struct {
// Map from real metric name to display name and target
type Map struct {
data map[string][]Value
lock sync.RWMutex
}

// New returns new Map
func New() *Map {
return &Map{data: make(map[string][]Value)}
return &Map{
data: make(map[string][]Value),
lock: sync.RWMutex{},
}
}

// Merge data from finder.Result into aliases map
@@ -35,11 +41,13 @@ func (m *Map) MergeTarget(r finder.Result, target string) {
continue
}
abs := string(r.Abs(series[i]))
m.lock.Lock()
if x, ok := m.data[key]; ok {
m.data[key] = append(x, Value{Target: target, DisplayName: abs})
} else {
m.data[key] = []Value{Value{Target: target, DisplayName: abs}}
}
m.lock.Unlock()
}
}

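The RWMutex added above makes MergeTarget safe to call from several goroutines, which matters now that fetch requests are made concurrently. A sketch of that usage pattern, assuming a hypothetical map of finder results keyed by target:

// Hypothetical concurrent use of alias.Map; only New and MergeTarget come from this diff.
package example

import (
    "sync"

    "github.com/lomik/graphite-clickhouse/finder"
    "github.com/lomik/graphite-clickhouse/pkg/alias"
)

func mergeConcurrently(results map[string]finder.Result) *alias.Map {
    am := alias.New()
    var wg sync.WaitGroup
    for target, res := range results {
        wg.Add(1)
        go func(target string, res finder.Result) {
            defer wg.Done()
            am.MergeTarget(res, target) // safe thanks to the new lock
        }(target, res)
    }
    wg.Wait()
    return am
}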
46 changes: 10 additions & 36 deletions prometheus/querier_select.go
@@ -3,15 +3,10 @@
package prometheus

import (
"fmt"
"time"

"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/finder"
"github.com/lomik/graphite-clickhouse/helper/clickhouse"
"github.com/lomik/graphite-clickhouse/pkg/alias"
"github.com/lomik/graphite-clickhouse/pkg/scope"
"github.com/lomik/graphite-clickhouse/pkg/where"
"github.com/lomik/graphite-clickhouse/render"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
@@ -72,46 +67,25 @@ func (q *Querier) Select(selectParams *storage.SelectParams, labelsMatcher ...*l
return newMetricsSet(am.DisplayNames()), nil, nil
}

pointsTable, isReverse, rollupRules := render.SelectDataTable(q.config, from.Unix(), until.Unix(), []string{}, config.ContextPrometheus)
if pointsTable == "" {
return nil, nil, fmt.Errorf("data table is not specified")
}

wr := where.New()
wr.And(where.In("Path", am.Series(isReverse)))
wr.And(where.TimestampBetween("Time", from.Unix(), until.Unix()+1))

pw := where.New()
pw.And(where.DateBetween("Date", from, until))

query := fmt.Sprintf(render.QUERY,
pointsTable, pw.PreWhereSQL(), wr.SQL(),
)

body, err := clickhouse.Reader(
scope.WithTable(q.ctx, pointsTable),
q.config.ClickHouse.Url,
query,
clickhouse.Options{Timeout: q.config.ClickHouse.DataTimeout.Value(), ConnectTimeout: q.config.ClickHouse.ConnectTimeout.Value()},
)
maxDataPoints := (until.Unix() - from.Unix()) / (selectParams.Step / 1000)

if err != nil {
return nil, nil, err
fetchRequests := render.MultiFetchRequest{
render.TimeFrame{
From: from.Unix(),
Until: until.Unix(),
MaxDataPoints: maxDataPoints,
}: []string{},
}

data, err := render.DataParse(body, nil, isReverse)
reply, err := render.FetchDataPoints(q.ctx, q.config, am, fetchRequests)
if err != nil {
return nil, nil, err
}

data.Points.Sort()
data.Points.Uniq()

if data.Points.Len() == 0 {
if len(reply) == 0 {
return emptySeriesSet(), nil, nil
}

ss, err := makeSeriesSet(data, am, rollupRules)
ss, err := makeSeriesSet(reply[0].Data, am)
if err != nil {
return nil, nil, err
}
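A quick worked example of the maxDataPoints arithmetic above: Prometheus supplies Step in milliseconds, so Step/1000 is the step in seconds, and maxDataPoints is the requested time range divided by that step. The concrete numbers below are made up for illustration:

// Worked example of the maxDataPoints computation in Querier.Select.
package main

import "fmt"

func main() {
    var from, until int64 = 1598313600, 1598317200 // a one-hour range in Unix seconds
    var stepMs int64 = 60000                       // a 60s Prometheus step, in milliseconds
    maxDataPoints := (until - from) / (stepMs / 1000)
    fmt.Println(maxDataPoints) // 3600 / 60 = 60 points
}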
78 changes: 10 additions & 68 deletions prometheus/read.go
@@ -4,26 +4,17 @@ package prometheus

import (
"context"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/finder"
"github.com/lomik/graphite-clickhouse/helper/clickhouse"
"github.com/lomik/graphite-clickhouse/helper/point"
"github.com/lomik/graphite-clickhouse/helper/rollup"
"github.com/lomik/graphite-clickhouse/pkg/alias"
"github.com/lomik/graphite-clickhouse/pkg/dry"
"github.com/lomik/graphite-clickhouse/pkg/scope"
"github.com/lomik/graphite-clickhouse/pkg/where"
"github.com/lomik/graphite-clickhouse/render"
"github.com/prometheus/prometheus/prompb"
"go.uber.org/zap"
)

func (h *Handler) series(ctx context.Context, q *prompb.Query) (*alias.Map, error) {
@@ -50,66 +41,22 @@ func (h *Handler) queryData(ctx context.Context, q *prompb.Query, am *alias.Map)

fromTimestamp := q.StartTimestampMs / 1000
untilTimestamp := q.EndTimestampMs / 1000

pointsTable, isReverse, rollupObj := render.SelectDataTable(h.config, fromTimestamp, untilTimestamp, []string{}, config.ContextPrometheus)
if pointsTable == "" {
err := fmt.Errorf("data table is not specified")
scope.Logger(ctx).Error("select data table failed", zap.Error(err))
return nil, err
}

var maxStep uint32

now := time.Now().Unix()
age := uint32(dry.Max(0, now-fromTimestamp))
series := am.Series(isReverse)

for _, m := range series {
step, _ := rollupObj.Lookup(m, age)
if step > maxStep {
maxStep = step
}
}
pw := where.New()
pw.Andf(
"Date >='%s' AND Date <= '%s'",
time.Unix(fromTimestamp, 0).Format("2006-01-02"),
time.Unix(untilTimestamp, 0).Format("2006-01-02"),
)

wr := where.New()
wr.And(where.In("Path", series))

until := untilTimestamp - untilTimestamp%int64(maxStep) + int64(maxStep) - 1
wr.Andf("Time >= %d AND Time <= %d", fromTimestamp, until)

query := fmt.Sprintf(render.QUERY,
pointsTable, pw.PreWhereSQL(), wr.SQL(),
)

body, err := clickhouse.Reader(
scope.WithTable(ctx, pointsTable),
h.config.ClickHouse.Url,
query,
clickhouse.Options{Timeout: h.config.ClickHouse.DataTimeout.Value(), ConnectTimeout: h.config.ClickHouse.ConnectTimeout.Value()},
)

if err != nil {
return nil, err
}

data, err := render.DataParse(body, nil, false)
fetchRequests := render.MultiFetchRequest{
render.TimeFrame{
From: fromTimestamp,
Until: untilTimestamp,
MaxDataPoints: int64(h.config.ClickHouse.MaxDataPoints),
}: []string{},
}
reply, err := render.FetchDataPoints(ctx, h.config, am, fetchRequests)
if err != nil {
return nil, err
}

data.Points.Sort()
data.Points.Uniq()

return h.makeQueryResult(ctx, data, rollupObj, am, uint32(fromTimestamp), uint32(untilTimestamp))
return h.makeQueryResult(ctx, reply[0].Data, am, uint32(fromTimestamp), uint32(untilTimestamp))
}

func (h *Handler) makeQueryResult(ctx context.Context, data *render.Data, rollupObj *rollup.Rules, am *alias.Map, from, until uint32) (*prompb.QueryResult, error) {
func (h *Handler) makeQueryResult(ctx context.Context, data *render.Data, am *alias.Map, from, until uint32) (*prompb.QueryResult, error) {
if data == nil {
return &prompb.QueryResult{}, nil
}
@@ -126,11 +73,6 @@ func (h *Handler) makeQueryResult(ctx context.Context, data *render.Data, rollup

writeMetric := func(points []point.Point) {
metricName := data.Points.MetricName(points[0].MetricID)
points, _, err := rollupObj.RollupMetric(metricName, from, points)
if err != nil {
scope.Logger(ctx).Error("rollup failed", zap.Error(err))
return
}

for _, dn := range am.Get(metricName) {
u, err := url.Parse(dn.DisplayName)
8 changes: 1 addition & 7 deletions prometheus/series_set.go
@@ -4,7 +4,6 @@ package prometheus

import (
"github.com/lomik/graphite-clickhouse/helper/point"
"github.com/lomik/graphite-clickhouse/helper/rollup"
"github.com/lomik/graphite-clickhouse/pkg/alias"

"github.com/lomik/graphite-clickhouse/render"
@@ -33,7 +32,7 @@ type seriesSet struct {

var _ storage.SeriesSet = &seriesSet{}

func makeSeriesSet(data *render.Data, am *alias.Map, rollupRules *rollup.Rules) (storage.SeriesSet, error) {
func makeSeriesSet(data *render.Data, am *alias.Map) (storage.SeriesSet, error) {
ss := &seriesSet{series: make([]series, 0), current: -1}
if data == nil {
return ss, nil
@@ -48,11 +47,6 @@ func makeSeriesSet(data *render.Data, am *alias.Map, rollupRules *rollup.Rules)
appendSeries := func(metricID uint32, points []point.Point) error {
metricName := data.Points.MetricName(metricID)

points, _, err := rollupRules.RollupMetric(metricName, points[0].Time, points)
if err != nil {
return err
}

for _, v := range am.Get(metricName) {
ss.series = append(ss.series, series{metricName: v.DisplayName, points: points})
}