Skip to content

Commit

Permalink
Merge pull request #7 from dipper-iot/update_rule_redis_queue
Browse files Browse the repository at this point in the history
update rule redis and control rule
  • Loading branch information
NortonBen committed Oct 29, 2022
2 parents 28bf299 + 098ad83 commit 46869b5
Show file tree
Hide file tree
Showing 29 changed files with 1,262 additions and 185 deletions.
39 changes: 33 additions & 6 deletions README.MD
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Rule Engine Golang
[![Go](https://github.com/dipper-iot/dipper-engine/actions/workflows/go.yml/badge.svg?branch=main)](https://github.com/dipper-iot/dipper-engine/actions/workflows/go.yml) [![CodeQL](https://github.com/dipper-iot/dipper-engine/actions/workflows/codeql.yml/badge.svg?branch=main)](https://github.com/dipper-iot/dipper-engine/actions/workflows/codeql.yml) [![Coverage Status](https://coveralls.io/repos/github/dipper-iot/dipper-engine/badge.svg?branch=main)](https://coveralls.io/github/dipper-iot/dipper-engine?branch=main)

<!-- TOC -->

* [Setup](#setup)
* [Run](#run)
* [Rule Engine](#rule-engine)
* [Example Developer Test](#example-developer-test)
<!-- TOC -->

## Setup

```shell
Expand Down Expand Up @@ -41,18 +50,36 @@ Start Dipper Engine

Rules: 5
-----------------------------------------------------------
No Rule Name Worker Status
1 log-core 1 enable
2 arithmetic 1 enable
3 fork 1 enable
4 conditional 1 enable
5 switch 1 enable
No Rule Name Worker Infinity Status
1 arithmetic 1 false enable
2 conditional 1 false enable
3 fork 1 false enable
4 input-redis-queue 0 true disable
5 input-redis-queue-extend 1 true enable
6 log-core 1 false enable
7 output-redis-queue 0 false disable
8 output-redis-queue-extend 1 false enable
9 switch 1 false enable
-----------------------------------------------------------

Running Engine...

```

## Rule Engine

| No | Rule | Description | Infinity | Doc |
|:-----:|:-----------------------------------|:---------------------------------|:--------:|-----|
| 1 | arithmetic | operator match rule | false | |
| 2 | conditional | compare data rule | false | |
| 3 | fork | fork to rules | false | |
| 4 | input-redis-queue | input data from queue on config | true | |
| 5 | input-redis-queue-extend | input data from queue on option | true | |
| 6 | log-core | log to console | false | |
| 7 | output-redis-queue | output data from queue on config | false | |
| 8 | output-redis-queue-extend | output data from queue on option | false | |
| 9 | switch | switch to rules | false | |

## Example Developer Test

```golang
Expand Down
26 changes: 26 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,32 @@
"enable": true,
"worker": 1,
"options": {}
},
"input-redis-queue": {
"enable": false,
"worker": 1,
"options": {
"redis_address": "127.0.0.1:6379",
"redis_db": 0
}
},
"input-redis-queue-extend": {
"enable": true,
"worker": 1,
"options": {}
},
"output-redis-queue": {
"enable": false,
"worker": 1,
"options": {
"redis_address": "127.0.0.1:6379",
"redis_db": 0
}
},
"output-redis-queue-extend": {
"enable": true,
"worker": 1,
"options": {}
}
},
"log": {
Expand Down
49 changes: 49 additions & 0 deletions core/control.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package core

type SessionControl interface {
ListSession() []uint64
StopSession(id uint64)
InfoSession(id uint64) map[string]interface{}
}

func (d *DipperEngine) ListControl() []string {
list := make([]string, 0)

for name, _ := range d.mapSessionControl {
list = append(list, name)
}
return list
}

func (d *DipperEngine) ControlSession(ruleName string) []uint64 {

rule, ok := d.mapSessionControl[ruleName]
if ok {
return rule.ListSession()
}
return []uint64{}
}

func (d *DipperEngine) ControlGetRule(session uint64) []string {
listRule := make([]string, 0)
for ruleId, control := range d.mapSessionControl {
listSession := control.ListSession()
for _, name := range listSession {
if name == session {
listRule = append(listRule, ruleId)
break
}
}
}

return listRule
}

func (d *DipperEngine) ControlStopSession(ruleName string, session uint64) {

rule, ok := d.mapSessionControl[ruleName]
if ok {
rule.StopSession(session)
}
return
}
89 changes: 2 additions & 87 deletions core/dipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,19 @@ package core

import (
"context"
"fmt"
bus2 "github.com/dipper-iot/dipper-engine/bus"
"github.com/dipper-iot/dipper-engine/data"
"github.com/dipper-iot/dipper-engine/errors"
"github.com/dipper-iot/dipper-engine/queue"
"github.com/dipper-iot/dipper-engine/store"
log "github.com/sirupsen/logrus"
"os"
"text/tabwriter"
"time"
)

type DipperEngine struct {
ctx context.Context
cancel context.CancelFunc
config *ConfigEngine
mapRule map[string]Rule
mapSessionControl map[string]SessionControl
mapQueueInputRule map[string]queue.QueueEngine[*data.InputEngine]
queueOutputRule queue.QueueEngine[*data.OutputEngine]
factoryQueue FactoryQueue[*data.InputEngine]
Expand Down Expand Up @@ -47,6 +43,7 @@ func NewDipperEngine(
bus: bus,
mapRule: map[string]Rule{},
mapQueueInputRule: map[string]queue.QueueEngine[*data.InputEngine]{},
mapSessionControl: map[string]SessionControl{},
}
}

Expand Down Expand Up @@ -77,88 +74,6 @@ func (d *DipperEngine) addRule(rule Rule) {
d.mapQueueInputRule[rule.Id()] = queue
}

func (d *DipperEngine) Add(ctx context.Context, sessionData *data.Session) error {
sessionInfo := data.NewSessionInfo(time.Duration(d.config.TimeoutSession), sessionData)
d.store.Add(sessionInfo)
return d.startSession(ctx, sessionInfo.Id)
}

func (d *DipperEngine) SessionInputQueue(factoryQueueName FactoryQueueName[*data.Session]) {
defaultTopic := "session-input"
topic, ok := d.config.BusMap[defaultTopic]
if !ok {
topic = defaultTopic
}

d.queueInput = factoryQueueName(topic)

d.queueInput.Subscribe(context.TODO(), func(sessionDeliver *queue.Deliver[*data.Session]) {
err := d.Add(context.TODO(), sessionDeliver.Data)
if err != nil {
sessionDeliver.Reject()
return
}
sessionDeliver.Ack()
})
}

func (d *DipperEngine) SessionOutputQueue(factoryQueueOutputName FactoryQueueName[*data.ResultSession]) {
defaultOutputTopic := "session-output"
topic, ok := d.config.BusMap[defaultOutputTopic]
if !ok {
topic = defaultOutputTopic
}
d.queueOutput = factoryQueueOutputName(topic)
}

func (d *DipperEngine) Start() error {
log.Debug("Start Dipper Engine")
d.queueOutputRule = d.factoryQueueOutput("output")

// init Rule
for name, rule := range d.mapRule {
option, ok := d.config.Rules[name]
if ok && option.Enable {
err := rule.Initialize(d.ctx, map[string]interface{}{})
if err != nil {
return err
}
}

}

w := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0)

fmt.Println(fmt.Sprintf("Rules: %d", len(d.mapRule)))
fmt.Println("-----------------------------------------------------------")
fmt.Fprintln(w, "No\tRule Name\tWorker\tStatus\t")
index := 1
// Run Rule
for name, rule := range d.mapRule {
queueInput, ok := d.mapQueueInputRule[name]
if !ok {
return errors.ErrorNotFoundQueue
}
option, ok := d.config.Rules[name]
if ok && option.Enable {
for i := 0; i < option.Worker; i++ {
go rule.Run(d.ctx, queueInput.Subscribe, d.queueOutputRule.Publish)
}
fmt.Fprintln(w, fmt.Sprintf("%d\t%s\t%d\t%s\t", index, name, option.Worker, "enable"))
} else {
fmt.Fprintln(w, fmt.Sprintf("%d\t%s\t%d\t%s\t", index, name, 0, "disable"))
}
index++
}
w.Flush()
fmt.Println("-----------------------------------------------------------")
fmt.Println()
go d.registerOutput()
fmt.Println("Running Engine...")

return nil
}

func (d *DipperEngine) Stop() error {
d.cancel()
return nil
Expand Down
29 changes: 0 additions & 29 deletions core/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,6 @@ import (
log "github.com/sirupsen/logrus"
)

func (d *DipperEngine) startSession(ctx context.Context, sessionId uint64) error {
if d.store.Has(sessionId) {
sessionInfo := d.store.Get(sessionId)
if sessionInfo.RootNode != nil {
node := sessionInfo.RootNode
ruleQueue, ok := d.mapQueueInputRule[node.RuleId]
if ok {
err := ruleQueue.Publish(ctx, &data.InputEngine{
SessionId: sessionInfo.Id,
ChanId: sessionInfo.ChanId,
FromEngine: node.NodeId,
ToEngine: "",
Node: node,
Data: sessionInfo.Data,
Time: sessionInfo.Time,
Type: data.TypeOutputEngineSuccess,
Error: nil,
})
if err != nil {
log.Error(err)
return err
}
}
}
}

return nil
}

func (d *DipperEngine) registerOutput() {

err := d.queueOutputRule.Subscribe(d.ctx, func(deliver *queue.Deliver[*data.OutputEngine]) {
Expand Down
1 change: 1 addition & 0 deletions core/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

type Rule interface {
Id() string
Infinity() bool
Initialize(ctx context.Context, option map[string]interface{}) error
Run(ctx context.Context,
subscribeQueueInput func(ctx context.Context, callback queue.SubscribeFunction[*data.InputEngine]) error,
Expand Down
Loading

0 comments on commit 46869b5

Please sign in to comment.