# Thread

Python threading allows you to have different parts of your program run concurrently and can simplify your design.

In [1]:
import logging # To create logs
import threading
import time 

In [9]:
# Thread function
def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

In [3]:
# Main thread, all lines of code specified under its scope 
# will be considered to be executed in main thread.

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
    logging.info("Main : before starting a thread")
    
    # Creating instance of thread
    x = threading.Thread(target=thread_function, args=(1,))
    logging.info("Main : before running the thread")
    
    # Start the thread
    x.start()
    logging.info("Main : wait for the thread to finish")
    
    x.join()
    logging.info("Main : All operations are done")

15:23:47: Main : before starting a thread
15:23:47: Main : before running the thread
15:23:47: Thread 1: starting
15:23:47: Main : wait for the thread to finish
15:23:49: Thread 1: finishing
15:23:49: Main : All operations are done


In [4]:
# Multiple Threading

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
        
    threads = list()
    for i in range(5):
        logging.info("Main : before starting a thread %d",i)
        x = threading.Thread(target=thread_function, args=(i,))
        threads.append(x)
        
        #Start the thread 
        x.start()
        logging.info("Main : before running the thread")
    
    print(threads)
    for index,thread in enumerate(threads):
        logging.info("Main thread : before joining the thread %d ",index)
        thread.join()
        logging.info("Main : thread %d done",index)      
        

15:23:49: Main : before starting a thread 0
15:23:49: Thread 0: starting
15:23:49: Main : before running the thread
15:23:49: Main : before starting a thread 1
15:23:49: Thread 1: starting
15:23:49: Main : before running the thread
15:23:49: Main : before starting a thread 2
15:23:49: Thread 2: starting
15:23:49: Main : before running the thread
15:23:49: Main : before starting a thread 3
15:23:49: Thread 3: starting
15:23:49: Main : before running the thread
15:23:49: Main : before starting a thread 4
15:23:49: Thread 4: starting
15:23:49: Main : before running the thread
15:23:49: Main thread : before joining the thread 0 


[<Thread(Thread-7, started 11276)>, <Thread(Thread-8, started 14008)>, <Thread(Thread-9, started 32480)>, <Thread(Thread-10, started 27924)>, <Thread(Thread-11, started 25324)>]


15:23:51: Thread 1: finishing
15:23:51: Thread 0: finishing
15:23:51: Thread 2: finishing
15:23:51: Main : thread 0 done
15:23:51: Main thread : before joining the thread 1 
15:23:51: Main : thread 1 done
15:23:51: Main thread : before joining the thread 2 
15:23:51: Main : thread 2 done
15:23:51: Main thread : before joining the thread 3 
15:23:51: Thread 4: finishing
15:23:51: Thread 3: finishing
15:23:51: Main : thread 3 done
15:23:51: Main thread : before joining the thread 4 
15:23:51: Main : thread 4 done


### Daemon Threads
In computer science, a daemon is a process that runs in the background.

Python threading has a more specific meaning for daemon. A daemon thread will shut down immediately when the program exits. One way to think about these definitions is to consider the daemon thread a thread that runs in the background without worrying about shutting it down.

If a program is running Threads that are not daemons, then the program will wait for those threads to complete before it terminates. Threads that are daemons, however, are just killed wherever they are when the program is exiting.

In [5]:
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
    logging.info("Main : before starting a thread")
    
    # Creating instance of thread
    x = threading.Thread(target=thread_function, args=(1,), daemon=True)
    logging.info("Main : before running the thread")
    
    # Start the thread
    x.start()
    logging.info("Main : wait for the thread to finish")
    
    x.join()
    logging.info("Main : All operations are done")

15:23:51: Main : before starting a thread
15:23:51: Main : before running the thread
15:23:51: Thread 1: starting
15:23:51: Main : wait for the thread to finish
15:23:53: Thread 1: finishing
15:23:53: Main : All operations are done


### ThreadPoolExecutor
Group Threads together. consider as a box where you add threads to. this will manage all your threads. 

In [10]:
import concurrent.futures

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
       
    with concurrent.futures.ThreadPoolExecutor(max_workers = 3) as executor:
        executor.map(thread_function,range(3))


17:08:52: Thread 0: starting
17:08:52: Thread 1: starting
17:08:52: Thread 2: starting
17:08:54: Thread 0: finishing
17:08:54: Thread 2: finishing
17:08:54: Thread 1: finishing


In [7]:
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
       
    with concurrent.futures.ThreadPoolExecutor(max_workers = 3) as executor:
        executor.map(thread_function,range(5))

15:23:55: Thread 0: starting
15:23:55: Thread 1: starting
15:23:55: Thread 2: starting
15:23:57: Thread 1: finishing
15:23:57: Thread 0: finishing
15:23:57: Thread 3: starting
15:23:57: Thread 4: starting
15:23:57: Thread 2: finishing
15:23:59: Thread 3: finishing
15:23:59: Thread 4: finishing


### Lock

mutex = mutual exclusion. We allow only one thread to read modify write code at a time

lock is like key to open door 
basic functions: aquire() & .release() 

In [12]:
class XDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()
        
    def locked_update(self, tname):
        logging.info("Thread %s: starting update", tname)
        
        with self._lock:
            logging.info("Thread %s: starting update", tname)
            temp = self.value
            temp += 1
            time.sleep(0.5)
            self.value = temp
            logging.info("Thread %s: finishing update", tname)


In [11]:
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
    
    database = XDatabase()
    logging.info("Starting to update the %d value in the database.", database.value)

    with concurrent.futures.ThreadPoolExecutor(max_workers = 3) as executor:
        for tname in range(3):
            executor.submit(database.locked_update,tname)
            
    logging.info("Updated value in the database is %d.", database.value)


15:24:58: Starting to update the 0 value in the database.
15:24:58: Thread 0: starting update
15:24:58: Thread 0: starting update
15:24:58: Thread 1: starting update
15:24:58: Thread 2: starting update
15:24:59: Thread 0: finishing update
15:24:59: Thread 1: starting update
15:24:59: Thread 1: finishing update
15:24:59: Thread 2: starting update
15:25:00: Thread 2: finishing update
15:25:00: Updated value in the database is 3.


In [None]:
x1  = threading.Lock()
print("before first acquire")
x1.acquire()
print("before second acquire")
# x1.acquire() # --> 
print("before release")
x1.release() 

before first acquire
before second acquire


In [2]:
x1  = threading.Lock()
print("before first acquire")
x1.acquire()
print("before release")
x1.release() 
print("before second acquire")
x1.acquire()

before first acquire
before release
before second acquire


True

    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()
        
        
    def get_message(self,name):
        self.consumer_lock.acquire()
        message = self.message
        self.producer_lock.release()
        return message 
    
    
    def set_message(self,message,name):
        self.producer_lock.acquire()
        self.message = message 
        self.consumer_lock.release()

In [16]:
pip install queue

Note: you may need to restart the kernel to use updated packages.


ERROR: Could not find a version that satisfies the requirement queue (from versions: none)
ERROR: No matching distribution found for queue


In [14]:
import queue.Queue
class Pipeline(queue.Queue):
    
    def __init__(self):
        super().__init__(max)
    
    def get_message(self,name):        
        return self.get() 
    
    
    def set_message(self,message,name):
        self.put(message)
    

ModuleNotFoundError: No module named 'queue.Queue'; 'queue' is not a package

In [12]:
import random 

FLAG = object()

def producer(pipeline):
    for i in range(10):
        message = random.randInt(1,10)
        pipeline.set_message(message,"producer")
    
    pipeline.set_message(FLAG,"producer")
    

def consumer(pipeline):
    message =0
    while message is not FLAG:
        pipeline.get_message("consumer")
        if message is not FLAG:
            logging.info("Consumer is consuming/storing message %s.", message)
    
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
    
    pipeline = Pipeline()
    event = threading.Event()

    with concurrent.futures.ThreadPoolExecutor(max_workers = 3) as executor:
        for tname in range(3):
            executor.submit(producer,pipeline,event)
            executor.submit(consumer,pipeline,event)
            

