# Threading

- Thread is helpful for doing things faster (speed up program)
- Speed ups are not guaranteed
- Thread are generally used when doing IO bound tasks like reading/writing file, downloading stuffs from the internet

In [1]:
# Synchronous

import time

start = time.perf_counter()

def do_something() -> None:
    print("Sleeping for 1 second")
    time.sleep(1) # This means we are doing some IO bound tasks
    print("Done sleeping")
    
do_something()
do_something()

finish = time.perf_counter()

print(f"Finished in {round(finish-start, 2)} second(s)")

Sleeping for 1 second
Done sleeping
Sleeping for 1 second
Done sleeping
Finished in 2.01 second(s)


In [2]:
# Making it asynchronous using threads

import threading
import time

start = time.perf_counter()

def do_something() -> None:
    print("Sleeping for 1 second")
    time.sleep(1) # This means we are doing some IO bound tasks
    print("Done sleeping")
    
t1 = threading.Thread(target=do_something)
t2 = threading.Thread(target=do_something)

t1.start()
t2.start()

finish = time.perf_counter()

print(f"Finished in {round(finish-start, 2)} second(s)")

Sleeping for 1 second
Sleeping for 1 second
Finished in 0.0 second(s)


As you can see in above cell that `Finished in 0.0 second(s)` appeard first and only shows `0.0 second(s)` which is not the expected behavoiur. This is because when we start our each thread using the `start()` method, we only kicked off thread but didn't wait for our task to finish. To wait for our thread we'll use the `join()` method.

In [3]:
# Making it asynchronous using threads

import threading
import time

start = time.perf_counter()

def do_something() -> None:
    print("Sleeping for 1 second", flush=True)
    time.sleep(1) # This means we are doing some IO bound tasks
    print("Done sleeping", end="\n", flush=True)
    
t1 = threading.Thread(target=do_something)
t2 = threading.Thread(target=do_something)

# Start the thread
t1.start()
t2.start()

# Wait for each thread to complete
t1.join()
t2.join()

finish = time.perf_counter()

print(f"Finished in {round(finish-start, 2)} second(s)")

Sleeping for 1 second
Sleeping for 1 second
Done sleeping
Done sleeping
Done sleepingDone sleeping

Finished in 1.0 second(s)


Running threaded tasks multiple times can be done by using loop.

In [4]:
# Running multiple threads with argument

import threading
import time

start = time.perf_counter()

def do_something(n_sec: int) -> None:
    """ A arbitary function that does something
    
    Args:
        n_sec: Number of seconds we want to sleep
    """
    print("Sleeping for 1 second", flush=True)
    time.sleep(n_sec) # This means we are doing some IO bound tasks
    print("Done sleeping", end="\n", flush=True)
    
threads = [] # a list to maintain all thread objects

for _ in range(10):
    t = threading.Thread(target=do_something, args=(1.5,))
    t.start() # start every thread
    threads.append(t)
    
for thread in threads:
    thread.join() # Wait for every thread to complete

finish = time.perf_counter()

print(f"Finished in {round(finish-start, 2)} second(s)")

Sleeping for 1 second
Sleeping for 1 secondSleeping for 1 second

Sleeping for 1 second
Sleeping for 1 secondSleeping for 1 second
Sleeping for 1 secondSleeping for 1 second


Sleeping for 1 secondSleeping for 1 second

Done sleepingDone sleeping
Done sleepingDone sleepingDone sleepingDone sleepingDone sleepingDone sleeping

Done sleeping

Done sleeping




Finished in 1.51 second(s)


## After Python3.2 we have a ThreadPoolExecutor

In [5]:
from concurrent.futures import ThreadPoolExecutor as Executor

import time

start = time.perf_counter()

def do_something(n_sec: int) -> str:
    """ A arbitary function that does something
    
    Args:
        n_sec: Number of seconds we want to sleep
    """
    print(f"Sleeping for {n_sec} second", flush=True)
    time.sleep(n_sec) # This means we are doing some IO bound tasks
    return "Done sleeping"
    
with Executor() as executor:
    # Submit is used when we want to execute jobs one at a time
    # The submit() method returns a Future object
    
    # Future object encapsulates the execution of our 
    # function and allows us to check in on it after it's 
    # scheduled whether it's executing or returning any values
    f1 = executor.submit(do_something, 1.5) 
    print(f1.result())

finish = time.perf_counter()

print(f"Finished in {round(finish-start, 2)} second(s)")

Sleeping for 1.5 second
Done sleeping
Finished in 1.5 second(s)


In [6]:
# Doing thread pool executor multiple times

import concurrent.futures as cf
import time

start = time.perf_counter()

def do_something(n_sec: int) -> str:
    """ A arbitary function that does something
    
    Args:
        n_sec: Number of seconds we want to sleep
    """
    print(f"Sleeping for {n_sec} second", flush=True)
    time.sleep(n_sec) # This means we are doing some IO bound tasks
    return f"Done sleeping {n_sec}"
    
with cf.ThreadPoolExecutor() as executor:
    seconds = [5, 4, 3, 2, 1]
    results = [executor.submit(do_something, sec) for sec in seconds]
    
    # as_completed yields results in order they are completed
    for f in cf.as_completed(results):
        print(f.result())

finish = time.perf_counter()

print(f"Finished in {round(finish-start, 2)} second(s)")

Sleeping for 5 second
Sleeping for 4 second
Sleeping for 3 second
Sleeping for 2 second
Sleeping for 1 second
Done sleeping 1
Done sleeping 2
Done sleeping 3
Done sleeping 4
Done sleeping 5
Finished in 5.01 second(s)


In [7]:
# Doing thread pool executor multiple times

import concurrent.futures as cf
import time

start = time.perf_counter()

def do_something(n_sec: int) -> str:
    """ A arbitary function that does something
    
    Args:
        n_sec: Number of seconds we want to sleep
    """
    print(f"Sleeping for {n_sec} second", flush=True)
    time.sleep(n_sec) # This means we are doing some IO bound tasks
    return f"Done sleeping {n_sec}"
    
with cf.ThreadPoolExecutor() as executor:
    seconds = [5, 4, 3, 2, 1]
    
    # map returns results in order they are started
    results = executor.map(do_something, seconds) 
    
    # as_completed yields results as they complete in our future object
    for result in results:
        print(result)

finish = time.perf_counter()

print(f"Finished in {round(finish-start, 2)} second(s)")

Sleeping for 5 second
Sleeping for 4 second
Sleeping for 3 second
Sleeping for 2 second
Sleeping for 1 second
Done sleeping 5
Done sleeping 4
Done sleeping 3
Done sleeping 2
Done sleeping 1
Finished in 5.01 second(s)


In [8]:
# Handling exception while doing thread pool executor multiple times

import concurrent.futures as cf
import time
import traceback

start = time.perf_counter()

def divide(numerator: int, denominator: int) -> float:
    """ A function that does magic division
    
    Args:
        numerator: numerator for our division function
        denominator: denominator for our division function
    """
    print(f"Dividing {numerator} by {denominator}")
    div_result = numerator/denominator
    return div_result
    
with cf.ThreadPoolExecutor() as executor:
    nume = [5, 4, 3, 2, 1]
    denom = [2, 1, 4, 0, 6]
    results = [executor.submit(divide, n, d) for n, d in zip(nume, denom)]
    
    for f in cf.as_completed(results):
        # We need to handle exception as we iterate through result
        try:
            print(f.result())
        except Exception as e:
            print("Exception occurred)")
            print(traceback.format_exc())

finish = time.perf_counter()

print(f"Finished in {round(finish-start, 2)} second(s)")

Dividing 5 by 2Dividing 4 by 1
Dividing 3 by 4
Dividing 2 by 0

Dividing 1 by 6
0.16666666666666666
0.75
Exception occurred)
Traceback (most recent call last):
  File "<ipython-input-8-b1dc594d15d8>", line 28, in <module>
    print(f.result())
  File "/Users/sagar-giri/opt/anaconda3/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/Users/sagar-giri/opt/anaconda3/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/Users/sagar-giri/opt/anaconda3/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "<ipython-input-8-b1dc594d15d8>", line 17, in divide
    div_result = numerator/denominator
ZeroDivisionError: division by zero

4.0
2.5
Finished in 0.01 second(s)


# Downloading stuffs from Internet using Thread
This is the example for network bound IO task

In [9]:
# synchronous download

import requests
import time
import concurrent.futures

img_urls = [
    'https://images.unsplash.com/photo-1516117172878-fd2c41f4a759',
    'https://images.unsplash.com/photo-1532009324734-20a7a5813719',
    'https://images.unsplash.com/photo-1524429656589-6633a470097c',
    'https://images.unsplash.com/photo-1530224264768-7ff8c1789d79',
    'https://images.unsplash.com/photo-1564135624576-c5c88640f235',
    'https://images.unsplash.com/photo-1541698444083-023c97d3f4b6',
    'https://images.unsplash.com/photo-1522364723953-452d3431c267',
    'https://images.unsplash.com/photo-1513938709626-033611b8cc03',
    'https://images.unsplash.com/photo-1507143550189-fed454f93097',
    'https://images.unsplash.com/photo-1493976040374-85c8e12f0c0e',
    'https://images.unsplash.com/photo-1504198453319-5ce911bafcde',
    'https://images.unsplash.com/photo-1530122037265-a5f1f91d3b99',
    'https://images.unsplash.com/photo-1516972810927-80185027ca84',
    'https://images.unsplash.com/photo-1550439062-609e1531270e',
    'https://images.unsplash.com/photo-1549692520-acc6669e2f0c'
]

t1 = time.perf_counter()


def download_image(img_url):
    img_bytes = requests.get(img_url).content
    img_name = img_url.split('/')[3]
    img_name = f'{img_name}.jpg'
    with open(img_name, 'wb') as img_file:
        img_file.write(img_bytes)
        print(f'{img_name} was downloaded...')

for url in img_urls:
    download_image(url)

t2 = time.perf_counter()

print(f'Finished in {t2-t1} seconds')

photo-1516117172878-fd2c41f4a759.jpg was downloaded...
photo-1532009324734-20a7a5813719.jpg was downloaded...
photo-1524429656589-6633a470097c.jpg was downloaded...
photo-1530224264768-7ff8c1789d79.jpg was downloaded...
photo-1564135624576-c5c88640f235.jpg was downloaded...
photo-1541698444083-023c97d3f4b6.jpg was downloaded...
photo-1522364723953-452d3431c267.jpg was downloaded...
photo-1513938709626-033611b8cc03.jpg was downloaded...
photo-1507143550189-fed454f93097.jpg was downloaded...
photo-1493976040374-85c8e12f0c0e.jpg was downloaded...
photo-1504198453319-5ce911bafcde.jpg was downloaded...
photo-1530122037265-a5f1f91d3b99.jpg was downloaded...
photo-1516972810927-80185027ca84.jpg was downloaded...
photo-1550439062-609e1531270e.jpg was downloaded...
photo-1549692520-acc6669e2f0c.jpg was downloaded...
Finished in 26.457827560000002 seconds


In [10]:
# assynchronous download using thread concurrent

import requests
import time
import concurrent.futures as cf

img_urls = [
    'https://images.unsplash.com/photo-1516117172878-fd2c41f4a759',
    'https://images.unsplash.com/photo-1532009324734-20a7a5813719',
    'https://images.unsplash.com/photo-1524429656589-6633a470097c',
    'https://images.unsplash.com/photo-1530224264768-7ff8c1789d79',
    'https://images.unsplash.com/photo-1564135624576-c5c88640f235',
    'https://images.unsplash.com/photo-1541698444083-023c97d3f4b6',
    'https://images.unsplash.com/photo-1522364723953-452d3431c267',
    'https://images.unsplash.com/photo-1513938709626-033611b8cc03',
    'https://images.unsplash.com/photo-1507143550189-fed454f93097',
    'https://images.unsplash.com/photo-1493976040374-85c8e12f0c0e',
    'https://images.unsplash.com/photo-1504198453319-5ce911bafcde',
    'https://images.unsplash.com/photo-1530122037265-a5f1f91d3b99',
    'https://images.unsplash.com/photo-1516972810927-80185027ca84',
    'https://images.unsplash.com/photo-1550439062-609e1531270e',
    'https://images.unsplash.com/photo-1549692520-acc6669e2f0c'
]

t1 = time.perf_counter()


def download_image(img_url):
    img_bytes = requests.get(img_url).content
    img_name = img_url.split('/')[3]
    img_name = f'{img_name}.jpg'
    with open(img_name, 'wb') as img_file:
        img_file.write(img_bytes)
        return f'{img_name} was downloaded...'

with cf.ThreadPoolExecutor() as executor:
    futures = [executor.submit(download_image, url) for url in img_urls]
    
    for f in cf.as_completed(futures):
        print(f.result())
    
t2 = time.perf_counter()

print(f'Finished in {t2-t1} seconds')

photo-1516117172878-fd2c41f4a759.jpg was downloaded...
photo-1564135624576-c5c88640f235.jpg was downloaded...
photo-1507143550189-fed454f93097.jpg was downloaded...
photo-1549692520-acc6669e2f0c.jpg was downloaded...
photo-1530224264768-7ff8c1789d79.jpg was downloaded...
photo-1516972810927-80185027ca84.jpg was downloaded...
photo-1504198453319-5ce911bafcde.jpg was downloaded...
photo-1524429656589-6633a470097c.jpg was downloaded...
photo-1530122037265-a5f1f91d3b99.jpg was downloaded...
photo-1550439062-609e1531270e.jpg was downloaded...
photo-1522364723953-452d3431c267.jpg was downloaded...
photo-1513938709626-033611b8cc03.jpg was downloaded...
photo-1532009324734-20a7a5813719.jpg was downloaded...
photo-1541698444083-023c97d3f4b6.jpg was downloaded...
photo-1493976040374-85c8e12f0c0e.jpg was downloaded...
Finished in 11.623833784999995 seconds


In [12]:
# Percentage of speed gained:

# Note: Speed entirery depends on your internet connection speed

synchronous = 26.457827560000002
asynchronous = 11.623833784999995

percentage = round((synchronous-asynchronous)/(synchronous+asynchronous)*100, 2)
print(percentage)

38.95
