In [2]:
spark

In [8]:
from pyspark.sql import functions as F

# Define source and output paths
source_bucket_name = 'gs://my-bigdata-project-es/landing'
source_file_path = f"{source_bucket_name}/itineraries.csv"
output_bucket_name = 'gs://my-bigdata-project-es/cleaned'
output_file_path = f"{output_bucket_name}/cleaned_itineraries.parquet"

# Load the data
sdf = spark.read.options(header='true', inferschema='true').csv(source_file_path)


                                                                                

In [9]:
# Show initial data shape
print("Initial Data Shape:", sdf.count(), "rows and", len(sdf.columns), "columns")


# Display the schema of the dataframe
sdf.printSchema()



Initial Data Shape: 82138753 rows and 27 columns
root
 |-- legId: string (nullable = true)
 |-- searchDate: timestamp (nullable = true)
 |-- flightDate: timestamp (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- fareBasisCode: string (nullable = true)
 |-- travelDuration: string (nullable = true)
 |-- elapsedDays: integer (nullable = true)
 |-- isBasicEconomy: boolean (nullable = true)
 |-- isRefundable: boolean (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- baseFare: double (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- totalTravelDistance: integer (nullable = true)
 |-- segmentsDepartureTimeEpochSeconds: string (nullable = true)
 |-- segmentsDepartureTimeRaw: string (nullable = true)
 |-- segmentsArrivalTimeEpochSeconds: string (nullable = true)
 |-- segmentsArrivalTimeRaw: string (nullable = true)
 |-- segmentsArrivalAirportCode: st

                                                                                

In [10]:
#Drop columns that are not needed
columns_to_drop = ['segmentsEquipmentDescription','segmentsDurationInSeconds','segmentsDistance','segmentsCabinCode']
sdf = sdf.drop(*columns_to_drop)

In [11]:
#Handle missing values and remove rows with null values
sdf = sdf.dropna()

In [12]:
#Check for nulls again after cleaning

missing_values = sdf.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in sdf.columns])
missing_values.show()



+-----+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+-------------------+-------------------+
|legId|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeEpochSeconds|segmentsDepartureTimeRaw|segmentsArrivalTimeEpochSeconds|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode|segmentsAirlineName|segmentsAirlineCode|
+-----+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------

                                                                                

In [13]:
#Write the cleaned DataFrame to Parquet
sdf.write.mode('overwrite').parquet(output_file_path)

print("Cleaned data written to:", output_file_path)

                                                                                

Cleaned data written to: gs://my-bigdata-project-es/cleaned/cleaned_itineraries.parquet
