# test async
* [파이썬 Asyncio 를 이해하기 위한 여정[번역]](http://hamait.tistory.com/834)

In [1]:
# %load log_execution_time.py
from functools import wraps
from time import time


def log_execution_time(func):
    """Decorate *func* so that every call prints how long it took.

    The wrapped function's return value is passed through unchanged; the
    elapsed time is printed only after *func* has returned normally.
    """
    @wraps(func)
    def timed_call(*args, **kwargs):
        started_at = time()
        result = func(*args, **kwargs)
        elapsed = time() - started_at
        print('Executing {} took {:.03} seconds.'.format(func.__name__, elapsed))
        return result
    return timed_call


In [2]:
# %load fib.py
from log_execution_time import log_execution_time


def fib(n):
    """Return the n-th Fibonacci number using naive double recursion.

    Any ``n`` that is not greater than 1 is returned unchanged.
    """
    if n > 1:
        return fib(n - 1) + fib(n - 2)
    return n


#   fib wrapped so that a call made through timed_fib prints its execution time
timed_fib = log_execution_time(fib)


In [None]:
# %load test_thread.py
#   http://hamait.tistory.com/834
from threading import Thread
from time import sleep
from time import time
from fib import timed_fib


def print_hello():
    """Print a timestamped greeting every three seconds, forever."""
    greeting = '{} - Hello world!'
    while True:
        print(greeting.format(int(time())))
        sleep(3)


def read_and_process_input():
    """Forever: read an integer from stdin and print its Fibonacci number."""
    while True:
        number = int(input())
        result = timed_fib(number)
        print('fib({}) = {}'.format(number, result))


def main():
    """Greet from a background thread while the main thread handles stdin."""
    #   daemon thread: it must not keep the process alive on its own
    greeter = Thread(target=print_hello, daemon=True)
    greeter.start()
    read_and_process_input()


if __name__ == '__main__':
    main()


In [None]:
# %load test_eventloop.py
#   http://hamait.tistory.com/834
import selectors
import sys
from time import time
from fib import timed_fib


def process_input(stream):
    """Read one line from *stream*, parse it as an int and print fib of it."""
    number = int(stream.readline().strip())
    result = timed_fib(number)
    print('fib({}) = {}'.format(number, result))


def print_hello():
    """Print the current time (whole seconds) followed by a greeting."""
    now = int(time())
    print('{} - Hello world!'.format(now))


def main():
    """Poll stdin and print a greeting roughly every three seconds.

    Everything runs on a single thread, like Node.js: while a Fibonacci
    number is being computed, no greeting is printed.
    """
    poller = selectors.DefaultSelector()
    poller.register(sys.stdin, selectors.EVENT_READ)
    last_hello = 0
    while True:
        #   wait at most 0.1 s for stdin to become readable
        for key, _events in poller.select(0.1):
            process_input(key.fileobj)
        if time() - last_hello <= 3:
            continue
        last_hello = time()
        print_hello()


if __name__ == '__main__':
    main()


In [None]:
# %load test_callback.py
#   http://hamait.tistory.com/834
from bisect import insort
from collections import namedtuple
from fib import timed_fib
from time import time
import selectors
import sys


#   A scheduled callback: `handler` is called once time() has passed `timestamp`
Timer = namedtuple('Timer', ['timestamp', 'handler'])


class EventLoop:
    """Minimal single-threaded event loop driven by stdin lines and timers.

    Callbacks registered with add_stdin_handler() receive every stripped line
    read from stdin; callbacks registered with add_timer() are called once
    their deadline has passed.
    """

    def __init__(self, *tasks):
        #   `tasks` is accepted but not used by this implementation
        self._running = False
        self._stdin_handlers = []
        #   Timer entries, kept sorted so the earliest deadline is first
        self._timers = []
        self._selector = selectors.DefaultSelector()
        self._selector.register(sys.stdin, selectors.EVENT_READ)

    def add_stdin_handler(self, callback):
        """Register *callback* to be called with each line read from stdin."""
        self._stdin_handlers.append(callback)

    def add_timer(self, wait_time, callback):
        """Schedule *callback* to be called *wait_time* seconds from now."""
        deadline = time() + wait_time
        insort(self._timers, Timer(timestamp=deadline, handler=callback))

    def stop(self):
        """Make run_forever() return once its current iteration is finished."""
        self._running = False

    def run_forever(self):
        """Dispatch stdin lines and due timers until stop() is called."""
        self._running = True
        while self._running:
            #   timeout 0: poll stdin without blocking
            for ready_key, _events in self._selector.select(0):
                text = ready_key.fileobj.readline().strip()
                for handler in self._stdin_handlers:
                    handler(text)

            while self._timers and self._timers[0].timestamp < time():
                due = self._timers.pop(0)
                due.handler()


def main():
    """Run the callback-based loop: greet every 3 seconds, compute fib on input.

    Typing ``exit`` stops the loop; any other line is parsed as an integer.
    """
    loop = EventLoop()

    def on_stdin_input(line):
        if line == 'exit':
            loop.stop()
            return
        n = int(line)
        print('fib({}) = {}'.format(n, timed_fib(n)))

    def print_hello():
        print('{} - Hello world!'.format(int(time())))
        #   re-arm: every run schedules the next one 3 seconds later
        loop.add_timer(3, print_hello)

    loop.add_stdin_handler(on_stdin_input)
    loop.add_timer(0, print_hello)
    loop.run_forever()


if __name__ == '__main__':
    main()


In [None]:
# %load test_eventloop_with_coroutine.py
#   http://hamait.tistory.com/834
from bisect import insort
from collections import deque
from collections import namedtuple
from fib import timed_fib
from functools import partial
from time import time
import selectors
import sys
import types


#   A deferred task: `handler` is run once time() has passed `timestamp`
Timer = namedtuple('Timer', ['timestamp', 'handler'])


class sleep_for_seconds:
    """Marker object carrying a requested delay.

    The event loop in this module adds ``_wait_time`` to ``time()`` to decide
    when the task that yielded this object is resumed.
    """

    def __init__(self, wait_time):
        self._wait_time = wait_time


class EventLoop:
    """Event loop that drives generator-based tasks.

    A task is a generator.  What it yields tells the loop how to continue:

    * another generator -> run that one, remembering the yielding task as
      its parent
    * sleep_for_seconds -> resume the task after the requested delay
    * sys.stdin         -> resume the task with the next stripped stdin line
    * anything else     -> sent to the parent task, if there is one
    """

    def __init__(self, *tasks):
        self._running = False
        self._selector = selectors.DefaultSelector()
        #   callables that are ready to run, consumed from the left
        self._tasks = deque(tasks)
        #   (coroutine, stack) pairs parked until a line arrives on stdin
        self._tasks_waiting_on_stdin = []
        #   Timer entries, kept sorted so the earliest deadline is first
        self._timers = []
        self._selector.register(sys.stdin, selectors.EVENT_READ)

    def schedule(self, coroutine, value=None, stack=(), when=None):
        """Queue *coroutine* to be resumed with *value*.

        With a truthy *when* the resume is deferred until time() has passed
        it; otherwise the resume is appended to the ready queue.
        """
        resume = partial(self.resume_task, coroutine, value, stack)
        if not when:
            self._tasks.append(resume)
            return
        insort(self._timers, Timer(timestamp=when, handler=resume))

    def resume_task(self, coroutine, value=None, stack=()):
        """Send *value* into *coroutine* and act on whatever it yields."""
        yielded = coroutine.send(value)
        if isinstance(yielded, types.GeneratorType):
            #   sub-task: run it and remember who is waiting for its value
            self.schedule(yielded, None, (coroutine, stack))
            return
        if isinstance(yielded, sleep_for_seconds):
            self.schedule(coroutine, None, stack, time() + yielded._wait_time)
            return
        if yielded is sys.stdin:
            self._tasks_waiting_on_stdin.append((coroutine, stack))
            return
        if stack:
            #   plain value: hand it to the parent task
            self.schedule(stack[0], yielded, stack[1])

    def stop(self):
        """Make run_forever() return once its current iteration is finished."""
        self._running = False

    def do_on_next_tick(self, func, *args, **kwargs):
        """Put ``func(*args, **kwargs)`` at the front of the ready queue."""
        self._tasks.appendleft(partial(func, *args, **kwargs))

    def run_forever(self):
        """Run stdin dispatch, ready tasks and due timers until stopped."""
        self._running = True
        while self._running:
            #   timeout 0: poll stdin without blocking
            for ready_key, _events in self._selector.select(0):
                text = ready_key.fileobj.readline().strip()
                for waiting_task, waiting_stack in self._tasks_waiting_on_stdin:
                    self.schedule(waiting_task, text, waiting_stack)
                self._tasks_waiting_on_stdin.clear()

            #   at most one ready task is run per iteration
            if self._tasks:
                self._tasks.popleft()()

            while self._timers and self._timers[0].timestamp < time():
                self._timers.pop(0).handler()

        self._running = False

def print_every(message, interval):
    """Task: print *message* with a timestamp, then ask to sleep *interval* seconds; repeat forever."""
    line_format = '{} - {}'
    while True:
        stamp = int(time())
        print(line_format.format(stamp, message))
        yield sleep_for_seconds(interval)

def read_input(loop):
    """Task: wait for stdin lines; ``exit`` stops *loop*, anything else is an int for fib."""
    while True:
        line = yield sys.stdin
        if line != 'exit':
            n = int(line)
            print('fib({}) = {}'.format(n, timed_fib(n)))
        else:
            #   stop via the loop's queue rather than calling stop() directly
            loop.do_on_next_tick(loop.stop)


def main():
    """Schedule the greeting task and the stdin task, then run the loop."""
    loop = EventLoop()
    for task in (print_every('Hello world!', 3), read_input(loop)):
        loop.schedule(task)
    loop.run_forever()


if __name__ == '__main__':
    main()


In [None]:
# %load test_coroutine.py
#   http://hamait.tistory.com/834
from test_eventloop_with_coroutine import EventLoop
from test_eventloop_with_coroutine import print_every
import sys


def fib(n):
    """Generator form of Fibonacci.

    Yields a sub-generator for each of the two recursive terms, expects the
    value of each to be sent back in, and finally yields their sum.  For
    ``n <= 1`` it yields ``n`` directly.
    """
    if n > 1:
        left = yield fib(n - 1)
        right = yield fib(n - 2)
        yield left + right
    else:
        yield n


def read_input(loop):
    """Task: read integers from stdin and print their Fibonacci numbers.

    *loop* is accepted but not used by this task.
    """
    while True:
        line = yield sys.stdin
        n = int(line)
        #   yield the fib generator and receive its computed value back
        fib_n = yield fib(n)
        #   print the received value, not a fresh (unevaluated) generator
        print('fib({}) = {}'.format(n, fib_n))


#   Tasks can run in parallel (interleaved by the event loop)
def main():
    """Schedule the greeting task and the stdin task, then run the loop."""
    loop = EventLoop()
    for task in (print_every('Hello world!', 3), read_input(loop)):
        loop.schedule(task)
    loop.run_forever()


if __name__ == '__main__':
    main()


In [None]:
# %load test_asyncio.py
#   http://hamait.tistory.com/834
import asyncio
import sys
from time import time
from fib import timed_fib


def process_input():
    """Read one line from stdin, parse it as an int and print fib of it."""
    number = int(sys.stdin.readline().strip())
    result = timed_fib(number)
    print('fib({}) = {}'.format(number, result))


#   python 3.4
#@asyncio.coroutine
#def print_hello():
#    while True:
#        print('{} - Hello world!'.format(int(time())))
#        yield from asyncio.sleep(3)


#   python 3.5~
async def print_hello():
    """Print a timestamped greeting, await a 3-second asyncio sleep, repeat forever."""
    while True:
        stamp = int(time())
        print('{} - Hello world!'.format(stamp))
        await asyncio.sleep(3)


def main():
    """Watch stdin with the asyncio loop while the greeting coroutine runs."""
    event_loop = asyncio.get_event_loop()
    #   process_input is registered as the reader callback for stdin
    event_loop.add_reader(sys.stdin, process_input)
    greeter = print_hello()
    event_loop.run_until_complete(greeter)


if __name__ == '__main__':
    main()


In [3]:
# %load test_coroutine_exception.py
#   http://hamait.tistory.com/834


def coroutine():
    """Generator that pauses once and reports any exception thrown into it.

    If an exception arrives while it is paused, it yields a message
    describing that exception instead of continuing.
    """
    print('Starting')
    try:
        yield "Let's pause until continued"
        print('Continuing')
    except Exception as error:
        reason = str(error)
        yield 'Got an exception: {}'.format(reason)


def main():
    """Throw an exception into a paused coroutine and print what it yields back."""
    paused = coroutine()
    next(paused)  #   run up to the first yield
    error = Exception('Have an exceptional day!')
    print(paused.throw(error))


if __name__ == '__main__':
    main()


Starting
Got an exception: Have an exceptional day!


In [4]:
# %load test_asyncio_exception.py
#   http://hamait.tistory.com/834
import asyncio


#   Native coroutine: the asyncio.coroutine decorator no longer exists in
#   Python 3.11+.
async def A():
    """Always fail, to show how an exception travels up a coroutine chain."""
    raise Exception('Something went wrong in A!')


#   Native coroutine: the asyncio.coroutine decorator no longer exists in
#   Python 3.11+.
async def B():
    """Await A and return its result plus one."""
    a = await A()
    #   a coroutine hands its result to the awaiting caller with `return`
    return a + 1


#   Native coroutine: the asyncio.coroutine decorator no longer exists in
#   Python 3.11+.
async def C():
    """Await B and print its result, or report the exception it raised."""
    try:
        b = await B()
        print(b)
    except Exception as e:
        #   an exception raised anywhere down the await chain surfaces here
        print('C got exception: {}'.format(e))


def main():
    """Run coroutine C to completion on the event loop."""
    asyncio.get_event_loop().run_until_complete(C())


if __name__ == '__main__':
    main()


C got exception: Something went wrong in A!


In [None]:
# %load ipify.py
#   http://hamait.tistory.com/834
import asyncio
import json


#   Host the HTTP request is sent to
host = 'api.ipify.org'
#   Header fields sent with the request
request_headers = {'User-Agent': 'python/3.4',
                   'Host': host,
                   'Accept': 'application/json',
                   'Accept-Charset': 'UTF-8'}


#   Native coroutine: the asyncio.coroutine decorator no longer exists in
#   Python 3.11+.
async def write_headers(writer):
    """Write every entry of request_headers to *writer*, then the blank line
    that ends the header block, and wait for the buffer to drain."""
    for key, value in request_headers.items():
        writer.write((key + ': ' + value + '\r\n').encode())
    writer.write(b'\r\n')
    await writer.drain()


#   Native coroutine: the asyncio.coroutine decorator no longer exists in
#   Python 3.11+.
async def read_headers(reader):
    """Read ``Name: value`` lines from *reader* until a blank line.

    Returns a dict mapping each stripped header name to its stripped value.
    Reading also stops when *reader* returns an empty line (e.g. at EOF).
    """
    response_headers = {}
    while True:
        line_bytes = await reader.readline()
        line = line_bytes.decode().strip()
        if not line:
            break
        #   split only on the first colon: values may contain colons too
        key, value = line.split(':', 1)
        response_headers[key.strip()] = value.strip()
    return response_headers


#   Native coroutine: the asyncio.coroutine decorator no longer exists in
#   Python 3.11+.
async def get_my_ip_address(verbose):
    """Fetch ``/?format=json`` from `host` over plain HTTP and return the ``ip`` field.

    When *verbose* is true the status line and the response headers are
    printed as they are received.

    Raises KeyError if the response has no ``Content-Length`` header or the
    JSON body has no ``ip`` key, and asyncio.IncompleteReadError if the
    connection ends before the whole body has arrived.
    """
    reader, writer = await asyncio.open_connection(host, 80)
    try:
        writer.write(b'GET /?format=json HTTP/1.1\r\n')
        await write_headers(writer)

        status_line = (await reader.readline()).decode().strip()
        #   "<version> <code> <reason>": the reason phrase may contain
        #   spaces ("Not Found") or be missing altogether
        _http_version, _, remainder = status_line.partition(' ')
        status_code, _, status = remainder.partition(' ')
        if verbose:
            print('Got status {} {}'.format(status_code, status))

        response_headers = await read_headers(reader)
        if verbose:
            print('Response headers:')
            for key, value in response_headers.items():
                print(key + ': ' + value)

        content_length = int(response_headers['Content-Length'])
        #   read(n) may return fewer than n bytes; readexactly(n) waits for all
        response_body_bytes = await reader.readexactly(content_length)
        response_object = json.loads(response_body_bytes.decode())
        return response_object['ip']
    finally:
        #   close the connection on errors as well as on success
        writer.close()


#   Native coroutine: the asyncio.coroutine decorator no longer exists in
#   Python 3.11+.
async def print_my_ip_address(verbose):
    """Look up the IP address and print it; print an error message on failure.

    *verbose* is passed through to get_my_ip_address().
    """
    try:
        ip_address = await get_my_ip_address(verbose)
        print('My IP address is:')
        print(ip_address)
    except Exception as e:
        #   top-level boundary of the script: report the failure, don't crash
        print('Error: {}'.format(str(e)))


def main():
    """Run the IP lookup to completion and always close the event loop."""
    event_loop = asyncio.get_event_loop()
    try:
        report = print_my_ip_address(verbose=True)
        event_loop.run_until_complete(report)
    finally:
        event_loop.close()


if __name__ == '__main__':
    main()


In [5]:
# %load gather.py
#   http://hamait.tistory.com/834
import asyncio
import random


#   Generator-based coroutine style; gather2.py is the async/await version.
@asyncio.coroutine
def get_url(url):
    """Pretend to fetch *url*: sleep 1-4 random seconds, then report.

    Returns the pair ``(url, seconds_waited)``.
    """
    delay = random.randint(1, 4)
    yield from asyncio.sleep(delay)
    print('Done: URL {} took {}s to get!'.format(url, delay))
    return url, delay


@asyncio.coroutine
def process_as_results_come_in():
    """Start three fake downloads and report each one as soon as it finishes."""
    pending = [get_url(name) for name in ('URL1', 'URL2', 'URL3')]
    for next_done in asyncio.as_completed(pending):
        url, _wait_time = yield from next_done
        print('Coroutine for {} is done'.format(url))


@asyncio.coroutine
def process_once_everything_ready():
    """Start three fake downloads and print all results once every one is done."""
    urls = ['URL1', 'URL2', 'URL3']
    results = yield from asyncio.gather(*map(get_url, urls))
    print(results)


def main():
    """Run both demos one after the other on the same event loop."""
    event_loop = asyncio.get_event_loop()
    stages = (
        ('First, process results as they come in:',
         process_as_results_come_in),
        ('\nNow, process results once they are all ready:',
         process_once_everything_ready),
    )
    for banner, stage in stages:
        print(banner)
        event_loop.run_until_complete(stage())


if __name__ == '__main__':
    main()


First, process results as they come in:
Done: URL URL3 took 1s to get!
Done: URL URL2 took 1s to get!
Coroutine for URL3 is done
Coroutine for URL2 is done
Done: URL URL1 took 2s to get!
Coroutine for URL1 is done

Now, process results once they are all ready:
Done: URL URL3 took 2s to get!
Done: URL URL1 took 3s to get!
Done: URL URL2 took 4s to get!
[('URL1', 3), ('URL2', 4), ('URL3', 2)]


In [6]:
# %load gather2.py
#   http://hamait.tistory.com/834
import asyncio
import random


async def get_url(url):
    """Pretend to fetch *url*: sleep 1-4 random seconds, then report.

    Returns the pair ``(url, seconds_waited)``.
    """
    delay = random.randint(1, 4)
    await asyncio.sleep(delay)
    print('Done: URL {} took {}s to get!'.format(url, delay))
    return url, delay


async def process_as_results_come_in():
    """Start three fake downloads and report each one as soon as it finishes."""
    pending = [get_url(name) for name in ('URL1', 'URL2', 'URL3')]
    for next_done in asyncio.as_completed(pending):
        url, _wait_time = await next_done
        print('Coroutine for {} is done'.format(url))


async def process_once_everything_ready():
    """Start three fake downloads and print all results once every one is done."""
    urls = ['URL1', 'URL2', 'URL3']
    results = await asyncio.gather(*map(get_url, urls))
    print(results)


def main():
    """Run both demos one after the other on the same event loop."""
    event_loop = asyncio.get_event_loop()
    stages = (
        ('First, process results as they come in:',
         process_as_results_come_in),
        ('\nNow, process results once they are all ready:',
         process_once_everything_ready),
    )
    for banner, stage in stages:
        print(banner)
        event_loop.run_until_complete(stage())


if __name__ == '__main__':
    main()


First, process results as they come in:
Done: URL URL1 took 1s to get!
Done: URL URL2 took 1s to get!
Coroutine for URL1 is done
Coroutine for URL2 is done
Done: URL URL3 took 4s to get!
Coroutine for URL3 is done

Now, process results once they are all ready:
Done: URL URL3 took 2s to get!
Done: URL URL1 took 3s to get!
Done: URL URL2 took 3s to get!
[('URL1', 3), ('URL2', 3), ('URL3', 2)]
