In [None]:
%load_ext dockermagic

# MRJob

- https://github.com/Yelp/mrjob
- https://mrjob.readthedocs.io/en/stable/

## Local execution

In [None]:
%%dockerexec hadoop

# local install
pip3 install mrjob

In [None]:
%%dockerexec hadoop

mkdir -p /opt/src/mrjob

In [None]:
%%dockerwrite hadoop /opt/src/mrjob/mrwordcount.py

import re
from mrjob.job import MRJob

WORD_RE = re.compile(r"[\w']+")

class MRWordCount(MRJob):
    """Word-frequency job: counts lower-cased tokens matched by ``WORD_RE``."""

    def mapper(self, _, line):
        """Emit ``(token, 1)`` for every ``WORD_RE`` match in ``line``.

        The input key is ignored; tokens are lower-cased before being emitted.
        """
        for token in WORD_RE.findall(line):
            yield token.lower(), 1

    def combiner(self, word, counts):
        """Pre-aggregate the partial counts received for ``word``."""
        partial = sum(counts)
        yield word, partial

    def reducer(self, word, counts):
        """Emit ``(word, total)`` where ``total`` is the sum of ``counts``."""
        total = sum(counts)
        yield word, total

if __name__ == '__main__':
    MRWordCount.run()


In [None]:
%%dockerexec hadoop

# download the book "The History of Don Quixote" by Miguel de Cervantes from Project Gutenberg
wget -q -c http://www.gutenberg.org/files/996/996-0.txt -O donquixote.txt

In [None]:
%%dockerexec hadoop

# inline runner (default)
# python3 /opt/src/mrjob/mrwordcount.py donquixote.txt > donquixote-output.txt

# local runner
python3 /opt/src/mrjob/mrwordcount.py -r local donquixote.txt > donquixote-output.txt

# head output
head donquixote-output.txt

## Cluster execution

### Setup

In [None]:
%%dockerexec hadoop

source /opt/envvars.sh

# install in all hadoop nodes
pdsh -w hadoop1,hadoop2,hadoop3 pip3 install mrjob

In [None]:
%%dockerexec hadoop

source /opt/envvars.sh

# create directory in HDFS and send file
# -p: do not fail when the directory already exists
# -f: overwrite the file if it was uploaded by a previous run
hdfs dfs -mkdir -p donquixote
hdfs dfs -put -f donquixote.txt donquixote

# remove the output of a previous run so the cell can be re-executed
# (-f: no error when the directory does not exist yet)
hdfs dfs -rm -r -f donquixote-output

# run in hadoop
python3 /opt/src/mrjob/mrwordcount.py -r hadoop --output-dir donquixote-output hdfs:///user/hadoop/donquixote

In [None]:
%%dockerexec hadoop

source /opt/envvars.sh

# get output
hdfs dfs -getmerge donquixote-output donquixote-output.txt

# head output
head donquixote-output.txt

## MapReduce patterns

### Datasets

- weblog.csv
- books from Project Gutenberg
- departments.csv and employees.csv

In [None]:
%%bash

# copy datasets used by mrjob examples to hadoop container
docker cp mrjobdataset.tgz hadoop:/opt/src/mrjob

In [None]:
%%dockerexec hadoop

cd /opt/src/mrjob
# unpack dataset
tar -zxf mrjobdataset.tgz

### 1. Count

In [None]:
%%dockerwrite hadoop /opt/src/mrjob/1_count_weblog.py

#Total number of times each page is visited
from mrjob.job import MRJob

class MRURLCount(MRJob):
    """Count how many times each URL appears in a comma-separated web log."""

    def mapper(self, _, line):
        """Emit ``(url, 1)`` for every non-header log line.

        The line is split on commas. Field 0 is the IP; fields 1-4 are read
        as time, request, status and visit. The URL is the second
        space-separated token of the request field.
        """
        fields = line.split(',')

        ip = fields[0].strip()
        # The header row carries the literal column name 'IP' in field 0
        if ip == 'IP':
            return

        # Fields 1-4 are all read (even the unused ones), so a row with
        # fewer than five columns raises IndexError here
        _time, request, _status, _visit = [fields[i].strip() for i in (1, 2, 3, 4)]

        url = request.split(' ')[1]
        yield url, 1

    def reducer(self, key, list_of_values):
        """Emit ``(url, total)`` where ``total`` is the sum of the emitted ones."""
        total = sum(list_of_values)
        yield key, total

if __name__ == '__main__':
    MRURLCount.run()

In [None]:
%%dockerexec hadoop

cd /opt/src/mrjob

python3 1_count_weblog.py weblog.csv 2> /dev/null | head

### 2. Max value

In [None]:
%%dockerwrite hadoop /opt/src/mrjob/2_max_weblog.py

# Return most visited URL
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRURLMax(MRJob):
    """Two-step job that reports the URL with the highest visit count."""

    def mapper1(self, _, line):
        """Emit ``(url, 1)`` for every non-header log line.

        Field 0 is the IP; fields 1-4 are read as time, request, status and
        visit. The URL is the second space-separated token of the request.
        """
        fields = line.split(',')

        ip = fields[0].strip()
        # The header row carries the literal column name 'IP' in field 0
        if ip == 'IP':
            return

        # Fields 1-4 are all read (even the unused ones), so a row with
        # fewer than five columns raises IndexError here
        _time, request, _status, _visit = [fields[i].strip() for i in (1, 2, 3, 4)]

        url = request.split(' ')[1]
        yield url, 1

    def reducer1(self, key, list_of_values):
        """Sum the counts for one URL and emit ``(count, url)`` under key ``None``.

        Using the same ``None`` key for every URL groups all pairs together
        for the second step.
        """
        visits = sum(list_of_values)
        yield None, (visits, key)

    def reducer2(self, key, list_of_values):
        """Yield the largest ``(count, url)`` pair received.

        NOTE(review): the pair is yielded as a single object rather than as
        ``key, value``; presumably the framework unpacks it into
        (count, url) -- confirm against the mrjob documentation.
        """
        best = max(list_of_values)
        yield best

    def steps(self):
        """Chain step 1 (mapper1 + reducer1) with step 2 (reducer2 only)."""
        count_step = MRStep(mapper=self.mapper1, reducer=self.reducer1)
        max_step = MRStep(reducer=self.reducer2)
        return [count_step, max_step]

if __name__ == '__main__' :
    MRURLMax.run()

In [None]:
%%dockerexec hadoop

cd /opt/src/mrjob

python3 2_max_weblog.py weblog.csv 2> /dev/null

### 3. Average

In [None]:
%%dockerwrite hadoop /opt/src/mrjob/3_average_weblog.py

#Average visit time
from mrjob.job import MRJob

class MRAvgVisitTime(MRJob):
    """Compute the average visit value per URL from a comma-separated web log."""

    def mapper(self, _, line):
        """Emit ``(url, visit)`` for every non-header log line.

        Field 0 is the IP; fields 1-4 are read as time, request, status and
        visit. The visit field is converted to ``float``. The URL is the
        second space-separated token of the request field.
        """
        fields = line.split(',')

        ip = fields[0].strip()
        # The header row carries the literal column name 'IP' in field 0
        if ip == 'IP':
            return

        # Fields 1-4 are all read (even the unused ones), so a row with
        # fewer than five columns raises IndexError here
        _time, request, _status, raw_visit = [fields[i].strip() for i in (1, 2, 3, 4)]
        visit = float(raw_visit)

        url = request.split(' ')[1]
        yield url, visit

    def reducer(self, key, list_of_values):
        """Emit ``(url, average)`` with the average formatted to two decimals.

        The average is emitted as a string, not a float.
        """
        seen = 0
        running_total = 0.0
        # Values are accumulated one by one, in arrival order
        for seen, value in enumerate(list_of_values, start=1):
            running_total = running_total + value

        yield key, "{:.2f}".format(running_total / seen)

if __name__ == '__main__':
    MRAvgVisitTime.run()

In [None]:
%%dockerexec hadoop

cd /opt/src/mrjob

python3 3_average_weblog.py weblog.csv 2> /dev/null | head

### 4. Top N

In [None]:
%%dockerwrite hadoop /opt/src/mrjob/4_topn_weblog.py

#Top 3 visited pages
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRTopN(MRJob):
    """Two-step job that reports the three most frequently visited URLs."""

    def mapper(self, _, line):
        """Emit ``(url, 1)`` for every non-header log line.

        Field 0 is the IP; fields 1-4 are read as time, request, status and
        visit. The URL is the second space-separated token of the request.
        """
        fields = line.split(',')

        ip = fields[0].strip()
        # The header row carries the literal column name 'IP' in field 0
        if ip == 'IP':
            return

        # Fields 1-4 are all read (even the unused ones), so a row with
        # fewer than five columns raises IndexError here
        _time, request, _status, _visit = [fields[i].strip() for i in (1, 2, 3, 4)]

        url = request.split(' ')[1]
        yield url, 1

    def reducer1(self, key, list_of_values):
        """Sum the counts for one URL and emit ``(count, url)`` under key ``None``.

        Using the same ``None`` key for every URL groups all pairs together
        for the second step.
        """
        yield None, (sum(list_of_values), key)

    def reducer2(self, _, list_of_values):
        """Return the ``N`` largest ``(count, url)`` pairs, largest first.

        NOTE(review): this returns a list instead of yielding; presumably the
        framework iterates over it and treats each pair as (key, value) --
        confirm against the mrjob documentation.
        """
        N = 3
        ranked = sorted(list_of_values, reverse=True)
        return ranked[:N]

    def steps(self):
        """Chain step 1 (mapper + reducer1) with step 2 (reducer2 only)."""
        count_step = MRStep(mapper=self.mapper, reducer=self.reducer1)
        top_step = MRStep(reducer=self.reducer2)
        return [count_step, top_step]

if __name__ == '__main__':
    MRTopN.run()

In [None]:
%%dockerexec hadoop

cd /opt/src/mrjob

python3 4_topn_weblog.py weblog.csv 2> /dev/null

### 5. Filter

In [None]:
%%dockerwrite hadoop /opt/src/mrjob/5_filter_weblog.py

#Filter accesses to "/login.php?value=fail" on Feb/2018
from mrjob.job import MRJob

class MRFilter(MRJob):
    """Map-only job keeping accesses to "/login.php?value=fail" dated "Feb/2018"."""

    def mapper(self, _, line):
        """Emit ``(url, (time, ip, visit))`` for log lines that pass the filter.

        Field 0 is the IP; fields 1-4 are read as time, request, status and
        visit. The URL is the second space-separated token of the request.
        """
        fields = line.split(',')

        ip = fields[0].strip()
        # The header row carries the literal column name 'IP' in field 0
        if ip == 'IP':
            return

        # Fields 1-4 are all read (even the unused ones), so a row with
        # fewer than five columns raises IndexError here
        time, request, _status, visit = [fields[i].strip() for i in (1, 2, 3, 4)]

        url = request.split(' ')[1]

        # Characters 4..11 of the time field are compared against the
        # month/year string below
        month_year = time[4:12]

        wanted_url = url == "/login.php?value=fail"
        wanted_date = month_year == "Feb/2018"
        if wanted_url and wanted_date:
            yield url, (time, ip, visit)

if __name__ == '__main__':
    MRFilter.run()

In [None]:
%%dockerexec hadoop

cd /opt/src/mrjob

python3 5_filter_weblog.py weblog.csv 2> /dev/null

### 6. Distinct

In [None]:
%%dockerwrite hadoop /opt/src/mrjob/6_distinct_weblog.py

#Distinct IPs
from mrjob.job import MRJob

class MRDistinct(MRJob):
    """List each distinct IP found in a comma-separated web log."""

    def mapper(self, _, line):
        """Emit ``(ip, None)`` for every non-header log line."""
        fields = line.split(',')

        ip = fields[0].strip()
        # The header row carries the literal column name 'IP' in field 0
        if ip == 'IP':
            return

        # Fields 1-4 are still read although unused, so a row with fewer
        # than five columns raises IndexError here
        _time, _request, _status, _visit = [fields[i].strip() for i in (1, 2, 3, 4)]

        yield ip, None

    def reducer(self, key, list_of_values):
        """Emit each IP once; the grouped values are ignored."""
        yield key, None

if __name__ == '__main__':
    MRDistinct.run()

In [None]:
%%dockerexec hadoop

cd /opt/src/mrjob

python3 6_distinct_weblog.py weblog.csv 2> /dev/null

### 7. Binning

In [None]:
%%dockerwrite hadoop /opt/src/mrjob/7_binning_weblog.py

#Create bins for different status codes for 20/Feb/2018
from mrjob.job import MRJob

class MRBinning(MRJob):
    """Group the accesses dated "20/Feb/2018" into one bin per status code."""

    def mapper(self, _, line):
        """Emit ``(status, (time, request, ip))`` for lines on the target date.

        Field 0 is the IP; fields 1-4 are read as time, request, status and
        visit.
        """
        fields = line.split(',')

        ip = fields[0].strip()
        # The header row carries the literal column name 'IP' in field 0
        if ip == 'IP':
            return

        # Fields 1-4 are all read (even the unused ones), so a row with
        # fewer than five columns raises IndexError here
        time, request, status, _visit = [fields[i].strip() for i in (1, 2, 3, 4)]

        # Characters 1..11 of the time field are compared against the
        # day/month/year string below
        day = time[1:12]
        if day != "20/Feb/2018":
            return

        yield status, (time, request, ip)

    def reducer(self, key, list_of_values):
        """Emit ``(status, entries)`` with every entry of the bin in one list."""
        entries = list(list_of_values)
        yield key, entries

if __name__ == '__main__':
    MRBinning.run()

In [None]:
%%dockerexec hadoop

cd /opt/src/mrjob

python3 7_binning_weblog.py weblog.csv 2> /dev/null

### 8. Inverted index

In [None]:
%%dockerwrite hadoop /opt/src/mrjob/8_invertedindex_books.py

#Inverted Index
from mrjob.job import MRJob
import os

class MRInvertedIndex(MRJob):
    """Build an inverted index: each word maps to the input files containing it."""

    def mapper(self, _, line):
        """Emit ``(word, file_name)`` for every whitespace-separated word.

        The file name is read from the ``mapreduce_map_input_file``
        environment variable; a missing variable raises ``KeyError``.
        """
        source_file = os.environ['mapreduce_map_input_file']

        for token in line.split():
            yield token, source_file

    def reducer(self, key, list_of_values):
        """Emit ``(word, files)`` with duplicate file names removed.

        The list is built from a set, so its order is not specified.
        """
        unique_files = set(list_of_values)
        yield key, list(unique_files)

if __name__ == '__main__':
    MRInvertedIndex.run()

In [None]:
%%dockerexec hadoop

cd /opt/src/mrjob

python3 8_invertedindex_books.py books 2> /dev/null | head -n 40

### 9. Sort

In [None]:
%%dockerwrite hadoop /opt/src/mrjob/9_sort_weblog.py

# Sort visit times in descending order
from mrjob.job import MRJob
class MRSortVisit(MRJob):
    """Sort the log entries by visit value, largest first."""

    def mapper(self, _, line):
        """Emit ``(None, (visit, (time, url, ip)))`` for every non-header line.

        Field 0 is the IP; fields 1-4 are read as time, request, status and
        visit. The URL is the second space-separated token of the request.
        Every record shares the key ``None`` so a single reducer call
        receives all of them.
        """
        fields = line.split(',')

        ip = fields[0].strip()
        # The header row carries the literal column name 'IP' in field 0
        if ip == 'IP':
            return

        # Fields 1-4 are all read (even the unused ones), so a row with
        # fewer than five columns raises IndexError here
        time, request, _status, visit = [fields[i].strip() for i in (1, 2, 3, 4)]

        url = request.split(' ')[1]

        yield None, (visit, (time, url, ip))

    def reducer(self, key, list_of_values):
        """Return ``(visit, content)`` pairs sorted in descending order.

        The visit value is converted to ``float`` before sorting.

        NOTE(review): this returns a list instead of yielding; presumably the
        framework iterates over it and treats each pair as (key, value) --
        confirm against the mrjob documentation.
        """
        return sorted(
            ((float(visit), content) for visit, content in list_of_values),
            reverse=True,
        )

if __name__ == '__main__':
    MRSortVisit.run()

In [None]:
%%dockerexec hadoop

cd /opt/src/mrjob

python3 9_sort_weblog.py weblog.csv 2> /dev/null | head -n 20

### 10. Joins

#### InnerJoin

In [None]:
%%dockerwrite hadoop /opt/src/mrjob/10_innerjoin_db.py

from mrjob.job import MRJob
import os

class MRInnerJoin(MRJob):
    """Inner join of employees.csv and departments.csv on the department number."""

    def mapper(self, _, line):
        """Tag each row with its source table and key it by department number.

        The source is decided from the ``mapreduce_map_input_file``
        environment variable. The department number is field 2 of an
        employee row and field 0 of a department row. Rows from any other
        file are dropped.
        """
        fields = line.split(',')

        source_file = os.environ['mapreduce_map_input_file']

        if 'employees.csv' in source_file:
            yield fields[2], ('Employee', fields)
        elif 'departments.csv' in source_file:
            yield fields[0], ('Department', fields)

    def reducer(self, key, list_of_values):
        """Emit one joined record per (employee, department) pair for ``key``.

        Keys that have rows from only one table produce no output.
        """
        rows = list(list_of_values)
        employees = [row for row in rows if row[0] == 'Employee']
        departments = [row for row in rows if row[0] == 'Department']

        # Inner Join: the cross product is empty when either side is empty
        for emp in employees:
            for dep in departments:
                yield key, (emp + dep)

if __name__ == '__main__' :
    MRInnerJoin.run()

In [None]:
%%dockerexec hadoop

cd /opt/src/mrjob

python3 10_innerjoin_db.py employees.csv departments.csv 2> /dev/null | head

#### LeftOuterJoin

In [None]:
%%dockerwrite hadoop /opt/src/mrjob/11_leftouterjoin_db.py

from mrjob.job import MRJob
import os

class MRLeftOuterJoin(MRJob):
    """Left outer join (employees on the left) on the department number."""

    def mapper(self, _, line):
        """Tag each row with its source table and key it by department number.

        The source is decided from the ``mapreduce_map_input_file``
        environment variable. The department number is field 2 of an
        employee row and field 0 of a department row. Rows from any other
        file are dropped.
        """
        fields = line.split(',')

        source_file = os.environ['mapreduce_map_input_file']

        if 'employees.csv' in source_file:
            yield fields[2], ('Employee', fields)
        elif 'departments.csv' in source_file:
            yield fields[0], ('Department', fields)

    def reducer(self, key, list_of_values):
        """Emit every employee, joined with each matching department if any.

        An employee whose key has no department row is emitted on its own.
        """
        rows = list(list_of_values)
        employees = [row for row in rows if row[0] == 'Employee']
        departments = [row for row in rows if row[0] == 'Department']

        # Left Outer Join
        for emp in employees:
            if not departments:
                # No matching department: keep the employee unjoined
                yield key, (emp)
                continue
            for dep in departments:
                yield key, (emp + dep)

if __name__ == '__main__' :
    MRLeftOuterJoin.run()

In [None]:
%%dockerexec hadoop

cd /opt/src/mrjob

python3 11_leftouterjoin_db.py employees.csv departments.csv 2> /dev/null | head

#### RightOuterJoin

In [None]:
%%dockerwrite hadoop /opt/src/mrjob/12_rightouterjoin_db.py

from mrjob.job import MRJob
import os

class MRRightOuterJoin(MRJob):
    """Right outer join (departments on the right) on the department number."""

    def mapper(self, _, line):
        """Tag each row with its source table and key it by department number.

        The source is decided from the ``mapreduce_map_input_file``
        environment variable. The department number is field 2 of an
        employee row and field 0 of a department row. Rows from any other
        file are dropped.
        """
        fields = line.split(',')

        source_file = os.environ['mapreduce_map_input_file']

        if 'employees.csv' in source_file:
            yield fields[2], ('Employee', fields)
        elif 'departments.csv' in source_file:
            yield fields[0], ('Department', fields)

    def reducer(self, key, list_of_values):
        """Emit every department, joined with each matching employee if any.

        A department whose key has no employee row is emitted on its own.
        """
        rows = list(list_of_values)
        employees = [row for row in rows if row[0] == 'Employee']
        departments = [row for row in rows if row[0] == 'Department']

        # Right Outer Join
        for dep in departments:
            if not employees:
                # No matching employee: keep the department unjoined
                yield key, (dep)
                continue
            for emp in employees:
                yield key, (emp + dep)

if __name__ == '__main__' :
    MRRightOuterJoin.run()

In [None]:
%%dockerexec hadoop

cd /opt/src/mrjob

python3 12_rightouterjoin_db.py employees.csv departments.csv 2> /dev/null | head

#### FullOuterJoin

In [None]:
%%dockerwrite hadoop /opt/src/mrjob/13_fullouterjoin_db.py

from mrjob.job import MRJob
import os

class MRFullOuterJoin(MRJob):
    """Full outer join of employees.csv and departments.csv on the department number."""

    def mapper(self, _, line):
        """Tag each row with its source table and key it by department number.

        The source is decided from the ``mapreduce_map_input_file``
        environment variable. The department number is field 2 of an
        employee row and field 0 of a department row. Rows from any other
        file are dropped.
        """
        data = line.split(',')

        filename = os.environ['mapreduce_map_input_file']

        if 'employees.csv' in filename:
            dep_no = data[2]
            yield dep_no, ('Employee', data)
        elif 'departments.csv' in filename:
            dep_no = data[0]
            yield dep_no, ('Department', data)

    def reducer(self, key, list_of_values):
        """Emit joined records, keeping unmatched rows from both tables.

        - employee with matching departments: one record per pair
        - employee without a department: the employee alone
        - department without employees: the department alone
        """
        employees = []
        departments = []
        for v in list_of_values:
            if v[0] == 'Employee':
                employees.append(v)
            elif v[0] == 'Department':
                departments.append(v)

        # Full Outer Join
        if employees:
            for e in employees:
                if departments:
                    for d in departments:
                        yield key, (e + d)
                else:
                    yield key, e
        else:
            # Department-only key: emit each department under the join key.
            # The loop variable must be bound here; it is never set by the
            # branch above when there are no employees.
            for d in departments:
                yield key, d

if __name__ == '__main__' :
    MRFullOuterJoin.run()

In [None]:
%%dockerexec hadoop

cd /opt/src/mrjob

python3 13_fullouterjoin_db.py employees.csv departments.csv 2> /dev/null | head