<center><img src='https://netacad.centralesupelec.fr/img/cs.jpg' width=200></center>

<h6><center><b>Big data algorithms, techniques and platforms</b></center></h6>

<h1>
<hr style=" border:none; height:3px;">
<center>MapReduce programming in Python</center>
<hr style=" border:none; height:3px;">
</h1>


This notebook introduces MapReduce programming in Python.

* You'll use **Python** as a programming language.

* You'll use the library **mrjob** that lets you write MapReduce jobs in Python.



---


In order to install **mrjob**, execute the following cell.


---



In [1]:
!pip install mrjob

Collecting mrjob

## How to use *mrjob*

In this section, we show an example of how to use *mrjob* to implement MapReduce jobs.
For this purpose, we implement a MapReduce job to count the number of occurrences of each word in a text document.
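
Before turning to *mrjob*, it may help to see the three phases of a word count in plain Python. The sketch below is not how *mrjob* works internally; the function names `map_phase`, `shuffle` and `reduce_phase` are made up for illustration, and everything runs in a single process.

```python
from collections import defaultdict

def map_phase(lines):
    # Map: emit one (word, 1) pair per word occurrence.
    for line in lines:
        for word in line.split():
            yield word, 1

def shuffle(pairs):
    # Shuffle: group all the values that share the same key.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups):
    # Reduce: collapse each group of values into a single count.
    return {word: sum(values) for word, values in groups.items()}

lines = ["a b a", "b c"]
word_counts = reduce_phase(shuffle(map_phase(lines)))
print(word_counts)
```

In a real MapReduce framework the shuffle is done for you, so you only write the map and reduce functions.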




---

**Read the code and the comments** in the following cell to understand how to write a MapReduce job with *mrjob*.


---






In [2]:
%%file wordcount.py
# The %%file cell magic on the first line of this cell writes the rest
# of the cell to a file named wordcount.py when the cell is executed.

# We import the class MRJob from the library.
from mrjob.job import MRJob

# We define a custom class that represents our 
# specific MapReduce job. 
# This class inherits from MRJob.
class WordCount(MRJob):

    # We define the map function (here called mapper).
    def mapper(self, _, line): 
        '''
        Parameters
        ----------
        self : this object 
          (just ignore it if you're not familiar with object-oriented languages)

        key: Map takes in a (key, value) pair. We don't use here the key (hence we have _).

        line: A line of the input file.
        '''

        # We loop over the words of the line.
        for word in line.split():
            # For each word in the line, we output (word, 1).
            # yield emits a pair like return does, but the function then
            # resumes where it left off, so the for loop keeps going.
            yield (word, 1)

    # We define the combine function, here called combiner.
    def combiner(self, word, values):
        '''
        Parameters
        ----------
         self: this object
         word: a word (the key)
         values: The list of values associated with the key (here [1, ...., 1])

        '''
        # Technically, values is an iterator.
        # We convert it into a list. This is useful if we want to iterate 
        # over the list many times.
        l = list(values)

        # We return (word, sum over the values)
        yield(word, sum(l))
  
    # We define the reduce function, here called reducer
    def reducer(self, word, counts):
        '''
        Parameters
        ----------
          self: this object.
          word: the key.
          counts: the list of the counts associated with the key.
        '''
        l = list(counts)
        yield(word, sum(l))

# The entry point of the program.
if __name__ == '__main__': 
    # Start the MapReduce job.
    WordCount.run()

Writing wordcount.py
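
The comments above mention two points that are easy to miss: `yield` does not end the function, and an iterator can only be traversed once. The following stand-alone sketch illustrates both; `emit_pairs` is a hypothetical helper that mimics the mapper outside of *mrjob*.

```python
def emit_pairs(line):
    # Each yield hands one pair to the caller; execution then resumes
    # at the next loop iteration instead of leaving the function.
    for word in line.split():
        yield word, 1

pairs = emit_pairs("to be or not to be")  # a generator; nothing has run yet
first_pass = list(pairs)    # consumes the generator
second_pass = list(pairs)   # the generator is exhausted, so this is empty
print(first_pass)
print(second_pass)
```

This is why the combiner and the reducer convert their values to a list when they need to go over them more than once.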


We create a toy text file in order to try our implementation.



---
**Execute the following cell** to create the file *test.txt*.


---




In [3]:
%%file test.txt
CentraleSupélec (CS) is a prestigious French institute of 
research and higher education in engineering and science 
and a graduate school of Paris-Saclay University. 
It is a key founding member of the Paris-Saclay University, 
the TIME (Top Industrial Managers for Europe) network 
and also the CESAER association of European engineering schools.

Writing test.txt



---
**Execute the following cell** to run the MapReduce job on the file *test.txt*

---



In [4]:
!python wordcount.py -r local test.txt

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/wordcount.root.20210110.074759.230026
Running step 1 of 1...
job output is in /tmp/wordcount.root.20210110.074759.230026/output
Streaming final output from /tmp/wordcount.root.20210110.074759.230026/output...
"network"	1
"of"	4
"prestigious"	1
"research"	1
"school"	1
"schools."	1
"science"	1
"the"	3
"(CS)"	1
"(Top"	1
"CESAER"	1
"CentraleSup\u00e9lec"	1
"Europe)"	1
"European"	1
"French"	1
"Industrial"	1
"It"	1
"Managers"	1
"Paris-Saclay"	2
"TIME"	1
"University,"	1
"University."	1
"a"	3
"also"	1
"and"	4
"association"	1
"education"	1
"engineering"	2
"for"	1
"founding"	1
"graduate"	1
"higher"	1
"in"	1
"institute"	1
"is"	2
"key"	1
"member"	1
Removing temp directory /tmp/wordcount.root.20210110.074759.230026...




---



With this example you have all the ingredients to implement your own MapReduce jobs.

For more information and examples, see the [mrjob documentation](https://mrjob.readthedocs.io/en/latest/guides/writing-mrjobs.html).



---






## Exercise 1: Counting even and odd numbers

The following file *numbers.txt* is given, where each line contains a space-separated list of integers.



---
**Execute the following cell** to create the file *numbers.txt*.


---




In [5]:
%%file numbers.txt
1 2 3 3
3 4 4
5 6 7
9 8 7 4

Writing numbers.txt




---


**Implement a MapReduce job that counts the number of even and odd numbers in the file.**


---



In [6]:
%%file oddeven.py

from mrjob.job import MRJob

class OddEven(MRJob):
    def mapper(self, _, line): 
        for num in line.split():
            if int(num) % 2 == 0:
                yield("even", 1)
            else:
                yield("odd", 1)
  
    def reducer(self, parity, counts):
        # parity is either "even" or "odd"; counts iterates over the 1s emitted for it.
        yield(parity, sum(counts))

if __name__ == '__main__': 
    OddEven.run()

Writing oddeven.py




---
**Execute the following cell to test your implementation.**


---




In [7]:
!python oddeven.py -r local numbers.txt

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/oddeven.root.20210110.074838.251520
Running step 1 of 1...
job output is in /tmp/oddeven.root.20210110.074838.251520/output
Streaming final output from /tmp/oddeven.root.20210110.074838.251520/output...
"even"	6
"odd"	8
Removing temp directory /tmp/oddeven.root.20210110.074838.251520...
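
As a sanity check, the same counts can be computed without MapReduce. This is only a sketch for verifying the result on a small input: the content of *numbers.txt* is copied into a string, and `parity_counts` is a name chosen for illustration.

```python
from collections import Counter

# Content of numbers.txt, copied here so that the check is self-contained.
numbers_text = """1 2 3 3
3 4 4
5 6 7
9 8 7 4"""

# Label every number as "even" or "odd" and count the labels.
parity_counts = Counter(
    "even" if int(token) % 2 == 0 else "odd"
    for token in numbers_text.split()
)
print(dict(parity_counts))
```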


## Exercise 2: Computing the average

The following file *temperatures.txt* is given, where each line has the form `year,month,temperature` and gives the average temperature of one month. Two years are recorded (1980 and 1981).


---
**Execute the following cell to create the file**


---







In [8]:
%%file temperatures.txt
1980,1,5
1980,2,2
1980,3,10
1980,4,14
1980,5,17
1980,6,22
1980,7,28
1980,8,30
1980,9,21
1980,10,15
1980,11,4
1980,12,1
1981,1,2
1981,2,1
1981,3,-3
1981,4,3
1981,5,10
1981,6,26
1981,7,20
1981,8,22
1981,9,28
1981,10,4
1981,11,-2
1981,12,-4

Writing temperatures.txt




---
**Implement a MapReduce job to get the average monthly temperature for each year.**



---




In [9]:
%%file temperatures.py

from mrjob.job import MRJob

class AvgTemperatures(MRJob):
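    def mapper(self, _, line): 
        # Each line has the form year,month,temperature.
        value = line.split(",")
        # Emit a (sum, count) pair so that the mapper, the combiner and the
        # reducer all exchange values of the same shape. Hadoop may run the
        # combiner zero or more times, and the job stays correct either way.
        yield (value[0], (float(value[2]), 1))
  
    def combiner(self, year, values):
        s = 0
        l = 0
        for v in values:
            s += v[0]
            l += v[1]
        yield (year, (s, l))

    def reducer(self, year, values):
        s = 0
        l = 0
        for v in values:
            s += v[0]
            l += v[1]
        yield (year, s/l)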
    

if __name__ == '__main__': 
    AvgTemperatures.run()


Writing temperatures.py




---
**Execute the following cell to test your implementation.**


---




In [10]:
!python temperatures.py -r local temperatures.txt

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/temperatures.root.20210110.074853.482298
Running step 1 of 1...
job output is in /tmp/temperatures.root.20210110.074853.482298/output
Streaming final output from /tmp/temperatures.root.20210110.074853.482298/output...
"1980"	14.083333333333334
"1981"	8.916666666666666
Removing temp directory /tmp/temperatures.root.20210110.074853.482298...
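
Note that the combiner forwards a (sum, count) pair rather than a partial average. The sketch below suggests why: an average of averages is generally wrong when the partial groups have different sizes. The split of the 1980 temperatures into `chunk_a` and `chunk_b` is hypothetical and only serves the illustration.

```python
# Hypothetical split of the 1980 temperatures across two map tasks.
chunk_a = [5, 2, 10, 14, 17, 22, 28, 30]  # 8 values
chunk_b = [21, 15, 4, 1]                  # 4 values

def mean(values):
    return sum(values) / len(values)

# Wrong: averaging the per-chunk averages ignores the chunk sizes.
mean_of_means = mean([mean(chunk_a), mean(chunk_b)])

# Right: add up the (sum, count) pairs and divide once at the end.
partials = [(sum(chunk_a), len(chunk_a)), (sum(chunk_b), len(chunk_b))]
total = sum(s for s, _ in partials)
count = sum(c for _, c in partials)
true_mean = total / count

print(mean_of_means, true_mean)
```

Here the average of averages is 13.125, whereas the true yearly average is 169/12, the value reported by the job.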


## Exercise 3: Inverted index

Suppose that we have a collection of documents (e.g., books);
we want to create an **inverted index** that maps each word to the list of documents in which it occurs.

As input, we use the file *books.json*. 
Each line of this file is a JSON array with two elements: the name of a file (the title of a book) and the content of that file. 
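
To make the target structure concrete, here is a minimal sketch of an inverted index built in plain Python, without MapReduce. The two documents and their names (`doc1.txt`, `doc2.txt`) are made up for the example.

```python
from collections import defaultdict

# Two tiny made-up documents standing in for the books.
documents = {
    "doc1.txt": "the light of the morning",
    "doc2.txt": "the evening light",
}

# For every word, collect the set of documents that contain it.
inverted_index = defaultdict(set)
for name, content in documents.items():
    for word in content.split():
        inverted_index[word].add(name)

# Turn the sets into sorted lists to get a stable, readable result.
index = {word: sorted(names) for word, names in inverted_index.items()}
print(index["light"])
print(index["morning"])
```

The word "light" maps to both documents, whereas "morning" maps only to `doc1.txt`.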


---


**Execute the following cell to create the file books.json**


---



In [11]:
%%file books.json
["milton-paradise.txt", "[ Paradise Lost by John Milton 1667 ] Book I Of Man ' s first disobedience , and the fruit Of that forbidden tree whose mortal taste Brought death into the World , and all our woe , With loss of Eden , till one greater Man Restore us , and regain the blissful seat , Sing , Heavenly Muse , that , on the secret top Of Oreb , or of Sinai , didst inspire That shepherd who first taught the chosen seed In the beginning how the heavens and earth Rose out of Chaos : or , if Sion hill Delight thee more , and Siloa ' s brook that flowed Fast by the oracle of God , I thence Invoke thy aid to my adventurous song , That with no middle flight intends to soar Above th ' Aonian mount , while it pursues Things unattempted yet in prose or rhyme ."]
["edgeworth-parents.txt", "[ The Parent ' s Assistant , by Maria Edgeworth ] THE ORPHANS . Near the ruins of the castle of Rossmore , in Ireland , is a small cabin , in which there once lived a widow and her four children . As long as she was able to work , she was very industrious , and was accounted the best spinner in the parish ; but she overworked herself at last , and fell ill , so that she could not sit to her wheel as she used to do , and was obliged to give it up to her eldest daughter , Mary ."]
["austen-emma.txt", "[ Emma by Jane Austen 1816 ] VOLUME I CHAPTER I Emma Woodhouse , handsome , clever , and rich , with a comfortable home and happy disposition , seemed to unite some of the best blessings of existence ; and had lived nearly twenty - one years in the world with very little to distress or vex her . She was the youngest of the two daughters of a most affectionate , indulgent father ; and had , in consequence of her sister ' s marriage , been mistress of his house from a very early period . Her mother had died too long ago for her to have more than an indistinct remembrance of her caresses ; and her place had been supplied by an excellent woman as governess , who had fallen little short of a mother in affection ."]
["chesterton-ball.txt", "[ The Ball and The Cross by G . K . Chesterton 1909 ] I . A DISCUSSION SOMEWHAT IN THE AIR The flying ship of Professor Lucifer sang through the skies like a silver arrow ; the bleak white steel of it , gleaming in the bleak blue emptiness of the evening . That it was far above the earth was no expression for it ; to the two men in it , it seemed to be far above the stars . The professor had himself invented the flying machine , and had also invented nearly everything in it ."]
["bible-kjv.txt", "[ The King James Bible ] The Old Testament of the King James Bible The First Book of Moses : Called Genesis 1 : 1 In the beginning God created the heaven and the earth . 1 : 2 And the earth was without form , and void ; and darkness was upon the face of the deep . And the Spirit of God moved upon the face of the waters . 1 : 3 And God said , Let there be light : and there was light . 1 : 4 And God saw the light , that it was good : and God divided the light from the darkness . 1 : 5 And God called the light Day , and the darkness he called Night . And the evening and the morning were the first day ."]
["chesterton-thursday.txt", "[ The Man Who Was Thursday by G . K . Chesterton 1908 ] To Edmund Clerihew Bentley A cloud was on the mind of men , and wailing went the weather , Yea , a sick cloud upon the soul when we were boys together . Science announced nonentity and art admired decay ; The world was old and ended : but you and I were gay ; Round us in antic order their crippled vices came -- Lust that had lost its laughter , fear that had lost its shame . Like the white lock of Whistler , that lit our aimless gloom , Men showed their own white feather as proudly as a plume . Life was a fly that faded , and death a drone that stung ; The world was very old indeed when you and I were young ."]
["blake-poems.txt", "[ Poems by William Blake 1789 ] SONGS OF INNOCENCE AND OF EXPERIENCE and THE BOOK of THEL SONGS OF INNOCENCE INTRODUCTION Piping down the valleys wild , Piping songs of pleasant glee , On a cloud I saw a child , And he laughing said to me : \" Pipe a song about a Lamb !\" So I piped with merry cheer . \" Piper , pipe that song again ;\" So I piped : he wept to hear . \" Drop thy pipe , thy happy pipe ; Sing thy songs of happy cheer :!\" So I sang the same again , While he wept with joy to hear . \" Piper , sit thee down and write In a book , that all may read .\" So he vanish ' d from my sight ; And I pluck ' d a hollow reed , And I made a rural pen , And I stain ' d the water clear , And I wrote my happy songs Every child may joy to hear ."]
["shakespeare-caesar.txt", "[ The Tragedie of Julius Caesar by William Shakespeare 1599 ] Actus Primus . Scoena Prima . Enter Flauius , Murellus , and certaine Commoners ouer the Stage . Flauius . Hence : home you idle Creatures , get you home : Is this a Holiday ? What , know you not ( Being Mechanicall ) you ought not walke Vpon a labouring day , without the signe Of your Profession ? Speake , what Trade art thou ? Car . Why Sir , a Carpenter Mur . Where is thy Leather Apron , and thy Rule ? What dost thou with thy best Apparrell on ? You sir , what Trade are you ? Cobl . Truely Sir , in respect of a fine Workman , I am but as you would say , a Cobler Mur . But what Trade art thou ? Answer me directly Cob . A Trade Sir , that I hope I may vse , with a safe Conscience , which is indeed Sir , a Mender of bad soules Fla ."]
["whitman-leaves.txt", "[ Leaves of Grass by Walt Whitman 1855 ] Come , said my soul , Such verses for my Body let us write , ( for we are one ,) That should I after return , Or , long , long hence , in other spheres , There to some group of mates the chants resuming , ( Tallying Earth ' s soil , trees , winds , tumultuous waves ,) Ever with pleas ' d smile I may keep on , Ever and ever yet the verses owning -- as , first , I here and now Signing for Soul and Body , set to them my name , Walt Whitman [ BOOK I . INSCRIPTIONS ] } One ' s - Self I Sing One ' s - self I sing , a simple separate person , Yet utter the word Democratic , the word En - Masse ."]
["melville-moby_dick.txt", "[ Moby Dick by Herman Melville 1851 ] ETYMOLOGY . ( Supplied by a Late Consumptive Usher to a Grammar School ) The pale Usher -- threadbare in coat , heart , body , and brain ; I see him now . He was ever dusting his old lexicons and grammars , with a queer handkerchief , mockingly embellished with all the gay flags of all the known nations of the world . He loved to dust his old grammars ; it somehow mildly reminded him of his mortality ."]

Writing books.json




---


**Write a MapReduce job to create an inverted index.**


---



**Hint.** Given a line of this file, *json.loads(line)* returns a two-element list, where the first element is the name of a document (e.g., milton-paradise.txt) and the second element is the content of that document. 
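
As a quick illustration of the hint, the sketch below parses a single line with the standard `json` module. The line itself is a shortened, made-up record in the same shape as the lines of *books.json*.

```python
import json

# A made-up line with the same shape as the lines of books.json.
line = '["example.txt", "In the beginning"]'

# json.loads turns the JSON array into a Python list of two strings.
name, content = json.loads(line)
words = content.split()
print(name)
print(words)
```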

In [12]:
%%file inverted_index.py

from mrjob.job import MRJob
import json

import re

regex = re.compile('[^a-zA-Z ]')
stopwords = ["i","me","my","myself","we","our","ours","ourselves","you","your","yours","yourself","yourselves","he","him","his","himself","she","her","hers","herself","it","its","itself","they","them","their","theirs","themselves","what","which","who","whom","this","that","these","those","am","is","are","was","were","be","been","being","have","has","had","having","do","does","did","doing","a","an","the","and","but","if","or","because","as","until","while","of","at","by","for","with","about","against","between","into","through","during","before","after","above","below","to","from","up","down","in","out","on","off","over","under","again","further","then","once","here","there","when","where","why","how","all","any","both","each","few","more","most","other","some","such","no","nor","not","only","own","same","so","than","too","very","s","t","can","will","just","don","should","now","like","upon","would","through","yet","still","thou","may","could","never","almost","ever","even","might","among","without","let"]

def preprocess(word):
    word = regex.sub('', word)
    if len(word) == 0:
        return None
    word = word.lower()
    if word in stopwords:
        return None
    return word

class InvertedIndex(MRJob):
    def mapper(self, _, line): 
        record = json.loads(line)
        for word in record[1].split():
            word = preprocess(word)
            if word is not None:
                yield(word, record[0])
  
    def reducer(self, word, files):
        # set() removes duplicate file names; the order of the resulting list is arbitrary.
        files_nodup = list(set(files))
        yield(word, files_nodup)

if __name__ == '__main__': 
    InvertedIndex.run()

Writing inverted_index.py


---
**Execute the following cell to test your implementation.**


---


In [13]:
!python inverted_index.py -r local books.json

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/inverted_index.root.20210110.080310.691190
Running step 1 of 1...
job output is in /tmp/inverted_index.root.20210110.080310.691190/output
Streaming final output from /tmp/inverted_index.root.20210110.080310.691190/output...
"signing"	["whitman-leaves.txt"]
"siloa"	["milton-paradise.txt"]
"silver"	["chesterton-ball.txt"]
"simple"	["whitman-leaves.txt"]
"sinai"	["milton-paradise.txt"]
"sing"	["whitman-leaves.txt", "milton-paradise.txt", "blake-poems.txt"]
"sion"	["milton-paradise.txt"]
"sir"	["shakespeare-caesar.txt"]
"sister"	["austen-emma.txt"]
"sit"	["edgeworth-parents.txt", "blake-poems.txt"]
"skies"	["chesterton-ball.txt"]
"small"	["edgeworth-parents.txt"]
"smile"	["whitman-leaves.txt"]
"soar"	["milton-paradise.txt"]
"soil"	["whitman-leaves.txt"]
"somehow"	["melville-moby_dick.txt"]
"somewhat"	["chesterton-ball.txt"]
"song"	["milton-paradise.txt", "blake-poems.txt"

## Exercise 4: Matrix multiplication

We have two matrices $A$ ($n$ rows and $m$ columns) 
and $B$ ($m$ rows and $p$ columns). 
The two matrices are stored in a text file; each line contains, separated by commas: 

* the identifier of the matrix ($A$ or $B$).
* a row index (starting from 0).
* the values of that row of the matrix.

---


**Execute the following cell to create the file matrices.txt**


---

In [14]:
%%file matrices.txt
A,0,1,2,4
A,1,2,3,5
A,2,4,3,2
B,0,4,2,2,5
B,1,1,3,3,2
B,2,4,4,0,3

Writing matrices.txt


---


**Write a MapReduce job to multiply matrices.**


---



**Hint.** We need two MapReduce steps, which *mrjob* lets you chain with `MRStep`. 
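
One way to see why two steps are natural is to write out the definition of the product $C = AB$, with indices starting from 0 as in the file:

$$C_{ik} = \sum_{j=0}^{m-1} A_{ij} B_{jk}, \qquad 0 \le i < n, \quad 0 \le k < p.$$

A possible decomposition, given here as a sketch rather than the only valid approach, is the following: the first step groups the entries by the shared index $j$ and emits every product $A_{ij} B_{jk}$ under the key $(i, k)$; the second step groups by $(i, k)$ and sums the products. With the matrices above, for instance,

$$C_{00} = A_{00} B_{00} + A_{01} B_{10} + A_{02} B_{20} = 1 \cdot 4 + 2 \cdot 1 + 4 \cdot 4 = 22.$$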

In [15]:
%%file matrix_multiplication.py

from mrjob.job import MRJob
from mrjob.step import MRStep

class MatrixMultiplication(MRJob):
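    def first_mapper(self, _, line):
        # Each line has the form: matrix identifier, row index, row values.
        items = line.split(",")
        matrix = items[0]
        row = int(items[1])
        values = items[2:]
        # Key each entry by the index j shared by A's columns and B's rows.
        for col, v in enumerate(values):
            if matrix == "A":
                yield (col, ("A", row, int(v)))
            else:
                yield (row, ("B", col, int(v)))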

    def first_reducer(self, j, values):
        Avalues = []
        Bvalues = []
        for (matrix, coord, value) in values:
            if matrix == "A":
                Avalues.append((coord, value))
            else: 
                Bvalues.append((coord, value))
        for (i, aij) in Avalues:
            for (k, bjk) in Bvalues:
                yield((i, k), aij*bjk)

    def second_reducer(self, coords, values):
        yield(coords, sum(values))

    def steps(self):
        return [
          MRStep(mapper=self.first_mapper,
                  reducer=self.first_reducer),
          MRStep(reducer=self.second_reducer)
        ]
    
if __name__ == '__main__':
    MatrixMultiplication.run()

Writing matrix_multiplication.py


---
**Execute the following cell to test your implementation.**


---


In [16]:
!python matrix_multiplication.py -r local matrices.txt

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/matrix_multiplication.root.20210110.093852.918404
Running step 1 of 2...
Running step 2 of 2...
job output is in /tmp/matrix_multiplication.root.20210110.093852.918404/output
Streaming final output from /tmp/matrix_multiplication.root.20210110.093852.918404/output...
[2, 1]	25
[2, 2]	17
[2, 3]	32
[0, 0]	22
[0, 1]	24
[0, 2]	8
[0, 3]	21
[1, 0]	31
[1, 1]	33
[1, 2]	13
[1, 3]	31
[2, 0]	27
Removing temp directory /tmp/matrix_multiplication.root.20210110.093852.918404...
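
To check the result, the product can also be computed directly in plain Python. This is only a verification sketch for small matrices: `A` and `B` are copied by hand from *matrices.txt*, and nothing here uses MapReduce.

```python
# The matrices of matrices.txt, written as lists of rows.
A = [[1, 2, 4], [2, 3, 5], [4, 3, 2]]
B = [[4, 2, 2, 5], [1, 3, 3, 2], [4, 4, 0, 3]]

# zip(*B) iterates over the columns of B, so each entry of C is the
# dot product of a row of A with a column of B.
C = [[sum(a * b for a, b in zip(row, col)) for col in zip(*B)] for row in A]

for row in C:
    print(row)
```

Each entry `C[i][k]` should match the value printed by the job for the key `[i, k]`.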
