## Future 동시성
- 비동기 작업 실행
- 적합한 작업일 경우 순차적 진행보다 압도적인 성능 향상
- 지연시간(Block), CPU 및 리소스 낭비 방지, Network I/O 관련 작업 동시성 활용 권장

1. 순차실행
1. concurrent.futures 방법 1
1. concurrent.futures 방법 2

In [1]:
import os
import time
import sys
import csv

## 순차실행

In [2]:
NATION_LS = 'Singapore Germany Israel Norway Italy \
              Canada France Spain Mexico'.split()
NATION_LS

['Singapore',
 'Germany',
 'Israel',
 'Norway',
 'Italy',
 'Canada',
 'France',
 'Spain',
 'Mexico']

In [3]:
TARGET_CSV = './resources/nations.csv'
DEST_DIR = './csvs'

### csv 헤더 기초 정보

In [4]:
HEADER = ['Region','Country', 'Item Type', 'Sales Channel',
          'Order Priority', 'Order Date', 'Order ID', 'Ship Date',
          'Units Sold', 'Unit Price', 'Unit Cost', 'Total Revenue',
          'Total Cost', 'Total Profit']
len(HEADER)

14

### 결과별 분리 함수


In [5]:
def get_sales_data(nt):
    with open(TARGET_CSV, 'r') as f:
        reader = csv.DictReader(f)
        # Dict을 리스트로 적재
        data = []
        # Header 확인
        # print(reader.fieldnames)
        for r in reader:
            # OrderedDict 확인
            # print(r)
            # 조건에 맞는 국가만 삽입
            if r['Country'] == nt:
                data.append(r)
    return data

### 중간 상황 출력

In [6]:
def show(text):
    print(text, end=' ')
    # 버퍼 비우기
    sys.stdout.flush()

### 국가별 csv 파일 저장

In [7]:
def save_csv(data, filename):
    # 최종 경로 생성
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'w', newline='') as fp:
        writer = csv.DictWriter(fp, fieldnames=HEADER)
        # Header write
        writer.writeheader()
        # Dict to CSV write
        for row in data:
            writer.writerow(row)

### 결과별 분리 함수 실행

In [8]:
def separate_many(nt_list):
    for nt in sorted(nt_list):
        # 분리 데이터
        data = get_sales_data(nt)
        # 상황 출력
        show(nt)
        # 파일 저장
        save_csv(data, nt.lower()+'.csv')
    return len(nt_list)

### main 함수 실행

In [9]:
def main(sepratate_many):
    # 시작 시간
    start_tm = time.time()
    # 결과 건수
    result_cnt = sepratate_many(NATION_LS)
    # 종료 시간
    end_tm = time.time() - start_tm
    # 최종 결과 출력
    msg = '\n{} csv seperated in {:.2f}s'
    print(msg.format(result_cnt, end_tm))

In [10]:
if __name__ == '__main__':
    main(separate_many)

Canada France Germany Israel Italy Mexico Norway Singapore Spain 
9 csv seperated in 18.36s


## concurrent.futures 방법 1
- `ThreadPoolExecutor`
- 서로 다른 스레드 또는 프로세스에서 실행 가능
- 내부 과정 알 필요 없으며, 고수준의 인터페이스 제공
- GIL(Global Interface Lock): 한번에 한 개의 스레드만 수행할 수 있도록 인터프리터 자체에서 잠금을 거는 것

In [14]:
from concurrent import futures

In [19]:
def separate_many(nt):
    # for nt in sorted(nt_list):  # 스레드로 동시 실행하므로 for문 필요 없음
    # 분리 데이터
    data = get_sales_data(nt)
    # 상황 출력
    show(nt)
    # 파일 저장
    save_csv(data, nt.lower()+'.csv')
    return nt

In [24]:
def main(sepratate_many):
    # worker 개수
    worker = min(20, len(NATION_LS))
    # 시작 시간
    start_tm = time.time()
    # 결과 건수
    with futures.ThreadPoolExecutor(worker) as executor:
        # map -> 작업 순서 유지, 즉시 실행
        result_cnt = executor.map(separate_many, sorted(NATION_LS))
    # 종료 시간
    end_tm = time.time() - start_tm
    # 최종 결과 출력
    msg = '\n{} csv seperated in {:.2f}s'
    print(msg.format(list(result_cnt), end_tm))

In [25]:
if __name__ == '__main__':
    main(separate_many)

Germany CanadaSpain  Norway France IsraelMexico  Italy Singapore 
['Canada', 'France', 'Germany', 'Israel', 'Italy', 'Mexico', 'Norway', 'Singapore', 'Spain'] csv seperated in 19.70s


>순차 진행보다 시간이 더 걸린다!! -> GIL 때문

GIL을 우회하기 위해서는 `ProcessPoolExecutor` 필요

### concurrent.futures 방법 2
- `ProcessPoolExecutor`

In [40]:
def main(sepratate_many):
    # worker 개수
    worker = min(20, len(NATION_LS))
    # 시작 시간
    start_tm = time.time()
    # futures
    futures_list = []
    # 결과 건수
    # ThreadPoolExecutor: GIL에 종속됨
    # ProcessPoolExecutor: GIL을 우회, 변경 후 -> os.cpu_count()
    with futures.ProcessPoolExecutor(worker) as executor:
        # Submit -> Callable 객체 스케줄링 (실행 예약) -> Future
        # Future -> result(), done(), as_complete() 주로 사용
        for nt in sorted(NATION_LS):
            # future 반환
            future = executor.submit(separate_many, nt)
            # 스케줄링
            futures_list.append(future)
            # 출력1
            print('Scheduled for {}: {}'.format(nt, future))
        for future in futures.as_completed(futures_list):
            result = future.result()
            done = future.done()
            cancelled = future.cancelled
            # future 결과 확인
            print('Future result: {}, Done: {}'.format(result, done))
            print('Future cancelled: {}'.format(cancelled))

    # 종료 시간
    end_tm = time.time() - start_tm
    # 최종 결과 출력
    msg = '\n csv seperated in {:.2f}s'
    # print(msg.format(list(futures_list), end_tm))
    print(msg.format(end_tm))

In [41]:
if __name__ == '__main__':
    main(separate_many)

Scheduled for Canada: <Future at 0x11ff4ee10 state=running>
Scheduled for France: <Future at 0x11ff60ba8 state=pending>
Scheduled for Germany: <Future at 0x11ff60b38 state=pending>
Scheduled for Israel: <Future at 0x11ff62b70 state=pending>
Scheduled for Italy: <Future at 0x11ff1ab70 state=pending>
Scheduled for Mexico: <Future at 0x11ff1ad30 state=pending>
Scheduled for Norway: <Future at 0x11ff1a9b0 state=pending>
Scheduled for Singapore: <Future at 0x11ff1a390 state=pending>
Scheduled for Spain: <Future at 0x11ff1a550 state=pending>
France Norway Germany Mexico Singapore Canada Spain Israel Italy Future result: France, Done: True
Future cancelled: <bound method Future.cancelled of <Future at 0x11ff60ba8 state=finished returned str>>
Future result: Norway, Done: True
Future cancelled: <bound method Future.cancelled of <Future at 0x11ff1a9b0 state=finished returned str>>
Future result: Germany, Done: True
Future cancelled: <bound method Future.cancelled of <Future at 0x11ff60b38 state

## Conclusion
- I/O bound task -> `ThreadPoolExecutor`
- CPU bound task -> `ProcessPoolExecutor`

---