Skip to content

Commit

Permalink
update rule
Browse files Browse the repository at this point in the history
  • Loading branch information
NortonBen committed Apr 20, 2023
1 parent 46869b5 commit 1273c26
Show file tree
Hide file tree
Showing 28 changed files with 655 additions and 937 deletions.
63 changes: 25 additions & 38 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -87,87 +87,74 @@ Running Engine...
c := engine.New()

c.Hook(engine.AfterStart, func(dipper *core.DipperEngine, c *cli.Context) error {

factoryResultSessionName := core.FactoryQueueNameDefault[*data.ResultSession]()
dipper.SessionOutputQueue(factoryResultSessionName)

dipper.OutputSubscribe(context.TODO(), func(sessionDeliver *queue.Deliver[*data.ResultSession]) {
debug.PrintJson(sessionDeliver.Data, "Result: ")
sessionDeliver.Ack()
})

return dipper.Add(context.Background(), &data.Session{
Data: map[string]interface{}{
"default": map[string]interface{}{
"a": 10,
"b": 20,
"default": map[string]interface{}{
"a": 10,
"b": 20,
"d": 5,
},
},
ChanId: "test-1",
RootNode: "1",
MapNode: map[string]*data.NodeRule{
"1": {
Debug: true,
Debug: false,
Option: map[string]interface{}{
"list": map[string]interface{}{
"default.c": map[string]interface{}{
"right": map[string]interface{}{
"value": "default.a",
"type": "val",
},
"left": map[string]interface{}{
"type": "val",
"value": "default.b",
},
"operator": "add",
"type": "operator",
},
"operators": map[string]string{
"c": "a+b",
},
"next_error": "2",
"next_success": "3",
"debug": true,
"debug": false,
},
NodeId: "4",
RuleId: "arithmetic",
End: false,
},
"2": {
Debug: true,
Debug: false,
Option: map[string]interface{}{
"debug": true,
"debug": false,
},
NodeId: "2",
RuleId: "log-core",
End: true,
},
"3": {
Debug: true,
Debug: false,
Option: map[string]interface{}{
"next_success": []string{"5", "2"},
"debug": true,
"debug": false,
},
NodeId: "3",
RuleId: "fork",
End: false,
},
"5": {
Debug: true,
Debug: false,
Option: map[string]interface{}{
"operator": map[string]interface{}{
"right": map[string]interface{}{
"value": "default.a",
"type": "val",
},
"left": map[string]interface{}{
"type": "val",
"value": "default.b",
},
"operator": "==",
"type": "operator",
},
"set_param_result_to": "default.cond_a_b",
"conditional": "a == b",
"set_param_result_to": "cond_a_b",
"next_error": "2",
"next_true": "2",
"next_false": "2",
"debug": true,
"debug": false,
},
NodeId: "5",
RuleId: "conditional",
End: false,
},
},
},
})
})

Expand Down
17 changes: 17 additions & 0 deletions core/daq/query.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package daq

import (
"errors"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -132,6 +133,9 @@ func (q *Query) Update(data interface{}) error {
output = q.dataArray
break
}
if q.index == 0 {
return nil
}

return q.update(q.index-1, output)
}
Expand Down Expand Up @@ -239,6 +243,19 @@ func (q *Query) String() (string, error) {
return q.dataString, nil
}

func (q *Query) Interface() (interface{}, error) {

mapData, err := q.Object()
if err != nil {
return nil, err
}
name := q.paths[q.index]
if !mapData.Has(name) {
return nil, errors.New("not found " + name)
}
return mapData.data[name], nil
}

func (q *Query) object() (map[string]interface{}, error) {
if q.dataObject != nil {
return q.dataObject, nil
Expand Down
4 changes: 4 additions & 0 deletions core/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,7 @@ func (d *DipperEngine) SessionOutputQueue(factoryQueueOutputName FactoryQueueNam
}
d.queueOutput = factoryQueueOutputName(topic)
}

func (d *DipperEngine) OutputSubscribe(ctx context.Context, callback queue.SubscribeFunction[*data.ResultSession]) {
d.queueOutput.Subscribe(ctx, callback)
}
6 changes: 6 additions & 0 deletions engine/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
)

type App struct {
config *core.ConfigEngine
flags []cli.Flag
app *cli.App
dipper *core.DipperEngine
Expand All @@ -22,6 +23,10 @@ type App struct {
}

func New(flags ...cli.Flag) *App {
return NewWithConfig(nil, flags...)
}

func NewWithConfig(config *core.ConfigEngine, flags ...cli.Flag) *App {

signalStop := make(chan os.Signal)
signal.Notify(signalStop, os.Interrupt, os.Kill)
Expand Down Expand Up @@ -84,6 +89,7 @@ func New(flags ...cli.Flag) *App {
},
)
return &App{
config: config,
flags: flags,
signalStop: signalStop,
beforeStartHooks: []HookFunc{},
Expand Down
13 changes: 8 additions & 5 deletions engine/dipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,17 @@ func (a *App) newEngine(c *cli.Context) error {
break
}

err := util.ReadFile(&config, configFile)
if err != nil {
log.Println(err)
return err
if a.config == nil {
err := util.ReadFile(&config, configFile)
if err != nil {
log.Println(err)
return err
}
a.config = &config
}

a.dipper = core.NewDipperEngine(
&config,
a.config,
factoryQueue,
factoryQueueName,
storeSession,
Expand Down
28 changes: 3 additions & 25 deletions engine/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,8 @@ func TestApp_Run(t *testing.T) {
"1": {
Debug: false,
Option: map[string]interface{}{
"list": map[string]interface{}{
"default.c": map[string]interface{}{
"right": map[string]interface{}{
"value": "default.a",
"type": "val",
},
"left": map[string]interface{}{
"type": "val",
"value": "default.b",
},
"operator": "add",
"type": "operator",
},
"operators": map[string]string{
"c": "a+b",
},
"next_error": "2",
"next_success": "2",
Expand Down Expand Up @@ -133,18 +122,7 @@ func TestApp_Run(t *testing.T) {
"4": {
Debug: false,
Option: map[string]interface{}{
"operator": map[string]interface{}{
"right": map[string]interface{}{
"value": "default.a",
"type": "val",
},
"left": map[string]interface{}{
"type": "val",
"value": "default.b",
},
"operator": "<>",
"type": "operator",
},
"conditional": "a != b ",
"set_param_result_to": "default.cond_a_b",
"next_error": "2",
"next_true": "",
Expand Down
76 changes: 76 additions & 0 deletions examples/base/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package main

import (
"encoding/json"
"github.com/dipper-iot/dipper-engine/core"
log "github.com/sirupsen/logrus"
)

var configMap = map[string]interface{}{
"rules": map[string]interface{}{
"log-core": map[string]interface{}{
"enable": true,
"worker": 1,
},
"arithmetic": map[string]interface{}{
"enable": true,
"worker": 1,
},
"fork": map[string]interface{}{
"enable": true,
"worker": 1,
},
"switch": map[string]interface{}{
"enable": true,
"worker": 1,
},
"conditional": map[string]interface{}{
"enable": true,
"worker": 1,
},
"input-redis-queue": map[string]interface{}{
"enable": false,
"worker": 1,
"options": map[string]interface{}{
"redis_address": "127.0.0.1:6379",
"redis_db": 0,
},
},
"input-redis-queue-extend": map[string]interface{}{
"enable": true,
"worker": 1,
},
"output-redis-queue": map[string]interface{}{
"enable": false,
"worker": 1,
"options": map[string]interface{}{
"redis_address": "127.0.0.1:6379",
"redis_db": 0,
},
},
"output-redis-queue-extend": map[string]interface{}{
"enable": true,
"worker": 1,
},
},
"log": map[string]interface{}{
"level": "info",
"out": "console",
"file_name": "dipper-engine.log",
},
"timeout_session": 30,
"plugins": []string{},
}

func getConfig() *core.ConfigEngine {
var config core.ConfigEngine
data, err := json.Marshal(configMap)
if err != nil {
log.Println(err)
}
err = json.Unmarshal(data, &config)
if err != nil {
log.Println(err)
}
return &config
}
24 changes: 24 additions & 0 deletions examples/base/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
module github.com/dipper-iot/dipper-engine/examples/base

go 1.18

require (
github.com/dipper-iot/dipper-engine v0.0.0-20221022050351-f7e9b09096a6
github.com/go-redis/redis/v9 v9.0.0-rc.1
github.com/sirupsen/logrus v1.9.0
github.com/urfave/cli/v2 v2.11.2
)

require (
github.com/Knetic/govaluate v3.0.0+incompatible // indirect
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sony/sonyflake v1.1.0 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
)

replace github.com/dipper-iot/dipper-engine => ../../
Loading

0 comments on commit 1273c26

Please sign in to comment.