This repository has been archived by the owner on Feb 16, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
shard.go
220 lines (190 loc) · 4.37 KB
/
shard.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
package dilithium
import (
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"github.com/titanous/guid"
)
type Shard interface {
// Get the parent of the shard. nil if the Shard is the root.
Parent() Shard
Children() []Shard
SetParent(s Shard)
AddChild(s Shard)
// RemoveChild removes the child shard with ID() == id.
// The receiver is responsible for calling Destroy() on the child shard.
RemoveChild(id string)
Query(q *Query) error
// ID returns the shard ID. It must uniquely identify the shard.
ID() string
// Setup is called to setup the shard with the specified config.
Setup(config map[string]interface{}) error
// Config returns a JSON-serializable config that could be passed to Setup to recreate this shard.
Config() map[string]interface{}
// Destroy is called when the shard is removed from the tree. The shard must call Destroy on child shards.
Destroy()
// The shard must be locked before unmarshalling the configuration into it.
sync.Locker
}
type ReplicateShard struct {
parent Shard
children []Shard
id string
sync.RWMutex
}
func (r *ReplicateShard) Parent() Shard {
r.RLock()
p := r.parent
r.RUnlock()
return p
}
func (r *ReplicateShard) Children() []Shard {
r.RLock()
c := r.children
r.RUnlock()
return c
}
func (r *ReplicateShard) SetParent(s Shard) {
r.Lock()
r.parent = s
r.Unlock()
}
func (r *ReplicateShard) AddChild(s Shard) {
r.Lock()
r.children = append(r.children, s)
r.Unlock()
}
func (r *ReplicateShard) RemoveChild(id string) {
r.Lock()
for i, s := range r.children {
if s.ID() == id {
s.Destroy()
// remove the element by setting it to the last element and truncating
r.children[i] = r.children[len(r.children)-1]
r.children[len(r.children)-1] = nil
r.children = r.children[:len(r.children)-1]
break
}
}
r.Unlock()
}
func (r *ReplicateShard) ID() string {
r.RLock()
id := r.id
r.RUnlock()
return id
}
func (r *ReplicateShard) Setup(config map[string]interface{}) error {
r.Lock()
id, _ := guid.NextId()
r.id = strconv.FormatInt(id, 10)
r.Unlock()
return nil
}
func (r *ReplicateShard) Config() map[string]interface{} {
return nil
}
func (r *ReplicateShard) Destroy() {
r.Lock()
for _, s := range r.children {
s.Destroy()
}
}
func (r *ReplicateShard) Query(q *Query) (err error) {
r.RLock()
if q.ReadOnly() {
err = r.children[rand.Intn(len(r.children))].Query(q)
} else {
for _, s := range r.children {
s.Query(q)
}
}
r.RUnlock()
return
}
type PhysicalShard struct {
parent Shard
sync.RWMutex
config map[string]interface{}
pool *Pool
}
func (p *PhysicalShard) Parent() Shard {
p.RLock()
pa := p.parent
p.RUnlock()
return pa
}
func (p *PhysicalShard) SetParent(s Shard) {
p.Lock()
p.parent = s
p.Unlock()
}
func (p *PhysicalShard) Children() []Shard {
return nil
}
func (p *PhysicalShard) AddChild(s Shard) {
// no-op
}
func (p *PhysicalShard) RemoveChild(id string) {
// no-op
}
func (p *PhysicalShard) Setup(config map[string]interface{}) error {
p.Lock()
defer p.Unlock()
u, ok := config["url"]
if !ok {
return errors.New("dilithium: Missing 'url' in PhysicalShard config")
}
url, ok := u.(string)
if !ok {
return errors.New("dilithium: Unexpected type for PhysicalShard 'url' config, expecting string")
}
pn, ok := config["pool"]
if !ok {
return errors.New("dilithium: Missing 'pool' in PhysicalShard config")
}
poolName, ok := pn.(string)
if !ok {
return errors.New("dilithium: Unexpected type for PhysicalShard 'pool' config, expecting string")
}
poolTypesMtx.RLock()
defer poolTypesMtx.RUnlock()
poolType, ok := poolTypes[poolName]
if !ok {
return fmt.Errorf("dilithium: Unknown PhysicalShard pool type '%s'", poolName)
}
p.pool = &Pool{url: url, Dial: poolType.Dial, TestOnBorrow: poolType.TestOnBorrow, MaxIdle: poolType.MaxIdle, IdleTimeout: poolType.IdleTimeout}
p.config = config
return nil
}
func (p *PhysicalShard) Config() map[string]interface{} {
p.RLock()
defer p.RUnlock()
return p.config
}
func (p *PhysicalShard) Destroy() {
p.Lock()
p.pool.Close()
}
func (p *PhysicalShard) ID() string {
p.RLock()
defer p.RUnlock()
return p.pool.url
}
func (p *PhysicalShard) Query(q *Query) error {
p.RLock()
defer p.RUnlock()
// TODO: error handling
conn, err := p.pool.Get()
defer conn.Close()
if err != nil {
return err
}
return q.Run(conn.c)
}
func init() {
RegisterShardType(&ReplicateShard{})
RegisterShardType(&PhysicalShard{})
}