Qu. 3: Data Analysis and PageRank in Spark [46pt]

In [0]:
# Required modules
import re
import sys
from operator import add

# Set File Paths
tripdelaysFilePath = "/databricks-datasets/flights/departuredelays.csv"
airportsnaFilePath = "/databricks-datasets/flights/airport-codes-na.txt"

# Obtain airports dataset
airports  = sqlContext.read.format("com.databricks.spark.csv").options(header='true', inferschema='true', delimiter='\t').load(airportsnaFilePath)
airports.registerTempTable("airports")

# Obtain departure Delays data
delays = sqlContext.read.format("com.databricks.spark.csv").options(header='true').load(tripdelaysFilePath)
delays.registerTempTable("delays")
delays.cache()

Out[2]: DataFrame[date: string, delay: string, distance: string, origin: string, destination: string]

3.a. [3pt] Show the top 10 airports and the top 10 delays from the two dataframes in a nice table format

In [0]:
print("The top 10 rows of the delays file:")
delays.show(10,truncate=False)

print("The top 10 rows of the airports file:")
airports.show(10,truncate=False)

The top 10 rows of the delays file:
+--------+-----+--------+------+-----------+
|date    |delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|6    |602     |ABE   |ATL        |
|01020600|-8   |369     |ABE   |DTW        |
|01021245|-2   |602     |ABE   |ATL        |
|01020605|-4   |602     |ABE   |ATL        |
|01031245|-4   |602     |ABE   |ATL        |
|01030605|0    |602     |ABE   |ATL        |
|01041243|10   |602     |ABE   |ATL        |
|01040605|28   |602     |ABE   |ATL        |
|01051245|88   |602     |ABE   |ATL        |
|01050605|9    |602     |ABE   |ATL        |
+--------+-----+--------+------+-----------+
only showing top 10 rows

The top 10 rows of the airports file:
+-----------+-----+-------+----+
|City       |State|Country|IATA|
+-----------+-----+-------+----+
|Abbotsford |BC   |Canada |YXX |
|Aberdeen   |SD   |USA    |ABR |
|Abilene    |TX   |USA    |ABI |
|Akron      |OH   |USA    |CAK |
|Alamosa    |CO   |USA    |ALS |
|Alban

3.b. [9pt] Run sql commands to answer the following questions:

(i) What US city incurs the most delays as an origin airport? Run an sql query to find out.

In [0]:
%sql
SELECT origin, City, COUNT(origin) AS number_of_delays
FROM (
  SELECT t1.origin, t2.City
  FROM delays t1
  JOIN airports t2 ON t2.IATA = t1.origin
)
GROUP BY origin, City
ORDER BY number_of_delays DESC
LIMIT 1

origin,City,number_of_delays
ATL,Atlanta,91484


(ii) We would like to know, for each origin airport and state combination,
the average distance and delay of all outgoing flights with positive delays.
In addition, in the same query, we would like to see the average state delay
(regardless of origin airport) for each airport-state combination with positive delay.

Write an sql query that returns origin, state, and also:
average distance (mean distance between origin and destinations over all outgoing flights)
average delay (average delay of all outgoing flights from an origin, with a positive delay)
average state delay (average delay over all outgoing flights from all airports in the same state, with a positive delay).

The query should return the results sorted by decreasing average state delay, with only the first 10 rows shown.

Hint: use a window function among other sql commands.

In [0]:
%sql
CREATE OR REPLACE TABLE with_state SELECT t1.origin, t1.distance,t1.delay,t2.State FROM delays t1 JOIN airports t2 on t2.IATA = t1.origin WHERE 
t1.delay > 0;

SELECT distinct  origin, State, ROUND (AVG(distance) OVER (PARTITION BY origin),2) AS avg_airport_distance,
                ROUND (AVG(delay) OVER (PARTITION BY origin),2) AS avg_airport_delay,
                ROUND (AVG(delay) OVER (PARTITION BY State),2) AS avg_state_delay
                FROM with_state  ORDER BY avg_state_delay desc limit 10

origin,State,avg_airport_distance,avg_airport_delay,avg_state_delay
BTV,VT,408.02,67.78,67.78
RAP,SD,471.18,43.48,49.86
FSD,SD,415.05,52.22,49.86
BGR,ME,467.62,70.94,49.74
PWM,ME,403.31,46.61,49.74
ORF,VA,462.26,45.04,48.53
CHO,VA,399.15,55.2,48.53
ROA,VA,328.12,68.04,48.53
RIC,VA,476.9,48.74,48.53
PHF,VA,458.28,46.55,48.53
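
The semantics of the window functions used above can be sketched without Spark. This is a minimal illustration in plain Python, assuming a handful of made-up rows (the airport codes, states, and delays are hypothetical): AVG(...) OVER (PARTITION BY key) attaches the partition's mean to every input row, which is why the query needs DISTINCT to collapse the repeated (origin, State) rows.

```python
from collections import defaultdict

# Toy rows (origin, state, delay); hypothetical values, positive delays only.
rows = [
    ("AAA", "S1", 10.0),
    ("AAA", "S1", 30.0),
    ("BBB", "S1", 50.0),
    ("CCC", "S2", 20.0),
]

def partition_avg(rows, key_index, value_index):
    """Return {key: mean of values}, i.e. what AVG(...) OVER (PARTITION BY key) computes."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        sums[row[key_index]] += row[value_index]
        counts[row[key_index]] += 1
    return {key: sums[key] / counts[key] for key in sums}

airport_avg = partition_avg(rows, 0, 2)  # PARTITION BY origin
state_avg = partition_avg(rows, 1, 2)    # PARTITION BY State

# A window function keeps one output row per input row, so the same
# (origin, state) pair repeats; the set plays the role of SELECT DISTINCT.
distinct_rows = {(o, s, airport_avg[o], state_avg[s]) for o, s, _ in rows}

# ORDER BY avg_state_delay DESC (ties broken by origin to keep the output stable).
result = sorted(distinct_rows, key=lambda r: (-r[3], r[0]))
print(result)
```

Both airports in state S1 share the same state average, while each keeps its own airport average, matching the shape of the query output above.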


(iii) We would like to compute the PageRank vector only for origin nodes that have outgoing edges.
Remove from the delays dataframe all the records of flights connecting to destination airports that
appear only as destinations (also called dead-end nodes, or dangling pages).
That is, if a record contains as origin an airport called ABC, and as destination an airport called XYZ,
you should keep it only if XYZ is an origin airport for another record.
Use an sql command inside python using spark.sql, and update the delays variable to contain the output.

In [0]:
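print(f"Before removal there were {delays.count()} rows")
# Keep a record only if its destination also appears as an origin in some record.
# Note: this rebinds the Python variable only; the temp table "delays"
# still refers to the unfiltered data unless it is registered again.
delays = spark.sql("select * from delays as orig where exists (select origin from delays as dest where dest.origin = orig.destination)")
print(f"After removal {delays.count()} rows left")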

Before removal there were 1391578 rows
After removal 1377301 rows left
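
The filtering rule can be checked on a tiny example. The sketch below is plain Python rather than Spark, and the airport codes are made up for illustration; it applies the same test as the WHERE EXISTS clause, keeping a record only when its destination also occurs as an origin.

```python
# Toy flight records (origin, destination); the airport codes are hypothetical.
flights = [
    ("ABC", "XYZ"),
    ("XYZ", "ABC"),
    ("ABC", "END"),  # END never appears as an origin, so it is a dead end
    ("XYZ", "END"),
]

# The set of airports that appear as an origin in at least one record.
origins = {origin for origin, _ in flights}

# Keep a record only if its destination is also an origin of some record,
# mirroring the WHERE EXISTS (...) semi-join.
kept = [(origin, dest) for origin, dest in flights if dest in origins]

print(f"Before removal there were {len(flights)} rows")
print(f"After removal {len(kept)} rows left")
```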


3.c[6 pt] In this question we build an object representing the network of delayed flights connecting airports, in preparation for the PageRank algorithm.
For this, we will only look at the origin and destination columns, not giving any weight to the delay time.
Each (origin, destination) pair should have at most one link in the network, even if there are multiple delayed flights connecting them.

Create a new RDD data structure of tuples called ranks, storing the initial PageRank value for each origin node. Set the initialization value to 1/n for all nodes, where n is the number of origin nodes.
We will ignore nodes that are only destinations and do not appear as origins, in order to avoid dangling pages and to simplify the calculations.

Next, create another RDD data structure of tuples, named links, where each tuple is composed of the origin and an iterator (the result of a groupByKey) over its destinations.
Show the first 10 rows of the resulting links and ranks RDD data structures.

In [0]:
from pyspark.sql import functions
import pyspark.sql.functions as F

origins = spark.sql("select distinct origin from delays")
n = origins.count()
ranks = origins.rdd.map(lambda orig: (orig[0], 1/n))

grouped = ((spark.sql("select distinct origin,destination  from delays")).rdd.map(lambda orig: (orig[0], orig[1]))).groupByKey()
links = grouped.map(lambda orig: (orig[0], list(orig[1])))

print("First 10 rows of the links file:")
print(links.take(10))
print("")
print("First 10 rows of the ranks file:")
print(ranks.take(10))

First 10 rows of the links file:
[('ATL', ['GSP', 'HDN', 'ALB', 'SRQ', 'BZN', 'GRB', 'LGA', 'PIT', 'AEX', 'ORD', 'TUL', 'CHA', 'GTR', 'MEM', 'ASE', 'CAK', 'ILM', 'DSM', 'IAH', 'PVD', 'CLT', 'MHT', 'EVV', 'ABE', 'AUS', 'CID', 'AGS', 'GSO', 'DEN', 'CRW', 'BQK', 'SJU', 'CVG', 'ROC', 'EYW', 'BMI', 'PNS', 'SFO', 'DAB', 'LAS', 'MCI', 'HNL', 'ECP', 'DHN', 'MSN', 'FSM', 'CSG', 'HPN', 'MCO', 'JFK', 'TYS', 'PHX', 'SAT', 'ATW', 'PHL', 'AVL', 'EWN', 'BDL', 'MLI', 'SAV', 'MTJ', 'MIA', 'TLH', 'ORF', 'COS', 'RDU', 'MSP', 'CMH', 'SNA', 'FAR', 'SLC', 'EGE', 'DTW', 'BUF', 'PBI', 'JAX', 'MDT', 'FAY', 'MOB', 'SYR', 'BOS', 'GRR', 'RSW', 'SEA', 'STL', 'GNV', 'PDX', 'ABQ', 'OMA', 'ABY', 'STT', 'MKE', 'BHM', 'FLL', 'DCA', 'CLE', 'FWA', 'MSY', 'CAE', 'LEX', 'LAX', 'SJC', 'MYR', 'AVP', 'XNA', 'VPS', 'TPA', 'JAN', 'SDF', 'HOU', 'EWR', 'ROA', 'SMF', 'SHV', 'DAL', 'PWM', 'SAN', 'PIA', 'GPT', 'LFT', 'CHO', 'BWI', 'MLU', 'MDW', 'ELP', 'ONT', 'OAJ', 'GRK', 'FSD', 'BNA', 'TRI', 'FNT', 'SBN', 'ICT', 'PHF', 'TUS', 'MLB'
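
For reference, the shape of the two structures can be reproduced on a toy edge list. This is a plain-Python sketch under the assumption of three made-up nodes A, B, and C; it is not the Spark code, only an illustration of what ranks and links hold after deduplicating edges.

```python
from collections import defaultdict

# Toy (origin, destination) records; node names are hypothetical.
# The duplicate ("A", "B") stands for several delayed flights on one route.
flights = [("A", "B"), ("A", "B"), ("A", "C"), ("B", "A"), ("C", "A"), ("C", "B")]

# "select distinct origin, destination": at most one link per pair.
unique_edges = sorted(set(flights))

# "select distinct origin": the nodes that receive an initial rank.
origins = sorted({origin for origin, _ in unique_edges})
n = len(origins)

# ranks: one (node, 1/n) tuple per origin node.
ranks = [(origin, 1 / n) for origin in origins]

# links: one (origin, [destinations]) tuple per origin, like groupByKey.
grouped = defaultdict(list)
for origin, dest in unique_edges:
    grouped[origin].append(dest)
links = sorted(grouped.items())

print(links)
print(ranks)
```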

3.d. [6pt] Recall the PageRank algorithm:

Set $r = \frac{1}{n}\mathbf{1}_n$
For $i = 1$ to $I$:
   Set $r \leftarrow \frac{1-\beta}{n}\mathbf{1}_n + \beta M^t r$
The conts function below is used to create an iterator that maps the ranks vector $r$ to $M^t r$, which is used in each iteration of the algorithm (the update step above).
Using the conts function, join the ranks to the links data, and create an RDD object called contrib, which stores for each node $j$ the sum $\sum_{i=1}^{n} m_{ij} r_i$, i.e. the contributions of the PageRank scores over all of the nodes that link to it.
Display the top 10 values of the resulting RDD.

Hint: First, use flatMap to obtain the contribution $m_{ij} r_i$ for each link $i \to j$. Then, reduce to sum the contributions from all links going into the same destination node $j$.

Next, update the ranks vector using the resulting contrib according to the PageRank algorithm, with $\beta = 0.85$. This completes one iteration of the algorithm.
Display the first 10 values of the resulting ranks RDD.

In [0]:
# Splits a source node's rank evenly among its destination nodes (normalizes by out-degree)
def conts(nodes, rank): 
    """Yield a (node, contribution) pair for each destination in `nodes`,
    where the contribution is the source's rank divided by its number of destinations.
    """
    num_nodes = len(nodes)
    for node in nodes:
        yield (node, rank / num_nodes)

contrib = links.join(ranks).flatMap(lambda x: conts(x[1][0],x[1][1])) 
ranks = contrib.reduceByKey(add).mapValues(lambda rank: rank*0.85 + 0.15/n)

print("First 10 values of the ranks RDD file:")
print(ranks.take(10))

First 10 values of the ranks RDD file:
[('GSP', 0.0010828977338659136), ('SRQ', 0.0008933302138109328), ('BZN', 0.0008545172310725196), ('GRB', 0.0007000804537888937), ('PIT', 0.0021256102937464993), ('AEX', 0.0006634933693757223), ('ORD', 0.07631369644235021), ('CHA', 0.0006916444453528852), ('GTR', 0.0006098803157626687), ('CAK', 0.0010119651891122173)]
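
One iteration can be traced by hand on a small graph. The following is a plain-Python sketch, assuming a hypothetical three-node graph; the list comprehension stands in for flatMap, the summing loop for reduceByKey(add), and the final dictionary for mapValues.

```python
from collections import defaultdict

def conts(nodes, rank):
    """Yield (node, contribution): the rank split evenly over the destinations."""
    num_nodes = len(nodes)
    for node in nodes:
        yield (node, rank / num_nodes)

# Toy graph (hypothetical): every node has outgoing and incoming links.
links = {"A": ["B", "C"], "B": ["A"], "C": ["A", "B"]}
n = len(links)
beta = 0.85
ranks = {node: 1 / n for node in links}  # uniform initialization

# flatMap step: one (destination, contribution) pair per link.
contributions = [
    pair for src, dests in links.items() for pair in conts(dests, ranks[src])
]

# reduceByKey(add) step: sum the contributions arriving at each node.
contrib = defaultdict(float)
for node, value in contributions:
    contrib[node] += value

# mapValues step: damped update, beta * contribution + (1 - beta) / n.
new_ranks = {node: beta * total + (1 - beta) / n for node, total in contrib.items()}
print(new_ranks)
```

Here A receives 1/3 from B and 1/6 from C, so its contribution is 1/2 and its updated rank is 0.85 * 0.5 + 0.05 = 0.475.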


3.e. [6pt]: The above code implemented one iteration of the PageRank algorithm.
Use a loop to apply 50 iterations starting from the uniform initialization, and with a beta of 0.85.
Show the 10 airports with the highest PageRank score, along with their PageRank values and city names.

In [0]:
import pandas as pd
#reset the 'ranks' and 'links' files:
origins = spark.sql("select distinct origin from delays")
n = origins.count()
ranks = origins.rdd.map(lambda orig: (orig[0], 1/n))
grouped = ((spark.sql("select distinct origin,destination  from delays")).rdd.map(lambda orig: (orig[0], orig[1]))).groupByKey()
links = grouped.map(lambda orig: (orig[0], list(orig[1])))

for i in range(50):
    contrib = links.join(ranks).flatMap(lambda x: conts(x[1][0],x[1][1])) 
    ranks = contrib.reduceByKey(add).mapValues(lambda rank: rank*0.85 + 0.15/n)

top_ranks = ranks.top(10, key=lambda val: val[1])
top_ranks = pd.DataFrame(top_ranks,columns = ["Airport","PageRank score"])
with_city = spark.sql("SELECT origin,City FROM (SELECT t1.origin, t2.City FROM delays t1 JOIN airports t2 on t2.IATA = t1.origin) GROUP by origin,City").toPandas()
#to add the city name:
top_ranks["City"] = top_ranks["Airport"].map(with_city.set_index('origin')['City'])

print("The  10  airports with the highest PageRank score are: ")
top_ranks

The  10  airports with the highest PageRank score are: 


Unnamed: 0,Airport,PageRank score,City
0,ATL,0.036344,Atlanta
1,ORD,0.032325,Chicago
2,DFW,0.032179,Dallas
3,DEN,0.025874,Denver
4,IAH,0.02146,Houston
5,MSP,0.018768,Minneapolis
6,SLC,0.018729,Salt Lake City
7,DTW,0.017699,Detroit
8,LAX,0.015828,Los Angeles
9,SFO,0.015545,San Francisco
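
The loop and the final lookup can be sketched in plain Python on a toy graph. The node names and the cities mapping below are hypothetical, and the sketch assumes that every node has at least one incoming and one outgoing link, in which case the total rank stays at 1 across iterations.

```python
def conts(nodes, rank):
    """Yield (node, contribution): the rank split evenly over the destinations."""
    num_nodes = len(nodes)
    for node in nodes:
        yield (node, rank / num_nodes)

def pagerank(links, beta=0.85, iterations=50):
    """Repeat the damped update; assumes each node has in-links and out-links."""
    n = len(links)
    ranks = {node: 1 / n for node in links}  # uniform initialization
    for _ in range(iterations):
        contrib = {}
        for src, dests in links.items():
            for dest, value in conts(dests, ranks[src]):
                contrib[dest] = contrib.get(dest, 0.0) + value
        ranks = {node: beta * total + (1 - beta) / n for node, total in contrib.items()}
    return ranks

# Toy graph and a hypothetical code-to-city lookup (stands in for the airports table).
links = {"A": ["B", "C"], "B": ["A"], "C": ["A", "B"]}
cities = {"A": "Alphaville", "B": "Betatown", "C": "Gammaburg"}

ranks = pagerank(links)

# Top-k by score, then attach the city name to each code.
top = sorted(ranks.items(), key=lambda kv: kv[1], reverse=True)[:2]
top_with_city = [(code, score, cities[code]) for code, score in top]
print(top_with_city)
```

Node A ranks first because it receives all of B's rank and half of C's, which is the same mechanism that places hub airports at the top of the table above.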


3.f [4pt] In this sub-question we run the PageRank algorithm on the much larger Wikipedia dataset.
Load the Wikipedia network dataset file created in Question 1.
Your uploaded datasets can be accessed via: dbfs:/FileStore/shared_uploads/your.account.email@whatever.ending.you.have.com/
You should load the data into two RDD objects using the sc.textFile command:
One, called keyvalue, containing the nodes, loaded from the keyvalue file.
Another, called transition, containing the edges, loaded from the file created in Question 1.b., in the format of one edge (two IDs) per line.
Display the top 10 (nodes or edges) for each RDD.

In [0]:
keyvalue_path = "dbfs:/FileStore/tables/keyvalue"
transition_path = "dbfs:/FileStore/tables/network"

keyvalue = spark.sparkContext.textFile(keyvalue_path)
transition = spark.sparkContext.textFile(transition_path)

print("The top 10 nodes (by 'keyvalue' file) are:")
print(keyvalue.take(10))
print(" ")
print("The top 10 edges (by 'transition' file) are:")
print(transition.take(10))

The top 10 nodes (by 'keyvalue' file) are:
['0\thttps://en.wikipedia.org/wiki/Statistics', '1\thttps://en.wikipedia.org/wiki/Category:Statistics', '2\thttps://en.wikipedia.org/wiki/Portal:Mathematics', '3\thttps://en.wikipedia.org/wiki/Normal_distribution', '4\thttps://en.wikipedia.org/wiki/Scatter_plot', '5\thttps://en.wikipedia.org/wiki/Iris_flower_data_set', '6\thttps://en.wikipedia.org/wiki/Data', '7\thttps://en.wikipedia.org/wiki/Statistical_model', '8\thttps://en.wikipedia.org/wiki/Statistical_survey', '9\thttps://en.wikipedia.org/wiki/Experimental_design']
 
The top 10 edges (by 'transition' file) are:
['0 1', '0 2', '0 3', '0 4', '0 5', '0 6', '0 7', '0 8', '0 9', '0 10']


3.g.[6pt] We want to avoid dangling pages when running the PageRank algorithm.
To do so, convert the transition RDD into a data-frame. In this data-frame, remove all links where the destination webpage does not appear as one of the source webpages (similar to 1.c.).
Repeat the process until you get a sub-network where every node is a source node, i.e., has a positive out-degree (you may need to repeat the process more than once).

Remark: In order to access a Spark data-frame via spark.sql, you need to first register it as a temporary view. For example, if your data-frame is called transition_df, add to your code the line: transition_df.createOrReplaceTempView("transition_df")

In [0]:
# Build the edge data-frame directly in Spark, avoiding a pandas round trip on millions of rows
spark_df = transition.map(lambda x: x.split(" ")).toDF(["source", "destination"])
spark_df.createOrReplaceTempView("spark_df")
print(f"Before removal there were {spark_df.count()} rows")
# One pruning pass: keep only links whose destination also appears as a source
spark_df = spark.sql("select * from spark_df as a WHERE  EXISTS (select * from spark_df as b where b.source = a.destination)")
print(f"After removal {spark_df.count()} rows left")

Before removal there were 11874148 rows
After removal 5459898 rows left
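
Why a single pass may not be enough can be seen on a short chain. The sketch below is plain Python with made-up page IDs, and prune_dead_ends is a hypothetical helper, not part of the notebook: removing one dead end can turn the page that linked to it into a new dead end, so the filter is repeated until nothing changes.

```python
def prune_dead_ends(edges):
    """Repeatedly drop edges whose destination is not a source.

    Returns the surviving edge set and the number of passes that removed edges.
    """
    edges = set(edges)
    passes = 0
    while True:
        sources = {src for src, _ in edges}
        kept = {(src, dest) for src, dest in edges if dest in sources}
        if kept == edges:  # fixed point: every destination is also a source
            return kept, passes
        edges = kept
        passes += 1

# Toy network (hypothetical IDs): 3 is a dead end, and 2 only links to 3.
edges = [("0", "1"), ("1", "0"), ("1", "2"), ("2", "3")]

pruned, passes = prune_dead_ends(edges)
print(pruned, passes)
```

The first pass removes the link into page 3, which leaves page 2 without outgoing links; the second pass then removes the link into page 2.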


3.h.[6pt] Finally, run the PageRank algorithm on the Wikipedia network, with 10 iterations, $\beta = 0.85$, and a uniform ranks vector initialization.
Your implementation should be similar to the implementation for the flight delays dataset. Show the 20 Wikipedia pages with the highest PageRank values you got (the URLs along with their PageRank scores).

In [0]:
import pandas as pd
from pyspark.sql import functions
import pyspark.sql.functions as F
from operator import add

#create the ranks and links files:
sources = spark.sql("select distinct source from spark_df")
n = sources.count()
ranks = sources.rdd.map(lambda source: (source[0], 1/n))

grouped = ((spark.sql("select distinct source,destination  from spark_df")).rdd.map(lambda orig: (orig[0], orig[1]))).groupByKey()
links = grouped.map(lambda orig: (orig[0], list(orig[1])))

def conts(nodes, rank): 
    """Yield a (node, contribution) pair for each destination in `nodes`,
    where the contribution is the source's rank divided by its number of destinations.
    """
    num_nodes = len(nodes)
    for node in nodes:
        yield (node, rank / num_nodes)

for i in range(10):
    contrib = links.join(ranks).flatMap(lambda x: conts(x[1][0],x[1][1])) 
    ranks = contrib.reduceByKey(add).mapValues(lambda rank: rank*0.85 + 0.15/n)

top_ranks = ranks.top(20, key=lambda val: val[1])
top_ranks = pd.DataFrame(top_ranks,columns = ["page ID","PageRank score"])

#to get the urls:
keyvalue_df = keyvalue.map(lambda x: x.split("\t")).toDF().toPandas()
keyvalue_df.columns = ["ID","URL"]

top_ranks["URL"] = top_ranks["page ID"].map(keyvalue_df.set_index('ID')['URL'])

print("The 20 wikipedia pages with the highest PageRank score are: ")
top_ranks

The 20 wikipedia pages with the highest PageRank score are: 


Unnamed: 0,page ID,PageRank score,URL
0,454,0.002127,https://en.wikipedia.org/wiki/Special:SpecialP...
1,448,0.002127,https://en.wikipedia.org/wiki/Wikipedia:Contents
2,445,0.002127,https://en.wikipedia.org/wiki/Special:MyTalk
3,453,0.002127,https://en.wikipedia.org/wiki/Help:Introduction
4,447,0.002127,https://en.wikipedia.org/wiki/Main_Page
5,449,0.002127,https://en.wikipedia.org/wiki/Special:Random
6,452,0.002127,https://en.wikipedia.org/wiki/Help:Contents
7,450,0.002127,https://en.wikipedia.org/wiki/Wikipedia:About
8,451,0.002127,https://en.wikipedia.org/wiki/Wikipedia:Contac...
9,442,0.002075,https://en.wikipedia.org/wiki/Help:Category
