Three simple programs that download images of 20 country flags from the web. The first one, flags.py, runs sequentially: it requests the next image only after the previous one has been downloaded and saved locally. The other two scripts download concurrently: they request several images practically at the same time and save each one as it arrives. The flags_threadpool.py script uses the concurrent.futures package, while flags_asyncio.py uses asyncio. Source: Fluent Python, 2nd ed., chapter 20 — https://learning.oreilly.com/library/view/fluent-python-2nd/9781492056348/ch20.html#idm46582389726560

A synchronous, sequential download script:


In [2]:
import time
from pathlib import Path
from typing import Callable

import httpx

# The 20 country codes whose flags are downloaded, in their original listing order.
POP20_CC = 'CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR'.split()

BASE_URL = 'https://www.fluentpython.com/data/flags'  # remote root of the flag images
DEST_DIR = Path('downloaded')  # relative folder (resolved against the CWD) for saved flags

def save_flag(img: bytes, filename: str) -> None:
    """Write the raw bytes *img* to ``DEST_DIR / filename``, replacing any existing file."""
    target = DEST_DIR.joinpath(filename)
    target.write_bytes(img)

def get_flag(cc: str) -> bytes:
    """Download the GIF flag for country code *cc* and return its raw bytes.

    The image is fetched from ``{BASE_URL}/<cc>/<cc>.gif`` with the country
    code lowercased. Only the code is lowercased, not BASE_URL: URL paths are
    case-sensitive, so lowercasing the whole URL would corrupt a base URL that
    contains uppercase characters.

    Raises:
        httpx.HTTPStatusError: if the final response (after redirects) has a
            4xx/5xx status.
        httpx.HTTPError subclasses: on network failures, including exceeding
            the 6.1 s timeout.
    """
    code = cc.lower()
    url = f'{BASE_URL}/{code}/{code}.gif'
    resp = httpx.get(url, timeout=6.1,
                     follow_redirects=True)
    resp.raise_for_status()  # turn HTTP error statuses into an exception
    return resp.content

def download_many(cc_list: list[str]) -> int:
    """Download the flags for *cc_list* one at a time, in sorted order.

    Each flag is saved as ``<cc>.gif`` and its code is printed as soon as it
    has been written. Returns the number of country codes requested.
    """
    ordered = sorted(cc_list)
    for code in ordered:
        save_flag(get_flag(code), f'{code}.gif')
        print(code, end=' ', flush=True)
    return len(cc_list)

def main(downloader: Callable[[list[str]], int]) -> None:
    """Run *downloader* over POP20_CC and report how many flags it fetched and how long it took.

    DEST_DIR is created first (no error if it already exists). Timing uses
    ``time.perf_counter`` and covers only the *downloader* call.
    """
    DEST_DIR.mkdir(exist_ok=True)
    start = time.perf_counter()
    total = downloader(POP20_CC)
    took = time.perf_counter() - start
    print(f'\n{total} downloads in {took:.2f}s')

if __name__ == '__main__':
    # Run the sequential benchmark only when executed directly, not on import.
    main(downloader=download_many)

BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 downloads in 7.42s


Threaded download script using futures.ThreadPoolExecutor and executor.map:

In [3]:
from concurrent import futures

# from flags import save_flag, get_flag, main

###### Same as previous ########

# The 20 country codes whose flags are downloaded, in their original listing order.
POP20_CC = 'CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR'.split()

BASE_URL = 'https://www.fluentpython.com/data/flags'  # remote root of the flag images
DEST_DIR = Path('downloaded')  # relative folder (resolved against the CWD) for saved flags

def save_flag(img: bytes, filename: str) -> None:
    """Write the raw bytes *img* to ``DEST_DIR / filename``, replacing any existing file."""
    target = DEST_DIR.joinpath(filename)
    target.write_bytes(img)

def get_flag(cc: str) -> bytes:
    """Download the GIF flag for country code *cc* and return its raw bytes.

    The image is fetched from ``{BASE_URL}/<cc>/<cc>.gif`` with the country
    code lowercased. Only the code is lowercased, not BASE_URL: URL paths are
    case-sensitive, so lowercasing the whole URL would corrupt a base URL that
    contains uppercase characters.

    Raises:
        httpx.HTTPStatusError: if the final response (after redirects) has a
            4xx/5xx status.
        httpx.HTTPError subclasses: on network failures, including exceeding
            the 6.1 s timeout.
    """
    code = cc.lower()
    url = f'{BASE_URL}/{code}/{code}.gif'
    resp = httpx.get(url, timeout=6.1,
                     follow_redirects=True)
    resp.raise_for_status()  # turn HTTP error statuses into an exception
    return resp.content

def main(downloader: Callable[[list[str]], int]) -> None:
    """Run *downloader* over POP20_CC and report how many flags it fetched and how long it took.

    DEST_DIR is created first (no error if it already exists). Timing uses
    ``time.perf_counter`` and covers only the *downloader* call.
    """
    DEST_DIR.mkdir(exist_ok=True)
    start = time.perf_counter()
    total = downloader(POP20_CC)
    took = time.perf_counter() - start
    print(f'\n{total} downloads in {took:.2f}s')

###### Above is same as previous ########


def download_one(cc: str):
    """Fetch and save the flag for *cc*, echo the code, and return *cc* unchanged."""
    save_flag(get_flag(cc), f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

def download_many(cc_list: list[str]) -> int:
    with futures.ThreadPoolExecutor() as executor:  
        res = executor.map(download_one, sorted(cc_list))

    return len(list(res))  

if __name__ == '__main__':
    # Run the thread-pool benchmark only when executed directly, not on import.
    main(downloader=download_many)


BR CD FREG  ET CN DE BD IN MX ID IR PH JP NG PK US TR RU VN 
20 downloads in 1.83s


Another threaded implementation, still built on the concurrent.futures package. This time it uses the lower-level executor.submit and futures.as_completed instead of executor.map, so each Future can be inspected when it is scheduled and when it completes. The package also makes it easy to switch between threads (futures.ThreadPoolExecutor) and processes (futures.ProcessPoolExecutor).

In this case there is no advantage to using ProcessPoolExecutor, because the task is I/O-bound, not CPU-bound.

In [8]:
from concurrent import futures

# from flags import save_flag, get_flag, main

###### Same as previous ########

# The 20 country codes whose flags are downloaded, in their original listing order.
POP20_CC = 'CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR'.split()

BASE_URL = 'https://www.fluentpython.com/data/flags'  # remote root of the flag images
DEST_DIR = Path('downloaded')  # relative folder (resolved against the CWD) for saved flags

def save_flag(img: bytes, filename: str) -> None:
    """Write the raw bytes *img* to ``DEST_DIR / filename``, replacing any existing file."""
    target = DEST_DIR.joinpath(filename)
    target.write_bytes(img)

def get_flag(cc: str) -> bytes:
    """Download the GIF flag for country code *cc* and return its raw bytes.

    The image is fetched from ``{BASE_URL}/<cc>/<cc>.gif`` with the country
    code lowercased. Only the code is lowercased, not BASE_URL: URL paths are
    case-sensitive, so lowercasing the whole URL would corrupt a base URL that
    contains uppercase characters.

    Raises:
        httpx.HTTPStatusError: if the final response (after redirects) has a
            4xx/5xx status.
        httpx.HTTPError subclasses: on network failures, including exceeding
            the 6.1 s timeout.
    """
    code = cc.lower()
    url = f'{BASE_URL}/{code}/{code}.gif'
    resp = httpx.get(url, timeout=6.1,
                     follow_redirects=True)
    resp.raise_for_status()  # turn HTTP error statuses into an exception
    return resp.content

def main(downloader: Callable[[list[str]], int]) -> None:
    """Run *downloader* over POP20_CC and report how many flags it fetched and how long it took.

    DEST_DIR is created first (no error if it already exists). Timing uses
    ``time.perf_counter`` and covers only the *downloader* call.
    """
    DEST_DIR.mkdir(exist_ok=True)
    start = time.perf_counter()
    total = downloader(POP20_CC)
    took = time.perf_counter() - start
    print(f'\n{total} downloads in {took:.2f}s')

def download_one(cc: str):
    """Fetch and save the flag for *cc*, echo the code, and return *cc* unchanged."""
    save_flag(get_flag(cc), f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

###### Above is same as previous ########


def download_many(cc_list: list[str]) -> int:
    """Download the flags for *cc_list* with a thread pool, showing each Future's life cycle.

    Every task is submitted explicitly with ``executor.submit`` so its Future
    can be printed right after scheduling. Results are then consumed in
    completion order via ``futures.as_completed``.

    Returns the number of downloads whose results were retrieved (0 for an
    empty *cc_list*). If a download raised, ``future.result()`` re-raises
    that exception here.
    """
    # Must be bound before the loop: with an empty cc_list, as_completed
    # yields nothing and the enumerate loop never assigns count.
    count = 0
    with futures.ThreadPoolExecutor(max_workers=None) as executor:
        to_do: list[futures.Future] = []
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do.append(future)
            print(f'Scheduled for {cc}: {future}')

        for count, future in enumerate(futures.as_completed(to_do), 1):
            res: str = future.result()
            print(f'{future} result: {res!r}')

    return count

if __name__ == '__main__':
    # Run the submit/as_completed benchmark only when executed directly, not on import.
    main(downloader=download_many)


Scheduled for BD: <Future at 0x10c983040 state=running>
Scheduled for BR: <Future at 0x10c9a4250 state=running>
Scheduled for CD: <Future at 0x10c9819f0 state=running>
Scheduled for CN: <Future at 0x10c981630 state=running>
Scheduled for DE: <Future at 0x10c983580 state=running>
Scheduled for EG: <Future at 0x10c980040 state=running>
Scheduled for ET: <Future at 0x10c925630 state=running>
Scheduled for FR: <Future at 0x10c924850 state=running>
Scheduled for ID: <Future at 0x10c9249d0 state=running>
Scheduled for IN: <Future at 0x10c926c80 state=running>
Scheduled for IR: <Future at 0x10c9242e0 state=pending>
Scheduled for JP: <Future at 0x10c924190 state=pending>
Scheduled for MX: <Future at 0x10c927f40 state=pending>
Scheduled for NG: <Future at 0x10c925480 state=pending>
Scheduled for PH: <Future at 0x10c926500 state=pending>
Scheduled for PK: <Future at 0x10c925090 state=pending>
Scheduled for RU: <Future at 0x10c927c10 state=pending>
Scheduled for TR: <Future at 0x10c924550 state=p

<Future at 0x10c925090 state=finished returned str> result: 'PK'
PHRU  <Future at 0x10c927c10 state=finished returned str> result: 'RU'
<Future at 0x10c926500 state=finished returned str> result: 'PH'
US <Future at 0x10c925a50 state=finished returned str> result: 'US'
TR NG <Future at 0x10c924550 state=finished returned str> result: 'TR'
<Future at 0x10c925480 state=finished returned str> result: 'NG'
VN <Future at 0x10c9260e0 state=finished returned str> result: 'VN'

20 downloads in 1.52s


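Here is an example of a CPU-bound task using ProcessPoolExecutor. It imports is_prime and NUMBERS from a local primes module. ProcessPoolExecutor requires the __main__ module to be importable by its worker processes, so this code is meant to be saved and run as a script rather than executed in an interactive session.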

In [None]:
import sys
from concurrent import futures
from time import perf_counter
from typing import NamedTuple

from primes import is_prime, NUMBERS

class PrimeResult(NamedTuple):
    """Outcome of one primality check, as built by ``check``."""

    n: int           # the number that was tested
    flag: bool       # the value returned by is_prime(n)
    elapsed: float   # seconds spent in is_prime(n), measured with perf_counter

def check(n: int) -> PrimeResult:
    """Test *n* with ``is_prime`` and time the call.

    Returns ``PrimeResult(n, is_prime(n), seconds_taken)``.
    """
    started = perf_counter()
    verdict = is_prime(n)
    duration = perf_counter() - started
    return PrimeResult(n, verdict, duration)

def main() -> None:
    """Check every number in NUMBERS for primality in a process pool and report timings.

    An optional first command-line argument sets the number of worker
    processes; otherwise ProcessPoolExecutor picks its default. Numbers are
    checked in descending order. ``executor.map`` yields results in input
    order, and one line is printed per number, followed by the total
    wall-clock time.

    Raises:
        ValueError: from ProcessPoolExecutor if the requested worker count
            is not positive.
    """
    workers = None  # None -> let ProcessPoolExecutor choose its default size
    if len(sys.argv) >= 2:
        try:
            workers = int(sys.argv[1])
        except ValueError:
            # A non-numeric argument used to crash here. For example, a Jupyter
            # kernel typically has '-f <connection-file>' in sys.argv. Report
            # it and fall back to the default worker count.
            print(f'Ignoring non-integer argument {sys.argv[1]!r}; '
                  'using the default number of workers.', file=sys.stderr)

    # Creating the pool in the with-statement guarantees it is shut down even
    # if anything below raises.
    with futures.ProcessPoolExecutor(workers) as executor:
        # _max_workers is private, but no public API reports the size the
        # executor actually chose when workers is None.
        actual_workers = executor._max_workers  # type: ignore

        print(f'Checking {len(NUMBERS)} numbers with {actual_workers} processes:')

        t0 = perf_counter()

        numbers = sorted(NUMBERS, reverse=True)
        for n, prime, elapsed in executor.map(check, numbers):
            label = 'P' if prime else ' '
            print(f'{n:16}  {label} {elapsed:9.6f}s')

    # Named 'total' rather than 'time' to avoid shadowing the time module.
    total = perf_counter() - t0
    print(f'Total time: {total:.2f}s')

if __name__ == '__main__':
    # Entry point: run the process-pool prime check only when executed directly.
    main()