# Thread 기반 병렬 프로그래밍
* 참고: Book - Python Parallel Programming Cookbook 2장
* 참고: https://www.youtube.com/playlist?list=PLGKQkV4guDKEv1DoK4LYdo2ZPLo6cyLbm

## thread 생성
* `threading` 모듈의 `Thread` 클래스의 인스턴스 생성 후 각각 실행
* `Thread` 인스턴스 생성시 인수들은 다음과 같음

>```python
class threading.Thread(
    group=None,    # 나중 사용을 위해 예약되어있어 사용되지 않음
    target=None,   # thread가 시작되면서 실행할 함수
    name=None,     # thread에 붙일 이름 (기본으로 `Thread-N`으로 지정됨)
    args=(),       # `target`에 전달될 인수들(튜플)
    kwargs={}      # `target`에 전달될 인수들(딕셔너리)
)
```

In [None]:
import time

# thread로 실행할 함수(target)
def thread_func(n, name):
    print("Thread {}, {}초 동안 sleep.".format(name, n))
    time.sleep(n)
    print("Thread {} 깨어남.".format(name))

In [None]:
import threading

# thread 생성
t = threading.Thread(
        target=thread_func, 
        name='sleeper', 
        args=(3, 'sleeper'))

In [None]:
# thread 시작한 직후, 다른 일을 하고 나서 thread 종료 기다림
t.start()
print('Hello'); print('Hello')
t.join()

## 여러 thread 생성

In [None]:
threads_list = []

# 반복문으로 여러 thread 생성
for i in range(5):
    t = threading.Thread(
                    target=thread_func,
                    name='Thread{}'.format(i),
                    args=(3, 'Thread{}'.format(i)))
    threads_list.append(t)
    print('{} 시작됨'.format(t.name))   # t.name
    t.start()
    
# thread 종료 기다림
for t in threads_list:
    t.join()
print('모든 {}개의 thread들 종료됨'.format(len(threads_list)))

## thread 자신에 대한 정보
* `threading.currentThread()`: python의 GIL(Global Interpreter Lock) 때문에 어느 한순간 하나의 thread만 실행
* `t.getName()`

In [None]:
def first_function():
    print (threading.currentThread().getName() + str(' 스레드 시작'))
    time.sleep(3)
    print (threading.currentThread().getName() + str( ' 스레드 종료'))

def second_function():
    print (threading.currentThread().getName() + str(' 스레드 시작'))
    # sleep 없음
    print (threading.currentThread().getName() + str( ' 스레드 종료'))

def third_function():
    print (threading.currentThread().getName() + str(' 스레드 시작'))
    time.sleep(2)
    print (threading.currentThread().getName() + str( ' 스레드 종료'))

t1 = threading.Thread(name='first_function', target=first_function)
t2 = threading.Thread(name='second_function', target=second_function)
t3 = threading.Thread(name='third_function', target=third_function)

t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()

## daemon thread
* 새로운 thread를 생성할 때, `daemon=True`로 지정 가능
* daemon thread인 경우, 메인 thread가 종료되면 즉시 종료
* daemon thread가 아닌 경우, 메인 thread가 종료되더라도 자신의 작업이 끝날 때까지 계속 실행

----
```python
import threading
import time

def first_function():
    print (threading.currentThread().getName() + str(' 스레드 시작'))
    time.sleep(3)
    print (threading.currentThread().getName() + str( ' 스레드 종료'))

def second_function():
    print (threading.currentThread().getName() + str(' 스레드 시작'))
    # sleep 없음
    print (threading.currentThread().getName() + str( ' 스레드 종료'))
    
t_a = threading.Thread(name='first_function', 
                       target=first_function, 
                       daemon=True)
t_b = threading.Thread(name='second_function', 
                       target=second_function)

t_a.start()
t_b.start()
```
----

* (위 코드를 콘솔에서 별도로 실행하면 daemon thread는 자신의 작업이 끝나기 전에 종료됨)

In [None]:
!python thread_daemon.py

## 새로운 thread 클래스 정의
* `threading.Thread` 클래스를 상속받아 새로운 클래스 정의
* 추가 인수가 필요하면 `__init__(self, args)` 메소드 재정의
* thread가 시작할 때 수행할 `run(self, args)` 메소드 재정의

In [None]:
class MyThread(threading.Thread):
    def __init__(self, number, style, *args, **kwargs):
        super().__init__(*args, **kwargs)   # 상위 클래스 __init__() 실행
        self.number = number
        self.style = style

    def run(self, *args, **kwargs):
        print('Thread{} 시작!'.format(self.number))
        super().run(*args, **kwargs)
        print('Thread{} 종료!'.format(self.number))

def sleeper(num, style):
    print('  {}초 동안 잠자기~: {}'.format(num, style))
    time.sleep(num)

In [None]:
threads_list = []

for i in range(5):
    t = MyThread(number=i, style='argument example', target=sleeper, args=(3, 'dummy'))
    threads_list.append(t)
    t.start()

for t in threads_list:
    t.join()

## 동기화: `Lock` 사용
* 아래 코드를 별도로 실행

In [None]:
# %load thread_lock.py
import threading

#### 공유 변수들
shared_with_lock = 0
shared_with_no_lock = 0
COUNT = 100000
shared_lock = threading.Lock()  # Lock()

#### thread로 실행할 함수들

#### LOCK 사용 ##
def increment_with_lock():
    """Lock 걸고나서 1 증가 후 Lock 풀기"""
    global shared_with_lock
    for i in range(COUNT):
        shared_lock.acquire()  # Lock().acquire()
        shared_with_lock += 1
        shared_lock.release()  # Lock().release()

def decrement_with_lock():
    """Lock 걸고나서 1 감소 후 Lock 풀기"""
    global shared_with_lock
    for i in range(COUNT):
        shared_lock.acquire()
        shared_with_lock -= 1
        shared_lock.release()

#### NO LOCK ##
def increment_without_lock():
    """Lock 없이 1 증가"""
    global shared_with_no_lock
    for i in range(COUNT):
        shared_with_no_lock += 1

def decrement_without_lock():
    """Lock 없이 1 감소"""
    global shared_with_no_lock
    for i in range(COUNT):
        shared_with_no_lock -= 1

#### thread 4개 생성
t1 = threading.Thread(target = increment_with_lock)
t2 = threading.Thread(target = decrement_with_lock)
t3 = threading.Thread(target = increment_without_lock)
t4 = threading.Thread(target = decrement_without_lock)

#### thread 각각 시작
t1.start()
t2.start()
t3.start()
t4.start()

#### 각 thread 종료 기다림
t1.join()
t2.join()
t3.join()
t4.join()

print ("Lock 사용한 공유 변수의 값 = {}".format(shared_with_lock))
print ("Lock 사용하지 않은 변수의 값 = {}".format(shared_with_no_lock))


In [None]:
!python ./thread_lock.py

## 동기화: `RLock` 사용
* 단순히 lock을 얻고 풀기만 할 경우(아래 코드를 콘솔에서 별도로 실행)

In [None]:
# %load thread_rlock.py
import threading
import time

class Box(object):
    lock = threading.RLock()   # RLock 사용

    def __init__(self):
        self.total_items = 0

    def execute(self, n):
        Box.lock.acquire()
        self.total_items += n
        Box.lock.release()

    def add(self):
        Box.lock.acquire()   # Lock 잠금
        self.execute(1)      # Lock()의 경우, 여기서 또 잠그려고하면 마냥 기다림
        Box.lock.release()

    def remove(self):
        Box.lock.acquire()   # Lock 잠금
        self.execute(-1)     # Lock()의 경우, 여기서 또 잠그려고하면 마냥 기다림
        Box.lock.release()


def adder(box, items):
    while items > 0:
        print ("box에 1 더하고, item 1 줄이기")
        box.add()
        time.sleep(1)
        items -= 1
        
def remover(box, items):
    while items > 0:
        print ("box에 1 빼고, item 1 줄이기")
        box.remove()
        time.sleep(1)
        items -= 1


items = 5
print ("box에 item {}개 넣기".format(items))

box = Box()

t1 = threading.Thread(target=adder, args=(box,items))
t2 = threading.Thread(target=remover, args=(box,items))

t1.start()
t2.start()

t1.join()
t2.join()

print ("box에 {}개 item 남아있음.".format(box.total_items))


In [None]:
!python thread_rlock.py

## 동기화: `Semaphore` 사용

In [None]:
import threading
import time
import random

## 내부 counter의 초기값
semaphore = threading.Semaphore(0)

def consumer():
    global item
    print ("소비하기를 기다림.")
    ## semaphore 획득
    semaphore.acquire()
    ## 공유자원 접근
    print ("아이템 번호 {}를 소비함.".format(item))

def producer():
    global item
    time.sleep(5)
    ## 난수 발생
    item = random.randint(0, 1000)
    print ("아이템 번호 {}를 생성함.".format(item))
    ## 세마포어 해제: counter 1 증가, 0이되면 다른 스레드는 기다림
    semaphore.release()

# 세번의 thread 생성 및 시작
for i in range (0, 3) :
    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
print ("프로그램 종료")

## 동기화: `Condition` 사용
* 특정 조건을 만족할 때까지 thread가 기다리도록 하고, 다른 thread가 이 조건이 만족되도록 알려주게 함
* 조건이 만족되면 thread는 lock을 획득하여 공유자원을 차지하게 됨

In [None]:
from threading import Thread, Condition
import time

items = []
condition = Condition()   # Condition

class consumer(Thread):
    def __init__(self):
        Thread.__init__(self)

    def consume(self):
        global condition
        global items
        
        condition.acquire()    # Lock 획득
        if len(items) == 0:
            condition.wait()
            print("소비 알림: 소비할 아이템 없음.")
        items.pop()
        print("소비 알림: 1개 아이템 소비")
        print("소비 알림: 남아있는 소비할 아이템 개수는 " + str(len(items)))
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 10):
            time.sleep(5)
            self.consume()

            
class producer(Thread):
    def __init__(self):
        Thread.__init__(self)

    def produce(self):
        global condition
        global items

        condition.acquire()
        if len(items) == 7:
            condition.wait()
            print("생산 알림: 남아있는 아이템들 개수는 " + str(len(items)))
            print("생산 알림: 생산 중지!!")
        items.append(1)
        print("생산 알림: 현재 남아있는 전체 아이템들 개수는 " + str(len(items)))
        condition.notify()
        condition.release()
        
    def run(self):
        for i in range(0, 10):
            time.sleep(3)
            self.produce()
            

producer = producer()
consumer = consumer()
producer.start()
consumer.start()
producer.join()
consumer.join()
print("프로그램 종료")

## 동기화: `Event` 사용
* event는 스레드 간 통신을 위한 객체
* 스레드는 다른 스레드가 signal을 출력하는 것을 기다림
* event 객체는 내부 flag를 관리하면서 `set()`을 이용하여 `true`로 설정하고, `clear()`를 이용하여 `false`로 변환
* `wait()`는 flag가 `true`가 될 때까지 기다림

In [None]:
import time
from threading import Thread, Event
import random

items = []
event = Event()

class consumer(Thread):
    def __init__(self, items, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        while True:
            time.sleep(1)
            self.event.wait(3)
            try:
                item = self.items.pop()
            except:
                break
            print ('소비 알림: {}가 {}를 꺼내 소비함'.format(self.name, item))


class producer(Thread):
    def __init__(self, integers, event):
        Thread.__init__(self)
        self.items = items
        self.event = event
        
    def run(self):
        global item
        for i in range(5):
            time.sleep(1)
            item = random.randint(0, 256)
            self.items.append(item)
            print ('생산 알림: {}가 {}를 생산하여 추가함'.format(self.name, item))
            print ('생산 알림: {}가 event를 설정함'.format(self.name))
            self.event.set()
            print ('생산 알림: {}가 event를 해제함'.format(self.name))
            self.event.clear()
            

t1 = producer(items, event)
t2 = consumer(items, event)
t1.start()
t2.start()
t1.join()
t2.join()
print("프로그램 종료")

## `with`문 사용하기

In [None]:
import threading

def threading_with(statement):
    with statement:
        print('{}가 with를 통해 잠금'.format(statement))

def threading_not_with(statement):
    statement.acquire()
    try:
        print('{}가 직접 잠금'.format(statement))
    finally:
        statement.release()


lock = threading.Lock()
rlock = threading.RLock()
condition = threading.Condition()
mutex = threading.Semaphore(1)

threading_synchronization_list = [lock, rlock, condition, mutex]

# with문이 없는 함수와 있는 함수
for statement in threading_synchronization_list :
    t1 = threading.Thread(target=threading_with, args=(statement,))
    t2 = threading.Thread(target=threading_not_with, args=(statement,))

    t1.start()
    t2.start()
    
    t1.join()
    t2.join()

## `Queue`를 이용한 thread 통신
* `put()` : 큐에 넣기
* `get()` : 큐에서 빼기
* `task_done()` : 아이템이 처리될 때마다 호출 필요

In [None]:
import queue

In [2]:
q = queue.Queue()   # 큐 생성
q.put(5)            # 큐에 5 넣음
print(q.get())      # 큐에서 하나 꺼내옴
#print(q.get())      # 큐가 비어있을 때 꺼내려고 하면 계속 기다림
print(q.empty())    # 큐가 비어있는지 확인

for i in range(5):
    q.put(i)
    
while not q.empty():
    print(q.get(), end='  ')

5
True
0  1  2  3  4  

In [4]:
# FIFO 큐
q = queue.Queue()
for i in range(5):
    q.put(i)
while not q.empty():
    print(q.get(), end='  ')
print('')

# LIFO
q = queue.LifoQueue()
for i in range(5):
    q.put(i)
while not q.empty():
    print(q.get(), end='  ')
print('')

# PriorityQueue: 숫자
q = queue.PriorityQueue()
q.put(1); q.put(3); q.put(4); q.put(2); q.put(0)
for i in range(q.qsize()):
    print(q.get(), end='  ')
print('')

# PriorityQueue: tuple
q = queue.PriorityQueue()
q.put((1, 'A')); q.put((3,'B')); q.put((4,'C')); q.put((2,'D')); q.put((0,'E'))
for i in range(q.qsize()):
    print(q.get()[1], end='  ')

0  1  2  3  4  
4  3  2  1  0  
0  1  2  3  4  
E  A  D  B  C  

In [10]:
import threading
import queue
import time
import random

class producer(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self) :
        for i in range(10):
            item = random.randint(0, 256)
            self.queue.put(item)
            print ('생산 알림: {}가 아이템 {}을 큐에 넣음'.format(self.name, item))
            time.sleep(1)

            
class consumer(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        
    def run(self):
        for i in range(10):
            try:
                time.sleep(2)
                item = self.queue.get_nowait()
            except:
                continue
            print ('소비 알림: {}가 아이템 {}을 큐에서 꺼냄'.format(self.name, item))
            self.queue.task_done()


q = queue.Queue()

t1 = producer(q)
t2 = consumer(q)
t3 = consumer(q)
t4 = consumer(q)

t1.start()
t2.start()
t3.start()
t4.start()

t1.join()
t2.join()
t3.join()
t4.join()

print("프로그램 종료")

생산 알림: Thread-16가 아이템 3을 큐에 넣음
생산 알림: Thread-16가 아이템 168을 큐에 넣음
소비 알림: Thread-17가 아이템 3을 큐에서 꺼냄생산 알림: Thread-16가 아이템 42을 큐에 넣음
소비 알림: Thread-18가 아이템 168을 큐에서 꺼냄

소비 알림: Thread-19가 아이템 42을 큐에서 꺼냄
생산 알림: Thread-16가 아이템 70을 큐에 넣음
소비 알림: Thread-18가 아이템 70을 큐에서 꺼냄
생산 알림: Thread-16가 아이템 172을 큐에 넣음
생산 알림: Thread-16가 아이템 249을 큐에 넣음
소비 알림: Thread-18가 아이템 172을 큐에서 꺼냄
소비 알림: Thread-17가 아이템 249을 큐에서 꺼냄
생산 알림: Thread-16가 아이템 224을 큐에 넣음
생산 알림: Thread-16가 아이템 216을 큐에 넣음
소비 알림: Thread-17가 아이템 224을 큐에서 꺼냄소비 알림: Thread-18가 아이템 216을 큐에서 꺼냄

생산 알림: Thread-16가 아이템 56을 큐에 넣음
생산 알림: Thread-16가 아이템 126을 큐에 넣음
소비 알림: Thread-17가 아이템 56을 큐에서 꺼냄
소비 알림: Thread-19가 아이템 126을 큐에서 꺼냄
프로그램 종료


## thread를 사용한 경우 성능 비교

In [22]:
from threading import Thread

class threads_object(Thread):
    def run(self):
        function_to_run()
        
class nothreads_object(object):
    def run(self):
        function_to_run()
        
def non_threaded(num_iter):
    funcs = []
    for i in range(int(num_iter)):
        funcs.append(nothreads_object())
    for i in funcs:    # 순차적으로 반복 실행
        i.run()
        
def threaded(num_threads):
    funcs = []
    for i in range(int(num_threads)):
        funcs.append(threads_object())
    for i in funcs:    # thread 생성해서 실행
        i.start()
    for i in funcs:
        i.join()
        
def show_results(func_name, results):
    print ("%-23s %4.6f초 소요" % (func_name, results))

In [28]:
import sys
from timeit import Timer

repeat = 100
number = 1
num_threads = [1, 2, 4, 8]

def test():
    print ('== 테스트 시작')
    for i in num_threads:
        t = Timer("non_threaded(%s)" % i, globals=globals())
        best_result = min(t.repeat(repeat=repeat, number=number))
        show_results("non_threaded (%s iters)" % i, best_result)

        t = Timer("threaded(%s)" % i, globals=globals())
        best_result = min(t.repeat(repeat=repeat, number=number))
        show_results("threaded (%s threads)" % i, best_result)

    print ('== 테스트 끝')

def function_to_run():
    pass

# 첫번째 테스트
test()

== 테스트 시작
non_threaded (1 iters)  0.000002초 소요
threaded (1 threads)    0.000065초 소요
non_threaded (2 iters)  0.000002초 소요
threaded (2 threads)    0.000121초 소요
non_threaded (4 iters)  0.000003초 소요
threaded (4 threads)    0.000198초 소요
non_threaded (8 iters)  0.000005초 소요
threaded (8 threads)    0.000386초 소요
== 테스트 끝


In [29]:
def function_to_run():
    a, b = 0, 1
    for i in range(10000):
        a, b = b, a + b

# 두번쩨 테스트
test()

== 테스트 시작
non_threaded (1 iters)  0.002993초 소요
threaded (1 threads)    0.003216초 소요
non_threaded (2 iters)  0.005755초 소요
threaded (2 threads)    0.006353초 소요
non_threaded (4 iters)  0.011459초 소요
threaded (4 threads)    0.012440초 소요
non_threaded (8 iters)  0.023909초 소요
threaded (8 threads)    0.024854초 소요
== 테스트 끝


In [30]:
def function_to_run():
    fh = open("Parallel_Programming_Thread.ipynb", "rb")
    size = 1024
    for i in range(1000):
        fh.read(size)

# 세번째 테스트
test()

== 테스트 시작
non_threaded (1 iters)  0.001427초 소요
threaded (1 threads)    0.001516초 소요
non_threaded (2 iters)  0.002868초 소요
threaded (2 threads)    0.003517초 소요
non_threaded (4 iters)  0.005796초 소요
threaded (4 threads)    0.015509초 소요
non_threaded (8 iters)  0.011473초 소요
threaded (8 threads)    0.033770초 소요
== 테스트 끝
