# Parallelization

In [9]:
# `%pip` installs into the environment of the running kernel; the version is pinned
# to the release this notebook was executed with.
%pip install timebudget==0.7.1

Collecting timebudget
  Downloading timebudget-0.7.1-py3-none-any.whl (9.5 kB)
Installing collected packages: timebudget
Successfully installed timebudget-0.7.1
[0m

In [27]:
import os
# Report how many CPUs the operating system exposes.
cpu_total = os.cpu_count()
print('Number of CPUs in the system: {}'.format(cpu_total))

Number of CPUs in the system: 12


In [28]:
import math
import numpy as np
from timebudget import timebudget

# Number of exp/sinh evaluations performed by one call of the workload (1e7).
iterations_count = 10_000_000

def complex_operation(input_index):
    """Burn CPU time with `iterations_count` exp/sinh evaluations.

    Prints a progress line tagged with `input_index`, then evaluates
    ``math.exp(1) * math.sinh(1)`` once per iteration. The computed values
    are discarded; the function is purely a CPU-bound workload.

    Args:
        input_index: Integer label of this run; only used in the printed message.

    Returns:
        None.
    """
    print("Complex operation. Input index: {:2d}".format(input_index))

    # Loop instead of building a list: the results are never used, so
    # materialising `iterations_count` inputs and outputs would only cost memory.
    for _ in range(iterations_count):
        math.exp(1) * math.sinh(1)

@timebudget
def run_complex_operations(operation, input):
    """Call `operation` once per element of `input`, strictly one after another.

    The function is wrapped by `timebudget`; the cell output shows the resulting
    "run_complex_operations took ..." line for the whole sequential run.

    Args:
        operation: Callable taking a single element of `input`.
        input: Iterable of arguments, consumed in order.
    """
    for index in input:
        operation(index)

# Named `input_indices` rather than `input` so the built-in `input()` is not
# shadowed in the notebook namespace.
input_indices = range(10)
run_complex_operations(complex_operation, input_indices)

Complex operation. Input index:  0
Complex operation. Input index:  1
Complex operation. Input index:  2
Complex operation. Input index:  3
Complex operation. Input index:  4
Complex operation. Input index:  5
Complex operation. Input index:  6
Complex operation. Input index:  7
Complex operation. Input index:  8
Complex operation. Input index:  9
run_complex_operations took 25.370sec


# Process-Based Parallelism

In [33]:
import math
import numpy as np
from timebudget import timebudget
from multiprocessing import Pool

# Number of exp/sinh evaluations performed by one call of the workload (1e7).
iterations_count = 10_000_000

def complex_operation(input_index):
    """Burn CPU time with `iterations_count` exp/sinh evaluations.

    Prints a progress line tagged with `input_index` (the message itself ends
    with a newline, so a blank line follows it), then evaluates
    ``math.exp(1) * math.sinh(1)`` once per iteration. The computed values
    are discarded; the function is purely a CPU-bound workload.

    Args:
        input_index: Integer label of this run; only used in the printed message.

    Returns:
        None.
    """
    print("Complex operation. Input index: {:2d}\n".format(input_index))
    # Loop instead of building a list: the results are never used, so
    # materialising `iterations_count` inputs and outputs would only cost memory.
    for _ in range(iterations_count):
        math.exp(1) * math.sinh(1)

@timebudget
def run_complex_operations(operation, input, pool):
    """Apply `operation` to every element of `input` through `pool.map`.

    The value returned by `pool.map` is discarded and this function returns None.
    The function is wrapped by `timebudget`; the cell output shows the resulting
    "run_complex_operations took ..." line.

    Args:
        operation: Callable handed to `pool.map`; it receives one element of `input`.
        input: Iterable of arguments, one per call.
        pool: Object exposing `map(func, iterable)`; the caller in this cell
            passes a `multiprocessing.Pool`.
    """
    pool.map(operation, input)

processes_count = 10

# NOTE: when this code is moved into a standalone script, place it under
# `if __name__ == '__main__':` so worker processes can import the module safely.
# The `with` block shuts the worker processes down once the run has finished;
# otherwise every execution of this cell leaves `processes_count` workers behind.
with Pool(processes_count) as processes_pool:
    run_complex_operations(complex_operation, range(10), processes_pool)

Complex operation. Input index:  8
Complex operation. Input index:  1
Complex operation. Input index:  2
Complex operation. Input index:  5
Complex operation. Input index:  0
Complex operation. Input index:  3
Complex operation. Input index:  6
Complex operation. Input index:  7
Complex operation. Input index:  4
Complex operation. Input index:  9










run_complex_operations took 6.914sec


[33m(raylet)[0m [2023-11-25 17:37:25,498 E 42339 42339] (raylet) node_manager.cc:3035: 2 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 57246ea15ef728432a5cbf325a103fd18faee4bee80faa10eadb7272, IP: 172.17.0.2) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 172.17.0.2`
[33m(raylet)[0m 
[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


# Specialized Libraries

In [34]:
import math
import numpy as np
from timebudget import timebudget
from multiprocessing import Pool

# Number of exp/sinh evaluations performed by one call of either workload (1e7).
iterations_count = 10_000_000

def complex_operation(input_index):
    """Burn CPU time with `iterations_count` pure-Python exp/sinh evaluations.

    Prints a progress line tagged with `input_index`, then evaluates
    ``math.exp(1) * math.sinh(1)`` once per iteration. The computed values
    are discarded; the function is purely a CPU-bound workload.

    Args:
        input_index: Integer label of this run; only used in the printed message.

    Returns:
        None.
    """
    print("Complex operation. Input index: {:2d}".format(input_index))

    # Loop instead of building a list: the results are never used, so
    # materialising `iterations_count` inputs and outputs would only cost memory.
    for _ in range(iterations_count):
        math.exp(1) * math.sinh(1)

def complex_operation_numpy(input_index):
    """Vectorised workload: exp(x) * sinh(x) over an array of `iterations_count` ones.

    Prints a progress line tagged with `input_index`, then performs the
    element-wise computation with NumPy. The resulting array is discarded.

    Args:
        input_index: Integer label of this run; only used in the printed message.

    Returns:
        None.
    """
    message = "Complex operation (numpy). Input index: {:2d}".format(input_index)
    print(message)

    ones = np.ones(iterations_count)
    exponentials = np.exp(ones)
    hyperbolic_sines = np.sinh(ones)
    np.multiply(exponentials, hyperbolic_sines)

@timebudget
def run_complex_operations(operation, input, pool):
    """Apply `operation` to every element of `input` through `pool.map`.

    The value returned by `pool.map` is discarded and this function returns None.
    The function is wrapped by `timebudget`; the cell output shows the resulting
    "run_complex_operations took ..." line for each run.

    Args:
        operation: Callable handed to `pool.map`; it receives one element of `input`.
        input: Iterable of arguments, one per call.
        pool: Object exposing `map(func, iterable)`; the caller in this cell
            passes a `multiprocessing.Pool`.
    """
    pool.map(operation, input)

processes_count = 10

# NOTE: when this code is moved into a standalone script, place it under
# `if __name__ == '__main__':` so worker processes can import the module safely.
# One pool serves both runs; the `with` block shuts its worker processes down
# afterwards instead of leaving them alive in the background.
with Pool(processes_count) as processes_pool:
    print('Without NumPy')
    run_complex_operations(complex_operation, range(10), processes_pool)
    print('\nNumPy')
    run_complex_operations(complex_operation_numpy, range(10), processes_pool)

Complex operation. Input index:  6Complex operation. Input index:  9Complex operation. Input index:  0Complex operation. Input index:  5Complex operation. Input index:  2Complex operation. Input index:  8Complex operation. Input index:  1Complex operation. Input index:  3Complex operation. Input index:  7Complex operation. Input index:  4











*** SIGTERM received at time=1700933850 on cpu 4 ***
*** SIGTERM received at time=1700933850 on cpu 9 ***
*** SIGTERM received at time=1700933850 on cpu 2 ***
*** SIGTERM received at time=1700933850 on cpu 8 ***
*** SIGTERM received at time=1700933850 on cpu 11 ***
*** SIGTERM received at time=1700933850 on cpu 1 ***
*** SIGTERM received at time=1700933850 on cpu 7 ***
*** SIGTERM received at time=1700933850 on cpu 0 ***
*** SIGTERM received at time=1700933850 on cpu 6 ***
*** SIGTERM received at time=1700933850 on cpu 5 ***
PC: @           0x4d8220  (unknown)  _PyEval_EvalFrameDefault
PC: @           0x4d8220  (unknown)  _PyEval_EvalFrameDefault
PC: @           0x4d8220  (unknown)  _PyEval_EvalFrameDefault
PC: @           0x4d8220  (unknown)  _PyEval_EvalFrameDefault
PC: @           0x4d8220  (unknown)  _PyEval_EvalFrameDefault
    @     0x7f5a640a5420  (unknown)  (unknown)
    @     0x7f5a640a5420  (unknown)  (unknown)
    @     0x7f5a640a5420  (unknown)  (unknown)
    @     0x7f5a64

Without NumPy
run_complex_operations took 6.135sec
Complex operation (numpy). Input index:  0Complex operation (numpy). Input index:  1Complex operation (numpy). Input index:  2Complex operation (numpy). Input index:  6Complex operation (numpy). Input index:  5Complex operation (numpy). Input index:  7Complex operation (numpy). Input index:  3Complex operation (numpy). Input index:  4Complex operation (numpy). Input index:  9Complex operation (numpy). Input index:  8










NumPy
run_complex_operations took 791.87ms


# Ray

In [24]:
# `%pip` installs into the environment of the running kernel; the version is pinned
# to the release this notebook was executed with.
%pip install ray==2.8.0

Collecting ray
  Downloading ray-2.8.0-cp38-cp38-manylinux2014_x86_64.whl.metadata (13 kB)
Collecting click>=7.0 (from ray)
  Downloading click-8.1.7-py3-none-any.whl.metadata (3.0 kB)
Collecting filelock (from ray)
  Downloading filelock-3.13.1-py3-none-any.whl.metadata (2.8 kB)
Collecting msgpack<2.0.0,>=1.0.0 (from ray)
  Downloading msgpack-1.0.7-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.1 kB)
Collecting protobuf!=3.19.5,>=3.15.3 (from ray)
  Downloading protobuf-4.25.1-cp37-abi3-manylinux2014_x86_64.whl.metadata (541 bytes)
Collecting aiosignal (from ray)
  Downloading aiosignal-1.3.1-py3-none-any.whl (7.6 kB)
Collecting frozenlist (from ray)
  Downloading frozenlist-1.4.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.2 kB)
Downloading ray-2.8.0-cp38-cp38-manylinux2014_x86_64.whl (62.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.6/62.6 MB[0m [31m5.8 MB/s[0m eta [36

In [35]:
import math
import numpy as np
from timebudget import timebudget
import ray

# Number of exp/sinh evaluations performed by one call of either workload (1e7).
iterations_count = 10_000_000

@ray.remote
def complex_operation(input_index):
    """Ray remote function that burns CPU with `iterations_count` exp/sinh evaluations.

    Prints a progress line tagged with `input_index`, then evaluates
    ``math.exp(1) * math.sinh(1)`` once per iteration. The computed values
    are discarded; the function is purely a CPU-bound workload. It is
    invoked through `complex_operation.remote(...)`.

    Args:
        input_index: Integer label of this run; only used in the printed message.

    Returns:
        None.
    """
    print("Complex operation. Input index: {:2d}".format(input_index))

    # Loop instead of building a list: the results are never used, so
    # materialising `iterations_count` inputs and outputs would only cost memory.
    for _ in range(iterations_count):
        math.exp(1) * math.sinh(1)

@ray.remote
def complex_operation_numpy(input_index):
    """Ray remote function computing exp(x) * sinh(x) over `iterations_count` ones.

    Prints a progress line tagged with `input_index`, then performs the
    element-wise computation with NumPy. The resulting array is discarded.
    It is invoked through `complex_operation_numpy.remote(...)`.

    Args:
        input_index: Integer label of this run; only used in the printed message.

    Returns:
        None.
    """
    message = "Complex operation (numpy). Input index: {:2d}".format(input_index)
    print(message)

    ones = np.ones(iterations_count)
    exponentials = np.exp(ones)
    hyperbolic_sines = np.sinh(ones)
    np.multiply(exponentials, hyperbolic_sines)

@timebudget
def run_complex_operations(operation, input):
    """Submit one Ray task per element of `input` and wait for all of them.

    The function is wrapped by `timebudget`; the cell output shows the resulting
    "run_complex_operations took ..." line for each run.

    Args:
        operation: Ray remote function; invoked as `operation.remote(element)`.
        input: Iterable of arguments, one task per element.
    """
    # Submit every task first, then wait for the whole batch in a single call.
    pending = [operation.remote(index) for index in input]
    ray.get(pending)

# Shut down any Ray instance left over from an earlier run of this cell, then
# start a fresh local one.
ray.shutdown()
ray.init()

# Named `input_indices` rather than `input` so the built-in `input()` is not
# shadowed in the notebook namespace.
input_indices = range(10)
print('Without NumPy')
run_complex_operations(complex_operation, input_indices)
print('NumPy')
run_complex_operations(complex_operation_numpy, input_indices)

2023-11-25 17:38:23,893	INFO worker.py:1673 -- Started a local Ray instance.


Without NumPy
[36m(complex_operation pid=43803)[0m Complex operation. Input index:  0
run_complex_operations took 6.387sec
NumPy
[36m(complex_operation_numpy pid=43799)[0m Complex operation (numpy). Input index:  0
[36m(complex_operation pid=43804)[0m Complex operation. Input index:  9[32m [repeated 9x across cluster][0m
run_complex_operations took 823.77ms
