In [12]:
# 일반 연산
# os : 운영체제의 명령을 전달할 수 있는 패키지
import time, os

# os.getpid() : 프로세스 : 1프로그램에 1프로세스 (CPU 코어마다 할당된 일꾼 : 프로세스 : 1일꾼마다 1프로그램)
def work_func(x):
    print("value %s is in PID : %s" % (x, os.getpid()))
    time.sleep(1)
    return x**5

def main():
    start = int(time.time())
    print(list(map(work_func, range(0,12))))
    print("***run time(sec) :", int(time.time()) - start)

if __name__ == "__main__":
    main()

value 0 is in PID : 354378
value 1 is in PID : 354378
value 2 is in PID : 354378
value 3 is in PID : 354378
value 4 is in PID : 354378
value 5 is in PID : 354378
value 6 is in PID : 354378
value 7 is in PID : 354378
value 8 is in PID : 354378
value 9 is in PID : 354378
value 10 is in PID : 354378
value 11 is in PID : 354378
[0, 1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049, 100000, 161051]
***run time(sec) : 12


In [13]:
# sklearn 과 같은 머신러닝에서는 자동으로 연산작업 병렬처리를 하지만, 
# pandas에서는 따로 제공하지 않기에 multiprocessing 라이브러리의 pool과 process를 활용하여
# 병렬로 연산 작업 처리

import time, os
from multiprocessing import Pool

def work_func(x):
    print("value %s is in PID : %s" % (x, os.getpid()))
    time.sleep(1)
    return x**5

def main():
    start = int(time.time())
    num_cores = 4
    # 입력받은 job을 process에 분배(할당)하여 함수 실행이 병렬처리되도록 도움
    # num_cores : Process의 갯수 : 4개의 PID가 3개씩 작업을 할당받아 작업 시행
    pool = Pool(num_cores)
    print(pool.map(work_func, range(1,13)))
    print("***run time(sec) :", int(time.time()) - start)

if __name__ == "__main__":
    main()

value 3 is in PID : 355877value 1 is in PID : 355875
value 2 is in PID : 355876value 4 is in PID : 355878


value 5 is in PID : 355875
value 6 is in PID : 355876
value 8 is in PID : 355877value 7 is in PID : 355878

value 9 is in PID : 355875
value 10 is in PID : 355876value 11 is in PID : 355878
value 12 is in PID : 355877

[1, 32, 243, 1024, 3125, 7776, 16807, 32768, 59049, 100000, 161051, 248832]
***run time(sec) : 4


In [14]:
# CPU 프로세스의 갯수 출력
import multiprocessing as mp
mp.cpu_count()

4

In [15]:
# 데이터 프레임 전처리 병렬처리 예제
import time
import pandas as pd
from multiprocessing import Pool

df = pd.DataFrame({'date':pd.date_range('2019-01-01','2020-12-31',freq='1min')})

# 연산 시작
start = int(time.time())

df['time'] = df['date'].apply(lambda x : str(x)[-9:])

# 연산 소요시간 측정
print(int(time.time() - start))

df

4


Unnamed: 0,date,time
0,2019-01-01 00:00:00,00:00:00
1,2019-01-01 00:01:00,00:01:00
2,2019-01-01 00:02:00,00:02:00
3,2019-01-01 00:03:00,00:03:00
4,2019-01-01 00:04:00,00:04:00
...,...,...
1051196,2020-12-30 23:56:00,23:56:00
1051197,2020-12-30 23:57:00,23:57:00
1051198,2020-12-30 23:58:00,23:58:00
1051199,2020-12-30 23:59:00,23:59:00


In [16]:
import numpy as np
from multiprocessing import Pool

df = pd.DataFrame({'date':pd.date_range('2019-01-01','2020-12-31',freq='1min')})

def make_time(data):
    data['time'] = data['date'].apply(lambda x : str(x)[-9:])
    return data

def parallel_df(df,func,n_cores):
    # 데이터프레임을 n_cores갯수만큼 튜플 분해
    df_split = np.array_split(df,n_cores)
    pool = Pool(n_cores)
    # 데이터프레임 합치기
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

# 연산 시작
start = int(time.time())

df = parallel_df(df,make_time,n_cores=4)

# 연산 소요시간 측정
print(int(time.time() - start))
df

1


Unnamed: 0,date,time
0,2019-01-01 00:00:00,00:00:00
1,2019-01-01 00:01:00,00:01:00
2,2019-01-01 00:02:00,00:02:00
3,2019-01-01 00:03:00,00:03:00
4,2019-01-01 00:04:00,00:04:00
...,...,...
1051196,2020-12-30 23:56:00,23:56:00
1051197,2020-12-30 23:57:00,23:57:00
1051198,2020-12-30 23:58:00,23:58:00
1051199,2020-12-30 23:59:00,23:59:00


In [11]:
# multiprocessing외에 Threading 모들은 파이썬의 GIL(Global Interpreter Lock)라는불리우는 잠금 모델을 사용하기 때문에
# I/O 작업이 아닌 CPU 작업이 많을 경우 오히려 성능이 저하

# process와 pool의 차이
# Pool의 경우 실행되어야 할 작업이 코어수 만큼 분할되고 각 코어수 만큼 프로세스가 
# 생성되어 힐당받은 작업을 처리하는데, 
# Process의 경우 각 작업마다 새로운 프로세스가 할당되어 작업을 처리

# Pool은 처리할 일을 바닥에 뿌려놓고 알아서 분산 처리를 하게 만드는 방식이고 
# Process는 각 프로세스별로 할당량을 명시적으로 정해준 뒤 실행되는 방식