In [1]:
import threading
import itertools
import time
import sys

class Signal:
    go = True
    

def spin(msg, signal):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        # 使用退格符（\x08）把光标移回来
        write('\x08' * len(status))
        time.sleep(.1)
        if not signal.go:
            break
    write(' ' * len(status) + '\x08' * len(status))
    
def slow_function():
    # 假装等待I/O一段时间
    time.sleep(3)
    return 42

def supervisor(): #9
    signal = Signal()
    spinner = threading.Thread(target=spin, args=('thinking!', signal))
    print('spinner object:', spinner) #10
    spinner.start() #11
    # 运行slow_function阻塞主线程
    result = slow_function() #12
    signal.go = False #13
    # 等待spinner线程结束
    spinner.join() #14
    return result

def main():
    result = supervisor() #15
    print('Answer:', result)
    
if __name__ == '__main__':
    main()

spinner object: <Thread(Thread-6, initial)>
| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking          Answer: 42


In [1]:
import asyncio
import itertools
import sys


@asyncio.coroutine #1
def spin(msg): #2
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            yield from asyncio.sleep(.1) #3
        except asyncio.CancelledError: #4
            break
    write(' ' * len(status) + '\x08' * len(status))
    

@asyncio.coroutine
def slow_function(): #5
    # 假装等待I/O一段时间
    yield from asyncio.sleep(3) #6
    return 42

@asyncio.coroutine
def supervisor(): #7
    spinner = asyncio.async(spin('thinking!')) #8
    print('spinner object:', spinner) #9
    result = yield from slow_function() #10
    spinner.cancel() #11
    return result

def main():
    loop = asyncio.get_event_loop() #12
    result = loop.run_until_complete(supervisor()) #13
    loop.close()
    print('Answer:', result)
    

if __name__ == '__main__':
    main()

SyntaxError: invalid syntax (<ipython-input-1-3c445077ad79>, line 29)

In [2]:
import asyncio
def run_sync(coro_or_future):
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(coro_or_future)

a = run_sync(some_coroutine())

In [3]:
import asyncio
import aiohttp # 1
from flags import BASE_URL, save_flag, show, main #2

@asyncio.coroutine #3
def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url) #4
    image = yield from resp.read() #5
    return image

@asyncio.coroutine
def download_one(cc): #6
    image = yield from get_flag(cc) #7
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc

def download_many(cc_list):
    loop = asyncio.get_event_loop() #8
    to_do = [download_one(cc) for cc in sorted(cc_list)] #9
    wait_coro = asyncio.wait(to_do) #10
    res, _ = loop.run_until_complete(wait_coro) #11
    loop.close() #12
    
    return len(res)


if __name__ == '__main__':
    main(download_many)

RuntimeError: This event loop is already running

In [None]:
import asyncio
import itertools
import sys


@asyncio.coroutine #1
def spin(msg): #2
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            yield from asyncio.sleep(.1) #3
        except asyncio.CancelledError: #4
            break
    
    write(' ' * len(status) + '\x08' * len(status))
    

@asyncio.coroutine
def slow_function(): #5
    # 假装等到I/O一段时间
    yield from asyncio.sleep(3) #6
    return 42

@asyncio.coroutine
def supervisor(): #7
    spinner = asyncio.async(spin('thinking!')) #8
    print('spinner object:', spinner) #9
    result = yield from slow_function() #10
    spinner.cancel() #11
    return result

def main():
    loop = asyncio.get_event_loop() #12
    result = loop.run_until_complete(supervisor()) #13
    loop.close()
    print('Answer:', result)
    
    
if __name__ == '__main__':
    main()

flags2_common.py

In [None]:
import os
import time
import sys
import string
import argparse
from collections import namedtuple
from enum import Enum

Result = namedtuple('Result', 'status data')
HTTPStatus = Enum('Status', 'ok not_found error')
POP20_CC = ('CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR').split()

DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1

SERVERS = {
    'REMOTE': 'http://flupy.org/data/flags',
    'LOCAL': 'http://localhost:8001/flags',
    'DELAY': 'http://localhost:8002/flags',
    'ERROR': 'http://localhost:8003/flags',
}
DEFAULT_SERVER = 'LOCAL'

DEST_DIR = 'downloads/'
COUNTRY_CODES_FILE = 'country_codes.txt'

def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)
        
def initial_report(cc_list, actual_req, server_label):
    if len(cc_list) <= 10:
        cc_msg = ', '.join(cc_list)
    else:
        cc_msg = 'from {} to {}'.format(cc_list[0], cc_list[-1])
    print('{} site: {}'.format(server_label, SERVERS[server_label]))
    msg = 'Searching for {} flag{}: {}'
    plural = 's' if len(cc_list) != 1 else ''
    print(msg.format(len(cc_list), plural, cc_msg))
    plural = 's' if actual_req != 1 else ''
    msg = '{} concurrent connection{} will be used.'
    print(msg.format(actual_req, plural))
    
def final_report(cc_list, counter, start_time):
    elapsed = time.time() - start_time
    print('-' * 20)
    msg = '{} flag{} downloaded.'
    plural = 's' if counter[HTTPStatus.ok] != 1 else ''
    print(msg.format(counter[HTTPStatus.ok], plural))
    if counter[HTTPStatus.not_found]:
        print(counter[HTTPStatus.not_found], 'not found.')
    if counter[HTTPStatus.error]:
        plural = 's' if counter[HTTPStatus.error] != 1 else '' 
        print('{} error{}.'.format(counter[HTTPStatus.error], plural))
    print('Elapsed time: {:.2f}s'.format(elapsed))
    
def expand_cc_args(every_cc, all_cc, cc_args, limit):
    codes = set()
    A_Z = string.ascii_uppercase
    if every_cc:
        codes.update(a+b for a in A_Z for b in A_Z)
    elif all_cc:
        with open(COUNTRY_CODES_FILE) as fp:
            text = fp.read()
        codes.update(text.split())
    else:
        for cc in (c.upper() for c in cc_args):
            if len(cc) == 1 and cc in A_Z:
                codes.update(cc+c for c in A_Z)
            elif len(cc) == 2 and all(c in A_Z for c in cc):
                codes.add(cc)
            else:
                msg = 'each CC argument must be A to Z or AA to ZZ.'
                raise ValueError('*** Usage error: ' + msg)
    return sorted(codes)[:limit]

def process_args(default_concur_req):
    server_options = ', '.join(sorted(SERVERS))
    parser = argparse.ArgumentParser(description='Download flags for country codes. Default: top 20 countries by population.')
    parser.add_argument('cc', metavar='CC', nargs='*', help='country code or 1st letter (eg. B for BA...BZ)')
    parser.add_argument('-a', '--all', action='store_true', help='get all available flags (AD to ZW)')
    parser.add_argument('-e', '--every', action='store_true', help='get flags for every possible code (AA...ZZ)')
    parser.add_argument('-l', '--limit', metavar='N', type=int, help='limit to N first codes', default=sys.maxsize)
    parser.add_argument('-m', '--max_req', metavar='CONCURRENT', type=int, default=default_concur_req, help='maximum concurrent requests (default={})'.format(default_concur_req))
    parser.add_argument('-s', '--server', metavar='LABEL', default=DEFAULT_SERVER, help='Server to hit; one of {} (default={})'.format(server_options, DEFAULT_SERVER))
    parser.add_argument('-v', '--verbose', action='store_true', help='output detailed progress info')
    args = parser.parse_args()
    if args.max_req < 1:
        print('*** Usage error: --max_req CONCURRENT must be >= 1')
        parser.print_usage()
        sys.exit(1)
    if args.limit < 1:
        print('*** Usage error: --limit N must be >= 1')
        parser.print_usage()
        sys.exit(1)
    args.server = args.server.upper()
    if args.server not in SERVERS:
        print('*** Usage error: --server LABEL must be one of', server_options)
        parser.print_usage()
        sys.exit(1)
    try:
        cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
    except ValueError as exc:
        print(exc.args[0])
        parser.print_usage()
        sys.exit(1)
    if not cc_list:
        cc_list = sorted(POP20_CC)
    return args, cc_list

def main(download_many, default_concur_req, max_concur_req):
    args, cc_list = process_args(default_concur_req)
    actual_req = min(args.max_req, max_concur_req, len(cc_list))
    initial_report(cc_list, actual_req, args.server)
    base_url = SERVERS[args.server]
    t0 = time.time()
    counter = download_many(cc_list, base_url, args.verbose, actual_req)
    assert sum(counter.values()) == len(cc_list), 'some downloads are unaccounted for'
    final_report(cc_list, counter, t0)

In [None]:
import asyncio
import collections

import aiohttp
from aiohttp import web
import tqdm

from flags2_common import main, HTTPStatus, Result, save_flag

# 默认设为较小的值，防止远程网站出错
# 例如503 - Service Temporarily Unavaliable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000

class FetchError(Exception): #1
    def __init__(self, country_code):
        self.country_code = country_code
        
@asyncio.coroutine
def get_flag(base_url, cc): #2
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url)
    if resp.status = 200:
        image = yield from resp.read()
        return image
    elif resp.status = 404:
        raise web.HTTPNotFound()
    else:
        raise aiohttp.HttpProcessingError(code=resp.status, message=resp.reason, headers=resp.headers)

@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose): #3
    # semaphore: 信号标
    try:
        with (yield from semaphore): #4
            image = yield from get_flag(base_url, cc) #5
    except web.HTTPNotFound: #6
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc #7
    else:
        save_flag(image, cc.lower() + '.gif') #8
        status = HTTPStatus.ok
        msg = 'OK'
        
    if verbose and msg:
        print(cc, msg)
    
    return Result(status, cc)

@asyncio.coroutine
def downloader_coro(cc_list, base_url, verbose, concur_req): #1
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(concur_req) #2
    to_do = [download_one(cc, base_url, semaphore, verbose) for cc in sorted(cc_list)] #3
    
    to_do_iter = asyncio.as_completed(to_do) #4
    if not verbose:
        to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) #5
    for future in to_do_iter: #6
        try:
            res = yield from future #7
        except FetchError as exc: #8
            country_code = exc.country_code #9
            try:
                error_msg = exc.__cause__.args[0] #10
            except IndexError:
                error_msg = exc.__cause__.__class__.__name__ #11
            if verbose and error_msg:
                msg = '*** Error for {}: {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status
            
        counter[status] += 1 #12
        
    return counter #13

def download_many(cc_list, base_url, verbose, concur_req):
    loop = asyncio.get_event_loop()
    coro = downloader_coro(cc_list, base_url, verbose, concur_req)
    counts = loop.run_until_complete(coro) #14
    loop.close() #15
    return counts

if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

python 中的回调地狱：链式回调

In [1]:
def stage1(response1):
    request2 = step1(response1)
    api_call2(request2, stage2)
    
def stage2(response2):
    request3 = step2(response2)
    api_call3(request3, stage3)
    
def stage3(response3):
    step3(response3)
    
api_call(request1, stage1)

NameError: name 'api_call' is not defined

In [None]:
@asyncio.coroutine
def three_stages(request1):
    reponse1 = yield from api_call1(request1)
    request2 = step1(response1)
    response2 = yield from api_call2(request2)
    request3 = step2(response2)
    response3 = yield from api_call3(request3)
    step3(response3)
    
# 必须显示调度执行
loop.create_task(three_stages(request1))

In [None]:
@asyncio.coroutine
def http_get(url):
    res = yield from aiohttp.request('GET', url)
    if res.status == 200:
        ctype = res.headers.get('Content-type', '').lower()
        if 'json' in ctype or url.endswith('json'):
            data = yield from res.json() #1
        else:
            data = yield from res.read() #2
        return data
    
    elif res.status == 404:
        raise web.HTTPNotFound()
    else:
        raise aiohttp.errors.HttpProcessingError(code=res.status, message=res.reason, headers=res.headers)
        
@asyncio.coroutine
def get_country(base_url, cc):
    url = '{}/{cc}/metadata.json'.format(base_url, cc=cc.lower())
    metadata = yield from http_get(url) #3
    return metadata['country']

@asyncio.coroutine
def get_flag(base_url, cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    return (yield from http_get(url)) #4

@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
    try:
        with (yield from semaphore): #5
            image = yield from get_flag(base_url, cc)
        with (yield from semaphore):
            country = yield from get_country(base_url, cc)
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        country = country.replace(' ', '_')
        filename = '{}-{}.gif'.format(country, cc)
        loop = asyncio.get_event_loop()
        loop.run_in_executor(None, save_flag, image, filename)
        status = HTTPStatus.ok
        msg = 'ok'
    
    if verbose and msg:
        print(cc, msg)
        
    return Result(status, cc)

In [3]:
import sys
import asyncio

from charfinder import UnicodeNameIndex #1

CRLF = b'\r\n'
PROMPT = b'?>'

index = UnicodeNameIndex() #2

@asyncio.coroutine
def handle_queries(reader, writer): #3
    while True: #4
        writer.write(PROMPT) # 不能使用yield from #5
        yield from writer.drain() # 必须使用 yield from #6
        data = yield from reader.readline() #7
        try:
            query = data.decode().strip()
        except UnicodeDecodeError: #8
            query = '\x00'
        client = writer.get_extra_info('peername') #9
        print('Received from {}: {!r}'.format(client, query)) #10
        if query:
            if ord(query[:1]) < 32: #11
                break
            lines = list(index.find_description_strs(query)) #12
            if lines:
                writer.writelines(line.encode() + CRLF for line in lines) #13
            writer.write(index.status(query, len(lines)).encode() + CRLF) #14
            
            yield from writer.drain() #15
            print('Sent {} results'.format(len(lines))) #16
        
        print('Close the client socket') #17
        writer.close() #18
        
def main(address='127.0.0.1', port=2323): #1
    port = int(port)
    loop = asyncio.get_event_loop()
    server_coro = asyncio.start_server(handle_queries, address, port, loop=loop) #2
    server = loop.run_until_complete(server_coro) #3
    host = server.sockets[0].getsockname() #4
    print('Serving on {}. Hit CTRL-C to stop.'.format(host)) #5
    try:
        loop.run_forever() #6
    except KeyboardInterrupt: # 按CTRL-C键
        pass
    
    print('Server shutting down.')
    server.close() #7
    loop.run_until_complete(server.wait_closed()) #8
    loop.close() #9

if __name__ == '__main__':
    main(*sys.argv[1:]) #10

ValueError: invalid literal for int() with base 10: 'C:\\Users\\LITIAN\\AppData\\Roaming\\jupyter\\runtime\\kernel-6f546348-ab95-4fdc-9ba6-c5a1558a8d28.json'

In [4]:
from aiohttp import web


def home(request): #1
    query = request.GET.get('query', '').strip() # 2
    print('Query: {!r}'.format(query)) # 3
    if query: # 4
        descriptions = list(index.find_descriptions(query))
        res = '\n'.join(ROW_TPL.format(**vars(descr)) for descr in descriptions)
        msg = index.status(query, len(descriptions))
    else:
        descriptions = []
        res = ''
        msg = 'Enter words describing characters.'
        
    html = template.format(query=query, result=res, message=msg) # 5
    
    print('Sending {} results'.format(len(descriptions))) # 6
    return web.Response(content_type=CONTENT_TYPE, text=html) # 7

@asyncio.coroutine
def init(loop, address, port): # 1
    app = web.Application(loop=loop) #2
    app.router.add_route('GET', '/', home) #3
    handler = app.make_handler() # 4
    server = yield from loop.create_server(handler, address, port) # 5
    return server.sockets[0].getsockname() # 6


def main(address='127.0.0.1', port=8888):
    port = int(port)
    loop = asyncio.get_event_loop()
    host = loop.run_until_complete(init(loop, address, port)) # 7
    print('Serving on {}. Hit CTRL-C to stop.'.format(host))
    try:
        loop.run_forever() # 8
    except KeyboardInterrupt: # 按CTRL-C键
        pass
    print('Server shutting down.')
    loop.close() # 9
    

if __name__ == '__main__':
    main(*sys.argv[1:])

ValueError: invalid literal for int() with base 10: 'C:\\Users\\LITIAN\\AppData\\Roaming\\jupyter\\runtime\\kernel-6f546348-ab95-4fdc-9ba6-c5a1558a8d28.json'