## Concurrency
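- Concurrency and flow control
- Iterator & Generator
    - Iterable objects in Python
        - collections, text files, list, dict, set, tuple, unpacking, *args ...
    - generator : a function that creates an iterator; it uses the yield keyword inside the function body
    - generator characteristics
        - It produces an ordered sequence that can be iterated (every generator is an iterator)
        - It is evaluated lazily (the next value in the sequence is computed only when needed)
        - Internal state is kept in the function's local variables
        - It can model an unbounded sequence (a data stream with no definite end)
        - Generators compose into pipelines for natural stream processing
- \_\_iter\_\_, \_\_next\_\_
- Class-based Generator implementation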
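
The generator characteristics listed above (lazy evaluation, state kept in local variables, unbounded sequences, pipelines) can be seen together in a small sketch. The function names `naturals`, `squares`, and `evens` are made up for this illustration.

```python
import itertools

def naturals():
    """Unbounded stream 1, 2, 3, ...; the local variable n holds the state."""
    n = 1
    while True:
        yield n
        n += 1

def squares(stream):
    """Pipeline stage: lazily square each incoming value."""
    for value in stream:
        yield value * value

def evens(stream):
    """Pipeline stage: pass through only even values."""
    for value in stream:
        if value % 2 == 0:
            yield value

# Nothing is computed until values are requested from the end of the pipeline.
pipeline = evens(squares(naturals()))
first_five = list(itertools.islice(pipeline, 5))
print(first_five)  # [4, 16, 36, 64, 100]
```

`itertools.islice` is what keeps the unbounded stream from running forever: only as many values as requested are pulled through the stages.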

In [1]:
t= "adskjflkjkjwlfjoiqonmc"
print(dir(t))

['__add__', '__class__', '__contains__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__getnewargs__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__mod__', '__mul__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__rmod__', '__rmul__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'capitalize', 'casefold', 'center', 'count', 'encode', 'endswith', 'expandtabs', 'find', 'format', 'format_map', 'index', 'isalnum', 'isalpha', 'isascii', 'isdecimal', 'isdigit', 'isidentifier', 'islower', 'isnumeric', 'isprintable', 'isspace', 'istitle', 'isupper', 'join', 'ljust', 'lower', 'lstrip', 'maketrans', 'partition', 'removeprefix', 'removesuffix', 'replace', 'rfind', 'rindex', 'rjust', 'rpartition', 'rsplit', 'rstrip', 'split', 'splitlines', 'startswith', 'strip', 'swapcase', 'title', 'translate', 'upper', 'zfill']


In [2]:
for c in t:
    print(c)

a
d
s
k
j
f
l
k
j
k
j
w
l
f
j
o
i
q
o
n
m
c


In [3]:
w= iter(t)
print(next(w))


a


In [4]:
print(next(w))
print(next(w))
print(next(w))

d
s
k


In [5]:
# How a for loop works under the hood
while True:
    try:
        print(next(w))
    except StopIteration:
        break

j
f
l
k
j
k
j
w
l
f
j
o
i
q
o
n
m
c


In [6]:
# Check whether the object is iterable (it has __iter__)
print(hasattr(t, '__iter__'))

True


In [7]:
# Check whether the object is iterable (via the abc.Iterable ABC)
from collections import abc
print(isinstance(t, abc.Iterable))

True


In [8]:
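# class-based iterator (implements the iterator protocol by hand)
class WordSplitter:
    def __init__(self, text) -> None:
        self._idx = 0
        self._text = text.split(' ')

    # magic method: returning self lets the object be used in a for loop
    def __iter__(self):
        return self

    # magic method: return the next word, or signal the end of the sequence
    def __next__(self):
        # print('Called __next__')
        try:
            word = self._text[self._idx]
        except IndexError:
            raise StopIteration('Stopped Iteration!!!')
        self._idx += 1
        return word

    def __repr__(self):
        return 'WordSplit (%s)' % (self._text)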

In [9]:
wq = WordSplitter('Do today what you could od tommorrow')
print(wq)

WordSplit (['Do', 'today', 'what', 'you', 'could', 'od', 'tommorrow'])


In [10]:
print(next(wq))
print(next(wq))
print(next(wq))
print(next(wq))
print(next(wq))
print(next(wq))
print(next(wq))
print(next(wq))

Do
today
what
you
could
od
tommorrow


StopIteration: Stopped Iteration!!!

In [11]:
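# Generator
# 1. List, dict, and set comprehensions -> memory usage grows with the amount of data -> prefer a generator
# 2. Ties in with implementing coroutines, which can be executed step by step
# 3. Uses only a small amount of memory at a time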
class WordSplitterGenerator:
    def __init__(self,text) -> None:
        self._text = text.split(' ')
    
    def __iter__(self):
        for word in self._text:
            yield word # generator: remembers the position of the next value to return
        return
    
    def __repr__(self):
        return 'WordSplitGenerator (%s)' % (self._text)

In [12]:
wg = WordSplitterGenerator('Do today what you could od tommorrow')
print(wg)

WordSplitGenerator (['Do', 'today', 'what', 'you', 'could', 'od', 'tommorrow'])


In [13]:
wt = iter(wg)
print(wt, wg)

<generator object WordSplitterGenerator.__iter__ at 0x1054b75a0> WordSplitGenerator (['Do', 'today', 'what', 'you', 'could', 'od', 'tommorrow'])


In [14]:
print(next(wt))
print(next(wt))
print(next(wt))
print(next(wt))
print(next(wt))
print(next(wt))
print(next(wt))
print(next(wt))

Do
today
what
you
could
od
tommorrow


StopIteration: 
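
One practical difference between the two classes above is worth a short sketch: an object that keeps its own cursor in `__next__` is used up after one pass, while an object whose `__iter__` is a generator function hands out a fresh generator each time. The class names `OneShotWords` and `ReusableWords` are hypothetical stand-ins, not part of the notebook.

```python
class OneShotWords:
    """Iterator: keeps its own cursor, so it can be consumed only once."""
    def __init__(self, text):
        self._words = text.split(' ')
        self._idx = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self._idx >= len(self._words):
            raise StopIteration
        word = self._words[self._idx]
        self._idx += 1
        return word

class ReusableWords:
    """Iterable: every call to __iter__ creates a new generator."""
    def __init__(self, text):
        self._words = text.split(' ')

    def __iter__(self):
        for word in self._words:
            yield word

one_shot = OneShotWords('a b c')
reusable = ReusableWords('a b c')
first_pass = (list(one_shot), list(reusable))
second_pass = (list(one_shot), list(reusable))
print(first_pass)   # (['a', 'b', 'c'], ['a', 'b', 'c'])
print(second_pass)  # ([], ['a', 'b', 'c'])
```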

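### Yield & Itertools Practice
- Concurrency
    - One worker handles several tasks at once by switching between them
    - Goal: get several things done efficiently within a single program
- Parallelism
    - Several workers carry out several tasks at the same time
    - Goal: gain efficiency in terms of speed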
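
As a rough illustration of "one worker, several tasks", generators can be interleaved by a tiny round-robin loop. This is only a sketch of the idea; `task` and `run_round_robin` are names invented here, not a real scheduler API.

```python
from collections import deque

def task(name, steps):
    """A task that pauses (yields) after every step."""
    for i in range(1, steps + 1):
        yield '{} step {}'.format(name, i)

def run_round_robin(tasks):
    """Single worker: advance each task by one step in turn until all finish."""
    queue = deque(tasks)
    log = []
    while queue:
        current = queue.popleft()
        try:
            log.append(next(current))
        except StopIteration:
            continue  # task finished, do not re-queue it
        queue.append(current)
    return log

log = run_round_robin([task('A', 2), task('B', 3)])
print(log)
# ['A step 1', 'B step 1', 'A step 2', 'B step 2', 'B step 3']
```

Nothing here runs in parallel: the tasks make progress together only because each one gives up control at `yield`.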

In [15]:
def generator_ex1():
    print('start')
    yield 'A point'
    print('continue')
    yield 'B point'
    print('End')

tmp = iter(generator_ex1())

In [16]:
print(tmp)

<generator object generator_ex1 at 0x1054b7680>


In [17]:
print(next(tmp))

start
A point


In [18]:
print(next(tmp))

continue
B point


In [19]:
print(next(tmp))

End


StopIteration: 

In [20]:
for v in generator_ex1():
    print(v)

start
A point
continue
B point
End


In [21]:
temp1 = [x*3 for x in generator_ex1()]
temp2 = (x*3 for x in generator_ex1())

start
continue
End


In [22]:
for i in temp1:
    print(i)

A pointA pointA point
B pointB pointB point


In [23]:
for i in temp2:
    print(i)

start
A pointA pointA point
continue
B pointB pointB point
End
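
The list comprehension ran the generator immediately, while the generator expression waited until it was iterated. The memory side of that difference can be checked with `sys.getsizeof`. Note that it reports only the size of the container object itself, so treat the numbers as a rough indication.

```python
import sys

n = 100_000
as_list = [x * 3 for x in range(n)]   # all values are built up front
as_gen = (x * 3 for x in range(n))    # values are produced on demand

list_bytes = sys.getsizeof(as_list)
gen_bytes = sys.getsizeof(as_gen)
print(list_bytes > gen_bytes)  # True

# Both produce the same values; the generator is exhausted after one pass.
same_total = sum(as_gen) == sum(as_list)
print(same_total)  # True
```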


In [24]:
# Key functions in itertools
# count, takewhile, filterfalse, accumulate, chain, product, groupby

In [25]:
import itertools

In [26]:
gen1 = itertools.count(1, 2.5) # generates values without end
print(next(gen1))
print(next(gen1))
print(next(gen1))
print(next(gen1))

1
3.5
6.0
8.5


In [27]:
# generate values while the condition holds
gen2 = itertools.takewhile(lambda x:x < 1000, itertools.count(1, 2.5)) 
for v in gen2:
    print(v)

1
3.5
6.0
8.5
11.0
13.5
16.0
18.5
21.0
23.5
26.0
28.5
31.0
33.5
36.0
38.5
41.0
43.5
46.0
48.5
51.0
53.5
56.0
58.5
61.0
63.5
66.0
68.5
71.0
73.5
76.0
78.5
81.0
83.5
86.0
88.5
91.0
93.5
96.0
98.5
101.0
103.5
106.0
108.5
111.0
113.5
116.0
118.5
121.0
123.5
126.0
128.5
131.0
133.5
136.0
138.5
141.0
143.5
146.0
148.5
151.0
153.5
156.0
158.5
161.0
163.5
166.0
168.5
171.0
173.5
176.0
178.5
181.0
183.5
186.0
188.5
191.0
193.5
196.0
198.5
201.0
203.5
206.0
208.5
211.0
213.5
216.0
218.5
221.0
223.5
226.0
228.5
231.0
233.5
236.0
238.5
241.0
243.5
246.0
248.5
251.0
253.5
256.0
258.5
261.0
263.5
266.0
268.5
271.0
273.5
276.0
278.5
281.0
283.5
286.0
288.5
291.0
293.5
296.0
298.5
301.0
303.5
306.0
308.5
311.0
313.5
316.0
318.5
321.0
323.5
326.0
328.5
331.0
333.5
336.0
338.5
341.0
343.5
346.0
348.5
351.0
353.5
356.0
358.5
361.0
363.5
366.0
368.5
371.0
373.5
376.0
378.5
381.0
383.5
386.0
388.5
391.0
393.5
396.0
398.5
401.0
403.5
406.0
408.5
411.0
413.5
416.0
418.5
421.0
423.5
426.0
428.5
431.0
433.5
43

In [28]:
# yield the values for which the condition is false
gen3 = itertools.filterfalse(lambda x:x <10, [1,2,4,50,20,9,3])
for v in gen3:
    print(v)

50
20


In [29]:
# generate running totals
gen4 = itertools.accumulate([x for x in range(1,101)])
for v in gen4:
    print(v)

1
3
6
10
15
21
28
36
45
55
66
78
91
105
120
136
153
171
190
210
231
253
276
300
325
351
378
406
435
465
496
528
561
595
630
666
703
741
780
820
861
903
946
990
1035
1081
1128
1176
1225
1275
1326
1378
1431
1485
1540
1596
1653
1711
1770
1830
1891
1953
2016
2080
2145
2211
2278
2346
2415
2485
2556
2628
2701
2775
2850
2926
3003
3081
3160
3240
3321
3403
3486
3570
3655
3741
3828
3916
4005
4095
4186
4278
4371
4465
4560
4656
4753
4851
4950
5050


In [30]:
# chain several iterables into one stream
gen5 = itertools.chain('ABCDE', range(1,11,2))
print(list(gen5))

['A', 'B', 'C', 'D', 'E', 1, 3, 5, 7, 9]


In [31]:
gen6 = itertools.chain(enumerate('abcde'))
print(list(gen6))

[(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e')]


In [32]:
# generate each element separately (as 1-tuples)
gen7 = itertools.product('ABCDE')
print(list(gen7))

[('A',), ('B',), ('C',), ('D',), ('E',)]


In [33]:
gen8 = itertools.product('ABCDE', repeat=2) # Cartesian product of the input with itself
print(list(gen8))

[('A', 'A'), ('A', 'B'), ('A', 'C'), ('A', 'D'), ('A', 'E'), ('B', 'A'), ('B', 'B'), ('B', 'C'), ('B', 'D'), ('B', 'E'), ('C', 'A'), ('C', 'B'), ('C', 'C'), ('C', 'D'), ('C', 'E'), ('D', 'A'), ('D', 'B'), ('D', 'C'), ('D', 'D'), ('D', 'E'), ('E', 'A'), ('E', 'B'), ('E', 'C'), ('E', 'D'), ('E', 'E')]


In [34]:
# group consecutive equal items
gen9 = itertools.groupby('AAAABBBCCCDDDEEEEGGG')
# print(list(gen9))

In [35]:
for key, group in gen9:
    print(key, ':', list(group))

A : ['A', 'A', 'A', 'A']
B : ['B', 'B', 'B']
C : ['C', 'C', 'C']
D : ['D', 'D', 'D']
E : ['E', 'E', 'E', 'E']
G : ['G', 'G', 'G']
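
The input above happened to be ordered already. `itertools.groupby` only groups consecutive equal items, so unsorted input yields repeated keys. A short sketch of the difference:

```python
import itertools

data = 'ABAB'

# Without sorting, every run of equal items becomes its own group.
unsorted_groups = [(key, len(list(group))) for key, group in itertools.groupby(data)]

# Sorting first brings equal items together.
sorted_groups = [(key, len(list(group))) for key, group in itertools.groupby(sorted(data))]

print(unsorted_groups)  # [('A', 1), ('B', 1), ('A', 1), ('B', 1)]
print(sorted_groups)    # [('A', 2), ('B', 2)]
```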


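* Thread : managed by the OS. Enables asynchronous work on CPU cores through real-time, time-sliced scheduling
* Coroutine : runs on a single thread. Operates on a stack and enables asynchronous work
* Yield, send : main <-> sub
    - control the coroutine, keep its state, and pass data in both directions
    - sub-routine : the main routine calls it -> the sub-routine does the work (flow control)
    - coroutine : a routine can pause in the middle of execution -> concurrent programming
    - lower overhead than threads
    - Thread : complex. Shared resources make deadlocks possible. Context switching has a cost. Resource consumption tends to increase
* def -> async def, yield -> await (python >= 3.5)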
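
For the last bullet, a minimal sketch of the `async def` / `await` form using the standard `asyncio` module. The names `fetch` and `main_async` and the delays are placeholders chosen for this example.

```python
import asyncio

async def fetch(name, delay):
    """Native coroutine: 'await' marks the point where it can be suspended."""
    await asyncio.sleep(delay)
    return '{} done'.format(name)

async def main_async():
    # Both coroutines run concurrently on one thread; gather keeps argument order.
    return await asyncio.gather(fetch('first', 0.02), fetch('second', 0.01))

results = asyncio.run(main_async())
print(results)  # ['first done', 'second done']
```

In a notebook, where an event loop is usually already running, `await main_async()` at the top level of a cell would be used in place of `asyncio.run(...)`.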

In [36]:
# Ex1
def coroutine1():
    print('>>> coroutine started.')
    i = yield
    print('>>> coroutine received : {}'.format(i))

In [37]:
cr1 = coroutine1()
print(cr1,type(cr1))

<generator object coroutine1 at 0x105995e00> <class 'generator'>


In [38]:
next(cr1) # run the sub-routine up to the yield point

>>> coroutine started.


In [39]:
next(cr1)

>>> coroutine received : None


StopIteration: 

In [40]:
cr1 = coroutine1()
next(cr1) # run the sub-routine up to the yield point
cr1.send(100) # lets the main routine and the sub-routine exchange state (data)

>>> coroutine started.
>>> coroutine received : 100


StopIteration: 
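
A slightly larger use of `send` is a coroutine that keeps state between calls. The sketch below computes a running average; `averager` is a name chosen for the example.

```python
def averager():
    """Coroutine: receives numbers via send() and yields the running average."""
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average   # hand back the current average, wait for the next value
        total += value
        count += 1
        average = total / count

avg = averager()
next(avg)            # prime the coroutine: run up to the first yield
a1 = avg.send(10)
a2 = avg.send(20)
a3 = avg.send(30)
print(a1, a2, a3)    # 10.0 15.0 20.0
avg.close()          # stop the endless loop explicitly
```

Because the loop never ends by itself, no `StopIteration` is raised here; `close()` ends the coroutine.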

In [41]:
# ex2
# GEN_CREATED : waiting to be started
# GEN_RUNNING : currently executing
# GEN_SUSPENDED : paused at a yield
# GEN_CLOSED : finished

def coroutine2(x):
    print('>>> coroutine started : {}'.format(x))
    y = yield x
    print('>>> coroutine received : {}'.format(y))
    z = yield x + y
    print('>>> coroutine received : {}'.format(z))

In [42]:
# print the generator state
from inspect import getgeneratorstate

In [43]:
cr3 = coroutine2(10)
print(getgeneratorstate(cr3))
print(next(cr3))

GEN_CREATED
>>> coroutine started : 10
10


In [44]:
print(getgeneratorstate(cr3))
cr3.send(100)

GEN_SUSPENDED
>>> coroutine received : 100


110

In [45]:
print(getgeneratorstate(cr3))
cr3.send(100)

GEN_SUSPENDED
>>> coroutine received : 100


StopIteration: 
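
The cells above show only GEN_CREATED and GEN_SUSPENDED. The sketch below observes all four states; `state_probe` is a hypothetical helper that inspects itself while running.

```python
from inspect import getgeneratorstate

def state_probe():
    # While this body is executing, the generator reports GEN_RUNNING.
    yield getgeneratorstate(probe)
    yield 'second'

probe = state_probe()
states = [getgeneratorstate(probe)]       # before the first next()
states.append(next(probe))                # state seen from inside the body
states.append(getgeneratorstate(probe))   # paused at the first yield
probe.close()
states.append(getgeneratorstate(probe))   # after close()
print(states)
# ['GEN_CREATED', 'GEN_RUNNING', 'GEN_SUSPENDED', 'GEN_CLOSED']
```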

In [46]:
# Ex3
# Handling nested generators
def generator1():
    for x in 'AB':
        yield x
    for y in range(1,4):
        yield y

In [47]:
t1 = generator1()
print(next(t1))

A


In [48]:
print(next(t1))

B


In [49]:
print(next(t1))

1


In [50]:
print(next(t1))

2


In [51]:
print(next(t1))

3


In [52]:
print(next(t1))

StopIteration: 

In [53]:
t2 = generator1()
print(list(t2))

['A', 'B', 1, 2, 3]


In [54]:
def generator2():
    yield from 'AB'
    yield from range(1,4)

In [55]:
t3 = generator2()
print(next(t3))
print(next(t3))
print(next(t3))
print(next(t3))
print(next(t3))

A
B
1
2
3
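
`yield from` does more than flatten loops: it also forwards `send()` to the inner generator and captures its return value. A small sketch, with `inner` and `outer` as illustrative names:

```python
def inner():
    """Collect sent values until None arrives, then return them."""
    received = []
    while True:
        value = yield
        if value is None:
            return received
        received.append(value)

def outer(results):
    # send() calls on outer are passed straight through to inner;
    # inner's return value becomes the value of the yield from expression.
    collected = yield from inner()
    results.append(collected)

results = []
gen = outer(results)
next(gen)        # prime: runs until inner's first yield
gen.send(1)
gen.send(2)
try:
    gen.send(None)   # inner returns, outer finishes
except StopIteration:
    pass
print(results)   # [[1, 2]]
```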


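### Python Futures (Concurrency)
- Asynchronous task processing
    - Synchronous means tasks run one after another, in order of their dependencies
    - Blocking (wait time) : avoid wasting CPU and other resources while waiting -> concurrency is recommended for (File|Network) I/O-bound work
    - Programs that suit asynchronous execution can gain a large performance improvement
    - Related practice
        - Using concurrent.futures
            - A package that wraps asynchronous execution in a high-level, easy-to-use API
            - 1. The multithreading and multiprocessing APIs are unified, so they are easier to use
            - 2. Cancelling pending tasks, checking for completion, timeout options, adding callbacks, and writing synchronization code all become easy (the Promise concept)
- The Python GIL
    - Global Interpreter Lock : a lock in CPython that lets only one thread execute Python bytecode at a time. It prevents problems when two or more threads access the same resource, but the lock covers the whole interpreter rather than a single resource
    - Working around the GIL : use multiprocessing (the GIL is specific to the CPython implementation)
- Concurrency practice example
    - map from futures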
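
The completion check, timeout, and callback features mentioned above can be sketched as follows. `slow_square`, `on_done`, and the delays are assumptions made for this example.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def slow_square(x, delay):
    """Stand-in for a blocking task: sleep, then return x squared."""
    time.sleep(delay)
    return x * x

finished = []

def on_done(future):
    # Called once the future completes.
    finished.append(future.result())

with ThreadPoolExecutor(max_workers=2) as executor:
    fast = executor.submit(slow_square, 3, 0.01)
    slow = executor.submit(slow_square, 4, 0.5)
    fast.add_done_callback(on_done)

    fast_result = fast.result(timeout=2)   # returns well within the timeout
    try:
        slow.result(timeout=0.05)          # gives up waiting, the task keeps running
        timed_out = False
    except TimeoutError:
        timed_out = True
    slow_result = slow.result()            # block until the slow task really finishes

print(fast_result, timed_out, slow_result, finished)  # 9 True 16 [9]
```

A timeout on `result()` only stops the waiting. The underlying call continues in its worker thread.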

In [56]:
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed
import time

In [57]:
# The worklist could also hold URLs, with every worker running the same crawling task
# Compute the sum from 1 up to each number below
worklist= [10000,100000, 10000000, 100000000]

# Summation function (uses a generator expression)
def sum_generator(n):
    return sum(i for i in range(1,n+1))

In [58]:
def main():
    worker = min(10,len(worklist))
    start_tm = time.time()

    with futures.ThreadPoolExecutor(max_workers=worker) as executor:
        result = executor.map(sum_generator, worklist)
        
    end_tm = time.time() - start_tm
    msg = '\n Result -> {} Time : {:.2f}s'
    print(msg.format(list(result), end_tm))

In [59]:
if __name__ == '__main__':
    main()


 Result -> [50005000, 5000050000, 50000005000000, 5000000050000000] Time : 2.90s
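
The summation above is CPU-bound, so threads gain little under the GIL. The URL-style workload mentioned in the comments is where a thread pool pays off. The sketch below simulates that with `time.sleep`; `fake_download` and the example.com URLs are placeholders, and no network access takes place.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_download(url):
    """Pretend to fetch a URL: the sleep stands in for network wait time."""
    time.sleep(0.2)
    return len(url)

urls = [
    'https://example.com/a',
    'https://example.com/bb',
    'https://example.com/ccc',
    'https://example.com/dddd',
]

start = time.time()
with ThreadPoolExecutor(max_workers=len(urls)) as executor:
    sizes = list(executor.map(fake_download, urls))   # results keep input order
elapsed = time.time() - start

print(sizes)
print('elapsed : {:.2f}s'.format(elapsed))  # close to one sleep, not four
```

Each thread spends its time waiting rather than computing, so the four waits overlap.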


### Concurrency Practice with Futures
- Futures wait example
- Futures as_completed
- Printing the results of concurrent execution
- Walkthrough of an applied concurrency example

In [60]:
worklist= [100000,1000000, 100000000, 1000000000]

In [68]:
def main():
    worker = min(10,len(worklist))
    start_tm = time.time()

    future_list = []

    with ThreadPoolExecutor(max_workers=worker) as executor:
        for work in worklist:
            # submit() only schedules the call; it may not have started running yet
            future = executor.submit(sum_generator, work)
            future_list.append(future)
            # check schedule
            print("Scheduled for {} : {}".format(work, future))
            print()

        result = wait(future_list , timeout = 7)
        
    end_tm = time.time() - start_tm
    msg = '\n end Time : {:.2f}s'
    print('completed ::: {}'.format(str(result.done)))
    print('pending ones after waiting for 7secs ::: {}'.format(str(result.not_done)))
    print([future.result() for future in result.done])
    print(msg.format(end_tm))

In [69]:
main()

Scheduled for 100000 : <Future at 0x1056face0 state=finished returned int>

Scheduled for 1000000 : <Future at 0x105690760 state=pending>

Scheduled for 100000000 : <Future at 0x1056ca9b0 state=running>

Scheduled for 1000000000 : <Future at 0x1056bf940 state=running>

completed ::: {<Future at 0x1056ca9b0 state=finished returned int>, <Future at 0x1056face0 state=finished returned int>, <Future at 0x105690760 state=finished returned int>}
pending ones after waiting for 7secs ::: {<Future at 0x1056bf940 state=finished returned int>}
[5000000050000000, 5000050000, 500000500000]

 end Time : 29.52s


In [77]:
def main():
    worker = min(10,len(worklist))
    start_tm = time.time()

    future_list = []

    with ThreadPoolExecutor(max_workers=worker) as executor:
        for work in worklist:
            # submit() only schedules the call; it may not have started running yet
            future = executor.submit(sum_generator, work)
            future_list.append(future)
            # check schedule
            print("Scheduled for {} : {}".format(work, future))
            print()

        for future in as_completed(future_list):
            result = future.result()
            done = future.done()
            cancelled = future.cancelled()
            
            print('Future Result : {}, Done : {}'.format(result, done))
            print('Future Cancelled : {}'.format(cancelled))

    end_tm = time.time() - start_tm
    msg = '\n end Time : {:.2f}s'
    # print('completed ::: {}'.format(str(result.done)))
    # print('pending ones after waiting for 7secs ::: {}'.format(str(result.not_done)))
    # print([future.result() for future in result.done])
    print(msg.format(end_tm))

In [78]:
main()

Scheduled for 100000 : <Future at 0x105c868c0 state=finished returned int>

Scheduled for 1000000 : <Future at 0x105701cf0 state=pending>

Scheduled for 100000000 : <Future at 0x105703550 state=running>

Scheduled for 1000000000 : <Future at 0x10583cc10 state=running>

Future Result : 5000050000, Done : True
Future Cancelled : False
Future Result : 500000500000, Done : True
Future Cancelled : False
Future Result : 5000000050000000, Done : True
Future Cancelled : False
Future Result : 500000000500000000, Done : True
Future Cancelled : False

 end Time : 29.32s

