forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
commands.go
216 lines (179 loc) · 6.43 KB
/
commands.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package influxdb
import (
"time"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/messaging"
)
const (
// Data node messages
createDataNodeMessageType = messaging.MessageType(0x00)
deleteDataNodeMessageType = messaging.MessageType(0x01)
// Database messages
createDatabaseMessageType = messaging.MessageType(0x10)
dropDatabaseMessageType = messaging.MessageType(0x11)
// Retention policy messages
createRetentionPolicyMessageType = messaging.MessageType(0x20)
updateRetentionPolicyMessageType = messaging.MessageType(0x21)
deleteRetentionPolicyMessageType = messaging.MessageType(0x22)
setDefaultRetentionPolicyMessageType = messaging.MessageType(0x23)
// User messages
createUserMessageType = messaging.MessageType(0x30)
updateUserMessageType = messaging.MessageType(0x31)
deleteUserMessageType = messaging.MessageType(0x32)
// Shard messages
createShardGroupIfNotExistsMessageType = messaging.MessageType(0x40)
deleteShardGroupMessageType = messaging.MessageType(0x41)
// Series messages
dropSeriesMessageType = messaging.MessageType(0x50)
// Measurement messages
createMeasurementsIfNotExistsMessageType = messaging.MessageType(0x60)
dropMeasurementMessageType = messaging.MessageType(0x61)
// Continuous Query messages
createContinuousQueryMessageType = messaging.MessageType(0x70)
dropContinuousQueryMessageType = messaging.MessageType(0x71)
// Write series data messages (per-topic)
writeRawSeriesMessageType = messaging.MessageType(0x80)
// Privilege messages
setPrivilegeMessageType = messaging.MessageType(0x90)
)
type createDataNodeCommand struct {
URL string `json:"url"`
}
type deleteDataNodeCommand struct {
ID uint64 `json:"id"`
}
type createDatabaseCommand struct {
Name string `json:"name"`
}
type dropDatabaseCommand struct {
Name string `json:"name"`
}
type createShardGroupIfNotExistsCommand struct {
Database string `json:"database"`
Policy string `json:"policy"`
Timestamp time.Time `json:"timestamp"`
}
type deleteShardGroupCommand struct {
Database string `json:"database"`
Policy string `json:"policy"`
ID uint64 `json:"id"`
}
type createUserCommand struct {
Username string `json:"username"`
Password string `json:"password"`
Admin bool `json:"admin,omitempty"`
}
type updateUserCommand struct {
Username string `json:"username"`
Password string `json:"password,omitempty"`
}
type deleteUserCommand struct {
Username string `json:"username"`
}
type setPrivilegeCommand struct {
Privilege influxql.Privilege `json:"privilege"`
Username string `json:"username"`
Database string `json:"database"`
}
type createRetentionPolicyCommand struct {
Database string `json:"database"`
Name string `json:"name"`
Duration time.Duration `json:"duration"`
ShardGroupDuration time.Duration `json:"shardGroupDuration"`
ReplicaN uint32 `json:"replicaN"`
SplitN uint32 `json:"splitN"`
}
type updateRetentionPolicyCommand struct {
Database string `json:"database"`
Name string `json:"name"`
Policy *RetentionPolicyUpdate `json:"policy"`
}
type deleteRetentionPolicyCommand struct {
Database string `json:"database"`
Name string `json:"name"`
}
type setDefaultRetentionPolicyCommand struct {
Database string `json:"database"`
Name string `json:"name"`
}
type dropMeasurementCommand struct {
Database string `json:"database"`
Name string `json:"name"`
}
type createMeasurementSubcommand struct {
Name string `json:"name"`
Tags []map[string]string `json:"tags"`
Fields []*Field `json:"fields"`
marshaledTags map[string]struct{} // local cache...don't marshal
}
type createMeasurementsIfNotExistsCommand struct {
Database string `json:"database"`
Measurements []*createMeasurementSubcommand `json:"measurements"`
}
func newCreateMeasurementsIfNotExistsCommand(database string) *createMeasurementsIfNotExistsCommand {
return &createMeasurementsIfNotExistsCommand{
Database: database,
Measurements: make([]*createMeasurementSubcommand, 0),
}
}
// addMeasurementIfNotExists adds the Measurement to the command, but only if not already present
// in the command.
func (c *createMeasurementsIfNotExistsCommand) addMeasurementIfNotExists(name string) *createMeasurementSubcommand {
for _, m := range c.Measurements {
if m.Name == name {
return m
}
}
m := &createMeasurementSubcommand{
Name: name,
Tags: make([]map[string]string, 0),
marshaledTags: make(map[string]struct{}, 0),
Fields: make([]*Field, 0),
}
c.Measurements = append(c.Measurements, m)
return m
}
// addSeriesIfNotExists adds the Series, identified by Measurement name and tag set, to
// the command, but only if not already present in the command.
func (c *createMeasurementsIfNotExistsCommand) addSeriesIfNotExists(measurement string, tags map[string]string) {
m := c.addMeasurementIfNotExists(measurement)
tagset := string(marshalTags(tags))
if _, ok := m.marshaledTags[tagset]; ok {
return
}
// Tag-set needs to added to subcommand.
m.Tags = append(m.Tags, tags)
// Store marshaled tags in local cache for performance.
m.marshaledTags[tagset] = struct{}{}
return
}
// addFieldIfNotExists adds the field to the command for the Measurement, but only if it is not already
// present. It will return an error if the field is present in the command, but is of a different type.
func (c *createMeasurementsIfNotExistsCommand) addFieldIfNotExists(measurement, name string, typ influxql.DataType) error {
m := c.addMeasurementIfNotExists(measurement)
for _, f := range m.Fields {
if f.Name == name {
if f.Type != typ {
return ErrFieldTypeConflict
}
// Field already present in subcommand with same type, nothing to do.
return nil
}
}
// New field for this measurement so add it to the subcommand.
newField := &Field{Name: name, Type: typ}
m.Fields = append(m.Fields, newField)
return nil
}
type dropSeriesCommand struct {
Database string `json:"database"`
SeriesByMeasurement map[string][]uint32 `json:"seriesIds"`
}
// createContinuousQueryCommand is the raft command for creating a continuous query on a database
type createContinuousQueryCommand struct {
Query string `json:"query"`
}
type dropContinuousQueryCommand struct {
Name string `json:"name"`
Database string `json:"database"`
}