Skip to content

Commit

Permalink
update meta_data and multi root and redis version
Browse files Browse the repository at this point in the history
  • Loading branch information
NortonBen committed Jun 5, 2024
1 parent 8ab2851 commit 9d6b974
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 19 deletions.
1 change: 1 addition & 0 deletions core/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (d *DipperEngine) handlerOutput(ctx context.Context, dataOutput *data.Outpu
SessionId: sessionInfo.Id,
ChanId: sessionInfo.ChanId,
IdNode: nextId,
MetaData: dataOutput.MetaData,
BranchMain: dataOutput.BranchMain,
FromEngine: node.RuleId,
ToEngine: dataOutput.FromEngine,
Expand Down
18 changes: 10 additions & 8 deletions core/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@ import (
func NewSessionInfo(timeout time.Duration, session *data.Session, mapRule map[string]Rule) *data.Info {
now := time.Now()
var (
id uint64
id uint64 = session.Id
err error
)
for {
id, err = util.NextID()
if err != nil {
log.Error(err)
continue
if id == 0 {
for {
id, err = util.NextID()
if err != nil {
log.Error(err)
continue
}
break
}
break
}

endCount := 0
Expand Down Expand Up @@ -68,7 +70,7 @@ func (d *DipperEngine) StartSession(ctx context.Context, sessionId uint64) error
Error: nil,
})
if err != nil {
log.Error(err)
log.Error("Publish have error ", err)
return err
}
}
Expand Down
1 change: 1 addition & 0 deletions data/map_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func CreateOutput(input *InputEngine, id string) (output *OutputEngine) {
output.BranchMain = input.BranchMain
output.ChanId = input.ChanId
output.IdNode = input.IdNode
output.MetaData = input.MetaData
output.FromEngine = id
output.SessionId = input.SessionId
output.Time = &timeData
Expand Down
1 change: 1 addition & 0 deletions data/ouput.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (o OutputEngine) Clone() *OutputEngine {
IdNode: o.IdNode,
FromEngine: o.FromEngine,
Data: o.Data,
MetaData: o.MetaData,
BranchMain: o.BranchMain,
Next: o.Next,
Time: o.Time,
Expand Down
11 changes: 7 additions & 4 deletions data/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type NodeRule struct {
}

type Session struct {
Id uint64 `json:"id"`
ChanId string `json:"chan_id"`
MapNode map[string]*NodeRule `json:"map_node"`
MetaData map[string]interface{} `json:"meta_data"`
Expand All @@ -22,10 +23,11 @@ type Session struct {
}

type ResultSession struct {
Id uint64 `json:"id"`
ChanId string `json:"chan_id"`
Data map[string]interface{} `json:"data"`
Result map[string]*OutputEngine `json:"result"`
Id uint64 `json:"id"`
ChanId string `json:"chan_id"`
MetaData map[string]interface{} `json:"meta_data"`
Data map[string]interface{} `json:"data"`
Result map[string]*OutputEngine `json:"result"`
}

type Info struct {
Expand All @@ -36,6 +38,7 @@ type Info struct {
Infinite bool `json:"infinite"`
MapNode map[string]*NodeRule `json:"map_node"`
RootNode *NodeRule `json:"root_node"`
MetaData map[string]interface{} `json:"meta_data"`
Data map[string]interface{} `json:"data"`
Result map[string]*OutputEngine `json:"result"`
EndCount int `json:"end_count"`
Expand Down
2 changes: 1 addition & 1 deletion examples/base/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"github.com/dipper-iot/dipper-engine/engine"
"github.com/dipper-iot/dipper-engine/internal/debug"
"github.com/dipper-iot/dipper-engine/queue"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
"log"
"os"
)

Expand Down
10 changes: 5 additions & 5 deletions redis/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (r redisStore) Add(sessionInfo *data.Info) error {
return err
}

return r.client.Set(context.TODO(), key, data, r.timeout).Err()
return r.client.SetNX(context.TODO(), key, data, 0).Err()
}

func (r redisStore) Get(sessionId uint64) *data.Info {
Expand All @@ -53,9 +53,9 @@ func (r redisStore) Get(sessionId uint64) *data.Info {
}

var data data.Info
err = json.Unmarshal([]byte(dataStr), data)
err = json.Unmarshal([]byte(dataStr), &data)
if err != nil {
log.Error(err)
log.Error("Get json.Unmarshal => ", err)
return nil
}

Expand All @@ -71,9 +71,9 @@ func (r redisStore) Has(sessionId uint64) bool {
return false
}
var data data.Info
err = json.Unmarshal([]byte(dataStr), data)
err = json.Unmarshal([]byte(dataStr), &data)
if err != nil {
log.Error(err)
log.Error("Has json.Unmarshal => ", err)
return false
}

Expand Down
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"github.com/dipper-iot/dipper-engine/engine"
"log"
log "github.com/sirupsen/logrus"
"os"
)

Expand Down

0 comments on commit 9d6b974

Please sign in to comment.