In [2]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()
# SparkContext handle, used in the next cells to inspect the configuration.
sc = spark.sparkContext

23/02/15 18:33:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [16]:
# Show every (key, value) pair of the SparkContext configuration.
sc.getConf().getAll()

[('spark.app.name', 'JoblibSparkBackend'),
 ('spark.driver.host', '222-245.wifi-inria-saclay.saclay.inria.fr'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1676482422519'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.port', '62530'),
 ('spark.ui.showConsoleProgress', 'true')]

In [3]:
# Spark's default parallelism for this context; the custom backend below uses it as a fallback.
sc.defaultParallelism

8

In [6]:
from joblib.parallel import register_parallel_backend
from joblibspark.backend import SparkDistributedBackend

import sys
import logging


def _get_logger(name):
    """ Gets a logger by name, or creates and configures it for the first time. """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    # If the logger is configured, skip the configure
    if not logger.handlers and not logging.getLogger().handlers:
        handler = logging.StreamHandler(sys.stderr)
        logger.addHandler(handler)
    return logger


# Module-level logger; the backend class below emits its warnings through it.
logger = _get_logger("joblib-spark")


class MySparkDistributedBackend(SparkDistributedBackend):
    """
    Variant of ``SparkDistributedBackend`` that resolves the effective number of
    jobs from the requested ``n_jobs``, Spark's default parallelism and the
    number of task slots currently reported by the cluster.
    """

    # Hard cap on the number of concurrent joblib tasks (Spark jobs) to run.
    MAX_CONCURRENT_JOBS_ALLOWED = 128

    def __init__(self, **backend_args):
        """Initialise the parent backend, then bind it to the notebook's ``spark`` session."""
        super().__init__(**backend_args)
        # `spark` is the module-level SparkSession created earlier in the notebook.
        self._spark = spark

    @staticmethod
    def _decide_parallelism(requested_parallelism,
                            spark_default_parallelism,
                            max_num_concurrent_tasks):
        """
        Resolve the parallelism the backend will actually use.

        Rules, applied in order:

        * ``requested_parallelism is None`` -> 1.
        * ``requested_parallelism <= 0`` -> the greatest of
          ``spark_default_parallelism``, ``max_num_concurrent_tasks`` and 1.
        * otherwise -> ``requested_parallelism``.

        The result is then capped at ``MAX_CONCURRENT_JOBS_ALLOWED``.

        Warnings are logged when the cluster reports no task slot, when a
        non-positive request is replaced, when the cap applies, and when the
        final value exceeds the number of task slots currently available.

        :param requested_parallelism: value asked for by the caller (may be None).
        :param spark_default_parallelism: Spark's default parallelism.
        :param max_num_concurrent_tasks: number of task slots currently available.
        :return: the effective parallelism.
        """
        max_allowed = MySparkDistributedBackend.MAX_CONCURRENT_JOBS_ALLOWED

        if max_num_concurrent_tasks == 0:
            logger.warning(
                "The cluster has no executors currently. "
                "The tasks won't start until some new executors register."
            )

        if requested_parallelism is None:
            parallelism = 1
        elif requested_parallelism <= 0:
            parallelism = max(spark_default_parallelism, max_num_concurrent_tasks, 1)
            logger.warning(
                "Because the requested parallelism was a non-positive value, "
                "parallelism will be set to (%s), which is Spark's default parallelism (%s), "
                "or the current total of Spark task slots (%s), or 1, whichever is greater. "
                "We recommend setting parallelism explicitly to a positive value because "
                "the total of Spark task slots is subject to cluster sizing.",
                parallelism,
                spark_default_parallelism,
                max_num_concurrent_tasks,
            )
        else:
            parallelism = requested_parallelism

        if parallelism > max_allowed:
            logger.warning(
                "Parallelism (%s) is capped at "
                "MySparkDistributedBackend.MAX_CONCURRENT_JOBS_ALLOWED (%s).",
                parallelism,
                max_allowed,
            )
            parallelism = max_allowed

        if parallelism > max_num_concurrent_tasks:
            # Report the effective value: the raw request may be None, negative,
            # or larger than the cap applied just above.
            logger.warning(
                "Parallelism (%s) is greater than the current total of Spark task slots (%s). "
                "If dynamic allocation is enabled, you might see more executors allocated.",
                parallelism,
                max_num_concurrent_tasks,
            )
        return parallelism

    def effective_n_jobs(self, n_jobs):
        """
        Return the number of jobs that will actually run concurrently.

        ``n_jobs`` of None requests 1 worker.
        ``n_jobs`` of -1 (or any non-positive value) requests all available workers;
        if the cluster is in dynamic allocation mode and no worker is available,
        Spark's default parallelism is used instead, which triggers the dynamic
        allocation of Spark workers.
        """
        max_num_concurrent_tasks = self._get_max_num_concurrent_tasks()
        spark_default_parallelism = self._spark.sparkContext.defaultParallelism
        return self._decide_parallelism(
            requested_parallelism=n_jobs,
            spark_default_parallelism=spark_default_parallelism,
            max_num_concurrent_tasks=max_num_concurrent_tasks,
        )
    
# Make the custom backend selectable by name, e.g. Parallel(backend="spark").
register_parallel_backend("spark", MySparkDistributedBackend)

In [3]:
from joblib import cpu_count, Parallel, delayed, parallel_backend

# Number of CPUs as reported by joblib (8 in the recorded run).
cpu_count()

8

In [4]:
# n_jobs=-1 asks the "spark" backend for all available workers.
parallel = Parallel(n_jobs=-1, backend="spark")
# Private joblib helper; presumably delegates to the backend's effective_n_jobs — TODO confirm.
parallel._effective_n_jobs()

23/02/17 10:21:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


8

# Joblib distributed

This work was first introduced during a Joblib sprint organised at [Inria](https://team.inria.fr/soda/). Our aim is to benchmark the performance of joblib with various popular distributed backends, such as Spark, Ray and Dask.

Joblib is the package enabling embarrassingly parallel operations in scikit-learn, powering estimators like `RandomForest` or `GridSearch`. Its main objective is to avoid oversubscription —spawning too many threads compared to the number of cores— through tricks like cgroup awareness, which is lacking in the `multiprocessing` package, for example.

When you run a Jupyter notebook on your local machine, the number of physical cores of your machine should match the number of cores accessible to the software. However, when this Jupyter notebook runs within a Docker container in a Kubernetes pod, the number of physical cores of the cluster no longer matches the number accessible to your instance. This is where cgroup awareness is useful.

In joblib you can access it with:

In [9]:
from joblib import cpu_count

# CPU count as seen by joblib; the surrounding text describes it as cgroup-aware.
cpu_count()

8

In [None]:
With `multiprocessing`, the equivalent call is `multiprocessing.cpu_count()`, which is not cgroup-aware.

We begin our journey by defining a super simple function that simulates an expensive computation and returns the PID of the process running it.

In [1]:
import time
import os
from collections import Counter

def bench(duration=1):
    """
    Simulate an expensive computation and report which process ran it.

    :param duration: number of seconds to sleep; defaults to 1 so that
        ``bench()`` keeps its original behaviour.
    :return: the PID of the process that executed the call.
    """
    time.sleep(duration)
    return os.getpid()

In [None]:
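The cells below run 80 calls to `bench` on 8 workers, first with the `loky` backend and then with the Spark backend, and count how many tasks each worker process handled.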

In [8]:
%%time

# Run 80 bench() calls on 8 loky workers, then count how many calls each PID handled.
with parallel_backend('loky', n_jobs=8):
    out = Parallel(verbose=1)(delayed(bench)() for _ in range(80))
    print(Counter(out))

[Parallel(n_jobs=8)]: Using backend LokyBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done  34 tasks      | elapsed:    5.8s


Counter({21872: 10, 21873: 10, 21874: 10, 21875: 10, 21876: 10, 21877: 10, 21878: 10, 21879: 10})
CPU times: user 239 ms, sys: 101 ms, total: 339 ms
Wall time: 11 s


[Parallel(n_jobs=8)]: Done  80 out of  80 | elapsed:   10.9s finished


In [2]:
from joblibspark import register_spark

# Register joblibspark's backend so that parallel_backend('spark') can be used below.
# NOTE(review): this presumably replaces any backend already registered as "spark",
# e.g. MySparkDistributedBackend — confirm which one the benchmark should use.
register_spark()

In [6]:
%%time

# Same workload as the loky run: 80 bench() calls on 8 workers, this time through Spark.
with parallel_backend('spark', n_jobs=8):
    out = Parallel(verbose=1)(delayed(bench)() for _ in range(80))
    print(Counter(out))

[Parallel(n_jobs=8)]: Using backend SparkDistributedBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done  34 tasks      | elapsed:    5.4s                    
[Stage 172:>  (0 + 1) / 1][Stage 173:>  (0 + 1) / 1][Stage 174:>  (0 + 1) / 1]  

Counter({21729: 10, 21731: 10, 21732: 10, 21733: 10, 21727: 10, 21728: 10, 21730: 10, 21726: 10})
CPU times: user 613 ms, sys: 246 ms, total: 859 ms
Wall time: 10.7 s


[Parallel(n_jobs=8)]: Done  80 out of  80 | elapsed:   10.7s finished           
