From 8c875b6940026bf70a427f1061c31bf94960987d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Karas?= Date: Sat, 6 Jan 2024 18:51:17 +0100 Subject: [PATCH 1/3] provide read-write lock with writer-preference C++ std::shared_mutex may lead to writer starvation with multiple highly concurrent readers, and it doesn't provide an API to configure priority. Luckily, with the pthread implementation, we may configure writer preference. It expects that write operations are rare compared to read ones. --- cmake/Config.h.cmake | 5 ++ cmake/features.cmake | 4 + .../include/osmscoutclientqt/DBLoadJob.h | 2 +- .../include/osmscoutclientqt/MapRenderer.h | 2 +- .../src/osmscoutclientqt/DBLoadJob.cpp | 2 +- .../src/osmscoutclientqt/MapRenderer.cpp | 2 +- .../include/osmscoutclient/DBJob.h | 13 +-- .../include/osmscoutclient/DBThread.h | 5 +- .../src/osmscoutclient/DBJob.cpp | 2 +- .../src/osmscoutclient/DBThread.cpp | 26 +++--- libosmscout/CMakeLists.txt | 2 + libosmscout/include/meson.build | 1 + .../include/osmscout/async/ReadWriteLock.h | 72 +++++++++++++++++ libosmscout/src/meson.build | 1 + .../src/osmscout/async/ReadWriteLock.cpp | 80 +++++++++++++++++++ 15 files changed, 193 insertions(+), 26 deletions(-) create mode 100644 libosmscout/include/osmscout/async/ReadWriteLock.h create mode 100644 libosmscout/src/osmscout/async/ReadWriteLock.cpp diff --git a/cmake/Config.h.cmake b/cmake/Config.h.cmake index be28a0a37..a8468de4a 100644 --- a/cmake/Config.h.cmake +++ b/cmake/Config.h.cmake @@ -376,6 +376,11 @@ #cmakedefine OSMSCOUT_MAP_SVG_HAVE_LIB_PANGO 1 #endif +#ifndef OSMSCOUT_PTHREAD +/* Threads are pthreads */ +#cmakedefine OSMSCOUT_PTHREAD +#endif + #ifndef OSMSCOUT_PTHREAD_NAME /* Threads are pthreads and non-posix setname is available */ #cmakedefine OSMSCOUT_PTHREAD_NAME diff --git a/cmake/features.cmake b/cmake/features.cmake index eb612f1f1..87c387fce 100644 --- a/cmake/features.cmake +++ b/cmake/features.cmake @@ -359,6 +359,10 @@ if(THREADS_HAVE_PTHREAD_ARG) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${THREADS_PTHREAD_ARG}") endif() +if (CMAKE_USE_PTHREADS_INIT) + set(OSMSCOUT_PTHREAD TRUE) +endif() + try_compile(PTHREAD_NAME_OK "${PROJECT_BINARY_DIR}" "${PROJECT_SOURCE_DIR}/cmake/TestPThreadName.cpp") if(PTHREAD_NAME_OK) diff --git a/libosmscout-client-qt/include/osmscoutclientqt/DBLoadJob.h b/libosmscout-client-qt/include/osmscoutclientqt/DBLoadJob.h index 08da62507..3b970da6d 100644 --- a/libosmscout-client-qt/include/osmscoutclientqt/DBLoadJob.h +++ b/libosmscout-client-qt/include/osmscoutclientqt/DBLoadJob.h @@ -81,7 +81,7 @@ protected slots: void Run(const osmscout::BasemapDatabaseRef& basemapDatabase, const std::list &databases, - std::shared_lock &&locker) override; + ReadLock &&locker) override; void Close() override; diff --git a/libosmscout-client-qt/include/osmscoutclientqt/MapRenderer.h b/libosmscout-client-qt/include/osmscoutclientqt/MapRenderer.h index 9f8934d6c..b8e87d4fa 100644 --- a/libosmscout-client-qt/include/osmscoutclientqt/MapRenderer.h +++ b/libosmscout-client-qt/include/osmscoutclientqt/MapRenderer.h @@ -68,7 +68,7 @@ class OSMSCOUT_CLIENT_QT_API DBRenderJob : public QObject, public DBJob{ void Run(const osmscout::BasemapDatabaseRef& basemapDatabase, const std::list &allDatabases, - std::shared_lock &&locker) override; + ReadLock &&locker) override; inline bool IsSuccess() const{ return success; diff --git a/libosmscout-client-qt/src/osmscoutclientqt/DBLoadJob.cpp b/libosmscout-client-qt/src/osmscoutclientqt/DBLoadJob.cpp index 61ded627d..e836154cc 100644 --- 
a/libosmscout-client-qt/src/osmscoutclientqt/DBLoadJob.cpp +++ b/libosmscout-client-qt/src/osmscoutclientqt/DBLoadJob.cpp @@ -55,7 +55,7 @@ DBLoadJob::~DBLoadJob() void DBLoadJob::Run(const osmscout::BasemapDatabaseRef& basemapDatabase, const std::list &databases, - std::shared_lock &&locker) + ReadLock &&locker) { osmscout::GeoBox lookupBox(lookupProjection.GetDimensions()); std::list relevantDatabases; diff --git a/libosmscout-client-qt/src/osmscoutclientqt/MapRenderer.cpp b/libosmscout-client-qt/src/osmscoutclientqt/MapRenderer.cpp index dc882da1f..7c8c12136 100644 --- a/libosmscout-client-qt/src/osmscoutclientqt/MapRenderer.cpp +++ b/libosmscout-client-qt/src/osmscoutclientqt/MapRenderer.cpp @@ -254,7 +254,7 @@ DBRenderJob::DBRenderJob(osmscout::MercatorProjection renderProjection, void DBRenderJob::Run(const osmscout::BasemapDatabaseRef& basemapDatabase, const std::list &allDatabases, - std::shared_lock &&locker) + ReadLock &&locker) { std::list databases; // enabled databases for rendering if (renderDatabases) { diff --git a/libosmscout-client/include/osmscoutclient/DBJob.h b/libosmscout-client/include/osmscoutclient/DBJob.h index 0aa17e19c..761619e4b 100644 --- a/libosmscout-client/include/osmscoutclient/DBJob.h +++ b/libosmscout-client/include/osmscoutclient/DBJob.h @@ -24,6 +24,7 @@ #include +#include #include #include @@ -37,15 +38,15 @@ namespace osmscout { */ class OSMSCOUT_CLIENT_API DBJob { - protected: +protected: osmscout::BasemapDatabaseRef basemapDatabase; //!< Optional reference to the basemap db std::list databases; //!< borrowed databases std::thread::id threadId; //!< job thread - private: - std::shared_lock locker; //!< db locker +private: + ReadLock locker; //!< db locker - public: +public: DBJob(); DBJob(const DBJob&) = delete; DBJob(DBJob&&) = delete; @@ -54,8 +55,8 @@ class OSMSCOUT_CLIENT_API DBJob { virtual ~DBJob(); virtual void Run(const osmscout::BasemapDatabaseRef& basemapDatabase, - const std::list &databases, - std::shared_lock &&locker); + const std::list &databases, + ReadLock &&locker); virtual void Close(); }; diff --git a/libosmscout-client/include/osmscoutclient/DBThread.h b/libosmscout-client/include/osmscoutclient/DBThread.h index 909d5433f..c55b93293 100644 --- a/libosmscout-client/include/osmscoutclient/DBThread.h +++ b/libosmscout-client/include/osmscoutclient/DBThread.h @@ -30,6 +30,7 @@ #include #include +#include #include @@ -96,7 +97,7 @@ class OSMSCOUT_CLIENT_API DBThread: public AsyncWorker using AsynchronousDBJob = std::function &databases, - std::shared_lock &&locker)>; + ReadLock &&locker)>; // signals Signal<> stylesheetFilenameChanged; @@ -142,7 +143,7 @@ class OSMSCOUT_CLIENT_API DBThread: public AsyncWorker double mapDpi; - mutable std::shared_mutex lock; + mutable SharedMutex lock; osmscout::BasemapDatabaseParameter basemapDatabaseParameter; osmscout::BasemapDatabaseRef basemapDatabase; diff --git a/libosmscout-client/src/osmscoutclient/DBJob.cpp b/libosmscout-client/src/osmscoutclient/DBJob.cpp index bcd810152..24e29947a 100644 --- a/libosmscout-client/src/osmscoutclient/DBJob.cpp +++ b/libosmscout-client/src/osmscoutclient/DBJob.cpp @@ -33,7 +33,7 @@ DBJob::~DBJob() void DBJob::Run(const osmscout::BasemapDatabaseRef& basemapDatabase, const std::list &databases, - std::shared_lock &&locker) + ReadLock &&locker) { assert(threadId==std::this_thread::get_id()); this->basemapDatabase=basemapDatabase; diff --git a/libosmscout-client/src/osmscoutclient/DBThread.cpp b/libosmscout-client/src/osmscoutclient/DBThread.cpp index 
ad4c3482d..d13f5bcec 100644 --- a/libosmscout-client/src/osmscoutclient/DBThread.cpp +++ b/libosmscout-client/src/osmscoutclient/DBThread.cpp @@ -64,7 +64,7 @@ DBThread::DBThread(const std::string &basemapLookupDirectory, DBThread::~DBThread() { - std::unique_lock locker(lock); + WriteLock locker(lock); osmscout::log.Debug() << "DBThread::~DBThread()"; mapDpiSlot.Disconnect(); @@ -89,7 +89,7 @@ bool DBThread::isInitializedInternal() bool DBThread::isInitialized() { - std::shared_lock locker(lock); + ReadLock locker(lock); return isInitializedInternal(); } @@ -104,7 +104,7 @@ double DBThread::GetPhysicalDpi() const } const GeoBox DBThread::databaseBoundingBox() const { - std::shared_lock locker(lock); + ReadLock locker(lock); GeoBox response; for (const auto& db:databases){ response.Include(db->GetDBGeoBox()); @@ -115,7 +115,7 @@ const GeoBox DBThread::databaseBoundingBox() const { DatabaseCoverage DBThread::databaseCoverage(const osmscout::Magnification &magnification, const osmscout::GeoBox &bbox) { - std::shared_lock locker(lock); + ReadLock locker(lock); osmscout::GeoBox boundingBox; for (const auto &db:databases){ @@ -173,7 +173,7 @@ CancelableFuture DBThread::OnDatabaseListChanged(const std::vectorClose(); @@ -365,7 +365,7 @@ StyleConfigRef DBThread::makeStyleConfig(TypeConfigRef typeConfig, bool suppress CancelableFuture DBThread::ToggleDaylight() { return Async([this](const Breaker &) -> bool { - std::unique_lock locker(lock); + WriteLock locker(lock); if (!isInitializedInternal()) { return false; @@ -384,7 +384,7 @@ CancelableFuture DBThread::ToggleDaylight() CancelableFuture DBThread::OnMapDPIChange(double dpi) { return Async([this, dpi](const Breaker&) -> bool{ - std::unique_lock locker(lock); + WriteLock locker(lock); mapDpi = dpi; return true; }); @@ -394,7 +394,7 @@ CancelableFuture DBThread::SetStyleFlag(const std::string &key, bool value { return Async([this, key, value](const Breaker&) -> bool{ log.Debug() << "SetStyleFlag " << key << " to " << value; - std::unique_lock locker(lock); + WriteLock locker(lock); if (!isInitializedInternal()) { return false; @@ -411,7 +411,7 @@ CancelableFuture DBThread::ReloadStyle(const std::string &suffix) { return Async([this, suffix](const Breaker &) -> bool { log.Debug() << "Reloading style " << stylesheetFilename << suffix << "..."; - std::unique_lock locker(lock); + WriteLock locker(lock); LoadStyleInternal(stylesheetFilename, stylesheetFlags, suffix); log.Debug() << "Reloading style done."; return true; @@ -423,7 +423,7 @@ CancelableFuture DBThread::LoadStyle(const std::string &stylesheetFilename const std::string &suffix) { return Async([this, stylesheetFilename, stylesheetFlags, suffix](const Breaker&){ - std::unique_lock locker(lock); + WriteLock locker(lock); LoadStyleInternal(stylesheetFilename, stylesheetFlags, suffix); return true; }); @@ -455,7 +455,7 @@ void DBThread::LoadStyleInternal(const std::string &stylesheetFilename, const std::map DBThread::GetStyleFlags() const { - std::shared_lock locker(lock); + ReadLock locker(lock); std::map flags; // add flag overrides for (const auto& flag : stylesheetFlags){ @@ -508,7 +508,7 @@ CancelableFuture DBThread::FlushCaches(const std::chrono::milliseconds &id void DBThread::RunJob(AsynchronousDBJob job) { - std::shared_lock locker(lock); + ReadLock locker(lock); if (!isInitializedInternal()){ locker.unlock(); osmscout::log.Warn() << "ignore request, dbs is not initialized"; @@ -519,7 +519,7 @@ void DBThread::RunJob(AsynchronousDBJob job) void 
DBThread::RunSynchronousJob(SynchronousDBJob job) { - std::shared_lock locker(lock); + ReadLock locker(lock); if (!isInitializedInternal()){ osmscout::log.Warn() << "ignore request, dbs is not initialized"; return; diff --git a/libosmscout/CMakeLists.txt b/libosmscout/CMakeLists.txt index 9efca11cf..7749ed370 100644 --- a/libosmscout/CMakeLists.txt +++ b/libosmscout/CMakeLists.txt @@ -37,6 +37,7 @@ set(HEADER_FILES_ASYNC include/osmscout/async/Breaker.h include/osmscout/async/CancelableFuture.h include/osmscout/async/ProcessingQueue.h + include/osmscout/async/ReadWriteLock.h include/osmscout/async/Signal.h include/osmscout/async/Thread.h include/osmscout/async/Worker.h @@ -214,6 +215,7 @@ set(SOURCE_FILES src/osmscout/io/FileWriter.cpp src/osmscout/io/NumericIndex.cpp src/osmscout/async/AsyncWorker.cpp + src/osmscout/async/ReadWriteLock.cpp src/osmscout/async/Breaker.cpp src/osmscout/async/Thread.cpp src/osmscout/async/Worker.cpp diff --git a/libosmscout/include/meson.build b/libosmscout/include/meson.build index 33366821b..1e1c132a6 100644 --- a/libosmscout/include/meson.build +++ b/libosmscout/include/meson.build @@ -9,6 +9,7 @@ osmscoutHeader = [ 'osmscout/async/Breaker.h', 'osmscout/async/CancelableFuture.h', 'osmscout/async/ProcessingQueue.h', + 'osmscout/async/ReadWriteLock.h', 'osmscout/async/Signal.h', 'osmscout/async/Thread.h', 'osmscout/async/Worker.h', diff --git a/libosmscout/include/osmscout/async/ReadWriteLock.h b/libosmscout/include/osmscout/async/ReadWriteLock.h new file mode 100644 index 000000000..003c85a9a --- /dev/null +++ b/libosmscout/include/osmscout/async/ReadWriteLock.h @@ -0,0 +1,72 @@ +#ifndef LIBOSMSCOUT_READWRITELOCK_H +#define LIBOSMSCOUT_READWRITELOCK_H + +/* + This source is part of the libosmscout library + Copyright (C) 2024 Lukas Karas + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include + +#include + +#include +#include + +#ifdef OSMSCOUT_PTHREAD +#include +#endif + +namespace osmscout { + +#ifdef OSMSCOUT_PTHREAD + + /** C++ std::shared_mutex may lead to writer starvation with multiple highly concurrent readers, + * and it doesn't provide API to configure priority. Luckily, with Pthread implementation, + * we may configure writer preference. It expect that write operations are rare, compared to read ones. 
+ */ + class OSMSCOUT_API SharedMutex { + private: + pthread_rwlock_t rwlock{}; + + public: + SharedMutex(); + SharedMutex(const SharedMutex&) = delete; + SharedMutex(SharedMutex&&) = delete; + SharedMutex& operator=(const SharedMutex&) = delete; + SharedMutex& operator=(SharedMutex&&) = delete; + ~SharedMutex(); + + void lock(); + void unlock(); + + void lock_shared(); + void unlock_shared(); + }; + +#else + + using SharedMutex = std::shared_mutex; + +#endif + + using ReadLock = std::shared_lock; + using WriteLock = std::unique_lock; + +} + +#endif //LIBOSMSCOUT_READWRITELOCK_H diff --git a/libosmscout/src/meson.build b/libosmscout/src/meson.build index 3b8cd6951..048c61eab 100644 --- a/libosmscout/src/meson.build +++ b/libosmscout/src/meson.build @@ -5,6 +5,7 @@ osmscoutSrc = [ 'src/osmscout/cli/CmdLineParsing.cpp', 'src/osmscout/async/AsyncWorker.cpp', 'src/osmscout/async/Breaker.cpp', + 'src/osmscout/async/ReadWriteLock.cpp', 'src/osmscout/async/Thread.cpp', 'src/osmscout/async/Worker.cpp', 'src/osmscout/async/WorkQueue.cpp', diff --git a/libosmscout/src/osmscout/async/ReadWriteLock.cpp b/libosmscout/src/osmscout/async/ReadWriteLock.cpp new file mode 100644 index 000000000..ff0cd3295 --- /dev/null +++ b/libosmscout/src/osmscout/async/ReadWriteLock.cpp @@ -0,0 +1,80 @@ +/* + This source is part of the libosmscout library + Copyright (C) 2024 Lukas Karas + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include + +#include + +#include + +namespace osmscout { + +#ifdef OSMSCOUT_PTHREAD + + SharedMutex::SharedMutex() + { + pthread_rwlockattr_t attr{}; + [[maybe_unused]] int res = pthread_rwlockattr_init(&attr); + assert(res==0); + +#if defined __USE_UNIX98 || defined __USE_XOPEN2K + res = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); + assert(res==0); +#endif + + pthread_rwlock_t rwlock{}; + res = pthread_rwlock_init(&rwlock, &attr); + assert(res==0); + res = pthread_rwlockattr_destroy(&attr); + assert(res==0); + } + + SharedMutex::~SharedMutex() + { + [[maybe_unused]] int res = pthread_rwlock_destroy(&rwlock); + assert(res==0); + } + + void SharedMutex::lock() + { + [[maybe_unused]] int res = pthread_rwlock_wrlock(&rwlock); + assert(res==0); + } + + void SharedMutex::unlock() + { + [[maybe_unused]] int res = pthread_rwlock_unlock(&rwlock); + assert(res==0); + } + + void SharedMutex::lock_shared() + { + [[maybe_unused]] int res = pthread_rwlock_rdlock(&rwlock); + assert(res==0); + } + + void SharedMutex::unlock_shared() + { + [[maybe_unused]] int res = pthread_rwlock_unlock(&rwlock); + assert(res==0); + } + +#endif + +} From 1cd032238879b439cd5213d96f7f127b4c5b850d Mon Sep 17 00:00:00 2001 From: janbar Date: Sat, 27 Jan 2024 17:36:57 +0100 Subject: [PATCH 2/3] read write lock This implements a pure C++ Latch providing lock-S (shared) and lock-X (exclusive). 
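For illustration only, a minimal usage sketch of the API this patch introduces (osmscout::Latch guarded by ReadLock/WriteLock; the include path follows the new header location, the surrounding example code is hypothetical):

    #include <osmscout/async/ReadWriteLock.h>

    osmscout::Latch latch;    // exclusive requests take precedence by default (px==true)
    int sharedValue=0;

    int ReadValue() {
      osmscout::ReadLock guard(latch);   // lock-S, may be taken recursively by the same thread
      return sharedValue;
    }

    void WriteValue(int value) {
      osmscout::WriteLock guard(latch);  // lock-X, new lock-S requests wait while this is pending
      sharedValue=value;
    }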
--- cmake/Config.h.cmake | 5 - cmake/features.cmake | 4 - .../include/osmscoutclient/DBThread.h | 6 +- .../src/osmscoutclient/DBThread.cpp | 26 +- libosmscout/CMakeLists.txt | 2 +- .../include/osmscout/async/ReadWriteLock.h | 216 ++++++++++++++--- .../src/osmscout/async/ReadWriteLock.cpp | 223 ++++++++++++++---- 7 files changed, 384 insertions(+), 98 deletions(-) diff --git a/cmake/Config.h.cmake b/cmake/Config.h.cmake index a8468de4a..be28a0a37 100644 --- a/cmake/Config.h.cmake +++ b/cmake/Config.h.cmake @@ -376,11 +376,6 @@ #cmakedefine OSMSCOUT_MAP_SVG_HAVE_LIB_PANGO 1 #endif -#ifndef OSMSCOUT_PTHREAD -/* Threads are pthreads */ -#cmakedefine OSMSCOUT_PTHREAD -#endif - #ifndef OSMSCOUT_PTHREAD_NAME /* Threads are pthreads and non-posix setname is available */ #cmakedefine OSMSCOUT_PTHREAD_NAME diff --git a/cmake/features.cmake b/cmake/features.cmake index 87c387fce..eb612f1f1 100644 --- a/cmake/features.cmake +++ b/cmake/features.cmake @@ -359,10 +359,6 @@ if(THREADS_HAVE_PTHREAD_ARG) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${THREADS_PTHREAD_ARG}") endif() -if (CMAKE_USE_PTHREADS_INIT) - set(OSMSCOUT_PTHREAD TRUE) -endif() - try_compile(PTHREAD_NAME_OK "${PROJECT_BINARY_DIR}" "${PROJECT_SOURCE_DIR}/cmake/TestPThreadName.cpp") if(PTHREAD_NAME_OK) diff --git a/libosmscout-client/include/osmscoutclient/DBThread.h b/libosmscout-client/include/osmscoutclient/DBThread.h index c55b93293..0c58d02ab 100644 --- a/libosmscout-client/include/osmscoutclient/DBThread.h +++ b/libosmscout-client/include/osmscoutclient/DBThread.h @@ -143,7 +143,7 @@ class OSMSCOUT_CLIENT_API DBThread: public AsyncWorker double mapDpi; - mutable SharedMutex lock; + mutable Latch latch; osmscout::BasemapDatabaseParameter basemapDatabaseParameter; osmscout::BasemapDatabaseRef basemapDatabase; @@ -236,12 +236,12 @@ class OSMSCOUT_CLIENT_API DBThread: public AsyncWorker const std::list &GetStyleErrors() const { - return styleErrors; + return styleErrors; } StyleConfigRef GetEmptyStyleConfig() const { - std::shared_lock locker(lock); + ReadLock locker(latch); return emptyStyleConfig; } diff --git a/libosmscout-client/src/osmscoutclient/DBThread.cpp b/libosmscout-client/src/osmscoutclient/DBThread.cpp index d13f5bcec..3182715ce 100644 --- a/libosmscout-client/src/osmscoutclient/DBThread.cpp +++ b/libosmscout-client/src/osmscoutclient/DBThread.cpp @@ -64,7 +64,7 @@ DBThread::DBThread(const std::string &basemapLookupDirectory, DBThread::~DBThread() { - WriteLock locker(lock); + WriteLock locker(latch); osmscout::log.Debug() << "DBThread::~DBThread()"; mapDpiSlot.Disconnect(); @@ -89,7 +89,7 @@ bool DBThread::isInitializedInternal() bool DBThread::isInitialized() { - ReadLock locker(lock); + ReadLock locker(latch); return isInitializedInternal(); } @@ -104,7 +104,7 @@ double DBThread::GetPhysicalDpi() const } const GeoBox DBThread::databaseBoundingBox() const { - ReadLock locker(lock); + ReadLock locker(latch); GeoBox response; for (const auto& db:databases){ response.Include(db->GetDBGeoBox()); @@ -115,7 +115,7 @@ const GeoBox DBThread::databaseBoundingBox() const { DatabaseCoverage DBThread::databaseCoverage(const osmscout::Magnification &magnification, const osmscout::GeoBox &bbox) { - ReadLock locker(lock); + ReadLock locker(latch); osmscout::GeoBox boundingBox; for (const auto &db:databases){ @@ -173,7 +173,7 @@ CancelableFuture DBThread::OnDatabaseListChanged(const std::vectorClose(); @@ -365,7 +365,7 @@ StyleConfigRef DBThread::makeStyleConfig(TypeConfigRef typeConfig, bool suppress CancelableFuture 
DBThread::ToggleDaylight() { return Async([this](const Breaker &) -> bool { - WriteLock locker(lock); + WriteLock locker(latch); if (!isInitializedInternal()) { return false; @@ -384,7 +384,7 @@ CancelableFuture DBThread::ToggleDaylight() CancelableFuture DBThread::OnMapDPIChange(double dpi) { return Async([this, dpi](const Breaker&) -> bool{ - WriteLock locker(lock); + WriteLock locker(latch); mapDpi = dpi; return true; }); @@ -394,7 +394,7 @@ CancelableFuture DBThread::SetStyleFlag(const std::string &key, bool value { return Async([this, key, value](const Breaker&) -> bool{ log.Debug() << "SetStyleFlag " << key << " to " << value; - WriteLock locker(lock); + WriteLock locker(latch); if (!isInitializedInternal()) { return false; @@ -411,7 +411,7 @@ CancelableFuture DBThread::ReloadStyle(const std::string &suffix) { return Async([this, suffix](const Breaker &) -> bool { log.Debug() << "Reloading style " << stylesheetFilename << suffix << "..."; - WriteLock locker(lock); + WriteLock locker(latch); LoadStyleInternal(stylesheetFilename, stylesheetFlags, suffix); log.Debug() << "Reloading style done."; return true; @@ -423,7 +423,7 @@ CancelableFuture DBThread::LoadStyle(const std::string &stylesheetFilename const std::string &suffix) { return Async([this, stylesheetFilename, stylesheetFlags, suffix](const Breaker&){ - WriteLock locker(lock); + WriteLock locker(latch); LoadStyleInternal(stylesheetFilename, stylesheetFlags, suffix); return true; }); @@ -455,7 +455,7 @@ void DBThread::LoadStyleInternal(const std::string &stylesheetFilename, const std::map DBThread::GetStyleFlags() const { - ReadLock locker(lock); + ReadLock locker(latch); std::map flags; // add flag overrides for (const auto& flag : stylesheetFlags){ @@ -508,7 +508,7 @@ CancelableFuture DBThread::FlushCaches(const std::chrono::milliseconds &id void DBThread::RunJob(AsynchronousDBJob job) { - ReadLock locker(lock); + ReadLock locker(latch); if (!isInitializedInternal()){ locker.unlock(); osmscout::log.Warn() << "ignore request, dbs is not initialized"; @@ -519,7 +519,7 @@ void DBThread::RunJob(AsynchronousDBJob job) void DBThread::RunSynchronousJob(SynchronousDBJob job) { - ReadLock locker(lock); + ReadLock locker(latch); if (!isInitializedInternal()){ osmscout::log.Warn() << "ignore request, dbs is not initialized"; return; diff --git a/libosmscout/CMakeLists.txt b/libosmscout/CMakeLists.txt index 7749ed370..d62caaeed 100644 --- a/libosmscout/CMakeLists.txt +++ b/libosmscout/CMakeLists.txt @@ -215,8 +215,8 @@ set(SOURCE_FILES src/osmscout/io/FileWriter.cpp src/osmscout/io/NumericIndex.cpp src/osmscout/async/AsyncWorker.cpp - src/osmscout/async/ReadWriteLock.cpp src/osmscout/async/Breaker.cpp + src/osmscout/async/ReadWriteLock.cpp src/osmscout/async/Thread.cpp src/osmscout/async/Worker.cpp src/osmscout/async/WorkQueue.cpp diff --git a/libosmscout/include/osmscout/async/ReadWriteLock.h b/libosmscout/include/osmscout/async/ReadWriteLock.h index 003c85a9a..60788ff3e 100644 --- a/libosmscout/include/osmscout/async/ReadWriteLock.h +++ b/libosmscout/include/osmscout/async/ReadWriteLock.h @@ -3,7 +3,7 @@ /* This source is part of the libosmscout library - Copyright (C) 2024 Lukas Karas + Copyright (C) 2024 Jean-Luc Barriere This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public @@ -24,48 +24,202 @@ #include -#include +#include +#include +#include #include - -#ifdef OSMSCOUT_PTHREAD -#include -#endif +#include namespace osmscout { -#ifdef OSMSCOUT_PTHREAD +/** + * This 
implements a pure C++ Latch providing lock-S (shared) and lock-X (exclusive). + * The concept used here allows X requests to be prioritized faster and more smoothly + * than standard implementations. It uses a no-lock strategy whenever possible and + * reverts to lock and wait in race condition. + */ - /** C++ std::shared_mutex may lead to writer starvation with multiple highly concurrent readers, - * and it doesn't provide API to configure priority. Luckily, with Pthread implementation, - * we may configure writer preference. It expect that write operations are rare, compared to read ones. +constexpr size_t latch_bucket_count = 64; + +class OSMSCOUT_API Latch { +private: + mutable std::atomic_flag s_spin = ATOMIC_FLAG_INIT; + + volatile int x_wait = 0; /* counts requests in wait for X */ + volatile int s_count = 0; /* counts held S locks */ + volatile int x_flag = 0; /* X status: 0, 1, 2, or 3 */ + std::thread::id x_owner; /* X owner (thread id) */ + + std::mutex x_gate_lock; + std::condition_variable x_gate; /* wait for release of X */ + std::mutex s_gate_lock; + std::condition_variable s_gate; /* wait for release of S */ + + bool px = true; /* enable X precedence */ + std::array s_buckets{}; + + void spin_lock() { + while (s_spin.test_and_set(std::memory_order_acquire)) { + std::this_thread::yield(); + } + } + void spin_unlock() { + s_spin.clear(std::memory_order_release); + } + +public: + Latch() = default; + explicit Latch(bool _px) : px(_px) { } + Latch(const Latch&) = delete; + Latch(Latch&&) = delete; + Latch& operator=(const Latch&) = delete; + Latch& operator=(Latch&&) = delete; + ~Latch() = default; + + /* Locks the latch for exclusive ownership, + * blocks if the latch is not available */ - class OSMSCOUT_API SharedMutex { - private: - pthread_rwlock_t rwlock{}; - - public: - SharedMutex(); - SharedMutex(const SharedMutex&) = delete; - SharedMutex(SharedMutex&&) = delete; - SharedMutex& operator=(const SharedMutex&) = delete; - SharedMutex& operator=(SharedMutex&&) = delete; - ~SharedMutex(); + void lock(); - void lock(); - void unlock(); + /* Unlocks the latch (exclusive ownership) */ + void unlock(); - void lock_shared(); - void unlock_shared(); - }; + /* Locks the latch for shared ownership, + * blocks if the latch is not available + */ + void lock_shared(); -#else + /* Unlocks the latch (shared ownership) */ + void unlock_shared(); - using SharedMutex = std::shared_mutex; + /* Tries to lock the latch for shared ownership, + * returns true if the latch has no exclusive ownership or any request for + * exclusive ownership, else false + */ + bool try_lock_shared(); +}; -#endif +/* + * Cannot use the template std::shared_lock as Latch does not implement all the + * requirements of the standard + */ +class OSMSCOUT_API ReadLock +{ +private: + Latch *p = nullptr; + bool owns = false; + + void swap(ReadLock& rl) noexcept { + std::swap(p, rl.p); + std::swap(owns, rl.owns); + } + +public: + + ReadLock() = default; + + explicit ReadLock(Latch& latch) : p(&latch), owns(true) { latch.lock_shared(); } + + /* Assume the calling thread already has ownership of the shared lock */ + ReadLock(Latch& latch, std::adopt_lock_t) : p(&latch), owns(true) { } + + ~ReadLock() { + if (owns) { + p->unlock_shared(); + } + } + + ReadLock(ReadLock const&) = delete; + ReadLock& operator=(ReadLock const&) = delete; + + ReadLock(ReadLock&& rl) noexcept { swap(rl); } + + ReadLock& operator=(ReadLock&& rl) noexcept { + swap(rl); + return *this; + } + + bool owns_lock() const noexcept { + return owns; + } + + 
void lock() { + if (!owns && p != nullptr) { + p->lock_shared(); + owns = true; + } + } + + void unlock() { + if (owns) { + owns = false; + p->unlock_shared(); + } + } + + bool try_lock() { + if (!owns && p != nullptr) { + owns = p->try_lock_shared(); + } + return owns; + } +}; - using ReadLock = std::shared_lock; - using WriteLock = std::unique_lock; +/* + * Cannot use the template std::unique_lock as Latch does not implement all the + * requirements of the standard + */ +class OSMSCOUT_API WriteLock +{ +private: + Latch *p = nullptr; + bool owns = false; + + void swap(WriteLock& wl) noexcept { + std::swap(p, wl.p); + std::swap(owns, wl.owns); + } + +public: + + WriteLock() = default; + + explicit WriteLock(Latch& latch) : p(&latch), owns(true) { latch.lock(); } + + ~WriteLock() { + if (owns) { + p->unlock(); + } + } + + WriteLock(WriteLock const&) = delete; + WriteLock& operator=(WriteLock const&) = delete; + + WriteLock(WriteLock&& wl) noexcept { swap(wl); } + + WriteLock& operator=(WriteLock&& wl) noexcept { + swap(wl); + return *this; + } + + bool owns_lock() const noexcept { + return owns; + } + + void lock() { + if (!owns && p != nullptr) { + p->lock(); + owns = true; + } + } + + void unlock() { + if (owns) { + owns = false; + p->unlock(); + } + } +}; } diff --git a/libosmscout/src/osmscout/async/ReadWriteLock.cpp b/libosmscout/src/osmscout/async/ReadWriteLock.cpp index ff0cd3295..27c64b970 100644 --- a/libosmscout/src/osmscout/async/ReadWriteLock.cpp +++ b/libosmscout/src/osmscout/async/ReadWriteLock.cpp @@ -1,6 +1,6 @@ /* This source is part of the libosmscout library - Copyright (C) 2024 Lukas Karas + Copyright (C) 2024 Jean-Luc Barriere This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public @@ -21,60 +21,201 @@ #include +#include + #include +#include namespace osmscout { -#ifdef OSMSCOUT_PTHREAD - - SharedMutex::SharedMutex() - { - pthread_rwlockattr_t attr{}; - [[maybe_unused]] int res = pthread_rwlockattr_init(&attr); - assert(res==0); +/** + * The X flag is set as follows based on the locking steps + * Step 0 : X is released + * Step 1 : X is held, but waits for release of S + * Step 2 : X was released and left available for one of request in wait + * Step 3 : X is held + * Step N : X recursive N-3 + */ +constexpr int X_STEP_0 = 0; +constexpr int X_STEP_1 = 1; +constexpr int X_STEP_2 = 2; +constexpr int X_STEP_3 = 3; + +void Latch::lock() { + /* Depending on the internal implementation of conditional variable, + * a race condition could arise, permanently blocking the thread; + * Setting a timeout works around the issue. + */ + static constexpr std::chrono::seconds exit_timeout(1); + + std::thread::id tid = std::this_thread::get_id(); + + spin_lock(); + + if (x_owner != tid) { + /* increments the count of request in wait */ + ++x_wait; + for (;;) { + /* if flag is 0 or 2 then it hold X with no wait, + * in other case it have to wait for X gate + */ + if (x_flag == X_STEP_0 || x_flag == X_STEP_2) { + x_flag = X_STEP_1; + --x_wait; + break; + } else { + /* !!! pop gate then unlock spin */ + std::unique_lock lk(x_gate_lock); + spin_unlock(); + x_gate.wait_for(lk, exit_timeout); + lk.unlock(); + } + spin_lock(); + } + + /* X = 1, check the releasing of S */ + for (;;) { + /* if the count of S is zeroed then it finalize with no wait, + * in other case it have to wait for S gate */ + if (s_count == 0) { + x_flag = X_STEP_3; + break; + } else { + /* !!! 
pop gate then unlock spin (reverse order for S notifier) */ + std::unique_lock lk(s_gate_lock); + spin_unlock(); + s_gate.wait_for(lk, exit_timeout); + lk.unlock(); + spin_lock(); + /* check if the notifier has hand over, else retry */ + if (x_flag == X_STEP_3) { + break; + } + } + } + + /* X = 3, set owner */ + x_owner = tid; + } else { + /* recursive X lock */ + ++x_flag; + } -#if defined __USE_UNIX98 || defined __USE_XOPEN2K - res = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); - assert(res==0); -#endif + spin_unlock(); +} - pthread_rwlock_t rwlock{}; - res = pthread_rwlock_init(&rwlock, &attr); - assert(res==0); - res = pthread_rwlockattr_destroy(&attr); - assert(res==0); +void Latch::unlock() { + spin_lock(); + if (x_owner == std::this_thread::get_id()) { + /* decrement recursive lock */ + if (--x_flag == X_STEP_2) { + x_owner = std::thread::id(); + /* hand-over to a request in wait for X, else release */ + if (x_wait == 0) { + x_flag = X_STEP_0; + } + /* !!! unlock spin then pop gate (reverse order for receiver) */ + spin_unlock(); + std::unique_lock lk(x_gate_lock); + x_gate.notify_all(); + lk.unlock(); + } else { + spin_unlock(); + } + } else { + spin_unlock(); } +} - SharedMutex::~SharedMutex() - { - [[maybe_unused]] int res = pthread_rwlock_destroy(&rwlock); - assert(res==0); - } +namespace { +static const std::hash thread_hash; +} - void SharedMutex::lock() - { - [[maybe_unused]] int res = pthread_rwlock_wrlock(&rwlock); - assert(res==0); +void Latch::lock_shared() { + /* Depending on the internal implementation of conditional variable, + * a race condition could arise, permanently blocking the thread; + * Setting a timeout works around the issue. + */ + static constexpr std::chrono::seconds exit_timeout(1); + + std::thread::id tid = std::this_thread::get_id(); + + spin_lock(); + if (x_owner != tid) { + /* if flag is 0 or 1 then it hold S with no wait, + * in other case it have to wait for X gate + */ + for (;;) { + if (!px) { + /* X precedence is false */ + if (x_flag < X_STEP_2) { + break; + } + } else { + /* X precedence is true, + * estimate if this thread holds a recursive S lock + */ + if (x_flag == X_STEP_0 || (x_flag == X_STEP_1 && + s_buckets[thread_hash(tid) % latch_bucket_count] > 0)) { + break; + } + } + /* !!! pop gate then unlock spin */ + std::unique_lock lk(x_gate_lock); + spin_unlock(); + x_gate.wait_for(lk, exit_timeout); + lk.unlock(); + spin_lock(); + } } - - void SharedMutex::unlock() - { - [[maybe_unused]] int res = pthread_rwlock_unlock(&rwlock); - assert(res==0); + ++s_count; + if (px) { + /* X precedence is true */ + ++s_buckets[thread_hash(tid) % latch_bucket_count]; } + spin_unlock(); +} - void SharedMutex::lock_shared() - { - [[maybe_unused]] int res = pthread_rwlock_rdlock(&rwlock); - assert(res==0); - } +void Latch::unlock_shared() { + std::thread::id tid = std::this_thread::get_id(); - void SharedMutex::unlock_shared() - { - [[maybe_unused]] int res = pthread_rwlock_unlock(&rwlock); - assert(res==0); + spin_lock(); + if (px) { + /* X precedence is true */ + --s_buckets[thread_hash(tid) % latch_bucket_count]; + } + /* on last S, finalize X request in wait, and notify */ + if (--s_count == 0 && x_flag == X_STEP_1 && x_owner != tid) { + x_flag = X_STEP_3; + /* !!! 
unlock spin then pop gate (reverse order for X receiver) */ + spin_unlock(); + std::unique_lock lk(s_gate_lock); + s_gate.notify_one(); + lk.unlock(); + } else { + spin_unlock(); } +} -#endif +bool Latch::try_lock_shared() +{ + std::thread::id tid = std::this_thread::get_id(); + + spin_lock(); + /* if X = 0 then it hold S with success, + * in other case fails + */ + if (x_flag == X_STEP_0 || x_owner == tid) { + ++s_count; + if (px) { + /* X precedence is true */ + ++s_buckets[thread_hash(tid) % latch_bucket_count]; + } + spin_unlock(); + return true; + } + spin_unlock(); + return false; +} } From 2ae9b4d4ac75ff06274f0af39f7b24722754bdb7 Mon Sep 17 00:00:00 2001 From: janbar Date: Tue, 30 Jan 2024 19:13:37 +0100 Subject: [PATCH 3/3] add testing for latch (readwritelock) It includes testing for meson build and cmake build. --- Tests/CMakeLists.txt | 3 + Tests/meson.build | 10 + Tests/src/Latch.cpp | 584 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 597 insertions(+) create mode 100644 Tests/src/Latch.cpp diff --git a/Tests/CMakeLists.txt b/Tests/CMakeLists.txt index f29af58e5..041f0ead0 100644 --- a/Tests/CMakeLists.txt +++ b/Tests/CMakeLists.txt @@ -34,6 +34,9 @@ osmscout_test_project(NAME ColorParse SOURCES src/ColorParse.cpp) #---- CoordinateEncoding osmscout_test_project(NAME CoordinateEncoding SOURCES src/CoordinateEncoding.cpp COMMAND "${CMAKE_CURRENT_SOURCE_DIR}/data/testregion") +#---- Latch +osmscout_test_project(NAME Latch SOURCES src/Latch.cpp) + #---- LocationLookup osmscout_test_project(NAME LocationLookupTest SOURCES src/LocationServiceTest.cpp src/SearchForLocationByStringTest.cpp src/SearchForLocationByFormTest.cpp src/SearchForPOIByFormTest.cpp TARGET OSMScout::Test OSMScout::Import) set_source_files_properties(src/SearchForLocationByStringTest.cpp src/SearchForLocationByFormTest.cpp src/SearchForPOIByFormTest.cpp src/LocationServiceTest.cpp PROPERTIES SKIP_UNITY_BUILD_INCLUSION TRUE) diff --git a/Tests/meson.build b/Tests/meson.build index c08de8f43..2fd7f5752 100644 --- a/Tests/meson.build +++ b/Tests/meson.build @@ -225,6 +225,16 @@ HeaderCheck = executable('HeaderCheck', test('Check use of \'<\'...\'>\' for includes', HeaderCheck, env: headerCheckEnv) +Latch = executable('Latch', + 'src/Latch.cpp', + include_directories: [testIncDir, osmscoutIncDir], + dependencies: [mathDep, threadDep, openmpDep], + link_with: [osmscout], + install: true, + install_dir: testInstallDir) + +test('Check latch consistency', Latch) + if buildImport LocationServiceTest = executable('LocationServiceTest', [ diff --git a/Tests/src/Latch.cpp b/Tests/src/Latch.cpp new file mode 100644 index 000000000..ac2d54a7b --- /dev/null +++ b/Tests/src/Latch.cpp @@ -0,0 +1,584 @@ +/* + Latch - a test program for libosmscout + Copyright (C) 2024 Jean-Luc Barriere + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + asize_t with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +using namespace std::chrono_literals; + +static size_t iterationCount=250; +static auto taskDuration=1ms; + +static size_t refCounter = 0; +static osmscout::Latch latch; + +class ReaderWorker +{ +private: + osmscout::ProcessingQueue& queue; + std::thread thread; + +public: + size_t processedCount; + +private: + void ProcessorLoop() + { + std::cout << "Processing..." << std::endl; + + while (true) { + std::optional value=queue.PopTask(); + + if (!value) { + std::cout << "Queue empty!" << std::endl; + break; + } + + if (value<0) { + std::cout << "Stop signal fetched!" << std::endl; + break; + } + + { + osmscout::ReadLock locker(latch); + [[maybe_unused]] size_t c = refCounter; + std::this_thread::sleep_for(taskDuration); + } + + processedCount++; + } + + std::cout << "Processing...done" << std::endl; + } + +public: + explicit ReaderWorker(osmscout::ProcessingQueue& queue) + : queue(queue), + thread(&ReaderWorker::ProcessorLoop,this), + processedCount(0) + { + } + + void Wait() { + thread.join(); + } +}; + +class WriterWorker +{ +private: + osmscout::ProcessingQueue& queue; + std::thread thread; + +public: + size_t processedCount; + +private: + void ProcessorLoop() + { + std::cout << "Processing..." << std::endl; + + while (true) { + std::optional value=queue.PopTask(); + + if (!value) { + std::cout << "Queue empty!" << std::endl; + break; + } + + if (value<0) { + std::cout << "Stop signal fetched!" << std::endl; + break; + } + + { + osmscout::WriteLock locker(latch); + ++refCounter; + std::this_thread::sleep_for(taskDuration); + } + + processedCount++; + } + + std::cout << "Processing...done" << std::endl; + } + +public: + explicit WriterWorker(osmscout::ProcessingQueue& queue) + : queue(queue), + thread(&WriterWorker::ProcessorLoop,this), + processedCount(0) + { + } + + void Wait() { + thread.join(); + } +}; + +class ReaderReaderWorker +{ +private: + osmscout::ProcessingQueue& queue; + std::thread thread; + +public: + size_t processedCount; + +private: + void ProcessorLoop() + { + std::cout << "Processing..." << std::endl; + + while (true) { + std::optional value=queue.PopTask(); + + if (!value) { + std::cout << "Queue empty!" << std::endl; + break; + } + + if (value<0) { + std::cout << "Stop signal fetched!" << std::endl; + break; + } + + { + osmscout::ReadLock rl1(latch); + { + osmscout::ReadLock rl2(latch); + { + osmscout::ReadLock rl3(latch); + [[maybe_unused]] size_t c = refCounter; + std::this_thread::sleep_for(taskDuration); + } + } + } + + processedCount++; + } + + std::cout << "Processing...done" << std::endl; + } + +public: + explicit ReaderReaderWorker(osmscout::ProcessingQueue& queue) + : queue(queue), + thread(&ReaderReaderWorker::ProcessorLoop,this), + processedCount(0) + { + } + + void Wait() { + thread.join(); + } +}; + +class WriterReaderWorker +{ +private: + osmscout::ProcessingQueue& queue; + std::thread thread; + +public: + size_t processedCount; + +private: + void ProcessorLoop() + { + std::cout << "Processing..." << std::endl; + + while (true) { + std::optional value=queue.PopTask(); + + if (!value) { + std::cout << "Queue empty!" << std::endl; + break; + } + + if (value<0) { + std::cout << "Stop signal fetched!" 
<< std::endl; + break; + } + + { + osmscout::WriteLock wr1(latch); + { + osmscout::WriteLock wr2(latch); + { + osmscout::ReadLock rl1(latch); + { + osmscout::ReadLock rl2(latch); + [[maybe_unused]] size_t c = refCounter; + } + } + ++refCounter; + } + std::this_thread::sleep_for(taskDuration); + } + + processedCount++; + } + + std::cout << "Processing...done" << std::endl; + } + +public: + explicit WriterReaderWorker(osmscout::ProcessingQueue& queue) + : queue(queue), + thread(&WriterReaderWorker::ProcessorLoop,this), + processedCount(0) + { + } + + void Wait() { + thread.join(); + } +}; + +TEST_CASE("Multi Reader Worker") { + osmscout::StopClock stopClock; + osmscout::ProcessingQueue queue(1000); + ReaderWorker worker1(queue); + ReaderWorker worker2(queue); + ReaderWorker worker3(queue); + ReaderWorker worker4(queue); + + std::cout << "Pushing tasks..." << std::endl; + + for (size_t i=1; i<=iterationCount; i++) { + queue.PushTask(i); + } + + queue.PushTask(-1); + + queue.Stop(); + + std::cout << "Pushing tasks...done, waiting..." << std::endl; + + worker1.Wait(); + worker2.Wait(); + worker3.Wait(); + worker4.Wait(); + + stopClock.Stop(); + + size_t pc = worker1.processedCount+worker2.processedCount+worker3.processedCount+ + worker4.processedCount; + + std::cout << "#processed: " + << pc << " [" + << worker1.processedCount << "," + << worker2.processedCount << "," + << worker3.processedCount << "," + << worker4.processedCount << "]" + << std::endl; + std::cout << "<<< MultiReaderWorker...done: " << stopClock.ResultString() << std::endl; + std::cout << std::endl; + + REQUIRE(pc == iterationCount); +} + +TEST_CASE("Multi Writer Worker") { + osmscout::StopClock stopClock; + osmscout::ProcessingQueue queue(1000); + WriterWorker worker1(queue); + WriterWorker worker2(queue); + WriterWorker worker3(queue); + WriterWorker worker4(queue); + + std::cout << "Pushing tasks..." << std::endl; + + for (size_t i=1; i<=iterationCount; i++) { + queue.PushTask(i); + } + + queue.PushTask(-1); + + queue.Stop(); + + worker1.Wait(); + worker2.Wait(); + worker3.Wait(); + worker4.Wait(); + + std::cout << "Pushing tasks...done, waiting..." << std::endl; + + + stopClock.Stop(); + + size_t pc = worker1.processedCount+worker2.processedCount+worker3.processedCount+ + worker4.processedCount; + + std::cout << "#processed: (" << refCounter << ") " + << pc << " [" + << worker1.processedCount << "," + << worker2.processedCount << "," + << worker3.processedCount << "," + << worker4.processedCount << "]" + << std::endl; + std::cout << "<<< MultiWriterWorker...done: " << stopClock.ResultString() << std::endl; + std::cout << std::endl; + + REQUIRE(pc == iterationCount); + REQUIRE(refCounter == pc); +} + +TEST_CASE("Multi Reader Worker One Writer worker") { + osmscout::StopClock stopClock; + osmscout::ProcessingQueue queue(1000); + WriterWorker worker1(queue); + ReaderWorker worker2(queue); + ReaderWorker worker3(queue); + ReaderWorker worker4(queue); + + refCounter = 0; // reset counter + + std::cout << "Pushing tasks..." << std::endl; + + for (size_t i=1; i<=iterationCount; i++) { + queue.PushTask(i); + } + + queue.PushTask(-1); + + queue.Stop(); + + worker1.Wait(); + worker2.Wait(); + worker3.Wait(); + worker4.Wait(); + + std::cout << "Pushing tasks...done, waiting..." 
<< std::endl; + + + stopClock.Stop(); + + size_t pc = worker1.processedCount+worker2.processedCount+worker3.processedCount+ + worker4.processedCount; + + std::cout << "#processed: (" << refCounter << ") " + << pc << " [" + << worker1.processedCount << "," + << worker2.processedCount << "," + << worker3.processedCount << "," + << worker4.processedCount << "]" + << std::endl; + std::cout << "<<< MultiReaderOneWriterWorker...done: " << stopClock.ResultString() << std::endl; + std::cout << std::endl; + + REQUIRE(pc == iterationCount); +} + +TEST_CASE("Multi Recursive Reader Worker") { + osmscout::StopClock stopClock; + osmscout::ProcessingQueue queue(1000); + ReaderReaderWorker worker1(queue); + ReaderReaderWorker worker2(queue); + ReaderReaderWorker worker3(queue); + ReaderReaderWorker worker4(queue); + + std::cout << "Pushing tasks..." << std::endl; + + for (size_t i=1; i<=iterationCount; i++) { + queue.PushTask(i); + } + + queue.PushTask(-1); + + queue.Stop(); + + worker1.Wait(); + worker2.Wait(); + worker3.Wait(); + worker4.Wait(); + + std::cout << "Pushing tasks...done, waiting..." << std::endl; + + + stopClock.Stop(); + + size_t pc = worker1.processedCount+worker2.processedCount+worker3.processedCount+ + worker4.processedCount; + + std::cout << "#processed: " + << pc << " [" + << worker1.processedCount << "," + << worker2.processedCount << "," + << worker3.processedCount << "," + << worker4.processedCount << "]" + << std::endl; + std::cout << "<<< MultiReentrantReaderWorker...done: " << stopClock.ResultString() << std::endl; + std::cout << std::endl; + + REQUIRE(pc == iterationCount); +} + +TEST_CASE("Multi Recursive Writer Worker") { + osmscout::StopClock stopClock; + osmscout::ProcessingQueue queue(1000); + WriterReaderWorker worker1(queue); + WriterReaderWorker worker2(queue); + WriterReaderWorker worker3(queue); + WriterReaderWorker worker4(queue); + + refCounter = 0; // reset counter + + std::cout << "Pushing tasks..." << std::endl; + + for (size_t i=1; i<=iterationCount; i++) { + queue.PushTask(i); + } + + queue.PushTask(-1); + + queue.Stop(); + + worker1.Wait(); + worker2.Wait(); + worker3.Wait(); + worker4.Wait(); + + std::cout << "Pushing tasks...done, waiting..." 
<< std::endl; + + + stopClock.Stop(); + + size_t pc = worker1.processedCount+worker2.processedCount+worker3.processedCount+ + worker4.processedCount; + + std::cout << "#processed: (" << refCounter << ") " + << pc << " [" + << worker1.processedCount << "," + << worker2.processedCount << "," + << worker3.processedCount << "," + << worker4.processedCount << "]" + << std::endl; + std::cout << "<<< MultiReentrantWriterWorker...done: " << stopClock.ResultString() << std::endl; + std::cout << std::endl; + + REQUIRE(pc == iterationCount); + REQUIRE(refCounter == pc); +} + +TEST_CASE("Check write precedence") { + volatile int i=0; + osmscout::ReadLock rl(latch); + + std::thread t([&i](){ + osmscout::WriteLock wl(latch); + i++; + }); + + // wait until writer lock is requested + while (true) { + if (!latch.try_lock_shared()) { + // writer lock is requested already + break; + } + latch.unlock_shared(); + std::this_thread::yield(); + } + + REQUIRE(i == 0); + rl.unlock(); + + osmscout::ReadLock rl2(latch); // we should not get shared lock until writer is done + REQUIRE(i == 1); + t.join(); +} + +TEST_CASE("Second shared lock should be blocked when exclusive is requested") { + int nbreader=4; + volatile int i=0; + std::atomic j=0; + std::atomic blocked=0; + std::atomic notblocked=0; + std::vector pools; + { + osmscout::ReadLock rl(latch); + + std::thread t([&i](){ + osmscout::WriteLock wl(latch); + i++; + }); + + // wait until writer lock is requested + while (true) { + if (!latch.try_lock_shared()) { + // writer lock is requested already + break; + } + latch.unlock_shared(); + std::this_thread::yield(); + } + + for (int nr=0; nr < nbreader; ++nr) { + std::thread * tr = new std::thread([&j, &blocked, ¬blocked](){ + if (latch.try_lock_shared()) { + notblocked++; + latch.unlock_shared(); + } else { + blocked++; + osmscout::ReadLock rl(latch); + j++; + } + }); + pools.push_back(tr); + } + + // wait for everyone to get set up + int k=0; + while ((notblocked.load() + blocked.load()) < 1 && k++ < 1000) { + std::this_thread::yield(); + } + REQUIRE(i == 0); // write lock is still waiting + REQUIRE((blocked.load() + notblocked.load()) > 0); + rl.unlock(); + t.join(); + } + std::cout << "#blocked: " << blocked.load() << "/" << nbreader << std::endl; + // hoping that 1 read lock have been blocked because exclusive lock was requested + REQUIRE((blocked.load() > 0)); + // check BUG: thread was not awakened after broadcast signal + // wait for all readers, or fail when lost reader + int k=0; + while (j.load() != blocked.load() && k++ < 1000) { + std::this_thread::yield(); + } + // all blocked readers must be finalized + REQUIRE(j.load() == blocked.load()); + // cleanup + while (!pools.empty()) { + pools.back()->join(); + delete pools.back(); + pools.pop_back(); + } +} +
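The write-precedence tests above poll try_lock_shared() to detect a queued writer; a condensed, hypothetical sketch of that pattern, assuming the osmscout::Latch API from the second patch:

    #include <thread>
    #include <osmscout/async/ReadWriteLock.h>

    void WritePrecedenceExample() {
      osmscout::Latch latch;
      osmscout::ReadLock reader(latch);     // hold lock-S, so the writer has to wait

      std::thread writer([&latch]() {
        osmscout::WriteLock wl(latch);      // blocks until every shared owner releases
        // ... exclusive work ...
      });

      // Once the exclusive request is queued, try_lock_shared() is refused,
      // so a failing attempt detects the waiting writer.
      while (latch.try_lock_shared()) {
        latch.unlock_shared();
        std::this_thread::yield();
      }

      reader.unlock();                      // last lock-S released, the writer proceeds
      writer.join();
    }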