# Introduction to MapReduce

**Setting up Hadoop is left as a learning exercise.**
### Software Requirements
#### Compiler Distributions Used
* GNU Compiler Tools for C and C++ (use MinGW or Cygwin on Windows)
* Anaconda Python 2
* OpenJDK 7 (recommended for Hadoop)

 
#### Hadoop Version 2.7.2


#### MapReduce Frameworks
* mrjob for Python
* Hadoop Core for Java (recommended for performance)
* MR4C, an open source framework that allows you to run native code in Hadoop.

<h2>Why use MapReduce?</h2>
With MapReduce you can process a large file, for example 1 terabyte, that is distributed across multiple computers, called the nodes of a cluster.

#### Setting up Python
#### Installing mrjob using pip

In [9]:
!python --version

Python 2.7.12 :: Anaconda 4.1.1 (64-bit)


In [10]:
!pip install mrjob

Collecting mrjob
  Downloading mrjob-0.5.6-py2.py3-none-any.whl (285kB)
    100% |████████████████████████████████| 286kB 607kB/s 
Collecting google-api-python-client>=1.5.0 (from mrjob)
  Downloading google_api_python_client-1.5.3-py2.py3-none-any.whl (50kB)
    100% |████████████████████████████████| 51kB 872kB/s 
Collecting filechunkio (from mrjob)
  Downloading filechunkio-1.8.tar.gz
Collecting httplib2<1,>=0.8 (from google-api-python-client>=1.5.0->mrjob)
  Downloading httplib2-0.9.2.zip (210kB)
    100% |████████████████████████████████| 215kB 592kB/s 
Collecting uritemplate<1,>=0.6 (from google-api-python-client>=1.5.0->mrjob)
  Downloading uritemplate-0.6.tar.gz
Collecting oauth2client<4.0.0,>=1.5.0 (from google-api-python-client>=1.5.0->mrjob)
  Downloading oauth2client-3.0.0.tar.gz (77kB)
    100% |████████████████████████████████| 81kB 643kB/s 
Collecting simplejson>=2.5.0 (from uritemplate<1,>=0.6->google-api-python-client>=1.5.0->mrjob)
  Downloadin

## Example: counting the number of words, lines and characters in a file

1. Change to the Hadoop home directory.

2. Place a text file in the directory, or use your own random generator to write words to a file.

3. Create a file wordcount.py to count the words in the file.

4. Run the program in the notebook. You can also run it from the command line.

We can create or overwrite a file using the %%file magic in the notebook.
* We create a small C-style program that generates a file with many words. It includes C++ headers, so it is compiled with g++.

In [1]:
%%file wordgenerator.c
#include <cstdio>
#include <cstdlib>
using namespace std;
int main()
{
	int n=100000;
	for(int i=0;i<n;i++)
	{
		int len=1+rand()%20;
		for(int j=0;j<len;j++)
		{	char c='A'+rand()%26;
			putchar(c);
		}
		putchar('\n');
	}
	return 0;
}

Overwriting wordgenerator.c


You can run command-line commands with !, which tells the notebook to pass the command to the system shell.
* We compile the program and execute it.
* The output redirection operator > sends stdout to the file words2.txt.

In [5]:
!g++ wordgenerator.c -o wordgenerator
!./wordgenerator > words2.txt
!echo "Successfully created"

Successfully created


## First program in Python using MapReduce
This program counts the number of lines, words and characters.

In [3]:
%%file wordcount.py
from mrjob.job import MRJob
class Wordcount(MRJob):
    def mapper(self,key,line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1
    
    def reducer(self,key,values):
        yield key, sum(values)
        
if __name__ == '__main__':
    Wordcount.run()

Overwriting wordcount.py


In [6]:
!python wordcount.py words2.txt

No configs found; falling back on auto-configuration
Creating temp directory /tmp/wordcount.root.20161014.132712.602063
Running step 1 of 1...
Streaming final output from /tmp/wordcount.root.20161014.132712.602063/output...
"chars"	1052079
"lines"	100000
"words"	100000
Removing temp directory /tmp/wordcount.root.20161014.132712.602063...


## The Classic MapReduce job: count the frequency of words.


<img src="MapReduce_Example.jpg"/>

In [7]:
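%%file wordcounts.py
from mrjob.job import MRJob
import re

# A word is a run of letters, digits, underscores or apostrophes
WORD_RE = re.compile(r"[\w']+")

class MRWordFreqCount(MRJob):

    # Emit (word, 1) for every word on the line
    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    # Pre-sum the counts on the mapper side to reduce the data sent to the reducer
    def combiner(self, word, counts):
        yield (word, sum(counts))

    # Sum all partial counts for a word
    def reducer(self, word, counts):
        yield (word, sum(counts))


if __name__ == '__main__':
    MRWordFreqCount.run()

Overwriting wordcounts.py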


In [6]:
!python wordcounts.py words2.txt > wordcount.txt

Using configs in /etc/mrjob.conf
Creating temp directory /root/wordcounts.root.20161017.094907.436525
Running step 1 of 1...
Streaming final output from /root/wordcounts.root.20161017.094907.436525/output...
Removing temp directory /root/wordcounts.root.20161017.094907.436525...


# Example 2:
Given data from a fake social networking site with the following schema:
<table>
<tr><td>User Id</td><td>Name</td><td>Age</td><td>Number of Friends</td></tr>
</table>
in the file fakefriends.csv, find the average number of friends for each age.
<img src="agefriend.jpg"/>

In [3]:
%%file agefriends.py
from mrjob.job import MRJob

class MRFriendsByAge(MRJob):

    def mapper(self, _, line):
        (ID, name, age, numFriends) = line.split(',')
        yield age, float(numFriends)

    def reducer(self, age, numFriends):
        total = 0
        numElements = 0
        for x in numFriends:
            total += x
            numElements += 1
            
        yield age, total / numElements


if __name__ == '__main__':
    MRFriendsByAge.run()

Overwriting agefriends.py


In [None]:
!python agefriends.py fakefriends.csv

## Example 3: Temperature Game

We use real weather data from the year 1800.
<table>
<tr><td>Id Weather Station</td><td>year-month-day</td><td>Property</td><td>Value</td><td>Other fields </td></tr>
</table>

Find the minimum temperature for each weather station.
<img src="temp.jpg" />

In [4]:
%%file minTemprature.py
from mrjob.job import MRJob

class MRMinTemperature(MRJob):

    def MakeFahrenheit(self, tenthsOfCelsius):
        celsius = float(tenthsOfCelsius) / 10.0
        fahrenheit = celsius * 1.8 + 32.0
        return fahrenheit

    def mapper(self, _, line):
        (location, date, type, data, x, y, z, w) = line.split(',')
        if (type == 'TMIN'):
            temperature = self.MakeFahrenheit(data)
            yield location, temperature

    def reducer(self, location, temps):
        yield location, min(temps)


if __name__ == '__main__':
    MRMinTemperature.run()

Writing minTemprature.py


In [None]:
!python minTemprature.py 1800.csv

In [5]:
%%file maxTemprature.py
from mrjob.job import MRJob

class MRMaxTemperature(MRJob):
    
    def MakeFahrenheit(self, tenthsOfCelsius):
        celsius = float(tenthsOfCelsius) / 10.0
        fahrenheit = celsius * 1.8 + 32.0
        return fahrenheit

    def mapper(self, _, line):
        (location, date, type, data, x, y, z, w) = line.split(',')
        if (type == 'TMAX'):
            temperature = self.MakeFahrenheit(data)
            yield location, temperature

    def reducer(self, location, temps):
        yield location, max(temps)


if __name__ == '__main__':
    MRMaxTemperature.run()
    

Writing maxTemprature.py


In [None]:
!python maxTemprature.py 1800.csv

<h1>Map Reduce Explanation</h1>
<img src="explainmapreduce.jpg" />

<img src="MapReduce_Architecture.jpg" />
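
The three phases can also be imitated in plain Python, without Hadoop or mrjob. The following is only a sketch to make the data flow visible: the function names `mapper`, `shuffle`, `reducer` and `run_job` are chosen here for illustration, and a real cluster runs the map and reduce calls on different nodes rather than in one loop.

```python
from collections import defaultdict

def mapper(line):
    # Map phase: turn one input line into (key, value) pairs
    for word in line.split():
        yield word.lower(), 1

def shuffle(pairs):
    # Shuffle phase: group all values that share a key, sorted by key
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())

def reducer(key, values):
    # Reduce phase: combine the values of one key into a single result
    yield key, sum(values)

def run_job(lines):
    mapped = [pair for line in lines for pair in mapper(line)]
    results = []
    for key, values in shuffle(mapped):
        results.extend(reducer(key, values))
    return results

lines = ["the quick brown fox", "the lazy dog", "The end"]
for word, count in run_job(lines):
    print(word, count)
```

Each mapper call only sees one line and each reducer call only sees one key, which is what allows the framework to spread the work across nodes.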

## Example 4: Revisiting Word Frequency
Let us display the word counts ordered by increasing frequency.

In [11]:
%%file wordsortedbyfreq.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
# RE stands for Regular Expression
WORD_REGEXP = re.compile(r"[\w']+")

class MRWordFrequencyCount(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   reducer=self.reducer_count_words),
            MRStep(mapper=self.mapper_make_counts_key,
                   reducer = self.reducer_output_words)
        ]
        
    def mapper_get_words(self, _, line):
        words = WORD_REGEXP.findall(line)
        for word in words:
            yield word.lower(), 1

    def reducer_count_words(self, word, values):
        yield word, sum(values)
        
    def mapper_make_counts_key(self, word, count):
        yield '%04d'%int(count), word
        
    def reducer_output_words(self, count, words):
        for word in words:
            yield count, word


if __name__ == '__main__':
    MRWordFrequencyCount.run()
    

Overwriting wordsortedbyfreq.py


In [10]:
!python wordsortedbyfreq.py book.txt

"0001"	"0"
"0001"	"05"
"0001"	"07"
"0001"	"1000"
"0001"	"104"
"0001"	"1099"
"0001"	"1124"
"0001"	"12"
"0001"	"125"
"0001"	"13"
"0001"	"14"
"0001"	"15"
"0001"	"150"
"0001"	"17"
"0001"	"18"
"0001"	"19"
"0001"	"2006"
"0001"	"20081"
"0001"	"2009"
"0001"	"212"
"0001"	"28"
"0001"	"312"
"0001"	"360"
"0001"	"401"
"0001"	"43"
"0001"	"47"
"0001"	"49"
"0001"	"500"
"0001"	"60"
"0001"	"65"
"0001"	"68"
"0001"	"70"
"0001"	"800"
"0001"	"82"
"0001"	"85"
"0001"	"93"
"0001"	"abandon"
"0001"	"abandoned"
"0001"	"abbreviation"
"0001"	"absolutely"
"0001"	"absorbed"
"0001"	"abstract"
"0001"	"accelerator"
"0001"	"accepting"
"0001"	"accompanies"
"0001"	"accompany"
"0001"	"accomplishment"
"0001"	"accomplishments"
"0001"	"accountant's"
"0001"	"achievable"
"0001"	"achievement"
"0001"	"achieves"
"0001"	"achieving"
"0001"	"acknowledge"
"0001"	"acqui"
"0001"	"acquirers"
"0001"	"acted"
"0001"	"adapted"
"0001"	"adaptive"
"0001"	"added"
"0001"	"addictive"
"0001"	"admit"
"0001"	"admittedly"
"0001"	"admitting"
"0001"	"ado

No configs found; falling back on auto-configuration
Creating temp directory c:\users\madank~1\appdata\local\temp\wordsortedbyfreq.MADAN KAPOOR.20161017.164152.247000
Running step 1 of 2...
Running step 2 of 2...
Streaming final output from c:\users\madank~1\appdata\local\temp\wordsortedbyfreq.MADAN KAPOOR.20161017.164152.247000\output...
Removing temp directory c:\users\madank~1\appdata\local\temp\wordsortedbyfreq.MADAN KAPOOR.20161017.164152.247000...
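
The second step formats each count with `'%04d'` before using it as a key, presumably because keys are compared as text between steps. The short sketch below (plain Python, with made-up counts) shows why that matters: unpadded numbers sort in the wrong order as strings, while zero-padded ones keep their numeric order.

```python
counts = [3, 12, 100, 7]  # made-up word counts for illustration

# Sorted as plain strings: "100" comes before "12", and "12" before "3"
as_text = sorted(str(c) for c in counts)
print(as_text)

# Zero-padded to four digits: string order now matches numeric order
padded = sorted('%04d' % c for c in counts)
print(padded)
```

Four digits only cover counts up to 9999; a larger corpus would need a wider padding.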


# Thinking in the MapReduce Paradigm

Let us consider an e-commerce website.
We need to find out how much each customer spends,
given an input file customer-orders.csv.
<img src="cust.jpg"/>


<img src="think.jpg"/>

In [13]:
%%file Custorder.py
from mrjob.job import MRJob

class SpendByCustomer(MRJob):

    def mapper(self, _, line):
        (customerID, itemID, orderAmount) = line.split(',')
        yield customerID, float(orderAmount)

    def reducer(self, customerID, orders):
        yield customerID, sum(orders)


if __name__ == '__main__':
    SpendByCustomer.run()

Writing Custorder.py


In [None]:
!python Custorder.py customer-orders.csv

**Assignment starts: your task**

* Implement the code for the given problems.
* We will give you some hints.
* As Python is very similar to pseudocode, we will give the solutions in Python (later on).
* Please refer to the docs and contact me for any help.




## Problem 1

The nth term of the sequence of triangle numbers is given by $t_n = \frac{1}{2}n(n+1)$, so the first ten triangle numbers are:

$$1, 3, 6, 10, 15, 21, 28, 36, 45, 55, ...$$

By converting each letter in a word to a number corresponding to its alphabetical position and adding these values we form a word value. For example, the word value for SKY is $19 + 11 + 25 = 55 = t_{10}$. If the word value is a triangle number then we shall call the word a triangle word.

Using words.txt, a file with one word per line, find out how many triangle words there are in the file.

## Brute Force Approach for Problem 1

In [32]:
%%file Trianglewordcount.py
from mrjob.job import MRJob

TriangleNumbers={ n*(n+1)/2:True for n in range(1,10000)}

def trianglewords(words):
    count=0
    for word in words:
        t=0
        for char in word:
            if ord(char)<= ord('Z') and  ord(char)>= ord('A'):
                t+=ord(char)-ord('A')+1 # Ord gives the ascii value of a character
        default = False
        if TriangleNumbers.get(t, default):
            count+=1
    return count

class TriangleWordcount(MRJob):
    def mapper(self,key,line):
        yield "TraingleWords Count",trianglewords(line.split())
    
    def reducer(self,key,values):
        yield key, sum(values)

if __name__ == '__main__':
    TriangleWordcount.run()

Overwriting Trianglewordcount.py


In [33]:
!python Trianglewordcount.py words2.txt

Using configs in /etc/mrjob.conf
Creating temp directory /root/Trianglewordcount.root.20161014.143816.990840
Running step 1 of 1...
Streaming final output from /root/Trianglewordcount.root.20161014.143816.990840/output...
"TraingleWords Count"	7615
Removing temp directory /root/Trianglewordcount.root.20161014.143816.990840...


In [34]:
!python Trianglewordcount.py words3.txt

Using configs in /etc/mrjob.conf
Creating temp directory /root/Trianglewordcount.root.20161014.143821.545101
Running step 1 of 1...
Streaming final output from /root/Trianglewordcount.root.20161014.143821.545101/output...
"TraingleWords Count"	162
Removing temp directory /root/Trianglewordcount.root.20161014.143821.545101...



**What to do for Problem 1?**
> Try to implement a function that solves the quadratic equation
> $$ n^2 + n - 2t = 0 $$
> and check whether it has a positive integer solution.
> Use this function in the mapper.
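
One possible shape for such a check is sketched below. The equation $n^2 + n - 2t = 0$ has the positive root $n = (-1 + \sqrt{1 + 8t})/2$, so $t$ is a triangle number exactly when $1 + 8t$ is a perfect square. The names `is_triangle` and `word_value` are illustrative and not part of the assignment.

```python
import math

def is_triangle(t):
    """Return True if t = n(n+1)/2 for some positive integer n."""
    if t < 1:
        return False
    disc = 8 * t + 1                 # discriminant of n^2 + n - 2t = 0
    root = int(math.sqrt(disc))
    # Correct any floating-point rounding of the square root
    while root * root > disc:
        root -= 1
    while (root + 1) * (root + 1) <= disc:
        root += 1
    # disc is odd, so a perfect-square root is odd and (root - 1) / 2 is an integer
    return root * root == disc

def word_value(word):
    """Sum of alphabetical positions of the letters A-Z in the word."""
    return sum(ord(c) - ord('A') + 1 for c in word.upper() if 'A' <= c <= 'Z')

print(word_value("SKY"), is_triangle(word_value("SKY")))
```

Compared with the brute-force dictionary, this needs no precomputed table and has no upper limit on the word value.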

**Questions to be answered after solving the problem**
* How does the yield keyword differ from return?
* Can every problem be expressed in the MapReduce paradigm?
* What is the role of steps, streaming and the creation of the temp directory?
* How do you configure mrjob?




## Problem 2
We need to sort a file by lines and store the result in another file.
Use the program given above to generate a file with random words.


Tutorials to refer to
* https://www.dezyre.com/hadoop-tutorial/hadoop-mapreduce-tutorial-
* http://blog.ditullio.fr/2015/12/18/hadoop-basics-working-with-sequence-files/

## Problem 3
Triangle, pentagonal, and hexagonal numbers are generated by the following formulae:
* Triangle: $T_n = n(n+1)/2$, giving $1, 3, 6, 10, 15, ...$
* Pentagonal: $P_n = n(3n-1)/2$, giving $1, 5, 12, 22, 35, ...$
* Hexagonal: $H_n = n(2n-1)$, giving $1, 6, 15, 28, 45, ...$

By converting each letter in a word to a number corresponding to its alphabetical position and adding these values we form a word value. For example, the word value for SKY is $19 + 11 + 25 = 55 = T_{10}$. If the word value is a triangle number then we shall call the word a triangle word. Pentagonal words and hexagonal words are defined in the same way.
Let T be the set of triangle words, P the set of pentagonal words and H the set of hexagonal words. Given a file of words, find T, P and H.
Then find the number of words in the union of T, P and H.

Use the inclusion-exclusion principle.
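
For three sets the principle reads $|T \cup P \cup H| = |T| + |P| + |H| - |T \cap P| - |T \cap H| - |P \cap H| + |T \cap P \cap H|$. The sketch below checks this identity on sets of numbers rather than words, to keep it short; the bound `LIMIT` and the function names are assumptions made for the illustration.

```python
def triangle(n):
    return n * (n + 1) // 2

def pentagonal(n):
    return n * (3 * n - 1) // 2

def hexagonal(n):
    return n * (2 * n - 1)

LIMIT = 100  # illustrative upper bound on the values considered

T = {triangle(n) for n in range(1, LIMIT) if triangle(n) < LIMIT}
P = {pentagonal(n) for n in range(1, LIMIT) if pentagonal(n) < LIMIT}
H = {hexagonal(n) for n in range(1, LIMIT) if hexagonal(n) < LIMIT}

def union_size(a, b, c):
    # Inclusion-exclusion: add singles, subtract pairs, add the triple overlap
    return (len(a) + len(b) + len(c)
            - len(a & b) - len(a & c) - len(b & c)
            + len(a & b & c))

print(union_size(T, P, H), len(T | P | H))
```

In a MapReduce job the seven sizes on the right-hand side can each be produced as a separate counter key, so the union never has to be built in one place.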

## Problem 4 (Open Problem)

Implement the famous Google PageRank algorithm in a minimal local setup.
We use the large-scale dataset below.
* http://academictorrents.com/details/411576c7e80787e4b40452360f5f24acba9b5159



### Description of dataset:
A Large-Scale Real-World Dataset for Structured Web Data Extraction

1.Motivation
This dataset is a real-world web page collection used for research on the automatic extraction of structured data (e.g., attribute-value pairs of entities) from the Web. We hope it could serve as a useful benchmark for evaluating and comparing different methods for structured web data extraction.


2.Contents of the Dataset
Currently the dataset involves:
1.	8 verticals with diverse semantics;
1.	80 web sites (10 per vertical);
1.	124,291 web pages (200~2,000 per web site), each containing a single data record with detailed information of an entity;
1.	32 attributes (3~5 per vertical) associated with carefully labeled ground-truth of corresponding values in each web page. The goal of structured data extraction is to automatically identify the values of these attributes from web pages.

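The verticals are summarized as follows:
<table>
<tr><td>Vertical</td><td>#Sites</td><td>#Pages</td><td>#Attributes</td><td>Attributes</td></tr>
<tr><td>Auto</td><td>10</td><td>17,923</td><td>4</td><td>model, price, engine, fuel_economy</td></tr>
<tr><td>Book</td><td>10</td><td>20,000</td><td>5</td><td>title, author, isbn_13, publisher, publication_date</td></tr>
<tr><td>Camera</td><td>10</td><td>5,258</td><td>3</td><td>model, price, manufacturer</td></tr>
<tr><td>Job</td><td>10</td><td>20,000</td><td>4</td><td>title, company, location, date_posted</td></tr>
<tr><td>Movie</td><td>10</td><td>20,000</td><td>4</td><td>title, director, genre, mpaa_rating</td></tr>
<tr><td>NBA Player</td><td>10</td><td>4,405</td><td>4</td><td>name, team, height, weight</td></tr>
<tr><td>Restaurant</td><td>10</td><td>20,000</td><td>4</td><td>name, address, phone, cuisine</td></tr>
<tr><td>University</td><td>10</td><td>16,705</td><td>4</td><td>name, phone, website, type</td></tr>
</table>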


## Algorithm:
   > 1. Fetch all web page descriptions and map them to their URLs.
   > 1. Count all inward links to each web page and all outward links from it to other pages.
   > 1. Calculate the Page Rank Score for each page using the formula below:
   > $ \text{Score}(\text{webpage}) = 50 \times \text{inward links} + 10 \times \text{outward links} + \text{Reputation} $
   > Let us assume Reputation to be 0 for all pages for the time being.
   > 1. Create a file or database in which each record contains the following fields:
<table>
<tr>
<td>
    Url (Primary key)
</td>
<td>
    Description
</td>
<td>
    Inward Links Count
</td>
<td>
    Outward Links Count
</td>
<td>
    Page Rank Score
</td>
</tr>
</table> 
  > 1. Then sort the pages by Page Rank Score.
  
  > 2. Query and display the records according to the ranks. 
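
The counting, scoring and sorting steps can be tried on a toy link graph before touching the dataset. This is a minimal sketch of the simplified score given above, with Reputation defaulting to 0; the page names and the helper `score_pages` are made up for the illustration.

```python
def score_pages(links, reputation=None):
    """links maps each page URL to the list of URLs it links to."""
    reputation = reputation or {}
    # Count inward links for every page that appears as a source or a target
    inward = {page: 0 for page in links}
    for source, targets in links.items():
        for target in targets:
            inward[target] = inward.get(target, 0) + 1
    scores = {}
    for page in inward:
        outward = len(links.get(page, []))
        # Score = 50 * inward links + 10 * outward links + Reputation
        scores[page] = 50 * inward[page] + 10 * outward + reputation.get(page, 0)
    return scores

# Hypothetical toy graph: a links to b and c, b links to c
links = {
    "a.example": ["b.example", "c.example"],
    "b.example": ["c.example"],
    "c.example": [],
}
scores = score_pages(links)
ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
for url, score in ranked:
    print(url, score)
```

In a MapReduce version, a mapper would emit one record per link for its source and one for its target, and the reducer would sum them per URL.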

**An example of a web page header with a description.
Use a web page parser or XML parser to get the meta tags.**

In [22]:
%%file Work.html
<!DOCTYPE html>
<html>
<head>
    <title>Not a Meta Tag, but required anyway </title>
    <meta name="description" content="Links to Popular Sites">
    <meta http-equiv="content-type" content="text/html;charset=UTF-8">
</head>
<body>
<a href="www.google.co.in">Google</a>
<a href="www.yahoo.co.in">Yahoo</a>
<a href="www.wiki.com">Wiki</a>
<a href="www.github.in">Github</a>
</body>
</html>

Overwriting Work.html


In [15]:
!ls *.html
!pwd

Work.html
/home/fin


**An example program to get the description and all links from a web page.**

In [27]:
import urllib2, htmllib, formatter
from bs4 import BeautifulSoup

# A formatter with a NullWriter discards rendered text; only the anchor list is needed
null_formatter = formatter.AbstractFormatter(formatter.NullWriter())
# Read the HTML page from the local file
data = urllib2.urlopen("file:////home/fin/Work.html").read()
ptext = htmllib.HTMLParser(null_formatter)
ptext.feed(data)
# Print every link collected while parsing the page
print("Outward links to other pages:")
for link in ptext.anchorlist:
    print(link)


soup3 = BeautifulSoup(data, "lxml")
print("Description:")
# The meta tag name may be written as 'Description' or 'description'
desc = soup3.find(attrs={'name': 'Description'})
if desc is None:
    desc = soup3.find(attrs={'name': 'description'})
try:
    print(desc['content'])
except Exception as e:
    print('%s (%s)' % (e, type(e)))

Outward links to other pages:
www.google.co.in
www.yahoo.co.in
www.wiki.com
www.github.in
Description:
Links to Popular Sites
