# Multiprocessing

Reference: https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming

In [None]:
import multiprocessing
from tqdm import tqdm
from functools import partial
import os
import sys
from pytube import YouTube
import psutil
import time
import subprocess

In [26]:
!pip install pytubefix

from pytubefix import YouTube


Collecting pytubefix
  Using cached pytubefix-8.12.2-py3-none-any.whl.metadata (5.3 kB)
Using cached pytubefix-8.12.2-py3-none-any.whl (730 kB)
Installing collected packages: pytubefix
Successfully installed pytubefix-8.12.2


### Parallelism Concept

In [27]:
def say_numbers():
    # Count 1~5
    for i in range(1,6,1):
        print(f"Number: #{i}")
        #sys.stdout.write(f"Number: #{i}\n")
        #sys.stdout.flush()
        time.sleep(0.1)

def say_alphabet():
    # Count a~e
    for i in range(ord("a"), ord("f")):
        print(f"Alphabet: {chr(i)}")
        #sys.stdout.write(f"Alphabet: {chr(i)}\n")
        #sys.stdout.flush()
        time.sleep(0.1)

In [28]:
print("-"*10+"Sequential execution"+"-"*10)
seq = time.time()
say_numbers()
say_alphabet()
print("execution time :", time.time() - seq)

----------Sequential execution----------
Number: #1
Number: #2
Number: #3
Number: #4
Number: #5
Alphabet: a
Alphabet: b
Alphabet: c
Alphabet: d
Alphabet: e
execution time : 1.0028891563415527


In [29]:
print("-"*10+"Parallel execution"+"-"*10)
par = time.time()   # 러닝타임 체크 시작
p1 = multiprocessing.Process(target=say_numbers)
p2 = multiprocessing.Process(target=say_alphabet)
p1.start()
p2.start()

# join으로 대기하지 않으면 부모 process가 종료되어 자식이 zombie가 된다.
p1.join()
p2.join()

print("execution time :", time.time() - par)    # 러닝타임 체크 끝

----------Parallel execution----------
Number: #1
Alphabet: a
Number: #2
Alphabet: b
Number: #3Alphabet: c

Number: #4Alphabet: d

Alphabet: eNumber: #5

execution time : 0.531724214553833


### Process control methods

- **start()**: Sub process를 실행시킵니다.
- **terminate()**: Process에게 SIGTERM을 보냅니다.
- **join()**: Process가 종료될 때까지 기다립니다.
- join([_timeout_]): 지정된 시간(초 단위)까지 기다립니다.

In [30]:
# terminate() example
# 아래의 코드는 좀비 프로세스를 생성시킵니다.
# `watch -n1 "ps aux|grep defunct"` 명령어로 관찰 할 수 있습니다.

def child_process_func():
    print("Child process counting started")
    # Count 1~5
    for i in range(10):
        print(f"Number: #{i}")
        time.sleep(0.5)
    # print on normal termination only.
    print("Normal termination")


p1 = multiprocessing.Process(target=child_process_func)
print("Process child process")
p1.start()
time.sleep(1)

# Force kill process
p1.terminate()

Process child process
Child process counting started
Number: #0
Number: #1


In [31]:
# join()을 이용해서 좀비 프로세스가 발생하는 것을 방지 할 수 있습니다.

def say_numbers():
    # Count 1~5
    for i in range(1,6,1):
        print(f"Number: #{i}")
        time.sleep(0.1)
    print("Process terminating...", flush=True)

p1 = multiprocessing.Process(target=say_numbers)
print("Process starting...")
p1.start()

# Wait until process terminates
print("Waiting for process to be terminated...")
p1.join()

Process starting...
Number: #1
Number: #2
Waiting for process to be terminated...
Number: #3
Number: #4
Number: #5
Process terminating...


### IPC practice: Popen

In [32]:
print("Current process ID:{}".format(os.getpid()), flush=True)

ps_process = subprocess.Popen(['bash', 'subprocess.sh'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = ps_process.communicate()

print(stdout.decode())

Current process ID:30244
Subprocess PID: 33437, PPID: 30244
합계 6992
drwxrwxr-x  2 kds kds       4096  2월 26 15:16 ANN_test
drwxrwxr-x  4 kds kds       4096  3월 17 10:40 Arduino
drwxrwxr-x  8 kds kds       4096  3월 17 09:29 DX-01
drwxr-xr-x  2 kds kds       4096  3월  6 15:21 Desktop
drwxr-xr-x  3 kds kds       4096  2월 20 17:15 Documents
drwxr-xr-x  4 kds kds       4096  3월 17 11:34 Downloads
drwxr-xr-x  2 kds kds       4096  2월 20 15:47 Music
drwxr-xr-x  4 kds kds       4096  3월  4 08:57 Pictures
drwxr-xr-x  2 kds kds       4096  2월 20 15:47 Public
drwxrwxr-x  7 kds kds       4096  3월  8 15:51 Soccer_MBTI_Chatbot_RAG
drwxr-xr-x  2 kds kds       4096  2월 20 15:47 Templates
drwxr-xr-x  2 kds kds       4096  2월 20 15:47 Videos
-rwxrwxr-x  1 kds kds      15960  2월 20 16:31 a.out
drwxr-xr-x  4 kds kds       4096  8월 26  2022 coco128
-rw-rw-r--  1 kds kds    6983030  7월 10  2024 coco128.zip
-rw-rw-r--  1 kds docker     413  2월 25 15:52 dockerfile
drwxrwxr-x  4 kds kds       4096  2월 21 09:41

### `pytube` `tqdm` example

- UI와 다운로드가 message_queue로 통신

- UI Process

In [33]:
# UI 프로세스
def draw_ui(message_queue):
    print(
        "UI process starting ... PID:{}, PPID:{}"
        .format(
            os.getpid(),
            psutil.Process(os.getpid()).ppid()), flush=True)
    prev = 0
    tqdm_bar = None
    while True:
        message = message_queue.get()
        if message["type"] == "on_progress":
            if tqdm_bar is None:
                tqdm_bar = tqdm(total=100, desc="Downloading...")
            cur_rate = message["progress_rate"]
            tqdm_bar.update(int(cur_rate-prev))
            prev = int(cur_rate)
        elif message["type"] == "on_complete":
            if tqdm_bar is None:
                tqdm_bar = tqdm(total=100, desc="Downloading...")
            tqdm_bar.update(100-prev)
            tqdm_bar.close()
            print("Completed!")
            break

- Download Process

In [34]:
# 다운로드 프로세스
def on_progress(stream, chunk, bytes_remaining, message_queue):
    total_size = stream.filesize
    bytes_downloaded = total_size - bytes_remaining
    progress = (bytes_downloaded / total_size) * 100
    message_queue.put({"type":"on_progress", "progress_rate":progress})

def on_complete(stream, file_handle, message_queue):
    message_queue.put({"type":"on_complete"})

def download(url, message_queue):
    print(
        "Download process starting ... PID:{}, PPID:{}"
        .format(
            os.getpid(),
            psutil.Process(os.getpid()).ppid()), flush=True)
    on_progress_with_MQ = partial(on_progress, message_queue=message_queue)
    on_complete_with_MQ = partial(on_complete, message_queue=message_queue)
    youtube_clip = YouTube(
                        url,
                        on_progress_callback=on_progress_with_MQ,
                        on_complete_callback=on_complete_with_MQ)
    youtube_stream = youtube_clip.streams.get_highest_resolution()
    youtube_stream.download("videos")

- Multiprocessing

In [35]:
# 미국은 어떻게 강대국이 되었나
url = "https://www.youtube.com/watch?v=S0NsxGrMg3Q"

print("main process running ... PID:{}".format(os.getpid()), flush=True)

message_queue = multiprocessing.Queue()

p1 = multiprocessing.Process(target=draw_ui, args=(message_queue,))
p2 = multiprocessing.Process(target=download, args=(url, message_queue,))

p1.start()
p2.start()

p1.join()
p2.join()

main process running ... PID:30244
UI process starting ... PID:33577, PPID:30244
Download process starting ... PID:33580, PPID:30244


Downloading...: 100%|██████████| 100/100 [00:12<00:00,  8.13it/s]


Completed!


In [None]:
import multiprocessing
import os
from pytubefix import YouTube

# UI를 그리는 함수
def draw_ui(message_queue):
    while True:
        message = message_queue.get()  # 큐에서 메시지를 가져옴
        if message == 'exit':  # 종료 신호
            break
        print(f"UI Update: {message}")

# 다운로드 함수
def download(url, message_queue):
    try:
        yt = YouTube(url)
        stream = yt.streams.get_highest_resolution()
        message_queue.put(f"Downloading: {yt.title}")
        stream.download()
        message_queue.put(f"Download Complete: {yt.title}")
    except Exception as e:
        message_queue.put(f"Error: {str(e)}")

# 메인 실행
if __name__ == '__main__':
    url = "https://www.youtube.com/watch?v=S0NsxGrMg3Q"

    print("main process running ... PID:{}".format(os.getpid()), flush=True)

    message_queue = multiprocessing.Queue()

    p1 = multiprocessing.Process(target=draw_ui, args=(message_queue,))
    p2 = multiprocessing.Process(target=download, args=(url, message_queue,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()


main process running ... PID:30244
UI Update: Downloading: 미친듯한 운과 실력 미국은 어떻게 최강대국이 되었나
UI Update: Download Complete: 미친듯한 운과 실력 미국은 어떻게 최강대국이 되었나
