From bf38f7e0e4593c1c1f2e108e0e1fbb7a76b544ec Mon Sep 17 00:00:00 2001 From: Hans Johnson Date: Thu, 21 May 2026 11:54:26 -0500 Subject: [PATCH 1/2] ENH: Add robustness test exposing ParallelSparseFieldLevelSet deadlock ParallelSparseFieldLevelSetImageFilter::Iterate() dispatches its per-iteration ThreadedApplyUpdate body via mt->ParallelizeArray(). The body invokes SignalNeighborsAndWait() several times, blocking on per-thread std::condition_variables until both neighbor work units have arrived at the same phase. This neighbor-only protocol assumes every work unit holds its own concurrent OS thread for the full duration of the dispatch. Pool/TBB-backed ParallelizeArray does not honor this: N work units are submitted to a shared worker pool whose size is bounded by core count, and CV-blocked tasks pin the pool so queued work units never start and the dispatch deadlocks. Reported as the intermittent timeout of itkParallelSparseFieldLevelSetImageFilterTest in CDash test 2570265561 ("Trying mf->Update()" followed by a 1000s timeout) on Windows Release with TBB. The existing test fails at ~3% per run under TBB, which is too rare to catch the bug reliably in CI. This commit adds a new test itkParallelSparseFieldLevelSetImageFilterRobustnessTest that drives the deadlock to a high per-test-invocation probability: Scenario 1 (sweep x repeat): cycle 1/2/4/8/11/16/32 work-unit counts for 20 outer iterations, totalling 140 short filter runs. Per-run pool-starvation deadlock probability is low but amplification gives a ~95% per-test-invocation hit rate. Scenario 2 (determinism): three repeated runs at workUnits=11 must produce bit-identical output. Guards against future barrier rewrites that introduce numerical non-determinism. Scenario 3 (concurrent multi-pipeline): eight pipelines run simultaneously via std::async (88 work units competing for a core-count-bounded worker pool). Cross-pipeline contention directly probes the dispatch-starvation deadlock path. Local measurements on a 16-core x86 box with TBB: - Upstream code, 30 invocations: 30 timeouts (100% deadlock). - Upstream code, POOL/PLATFORM: pass (no worker shortage possible). - Test runtime with a working synchronization: ~5s wallclock. - CTest TIMEOUT set to 30 (6x leeway over expected runtime). This commit intentionally introduces a failing test on master. The proposed fix lands in a follow-up PR. --- .../LevelSets/test/CMakeLists.txt | 6 + ...ieldLevelSetImageFilterRobustnessGTest.cxx | 332 ++++++++++++++++++ 2 files changed, 338 insertions(+) create mode 100644 Modules/Segmentation/LevelSets/test/itkParallelSparseFieldLevelSetImageFilterRobustnessGTest.cxx diff --git a/Modules/Segmentation/LevelSets/test/CMakeLists.txt b/Modules/Segmentation/LevelSets/test/CMakeLists.txt index e59a5c4d0d8..ddabbaaa7f3 100644 --- a/Modules/Segmentation/LevelSets/test/CMakeLists.txt +++ b/Modules/Segmentation/LevelSets/test/CMakeLists.txt @@ -33,6 +33,12 @@ set( createtestdriver(ITKLevelSets "${ITKLevelSets-Test_LIBRARIES}" "${ITKLevelSetsTests}") +set( + ITKLevelSetsGTests + itkParallelSparseFieldLevelSetImageFilterRobustnessGTest.cxx +) +creategoogletestdriver(ITKLevelSets "${ITKLevelSets-Test_LIBRARIES}" "${ITKLevelSetsGTests}") + itk_add_test( NAME itkThresholdSegmentationLevelSetImageFilterTest COMMAND diff --git a/Modules/Segmentation/LevelSets/test/itkParallelSparseFieldLevelSetImageFilterRobustnessGTest.cxx b/Modules/Segmentation/LevelSets/test/itkParallelSparseFieldLevelSetImageFilterRobustnessGTest.cxx new file mode 100644 index 00000000000..8bb54b8163b --- /dev/null +++ b/Modules/Segmentation/LevelSets/test/itkParallelSparseFieldLevelSetImageFilterRobustnessGTest.cxx @@ -0,0 +1,332 @@ +/*========================================================================= + * + * Copyright NumFOCUS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + *=========================================================================*/ + +// Robustness test: the neighbor-only inter-phase sync deadlocks under +// pool/TBB MultiThreader backends when work units exceed worker-pool size. +// Only same-work-unit-count runs produce bit-identical output. See the PR +// for the full analysis and scenario rationale. + +#include "gtest/gtest.h" + +#include "itkLevelSetFunction.h" +#include "itkParallelSparseFieldLevelSetImageFilter.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace PSFLSIFR +{ + +constexpr unsigned int DIM = 32; +constexpr int RADIUS = DIM / 4; + +// Abort the process if body does not finish within the deadline, so a +// ParallelizeArray dispatch deadlock fails the test instead of hanging the +// driver. The watchdog runs on its own OS thread, outside the work-unit pool +// that the deadlock pins. +template +void +runWithDeadline(unsigned int seconds, TFunctor && body) +{ + std::mutex m; + std::condition_variable cv; + bool done = false; + std::thread watchdog([&] { + std::unique_lock lk(m); + if (!cv.wait_for(lk, std::chrono::seconds(seconds), [&] { return done; })) + { + std::cerr << "Deadline exceeded (" << seconds << "s): ParallelizeArray dispatch deadlock\n"; + std::abort(); + } + }); + body(); + { + const std::lock_guard lk(m); + done = true; + } + cv.notify_all(); + watchdog.join(); +} + +float +sphere(unsigned int x, unsigned int y, unsigned int z) +{ + float dis = (x - float{ DIM } / 2.0f) * (x - float{ DIM } / 2.0f) + + (y - float{ DIM } / 2.0f) * (y - float{ DIM } / 2.0f) + + (z - float{ DIM } / 2.0f) * (z - float{ DIM } / 2.0f); + dis = RADIUS - std::sqrt(dis); + return -dis; +} + +float +cube(unsigned int x, unsigned int y, unsigned int z) +{ + const float X = itk::Math::Absolute(x - float{ DIM } / 2.0f); + const float Y = itk::Math::Absolute(y - float{ DIM } / 2.0f); + const float Z = itk::Math::Absolute(z - float{ DIM } / 2.0f); + float dis = -std::sqrt((X - RADIUS) * (X - RADIUS) + (Y - RADIUS) * (Y - RADIUS) + (Z - RADIUS) * (Z - RADIUS)); + if (!((X > RADIUS) && (Y > RADIUS) && (Z > RADIUS))) + { + dis = RADIUS - (std::max(std::max(X, Y), Z)); + } + return -dis; +} + +void +fill_image(itk::Image * im, float (*f)(unsigned int, unsigned int, unsigned int)) +{ + itk::Image::IndexType idx; + for (unsigned int x = 0; x < DIM; ++x) + { + idx[0] = x; + for (unsigned int y = 0; y < DIM; ++y) + { + idx[1] = y; + for (unsigned int z = 0; z < DIM; ++z) + { + idx[2] = z; + im->SetPixel(idx, f(x, y, z)); + } + } + } +} + +class MorphFunction : public itk::LevelSetFunction> +{ +public: + using Self = MorphFunction; + using Superclass = itk::LevelSetFunction>; + using Pointer = itk::SmartPointer; + itkOverrideGetNameOfClassMacro(MorphFunction); + itkNewMacro(Self); + + void + SetDistanceTransform(itk::Image * d) + { + m_DistanceTransform = d; + } + +protected: + ~MorphFunction() override = default; + MorphFunction() + { + RadiusType r; + r[0] = r[1] = r[2] = 1; + Superclass::Initialize(r); + } + +private: + itk::Image::Pointer m_DistanceTransform; + ScalarValueType + PropagationSpeed(const NeighborhoodType & nbh, const FloatOffsetType &, GlobalDataStruct *) const override + { + return m_DistanceTransform->GetPixel(nbh.GetIndex()); + } +}; + +class MorphFilter : public itk::ParallelSparseFieldLevelSetImageFilter, itk::Image> +{ +public: + ITK_DISALLOW_COPY_AND_MOVE(MorphFilter); + using Self = MorphFilter; + using Pointer = itk::SmartPointer; + itkOverrideGetNameOfClassMacro(MorphFilter); + itkNewMacro(Self); + itkSetMacro(Iterations, unsigned int); + + void + SetDistanceTransform(itk::Image * im) + { + auto * func = dynamic_cast(this->GetDifferenceFunction().GetPointer()); + if (func == nullptr) + { + itkGenericExceptionMacro("MorphFunction cast failed"); + } + func->SetDistanceTransform(im); + } + +protected: + ~MorphFilter() override = default; + MorphFilter() + { + auto p = MorphFunction::New(); + p->SetPropagationWeight(-1.0); + p->SetAdvectionWeight(0.0); + p->SetCurvatureWeight(1.0); + this->SetDifferenceFunction(p); + } + +private: + unsigned int m_Iterations{ 0 }; + bool + Halt() override + { + return this->GetElapsedIterations() == m_Iterations; + } +}; + +using ImageType = itk::Image; + +ImageType::Pointer +make_init_image() +{ + auto im = ImageType::New(); + ImageType::SizeType sz{ DIM, DIM, DIM }; + ImageType::RegionType r{ sz }; + im->SetRegions(r); + im->Allocate(); + fill_image(im, sphere); + return im; +} + +ImageType::Pointer +make_target_image() +{ + auto im = ImageType::New(); + ImageType::SizeType sz{ DIM, DIM, DIM }; + ImageType::RegionType r{ sz }; + im->SetRegions(r); + im->Allocate(); + fill_image(im, cube); + itk::ImageRegionIterator it(im, im->GetRequestedRegion()); + for (it.GoToBegin(); !it.IsAtEnd(); ++it) + { + it.Value() = it.Value() / std::sqrt((5.0f + itk::Math::sqr(it.Value()))); + } + return im; +} + +ImageType::Pointer +run_one(unsigned int workUnits, unsigned int iterations) +{ + auto init = make_init_image(); + auto target = make_target_image(); + auto mf = MorphFilter::New(); + mf->SetDistanceTransform(target); + mf->SetIterations(iterations); + mf->SetInput(init); + mf->SetNumberOfWorkUnits(workUnits); + mf->SetNumberOfLayers(3); + mf->SetIsoSurfaceValue(0.0); + mf->Update(); + ImageType::Pointer out = mf->GetOutput(); + out->DisconnectPipeline(); + return out; +} + +double +image_summary(ImageType * img) +{ + double sum = 0.0; + itk::ImageRegionConstIterator it(img, img->GetRequestedRegion()); + for (it.GoToBegin(); !it.IsAtEnd(); ++it) + { + sum += static_cast(it.Get()) * static_cast(it.Get()); + } + return std::sqrt(sum / img->GetRequestedRegion().GetNumberOfPixels()); +} + +bool +images_identical(ImageType * a, ImageType * b) +{ + if (a->GetRequestedRegion() != b->GetRequestedRegion()) + { + return false; + } + itk::ImageRegionConstIterator ai(a, a->GetRequestedRegion()); + itk::ImageRegionConstIterator bi(b, b->GetRequestedRegion()); + for (ai.GoToBegin(), bi.GoToBegin(); !ai.IsAtEnd(); ++ai, ++bi) + { + if (ai.Get() != bi.Get()) + { + return false; + } + } + return true; +} + +} // namespace PSFLSIFR + +// Scenario 1: repeatedly cycle work-unit counts to amplify the rare per-run +// pool-starvation deadlock into a near-certain per-test failure. +TEST(ParallelSparseFieldLevelSetRobustness, SweepRepeat) +{ + using namespace PSFLSIFR; + runWithDeadline(30, [] { + const std::vector sweep{ 1, 2, 4, 8, 11, 16, 32 }; + constexpr unsigned int kSweepRepeats = 20; + constexpr unsigned int kSweepIterations = 30; + for (unsigned int rep = 0; rep < kSweepRepeats; ++rep) + { + for (const unsigned int wu : sweep) + { + auto out = run_one(wu, kSweepIterations); + const double summary = image_summary(out); + EXPECT_TRUE(std::isfinite(summary)); + EXPECT_GT(summary, 0.0); + EXPECT_LT(summary, 100.0); + } + } + }); +} + +// Scenario 2: repeated runs at a fixed work-unit count must be bit-identical. +TEST(ParallelSparseFieldLevelSetRobustness, Determinism) +{ + using namespace PSFLSIFR; + runWithDeadline(30, [] { + auto run0 = run_one(11, 100); + for (unsigned int rep = 1; rep < 3; ++rep) + { + auto runN = run_one(11, 100); + EXPECT_TRUE(images_identical(run0, runN)) << "run " << rep << " differs from run 0"; + } + }); +} + +// Scenario 3: eight concurrent std::async pipelines (88 work units) contend +// for the core-bounded worker pool, probing the dispatch-starvation deadlock. +TEST(ParallelSparseFieldLevelSetRobustness, ConcurrentMultiPipeline) +{ + using namespace PSFLSIFR; + runWithDeadline(30, [] { + constexpr unsigned int kConcurrentReps = 6; + constexpr unsigned int kConcurrentPipelines = 8; + for (unsigned int rep = 0; rep < kConcurrentReps; ++rep) + { + std::vector> futures; + futures.reserve(kConcurrentPipelines); + for (unsigned int p = 0; p < kConcurrentPipelines; ++p) + { + futures.emplace_back(std::async(std::launch::async, [] { return run_one(11, 60); })); + } + for (auto & f : futures) + { + ImageType::Pointer out = f.get(); + EXPECT_TRUE(out.IsNotNull()); + } + } + }); +} From 7e2c4896253b30ecc4796477757e49933066b165 Mon Sep 17 00:00:00 2001 From: Hans Johnson Date: Thu, 21 May 2026 11:54:58 -0500 Subject: [PATCH 2/2] BUG: Fix ParallelSparseFieldLevelSet deadlock with explicit std::threads Replace the one ParallelizeArray call in ParallelSparseFieldLevelSetImageFilter::Iterate() that contains inner SignalNeighborsAndWait calls with explicit std::thread spawning. The inner sync requires every work unit to hold its own concurrent OS thread for the full duration of the dispatch; ParallelizeArray's pool/TBB backends cannot guarantee this when work-unit count exceeds free worker count. std::thread guarantees one OS thread per work unit so the neighbor-only sync protocol (left untouched) completes on every invocation. Other parallel sections in Iterate() (ThreadedCalculateChange, ThreadedLoadBalance1/2, post-process) do not call SignalNeighborsAndWait and stay on ParallelizeArray. The robustness test added in the previous PR drops from a 100% per-invocation deadlock rate to 0/20 with this change. The baseline itkParallelSparseFieldLevelSetImageFilterTest image-compare against the on-disk MD5-pinned baseline passes bit-identically. Tracks the test PR ("ENH: Add robustness test exposing ParallelSparseFieldLevelSet deadlock") and depends on it being merged first; the test currently fails on master and turns green with this commit applied. --- ...ParallelSparseFieldLevelSetImageFilter.hxx | 45 ++++++++++++++----- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/Modules/Segmentation/LevelSets/include/itkParallelSparseFieldLevelSetImageFilter.hxx b/Modules/Segmentation/LevelSets/include/itkParallelSparseFieldLevelSetImageFilter.hxx index df0a9536700..dc294e94088 100644 --- a/Modules/Segmentation/LevelSets/include/itkParallelSparseFieldLevelSetImageFilter.hxx +++ b/Modules/Segmentation/LevelSets/include/itkParallelSparseFieldLevelSetImageFilter.hxx @@ -28,6 +28,7 @@ #include "itkMath.h" #include "itkPlatformMultiThreader.h" #include "itkPrintHelper.h" +#include namespace itk { @@ -1155,16 +1156,40 @@ ParallelSparseFieldLevelSetImageFilter::Iterate() return; } - mt->ParallelizeArray( - 0, - m_NumOfWorkUnits, - [this](SizeValueType threadId) { - this->ThreadedApplyUpdate(m_TimeStep, threadId); - // We only need to wait for neighbors because ThreadedCalculateChange - // requires information only from the neighbors. - this->SignalNeighborsAndWait(threadId); - }, - nullptr); + // Each work unit needs its own concurrent OS thread: SignalNeighborsAndWait blocks until neighbors arrive. + { + std::vector applyThreads; + applyThreads.reserve(m_NumOfWorkUnits > 0 ? m_NumOfWorkUnits - 1 : 0); + // Join every spawned thread on scope exit, including the exception path, + // so a joinable std::thread is never destroyed (which calls std::terminate). + const auto joinAll = [&applyThreads]() { + for (auto & th : applyThreads) + { + if (th.joinable()) + { + th.join(); + } + } + }; + try + { + for (ThreadIdType t = 1; t < m_NumOfWorkUnits; ++t) + { + applyThreads.emplace_back([this, t]() { + this->ThreadedApplyUpdate(m_TimeStep, t); + this->SignalNeighborsAndWait(t); + }); + } + this->ThreadedApplyUpdate(m_TimeStep, 0); + this->SignalNeighborsAndWait(0); + } + catch (...) + { + joinAll(); + throw; + } + joinAll(); + } if (this->GetElapsedIterations() % LOAD_BALANCE_ITERATION_FREQUENCY == 0)