# Unknown Source (threading, multiprocessing, Queue)

In [5]:
import threading

def countdown():
     x = 1000000000
     while x > 0:
           x -= 1
# Implementation 1: Multi-threading
def implementation_1():
     thread_1 = threading.Thread(target=countdown)
     thread_2 = threading.Thread(target=countdown)
     thread_1.start()
     thread_2.start()
     thread_1.join()
     thread_2.join()
# Implementation 2: Run in serial
def implementation_2():
     countdown()
     countdown()

In [12]:
%timeit implementation_2

39.5 ns ± 3.5 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)


In [11]:
%timeit implementation_1

38.6 ns ± 4.08 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)


In [8]:
import multiprocessing
# countdown() is defined in the previous snippet.
def implementation_3():
     process_1 = multiprocessing.Process(target=countdown)
     process_2 = multiprocessing.Process(target=countdown)
     process_1.start()
     process_2.start()
     process_1.join()
     process_2.join()

In [10]:
%timeit implementation_3

40.7 ns ± 2.84 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)


In [13]:
import threading
from queue import Queue
import time

In [3]:
def testThread(num):
    return num+100

In [23]:
%%timeit
for i in range(1000):
    t = threading.Thread(target=testThread, args=(i,))
    t.start()

118 ms ± 3.83 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [22]:
%%timeit 
for i in range(1000):
    testThread(i)

195 µs ± 909 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)


In [26]:
print_lock = threading.Lock()

def threadTest():
    # when this exits, the print_lock is released
    with print_lock:
        print(worker)

def threader():
  while True:
    # get the job from the front of the queue
    threadTest(q.get())
    q.task_done()

q = Queue()
for x in range(5):
    thread = threading.Thread(target = threader)
    # this ensures the thread will die when the main thread dies
    # can set t.daemon to False if you want it to keep running
    t.daemon = True
    t.start()

for job in range(10):
    q.put(job)

RuntimeError: cannot set daemon status of active thread

In [27]:
import multiprocessing
def spawn():
  print('test!')

In [28]:
  for i in range(5):
    p = multiprocessing.Process(target=spawn)
    p.start()

test!
test!
test!
test!
test!


In [30]:
%%timeit 
for i in range(1000):
    p = multiprocessing.Process(target=testThread, args=(i,))
    p.start()

7.18 s ± 78.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
def fuunc(x):
    return x**2

In [None]:
import threading
t1 = threading.Thread(target=fuunc, args=(5,))
t2 = threading.Thread(target=fuunc, args=(5,))
t1.start()
t2.start()
t2.join()
t2.join()
import multiprocessing # same syntax

In [32]:
# this is higher API over threading and multiprocessing, losing flexibility though
from concurrent.futures import ThreadPoolExecutor

In [33]:
def test_multithreading(func, args, workers):
    with ThreadPoolExecutor(workers) as ex:
        res = ex.map(func, args)
    return list(res)
def test_multiprocessing(func, args, workers):
    with ProcessPoolExecutor(work) as ex:
        res = ex.map(func, args)
    return list(res)

In [36]:
%time test_multithreading(testThread, list(range(10000)), 4);

CPU times: user 299 ms, sys: 12.3 ms, total: 311 ms
Wall time: 312 ms


[100,
 101,
 102,
 103,
 104,
 105,
 106,
 107,
 108,
 109,
 110,
 111,
 112,
 113,
 114,
 115,
 116,
 117,
 118,
 119,
 120,
 121,
 122,
 123,
 124,
 125,
 126,
 127,
 128,
 129,
 130,
 131,
 132,
 133,
 134,
 135,
 136,
 137,
 138,
 139,
 140,
 141,
 142,
 143,
 144,
 145,
 146,
 147,
 148,
 149,
 150,
 151,
 152,
 153,
 154,
 155,
 156,
 157,
 158,
 159,
 160,
 161,
 162,
 163,
 164,
 165,
 166,
 167,
 168,
 169,
 170,
 171,
 172,
 173,
 174,
 175,
 176,
 177,
 178,
 179,
 180,
 181,
 182,
 183,
 184,
 185,
 186,
 187,
 188,
 189,
 190,
 191,
 192,
 193,
 194,
 195,
 196,
 197,
 198,
 199,
 200,
 201,
 202,
 203,
 204,
 205,
 206,
 207,
 208,
 209,
 210,
 211,
 212,
 213,
 214,
 215,
 216,
 217,
 218,
 219,
 220,
 221,
 222,
 223,
 224,
 225,
 226,
 227,
 228,
 229,
 230,
 231,
 232,
 233,
 234,
 235,
 236,
 237,
 238,
 239,
 240,
 241,
 242,
 243,
 244,
 245,
 246,
 247,
 248,
 249,
 250,
 251,
 252,
 253,
 254,
 255,
 256,
 257,
 258,
 259,
 260,
 261,
 262,
 263,
 264,
 265,
 266

In [None]:
%time test_multithreading(testThread, list(range(10000)), 4);

In [4]:
import logging
import os
from functools import partial
from multiprocessing.pool import Pool
from time import time

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)

def main():
    ts = time()
    with Pool(4) as p:
        p.map(testThread, list(range(1000)))
    logging.info('Took %s seconds', time() - ts)


In [5]:
main()

2019-09-27 15:47:52,405 - root - INFO - Took 0.11230731010437012 seconds


# Kite: parent-child Proc and IPC with Queue and Pipe

In [1]:
# Spawn a child proc

import multiprocessing

def worker():
    print("worker-1 ready for worker")

proc = multiprocessing.Process(target=worker)
proc.start()

worker-1 ready for worker


In [2]:
# IPC with Pipe

def worker(conn):
    print(conn.recv())
    conn.send("send from child proc")

conn1, conn2 = multiprocessing.Pipe()

In [3]:
conn1, conn2

(<multiprocessing.connection.Connection at 0x7fa4a83b9e10>,
 <multiprocessing.connection.Connection at 0x7fa4a83b9bd0>)

In [14]:
# compare with os.Pipe

import os

conn_r, conn_w = os.pipe()

In [15]:
conn_r, conn_w

(58, 59)

In [4]:
proc = multiprocessing.Process(target=worker, args=[conn2])
proc.start()

send from main process


In [5]:
proc.is_alive()

True

In [6]:
proc.pid

21518

In [7]:
proc.sentinel # FD used for waiting for termination

46

In [8]:
conn1.send("send from main process")

In [10]:
print(conn1.recv())

send from child proc


In [11]:
# IPC using Queue

import queue

q = queue.Queue()

def worker1(q):
    print("worker1 received: " + q.get())

def worker2(q):
    print("worker2 received: " + q.get())
    
queue = multiprocessing.Queue()

(q, queue) # near clone of queue

proc1 = multiprocessing.Process(target=worker1, args=[queue])
proc1.start()

proc2 = multiprocessing.Process(target=worker2, args=[queue])
proc2.start()

queue.put("hello")
queue.put("world")

worker1 received: hello
worker2 received: world


# GeekforGeek

In [12]:
import multiprocessing

In [13]:
def print_cube(num):
    """
    func to print cube of given num
    """
    print("Cube: {}".format(num * num * num))
    
def print_square(num):
    """
    func to print square of given num
    """
    print("Square: {}".format(num * num))
    
proc1 = multiprocessing.Process(target=print_square, args=(10,))
proc2 = multiprocessing.Process(target=print_cube, args=(10,))

proc1.start()
proc2.start()

proc1.join()
proc2.join() # wait until proc2 finishes

print("Done !")

Square: 100
Cube: 1000
Done !


> Creating process with target(func to execute) and args(args passed) NOTE: Process constructor takes many other args -> start method followed by join to wait till complete of current program (running on the same python script)

In [17]:
# check detail

import os

def worker1():
    print("PID of worker1: {}".format(os.getpid()))
def worker2():
    print("PID of worker2: {}".format(os.getpid()))

print("PID of main process: {}".format(os.getpid()))

p1 = multiprocessing.Process(target=worker1)
p2 = multiprocessing.Process(target=worker2)

p1.start()
p2.start()

print("PID pf p1: {}".format(p1.pid))
print("PID of p2: {}".format(p2.pid))

p1.join()
p2.join()

print("Process p1 is alive: {}".format(p1.is_alive()))

PID of main process: 21464
PID of worker1: 19572
PID of worker2: 19575
PID pf p1: 19572
PID of p2: 19575
Process p1 is alive: False


> Main script (or notebook) is difference PID while each process runs indy and has own MEM space 

In [18]:
import multiprocessing 
  
# empty list with global scope 
result = [] 
  
def square_list(mylist): 
    """ 
    function to square a given list 
    """
    global result 
    # append squares of mylist to global list result 
    for num in mylist: 
        result.append(num * num) 
    # print global list result 
    print("Result(in process p1): {}".format(result))
    
mylist = [1,2,3,4]

p1 = multiprocessing.Process(target=square_list,
                            args=(mylist, ))
p1.start()
p1.join()

print("Result in main program: {}".format(result))

Result(in process p1): [1, 4, 9, 16]
Result in main program: []


> the content of global list object result at 2 places (1) square_list func called by p1 hence changed in MEM space of p1 ONLY and (2) after join() of p1 in main proc, since ran by different proc its MEM space still contains EMPTY list

![Iamge](https://media.geeksforgeeks.org/wp-content/uploads/multiprocessing-python-1.png)

In [22]:
# sharing data inter-proce via special array and value object

# array - ctype array alloc 

import multiprocessing

def square_list(mylist, result, square_sum):
    for idx, num in enumerate(mylist):
        result[idx] = num * num
        
    square_sum.value = sum(result)
    
    print("Result in process p1: {}".format(result[:]))
    
    print("Sum of in proceess p1: {}".format(square_sum.value))
    
mylist=[1,2,3,4]

result = multiprocessing.Array('i', 4) # int with spacy for 4 int

square_sum = multiprocessing.Value('i')

p1 = multiprocessing.Process(target=square_list,
                            args=(mylist, result, square_sum))

p1.start()
p1.join()

print("Result in main proc: {}".format(result[:]))
print("Sum in main proc: {}".format(square_sum.value))

Result in process p1: [1, 4, 9, 16]
Sum of in proceess p1: 30
Result in main proc: [1, 4, 9, 16]
Sum in main proc: 30


![Image](https://media.geeksforgeeks.org/wp-content/uploads/multiprocessing-python-2.png)

> Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types like lists, dictionaries, Queue, Value, Array, etc. Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.

In [23]:
import multiprocessing 
  
def print_records(records): 
    """ 
    function to print record(tuples) in records(list) 
    """
    for record in records: 
        print("Name: {0}\nScore: {1}\n".format(record[0], record[1])) 
  
def insert_record(record, records): 
    """ 
    function to add a new record to records(list) 
    """
    records.append(record) 
    print("New record added!\n") 
  

with multiprocessing.Manager() as manager: 
    # creating a list in server process memory 
    records = manager.list([('Sam', 10), ('Adam', 9), ('Kevin',9)]) 
    # new record to be inserted in records 
    new_record = ('Jeff', 8) 

    # creating new processes 
    p1 = multiprocessing.Process(target=insert_record, args=(new_record, records)) 
    p2 = multiprocessing.Process(target=print_records, args=(records,)) 

    # running process p1 to insert new record 
    p1.start() 
    p1.join() 

    # running process p2 to print records 
    p2.start() 
    p2.join() 

New record added!

Name: Sam
Score: 10

Name: Adam
Score: 9

Name: Kevin
Score: 9

Name: Jeff
Score: 8



![Image](https://media.geeksforgeeks.org/wp-content/uploads/multiprocessing-python-3.png)

In [24]:
# IPC - mapReduce work Queue or Pipe

# Queue: a simple way to IPC using Queue to pass msg back and forth for any Python object

import multiprocessing 
  
def square_list(mylist, q): 
    """ 
    function to square a given list 
    """
    # append squares of mylist to queue 
    for num in mylist: 
        q.put(num * num) 
  
def print_queue(q): 
    """ 
    function to print queue elements 
    """
    print("Queue elements:") 
    while not q.empty(): 
        print(q.get()) 
    print("Queue is now empty!") 
  

# input list 
mylist = [1,2,3,4] 

# creating multiprocessing Queue 
q = multiprocessing.Queue() 

# creating new processes 
p1 = multiprocessing.Process(target=square_list, args=(mylist, q)) 
p2 = multiprocessing.Process(target=print_queue, args=(q,)) 

# running process p1 to square list 
p1.start() 
p1.join() 

# running process p2 to get queue elements 
p2.start() 
p2.join() 

Queue elements:
1
4
9
16
Queue is now empty!


1. Creating Queue as q
2. Pass empty q to square_list func through proc p1 with Elements inserted using put
3. To print queue content using get() until queue is not empty

PIPE - duplex or two-endpoint

Preferred over queue when only two-way comm needed. 

In [25]:
import multiprocessing 
  
def sender(conn, msgs): 
    """ 
    function to send messages to other end of pipe 
    """
    for msg in msgs: 
        conn.send(msg) 
        print("Sent the message: {}".format(msg)) 
    conn.close() 
  
def receiver(conn): 
    """ 
    function to print the messages received from other 
    end of pipe 
    """
    while 1: 
        msg = conn.recv() 
        if msg == "END": 
            break
        print("Received the message: {}".format(msg)) 
  

# messages to be sent 
msgs = ["hello", "hey", "hru?", "END"] 

# creating a pipe 
parent_conn, child_conn = multiprocessing.Pipe() 

# creating new processes 
p1 = multiprocessing.Process(target=sender, args=(parent_conn,msgs)) 
p2 = multiprocessing.Process(target=receiver, args=(child_conn,)) 

# running processes 
p1.start() 
p2.start() 

# wait until processes finish 
p1.join() 
p2.join() 

Sent the message: hello
Sent the message: hey
Received the message: hello
Sent the message: hru?
Sent the message: END
Received the message: hey
Received the message: hru?


1. Creating pipe with 2 conn objects for two endpoints of the pipe
2. Send from one endpoint using send
3. Receive from another end using recv

- above read msg until "END" received

![Image](https://media.geeksforgeeks.org/wp-content/uploads/multiprocessing-python-5.png)

> Data in pipe may be corrupted if two procs (or threads) try to read/write to the SAME END AT ONCE, of course no risk if using different ends! Queue do proper SYNC between proc at the expense of more complexity hence said to be proc/thread safe.

RACE CONDITION

In [27]:
# Python program to illustrate  
# the concept of race condition 
# in multiprocessing 
import multiprocessing 
  
# function to withdraw from account 
def withdraw(balance):     
    for _ in range(10000): 
        balance.value = balance.value - 1
  
# function to deposit to account 
def deposit(balance):     
    for _ in range(10000): 
        balance.value = balance.value + 1
  
def perform_transactions(): 
  
    # initial balance (in shared memory) 
    balance = multiprocessing.Value('i', 100) 
  
    # creating new processes 
    p1 = multiprocessing.Process(target=withdraw, args=(balance,)) 
    p2 = multiprocessing.Process(target=deposit, args=(balance,)) 
  
    # starting processes 
    p1.start() 
    p2.start() 
  
    # wait until processes are finished 
    p1.join() 
    p2.join() 
  
    # print final balance 
    print("Final balance = {}".format(balance.value)) 
  

for _ in range(10): 

    # perform same transaction process 10 times 
    perform_transactions() 

Final balance = 1645
Final balance = 324
Final balance = 150
Final balance = 1434
Final balance = 360
Final balance = 33
Final balance = -2308
Final balance = -416
Final balance = 481
Final balance = -1212


1. 100000 withdraw and 10000 deposit are carried out with initial balanc es at 100, but what get 10 iterations is some different values
2. Due to conrruent access of proc to shared data balance - unpredictability in balance is nothing but race conditon

LOCK - Lock is made of Semaphore object provided by the OS

> A semaphore is a synchronization object that controls access by multiple processes to a common resource in a parallel programming environment. It is simply a value in a designated place in operating system (or kernel) storage that each process can check and then change. Depending on the value that is found, the process can use the resource or will find that it is already in use and must wait for some period before trying again. Semaphores can be binary (0 or 1) or can have additional values. Typically, a process using semaphores checks the value and then, if it using the resource, changes the value to reflect this so that subsequent semaphore users will know to wait.

In [28]:
# Python program to illustrate  
# the concept of locks 
# in multiprocessing 
import multiprocessing 
  
# function to withdraw from account 
def withdraw(balance, lock):     
    for _ in range(10000): 
        lock.acquire() 
        balance.value = balance.value - 1
        lock.release() 
  
# function to deposit to account 
def deposit(balance, lock):     
    for _ in range(10000): 
        lock.acquire() 
        balance.value = balance.value + 1
        lock.release() 
  
def perform_transactions(): 
  
    # initial balance (in shared memory) 
    balance = multiprocessing.Value('i', 100) 
  
    # creating a lock object 
    lock = multiprocessing.Lock() 
  
    # creating new processes 
    p1 = multiprocessing.Process(target=withdraw, args=(balance,lock)) 
    p2 = multiprocessing.Process(target=deposit, args=(balance,lock)) 
  
    # starting processes 
    p1.start() 
    p2.start() 
  
    # wait until processes are finished 
    p1.join() 
    p2.join() 
  
    # print final balance 
    print("Final balance = {}".format(balance.value)) 
  

for _ in range(10): 

    # perform same transaction process 10 times 
    perform_transactions() 

Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100


1. Lock object created
2. Lock passed as target func arg
3. In CRITICAL SECTION of target func, apply lock using lock.acquire() until done and .release()

POOL

1. only one core is used for program execution and quite possible that other cores idel
2. Pool a batch of worker procs to offload work

In [29]:
# Python program to understand  
# the concept of pool 
import multiprocessing 
import os 
  
def square(n): 
    print("Worker process id for {0}: {1}".format(n, os.getpid())) 
    return (n*n) 
  

# input list 
mylist = [1,2,3,4,5] 

# creating a pool object 
p = multiprocessing.Pool() 

# map list to target function 
result = p.map(square, mylist) 

print(result) 

Worker process id for 1: 27709
Worker process id for 2: 27710
Worker process id for 3: 27711
Worker process id for 4: 27712
Worker process id for 5: 27713
[1, 4, 9, 16, 25]


1. create pool with args like control offloading, num_workers and max task per child
2. some INIT as func and args as proc started
3. map it to some func 
4. return and reduced

# Offical Doc - asyncio run

In [1]:
import asyncio

In [2]:
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

RuntimeError: asyncio.run() cannot be called from a running event loop

In [3]:
await main()

started at 17:24:16
hello
world
finished at 17:24:19


# RAY - another DASK