Skip to content

Commit

Permalink
add Params
Browse files Browse the repository at this point in the history
  • Loading branch information
aceld committed Jan 26, 2024
1 parent ca41501 commit d093901
Show file tree
Hide file tree
Showing 24 changed files with 292 additions and 17 deletions.
8 changes: 7 additions & 1 deletion common/const.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package common

import "time"

// 用户生成KisId的字符串前缀
const (
KisIdTypeFlow = "flow"
Expand Down Expand Up @@ -55,6 +57,10 @@ const (
ES KisConnType = "es"
)

// cache
const (
ActionNoJump = "NoJump"
// DeFaultFlowCacheCleanUp KisFlow中Flow对象Cache缓存默认的清理内存时间
DeFaultFlowCacheCleanUp = 5 //单位 min
// DefaultExpiration 默认GoCahce时间 ,永久保存
DefaultExpiration time.Duration = 0
)
27 changes: 27 additions & 0 deletions conn/kis_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type KisConnector struct {

// Connector Init
onceInit sync.Once

// KisConnector的自定义临时数据
metaData map[string]interface{}
// 管理metaData的读写锁
mLock sync.RWMutex
}

// NewKisConnector 根据配置策略创建一个KisConnector
Expand All @@ -27,6 +32,7 @@ func NewKisConnector(config *config.KisConnConfig) *KisConnector {
conn.CId = id.KisID(common.KisIdTypeConnnector)
conn.CName = config.CName
conn.Conf = config
conn.metaData = make(map[string]interface{})

return conn
}
Expand Down Expand Up @@ -63,3 +69,24 @@ func (conn *KisConnector) GetConfig() *config.KisConnConfig {
func (conn *KisConnector) GetId() string {
return conn.CId
}

// GetMetaData 得到当前Connector的临时数据
func (conn *KisConnector) GetMetaData(key string) interface{} {
conn.mLock.RLock()
defer conn.mLock.RUnlock()

data, ok := conn.metaData[key]
if !ok {
return nil
}

return data
}

// SetMetaData 设置当前Connector的临时数据
func (conn *KisConnector) SetMetaData(key string, value interface{}) {
conn.mLock.Lock()
defer conn.mLock.Unlock()

conn.metaData[key] = value
}
15 changes: 15 additions & 0 deletions flow/kis_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package flow
import (
"context"
"errors"
"github.com/patrickmn/go-cache"
"kis-flow/common"
"kis-flow/config"
"kis-flow/conn"
Expand All @@ -11,6 +12,7 @@ import (
"kis-flow/kis"
"kis-flow/log"
"sync"
"time"
)

// KisFlow 用于贯穿整条流式计算的上下文环境
Expand Down Expand Up @@ -39,6 +41,13 @@ type KisFlow struct {
inPut common.KisRowArr // 当前Function的计算输入数据
abort bool // 是否中断Flow
action kis.Action // 当前Flow所携带的Action动作

// flow的本地缓存
cache *cache.Cache // Flow流的临时缓存上线文环境

// flow的metaData
metaData map[string]interface{} // Flow的自定义临时数据
mLock sync.RWMutex // 管理metaData的读写锁
}

// NewKisFlow 创建一个KisFlow.
Expand All @@ -58,6 +67,12 @@ func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
// 数据data
flow.data = make(common.KisDataMap)

// 初始化本地缓存
flow.cache = cache.New(cache.NoExpiration, common.DeFaultFlowCacheCleanUp*time.Minute)

// 初始化临时数据
flow.metaData = make(map[string]interface{})

return flow
}

Expand Down
68 changes: 68 additions & 0 deletions flow/kis_flow_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"context"
"errors"
"fmt"
"github.com/patrickmn/go-cache"
"kis-flow/common"
"kis-flow/config"
"kis-flow/log"
"time"
)

// CommitRow 提交Flow数据, 一行数据,如果是批量数据可以提交多次
Expand Down Expand Up @@ -132,3 +135,68 @@ func (flow *KisFlow) clearData(data common.KisDataMap) {
delete(data, k)
}
}

func (flow *KisFlow) GetCacheData(key string) interface{} {

if data, found := flow.cache.Get(key); found {
return data
}

return nil
}

func (flow *KisFlow) SetCacheData(key string, value interface{}, Exp time.Duration) {
if Exp == common.DefaultExpiration {
flow.cache.Set(key, value, cache.DefaultExpiration)
} else {
flow.cache.Set(key, value, Exp)
}
}

// GetMetaData 得到当前Flow对象的临时数据
func (flow *KisFlow) GetMetaData(key string) interface{} {
flow.mLock.RLock()
defer flow.mLock.RUnlock()

data, ok := flow.metaData[key]
if !ok {
return nil
}

return data
}

// SetMetaData 设置当前Flow对象的临时数据
func (flow *KisFlow) SetMetaData(key string, value interface{}) {
flow.mLock.Lock()
defer flow.mLock.Unlock()

flow.metaData[key] = value
}

// GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数,取出一对key-value
func (flow *KisFlow) GetFuncParam(key string) string {
flow.fplock.RLock()
defer flow.fplock.RUnlock()

if param, ok := flow.funcParams[flow.ThisFunctionId]; ok {
if value, vok := param[key]; vok {
return value
}
}

return ""
}

// GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数,取出全部Key-Value
func (flow *KisFlow) GetFuncParamAll() config.FParam {
flow.fplock.RLock()
defer flow.fplock.RUnlock()

param, ok := flow.funcParams[flow.ThisFunctionId]
if !ok {
return nil
}

return param
}
38 changes: 32 additions & 6 deletions function/kis_base_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"kis-flow/config"
"kis-flow/id"
"kis-flow/kis"
"sync"
)

type BaseFunction struct {
Expand All @@ -20,6 +21,11 @@ type BaseFunction struct {
// connector
connector kis.Connector

// Function的自定义临时数据
metaData map[string]interface{}
// 管理metaData的读写锁
mLock sync.RWMutex

// link
N kis.Function //下一个流计算Function
P kis.Function //上一个流计算Function
Expand Down Expand Up @@ -120,16 +126,15 @@ func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
//工厂生产泛化对象
switch common.KisMode(config.FMode) {
case common.V:
f = new(KisFunctionV)
break
f = NewKisFunctionV()
case common.S:
f = new(KisFunctionS)
f = NewKisFunctionS()
case common.L:
f = new(KisFunctionL)
f = NewKisFunctionL()
case common.C:
f = new(KisFunctionC)
f = NewKisFunctionC()
case common.E:
f = new(KisFunctionE)
f = NewKisFunctionE()
default:
//LOG ERROR
return nil
Expand All @@ -150,3 +155,24 @@ func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {

return f
}

// GetMetaData 得到当前Function的临时数据
func (base *BaseFunction) GetMetaData(key string) interface{} {
base.mLock.RLock()
defer base.mLock.RUnlock()

data, ok := base.metaData[key]
if !ok {
return nil
}

return data
}

// SetMetaData 设置当前Function的临时数据
func (base *BaseFunction) SetMetaData(key string, value interface{}) {
base.mLock.Lock()
defer base.mLock.Unlock()

base.metaData[key] = value
}
9 changes: 9 additions & 0 deletions function/kis_function_c.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ type KisFunctionC struct {
BaseFunction
}

func NewKisFunctionC() kis.Function {
f := new(KisFunctionC)

// 初始化metaData
f.metaData = make(map[string]interface{})

return f
}

func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {
log.Logger().InfoF("KisFunctionC, flow = %+v\n", flow)

Expand Down
9 changes: 9 additions & 0 deletions function/kis_function_e.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ type KisFunctionE struct {
BaseFunction
}

func NewKisFunctionE() kis.Function {
f := new(KisFunctionE)

// 初始化metaData
f.metaData = make(map[string]interface{})

return f
}

func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {
log.Logger().InfoF("KisFunctionE, flow = %+v\n", flow)

Expand Down
9 changes: 9 additions & 0 deletions function/kis_function_l.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ type KisFunctionL struct {
BaseFunction
}

func NewKisFunctionL() kis.Function {
f := new(KisFunctionL)

// 初始化metaData
f.metaData = make(map[string]interface{})

return f
}

func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error {
log.Logger().InfoF("KisFunctionL, flow = %+v\n", flow)

Expand Down
9 changes: 9 additions & 0 deletions function/kis_function_s.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ type KisFunctionS struct {
BaseFunction
}

func NewKisFunctionS() kis.Function {
f := new(KisFunctionS)

// 初始化metaData
f.metaData = make(map[string]interface{})

return f
}

func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error {
log.Logger().InfoF("KisFunctionS, flow = %+v\n", flow)

Expand Down
9 changes: 9 additions & 0 deletions function/kis_function_v.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ type KisFunctionV struct {
BaseFunction
}

func NewKisFunctionV() kis.Function {
f := new(KisFunctionV)

// 初始化metaData
f.metaData = make(map[string]interface{})

return f
}

func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error {
log.Logger().InfoF("KisFunctionV, flow = %+v\n", flow)

Expand Down
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module kis-flow

go 1.18

require github.com/google/uuid v1.5.0

require gopkg.in/yaml.v3 v3.0.1 // indirect
require (
github.com/google/uuid v1.5.0
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
4 changes: 4 additions & 0 deletions kis/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ type Connector interface {
GetName() string
// GetConfig 获取Connector的配置信息
GetConfig() *config.KisConnConfig
// GetMetaData 得到当前Connector的临时数据
GetMetaData(key string) interface{}
// SetMetaData 设置当前Connector的临时数据
SetMetaData(key string, value interface{})
}
13 changes: 13 additions & 0 deletions kis/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"kis-flow/common"
"kis-flow/config"
"time"
)

type Flow interface {
Expand Down Expand Up @@ -31,4 +32,16 @@ type Flow interface {
GetFuncConfigByName(funcName string) *config.KisFuncConfig
// Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
Next(acts ...ActionFunc) error
// GetCacheData 得到当前Flow的缓存数据
GetCacheData(key string) interface{}
// SetCacheData 设置当前Flow的缓存数据
SetCacheData(key string, value interface{}, Exp time.Duration)
// GetMetaData 得到当前Flow的临时数据
GetMetaData(key string) interface{}
// SetMetaData 设置当前Flow的临时数据
SetMetaData(key string, value interface{})
// GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数,取出一对key-value
GetFuncParam(key string) string
// GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数,取出全部Key-Value
GetFuncParamAll() config.FParam
}
4 changes: 4 additions & 0 deletions kis/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ type Function interface {
SetN(f Function)
// SetP 设置上一层Function实例
SetP(f Function)
// GetMetaData 得到当前Function的临时数据
GetMetaData(key string) interface{}
// SetMetaData 设置当前Function的临时数据
SetMetaData(key string, value interface{})
}
Loading

0 comments on commit d093901

Please sign in to comment.