In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession 
from pyspark.sql.types import *


In [2]:
# Start (or reuse) the Spark session, then load the raw input as lines of text.
spark = SparkSession.builder.appName("Cleaning").getOrCreate()

hadoop_rdd = spark.sparkContext.textFile(
    "hdfs://namenode:9000/input",
    minPartitions=120,  # lower bound on the number of partitions for the RDD
)


In [3]:
# We want to base the dataset on direct flights, aka Non-Stop flights.
def filterNonStop(x):
    """Return True when a raw CSV line describes a non-stop flight.

    Parameters
    ----------
    x : str
        One raw comma-separated line. Field 8 is ``isNonStop`` in the raw
        column layout. The split is a plain ``str.split(",")``, so quoted
        fields containing commas are NOT supported (assumed absent -- confirm
        against the data).

    Returns
    -------
    bool
        True only when field 8 exists and equals the string "True".
        Lines with fewer than 9 fields (blank or truncated lines) return False
        instead of raising IndexError, which would abort the whole Spark job.
        If the input has a header line, it is rejected as well because its
        field 8 is not the literal "True".
    """
    row = x.split(",")
    return len(row) > 8 and row[8] == "True"

In [4]:
#['legId 0', 'searchDate 1', 'flightDate 2', 'startingAirport 3',
#       'destinationAirport 4', 'travelDuration 5', 'elapsedDays 6', 'isBasicEconomy 7',
#       'isNonStop 8', 'baseFare 9', 'totalFare 10', 'seatsRemaining 11',
#       'totalTravelDistance 12', 'day_of_week 13', 'days_between_search_and_flight 14']

# After removeCols, the columns are:
#['searchDate 0', 'flightDate 1', 'startingAirport 2',
#       'destinationAirport 3', 'travelDuration 4',
#       'totalFare 5', 'totalTravelDistance 6', 'days_between_search_and_flight 7']

In [5]:
def removeCols(x):
    """Split a raw CSV line and keep only the eight analysis columns.

    Kept raw indices (in their original order): searchDate 1, flightDate 2,
    startingAirport 3, destinationAirport 4, travelDuration 5, totalFare 10,
    totalTravelDistance 12, days_between_search_and_flight 14.

    Parameters
    ----------
    x : str
        One raw comma-separated line.

    Returns
    -------
    list[str]
        The kept fields, still as strings. Indices that do not exist in a
        short line are simply absent from the result.
    """
    # Dropped: legId 0, elapsedDays 6, isBasicEconomy 7, isNonStop 8,
    # baseFare 9, seatsRemaining 11, day_of_week 13.
    kept_positions = (1, 2, 3, 4, 5, 10, 12, 14)
    fields = x.split(",")
    return [value for position, value in enumerate(fields) if position in kept_positions]


In [6]:
def filterFloatable(y):
    """Return True when totalFare and totalTravelDistance both parse as floats.

    This runs BEFORE ``removeCols``, so it uses the raw column indices:
    totalFare is field 10 and totalTravelDistance is field 12. Some
    totalTravelDistance values are empty in the data, and this filter drops
    those rows so that ``cnvrtFloat`` cannot fail later.

    Parameters
    ----------
    y : str
        One raw comma-separated line.

    Returns
    -------
    bool
        True if both fields exist and ``float()`` accepts them. False if either
        field is missing (short or blank line) or is not numeric. Previously a
        missing field raised IndexError outside the ``try`` and aborted the job.
    """
    row = y.split(",")
    try:
        float(row[12])  # totalTravelDistance (raw index)
        float(row[10])  # totalFare (raw index)
    except (IndexError, ValueError):
        return False
    return True


In [7]:
def cnvrtFloat(y): # totalFare 5 and totalTravelDistance 6
    """Convert totalFare (index 5) and then totalTravelDistance (index 6) to float.

    The list is modified in place and also returned, so it can be used
    directly with ``RDD.map``.
    """
    for position in (5, 6):
        y[position] = float(y[position])
    return y

In [8]:
import datetime
def cnvrtDate(y): # searchDate 0 and flightDate 1
    """Replace the "YYYY-MM-DD" strings at indices 0 and 1 with ``datetime.date``.

    Both dates are parsed before either is written back. If parsing fails,
    the list is left unchanged. The list is modified in place and returned.
    """
    def to_date(text):
        year, month, day = text.split("-")[0], text.split("-")[1], text.split("-")[2]
        return datetime.date(int(year), int(month), int(day))

    search_day = to_date(y[0])
    flight_day = to_date(y[1])
    y[0], y[1] = search_day, flight_day
    return y

In [9]:
def cnvrtInt(y): # days_between_search_and_flight 7
    """Convert days_between_search_and_flight (index 7) to ``int``, in place, and return the list."""
    y[7] = int(y[7])
    return y

In [10]:
import re

def cnvrtDuration(y): # travelDuration 4
    """Rewrite the travelDuration at index 4 as an "H:MM" string.

    The raw value is presumably an ISO-8601-style duration such as "PT1H10M"
    (inferred from the H/M suffixes the original parsed -- confirm against the
    data). Each of the D, H and M components is optional and counts as 0 when
    absent. Days are folded into hours, and the total is normalised with
    divmod, so minutes are always 00-59 and zero-padded
    (e.g. "PT1H5M" -> "1:05", "PT45M" -> "0:45").

    Previously the regex pattern strings were stored in the same variables as
    the parsed numbers. A missing component therefore leaked the pattern into
    the output (e.g. "PT45M" -> "(\\d+)H:45").

    The list is modified in place and returned.
    """
    duration = y[4]

    def _component(suffix):
        # Integer immediately preceding `suffix`, or 0 if that component is absent.
        found = re.search(r"(\d+)" + suffix, duration)
        return int(found.group(1)) if found else 0

    total_minutes = (_component("D") * 24 + _component("H")) * 60 + _component("M")
    hours, minutes = divmod(total_minutes, 60)
    y[4] = f"{hours}:{minutes:02d}"
    return y


In [11]:
# Lazy cleaning pipeline: filter raw lines first (raw indices), then project
# to 8 columns and convert each field to its typed value.
cnt = (
    hadoop_rdd
    .filter(filterNonStop)
    .filter(filterFloatable)
    .map(removeCols)
    .map(cnvrtFloat)
    .map(cnvrtDate)
    .map(cnvrtInt)
    .map(cnvrtDuration)
)


In [12]:
# Schema for the cleaned rows. The column order matches the lists produced by
# the pipeline above, and every column is declared non-nullable.
table = StructType([
    StructField(column_name, column_type, nullable=False)
    for column_name, column_type in (
        ("searchDate", DateType()),
        ("flightDate", DateType()),
        ("startingAirport", StringType()),
        ("destinationAirport", StringType()),
        ("travelDuration", StringType()),
        ("totalFare", FloatType()),
        ("totalTravelDistance", FloatType()),
        ("days_between_search_and_flight", IntegerType()),
    )
])

In [13]:
# Attach the explicit schema to the cleaned RDD.
df = spark.createDataFrame(data=cnt, schema=table)

In [14]:
# Persist the cleaned data as CSV (no header row), replacing any previous output.
df.write.csv("/project/data_clean", mode="overwrite")

                                                                                

In [15]:
# Show the column names, types and nullability declared in `table`.
df.printSchema()

root
 |-- searchDate: date (nullable = false)
 |-- flightDate: date (nullable = false)
 |-- startingAirport: string (nullable = false)
 |-- destinationAirport: string (nullable = false)
 |-- travelDuration: string (nullable = false)
 |-- totalFare: float (nullable = false)
 |-- totalTravelDistance: float (nullable = false)
 |-- days_between_search_and_flight: integer (nullable = false)



In [16]:
# Spot-check the first few cleaned rows.
df.show(n=5)

+----------+----------+---------------+------------------+--------------+---------+-------------------+------------------------------+
|searchDate|flightDate|startingAirport|destinationAirport|travelDuration|totalFare|totalTravelDistance|days_between_search_and_flight|
+----------+----------+---------------+------------------+--------------+---------+-------------------+------------------------------+
|2022-04-16|2022-04-17|            ATL|               CLT|          1:10|    398.6|              228.0|                             1|
|2022-04-16|2022-04-17|            ATL|               CLT|          1:13|    398.6|              228.0|                             1|
|2022-04-16|2022-04-17|            ATL|               DEN|          3:14|   296.61|             1207.0|                             1|
|2022-04-16|2022-04-17|            ATL|               DEN|          3:21|   296.61|             1207.0|                             1|
|2022-04-16|2022-04-17|            ATL|               D