In [1]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'             # 'last_expr' 기본 / 'all'

In [None]:
# parallelism : multi core, multiple CPUs, GPU, ...

# execution unit : independent state and call stack : process, thread, coroutine

## asyncio

In [3]:
import asyncio

async def main():
    print('hello')
    await asyncio.sleep(1)      # asyncio.sleep(delay, result=None) : suspends the current task, allowing other tasks to run
    print('world')

main()                      # simply calling a coroutine doesn't schedule it to be executed
                            # to run a corou: 
# asyncio.run(main())           # asyncio.run(corou())
                                # await corou()
                                # asyncio.create_task(coruo()) -> await tsk
                                # async with asyncio.TaskGroup() as tg: tsk = tg.create_task(corou())

<coroutine object main at 0x10d596440>

In [None]:
import time, asyncio


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    print(f"started at {time.strftime('%X')}")
    
    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())                                 # total 3 sec.

In [None]:
import time, asyncio


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))      # runs concurrently
    task2 = asyncio.create_task(say_after(2, 'world'))

    print(f'started at {time.strftime("%X")}')

    await task1
    await task2

    print(f'finished at {time.strftime("%X")}')


asyncio.run(main())                                 # total 2 sec.

In [None]:
import time, asyncio


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    async with asyncio.TaskGroup() as tg:                       # from 3.11
        task1 = tg.create_task(say_after(1, 'hello'))
        task2 = tg.create_task(say_after(2, 'world'))

        print(f'started at {time.strftime("%X")}')
    
    # await is implicit when the context manager exits          # runs corous concurrently

    print(f'finished at {time.strftime("%X")}')


asyncio.run(main())                             # total 2 sec.

## game of life

In [15]:
L, D = '*-'

class Grid:
    def __init__(self, height, width):
        self.height, self.width = height, width
        self.rows = [[D for _ in range(self.width)] for _ in range(self.height)]

    def get(self, r, c):
        return self.rows[r % self.height][c % self.width]
    
    def set(self, r, c, state):
        self.rows[r % self.height][c % self.width] = state

    def __str__(self):
        return '\n'.join(''.join(row) for row in self.rows)
    

grid = Grid(5, 9)
grid.set(0, 3, L)
grid.set(1, 4, L)
grid.set(2, 2, L)
grid.set(2, 3, L)
grid.set(2, 4, L)
print(grid)

---*-----
----*----
--***----
---------
---------


In [16]:
def count_neighbors(r, c, get):     # get(r, c) method
    neighbor_states = [get(r+r_, c+c_) for r_ in (-1,0,1) 
                           for c_ in (-1,0,1) if (r_,c_) != (0, 0)]
    return neighbor_states.count(L)

def game_logic(state, neighbors):
    if state == L:
        if neighbors < 2 or neighbors > 3:
            return D
    else:
        if neighbors == 3:
            return L
    return state

def step_cell(r, c, get, set):
    state = get(r, c)
    neighbors = count_neighbors(r, c, get)
    next_state = game_logic(state, neighbors)
    set(r, c, next_state)

def simulate(grid):
    next_grid = Grid(grid.height, grid.width)
    for r in range(grid.height):
        for c in range(grid.width):
            step_cell(r, c, grid.get, next_grid.set)
    return next_grid


class ColumnPrinter:
    def __init__(self):
        self.columns = []
        self.h, self.w = 0, 0

    def append(self, x):
        l = len(self.columns)
        w = len(x.split('\n')[0])
        self.columns.append(f'{l:^{w}}\n' + x)

    def __str__(self):
        l = len(self.columns)
        h = len(self.columns[0].split('\n'))
        rows = []
        for i in range(h):
            row = ' | '.join(self.columns[j].split('\n')[i] for j in range(l))
            rows.append(row)
        return '\n'.join(rows)
    
columns = ColumnPrinter()
for _ in range(5):
    columns.append(str(grid))
    grid = simulate(grid)
print(columns)

    0     |     1     |     2     |     3     |     4    
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------


In [17]:
from threading import Lock


class LockingGrid(Grid):

    def __init__(self, height, width):
        super().__init__(height, width)
        self.lock = Lock()

    def __str__(self):
        with self.lock:
            return super().__str__()
        
    def get(self, r, c):
        with self.lock:
            return super().get(r, c)
        
    def set(self, r, c, state):
        with self.lock:
            return super().set(r, c, state)

In [18]:
from concurrent.futures import ThreadPoolExecutor


def simulate_pool(pool, grid):
    next_grid = LockingGrid(grid.height, grid.width)
    futures = []
    for r in range(grid.height):
        for c in range(grid.width):
            args = (r, c, grid.get, next_grid.set)
            future = pool.submit(step_cell, *args)      # fan out
            futures.append(future)

    for future in futures:
        future.result()                                 # fan in

    return next_grid


grid = LockingGrid(5, 9)
grid.set(0, 3, L)
grid.set(1, 4, L)
grid.set(2, 2, L)
grid.set(2, 3, L)
grid.set(2, 4, L)

columns = ColumnPrinter()
with ThreadPoolExecutor(max_workers=10) as pool:
    for _ in range(5):
        columns.append(str(grid))
        grid = simulate_pool(pool, grid)

print(columns)

    0     |     1     |     2     |     3     |     4    
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------


In [19]:
async def game_logic(state, neighbors):
    if state == L:
        if neighbors < 2 or neighbors > 3:
            return D
    else:
        if neighbors == 3:
            return L
    return state

async def step_cell(r, c, get, set):
    state = get(r, c)
    neighbors = count_neighbors(r, c, get)
    next_state = await game_logic(state, neighbors)
    set(r, c, next_state)

import asyncio


async def simulate(grid):
    next_grid = Grid(grid.height, grid.width)
    
    tasks = []
    for r in range(grid.height):
        for c in range(grid.width):
            task = step_cell(r, c, grid.get, next_grid.set)         # fan out
            tasks.append(task)

    await asyncio.gather(*tasks)                                    # fan in
    return next_grid


grid = Grid(5, 9)
grid.set(0, 3, L)
grid.set(1, 4, L)
grid.set(2, 2, L)
grid.set(2, 3, L)
grid.set(2, 4, L)

columns = ColumnPrinter()
for _ in range(5):
    columns.append(str(grid))
    grid = asyncio.run(simulate(grid))                              # run the event loop
print(columns)

RuntimeError: asyncio.run() cannot be called from a running event loop

## downloading country flags

In [20]:
# sequential download

import time
from pathlib import Path
import httpx

pop20_cc = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

base_url = 'https://www.fluentpython.com/data/flags'
dest_dir = Path('downloaded')

def save_flag(img, filename):
    (dest_dir / filename).write_bytes(img)

def get_flag(cc):
    url = f'{base_url}/{cc}/{cc}.gif'.lower()
    resp = httpx.get(url, timeout=6.1, follow_redirects=True)
    resp.raise_for_status()
    return resp.content

def download_many(cc_list):
    for cc in sorted(cc_list):
        image = get_flag(cc)
        save_flag(image, f'{cc}.gif')
        print(cc, end=' ', flush=True)
    return len(cc_list)

def main(downloader):
    dest_dir.mkdir(exist_ok=True)
    t0 = time.perf_counter()
    count = downloader(pop20_cc)
    elapsed = time.perf_counter() - t0
    print(f'\n{count} downloads in {elapsed:.2f}s')

# main(download_many)

In [None]:
from concurrent import futures

def download_one(cc):
    image = get_flag(cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

def download_many(cc_list):
    with futures.ThreadPoolExecutor() as executor:
        res = executor.map(download_one, sorted(cc_list))

    return len(list(res))