Skip to content

Commit

Permalink
Integrate new weejobs with workstealing
Browse files Browse the repository at this point in the history
Squashed commit of the following:

commit 346a7f470107fc5eaa52a2a1d374ba930dbe63de
Author: Glenn Waldron <gwaldron@gmail.com>
Date:   Fri Feb 23 08:54:44 2024 -0500

    integrate stealing
  • Loading branch information
gwaldron committed Feb 23, 2024
1 parent 7ca4d8e commit 0e6c31b
Show file tree
Hide file tree
Showing 14 changed files with 385 additions and 377 deletions.
1 change: 1 addition & 0 deletions src/osgEarth/ElevationPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,7 @@ AsyncElevationSampler::AsyncElevationSampler(
_arena(nullptr)
{
_arena = jobs::get_pool("oe.asyncelevation");
_arena->set_can_steal_work(false);
_arena->set_concurrency(numThreads > 0 ? numThreads : _arena->concurrency());
}

Expand Down
82 changes: 1 addition & 81 deletions src/osgEarth/GLUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,52 +600,13 @@ GLObjectPool::track(osg::GraphicsContext* gc)
_gcs[i]._gc = gc;
_gcs[i]._operation = new GCServicingOperation(this);
gc->add(_gcs[i]._operation.get());

//auto iter = _non_shared_objects.find(gc);
//if (iter == _non_shared_objects.end())
//{
// _non_shared_objects.emplace(gc, Collection());

// // add this object to the GC's operations thread so it
// // can service it once per frame:
// if (_gc_operation == nullptr)
// {
// _gc_operation = new FlushOperation(this);
// }
// gc->add(_gc_operation.get());
//}
}

//void
//GLObjectPool::flush(osg::GraphicsContext* gc)
//{
// // This function is invoked by the FlushOperation once per
// // frame with the active GC.
// // Here we will look for per-state objects (like VAOs and FBOs)
// // that may only be deleted in the same GC that created them.
// unsigned num = flush(_non_shared_objects[gc]);
// if (num > 0)
// {
// OE_DEBUG << LC << "GC " << (std::uintptr_t)gc << " flushed " << num << " shared objects" << std::endl;
// }
//}
}

void
GLObjectPool::watch(GLObject::Ptr object)
{
std::unique_lock<std::mutex> lock(_mutex);
_objects.emplace_back(object);

//if (object->shareable())
//{
// // either this is a shareable object, or we are inserting into
// // a non-shared pool and that is why state is nullptr.
// _objects.insert(object);
//}
//else
//{
// _non_shared_objects[state.getGraphicsContext()].insert(object);
//}
}

void
Expand All @@ -657,15 +618,6 @@ GLObjectPool::releaseGLObjects(osg::State* state)
}
}

//void
//GLObjectPool::releaseAll()
//{
// std::lock_guard<std::mutex> lock(_mutex);
// for (auto& object : _objects)
// object->release();
// _objects.clear();
//}

GLsizeiptr
GLObjectPool::totalBytes() const
{
Expand Down Expand Up @@ -770,37 +722,6 @@ GLObjectPool::releaseOrphans(const osg::GraphicsContext* gc)
_totalBytes = bytes;
}

#if 0
unsigned
GLObjectPool::flush(GLObjectPool::Collection& objects)
{
std::lock_guard<std::mutex> lock(_mutex);

GLsizeiptr bytes_released = 0;
std::unordered_set<GLObject::Ptr> keep;
unsigned maxNumToRelease = std::max(1u, (unsigned)pow(4.0f, _avarice));
unsigned numReleased = 0u;

for (auto& object : objects)
{
if (object.use_count() == 1 && numReleased < maxNumToRelease)
{
bytes_released += object->size();
object->release();
++numReleased;
}
else
{
keep.insert(object);
}
}
objects.swap(keep);
_totalBytes = _totalBytes - bytes_released;

return numReleased;
}
#endif

GLObject::GLObject(GLenum ns, osg::State& state) :
_name(0),
_ns(ns),
Expand Down Expand Up @@ -1970,7 +1891,6 @@ GLObjectsCompiler::requestIncrementalCompile(
}
}


void
GLObjectsCompiler::compileNow(
const osg::ref_ptr<osg::Node>& node,
Expand Down
12 changes: 6 additions & 6 deletions src/osgEarth/ImGui/SystemGUI
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ namespace osgEarth
}
float get_timing_ms(void* data, int index) {
return 1e-6 * get_average_timing_ns(data, 4, index - 3);
//int index0 = (index - 1) % frame_count;
//auto value0 = (float)(*(Timings*)data)[index0].count();
//auto value1 = (float)(*(Timings*)data)[index].count();
//return 1e-6 * ((value0 + value1) / 2);
};
};

Expand Down Expand Up @@ -125,7 +121,7 @@ namespace osgEarth

for(auto pool_metrics : all_pool_metrics)
{
if (pool_metrics && pool_metrics->total > 0)
if (pool_metrics) // && pool_metrics->total > 0)
{
ImGui::TableNextColumn();
ImGui::Text("%s", (pool_metrics->name.empty() ? "default" : pool_metrics->name.c_str()));
Expand All @@ -148,6 +144,8 @@ namespace osgEarth
ImGui::EndTable();
}

//OE_HARD_ASSERT(jobs::get_metrics()->total() == jobs::get_metrics()->total_running() + jobs::get_metrics()->total_pending() + jobs::get_metrics()->total_postprocessing());

ImGui::Separator();

if (ImGuiLTable::Begin("SystemGUIPlots"))
Expand All @@ -160,7 +158,7 @@ namespace osgEarth
sprintf(buf, "%.1f ms / %d fps", avg_timing_ms, (int)std::ceil(1000.0 / avg_timing_ms));
ImGuiLTable::PlotLines("Frame", get_timing_ms, &frame_times, frame_count, frame_num, buf, 0.0f, 32.0f);

total_jobs[f] = jobs::get_metrics()->totalJobs();
total_jobs[f] = jobs::get_metrics()->total();
sprintf(buf, "%d", total_jobs[f]);
ImGuiLTable::PlotLines("Jobs", get_counts, &total_jobs, frame_count, frame_num, buf, 0u, 100u);

Expand All @@ -173,6 +171,8 @@ namespace osgEarth
ImGuiLTable::PlotLines("ICO", get_counts, &ico_jobs, frame_count, frame_num, buf, 0u, 4u);
}
}

ImGuiLTable::Text("Canceled", std::to_string(jobs::get_metrics()->total_canceled()).c_str());
ImGuiLTable::End();
}

Expand Down
2 changes: 1 addition & 1 deletion src/osgEarth/ImageLayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,7 @@ FutureTexture2D::dispatch() const

jobs::context context{
Stringify() << key.str() << " " << _layer->getName(),
jobs::get_pool(ARENA_ASYNC_LAYER),
nullptr, // pool
[key]() { return key.getLOD(); }
};

Expand Down
8 changes: 1 addition & 7 deletions src/osgEarth/PagedNode
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,6 @@ namespace osgEarth { namespace Util
private:
friend class PagingManager;

struct Loaded {
osg::ref_ptr<osg::Node> node;
osg::ref_ptr<osgUtil::StateToCompile> state;
};

std::function<osg::ref_ptr<osg::Node>(Cancelable*)> _load_function;

void* _token = nullptr;
Expand All @@ -207,8 +202,7 @@ namespace osgEarth { namespace Util

std::atomic_bool _loadGate = { false };

jobs::future<Loaded> _loaded;
jobs::future<osg::ref_ptr<osg::Node>> _compiled;
jobs::future<osg::ref_ptr<osg::Node>> _loaded;
jobs::future<bool> _merged;

Mutex _mutex;
Expand Down
95 changes: 37 additions & 58 deletions src/osgEarth/PagedNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ PagedNode2::traverse(osg::NodeVisitor& nv)
else
{
// child out of range; just accept static children
auto paged_child = _merged.has_value(true) ? _loaded.value().node : nullptr;
auto paged_child = _merged.has_value(true) ? _loaded.value() : nullptr;

for (auto& child : _children)
{
Expand Down Expand Up @@ -139,7 +139,7 @@ PagedNode2::traverseChildren(osg::NodeVisitor& nv)
{
if (_refinePolicy == REFINE_REPLACE && _merged.has_value(true))
{
_loaded.value().node->accept(nv);
_loaded.value()->accept(nv);
}
else
{
Expand Down Expand Up @@ -172,13 +172,13 @@ PagedNode2::merge(int revision)
// This is called from PagingManager.
// We're in the UPDATE traversal.
OE_SOFT_ASSERT_AND_RETURN(_loaded.available(), false);
OE_SOFT_ASSERT_AND_RETURN(_loaded.value().node.valid(), false);
OE_SOFT_ASSERT_AND_RETURN(_loaded.value().node->getNumParents() == 0, false);
OE_SOFT_ASSERT_AND_RETURN(_loaded.value().valid(), false);
OE_SOFT_ASSERT_AND_RETURN(_loaded.value()->getNumParents() == 0, false);

addChild(_loaded.value().node);
addChild(_loaded.value());

if (_callbacks.valid())
_callbacks->firePostMergeNode(_loaded.value().node.get());
_callbacks->firePostMergeNode(_loaded.value().get());

_merged.resolve(true);
return true;
Expand All @@ -202,9 +202,9 @@ PagedNode2::computeBound() const
{
osg::BoundingSphere bs = osg::Group::computeBound();

if (!_merged.available() && _loaded.available() && _loaded.value().node.valid())
if (!_merged.available() && _loaded.available() && _loaded.value().valid())
{
bs.expandBy(_loaded.value().node->computeBound());
bs.expandBy(_loaded.value()->computeBound());
}

return bs;
Expand All @@ -220,50 +220,44 @@ PagedNode2::startLoad(float priority, const osg::Object* host)
bool preCompile = _preCompile;
auto pnode_weak = osg::observer_ptr<PagedNode2>(this);

// Job to load the child node and collect GL state
auto load_job = [load_function, callbacks_weak, preCompile](jobs::cancelable& c)
jobs::context context;
context.pool = jobs::get_pool(PAGEDNODE_ARENA_NAME);
context.priority = [pnode_weak]() {
osg::ref_ptr<PagedNode2> pnode;
return pnode_weak.lock(pnode) ? pnode->_lastPriority : -FLT_MAX;
};

// Job to load the child node and compile its GL objects if necessary
auto load_and_compile_job = [load_function, callbacks_weak, preCompile, host](auto& promise)
{
Loaded result;
osg::ref_ptr<osg::Node> result;

osg::ref_ptr<ProgressCallback> progress = new ProgressCallback(&c);
osg::ref_ptr<ProgressCallback> progress = new ProgressCallback(&promise);

// invoke the loader function
result.node = load_function(progress.get());
result = load_function(progress.get());

// Fire any pre-merge callbacks
if (result.node.valid())
if (result.valid())
{
osg::ref_ptr<SceneGraphCallbacks> callbacks;
if (callbacks_weak.lock(callbacks))
{
callbacks->firePreMergeNode(result.node.get());
callbacks->firePreMergeNode(result.get());
}

if (preCompile && result.node->getBound().valid())
if (result->getBound().valid())
{
// Collect the GL objects for later compilation.
// Don't waste precious ICO time doing this later
GLObjectsCompiler compiler;
result.state = compiler.collectState(result.node.get());
auto state = compiler.collectState(result.get());
compiler.requestIncrementalCompile(result, state.get(), host, promise);
return;
}
}

return result;
};

// Job to pre-compile GL objects for the loaded data
auto compile_job = [preCompile, host](const Loaded& data, auto& promise)
{
if (preCompile && data.node.valid())
{
// Compile the loaded node using the ICO if possible.
GLObjectsCompiler compiler;
compiler.requestIncrementalCompile(data.node, data.state, host, promise);
}
else
{
promise.resolve(data.node);
}

promise.resolve(result);
};

// Job to request a scene graph merge.
Expand All @@ -272,42 +266,27 @@ PagedNode2::startLoad(float priority, const osg::Object* host)
auto merge_job = [pnode_weak](const osg::ref_ptr<osg::Node>& node, auto& promise)
{
osg::ref_ptr<PagedNode2> pnode;
if (pnode_weak.lock(pnode))
if (pnode_weak.lock(pnode) && pnode->_loaded.available() && pnode->_pagingManager)
{
if (pnode->_compiled.available() && pnode->_pagingManager)
{
pnode->_pagingManager->merge(pnode);
pnode->dirtyBound();
}
pnode->_pagingManager->merge(pnode);
pnode->dirtyBound();
}
else promise.resolve(false);
};

// Dispatch a chain of jobs.
jobs::context context;
context.pool = jobs::get_pool(PAGEDNODE_ARENA_NAME);
context.priority = [pnode_weak]() {
osg::ref_ptr<PagedNode2> pnode;
if (pnode_weak.lock(pnode))
return pnode->_lastPriority;
else
return -FLT_MAX;
};

_loaded = jobs::dispatch(load_job, context);

_compiled = _loaded.then_dispatch<osg::ref_ptr<osg::Node>>(compile_job, context);
_loaded = jobs::dispatch(load_and_compile_job, _loaded, context);

_merged = _compiled.then_dispatch<bool>(merge_job, context);
_merged = _loaded.then_dispatch<bool>(merge_job, context);
}

void PagedNode2::unload()
{
if (_merged.has_value(true))
{
removeChild(_loaded.value().node);
removeChild(_loaded.value());
}

_compiled.reset();
//_compiled.reset();
_loaded.reset();
_merged.reset();

Expand Down Expand Up @@ -337,7 +316,7 @@ PagingManager::~PagingManager()
{
if (_mergeQueue.size() > 0)
{
_metrics->running.exchange(_metrics->running - _mergeQueue.size());
_metrics->postprocessing.exchange(_metrics->postprocessing - _mergeQueue.size());
}
}

Expand Down
9 changes: 6 additions & 3 deletions src/osgEarth/Registry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,15 @@ Registry::Registry() :
//getGDALMutex().disable();
}

// disable work stealing in the jobs system?
if (getenv("OSGEARTH_DISABLE_WORK_STEALING"))
{
jobs::set_allow_work_stealing(false);
}

// register the system stock Units.
Units::registerAll( this );

// Default concurrency for async image layers
jobs::get_pool("oe.layer.async")->set_concurrency(4u);

// register the chonk bin with OSG
osgUtil::RenderBin::addRenderBinPrototype(
"ChonkBin",
Expand Down

0 comments on commit 0e6c31b

Please sign in to comment.