Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove lens dependency #39

Merged
merged 5 commits into from Jul 29, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 1 addition & 4 deletions functorrent.cabal
Expand Up @@ -48,7 +48,6 @@ library
QuickCheck,
tasty,
tasty-hunit,
lens,
transformers,
unix

Expand All @@ -70,7 +69,6 @@ executable functorrent
network,
network-uri,
parsec,
lens,
transformers,
unix

Expand Down Expand Up @@ -118,5 +116,4 @@ test-suite control-thread-tests
QuickCheck,
tasty,
tasty-hunit,
functorrent,
lens
functorrent
11 changes: 6 additions & 5 deletions src/FuncTorrent/Bencode.hs
Expand Up @@ -12,14 +12,15 @@ module FuncTorrent.Bencode

import Prelude hiding (length, concat)

import Control.Applicative ((<*))
import Control.Applicative ((<*)) -- This will cause a warning in 7.10.
import Data.ByteString (ByteString, length, concat)
import Data.ByteString.Char8 (unpack, pack)
import Data.Functor ((<$>))
import Data.Functor ((<$>)) -- This will cause a warning in 7.10.
import Data.Map.Strict (Map, fromList, toList)

import Test.QuickCheck
import Text.ParserCombinators.Parsec
import qualified Text.Parsec.ByteString as ParsecBS
import Test.QuickCheck

data BVal = Bint Integer
| Bstr ByteString
Expand Down Expand Up @@ -146,8 +147,8 @@ bencVal = Bstr <$> bencStr <|>
Bdict <$> bencDict

decode :: ByteString -> Either String BVal
decode bs = case (parse bencVal "BVal" bs) of
Left e -> Left "Unable to parse torrent file"
decode bs = case parse bencVal "BVal" bs of
Left _ -> Left "Unable to parse torrent file"
Right torrent -> Right torrent

-- Encode BVal into a bencoded ByteString. Inverse of decode
Expand Down
120 changes: 35 additions & 85 deletions src/FuncTorrent/ControlThread.hs
@@ -1,31 +1,27 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}

module FuncTorrent.ControlThread where

import Control.Concurrent
import GHC.Conc
import Control.Lens
import Control.Monad hiding (forM, forM_, mapM, mapM_, msum, sequence, sequence_)
import Data.IORef
import Control.Monad hiding (
forM , forM_ , mapM , mapM_ , msum , sequence , sequence_ )
import GHC.Conc

import FuncTorrent.Tracker (TrackerResponse(..), tracker, mkTrackerResponse, peers)
import FuncTorrent.Bencode (decode)
import FuncTorrent.Metainfo (Metainfo(..))
import FuncTorrent.Tracker (TrackerResponse(..), tracker, mkTrackerResponse, peers)

import FuncTorrent.Peer
import FuncTorrent.Peer (Peer(..))
import FuncTorrent.PeerThread
import FuncTorrent.PeerThreadData

data ControlThread = ControlThread {
_metaInfo :: Metainfo
, _trackerResponses :: [TrackerResponse]
, _peerList :: [Peer]
, _peerThreads :: [(PeerThread, ThreadId)]
-- , _diskIO_Handle :: Handle
, _controlTStatus :: IORef ControlThreadStatus
, _controlTAction :: IORef ControlThreadAction
data ControlThread = ControlThread
{ metaInfo :: Metainfo
, trackerResponses :: [TrackerResponse]
, peerList :: [Peer]
, peerThreads :: [(PeerThread, ThreadId)]
, controlTStatus :: IORef ControlThreadStatus
, controlTAction :: IORef ControlThreadAction
}

data ControlThreadStatus =
Expand All @@ -41,9 +37,6 @@ data ControlThreadAction =
| Stop
deriving (Eq, Show)

makeLenses ''ControlThread


-- Description
-- ControlThread handles all operations for a single torrent
-- It is responsible for
Expand All @@ -66,8 +59,8 @@ controlThreadMain ct =
doInitialization :: ControlThread -> IO ControlThread
doInitialization ct =
getTrackerResponse ct >>= \x ->
let peerInit = take 4 $ x^.peerList
in foldM forkPeerThread x peerInit
let peerInit = take 4 (peerList x)
in foldM forkPeerThread x peerInit

mainLoop :: ControlThread -> IO ControlThread
mainLoop ct =
Expand All @@ -82,20 +75,22 @@ mainLoop ct =
checkAction

where
checkAction :: ControlThread -> IO ControlThread
checkAction ct1 = do
putStrLn "Check action"
putStrLn "Check control thread action"
-- TODO: This will cause a 4s delay b/w a ^C and the app going down
threadDelay $ 4*1000*1000
action <- readIORef $ view controlTAction ct1
action <- readIORef $ controlTAction ct1
case action of
FuncTorrent.ControlThread.Stop -> return ct1
_ -> mainLoop ct1

doExit :: ControlThread -> IO ()
doExit ct = do
putStrLn "Doing control-thread exit"
let peerTs = ct ^. peerThreads
let peerTs = peerThreads ct
-- let the peer threads stop themselves
mapM_ ((setPeerThreadAction FuncTorrent.PeerThreadData.Stop).fst) peerTs
mapM_ (setPeerThreadAction FuncTorrent.PeerThreadData.Stop . fst) peerTs

-- Let the threads run for a while
-- We may add delay also if required
Expand All @@ -108,97 +103,52 @@ doExit ct = do
-- if they are blocked due to disk write, then wait and retry
-- if thread not responding then kill the thread

unless (null (ct1 ^. peerThreads)) $ doExit ct1
unless (null (peerThreads ct1)) $ doExit ct1

where
clearFinishedThreads :: ControlThread -> IO ControlThread
clearFinishedThreads ct1 = do
remainingThreads <- filterM isRunning $ ct1 ^. peerThreads
return (ct1 & peerThreads .~ remainingThreads)
remainingThreads <- filterM isRunning (peerThreads ct1)
return (ct1 {peerThreads = remainingThreads})
where
isRunning (_,tid) =
threadStatus tid >>= (\x -> return $ ThreadFinished /= x)

getTrackerResponse :: ControlThread -> IO ControlThread
getTrackerResponse ct = do
response <- tracker (ct^.metaInfo) "-HS0001-*-*-20150215"
response <- tracker (metaInfo ct) "-HS0001-*-*-20150215"

-- TODO: Write to ~/.functorrent/caches
-- writeFile (name (info m) ++ ".cache") response

case decode response of
Right trackerInfo ->
case mkTrackerResponse trackerInfo of
Right trackerResp ->
let ct1 = trackerResponses %~ (trackerResp : ) $ ct
ct2 = peerList %~ ((peers trackerResp) ++) $ ct1
in return ct2
let newTrackerResponses = trackerResp : trackerResponses ct
newPeerList = peerList ct ++ peers trackerResp
in return ct {trackerResponses = newTrackerResponses,
peerList = newPeerList}
Left _ -> putStrLn "mkTracker error" >> return ct
Left _ -> putStrLn "tracker resp decode error" >> return ct

-- Forks a peer-thread and add it to the peerThreads list
forkPeerThread :: ControlThread -> Peer -> IO ControlThread
forkPeerThread ct p = do
pt <- initPeerThread p
return (peerThreads %~ (pt : ) $ ct) -- Append pt to peerThreads

-- First try to stop and let the thread exit gracefully.
-- If it does not work, then give thread a kill signal
killPeerThread :: ControlThread -> (PeerThread, ThreadId) -> IO ControlThread
killPeerThread _ _ = undefined
let newPeerThreads = pt : peerThreads ct -- Append pt to peerThreads
return ct { peerThreads = newPeerThreads}

-- Piece Management Stuff

pieceManagement :: ControlThread -> IO ControlThread
pieceManagement ct = do
putStrLn "Doing Piece Management"
putStrLn "Manage pieces"
return ct
--let peerTs = map fst $ ct^.peerThreads
--s <- getIncrementalPeerThreadStatus peerTs
--p <- samplePieceAvailability peerTs
--let u = incrementalJobAssign s p []
--do updatePeerPieceQueue u
-- return ct

updatePeerPieceQueue :: [(PeerThread, [Piece])] -> IO ()
updatePeerPieceQueue =
mapM_ (\x -> do
ts <- takeMVar $ fst x ^. transferStats
let tsnew = queuePieces .~ snd x $ ts
putMVar (fst x ^.transferStats) tsnew)


-- Get information about what pieces are currently downloading + downloaded after the previous status update
getIncrementalPeerThreadStatus :: [PeerThread] -> IO [(PeerThread, [Piece])]
getIncrementalPeerThreadStatus =
mapM (\x -> do
ts <- takeMVar $ x^.transferStats
let ps = ts^.activePieces ++ ts^.downloadedInc
tsnew = downloadedInc .~ [] $ ts
putMVar (x^.transferStats) tsnew
return (x,ps))


-- Sample current piece availability
samplePieceAvailability :: [PeerThread] -> IO [(PeerThread, [Piece])]
samplePieceAvailability = mapM (\x -> do
y <- takeMVar $ x^.peerPieces
return (x,y))

-- Uses the piece availability to distribute the download jobs to peers
-- This should be used to initialize the job distribution
initialJobAssign :: [(PeerThread, [Piece])] -> [(PeerThread, [Piece])]
initialJobAssign p = p

-- Take the initial job assignment, availability and the progress
-- of each peer to decide incremental job distribution.
-- This API also need to do load-balancing
-- Additionaly this can also compute the peer ranking
incrementalJobAssign :: [(PeerThread, [Piece])] -> [(PeerThread, [Piece])] -> [(PeerThread, [Piece])] -> [(PeerThread, [Piece])]
incrementalJobAssign p _ _ = p

filterBadPeers :: ControlThread -> IO ControlThread
filterBadPeers ct = putStrLn "FilterBadPeer" >> return ct
filterBadPeers ct = do
putStrLn "Filter bad peers"
return ct

initControlThread :: Metainfo -> IO (ControlThread, ThreadId)
initControlThread m = do
Expand Down
37 changes: 16 additions & 21 deletions src/FuncTorrent/Peer.hs
Expand Up @@ -8,43 +8,38 @@ module FuncTorrent.Peer

import Prelude hiding (lookup, concat, replicate, splitAt)

import System.IO
import Control.Applicative (liftA3)
import Control.Monad (replicateM, liftM, forever)
import Data.Binary (Binary(..), decode)
import Data.Binary.Get (getWord32be, getWord16be, getWord8, runGet)
import Data.Binary.Put (putWord32be, putWord16be, putWord8)
import Data.ByteString (ByteString, pack, unpack, concat, hGet, hPut, singleton)
import Data.ByteString.Lazy (fromStrict, fromChunks)
import qualified Data.ByteString.Char8 as BC (replicate, pack)
import Data.Functor ((<$>)) -- This will cause a warning in 7.10.
import Network (connectTo, PortID(..))
import Data.Binary (Binary(..), decode)
import Data.Binary.Put (putWord32be, putWord16be, putWord8)
import Data.Binary.Get (getWord32be, getWord16be, getWord8, runGet)
import Control.Monad (replicateM, liftM, forever)
import Control.Applicative ((<$>), liftA3)
import System.IO
import qualified Data.ByteString.Char8 as BC (replicate, pack)

type ID = String
type IP = String
type Port = Integer

data PeerState = PeerState { handle :: Handle
, am_choking :: Bool
, am_interested :: Bool
, peer_choking :: Bool
, peer_interested :: Bool}

-- Maintain info on every piece and the current state of it.
-- should probably be a TVar.
type Pieces = [PieceData]
data PeerState = PeerState {
handle :: Handle
, amChoking :: Bool
, amInterested :: Bool
, peerChoking :: Bool
, peerInterested :: Bool
}

data PieceState = Pending
| InProgress
| Have
deriving (Show)

data PieceData = PieceData { index :: Int -- ^ Piece number
, peers :: [Peer] -- ^ list of peers who have this piece
, state :: PieceState } -- ^ state of the piece from download perspective.

-- | Peer is a PeerID, IP address, port tuple
data Peer = Peer ID IP Port
deriving (Show, Eq)
deriving (Show, Eq)

data PeerMsg = KeepAliveMsg
| ChokeMsg
Expand Down
13 changes: 6 additions & 7 deletions src/FuncTorrent/PeerThread.hs
Expand Up @@ -8,7 +8,6 @@ module FuncTorrent.PeerThread where
-- For each peer a separate instance of PeerThread is used

import Control.Concurrent
import Control.Lens
import Data.IORef

import FuncTorrent.Peer
Expand All @@ -21,19 +20,19 @@ import FuncTorrent.PeerThreadMain (peerThreadMain)
#endif


-- PeerThread is responsible for
-- PeerThread is responsible for
-- 1. Hand-shake with peer
-- 2. Keeping track of peer state and managing our state with peer.
-- This includes the choke/interested status and have properties.
--
--
-- 3. Initiate request to get data.
-- The main thread will allocate a bunch of blocks for fetching from the peer.
--
--
-- 4. Respond to data-request.
-- Algo to manage data-request
--
-- 5. Do data checking and disk IO. (Disk IO might be handled in a separate thread?)
--
--
-- 6. If needed, keep the connection alive.
--

Expand Down Expand Up @@ -65,10 +64,10 @@ stopPeerThread _ = undefined
-- Control thread will get status from this API
-- It should not block due to Peer-Thread
getPeerThreadStatus :: PeerThread -> IO (Maybe PeerThreadStatus)
getPeerThreadStatus pt = tryReadMVar $ pt^.peerTStatus
getPeerThreadStatus pt = tryReadMVar $ peerTStatus pt


-- Peer Thread may block, if no action is recieved from Control-thread
-- It may also kill itself if no communication from Control-thread for some time.
setPeerThreadAction :: PeerThreadAction -> PeerThread -> IO Bool
setPeerThreadAction a pt = tryPutMVar (pt^.peerTAction) a
setPeerThreadAction a pt = tryPutMVar (peerTAction pt) a