From c7b7a4744a01271a1a081dc9a5e60713601953c8 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Mon, 15 Jan 2018 18:24:11 +0200 Subject: [PATCH] Add more router locks --- api_loader.go | 6 +++--- gateway_test.go | 24 ++++++++++++++++++++++++ helpers_test.go | 1 - main.go | 17 +++++++++++------ rpc_backup_handlers.go | 7 ++++--- 5 files changed, 42 insertions(+), 13 deletions(-) diff --git a/api_loader.go b/api_loader.go index 54c15335347..081fd7bc87c 100644 --- a/api_loader.go +++ b/api_loader.go @@ -518,20 +518,20 @@ func (d *DummyProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { d.SH.ServeHTTP(w, r) } -func loadGlobalApps() { +func loadGlobalApps(router *mux.Router) { // we need to make a full copy of the slice, as loadApps will // use in-place to sort the apis. apisMu.RLock() specs := make([]*APISpec, len(apiSpecs)) copy(specs, apiSpecs) apisMu.RUnlock() - loadApps(specs, mainRouter) + loadApps(specs, router) if config.Global.NewRelic.AppName != "" { log.WithFields(logrus.Fields{ "prefix": "main", }).Info("Adding NewRelic instrumentation") - AddNewRelicInstrumentation(NewRelicApplication, mainRouter) + AddNewRelicInstrumentation(NewRelicApplication, router) } } diff --git a/gateway_test.go b/gateway_test.go index 4afae0b3fb4..055073ddc24 100644 --- a/gateway_test.go +++ b/gateway_test.go @@ -706,6 +706,30 @@ func TestWebsocketsUpstreamUpgradeRequest(t *testing.T) { }) } +// func TestConcurrencyReloads(t *testing.T) { +// var wg sync.WaitGroup + +// ts := newTykTestServer() +// defer ts.Close() + +// buildAndLoadAPI() + +// for i := 0; i < 100; i++ { +// wg.Add(1) +// go func() { +// client := &http.Client{} +// ts.Run(t, test.TestCase{Client: client, Path: "/sample", Code: 200}) +// wg.Done() +// }() +// } + +// for i := 0; i < 10; i++ { +// buildAndLoadAPI() +// } + +// wg.Wait() +// } + func TestWebsocketsSeveralOpenClose(t *testing.T) { config.Global.HttpServerOptions.EnableWebSockets = true defer resetTestConfig() diff --git a/helpers_test.go b/helpers_test.go index 243b7722018..4757dcf3187 100644 --- a/helpers_test.go +++ b/helpers_test.go @@ -253,7 +253,6 @@ type tykTestServerConfig struct { delay time.Duration hotReload bool overrideDefaults bool - lastResponse *http.Response } type tykTestServer struct { diff --git a/main.go b/main.go index eb6c2575122..9cc4904f097 100644 --- a/main.go +++ b/main.go @@ -126,6 +126,9 @@ func apisByIDLen() int { // Create all globals and init connection handlers func setupGlobals() { + reloadMu.Lock() + defer reloadMu.Unlock() + mainRouter = mux.NewRouter() controlRouter = mux.NewRouter() @@ -656,21 +659,23 @@ func doReload() { log.WithFields(logrus.Fields{ "prefix": "main", }).Info("Preparing new router") - mainRouter = mux.NewRouter() + newRouter := mux.NewRouter() if config.Global.HttpServerOptions.OverrideDefaults { - mainRouter.SkipClean(config.Global.HttpServerOptions.SkipURLCleaning) + newRouter.SkipClean(config.Global.HttpServerOptions.SkipURLCleaning) } if config.Global.ControlAPIPort == 0 { - loadAPIEndpoints(mainRouter) + loadAPIEndpoints(newRouter) } - loadGlobalApps() + loadGlobalApps(newRouter) log.WithFields(logrus.Fields{ "prefix": "main", }).Info("API reload complete") + mainRouter = newRouter + // Unset these rpcEmergencyModeLoaded = false rpcEmergencyMode = false @@ -1321,7 +1326,7 @@ func listen(l, controlListener net.Listener, err error) { if !rpcEmergencyMode { count := syncAPISpecs() if count > 0 { - loadGlobalApps() + loadGlobalApps(mainRouter) syncPolicies() } @@ -1397,7 +1402,7 @@ func listen(l, controlListener net.Listener, err error) { if !rpcEmergencyMode { count := syncAPISpecs() if count > 0 { - loadGlobalApps() + loadGlobalApps(mainRouter) syncPolicies() } diff --git a/rpc_backup_handlers.go b/rpc_backup_handlers.go index 9c6c0e4ecf1..3edce925382 100644 --- a/rpc_backup_handlers.go +++ b/rpc_backup_handlers.go @@ -95,7 +95,6 @@ func doLoadWithBackup(specs []*APISpec) { log.Warning("[RPC Backup] --> Initialised JSVM") newRouter := mux.NewRouter() - mainRouter = newRouter log.Warning("[RPC Backup] --> Set up routers") log.Warning("[RPC Backup] --> Loading endpoints") @@ -108,12 +107,14 @@ func doLoadWithBackup(specs []*APISpec) { if config.Global.NewRelic.AppName != "" { log.Warning("[RPC Backup] --> Adding NewRelic instrumentation") - AddNewRelicInstrumentation(NewRelicApplication, mainRouter) + AddNewRelicInstrumentation(NewRelicApplication, newRouter) log.Warning("[RPC Backup] --> NewRelic instrumentation added") } newServeMux := http.NewServeMux() - newServeMux.Handle("/", mainRouter) + newServeMux.Handle("/", newRouter) + + mainRouter = newRouter http.DefaultServeMux = newServeMux log.Warning("[RPC Backup] --> Replaced muxer")