forked from linksmart/historical-datastore
/
registry.go
129 lines (116 loc) · 3.95 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright 2016 Fraunhofer Institute for Applied Information Technology FIT
// Package registry implements Registry API
package registry
import (
"code.linksmart.eu/hds/historical-datastore/common"
"encoding/json"
"fmt"
"hash/crc32"
"net/url"
"sort"
"strings"
)
// Registry describes a registry of registered Data Sources.
// It carries a list of entries together with the pagination metadata
// (Page, PerPage, Total) for that list.
type Registry struct {
// URL is the URL of the Registry API
URL string `json:"url"`
// Entries is an array of Data Sources
Entries []DataSource `json:"entries"`
// Page is the current page in Entries pagination
Page int `json:"page"`
// PerPage is the number of results per page in Entries pagination
PerPage int `json:"per_page"`
// Total is the total number of pages in Entries pagination
// NOTE(review): the name suggests this may be the total number of entries
// rather than pages; confirm against the code that populates it.
Total int `json:"total"`
}
// DataSource describes a single data source such as a sensor (LinkSmart Resource).
// Its JSON form is produced by MarshalJSON, which masks the MQTT connector's
// credentials and key paths unless keepSensitiveInfo is set.
type DataSource struct {
// keepSensitiveInfo, when true, makes MarshalJSON skip the masking of
// sensitive connector fields. It is unexported, so encoding/json never
// serializes or deserializes it.
keepSensitiveInfo bool
// ID is a unique ID of the data source
ID string `json:"id"`
// URL is the URL of the Data Source in the Registry API
URL string `json:"url"`
// Data is the URL to the data of this Data Source in the Data API
Data string `json:"data"`
// Resource is the resource URI (i.e., name in SenML)
Resource string `json:"resource"`
// Meta is a hash-map with optional meta-information
Meta map[string]interface{} `json:"meta"`
// Connector holds the configuration of additional data connectors
Connector Connector `json:"connector"`
// Retention is the retention duration for data
Retention string `json:"retention"`
// Aggregation is an array of configured aggregations
Aggregation []Aggregation `json:"aggregation"`
// Type is the values type used in payload
Type string `json:"type"`
}
// MarshalJSON masks sensitive information when using the default marshaller.
//
// Unless keepSensitiveInfo is set, every non-empty MQTT credential and
// key/certificate path is replaced with "*****" in the output. Empty values
// are left empty so that their omitempty tags still drop them.
//
// Masking is applied to a private copy of the MQTT configuration. ds is a
// value receiver, but Connector.MQTT is a pointer that is still shared with
// the caller's DataSource; writing through it would permanently overwrite the
// caller's real credentials with asterisks.
//
// It returns the JSON encoding of the data source, or the error reported by
// json.Marshal.
func (ds DataSource) MarshalJSON() ([]byte, error) {
	if !ds.keepSensitiveInfo && ds.Connector.MQTT != nil {
		// Work on a copy and re-point the (already copied) receiver at it.
		masked := *ds.Connector.MQTT
		for _, field := range []*string{
			&masked.Username,
			&masked.Password,
			&masked.CaFile,
			&masked.CertFile,
			&masked.KeyFile,
		} {
			if *field != "" {
				*field = "*****"
			}
		}
		ds.Connector.MQTT = &masked
	}

	// Alias has DataSource's fields but none of its methods, which keeps
	// json.Marshal from recursing back into this MarshalJSON.
	type Alias DataSource
	return json.Marshal((*Alias)(&ds))
}
// MarshalSensitiveJSON serializes the datasource including the sensitive
// information. It sets keepSensitiveInfo on its own copy of the data source
// and hands that copy to json.Marshal, so the caller's value keeps its
// default (masking) behaviour.
func (ds DataSource) MarshalSensitiveJSON() ([]byte, error) {
	unmasked := ds
	unmasked.keepSensitiveInfo = true
	return json.Marshal(&unmasked)
}
// Connector describes additional connectors to the Data API
type Connector struct {
// MQTT is the MQTT connector configuration; a nil pointer is omitted from
// the JSON output
MQTT *MQTTConf `json:"mqtt,omitempty"`
}
// MQTTConf describes an MQTT Connector.
// Username, Password, CaFile, CertFile and KeyFile are treated as sensitive:
// DataSource.MarshalJSON replaces their non-empty values with "*****" unless
// masking is disabled.
type MQTTConf struct {
// URL is presumably the address of the MQTT broker; confirm against the
// connector code
URL string `json:"url"`
// Topic is presumably the MQTT topic used for the Data Source; confirm
Topic string `json:"topic"`
// QoS is presumably the MQTT quality-of-service level; confirm
QoS byte `json:"qos"`
// Username is a credential; omitted from JSON when empty
Username string `json:"username,omitempty"`
// Password is a credential; omitted from JSON when empty
Password string `json:"password,omitempty"`
// CaFile, CertFile and KeyFile are the key paths masked by
// DataSource.MarshalJSON; each is omitted from JSON when empty
CaFile string `json:"caFile,omitempty"`
CertFile string `json:"certFile,omitempty"`
KeyFile string `json:"keyFile,omitempty"`
}
// ParsedResource returns the Resource URI parsed with url.Parse.
// The parse error is deliberately not reported: when Resource is not a valid
// URL the result is nil, so callers must check for nil before use.
func (ds *DataSource) ParsedResource() *url.URL {
	resourceURL, err := url.Parse(ds.Resource)
	if err != nil {
		return nil
	}
	return resourceURL
}
// Aggregation describes a data aggregation for a Data Source.
// ID and Data are derived fields that are filled in by Make.
type Aggregation struct {
// ID identifies the aggregation; Make sets it to the hex-encoded CRC-32
// checksum of Interval and the sorted Aggregates
ID string `json:"id"`
// Interval is the aggregation interval
Interval string `json:"interval"`
// Data is the URL to the data in the Aggregate API; set by Make
Data string `json:"data"`
// Aggregates is an array of aggregates calculated on each interval
// Valid values: mean, stddev, sum, min, max, median
Aggregates []string `json:"aggregates"`
// Retention is the retention duration
Retention string `json:"retention"`
}
// Make generates the ID and Data attributes for the aggregation.
//
// Aggregates is sorted in place first, so the resulting ID does not depend on
// the order in which the aggregates were listed. ID is the hex-encoded IEEE
// CRC-32 checksum of Interval followed by the sorted aggregates concatenated
// without a separator. Data is "<common.AggrAPILoc>/<ID>/<dsID>", where dsID
// is the ID of the owning data source.
func (a *Aggregation) Make(dsID string) {
	sort.Strings(a.Aggregates)

	key := a.Interval + strings.Join(a.Aggregates, "")
	checksum := crc32.ChecksumIEEE([]byte(key))

	a.ID = fmt.Sprintf("%x", checksum)
	a.Data = fmt.Sprintf("%s/%s/%s", common.AggrAPILoc, a.ID, dsID)
}