### `Thread Synchronisation` ensures that two or more concurrent threads running simultaneously do not execute the segment (critical section ) of the code where the shrared memory is accessed.
### There are `synchronization primitives` for synchronising threads in `threading` module
* `Locks`

* `Rlocks`

* `Semaphores`

* `Events`

* `Conditions`

https://www.laurentluce.com/posts/python-threads-synchronization-locks-rlocks-semaphores-conditions-events-and-queues/

In [4]:
import threading
import time

In [56]:
amount = 0

In [57]:
def deposit(dep_amount):
    
    global amount
    
    for i in range(dep_amount):
        amount += 1

In [58]:
def two_deposit_threads(dep_amount): 

    t1 = threading.Thread(target=deposit, 
                          args=(dep_amount, )) 
    t2 = threading.Thread(target=deposit, 
                          args=(dep_amount, )) 
  
    t1.start() 
    t2.start() 
  
    t1.join() 
    t2.join()  

In [59]:
two_deposit_threads(10)

print("Balance amount = {0}".format(amount))

Balance amount = 20


<B> `output`: The value of the amount is incremented two times of the given range as we have created two threads and each thread is iterating over the mentioned range separately that is 10, so the output is 20 <B>

### Incrementing the value of amount 1000 times inside the loop

In [62]:
amount = 0
two_deposit_threads(1000)

print("Balance amount = {0}".format(amount)) 

Balance amount = 2000


### Incrementing the value of amount 100000 times inside the loop

In [24]:
amount = 0

two_deposit_threads(100000)

print("Balance amount = {0}".format(amount))

Balance amount = 133588


<B>`output`: After running it for the first time, it may give the correct output, if we run it again and again than we can notice that the output values are coming different each time, it is because we have created two threads and both of them are accessing the same data, initially when the range is less the iterations were also less but as the range increased to 1000000 then the for each thread number of iterations increased and which led to synchronisation problem, this kind of synchronisation problem can be called `race condition`.<B>

`Note`: When two or more concurrently running threads can access the shared data resource at the same time and they try to change it, as a result, the values of variables may be unpredictable and vary depending on the timings of context switches of the processes. This condition is called `race condition`

## 1) `Lock` 
* Race condition can be solved using Lock
* Using <B> Lock </B> class objectTo block the threads to access the shared data at the same time. 
* It has two methods <B> acquire </B>  and <B> release </B>. `acquire` method locks the shared data for another thread to access it, only after unlocking by the `release` method, shared data can be accessed

https://docs.python.org/3/library/threading.html#lock-objects

In [25]:
def deposit(dep_amount, dep_lock):
    
    global amount
    
    for i in range(dep_amount):
        
        dep_lock.acquire()
        
        amount += 1
        
        dep_lock.release()

In [26]:
def two_deposit_threads(dep_amount): 
    
    lock = threading.Lock()

    t1 = threading.Thread(target=deposit, 
                          args=(dep_amount, lock)) 
    t2 = threading.Thread(target=deposit, 
                          args=(dep_amount, lock)) 
  
    t1.start() 
    t2.start() 
  
    t1.join() 
    t2.join() 

In [30]:
amount = 0

two_deposit_threads(100000)

print("Balance amount = {0}".format(amount))

Balance amount = 200000


#### `output`: The iteration range is now same as before but even after running it for multiple times we get the expected result as output

### If `release()` is called in the unlocked state, a `RunTimeError` is raised.

In [31]:
def deposit(dep_amount, dep_lock):
    
    global amount
    
    for i in range(dep_amount):
        
        amount += 1
        
        dep_lock.release()

In [32]:
amount = 0

two_deposit_threads(100000)

print("Balance amount = {0}".format(amount))

Balance amount = 2


Exception in thread Thread-45:
Traceback (most recent call last):
  File "/Users/kishan/anaconda3/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/kishan/anaconda3/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-31-7a52e8aa2413>", line 9, in deposit
    dep_lock.release()
RuntimeError: release unlocked lock
Exception in thread Thread-44:
Traceback (most recent call last):
  File "/Users/kishan/anaconda3/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/kishan/anaconda3/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-31-7a52e8aa2413>", line 9, in deposit
    dep_lock.release()
RuntimeError: release unlocked lock




### `NOTE`: Deadlock Condition -  It is the point where one or more threads are competing for the resouces and some of the threads go to "waiting/blocked" mode infinitely

## 2) `RLock (re-entrant lock)` 
* The standard Lock does not know which thread is currently holding the lock. If the lock is held, any thread that attempts to acquire it will block, even if the same thread itself is already holding the lock. In such cases, RLock is used
* The main difference between Lock and RLock is that when you try to **acquire** the lock which is **already acquired by the same thread** it doesn't block the thread unlike the Lock(). 

https://docs.python.org/3/library/threading.html#threading.RLock

#### Trying to acquire the lock that is already held
The second attempt to acquire the lock will go into an infinite wait. You will need to stop the kernel to break out of the wait

In [35]:
lock = threading.Lock()

print ('First try :', lock.acquire())
print ('Second try:', lock.acquire())

First try : True


KeyboardInterrupt: 

#### Set a timeout for the lock acquisition

In [38]:
lock = threading.Lock()

print ('First try :', lock.acquire())
print ('Second try:', lock.acquire(timeout=3))

First try : True
Second try: False


#### `output`: The Lock object is able to aquire only at the first not from the second time onwards

In [9]:
lock = threading.RLock()

print ('First try :', lock.acquire())
print ('Second try:', lock.acquire())
print ('Third try:', lock.acquire())

First try : True
Second try: True
Third try: True


## Deadlock simulation using locks

#### Conditions for a deadlock:
* Mutual Exclusion: One or more than one resource are non-sharable (Only one process can use at a time)
* Hold and Wait: A process is holding at least one resource and waiting for resources.
* No Preemption: A resource cannot be taken from a process unless the process releases the resource.
* Circular Wait: A set of processes are waiting for each other in circular form.

In [13]:
data_one = 3
data_two = 5

#### This process can result in a deadlock
Conditions for a deadlock
* Mutual Exclusion: One or more than one resource are non-sharable (Only one process can use at a time)
* Hold and Wait: A process is holding at least one resource and waiting for resources.
* No Preemption: A resource cannot be taken from a process unless the process releases the resource.
* Circular Wait: A set of processes are waiting for each other in circular form.

In [14]:
def my_process(lock_one, lock_two):
    
    global data_one
    global data_two
    
    lock_one.acquire()
    print(threading.current_thread().name, " incrementing data_one")
    data_one += 1
    time.sleep(1)
    
    lock_two.acquire()
    print(threading.current_thread().name, " incrementing data_two")
    data_two +=1
    time.sleep(1)
    
    lock_one.release()
    lock_two.release()
    

In [15]:
lock_one = threading.Lock()
lock_two = threading.Lock()

#### A deadlock will result here
t1 acquires lock_one and t2 acquires lock two first. t1 then waits for t2 to release lock_two and t2 waits for t1 to release lock_one before releasing the locks which they have acquired. 

You will need to interrupt the kernel to get out of the deadlock.

In [16]:
t1 = threading.Thread(target=my_process, 
                      args=(lock_one, lock_two)) 
t2 = threading.Thread(target=my_process, 
                      args=(lock_two, lock_one)) 
  
t1.start() 
t2.start() 

t1.join()
t2.join()

Thread-10Thread-11   incrementing data_one incrementing data_one


KeyboardInterrupt: 

In [17]:
data_one = 3
data_two = 5

lock_one = threading.Lock()
lock_two = threading.Lock()

#### This should prevent the deadlock
t1 will now acquire lock_one and t2 will wait for that to be released first.

In [18]:
t1 = threading.Thread(target=my_process, 
                      args=(lock_one, lock_two)) 
t2 = threading.Thread(target=my_process, 
                      args=(lock_one, lock_two)) 
  
t1.start() 
t2.start() 

t1.join()
t2.join()

Thread-12  incrementing data_one
Thread-12  incrementing data_two
Thread-13  incrementing data_one
Thread-13  incrementing data_two


In [19]:
print("data_one:", data_one)
print("data_two:", data_two)

data_one: 5
data_two: 7


#### Preventing the deadlock
Get rid of the Hold and Wait i.e. where the process holds one lock while waiting for the other to be released. This is a more robust solution than hoping for the threads to request locks in a specific sequence

In [20]:
def my_process(lock_one, lock_two):
    
    global data_one
    global data_two
    
    lock_one.acquire()
    print(threading.current_thread().name, " incrementing data_one")
    data_one += 1
    time.sleep(1)
    
    lock_one.release()
    
    lock_two.acquire()
    print(threading.current_thread().name, " incrementing data_two")
    data_two +=1
    time.sleep(1)
    
    lock_two.release()
    

In [21]:
data_one = 3
data_two = 5

lock_one = threading.Lock()
lock_two = threading.Lock()

In [22]:
t1 = threading.Thread(target=my_process, 
                      args=(lock_one, lock_two)) 
t2 = threading.Thread(target=my_process, 
                      args=(lock_two, lock_one)) 
  
t1.start() 
t2.start() 

t1.join()
t2.join()

Thread-14Thread-15   incrementing data_one incrementing data_one

Thread-14 Thread-15 incrementing data_two 
 incrementing data_two


In [23]:
print("data_one:", data_one)
print("data_two:", data_two)

data_one: 5
data_two: 7


#### `output`: Unlike the Lock object, RLock object is able to aquire every time it is invoked

## 3) `Semaphores` 
* This is one of the oldest synchronization primitives in the history of computer science, invented by the early Dutch computer scientist Edsger W. Dijkstra
* Semaphores are typically used for limiting a resource
* Semaphores are simply advanced counters 
* An acquire() call to a semaphore will block only after a number of threads have acquire()ed it. 
* The associated counter decreases per acquire() call, and increases per release() call.

https://docs.python.org/3/library/threading.html#semaphore-objects

#### Initialize a semaphore
The default internal variable = 1. This represents the maximum number of threads which can acquire the semaphore at any time

In [102]:
semaphore = threading.Semaphore() 

In [103]:
def my_func():
    
    semaphore.acquire()
    
    time.sleep(0.1)
    print(threading.current_thread().name,  " acquired the semaphore.")
    print("Semaphore value after acquire:", semaphore._value)
    
    time.sleep(5)
    
    semaphore.release()
    
    print("Semaphore value after release:", semaphore._value)

In [107]:
t1 = threading.Thread(target=my_func)
t2 = threading.Thread(target=my_func)

print("Initial semaphore value:", semaphore._value)

Initial semaphore value: 1


In [None]:
start_time = time.time()

t1.start()
t2.start()

t1.join()
t2.join()

end_time = time.time()

In [108]:
print("Total time:", end_time-start_time)

Thread-100  acquired the semaphore.
Semaphore value after acquire: 0
Semaphore value after release: 1
Thread-101  acquired the semaphore.
Semaphore value after acquire: 0
Semaphore value after release: 1
Total time: 10.212400197982788


In [114]:
semaphore = threading.Semaphore(value=3) 

In [115]:
t1 = threading.Thread(target=my_func)
t2 = threading.Thread(target=my_func)
t3 = threading.Thread(target=my_func)
t4 = threading.Thread(target=my_func)
t5 = threading.Thread(target=my_func)
t6 = threading.Thread(target=my_func)
t7 = threading.Thread(target=my_func)
t8 = threading.Thread(target=my_func)

In [None]:
start_time = time.time()

t1.start()
t2.start()
t3.start()
t4.start()
t5.start()
t6.start()
t7.start()
t8.start()

t1.join()
t2.join()
t3.join()
t4.join()
t5.join()
t6.join()
t7.join()
t8.join()

end_time = time.time()

In [116]:
print("Total time:", end_time-start_time)

Thread-112Thread-110 Thread-111  acquired the semaphore. 
 acquired the semaphore. acquired the semaphore.Semaphore value after acquire:

 Semaphore value after acquire:Semaphore value after acquire:0  
00

Semaphore value after release:Semaphore value after release:  12Semaphore value after release:

 1
Thread-114 Thread-113 acquired the semaphore. Thread-115
 acquired the semaphore. Semaphore value after acquire:
 acquired the semaphore. Semaphore value after acquire:
0 Semaphore value after acquire:
0 0

Semaphore value after release: 1
Semaphore value after release:Semaphore value after release: 1 
1
Thread-116  acquired the semaphore.Thread-117 
 acquired the semaphore.Semaphore value after acquire:
 Semaphore value after acquire:1 
1
Semaphore value after release: Semaphore value after release:2 
3
Total time: 15.34638500213623


In [117]:
semaphore = threading.Semaphore(-1) 

ValueError: semaphore initial value must be >= 0

### While creating a semaphore object `threading.Semaphore()` input <b> internal variable </b> can not be less than zero

### Using a Semaphore to synchronize threads
We can use the following programe as a ordering software in a resturant, where the ordering and serving is automoted
* The internal variable is set to 0
* By the `resturant()` function the lock been released which increased the variable to 1 so that `customer()` function `semaphore.acquire()` can be invoked 

In [9]:
semaphore = threading.Semaphore(0)

In [10]:
order_num = 0

In [14]:
def place_order():
    print ("Order placed.")
    semaphore.acquire()
    print ("Customer order number is:", order_num)

In [15]:
def prepare_order(): 
    
    global order_num
    time.sleep(3)
    order_num += 1
    
    print ("Preparing order number", order_num)
    semaphore.release()

In [16]:
for i in range (0, 6):
    
    t1 = threading.Thread(target=place_order)
    t2 = threading.Thread(target=prepare_order)
    
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
print ("Program terminated")

Order placed.
Customer order number is: 6
Preparing order number 7
Order placed.
Customer order number is: 7
Preparing order number 8
Order placed.
Customer order number is: 8
Preparing order number 9
Order placed.
Customer order number is: 9
Preparing order number 10
Order placed.
Customer order number is: 10
Preparing order number 11Order placed.

Customer order number is: 11
Preparing order numberProgram terminated 
12


###  A Semaphore provides a non-bounded counter which allows you to call release() any number of times for incrementing

In [17]:
semaphore = threading.Semaphore()

semaphore._value

1

#### The main thread acquires the semaphore

In [18]:
semaphore.acquire()

semaphore._value

0

In [19]:
semaphore.release()

semaphore._value

1

In [20]:
semaphore.release()

semaphore._value

2

In [21]:
semaphore.release()
semaphore.release()
semaphore.release()

semaphore._value

5

### `BoundedSemaphore` provides a bounded counter, which raises an error if a release() call tries to increase the counter beyond its maximum size.

https://docs.python.org/3/library/threading.html#threading.BoundedSemaphore

In [22]:
semaphore = threading.BoundedSemaphore(1)

semaphore._value

1

In [23]:
semaphore.acquire()

semaphore._value

0

In [24]:
semaphore.release()

semaphore._value

1

In [25]:
semaphore.release()

semaphore._value

ValueError: Semaphore released too many times

## 4) `Events` 
* This is one of the simplest mechanisms for communication between threads: one thread signals an event and other threads wait for it.
* An event object manages an internal flag that can be set to true with the <b> set() </b> method and reset to false with the <b> clear() </b> method. 
* The <b> wait() </b> method blocks until the flag is true.

https://docs.python.org/3/library/threading.html#event-objects

### Creating a event object

In [16]:
event = threading.Event()

### Checking out methods available for events

In [17]:
dir(event)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_cond',
 '_flag',
 '_reset_internal_locks',
 'clear',
 'isSet',
 'is_set',
 'set',
 'wait']

In [18]:
event.set()
print(event.is_set())

event.wait()
print(event.is_set())

event.clear()
print(event.is_set())

True
True
False


### Run the following function, as the thread is blocked after `event.wait()` we'll not get any output and our shell operation doesn't responds, go to the kernel tab > interrupt 

In [19]:
event.set()
print(event.is_set())

event.clear()
print(event.is_set())

event.wait()
print(event.is_set())

True
False


KeyboardInterrupt: 

### To understand the implementation a example is shown below

In [5]:
meeting = threading.Event()

In [6]:
def hold_meeting():
    
    meeting.set()
    print('Event is set. The meeting has begun')
    
    time.sleep(6)
    
    print('The meeting is complete. Clearing the event...')
    meeting.clear()

In [7]:
def enter_conference_room():
    
    time.sleep(1)
    meeting.wait()
    
    while meeting.is_set():
        
        print("Waiting for the meeting to end")
        time.sleep(0.5)
        
    print("The meeting is done. Entering the conference room...")

In [8]:
t1 = threading.Thread(target = hold_meeting)
t2 = threading.Thread(target = enter_conference_room)

In [9]:
t1.start()
t2.start()

t1.join()
t2.join()

Event is set. The meeting has begun
Waiting for the meeting to end
Waiting for the meeting to end
Waiting for the meeting to end
Waiting for the meeting to end
Waiting for the meeting to end
Waiting for the meeting to end
Waiting for the meeting to end
Waiting for the meeting to end
Waiting for the meeting to end
Waiting for the meeting to end
The meeting is complete. Clearing the event...
The meeting is done. Entering the conference room...


## 5) `Conditions` 
* A Condition object is simply a more advanced version of the Event object.

* It too acts as a communicator between threads and can be used to notify() other threads about a change in the state of the program

https://docs.python.org/3/library/threading.html#condition-objects

#### A good way to illustrate this mechanism is by looking at a news producer and 3 consumers. The producer appends random news text to a list at random time and the consumer retrieves those integers from the list. 

In [10]:
import random

In [11]:
condition = threading.Condition()

In [12]:
container = []

counter = 1

more_to_come = True

In [13]:
def produce():
    
    global container
    global counter
    global more_to_come
    
    for i in range(5):
        
        time.sleep(random.randrange(2, 5))
        condition.acquire()
        
        item = "News item #" + str(counter) 
        
        container.append(item)
        counter +=1
        
        print("\nProduced:", item)
        condition.notify_all()
        
        condition.release()
        
    more_to_come = False
  

In [14]:
def consume():
        
    global more_to_come  
    
    while(more_to_come):
        
        condition.acquire()
        condition.wait()

        time.sleep(random.random())
        print(threading.current_thread().getName(), " acquired: ", container[-1])

        condition.release()

In [15]:
producer_thread = threading.Thread(target=produce)

consumer_one_thread = threading.Thread(target=consume, 
                                       name="News Site One",)

consumer_two_thread = threading.Thread(target=consume, 
                                       name="News Site Two")

consumer_three_thread = threading.Thread(target=consume,
                                         name="News Site Three")

In [16]:
threads = [producer_thread,
           consumer_one_thread, 
           consumer_two_thread, 
           consumer_three_thread, 
           ]

for t in threads:
    t.start()
    
for t in threads:
    t.join()
    
time.sleep(1)
print("\nAll done")


Produced: News item #1


  print(threading.current_thread().getName(), " acquired: ", container[-1])


News Site One  acquired:  News item #1
News Site Three  acquired:  News item #1
News Site Two  acquired:  News item #1

Produced: News item #2
News Site Three  acquired:  News item #2
News Site One  acquired:  News item #2
News Site Two  acquired:  News item #2

Produced: News item #3
News Site One  acquired:  News item #3
News Site Two  acquired:  News item #3
News Site Three  acquired:  News item #3

Produced: News item #4
News Site One  acquired:  News item #4
News Site Three  acquired:  News item #4
News Site Two  acquired:  News item #4

Produced: News item #5
News Site One  acquired:  News item #5
News Site Three  acquired:  News item #5
News Site Two  acquired:  News item #5

All done
