<a href="https://colab.research.google.com/github/Harshaveenakondeti/pyspark/blob/main/pyspark_airline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 46 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 52.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=e19957f0589aec27e9649aa78aec50ce5e2dd1b5d2bd37c4affcb0720de8212b
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [None]:
# initializing pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
conf= SparkConf().setAppName('man_assign')
sc=SparkContext(conf=conf)
spark=SparkSession.builder.appName('Test').getOrCreate()
sqlcontext=SQLContext(sc)



In [None]:
# reading data
df = spark.read.format("csv").option("header","true").option("inferschema","true").load("/content/Airline_data.csv")

In [None]:
# check the data if read properly or not
df.show(5)   # first 5 rows to display

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|1989|    1|        23|        1|   1419|      1230|   1742|      1552|           UA|      183

In [None]:
#  display columns names
df.columns 

['Year',
 'Month',
 'DayofMonth',
 'DayOfWeek',
 'DepTime',
 'CRSDepTime',
 'ArrTime',
 'CRSArrTime',
 'UniqueCarrier',
 'FlightNum',
 'TailNum',
 'ActualElapsedTime',
 'CRSElapsedTime',
 'AirTime',
 'ArrDelay',
 'DepDelay',
 'Origin',
 'Dest',
 'Distance',
 'TaxiIn',
 'TaxiOut',
 'Cancelled',
 'CancellationCode',
 'Diverted',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay']

In [None]:
# schema of data, means datatypes of columns
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |

Here time columns showing as integer. it should be timestamp format. so we need to cast.

In [None]:
from pyspark.sql import functions as f  # importing pyspark functions
from pyspark.sql.functions import col,udf   # column function


# Data Analysis
## To get insights from data, we need to answer following questions.
1. Show a sample of 5 records from dataset.
2. Read the data with data types.
3. Make a new column MonthStr, Which has months in form of 01, 02, 03, ..., 12.
4. Find the # of flights each airline made.
5. Find the mean arrival delay per origination airport.
6. What is the average departure delay from each airport?

# 1. Show a sample of 5 records from dataset.

In [None]:
df.limit(5).show()


+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|1989|    1|        23|        1|   1419|      1230|   1742|      1552|           UA|      183

# 2.Read the data with data types.

In [None]:
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |

# 3.Make a new column MonthStr, Which has months in form of 01, 02, 03, ..., 12.

In [None]:
def ms(x):
  x=str(x)
  if len(str(x))==1:
    y='0'+str(x)
    return (str(y))
a=udf(ms)
ms(1)
df2 =df.withColumn("MonthStr",a(col(str("Month"))))
df2.show(2)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+--------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|MonthStr|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+--------+
|1989|    1|        23|        1|   1419|      1230|   1742|      1

we created new column using withColumn Monthstr which takes month values and add 0 to single digit number months by using udf.

# 4.Find the # of flights each airline made.

In [None]:
df.agg({"UniqueCarrier":"count"}).show()

+--------------------+
|count(UniqueCarrier)|
+--------------------+
|                 426|
+--------------------+



In [None]:
df.groupBy("UniqueCarrier").count().show()

+-------------+-----+
|UniqueCarrier|count|
+-------------+-----+
|           UA|  426|
+-------------+-----+



each airline has 426 number of flights

# 5.Find the mean arrival delay per origination airport.





In [None]:
df.groupBy("Origin").avg('Arrdelay').show()

+------+-------------------+
|Origin|      avg(Arrdelay)|
+------+-------------------+
|   LIH|0.16666666666666666|
|   HNL|  14.21774193548387|
|   EWR|               9.25|
|   DEN| 20.166666666666668|
|   IAD| 12.966666666666667|
|   SFO| 11.215384615384615|
|   PHL|  6.827586206896552|
|   OGG|  16.24137931034483|
+------+-------------------+



LIH having  lowest arrival delay. DEN having lowest arrival delay.

# 6. What is the average departure delay from each airport?

In [None]:
df.groupBy('Origin').avg('Depdelay').show()

+------+-------------------+
|Origin|      avg(Depdelay)|
+------+-------------------+
|   LIH|-3.7666666666666666|
|   HNL|  3.217741935483871|
|   EWR|  4.958333333333333|
|   DEN|               27.6|
|   IAD|                8.9|
|   SFO| 19.646153846153847|
|   PHL| 16.137931034482758|
|   OGG|                6.0|
+------+-------------------+



1. LIH has lowest departure delay.
2. SFO having highest departure delay.



# Conclusion:
We install pyspark in our google colab. we started spark session with app name of mand_assign and created spark context with test. we read csv format file into spark dataframe. we did basic exploration by reading columns & print schema. we did analysis of data with give qquestions. we show sample of data using limit. we read datatypes using printSchema(). we created new column wuth MonthStr by replacing month value of 1 to 01. we calculated arrival delay mean from origin airport. we calculated mean departure delay from origin airport using groupBy and avg functions. SFO origin having highest mean departure delay. and LIH has lowest departure delay.DEN having highest mean arrival delay.LIH having lowest arrival delay.

so LIH origin is the best one with lowest  mean arrival delay and  mean departure delay.

**Author** : Harsha Veena

**IDE** : Google Colab

**Language** : pyspark

**Functions used** : agg,count,groupBy, avg,udf,col,printSchema,limit,withColumn

**Created On**: 5:7:2022
