[bugfix, enhancement] Address affinity bug by using threadpoolctl/joblib for n_jobs dispatching #2364

Draft. Wants to merge 51 commits into base: main from dev/njobs_fix.

Changes from 14 commits

Commits (51)
c78859f
Update _n_jobs_support.py
icfaust Mar 17, 2025
e207110
Update test_run_to_run_stability.py
icfaust Mar 17, 2025
043b09d
Update test_n_jobs_support.py
icfaust Mar 17, 2025
66b0b6d
add changes
icfaust Mar 17, 2025
bc66055
add other changes
icfaust Mar 17, 2025
280c0e0
add an affinity test
icfaust Mar 17, 2025
eb0df7f
reduce lines
icfaust Mar 17, 2025
2403e6d
use pylance
icfaust Mar 17, 2025
009348b
further fixes
icfaust Mar 17, 2025
29c318f
better docs
icfaust Mar 17, 2025
ed726de
better docs
icfaust Mar 18, 2025
2b58453
mark
icfaust Mar 18, 2025
78d07bb
Update _n_jobs_support.py
icfaust Mar 18, 2025
8da0891
Update _n_jobs_support.py
icfaust Mar 18, 2025
79ced00
Update test_n_jobs_support.py
icfaust Mar 18, 2025
e021335
Update _n_jobs_support.py
icfaust Mar 18, 2025
30f822a
Update test_n_jobs_support.py
icfaust Mar 18, 2025
6d02aea
Update _n_jobs_support.py
icfaust Mar 18, 2025
84f91ac
Update test_n_jobs_support.py
icfaust Mar 18, 2025
a2a499a
Update _n_jobs_support.py
icfaust Mar 18, 2025
ac16042
Update _n_jobs_support.py
icfaust Mar 18, 2025
e6fdd80
Update incremental_linear.py
icfaust Mar 18, 2025
04075dc
Update incremental_ridge.py
icfaust Mar 18, 2025
dd798fa
Update test_n_jobs_support.py
icfaust Mar 18, 2025
70d613e
Update _n_jobs_support.py
icfaust Mar 19, 2025
1b121a5
Update test_run_to_run_stability.py
icfaust Mar 20, 2025
7bd1fcb
Merge branch 'uxlfoundation:main' into dev/njobs_fix
icfaust Mar 20, 2025
bbb2337
Update test_run_to_run_stability.py
icfaust Mar 21, 2025
a3ceaf3
Update requirements-test.txt
icfaust Mar 23, 2025
97a906f
Update requirements-test.txt
icfaust Mar 23, 2025
1948e7d
Update requirements-test.txt
icfaust Mar 23, 2025
62c7d9f
Update requirements-test.txt
icfaust Mar 23, 2025
ce79ace
Update requirements-test.txt
icfaust Mar 24, 2025
8765c0a
Merge branch 'main' into dev/njobs_fix
icfaust Mar 24, 2025
e2fa126
return values, and reduce test
icfaust Mar 24, 2025
1ca56cd
Merge branch 'uxlfoundation:main' into dev/njobs_fix
icfaust Apr 8, 2025
ab1c1eb
Merge branch 'uxlfoundation:main' into dev/njobs_fix
icfaust Apr 20, 2025
e9b5da5
Merge branch 'uxlfoundation:main' into dev/njobs_fix
icfaust May 1, 2025
f979da3
Merge branch 'uxlfoundation:main' into dev/njobs_fix
icfaust May 26, 2025
b56729e
Merge branch 'uxlfoundation:main' into dev/njobs_fix
icfaust May 31, 2025
2b2749c
Update data_conversion.cpp
icfaust Jun 13, 2025
3f0155b
Update table.cpp
icfaust Jun 13, 2025
385ad80
Merge branch 'uxlfoundation:main' into dev/njobs_fix
icfaust Jun 13, 2025
052bcdd
Update data_conversion.cpp
icfaust Jun 13, 2025
c638473
Update test_memory_usage.py
icfaust Jun 13, 2025
e00feb3
Update _n_jobs_support.py
icfaust Jun 17, 2025
627df75
Merge branch 'uxlfoundation:main' into dev/njobs_fix
icfaust Jun 18, 2025
3bf60b5
Merge branch 'uxlfoundation:main' into dev/njobs_fix
icfaust Jun 20, 2025
df38221
Update test_n_jobs_support.py
icfaust Jun 20, 2025
51ccef3
Update run_test.sh
icfaust Jun 22, 2025
17de438
Update ci.yml
icfaust Jun 23, 2025
104 changes: 42 additions & 62 deletions daal4py/sklearn/_n_jobs_support.py
@@ -19,53 +19,50 @@
import threading
from functools import wraps
from inspect import Parameter, signature
from multiprocessing import cpu_count
from numbers import Integral
from warnings import warn

import threadpoolctl
from joblib import cpu_count

from daal4py import daalinit as set_n_threads
from daal4py import num_threads as get_n_threads
from daal4py import _get__daal_link_version__, daalinit, num_threads

from ._utils import sklearn_check_version

if sklearn_check_version("1.2"):
from sklearn.utils._param_validation import validate_parameter_constraints
else:

def validate_parameter_constraints(n_jobs):
if n_jobs is not None and n_jobs.__class__ != int:
raise TypeError(
f"n_jobs must be an instance of int, not {n_jobs.__class__.__name__}."
)


class oneDALLibController(threadpoolctl.LibController):
user_api = "oneDAL"
internal_api = "oneDAL"

filename_prefixes = ("libonedal_thread", "libonedal")

def get_num_threads(self):
return num_threads()

def set_num_threads(self, nthreads):
david-cortes-intel (Contributor) commented on Mar 24, 2025:

I understand this setting would apply globally, which could lead to race conditions if users call this in parallel, for example through some framework that would parallelize estimator calls.

Could it somehow get a mutex (or use atomic ops) either here or on the oneDAL side?

Also, it would be better to add a warning that the setting is changed at a global level, so that users do not try to call these inside multi-threaded code.

Contributor:

Actually on a further look, it does already have a mutex on the daal side. Still better to document this behavior being global.

icfaust (Contributor, Author):

Sounds good, will do!

daalinit(nthreads)

def get_version(self):
return _get__daal_link_version__


threadpoolctl.register(oneDALLibController)

# Note: the controller is created at module scope to avoid
# the overhead of re-initializing it on every function call
threadpool_controller = threadpoolctl.ThreadpoolController()


def get_suggested_n_threads(n_cpus):
"""
Function to get `n_threads` limit
if `n_jobs` is set in upper parallelization context.
Usually, limit is equal to `n_logical_cpus` // `n_jobs`.
Returns None if limit is not set.
"""
n_threads_map = {
lib_ctl.internal_api: lib_ctl.get_num_threads()
for lib_ctl in threadpool_controller.lib_controllers
if lib_ctl.internal_api != "mkl"
}
# openBLAS is limited to 24, 64 or 128 threads by default
# depending on SW/HW configuration.
# thus, these numbers of threads from openBLAS are uninformative
if "openblas" in n_threads_map and n_threads_map["openblas"] in [24, 64, 128]:
del n_threads_map["openblas"]
# remove default values equal to n_cpus as uninformative
for backend in list(n_threads_map.keys()):
if n_threads_map[backend] == n_cpus:
del n_threads_map[backend]
if len(n_threads_map) > 0:
return min(n_threads_map.values())
else:
return None


def _run_with_n_jobs(method):
"""
Decorator for running methods containing oneDAL kernels with 'n_jobs'.
@@ -79,59 +76,42 @@ def _run_with_n_jobs(method):
@wraps(method)
def n_jobs_wrapper(self, *args, **kwargs):
# threading parallel backend branch
if not isinstance(threading.current_thread(), threading._MainThread):
warn(
"'Threading' parallel backend is not supported by "
"Intel(R) Extension for Scikit-learn*. "
"Falling back to usage of all available threads."
)
result = method(self, *args, **kwargs)
return result
# multiprocess parallel backends branch
# preemptive validation of n_jobs parameter is required
# because '_run_with_n_jobs' decorator is applied on top of method
# where validation takes place
if sklearn_check_version("1.2") and hasattr(self, "_parameter_constraints"):
if sklearn_check_version("1.2"):
validate_parameter_constraints(
parameter_constraints={"n_jobs": self._parameter_constraints["n_jobs"]},
params={"n_jobs": self.n_jobs},
caller_name=self.__class__.__name__,
)
# search for specified n_jobs
n_jobs = self.n_jobs
n_cpus = cpu_count()
else:
validate_parameter_constraints(self.n_jobs)

# receive n_threads limitation from upper parallelism context
# using `threadpoolctl.ThreadpoolController`
n_threads = get_suggested_n_threads(n_cpus)
# get real `n_jobs` number of threads for oneDAL
# using sklearn rules and `n_threads` from upper parallelism context
if n_jobs is None or n_jobs == 0:
if n_threads is None:
# default branch with no setting for n_jobs
return method(self, *args, **kwargs)
else:
n_jobs = n_threads
elif n_jobs < 0:
if n_threads is None:
n_jobs = max(1, n_cpus + n_jobs + 1)
else:
n_jobs = max(1, n_threads + n_jobs + 1)
# branch with set n_jobs
old_n_threads = get_n_threads()
if n_jobs == old_n_threads:
return method(self, *args, **kwargs)

try:
if not self.n_jobs:
n_jobs = cpu_count()
Contributor:

Would this later on get limited to the number of physical cores from oneDAL side?

icfaust (Contributor, Author):

I'll be honest, I'm not 100% sure yet. The default in threading.h in DAAL will set it to the number of CPUs, but with affinity involved I didn't fully track down the default setting there.

Contributor:

Maybe @Alexsandruss could comment here on whether it'd end up limited to number of physical cores somewhere else?

Contributor:

Looks like setting the number of threads like this would not result in that number later getting limited to the number of physical cores. How about passing the argument only_physical_cores=True here?

david-cortes-intel (Contributor) commented on Mar 24, 2025:

Tried adding a line to print this value here: https://github.com/uxlfoundation/oneDAL/blob/31cafec9950f1db352b639dafad5875971ca00fe/cpp/daal/src/threading/threading.cpp#L267
... and from what I see, it is indeed set to the result of cpu_count(only_physical_cores=False).

Contributor:

Although from some further testing, this behavior also appears to be the same in the current main branch.

else:
n_jobs = (
self.n_jobs if self.n_jobs > 0 else max(1, cpu_count() + self.n_jobs + 1)
)

if (old_n_threads := num_threads()) != n_jobs:
logger = logging.getLogger("sklearnex")
cl = self.__class__
logger.debug(
f"{cl.__module__}.{cl.__name__}.{method.__name__}: "
f"setting {n_jobs} threads (previous - {old_n_threads})"
)
set_n_threads(n_jobs)
with threadpool_controller.limit(limits=n_jobs, user_api="oneDAL"):
return method(self, *args, **kwargs)
else:
return method(self, *args, **kwargs)
finally:
set_n_threads(old_n_threads)

return n_jobs_wrapper
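As context for the rewritten wrapper above, here is a minimal, hypothetical sketch (not part of this diff) of the threadpoolctl mechanism the decorator now relies on. It assumes sklearnex/daal4py has been imported so that the oneDALLibController registration shown earlier has run; everything else is illustrative.

import threadpoolctl

import sklearnex  # noqa: F401  (assumption: importing sklearnex loads libonedal and registers the controller)

controller = threadpoolctl.ThreadpoolController()

# Report the current oneDAL thread count as seen through the registered controller.
print([i["num_threads"] for i in controller.info() if i["user_api"] == "oneDAL"])

# Temporarily cap oneDAL to 2 threads; the previous value is restored when the
# with-block exits, which is what the decorator uses instead of calling
# daalinit/set_n_threads and restoring manually in a finally clause.
with controller.limit(limits=2, user_api="oneDAL"):
    pass  # an estimator method containing oneDAL kernels would run here

# After the block, the oneDAL thread count is back to its previous value.
print([i["num_threads"] for i in controller.info() if i["user_api"] == "oneDAL"])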

4 changes: 4 additions & 0 deletions daal4py/sklearn/ensemble/AdaBoostClassifier.py
@@ -38,6 +38,10 @@

@control_n_jobs(decorated_methods=["fit", "predict"])
class AdaBoostClassifier(ClassifierMixin, BaseEstimator):

if sklearn_check_version("1.2"):
_parameter_constraints = {}

def __init__(
self,
split_criterion="gini",
4 changes: 4 additions & 0 deletions daal4py/sklearn/ensemble/GBTDAAL.py
@@ -38,6 +38,10 @@


class GBTDAALBase(BaseEstimator, d4p.mb.GBTDAALBaseModel):

if sklearn_check_version("1.2"):
_parameter_constraints = {}

def __init__(
self,
split_method="inexact",
38 changes: 30 additions & 8 deletions sklearnex/tests/test_n_jobs_support.py
@@ -16,19 +16,15 @@

import inspect
import logging
from multiprocessing import cpu_count
import os

import pytest
from joblib import cpu_count
from sklearn.datasets import make_classification
from sklearn.exceptions import NotFittedError
from threadpoolctl import threadpool_info

from sklearnex.tests.utils import (
PATCHED_MODELS,
SPECIAL_INSTANCES,
call_method,
gen_dataset,
gen_models_info,
)
from sklearnex.tests.utils import PATCHED_MODELS, SPECIAL_INSTANCES, call_method

_X, _Y = make_classification(n_samples=40, n_features=4, random_state=42)

@@ -106,3 +102,29 @@ def test_n_jobs_support(estimator, n_jobs, caplog):

messages = [msg.message for msg in caplog.records]
assert _check_n_jobs_entry_in_logs(messages, method_name, n_jobs)


@pytest.mark.skipif(
not hasattr(os, "sched_setaffinity") or len(os.sched_getaffinity(0)) < 2,
reason="python CPU affinity control unavailable or too few threads",
)
@pytest.mark.parametrize("estimator", {**PATCHED_MODELS, **SPECIAL_INSTANCES}.keys())
def test_n_jobs_affinity(estimator, caplog):
# verify that n_jobs 1) starts at default value of cpu_count
# 2) respects os.sched_setaffinity on supported machines
n_t = next(i for i in threadpool_info() if i["user_api"] == "oneDAL")["num_threads"]

# get affinity mask of calling process
mask = os.sched_getaffinity(0)
# by default, oneDAL should match the number of threads made available to the sklearnex pytest suite
assert len(mask) == n_t

try:
# use half of the available threads
newmask = set(list(mask)[: len(mask) // 2])
os.sched_setaffinity(0, newmask)
test_n_jobs_support(estimator, None, caplog)

finally:
# reset affinity mask no matter what
os.sched_setaffinity(0, mask)
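For readers unfamiliar with the motivation behind the joblib import swap above, the following is a hedged illustration (not part of the test file) of the difference the test relies on: joblib.cpu_count() honors the process affinity mask and can count only physical cores, while multiprocessing.cpu_count() does not. The parameter name below is joblib's; the rest is illustrative.

import os
from multiprocessing import cpu_count as mp_cpu_count

from joblib import cpu_count as joblib_cpu_count

if hasattr(os, "sched_getaffinity"):
    print("affinity mask size:", len(os.sched_getaffinity(0)))
    # multiprocessing reports every logical CPU on the machine, ignoring affinity
    print("multiprocessing.cpu_count():", mp_cpu_count())
    # joblib reports only the CPUs this process may actually use, so it shrinks
    # after os.sched_setaffinity() restricts the mask (as in the test above)
    print("joblib.cpu_count():", joblib_cpu_count())
    # the reviewer's suggestion from the discussion above: count physical cores only
    print("joblib.cpu_count(only_physical_cores=True):", joblib_cpu_count(only_physical_cores=True))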
3 changes: 0 additions & 3 deletions sklearnex/tests/test_run_to_run_stability.py
@@ -55,9 +55,6 @@
sklearn_clone_dict,
)

# to reproduce errors even in CI
d4p.daalinit(nthreads=100)
icfaust (Contributor, Author):

Removing this causes all sorts of memory-leak-check test failures, not just on Windows and not just with pandas.


_dataset_dict = {
"classification": [
partial(load_iris, return_X_y=True),
Expand Down