diff --git a/Tests/CMakeLists.txt b/Tests/CMakeLists.txt index f29af58e5..041f0ead0 100644 --- a/Tests/CMakeLists.txt +++ b/Tests/CMakeLists.txt @@ -34,6 +34,9 @@ osmscout_test_project(NAME ColorParse SOURCES src/ColorParse.cpp) #---- CoordinateEncoding osmscout_test_project(NAME CoordinateEncoding SOURCES src/CoordinateEncoding.cpp COMMAND "${CMAKE_CURRENT_SOURCE_DIR}/data/testregion") +#---- Latch +osmscout_test_project(NAME Latch SOURCES src/Latch.cpp) + #---- LocationLookup osmscout_test_project(NAME LocationLookupTest SOURCES src/LocationServiceTest.cpp src/SearchForLocationByStringTest.cpp src/SearchForLocationByFormTest.cpp src/SearchForPOIByFormTest.cpp TARGET OSMScout::Test OSMScout::Import) set_source_files_properties(src/SearchForLocationByStringTest.cpp src/SearchForLocationByFormTest.cpp src/SearchForPOIByFormTest.cpp src/LocationServiceTest.cpp PROPERTIES SKIP_UNITY_BUILD_INCLUSION TRUE) diff --git a/Tests/meson.build b/Tests/meson.build index c08de8f43..2fd7f5752 100644 --- a/Tests/meson.build +++ b/Tests/meson.build @@ -225,6 +225,16 @@ HeaderCheck = executable('HeaderCheck', test('Check use of \'<\'...\'>\' for includes', HeaderCheck, env: headerCheckEnv) +Latch = executable('Latch', + 'src/Latch.cpp', + include_directories: [testIncDir, osmscoutIncDir], + dependencies: [mathDep, threadDep, openmpDep], + link_with: [osmscout], + install: true, + install_dir: testInstallDir) + +test('Check latch consistency', Latch) + if buildImport LocationServiceTest = executable('LocationServiceTest', [ diff --git a/Tests/src/Latch.cpp b/Tests/src/Latch.cpp new file mode 100644 index 000000000..ac2d54a7b --- /dev/null +++ b/Tests/src/Latch.cpp @@ -0,0 +1,584 @@ +/* + Latch - a test program for libosmscout + Copyright (C) 2024 Jean-Luc Barriere + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + asize_t with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +using namespace std::chrono_literals; + +static size_t iterationCount=250; +static auto taskDuration=1ms; + +static size_t refCounter = 0; +static osmscout::Latch latch; + +class ReaderWorker +{ +private: + osmscout::ProcessingQueue& queue; + std::thread thread; + +public: + size_t processedCount; + +private: + void ProcessorLoop() + { + std::cout << "Processing..." << std::endl; + + while (true) { + std::optional value=queue.PopTask(); + + if (!value) { + std::cout << "Queue empty!" << std::endl; + break; + } + + if (value<0) { + std::cout << "Stop signal fetched!" << std::endl; + break; + } + + { + osmscout::ReadLock locker(latch); + [[maybe_unused]] size_t c = refCounter; + std::this_thread::sleep_for(taskDuration); + } + + processedCount++; + } + + std::cout << "Processing...done" << std::endl; + } + +public: + explicit ReaderWorker(osmscout::ProcessingQueue& queue) + : queue(queue), + thread(&ReaderWorker::ProcessorLoop,this), + processedCount(0) + { + } + + void Wait() { + thread.join(); + } +}; + +class WriterWorker +{ +private: + osmscout::ProcessingQueue& queue; + std::thread thread; + +public: + size_t processedCount; + +private: + void ProcessorLoop() + { + std::cout << "Processing..." << std::endl; + + while (true) { + std::optional value=queue.PopTask(); + + if (!value) { + std::cout << "Queue empty!" << std::endl; + break; + } + + if (value<0) { + std::cout << "Stop signal fetched!" << std::endl; + break; + } + + { + osmscout::WriteLock locker(latch); + ++refCounter; + std::this_thread::sleep_for(taskDuration); + } + + processedCount++; + } + + std::cout << "Processing...done" << std::endl; + } + +public: + explicit WriterWorker(osmscout::ProcessingQueue& queue) + : queue(queue), + thread(&WriterWorker::ProcessorLoop,this), + processedCount(0) + { + } + + void Wait() { + thread.join(); + } +}; + +class ReaderReaderWorker +{ +private: + osmscout::ProcessingQueue& queue; + std::thread thread; + +public: + size_t processedCount; + +private: + void ProcessorLoop() + { + std::cout << "Processing..." << std::endl; + + while (true) { + std::optional value=queue.PopTask(); + + if (!value) { + std::cout << "Queue empty!" << std::endl; + break; + } + + if (value<0) { + std::cout << "Stop signal fetched!" << std::endl; + break; + } + + { + osmscout::ReadLock rl1(latch); + { + osmscout::ReadLock rl2(latch); + { + osmscout::ReadLock rl3(latch); + [[maybe_unused]] size_t c = refCounter; + std::this_thread::sleep_for(taskDuration); + } + } + } + + processedCount++; + } + + std::cout << "Processing...done" << std::endl; + } + +public: + explicit ReaderReaderWorker(osmscout::ProcessingQueue& queue) + : queue(queue), + thread(&ReaderReaderWorker::ProcessorLoop,this), + processedCount(0) + { + } + + void Wait() { + thread.join(); + } +}; + +class WriterReaderWorker +{ +private: + osmscout::ProcessingQueue& queue; + std::thread thread; + +public: + size_t processedCount; + +private: + void ProcessorLoop() + { + std::cout << "Processing..." << std::endl; + + while (true) { + std::optional value=queue.PopTask(); + + if (!value) { + std::cout << "Queue empty!" << std::endl; + break; + } + + if (value<0) { + std::cout << "Stop signal fetched!" << std::endl; + break; + } + + { + osmscout::WriteLock wr1(latch); + { + osmscout::WriteLock wr2(latch); + { + osmscout::ReadLock rl1(latch); + { + osmscout::ReadLock rl2(latch); + [[maybe_unused]] size_t c = refCounter; + } + } + ++refCounter; + } + std::this_thread::sleep_for(taskDuration); + } + + processedCount++; + } + + std::cout << "Processing...done" << std::endl; + } + +public: + explicit WriterReaderWorker(osmscout::ProcessingQueue& queue) + : queue(queue), + thread(&WriterReaderWorker::ProcessorLoop,this), + processedCount(0) + { + } + + void Wait() { + thread.join(); + } +}; + +TEST_CASE("Multi Reader Worker") { + osmscout::StopClock stopClock; + osmscout::ProcessingQueue queue(1000); + ReaderWorker worker1(queue); + ReaderWorker worker2(queue); + ReaderWorker worker3(queue); + ReaderWorker worker4(queue); + + std::cout << "Pushing tasks..." << std::endl; + + for (size_t i=1; i<=iterationCount; i++) { + queue.PushTask(i); + } + + queue.PushTask(-1); + + queue.Stop(); + + std::cout << "Pushing tasks...done, waiting..." << std::endl; + + worker1.Wait(); + worker2.Wait(); + worker3.Wait(); + worker4.Wait(); + + stopClock.Stop(); + + size_t pc = worker1.processedCount+worker2.processedCount+worker3.processedCount+ + worker4.processedCount; + + std::cout << "#processed: " + << pc << " [" + << worker1.processedCount << "," + << worker2.processedCount << "," + << worker3.processedCount << "," + << worker4.processedCount << "]" + << std::endl; + std::cout << "<<< MultiReaderWorker...done: " << stopClock.ResultString() << std::endl; + std::cout << std::endl; + + REQUIRE(pc == iterationCount); +} + +TEST_CASE("Multi Writer Worker") { + osmscout::StopClock stopClock; + osmscout::ProcessingQueue queue(1000); + WriterWorker worker1(queue); + WriterWorker worker2(queue); + WriterWorker worker3(queue); + WriterWorker worker4(queue); + + std::cout << "Pushing tasks..." << std::endl; + + for (size_t i=1; i<=iterationCount; i++) { + queue.PushTask(i); + } + + queue.PushTask(-1); + + queue.Stop(); + + worker1.Wait(); + worker2.Wait(); + worker3.Wait(); + worker4.Wait(); + + std::cout << "Pushing tasks...done, waiting..." << std::endl; + + + stopClock.Stop(); + + size_t pc = worker1.processedCount+worker2.processedCount+worker3.processedCount+ + worker4.processedCount; + + std::cout << "#processed: (" << refCounter << ") " + << pc << " [" + << worker1.processedCount << "," + << worker2.processedCount << "," + << worker3.processedCount << "," + << worker4.processedCount << "]" + << std::endl; + std::cout << "<<< MultiWriterWorker...done: " << stopClock.ResultString() << std::endl; + std::cout << std::endl; + + REQUIRE(pc == iterationCount); + REQUIRE(refCounter == pc); +} + +TEST_CASE("Multi Reader Worker One Writer worker") { + osmscout::StopClock stopClock; + osmscout::ProcessingQueue queue(1000); + WriterWorker worker1(queue); + ReaderWorker worker2(queue); + ReaderWorker worker3(queue); + ReaderWorker worker4(queue); + + refCounter = 0; // reset counter + + std::cout << "Pushing tasks..." << std::endl; + + for (size_t i=1; i<=iterationCount; i++) { + queue.PushTask(i); + } + + queue.PushTask(-1); + + queue.Stop(); + + worker1.Wait(); + worker2.Wait(); + worker3.Wait(); + worker4.Wait(); + + std::cout << "Pushing tasks...done, waiting..." << std::endl; + + + stopClock.Stop(); + + size_t pc = worker1.processedCount+worker2.processedCount+worker3.processedCount+ + worker4.processedCount; + + std::cout << "#processed: (" << refCounter << ") " + << pc << " [" + << worker1.processedCount << "," + << worker2.processedCount << "," + << worker3.processedCount << "," + << worker4.processedCount << "]" + << std::endl; + std::cout << "<<< MultiReaderOneWriterWorker...done: " << stopClock.ResultString() << std::endl; + std::cout << std::endl; + + REQUIRE(pc == iterationCount); +} + +TEST_CASE("Multi Recursive Reader Worker") { + osmscout::StopClock stopClock; + osmscout::ProcessingQueue queue(1000); + ReaderReaderWorker worker1(queue); + ReaderReaderWorker worker2(queue); + ReaderReaderWorker worker3(queue); + ReaderReaderWorker worker4(queue); + + std::cout << "Pushing tasks..." << std::endl; + + for (size_t i=1; i<=iterationCount; i++) { + queue.PushTask(i); + } + + queue.PushTask(-1); + + queue.Stop(); + + worker1.Wait(); + worker2.Wait(); + worker3.Wait(); + worker4.Wait(); + + std::cout << "Pushing tasks...done, waiting..." << std::endl; + + + stopClock.Stop(); + + size_t pc = worker1.processedCount+worker2.processedCount+worker3.processedCount+ + worker4.processedCount; + + std::cout << "#processed: " + << pc << " [" + << worker1.processedCount << "," + << worker2.processedCount << "," + << worker3.processedCount << "," + << worker4.processedCount << "]" + << std::endl; + std::cout << "<<< MultiReentrantReaderWorker...done: " << stopClock.ResultString() << std::endl; + std::cout << std::endl; + + REQUIRE(pc == iterationCount); +} + +TEST_CASE("Multi Recursive Writer Worker") { + osmscout::StopClock stopClock; + osmscout::ProcessingQueue queue(1000); + WriterReaderWorker worker1(queue); + WriterReaderWorker worker2(queue); + WriterReaderWorker worker3(queue); + WriterReaderWorker worker4(queue); + + refCounter = 0; // reset counter + + std::cout << "Pushing tasks..." << std::endl; + + for (size_t i=1; i<=iterationCount; i++) { + queue.PushTask(i); + } + + queue.PushTask(-1); + + queue.Stop(); + + worker1.Wait(); + worker2.Wait(); + worker3.Wait(); + worker4.Wait(); + + std::cout << "Pushing tasks...done, waiting..." << std::endl; + + + stopClock.Stop(); + + size_t pc = worker1.processedCount+worker2.processedCount+worker3.processedCount+ + worker4.processedCount; + + std::cout << "#processed: (" << refCounter << ") " + << pc << " [" + << worker1.processedCount << "," + << worker2.processedCount << "," + << worker3.processedCount << "," + << worker4.processedCount << "]" + << std::endl; + std::cout << "<<< MultiReentrantWriterWorker...done: " << stopClock.ResultString() << std::endl; + std::cout << std::endl; + + REQUIRE(pc == iterationCount); + REQUIRE(refCounter == pc); +} + +TEST_CASE("Check write precedence") { + volatile int i=0; + osmscout::ReadLock rl(latch); + + std::thread t([&i](){ + osmscout::WriteLock wl(latch); + i++; + }); + + // wait until writer lock is requested + while (true) { + if (!latch.try_lock_shared()) { + // writer lock is requested already + break; + } + latch.unlock_shared(); + std::this_thread::yield(); + } + + REQUIRE(i == 0); + rl.unlock(); + + osmscout::ReadLock rl2(latch); // we should not get shared lock until writer is done + REQUIRE(i == 1); + t.join(); +} + +TEST_CASE("Second shared lock should be blocked when exclusive is requested") { + int nbreader=4; + volatile int i=0; + std::atomic j=0; + std::atomic blocked=0; + std::atomic notblocked=0; + std::vector pools; + { + osmscout::ReadLock rl(latch); + + std::thread t([&i](){ + osmscout::WriteLock wl(latch); + i++; + }); + + // wait until writer lock is requested + while (true) { + if (!latch.try_lock_shared()) { + // writer lock is requested already + break; + } + latch.unlock_shared(); + std::this_thread::yield(); + } + + for (int nr=0; nr < nbreader; ++nr) { + std::thread * tr = new std::thread([&j, &blocked, ¬blocked](){ + if (latch.try_lock_shared()) { + notblocked++; + latch.unlock_shared(); + } else { + blocked++; + osmscout::ReadLock rl(latch); + j++; + } + }); + pools.push_back(tr); + } + + // wait for everyone to get set up + int k=0; + while ((notblocked.load() + blocked.load()) < 1 && k++ < 1000) { + std::this_thread::yield(); + } + REQUIRE(i == 0); // write lock is still waiting + REQUIRE((blocked.load() + notblocked.load()) > 0); + rl.unlock(); + t.join(); + } + std::cout << "#blocked: " << blocked.load() << "/" << nbreader << std::endl; + // hoping that 1 read lock have been blocked because exclusive lock was requested + REQUIRE((blocked.load() > 0)); + // check BUG: thread was not awakened after broadcast signal + // wait for all readers, or fail when lost reader + int k=0; + while (j.load() != blocked.load() && k++ < 1000) { + std::this_thread::yield(); + } + // all blocked readers must be finalized + REQUIRE(j.load() == blocked.load()); + // cleanup + while (!pools.empty()) { + pools.back()->join(); + delete pools.back(); + pools.pop_back(); + } +} + diff --git a/libosmscout-client-qt/include/osmscoutclientqt/DBLoadJob.h b/libosmscout-client-qt/include/osmscoutclientqt/DBLoadJob.h index 08da62507..3b970da6d 100644 --- a/libosmscout-client-qt/include/osmscoutclientqt/DBLoadJob.h +++ b/libosmscout-client-qt/include/osmscoutclientqt/DBLoadJob.h @@ -81,7 +81,7 @@ protected slots: void Run(const osmscout::BasemapDatabaseRef& basemapDatabase, const std::list &databases, - std::shared_lock &&locker) override; + ReadLock &&locker) override; void Close() override; diff --git a/libosmscout-client-qt/include/osmscoutclientqt/MapRenderer.h b/libosmscout-client-qt/include/osmscoutclientqt/MapRenderer.h index 9f8934d6c..b8e87d4fa 100644 --- a/libosmscout-client-qt/include/osmscoutclientqt/MapRenderer.h +++ b/libosmscout-client-qt/include/osmscoutclientqt/MapRenderer.h @@ -68,7 +68,7 @@ class OSMSCOUT_CLIENT_QT_API DBRenderJob : public QObject, public DBJob{ void Run(const osmscout::BasemapDatabaseRef& basemapDatabase, const std::list &allDatabases, - std::shared_lock &&locker) override; + ReadLock &&locker) override; inline bool IsSuccess() const{ return success; diff --git a/libosmscout-client-qt/src/osmscoutclientqt/DBLoadJob.cpp b/libosmscout-client-qt/src/osmscoutclientqt/DBLoadJob.cpp index 61ded627d..e836154cc 100644 --- a/libosmscout-client-qt/src/osmscoutclientqt/DBLoadJob.cpp +++ b/libosmscout-client-qt/src/osmscoutclientqt/DBLoadJob.cpp @@ -55,7 +55,7 @@ DBLoadJob::~DBLoadJob() void DBLoadJob::Run(const osmscout::BasemapDatabaseRef& basemapDatabase, const std::list &databases, - std::shared_lock &&locker) + ReadLock &&locker) { osmscout::GeoBox lookupBox(lookupProjection.GetDimensions()); std::list relevantDatabases; diff --git a/libosmscout-client-qt/src/osmscoutclientqt/MapRenderer.cpp b/libosmscout-client-qt/src/osmscoutclientqt/MapRenderer.cpp index dc882da1f..7c8c12136 100644 --- a/libosmscout-client-qt/src/osmscoutclientqt/MapRenderer.cpp +++ b/libosmscout-client-qt/src/osmscoutclientqt/MapRenderer.cpp @@ -254,7 +254,7 @@ DBRenderJob::DBRenderJob(osmscout::MercatorProjection renderProjection, void DBRenderJob::Run(const osmscout::BasemapDatabaseRef& basemapDatabase, const std::list &allDatabases, - std::shared_lock &&locker) + ReadLock &&locker) { std::list databases; // enabled databases for rendering if (renderDatabases) { diff --git a/libosmscout-client/include/osmscoutclient/DBJob.h b/libosmscout-client/include/osmscoutclient/DBJob.h index 0aa17e19c..761619e4b 100644 --- a/libosmscout-client/include/osmscoutclient/DBJob.h +++ b/libosmscout-client/include/osmscoutclient/DBJob.h @@ -24,6 +24,7 @@ #include +#include #include #include @@ -37,15 +38,15 @@ namespace osmscout { */ class OSMSCOUT_CLIENT_API DBJob { - protected: +protected: osmscout::BasemapDatabaseRef basemapDatabase; //!< Optional reference to the basemap db std::list databases; //!< borrowed databases std::thread::id threadId; //!< job thread - private: - std::shared_lock locker; //!< db locker +private: + ReadLock locker; //!< db locker - public: +public: DBJob(); DBJob(const DBJob&) = delete; DBJob(DBJob&&) = delete; @@ -54,8 +55,8 @@ class OSMSCOUT_CLIENT_API DBJob { virtual ~DBJob(); virtual void Run(const osmscout::BasemapDatabaseRef& basemapDatabase, - const std::list &databases, - std::shared_lock &&locker); + const std::list &databases, + ReadLock &&locker); virtual void Close(); }; diff --git a/libosmscout-client/include/osmscoutclient/DBThread.h b/libosmscout-client/include/osmscoutclient/DBThread.h index 909d5433f..0c58d02ab 100644 --- a/libosmscout-client/include/osmscoutclient/DBThread.h +++ b/libosmscout-client/include/osmscoutclient/DBThread.h @@ -30,6 +30,7 @@ #include #include +#include #include @@ -96,7 +97,7 @@ class OSMSCOUT_CLIENT_API DBThread: public AsyncWorker using AsynchronousDBJob = std::function &databases, - std::shared_lock &&locker)>; + ReadLock &&locker)>; // signals Signal<> stylesheetFilenameChanged; @@ -142,7 +143,7 @@ class OSMSCOUT_CLIENT_API DBThread: public AsyncWorker double mapDpi; - mutable std::shared_mutex lock; + mutable Latch latch; osmscout::BasemapDatabaseParameter basemapDatabaseParameter; osmscout::BasemapDatabaseRef basemapDatabase; @@ -235,12 +236,12 @@ class OSMSCOUT_CLIENT_API DBThread: public AsyncWorker const std::list &GetStyleErrors() const { - return styleErrors; + return styleErrors; } StyleConfigRef GetEmptyStyleConfig() const { - std::shared_lock locker(lock); + ReadLock locker(latch); return emptyStyleConfig; } diff --git a/libosmscout-client/src/osmscoutclient/DBJob.cpp b/libosmscout-client/src/osmscoutclient/DBJob.cpp index bcd810152..24e29947a 100644 --- a/libosmscout-client/src/osmscoutclient/DBJob.cpp +++ b/libosmscout-client/src/osmscoutclient/DBJob.cpp @@ -33,7 +33,7 @@ DBJob::~DBJob() void DBJob::Run(const osmscout::BasemapDatabaseRef& basemapDatabase, const std::list &databases, - std::shared_lock &&locker) + ReadLock &&locker) { assert(threadId==std::this_thread::get_id()); this->basemapDatabase=basemapDatabase; diff --git a/libosmscout-client/src/osmscoutclient/DBThread.cpp b/libosmscout-client/src/osmscoutclient/DBThread.cpp index ad4c3482d..3182715ce 100644 --- a/libosmscout-client/src/osmscoutclient/DBThread.cpp +++ b/libosmscout-client/src/osmscoutclient/DBThread.cpp @@ -64,7 +64,7 @@ DBThread::DBThread(const std::string &basemapLookupDirectory, DBThread::~DBThread() { - std::unique_lock locker(lock); + WriteLock locker(latch); osmscout::log.Debug() << "DBThread::~DBThread()"; mapDpiSlot.Disconnect(); @@ -89,7 +89,7 @@ bool DBThread::isInitializedInternal() bool DBThread::isInitialized() { - std::shared_lock locker(lock); + ReadLock locker(latch); return isInitializedInternal(); } @@ -104,7 +104,7 @@ double DBThread::GetPhysicalDpi() const } const GeoBox DBThread::databaseBoundingBox() const { - std::shared_lock locker(lock); + ReadLock locker(latch); GeoBox response; for (const auto& db:databases){ response.Include(db->GetDBGeoBox()); @@ -115,7 +115,7 @@ const GeoBox DBThread::databaseBoundingBox() const { DatabaseCoverage DBThread::databaseCoverage(const osmscout::Magnification &magnification, const osmscout::GeoBox &bbox) { - std::shared_lock locker(lock); + ReadLock locker(latch); osmscout::GeoBox boundingBox; for (const auto &db:databases){ @@ -173,7 +173,7 @@ CancelableFuture DBThread::OnDatabaseListChanged(const std::vectorClose(); @@ -365,7 +365,7 @@ StyleConfigRef DBThread::makeStyleConfig(TypeConfigRef typeConfig, bool suppress CancelableFuture DBThread::ToggleDaylight() { return Async([this](const Breaker &) -> bool { - std::unique_lock locker(lock); + WriteLock locker(latch); if (!isInitializedInternal()) { return false; @@ -384,7 +384,7 @@ CancelableFuture DBThread::ToggleDaylight() CancelableFuture DBThread::OnMapDPIChange(double dpi) { return Async([this, dpi](const Breaker&) -> bool{ - std::unique_lock locker(lock); + WriteLock locker(latch); mapDpi = dpi; return true; }); @@ -394,7 +394,7 @@ CancelableFuture DBThread::SetStyleFlag(const std::string &key, bool value { return Async([this, key, value](const Breaker&) -> bool{ log.Debug() << "SetStyleFlag " << key << " to " << value; - std::unique_lock locker(lock); + WriteLock locker(latch); if (!isInitializedInternal()) { return false; @@ -411,7 +411,7 @@ CancelableFuture DBThread::ReloadStyle(const std::string &suffix) { return Async([this, suffix](const Breaker &) -> bool { log.Debug() << "Reloading style " << stylesheetFilename << suffix << "..."; - std::unique_lock locker(lock); + WriteLock locker(latch); LoadStyleInternal(stylesheetFilename, stylesheetFlags, suffix); log.Debug() << "Reloading style done."; return true; @@ -423,7 +423,7 @@ CancelableFuture DBThread::LoadStyle(const std::string &stylesheetFilename const std::string &suffix) { return Async([this, stylesheetFilename, stylesheetFlags, suffix](const Breaker&){ - std::unique_lock locker(lock); + WriteLock locker(latch); LoadStyleInternal(stylesheetFilename, stylesheetFlags, suffix); return true; }); @@ -455,7 +455,7 @@ void DBThread::LoadStyleInternal(const std::string &stylesheetFilename, const std::map DBThread::GetStyleFlags() const { - std::shared_lock locker(lock); + ReadLock locker(latch); std::map flags; // add flag overrides for (const auto& flag : stylesheetFlags){ @@ -508,7 +508,7 @@ CancelableFuture DBThread::FlushCaches(const std::chrono::milliseconds &id void DBThread::RunJob(AsynchronousDBJob job) { - std::shared_lock locker(lock); + ReadLock locker(latch); if (!isInitializedInternal()){ locker.unlock(); osmscout::log.Warn() << "ignore request, dbs is not initialized"; @@ -519,7 +519,7 @@ void DBThread::RunJob(AsynchronousDBJob job) void DBThread::RunSynchronousJob(SynchronousDBJob job) { - std::shared_lock locker(lock); + ReadLock locker(latch); if (!isInitializedInternal()){ osmscout::log.Warn() << "ignore request, dbs is not initialized"; return; diff --git a/libosmscout/CMakeLists.txt b/libosmscout/CMakeLists.txt index 9efca11cf..d62caaeed 100644 --- a/libosmscout/CMakeLists.txt +++ b/libosmscout/CMakeLists.txt @@ -37,6 +37,7 @@ set(HEADER_FILES_ASYNC include/osmscout/async/Breaker.h include/osmscout/async/CancelableFuture.h include/osmscout/async/ProcessingQueue.h + include/osmscout/async/ReadWriteLock.h include/osmscout/async/Signal.h include/osmscout/async/Thread.h include/osmscout/async/Worker.h @@ -215,6 +216,7 @@ set(SOURCE_FILES src/osmscout/io/NumericIndex.cpp src/osmscout/async/AsyncWorker.cpp src/osmscout/async/Breaker.cpp + src/osmscout/async/ReadWriteLock.cpp src/osmscout/async/Thread.cpp src/osmscout/async/Worker.cpp src/osmscout/async/WorkQueue.cpp diff --git a/libosmscout/include/meson.build b/libosmscout/include/meson.build index 33366821b..1e1c132a6 100644 --- a/libosmscout/include/meson.build +++ b/libosmscout/include/meson.build @@ -9,6 +9,7 @@ osmscoutHeader = [ 'osmscout/async/Breaker.h', 'osmscout/async/CancelableFuture.h', 'osmscout/async/ProcessingQueue.h', + 'osmscout/async/ReadWriteLock.h', 'osmscout/async/Signal.h', 'osmscout/async/Thread.h', 'osmscout/async/Worker.h', diff --git a/libosmscout/include/osmscout/async/ReadWriteLock.h b/libosmscout/include/osmscout/async/ReadWriteLock.h new file mode 100644 index 000000000..60788ff3e --- /dev/null +++ b/libosmscout/include/osmscout/async/ReadWriteLock.h @@ -0,0 +1,226 @@ +#ifndef LIBOSMSCOUT_READWRITELOCK_H +#define LIBOSMSCOUT_READWRITELOCK_H + +/* + This source is part of the libosmscout library + Copyright (C) 2024 Jean-Luc Barriere + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include + +#include + +#include +#include +#include +#include +#include + +namespace osmscout { + +/** + * This implements a pure C++ Latch providing lock-S (shared) and lock-X (exclusive). + * The concept used here allows X requests to be prioritized faster and more smoothly + * than standard implementations. It uses a no-lock strategy whenever possible and + * reverts to lock and wait in race condition. + */ + +constexpr size_t latch_bucket_count = 64; + +class OSMSCOUT_API Latch { +private: + mutable std::atomic_flag s_spin = ATOMIC_FLAG_INIT; + + volatile int x_wait = 0; /* counts requests in wait for X */ + volatile int s_count = 0; /* counts held S locks */ + volatile int x_flag = 0; /* X status: 0, 1, 2, or 3 */ + std::thread::id x_owner; /* X owner (thread id) */ + + std::mutex x_gate_lock; + std::condition_variable x_gate; /* wait for release of X */ + std::mutex s_gate_lock; + std::condition_variable s_gate; /* wait for release of S */ + + bool px = true; /* enable X precedence */ + std::array s_buckets{}; + + void spin_lock() { + while (s_spin.test_and_set(std::memory_order_acquire)) { + std::this_thread::yield(); + } + } + void spin_unlock() { + s_spin.clear(std::memory_order_release); + } + +public: + Latch() = default; + explicit Latch(bool _px) : px(_px) { } + Latch(const Latch&) = delete; + Latch(Latch&&) = delete; + Latch& operator=(const Latch&) = delete; + Latch& operator=(Latch&&) = delete; + ~Latch() = default; + + /* Locks the latch for exclusive ownership, + * blocks if the latch is not available + */ + void lock(); + + /* Unlocks the latch (exclusive ownership) */ + void unlock(); + + /* Locks the latch for shared ownership, + * blocks if the latch is not available + */ + void lock_shared(); + + /* Unlocks the latch (shared ownership) */ + void unlock_shared(); + + /* Tries to lock the latch for shared ownership, + * returns true if the latch has no exclusive ownership or any request for + * exclusive ownership, else false + */ + bool try_lock_shared(); +}; + +/* + * Cannot use the template std::shared_lock as Latch does not implement all the + * requirements of the standard + */ +class OSMSCOUT_API ReadLock +{ +private: + Latch *p = nullptr; + bool owns = false; + + void swap(ReadLock& rl) noexcept { + std::swap(p, rl.p); + std::swap(owns, rl.owns); + } + +public: + + ReadLock() = default; + + explicit ReadLock(Latch& latch) : p(&latch), owns(true) { latch.lock_shared(); } + + /* Assume the calling thread already has ownership of the shared lock */ + ReadLock(Latch& latch, std::adopt_lock_t) : p(&latch), owns(true) { } + + ~ReadLock() { + if (owns) { + p->unlock_shared(); + } + } + + ReadLock(ReadLock const&) = delete; + ReadLock& operator=(ReadLock const&) = delete; + + ReadLock(ReadLock&& rl) noexcept { swap(rl); } + + ReadLock& operator=(ReadLock&& rl) noexcept { + swap(rl); + return *this; + } + + bool owns_lock() const noexcept { + return owns; + } + + void lock() { + if (!owns && p != nullptr) { + p->lock_shared(); + owns = true; + } + } + + void unlock() { + if (owns) { + owns = false; + p->unlock_shared(); + } + } + + bool try_lock() { + if (!owns && p != nullptr) { + owns = p->try_lock_shared(); + } + return owns; + } +}; + +/* + * Cannot use the template std::unique_lock as Latch does not implement all the + * requirements of the standard + */ +class OSMSCOUT_API WriteLock +{ +private: + Latch *p = nullptr; + bool owns = false; + + void swap(WriteLock& wl) noexcept { + std::swap(p, wl.p); + std::swap(owns, wl.owns); + } + +public: + + WriteLock() = default; + + explicit WriteLock(Latch& latch) : p(&latch), owns(true) { latch.lock(); } + + ~WriteLock() { + if (owns) { + p->unlock(); + } + } + + WriteLock(WriteLock const&) = delete; + WriteLock& operator=(WriteLock const&) = delete; + + WriteLock(WriteLock&& wl) noexcept { swap(wl); } + + WriteLock& operator=(WriteLock&& wl) noexcept { + swap(wl); + return *this; + } + + bool owns_lock() const noexcept { + return owns; + } + + void lock() { + if (!owns && p != nullptr) { + p->lock(); + owns = true; + } + } + + void unlock() { + if (owns) { + owns = false; + p->unlock(); + } + } +}; + +} + +#endif //LIBOSMSCOUT_READWRITELOCK_H diff --git a/libosmscout/src/meson.build b/libosmscout/src/meson.build index 3b8cd6951..048c61eab 100644 --- a/libosmscout/src/meson.build +++ b/libosmscout/src/meson.build @@ -5,6 +5,7 @@ osmscoutSrc = [ 'src/osmscout/cli/CmdLineParsing.cpp', 'src/osmscout/async/AsyncWorker.cpp', 'src/osmscout/async/Breaker.cpp', + 'src/osmscout/async/ReadWriteLock.cpp', 'src/osmscout/async/Thread.cpp', 'src/osmscout/async/Worker.cpp', 'src/osmscout/async/WorkQueue.cpp', diff --git a/libosmscout/src/osmscout/async/ReadWriteLock.cpp b/libosmscout/src/osmscout/async/ReadWriteLock.cpp new file mode 100644 index 000000000..27c64b970 --- /dev/null +++ b/libosmscout/src/osmscout/async/ReadWriteLock.cpp @@ -0,0 +1,221 @@ +/* + This source is part of the libosmscout library + Copyright (C) 2024 Jean-Luc Barriere + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include + +#include + +#include + +#include +#include + +namespace osmscout { + +/** + * The X flag is set as follows based on the locking steps + * Step 0 : X is released + * Step 1 : X is held, but waits for release of S + * Step 2 : X was released and left available for one of request in wait + * Step 3 : X is held + * Step N : X recursive N-3 + */ +constexpr int X_STEP_0 = 0; +constexpr int X_STEP_1 = 1; +constexpr int X_STEP_2 = 2; +constexpr int X_STEP_3 = 3; + +void Latch::lock() { + /* Depending on the internal implementation of conditional variable, + * a race condition could arise, permanently blocking the thread; + * Setting a timeout works around the issue. + */ + static constexpr std::chrono::seconds exit_timeout(1); + + std::thread::id tid = std::this_thread::get_id(); + + spin_lock(); + + if (x_owner != tid) { + /* increments the count of request in wait */ + ++x_wait; + for (;;) { + /* if flag is 0 or 2 then it hold X with no wait, + * in other case it have to wait for X gate + */ + if (x_flag == X_STEP_0 || x_flag == X_STEP_2) { + x_flag = X_STEP_1; + --x_wait; + break; + } else { + /* !!! pop gate then unlock spin */ + std::unique_lock lk(x_gate_lock); + spin_unlock(); + x_gate.wait_for(lk, exit_timeout); + lk.unlock(); + } + spin_lock(); + } + + /* X = 1, check the releasing of S */ + for (;;) { + /* if the count of S is zeroed then it finalize with no wait, + * in other case it have to wait for S gate */ + if (s_count == 0) { + x_flag = X_STEP_3; + break; + } else { + /* !!! pop gate then unlock spin (reverse order for S notifier) */ + std::unique_lock lk(s_gate_lock); + spin_unlock(); + s_gate.wait_for(lk, exit_timeout); + lk.unlock(); + spin_lock(); + /* check if the notifier has hand over, else retry */ + if (x_flag == X_STEP_3) { + break; + } + } + } + + /* X = 3, set owner */ + x_owner = tid; + } else { + /* recursive X lock */ + ++x_flag; + } + + spin_unlock(); +} + +void Latch::unlock() { + spin_lock(); + if (x_owner == std::this_thread::get_id()) { + /* decrement recursive lock */ + if (--x_flag == X_STEP_2) { + x_owner = std::thread::id(); + /* hand-over to a request in wait for X, else release */ + if (x_wait == 0) { + x_flag = X_STEP_0; + } + /* !!! unlock spin then pop gate (reverse order for receiver) */ + spin_unlock(); + std::unique_lock lk(x_gate_lock); + x_gate.notify_all(); + lk.unlock(); + } else { + spin_unlock(); + } + } else { + spin_unlock(); + } +} + +namespace { +static const std::hash thread_hash; +} + +void Latch::lock_shared() { + /* Depending on the internal implementation of conditional variable, + * a race condition could arise, permanently blocking the thread; + * Setting a timeout works around the issue. + */ + static constexpr std::chrono::seconds exit_timeout(1); + + std::thread::id tid = std::this_thread::get_id(); + + spin_lock(); + if (x_owner != tid) { + /* if flag is 0 or 1 then it hold S with no wait, + * in other case it have to wait for X gate + */ + for (;;) { + if (!px) { + /* X precedence is false */ + if (x_flag < X_STEP_2) { + break; + } + } else { + /* X precedence is true, + * estimate if this thread holds a recursive S lock + */ + if (x_flag == X_STEP_0 || (x_flag == X_STEP_1 && + s_buckets[thread_hash(tid) % latch_bucket_count] > 0)) { + break; + } + } + /* !!! pop gate then unlock spin */ + std::unique_lock lk(x_gate_lock); + spin_unlock(); + x_gate.wait_for(lk, exit_timeout); + lk.unlock(); + spin_lock(); + } + } + ++s_count; + if (px) { + /* X precedence is true */ + ++s_buckets[thread_hash(tid) % latch_bucket_count]; + } + spin_unlock(); +} + +void Latch::unlock_shared() { + std::thread::id tid = std::this_thread::get_id(); + + spin_lock(); + if (px) { + /* X precedence is true */ + --s_buckets[thread_hash(tid) % latch_bucket_count]; + } + /* on last S, finalize X request in wait, and notify */ + if (--s_count == 0 && x_flag == X_STEP_1 && x_owner != tid) { + x_flag = X_STEP_3; + /* !!! unlock spin then pop gate (reverse order for X receiver) */ + spin_unlock(); + std::unique_lock lk(s_gate_lock); + s_gate.notify_one(); + lk.unlock(); + } else { + spin_unlock(); + } +} + +bool Latch::try_lock_shared() +{ + std::thread::id tid = std::this_thread::get_id(); + + spin_lock(); + /* if X = 0 then it hold S with success, + * in other case fails + */ + if (x_flag == X_STEP_0 || x_owner == tid) { + ++s_count; + if (px) { + /* X precedence is true */ + ++s_buckets[thread_hash(tid) % latch_bucket_count]; + } + spin_unlock(); + return true; + } + spin_unlock(); + return false; +} + +}