diff --git a/common/const.go b/common/const.go index e81df25..d9d2901 100644 --- a/common/const.go +++ b/common/const.go @@ -1,5 +1,7 @@ package common +import "time" + // 用户生成KisId的字符串前缀 const ( KisIdTypeFlow = "flow" @@ -55,6 +57,10 @@ const ( ES KisConnType = "es" ) +// cache const ( - ActionNoJump = "NoJump" + // DeFaultFlowCacheCleanUp KisFlow中Flow对象Cache缓存默认的清理内存时间 + DeFaultFlowCacheCleanUp = 5 //单位 min + // DefaultExpiration 默认GoCahce时间 ,永久保存 + DefaultExpiration time.Duration = 0 ) diff --git a/conn/kis_connector.go b/conn/kis_connector.go index 23acaa1..df48d45 100644 --- a/conn/kis_connector.go +++ b/conn/kis_connector.go @@ -19,6 +19,11 @@ type KisConnector struct { // Connector Init onceInit sync.Once + + // KisConnector的自定义临时数据 + metaData map[string]interface{} + // 管理metaData的读写锁 + mLock sync.RWMutex } // NewKisConnector 根据配置策略创建一个KisConnector @@ -27,6 +32,7 @@ func NewKisConnector(config *config.KisConnConfig) *KisConnector { conn.CId = id.KisID(common.KisIdTypeConnnector) conn.CName = config.CName conn.Conf = config + conn.metaData = make(map[string]interface{}) return conn } @@ -63,3 +69,24 @@ func (conn *KisConnector) GetConfig() *config.KisConnConfig { func (conn *KisConnector) GetId() string { return conn.CId } + +// GetMetaData 得到当前Connector的临时数据 +func (conn *KisConnector) GetMetaData(key string) interface{} { + conn.mLock.RLock() + defer conn.mLock.RUnlock() + + data, ok := conn.metaData[key] + if !ok { + return nil + } + + return data +} + +// SetMetaData 设置当前Connector的临时数据 +func (conn *KisConnector) SetMetaData(key string, value interface{}) { + conn.mLock.Lock() + defer conn.mLock.Unlock() + + conn.metaData[key] = value +} diff --git a/flow/kis_flow.go b/flow/kis_flow.go index f6baa56..a883670 100644 --- a/flow/kis_flow.go +++ b/flow/kis_flow.go @@ -3,6 +3,7 @@ package flow import ( "context" "errors" + "github.com/patrickmn/go-cache" "kis-flow/common" "kis-flow/config" "kis-flow/conn" @@ -11,6 +12,7 @@ import ( "kis-flow/kis" "kis-flow/log" "sync" + "time" ) // KisFlow 用于贯穿整条流式计算的上下文环境 @@ -39,6 +41,13 @@ type KisFlow struct { inPut common.KisRowArr // 当前Function的计算输入数据 abort bool // 是否中断Flow action kis.Action // 当前Flow所携带的Action动作 + + // flow的本地缓存 + cache *cache.Cache // Flow流的临时缓存上线文环境 + + // flow的metaData + metaData map[string]interface{} // Flow的自定义临时数据 + mLock sync.RWMutex // 管理metaData的读写锁 } // NewKisFlow 创建一个KisFlow. @@ -58,6 +67,12 @@ func NewKisFlow(conf *config.KisFlowConfig) kis.Flow { // 数据data flow.data = make(common.KisDataMap) + // 初始化本地缓存 + flow.cache = cache.New(cache.NoExpiration, common.DeFaultFlowCacheCleanUp*time.Minute) + + // 初始化临时数据 + flow.metaData = make(map[string]interface{}) + return flow } diff --git a/flow/kis_flow_data.go b/flow/kis_flow_data.go index b5adf5d..dd223ab 100644 --- a/flow/kis_flow_data.go +++ b/flow/kis_flow_data.go @@ -4,8 +4,11 @@ import ( "context" "errors" "fmt" + "github.com/patrickmn/go-cache" "kis-flow/common" + "kis-flow/config" "kis-flow/log" + "time" ) // CommitRow 提交Flow数据, 一行数据,如果是批量数据可以提交多次 @@ -132,3 +135,68 @@ func (flow *KisFlow) clearData(data common.KisDataMap) { delete(data, k) } } + +func (flow *KisFlow) GetCacheData(key string) interface{} { + + if data, found := flow.cache.Get(key); found { + return data + } + + return nil +} + +func (flow *KisFlow) SetCacheData(key string, value interface{}, Exp time.Duration) { + if Exp == common.DefaultExpiration { + flow.cache.Set(key, value, cache.DefaultExpiration) + } else { + flow.cache.Set(key, value, Exp) + } +} + +// GetMetaData 得到当前Flow对象的临时数据 +func (flow *KisFlow) GetMetaData(key string) interface{} { + flow.mLock.RLock() + defer flow.mLock.RUnlock() + + data, ok := flow.metaData[key] + if !ok { + return nil + } + + return data +} + +// SetMetaData 设置当前Flow对象的临时数据 +func (flow *KisFlow) SetMetaData(key string, value interface{}) { + flow.mLock.Lock() + defer flow.mLock.Unlock() + + flow.metaData[key] = value +} + +// GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数,取出一对key-value +func (flow *KisFlow) GetFuncParam(key string) string { + flow.fplock.RLock() + defer flow.fplock.RUnlock() + + if param, ok := flow.funcParams[flow.ThisFunctionId]; ok { + if value, vok := param[key]; vok { + return value + } + } + + return "" +} + +// GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数,取出全部Key-Value +func (flow *KisFlow) GetFuncParamAll() config.FParam { + flow.fplock.RLock() + defer flow.fplock.RUnlock() + + param, ok := flow.funcParams[flow.ThisFunctionId] + if !ok { + return nil + } + + return param +} diff --git a/function/kis_base_function.go b/function/kis_base_function.go index b004168..b18b3bc 100644 --- a/function/kis_base_function.go +++ b/function/kis_base_function.go @@ -7,6 +7,7 @@ import ( "kis-flow/config" "kis-flow/id" "kis-flow/kis" + "sync" ) type BaseFunction struct { @@ -20,6 +21,11 @@ type BaseFunction struct { // connector connector kis.Connector + // Function的自定义临时数据 + metaData map[string]interface{} + // 管理metaData的读写锁 + mLock sync.RWMutex + // link N kis.Function //下一个流计算Function P kis.Function //上一个流计算Function @@ -120,16 +126,15 @@ func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function { //工厂生产泛化对象 switch common.KisMode(config.FMode) { case common.V: - f = new(KisFunctionV) - break + f = NewKisFunctionV() case common.S: - f = new(KisFunctionS) + f = NewKisFunctionS() case common.L: - f = new(KisFunctionL) + f = NewKisFunctionL() case common.C: - f = new(KisFunctionC) + f = NewKisFunctionC() case common.E: - f = new(KisFunctionE) + f = NewKisFunctionE() default: //LOG ERROR return nil @@ -150,3 +155,24 @@ func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function { return f } + +// GetMetaData 得到当前Function的临时数据 +func (base *BaseFunction) GetMetaData(key string) interface{} { + base.mLock.RLock() + defer base.mLock.RUnlock() + + data, ok := base.metaData[key] + if !ok { + return nil + } + + return data +} + +// SetMetaData 设置当前Function的临时数据 +func (base *BaseFunction) SetMetaData(key string, value interface{}) { + base.mLock.Lock() + defer base.mLock.Unlock() + + base.metaData[key] = value +} diff --git a/function/kis_function_c.go b/function/kis_function_c.go index 4bb870e..ff64157 100644 --- a/function/kis_function_c.go +++ b/function/kis_function_c.go @@ -10,6 +10,15 @@ type KisFunctionC struct { BaseFunction } +func NewKisFunctionC() kis.Function { + f := new(KisFunctionC) + + // 初始化metaData + f.metaData = make(map[string]interface{}) + + return f +} + func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionC, flow = %+v\n", flow) diff --git a/function/kis_function_e.go b/function/kis_function_e.go index 88b1e4c..3fa7533 100644 --- a/function/kis_function_e.go +++ b/function/kis_function_e.go @@ -10,6 +10,15 @@ type KisFunctionE struct { BaseFunction } +func NewKisFunctionE() kis.Function { + f := new(KisFunctionE) + + // 初始化metaData + f.metaData = make(map[string]interface{}) + + return f +} + func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionE, flow = %+v\n", flow) diff --git a/function/kis_function_l.go b/function/kis_function_l.go index 9149e7b..68ec19c 100644 --- a/function/kis_function_l.go +++ b/function/kis_function_l.go @@ -10,6 +10,15 @@ type KisFunctionL struct { BaseFunction } +func NewKisFunctionL() kis.Function { + f := new(KisFunctionL) + + // 初始化metaData + f.metaData = make(map[string]interface{}) + + return f +} + func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionL, flow = %+v\n", flow) diff --git a/function/kis_function_s.go b/function/kis_function_s.go index d9a9516..8e467eb 100644 --- a/function/kis_function_s.go +++ b/function/kis_function_s.go @@ -10,6 +10,15 @@ type KisFunctionS struct { BaseFunction } +func NewKisFunctionS() kis.Function { + f := new(KisFunctionS) + + // 初始化metaData + f.metaData = make(map[string]interface{}) + + return f +} + func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionS, flow = %+v\n", flow) diff --git a/function/kis_function_v.go b/function/kis_function_v.go index aa2eb44..14d5cf7 100644 --- a/function/kis_function_v.go +++ b/function/kis_function_v.go @@ -10,6 +10,15 @@ type KisFunctionV struct { BaseFunction } +func NewKisFunctionV() kis.Function { + f := new(KisFunctionV) + + // 初始化metaData + f.metaData = make(map[string]interface{}) + + return f +} + func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionV, flow = %+v\n", flow) diff --git a/go.mod b/go.mod index 30791b8..6231234 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module kis-flow go 1.18 -require github.com/google/uuid v1.5.0 - -require gopkg.in/yaml.v3 v3.0.1 // indirect +require ( + github.com/google/uuid v1.5.0 + github.com/patrickmn/go-cache v2.1.0+incompatible // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/kis/connector.go b/kis/connector.go index c01b458..ea9031f 100644 --- a/kis/connector.go +++ b/kis/connector.go @@ -16,4 +16,8 @@ type Connector interface { GetName() string // GetConfig 获取Connector的配置信息 GetConfig() *config.KisConnConfig + // GetMetaData 得到当前Connector的临时数据 + GetMetaData(key string) interface{} + // SetMetaData 设置当前Connector的临时数据 + SetMetaData(key string, value interface{}) } diff --git a/kis/flow.go b/kis/flow.go index 86d7f47..ddd8a43 100644 --- a/kis/flow.go +++ b/kis/flow.go @@ -4,6 +4,7 @@ import ( "context" "kis-flow/common" "kis-flow/config" + "time" ) type Flow interface { @@ -31,4 +32,16 @@ type Flow interface { GetFuncConfigByName(funcName string) *config.KisFuncConfig // Next 当前Flow执行到的Function进入下一层Function所携带的Action动作 Next(acts ...ActionFunc) error + // GetCacheData 得到当前Flow的缓存数据 + GetCacheData(key string) interface{} + // SetCacheData 设置当前Flow的缓存数据 + SetCacheData(key string, value interface{}, Exp time.Duration) + // GetMetaData 得到当前Flow的临时数据 + GetMetaData(key string) interface{} + // SetMetaData 设置当前Flow的临时数据 + SetMetaData(key string, value interface{}) + // GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数,取出一对key-value + GetFuncParam(key string) string + // GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数,取出全部Key-Value + GetFuncParamAll() config.FParam } diff --git a/kis/function.go b/kis/function.go index 24d6738..7dcc8b4 100644 --- a/kis/function.go +++ b/kis/function.go @@ -43,4 +43,8 @@ type Function interface { SetN(f Function) // SetP 设置上一层Function实例 SetP(f Function) + // GetMetaData 得到当前Function的临时数据 + GetMetaData(key string) interface{} + // SetMetaData 设置当前Function的临时数据 + SetMetaData(key string, value interface{}) } diff --git a/kis/pool.go b/kis/pool.go index 75e275d..b1a9b51 100644 --- a/kis/pool.go +++ b/kis/pool.go @@ -22,9 +22,8 @@ type kisPool struct { cInitRouter connInitRouter // 全部的Connector初始化路由 ciLock sync.RWMutex // cInitRouter 锁 - cTree connTree //全部Connector管理路由 - connectors map[string]Connector // 全部的Connector对象 - cLock sync.RWMutex // cTree 锁 + cTree connTree // 全部Connector管理路由 + cLock sync.RWMutex // cTree 锁 } // 单例 @@ -45,7 +44,6 @@ func Pool() *kisPool { // connTree初始化 _pool.cTree = make(connTree) _pool.cInitRouter = make(connInitRouter) - _pool.connectors = make(map[string]Connector) }) return _pool diff --git a/test/caas/caas_demo1.go b/test/caas/caas_demo1.go index 8a258b8..23b5958 100644 --- a/test/caas/caas_demo1.go +++ b/test/caas/caas_demo1.go @@ -12,6 +12,8 @@ func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, f fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n", flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode) + fmt.Printf("Params = %+v\n", conn.GetConfig().Params) + fmt.Printf("===> Call Connector CaasDemoHanler1, args from funciton: %s\n", args) return nil diff --git a/test/faas/faas_demo1.go b/test/faas/faas_demo1.go index 1d1c47e..5935a08 100644 --- a/test/faas/faas_demo1.go +++ b/test/faas/faas_demo1.go @@ -10,6 +10,7 @@ import ( func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error { fmt.Println("---> Call funcName1Handler ----") + fmt.Printf("Params = %+v\n", flow.GetFuncParamAll()) for index, row := range flow.Input() { // 打印数据 diff --git a/test/faas/faas_demo2.go b/test/faas/faas_demo2.go index 20431ab..2e648ef 100644 --- a/test/faas/faas_demo2.go +++ b/test/faas/faas_demo2.go @@ -11,6 +11,7 @@ import ( func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error { fmt.Println("---> Call funcName2Handler ----") + fmt.Printf("Params = %+v\n", flow.GetFuncParamAll()) for index, row := range flow.Input() { str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) diff --git a/test/faas/faas_demo3.go b/test/faas/faas_demo3.go index 54cd010..635580b 100644 --- a/test/faas/faas_demo3.go +++ b/test/faas/faas_demo3.go @@ -10,6 +10,7 @@ import ( func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error { fmt.Println("---> Call funcName3Handler ----") + fmt.Printf("Params = %+v\n", flow.GetFuncParamAll()) for _, row := range flow.Input() { str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) diff --git a/test/kis_params_test.go b/test/kis_params_test.go new file mode 100644 index 0000000..8ee0ef4 --- /dev/null +++ b/test/kis_params_test.go @@ -0,0 +1,42 @@ +package test + +import ( + "context" + "kis-flow/common" + "kis-flow/file" + "kis-flow/kis" + "kis-flow/test/caas" + "kis-flow/test/faas" + "testing" +) + +func TestParams(t *testing.T) { + ctx := context.Background() + + // 0. 注册Function 回调业务 + kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) + kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) + kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) + + // 0. 注册ConnectorInit 和 Connector 回调业务 + kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) + kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) + + // 1. 加载配置文件并构建Flow + if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil { + panic(err) + } + + // 2. 获取Flow + flow1 := kis.Pool().GetFlow("flowName1") + + // 3. 提交原始数据 + _ = flow1.CommitRow("This is Data1 from Test") + _ = flow1.CommitRow("This is Data2 from Test") + _ = flow1.CommitRow("This is Data3 from Test") + + // 4. 执行flow1 + if err := flow1.Run(ctx); err != nil { + panic(err) + } +} diff --git a/test/load_conf/flow/flow-FlowName1.yml b/test/load_conf/flow/flow-FlowName1.yml index dcc97d2..5b1ca30 100644 --- a/test/load_conf/flow/flow-FlowName1.yml +++ b/test/load_conf/flow/flow-FlowName1.yml @@ -3,5 +3,14 @@ status: 1 flow_name: flowName1 flows: - fname: funcName1 + params: + myKey1: flowValue1-1 + myKey2: flowValue1-2 - fname: funcName2 + params: + myKey1: flowValue2-1 + myKey2: flowValue2-2 - fname: funcName3 + params: + myKey1: flowValue3-1 + myKey2: flowValue3-2 diff --git a/test/load_conf/func/func-FuncName1.yml b/test/load_conf/func/func-FuncName1.yml index a9c0c49..f1148ca 100644 --- a/test/load_conf/func/func-FuncName1.yml +++ b/test/load_conf/func/func-FuncName1.yml @@ -5,4 +5,8 @@ source: name: 公众号抖音商城户订单数据 must: - order_id - - user_id \ No newline at end of file + - user_id +option: + default_params: + default1: funcName1_param1 + default2: funcName1_param2 diff --git a/test/load_conf/func/func-FuncName2.yml b/test/load_conf/func/func-FuncName2.yml index f1589d7..e45eaa7 100644 --- a/test/load_conf/func/func-FuncName2.yml +++ b/test/load_conf/func/func-FuncName2.yml @@ -7,4 +7,7 @@ source: - order_id - user_id option: - cname: ConnName1 \ No newline at end of file + cname: ConnName1 + default_params: + default1: funcName2_param1 + default2: funcName2_param2 diff --git a/test/load_conf/func/func-FuncName3.yml b/test/load_conf/func/func-FuncName3.yml index 98f2fc1..9e16e8b 100644 --- a/test/load_conf/func/func-FuncName3.yml +++ b/test/load_conf/func/func-FuncName3.yml @@ -5,4 +5,8 @@ source: name: 用户订单错误率 must: - order_id - - user_id \ No newline at end of file + - user_id +option: + default_params: + default1: funcName3_param1 + default2: funcName3_param2