# Async programming

## I/O 대기
+ CPU 클럭 한 사이클에 걸리는 시간을 1초라고 가정했을 때
+ RAM 접근은 20초, SSD는 4일, HDD는 6개월이 걸린다. 그렇다면 네트워크 전송은?

## 동시성
+ 여러 개의 요청을 동시에 다룰 수 있는 시스템을 구현하는 방식
+ 아래는 동기식으로 구현된 기본 함수

In [1]:
import time 
import functools


DEFAULT_FMT = '[{elapsed:0.8f}s] {name}({args}, {kwargs}) -> {result}'


def clock(fmt=DEFAULT_FMT):
    """Build a decorator that prints a timing report for every call.

    Args:
        fmt: ``str.format`` template for the report line. It is formatted
            with the wrapper's local variables, so the default template's
            fields (``elapsed``, ``name``, ``args``, ``kwargs``, ``result``)
            are always available.

    Returns:
        A decorator. The decorated function behaves like the original
        (same arguments, same return value) and additionally prints one
        report line per call.

    Note:
        Only the synchronous call itself is timed. Calling an ``async def``
        function merely creates a coroutine object, so for coroutine
        functions the report shows that object and a near-zero duration.
    """
    def decorate(func):
        @functools.wraps(func)
        def clocked(*_args, **_kwargs):
            # perf_counter is monotonic and high resolution, which makes it
            # the right clock for measuring short durations.
            t0 = time.perf_counter()
            # Forward keyword arguments as well; passing only the positional
            # ones would silently drop them from the wrapped call.
            _result = func(*_args, **_kwargs)
            elapsed = time.perf_counter() - t0
            name = func.__name__
            args = ', '.join(repr(arg) for arg in _args)
            pairs = ['%s=%r' % (k, w) for k, w in sorted(_kwargs.items())]
            kwargs = ', '.join(pairs)
            result = repr(_result)
            # locals() is kept so custom templates may reference any local
            # name defined above, exactly as before.
            print(fmt.format(**locals()))
            # The wrapper replaces the decorated function, so it must hand
            # back whatever the original returned.
            return _result
        return clocked
    return decorate

In [3]:
def network_request(number, delay=1.0):
    """Simulate a blocking network call that squares ``number``.

    Args:
        number: Value to square.
        delay: Seconds to block before answering. Defaults to the original
            hard-coded latency of 1.0 second.

    Returns:
        A dict with ``"success"`` set to ``True`` and ``"result"`` set to
        ``number ** 2``.
    """
    # The sleep stands in for network latency; the caller is blocked for
    # the whole duration.
    time.sleep(delay)
    return {"success": True, "result": number ** 2}


@clock()
def fetch_square(number):
    """Return the square of ``number`` obtained via ``network_request``.

    Blocks until the request has answered. Returns ``None`` when the
    response does not report success.
    """
    reply = network_request(number)
    return reply["result"] if reply["success"] else None

        
if __name__ == "__main__":
    # Time ten synchronous requests issued one after another.
    t0 = time.time()

    # Every fetch_square call blocks until its request has finished, so the
    # calls run strictly in sequence (the recorded total is about 10 s).
    returns = [fetch_square(i) for i in range(10)]
    
    elapsed = time.time() - t0
    print("Total:", elapsed, "sec")

[1.00105643s] fetch_square(0, ) -> 0
[1.00107455s] fetch_square(1, ) -> 1
[1.00107980s] fetch_square(2, ) -> 4
[1.00106001s] fetch_square(3, ) -> 9
[1.00104928s] fetch_square(4, ) -> 16
[1.00105405s] fetch_square(5, ) -> 25
[1.00106549s] fetch_square(6, ) -> 36
[1.00105596s] fetch_square(7, ) -> 49
[1.00107670s] fetch_square(8, ) -> 64
[1.00106001s] fetch_square(9, ) -> 81
Total: 10.012276649475098 sec


### Callback

+ 코드가 복잡해지므로 원리만 이해하고 실제로는 쓰지 말자

In [2]:
import threading


def network_request_async(number, on_done, delay=1.0):
    """Simulate a non-blocking network call that reports via a callback.

    A background thread waits ``delay`` seconds and then invokes ``on_done``
    with the response dict. This function returns immediately.

    Args:
        number: Value to square.
        on_done: Callable taking one argument, the response dict
            ``{"success": True, "result": number ** 2}``. It runs on the
            background thread and its return value is ignored.
        delay: Seconds to wait before calling ``on_done``. Defaults to the
            original hard-coded latency of 1.0 second.
    """
    def respond_after_delay():
        time.sleep(delay)
        on_done({"success": True, "result": number ** 2})

    worker = threading.Thread(target=respond_after_delay)
    worker.start()
    
    
@clock()
def fetch_square_async(number):
    """Start an asynchronous square request and return immediately.

    Nothing is returned to the caller: the square is only produced inside
    the callback, after this function has already finished.
    """
    def handle_response(response):
        # This value goes back to whoever invokes the callback, not to the
        # caller of fetch_square_async.
        return response["result"] if response["success"] else None

    network_request_async(number, handle_response)

In [3]:
if __name__ == "__main__":
    # Time how long it takes to *start* ten callback-based requests.
    t0 = time.time()

    # fetch_square_async returns without waiting for its request, so this
    # list holds None for every call (see the recorded output).
    returns = [fetch_square_async(i) for i in range(10)]
    
    # Measured right after launching the requests, not after they complete.
    elapsed = time.time() - t0
    print("Total:", elapsed, "sec")

[0.00017548s] fetch_square_async(0, ) -> None
[0.00040245s] fetch_square_async(1, ) -> None
[0.00021434s] fetch_square_async(2, ) -> None
[0.00022650s] fetch_square_async(3, ) -> None
[0.00008178s] fetch_square_async(4, ) -> None
[0.00027943s] fetch_square_async(5, ) -> None
[0.00015831s] fetch_square_async(6, ) -> None
[0.00014424s] fetch_square_async(7, ) -> None
[0.00013137s] fetch_square_async(8, ) -> None
[0.00023603s] fetch_square_async(9, ) -> None
Total: 0.0025157928466796875 sec


### Future

+ 요청한 자원을 추적하고, 그 자원이 가용해질 때까지 대기하는 데 도움이 되는 추상화

In [4]:
from concurrent.futures import Future

def callback(future):
    """Print the future's result reversed with a ``[::-1]`` slice."""
    reversed_result = future.result()[::-1]
    print(reversed_result)


# Create a pending future and register the callback on it before any
# result has been set.
fut = Future()
fut.add_done_callback(callback)

In [5]:
vars(fut)

{'_condition': <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7f671073cb40>, 0)>,
 '_state': 'PENDING',
 '_result': None,
 '_exception': None,
 '_waiters': [],
 '_done_callbacks': [<function __main__.callback(future)>]}

In [6]:
fut.set_result("HELLO") ## stores the result on the future and runs the registered done-callback (which prints the reversed text)

OLLEH


In [7]:
vars(fut)

{'_condition': <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7f671073cb40>, 0)>,
 '_state': 'FINISHED',
 '_result': 'HELLO',
 '_exception': None,
 '_waiters': [],
 '_done_callbacks': [<function __main__.callback(future)>]}

+ network_request_async를 콜백 대신 Future를 반환하는 방식으로 변환 (threading.Timer 사용)

In [8]:
def network_reqeust_async(number, delay=1.0):
    """Simulate a non-blocking network call that reports via a ``Future``.

    The (misspelled) function name is kept as is because callers in this
    file use it.

    Args:
        number: Value to square.
        delay: Seconds before the future is resolved. Defaults to the
            original hard-coded latency of 1.0 second.

    Returns:
        A ``concurrent.futures.Future`` that a timer thread resolves with
        ``{"success": True, "result": number ** 2}`` after ``delay`` seconds.
    """
    future = Future()
    result = {"success": True, "result": number ** 2}
    # threading.Timer runs the given callable on its own thread once the
    # delay has elapsed; that call resolves the future.
    timer = threading.Timer(delay, future.set_result, args=(result,))
    timer.start()
    return future


@clock()
def fetch_square_async(number):
    """Request a square asynchronously and print it once it is available.

    Returns immediately with ``None``; the square is printed later by the
    future's done-callback.
    """
    def report_square(done_future):
        payload = done_future.result()
        if not payload["success"]:
            return
        # A done-callback has no caller to return the value to, so the
        # square is printed instead of returned.
        print(payload["result"], flush=True)        

    pending = network_reqeust_async(number)
    pending.add_done_callback(report_square)

In [9]:
if __name__ == "__main__":
    # Time how long it takes to *start* ten future-based requests.
    t0 = time.time()

    # Each call only registers a done-callback and returns None; the squares
    # are printed later by the callbacks (after "Total:" in the recorded
    # output, and in no fixed order).
    returns = [fetch_square_async(i) for i in range(10)]
    
    elapsed = time.time() - t0
    print("Total:", elapsed, "sec")

[0.00032854s] fetch_square_async(0, ) -> None
[0.00040984s] fetch_square_async(1, ) -> None
[0.00018501s] fetch_square_async(2, ) -> None
[0.00018477s] fetch_square_async(3, ) -> None
[0.00015330s] fetch_square_async(4, ) -> None
[0.00018501s] fetch_square_async(5, ) -> None
[0.00016809s] fetch_square_async(6, ) -> None
[0.00015807s] fetch_square_async(7, ) -> None
[0.00014043s] fetch_square_async(8, ) -> None
[0.00014877s] fetch_square_async(9, ) -> None
Total: 0.0024878978729248047 sec
01
925
36
81

49

16
4
64


## asyncio 프레임워크
+ loop = asyncio.get_event_loop() 함수를 호출해 asyncio 루프를 얻을 수 있음
+ loop.call_later() 를 사용해 콜백 실행을 예약할 수 있음
+ loop.stop 메소드를 사용해서 루프 종료
+ loop.run_forever() 로 루프 시작 가능
+ loop.run_until_complete()에 코루틴(또는 퓨처)을 넘기면 그것이 완료될 때까지만 루프를 실행
+ ensure_future()는 Task 인스턴스를 반환해서 실행하고 싶은 코루틴을 이벤트 루프로 전달 가능

In [25]:
import asyncio
import nest_asyncio
nest_asyncio.apply() # This module patches asyncio to allow nested (due to jupyter notebook)

async def network_request(number, delay=1.0):
    """Simulate a network call as a coroutine that squares ``number``.

    Args:
        number: Value to square.
        delay: Seconds to wait before answering. Defaults to the original
            hard-coded latency of 1.0 second.

    Returns:
        A dict with ``"success"`` set to ``True`` and ``"result"`` set to
        ``number ** 2``.
    """
    # asyncio.sleep suspends only this coroutine, so the event loop can run
    # other tasks during the wait.
    await asyncio.sleep(delay)
    return {"success": True, "result": number ** 2}

@clock()
async def fetch_square(number):
    """Await a square request and print the result if it succeeded.

    Because this is a coroutine function, the ``clock`` wrapper sees only
    the creation of the coroutine object: the recorded output shows a
    coroutine object and a near-zero duration rather than the awaited work.
    """
    reply = await network_request(number)
    if not reply["success"]:
        return
    print(reply["result"], flush=True)

In [26]:
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    t0 = time.time()

    # Each run_until_complete call returns only after its coroutine has
    # finished, so the three requests run back to back (the recorded total
    # is about 3 s).
    loop.run_until_complete(fetch_square(0))
    loop.run_until_complete(fetch_square(1))
    loop.run_until_complete(fetch_square(2))
    
    elapsed = time.time() - t0
    print("Total:", elapsed, "sec")

[0.00000119s] fetch_square(0, ) -> <coroutine object fetch_square at 0x7f67105d1290>
0
[0.00000048s] fetch_square(1, ) -> <coroutine object fetch_square at 0x7f67105d1290>
1
[0.00000143s] fetch_square(2, ) -> <coroutine object fetch_square at 0x7f67105d1290>
4
Total: 3.006361722946167 sec


In [61]:
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    t0 = time.time()

    # gather wraps the three coroutines into one awaitable, so their waits
    # overlap (the recorded total is about 1 s instead of 3 s).
    loop.run_until_complete(asyncio.gather(fetch_square(0), fetch_square(1), fetch_square(2)))
    
    elapsed = time.time() - t0
    print("Total:", elapsed, "sec")

[0.00000072s] fetch_square(0, ) -> <coroutine object fetch_square at 0x7f65389043b0>
[0.00000048s] fetch_square(1, ) -> <coroutine object fetch_square at 0x7f66865fa710>
[0.00000072s] fetch_square(2, ) -> <coroutine object fetch_square at 0x7f66865fa950>
0
1
4
Total: 1.0058913230895996 sec


In [62]:
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    t0 = time.time()

    # ensure_future only schedules the coroutines as tasks; nothing in this
    # cell runs the loop, so none of them has completed when the elapsed
    # time is taken.
    asyncio.ensure_future(fetch_square(0))
    asyncio.ensure_future(fetch_square(1))
    asyncio.ensure_future(fetch_square(2))
    
    # NOTE(review): run_forever is left disabled; in the recorded run the
    # squares were still printed after "Total:", presumably by the
    # notebook's already-running event loop. Confirm before reusing this
    # outside a notebook.
#     loop.run_forever()

    # Covers task scheduling only, not the requests themselves.
    elapsed = time.time() - t0
    print("Total:", elapsed, "sec")

[0.00000048s] fetch_square(0, ) -> <coroutine object fetch_square at 0x7f6538904050>
[0.00000048s] fetch_square(1, ) -> <coroutine object fetch_square at 0x7f66866acd40>
[0.00000095s] fetch_square(2, ) -> <coroutine object fetch_square at 0x7f66865fa950>
Total: 0.000209808349609375 sec
0
1
4


In [2]:
import asyncio
import json
from pathlib import Path

from collections import defaultdict
import pandas as pd
from tqdm import tqdm

import time


path = Path("/home/shyeon/workspace/python/SparkDefinitiveGuide/data/activity-data/")


def load_json_byline(path):
    """Lazily yield the lines of the UTF-8 text file at ``path``.

    Lines are yielded unchanged, including their trailing newline. The file
    stays open while the generator is being consumed.
    """
    with open(path, "r", encoding="UTF-8") as handle:
        yield from handle


def load_json_byfile(fps):
    """Lazily yield every line of every file in ``fps``, in order.

    ``fps`` is an iterable of file paths; each one is read with
    ``load_json_byline`` before moving on to the next.
    """
    for file_path in fps:
        for line in load_json_byline(file_path):
            yield line


@clock()
def main():
    """Load every ``*.json`` file under ``path`` into a DataFrame.

    Each line is parsed as one JSON object; its values are appended to a
    per-key list, the lists become DataFrame columns, and the first rows
    are printed.
    """
    columns = defaultdict(list)
    json_files = path.glob("*.json")

    for line in tqdm(load_json_byfile(json_files)):
        record = json.loads(line)
        for key, value in record.items():
            columns[key].append(value)

    frame = pd.DataFrame(columns)
    print(frame.head())


if __name__ == "__main__":
    main()

6240991it [00:37, 166059.50it/s]


    Arrival_Time        Creation_Time    Device  Index   Model User     gt  \
0  1424686734976  1424688581028687867  nexus4_2      3  nexus4    g  stand   
1  1424686735185  1424688581240144166  nexus4_2     45  nexus4    g  stand   
2  1424686735390  1424686733392914805  nexus4_1     78  nexus4    g  stand   
3  1424686735590  1424688581638612183  nexus4_2    124  nexus4    g  stand   
4  1424686735791  1424686733790650400  nexus4_1    157  nexus4    g  stand   

          x         y         z  
0  0.000687  0.033356  0.022659  
1 -0.007858 -0.047821  0.016251  
2 -0.001801  0.026138 -0.009201  
3  0.002823 -0.008301  0.003433  
4 -0.000732  0.016525 -0.014542  
[47.13235378s] main(, ) -> None


In [2]:
import asyncio
import json
from pathlib import Path

from collections import defaultdict
import pandas as pd
from tqdm import tqdm

import time


path = Path("/home/shyeon/workspace/python/SparkDefinitiveGuide/data/activity-data/")


def load_json_byline(path):
    """Lazily yield the lines of the UTF-8 text file at ``path``.

    This is a plain generator. The former ``@asyncio.coroutine`` decorator
    was dropped: it is deprecated and was removed in Python 3.11, and it
    added nothing here because the generator is only iterated
    synchronously.
    """
    with open(path, "r", encoding="UTF-8") as fp:
        yield from fp


def load_json_byfile(fps):
    """Lazily yield every line of every file in ``fps``, in order.

    Plain generator as well, for the same reason as ``load_json_byline``.
    """
    for fp in fps:
        yield from load_json_byline(fp)


@clock()
def main():
    """Load every ``*.json`` file under ``path`` into a DataFrame.

    Lines are consumed with an ordinary ``for`` loop. Each parsed JSON
    object contributes one value per key to the column lists, and the first
    rows of the resulting DataFrame are printed.
    """
    columns = defaultdict(list)
    json_files = path.glob("*.json")

    for line in tqdm(load_json_byfile(json_files)):
        record = json.loads(line)
        for key, value in record.items():
            columns[key].append(value)

    frame = pd.DataFrame(columns)
    print(frame.head())


if __name__ == "__main__":
    main()

6240991it [00:39, 158451.46it/s]


    Arrival_Time        Creation_Time    Device  Index   Model User     gt  \
0  1424686734976  1424688581028687867  nexus4_2      3  nexus4    g  stand   
1  1424686735185  1424688581240144166  nexus4_2     45  nexus4    g  stand   
2  1424686735390  1424686733392914805  nexus4_1     78  nexus4    g  stand   
3  1424686735590  1424688581638612183  nexus4_2    124  nexus4    g  stand   
4  1424686735791  1424686733790650400  nexus4_1    157  nexus4    g  stand   

          x         y         z  
0  0.000687  0.033356  0.022659  
1 -0.007858 -0.047821  0.016251  
2 -0.001801  0.026138 -0.009201  
3  0.002823 -0.008301  0.003433  
4 -0.000732  0.016525 -0.014542  
[49.12507033s] main(, ) -> None


In [2]:
import time

import asyncio
import nest_asyncio
nest_asyncio.apply() # This module patches asyncio to allow nested (due to jupyter notebook)
import aiofile

import json
import pandas as pd
from collections import defaultdict
from pathlib import Path
from tqdm import tqdm


path = Path("/home/shyeon/workspace/python/SparkDefinitiveGuide/data/activity-data/")


def load_json_byline(path):
    """Lazily yield the lines of the UTF-8 text file at ``path``.

    This is a plain generator. The former ``@asyncio.coroutine`` decorator
    was dropped: it is deprecated and was removed in Python 3.11, and it
    added nothing here because the generator is only iterated
    synchronously.
    """
    with open(path, "r", encoding="UTF-8") as fp:
        yield from fp


def load_json_byfile(fps):
    """Lazily yield every line of every file in ``fps``, in order.

    Plain generator as well, for the same reason as ``load_json_byline``.
    """
    for fp in fps:
        yield from load_json_byline(fp)


def convert_json2dict(json_str:str, dic:dict):
    """Parse one JSON object and append each of its values to ``dic[key]``.

    ``dic`` is mutated in place and must create or already hold a list for
    every key (for example a ``defaultdict(list)``). Returns ``None``.
    """
    record = json.loads(json_str)
    for field in record:
        dic[field].append(record[field])
        
        
@clock()
@asyncio.coroutine   
def main():
    """Feed every line of the ``*.json`` files under ``path`` into ``dd``.

    NOTE(review): ``asyncio.coroutine`` is deprecated and was removed in
    Python 3.11, so this cell only runs on older interpreters.

    NOTE(review): per the recorded output, calling the decorated function
    returns a generator object almost instantly, and the file processing
    happens later when that generator is consumed. Confirm this is the
    intended flow.
    """
    dd = defaultdict(list)
    files = path.glob("*.json")
    
    # convert_json2dict works by mutating dd and has no return statement,
    # so this list only collects None values; the loop is run for its side
    # effect on dd.
    task = [convert_json2dict(lines, dd) for lines in tqdm(load_json_byfile(files))]
    
    return task
                
#     result = pd.DataFrame(dd)
#     print(result.head())
    
    
if __name__ == "__main__":
    
    loop = asyncio.get_event_loop()
    # main() is unpacked with * before gather is called, so whatever main()
    # returns is iterated here, synchronously.
    # NOTE(review): it is unclear whether gather receives any awaitables at
    # all in this flow; verify before relying on this for concurrency.
    loop.run_until_complete(asyncio.gather(*main()))
#     loop.close() 

15960it [00:00, 159599.77it/s]

[0.00000072s] main(, ) -> <generator object main at 0x7fae80d23dd0>


6240991it [00:37, 165043.98it/s]


In [7]:
import time

import asyncio
import nest_asyncio
nest_asyncio.apply() # This module patches asyncio to allow nested (due to jupyter notebook)
from aiofile import AIOFile, Reader, Writer

import json
import pandas as pd
from collections import defaultdict
from pathlib import Path
from tqdm import tqdm


path = Path("/home/shyeon/workspace/python/SparkDefinitiveGuide/data/activity-data/")


async def load_json(path):
    """Read the file at ``path`` through ``AIOFile`` and print its content.

    The result of ``afp.read()`` is printed in a single call, which is what
    produced the notebook's "IOPub data rate exceeded" message in the
    recorded run.
    """
    async with AIOFile(path, "r", encoding="UTF-8") as afp:
        content = await afp.read()
        print(content)



def convert_json2dict(json_str:str, dic:dict):
    """Parse one JSON object and append each of its values to ``dic[key]``.

    ``dic`` is mutated in place and must create or already hold a list for
    every key (for example a ``defaultdict(list)``). Returns ``None``.
    """
    parsed = json.loads(json_str)
    for name in parsed:
        dic[name].append(parsed[name])
        
        
@clock()
def main():
    """Create one ``load_json`` coroutine per ``*.json`` file under ``path``.

    Returns:
        A list of coroutine objects that have not been awaited yet. The
        caller has to run them, for example with ``asyncio.gather``.
    """
    # The accumulator dict that used to be created here was never read, so
    # it was removed along with the dead commented-out DataFrame code.
    return [load_json(json_file) for json_file in path.glob("*.json")]
    
    
if __name__ == "__main__":
    
    loop = asyncio.get_event_loop()
    # main() returns a list of load_json coroutines; gather schedules them
    # together and run_until_complete blocks until all of them are done.
    loop.run_until_complete(asyncio.gather(*main()))
#     loop.close() 

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)

