This repository has been archived by the owner on Aug 30, 2022. It is now read-only.
/
pipeline.go
123 lines (104 loc) · 2.85 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
/*
*
* In The Name of God
*
* +===============================================
* | Author: Parham Alvani <parham.alvani@gmail.com>
* |
* | Creation Date: 02-08-2018
* |
* | File Name: pipeline.go
* +===============================================
*/
package core
import (
"context"
"fmt"
"runtime"
"github.com/sirupsen/logrus"
)
// project is the pipeline stage that attaches project and model information
// to incoming data.
//
// For every item received from a.projectStream it asks a.TMService for the
// thing identified by d.ThingID and, on success, copies that thing's Project
// and Model onto the item. Items that end up with a non-empty Project and the
// "generic" model are additionally published on "/i1820/<project>/raw" through
// a.ns. Every item, whether the lookup succeeded or not, is then forwarded to
// a.decodeStream.
//
// The loop ends once a.projectStream is closed and drained; a.projectWG is
// marked done when the function returns.
func (a *Application) project() {
	defer a.projectWG.Done()

	// this thread is mine
	runtime.LockOSThread()

	logger := logrus.WithFields(logrus.Fields{
		"component": "link",
	})
	logger.Info("project pipeline stage has started")

	for d := range a.projectStream {
		// look the thing up; a failed lookup is only logged, the item keeps
		// whatever Project/Model it already carried and still moves on.
		t, err := a.TMService.Show(d.ThingID)
		if err != nil {
			logger.Errorf("tm show: %s", err)
		} else {
			d.Project = t.Project
			d.Model = t.Model
		}

		if d.Project != "" && d.Model == "generic" {
			// publish raw data to decode it in runner
			if err := a.ns.Publish(fmt.Sprintf("/i1820/%s/raw", d.Project), d); err != nil {
				logger.Errorf("nats produce: %s", err)
			} else {
				// report success only when the publish actually went through
				logger.Infof("publish raw data: %s", d.Project)
			}
		}

		a.decodeStream <- d
	}
}
// decode is the pipeline stage that turns raw payloads into parsed data.
//
// For every item received from a.decodeStream that has a non-empty Project, no
// Data yet, and a Model other than "generic", it looks the model up in
// a.models. When the model is known, d.Data is set to the result of decoding
// d.Raw and the item is published on "/i1820/<project>/parsed" through a.ns;
// when it is unknown, an error is logged and the item is left untouched.
// Every item is then forwarded to a.insertStream.
//
// The loop ends once a.decodeStream is closed and drained; a.decodeWG is
// marked done when the function returns.
func (a *Application) decode() {
	defer a.decodeWG.Done()

	// this thread is mine
	runtime.LockOSThread()

	logger := logrus.WithFields(logrus.Fields{
		"component": "link",
	})
	logger.Info("decode pipeline stage has started")

	for d := range a.decodeStream {
		// run decode when data is coming from thing with project and it needs decode.
		if d.Project != "" && d.Data == nil && d.Model != "generic" {
			if m, ok := a.models[d.Model]; !ok {
				// data will be parsed in project docker and pushed into mqtt parsed channel
				logger.Errorf("model %s not found (setting the model will improves performance)", d.Model)
			} else {
				d.Data = m.Decode(d.Raw)

				// publish parsed data
				if err := a.ns.Publish(fmt.Sprintf("/i1820/%s/parsed", d.Project), d); err != nil {
					logger.Errorf("nats produce: %s", err)
				} else {
					// report success only when the publish actually went through
					logger.Infof("publish parsed data: %s", d.Project)
				}
			}
		}

		a.insertStream <- d
	}
}
// insert is the pipeline stage that persists data.
//
// Each item taken from a.insertStream is handed to a.Store.Insert with a
// background context, and the outcome (success or error) is logged. Once
// a.insertStream is closed and drained, a.insertWG is marked done.
func (a *Application) insert() {
	// this thread is mine
	runtime.LockOSThread()

	entry := logrus.WithFields(logrus.Fields{"component": "link"})
	entry.Info("insert pipeline stage has started")

	for item := range a.insertStream {
		err := a.Store.Insert(context.Background(), item)
		if err == nil {
			entry.Infof("insert into mongodb: %#v", item)
			continue
		}

		entry.Errorf("insert into mongodb error: %s", err)
	}

	a.insertWG.Done()
}