Skip to content

Commit

Permalink
InfluxDB: Close Flux query results (grafana#26917)
Browse files Browse the repository at this point in the history
* InfluxDB: Drop ctxhttp usage
* InfluxDB: Clean up code
* InfluxDB: Close query results

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 committed Aug 13, 2020
1 parent d823b26 commit c63bbba
Show file tree
Hide file tree
Showing 12 changed files with 67 additions and 57 deletions.
1 change: 0 additions & 1 deletion pkg/models/datasource_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ var ptc = proxyTransportCache{

func (ds *DataSource) GetHttpClient() (*http.Client, error) {
transport, err := ds.GetHttpTransport()

if err != nil {
return nil, err
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/tsdb/influxdb/flux/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func getConverter(t string) (*data.FieldConverter, error) {
return &AnyToOptionalString, nil
}

return nil, fmt.Errorf("No matching converter found for [%v]", t)
return nil, fmt.Errorf("no matching converter found for [%v]", t)
}

// Init initializes the frame to be returned
Expand All @@ -94,6 +94,7 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error {
if err != nil {
return err
}

fb.value = converter
fb.isTimeSeries = true
case isTag(col.Name()):
Expand All @@ -106,6 +107,7 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error {
if col == nil {
return fmt.Errorf("no time column in timeSeries")
}

fb.timeColumn = col.Name()
fb.timeDisplay = "Time"
if "_time" != fb.timeColumn {
Expand All @@ -118,6 +120,7 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error {
if err != nil {
return err
}

fb.columns = append(fb.columns, columnInfo{
name: col.Name(),
converter: converter,
Expand Down Expand Up @@ -201,13 +204,14 @@ func (fb *FrameBuilder) Append(record *query.FluxRecord) error {
if fb.isTimeSeries {
time, ok := record.ValueByKey(fb.timeColumn).(time.Time)
if !ok {
return fmt.Errorf("unable to get time colum: %s", fb.timeColumn)
return fmt.Errorf("unable to get time colum: %q", fb.timeColumn)
}

val, err := fb.value.Converter(record.Value())
if err != nil {
return err
}

fb.active.Fields[0].Append(time)
fb.active.Fields[1].Append(val)
} else {
Expand All @@ -217,6 +221,7 @@ func (fb *FrameBuilder) Append(record *query.FluxRecord) error {
if err != nil {
return err
}

fb.active.Fields[idx].Append(val)
}
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/tsdb/influxdb/flux/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"github.com/influxdata/influxdb-client-go/api"
)

// ExecuteQuery runs a flux query using the QueryModel to interpolate the query and the runner to execute it.
// executeQuery runs a flux query using the QueryModel to interpolate the query and the runner to execute it.
// maxSeries somehow limits the response.
func ExecuteQuery(ctx context.Context, query QueryModel, runner queryRunner, maxSeries int) (dr backend.DataResponse) {
func executeQuery(ctx context.Context, query QueryModel, runner queryRunner, maxSeries int) (dr backend.DataResponse) {
dr = backend.DataResponse{}

flux, err := Interpolate(query)
Expand All @@ -20,10 +20,11 @@ func ExecuteQuery(ctx context.Context, query QueryModel, runner queryRunner, max
return
}

glog.Debug("Flux", "interpolated query", flux)
glog.Debug("Executing Flux query", "interpolated query", flux)

tables, err := runner.runQuery(ctx, flux)
if err != nil {
glog.Warn("Flux query failed", "err", err, "query", flux)
dr.Error = err
metaFrame := data.NewFrame("meta for error")
metaFrame.Meta = &data.FrameMeta{
Expand All @@ -32,6 +33,7 @@ func ExecuteQuery(ctx context.Context, query QueryModel, runner queryRunner, max
dr.Frames = append(dr.Frames, metaFrame)
return
}
defer tables.Close()

dr = readDataFrames(tables, int(float64(query.MaxDataPoints)*1.5), maxSeries)

Expand All @@ -46,6 +48,7 @@ func ExecuteQuery(ctx context.Context, query QueryModel, runner queryRunner, max
}

func readDataFrames(result *api.QueryTableResult, maxPoints int, maxSeries int) (dr backend.DataResponse) {
glog.Debug("Reading data frames from query result", "maxPoints", maxPoints, "maxSeries", maxSeries)
dr = backend.DataResponse{}

builder := &FrameBuilder{
Expand All @@ -69,7 +72,7 @@ func readDataFrames(result *api.QueryTableResult, maxPoints int, maxSeries int)
}

if builder.frames == nil {
dr.Error = fmt.Errorf("Invalid state")
dr.Error = fmt.Errorf("invalid state")
return dr
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/tsdb/influxdb/flux/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func verifyGoldenResponse(name string) (*backend.DataResponse, error) {
testDataPath: name + ".csv",
}

dr := ExecuteQuery(context.Background(), QueryModel{MaxDataPoints: 100}, runner, 50)
dr := executeQuery(context.Background(), QueryModel{MaxDataPoints: 100}, runner, 50)
err := experimental.CheckGoldenDataResponse("./testdata/"+name+".golden.txt", &dr, true)
return &dr, err
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/tsdb/influxdb/flux/flux.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func init() {

// Query builds flux queries, executes them, and returns the results.
func Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
glog.Debug("Received a query", "query", *tsdbQuery)
tRes := &tsdb.Response{
Results: make(map[string]*tsdb.QueryResult),
}
Expand All @@ -38,7 +39,7 @@ func Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQ
continue
}

res := ExecuteQuery(context.Background(), *qm, runner, 50)
res := executeQuery(context.Background(), *qm, runner, 50)

tRes.Results[query.RefId] = backendDataResponseToTSDBResponse(&res, query.RefId)
}
Expand All @@ -57,9 +58,10 @@ type queryRunner interface {
runQuery(ctx context.Context, q string) (*api.QueryTableResult, error)
}

// runQuery executes fluxQuery against the Runner's organization and returns an flux typed result.
// runQuery executes fluxQuery against the Runner's organization and returns a Flux typed result.
func (r *Runner) runQuery(ctx context.Context, fluxQuery string) (*api.QueryTableResult, error) {
return r.client.QueryApi(r.org).Query(ctx, fluxQuery)
qa := r.client.QueryApi(r.org)
return qa.Query(ctx, fluxQuery)
}

// RunnerFromDataSource creates a runner from the datasource model (the datasource instance's configuration).
Expand All @@ -71,7 +73,7 @@ func RunnerFromDataSource(dsInfo *models.DataSource) (*Runner, error) {

url := dsInfo.Url
if url == "" {
return nil, fmt.Errorf("missing url from datasource configuration")
return nil, fmt.Errorf("missing URL from datasource configuration")
}
token, found := dsInfo.SecureJsonData.DecryptedValue("token")
if !found {
Expand Down
5 changes: 2 additions & 3 deletions pkg/tsdb/influxdb/flux/query_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ func GetQueryModelTSDB(query *tsdb.Query, timeRange *tsdb.TimeRange, dsInfo *mod
return nil, fmt.Errorf("failed to re-encode the flux query into JSON: %w", err)
}

err = json.Unmarshal(queryBytes, &model)
if err != nil {
return nil, fmt.Errorf("error reading query: %s", err.Error())
if err := json.Unmarshal(queryBytes, &model); err != nil {
return nil, fmt.Errorf("error reading query: %w", err)
}
if model.Options.DefaultBucket == "" {
model.Options.DefaultBucket = dsInfo.JsonData.Get("defaultBucket").MustString("")
Expand Down
61 changes: 31 additions & 30 deletions pkg/tsdb/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/grafana/grafana/pkg/tsdb/influxdb/flux"
"golang.org/x/net/context/ctxhttp"
)

type InfluxDBExecutor struct {
Expand Down Expand Up @@ -51,6 +50,8 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource,
return flux.Query(ctx, dsInfo, tsdbQuery)
}

glog.Debug("Making a non-Flux type query")

// NOTE: the following path is currently only called from alerting queries
// In dashboards, the request runs through proxy and are managed in the frontend

Expand All @@ -68,7 +69,7 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource,
glog.Debug("Influxdb query", "raw query", rawQuery)
}

req, err := e.createRequest(dsInfo, rawQuery)
req, err := e.createRequest(ctx, dsInfo, rawQuery)
if err != nil {
return nil, err
}
Expand All @@ -78,7 +79,7 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource,
return nil, err
}

resp, err := ctxhttp.Do(ctx, httpClient, req)
resp, err := httpClient.Do(req)
if err != nil {
return nil, err
}
Expand All @@ -91,12 +92,9 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource,
var response Response
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
err = dec.Decode(&response)

if err != nil {
if err := dec.Decode(&response); err != nil {
return nil, err
}

if response.Err != nil {
return nil, response.Err
}
Expand All @@ -109,42 +107,45 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource,
}

func (e *InfluxDBExecutor) getQuery(dsInfo *models.DataSource, queries []*tsdb.Query, context *tsdb.TsdbQuery) (*Query, error) {
if len(queries) == 0 {
return nil, fmt.Errorf("query request contains no queries")
}

// The model supports multiple queries, but right now this is only used from
// alerting so we only needed to support batch executing 1 query at a time.
if len(queries) > 0 {
query, err := e.QueryParser.Parse(queries[0].Model, dsInfo)
if err != nil {
return nil, err
}
return query, nil
query, err := e.QueryParser.Parse(queries[0].Model, dsInfo)
if err != nil {
return nil, err
}
return nil, fmt.Errorf("query request contains no queries")
return query, nil
}

func (e *InfluxDBExecutor) createRequest(dsInfo *models.DataSource, query string) (*http.Request, error) {
func (e *InfluxDBExecutor) createRequest(ctx context.Context, dsInfo *models.DataSource, query string) (*http.Request, error) {
u, err := url.Parse(dsInfo.Url)
if err != nil {
return nil, err
}

u.Path = path.Join(u.Path, "query")
httpMode := dsInfo.JsonData.Get("httpMode").MustString("GET")

req, err := func() (*http.Request, error) {
switch httpMode {
case "GET":
return http.NewRequest(http.MethodGet, u.String(), nil)
case "POST":
bodyValues := url.Values{}
bodyValues.Add("q", query)
body := bodyValues.Encode()
return http.NewRequest(http.MethodPost, u.String(), strings.NewReader(body))
default:
return nil, ErrInvalidHttpMode
var req *http.Request
switch httpMode {
case "GET":
req, err = http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
}
}()

if err != nil {
return nil, err
case "POST":
bodyValues := url.Values{}
bodyValues.Add("q", query)
body := bodyValues.Encode()
req, err = http.NewRequestWithContext(ctx, http.MethodPost, u.String(), strings.NewReader(body))
if err != nil {
return nil, err
}
default:
return nil, ErrInvalidHttpMode
}

req.Header.Set("User-Agent", "Grafana")
Expand Down
9 changes: 6 additions & 3 deletions pkg/tsdb/influxdb/influxdb_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package influxdb

import (
"context"
"io/ioutil"
"net/url"
"testing"
Expand All @@ -23,7 +24,8 @@ func TestInfluxDB(t *testing.T) {
ResponseParser: &ResponseParser{},
}
Convey("createRequest with GET httpMode", func() {
req, _ := e.createRequest(datasource, query)
req, err := e.createRequest(context.Background(), datasource, query)
So(err, ShouldBeNil)

Convey("as default", func() {
So(req.Method, ShouldEqual, "GET")
Expand All @@ -41,7 +43,8 @@ func TestInfluxDB(t *testing.T) {

Convey("createRequest with POST httpMode", func() {
datasource.JsonData.Set("httpMode", "POST")
req, _ := e.createRequest(datasource, query)
req, err := e.createRequest(context.Background(), datasource, query)
So(err, ShouldBeNil)

Convey("method should be POST", func() {
So(req.Method, ShouldEqual, "POST")
Expand All @@ -63,7 +66,7 @@ func TestInfluxDB(t *testing.T) {

Convey("createRequest with PUT httpMode", func() {
datasource.JsonData.Set("httpMode", "PUT")
_, err := e.createRequest(datasource, query)
_, err := e.createRequest(context.Background(), datasource, query)

Convey("should miserably fail", func() {
So(err, ShouldEqual, ErrInvalidHttpMode)
Expand Down
3 changes: 1 addition & 2 deletions pkg/tsdb/influxdb/model_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,13 @@ func (*InfluxdbQueryParser) parseQueryPart(model *simplejson.Json) (*QueryPart,

func (qp *InfluxdbQueryParser) parseGroupBy(model *simplejson.Json) ([]*QueryPart, error) {
var result []*QueryPart

for _, groupObj := range model.Get("groupBy").MustArray() {
groupJson := simplejson.NewFromAny(groupObj)
queryPart, err := qp.parseQueryPart(groupJson)

if err != nil {
return nil, err
}

result = append(result, queryPart)
}

Expand Down
1 change: 0 additions & 1 deletion pkg/tsdb/influxdb/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ var (

func (query *Query) Build(queryContext *tsdb.TsdbQuery) (string, error) {
var res string

if query.UseRawQuery && query.RawQuery != "" {
res = query.RawQuery
} else {
Expand Down
3 changes: 1 addition & 2 deletions pkg/tsdb/influxdb/query_part.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,8 @@ func (r QueryDefinition) Render(query *Query, queryContext *tsdb.TsdbQuery, part

func NewQueryPart(typ string, params []string) (*QueryPart, error) {
def, exist := renders[typ]

if !exist {
return nil, fmt.Errorf("Missing query definition for %s", typ)
return nil, fmt.Errorf("missing query definition for %q", typ)
}

return &QueryPart{
Expand Down
9 changes: 5 additions & 4 deletions pkg/tsdb/influxdb/response_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func (rp *ResponseParser) Parse(response *Response, query *Query) *tsdb.QueryRes

func (rp *ResponseParser) transformRows(rows []Row, queryResult *tsdb.QueryResult, query *Query) tsdb.TimeSeriesSlice {
var result tsdb.TimeSeriesSlice

for _, row := range rows {
for columnIndex, column := range row.Columns {
if column == "time" {
Expand Down Expand Up @@ -104,7 +103,6 @@ func (rp *ResponseParser) formatSeriesName(row Row, column string, query *Query)

func (rp *ResponseParser) buildSeriesNameFromQuery(row Row, column string) string {
var tags []string

for k, v := range row.Tags {
tags = append(tags, fmt.Sprintf("%s: %s", k, v))
}
Expand All @@ -118,9 +116,12 @@ func (rp *ResponseParser) buildSeriesNameFromQuery(row Row, column string) strin
}

func (rp *ResponseParser) parseTimepoint(valuePair []interface{}, valuePosition int) (tsdb.TimePoint, error) {
var value null.Float = rp.parseValue(valuePair[valuePosition])
value := rp.parseValue(valuePair[valuePosition])

timestampNumber, _ := valuePair[0].(json.Number)
timestampNumber, ok := valuePair[0].(json.Number)
if !ok {
return tsdb.TimePoint{}, fmt.Errorf("valuePair[0] has invalid type: %#v", valuePair[0])
}
timestamp, err := timestampNumber.Float64()
if err != nil {
return tsdb.TimePoint{}, err
Expand Down

0 comments on commit c63bbba

Please sign in to comment.