# Introductions to Parallel Processing 

## What is Parallel Processing 

가용 메모리가 한정되어 있는 상황에서 큰 데이터셋을 처리하려면 **청크(Chunk)** 단위로 쪼개어 데이터를 처리해야 했다. **병렬 처리(Parallel Processing)**은 청크를 동시에 처리하여 작업의 속도를 증가시킨다. 

**중앙 처리 장치(Centeral Processing Unit; CPU)**는 컴퓨터 연산을 처리하는 하드웨어이다. 예전의 CPU는 단일 코어로 한가지의 작업만 수행할 수 있었지만, 현재의 CPU는 멀티 코어를 사용하여 병렬 처리 작업이 가능하게 되었다. 

Python의 multiprocessing 모듈은 병렬 처리를 가능하게 하는 패키지를 제공한다.

```Python
import multiprocessing 
```

## Process 

### Process.start() 

multiprocessing의 Process() 메소드는 Process 객체를 생성하여 병렬 처리를 위한 프로세스를 생성한다. Process() 메소드의 target parameter는 처리해야 하는 프로세스(함수)를 입력한다. 생성된 Process 객체는 start() 메소드를 통해 실행된다.

```Python
import time
def wait():
    time.sleep(0.5)
    print("Done waiting")

process = multiprocessing.Process(target=wait)

# Add code here
process.start()
print("Finished")
process.join()
```

### Process.join()

해당 프로그램의 실행결과 "Finished"가 먼저 출력되고 "Done waiting"이 실행된다. Python IDE는 노출되어 있는 순으로 코드를 진행하여, Process.start() 이후 병렬 실행된 wait() 함수가 time.sleep()에 의해 지연 되면서 print("Finished")가 먼저 실행되었기 때문이다. 이후 Process.join()은 지속되고 있는 프로세스를 종료해 기다리고 있는 다른 프로세스를 실행히키고자 할 때 사용된다. 따라서 원하는 결과를 산출하기 위한 코드는 다음과 같다. 

```Python
import time
def wait():
    time.sleep(0.5)
    print("Done waiting")

process = multiprocessing.Process(target=wait)

# Add code below

process.start()
process.join()
print("Finished")
```

## Execution time of Parallele Processing 

병렬 처리의 연산 속도 향상을 위한 workflow는 아래와 같다. 아래의 workflow에서 메인 프로그램은 모든 프로세스가 종료될때까지 기다리고 각각의 결과를 종합하여 전체 결과를 출력하게 된다. 하지만 각각의 프로세스는 병렬적으로 시행되기 때문에 수행시간은 프로세스의 수로 나누어저 시행되게 된다. 

1. 각각의 연산을 chunk들로 나눈다. 
2. 각각의 chunk에 대해 프로세스를 시행한다.
3. 모든 프로세스가 종료될때까 기다린다. 
4. 결과를 종합한다. 

```Python
import time
def wait():
    time.sleep(0.5)
    print("Done waiting")

# Add code below
start = time.time()
wait()
end = time.time()
elapsed1 = end - start

start = time.time()

p1 = multiprocessing.Process(target = wait)
p2 = multiprocessing.Process(target = wait)

p1.start()
p2.start()
p1.join()
p2.join()

end = time.time()
elapsed2 = end - start
```

시행결과 elapsed1과 elapsed2는 거의 차이가 없다. 

## Parallel Processing with arguments 

어떤 프로세스에 구성된 함수에 입력값이 필요한 경우가 있는 경우, Process() 메소드는 target parameter외에 args parameter를 통해 해당 함수의 arguments를 입력받을 수 있다. 또한 입력받은 arguments는 iterable하기 때문에 객체의 형태를 그대로 대입할 수 있다.

```Python
def sum3(x, y, z):
    print(x + y + z)

def list_average(values):
    print(sum(values) / len(values))

# Add code below

sum3_process = multiprocessing.Process(target = sum3, args = [3, 2, 5])
list_average_process = multiprocessing.Process(target = list_average, args = [[1, 2, 3, 4, 5]])

sum3_process.start()
list_average_process.start()
sum3_process.join()
list_average_process.join()
```

## Shared memory 

앞선 방식의 프로세스는 함수의 값을 받을 수 없다. 함수는 독립적인 프로세스에서 실행되고 있기 때문에 각각의 메모리에  존재하며 다른 프로세스와 값을 공유하지 못한다. **Shared memory** 방식은 값을 return하는 대신에 결과값을 shared memory 위치에 저장하고 있어 값을 저장할 수 있다. 

multiprocessing.Value 객체는 함수를 정의할 때 함수의 argument 형태로 작성되어 값을 접근하고 사용할 수 있도록 한다. 해당 값을 access 하는 방식은 Value.value 속성을 통해 가능하다. 프로세스를 생성하기 이전에 Value 또한 정의되어야 하는데, Value() 메소드 내부에 데이터 타입을 정의할 필요가 있다. 이후에 생성된 객체를 args prameter에 추가적으로 입력하면 된다. 

```Python
def sum3(x, y, z, shared_value) : 
    shared_value.value = x + y + z 
    

float_value = multiprocessing.Value("f")
process = multiprocessing.Process(target = sum3, args = [5, 7, 4, float_value])
process.start()
process.end()
print(float_value) 
```

## Cautions of sharing value 

multiprocessing.Value는 single value만 저장할 수 있다. 따라서 프로세스 사이에서 shared memory를 사용할 때 동시에 값을 업데이트 하지 않도록 주의해야 한다. 예를들어 1 ~ 10000까지의 수를 더하는 프로세스를 1 ~ 4999, 5000 ~ 10000까지 계산하는 두 개의 병렬 프로세스로 분할하여 처리한다고 할때 하나의 shared value만 사용하게 되면 각각의 프로세스에서 처리된 값이 서로 shared memory에 업데이트 되는 문제가 발생한다.

```Python
def sum_values(first, last, shared_value):
    for i in range(first, last):
        shared_value.value += i

def sum_with_two_processes():
    N = 10000

    shared_value = multiprocessing.Value("i")
    process1 = multiprocessing.Process(target=sum_values, args=(1, N // 2, shared_value))
    process2 = multiprocessing.Process(target=sum_values, args=(N // 2, N, shared_value))

    process1.start()
    process2.start()

    process1.join()
    process2.join()
    return shared_value.value

# Add code below

results = []
for _ in range(10) : 
    results.append(sum_with_two_processes())

print(results)
```

## Using lock to prevent overwitting 

multiprocess를 사용하면서 발생하는 shared memory에 값을 덮어쓰는 문제는 **lock**을 통해 해결할 수 있다. lock은 병렬처리에서 각각의 프로세스의 자원을 통제하는 매커니즘으로, 다른 프로세스가 각 자원에 서로 간섭하는 것을 방지하도록 한다. 해당 작업은 Value.get_loc() 메소드를 사용함으로써 가능하다. 

하지만 이 방식은 병렬처리의 이점을 더이상 살리지 못한다. 다른 프로세스 해당 프로세스가 잠겨있는 동안 shared memory에 접근할 수 없기 때문이다. 이는 지역변수를 설정하고 지역변수의 값에 업데이트 하는 방식으로 해결할 수 있다. 

```Python
def sum_values(first, last, shared_value):
    for i in range(first, last):
        with shared_value.get_lock():
            shared_value.value += i
            
def sum_values_improved(first, last, shared_value) : 
    value_sum = 0
    for i in range(first, last) : 
        value_sum += i 
    with shared_value.get_lock() : 
        shared_value.value = value_sum

def measure_runtime(function_to_measure):
    N = 10000
    shared_value = multiprocessing.Value("i")
    process1 = multiprocessing.Process(target=function_to_measure, args=(1, N // 2, shared_value))
    process2 = multiprocessing.Process(target=function_to_measure, args=(N // 2, N, shared_value))
    start = time.time()
    process1.start()
    process2.start()
    process1.join()
    process2.join()
    end = time.time()
    return end - start
    
# Whole calculation
start = time.time()
res = 0
for i in range(1, 10000) : 
    res += i
end = time.time()
elapsed1 = end - start 

time_sum_values = measure_runtime(sum_values) 
time_sum_values_improved = measure_runtime(sum_values_improved)
```

# Process Pool Executors

## Make frequency table of each languages 

job_postings 데이터테이블에 작성되어 있는 언어별 빈도수 테이블을 생성하려고 한다. Series.str.count() 메소드는 행을 기준으로 각 행의 count에 입력된 argument가 얼마나 반복되었는지를 계산한다. 이후 sum() 메소드를 통해 언어별 총 빈도수를 확인할 수 있다.


```Python
import pandas as pd 
job_postings = pd.read_csv('DataEngineer.csv') 
num_rows = job_postings.shape[0]
num_cols = job_postings.shape[1]

job_postings["Job Description"] = job_postings["Job Description"].str.lower()

# Make frequency table 
skills = pd.read_csv('Skills.csv')
frequency = {}

for skill_name in skills["Name"] : 
    frequency[skill_name] = job_postings["Job Description"].str.count(skill_name).sum()
print(frequency["programming"])
```

## Function of frequency table 

병렬처리를 위한 첫 단계는 코드를 함수로 변환하는 작업이다. 빈도표를 작성하는 코드를 job_postings와 skills를 입력받는 count_skills() 내부에 작성하여 함수를 생성한다.

```Python
import time

def count_skills(job_postings, skills) : 
    frequency = {}
    for skill_name in skills["Name"]:
        frequency[skill_name] = job_postings["Job Description"].str.count(skill_name).sum()
    return frequency
    
start = time.time()
count_skills(job_postings, skills) 
end = time.time()
runtime = end - start
print(runtime)
```

## Function of dataframe chunk

데이터셋이 가용 메모리에 적합하지 않을 경우 청크 단위로 나누어 데이터를 처리하였다. 따라서 병렬처리를 위한 데이터셋을 청크 단위로 분해하는 함수를 작성한다. 입력된 청크의 수에 따라 처리해야하는 청크의 크기는 math.ceil() 메소드를 통해 계산하고 ragne(0, num_rows, chunk_size)를 통해 데이터 셋을 분할할 수 있다. 

```Python
import math
def math_chunks(df, num_chunks) : 
    num_rows = df.shape[0]
    chunk_size = math.ceil(num_rows/num_chunks)
    return [df[i:i+chunk_size] for i in range(0, num_rows, chunk_size)]
    
skill_chunks = math_chunks(skills, 8)
```

## Process Pool Executor

Job Description에 기술되어 있는 언어에 대한 빈도표를 청크 단위로 병렬 처리하는 workflow는 아래와 같다.

1. split the data into chunks 
2. create a process list, one for each chunk
3. run all processes and wait for them to finish 
4. gather the results and merge them into a single frequency table 

4번째 단계의 경우 multiprocessing.Value 객체는 하나의 값만 저장하기 때문에 문제가 발생할 수 있다. concurrent.futures 모듈은 실행된 각각의 프로세스의 값을 리스트의 형태로 저장할 수 있다. concurrent.futures.ProcessPoolExecutor context manager는 각각의 프로세스를 함수, 입력값을 입력받아 실행하여 값을 list의 형태로 저장할 수 있다. 프로세스를 실행하기 위해 Executor.submit() 메소드를 사용하여 with statement 내부에서 작동하기 때문에 프로세스를 자동 종료시킨다. Executor.submit()으로 생성된 Future 객체는 Future.result()를 통해 결과를 불러올 수 있다. 

```Python
import concurrent.futures

def increment(value):
    return value + 1

values = [1, 2, 3, 4, 5, 6, 7, 8]

with concurrent.futures.ProcessPoolExecutor() as executor : 
    futures = [executor.submit(increment, value) for value in values] 
    
results = [future.result() for future in futures]
```

## Parallel Processing of frequency table 

1. job_postings 데이터프레임을 분할하고 각각의 청크에 있는 모든 skill을 샌다. 각각의 프로세스는 job_posting의 부분을 입력받아 빈도표를 작성한다.
2. skills를 분할하여 각 청크에 대해 job_postings 천체 데이터프레임에 대해 분할표를 작성한다. 

### First case 

```Python
import concurrent.futures
import pandas as pd 
import math 

def count_skills(df, skills) :
    frequency = {}
    for skill_name in skills["Name"] : 
        frequency[skill_name] = df["Job Description"].str.count(skill_name).sum()
    return frequency

def make_chunks(df, chunk_size) : 
    num_rows = df.shape[0]
    chunk_size = math.ceil(num_rows / chunk_size) 
    return [df[i:i+chunk_size] for i in range(0, num_rows, chunk_size)]

job_chunks = make_chunks(job_postings, 8) 
with current.futures.ProcessPoolExecutor() as executor : 
    futures = [executor.submit(count_skills, job_chunk, skills) for job_chunk in job_chunks]
results = [future.result() for future in futures]

merged_results = {} 
for result in results : 
    for language in result : 
        if language in merged_results : 
            merged_results[language] += result[language]
        else : 
            merged_results[language] = result[language] 
```

### Second case 

```Python
import concurrent.futures
import pandas as pd 
import math 

def count_skills(job_postings, skills) :
    frequency = {}
    for skill_name in skills["Name"] : 
        frequency[skill_name] = job_postings["Job Description"].str.count(skill_name).sum()
    return frequency

def make_chunks(df, chunk_size) : 
    num_rows = df.shape[0]
    chunk_size = math.ceil(num_rows / chunk_size) 
    return [df[i:i+chunk_size] for i in range(0, num_rows, chunk_size)]

skill_chunks = make_chunks(skills, 8) 
with concurrent.futures.ProcessPoolExecutor() as executor : 
    futures = [executor.submit(count_skills, job_postings, skill_chunk) for skill_chunk in skill_chunks]
results = [future.result() for future in futures]

merged_results = {}

for result in results : 
    merged_results.update(result) 
print(merged_results)
```

## Time Comparison

```Python
def count_skills(job_postings, skills):
    frequency = {}
    for skill_name in skills["Name"]:
        frequency[skill_name] = job_postings["Job Description"].str.count(skill_name).sum()
    return frequency

def count_skills_parallel(job_postings, skills, num_processes=4):
    # Calculate results using paralleld processing
    skill_chunks = make_chunks(skills, num_processes)
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(count_skills, job_postings, skill_chunk) for skill_chunk in skill_chunks]
    results = [future.result() for future in futures]
    # Merge results
    merged_results = {}
    for result in results:
        merged_results.update(result)
    return merged_results

import time

# Measure execution times 
start = time.time()
count_skills(job_postings, skills)
end = time.time()
time_normal = end - start 

start = time.time()
count_skills_parallel(job_postings, skills, num_processes = 4)
end = time.time()
time_parallel = end - start

print(time_normal/time_parallel)
```

병렬처리를 한 경우가 하지 않은 경우에 비해 2배가량 빠르다.

# Introduction to MapReduce

# Processing data with MapReduce