Skip to content

Commit

Permalink
caitong93#2 write and query opentsdb with millisecond, avoid duplica…
Browse files Browse the repository at this point in the history
…ted metrics with same unix timestamps when aggregate in prometheus

- http://opentsdb.net/docs/build/html/user_guide/writing/index.html#timestamps
  • Loading branch information
HackerWilson committed Mar 13, 2018
1 parent bb67635 commit 35c6a07
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 18 deletions.
4 changes: 0 additions & 4 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@
branch = "master"
name = "github.com/prometheus/common"

[[constraint]]
name = "github.com/prometheus/prometheus"
version = "2.0.0"

[[constraint]]
branch = "master"
name = "golang.org/x/net"
13 changes: 9 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,14 @@ func serve(logger log.Logger, addr string, writers []writer, readers []reader) e
var resp *prompb.ReadResponse
resp, err = reader.Read(&req)
if err != nil {
level.Warn(logger).Log("msg", "Error executing query", "query", req, "storage", reader.Name(), "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
level.Warn(logger).Log("msg", "Error executing query", "query", fmt.Printf("%v", req), "storage", reader.Name(), "err", err)
labelsToSeries := map[string]*prompb.TimeSeries{}
resp := &prompb.ReadResponse{
Results: []*prompb.QueryResult{
{Timeseries: make([]*prompb.TimeSeries, 0, len(labelsToSeries))},
},
}
level.Warn(logger).Log("msg", "Error empty response", "resp", resp)
}
level.Info(logger).Log("msg", "Client read done")

Expand Down Expand Up @@ -260,5 +265,5 @@ func sendSamples(logger log.Logger, w writer, samples model.Samples) {
}
sentSamples.WithLabelValues(w.Name()).Add(float64(len(samples)))
sentBatchDuration.WithLabelValues(w.Name()).Observe(duration)
logger.Log("msg", fmt.Sprintf("Write %v samples, write=%v", len(samples), w.Name()))
logger.Log("msg", fmt.Sprintf("Write %v samples, write=%v, duration=%v", len(samples), w.Name(), duration))
}
16 changes: 9 additions & 7 deletions opentsdb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"sync"
"time"

"github.com/kr/pretty"

"github.com/go-kit/kit/log/level"

"github.com/caitong93/opentsdb-adapter/prompb"
Expand Down Expand Up @@ -111,7 +109,7 @@ func (c *Client) Write(samples model.Samples) error {
metric := TagValue(s.Metric[model.MetricNameLabel])
reqs = append(reqs, StoreSamplesRequest{
Metric: metric,
Timestamp: s.Timestamp.Unix(),
Timestamp: s.Timestamp.UnixNano() / 1000000,
Value: v,
Tags: tagsFromMetric(s.Metric),
})
Expand Down Expand Up @@ -166,7 +164,6 @@ func (c *Client) Write(samples model.Samples) error {
}

func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) {
pretty.Println(req)
queryReqs := make([]*otdbQueryReq, 0, len(req.Queries))
smatchers := make(map[*otdbQueryReq]seriesMatcher)
for _, q := range req.Queries {
Expand All @@ -182,6 +179,10 @@ func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) {
defer cancel()
errCh := make(chan error, 1)
defer close(errCh)

u, _ := url.Parse(c.url)
u.Path = queryEndpoint

var l sync.Mutex
labelsToSeries := map[string]*prompb.TimeSeries{}
for i := range queryReqs {
Expand All @@ -199,7 +200,7 @@ func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) {
}
fmt.Println(string(rawBytes))

resp, err := ctxhttp.Post(ctx, c.client, c.url+queryEndpoint, contentTypeJSON, bytes.NewBuffer(rawBytes))
resp, err := ctxhttp.Post(ctx, c.client, u.String(), contentTypeJSON, bytes.NewBuffer(rawBytes))
if err != nil {
level.Warn(c.logger).Log("falied to send request to opentsdb")
errCh <- err
Expand Down Expand Up @@ -352,8 +353,9 @@ func mergeSamples(a, b []*prompb.Sample) []*prompb.Sample {

func (c *Client) buildQueryReq(q *prompb.Query) (*otdbQueryReq, seriesMatcher, error) {
req := otdbQueryReq{
Start: q.GetStartTimestampMs() / 1000,
End: q.GetEndTimestampMs() / 1000,
Start: q.GetStartTimestampMs() / 1000,
End: q.GetEndTimestampMs() / 1000,
MsResolution: true,
}

qr := otdbQuery{
Expand Down
7 changes: 4 additions & 3 deletions opentsdb/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ type otdbQueryRes struct {
type otdbDPs map[int64]float64

type otdbQueryReq struct {
Start int64 `json:"start"`
End int64 `json:"end"`
Queries []otdbQuery `json:"queries"`
Start int64 `json:"start"`
End int64 `json:"end"`
Queries []otdbQuery `json:"queries"`
MsResolution bool `json:"msResolution"`
}

type otdbQuery struct {
Expand Down

0 comments on commit 35c6a07

Please sign in to comment.