# Map Reduce I - Indegree and Outdegree
Instructions
The goal of this task is to use the map-reduce paradigm to develop algorithms that compute characteristics describing the structure of large web graphs. A web graph is a directed graph G = (V,E) in which the nodes (vertices) represent single web pages and the directed edges correspond to hyperlinks between pages. For any two pages u,v \in V, u contains a link to v iff there exists an edge (u,v) \in E.

The designed algorithms need to be implemented in Apache Spark. You can use Spark with Scala, Java, Python or R. Simply choose your preferred language. Python enthusiasts can also use PySpark in Google Colab.

In this assignment we will use the data representing the "Stanford web graph" (pages from Stanford University (stanford.edu), collected in 2002). The data is available at http://snap.stanford.edu/data/web-Stanford.html. This dataset was collected as part of research on the analysis of social and information networks, including identifying clusters and determining their properties. If you are interested in this topic, you can find more information about the dataset, the graph structure, and the research results in a paper by J. Leskovec et al. (see http://arxiv.org/abs/0810.1355).

In [1]:
import pyspark

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = SparkConf().setAppName('appName').setMaster('local')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# 1. 
Download the dataset and analyze the structure of the file (the most important information is briefly described in the first 4 lines). Starting from line 5, the edges are encoded in the form `Vertex1 Vertex2`, separated by a tab. Each such line corresponds to a directed edge from Vertex1 to Vertex2.
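The file layout described above can be illustrated with a small plain-Python sketch. The sample lines and the helper name `parse_edges` are made up for illustration and are not part of the dataset or of Spark; the sketch only assumes that header lines start with `#` and that the two node ids are tab-separated.

```python
# Toy sample in the same layout as web-Stanford.txt (values are made up).
sample_lines = [
    "# Directed graph: toy example",
    "# Nodes: 4 Edges: 5",
    "# FromNodeId\tToNodeId",
    "1\t2",
    "1\t3",
    "2\t3",
    "3\t1",
    "4\t3",
]

def parse_edges(lines):
    """Skip '#' header lines and split the rest into (from, to) integer pairs."""
    edges = []
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # header or empty line
        src, dst = line.split("\t")
        edges.append((int(src), int(dst)))
    return edges

edges = parse_edges(sample_lines)
print(edges)
```

The Spark cells below do the same thing at scale: filter out the header lines, then split each remaining line on the tab character.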

### Dataset info

In [3]:
textFile = spark.read.text("web-Stanford.txt.gz")

In [4]:
textFile.limit(10).show(n=7, truncate=False, vertical=True)

-RECORD 0---------------------------------------------------------------------------------
 value | # Directed graph (each unordered pair of nodes is saved once): web-Stanford.txt  
-RECORD 1---------------------------------------------------------------------------------
 value | # Stanford web graph from 2002                                                   
-RECORD 2---------------------------------------------------------------------------------
 value | # Nodes: 281903 Edges: 2312497                                                   
-RECORD 3---------------------------------------------------------------------------------
 value | # FromNodeId	ToNodeId                                                            
-RECORD 4---------------------------------------------------------------------------------
 value | 1	6548                                                                           
-RECORD 5---------------------------------------------------------------------------------

In [43]:
textFile.take(10)

[Row(value='# Directed graph (each unordered pair of nodes is saved once): web-Stanford.txt '),
 Row(value='# Stanford web graph from 2002'),
 Row(value='# Nodes: 281903 Edges: 2312497'),
 Row(value='# FromNodeId\tToNodeId'),
 Row(value='1\t6548'),
 Row(value='1\t15409'),
 Row(value='6548\t57031'),
 Row(value='15409\t13102'),
 Row(value='2\t17794'),
 Row(value='2\t25202')]

### Read to a dataframe
Remove the header rows (the lines starting with `#`).

In [3]:
from pyspark.sql.functions import split

df = spark.read.csv("web-Stanford.txt.gz",sep='\n')
df = df.filter(~df._c0.contains('#'))  # drop the 4 header lines, which all contain '#'
df.show(10)

+-----------+
|        _c0|
+-----------+
|     1	6548|
|    1	15409|
| 6548	57031|
|15409	13102|
|    2	17794|
|    2	25202|
|    2	53625|
|    2	54582|
|    2	64930|
|    2	73764|
+-----------+
only showing top 10 rows



In [4]:
split_col = split(df['_c0'], '\t')  # split each line on the tab separator
df = df.withColumn('Node1', split_col.getItem(0))
df = df.withColumn('Node2', split_col.getItem(1))
df = df.drop("_c0")
df.show(10)

+-----+-----+
|Node1|Node2|
+-----+-----+
|    1| 6548|
|    1|15409|
| 6548|57031|
|15409|13102|
|    2|17794|
|    2|25202|
|    2|53625|
|    2|54582|
|    2|64930|
|    2|73764|
+-----+-----+
only showing top 10 rows



Convert the DataFrame to an RDD.

In [5]:
rdd = df.rdd
rdd.take(10)

[Row(Node1='1', Node2='6548'),
 Row(Node1='1', Node2='15409'),
 Row(Node1='6548', Node2='57031'),
 Row(Node1='15409', Node2='13102'),
 Row(Node1='2', Node2='17794'),
 Row(Node1='2', Node2='25202'),
 Row(Node1='2', Node2='53625'),
 Row(Node1='2', Node2='54582'),
 Row(Node1='2', Node2='64930'),
 Row(Node1='2', Node2='73764')]

The data is prepared. 

# 2.
Solve Problem 36 from the problem set by Dr. M. Gębala. For each vertex v \in V you need to determine its indegree and outdegree, i.e. the number of incoming and outgoing edges. Design and implement the procedures for mapper and reducer. Also write the procedure for determining the average indegree and outdegree for the graph. Try to make your implementation as efficient as possible.

#### Outdegree

Map : (node1,node2) -> (node1,1)

Reduce : (node1,a) , (node1,b) -> (node1,a+b)

In [60]:
outdegree_pairs = rdd.map(lambda x: (x[0],1)) 

In [61]:
outdegree_pairs.take(10)

[('1', 1),
 ('1', 1),
 ('6548', 1),
 ('15409', 1),
 ('2', 1),
 ('2', 1),
 ('2', 1),
 ('2', 1),
 ('2', 1),
 ('2', 1)]

In [62]:
outdegree_counts = outdegree_pairs.reduceByKey(lambda a, b: a + b)

In [63]:
outdegree = outdegree_counts.toDF()

Top 10 Outdegree

In [64]:
outdegree.orderBy("_2", ascending=False).show(10)

+------+---+
|    _1| _2|
+------+---+
| 82409|255|
| 82868|247|
|180611|247|
| 16984|247|
| 86290|247|
|188978|247|
| 10699|245|
|121634|245|
|176419|244|
|255711|244|
+------+---+
only showing top 10 rows



#### Indegree

Map : (node1,node2) -> (node2,1)

Reduce : (node2,a) , (node2,b) -> (node2,a+b)

In [65]:
indegree_pairs = rdd.map(lambda x: (x[1],1)) 
indegree_pairs.take(10)

[('6548', 1),
 ('15409', 1),
 ('57031', 1),
 ('13102', 1),
 ('17794', 1),
 ('25202', 1),
 ('53625', 1),
 ('54582', 1),
 ('64930', 1),
 ('73764', 1)]

In [66]:
indegree_counts = indegree_pairs.reduceByKey(lambda a, b: a + b) 

indegree = indegree_counts.toDF()

indegree.orderBy("_2", ascending=False).show(10)

+------+-----+
|    _1|   _2|
+------+-----+
|226411|38606|
|234704|21920|
|105607|19457|
|241454|19377|
|167295|19003|
|198090|18975|
| 81435|18970|
|214128|18967|
| 38342|18958|
|245659|18935|
+------+-----+
only showing top 10 rows



The top indegrees are far higher than the top outdegrees: the largest indegree is 38606, while the largest outdegree is 255.
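The two jobs above each scan the edge list once. As a possible alternative, both degrees could be computed in a single pass by emitting one record per endpoint of every edge. The following is a plain-Python sketch on a hypothetical toy edge list; `mapper`, `reducer` and `reduce_by_key` are illustrative names, with `reduce_by_key` standing in for the role that `reduceByKey` plays on an RDD (the mapper would be applied with `flatMap`).

```python
# Hypothetical toy edge list (from, to); not taken from the dataset.
edges = [(1, 2), (1, 3), (2, 3), (3, 1), (4, 3)]

def mapper(edge):
    """Emit one record per endpoint: (node, (indegree part, outdegree part))."""
    src, dst = edge
    yield src, (0, 1)  # src gains one outgoing edge
    yield dst, (1, 0)  # dst gains one incoming edge

def reducer(a, b):
    """Add two partial (indegree, outdegree) pairs; associative and commutative."""
    return a[0] + b[0], a[1] + b[1]

def reduce_by_key(pairs, func):
    """Plain-Python stand-in for a reduce-by-key step."""
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return acc

mapped = (pair for edge in edges for pair in mapper(edge))
degrees = reduce_by_key(mapped, reducer)
print(degrees)  # {1: (1, 2), 2: (1, 1), 3: (3, 1), 4: (0, 1)}
```

A side effect of this design is that every vertex that appears in at least one edge gets a record, so a vertex with no incoming edges (node 4 here) shows up with indegree 0 instead of being absent from the table.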

#### Average

In [32]:
from pyspark.sql.functions import mean as _mean, col
indegree_avg = indegree.select(_mean(col('_2')))

In [34]:
indegree_avg.show(5)

+-----------------+
|          avg(_2)|
+-----------------+
|8.840225851338747|
+-----------------+



In [36]:
outdegree_avg = outdegree.select(_mean(col('_2')))
outdegree_avg.show(5)

+-----------------+
|          avg(_2)|
+-----------------+
|8.208173754396924|
+-----------------+



#### Average map-reduce

Map : (node, count) -> ("mean",count)

GroupByKey : ("mean",count1) , ("mean",count2), ... -> ("mean", (count1,count2,...))

Reduce : ("mean",(count1,count2,...)) -> ("mean",sum(counts)/len(counts))


In [67]:
indegree_avg2 = indegree_counts.map(lambda x: ("mean",x[1]))
indegree_avg2 = indegree_avg2.groupByKey().mapValues(lambda x: sum(x) / len(x))

In [68]:
indegree_avg2.take(3)

[('mean', 8.840225851338747)]

In [69]:
outdegree_avg2 = outdegree_counts.map(lambda x: ("mean",x[1]))
outdegree_avg2 = outdegree_avg2.groupByKey().mapValues(lambda x: sum(x) / len(x))
outdegree_avg2.take(3)

[('mean', 8.208173754396924)]

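The map-reduce version gives the same results as the built-in mean function.

Note that each average is taken only over the vertices that appear in the corresponding count table, i.e. vertices with at least one incoming (or outgoing) edge. This is why the two values differ. Averaged over all 281903 vertices, both would equal 2312497 / 281903 ≈ 8.20.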
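Grouping everything under a single key collects all counts in one place before they are summed. A possible alternative, sketched here in plain Python on hypothetical data, is to map each count to a (sum, count) pair and combine pairs with an associative function, so that partial results can be merged in any order. On an RDD, the same `combine` function could be passed to `reduce`; the names `degree_counts` and `combine` are illustrative only.

```python
from functools import reduce

# Hypothetical (node, degree) pairs standing in for the degree-count RDD.
degree_counts = [("a", 3), ("b", 1), ("c", 2), ("d", 2)]

# Map: drop the node id and keep a (partial sum, partial count) pair.
mapped = [(count, 1) for _, count in degree_counts]

def combine(a, b):
    """Merge two (sum, count) pairs; associative and commutative."""
    return a[0] + b[0], a[1] + b[1]

# Reduce: fold all pairs into a single (total degree, number of nodes) pair.
total, n = reduce(combine, mapped)
average = total / n
print(average)  # 2.0
```

The division happens only once, at the end, which avoids the usual mistake of averaging partial averages.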

# 3.
Write a short paragraph in which you summarize the general idea of algorithms used in 2. Try to estimate how many times each edge / each vertex is processed.

The general idea is based on the map-reduce paradigm. The input is an RDD of (node1, node2) pairs. A map step turns each pair into a (node, 1) pair, and a reduce step sums these values per node, as described above. Spark transformations are lazily evaluated, so no computation actually happens until an action (such as `take` or `show`) requests a result. Such programs can be run on a cluster by splitting the tasks across worker nodes.

In the map and reduce steps for the indegree (outdegree), each node is processed as many times as its indegree (outdegree). Each edge is processed once per computation, so the total number of processed records equals the number of edges.
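The edge count above can be written out explicitly. Every edge contributes exactly one record to the key of its source (outdegree job) and one record to the key of its target (indegree job), so the per-node record counts sum to the number of edges. The value 2312497 is the edge count from the file header.

$$
\sum_{v \in V} \mathrm{outdeg}(v) \;=\; \sum_{v \in V} \mathrm{indeg}(v) \;=\; |E| \;=\; 2312497
$$

Within one job, a key that receives $d$ records needs $d - 1$ additions in the reduce step, so the total number of additions is $|E|$ minus the number of distinct keys.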

The average calculation is based on the previously created degree table. The cost of its map step is proportional to the number of nodes in that table.
