Skip to content

Commit

Permalink
Add download rate limiting
Browse files Browse the repository at this point in the history
Fixes #121.
  • Loading branch information
anacrolix committed Oct 10, 2016
1 parent ed0dbba commit d4cbdc5
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 16 deletions.
11 changes: 9 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ type Client struct {
// Our BitTorrent protocol extension bytes, sent in our BT handshakes.
extensionBytes peerExtensionBytes
// The net.Addr.String part that should be common to all active listeners.
listenAddr string
uploadLimit *rate.Limiter
listenAddr string
uploadLimit *rate.Limiter
downloadLimit *rate.Limiter

// Set of addresses that have our client ID. This intentionally will
// include ourselves if we end up trying to connect to our own address
Expand Down Expand Up @@ -263,6 +264,11 @@ func NewClient(cfg *Config) (cl *Client, err error) {
} else {
cl.uploadLimit = cfg.UploadRateLimiter
}
if cfg.DownloadRateLimiter == nil {
cl.downloadLimit = rate.NewLimiter(rate.Inf, 0)
} else {
cl.downloadLimit = cfg.DownloadRateLimiter
}
missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
cl.event.L = &cl.mu
storageImpl := cfg.DefaultStorage
Expand Down Expand Up @@ -1583,5 +1589,6 @@ func (cl *Client) newConnection(nc net.Conn) (c *connection) {
PeerMaxRequests: 250,
}
c.setRW(connStatsReadWriter{nc, &cl.mu, c})
c.r = rateLimitedReader{cl.downloadLimit, c.r}
return
}
24 changes: 16 additions & 8 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func TestClientTransferDefault(t *testing.T) {
})
}

func TestClientTransferRateLimited(t *testing.T) {
func TestClientTransferRateLimitedUpload(t *testing.T) {
started := time.Now()
testClientTransfer(t, testClientTransferParams{
// We are uploading 13 bytes (the length of the greeting torrent). The
Expand All @@ -282,6 +282,12 @@ func TestClientTransferRateLimited(t *testing.T) {
require.True(t, time.Since(started) > time.Second)
}

func TestClientTransferRateLimitedDownload(t *testing.T) {
testClientTransfer(t, testClientTransferParams{
LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
})
}

func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
return storage.NewResourcePieces(fc.AsResourceProvider())
}
Expand Down Expand Up @@ -344,13 +350,14 @@ func TestClientTransferVarious(t *testing.T) {
}

type testClientTransferParams struct {
Responsive bool
Readahead int64
SetReadahead bool
ExportClientStatus bool
LeecherStorage func(string) storage.ClientImpl
SeederStorage func(string) storage.ClientImpl
SeederUploadRateLimiter *rate.Limiter
Responsive bool
Readahead int64
SetReadahead bool
ExportClientStatus bool
LeecherStorage func(string) storage.ClientImpl
SeederStorage func(string) storage.ClientImpl
SeederUploadRateLimiter *rate.Limiter
LeecherDownloadRateLimiter *rate.Limiter
}

// Creates a seeder and a leecher, and ensures the data transfers when a read
Expand Down Expand Up @@ -387,6 +394,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
} else {
cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
}
cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
// cfg.ListenAddr = "localhost:4001"
leecher, err := NewClient(&cfg)
require.NoError(t, err)
Expand Down
22 changes: 17 additions & 5 deletions cmd/torrent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/anacrolix/tagflag"
"github.com/dustin/go-humanize"
"github.com/gosuri/uiprogress"
"golang.org/x/time/rate"

"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
Expand Down Expand Up @@ -110,13 +111,18 @@ func addTorrents(client *torrent.Client) {
}
}

var flags struct {
Mmap bool `help:"memory-map torrent data"`
TestPeer []*net.TCPAddr `help:"addresses of some starting peers"`
Seed bool `help:"seed after download is complete"`
Addr *net.TCPAddr `help:"network listen addr"`
var flags = struct {
Mmap bool `help:"memory-map torrent data"`
TestPeer []*net.TCPAddr `help:"addresses of some starting peers"`
Seed bool `help:"seed after download is complete"`
Addr *net.TCPAddr `help:"network listen addr"`
UploadRate tagflag.Bytes `help:"max piece bytes to send per second"`
DownloadRate tagflag.Bytes `help:"max bytes per second down from peers"`
tagflag.StartPos
Torrent []string `arity:"+" help:"torrent file path or magnet uri"`
}{
UploadRate: -1,
DownloadRate: -1,
}

func main() {
Expand All @@ -132,6 +138,12 @@ func main() {
if flags.Seed {
clientConfig.Seed = true
}
if flags.UploadRate != -1 {
clientConfig.UploadRateLimiter = rate.NewLimiter(rate.Limit(flags.UploadRate), 256<<10)
}
if flags.DownloadRate != -1 {
clientConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(flags.DownloadRate), 1<<20)
}

client, err := torrent.NewClient(&clientConfig)
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ type Config struct {
Seed bool `long:"seed"`
// Events are data bytes sent in pieces. The burst must be large enough to
// fit a whole chunk.
UploadRateLimiter *rate.Limiter
UploadRateLimiter *rate.Limiter
// The events are bytes read from connections. The burst must be bigger
// than the largest Read performed on a Conn minus one. This is likely to
// be the larger of the main read loop buffer (~4096), and the requested
// chunk size (~16KiB).
DownloadRateLimiter *rate.Limiter

// User-provided Client peer ID. If not present, one is generated automatically.
PeerID string
Expand Down
25 changes: 25 additions & 0 deletions ratelimitreader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package torrent

import (
"context"
"io"
"time"

"golang.org/x/time/rate"
)

type rateLimitedReader struct {
l *rate.Limiter
r io.Reader
}

func (me rateLimitedReader) Read(b []byte) (n int, err error) {
if err := me.l.WaitN(context.Background(), 1); err != nil {
panic(err)
}
n, err = me.r.Read(b)
if !me.l.ReserveN(time.Now(), n-1).OK() {
panic(n - 1)
}
return
}

2 comments on commit d4cbdc5

@hustcat
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@anacrolix Check rate limit when reading data from socket. This will not only affect reading Piece message, but also affect reading Request and other messages. I think it's not a good idea limiting read data from socket without distinguishing message.

@anacrolix
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered this, but realised the following:

Ultimately the user wants to limit bandwidth usage, not actual data rates. But by targeting data rates, they hope that processing and latency of control messages are optimal. Optimal control message flow occurs when control messages can be given priority over data messages, which currently does not occur. By rate limiting when Piece messages, instead of the entire message stream, the only improvement is to maybe read a handful of control messages that have been jammed between the current Piece, and the next, the improvement drowned out by the sheer size of data messages. The implementation is also much more complex.

Note however the situation is reversed with uploading, which is something we control. There we can improve the latency of control messages to the remote peer, by not filling our outbound queue too quickly, or somehow prioritizing the control messages without invalidating the order of the messages we send. We don't want to rate limit the entire upload stream there, as control messages have critical impact on future uploads and downloads.

Simplicity and performance are the highest goals here.

Please sign in to comment.