In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName('Data_Cleaning')
    .getOrCreate()
)

# Location of the raw itineraries CSV.
input_file_path = "gs://my-bigdata-project-ma/landing/itineraries.csv"

# Comma-delimited file whose first row holds the column names.
csv_reader = spark.read.options(delimiter=",", header=True)
sdf = csv_reader.csv(input_file_path)

# Preview the first five rows.
sdf.show(n=5)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/13 16:42:15 INFO SparkEnv: Registering MapOutputTracker
24/12/13 16:42:15 INFO SparkEnv: Registering BlockManagerMaster
24/12/13 16:42:15 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/12/13 16:42:16 INFO SparkEnv: Registering OutputCommitCoordinator
24/12/13 16:42:34 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------------+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+-------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+
|               legId|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeEpochSeconds|segmentsDepartureTimeRaw|segmentsArrivalTimeEpochSeconds|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode|segmentsAirlineName|segmentsAirlineCode|segmentsEquipmentDescription|segmentsDurationInSeconds|segmentsDi

In [2]:
# Columns removed from the working DataFrame: the fare basis code and every
# per-segment detail column.
columns_to_drop = [
    'fareBasisCode',
    'segmentsDepartureTimeEpochSeconds',
    'segmentsDepartureTimeRaw',
    'segmentsArrivalTimeEpochSeconds',
    'segmentsArrivalTimeRaw',
    'segmentsArrivalAirportCode',
    'segmentsDepartureAirportCode',
    'segmentsAirlineName',
    'segmentsAirlineCode',
    'segmentsEquipmentDescription',
    'segmentsDurationInSeconds',
    'segmentsDistance',
    'segmentsCabinCode',
]
sdf = sdf.drop(*columns_to_drop)
print("done")

done


In [3]:
# Discard every row that has a null in any column, then preview the result.
sdf = sdf.dropna(how="any")
sdf.show(n=5)

+--------------------+----------+----------+---------------+------------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+
|               legId|searchDate|flightDate|startingAirport|destinationAirport|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|
+--------------------+----------+----------+---------------+------------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+
|9ca0e81111c683bec...|2022-04-16|2022-04-17|            ATL|               BOS|       PT2H29M|          0|         False|       False|     True|  217.67|   248.60|             9|                947|
|98685953630e772a0...|2022-04-16|2022-04-17|            ATL|               BOS|       PT2H30M|          0|         False|       False|     True|  217.67|   248.60|             4|                947|
|98d9

In [4]:
from pyspark.sql.functions import col, isnan, when, count
# Build one aggregate per column. when() without otherwise() produces a
# non-null value only for rows where the column is null, and count() ignores
# nulls, so each aggregate is that column's null-row count.
null_counters = []
for column_name in sdf.columns:
    is_missing = col(column_name).isNull()
    null_counters.append(count(when(is_missing, column_name)).alias(column_name))

missing_data = sdf.select(null_counters)

# The aggregation yields a single Row holding one count per column.
num_of_missing = missing_data.first()
total_count = sum(num_of_missing)
print(f"Total number of missing values: {total_count}")



Total number of missing values: 0


                                                                                

In [5]:
from pyspark.sql.functions import col, regexp_extract, when

# travelDuration is read as an ISO-8601 duration string (e.g. "PT2H29M"), so a
# plain cast to double turns every value into NULL. It is parsed here into a
# total number of minutes instead.
# Capture groups: 1 = days, 2 = hours, 3 = minutes, 4 = seconds.
ISO_DURATION_PATTERN = r"^P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?$"


def _duration_part(group_idx):
    """Return one captured component of travelDuration as a double Column.

    regexp_extract yields an empty string when the component is absent, so
    that case is mapped to 0.0 rather than being cast.
    """
    digits = regexp_extract(col("travelDuration"), ISO_DURATION_PATTERN, group_idx)
    return when(digits != "", digits.cast("double")).otherwise(0.0)


# Parse only while the column is still a string, so re-running this cell does
# not try to re-parse values that were already converted to minutes.
if dict(sdf.dtypes)["travelDuration"] == "string":
    # Values that do not match the pattern become NULL (no otherwise() branch).
    travel_minutes = when(
        col("travelDuration").rlike(ISO_DURATION_PATTERN),
        _duration_part(1) * 1440
        + _duration_part(2) * 60
        + _duration_part(3)
        + _duration_part(4) / 60,
    )
    sdf = sdf.withColumn("travelDuration", travel_minutes)

sdf = (
    sdf.withColumn("totalFare", col("totalFare").cast("double"))
       .withColumn("baseFare", col("baseFare").cast("double"))
       .withColumn("elapsedDays", col("elapsedDays").cast("integer"))
       .withColumn("seatsRemaining", col("seatsRemaining").cast("integer"))
       .withColumn("totalTravelDistance", col("totalTravelDistance").cast("double"))
)

# The casts run after the earlier null-drop and can themselves produce NULLs
# (unparseable values), so rows that failed conversion are removed here.
sdf = sdf.na.drop(subset=[
    "totalFare",
    "baseFare",
    "travelDuration",
    "elapsedDays",
    "seatsRemaining",
    "totalTravelDistance",
])
sdf.printSchema()

root
 |-- legId: string (nullable = true)
 |-- searchDate: string (nullable = true)
 |-- flightDate: string (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- travelDuration: double (nullable = true)
 |-- elapsedDays: integer (nullable = true)
 |-- isBasicEconomy: string (nullable = true)
 |-- isRefundable: string (nullable = true)
 |-- isNonStop: string (nullable = true)
 |-- baseFare: double (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- totalTravelDistance: double (nullable = true)



In [7]:
# Destination for the cleaned DataFrame; existing output at this path is replaced.
output_file_path = "gs://my-bigdata-project-ma/cleaned/cleaned_itineraries.parquet"
sdf.write.parquet(output_file_path, mode="overwrite")
print('done')

                                                                                

done
