In [1]:
import findspark

In [2]:
findspark.init()

In [3]:
import pyspark

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *


In [7]:
#create session
appName = "data processing in spark"
spark = SparkSession \
.builder \
.appName(appName) \
.config("spark.some.config.option", "some-value") \
.getOrCreate()



In [10]:
flightSchema = StructType([
    StructField("DayOfMonth", IntegerType(), False),
    StructField("DayOfWeek", IntegerType(), False),
    StructField("Carrier", StringType(), False),
    StructField("OriginAirportID", IntegerType(), False),
    StructField("DestAirportID", IntegerType(), False),
    StructField("DepDelay", IntegerType(), False),
    StructField("ArrDelay", IntegerType(), False)
])

flights = spark.read.csv('C:/Users/aayushi srivastava/Documents/AayushiSrivastavaJobSearch/PySparkProjects/dataset/flights.csv', schema = flightSchema, header = True)
flights.show(2)


+----------+---------+-------+---------------+-------------+--------+--------+
|DayOfMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 2 rows



In [15]:
airportSchema = StructType([
    StructField("airport_id",IntegerType(), False),
    StructField("city", StringType(), False),
    StructField("state", StringType(), False),
    StructField("name", StringType(), False)
])

airports = spark.read.csv('C:/Users/aayushi srivastava/Documents/AayushiSrivastavaJobSearch/PySparkProjects/dataset/airports.csv',header =True, schema =airportSchema)
airports.show(2)

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
+----------+-----------+-----+--------------------+
only showing top 2 rows



In [16]:
#merge two dataframes and (flights and airports) and show how many flights from each city
flightsByOrigin = flights.join(airports,flights.OriginAirportID == airports.airport_id).groupBy("city").count()
flightsByOrigin.show(3)

+--------------+-----+
|          city|count|
+--------------+-----+
|       Phoenix|89720|
|         Omaha|13487|
|Raleigh/Durham|28301|
+--------------+-----+
only showing top 3 rows



In [18]:
#Drop duplicated data and calculate how many duplicated data

# count number of original data rows
n1 = flights.count()
print("Number of original data rows:",n1)

#count data rows after deleting duplicated data
n2 = flights.dropDuplicates().count()
print("Number of data rows after deleting duplicate rows", n2)

#count duplicate data
n3 = n1 - n2
print("Number of duplicated Data", n3)

Number of original data rows: 2702218
Number of data rows after deleting duplicate rows 2696657
Number of duplicated Data 5561


In [20]:
#specify duplicate criteria by certain column
df = spark.createDataFrame([("Rony",27,168),
                            ("Rony",15,165),
                            ("Rony",27,168)],
                          ["name","age","height"])
df.show()
df.dropDuplicates().show()

+----+---+------+
|name|age|height|
+----+---+------+
|Rony| 27|   168|
|Rony| 15|   165|
|Rony| 27|   168|
+----+---+------+

+----+---+------+
|name|age|height|
+----+---+------+
|Rony| 27|   168|
|Rony| 15|   165|
+----+---+------+



In [21]:
df.dropDuplicates(['name']).show()

+----+---+------+
|name|age|height|
+----+---+------+
|Rony| 27|   168|
+----+---+------+



In [22]:
#Handle Missing Data
#Delete row if there is atleast one (column) missing data
flightsNoMissingValue = flights.dropDuplicates().dropna(
how = 'any', subset=["ArrDelay", "DepDelay"]) #use how="all" for all column missing data
numberOfMissingValueAny = n1 - flightsNoMissingValue.count()
print("number of missing value rows: ", numberOfMissingValueAny)

number of missing value rows:  5561


In [23]:
#Fill the missing data using mean of each corresponding column data
#take mean value
meanArrDelay = flights.groupBy().avg("ArrDelay").take(1)[0][0]
print("mean ArrDelay: ",meanArrDelay)
meanDepDelay = flights.groupBy().avg("DepDelay").take(1)[0][0]
print("mean DepDelay: ",meanDepDelay)
#drop duplicated data and fill missing value with mean value
flightscleanData = flights.fillna(
{'ArrDelay': meanArrDelay, 'DepDelay': meanDepDelay})

flights.groupBy().avg("ArrDelay").show()

mean ArrDelay:  6.6550108096386005
mean DepDelay:  10.510732294729737
+------------------+
|     avg(ArrDelay)|
+------------------+
|6.6550108096386005|
+------------------+



In [25]:
#Explore the statistics of data
flightscleanData.describe('DepDelay','ArrDelay').show()


+-------+------------------+------------------+
|summary|          DepDelay|          ArrDelay|
+-------+------------------+------------------+
|  count|           2702218|           2702218|
|   mean|10.510732294729737|6.6550108096386005|
| stddev| 36.02975608466141| 38.54758423679155|
|    min|               -63|               -94|
|    max|              1863|              1845|
+-------+------------------+------------------+



In [26]:
#calculate the correlation between two variables to know whether the variable is related to each other or not
correlation = flightscleanData.corr('DepDelay','ArrDelay')
print("correlation between departure delay and arrival delay: ",correlation)


correlation between departure delay and arrival delay:  0.9392560468773389
