Skip to content

Commit

Permalink
race fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dencoded committed Nov 23, 2017
1 parent 4b61494 commit a83964a
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 74 deletions.
2 changes: 1 addition & 1 deletion api_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (a APIDefinitionLoader) FromRPC(orgId string) []*APISpec {

//store.Disconnect()

if rpcLoadCount > 0 {
if atomic.LoadUint32(&rpcLoadCount) > 0 {
saveRPCDefinitionsBackup(apiCollection)
}

Expand Down
10 changes: 8 additions & 2 deletions api_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
"github.com/gorilla/mux"
"github.com/justinas/alice"

"sync"
"sync/atomic"

"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/coprocess"
Expand Down Expand Up @@ -224,7 +227,7 @@ func processSpec(spec *APISpec, apisByListen map[string]int,

if spec.UseOauth2 {
log.Debug("Loading OAuth Manager")
if !rpcEmergencyMode {
if atomic.LoadUint32(&rpcEmergencyMode) == 0 {
oauthManager := addOAuthHandlers(spec, subrouter)
log.Debug("-- Added OAuth Handlers")

Expand Down Expand Up @@ -519,7 +522,10 @@ func (d *DummyProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// Create the individual API (app) specs based on live configurations and assign middleware
func loadApps(specs []*APISpec, muxer *mux.Router) {
func loadApps(specs []*APISpec, muxer *mux.Router, muxerMu *sync.Mutex) {
muxerMu.Lock()
defer muxerMu.Unlock()

hostname := config.Global.HostName
if hostname != "" {
muxer = muxer.Host(hostname).Subrouter()
Expand Down
22 changes: 12 additions & 10 deletions api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const apiTestDef = `{

func loadSampleAPI(t *testing.T, def string) {
spec := createSpecTest(t, def)
loadApps([]*APISpec{spec}, discardMuxer)
loadApps([]*APISpec{spec}, discardMuxer, &discardMuxerMu)
}

type testAPIDefinition struct {
Expand Down Expand Up @@ -161,7 +161,7 @@ func TestApiHandlerPostDupPath(t *testing.T) {
apisMu.Lock()
apisByID = make(map[string]*APISpec)
apisMu.Unlock()
loadApps(specs(), discardMuxer)
loadApps(specs(), discardMuxer, &discardMuxerMu)

s2 = getApiSpec("2")
if want, got := "/v1-2", s2.Proxy.ListenPath; want != got {
Expand All @@ -177,9 +177,9 @@ func TestApiHandlerPostDupPath(t *testing.T) {
apisMu.Lock()
apisByID = make(map[string]*APISpec)
apisMu.Unlock()
loadApps(specs()[1:], discardMuxer)
loadApps(specs(), discardMuxer)
loadApps(specs(), discardMuxer)
loadApps(specs()[1:], discardMuxer, &discardMuxerMu)
loadApps(specs(), discardMuxer, &discardMuxerMu)
loadApps(specs(), discardMuxer, &discardMuxerMu)

s2 = getApiSpec("2")
if want, got := "/v1-2", s2.Proxy.ListenPath; want != got {
Expand All @@ -195,8 +195,8 @@ func TestApiHandlerPostDupPath(t *testing.T) {
apisMu.Lock()
apisByID = make(map[string]*APISpec)
apisMu.Unlock()
loadApps(specs(), discardMuxer)
loadApps(specs(), discardMuxer)
loadApps(specs(), discardMuxer, &discardMuxerMu)
loadApps(specs(), discardMuxer, &discardMuxerMu)

s2 = getApiSpec("2")
if want, got := "/v1-2", s2.Proxy.ListenPath; want != got {
Expand Down Expand Up @@ -670,8 +670,9 @@ func BenchmarkApiReload(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
newMuxes := mux.NewRouter()
loadAPIEndpoints(newMuxes)
loadApps(specs, newMuxes)
var newMuxesMu sync.Mutex
loadAPIEndpoints(newMuxes, &newMuxesMu)
loadApps(specs, newMuxes, &newMuxesMu)
}
}

Expand Down Expand Up @@ -746,7 +747,8 @@ func TestApiLoaderLongestPathFirst(t *testing.T) {
apisMu.Unlock()

mu := mux.NewRouter()
loadApps(specs, mu)
var muMutex sync.Mutex
loadApps(specs, mu, &muMutex)

for hp := range inputs {
rec := httptest.NewRecorder()
Expand Down
2 changes: 1 addition & 1 deletion cert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func TestGatewayControlAPIMutualTLS(t *testing.T) {
ln, _ := generateListener(0)
baseURL := "https://" + strings.Replace(ln.Addr().String(), "[::]", "127.0.0.1", -1)
listen(ln, nil, nil)
loadAPIEndpoints(mainRouter)
loadAPIEndpoints(mainRouter, &mainRouterMu)
defer func() {
ln.Close()
config.Global.HttpServerOptions.SSLCertificates = nil
Expand Down
9 changes: 6 additions & 3 deletions gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/gorilla/mux"
"github.com/justinas/alice"

"sync"

"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"
Expand All @@ -34,7 +36,8 @@ func init() {

var (
// to register to, but never used
discardMuxer = mux.NewRouter()
discardMuxer = mux.NewRouter()
discardMuxerMu sync.Mutex

// to simulate time ticks for tests that do reloads
reloadTick = make(chan time.Time)
Expand Down Expand Up @@ -158,7 +161,7 @@ func TestMain(m *testing.M) {
afterConfSetup(&config.Global)
initialiseSystem(nil)
// Small part of start()
loadAPIEndpoints(mainRouter)
loadAPIEndpoints(mainRouter, &mainRouterMu)
if analytics.GeoIPDB == nil {
panic("GeoIPDB was not initialized")
}
Expand Down Expand Up @@ -829,7 +832,7 @@ func testHttp(t *testing.T, tests []tykHttpTest, separateControlPort bool) {
// This is emulate calling start()
// But this lines is the only thing needed for this tests
if config.Global.ControlAPIPort == 0 {
loadAPIEndpoints(mainRouter)
loadAPIEndpoints(mainRouter, &mainRouterMu)
}

if m.goagain {
Expand Down
77 changes: 52 additions & 25 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Sirupsen/logrus"
Expand Down Expand Up @@ -48,6 +49,7 @@ var (
templates *template.Template
analytics RedisAnalyticsHandler
GlobalEventsJSVM JSVM
globalEventsJSVMMu sync.Mutex
memProfFile *os.File
MainNotifier RedisNotifier
DefaultOrgStore DefaultSessionManager
Expand All @@ -67,10 +69,14 @@ var (
policiesMu sync.RWMutex
policiesByID = map[string]user.Policy{}

mainRouter *mux.Router
controlRouter *mux.Router
LE_MANAGER letsencrypt.Manager
LE_FIRSTRUN bool
mainRouter *mux.Router
mainRouterMu sync.Mutex

controlRouter *mux.Router
controlRouterMu sync.Mutex

LE_MANAGER letsencrypt.Manager
LE_FIRSTRUN bool

NodeID string

Expand All @@ -88,22 +94,40 @@ var (
}
)

func initGlobalEventsJSVM(spec *APISpec) {
globalEventsJSVMMu.Lock()
defer globalEventsJSVMMu.Unlock()
GlobalEventsJSVM.Init(spec)
}

func resetMainRouter() {
mainRouterMu.Lock()
defer mainRouterMu.Unlock()
mainRouter = mux.NewRouter()
}

func resetControlRouter() {
controlRouterMu.Lock()
defer controlRouterMu.Unlock()
controlRouter = mux.NewRouter()
}

func getApiSpec(apiID string) *APISpec {
apisMu.RLock()
defer apisMu.RUnlock()
return apisByID[apiID]
}

func getApiSpecLen() int {
func apiSpecsLen() int {
apisMu.Lock()
defer apisMu.Unlock()
return len(apisByID)
}

// Create all globals and init connection handlers
func setupGlobals() {
mainRouter = mux.NewRouter()
controlRouter = mux.NewRouter()
resetMainRouter()
resetControlRouter()

if config.Global.EnableAnalytics && config.Global.Storage.Type != "redis" {
log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -141,7 +165,7 @@ func setupGlobals() {

// Set up global JSVM
if config.Global.EnableJSVM {
GlobalEventsJSVM.Init(nil)
initGlobalEventsJSVM(nil)
}

if config.Global.CoProcessOptions.EnableCoProcess {
Expand Down Expand Up @@ -324,7 +348,10 @@ func controlAPICheckClientCertificate(certLevel string, next http.Handler) http.
}

// Set up default Tyk control API endpoints - these are global, so need to be added first
func loadAPIEndpoints(muxer *mux.Router) {
func loadAPIEndpoints(muxer *mux.Router, muxerMu *sync.Mutex) {
muxerMu.Lock()
defer muxerMu.Unlock()

hostname := config.Global.HostName
if config.Global.ControlAPIHostname != "" {
hostname = config.Global.ControlAPIHostname
Expand Down Expand Up @@ -604,7 +631,7 @@ func doReload() {

// skip re-loading only if dashboard service reported 0 APIs
// and current registry had 0 APIs
if len(apiSpecs) == 0 && getApiSpecLen() == 0 {
if len(apiSpecs) == 0 && apiSpecsLen() == 0 {
log.WithFields(logrus.Fields{
"prefix": "main",
}).Warning("No API Definitions found, not reloading")
Expand All @@ -615,30 +642,30 @@ func doReload() {

// Reset the JSVM
if config.Global.EnableJSVM {
GlobalEventsJSVM.Init(nil)
initGlobalEventsJSVM(nil)
}

log.WithFields(logrus.Fields{
"prefix": "main",
}).Info("Preparing new router")
mainRouter = mux.NewRouter()
resetMainRouter()
if config.Global.HttpServerOptions.OverrideDefaults {
mainRouter.SkipClean(config.Global.HttpServerOptions.SkipURLCleaning)
}

if config.Global.ControlAPIPort == 0 {
loadAPIEndpoints(mainRouter)
loadAPIEndpoints(mainRouter, &mainRouterMu)
}

loadApps(apiSpecs, mainRouter)
loadApps(apiSpecs, mainRouter, &mainRouterMu)

log.WithFields(logrus.Fields{
"prefix": "main",
}).Info("API reload complete")

// Unset these
rpcEmergencyModeLoaded = false
rpcEmergencyMode = false
atomic.StoreUint32(&rpcEmergencyModeLoaded, 0)
atomic.StoreUint32(&rpcEmergencyMode, 0)
}

// startReloadChan and reloadDoneChan are used by the two reload loops
Expand Down Expand Up @@ -1158,7 +1185,7 @@ func start(arguments map[string]interface{}) {
DefaultQuotaStore.Init(getGlobalStorageHandler("orgkey.", false))
}
if config.Global.ControlAPIPort == 0 {
loadAPIEndpoints(mainRouter)
loadAPIEndpoints(mainRouter, &mainRouterMu)
}

// Start listening for reload messages
Expand Down Expand Up @@ -1300,15 +1327,15 @@ func listen(l, controlListener net.Listener, err error) {

startDRL()

if !rpcEmergencyMode {
if atomic.LoadUint32(&rpcEmergencyMode) == 0 {
syncAPISpecs()
if apiSpecs != nil {
loadApps(apiSpecs, mainRouter)
loadApps(apiSpecs, mainRouter, &mainRouterMu)
syncPolicies()
}

if config.Global.ControlAPIPort > 0 {
loadAPIEndpoints(controlRouter)
loadAPIEndpoints(controlRouter, &controlRouterMu)
}
}

Expand Down Expand Up @@ -1347,7 +1374,7 @@ func listen(l, controlListener net.Listener, err error) {

go http.Serve(l, mainHandler{})

if !rpcEmergencyMode {
if atomic.LoadUint32(&rpcEmergencyMode) == 0 {
if controlListener != nil {
go http.Serve(controlListener, controlRouter)
}
Expand Down Expand Up @@ -1376,15 +1403,15 @@ func listen(l, controlListener net.Listener, err error) {
startDRL()

// Resume accepting connections in a new goroutine.
if !rpcEmergencyMode {
if atomic.LoadUint32(&rpcEmergencyMode) == 0 {
syncAPISpecs()
if apiSpecs != nil {
loadApps(apiSpecs, mainRouter)
loadApps(apiSpecs, mainRouter, &mainRouterMu)
syncPolicies()
}

if config.Global.ControlAPIPort > 0 {
loadAPIEndpoints(controlRouter)
loadAPIEndpoints(controlRouter, &controlRouterMu)
}

if config.Global.UseDBAppConfigs {
Expand Down Expand Up @@ -1425,7 +1452,7 @@ func listen(l, controlListener net.Listener, err error) {

go http.Serve(l, mainHandler{})

if !rpcEmergencyMode {
if atomic.LoadUint32(&rpcEmergencyMode) == 0 {
if controlListener != nil {
log.WithFields(logrus.Fields{
"prefix": "main",
Expand Down
2 changes: 1 addition & 1 deletion mw_version_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

func TestVersionMwExpiresHeader(t *testing.T) {
spec := createSpecTest(t, nonExpiringDef)
loadApps([]*APISpec{spec}, discardMuxer)
loadApps([]*APISpec{spec}, discardMuxer, &discardMuxerMu)

session := createNonThrottledSession()
spec.SessionManager.UpdateSession("1234xyz", session, 60)
Expand Down
Loading

0 comments on commit a83964a

Please sign in to comment.