diff --git a/config/config.go b/config/config.go
index 6d451b5a1..a165e7500 100644
--- a/config/config.go
+++ b/config/config.go
@@ -42,10 +42,10 @@ func (d *Duration) Value() time.Duration {
 }
 
 type Common struct {
-	Listen string `toml:"listen" json:"listen"`
 	// MetricPrefix string `toml:"metric-prefix"`
 	// MetricInterval *Duration `toml:"metric-interval"`
 	// MetricEndpoint string `toml:"metric-endpoint"`
+	Listen                 string   `toml:"listen" json:"listen"`
 	MaxCPU                 int      `toml:"max-cpu" json:"max-cpu"`
 	MaxMetricsInFindAnswer int      `toml:"max-metrics-in-find-answer" json:"max-metrics-in-find-answer"` //zero means infinite
 	TargetBlacklist        []string `toml:"target-blacklist" json:"target-blacklist"`
@@ -71,6 +71,10 @@ type ClickHouse struct {
 	ConnectTimeout   *Duration `toml:"connect-timeout" json:"connect-timeout"`
 	DataTableLegacy  string    `toml:"data-table" json:"data-table"`
 	RollupConfLegacy string    `toml:"rollup-conf" json:"-"`
+	// MaxDataPoints sets the maximum for the maxDataPoints query parameter.
+	MaxDataPoints int `toml:"max-data-points" json:"max-data-points"`
+	// InternalAggregation controls whether ClickHouse itself or graphite-clickhouse aggregates points to the proper retention
+	InternalAggregation bool `toml:"internal-aggregation" json:"internal-aggregation"`
 }
 
 type Tags struct {
@@ -168,6 +172,8 @@ func New() *Config {
 			TagTable:             "",
 			TaggedAutocompleDays: 7,
 			ConnectTimeout:       &Duration{Duration: time.Second},
+			MaxDataPoints:        4096, // Default until https://github.com/ClickHouse/ClickHouse/pull/13947
+			InternalAggregation:  false,
 		},
 		Tags: Tags{
 			Date: "2016-11-01",
diff --git a/helper/point/points.go b/helper/point/points.go
index a1307cea9..49aa198a0 100644
--- a/helper/point/points.go
+++ b/helper/point/points.go
@@ -52,6 +52,10 @@ func (pp *Points) List() []Point {
 	return pp.list
 }
 
+// ReplaceList replaces the underlying list of points
+func (pp *Points) ReplaceList(list []Point) {
+	pp.list = list
+}
+
 func (pp *Points) Len() int {
 	return len(pp.list)
 }
diff --git a/helper/rollup/rules.go b/helper/rollup/rules.go
index d78fb7ed0..87f4ecaaf 100644
--- a/helper/rollup/rules.go
+++ b/helper/rollup/rules.go
@@ -226,3 +226,44 @@ func (r *Rules) RollupMetric(metricName string, from uint32, points []point.Poin
 	}
 	return r.RollupMetricAge(metricName, age, points)
 }
+
+// RollupPoints groups sorted Points by metric name and applies rollup to each group
+func (r *Rules) RollupPoints(pp *point.Points, from uint32) error {
+	var i, n int
+	// i - current position of the iterator
+	// n - position of the first record with the current metric
+	l := pp.Len()
+	if l == 0 {
+		return nil
+	}
+	oldPoints := pp.List()
+	newPoints := make([]point.Point, 0)
+	rollup := func(p []point.Point) ([]point.Point, error) {
+		metricName := pp.MetricName(p[0].MetricID)
+		p, _, err := r.RollupMetric(metricName, from, p)
+		for i := range p {
+			p[i].MetricID = p[0].MetricID
+		}
+		return p, err
+	}
+
+	for i = 1; i < l; i++ {
+		if oldPoints[i].MetricID != oldPoints[n].MetricID {
+			points, err := rollup(oldPoints[n:i])
+			if err != nil {
+				return err
+			}
+			newPoints = append(newPoints, points...)
+			n = i
+		}
+	}
+
+	// roll up the tail: the points of the last metric in the list
+	points, err := rollup(oldPoints[n:i])
+	if err != nil {
+		return err
+	}
+	newPoints = append(newPoints, points...)
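+	// all groups are rolled up with their MetricIDs preserved;
+	// replace the backing list wholesale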
+	pp.ReplaceList(newPoints)
+	return nil
+}
diff --git a/pkg/alias/map.go b/pkg/alias/map.go
index 58c2f7288..a1a510489 100644
--- a/pkg/alias/map.go
+++ b/pkg/alias/map.go
@@ -1,6 +1,8 @@
 package alias
 
 import (
+	"sync"
+
 	"github.com/lomik/graphite-clickhouse/finder"
 	"github.com/lomik/graphite-clickhouse/pkg/reverse"
 )
@@ -14,11 +16,15 @@ type Value struct {
 
 // Map from real metric name to display name and target
 type Map struct {
 	data map[string][]Value
+	lock sync.RWMutex
 }
 
 // New returns new Map
 func New() *Map {
-	return &Map{data: make(map[string][]Value)}
+	return &Map{
+		data: make(map[string][]Value),
+		lock: sync.RWMutex{},
+	}
 }
 
 // Merge data from finder.Result into aliases map
@@ -35,11 +41,13 @@ func (m *Map) MergeTarget(r finder.Result, target string) {
 			continue
 		}
 		abs := string(r.Abs(series[i]))
+		// MergeTarget may now be called from concurrent finder goroutines
+		m.lock.Lock()
 		if x, ok := m.data[key]; ok {
 			m.data[key] = append(x, Value{Target: target, DisplayName: abs})
 		} else {
 			m.data[key] = []Value{Value{Target: target, DisplayName: abs}}
 		}
+		m.lock.Unlock()
 	}
 }
diff --git a/prometheus/querier_select.go b/prometheus/querier_select.go
index 4b96a5ab6..8e4939255 100644
--- a/prometheus/querier_select.go
+++ b/prometheus/querier_select.go
@@ -3,15 +3,10 @@ package prometheus
 
 import (
-	"fmt"
 	"time"
 
-	"github.com/lomik/graphite-clickhouse/config"
 	"github.com/lomik/graphite-clickhouse/finder"
-	"github.com/lomik/graphite-clickhouse/helper/clickhouse"
 	"github.com/lomik/graphite-clickhouse/pkg/alias"
-	"github.com/lomik/graphite-clickhouse/pkg/scope"
-	"github.com/lomik/graphite-clickhouse/pkg/where"
 	"github.com/lomik/graphite-clickhouse/render"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/storage"
@@ -72,46 +67,25 @@ func (q *Querier) Select(selectParams *storage.SelectParams, labelsMatcher ...*l
 		return newMetricsSet(am.DisplayNames()), nil, nil
 	}
 
-	pointsTable, isReverse, rollupRules := render.SelectDataTable(q.config, from.Unix(), until.Unix(), []string{}, config.ContextPrometheus)
-	if pointsTable == "" {
-		return nil, nil, fmt.Errorf("data table is not specified")
-	}
-
-	wr := where.New()
-	wr.And(where.In("Path", am.Series(isReverse)))
-	wr.And(where.TimestampBetween("Time", from.Unix(), until.Unix()+1))
-
-	pw := where.New()
-	pw.And(where.DateBetween("Date", from, until))
-
-	query := fmt.Sprintf(render.QUERY,
-		pointsTable, pw.PreWhereSQL(), wr.SQL(),
-	)
-
-	body, err := clickhouse.Reader(
-		scope.WithTable(q.ctx, pointsTable),
-		q.config.ClickHouse.Url,
-		query,
-		clickhouse.Options{Timeout: q.config.ClickHouse.DataTimeout.Value(), ConnectTimeout: q.config.ClickHouse.ConnectTimeout.Value()},
-	)
+	// selectParams.Step is in milliseconds; guard against steps below one second
+	maxDataPoints := int64(0)
+	if step := selectParams.Step / 1000; step > 0 {
+		maxDataPoints = (until.Unix() - from.Unix()) / step
+	}
 
-	if err != nil {
-		return nil, nil, err
+	fetchRequests := render.MultiFetchRequest{
+		render.TimeFrame{
+			From:          from.Unix(),
+			Until:         until.Unix(),
+			MaxDataPoints: maxDataPoints,
+		}: []string{},
 	}
-
-	data, err := render.DataParse(body, nil, isReverse)
+	reply, err := render.FetchDataPoints(q.ctx, q.config, am, fetchRequests)
 	if err != nil {
 		return nil, nil, err
 	}
 
-	data.Points.Sort()
-	data.Points.Uniq()
-
-	if data.Points.Len() == 0 {
+	if len(reply) == 0 {
 		return emptySeriesSet(), nil, nil
 	}
 
-	ss, err := makeSeriesSet(data, am, rollupRules)
+	ss, err := makeSeriesSet(reply[0].Data, am)
 	if err != nil {
 		return nil, nil, err
 	}
diff --git a/prometheus/read.go b/prometheus/read.go
index f510bfd1b..2349cf4a3 100644
--- a/prometheus/read.go
+++ b/prometheus/read.go
@@ -4,26 +4,17 @@ package prometheus
 
 import (
 	"context"
-	"fmt"
"io/ioutil" "net/http" "net/url" - "time" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" - "github.com/lomik/graphite-clickhouse/config" "github.com/lomik/graphite-clickhouse/finder" - "github.com/lomik/graphite-clickhouse/helper/clickhouse" "github.com/lomik/graphite-clickhouse/helper/point" - "github.com/lomik/graphite-clickhouse/helper/rollup" "github.com/lomik/graphite-clickhouse/pkg/alias" - "github.com/lomik/graphite-clickhouse/pkg/dry" - "github.com/lomik/graphite-clickhouse/pkg/scope" - "github.com/lomik/graphite-clickhouse/pkg/where" "github.com/lomik/graphite-clickhouse/render" "github.com/prometheus/prometheus/prompb" - "go.uber.org/zap" ) func (h *Handler) series(ctx context.Context, q *prompb.Query) (*alias.Map, error) { @@ -50,66 +41,22 @@ func (h *Handler) queryData(ctx context.Context, q *prompb.Query, am *alias.Map) fromTimestamp := q.StartTimestampMs / 1000 untilTimestamp := q.EndTimestampMs / 1000 - - pointsTable, isReverse, rollupObj := render.SelectDataTable(h.config, fromTimestamp, untilTimestamp, []string{}, config.ContextPrometheus) - if pointsTable == "" { - err := fmt.Errorf("data table is not specified") - scope.Logger(ctx).Error("select data table failed", zap.Error(err)) - return nil, err - } - - var maxStep uint32 - - now := time.Now().Unix() - age := uint32(dry.Max(0, now-fromTimestamp)) - series := am.Series(isReverse) - - for _, m := range series { - step, _ := rollupObj.Lookup(m, age) - if step > maxStep { - maxStep = step - } - } - pw := where.New() - pw.Andf( - "Date >='%s' AND Date <= '%s'", - time.Unix(fromTimestamp, 0).Format("2006-01-02"), - time.Unix(untilTimestamp, 0).Format("2006-01-02"), - ) - - wr := where.New() - wr.And(where.In("Path", series)) - - until := untilTimestamp - untilTimestamp%int64(maxStep) + int64(maxStep) - 1 - wr.Andf("Time >= %d AND Time <= %d", fromTimestamp, until) - - query := fmt.Sprintf(render.QUERY, - pointsTable, pw.PreWhereSQL(), wr.SQL(), - ) - - body, err := clickhouse.Reader( - scope.WithTable(ctx, pointsTable), - h.config.ClickHouse.Url, - query, - clickhouse.Options{Timeout: h.config.ClickHouse.DataTimeout.Value(), ConnectTimeout: h.config.ClickHouse.ConnectTimeout.Value()}, - ) - - if err != nil { - return nil, err - } - - data, err := render.DataParse(body, nil, false) + fetchRequests := render.MultiFetchRequest{ + render.TimeFrame{ + From: fromTimestamp, + Until: untilTimestamp, + MaxDataPoints: int64(h.config.ClickHouse.MaxDataPoints), + }: []string{}, + } + reply, err := render.FetchDataPoints(ctx, h.config, am, fetchRequests) if err != nil { return nil, err } - data.Points.Sort() - data.Points.Uniq() - - return h.makeQueryResult(ctx, data, rollupObj, am, uint32(fromTimestamp), uint32(untilTimestamp)) + return h.makeQueryResult(ctx, reply[0].Data, am, uint32(fromTimestamp), uint32(untilTimestamp)) } -func (h *Handler) makeQueryResult(ctx context.Context, data *render.Data, rollupObj *rollup.Rules, am *alias.Map, from, until uint32) (*prompb.QueryResult, error) { +func (h *Handler) makeQueryResult(ctx context.Context, data *render.Data, am *alias.Map, from, until uint32) (*prompb.QueryResult, error) { if data == nil { return &prompb.QueryResult{}, nil } @@ -126,11 +73,6 @@ func (h *Handler) makeQueryResult(ctx context.Context, data *render.Data, rollup writeMetric := func(points []point.Point) { metricName := data.Points.MetricName(points[0].MetricID) - points, _, err := rollupObj.RollupMetric(metricName, from, points) - if err != nil { - scope.Logger(ctx).Error("rollup failed", zap.Error(err)) - 
-			return
-		}
 
 		for _, dn := range am.Get(metricName) {
 			u, err := url.Parse(dn.DisplayName)
diff --git a/prometheus/series_set.go b/prometheus/series_set.go
index 8b9c17ecd..20694e4ae 100644
--- a/prometheus/series_set.go
+++ b/prometheus/series_set.go
@@ -4,7 +4,6 @@ package prometheus
 
 import (
 	"github.com/lomik/graphite-clickhouse/helper/point"
-	"github.com/lomik/graphite-clickhouse/helper/rollup"
 	"github.com/lomik/graphite-clickhouse/pkg/alias"
 	"github.com/lomik/graphite-clickhouse/render"
 
@@ -33,7 +32,7 @@ type seriesSet struct {
 
 var _ storage.SeriesSet = &seriesSet{}
 
-func makeSeriesSet(data *render.Data, am *alias.Map, rollupRules *rollup.Rules) (storage.SeriesSet, error) {
+func makeSeriesSet(data *render.Data, am *alias.Map) (storage.SeriesSet, error) {
 	ss := &seriesSet{series: make([]series, 0), current: -1}
 	if data == nil {
 		return ss, nil
@@ -48,11 +47,6 @@ func makeSeriesSet(data *render.Data, am *alias.Map, rollupRules *rollup.Rules)
 	appendSeries := func(metricID uint32, points []point.Point) error {
 		metricName := data.Points.MetricName(metricID)
-		points, _, err := rollupRules.RollupMetric(metricName, points[0].Time, points)
-		if err != nil {
-			return err
-		}
-
 		for _, v := range am.Get(metricName) {
 			ss.series = append(ss.series, series{metricName: v.DisplayName, points: points})
 		}
diff --git a/render/data.go b/render/data.go
index 73b48d62e..a08f2e5ac 100644
--- a/render/data.go
+++ b/render/data.go
@@ -3,42 +3,38 @@ package render
 
 import (
 	"bufio"
 	"bytes"
+	"context"
 	"encoding/binary"
 	"errors"
+	"fmt"
 	"io"
 	"math"
+	"sync"
+	"time"
 
+	"github.com/lomik/graphite-clickhouse/config"
 	"github.com/lomik/graphite-clickhouse/helper/clickhouse"
 	"github.com/lomik/graphite-clickhouse/helper/point"
+	"github.com/lomik/graphite-clickhouse/helper/rollup"
 	"github.com/lomik/graphite-clickhouse/pkg/alias"
+	"github.com/lomik/graphite-clickhouse/pkg/dry"
 	"github.com/lomik/graphite-clickhouse/pkg/reverse"
+	"github.com/lomik/graphite-clickhouse/pkg/scope"
+	"github.com/lomik/graphite-clickhouse/pkg/where"
+	"go.uber.org/zap"
+
+	graphitePickle "github.com/lomik/graphite-pickle"
 )
 
-var errUvarintRead = errors.New("ReadUvarint: Malformed array")
-var errUvarintOverflow = errors.New("ReadUvarint: varint overflows a 64-bit integer")
 var errClickHouseResponse = errors.New("Malformed response from clickhouse")
 
 // QUERY to get data from ClickHouse
const QUERY = `SELECT Path, groupArray(Time), groupArray(Value), groupArray(Timestamp) FROM %s %s %s GROUP BY Path FORMAT RowBinary`
 
-func ReadUvarint(array []byte) (uint64, int, error) {
-	var x uint64
-	var s uint
-	l := len(array) - 1
-	for i := 0; ; i++ {
-		if i > l {
-			return x, i + 1, errUvarintRead
-		}
-		if array[i] < 0x80 {
-			if i > 9 || i == 9 && array[i] > 1 {
-				return x, i + 1, errUvarintOverflow
-			}
-			return x | uint64(array[i])<<s, i + 1, nil
-		}
-		x |= uint64(array[i]&0x7f) << s
-		s += 7
-	}
-}
+// CHResponse contains the parsed Data and the rollup rules applied for one TimeFrame
+type CHResponse struct {
+	Data      *Data
+	RollupObj *rollup.Rules
+	From      int64
+	Until     int64
+}
+
+// TimeFrame is a request key: the time range and the maxDataPoints limit
+type TimeFrame struct {
+	From          int64
+	Until         int64
+	MaxDataPoints int64
+}
+
+// MultiFetchRequest maps a TimeFrame to the list of target expressions
+type MultiFetchRequest map[TimeFrame][]string
+
+// queryCarbonlink returns a callable result fetcher
+func queryCarbonlink(parentCtx context.Context, cfg *config.Config, metrics []string) func() *point.Points {
+	if cfg.Carbonlink.Server == "" {
+		return func() *point.Points { return nil }
+	}
+
+	carbonlink := graphitePickle.NewCarbonlinkClient(
+		cfg.Carbonlink.Server,
+		cfg.Carbonlink.Retries,
+		cfg.Carbonlink.Threads,
+		cfg.Carbonlink.ConnectTimeout.Value(),
+		cfg.Carbonlink.QueryTimeout.Value(),
+	)
+
+	carbonlinkResponseChan := make(chan *point.Points, 1)
+
+	fetchResult := func() *point.Points {
+		result := <-carbonlinkResponseChan
+		return result
+	}
+
+	go func() {
+		ctx, cancel := context.WithTimeout(parentCtx, cfg.Carbonlink.TotalTimeout.Value())
+		defer cancel()
+
+		res, err := carbonlink.CacheQueryMulti(ctx, metrics)
+		if err != nil {
+			scope.Logger(parentCtx).Info("carbonlink failed", zap.Error(err))
+		}
+
+		result := point.NewPoints()
+
+		if res != nil && len(res) > 0 {
+			tm := uint32(time.Now().Unix())
+
+			for metric, points := range res {
+				metricID := result.MetricID(metric)
+				for _, p := range points {
+					result.AppendPoint(metricID, p.Value, uint32(p.Timestamp), tm)
+				}
+			}
+		}
+
+		carbonlinkResponseChan <- result
+	}()
+
+	return fetchResult
+}
+
+// FetchDataPoints fetches the data from ClickHouse and parses it into []CHResponse
+func FetchDataPoints(ctx context.Context, cfg *config.Config, am *alias.Map, fetchRequests MultiFetchRequest) ([]CHResponse, error) {
+	logger := scope.Logger(ctx)
+	var rollupTime time.Duration
+	var lock sync.RWMutex
+	var wg sync.WaitGroup
+	defer func() {
+		logger.Debug(
+			"rollup",
+			zap.String("runtime", rollupTime.String()),
zap.Duration("runtime_ns", rollupTime), + ) + }() + + reply := make([]CHResponse, 0, len(fetchRequests)) + errors := make([]error, 0, len(fetchRequests)) + fetch := func(tf TimeFrame, targets []string) { + defer wg.Done() + fromTimestamp := tf.From + untilTimestamp := tf.Until + + pointsTable, isReverse, rollupObj := SelectDataTable(cfg, fromTimestamp, untilTimestamp, targets, config.ContextGraphite) + if pointsTable == "" { + err := fmt.Errorf("data tables is not specified for %v", targets[0]) + logger.Error("data tables is not specified", zap.Error(err)) + // http.Error(w, err.Error(), http.StatusInternalServerError) + lock.Lock() + errors = append(errors, err) + lock.Unlock() + return + } + + var maxStep uint32 + + now := time.Now().Unix() + age := uint32(dry.Max(0, now-fromTimestamp)) + + metricList := am.Series(isReverse) + + if len(metricList) == 0 { + return + } + + // calculate max step + for _, m := range metricList { + step, _ := rollupObj.Lookup(m, age) + if step > maxStep { + maxStep = step + } + } + + pw := where.New() + pw.And(where.DateBetween("Date", time.Unix(fromTimestamp, 0), time.Unix(untilTimestamp, 0))) + + wr := where.New() + wr.And(where.In("Path", metricList)) + + until := untilTimestamp - untilTimestamp%int64(maxStep) + int64(maxStep) - 1 + wr.And(where.TimestampBetween("Time", fromTimestamp, until)) + + query := fmt.Sprintf( + QUERY, + pointsTable, pw.PreWhereSQL(), wr.SQL(), + ) + + // from carbonlink request + carbonlinkResponseRead := queryCarbonlink(ctx, cfg, metricList) + + body, err := clickhouse.Reader( + scope.WithTable(ctx, pointsTable), + cfg.ClickHouse.Url, + query, + clickhouse.Options{Timeout: cfg.ClickHouse.DataTimeout.Value(), ConnectTimeout: cfg.ClickHouse.ConnectTimeout.Value()}, + ) + + if err != nil { + logger.Error("reader", zap.Error(err)) + lock.Lock() + errors = append(errors, err) + lock.Unlock() + return + } + + // fetch carbonlink response + carbonlinkData := carbonlinkResponseRead() + + parseStart := time.Now() + + // pass carbonlinkData to DataParse + data, err := DataParse(body, carbonlinkData, isReverse) + + if err != nil { + logger.Error("data", zap.Error(err), zap.Int("read_bytes", data.length)) + lock.Lock() + errors = append(errors, err) + lock.Unlock() + return + } + logger.Info("render", zap.Int("read_bytes", data.length), zap.Int("read_points", data.Points.Len())) + + d := time.Since(parseStart) + logger.Debug("parse", zap.String("runtime", d.String()), zap.Duration("runtime_ns", d)) + + sortStart := time.Now() + data.Points.Sort() + d = time.Since(sortStart) + logger.Debug("sort", zap.String("runtime", d.String()), zap.Duration("runtime_ns", d)) + + data.Points.Uniq() + data.Aliases = am + rollupStart := time.Now() + err = rollupObj.RollupPoints(data.Points, uint32(fromTimestamp)) + if err != nil { + logger.Error("rollup failed", zap.Error(err)) + lock.Lock() + errors = append(errors, err) + lock.Unlock() + return + } + rollupTime += time.Since(rollupStart) + + lock.Lock() + reply = append(reply, CHResponse{ + Data: data, + RollupObj: rollupObj, + From: fromTimestamp, + Until: untilTimestamp, + }) + lock.Unlock() + } + + for tf, targets := range fetchRequests { + wg.Add(1) + go fetch(tf, targets) + } + wg.Wait() + for _, err := range errors { + if err != nil { + return EmptyResponse, err + } + } + + return reply, nil +} + +func queryUnaggregatedData() { + return +} func (d *Data) finalName(name string) string { s, ok := d.nameMap[name] @@ -62,7 +282,7 @@ func (d *Data) finalName(name string) string { // Error handler for 
 func splitErrorHandler(data *[]byte, atEOF bool, tokenLen int, err error) (int, []byte, error) {
-	if err == errUvarintRead {
+	if err == clickhouse.ErrUvarintRead {
 		if atEOF {
 			return 0, nil, clickhouse.NewErrDataParse(errClickHouseResponse.Error(), string(*data))
 		}
diff --git a/render/handler.go b/render/handler.go
index ea5e9ce4b..5e80958a8 100644
--- a/render/handler.go
+++ b/render/handler.go
@@ -1,11 +1,11 @@
 package render
 
 import (
-	"context"
 	"fmt"
 	"io/ioutil"
 	"net/http"
 	"strconv"
+	"sync"
 	"time"
 
 	"go.uber.org/zap"
@@ -14,19 +14,13 @@ import (
 	"github.com/lomik/graphite-clickhouse/config"
 	"github.com/lomik/graphite-clickhouse/finder"
 	"github.com/lomik/graphite-clickhouse/helper/clickhouse"
-	"github.com/lomik/graphite-clickhouse/helper/point"
-	"github.com/lomik/graphite-clickhouse/helper/rollup"
 	"github.com/lomik/graphite-clickhouse/pkg/alias"
 	"github.com/lomik/graphite-clickhouse/pkg/dry"
 	"github.com/lomik/graphite-clickhouse/pkg/scope"
-	"github.com/lomik/graphite-clickhouse/pkg/where"
-
-	graphitePickle "github.com/lomik/graphite-pickle"
 )
 
 type Handler struct {
-	config     *config.Config
-	carbonlink *graphitePickle.CarbonlinkClient
+	config *config.Config
 }
 
 func NewHandler(config *config.Config) *Handler {
@@ -34,79 +28,16 @@ func NewHandler(config *config.Config) *Handler {
 		config: config,
 	}
 
-	if config.Carbonlink.Server != "" {
-		h.carbonlink = graphitePickle.NewCarbonlinkClient(
-			config.Carbonlink.Server,
-			config.Carbonlink.Retries,
-			config.Carbonlink.Threads,
-			config.Carbonlink.ConnectTimeout.Value(),
-			config.Carbonlink.QueryTimeout.Value(),
-		)
-	}
 	return h
 }
 
-// returns callable result fetcher
-func (h *Handler) queryCarbonlink(parentCtx context.Context, logger *zap.Logger, metrics []string) func() *point.Points {
-	if h.carbonlink == nil {
-		return func() *point.Points { return nil }
-	}
-
-	carbonlinkResponseChan := make(chan *point.Points, 1)
-
-	fetchResult := func() *point.Points {
-		result := <-carbonlinkResponseChan
-		return result
-	}
-
-	go func() {
-		ctx, cancel := context.WithTimeout(parentCtx, h.config.Carbonlink.TotalTimeout.Value())
-		defer cancel()
-
-		res, err := h.carbonlink.CacheQueryMulti(ctx, metrics)
-
-		if err != nil {
-			logger.Info("carbonlink failed", zap.Error(err))
-		}
-
-		result := point.NewPoints()
-
-		if res != nil && len(res) > 0 {
-			tm := uint32(time.Now().Unix())
-
-			for metric, points := range res {
-				metricID := result.MetricID(metric)
-				for _, p := range points {
-					result.AppendPoint(metricID, p.Value, uint32(p.Timestamp), tm)
-				}
-			}
-		}
-
-		carbonlinkResponseChan <- result
-	}()
-
-	return fetchResult
-}
-
-type timeFrame struct {
-	from  int64
-	until int64
-}
-
-type chResponse struct {
-	data      *Data
-	rollupObj *rollup.Rules
-	from      int64
-	until     int64
-}
-
 func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	logger := scope.Logger(r.Context())
 
 	var prefix string
 	var err error
 
-	fetchRequests := make(map[timeFrame][]string)
+	fetchRequests := make(MultiFetchRequest)
 
 	r.ParseMultipartForm(1024 * 1024)
 
@@ -132,9 +63,10 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		q.Set("until", fmt.Sprintf("%d", pv3Request.Metrics[0].StopTime))
 
 		for _, m := range pv3Request.Metrics {
-			tf := timeFrame{
-				from:  m.StartTime,
-				until: m.StopTime,
+			tf := TimeFrame{
+				From:          m.StartTime,
+				Until:         m.StopTime,
+				MaxDataPoints: m.MaxDataPoints,
 			}
 			if _, ok := fetchRequests[tf]; ok {
 				fetchRequests[tf] = append(fetchRequests[tf], m.PathExpression)
@@ -153,133 +85,70 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 			return
 		}
 
 		untilTimestamp, err := strconv.ParseInt(r.FormValue("until"), 10, 32)
 		if err != nil {
 			http.Error(w, "Bad request (cannot parse until)", http.StatusBadRequest)
 			return
 		}
 
+		maxDataPoints, err := strconv.ParseInt(r.FormValue("maxDataPoints"), 10, 32)
+		if err != nil {
+			// maxDataPoints is optional, fall back to the configured limit
+			maxDataPoints = int64(h.config.ClickHouse.MaxDataPoints)
+		}
+
 		targets := dry.RemoveEmptyStrings(r.Form["target"])
-		tf := timeFrame{
-			from:  fromTimestamp,
-			until: untilTimestamp,
+		tf := TimeFrame{
+			From:          fromTimestamp,
+			Until:         untilTimestamp,
+			MaxDataPoints: maxDataPoints,
 		}
 		fetchRequests[tf] = targets
 	}
 
 	am := alias.New()
+	var wg sync.WaitGroup
+	var lock sync.RWMutex
+	errors := make([]error, 0, len(fetchRequests))
 	for tf, targets := range fetchRequests {
 		for _, target := range targets {
-			// Search in small index table first
-			fndResult, err := finder.Find(h.config, r.Context(), target, tf.from, tf.until)
-			if err != nil {
-				logger.Error("find", zap.Error(err))
-				clickhouse.HandleError(w, err)
-				return
-			}
+			wg.Add(1)
+			// tf and target are passed as arguments so every goroutine gets its own copy
+			go func(tf TimeFrame, target string) {
+				defer wg.Done()
+				// Search in small index table first
+				fndResult, err := finder.Find(h.config, r.Context(), target, tf.From, tf.Until)
+				lock.Lock()
+				errors = append(errors, err)
+				lock.Unlock()
+				if err != nil {
+					logger.Error("find", zap.Error(err))
+					return
+				}
 
-			am.MergeTarget(fndResult, target)
+				am.MergeTarget(fndResult, target)
+			}(tf, target)
 		}
 	}
-
-	logger.Info("finder", zap.Int("metrics", am.Len()))
-
-	reply := make([]chResponse, 0, len(fetchRequests))
-	metricCount := 0
-	for tf, targets := range fetchRequests {
-		fromTimestamp := tf.from
-		untilTimestamp := tf.until
-
-		pointsTable, isReverse, rollupObj := SelectDataTable(h.config, fromTimestamp, untilTimestamp, targets, config.ContextGraphite)
-		if pointsTable == "" {
-			logger.Error("data tables is not specified", zap.Error(err))
-			http.Error(w, err.Error(), http.StatusInternalServerError)
-			return
-		}
-
-		var maxStep uint32
-
-		now := time.Now().Unix()
-		age := uint32(dry.Max(0, now-fromTimestamp))
-
-		metricList := am.Series(isReverse)
-
-		if len(metricList) == 0 {
-			continue
-		}
-		metricCount += len(metricList)
-
-		// calculate max step
-		for _, m := range metricList {
-			step, _ := rollupObj.Lookup(m, age)
-			if step > maxStep {
-				maxStep = step
-			}
-		}
-
-		pw := where.New()
-		pw.And(where.DateBetween("Date", time.Unix(fromTimestamp, 0), time.Unix(untilTimestamp, 0)))
-
-		wr := where.New()
-		wr.And(where.In("Path", metricList))
-
-		until := untilTimestamp - untilTimestamp%int64(maxStep) + int64(maxStep) - 1
-		wr.And(where.TimestampBetween("Time", fromTimestamp, until))
-
-		query := fmt.Sprintf(QUERY,
-			pointsTable, pw.PreWhereSQL(), wr.SQL(),
-		)
-
-		// from carbonlink request
-		carbonlinkResponseRead := h.queryCarbonlink(r.Context(), logger, metricList)
-
-		body, err := clickhouse.Reader(
-			scope.WithTable(r.Context(), pointsTable),
-			h.config.ClickHouse.Url,
-			query,
-			clickhouse.Options{Timeout: h.config.ClickHouse.DataTimeout.Value(), ConnectTimeout: h.config.ClickHouse.ConnectTimeout.Value()},
-		)
-
-		if err != nil {
-			logger.Error("reader", zap.Error(err))
-			clickhouse.HandleError(w, err)
-			return
-		}
-
-		// fetch carbonlink response
-		carbonlinkData := carbonlinkResponseRead()
-
-		parseStart := time.Now()
-
-		// pass carbonlinkData to DataParse
-		data, err := DataParse(body, carbonlinkData, isReverse)
-
+	wg.Wait()
+	for _, err := range errors {
 		if err != nil {
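+			// the first error from the find goroutines aborts the whole request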
zap.Error(err), zap.Int("read_bytes", data.length)) clickhouse.HandleError(w, err) return } - logger.Info("render", zap.Int("read_bytes", data.length), zap.Int("read_points", data.Points.Len())) + } - d := time.Since(parseStart) - logger.Debug("parse", zap.String("runtime", d.String()), zap.Duration("runtime_ns", d)) + logger.Info("finder", zap.Int("metrics", am.Len())) - sortStart := time.Now() - data.Points.Sort() - d = time.Since(sortStart) - logger.Debug("sort", zap.String("runtime", d.String()), zap.Duration("runtime_ns", d)) + if am.Len() == 0 { + h.Reply(w, r, "", EmptyResponse) + return + } - data.Points.Uniq() - data.Aliases = am - reply = append(reply, chResponse{ - data: data, - from: fromTimestamp, - until: untilTimestamp, - rollupObj: rollupObj, - }) + reply, err := FetchDataPoints(r.Context(), h.config, am, fetchRequests) + if err != nil { + clickhouse.HandleError(w, err) } - if metricCount == 0 { + if len(reply) == 0 { h.Reply(w, r, "", EmptyResponse) return } @@ -288,13 +157,13 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.Reply(w, r, prefix, reply) } -func (h *Handler) Reply(w http.ResponseWriter, r *http.Request, prefix string, data []chResponse) { +func (h *Handler) Reply(w http.ResponseWriter, r *http.Request, prefix string, data []CHResponse) { start := time.Now() // All formats, except of carbonapi_v3_pb would have same from and until time, and data would contain only // one response switch r.FormValue("format") { case "pickle": - h.ReplyPickle(w, r, data[0].data, uint32(data[0].from), uint32(data[0].until), prefix, data[0].rollupObj) + h.ReplyPickle(w, r, data[0].Data, uint32(data[0].From), uint32(data[0].Until), prefix, data[0].RollupObj) case "protobuf": h.ReplyProtobuf(w, r, prefix, data, false) case "carbonapi_v3_pb": diff --git a/render/reply_pickle.go b/render/reply_pickle.go index 6f25ae00e..4b2c0bcd9 100644 --- a/render/reply_pickle.go +++ b/render/reply_pickle.go @@ -13,7 +13,6 @@ import ( ) func (h *Handler) ReplyPickle(w http.ResponseWriter, r *http.Request, data *Data, from, until uint32, prefix string, rollupObj *rollup.Rules) { - var rollupTime time.Duration var pickleTime time.Duration points := data.Points.List() @@ -21,10 +20,6 @@ func (h *Handler) ReplyPickle(w http.ResponseWriter, r *http.Request, data *Data logger := scope.Logger(r.Context()) defer func() { - logger.Debug("rollup", - zap.String("runtime", rollupTime.String()), - zap.Duration("runtime_ns", rollupTime), - ) logger.Debug("pickle", zap.String("runtime", pickleTime.String()), zap.Duration("runtime_ns", pickleTime), @@ -100,14 +95,7 @@ func (h *Handler) ReplyPickle(w http.ResponseWriter, r *http.Request, data *Data writeMetric := func(points []point.Point) { metricName := data.Points.MetricName(points[0].MetricID) - rollupStart := time.Now() - points, step, err := rollupObj.RollupMetric(metricName, from, points) - if err != nil { - logger.Error("rollup failed", zap.Error(err)) - return - } - rollupTime += time.Since(rollupStart) - + step := points[1].Time - points[0].Time for _, a := range data.Aliases.Get(metricName) { writeAlias(a.DisplayName, a.Target, points, step) } diff --git a/render/reply_protobuf.go b/render/reply_protobuf.go index a8b914ff3..a55de5ddb 100644 --- a/render/reply_protobuf.go +++ b/render/reply_protobuf.go @@ -5,15 +5,10 @@ import ( "bytes" "net/http" - "go.uber.org/zap" - "github.com/lomik/graphite-clickhouse/helper/point" - "github.com/lomik/graphite-clickhouse/pkg/scope" ) -func (h *Handler) ReplyProtobuf(w http.ResponseWriter, r 
-func (h *Handler) ReplyProtobuf(w http.ResponseWriter, r *http.Request, perfix string, multiData []chResponse, pbv3 bool) {
-	logger := scope.Logger(r.Context())
-
+func (h *Handler) ReplyProtobuf(w http.ResponseWriter, r *http.Request, prefix string, multiData []CHResponse, pbv3 bool) {
 	// var multiResponse carbonzipperpb.MultiFetchResponse
 	writer := bufio.NewWriterSize(w, 1024*1024)
 	defer writer.Flush()
@@ -28,10 +23,9 @@ func (h *Handler) ReplyProtobuf(w http.ResponseWriter, r *http.Request, perfix s
 	totalWritten := 0
 	for _, d := range multiData {
-		data := d.data
-		rollupObj := d.rollupObj
-		from := uint32(d.from)
-		until := uint32(d.until)
+		data := d.Data
+		from := uint32(d.From)
+		until := uint32(d.Until)
 		points := data.Points.List()
 
 		if len(points) == 0 {
@@ -41,11 +35,7 @@ func (h *Handler) ReplyProtobuf(w http.ResponseWriter, r *http.Request, perfix s
 		writeMetric := func(points []point.Point) {
 			metricName := data.Points.MetricName(points[0].MetricID)
-			points, step, err := rollupObj.RollupMetric(metricName, from, points)
-			if err != nil {
-				logger.Error("rollup failed", zap.Error(err))
-				return
-			}
+			// the step of the already rolled-up points; 1 is a safe fallback
+			// for a single-point series
+			step := uint32(1)
+			if len(points) > 1 {
+				step = points[1].Time - points[0].Time
+			}
 			for _, a := range data.Aliases.Get(metricName) {
 				writeAlias(mb, mb2, writer, a.Target, a.DisplayName, from, until, step, points)