# Advanced Python - Building Scalable Applications

## Module 3

#### Synchronization mechanisms for threads and processes
 - Synchronizing flow-control using ```Barrier```
 - Mutual exclusion patterns using ```Lock``` and ```RLock```
 - Wait/notify patterns using ```Condition``` and ```Event```
 - Bandwidth/Resource management and control using ```Semaphore``` and ```BoundedSemaphore```
 - Producer/Consumer patterns using ```Queue```
 - Using ```threading.local()``` to manage thread-local data
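
The first two bullets can be combined in one short sketch: a `Barrier` holds every worker until all of them are ready, and a `Lock` then serializes updates to a shared counter. The names `N_THREADS`, `INCREMENTS` and `worker` are illustrative assumptions, not part of the course material.

```python
from threading import Barrier, Lock, Thread

N_THREADS = 4          # assumed number of workers
INCREMENTS = 10_000    # assumed number of updates per worker

barrier = Barrier(N_THREADS)   # releases all threads once N_THREADS have arrived
lock = Lock()                  # guards the shared counter
counter = 0

def worker():
    global counter
    barrier.wait()             # block until every worker has reached this point
    for _ in range(INCREMENTS):
        with lock:             # only one thread updates the counter at a time
            counter += 1

threads = [Thread(target=worker) for _ in range(N_THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)
```

With the lock in place the final count equals `N_THREADS * INCREMENTS`. An `RLock` behaves the same way here, with the difference that the thread already holding it may acquire it again without blocking.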

#### Sharing and Exchanging data between processes (Overview)
 - Streaming data using ```Pipe``` and ```Queue```
 - Sharing counters and buffers using ```Value``` and ```Array```
 - Sharing Python lists and dictionaries using ```Manager```
 - Creating and managing shared memory using ```multiprocessing.shared_memory``` features
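
As a minimal sketch of the streaming bullet, the snippet below sends objects through a `Pipe`. To stay self-contained, both ends are kept in the same process; in a real program one end would normally be handed to a `Process`. The variable names and the message contents are assumptions made for illustration.

```python
from multiprocessing import Pipe

# Pipe() returns two connected Connection objects; with the default
# duplex=True both ends can send and receive.
parent_end, child_end = Pipe()

# Any picklable object can be sent: it is serialized by send() and
# rebuilt by recv() on the other end.
child_end.send({"job": 1, "payload": [1, 2, 3]})
child_end.send("done")

received = []
while parent_end.poll(0.1):        # True while there is data ready to read
    received.append(parent_end.recv())

parent_end.close()
child_end.close()
print(received)
```

A `multiprocessing.Queue` offers the same send/receive idea with multiple producers and consumers, at the cost of an extra feeder thread behind the scenes.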


In [8]:
from threading import Thread, Condition, current_thread as current

c = Condition()

def testfn():
    t = current()
    with c:
        print(f"{t.name} is waiting")
        c.wait()
        print(f"{t.name} is done waiting")

threads = []
for i in range(5):
    t = Thread(target=testfn, name=f"Thread-{i}")
    threads.append(t)
    t.start()

Thread-0 is waiting
Thread-1 is waiting
Thread-2 is waiting
Thread-3 is waiting
Thread-4 is waiting


In [12]:
from threading import Thread, Condition, current_thread as current

c = Condition()

def testfn():
    t = current()
    for i in range(3):
        with c:
            print(f"{i}: {t.name} is waiting")
            c.wait()
            print(f"{i}: {t.name} is done waiting")

threads = []
for i in range(5):
    t = Thread(target=testfn, name=f"Thread-{i}")
    threads.append(t)
    t.start()

0: Thread-0 is waiting
0: Thread-1 is waiting
0: Thread-2 is waiting
0: Thread-3 is waiting
0: Thread-4 is waiting


In [15]:
with c:
    c.notify_all()  # wake every thread currently blocked in c.wait()

2: Thread-4 is done waiting
2: Thread-3 is done waiting
2: Thread-0 is done waiting
2: Thread-1 is done waiting
2: Thread-2 is done waiting
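
The cells above call `wait()` unconditionally, so a notification sent before a thread starts waiting is simply missed. A common way around this is to pair the condition with shared state and wait on a predicate. The sketch below assumes a shared list named `items` and hypothetical `producer` and `consumer` functions.

```python
from threading import Condition, Thread

cond = Condition()
items = []          # shared state, only touched while holding cond's lock
results = []

def consumer():
    with cond:
        # wait_for() checks the predicate first and again after every
        # wake-up, so it returns at once if the item is already there.
        cond.wait_for(lambda: len(items) > 0)
        results.append(items.pop())

def producer():
    with cond:
        items.append("ready")
        cond.notify()       # wake one waiting thread, if any

c_thread = Thread(target=consumer)
p_thread = Thread(target=producer)
c_thread.start()
p_thread.start()
c_thread.join()
p_thread.join()

print(results)
```

The result is the same whichever thread runs first, which is the point of checking the state rather than relying on the notification alone.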


In [10]:
from threading import Thread, Event, current_thread as current

e = Event()

def testfn():
    t = current()
    print(f"{t.name} is waiting")
    e.wait()
    print(f"{t.name} is done waiting")

threads = []
for i in range(5):
    t = Thread(target=testfn, name=f"Thread-{i}")
    threads.append(t)
    t.start()


Thread-0 is waitingThread-1 is waiting

Thread-2 is waiting
Thread-3 is waiting
Thread-4 is waiting


In [16]:
from threading import Thread, Event, current_thread as current

e = Event()

def testfn():
    t = current()
    for i in range(3):
        print(f"{i}: {t.name} is waiting")
        e.wait()
        print(f"{i}: {t.name} is done waiting")

threads = []
for i in range(5):
    t = Thread(target=testfn, name=f"Thread-{i}")
    threads.append(t)
    t.start()


0: Thread-0 is waiting
0: Thread-1 is waiting
0: Thread-2 is waiting
0: Thread-3 is waiting
0: Thread-4 is waiting


In [17]:
e.set()  # the flag stays set, so every later e.wait() returns at once until e.clear() is called

0: Thread-0 is done waiting0: Thread-1 is done waiting
1: Thread-1 is waiting
1: Thread-1 is done waiting
2: Thread-1 is waiting
2: Thread-1 is done waiting
0: Thread-2 is done waiting
1: Thread-2 is waiting
1: Thread-2 is done waiting
2: Thread-2 is waiting
2: Thread-2 is done waiting
0: Thread-3 is done waiting
1: Thread-3 is waiting
1: Thread-3 is done waiting
2: Thread-3 is waiting
2: Thread-3 is done waiting
0: Thread-4 is done waiting
1: Thread-4 is waiting
1: Thread-4 is done waiting
2: Thread-4 is waiting
2: Thread-4 is done waiting

1: Thread-0 is waiting
1: Thread-0 is done waiting
2: Thread-0 is waiting
2: Thread-0 is done waiting
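
The output above shows each thread running through all three iterations after a single `set()`, because an `Event` stays set until it is cleared. A minimal single-threaded sketch of that behaviour, using short timeouts (an assumption made so the example cannot hang):

```python
from threading import Event

e = Event()

before = e.wait(timeout=0.05)   # flag is clear: times out and returns False
e.set()
first = e.wait(timeout=0.05)    # flag is set: returns True immediately
second = e.wait(timeout=0.05)   # still set: returns True again
e.clear()
after = e.wait(timeout=0.05)    # flag cleared: times out again

print(before, first, second, after)
```

Compare this with `Condition.notify_all()`, which only wakes the threads waiting at that moment and leaves no state behind.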


### Semaphores
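A semaphore limits how many threads can use a resource at the same time. It keeps an internal counter: `acquire()` decrements it and blocks while it is zero, and `release()` increments it.

Semaphores are commonly used in 3 forms:
  1. Counting semaphore (the standard form): the counter starts at the number of allowed concurrent users
  2. Binary semaphore: the counter starts at 1
  3. Null semaphore: the counter starts at 0, so every `acquire()` blocks until another thread calls `release()`

NOTE: Though a binary semaphore resembles a mutex (Lock), it lacks "strict mutex discipline": a semaphore has no owner, so any thread may release it, not only the thread that acquired it.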
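
A minimal sketch of the counting form, assuming a limit of two concurrent workers. The `active` and `peak` counters and the `worker` function are illustrative names; the `sleep` stands in for real work.

```python
import time
from threading import BoundedSemaphore, Lock, Thread

MAX_CONCURRENT = 2                       # assumed limit
slots = BoundedSemaphore(MAX_CONCURRENT)
stats_lock = Lock()
active = 0
peak = 0

def worker():
    global active, peak
    with slots:                          # acquire on entry, release on exit
        with stats_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.01)                 # stand-in for real work
        with stats_lock:
            active -= 1

threads = [Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# A BoundedSemaphore raises ValueError if released more often than acquired;
# a plain Semaphore would silently raise its counter instead.
try:
    slots.release()
    over_release_rejected = False
except ValueError:
    over_release_rejected = True

print(peak, over_release_rejected)
```

`peak` never exceeds `MAX_CONCURRENT`, even though six threads were started.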



In [18]:
from threading import Thread, Semaphore, current_thread as current

s = Semaphore(0)

def testfn():
    t = current()
    for i in range(3):
        print(f"{i}: {t.name} is waiting")
        s.acquire()
        print(f"{i}: {t.name} is done waiting")

threads = []
for i in range(5):
    t = Thread(target=testfn, name=f"Thread-{i}")
    threads.append(t)
    t.start()


0: Thread-0 is waiting
0: Thread-1 is waiting
0: Thread-2 is waiting
0: Thread-3 is waiting
0: Thread-4 is waiting


In [29]:
s.release()  # increments the counter, letting exactly one blocked acquire() proceed

2: Thread-0 is done waiting


In [30]:
from queue import Queue

q = Queue(5)


In [31]:
q.put(10)

In [43]:
q.put(20)
q.put(30)
q.put(67)


In [48]:
q.put(60)
q.put(54)

In [51]:
q.qsize()

5

In [45]:
q.maxsize

5

In [None]:
q.empty()  # equivalent to q.qsize() == 0

False

In [None]:
q.full()  # equivalent to q.qsize() == q.maxsize

True

In [56]:
print(f"Queue size: {q.qsize()}, Queue Empty ? {q.empty()}")
q.get()
print(f"Queue size: {q.qsize()}, Queue Empty ? {q.empty()}")

Queue size: 1, Queue Empty ? False
Queue size: 0, Queue Empty ? True


In [57]:
q.get(block=False)  # queue is empty: raises queue.Empty immediately instead of blocking

Empty: 

In [58]:
q.get(timeout=5)  # blocks for up to 5 seconds, then raises queue.Empty

Empty: 
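
Both failure modes can be handled explicitly. The sketch below assumes a small queue of size 2 and catches `queue.Full` and `queue.Empty` rather than letting them propagate.

```python
from queue import Empty, Full, Queue

q = Queue(maxsize=2)
q.put("a")
q.put("b")

try:
    q.put("c", block=False)         # queue is full: raises Full at once
    put_result = "stored"
except Full:
    put_result = "full"

drained = []
while True:
    try:
        # Wait briefly for an item, then give up with Empty
        drained.append(q.get(timeout=0.05))
    except Empty:
        break

print(put_result, drained)
```

Catching `Empty` is more reliable than checking `q.empty()` first, since another thread may change the queue between the check and the `get()`.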

In [66]:
from threading import Thread, current_thread as current
from queue import Queue

q = Queue(5)

def testfn():
    t = current()
    for i in range(3):
        print(f"{i}: {t.name} is waiting to put")
        q.put(i+10)
        print(f"{i}: {t.name} is done putting")
    print(f"{t.name} is waiting for acknowledgement")
    q.join()
    print(f"{t.name} is done waiting for acknowledgement")

t = Thread(target=testfn, name="Producer")
t.start()


0: Producer is waiting to put
0: Producer is done putting
1: Producer is waiting to put
1: Producer is done putting
2: Producer is waiting to put
2: Producer is done putting
Producer is waiting for acknowledgement


In [69]:
print(q.get())
q.task_done()  # q.join() in the producer returns once every item put has been marked done

12
Producer is done waiting for acknowledgement
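
The manual `get()` and `task_done()` calls above can be moved into a consumer thread. This is a minimal sketch; the `STOP` sentinel and the `processed` list are hypothetical additions used to shut the consumer down and record what it handled.

```python
from queue import Queue
from threading import Thread

q = Queue(maxsize=5)
STOP = object()         # hypothetical sentinel telling the consumer to exit
processed = []

def producer():
    for i in range(3):
        q.put(i + 10)
    q.join()            # blocks until task_done() was called for every item

def consumer():
    while True:
        item = q.get()
        if item is STOP:
            q.task_done()
            break
        processed.append(item)
        q.task_done()   # acknowledge this item to any thread blocked in join()

p = Thread(target=producer, name="Producer")
c = Thread(target=consumer, name="Consumer")
p.start()
c.start()
p.join()                # returns only after all three items were acknowledged
q.put(STOP)
c.join()

print(processed)
```

Calling `task_done()` once per `get()` is what lets `join()` act as the acknowledgement the producer is waiting for.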


In [70]:
from queue import PriorityQueue

pq = PriorityQueue()


In [71]:
pq.put(20)
pq.put(78)
pq.put(12)
pq.put(45)


In [75]:
pq.get()  # returns the smallest remaining item; run repeatedly, this yields 12, 20, 45, then 78

78
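
A `PriorityQueue` always hands back the smallest entry first, regardless of insertion order. The sketch below drains the same four values and then shows the common `(priority, item)` tuple pattern; the task names are made up for illustration.

```python
from queue import PriorityQueue

pq = PriorityQueue()
for value in (20, 78, 12, 45):
    pq.put(value)

order = []
while not pq.empty():
    order.append(pq.get())      # smallest remaining value each time
print(order)

# Tuples compare element by element, so the first field acts as the priority.
tasks = PriorityQueue()
tasks.put((2, "write report"))
tasks.put((1, "fix outage"))
tasks.put((3, "tidy desk"))
first = tasks.get()
print(first)
```

Lower numbers mean higher priority here; to serve the largest value first, one option is to store negated priorities.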