Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making parcel coalescing functional #1942

Merged
merged 6 commits into from Jan 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 13 additions & 6 deletions CMakeLists.txt
Expand Up @@ -579,9 +579,19 @@ if(HPX_WITH_RUN_MAIN_EVERYWHERE)
endif()

# Options for our plugins
hpx_option(HPX_WITH_COMPRESSION_BZIP2 BOOL "Enable bzip2 compression for parcel data (default: OFF)." OFF ADVANCED)
hpx_option(HPX_WITH_COMPRESSION_SNAPPY BOOL "Enable snappy compression for parcel data (default: OFF)." OFF ADVANCED)
hpx_option(HPX_WITH_COMPRESSION_ZLIB BOOL "Enable zlib compression for parcel data (default: OFF)." OFF ADVANCED)
hpx_option(HPX_WITH_COMPRESSION_BZIP2 BOOL
"Enable bzip2 compression for parcel data (default: OFF)." OFF ADVANCED)
hpx_option(HPX_WITH_COMPRESSION_SNAPPY BOOL
"Enable snappy compression for parcel data (default: OFF)." OFF ADVANCED)
hpx_option(HPX_WITH_COMPRESSION_ZLIB BOOL
"Enable zlib compression for parcel data (default: OFF)." OFF ADVANCED)

# Parcel coalescing is used by the main HPX library, enable it always
hpx_option(HPX_WITH_PARCEL_COALESCING BOOL
"Enable the parcel coalescing plugin (default: ON)." ON ADVANCED)
if(HPX_WITH_PARCEL_COALESCING)
hpx_add_config_define(HPX_HAVE_PARCEL_COALESCING)
endif()

################################################################################
# Backwards compatibility options (edit for each release)
Expand Down Expand Up @@ -1328,9 +1338,6 @@ if(HPX_WITH_COMPRESSION_ZLIB)
hpx_add_config_define(HPX_HAVE_COMPRESSION_ZLIB)
endif()

# Parcel coalescing is used by the main HPX library, enable it always
hpx_add_config_define(HPX_HAVE_PARCEL_COALESCING)

################################################################################
# Documentation toolchain (DocBook, BoostBook, QuickBook, xsltproc)
################################################################################
Expand Down
2 changes: 1 addition & 1 deletion cmake/HPX_SetupTarget.cmake
Expand Up @@ -127,7 +127,7 @@ function(hpx_setup_target target)
CLEAN_DIRECT_OUTPUT 1
OUTPUT_NAME ${name})
if(target_PLUGIN)
set(plugin_name "HPX_PLUGIN_NAME=${name}")
set(plugin_name "HPX_PLUGIN_NAME=hpx_${name}")
endif()
set(nohpxinit TRUE)

Expand Down
2 changes: 2 additions & 0 deletions docs/manual/build_system/cmake_variables.qbk
Expand Up @@ -46,6 +46,7 @@ The options are split into these categories:
* [link build_system.cmake_variables.HPX_WITH_LOGGING HPX_WITH_LOGGING]
* [link build_system.cmake_variables.HPX_WITH_MALLOC HPX_WITH_MALLOC]
* [link build_system.cmake_variables.HPX_WITH_NATIVE_TLS HPX_WITH_NATIVE_TLS]
* [link build_system.cmake_variables.HPX_WITH_PARCEL_COALESCING HPX_WITH_PARCEL_COALESCING]
* [link build_system.cmake_variables.HPX_WITH_RUN_MAIN_EVERYWHERE HPX_WITH_RUN_MAIN_EVERYWHERE]
* [link build_system.cmake_variables.HPX_WITH_SECURITY HPX_WITH_SECURITY]
* [link build_system.cmake_variables.HPX_WITH_STATIC_LINKING HPX_WITH_STATIC_LINKING]
Expand All @@ -71,6 +72,7 @@ The options are split into these categories:
[[[#build_system.cmake_variables.HPX_WITH_LOGGING] `HPX_WITH_LOGGING:BOOL`][Build HPX with logging enabled (default: ON).]]
[[[#build_system.cmake_variables.HPX_WITH_MALLOC] `HPX_WITH_MALLOC:STRING`][Define which allocator should be linked in. Options are: system, tcmalloc, jemalloc, tbbmalloc, and custom (default is: tcmalloc)]]
[[[#build_system.cmake_variables.HPX_WITH_NATIVE_TLS] `HPX_WITH_NATIVE_TLS:BOOL`][Use native TLS support if available (default: ON)]]
[[[#build_system.cmake_variables.HPX_WITH_PARCEL_COALESCING] `HPX_WITH_PARCEL_COALESCING:BOOL`][Enable the parcel coalescing plugin (default: ON).]]
[[[#build_system.cmake_variables.HPX_WITH_RUN_MAIN_EVERYWHERE] `HPX_WITH_RUN_MAIN_EVERYWHERE:BOOL`][Run hpx_main by default on all localities (default: OFF).]]
[[[#build_system.cmake_variables.HPX_WITH_SECURITY] `HPX_WITH_SECURITY:BOOL`][Enable security support via libsodium.]]
[[[#build_system.cmake_variables.HPX_WITH_STATIC_LINKING] `HPX_WITH_STATIC_LINKING:BOOL`][Compile HPX statically linked libraries (Default: OFF)]]
Expand Down
2 changes: 1 addition & 1 deletion hpx/config/defaults.hpp
Expand Up @@ -34,7 +34,7 @@
#endif
#if !defined(HPX_DEFAULT_COMPONENT_PATH)
#define HPX_DEFAULT_COMPONENT_PATH \
hpx::util::find_prefixes("/lib/hpx") \
hpx::util::find_prefixes("/hpx") \
/**/
#endif

Expand Down
10 changes: 9 additions & 1 deletion hpx/performance_counters/performance_counter.hpp
Expand Up @@ -41,7 +41,7 @@ namespace hpx { namespace performance_counters

///////////////////////////////////////////////////////////////////////
future<counter_info> get_info() const;
counter_info get_info_sync(error_code& ec = throws);
counter_info get_info_sync(error_code& ec = throws) const;

future<counter_value> get_counter_value(bool reset = false);
future<counter_value> get_counter_value() const;
Expand All @@ -60,6 +60,10 @@ namespace hpx { namespace performance_counters
future<void> reset();
void reset_sync(error_code& ec = throws);

///////////////////////////////////////////////////////////////////////
future<std::string> get_name() const;
std::string get_name_sync() const;

private:
template <typename T>
static T extract_value(future<counter_value> && value, error_code& ec)
Expand Down Expand Up @@ -94,6 +98,10 @@ namespace hpx { namespace performance_counters
return get_counter_value_sync().get_value<T>(ec);
}
};

/// Return all counters matching the given name (with optional wildcards).
HPX_API_EXPORT std::vector<performance_counter> discover_counters(
std::string const& name, error_code& ec = throws);
}}

#endif
41 changes: 25 additions & 16 deletions hpx/plugins/parcel/message_buffer.hpp
Expand Up @@ -34,10 +34,13 @@ namespace hpx { namespace plugins { namespace parcel { namespace detail

message_buffer(std::size_t max_messages)
: max_messages_(max_messages)
{}
{
messages_.reserve(max_messages);
handlers_.reserve(max_messages);
}

message_buffer(message_buffer && rhs)
: dests_(std::move(rhs.dests_)),
: dest_(std::move(rhs.dest_)),
messages_(std::move(rhs.messages_)),
handlers_(std::move(rhs.handlers_)),
max_messages_(rhs.max_messages_)
Expand All @@ -47,31 +50,36 @@ namespace hpx { namespace plugins { namespace parcel { namespace detail
{
if (&rhs != this) {
max_messages_ = rhs.max_messages_;
dests_ = std::move(rhs.dests_);
dest_ = std::move(rhs.dest_);
messages_ = std::move(rhs.messages_);
handlers_ = std::move(rhs.handlers_);
}
return *this;
}

void operator()(parcelset::parcelport* set)
void operator()(parcelset::parcelport* pp)
{
if (!messages_.empty())
set->put_parcels(dests_, std::move(messages_), std::move(handlers_));
pp->put_parcels(dest_, std::move(messages_), std::move(handlers_));
}

message_buffer_append_state append(parcelset::locality const & dest,
parcelset::parcel p,
parcelset::write_handler_type f)
parcelset::parcel p, parcelset::write_handler_type f)
{
HPX_ASSERT(messages_.size() == handlers_.size());
HPX_ASSERT(dests_.size() == handlers_.size());

int result = normal;
if (messages_.empty())
{
HPX_ASSERT(handlers_.empty());

result = first_message;
dest_ = dest;
}
else
{
HPX_ASSERT(messages_.size() == handlers_.size());
HPX_ASSERT(dest_ == dest);
}

dests_.push_back(dest);
messages_.push_back(std::move(p));
handlers_.push_back(std::move(f));

Expand All @@ -84,21 +92,22 @@ namespace hpx { namespace plugins { namespace parcel { namespace detail
bool empty() const
{
HPX_ASSERT(messages_.size() == handlers_.size());
HPX_ASSERT(dests_.size() == handlers_.size());
return messages_.empty();
}

void clear()
{
dests_.clear();
dest_ = parcelset::locality();
messages_.clear();
handlers_.clear();

messages_.reserve(max_messages_);
handlers_.reserve(max_messages_);
}

std::size_t size() const
{
HPX_ASSERT(messages_.size() == handlers_.size());
HPX_ASSERT(dests_.size() == handlers_.size());
return messages_.size();
}

Expand All @@ -110,15 +119,15 @@ namespace hpx { namespace plugins { namespace parcel { namespace detail
void swap(message_buffer& o)
{
std::swap(max_messages_, o.max_messages_);
std::swap(dests_, o.dests_);
std::swap(dest_, o.dest_);
std::swap(messages_, o.messages_);
std::swap(handlers_, o.handlers_);
}

std::size_t capacity() const { return max_messages_; }

private:
std::vector<parcelset::locality> dests_;
parcelset::locality dest_;
std::vector<parcelset::parcel> messages_;
std::vector<parcelset::write_handler_type> handlers_;
std::size_t max_messages_;
Expand Down
2 changes: 1 addition & 1 deletion hpx/plugins/parcelport_factory.hpp
Expand Up @@ -64,7 +64,7 @@ namespace hpx { namespace plugins
fillini += std::string("[hpx.parcel.") + name + "]";
fillini += "name = " HPX_PLUGIN_STRING;
fillini += std::string("path = ") +
util::find_prefixes("/lib/hpx", HPX_PLUGIN_STRING);
util::find_prefixes("/hpx", HPX_PLUGIN_STRING);
fillini += "enable = 1";

std::string name_uc = boost::to_upper_copy(name);
Expand Down
5 changes: 5 additions & 0 deletions hpx/plugins/plugin_factory_base.hpp
Expand Up @@ -71,5 +71,10 @@ namespace hpx { namespace util { namespace plugin
HPX_REGISTER_PLUGIN_REGISTRY_MODULE() \
/**/

#define HPX_REGISTER_PLUGIN_MODULE_DYNAMIC() \
HPX_PLUGIN_EXPORT_LIST(HPX_PLUGIN_PLUGIN_PREFIX, factory); \
HPX_REGISTER_PLUGIN_REGISTRY_MODULE_DYNAMIC() \
/**/

#endif

2 changes: 1 addition & 1 deletion hpx/plugins/plugin_registry.hpp
Expand Up @@ -60,7 +60,7 @@ namespace hpx { namespace plugins
unique_plugin_name<plugin_registry>::call() + "]";
fillini += "name = " HPX_PLUGIN_STRING;
fillini += std::string("path = ") +
util::find_prefixes("/lib/hpx", HPX_PLUGIN_STRING);
util::find_prefixes("/hpx", HPX_PLUGIN_STRING);
fillini += "enabled = 1";

char const* more = traits::plugin_config_data<Plugin>::call();
Expand Down
4 changes: 4 additions & 0 deletions hpx/plugins/plugin_registry_base.hpp
Expand Up @@ -61,5 +61,9 @@ namespace hpx { namespace plugins
HPX_PLUGIN_EXPORT_LIST(HPX_PLUGIN_PLUGIN_PREFIX, plugin) \
/**/

#define HPX_REGISTER_PLUGIN_REGISTRY_MODULE_DYNAMIC() \
HPX_PLUGIN_EXPORT_LIST_DYNAMIC(HPX_PLUGIN_PLUGIN_PREFIX, plugin) \
/**/

#endif

2 changes: 1 addition & 1 deletion hpx/runtime/components/component_registry.hpp
Expand Up @@ -63,7 +63,7 @@ namespace hpx { namespace components
{
if (filepath.empty()) {
fillini += std::string("path = ") +
util::find_prefixes("/lib/hpx", HPX_COMPONENT_STRING);
util::find_prefixes("/hpx", HPX_COMPONENT_STRING);
}
else {
fillini += std::string("path = ") + filepath;
Expand Down
40 changes: 40 additions & 0 deletions hpx/runtime/parcelset/parcelhandler.hpp
Expand Up @@ -197,6 +197,46 @@ namespace hpx { namespace parcelset
&parcelhandler::invoke_write_handler, this, _1, _2));
}

/// A parcel is submitted for transport at the source locality site to
/// the parcel set of the locality with the put-parcel command
//
/// \note The function \a put_parcel() is asynchronous, the provided
/// function or function object gets invoked on completion of the send
/// operation or on any error.
///
/// \param p [in] The parcels to send.
/// \param f [in] The function objects to be invoked on
/// successful completion or on errors. The signature
/// of these function object are expected to be:
///
/// \code
/// void f (boost::system::error_code const& err, std::size_t );
/// \endcode
///
/// where \a err is the status code of the operation and
/// \a size is the number of successfully
/// transferred bytes.
void put_parcels(std::vector<parcel> p, std::vector<write_handler_type> f);

/// This put_parcel() function overload is asynchronous, but no
/// callback functor is provided by the user.
///
/// \note The function \a put_parcel() is asynchronous.
///
/// \param p [in, out] A reference to the parcel to send. The
/// parcel \a p will be modified in place, as it will
/// get set the resolved destination address and parcel
/// id (if not already set).
void put_parcels(std::vector<parcel> parcels)
{
using util::placeholders::_1;
using util::placeholders::_2;
std::vector<write_handler_type> handlers(parcels.size(),
util::bind(&parcelhandler::invoke_write_handler, this, _1, _2));

put_parcels(std::move(parcels), std::move(handlers));
}

double get_current_time() const
{
return util::high_resolution_timer::now();
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/parcelset/parcelport.hpp
Expand Up @@ -142,7 +142,7 @@ namespace hpx { namespace parcelset
/// void handler(boost::system::error_code const& err,
/// std::size_t bytes_written);
/// \endcode
virtual void put_parcels(std::vector<locality> dests,
virtual void put_parcels(locality const& dests,
std::vector<parcel> parcels,
std::vector<write_handler_type> handlers) = 0;

Expand Down