diff --git a/api_definition.go b/api_definition.go index aa8ee807c07..ffa8a95d439 100644 --- a/api_definition.go +++ b/api_definition.go @@ -832,6 +832,11 @@ func (a *APISpec) Init(authStore, sessionStore, healthStore, orgStore storage.Ha a.OrgSessionManager.Init(orgStore) } +func (a *APISpec) StopSessionManagerPool() { + a.SessionManager.Stop() + a.OrgSessionManager.Stop() +} + func (a *APISpec) getURLStatus(stat URLStatus) RequestStatus { switch stat { case Ignored: diff --git a/auth_manager.go b/auth_manager.go index daca2f32f99..9c3f3d1dd67 100644 --- a/auth_manager.go +++ b/auth_manager.go @@ -54,6 +54,8 @@ type DefaultSessionManager struct { poolSize int shouldStop uint32 poolWG sync.WaitGroup + bufferSize int + keyPrefix string } type SessionUpdate struct { @@ -103,15 +105,22 @@ func (b *DefaultSessionManager) Init(store storage.Handler) { b.store.Connect() if b.asyncWrites { - //check pool size in config and set to 50 if unset + // check pool size in config and set to 50 if unset b.poolSize = config.Global().SessionUpdatePoolSize if b.poolSize <= 0 { b.poolSize = 50 } + //check size for channel buffer and set to 1000 if unset + b.bufferSize = config.Global().SessionUpdateBufferSize + if b.bufferSize <= 0 { + b.bufferSize = 1000 + } log.WithField("Auth Manager", b.poolSize).Debug("Session update async pool size") - b.updateChan = make(chan *SessionUpdate) + b.updateChan = make(chan *SessionUpdate, b.bufferSize) + + b.keyPrefix = b.store.GetKeyPrefix() //start worker pool atomic.SwapUint32(&b.shouldStop, 0) @@ -125,32 +134,42 @@ func (b *DefaultSessionManager) Init(store storage.Handler) { func (b *DefaultSessionManager) updateWorker() { defer b.poolWG.Done() - for { - //grab update object from channel - u, ok := <-b.updateChan - //if channel closed return from worker - if !ok { - return + for range b.updateChan { + // grab update object from channel + u := <-b.updateChan + + v, err := json.Marshal(u.session) + if err != nil { + log.Error("Error marshalling session for async session update") + continue } - v, _ := json.Marshal(u.session) if u.isHashed { - u.keyVal = b.store.GetKeyPrefix() + u.keyVal - b.store.SetRawKey(u.keyVal, string(v), u.ttl) + u.keyVal = b.keyPrefix + u.keyVal + err := b.store.SetRawKey(u.keyVal, string(v), u.ttl) + if err != nil { + log.Errorf("Error updating hashed key: %v", err) + } + continue } - b.store.SetKey(u.keyVal, string(v), u.ttl) + err = b.store.SetKey(u.keyVal, string(v), u.ttl) + if err != nil { + log.Errorf("Error updating non-hashed key: %v", err) + } } } func (b *DefaultSessionManager) Stop() { - //flag to stop adding data to chan - atomic.SwapUint32(&b.shouldStop, 1) - //close update channel - close(b.updateChan) - //wait for workers to finish - b.poolWG.Wait() + if atomic.LoadUint32(&b.shouldStop) == 0 { + // flag to stop adding data to chan + atomic.SwapUint32(&b.shouldStop, 1) + // close update channel + close(b.updateChan) + // wait for workers to finish + b.poolWG.Wait() + } } func (b *DefaultSessionManager) Store() storage.Handler { @@ -176,39 +195,37 @@ func (b *DefaultSessionManager) ResetQuota(keyName string, session *user.Session // UpdateSession updates the session state in the storage engine func (b *DefaultSessionManager) UpdateSession(keyName string, session *user.SessionState, resetTTLTo int64, hashed bool) error { - if !session.HasChanged() { - log.Debug("Session has not changed, not updating") - return nil - } // async update and return if needed if b.asyncWrites { if atomic.LoadUint32(&b.shouldStop) > 0 { return nil } - b.renewSessionState(keyName, session) - sessionMeta := &SessionUpdate{ + sessionUpdate := &SessionUpdate{ isHashed: hashed, keyVal: keyName, session: session, ttl: resetTTLTo, } - //send sessionupdate object through channel and do hard work in the pool - b.updateChan <- sessionMeta + // send sessionupdate object through channel to pool + b.updateChan <- sessionUpdate return nil } - v, _ := json.Marshal(session) + v, err := json.Marshal(session) + if err != nil { + log.Error("Error marshalling session for sync update") + return err + } if hashed { keyName = b.store.GetKeyPrefix() + keyName } // sync update - var err error if hashed { err = b.store.SetRawKey(keyName, string(v), resetTTLTo) } else { diff --git a/cli/lint/schema.go b/cli/lint/schema.go index c88c72c27b3..a022035a5fc 100644 --- a/cli/lint/schema.go +++ b/cli/lint/schema.go @@ -494,6 +494,9 @@ const confSchema = `{ "session_update_pool_size":{ "type": "integer" }, + "session_update_buffer_size":{ + "type": "integer" + }, "pid_file_location": { "type": "string" }, diff --git a/config/config.go b/config/config.go index d394626290b..e4552a4661a 100644 --- a/config/config.go +++ b/config/config.go @@ -213,6 +213,7 @@ type Config struct { HealthCheck HealthCheckConfig `json:"health_check"` UseAsyncSessionWrite bool `json:"optimisations_use_async_session_write"` SessionUpdatePoolSize int `json:"session_update_pool_size"` + SessionUpdateBufferSize int `json:"session_update_buffer_size"` AllowMasterKeys bool `json:"allow_master_keys"` HashKeys bool `json:"hash_keys"` HashKeyFunction string `json:"hash_key_function"` diff --git a/main.go b/main.go index 6ad66f8524c..694046137f0 100644 --- a/main.go +++ b/main.go @@ -999,11 +999,13 @@ func main() { analytics.Stop() } - //if using async session writes stop workers + // if using async session writes stop workers if config.Global().UseAsyncSessionWrite { DefaultOrgStore.Stop() - DefaultQuotaStore.Stop() - FallbackKeySesionManager.Stop() + for i := range apiSpecs { + apiSpecs[i].StopSessionManagerPool() + } + } // write pprof profiles