# A10: Big Data Applications using Map Reduce 
##### by Kevin Nguyen (17 March 2017)

## Introduction

In this assignment we design a PageRank algorithm using MapReduce in the Cloudera Hadoop (HDFS) environment via RHadoop. PageRank ranks webpages in order of importance and quality. The underlying assumption is that webpages with more links from other webpages are more important. PageRank is the algorithm used in Google's search engine to help determine which websites are shown to the user.

The following instructions were followed for this project.

1. Design a Map Reduce method to design the (key, val) structure for the page rank equation (the original version).
2. Write code (in R or another language) to implement the Map Reduce method designed in Q1. Explain your code in a separate document and provide full comments in the code.
3. Run Example 5.1 in the textbook and find the final page ranking to validate your code. Use a "hand" calculation to generate the true solution and validate the outcome. The code must be functional to earn full credit.
4. Run your code on the web-Google.txt file. You need to compute the initial m(i,j) from the data set first. Show the top 10 highest ranked pages. The code must be functional to earn full credit.

## 1. Map Reduce Design Strategy

The original page rank equation is v' = M * v. Starting from an initial vector v, each iteration replaces v with v' (v' = new page rank, v = current page rank), and over multiple iterations v converges towards a 'final' page rank vector. How close the result gets to that limit depends on the number of iterations used. For a strongly connected, aperiodic graph such as Example 5.1, the limit is unique and does not depend on the starting vector (as long as its entries sum to 1); graphs with dead ends or spider traps behave differently, which is why a taxed version of the formula is used later for web-Google.txt. For 'big data' files, 50-75 iterations are recommended to get a good estimate of the final page rank vector. 'Small data' files converge very quickly, and after a handful of iterations the vector is close to the true (mathematical) final page rank vector.

Our MapReduce strategy using RHadoop is documented below.

v' = M * v: a MapReduce job that iteratively computes v = M * v, where the columns of M add to 1. M is n x n and v is n x 1. M is stored in (i, j, m_ij) format, with sum(m_ij) = 1 over i for each column j.

Example 5.1 from the Mining of Massive Datasets book is the basis we use to lay the foundation for our strategy.
![](http://localhost:8889/files/Ex5_1.png)
$$\mathbf{M} = \left[\begin{array}
{rrrr}
0 & 1/2 & 1 & 0  \\
1/3 & 0 & 0 & 1/2 \\
1/3 & 0 & 0 & 1/2 \\
1/3 & 1/2 & 0 & 0
\end{array}\right]
\qquad
\mathbf{v} = \left[\begin{array}
{r}
1/4  \\
1/4 \\
1/4 \\
1/4
\end{array}\right]
$$
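As a quick hand-check of the matrix above, the minimal base-R sketch below (no Hadoop needed) confirms that every column of M sums to 1 and computes one step of v' = Mv from the uniform starting vector. The variable names are illustrative only.

```r
# Example 5.1 transition matrix, entered row by row (rows = i, columns = j)
M <- matrix(c(0,   1/2, 1, 0,
              1/3, 0,   0, 1/2,
              1/3, 0,   0, 1/2,
              1/3, 1/2, 0, 0),
            nrow = 4, byrow = TRUE)

# Column-stochastic check: each column j should sum to 1
col_sums <- colSums(M)
print(col_sums)

# One iteration of v' = M v starting from v = (1/4, 1/4, 1/4, 1/4)
v0 <- rep(1/4, 4)
v1 <- as.vector(M %*% v0)
print(v1)  # 0.375 followed by three copies of 5/24
```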
Matrix M is converted into (i, j, m_ij) triples:
$$\mathbf{M} = \left[\begin{array}
{rrr}
i & j & m_{ij}\\
\hline
1 & 1 & 0\\
1 & 2 & 0.5\\
1 & 3 & 1\\
1 & 4 & 0\\
2 & 1 & 0.333\\
2 & 2 & 0\\
2 & 3 & 0\\
2 & 4 & 0.5\\
3 & 1 & 0.333\\
3 & 2 & 0\\
3 & 3 & 0\\
3 & 4 & 0.5\\
4 & 1 & 0.333\\
4 & 2 & 0.5\\
4 & 3 & 0\\
4 & 4 & 0
\end{array}\right]
$$
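The triples above are stored one per line, space-delimited (i j mij), in a text file 'M.txt'. Note that 1/3 is written as 0.333, so the first column of M sums to 0.999 rather than exactly 1.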
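If the rounding of 1/3 matters, the triples can be written from the exact matrix with enough digits to round-trip. A minimal sketch, assuming the same space-delimited "i j mij" layout; the output file name M_full.txt is hypothetical and is written to a temporary directory here:

```r
M <- matrix(c(0,   1/2, 1, 0,
              1/3, 0,   0, 1/2,
              1/3, 0,   0, 1/2,
              1/3, 1/2, 0, 0),
            nrow = 4, byrow = TRUE)

# Expand the dense matrix into (i, j, mij) triples, sorted by i then j
idx <- expand.grid(j = 1:4, i = 1:4)            # j varies fastest
triples <- data.frame(i = idx$i, j = idx$j, mij = M[cbind(idx$i, idx$j)])

# Keep 17 significant digits so that 1/3 round-trips exactly
triples$mij <- formatC(triples$mij, digits = 17, format = "g")

out <- file.path(tempdir(), "M_full.txt")      # hypothetical output path
write.table(triples, out, sep = " ", quote = FALSE,
            row.names = FALSE, col.names = FALSE)

# Read back and confirm each column j still sums to 1
back <- read.table(out, col.names = c("i", "j", "mij"))
print(tapply(back$mij, back$j, sum))
```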

MapReduce uses a (key, val) structure. Both a mapper and a reducer are used in this case.

Mapper(key, val): the initial mapper key is NULL and val is the content of M.txt. The mapper uses i as the new key and m_ij as the new val, so every m_ij is grouped under its row i. The new (key, val) pairs are then passed to the reducer.

Reducer(key, val): the reducer receives a key i together with the m_ij values the mapper emitted for it. It multiplies these values elementwise with the current vector v and stores the products in 'vf'; summing vf gives the i-th entry of the new vector v'. This relies on the values for each key arriving in j order, which holds here because M.txt is sorted by i and then j. Repeating the job with v replaced by v' moves the vector towards a final page rank.
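To see what this mapper and reducer compute, here is a minimal single-machine sketch in base R that mimics the map, shuffle, and reduce steps without rmr2. split() stands in for the shuffle, and map_step and reduce_step are hypothetical names. Like the original reducer, it assumes the values for each key arrive in j order, which holds because the lines are sorted by i and then j.

```r
# Lines of M.txt in "i j mij" form, sorted by i then j (exact 1/3 used here)
lines <- c("1 1 0", "1 2 0.5", "1 3 1", "1 4 0",
           "2 1 0.3333333333333333", "2 2 0", "2 3 0", "2 4 0.5",
           "3 1 0.3333333333333333", "3 2 0", "3 3 0", "3 4 0.5",
           "4 1 0.3333333333333333", "4 2 0.5", "4 3 0", "4 4 0")

# Map: key = i (field 1), value = mij (field 3)
map_step <- function(lines) {
  a <- strsplit(lines, " ")
  list(key = as.numeric(sapply(a, "[", 1)),
       val = as.numeric(sapply(a, "[", 3)))
}

# Shuffle + reduce: group values by key, multiply by v elementwise, sum
reduce_step <- function(kv, v) {
  groups <- split(kv$val, kv$key)        # values keep their input order
  sapply(groups, function(mij) sum(mij * v))
}

v <- rep(1/4, 4)
for (iter in 1:2) v <- reduce_step(map_step(lines), v)
print(v)  # should match the hand-calculated second iteration
```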

## 2. Map Reduce Code - pageRankM.r

```r
#pageRankM.r
#Author: Kevin Nguyen
#A10: Big Data Applications using Map Reduce
#17 March 2017
#
#Template provided by Dr. Shing Chang
#

#a map reduce code to iteratively compute v=M v where the columns of M add to 1
#based on section 5.2 of Mining of Massive Datasets
#M is n x n and v is n x 1
#M is in the format of (i, j, mij), sum(mij)=1 over i for each column j

# mapper function
	#Note that the input part of mapreduce will provide k as NULL and v as the content of M in (i, j, mij) format
pgij<-function(k,v){
	
	#Separates v into usable components
	a<-strsplit(unlist(v), " ") #Splits each line of the unlisted v into its fields
	si<-as.numeric(sapply(a, "[", 1)) #1st field (i), used as the key
	vp<-as.numeric(sapply(a, "[", 3)) #3rd field (mij), used as the value
	
	#The key passed to the reduce function is si and the value is vp
	keyval(si, vp)
}

# reducer function
pgij2<-function(si,vp){
	
	#Page Rank calculation v = M*v: vp holds the mij values of row si, assumed to
	#arrive in j order so that they line up with the current page rank vector v1
	vf<-vp*v1
	
	#Summation of the products for this key gives entry si of the new page rank vector
	keyval(si, sum(vf))
}

# main program

#make sure M.txt is the data file to be executed
#input the number of unique nodes and assign it to n; 4 is the default for M.txt data
#n should be the unique count of the "to website" column; a word count
#the following initial values are for Example 5.1 of the Mining Massive Data book (1st or 2nd edition)
library(rmr2) #RHadoop package providing mapreduce(), keyval() and from.dfs()

n<-4 # Number of unique nodes

#Initial page rank vector (uniform 1/n); replaced by the latest estimate after each iteration
v1<- rep(1/n, n) #Note that v1 is a global variable used in the while loop below and in the pgij2 reducer above
i<-1
nn<-10 #Number of iterations, 50-75 for big data files
input.dfs<-"/user/cloudera/M.txt"

# the loop to start iterations
while (i<=nn){
	
	#Run one MapReduce pass and read the result back from HDFS
	tv<-from.dfs(mapreduce(input=input.dfs, input.format="text", map=pgij, reduce = pgij2))
	v1<-as.numeric(tv$val) #here the output in tv is passed onto the global variable v1
	i <- i+1
}
#Save output into text file and also print in command terminal
write.table(data.frame(tv$key,v1), "/home/cloudera/PageRank_4I.txt", sep= " ")
print(paste("The final printout in the $key part is ", tv$key))
print(paste("The final page rank vector is ", v1))
```

For a detailed explanation of the MapReduce code above, see [Appendix A](#AppendixA).
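The reducer above multiplies the list of m_ij values position-by-position with v1, so it silently depends on those values arriving ordered by j. A minimal sketch of an order-independent alternative, following the matrix-vector multiplication scheme described in Chapter 2 of Mining of Massive Datasets: the mapper also reads j and emits (i, m_ij * v_j), and the reducer only sums. Base R stands in for rmr2 here, and map_ij and reduce_sum are hypothetical names.

```r
lines <- c("1 1 0", "1 2 0.5", "1 3 1", "1 4 0",
           "2 1 0.3333333333333333", "2 2 0", "2 3 0", "2 4 0.5",
           "3 1 0.3333333333333333", "3 2 0", "3 3 0", "3 4 0.5",
           "4 1 0.3333333333333333", "4 2 0.5", "4 3 0", "4 4 0")

# Map: parse i, j, mij and emit (i, mij * v[j]); v is looked up by j
map_ij <- function(lines, v) {
  a   <- strsplit(lines, " ")
  i   <- as.numeric(sapply(a, "[", 1))
  j   <- as.numeric(sapply(a, "[", 2))
  mij <- as.numeric(sapply(a, "[", 3))
  list(key = i, val = mij * v[j])
}

# Reduce: sum the partial products for each key; order no longer matters
reduce_sum <- function(kv) tapply(kv$val, kv$key, sum)

v0 <- rep(1/4, 4)
in_order <- reduce_sum(map_ij(lines, v0))
set.seed(1)
shuffled <- reduce_sum(map_ij(sample(lines), v0))   # rows in random order
print(rbind(in_order, shuffled))
```

Because each value already carries its own v_j, shuffling the input rows leaves the result unchanged.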

## 3. Example 5.1 using pageRankM.r

pageRankM.r was used to calculate the final page rank vector for Example 5.1, the example on which our MapReduce strategy is based. Below are the hand-calculated page rank vectors after 1, 2, 3, 4, 5, 10, 15, 20, and 50 iterations, each obtained by multiplying the previous vector by M, starting from the uniform vector v.

$$
\mathbf{v}^{(0)} = \left[\begin{array}
{r}
1/4  \\
1/4 \\
1/4 \\
1/4
\end{array}\right]
\quad
\mathbf{v}^{(1)} = \left[\begin{array}
{r}
0.375  \\
0.2083333333 \\
0.2083333333\\
0.2083333333
\end{array}\right]
\quad
\mathbf{v}^{(2)} = \left[\begin{array}
{r}
0.3125  \\
0.2291666667 \\
0.2291666667\\
0.2291666667
\end{array}\right]
\quad
\mathbf{v}^{(3)} = \left[\begin{array}
{r}
0.34375  \\
0.21875 \\
0.21875\\
0.21875
\end{array}\right]
\quad
\mathbf{v}^{(4)} = \left[\begin{array}
{r}
0.328125  \\
0.2239583333 \\
0.2239583333\\
0.2239583333
\end{array}\right]
\quad
\mathbf{v}^{(5)} = \left[\begin{array}
{r}
0.3359375  \\
0.2213541667 \\
0.2213541667\\
0.2213541667
\end{array}\right]
$$

$$
\mathbf{v}^{(10)} = \left[\begin{array}
{r}
0.3332519531  \\
0.222249349 \\
0.222249349\\
0.222249349
\end{array}\right]
\quad
\mathbf{v}^{(15)} = \left[\begin{array}
{r}
0.3333358765  \\
0.2222213745 \\
0.2222213745\\
0.2222213745
\end{array}\right]
\quad
\mathbf{v}^{(20)} = \left[\begin{array}
{r}
0.3333332539  \\
0.2222222487 \\
0.2222222487\\
0.2222222487
\end{array}\right]
\quad
\mathbf{v}^{(50)} = \left[\begin{array}
{r}
0.3333333333  \\
0.2222222222 \\
0.2222222222\\
0.2222222222
\end{array}\right]
$$
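The hand-calculated vectors above can be reproduced with a few lines of base R. This is a sketch that uses exact fractions for 1/3 and plain matrix multiplication in place of the MapReduce job; `history` is an illustrative name.

```r
M <- matrix(c(0,   1/2, 1, 0,
              1/3, 0,   0, 1/2,
              1/3, 0,   0, 1/2,
              1/3, 1/2, 0, 0),
            nrow = 4, byrow = TRUE)

v <- rep(1/4, 4)
checkpoints <- c(1:5, 10, 15, 20, 50)
history <- list()
for (k in 1:50) {
  v <- as.vector(M %*% v)                       # one power-iteration step
  if (k %in% checkpoints) history[[as.character(k)]] <- v
}
print(do.call(rbind, history), digits = 10)    # one row per checkpoint
```

For this graph the first entry obeys x' = (1 - x)/2, so its distance from 1/3 halves (and flips sign) every iteration, which is why the values oscillate around 1/3 and 2/9.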

Below is the output of pageRankM.r run for different numbers of iterations.

```text
pageRankM.r Output
1 Iteration
"tv.key" "v1"
"1" 1 0.375
"2" 2 0.20825
"3" 3 0.20825
"4" 4 0.20825
2 Iterations  
"tv.key" "v1"
"1" 1 0.312375
"2" 2 0.229
"3" 3 0.229
"4" 4 0.229
3 Iterations  
"tv.key" "v1"
"1" 1 0.3435
"2" 2 0.218520875
"3" 3 0.218520875
"4" 4 0.218520875
4 Iterations
"tv.key" "v1"
"1" 1 0.3277813125
"2" 2 0.2236459375
"3" 3 0.2236459375
"4" 4 0.2236459375
5 Iterations
"tv.key" "v1"
"1" 1 0.33546890625
"2" 2 0.2209741458125
"3" 3 0.2209741458125
"4" 4 0.2209741458125
10 Iterations
"tv.key" "v1"
"1" 1 0.332235057978652
"2" 2 0.221497085308783
"3" 3 0.221497085308783
"4" 4 0.221497085308783
15 Iterations
"tv.key" "v1"
"1" 1 0.331764893818602
"2" 2 0.221100334464525
"3" 3 0.221100334464525
"4" 4 0.221100334464525
20 Iterations
"tv.key" "tv.val"
"1" 1 0.331209603146945
"2" 2 0.220732862072592
"3" 3 0.220732862072592
"4" 4 0.220732862072592
50 Iterations
"tv.key" "v1"
"1" 1 0.327912814349848
"2" 2 0.218535657185172
"3" 3 0.218535657185172
"4" 4 0.218535657185172
```

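The output of pageRankM.r is closest to the hand-calculated final page rank vector (1/3, 2/9, 2/9, 2/9) at around 10 iterations; after that, all entries slowly drift downward. This drift is not caused by Example 5.1 being small. It comes from M.txt storing 1/3 as 0.333: the first column of M then sums to 0.999, so a little rank mass is lost in every iteration (the 50-iteration vector sums to about 0.9835). Storing m_ij with more digits, or renormalizing v after each iteration, avoids this loss. For a big data file, 50-75 iterations would be needed to get a good estimate of the final page rank vector.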
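The drift can be reproduced outside Hadoop. A minimal sketch, assuming the only difference from the exact matrix is the 0.333 entries stored in M.txt: iterating with those rounded entries matches the pageRankM.r output above, and the total rank mass falls below 1. The helper name `iterate` is illustrative.

```r
# M as stored in M.txt, with 1/3 rounded to 0.333
M_rounded <- matrix(c(0,     0.5, 1, 0,
                      0.333, 0,   0, 0.5,
                      0.333, 0,   0, 0.5,
                      0.333, 0.5, 0, 0),
                    nrow = 4, byrow = TRUE)
print(colSums(M_rounded))   # column 1 sums to 0.999, not 1

# Apply v <- M v k times, starting from the uniform vector
iterate <- function(M, k) {
  v <- rep(1/4, nrow(M))
  for (step in seq_len(k)) v <- as.vector(M %*% v)
  v
}

v10 <- iterate(M_rounded, 10)
v50 <- iterate(M_rounded, 50)
print(v50)
print(c(total_after_10 = sum(v10), total_after_50 = sum(v50)))
```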

## 4. web-Google.txt Map Reduce with pageRankG.r

For part 4 of this project, we need to identify the top 10 highest ranked pages in the provided sample file, 'web-Google.txt'. In terms of (i, j, m_ij), the file provides only i and j, so we need to create m_ij ourselves so that the columns of M are stochastic. Column 1 of the text file is j (the from node) and column 2 is i (the to node).

Impala was used to count the out-links of each fromnode (its out-degree). Merging this count with the 'web-Google.txt' file gives m_ij = 1/(out-degree of j) for every edge j -> i, which makes each column of M sum to 1. The Impala and R code used to create the new data file for the MapReduce job is shown after the results below.

Results of pageRankG.r for 10 Iterations

Top 10 Nodes and the rank 

1.  21208	0.00079235046509864996
2.  783323	0.00060994192570862599
3.	563390	0.00060446072785647596
4.	64511	0.00019099534832848399
5. 	15808	0.00010690541083469099
6. 	511681	0.000106597183337524
7. 	30681	9.6075760286939502e-05
9.  1497    8.8466291511110298e-05
10. 555920	7.6037245315841402e-05

These were the top 10 pages after running 10 iterations.
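pageRankG.r is not perfect: with more iterations the ranks drift towards 0 or disappear. This is unlikely to be a double-precision problem, since doubles can represent values far smaller than these ranks (down to about 1e-308). A more likely cause is the indexing issue described below: v1 is looked up by position, and R returns NA for positions beyond the current length of v1, so NA values propagate through the sums and are then filtered out by the IS NOT NULL condition in the Impala query. Running 50 iterations produced only the following 3 page ranks.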

Results of pageRankG.r for 50 Iterations

Top Nodes and the rank (highest first)

1. 7	7.7193525429078297e-07
2. 9	2.3487944489531501e-07
3. 11	2.2592664094190601e-07

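One likely issue with pageRankG.r, compared with pageRankM.r, is how the mapper looks up the current rank. The taxed formula needs the rank of the source node j for each edge j -> i, but the mapper uses v1[si], i.e. it indexes by the destination node i, and by position in v1 rather than by node ID. After the first iteration, v1 holds one value per reducer key in the order returned by from.dfs, so positions no longer correspond to node IDs. In addition, the teleport term (1-b)/n is added once per edge rather than once per node.

Reordering webr2.txt would therefore probably not be enough on its own; looking up ranks by node ID (for example with a named vector or match()) and adding the teleport term once per node should address these issues.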
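For comparison, here is a minimal in-memory sketch of the taxed update v' = βMv + (1-β)e/n that looks ranks up by the source node j (via match() on node IDs rather than by position) and adds the teleport term once per node. It is a single-machine illustration on a hypothetical edge list (Example 5.1 relabelled with non-consecutive IDs 10, 25, 37, 42), not a drop-in replacement for pageRankG.r; `taxed_pagerank` is a hypothetical name. With dead ends present, the total rank would fall below 1 unless it is renormalized.

```r
# Hypothetical edge list (from = j, to = i) with non-consecutive node IDs
edges <- data.frame(from = c(10, 10, 10, 25, 25, 37, 42, 42),
                    to   = c(25, 37, 42, 10, 42, 10, 25, 37))

taxed_pagerank <- function(edges, beta = 0.85, iters = 50) {
  nodes <- sort(unique(c(edges$from, edges$to)))   # every node ID, once
  n <- length(nodes)
  j <- match(edges$from, nodes)                    # position of source node j
  i <- match(edges$to, nodes)                      # position of destination i
  outdeg <- tabulate(j, nbins = n)                 # out-links of each source
  v <- rep(1 / n, n)                               # uniform start, sums to 1
  for (step in seq_len(iters)) {
    contrib <- beta * v[j] / outdeg[j]             # "map": beta * m_ij * v_j per edge
    # "reduce": sum contributions by destination, then add teleport once per node
    v <- vapply(seq_len(n), function(k) sum(contrib[i == k]), numeric(1)) +
      (1 - beta) / n
  }
  setNames(v, nodes)
}

pr <- taxed_pagerank(edges)
print(round(pr, 4))
```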

```sql
-- Impala query to create the webg2 table (out-degree of each fromnode)
create table webg2 as 
select fromnode, count(*) as uniqueNode from web_google group by fromnode order by fromnode;
```


The query above is a word count on the fromnode column of the 'web-Google.txt' table: it counts the out-links of each fromnode, which is used to create m_ij for the data.
The new table is then exported from Hue as a CSV file, 'webg2.csv'.
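For readers without Impala, the same per-fromnode count can be sketched in base R. The tiny edge list below is hypothetical (Example 5.1 with nodes relabelled 0-3); table() plays the role of the GROUP BY count, and m_ij for an edge j -> i is 1 divided by that count.

```r
# Hypothetical edge list in web-Google.txt layout: FromNodeId, ToNodeId
edges <- data.frame(from = c(0, 0, 0, 1, 1, 2, 3, 3),
                    to   = c(1, 2, 3, 0, 3, 0, 1, 2))

# Equivalent of: select fromnode, count(*) ... group by fromnode
outdeg <- as.data.frame(table(from = edges$from), responseName = "uniqueNode")
outdeg$from <- as.numeric(as.character(outdeg$from))
print(outdeg)

# Attach the count to every edge and turn it into a transition probability
edges <- merge(edges, outdeg, by = "from")
edges$mij <- 1 / edges$uniqueNode
print(edges)
```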


```r
#webAppend.r
#Merge two tables based on the first column of both tables (the fromnode, j)

a<-read.table("/home/cloudera/web-Google.txt", sep = "\t") #fromnode (j) and tonode (i); read.table skips lines starting with #
b<-read.table("/home/cloudera/webg2.csv", sep =",") #fromnode (j) and its out-degree
d<-merge(a, b, by.x='V1', by.y='V1') #j, i, out-degree of j (merged on j)
write.table(d, "/home/cloudera/webr2.txt", sep= " ", row.names = FALSE, col.names = FALSE) # Write new data file
```


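The script above merges the data from 'web-Google.txt' with the out-degree table 'webg2.csv' on the fromnode column, producing a new data file, 'webr2.txt', with one "j i out-degree" line per edge. pageRankG.r reads this file from /user/cloudera/webr2.txt in HDFS.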
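Before uploading webr2.txt, a quick sanity check can catch a bad merge. A minimal sketch, assuming the merged columns come out as V1 (fromnode), V2.x (tonode), and V2.y (count), as merge() names them when both inputs have a V2 column; the small data frames are hypothetical stand-ins for the real files, and `check_webr2` is a hypothetical helper.

```r
# Hypothetical stand-ins for web-Google.txt (a) and webg2.csv (b)
a <- data.frame(V1 = c(0, 0, 0, 1, 1, 2, 3, 3),
                V2 = c(1, 2, 3, 0, 3, 0, 1, 2))
b <- data.frame(V1 = c(0, 1, 2, 3), V2 = c(3, 2, 1, 2))   # fromnode, count
d <- merge(a, b, by.x = "V1", by.y = "V1")                 # as in webAppend.r

check_webr2 <- function(a, d) {
  same_rows <- nrow(d) == nrow(a)               # inner join dropped no edges
  col_sums  <- tapply(1 / d$V2.y, d$V1, sum)    # sum of mij over i, per j
  c(same_rows  = same_rows,
    stochastic = isTRUE(all.equal(as.vector(col_sums),
                                  rep(1, length(col_sums)))))
}
print(check_webr2(a, d))
```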


```r
#pageRankG.r
#Author: Kevin Nguyen
#A10: Big Data Applications using Map Reduce
#30 March 2017
#
#Template provided by Dr. Shing Chang
#

#a map reduce code to iteratively compute the taxed page rank v' = b*M*v + (1-b)e/n, where the columns of M add to 1
#Each line of webr2.txt holds j (from node), i (to node) and the out-degree of j, so mij = 1/out-degree

# mapper function
	#Note that the input part of mapreduce will provide k as NULL and v as the content of webr2.txt
pgij<-function(k,v){
	
	#Separates v into usable components
	a<-strsplit(unlist(v), " ") #Splits each line of the unlisted v into its fields
	sj<-as.numeric(sapply(a, "[", 1)) #1st field: from node (j)
	si<-as.numeric(sapply(a, "[", 2)) #2nd field: to node (i), used as the key
	vp<-as.numeric(sapply(a, "[", 3)) #3rd field: out-degree of j, so mij = 1/vp
	
	#Page Rank calculation v' = b*M*v + (1-b)e/n where v1 converges to some page rank
	#NOTE: v1 is indexed here by si (the destination) and by position; the formula calls
	#for the rank of the source node j, looked up by node ID. The teleport term (1-b)/n
	#is also added once per edge rather than once per node.
	vp2<-(b*(1/vp)*v1[si])+(1-b)/n #Revised (taxed) page rank formula
	
	#The key passed to the reduce function is si and the value is vp2
	keyval(si, vp2)
}

# reducer function
pgij2<-function(si,vp){
	
	#Summation of values with respect to their key for the new page rank vector
	keyval(si, sum(vp))
	
}

# main program

#make sure webr2.txt is the data file to be executed
#n should be the unique count of the "to website" column; a word count
library(rmr2) #RHadoop package providing mapreduce(), keyval() and from.dfs()

n<-714546 # Number of unique nodes

#Initial page rank vector: each entry is 500000/n, so the entries sum to 500000 rather than 1
v1<- rep(500000/n, n) #Note that v1 is a global variable used in the while loop below and in the pgij mapper above

b<- 0.85 #Taxation parameter beta: probability of following an out-link; with probability 1-b a random node is chosen
i<-1
nn<-10 #Number of iterations, 50-75 for big data files
input.dfs<-"/user/cloudera/webr2.txt"

# the loop to start iterations
while (i<=nn){
	
	#Run one MapReduce pass and read the result back from HDFS
	tv<-from.dfs(mapreduce(input=input.dfs, input.format="text", map=pgij, reduce =pgij2))
	v1<-as.numeric(tv$val) #here the output in tv is passed onto the global variable v1
	i <- i+1
}
#Save output into text file and also print in command terminal
write.table(data.frame(tv$key,v1), "/home/cloudera/PageRank_G50I.txt", sep= " ", col.names= FALSE, row.names= FALSE)
print(paste("The final printout in the $key part is ", tv$key))
print(paste("The final page rank vector is ", v1))
```


Above is code modified from pageRankM.r for the 'web-Google.txt' dataset. 


```sql
-- highest page ranks first, skipping nodes whose rank is NULL (NA in R)
select * from pagerank50i where rank IS NOT NULL order by rank desc limit 10;
```

Above is an Impala query to help identify the top page ranks that we found earlier.

## <a name="AppendixA"></a> Appendix A (Code Summary)
pageRankM.r code (pageRankG.r mirrors this very closely)


#### Below is a detailed explanation of the code above chunk by chunk in order

```r
# mapper function
	#Note that the input part of mapreduce will provide k as NULL and v as the content of M in (i, j, mij) format
pgij<-function(k,v){
	
	#Separates v into usable components
	a<-strsplit(unlist(v), " ") #Splits each line of the unlisted v into its fields
	si<-as.numeric(sapply(a, "[", 1)) #1st field (i), used as the key
	vp<-as.numeric(sapply(a, "[", 3)) #3rd field (mij), used as the value
	
	#The key passed to the reduce function is si and the value is vp
	keyval(si, vp)
}
```

The mapper pgij is called with key k = NULL and value v = the lines of M.txt. The mapper 'maps' each value to the key you assign, and data manipulation can be done at this stage. Here it extracts M in a usable form and sends it to the reducer, where the page rank vector is calculated.

The content of v is unlisted and each line is split on spaces (M.txt is space-delimited), giving one vector of fields per line. sapply then extracts, by position, the 1st field of every line (i) and the 3rd field (m_ij). These strings are converted to numbers with `as.numeric` and stored in si and vp. The new keyval(si, vp) is then passed to the reducer.
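The parsing step can be tried on its own in plain R, without Hadoop. In this sketch, the three sample lines are the first three rows of the i j Mij table:

```r
# Three lines as they appear in M.txt ("i j mij", space-delimited)
v <- c("1 1 0", "1 2 0.5", "1 3 1")

a  <- strsplit(unlist(v), " ")         # list of c("1","1","0"), c("1","2","0.5"), ...
si <- as.numeric(sapply(a, "[", 1))    # first field  -> i (the key)
vp <- as.numeric(sapply(a, "[", 3))    # third field  -> mij (the value)
print(si)
print(vp)
```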

```r
# reducer function
pgij2<-function(si,vp){
	
	#Page Rank calculation v = M*v: vp holds the mij values of row si, assumed to
	#arrive in j order so that they line up with the current page rank vector v1
	vf<-vp*v1
	
	#Summation of the products for this key gives entry si of the new page rank vector
	keyval(si, sum(vf))
}
```

The reducer's keyval contains the content passed from the mapper: a key si and the vector vp of m_ij values emitted for that key. Because those values arrive in the same j order as the entries of v1 (M.txt is sorted by i and then j), vp can be multiplied elementwise by v1, the global variable holding the current page rank vector; the products are stored in vf. The new keyval(si, sum(vf)) holds the sum of vf for key si, i.e. entry si of the new page rank vector.

```r
library(rmr2) #RHadoop package providing mapreduce(), keyval() and from.dfs()

n<-4 # Number of unique nodes

#Initial page rank vector (uniform 1/n); replaced by the latest estimate after each iteration
v1<- rep(1/n, n) #Note that v1 is a global variable used in the while loop below and in the pgij2 reducer above
i<-1
nn<-4 #Number of iterations, 50-75 for big data files
input.dfs<-"/user/cloudera/M.txt"

# the loop to start iterations
while (i<=nn){
	
	#Run one MapReduce pass and read the result back from HDFS
	tv<-from.dfs(mapreduce(input=input.dfs, input.format="text", map=pgij, reduce = pgij2))
	v1<-as.numeric(tv$val) #here the output in tv is passed onto the global variable v1
	i <- i+1
}
```



The code above sets up and runs the MapReduce job. The mapreduce call uses our mapper and reducer functions, pgij and pgij2. Its input is the path to M.txt in HDFS, whose lines become the content of v in the mapper. The val of the output is stored in the global variable v1, which holds the current iteration of the page rank vector. The job runs in a loop for nn iterations, chosen according to the user's needs.

The rmr2 library is the RHadoop package that provides the mapreduce(), keyval(), and from.dfs() functions used above.
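The loop above always runs a fixed nn iterations. A common alternative is to stop once v stops changing. This is a minimal in-memory sketch, assuming an L1 tolerance of 1e-10 (an arbitrary choice) and using M %*% v in place of the mapreduce call; `power_iterate` is a hypothetical name.

```r
M <- matrix(c(0,   1/2, 1, 0,
              1/3, 0,   0, 1/2,
              1/3, 0,   0, 1/2,
              1/3, 1/2, 0, 0),
            nrow = 4, byrow = TRUE)

power_iterate <- function(M, tol = 1e-10, max_iter = 100) {
  v <- rep(1 / nrow(M), nrow(M))
  for (k in seq_len(max_iter)) {
    v_new <- as.vector(M %*% v)        # stand-in for one mapreduce pass
    if (sum(abs(v_new - v)) < tol) return(list(v = v_new, iterations = k))
    v <- v_new
  }
  list(v = v, iterations = max_iter)   # tolerance not reached
}

res <- power_iterate(M)
print(res)
```

With a distributed job, the same check would compare v1 before and after each from.dfs call; each pass is expensive, so a tolerance can save iterations on large inputs.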

```r
#Save output into text file and also print in command terminal
write.table(data.frame(tv$key,v1), "/home/cloudera/PageRank_4I.txt", sep= " ")
print(paste("The final printout in the $key part is ", tv$key))
print(paste("The final page rank vector is ", v1))
```



After the mapreduce job is performed, we save the results into a text file that is space delimited in our cloudera home directory. The key and final page rank vector are also printed out for easy viewing of the results. 