Skip to content

Commit

Permalink
switch to using worker pool for async session updates
Browse files Browse the repository at this point in the history
  • Loading branch information
joshblakeley committed Jun 20, 2018
1 parent 66e04ad commit e811758
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 31 deletions.
5 changes: 5 additions & 0 deletions api_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,11 @@ func (a *APISpec) Init(authStore, sessionStore, healthStore, orgStore storage.Ha
a.OrgSessionManager.Init(orgStore)
}

func (a *APISpec) StopSessionManagerPool() {
a.SessionManager.Stop()
a.OrgSessionManager.Stop()
}

func (a *APISpec) getURLStatus(stat URLStatus) RequestStatus {
switch stat {
case Ignored:
Expand Down
73 changes: 45 additions & 28 deletions auth_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type DefaultSessionManager struct {
poolSize int
shouldStop uint32
poolWG sync.WaitGroup
bufferSize int
keyPrefix string
}

type SessionUpdate struct {
Expand Down Expand Up @@ -103,15 +105,22 @@ func (b *DefaultSessionManager) Init(store storage.Handler) {
b.store.Connect()

if b.asyncWrites {
//check pool size in config and set to 50 if unset
// check pool size in config and set to 50 if unset
b.poolSize = config.Global().SessionUpdatePoolSize
if b.poolSize <= 0 {
b.poolSize = 50
}
//check size for channel buffer and set to 1000 if unset
b.bufferSize = config.Global().SessionUpdateBufferSize
if b.bufferSize <= 0 {
b.bufferSize = 1000
}

log.WithField("Auth Manager", b.poolSize).Debug("Session update async pool size")

b.updateChan = make(chan *SessionUpdate)
b.updateChan = make(chan *SessionUpdate, b.bufferSize)

b.keyPrefix = b.store.GetKeyPrefix()

//start worker pool
atomic.SwapUint32(&b.shouldStop, 0)
Expand All @@ -125,32 +134,42 @@ func (b *DefaultSessionManager) Init(store storage.Handler) {
func (b *DefaultSessionManager) updateWorker() {
defer b.poolWG.Done()

for {
//grab update object from channel
u, ok := <-b.updateChan
//if channel closed return from worker
if !ok {
return
for range b.updateChan {
// grab update object from channel
u := <-b.updateChan

v, err := json.Marshal(u.session)
if err != nil {
log.Error("Error marshalling session for async session update")
continue
}
v, _ := json.Marshal(u.session)

if u.isHashed {
u.keyVal = b.store.GetKeyPrefix() + u.keyVal
b.store.SetRawKey(u.keyVal, string(v), u.ttl)
u.keyVal = b.keyPrefix + u.keyVal
err := b.store.SetRawKey(u.keyVal, string(v), u.ttl)
if err != nil {
log.Errorf("Error updating hashed key: %v", err)
}
continue

}

b.store.SetKey(u.keyVal, string(v), u.ttl)
err = b.store.SetKey(u.keyVal, string(v), u.ttl)
if err != nil {
log.Errorf("Error updating non-hashed key: %v", err)
}
}
}

func (b *DefaultSessionManager) Stop() {
//flag to stop adding data to chan
atomic.SwapUint32(&b.shouldStop, 1)
//close update channel
close(b.updateChan)
//wait for workers to finish
b.poolWG.Wait()
if atomic.LoadUint32(&b.shouldStop) == 0 {
// flag to stop adding data to chan
atomic.SwapUint32(&b.shouldStop, 1)
// close update channel
close(b.updateChan)
// wait for workers to finish
b.poolWG.Wait()
}
}

func (b *DefaultSessionManager) Store() storage.Handler {
Expand All @@ -176,39 +195,37 @@ func (b *DefaultSessionManager) ResetQuota(keyName string, session *user.Session
// UpdateSession updates the session state in the storage engine
func (b *DefaultSessionManager) UpdateSession(keyName string, session *user.SessionState,
resetTTLTo int64, hashed bool) error {
if !session.HasChanged() {
log.Debug("Session has not changed, not updating")
return nil
}

// async update and return if needed
if b.asyncWrites {
if atomic.LoadUint32(&b.shouldStop) > 0 {
return nil
}
b.renewSessionState(keyName, session)

sessionMeta := &SessionUpdate{
sessionUpdate := &SessionUpdate{
isHashed: hashed,
keyVal: keyName,
session: session,
ttl: resetTTLTo,
}

//send sessionupdate object through channel and do hard work in the pool
b.updateChan <- sessionMeta
// send sessionupdate object through channel to pool
b.updateChan <- sessionUpdate

return nil
}

v, _ := json.Marshal(session)
v, err := json.Marshal(session)
if err != nil {
log.Error("Error marshalling session for sync update")
return err
}

if hashed {
keyName = b.store.GetKeyPrefix() + keyName
}

// sync update
var err error
if hashed {
err = b.store.SetRawKey(keyName, string(v), resetTTLTo)
} else {
Expand Down
3 changes: 3 additions & 0 deletions cli/lint/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,9 @@ const confSchema = `{
"session_update_pool_size":{
"type": "integer"
},
"session_update_buffer_size":{
"type": "integer"
},
"pid_file_location": {
"type": "string"
},
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ type Config struct {
HealthCheck HealthCheckConfig `json:"health_check"`
UseAsyncSessionWrite bool `json:"optimisations_use_async_session_write"`
SessionUpdatePoolSize int `json:"session_update_pool_size"`
SessionUpdateBufferSize int `json:"session_update_buffer_size"`
AllowMasterKeys bool `json:"allow_master_keys"`
HashKeys bool `json:"hash_keys"`
HashKeyFunction string `json:"hash_key_function"`
Expand Down
8 changes: 5 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,11 +999,13 @@ func main() {
analytics.Stop()
}

//if using async session writes stop workers
// if using async session writes stop workers
if config.Global().UseAsyncSessionWrite {
DefaultOrgStore.Stop()
DefaultQuotaStore.Stop()
FallbackKeySesionManager.Stop()
for i := range apiSpecs {
apiSpecs[i].StopSessionManagerPool()
}

}

// write pprof profiles
Expand Down

0 comments on commit e811758

Please sign in to comment.