Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for tail to query frontend #2032

Merged
merged 9 commits into from
Jul 24, 2020
10 changes: 10 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package loki

import (
"fmt"
"github.com/grafana/loki/pkg/tailproxy"
"net/http"
"os"
"sort"
Expand Down Expand Up @@ -296,6 +297,13 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
frontend.RegisterFrontendServer(t.server.GRPC, t.frontend)

frontendHandler := queryrange.StatsHTTPMiddleware.Wrap(t.httpAuthMiddleware.Wrap(t.frontend.Handler()))

httpMiddleware := middleware.Merge(
t.httpAuthMiddleware,
queryrange.StatsHTTPMiddleware,
)
tp := tailproxy.New(t.cfg.Frontend, util.Logger)
wsHandler := httpMiddleware.Wrap(http.HandlerFunc(tp.Handle))
tivvit marked this conversation as resolved.
Show resolved Hide resolved
t.server.HTTP.Handle("/loki/api/v1/query_range", frontendHandler)
t.server.HTTP.Handle("/loki/api/v1/query", frontendHandler)
t.server.HTTP.Handle("/loki/api/v1/label", frontendHandler)
Expand All @@ -306,6 +314,8 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
t.server.HTTP.Handle("/api/prom/label", frontendHandler)
t.server.HTTP.Handle("/api/prom/label/{name}/values", frontendHandler)
t.server.HTTP.Handle("/api/prom/series", frontendHandler)
t.server.HTTP.Handle("/loki/api/v1/tail", wsHandler)
t.server.HTTP.Handle("/api/prom/tail", wsHandler)
// fallback route
t.server.HTTP.PathPrefix("/").Handler(frontendHandler)
tivvit marked this conversation as resolved.
Show resolved Hide resolved

Expand Down
210 changes: 210 additions & 0 deletions pkg/tailproxy/ws_tail_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package tailproxy

import (
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gorilla/websocket"
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/loghttp"
"github.com/prometheus/common/config"
"net/http"
"strconv"
"strings"
"sync"
)

type closer struct {
sync.Mutex
closed bool
}

func (c *closer) CloseFunc(f func()) {
c.Lock()
defer c.Unlock()
if c.closed {
return
}
f()
c.closed = true
}

func (c *closer) Close() {
c.Lock()
defer c.Unlock()
if c.closed {
return
}
c.closed = true
}

func (c *closer) isClosed() bool {
c.Lock()
defer c.Unlock()
return c.closed
}


type WsTailProxy struct {
downstreamURL string
log log.Logger
}

func New(cfg frontend.Config, log log.Logger) *WsTailProxy {
return &WsTailProxy{
downstreamURL: cfg.DownstreamURL,
log: log,
}
}

func (tp WsTailProxy) Handle(w http.ResponseWriter, r *http.Request) {
logger := util.WithContext(r.Context(), tp.log)

upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
level.Error(logger).Log("msg", "Error in upgrading websocket", "err", err)
return
}
conCloser := &closer{}

defer func() {
if err := conn.Close(); err != nil {
level.Error(logger).Log("msg", "Error closing websocket", "err", err)
}
}()

auth := strings.Split(strings.ReplaceAll(r.Header.Get("Authorization"), "Basic ", ""), ":")
user := auth[0]
var password string
if len(auth) > 1 {
password = auth[1]
} else {
password = ""
}

tailClient := &client.Client{
TLSConfig: config.TLSConfig{},
Address: tp.downstreamURL,
OrgID: r.Header.Get("X-Scope-OrgID"),
Username: user,
Password: password,
}

q := r.URL.Query()
from, err := strconv.Atoi(q.Get("start"))
if err != nil {
level.Error(logger).Log("msg", "Missing start", "err", err)
return
}
delayFor, err := strconv.Atoi(q.Get("delayFor"))
if err != nil {
delayFor = 0
}
limit, err := strconv.Atoi(q.Get("limit"))
if err != nil {
limit = 30
}

clientConn, err := tailClient.LiveTailQueryConn(q.Get("query"), delayFor, limit, int64(from), false)
if err != nil {
level.Error(logger).Log("msg", "TailClient connection failed", "err", err)
return
}
clientConCloser := &closer{}
defer func() {
clientConCloser.CloseFunc(func() {
if err := clientConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil {
level.Error(logger).Log("msg", "Error closing websocket:", "err", err)
}
})
}()

tailReponse := new(loghttp.TailResponse)
wg := &sync.WaitGroup{}
wg.Add(1)

go func() {
defer wg.Done()
for {
err = clientConn.ReadJSON(tailReponse)
if err != nil {
if closeErr, ok := err.(*websocket.CloseError); ok {
if closeErr.Code == websocket.CloseNormalClosure {
clientConCloser.Close()
break
}
level.Error(logger).Log("msg", "Error from tailClient", "err", err)
clientConCloser.Close()
break
} else {
level.Error(logger).Log("msg", "Unexpected error from tailClient", "err", err)
clientConCloser.Close()
break
}
} else {
if conCloser.isClosed() {
break
}
err = conn.WriteJSON(tailReponse)
if err != nil {
level.Error(logger).Log("msg", "ws write err", "err", err)
conCloser.CloseFunc(func() {
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err)
}
})
break
}
}
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for {
err := conn.ReadJSON(tailReponse)
if err != nil {
if closeErr, ok := err.(*websocket.CloseError); ok {
if closeErr.Code == websocket.CloseNormalClosure {
conCloser.Close()
break
}
level.Error(logger).Log("msg", "Error from client", "err", err)
conCloser.Close()
break
} else {
level.Error(logger).Log("msg", "Unexpected error from client", "err", err)
conCloser.Close()
break
}
} else {
if clientConCloser.isClosed() {
break
}
err = clientConn.WriteJSON(tailReponse)
if err != nil {
level.Error(logger).Log("msg", "tailClient write err", "err", err)
clientConCloser.CloseFunc(func() {
if err := clientConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err)
}
})
break
}
}
}
}()

wg.Wait()
conCloser.CloseFunc(func() {
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil {
level.Error(logger).Log("msg", "Error closing websocket:", "err", err)
}
})
}