-
Notifications
You must be signed in to change notification settings - Fork 0
/
store.go
180 lines (162 loc) · 5.02 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
// Copyright © 2017 The Things Network
// Use of this source code is governed by the MIT license that can be found in the LICENSE file.
package device
import (
"fmt"
"sort"
"time"
"github.com/TheThingsNetwork/ttn/core/handler/device/migrate"
"github.com/TheThingsNetwork/ttn/core/storage"
"github.com/TheThingsNetwork/ttn/utils/errors"
"gopkg.in/redis.v5"
)
// Store is the interface for storing and retrieving Devices.
// It is implemented by *RedisDeviceStore.
type Store interface {
	// Count returns the number of devices in the store.
	Count() (int, error)
	// CountForApp returns the number of devices of the Application appID.
	CountForApp(appID string) (int, error)
	// List returns all devices. opts is handed to the underlying storage
	// layer (presumably paging options; see storage.ListOptions).
	List(opts *storage.ListOptions) ([]*Device, error)
	// ListForApp returns the devices of the Application appID.
	ListForApp(appID string, opts *storage.ListOptions) ([]*Device, error)
	// Get returns the device devID of the Application appID.
	Get(appID, devID string) (*Device, error)
	// DownlinkQueue returns the downlink queue of the given device.
	DownlinkQueue(appID, devID string) (DownlinkQueue, error)
	// Set creates the device or updates an existing one.
	// NOTE(review): properties presumably restricts which fields are
	// written; confirm against the storage package.
	Set(new *Device, properties ...string) (err error)
	// Delete removes the device devID of the Application appID.
	Delete(appID, devID string) error
	// AddBuiltinAttribute registers additional builtin attribute keys.
	AddBuiltinAttribute(attr ...string)
}
// Redis key prefixes used by the device store.
const (
	// defaultRedisPrefix is used by NewRedisDeviceStore when the caller
	// passes an empty prefix.
	defaultRedisPrefix = "handler"
	// redisDevicePrefix is appended to the store prefix to form the
	// namespace of the device map store.
	redisDevicePrefix = "device"
	// redisDownlinkQueuePrefix is appended to the store prefix to form the
	// namespace of the downlink queue store.
	redisDownlinkQueuePrefix = "downlink"
)
// defaultDeviceAttributes are the attribute keys that NewRedisDeviceStore
// registers as builtin on every store it creates. RedisDeviceStore.Set skips
// builtin keys when it enforces the custom attribute limits.
var defaultDeviceAttributes = []string{
	"ttn-brand",
	"ttn-model",
	"ttn-version",
	"ttn-antenna",
	"ttn-module-type",
}
// Limits that RedisDeviceStore.Set enforces on attributes that are not
// builtin.
const (
	// maxDeviceAttributeKeyLength is the maximum len() of an attribute key.
	maxDeviceAttributeKeyLength = 64
	// maxDeviceAttributeValueLength is the maximum len() of an attribute value.
	maxDeviceAttributeValueLength = 64
	// maxDeviceAttributes is the maximum number of non-builtin attributes
	// a device may carry.
	maxDeviceAttributes = 5
)
// NewRedisDeviceStore creates a new Redis-based Device store.
//
// An empty prefix is replaced by defaultRedisPrefix. Devices are kept in a map
// store under "<prefix>:device" and downlink queues in a queue store under
// "<prefix>:downlink". The migrations returned by migrate.DeviceMigrations are
// registered on the map store, and defaultDeviceAttributes are registered as
// builtin attributes.
func NewRedisDeviceStore(client *redis.Client, prefix string) *RedisDeviceStore {
	if prefix == "" {
		prefix = defaultRedisPrefix
	}

	devices := storage.NewRedisMapStore(client, prefix+":"+redisDevicePrefix)
	devices.SetBase(Device{}, "")
	for version, migration := range migrate.DeviceMigrations(prefix) {
		devices.AddMigration(version, migration)
	}

	deviceStore := &RedisDeviceStore{
		store:  devices,
		queues: storage.NewRedisQueueStore(client, prefix+":"+redisDownlinkQueuePrefix),
	}
	deviceStore.AddBuiltinAttribute(defaultDeviceAttributes...)
	return deviceStore
}
// RedisDeviceStore stores Devices in Redis.
// - Devices are stored as a Hash
type RedisDeviceStore struct {
	// store holds the devices, keyed by "<appID>:<devID>".
	store *storage.RedisMapStore
	// queues holds the downlink queues of the devices.
	queues *storage.RedisQueueStore
	// builtinAttibutes (sic) lists the attribute keys that Set exempts from
	// the custom attribute limits. AddBuiltinAttribute keeps it sorted so
	// that Set can look keys up with sort.SearchStrings.
	builtinAttibutes []string // sorted
}
// Count returns the number of devices in the store.
func (s *RedisDeviceStore) Count() (int, error) {
	// No selector here; CountForApp passes "<appID>:*" to narrow the count.
	const everyDevice = ""
	return s.store.Count(everyDevice)
}
// CountForApp returns the number of devices that belong to the Application
// appID, by counting the store entries that match "<appID>:*".
func (s *RedisDeviceStore) CountForApp(appID string) (int, error) {
	selector := appID + ":*"
	return s.store.Count(selector)
}
// List returns all Devices in the store; opts is passed on to the underlying
// map store.
//
// The result has one slot per entry returned by the store. An entry that is
// not a Device leaves its slot nil.
func (s *RedisDeviceStore) List(opts *storage.ListOptions) ([]*Device, error) {
	entries, err := s.store.List("", opts)
	if err != nil {
		return nil, err
	}

	result := make([]*Device, len(entries))
	for idx, entry := range entries {
		dev, isDevice := entry.(Device)
		if !isDevice {
			continue
		}
		result[idx] = &dev
	}
	return result, nil
}
// ListForApp returns the Devices of the Application appID, i.e. the store
// entries matching "<appID>:*"; opts is passed on to the underlying map store.
//
// The result has one slot per entry returned by the store. An entry that is
// not a Device leaves its slot nil.
func (s *RedisDeviceStore) ListForApp(appID string, opts *storage.ListOptions) ([]*Device, error) {
	entries, err := s.store.List(appID+":*", opts)
	if err != nil {
		return nil, err
	}

	result := make([]*Device, len(entries))
	for idx, entry := range entries {
		dev, isDevice := entry.(Device)
		if !isDevice {
			continue
		}
		result[idx] = &dev
	}
	return result, nil
}
// Get returns the Device stored under "<appID>:<devID>".
//
// Errors from the underlying store are returned unchanged. If the store
// returns something that is not a Device, an error is returned instead.
func (s *RedisDeviceStore) Get(appID, devID string) (*Device, error) {
	entry, err := s.store.Get(appID + ":" + devID)
	if err != nil {
		return nil, err
	}

	dev, isDevice := entry.(Device)
	if !isDevice {
		return nil, errors.New("Database did not return a Device")
	}
	return &dev, nil
}
// DownlinkQueue returns the downlink queue of the device devID of the
// Application appID.
//
// The queue is only a handle onto the shared queue store: the device store is
// not consulted, and the returned error is always nil.
func (s *RedisDeviceStore) DownlinkQueue(appID, devID string) (DownlinkQueue, error) {
	queue := &RedisDownlinkQueue{
		appID:  appID,
		devID:  devID,
		queues: s.queues,
	}
	return queue, nil
}
// Set stores a new Device or updates an existing one under the key
// "<AppID>:<DevID>".
//
// Attributes whose key is not a builtin attribute are validated first: key and
// value may not exceed maxDeviceAttributeKeyLength and
// maxDeviceAttributeValueLength, and at most maxDeviceAttributes of them are
// allowed. If validation fails an error is returned and the device is left
// untouched.
//
// Once validation has passed, UpdatedAt is set to the current time, and so is
// CreatedAt when the device has no `old` state (presumably meaning it was not
// loaded from the store; confirm against Device). The device and properties
// are then handed to the underlying map store, whose error is returned as-is.
func (s *RedisDeviceStore) Set(new *Device, properties ...string) (err error) {
	// Validate before mutating the device, so that a rejected device does not
	// end up with fresh timestamps although nothing was written.
	customAttributeSlots := maxDeviceAttributes
	for k, v := range new.Attributes {
		// builtinAttibutes is sorted, so a binary search finds builtin keys;
		// those are exempt from the limits below.
		if idx := sort.SearchStrings(s.builtinAttibutes, k); idx < len(s.builtinAttibutes) && s.builtinAttibutes[idx] == k {
			continue
		}
		if len(k) > maxDeviceAttributeKeyLength {
			return fmt.Errorf(`Attribute key "%s" exceeds maximum length (%d)`, k, maxDeviceAttributeKeyLength)
		}
		if len(v) > maxDeviceAttributeValueLength {
			return fmt.Errorf(`Attribute value for key "%s" exceeds maximum length (%d)`, k, maxDeviceAttributeValueLength)
		}
		if customAttributeSlots < 1 {
			return fmt.Errorf(`Maximum number of custom attributes (%d) exceeded`, maxDeviceAttributes)
		}
		customAttributeSlots--
	}

	now := time.Now()
	new.UpdatedAt = now
	if new.old == nil {
		new.CreatedAt = now
	}

	key := fmt.Sprintf("%s:%s", new.AppID, new.DevID)
	return s.store.Set(key, *new, properties...)
}
// Delete removes the device devID of the Application appID.
//
// The device's downlink queue is deleted first. If that fails, the error is
// returned and the device itself is left in the store.
func (s *RedisDeviceStore) Delete(appID, devID string) error {
	id := appID + ":" + devID

	err := s.queues.Delete(id)
	if err != nil {
		return err
	}
	return s.store.Delete(id)
}
// AddBuiltinAttribute adds the given keys to the list of builtin device
// attributes. The list is re-sorted afterwards because Set searches it with
// sort.SearchStrings. Keys that are already present are not deduplicated.
func (s *RedisDeviceStore) AddBuiltinAttribute(attr ...string) {
	merged := append(s.builtinAttibutes, attr...)
	sort.Strings(merged)
	s.builtinAttibutes = merged
}