# Concurrent downloads from the web

When dealing with a web server, we should always prefer to do things concurrently.

Next we will see an implementation of a script that downloads 20 flags, first sequentially and then using a thread pool. In the runs shown below, executing the downloads concurrently cut the elapsed time by a factor of about 6 (27.27s versus 4.31s).

In [2]:
# Sequential script

import time
from pathlib import Path
from typing import Callable

import httpx

# Country codes of the flags to download: the literal holds 20
# space-separated codes and .split() turns it into a list of strings.
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

# URL prefix; get_flag() appends '/<cc>/<cc>.gif' to it.
BASE_URL = 'https://www.fluentpython.com/data/flags'
# Directory that save_flag() writes into; main() creates it if missing.
DEST_DIR = Path('downloaded')

def save_flag(img: bytes, filename: str) -> None:
    """Write the raw image bytes ``img`` to ``DEST_DIR / filename``."""
    target = DEST_DIR / filename
    target.write_bytes(img)

def get_flag(cc: str) -> bytes:
    """Download the flag image for country code ``cc`` and return its bytes.

    The whole URL is lowercased before the request is made.
    ``raise_for_status()`` turns an HTTP error response into an exception
    instead of returning the error page as image data.
    """
    flag_url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    response = httpx.get(flag_url, timeout=6.1, follow_redirects=True)
    response.raise_for_status()
    return response.content

def download_many(cc_list: list[str]) -> int:
    """Download and save the flags one after another, in sorted order.

    Each country code is printed as soon as its flag has been saved.
    Returns the number of codes in ``cc_list``.
    """
    ordered_codes = sorted(cc_list)
    for code in ordered_codes:
        save_flag(get_flag(code), f'{code}.gif')
        print(code, end=' ', flush=True)
    return len(cc_list)

def main(downloader: Callable[[list[str]], int]) -> None:
    """Run ``downloader`` over ``POP20_CC`` and report the elapsed time.

    ``downloader`` receives the list of country codes and must return the
    number of downloads, which is echoed in the final report line.
    """
    DEST_DIR.mkdir(exist_ok=True)  # tolerate a directory left by a prior run
    started = time.perf_counter()
    count = downloader(POP20_CC)
    elapsed = time.perf_counter() - started
    print(f'\n{count} downloads in {elapsed:.2f}s')

# Time the sequential download_many() when this cell/script is executed.
if __name__ == '__main__':
    main(download_many)

BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 downloads in 27.27s


In [7]:
# Using thread pool with futures module

import time
from pathlib import Path
from typing import Callable
from concurrent import futures
import httpx

# The 20 country codes whose flags are fetched, as a list of strings.
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

# Prefix of every flag URL; get_flag() adds '/<cc>/<cc>.gif'.
BASE_URL = 'https://www.fluentpython.com/data/flags'
# Output directory for save_flag(); main() creates it when absent.
DEST_DIR = Path('downloaded')

def save_flag(img: bytes, filename: str) -> None:
    """Store the image bytes in the file ``filename`` inside ``DEST_DIR``."""
    destination = DEST_DIR / filename
    destination.write_bytes(img)

def get_flag(cc: str) -> bytes:
    """Fetch the flag image for ``cc`` and return the response body.

    The URL is built from ``BASE_URL`` and lowercased. Redirects are
    followed, and ``raise_for_status()`` is called so an HTTP error
    response is not returned as if it were image data.
    """
    address = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    reply = httpx.get(address, timeout=6.1, follow_redirects=True)
    reply.raise_for_status()
    return reply.content

def download_one(cc: str):
    """Download and save the flag for ``cc``, print the code, and return it.

    This is the unit of work handed to each worker thread.
    """
    save_flag(get_flag(cc), f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

def download_many(cc_list: list[str]) -> int:
    """Download all flags concurrently in a thread pool; return how many.

    ``download_one`` is mapped over the sorted country codes with the
    executor's default number of workers.
    """
    with futures.ThreadPoolExecutor() as executor:
        results = executor.map(download_one, sorted(cc_list))
    # Leaving the with block waits for the workers; consuming the iterator
    # afterwards re-raises any exception raised inside download_one.
    return sum(1 for _ in results)

def main(downloader: Callable[[list[str]], int]) -> None:
    """Time one call of ``downloader`` on ``POP20_CC`` and print a summary.

    ``downloader`` takes the country-code list and returns the number of
    flags it downloaded.
    """
    DEST_DIR.mkdir(exist_ok=True)  # no error if the directory exists
    start = time.perf_counter()
    count = downloader(POP20_CC)
    elapsed = time.perf_counter() - start
    print(f'\n{count} downloads in {elapsed:.2f}s')

# Time the thread-pool download_many() when this cell/script is executed.
if __name__ == '__main__':
    main(download_many)

DEBD  EG ET ID CDFR  IR JP BR IN CN NG MX PH TR US VN PK RU 
20 downloads in 4.31s


# Where are the futures?

A `Future` object represents a deferred computation that may or may not have completed. Futures are designed so that they can be placed in queues, checked for completion, and have their results (or exceptions) retrieved once they are available.

A `Future` object is never created by the user; it is created by the framework: `concurrent.futures` for threading or `asyncio` for coroutines.

The application code should not modify the `Future` state. The framework will change it when the processing is done.

In both frameworks, a `Future` has a `.done()` method that returns a boolean telling whether the future's processing has finished, and an `.add_done_callback()` method that registers a callable to be run when the future's processing is done.

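There's also the `.result()` method, which returns the result of the callable, or re-raises the exception it raised, once the execution is done. However, calling this method behaves differently in the two frameworks. For a `concurrent.futures.Future`, a call to `.result()` blocks the calling thread until the result is ready. An optional timeout may be provided, in which case a `TimeoutError` is raised if the callable has not returned in time. For `asyncio`, the book did not say what happens; according to the `asyncio` documentation, `asyncio.Future.result()` does not block and takes no timeout: it raises `InvalidStateError` if the result is not available yet, so the usual way to get the result is to `await` the future.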

We can refactor the example above to use the `as_completed()` function, which receives an iterable of futures and returns an iterator that yields each `Future` object as it finishes executing:

```python
def download_many(cc_list: list[str]) -> int:
    """Download the flags with explicit futures and return how many finished.

    Every country code is submitted to a 3-worker thread pool, and the
    futures are then consumed with ``as_completed()``, i.e. in completion
    order rather than submission order.

    Returns the number of futures whose result was retrieved (0 for an
    empty ``cc_list``). ``future.result()`` re-raises any exception raised
    by ``download_one``.
    """
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        # The list must actually be created: an annotation alone binds nothing.
        to_do: list[futures.Future] = []
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do.append(future)
            print(f'Scheduled for {cc}: {future}')

        count = 0  # stays 0 when there is nothing to download
        # Start at 1 so the final value of count is the number of results.
        for count, future in enumerate(futures.as_completed(to_do), 1):
            res: str = future.result()
            print(f'{future} result: {res!r}')
    return count
```

Question: what's the difference between using `ThreadPoolExecutor.map()` and `futures.as_completed()`?

The difference is that `ThreadPoolExecutor.map()` returns the results in the order in which the calls were submitted, waiting for each result in turn even if later ones are already done, while `futures.as_completed()` yields each future as soon as it is done, regardless of the order in which it was submitted.



In [9]:
# Experimenting with Executor.map

from time import sleep, strftime
from concurrent import futures

def display(*args):
    """Print ``args`` separated by spaces, prefixed with an [HH:MM:SS] stamp."""
    timestamp = strftime('[%H:%M:%S]')
    # Join the arguments the way print(*args) would, so the line is the
    # same as printing the stamp with end=' ' followed by the arguments.
    print(timestamp, ' '.join(map(str, args)))

def loiter(n):
    """Sleep for ``n`` seconds, reporting before and after; return ``n * 10``.

    Each message is indented by ``n`` tabs so that the output of different
    calls lands in different columns.
    """
    indent = '\t' * n
    msg = '{}loiter({}): doing nothing for {}s'
    display(msg.format(indent, n, n))
    sleep(n)
    msg = '{}loiter({}): done'
    display(msg.format(indent, n))
    return n * 10

def main():
    """Experiment with ``Executor.map`` using five ``loiter`` calls on 3 workers.

    ``executor.map`` returns a result iterator straight away; it is displayed
    before any result is requested. The results are then retrieved one by
    one, in the order the calls were submitted.
    """
    display('Script starting')
    # Manage the pool with a with block so its worker threads are shut down
    # once all results have been consumed, instead of being left running.
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        results = executor.map(loiter, range(5))
        display('results: ', results)
        display('Waiting for individual results: ')
        for i, result in enumerate(results):
            display(f'result {i}: {result}')

# Run the Executor.map experiment.
main()


[21:36:46] Script starting
[21:36:46] loiter(0): doing nothing for 0s
[21:36:46] loiter(0): done
[21:36:46] 	loiter(1): doing nothing for 1s
[21:36:46] 		loiter(2): doing nothing for 2s
[21:36:46] 			loiter(3): doing nothing for 3s
[21:36:46] results:  <generator object Executor.map.<locals>.result_iterator at 0x00000214EB922D40>
[21:36:46] Waiting for individual results: 
[21:36:46] result 0: 0
[21:36:49] 	loiter(1): done
[21:36:49] 				loiter(4): doing nothing for 4s
[21:36:49] result 1: 10
[21:36:52] 		loiter(2): done
[21:36:52] result 2: 20
[21:36:55] 			loiter(3): done
[21:36:55] result 3: 30
[21:37:01] 				loiter(4): done
[21:37:01] result 4: 40


# Improving the flag downloader

In [21]:
# Sequential code snippets
from enum import Enum
from collections import Counter
import tqdm
from http import HTTPStatus
import httpx

class DownloadStatus(Enum):
    """Outcome of one flag download, used as the key of the result counters."""

    OK = 1
    NOT_FOUND = 2
    ERROR = 3

def get_flag(base_url: str, cc: str) -> bytes:
    """Download the flag image for ``cc`` from ``base_url``; return its bytes.

    The full URL is lowercased before the request. An HTTP error response
    is raised as an exception by ``raise_for_status()``; this function does
    not handle it, so it propagates to the caller.
    """
    flag_url = f'{base_url}/{cc}/{cc}.gif'.lower()
    response = httpx.get(flag_url, timeout=3.1, follow_redirects=True)
    response.raise_for_status()
    return response.content

def download_one(cc: str, base_url: str, verbose: bool = False) -> DownloadStatus:
    """Download and save one flag, reporting the outcome as a status.

    Returns ``DownloadStatus.OK`` after the flag has been saved, or
    ``DownloadStatus.NOT_FOUND`` when the server answers 404. Any other
    ``httpx.HTTPStatusError`` is re-raised for the caller to handle. When
    ``verbose`` is true, the country code and a short message are printed.
    """
    try:
        image = get_flag(base_url, cc)
    except httpx.HTTPStatusError as exc:
        res = exc.response
        if res.status_code != HTTPStatus.NOT_FOUND:
            raise  # only 404 is handled here; other HTTP errors propagate
        status = DownloadStatus.NOT_FOUND
        msg = f'not found: {res.url}'
    else:
        # Save only when the download succeeded.
        save_flag(image, f'{cc}.gif')
        status = DownloadStatus.OK
        msg = 'OK'

    if verbose:
        print(cc, msg)

    return status

def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool) -> Counter[DownloadStatus]:
    """Download the flags sequentially and tally the outcome of each one.

    The country codes are processed in sorted order. When ``verbose`` is
    false a tqdm progress bar is shown; when it is true, error messages are
    printed per country code instead.

    HTTP status errors and request errors raised by ``download_one`` are
    counted as ``DownloadStatus.ERROR``. A ``KeyboardInterrupt`` stops the
    loop early; the statuses collected so far are still returned.

    Returns a ``Counter`` mapping each ``DownloadStatus`` to the number of
    downloads that ended with it.
    """
    counter: Counter[DownloadStatus] = Counter()
    cc_iter = sorted(cc_list)
    if not verbose:
        cc_iter = tqdm.tqdm(cc_iter)
    for cc in cc_iter:
        try:
            status = download_one(cc, base_url, verbose)
        except httpx.HTTPStatusError as exc:
            error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
            error_msg = error_msg.format(resp = exc.response)
        except httpx.RequestError as exc:
            error_msg = f'{exc} {type(exc)}'.strip()
        except KeyboardInterrupt:
            break
        else:
            error_msg = ''

        # A non-empty message means download_one raised, so no status was set.
        if error_msg:
            status = DownloadStatus.ERROR
        counter[status] += 1
        if verbose and error_msg:
            print(f'{cc} error: {error_msg}')

    # Return the tally promised by the annotation; the original fell off the
    # end of the function and returned None.
    return counter

# Sequential run; verbose=False, so a tqdm progress bar is shown.
download_many(POP20_CC, BASE_URL, False)

100%|██████████| 20/20 [00:12<00:00,  1.63it/s]


In [23]:
# Using futures.as_completed

from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed

import httpx
import tqdm

# Number of worker threads passed as concur_req in the call at the end of
# this cell.
DEFAULT_CONCUR_REQ = 30
# Not referenced in the visible code; presumably an upper bound for
# concur_req -- TODO confirm.
MAX_CONCUR_REQ = 1000

def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool,
                  concur_req: int) -> Counter[DownloadStatus]:
    """Download the flags concurrently and tally the outcome of each one.

    One ``download_one`` call per country code is submitted to a thread
    pool with ``concur_req`` workers, and the futures are processed in
    completion order. When ``verbose`` is false a tqdm progress bar is
    shown; when it is true, error messages are printed per country code.

    HTTP status errors and request errors are counted as
    ``DownloadStatus.ERROR``; a ``KeyboardInterrupt`` stops the processing
    of results early.

    Returns a ``Counter`` mapping each ``DownloadStatus`` to its count.
    """
    counter: Counter[DownloadStatus] = Counter()
    with ThreadPoolExecutor(max_workers=concur_req) as executor:
        # Futures are hashable, so each one can serve as the key that maps
        # back to its country code for error reporting.
        to_do_map = {
            executor.submit(download_one, cc, base_url, verbose): cc
            for cc in sorted(cc_list)
        }
        done_iter = as_completed(to_do_map)
        if not verbose:
            # as_completed() has no len(), so tqdm needs the total explicitly.
            done_iter = tqdm.tqdm(done_iter, total=len(cc_list))
        for future in done_iter:
            try:
                status = future.result()
            except httpx.HTTPStatusError as exc:
                error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
                error_msg = error_msg.format(resp=exc.response)
            except httpx.RequestError as exc:
                error_msg = f'{exc} {type(exc)}'.strip()
            except KeyboardInterrupt:
                break
            else:
                error_msg = ''

            # A non-empty message means result() raised, so no status was set.
            if error_msg:
                status = DownloadStatus.ERROR
            counter[status] += 1
            if error_msg and verbose:
                cc = to_do_map[future]
                print(f'{cc} error: {error_msg}')
    return counter

# Concurrent run with DEFAULT_CONCUR_REQ workers; verbose=False shows a
# tqdm progress bar.
download_many(POP20_CC, BASE_URL, False, DEFAULT_CONCUR_REQ)



100%|██████████| 20/20 [00:03<00:00,  6.37it/s]


Counter({<DownloadStatus.OK: 1>: 20})