
query-frontend: cancel stream to scheduler when streaming failed #3302

Merged 4 commits on Oct 26, 2022. Changes shown below are from 3 commits.
CHANGELOG.md (1 addition, 0 deletions)

@@ -16,6 +16,7 @@
 * [ENHANCEMENT] Compactor: Add new `cortex_compactor_block_max_time_delta_seconds` histogram for detecting if compaction of blocks is lagging behind. #3240
 * [BUGFIX] Flusher: Add `Overrides` as a dependency to prevent panics when starting with `-target=flusher`. #3151
 * [BUGFIX] Updated `golang.org/x/text` dependency to fix CVE-2022-32149. #3285
+* [BUGFIX] Query-frontend: properly close gRPC streams to the query-scheduler to stop memory and goroutines leak. #3302
 
 ### Mixin
 
pkg/frontend/v2/frontend_scheduler_worker.go (19 additions, 11 deletions)

@@ -266,18 +266,15 @@ func (w *frontendSchedulerWorker) stop() {
 }
 
 func (w *frontendSchedulerWorker) runOne(ctx context.Context, client schedulerpb.SchedulerForFrontendClient) {
-	backoffConfig := backoff.Config{
-		MinBackoff: 250 * time.Millisecond,
-		MaxBackoff: 2 * time.Second,
-	}
+	// attemptLoop returns false if there was any error with forwarding requests to scheduler.
+	attemptLoop := func() bool {
+		ctx, cancel := context.WithCancel(ctx)
+		defer cancel() // cancel the stream after we are done to release resources
 
-	backoff := backoff.New(ctx, backoffConfig)
-	for backoff.Ongoing() {
 		loop, loopErr := client.FrontendLoop(ctx)
 		if loopErr != nil {
 			level.Error(w.log).Log("msg", "error contacting scheduler", "err", loopErr, "addr", w.schedulerAddr)
-			backoff.Wait()
-			continue
+			return false
 		}
 
 		loopErr = w.schedulerLoop(loop)
@@ -287,11 +284,22 @@ func (w *frontendSchedulerWorker) runOne(ctx context.Context, client schedulerpb
 
 		if loopErr != nil {
 			level.Error(w.log).Log("msg", "error sending requests to scheduler", "err", loopErr, "addr", w.schedulerAddr)
-			backoff.Wait()
-			continue
+			return false
 		}
+		return true
+	}
 
-		backoff.Reset()
+	backoffConfig := backoff.Config{
+		MinBackoff: 250 * time.Millisecond,
+		MaxBackoff: 2 * time.Second,
+	}
+	backoff := backoff.New(ctx, backoffConfig)
+	for backoff.Ongoing() {
+		if !attemptLoop() {
+			backoff.Wait()
+		} else {
+			backoff.Reset()
+		}
 	}
 }
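
The essence of the change: each attempt to talk to the scheduler now gets its own cancellable context, and the deferred cancel() runs as soon as attemptLoop returns, tearing down the gRPC client stream and its receive goroutine instead of leaving them behind until the whole worker shuts down, while backoff and retries stay in the caller. Below is a minimal, self-contained sketch of that per-attempt cancellation pattern; runAttempt and openAndConsumeStream are hypothetical stand-ins for attemptLoop and for client.FrontendLoop plus schedulerLoop, not code from this PR.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// openAndConsumeStream stands in for opening a gRPC stream and reading from it
// until it ends; here it simply fails after a short delay, or earlier if the
// context is cancelled.
func openAndConsumeStream(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(50 * time.Millisecond):
		return errors.New("stream closed by server")
	}
}

// runAttempt mirrors attemptLoop: one attempt, one context. Cancelling the
// context when the attempt ends is what releases the stream's resources.
func runAttempt(parent context.Context) bool {
	ctx, cancel := context.WithCancel(parent)
	defer cancel() // always cancel, even when the server closed the connection first

	if err := openAndConsumeStream(ctx); err != nil {
		fmt.Println("attempt failed:", err)
		return false // the caller decides whether to back off and retry
	}
	return true
}

func main() {
	if !runAttempt(context.Background()) {
		fmt.Println("would back off and retry, as runOne does")
	}
}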

pkg/frontend/v2/frontend_test.go (75 additions, 9 deletions)

@@ -10,6 +10,8 @@ import (
 	"fmt"
 	"net"
 	"net/http"
+	"os"
+	"runtime"
 	"strconv"
 	"strings"
 	"sync"
@@ -28,6 +30,7 @@ import (
 	"github.com/weaveworks/common/user"
 	"go.uber.org/atomic"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
 
 	"github.com/grafana/mimir/pkg/frontend/v2/frontendv2pb"
 	"github.com/grafana/mimir/pkg/querier/stats"
@@ -39,10 +42,14 @@ import (
 const testFrontendWorkerConcurrency = 5
 
 func setupFrontend(t *testing.T, reg prometheus.Registerer, schedulerReplyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend) (*Frontend, *mockScheduler) {
+	return setupFrontendWithConcurrencyAndServerOptions(t, reg, schedulerReplyFunc, testFrontendWorkerConcurrency)
+}
+
+func setupFrontendWithConcurrencyAndServerOptions(t *testing.T, reg prometheus.Registerer, schedulerReplyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend, concurrency int, opts ...grpc.ServerOption) (*Frontend, *mockScheduler) {
 	l, err := net.Listen("tcp", "")
 	require.NoError(t, err)
 
-	server := grpc.NewServer()
+	server := grpc.NewServer(opts...)
 
 	h, p, err := net.SplitHostPort(l.Addr().String())
 	require.NoError(t, err)
@@ -53,12 +60,11 @@ func setupFrontend(t *testing.T, reg prometheus.Registerer, schedulerReplyFunc f
 	cfg := Config{}
 	flagext.DefaultValues(&cfg)
 	cfg.SchedulerAddress = l.Addr().String()
-	cfg.WorkerConcurrency = testFrontendWorkerConcurrency
+	cfg.WorkerConcurrency = concurrency
 	cfg.Addr = h
 	cfg.Port = grpcPort
 
-	//logger := log.NewLogfmtLogger(os.Stdout)
-	logger := log.NewNopLogger()
+	logger := log.NewLogfmtLogger(os.Stdout)
 	f, err := NewFrontend(cfg, logger, reg)
 	require.NoError(t, err)
@@ -67,17 +73,18 @@ func setupFrontend(t *testing.T, reg prometheus.Registerer, schedulerReplyFunc f
 	ms := newMockScheduler(t, f, schedulerReplyFunc)
 	schedulerpb.RegisterSchedulerForFrontendServer(server, ms)
 
-	require.NoError(t, services.StartAndAwaitRunning(context.Background(), f))
-	t.Cleanup(func() {
-		_ = services.StopAndAwaitTerminated(context.Background(), f)
-	})
-
 	go func() {
 		_ = server.Serve(l)
 	}()
 
 	t.Cleanup(func() {
 		_ = l.Close()
+		server.GracefulStop()
+	})
+
+	require.NoError(t, services.StartAndAwaitRunning(context.Background(), f))
+	t.Cleanup(func() {
+		_ = services.StopAndAwaitTerminated(context.Background(), f)
 	})
 
 	// Wait for frontend to connect to scheduler.
@@ -425,3 +432,62 @@ func TestConfig_Validate(t *testing.T) {
 		})
 	}
 }
+
+func TestWithClosingGrpcServer(t *testing.T) {
+	// This test is easier with single frontend worker.
+	const frontendConcurrency = 1
+	const userID = "test"
+
+	f, _ := setupFrontendWithConcurrencyAndServerOptions(t, nil, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
+		return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.TOO_MANY_REQUESTS_PER_TENANT}
+	}, frontendConcurrency, grpc.KeepaliveParams(keepalive.ServerParameters{
+		MaxConnectionIdle:     100 * time.Millisecond,
+		MaxConnectionAge:      100 * time.Millisecond,
+		MaxConnectionAgeGrace: 100 * time.Millisecond,
+		Time:                  1 * time.Second,
+		Timeout:               1 * time.Second,
+	}))
+
+	// Connection will be established on the first roundtrip.
+	resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
+	require.NoError(t, err)
+	require.Equal(t, int(resp.Code), http.StatusTooManyRequests)
+
+	// Verify that there is one stream open.
+	require.Equal(t, 1, checkStreamGoroutines())
+
+	// Wait a bit, to make sure that server closes connection.
+	time.Sleep(1 * time.Second)
+
+	// Despite server closing connections, stream-related goroutines still exist.
+	require.Equal(t, 1, checkStreamGoroutines())
+
+	// Another request will work as before, because worker will recreate connection.
+	resp, err = f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
+	require.NoError(t, err)
+	require.Equal(t, int(resp.Code), http.StatusTooManyRequests)
+
+	// There should still be only one stream open, and one goroutine created for it.
+	// Previously frontend leaked goroutine because stream that received "EOF" due to server closing the connection, never stopped its goroutine.
+	require.Equal(t, 1, checkStreamGoroutines())
+}
+
+const createdBy = "created by google.golang.org/grpc.newClientStreamWithParams"
Review comment from a collaborator on the const above: [nit] Move this const inside checkStreamGoroutines().

+func checkStreamGoroutines() int {
+	buf := make([]byte, 1000000)
+	stacklen := runtime.Stack(buf, true)
+
+	str := string(buf[:stacklen])
+	count := 0
+	for len(str) > 0 {
+		ix := strings.Index(str, createdBy)
+		if ix < 0 {
+			break
+		}
+		count++
+
+		str = str[ix+len(createdBy):]
+	}
+	return count
+}
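
A possible follow-up, not part of this PR: because gRPC tears streams down asynchronously, a fixed time.Sleep followed by an immediate count can be racy on slow machines. A sketch of a more tolerant assertion, using the testify require package and the checkStreamGoroutines helper already present in this file:

	// Hypothetical variant of the assertions in TestWithClosingGrpcServer:
	// poll until the goroutine count settles at exactly one client stream.
	require.Eventually(t, func() bool {
		return checkStreamGoroutines() == 1
	}, 5*time.Second, 100*time.Millisecond, "expected exactly one gRPC client stream goroutine")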