**CONCURRENCY IN PYTHON PART-2**
**Learning Outcomes**


1.   Managing threads using ThreadPoolExecutor
2.   Concurrent Tasks using `asyncio`   





*   Turn on line numbers in code blocks by going to Tools -> Settings -> Editor -> **show line numbers**.
*   We will start with individual threads as we did during last week's lab.
*   The code below contains 3 threads, the main thread and threads 'ABC' and 'DEF'.
*   The main thread is made to wait for threads 'ABC' and 'DEF' using the `join` function.

**Code Block 01**

In [None]:
import time
import threading

def testfunc(name, sleepTime):
  print(f'testfunc-{name} started')
  time.sleep(sleepTime)
  print(f'testfunc-{name} ended')

if __name__ == '__main__':
  time1 = time.time()
  print('main started')
  threadA = threading.Thread(target=testfunc, args=['ABC', 2])
  threadB = threading.Thread(target=testfunc, args=['DEF', 2])
  threadA.start()
  threadB.start()
  threadA.join()
  threadB.join()
  runTime = time.time() - time1
  print(f"main ended in {round(runTime, 4)} sec")

main started
testfunc-ABC started
testfunc-DEF started
testfunc-ABC ended
testfunc-DEF ended
main ended in 2.0053 sec


**ThreadPoolExecutor**
If fine-grained control of threads is not needed, then thread management can often be delegated to Python's `ThreadPoolExecutor` class as shown below. Code block 02 uses `ThreadPoolExecutor` to run `testfunc` on two separate worker threads, one for the name 'ABC' and one for 'DEF'.

**Code Block 02**

In [3]:
import time
import threading
import concurrent.futures

def testfunc(name, sleepTime):
    time1 = time.time()
    print(f'testfunc-{name} started')
    time.sleep(sleepTime)
    print(f'testfunc-{name} ended in {time.time() - time1 : .4f}')

if __name__ == '__main__':
    time1 = time.time()
    print('main started')
    names_list = ['ABC','DEF', 'GHI']
    sleeptimes_list = [2, 2, 5]
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.map(testfunc, names_list, sleeptimes_list)
    runTime = time.time() - time1
    print(f"main ended in {round(runTime, 4)} sec")

main started
testfunc-ABC started
testfunc-DEF started
testfunc-ABC ended in  2.0001
testfunc-GHI started
testfunc-DEF ended in  2.0001
testfunc-GHI ended in  5.0001
main ended in 7.0019 sec
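
Code block 02 discards whatever `testfunc` returns. If the worker function returns a value, the executor can hand it back. The sketch below is only an illustration: `square_after` and `run_pool` are hypothetical names that are not part of the lab, and the delays are kept short so the example finishes quickly.

```python
import concurrent.futures
import time


def square_after(delay, value):
    """Hypothetical worker: sleep for `delay` seconds, then return value squared."""
    time.sleep(delay)
    return value * value


def run_pool():
    delays = [0.2, 0.1, 0.05]
    values = [2, 3, 4]
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        # map() yields results in the order of the inputs,
        # regardless of which call finishes first.
        mapped = list(executor.map(square_after, delays, values))

        # submit() returns one Future per call; as_completed() yields
        # each Future as soon as its call has finished.
        futures = [executor.submit(square_after, d, v)
                   for d, v in zip(delays, values)]
        completed = [f.result()
                     for f in concurrent.futures.as_completed(futures)]
    return mapped, completed


if __name__ == '__main__':
    mapped, completed = run_pool()
    print('map order:       ', mapped)
    print('completion order:', completed)
```

`map` is convenient when results should line up with the inputs; `submit` with `as_completed` is an option when each result should be handled as soon as it is ready.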


**Task 8.1**
Modify `testfunc` in code block 02 to print its runtime and answer **Review Quiz Q1** related to this task. Your output should be as below:



> main started

> testfunc-ABC started

> testfunc-DEF started

> testfunc-ABC ended in 2.0035

> testfunc-DEF ended in 2.0029

> main ended in 2.0067 sec



**Review Quiz Q1** Copy-paste your `testfunc()` after completing task 8.1

**Task 8.2**


1.   Modify code block 02 to add a third thread named 'GHI' with `sleeptime` equal to 5. DO NOT make any changes to the existing `ThreadPoolExecutor` and its input parameters.
2.   Think about how the runtime can be reduced to less than 7 sec.
3.  Answer **Review Quiz Q2** related to this task.



**Review Quiz Q2**

Copy-Paste your code after completing task 8.2

**Review Quiz Q3**

Why is the total runtime 7 sec instead of 5 sec? Is there any way we can reduce it to 5 sec?

**Task 8.3**

`asyncio` is another Python library for running work concurrently. Whereas `ThreadPoolExecutor` manages a pool of threads, `asyncio` employs an "Event Loop" to schedule its work. Strictly speaking, `asyncio` uses the term 'Task' instead of 'Thread', because its tasks normally all run inside a single thread. The main feature that differentiates `asyncio` from `ThreadPoolExecutor` is that individual tasks in `asyncio` return control back to the event loop only when they are willing to do so, at an `await`. This means that a task is in complete control while it is running and is not interrupted by another task in the middle of an operation. This setup makes resource sharing easier (compared to `threading`), and code between two `await` points largely does not need locks to be safe from other tasks.
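
The point about tasks not being interrupted can be checked with a small experiment. The sketch below is an illustration only: `bump` and the `state` dictionary are hypothetical names, and it assumes the code is run as a plain script (inside a notebook, `nest_asyncio.apply()` is needed first, as in the code blocks of this lab). Ten tasks update a shared counter with a read-then-write step and no lock.

```python
import asyncio


async def bump(state, times):
    """Hypothetical task: increment a shared counter `times` times."""
    for _ in range(times):
        current = state['counter']
        # There is no await between the read above and the write below,
        # so no other task can run in between and no update is lost.
        state['counter'] = current + 1
        # Hand control back to the event loop so other tasks get a turn.
        await asyncio.sleep(0)


async def main():
    state = {'counter': 0}
    # Ten tasks share the same dictionary and interleave at every await.
    await asyncio.gather(*(bump(state, 1000) for _ in range(10)))
    return state['counter']


if __name__ == '__main__':
    print(asyncio.run(main()))
```

If an `await` were placed between the read and the write, other tasks could run at that point and updates could be lost, so the guarantee only covers code between two `await` points.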

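Install the packages needed for `asyncio` using the commands below. Note that `asyncio` has been part of the Python standard library since Python 3.4, so the first command is normally unnecessary. The `nest_asyncio` package is needed because a notebook already runs its own event loop; it allows `asyncio.run()` to be called inside that loop.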

In [None]:
pip install asyncio

Collecting asyncio
  Downloading asyncio-3.4.3-py3-none-any.whl (101 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 101.8/101.8 kB 2.0 MB/s eta 0:00:00
Installing collected packages: asyncio
Successfully installed asyncio-3.4.3


In [None]:
pip install nest_asyncio



**Task 8.4**
`asyncio` uses the `async` and `await` keywords to indicate asynchronous tasks.
The `await` keyword can be thought of as the point at which a task hands control back to the event loop. Any function that uses `await` must be defined with `async def`.

The `asyncio`-based implementation of code block 02 is shown below.

**Code Block 03**

In [None]:
import asyncio
import nest_asyncio
import time

async def testfunc(name, sleeptime):
    print(f'testfunc-{name} started')
    await asyncio.sleep(sleeptime) #note this is asyncio.sleep not time.sleep()
    print(f'testfunc-{name} ended')

async def main():
    await asyncio.gather(testfunc('ABC', 2), testfunc('DEF', 2))

if __name__ == "__main__":
    time1 = time.time()
    nest_asyncio.apply()
    asyncio.run(main())
    runtime = time.time() - time1
    print(f"main executed in {runtime:0.2f} seconds.")

testfunc-ABC started
testfunc-DEF started
testfunc-ABC ended
testfunc-DEF ended
main executed in 2.00 seconds.
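
The roles of `async` and `await` used in code block 03 can also be seen in isolation. The sketch below is an illustration under assumptions: `double` is a hypothetical coroutine, and the code is meant to run as a plain script. It shows that calling an `async def` function only creates a coroutine object, that the body runs once the coroutine is awaited, and that `asyncio.gather` returns the results in the order the coroutines were passed in.

```python
import asyncio
import inspect


async def double(x):
    """Hypothetical coroutine: yield to the event loop once, then return 2*x."""
    await asyncio.sleep(0)
    return 2 * x


async def main():
    coro = double(21)                      # creates a coroutine object, runs nothing yet
    created_only = inspect.iscoroutine(coro)
    result = await coro                    # the body of double() runs here
    # gather() runs the coroutines concurrently and collects their return values.
    results = await asyncio.gather(double(1), double(2), double(3))
    return created_only, result, results


if __name__ == '__main__':
    print(asyncio.run(main()))
```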


If you have a large number of concurrent tasks, then it's better to loop over a list (or other iterable) of inputs and build a list of tasks (or jobs) that can be run asynchronously. An example is given below. Note the differences between the `main` function below and the `main` in code block 03. It may look like you are adding extra code; however, it is needed when the number of tasks is large.

**Code Block 04**

In [4]:
import asyncio
import nest_asyncio
import time

async def testfunc(name, sleeptime):
    print(f'testfunc-{name} started')
    await asyncio.sleep(sleeptime) #note this is asyncio.sleep not time.sleep()
    print(f'testfunc-{name} ended')

async def main():
    names_list = ['ABC', 'DEF', 'GHI']
    sleeptime_list = [2, 2, 5]
    jobs = []
    for name, sleeptime in zip(names_list, sleeptime_list):   # zip lets us iterate over both lists in parallel
        job = asyncio.ensure_future(testfunc(name, sleeptime))
        jobs.append(job)
    await asyncio.gather(*jobs, return_exceptions=True)

if __name__ == "__main__":
    time1 = time.time()
    nest_asyncio.apply()
    asyncio.run(main())
    runtime = time.time() - time1
    print(f"main executed in {runtime:0.2f} seconds.")

testfunc-ABC started
testfunc-DEF started
testfunc-GHI started
testfunc-ABC ended
testfunc-DEF ended
testfunc-GHI ended
main executed in 5.00 seconds.
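
Code block 04 passes `return_exceptions=True` to `asyncio.gather`. With that flag, an exception raised inside a job is not re-raised; it is returned in the results list in place of that job's return value, so a failing job can go unnoticed unless the results are inspected. The sketch below illustrates this with a hypothetical `work` coroutine that is not part of the lab.

```python
import asyncio


async def work(name, fail=False):
    """Hypothetical job: wait briefly, then either return a string or raise."""
    await asyncio.sleep(0.01)
    if fail:
        raise ValueError(f'{name} failed')
    return f'{name} ok'


async def main():
    jobs = [
        asyncio.ensure_future(work('A')),
        asyncio.ensure_future(work('B', fail=True)),
    ]
    # The ValueError from job B is returned as a result, not raised here.
    return await asyncio.gather(*jobs, return_exceptions=True)


if __name__ == '__main__':
    for outcome in asyncio.run(main()):
        if isinstance(outcome, Exception):
            print('job raised:', repr(outcome))
        else:
            print('job returned:', outcome)
```

Checking each result with `isinstance(outcome, Exception)` is one simple way to make sure errors are not silently dropped.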


**Task 8.5**

*   Modify code block 04 to add a third task named 'GHI' with `sleeptime` equal to 5
*   Answer **Review Quiz Q4** related to this task.



**Review Quiz Q4** Copy-paste your `main()` function after completing task 8.5

**Task 8.6 Sequential Web Downloads**

The code below downloads 120 webpages in a sequential manner. Run code block 05 and examine the runtime.

**Code Block 05**

In [5]:
import requests
import time


def get_site(url, session):
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")


def get_all_sites(sites):
    with requests.Session() as session:
        for url in sites:
            get_site(url, session)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "https://en.wikipedia.org/wiki/Small",
    ] * 60
    time1 = time.time()
    get_all_sites(sites)
    runtime = time.time() - time1
    print(f"Downloaded {len(sites)} in {runtime} seconds")

Read 10783 from https://www.jython.org
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 10783 from https://www.jython.org
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 10783 from https://www.jython.org
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 10783 from https://www.jython.org
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 10783 from https://www.jython.org
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 10783 from https://www.jython.org
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 10783 from https://www.jython.org
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 10783 from https://www.jython.org
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 10783 from https://www.jython.org
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 10783 from https://www.jython.org
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 10783 from https://www.jython.org
Read 51146 from https://en.wikipedia.org/wiki/Small

**Task 8.7 Threading based Web Downloads**

Web downloads are a good example of IO-bound load and can be run concurrently using `threading` and `ThreadPoolExecutor` as shown in code block 06. Run code block 06 and compare the runtime with that of code block 05. You should observe a reduction in runtime.

**Code Block 06**

In [6]:
import concurrent.futures
import requests
import threading
import time


thread_local = threading.local()


def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session


def get_site(url):
    session = get_session()
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")


def get_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(get_site, sites)


if __name__ == "__main__":
    sites = [
       "https://www.jython.org",
       "https://en.wikipedia.org/wiki/Small",
    ] * 60
    time1 = time.time()
    get_all_sites(sites)
    runtime = time.time() - time1
    print(f"Downloaded {len(sites)} in {runtime} seconds")

Read 10783 from https://www.jython.org
Read 10783 from https://www.jython.org
Read 10783 from https://www.jython.org
Read 10783 from https://www.jython.org
Read 10783 from https://www.jython.org
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 10783 from https://www.jython.org
Read 10783 from https://www.jython.org
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 10783 from https://www.jython.org
Read 10783 from https://www.jython.org
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 10783 from https://www.jython.org
Read 10783 from https://www.jython.org
Read 51146 from https://en.wikipedia.org/wiki/Small
Read 10783 from https://www.jython.org
Read 10783 from https://www.jython.orgRead 51146 from https://en
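
Code block 06 stores its session in `threading.local()`, so each worker thread creates and reuses its own session, presumably because sharing one `requests.Session` between threads is not guaranteed to be safe. The sketch below shows the same pattern without any network access. It is an illustration only: `FakeSession`, `which_session` and `sessions_per_thread` are hypothetical names introduced for this example.

```python
import concurrent.futures
import threading

thread_local = threading.local()


class FakeSession:
    """Hypothetical stand-in for requests.Session; makes no network calls."""


def get_session():
    # Attributes on a threading.local() object are private to each thread,
    # so every worker thread creates its own session exactly once.
    if not hasattr(thread_local, 'session'):
        thread_local.session = FakeSession()
    return thread_local.session


def which_session(_):
    # Report which thread ran this call and which session object it used.
    return threading.get_ident(), id(get_session())


def sessions_per_thread(n_tasks=50, max_workers=5):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        pairs = list(executor.map(which_session, range(n_tasks)))
    seen = {}
    for thread_id, session_id in pairs:
        seen.setdefault(thread_id, set()).add(session_id)
    return seen


if __name__ == '__main__':
    for thread_id, session_ids in sessions_per_thread().items():
        print(f'thread {thread_id} used {len(session_ids)} session(s)')
```

Each thread reports exactly one session no matter how many tasks it handled, which is the behaviour `get_session()` in code block 06 relies on.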

**Task 8.8 asyncio based Web Downloads**

1.   You will now perform web downloads using `asyncio`.
2.   You will need to install the `aiohttp` library to make it work. Installation can be done using `pip install aiohttp` as shown below.




In [None]:
pip install aiohttp

3.   Complete the definition of the `get_all_sites()` function given below. Until the function is completed, running this code will throw an **error**. You can complete the function definition by using the `main()` function in code block 04 as a reference.

4. HINT: Each job needs to contain `asyncio.ensure_future` with `get_site()` as the argument.

In [7]:
import asyncio
import nest_asyncio
import time
import aiohttp


async def get_site(session, url):
    async with session.get(url) as response:
        print("Read {0} from {1}".format(response.content_length, url))


async def get_all_sites(sites):
    async with aiohttp.ClientSession() as session:
        jobs = []
        for site in sites:
            job = asyncio.ensure_future(get_site(session, site))
            jobs.append(job)
        await asyncio.gather(*jobs, return_exceptions=True)
            
if __name__ == "__main__":
    sites = [
       "https://www.jython.org",
       "https://en.wikipedia.org/wiki/Small",
    ] * 60
    time1 = time.time()
    nest_asyncio.apply()
    asyncio.run(get_all_sites(sites))  # start an event loop and run the async tasks
    runtime = time.time() - time1
    print(f"Downloaded {len(sites)} sites in {runtime} seconds")

Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 from https://www.jython.org
Read 3717 fr

**Review Quiz Q5** Copy-paste your `get_all_sites` function after completing task 8.8