introduce a fast-path for the hash disk job
arvidn committed Jun 18, 2018
1 parent 81ce7aa commit 3150a81
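
The change adds a fast path to the piece-hashing disk job: when none of the remaining blocks of a piece are in the block cache, the job now allocates buffers for all of them and issues a single scatter/gather readv() against the storage, instead of reading and hashing one block at a time. Below is a small, standalone sketch of that I/O pattern using plain POSIX preadv(); the file name, block size and block count are made-up values, and none of this is libtorrent code.

// read-all-blocks-in-one-call sketch (illustrative only, not libtorrent code)
#include <sys/uio.h>
#include <fcntl.h>
#include <unistd.h>
#include <vector>
#include <cstdio>

int main()
{
	int const block_size = 16 * 1024;   // assumed block size
	int const blocks_left = 8;          // assumed number of blocks still to hash

	int const fd = open("piece.bin", O_RDONLY); // hypothetical input file
	if (fd < 0) { std::perror("open"); return 1; }

	// one buffer per block, all handed to the kernel in a single call
	std::vector<std::vector<char>> bufs(blocks_left, std::vector<char>(block_size));
	std::vector<iovec> iov(blocks_left);
	for (int i = 0; i < blocks_left; ++i)
	{
		iov[i].iov_base = bufs[i].data();
		iov[i].iov_len = block_size;
	}

	// fast path: a single preadv() replaces blocks_left separate pread() calls,
	// which is what the per-block slow path would have issued
	ssize_t const ret = preadv(fd, iov.data(), blocks_left, 0);
	std::printf("read %zd bytes in one call\n", ret);
	close(fd);
	return ret < 0 ? 1 : 0;
}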
Showing 2 changed files with 153 additions and 76 deletions.
1 change: 1 addition & 0 deletions ChangeLog
@@ -1,4 +1,5 @@

* fixed disk I/O performance of checking hashes and creating torrents
* fix race condition during part_file export
* fix part_file open mode compatibility test
* fixed race condition in random number generator
228 changes: 152 additions & 76 deletions src/disk_io_thread.cpp
@@ -2374,8 +2374,28 @@ namespace libtorrent
}
partial_hash* ph = pe->hash;

int block_size = m_disk_cache.block_size();
int blocks_in_piece = (piece_size + block_size - 1) / block_size;
int const block_size = m_disk_cache.block_size();
int const blocks_in_piece = (piece_size + block_size - 1) / block_size;

// we don't care about anything to the left of ph->offset
// since those blocks have already been hashed.
// we just care about [first_block, first_block + blocks_left]
int const first_block = ph->offset / block_size;
int const blocks_left = blocks_in_piece - first_block;

// ph->offset
// | first_block
// | |
// v v
// +---+---+---+---+---+---+
// | | | | | | |
// +---+---+---+---+---+---+
//
// \-----------/
// blocks_left
//
// \-----------------------/
// blocks_in_piece
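// a hypothetical example (not part of this change): with 16 KiB blocks,
// a 64 KiB piece and ph->offset == 32 KiB:
//   blocks_in_piece = (65536 + 16383) / 16384 = 4
//   first_block     = 32768 / 16384           = 2
//   blocks_left     = 4 - 2                   = 2
// i.e. only the last two blocks still need to be read and hashed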

// keep track of which blocks we have locked by incrementing
// their refcounts. This is used to decrement only these blocks
@@ -2387,13 +2407,13 @@
// increment the refcounts of all
// blocks up front, and then hash them without holding the lock
TORRENT_PIECE_ASSERT(ph->offset % block_size == 0, pe);
for (int i = ph->offset / block_size; i < blocks_in_piece; ++i)
for (int i = 0; i < blocks_left; ++i)
{
// is the block not in the cache?
if (pe->blocks[i].buf == NULL) continue;
if (pe->blocks[first_block + i].buf == NULL) continue;

// if we fail to lock the block, it's no longer in the cache
if (m_disk_cache.inc_block_refcount(pe, i, block_cache::ref_hashing) == false)
if (m_disk_cache.inc_block_refcount(pe, first_block + i, block_cache::ref_hashing) == false)
continue;

locked_blocks[num_locked_blocks++] = i;
@@ -2410,96 +2430,152 @@

l.unlock();

bool slow_path = true;
int ret = 0;
int next_locked_block = 0;
for (int i = offset / block_size; i < blocks_in_piece; ++i)
{
file::iovec_t iov;
iov.iov_len = (std::min)(block_size, piece_size - offset);

if (next_locked_block < num_locked_blocks
&& locked_blocks[next_locked_block] == i)
if (num_locked_blocks == 0)
{
// this is the fast path where we don't have any blocks in the cache.
// We'll need to read all the remaining blocks from disk
file::iovec_t* iov = TORRENT_ALLOCA(file::iovec_t, blocks_left);
ret = m_disk_cache.allocate_iovec(iov, blocks_left);
if (ret >= 0)
{
++next_locked_block;
TORRENT_PIECE_ASSERT(pe->blocks[i].buf, pe);
TORRENT_PIECE_ASSERT(offset == i * block_size, pe);
offset += iov.iov_len;
ph->h.update(pe->blocks[i].buf, iov.iov_len);
// this is the offset that's aligned to block boundaries
boost::int64_t adjusted_offset = j->d.io.offset & ~(block_size-1);

// if this is the last piece, adjust the size of the
// last buffer to match up
iov[blocks_left-1].iov_len = std::min(int(piece_size - adjusted_offset)
- (blocks_left - 1) * block_size, block_size);
TORRENT_ASSERT(iov[blocks_left-1].iov_len > 0);
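// hypothetical numbers (not part of this change): for a short last piece of
// 40000 bytes with block_size == 16384, adjusted_offset == 0 and
// blocks_left == 3, the last buffer is trimmed to
// min(40000 - 2 * 16384, 16384) = 7232 bytes, while the other buffers keep
// the full block_size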

time_point const start_time = clock_type::now();
ret = j->storage->get_storage_impl()->readv(iov, blocks_left
, j->piece, offset, file_flags, j->error);

if (ret >= 0)
{
boost::uint32_t const read_time = total_microseconds(clock_type::now() - start_time);

m_stats_counters.inc_stats_counter(counters::num_read_back, blocks_left);
m_stats_counters.inc_stats_counter(counters::num_blocks_read, blocks_left);
m_stats_counters.inc_stats_counter(counters::num_read_ops);
m_stats_counters.inc_stats_counter(counters::disk_read_time, read_time);
m_stats_counters.inc_stats_counter(counters::disk_job_time, read_time);

for (int i = 0; i < blocks_left; ++i)
{
offset += iov[i].iov_len;
ph->h.update(static_cast<char const*>(iov[i].iov_base), iov[i].iov_len);
}
slow_path = false;

l.lock();
m_disk_cache.insert_blocks(pe, first_block, iov, blocks_left, j);
l.unlock();
}
else
{
TORRENT_ASSERT(j->error.ec && j->error.operation != 0);
m_disk_cache.free_iovec(iov, blocks_left);
}
}
else
}

if (slow_path)
{
ret = 0;
int next_locked_block = 0;
for (int i = 0; i < blocks_left; ++i)
{
iov.iov_base = m_disk_cache.allocate_buffer("hashing");
file::iovec_t iov;
iov.iov_len = (std::min)(block_size, piece_size - offset);

if (iov.iov_base == NULL)
if (next_locked_block < num_locked_blocks
&& locked_blocks[next_locked_block] == i)
{
l.lock();
// TODO: introduce a holder class that automatically increments
// and decrements the piece_refcount
++next_locked_block;
TORRENT_PIECE_ASSERT(pe->blocks[first_block + i].buf, pe);
TORRENT_PIECE_ASSERT(offset == (first_block + i) * block_size, pe);
offset += iov.iov_len;
ph->h.update(pe->blocks[first_block + i].buf, iov.iov_len);
}
else
{
iov.iov_base = m_disk_cache.allocate_buffer("hashing");

// decrement the refcounts of the blocks we just hashed
for (int k = 0; k < num_locked_blocks; ++k)
m_disk_cache.dec_block_refcount(pe, locked_blocks[k], block_cache::ref_hashing);
if (iov.iov_base == NULL)
{
l.lock();
// TODO: introduce a holder class that automatically increments
// and decrements the piece_refcount

--pe->piece_refcount;
pe->hashing = false;
delete pe->hash;
pe->hash = NULL;
// decrement the refcounts of the blocks we just hashed
for (int k = 0; k < num_locked_blocks; ++k)
m_disk_cache.dec_block_refcount(pe, first_block + locked_blocks[k], block_cache::ref_hashing);

m_disk_cache.maybe_free_piece(pe);
--pe->piece_refcount;
pe->hashing = false;
delete pe->hash;
pe->hash = NULL;

j->error.ec = errors::no_memory;
j->error.operation = storage_error::alloc_cache_piece;
return -1;
}
m_disk_cache.maybe_free_piece(pe);

DLOG("do_hash: reading (piece: %d block: %d)\n", int(pe->piece), i);
j->error.ec = errors::no_memory;
j->error.operation = storage_error::alloc_cache_piece;
return -1;
}

time_point const start_time = clock_type::now();
DLOG("do_hash: reading (piece: %d block: %d)\n", int(pe->piece), first_block + i);

TORRENT_PIECE_ASSERT(offset == i * block_size, pe);
ret = j->storage->get_storage_impl()->readv(&iov, 1, j->piece
, offset, file_flags, j->error);
time_point const start_time = clock_type::now();

if (ret < 0)
{
TORRENT_ASSERT(j->error.ec && j->error.operation != 0);
m_disk_cache.free_buffer(static_cast<char*>(iov.iov_base));
l.lock();
break;
}
TORRENT_PIECE_ASSERT(offset == (first_block + i) * block_size, pe);
ret = j->storage->get_storage_impl()->readv(&iov, 1, j->piece
, offset, file_flags, j->error);

// treat a short read as an error. The hash will be invalid, the
// block cannot be cached and the main thread should skip the rest
// of this file
if (ret != iov.iov_len)
{
ret = -1;
j->error.ec.assign(boost::asio::error::eof
, boost::asio::error::get_misc_category());
j->error.operation = storage_error::read;
m_disk_cache.free_buffer(static_cast<char*>(iov.iov_base));
l.lock();
break;
}
if (ret < 0)
{
TORRENT_ASSERT(j->error.ec && j->error.operation != 0);
m_disk_cache.free_buffer(static_cast<char*>(iov.iov_base));
l.lock();
break;
}

if (!j->error.ec)
{
boost::uint32_t const read_time = total_microseconds(clock_type::now() - start_time);
// treat a short read as an error. The hash will be invalid, the
// block cannot be cached and the main thread should skip the rest
// of this file
if (ret != iov.iov_len)
{
ret = -1;
j->error.ec.assign(boost::asio::error::eof
, boost::asio::error::get_misc_category());
j->error.operation = storage_error::read;
m_disk_cache.free_buffer(static_cast<char*>(iov.iov_base));
l.lock();
break;
}

m_stats_counters.inc_stats_counter(counters::num_read_back);
m_stats_counters.inc_stats_counter(counters::num_blocks_read);
m_stats_counters.inc_stats_counter(counters::num_read_ops);
m_stats_counters.inc_stats_counter(counters::disk_read_time, read_time);
m_stats_counters.inc_stats_counter(counters::disk_job_time, read_time);
}
if (!j->error.ec)
{
boost::uint32_t const read_time = total_microseconds(clock_type::now() - start_time);

TORRENT_PIECE_ASSERT(offset == i * block_size, pe);
offset += iov.iov_len;
ph->h.update(static_cast<char const*>(iov.iov_base), iov.iov_len);
m_stats_counters.inc_stats_counter(counters::num_read_back);
m_stats_counters.inc_stats_counter(counters::num_blocks_read);
m_stats_counters.inc_stats_counter(counters::num_read_ops);
m_stats_counters.inc_stats_counter(counters::disk_read_time, read_time);
m_stats_counters.inc_stats_counter(counters::disk_job_time, read_time);
}

l.lock();
m_disk_cache.insert_blocks(pe, i, &iov, 1, j);
l.unlock();
TORRENT_PIECE_ASSERT(offset == (first_block + i) * block_size, pe);
offset += iov.iov_len;
ph->h.update(static_cast<char const*>(iov.iov_base), iov.iov_len);

l.lock();
m_disk_cache.insert_blocks(pe, (first_block + i), &iov, 1, j);
l.unlock();
}
}
}

@@ -2510,7 +2586,7 @@ namespace libtorrent

// decrement the refcounts of the blocks we just hashed
for (int i = 0; i < num_locked_blocks; ++i)
m_disk_cache.dec_block_refcount(pe, locked_blocks[i], block_cache::ref_hashing);
m_disk_cache.dec_block_refcount(pe, first_block + locked_blocks[i], block_cache::ref_hashing);

--pe->piece_refcount;

