# Chapter7

## 예시 코드: GAME of LIFE
1. 살아 있는 세포에 살아 있는 이웃이 2개 미만이면 **인구 부족(underpopulation)** 으로 죽는다.
2. 살아 있는 세포에 살아 있는 이웃이 2개나 3개이면 다음 세대에도 살아있는다.
3. 살아 있는 세포에 살아 있는 이웃이 3개 초과이면 **인구 과잉(overpopulation)** 으로 죽는다.
4. 죽어 있는 세포에 살아 있는 이웃이 정확히 3개이면 **번식(reproduction)** 으로 다음 세대에 살아난다.

In [13]:
ALIVE = '*'
EMPTY = '-'

class SimulationError(Exception):   # 시뮬레이션 에러 발생시 처리하는 클래스
    pass

class Grid:  # 게임판 클래스
    def __init__(self, height, width):
        self.height = height    # 게임판의 높이
        self.width = width  # 게임판의 너비
        self.rows = []  # cell의 상태를 저장하는 2차원 리스트
        for _ in range(self.height):
            self.rows.append([EMPTY] * self.width)

    def get(self, y, x):    # 특정 cell의 상태를 반환하는 함수
        return self.rows[y % self.height][x % self.width]

    def set(self, y, x, state): # 특정 cell의 상태를 설정하는 함수
        self.rows[y % self.height][x % self.width] = state

    def __str__(self):  # 게임판의 상태를 문자열로 반환하는 함수
        output = ''
        for row in self.rows:
            for cell in row:
                output += cell
            output += '\n'
        return output
    
class LockingGrid(Grid):    # 게임판을 고정하는 클래스
    def __init__(self, height, width):
        super().__init__(height, width)
        self.lock = Lock()

    def __str__(self):
        with self.lock:
            return super().__str__()

    def get(self, y, x):
        with self.lock:
            return super().get(y, x)

    def set(self, y, x, state):
        with self.lock:
            return super().set(y, x, state)
        
def count_neighbors(y, x, get): # 특정 cell의 이웃 cell의 상태를 반환하는 함수
    n_ = get(y - 1, x + 0) # 북(N)
    ne = get(y - 1, x + 1) # 북동(NE)
    e_ = get(y + 0, x + 1) # 동(E)
    se = get(y + 1, x + 1) # 남동(SE)
    s_ = get(y + 1, x + 0) # 남(S)
    sw = get(y + 1, x - 1) # 남서(SW)
    w_ = get(y + 0, x - 1) # 서(W)
    nw = get(y - 1, x - 1) # 북서(NW)
    neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
    count = 0   # 이웃 cell 중 살아있는 cell의 개수
    for state in neighbor_states:
        if state == ALIVE:
            count += 1
    return count

def count_neighbors_thread(item):
    y, x, state, get = item
    try:
        neighbors = count_neighbors(y, x, get)
    except Exception as e:
        neighbors = e
    return (y, x, state, neighbors)

def game_logic(state, neighbors):
    if state == ALIVE:
        if neighbors < 2:
            return EMPTY # 살아 있는 이웃이 너무 적음: 죽음
        elif neighbors > 3:
            return EMPTY # 살아 있는 이웃이 너무 많음: 죽음
    else:
        if neighbors == 3:
            return ALIVE # 다시 생성됨
    # 여기서 블러킹 I/O를 수행
    #data = my_socket.recv(100)
    return state

def step_cell(y, x, get, set):  # 특정 cell의 상태를 업데이트하는 함수 
    state = get(y, x)
    neighbors = count_neighbors(y, x, get)
    next_state = game_logic(state, neighbors)
    set(y, x, next_state)



### Grid 시각화 Class

In [14]:
class ColumnPrinter:
    def __init__(self):
        self.columns = []

    def append(self, data):
        self.columns.append(data)

    def __str__(self):
        row_count = 1
        for data in self.columns:
            row_count = max(
                row_count, len(data.splitlines()) + 1)

        rows = [''] * row_count
        for j in range(row_count):
            for i, data in enumerate(self.columns):
                line = data.splitlines()[max(0, j - 1)]
                if j == 0:
                    padding = ' ' * (len(line) // 2)
                    rows[j] += padding + str(i) + padding
                else:
                    rows[j] += line

                if (i + 1) < len(self.columns):
                    rows[j] += ' | '

        return '\n'.join(rows)

## Item58. Using Queue for Concurrency Requires Refactoring

- 이전 Item57에서는 **On-demand Threading**과 **ThreadPool**을 사용하여 I/O 문제를 다룰때 Thread 사용에 대한 단점을 설명함
- **On-Demand Treading**: 필요할 때마다 새로운 스레드를 생성하여 작업을 수행
    - *동적 생성*: 필요할 때마다 스레드를 생성하므로, 작업의 수나 종류가 다양할 경우 유연하게 대처할 수 있습니다.
    - *자원 관리*: 각 작업이 완료된 후 스레드를 종료하므로, 자원을 효율적으로 관리할 수 있지만, 스레드 생성과 종료에 따른 오버헤드가 발생할 수 있습니다.
    - *성능 저하 가능성*: 많은 수의 스레드를 동시 생성할 경우, 스레드 관리에 따라 성능이 저하될 수 있습니다. 특히, 스레드 생성과 컨텍스트 스위칭에 드는 비용이 커질 수 있습니다.
- **ThreadPool**: 미리 생성된 Thread의 집합을 이용
    - Thread를 재사용하여 오버헤드를 줄이고 성능 개선 가능

### 멀티 스레딩 사용을 위한 Queue 기본 설계

In [15]:
from queue import Queue
from threading import Thread
from threading import Lock
import time

# 멀티 스레딩 환경에서 Queue를 이용하는 이유
# Queue는 다수의 스레드가 동시에 접근할 수 있는 자료구조이기 때문 => 스레드 간의 작업을 쉽게 조율할 수 있음

class ClosableQueue(Queue):
    # 스레드를 종료시키기 위한 신호 객체
    # 해당 객체를 Queue에 넣으면 스레드가 종료
    SENTINEL = object()

    def close(self):    # SENTINEL을 Queue에 넣어서 스레드를 종료시키는 함수
        self.put(self.SENTINEL)

    def __iter__(self): # Queue에 들어있는 아이템을 꺼내는 함수
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return   # 스레드를 종료시킨다
                yield item
            finally:
                self.task_done()

in_queue = ClosableQueue()
logic_queue = ClosableQueue()
out_queue = ClosableQueue()

class StoppableWorker(Thread):  # 스레드를 상속받는 클래스
    def __init__(self, func, in_queue, out_queue, **kwargs):
        super().__init__(**kwargs)
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):  # 특정 함수 func에 input(=in_queue)을 넣고 output(=out_queue)을 받는 함수
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)


#### Queue기반 스레딩 사용

In [16]:

def game_logic_thread(item):    # 게임 로직을 스레드로 실행하는 함수
    y, x, state, neighbors = item
    try:
        next_state = game_logic(state, neighbors)
    except Exception as e:  # 게임 로직에서 에러 발생시 에러를 반환
        next_state = e
    return (y, x, next_state)

# 스레드를 미리 시작한다
threads = []

for _ in range(5):
    thread = StoppableWorker(
        count_neighbors_thread, in_queue, logic_queue)
    thread.start()
    threads.append(thread)

for _ in range(5):
    thread = StoppableWorker(
        game_logic_thread, logic_queue, out_queue)
    thread.start()
    threads.append(thread)

def simulate_phased_pipeline(grid, in_queue, logic_queue, out_queue):  # 게임판의 상태를 Queue에 업데이트하는 함수
    for y in range(grid.height):
        for x in range(grid.width):
            state = grid.get(y, x)
            item = (y, x, state, grid.get)
            in_queue.put(item)  # 팬아웃
    in_queue.join()  # 모든 아이템이 처리될 때까지 대기
    logic_queue.join()  # 파이프라인을 순서대로 실행한다
    out_queue.close()  # 스레드를 종료시키기 위해 Queue에 SENTINEL을 넣는다

    # 다음 상태 생성
    next_grid = LockingGrid(grid.height, grid.width)
    for item in out_queue:  # 팬인
        y, x, next_state = item
        if isinstance(next_state, Exception):
            raise SimulationError(y, x) from next_state
        next_grid.set(y, x, next_state)
    return next_grid

grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    grid = simulate_phased_pipeline(
        grid, in_queue, logic_queue, out_queue)

print(columns)

for thread in threads:
    in_queue.close()
for thread in threads:
    logic_queue.close()
for thread in threads:
    thread.join()

    0     |     1     |     2     |     3     |     4    
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------


## Item 59: Consider ThreadPoolExecutor When Threads Are Necessary 

#### python 라이브러리 사용
- 직접 refactoring하는 작업은 너무 많은 작업량이 필요하고, 다단계로 이뤄진 파이프라인이라면 더 많은 작업량이 필요
- python에서 제공하는 concurrent.futures의 ThreadPoolExecutor 사용

In [17]:
from concurrent.futures import ThreadPoolExecutor

def simulate_pool(pool, grid):
    next_grid = LockingGrid(grid.height, grid.width)
    futures = []
    for y in range(grid.height):
        for x in range(grid.width):
            args = (y, x, grid.get, next_grid.set)
            future = pool.submit(step_cell, *args)  # 팬아웃
            futures.append(future)

    for future in futures:
        future.result()  # 팬인

    return next_grid

grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
with ThreadPoolExecutor(max_workers=10) as pool:
    for i in range(5):
        columns.append(str(grid))
        grid = simulate_pool(pool, grid)

print(columns)

    0     |     1     |     2     |     3     |     4    
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------


## Item 60: Achieve Highly Concurrent I/O with Coroutines

- python 라이브러리인 current.futures의 ThreadPoolExecutor은 제한된 수의 I/O 병렬성만 제공 -> 많은 개수의 셀을 동시에 처리하기에는 한계 존재
- 이를 보완할 수 있는 코루틴 사용

In [31]:
async def count_neighbors_async(y, x, get):
    n_ = get(y - 1, x + 0) # 북(N)
    ne = get(y - 1, x + 1) # 북동(NE)
    e_ = get(y + 0, x + 1) # 동(E)
    se = get(y + 1, x + 1) # 남동(SE)
    s_ = get(y + 1, x + 0) # 남(S)
    sw = get(y + 1, x - 1) # 남서(SW)
    w_ = get(y + 0, x - 1) # 서(W)
    nw = get(y - 1, x - 1) # 북서(NW)
    neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
    count = 0
    for state in neighbor_states:
        if state == ALIVE:
            count += 1
    # 여기서 블러킹 I/O를 수행한다
    #data = my_socket.recv(100)
    return count

async def game_logic_async(state, neighbors):
    if state == ALIVE:
        if neighbors < 2:
            return EMPTY # 살아 있는 이웃이 너무 적음: 죽음
        elif neighbors > 3:
            return EMPTY # 살아 있는 이웃이 너무 많음: 죽음
    else:
        if neighbors == 3:
            return ALIVE # 다시 생성됨

    # 여기서 I/O를 수행한다
    #data = await my_socket.recv(100)
    return state

async def step_cell_async(y, x, get, set):
    state = get(y, x)
    neighbors = await count_neighbors_async(y, x, get)
    next_state = await game_logic_async(state, neighbors)
    set(y, x, next_state)

In [32]:
# Jupyter Notebook과 같은 환경에서는 기본적으로 이벤트 루프가 이미 실행 중인 상태. 이 경우, nest_asyncio 라이브러리를 사용
import nest_asyncio
nest_asyncio.apply()
# Jupyter Notebook이 아닌 경우
import asyncio

async def simulate_async(grid):
    next_grid = Grid(grid.height, grid.width)

    tasks = []
    for y in range(grid.height):
        for x in range(grid.width):
            task = step_cell_async(
                y, x, grid.get, next_grid.set)  # 팬아웃
            tasks.append(task)

    await asyncio.gather(*tasks)  # 팬인

    return next_grid

grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    # python 3.7이상에서만 asyncio.run을 제공함
    grid = asyncio.run(simulate_async(grid)) # 이벤트 루프를 실행한다

print(columns)

    0     |     1     |     2     |     3     |     4    
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------
