-
Notifications
You must be signed in to change notification settings - Fork 19
/
thing.go
executable file
·204 lines (169 loc) · 6.16 KB
/
thing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package api
import (
"errors"
"fmt"
entities "github.com/gost/core"
gostErrors "github.com/gost/server/errors"
"github.com/gost/server/sensorthings/odata"
)
// GetThing returns a thing entity based on the given id and QueryOptions
// returns an error when the entity cannot be found
func (a *APIv1) GetThing(id interface{}, qo *odata.QueryOptions, path string) (*entities.Thing, error) {
t, err := a.db.GetThing(id, qo)
if err != nil {
return nil, err
}
a.SetLinks(t, qo)
return t, nil
}
// GetThingByDatastream returns a thing entity based on the given datastream id and QueryOptions
func (a *APIv1) GetThingByDatastream(id interface{}, qo *odata.QueryOptions, path string) (*entities.Thing, error) {
t, err := a.db.GetThingByDatastream(id, qo)
if err != nil {
return nil, err
}
a.SetLinks(t, qo)
return t, nil
}
// GetThingsByLocation returns things based on the given location id and QueryOptions
func (a *APIv1) GetThingsByLocation(id interface{}, qo *odata.QueryOptions, path string) (*entities.ArrayResponse, error) {
things, count, hasNext, err := a.db.GetThingsByLocation(id, qo)
return processThings(a, things, qo, path, count, hasNext, err)
}
// GetThingByHistoricalLocation returns a thing entity based on the given HistoricalLocation id and QueryOptions
func (a *APIv1) GetThingByHistoricalLocation(id interface{}, qo *odata.QueryOptions, path string) (*entities.Thing, error) {
t, err := a.db.GetThingByHistoricalLocation(id, qo)
if err != nil {
return nil, err
}
a.SetLinks(t, qo)
return t, nil
}
// GetThings returns an array of thing entities based on the QueryOptions
func (a *APIv1) GetThings(qo *odata.QueryOptions, path string) (*entities.ArrayResponse, error) {
things, count, hasNext, err := a.db.GetThings(qo)
return processThings(a, things, qo, path, count, hasNext, err)
}
func processThings(a *APIv1, things []*entities.Thing, qo *odata.QueryOptions, path string, count int, hasNext bool, err error) (*entities.ArrayResponse, error) {
if err != nil {
return nil, err
}
for idx, item := range things {
i := *item
a.SetLinks(&i, qo)
things[idx] = &i
}
var data interface{} = things
return a.createArrayResponse(count, hasNext, path, qo, data), nil
}
// PostThing checks if a posted thing entity is valid and adds it to the database
// a posted thing can also contain Locations and DataStreams
func (a *APIv1) PostThing(thing *entities.Thing) (*entities.Thing, []error) {
var err []error
var err2 error
_, err = containsMandatoryParams(thing)
if len(err) > 0 {
return nil, err
}
nt, err2 := a.db.PostThing(thing)
if err2 != nil {
return nil, []error{err2}
}
var postedLocations []*entities.Location
var postedDatastreams []*entities.Datastream
// Handle deep insert locations
if thing.Locations != nil {
for _, l := range thing.Locations {
// New location posted
if l.ID == nil { //Id is null so a new location is posted
var nl *entities.Location
if nl, err = a.PostLocationByThing(nt.ID, l); len(err) > 0 {
a.reverseInserts(nt, postedLocations, postedDatastreams)
err = append(err, gostErrors.NewConflictRequestError(errors.New("Location deep insert went wrong")))
return nil, err
}
postedLocations = append(postedLocations, nl)
} else { // posted id: link
if err2 = a.LinkLocation(nt.ID, l.ID); len(err) > 0 {
a.reverseInserts(nt, postedLocations, postedDatastreams)
err = append(err, gostErrors.NewConflictRequestError(errors.New("Location linking went wrong")))
err = append(err, err2)
return nil, err
}
hl := &entities.HistoricalLocation{
Thing: nt,
Locations: []*entities.Location{l},
}
hl.ContainsMandatoryParams()
if _, err = a.PostHistoricalLocation(hl); len(err) > 0 {
a.reverseInserts(nt, postedLocations, postedDatastreams)
err = append(err, gostErrors.NewConflictRequestError(errors.New("Creating Historical Location went wrong")))
return nil, err
}
}
}
}
// Handle deep insert datastreams
if thing.Datastreams != nil {
for _, d := range thing.Datastreams {
// New location posted
if d.ID == nil { //Id is null so a new datastream is posted
var nd *entities.Datastream
if nd, err = a.PostDatastreamByThing(nt.ID, d); err != nil {
a.reverseInserts(nt, postedLocations, postedDatastreams)
err = append(err, gostErrors.NewConflictRequestError(errors.New("Creating Datastrean went wrong")))
return nil, err
}
postedDatastreams = append(postedDatastreams, nd)
} else {
a.reverseInserts(nt, postedLocations, postedDatastreams)
err = append(err, gostErrors.NewConflictRequestError(errors.New("ID found for deep inserted datastream, linking to an existing Datastream is not allowed")))
return nil, err
}
}
}
nt.SetAllLinks(a.config.GetExternalServerURI())
//push to mqtt
a.sendOverMQTT(nt, "Things")
return nt, nil
}
func (a *APIv1) reverseInserts(thing *entities.Thing, locations []*entities.Location, datastreams []*entities.Datastream) {
for _, datastream := range datastreams {
a.DeleteDatastream(datastream.ID)
}
for _, location := range locations {
a.DeleteLocation(location.ID)
}
a.DeleteThing(thing.ID)
}
// DeleteThing deletes a given Thing from the database
func (a *APIv1) DeleteThing(id interface{}) error {
return a.db.DeleteThing(id)
}
// PatchThing updates the given thing in the database
func (a *APIv1) PatchThing(id interface{}, thing *entities.Thing) (*entities.Thing, error) {
if thing.Datastreams != nil || thing.HistoricalLocations != nil || isDeepPatchLocations(thing.Locations) {
return nil, gostErrors.NewBadRequestError(errors.New("Unable to deep patch Thing"))
}
return a.db.PatchThing(id, thing)
}
// PutThing updates the given thing in the database
func (a *APIv1) PutThing(id interface{}, thing *entities.Thing) (*entities.Thing, []error) {
var err error
putthing, err := a.db.PutThing(id, thing)
if err != nil {
return nil, []error{err}
}
putthing.SetAllLinks(a.config.GetExternalServerURI())
return putthing, nil
}
func isDeepPatchLocations(locations []*entities.Location) bool {
if locations != nil {
for _, l := range locations {
if l.ID == nil || len(fmt.Sprintf("%v", l.ID)) == 0 {
return true
}
}
}
return false
}