Improvements in live tailing of logs #541

Merged
35 commits merged on Jun 19, 2019

Commits
f0e7419
Improvements in live tailing of logs
sandeepsukhani May 2, 2019
b3d5708
Fixed linting errors
sandeepsukhani May 2, 2019
b0774be
Fixed linting errors
sandeepsukhani May 2, 2019
a2a7925
Merge branch 'master' of https://github.com/grafana/loki into live-lo…
sandeepsukhani May 6, 2019
7d79209
Added more comments to code for live log tailing
sandeepsukhani May 6, 2019
08dd668
go fmt
sandeepsukhani May 6, 2019
f240cc3
Merge branch 'master' of https://github.com/grafana/loki into live-lo…
sandeepsukhani May 8, 2019
8014aee
Merge branch 'master' of https://github.com/grafana/loki into live-lo…
sandeepsukhani May 9, 2019
b707818
Some code refactoring in live log tailing
sandeepsukhani May 9, 2019
e778f64
handling delayfor in logcli, max delayfor to be 5 seconds
sandeepsukhani May 9, 2019
6cf558b
fixed linting error
sandeepsukhani May 9, 2019
a843548
fixed linting error
sandeepsukhani May 9, 2019
60661b4
Merge branch 'master' of https://github.com/grafana/loki into live-lo…
sandeepsukhani May 9, 2019
6acfed7
some changes in tail response in live tailing
sandeepsukhani May 17, 2019
e041eae
Merge branch 'master' of https://github.com/grafana/loki into live-lo…
sandeepsukhani May 17, 2019
06e0f51
fixed linting error
sandeepsukhani May 17, 2019
bb773c8
fixed build failure
sandeepsukhani May 17, 2019
6878276
fixed linting error
sandeepsukhani May 17, 2019
e013134
Merge branch 'origin/master' into live-log-tailing-improvements
sandeepsukhani Jun 10, 2019
5a16fb1
Fixed failing build
sandeepsukhani Jun 10, 2019
0bbfcd6
fixed some of the mutexes
sandeepsukhani Jun 11, 2019
6777895
Merge branch 'origin/master' into live-log-tailing-improvements
sandeepsukhani Jun 11, 2019
3b04bc7
Some code refactoring
sandeepsukhani Jun 11, 2019
1539153
Merge branch 'master' into live-log-tailing-improvements
sandeepsukhani Jun 13, 2019
bb905cb
Merge remote-tracking branch 'upstream/master' into live-log-tailing-…
sandeepsukhani Jun 14, 2019
ac2ae22
Fixed issue with stopping ingesters gracefully when live tailing is b…
sandeepsukhani Jun 14, 2019
df33bd1
Added tests for tailer in querier
sandeepsukhani Jun 14, 2019
5e8c9d8
Fixed import error and some code refactoring
sandeepsukhani Jun 14, 2019
d15d8e6
Fixed linting errors
sandeepsukhani Jun 14, 2019
04cd344
Fixed linting error
sandeepsukhani Jun 14, 2019
4b3f9b7
Fixed typo
sandeepsukhani Jun 17, 2019
068ae6c
Live tailing made logql compatible, some code refactoring suggested i…
sandeepsukhani Jun 18, 2019
f857347
Merge remote-tracking branch 'upstream/master' into live-log-tailing-…
sandeepsukhani Jun 18, 2019
b415a4f
Removed unused parameter from the function
sandeepsukhani Jun 18, 2019
d4f3356
Merge remote-tracking branch 'upstream/master' into live-log-tailing-…
sandeepsukhani Jun 19, 2019
4 changes: 2 additions & 2 deletions cmd/logcli/client.go
@@ -21,7 +21,7 @@ const (
queryPath = "/api/prom/query?query=%s&limit=%d&start=%d&end=%d&direction=%s&regexp=%s"
labelsPath = "/api/prom/label"
labelValuesPath = "/api/prom/label/%s/values"
tailPath = "/api/prom/tail?query=%s&regexp=%s"
tailPath = "/api/prom/tail?query=%s&regexp=%s&delay_for=%d"
)

func query(from, through time.Time, direction logproto.Direction) (*logproto.QueryResponse, error) {
@@ -98,7 +98,7 @@ func doRequest(path string, out interface{}) error {
}

func liveTailQueryConn() (*websocket.Conn, error) {
path := fmt.Sprintf(tailPath, url.QueryEscape(*queryStr), url.QueryEscape(*regexpStr))
path := fmt.Sprintf(tailPath, url.QueryEscape(*queryStr), url.QueryEscape(*regexpStr), *delayFor)
return wsConnect(path)
}

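For context, here is a minimal standalone sketch of how the new delay_for parameter ends up in the tail websocket path. The selector and delay value are made up for illustration; the real logcli takes them from the flags shown in main.go below.

```go
package main

import (
	"fmt"
	"net/url"
)

// tailPath mirrors the constant updated in cmd/logcli/client.go above.
const tailPath = "/api/prom/tail?query=%s&regexp=%s&delay_for=%d"

func main() {
	// Hypothetical inputs: a label selector, no regexp filter, and a
	// 5 second delay so the server can accumulate entries before sending.
	queryStr := `{job="varlogs"}`
	regexpStr := ""
	delayFor := 5

	path := fmt.Sprintf(tailPath, url.QueryEscape(queryStr), url.QueryEscape(regexpStr), delayFor)
	fmt.Println(path)
	// Output: /api/prom/tail?query=%7Bjob%3D%22varlogs%22%7D&regexp=&delay_for=5
}
```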
1 change: 1 addition & 0 deletions cmd/logcli/main.go
@@ -26,6 +26,7 @@ var (
since = queryCmd.Flag("since", "Lookback window.").Default("1h").Duration()
forward = queryCmd.Flag("forward", "Scan forwards through logs.").Default("false").Bool()
tail = queryCmd.Flag("tail", "Tail the logs").Short('t').Default("false").Bool()
delayFor = queryCmd.Flag("delay-for", "Delay in tailing by number of seconds to accumulate logs").Default("0").Int()
noLabels = queryCmd.Flag("no-labels", "Do not print any labels").Default("false").Bool()
ignoreLabelsKey = queryCmd.Flag("exclude-label", "Exclude labels given the provided key during output.").Strings()
showLabelsKey = queryCmd.Flag("include-label", "Include labels given the provided key during output.").Strings()
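With the flag wired through the client change above, a tail session that lets the server buffer entries before pushing them would look roughly like `logcli query --tail --delay-for=3 '{job="varlogs"}'` (the selector is only an example). Per the commit notes, the server caps delay_for at 5 seconds.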
6 changes: 6 additions & 0 deletions cmd/logcli/tail.go
@@ -59,5 +59,11 @@ func tailQuery() {
printLogEntry(entry.Timestamp, labels, entry.Line)
}
}
if len(tailReponse.DroppedEntries) != 0 {
log.Println("Server dropped following entries due to slow client")
for _, d := range tailReponse.DroppedEntries {
log.Println(d.Timestamp, d.Labels)
}
}
}
}
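As a rough illustration of the response shape this loop consumes, here is a self-contained sketch using hypothetical, pared-down types; the actual logproto messages differ, but the dropped-entries handling follows the same pattern as the new block above.

```go
package main

import (
	"log"
	"time"
)

// Hypothetical stand-ins for the logproto messages; only the fields the
// tail loop touches are modeled here.
type entry struct {
	Timestamp time.Time
	Line      string
}

type droppedEntry struct {
	Timestamp time.Time
	Labels    string
}

type tailResponse struct {
	Entries        []entry
	DroppedEntries []droppedEntry
}

func main() {
	resp := tailResponse{
		Entries: []entry{{Timestamp: time.Now(), Line: `level=info msg="hello"`}},
		DroppedEntries: []droppedEntry{
			{Timestamp: time.Now(), Labels: `{job="varlogs"}`},
		},
	}

	for _, e := range resp.Entries {
		log.Println(e.Timestamp, e.Line)
	}

	// Same shape as the new block in tail.go: report entries the server
	// dropped because the client could not keep up.
	if len(resp.DroppedEntries) != 0 {
		log.Println("Server dropped following entries due to slow client")
		for _, d := range resp.DroppedEntries {
			log.Println(d.Timestamp, d.Labels)
		}
	}
}
```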
10 changes: 1 addition & 9 deletions pkg/distributor/distributor.go
@@ -3,7 +3,6 @@ package distributor
import (
"context"
"flag"
"hash/fnv"
"sync/atomic"
"time"

@@ -148,7 +147,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
continue
}
stream.Entries = entries
keys = append(keys, tokenFor(userID, stream.Labels))
keys = append(keys, util.TokenFor(userID, stream.Labels))
streams = append(streams, streamTracker{
stream: stream,
})
@@ -264,13 +263,6 @@ func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.Ingester
return err
}

func tokenFor(userID, labels string) uint32 {
h := fnv.New32()
_, _ = h.Write([]byte(userID))
_, _ = h.Write([]byte(labels))
return h.Sum32()
}

// Check implements the grpc healthcheck
func (*Distributor) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
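The removed helper is not lost: the push path now calls util.TokenFor instead of the local tokenFor. Reconstructed from the deleted lines, the hashing logic looks like the sketch below; the util package's actual implementation may differ in detail.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// tokenFor reproduces what the deleted distributor helper did: an FNV-32
// hash over the tenant ID followed by the stream's label string. The token
// decides which ring ingesters receive the stream.
func tokenFor(userID, labels string) uint32 {
	h := fnv.New32()
	_, _ = h.Write([]byte(userID))
	_, _ = h.Write([]byte(labels))
	return h.Sum32()
}

func main() {
	// Example inputs only.
	fmt.Println(tokenFor("fake", `{job="varlogs"}`))
}
```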
39 changes: 37 additions & 2 deletions pkg/ingester/ingester.go
@@ -2,6 +2,7 @@ package ingester

import (
"context"
"errors"
"flag"
"net/http"
"sync"
@@ -60,8 +61,9 @@ type Ingester struct {
lifecycler *ring.Lifecycler
store ChunkStore

done sync.WaitGroup
quit chan struct{}
done sync.WaitGroup
quit chan struct{}
quitting chan struct{}

// One queue per flush thread. Fingerprint is used to
// pick a queue.
@@ -82,6 +84,7 @@ func New(cfg Config, store ChunkStore) (*Ingester, error) {
store: store,
quit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
quitting: make(chan struct{}),
}

i.flushQueuesDone.Add(cfg.ConcurrentFlushes)
@@ -127,6 +130,14 @@ func (i *Ingester) Shutdown() {
i.lifecycler.Shutdown()
}

// Stopping helps cleaning up resources before actual shutdown
func (i *Ingester) Stopping() {
close(i.quitting)
for _, instance := range i.getInstances() {
instance.closeTailers()
}
}

// StopIncomingRequests implements ring.Lifecycler.
func (i *Ingester) StopIncomingRequests() {

@@ -230,3 +241,27 @@ func (i *Ingester) getInstances() []*instance {
}
return instances
}

// Tail logs matching given query
func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error {
select {
case <-i.quitting:
return errors.New("Ingester is stopping")
default:
}

instanceID, err := user.ExtractOrgID(queryServer.Context())
if err != nil {
return err
}

instance := i.getOrCreateInstance(instanceID)
tailer, err := newTailer(instanceID, req.Query, req.Regex, queryServer)
if err != nil {
return err
}

instance.addNewTailer(tailer)
tailer.loop()
return nil
}
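The Stopping/Tail interaction relies on a closed channel making a non-blocking select fire for every later call. A minimal sketch of that idiom, independent of the Loki types:

```go
package main

import (
	"errors"
	"fmt"
)

type server struct {
	quitting chan struct{}
}

// stopping mirrors Ingester.Stopping: closing the channel flips every
// subsequent tail call onto the error path.
func (s *server) stopping() {
	close(s.quitting)
}

// tail mirrors the guard at the top of Ingester.Tail: a non-blocking
// receive that only succeeds once quitting has been closed.
func (s *server) tail() error {
	select {
	case <-s.quitting:
		return errors.New("ingester is stopping")
	default:
	}
	return nil // the real method would register the tailer and block in its loop
}

func main() {
	s := &server{quitting: make(chan struct{})}
	fmt.Println(s.tail()) // <nil>, still serving
	s.stopping()
	fmt.Println(s.tail()) // ingester is stopping
}
```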
53 changes: 52 additions & 1 deletion pkg/ingester/instance.go
@@ -53,6 +53,8 @@ type instance struct {
streamsRemovedTotal prometheus.Counter

blockSize int
tailers map[uint32]*tailer
tailerMtx sync.RWMutex
}

func newInstance(instanceID string, blockSize int) *instance {
@@ -65,6 +67,7 @@ func newInstance(instanceID string, blockSize int) *instance {
streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),

blockSize: blockSize,
tailers: map[uint32]*tailer{},
}
}

@@ -87,6 +90,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
i.index.Add(labels, fp)
i.streams[fp] = stream
i.streamsCreatedTotal.Inc()
i.addTailersToNewStream(stream)
}

if err := stream.Push(ctx, s.Entries); err != nil {
@@ -126,7 +130,7 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
return sendBatches(iter, queryServer, req.Limit)
}

func (i *instance) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
var labels []string
if req.Values {
values := i.index.LabelValues(req.Name)
@@ -175,6 +179,53 @@ outer:
return iterators, nil
}

func (i *instance) addNewTailer(t *tailer) {
i.streamsMtx.RLock()
for _, stream := range i.streams {
if stream.matchesTailer(t) {
stream.addTailer(t)
}
}
i.streamsMtx.RUnlock()

i.tailerMtx.Lock()
defer i.tailerMtx.Unlock()
i.tailers[t.getID()] = t
}

func (i *instance) addTailersToNewStream(stream *stream) {
closedTailers := []uint32{}

i.tailerMtx.RLock()
for _, t := range i.tailers {
if t.isClosed() {
closedTailers = append(closedTailers, t.getID())
continue
}

if stream.matchesTailer(t) {
stream.addTailer(t)
}
}
i.tailerMtx.RUnlock()

if len(closedTailers) != 0 {
i.tailerMtx.Lock()
defer i.tailerMtx.Unlock()
for _, closedTailer := range closedTailers {
delete(i.tailers, closedTailer)
}
}
}

func (i *instance) closeTailers() {
i.tailerMtx.Lock()
defer i.tailerMtx.Unlock()
for _, t := range i.tailers {
t.close()
}
}

func isDone(ctx context.Context) bool {
select {
case <-ctx.Done():
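addTailersToNewStream, and the analogous goroutine in stream.Push below, scan under a read lock and only take the write lock when closed tailers actually need pruning. A condensed, self-contained sketch of that locking pattern with a stand-in tailer type:

```go
package main

import (
	"fmt"
	"sync"
)

// fakeTailer stands in for the ingester's tailer; only the closed flag matters here.
type fakeTailer struct {
	id     uint32
	closed bool
}

func (t *fakeTailer) getID() uint32  { return t.id }
func (t *fakeTailer) isClosed() bool { return t.closed }

type registry struct {
	mtx     sync.RWMutex
	tailers map[uint32]*fakeTailer
}

// sweep mirrors the pattern in addTailersToNewStream: collect closed IDs
// under the cheap read lock, then upgrade to the write lock only if needed.
func (r *registry) sweep() {
	closed := []uint32{}

	r.mtx.RLock()
	for _, t := range r.tailers {
		if t.isClosed() {
			closed = append(closed, t.getID())
		}
	}
	r.mtx.RUnlock()

	if len(closed) != 0 {
		r.mtx.Lock()
		defer r.mtx.Unlock()
		for _, id := range closed {
			delete(r.tailers, id)
		}
	}
}

func main() {
	r := &registry{tailers: map[uint32]*fakeTailer{
		1: {id: 1, closed: false},
		2: {id: 2, closed: true},
	}}
	r.sweep()
	fmt.Println(len(r.tailers)) // 1: the closed tailer was pruned
}
```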
55 changes: 52 additions & 3 deletions pkg/ingester/stream.go
@@ -3,6 +3,7 @@ package ingester
import (
"context"
"net/http"
"sync"
"time"

"github.com/cortexproject/cortex/pkg/ingester/client"
@@ -49,6 +50,9 @@ type stream struct {
fp model.Fingerprint
labels []client.LabelAdapter
blockSize int

tailers map[uint32]*tailer
tailerMtx sync.RWMutex
}

type chunkDesc struct {
@@ -64,6 +68,7 @@ func newStream(fp model.Fingerprint, labels []client.LabelAdapter, blockSize int
fp: fp,
labels: labels,
blockSize: blockSize,
tailers: map[uint32]*tailer{},
}
}

@@ -75,6 +80,8 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
chunksCreatedTotal.Inc()
}

storedEntries := []logproto.Entry{}

// Don't fail on the first append error - if samples are sent out of order,
// we still want to append the later ones.
var appendErr error
@@ -93,10 +100,40 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
}
if err := chunk.chunk.Append(&entries[i]); err != nil {
appendErr = err
} else {
// send only stored entries to tailers
storedEntries = append(storedEntries, entries[i])
}
chunk.lastUpdated = time.Now()
}

if len(storedEntries) != 0 {
go func() {
stream := logproto.Stream{Labels: client.FromLabelAdaptersToLabels(s.labels).String(), Entries: storedEntries}

closedTailers := []uint32{}

s.tailerMtx.RLock()
for _, tailer := range s.tailers {
if tailer.isClosed() {
closedTailers = append(closedTailers, tailer.getID())
continue
}
tailer.send(stream)
}
s.tailerMtx.RUnlock()

if len(closedTailers) != 0 {
s.tailerMtx.Lock()
defer s.tailerMtx.Unlock()

for _, closedTailerID := range closedTailers {
delete(s.tailers, closedTailerID)
}
}
}()
}

if appendErr == chunkenc.ErrOutOfOrder {
return httpgrpc.Errorf(http.StatusBadRequest, "entry out of order for stream: %s", client.FromLabelAdaptersToLabels(s.labels).String())
}
@@ -108,12 +145,12 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
func (s *stream) Iterator(from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error) {
iterators := make([]iter.EntryIterator, 0, len(s.chunks))
for _, c := range s.chunks {
iter, err := c.chunk.Iterator(from, through, direction)
itr, err := c.chunk.Iterator(from, through, direction)
if err != nil {
return nil, err
}
if iter != nil {
iterators = append(iterators, iter)
if itr != nil {
iterators = append(iterators, itr)
}
}

@@ -125,3 +162,15 @@ func (s *stream) Iterator(from, through time.Time, direction logproto.Direction)

return iter.NewNonOverlappingIterator(iterators, client.FromLabelAdaptersToLabels(s.labels).String()), nil
}

func (s *stream) addTailer(t *tailer) {
s.tailerMtx.Lock()
defer s.tailerMtx.Unlock()

s.tailers[t.getID()] = t
}

func (s *stream) matchesTailer(t *tailer) bool {
metric := client.FromLabelAdaptersToMetric(s.labels)
return t.isWatchingLabels(metric)
}
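The fan-out in stream.Push runs in a goroutine, and the logcli change reports DroppedEntries, which together imply the tailer drops entries rather than block the write path when a client reads too slowly. The tailer itself (pkg/ingester/tailer.go) is not part of these hunks, so the following is only a hypothetical illustration of that trade-off, not the PR's implementation.

```go
package main

import "fmt"

// hypotheticalTailer is NOT the tailer type added by this PR; it only
// illustrates dropping instead of blocking when the consumer is slow.
type hypotheticalTailer struct {
	sendChan chan string
	dropped  []string
}

// send attempts a non-blocking hand-off; when the buffer is full the entry
// is recorded as dropped instead of stalling the caller.
func (t *hypotheticalTailer) send(line string) {
	select {
	case t.sendChan <- line:
	default:
		t.dropped = append(t.dropped, line)
	}
}

func main() {
	t := &hypotheticalTailer{sendChan: make(chan string, 1)}
	t.send("first")  // buffered
	t.send("second") // buffer full, recorded as dropped
	fmt.Println(len(t.dropped)) // 1
}
```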