In [55]:
import os 
import sys
import time

import requests

# Country codes used as the default download set (20 entries).
POP20_CC = [
    'CN', 'IN', 'US', 'ID', 'BR', 'PK', 'NG', 'BD', 'RU', 'JP',
    'MX', 'PH', 'VN', 'ET', 'EG', 'DE', 'IR', 'TR', 'CD', 'FR',
]

# Root URL; each flag is fetched from <BASE_URL>/<cc>/<cc>.gif.
BASE_URL = 'http://flupy.org/data/flags'

# Directory (relative to the working directory) that receives the images.
DEST_DIR = 'downloads/'


def save_flag(img, filename):
    """Write the raw bytes ``img`` to ``DEST_DIR/filename``.

    The destination directory is created on demand, so the first download
    in a fresh working directory does not fail with FileNotFoundError.

    Args:
        img: Bytes to write.
        filename: File name (not a full path) inside ``DEST_DIR``.
    """
    os.makedirs(DEST_DIR, exist_ok=True)
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)
        

def get_flag(cc, timeout=10):
    """Download the flag for country code ``cc`` and return its bytes.

    Args:
        cc: Country code; it is lower-cased to build
            ``<BASE_URL>/<cc>/<cc>.gif``.
        timeout: Seconds to wait for the server. Without a timeout
            ``requests.get`` may block indefinitely on a stalled connection.

    Returns:
        The response body as bytes.

    Raises:
        requests.exceptions.Timeout: if the server does not answer in time.

    NOTE(review): the HTTP status is not checked here, so the body of an
    error response would be returned like any other content.
    """
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url, timeout=timeout)
    return resp.content


def show(text):
    """Print ``text`` followed by a space and flush stdout right away.

    Flushing makes each item appear as soon as it is reported instead of
    waiting for the output buffer to fill.
    """
    print(text, end=' ', flush=True)
    
    
def download_many(cc_list):
    """Download the flags one after another, in alphabetical order.

    Each code is fetched with ``get_flag``, reported with ``show`` and
    stored as ``<cc>.gif`` (lower-case) through ``save_flag``.

    Returns:
        ``len(cc_list)``.
    """
    ordered_codes = sorted(cc_list)
    for code in ordered_codes:
        flag_bytes = get_flag(code)
        show(code)
        target_name = code.lower() + '.gif'
        save_flag(flag_bytes, target_name)

    return len(cc_list)


def main(downloag_many):
    """Time one download run over ``POP20_CC`` and print a summary line.

    Args:
        downloag_many: Callable that receives the list of country codes and
            returns the number of flags downloaded. The parameter name is
            misspelled; it is left as-is so the signature does not change.

    The previous body ignored this argument and called whatever global
    ``download_many`` was currently defined, so the strategy passed in by
    the caller was not necessarily the one being timed.
    """
    t0 = time.time()
    count = downloag_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))
    
    
main(download_many)

BD BR CD CN DE EG ET FR ID IN IR JP MX NG 

KeyboardInterrupt: 

In [22]:
from concurrent import futures

MAX_WORKERS = 20

def download_one(cc):
    """Fetch, report and save the flag for a single country code.

    Returns:
        ``cc`` unchanged, so callers can tell which download finished.
    """
    flag_bytes = get_flag(cc)
    show(cc)
    target_name = cc.lower() + '.gif'
    save_flag(flag_bytes, target_name)
    return cc


def download_many(cc_list):
    """Download all flags concurrently with a thread pool.

    The pool size is ``min(MAX_WORKERS, len(cc_list))``.

    Returns:
        The number of results produced by ``download_one``; 0 for an empty
        ``cc_list``.

    Raises:
        Any exception raised by ``download_one`` in a worker is re-raised
        here when the results are collected.
    """
    if len(cc_list) == 0:
        # min(MAX_WORKERS, 0) would be 0, and ThreadPoolExecutor rejects
        # max_workers=0 with ValueError.
        return 0

    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:
        # map() is lazy; list() drains it and surfaces worker exceptions.
        res = list(executor.map(download_one, sorted(cc_list)))

    return len(res)


main(download_many)

BD BR NGEG TR  CN CD ET PH MX RU VN ID FR US JP DE IR PK IN 
20 flags downloaded in 3.11s


In [23]:
def download_many(cc_list):
    """Download the first five codes of ``cc_list`` with three worker threads.

    Every submission and every completion is printed together with the
    repr of the corresponding future, which shows its state.

    Returns:
        The number of completed downloads.
    """
    selected = sorted(cc_list[:5])
    scheduled_fmt = 'Scheduled for {}: {}'
    result_fmt = '{} result: {!r}'

    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        pending = []
        for code in selected:
            fut = executor.submit(download_one, code)
            pending.append(fut)
            print(scheduled_fmt.format(code, fut))

        finished = []
        # as_completed yields futures in completion order, not submit order.
        for fut in futures.as_completed(pending):
            outcome = fut.result()
            print(result_fmt.format(fut, outcome))
            finished.append(outcome)

    return len(finished)

main(download_many)

Scheduled for BR: <Future at 0x19cd88a8470 state=running>
Scheduled for CN: <Future at 0x19cd88f6668 state=running>
Scheduled for ID: <Future at 0x19cd89b05c0 state=running>
Scheduled for IN: <Future at 0x19cd9a08cc0 state=pending>
Scheduled for US: <Future at 0x19cd9a08358 state=pending>
CN <Future at 0x19cd88f6668 state=finished returned str> result: 'CN'
BR <Future at 0x19cd88a8470 state=finished returned str> result: 'BR'
ID <Future at 0x19cd89b05c0 state=finished returned str> result: 'ID'
US <Future at 0x19cd9a08358 state=finished returned str> result: 'US'
IN <Future at 0x19cd9a08cc0 state=finished returned str> result: 'IN'

5 flags downloaded in 1.95s


In [24]:
from concurrent import futures

MAX_WORKERS = 20

def download_many(cc_list):
    """Download the first five codes of ``cc_list`` with three worker processes.

    Every submission and every completion is printed together with the
    repr of the corresponding future.

    NOTE(review): the recorded run of this cell ended in BrokenProcessPool.
    The concurrent.futures documentation states that ProcessPoolExecutor
    needs an importable ``__main__`` and does not work in the interactive
    interpreter; presumably that is the cause here -- TODO confirm by
    moving ``download_one`` into an importable module.

    Returns:
        The number of completed downloads.
    """
    selected = sorted(cc_list[:5])
    scheduled_fmt = 'Scheduled for {}: {}'
    result_fmt = '{} result: {!r}'

    with futures.ProcessPoolExecutor(max_workers=3) as executor:
        pending = []
        for code in selected:
            fut = executor.submit(download_one, code)
            pending.append(fut)
            print(scheduled_fmt.format(code, fut))

        finished = []
        for fut in futures.as_completed(pending):
            outcome = fut.result()
            print(result_fmt.format(fut, outcome))
            finished.append(outcome)

    return len(finished)

main(download_many)

Scheduled for BR: <Future at 0x19cd88f66a0 state=running>
Scheduled for CN: <Future at 0x19cd89713c8 state=pending>
Scheduled for ID: <Future at 0x19cd89715f8 state=pending>
Scheduled for IN: <Future at 0x19cd8971d30 state=pending>
Scheduled for US: <Future at 0x19cd8971278 state=pending>


BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

In [26]:
from concurrent import futures
import os 
import sys
import time

import requests

# Country codes passed to download_many by main() (20 entries).
POP20_CC = 'CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR'.split()

# Root URL; each flag is fetched from <BASE_URL>/<cc>/<cc>.gif.
BASE_URL = 'http://flupy.org/data/flags'

# Directory (relative to the working directory) that receives the images.
DEST_DIR = 'downloads/'


def save_flag(img, filename):
    """Store the bytes ``img`` as ``filename`` inside ``DEST_DIR``.

    ``DEST_DIR`` is created first if it does not exist yet; previously a
    missing directory made ``open`` raise FileNotFoundError.

    Args:
        img: Bytes to write.
        filename: File name (not a full path) inside ``DEST_DIR``.
    """
    os.makedirs(DEST_DIR, exist_ok=True)
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)
        

def get_flag(cc, timeout=10):
    """Return the bytes served at ``<BASE_URL>/<cc>/<cc>.gif`` (``cc`` lower-cased).

    Args:
        cc: Country code.
        timeout: Seconds to wait for the server; keeps a stalled connection
            from blocking the caller forever.

    Raises:
        requests.exceptions.Timeout: if the server does not answer in time.

    NOTE(review): the HTTP status is not checked here, so the body of an
    error response would be returned like any other content.
    """
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url, timeout=timeout)
    return resp.content


def show(text):
    """Print ``text`` plus a trailing space, flushing stdout immediately."""
    print(text, end=' ', flush=True)
    
    
MAX_WORKERS = 20

def download_one(cc):
    """Download one flag, report its code and save it as ``<cc>.gif``.

    Returns:
        ``cc`` unchanged.
    """
    flag_bytes = get_flag(cc)
    show(cc)
    target_name = cc.lower() + '.gif'
    save_flag(flag_bytes, target_name)
    return cc


def download_many(cc_list):
    """Download all flags through a process pool of default size.

    NOTE(review): the recorded run of this cell ended in BrokenProcessPool.
    The concurrent.futures documentation states that ProcessPoolExecutor
    does not work in the interactive interpreter because workers must be
    able to import ``__main__``; presumably that applies here -- TODO confirm.

    Returns:
        The number of results produced by ``download_one``.
    """
    with futures.ProcessPoolExecutor() as pool:
        outcomes = pool.map(download_one, sorted(cc_list))

    # Draining the iterator re-raises any exception from a worker.
    return sum(1 for _unused in outcomes)


def main(downloag_many):
    """Run ``downloag_many(POP20_CC)``, timing it, and print the summary.

    Args:
        downloag_many: Callable taking the list of country codes and
            returning the number of flags downloaded. The misspelled name
            is kept so the signature stays unchanged.

    The previous body called the global ``download_many`` instead of the
    argument, so the callable handed in by the caller was ignored.
    """
    t0 = time.time()
    count = downloag_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))
    
    
main(download_many)

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

In [15]:
import os

In [16]:
os.cpu_count()

4

In [28]:
from time import sleep, strftime
from concurrent import futures

def display(*args):
    """Print ``args`` on one line, prefixed with a ``[HH:MM:SS]`` timestamp.

    The full line, newline included, is built first and handed to a single
    ``print`` call. Printing the timestamp and the arguments separately
    let output from concurrent threads interleave in the middle of a line
    (the recorded run shows ``[11:21:36][11:21:36] result 1: 10``).
    The text produced for a single call is unchanged.
    """
    stamp = strftime('[%H:%M:%S]')
    body = ' '.join(str(arg) for arg in args)
    print('{} {}\n'.format(stamp, body), end='')
    

def loiter(n):
    """Sleep for ``n`` seconds, logging before and after via ``display``.

    Both messages are indented with ``n`` tab characters so the output of
    different calls is easy to tell apart.

    Returns:
        ``n * 10``.
    """
    indent = '\t' * n
    display('{}loiter({}): doing nothing for {}s...'.format(indent, n, n))
    sleep(n)
    display('{}loiter({}): done'.format(indent, n))
    return n * 10


def main():
    """Show how ``Executor.map`` hands back results lazily and in call order.

    Five ``loiter`` calls are submitted to a pool of three threads; the
    generator returned by ``map`` is displayed, then each result is
    displayed as it is retrieved.
    """
    display('Script starting.')
    # The context manager shuts the pool down when the block exits, even if
    # retrieving a result raises; the pool was previously never shut down.
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        results = executor.map(loiter, range(5))
        display('results:', results)
        display('Waiting for individual results:')
        for i, result in enumerate(results):
            display('result {}: {}'.format(i, result))
        
        
main()

[11:21:35] Script starting.
[11:21:35] loiter(0): doing nothing for 0s...
[11:21:35] loiter(0): done
[11:21:35] 	loiter(1): doing nothing for 1s...
[11:21:35] 		loiter(2): doing nothing for 2s...
[11:21:35] reuslts: <generator object Executor.map.<locals>.result_iterator at 0x0000019CD89BEB10>
[11:21:35] Warting for indevidual results:
[11:21:35] result 0: 0
[11:21:35] 			loiter(3): doing nothing for 3s...
[11:21:36] 	loiter(1): done
[11:21:36][11:21:36] result 1: 10
 				loiter(4): doing nothing for 4s...
[11:21:37] 		loiter(2): done
[11:21:37] result 2: 20
[11:21:38] 			loiter(3): done
[11:21:38] result 3: 30
[11:21:40] 				loiter(4): done
[11:21:40] result 4: 40


In [35]:
# NOTE(review): this binds the name `tqdm` to the function, while other cells
# call `tqdm.tqdm(...)` (which needs `import tqdm`); whichever import ran
# last decides what `tqdm` refers to.
from tqdm import tqdm
# Draw a progress bar over 80 short steps.
for i in tqdm(range(80)):
    time.sleep(.01)  # short pause per step
    

100%|██████████████████████████████████████████████████████████████████████████████████| 80/80 [00:01<00:00, 63.24it/s]


In [37]:
def get_flag(base_url, cc, timeout=10):
    """Download ``<base_url>/<cc>/<cc>.gif`` (``cc`` lower-cased) and return its bytes.

    Args:
        base_url: Root URL of the flag server.
        cc: Country code.
        timeout: Seconds to wait for the server; keeps a stalled connection
            from blocking the caller forever.

    Returns:
        The response body as bytes.

    Raises:
        requests.exceptions.HTTPError: for 4xx/5xx responses.
        requests.exceptions.Timeout: if the server does not answer in time.
    """
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = requests.get(url, timeout=timeout)
    # raise_for_status() does nothing for successful responses, so no
    # explicit status comparison is needed before calling it.
    resp.raise_for_status()
    return resp.content


def download_one(cc, base_url, verbose=False):
    """Download and save one flag, mapping a 404 to a "not found" status.

    Args:
        cc: Country code.
        base_url: Root URL passed on to ``get_flag``.
        verbose: When true, print ``cc`` and the outcome message.

    Returns:
        ``Result(status, cc)``.

    Raises:
        requests.exceptions.HTTPError: for any HTTP error other than 404.

    NOTE(review): ``HTTPStatus`` (members ``ok`` / ``not_found``) and
    ``Result`` are not defined in the visible cells -- confirm where they
    come from.
    """
    try:
        image = get_flag(base_url, cc)
    except requests.exceptions.HTTPError as exc:
        if exc.response.status_code != 404:
            raise
        status, msg = HTTPStatus.not_found, 'not found'
    else:
        save_flag(image, cc.lower() + '.gif')
        status, msg = HTTPStatus.ok, 'OK'

    if verbose:
        print(cc, msg)

    return Result(status, cc)

In [38]:
def download_many(cc_list, base_url, berbose, max_req):
    """Download flags sequentially and tally the outcome of each one.

    Args:
        cc_list: Country codes; processed in sorted order.
        base_url: Root URL passed on to ``download_one``.
        berbose: Verbose flag (the misspelled name is kept so the signature
            stays unchanged). When false, progress is shown with tqdm; when
            true, per-download errors are printed instead.
        max_req: Not used by this sequential implementation.

    Returns:
        A ``collections.Counter`` mapping each status to its count.

    Fixes: ``cc_iter`` was read before ever being assigned, and the body
    referred to an undefined name ``verbose`` instead of the parameter.

    NOTE(review): ``HTTPStatus`` is not defined in the visible cells --
    confirm where it comes from.
    """
    counter = collections.Counter()
    cc_iter = sorted(cc_list)
    if not berbose:
        cc_iter = tqdm.tqdm(cc_iter)
    for cc in cc_iter:
        try:
            res = download_one(cc, base_url, berbose)
        except requests.exceptions.HTTPError as exc:
            error_msg = 'HTTP error {res.status_code}  -  {res.reason}'
            error_msg = error_msg.format(res=exc.response)
        except requests.exceptions.ConnectionError:
            error_msg = 'Connection error'
        else:
            error_msg = ''
            status = res.status

        if error_msg:
            status = HTTPStatus.error
        counter[status] += 1
        if berbose and error_msg:
            print('*** Error for {}: {}'.format(cc, error_msg))

    return counter

In [39]:
main(download_many)

TypeError: main() takes 0 positional arguments but 1 was given

In [47]:
import argparse
import collections
from concurrent import futures

import requests
import tqdm

# Base URL for each server label accepted by the --server option.
SERVERS = dict(
    REMOTE='http://flupy.org/data/flags',
    LOCAL='http://localhost:8001/flags',
    DELAY='http://localhost:8002/flags',
    ERROR='http://localhost:8003/flags',
)
# Concurrency used when --max_req is not given.
DEFAULT_CONCUR_REQ = 30
# Hard upper bound on concurrent requests, whatever --max_req says.
MAX_CONCUR_REQ = 1000
# Server label used when --server is not given.
DEFAULT_SERVER = 'LOCAL'


def download_many(cc_list, base_url, berbose, concur_req):
    """Download flags concurrently and tally the outcome of each one.

    Args:
        cc_list: Country codes; submitted in sorted order.
        base_url: Root URL passed on to ``download_one``.
        berbose: Verbose flag (the misspelled name is kept so the signature
            stays unchanged). When false, progress is shown with tqdm; when
            true, per-download errors are printed instead.
        concur_req: Number of worker threads.

    Returns:
        A ``collections.Counter`` mapping each status to its count.

    Fixes: the submit result was assigned to ``futures``, which shadowed
    the module inside the function and left ``future`` undefined;
    ``request.exceptions`` was a typo for ``requests.exceptions``; and
    ``verbose`` was an undefined name.

    NOTE(review): ``HTTPStatus`` is not defined in the visible cells --
    confirm where it comes from.
    """
    counter = collections.Counter()
    with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:
        # Map each future to its country code so errors can be attributed.
        to_do_map = {}
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc, base_url, berbose)
            to_do_map[future] = cc
        done_iter = futures.as_completed(to_do_map)
        if not berbose:
            # as_completed returns a generator without len(), so tqdm needs
            # the total passed explicitly.
            done_iter = tqdm.tqdm(done_iter, total=len(cc_list))
        for future in done_iter:
            try:
                res = future.result()
            except requests.exceptions.HTTPError as exc:
                error_msg = 'HTTP {res.status_code} - {res.reason}'
                error_msg = error_msg.format(res=exc.response)
            except requests.exceptions.ConnectionError:
                error_msg = 'Connection error'
            else:
                error_msg = ''
                status = res.status

            if error_msg:
                status = HTTPStatus.error
            counter[status] += 1
            if berbose and error_msg:
                cc = to_do_map[future]
                print('*** Error for {}: {}'.format(cc, error_msg))
        return counter
 

def process_args(default_concur_req, argv=None):
    """Parse and validate the command-line options.

    Args:
        default_concur_req: Default for ``--max_req``.
        argv: Argument strings to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]`` as before. Inside a Jupyter
            kernel ``sys.argv`` carries the kernel's own options (the
            recorded run failed with ``unrecognized arguments: -f``), so
            pass an explicit list such as ``[]`` there.

    Returns:
        ``(args, cc_list)``: the parsed namespace (with ``args.server``
        upper-cased) and the list of country codes, falling back to the
        sorted ``POP20_CC`` when none were selected.

    Exits with status 1 after printing a message and the usage line when
    an option value is invalid.

    NOTE(review): ``expand_cc_args`` is not defined in the visible cells --
    confirm where it comes from.
    """
    server_options = ', '.join(sorted(SERVERS))
    parser = argparse.ArgumentParser(
                description='Download flags for country codes. '
                'Default: top 20 countries by population.')
    parser.add_argument('cc', metavar='CC', nargs='*',
                help='country code or 1st letter (eg. B for BA...BZ)')
    parser.add_argument('-a', '--all', action='store_true',
                help='get all available flags (AD to ZW)')
    parser.add_argument('-e', '--every', action='store_true',
                help='get flags for every possible code (AA...ZZ)')
    parser.add_argument('-l', '--limit', metavar='N', type=int,
                help='limit to N first codes', default=sys.maxsize)
    parser.add_argument('-m', '--max_req', metavar='CONCURRENT', type=int,
                default=default_concur_req,
                help='maximum concurrent requests (default={})'
                      .format(default_concur_req))
    parser.add_argument('-s', '--server', metavar='LABEL',
                default=DEFAULT_SERVER,
                help='Server to hit; one of {} (default={})'
                      .format(server_options, DEFAULT_SERVER))
    parser.add_argument('-v', '--verbose', action='store_true',
                help='output detailed progress info')
    args = parser.parse_args(argv)

    def usage_error(*message):
        # Shared exit path: message, usage line, exit status 1.
        print(*message)
        parser.print_usage()
        sys.exit(1)

    if args.max_req < 1:
        usage_error('*** Usage error: --max_req CONCURRENT must be >= 1')
    if args.limit < 1:
        usage_error('*** Usage error: --limit N must be >= 1')
    args.server = args.server.upper()
    if args.server not in SERVERS:
        usage_error('*** Usage error: --server LABEL must be one of',
                    server_options)
    try:
        cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
    except ValueError as exc:
        usage_error(exc.args[0])

    if not cc_list:
        cc_list = sorted(POP20_CC)
    return args, cc_list


def main(download_many, default_concur_req, max_concur_req):
    """Parse options, run the given download strategy and report the outcome.

    Args:
        download_many: Callable invoked as
            ``download_many(cc_list, base_url, verbose, concur_req)``; it
            must return a counter of statuses.
        default_concur_req: Default for ``--max_req``.
        max_concur_req: Upper bound applied to the requested concurrency.

    Fixes: the verbose flag was read from ``args.berbose``, an attribute
    argparse never sets (the option is ``--verbose``).

    NOTE(review): ``initial_report`` and ``final_report`` are not defined
    in the visible cells -- confirm where they come from.
    """
    args, cc_list = process_args(default_concur_req)
    # Never use more workers than there are flags to fetch.
    actual_req = min(args.max_req, max_concur_req, len(cc_list))
    initial_report(cc_list, actual_req, args.server)
    base_url = SERVERS[args.server]
    t0 = time.time()
    counter = download_many(cc_list, base_url, args.verbose, actual_req)
    assert sum(counter.values()) == len(cc_list), \
        'some downloads are unaccounted for'
    final_report(cc_list, counter, t0)
    
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

usage: ipykernel_launcher.py [-h] [-a] [-e] [-l N] [-m CONCURRENT] [-s LABEL]
                             [-v]
                             [CC [CC ...]]
ipykernel_launcher.py: error: unrecognized arguments: -f


SystemExit: 2

In [54]:
# Print the first 15 country codes one at a time, pausing 0.3 s before each.
for i in range(15):
    time.sleep(.3)
    print(POP20_CC[i], end=' ')  # end=' ' keeps all codes on one line

CN IN US ID BR PK NG BD RU JP MX PH VN ET EG 

# 使用期物futures处理并发

## 依序下载图片
```python
import os 
import sys
import time

import requests

# Country codes passed to download_many by main() (20 entries).
POP20_CC = 'CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR'.split()

# Root URL; each flag is fetched from <BASE_URL>/<cc>/<cc>.gif.
BASE_URL = 'http://flupy.org/data/flags'

# Directory (relative to the working directory) that receives the images.
DEST_DIR = 'downloads/'


def save_flag(img, filename):
    """Write the raw bytes ``img`` to ``DEST_DIR/filename``.

    The destination directory is created on demand, so the first download
    in a fresh working directory does not fail with FileNotFoundError.
    """
    os.makedirs(DEST_DIR, exist_ok=True)
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)
        

def get_flag(cc, timeout=10):
    """Download the flag for country code ``cc`` and return its bytes.

    Args:
        cc: Country code; lower-cased to build ``<BASE_URL>/<cc>/<cc>.gif``.
        timeout: Seconds to wait for the server; keeps a stalled connection
            from blocking the caller forever.

    Raises:
        requests.exceptions.Timeout: if the server does not answer in time.
    """
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url, timeout=timeout)
    return resp.content


def show(text):
    """Print ``text`` plus a trailing space and flush stdout immediately.

    Flushing refreshes the progress shown on the command line, so each
    name appears as soon as its download is reported.
    """
    print(text, end=' ', flush=True)
    
    
def download_many(cc_list):
    """Download the flags one after another, in alphabetical order.

    Returns:
        ``len(cc_list)``.
    """
    ordered_codes = sorted(cc_list)
    for code in ordered_codes:
        flag_bytes = get_flag(code)
        show(code)
        target_name = code.lower() + '.gif'
        save_flag(flag_bytes, target_name)

    return len(cc_list)


def main(downloag_many):
    """Time one download run over ``POP20_CC`` and print a summary line.

    Args:
        downloag_many: Callable taking the list of country codes and
            returning the number of flags downloaded. The misspelled name
            is kept so the signature stays unchanged.

    The previous body called the global ``download_many`` instead of the
    argument, so the callable handed in by the caller was ignored.
    """
    t0 = time.time()
    count = downloag_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))
    
    
main(download_many)
```

## 使用concurrent.futures.ThreadPoolExecutor多线程下载

- `executor.map()`：返回值是一个迭代器，迭代器的 `__next__` 方法会调用各个期物的 `result` 方法，因此我们得到的是各个期物的结果，而非期物本身。

```python
# Reuses the save_flag, get_flag, show and main functions of the previous example
from concurrent import futures

# Upper bound on the thread-pool size; download_many caps it further at
# the number of flags to fetch.
MAX_WORKERS = 20

def download_one(cc):
    """Fetch, report and save the flag for a single country code.

    Returns:
        ``cc`` unchanged.
    """
    flag_bytes = get_flag(cc)
    show(cc)
    target_name = cc.lower() + '.gif'
    save_flag(flag_bytes, target_name)
    return cc


def download_many(cc_list):
    """Download all flags concurrently with a thread pool.

    The pool size is ``min(MAX_WORKERS, len(cc_list))``.

    Returns:
        The number of results produced by ``download_one``; 0 for an empty
        ``cc_list``.
    """
    if len(cc_list) == 0:
        # min(MAX_WORKERS, 0) would be 0, and ThreadPoolExecutor rejects
        # max_workers=0 with ValueError.
        return 0

    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:
        # map() is lazy; list() drains it and surfaces worker exceptions.
        res = list(executor.map(download_one, sorted(cc_list)))

    return len(res)


main(download_many)
```

## 深入理解期物

- map方法

```python
from time import sleep, strftime
from concurrent import futures

def display(*args):
    """Print ``args`` on one line, prefixed with a ``[HH:MM:SS]`` timestamp.

    The full line, newline included, is built first and handed to a single
    ``print`` call; printing the timestamp and the arguments separately
    lets output from concurrent threads interleave in the middle of a line.
    The text produced for a single call is unchanged.
    """
    stamp = strftime('[%H:%M:%S]')
    body = ' '.join(str(arg) for arg in args)
    print('{} {}\n'.format(stamp, body), end='')
    

def loiter(n):
    """Sleep for ``n`` seconds, logging before and after via ``display``.

    Both messages are indented with ``n`` tab characters.

    Returns:
        ``n * 10``.
    """
    indent = '\t' * n
    display('{}loiter({}): doing nothing for {}s...'.format(indent, n, n))
    sleep(n)
    display('{}loiter({}): done'.format(indent, n))
    return n * 10


def main():
    """Show how ``Executor.map`` hands back results lazily and in call order.

    Five ``loiter`` calls are submitted to a pool of three threads; the
    generator returned by ``map`` is displayed, then each result is
    displayed as it is retrieved.
    """
    display('Script starting.')
    # The context manager shuts the pool down when the block exits, even if
    # retrieving a result raises; the pool was previously never shut down.
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        results = executor.map(loiter, range(5))
        display('results:', results)
        display('Waiting for individual results:')
        for i, result in enumerate(results):
            display('result {}: {}'.format(i, result))
        
        
main()
```

- 把executor.map方法换成executor.submit方法和futures.as_completed函数
```python
def download_many(cc_list):
    """Download the first five codes of ``cc_list`` with three worker threads.

    Every submission and every completion is printed together with the
    repr of the corresponding future.

    Returns:
        The number of completed downloads.
    """
    selected = sorted(cc_list[:5])
    scheduled_fmt = 'Scheduled for {}: {}'
    result_fmt = '{} result: {!r}'

    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        pending = []
        for code in selected:
            fut = executor.submit(download_one, code)  # returns a future
            pending.append(fut)
            print(scheduled_fmt.format(code, fut))

        finished = []
        for fut in futures.as_completed(pending):
            outcome = fut.result()  # fetch the result of the finished future
            print(result_fmt.format(fut, outcome))
            finished.append(outcome)

    return len(finished)
```
1. executor.submit()方法：传入一个可调用对象，为传入的可调用对象排期，并返回一个期物

2. concurrent.futures.as_completed函数：参数是一个期物列表，返回值是一个迭代器，在期物运行结束后产出期物

3. `.result()` 方法：在期物运行结束后调用，返回可调用对象的结果，或重新抛出执行时发生的异常。如需处理可能出现的异常（例如示例中的 requests.exceptions.HTTPError、requests.exceptions.ConnectionError），应在这一步捕获。

## 使用TQDM包显示下载进度

tqdm 是第三方包，需要先安装（例如 `pip install tqdm`）。

```python
import time
from tqdm import tqdm
# Wrap the iterable in tqdm() to draw a progress bar while looping.
for i in tqdm(range(100)):
    time.sleep(.01)  # short pause per step
```

## 总结

- 并发代码的重构：把依次执行的 for 循环体改成函数，以便并发调用，这里改成了 download_one 函数。
- 使用sys.stdout.flush在命令行中刷新打印内容，以显示进度。
- tqdm 模块实现文本动画进度条，处理的是可迭代对象；它使用 len 函数确定可迭代对象的大小（不支持 len 的迭代器需通过 total 参数指定），以显示进度条和预计剩余时间。
- 处理并发的简单方法，使用期物。