Skip to content

Commit

Permalink
start router service asynchronously
Browse files Browse the repository at this point in the history
---

Signed-off-by: Vardhaman Surana <vardhaman.surana@infracloud.io>
  • Loading branch information
vardhaman-surana committed Oct 18, 2023
1 parent adb6ade commit aa1289b
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 29 deletions.
12 changes: 7 additions & 5 deletions cmd/fission-bundle/main.go
Expand Up @@ -54,8 +54,8 @@ func runCanaryConfigServer(ctx context.Context, logger *zap.Logger) error {
return canaryconfigmgr.StartCanaryServer(ctx, logger, false)
}

func runRouter(ctx context.Context, clientGen crd.ClientGeneratorInterface, logger *zap.Logger, port int, executorUrl string) {
router.Start(ctx, clientGen, logger, port, executorUrl)
func runRouter(ctx context.Context, clientGen crd.ClientGeneratorInterface, logger *zap.Logger, port int, executorUrl string) error {
return router.Start(ctx, clientGen, logger, port, executorUrl)
}

func runExecutor(ctx context.Context, clientGen crd.ClientGeneratorInterface, logger *zap.Logger, port int) error {
Expand Down Expand Up @@ -245,9 +245,11 @@ Options:

if arguments["--routerPort"] != nil {
port := getPort(logger, arguments["--routerPort"])
runRouter(ctx, clientGen, logger, port, executorUrl)
logger.Error("router exited")
return
err = runRouter(ctx, clientGen, logger, port, executorUrl)
if err != nil {
logger.Error("router exited", zap.Error(err))
return
}
}

if arguments["--executorPort"] != nil {
Expand Down
38 changes: 14 additions & 24 deletions pkg/router/router.go
Expand Up @@ -47,6 +47,7 @@ import (
"time"

"github.com/gorilla/mux"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.uber.org/zap"

Expand Down Expand Up @@ -87,72 +88,60 @@ func serve(ctx context.Context, logger *zap.Logger, port int,
}

// Start starts a router
func Start(ctx context.Context, clientGen crd.ClientGeneratorInterface, logger *zap.Logger, port int, executorURL string) {
func Start(ctx context.Context, clientGen crd.ClientGeneratorInterface, logger *zap.Logger, port int, executorURL string) error {
fmap := makeFunctionServiceMap(logger, time.Minute)

fissionClient, err := clientGen.GetFissionClient()
if err != nil {
logger.Fatal("error making the fission client", zap.Error(err))
return errors.Wrap(err, "error making the fission client")
}
kubeClient, err := clientGen.GetKubernetesClient()
if err != nil {
logger.Fatal("error making the kube client", zap.Error(err))
return errors.Wrap(err, "error making the kube client")
}

err = crd.WaitForCRDs(ctx, logger, fissionClient)
if err != nil {
logger.Fatal("error waiting for CRDs", zap.Error(err))
return errors.Wrap(err, "error waiting for CRDs")
}

executor := executorClient.MakeClient(logger, executorURL)

timeoutStr := os.Getenv("ROUTER_ROUND_TRIP_TIMEOUT")
timeout, err := time.ParseDuration(timeoutStr)
if err != nil {
logger.Fatal("failed to parse timeout duration from 'ROUTER_ROUND_TRIP_TIMEOUT'",
zap.Error(err),
zap.String("value", timeoutStr))
return errors.Wrap(err, fmt.Sprintf("failed to parse timeout duration value('%s') from 'ROUTER_ROUND_TRIP_TIMEOUT'", timeoutStr))
}

timeoutExponentStr := os.Getenv("ROUTER_ROUNDTRIP_TIMEOUT_EXPONENT")
timeoutExponent, err := strconv.Atoi(timeoutExponentStr)
if err != nil {
logger.Fatal("failed to parse timeout exponent from 'ROUTER_ROUNDTRIP_TIMEOUT_EXPONENT'",
zap.Error(err),
zap.String("value", timeoutExponentStr))
return errors.Wrap(err, fmt.Sprintf("failed to parse timeout exponent value('%s') from 'ROUTER_ROUNDTRIP_TIMEOUT_EXPONENT'", timeoutExponentStr))
}

keepAliveTimeStr := os.Getenv("ROUTER_ROUND_TRIP_KEEP_ALIVE_TIME")
keepAliveTime, err := time.ParseDuration(keepAliveTimeStr)
if err != nil {
logger.Fatal("failed to parse keep alive duration from 'ROUTER_ROUND_TRIP_KEEP_ALIVE_TIME'",
zap.Error(err),
zap.String("value", keepAliveTimeStr))
return errors.Wrap(err, fmt.Sprintf("failed to parse keep alive duration value('%s') from 'ROUTER_ROUND_TRIP_KEEP_ALIVE_TIME'", keepAliveTimeStr))
}

disableKeepAliveStr := os.Getenv("ROUTER_ROUND_TRIP_DISABLE_KEEP_ALIVE")
disableKeepAlive, err := strconv.ParseBool(disableKeepAliveStr)
if err != nil {
disableKeepAlive = true
logger.Fatal("failed to parse enable keep alive from 'ROUTER_ROUND_TRIP_DISABLE_KEEP_ALIVE'",
zap.Error(err),
zap.String("value", disableKeepAliveStr))
return errors.Wrap(err, fmt.Sprintf("failed to parse enable keep alive value('%s') from 'ROUTER_ROUND_TRIP_DISABLE_KEEP_ALIVE'", disableKeepAliveStr))
}

maxRetriesStr := os.Getenv("ROUTER_ROUND_TRIP_MAX_RETRIES")
maxRetries, err := strconv.Atoi(maxRetriesStr)
if err != nil {
logger.Fatal("failed to parse max retries from 'ROUTER_ROUND_TRIP_MAX_RETRIES'",
zap.Error(err),
zap.String("value", maxRetriesStr))
return errors.Wrap(err, fmt.Sprintf("failed to parse max retries value('%s') from 'ROUTER_ROUND_TRIP_MAX_RETRIES'", maxRetriesStr))
}

isDebugEnvStr := os.Getenv("DEBUG_ENV")
isDebugEnv, err := strconv.ParseBool(isDebugEnvStr)
if err != nil {
logger.Fatal("failed to parse debug env from 'DEBUG_ENV'",
zap.Error(err),
zap.String("value", isDebugEnvStr))
return errors.Wrap(err, fmt.Sprintf("failed to parse debug env value('%s') from 'DEBUG_ENV'", isDebugEnvStr))
}

// svcAddrRetryCount is the max times for RetryingRoundTripper to retry with a specific service address
Expand Down Expand Up @@ -208,7 +197,7 @@ func Start(ctx context.Context, clientGen crd.ClientGeneratorInterface, logger *
svcAddrRetryCount: svcAddrRetryCount,
}, isDebugEnv, unTapServiceTimeout, throttler.MakeThrottler(svcAddrUpdateTimeout))
if err != nil {
logger.Fatal("error making HTTP trigger set", zap.Error(err))
return errors.Wrap(err, "error making HTTP trigger set")
}
go metrics.ServeMetrics(ctx, logger)

Expand All @@ -218,5 +207,6 @@ func Start(ctx context.Context, clientGen crd.ClientGeneratorInterface, logger *
ctx, span := tracer.Start(ctx, "router/Start")
defer span.End()

serve(ctx, logger, port, triggers, displayAccessLog)
go serve(ctx, logger, port, triggers, displayAccessLog)
return nil
}

0 comments on commit aa1289b

Please sign in to comment.