Skip to content

Commit

Permalink
Merge pull request #1942 from STEllAR-GROUP/fixing_1941
Browse files Browse the repository at this point in the history
Making parcel coalescing functional
  • Loading branch information
sithhell committed Jan 12, 2016
2 parents c1deb17 + 01e3359 commit a48312d
Show file tree
Hide file tree
Showing 24 changed files with 948 additions and 125 deletions.
19 changes: 13 additions & 6 deletions CMakeLists.txt
Expand Up @@ -579,9 +579,19 @@ if(HPX_WITH_RUN_MAIN_EVERYWHERE)
endif()

# Options for our plugins
hpx_option(HPX_WITH_COMPRESSION_BZIP2 BOOL "Enable bzip2 compression for parcel data (default: OFF)." OFF ADVANCED)
hpx_option(HPX_WITH_COMPRESSION_SNAPPY BOOL "Enable snappy compression for parcel data (default: OFF)." OFF ADVANCED)
hpx_option(HPX_WITH_COMPRESSION_ZLIB BOOL "Enable zlib compression for parcel data (default: OFF)." OFF ADVANCED)
hpx_option(HPX_WITH_COMPRESSION_BZIP2 BOOL
"Enable bzip2 compression for parcel data (default: OFF)." OFF ADVANCED)
hpx_option(HPX_WITH_COMPRESSION_SNAPPY BOOL
"Enable snappy compression for parcel data (default: OFF)." OFF ADVANCED)
hpx_option(HPX_WITH_COMPRESSION_ZLIB BOOL
"Enable zlib compression for parcel data (default: OFF)." OFF ADVANCED)

# Parcel coalescing is used by the main HPX library, enable it always
hpx_option(HPX_WITH_PARCEL_COALESCING BOOL
"Enable the parcel coalescing plugin (default: ON)." ON ADVANCED)
if(HPX_WITH_PARCEL_COALESCING)
hpx_add_config_define(HPX_HAVE_PARCEL_COALESCING)
endif()

################################################################################
# Backwards compatibility options (edit for each release)
Expand Down Expand Up @@ -1331,9 +1341,6 @@ if(HPX_WITH_COMPRESSION_ZLIB)
hpx_add_config_define(HPX_HAVE_COMPRESSION_ZLIB)
endif()

# Parcel coalescing is used by the main HPX library, enable it always
hpx_add_config_define(HPX_HAVE_PARCEL_COALESCING)

################################################################################
# Documentation toolchain (DocBook, BoostBook, QuickBook, xsltproc)
################################################################################
Expand Down
2 changes: 1 addition & 1 deletion cmake/HPX_SetupTarget.cmake
Expand Up @@ -127,7 +127,7 @@ function(hpx_setup_target target)
CLEAN_DIRECT_OUTPUT 1
OUTPUT_NAME ${name})
if(target_PLUGIN)
set(plugin_name "HPX_PLUGIN_NAME=${name}")
set(plugin_name "HPX_PLUGIN_NAME=hpx_${name}")
endif()
set(nohpxinit TRUE)

Expand Down
2 changes: 2 additions & 0 deletions docs/manual/build_system/cmake_variables.qbk
Expand Up @@ -46,6 +46,7 @@ The options are split into these categories:
* [link build_system.cmake_variables.HPX_WITH_LOGGING HPX_WITH_LOGGING]
* [link build_system.cmake_variables.HPX_WITH_MALLOC HPX_WITH_MALLOC]
* [link build_system.cmake_variables.HPX_WITH_NATIVE_TLS HPX_WITH_NATIVE_TLS]
* [link build_system.cmake_variables.HPX_WITH_PARCEL_COALESCING HPX_WITH_PARCEL_COALESCING]
* [link build_system.cmake_variables.HPX_WITH_RUN_MAIN_EVERYWHERE HPX_WITH_RUN_MAIN_EVERYWHERE]
* [link build_system.cmake_variables.HPX_WITH_SECURITY HPX_WITH_SECURITY]
* [link build_system.cmake_variables.HPX_WITH_STATIC_LINKING HPX_WITH_STATIC_LINKING]
Expand All @@ -71,6 +72,7 @@ The options are split into these categories:
[[[#build_system.cmake_variables.HPX_WITH_LOGGING] `HPX_WITH_LOGGING:BOOL`][Build HPX with logging enabled (default: ON).]]
[[[#build_system.cmake_variables.HPX_WITH_MALLOC] `HPX_WITH_MALLOC:STRING`][Define which allocator should be linked in. Options are: system, tcmalloc, jemalloc, tbbmalloc, and custom (default is: tcmalloc)]]
[[[#build_system.cmake_variables.HPX_WITH_NATIVE_TLS] `HPX_WITH_NATIVE_TLS:BOOL`][Use native TLS support if available (default: ON)]]
[[[#build_system.cmake_variables.HPX_WITH_PARCEL_COALESCING] `HPX_WITH_PARCEL_COALESCING:BOOL`][Enable the parcel coalescing plugin (default: ON).]]
[[[#build_system.cmake_variables.HPX_WITH_RUN_MAIN_EVERYWHERE] `HPX_WITH_RUN_MAIN_EVERYWHERE:BOOL`][Run hpx_main by default on all localities (default: OFF).]]
[[[#build_system.cmake_variables.HPX_WITH_SECURITY] `HPX_WITH_SECURITY:BOOL`][Enable security support via libsodium.]]
[[[#build_system.cmake_variables.HPX_WITH_STATIC_LINKING] `HPX_WITH_STATIC_LINKING:BOOL`][Compile HPX statically linked libraries (Default: OFF)]]
Expand Down
2 changes: 1 addition & 1 deletion hpx/config/defaults.hpp
Expand Up @@ -34,7 +34,7 @@
#endif
#if !defined(HPX_DEFAULT_COMPONENT_PATH)
#define HPX_DEFAULT_COMPONENT_PATH \
hpx::util::find_prefixes("/lib/hpx") \
hpx::util::find_prefixes("/hpx") \
/**/
#endif

Expand Down
10 changes: 9 additions & 1 deletion hpx/performance_counters/performance_counter.hpp
Expand Up @@ -41,7 +41,7 @@ namespace hpx { namespace performance_counters

///////////////////////////////////////////////////////////////////////
future<counter_info> get_info() const;
counter_info get_info_sync(error_code& ec = throws);
counter_info get_info_sync(error_code& ec = throws) const;

future<counter_value> get_counter_value(bool reset = false);
future<counter_value> get_counter_value() const;
Expand All @@ -60,6 +60,10 @@ namespace hpx { namespace performance_counters
future<void> reset();
void reset_sync(error_code& ec = throws);

///////////////////////////////////////////////////////////////////////
future<std::string> get_name() const;
std::string get_name_sync() const;

private:
template <typename T>
static T extract_value(future<counter_value> && value, error_code& ec)
Expand Down Expand Up @@ -94,6 +98,10 @@ namespace hpx { namespace performance_counters
return get_counter_value_sync().get_value<T>(ec);
}
};

/// Return all counters matching the given name (with optional wildcards).
HPX_API_EXPORT std::vector<performance_counter> discover_counters(
std::string const& name, error_code& ec = throws);
}}

#endif
41 changes: 25 additions & 16 deletions hpx/plugins/parcel/message_buffer.hpp
Expand Up @@ -34,10 +34,13 @@ namespace hpx { namespace plugins { namespace parcel { namespace detail

message_buffer(std::size_t max_messages)
: max_messages_(max_messages)
{}
{
messages_.reserve(max_messages);
handlers_.reserve(max_messages);
}

message_buffer(message_buffer && rhs)
: dests_(std::move(rhs.dests_)),
: dest_(std::move(rhs.dest_)),
messages_(std::move(rhs.messages_)),
handlers_(std::move(rhs.handlers_)),
max_messages_(rhs.max_messages_)
Expand All @@ -47,31 +50,36 @@ namespace hpx { namespace plugins { namespace parcel { namespace detail
{
if (&rhs != this) {
max_messages_ = rhs.max_messages_;
dests_ = std::move(rhs.dests_);
dest_ = std::move(rhs.dest_);
messages_ = std::move(rhs.messages_);
handlers_ = std::move(rhs.handlers_);
}
return *this;
}

void operator()(parcelset::parcelport* set)
void operator()(parcelset::parcelport* pp)
{
if (!messages_.empty())
set->put_parcels(dests_, std::move(messages_), std::move(handlers_));
pp->put_parcels(dest_, std::move(messages_), std::move(handlers_));
}

message_buffer_append_state append(parcelset::locality const & dest,
parcelset::parcel p,
parcelset::write_handler_type f)
parcelset::parcel p, parcelset::write_handler_type f)
{
HPX_ASSERT(messages_.size() == handlers_.size());
HPX_ASSERT(dests_.size() == handlers_.size());

int result = normal;
if (messages_.empty())
{
HPX_ASSERT(handlers_.empty());

result = first_message;
dest_ = dest;
}
else
{
HPX_ASSERT(messages_.size() == handlers_.size());
HPX_ASSERT(dest_ == dest);
}

dests_.push_back(dest);
messages_.push_back(std::move(p));
handlers_.push_back(std::move(f));

Expand All @@ -84,21 +92,22 @@ namespace hpx { namespace plugins { namespace parcel { namespace detail
bool empty() const
{
HPX_ASSERT(messages_.size() == handlers_.size());
HPX_ASSERT(dests_.size() == handlers_.size());
return messages_.empty();
}

void clear()
{
dests_.clear();
dest_ = parcelset::locality();
messages_.clear();
handlers_.clear();

messages_.reserve(max_messages_);
handlers_.reserve(max_messages_);
}

std::size_t size() const
{
HPX_ASSERT(messages_.size() == handlers_.size());
HPX_ASSERT(dests_.size() == handlers_.size());
return messages_.size();
}

Expand All @@ -110,15 +119,15 @@ namespace hpx { namespace plugins { namespace parcel { namespace detail
void swap(message_buffer& o)
{
std::swap(max_messages_, o.max_messages_);
std::swap(dests_, o.dests_);
std::swap(dest_, o.dest_);
std::swap(messages_, o.messages_);
std::swap(handlers_, o.handlers_);
}

std::size_t capacity() const { return max_messages_; }

private:
std::vector<parcelset::locality> dests_;
parcelset::locality dest_;
std::vector<parcelset::parcel> messages_;
std::vector<parcelset::write_handler_type> handlers_;
std::size_t max_messages_;
Expand Down
2 changes: 1 addition & 1 deletion hpx/plugins/parcelport_factory.hpp
Expand Up @@ -64,7 +64,7 @@ namespace hpx { namespace plugins
fillini += std::string("[hpx.parcel.") + name + "]";
fillini += "name = " HPX_PLUGIN_STRING;
fillini += std::string("path = ") +
util::find_prefixes("/lib/hpx", HPX_PLUGIN_STRING);
util::find_prefixes("/hpx", HPX_PLUGIN_STRING);
fillini += "enable = 1";

std::string name_uc = boost::to_upper_copy(name);
Expand Down
5 changes: 5 additions & 0 deletions hpx/plugins/plugin_factory_base.hpp
Expand Up @@ -71,5 +71,10 @@ namespace hpx { namespace util { namespace plugin
HPX_REGISTER_PLUGIN_REGISTRY_MODULE() \
/**/

#define HPX_REGISTER_PLUGIN_MODULE_DYNAMIC() \
HPX_PLUGIN_EXPORT_LIST(HPX_PLUGIN_PLUGIN_PREFIX, factory); \
HPX_REGISTER_PLUGIN_REGISTRY_MODULE_DYNAMIC() \
/**/

#endif

2 changes: 1 addition & 1 deletion hpx/plugins/plugin_registry.hpp
Expand Up @@ -60,7 +60,7 @@ namespace hpx { namespace plugins
unique_plugin_name<plugin_registry>::call() + "]";
fillini += "name = " HPX_PLUGIN_STRING;
fillini += std::string("path = ") +
util::find_prefixes("/lib/hpx", HPX_PLUGIN_STRING);
util::find_prefixes("/hpx", HPX_PLUGIN_STRING);
fillini += "enabled = 1";

char const* more = traits::plugin_config_data<Plugin>::call();
Expand Down
4 changes: 4 additions & 0 deletions hpx/plugins/plugin_registry_base.hpp
Expand Up @@ -61,5 +61,9 @@ namespace hpx { namespace plugins
HPX_PLUGIN_EXPORT_LIST(HPX_PLUGIN_PLUGIN_PREFIX, plugin) \
/**/

#define HPX_REGISTER_PLUGIN_REGISTRY_MODULE_DYNAMIC() \
HPX_PLUGIN_EXPORT_LIST_DYNAMIC(HPX_PLUGIN_PLUGIN_PREFIX, plugin) \
/**/

#endif

2 changes: 1 addition & 1 deletion hpx/runtime/components/component_registry.hpp
Expand Up @@ -63,7 +63,7 @@ namespace hpx { namespace components
{
if (filepath.empty()) {
fillini += std::string("path = ") +
util::find_prefixes("/lib/hpx", HPX_COMPONENT_STRING);
util::find_prefixes("/hpx", HPX_COMPONENT_STRING);
}
else {
fillini += std::string("path = ") + filepath;
Expand Down
40 changes: 40 additions & 0 deletions hpx/runtime/parcelset/parcelhandler.hpp
Expand Up @@ -197,6 +197,46 @@ namespace hpx { namespace parcelset
&parcelhandler::invoke_write_handler, this, _1, _2));
}

/// A parcel is submitted for transport at the source locality site to
/// the parcel set of the locality with the put-parcel command
//
/// \note The function \a put_parcel() is asynchronous, the provided
/// function or function object gets invoked on completion of the send
/// operation or on any error.
///
/// \param p [in] The parcels to send.
/// \param f [in] The function objects to be invoked on
/// successful completion or on errors. The signature
/// of these function object are expected to be:
///
/// \code
/// void f (boost::system::error_code const& err, std::size_t );
/// \endcode
///
/// where \a err is the status code of the operation and
/// \a size is the number of successfully
/// transferred bytes.
void put_parcels(std::vector<parcel> p, std::vector<write_handler_type> f);

/// This put_parcel() function overload is asynchronous, but no
/// callback functor is provided by the user.
///
/// \note The function \a put_parcel() is asynchronous.
///
/// \param p [in, out] A reference to the parcel to send. The
/// parcel \a p will be modified in place, as it will
/// get set the resolved destination address and parcel
/// id (if not already set).
void put_parcels(std::vector<parcel> parcels)
{
using util::placeholders::_1;
using util::placeholders::_2;
std::vector<write_handler_type> handlers(parcels.size(),
util::bind(&parcelhandler::invoke_write_handler, this, _1, _2));

put_parcels(std::move(parcels), std::move(handlers));
}

double get_current_time() const
{
return util::high_resolution_timer::now();
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/parcelset/parcelport.hpp
Expand Up @@ -142,7 +142,7 @@ namespace hpx { namespace parcelset
/// void handler(boost::system::error_code const& err,
/// std::size_t bytes_written);
/// \endcode
virtual void put_parcels(std::vector<locality> dests,
virtual void put_parcels(locality const& dests,
std::vector<parcel> parcels,
std::vector<write_handler_type> handlers) = 0;

Expand Down

0 comments on commit a48312d

Please sign in to comment.