# Module 5: Thread Safety
Course: Advanced Programming for CSAI (Spring 2025)

Topics covered in this module:  
- Using Locks to deal with Race Conditions
- Semaphores and Events
- Deadlocks

**Warning**: some students found problems when running the multiprocessing library from within Jupyter Notebooks on Windows. The problems were solved when running the notebooks from JupyterLab: https://jupyterlab.uvt.nl. 

You are advised to work on this notebook after, or in parallel to, consulting other materials of the module, such as the slide deck and book chapters. The notebook contains examples and exercises that should help you understand and apply the concepts introduced in the rest of materials. You may also use the official Python docs: https://docs.python.org/3/.

Do not hesitate to be creative when trying out the examples: you can play with the code. You can try variants of the examples and exercises, print values of the variables to understand what is going on at every step, and come up with different solutions to the same exercise and think about relative advantages of each one.

The notebook also contains formative assignments. These are indicated as FA-n, where n is a number id. As explained on the course guide, you have to submit these. Please submit your best effort (*i.e.*, FA-n questions with no answer will be considered incomplete), and **if your solution does not work or you think it is inadequate, add a comment explaining why you could not proceed further**.

To submit the formative assignments, we ask you to upload the filled-in notebook. The notebook you upload should contain *at least* the formative assigments. It's not a problem if you upload the notebook with additional code, like the variants and tests mentioned above. However, to grade your assignments, we will only look at the answers to the requested exercises (those indicated with FA-n), so **make sure you store your answers in the corresponding variables and/or to name your functions as indicated**.

Optional exercises are, as the name indicates, not mandatory for the formative assignments. These are exercises that suggest you to create an alternative approach, or which propose a longer problem that allows for the integration of earlier concepts in one solution; in general, they present scenarios where you can be more creative. To make the most of the course, it is best to try them out and share your solutions on the Discussion Board, so that your peers can comment on them. You are also encouraged to comment on the exercises of your fellow students. This will help you sharpen your evaluation skills, which is a great asset in programming, as in turn this will help you devise more robust, efficient and maintainable solutions. 


### /!\ Before submitting your notebook

Please check it can be ran without errors! You can check this by pressing kernel --> restart and run all before submitting. If it does not run without errors, it is your **responsibility** to fix the problem either by resolving the bug in your code or by commenting it out along with a comment.


---

In [8]:
#Import libraries
import multiprocessing as mp
import threading as tr
import time
import random

## Race Condition
#### When tasks that should run in order run out of order due to lack of control.

One of the problems that might arise when you attempt to make a program run concurrently is a race condition. This condition occurs when you need threads or processes to run in a certain order or after certain events have taken place, but they do not do so due to lack of control. 
In many cases, this happens when access to a variable or to a shared resource is not handled properly. Take a look at the following example and try it out. 

In [9]:
#These two functions both increment the global variable num
def addOnex():
    global num
    for x in range(100000):
        num +=1
        
def addOney():
    global num
    for y in range(100000):
        num +=1

#Because of the non-deterministic way in which threading works, race conditions might not occur every time.
#You can see this when running both of the above functions in different threads at the same time, 
#and repeating this process 50 times.

for x in range(50):
    num = 0

    p1 = tr.Thread(target = addOney)
    p2 = tr.Thread(target = addOnex)

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print(num)


200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000
200000


In the example above, you expect the program to print 200000. However, what you will see when you run the program is that sometimes seemingly random numbers will be returned. This is caused when one thread reads the global variable before the other thread was able to finish the incrementation. 

How it is supposed to go:

<style> 

table { 
margin-left: 0px;
margin-right: auto;
}

</style>

|Thread A   |Thread B   |    num    |
|-----------|-----------|-----------|
|read num=0 |           |0          |
|num+=1     |           |0          |
|write num=1|           |1          |<-- Thread A added 1 to the num variable
|           |           |1          |
|           |read num=1 |1          |<-- The 1 is read by Thread B
|           |num+=1     |1          |<-- Which increments it by 1 again
|           |write num=2|2          |<-- And finally writes it to the num variable again
|           |           |2          |


However, when thread B reads the 'num' value before A has written it, a race condition arises as is illustrated below:

|Thread A   |Thread B   |    num    |
|-----------|-----------|-----------|
|read num=0 |           |0          |
|num+=1     |read num=0 |0          |<-- num value is read by Thread B before Thread A could write the new value
|write num=1|num+=1     |1          |<-- which makes Thread B increment the old value instead of the one thread A made
|           |write num=1|1          |<-- So when Thread B outputs it's new value it is the same as Thread A wrote
|           |           |1          |
|           |           |1          |

This is also what happens in the example code you saw above, which is why the results sometimes varied.



Let's have a look at some objects from the Threading and Multiprocessing libraries which can help prevent or solve these problems.

---

## Locks (also known as Mutex Locks)

A Lock is an object that is used to make sure only one thread or process can access a shared resource (which can be a variable, a database, a web service, etc.) at a time. Locks are placed in the code of the thread or process that is using the resource. 
Locks are created like this:

In [10]:

x = 0

#We create a Lock from the threading library (also available in multiprocessing):
x_lock = tr.Lock() 


#This one is still unsafe:
def unsafe_addOne():
    global x
    x += 1
    return x

#This version is half safe. It uses the lock defined above.
def halfsafe_addOne():
    global x
    x_lock.acquire() #acquire the lock (or wait until it is available).
                     #Other processes which have to acquire the x_lock before accessing x will have to wait...
    x += 1
    x_lock.release() #...until the lock is released
    return x
#The reason this function is only half safe is because 
#if the thread/process crashes while x is being changed, 
#no other processes can acquire x_lock anymore.

#To solve tihs, we can use try // finally:
def safe_addOne_v2():
    global x
    x_lock.acquire() 
    try:
        x += 1 
    finally: #if the thread/process fails, 
             #it will still execute anything in the context of *finally*
        x_lock.release() #make sure the lock is released even when an exception is raised
    
    return x

#Or we can also use *with*, which automatically releases the lock:
def safe_addOne():
    global x
   
    #or with a with statement:
    with x_lock:  #here the lock is acquired
        x += 1    
    #outside the *with* context, the lock is released
    return x



#Note that, even if you use Locks in parts of your code, other functions like unsafe_subOne 
#will still be able to modify x, because it is not told to acquire x_lock before accessing x!
#So make sure you implement the lock in all places where your shared data is accessed  

print(unsafe_addOne())

print(halfsafe_addOne())

print(safe_addOne())

print(safe_addOne_v2())

print(x)



1
2
3
4
4


### FA-1

The following code is not thread-safe, and can lead to a race condition.
Change the code so that it becomes thread-safe, using a Lock. 

Note: the call to time.sleep is included to simulate the race condition, but as you know,
it is not guaranteed to happen. Your code should still be written such that it prevents any
potential race condition.


In [11]:
#### FA-1 ################################

#Add your code where you see fit

lock=tr.Lock()
counter = 0

def increase(by):
    global counter
    lock.acquire()
    local_counter = counter
    local_counter += by
    time.sleep(1)
    counter = local_counter
    lock.release()
    print('counter={}'.format(counter))



# create threads
t1 = tr.Thread(target=increase, args=(10, ))
t2 = tr.Thread(target=increase, args=(20, ))

# start the threads
t1.start()
t2.start()


# wait for the threads to complete
t1.join()
t2.join()


print(f'The final counter is {counter}')

###########################################


#Expected output (after correcting race condition):
# counter=10
# counter=30
# The final counter is 30

counter=10
counter=30
The final counter is 30


#### FA-2:
Add a second function 'decrease(by)' that *decrements* the global 'counter' by 'by'. 
Then create two threads:
  - one calling 'increase(10)'
  - another calling 'decrease(5)'
Use a Lock to ensure no race condition occurs, 
and print the final counter value.

Expected outcome:
- The final counter should be (initial + 10 - 5).
- We see no unexpected intermediate values due to race conditions.

In [12]:
## Your solution goes here: ##
## FA-2:
counter=0
lock=tr.Lock()

def increase(by):
    global counter
    lock.acquire()
    local_counter = counter
    local_counter += by
    time.sleep(1)
    counter = local_counter
    
    print('counter={}'.format(counter))
    lock.release()
def decrease(by):
    global counter
    lock.acquire()
    local_counter = counter
    local_counter -= by
    time.sleep(1)
    counter = local_counter
    
    print('counter={}'.format(counter))
    lock.release()


t1 = tr.Thread(target=increase, args=(10,))
t2 = tr.Thread(target=decrease, args=(5,))
t1.start()
t2.start()
t1.join()
t2.join()
print("Final counter =>", counter)

##############################
# Example usage:
# t1 = tr.Thread(target=increase, args=(10,))
# t2 = tr.Thread(target=decrease, args=(5,))
# ...
# print("Final counter =>", counter)

counter=10
counter=5
Final counter => 5


#### FA-3:
In the 'increase'/'decrease' scenario, run the code multiple times (e.g., 5 or 10 repetitions). 
Print the final counter value each time and verify it's consistent. 
If you remove the Lock, do you see inconsistencies?

Expected outcome:
- With the Lock, the final counter should always be the same (e.g., 5 if start=0). 
- Without the Lock, results may vary.


In [13]:
## Your solution goes here: ##
## FA-3:
#this is without lock
counter=0 

def increase(by):
    global counter
    local_counter = counter
    local_counter += by
    time.sleep(1)
    counter = local_counter
    print('counter={}'.format(counter))

def decrease(by):
    global counter
    local_counter = counter
    local_counter -= by
    time.sleep(1)
    counter = local_counter
    print('counter={}'.format(counter))

def run_increase_decrease_once():
    global counter
    t1 = tr.Thread(target=increase, args=(10,))
    t2 = tr.Thread(target=decrease, args=(5,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("Final counter =>", counter)

for i in range(5):
   run_increase_decrease_once()
##############################

# Example usage:
# for i in range(5):
#    run_increase_decrease_once(...)
# => We see consistent final counters each run.

counter=10counter=-5

Final counter => -5
counter=5counter=-10

Final counter => -10
counter=-15counter=0

Final counter => 0
counter=-5
counter=10
Final counter => 10
counter=5
counter=20
Final counter => 20


In [14]:
## Your solution goes here: ##
## FA-3:
#this is with lock
counter=0 
x=tr.Lock()
def increase(by):
    global counter
    x.acquire()
    local_counter = counter
    local_counter += by
    time.sleep(1)
    counter = local_counter
    x.release()
    print('counter={}'.format(counter))

def decrease(by):
    global counter
    x.acquire()
    local_counter = counter
    local_counter -= by
    time.sleep(1)
    counter = local_counter
    x.release()
    print('counter={}'.format(counter))

def run_increase_decrease_once(): #main thread #we dont put lock here
    global counter
    counter=0 
    #main thread    #side thread
    t1 = tr.Thread(target=increase, args=(10,))
    #main thread    #side thread
    t2 = tr.Thread(target=decrease, args=(5,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("Final counter =>", counter)

for i in range(5):
   run_increase_decrease_once()
##############################

# Example usage:
# for i in range(5):
#    run_increase_decrease_once(...)
# => We see consistent final counters each run.

counter=10
counter=5
Final counter => 5
counter=10
counter=5
Final counter => 5
counter=10
counter=5
Final counter => 5
counter=10
counter=5
Final counter => 5
counter=10
counter=5
Final counter => 5


#### FA-4:
Create a small class 'CounterLock' that holds:
  - an integer 'value'
  - a 'lock' (threading.Lock)
  - methods 'inc(by)' and 'dec(by)' that do locked increments/decrements.

Demonstrate usage by creating multiple threads calling inc(...) or dec(...). 
Show final value is correct.

Expected outcome:
- No race condition. The final 'value' is what we logically expect 
  from the sum of all increments/decrements.


In [15]:
## Your solution goes here: ##
## FA-4:
class CounterLock:

    def __init__(self): #these args we ask the user for their value
        #whatever we have in the init, they act as global variables for all other functions
        self.value=0 #we make this 0 because we dont ask the user for the value of 'value' and this is our counter
        self.lock=tr.Lock() #since we dont ask the user but we know the value of 'lock'- we dont add 'lock' to the arguments of init next to 'value'
    def inc(self,by):
        self.lock.acquire()
        self.value=self.value+by
        self.lock.release()

    def dec(self,by):
        self.lock.acquire()
        self.value=self.value-by
        self.lock.release()


c = CounterLock()
t1 = tr.Thread(target=c.inc, args=(10,))
t2 = tr.Thread(target=c.dec, args=(5,))
t1.start()
t2.start()
t1.join()
t2.join()
print("Final value in c =>", c.value)
##############################

# Example usage:
# c = CounterLock()
# t1 = tr.Thread(target=c.inc, args=(10,))
# t2 = tr.Thread(target=c.dec, args=(5,))

# print("Final value in c =>", c.value)

Final value in c => 5


---

## RLock (Reentrant Mutex Lock)

RLocks are quite the same as Locks, but can be used by one thread multiple times. This is a common method to lock functions or methods. You can visualise this with the following example:

There is a bike which only one person can use at a time. First Alice takes a ride on the bike, and since the bike is in her possession, she can ring the bell while driving. Bob wants to move the bike to another location, but has to wait until Alice gets back before being able to do so. 

In [16]:
class Bicycle():
    def __init__(self):
        self.lock = tr.RLock()  #we create the RLock
    def drive(self):
        with self.lock:         #acquire lock
            print("Driving...")
            time.sleep(2)
            self.ringBell()     #call to a method that will reuse the lock
            time.sleep(2)
            print("Arrived")
    def ringBell(self):
        with self.lock:        #acquire lock
            print("Riding")
    def move(self):
        with self.lock:        #acquire lock
            print("Moved bike to another spot")
            
#We have one single bicycle (with one single lock)
b = Bicycle()
#We create a thread for each user of the bicycle:
alice = tr.Thread(target = b.drive)
bob = tr.Thread(target = b.move)

#Now we start the threads. Alice comes first:
alice.start()
time.sleep(1)
#Now Bob wants the bike. 
bob.start() #This process would start right away if Alice didn't have the lock
            #If Alice is still using the bike, it will wait...
alice.join() #When the thread is over, the lock is released. Now Bob can start.
bob.join()

Driving...
Riding
Arrived
Moved bike to another spot


### FA-5

The provided code below aims to implement a concurrent solution for a program that needs to perform some tasks
 (simulated with the function "task") and report on each of them (simulated with the function "report").
The provided functions are simplified versions, but you may assume that instead of "sleep", the tasks use some
shared resource and therefore they need thread-safety.
In order to make sure the program is thread-safe, the programmer has implemented a
Lock. However, this implementation of thread-safety is blocking the program: if you run it, you
will find that it gets stuck.

Looking at the code, find out what is the reason for this behavior. Adjust the code such that
thread-safety is handled in a way that allows the program to function normally.

IMPORTANT: the task and report functions should keep the same functionality, and protect the same
lines of code that are protected right now. Do not merge these functions.

In [17]:
#### FA-5 ###################################################################
#Add your code where you see fit
from time import sleep
from random import random
from multiprocessing import *

def report(lock, identifier):
    with lock:
        sleep(1)
        print("Process {} reported".format(identifier))

def task(lock, identifier, value):
    with lock:
        print("Process {} running... ".format(identifier))
        sleep(value)
        report(lock, identifier)

def main():
    lock = RLock() #the mistake was here - it needed to be from lock to Rlock because we lock two things parallel

    processes = [Process(target=task,
                        args=(lock, i, random())) for i in range(10)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
main()

#output:
# Process 0 running... 
# Process 0 reported
# Process 1 running... 
# Process 1 reported
# Process 2 running... 
# Process 2 reported
# Process 3 running... 
# Process 3 reported
# Process 5 running... 
# Process 5 reported
# Process 4 running... 
# Process 4 reported
# Process 6 running... 
# Process 6 reported
# Process 7 running... 
# Process 7 reported
# Process 8 running... 
# Process 8 reported
# Process 9 running... 
# Process 9 reported
#############################################################################


#### FA-6:
Enhance the 'Bicycle' class example or create your own class 
that uses an RLock in multiple methods. 
In one method, call another method that also uses the RLock. 
Demonstrate that it doesn't cause a deadlock 
when the same thread re-enters the lock.

Expected outcome:
- The program runs without freezing. 
- We see logs from both methods, even though both require the RLock.

In [18]:
## Your solution goes here: ##
## FA-6:
class Bicycle():
    def __init__(self):
        self.lock = tr.RLock()  #we create the RLock
    def drive(self):
        with self.lock:         #acquire lock
            print("Driving...")
            time.sleep(2)
            self.ringBell()     #call to a method that will reuse the lock
            time.sleep(2)
            print("Arrived")
    def ringBell(self):
        with self.lock:        #acquire lock
            print("Riding")
    def move(self):
        with self.lock:        #acquire lock
            print("Moved bike to another spot")
            
#We have one single bicycle (with one single lock)
b = Bicycle()
#We create a thread for each user of the bicycle:
alice = tr.Thread(target = b.drive)
#Now we start the threads. Alice comes first:
alice.start()
alice.join() #When the thread is over, the lock is released. Now Bob can start.


##############################

# Example usage:
# b = Bicycle()
# alice = tr.Thread(target=b.drive)
# alice.start()
# alice.join()
# => We see "Driving..." then "Riding" etc.

Driving...
Riding
Arrived


#### FA-7:
Add some debug prints to show how many times the same thread 
'acquires' the RLock. 
Use rlock._is_owned() or create a small counter for demonstration.

Expected outcome:
- We see something like "Thread X acquires lock the 1st time..." 
  and again "... 2nd time..." if it's reentrant.

In [19]:
## Your solution goes here: ##
## FA-7:
import threading as tr
class Bicycle():
    def __init__(self):
        self.lock = tr.RLock()  #we create the RLock
        self.counter=0
    def drive(self):
        
        with self.lock:         #acquire lock
            self.counter=0
            self.counter+=1
            print(f"Thread {tr.current_thread().name} acquires lock the {self.counter} time...  ")
            # print(self.lock._is_owned()) instead of this we use self.counter=0
            print("Driving...")
            time.sleep(2)
            self.ringBell()     #call to a method that will reuse the lock
            time.sleep(2)
            print("Arrived") #this automatically means that it is == lock release
        
    def ringBell(self):
        with self.lock:        #acquire lock
            self.counter+=1
            print(f"Thread {tr.current_thread().name} acquires lock the {self.counter} time...  ")
            print('Riding')
    def move(self):
        
        with self.lock:        #acquire lock
            self.counter=0
            self.counter+=1
            print("Moved bike to another spot")
            print(f"Thread {tr.current_thread().name} acquires lock the {self.counter} time...  ")


#We have one single bicycle (with one single lock)
b = Bicycle()
#We create a thread for each user of the bicycle:
alice = tr.Thread(target = b.drive)
#Now we start the threads. Alice comes first:
alice.start()
alice.join() #When the thread is over, the lock is released. Now Bob can start.


##############################

# Example usage:
# b = Bicycle()
# or a custom RLock usage scenario
# => Debug prints showing re-entrance

Thread Thread-149 acquires lock the 1 time...  
Driving...
Thread Thread-149 acquires lock the 2 time...  
Riding
Arrived


#### FA-8:
Create two threads that both call 'b.drive()' from the Bicycle example 
while a third thread calls 'b.move()'. 
Show how re-entrancy ensures that each thread eventually acquires the lock 
safely, but no two threads use the bicycle at once.

Expected outcome:
- We see 'Driving...' / 'Riding' / 'Arrived' messages from each drive() call in turn,
  plus 'Moved bike...' from the move() call, 
  with no corruption or partial overlap.


In [20]:
## Your solution goes here: ##
## FA-8:


b = Bicycle()
t1 = tr.Thread(target = b.drive)
t2 = tr.Thread(target = b.drive)
t3 = tr.Thread(target = b.move)
t1.start()
t2.start()
t3.start()
t1.join() 
t2.join() 
t3.join() 
##############################

# Example usage:
# b = Bicycle()
# t1 = tr.Thread(target=b.drive)
# t2 = tr.Thread(target=b.drive)
# t3 = tr.Thread(target=b.move)
# ...
# => Observed correct serialization of usage.

Thread Thread-150 acquires lock the 1 time...  
Driving...
Thread Thread-150 acquires lock the 2 time...  
Riding
Arrived
Thread Thread-151 acquires lock the 1 time...  
Driving...
Thread Thread-151 acquires lock the 2 time...  
Riding
Arrived
Moved bike to another spot
Thread Thread-152 acquires lock the 1 time...  


## Semaphores

Semaphores synchronize your code based on counters. For example, lets say you have a web service that sells tickets. To prevent your site from getting overwhelmed by traffic, you only want 3 people to be able to buy tickets at once. You can parametrize your semaphore with this limit so that it keeps a limitation on the use of the web services, such that only 3 threads or processes can use them at a time.

In [21]:
userLimit = 3
s = tr.Semaphore(userLimit)


ticketID = 0
ticketID_lock = tr.Lock()


def buyTicket():  
    with s: #ask for access to the semaphore
        with ticketID_lock: #a lock for the ticket ID is also needed
            global ticketID
            thisticketID = ticketID #internalize the current ticketID 
                                    #so the global value can be altered without 
                                    #influencing with this transaction
            
            ticketID += 1 #we just increase the global var
            
            print("Processing purchase number " + str(thisticketID) + "\n") #here we want to print what the prev ticket id was before increasing 

        
            print("Ticket " + str(thisticketID) + " bought!\n")
        time.sleep(3) #this is just to simulate more time-consuming functionality

    
buyers = []

for x in range(8):
    buyers += [tr.Thread(target = buyTicket)]
    
for buyer in buyers:
    buyer.start()
    
for buyer in buyers:
    buyer.join()
    



Processing purchase number 0

Ticket 0 bought!

Processing purchase number 1

Ticket 1 bought!

Processing purchase number 2

Ticket 2 bought!

Processing purchase number 3

Ticket 3 bought!

Processing purchase number 4

Ticket 4 bought!

Processing purchase number 5

Ticket 5 bought!

Processing purchase number 6

Ticket 6 bought!

Processing purchase number 7

Ticket 7 bought!



The output will probably show something similar to:

Processing purchase number 0

Processing purchase number 1

Processing purchase number 2


Ticket 0 bought!

Processing purchase number 3

...


Note that we don't start processing purchase number 3 until there is room for it.

### FA-9

Imagine a program that simulates a game on a shared table tennis, to which only 2 players have access at a time. We have a total of 16 players. Implement the program such that it uses a semaphore to restrict the access of the table to the players. You may use a global variable to simulate the shared resource (the table), a function to simulate the use of the table, and you can simulate the players as either threads or processes. 

In [22]:
#### FA-9 ########

userLimit = 2
s = tr.Semaphore(userLimit)

lock = tr.Lock()

def play(playerid):
    with s:
        print(f'Player {playerid} started playing')
        time.sleep(2)
        print(f'Player {playerid} finished playing')
threads=[]
for playerid in range(1,17):
    t=tr.Thread(target=play,args=(playerid,))
    threads.append(t)

for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
##################

Player 1 started playing
Player 2 started playing
Player 2 finished playingPlayer 1 finished playing

Player 3 started playing
Player 4 started playing
Player 3 finished playing
Player 4 finished playing
Player 5 started playing
Player 6 started playing
Player 6 finished playing
Player 7 started playing
Player 5 finished playing
Player 8 started playing
Player 8 finished playingPlayer 7 finished playing

Player 9 started playing
Player 10 started playing
Player 10 finished playingPlayer 9 finished playing
Player 11 started playing

Player 12 started playing
Player 12 finished playingPlayer 11 finished playing

Player 14 started playing
Player 13 started playing
Player 13 finished playing
Player 14 finished playing
Player 16 started playing
Player 15 started playing
Player 16 finished playing
Player 15 finished playing


#### FA-10:
In the table tennis example, 2 players can play at a time. 
Create 16 'players' (threads). 
Only 2 can 'play_table_tennis' concurrently, 
using a threading.Semaphore(2).

Expected outcome:
- We see pairs of players "playing" while others wait.
- The code ends after all 16 have played or tested the table.


In [23]:

## Your solution goes here: ##
## FA-10:


userLimit = 2
s = tr.Semaphore(userLimit)

lock = tr.Lock()
no_of_players = 0

def play(playerid):
    global no_of_players
    with lock:
        if no_of_players == playerid: # we skip the 2nd thread
            return
        
        
        player2id = playerid + 1 #after skipping the 2nd thread, 2nd +1 == 3rd thread that we need
        if player2id > 16: # if my partner id is bigger than 16 then leave the function
            return

        no_of_players += 2

    with s:
        print(f'Player {playerid} started playing')
        print(f'Player {player2id} started playing')
        print(f'Player {playerid} finished playing')
        print(f'Player {player2id} finished playing')
        
threads=[]
for playerid in range(1,17):
    t=tr.Thread(target=play,args=(playerid,))
    threads.append(t)

for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

##############################

# Example usage might produce logs like:
# Player 0 is playing
# Player 1 is playing
# ...

Player 1 started playing
Player 2 started playing
Player 1 finished playing
Player 2 finished playing
Player 3 started playing
Player 4 started playing
Player 3 finished playing
Player 4 finished playing
Player 5 started playing
Player 6 started playing
Player 5 finished playing
Player 6 finished playing
Player 7 started playing
Player 8 started playing
Player 7 finished playing
Player 8 finished playing
Player 9 started playing
Player 10 started playing
Player 9 finished playing
Player 10 finished playing
Player 11 started playing
Player 12 started playing
Player 11 finished playing
Player 12 finished playing
Player 13 started playing
Player 14 started playing
Player 13 finished playing
Player 14 finished playing
Player 15 started playing
Player 16 started playing
Player 15 finished playing
Player 16 finished playing


#### FA-11:
Extend the above scenario so that each player 'plays' for a random short time 
then frees the table. 
Print the order in which players begin and finish, 
to show that it's not strictly round-robin if the times differ.

Expected outcome:
- We see e.g. "Player 0 started", "Player 1 started",
  (some random sleeps), "Player 0 finished", "Player 2 started", etc.


In [24]:
## Your solution goes here: ##
## FA-11:
import random
userLimit = 2
s = tr.Semaphore(userLimit)

lock = tr.Lock()

def play(playerid):
    with s:
        print(f'Player {playerid} started playing')
        time.sleep(random.randint(1,5))
        print(f'Player {playerid} finished playing')
threads=[]
for playerid in range(1,17):
    t=tr.Thread(target=play,args=(playerid,))
    threads.append(t)

for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

##############################


Player 1 started playing
Player 2 started playing
Player 1 finished playing
Player 3 started playing
Player 2 finished playing
Player 3 finished playing
Player 5 started playing
Player 4 started playing
Player 4 finished playing
Player 6 started playing
Player 6 finished playing
Player 7 started playing
Player 5 finished playing
Player 8 started playing
Player 7 finished playing
Player 9 started playing
Player 8 finished playing
Player 10 started playing
Player 9 finished playing
Player 11 started playing
Player 10 finished playing
Player 12 started playing
Player 11 finished playing
Player 13 started playing
Player 13 finished playing
Player 14 started playing
Player 12 finished playing
Player 15 started playing
Player 14 finished playing
Player 16 started playing
Player 15 finished playing
Player 16 finished playing


#### FA-12:
Imagine a library with 3 'reading spots' (Semaphore(3)). 
Create a short class 'Reader' with a method 'read_book()' 
that tries to acquire a spot, reads for 2 seconds, then releases. 
Launch 5 readers and show at most 3 read concurrently.

Expected outcome:
- We see concurrency but never more than 3 reading messages at once.


In [25]:
## Your solution goes here: ##
## FA-12:
import threading
userlimit=3
s=tr.Semaphore(userlimit)
# lock=tr.Lock()
class Reader(threading.Thread):
    def __init__(self,i):
        threading.Thread.__init__(self) #inherit the init of threading.Thread
        self.i=i
    def read_book(self):
        with s:
            print(f'start reading {self.i}') #i means the thread id/thread name
            time.sleep(2)
            print(f'finished reading {self.i}')
    def run(self):  #this will auto run when r.start() has to begin
        self.read_book() #we need to call the upper function
            
readers = [Reader(i) for i in range(5)]
for r in readers: 
    r.start()  #this is a not visible rule - when i call start() here it means that i call the function run(self)
for r in readers:
    r.join()
        
##############################

# Example usage:
# readers = [Reader(i) for i in range(5)]
# for r in readers: r.start()
# ...

start reading 0
start reading 1
start reading 2
finished reading 0finished reading 1
start reading 3
finished reading 2
start reading 4

finished reading 3finished reading 4



## Events

Events can be used when you need multiple threads to wait for one signal. 

Imagine the following example. You are a vault breaker, and in a heated heist you are trying to crack the code on the vault. Because of your experience, you already know what algorithms to use to crack the code. You also found a note containing the base code on which you need to run the algorithm to get the final code to open the vault. However, there is one small problem, the first digit has accidently been torn off. Since you do not want to waste any time, you use Python to both search for the first digit, and at the same time start calculating the code with the values you already have. You have to make sure the code calculators can both run at the same time, and start using the first digit of the base code as soon as it is availible. 

To do this you could write the following code (note that these are dummy algorithms, just for the sake of simulating the problem in a simpler way):

In [26]:
signal = tr.Event() #we create an event

#The digits on the base code note:
first = 0 #still has to be found
second = 5 
third = 3

#Algorithm for finding the first digit of the base code (the lost one)
def findFirst():
    global first #when we find a value, we can always change the 1st number
    signal.clear() #initialize the signal
    tofind = random.randint(0,10)
    i = 0
    while i != tofind:
        time.sleep(1)
        print("Searching...",i)
        i+=1
    print("Found hidden digit:",i)
    first += i
    signal.set() #signal the other algorithms when the first digit is found
    return


#These algorithms are used for calculating the access code
def calcTenfold():
    time.sleep(2)
    sec = second*10
    print("Second digit of first part found:",sec)
    time.sleep(4)
    tri = third*10
    print("Third digit of first part found:",tri)
    print("Waiting for first digit to complete first part...")
    signal.wait() #wait for a signal to make sure that the first digit has been found before continuing
    fir = first*10
    print("First digit of second part found:",fir)
    print("The first part of the code is: [{} {} {}]".format(fir,sec,tri) )
    
def calcSecondPower():
    time.sleep(3)
    sec = second**2
    print("Second digit of second part found:",sec)
    time.sleep(1)
    tri = third**2
    print("Third digit of second part found:",tri)
    print("Waiting for first digit to complete second part...")
    signal.wait() #wait for the first digit to be found before continuing
    fir = first**2
    print("First digit of second part found:",fir)
    print("The second part of the code is: [{} {} {}]".format(fir,sec,tri) )
    

thread1 = tr.Thread(target = findFirst)
thread2 = tr.Thread(target = calcTenfold)
thread3 = tr.Thread(target = calcSecondPower)
thread1.start()
thread2.start()
thread3.start()

thread1.join()
thread2.join()
thread3.join()

Searching... 0
Second digit of first part found:Searching... 1
 50
Second digit of second part found: 25
Searching... 2
Third digit of second part found: 9
Waiting for first digit to complete second part...
Searching... 3
Searching... 4
Third digit of first part found: 30
Waiting for first digit to complete first part...
Searching... 5
Found hidden digit: 6
First digit of second part found: 60
The first part of the code is: [60 50 30]
First digit of second part found: 36
The second part of the code is: [36 25 9]


### FA-13: 

The following code implements a task and several subtasks. The subtasks need to wait for the main task to finish some computation before proceeding, but this is not happening with the current implementaiton. Add the necessary code to implement this functionality using an Event object.

In [27]:
#### FA-13 ###################################################################

#Add your code where you see fit

def subtask(i):
    print("Thread {} started...".format(i))
    #... the subtask can do anything here, and then wait for a signal to continue
    print("Waiting for the signal...")
    signal.wait()
    print("Signal received!")
    #... the thread can now use the shared resource
    print("Thread {} continues...".format(i))
    print("Thread {} was completed.".format(i))

def task(nsubtasks=3):

    signal.clear()
    
    pool=[]
    
    for i in range(nsubtasks):
        pool.append(tr.Thread(target=subtask, args=(i, )))

    for i in range(nsubtasks):
        pool[i].start()

    print('The main task is running now...')
    time.sleep(3) 
    print('The main task is finished. Sending a signal for subtasks to proceed...')
    signal.set()
    
    
    for i in range(nsubtasks):
        pool[i].join()

#main program
task(nsubtasks=4)

#########

Thread 0 started...
Waiting for the signal...
Thread 1 started...
Waiting for the signal...
Thread 2 started...
Waiting for the signal...
Thread 3 started...
Waiting for the signal...
The main task is running now...
The main task is finished. Sending a signal for subtasks to proceed...
Signal received!
Thread 3 continues...
Thread 3 was completed.
Signal received!
Thread 2 continues...
Thread 2 was completed.
Signal received!
Thread 1 continues...
Thread 1 was completed.
Signal received!
Thread 0 continues...
Thread 0 was completed.


#### FA-14:
Create an event-based mechanism for a 'startup' signal:
- One thread is 'setup_thread', which does some setup, then sets an event.
- 3 other threads are 'worker_thread's that each wait() on that event 
  before performing their tasks.

Expected outcome:
- The workers do not start their main tasks until the 'setup_thread' has finished 
  and set the event.


In [28]:
## Your solution goes here: ##
## FA-14:
signal = tr.Event()

def worker_thread(name): #3 waiting threads
    print(f"{name} is waiting for the setup to complete")
    signal.wait()
    print(f"{name} now can start working :)")
    
def setup_thread():
    print('Setup thread is starting setup')
    for i in range(5):
        time.sleep(1)
        print('Stil Starting......')
    print('Setup Complete!!!!!!')
    signal.set()


t1=tr.Thread(target=worker_thread,args=("A",))
t2=tr.Thread(target=worker_thread,args=("B",))
t3=tr.Thread(target=worker_thread,args=("C",))
t4=tr.Thread(target=setup_thread)

t1.start()
t2.start()
t3.start()
t4.start()

t1.join()
t2.join()
t3.join()
t4.join()
##############################

A is waiting for the setup to complete
B is waiting for the setup to complete
C is waiting for the setup to complete
Setup thread is starting setup
Stil Starting......
Stil Starting......
Stil Starting......
Stil Starting......
Stil Starting......
Setup Complete!!!!!!
C now can start working :)
A now can start working :)
B now can start working :)


#### FA-15:
Use an Event to implement a "pause/resume" mechanism:
- A 'worker' thread continuously increments a counter or prints something in a loop.
- Another thread can 'pause' the worker by clearing the event, 
  and 'resume' by setting the event again.

Hint: inside the worker loop, do event.wait() each iteration.

Expected outcome:
- We can see the worker's prints pausing when the event is cleared, 
  and resuming when it's set again.

In [29]:
## Your solution goes here: ##
## FA-15:

signal = tr.Event()

def worker_thread(name): #3 waiting threads
    counter=0
    print(f"{name} is working....")
    for j in range(4):
        signal.wait() 
        print(f'the worker {name} started, counter {counter}')
        counter+=1
        time.sleep(2)
    
def setup_thread():
    time.sleep(2) #3rd
    print('pausing the worker')
    signal.clear()
    time.sleep(2)
    print('resuming the worker')
    signal.set()
    time.sleep(2)

t1=tr.Thread(target=worker_thread,args=("A",))
t4=tr.Thread(target=setup_thread)
t1.start()
t4.start()
t1.join()
t4.join()
##############################

A is working....
pausing the worker
resuming the worker
the worker A started, counter 0
the worker A started, counter 1
the worker A started, counter 2
the worker A started, counter 3


#### FA-16:
Create a small class 'EventController' that has:
   - an Event
   - methods 'wait_for_event()' and 'trigger_event()'.
Create 2 or 3 threads that call 'wait_for_event()', 
while the main thread calls 'trigger_event()' after some delay.
Demonstrate that all waiting threads proceed once triggered.

Expected outcome:
- Multiple threads remain blocked until main triggers the event.


In [30]:
## Your solution goes here: ##
## FA-16:
class EventController: 
    def __init__(self):
        signal = tr.Event()
    def wait_for_event(self,id):
        print(f'{id} is waiting')
        signal.wait()
        print('received. start proceeding...')
    def trigger_event(self):
        signal.clear()
        print('Main thread is triggering the event in 3 seconds....')
        time.sleep(3)
        print("Now Triggered the Event. The Other thread can proceed!!")
        signal.set()
        
    # def run(self) if the class was a thread itself, we need this function and below on the t1 we dont say tr.Thread but t1=EventController()
        # self.wait_for_event()
    #when i have below t1.start() it will automatically call the run func


#everything below is the main thread    
EC=EventController() # when we need a func from the class and the class is not a thread itself, we need this line
t1=tr.Thread(target=EC.wait_for_event,args=(1,)) #and then writing which func from the class we need exactly
t2=tr.Thread(target=EC.wait_for_event,args=(2,))
t1.start()
t2.start()
time.sleep(2)
EC.trigger_event() #we put this here because : while the main thread calls 'trigger_event()'
t1.join()
t2.join()


##############################

1 is waiting
received. start proceeding...
2 is waiting
received. start proceeding...
Main thread is triggering the event in 3 seconds....
Now Triggered the Event. The Other thread can proceed!!


---

## Deadlock
#### When threads or processes are dependent on other threads or processes (or themselves!) to finish, which causes them to wait on each other forever.

Look at the code below. Both t1 and t2 need a fork and a knife to eat. t1 grabs a fork first, while t2 grabs a knife first. Next, they wait forever on each other to finish eating so they can have the other piece of cutlery they need to eat.

In [1]:
#WARNING: Since this example recreates a deadlock, 
#you will have to interrupt the execution
#Otherwise this code will keep consuming resources forever.
#(If the notebook does not recover, restart the kernel.
# You can do that from within the notebook: Kernel -> Restart )


fork = tr.Lock()
knife = tr.Lock()

def eating():
    with knife:
        time.sleep(3)
    with fork:
        print("Much Munch Munch")
            
def eating2():
    with fork:
        time.sleep(3)
        with knife:
            print("Chomp Chomp Chomp")
            
            
t1 = tr.Thread(target = eating)
t2 = tr.Thread(target = eating2)

t1.start()
t2.start()

t1.join()
t2.join()

Much Munch Munch
Chomp Chomp Chomp


### Solutions for Deadlock

To prevent deadlocks, try to prevent having two mutex locks open in the same thread at the same time. Furthermore you can order the locks so they have to be acquired in the same order each time (lock y can never be acquired before lock x, so if lock x is acquired it is certain releasing lock y does not depend on acquiring lock x).

### FA-17

Solve the deadlock in the example above.

In [32]:
### FA-17 #####
import threading as tr
import time
fork = tr.Lock()
knife = tr.Lock()

def eating():
    with fork:
        time.sleep(3)
        with knife: #making this line one tab inside + changing the outside lock to fork and this inner lock to knife in order to stop the deadlock
            print("Much Munch Munch")
            
def eating2():
    with fork:
        time.sleep(3)
        with knife:
            print("Chomp Chomp Chomp")
            
            
t1 = tr.Thread(target = eating)
t2 = tr.Thread(target = eating2)

t1.start()
t2.start()

t1.join()
t2.join()
##############

Much Munch Munch
Chomp Chomp Chomp


#### FA-18:
In the 'fork'/'knife' example, demonstrate a simple fix by acquiring 
the locks in a consistent order. 
You can e.g. define a function 'dine_ordered' that always locks 'fork' first, 
then 'knife', and do the same for the second thread. 
Show that we avoid the deadlock.

Expected outcome:
- Both threads eventually print their "Munch Munch" or "Chomp Chomp," 
  no infinite blocking.

In [33]:
## Your solution goes here: ##
## FA-18:
import threading as tr
import time

fork = tr.Lock()
knife = tr.Lock()

def dine_ordered(message):
    with fork:
        time.sleep(1)
        with knife:
            print(message)

# Two threads using the same locking order
t1 = tr.Thread(target=dine_ordered, args=("Much Munch Munch",))
t2 = tr.Thread(target=dine_ordered, args=("Chomp Chomp Chomp",))

t1.start()
t2.start()

t1.join()
t2.join()
##############################

Much Munch Munch
Chomp Chomp Chomp


#### FA-19:
Create a small scenario with 3 locks that might lead to a cycle 
(e.g., 3 threads each lock one resource and wait for the next). 
Show that the code deadlocks if we acquire locks in an inconsistent order, 
and fix it by enforcing a strict lock order.

Expected outcome:
- Code initially deadlocks if we don't plan the order,
  but works fine if we always lock in the same order (e.g. lock1 -> lock2 -> lock3).

In [34]:
## Your solution goes here: ##
## FA-19:
import threading as tr
import time

# Define locks
lock1 = tr.Lock()
lock2 = tr.Lock()
lock3 = tr.Lock()

def f1():
    with lock1:
        time.sleep(1)
        with lock2:
            print("Thread 1: open lock1 and lock2")

def f2():
    with lock1:
        time.sleep(1)
        with lock2:
            time.sleep(1)
            with lock3:
                print("Thread 2: open lock1, lock2 and lock3")

def f3():
    with lock1:
        time.sleep(1)
        with lock3:
            print("Thread 3: open lock1 and lock3")

# Start threads
t1 = tr.Thread(target=f1)
t2 = tr.Thread(target=f2)
t3 = tr.Thread(target=f3)

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()
##############################

Thread 1: open lock1 and lock2
Thread 2: open lock1, lock2 and lock3
Thread 3: open lock1 and lock3


#### FA-20:
Implement a small class 'DiningPhilosophers' with 2 locks (fork, knife) 
and 2 methods 'eat_phil1()' and 'eat_phil2()'. 
Use any strategy to avoid deadlock (e.g., ordering or a single lock). 
Show that both philosophers eventually get to eat.

Expected outcome:
- We see log messages from both 'eat_phil1' and 'eat_phil2' finishing 
  without indefinite blocking.


In [35]:
## Your solution goes here: ##
## FA-20:
import threading as tr
import time
class DiningPhilosophers:
    def __init__(self):
        self.fork=tr.Lock()
        self.knife=tr.Lock()
    def eat_phil1(self):
        with self.fork:
            time.sleep(2)
            with self.knife:
                print('ph1 eats now')
    def eat_phil2(self):
        with self.fork:
            time.sleep(2)
            with self.knife:
                print('ph2 eats now')
DP=DiningPhilosophers()
t1=tr.Thread(target=DP.eat_phil1)
t2=tr.Thread(target=DP.eat_phil2)
t1.start()
t2.start()
t1.join()
t2.join()
##############################

ph1 eats now
ph2 eats now


---

# References

*  Adapted from: Martin Atzmueller (2019). Materials for Advanced Programming for CSAI: AP11 - Parallelism.ipynb.