diff --git a/README.MD b/README.MD index b4b9a7d..9e3274c 100644 --- a/README.MD +++ b/README.MD @@ -1,5 +1,14 @@ # Rule Engine Golang [![Go](https://github.com/dipper-iot/dipper-engine/actions/workflows/go.yml/badge.svg?branch=main)](https://github.com/dipper-iot/dipper-engine/actions/workflows/go.yml) [![CodeQL](https://github.com/dipper-iot/dipper-engine/actions/workflows/codeql.yml/badge.svg?branch=main)](https://github.com/dipper-iot/dipper-engine/actions/workflows/codeql.yml) [![Coverage Status](https://coveralls.io/repos/github/dipper-iot/dipper-engine/badge.svg?branch=main)](https://coveralls.io/github/dipper-iot/dipper-engine?branch=main) + + + + * [Setup](#setup) + * [Run](#run) + * [Rule Engine](#rule-engine) + * [Example Developer Test](#example-developer-test) + + ## Setup ```shell @@ -41,18 +50,36 @@ Start Dipper Engine Rules: 5 ----------------------------------------------------------- -No Rule Name Worker Status -1 log-core 1 enable -2 arithmetic 1 enable -3 fork 1 enable -4 conditional 1 enable -5 switch 1 enable +No Rule Name Worker Infinity Status +1 arithmetic 1 false enable +2 conditional 1 false enable +3 fork 1 false enable +4 input-redis-queue 0 true disable +5 input-redis-queue-extend 1 true enable +6 log-core 1 false enable +7 output-redis-queue 0 false disable +8 output-redis-queue-extend 1 false enable +9 switch 1 false enable ----------------------------------------------------------- Running Engine... ``` +## Rule Engine + +| No | Rule | Description | Infinity | Doc | +|:-----:|:-----------------------------------|:---------------------------------|:--------:|-----| +| 1 | arithmetic | operator match rule | false | | +| 2 | conditional | compare data rule | false | | +| 3 | fork | fork to rules | false | | +| 4 | input-redis-queue | input data from queue on config | true | | +| 5 | input-redis-queue-extend | input data from queue on option | true | | +| 6 | log-core | log to console | false | | +| 7 | output-redis-queue | output data from queue on config | false | | +| 8 | output-redis-queue-extend | output data from queue on option | false | | +| 9 | switch | switch to rules | false | | + ## Example Developer Test ```golang diff --git a/config.json b/config.json index 03b28ca..a3afdc9 100644 --- a/config.json +++ b/config.json @@ -24,6 +24,32 @@ "enable": true, "worker": 1, "options": {} + }, + "input-redis-queue": { + "enable": false, + "worker": 1, + "options": { + "redis_address": "127.0.0.1:6379", + "redis_db": 0 + } + }, + "input-redis-queue-extend": { + "enable": true, + "worker": 1, + "options": {} + }, + "output-redis-queue": { + "enable": false, + "worker": 1, + "options": { + "redis_address": "127.0.0.1:6379", + "redis_db": 0 + } + }, + "output-redis-queue-extend": { + "enable": true, + "worker": 1, + "options": {} } }, "log": { diff --git a/core/control.go b/core/control.go new file mode 100644 index 0000000..e723b17 --- /dev/null +++ b/core/control.go @@ -0,0 +1,49 @@ +package core + +type SessionControl interface { + ListSession() []uint64 + StopSession(id uint64) + InfoSession(id uint64) map[string]interface{} +} + +func (d *DipperEngine) ListControl() []string { + list := make([]string, 0) + + for name, _ := range d.mapSessionControl { + list = append(list, name) + } + return list +} + +func (d *DipperEngine) ControlSession(ruleName string) []uint64 { + + rule, ok := d.mapSessionControl[ruleName] + if ok { + return rule.ListSession() + } + return []uint64{} +} + +func (d *DipperEngine) ControlGetRule(session uint64) []string { + listRule := make([]string, 0) + for ruleId, control := range d.mapSessionControl { + listSession := control.ListSession() + for _, name := range listSession { + if name == session { + listRule = append(listRule, ruleId) + break + } + } + } + + return listRule +} + +func (d *DipperEngine) ControlStopSession(ruleName string, session uint64) { + + rule, ok := d.mapSessionControl[ruleName] + if ok { + rule.StopSession(session) + } + return +} diff --git a/core/dipper.go b/core/dipper.go index 35d1702..1aa50c3 100644 --- a/core/dipper.go +++ b/core/dipper.go @@ -2,16 +2,11 @@ package core import ( "context" - "fmt" bus2 "github.com/dipper-iot/dipper-engine/bus" "github.com/dipper-iot/dipper-engine/data" - "github.com/dipper-iot/dipper-engine/errors" "github.com/dipper-iot/dipper-engine/queue" "github.com/dipper-iot/dipper-engine/store" log "github.com/sirupsen/logrus" - "os" - "text/tabwriter" - "time" ) type DipperEngine struct { @@ -19,6 +14,7 @@ type DipperEngine struct { cancel context.CancelFunc config *ConfigEngine mapRule map[string]Rule + mapSessionControl map[string]SessionControl mapQueueInputRule map[string]queue.QueueEngine[*data.InputEngine] queueOutputRule queue.QueueEngine[*data.OutputEngine] factoryQueue FactoryQueue[*data.InputEngine] @@ -47,6 +43,7 @@ func NewDipperEngine( bus: bus, mapRule: map[string]Rule{}, mapQueueInputRule: map[string]queue.QueueEngine[*data.InputEngine]{}, + mapSessionControl: map[string]SessionControl{}, } } @@ -77,88 +74,6 @@ func (d *DipperEngine) addRule(rule Rule) { d.mapQueueInputRule[rule.Id()] = queue } -func (d *DipperEngine) Add(ctx context.Context, sessionData *data.Session) error { - sessionInfo := data.NewSessionInfo(time.Duration(d.config.TimeoutSession), sessionData) - d.store.Add(sessionInfo) - return d.startSession(ctx, sessionInfo.Id) -} - -func (d *DipperEngine) SessionInputQueue(factoryQueueName FactoryQueueName[*data.Session]) { - defaultTopic := "session-input" - topic, ok := d.config.BusMap[defaultTopic] - if !ok { - topic = defaultTopic - } - - d.queueInput = factoryQueueName(topic) - - d.queueInput.Subscribe(context.TODO(), func(sessionDeliver *queue.Deliver[*data.Session]) { - err := d.Add(context.TODO(), sessionDeliver.Data) - if err != nil { - sessionDeliver.Reject() - return - } - sessionDeliver.Ack() - }) -} - -func (d *DipperEngine) SessionOutputQueue(factoryQueueOutputName FactoryQueueName[*data.ResultSession]) { - defaultOutputTopic := "session-output" - topic, ok := d.config.BusMap[defaultOutputTopic] - if !ok { - topic = defaultOutputTopic - } - d.queueOutput = factoryQueueOutputName(topic) -} - -func (d *DipperEngine) Start() error { - log.Debug("Start Dipper Engine") - d.queueOutputRule = d.factoryQueueOutput("output") - - // init Rule - for name, rule := range d.mapRule { - option, ok := d.config.Rules[name] - if ok && option.Enable { - err := rule.Initialize(d.ctx, map[string]interface{}{}) - if err != nil { - return err - } - } - - } - - w := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0) - - fmt.Println(fmt.Sprintf("Rules: %d", len(d.mapRule))) - fmt.Println("-----------------------------------------------------------") - fmt.Fprintln(w, "No\tRule Name\tWorker\tStatus\t") - index := 1 - // Run Rule - for name, rule := range d.mapRule { - queueInput, ok := d.mapQueueInputRule[name] - if !ok { - return errors.ErrorNotFoundQueue - } - option, ok := d.config.Rules[name] - if ok && option.Enable { - for i := 0; i < option.Worker; i++ { - go rule.Run(d.ctx, queueInput.Subscribe, d.queueOutputRule.Publish) - } - fmt.Fprintln(w, fmt.Sprintf("%d\t%s\t%d\t%s\t", index, name, option.Worker, "enable")) - } else { - fmt.Fprintln(w, fmt.Sprintf("%d\t%s\t%d\t%s\t", index, name, 0, "disable")) - } - index++ - } - w.Flush() - fmt.Println("-----------------------------------------------------------") - fmt.Println() - go d.registerOutput() - fmt.Println("Running Engine...") - - return nil -} - func (d *DipperEngine) Stop() error { d.cancel() return nil diff --git a/core/management.go b/core/management.go index e6dda90..185faca 100644 --- a/core/management.go +++ b/core/management.go @@ -9,35 +9,6 @@ import ( log "github.com/sirupsen/logrus" ) -func (d *DipperEngine) startSession(ctx context.Context, sessionId uint64) error { - if d.store.Has(sessionId) { - sessionInfo := d.store.Get(sessionId) - if sessionInfo.RootNode != nil { - node := sessionInfo.RootNode - ruleQueue, ok := d.mapQueueInputRule[node.RuleId] - if ok { - err := ruleQueue.Publish(ctx, &data.InputEngine{ - SessionId: sessionInfo.Id, - ChanId: sessionInfo.ChanId, - FromEngine: node.NodeId, - ToEngine: "", - Node: node, - Data: sessionInfo.Data, - Time: sessionInfo.Time, - Type: data.TypeOutputEngineSuccess, - Error: nil, - }) - if err != nil { - log.Error(err) - return err - } - } - } - } - - return nil -} - func (d *DipperEngine) registerOutput() { err := d.queueOutputRule.Subscribe(d.ctx, func(deliver *queue.Deliver[*data.OutputEngine]) { diff --git a/core/rule.go b/core/rule.go index 33f44e3..951e292 100644 --- a/core/rule.go +++ b/core/rule.go @@ -8,6 +8,7 @@ import ( type Rule interface { Id() string + Infinity() bool Initialize(ctx context.Context, option map[string]interface{}) error Run(ctx context.Context, subscribeQueueInput func(ctx context.Context, callback queue.SubscribeFunction[*data.InputEngine]) error, diff --git a/core/session.go b/core/session.go new file mode 100644 index 0000000..cadc344 --- /dev/null +++ b/core/session.go @@ -0,0 +1,113 @@ +package core + +import ( + "context" + "github.com/dipper-iot/dipper-engine/data" + "github.com/dipper-iot/dipper-engine/internal/util" + "github.com/dipper-iot/dipper-engine/queue" + log "github.com/sirupsen/logrus" + "time" +) + +func NewSessionInfo(timeout time.Duration, session *data.Session, mapRule map[string]Rule) *data.Info { + now := time.Now() + var ( + id uint64 + err error + ) + for { + id, err = util.NextID() + if err != nil { + log.Error(err) + continue + } + break + } + + endCount := 0 + infinite := false + for _, rule := range session.MapNode { + if rule.End { + endCount++ + } + rulInfo, ok := mapRule[rule.RuleId] + if ok && rulInfo.Infinity() { + infinite = true + } + } + + return &data.Info{ + Id: id, + Time: &now, + Infinite: infinite, + ChanId: session.ChanId, + Timeout: timeout, + MapNode: session.MapNode, + EndCount: endCount, + RootNode: session.MapNode[session.RootNode], + Data: session.Data, + } +} + +func (d *DipperEngine) StartSession(ctx context.Context, sessionId uint64) error { + if d.store.Has(sessionId) { + sessionInfo := d.store.Get(sessionId) + if sessionInfo.RootNode != nil { + node := sessionInfo.RootNode + ruleQueue, ok := d.mapQueueInputRule[node.RuleId] + if ok { + err := ruleQueue.Publish(ctx, &data.InputEngine{ + SessionId: sessionInfo.Id, + ChanId: sessionInfo.ChanId, + FromEngine: node.NodeId, + ToEngine: "", + Node: node, + Data: sessionInfo.Data, + Time: sessionInfo.Time, + Type: data.TypeOutputEngineSuccess, + Error: nil, + }) + if err != nil { + log.Error(err) + return err + } + } + } + } + + return nil +} + +func (d *DipperEngine) Add(ctx context.Context, sessionData *data.Session) error { + sessionInfo := NewSessionInfo(time.Duration(d.config.TimeoutSession), sessionData, d.mapRule) + d.store.Add(sessionInfo) + return d.StartSession(ctx, sessionInfo.Id) +} + +func (d *DipperEngine) SessionInputQueue(factoryQueueName FactoryQueueName[*data.Session]) { + defaultTopic := "session-input" + topic, ok := d.config.BusMap[defaultTopic] + if !ok { + topic = defaultTopic + } + + d.queueInput = factoryQueueName(topic) + + d.queueInput.Subscribe(context.TODO(), func(sessionDeliver *queue.Deliver[*data.Session]) { + err := d.Add(context.TODO(), sessionDeliver.Data) + if err != nil { + sessionDeliver.Reject() + return + } + sessionDeliver.Ack() + }) +} + +func (d *DipperEngine) SessionOutputQueue(factoryQueueOutputName FactoryQueueName[*data.ResultSession]) { + defaultOutputTopic := "session-output" + topic, ok := d.config.BusMap[defaultOutputTopic] + if !ok { + topic = defaultOutputTopic + } + d.queueOutput = factoryQueueOutputName(topic) +} diff --git a/core/start.go b/core/start.go new file mode 100644 index 0000000..f28f3eb --- /dev/null +++ b/core/start.go @@ -0,0 +1,101 @@ +package core + +import ( + "fmt" + "github.com/dipper-iot/dipper-engine/errors" + log "github.com/sirupsen/logrus" + "os" + "sort" + "strings" + "text/tabwriter" +) + +func (d *DipperEngine) Start() error { + log.Debug("Start Dipper Engine") + d.queueOutputRule = d.factoryQueueOutput("output") + + // init Rule + for name, rule := range d.mapRule { + option, ok := d.config.Rules[name] + if ok && option.Enable { + err := rule.Initialize(d.ctx, map[string]interface{}{}) + if err != nil { + return err + } + } + + } + + list := make([]*viewRule, 0) + index := 1 + // Run Rule + for name, rule := range d.mapRule { + infinity := rule.Infinity() + infinityStr := "false" + if infinity { + infinityStr = "true" + control, ok := rule.(SessionControl) + if !ok { + log.Error("Error: ", errors.ErrorNotControlEngine, rule.Id()) + return errors.ErrorNotControlEngine + } + d.mapSessionControl[rule.Id()] = control + } + + queueInput, ok := d.mapQueueInputRule[name] + if !ok { + return errors.ErrorNotFoundQueue + } + status := "" + worker := 0 + option, ok := d.config.Rules[name] + if ok && option.Enable { + for i := 0; i < option.Worker; i++ { + go rule.Run(d.ctx, queueInput.Subscribe, d.queueOutputRule.Publish) + } + status = "enable" + worker = option.Worker + } else { + status = "disable" + } + list = append(list, &viewRule{ + Name: name, + Infinity: infinityStr, + Worker: worker, + Status: status, + }) + + index++ + } + viewListRule(list) + go d.registerOutput() + fmt.Println("Running Engine...") + + return nil +} + +type viewRule struct { + Name string + Worker int + Infinity string + Status string +} + +func viewListRule(list []*viewRule) { + sort.Slice(list, func(i, j int) bool { + return strings.Compare(list[i].Name, list[j].Name) < 0 + }) + w := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0) + fmt.Println(fmt.Sprintf("Rules: %d", len(list))) + fmt.Println("-----------------------------------------------------------") + fmt.Fprintln(w, "No\t\tRule Name\t\tWorker\t\tInfinity\t\tStatus\t") + index := 1 + // Run Rule + for _, rule := range list { + fmt.Fprintln(w, fmt.Sprintf("%d\t\t%s\t\t%d\t\t%s\t\t%s\t", index, rule.Name, rule.Worker, rule.Infinity, rule.Status)) + index++ + } + w.Flush() + fmt.Println("-----------------------------------------------------------") + fmt.Println() +} diff --git a/data/session.go b/data/session.go index 3996877..fe50d19 100644 --- a/data/session.go +++ b/data/session.go @@ -1,18 +1,15 @@ package data import ( - "github.com/dipper-iot/dipper-engine/internal/util" - log "github.com/sirupsen/logrus" "time" ) type NodeRule struct { - NodeId string `json:"node_id"` - RuleId string `json:"rule_id"` - Option map[string]interface{} `json:"option"` - Infinite bool `json:"infinite"` - Debug bool `json:"debug"` - End bool `json:"end"` + NodeId string `json:"node_id"` + RuleId string `json:"rule_id"` + Option map[string]interface{} `json:"option"` + Debug bool `json:"debug"` + End bool `json:"end"` } type Session struct { @@ -42,42 +39,3 @@ type Info struct { Result map[string]*OutputEngine `json:"result"` EndCount int `json:"end_count"` } - -func NewSessionInfo(timeout time.Duration, data *Session) *Info { - now := time.Now() - var ( - id uint64 - err error - ) - for { - id, err = util.NextID() - if err != nil { - log.Error(err) - continue - } - break - } - - endCount := 0 - infinite := false - for _, rule := range data.MapNode { - if rule.End { - endCount++ - } - if rule.Infinite { - infinite = true - } - } - - return &Info{ - Id: id, - Time: &now, - Infinite: infinite, - ChanId: data.ChanId, - Timeout: timeout, - MapNode: data.MapNode, - EndCount: endCount, - RootNode: data.MapNode[data.RootNode], - Data: data.Data, - } -} diff --git a/engine/run.go b/engine/run.go index bada2d5..bc236c2 100644 --- a/engine/run.go +++ b/engine/run.go @@ -5,7 +5,11 @@ import ( "github.com/dipper-iot/dipper-engine/rules/arithmetic" "github.com/dipper-iot/dipper-engine/rules/conditional" "github.com/dipper-iot/dipper-engine/rules/fork" + "github.com/dipper-iot/dipper-engine/rules/input_redis_queue" + "github.com/dipper-iot/dipper-engine/rules/input_redis_queue_extend" log2 "github.com/dipper-iot/dipper-engine/rules/log" + "github.com/dipper-iot/dipper-engine/rules/output_redis_queue" + output_redis_extend_queue "github.com/dipper-iot/dipper-engine/rules/output_redis_queue_extend" _switch "github.com/dipper-iot/dipper-engine/rules/switch" log "github.com/sirupsen/logrus" "github.com/urfave/cli/v2" @@ -35,11 +39,15 @@ func (a *App) Run(args []string) error { } a.dipper.AddRule( - &log2.LogRule{}, - &arithmetic.Arithmetic{}, - &fork.ForkRule{}, - &conditional.ConditionalRule{}, - &_switch.SwitchRule{}, + log2.NewLogRule(), + arithmetic.NewArithmetic(), + fork.NewForkRule(), + conditional.NewConditionalRule(), + _switch.NewSwitchRule(), + input_redis_queue.NewInputRedisQueueRule(), + input_redis_queue_extend.NewInputRedisQueueExtendRule(), + output_redis_queue.NewOutputRedisQueueRule(), + output_redis_extend_queue.NewOutputRedisQueueExtendRule(), ) err = a.dipper.Start() diff --git a/errors/error.go b/errors/error.go index a255eb7..501ff07 100644 --- a/errors/error.go +++ b/errors/error.go @@ -7,4 +7,5 @@ import ( var ( ErrorNotFoundQueue = errors.New("NoT Found Queue") ErrorNotFoundProperty = errors.New("NoT Found Property") + ErrorNotControlEngine = errors.New("Not Have Session Control With Rule Infinity") ) diff --git a/rules/arithmetic/match_rule_test.go b/rules/arithmetic/match_rule_test.go new file mode 100644 index 0000000..1ca2e09 --- /dev/null +++ b/rules/arithmetic/match_rule_test.go @@ -0,0 +1,96 @@ +package arithmetic + +import ( + "context" + "github.com/dipper-iot/dipper-engine/data" + "github.com/dipper-iot/dipper-engine/queue" + "testing" +) + +func TestMatchRule_Run(t *testing.T) { + a := NewArithmetic() + a.Infinity() + a.Initialize(context.TODO(), map[string]interface{}{}) + qsub := queue.NewDefaultQueue[*data.InputEngine]("qsub") + qpub := queue.NewDefaultQueue[*data.OutputEngine]("qpub") + + qsub.Publish(context.TODO(), &data.InputEngine{ + SessionId: 1, + ChanId: "1", + IdNode: "noed1", + FromEngine: "test", + ToEngine: "test", + Node: &data.NodeRule{ + Option: map[string]interface{}{ + "operator": map[string]interface{}{ + "right": map[string]interface{}{ + "value": "default.a", + "type": "val", + }, + "left": map[string]interface{}{ + "type": "val", + "value": "default.b", + }, + "operator": "+", + "type": "operator", + }, + "set_param_result_to": "default.cond_a_b", + "next_error": "4", + "next_true": "4", + "next_false": "4", + }, + End: false, + Debug: false, + RuleId: "1", + NodeId: "1", + }, + Data: map[string]interface{}{ + "default": map[string]interface{}{ + "a": 2, + "b": 2, + "x": 3, + }, + }, + BranchMain: "default", + }) + qsub.Publish(context.TODO(), &data.InputEngine{ + SessionId: 1, + ChanId: "1", + IdNode: "noed1", + FromEngine: "test", + ToEngine: "test", + Node: &data.NodeRule{ + Option: map[string]interface{}{ + "operator": map[string]interface{}{ + "right": map[string]interface{}{ + "value": "default.f", + "type": "val", + }, + "left": map[string]interface{}{ + "type": "val", + "value": "default.b", + }, + "operator": "-", + "type": "operator", + }, + "set_param_result_to": "default.cond_a_b", + "next_error": "4", + "next_true": "4", + "next_false": "4", + }, + End: false, + Debug: false, + RuleId: "1", + NodeId: "1", + }, + Data: map[string]interface{}{ + "default": map[string]interface{}{ + "a": 2, + "b": 2, + "x": 3, + }, + }, + BranchMain: "default", + }) + a.Run(context.TODO(), qsub.Subscribe, qpub.Publish) +} diff --git a/rules/arithmetic/rule.go b/rules/arithmetic/rule.go index 3c23e54..9eaa07c 100644 --- a/rules/arithmetic/rule.go +++ b/rules/arithmetic/rule.go @@ -11,6 +11,14 @@ import ( type Arithmetic struct { } +func NewArithmetic() *Arithmetic { + return &Arithmetic{} +} + +func (a Arithmetic) Infinity() bool { + return false +} + func (a Arithmetic) Id() string { return "arithmetic" } diff --git a/rules/common/connect_redis.go b/rules/common/connect_redis.go new file mode 100644 index 0000000..c61fcb3 --- /dev/null +++ b/rules/common/connect_redis.go @@ -0,0 +1,22 @@ +package common + +import ( + "context" + "github.com/go-redis/redis/v9" +) + +func ConnectRedis(ctx context.Context, option *OptionRedis) (client *redis.Client, err error) { + + client = redis.NewClient(&redis.Options{ + Addr: option.Address, + Password: option.Password, + DB: option.Db, + }) + + err = client.Ping(ctx).Err() + if err == nil { + return + } + + return +} diff --git a/rules/common/redis.go b/rules/common/redis.go new file mode 100644 index 0000000..5f0107f --- /dev/null +++ b/rules/common/redis.go @@ -0,0 +1,7 @@ +package common + +type OptionRedis struct { + Address string `json:"address"` + Password string `json:"password"` + Db int `json:"db"` +} diff --git a/rules/conditional/conditional_rule.go b/rules/conditional/conditional_rule.go index b7cc2af..9862f88 100644 --- a/rules/conditional/conditional_rule.go +++ b/rules/conditional/conditional_rule.go @@ -11,6 +11,14 @@ import ( type ConditionalRule struct { } +func NewConditionalRule() *ConditionalRule { + return &ConditionalRule{} +} + +func (a ConditionalRule) Infinity() bool { + return false +} + func (a ConditionalRule) Id() string { return "conditional" } diff --git a/rules/conditional/conditional_rule_test.go b/rules/conditional/conditional_rule_test.go index ee61eb8..19ea162 100644 --- a/rules/conditional/conditional_rule_test.go +++ b/rules/conditional/conditional_rule_test.go @@ -8,7 +8,8 @@ import ( ) func TestConditionalRule_Run(t *testing.T) { - a := ConditionalRule{} + a := NewConditionalRule() + a.Infinity() a.Initialize(context.TODO(), map[string]interface{}{}) qsub := queue.NewDefaultQueue[*data.InputEngine]("qsub") qpub := queue.NewDefaultQueue[*data.OutputEngine]("qpub") @@ -38,11 +39,10 @@ func TestConditionalRule_Run(t *testing.T) { "next_true": "4", "next_false": "4", }, - End: false, - Infinite: false, - Debug: false, - RuleId: "1", - NodeId: "1", + End: false, + Debug: false, + RuleId: "1", + NodeId: "1", }, Data: map[string]interface{}{ "default": map[string]interface{}{ @@ -78,11 +78,10 @@ func TestConditionalRule_Run(t *testing.T) { "next_true": "4", "next_false": "4", }, - End: false, - Infinite: false, - Debug: false, - RuleId: "1", - NodeId: "1", + End: false, + Debug: false, + RuleId: "1", + NodeId: "1", }, Data: map[string]interface{}{ "default": map[string]interface{}{ diff --git a/rules/fork/fork_rule.go b/rules/fork/fork_rule.go index 4237547..1000061 100644 --- a/rules/fork/fork_rule.go +++ b/rules/fork/fork_rule.go @@ -11,6 +11,14 @@ import ( type ForkRule struct { } +func NewForkRule() *ForkRule { + return &ForkRule{} +} + +func (f ForkRule) Infinity() bool { + return false +} + func (f ForkRule) Id() string { return "fork" } diff --git a/rules/input_redis_queue/input_redis_queue.go b/rules/input_redis_queue/input_redis_queue.go new file mode 100644 index 0000000..e646ed6 --- /dev/null +++ b/rules/input_redis_queue/input_redis_queue.go @@ -0,0 +1,194 @@ +package input_redis_queue + +import ( + "context" + "encoding/json" + "github.com/dipper-iot/dipper-engine/data" + "github.com/dipper-iot/dipper-engine/errors" + "github.com/dipper-iot/dipper-engine/queue" + "github.com/dipper-iot/dipper-engine/rules/common" + "github.com/go-redis/redis/v9" + log "github.com/sirupsen/logrus" + "io" + "sync" +) + +type InputRedisQueueRule struct { + client *redis.Client + mapGet sync.Map +} + +func (l *InputRedisQueueRule) Infinity() bool { + return true +} + +func NewInputRedisQueueRule() *InputRedisQueueRule { + return &InputRedisQueueRule{ + mapGet: sync.Map{}, + } +} + +func (l *InputRedisQueueRule) ListSession() []uint64 { + list := make([]uint64, 0) + + l.mapGet.Range(func(key, value any) bool { + list = append(list, key.(uint64)) + return true + }) + + return list +} + +func (l *InputRedisQueueRule) StopSession(id uint64) { + cancelFn, ok := l.mapGet.Load(id) + if ok { + cancel, ok := cancelFn.(context.CancelFunc) + if ok { + cancel() + } + } +} + +func (l *InputRedisQueueRule) InfoSession(id uint64) map[string]interface{} { + return nil +} + +func (l *InputRedisQueueRule) Id() string { + return "input-redis-queue" +} + +func (l *InputRedisQueueRule) Initialize(ctx context.Context, optionRaw map[string]interface{}) error { + + var option Option + err := data.MapToStruct(optionRaw, &option) + if err != nil { + log.Error(err) + return err + } + + l.client, err = common.ConnectRedis(ctx, &common.OptionRedis{ + Address: option.RedisAddress, + Password: option.RedisPassword, + Db: option.RedisDb, + }) + if err == nil { + return err + } + + return nil +} + +func (l *InputRedisQueueRule) Run(ctx context.Context, subscribeQueueInput func(ctx context.Context, callback queue.SubscribeFunction[*data.InputEngine]) error, pushQueueOutput func(ctx context.Context, input *data.OutputEngine) error) { + + err := subscribeQueueInput(ctx, func(deliver *queue.Deliver[*data.InputEngine]) { + + output := data.CreateOutput(deliver.Data, l.Id()) + + var option OptionSession + err := data.MapToStruct(deliver.Data.Node.Option, &option) + if err != nil { + log.Error(err) + output.Error = &errors.ErrorEngine{ + Message: errors.MsgErrorOptionRuleNotMatch, + ErrorDetail: err, + FromEngine: l.Id(), + Code: errors.CodeConvert, + SessionId: deliver.Data.SessionId, + Id: deliver.Data.ChanId, + } + output.Debug = deliver.Data.Node.Debug + + pushQueueOutput(ctx, output) + err = nil + return + } + + ctx2, cancel := context.WithCancel(ctx) + + _, ok := l.mapGet.Load(deliver.Data.SessionId) + if !ok { + go l.getData(ctx2, &OptionLoop{ + nextSuccess: option.NextSuccess, + nextError: option.NextError, + output: output, + pushQueueOutput: pushQueueOutput, + queueName: option.Queue, + }) + + l.mapGet.Store(deliver.Data.SessionId, cancel) + } + + deliver.Ack() + }) + if err != nil { + log.Error(err) + return + } + +} + +func (l *InputRedisQueueRule) Stop(ctx context.Context) error { + return l.client.Close() +} + +func (l *InputRedisQueueRule) getData(ctx context.Context, option *OptionLoop) { + go func() { + for { + select { + case <-ctx.Done(): + return + default: + { + send := option.output.Clone() + dataRaw, err := l.client.RPop(ctx, option.queueName).Bytes() + if err == io.EOF { + return + } + if err == redis.Nil { + continue + } + + if err != nil { + log.Error(err) + l.sendError(ctx, err, "Redis POP message error", send, option) + continue + } + + var transferData map[string]interface{} + err = json.Unmarshal(dataRaw, &transferData) + if err != nil { + log.Error(err) + l.sendError(ctx, err, "Redis POP unmarshal error", send, option) + continue + } + + send.Next = []string{option.nextSuccess} + send.Type = data.TypeOutputEngineSuccess + send.Data = transferData + err = option.pushQueueOutput(ctx, send) + if err != nil { + log.Error(err) + } + continue + } + } + } + }() +} + +func (l *InputRedisQueueRule) sendError(ctx context.Context, e error, message string, send *data.OutputEngine, option *OptionLoop) { + send.Error = &errors.ErrorEngine{ + ErrorDetail: e, + Message: message, + Code: errors.CodeProgress, + SessionId: send.SessionId, + FromEngine: l.Id(), + Id: send.IdNode, + } + send.Next = []string{option.nextError} + send.Type = data.TypeOutputEngineError + err := option.pushQueueOutput(ctx, send) + if err != nil { + log.Error(err) + } +} diff --git a/rules/input_redis_queue/option.go b/rules/input_redis_queue/option.go new file mode 100644 index 0000000..be70cd8 --- /dev/null +++ b/rules/input_redis_queue/option.go @@ -0,0 +1,26 @@ +package input_redis_queue + +import ( + "context" + "github.com/dipper-iot/dipper-engine/data" +) + +type Option struct { + RedisAddress string `json:"redis_address"` + RedisPassword string `json:"redis_password"` + RedisDb int `json:"redis_db"` +} + +type OptionSession struct { + Queue string `json:"queue"` + NextSuccess string `json:"next_success"` + NextError string `json:"next_error"` +} + +type OptionLoop struct { + pushQueueOutput func(ctx context.Context, input *data.OutputEngine) error + queueName string + output *data.OutputEngine + nextSuccess string + nextError string +} diff --git a/rules/input_redis_queue_extend/input_redis_queue.go b/rules/input_redis_queue_extend/input_redis_queue.go new file mode 100644 index 0000000..24aba9d --- /dev/null +++ b/rules/input_redis_queue_extend/input_redis_queue.go @@ -0,0 +1,210 @@ +package input_redis_queue_extend + +import ( + "context" + "encoding/json" + "github.com/dipper-iot/dipper-engine/data" + "github.com/dipper-iot/dipper-engine/errors" + "github.com/dipper-iot/dipper-engine/queue" + "github.com/dipper-iot/dipper-engine/rules/common" + "github.com/go-redis/redis/v9" + log "github.com/sirupsen/logrus" + "io" + "sync" +) + +type InputRedisQueueExtendRule struct { + mapGet sync.Map +} + +func (l *InputRedisQueueExtendRule) Infinity() bool { + return true +} + +func NewInputRedisQueueExtendRule() *InputRedisQueueExtendRule { + return &InputRedisQueueExtendRule{ + mapGet: sync.Map{}, + } +} + +func (l *InputRedisQueueExtendRule) ListSession() []uint64 { + list := make([]uint64, 0) + + l.mapGet.Range(func(key, value any) bool { + list = append(list, key.(uint64)) + return true + }) + + return list +} + +func (l *InputRedisQueueExtendRule) StopSession(id uint64) { + cancelFn, ok := l.mapGet.Load(id) + if ok { + redisInfo, ok := cancelFn.(*RedisSessionInfo) + if ok { + redisInfo.cancel() + redisInfo.client.Close() + } + } +} + +func (l *InputRedisQueueExtendRule) InfoSession(id uint64) map[string]interface{} { + return nil +} + +func (l *InputRedisQueueExtendRule) Id() string { + return "input-redis-queue-extend" +} + +func (l *InputRedisQueueExtendRule) Initialize(ctx context.Context, optionRaw map[string]interface{}) error { + + return nil +} + +func (l *InputRedisQueueExtendRule) Run(ctx context.Context, subscribeQueueInput func(ctx context.Context, callback queue.SubscribeFunction[*data.InputEngine]) error, pushQueueOutput func(ctx context.Context, input *data.OutputEngine) error) { + + err := subscribeQueueInput(ctx, func(deliver *queue.Deliver[*data.InputEngine]) { + + output := data.CreateOutput(deliver.Data, l.Id()) + + var option OptionSession + err := data.MapToStruct(deliver.Data.Node.Option, &option) + if err != nil { + log.Error(err) + output.Error = &errors.ErrorEngine{ + Message: errors.MsgErrorOptionRuleNotMatch, + ErrorDetail: err, + FromEngine: l.Id(), + Code: errors.CodeConvert, + SessionId: deliver.Data.SessionId, + Id: deliver.Data.ChanId, + } + output.Debug = deliver.Data.Node.Debug + + pushQueueOutput(ctx, output) + err = nil + return + } + + ctx2, cancel := context.WithCancel(ctx) + + client, err := common.ConnectRedis(ctx, &common.OptionRedis{ + Address: option.RedisAddress, + Password: option.RedisPassword, + Db: option.RedisDb, + }) + if err != nil { + log.Error(err) + output.Error = &errors.ErrorEngine{ + Message: errors.MsgErrorOptionRuleNotMatch, + ErrorDetail: err, + FromEngine: l.Id(), + Code: errors.CodeConvert, + SessionId: deliver.Data.SessionId, + Id: deliver.Data.ChanId, + } + output.Debug = deliver.Data.Node.Debug + + pushQueueOutput(ctx, output) + err = nil + return + } + + _, ok := l.mapGet.Load(deliver.Data.SessionId) + if !ok { + go l.getData(ctx2, &OptionLoop{ + option: &option, + output: output, + pushQueueOutput: pushQueueOutput, + client: client, + }) + + l.mapGet.Store(deliver.Data.SessionId, &RedisSessionInfo{ + option: &option, + client: client, + cancel: cancel, + }) + } + + deliver.Ack() + }) + if err != nil { + log.Error(err) + return + } + +} + +func (l *InputRedisQueueExtendRule) Stop(ctx context.Context) error { + + l.mapGet.Range(func(key, value any) bool { + l.StopSession(key.(uint64)) + return true + }) + + return nil + +} + +func (l *InputRedisQueueExtendRule) getData(ctx context.Context, option *OptionLoop) { + go func() { + for { + select { + case <-ctx.Done(): + return + default: + { + send := option.output.Clone() + dataRaw, err := option.client.RPop(ctx, option.option.Queue).Bytes() + if err == io.EOF { + return + } + if err == redis.Nil { + continue + } + + if err != nil { + log.Error(err) + l.sendError(ctx, err, "Redis POP message error", send, option) + continue + } + + var transferData map[string]interface{} + err = json.Unmarshal(dataRaw, &transferData) + if err != nil { + log.Error(err) + l.sendError(ctx, err, "Redis POP unmarshal error", send, option) + continue + } + + send.Next = []string{option.option.NextSuccess} + send.Type = data.TypeOutputEngineSuccess + send.Data = transferData + err = option.pushQueueOutput(ctx, send) + if err != nil { + log.Error(err) + } + continue + } + } + } + }() +} + +func (l *InputRedisQueueExtendRule) sendError(ctx context.Context, e error, message string, send *data.OutputEngine, option *OptionLoop) { + send.Error = &errors.ErrorEngine{ + ErrorDetail: e, + Message: message, + Code: errors.CodeProgress, + SessionId: send.SessionId, + FromEngine: l.Id(), + Id: send.IdNode, + } + send.Next = []string{option.option.NextError} + send.Type = data.TypeOutputEngineError + err := option.pushQueueOutput(ctx, send) + if err != nil { + log.Error(err) + } +} diff --git a/rules/input_redis_queue_extend/option.go b/rules/input_redis_queue_extend/option.go new file mode 100644 index 0000000..3b539ad --- /dev/null +++ b/rules/input_redis_queue_extend/option.go @@ -0,0 +1,29 @@ +package input_redis_queue_extend + +import ( + "context" + "github.com/dipper-iot/dipper-engine/data" + "github.com/go-redis/redis/v9" +) + +type OptionSession struct { + Queue string `json:"queue"` + RedisAddress string `json:"redis_address"` + RedisPassword string `json:"redis_password"` + RedisDb int `json:"redis_db"` + NextSuccess string `json:"next_success"` + NextError string `json:"next_error"` +} + +type RedisSessionInfo struct { + client *redis.Client + cancel context.CancelFunc + option *OptionSession +} + +type OptionLoop struct { + pushQueueOutput func(ctx context.Context, input *data.OutputEngine) error + output *data.OutputEngine + option *OptionSession + client *redis.Client +} diff --git a/rules/log/log_rule.go b/rules/log/log_rule.go index a5ae911..e326609 100644 --- a/rules/log/log_rule.go +++ b/rules/log/log_rule.go @@ -11,6 +11,14 @@ import ( type LogRule struct { } +func NewLogRule() *LogRule { + return &LogRule{} +} + +func (l LogRule) Infinity() bool { + return false +} + func (l LogRule) Id() string { return "log-core" } diff --git a/rules/output_redis_queue/input_redis_queue.go b/rules/output_redis_queue/input_redis_queue.go new file mode 100644 index 0000000..3ea8d7c --- /dev/null +++ b/rules/output_redis_queue/input_redis_queue.go @@ -0,0 +1,125 @@ +package output_redis_queue + +import ( + "context" + "encoding/json" + "github.com/dipper-iot/dipper-engine/data" + "github.com/dipper-iot/dipper-engine/errors" + "github.com/dipper-iot/dipper-engine/queue" + "github.com/dipper-iot/dipper-engine/rules/common" + "github.com/go-redis/redis/v9" + log "github.com/sirupsen/logrus" +) + +type OutputRedisQueueRule struct { + client *redis.Client +} + +func (l *OutputRedisQueueRule) Infinity() bool { + return false +} + +func NewOutputRedisQueueRule() *OutputRedisQueueRule { + return &OutputRedisQueueRule{} +} +func (l *OutputRedisQueueRule) Id() string { + return "output-redis-queue" +} + +func (l *OutputRedisQueueRule) Initialize(ctx context.Context, optionRaw map[string]interface{}) error { + + var option Option + err := data.MapToStruct(optionRaw, &option) + if err != nil { + log.Error(err) + return err + } + + l.client, err = common.ConnectRedis(ctx, &common.OptionRedis{ + Address: option.RedisAddress, + Password: option.RedisPassword, + Db: option.RedisDb, + }) + if err == nil { + return err + } + + return nil +} + +func (l *OutputRedisQueueRule) Run(ctx context.Context, subscribeQueueInput func(ctx context.Context, callback queue.SubscribeFunction[*data.InputEngine]) error, pushQueueOutput func(ctx context.Context, input *data.OutputEngine) error) { + + err := subscribeQueueInput(ctx, func(deliver *queue.Deliver[*data.InputEngine]) { + + output := data.CreateOutput(deliver.Data, l.Id()) + + var option OptionSession + err := data.MapToStruct(deliver.Data.Node.Option, &option) + if err != nil { + log.Error(err) + output.Error = &errors.ErrorEngine{ + Message: errors.MsgErrorOptionRuleNotMatch, + ErrorDetail: err, + FromEngine: l.Id(), + Code: errors.CodeConvert, + SessionId: deliver.Data.SessionId, + Id: deliver.Data.ChanId, + } + output.Debug = deliver.Data.Node.Debug + + pushQueueOutput(ctx, output) + err = nil + return + } + + dataByte, err := json.Marshal(deliver.Data.Data) + if err != nil { + log.Error(err) + l.sendError(ctx, err, "Redis POP unmarshal error", output, &option, pushQueueOutput) + return + } + + err = l.client.RPush(ctx, option.Queue, dataByte).Err() + if err != nil { + log.Error(err) + l.sendError(ctx, err, "Redis RPush error", output, &option, pushQueueOutput) + return + } + + output.Next = []string{option.NextSuccess} + output.Type = data.TypeOutputEngineSuccess + output.Data = deliver.Data.Data + err = pushQueueOutput(ctx, output) + if err != nil { + log.Error(err) + } + + deliver.Ack() + }) + if err != nil { + log.Error(err) + return + } + +} + +func (l *OutputRedisQueueRule) Stop(ctx context.Context) error { + return l.client.Close() +} + +func (l *OutputRedisQueueRule) sendError(ctx context.Context, e error, message string, send *data.OutputEngine, option *OptionSession, pushQueueOutput func(ctx context.Context, input *data.OutputEngine) error) { + send.Error = &errors.ErrorEngine{ + ErrorDetail: e, + Message: message, + Code: errors.CodeProgress, + SessionId: send.SessionId, + FromEngine: l.Id(), + Id: send.IdNode, + } + send.Next = []string{option.NextSuccess} + send.Type = data.TypeOutputEngineError + err := pushQueueOutput(ctx, send) + if err != nil { + log.Error(err) + } +} diff --git a/rules/output_redis_queue/option.go b/rules/output_redis_queue/option.go new file mode 100644 index 0000000..27b2dea --- /dev/null +++ b/rules/output_redis_queue/option.go @@ -0,0 +1,13 @@ +package output_redis_queue + +type Option struct { + RedisAddress string `json:"redis_address"` + RedisPassword string `json:"redis_password"` + RedisDb int `json:"redis_db"` +} + +type OptionSession struct { + Queue string `json:"queue"` + NextSuccess string `json:"next_success"` + NextError string `json:"next_error"` +} diff --git a/rules/output_redis_queue_extend/input_redis_queue_extend.go b/rules/output_redis_queue_extend/input_redis_queue_extend.go new file mode 100644 index 0000000..7ffa5ce --- /dev/null +++ b/rules/output_redis_queue_extend/input_redis_queue_extend.go @@ -0,0 +1,132 @@ +package output_redis_extend_queue + +import ( + "context" + "encoding/json" + "github.com/dipper-iot/dipper-engine/data" + "github.com/dipper-iot/dipper-engine/errors" + "github.com/dipper-iot/dipper-engine/queue" + "github.com/dipper-iot/dipper-engine/rules/common" + "github.com/go-redis/redis/v9" + log "github.com/sirupsen/logrus" +) + +type OutputRedisQueueExtendRule struct { + client *redis.Client +} + +func (l *OutputRedisQueueExtendRule) Infinity() bool { + return false +} + +func NewOutputRedisQueueExtendRule() *OutputRedisQueueExtendRule { + return &OutputRedisQueueExtendRule{} +} + +func (l *OutputRedisQueueExtendRule) Id() string { + return "output-redis-queue-extend" +} + +func (l *OutputRedisQueueExtendRule) Initialize(ctx context.Context, optionRaw map[string]interface{}) error { + + return nil +} + +func (l *OutputRedisQueueExtendRule) Run(ctx context.Context, subscribeQueueInput func(ctx context.Context, callback queue.SubscribeFunction[*data.InputEngine]) error, pushQueueOutput func(ctx context.Context, input *data.OutputEngine) error) { + + err := subscribeQueueInput(ctx, func(deliver *queue.Deliver[*data.InputEngine]) { + + output := data.CreateOutput(deliver.Data, l.Id()) + + var option OptionSession + err := data.MapToStruct(deliver.Data.Node.Option, &option) + if err != nil { + log.Error(err) + output.Error = &errors.ErrorEngine{ + Message: errors.MsgErrorOptionRuleNotMatch, + ErrorDetail: err, + FromEngine: l.Id(), + Code: errors.CodeConvert, + SessionId: deliver.Data.SessionId, + Id: deliver.Data.ChanId, + } + output.Debug = deliver.Data.Node.Debug + + pushQueueOutput(ctx, output) + err = nil + return + } + + client, err := common.ConnectRedis(ctx, &common.OptionRedis{ + Address: option.RedisAddress, + Password: option.RedisPassword, + Db: option.RedisDb, + }) + if err != nil { + log.Error(err) + output.Error = &errors.ErrorEngine{ + Message: errors.MsgErrorOptionRuleNotMatch, + ErrorDetail: err, + FromEngine: l.Id(), + Code: errors.CodeConvert, + SessionId: deliver.Data.SessionId, + Id: deliver.Data.ChanId, + } + output.Debug = deliver.Data.Node.Debug + + pushQueueOutput(ctx, output) + err = nil + return + } + + dataByte, err := json.Marshal(deliver.Data.Data) + if err != nil { + log.Error(err) + l.sendError(ctx, err, "Redis POP unmarshal error", output, &option, pushQueueOutput) + return + } + + err = client.RPush(ctx, option.Queue, dataByte).Err() + if err != nil { + log.Error(err) + l.sendError(ctx, err, "Redis RPush error", output, &option, pushQueueOutput) + return + } + + output.Next = []string{option.NextSuccess} + output.Type = data.TypeOutputEngineSuccess + output.Data = deliver.Data.Data + err = pushQueueOutput(ctx, output) + if err != nil { + log.Error(err) + } + + deliver.Ack() + }) + if err != nil { + log.Error(err) + return + } + +} + +func (l *OutputRedisQueueExtendRule) Stop(ctx context.Context) error { + return l.client.Close() +} + +func (l *OutputRedisQueueExtendRule) sendError(ctx context.Context, e error, message string, send *data.OutputEngine, option *OptionSession, pushQueueOutput func(ctx context.Context, input *data.OutputEngine) error) { + send.Error = &errors.ErrorEngine{ + ErrorDetail: e, + Message: message, + Code: errors.CodeProgress, + SessionId: send.SessionId, + FromEngine: l.Id(), + Id: send.IdNode, + } + send.Next = []string{option.NextSuccess} + send.Type = data.TypeOutputEngineError + err := pushQueueOutput(ctx, send) + if err != nil { + log.Error(err) + } +} diff --git a/rules/output_redis_queue_extend/option.go b/rules/output_redis_queue_extend/option.go new file mode 100644 index 0000000..e64c3f5 --- /dev/null +++ b/rules/output_redis_queue_extend/option.go @@ -0,0 +1,10 @@ +package output_redis_extend_queue + +type OptionSession struct { + RedisAddress string `json:"redis_address"` + RedisPassword string `json:"redis_password"` + RedisDb int `json:"redis_db"` + Queue string `json:"queue"` + NextSuccess string `json:"next_success"` + NextError string `json:"next_error"` +} diff --git a/rules/switch/switch_rule.go b/rules/switch/switch_rule.go index 1e46c25..8a4079f 100644 --- a/rules/switch/switch_rule.go +++ b/rules/switch/switch_rule.go @@ -11,6 +11,14 @@ import ( type SwitchRule struct { } +func NewSwitchRule() *SwitchRule { + return &SwitchRule{} +} + +func (a SwitchRule) Infinity() bool { + return false +} + func (a SwitchRule) Id() string { return "switch" } diff --git a/server_test.go b/server_test.go index 4337046..5c5289b 100644 --- a/server_test.go +++ b/server_test.go @@ -162,6 +162,10 @@ type LogTest struct { wg *sync.WaitGroup } +func (l *LogTest) Infinity() bool { + return false +} + func (l *LogTest) Id() string { return "test" }