Skip to content

Commit

Permalink
jobs: refactor the weejobs exit routine to prevent a corrupted group …
Browse files Browse the repository at this point in the history
…semaphore
  • Loading branch information
gwaldron committed Feb 15, 2024
1 parent 63cbc90 commit 613da80
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ class TileListVisitor : public osgEarth::MultithreadedTileVisitor
{
}

virtual void run(const Profile* mapProfile)
void run(const Profile* mapProfile) override
{
// Start up the task service
OE_INFO << "Starting " << _numThreads << " threads " << std::endl;
Expand All @@ -419,7 +419,7 @@ class TileListVisitor : public osgEarth::MultithreadedTileVisitor
this->handleTile(key);
}

_group.join();
_group->join();
}

std::vector< TileKey > _keys;
Expand Down
2 changes: 2 additions & 0 deletions src/applications/osgearth_bindless/osgearth_bindless.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ int main_NV(int argc, char** argv)
osg::ArgumentParser arguments(&argc, argv);

osgViewer::Viewer viewer(arguments);
viewer.setRealizeOperation(new GL3RealizeOperation());

MapNodeHelper().configureView(&viewer);

if (arguments.read("--pause"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ void computeIntersectionsThreaded(osg::Node* node, std::vector< IntersectionQuer
pool->set_concurrency(num_threads);

// Poor man's parallel for
jobs::jobgroup intersections;
auto intersections = jobs::jobgroup::create();

//unsigned int workSize = 500;
// Try to split the jobs evenly among the threads
Expand All @@ -508,7 +508,7 @@ void computeIntersectionsThreaded(osg::Node* node, std::vector< IntersectionQuer
{
jobs::context context;
context.pool = pool;
context.group = &intersections;
context.group = intersections;

jobs::dispatch([node, curStart, curSize, &queries](Cancelable&) {
computeIntersections(node, queries, curStart, curSize);
Expand All @@ -525,7 +525,7 @@ void computeIntersectionsThreaded(osg::Node* node, std::vector< IntersectionQuer
}
}
//std::cout << "Dispatched " << numJobs << " jobs" << std::endl;
intersections.join();
intersections->join();
}


Expand Down
1 change: 1 addition & 0 deletions src/osgEarth/Threading
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
#pragma once
#include <osgEarth/Export>
#include <unordered_map>

// bring in weejobs in the jobs namespace
#define WEEJOBS_EXPORT OSGEARTH_EXPORT
Expand Down
2 changes: 1 addition & 1 deletion src/osgEarth/TileVisitor
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ namespace osgEarth { namespace Util

unsigned int _numThreads;

jobs::jobgroup _group;
std::shared_ptr<jobs::jobgroup> _group;
};


Expand Down
6 changes: 4 additions & 2 deletions src/osgEarth/TileVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ MultithreadedTileVisitor::MultithreadedTileVisitor() :
// We must do this to avoid an error message in OpenSceneGraph b/c the findWrapper method doesn't appear to be threadsafe.
// This really isn't a big deal b/c this only effects data that is already cached.
osgDB::ObjectWrapper* wrapper = osgDB::Registry::instance()->getObjectWrapperManager()->findWrapper("osg::Image");

_group = jobs::jobgroup::create();
}

MultithreadedTileVisitor::MultithreadedTileVisitor(TileHandler* handler) :
Expand Down Expand Up @@ -269,7 +271,7 @@ void MultithreadedTileVisitor::run(const Profile* mapProfile)
// Produce the tiles
TileVisitor::run( mapProfile );

_group.join();
_group->join();
}

bool MultithreadedTileVisitor::handleTile(const TileKey& key)
Expand Down Expand Up @@ -297,7 +299,7 @@ bool MultithreadedTileVisitor::handleTile(const TileKey& key)
jobs::context job;
job.name = "handleTile";
job.pool = jobs::get_pool(MTTV);
job.group = &_group;
job.group = _group;

jobs::dispatch(task, job);

Expand Down
101 changes: 61 additions & 40 deletions src/osgEarth/weejobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@
*/
#pragma once
#include <atomic>
#include <mutex>
#include <thread>
#include <cfloat>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <functional>
#include <vector>
#include <unordered_map>
#include <chrono>
#include <list>
#include <mutex>
#include <thread>
#include <type_traits>
#include <cstdlib>
#include <cfloat>
#include <vector>

// OPTIONAL: Define WEEJOBS_EXPORT if you want to use this library from multiple modules (DLLs)
// OPTIONAL: Define WEEJOBS_EXPORT if you want to use this library from multiple modules (DLLs)
#ifndef WEEJOBS_EXPORT
#define WEEJOBS_EXPORT
#endif
Expand Down Expand Up @@ -386,7 +386,13 @@ namespace WEEJOBS_NAMESPACE
* You can then call jobgroup::join() to wait for the whole group
* to finish.
*/
using jobgroup = detail::semaphore;
struct jobgroup : public detail::semaphore
{
static std::shared_ptr<jobgroup> create()
{
return std::make_shared<jobgroup>();
}
};

/**
* Context object you can pass to dispatch(...) to control aspects of
Expand All @@ -397,7 +403,7 @@ namespace WEEJOBS_NAMESPACE
std::string name;
class jobpool* pool = nullptr;
std::function<float()> priority = {};
jobgroup* group = nullptr;
std::shared_ptr<jobgroup> group = nullptr;
};

/**
Expand Down Expand Up @@ -532,34 +538,38 @@ namespace WEEJOBS_NAMESPACE
// (Benchmark: https://stackoverflow.com/a/20365638/4218920)
// Also note: it is indeed possible for the results of
// priority() to change during the search. We don't care.
int index = -1;
//int index = -1;
std::list<job>::iterator ptr = _queue.end();
float highest_priority = -FLT_MAX;
for (unsigned i = 0; i < _queue.size(); ++i)
for (auto iter = _queue.begin(); iter != _queue.end(); ++iter)
{
float priority = _queue[i].ctx.priority != nullptr ?
_queue[i].ctx.priority() :
float priority = iter->ctx.priority != nullptr ?
iter->ctx.priority() :
0.0f;

if (index < 0 || priority > highest_priority)
if (ptr == _queue.end() || priority > highest_priority)
{
index = i;
ptr = iter;
highest_priority = priority;
}
}
if (index < 0)
index = 0;

next = std::move(_queue[index]);
if (ptr == _queue.end())
ptr = _queue.begin();

next = std::move(*ptr); // _queue[index]);
have_next = true;

_queue.erase(ptr);

// move the last element into the empty position:
if (index < _queue.size() - 1)
{
_queue[index] = std::move(_queue.back());
}
//if (index < _queue.size() - 1)
//{
// _queue[index] = std::move(_queue.back());
//}

// and remove the last element.
_queue.erase(_queue.end() - 1);
//_queue.erase(_queue.end() - 1);
}
}

Expand Down Expand Up @@ -624,7 +634,8 @@ namespace WEEJOBS_NAMESPACE
};

std::string _name; // pool name
std::vector<job> _queue; // queued operations to run asynchronously
std::list<job> _queue;
//std::vector<job> _queue; // queued operations to run asynchronously
mutable std::mutex _queueMutex; // protect access to the queue
mutable std::mutex _quitMutex; // protects access to _done
std::atomic<unsigned> _targetConcurrency; // target number of concurrent threads in the pool
Expand Down Expand Up @@ -689,18 +700,9 @@ namespace WEEJOBS_NAMESPACE
{
inline runtime();

inline void kill()
{
_alive = false;

for (auto& pool : _pools)
if (pool)
pool->stop_threads();
inline ~runtime();

for (auto& pool : _pools)
if (pool)
pool->join_threads();
}
inline void shutdown();

bool _alive = true;
std::mutex _mutex;
Expand Down Expand Up @@ -806,7 +808,7 @@ namespace WEEJOBS_NAMESPACE
{
if (queuedjob.ctx.group != nullptr)
{
queuedjob.ctx.group->reset();
queuedjob.ctx.group->release();
}
}
_queue.clear();
Expand Down Expand Up @@ -840,7 +842,7 @@ namespace WEEJOBS_NAMESPACE
//! stop all threads, wait for them to exit, and shut down the system
inline void shutdown()
{
instance().kill();
instance().shutdown();
}

//! Whether the weejobs runtime is still alive (has not been shutdown)
Expand All @@ -856,10 +858,29 @@ namespace WEEJOBS_NAMESPACE
instance()._setThreadName = f;
}

// internal
inline detail::runtime::runtime()
{
std::atexit(shutdown);
//nop
}

inline detail::runtime::~runtime()
{
shutdown();
}

inline void detail::runtime::shutdown()
{
_alive = false;

//std::cout << "stopping " << _pools.size() << " threads..." << std::endl;
for (auto& pool : _pools)
if (pool)
pool->stop_threads();

//std::cout << "joining " << _pools.size() << " threads..." << std::endl;
for (auto& pool : _pools)
if (pool)
pool->join_threads();
}

// Use this macro ONCE in your application in a .cpp file to
Expand Down
6 changes: 2 additions & 4 deletions src/osgEarthDrivers/engine_rex/RexTerrainEngineNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -573,10 +573,8 @@ RexTerrainEngineNode::refresh(bool forceDirty)
this->ref();

// Load all the root key tiles.
jobs::jobgroup loadGroup;

jobs::context context;
context.group = &loadGroup;
context.group = jobs::jobgroup::create();
context.pool = jobs::get_pool(ARENA_LOAD_TILE);

for (unsigned i = 0; i < keys.size(); ++i)
Expand All @@ -603,7 +601,7 @@ RexTerrainEngineNode::refresh(bool forceDirty)
}

// wait for all loadSync calls to complete
loadGroup.join();
context.group->join();

// release the self-ref.
this->unref_nodelete();
Expand Down

0 comments on commit 613da80

Please sign in to comment.