In [1]:
import sys
    # caution: path[0] is reserved for script path (or '' in REPL)
sys.path.insert(1, 'D:/books/python/0.   Fluent Python, 2nd Edition/example-code-2e/20-executors/getflags/')
sys.path

['D:\\books\\python\\0.   Fluent Python, 2nd Edition',
 'D:/books/python/0.   Fluent Python, 2nd Edition/example-code-2e/20-executors/getflags/',
 'C:\\Users\\lidan\\miniconda3\\python38.zip',
 'C:\\Users\\lidan\\miniconda3\\DLLs',
 'C:\\Users\\lidan\\miniconda3\\lib',
 'C:\\Users\\lidan\\miniconda3',
 '',
 'C:\\Users\\lidan\\AppData\\Roaming\\Python\\Python38\\site-packages',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\magic_impute-2.0.4-py3.8.egg',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\seqc-0.2.0-py3.8.egg',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\weasyprint-56.1-py3.8.egg',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\cairocffi-1.3.0-py3.8.egg',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\win32',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\win32\\lib',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\Pythonwin']

In [4]:
#!/usr/bin/env python3

"""Download flags of top 20 countries by population

Sequential version

Sample runs (first with new domain, so no caching ever)::

    $ ./flags.py
    BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
    20 downloads in 26.21s
    $ ./flags.py
    BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
    20 downloads in 14.57s


"""

# tag::FLAGS_PY[]
import time
from pathlib import Path
from typing import Callable
from typing import List

import httpx  # <1>

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()  # <2>

BASE_URL = 'https://www.fluentpython.com/data/flags'  # <3>
DEST_DIR = Path('downloaded')                         # <4>

def save_flag(img: bytes, filename: str) -> None:     # <5>
    (DEST_DIR / filename).write_bytes(img)

def get_flag(cc: str) -> bytes:  # <6>
    url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    resp = httpx.get(url, timeout=6.1,       # <7>
                     follow_redirects=True)  # <8>
    resp.raise_for_status()  # <9>
    return resp.content

def download_many(cc_list: List[str]) -> int:  # <10>
    for cc in sorted(cc_list):                 # <11>
        image = get_flag(cc)
        save_flag(image, f'{cc}.gif')
        print(cc, end=' ', flush=True)         # <12>
    return len(cc_list)

def main(downloader: Callable[[List[str]], int]) -> None:  # <13>
    DEST_DIR.mkdir(exist_ok=True)                          # <14>
    t0 = time.perf_counter()                               # <15>
    count = downloader(POP20_CC)
    elapsed = time.perf_counter() - t0
    print(f'\n{count} downloads in {elapsed:.2f}s')

if __name__ == '__main__':
    main(download_many)     # <16>
# end::FLAGS_PY[]


BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 downloads in 6.37s


In [5]:
POP20_CC

['CN',
 'IN',
 'US',
 'ID',
 'BR',
 'PK',
 'NG',
 'BD',
 'RU',
 'JP',
 'MX',
 'PH',
 'VN',
 'ET',
 'EG',
 'DE',
 'IR',
 'TR',
 'CD',
 'FR']

In [1]:
!python --version

Python 3.8.13


In [6]:
import sys
    # caution: path[0] is reserved for script path (or '' in REPL)
sys.path.insert(1, 'D:/books/python/0.   Fluent Python, 2nd Edition/example-code-2e/20-executors/getflags/')
sys.path

['D:\\books\\python\\0.   Fluent Python, 2nd Edition',
 'D:/books/python/0.   Fluent Python, 2nd Edition/example-code-2e/20-executors/getflags/',
 'D:/books/python/0.   Fluent Python, 2nd Edition/example-code-2e/20-executors/getflags/',
 'C:\\Users\\lidan\\miniconda3\\python38.zip',
 'C:\\Users\\lidan\\miniconda3\\DLLs',
 'C:\\Users\\lidan\\miniconda3\\lib',
 'C:\\Users\\lidan\\miniconda3',
 '',
 'C:\\Users\\lidan\\AppData\\Roaming\\Python\\Python38\\site-packages',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\magic_impute-2.0.4-py3.8.egg',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\seqc-0.2.0-py3.8.egg',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\weasyprint-56.1-py3.8.egg',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\cairocffi-1.3.0-py3.8.egg',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\win32',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\win32\\lib',
 'C:\\Users\\lidan\\miniconda3\\l

In [8]:
#!/usr/bin/env python3

"""Download flags of top 20 countries by population

ThreadPoolExecutor version

Sample run::

    $ python3 flags_threadpool.py
    DE FR BD CN EG RU IN TR VN ID JP BR NG MX PK ET PH CD US IR
    20 downloads in 0.35s

"""

# tag::FLAGS_THREADPOOL[]
from concurrent import futures
from typing import List

from flags import save_flag, get_flag, main  # <1>

def download_one(cc: str):  # <2>
    image = get_flag(cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

def download_many(cc_list: List[str]) -> int:
    with futures.ThreadPoolExecutor() as executor:         # <3>
        res = executor.map(download_one, sorted(cc_list))  # <4>

    return len(list(res))                                  # <5>

if __name__ == '__main__':
    main(download_many)  # <6>
# end::FLAGS_THREADPOOL[]


IN PK JP MX CD DE EG BR NG CN ET ID BD IR RU US VN TR FR PH 
20 downloads in 0.84s


In [11]:
#!/usr/bin/env python3

"""Download flags of top 20 countries by population

ThreadPoolExecutor example with ``as_completed``.
"""
from concurrent import futures
from typing import List

from flags import main
from flags_threadpool import download_one


# tag::FLAGS_THREADPOOL_AS_COMPLETED[]
def download_many(cc_list: List[str]) -> int:
    cc_list = cc_list[:5]  # <1>
    with futures.ThreadPoolExecutor(max_workers=3) as executor:  # <2>
        to_do: List[futures.Future] = []
        for cc in sorted(cc_list):  # <3>
            future = executor.submit(download_one, cc)  # <4>
            to_do.append(future)  # <5>
            print(f'Scheduled for {cc}: {future}')  # <6>

        for count, future in enumerate(futures.as_completed(to_do), 1):  # <7>
            res: str = future.result()  # <8>
            print(f'{future} result: {res!r}')  # <9>

    return count
# end::FLAGS_THREADPOOL_AS_COMPLETED[]

if __name__ == '__main__':
    main(download_many)


Scheduled for BR: <Future at 0x14d34f84d60 state=running>
Scheduled for CN: <Future at 0x14d39e1a0a0 state=running>
Scheduled for ID: <Future at 0x14d34fad610 state=running>
Scheduled for IN: <Future at 0x14d34fadcd0 state=pending>
Scheduled for US: <Future at 0x14d34fa5700 state=pending>
BR CN <Future at 0x14d34f84d60 state=finished returned str> result: 'BR'
<Future at 0x14d39e1a0a0 state=finished returned str> result: 'CN'
ID <Future at 0x14d34fad610 state=finished returned str> result: 'ID'
US <Future at 0x14d34fa5700 state=finished returned str> result: 'US'
IN <Future at 0x14d34fadcd0 state=finished returned str> result: 'IN'

5 downloads in 0.41s


In [12]:
import sys
    # caution: path[0] is reserved for script path (or '' in REPL)
sys.path.insert(1, 'D:/books/python/0.   Fluent Python, 2nd Edition/example-code-2e/20-executors/primes/')
sys.path

['D:\\books\\python\\0.   Fluent Python, 2nd Edition',
 'D:/books/python/0.   Fluent Python, 2nd Edition/example-code-2e/20-executors/primes/',
 'D:/books/python/0.   Fluent Python, 2nd Edition/example-code-2e/20-executors/getflags/',
 'D:/books/python/0.   Fluent Python, 2nd Edition/example-code-2e/20-executors/getflags/',
 'C:\\Users\\lidan\\miniconda3\\python38.zip',
 'C:\\Users\\lidan\\miniconda3\\DLLs',
 'C:\\Users\\lidan\\miniconda3\\lib',
 'C:\\Users\\lidan\\miniconda3',
 '',
 'C:\\Users\\lidan\\AppData\\Roaming\\Python\\Python38\\site-packages',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\magic_impute-2.0.4-py3.8.egg',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\seqc-0.2.0-py3.8.egg',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\weasyprint-56.1-py3.8.egg',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\cairocffi-1.3.0-py3.8.egg',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\win32',
 'C:\\Use

In [13]:
#!/usr/bin/env python3

import math


PRIME_FIXTURE = [
    (2, True),
    (142702110479723, True),
    (299593572317531, True),
    (3333333333333301, True),
    (3333333333333333, False),
    (3333335652092209, False),
    (4444444444444423, True),
    (4444444444444444, False),
    (4444444488888889, False),
    (5555553133149889, False),
    (5555555555555503, True),
    (5555555555555555, False),
    (6666666666666666, False),
    (6666666666666719, True),
    (6666667141414921, False),
    (7777777536340681, False),
    (7777777777777753, True),
    (7777777777777777, False),
    (9999999999999917, True),
    (9999999999999999, False),
]

NUMBERS = [n for n, _ in PRIME_FIXTURE]

# tag::IS_PRIME[]
def is_prime(n: int) -> bool:
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    root = math.isqrt(n)
    for i in range(3, root + 1, 2):
        if n % i == 0:
            return False
    return True
# end::IS_PRIME[]

if __name__ == '__main__':

    for n, prime in PRIME_FIXTURE:
        prime_res = is_prime(n)
        assert prime_res == prime
        print(n, prime)


2 True
142702110479723 True
299593572317531 True
3333333333333301 True
3333333333333333 False
3333335652092209 False
4444444444444423 True
4444444444444444 False
4444444488888889 False
5555553133149889 False
5555555555555503 True
5555555555555555 False
6666666666666666 False
6666666666666719 True
6666667141414921 False
7777777536340681 False
7777777777777753 True
7777777777777777 False
9999999999999917 True
9999999999999999 False


In [14]:
#!/usr/bin/env python3

"""
proc_pool.py: a version of the proc.py example from chapter 20,
but using `concurrent.futures.ProcessPoolExecutor`.
"""

# tag::PRIMES_POOL[]
import sys
from concurrent import futures  # <1>
from time import perf_counter
from typing import NamedTuple

from primes import is_prime, NUMBERS

class PrimeResult(NamedTuple):  # <2>
    n: int
    flag: bool
    elapsed: float

def check(n: int) -> PrimeResult:
    t0 = perf_counter()
    res = is_prime(n)
    return PrimeResult(n, res, perf_counter() - t0)

def main() -> None:
    if len(sys.argv) < 2:
        workers = None      # <3>
    else:
        workers = int(sys.argv[1])

    executor = futures.ProcessPoolExecutor(workers)  # <4>
    actual_workers = executor._max_workers  # type: ignore  # <5>

    print(f'Checking {len(NUMBERS)} numbers with {actual_workers} processes:')

    t0 = perf_counter()

    numbers = sorted(NUMBERS, reverse=True)  # <6>
    with executor:  # <7>
        for n, prime, elapsed in executor.map(check, numbers):  # <8>
            label = 'P' if prime else ' '
            print(f'{n:16}  {label} {elapsed:9.6f}s')

    time = perf_counter() - t0
    print(f'Total time: {time:.2f}s')

if __name__ == '__main__':
    main()
# end::PRIMES_POOL[]


ValueError: invalid literal for int() with base 10: '-f'

In [15]:
!python example-code-2e/20-executors/primes/proc_pool.py 

Checking 20 numbers with 12 processes:
9999999999999999     0.000013s
9999999999999917  P 11.085648s
7777777777777777     0.000009s
7777777777777753  P 10.445039s
7777777536340681    10.329543s
6666667141414921    10.064072s
6666666666666719  P 10.102916s
6666666666666666     0.000002s
5555555555555555     0.000011s
5555555555555503  P  9.570726s
5555553133149889     9.278866s
4444444488888889     8.815763s
4444444444444444     0.000003s
4444444444444423  P  8.880831s
3333335652092209     7.889527s
3333333333333333     0.000007s
3333333333333301  P  8.008465s
 299593572317531  P  2.514894s
 142702110479723  P  1.635895s
               2  P  0.000002s
Total time: 11.39s


In [2]:
"""
Experiment with ``ThreadPoolExecutor.map``
"""
# tag::EXECUTOR_MAP[]
from time import sleep, strftime
from concurrent import futures

def display(*args):  # <1>
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)

def loiter(n):  # <2>
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n * 10  # <3>

def main():
    display('Script starting.')
    executor = futures.ThreadPoolExecutor(max_workers=3)  # <4>
    results = executor.map(loiter, range(5))  # <5>
    display('results:', results)  # <6>
    display('Waiting for individual results:')
    for i, result in enumerate(results):  # <7>
        display(f'result {i}: {result}')

if __name__ == '__main__':
    main()
# end::EXECUTOR_MAP[]


[16:25:20] Script starting.
[16:25:20] loiter(0): doing nothing for 0s...
[16:25:20] loiter(0): done.
[16:25:20] 	loiter(1): doing nothing for 1s...
[16:25:20] 		loiter(2): doing nothing for 2s...
[16:25:20] 			loiter(3): doing nothing for 3s...
[16:25:20] results: <generator object Executor.map.<locals>.result_iterator at 0x000002245A401660>
[16:25:20] Waiting for individual results:
[16:25:20] result 0: 0
[16:25:21] 	loiter(1): done.
[16:25:21] 				loiter(4): doing nothing for 4s...
[16:25:21] result 1: 10
[16:25:22] 		loiter(2): done.
[16:25:22] result 2: 20
[16:25:23] 			loiter(3): done.
[16:25:23] result 3: 30
[16:25:25] 				loiter(4): done.
[16:25:25] result 4: 40


In [5]:
"""Utilities for second set of flag examples.
"""
from __future__ import annotations
import argparse
import string
import sys
import time
from collections import Counter
from enum import Enum
from pathlib import Path
from typing import List

DownloadStatus = Enum('DownloadStatus', 'OK NOT_FOUND ERROR')

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1

SERVERS = {
    'REMOTE': 'https://www.fluentpython.com/data/flags',
    'LOCAL':  'http://localhost:8000/flags',
    'DELAY':  'http://localhost:8001/flags',
    'ERROR':  'http://localhost:8002/flags',
}
DEFAULT_SERVER = 'LOCAL'

DEST_DIR = Path('downloaded')
COUNTRY_CODES_FILE = Path('country_codes.txt')


def save_flag(img: bytes, filename: str) -> None:
    (DEST_DIR / filename).write_bytes(img)


def initial_report(cc_list: List[str],
                   actual_req: int,
                   server_label: str) -> None:
    if len(cc_list) <= 10:
        cc_msg = ', '.join(cc_list)
    else:
        cc_msg = f'from {cc_list[0]} to {cc_list[-1]}'
    print(f'{server_label} site: {SERVERS[server_label]}')
    plural = 's' if len(cc_list) != 1 else ''
    print(f'Searching for {len(cc_list)} flag{plural}: {cc_msg}')
    if actual_req == 1:
        print('1 connection will be used.')
    else:
        print(f'{actual_req} concurrent connections will be used.')


def final_report(cc_list: List[str],
                 counter: Counter[DownloadStatus],
                 start_time: float) -> None:
    elapsed = time.perf_counter() - start_time
    print('-' * 20)
    plural = 's' if counter[DownloadStatus.OK] != 1 else ''
    print(f'{counter[DownloadStatus.OK]:3} flag{plural} downloaded.')
    if counter[DownloadStatus.NOT_FOUND]:
        print(f'{counter[DownloadStatus.NOT_FOUND]:3} not found.')
    if counter[DownloadStatus.ERROR]:
        plural = 's' if counter[DownloadStatus.ERROR] != 1 else ''
        print(f'{counter[DownloadStatus.ERROR]:3} error{plural}.')
    print(f'Elapsed time: {elapsed:.2f}s')


def expand_cc_args(every_cc: bool,
                   all_cc: bool,
                   cc_args: List[str],
                   limit: int) -> List[str]:
    codes: set[str] = set()
    A_Z = string.ascii_uppercase
    if every_cc:
        codes.update(a+b for a in A_Z for b in A_Z)
    elif all_cc:
        text = COUNTRY_CODES_FILE.read_text()
        codes.update(text.split())
    else:
        for cc in (c.upper() for c in cc_args):
            if len(cc) == 1 and cc in A_Z:
                codes.update(cc + c for c in A_Z)
            elif len(cc) == 2 and all(c in A_Z for c in cc):
                codes.add(cc)
            else:
                raise ValueError('*** Usage error: each CC argument '
                                 'must be A to Z or AA to ZZ.')
    return sorted(codes)[:limit]


def process_args(default_concur_req):
    server_options = ', '.join(sorted(SERVERS))
    parser = argparse.ArgumentParser(
        description='Download flags for country codes. '
                    'Default: top 20 countries by population.')
    parser.add_argument(
        'cc', metavar='CC', nargs='*',
        help='country code or 1st letter (eg. B for BA...BZ)')
    parser.add_argument(
        '-a', '--all', action='store_true',
        help='get all available flags (AD to ZW)')
    parser.add_argument(
        '-e', '--every', action='store_true',
        help='get flags for every possible code (AA...ZZ)')
    parser.add_argument(
        '-l', '--limit', metavar='N', type=int, help='limit to N first codes',
        default=sys.maxsize)
    parser.add_argument(
        '-m', '--max_req', metavar='CONCURRENT', type=int,
        default=default_concur_req,
        help=f'maximum concurrent requests (default={default_concur_req})')
    parser.add_argument(
        '-s', '--server', metavar='LABEL', default=DEFAULT_SERVER,
        help=f'Server to hit; one of {server_options} '
             f'(default={DEFAULT_SERVER})')
    parser.add_argument(
        '-v', '--verbose', action='store_true',
        help='output detailed progress info')
    args = parser.parse_args()
    if args.max_req < 1:
        print('*** Usage error: --max_req CONCURRENT must be >= 1')
        parser.print_usage()
        # "standard" exit status codes:
        # https://stackoverflow.com/questions/1101957/are-there-any-standard-exit-status-codes-in-linux/40484670#40484670
        sys.exit(2)  # command line usage error
    if args.limit < 1:
        print('*** Usage error: --limit N must be >= 1')
        parser.print_usage()
        sys.exit(2)  # command line usage error
    args.server = args.server.upper()
    if args.server not in SERVERS:
        print(f'*** Usage error: --server LABEL '
              f'must be one of {server_options}')
        parser.print_usage()
        sys.exit(2)  # command line usage error
    try:
        cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
    except ValueError as exc:
        print(exc.args[0])
        parser.print_usage()
        sys.exit(2)  # command line usage error

    if not cc_list:
        cc_list = sorted(POP20_CC)[:args.limit]
    return args, cc_list


def main(download_many, default_concur_req, max_concur_req):
    args, cc_list = process_args(default_concur_req)
    actual_req = min(args.max_req, max_concur_req, len(cc_list))
    initial_report(cc_list, actual_req, args.server)
    base_url = SERVERS[args.server]
    DEST_DIR.mkdir(exist_ok=True)
    t0 = time.perf_counter()
    counter = download_many(cc_list, base_url, args.verbose, actual_req)
    final_report(cc_list, counter, t0)


In [6]:
import time
from tqdm import tqdm
for i in tqdm(range(1000)):
    time.sleep(.01)

100%|██████████████████████████████████████████████████████████████████████████████| 1000/1000 [00:15<00:00, 63.56it/s]


In [7]:
import sys
    # caution: path[0] is reserved for script path (or '' in REPL)
sys.path.insert(1, 'D:/books/python/0.   Fluent Python, 2nd Edition/example-code-2e/20-executors/getflags/')
sys.path

['D:\\books\\python\\0.   Fluent Python, 2nd Edition',
 'D:/books/python/0.   Fluent Python, 2nd Edition/example-code-2e/20-executors/getflags/',
 'D:/books/python/0.   Fluent Python, 2nd Edition/example-code-2e/20-executors/getflags/',
 'C:\\Users\\lidan\\miniconda3\\python38.zip',
 'C:\\Users\\lidan\\miniconda3\\DLLs',
 'C:\\Users\\lidan\\miniconda3\\lib',
 'C:\\Users\\lidan\\miniconda3',
 '',
 'C:\\Users\\lidan\\AppData\\Roaming\\Python\\Python38\\site-packages',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\magic_impute-2.0.4-py3.8.egg',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\seqc-0.2.0-py3.8.egg',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\weasyprint-56.1-py3.8.egg',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\cairocffi-1.3.0-py3.8.egg',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\win32',
 'C:\\Users\\lidan\\miniconda3\\lib\\site-packages\\win32\\lib',
 'C:\\Users\\lidan\\miniconda3\\l

In [None]:
#!/usr/bin/env python3

"""Download flags of countries (with error handling).

Sequential version

Sample run::

    $ python3 flags2_sequential.py -s DELAY b
    DELAY site: http://localhost:8002/flags
    Searching for 26 flags: from BA to BZ
    1 concurrent connection will be used.
    --------------------
    17 flags downloaded.
    9 not found.
    Elapsed time: 13.36s

"""

# tag::FLAGS2_BASIC_HTTP_FUNCTIONS[]
from collections import Counter
from http import HTTPStatus

import httpx
import tqdm  # type: ignore  # <1>

from flags2_common import main, save_flag, DownloadStatus  # <2>

DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1

def get_flag(base_url: str, cc: str) -> bytes:
    url = f'{base_url}/{cc}/{cc}.gif'.lower()
    resp = httpx.get(url, timeout=3.1, follow_redirects=True)
    resp.raise_for_status()  # <3>
    return resp.content

def download_one(cc: str, base_url: str, verbose: bool = False) -> DownloadStatus:
    try:
        image = get_flag(base_url, cc)
    except httpx.HTTPStatusError as exc:  # <4>
        res = exc.response
        if res.status_code == HTTPStatus.NOT_FOUND:
            status = DownloadStatus.NOT_FOUND  # <5>
            msg = f'not found: {res.url}'
        else:
            raise  # <6>
    else:
        save_flag(image, f'{cc}.gif')
        status = DownloadStatus.OK
        msg = 'OK'

    if verbose:  # <7>
        print(cc, msg)

    return status
# end::FLAGS2_BASIC_HTTP_FUNCTIONS[]

# tag::FLAGS2_DOWNLOAD_MANY_SEQUENTIAL[]
def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool,
                  _unused_concur_req: int) -> Counter[DownloadStatus]:
    counter: Counter[DownloadStatus] = Counter()  # <1>
    cc_iter = sorted(cc_list)  # <2>
    if not verbose:
        cc_iter = tqdm.tqdm(cc_iter)  # <3>
    for cc in cc_iter:
        try:
            status = download_one(cc, base_url, verbose)  # <4>
        except httpx.HTTPStatusError as exc:  # <5>
            error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
            error_msg = error_msg.format(resp=exc.response)
        except httpx.RequestError as exc:  # <6>
            error_msg = f'{exc} {type(exc)}'.strip()
        except KeyboardInterrupt:  # <7>
            break
        else:  # <8>
            error_msg = ''

        if error_msg:
            status = DownloadStatus.ERROR  # <9>
        counter[status] += 1           # <10>
        if verbose and error_msg:      # <11>
            print(f'{cc} error: {error_msg}')

    return counter  # <12>
# end::FLAGS2_DOWNLOAD_MANY_SEQUENTIAL[]

if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)


In [None]:
#!/usr/bin/env python3

"""Download flags of countries (with error handling).

ThreadPool version

Sample run::

    $ python3 flags2_threadpool.py -s ERROR -e
    ERROR site: http://localhost:8003/flags
    Searching for 676 flags: from AA to ZZ
    30 concurrent connections will be used.
    --------------------
    150 flags downloaded.
    361 not found.
    165 errors.
    Elapsed time: 7.46s

"""

# tag::FLAGS2_THREADPOOL[]
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed

import httpx
import tqdm  # type: ignore

from flags2_common import main, DownloadStatus
from flags2_sequential import download_one  # <1>

DEFAULT_CONCUR_REQ = 30  # <2>
MAX_CONCUR_REQ = 1000  # <3>


def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool,
                  concur_req: int) -> Counter[DownloadStatus]:
    counter: Counter[DownloadStatus] = Counter()
    with ThreadPoolExecutor(max_workers=concur_req) as executor:  # <4>
        to_do_map = {}  # <5>
        for cc in sorted(cc_list):  # <6>
            future = executor.submit(download_one, cc,
                                     base_url, verbose)  # <7>
            to_do_map[future] = cc  # <8>
        done_iter = as_completed(to_do_map)  # <9>
        if not verbose:
            done_iter = tqdm.tqdm(done_iter, total=len(cc_list))  # <10>
        for future in done_iter:  # <11>
            try:
                status = future.result()  # <12>
            except httpx.HTTPStatusError as exc:  # <13>
                error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
                error_msg = error_msg.format(resp=exc.response)
            except httpx.RequestError as exc:
                error_msg = f'{exc} {type(exc)}'.strip()
            except KeyboardInterrupt:
                break
            else:
                error_msg = ''

            if error_msg:
                status = DownloadStatus.ERROR
            counter[status] += 1
            if verbose and error_msg:
                cc = to_do_map[future]  # <14>
                print(f'{cc} error: {error_msg}')

    return counter


if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# end::FLAGS2_THREADPOOL[]


In [None]:
#!/usr/bin/env python3

"""Download flags of countries (with error handling).

asyncio async/await version

"""
# tag::FLAGS2_ASYNCIO_TOP[]
import asyncio
from collections import Counter
from http import HTTPStatus
from pathlib import Path

import httpx
import tqdm  # type: ignore

from flags2_common import main, DownloadStatus, save_flag

# low concurrency default to avoid errors from remote site,
# such as 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000

async def get_flag(client: httpx.AsyncClient,  # <1>
                   base_url: str,
                   cc: str) -> bytes:
    url = f'{base_url}/{cc}/{cc}.gif'.lower()
    resp = await client.get(url, timeout=3.1, follow_redirects=True)   # <2>
    resp.raise_for_status()
    return resp.content

# tag::FLAGS3_ASYNCIO_GET_COUNTRY[]
async def get_country(client: httpx.AsyncClient,
                      base_url: str,
                      cc: str) -> str:    # <1>
    url = f'{base_url}/{cc}/metadata.json'.lower()
    resp = await client.get(url, timeout=3.1, follow_redirects=True)
    resp.raise_for_status()
    metadata = resp.json()  # <2>
    return metadata['country']  # <3>
# end::FLAGS3_ASYNCIO_GET_COUNTRY[]

# tag::FLAGS3_ASYNCIO_DOWNLOAD_ONE[]
async def download_one(client: httpx.AsyncClient,
                       cc: str,
                       base_url: str,
                       semaphore: asyncio.Semaphore,
                       verbose: bool) -> DownloadStatus:
    try:
        async with semaphore:  # <1>
            image = await get_flag(client, base_url, cc)
        async with semaphore:  # <2>
            country = await get_country(client, base_url, cc)
    except httpx.HTTPStatusError as exc:
        res = exc.response
        if res.status_code == HTTPStatus.NOT_FOUND:
            status = DownloadStatus.NOT_FOUND
            msg = f'not found: {res.url}'
        else:
            raise
    else:
        filename = country.replace(' ', '_')  # <3>
        await asyncio.to_thread(save_flag, image, f'{filename}.gif')
        status = DownloadStatus.OK
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)
    return status
# end::FLAGS3_ASYNCIO_DOWNLOAD_ONE[]

# tag::FLAGS2_ASYNCIO_START[]
async def supervisor(cc_list: list[str],
                     base_url: str,
                     verbose: bool,
                     concur_req: int) -> Counter[DownloadStatus]:  # <1>
    counter: Counter[DownloadStatus] = Counter()
    semaphore = asyncio.Semaphore(concur_req)  # <2>
    async with httpx.AsyncClient() as client:
        to_do = [download_one(client, cc, base_url, semaphore, verbose)
                 for cc in sorted(cc_list)]  # <3>
        to_do_iter = asyncio.as_completed(to_do)  # <4>
        if not verbose:
            to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))  # <5>
        error: httpx.HTTPError | None = None  # <6>
        for coro in to_do_iter:  # <7>
            try:
                status = await coro  # <8>
            except httpx.HTTPStatusError as exc:
                error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
                error_msg = error_msg.format(resp=exc.response)
                error = exc  # <9>
            except httpx.RequestError as exc:
                error_msg = f'{exc} {type(exc)}'.strip()
                error = exc  # <10>
            except KeyboardInterrupt:
                break

            if error:
                status = DownloadStatus.ERROR  # <11>
                if verbose:
                    url = str(error.request.url)  # <12>
                    cc = Path(url).stem.upper()   # <13>
                    print(f'{cc} error: {error_msg}')
            counter[status] += 1

    return counter

def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool,
                  concur_req: int) -> Counter[DownloadStatus]:
    coro = supervisor(cc_list, base_url, verbose, concur_req)
    counts = asyncio.run(coro)  # <14>

    return counts

if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# end::FLAGS2_ASYNCIO_START[]


In [None]:
#!/usr/bin/env python3

"""Slow HTTP server class.

This module implements a ThreadingHTTPServer using a custom
SimpleHTTPRequestHandler subclass that introduces delays to all
GET responses, and optionally returns errors to a fraction of
the requests if given the --error_rate command-line argument.
"""

import contextlib
import os
import socket
import time
from functools import partial
from http import server, HTTPStatus
from http.server import ThreadingHTTPServer, SimpleHTTPRequestHandler
from random import random, uniform

MIN_DELAY = 0.5  # minimum delay for do_GET (seconds)
MAX_DELAY = 5.0  # maximum delay for do_GET (seconds)

class SlowHTTPRequestHandler(SimpleHTTPRequestHandler):
    """SlowHTTPRequestHandler adds delays and errors to test HTTP clients.

    The optional error_rate argument determines how often GET requests
    receive a 418 status code, "I'm a teapot".
    If error_rate is .15, there's a 15% probability of each GET request
    getting that error.
    When the server believes it is a teapot, it refuses requests to serve files.

    See: https://tools.ietf.org/html/rfc2324#section-2.3.2
    """

    def __init__(self, *args, error_rate=0.0, **kwargs):
        self.error_rate = error_rate
        super().__init__(*args, **kwargs)

    def do_GET(self):
        """Serve a GET request."""
        delay = uniform(MIN_DELAY, MAX_DELAY)
        cc = self.path[-6:-4].upper()
        print(f'{cc} delay: {delay:0.2}s')
        time.sleep(delay)
        if random() < self.error_rate:
            # HTTPStatus.IM_A_TEAPOT requires Python >= 3.9
            try:
                self.send_error(HTTPStatus.IM_A_TEAPOT, "I'm a Teapot")
            except BrokenPipeError as exc:
                print(f'{cc} *** BrokenPipeError: client closed')
        else:
            f = self.send_head()
            if f:
                try:
                    self.copyfile(f, self.wfile)
                except BrokenPipeError as exc:
                    print(f'{cc} *** BrokenPipeError: client closed')
                finally:
                    f.close()

# The code in the `if` block below, including comments, was copied
# and adapted from the `http.server` module of Python 3.9
# https://github.com/python/cpython/blob/master/Lib/http/server.py

if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--bind', '-b', metavar='ADDRESS',
                        help='Specify alternate bind address '
                             '[default: all interfaces]')
    parser.add_argument('--directory', '-d', default=os.getcwd(),
                        help='Specify alternative directory '
                             '[default:current directory]')
    parser.add_argument('--error-rate', '-e', metavar='PROBABILITY',
                        default=0.0, type=float,
                        help='Error rate; e.g. use .25 for 25%% probability '
                             '[default:0.0]')
    parser.add_argument('port', action='store',
                        default=8001, type=int,
                        nargs='?',
                        help='Specify alternate port [default: 8001]')
    args = parser.parse_args()
    handler_class = partial(SlowHTTPRequestHandler,
                            directory=args.directory,
                            error_rate=args.error_rate)

    # ensure dual-stack is not disabled; ref #38907
    class DualStackServer(ThreadingHTTPServer):
        def server_bind(self):
            # suppress exception when protocol is IPv4
            with contextlib.suppress(Exception):
                self.socket.setsockopt(
                    socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
            return super().server_bind()

    # test is a top-level function in http.server omitted from __all__
    server.test(  # type: ignore
        HandlerClass=handler_class,
        ServerClass=DualStackServer,
        port=args.port,
        bind=args.bind,
    )
