# 데이터 분석 멀티 프로세싱

대규모 데이터 세트로 작업할 때 일반적으로 처리 속도가 느려지는 문제 발생. 코드 실행 시간을 최적화하고 프로세스 속도를 높이려면 결국 병렬화를 방법 중 하나로 고려해야 함.

In [1]:
# 데이터분석 필요 라이브러리 등록
import pandas as pd
import numpy as np
import tqdm

In [2]:
# 멀티프로세싱 필요 라이브러리 등록
import multiprocessing as mp

from itertools import repeat
from multiprocessing import Process, Manager
from multiprocessing import Pool

In [3]:
## 프로세스 개수 확인
num_processes = mp.cpu_count()
f'Number of CPU : {num_processes}'

'Number of CPU : 16'

### 기본 빅데이터 처리

In [4]:
# 파일 읽기
dfEmployee = pd.read_csv('./data/employee.csv', low_memory=False)

In [5]:
dfEmployee.tail()

Unnamed: 0,ecode,ename
93,7638,AMALA
94,7614,RAKSHITHA
95,7621,HANUMAKKA
96,7628,NIRMALA
97,101,BEERAIAH


In [6]:
dfEmployee.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 98 entries, 0 to 97
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   ecode   98 non-null     object
 1   ename   98 non-null     object
dtypes: object(2)
memory usage: 1.7+ KB


In [7]:
# 큰 데이터 불러오기, 71851개
dfData = pd.read_csv('./data/data.csv', low_memory=False)

In [9]:
dfData.tail()

Unnamed: 0,date,employee_code,efficiency
71846,31-12-2018,162,97.094803
71847,31-12-2018,145,104.79063
71848,31-12-2018,104,85.029753
71849,31-12-2018,162,97.691327
71850,31-12-2018,1083,96.823055


In [10]:
dfData.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 71851 entries, 0 to 71850
Data columns (total 3 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   date           71851 non-null  object 
 1   employee_code  71851 non-null  int64  
 2   efficiency     71851 non-null  float64
dtypes: float64(1), int64(1), object(1)
memory usage: 1.6+ MB


In [12]:
# dfData에서 employee_code로 데이터 가져오기
employee_codes = []
employee_codes.extend(list(dfData['employee_code'].unique()))

In [13]:
len(employee_codes)

27

In [14]:
# dfEmployee에서 위의 empCode와 매핑되는 사용자 명 가져오기
dictEnames = {}

for i, row in dfEmployee.iterrows():
    dictEnames[row['ecode']] = row['ename']

In [15]:
dictEnames

{'165': 'Harry Crouch',
 '183': 'SIVANNA',
 '190': 'JAYARAMAPPA',
 '191': 'Sadiyah Mays',
 '193': 'Macie Mccartney',
 '194': 'Adelaide Brook',
 '195': 'Shanon Mendez',
 '196': 'Isaak Burnett',
 '197': 'Susie Ramsey',
 '207': 'Sara Donnelly',
 '200': 'Emmanuel Waller',
 '103': 'Elif Noble',
 '122': 'DANEEL',
 '201': 'RAMACHANDRAREDDY',
 '203': 'Farah Caldwell',
 '210': 'Ashraf Rawlings',
 '211': 'Youssef Quintana',
 '238': 'RAMANJINAPPA',
 '215': 'Benn Kearney',
 '218': 'Mekhi Douglas',
 '220': 'Irene Jeffery',
 '250': 'Nathaniel Barron',
 '143': 'Kobi Hogan',
 '145': 'Sidney Ho',
 '1071': 'Wiktoria Wilkerson',
 '161': 'NARASHIMAHAMULU',
 '182': 'Teodor Greene',
 '249': 'Guto Bowden',
 '244': 'Ivy-Rose Chang',
 '68': 'Sneha Carr',
 '1054': 'Laurel Steele',
 '202': 'Bianca Naylor',
 '230': 'Nakita Miranda',
 '228': 'GUPTA',
 '1083': 'Arwel Eastwood',
 '86': 'Shiv Odling',
 '82': 'Patricia Ali',
 '233': 'Mehmet Mccaffrey',
 '206': 'Safiyah Salt',
 '189': 'Carlton Gunn',
 '184': 'Macey Row

In [16]:
# workers.py 작성. 또는 여기에 함수만 정의
def process_rows(data):
    d = data[0]
    df = data[1]
    for index,row in df.iterrows():
        e_c = int(row['pt_ecode1'])
        month = int(row['pt_date'].month)
        prod_eff = int(row['prod_eff'])
        if (e_c,month) in list(d.keys()):
            d[e_c,month] = (prod_eff+d[e_c,month])/2
        else:
            d[e_c,month] = prod_eff


##### 동작 여부 모름!

In [17]:
# 멀티프로세스 실행
num_partitions = num_processes
manager = Manager()  # 멀티프로세싱 매니저 
d = manager.dict()
dfSplit = np.array_split(dfData, num_partitions) # 프로세스 갯수만큼 나눈다
pool = Pool(num_processes)
shared_arg = repeat(d, num_partitions)

for _ in tqdm.tqdm(pool.map(process_rows, zip(shared_arg, dfSplit)), total=num_partitions):
    pass

pool.close()
pool.join()

  return bound(*args, **kwds)
