### Hadoop Streaming은 표준 입출력(stdio)를 제공하는 모든 언어 이용 가능
#### 두 가지 요소가 정의되어야 함
1. Map 기능이 정의된 실행 가능 Mapper 파일
2. Reduce 기능이 정의된 실행 가능 Reducer 파일

py hello.py?<br>
chmod 755 hello.py<br>
(then) ./hello.py 가능

## Python Map.py

In [None]:
#!/usr/bin/env python
# NOTE: a shebang only takes effect when it starts with "#!" (no space) on the
# very first line of the file. Replace the interpreter with a site-specific
# path if `python` is not on PATH on the cluster nodes.
"""Word-count mapper for Hadoop Streaming.

Reads text lines from standard input and writes one ``word<TAB>1`` record
per whitespace-separated token to standard output.
"""
import sys


def map_words(lines):
    """Yield a ``(word, 1)`` pair for every whitespace-separated token.

    *lines* is any iterable of strings; leading/trailing whitespace and
    blank lines are ignored because ``str.split()`` discards them.
    """
    for line in lines:
        for word in line.split():
            yield word, 1


def main():
    """Stream stdin through the mapper, one tab-separated record per line."""
    for word, count in map_words(sys.stdin):
        # Parenthesised single-argument print works on Python 2 and 3 alike.
        print('{0}\t{1}'.format(word, count))


if __name__ == '__main__':
    main()

echo "hi hi hi bye bye hi" | mapper.py<br>
hi 1 <br>
hi 1 <br>
hi 1 <br>
bye 1 <br>
bye 1 <br>
hi 1 <br>

echo "hi hi hi bye bye hi" | mapper.py | sort -k 1<br>
bye 1 <br>
bye 1 <br>
hi 1 <br>
hi 1 <br>
hi 1 <br>
hi 1 <br>

## Python Reducer.py

In [None]:
"""Word-count reducer for Hadoop Streaming.

Reads ``word<TAB>count`` records from standard input and writes one
``word<TAB>total`` record per distinct word. Records for the same word must
arrive consecutively (i.e. the input is sorted by word), because only one
running total is kept at a time.
"""
import sys


def reduce_counts(lines):
    """Yield ``(word, total)`` for each run of consecutive equal words.

    *lines* is an iterable of ``word<TAB>count`` strings. Blank lines are
    skipped. A line without a tab, or with a non-integer count, raises
    ``ValueError``.
    """
    current_word = None
    current_count = 0

    for line in lines:
        line = line.strip()
        if not line:
            continue
        word, count = line.split('\t', 1)
        count = int(count)

        # Same word as the running key: accumulate its count.
        if word == current_word:
            current_count += count
            continue

        # Word boundary: emit the finished group (if there is one) ...
        if current_word is not None:
            yield current_word, current_count
        # ... and ALWAYS start a new group for the word just read.
        current_word, current_count = word, count

    # Flush the last group; nothing is emitted for empty input.
    if current_word is not None:
        yield current_word, current_count


def main():
    """Reduce the records on stdin and print one total per word."""
    for word, total in reduce_counts(sys.stdin):
        print('{0}\t{1}'.format(word, total))


if __name__ == '__main__':
    main()

echo "hi hi hi bye bye hi" | ./mapper.py | sort -k 1 | ./reducer.py<br>
bye    2<br>
hi     4


### 조건
1. HadoopStreaming에서 mapper/reducer는 실행가능한 쉘로 지정되어야 한다.
    - [ok]Hadoop jar hadoop-streaming*.jar -mapper map.py -reducer reduce.py ...
2. Python 스크립트는 어디에서든 접근 가능하도록 디렉토리 PATH를 설정
    - hadoop jar hadoop-streaming-2.5.1.jar -input myInputDirs -output myOutputDir -mapper /bin/cat -reducer /usr/bin/wc
    
    
- Hadoop x.x의 HadoopStreaming의 위치
    - $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-x.x.x.jar

#### 사용예

In [None]:
# Run the word-count streaming job: read from `alice`, write to `wc_alice`.
# Every line except the last must end with a backslash so the shell treats
# the whole thing as a single command. -file ships the scripts with the job.
hadoop jar hadoop-streaming-2.5.1.jar \
-input alice -output wc_alice \
-mapper mapper.py -reducer reducer.py \
-file mapper.py -file reducer.py

### mapper.py 수정

In [None]:
"""Word-count mapper for Hadoop Streaming that strips punctuation first.

Reads text lines from standard input, removes a fixed set of punctuation
characters, and writes one ``word<TAB>1`` record per remaining token.
"""
import re
import sys

# Characters removed from every line before tokenizing. Compiled once at
# import time instead of being looked up again for each input line.
_PUNCTUATION = re.compile('[{=.#/?:$\'!,"}]')


def map_words(lines):
    """Yield ``(word, 1)`` for each token left after removing punctuation.

    *lines* is any iterable of strings. Tokens consisting solely of the
    stripped characters disappear entirely.
    """
    for line in lines:
        cleaned = _PUNCTUATION.sub('', line)
        for word in cleaned.split():
            yield word, 1


def main():
    """Stream stdin through the mapper, one tab-separated record per line."""
    for word, count in map_words(sys.stdin):
        print('{0}\t{1}'.format(word, count))


if __name__ == '__main__':
    main()

#### 후 다시 돌리면 특수문자 제거된 결과 나온다

In [None]:
# Re-run the job with the punctuation-stripping mapper, writing to a new
# output directory (`wc_alice2`). Option names take no space after the dash,
# and every line except the last must end with a backslash.
hadoop jar hadoop-streaming-2.5.1.jar \
-input alice -output wc_alice2 \
-mapper mapper.py -reducer reducer.py \
-file mapper.py -file reducer.py

### 기타

Mapper.py

In [4]:
import sys

def mapper(text):
    """Tokenize *text* on whitespace and pair every token with a count of 1.

    Echoes the input text, then prints each pair as ``word<TAB>1``, and
    returns the list of ``(word, 1)`` tuples in input order.
    """
    print("text is:", text)

    # str.split() with no argument already drops surrounding whitespace and
    # never yields empty tokens, so a single pass over it is enough.
    pairs = [(token, 1) for token in text.split()]
    for token, one in pairs:
        print('%s\t%s' % (token, one))

    return pairs
        
"""
import sys
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print('%s\t%s' % (word, 1))
        
echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py
"""

'\nimport sys\nfor line in sys.stdin:\n    line = line.strip()\n    words = line.split()\n    for word in words:\n        print(\'%s\t%s\' % (word, 1))\n        \necho "foo foo quux labs foo bar quux" | /home/hduser/mapper.py\n'

In [5]:
reducer.py

NameError: name 'reducer' is not defined

In [6]:
from operator import itemgetter
import sys

def reducer(after_mapper):
    """Sum the counts of each word in a sequence of ``(word, count)`` pairs.

    Args:
        after_mapper: iterable of two-item ``(word, count)`` records, such
            as the list returned by ``mapper``. ``count`` may be an int or
            a numeric string.

    Returns:
        dict mapping each word to the sum of its counts. Records whose
        count cannot be parsed as an integer are skipped.
    """
    dic_result = {}

    for word, count in after_mapper:
        try:
            count = int(count)
        except ValueError:
            # Non-numeric count: ignore this record rather than abort.
            continue

        # Add the record's own count (not a constant 1) so pre-aggregated
        # input with counts greater than 1 is totalled correctly.
        dic_result[word] = dic_result.get(word, 0) + count

    return dic_result
        
"""
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None 
for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t',1)
    try:
        count = int(count)
    except valueError:
        continue
        
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print('%s\t%s' % (current_word, current_count))
            
        current_count = count
        current_word = word
        
    if current_word == word:
        print('%s\t%s') % (current_word, current_count)
        
echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py
"""

'\nfrom operator import itemgetter\nimport sys\ncurrent_word = None\ncurrent_count = 0\nword = None \nfor line in sys.stdin:\n    line = line.strip()\n    word, count = line.split(\'\t\',1)\n    try:\n        count = int(count)\n    except valueError:\n        continue\n        \n    if current_word == word:\n        current_count += count\n    else:\n        if current_word:\n            print(\'%s\t%s\' % (current_word, current_count))\n            \n        current_count = count\n        current_word = word\n        \n    if current_word == word:\n        print(\'%s\t%s\') % (current_word, current_count)\n        \necho "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py\n'

In [7]:
text = "foo foo quux labs foo bar quux"
text

'foo foo quux labs foo bar quux'

In [8]:
after_map = mapper(text)

text is: foo foo quux labs foo bar quux
foo	1
foo	1
quux	1
labs	1
foo	1
bar	1
quux	1


In [10]:
result = reducer(after_map)
result

{'bar': 1, 'foo': 3, 'labs': 1, 'quux': 2}