Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update meta_data and multi root and redis version #9

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
/dipper-engine
/plugins/
/coverage.out
/examples/base/go.sum
go.sum
2 changes: 1 addition & 1 deletion README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ c.Hook(engine.AfterStart, func(dipper *core.DipperEngine, c *cli.Context) error

return dipper.Add(context.Background(), &data.Session{
Data: map[string]interface{}{
"default": map[string]interface{}{
"default": map[string]interface{}{
"a": 10,
"b": 20,
"d": 5,
Expand Down
42 changes: 42 additions & 0 deletions core/daq/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"reflect"
"strconv"
"strings"
)

Expand Down Expand Up @@ -211,6 +212,21 @@ func (q *Query) Number() (float64, error) {
if !ok {
return 0, fmt.Errorf("%s: %s", NotFoundPath, strings.Join(q.paths, "."))
}
switch v := n.(type) {
case string:
s, err := strconv.ParseFloat(v, 64)
return s, err
case float64:
return v, nil
case float32:
return float64(v), nil
case int64:
return float64(v), nil
case int:
return float64(v), nil
default:
}

v := reflect.ValueOf(n)
if v.CanInt() {
v2 := v.Int()
Expand Down Expand Up @@ -238,6 +254,32 @@ func (q *Query) String() (string, error) {
if q.dataString != "" {
return q.dataString, nil
}
if q.index > len(q.paths)-1 {
return "", fmt.Errorf("%s: %s", NotFoundPath, strings.Join(q.paths, "."))
}
if q.Type() == Object {
name := q.paths[q.index]
data, err := q.object()
if err != nil {
return "", err
}
n, ok := data[name]
if !ok {
return "", fmt.Errorf("%s: %s", NotFoundPath, strings.Join(q.paths, "."))
}
switch v := n.(type) {
case string:
return v, nil
case float64, float32:
return fmt.Sprintf("%f", v), nil
case int, int64:
return fmt.Sprintf("%d", v), nil
default:
}
v := reflect.ValueOf(n)
q.dataString = v.String()
return q.dataString, nil
}
v := reflect.ValueOf(q.data)
q.dataString = v.String()
return q.dataString, nil
Expand Down
1 change: 1 addition & 0 deletions core/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (d *DipperEngine) handlerOutput(ctx context.Context, dataOutput *data.Outpu
SessionId: sessionInfo.Id,
ChanId: sessionInfo.ChanId,
IdNode: nextId,
MetaData: dataOutput.MetaData,
BranchMain: dataOutput.BranchMain,
FromEngine: node.RuleId,
ToEngine: dataOutput.FromEngine,
Expand Down
18 changes: 10 additions & 8 deletions core/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@ import (
func NewSessionInfo(timeout time.Duration, session *data.Session, mapRule map[string]Rule) *data.Info {
now := time.Now()
var (
id uint64
id uint64 = session.Id
err error
)
for {
id, err = util.NextID()
if err != nil {
log.Error(err)
continue
if id == 0 {
for {
id, err = util.NextID()
if err != nil {
log.Error(err)
continue
}
break
}
break
}

endCount := 0
Expand Down Expand Up @@ -68,7 +70,7 @@ func (d *DipperEngine) StartSession(ctx context.Context, sessionId uint64) error
Error: nil,
})
if err != nil {
log.Error(err)
log.Error("Publish have error ", err)
return err
}
}
Expand Down
1 change: 1 addition & 0 deletions data/map_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func CreateOutput(input *InputEngine, id string) (output *OutputEngine) {
output.BranchMain = input.BranchMain
output.ChanId = input.ChanId
output.IdNode = input.IdNode
output.MetaData = input.MetaData
output.FromEngine = id
output.SessionId = input.SessionId
output.Time = &timeData
Expand Down
2 changes: 2 additions & 0 deletions data/ouput.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type OutputEngine struct {
ChanId string `json:"chan_id"`
IdNode string `json:"id_node"`
FromEngine string `json:"from_engine"`
MetaData map[string]interface{} `json:"meta_data"`
Data map[string]interface{} `json:"data"`
BranchMain string `json:"branch_main"`
Next []string `json:"next"`
Expand All @@ -33,6 +34,7 @@ func (o OutputEngine) Clone() *OutputEngine {
IdNode: o.IdNode,
FromEngine: o.FromEngine,
Data: o.Data,
MetaData: o.MetaData,
BranchMain: o.BranchMain,
Next: o.Next,
Time: o.Time,
Expand Down
12 changes: 8 additions & 4 deletions data/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,21 @@ type NodeRule struct {
}

type Session struct {
Id uint64 `json:"id"`
ChanId string `json:"chan_id"`
MapNode map[string]*NodeRule `json:"map_node"`
MetaData map[string]interface{} `json:"meta_data"`
RootNode string `json:"root_node"`
Data map[string]interface{} `json:"data"`
Result map[string]*OutputEngine `json:"result"`
}

type ResultSession struct {
Id uint64 `json:"id"`
ChanId string `json:"chan_id"`
Data map[string]interface{} `json:"data"`
Result map[string]*OutputEngine `json:"result"`
Id uint64 `json:"id"`
ChanId string `json:"chan_id"`
MetaData map[string]interface{} `json:"meta_data"`
Data map[string]interface{} `json:"data"`
Result map[string]*OutputEngine `json:"result"`
}

type Info struct {
Expand All @@ -35,6 +38,7 @@ type Info struct {
Infinite bool `json:"infinite"`
MapNode map[string]*NodeRule `json:"map_node"`
RootNode *NodeRule `json:"root_node"`
MetaData map[string]interface{} `json:"meta_data"`
Data map[string]interface{} `json:"data"`
Result map[string]*OutputEngine `json:"result"`
EndCount int `json:"end_count"`
Expand Down
2 changes: 1 addition & 1 deletion engine/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import (
"fmt"
"github.com/dipper-iot/dipper-engine/core"
"github.com/go-redis/redis/v9"
"github.com/go-redis/redis/v8"
"github.com/urfave/cli/v2"

Check failure on line 7 in engine/cli.go

View workflow job for this annotation

GitHub Actions / build

missing go.sum entry for module providing package github.com/urfave/cli/v2 (imported by github.com/dipper-iot/dipper-engine/engine); to add:
"os"
"os/signal"
)
Expand Down
2 changes: 1 addition & 1 deletion engine/dipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/dipper-iot/dipper-engine/internal/util"
rs "github.com/dipper-iot/dipper-engine/redis"
"github.com/dipper-iot/dipper-engine/store"
"github.com/go-redis/redis/v9"
"github.com/go-redis/redis/v8"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
"time"
Expand Down
2 changes: 1 addition & 1 deletion engine/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/dipper-iot/dipper-engine/core"
"github.com/dipper-iot/dipper-engine/data"
"github.com/dipper-iot/dipper-engine/internal/debug"
"github.com/go-redis/redis/v9"
"github.com/go-redis/redis/v8"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
"io"
Expand Down
2 changes: 1 addition & 1 deletion examples/base/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.18

require (
github.com/dipper-iot/dipper-engine v0.0.0-20221022050351-f7e9b09096a6
github.com/go-redis/redis/v9 v9.0.0-rc.1
github.com/go-redis/redis/v8 v8.11.5
github.com/sirupsen/logrus v1.9.0
github.com/urfave/cli/v2 v2.11.2
)
Expand Down
2 changes: 1 addition & 1 deletion examples/base/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"github.com/dipper-iot/dipper-engine/engine"
"github.com/dipper-iot/dipper-engine/internal/debug"
"github.com/dipper-iot/dipper-engine/queue"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
"log"
"os"
)

Expand Down
2 changes: 1 addition & 1 deletion examples/redis-queue/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.18

require (
github.com/dipper-iot/dipper-engine v0.0.0-20221022050351-f7e9b09096a6
github.com/go-redis/redis/v9 v9.0.0-rc.1
github.com/go-redis/redis/v8 v8.11.5
github.com/sirupsen/logrus v1.9.0
github.com/urfave/cli/v2 v2.11.2
)
Expand Down
46 changes: 0 additions & 46 deletions examples/redis-queue/go.sum

This file was deleted.

2 changes: 1 addition & 1 deletion examples/redis-queue/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/dipper-iot/dipper-engine/core"
"github.com/dipper-iot/dipper-engine/data"
"github.com/dipper-iot/dipper-engine/engine"
"github.com/go-redis/redis/v9"
"github.com/go-redis/redis/v8"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
"io"
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.18
require (
github.com/Knetic/govaluate v3.0.0+incompatible
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef
github.com/go-redis/redis/v9 v9.0.0-rc.1
github.com/go-redis/redis/v8 v8.11.5
github.com/sirupsen/logrus v1.9.0
github.com/sony/sonyflake v1.1.0
github.com/urfave/cli/v2 v2.11.2
Expand Down
50 changes: 0 additions & 50 deletions go.sum

This file was deleted.

2 changes: 1 addition & 1 deletion internal/lock/redis_lock/try_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import (
"context"
"github.com/go-redis/redis/v9"
"github.com/go-redis/redis/v8"

Check failure on line 5 in internal/lock/redis_lock/try_lock.go

View workflow job for this annotation

GitHub Actions / build

missing go.sum entry for module providing package github.com/go-redis/redis/v8 (imported by github.com/dipper-iot/dipper-engine/engine); to add:
log "github.com/sirupsen/logrus"
"time"
)
Expand Down
2 changes: 1 addition & 1 deletion redis/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package redis
import (
"context"
"github.com/dipper-iot/dipper-engine/internal/util"
"github.com/go-redis/redis/v9"
"github.com/go-redis/redis/v8"
log "github.com/sirupsen/logrus"
"io"
)
Expand Down
2 changes: 1 addition & 1 deletion redis/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/dipper-iot/dipper-engine/core"
"github.com/dipper-iot/dipper-engine/internal/util"
"github.com/dipper-iot/dipper-engine/queue"
"github.com/go-redis/redis/v9"
"github.com/go-redis/redis/v8"
log "github.com/sirupsen/logrus"
"io"
"time"
Expand Down