-
Notifications
You must be signed in to change notification settings - Fork 13
/
churn.go
259 lines (209 loc) · 7.08 KB
/
churn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
// Analyse churn rate of a set of consensuses.
package main
import (
"fmt"
"log"
"math"
"reflect"
"sync"
"time"
tor "git.torproject.org/user/phw/zoossh.git"
)
// Appeared and Disappeared name the two directions in which a relay can
// change between consecutive consensuses.
//
// NOTE(review): neither constant is referenced in the visible part of this
// file; the meaning above is inferred from the names -- confirm against the
// callers.
const (
	Appeared    = true
	Disappeared = false
)
// RelayFlags lists, in output order, the relay flags for which churn is
// analysed. Every name is looked up by reflection as a field of a router
// status' flag set (see FilterConsensusByFlag), and the order of this slice
// is also the column order of the CSV output.
var RelayFlags = []string{
	"Authority",
	"BadExit",
	"Exit",
	"Fast",
	"Guard",
	"HSDir",
	"Named",
	"Running",
	"Stable",
	"Unnamed",
	"V2Dir",
	"Valid",
}
// Churn is a pair of churn rates between two consecutive consensuses: Online
// is the rate for relays that newly appeared, Offline the rate for relays
// that vanished.
type Churn struct {
	Online, Offline float64
}
// PerFlagMovAvg associates each relay flag name (for instance "Guard") with
// the moving average that tracks the churn of relays carrying that flag.
type PerFlagMovAvg map[string]*MovingAverage
// MovingAverage is a simple moving average over a fixed-size ring buffer of
// churn values.
//
// WindowIndex is the slot the next value is written to, WindowSize is the
// number of slots, WindowFill counts how many values have been stored so far
// (it stops growing at WindowSize), and Window is the buffer itself.
type MovingAverage struct {
	WindowIndex, WindowSize, WindowFill int

	Window []Churn
}
// NewMovingAverage allocates and returns a moving average whose window holds
// windowSize churn values, all initially zero.
//
// A windowSize smaller than 1 is raised to 1: a zero-sized window would make
// AddValue panic (index out of range and modulo by zero), and a negative size
// would make the allocation itself panic. This mirrors the clamping that
// AnalyseChurn applies to the user-supplied window size.
func NewMovingAverage(windowSize int) *MovingAverage {
	if windowSize < 1 {
		windowSize = 1
	}
	return &MovingAverage{
		WindowSize: windowSize,
		Window:     make([]Churn, windowSize),
	}
}
// CalcAvg returns the mean of the churn values currently held in the window.
//
// Only the slots that have actually been filled contribute to the mean, so a
// partially filled window is not biased toward zero by its empty slots. Once
// the window is full the result is the mean over all WindowSize slots. An
// empty window yields a zero Churn instead of NaN.
func (ma *MovingAverage) CalcAvg() Churn {
	// Until the window wraps around, AddValue fills slots 0..WindowFill-1.
	filled := ma.WindowFill
	if filled > len(ma.Window) {
		filled = len(ma.Window)
	}
	if filled <= 0 {
		return Churn{}
	}

	var mean Churn
	for _, c := range ma.Window[:filled] {
		mean.Online += c.Online
		mean.Offline += c.Offline
	}
	mean.Online /= float64(filled)
	mean.Offline /= float64(filled)
	return mean
}
// AddValue stores val in the slot the window index currently points at and
// then advances the index, wrapping around at the end of the window so that
// the oldest value is the next one to be overwritten. The fill counter grows
// with every call until it reaches the window size.
func (ma *MovingAverage) AddValue(val Churn) {
	slot := ma.WindowIndex
	ma.Window[slot] = val
	ma.WindowIndex = (slot + 1) % ma.WindowSize

	if ma.WindowFill < ma.WindowSize {
		ma.WindowFill++
	}
}
// IsWindowFull reports whether as many values have been added as the window
// has slots.
func (ma *MovingAverage) IsWindowFull() bool {
	filled, capacity := ma.WindowFill, ma.WindowSize
	return filled == capacity
}
// dumpChurnRelays writes one log line per relay in relays, ordered by
// nickname, for manual analysis. Each line consists of prefix, an Atlas
// details link built from the relay's fingerprint, and the relay's nickname.
// Output goes through the standard logger.
//
// The date parameter is accepted but currently not used.
func dumpChurnRelays(relays *tor.Consensus, prefix string, date time.Time) {
	ordered := relays.ToSlice()

	// Order the relays alphabetically by nickname.
	By(func(a, b tor.GetStatus) bool {
		return a().Nickname < b().Nickname
	}).Sort(ordered)

	for _, fetch := range ordered {
		relay := fetch()
		log.Printf("%s <https://atlas.torproject.org/#details/%s> %s\n",
			prefix, relay.Fingerprint, relay.Nickname)
	}
}
// determineChurn determines and returns the churn rate of the two given
// subsequent consensuses. For the churn rate, we adapt the formula shown in
// Section 2.1 of: <http://www.cs.berkeley.edu/~istoica/papers/2006/churn.pdf>.
//
// Online is the number of relays present only in newConsensus and Offline the
// number present only in prevConsensus, each divided by the size of the
// larger of the two consensuses.
//
// When both consensuses are empty (which can happen after filtering by a flag
// that no relay carries) there is nothing to divide by. Zero churn is
// returned in that case; dividing would yield NaN, which would then linger in
// the moving-average window and end up in the CSV output.
func determineChurn(prevConsensus, newConsensus *tor.Consensus) Churn {
	larger := math.Max(float64(prevConsensus.Length()), float64(newConsensus.Length()))
	if larger == 0 {
		return Churn{}
	}

	goneRelays := prevConsensus.Subtract(newConsensus)
	newRelays := newConsensus.Subtract(prevConsensus)

	return Churn{
		Online:  float64(newRelays.Length()) / larger,
		Offline: float64(goneRelays.Length()) / larger,
	}
}
// FilterConsensusByFlag returns a new consensus that contains only those
// relays of the given consensus that have the given flag set. The input
// consensus is not modified.
//
// flag is the name of a boolean field of a router status' flag set (e.g.
// "Guard") and is resolved by reflection. A name that does not correspond to
// a boolean field matches no relay, so the returned consensus is empty;
// previously such a name caused a panic, because FieldByName returns the zero
// reflect.Value for unknown fields and calling Interface on it panics.
func FilterConsensusByFlag(consensus *tor.Consensus, flag string) *tor.Consensus {
	filteredConsensus := tor.NewConsensus()

	for fingerprint, getStatus := range consensus.RouterStatuses {
		status := getStatus()

		field := reflect.ValueOf(&status.Flags).Elem().FieldByName(flag)
		if !field.IsValid() || field.Kind() != reflect.Bool {
			// Unknown or non-boolean flag name: treat as "flag not set".
			continue
		}

		// Keep the relay only if it has the flag we are looking for.
		if field.Bool() {
			filteredConsensus.Set(fingerprint, status)
		}
	}

	return filteredConsensus
}
// DeterminePerFlagChurn computes, for every flag in RelayFlags, the churn
// between two subsequent consensuses restricted to relays carrying that flag,
// feeds it into the flag's moving average in movAvg, and prints the averaged
// values as CSV to stdout.
//
// Nothing is printed for a flag whose moving-average window is not yet full.
// In the long CSV format one row is printed per flag: the timestamp, a "T" in
// that flag's column and "NA" in all others, then the two churn values. In
// any other format a single row holding the timestamp and the values of all
// flags is printed after the loop.
//
// Whenever an averaged churn value reaches params.Threshold, the relays that
// appeared (prefix "+flag") or disappeared (prefix "-flag") between the two
// consensuses are dumped via dumpChurnRelays.
func DeterminePerFlagChurn(prevConsensus, newConsensus *tor.Consensus, movAvg PerFlagMovAvg, params *CmdLineParams) {
	const timeLayout = "2006-01-02T15:04:05Z"

	// wideRow collects the single output row of the wide CSV format.
	var wideRow string

	for _, flag := range RelayFlags {
		oldRelays := FilterConsensusByFlag(prevConsensus, flag)
		curRelays := FilterConsensusByFlag(newConsensus, flag)

		// Smooth the raw churn with this flag's moving average.
		avg := movAvg[flag]
		avg.AddValue(determineChurn(oldRelays, curRelays))
		smoothed := avg.CalcAvg()
		if !avg.IsWindowFull() {
			continue
		}

		if smoothed.Online >= params.Threshold {
			dumpChurnRelays(curRelays.Subtract(oldRelays), "+"+flag, newConsensus.ValidAfter)
		}
		if smoothed.Offline >= params.Threshold {
			dumpChurnRelays(oldRelays.Subtract(curRelays), "-"+flag, newConsensus.ValidAfter)
		}

		timestamp := newConsensus.ValidAfter.Format(timeLayout)

		if params.CSVFormat != longCSVFormat {
			// Wide format: the timestamp leads the row exactly once.
			if wideRow == "" {
				wideRow = timestamp
			}
			wideRow += fmt.Sprintf(",%.5f,%.5f", smoothed.Online, smoothed.Offline)
			continue
		}

		// Long format: one complete row per flag.
		fmt.Print(timestamp)
		for _, column := range RelayFlags {
			marker := ",NA"
			if column == flag {
				marker = ",T"
			}
			fmt.Print(marker)
		}
		fmt.Printf(",%.5f,%.5f\n", smoothed.Online, smoothed.Offline)
	}

	if wideRow != "" {
		fmt.Println(wideRow)
	}
}
// AnalyseChurn determines the churn rates of a set of consecutive consensuses.
// If the churn rate exceeds the given threshold, all new and disappeared
// relays are dumped to stderr.
//
// Consensuses are read from channel until it is closed; group.Done is called
// when the function returns. The CSV header is written to stdout first, then
// every consensus is compared with its predecessor via DeterminePerFlagChurn.
// Two consensuses are only compared if the newer one is valid exactly one
// hour after the older one; otherwise the gap is logged and the comparison is
// skipped.
//
// A params.WindowSize smaller than 1 is reset to 1 (params is modified).
// Receiving anything other than a consensus terminates the program through
// log.Fatalln, in which case the deferred group.Done does not run.
func AnalyseChurn(channel chan tor.ObjectSet, params *CmdLineParams, group *sync.WaitGroup) {
	defer group.Done()

	var newConsensus, prevConsensus *tor.Consensus

	if params.WindowSize <= 0 {
		log.Printf("Window size set to %d, but cannot be smaller than 1. Setting it to 1.", params.WindowSize)
		params.WindowSize = 1
	}
	log.Printf("Threshold for churn analysis is %.5f.\n", params.Threshold)

	// Print CSV header, either in long or wide format.
	fmt.Print("Date")
	for _, flag := range RelayFlags {
		if params.CSVFormat == longCSVFormat {
			fmt.Printf(",%s", flag)
		} else {
			fmt.Printf(",New%s,Gone%s", flag, flag)
		}
	}
	if params.CSVFormat == longCSVFormat {
		fmt.Print(",NewChurn,GoneChurn")
	}
	fmt.Println()

	// One moving average per analysed flag.
	movAvg := make(PerFlagMovAvg)
	for _, flag := range RelayFlags {
		movAvg[flag] = NewMovingAverage(params.WindowSize)
	}

	// Every loop iteration processes one consensus. We compare consensus t
	// to consensus t - 1.
	for objects := range channel {
		switch obj := objects.(type) {
		case *tor.Consensus:
			newConsensus = obj
		default:
			log.Fatalln("Only router status files are supported for churn analysis.")
		}

		// The very first consensus has no predecessor to compare against.
		if prevConsensus == nil {
			prevConsensus = newConsensus
			continue
		}

		// Are we missing consensuses? Compare instants with Equal: the ==
		// and != operators on time.Time also compare the Location and the
		// monotonic clock reading, so two identical instants could be
		// reported as different.
		if !prevConsensus.ValidAfter.Add(time.Hour).Equal(newConsensus.ValidAfter) {
			log.Printf("Missing consensuses between %s and %s.\n",
				prevConsensus.ValidAfter.Format(time.RFC3339),
				newConsensus.ValidAfter.Format(time.RFC3339))
			prevConsensus = newConsensus
			continue
		}

		DeterminePerFlagChurn(prevConsensus, newConsensus, movAvg, params)
		prevConsensus = newConsensus
	}
}