Skip to content

Commit

Permalink
fix: Connection retries when scheduler restarts for dataflow and cont…
Browse files Browse the repository at this point in the history
…roller (#5292)

* add retry for connection drop

* fix typo

* add retry for controller grpc streams

* add helper function for backoff retry

* add comment
  • Loading branch information
sakoush authored Feb 5, 2024
1 parent 92b9945 commit a0093db
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 30 deletions.
1 change: 1 addition & 0 deletions operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down
3 changes: 3 additions & 0 deletions operator/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2y
github.com/blendle/zapdriver v1.3.1/go.mod h1:mdXfREi6u5MArG4j9fewC+FGnXaBR+T4Ox4J2u4eHCc=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
github.com/c2h5oh/datasize v0.0.0-20171227191756-4eba002a5eae/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
Expand Down Expand Up @@ -382,6 +384,7 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0=
github.com/googleapis/gax-go/v2 v2.1.1/go.mod h1:hddJymUZASv3XPyGkUpKj8pPO47Rmb0eJc8R6ouapiM=
github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg=
github.com/googleapis/gnostic v0.5.5/go.mod h1:7+EbHbldMins07ALC74bsA81Ovc97DwqyJO1AENw9kA=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
Expand Down
67 changes: 53 additions & 14 deletions operator/scheduler/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sync"
"time"

backoff "github.com/cenkalti/backoff/v4"
"github.com/go-logr/logr"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"google.golang.org/grpc"
Expand Down Expand Up @@ -43,7 +44,7 @@ type SchedulerClient struct {
mu sync.Mutex
}

// connect on demand by add getConnection(namespace) which if not existing calls connect to sheduler.
// connect on demand by add getConnection(namespace) which if not existing calls connect to scheduler.
// For this will need to know ports (hardwire for now to 9004 and 9044 - ssl comes fom envvar - so always
// the same for all schedulers

Expand All @@ -66,30 +67,68 @@ func getSchedulerHost(namespace string) string {
return fmt.Sprintf("seldon-scheduler.%s", namespace)
}

// startEventHanders starts the grpc stream connections to the scheduler for the different resources we care about
// we also add a retry mechanism to reconnect if the connection is lost, this can happen if the scheduler is restarted
// or if the network connection is lost. We use an exponential backoff to retry the connection.
// note that when the scheduler is completely dead we will not be able to reconnect and these go routines will retry forever
// TODO: add a max retry count and report back to the caller.
func (s *SchedulerClient) startEventHanders(namespace string, conn *grpc.ClientConn) {
retryFn := func(fn func(context context.Context, conn *grpc.ClientConn) error, context context.Context, conn *grpc.ClientConn) error {
logFailure := func(err error, delay time.Duration) {
s.logger.Error(err, "Scheduler not ready")
}
backOffExp := backoff.NewExponentialBackOff()
backOffExp.MaxElapsedTime = 0 // Never stop due to large time between calls
fnWithArgs := func() error {
return fn(context, conn)
}
err := backoff.RetryNotify(fnWithArgs, backOffExp, logFailure)
if err != nil {
s.logger.Error(err, "Failed to connect to scheduler", "namespace", namespace)
return err
}
return nil
}

// Subscribe the event streams from scheduler
go func() {
err := s.SubscribeModelEvents(context.Background(), conn)
if err != nil {
s.RemoveConnection(namespace)
for {
err := retryFn(s.SubscribeModelEvents, context.Background(), conn)
if err != nil {
s.logger.Error(err, "Subscribe ended for model events", "namespace", namespace)
} else {
s.logger.Info("Subscribe ended for model events", "namespace", namespace)
}
}
}()
go func() {
err := s.SubscribeServerEvents(context.Background(), conn)
if err != nil {
s.RemoveConnection(namespace)
for {
err := retryFn(s.SubscribeServerEvents, context.Background(), conn)
if err != nil {
s.logger.Error(err, "Subscribe ended for server events", "namespace", namespace)
} else {
s.logger.Info("Subscribe ended for server events", "namespace", namespace)
}
}
}()
go func() {
err := s.SubscribePipelineEvents(context.Background(), conn)
if err != nil {
s.RemoveConnection(namespace)
for {
err := retryFn(s.SubscribePipelineEvents, context.Background(), conn)
if err != nil {
s.logger.Error(err, "Subscribe ended for pipeline events", "namespace", namespace)
} else {
s.logger.Info("Subscribe ended for pipeline events", "namespace", namespace)
}
}
}()
go func() {
err := s.SubscribeExperimentEvents(context.Background(), conn)
if err != nil {
s.RemoveConnection(namespace)
for {
err := retryFn(s.SubscribeExperimentEvents, context.Background(), conn)
if err != nil {
s.logger.Error(err, "Subscribe ended for experiment events", "namespace", namespace)
} else {
s.logger.Info("Subscribe ended for experiment events", "namespace", namespace)
}
}
}()
}
Expand All @@ -106,7 +145,7 @@ func (s *SchedulerClient) RemoveConnection(namespace string) {
}
}

// A smoke test allows us to quicky check if we actually have a functional grpc connection to the scheduler
// A smoke test allows us to quickly check if we actually have a functional grpc connection to the scheduler
func (s *SchedulerClient) smokeTestConnection(conn *grpc.ClientConn) error {
grcpClient := scheduler.NewSchedulerClient(conn)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ the Change License after the Change Date as each is defined in accordance with t

package io.seldon.dataflow

import com.github.michaelbull.retry.ContinueRetrying
import com.github.michaelbull.retry.StopRetrying
import com.github.michaelbull.retry.policy.RetryPolicy
import com.github.michaelbull.retry.policy.binaryExponentialBackoff
import com.github.michaelbull.retry.policy.plus
import com.github.michaelbull.retry.retry
import io.grpc.ManagedChannelBuilder
import io.grpc.StatusException
Expand Down Expand Up @@ -55,19 +51,13 @@ class PipelineSubscriber(
private val client = ChainerGrpcKt.ChainerCoroutineStub(channel)

private val pipelines = ConcurrentHashMap<PipelineId, Pipeline>()
private val grpcFailurePolicy: RetryPolicy<Throwable> = {
when (reason) {
is StatusException,
is StatusRuntimeException -> ContinueRetrying
else -> StopRetrying
// TODO - be more intelligent about non-retryable errors (e.g. not implemented)
}
}

suspend fun subscribe() {
logger.info("will connect to ${upstreamHost}:${upstreamPort}")
retry(grpcFailurePolicy + binaryExponentialBackoff(50..5_000L)) {
subscribePipelines(kafkaConsumerGroupIdPrefix, namespace)
while (true) {
retry(binaryExponentialBackoff(50..5_000L)) {
subscribePipelines(kafkaConsumerGroupIdPrefix, namespace)
}
}
}

Expand Down Expand Up @@ -97,8 +87,12 @@ class PipelineSubscriber(
else -> logger.warn("unrecognised pipeline operation (${update.op})")
}
}
.onCompletion {
logger.info("pipeline subscription terminated")
.onCompletion { cause ->
if (cause == null) {
logger.info("pipeline subscription completed successfully")
} else {
logger.error("pipeline subscription terminated with error ${cause}")
}
}
.collect()
// TODO - error handling?
Expand Down

0 comments on commit a0093db

Please sign in to comment.