✨ feat: use sync.Map to replace original one
0xE8551CCB committed Oct 8, 2019
1 parent 55055ec · commit 47100e0
Showing 3 changed files with 48 additions and 32 deletions.
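The pattern is the same in all three files: a map guarded by an explicit mutex becomes a sync.Map, which synchronizes internally but hands values back as interface{}, so callers must type-assert them. A minimal sketch of the before/after shapes, using a hypothetical string cache rather than code from this repository:

package main

import (
	"fmt"
	"sync"
)

// Before: a plain map guarded by an RWMutex.
var (
	mu    sync.RWMutex
	cache = make(map[string]string)
)

func getLocked(key string) (string, bool) {
	mu.RLock()
	defer mu.RUnlock()
	v, ok := cache[key]
	return v, ok
}

// After: sync.Map locks internally; Load returns (interface{}, bool),
// so the value must be asserted back to its concrete type.
var cacheMap sync.Map

func getSyncMap(key string) (string, bool) {
	v, ok := cacheMap.Load(key)
	if !ok {
		return "", false
	}
	s, _ := v.(string)
	return s, true
}

func main() {
	mu.Lock()
	cache["a"] = "1"
	mu.Unlock()
	cacheMap.Store("a", "1")

	fmt.Println(getLocked("a"))  // 1 true
	fmt.Println(getSyncMap("a")) // 1 true
}

sync.Map is tuned for caches whose keys are written once and read many times, which is exactly the shape of the tag-settings, filter-result, and worker-pool maps below.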
field.go: 20 changes (10 additions, 10 deletions)
@@ -11,9 +11,8 @@ import (
 )
 
 var (
-	defaultTagName             = "portal"
-	cachedFieldTagSettings     = make(map[string]map[string]string)
-	lockCachedFieldTagSettings sync.RWMutex
+	defaultTagName         = "portal"
+	cachedFieldTagSettings sync.Map
 )
 
 type schemaField struct {
@@ -25,15 +24,16 @@ type schemaField struct {
 
 func newField(schema *schema, field *structs.Field) *schemaField {
 	tagStr := field.Tag(defaultTagName)
-	lockCachedFieldTagSettings.RLock()
-	settings, ok := cachedFieldTagSettings[tagStr]
-	lockCachedFieldTagSettings.RUnlock()
-	if !ok {
-		lockCachedFieldTagSettings.Lock()
+
+	var settings map[string]string
+	cachedSettings, ok := cachedFieldTagSettings.Load(tagStr)
+	if ok {
+		result, _ := cachedSettings.(map[string]string)
+		settings = result
+	} else {
 		result := parseTagSettings(tagStr)
-		cachedFieldTagSettings[tagStr] = result
+		cachedFieldTagSettings.Store(tagStr, result)
 		settings = result
-		lockCachedFieldTagSettings.Unlock()
 	}
 
 	return &schemaField{
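The new lookup in newField is check-then-store: two goroutines that miss on the same tag may both run parseTagSettings and both call Store. That is harmless here, since parsing is deterministic and the last write simply wins, but sync.Map's LoadOrStore collapses the same read-through cache into one call. A self-contained sketch of that alternative, where parseTag is a hypothetical stand-in for the repository's parseTagSettings and the key:value;... tag format is assumed:

package main

import (
	"fmt"
	"strings"
	"sync"
)

var tagCache sync.Map // effectively map[string]map[string]string

// parseTag is a hypothetical stand-in for parseTagSettings.
func parseTag(tag string) map[string]string {
	settings := make(map[string]string)
	for _, part := range strings.Split(tag, ";") {
		if kv := strings.SplitN(part, ":", 2); len(kv) == 2 {
			settings[kv[0]] = kv[1]
		}
	}
	return settings
}

// tagSettings does the same read-through caching with LoadOrStore:
// a cold key may still be parsed by several goroutines at once, but
// all callers end up sharing a single cached map.
func tagSettings(tag string) map[string]string {
	if v, ok := tagCache.Load(tag); ok {
		return v.(map[string]string)
	}
	v, _ := tagCache.LoadOrStore(tag, parseTag(tag))
	return v.(map[string]string)
}

func main() {
	fmt.Println(tagSettings("field:name;omitempty:true"))
}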
filter.go: 9 changes (5 additions, 4 deletions)
@@ -14,7 +14,7 @@ var (
 )
 
 var (
-	parsedFilterResultMap sync.Map
+	cachedFilterResultMap sync.Map
 )
 
 type filterNode struct {
@@ -75,9 +75,10 @@ func parseFilterString(s string) (map[int][]*filterNode, error) {
 		return nil, ErrPrefixIsNotBracket
 	}
 
-	cachedResult, ok := parsedFilterResultMap.Load(s)
+	cachedResult, ok := cachedFilterResultMap.Load(s)
 	if ok {
-		return cachedResult.(map[int][]*filterNode), nil
+		rv, _ := cachedResult.(map[int][]*filterNode)
+		return rv, nil
 	}
 
 	// don't care about non-ascii chars.
@@ -88,7 +89,7 @@ func parseFilterString(s string) (map[int][]*filterNode, error) {
 	}
 
 	result := doParse(filterInBytes)
-	parsedFilterResultMap.Store(s, result)
+	cachedFilterResultMap.Store(s, result)
 	return result, nil
 }
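filter.go also switches the cache hit from a single-value type assertion to the comma-ok form. The difference only shows if the stored value ever has an unexpected dynamic type: the single-value form panics, while the comma-ok form yields the type's zero value. A standalone illustration:

package main

import "fmt"

func main() {
	var v interface{} = "not a map"

	// Comma-ok form: ok reports whether the assertion held; rv is
	// the zero value (a nil map) when it did not.
	rv, ok := v.(map[int][]string)
	fmt.Println(rv == nil, ok) // true false

	// The single-value form would panic here:
	// m := v.(map[int][]string) // panic: interface conversion
}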
pool.go: 51 changes (33 additions, 18 deletions)
@@ -19,8 +19,7 @@
 	// responsible for processing schema fields asynchronously.
 	// Note that each dumping level gets a worker pool to avoid
 	// dead lock.
-	levelWorkerPoolMap     = make(map[int]*ants.PoolWithFunc)
-	lockLevelWorkerPoolMap sync.Mutex
+	levelWorkerPoolMap sync.Map
 )
 
 var (
@@ -55,20 +54,21 @@ func submitJobs(ctx context.Context, pf processFunc, payloads ...interface{}) (<
 	defer cancel()
 
 	level := dumpDepthFromContext(ctx)
-	workerPool, ok := levelWorkerPoolMap[level]
-	if !ok {
-		lockLevelWorkerPoolMap.Lock()
 
+	var workerPool *ants.PoolWithFunc
+	pool, ok := levelWorkerPoolMap.Load(level)
+	if ok {
+		workerPool = pool.(*ants.PoolWithFunc)
+	} else {
 		logger.Debugf("[portal.pool] worker pool with level %d not found, try to create a new one", level)
 		pool, err := ants.NewPoolWithFunc(1, processRequest)
 		if err != nil {
-			lockLevelWorkerPoolMap.Unlock()
 			return nil, errors.WithStack(err)
 		}
 
-		levelWorkerPoolMap[level] = pool
+		levelWorkerPoolMap.Store(level, pool)
+		workerPool = pool
 		SetMaxPoolSize(maxWorkerPoolSize)
-		lockLevelWorkerPoolMap.Unlock()
 	}
 
 	resultChan := make(chan *jobResult, len(payloads))
@@ -113,28 +113,43 @@ func SetMaxPoolSize(size int) {
 	}
 
 	maxWorkerPoolSize = size
-	if len(levelWorkerPoolMap) == 0 {
+
+	var length int
+	levelWorkerPoolMap.Range(func(key, value interface{}) bool {
+		length++
+		return true
+	})
+
+	if length == 0 {
 		return
 	}
 
 	// make sure capacity is valid.
-	capacity := size / len(levelWorkerPoolMap)
+	capacity := size / length
 	if capacity == 0 {
 		capacity = 1
 	}
 
-	for level, pool := range levelWorkerPoolMap {
-		logger.Debugf("[portal.pool] tune pool.%d capacity to %d", level, capacity)
-		pool.Tune(capacity)
-	}
+	levelWorkerPoolMap.Range(func(level, value interface{}) bool {
+		pool, ok := value.(*ants.PoolWithFunc)
+		if ok {
+			logger.Debugf("[portal.pool] tune pool.%d capacity to %d", level.(int), capacity)
+			pool.Tune(capacity)
+		}
+		return true
+	})
 }
 
 // CleanUp releases the global worker pool.
 // You should call this function only once before the main goroutine exits.
 func CleanUp() {
-	for _, pool := range levelWorkerPoolMap {
-		pool.Release()
-	}
+	levelWorkerPoolMap.Range(func(key, value interface{}) bool {
+		pool, ok := value.(*ants.PoolWithFunc)
+		if ok {
+			pool.Release()
+		}
+		return true
+	})
 }
 
 func processRequest(request interface{}) {
@@ -163,5 +178,5 @@ func init() {
 		panic(ErrFailedToInitWorkerPool)
 	}
 
-	levelWorkerPoolMap[0] = p
+	levelWorkerPoolMap.Store(0, p)
 }
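sync.Map deliberately exposes no Len method, so SetMaxPoolSize now counts the pools by walking the map with Range before dividing the size budget. A sketch of that counting idiom, with a hypothetical mapLen helper in place of the inline loop above:

package main

import (
	"fmt"
	"sync"
)

// mapLen counts entries the only way sync.Map allows: by iterating.
// Range stops early if the callback returns false; returning true
// keeps the walk going over every entry.
func mapLen(m *sync.Map) int {
	n := 0
	m.Range(func(_, _ interface{}) bool {
		n++
		return true
	})
	return n
}

func main() {
	var pools sync.Map
	pools.Store(0, "pool-0")
	pools.Store(1, "pool-1")

	size := 16
	capacity := size / mapLen(&pools)
	if capacity == 0 { // guard against more pools than budget
		capacity = 1
	}
	fmt.Println(mapLen(&pools), capacity) // 2 8
}

Note that the count can be stale by the time it is used, so this is best-effort tuning rather than an exact invariant.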
