## Threading in Python

1) A thread is a separate flow of execution.
2) In most Python 3 implementations (including CPython), the different threads do not actually execute at the same time: they merely appear to.
3) This is due to the Global Interpreter Lock (GIL), which essentially allows only one Python thread to run at a time.
4) Tasks that spend much of their time waiting for external events are generally good candidates for threading.
5) Getting multiple tasks running simultaneously requires one of the following:
    a) using a non-standard implementation of Python.
    b) writing some of your code in a different language.
    c) using multiprocessing, which comes with some extra overhead.
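
As a rough illustration of point 4, the sketch below uses time.sleep() as a stand-in for waiting on an external event. The helper name fake_io and the 0.2 s delay are assumptions made up for this example, not part of any library.

```python
import threading
import time

def fake_io(delay):
    # time.sleep() releases the GIL, much like a blocking network or disk call
    time.sleep(delay)

start = time.perf_counter()
threads = [threading.Thread(target=fake_io, args=(0.2,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# The four waits overlap, so the total should be well under 4 * 0.2 s
print(f"4 waits of 0.2 s took {elapsed:.2f} s")
```

CPU-bound work would not overlap in this way, because only one thread can run Python code at a time.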

In [2]:
import logging
import threading
import time

In [None]:

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    logging.info("Main    : before creating thread")
    x = threading.Thread(target=thread_function, args=(1,))#,daemon=False)
    logging.info("Main    : before running thread")
    x.start()
    logging.info("Main    : wait for the thread to finish")
    #x.join()
    logging.info("Main    : all done")

### Daemon Thread

1) A daemon thread runs in the background, and the program does not need to shut it down explicitly.
2) If a program is running threads that are not daemons, the program waits for those threads to complete before it terminates.
3) Daemon threads, however, are simply killed wherever they are when the program exits.
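
One way to observe this is to run a small script in a separate interpreter, so that the exit of the program can be seen from outside. This is only a sketch: the names child and background are made up for the example, and the 5 second sleep is an arbitrary choice.

```python
import subprocess
import sys
import textwrap

# Child script: a daemon thread that would print after 5 s,
# and a main thread that finishes immediately.
child = textwrap.dedent("""
    import threading, time

    def background():
        time.sleep(5)
        print("daemon finished")

    threading.Thread(target=background, daemon=True).start()
    print("main done")
""")

result = subprocess.run(
    [sys.executable, "-c", child],
    capture_output=True, text=True, timeout=30,
)
# The child exits without waiting for the daemon thread,
# so the daemon's message is never printed.
print(result.stdout)
```

Changing daemon=True to daemon=False in the child script should make it wait the full 5 seconds and print both messages.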

### join() a Thread

To tell one thread to wait for another thread to finish, you call .join().

In [None]:
Rerun the example above with the x.join() line uncommented. The main thread then waits for the thread to finish before logging "all done".
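
The effect of .join() can also be seen in a smaller sketch. The names worker and results are assumptions for this example only.

```python
import threading
import time

results = []

def worker():
    time.sleep(0.2)
    results.append("worker finished")

t = threading.Thread(target=worker)
t.start()
before_join = list(results)   # most likely still empty: the worker is sleeping
t.join()                      # block until worker() has returned
after_join = list(results)

print(before_join, after_join)
```

After .join() returns, the thread is guaranteed to have finished, so its result is available.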

### Multiple Threads

In [None]:

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    
    threads = list()
    for index in range(200):
        logging.info("Main    : before creating thread %d",index)
        x = threading.Thread(target=thread_function, args=(index,))#,daemon=False)
        threads.append(x)
        x.start()
        
    for index, thread in enumerate(threads):
        logging.info("Main    : before joining thread %d.", index)
        thread.join()
        logging.info("Main    : thread %d done", index)    

### ThreadPoolExecutor

1) ThreadPoolExecutor is an easier way to start up a group of threads than the one shown above.
2) The order in which threads are run is determined by the operating system.
3) It may (and likely will) vary from run to run.
4) The end of the with block causes the ThreadPoolExecutor to do a .join() on each of the threads in the pool.
5) It is strongly recommended that you use ThreadPoolExecutor as a context manager when you can, so that you never forget to .join() the threads.
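
Although the threads may run in any order, .map() hands back the results in the order of its inputs. A minimal sketch, assuming a made-up function slow_square whose delays are chosen so that later inputs finish first:

```python
import concurrent.futures
import time

def slow_square(n):
    # Larger inputs sleep less, so they tend to finish before smaller ones
    time.sleep(0.05 * (3 - n))
    return n * n

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    squares = list(executor.map(slow_square, range(3)))

# map() yields results in input order, regardless of which thread finished first
print(squares)  # [0, 1, 4]
```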

In [3]:
import concurrent.futures

In [None]:
def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(5))

In [None]:
print(type(map(thread_function, range(3))))

Note: Using a ThreadPoolExecutor can cause some confusing errors.

For example, if you call a function that takes no parameters but pass it parameters in .map(), the call raises an exception inside the worker thread.

Unfortunately, ThreadPoolExecutor does not show that exception unless you read the results returned by .map(), so (in the case above) the program terminates with no output. This can be quite confusing to debug at first.
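
The sketch below reproduces this situation. The function no_args is an assumption for the example; the point is that the exception only surfaces once the results of .map() are consumed.

```python
import concurrent.futures

def no_args():
    return "ok"

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Each call becomes no_args(0), no_args(1): a TypeError inside the worker.
    # Nothing is raised here because the results are never consumed.
    results = executor.map(no_args, range(2))

# Iterating over the results re-raises the stored exception in the caller.
try:
    list(results)
    error = None
except TypeError as exc:
    error = exc

print(type(error).__name__)  # TypeError
```

When debugging a silent ThreadPoolExecutor, wrapping the call in list(...) is a quick way to make hidden exceptions visible.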

### Race condition

Race conditions can occur when two or more threads access a shared piece of data or resource.

In [None]:
class Database:
    
    def __init__(self):
        self.value = 0
        
    def changeValue(self,name):        
        temp_val = self.value
        logging.info("Thread %s: starting update", name)
        #print('Value before update %d', self.value)
        temp_val = temp_val + 1
        time.sleep(1)
        self.value = temp_val
        logging.info("Thread %s: finishing update", name)
        #print('Value after update %d', self.value)

In [None]:
db = Database()
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
logging.info("Testing update. Starting value is %d.", db.value)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(db.changeValue, range(2))
logging.info("Testing update. Ending value is %d.", db.value)

### Solve Race Condition

#### Synchronization using lock

1) Only one thread at a time can have the Lock.
2) Any other thread that wants the Lock must wait until the owner of the Lock gives it up.
3) The basic functions to do this are .acquire() and .release().
4) If one thread gets the lock but never gives it back, your program will be stuck.
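
A minimal sketch of .acquire() and .release() on a shared counter. The names counter, counter_lock and increment are assumptions for this example. Putting .release() in a finally block is one way to make sure the lock is given back even if the protected code raises.

```python
import threading

counter = 0
counter_lock = threading.Lock()

def increment(times):
    global counter
    for _ in range(times):
        counter_lock.acquire()
        try:
            counter += 1            # the read-modify-write is protected by the lock
        finally:
            counter_lock.release()  # always runs, so the lock is never left held

threads = [threading.Thread(target=increment, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 40000
```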


In [None]:
class NewDatabase:
    
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()
        
        
    def changeValue(self,name):
        self.lock.acquire()  # blocks until no other thread holds the lock
        logging.info("Thread %s has lock", name)
        temp_val = self.value
        logging.info("Thread %s: starting update", name)
        #print('Value before update %d', self.value)
        temp_val = temp_val + 1
        time.sleep(3)
        self.value = temp_val
        logging.info("Thread %s about to release lock", name)
        self.lock.release()
        logging.info("Thread %s: finishing update", name)
        #print('Value after update %d', self.value)

In [None]:
logging.getLogger().setLevel(logging.DEBUG)
db = NewDatabase()
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
logging.debug("Testing update. Starting value is %d.", db.value)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(db.changeValue, range(2))    
logging.debug("Testing update. Ending value is %d.", db.value)

In [None]:
help(threading.Lock())

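The same class can use the lock as a context manager, so that the lock is released automatically at the end of the with block. The attribute name used in the with statement must match the one set in __init__ (self.lock).

In [None]:
class NewDatabase:
    
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()
        
        
    def changeValue(self,name):
        with self.lock: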
            logging.info("Thread %s has lock", name)
            temp_val = self.value
            logging.info("Thread %s: starting update", name)            
            temp_val = temp_val + 1
            time.sleep(1)
            self.value = temp_val
            logging.info("Thread %s about to release lock", name)        
        logging.info("Thread %s: finishing update", name)
        

#### Deadlock while using Lock and its resolution using RLock (Re-entrant lock)

Suppose a thread acquires a Lock and, before releasing it, tries to acquire the same Lock again. The second acquire() blocks forever, which results in a deadlock.
To avoid this situation we use RLock.
With an RLock, the same thread can acquire the lock any number of times, as long as it calls release() once for each acquire().
Other threads must wait until the owning thread has released the lock completely.
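
The difference can be checked without actually hanging the program by using a non-blocking acquire. This is a small sketch; the variable names are made up for the example.

```python
import threading

plain = threading.Lock()
plain.acquire()
# A second blocking acquire() from the same thread would hang forever,
# so a non-blocking attempt is used here to show that it fails.
second_plain = plain.acquire(blocking=False)
plain.release()

reentrant = threading.RLock()
reentrant.acquire()
second_reentrant = reentrant.acquire(blocking=False)  # same thread: succeeds
reentrant.release()   # one release() per acquire()
reentrant.release()

print(second_plain, second_reentrant)  # False True
```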

##### Usecases of RLock

###### Recursion

In [None]:
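lock = threading.RLock()

def a(depth):
    # Each recursive call re-acquires the lock already held by this thread
    with lock:
        if depth > 0:
            a(depth - 1)  # somewhere inside; a plain Lock would deadlock here

a(3)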

###### You want to have thread-safe access from outside the class and use the same methods from inside the class:

In [None]:
class SomeClass:
    def __init__(self):
        self.a = 1
        self.b = 2
        self.lock = threading.RLock()

    def changeA(self):
        with self.lock:
            self.a = self.a + 1

    def changeB(self):
        with self.lock:
            self.b = self.b + self.a

    def changeAandB(self):
        # changeA and changeB can be reused here in a thread-safe way
        with self.lock:
            self.changeA() # a plain Lock would block here
            self.changeB()

In [None]:
obj = SomeClass()
obj.changeA()

In [None]:
print(obj.a)
print(obj.b)

In [None]:
obj.changeB()

In [None]:
print(obj.a)
print(obj.b)

In [None]:
obj.changeAandB()

In [None]:
print(obj.a)
print(obj.b)

#### Semaphore

In [None]:
A semaphore is another synchronization technique.
It allows a limited number of threads to hold a resource at the same time.
It has an internal counter (_value in CPython) that is decremented whenever acquire() is called and incremented whenever
release() is called.
When the counter reaches zero, no other thread can enter until some thread calls release().
With a plain Semaphore, release() can be called any number of times, even more often than acquire().
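
As a rough sketch of this behaviour, the code below lets six threads compete for a semaphore that admits two at a time and records the highest number of threads seen inside together. The names use_resource, active and max_active are assumptions for the example.

```python
import threading
import time

semaphore = threading.Semaphore(2)   # at most two threads inside at once
state_lock = threading.Lock()        # protects the two counters below
active = 0
max_active = 0

def use_resource():
    global active, max_active
    with semaphore:                  # acquire() on entry, release() on exit
        with state_lock:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.05)             # pretend to use the resource
        with state_lock:
            active -= 1

threads = [threading.Thread(target=use_resource) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(max_active)
```

The printed value never exceeds the limit passed to Semaphore(). Note that the counters need their own Lock: a semaphore with a limit above one does not protect shared data from race conditions.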


In [None]:
class SemaphoreDatabase:
    
    def __init__(self):
        self.value = 0
        self.semaphore = threading.Semaphore(2)
        
    def changeValue(self,name):
        self.semaphore.acquire()
        logging.info("Thread %s has lock", name)
        temp_val = self.value
        logging.info("Thread %s: starting update", name)
        #print('Value before update %d', self.value)
        temp_val = temp_val + 1
        time.sleep(3)
        self.value = temp_val
        logging.info("Thread %s about to release lock", name)        
        self.semaphore.release()
        #self.semaphore.release()
        logging.info("Thread %s: finishing update", name)
        #print('Value after update %d', self.value)

In [None]:
logging.getLogger().setLevel(logging.DEBUG)
db = SemaphoreDatabase()
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
logging.debug("Testing update. Starting value is %d.", db.value)
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(db.changeValue, range(5))    
logging.debug("Testing update. Ending value is %d.", db.value)

In [None]:
help(threading.Semaphore())

#### Bounded Semaphore

A BoundedSemaphore works like a Semaphore, except that release() cannot be called more times than acquire().
If release() is called too many times, so that the counter would exceed its initial value, a ValueError is raised.
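
A short sketch comparing the two classes side by side. The variable names are made up for the example.

```python
import threading

plain = threading.Semaphore(1)
plain.release()                  # allowed: the counter silently grows to 2

bounded = threading.BoundedSemaphore(1)
bounded.acquire()
bounded.release()                # fine: matches the acquire() above
try:
    bounded.release()            # one release() too many
    over_release_error = None
except ValueError as exc:
    over_release_error = exc

print(type(over_release_error).__name__)  # ValueError
```

The error from BoundedSemaphore usually points to a bug, such as a code path that releases without having acquired.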

In [25]:
class BundedSemaphoreDatabase:
    
    def __init__(self):
        self.value = 0
        #self.boundedSemaphore = threading.BoundedSemaphore(2)
        
    def changeValue(self,name):        
        temp_val = self.value
        logging.info("Thread %s: starting update", name)
        #print('Value before update %d', self.value)
        temp_val = temp_val + 1
        time.sleep(1)
        self.value = temp_val        
        logging.info("Thread %s: finishing update", name)
        #print('Value after update %d', self.value)

In [29]:
logging.getLogger().setLevel(logging.DEBUG)
db = BundedSemaphoreDatabase()
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
logging.debug("Testing update. Starting value is %d.", db.value)
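# A plain Semaphore is used here, so the third release() below succeeds.
# With threading.BoundedSemaphore(2) it would raise ValueError instead.
bounded_semaphor = threading.Semaphore(2)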

t1 = threading.Thread(target=db.changeValue,args=('1',))
t2 = threading.Thread(target=db.changeValue,args=('2',))
t3 = threading.Thread(target=db.changeValue,args=('3',))
t4 = threading.Thread(target=db.changeValue,args=('4',))
t5 = threading.Thread(target=db.changeValue,args=('5',))

bounded_semaphor.acquire()
bounded_semaphor.acquire()
t1.start()
t2.start()
t3.start()
bounded_semaphor.release()
bounded_semaphor.release()
bounded_semaphor.release()
t4.start()
t5.start()
logging.debug("Testing update. Ending value is %d.", db.value)

17:25:52: Testing update. Starting value is 0.
17:25:52: Thread 1: starting update
17:25:52: Thread 2: starting update
17:25:52: Thread 3: starting update
17:25:52: Thread 4: starting update
17:25:52: Testing update. Ending value is 0.
17:25:52: Thread 5: starting update


In [None]:
help(threading.BoundedSemaphore())

#### Barrier

#### Timer

A threading.Timer is a way to schedule a function to be called after a certain amount of time has passed.
t = threading.Timer(30.0, my_function)
You start the Timer by calling .start(). The function will be called on a new thread at some point after the specified time, but be aware that there is no promise that it will be called exactly at the time you want.
If you want to stop a Timer that you’ve already started, you can cancel it by calling .cancel().
Calling .cancel() after the Timer has triggered does nothing and does not produce an exception.
A Timer can be used to prompt a user for action after a specific amount of time. If the user does the action before the Timer expires, .cancel() can be called.
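
A minimal sketch of .start() and .cancel(), using a short 0.1 second interval so that it finishes quickly. The function remind and the list fired are assumptions for this example.

```python
import threading

fired = []

def remind(label):
    fired.append(label)

kept = threading.Timer(0.1, remind, args=("kept",))
cancelled = threading.Timer(0.1, remind, args=("cancelled",))
kept.start()
cancelled.start()
cancelled.cancel()     # stop this timer before it fires

kept.join()            # Timer is a Thread subclass, so join() waits for it
cancelled.join()
print(fired)  # ['kept']
```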

In [None]:
help(threading.Timer)

#### Events

#### Conditions