<a href="https://colab.research.google.com/github/Rei-L0/4-1-BigData/blob/main/02_MapReduce.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Map Reduce

본 코드의 목적은 map-reduce 과정을 사용하여 word count 응용을 만드는 것입니다.
Java 버전은 여기에서 확인할 수 있습니다. [this page](https://www.dezyre.com/hadoop-tutorial/hadoop-mapreduce-wordcount-tutorial)

![domain decomposition](images/domain_decomp.png)

credits: https://computing.llnl.gov/tutorials/parallel_comp

## `map` function example

`map(func, seq)` Python 함수는 함수 `func`를 시퀀스(sequence) `seq`의 모든 요소에 적용합니다.그리고 `func`에 의해 변화된 요소와 함께 새로운 리스트(list)를 반환합니다

In [7]:
def f(x):
    return x * x

rdd = [2, 6, -3, 7]
res = map(f, rdd )
res  # Res is an iterator

<map at 0x7f06dca61820>

In [None]:
print(*res)

4 36 9 49


In [6]:
from operator import mul
rdd1, rdd2 = [2, 6, -3, 7], [1, -4, 5, 3]
res = map(mul, rdd1, rdd2 ) # element wise sum of rdd1 and rdd2 

In [None]:
print(*res)

2 -24 -15 21


![MapReduce](images/mapreduce.jpg)

## `functools.reduce` example

 `reduce(func, seq)` 함수는 시퀀스 `seq`에 함수 `func`를 연속적으로 적용한다. 예로, reduce(f, [1, 2, 3, 4, 5]) 는 f(f(f(f(1,2),3),4),5)로 동작한다.

In [5]:
from functools import reduce
from operator import add
rdd = list(range(1,6))
reduce(add, rdd) # computes ((((1+2)+3)+4)+5)

15

## 가중치 평균과 분산(Weighted mean and Variance)

만약 임의 변수의 생성자

임의변수 X의 생성자가 확률 질량 함수(probability mass function) $x_1 \mapsto p_1, x_2 \mapsto p_2, \ldots, x_n \mapsto p_n$ 을 가지는 이산 값이면, 다음 수식과 같이 정의된다.

$$\operatorname{Var}(X) = \left(\sum_{i=1}^n p_i x_i ^2\right) - \mu^2,$$

여기서, $\mu$ 평균값이다, i.e.

$$\mu = \sum_{i=1}^n p_i x_i. $$

### Exercise 2.1

반복문(for loops)를 사용하여 평균 값과 분산값을 계산하는 함수를 작성하시오

In [24]:
X = [5, 1, 2, 3, 1, 2, 5, 4]
P = [0.05, 0.05, 0.15, 0.05, 0.15, 0.2, 0.1, 0.25]

avg=0
var=0

# 평균 구하기
for i in range(len(X)):
  avg+=X[i]*P[i]

# 분산 구하기
for i in range(len(X)):
  var+=((X[i]-avg)**2)*P[i]

print(avg)
print(var)

2.8
1.9600000000000002


### Exercise 2.2

`map` 과 `reduce` 사용하여 평균 값과 분산값을 계산하는 함수를 작성하시오

* Exercise 는 map-reduce 과정을 이해하는데 사용하는 것이며, 실제로 분산을 구하는 것은 [Numpy](http://www.numpy.org) 라이브러리를 사용해야합니다*

In [29]:
from operator import mul,add

avg=reduce(add,list(map(mul,X,P)))
avg_l=[avg]*len(X)
var=reduce(add,list(map(mul,map(lambda x,y:(x-y)**2,X,avg_l),P)))

print(avg)
print(var)

2.8
1.9600000000000002


## Wordcount 


우리는 `wordcount` 응용을 map-reduce 과정과 함께 수정할 것입니다.
여기서, `map`은 text files을 입력으로 취하고 words들로 분리합니다. 그리고  `reduce` 과정은 각 words의 등장 숫자를 합(sum)하여, key/value 쌍형태로 내보냅니다.

우리는 다음 예제 [Hadoop documentation](https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html#Example:_WordCount_v1.0) Python에서 구현하고자 합니다.

## Map - Read file and return a key/value pairs

### Exercise 2.3

`mapper` 함수를 작성하시오. 이 함수는 하나의 file name을 입력으로 취하고, (word, 1) 튜플(tuples)의 정렬된 시퀀스를 반환합니다.
Write a function `mapper` with a single file name as input that returns a sorted sequence of tuples (word, 1) values.

```pybt
mapper('sample.txt')
[('adipisci', 1), ('adipisci', 1), ('adipisci', 1), ('adipisci', 1), ('adipisci', 1), ('adipisci', 1), ('adipisci', 1), ('aliquam', 1), ('aliquam', 1), ('aliquam', 1), ('aliquam', 1), ('aliquam', 1), ('aliquam', 1), ('aliquam', 1), ('amet', 1), ('amet', 1), ('amet', 1)...
```

In [None]:
def mapper(filename):
  f=open(filename,'r')
  words=f.read().split()

## Partition

### Exercise 2.4

`partitioner` 함수를 작성하시오. 이 함수는 mapper로부터 key/value 쌍을 저장하고 mapper는 (word,1)을 리스트 형태로 그룹화 합니다

```python
partitioner(mapper('sample.txt'))
[('adipisci', [1, 1, 1, 1, 1, 1, 1]), ('aliquam', [1, 1, 1, 1, 1, 1, 1]), ('amet', [1, 1, 1, 1],...]
```

## Reduce - Sums the counts and returns a single key/value (word, sum).

### Exercise 2.5

`reducer` 함수를 작성하시오. 이 함수는 `(word,[1,1,1,..,1])` 튜플을 읽어서 word의 발생을 합(sum)하고,  튜플 (word, 발생 횟수)를 출력한다.

```python
reducer(('hello',[1,1,1,1,1])
('hello',5)
```

## Process several files

다음 예제는 8개의 파일 `sample[00-07].txt`을 만들고. 가장 자주 등장하는 단어에 대한 파일을 출력의 맨 위에 설정한다.

In [None]:
from lorem import text
for i in range(1):
    with open("sample{0:02d}.txt".format(i), "w") as f:
        f.write(text())

In [None]:
import glob
files = sorted(glob.glob('sample0*.txt'))
files

### Exercise 2.6

파일과 파티션된 데이터 위의 반복문을 사용하여 (word, occurrences)를 세는 함수를 구현하시오

### Exercise 2.7

이번에는, `my_map_reduce` 함수를 작성하시오. 이 함수는 mapper, partitioner, 그리고 reducer를 포함한다.

```python
 my_map_reduce(files)
 [('hello',5), ('nice',3), ..., ]
```

