In [25]:
import time
import threading
import multiprocessing
import itertools
import os
import logging
import random
import string
import requests
from functools import partial
from multiprocessing import Queue
from multiprocessing.pool import ThreadPool

# Definitions

In [1]:
# Processes are OS-level resource used to run and manage applications
# Processes have pid, list of open files and sockets, isolated memory,

In [3]:
# Special part of OS (scheduler) iterates over processes and give them time slot to run
TIME_SLOT = 0.5/(1000*1000)
while True:
    for process in processes:
        process.run_for(TIME_SLOT)

In [1]:
# Process has 1 or more threads
# Threads have common memory

In [None]:
# 2 kinds of tasks:
- parallel execution: speedup
- background execution: 
    - daemon process: OS services
    - multithreaded process: notifications, monitor, watcher, etc

In [5]:
Parallel execution can be done in 2 ways:
    - threads
    - processes
    
Which one is better?

# Threads

In [6]:
class timer():
    def __init__(self, message):
        self.message = message

    def __enter__(self):
        self.start = time.time()
        return None

    def __exit__(self, type, value, traceback):
        elapsed_time = (time.time() - self.start)
        print(self.message.format(elapsed_time))

TIME_TO_SLEEP = 1
        
def long_running_task(n=TIME_TO_SLEEP):
#     print(threading.current_thread())
    time.sleep(n)

with timer('Elapsed: {}s'):
#     long_running_task()
    long_running_task()


Elapsed: 1.0002257823944092s


In [35]:
with timer('Elapsed: {}s'):
    t1 = threading.Thread(target=long_running_task, args=(TIME_TO_SLEEP/2,))
    t2 = threading.Thread(target=long_running_task, args=(TIME_TO_SLEEP/2,))
    t1.start()
    t2.start()
#     print(threading.current_thread())
    print('BEFORE T1 JOIN')
    t1.join() # be carefull
    print('BEFORE T2 JOIN')
    t2.join() 
    
print('next step')
# work with date

BEFORE T1 JOIN
BEFORE T2 JOIN
Elapsed: 0.5124528408050537s
next step


In [36]:
def run_threads(func, data, workers):
    threads = [
        threading.Thread(target=func, args=(data / workers, ))
        for _ in range(workers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

In [46]:
workers = 10
DATA_SIZE = 1

with timer('Elapsed: {}s'):
    run_threads(long_running_task, DATA_SIZE, workers)

Elapsed: 0.10117959976196289s


In [100]:
def generate_task_queue(total_tasks, queue_size):
    task_queue = []
    remainder = total_tasks
    while remainder > 0:
        value = min(remainder, random.randint(1, total_tasks//queue_size))
        remainder -= value
        task_queue.append(value)
    return task_queue

assert sum(generate_task_queue(1000, 20)) == 1000
assert sum(generate_task_queue(1000, 1)) == 1000
assert sum(generate_task_queue(1000, 1000)) == 1000
assert sum(generate_task_queue(1000, 42)) == 1000

In [52]:
input_data = [DATA_SIZE / workers for _ in range(workers)]
# input_data = [0.15, 0.09, 0.11, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]
print(input_data)

with timer('Elapsed: {}s'):
    with ThreadPool(workers) as pool:
        pool.map(long_running_task, input_data)
#         for chunk in input_data:
#             long_running_task(chunk)

[0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]
Elapsed: 0.11604833602905273s


<img src="https://www.nginx.com/wp-content/uploads/2016/07/thread-pools-worker-process-event-cycle.png">

# Real world task

In [53]:
def fetch_pic(num_pic):
# def fetch_pic(num_pic, path):
    url = 'https://picsum.photos/400/600'
    path = './pics'
    for _ in range(num_pic):
        random_name = ''.join(random.choices(string.ascii_letters + string.digits, k=5))
        response = requests.get(url)
        if response.status_code == 200:
            with open(f'{path}/{random_name}.jpg', 'wb') as f:
                f.write(response.content)
                #print(f"Fetched pic [{os.getpid()}]: {f.name}")

In [57]:
workers = 8
DATA_SIZE = 100

with timer('Elapsed: {}s'):
    with ThreadPool(workers) as pool:
        input_data = [DATA_SIZE // workers for _ in range(workers)]
        # input_data = [(DATA_SIZE // workers, './pics') for _ in range(workers)]
        pool.map(fetch_pic, input_data)

Elapsed: 5.068382501602173s


# IO vs CPU bound tasks

In [9]:
DATA_SIZE = 1_000_000
lst = []

def fill_data(n):
    # print(threading.current_thread())
    while n > 0:
        n -= 1
        lst.append(random.randint(1, 100))
        
with timer('Elapsed: {}s'):
#     fill_data(DATA_SIZE, lst)
    fill_data(DATA_SIZE)

Elapsed: 0.7135238647460938s


In [63]:
with timer('Elapsed: {}s'):
#     t1 = threading.Thread(target=fill_data, args=(DATA_SIZE // 2, lst))
#     t2 = threading.Thread(target=fill_data, args=(DATA_SIZE // 2, lst))
    t1 = threading.Thread(target=fill_data, args=(DATA_SIZE // 2,))
    t2 = threading.Thread(target=fill_data, args=(DATA_SIZE // 2,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

Elapsed: 1.0509212017059326s


In [66]:
lst = []
workers = 16
with timer('Elapsed: {}s'):
    with ThreadPool(workers) as pool:
        input_data = [DATA_SIZE // workers for _ in range(workers)]
#         pool.map(partial(fill_data, lst=lst), input_data)
        result = pool.map(fill_data, input_data)
        
print(len(lst), lst[:100])

Elapsed: 1.1036815643310547s
1000000 [11, 77, 98, 86, 54, 32, 69, 88, 8, 46, 48, 47, 58, 31, 75, 29, 53, 33, 7, 72, 31, 48, 13, 93, 93, 30, 8, 44, 68, 75, 83, 13, 64, 39, 80, 60, 44, 63, 26, 74, 30, 5, 80, 94, 63, 51, 10, 53, 50, 61, 52, 38, 46, 59, 90, 58, 22, 89, 30, 54, 43, 11, 2, 46, 60, 76, 76, 48, 30, 53, 49, 33, 10, 55, 45, 98, 100, 34, 33, 16, 92, 29, 59, 95, 32, 66, 57, 62, 69, 64, 100, 22, 47, 48, 12, 64, 34, 36, 64, 13]


In [75]:
lst = []
workers = 16
with timer('Elapsed: {}s'):
    with multiprocessing.Pool(workers) as pool:
        input_data = [DATA_SIZE // workers for _ in range(workers)]
        pool.map(fill_data, input_data)
        
print(len(lst), lst[:100])

Elapsed: 0.4283015727996826s
0 []


In [17]:
def factorize_naive(n):
    """ A naive factorization method. Take integer 'n', return list of
        factors.
    """
    if n < 2:
        return []
    factors = []
    p = 2

    while True:
        if n == 1:
            return factors

        r = n % p
        if r == 0:
            factors.append(p)
            n = n / p
        elif p * p >= n:
            factors.append(n)
            return factors
        elif p > 2:
            # Advance in steps of 2 over odd numbers
            p += 2
        else:
            # If p == 2, get to 3
            p += 1

    assert False, "unreachable"

In [22]:
result = {}
workers = 16
with timer('Elapsed: {}s'):
    with multiprocessing.Pool(workers) as pool:
        input_data = (i for i in range(1, DATA_SIZE+1))
        result = [
            (n, factors)
            for n, factors in enumerate(pool.map(factorize_naive, input_data), 1)
        ]
    
print(len(result), result[:100])

Elapsed: 6.6087000370025635s
1000000 [(1, []), (2, [2]), (3, [3]), (4, [2, 2]), (5, [5]), (6, [2, 3.0]), (7, [7]), (8, [2, 2, 2]), (9, [3, 3]), (10, [2, 5.0]), (11, [11]), (12, [2, 2, 3.0]), (13, [13]), (14, [2, 7.0]), (15, [3, 5.0]), (16, [2, 2, 2, 2]), (17, [17]), (18, [2, 3, 3]), (19, [19]), (20, [2, 2, 5.0]), (21, [3, 7.0]), (22, [2, 11.0]), (23, [23]), (24, [2, 2, 2, 3.0]), (25, [5, 5]), (26, [2, 13.0]), (27, [3, 3, 3]), (28, [2, 2, 7.0]), (29, [29]), (30, [2, 3, 5.0]), (31, [31]), (32, [2, 2, 2, 2, 2]), (33, [3, 11.0]), (34, [2, 17.0]), (35, [5, 7.0]), (36, [2, 2, 3, 3]), (37, [37]), (38, [2, 19.0]), (39, [3, 13.0]), (40, [2, 2, 2, 5.0]), (41, [41]), (42, [2, 3, 7.0]), (43, [43]), (44, [2, 2, 11.0]), (45, [3, 3, 5.0]), (46, [2, 23.0]), (47, [47]), (48, [2, 2, 2, 2, 3.0]), (49, [7, 7]), (50, [2, 5, 5]), (51, [3, 17.0]), (52, [2, 2, 13.0]), (53, [53]), (54, [2, 3, 3, 3]), (55, [5, 11.0]), (56, [2, 2, 2, 7.0]), (57, [3, 19.0]), (58, [2, 29.0]), (59, [59]), (60, [2, 2, 3, 5.0]), (61,

In [26]:
result = {}
workers = 16
with timer('Elapsed: {}s'):
    with ThreadPool(workers) as pool:
        input_data = (i for i in range(1, DATA_SIZE+1))
        result = [
            (n, factors)
            for n, factors in enumerate(pool.map(factorize_naive, input_data), 1)
        ]
    
print(len(result), result[:100])

Elapsed: 33.573885440826416s
1000000 [(1, []), (2, [2]), (3, [3]), (4, [2, 2]), (5, [5]), (6, [2, 3.0]), (7, [7]), (8, [2, 2, 2]), (9, [3, 3]), (10, [2, 5.0]), (11, [11]), (12, [2, 2, 3.0]), (13, [13]), (14, [2, 7.0]), (15, [3, 5.0]), (16, [2, 2, 2, 2]), (17, [17]), (18, [2, 3, 3]), (19, [19]), (20, [2, 2, 5.0]), (21, [3, 7.0]), (22, [2, 11.0]), (23, [23]), (24, [2, 2, 2, 3.0]), (25, [5, 5]), (26, [2, 13.0]), (27, [3, 3, 3]), (28, [2, 2, 7.0]), (29, [29]), (30, [2, 3, 5.0]), (31, [31]), (32, [2, 2, 2, 2, 2]), (33, [3, 11.0]), (34, [2, 17.0]), (35, [5, 7.0]), (36, [2, 2, 3, 3]), (37, [37]), (38, [2, 19.0]), (39, [3, 13.0]), (40, [2, 2, 2, 5.0]), (41, [41]), (42, [2, 3, 7.0]), (43, [43]), (44, [2, 2, 11.0]), (45, [3, 3, 5.0]), (46, [2, 23.0]), (47, [47]), (48, [2, 2, 2, 2, 3.0]), (49, [7, 7]), (50, [2, 5, 5]), (51, [3, 17.0]), (52, [2, 2, 13.0]), (53, [53]), (54, [2, 3, 3, 3]), (55, [5, 11.0]), (56, [2, 2, 2, 7.0]), (57, [3, 19.0]), (58, [2, 29.0]), (59, [59]), (60, [2, 2, 3, 5.0]), (61,