<a href="https://colab.research.google.com/github/Prathapnagaraj/training2020/blob/master/Multiprocessing_and_Multithreading.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Parallel / Concurrent Programming in Python

In parallel programming, we divide a problem into smaller units of work that can run at the same time, for example on separate CPU cores.

In concurrent programming, tasks make progress cooperatively by taking turns.
Suppose five processes are running on a 2-core CPU. At most two of them can run at any instant. If one process is waiting for a user response, it is paused in the meantime and another process is scheduled in its place. In this way all of the processes are completed cooperatively.

Parallel and concurrent programming can increase the number of tasks your program completes in a given time, which reduces the overall processing time.

Python provides many sophisticated tools for managing concurrent operations using processes and threads.

Example: suppose I want to make sure that a list of websites is always reachable from my private network. If any site is not, I need to notify the admin which sites are unreachable.

In [1]:
import time
import logging
import requests
 
 
class WebsiteDownException(Exception):
    pass
 
 
def ping_website(address, timeout=20):
    """
    Check if a website is down. A website is considered down 
    if either the status_code >= 400 or if the timeout expires
     
    Throw a WebsiteDownException if any of the website down conditions are met
    """
    try:
        response = requests.head(address, timeout=timeout)
        if response.status_code >= 400:
            logging.warning("Website %s returned status_code=%s" % (address, response.status_code))
            raise WebsiteDownException()
    except requests.exceptions.RequestException:
        logging.warning("Timeout expired for website %s" % address)
        raise WebsiteDownException()
         
 
def notify_owner(address):
    """ 
    Send the owner of the address a notification that their website is down 
     
    For now, we're just going to sleep for 0.5 seconds but this is where 
    you would send an email, push notification or text-message
    """
    logging.info("Notifying the owner of %s website" % address)
    time.sleep(0.5)
     
 
def check_website(address):
    """
    Utility function: check if a website is down, if so, notify the user
    """
    try:
        ping_website(address)
    except WebsiteDownException:
        notify_owner(address)

WEBSITE_LIST = [
    'http://envato.com',
    'http://amazon.co.uk',
    'http://amazon.com',
    'http://facebook.com',
    'http://google.com',
    'http://google.fr',
    'http://google.es',
    'http://google.co.uk',
    'http://internet.org',
    'http://gmail.com',
    'http://stackoverflow.com',
    'http://github.com',
    'http://heroku.com',
    'http://really-cool-available-domain.com',
    'http://djangoproject.com',
    'http://rubyonrails.org',
    'http://basecamp.com',
    'http://trello.com',
    'http://yiiframework.com',
    'http://shopify.com',
    'http://another-really-interesting-domain.co',
    'http://airbnb.com',
    'http://instagram.com',
    'http://snapchat.com',
    'http://youtube.com',
    'http://baidu.com',
    'http://yahoo.com',
    'http://live.com',
    'http://linkedin.com',
    'http://yandex.ru',
    'http://netflix.com',
    'http://wordpress.com',
    'http://bing.com',
]


Serial Approach

In [2]:
import time
 
 
start_time = time.time()
 
for address in WEBSITE_LIST:
    check_website(address)
         
end_time = time.time()        
 
print("Time for SerialSquirrel: %ssecs" % (end_time - start_time))
 



Time for SerialSquirrel: 5.440550327301025secs


The Parallel Approach

In [5]:
import time
import socket
import multiprocessing
 
NUM_WORKERS = 2
 
start_time = time.time()
 
with multiprocessing.Pool(processes=NUM_WORKERS) as pool:
    results = pool.map_async(check_website, WEBSITE_LIST)
    results.wait()
 
end_time = time.time()        
 
print("Time for MultiProcessingSquirrel: %ssecs" % (end_time - start_time))
 
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
# WARNING:root:Website http://bing.com returned status_code=405



Time for MultiProcessingSquirrel: 3.2245585918426514secs


Concurrent Approach

In [6]:
import time
from queue import Queue
from threading import Thread
 
NUM_WORKERS = 2
task_queue = Queue()
 
def worker():
    # Constantly check the queue for addresses
    while True:
        address = task_queue.get()
        check_website(address)
         
        # Mark the processed task as done
        task_queue.task_done()
 
start_time = time.time()
         
# Create the worker threads as daemons so that their endless loops
# do not keep the program alive once all tasks are done
threads = [Thread(target=worker, daemon=True) for _ in range(NUM_WORKERS)]
 
# Add the websites to the task queue
for item in WEBSITE_LIST:
    task_queue.put(item)
 
# Start all the workers
for thread in threads:
    thread.start()
 
# Wait for all the tasks in the queue to be processed
task_queue.join()
 
         
end_time = time.time()        
 
print("Time for ThreadedSquirrel: %ssecs" % (end_time - start_time))



Time for ThreadedSquirrel: 2.3150153160095215secs


# Python Multiprocessing


Parallel processing is getting more attention nowadays. As CPU manufacturers add more and more cores to their processors, writing parallel code is a great way to improve performance. **multiprocessing** is a Python module that provides facilities for developing parallel code.

Let's look at a simple example that uses a single process.

In [None]:
import time

def basic_func(x):
    if x == 0:
        return 'zero'
    elif x%2 == 0:
        return 'even'
    else:
        return 'odd'
    
starttime = time.time()
for i in range(0,10):
    y = i*i
    time.sleep(2)
    print('{} squared results in a/an {} number'.format(i, basic_func(y)))
    
print('That took {} seconds'.format(time.time() - starttime))

0 squared results in a/an zero number
1 squared results in a/an odd number
2 squared results in a/an even number
3 squared results in a/an odd number
4 squared results in a/an even number
5 squared results in a/an odd number
6 squared results in a/an even number
7 squared results in a/an odd number
8 squared results in a/an even number
9 squared results in a/an odd number
That took 20.023037433624268 seconds


The same example with multiprocessing:

In [None]:
import time
import multiprocessing 

def basic_func(x):
    if x == 0:
        return 'zero'
    elif x%2 == 0:
        return 'even'
    else:
        return 'odd'

def multiprocessing_func(x):
    y = x*x
    time.sleep(2)
    print('{} squared results in a/an {} number'.format(x, basic_func(y)))
    
if __name__ == '__main__':
    starttime = time.time()
    processes = []
    for i in range(0,10):
        p = multiprocessing.Process(target=multiprocessing_func, args=(i,))
        processes.append(p)
        p.start()
        
    for process in processes:
        process.join()
        
    print('That took {} seconds'.format(time.time() - starttime))

0 squared results in a/an zero number
1 squared results in a/an odd number
2 squared results in a/an even number
3 squared results in a/an odd number
4 squared results in a/an even number
5 squared results in a/an odd number
6 squared results in a/an even number
7 squared results in a/an odd number
9 squared results in a/an odd number
8 squared results in a/an even number
That took 2.069441318511963 seconds


## Multiprocessing basics

The Python multiprocessing module offers many classes for building a parallel program. Three of the most basic are Process, Queue, and Lock.

But before describing those, let's see how many cores your system has.

In [None]:
import multiprocessing

print("Number of cpu : ", multiprocessing.cpu_count())

Number of cpu :  2


The Process class of the multiprocessing module is an abstraction that sets up another Python process, gives it code to run, and gives the parent application a way to control its execution.
Two important methods of the Process class are start() and join().

First, we write a function that will be run by the process. Then we instantiate a Process object with that function as its target.

Creating a Process object does nothing until we tell it to start via start(). The child process then runs the target function. The return value of the target is discarded, so results have to be passed back some other way, such as a Queue or shared memory. Calling join() makes the parent wait until the child has exited.

Without a join() call, the child still runs and exits on its own, but the parent does not wait for it, and on Unix a finished child can linger as a zombie until it is joined or otherwise reaped.
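
The start()/join() lifecycle can be sketched as follows. This is a minimal illustration rather than part of the original notebook: short_task and run_lifecycle are hypothetical names, and the 0.2-second sleep merely stands in for real work.

```python
import multiprocessing
import time


def short_task():
    """Stand-in for real work: sleep briefly, then return."""
    time.sleep(0.2)


def run_lifecycle():
    """Report whether the child is alive before start(), after start(), and after join()."""
    p = multiprocessing.Process(target=short_task)
    before = p.is_alive()   # the process object exists but nothing runs yet
    p.start()               # the child process begins executing short_task
    during = p.is_alive()   # the child is still sleeping at this point
    p.join()                # block the parent until the child has exited
    after = p.is_alive()
    return before, during, after, p.exitcode


if __name__ == '__main__':
    print(run_lifecycle())
```

On a typical run this prints (False, True, False, 0). An exit code of 0 means the child returned normally.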

In [None]:
from multiprocessing import Process


def worker():
    """worker function"""
    print('Worker')


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Process(target=worker)
        jobs.append(p)
        p.start()

Worker
Worker
Worker
Worker
Worker


It is usually more useful to spawn a process with arguments that tell it what work to do.

In [None]:
from multiprocessing import Process


def print_func(continent='Asia'):
    print('The name of continent is : ', continent)

if __name__ == "__main__":  # confirms that the code is under main function
    names = ['America', 'Europe', 'Africa']
    procs = []
    proc = Process(target=print_func)  # instantiating without any argument
    procs.append(proc)
    proc.start()

    # instantiating process with arguments
    for name in names:
        # print(name)
        proc = Process(target=print_func, args=(name,))
        procs.append(proc)
        proc.start()

    # complete the processes
    for proc in procs:
        proc.join()

The name of continent is :  Asia
The name of continent is :  America
The name of continent is :  Europe
The name of continent is :  Africa


#### Problem 1. Create 5 processes to find the cubes of numbers in parallel using multiprocessing


### Determining the Current Process

Naming processes is useful for keeping track of them, especially in applications with multiple types of processes running simultaneously

In [None]:
import multiprocessing
import time

def worker():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(3)
    print(name, 'Exiting')

def my_service():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(2)
    print(name, 'Exiting')

if __name__ == '__main__':
    service = multiprocessing.Process( name='my_service', target=my_service, )
    worker_1 = multiprocessing.Process( name='worker 1', target=worker, )
    worker_2 = multiprocessing.Process(target=worker,) # default name

    worker_1.start()
    worker_2.start()
    service.start()

    worker_1.join()
    worker_2.join()
    service.join()

worker 1 Starting
Process-59 Starting
my_service Starting
my_service Exiting
worker 1 Exiting
Process-59 Exiting


###### Example: create 2 processes that perform addition and subtraction in parallel.

### Daemon Processes

By default, the main program will not exit until all of the children have exited. There are times when starting a background process that runs without blocking the main program from exiting is useful.
To mark a process as a daemon, set its daemon attribute to True. The default is for processes to not be daemons.

In [None]:
import multiprocessing  # needed for multiprocessing.current_process()
from multiprocessing import Process
import time
import sys

def daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    time.sleep(2)
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()

def non_daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    time.sleep(3)
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()

if __name__ == '__main__':
    d = Process(name='daemon', target=daemon,)
    d.daemon = True

    n = Process(name='non-daemon',target=non_daemon,)
    n.daemon = False
    n.start()
    time.sleep(1)
    d.start()
    for i in range(5):
      print(i)



Starting: non-daemon 939
0
1
2
3
4
Starting: daemon 942


### Waiting for Processes

To wait until a process has completed its work and exited, use the join() method.

In [None]:
import multiprocessing
from multiprocessing import Process
import time
import sys


def daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    time.sleep(2)
    print('Exiting :', name)


def non_daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    print('Exiting :', name)


if __name__ == '__main__':
    d = Process(name='daemon', target=daemon,)
    d.daemon = True

    n = Process(name='non-daemon',target=non_daemon,)
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()

  
    n.join()
    d.join()

Starting: daemon
Starting: non-daemon
Exiting : non-daemon
Exiting : daemon


### Terminating Processes

Calling terminate() on a process object kills the child process

In [None]:
import multiprocessing
import time


def slow_worker():
    print('Starting worker')
    time.sleep(0.1)
    print('Finished worker')


if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print('BEFORE:', p, p.is_alive())

    p.start()
    print('DURING:', p, p.is_alive())

    p.terminate()
    print('TERMINATED:', p, p.is_alive())

    p.join()
    print('JOINED:', p, p.is_alive())

BEFORE: <Process(Process-55, initial)> False
DURING: <Process(Process-55, started)> True
TERMINATED: <Process(Process-55, started)> True
JOINED: <Process(Process-55, stopped[SIGTERM])> False


### Subclassing Process

Although the simplest way to start a job in a separate process is to use Process and pass a target function, it is also possible to use a custom subclass.

In [None]:
import multiprocessing


class Worker(multiprocessing.Process):

    def run(self):
        print('In {}'.format(self.name))
        return


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()

In Worker-64
In Worker-65
In Worker-67
In Worker-66
In Worker-68


## Process communication

A common use pattern for multiple processes is to divide a job up among several workers to run in parallel. Effective use of multiple processes usually requires some communication between them, so that work can be divided and results can be aggregated. Two simple ways for processes to communicate with multiprocessing are sharing data and passing messages back and forth.

### Shared memory

The multiprocessing module provides Array and Value objects for sharing data between processes:

*   Array: a ctypes array allocated from shared memory.
*   Value: a ctypes object allocated from shared memory.



Given below is a simple example showing use of Array and Value for sharing data between processes.


In [None]:
import multiprocessing 
  
def square_list(mylist, result, square_sum): 
    """ 
    function to square a given list 
    """
    print("p1 got %s from main process"%(mylist))
    # append squares of mylist to result array 
    for idx, num in enumerate(mylist): 
        result[idx] = num * num 
  
    # square_sum value 
    square_sum.value = sum(result) 
  
    # print result Array 
    print("Result(in process p1): {}".format(result[:])) 
  
    # print square_sum Value 
    print("Sum of squares(in process p1): {}".format(square_sum.value)) 
  
if __name__ == "__main__": 
    # input list 
    mylist = [1,2,3,4] 
  
    # creating Array of int data type with space for 4 integers 
    result = multiprocessing.Array('i', 4) 
  
    # creating Value of int data type 
    square_sum = multiprocessing.Value('i') 
  
    # creating new process 
    p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum)) 
  
    # starting process 
    p1.start() 
  
    # wait until process is finished 
    p1.join() 
  
    # print result array 
    print("Result(in main program): {}".format(result[:])) 
  
    # print square_sum Value 
    print("Sum of squares(in main program): {}".format(square_sum.value)) 

p1 got [1, 2, 3, 4] from main process
Result(in process p1): [1, 4, 9, 16]
Sum of squares(in process p1): 30
Result(in main program): [1, 4, 9, 16]
Sum of squares(in main program): 30


### Queue

A simple way to communicate between processes with multiprocessing is to use a Queue to pass messages back and forth. Any picklable Python object can pass through a Queue.

In [None]:
import multiprocessing 
  
def square_list(mylist, q): 
    """ 
    function to square a given list 
    """
    # append squares of mylist to queue 
    for num in mylist:
        print("putting ", num,"To q" )       
        q.put(num * num) 
  
def print_queue(q): 
    """ 
    function to print queue elements 
    """
    print("Queue elements:") 
    while not q.empty(): 
        print("reading from q" )
        print(q.get()) 
    print("Queue is now empty!") 
  
if __name__ == "__main__": 
    # input list 
    mylist = [1,2,3,4] 
  
    # creating multiprocessing Queue 
    q = multiprocessing.Queue() 
  
    # creating new processes 
    p1 = multiprocessing.Process(target=square_list, args=(mylist, q)) 
    p2 = multiprocessing.Process(target=print_queue, args=(q,)) 
  
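    # start both processes: p1 squares the list while p2 reads the queue.
    # Note that q.empty() is not reliable across processes, so p2 may
    # stop early if it finds the queue empty before p1 has finished.
    p1.start()
    p2.start() 
  
    # wait until both processes are finished 
    p1.join() 
    p2.join() 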

putting  1 To q
Queue elements:
putting  2 To q
reading from q
1
reading from q
putting  3 To q
putting  4 To q
4
reading from q
9
reading from q
16
Queue is now empty!


## Signaling between Processes

The Event class provides a simple way to communicate state information between processes. An event can be toggled between set and unset states. Users of the event object can wait for it to change from unset to set, using an optional timeout value

In [None]:
import multiprocessing
import time

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print('wait_for_event: starting')
    e.wait()
    print('wait_for_event: e.is_set()->', e.is_set())

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print('wait_for_event_timeout: starting')
    e.wait(t)
    print('wait_for_event_timeout: e.is_set()->', e.is_set())

if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,),)
    w1.start()

    w2 = multiprocessing.Process(name='nonblock',target=wait_for_event_timeout, args=(e, 2),)
    w2.start()

    print('main: waiting before calling Event.set()')
    time.sleep(3)
    e.set()
    print('main: event is set')

wait_for_event: starting
wait_for_event_timeout: starting
main: waiting before calling Event.set()
wait_for_event_timeout: e.is_set()-> False
wait_for_event: e.is_set()-> True
main: event is set


## Controlling Access to Resources

In situations when a single resource needs to be shared between multiple processes, a Lock can be used to avoid conflicting accesses

In [None]:
import multiprocessing
import sys

def worker_with(lock, stream):
    with lock:
        stream.write('Lock acquired via with\n')

def worker_no_with(lock, stream):
    lock.acquire()
    try:
        stream.write('Lock acquired directly\n')
    finally:
        lock.release()

lock = multiprocessing.Lock()
w = multiprocessing.Process(target=worker_with, args=(lock, sys.stdout),)
nw = multiprocessing.Process(target=worker_no_with, args=(lock, sys.stdout))

w.start()
nw.start()

w.join()
nw.join()

Lock acquired via with
Lock acquired directly


## Synchronizing Operations

Process synchronization is defined as a mechanism which ensures that two or more concurrent processes do not simultaneously execute some particular program segment known as critical section

Consider the program below to understand the concept of race condition:

In [None]:
import multiprocessing 
  
# function to withdraw from account 
def withdraw(balance):     
    for _ in range(10000): 
        balance.value = balance.value - 1   
  
# function to deposit to account 
def deposit(balance):     
    for _ in range(10000): 
        balance.value = balance.value + 1  
  
def perform_transactions(): 
  
    # initial balance (in shared memory) 
    balance = multiprocessing.Value('i', 100) 
  
    # creating new processes 
    p1 = multiprocessing.Process(target=withdraw, args=(balance,)) 
    p2 = multiprocessing.Process(target=deposit, args=(balance,)) 
  
    # starting processes 
    p1.start() 
    p2.start() 
  
    # wait until processes are finished 
    p1.join() 
    p2.join() 
  
    # print final balance 
    print("Final balance = {}".format(balance.value)) 
  
if __name__ == "__main__": 
    for _ in range(10): 
  
        # perform same transaction process 10 times 
        perform_transactions() 

Final balance = -273
Final balance = -539
Final balance = 87
Final balance = -726
Final balance = -993
Final balance = 1571
Final balance = 1293
Final balance = -117
Final balance = -575
Final balance = -1123


In the above program, 10000 withdraw and 10000 deposit transactions are carried out with an initial balance of 100. The expected final balance is 100, but the 10 runs of perform_transactions produce different values.

This happens because both processes access the shared balance concurrently: each update reads the value, changes it, and writes it back, and the two processes can interleave between the read and the write. This unpredictability in the balance is a race condition.

The multiprocessing module provides a Lock class to deal with race conditions. Lock is implemented using a semaphore object provided by the operating system.

In [None]:
import multiprocessing 
  
# function to withdraw from account 
def withdraw(balance, lock):     
    for _ in range(10000): 
        lock.acquire() 
        balance.value = balance.value - 1
        lock.release() 
  
# function to deposit to account 
def deposit(balance, lock):     
    for _ in range(10000): 
        lock.acquire() 
        balance.value = balance.value + 1
        lock.release() 
  
def perform_transactions(): 
  
    # initial balance (in shared memory) 
    balance = multiprocessing.Value('i', 100) 
  
    # creating a lock object 
    lock = multiprocessing.Lock() 
  
    # creating new processes 
    p1 = multiprocessing.Process(target=withdraw, args=(balance,lock)) 
    p2 = multiprocessing.Process(target=deposit, args=(balance,lock)) 
  
    # starting processes 
    p1.start() 
    p2.start() 
  
    # wait until processes are finished 
    p1.join() 
    p2.join() 
  
    # print final balance 
    print("Final balance = {}".format(balance.value)) 
  
if __name__ == "__main__": 
    for _ in range(10): 
  
        # perform same transaction process 10 times 
        perform_transactions() 

Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100


Let us try to understand the above code step by step:

First, a Lock object is created using:

    lock = multiprocessing.Lock()

Then, the lock is passed as an argument to the target functions:

    p1 = multiprocessing.Process(target=withdraw, args=(balance,lock))
    p2 = multiprocessing.Process(target=deposit, args=(balance,lock))

In the critical section of the target function, we take the lock using the lock.acquire() method. Once a process has acquired the lock, no other process can enter its critical section until the lock is released using the lock.release() method:

    lock.acquire()
    balance.value = balance.value - 1
    lock.release()

As you can see in the results, the final balance comes out to be 100 every time (which is the expected final result).
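
A Value created with the default arguments already carries its own lock, available through get_lock(), and locks can be used as context managers so that they are released even if the body raises. A possible variant of the example, with hypothetical function names and a times parameter added for illustration:

```python
import multiprocessing


def withdraw(balance, times):
    for _ in range(times):
        # get_lock() returns the lock that guards this Value
        with balance.get_lock():
            balance.value -= 1


def deposit(balance, times):
    for _ in range(times):
        with balance.get_lock():
            balance.value += 1


def run_transactions(times=10000):
    """Run equal numbers of withdrawals and deposits; return the final balance."""
    balance = multiprocessing.Value('i', 100)
    p1 = multiprocessing.Process(target=withdraw, args=(balance, times))
    p2 = multiprocessing.Process(target=deposit, args=(balance, times))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    return balance.value


if __name__ == '__main__':
    print(run_transactions())
```

Since every read-modify-write happens while the lock is held, the final balance should equal the initial 100.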

## Pooling between processes

Let us consider a simple program to find squares of numbers in a given list.

In [None]:
def square(n): 
    return (n*n) 
  
if __name__ == "__main__": 

    # input list 
    mylist = [1,2,3,4,5,6,7,8,9,10] 
  
    # empty list to store result 
    result = [] 

    for num in mylist: 
        result.append(square(num)) 
  
    print(result) 

[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]


Here the entire code runs on a single core. Only one core is used for program execution, and the other cores may well remain idle.

To utilize all the cores, the multiprocessing module provides a Pool class. The Pool class represents a pool of worker processes. It has methods that allow tasks to be offloaded to the worker processes in a few different ways.

In [None]:
# Python program to understand  
# the concept of pool 
import multiprocessing 
import os 
  
def square(n): 
    print("Worker process id for {0}: {1}".format(n, os.getpid())) 
    return (n*n) 
  
if __name__ == "__main__": 
    # input list 
    mylist = [1,2,3,4,5,6,7,8,9,10] 
    # creating a pool object; the with block shuts the pool down afterwards 
    with multiprocessing.Pool() as p:
        # map list to target function 
        result = p.map(square, mylist) 
  
    print(result) 

Worker process id for 1: 1911
Worker process id for 3: 1912
Worker process id for 4: 1912
Worker process id for 2: 1911
Worker process id for 7: 1911
Worker process id for 5: 1912
Worker process id for 6: 1912
Worker process id for 8: 1911
Worker process id for 9: 1912
Worker process id for 10: 1912
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]


# Python multithreading

A thread, or thread of execution, is defined in computer science as the smallest unit of work that an operating system can schedule. A program creates threads to split its work into two or more tasks that run concurrently; on a single processor this is implemented by multitasking. Threads are contained in processes, and more than one thread can exist within the same process. These threads share the memory and the state of the process.
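
To illustrate that threads share the memory of their process, the sketch below lets two threads append to the same list object. append_squares and run_threads are hypothetical names; the threading module itself is introduced in the next section.

```python
import threading


def append_squares(shared, start, stop):
    """Append the squares of start..stop-1 to the list shared by all threads."""
    for n in range(start, stop):
        shared.append(n * n)


def run_threads():
    shared = []  # a single list object visible to both threads
    t1 = threading.Thread(target=append_squares, args=(shared, 0, 3))
    t2 = threading.Thread(target=append_squares, args=(shared, 3, 6))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    # The order of appends depends on scheduling, so sort before returning
    return sorted(shared)


print(run_threads())  # [0, 1, 4, 9, 16, 25]
```

No Array, Value, or Queue is needed here. Both threads see the same list because they live in the same process.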

## Thread Objects

In Python, the threading module provides a very simple and intuitive API for spawning multiple threads in a program. Using threads allows a program to run multiple operations concurrently in the same process space

Let us consider a simple example using threading module:

In [None]:
import threading 
  
def print_cube(num): 
    """ 
    function to print cube of given num 
    """
    print("Cube: {}".format(num * num * num)) 
  
def print_square(num): 
    """ 
    function to print square of given num 
    """
    print("Square: {}".format(num * num)) 
  
if __name__ == "__main__": 
    # creating thread 
    t1 = threading.Thread(target=print_square, args=(10,)) 
    t2 = threading.Thread(target=print_cube, args=(10,)) 
  
    # starting thread 1 
    t1.start() 
    # starting thread 2 
    t2.start() 
    print("\nmain thread idle")
    # wait until thread 1 is completely executed 
    t1.join() 
    # wait until thread 2 is completely executed 
    t2.join() 
  
    # both threads completely executed 
    print("Done!") 

Square: 100
Cube: 1000

main thread idle
Done!


![alt text](https://media.geeksforgeeks.org/wp-content/uploads/multithreading-python-4.png)

Problem 3. Create 2 threads that add and subtract two numbers concurrently using multithreading

### Determining the Current Thread

Using arguments to identify or name the thread is cumbersome and unnecessary. Each Thread instance has a name with a default value that can be changed as the thread is created. Naming threads is useful in server processes with multiple service threads handling different operations

In [None]:
import threading
import time

def worker():
    print(threading.current_thread().name, 'Starting')
    time.sleep(0.2)
    print(threading.current_thread().name, 'Exiting')

def my_service():
    print(threading.current_thread().name, 'Starting')
    time.sleep(0.3)
    print(threading.current_thread().name, 'Exiting')

t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker)  # use default name

w.start()
w2.start()
t.start()

w.join()
w2.join()
t.join()

worker Starting
Thread-41 Starting
my_service Starting
worker Exiting
Thread-41 Exiting
my_service Exiting


The output includes the name of the current thread on each line. The lines that start with "Thread-N" correspond to the unnamed thread w2.

### Daemon vs. Non-Daemon Threads

Sometimes programs spawn a thread as a daemon that runs without blocking the main program from exiting. To mark a thread as a daemon, pass daemon=True when constructing it or set its daemon attribute to True before calling start(). The default is for threads to not be daemons.

In [None]:
import threading
import time
import logging


def daemon():
    logging.debug('Starting')
    logging.debug('Exiting')


def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

d = threading.Thread(name='daemon', target=daemon, daemon=True)
t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()
time.sleep(5)
for i in range(5):
  print(i)

0
1
2
3
4


By default, join() blocks indefinitely. It is also possible to pass a float value representing the number of seconds to wait for the thread to become inactive. If the thread does not complete within the timeout period, join() returns anyway.

In [None]:
import threading
import time
import logging


def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')


def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

d = threading.Thread(name='daemon', target=daemon, daemon=True)

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

d.join(0.1)
print('d.is_alive()', d.is_alive())
time.sleep(0.2)
print('d.is_alive()', d.is_alive())
t.join()

d.is_alive() True
d.is_alive() False


### Subclassing Thread

At start-up, a Thread does some basic initialization and then calls its run() method, which calls the target function passed to the constructor. To create a subclass of Thread, override run() to do whatever is necessary

In [1]:
import threading
import logging


class MyThreadWithArgs(threading.Thread):

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        super().__init__(group=group, target=target, name=name,
                         daemon=daemon)
        self.args = args
        self.kwargs = kwargs

    def run(self):
        logging.debug('running with %s and %s',
                      self.args, self.kwargs)


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(5):
    t = MyThreadWithArgs(args=(i,), kwargs={'a': 'A', 'b': 'B'})
    t.start()

(Thread-4  ) running with (0,) and {'a': 'A', 'b': 'B'}
(Thread-5  ) running with (1,) and {'a': 'A', 'b': 'B'}
(Thread-6  ) running with (2,) and {'a': 'A', 'b': 'B'}
(Thread-7  ) running with (3,) and {'a': 'A', 'b': 'B'}
(Thread-8  ) running with (4,) and {'a': 'A', 'b': 'B'}


### Signaling Between Threads

Although the point of using multiple threads is to run separate operations concurrently, there are times when it is important to be able to synchronize the operations in two or more threads. Event objects are a simple way to communicate between threads safely. An Event manages an internal flag that callers can control with the set() and clear() methods. Other threads can use wait() to pause until the flag is set, effectively blocking progress until allowed to continue

In [4]:
import logging
import threading
import time

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    logging.debug('wait_for_event starting')
    event_is_set = e.wait()
    logging.debug('event set: %s', event_is_set)

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.is_set():
        logging.debug('wait_for_event_timeout starting')
        event_is_set = e.wait(t)
        logging.debug('event set: %s', event_is_set)
        if event_is_set:
            logging.debug('processing event')
        else:
            logging.debug('doing other work')

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

e = threading.Event()
t1 = threading.Thread(name='block', target=wait_for_event, args=(e,),)
t1.start()

t2 = threading.Thread(name='nonblock',target=wait_for_event_timeout, args=(e, 2),)
t2.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(2)
e.set()
logging.debug('Event is set')

t1.join()
t2.join()

(block     ) wait_for_event starting
(nonblock  ) wait_for_event_timeout starting
(MainThread) Waiting before calling Event.set()
(nonblock  ) event set: False
(nonblock  ) doing other work
(nonblock  ) wait_for_event_timeout starting
(MainThread) Event is set
(block     ) event set: True
(nonblock  ) event set: True
(nonblock  ) processing event


The wait() method takes an argument representing the number of seconds to wait for the event before timing out. It returns a Boolean indicating whether or not the event is set, so the caller knows why wait() returned. The is_set() method can be used separately on the event without fear of blocking.

In this example, wait_for_event_timeout() checks the event status without blocking indefinitely. The wait_for_event() function blocks on the call to wait(), which does not return until the event status changes.
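
The return value of wait() can be seen in isolation with a small sketch. The variable names and the 0.1 second delays are arbitrary choices for illustration; threading.Timer is used only as a convenient way to call set() from another thread.

```python
import threading

event = threading.Event()

# Nothing has set the event yet, so wait() gives up after the timeout.
timed_out_result = event.wait(timeout=0.1)

# A timer thread sets the event shortly afterwards.
timer = threading.Timer(0.1, event.set)
timer.start()
signalled_result = event.wait(timeout=5)
timer.join()

print(timed_out_result, signalled_result, event.is_set())  # False True True
```

The first call returns False because the timeout expired, and the second returns True because the flag was set before its timeout ran out.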

### Synchronizing Threads

A Condition object is another way of synchronizing threads. Because the Condition uses a Lock, it can be tied to a shared resource, allowing multiple threads to wait for the resource to be updated. In this example, the consumer() threads wait on the Condition before continuing. The producer() thread is responsible for notifying the other threads, through the Condition, that they can continue.

In [5]:
import logging
import threading
import time

def consumer(cond):
    """wait for the condition and use the resource"""
    logging.debug('Starting consumer thread')
    with cond:
        cond.wait()
        logging.debug('Resource is available to consumer')

def producer(cond):
    """set up the resource to be used by the consumer"""
    logging.debug('Starting producer thread')
    with cond:
        logging.debug('Making resource available')
        # notify_all() replaces the deprecated camelCase alias notifyAll()
        cond.notify_all()


logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer,
                      args=(condition,))
c2 = threading.Thread(name='c2', target=consumer,
                      args=(condition,))
p = threading.Thread(name='p', target=producer,
                     args=(condition,))

c1.start()
time.sleep(0.2)
c2.start()
time.sleep(0.2)
p.start()

(c1        ) Starting consumer thread
(c2        ) Starting consumer thread
(p         ) Starting producer thread
(p         ) Making resource available
(c2        ) Resource is available to consumer
(c1        ) Resource is available to consumer
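
The example above relies on sleep() calls so that both consumers are already waiting when the producer notifies. A sketch of a variant that does not depend on timing is shown below: it ties the Condition to an actual piece of shared state and waits with wait_for(), which re-checks a predicate. The `items` and `consumed` lists are hypothetical stand-ins for a real resource.

```python
import threading

condition = threading.Condition()
items = []      # shared resource, guarded by the condition's lock
consumed = []   # what the consumer managed to take


def consumer():
    with condition:
        # wait_for() checks the predicate first and after every wake-up,
        # so a notification sent before this thread started is not lost.
        condition.wait_for(lambda: len(items) > 0)
        consumed.append(items.pop())


def producer():
    with condition:
        items.append('resource')
        condition.notify_all()


c = threading.Thread(target=consumer)
p = threading.Thread(target=producer)
p.start()   # the producer may well finish before the consumer starts
c.start()
p.join()
c.join()
print(consumed)  # ['resource']
```

With a bare wait(), starting the producer first could leave the consumer blocked forever. Checking the state of the resource avoids that.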


### Controlling Access to Resources

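In addition to synchronizing the operations of threads, it is also important to be able to control access to shared resources to prevent corruption or missed data. In CPython, the built-in data structures (lists, dictionaries, etc.) are thread-safe as a side effect of having atomic bytecodes for manipulating them: the global interpreter lock is not released in the middle of an update. Simpler types such as integers and floats, and data structures implemented in Python, do not have that protection.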
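
As a small sketch of the claim about built-in containers, assuming CPython, several threads can append to one list without a lock and no item is lost. The names `shared` and `append_many` are made up for this illustration.

```python
import threading

shared = []  # one list, appended to by every thread without a lock


def append_many(start, count):
    for i in range(start, start + count):
        shared.append(i)  # a single list operation, atomic in CPython


threads = [
    threading.Thread(target=append_many, args=(n * 10000, 10000))
    for n in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(shared))  # 40000
```

Only the individual append() calls are protected. A sequence of several operations on the list, such as checking its length and then popping, still needs a lock.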

Consider the program below to understand the concept of a race condition.

In [6]:
import threading 
  
# global variable x 
x = 0
  
def thread_task(): 
    """
    Task for each thread:
    increment the global x 100000 times without any locking.
    """
    global x
    for _ in range(100000): 
        x += 1
  
def main_task(): 
    global x 
    # setting global variable x as 0 
    x = 0
  
    # creating threads 
    t1 = threading.Thread(target=thread_task) 
    t2 = threading.Thread(target=thread_task) 
  
    # start threads 
    t1.start() 
    t2.start() 
  
    # wait until threads finish their job 
    t1.join() 
    t2.join() 
  
if __name__ == "__main__": 
    for i in range(10): 
        main_task() 
        print("Iteration {0}: x = {1}".format(i,x))

Iteration 0: x = 200000
Iteration 1: x = 200000
Iteration 2: x = 200000
Iteration 3: x = 200000
Iteration 4: x = 131026
Iteration 5: x = 168276
Iteration 6: x = 200000
Iteration 7: x = 168346
Iteration 8: x = 169012
Iteration 9: x = 200000


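The expected final value of x is 200000, but in several of the 10 iterations of main_task() the result is smaller.

This happens because both threads access the shared variable x concurrently. The statement x += 1 is not atomic: a thread reads x, adds 1, and writes the result back, and the other thread can run between those steps, so some increments are lost. This unpredictability in the value of x is a race condition.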
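
One way to see why the increment is vulnerable is to look at the bytecode CPython generates for it. This is only a sketch: the helper function `increment` is hypothetical, and the exact instruction names differ between Python versions, so no particular listing is shown.

```python
import dis

x = 0


def increment():
    global x
    x += 1


# Collect the names of the bytecode instructions behind "x += 1".
opnames = [ins.opname for ins in dis.get_instructions(increment)]
print(opnames)
```

The listing contains separate instructions for loading the global, adding to it, and storing it back. A thread switch between the load and the store lets the other thread's update be overwritten.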

Hence, we need a tool for proper synchronization between multiple threads. The threading module provides a Lock class to deal with race conditions. The lock is backed by a low-level synchronization primitive (a semaphore on many platforms), so only one thread can hold it at a time.

In [8]:
import threading 
  
# global variable x 
x = 0
  
def thread_task(lock): 
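    """
    Task for each thread:
    increment the global x 100000 times while holding the lock.
    """
    global x
    for _ in range(100000):
        # Hold the lock for the whole read-modify-write of x;
        # the with statement releases it even if an exception is raised.
        with lock:
            x += 1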
  
def main_task(): 
    global x 
    # setting global variable x as 0 
    x = 0
  
    # creating a lock 
    lock = threading.Lock() 
  
    # creating threads 
    t1 = threading.Thread(target=thread_task, args=(lock,)) 
    t2 = threading.Thread(target=thread_task, args=(lock,)) 
  
    # start threads 
    t1.start() 
    t2.start() 
  
    # wait until threads finish their job 
    t1.join() 
    t2.join() 
  
if __name__ == "__main__": 
    for i in range(10): 
        main_task() 
        print("Iteration {0}: x = {1}".format(i,x)) 

Iteration 0: x = 200000
Iteration 1: x = 200000
Iteration 2: x = 200000
Iteration 3: x = 200000
Iteration 4: x = 200000
Iteration 5: x = 200000
Iteration 6: x = 200000
Iteration 7: x = 200000
Iteration 8: x = 200000
Iteration 9: x = 200000


As you can see in the results, the final value of x comes out to be 200000 every time (which is the expected final result).
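
Passing a lock around alongside a global variable works, but the lock can also live next to the data it protects. The sketch below assumes a hypothetical `Counter` class and `worker` function. It is one possible arrangement, not the only way to structure this.

```python
import threading


class Counter:
    """Hypothetical counter that guards its value with its own lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self.value = 0

    def increment(self):
        # The with statement acquires the lock and always releases it,
        # even if the body raises an exception.
        with self._lock:
            self.value += 1


def worker(counter, times):
    for _ in range(times):
        counter.increment()


counter = Counter()
threads = [
    threading.Thread(target=worker, args=(counter, 10000))
    for _ in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.value)  # 40000
```

Because every update goes through increment(), callers cannot forget to take the lock.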