<div class="alert alert-info" style="background-color:#006a79; color:white; padding:0px 10px; border-radius:5px;"><h2 style='margin:10px 5px'>Parallel Processing</h2>
</div>

Parallel processing is a mode of operation in which a task is executed simultaneously on multiple processors of the same computer, with the aim of reducing the overall processing time. In this tutorial, you’ll learn how to parallelize typical logic using Python’s multiprocessing module and a few related libraries.

Python offers several libraries and methods for parallelization, including:

## 1. Multiprocessing:
This method uses multiple processes to run tasks in parallel. Since each process has its own interpreter and memory space, multiprocessing in Python can utilize multiple CPU cores and overcome the limitations of the Global Interpreter Lock (GIL). However, starting processes and passing data between them adds overhead, so multiprocessing may not pay off for small tasks.
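The "own memory space" point can be checked with a small sketch. The names counter, increment_and_report and run_isolation_demo are made up for this illustration, and the code is meant to be run as a script rather than taken as the tutorial's own method. The child process increments its private copy of a module-level variable, and the parent's copy stays unchanged.

```python
import multiprocessing as mp

counter = 0  # module-level state; each process works on its own copy


def increment_and_report(queue):
    """Runs in the child: change the child's copy and send it back."""
    global counter
    counter += 1
    queue.put(counter)


def run_isolation_demo():
    """Return (value seen by the child, value still held by the parent)."""
    queue = mp.Queue()
    child = mp.Process(target=increment_and_report, args=(queue,))
    child.start()
    child_value = queue.get(timeout=30)  # read the result before joining
    child.join()
    return child_value, counter


if __name__ == "__main__":
    child_value, parent_value = run_isolation_demo()
    print("Child saw:", child_value, "| parent still has:", parent_value)
```

Because nothing is shared implicitly, results have to be sent back explicitly (here through a Queue), which is part of the communication overhead mentioned above.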

The multiprocessing module is part of Python's standard library, so there is nothing to install. Running pip install multiprocessing downloads an obsolete Python 2 backport from PyPI (multiprocessing-2.6.2.1), whose setup.py fails on Python 3 with "SyntaxError: Missing parentheses in call to 'print'".


In [1]:
import multiprocessing

def worker():
    print("Process running...")

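# The guard keeps child processes from re-running this block on platforms
# that start workers by importing the main module (e.g. Windows)
if __name__ == "__main__":
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        processes.append(p)
        p.start()

    # Wait for every worker to finish before moving on
    for p in processes:
        p.join()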


The number of processes that can truly run in parallel at any one time is limited by the number of processors in your computer.

In [1]:
import multiprocessing as mp
print("Number of processors: ", mp.cpu_count())

Number of processors:  8
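The count above is the number of logical processors in the machine, which can be larger than the number the current process is allowed to use (for example inside a container or under CPU pinning). A minimal sketch of a more conservative check follows; the helper name usable_cpu_count is an assumption made for this example, and os.sched_getaffinity is only available on some platforms such as Linux.

```python
import os


def usable_cpu_count():
    """Best-effort count of the CPUs this process may actually run on."""
    # os.sched_getaffinity is not available everywhere (e.g. not on Windows)
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    # Fall back to the machine-wide count; cpu_count() may return None
    return os.cpu_count() or 1


print("Usable processors:", usable_cpu_count())
```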


There are two main objects in multiprocessing for running a function in parallel: the Pool class and the Process class.

**Pool Class** (the one generally used)
* 1. Synchronous execution (the call blocks the main program until the respective processes are finished, and the results come back in the same order as the inputs):

    Pool.map() and Pool.starmap()
    
    Pool.apply()

* 2. Asynchronous execution (the call does not block the main program. Tasks can finish in any order, so results collected as they complete may be out of order, but the work usually gets done quicker):

    Pool.map_async() and Pool.starmap_async()
    
    Pool.apply_async()
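The Pool methods listed above can be sketched as follows. The helper functions square, power and run_pool_demo are made up for this illustration; the Pool calls themselves are the standard-library ones. The code is intended to be run as a script.

```python
import multiprocessing as mp


def square(x):
    return x * x


def power(base, exponent):
    return base ** exponent


def run_pool_demo(n_workers=2):
    with mp.Pool(processes=n_workers) as pool:
        # Synchronous: blocks until every result is ready, keeps input order
        squares = pool.map(square, range(5))
        # starmap unpacks each tuple into positional arguments
        powers = pool.starmap(power, [(2, 3), (3, 2), (10, 0)])
        # Asynchronous: returns an AsyncResult right away
        pending = pool.apply_async(square, (7,))
        # get() waits for the value; the main program was free until here
        seven_squared = pending.get(timeout=30)
    return squares, powers, seven_squared


if __name__ == "__main__":
    print(run_pool_demo())
```

Calling get() on the AsyncResult is the point where the main program finally waits, so any other work placed between apply_async() and get() overlaps with the worker.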


**Process Class**

In [2]:
import multiprocessing
import time
  
  
class Process(multiprocessing.Process):
    def __init__(self, id):
        super(Process, self).__init__()
        self.id = id
                 
    def run(self):
        time.sleep(1)
        print("I'm the process with id: {}".format(self.id))
  
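if __name__ == '__main__':
    # Start both processes before joining, so they run at the same time
    # instead of one after the other
    processes = [Process(0), Process(1)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()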
    


## 2. concurrent.futures: 
This module provides a high-level interface for asynchronously executing functions using threads or processes. It abstracts away the low-level details of threading and multiprocessing, and allows you to easily parallelize tasks using a simple syntax.

In [2]:
import concurrent.futures

def worker():
    print("Task running...")

with concurrent.futures.ThreadPoolExecutor() as executor:
    for i in range(5):
        executor.submit(worker)


Task running...
Task running...
Task running...
Task running...
Task running...
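The example above only fires off tasks; it does not collect anything back. A minimal sketch of two common ways to gather results follows, using made-up helper names (square, squares_in_order, squares_as_completed). It uses threads, which suit I/O-bound work; for CPU-bound work, ProcessPoolExecutor offers the same interface.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def square(x):
    return x * x


def squares_in_order(numbers, max_workers=4):
    """executor.map yields results in the same order as the inputs."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(square, numbers))


def squares_as_completed(numbers, max_workers=4):
    """submit() returns futures; as_completed yields them as they finish."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_input = {executor.submit(square, n): n for n in numbers}
        return {future_to_input[f]: f.result() for f in as_completed(future_to_input)}


print(squares_in_order([1, 2, 3]))
print(squares_as_completed([1, 2, 3]))
```

Keeping a dictionary from each future to its input is one way to know which result belongs to which task when completion order is not guaranteed.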


## 3. Dask:
Dask is a library for parallel computing in Python that extends the functionality of pandas and NumPy to enable parallel and distributed computing. It provides parallel versions of many familiar data manipulation functions and lets you scale computations from a single machine to a cluster of machines.

In [4]:
pip install dask

Collecting dask
  Downloading dask-2023.3.1-py3-none-any.whl (1.2 MB)
     ---------------------------------------- 1.2/1.2 MB 2.4 MB/s eta 0:00:00
Collecting partd>=1.2.0
  Downloading partd-1.3.0-py3-none-any.whl (18 kB)
Collecting locket
  Downloading locket-1.0.0-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: locket, partd, dask
Successfully installed dask-2023.3.1 locket-1.0.0 partd-1.3.0
Note: you may need to restart the kernel to use updated packages.





In [5]:
import dask.array as da

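# Build a lazy 10000 x 10000 array of random numbers, split into 1000 x 1000 chunks
x = da.random.random((10000, 10000), chunks=(1000, 1000))
# compute() evaluates the chunks in parallel and returns a NumPy array of column means
result = x.mean(axis=0).compute()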


## 4. joblib:
joblib is a third-party library (installable with pip install joblib) for running embarrassingly parallel computations in Python, including parallelizing for loops, map calls, and scientific calculations. It provides a simple and efficient way to distribute independent jobs across multiple CPU cores.

* Parallelize a for loop:

In [5]:
from joblib import Parallel, delayed

def square(x):
    return x * x

# Generate a list of numbers from 0 to 999
numbers = list(range(1000))

# Serial equivalent: results = [square(x) for x in numbers]
# Square each number in parallel using 4 worker processes
results = Parallel(n_jobs=4)(delayed(square)(x) for x in numbers)

# Print the results
print(results)


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, ... (output truncated)

In this example, we define a function square() that simply returns the square of its input. We then generate a list of numbers from 0 to 999 and use joblib to parallelize the for loop that squares each number in the list. Setting n_jobs=4 spreads the calls over four workers. Finally, we print the results, which come back in the same order as the inputs.

* Parallelize a map function:

In [6]:
from joblib import Parallel, delayed

def square(x):
    return x * x

# Generate a list of numbers from 0 to 999
numbers = list(range(1000))

# Serial equivalent: results = list(map(square, numbers))
# Parallel version of that map call, using 4 worker processes
results = Parallel(n_jobs=4)(delayed(square)(x) for x in numbers)

# Print the results
print(results)


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, ... (output truncated)

In this example, we again define a function square() and a list of numbers from 0 to 999, and use joblib to parallelize what would otherwise be a map() call over the list. The joblib expression is the same as for the for loop, because both forms are written as a generator of delayed calls. Setting n_jobs=4 spreads the calls over four workers. Finally, we print the results.

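## COMPARISON
When the work can be split into independent tasks, parallel execution can finish sooner than serial execution. The two examples below time both approaches.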

**EXAMPLE 1:**
In this example, we run a simple function do_something() ten times in a loop. Each call takes one second because it calls time.sleep(1). We time how long all ten iterations take with serial processing.

In [1]:
import time

def do_something():
    # A function that takes some time to complete
    time.sleep(1)

start_time = time.time()
for i in range(10):
    do_something()
print("Serial execution time:", time.time() - start_time)


Serial execution time: 10.063575506210327


Now we use the joblib library to run the same do_something() function ten times, spreading the calls over two workers (n_jobs=2), and time the whole run. With two workers, ten one-second tasks need at least about 5 seconds; starting the workers adds further overhead on top of that.

In [2]:
import time
from joblib import Parallel, delayed

def do_something():
    # A function that takes some time to complete
    time.sleep(1)

start_time = time.time()
Parallel(n_jobs=2)(delayed(do_something)() for i in range(10))
print("Parallel execution time:", time.time() - start_time)


Parallel execution time: 8.216288805007935
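For comparison, the same kind of measurement can be sketched with the standard library alone. This is a swapped-in variant, not the joblib run above: it uses a thread pool, which is enough here because sleeping releases the GIL. The helpers time_serial and time_threaded, and the shorter default delay, are assumptions made for this example.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def do_something(delay=0.1):
    # Stand-in for a task that mostly waits (I/O, network, sleep)
    time.sleep(delay)
    return delay


def time_serial(n_tasks, delay=0.1):
    start = time.perf_counter()
    for _ in range(n_tasks):
        do_something(delay)
    return time.perf_counter() - start


def time_threaded(n_tasks, delay=0.1, max_workers=2):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # list() forces all tasks to complete before the clock stops
        list(executor.map(do_something, [delay] * n_tasks))
    return time.perf_counter() - start


print("Serial execution time:", time_serial(10))
print("Threaded execution time:", time_threaded(10, max_workers=2))
```

Threads start much faster than processes, so for waiting-dominated tasks like this one the measured time tends to stay close to the ideal of tasks divided by workers.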


**EXAMPLE 2:**
In this example, we call a recursive function fib() for every n from 0 to 34 in a loop and store the results in a list. The function calculates the nth Fibonacci number using recursion, so it is CPU-bound. We time how long all the iterations take with serial processing.

In [1]:
import time

def fib(n):
    # A function that returns the nth Fibonacci number
    if n <= 1:
        return n
    return fib(n-1) + fib(n-2)

start_time = time.time()
result = []
for i in range(35):
    result.append(fib(i))
print("Serial execution time:", time.time() - start_time)


Serial execution time: 13.787769794464111


In [2]:
# Using parallelization: n_jobs=-1 asks joblib to use all available CPU cores
import time
from joblib import Parallel, delayed

def fib(n):
    # A function that returns the nth Fibonacci number
    if n <= 1:
        return n
    return fib(n-1) + fib(n-2)

start_time = time.time()
result = Parallel(n_jobs=-1)(delayed(fib)(i) for i in range(35))
print("Parallel execution time:", time.time() - start_time)
print("Results:", result)


Parallel execution time: 2.878491163253784
Results: [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765, 10946, 17711, 28657, 46368, 75025, 121393, 196418, 317811, 514229, 832040, 1346269, 2178309, 3524578, 5702887]
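The timings reported in the two examples can be turned into a speedup (serial time divided by parallel time) and an efficiency (speedup divided by the number of workers). The sketch below uses the measured values from the outputs above; the helper names are made up for this illustration, and the figure of 8 workers for EXAMPLE 2 is an assumption based on the 8 processors reported earlier.

```python
def speedup(serial_seconds, parallel_seconds):
    """How many times faster the parallel run was."""
    return serial_seconds / parallel_seconds


def efficiency(serial_seconds, parallel_seconds, n_workers):
    """Fraction of the ideal n_workers-fold speedup that was achieved."""
    return speedup(serial_seconds, parallel_seconds) / n_workers


# Timings copied from the outputs above
sleep_serial, sleep_parallel = 10.063575506210327, 8.216288805007935
fib_serial, fib_parallel = 13.787769794464111, 2.878491163253784

print(f"EXAMPLE 1 speedup: {speedup(sleep_serial, sleep_parallel):.2f}x")
print(f"EXAMPLE 2 speedup: {speedup(fib_serial, fib_parallel):.2f}x")
# Assumption: n_jobs=-1 used all 8 processors
print(f"EXAMPLE 2 efficiency: {efficiency(fib_serial, fib_parallel, 8):.2f}")
```

An efficiency well below 1 is expected here: the fib(i) calls have very uneven costs, so the largest inputs dominate the parallel run time while other workers sit idle.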
