### concurrent.futures - 병렬 작업 시작하기
- python 3.2 부터 추가
- concurrent.futures 모듈은 비동기적으로 콜러블을 실행하는 고수준 인터페이스를 제공
- 비 동기 실행은 ThreadPoolExecutor를 사용해서 스레드 혹은 ProcessPoolExecutor를 사용해서 프로세스로 수행할 수 있다.
- 둘다 추상 Executor 클래스로 정의된 것과 같은 인터페이스를 구현한다.

#### Executor 객체

In [7]:
from concurrent import futures

with futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 2, 8)
    print(future.result())

256


#### ThreadPoolExecutor
- 용법: class concurrent.futures.ThreadPoolExecutor(max_workers=None thread_name_prefix='', initializer=None, initargs=())
- ThreadPoolExecutor는 스레드 풀을 사용하여 호출을 비 동기적으로 실행하는 Executor 서브 클래스이다.
- Future와 관련된 Callable 객체다 다른 Future의 결과를 기다릴 때 교착상태가 발생할 수 있다. 예를 들면

In [33]:
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed

def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6

futures_list = []

executor = futures.ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

futures_list = [a, b]

result = wait(futures_list, timeout=7)
print(result) # 두 스레드가 교착상태에 빠져 done 된게 없다.

DoneAndNotDoneFutures(done=set(), not_done={<Future at 0x2c77a6dae80 state=running>, <Future at 0x2c77a6daa30 state=running>})


In [34]:
def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

futures_list = []

executor = futures.ThreadPoolExecutor(max_workers=1)
c = executor.submit(wait_on_future)

futures_list.append(c)

result = wait(futures_list, timeout=7)
print(result) 

DoneAndNotDoneFutures(done=set(), not_done={<Future at 0x2c77913edf0 state=running>})


#### ========== 이하 강의 자료 ==========

<Futures 동시성>  
비동기 작업 실행 예시  
: 비동기 작업과 적합한 프로그램일 경우 압도적으로 성능 향상  
: 지연시간(Block) CPU 및 리소스 낭비 방지 -> (File)Network I/O 관련 작업 -> 동시성 활용 권장

<futures 모듈>  
: concurrent.Futures
: futures : 비동기 실행을 위한 API를 고수준으로 작성 -> 사용하기 쉽도록 개선
  1. (★) 멀티스레딩/멀티프로세싱 API 통일 -> 매우 사용하기 쉬움
  2. 실행중이 작업 취소, 완료 여부 체크, 타임아웃 옵션, 콜백추가, 동기화 코드 매우 쉽게 작성 -> Promise 개념(?)

<GIL, Global Interpreter Lock>  
GIL : 두 개 이상의 스레드가 동시에 실행 될 때 하나의 자원을 엑세스 하는 경우 -> 문제점을 방지하기 위해 GIL 실행, 리소스 전체에 락이 걸린다. -> Context Switch(문맥 교환)이 오히려 속도를 느리게 하는 경우가 있다.

GIL을 우회하기 위해선,,, 멀티프로세싱 사용, CPython

### concurrent.futures map

In [4]:
import os, time
from concurrent import futures

WORK_LIST = [100000, 1000000, 10000000, 10000000]

def sum_generate(n):
    return sum(n for n in range(1, n + 1))

def main():
    # Worker Count
    worker = min(10, len(WORK_LIST))

    start_tm = time.time()
    # ProcessPoolExecutor
    with futures.ThreadPoolExecutor() as executor:
        result = executor.map(sum_generate, WORK_LIST)

    end_tm = time.time() - start_tm

    msg = '\n Result -> {} Time : {:.2f}s'

    print(msg.format(list(result), end_tm))

if __name__ == '__main__':
    main()



 Result -> [5000050000, 500000500000, 50000005000000, 50000005000000] Time : 1.23s


### concurrent.futures wait, as_completed

- submit(fn, /, *args, **kwargs) : callable이 실행되도록 예약한다. callable 실행을 나타내는 Future 객체를 반환한다.
- wait은 대기할 최대 시간을 제어하는데 사용할 수 있다. 
    - {wait 리턴값}.done : 실행 완료된 future 객체를 집합에 담는다.
    - {wait 리턴값}.not_done : 실행 완료되지 않은 future 객체를 집합에 담는다.
- {Future객체}.result(): 반환 값을 나타낸다.
- (강의자에 의하면) wait는 테스트를 특정 단위별로 완료하여 처리할 때 사용하고(e.g. DB에 삽입), 끝나는 대로 바로 처리할 때는 

In [19]:
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed

WORK_LIST = [100000, 1000000, 10000000, 100000000]


# 동시성 합계 계산 메인 함수
# 누적 합계 함수(제레네이터)
def sum_generator(n):
    return sum(n for n in range(1, n+1))

# wait
# as_completed
def main():
    # Worker Count
    worker = min(10, len(WORK_LIST))

    start_tm = time.time()

    # Futures 
    futures_list = []

    with ThreadPoolExecutor(max_workers=worker) as executor:
        for work in WORK_LIST:
            future = executor.submit(sum_generator, work) # sum_generator 함수에 work 인자들이 담긴다.
            # 스케줄링
            futures_list.append(future) # future 객체를 리스트에 담는다.
            # 스케줄링 확인
            print('Scheduled for {} : {}'.format(work, future))

        print()
          
        result = wait(futures_list, timeout=7) # 앞의 map 예제와는 다르게 timeout을 걸 수 있다.
        print(result)
        # 성공
        print('Completed Tasks : ' + str(result.done))
        # 실패
        print('Pending ones after waiting for 7seconds : ' + str(result.not_done))
        # 결과 값 출력s
        print([future.result() for future in result.done])

    # 종료 시간
    end_tm = time.time() - start_tm
    # 출력 포멧
    msg = '\n Time : {:.2f}s'
    #msg = '\n Result -> {} Time : {:.2f}s'
    
    # 최종 결과 출력
    print(msg.format(end_tm))
    #print(msg.format(list(result), end_tm))

if __name__ == '__main__':
    main()

Scheduled for 100000 : <Future at 0x29478e3c6d0 state=finished returned int>
Scheduled for 1000000 : <Future at 0x29478b32610 state=pending>
Scheduled for 10000000 : <Future at 0x29478b32280 state=running>
Scheduled for 100000000 : <Future at 0x29478e3f250 state=running>

DoneAndNotDoneFutures(done={<Future at 0x29478b32280 state=finished returned int>, <Future at 0x29478b32610 state=finished returned int>, <Future at 0x29478e3f250 state=finished returned int>, <Future at 0x29478e3c6d0 state=finished returned int>}, not_done=set())
Completed Tasks : {<Future at 0x29478b32280 state=finished returned int>, <Future at 0x29478b32610 state=finished returned int>, <Future at 0x29478e3f250 state=finished returned int>, <Future at 0x29478e3c6d0 state=finished returned int>}
Pending ones after waiting for 7seconds : set()
[50000005000000, 500000500000, 5000000050000000, 5000050000]

 Time : 6.53s


#### concurrent.futures.as_completed(fs, timeout=None)

In [18]:
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed

WORK_LIST = [100000, 1000000, 10000000, 100000000]


# 동시성 합계 계산 메인 함수
# 누적 합계 함수(제레네이터)
def sum_generator(n):
    return sum(n for n in range(1, n+1))

# wait
# as_completed
def main():
    # Worker Count
    worker = min(10, len(WORK_LIST))
    
    start_tm = time.time()

    # Futures 
    futures_list = []

    with ThreadPoolExecutor(max_workers=worker) as executor:
        for work in WORK_LIST:
            future = executor.submit(sum_generator, work) # sum_generator 함수에 work 인자들이 담긴다.
            # 스케줄링
            futures_list.append(future) # future 객체를 리스트에 담는다.
            # 스케줄링 확인
            print('Scheduled for {} : {}'.format(work, future))

        print()

        # # 성공
        # print('Completed Tasks : ' + str(result.done))
        # # 실패
        # print('Pending ones after waiting for 7seconds : ' + str(result.not_done))
        # # 결과 값 출력s
        # print([future.result() for future in result.done])

        # as_completed 결과 출력
        for future in as_completed(futures_list):
            result = future.result()
            done = future.done()
            cancelled = future.cancelled

            print('Future Result : {}, Done : {}'.format(result, done))
            print('Future Cancelled : {}'.format(cancelled))
            print()

    # 종료 시간
    end_tm = time.time() - start_tm
    # 출력 포멧
    msg = '\n Time : {:.2f}s'
    #msg = '\n Result -> {} Time : {:.2f}s'
    
    # 최종 결과 출력
    print(msg.format(end_tm))
    #print(msg.format(list(result), end_tm))

if __name__ == '__main__':
    main()

NameError: name 'worker' is not defined

참고자료: https://docs.python.org/ko/3/library/concurrent.futures.html

In [17]:
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed

WORK_LIST = [100000, 1000000, 10000000, 100000000]


# 동시성 합계 계산 메인 함수
# 누적 합계 함수(제레네이터)
def sum_generator(n):
    return sum(n for n in range(1, n+1))

# wait
# as_completed
def main():
    # Worker Count
    worker = min(10, len(WORK_LIST))
    
    # 시작 시간
    start_tm = time.time()
    # Futures
    futures_list = []

    # 결과 건수
    # ProcessPoolExecutor
    with ThreadPoolExecutor(max_workers=1) as excutor:
        for work in WORK_LIST:
            # future 반환
            future = excutor.submit(sum_generator, work)
            # 스케쥴링
            futures_list.append(future)
            # 스케쥴링 확인
            print('Scheduled for {} : {}'.format(work, future))
            # print()
        
        #wait 결과 출력
        result = wait(futures_list, timeout=7)
        # 성공
        print('Completed Tasks : ' + str(result.done))
        # 실패
        print('Pending ones after waiting for 7seconds : ' + str(result.not_done))
        # 결과 값 출력
        print([future.result() for future in result.done])
        
        # # as_completed 결과 출력
        # for future in as_completed(futures_list):
        #     result = future.result()
        #     done = future.done()
        #     cancelled = future.cancelled
            
        #     # future 결과 확인
        #     print('Future Result : {}, Done : {}'.format(result, done))
        #     print('Future Cancelled : {}'.format(cancelled))
        
        
            
    # 종료 시간
    end_tm = time.time() - start_tm
    # 출력 포멧
    msg = '\n Time : {:.2f}s'
    # 최종 결과 출력
    print(msg.format(end_tm))



# 실행
if __name__ == '__main__':
    main()


Scheduled for 100000 : <Future at 0x29478b29d90 state=finished returned int>
Scheduled for 1000000 : <Future at 0x29478dde100 state=pending>
Scheduled for 10000000 : <Future at 0x29478e3b2e0 state=pending>
Scheduled for 100000000 : <Future at 0x29478e3b3d0 state=pending>
Completed Tasks : {<Future at 0x29478dde100 state=finished returned int>, <Future at 0x29478b29d90 state=finished returned int>, <Future at 0x29478e3b3d0 state=finished returned int>, <Future at 0x29478e3b2e0 state=finished returned int>}
Pending ones after waiting for 7seconds : set()
[500000500000, 5000050000, 5000000050000000, 50000005000000]

 Time : 6.42s
