# concurrent.futures.
   - __ThreadPoolExecutor__ : 멀티스레딩, GIL에 종속
   - __ProcessPoolExecutor__ : 멀티프로세싱, GIL에 독립
   
   
    각각 map(), submit()을 활용하는 방식이 있다.
   - __map()__ 활용 : 작업순서 유지하고 즉시 실행함
   - __submit(), futures__ 활용: 더 구체적인 작업 현황 파악 가능
   -------------------------

각 case마다 공통적인 코딩

In [52]:
import os
import time
import sys
import csv
from concurrent import futures

#국가정보
NATION_LS = 'Singapore Germany Israel Norway Italy Canada France Spain Mexico'.split() #split함수; list로 반환

#csv 자본 위치
TARGET_CSV = 'C:/Python/python_basic/PYTHON_ADVANCED/resources/nations.csv'

#저당 폴더 위치
DEST_DIR = 'C:/Python/python_basic/csvs'

#CSV자본 헤더 정보
with open('C:/Python/python_basic/PYTHON_ADVANCED/resources/nations.csv','r', encoding='utf-8') as f:
    HEADER = f.readline().split(',') 
f.close()
HEADER[-1] = HEADER[-1][:-1]


#국가별 분리
# target_csv를 위에서부터 읽어서 nt 인 국가들만 모아서 data 로 반환하는 함수
def get_sales_data(nt):
    with open(TARGET_CSV,'r') as f:
        reader = csv.DictReader(f) #header column명과 field값의 쌍으로 dict 반환
        #Dict을 리스트로 적재
        data =[]
        for r in reader:
            if r['Country']==nt: #dict의 key인 country를 활용함. nt인 나라들을 모으다.
                data.append(r)
    return data


# 중간 상황 출력
def show(text):
    print(text, end = ' ')
    # 중간출력, 버퍼지우기 (처리 완료되는 나라들 출력. 다 될 때 까지 기다렸다가 한번이 출력하지 말고.)
    sys.stdout.flush()

    
# 국가별 csv 파일 저장
# dict형식의 LIST 파일을 받아서 field명과 field값에 맞게 write 해주는 함수
def save_csv(data, filename):
    #최종 경로 생성
    path = os.path.join(DEST_DIR, filename) # join(a,b) = ab 붙여서 반환
    
    with open(path,'w',newline='') as fp: #기본적으로 줄바꾸기가 들어기가 때문에, newline ='' 로 줄바꾸기 하나 없애줌
        writer = csv.DictWriter(fp,fieldnames=HEADER) #fieldnames:Headder를 인식하게 함. 단, header가 파일에 한 줄을 차지하진 않음
        # 헤더 작성
        writer.writeheader() # 이제야 헤더가 파일의 한줄을 차지함
        #Dict to CSV Write
        for row in data:
            writer.writerow(row) #dict 한쌍을 field, field값으로 인식해 파일에 작성.

# 국가별 분리 함수 실행
# 국가 이름을 받아서 그 이름에 해당하는 데이터만 분류하여 파일로 만드는 함수
def separate_many(nt):
    #분리 데이터
    data = get_sales_data(nt)
    #상황 출력
    show(nt)
    #파일저장
    save_csv(data,nt.lower()+'.csv') #save_csv(저장할데이터, 저장할파일명)
    return nt

--------------------------
## .map() 활용
#### ThreadPoolExecutor 활용 비동기 작업
- 결론 : n개의 workers가 하나에 파일에 순차적으로 접근을 시도하므로(by GIL), context swithing 비용이 일어나서 순차 작업보다 느림.

In [54]:
#main 함수
def main(separate_many):
    # worker 개수 설정. threadpoolexecutor 사용시에만 설정
    worker = min(20,len(NATION_LS))
    # 시작 시간
    start_tm = time.time()
    # 결과 건수
    with futures.ThreadPoolExecutor(worker) as executor:
        # map : 작업 순서를 유지하고 즉시 실행됨.
        result_cnt = executor.map(separate_many, sorted(NATION_LS)) #result_cnt는 generator로 반환됨.
    # 종료 시간
    end_tm = time.time() - start_tm
    msg = '\n {} csv separated in {:.2f}s'
    # 최종 결과 출력
    print(msg.format(list(result_cnt),end_tm)) #generator 인 result_cnt를 list로 반환

# 실행
if __name__ == '__main__':
        main(separate_many)

Israel Singapore Spain Canada Germany Italy France Mexico Norway 
 ['Canada', 'France', 'Germany', 'Israel', 'Italy', 'Mexico', 'Norway', 'Singapore', 'Spain'] csv separated in 14.79s


#### ProcessPoolExecutor 활용 비동기 작업
- CPU의 개수만큼 멀티프로세스 가능.
- CPU 전부 활용 : 부하가 크다
- 딥러닝 등의 고속계산시에 효과가 좋다

In [45]:
#main 함수
def main(separate_many):
    
    # 시작 시간
    start_tm = time.time()
    # 결과 건수
    with futures.ProcessPoolExecutor() as executor:
        # map : 작업 순서를 유지하고 즉시 실행됨.
        result_cnt = executor.map(separate_many, sorted(NATION_LS)) #result_cnt는 generator로 반환됨.
    # 종료 시간
    end_tm = time.time() - start_tm
    msg = '\n {} csv separated in {:.2f}s'
    # 최종 결과 출력
    print(msg.format(result_cnt,end_tm)) 

# 실행
if __name__ == '__main__':
    main(separate_many)
    
    
# vscode 에서는 돌아가나 jupyter notebook 에서는 돌아가지 않음. jupyter의 특성때문


 <generator object _chain_from_iterable_of_lists at 0x00000212F04E7A50> csv separated in 0.37s


---------------------------
## .submit() 활용

### future
   1. 정의 : 일, 작업(Task)
   
   
   2. 특징: 
       - future = concurrent.futures.submit(func, `*args`, `**kwargs`) 을 통해 객체 생성됨
        - callable 함수 func를 func(`*args`, `**kwargs`) 으로 __스케줄링__하며 future 객체를 반환한다.

      - 아래의 메소드를 갖는다.
           - result() :  future 작업 결과물
           - done(): future 작업 완료 여부
           - as_completed(futures) : 모든 futures의 작업이 완료되면 futures iterator 객체 반환.
           - cancelled() : 취소여부 True,False. (cancelled 하면 state = finished, error, cancelled 나타냄)

### ThreadPoolExecutor 활용

In [61]:
def main(separate_many):
    #workers
    workers = len(NATION_LS)
    start_tm = time.time()
    #futures를 담는 
    futures_list = []
    
    with futures.ThreadPoolExecutor(workers) as executor:
        for nt in sorted(NATION_LS):
            #future 객체 반환
            future = executor.submit(separate_many,nt) # nt 만큼의 future 생성
            #스케쥴링
            futures_list.append(future)
    
    #futures_list에 future들을 담았다
    
    for future in futures.as_completed(futures_list): #futures 작업 완료후 iterator로 반환됨
        result = future.result() #futures 하나하나에 대해서 메소드 호출해서 상태 확인.
        done = future.done()
        cancelled = future.cancelled
        #future 결과 확인
        print('Future Result: {}, Done? : {}'.format(result,done))
        print('Future cancelled info: {}'.format(cancelled))
    
    #종료시간
    end_tm = time.time()-start_tm
    msg = '\n csv separated in {:2f}s'
    #최종결과출력
    print(msg.format(end_tm))

# 실행
if __name__ == '__main__':
    main(separate_many)

Israel Germany Norway Mexico Singapore Spain France Italy Canada Future Result: Mexico, Done? : True
Future cancelled info: <bound method Future.cancelled of <Future at 0x212f04bf9d0 state=finished returned str>>
Future Result: Italy, Done? : True
Future cancelled info: <bound method Future.cancelled of <Future at 0x212f053d5b0 state=finished returned str>>
Future Result: Canada, Done? : True
Future cancelled info: <bound method Future.cancelled of <Future at 0x212f0521340 state=finished returned str>>
Future Result: Norway, Done? : True
Future cancelled info: <bound method Future.cancelled of <Future at 0x212f054c2e0 state=finished returned str>>
Future Result: Spain, Done? : True
Future cancelled info: <bound method Future.cancelled of <Future at 0x212f0553eb0 state=finished returned str>>
Future Result: Singapore, Done? : True
Future cancelled info: <bound method Future.cancelled of <Future at 0x212f054cac0 state=finished returned str>>
Future Result: Germany, Done? : True
Future ca

### ProcessPoolExecutor 활용

In [59]:
def main(separate_many):
    start_tm = time.time()
    #futures를 담는 
    futures_list = []
    
    with futures.ProcessPoolExecutor() as executor:
        for nt in sorted(NATION_LS):
            #future 객체 반환
            future = executor.submit(separate_many,nt) # nt 만큼의 future 생성
            #스케쥴링
            futures_list.append(future)
    
    #futures_list에 future들을 담았다
    
    for future in futures.as_completed(futures_list): #futures 작업 완료후 iterator로 반환됨
        result = future.result() #futures 하나하나에 대해서 메소드 호출해서 상태 확인.
        done = future.done()
        cancelled = future.cancelled
        #future 결과 확인
        print('Future Result: {}, Done? : {}'.format(result,done))
        print('Future cancelled info: {}'.format(cancelled))
    
    #종료시간
    end_tm = time.time()-start_tm
    msg = '\n csv separated in {:2f}s'
    #최종결과출력
    print(msg.format(end_tm))

# 실행
if __name__ == '__main__':
    main(separate_many)

# python 특성 때문에 BrokenProcessPool 에러 발생

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.