# Installing Pyspark 

In [2]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 45 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 49.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=d66e3a8445c6b5f17b22859168e9e64dd2901ae48b0e05fc0a5f004291be5b7c
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


# Importing necessary libraries 

In [3]:
#Initializing PySpark
from pyspark.sql import functions as f
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

#Spark Config
conf = SparkConf().setAppName("sample_app")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.appName('Test').getOrCreate()
sqlContext = SQLContext(sc)



# Reading the data and its schema and datatypes

In [7]:
# Reading the csv file data
df = spark.read.format("csv").option("header","true").option("Inferschema","true").load("/content/Airline_data.csv")
df.show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|1989|    1|        23|        1|   1419|      1230|   1742|      1552|           UA|      183

# Checking the columns/attributes names


In [None]:
col = df.columns                                                                # Retreving the columns names and stored under varible
print('List of column names: ')                                                 # Printing a define header
rdd = sc.parallelize(col)                                                       # By usinf sc parallelize function converting the data into rrd transformation
rdd.collect()                                                                   # Calling the action

List of column names: 


['Year',
 'Month',
 'DayofMonth',
 'DayOfWeek',
 'DepTime',
 'CRSDepTime',
 'ArrTime',
 'CRSArrTime',
 'UniqueCarrier',
 'FlightNum',
 'TailNum',
 'ActualElapsedTime',
 'CRSElapsedTime',
 'AirTime',
 'ArrDelay',
 'DepDelay',
 'Origin',
 'Dest',
 'Distance',
 'TaxiIn',
 'TaxiOut',
 'Cancelled',
 'CancellationCode',
 'Diverted',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay']

# Checking the number of rows and columns

In [None]:
row = df.count()                                                                # Counting the number of rows
columns = len(df.columns)                                                       # Counting the number of columns
print("Number of rows : ",row)
print("Number of columns: ",columns)

Number of rows :  426
Number of columns:  29


# 1.) Show a sample of 5 records from dataset.

In [6]:
df.limit(5).show()                                                                      # Fetching the top 5 rows data by using limit function

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|1989|    1|        23|        1|   1419|      1230|   1742|      1552|           UA|      183

# 2) Read the data with data types.

In [None]:
# Reading the csv file
df = spark.read.format("csv").option("header","true").option("Inferschema","true").load("/content/Airline_data.csv")
df.show()
# Prnting the schema
df.printSchema()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|1989|    1|        23|        1|   1419|      1230|   1742|      1552|           UA|      183

# 3.) Make a new column MonthStr, Which has months in form of 01, 02, 03, ..., 12.

In [None]:
# By using left padding function we leading space to zero
df = df.withColumn("Monthstr", f.lpad(f.col("Month"), 2, "0"))
df.show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+--------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|Monthstr|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+--------+
|1989|    1|        23|        1|   1419|      1230|   1742|      1

In [None]:
# checking the Monthstr columns and its records zero is add or not.
df.select("Monthstr").show()

+--------+
|Monthstr|
+--------+
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
+--------+
only showing top 20 rows



In [8]:
from pyspark.sql.functions import udf,col
def month(x):
  
     return("%02d" % (x))                           # %02d performs decimal integer conversion d, formatted with zero padding (0 flag), with width 2.
     
a = udf(month)                                      #converting the udf assinging the variables a
df = df.withColumn("MonthStr",a(col("Month")))      #adding new column by using withColumn
df.show()                                           #its show the result

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+--------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|MonthStr|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+--------+
|1989|    1|        23|        1|   1419|      1230|   1742|      1

In [9]:
# checking the Monthstr columns and its records zero is add or not.
df.select("Monthstr").show()

+--------+
|Monthstr|
+--------+
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
|      01|
+--------+
only showing top 20 rows



# 4.) Find the number of(#) flights each airline made. 

In [None]:
# By using count  finction conting the number f recods
df1 = df.select(f.count("UniqueCarrier"))    
df1.show()

+--------------------+
|count(UniqueCarrier)|
+--------------------+
|                 426|
+--------------------+



# 5.) Find the mean Arrival Delay per origination airport.

In [None]:
# By using the groupBy function grouping the data and taking mean out of it
mean_arival_delay = df.groupBy("Origin").mean("ArrDelay")
mean_arival_delay.show()

+------+-------------------+
|Origin|      avg(ArrDelay)|
+------+-------------------+
|   LIH|0.16666666666666666|
|   HNL|  14.21774193548387|
|   EWR|               9.25|
|   DEN| 20.166666666666668|
|   IAD| 12.966666666666667|
|   SFO| 11.215384615384615|
|   PHL|  6.827586206896552|
|   OGG|  16.24137931034483|
+------+-------------------+



# 6.) What is the average departure delay from each airport?

In [None]:
# By using the groupBy function grouping the data and taking average out of it
Avg_departure_delay = df.groupBy("Origin").avg("DepDelay")
Avg_departure_delay.show()

+------+-------------------+
|Origin|      avg(DepDelay)|
+------+-------------------+
|   LIH|-3.7666666666666666|
|   HNL|  3.217741935483871|
|   EWR|  4.958333333333333|
|   DEN|               27.6|
|   IAD|                8.9|
|   SFO| 19.646153846153847|
|   PHL| 16.137931034482758|
|   OGG|                6.0|
+------+-------------------+

