forked from project-iris/iris
-
Notifications
You must be signed in to change notification settings - Fork 0
/
routing.go
208 lines (189 loc) · 6.03 KB
/
routing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
// Iris - Decentralized cloud messaging
// Copyright (c) 2013 Project Iris. All rights reserved.
//
// Iris is dual licensed: you can redistribute it and/or modify it under the
// terms of the GNU General Public License as published by the Free Software
// Foundation, either version 3 of the License, or (at your option) any later
// version.
//
// The framework is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// Alternatively, the Iris framework may be used in accordance with the terms
// and conditions contained in a signed written agreement between you and the
// author(s).
// This file contains the routing logic in the overlay network, which currently
// is a simplified version of Pastry: the leafset and routing table is the same,
// but no proximity metric is taken into consideration.
//
// Besides the above, it also contains the system event processing logic.
package pastry
import (
"log"
"math/big"
"net"
"github.com/project-iris/iris/proto"
)
// Pastry routing algorithm: decides whether an incoming message terminates
// locally or must be relayed, and to which peer. The overlay read lock is
// taken here and released inside deliver/forward (whichever ends the step).
func (o *Overlay) route(src *peer, msg *proto.Message) {
	// Sync the routing table. Note, unlock is in deliver and forward!!!
	o.lock.RLock()

	// Pull out the state needed below for easier access
	table := o.routes
	target := msg.Head.Meta.(*header).Dest

	// Direct delivery if the target falls inside the local leaf set
	// TODO: corner cases with if only handful of nodes?
	// TODO: binary search with idSlice could be used (worthwhile?)
	low, high := table.leaves[0], table.leaves[len(table.leaves)-1]
	if delta(low, target).Sign() >= 0 && delta(target, high).Sign() >= 0 {
		// Select the leaf numerically closest to the target
		closest, closestDist := table.leaves[0], Distance(table.leaves[0], target)
		for i := 1; i < len(table.leaves); i++ {
			if d := Distance(table.leaves[i], target); d.Cmp(closestDist) < 0 {
				closest, closestDist = table.leaves[i], d
			}
		}
		// Terminate locally if we are the closest, otherwise hand over
		if o.nodeId.Cmp(closest) == 0 {
			o.deliver(src, msg)
		} else {
			o.forward(src, msg, closest)
		}
		return
	}
	// Indirect delivery through the prefix routing table
	pre, col := prefix(o.nodeId, target)
	if next := table.routes[pre][col]; next != nil {
		o.forward(src, msg, next)
		return
	}
	// Rare case: route to any known node strictly closer than the local one
	// whose shared prefix is no shorter than ours
	localDist := Distance(o.nodeId, target)
	for _, candidate := range table.leaves {
		if p, _ := prefix(candidate, target); p >= pre && Distance(candidate, target).Cmp(localDist) < 0 {
			o.forward(src, msg, candidate)
			return
		}
	}
	for _, row := range table.routes {
		for _, candidate := range row {
			if candidate == nil {
				continue
			}
			if p, _ := prefix(candidate, target); p >= pre && Distance(candidate, target).Cmp(localDist) < 0 {
				o.forward(src, msg, candidate)
				return
			}
		}
	}
	// Nobody better found. Deliver locally and hope for the best.
	o.deliver(src, msg)
}
// Delivers a message to the application layer, or processes it internally if
// it is an overlay system message. The caller's read lock (taken in route) is
// released here on every path.
func (o *Overlay) deliver(src *peer, msg *proto.Message) {
	head := msg.Head.Meta.(*header)
	if head.Op == opNop {
		// Application payload: release the lock, strip the overlay header and pass up
		o.lock.RUnlock()
		msg.Head.Meta = head.Meta
		o.app.Deliver(msg, head.Dest)
		return
	}
	// System message: handle it while still holding the read lock, then release
	o.process(src, head)
	o.lock.RUnlock()
}
// Forwards a message to the node with the given id, first processing it
// locally if it is a system message, or letting the application veto the
// forward if it is an upper layer message. The caller's read lock (taken in
// route) is released here on every path.
func (o *Overlay) forward(src *peer, msg *proto.Message, id *big.Int) {
	head := msg.Head.Meta.(*header)
	if head.Op == opNop {
		// Upper layer message: release the lock, strip the header and consult the app
		o.lock.RUnlock()
		msg.Head.Meta = head.Meta
		if !o.app.Forward(msg, head.Dest) {
			// Application vetoed the forward
			return
		}
		// Forwarding allowed: look up the live peer, repack the header and send
		o.lock.RLock()
		p, ok := o.livePeers[id.String()]
		o.lock.RUnlock()
		if ok {
			head.Meta = msg.Head.Meta
			msg.Head.Meta = head
			o.send(msg, p)
		}
		return
	}
	// Overlay system message: process under the lock, then relay if peer is live
	o.process(src, head)
	p, ok := o.livePeers[id.String()]
	o.lock.RUnlock()
	if ok {
		o.send(msg, p)
	}
}
// Processes overlay system messages: for joins it simply responds with the
// local state, whilst for state updates it verifies the timestamps and merges
// if newer, also always replying if a repair request was included. Finally the
// heartbeat messages are checked and two-way idle connections dropped.
//
// NOTE(review): process is invoked from deliver/forward while o.lock is held
// read-locked (taken in route); the opPassive, opExchage and opClose arms
// temporarily release and re-acquire that lock around drop/exch so those
// blocking calls cannot deadlock against writers.
func (o *Overlay) process(src *peer, head *header) {
	// Notify the heartbeat mechanism that source is alive
	o.heart.heart.Ping(src.nodeId)

	// Extract the remote id and state
	remId, remState := head.Dest.String(), head.State
	switch head.Op {
	case opJoin:
		// Discard self joins (rare race condition during update)
		if o.nodeId.Cmp(head.Dest) == 0 {
			return
		}
		// Node joining into currents responsibility list
		if p, ok := o.livePeers[remId]; !ok {
			// Connect new peers and let the handshake do the state exchange
			// (addresses that fail to resolve are logged and skipped)
			peerAddrs := make([]*net.TCPAddr, 0, len(remState.Addrs[remId]))
			for _, a := range remState.Addrs[remId] {
				if addr, err := net.ResolveTCPAddr("tcp", a); err != nil {
					log.Printf("pastry: failed to resolve address %v: %v.", a, err)
				} else {
					peerAddrs = append(peerAddrs, addr)
				}
			}
			o.authInit.Schedule(func() { o.dial(peerAddrs) })
		} else {
			// Handshake should have already sent state, unless local isn't joined either
			if o.stat != done {
				o.stateExch.Schedule(func() { o.sendState(p) })
			}
		}
	case opRepair:
		// Respond to any repair requests
		o.stateExch.Schedule(func() { o.sendState(src) })
	case opActive:
		// Ensure the peer is set to an active state
		src.passive = false
	case opPassive:
		// If remote connection reported passive after being already registered as
		// such locally too, drop the connection.
		if src.passive && !o.active(src.nodeId) {
			o.lock.RUnlock()
			o.drop(src)
			o.lock.RLock()
		}
	case opExchage:
		// State update, merge into local if new
		if remState.Version > src.time {
			src.time = remState.Version
			// Make sure we don't cause a deadlock if blocked
			o.lock.RUnlock()
			o.exch(src, remState)
			o.lock.RLock()
		}
	case opClose:
		// Remote side requested a graceful close
		o.lock.RUnlock()
		o.drop(src)
		o.lock.RLock()
	default:
		log.Printf("pastry: unknown system message: %+v", head)
	}
}