Skip to content

Commit

Permalink
switch to using worker pool for async session updates
Browse files Browse the repository at this point in the history
  • Loading branch information
joshblakeley committed Jun 20, 2018
1 parent 058d561 commit 66e04ad
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 10 deletions.
97 changes: 88 additions & 9 deletions auth_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/base64"
"encoding/json"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/satori/go.uuid"
Expand Down Expand Up @@ -35,6 +37,7 @@ type SessionHandler interface {
Sessions(filter string) []string
Store() storage.Handler
ResetQuota(string, *user.SessionState)
Stop()
}

// DefaultAuthorisationManager implements AuthorisationHandler,
Expand All @@ -44,8 +47,20 @@ type DefaultAuthorisationManager struct {
}

type DefaultSessionManager struct {
store storage.Handler
asyncWrites bool
store storage.Handler
asyncWrites bool
disableCacheSessionState bool
updateChan chan *SessionUpdate
poolSize int
shouldStop uint32
poolWG sync.WaitGroup
}

type SessionUpdate struct {
isHashed bool
keyVal string
session *user.SessionState
ttl int64
}

func (b *DefaultAuthorisationManager) Init(store storage.Handler) {
Expand Down Expand Up @@ -86,6 +101,56 @@ func (b *DefaultSessionManager) Init(store storage.Handler) {
b.asyncWrites = config.Global().UseAsyncSessionWrite
b.store = store
b.store.Connect()

if b.asyncWrites {
//check pool size in config and set to 50 if unset
b.poolSize = config.Global().SessionUpdatePoolSize
if b.poolSize <= 0 {
b.poolSize = 50
}

log.WithField("Auth Manager", b.poolSize).Debug("Session update async pool size")

b.updateChan = make(chan *SessionUpdate)

//start worker pool
atomic.SwapUint32(&b.shouldStop, 0)
for i := 0; i < b.poolSize; i++ {
b.poolWG.Add(1)
go b.updateWorker()
}
}
}

func (b *DefaultSessionManager) updateWorker() {
defer b.poolWG.Done()

for {
//grab update object from channel
u, ok := <-b.updateChan
//if channel closed return from worker
if !ok {
return
}
v, _ := json.Marshal(u.session)

if u.isHashed {
u.keyVal = b.store.GetKeyPrefix() + u.keyVal
b.store.SetRawKey(u.keyVal, string(v), u.ttl)

}

b.store.SetKey(u.keyVal, string(v), u.ttl)
}
}

func (b *DefaultSessionManager) Stop() {
//flag to stop adding data to chan
atomic.SwapUint32(&b.shouldStop, 1)
//close update channel
close(b.updateChan)
//wait for workers to finish
b.poolWG.Wait()
}

func (b *DefaultSessionManager) Store() storage.Handler {
Expand All @@ -111,23 +176,37 @@ func (b *DefaultSessionManager) ResetQuota(keyName string, session *user.Session
// UpdateSession updates the session state in the storage engine
func (b *DefaultSessionManager) UpdateSession(keyName string, session *user.SessionState,
resetTTLTo int64, hashed bool) error {
v, _ := json.Marshal(session)

if hashed {
keyName = b.store.GetKeyPrefix() + keyName
if !session.HasChanged() {
log.Debug("Session has not changed, not updating")
return nil
}

// async update and return if needed
if b.asyncWrites {
if hashed {
go b.store.SetRawKey(keyName, string(v), resetTTLTo)
if atomic.LoadUint32(&b.shouldStop) > 0 {
return nil
}
b.renewSessionState(keyName, session)

sessionMeta := &SessionUpdate{
isHashed: hashed,
keyVal: keyName,
session: session,
ttl: resetTTLTo,
}

//send sessionupdate object through channel and do hard work in the pool
b.updateChan <- sessionMeta

go b.store.SetKey(keyName, string(v), resetTTLTo)
return nil
}

v, _ := json.Marshal(session)

if hashed {
keyName = b.store.GetKeyPrefix() + keyName
}

// sync update
var err error
if hashed {
Expand Down
3 changes: 3 additions & 0 deletions cli/lint/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,9 @@ const confSchema = `{
"optimisations_use_async_session_write": {
"type": "boolean"
},
"session_update_pool_size":{
"type": "integer"
},
"pid_file_location": {
"type": "string"
},
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ type Config struct {
AnalyticsConfig AnalyticsConfigConfig `json:"analytics_config"`
HealthCheck HealthCheckConfig `json:"health_check"`
UseAsyncSessionWrite bool `json:"optimisations_use_async_session_write"`
SessionUpdatePoolSize int `json:"session_update_pool_size"`
AllowMasterKeys bool `json:"allow_master_keys"`
HashKeys bool `json:"hash_keys"`
HashKeyFunction string `json:"hash_key_function"`
Expand Down
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,13 @@ func main() {
analytics.Stop()
}

//if using async session writes stop workers
if config.Global().UseAsyncSessionWrite {
DefaultOrgStore.Stop()
DefaultQuotaStore.Stop()
FallbackKeySesionManager.Stop()
}

// write pprof profiles
writeProfiles()

Expand Down
2 changes: 1 addition & 1 deletion policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type dummySessionManager struct {
DefaultSessionManager
}

func (dummySessionManager) UpdateSession(key string, sess *user.SessionState, ttl int64, hashed bool) error {
func (*dummySessionManager) UpdateSession(key string, sess *user.SessionState, ttl int64, hashed bool) error {
return nil
}

Expand Down

0 comments on commit 66e04ad

Please sign in to comment.