Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LogQL: Vector and Range Vector Aggregation. #654

Merged
merged 9 commits into from
Sep 4, 2019
Merged
16 changes: 7 additions & 9 deletions cmd/logcli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,19 @@ import (
)

const (
queryPath = "/api/prom/query?query=%s&limit=%d&start=%d&end=%d&direction=%s&regexp=%s"
queryPath = "/api/prom/query?query=%s&limit=%d&start=%d&end=%d&direction=%s"
labelsPath = "/api/prom/label"
labelValuesPath = "/api/prom/label/%s/values"
tailPath = "/api/prom/tail?query=%s&regexp=%s&delay_for=%d&limit=%d&start=%d"
tailPath = "/api/prom/tail?query=%s&delay_for=%d&limit=%d&start=%d"
)

func query(from, through time.Time, direction logproto.Direction) (*logproto.QueryResponse, error) {
path := fmt.Sprintf(queryPath,
url.QueryEscape(*queryStr), // query
*limit, // limit
from.UnixNano(), // start
through.UnixNano(), // end
direction.String(), // direction
url.QueryEscape(*regexpStr), // regexp
url.QueryEscape(*queryStr), // query
*limit, // limit
from.UnixNano(), // start
through.UnixNano(), // end
direction.String(), // direction
)

var resp logproto.QueryResponse
Expand Down Expand Up @@ -113,7 +112,6 @@ func doRequest(path string, out interface{}) error {
func liveTailQueryConn() (*websocket.Conn, error) {
path := fmt.Sprintf(tailPath,
url.QueryEscape(*queryStr), // query
url.QueryEscape(*regexpStr), // regexp
*delayFor, // delay_for
*limit, // limit
getStart(time.Now()).UnixNano(), // start
Expand Down
1 change: 0 additions & 1 deletion cmd/logcli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ var (

queryCmd = app.Command("query", "Run a LogQL query.")
queryStr = queryCmd.Arg("query", "eg '{foo=\"bar\",baz=\"blip\"}'").Required().String()
regexpStr = queryCmd.Arg("regex", "").String()
limit = queryCmd.Flag("limit", "Limit on number of entries to print.").Default("30").Int()
since = queryCmd.Flag("since", "Lookback window.").Default("1h").Duration()
from = queryCmd.Flag("from", "Start looking for logs at this absolute time (inclusive)").String()
Expand Down
177 changes: 177 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,180 @@ The Loki server has the following API endpoints (_Note:_ Authentication is out o
}
]
}

```

- `GET /api/v1/query`

For doing instant queries at a single point in time, accepts the following parameters in the query-string:

- `query`: a logQL query
- `limit`: max number of entries to return (not used for metric queries)
- `time`: the evaluation time for the query, as a nanosecond Unix epoch (nanoseconds since 1970). Default is always now.
- `direction`: `forward` or `backward`, useful when specifying a limit. Default is backward.

Loki needs to query the index store in order to find log streams for particular labels and the store is spread out by time,
so you need to specify the time and labels accordingly. Querying a long time into the history will cause additional
load to the index server and make the query slower.

Responses looks like this:

```json
{
"resultType": "vector" | "streams",
"result": <value>
}
```

Examples:

```bash
$ curl -G -s "http://localhost:3100/api/v1/query" --data-urlencode 'query=sum(rate({job="varlogs"}[10m])) by (level)' | jq
{
"resultType": "vector",
"result": [
{
"metric": {},
"value": [
1559848867745737,
"1267.1266666666666"
]
},
{
"metric": {
"level": "warn"
},
"value": [
1559848867745737,
"37.77166666666667"
]
},
{
"metric": {
"level": "info"
},
"value": [
1559848867745737,
"37.69"
]
}
]
}
```

```bash
curl -G -s "http://localhost:3100/api/v1/query" --data-urlencode 'query={job="varlogs"}' | jq
{
"resultType": "streams",
"result": [
{
"labels": "{filename=\"/var/log/myproject.log\", job=\"varlogs\", level=\"info\"}",
"entries": [
{
"ts": "2019-06-06T19:25:41.972739Z",
"line": "foo"
},
{
"ts": "2019-06-06T19:25:41.972722Z",
"line": "bar"
}
]
}
]
```

- `GET /api/v1/query_range`

For doing queries over a range of time, accepts the following parameters in the query-string:

- `query`: a logQL query
- `limit`: max number of entries to return (not used for metric queries)
- `start`: the start time for the query, as a nanosecond Unix epoch (nanoseconds since 1970). Default is always one hour ago.
- `end`: the end time for the query, as a nanosecond Unix epoch (nanoseconds since 1970). Default is always now.
- `step`: query resolution step width in seconds. Default 1 second.
- `direction`: `forward` or `backward`, useful when specifying a limit. Default is backward.

Loki needs to query the index store in order to find log streams for particular labels and the store is spread out by time,
so you need to specify the time and labels accordingly. Querying a long time into the history will cause additional
load to the index server and make the query slower.

Responses looks like this:

```json
{
"resultType": "matrix" | "streams",
"result": <value>
}
```

Examples:

```bash
$ curl -G -s "http://localhost:3100/api/v1/query_range" --data-urlencode 'query=sum(rate({job="varlogs"}[10m])) by (level)' --data-urlencode 'step=300' | jq
Copy link
Contributor Author

@cyriltovena cyriltovena Aug 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use /api/loki/api/v1/query_range ? @tomwilkie @slim-bean

Requirement :

  • we need a unique prefix to the path
  • we can point golang client at this endpoint.
  • should be similar to cortex api

{
"resultType": "matrix",
"result": [
{
"metric": {
"level": "info"
},
"values": [
[
1559848958663735,
"137.95"
],
[
1559849258663735,
"467.115"
],
[
1559849558663735,
"658.8516666666667"
]
]
},
{
"metric": {
"level": "warn"
},
"values": [
[
1559848958663735,
"137.27833333333334"
],
[
1559849258663735,
"467.69"
],
[
1559849558663735,
"660.6933333333334"
]
]
}
]
}
```

```bash
curl -G -s "http://localhost:3100/api/v1/query_range" --data-urlencode 'query={job="varlogs"}' | jq
{
"resultType": "streams",
"result": [
{
"labels": "{filename=\"/var/log/myproject.log\", job=\"varlogs\", level=\"info\"}",
"entries": [
{
"ts": "2019-06-06T19:25:41.972739Z",
"line": "foo"
},
{
"ts": "2019-06-06T19:25:41.972722Z",
"line": "bar"
}
]
}
]
```

- `GET /api/prom/query`
Expand All @@ -37,6 +211,9 @@ The Loki server has the following API endpoints (_Note:_ Authentication is out o
so you need to specify the start and end labels accordingly. Querying a long time into the history will cause additional
load to the index server and make the query slower.

> This endpoint will be deprecated in the future you should use `api/v1/query_range` instead.
> You can only query for logs, it doesn't accept [queries returning metrics](./usage.md#counting-logs).

Responses looks like this:

```json
Expand Down
60 changes: 59 additions & 1 deletion docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Read more about the Explore feature in the [Grafana docs](http://docs.grafana.or

## Searching with Labels and Distributed Grep

A log query consists of two parts: **log stream selector**, and a **filter expression**. For performance reasons you need to start by choosing a set of log streams using a Prometheus-style log stream selector.
A log filter query consists of two parts: **log stream selector**, and a **filter expression**. For performance reasons you need to start by choosing a set of log streams using a Prometheus-style log stream selector.

The log stream selector will reduce the number of log streams to a manageable volume and then the regex search expression is used to do a distributed grep over those log streams.

Expand Down Expand Up @@ -76,3 +76,61 @@ The query language is still under development to support more features, e.g.,:
- Number extraction for timeseries based on number in log messages
- JSON accessors for filtering of JSON-structured logs
- Context (like `grep -C n`)

## Counting logs

Loki's LogQL support sample expression allowing to count entries per stream after the regex filtering stage.

### Range Vector aggregation

The language shares the same [range vector](https://prometheus.io/docs/prometheus/latest/querying/basics/#range-vector-selectors) concept from Prometheus, except that the selected range of samples contains a value of one for each log entry. You can then apply an aggregation over the selected range to transform it into an instant vector.

`rate` calculates the number of entries per second and `count_over_time` count of entries for the each log stream within the range.

In this example, we count all the log lines we have recorded within the last 5min for the mysql job.

> `count_over_time({job="mysql"}[5m])`

A range vector aggregation can also be applied to a [Filter Expression](#filter-expression), allowing you to select only matching log entries.

> `rate( ( {job="mysql"} |= "error" != "timeout)[10s] ) )`

The query above will compute the per second rate of all errors except those containing `timeout` within the last 10 seconds.

You can then use aggregation operators over the range vector aggregation.

### Aggregation operators

Like [PromQL](https://prometheus.io/docs/prometheus/latest/querying/operators/#aggregation-operators), Loki's LogQL support a subset of built-in aggregation operators that can be used to aggregate the element of a single vector, resulting in a new vector of fewer elements with aggregated values:

- `sum` (calculate sum over dimensions)
- `min` (select minimum over dimensions)
- `max` (select maximum over dimensions)
- `avg` (calculate the average over dimensions)
- `stddev` (calculate population standard deviation over dimensions)
- `stdvar` (calculate population standard variance over dimensions)
- `count` (count number of elements in the vector)
- `bottomk` (smallest k elements by sample value)
- `topk` (largest k elements by sample value)

These operators can either be used to aggregate over all label dimensions or preserve distinct dimensions by including a without or by clause.

> `<aggr-op>([parameter,] <vector expression>) [without|by (<label list>)]`

parameter is only required for `topk` and `bottomk`. without removes the listed labels from the result vector, while all other labels are preserved the output. by does the opposite and drops labels that are not listed in the by clause, even if their label values are identical between all elements of the vector.

topk and bottomk are different from other aggregators in that a subset of the input samples, including the original labels, are returned in the result vector. by and without are only used to bucket the input vector.

#### Examples

Get top 10 applications by highest log throughput:

> `topk(10,sum(rate({region="us-east1"}[5m]) by (name))`

Get the count of logs during the last 5 minutes by level:

> `sum(count_over_time({job="mysql"}[5m])) by (level)`

Get the rate of HTTP GET requests from nginx logs:

> `avg(rate(({job="nginx"} |= "GET")[10s])) by (region)`
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_
}

instance := i.getOrCreateInstance(instanceID)
tailer, err := newTailer(instanceID, req.Query, req.Regex, queryServer)
tailer, err := newTailer(instanceID, req.Query, queryServer)
if err != nil {
return err
}
Expand Down
24 changes: 12 additions & 12 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ func TestIngester(t *testing.T) {
ctx: ctx,
}
err = i.Query(&logproto.QueryRequest{
Query: `{foo="bar"}`,
Limit: 100,
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
Selector: `{foo="bar"}`,
Limit: 100,
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
}, &result)
require.NoError(t, err)
require.Len(t, result.resps, 1)
Expand All @@ -68,10 +68,10 @@ func TestIngester(t *testing.T) {
ctx: ctx,
}
err = i.Query(&logproto.QueryRequest{
Query: `{foo="bar",bar="baz1"}`,
Limit: 100,
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
Selector: `{foo="bar",bar="baz1"}`,
Limit: 100,
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
}, &result)
require.NoError(t, err)
require.Len(t, result.resps, 1)
Expand All @@ -82,10 +82,10 @@ func TestIngester(t *testing.T) {
ctx: ctx,
}
err = i.Query(&logproto.QueryRequest{
Query: `{foo="bar",bar="baz2"}`,
Limit: 100,
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
Selector: `{foo="bar",bar="baz2"}`,
Limit: 100,
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
}, &result)
require.NoError(t, err)
require.Len(t, result.resps, 1)
Expand Down
Loading