Skip to content

Commit

Permalink
example memory leak during configuration update
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitry Gridnev authored and flymedllva committed May 3, 2024
1 parent da53dd4 commit 880955e
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 26 deletions.
2 changes: 1 addition & 1 deletion router/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
dev:
go run cmd/router/main.go
DEV_MODE=true go run cmd/router/main.go

update-snapshot:
cd ./pkg/config && go test -update -race ./...
Expand Down
1 change: 1 addition & 0 deletions router/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"engineConfig":{"defaultFlushInterval":"500","datasourceConfigurations":[{"kind":"GRAPHQL","rootNodes":[{"typeName":"Query","fieldNames":["test"]}],"overrideFieldPathFromAlias":true,"customGraphql":{"fetch":{"url":{"staticVariableContent":"http://localhost:8080/query"},"method":"POST","body":{},"baseUrl":{},"path":{}},"subscription":{"enabled":true,"url":{"staticVariableContent":"http://localhost:8080/query"},"protocol":"GRAPHQL_SUBSCRIPTION_PROTOCOL_WS"},"federation":{"enabled":true,"serviceSdl":"type Query {\n test(filter: TestFilter, a: Int! = 10, b: Int! = 50): Int!\n}\n\ninput TestFilter {\n a: TestSubFilter\n b: TestSubFilter\n}\n\ninput TestSubFilter {\n a: String\n b: String\n}\n\n\nextend schema @link(\n url: \"https://specs.apollo.dev/federation/v2.3\",\n import: [\"@external\", \"@requires\", \"@provides\", \"@key\", \"@shareable\", \"@inaccessible\", \"@tag\", \"@override\", \"@composeDirective\", \"@extends\"]\n)"},"upstreamSchema":{"key":"ff47b8f11ff1d6c665e7ba8e5657f743de825afe"}},"requestTimeoutSeconds":"10","id":"0"}],"fieldConfigurations":[{"typeName":"Query","fieldName":"test","argumentsConfiguration":[{"name":"filter","sourceType":"FIELD_ARGUMENT"},{"name":"a","sourceType":"FIELD_ARGUMENT"},{"name":"b","sourceType":"FIELD_ARGUMENT"}]}],"graphqlSchema":"directive @authenticated on ENUM | FIELD_DEFINITION | INTERFACE | OBJECT | SCALAR\n\ndirective @inaccessible on ARGUMENT_DEFINITION | ENUM | ENUM_VALUE | FIELD_DEFINITION | INPUT_FIELD_DEFINITION | INPUT_OBJECT | INTERFACE | OBJECT | SCALAR | UNION\n\ndirective @requiresScopes(scopes: [[openfed__Scope!]!]!) on ENUM | FIELD_DEFINITION | INTERFACE | OBJECT | SCALAR\n\ndirective @tag(name: String!) repeatable on ARGUMENT_DEFINITION | ENUM | ENUM_VALUE | FIELD_DEFINITION | INPUT_FIELD_DEFINITION | INPUT_OBJECT | INTERFACE | OBJECT | SCALAR | UNION\n\nscalar openfed__Scope\n\ntype Query {\n test(filter: TestFilter, a: Int! = 10, b: Int! = 50): Int!\n}\n\ninput TestFilter {\n a: TestSubFilter\n b: TestSubFilter\n}\n\ninput TestSubFilter {\n a: String\n b: String\n}","stringStorage":{"ff47b8f11ff1d6c665e7ba8e5657f743de825afe":"schema {\n query: Query\n}\n\ndirective @authenticated on ENUM | FIELD_DEFINITION | INTERFACE | OBJECT | SCALAR\n\ndirective @composeDirective(name: String!) repeatable on SCHEMA\n\ndirective @eventsPublish(sourceName: String! = \"default\", topic: String!) on FIELD_DEFINITION\n\ndirective @eventsRequest(sourceName: String! = \"default\", topic: String!) on FIELD_DEFINITION\n\ndirective @eventsSubscribe(sourceName: String! = \"default\", topic: String!) on FIELD_DEFINITION\n\ndirective @extends on INTERFACE | OBJECT\n\ndirective @external on FIELD_DEFINITION | OBJECT\n\ndirective @inaccessible on ARGUMENT_DEFINITION | ENUM | ENUM_VALUE | FIELD_DEFINITION | INPUT_FIELD_DEFINITION | INPUT_OBJECT | INTERFACE | OBJECT | SCALAR | UNION\n\ndirective @interfaceObject on OBJECT\n\ndirective @key(fields: openfed__FieldSet!, resolvable: Boolean = true) repeatable on INTERFACE | OBJECT\n\ndirective @link(as: String, for: String, import: [String], url: String!) repeatable on SCHEMA\n\ndirective @override(from: String!) on FIELD_DEFINITION\n\ndirective @provides(fields: openfed__FieldSet!) on FIELD_DEFINITION\n\ndirective @requires(fields: openfed__FieldSet!) on FIELD_DEFINITION\n\ndirective @requiresScopes(scopes: [[openfed__Scope!]!]!) on ENUM | FIELD_DEFINITION | INTERFACE | OBJECT | SCALAR\n\ndirective @shareable on FIELD_DEFINITION | OBJECT\n\ndirective @tag(name: String!) repeatable on ARGUMENT_DEFINITION | ENUM | ENUM_VALUE | FIELD_DEFINITION | INPUT_FIELD_DEFINITION | INPUT_OBJECT | INTERFACE | OBJECT | SCALAR | UNION\n\ntype Query {\n test(a: Int! = 10, b: Int! = 50, filter: TestFilter): Int!\n}\n\ninput TestFilter {\n a: TestSubFilter\n b: TestSubFilter\n}\n\ninput TestSubFilter {\n a: String\n b: String\n}\n\nscalar openfed__FieldSet\n\nscalar openfed__Scope"}},"subgraphs":[{"id":"0","name":"test","routingUrl":"http://localhost:8080/query"}]}
41 changes: 37 additions & 4 deletions router/config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,39 @@
# yaml-language-server: $schema=./pkg/config/config.schema.json
version: "1"

# See pkg/config/config.go for the full list of configuration options.
# This file is used to debugging purposes only.
# General router options
graph:
name: "production"
token: ""

version: "1"
log_level: "info"
listen_addr: "localhost:3002"
playground_enabled: true
introspection_enabled: true
json_log: true
shutdown_delay: 15s
grace_period: 20s
poll_interval: 10s
health_check_path: "/health"
readiness_check_path: "/health/ready"
liveness_check_path: "/health/live"
router_config_path: "config.json"

cors:
allow_origins: ["*"]
allow_methods:
- HEAD
- GET
- POST
allow_headers:
- Origin
- Content-Length
- Content-Type
allow_credentials: true
max_age_minutes: 5m

# Config for custom modules
# See "https://cosmo-docs.wundergraph.com/router/custom-modules" for more information
modules:
myModule:
# Arbitrary values, unmarshalled by the module
value: 1
70 changes: 49 additions & 21 deletions router/core/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,11 @@ func (r *Router) bootstrap(ctx context.Context) error {
return nil
}

type routerConfigPoll struct {
pollInterval time.Duration
ticker *time.Ticker
}

// Start starts the server. It does not block. The server can be shutdown with Router.Shutdown().
// Not safe for concurrent use.
func (r *Router) Start(ctx context.Context) error {
Expand All @@ -793,22 +798,23 @@ func (r *Router) Start(ctx context.Context) error {
if err := r.bootstrap(ctx); err != nil {
return fmt.Errorf("failed to bootstrap application: %w", err)
}
routerConfig := r.routerConfig

// Start the server with the static config without polling
if r.routerConfig != nil {
r.logger.Info("Static router config provided. Polling is disabled. Updating router config is only possible by providing a config.")
return r.updateServerAndStart(ctx, r.routerConfig)
}
//if r.routerConfig != nil {
// r.logger.Info("Static router config provided. Polling is disabled. Updating router config is only possible by providing a config.")
// return r.updateServerAndStart(ctx, r.routerConfig)
//}

// when no static config is provided and no poller is configured, we can't start the server
if r.configPoller == nil {
return fmt.Errorf("config fetcher not provided. Please provide a static router config instead")
}
//if r.configPoller == nil {
// return fmt.Errorf("config fetcher not provided. Please provide a static router config instead")
//}

routerConfig, err := r.configPoller.GetRouterConfig(ctx)
if err != nil {
return fmt.Errorf("failed to get initial router config: %w", err)
}
//routerConfig, err := r.configPoller.GetRouterConfig(ctx)
//if err != nil {
// return fmt.Errorf("failed to get initial router config: %w", err)
//}

if err := r.updateServerAndStart(ctx, routerConfig); err != nil {
r.logger.Error("Failed to start server with initial config", zap.Error(err))
Expand All @@ -817,17 +823,39 @@ func (r *Router) Start(ctx context.Context) error {

r.logger.Info("Polling for router config updates in the background")

r.configPoller.Subscribe(ctx, func(newConfig *nodev1.RouterConfig, oldVersion string) error {
r.logger.Info("Router config has changed, upgrading server",
zap.String("old_version", oldVersion),
zap.String("new_version", newConfig.GetVersion()),
)
if err := r.updateServerAndStart(ctx, newConfig); err != nil {
r.logger.Error("Failed to start server with new config. Trying again on the next update cycle.", zap.Error(err))
return err
//r.configPoller.Subscribe(ctx, func(newConfig *nodev1.RouterConfig, oldVersion string) error {
// r.logger.Info("Router config has changed, upgrading server",
// zap.String("old_version", oldVersion),
// zap.String("new_version", newConfig.GetVersion()),
// )
// if err := r.updateServerAndStart(ctx, newConfig); err != nil {
// r.logger.Error("Failed to start server with new config. Trying again on the next update cycle.", zap.Error(err))
// return err
// }
// return nil
//})
interval := time.Second / 64
c := &routerConfigPoll{
pollInterval: interval,
ticker: time.NewTicker(interval),
}
go func() {
for {
select {
case <-ctx.Done():
c.ticker.Stop()
return
case <-c.ticker.C:
r.logger.Info("Router config has changed, upgrading server",
zap.String("old_version", routerConfig.GetVersion()),
zap.String("new_version", routerConfig.GetVersion()),
)
if err := r.updateServerAndStart(ctx, routerConfig); err != nil {
r.logger.Error("Failed to start server with new config. Trying again on the next update cycle.", zap.Error(err))
}
}
}
return nil
})
}()

return nil
}
Expand Down

0 comments on commit 880955e

Please sign in to comment.