/
listen.go
127 lines (109 loc) · 3.41 KB
/
listen.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package cli
import (
"github.com/dairlair/sentimentd/pkg/interface/cli/util"
stan "github.com/nats-io/go-nats-streaming"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)
// QueueCreator is a dependency used to obtain a NATS Streaming connection at
// run time. Besides the connection it returns two strings which NewCmdListen
// uses as the source channel (subscribed to) and the target channel
// (published to), in that order.
type QueueCreator func() (stan.Conn, string, string)
// producer is an interface that declares no methods.
// NOTE(review): nothing in the visible part of this file references it;
// confirm whether it is still needed.
type producer interface {
}
// processor maps an input text to the list of predictions computed for it.
type processor func(input string) []Prediction
// consumer receives a raw message payload. In this file it is the callback
// that is handed the JSON document after predictions have been attached.
type consumer func([]byte)
// NewCmdListen builds the "listen" cobra command.
//
// When the command runs it:
//  1. picks the brain references from the positional arguments, or from the
//     "listen.brains" configuration key when no arguments were given;
//  2. resolves them through getBrainsMap and stops via log.Fatalf when none
//     could be resolved;
//  3. asks queueCreator for the connection and the source/target channels;
//  4. hands everything to readFromReaderAndWriteToWrite together with a
//     processor that delegates to analyse.
//
// @TODO This command should be completely refactored.
func (runner *CommandsRunner) NewCmdListen(queueCreator QueueCreator) *cobra.Command {
	run := func(cmd *cobra.Command, args []string) {
		// Positional arguments win; the configuration is only a fallback.
		brainReferences := args
		if len(args) == 0 {
			brainReferences = viper.GetStringSlice("listen.brains")
		}
		log.Infof("Listen brains: %v", brainReferences)

		brainsMap := getBrainsMap(runner, brainReferences)
		if len(brainsMap) == 0 {
			log.Fatalf("No brains found, exit...")
		}

		queue, source, target := queueCreator()
		var pr processor = func(input string) []Prediction {
			return analyse(runner, brainsMap, input)
		}
		readFromReaderAndWriteToWrite(queue, source, target, pr)
	}

	return &cobra.Command{
		Use:   "listen",
		Short: "Listen a configured queue and push analysed message to queue",
		Run:   run,
	}
}
// readFromReaderAndWriteToWrite subscribes to the source channel on conn,
// passes every received message through processJSONAndPushBack with pr as the
// analyser, and publishes the resulting document to the target channel.
//
// A failed publish is logged as an error and is NOT followed by the
// "Message published" info line. A failed subscribe is reported through
// log.Fatalf. After subscribing, the function calls util.WaitInterruption
// with a callback that closes the subscription (presumably this blocks until
// the process is interrupted; confirm against the util package).
func readFromReaderAndWriteToWrite(conn stan.Conn, source string, target string, pr processor) {
	var cb consumer = func(data []byte) {
		if err := conn.Publish(target, data); err != nil {
			log.Errorf("Prediction results publish failed: %s", err)
			// Do not claim success for a message that was never published.
			return
		}
		log.Infof("Message published: %s", data)
	}

	subscription, err := conn.Subscribe(source, func(msg *stan.Msg) {
		processJSONAndPushBack(cb, string(msg.Data), pr)
		// The Ack result is intentionally discarded (best effort).
		// NOTE(review): the subscription is created without any options;
		// confirm against the stan client documentation whether an explicit
		// Ack is meaningful without manual-ack mode.
		_ = msg.Ack()
	})
	if err != nil {
		log.Fatalf("Can not subscribe to channel [%s]: %s", source, err)
	}

	util.WaitInterruption(func() {
		// Close error is ignored: the process is shutting down at this point.
		_ = subscription.Close()
	})
}
// processJSONAndPushBack enriches a JSON document with predictions.
//
// It reads the "originText" field of json (using the Str value of the gjson
// result), feeds that text to analyser, writes the returned predictions under
// the "sentiment" key, and passes the updated document to cb as bytes.
// When the document cannot be updated the error is logged and cb is not
// invoked.
func processJSONAndPushBack(cb consumer, json string, analyser func(text string) []Prediction) {
	originText := gjson.Get(json, "originText").Str
	predictions := analyser(originText)

	updated, err := sjson.Set(json, "sentiment", predictions)
	if err != nil {
		log.Errorf("JSON Modification failed: %s", err)
		return
	}

	cb([]byte(updated))
}
// analyse runs text through every brain in brainsMap (brain ID -> brain name)
// and flattens the results into one Prediction per (brain, class) pair.
//
// A brain whose prediction fails is logged and skipped. The returned slice is
// never nil, so it is empty rather than nil when nothing could be predicted.
func analyse(runner *CommandsRunner, brainsMap map[int64]string, text string) []Prediction {
	result := []Prediction{}
	for id, name := range brainsMap {
		outcome, err := runner.app.HumanizedPredict(id, text)
		if err != nil {
			log.Errorf("Prediction failed. %s", err)
			continue
		}
		for class, probability := range outcome.Probabilities {
			entry := Prediction{Brain: name, Class: class, Probability: probability}
			result = append(result, entry)
		}
	}
	return result
}
// getBrainsMap resolves each reference through the application and returns a
// map from brain ID to brain name. References that fail to resolve are logged
// and left out, so the result may be smaller than references (or empty).
func getBrainsMap(runner *CommandsRunner, references []string) map[int64]string {
	resolved := map[int64]string{}
	for _, ref := range references {
		brain, err := runner.app.GetBrainByReference(ref)
		if err != nil {
			log.Error(err)
			continue
		}
		resolved[brain.GetID()] = brain.GetName()
	}
	return resolved
}
// Prediction is one (brain, class, probability) entry produced by analyse.
// A slice of these is stored under the "sentiment" key of the outgoing JSON
// document, so field order and tags define the serialised form.
type Prediction struct {
// Brain is the name of the brain that produced this entry.
Brain string `json:"brain"`
// Class is the class label taken from the prediction's probabilities.
Class string `json:"class"`
// Probability is the value reported for Class.
Probability float64 `json:"probability"`
}