In [0]:
import boto3
import pandas as pd

In [0]:
class PageRank_for_Airports:

    def __init__(self, aws_access_key_id = "XXXXXXX", 
                 aws_secret_access_key = "XXXXXXX/XXXXX+8", 
                 bucket_name = "bucket", 
                 input_file = "datafile/T_T100D_SEGMENT_US_CARRIER_ONLY.csv", 
                 output_file = "datafile/assignment_2_output.txt",
                 teleportation_factor = "0.15",
                 initial_page_rank = "10",
                 no_of_iter = 4
                ):
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key
        self.bucket_name = bucket_name
        self.input_file = input_file
        self.output_file = output_file
        self.initial_page_rank = initial_page_rank
        self.teleportation_factor = teleportation_factor
        self.no_of_iter = no_of_iter
        
    
    def read_file_data(self):
        
        s3 = boto3.client("s3", 
                          aws_access_key_id = self.aws_access_key_id, 
                          aws_secret_access_key = self.aws_secret_access_key)
        
        s3_obj = s3.get_object(Bucket = self.bucket_name, Key = self.input_file)
        
        df = pd.read_csv(s3_obj['Body'])
        input_data_rdd = spark.createDataFrame(df).rdd
        
        return input_data_rdd
    
    
    def helper_page_rank_calculator(self, input_data_rdd):
        initial_page_rank = self.initial_page_rank
        teleportation_factor = self.teleportation_factor
        airport_ids = input_data_rdd.map(lambda x: (x[0], x[2]))
        airport_ids_inlink_map = airport_ids.map(lambda x: (x[1], x[0]))
        
        airport_outlink_map = airport_ids.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x + y)
        
        airports = sc.parallelize(airport_ids.map(lambda x: x[0]).distinct().collect() + airport_ids.map(lambda x: x[1]).distinct().collect()).distinct()
        
        total_airports = airports.count()
        airport_node_rank = airports.map(lambda x: (x, initial_page_rank))
        
        for i in range(self.no_of_iter):
            temp_airport_rank = airports.map(lambda x: (x, 0.0))
            airport_node_rank = airport_node_rank.join(airport_outlink_map).map(lambda x: (x[0], float(x[1][0]/x[1][1])))
            
            temp_airport_rank = airport_ids_inlink_map.join(temp_airport_rank).map(lambda x: (x[0], x[1][1])).join(airport_node_rank).map(lambda x: (x[0], x[1][0] + x[1][1])).mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (float(x[0])+ float(y[0]), x[1]+ y[1])).map(lambda x: (x[0], x[1][0]))
            
            temp_airport_rank = temp_airport_rank.map(lambda x: (x[0], (float(teleportation_factor / total_airports) + ((1 - teleportation_factor) * x[1]))))
            
            airport_node_rank = airport_node_rank.join(temp_airport_rank).map(lambda x: (x[0], x[1][1]))
            
        result = airport_node_rank.sortBy(lambda x: -x[1])
        
        return result
    
    
    
    def write_data_to_output(self, result):
        session = boto3.Session(aws_access_key_id = self.aws_access_key_id,
                                aws_secret_access_key = self.aws_secret_access_key)
        
        s3_session = session.resource('s3')
        object = s3_session.Object(self.bucket_name, self.output_file)
        
        res = "(Airport Code, Page Rank)\n"
        for row in result.collect():
            res = res + str(row) + '\n'
            
        s = object.put(Body=res)
        
        print("File {} is uploaded to S3 bucket".format(self.output_file))
        

In [0]:
aws_access_key_id = 'XXXXXXXXXXx'
aws_secret_access_key = 'XXXXXXXX/XXXXXXX+8'
bucket_name = 'bucket'
input_file = 'datafile/T_T100D_SEGMENT_US_CARRIER_ONLY.csv' 
output_file = 'datafile/Assignment_3_Q1_output.txt'
no_of_iter = 4
teleportation_factor = 0.15
initial_page_rank = 10

pagerank_for_airports = PageRank_for_Airports(aws_access_key_id, aws_secret_access_key, bucket_name, input_file, output_file, teleportation_factor, initial_page_rank, no_of_iter)

In [0]:
input_data = pagerank_for_airports.read_file_data()

In [0]:
input_data.take(10)

Out[8]: [Row(ORIGIN='06A', ORIGIN_CITY_NAME='Kizhuyak, AK', DEST='A43', DEST_CITY_NAME='Kodiak Island, AK'),
 Row(ORIGIN='09A', ORIGIN_CITY_NAME='Homer, AK', DEST='ORI', DEST_CITY_NAME='Port Lions, AK'),
 Row(ORIGIN='09A', ORIGIN_CITY_NAME='Homer, AK', DEST='ADQ', DEST_CITY_NAME='Kodiak, AK'),
 Row(ORIGIN='09A', ORIGIN_CITY_NAME='Homer, AK', DEST='ADQ', DEST_CITY_NAME='Kodiak, AK'),
 Row(ORIGIN='1G4', ORIGIN_CITY_NAME='Peach Springs, AZ', DEST='BLD', DEST_CITY_NAME='Boulder City, NV'),
 Row(ORIGIN='1G4', ORIGIN_CITY_NAME='Peach Springs, AZ', DEST='BLD', DEST_CITY_NAME='Boulder City, NV'),
 Row(ORIGIN='2AK', ORIGIN_CITY_NAME='Deer Park, AK', DEST='SIT', DEST_CITY_NAME='Sitka, AK'),
 Row(ORIGIN='2AK', ORIGIN_CITY_NAME='Deer Park, AK', DEST='PTD', DEST_CITY_NAME='Port Alexander, AK'),
 Row(ORIGIN='2NC', ORIGIN_CITY_NAME='Asheboro, NC', DEST='MXE', DEST_CITY_NAME='Maxton, NC'),
 Row(ORIGIN='7AK', ORIGIN_CITY_NAME='Akun, AK', DEST='DUT', DEST_CITY_NAME='Unalaska, AK')]

In [0]:
result = pagerank_for_airports.helper_page_rank_calculator(input_data)
pagerank_for_airports.write_data_to_output(result)

File datafile/Assignment_3_Q1_output.txt is uploaded to S3 bucket


In [0]:
# Below line of code will download the output file from S3 and display the result
check_data = boto3.client("s3",
                  aws_access_key_id = pagerank_for_airports.aws_access_key_id,
                  aws_secret_access_key = pagerank_for_airports.aws_secret_access_key)
        
output = check_data.get_object(Bucket = pagerank_for_airports.bucket_name, Key = pagerank_for_airports.output_file)

output = pd.read_csv(output['Body'])
output


Unnamed: 0,(Airport Code,Page Rank)
0,('MKC',422.8294334860503)
1,('BWD',422.8294334860503)
2,('MCK',422.8294334860503)
3,('NGU',422.8294334860503)
4,('TLJ',422.8294334860503)
...,...,...
868,('PPV',0.32653000974425594)
869,('ABY',0.32653000974425594)
870,('CGA',0.32653000974425594)
871,('KCC',0.13387893435448575)
