Skip to content

Commit

Permalink
feat(p2p): peer to peer conn
Browse files Browse the repository at this point in the history
  • Loading branch information
BrianLusina committed Jul 19, 2022
1 parent c8f7288 commit c7f35cb
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 0 deletions.
85 changes: 85 additions & 0 deletions internal/p2p/p2p.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package p2p

import (
"bytes"
"crypto/sha1"
"fmt"
"log"
"net"
"time"

"github.com/brianlusina/tclient/internal/client"
)

// MaxBlockSize is the largest number of bytes a request can ask for
const MaxBlockSize = 16384

// MaxBacklog is the number of unfulfilled requests a client can have in its pipeline
const MaxBacklog = 5

type pieceWork struct {
index int
hash [20]byte
length int
}

type pieceResult struct {
index int
buf []byte
}

func attemptDownloadPiece(c *client.Client, pw *pieceWork) ([]byte, error) {
state := pieceProgress{
index: pw.index,
client: c,
buf: make([]byte, pw.length),
}

// Setting a deadline helps get unresponsive peers unstuck.
// 30 seconds is more than enough time to download a 262 KB piece
c.Conn.SetDeadline(time.Now().Add(30 * time.Second))

// defer c.Conn.SetDeadline(time.Time{}) // Disable the deadline
defer func(conn net.Conn) {
err := conn.SetDeadline(time.Time{}) // Disable the deadline
if err != nil {
log.Fatalf("An error occurred setting connection deadline")
return
}
}(c.Conn)

for state.downloaded < pw.length {
// If unchoked, send requests until we have enough unfulfilled requests
if !state.client.Choked {
for state.backlog < MaxBacklog && state.requested < pw.length {
blockSize := MaxBlockSize
// Last block might be shorter than the typical block
if pw.length-state.requested < blockSize {
blockSize = pw.length - state.requested
}

err := c.SendRequest(pw.index, state.requested, blockSize)
if err != nil {
return nil, err
}
state.backlog++
state.requested += blockSize
}
}

err := state.readMessage()
if err != nil {
return nil, err
}
}

return state.buf, nil
}

func checkIntegrity(pw *pieceWork, buf []byte) error {
hash := sha1.Sum(buf)
if !bytes.Equal(hash[:], pw.hash[:]) {
return fmt.Errorf("Index %d failed integrity check", pw.index)
}
return nil
}
48 changes: 48 additions & 0 deletions internal/p2p/piece_progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package p2p

import (
"github.com/brianlusina/tclient/internal/client"
"github.com/brianlusina/tclient/pkg/message"
)

type pieceProgress struct {
index int
client *client.Client
buf []byte
downloaded int
requested int
backlog int
}

func (state *pieceProgress) readMessage() error {
msg, err := state.client.Read() // blocks

if err != nil {
return err
}

if msg == nil { // keep alive
return nil
}

switch msg.ID {
case message.MsgUnchoke:
state.client.Choked = false
case message.MsgChoke:
state.client.Choked = true
case message.MsgHave:
index, err := message.ParseHave(msg)
if err != nil {
return err
}
state.client.Bitfield.SetPiece(index)
case message.MsgPiece:
n, err := message.ParsePiece(state.index, state.buf, msg)
if err != nil {
return err
}
state.downloaded += n
state.backlog--
}
return nil
}
116 changes: 116 additions & 0 deletions internal/p2p/torrent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package p2p

import (
"log"
"net"
"runtime"

"github.com/brianlusina/tclient/internal/client"
"github.com/brianlusina/tclient/pkg/peers"
)

// Torrent holds data required to download a torrent from a list of peers
type Torrent struct {
Peers []peers.Peer
PeerID [20]byte
InfoHash [20]byte
PieceHashes [][20]byte
PieceLength int
Length int
Name string
}

func (t *Torrent) startDownloadWorker(peer peers.Peer, workQueue chan *pieceWork, results chan *pieceResult) {
c, err := client.New(peer, t.PeerID, t.InfoHash)
if err != nil {
log.Printf("Could not handshake with %s. Disconnecting\n", peer.IP)
return
}

// defer c.Conn.Close()
defer func(conn net.Conn) {
err := conn.Close()
if err != nil {
log.Fatalf("An error occurred setting connection deadline")
return
}
}(c.Conn)

log.Printf("Completed handshake with %s\n", peer.IP)

c.SendUnchoke()
c.SendInterested()

for pw := range workQueue {
if !c.Bitfield.HasPiece(pw.index) {
workQueue <- pw // Put piece back on the queue
continue
}

// Download the piece
buf, err := attemptDownloadPiece(c, pw)
if err != nil {
log.Println("Exiting", err)
workQueue <- pw // Put piece back on the queue
return
}

err = checkIntegrity(pw, buf)
if err != nil {
log.Printf("Piece #%d failed integrity check\n", pw.index)
workQueue <- pw // Put piece back on the queue
continue
}

c.SendHave(pw.index)
results <- &pieceResult{pw.index, buf}
}
}

func (t *Torrent) calculateBoundsForPiece(index int) (begin int, end int) {
begin = index * t.PieceLength
end = begin + t.PieceLength
if end > t.Length {
end = t.Length
}
return begin, end
}

func (t *Torrent) calculatePieceSize(index int) int {
begin, end := t.calculateBoundsForPiece(index)
return end - begin
}

// Download downloads the torrent. This stores the entire file in memory.
func (t *Torrent) Download() ([]byte, error) {
log.Println("Starting download for", t.Name)
// Init queues for workers to retrieve work and send results
workQueue := make(chan *pieceWork, len(t.PieceHashes))
results := make(chan *pieceResult)
for index, hash := range t.PieceHashes {
length := t.calculatePieceSize(index)
workQueue <- &pieceWork{index, hash, length}
}

// Start workers
for _, peer := range t.Peers {
go t.startDownloadWorker(peer, workQueue, results)
}

// Collect results into a buffer until full
buf := make([]byte, t.Length)
donePieces := 0
for donePieces < len(t.PieceHashes) {
res := <-results
begin, end := t.calculateBoundsForPiece(res.index)
copy(buf[begin:end], res.buf)
donePieces++

percent := float64(donePieces) / float64(len(t.PieceHashes)) * 100
numWorkers := runtime.NumGoroutine() - 1 // subtract 1 for main thread
log.Printf("(%0.2f%%) Downloaded piece #%d from %d peers\n", percent, res.index, numWorkers)
}
close(workQueue)

return buf, nil
}

0 comments on commit c7f35cb

Please sign in to comment.