# **<font color=#0abab5>Process and Concurrency</font>**

### Programs and Processes

理論內容跟大學作業系統課差不多，感覺可省略

In [5]:
import os 
os.getpid() # process id

10701

### Create a process with subprocess

程式碼預設是用一個process去進行，但可以利用os提供的api從process上創造其他的process叫subprocess
下面這段利用subprocess程式庫呼叫linux(雖然我是mac)上的date，並抓取stdout來做使用

In [8]:
import subprocess
ret = subprocess.getoutput('date')
ret

'2022年12月 2日 週五 20時23分11秒 CST'

In [9]:
ret = subprocess.getoutput('date -u')
ret

'2022年12月 2日 週五 12時26分03秒 UTC'

In [10]:
# or with pipe
ret = subprocess.getoutput('date -u | wc')
ret

'       1       5      45'

In [11]:
# check_output method
ret = subprocess.check_output(['date', '-u'])
ret

b'2022\xe5\xb9\xb412\xe6\x9c\x88 2\xe6\x97\xa5 \xe9\x80\xb1\xe4\xba\x94 12\xe6\x99\x8236\xe5\x88\x8646\xe7\xa7\x92 UTC\n'

In [12]:
# exit code
ret = subprocess.getstatusoutput('date')
ret

(0, '2022年12月 2日 週五 20時37分16秒 CST')

In [14]:
# dont care output but exit code
ret = subprocess.call('date')
ret

2022年12月 2日 週五 20時38分41秒 CST


0

In [18]:
# in shell script format
ret = subprocess.call('date -u', shell = True)

# list of arg way
ret = subprocess.call(['date', '-u'])

2022年12月 2日 週五 12時40分50秒 UTC
2022年12月 2日 週五 12時40分50秒 UTC


### Create process with multiprocessing

In [None]:
# 不能在juypter跑，另外跑
from multiprocessing import Process
import os

def whoami(what):
    print("Process %s says: %s" % (os.getpid(), what))

if __name__ == "__main__": 
    whoami("I'm the main program") 
    for n in range(4):
        p = Process(target=whoami,
            args=("I'm function %s" % n,))
        p.start()

multipreocessing module非常實用，可以用在所有外部程式的呼叫以及把資料跟python腳本互動
在後面concurrency段落有更多應用

In [None]:
# kill process

# 另外執行
import multiprocessing
import time
import os
def whoami(name):
    print("I'm %s, in process %s" % (name, os.getpid()))

def loopy(name): 
    whoami(name)
    start = 1
    stop = 1000000
    for num in range(start, stop):
        print("\tNumber %s of %s. Honk!" % (num, stop))
        time.sleep(1)

if __name__ == "__main__": 
    whoami("main")
    p = multiprocessing.Process(target=loopy, args=("loopy",))
    p.start()
    time.sleep(5)
    p.terminate()

### Command Automation

用python來管理、執行類似shell script用途的操作

#### Invoke
把function呼叫轉成command line arguments

In [None]:
import os
os.system('pip install invoke')

In [43]:
# run my cli
from invoke import task

@task
def mytime(ctx): 
    import time
    now = time.time()
    time_str = time.asctime(time.localtime(now)) 
    print("Local time is", timestr)

### Concurrency

concurrency = 平行運算
這本書舉例兩個最常見的需要平行運算的情境

#### I/O bound
等網路、等硬碟

#### CPU/GPU bound
CPU/GPU在計算一個大的，導致他們一直在忙碌，圖學跟科學計算都是常見情境

另外重要兩個term：

- Synchrnous
同步：連續進行

- Asynchrnous
非同步：task各自獨立運行， 

這段落有稍微介紹平行處理的use-case，總括就是不能夠等的情境，與其在那邊busy waiting，不如直接平行運算做其他服務該做的事

### Queues

就是FIFO的queue

multiprocessing module有著queue function存在，
可以queue各個process的順序如下

In [None]:
# 單獨執行
import multiprocessing as mp

def washer(dishes, output): 
    for dish in dishes:
        print('Washing', dish, 'dish') 
        output.put(dish)

def dryer(input): 
    while True:
        dish = input.get() 
        print('Drying', dish, 'dish') 
        input.task_done()

if __name__ == '__main__':
    dish_queue = mp.JoinableQueue()
    dryer_proc = mp.Process(target=dryer, args=(dish_queue,))
    dryer_proc.daemon = True
    dryer_proc.start()

    dishes = ['salad', 'bread', 'entree', 'dessert']
    washer(dishes, dish_queue)
    dish_queue.join()

### Threads

Thread是在同個process中執行，一對多的資源單元
跟上面multiprocessing module對應，有著threading的module

以下用thread進行類似上面process的任務

In [53]:
import threading 

def do_this(what):
    whoami(what) 

def whoami(what):
    print("Thread %s says: %s" % (threading.current_thread(), what))

if __name__ == "__main__": 
    whoami("I'm the main program") 
    for n in range(4):
        p = threading.Thread(target=do_this,
                              args=("I'm function %s" % n,))
        p.start()   

Thread <_MainThread(MainThread, started 4313335168)> says: I'm the main program
Thread <Thread(Thread-22, started 6364196864)> says: I'm function 0
Thread <Thread(Thread-23, started 6364196864)> says: I'm function 1
Thread <Thread(Thread-24, started 6364196864)> says: I'm function 2
Thread <Thread(Thread-25, started 6364196864)> says: I'm function 3


In [55]:
import threading, queue 
import time

def washer(dishes, dish_queue): 
    for dish in dishes:
        print ("Washing", dish) 
        time.sleep(5)
        dish_queue.put(dish)
def dryer(dish_queue): 
    while True:
        dish = dish_queue.get() 
        print ("Drying", dish) 
        time.sleep(10) 
        dish_queue.task_done()

dish_queue = queue.Queue() 
for n in range(2):
    dryer_thread = threading.Thread(target=dryer, args=(dish_queue,))
    dryer_thread.start()

dishes = ['salad', 'bread', 'entree', 'dessert']
washer(dishes, dish_queue)

dish_queue.join()

Washing salad
WashingDrying salad
 bread
WashingDrying bread
 entree
WashingDrying entree
 dessert
Drying dessert



Threads 可能會產生危險的結果，多個thread可能會導致code難以debug

要確保thread中用到的api都是thread safe的

上面範例完全沒在thread safe所以亂成一團

用multi-process的時候則可以確保每個process獨立運行並且獨立結束，

整個process是一整組的，執行結果會在完全跑完之後才回報

#### Thread safe

Thread在沒有使用到global data的時候是安全且好用的，特別是在IO完成的時候可以比process快很多

但thread在很多時候有必要去控制一些global data，此時就需要thread safe來確保避免preemption

安全地共享資料的方法常常是使用software lock，在lock段落變動參數的時候會禁止其他thread動到這個段落

#### CPU-bound

書上提到python的thread沒辦法解決cpu-bound的問題，因為python interperter的Global Interpreter Lock設計

所以跟傳統thread加速不一樣，python的multi-thread可能比single-thread或multi-process更慢

所以書上建議：

- thread 只用來處理 I/O bound problem
- CPU bound用processes, networking or events去解決

### concurrent.futures

一個thread跟multiple的module用法，可以更方便的schedule async pooled workers

照上面所述去解決各自的問題

In [None]:
# 有process單獨跑
from concurrent import futures 
import math
import time
import sys
def calc(val): 
    time.sleep(1)
    result = math.sqrt(float(val)) 
    return result

def use_threads(num, values): 
    t1 = time.time()
    with futures.ThreadPoolExecutor(num) as tex: 
        results = tex.map(calc, values)
    t2 = time.time() 
    return t2 - t1

def use_processes(num, values): 
    t1 = time.time()
    with futures.ProcessPoolExecutor(num) as pex: 
        results = pex.map(calc, values)
    t2 = time.time() 
    return t2 - t1

def main(workers, values):
    print(f"Using {workers} workers for {len(values)} values") 
    t_sec = use_threads(workers, values)
    print(f"Threads took {t_sec:.4f} seconds")
    p_sec = use_processes(workers, values)
    print(f"Processes took {p_sec:.4f} seconds")
    
if __name__ == '__main__':
    workers = 1
    values = list(range(1, 6)) # 1 .. 5 main(workers, values)
    main(workers, values)

In [None]:
# 有process單獨跑
from concurrent import futures 
import math
import sys

def calc(val):
    result = math.sqrt(float(val)) 
    return val, result

def use_threads(num, values):
    with futures.ThreadPoolExecutor(num) as tex:
        tasks = [tex.submit(calc, value) for value in values] 
        for f in futures.as_completed(tasks):
            yield f.result()

def use_processes(num, values):
    with futures.ProcessPoolExecutor(num) as pex:
        tasks = [pex.submit(calc, value) for value in values] 
        for f in futures.as_completed(tasks):
            yield f.result()

def main(workers, values):
    print(f"Using {workers} workers for {len(values)} values") 
    print("Using threads:")
    for val, result in use_threads(workers, values): 
        print(f'{val} {result:.4f}')
    print("Using processes:")
    for val, result in use_processes(workers, values):
        print(f'{val} {result:.4f}')

if __name__ == '__main__': 
    workers = 3
    if len(sys.argv) > 1:
        workers = int(sys.argv[1])
    values = list(range(1, 6)) # 1 .. 5 
    main(workers, values)

### Green Threads and gevent

開發者利用平行化的方式來加速一些程式運作，apache就是這種例子

而另一種則是基於事件的方式去寫程式，用一個central event loop分配任務並且進行，NGINX就是這種類型，

而且他通常比apache快

In [None]:
import os

os.system('pip install gevent')

In [63]:
import gevent
from gevent import socket
hosts = ['www.crappytaxidermy.com', 'www.walterpottertaxidermy.com',
'www.google.com']
jobs = [gevent.spawn(gevent.socket.gethostbyname, host) for host in hosts] 
gevent.joinall(jobs, timeout=5)
for job in jobs:
    print(job.value)

66.6.44.4
144.217.51.126
172.217.160.68


透過gevent.spawn生成greenlet(green thread/microthread)，並且各自執行自己的事件，完成後再收集結果

green thread和thread不同點在於它不會block，gevent在等待時會切換到其他greenlets，他們彼此是yield為主而不是搶佔

應該是類似fiber的實作

##### gevent.monkey

可以替換程式中的呼叫為gevent版本

```
from gevent import monkey
monkey.patch_socket()
```

呼叫上面這行，會讓程式中所有socket呼叫變成gevent版本

注意只會替換掉python寫的函式集，c寫的不會理他

In [65]:
import gevent
from gevent import monkey; monkey.patch_all()
import socket
hosts = ['www.crappytaxidermy.com', 'www.walterpottertaxidermy.com',
'www.google.com']

jobs = [gevent.spawn(socket.gethostbyname, host) for host in hosts] 

gevent.joinall(jobs, timeout=5)
for job in jobs:
    print(job.value)

66.6.44.4
144.217.51.126
172.217.163.36


#### gevent的問題

跟所有event-based的程式碼一樣，每一個指派給gevent的task應該要相對較快

雖然相對thread他是非blocking的，但是在上面進行大量的執行依然很慢

雖然monkey-patching的大量替換方式讓人覺得有點危險，但滿多大型網站例如pinterest都使用gevent來加速他們的網站

多閱讀gevent的官方指引來避免錯誤使用才是真的。

### Twisted

twisted 是一個非同步、事件驅動的網路框架，可以聆聽一些事件的觸發來執行function。

是一個callback design

In [None]:
import os

os.system('pip install twisted')

In [None]:
# knock-server

from twisted.internet import protocol, reactor

class Knock(protocol.Protocol): 
    def dataReceived(self, data):
        print('Client:', data)
        if data.startswith("Knock knock"):
            response = "Who's there?" 
        else:
            response = data + " who?" 
            print('Server:', response) 
            self.transport.write(response)
class KnockFactory(protocol.Factory): 
    def buildProtocol(self, addr):
        return Knock()

reactor.listenTCP(8000, KnockFactory())
reactor.run()

In [None]:
# knock-client

from twisted.internet import reactor, protocol

class KnockClient(protocol.Protocol): 
    def connectionMade(self):
        self.transport.write("Knock knock")
    def dataReceived(self, data):
        if data.startswith("Who's there?"):
            response = "Disappearing client"
            self.transport.write(response) 
        else:
            self.transport.loseConnection()
            reactor.stop()
class KnockFactory(protocol.ClientFactory): 
    protocol = KnockClient

def main():
    f = KnockFactory() 
    reactor.connectTCP("localhost", 8000, f) 
    reactor.run()

if __name__ == '__main__': 
    main()

### asyncio

python在3.4加入官方library中，大家最愛的async await

比起callback更好可讀性，以及更直覺的呼叫await操作

In [84]:
import asyncio

async def say(phrase, seconds):
    print(phrase)
    await asyncio.sleep(seconds)

async def wicked():
    task_1 = asyncio.create_task(say("Surrender,", 2))
    task_2 = asyncio.create_task(say("Dorothy!", 0))
    await task_1
    await task_2

await wicked()

await say("hello", 2)
await say("world", 0)

Surrender,
Dorothy!
hello
world


#### Redis

用python與redis資料庫互動，更有效率的用in memory db做存取

這本書這邊提到是想講解跨機器的平行處理的方法論

不過redis的操作在下一章才有，這邊先跳過範例

#### Beyond Queues

在最高層級的系統面去看，這些平行處理的系統會遇到一些問題，通常的處理方法如下：

- Fire and forget

    把執行的事情丟下去就不管了

- Request-reply

    對於每件request都要等到完成的確認

- Back pressure or throttling

    根據負載調節worker的使用

有一些套件可以負責這些最上層的queue管理

- celery : 利用上面提到的gevent, process等去執行分散式的task
- rq : python的job queue library，同樣基於redis