# DS-GA 1019

# Lab 9: Concurrency 
## Mar. 30, 2023

In [1]:
import time
import threading

def do_something():
    print('Sleeping 1 second...')
    time.sleep(1)
    print('Done sleeping')

In [2]:
start = time.time()

do_something()
do_something()

finish = time.time()

print(f'Finished in {(finish - start):.1f} seconds')

Sleeping 1 second...
Done sleeping
Sleeping 1 second...
Done sleeping
Finished in 2.0 seconds


#### Sequential code

<img   src="images/im1.png" alt="Drawing" style="width: 500px;"/>


#### Multi threaded code

<img   src="images/im2.png" alt="Drawing" style="width: 500px;"/>


In [3]:
start = time.time()

t1 = threading.Thread(target = do_something)
t2 = threading.Thread(target = do_something)

t1.start()
t2.start()

#wait for the threads to finish
t1.join()
t2.join()

finish = time.time()

print(f'Finished in {(finish - start):.1f} seconds')

Sleeping 1 second...
Sleeping 1 second...
Done sleepingDone sleeping

Finished in 1.0 seconds


In [4]:
def do_something_specific(seconds):
    print(f'Sleeping {seconds} seconds...')
    time.sleep(seconds)
    print('Done sleeping')

In [5]:
start = time.time()

threads = []

for _ in range(10):
    t = threading.Thread(target = do_something_specific, args = [2])
    t.start()
    threads.append(t)

for thread in threads:
    thread.join()

finish = time.time()

print(f'Finished in {(finish - start):.1f} seconds')

Sleeping 2 seconds...
Sleeping 2 seconds...
Sleeping 2 seconds...
Sleeping 2 seconds...
Sleeping 2 seconds...
Sleeping 2 seconds...
Sleeping 2 seconds...
Sleeping 2 seconds...
Sleeping 2 seconds...
Sleeping 2 seconds...
Done sleepingDone sleeping

Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Finished in 2.0 seconds


### Queue

In [6]:
from queue import Queue 

my_queue = Queue(maxsize=0) #FIFO queue
my_queue.put(1)
my_queue.put(2)
my_queue.put(3)
print(my_queue.get())


1


In [7]:
q = Queue()
num_threads = 2

In [8]:
def worker():
    while True:
        print(f'Waiting for message, id = {threading.get_ident()}')
        item = q.get()
        print(f'Message received = {item}, id = {threading.get_ident()}')
        
        if item is not None:
            do_work(item)
            q.task_done()
        else:
            q.task_done()
            break

In [9]:
def do_work(item):
    print(f'Processing message .... {threading.get_ident()} -- {item}')
    time.sleep(2)
    print(f'Message processed .... {threading.get_ident()} -- {item}')

In [10]:
threads = []

for _ in range(num_threads):
    t = threading.Thread(target = worker)
    t.start()
    threads.append(t)

Waiting for message, id = 6267121664
Waiting for message, id = 6283948032


In [11]:
#Add items to queue
for item in ['wuphf','dot','com']:
    q.put(item)

Message received = wuphf, id = 6267121664Message received = dot, id = 6283948032
Processing message .... 6283948032 -- dot

Processing message .... 6267121664 -- wuphf


In [12]:
#Print all running threads
threading.enumerate()

[<_MainThread(MainThread, started 8475902272)>,
 <Thread(IOPub, started daemon 6147616768)>,
 <Heartbeat(Heartbeat, started daemon 6164443136)>,
 <Thread(Thread-3, started daemon 6182416384)>,
 <Thread(Thread-4, started daemon 6199242752)>,
 <ControlThread(Control, started daemon 6216069120)>,
 <HistorySavingThread(IPythonHistorySavingThread, started 6232895488)>,
 <ParentPollerUnix(Thread-2, started daemon 6250295296)>,
 <Thread(Thread-17, started 6267121664)>,
 <Thread(Thread-18, started 6283948032)>]

In [13]:
#stop threads
for _ in range(num_threads):
    q.put(None)

### Multiprocessing

Tasks are executed on multiple processors / cpus

In [14]:
import multiprocessing as mp
print("Number of processors: ", mp.cpu_count())

Number of processors:  10


### Example: Calculate e^x of elements of an array

In [15]:
import numpy as np
arr = np.random.randint(0, 10, size=[5000])
data = arr.tolist()


In [16]:
def factorial_upto(n):
    res = [1]
    
    for i in range(1, n + 1):
        res.append(res[-1] * i)
    return res

def taylor_exp(x,n=1000):
    
    factorials = factorial_upto(n)
    
    res = 0
    for i in range(n):
        res += x**i / factorials[i]
    
    return res

### Sequential solution

In [17]:
start = time.time()
results = []
for x in data:
    results.append(taylor_exp(x))

print(f'Finished in {(time.time() - start):.4f} seconds')


Message processed .... 6283948032 -- dotMessage processed .... 6267121664 -- wuphf
Waiting for message, id = 6267121664
Message received = com, id = 6267121664
Processing message .... 6267121664 -- com

Waiting for message, id = 6283948032
Message received = None, id = 6283948032
Message processed .... 6267121664 -- com
Waiting for message, id = 6267121664
Message received = None, id = 6267121664
Finished in 6.4327 seconds


### Parallelizing using Pool.map

In [18]:
start = time.time()

pool = mp.Pool(mp.cpu_count())

results_mp = pool.map(taylor_exp, [x for x in data])

pool.close()
print(f'Finished in {(time.time() - start):.4f} seconds')

assert (results_mp == results)

Process SpawnPoolWorker-2:
Process SpawnPoolWorker-1:
Process SpawnPoolWorker-3:
Traceback (most recent call last):
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/queues.py", line 367, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'taylor_exp' on <module '__main__' (built-in)>
Traceback (most recent call last):
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/homebrew/an

KeyboardInterrupt: 

In [None]:
!python3 pool.py

Finished in 1.5272 seconds


### Parallelizing using Pool.starmap

Lets us pass multiple arguments

In [19]:
start = time.time()

pool = mp.Pool(mp.cpu_count())

results_smp = pool.starmap(taylor_exp, [(x,1000) for x in data])

pool.close()
print(f'Finished in {(time.time() - start):.4f} seconds')

assert (results_smp == results)

Process SpawnPoolWorker-61:
Process SpawnPoolWorker-62:
Process SpawnPoolWorker-63:
Process SpawnPoolWorker-64:
Traceback (most recent call last):
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/queues.py", line 367, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'taylor_exp' on <module '__main__' (built-in)>
Traceback (most recent call last):
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._k

KeyboardInterrupt: 

g/connection.py", line 419, in _recv_bytes
    buf = self._recv(4)
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/connection.py", line 384, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
Traceback (most recent call last):
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/queues.py", line 364, in get
    with self._rlock:
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
  File "/opt/homebrew/anaconda3/lib/python3.9/multiprocessing/process.py", line 31

In [None]:
!python3 pool_star.py

Finished in 1.4978 seconds


### Global interpreter lock

Only one thread can access the interpreter at a time due to GIL

Python releases GIL 

- while a thread is waiting for IO
- while numpy is doing an array operation

More info on Python GIL - _https://realpython.com/python-gil/_

In [None]:
import math

def f(x): #Doesnot releast GIL
    print (x)
    y = [1]*5000000
    [math.exp(i) for i in y]
    
def g(x):   #Releases GIL
    print (x)
    y = np.ones(5000000)
    np.exp(y)

def do_work(q,func):
    while True:
        item = q.get()
        
        if item is not None:
            func(item)
            q.task_done()
        else:
            q.task_done()
            break

### serial f()

In [None]:
start = time.time()

for i in range(10):
    f(i)

print(f'Finished in {(time.time() - start):.4f} seconds')


0
1
2
3
4
5
6
7
8
9
Finished in 10.2603 seconds


### threaded f()

In [None]:
start = time.time()

q = Queue()
num_threads = 4

for i in range(num_threads):
    worker = threading.Thread(target = do_work, args = (q,f)) # refer to q
    worker.setDaemon(True) # this stop the threads when the program quits  
    worker.start()         # start the threads

# now we have started 10 threads:

for i in range(10):
    q.put(i)

q.join()
print(f'Finished in {(time.time() - start):.4f} seconds')


  worker.setDaemon(True) # this stop the threads when the program quits


0
1
2
3
4
5
6
7
8
9
Finished in 11.6809 seconds


### parallel f()

In [30]:
start = time.time()

pool = mp.Pool(4)

results_mp = pool.map(f, [x for x in range(10)])

pool.close()
print(f'Finished in {(time.time() - start):.4f} seconds')


1023



4
5
6
7
8
9
Finished in 3.8591 seconds


In [31]:
!python3 parallel_f.py

0
1
2
3
4
5
6
7
8
9
Finished in 3.6530 seconds


### serial g()

In [32]:
start = time.time()

for i in range(100):
    g(i)

print(f'Finished in {(time.time() - start):.4f} seconds')


0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
Finished in 2.5225 seconds


### threaded g()

In [33]:
start = time.time()

q = Queue()
num_threads = 4

for i in range(num_threads):
    worker = threading.Thread(target = do_work, args = (q,g)) # refer to q
    worker.setDaemon(True) # this stop the threads when the program quits  
    worker.start()         # start the threads

# now we have started 10 threads:

for i in range(100):
    q.put(i)

q.join()
print(f'Finished in {(time.time() - start):.4f} seconds')


  worker.setDaemon(True) # this stop the threads when the program quits


0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
8384

85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
Finished in 1.7848 seconds
