Skip to content

Commit

Permalink
feat: add utils.SyncMap with genericType key and value to replace val…
Browse files Browse the repository at this point in the history
…ues control by mutex.
  • Loading branch information
xufeixiang committed Jul 2, 2024
1 parent 30fd81f commit 727c08c
Show file tree
Hide file tree
Showing 24 changed files with 198 additions and 202 deletions.
7 changes: 3 additions & 4 deletions pkg/api/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ import (
"github.com/wundergraph/wundergraph/pkg/wgpb"
"net/http"
"strings"
"sync"
)

func DatasourceExtraRouter(_, datasourceRouter *echo.Group, baseHandler *base.Handler[models.Datasource], modelRoot *fileloader.Model[models.Datasource]) {
handler := &datasource{
baseHandler: baseHandler,
modelRoot: modelRoot, modelName: modelRoot.GetModelName(),
queryEngines: &sync.Map{},
queryEngines: &utils.SyncMap[string, *engineClient.QueryEngine]{},
}
base.AddRouterMetas(modelRoot,
datasourceRouter.POST("/checkConnection", handler.checkConnection),
Expand All @@ -49,7 +48,7 @@ type (
baseHandler *base.Handler[models.Datasource]
modelRoot *fileloader.Model[models.Datasource]
modelName string
queryEngines *sync.Map
queryEngines *utils.SyncMap[string, *engineClient.QueryEngine]
}
datasourcePing struct {
models.Datasource
Expand Down Expand Up @@ -262,7 +261,7 @@ func (d *datasource) graphqlQuery(c echo.Context) (err error) {
return
}

queryEngine, ok := utils.LoadFromSyncMap[*engineClient.QueryEngine](d.queryEngines, data.Name)
queryEngine, ok := d.queryEngines.Load(data.Name)
if !ok {
var prismaSchema string
prismaSchema, err = engineDatasource.CachePrismaSchemaText.Read(data.Name)
Expand Down
14 changes: 8 additions & 6 deletions pkg/api/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,16 @@ func (o *operation) bindRoles(c echo.Context) (err error) {
Children: children,
},
}}}
user := o.baseHandler.GetUser(c)
var succeedPaths []string
var graphqlText string
var queryItem *build.QueryDocumentItem
var (
succeedPaths []string
graphqlText string
queryItem *build.QueryDocumentItem
user = o.baseHandler.GetUser(c)
)
for _, item := range operations {
dataName := o.modelRoot.GetDataName(item)
itemResult := models.LoadOperationResult(dataName)
if itemResult == nil {
itemResult, itemOk := models.OperationResultMap.Load(dataName)
if !itemOk {
continue
}

Expand Down
21 changes: 9 additions & 12 deletions pkg/common/configs/logger_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@
package configs

import (
"fireboom-server/pkg/common/utils"
"go.uber.org/zap/zapcore"
"golang.org/x/exp/slices"
"sync"
)

var (
logCollectors []*LogCollector
collectorMutex = &sync.Mutex{}
logCollectors = &utils.SyncMap[*LogCollector, bool]{}
AddFileLoaderQuestionCollector func(string, func(string) map[string]any)
)

Expand All @@ -26,10 +25,7 @@ type LogCollector struct {
}

func AddCollector(c *LogCollector) {
collectorMutex.Lock()
defer collectorMutex.Unlock()

logCollectors = append(logCollectors, c)
logCollectors.Store(c, true)
}

func analysis(entry zapcore.Entry, fields []zapcore.Field) error {
Expand All @@ -47,19 +43,20 @@ func analysis(entry zapcore.Entry, fields []zapcore.Field) error {
return nil
}

func handlerCollectors(entry zapcore.Entry, collectors []*LogCollector, fieldMap map[string]*zapcore.Field) {
for _, collector := range collectors {
func handlerCollectors(entry zapcore.Entry, collectors *utils.SyncMap[*LogCollector, bool], fieldMap map[string]*zapcore.Field) {
collectors.Range(func(collector *LogCollector, _ bool) bool {
if !slices.Contains(collector.MatchLevel, entry.Level) {
continue
return true
}

value, ok := fieldMap[collector.IdentifyField]
if !ok {
continue
return true
}

delete(fieldMap, value.Key)
result := collector.Handle(entry, value, fieldMap)
WebsocketInstance.WriteWsMsgBodyForAll(result)
}
return true
})
}
23 changes: 9 additions & 14 deletions pkg/common/configs/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ type (

type (
Websocket struct {
Conns []*WebsocketConn
*sync.Mutex
Conns utils.SyncMap[*WebsocketConn, bool]
}
WebsocketConn struct {
Conn *websocket.Conn
Expand All @@ -53,18 +52,14 @@ func (ws *Websocket) Write(p []byte) (n int, err error) {
}

func (ws *Websocket) WriteWsMsgBodyForAll(body *WsMsgBody) {
if len(ws.Conns) == 0 {
return
}

ws.Lock()
defer ws.Unlock()

bodyBytes, _ := json.Marshal(body)
for _, item := range ws.Conns {
var bodyBytes []byte
ws.Conns.Range(func(item *WebsocketConn, _ bool) bool {
if bodyBytes == nil {
bodyBytes, _ = json.Marshal(body)
}
item.WriteMessage(bodyBytes)
}
return
return true
})
}

func (ws *WebsocketConn) WriteMessage(bodyBytes []byte) {
Expand All @@ -90,7 +85,7 @@ var (

func init() {
utils.RegisterInitMethod(12, func() {
WebsocketInstance = &Websocket{Mutex: &sync.Mutex{}}
WebsocketInstance = &Websocket{}
addLoggerWriteSyncer(WebsocketInstance)
})
}
16 changes: 1 addition & 15 deletions pkg/common/models/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/wundergraph/wundergraph/pkg/wgpb"
"net/http"
"strconv"
"sync"
"time"
)

Expand Down Expand Up @@ -62,22 +61,9 @@ const fieldOriginContent = "originContent"
var (
OperationRoot *fileloader.Model[Operation]
OperationMethodMap map[wgpb.OperationType]string
operationResultMap sync.Map
OperationResultMap utils.SyncMap[string, *wgpb.Operation]
)

func StoreOperationResult(path string, result *wgpb.Operation) {
operationResultMap.Store(path, result)
}

func LoadOperationResult(path string) *wgpb.Operation {
value, ok := operationResultMap.Load(path)
if !ok {
return nil
}

return value.(*wgpb.Operation)
}

func init() {
OperationMethodMap = map[wgpb.OperationType]string{
wgpb.OperationType_QUERY: http.MethodGet,
Expand Down
19 changes: 6 additions & 13 deletions pkg/common/utils/init_method.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@ package utils

import (
"fireboom-server/pkg/plugins/i18n"
"sync"

"golang.org/x/exp/slices"
)

var (
initMethods []*initMethod
initMutex = &sync.Mutex{}
)
var initMethods SyncMap[*initMethod, bool]

type initMethod struct {
order int
Expand All @@ -21,23 +16,21 @@ type initMethod struct {
// RegisterInitMethod 注册初始化函数,使得原本不可控的init函数得以按顺序执行
// 编排系统启动时的初始化函数
func RegisterInitMethod(order int, method func()) {
initMutex.Lock()
defer initMutex.Unlock()

initMethods = append(initMethods, &initMethod{
initMethods.Store(&initMethod{
order: order,
method: method,
caller: i18n.GetCallerMode(),
})
}, true)
}

// ExecuteInitMethods 执行初始化函数,order优先,再按照caller排序
func ExecuteInitMethods() {
slices.SortFunc(initMethods, func(a, b *initMethod) bool {
inits := initMethods.Keys()
slices.SortFunc(inits, func(a, b *initMethod) bool {
return a.order < b.order || a.order == b.order && a.caller < b.caller
})

for _, item := range initMethods {
for _, item := range inits {
item.method()
}
}
95 changes: 88 additions & 7 deletions pkg/common/utils/sync_map.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,100 @@
package utils

import "sync"
import (
json "github.com/json-iterator/go"
"sync"
)

// LoadFromSyncMap 根据范型转换sync.Map的value
// 解决sync.Map返回的值没有具体类型的问题
func LoadFromSyncMap[T any](syncMap *sync.Map, key string) (v T, ok bool) {
if syncMap == nil {
type SyncMap[K comparable, V any] struct {
sync.Map
}

func (s *SyncMap[K, V]) MarshalJSON() ([]byte, error) {
if s == nil {
return nil, nil
}

return json.Marshal(s.ToMap())
}

func (s *SyncMap[K, V]) UnmarshalJSON(data []byte) error {
dataMap := make(map[K]V)
if err := json.Unmarshal(data, &dataMap); err != nil {
return err
}

*s = SyncMap[K, V]{}
for k, v := range dataMap {
s.Map.Store(k, v)
}
return nil
}

func (s *SyncMap[K, V]) Load(key K) (v V, ok bool) {
data, ok := s.Map.Load(key)
if !ok {
return
}

data, ok := syncMap.Load(key)
v, ok = data.(V)
return
}

func (s *SyncMap[K, V]) LoadOrStore(key K, value V) (v V, ok bool) {
data, ok := s.Map.LoadOrStore(key, value)
if !ok {
return
}

v, ok = data.(T)
v, ok = data.(V)
return
}

func (s *SyncMap[K, V]) Store(key K, value V) {
s.Map.Store(key, value)
}

func (s *SyncMap[K, V]) Range(f func(key K, value V) bool) {
s.Map.Range(func(key, value any) bool {
keyStr, keyOk := key.(K)
valObj, valOk := value.(V)
if !keyOk || !valOk {
return false
}
return f(keyStr, valObj)
})
}

func (s *SyncMap[K, V]) ToMap() (dataMap map[K]V) {
dataMap = make(map[K]V)
s.Range(func(key K, value V) bool {
dataMap[key] = value
return true
})
return
}

func (s *SyncMap[K, V]) Keys() (keys []K) {
keys = make([]K, 0)
s.Range(func(key K, _ V) bool {
keys = append(keys, key)
return true
})
return
}

func (s *SyncMap[K, V]) FirstValue() (v V) {
s.Range(func(_ K, value V) bool {
v = value
return false
})
return
}

func (s *SyncMap[K, V]) Clear() (keys []K) {
s.Range(func(key K, _ V) bool {
s.Map.Delete(key)
return true
})
return
}
Loading

0 comments on commit 727c08c

Please sign in to comment.