# Concurrency

### This script was created with the help of this RealPython tutorial: https://realpython.com/python-concurrency/

### Imports

In [1]:
import requests
import time

import concurrent.futures
import threading

import asyncio
import aiohttp
import nest_asyncio

import multiprocessing
from Code.Lab10.service import site_multiprocessing
from Code.Lab10.service.cpu_bound_class import cpu_bound_multiprocessing

### Non-concurrent task

#### This version serves as a baseline for comparing processing times

This program downloads the content of every URL in the list, one request after another

In [2]:
def download_all_sites(sites):
    with requests.Session() as session:
        for url in sites:
            session.get(url)


sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time

print(f"Downloaded {len(sites)} in {duration} seconds")

Downloaded 160 in 11.795653820037842 seconds


### Threading version

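The overall structure stays the same; the changes are in download_site and download_all_sites.
ThreadPoolExecutor creates a pool of threads; in this example we limit it to at most 5 threads (max_workers=5).
The .map() method automatically splits the work across the threads, and get_session() gives each thread its own requests.Session, stored in a threading.local() object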
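To see the thread pool and the per-thread session working without touching the network, here is a minimal sketch. FakeSession is a hypothetical stand-in for requests.Session that just sleeps for 0.05 s per request, and created_sessions is a hypothetical list used only to count how many sessions get created. Twenty requests run on 5 threads, so the sleeps overlap and at most 5 sessions are created.

```python
import concurrent.futures
import threading
import time

thread_local = threading.local()
created_sessions = []  # hypothetical: every fake session created, across all threads
created_lock = threading.Lock()


class FakeSession:
    """Hypothetical stand-in for requests.Session; sleeps instead of using the network."""

    def get(self, url):
        time.sleep(0.05)  # pretend to wait for a server response
        return len(url)


def get_session():
    # Created on this thread's first call, reused on every later call
    if not hasattr(thread_local, "session"):
        thread_local.session = FakeSession()
        with created_lock:
            created_sessions.append(thread_local.session)
    return thread_local.session


def download_site(url):
    return get_session().get(url)


urls = [f"https://example.com/{i}" for i in range(20)]
start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    sizes = list(executor.map(download_site, urls))
elapsed = time.perf_counter() - start

# 20 requests x 0.05 s would take about 1 s if run one after another
print(f"{len(sizes)} requests, {len(created_sessions)} sessions, {elapsed:.2f} s")
```

Because the waiting happens while the GIL is released, the five threads sleep in parallel and the whole batch finishes in roughly a fifth of the sequential time.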

In [3]:
thread_local = threading.local()

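# Each thread creates its own Session the first time it calls this function
# and then keeps reusing that Session for all later calls
def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session


def download_site(url):
    # executor.map() passes only the URL, so look up this thread's session here
    session = get_session()
    session.get(url)


def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # list() reads every result, so an exception raised in a worker thread is re-raised here
        list(executor.map(download_site, sites))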


sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration2 = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration2} seconds")

Downloaded 160 in 0.0049915313720703125 seconds


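The recorded time of about 0.005 seconds is not a real measurement of downloading 160 pages. It was produced by a download_site that required a session argument, which executor.map never supplies, so every call failed with a TypeError. The errors went unnoticed because the results returned by .map() were never read. A working threaded version should still be considerably faster than the sequential one, but not by anything close to this margin.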
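The pitfall behind this is worth a closer look: executor.map() does not raise exceptions when it is called, only when its results are read. A minimal sketch, using a hypothetical function needs_two_args that, like a download_site expecting a session, takes two arguments while map supplies only one:

```python
import concurrent.futures


def needs_two_args(url, session):
    # Hypothetical worker that expects a session in addition to the URL
    return url, session


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # No error here: map() only submits the calls and returns a lazy iterator
    results = executor.map(needs_two_args, ["https://a.example", "https://b.example"])
    try:
        list(results)  # the stored TypeError is re-raised while the results are read
        error = None
    except TypeError as exc:
        error = exc

print(type(error).__name__)  # prints TypeError
```

Consuming the iterator (for example with list()) is therefore a cheap way to make failures in worker threads visible.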


In [4]:
print(f'To be specific: \n'
      f'First methods execution time: {duration} seconds \n'
      f'second methods execution time: {duration2} seconds' )

To be specific: 
First methods execution time: 11.795653820037842 seconds 
second methods execution time: 0.0049915313720703125 seconds


### Race condition
A race condition happens when several threads access shared data at the same time and the result depends on the order in which they run. This can result in errors that are difficult to find:

In [5]:
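def increment_counter(fake_value):
    global counter
    # counter += 1 is a read-modify-write sequence, not one atomic step,
    # so a thread switch in the middle can make an update get lost
    for _ in range(100):
        counter += 1

fake_data = [x for x in range(5000)]
counter = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=5000) as executor:
    executor.map(increment_counter, fake_data)

# Without protection the total can come up short of 5000 * 100
print(f"Expected {len(fake_data) * 100}, got {counter}")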

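This program runs on threads without any protection.
Every thread reads and writes the same global variable, and counter += 1 is not a single atomic step: it reads the value, adds one, and writes the result back. If a thread is switched out between the read and the write, another thread's update can be overwritten and lost. Whether you actually see a wrong total depends on the Python version and on thread scheduling, which is exactly what makes such bugs hard to find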
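The usual fix is to let only one thread at a time perform the read-modify-write. A minimal sketch using threading.Lock; the name counter_lock and the smaller pool of 50 workers are choices for this sketch, not part of the original cell:

```python
import concurrent.futures
import threading

counter = 0
counter_lock = threading.Lock()  # hypothetical name: guards every update of counter


def increment_counter(fake_value):
    global counter
    for _ in range(100):
        with counter_lock:  # only one thread at a time may read, add and write back
            counter += 1


with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
    list(executor.map(increment_counter, range(5000)))

print(counter)  # prints 500000
```

The lock makes the result deterministic at the cost of serializing the increments, which is fine here because the work per update is tiny.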

### Asyncio Version

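In this version a single Python object, the event loop, controls how and when each task runs. The loop keeps track of the state of each task, for example whether it is ready to run or waiting for a network response.
A task never gives up control unless it does so intentionally (at an await expression), so it is never interrupted in the middle of an operation.
Because of this, shared resources need less careful protection than with threads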
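A minimal sketch of this cooperative switching, with asyncio.sleep() standing in for waiting on the network (fake_download and events are hypothetical names for this illustration). Task "a" starts first, but while it waits the loop runs "b", which finishes earlier:

```python
import asyncio

events = []  # records the order in which things happen


async def fake_download(name, delay):
    events.append(f"start {name}")
    await asyncio.sleep(delay)  # control goes back to the event loop here
    events.append(f"end {name}")
    return name


async def main():
    # gather() runs both coroutines concurrently and returns results in argument order
    return await asyncio.gather(fake_download("a", 0.2), fake_download("b", 0.1))


results = asyncio.run(main())
print(results)  # prints ['a', 'b']
print(events)   # prints ['start a', 'start b', 'end b', 'end a']
```

Inside Jupyter, where an event loop is already running, asyncio.run() needs nest_asyncio.apply() first, as in the next cell, or you can write await main() directly in a cell.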

In [6]:
async def download_all_sites(sites):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in sites:
            task = asyncio.ensure_future(session.get(url))
            tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=True)

sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
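# Jupyter already runs an event loop; nest_asyncio allows asyncio.run() to be called inside it
nest_asyncio.apply()
# asyncio.run() requires Python 3.7+; on earlier versions use:
# asyncio.get_event_loop().run_until_complete(download_all_sites(sites))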
asyncio.run(download_all_sites(sites))
duration3 = time.time() - start_time
print(f"Downloaded {len(sites)} sites in {duration3} seconds")

Downloaded 160 sites in 0.6269824504852295 seconds


Again, let's compare the execution times:

In [7]:
print(f'To be specific: \n'
      f'First methods execution time: {duration} seconds \n'
      f'Second methods execution time: {duration2} seconds\n'
      f'Third methods execution time: {duration3} seconds\n' )

To be specific: 
First methods execution time: 11.795653820037842 seconds 
Second methods execution time: 0.0049915313720703125 seconds
Third methods execution time: 0.6269824504852295 seconds



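On my machine the second (threading) method appeared to be the fastest, although its recorded time is far too low to be trusted; on other machines the third (asyncio) method may win. One thing is certain: the first (sequential) method, which waits for each request to finish before sending the next, is consistently the slowest.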


### Multiprocessing version

In this version we can use more than one CPU core: multiprocessing starts several separate Python processes, and each one can run on its own core!

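#### Disclaimer
Worker functions defined inside a Jupyter notebook often cannot be used with multiprocessing: with the spawn start method (the default on Windows and macOS), the worker processes must import these functions, and they cannot import code that exists only in the notebook. So I moved set_global_session and download_site_multiprocessing into an external .py module.
Check site_multiprocessing.py for the source code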
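The exact contents of site_multiprocessing.py are not shown here. As a hedged sketch of the pattern the Pool(initializer=...) call relies on: the initializer runs once in each worker process and stores a per-process object in a module-level global, which every task in that process then reuses. The names set_worker_session and fetch_with_worker_session are hypothetical stand-ins for set_global_session and download_site_multiprocessing, and a string replaces requests.Session so the sketch runs without network access:

```python
import multiprocessing
import os

# Hypothetical per-process "session"; a real module would hold a requests.Session here
session = None


def set_worker_session():
    # Called once in each worker process when the Pool starts it
    global session
    if session is None:
        session = f"session-of-process-{os.getpid()}"


def fetch_with_worker_session(url):
    # Every task executed by this worker sees the same per-process session
    return os.getpid(), session, url


if __name__ == "__main__":
    # The guard stops worker processes (spawn start method) from re-running this block
    sites = ["https://a.example", "https://b.example", "https://c.example"]
    with multiprocessing.Pool(processes=2, initializer=set_worker_session) as pool:
        for pid, worker_session, url in pool.map(fetch_with_worker_session, sites):
            print(pid, worker_session, url)
```

Each printed line pairs a URL with the process that handled it and that process's own session, showing that sessions are per process rather than shared.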

In [8]:
def download_all_sites(sites):
    with multiprocessing.Pool(initializer=site_multiprocessing.set_global_session) as pool:
        pool.map(site_multiprocessing.download_site_multiprocessing, sites)

sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration4 = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration4} seconds")

Downloaded 160 in 2.2889537811279297 seconds


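As we can see, this version is slower than the threading and asyncio versions (though still much faster than the sequential one), but it can be very useful in specific cases.
Much of the extra time is overhead: each worker is a separate process running its own Python interpreter, which has to be started, and the URLs have to be passed between processes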

The complete comparison:

In [9]:
print(f'To be specific: \n'
      f'First methods execution time: {duration} seconds \n'
      f'Second methods execution time: {duration2} seconds\n'
      f'Third methods execution time: {duration3} seconds\n'
      f'Fourth methods execution time: {duration4} seconds')

To be specific: 
First methods execution time: 11.795653820037842 seconds 
Second methods execution time: 0.0049915313720703125 seconds
Third methods execution time: 0.6269824504852295 seconds
Fourth methods execution time: 2.2889537811279297 seconds


## Speed up a CPU-Bound program

### CPU-Bound Synchronous version

This program computes the sum of the squares of all integers from 0 up to, but not including, the passed number.
We pass large numbers (around five million) so that each call takes a noticeable amount of time to compute
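The loop is deliberate busy work: the same value has the closed form (n - 1) * n * (2n - 1) / 6. A small sketch that checks the loop against that formula for a few values; sum_of_squares_closed_form is a hypothetical helper added only for this check:

```python
def cpu_bound(number):
    # Same loop as in the notebook: 0*0 + 1*1 + ... + (number-1)*(number-1)
    return sum(i * i for i in range(number))


def sum_of_squares_closed_form(number):
    # Hypothetical helper: (n - 1) * n * (2n - 1) / 6, exact in integer arithmetic
    n = number
    return (n - 1) * n * (2 * n - 1) // 6


for n in (0, 1, 10, 1_000):
    assert cpu_bound(n) == sum_of_squares_closed_form(n)

print(cpu_bound(10))  # prints 285
```

The formula answers instantly, while the loop has to perform millions of multiplications for the inputs used here, which is exactly what makes it a useful CPU-bound benchmark.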

In [10]:
def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    for number in numbers:
        cpu_bound(number)


numbers = [5_000_000 + x for x in range(20)]

start_time = time.time()
find_sums(numbers)
duration_cpu_bound_synchronous = time.time() - start_time
print(f"Duration {duration_cpu_bound_synchronous} seconds")

Duration 10.971799850463867 seconds


### Threading version

In [11]:
def find_sums_threading(numbers):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(cpu_bound, numbers)

numbers = [5_000_000 + x for x in range(20)]

start_time = time.time()
find_sums_threading(numbers)
duration_cpu_bound_threads = time.time() - start_time
print(f"Duration {duration_cpu_bound_threads} seconds")


Duration 10.291450500488281 seconds


### Asyncio version

In [12]:
async def find_sum_asyncio(numbers):
    tasks=[]
    for number in numbers:
        task = asyncio.ensure_future(cpu_bound_asyncio(number))
        tasks.append(task)
    await asyncio.gather(*tasks, return_exceptions=True)


async def cpu_bound_asyncio(number):
    return sum(i * i for i in range(number))


start_time = time.time()
nest_asyncio.apply()
asyncio.run(find_sum_asyncio(numbers))
duration_cpu_bound_asyncio = time.time() - start_time
print(f"Duration {duration_cpu_bound_asyncio} seconds")

Duration 11.848283052444458 seconds


Let us see the results:

In [13]:
print(f'Synchronous methods execution time: {duration_cpu_bound_synchronous} seconds \n'
      f'Threading methods execution time: {duration_cpu_bound_threads} seconds\n'
      f'Asyncio methods execution time: {duration_cpu_bound_asyncio} seconds')


Synchronous methods execution time: 10.971799850463867 seconds 
Threading methods execution time: 10.291450500488281 seconds
Asyncio methods execution time: 11.848283052444458 seconds


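As we can see, in this case threading and asyncio are not worth the effort: the threading method is almost as slow as the synchronous method, and asyncio is even slower! Both effectively use a single CPU core. In CPython the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, and the asyncio coroutines never await anything, so they simply run one after another while adding task-management overhead.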
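A minimal sketch of why the asyncio version cannot overlap CPU-bound work: a coroutine without any await keeps control until it returns, so gather() runs the tasks strictly one after another (cpu_task and order are hypothetical names for this illustration):

```python
import asyncio

order = []  # records when each task starts and ends


async def cpu_task(name, number):
    order.append(f"start {name}")
    # No await inside: the event loop cannot switch to another task until this returns
    total = sum(i * i for i in range(number))
    order.append(f"end {name}")
    return total


async def main():
    return await asyncio.gather(cpu_task("a", 200_000), cpu_task("b", 10))


results = asyncio.run(main())
print(order)  # prints ['start a', 'end a', 'start b', 'end b']
```

Compare this with I/O-bound coroutines, where each await on the network hands control back to the loop and lets other tasks progress.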


### Let's try using Multiprocessing!


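#### Disclaimer
Worker functions defined inside a Jupyter notebook often cannot be used with multiprocessing: with the spawn start method (the default on Windows and macOS), the worker processes must import these functions, and they cannot import code that exists only in the notebook. So I moved cpu_bound_multiprocessing into an external .py module.
Check cpu_bound_class.py for the source code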
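The contents of cpu_bound_class.py are not shown here. A hedged sketch of what such a module could look like, assuming cpu_bound_multiprocessing computes the same sum of squares as cpu_bound above; the find_sums_multiprocessing function and the small demo inputs belong to this sketch only:

```python
import multiprocessing


def cpu_bound_multiprocessing(number):
    # Assumed to match cpu_bound: sum of squares of 0 .. number-1.
    # It lives in an importable module so worker processes can find it.
    return sum(i * i for i in range(number))


def find_sums_multiprocessing(numbers):
    # Pool() starts one worker per CPU core by default (os.cpu_count())
    with multiprocessing.Pool() as pool:
        return pool.map(cpu_bound_multiprocessing, numbers)


if __name__ == "__main__":
    # The guard stops worker processes (spawn start method) from re-running this block
    print(find_sums_multiprocessing([10, 100, 1_000]))  # prints [285, 328350, 332833500]
```

Each number is sent to a worker process, summed there on its own core, and the results are pickled back in input order.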

In [14]:
def find_sums_multiprocessing(numbers):
    with multiprocessing.Pool() as pool:
        pool.map(cpu_bound_multiprocessing, numbers)


numbers = [5_000_000 + x for x in range(20)]

start_time = time.time()
find_sums_multiprocessing(numbers)
duration_cpu_bound_multiprocessing = time.time() - start_time
print(f"Duration {duration_cpu_bound_multiprocessing} seconds")

Duration 3.6935839653015137 seconds


Now this is what I call performance!

And a direct comparison, because... why not

In [15]:
print(f'Synchronous methods execution time: {duration_cpu_bound_synchronous} seconds \n'
      f'Threading methods execution time: {duration_cpu_bound_threads} seconds\n'
      f'Asyncio methods execution time: {duration_cpu_bound_asyncio} seconds\n'
      f'Multiprocessing methods execution time: {duration_cpu_bound_multiprocessing} seconds')


Synchronous methods execution time: 10.971799850463867 seconds 
Threading methods execution time: 10.291450500488281 seconds
Asyncio methods execution time: 11.848283052444458 seconds
Multiprocessing methods execution time: 3.6935839653015137 seconds
