Skip to content

Commit

Permalink
Support custom DHT servers
Browse files Browse the repository at this point in the history
Addresses #266.
  • Loading branch information
anacrolix committed Feb 20, 2020
1 parent 71f18f1 commit afe4d87
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 25 deletions.
32 changes: 17 additions & 15 deletions client.go
Expand Up @@ -58,7 +58,7 @@ type Client struct {
onClose []func()
dialers []Dialer
listeners []Listener
dhtServers []*dht.Server
dhtServers []DhtServer
ipBlockList iplist.Ranger
// Our BitTorrent protocol extension bytes, sent in our BT handshakes.
extensionBytes pp.PeerExtensionBits
Expand Down Expand Up @@ -101,12 +101,10 @@ func (cl *Client) LocalPort() (port int) {
return
}

func writeDhtServerStatus(w io.Writer, s *dht.Server) {
func writeDhtServerStatus(w io.Writer, s DhtServer) {
dhtStats := s.Stats()
fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
spew.Fdump(w, dhtStats)
}

// Writes out a human readable status of the client, such as for writing to a
Expand All @@ -120,7 +118,7 @@ func (cl *Client) WriteStatus(_w io.Writer) {
fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
cl.eachDhtServer(func(s *dht.Server) {
cl.eachDhtServer(func(s DhtServer) {
fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
writeDhtServerStatus(w, s)
})
Expand Down Expand Up @@ -237,11 +235,11 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
if !cfg.NoDHT {
for _, s := range sockets {
if pc, ok := s.(net.PacketConn); ok {
ds, err := cl.newDhtServer(pc)
ds, err := cl.newAnacrolixDhtServer(pc)
if err != nil {
panic(err)
}
cl.dhtServers = append(cl.dhtServers, ds)
cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
cl.onClose = append(cl.onClose, func() { ds.Close() })
}
}
Expand All @@ -250,6 +248,10 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
return
}

func (cl *Client) AddDhtServer(d DhtServer) {
cl.dhtServers = append(cl.dhtServers, d)
}

// Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
// given address for any Torrent.
func (cl *Client) AddDialer(d Dialer) {
Expand Down Expand Up @@ -300,7 +302,7 @@ func (cl *Client) listenNetworks() (ns []network) {
return
}

func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
cfg := dht.ServerConfig{
IPBlocklist: cl.ipBlockList,
Conn: conn,
Expand Down Expand Up @@ -335,7 +337,7 @@ func (cl *Client) Closed() <-chan struct{} {
return cl.closed.C()
}

func (cl *Client) eachDhtServer(f func(*dht.Server)) {
func (cl *Client) eachDhtServer(f func(DhtServer)) {
for _, ds := range cl.dhtServers {
f(ds)
}
Expand Down Expand Up @@ -929,14 +931,14 @@ func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
}

func (cl *Client) dhtPort() (ret uint16) {
cl.eachDhtServer(func(s *dht.Server) {
cl.eachDhtServer(func(s DhtServer) {
ret = uint16(missinggo.AddrPort(s.Addr()))
})
return
}

func (cl *Client) haveDhtServer() (ret bool) {
cl.eachDhtServer(func(_ *dht.Server) {
cl.eachDhtServer(func(_ DhtServer) {
ret = true
})
return
Expand Down Expand Up @@ -1071,7 +1073,7 @@ func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStor
new = true

t = cl.newTorrent(infoHash, specStorage)
cl.eachDhtServer(func(s *dht.Server) {
cl.eachDhtServer(func(s DhtServer) {
go t.dhtAnnouncer(s)
})
cl.torrents[infoHash] = t
Expand Down Expand Up @@ -1188,7 +1190,7 @@ func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
return cl.AddTorrent(mi)
}

func (cl *Client) DhtServers() []*dht.Server {
func (cl *Client) DhtServers() []DhtServer {
return cl.dhtServers
}

Expand All @@ -1206,7 +1208,7 @@ func (cl *Client) AddDHTNodes(nodes []string) {
Port: hmp.Port,
},
}
cl.eachDhtServer(func(s *dht.Server) {
cl.eachDhtServer(func(s DhtServer) {
s.AddNode(ni)
})
}
Expand Down
8 changes: 4 additions & 4 deletions client_test.go
Expand Up @@ -316,8 +316,8 @@ func TestDHTInheritBlocklist(t *testing.T) {
require.NoError(t, err)
defer cl.Close()
numServers := 0
cl.eachDhtServer(func(s *dht.Server) {
assert.Equal(t, ipl, s.IPBlocklist())
cl.eachDhtServer(func(s DhtServer) {
assert.Equal(t, ipl, s.(anacrolixDhtServerWrapper).IPBlocklist())
numServers++
})
assert.EqualValues(t, 2, numServers)
Expand Down Expand Up @@ -434,8 +434,8 @@ func TestAddMetainfoWithNodes(t *testing.T) {
require.NoError(t, err)
defer cl.Close()
sum := func() (ret int64) {
cl.eachDhtServer(func(s *dht.Server) {
ret += s.Stats().OutboundQueriesAttempted
cl.eachDhtServer(func(s DhtServer) {
ret += s.Stats().(dht.ServerStats).OutboundQueriesAttempted
})
return
}
Expand Down
5 changes: 2 additions & 3 deletions connection.go
Expand Up @@ -12,7 +12,6 @@ import (
"sync"
"time"

"github.com/anacrolix/dht/v2"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/iter"
Expand Down Expand Up @@ -1060,8 +1059,8 @@ func (c *connection) mainReadLoop() (err error) {
if msg.Port != 0 {
pingAddr.Port = int(msg.Port)
}
cl.eachDhtServer(func(s *dht.Server) {
go s.Ping(&pingAddr, nil)
cl.eachDhtServer(func(s DhtServer) {
go s.Ping(&pingAddr)
})
case pp.Suggest:
torrent.Add("suggests received", 1)
Expand Down
51 changes: 51 additions & 0 deletions dht.go
@@ -0,0 +1,51 @@
package torrent

import (
"io"
"net"

"github.com/anacrolix/dht/v2"
"github.com/anacrolix/dht/v2/krpc"
)

type DhtServer interface {
Stats() interface{}
ID() [20]byte
Addr() net.Addr
AddNode(ni krpc.NodeInfo) error
Ping(addr *net.UDPAddr)
Announce(hash [20]byte, port int, impliedPort bool) (DhtAnnounce, error)
WriteStatus(io.Writer)
}

type DhtAnnounce interface {
Close()
Peers() <-chan dht.PeersValues
}

type anacrolixDhtServerWrapper struct {
*dht.Server
}

func (me anacrolixDhtServerWrapper) Stats() interface{} {
return me.Server.Stats()
}

type anacrolixDhtAnnounceWrapper struct {
*dht.Announce
}

func (me anacrolixDhtAnnounceWrapper) Peers() <-chan dht.PeersValues {
return me.Announce.Peers
}

func (me anacrolixDhtServerWrapper) Announce(hash [20]byte, port int, impliedPort bool) (DhtAnnounce, error) {
ann, err := me.Server.Announce(hash, port, impliedPort)
return anacrolixDhtAnnounceWrapper{ann}, err
}

func (me anacrolixDhtServerWrapper) Ping(addr *net.UDPAddr) {
me.Server.Ping(addr, nil)
}

var _ DhtServer = anacrolixDhtServerWrapper{}
6 changes: 3 additions & 3 deletions torrent.go
Expand Up @@ -1359,12 +1359,12 @@ func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
}
}

func (t *Torrent) announceToDht(impliedPort bool, s *dht.Server) error {
func (t *Torrent) announceToDht(impliedPort bool, s DhtServer) error {
ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), impliedPort)
if err != nil {
return err
}
go t.consumeDhtAnnouncePeers(ps.Peers)
go t.consumeDhtAnnouncePeers(ps.Peers())
select {
case <-t.closed.LockedChan(t.cl.locker()):
case <-time.After(5 * time.Minute):
Expand All @@ -1373,7 +1373,7 @@ func (t *Torrent) announceToDht(impliedPort bool, s *dht.Server) error {
return nil
}

func (t *Torrent) dhtAnnouncer(s *dht.Server) {
func (t *Torrent) dhtAnnouncer(s DhtServer) {
cl := t.cl
for {
select {
Expand Down

0 comments on commit afe4d87

Please sign in to comment.