/
server.hs
478 lines (443 loc) · 21.9 KB
/
server.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
module Main where
import qualified Data.ByteString.Lazy.Char8 as C
import qualified System.Event as Ev
import Network.Socket (SockAddr)
import Data.Map (Map)
import qualified Data.Map as Map
import Data.Array.IArray
import Data.IORef
import Control.Monad.State.Lazy
import System.Environment
import Data.Maybe (fromMaybe)
import Control.Monad.Maybe
import Data.Time.Clock.POSIX
import Data.List (intercalate, sortBy, groupBy, nub)
import Data.Ix (inRange)
import Text.Tabular
import qualified Text.Tabular.AsciiArt as AA
import qualified ControlSocket
import EventLog
import KRPC
import qualified Node as Node
import NodeId
import qualified BEncoding as B
import InState
data ServerState = Server NodeId (Array Int Bucket) TrackerAddrs TrackerStats
deriving (Show)
type Bucket = Map NodeId Peer
data PeerState = Good | Questionable | Bad
deriving (Eq, Ord, Show)
data Peer = Peer { peerState :: PeerState,
peerAddress :: SockAddr,
peerLastReply :: Time,
peerLastSend :: Time
}
deriving (Show, Eq)
type TrackerStats = Map NodeId (Integer, Integer)
type TrackerAddrs = [SockAddr]
type Time = POSIXTime
type ServerAction a = StateT ServerState IO a
param_n = 160
param_k = 8
param_k_nonfull = param_k * 3
-- Startup
main = do args <- getArgs
case args of
[portS, logPath, ctlPath] ->
do nodeId <- makeRandomNodeId
putStrLn $ "Random nodeId: " ++ show nodeId
run (read portS) logPath ctlPath nodeId []
[portS, logPath, ctlPath, torrentFile] ->
do metaInfo <- (fromMaybe $ error "Unable to metainfo file") `liftM`
runMaybeT (MaybeT (B.parseFile torrentFile))
infoHash <- (fromMaybe $ error "No infohash") `liftM`
runMaybeT (MaybeT $ B.infoHash metaInfo)
let bootstrapNodes = fromMaybe [] $
do B.BList nodes <- metaInfo ?< "nodes"
return $
map (\(B.BList [B.BString host, B.BInteger port]) ->
(C.unpack host, port)
) $
filter (\node ->
case node of
B.BList [B.BString _, B.BInteger _] ->
True
_ ->
False
) nodes
let nodeId = makeNodeId infoHash
putStrLn $ "Extracted infoHash: " ++ show nodeId
run (read portS) logPath ctlPath nodeId bootstrapNodes
_ ->
do progName <- getProgName
putStrLn $ "Usage: " ++ progName ++ " <port> <log-path> <ctl-path> [node-id-source.torrent]"
run port logPath ctlPath nodeId bootstrapNodes
= do log <- newLog 1.0 logPath
mgr <- Ev.new
node <- Node.new mgr port
refServer <- newIORef $
Server nodeId (array (0, param_n) $
zip [0..param_n] $ repeat Map.empty) [] Map.empty
let inContext = withServer refServer
Node.setQueryHandler (\addr query ->
withServer refServer $
handleQuery log addr query
) node
Node.setReplyHandler (\addr reply ->
withServer refServer $
handleReply addr reply
) node
withServer refServer $
case bootstrapNodes of
[] ->
do liftIO $ putStrLn "No bootstrap nodes, using router.bittorrent.com"
bootstrap node "router.bittorrent.com" 6881
_ ->
do liftIO $ putStrLn $ "Bootstrap nodes: " ++ show bootstrapNodes
forM_ bootstrapNodes $ \(host, port) ->
bootstrap node host port
ControlSocket.listenSocket mgr ctlPath $
\command ->
withServer refServer $
case command of
["buckets"] ->
listBuckets
["nodeid"] ->
(++ "\n") `liftM` show `liftM` getNodeId
["peers"] ->
do now <- liftIO $ getPOSIXTime
myNodeId <- getNodeId
Server _ buckets _ _ <- get
let peers = concatMap Map.toList $ elems buckets
return $ AA.render id id id $
Table (Group SingleLine $
map (Group NoLine) $
map (map $ Header . show . peerAddress . snd) $
groupBy (\(nodeId1, _) (nodeId2, _) ->
(myNodeId `distanceOrder` nodeId1)
==
(myNodeId `distanceOrder` nodeId2)
) $
peers)
(Group SingleLine [Header "State",
Header "Dist",
Header "NodeId",
Header "Last send",
Header "Last reply"])
[[show $ peerState peer,
show $ myNodeId `distanceOrder` nodeId,
show $ nodeId,
showTimeDiff now (peerLastSend peer),
showTimeDiff now (peerLastReply peer)]
| (nodeId, peer) <- peers]
["tracker"] ->
do Server _ _ _ stats <- get
let stats' = Map.toList stats
return $ AA.render id id id $
Table (Group NoLine $
map (Header . show . fst) stats')
(Group SingleLine [Header "Reads",
Header "Writes"])
[[show r, show w]
| (_, (r, w)) <- stats']
["pclear"] ->
do Server myNodeId buckets _ stats <- get
put $ Server myNodeId buckets [] stats
return "Cleared\n"
["plist"] ->
do Server _ _ addrs _ <- get
return $ (intercalate " " $ map show addrs) ++ "\n"
"padd" : addrS ->
do Server myNodeId buckets addrs stats <- get
addrs' <- liftIO $
forM addrS $ \s ->
let (host, ':' : port) = break (== ':') s
in nub `liftM`
Node.getAddrs host port
let addrs'' = addrs ++ concat addrs'
put $ Server myNodeId buckets addrs'' stats
return "Added\n"
[] -> return ""
cmd:_ -> return $ "Unknown command " ++ show cmd ++ "\n"
let tick = do inContext $ schedule node
Ev.registerTimeout mgr 100 tick
return ()
tick
Ev.loop mgr
where showTimeDiff _ 0 = "Never"
showTimeDiff now t = show $ now - t
withServer :: IORef ServerState -> ServerAction a -> IO a
withServer = refInStateT
getNodeId :: ServerAction NodeId
getNodeId = get >>= \(Server nodeId _ _ _) -> return nodeId
bootstrap :: Node.Node -> String -> Integer -> ServerAction ()
bootstrap node host port
= do entryAddrs <- liftIO $
Node.getAddrs host (show port)
myNodeId <- getNodeId
forM_ entryAddrs $
\addr ->
sendRequest node myNodeId addr
handleQuery :: Logger -> SockAddr -> Query
-> ServerAction (Either Error Reply)
handleQuery log addr query
= do liftIO $ putStrLn $ "Query from " ++ show addr ++ ": " ++ show query
liftIO $ log query
let nodeId = case query of
Ping nodeId -> nodeId
FindNode nodeId _ -> nodeId
GetPeers nodeId _ -> nodeId
AnnouncePeer nodeId _ _ _ -> nodeId
seen nodeId addr
myNodeId <- getNodeId
let myNodeId' = nodeIdToBuf myNodeId
case query of
Ping nodeId ->
return $
Right $
B.bdict [("id", B.BString myNodeId')]
FindNode nodeId target ->
do nodes <- take 8 `liftM` selectNodesFor target
liftIO $ putStrLn $ "Selected for " ++ show target ++ ":\n" ++ show nodes
return $
Right $
B.bdict [("id", B.BString myNodeId'),
("nodes", B.BString $ encodeNodes nodes)
]
GetPeers nodeId infoHash ->
do let token = C.pack "foo"
nodes <- take 8 `liftM`
filter (\(nodeId, addr) ->
(nodeId <-> infoHash) < (myNodeId <-> infoHash)
) `liftM` selectNodesFor infoHash
case nodes of
-- No nearer nodes to infoHash, we are responsible
[] ->
do addrs <- trackGet infoHash
return $
Right $
B.bdict [("id", B.BString myNodeId'),
("token", B.BString token),
("values", B.BList $ map (B.BString . encodeAddr) addrs)]
-- Nearer nodes available, act like FindNode with
-- token. No idea why token would be needed in this
-- case?
_ ->
do liftIO $ putStrLn $ "Redirecting GetPeers for " ++ show infoHash
return $
Right $
B.bdict [("id", B.BString myNodeId'),
("token", B.BString token),
("nodes", B.BString $ encodeNodes nodes)]
AnnouncePeer nodeId infoHash port token ->
do let valid = True
case valid of
True ->
do trackPeer infoHash addr port
return $
Right $
B.bdict [("id", B.BString myNodeId')]
False ->
return $
Left $
Error 203 $ C.pack "Bad token"
_ ->
return $
Left $
Error 201 $ C.pack "Not implemented"
handleReply :: SockAddr -> Reply -> ServerAction ()
handleReply addr reply
= do case reply ?< "id" of
Just (B.BString nodeId') ->
let nodeId = makeNodeId nodeId'
in receivedReply nodeId addr
Nothing ->
return ()
case reply ?< "nodes" of
Just (B.BString nodes') ->
let nodes = decodeNodes nodes'
in forM_ nodes $
\(nodeId, addr) ->
seen nodeId addr
class BucketIndex a where
getBucket :: a -> ServerAction Bucket
putBucket :: a -> Bucket -> ServerAction ()
instance BucketIndex Int where
getBucket n
= do Server _ buckets _ _ <- get
return $ buckets ! n
putBucket n bucket
= do Server nodeId buckets addrs stats <- get
put $ Server nodeId (buckets // [(n, bucket)]) addrs stats
instance BucketIndex NodeId where
getBucket nodeId
= do myNodeId <- getNodeId
getBucket $ nodeId `distanceOrder` myNodeId
putBucket nodeId bucket
= do myNodeId <- getNodeId
putBucket (nodeId `distanceOrder` myNodeId) bucket
selectNodesFor :: NodeId -> ServerAction [(NodeId, SockAddr)]
selectNodesFor target
= do d <- distanceOrder target `liftM` getNodeId
concat `liftM`
mapM (\n ->
map (\(nodeId, peer) ->
(nodeId, peerAddress peer)
) `liftM`
sortBy (\(nodeId1, peer1) (nodeId2, peer2) ->
(target <-> nodeId1) `compare`
(target <-> nodeId2)
) `liftM`
filter (\(nodeId, peer) ->
peerState peer == Good
) `liftM`
Map.toList `liftM`
getBucket n
) (filter (inRange (0, param_n)) $
reverse [0..d] ++ [(d + 1)..160])
isFull :: Bucket -> Bool
isFull
= (>= 8) .
Map.size .
Map.filter ((== Good) . peerState)
seen :: NodeId -> SockAddr -> ServerAction ()
seen nodeId addr
= do bucket <- getBucket nodeId
isMe <- (nodeId ==) `liftM` getNodeId
when (not $ isMe || isFull bucket) $
putBucket nodeId $
Map.alter update nodeId bucket
where update Nothing
= Just $ Peer { peerState = Questionable,
peerAddress = addr,
peerLastReply = 0,
peerLastSend = 0
}
update (Just peer)
= Just $ case (peerState peer,
addr == peerAddress peer) of
-- Never allow address update of good peer
(Good, _) -> peer
-- Clear Bad ones with updated address
(Bad, False) -> Peer { peerAddress = addr,
peerState = Questionable,
peerLastReply = 0,
peerLastSend = 0
}
-- But others will be updated
(_, False) -> peer { peerAddress = addr,
peerState = Questionable
}
-- No change
(_, True) -> peer
receivedReply :: NodeId -> SockAddr -> ServerAction ()
receivedReply nodeId addr
= do now <- liftIO getPOSIXTime
bucket <- getBucket nodeId
case Map.lookup nodeId bucket of
-- Reply from a known peer
Just peer ->
do liftIO $ putStrLn $ show addr ++ " replied"
putBucket nodeId $
Map.insert nodeId (peer { peerState = Good,
peerLastReply = now
}) bucket
-- Who is this? Do we need this one?
Nothing ->
do liftIO $ putStrLn $ show addr ++ " replied unexpectedly"
seen nodeId addr
queryInterval = 5 * 60
timeout = 60
schedule :: Node.Node -> ServerAction ()
schedule node
= liftIO getPOSIXTime >>= \now ->
getNodeId >>= \myNodeId ->
forM_ [0..param_n] $ \n ->
do bucket <- getBucket n
let (io, bucket') =
case isFull bucket of
True ->
Map.mapAccum (\io peer ->
if peerLastSend peer + queryInterval <= now
then (io >> sendRequest node myNodeId (peerAddress peer),
peer { peerLastSend = now})
else
if peerLastSend peer > peerLastReply peer &&
peerLastReply peer + timeout <= now
then (io, peer { peerState = Bad})
else (io, peer)
) (return ()) $
cutBucket param_k bucket
False ->
Map.mapAccum (\io peer ->
if peerLastSend peer + queryInterval <= now
then (io >> sendRequest node myNodeId (peerAddress peer),
peer { peerLastSend = now})
else
if peerLastSend peer > peerLastReply peer &&
peerLastReply peer + timeout <= now
then (io, peer { peerState = Bad})
else (io, peer)
) (return ()) $
cutBucket param_k_nonfull bucket
putBucket n bucket'
io
where cutBucket n bucket
| Map.size bucket <= n = bucket
| otherwise = Map.fromList $
take n $
sortPeers $
Map.toList bucket
sortPeers = sortBy (\(nodeId1, peer1) (nodeId2, peer2) ->
(peerState peer1 `compare` peerState peer2)
`compareFurther`
(peerLastReply peer1 `compare` peerLastReply peer2)
)
compareFurther EQ = id
compareFurther ord = const ord
sendRequest :: Node.Node -> NodeId -> SockAddr -> ServerAction ()
sendRequest node target dest
= do myNodeId <- getNodeId
let query = FindNode myNodeId target
liftIO $ putStrLn $ "Sending request to " ++ show dest ++ " for " ++ show target
liftIO $ Node.sendQueryNoWait dest query node
listBuckets :: ServerAction String
listBuckets
= concat `liftM`
forM [0..param_n] (\n ->
getBucket n >>= \bucket ->
if Map.size bucket > 0
then return $
show n ++ (if isFull bucket
then "*"
else "") ++ ": " ++
intercalate " " (map showPeer $ Map.toList bucket) ++
"\n"
else return ""
)
where showPeer (nodeId, peer)
= showAddress (peerAddress peer) ++
"[" ++ take 1 (show $ peerState peer) ++ "]"
showAddress = fst . break (== ':') . show
trackPeer :: NodeId -> SockAddr -> Integer -> ServerAction ()
trackPeer infoHash _addr _port
= do Server myNodeId buckets addrs stats <- get
let stats' = Map.alter (\old -> Just $
case old of
Nothing -> (0, 1)
Just (r, w) -> (r, w + 1)
) infoHash stats
stats' `seq`
put $ Server myNodeId buckets addrs stats'
trackGet :: NodeId -> ServerAction TrackerAddrs
trackGet infoHash
= do Server myNodeId buckets addrs stats <- get
let stats' = Map.alter (\old -> Just $
case old of
Nothing -> (1, 0)
Just (r, w) -> (r + 1, w)
) infoHash stats
stats' `seq`
put $ Server myNodeId buckets addrs stats'
return addrs
(?<) = B.bdictLookup