Skip to content

Commit

Permalink
Fixed #1001: Zero copy serialization raises assert
Browse files Browse the repository at this point in the history
  • Loading branch information
hkaiser committed Nov 9, 2013
1 parent 90d7a51 commit ca8a96a
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 11 deletions.
61 changes: 51 additions & 10 deletions hpx/runtime/parcelset/server/tcp/parcelport_server_connection.hpp
Expand Up @@ -167,6 +167,9 @@ namespace hpx { namespace parcelset { namespace server { namespace tcp
static_cast<std::size_t>(
static_cast<boost::uint32_t>(num_chunks_.second));

void (parcelport_connection::*f)(boost::system::error_code const&,
boost::tuple<Handler>) = 0;

if (num_zero_copy_chunks != 0) {
in_transmission_chunks_.resize(static_cast<std::size_t>(
num_zero_copy_chunks + num_non_zero_copy_chunks));
Expand All @@ -178,19 +181,55 @@ namespace hpx { namespace parcelset { namespace server { namespace tcp
in_buffer_->resize(static_cast<std::size_t>(inbound_size));
buffers.push_back(boost::asio::buffer(*in_buffer_));

// add appropriately sized chunk buffers for the zero-copy data
in_chunks_->resize(num_zero_copy_chunks);
for (std::size_t i = 0; i != num_zero_copy_chunks; ++i)
{
std::size_t chunk_size = in_transmission_chunks_[i].second;
(*in_chunks_)[i].resize(chunk_size);
buffers.push_back(boost::asio::buffer((*in_chunks_)[i]));
}
// Start an asynchronous call to receive the data.
f = &parcelport_connection::handle_read_chunk_data<Handler>;
}
else {
// add main buffer holding data which was serialized normally
in_buffer_->resize(static_cast<std::size_t>(inbound_size));
buffers.push_back(boost::asio::buffer(*in_buffer_));

// Start an asynchronous call to receive the data.
f = &parcelport_connection::handle_read_data<Handler>;
}

#if defined(__linux) || defined(linux) || defined(__linux__)
boost::asio::detail::socket_option::boolean<
IPPROTO_TCP, TCP_QUICKACK> quickack(true);
socket_.set_option(quickack);
#endif
boost::asio::async_read(socket_, buffers,
boost::bind(f, shared_from_this(),
boost::asio::placeholders::error, handler));
}
}

/// Handle a completed read of message data.
template <typename Handler>
void handle_read_chunk_data(boost::system::error_code const& e,
boost::tuple<Handler> handler)
{
if (e) {
boost::get<0>(handler)(e);

// Issue a read operation to read the next parcel.
async_read(boost::get<0>(handler));
}
else {
// receive buffers
std::vector<boost::asio::mutable_buffer> buffers;

// add appropriately sized chunk buffers for the zero-copy data
std::size_t num_zero_copy_chunks =
static_cast<std::size_t>(
static_cast<boost::uint32_t>(num_chunks_.first));

in_chunks_->resize(num_zero_copy_chunks);
for (std::size_t i = 0; i != num_zero_copy_chunks; ++i)
{
std::size_t chunk_size = in_transmission_chunks_[i].second;
(*in_chunks_)[i].resize(chunk_size);
buffers.push_back(boost::asio::buffer((*in_chunks_)[i].data(), chunk_size));
}

// Start an asynchronous call to receive the data.
Expand Down Expand Up @@ -245,7 +284,7 @@ namespace hpx { namespace parcelset { namespace server { namespace tcp

if (num_zero_copy_chunks != 0) {
// decode chunk information
boost::shared_ptr<std::vector<std::vector<char> > > in_chunks(in_chunks_);
// boost::shared_ptr<std::vector<std::vector<char> > > in_chunks(in_chunks_);
boost::shared_ptr<std::vector<util::serialization_chunk> > chunks(
boost::make_shared<std::vector<util::serialization_chunk> >());

Expand All @@ -267,7 +306,9 @@ namespace hpx { namespace parcelset { namespace server { namespace tcp
}

std::size_t index = 0;
for (std::size_t i = num_zero_copy_chunks; i != num_non_zero_copy_chunks; ++i)
for (std::size_t i = num_zero_copy_chunks;
i != num_zero_copy_chunks + num_non_zero_copy_chunks;
++i)
{
transmission_chunk_type& c = in_transmission_chunks_[i];
boost::uint64_t first = c.first, second = c.second;
Expand Down
2 changes: 1 addition & 1 deletion src/runtime/parcelset/tcp/parcelport_tcp.cpp
Expand Up @@ -804,7 +804,7 @@ namespace hpx { namespace parcelset { namespace tcp
return;
}

BOOST_ASSERT(client_connection );
BOOST_ASSERT(client_connection.get() != 0);
client_connection->set_parcel(p);
client_connection->async_write(early_write_handler,
early_pending_parcel_handler);
Expand Down
3 changes: 3 additions & 0 deletions tests/regressions/util/CMakeLists.txt
Expand Up @@ -10,6 +10,7 @@ set(tests
function_serialization_728
protect_with_nullary_pfo
tuple_serialization_803
zero_copy_parcels_1001
)

set(function_argument_FLAGS DEPENDENCIES iostreams_component)
Expand All @@ -29,6 +30,8 @@ if(HPX_USE_PARCEL_COALESCING)
endif()
set(function_serialization_728_FLAGS DEPENDENCIES ${function_serialization_728_dependencies})

set(zero_copy_parcels_1001_PARAMETERS LOCALITIES 2)

foreach(test ${tests})
set(sources
${test}.cpp)
Expand Down
78 changes: 78 additions & 0 deletions tests/regressions/util/zero_copy_parcels_1001.cpp
@@ -0,0 +1,78 @@
// Copyright (c) 2007-2013 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

// This test verifies that issue #1001 is resolved (Zero copy serialization raises assert).

#include <hpx/hpx_init.hpp>
#include <hpx/include/util.hpp>
#include <hpx/include/actions.hpp>
#include <hpx/include/async.hpp>
#include <hpx/include/lcos.hpp>
#include <hpx/util/serialize_buffer.hpp>
#include <hpx/util/lightweight_test.hpp>

#include <algorithm>

///////////////////////////////////////////////////////////////////////////////
hpx::util::serialize_buffer<int> test(hpx::util::serialize_buffer<int> const& b)
{
return b;
}
HPX_PLAIN_ACTION(test, test_action)

struct inc
{
inc() : cnt_(0) {}

int operator()()
{
return cnt_++;
}

int cnt_;
};

///////////////////////////////////////////////////////////////////////////////
int hpx_main()
{
std::vector<hpx::naming::id_type> localities = hpx::find_remote_localities();

if (localities.empty())
{
HPX_TEST_MSG(!localities.empty(), "This test must be run on more than one locality");
}
else
{
test_action act;

std::size_t size = 1;
for (std::size_t i = 0; i != 14; ++i)
{
std::vector<int> data;
data.resize(size << i);

std::generate(data.begin(), data.end(), inc());

hpx::util::serialize_buffer<int> buffer(data.data(), data.size(),
hpx::util::serialize_buffer<int>::reference);

hpx::util::serialize_buffer<int> result = act(localities[0], buffer);

HPX_TEST(std::equal(data.begin(), data.end(), result.data()));
}
}

return hpx::finalize();
}

///////////////////////////////////////////////////////////////////////////////
int main(int argc, char* argv[])
{
// Initialize and run HPX
HPX_TEST_EQ(hpx::init(argc, argv), 0);

return hpx::util::report_errors();
}

0 comments on commit ca8a96a

Please sign in to comment.