Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated stream json objects to be more parse friendly #1010

Merged
merged 14 commits into from
Sep 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ lint:
########

test: all
go test -p=6 ./...
GOGC=20 go test -p=6 ./...

#########
# Clean #
Expand Down
122 changes: 42 additions & 80 deletions docs/loki/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

The Loki server has the following API endpoints (_Note:_ Authentication is out of scope for this project):

- `POST /api/prom/push`
- `POST /loki/api/v1/push`

For sending log entries, expects a snappy compressed proto in the HTTP Body:

Expand All @@ -23,7 +23,7 @@ The Loki server has the following API endpoints (_Note:_ Authentication is out o

```

- `GET /api/v1/query`
- `GET /loki/api/v1/query`

For doing instant queries at a single point in time, accepts the following parameters in the query-string:

Expand Down Expand Up @@ -51,7 +51,7 @@ The Loki server has the following API endpoints (_Note:_ Authentication is out o
Examples:

```bash
$ curl -G -s "http://localhost:3100/api/v1/query" --data-urlencode 'query=sum(rate({job="varlogs"}[10m])) by (level)' | jq
$ curl -G -s "http://localhost:3100/loki/api/v1/query" --data-urlencode 'query=sum(rate({job="varlogs"}[10m])) by (level)' | jq
{
"status" : "success",
"data": {
Expand Down Expand Up @@ -88,31 +88,31 @@ The Loki server has the following API endpoints (_Note:_ Authentication is out o
```

```bash
curl -G -s "http://localhost:3100/api/v1/query" --data-urlencode 'query={job="varlogs"}' | jq
curl -G -s "http://localhost:3100/loki/api/v1/query" --data-urlencode 'query={job="varlogs"}' | jq
{
"status" : "success",
"data": {
"resultType": "streams",
"result": [
{
"labels": "{filename=\"/var/log/myproject.log\", job=\"varlogs\", level=\"info\"}",
"entries": [
{
"ts": "2019-06-06T19:25:41.972739Z",
"line": "foo"
},
{
"ts": "2019-06-06T19:25:41.972722Z",
"line": "bar"
}
]
}
"resultType": "streams",
"result": [
{
"stream": {
"filename": "/var/hostlog/syslog",
"job": "varlogs"
},
"values": [
[
"1568234281726420425",
"foo"
],
[
"1568234269716526880",
"bar"
]
]
}
]
}
```

- `GET /api/v1/query_range`
- `GET /loki/api/v1/query_range`

For doing queries over a range of time, accepts the following parameters in the query-string:

Expand Down Expand Up @@ -142,7 +142,7 @@ The Loki server has the following API endpoints (_Note:_ Authentication is out o
Examples:

```bash
$ curl -G -s "http://localhost:3100/api/v1/query_range" --data-urlencode 'query=sum(rate({job="varlogs"}[10m])) by (level)' --data-urlencode 'step=300' | jq
$ curl -G -s "http://localhost:3100/loki/api/v1/query_range" --data-urlencode 'query=sum(rate({job="varlogs"}[10m])) by (level)' --data-urlencode 'step=300' | jq
{
"status" : "success",
"data": {
Expand Down Expand Up @@ -192,69 +192,31 @@ The Loki server has the following API endpoints (_Note:_ Authentication is out o
```

```bash
curl -G -s "http://localhost:3100/api/v1/query_range" --data-urlencode 'query={job="varlogs"}' | jq
{
"status" : "success",
"data": {
"resultType": "streams",
"result": [
{
"labels": "{filename=\"/var/log/myproject.log\", job=\"varlogs\", level=\"info\"}",
"entries": [
{
"ts": "2019-06-06T19:25:41.972739Z",
"line": "foo"
},
{
"ts": "2019-06-06T19:25:41.972722Z",
"line": "bar"
}
]
}
]
}
}
```

- `GET /api/prom/query`

For doing queries, accepts the following parameters in the query-string:

- `query`: a [logQL query](../querying.md) (eg: `{name=~"mysql.+"}` or `{name=~"mysql.+"} |= "error"`)
- `limit`: max number of entries to return
- `start`: the start time for the query, as a nanosecond Unix epoch (nanoseconds since 1970) or as RFC3339Nano (eg: "2006-01-02T15:04:05.999999999-07:00"). Default is always one hour ago.
- `end`: the end time for the query, as a nanosecond Unix epoch (nanoseconds since 1970) or as RFC3339Nano (eg: "2006-01-02T15:04:05.999999999-07:00"). Default is current time.
- `direction`: `forward` or `backward`, useful when specifying a limit. Default is backward.
- `regexp`: a regex to filter the returned results

Loki needs to query the index store in order to find log streams for particular labels and the store is spread out by time,
so you need to specify the start and end labels accordingly. Querying a long time into the history will cause additional
load to the index server and make the query slower.

> This endpoint will be deprecated in the future you should use `api/v1/query_range` instead.
> You can only query for logs, it doesn't accept [queries returning metrics](../querying.md#counting-logs).

Responses looks like this:

```json
curl -G -s "http://localhost:3100/loki/api/v1/query_range" --data-urlencode 'query={job="varlogs"}' | jq
{
"streams": [
"resultType": "streams",
"result": [
{
"labels": "{instance=\"...\", job=\"...\", namespace=\"...\"}",
"entries": [
{
"ts": "2018-06-27T05:20:28.699492635Z",
"line": "..."
},
...
"stream": {
"filename": "/var/hostlog/syslog",
"job": "varlogs"
},
"values": [
[
"1568234281726420425",
"foo"
],
[
"1568234269716526880",
"bar"
]
]
},
...
}
]
}
```

- `GET /api/prom/label`
- `GET /loki/api/v1/label`

For doing label name queries, accepts the following parameters in the query-string:

Expand All @@ -273,7 +235,7 @@ The Loki server has the following API endpoints (_Note:_ Authentication is out o
}
```

- `GET /api/prom/label/<name>/values`
- `GET /loki/api/v1/label/<name>/values`

For doing label values queries, accepts the following parameters in the query-string:

Expand Down
35 changes: 35 additions & 0 deletions pkg/logproto/marshal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package logproto

import (
"encoding/json"
fmt "fmt"

"github.com/prometheus/prometheus/promql"
)

// MarshalJSON converts an Entry object to be prom compatible for http queries
func (e *Entry) MarshalJSON() ([]byte, error) {
l, err := json.Marshal(e.Line)
if err != nil {
return nil, err
}
return []byte(fmt.Sprintf("[\"%d\",%s]", e.Timestamp.UnixNano(), l)), nil
}

// MarshalJSON converts a Stream object to be prom compatible for http queries
func (s *Stream) MarshalJSON() ([]byte, error) {
parsedLabels, err := promql.ParseMetric(s.Labels)
if err != nil {
return nil, err
}
l, err := json.Marshal(parsedLabels)
if err != nil {
return nil, err
}
e, err := json.Marshal(s.Entries)
if err != nil {
return nil, err
}

return []byte(fmt.Sprintf("{\"stream\":%s,\"values\":%s}", l, e)), nil
}
85 changes: 85 additions & 0 deletions pkg/logproto/marshal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package logproto

import (
"encoding/json"
fmt "fmt"
reflect "reflect"
"testing"
time "time"

"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
)

var (
entries = []Entry{
{
Timestamp: time.Now(),
Line: "testline",
},
{
Timestamp: time.Date(2019, 9, 10, 1, 1, 1, 1, time.UTC),
Line: "{}\"'!@$%&*^(_)(",
},
}
streams = []Stream{
{
Labels: "{}",
Entries: []Entry{},
},
{
Labels: "{name=\"value\",name1=\"value1\"}",
Entries: []Entry{},
},
}
)

func Test_EntryMarshalJSON(t *testing.T) {
var array []interface{}

for _, entry := range entries {

bytes, err := entry.MarshalJSON()
require.NoError(t, err)

err = json.Unmarshal(bytes, &array)
require.NoError(t, err)

timestamp, ok := array[0].(string)
require.True(t, ok)

line, ok := array[1].(string)
require.True(t, ok)

require.Equal(t, fmt.Sprint(entry.Timestamp.UnixNano()), timestamp, "Timestamps not equal ", array[0])
require.Equal(t, entry.Line, line, "Lines are not equal ", array[1])
}
}

func Test_StreamMarshalJSON(t *testing.T) {
actual := struct {
Labels map[string]string `json:"stream"`
Entries []Entry `json:"values"`
}{}

for _, expected := range streams {

bytes, err := expected.MarshalJSON()
require.NoError(t, err)

err = json.Unmarshal(bytes, &actual)
require.NoError(t, err)

// check labels
expectedLabels, err := promql.ParseMetric(expected.Labels)
require.NoError(t, err)

require.Equal(t, len(actual.Labels), len(expectedLabels))
for _, l := range expectedLabels {
require.Equal(t, l.Value, actual.Labels[l.Name])
}

// check entries
require.True(t, reflect.DeepEqual(actual.Entries, expected.Entries))
}
}
4 changes: 4 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,13 @@ func (t *Loki) initDistributor() (err error) {
}

t.server.HTTP.Path("/ready").Handler(http.HandlerFunc(t.distributor.ReadinessHandler))
t.server.HTTP.Handle("/loki/api/v1/push", middleware.Merge(
t.httpAuthMiddleware,
).Wrap(http.HandlerFunc(t.distributor.PushHandler)))
t.server.HTTP.Handle("/api/prom/push", middleware.Merge(
t.httpAuthMiddleware,
).Wrap(http.HandlerFunc(t.distributor.PushHandler)))

return
}

Expand Down