# 병행성(Concurrency)
* 이터레이터, 제네레이터
* Iterator, Generator
* 파이썬 반복 가능한 타입
* for, collections, text file, List, Dict, Set, Tuple, unpacking, *args
* 반복 가능한 이유? -> iter(x) 함수 호출

In [2]:
# 반복 가능한 이유? -> iter(x) 함수 호출
t = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'

# for 반복
for c in t:
    print('>', c)

print()

# while 반복

w = iter(t)

while True:
    try:
        print(next(w))
    except StopIteration:
        break

print()

from collections import abc

# 반복형 확인
print(hasattr(t, '__iter__'))
print(isinstance(t, abc.Iterable))

> A
> B
> C
> D
> E
> F
> G
> H
> I
> J
> K
> L
> M
> N
> O
> P
> Q
> R
> S
> T
> U
> V
> W
> X
> Y
> Z

A
B
C
D
E
F
G
H
I
J
K
L
M
N
O
P
Q
R
S
T
U
V
W
X
Y
Z

True
True


In [6]:
# next 사용
class WordSplitIter:
    def __init__(self, text):
        self._idx = 0
        self._text = text.split(' ')
    
    def __next__(self):
        # print('Called __next__')
        try:
            word = self._text[self._idx]
        except IndexError:
            raise StopIteration('Stopped Iteration.')
        self._idx += 1
        return word

    def __repr__(self):
        return 'WordSplit(%s)' % (self._text)

In [7]:
wi = WordSplitIter('Do today what you could do tomorrow')

print(wi)
print(next(wi))
print(next(wi))
print(next(wi))
print(next(wi))
print(next(wi))
print(next(wi))
print(next(wi))
# print(next(wi))

WordSplit(['Do', 'today', 'what', 'you', 'could', 'do', 'tomorrow'])
Do
today
what
you
could
do
tomorrow


# Generator 패턴
* 1.지능형 리스트, 딕셔너리, 집합 -> 데이터 양 증가 후 메모리 사용량 증가 -> 제네레이터 사용 권장
* 2.단위 실행 가능한 코루틴(Coroutine) 구현과 연동
* 3.작은 메모리 조각 사용


In [9]:
class WordSplitGenerator:
    def __init__(self, text):
        self._text = text.split(' ')
    
    def __iter__(self):
        # print('Called __iter__')
        for word in self._text:
           yield word # 제네레이터
        return
    
    def __repr__(self):
        return 'WordSplit(%s)' % (self._text)


wg = WordSplitGenerator('Do today what you could do tomorrow')

wt = iter(wg)

In [10]:
print(wt)
print(next(wt))
print(next(wt))
print(next(wt))
print(next(wt))
print(next(wt))
print(next(wt))
print(next(wt))
# print(next(wt))

<generator object WordSplitGenerator.__iter__ at 0x0000021E2CB60F20>
Do
today
what
you
could
do
tomorrow


## Generator Ex1

In [23]:
def generator_ex1():
    print('Start')
    yield 'A Point.'
    print('continue')
    yield 'B Point.'
    print('End')

temp = iter(generator_ex1())

# print(next(temp))
# print(next(temp))
# print(next(temp))

for v in generator_ex1():
    pass
    print(v)

print()

Start
A Point.
continue
B Point.
End



## Generator Ex2

In [12]:
temp2 = [x * 3 for x in generator_ex1()]
temp3 = (x * 3 for x in generator_ex1())

print(temp2)
print(temp3)

for i in temp2:
    print(i)

print()
print()

for i in temp3:
    print(i)

Start
continue
End
['A Point.A Point.A Point.', 'B Point.B Point.B Point.']
<generator object <genexpr> at 0x0000021E2CB6E820>
A Point.A Point.A Point.
B Point.B Point.B Point.


Start
A Point.A Point.A Point.
continue
B Point.B Point.B Point.
End


## Generator Ex3(중요 함수)
* filterfalse, accumulate, chain, product, product, groupby

In [15]:
import itertools

gen1 = itertools.count(1, 2.5)

print(next(gen1))
print(next(gen1))
print(next(gen1))
print(next(gen1))
# ... 무한



1
3.5
6.0
8.5


In [14]:
# 조건
gen2 = itertools.takewhile(lambda n : n < 1000, itertools.count(1, 2.5))

for v in gen2:
    print(v)

1
3.5
6.0
8.5
11.0
13.5
16.0
18.5
21.0
23.5
26.0
28.5
31.0
33.5
36.0
38.5
41.0
43.5
46.0
48.5
51.0
53.5
56.0
58.5
61.0
63.5
66.0
68.5
71.0
73.5
76.0
78.5
81.0
83.5
86.0
88.5
91.0
93.5
96.0
98.5
101.0
103.5
106.0
108.5
111.0
113.5
116.0
118.5
121.0
123.5
126.0
128.5
131.0
133.5
136.0
138.5
141.0
143.5
146.0
148.5
151.0
153.5
156.0
158.5
161.0
163.5
166.0
168.5
171.0
173.5
176.0
178.5
181.0
183.5
186.0
188.5
191.0
193.5
196.0
198.5
201.0
203.5
206.0
208.5
211.0
213.5
216.0
218.5
221.0
223.5
226.0
228.5
231.0
233.5
236.0
238.5
241.0
243.5
246.0
248.5
251.0
253.5
256.0
258.5
261.0
263.5
266.0
268.5
271.0
273.5
276.0
278.5
281.0
283.5
286.0
288.5
291.0
293.5
296.0
298.5
301.0
303.5
306.0
308.5
311.0
313.5
316.0
318.5
321.0
323.5
326.0
328.5
331.0
333.5
336.0
338.5
341.0
343.5
346.0
348.5
351.0
353.5
356.0
358.5
361.0
363.5
366.0
368.5
371.0
373.5
376.0
378.5
381.0
383.5
386.0
388.5
391.0
393.5
396.0
398.5
401.0
403.5
406.0
408.5
411.0
413.5
416.0
418.5
421.0
423.5
426.0
428.5
431.0
433.5
43

In [16]:
# 필터 반대
gen3 = itertools.filterfalse(lambda n : n < 3, [1,2,3,4,5])

for v in gen3:
    print(v)

3
4
5


In [17]:
# 누적 합계
gen4 = itertools.accumulate([x for x in range(1, 101)])

for v in gen4:
    print(v)

1
3
6
10
15
21
28
36
45
55
66
78
91
105
120
136
153
171
190
210
231
253
276
300
325
351
378
406
435
465
496
528
561
595
630
666
703
741
780
820
861
903
946
990
1035
1081
1128
1176
1225
1275
1326
1378
1431
1485
1540
1596
1653
1711
1770
1830
1891
1953
2016
2080
2145
2211
2278
2346
2415
2485
2556
2628
2701
2775
2850
2926
3003
3081
3160
3240
3321
3403
3486
3570
3655
3741
3828
3916
4005
4095
4186
4278
4371
4465
4560
4656
4753
4851
4950
5050


In [18]:
# 연결1
gen5 = itertools.chain('ABCDE', range(1,11,2))

print(list(gen5))

['A', 'B', 'C', 'D', 'E', 1, 3, 5, 7, 9]


In [19]:
# 연결2

gen6 = itertools.chain(enumerate('ABCDE'))

print(list(gen6))

[(0, 'A'), (1, 'B'), (2, 'C'), (3, 'D'), (4, 'E')]


In [20]:
# 개별
gen7 = itertools.product('ABCDE')

print(list(gen7))

[('A',), ('B',), ('C',), ('D',), ('E',)]


In [21]:
# 연산(경우의 수)
gen8 = itertools.product('ABCDE', repeat=2)

print(list(gen8))

[('A', 'A'), ('A', 'B'), ('A', 'C'), ('A', 'D'), ('A', 'E'), ('B', 'A'), ('B', 'B'), ('B', 'C'), ('B', 'D'), ('B', 'E'), ('C', 'A'), ('C', 'B'), ('C', 'C'), ('C', 'D'), ('C', 'E'), ('D', 'A'), ('D', 'B'), ('D', 'C'), ('D', 'D'), ('D', 'E'), ('E', 'A'), ('E', 'B'), ('E', 'C'), ('E', 'D'), ('E', 'E')]


In [22]:
# 그룹화
gen9 = itertools.groupby('AAABBCCCCDDEEE')

# print(list(gen9))

for chr, group in gen9:
    print(chr, ' : ', list(group))

A  :  ['A', 'A', 'A']
B  :  ['B', 'B']
C  :  ['C', 'C', 'C', 'C']
D  :  ['D', 'D']
E  :  ['E', 'E', 'E']


# 병행성 - 코루틴, Yield

* 병행성(Concurrency) : 한 컴퓨터가 여러 일을 동시에 수행 -> 단일 프로그램안에서 여러일을 쉽게 해결
* 병렬성(Parallelism) : 여러 컴퓨터가 여러 작업을 동시에 수행 -> 속도


* 코루틴(Coroutine) : 단일(싱글) 스레드, 스택을 기반으로 동작하는 비동기 작업
* 쓰레드 : os 관리, CPU 코어에서 실시간, 시분할 비동기 작업 -> 멀티쓰레드
* 서브루틴 : 메인루틴에서 호출 -> 서브루틴에서 수행(흐름제어)


* yield : 메인 <-> 서브
* 코루틴 제어, 상태, 양방향 전송
* yield from

In [26]:
# 코루틴 Ex1
def coroutine1():
    print('>>> coroutine started.')
    i = yield
    print('>>> coroutine received : {}'.format(i))


# 메인 루틴
# 제네레이터 선언
cr1 = coroutine1()

print(cr1, type(cr1))

# yield 지점까지 서브루틴 수행
next(cr1)

cr1.send(100)

<generator object coroutine1 at 0x0000021E2CB8AE40> <class 'generator'>
>>> coroutine started.
>>> coroutine received : 100


StopIteration: 

In [27]:
# 잘못된 사용

cr2 = coroutine1()

cr2.send(100) # 예외 발생

TypeError: can't send non-None value to a just-started generator

코루틴 Ex2
* GEN_CREATED : 처음 대기 상태
* GEN_RUNNING : 실행 상태
* GEN_SUSPENDED : yield 대기 상태
* GEN_CLOSED : 실행 완료 상태

In [29]:
def coroutine2(x):
    print('>>> coroutine started : {}'.format(x))
    y = yield x
    print('>>> coroutine received : {}'.format(y))
    z = yield x + y
    print('>>> coroutine received : {}'.format(z))


cr3 = coroutine2(10)

from inspect import getgeneratorstate

print(getgeneratorstate(cr3))

print(next(cr3))

print(getgeneratorstate(cr3))

print(cr3.send(15))

# print(c3.send(20)) # 예외

GEN_CREATED
>>> coroutine started : 10
10
GEN_SUSPENDED
>>> coroutine received : 15
25


코루틴 Ex3
* StopIteration 자동 처리(3.5 -> await)
* 중첩 코루틴 처리

In [31]:
def generator1():
    for x in 'AB':
        yield x
    for y in range(1,4):
        yield y

t1 = generator1()

print(next(t1))
print(next(t1))
print(next(t1))
print(next(t1))
print(next(t1))
# print(next(t1))

t2 = generator1()

print(list(t2))

print()
print()

def generator2():
    yield from 'AB'
    yield from range(1,4)


t3 = generator2()

print(next(t3))
print(next(t3))
print(next(t3))
print(next(t3))
print(next(t3))
# print(next(t3))

A
B
1
2
3
['A', 'B', 1, 2, 3]


A
B
1
2
3


# 병렬성 - Futures

# Futures 동시성
* 비동기 작업 실행
* 지연시간(Block) CPU 및 리소스 낭비 방지 -> (File)Network I/O 관련 작업 -> 동시성 활용 권장
* 비동기 작업과 적합한 프로그램일 경우 압도적으로 성능 향상


* futures : 비동기 실행을 위한 API를 고수준으로 작성 -> 사용하기 쉽도록 개선
* concurrent.Futures
* 1. 멀티스레딩/멀티프로세싱 API 통일 -> 매우 사용하기 쉬움
* 2. 실행중이 작업 취소, 완료 여부 체크, 타임아웃 옵션, 콜백추가, 동기화 코드 매우 쉽게 작성 -> Promise 개념


* 2가지 패턴 실습
* concurrent.futures 사용법1
* concurrent.futures 사용법2


* GIL : 두 개 이상의 스레드가 동시에 실행 될 때 하나의 자원을 엑세스 하는 경우 -> 문제점을 방지하기 위해
*       GIL 실행 , 리소스 전체에 락이 걸린다. -> Context Switch(문맥 교환)
 
 
* GIL : 멀티프로세싱 사용, CPython

In [43]:
import os
import time
from concurrent import futures

WORK_LIST = [1000000, 10000000, 100000000, 100000000]

# 동시성 합계 계산 메인함수
# 누적 합계 함수(제네레이터)
def sum_generator(n):
    return sum(n for n in range(1, n+1))

def main():
    # Worker Count
    worker = min(10, len(WORK_LIST))
    # 시작 시간
    start_tm = time.time()
    # 결과 건수
    # ProcessPoolExecutor
    with futures.ThreadPoolExecutor() as excutor:
        # map -> 작업 순서 유지, 즉시 실행
        result = excutor.map(sum_generator, WORK_LIST)
    # 종료 시간
    end_tm = time.time() - start_tm
    # 출력 포멧
    msg = '\n Result -> {} Time : {:.2f}s'
    # 최종 결과 출력
    print(msg.format(list(result), end_tm))

# 실행
if __name__ == '__main__':
    main()


 Result -> [500000500000, 50000005000000, 5000000050000000, 5000000050000000] Time : 9.61s


In [47]:
import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed

WORK_LIST = [1000000, 10000000, 100000000, 1000000000]


# 동시성 합계 계산 메인 함수
# 누적 합계 함수(제레네이터)
def sum_generator(n):
    return sum(n for n in range(1, n+1))

# wait
# as_completed
def main():
    # Worker Count
    worker = min(10, len(WORK_LIST))
    
    # 시작 시간
    start_tm = time.time()
    # Futures
    futures_list = []

    # 결과 건수
    # ProcessPoolExecutor
    with ThreadPoolExecutor() as excutor:
        for work in WORK_LIST:
            # future 반환
            future = excutor.submit(sum_generator, work)
            # 스케쥴링
            futures_list.append(future)
            # 스케쥴링 확인
            print('Scheduled for {} : {}'.format(work, future))
            # print()
        
        # wait 결과 출력
        # result = wait(futures_list, timeout=7)
        # # 성공
        # print('Completed Tasks : ' + str(result.done))
        # # 실패
        # print('Pending ones after waiting for 7seconds : ' + str(result.not_done))
        # # 결과 값 출력
        # print([future.result() for future in result.done])
        
        # as_completed 결과 출력
        for future in as_completed(futures_list):
            result = future.result()
            done = future.done()
            cancelled = future.cancelled
            
            # future 결과 확인
            print('Future Result : {}, Done : {}'.format(result, done))
            print('Future Cancelled : {}'.format(cancelled))
        
        
            
    # 종료 시간
    end_tm = time.time() - start_tm
    # 출력 포멧
    msg = '\n Time : {:.2f}s'
    # 최종 결과 출력
    print(msg.format(end_tm))



# 실행
if __name__ == '__main__':
    main()


Scheduled for 1000000 : <Future at 0x21e2cde2f70 state=running>
Scheduled for 10000000 : <Future at 0x21e2caa9a90 state=pending>
Scheduled for 100000000 : <Future at 0x21e2cde2820 state=running>
Scheduled for 1000000000 : <Future at 0x21e2cd5ea30 state=running>
Future Result : 500000500000, Done : True
Future Cancelled : <bound method Future.cancelled of <Future at 0x21e2cde2f70 state=finished returned int>>
Future Result : 50000005000000, Done : True
Future Cancelled : <bound method Future.cancelled of <Future at 0x21e2caa9a90 state=finished returned int>>
Future Result : 5000000050000000, Done : True
Future Cancelled : <bound method Future.cancelled of <Future at 0x21e2cde2820 state=finished returned int>>
Future Result : 500000000500000000, Done : True
Future Cancelled : <bound method Future.cancelled of <Future at 0x21e2cd5ea30 state=finished returned int>>

 Time : 53.52s
