In [1]:
import findspark

# Make the local Spark installation importable as `pyspark`, then display the
# Spark home directory that was located so the run records which install it used.
findspark.init()
findspark.find()

'C:\\spark\\spark-3.3.1-bin-hadoop2'

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [3]:
# Build (or reuse, via getOrCreate) the Spark session for this notebook under the
# application name 'FlightPrices'. Spark's off-heap memory (spark.memory.offHeap.*)
# is enabled with a 10g cap. This setting by itself does not cache the data:
# nothing in this notebook calls .cache()/.persist().
spark = (
    SparkSession.builder
    .appName('FlightPrices')
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "10g")
    .getOrCreate()
)

In [4]:
# Load the raw CSV with its header row. No schema is given, so every column is
# read as a string (see the printSchema output below). escape='"' makes a doubled
# quote ("") inside a quoted field parse as a literal quote character.
dataframe = spark.read.csv(
    './data.csv',
    header=True,
    escape='"',
)

In [5]:
# Peek at the first 5 records: one field per line (vertical), values not truncated.
dataframe.show(5, False, True)

-RECORD 0-------------------------------------------------------------
 legId                             | 9ca0e81111c683bec1012473feefd28f 
 searchDate                        | 2022-04-16                       
 flightDate                        | 2022-04-17                       
 startingAirport                   | ATL                              
 destinationAirport                | BOS                              
 fareBasisCode                     | LA0NX0MC                         
 travelDuration                    | PT2H29M                          
 elapsedDays                       | 0                                
 isBasicEconomy                    | False                            
 isRefundable                      | False                            
 isNonStop                         | True                             
 baseFare                          | 217.67                           
 totalFare                         | 248.60                           
 seats

Count the number of rows.

In [6]:
# Number of rows in the raw DataFrame, before any cleaning.
dataframe.count()

13900000

In [7]:
# Show each column's name, data type and nullability. Every column is a string
# because the CSV was read without a schema.
dataframe.printSchema()

root
 |-- legId: string (nullable = true)
 |-- searchDate: string (nullable = true)
 |-- flightDate: string (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- fareBasisCode: string (nullable = true)
 |-- travelDuration: string (nullable = true)
 |-- elapsedDays: string (nullable = true)
 |-- isBasicEconomy: string (nullable = true)
 |-- isRefundable: string (nullable = true)
 |-- isNonStop: string (nullable = true)
 |-- baseFare: string (nullable = true)
 |-- totalFare: string (nullable = true)
 |-- seatsRemaining: string (nullable = true)
 |-- totalTravelDistance: string (nullable = true)
 |-- segmentsDepartureTimeEpochSeconds: string (nullable = true)
 |-- segmentsDepartureTimeRaw: string (nullable = true)
 |-- segmentsArrivalTimeEpochSeconds: string (nullable = true)
 |-- segmentsArrivalTimeRaw: string (nullable = true)
 |-- segmentsArrivalAirportCode: string (nullable = true)
 |-- segmentsDepartureAirportCode: str

In [8]:
# Drop the per-segment columns, which are duplicated or irrelevant for this
# fare/distance analysis. `df` is the working DataFrame from here on.
cols = (
    "segmentsCabinCode",
    "segmentsDistance",
    "segmentsDurationInSeconds",
    "segmentsEquipmentDescription",
    "segmentsAirlineCode",
    "segmentsAirlineName",
    "segmentsDepartureAirportCode",
    "segmentsArrivalAirportCode",
    "segmentsArrivalTimeRaw",
    "segmentsArrivalTimeEpochSeconds",
    "segmentsDepartureTimeRaw",
    "segmentsDepartureTimeEpochSeconds",
)
df = dataframe.drop(*cols)

In [9]:
#def findNullNoneEmpty(dataframe):
#    df = dataframe.select([count(when(col(c).contains('None') | \
#                            col(c).contains('NULL') | \
#                            (col(c) == '' ) | \
#                            col(c).isNull() | \
#                            isnan(c), c 
#                           )).alias(c)
#                    for c in dataframe.columns])
#    return df
#null_df = findNullNoneEmpty(df)
#null_df.show()

In [10]:
from pyspark.ml.feature import Imputer

# Fill missing totalTravelDistance values. The column is first cast from string
# to integer, because Imputer only accepts numeric input. Missing values are then
# replaced using Imputer's default strategy (the column mean). Missing here means
# nulls, including any values that did not survive the cast.
df = df.withColumn('totalTravelDistance', col('totalTravelDistance').cast('integer'))
imputer = Imputer(
    inputCols=['totalTravelDistance'],
    outputCols=['totalTravelDistance'],  # overwrite the column instead of adding a new one
)
df = imputer.fit(df).transform(df)

In [11]:
# Drop every row that still has a null in ANY column (how="any").
df = df.na.drop(how="any")

In [None]:
# Remove exact duplicate rows (all columns equal), keeping one copy of each.
df = df.dropDuplicates()

In [None]:
# Summary statistics (count, mean, stddev, min, max) for distance and fare.
# totalFare is still a string column at this point. describe() reports min/max of
# string columns in lexicographic order (e.g. "1000.0" < "99.5"), so the fare is
# cast to double first to get numeric min/max. The column name stays "totalFare".
df.select(
    col("totalTravelDistance"),
    col("totalFare").cast("double").alias("totalFare"),
).describe().show()

In [None]:
# Re-check the schema after dropping the segment columns and casting/imputing
# totalTravelDistance. totalFare is still a string at this point.
df.printSchema()

In [None]:
# Average totalTravelDistance for each $100 band of totalFare between 100 and 1000.
# All bands are computed in a single aggregation pass (one Spark job, one row),
# rather than nine separate aggregations glued together with cross joins.
# - totalFare is still a string column here. It is cast to double explicitly
#   instead of relying on Spark's implicit string-vs-integer comparison.
# - Bands are half-open [lo, hi). An inclusive between(lo, hi) would count a
#   boundary fare (e.g. exactly 200) in two adjacent bands.
# Output columns keep the names avgDistance100200 ... avgDistance9001000.
# A band with no matching rows yields null.
FARE_BAND_WIDTH = 100
FARE_BANDS = [(lo, lo + FARE_BAND_WIDTH) for lo in range(100, 1000, FARE_BAND_WIDTH)]

fare = col("totalFare").cast("double")
df_all = df.agg(*[
    # when() without otherwise() yields null outside the band, and avg ignores nulls.
    avg(when((fare >= lo) & (fare < hi), col("totalTravelDistance"))).alias(f"avgDistance{lo}{hi}")
    for lo, hi in FARE_BANDS
])
df_all.show()


In [None]:
# Bring the one-row summary to the driver. iloc[0] turns it into a pandas Series
# mapping band column name -> average distance, so each fare band gets its own
# labelled bar. A new name is used so df_all stays a Spark DataFrame, which lets
# this cell be re-run safely.
avg_distance_by_band = df_all.toPandas().iloc[0]
ax = avg_distance_by_band.plot.bar()
ax.set(
    title="Average total travel distance by total-fare band",
    xlabel="fare band",
    ylabel="average totalTravelDistance",
);

Cast the distance and fare columns to numeric types for plotting; by default every CSV column is read as a string.

import matplotlib.pyplot as plt

# Numeric plotting columns are added to df: x = travel distance, y = total fare.
# y is cast to double rather than int, so fares keep their cents; an int cast
# would truncate e.g. 248.60 to 248.
df = (
    df.withColumn("x", df["totalTravelDistance"].cast('int'))
      .withColumn("y", df["totalFare"].cast('double'))
)

# Plot a reproducible random sample (fixed seed, no replacement). The raw data has
# ~13.9M rows, so even a 10% sample is a large pandas frame on the driver.
# Lower SAMPLE_FRACTION if driver memory is tight.
SAMPLE_FRACTION = 0.1
SAMPLE_SEED = 42
sampled_data = (
    df.select('x', 'y')
    .sample(withReplacement=False, fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED)
    .toPandas()
)

fig, ax = plt.subplots()
ax.scatter(sampled_data.x, sampled_data.y)
ax.set_xlabel('distance')
ax.set_ylabel('price')
ax.set_title('relation of total travel distance and prices')
plt.show()