## Objective

> 이해하기 위해 가장 좋은 방법은 구현해보는 것이라고 생각한다. 구현해보면서 배워보자.

reference : [how to write a MapReduce Framework in Python](https://medium.com/@nidhog/how-to-quickly-write-a-mapreduce-framework-in-python-821a79fda554)

# 맵리듀스 구현하기

### 0. HDFS 구현하기

> 진짜 구현한다는 것은 아니고, 맵리듀스 프레임워크는 기본적으로 HDFS 위에서 돌아가기 때문에, HDFS(haddop distributed file system)과 같이, 작업 파일을 관리해주는 가상의 핸들러가 필요하다. 

In [1]:
import os
import json
from operator import itemgetter as operator_ig
from multiprocessing import Process

In [2]:
class HDFSHandler(object):
    """HDFS Handler class
    입력 파일을 Splitting하고 출력 파일을 Joining 해주는 역할
    
    """
    def __init__(self, 
                 input_file_path, 
                 output_dir="./output_files", 
                 temp_dir="./temp_map_files"):
        """        
        :param input_file_path: input file path
        :param output_dir: output directory path
        """
        self.input_file_path = input_file_path
        self.input_dir = os.path.split(input_file_path)[0]
        self.file_name = self.get_input_filename(input_file_path)
        self.output_dir = output_dir
        self.temp_dir = temp_dir
        
        # 작업 및 저장 공간 확보
        os.makedirs(self.temp_dir, exist_ok=True)
        os.makedirs(self.output_dir, exist_ok=True)
    
    def get_input_filename(self, input_file_path):
        """return the name of the input file to be split into chunks
        """
        file_name = os.path.split(input_file_path)[1].split('.')[0]
        return file_name
        
    def get_input_split_file(self, index):
        """ return the name of the current split file corresponding to
        the given index
        """
        split_file_name = f"{self.file_name}_{index}.ext"
        return os.path.join(self.input_dir, split_file_name)
    
    def get_temp_map_file(self, index, reducer):
        """ return the name of the temporary map file 
        corresponding to the given index
        """
        temp_file_name = f"map_{self.file_name}_{index}-{reducer}.ext"
        
        return os.path.join(self.temp_dir, temp_file_name)
    
    def get_output_file(self, index):
        reduce_file_name = f"reduce_{self.file_name}_{index}.out"
        return os.path.join(self.output_dir, reduce_file_name)
    
    def get_output_join_file(self):
        """ return the name of the output file given 
        its corresponding index
        """
        output_join_name = f"{self.file_name}.out"
        return os.path.join(self.output_dir, output_join_name)
        

HDFS 파일시스템에서 우리는 크게 3가지 단계로 쪼갰다.

* `input_dir` : 작업 입력 파일들이 저장되는 공간
* `temp_dir` : mapper의 중간 결과값들이 저장되는 공간
* `output_dir` : reducer의 최종 결과값들이 저장되는 공간 

기본적으로 input_file_path에서 값을 가져오면 아래의 순서대로 처리된다.

example : 

* input_file_path (ex : `../books/pride_and_predudice.txt`)
    > 주어진 파일 path
* file_name(ex : `pride_and_prejudice` )
     > 작업 처리할 때, 파일 이름의 고유 식별 기준
* input_split_file(ex : `pride_and_prejudice_1.ext`) 
    > mapper thread에 전달할 입력 데이터. 예시로 따지면, 1번째 mapper thread가 처리할 데이터
* temp_split_file(ex : `map_pride_and_prejudice_1-2.ext`)
    > 1번째 mapper의 결과물로, 2번째 reducer에게 전달할 결과물
* output_file_name(ex : `reduce_pride_and_prejudice_1.out`)
    > reducer thread의 결과값들. dPtlfh Ekwlaus, 1번째 reducer thread의 결과들
* output_join_name(ex : `pride_and_prejudice.out`)
    > reducer의 결과들을 최종적으로 join한 값들
    


> 우리는 이제 mapper에 보내기 위해 input 파일을 쪼개고, reducer의 결과물들을 합쳐주는 메소드가 필요하다.

In [3]:
def split_file(self, num_splits):
    """split a file into multiple files.
    
    : param num_splits: the number of chunks to split the file into.
    """
    # (1) input 파일 읽어오기
    # (2) split할 크기 계산하기
    # (3) 순회하며 input 파일을 자르기
    # (4) 자른 파일들을 각각 저장하기
    raise NotImplementedError

HDFSHandler.split_file = split_file

구현은 아래와 같은 방식으로 진행하였다.

In [4]:
def initiate_file_split(self, split_index, index):
    """initialize a split file by opening and adding an index.
    :param split_index: the split index we are currently on, to be used for naming the file.
    :param index: the index given to the file.
    """
    file_split = open(self.get_input_split_file(split_index-1), "w+")
    file_split.write(str(index) + "\n")
    return file_split

HDFSHandler.initiate_file_split = initiate_file_split

def split_file(self, num_splits):
    """split a file into multiple files.
    
    : param num_splits: the number of chunks to split the file into.
    """
    # (1) input 파일 읽어오기
    with open(self.input_file_path, "r") as file:
        file_content = file.read()
    # (2) split할 크기 계산하기
    file_size = os.path.getsize(self.input_file_path)
    split_size = file_size / num_splits + 1
    
    # (3) 순회하며 input 파일들을 자르기
    (index, current_split_index) = (1, 1)
    current_split_file = self.initiate_file_split(current_split_index, index)
    for character in file_content:
        # (4) 자른 파일들을 각각 저장하기
        current_split_file.write(character)
        if index>split_size*current_split_index+1 and character.isspace():
            current_split_file.close()
            current_split_index += 1
            current_split_file = self.initiate_file_split(current_split_index, index)
        index += 1
    current_split_file.close()
    
HDFSHandler.split_file = split_file

In [5]:
def join_files(self, num_files, clean=False,sort=True,decreasing=True):
    """join all the files in the output directory into a
    single output file.

    :param num_files: total number of files.
    :param clean: if True the reduce outputs will be deleted,
    by default takes the value of self.clean.
    :param sort: sort the outputs.
    :param decreasing: sort by decreasing order, high value
    to low value.
    :return output_join_list: a list of the outputs
    """
    # (1) reducer의 결과물들을 가져옴
    output_join_list = []
    for reducer_index in range(num_files):
        with open(self.get_output_file(reducer_index), "r") as f:
            output_join_list += json.load(f)
        if clean:
            os.unlink(self.get_output_file(reducer_index))
    # (2) 결과값들을 정렬
    if sort:
        output_join_list.sort(key=operator_ig(1), reverse=decreasing)
    # (3) 저장
    with open(self.get_output_join_file(self.output_dir), "w+") as f:
        json.dump(output_join_list, f)
    return output_join_list

### 1. 맵리듀스 프레임워크 잡기

In [6]:
class MapReduce(object):
    """MapReduce class representing the mapreduce model
    """
    
    def __init__(self, input_file_path, output_dir, 
                 n_mappers=4, n_reducers=4, clean=True):
        """
        
        :param input_file_path: input file의 위치
        :param output_dir: output file이 있는 디렉토리
        :param n_mappers: mapper thread의 갯수 
                          -> 원래는 입력 파일 / 스플릿 크기(블록크기) 로 지정
        :param n_reducers: reducer thread의 갯수
        :param clean: 참이면, 임시 파일들이 삭제
        """
        self.input_file_path = input_file_path
        self.output_dir = output_dir
        self.n_mappers = n_mappers
        self.n_reducers = n_reducers
        self.clean = clean
        
        # 처리할 파일을 읽어오고 저장할 파일 핸들러 가져오기
        self.file_handler = HDFSHandler(self.input_file_path, 
                                        self.output_dir)
        self.file_handler.split_file(self.n_mappers)

맵 리듀스는 하나의 프로그래밍 모델(디자인 패턴)! 매우 간단한 작업이지만, 그러한 작업이 엄청나게 많이 해야 할 때(example, word counts) 사용할 수 있는 Programming Model입니다. Map Reduce는 큰 묶음의 Job을 작은 단위로 쪼개고(split), 계산하고(map), 합치는(reduce) 것으로 구성된다.

이러한 작업은 "parallerl computing"이라는 이름으로 예전부터 있었는데, 분산 작업을 직접 구현하기에는 많은 어려움이 있었다.

1. 여러 개의 물리적 컴퓨팅 자원이 필요하고
2. 그 컴퓨터를 하나로 묶어 제어할 Software를 Install 해야 하며
3. 그 과정에서 생기는 수 많은 번거로운 작업들을 해야 하기 

때문이다.

보통 맵리듀스는 아래의 순서를 따른다.

![](https://t1.daumcdn.net/cfile/tistory/2136A84B59381A8428)

여기서 크게 2 단계(Map, Reduce)의 단계로 구성된다고 볼 수 있다. 우리는 위의 `__init__`에서 지정된 input_dir에 저장된 파일들을 읽어오고(*원래는 hdfs 포맷을 읽어와야 겠지만, 그거까지 구현은 무리데스네...*), output_dir에 저장하는 식으로 진행된다.



### 2. 맵퍼와 리듀서

기본적으로 맵리듀스에서의 입력과 출력은 `<key-value>`의 쌍으로 움직인다. 맵퍼와 리듀서 모두 `<key-value>` 를 쌍으로 받아서 처리한다.  

```python
input_split_file = open(settings.get_input_split_file(index), "r")
key = input_split_file.readline()
value = input_split_file.read()
input_split_file.close()
if(self.clean):
    os.unlink(settings.get_input_split_file(index))
mapper_result = self.mapper(key, value)
for reducer_index in range(self.n_reducers):
    temp_map_file = open(settings.get_temp_map_file(index, reducer_index), "w+")
    json.dump([(key, value) for (key, value) in mapper_result 
                                if self.check_position(key, reducer_index)]
                , temp_map_file)
    temp_map_file.close()
    
```

In [7]:
def mapper(self, key, value):
    """key-value를 읽어서 필터링하거나 다른 값으로 변환시켜주는 함수
    
    -> 실제로 사용자가 구현해야 하는 부분
    """
    #TODO: 여기를 구현해야함
    raise NotImplementedError

def run_mapper(self, ps_id):
    """ 구현된 mapper 메소드를 실행
    
    :param ps_id: 실행하고자 하는 process id
    """
    # (1) get the hdfs file
    with open(self.file_handler.get_input_split_file(ps_id), "r") as file:
        key = file.readline()
        value = file.read()
    # (2) [optional] if clean, remove temp file
    if(self.clean):
        os.unlink(self.file_handler.get_input_split_file(ps_id))
    # (3) get the result of the mapper
    mapper_result = self.mapper(key, value)
    for reducer_index in range(self.n_reducers):
        # (4) store the result to be used by the reducer        
        with open(self.file_handler.get_temp_map_file(ps_id, reducer_index), "w+") as file:
            json.dump([(key, value) for (key, value) in mapper_result 
                                        if self.check_position(key, reducer_index)]
                        , file)

MapReduce.mapper = mapper
MapReduce.run_mapper = run_mapper

```python




```

In [8]:
def reducer(self, key, values_list):
    """맵퍼를 통해 출력된 리스트에 새로운 Key를 기준으로 Aggregation하는 함수
    """
    raise NotImplementedError

def run_reducer(self, ps_id):
    """ 구현된 reducer 메소드를 실행
    
    :param ps_id: 실행하고자 하는 process id
    """
    key_values_map = {}
    for mapper_index in range(self.n_mappers):
        # load the results of the map
        with open(self.file_handler.get_temp_map_file(mapper_index, ps_id), "r") as file:
            mapper_results = json.load(file)
            for (key, value) in mapper_results:
                if not(key in key_values_map):
                    key_values_map[key] = []
                try:
                    key_values_map[key].append(value)
                except Exception as e:
                    print("Exception while inserting key: "+str(e))

        if self.clean:
            os.unlink(self.get_temp_map_file(mapper_index, ps_id))

    # for each key reduce the values
    key_value_list = []
    for key in key_values_map:
        key_value_list.append(self.reducer(key, key_values_map[key]))

    # store the results for this reducer 
    with open(self.file_handler.get_output_file(ps_id), "w+") as file:
        json.dump(key_value_list, file)

    
MapReduce.reducer = reducer
MapReduce.run_reducer = run_reducer

### 3. 잡 트래커

우리가 구현한 Mapper와 Reducer의 process를 만들고 관리하는 메소드가 필요하다. 이것이 잡 트래커. 여기에서는 따로 class혹은 method를 두지 않고 run이라는 이름으로 통칭하였다.

In [9]:
def run(self):
    """ Executes the map and reduce operations
    """
    # initialize mappers list
    map_workers = []
    # initialize reducers list
    rdc_workers = []
    
    # run the map step
    for thread_id in range(self.n_mappers):
        p = Process(target=self.run_mapper, args=(thread_id,))
        p.start()
        map_workers.append(p)
    [t.join() for t in map_workers]
    
    # run the reduce step
    for thread_id in range(self.n_reducers):
        p = Process(target=self.run_reducer, args=(thread_id,))
        p.start()
        map_workers.append(p)
    [t.join() for t in rdc_workers]
    
MapReduce.run = run

In [10]:
def join_outputs(self, clean=True,sort=True,decreasing=True):
    """Join all the reduce output files into a single output file.

    :param clean: if True the reduce outputs will be deleted, by default takes the value of self.clean
    :param sort: sort the outputs
    :param decreasing: sort by decreasing order, high value to low value

    """
    try:
        return self.file_handler.join_files(self.n_reducers, clean, sort, decreasing)
    except:
        print("Exception occured while joining: maybe the join has been performed already  -- "+str(e))
        return []

MapReduce.join_outputs = join_outputs

------
------

## WordCount

> WordCount의 예시!

In [11]:
class WordCount(MapReduce):
    def __init__(self, input_dir, output_dir, n_mappers, n_reducers):
        MapReduce.__init__(self,  input_dir, output_dir, n_mappers, n_reducers)

    def mapper(self, key, value):
        """Map function for the word count example
        Note: Each line needs to be separated into words, and each word
        needs to be converted to lower case.
        """
        results = []
        default_count = 1
        # seperate line into words
        for word in value.split():
            if self.is_valid_word(word):
                # lowercase words
                results.append((word.lower(), default_count))
        return results

    def is_valid_word(self, word):
        """Checks if the word is in the defined character range
        :param word: word to check
        """
        return all(64 < ord(character) < 128 for character in word)

    def reducer(self, key, values):
        """Reduce function implementation for the word count example
        Note: Each line needs to be separated into words, and each word
        needs to be converted to lower case.
        """
        wordcount = sum(value for value in values)
        return key, wordcount

In [12]:
word_count = WordCount("./input_files/file.ext", "./output_files/", 4,4)

In [13]:
word_count.run()

Process Process-1:
Traceback (most recent call last):
Process Process-2:
  File "/Users/ksj/anaconda3/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
Process Process-3:
  File "/Users/ksj/anaconda3/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
Traceback (most recent call last):
  File "<ipython-input-7-cb3514fefe8d>", line 26, in run_mapper
    json.dump([(key, value) for (key, value) in mapper_result
Traceback (most recent call last):
  File "/Users/ksj/anaconda3/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "<ipython-input-7-cb3514fefe8d>", line 27, in <listcomp>
    if self.check_position(key, reducer_index)]
  File "/Users/ksj/anaconda3/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/ksj/anaconda3/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()