-
Notifications
You must be signed in to change notification settings - Fork 13
/
transformer.go
83 lines (70 loc) · 1.9 KB
/
transformer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package senml
import (
"github.com/MainfluxLabs/mainflux/pkg/errors"
"github.com/MainfluxLabs/mainflux/pkg/messaging"
"github.com/MainfluxLabs/mainflux/pkg/transformers"
"github.com/MainfluxLabs/senml"
)
const (
// JSON represents SenML in JSON format content type.
JSON = "application/senml+json"
// CBOR represents SenML in CBOR format content type.
CBOR = "application/senml+cbor"
)
var (
errDecode = errors.New("failed to decode senml")
errNormalize = errors.New("failed to normalize senml")
)
var formats = map[string]senml.Format{
JSON: senml.JSON,
CBOR: senml.CBOR,
}
type transformer struct {
format senml.Format
}
// New returns transformer service implementation for SenML messages.
func New() transformers.Transformer {
return transformer{}
}
func (t transformer) Transform(msg messaging.Message) (interface{}, error) {
contentFormat := msg.Profile.ContentType
format, ok := formats[contentFormat]
if !ok {
format = formats[JSON]
}
raw, err := senml.Decode(msg.Payload, format)
if err != nil {
return nil, errors.Wrap(errDecode, err)
}
normalized, err := senml.Normalize(raw)
if err != nil {
return nil, errors.Wrap(errNormalize, err)
}
msgs := make([]Message, len(normalized.Records))
for i, v := range normalized.Records {
// Use reception timestamp if SenML message Time is missing
t := v.Time
if t == 0 {
// Convert the Unix timestamp in nanoseconds to float64
t = float64(msg.Created) / float64(1e9)
}
msgs[i] = Message{
Channel: msg.Channel,
Subtopic: msg.Subtopic,
Publisher: msg.Publisher,
Protocol: msg.Protocol,
Name: v.Name,
Unit: v.Unit,
Time: t,
UpdateTime: v.UpdateTime,
Value: v.Value,
BoolValue: v.BoolValue,
DataValue: v.DataValue,
StringValue: v.StringValue,
Sum: v.Sum,
}
}
return msgs, nil
}