In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

In [19]:
from pyspark.sql.functions import avg

Spark context is the older entry point (spark 1.0) for accessing the rdd api and for other contexts like hive context and sql context.
With Spark2.0 dataframes were included as independent api which have all the context with it. SparkContext needs SparkConf but SparkSession doesn't need any.
SparkContext, HiveContext and SqlContext are now available through SparkSession but they only for backward comptibility.

In [2]:
spark = SparkSession.builder.appName("Flight Data Analysis").getOrCreate()

25/05/12 17:46:55 WARN Utils: Your hostname, Nothing-Is-Real resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/05/12 17:46:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/12 17:46:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
#creating spark context and initializing spark session
conf = (SparkConf().setMaster("local[*]").setAppName("Flight Data Analysis").set("spark.executro.memory", "2g"))
sc = SparkContext(conf=conf)
# sc.setLogLevel("Error")
spark = SparkSession(sc)

25/05/11 05:33:52 WARN Utils: Your hostname, Nothing-Is-Real resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/05/11 05:33:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/11 05:33:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/11 05:34:04 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [9]:
#Loading and peeking at data
#Autoinference of schema [Takes time to walk through data and decide the data type]
df = spark.read.options(header=True, nullValue='NA', inferSchema=True).csv("/home/aman/programs/gitrepos/PySpark/flight_data_analysis/Airports2.csv")
print("Total row count: ", df.count())
df.show(5)
df.printSchema()


#timeit >> 3.19 s ± 353 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

                                                                                

Total row count:  3606803
+--------------+-------------------+-------------+----------------+----------+-----+-------+--------+----------+-----------------+----------------------+----------------+-----------------+----------------+-----------------+
|Origin_airport|Destination_airport|  Origin_city|Destination_city|Passengers|Seats|Flights|Distance|  Fly_date|Origin_population|Destination_population| Org_airport_lat| Org_airport_long|Dest_airport_lat|Dest_airport_long|
+--------------+-------------------+-------------+----------------+----------+-----+-------+--------+----------+-----------------+----------------------+----------------+-----------------+----------------+-----------------+
|           MHK|                AMW|Manhattan, KS|        Ames, IA|        21|   30|      1|     254|2008-10-01|           122049|                 86219| 39.140998840332|-96.6707992553711|            NULL|             NULL|
|           EUG|                RDM|   Eugene, OR|        Bend, OR|        41|

In [7]:
#Defining schema to load data using the provided schema instead of auto inference of schema
#Pre-defining of schema provides control over actual data type and takes less time to create the dataframe.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, NumericType, DateType, LongType

schema = StructType([StructField("Origin_airport",StringType(), True),
                     StructField("Destination_airport", StringType(), True),
                     StructField("Origin_city", StringType(), True),
                     StructField("Destination_city", StringType(), True),
                     StructField("Passengers", IntegerType(), True),
                     StructField("Seats", IntegerType(), True),
                     StructField("Flights", IntegerType(), True),
                     StructField("Distance", LongType(), True),
                     StructField("Fly_date", DateType(), True),
                     StructField("Origin_population", LongType(), True),
                     StructField("Destination_population", LongType(), True),
                     StructField("Org_airport_lat", StringType(), True),
                     StructField("Org_airport_long", StringType(), True),
                     StructField("Dest_airport_lat", StringType(), True),
                     StructField("Dest_airport_long", StringType(), True),
                     
                     ])

dataframe = spark.read.options(header=True, nullValue='NA').csv("/home/aman/programs/gitrepos/PySpark/flight_data_analysis/Airports2.csv", schema=schema)

print("Total row count: ", dataframe.count())
dataframe.printSchema()
dataframe.show(5)

#timeit >> 482 ms ± 23.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Total row count:  3606803
root
 |-- Origin_airport: string (nullable = true)
 |-- Destination_airport: string (nullable = true)
 |-- Origin_city: string (nullable = true)
 |-- Destination_city: string (nullable = true)
 |-- Passengers: integer (nullable = true)
 |-- Seats: integer (nullable = true)
 |-- Flights: integer (nullable = true)
 |-- Distance: long (nullable = true)
 |-- Fly_date: date (nullable = true)
 |-- Origin_population: long (nullable = true)
 |-- Destination_population: long (nullable = true)
 |-- Org_airport_lat: string (nullable = true)
 |-- Org_airport_long: string (nullable = true)
 |-- Dest_airport_lat: string (nullable = true)
 |-- Dest_airport_long: string (nullable = true)

+--------------+-------------------+-------------+----------------+----------+-----+-------+--------+----------+-----------------+----------------------+----------------+-----------------+----------------+-----------------+
|Origin_airport|Destination_airport|  Origin_city|Destination_city|P

In [10]:
dataframe.cache()

DataFrame[Origin_airport: string, Destination_airport: string, Origin_city: string, Destination_city: string, Passengers: int, Seats: int, Flights: int, Distance: bigint, Fly_date: date, Origin_population: bigint, Destination_population: bigint, Org_airport_lat: string, Org_airport_long: string, Dest_airport_lat: string, Dest_airport_long: string]

In [11]:
"""
    What are the most frequent origin and destination airports in the dataset?
    How many unique origin and destination airports are there?
"""

print("Unique no of origin and destination airports are: ")
print("Origin airports: ", dataframe.select("Origin_airport").distinct().count())
print("Destination airports: ", dataframe.select("Destination_airport").distinct().count())


print("Most frequent origin and destination airports are: ")
print("Origin airport with count : ")
dataframe.groupBy("Origin_airport").count().orderBy("count", ascending=0).show(1)

print("Destination airport with count : ")
dataframe.groupBy("Destination_airport").count().orderBy("count", ascending=0).show(1)


#verfication for occurance count
print(dataframe.select("Origin_airport").where(col("Origin_airport")=='ORD').count())
print(dataframe.select("Destination_airport").where(col("Destination_airport")=='ORD').count())

Unique no of origin and destination airports are: 


                                                                                

Origin airports:  683
Destination airports:  708
Most frequent origin and destination airports are: 
Origin airport with count : 
+--------------+------+
|Origin_airport| count|
+--------------+------+
|           ORD|158496|
+--------------+------+
only showing top 1 row

Destination airport with count : 
+-------------------+------+
|Destination_airport| count|
+-------------------+------+
|                ORD|160054|
+-------------------+------+
only showing top 1 row

158496
160054


In [12]:
"""
    Which city pairs (origin and destination) have the highest number of flights?
    What is the most common origin city for flights?
"""

dataframe.groupBy(['Origin_airport', 'Destination_airport']).count().orderBy('count', ascending=0).show(1)

dataframe.groupBy('Origin_city').count().orderBy('count', ascending=0).show(1)

+--------------+-------------------+-----+
|Origin_airport|Destination_airport|count|
+--------------+-------------------+-----+
|           LAX|                SFO| 5694|
+--------------+-------------------+-----+
only showing top 1 row

+-----------+------+
|Origin_city| count|
+-----------+------+
|Chicago, IL|186312|
+-----------+------+
only showing top 1 row



In [13]:
"""
    What is the total number of passengers for all flights in the dataset?
    Which route (origin to destination) has the highest number of passengers?
"""

dataframe.select("Passengers").agg(sum("Passengers")).show()

dataframe.groupBy(['Origin_airport', 'Destination_airport']).agg(sum('Passengers')).orderBy('sum(Passengers)', ascending=0).show(1)

+---------------+
|sum(Passengers)|
+---------------+
|     9698370217|
+---------------+

+--------------+-------------------+---------------+
|Origin_airport|Destination_airport|sum(Passengers)|
+--------------+-------------------+---------------+
|           OGG|                HNL|       32364612|
+--------------+-------------------+---------------+
only showing top 1 row



In [21]:
"""
    What is the average number of seats available per flight?
    Which flight route has the highest seat capacity?
"""

dataframe.select('Flights', 'Seats').agg(sum('Flights').alias('Total_Flights'), sum('Seats').alias('Total_Seats')).select(col('Total_Seats')/col('Total_Flights').alias('Average_Seats')).show()


dataframe.groupBy('Origin_airport', 'Destination_airport').agg(avg('Seats')).orderBy('avg(Seats)', ascending=0).show()

+----------------------------------------------+
|(Total_Seats / Total_Flights AS Average_Seats)|
+----------------------------------------------+
|                            108.74072362773029|
+----------------------------------------------+

+--------------+-------------------+------------------+
|Origin_airport|Destination_airport|        avg(Seats)|
+--------------+-------------------+------------------+
|           HOU|                DAL| 33454.32802937576|
|           LGA|                DCA|31680.222338204592|
|           DAL|                HOU|  31445.4315403423|
|           DCA|                LGA| 30073.12996031746|
|           BOS|                LGA|28716.677619893428|
|           LGA|                BOS|28488.496062992126|
|           HNL|                OGG|22195.539660056656|
|           OGG|                HNL|20688.098374322635|
|           HOU|                MSY| 18012.41176470588|
|           MSY|                HOU|17687.453271028036|
|           DFW|          

In [24]:
"""
    How many flights are there in total?
    What is the average number of flights between two cities?
"""

dataframe.select(sum('Flights').alias('Total_Flights')).show()

dataframe.groupBy('Origin_city', 'Destination_city').agg(avg('Flights').alias('Average_Flights')).show()

+-------------+
|Total_Flights|
+-------------+
|    134277303|
+-------------+

+-------------------+----------------+------------------+
|        Origin_city|Destination_city|   Average_Flights|
+-------------------+----------------+------------------+
|     Des Moines, IA|        Elko, NV|1.0934579439252337|
|         Eureka, CA|        Elko, NV|               1.0|
|      Baltimore, MD|       Akron, OH|               1.0|
|        Lansing, MI|       Fargo, ND|               1.0|
|         Albany, GA|       Miami, FL|               2.0|
|       Missoula, MT|       Omaha, NE|               1.0|
|          Miami, FL|       Tampa, FL| 50.17626886145405|
|        Midland, TX|      Austin, TX|17.413447782546495|
|West Palm Beach, FL|      Austin, TX|           1.03125|
|         Topeka, KS|      Bangor, ME|1.6666666666666667|
|         Dayton, OH|        Waco, TX|               1.0|
|       Columbus, OH|       Akron, OH|               1.0|
|        Chicago, IL|       Fargo, ND| 57.1705882

In [28]:
from pyspark.sql.functions import max, min
"""
    What is the longest and shortest flight distance in the dataset?
    Which route has the longest distance?
"""
dataframe.select(max('Distance').alias('Longest_Distance'), min('Distance').alias('Shortest_Distance')).show()


dataframe.select('Origin_airport', 'Destination_airport', 'Distance').orderBy('Distance', ascending=1).show(1)
dataframe.select('Origin_airport', 'Destination_airport', 'Distance').orderBy('Distance', ascending=0).show(1)

+----------------+-----------------+
|Longest_Distance|Shortest_Distance|
+----------------+-----------------+
|            5095|                0|
+----------------+-----------------+

+--------------+-------------------+--------+
|Origin_airport|Destination_airport|Distance|
+--------------+-------------------+--------+
|           PHL|                PHL|       0|
+--------------+-------------------+--------+
only showing top 1 row

+--------------+-------------------+--------+
|Origin_airport|Destination_airport|Distance|
+--------------+-------------------+--------+
|           BOS|                HNL|    5095|
+--------------+-------------------+--------+
only showing top 1 row



In [33]:
"""
    What is the range of dates covered in the dataset?
    How many flights occurred on a specific date (e.g., 2008-10-01)?
"""

dataframe.select(min('Fly_date').alias('Start_Date'), max('Fly_date').alias('Last_Date')).show()

dataframe.groupBy('Fly_date').agg(sum('Flights').alias('Total_Flights')).where(col('Fly_date')=='1990-01-01').show()

+----------+----------+
|Start_Date| Last_Date|
+----------+----------+
|1990-01-01|2009-12-01|
+----------+----------+

+----------+-------------+
|  Fly_date|Total_Flights|
+----------+-------------+
|1990-01-01|       422366|
+----------+-------------+



In [41]:
"""
   - What is the average population of origin and destination cities?
   - Which city has the highest population as an origin or destination?

"""

dataframe.groupBy('Origin_city', 'Destination_city').agg(avg('Origin_population'), avg('Destination_population')).show()


dataframe.groupBy('Origin_city').agg(max('Origin_population').alias('Population')).orderBy('Population', ascending=0).show(1)

dataframe.groupBy('Destination_city').agg(max('Destination_population').alias('Population')).orderBy('Population', ascending=0).show(1)

+-------------------+----------------+----------------------+---------------------------+
|        Origin_city|Destination_city|avg(Origin_population)|avg(Destination_population)|
+-------------------+----------------+----------------------+---------------------------+
|     Des Moines, IA|        Elko, NV|    461837.85046728974|                    43886.0|
|         Eureka, CA|        Elko, NV|              122284.0|                    43663.5|
|      Baltimore, MD|       Akron, OH|     2634257.888888889|          699513.4444444445|
|        Lansing, MI|       Fargo, ND|              455731.5|                   190810.5|
|         Albany, GA|       Miami, FL|     161870.2857142857|                1.0777358E7|
|       Missoula, MT|       Omaha, NE|               98675.5|                   787086.5|
|          Miami, FL|       Tampa, FL|     9606516.685871055|         2355536.3127572015|
|        Midland, TX|      Austin, TX|    117147.16309012876|         1215219.5894134478|
|West Palm

In [42]:
dataframe.columns

['Origin_airport',
 'Destination_airport',
 'Origin_city',
 'Destination_city',
 'Passengers',
 'Seats',
 'Flights',
 'Distance',
 'Fly_date',
 'Origin_population',
 'Destination_population',
 'Org_airport_lat',
 'Org_airport_long',
 'Dest_airport_lat',
 'Dest_airport_long']

In [46]:
"""
 **Latitude and Longitude:**
   - Are there any missing latitude or longitude values for airports?
   - What is the geographical distribution of origin and destination airports?
"""

print(dataframe.filter(dataframe.Org_airport_lat.isNull()).count())

print(dataframe.filter(dataframe.Org_airport_long.isNull()).count())

print(dataframe.filter(dataframe.Dest_airport_lat.isNull()).count())

print(dataframe.filter(dataframe.Dest_airport_long.isNull()).count())


6954
6954
6807
6807


In [53]:
"""
 **Combined Questions:**
    - Which city pair has the highest number of passengers and the longest distance?
    - Are there any correlations between the number of passengers and the population of the origin or destination city?
"""


dataframe.groupBy('Origin_city', 'Destination_city').agg(max('Passengers').alias('Passengers'), max('Distance').alias('Distance')).orderBy(['Passengers', 'Distance'], ascending=[0,0]).show(1)

dataframe.groupBy('Origin_city', 'Destination_city').agg(max('Passengers').alias('Passengers'), max('Distance').alias('Distance')).orderBy('Distance', ascending=0).show(1)

#correlation between no of passengers and population of origin or destination city

print("Origin_city_population vs No of Passengers correlation: ", dataframe.stat.corr('Origin_population', 'Passengers'))


print("Destination_city_population vs No of Passengers correlation: ", dataframe.stat.corr('Destination_population', 'Passengers'))


+-----------+----------------+----------+--------+
|Origin_city|Destination_city|Passengers|Distance|
+-----------+----------------+----------+--------+
|Kahului, HI|    Honolulu, HI|     89597|     100|
+-----------+----------------+----------+--------+
only showing top 1 row

+-----------+----------------+----------+--------+
|Origin_city|Destination_city|Passengers|Distance|
+-----------+----------------+----------+--------+
| Boston, MA|    Honolulu, HI|         0|    5095|
+-----------+----------------+----------+--------+
only showing top 1 row

Origin_city_population vs No of Passengers correlation:  0.10271982618165293
Destination_city_population vs No of Passengers correlation:  0.09975446573980201


In [28]:
sc.stop()