# Introductions to Parallel Processing 

## What is Parallel Processing 

In order to process a large dataset in a situation where available memory was limited, data had to be processed by splitting it into **chunks**. **Parallel Processing** increases the speed of work by simultaneously processing chunks. 

A **Central Processing Unit(CPU)** is a hardware that processes computer operations. The old CPU was able to perform only one task in a single core, but the current CPU is now capable of parallel processing using multi-core processing. 

Python's multiprocessing module provides a package that enables parallel processing. 

```Python
import multiprocessing 
```

## Process 

**Process.start()** 

The multiprocessing.Process() method creates a process for parallel processing by creating a Process object. The target parameter of the Process() method enters the process (functions) to be processed. The generated Process object is executed through the Process.start() method. 

```Python
import time
def wait():
    time.sleep(0.5)
    print("Done waiting")

process = multiprocessing.Process(target=wait)

# Add code here
process.start()
print("Finished")
process.join()
```

**Process.join()**

As a result of the execution of the above program, "Finished" is the first output and "Done waiting" is executed next. 

PythonIDE proceeds the code in the order of exposure. However, after executing Process.start(), the wait() function executed in parallel is delayed by time.sleep(), so print("Finished") is executed first. 

Process.join() is used to terminate an ongoing process to execute another waiting process. Therefore, the code for producing the desired result is as follows: 

```Python
import time
def wait():
    time.sleep(0.5)
    print("Done waiting")

process = multiprocessing.Process(target=wait)

# Add code below

process.start()
process.join()
print("Finished")
```

## Execution time of Parallele Processing 

In the workflow of multiprocessing, the main program waits until all processes are finished an synthesizes each result to output the entire result. However, since each process is executed in parallel, the execution time is divided by the number of process.


1. Divide each operation by chunk.
2. Perform a process for each chunk.
3. Wait until all processes are finished.
4. Summarize the results. 

```Python
import time
def wait():
    time.sleep(0.5)
    print("Done waiting")

# Add code below
start = time.time()
wait()
end = time.time()
elapsed1 = end - start

start = time.time()

p1 = multiprocessing.Process(target = wait)
p2 = multiprocessing.Process(target = wait)

p1.start()
p2.start()
p1.join()
p2.join()

end = time.time()
elapsed2 = end - start
```

As a result of the implementation, there is little difference between elapsed1 and elapsed2. 

## Parallel Processing with arguments 

어떤 프로세스에 구성된 함수에 입력값이 필요한 경우가 있는 경우, Process() 메소드는 target parameter외에 args parameter를 통해 해당 함수의 arguments를 입력받을 수 있다. 또한 입력받은 arguments는 iterable하기 때문에 객체의 형태를 그대로 대입할 수 있다.

```Python
def sum3(x, y, z):
    print(x + y + z)

def list_average(values):
    print(sum(values) / len(values))

# Add code below

sum3_process = multiprocessing.Process(target = sum3, args = [3, 2, 5])
list_average_process = multiprocessing.Process(target = list_average, args = [[1, 2, 3, 4, 5]])

sum3_process.start()
list_average_process.start()
sum3_process.join()
list_average_process.join()
```

## Shared memory 

앞선 방식의 프로세스는 함수의 값을 받을 수 없다. 함수는 독립적인 프로세스에서 실행되고 있기 때문에 각각의 메모리에  존재하며 다른 프로세스와 값을 공유하지 못한다. **Shared memory** 방식은 값을 return하는 대신에 결과값을 shared memory 위치에 저장하고 있어 값을 저장할 수 있다. 

multiprocessing.Value 객체는 함수를 정의할 때 함수의 argument 형태로 작성되어 값을 접근하고 사용할 수 있도록 한다. 해당 값을 access 하는 방식은 Value.value 속성을 통해 가능하다. 프로세스를 생성하기 이전에 Value 또한 정의되어야 하는데, Value() 메소드 내부에 데이터 타입을 정의할 필요가 있다. 이후에 생성된 객체를 args prameter에 추가적으로 입력하면 된다. 

```Python
def sum3(x, y, z, shared_value) : 
    shared_value.value = x + y + z 
    

float_value = multiprocessing.Value("f")
process = multiprocessing.Process(target = sum3, args = [5, 7, 4, float_value])
process.start()
process.end()
print(float_value) 
```

## Cautions of sharing value 

multiprocessing.Value는 single value만 저장할 수 있다. 따라서 프로세스 사이에서 shared memory를 사용할 때 동시에 값을 업데이트 하지 않도록 주의해야 한다. 예를들어 1 ~ 10000까지의 수를 더하는 프로세스를 1 ~ 4999, 5000 ~ 10000까지 계산하는 두 개의 병렬 프로세스로 분할하여 처리한다고 할때 하나의 shared value만 사용하게 되면 각각의 프로세스에서 처리된 값이 서로 shared memory에 업데이트 되는 문제가 발생한다.

```Python
def sum_values(first, last, shared_value):
    for i in range(first, last):
        shared_value.value += i

def sum_with_two_processes():
    N = 10000

    shared_value = multiprocessing.Value("i")
    process1 = multiprocessing.Process(target=sum_values, args=(1, N // 2, shared_value))
    process2 = multiprocessing.Process(target=sum_values, args=(N // 2, N, shared_value))

    process1.start()
    process2.start()

    process1.join()
    process2.join()
    return shared_value.value

# Add code below

results = []
for _ in range(10) : 
    results.append(sum_with_two_processes())

print(results)
```

## Using lock to prevent overwitting 

multiprocess를 사용하면서 발생하는 shared memory에 값을 덮어쓰는 문제는 **lock**을 통해 해결할 수 있다. lock은 병렬처리에서 각각의 프로세스의 자원을 통제하는 매커니즘으로, 다른 프로세스가 각 자원에 서로 간섭하는 것을 방지하도록 한다. 해당 작업은 Value.get_loc() 메소드를 사용함으로써 가능하다. 

하지만 이 방식은 병렬처리의 이점을 더이상 살리지 못한다. 다른 프로세스 해당 프로세스가 잠겨있는 동안 shared memory에 접근할 수 없기 때문이다. 이는 지역변수를 설정하고 지역변수의 값에 업데이트 하는 방식으로 해결할 수 있다. 

```Python
def sum_values(first, last, shared_value):
    for i in range(first, last):
        with shared_value.get_lock():
            shared_value.value += i
            
def sum_values_improved(first, last, shared_value) : 
    value_sum = 0
    for i in range(first, last) : 
        value_sum += i 
    with shared_value.get_lock() : 
        shared_value.value = value_sum

def measure_runtime(function_to_measure):
    N = 10000
    shared_value = multiprocessing.Value("i")
    process1 = multiprocessing.Process(target=function_to_measure, args=(1, N // 2, shared_value))
    process2 = multiprocessing.Process(target=function_to_measure, args=(N // 2, N, shared_value))
    start = time.time()
    process1.start()
    process2.start()
    process1.join()
    process2.join()
    end = time.time()
    return end - start
    
# Whole calculation
start = time.time()
res = 0
for i in range(1, 10000) : 
    res += i
end = time.time()
elapsed1 = end - start 

time_sum_values = measure_runtime(sum_values) 
time_sum_values_improved = measure_runtime(sum_values_improved)
```

# Process Pool Executors

## Make frequency table of each languages 

job_postings 데이터테이블에 작성되어 있는 언어별 빈도수 테이블을 생성하려고 한다. Series.str.count() 메소드는 행을 기준으로 각 행의 count에 입력된 argument가 얼마나 반복되었는지를 계산한다. 이후 sum() 메소드를 통해 언어별 총 빈도수를 확인할 수 있다.


```Python
import pandas as pd 
job_postings = pd.read_csv('DataEngineer.csv') 
num_rows = job_postings.shape[0]
num_cols = job_postings.shape[1]

job_postings["Job Description"] = job_postings["Job Description"].str.lower()

# Make frequency table 
skills = pd.read_csv('Skills.csv')
frequency = {}

for skill_name in skills["Name"] : 
    frequency[skill_name] = job_postings["Job Description"].str.count(skill_name).sum()
print(frequency["programming"])
```

## Function of frequency table 

병렬처리를 위한 첫 단계는 코드를 함수로 변환하는 작업이다. 빈도표를 작성하는 코드를 job_postings와 skills를 입력받는 count_skills() 내부에 작성하여 함수를 생성한다.

```Python
import time

def count_skills(job_postings, skills) : 
    frequency = {}
    for skill_name in skills["Name"]:
        frequency[skill_name] = job_postings["Job Description"].str.count(skill_name).sum()
    return frequency
    
start = time.time()
count_skills(job_postings, skills) 
end = time.time()
runtime = end - start
print(runtime)
```

## Function of dataframe chunk

데이터셋이 가용 메모리에 적합하지 않을 경우 청크 단위로 나누어 데이터를 처리하였다. 따라서 병렬처리를 위한 데이터셋을 청크 단위로 분해하는 함수를 작성한다. 입력된 청크의 수에 따라 처리해야하는 청크의 크기는 math.ceil() 메소드를 통해 계산하고 ragne(0, num_rows, chunk_size)를 통해 데이터 셋을 분할할 수 있다. 

```Python
import math
def math_chunks(df, num_chunks) : 
    num_rows = df.shape[0]
    chunk_size = math.ceil(num_rows/num_chunks)
    return [df[i:i+chunk_size] for i in range(0, num_rows, chunk_size)]
    
skill_chunks = math_chunks(skills, 8)
```

## Process Pool Executor

Job Description에 기술되어 있는 언어에 대한 빈도표를 청크 단위로 병렬 처리하는 workflow는 아래와 같다.

1. split the data into chunks 
2. create a process list, one for each chunk
3. run all processes and wait for them to finish 
4. gather the results and merge them into a single frequency table 

4번째 단계의 경우 multiprocessing.Value 객체는 하나의 값만 저장하기 때문에 문제가 발생할 수 있다. concurrent.futures 모듈은 실행된 각각의 프로세스의 값을 리스트의 형태로 저장할 수 있다. concurrent.futures.ProcessPoolExecutor context manager는 각각의 프로세스를 함수, 입력값을 입력받아 실행하여 값을 list의 형태로 저장할 수 있다. 프로세스를 실행하기 위해 Executor.submit() 메소드를 사용하여 with statement 내부에서 작동하기 때문에 프로세스를 자동 종료시킨다. Executor.submit()으로 생성된 Future 객체는 Future.result()를 통해 결과를 불러올 수 있다. 

```Python
import concurrent.futures

def increment(value):
    return value + 1

values = [1, 2, 3, 4, 5, 6, 7, 8]

with concurrent.futures.ProcessPoolExecutor() as executor : 
    futures = [executor.submit(increment, value) for value in values] 
    
results = [future.result() for future in futures]
```

## Parallel Processing of frequency table 

1. job_postings 데이터프레임을 분할하고 각각의 청크에 있는 모든 skill을 샌다. 각각의 프로세스는 job_posting의 부분을 입력받아 빈도표를 작성한다.
2. skills를 분할하여 각 청크에 대해 job_postings 천체 데이터프레임에 대해 분할표를 작성한다. 

### First case 

```Python
import concurrent.futures
import pandas as pd 
import math 

def count_skills(df, skills) :
    frequency = {}
    for skill_name in skills["Name"] : 
        frequency[skill_name] = df["Job Description"].str.count(skill_name).sum()
    return frequency

def make_chunks(df, chunk_size) : 
    num_rows = df.shape[0]
    chunk_size = math.ceil(num_rows / chunk_size) 
    return [df[i:i+chunk_size] for i in range(0, num_rows, chunk_size)]

job_chunks = make_chunks(job_postings, 8) 
with current.futures.ProcessPoolExecutor() as executor : 
    futures = [executor.submit(count_skills, job_chunk, skills) for job_chunk in job_chunks]
results = [future.result() for future in futures]

merged_results = {} 
for result in results : 
    for language in result : 
        if language in merged_results : 
            merged_results[language] += result[language]
        else : 
            merged_results[language] = result[language] 
```

### Second case 

```Python
import concurrent.futures
import pandas as pd 
import math 

def count_skills(job_postings, skills) :
    frequency = {}
    for skill_name in skills["Name"] : 
        frequency[skill_name] = job_postings["Job Description"].str.count(skill_name).sum()
    return frequency

def make_chunks(df, chunk_size) : 
    num_rows = df.shape[0]
    chunk_size = math.ceil(num_rows / chunk_size) 
    return [df[i:i+chunk_size] for i in range(0, num_rows, chunk_size)]

skill_chunks = make_chunks(skills, 8) 
with concurrent.futures.ProcessPoolExecutor() as executor : 
    futures = [executor.submit(count_skills, job_postings, skill_chunk) for skill_chunk in skill_chunks]
results = [future.result() for future in futures]

merged_results = {}
for result in results : 
    merged_results.update(result) 
print(merged_results)
```

## Time Comparison

```Python
def count_skills(job_postings, skills):
    frequency = {}
    for skill_name in skills["Name"]:
        frequency[skill_name] = job_postings["Job Description"].str.count(skill_name).sum()
    return frequency

def count_skills_parallel(job_postings, skills, num_processes=4):
    # Calculate results using paralleld processing
    skill_chunks = make_chunks(skills, num_processes)
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(count_skills, job_postings, skill_chunk) for skill_chunk in skill_chunks]
    results = [future.result() for future in futures]
    # Merge results
    merged_results = {}
    for result in results:
        merged_results.update(result)
    return merged_results

import time

# Measure execution times 
start = time.time()
count_skills(job_postings, skills)
end = time.time()
time_normal = end - start 

start = time.time()
count_skills_parallel(job_postings, skills, num_processes = 4)
end = time.time()
time_parallel = end - start

print(time_normal/time_parallel)
```

병렬처리를 한 경우가 하지 않은 경우에 비해 2배가량 빠르다.

# Introduction to MapReduce

## What is MapReduce 

**맵리듀스(MapReduce)**는 구글에서 대용량 처리를 분산 병렬 컴퓨팅에서 처리하기 위한 목적으로 제작된 스프트웨어 프레임워크이다. 이 프레임워크는 페타바이트 이상의 대용량 데이터를 신뢰도가 낮은 컴퓨터로 구성된 클러스터 환경에서 병렬 처리를 지원하기 위해서 개발되었다. 대표적으로 Apache Hadoop에서 Open Soruce Software로 적용되었다.

맵리듀스의 개념은 Process Pool Executors에서 사용한 방식과 유사하다. 

1. Divide : Divide the data into chunks 
2. Map : Use parallel processing to process each chunk
3. Reduce : Combine the individual chunk result into a global result 

이전에 작업했던 병렬 컴퓨팅 프로그램에서 Divide 단계는 make_chunks를, Map 단계는 count_skills를, Reduce 단계는 Future.result()의 결과를 단일 데이터프레임으로 병합한 작업과 동일하다. MapReduec에서 병렬처리는 Map 단계에서 시행된다. 

## Generalize multiprocessing 

### Divide 

병렬 처리에 사용했던 함수를 MapReduce framework에 적용하기 위해선 기존에 사용하던 함수를 데이터 타입에 상관없이 일반화할 필요가 있다. 시행 결과 list 객체에 대해서도 make_chunks() 함수가 작동하는 것을 확인할 수 있다.  

```Python
import math

def make_chunks(data, num_chunks):
    chunk_size = math.ceil(len(data) / num_chunks)
    return [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]

chunks = make_chunks([1, 2, 3, 4, 5, 6], 3)
print(chunks)
```

### Map

데이터 타입과 관계없는 청크를 생산하는 함수를 작성했다면 입력받은 함수에 대해 병렬처리(Map)를 해서 청크 별 결과를 산출해야 한다.. map_parallel() 함수는 함수 mapper, data, num_processes(chunk size)를 입력받아 MapReduce 전체 과정을 처리하게 된다. 

- mapper : the function that we want to apply to each chunk. 
- data : the data
- num_processes : the number of processes to use 

```Python
import concurrent.futures

def map_parallel(mapper, data, num_processes):
    chunks = make_chunks(data, num_processes)
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(mapper, chunk) for chunk in chunks]
    return [future.result() for future in futures]

values = [1, 4, 5, 2, 7, 21,     \
          31, 41, 3, 40, 5, 14,  \
          9, 32, 12, 18, 1, 30,  \
          6, 19, 23, 35, 12, 13, \
          0, 12, 42, 41, 11, 9]

results = map_parallel(max, values, 5) 
print(results)
```

## Pool

multiprocessing.Pool은 concurrent.futures.ProcessPoolExecutor()없이 나누어진 데이터에 대해 병렬처리를 수행할 수 있다. Pool() 내부에는 프로세스의 수를입력할 수 있으며 default값은 os.cpu_count()에 저장되어 있는 CPU의 수이다. 

생성된 Pool 객체는 Pool.map() 메소드를 통해 프로세스에 사용할 함수와 데이터 집합을 사용할 수 있다. concurrent.futures.ProcessPoolExecutor() 와는 다르게 실행된 프로세스를 종료하고 다른 프로세스가 끝날때까지 기다리기 위해 Pool.close()와 Pool.join() 메소드를 사용한다. 

```Python
values = [1, 4, 5, 2, 7, 21,     \
          31, 41, 3, 40, 5, 14,  \
          9, 32, 12, 18, 1, 30,  \
          6, 19, 23, 35, 12, 13, \
          0, 12, 42, 41, 11, 9]

chunks = make_chunks(values, 6)

from multiprocessing import Pool
pool = Pool(6) 
results = pool.map(max, chunks) 
pool.close()
pool.join()
```

사실 Pool.map() 메소드는 해당 프로세스가 종료될때까지 자동으로 시행을 막기때문에 Pool.join() 메소드를 사용할 필요가 없다. Pool.close() 메소드의 경우 반드시 사용되어야 한다. concurrent.futures.ProcessPoolExecutor()와 같이 context manager를 사용하면 코드를 작성하는데 편할 뿐만아니라, Pool.close() 메소드를 사용할 필요가 없다. 

```Python
values = [1, 4, 5, 2, 7, 21,     \
          31, 41, 3, 40, 5, 14,  \
          9, 32, 12, 18, 1, 30,  \
          6, 19, 23, 35, 12, 13, \
          0, 12, 42, 41, 11, 9]

chunks = make_chunks(values, 6)

with Pool(6) as pool : 
    results = pool.map(max, chunks)
print(results)
```

## Reduce 

**functools module**의 reduce 함수는 각각의 데이터 청크에 적용한 함수의 결과를 취합하여 전체 결과를 만드는데 유용하게 사용된다. functools.reduce() 메소드는 각각 reducer function과 reduce function을 적용하고자 하는 데이터 청크를 입력받는다. functools.reduce()에 함수와 데이터가 입력되면, 각각의 원소를 순차적으로 함수에 적용하여 최종 결과를 산출한다. (자동으로 for loop가 적용되게 된다)

```python
values = [1, 4, 5, 2, 7, 21,     \
          31, 41, 3, 40, 5, 14,  \
          9, 32, 12, 18, 1, 30,  \
          6, 19, 23, 35, 12, 13, \
          0, 12, 42, 41, 11, 9]

import functools 
max_value = functools.reduce(max, values)
```

## Workflow of MapReduce 

입력된 데이터 중 최대값을 산출하는 프로세스를 MapReduce를 기반으로 구현한다고 할 때, 다음과 같은 workflow로 진행될 수 있다.

- Divide : make_chunks()를 사용해서 데이터를 분해
- Map : Pool.map()을 사용해서 각각의 청크에 대해 max 함수를 적용
- Reduce : functools.reduce() 메소드를 사용해서 산출된 결과에서 단일값을 추출

```Python
data = [1, 4, 5, 2, 7, 21,     \
        31, 41, 3, 40, 5, 14,  \
        9, 32, 12, 18, 1, 30,  \
        6, 19, 23, 35, 12, 13, \
        0, 12, 42, 41, 11, 9]

num_processes = 5

# Divide 
def make_chunks(data, num_processes) : 
    chunk_size = math.ceil(len(data) / num_processes) 
    return [data[i : i+chunk_size] for i in range(0, len(data), chunk_size)]
chunks = make_chunks(data, num_processes)

# Map 
with Pool(num_processes) as pool : 
    chunk_results = pool.map(max, chunks) 

# Reduce 
overall_result = functools.reduce(max, results)
print(overall_result)

# All in one 

def make_chunks(data, num_processes) : 
    chunk_size = math.ceil(len(data) / num_processes) 
    return [data[i : i+chunk_size] for i in range(0, len(data), chunk_size)]

def map_reduce(data, num_processes, mapper, reducer):
    chunks = make_chunks(data, num_processes)
    with Pool(num_processes) as pool:
        chunk_results = pool.map(mapper, chunks)
    return functools.reduce(reducer, chunk_results)
```

## Apply mapreduce to dataframe 

기존에 job_postings에서 언어별 빈도표를 작성하는 두가지 workflow이다. 각 workflow를 Process Pool Executors를 사용하여 언어별 빈도표를 작성하는 것이 아닌, MapReduce framework를 사용하여 코드를 구성해야한다. 

1. job_postings 데이터프레임을 분할하고 각각의 청크에 있는 모든 skill을 샌다. 각각의 프로세스는 job_posting의 부분을 입력받아 빈도표를 작성한다.
2. skills를 분할하여 각 청크에 대해 job_postings 천체 데이터프레임에 대해 분할표를 작성한다.

MapReduce의 workflow는 Divide - Map - Reduce로 데이터를 처리하여 하나의 결과만을 출력한다. main function인 map_reduce는 Map 단계와 Reduce 단계에서 적용되는 함수가 상이할 수 있기때문에 mapper와 reducer로 분리하여 함수를 입력받는다. 

1. Divide : Divide the data into chunks 
2. Map : Use parallel processing to process each chunk
3. Reduce : Combine the individual chunk result into a global result 

### First case 

- make_chunks : job_postings를 나누어 job_chunks를 생성
- mapper : 각각의 job_chunks에 대해 skills 별로 빈도표 생성 
- reducer : 각각의 빈도표에 저장되어 있는 값을 서로 더함 

```Python
import pandas as pd
job_postings = pd.read_csv("DataEngineer.csv")
job_postings["Job Description"] = job_postings["Job Description"].str.lower()
skills = pd.read_csv("Skills.csv")

def make_chunks(data, num_processes) : 
    chunk_size = math.ceil(len(data) / num_processes) 
    return [data[i : i+chunk_size] for i in range(0, len(data), chunk_size)]

def map_reduce(data, num_processes, mapper, reducer):
    chunks = make_chunks(data, num_processes)
    with Pool(num_processes) as pool:
        chunk_results = pool.map(mapper, chunks)
    return functools.reduce(reducer, chunk_results)

def mapper(jobs_chunk) : 
    frequency = {} 
    for skill_name in skills["Name"] : 
        frequency[skill_name] = jobs_chunk["Job Description"].str.count(skill_name).sum() 
    return frequency 

def reducer(freq_chunk1, freq_chunk2) : 
    merged = {} 
    for skill in freq_chunk1 : 
        merged[skill] = freq_chunk1[skill] + freq_chunk2[skill]
    return merged

skill_freq = map_reduce(job_postings, 4, mapper, reducer)
```

### Second case 

- make_chunks : skills를 나누어 skill_chunks를 생성
- mapper : 각각의 skill_chunks에 대해 빈도표 생성
- reducer : 각각의 빈도표를 하나의 데이터에 저장 

```Python
import pandas as pd
job_postings = pd.read_csv("DataEngineer.csv")
job_postings["Job Description"] = job_postings["Job Description"].str.lower()
skills = pd.read_csv("Skills.csv")

def make_chunks(data, num_processes) : 
    chunk_size = math.ceil(len(data) / num_processes) 
    return [data[i : i+chunk_size] for i in range(0, len(data), chunk_size)]

def map_reduce(data, num_processes, mapper, reducer):
    chunks = make_chunks(data, num_processes)
    with Pool(num_processes) as pool:
        chunk_results = pool.map(mapper, chunks)
    return functools.reduce(reducer, chunk_results)

def mapper(skill_chunk) : 
    frequency = {} 
    for skill_name in skill_chunk["Name"] : 
        frequency[skill_name] = job_postings["Job Description"].str.count(skill_name).sum() 
    return frequency 

def reducer(freq_chunk1, freq_chunk2) : 
    freq_chunk1.update(freq_chunk2)
    return freq_chunk1 

skill_freq = map_reduce(skills, 4, mapper, reducer)
```

# Processing data with MapReduce

## MapReduce Framework 

MapReduce를 진행하기 위해 생성했던 map_reduce() 함수는 다음의 arguments를 입력받는다. 이때 mapper function은 데이터를 분리한 청크에 대해 적용할 함수를 의미하며, reducer function은 mapper function에 의해 생성된 연산 결과를 하나의 연산 결과로 만드는 함수이다. 

- The data
- The number of processes 
- The mapper function
- The reducer function

## Calculate the length of the longest word 

- mapper function : 각각의 chunk의 가장 긴 단어의 길이를 계산 
- reducer function : 각각의 chunk의 가장 긴 단어의 길이중 최댓값 계산 

```Python
with open("english_words.txt") as f:
    words = [word.strip() for word in f.readlines()]
    
def map_max_length(words_chunk) : 
    max_length = 0 
    for word in words_chunk : 
        if len(word) > max_length : 
            max_length = len(word) 
    return max_length

max_len = map_reduce(words, 4, map_max_length, max)
```

## Retreive the longest word 

- mapper function : 각각의 chunk의 가장 긴 단어를 계산 (max(key = len) 사용) 
- reducer function : word1, word2에 대해서 길이가 긴 단어를 계산 

```Python
with open("english_words.txt") as f:
    words = [word.strip() for word in f.readlines()]
    
def map_max_len_str(words_chunk) : 
    return max(words_chunk, key = len) 

def reduce_max_len_str(word1, word2) : 
    if len(word1) >= len(word2) :
        return word1
    return word2

max_len_str = map_reduce(words, 4, map_max_len_str, reduce_max_len_str)
```

## Retrieve extence of word 

영어 사전의 알파벳을 검색하고자 할때 Mapreduce를 사용하는 경우 속도면에서 훨씬 빠른 검색 속도를 가능하게 한다. 

- mapper function : 각각의 chunk에 target 단어가 존재하는지 True/False return
- reducer function : logical1, logical2에 대해서 Any true를 계산하기 위해 or 연산 계산 

```Python
with open("english_words.txt") as f:
    words = [word.strip() for word in f.readlines()]

target = "pneumonoultramicroscopicsilicovolcanoconiosis"

def map_contains(words_chunk) : 
    if target in words_chunk : 
        return True 
    return False 

def reduce_contains(contains1, contains2) : 
    return contains1 or contains2 

is_contained = map_reduce(words, 4, map_contains, reduce_contains)
```

## Counting the frequency of the characters throughout the entire list of words 

- mapper function : 각각의 chunk에 존재하는 알파벳의 빈도표 계산 
- reducer function : freq1, freq2를 하나로 합침 (존재하지 않는 값에 대해 연산 필요) 

```Python
with open("english_words.txt") as f:
    words = [word.strip() for word in f.readlines()]
    
def map_char_count(words_chunk) : 
    char_freq = {}
    for word in words_chunk : 
        for c in word : 
            if c not in char_freq : 
                char_freq[c] = 0
            char_freq[c] += 1
    return char_freq
                
def reduce_char_count(freq1, freq2) : 
    for c in freq2 : 
        if c in freq1 : 
            freq1[c] += freq2[c]
        else : 
            freq1[c] = freq2[c]
    return freq1
            
char_freq = map_reduce(words, 4, map_char_count, reduce_char_count)
```

## Calculate the average length of English words 

- mapper function : 각각의 chunk에 존재하는 알파벳의 합 / 전체 단어의 수 계산 
- reducer function : res1, res2를 더하여 평균값 계산 

```Python
with open("english_words.txt") as f:
    words = [word.strip() for word in f.readlines()]
    
def map_average(words_chunk) : 
    tot_sum = 0 
    for word in words_chunk : 
        tot_sum += len(word) 
    return tot_sum / len(words) 

def reduce_average(res1, res2) : 
    return res1 + res2

average_word_len = map_reduce(words, 4, map_average, reduce_average)
```

## Find which pairs of characters occurs next to each other in only one word 

- mapper function : 각각의 chunk에 존재하는 모든 알파벳 쌍의 빈도표 계산
- reducer function : freq1, freq2를 하나로 합침 

```Python
with open("english_words.txt") as f:
    words = [word.strip() for word in f.readlines()]
    
def map_adjacent(words_chunk) : 
    freq_pairs = {} 
    for word in words_chunk : 
        for i in range(len(word) - 1) : 
            seq = word[i] + word[i + 1]
            if seq not in freq_pairs : 
                freq_pairs[seq] = 0
            freq_pairs[seq] += 1
    return freq_pairs 

def reduce_adjacent(freq1, freq2) :
    for pair in freq2 : 
        if pair in freq1 : 
            freq1[pair] += freq2[pair]
        else : 
            freq1[pair] = freq2[pair]
    return freq1 

pair_freq = map_reduce(words, 4, map_adjacent, reduce_adjacent) 
unique_pairs = [pair for pair in pair_freq if pair_freq[pair] == 1]
```