Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using libwsong time logger and fixing a performance issue for persistent stable get operations. #68

Merged
merged 7 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ if ( NOT DEFINED CMAKE_INSTALL_INCLUDEDIR )
set( CMAKE_INSTALL_INCLUDEDIR include )
endif ( )

set(ENABLE_MPROC 1)

find_package(mutils REQUIRED)
if(mutils_FOUND)
message(STATUS "Found mutils in ${mutils_INCLUDE_DIRS}")
Expand All @@ -56,6 +58,9 @@ find_package(nlohmann_json 3.9.1 REQUIRED)
# provides the import target rpclib::rpc
find_package(rpclib 2.3.0 REQUIRED)

# detect libwsong library, which is required for the mproc feature
find_package(libwsong REQUIRED)

# Hyperscan, which isn't packaged correctly and needs a custom Find module
find_package(Hyperscan REQUIRED)
if(Hyperscan_FOUND)
Expand All @@ -65,11 +70,6 @@ endif()
# dotnet
find_program(DOTNET_CMD dotnet)

# detect libwsong library, which is required for the mproc feature
find_package(libwsong QUIET)
if (libwsong_FOUND)
set(ENABLE_MPROC 1)
endif()

# Doxygen, optional to generate documentation HTML
find_package(Doxygen)
Expand Down Expand Up @@ -114,6 +114,7 @@ target_link_libraries(cascade
mutils::mutils
OpenSSL::Crypto
rpclib::rpc
libwsong::perf
${Hyperscan_LIBRARIES})
set_target_properties(cascade PROPERTIES
SOVERSION ${cascade_VERSION}
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ We recommend coordinating with [Weijia Song](mailto:songweijia@gmail.com) if you
- Readline library v7.0 or newer. On Ubuntu, use `apt install libreadline-dev` to install it.
- RPC library [rpclib](https://github.com/rpclib/rpclib). For convenience, install it with [this script](scripts/prerequisites/install-rpclib.sh).
- Intel's regular expression library [Hyperscan](https://github.com/intel/hyperscan). For convenience, install it with [this script](scripts/prerequisites/install-hyperscan.sh). You need to install ragel compiler if you don't have it. On ubuntu, use `apt-get install ragel` to install it.
- [libwsong](https://github.com/songweijia/libwsong) commit 47c37bc706cc859f8b60ca4d19b0608e28a2e530. For convenience, install it with [this script](scripts/prerequisites/install-libwsong.sh).
- [libfuse](https://github.com/libfuse) v3.9.3 or newer (Optional for file system API)
- [boolinq](https://github.com/k06a/boolinq) or newer (Optional for LINQ API)
- Python 3.8 or newer and [pybind11](https://github.com/pybind/pybind11) (Optional for Python API)
Expand Down
1 change: 1 addition & 0 deletions cascadeConfig.cmake.in
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ find_dependency(spdlog 1.3.1)
find_dependency(OpenSSL)
find_dependency(nlohmann_json 3.9.1)
find_dependency(rpclib)
find_dependency(libwsong)

set_and_check(cascade_INCLUDE_DIRS "@PACKAGE_CMAKE_INSTALL_INCLUDEDIR@")
set(cascade_LIBRARIES "-L@PACKAGE_CMAKE_INSTALL_LIBDIR@ -lcascade")
Expand Down
37 changes: 18 additions & 19 deletions include/cascade/detail/debug_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,30 +67,29 @@ void make_workload(uint32_t payload_size, uint32_t num_distinct_objects, const K

#if __cplusplus > 201703L
// C++ 20
#define LOG_TIMESTAMP_BY_TAG(t, g, v, ...) \
if constexpr(std::is_base_of<IHasMessageID, std::decay_t<decltype(v)>>::value) { \
TimestampLogger::log(t, \
g->get_my_id(), \
dynamic_cast<const IHasMessageID*>(&(v))->get_message_id(), \
get_walltime() \
__VA_OPT__(, ) __VA_ARGS__); \
#define LOG_TIMESTAMP_BY_TAG(t, g, v, ...) \
if constexpr(std::is_base_of<IHasMessageID, std::decay_t<decltype(v)>>::value) { \
TimestampLogger::log(t, \
g->get_my_id(), \
dynamic_cast<const IHasMessageID*>(&(v))->get_message_id() \
__VA_OPT__(, ) __VA_ARGS__); \
}
#else
// C++ 17
#define LOG_TIMESTAMP_BY_TAG(t, g, v) \
if constexpr(std::is_base_of<IHasMessageID, std::decay_t<decltype(v)>>::value) { \
TimestampLogger::log(t, \
g->get_my_id(), \
dynamic_cast<const IHasMessageID*>(&(v))->get_message_id(), \
get_walltime()); \
#define LOG_TIMESTAMP_BY_TAG(t, g, v) \
if constexpr(std::is_base_of<IHasMessageID, std::decay_t<decltype(v)>>::value) { \
TimestampLogger::log(t, \
g->get_my_id(), \
dynamic_cast<const IHasMessageID*>(&(v))->get_message_id() \
); \
}

#define LOG_TIMESTAMP_BY_TAG_EXTRA(t, g, v, e) \
if constexpr(std::is_base_of<IHasMessageID, std::decay_t<decltype(v)>>::value) { \
TimestampLogger::log(t, \
g->get_my_id(), \
dynamic_cast<const IHasMessageID*>(&(v))->get_message_id(), \
get_walltime(), e); \
#define LOG_TIMESTAMP_BY_TAG_EXTRA(t, g, v, e) \
if constexpr(std::is_base_of<IHasMessageID, std::decay_t<decltype(v)>>::value) { \
TimestampLogger::log(t, \
g->get_my_id(), \
dynamic_cast<const IHasMessageID*>(&(v))->get_message_id(),\
e); \
}

#endif //__cplusplus > 201703L
Expand Down
32 changes: 23 additions & 9 deletions include/cascade/detail/persistent_store_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,23 +151,37 @@ const VT PersistentCascadeStore<KT, VT, IK, IV, ST>::get(const KT& key, const pe
return *IV;
} else {
// fall back to the slow path.
auto versioned_state_ptr = persistent_core.get(requested_version);
if(versioned_state_ptr->kv_map.find(key) != versioned_state_ptr->kv_map.end()) {
// following the backward chain until its version is behine requested_version.
// TODO: We can introduce a per-key version index to achieve a better performance
// with a 64bit per log entry memory overhead.
VT o = persistent_core->lockless_get(key);
persistent::version_t target_version = o.version;
while (target_version > requested_version) {
target_version =
persistent_core.template getDelta<VT>(target_version,true,
[](const VT& v){
return v.previous_version_by_key;
});
}
if (target_version == persistent::INVALID_VERSION) {
#if __cplusplus > 201703L
LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_GET_END, group,*IV,ver);
#else
LOG_TIMESTAMP_BY_TAG_EXTRA(TLT_PERSISTENT_GET_END, group,*IV,ver);
#endif
debug_leave_func_with_value("Reconstructed version:0x{:x} for key:{}", requested_version, key);
return versioned_state_ptr->kv_map.at(key);
}
debug_leave_func_with_value("No data found for key:{} before version:0x{:x}", key, requested_version);
return *IV;
} else {
#if __cplusplus > 201703L
LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_GET_END, group,*IV,ver);
LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_GET_END, group,*IV,ver);
#else
LOG_TIMESTAMP_BY_TAG_EXTRA(TLT_PERSISTENT_GET_END, group,*IV,ver);
LOG_TIMESTAMP_BY_TAG_EXTRA(TLT_PERSISTENT_GET_END, group,*IV,ver);
#endif
debug_leave_func_with_value("No data found for key:{} before version:0x{:x}", key, requested_version);
return *IV;
return persistent_core.template getDelta<VT>(target_version,true,
[](const VT& v){
return v;
});
}
}
}
});
Expand Down
4 changes: 2 additions & 2 deletions include/cascade/detail/service_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Service<CascadeTypes...>::Service(const std::vector<DeserializationContext*>& ds
nullptr,
// persistent
[this](subgroup_id_t sgid, persistent::version_t ver){
TimestampLogger::log(TLT_PERSISTED,group->get_my_id(),0,get_walltime(),ver);
TimestampLogger::log(TLT_PERSISTED,group->get_my_id(),0,ver);
},
nullptr
#endif
Expand Down Expand Up @@ -147,7 +147,7 @@ std::unique_ptr<CascadeType> client_stub_factory() {

#ifdef ENABLE_EVALUATION
#define LOG_SERVICE_CLIENT_TIMESTAMP(tag,msgid) \
TimestampLogger::log(tag,this->get_my_id(),msgid,get_walltime());
TimestampLogger::log(tag,this->get_my_id(),msgid);
#else
#define LOG_SERVICE_CLIENT_TIMESTAMP(tag,msgid)
#endif
Expand Down
2 changes: 1 addition & 1 deletion include/cascade/service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ namespace cascade {
TimestampLogger::log(TLT_ACTION_FIRE_START,
0,
dynamic_cast<const IHasMessageID*>(value_ptr.get())->get_message_id(),
get_time_ns(), 0);
0);
dbg_default_trace("In {}: [worker_id={}] action is fired.", __PRETTY_FUNCTION__, worker_id);
(*ocdpo_ptr)(sender,key_string,prefix_length,version,value_ptr.get(),outputs,ctxt,worker_id);
}
Expand Down
74 changes: 38 additions & 36 deletions include/cascade/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <unordered_set>
#include <derecho/utils/time.h>
#include <cascade/config.h>
#include <wsong/perf/timing.h>

namespace derecho {
namespace cascade {
Expand Down Expand Up @@ -437,61 +438,62 @@ typedef union __attribute__((packed,aligned(8))) action_fire_extra_info {

#define CASCADE_TIMESTAMP_TAG_FILTER "CASCADE/timestamp_tag_enabler"

/**
* @class TimestampLogger utils.hpp "cascade/utils.hpp"
* @brief The timestamp logger tool.
*
* A wrapper class over the thread-safe timestamp logger in libwsong
*/
class TimestampLogger {
private:
std::vector<std::tuple<uint64_t,uint64_t,uint64_t,uint64_t,uint64_t>> _log;
pthread_spinlock_t lck;
std::unordered_set<uint64_t> tag_enabler;
/**
* Constructor
*/
TimestampLogger();
/**
* Log the timestamp
* @param tag timestamp tag
* @param node_id node id
* @param msg_id message id
* @param ts_ns timestamp in nanoseconds
* Only events with a tag included tag_enabler will be logged. Other events are dropped silently.
*/
void instance_log(uint64_t tag, uint64_t node_id, uint64_t msg_id, uint64_t ts_ns, uint64_t extra=0ull);
std::unordered_set<uint64_t> tag_enabler;
/**
* Flush log to file
* @param filename filename
* @param clear True for clear the log after flush
* @fn void instance_log(uint64_t, uint64_t, uint64_t, uint64_t)
* @brief Log an event
* @param[in] tag The event tag
* @param[in] node_id Node id
* @param[in] msg_id Message id
* @param[in] extra Optional extra information
*/
void instance_flush(const std::string& filename, bool clear = true);
/**
* Clear the log
*/
void instance_clear();
void instance_log(uint64_t tag, uint64_t node_id, uint64_t msg_id, uint64_t extra=0ull);

/** singleton */
/** The singleton logger */
static TimestampLogger _tl;

public:
/**
* Log the timestamp
* @param tag timestamp tag
* @param node_id node id
* @param msg_id message id
* @param ts_ns timestamp in nanoseconds
* @fn TimestampLogger()
* @brief Constructor
*/
TimestampLogger();
/**
* @fn void log(uint64_t, uint64_t, uint64_t, uint64_t)
* @brief Log an event
* @param[in] tag The event tag
* @param[in] node_id Node id
* @param[in] msg_id Message id
* @param[in] extra Optional extra information
*/
static inline void log(uint64_t tag, uint64_t node_id, uint64_t msg_id, uint64_t ts_ns=get_time_ns(), uint64_t extra=0ull) {
_tl.instance_log(tag,node_id,msg_id,ts_ns,extra);
static inline void log(uint64_t tag, uint64_t node_id, uint64_t msg_id, uint64_t extra=0ull) {
_tl.instance_log(tag,node_id,msg_id,extra);
}
/**
* Flush log to file
* @param filename filename
* @param clear True for clear the log after flush
* @fn void flush(const std::string&,bool)
* @brief Flush log to file.
* @param[in] filename The file name
*/
static inline void flush(const std::string& filename, bool clear = true) {
_tl.instance_flush(filename,clear);
static inline void flush(const std::string& filename) {
ws_timing_save(filename.c_str());
}
/**
* Clear the log
* @fn void clear()
* @brief Drop all event logs.
*/
static inline void clear() {
_tl.instance_clear();
ws_timing_clear();
}
};

Expand Down
20 changes: 20 additions & 0 deletions scripts/prerequisites/install-libwsong.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env bash
set -eU
export TMPDIR=/var/tmp
WORKPATH=`mktemp -d`
INSTALL_PREFIX="/usr/local"
if [[ $# -gt 0 ]]; then
INSTALL_PREFIX=$1
fi

echo "Using INSTALL_PREFIX=${INSTALL_PREFIX}"

cd ${WORKPATH}
git clone https://github.com/songweijia/libwsong.git
cd libwsong
git checkout 47c37bc706cc859f8b60ca4d19b0608e28a2e530
mkdir build
cd build
cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=${INSTALL_PREFIX} ..
make -j `nproc`
make install
6 changes: 3 additions & 3 deletions src/applications/standalone/dds/src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ static bool run_perftest(
}
}
#if !defined(USE_DDS_TIMESTAMP_LOG)
TimestampLogger::flush(topic+".publisher.log",true);
TimestampLogger::flush(topic+".publisher.log");
#endif

} else {
Expand All @@ -265,7 +265,7 @@ static bool run_perftest(
// ts_log.emplace_back(std::tuple{msg.seqno,msg.sending_ts_us,get_walltime()/1000});
ts_log.emplace_back(std::tuple{header->seqno,header->sending_ts_us,get_walltime()/1000});
#else
TimestampLogger::log(TLT_DDS_SUBSCRIBER_CALLED,-1,0,get_time_ns(),received);
TimestampLogger::log(TLT_DDS_SUBSCRIBER_CALLED,-1,0,received);
received ++;
#endif
// end of test
Expand Down Expand Up @@ -299,7 +299,7 @@ static bool run_perftest(
}
ofile.close();
#else
TimestampLogger::flush(topic+".subscriber.log",true);
TimestampLogger::flush(topic+".subscriber.log");
#endif
}

Expand Down
1 change: 0 additions & 1 deletion src/applications/tests/pipeline/pipeline_udl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class PipelineOCDPO: public OffCriticalDataPathObserver {
TimestampLogger::log(TLT_PIPELINE(stage),
typed_ctxt->get_service_client_ref().get_my_id(),
value->get_message_id(),
get_walltime(),
worker_id+stage*10000);
#endif//ENABLE_EVALUATION
for (auto& okv:outputs) {
Expand Down
6 changes: 3 additions & 3 deletions src/service/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ target_include_directories(service PRIVATE
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}/include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
)
target_link_libraries(service derecho::derecho)
target_link_libraries(service derecho::derecho libwsong::perf)
add_dependencies(service udl_signature)

if (ENABLE_MPROC)
Expand All @@ -70,7 +70,7 @@ target_include_directories(server PRIVATE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
$<BUILD_INTERFACE:${CMAKE_BINARY_DIR}>
)
target_link_libraries(server cascade dl pthread)
target_link_libraries(server cascade dl pthread libwsong::perf)
target_link_options(server PUBLIC -rdynamic)
set_target_properties(server PROPERTIES OUTPUT_NAME cascade_server)
add_custom_command(TARGET server POST_BUILD
Expand Down Expand Up @@ -103,7 +103,7 @@ target_include_directories(client PRIVATE
target_include_directories(client PUBLIC
$<BUILD_INTERFACE:${Readline_INCLUDE_DIRS}>
)
target_link_libraries(client cascade ${Readline_LIBRARIES} pthread)
target_link_libraries(client cascade ${Readline_LIBRARIES} pthread libwsong::perf)
if(ENABLE_EVALUATION)
target_link_libraries(client rpclib::rpc)
endif()
Expand Down
Loading