In [1]:
# !pip install multiprocess # 이거 절대로 쓰면 안된다
                            # 써봤다가 노트북(쥬피터 노트북이 아니다) 나갈 뻔했다

Collecting multiprocess
  Downloading multiprocess-0.70.16-py312-none-any.whl.metadata (7.2 kB)
Collecting dill>=0.3.8 (from multiprocess)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Downloading multiprocess-0.70.16-py312-none-any.whl (146 kB)
Downloading dill-0.3.8-py3-none-any.whl (116 kB)
Installing collected packages: dill, multiprocess
Successfully installed dill-0.3.8 multiprocess-0.70.16


In [1]:
import multiprocessing as mp
import numpy as np
import os
from lib.testwork import square

'''
    multiprocessing 모듈은 윈도우와 유닉스/리눅스에서 다르게 작동한다
    <윈도우>
    윈도우의 경우 fork() system call이 없어 새로운 프로세스를 생성할 경우, 새로운 파이썬 인터프리터를 시작함(파이썬에서 프로세스는 1개의 파이썬 인터프리터를 갖는다)과 동시에 전체 코드를 처음부터 다시 실행한다
    따라서 'if __name__ == "__main__":'을 명시하지 않으면 프로세스 생성문을 계속 실행하기 때문에 무한 프로세스 생성 현상이 발생하게 된다(리눅스/유닉스 계열은 해당 코드를 작성하지 않아도 자식 프로세스가 본인에게 배정된 작업 함수만 수행한다)
    -x- 처음 파이썬 프로그램을 시작하여 생성된 첫번째 프로세스는 __name__에 "__main__"이 들어있지만 두번째 프로세스부터는 "__mp_main__"이 들어있다
        따라서 'if __name__ == "__main__":'을 명시하여 코드를 작성하면, 새로이 시작된 프로세스가 프로그램의 처음부터 다시 실행하더라도 if문 안쪽의 프로세스 생성문이 실행되지 않아서 프로세스가 무한히 생성되는 것을 방지한다
    
    <유닉스/리눅스>
    유닉스/리눅스의 경우 fork() system call이 존재하기 때문에 새로운 프로세스 생성 명령이 실행되면 현재 프로세스를 복사하여(현재 프로세스의 code, data, stack 그리고 cpu 문맥(프로그램 카운터), 운영체제 데이터, pcb등 문맥을 모두 복사한다) 새로운 프로세스를 생성한다
    이 때 원본이 되는 프로세스를 "부모 프로세스" 복사된 프로세스를 "자식 프로세스"라고 한다
    생성된 자식 프로세스는 코드를 처음부터 다시 실행하는 것이 아닌 mp.Pool(8)와 같은 프로세스 생성 호출문 바로 다음 코드부터 실행하게 된다(파이썬의 멀티 프로세싱에서는 자식 프로세스에게 배정된 작업만이 실행된다)

    <쥬피터에서 멀티-프로세싱이 안되는 이유>
    "윈도우"에서 파이썬 파일(.py)을 사용한 일반적인 멀티-프로세싱 실행 과정은 아래와 같다
    1. 새로운 프로세스 생성 명령을 호출하면 파이썬은 해당 명령을 호출한 (실제로 그런 것은 아니지만 이하 '부모 프로세스'라고 칭함)main module(.py 파일 혹은 전체 파이썬 코드가 저장된 파일)과 배정된 작업을 처리하기 위한 (이하 '데이터'라고 칭함)인자(데이터)를 pickle(serialize, 직렬화)하여 새로운 프로세스로 보낸다
    2. 새로운 프로세스(이하 '자식 프로세스'라 칭함)는 새로운 파이썬 인터프리터를 실행하고 넘겨받은 main module(.py 파일)을 처음부터 다시 실행(import)한다
    3. 이후, 자식 프로세스는 전달받은 데이터를 unpickle(deserialize, 비직렬화)하고, 이를 입력 데이터로 하여 배정된 작업 함수를 호출한다
    4. 자식 프로세스가 main modeule(.py 파일)의 실행을 완료하면, 배정된 작업(mp_func())에 해당하는 함수의 반환된 결과를 pickle(serialize, 직렬화)하여 부모 프로세스에게 보낸다
    5. 부모 프로세스는 전달 받은 결과물을 unpickle(deserialize, 비직렬화)하여 취합하고, 나머지 파이썬 코드를 계속 실행한다
    
    헌데 쥬피터 노트북은(랩도 마찬가지) 일반적인 파이썬 스크립트가 아닌, 백그라운드에서 상호작용 가능한 파이썬 커널을 실행하고 있는 웹 애플리케이션이다
    이는 쥬피터 노트북에서의 main module(여기서는 .ipynb 파일)이 일반적인 파이썬 스크립트가 아니라 파이썬 객체임을 의미한다
    이러한 파이썬 객체는 pickle(serialize, 직렬화)할 수 없고 따라서 이를 새로운 프로세스에 보내줄 수 없다
    따라서 새로운 프로세스가 생성될 수 없으며, 이것이 쥬피터 상에서 (if문을 둘째치고서라도)멀티-프로세싱이 작동하지 않는 이유다

    <해결방법(4가지가 있지만 그 중 가장 간편한 방법)>
    멀티 프로세싱으로 처리할 작업에 해당하는 함수를 따로 파이썬 파일(.py)로 생성하고 그것을 불러온다
    위 방법이 가능한 이유는 아래와 같다
    1. 윈도우 상에서 새로운 프로세스를 생성할 때 main module을 넘겨준다는 했는데, 이는 새로운 프로세스에 배정한 작업 함수가 main module내에 작성되어있기 때문이다
    ==> 이 때 main module은 작업 함수가 작성되어있는 곳이 된다
    ==> 위의 사실에 따라 작업에 해당하는 함수를 .py 파일로 별도로 생성하게 되면 pickle이 가능해져 main module을 새로운 프로세스에 전달해 줄 수 있게 된다
    2. 작업을 별도의 .py 파일로 생성할 때 "작업에 해당하는 함수만" 작성하였고 "프로세스 생성문은 작성하지 않았기 때문에" 무한 프로세스 생성 루프에 빠지지 않는다(빠질 수 없다)
    
    <참고>
    https://bobswinkels.com/posts/multiprocessing-python-windows-jupyter/(how to use pool.map in jupyter notebook : 검색 문구)
    https://velog.io/@sunaookamisiroko/4.-%ED%94%84%EB%A1%9C%EC%84%B8%EC%8A%A4-%EC%83%9D%EC%84%B1
'''
'''
    윈도우에서의 파이썬 멀티프로세싱 모듈의 작동 방식을 정리하면 아래와 같다
    A. 쥬피터를 사용하지 않는 경우
        1. 프로세스 생성문을 실행하면 작업 함수가 작성되어 있는 부모 프로세스의 메인 모듈(파이썬 파일)을 처리해야할 데이터와 함께 pickle하여 생성할 프로세스에게 전달한다
        2. 윈도우에서는 fork 시스템 콜이 없어 프로세스 생성문은 새로운 프로세스(역할상으로만 자식 프로세스에 해당한다)를 생성한다
        3. 새로이 생성된 프로세스는 전달 받은 (unpickle한)메인 모듈을 처음부터 끝까지 다시 실행하는데, 이 때 무한 프로세스 생성 현상을 방지하기 위해 메인 모듈에 "if __name__ == "__main__""을 작성한다
        4. 작업이 완료된 프로세스들은 결과물을 pickle하여 모체 프로세스에게 전달한다
        5. 부모 프로세스는 그것을 다시 unpickle하여 나머지 작업을 수행한다
    B. 쥬피터를 사용하는 경우
        1. 쥬피터 파일은 파이썬 객체 취급임으로 여기에 작업 함수를 작성하게 되면 main module이 pickle되지 않아 생성되는 프로세스에게 전달해 줄 수 없다(즉, 프로세스가 생성되지 못한다)
        2. 따라서 작업 함수를 별도의 파이썬 파일에 작성하여야 해당 파일이 main module이 되어 새로운 프로세스를 생성할 수 있게 된다
        3. 모체 프로세스가 main module과 처리해야할 데이터를 pickle하여 생성되는 프로세스에 전달한다(프로세스가 생성된다)
        4. 이 때, main module에는 작업 함수와 그에 필요한 것들만 작성되기 때문에 무한 프로세스 생성 현상은 애초에 발생할 수 없게 된다(따라서 "if __name__ == "__main__""을 작성할 필요가 없다)
        5. 작업이 완료된 프로세스들은 결과물을 pickle하여 모체 프로세스에게 전달한다
        6. 부모 프로세스는 그것을 다시 unpickle하여 나머지 작업을 수행한다

    -x- 파이썬에서의 프로세스는 자신만의 파이썬 인터프리터를 갖으며, 단일 스레드 프로세스이다(따라서 GIL의 성능 저하 문제를 겪지 않는다)
        파이썬 인터프리터는 그 자체로 GIL(Global Interpreter Lock)이라는 단일(single) lock을 갖는다
        GIL은 파이썬에서 한 번에 하나의 스레드만 실행되도록 강제한다
'''
# def square(x):
#     return np.square(x)

# 아래의 if문을 사용하지 않으면 무한 오류에 빠지게 된다
# 쥬피터에서는 아래의 if문을 사용해도 멀티 프로세싱이 수행되지 않는다
if __name__ == "__main__":
    x = np.arange(64) # 0 ~ 63까지의 정수 생성
    print(x, type(x), x.dtype)
    print("Current Host System cpu core number :", mp.cpu_count()) # 현재 호스트 시스템의 cpu(코어) 개수를 반환한다
                                                                   # 그러나 이것이 현재 프로세스가 사용 가능한 cpu(코어)의 개수를 의미하지는 않는다
    pool = mp.Pool(8) # 8개의 프로세스를 가지는 프로세스 풀을 생성한다
    squared = pool.map(square, [x[8*i:8*i+8] for i in range(8)]) # 총 64개의 숫자를 8개씩 분할하여 각 프로세스에 배정한다
    print(squared)

[ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63] <class 'numpy.ndarray'> int32
Current Host System cpu core number : 16
[array([ 0,  1,  4,  9, 16, 25, 36, 49]), array([ 64,  81, 100, 121, 144, 169, 196, 225]), array([256, 289, 324, 361, 400, 441, 484, 529]), array([576, 625, 676, 729, 784, 841, 900, 961]), array([1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521]), array([1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209]), array([2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025]), array([3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969])]


In [8]:
import numpy as np
import multiprocessing as mp
import os
from lib.worker_testing import square

x = np.arange(64) # 0 ~ 63까지의 정수 생성
print(x, type(x), x.dtype)
print("Current Host System cpu core number :", mp.cpu_count()) # 현재 호스트 시스템의 cpu(코어) 개수를 반환한다
                                                               # 그러나 이것이 현재 프로세스가 사용 가능한 cpu(코어)의 개수를 의미하지는 않는다
# with 구문을 사용하지 않아도 코드 실행에 아무런 문제는 없지만,
# with을 사용하는 것이 자원 관리 측면에서 훨씬 더 좋다
with mp.Pool(8) as pool:
    squared = pool.map(square, [x[8*i:8*i+8] for i in range(8)]) # 총 64개의 숫자를 8개씩 분할하여 각 프로세스에 배정한다
                                                                 # 각 프로세스가 수행해야 하는 작업(함수)과 작업물(데이터)을 명시한다
for i in range(len(squared)):
    print(squared[i])

[ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63] <class 'numpy.ndarray'> int32
Current Host System cpu core number : 16
[ 0  1  4  9 16 25 36 49]
[ 64  81 100 121 144 169 196 225]
[256 289 324 361 400 441 484 529]
[576 625 676 729 784 841 900 961]
[1024 1089 1156 1225 1296 1369 1444 1521]
[1600 1681 1764 1849 1936 2025 2116 2209]
[2304 2401 2500 2601 2704 2809 2916 3025]
[3136 3249 3364 3481 3600 3721 3844 3969]


In [4]:
torch.arange(1, 16).pow(2)

tensor([  1,   4,   9,  16,  25,  36,  49,  64,  81, 100, 121, 144, 169, 196,
        225])

In [None]:
# https://www.google.co.kr/search?q=pytorch+mp.Queue&sca_esv=ae093cb415ffb20d&source=hp&ei=xcAzZ-_SIoun1e8Po-KTsA0&iflsig=AL9hbdgAAAAAZzPO1aWq3AZFv35PybPJDU_yh7WYeJTm&ved=0ahUKEwivydqB19eJAxWLU_UHHSPxBNYQ4dUDCA8&uact=5&oq=pytorch+mp.Queue&gs_lp=Egdnd3Mtd2l6IhBweXRvcmNoIG1wLlF1ZXVlMgQQABgeMggQABiABBiiBDIIEAAYgAQYogQyCBAAGIAEGKIEMggQABiABBiiBEixJ1AAWMkmcAB4AJABAJgBfqABuQ2qAQQwLjE1uAEDyAEA-AEBmAIPoALuDcICCxAAGIAEGLEDGIMBwgIREC4YgAQYsQMY0QMYgwEYxwHCAggQABiABBixA8ICBRAAGIAEwgIHEAAYgAQYE8ICBhAAGBMYHsICCBAAGBMYCBgemAMAkgcEMC4xNaAHg0U&sclient=gws-wiz
# https://stackoverflow.com/questions/56426326/using-tensor-share-memory-vs-multiprocessing-queue-in-pytorch-when-training-m?rq=3
import torch
import numpy as np
import torch.multiprocessing as mp
from lib.worker_testing import square3, square4

processes = [] # 프로세스 풀 생성
proc_num = 8 # 생성할 프로세스의 개수
'''
    <참고>
    https://stackoverflow.com/questions/73590545/when-get-tensor-from-multiprocesses-by-torch-multiprocessing-queue-runtimeerr
    https://discuss.pytorch.org/t/using-torch-tensor-over-multiprocessing-queue-process-fails/2847
    https://stackoverflow.com/questions/53094812/how-to-assure-the-multiprocessing-queue-is-empty

    mp.Queue()를 사용하여 다중 프로세스간 "torch 텐서"를 공유하는 과정은 아래와 같다
    1. 주 프로세스가 하위 프로세스를 생성한다
    2. 하위 프로세스는 배정받은 작업 처리를 모두 완료한 후 최종 결과물인 "torch 텐서"가 아닌 그와 관련된 토큰을 mp.Queue()에 저장한다
    3. 주 프로세스가 mp.Queue()로부터 해당 토큰을 읽는다
    4. 토큰을 읽은 주 프로세스는 이를 통해 큐에 해당 토큰을 저장한 하위 프로세스를 대상으로 unix socket을 열어 통신을 시도한다
    5. 하위 프로세스는 unix socket을 통한 통신으로 file descriptor(이게 최종 결과물 "torch 텐서"이다)를 주 프로세스에게 전달한다
    위 과정을 통해 mp.Queue()로부터 주 프로세스가 하위 프로세스의 결과 torch 텐서를 읽어오기 위해서는 그 때 까지 하위 프로세스가 종료되어서는 안된다
    torch 텐서는 mp.Queue()에 직접 저장되지 않으며, unix socket을 통해 file descriptor로써(즉, torch 텐서가 파일로써 다루어진다는 의미이다) 하위 프로세스에서 주 프로세스로 직접 전달된다
    - 윈도우에서는 unix socket을 이용하는지 잘 모르겠지만 주 프로세스와 하위 프로세스간 직접적인 통신을 통해 텐서가 전달된다는 것은 확실하다
    - 넘파이의 경우, torch 텐서와는 다르게 mp.Queue()에 직접 저장되기 때문에 하위 프로세스가 종료되어도 오류가 발생하지 않는다

    mp.Queue()의 empty() 메서드는 미래에 이 큐로 어떠한 데이터도 들어오지 않을 것임을 보장하지 않는다
    - 즉, empty() 메서드는 지금 당장 큐가 비었는지의 여부만 확인해줄 뿐이며, 미래에 해당 큐로 새로운 데이터가 들어올 수도 있다는 것이다
    따라서 코드 구현시 주 프로세스가 큐로부터 하위 프로세스 수만큼 데이터를 읽어올 때까지 데이터 읽기 과정을 반복해야 한다
'''
'''
    <참고>
    https://velog.io/@hyeseong-dev/File-Descriptor
    https://kkikyul.tistory.com/49
    https://m.blog.naver.com/songblue61/221391888403
    
    file descriptor란 리눅스 혹은 유닉스 계열의 시스템에서 프로세스(process)가 파일(file)을 다룰 때 사용하는 개념으로, 프로세스에서 특정 파일에 접근할 때 사용하는 추상적인 정수값이다
    파일 디스크럽터는 일반적으로 0이 아닌 정수값을 갖는데, 이는 이 값이 프로세스가 유지하고 있는 파일 디스크립터 테이블의 인덱스를 의미하기 때문이다
    - 파일 디스크립터 테이블이란 프로세스가 유지하고 있는 테이블로, 프로세스가 현재 사용중인 파일들을 관리하기 위해 자체적으로 갖고 있는 테이블이다
    - 이 테이블의 각 항목을 지칭하는 "인덱스"가 바로 파일 디스크립터이다
    - 파일 디스크립터 테이블의 각 항목은(파일 디스크립터는 단지 이 항목들의 "인덱스"일 뿐이다)은 "파일 디스크립터 플래그(fd flags)"와 "파일 테이블의 특정 항목을 참조하는 포인터"를 갖는다
    흔히 유닉스 시스템에서는 모든 것을 파일이라고 간주하는데, 일반적인 정규파일부터 디렉토리, 소켓, 파이프, 블록 디바이스, 케릭터 디바이스 등 "모든 객체들을 파일로 관리한다"
    여기서 유닉스 시스템의 프로세스가 파일에 접근할 때 사용하는 것이 바로 파일 디스크립터이다
    프로세스가 실행 중에 파일을 Open하면(객체를 불러오면), 커널은 해당 프로세스가 유지하고 있는 파일 디스크립터 테이블의 인덱스 중 사용하지 않는 가장 작은 인덱스를 할당해준다(인덱스의 수에는 한계가 있으며, 따라서 열 수 있는 파일도 제한되어 있다)
    이 후 프로세스가 열려있는 파일에 시스템 콜을 이용해서 접근할 때(불러온 객체에 접근할 때), 파일 디스크립터(인덱스)를 이용해서 접근하려는 파일(객체)을 특정 및 접근할 수 있다
    - 파일 디스크립터에 해당하는 항목이 파일 테이블의 특정 항목을 가리키고, 이것이 다시 VSF i-node 테이블의 항목을 가리킨다
    - VSF i-node 테이블의 항목이 다시 파일 시스템(file system)에 등록되어 있는 특정 파일을 가리킨다
    프로그램에 대한 프로세스가 생성될 때 기본적으로 할당되는 파일 디스크립터는 표준 입력(Standard Input), 표준 출력(Standard Output), 표준 에러(Standard Error)이며, 이들에게 각각 0, 1, 2라는 정수가 할당된다(이 숫자들은 계속 사용되는 것으로 할당 가능한 인덱스에서 제외된다)

    file table이란 시스템 내에서 모든 열려있는 파일들을 관리하기 위한 테이블로, 커널이 유지하고 있는 테이블이다
    - 다시 말해 프로세스에 의해 open된 파일에 대한 읽기/쓰기 작업을 지원하기 위한 테이블로, 파일이 열릴때 마다 테이블 내에 항목이 하나씩 추가된다
    파일 디스크립터 테이블 내의 항목이 가진 포인터는 파일 테이블 내의 특정 항목을 참조하고 있으며, 파일 테이블의 항목은 VSF i-node 테이블의 항목을 참조하고 있다(VSF i-node 테이블의 항목은 다시 파일 시스템 내의 실제 파일을 참조한다)
'''
queue = mp.Queue() # 프로세스가 반환하는 결과물을 취합하기 위한 큐 생성
                   # mp.Queue를 사용하면 주 프로세스가 큐로부터 데이터를 읽어오기 전에 하위 프로세스가 종료되어 주 프로세스의 큐 읽기 작업에 오류가 발생할 수 있다
                   # "발생할 수 있다"라고 언급한 것 처럼 그렇지 않은 경우도 있긴하다
# x = np.arange(64)
x = torch.arange(1, 64)

print("Current available Host System cpu core number :", mp.cpu_count())

# mp.Pool()은 어떤 프로세스에게 어떤 작업을 배정할지를 자동으로 결정하는 반면,
# mp.Process()는 어떤 프로세스에게 어떤 작업을 배정할지를 수동으로 지정한다
# 프로세스 작업 수행 과정은 CMD에서 확인할 수 있다(.run() 메서드를 사용하면 직접 출력해 볼 수 있다)
for i in range(proc_num):
    start_index = 8 * i
    # multiprocessing에서 process는 Process 객체가 생성되고 이 객체의 start() 메서드가 호출되었을 때 소환된다(spawn)
    # Process 객체는 분리된 프로세스 내에서 실행중인 활동을 나타낸다
    proc = mp.Process(target=square3, args=(i, x[start_index:start_index+8], queue)) # 프로세스 번호, 작업물, 결과물 저장 큐를 넘겨준다
    # proc.run() # 프로세스의 활동을 출력한다
                 # 이 메서드를 사용하지 않으면 (쥬피터의 경우) CMD에 프로세스 활동이 출력된다
    proc.start() # 프로세스를 시작한다(프로세스가 실제로 생성된다)
    processes.append(proc) # 추후 프로세스들을 제어하기 위해 프로세스들을 추적한다

results = []
count = 0
while count != proc_num:
    if not queue.empty(): # 큐가 비어있지 않을 경우, 큐 안의 결과물을 반환한다
        results.append(queue.get())
        count += 1
for i in range(len(results)):
    print(results[i])


for proc in processes:
    proc.join() # join() 메서드를 호출한 프로세스가 종료될 때까지 주 프로세스가 해당 하위 프로세스를 기다린다(공식 문서는 이를 block이라고 표현하고 있다)
                # 좀비 프로세스가 생성되는 것을 막기 위해 가급적이면 .join() 을 사용해서 프로세스를 종료시키는 것이 좋다
for proc in processes:
    proc.terminate() # 프로세스를 종료한다(자원 반환은 수행하지 않는다)
                     # 부모 프로세스를 종료하더라도 그 밑의 자식 프로세스들은 종료되지 않는다(이 경우 자식 프로세스들은 "기아"가 된다)
for proc in processes:
    proc.close() # 프로세스 객체를 해체하여 그것과 관련된 모든 자원들을 반환한다

In [2]:
import torch
import torch.multiprocessing as mp
from lib.worker_testing import square3, square5
import time as t

processes = [] # 프로세스 풀 생성
proc_num = 8 # 생성할 프로세스의 개수

queue = mp.Queue() # 프로세스가 반환하는 결과물을 취합하기 위한 큐 생성
                   # mp.Queue를 사용하면 주 프로세스가 큐로부터 데이터를 읽어오기 전에 하위 프로세스가 종료되어 주 프로세스의 큐 읽기 작업에 오류가 발생할 수 있다
                   # "발생할 수 있다"라고 언급한 것 처럼 그렇지 않은 경우도 있긴하다
x = torch.arange(1, 65)
x.share_memory_() # 공유 메모리를 사용하지 않아도 공유 메모리처럼 처리되는 것처럼 보인다
                  # 공유 메모리를 사용하면 다른 수단을 통해 프로세스 간 데이터를 공유하는 것보다 훨씬 빠르고 효율적이다

print("Current available Host System cpu core number :", mp.cpu_count())

start = t.time()
# mp.Pool()은 어떤 프로세스에게 어떤 작업을 배정할지를 자동으로 결정하는 반면,
# mp.Process()는 어떤 프로세스에게 어떤 작업을 배정할지를 수동으로 지정한다
# 프로세스 작업 수행 과정은 CMD에서 확인할 수 있다(.run() 메서드를 사용하면 직접 출력해 볼 수 있다)
for i in range(proc_num):
    start_index = 8 * i
    # multiprocessing에서 process는 Process 객체가 생성되고 이 객체의 start() 메서드가 호출되었을 때 소환된다(spawn)
    # Process 객체는 분리된 프로세스 내에서 실행중인 활동을 나타낸다
    proc = mp.Process(target=square5, args=(i, x[start_index:start_index+8])) # 프로세스 번호, 작업물, 결과물 저장 큐를 넘겨준다
    # proc.run() # 프로세스의 활동을 출력한다
                 # 이 메서드를 사용하지 않으면 (쥬피터의 경우) CMD에 프로세스 활동이 출력된다
    proc.start() # 프로세스를 시작한다(프로세스가 실제로 생성된다)
    processes.append(proc) # 추후 프로세스들을 제어하기 위해 프로세스들을 추적한다

print(f"running time : {t.time() - start:.6f} sec")


for proc in processes:
    proc.join() # join() 메서드를 호출한 프로세스가 종료될 때까지 주 프로세스가 해당 하위 프로세스를 기다린다(공식 문서는 이를 block이라고 표현하고 있다)
                # 좀비 프로세스가 생성되는 것을 막기 위해 가급적이면 .join() 을 사용해서 프로세스를 종료시키는 것이 좋다
for proc in processes:
    proc.terminate() # 프로세스를 종료한다(자원 반환은 수행하지 않는다)
                     # 부모 프로세스를 종료하더라도 그 밑의 자식 프로세스들은 종료되지 않는다(이 경우 자식 프로세스들은 "기아"가 된다)
for proc in processes:
    proc.close() # 프로세스 객체를 해체하여 그것과 관련된 모든 자원들을 반환한다

print(x)

Current available Host System cpu core number : 16
running time : 0.062992 sec
tensor([   1,    4,    9,   16,   25,   36,   49,   64,   81,  100,  121,  144,
         169,  196,  225,  256,  289,  324,  361,  400,  441,  484,  529,  576,
         625,  676,  729,  784,  841,  900,  961, 1024, 1089, 1156, 1225, 1296,
        1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304,
        2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600,
        3721, 3844, 3969, 4096])


In [2]:
'''
    하위 프로세스에서 다시 멀티 프로세싱을 구현하여 작업을 처리하는 것이 가능하다
    (이 때, 공유 메모리 기법을 사용해야 안정적이면서 단순하게 구현할 수 있다)
'''
import torch
import torch.multiprocessing as mp
from lib.worker_testing2 import worker
import time as t

processes = [] # 프로세스 풀 생성
proc_num = 1 # 생성할 프로세스의 개수
sub_proc_num = 8

x = torch.arange(1, 65).share_memory_()


print("Current available Host System cpu core number :", mp.cpu_count())

start = t.time()
# mp.Pool()은 어떤 프로세스에게 어떤 작업을 배정할지를 자동으로 결정하는 반면,
# mp.Process()는 어떤 프로세스에게 어떤 작업을 배정할지를 수동으로 지정한다
# 프로세스 작업 수행 과정은 CMD에서 확인할 수 있다(.run() 메서드를 사용하면 직접 출력해 볼 수 있다)
for i in range(proc_num):
    # multiprocessing에서 process는 Process 객체가 생성되고 이 객체의 start() 메서드가 호출되었을 때 소환된다(spawn)
    # Process 객체는 분리된 프로세스 내에서 실행중인 활동을 나타낸다
    proc = mp.Process(target=worker, args=(x, sub_proc_num)) # 프로세스 번호, 작업물, 결과물 저장 큐를 넘겨준다
    # proc.run() # 프로세스의 활동을 출력한다
                 # 이 메서드를 사용하지 않으면 (쥬피터의 경우) CMD에 프로세스 활동이 출력된다
    proc.start() # 프로세스를 시작한다(프로세스가 실제로 생성된다)
    processes.append(proc) # 추후 프로세스들을 제어하기 위해 프로세스들을 추적한다

for proc in processes:
    proc.join() # join() 메서드를 호출한 프로세스가 종료될 때까지 주 프로세스가 해당 하위 프로세스를 기다린다(공식 문서는 이를 block이라고 표현하고 있다)
                # 좀비 프로세스가 생성되는 것을 막기 위해 가급적이면 .join() 을 사용해서 프로세스를 종료시키는 것이 좋다
for proc in processes:
    proc.terminate() # 프로세스를 종료한다(자원 반환은 수행하지 않는다)
                     # 부모 프로세스를 종료하더라도 그 밑의 자식 프로세스들은 종료되지 않는다(이 경우 자식 프로세스들은 "기아"가 된다)
for proc in processes:
    proc.close() # 프로세스 객체를 해체하여 그것과 관련된 모든 자원들을 반환한다

print(x)
print(f"running time : {t.time() - start:.6f} sec")

Current available Host System cpu core number : 16
running time : 0.020091 sec
tensor([   1,    4,    9,   16,   25,   36,   49,   64,   81,  100,  121,  144,
         169,  196,  225,  256,  289,  324,  361,  400,  441,  484,  529,  576,
         625,  676,  729,  784,  841,  900,  961, 1024, 1089, 1156, 1225, 1296,
        1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304,
        2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600,
        3721, 3844, 3969, 4096])


In [4]:
a = torch.tensor([1])
print(a)
temp = a.add(1)
print(temp, a)
a.add_(1)
print(a)

tensor([1])
tensor([2]) tensor([1])
tensor([2])


In [1]:
import torch
import torch.nn as nn

''' 
    < 실험 결과 >
    - layer2 계층에서 도출된 출력으로 손실을 계산하여 역전파를 수행했을 경우
    ==> layer2 계층에 대한 기울기만 계산된다
    - med_out 계층에서 도출된 출력으로 손실을 계산하여 역전파를 수행했을 경우
    ==> med_out과 layer1 계층에 대한 기울기만 계산된다
    - layer2와 med_out 계층에서 도출된 출력으로 각각 손실을 계산하여 합산한 후 역전파를 수행했을 경우
    ==> 모든 계층에 대한 기울기가 계산된다
'''
class TestModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Linear(10, 10)
        self.med_out = nn.Linear(10, 4)
        self.layer2 = nn.Linear(10, 10)
        
    def forward(self, x):
        out1 = self.layer1(x)
        out2 = self.med_out(out1)
        out3 = self.layer2(out1.detach())
        return out2, out3
    
model = TestModel()

x = torch.randn(1, 10)
# a = model(x)[0]
# a = model(x)[1]
# a.mean().backward()
a, b = model(x)
total = a.mean() + b.mean()
total.backward()
print("layer1's gradient : ", model.layer1.weight.grad)
print("med_out's gradient : ",model.med_out.weight.grad)
print("layer2's gradient : ",model.layer2.weight.grad)

layer1's gradient :  tensor([[-1.5759e-01,  2.8993e-02, -1.0841e-01, -1.0250e-01,  1.0567e-01,
          2.0454e-04,  1.8451e-01, -2.6333e-03, -1.8413e-02, -1.2031e-01],
        [ 5.7566e-02, -1.0591e-02,  3.9601e-02,  3.7442e-02, -3.8601e-02,
         -7.4715e-05, -6.7401e-02,  9.6192e-04,  6.7260e-03,  4.3948e-02],
        [ 1.9029e-01, -3.5009e-02,  1.3090e-01,  1.2377e-01, -1.2760e-01,
         -2.4698e-04, -2.2280e-01,  3.1797e-03,  2.2233e-02,  1.4527e-01],
        [ 3.8111e-02, -7.0115e-03,  2.6217e-02,  2.4788e-02, -2.5556e-02,
         -4.9464e-05, -4.4622e-02,  6.3683e-04,  4.4528e-03,  2.9095e-02],
        [-6.7242e-02,  1.2371e-02, -4.6257e-02, -4.3736e-02,  4.5090e-02,
          8.7273e-05,  7.8730e-02, -1.1236e-03, -7.8565e-03, -5.1335e-02],
        [-8.0289e-03,  1.4771e-03, -5.5232e-03, -5.2222e-03,  5.3838e-03,
          1.0421e-05,  9.4006e-03, -1.3416e-04, -9.3809e-04, -6.1295e-03],
        [ 9.8327e-02, -1.8090e-02,  6.7641e-02,  6.3954e-02, -6.5934e-02,
         -1

In [None]:
''' F.normalize() 작동 방식 실험 1 '''

b = torch.randn(2, 5)
l2_norm_b = b[0].pow(2).sum(dim=0).sqrt()
print("b:", b)
# print(b[0].pow(2))
# print(b[0].pow(2).sum(dim=0))
print("b[0]'s l2 norm1:", b[0].pow(2).sum(dim=0).sqrt()) # l2 norm
print("b[0]'s l2 norm2:",b[0].norm(p=2))
print("b's l2 norm for dim=0:", b.norm(p=2, dim=0)) # dim=0는 열벡터를 대상으로 l2-norm을 계산한다
                                                    # 정확히는 dim=0를 구성하는 원소들로 (열)벡터를 만들고, 그것들 각각에 대하여 l2-norm을 계산한다
print("b's l2 norm for dim=1:", b.norm(p=2, dim=1)) # dim=1은 행벡터를 대상으로 l2-norm을 계산한다
                                                    # 정확히는 dim=1를 구성하는 원소들로 (열)벡터를 만들고, 그것들 각각에 대하여 l2-norm을 계산한다
print("result:", b[0] / l2_norm_b)
print(F.normalize(b, dim=0)) # dim=0는 열벡터를 대상으로 l2-norm을 구한 후, 그것으로 해당 열벡터를 나누어준다
                             # 정확히는 dim=0를 구성하는 원소들로 열벡터를 만들어 그것들 각각에 대해 l2-norm을 계산한 후, 그 결과를 그것에 대응하는 열벡터에 나눠준다
print(F.normalize(b, dim=1)) # dim=1는 행벡터를 대상으로 l2-norm을 구한 후, 그것으로 해당 열벡터를 나누어준다
                             # 정확히는 dim=1를 구성하는 원소들로 열벡터를 만들어 그것들 각각에 대해 l2-norm을 계산한 후, 그 결과를 그것에 대응하는 열벡터에 나눠준다

In [None]:
''' F.normalize() 작동 방식 실험 2 '''

(b[0][0])**2 + (b[1][0])**2, ((b[0][0])**2 + (b[1][0])**2).sqrt()

In [None]:
''' F.normalize() 작동 방식 실험 3 '''

temp = ((b[0][0])**2 + (b[1][0])**2).sqrt()
b[:,0], b[:,0] / temp, F.normalize(b, dim=1), b[0] / b[0].norm(p=2)

In [2]:
import torch
from torch import nn, optim
from torch.nn import functional as F
import torch.multiprocessing as mp
import gymnasium as gym

class ActorCritic(nn.Module):
    def __init__(self):
        super().__init__()
        self.l1 = nn.Linear(4, 25)
        self.l2 = nn.Linear(25, 50)
        self.actor_lin1 = nn.Linear(50, 2)
        self.l3 = nn.Linear(50, 25)
        self.critic_lin1 = nn.Linear(25, 1)

    def forward(self, x):
        x = F.normalize(x, dim=0) # F.normalize()에서 도출되는 결과의 범위는 [-1.0, 1.0]이다
        y = F.relu(self.l1(x))
        y = F.relu(self.l2(y))
        actor = F.log_softmax(self.actor_lin1(y), dim=0) # 음의 로그 확률 값을 모델 단에서 미리 계산한다(정확히는 그냥 로그 확률이다)
        c = F.relu(self.l3(y.detach())) # 여기서 계산 그래프가 분리된다
        critic = torch.tanh(self.critic_lin1(c)) # 가치 함수 값을 tanh을 사용하여 [-1.0, 1.0] 구간의 값으로 변환시켜준 것은
                                                 # 이익 계산에서의 Returns를 정규화하여 [-1.0, 1.0] 구간의 값으로 변환하기 때문이다
        return actor, critic

In [4]:
def worker(t, worker_model, counter, params, lock):
    worker_env = gym.make("CartPole-v1") # 환경 불러오기
    worker_opt = optim.Adam(lr=1e-4, params=worker_model.parameters())

    for i in range(params["epochs"]):
        state_values, logprobs, rewards = run_episode(worker_env, worker_model)
        actor_loss, critic_loss, ep_len = update_params(worker_opt, state_values, logprobs, rewards)
        with lock:
            counter.value = counter.value + 1

def run_episode(worker_env, worker_model):
    cur_state = torch.from_numpy(worker_env.reset()[0]).float()
    state_values, logprobs, rewards = [], [], []
    done = False # 에피소드 종료 여부

    while (done == False):
        policy, state_value = worker_model(cur_state)
        state_values.append(state_value)
        logits = policy.view(-1) # 1차원 텐서로 변환한다
        action_dist = torch.distributions.categorical.Categorical(logits=logits) # 카테고리컬 분포는 시행 횟수 n이 1인 다항분포와 동일한 분포이다
                                                                    # 여기서의 역할은 주어진 로짓을 확률분포로 변환하여 이 확률분포를 토대로 표본을 추출할 수 있도록 하는 것이다
        action = action_dist.sample()
        logprob_ = logits[action]
        logprobs.append(logprob_)
        next_state, _, done, _, _ = worker_env.step(action.numpy())
        cur_state = torch.from_numpy(next_state).float()
        if done:
            reward = -10
            worker_env.reset()
        else:
            reward = 1.0
        rewards.append(reward)

    return state_values, logprobs, rewards

def update_params(worker_opt, state_values, logprobs, rewards, clc=0.1, gamma=0.95):
    rewards = torch.tensor(rewards).flip(dims=(0,)).view(-1)
    logprobs = torch.stack(logprobs).flip(dims=(0,)).view(-1) # torch.stack 대신 torch.tensor를 사용해도 된다
    state_values = torch.stack(state_values).flip(dims=(0,)).view(-1) # torch.stack 대신 torch.tensor를 사용해도 된다
    Returns = [] # 반환값 저장
    ret_ = torch.tensor([0])
    for r in range(rewards.shape[0]): # 보상의 개수만큼 반복을 수행한다
        ret_ = rewards[r] + gamma * ret_ # 에피소드의 마지막 타임 스텝부터 반환값을 계산한다
        Returns.append(ret_)

    Returns = torch.stack(Returns).view(-1) # 텐서가 원소인 리스트를 torch.tensor를 통해 텐서로 변환하면 오류가 발생하는데, torch.stack을 사용하면 오류없이 텐서로 변환할 수 있다
    Returns = F.normalize(Returns, dim=0) # 반환값들에 대해 정규화를 수행하여 [-1.0, 1.0] 구간의 값으로 변환한다
                                          # 이것때문에 비평자의 출력에 tanh를 적용한 것이다
    actor_loss = -1 * logprobs * (Returns - state_values.detach())
    critic_loss = torch.pow(state_values - Returns, 2)
    loss = actor_loss.sum() + clc * critic_loss.sum() # 행위자가 비평자보다 더 빨리 학습하도록 하기 위해 clc=0.1을 곱한다
                                                      # 비평자의 전체 손실 중 일부로만 역전파를 수행하여 비평자의 학습을 지연시킨다
    # 역전파 수행
    worker_opt.zero_grad()
    loss.backward()
    worker_opt.step()

    return actor_loss, critic_loss, len(rewards)

In [1]:
from lib.worker3 import worker

''' 몬테카를로 방식 분산 이익 행위자-비평자 학습 '''

main_model = ActorCritic()
model.share_memory() # share_memory() 메서드는 이를 호출한 텐서를 shared_memory로 이동시킨다
                           # 여기서는 shared_memory에 모델의(여기서는 ActorCritic()) 매개변수를 저장하여,
                           # 서로의 모델을 훈련시키려는 각 프로세스가 동일한 모델 매개변수를 공유하도록 한다
processes = []
params = {
    "epochs":1000,
    "n_workers":7
}

'''
    <참고>
    https://devocean.sk.com/blog/techBoardDetail.do?ID=163669
    https://realpython.com/python-gil/
    
    파이썬에서 생성되는 모든 객체들은 reference count variable(이하 참조 횟수 변수라 칭한다)를 갖는다
    이는 해당 객체를 가리키는 참조자들의 수를 추적하여 메모리 관리를 하기 위함이다
    어떤 객체의 참조 횟수가 0에 다다르면(참조 횟수 변수가 0값을 가지게 되면) 해당 객체가 차지하고 있는 메모리는 반환된다(즉, 객체가 소멸된다)
    이렇듯 참조 횟수 변수는 매우 중요하기 때문에 정확히 추적되어야 하는데, 이를 위해 스레드 간 공유되는 모든 데이터 구조체들(객체)에 lock을 추가할 수 있다
    하지만 (공유되는) 각각의 객체에 모두 lock을 추가한다는 것은 deadlock(교착 상태)을 야기하는 다중 lock 현상이 발생할 수 있다는 것을 의미한다
    따라서 파이썬에서는 이를 해결하고자 단일 lock인 GIL을 도입하여 한번에 하나의 스레드만 실행할 수 있도록 만들었다
    즉, 한 번에 하나의 스레드만 자원을 선점할 수 있게 강제한 것이다(최종적으로 lock 1개만 존재하는 셈이 된다)
    GIL 덕분에 단일 스레드 프로그램은 그로 인한 성능상 혜택을 누릴 수 있지만, 다중 스레드 프로그램은 그렇지 못하고 되려 성능 저하가 발생한다
    CPU 중심 다중 스레드 프로그램의 경우 특히나 GIL로 인해 병렬 처리가 불가능해 작업 처리 지연으로 인한 성능 저하가 심하다
    반면, I/O 중심 다중 스레드 프로그램의 경우 I/O 요청을 대기하는 데에 상당한 시간을 소비하므로(이 때 동안은 작업을 처리하지 않는다) 스레들끼리 GIL을 번갈아 가며 공유하는 형식이 되어 성능 저하가 거의 발생하지 않는다
    GIL로 인한 성능 저하를 해결하기 위해 다중 프로세싱을 사용할 수 있다
    다중 프로세싱을 사용하더라도 작업 처리 시간이 그에 비례하여 줄어드는 것은 아닌데, 이는 프로세스 관리 자체에 overhead(추가 비용)가 존재하기 때문이다
    다중 프로세스는 다중 스레드보다 훨씬 더 무거운 작업이기 때문에 규모를 키우면 병목 현상이 발생하게 된다는 점을 명심해야 한다
'''
'''
    <참고>
    https://stackoverflow.com/questions/74635994/pytorchs-share-memory-vs-built-in-pythons-shared-memory-why-in-pytorch-we
    
    python 내장 모듈인 multiprocessing이 공유 메모리(shared_memory)를 다루는 방법은 아래와 같다
    1. 공유 메모리로부터 공유 객체를 생성한다
    2. 다른 프로세스와 공유 객체를 공유할 때는 공유 메모리의 이름, 공유 객체의 데이터 크기(혹은 shape) 및 자료형을 serialize하여 전달한다
    3. 다른 프로세스는 전달받은 것을 deserialize한 후, 이를 바탕으로 다시 공유 메모리로부터 공유 객체를 복원하여 사용한다
    4. 공유 객체에 대한 변형은 그대로 공유 메모리에 반영되므로, 모든 프로세스가 동일한 영향을 받게 된다
'''
'''
    여기서의 mp.Value는 python의 multiprocessing 모듈에 존재하는 클래스이다
    torch.multiprocessing이 python 내장 모듈인 multiprocessing의 warraper이자 100% 호환되기 때문에 torch.multiprocessing에서도 mp.Value를 사용할 수 있는 것이다
    mp.Value는 공유 메모리(shared memory)로부터 할당된(생성된) ctypes(C언어와 호환되는 자료형이다) 객체를 반환한다
    기본적으로 mp.Value로 반환된 값은 실제로는 공유 메모리로부터 생성된 ctypes 객체에 대한 동기화된(lock 때문에 그렇다) wrapper이다(당연하게도 이 역시 객체이다)
    생성된 ctypes 객체 자체는 mp.Value에서 반환된 객체(wrapper)의 value 속성을 통해 접근할 수 있다
    "i"는 typecode_or_type의 인자로, 이것은 반환되는 객체의 자료형을 결정한다
    typecode_or_type은 ctypes 형이나 또는 array 모듈에서 사용되는 종류의 단일 문자 typecode가 될 수 있다
    "+="와 같이 읽기와 쓰기를 함께 수반하는 연산들은 "원자적(atomic) 연산"을 지원하지 않는다
    ==> 원자적 연산이란 여러 스레드 또는 프로세스에서 동시에 특정 데이터에 접근해도 해당 데이터의 일관성을 보장하는 연산을 말한다
    따라서 공유 객체에 대해 증감 연산 같은 연산을 수행하고 싶다면, 해당 공유 객체를 선점하고 반환하는 과정(lock)을 반드시 명시해야 한다
'''
counter = mp.Value("i", 0)
'''
    <참고>
    https://a-researcher.tistory.com/24
    
    mp.Lock()은 non-recursive lock 객체를 생성하는데, 이를 통해 프로세스들이 공유 객체를 사용함에 있어 서로 간섭하지 못하도록 lock을 통해 우선 선점 후 사용하도록 한다
    Lock에는 아래와 같이 2종류가 존재한다
    - non-recursive lock : 단 한번의 lock만 획득하는 것(프로세스 간 또는 스레드 간 lock이 이에 해당한다)
    - recursive lock : lock을 획득한 상태에서 다시 lock을 획득하는 것
        - 일반적으로 lock을 두 번 호출하면 무한 대기 상태에 빠지게 되지만, recursive lock은 lock을 재귀적으로 획득할 수 있도록 허용한다(lock의 획득을 수반하는 함수를 재귀적으로 호출할 때 lock의 획득도 재귀적으로 할 수 있도록 허용한다)
        - 이렇게 재귀적으로 획득한 lock은 우선 순위의 의미가 생기며, 가장 마지막에 호출된 lock이 가장 높은 우선 순위를 갖는다
        - lock을 재귀적으로 획득하는 동안은 첫 lock을 통해 선점한 공유 자원에 대하여 계속 소유권을 가지고 있게 되며, lock을 획득한 횟수만큼 해제해 주어야 lock이 완전히 해제된다
        - recursive lock을 해석하면 "A라는 함수가 앞서 lock을 획득했는데, 나도 그 A라는 함수니까 해당 공유 자원에 접근할 수 있어. 그런데 내가 앞선 A함수보다 용건이 급해서 그 공유 자원을 먼저 사용할게(lock)." 이라고 할 수 있다
'''
# lock = mp.Lock()

for i in range(params["n_workers"]):
    p = mp.Process(target=worker, args=(i, main_model, counter, params)) # args는 프로세스에게 할당할 작업의 인자를 의미한다
    p.start() # 프로세스를 실행한다
    processes.append(p)

for p in processes:
    p.join() # 프로세스가 종료될 때 까지 block시킨다
             # join() 메서드를 사용하지 않으면 자식 프로세스는 유휴상태(idle)에 들어가고 종료되지 않아(부모 프로세스는 종료된다) 좀비 프로세스가 되어 손수 kill해줘야만 소멸하게 된다
             # 즉, join() 메서드가 하는 일은 부모 프로세스가 자식 프로세스보다 먼저 종료되지 못하도록 막는다

for p in processes:
    p.terminate() # 프로세스를 종료한다
                  # 부모 프로세스는 terminate() 메서드를 사용하지 않아도 자동으로 종료된다(하지만 자식 프로세스는 자동으로 종료되지 않는다)

print(counter.value, processes[1].exitcode) # 공유 객체에 저장된 값을 출력한다
                                            # .exitcode는 자식 프로세스의 종료 코드(exit code)이다
                                            # 자식 프로세스가 아직 종료되지 않았다면 "None"을 반환하고,
                                            # 정상적으로 종료되었다면 "0"을 반환한다

NameError: name 'ActorCritic' is not defined

In [1]:
''' 몬테카를로 방식 분산 이익 행위자-비평자 학습 '''
import os
import torch.multiprocessing as mp
from lib.worker1 import worker

'''
    <참고>
    https://devocean.sk.com/blog/techBoardDetail.do?ID=163669
    https://realpython.com/python-gil/
    
    파이썬에서 생성되는 모든 객체들은 reference count variable(이하 참조 횟수 변수라 칭한다)를 갖는다
    이는 해당 객체를 가리키는 참조자들의 수를 추적하여 메모리 관리를 하기 위함이다
    어떤 객체의 참조 횟수가 0에 다다르면(참조 횟수 변수가 0값을 가지게 되면) 해당 객체가 차지하고 있는 메모리는 반환된다(즉, 객체가 소멸된다)
    이렇듯 참조 횟수 변수는 매우 중요하기 때문에 정확히 추적되어야 하는데, 이를 위해 스레드 간 공유되는 모든 데이터 구조체들(객체)에 lock을 추가할 수 있다
    하지만 (공유되는) 각각의 객체에 모두 lock을 추가한다는 것은 deadlock(교착 상태)을 야기하는 다중 lock 현상이 발생할 수 있다는 것을 의미한다
    따라서 파이썬에서는 이를 해결하고자 단일 lock인 GIL을 도입하여 한번에 하나의 스레드만 실행할 수 있도록 만들었다
    즉, 한 번에 하나의 스레드만 자원을 선점할 수 있게 강제한 것이다(최종적으로 lock 1개만 존재하는 셈이 된다)
    GIL 덕분에 단일 스레드 프로그램은 그로 인한 성능상 혜택을 누릴 수 있지만, 다중 스레드 프로그램은 그렇지 못하고 되려 성능 저하가 발생한다
    CPU 중심 다중 스레드 프로그램의 경우 특히나 GIL로 인해 병렬 처리가 불가능해 작업 처리 지연으로 인한 성능 저하가 심하다
    반면, I/O 중심 다중 스레드 프로그램의 경우 I/O 요청을 대기하는 데에 상당한 시간을 소비하므로(이 때 동안은 작업을 처리하지 않는다) 스레들끼리 GIL을 번갈아 가며 공유하는 형식이 되어 성능 저하가 거의 발생하지 않는다
    GIL로 인한 성능 저하를 해결하기 위해 다중 프로세싱을 사용할 수 있다
    다중 프로세싱을 사용하더라도 작업 처리 시간이 그에 비례하여 줄어드는 것은 아닌데, 이는 프로세스 관리 자체에 overhead(추가 비용)가 존재하기 때문이다
    다중 프로세스는 다중 스레드보다 훨씬 더 무거운 작업이기 때문에 규모를 키우면 병목 현상이 발생하게 된다는 점을 명심해야 한다
'''
'''
    <참고>
    https://stackoverflow.com/questions/74635994/pytorchs-share-memory-vs-built-in-pythons-shared-memory-why-in-pytorch-we
    
    python 내장 모듈인 multiprocessing이 공유 메모리(shared_memory)를 다루는 방법은 아래와 같다
    1. 공유 메모리로부터 공유 객체를 생성한다
    2. 다른 프로세스와 공유 객체를 공유할 때는 공유 메모리의 이름, 공유 객체의 데이터 크기(혹은 shape) 및 자료형을 serialize하여 전달한다
    3. 다른 프로세스는 전달받은 것을 deserialize한 후, 이를 바탕으로 다시 공유 메모리로부터 공유 객체를 복원하여 사용한다
    4. 공유 객체에 대한 변형은 그대로 공유 메모리에 반영되므로, 모든 프로세스가 동일한 영향을 받게 된다
'''
'''
    여기서의 mp.Value는 python의 multiprocessing 모듈에 존재하는 클래스이다
    torch.multiprocessing이 python 내장 모듈인 multiprocessing의 warraper이자 100% 호환되기 때문에 torch.multiprocessing에서도 mp.Value를 사용할 수 있는 것이다
    mp.Value는 공유 메모리(shared memory)로부터 할당된(생성된) ctypes(C언어와 호환되는 자료형이다) 객체를 반환한다
    기본적으로 mp.Value로 반환된 값은 실제로는 공유 메모리로부터 생성된 ctypes 객체에 대한 동기화된(lock 때문에 그렇다) wrapper이다(당연하게도 이 역시 객체이다)
    생성된 ctypes 객체 자체는 mp.Value에서 반환된 객체(wrapper)의 value 속성을 통해 접근할 수 있다
    "i"는 typecode_or_type의 인자로, 이것은 반환되는 객체의 자료형을 결정한다
    typecode_or_type은 ctypes 형이나 또는 array 모듈에서 사용되는 종류의 단일 문자 typecode가 될 수 있다
    "+="와 같이 읽기와 쓰기를 함께 수반하는 연산들은 "원자적(atomic) 연산"을 지원하지 않는다
    ==> 원자적 연산이란 여러 스레드 또는 프로세스에서 동시에 특정 데이터에 접근해도 해당 데이터의 일관성을 보장하는 연산을 말한다
    따라서 공유 객체에 대해 증감 연산 같은 연산을 수행하고 싶다면, 해당 공유 객체를 선점하고 반환하는 과정(lock)을 반드시 명시해야 한다
'''
counter = mp.Value("i", 0)
sub_proc_num = 7
model_file_name = "test01"
model_save_path = os.path.join(os.getcwd(), "parameters", model_file_name)

''' 프로세스 생성 및 실행 '''
p = mp.Process(target=worker, args=(sub_proc_num, counter, model_save_path)) # args는 프로세스에게 할당할 작업의 인자를 의미한다
p.start() # 프로세스가 실행된다

''' 하위 프로세스 작업 대기 및 종료 '''
p.join() # 프로세스가 종료될 때 까지 block시킨다
         # join() 메서드를 사용하지 않으면 자식 프로세스는 유휴상태(idle)에 들어가고 종료되지 않아(부모 프로세스는 종료된다) 좀비 프로세스가 되어 손수 kill해줘야만 소멸하게 된다
         # 즉, join() 메서드가 하는 일은 부모 프로세스가 자식 프로세스보다 먼저 종료되지 못하도록 막는다

print(f"sub process's exitcode: '{p.exitcode}") # 공유 객체에 저장된 값을 출력한다
                                            # .exitcode는 자식 프로세스의 종료 코드(exit code)이다
                                            # 자식 프로세스가 아직 종료되지 않았다면 "None"을 반환하고,
                                            # 정상적으로 종료되었다면 "0"을 반환한다

p.terminate() # 프로세스를 종료한다
              # 부모 프로세스는 terminate() 메서드를 사용하지 않아도 자동으로 종료된다(하지만 자식 프로세스는 자동으로 종료되지 않는다)
p.close() # 프로세스 객체를 해체하여 그것과 관련된 모든 자원들을 회수한다

sub process's exitcode: '0


In [6]:
''' 학습 후 시험(test) '''

model = ActorCritic()
model.load_state_dict(torch.load(model_save_path, weights_only=True))
env = gym.make("CartPole-v1", render_mode="rgb_array") # 카트폴 환경 불러오기

for i in range(100):
    cur_state = torch.from_numpy(env.reset()[0]).float() # 환경 초기화 및 초기 상태 반환
    logits, value = model(cur_state)
    action_dist = torch.distributions.categorical.Categorical(logits=logits) # 옆의 코드는 카테고리컬 분포 말고 다항 분포로도 구현할 수 있다
    action = action_dist.sample() # logit에 근거한 확률분포를 바탕으로 2개의 행동 중 하나를 뽑는다
    next_state, reward, done, _, _ = env.step(action.numpy())
    if done:
        print("Lost: rechead end of game")
        cur_state = torch.from_numpy(env.reset()[0]).float()
    env.render() # 에이전트가 현재 보고 있는 것의 시각화를 위해 환경을 렌더링 한다
                 # 이전에는 render() 메서드가 인자를 받았지만, 지금은 이 인자를 make() 메서드가 받도록 수정되었다
print("model did not failed on game!!!")

model did not failed on game!!!


In [3]:
import gymnasium as gym

env = gym.make("CartPole-v1")
temp = env.reset()
print(temp[0], type(temp[0]))

[-0.03231152 -0.00984595 -0.02659299  0.00150169] <class 'numpy.ndarray'>
