In [None]:
import pyspark
from pyspark.sql.functions import col, substring, year, month, dayofmonth, max, date_format, to_date, trim, udf, \
                            split, from_unixtime, expr, when, array
from pyspark.sql.types import IntegerType, ArrayType, TimestampType

from py4j.java_gateway import java_import
from time import time
import re

# Load the bronze flight records, parse the two date columns and trim the
# airport / fare-basis code columns in one chained pass.
df = (
    spark.read.parquet('/datalake/bronze/flights')
    # "yyyy-MM-dd" strings -> DateType
    .withColumn("searchDate", to_date(col("searchDate"), "yyyy-MM-dd"))
    .withColumn("flightDate", to_date(col("flightDate"), "yyyy-MM-dd"))
    # strip surrounding whitespace from the code columns
    .withColumn("startingAirport", trim(col("startingAirport")))
    .withColumn("destinationAirport", trim(col("destinationAirport")))
    .withColumn("fareBasisCode", trim(col("fareBasisCode")))
)

In [None]:
def duration_to_minutes(duration):
    """Convert an ISO 8601 duration string such as "PT2H29M" to whole minutes.

    Optional day, hour and minute components are supported
    ("P1DT2H30M" -> 1590, counting a day as 24 hours). A seconds component,
    if present, is ignored.

    Args:
        duration: The duration string. May be None, since a Spark UDF receives
            SQL NULLs as None.

    Returns:
        The total number of minutes as an int, or None when ``duration`` is not
        a string or does not start with the ISO 8601 "P" designator.
    """
    # Without this guard re.match(None) raises TypeError and fails the Spark task.
    if not isinstance(duration, str):
        return None
    match = re.match(r'P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?)?', duration.strip())
    if not match:
        return None
    # Missing components match as None and count as zero.
    days, hours, minutes = (int(part) if part else 0 for part in match.groups())
    return days * 24 * 60 + hours * 60 + minutes

# Wrap the pure-Python parser as a Spark UDF producing IntegerType values.
duration_to_minutes_udf = udf(duration_to_minutes, IntegerType())

# Overwrite travelDuration with its length in whole minutes; strings the
# parser rejects (it returns None) become NULL.
travel_minutes = duration_to_minutes_udf(df["travelDuration"])
df = df.withColumn("travelDuration", travel_minutes)

In [None]:
# Drop rows whose base fare exceeds the total fare.
# Rows where either fare is NULL are dropped as well: the comparison evaluates
# to NULL and filter() keeps only rows where the condition is true.
df = df.filter(df["baseFare"] <= df["totalFare"])

# Drop rows with a negative seatsRemaining count (NULL counts are dropped too,
# for the same reason as above).
df = df.filter(df["seatsRemaining"] >= 0)

def split_to_array(df, new_column, old_column):
    """Return ``df`` with ``new_column`` holding ``old_column`` split on the literal "||".

    A NULL in ``old_column`` produces an empty array rather than NULL.

    Args:
        df: Input DataFrame.
        new_column: Name of the array column to add or overwrite.
        old_column: Name of the "||"-delimited string column to split.
    """
    # split() takes a regex, so the pipes are escaped.
    pieces = split(col(old_column), r'\|\|')
    is_missing = df[old_column].isNull()
    return df.withColumn(new_column, when(is_missing, array()).otherwise(pieces))

def iso_to_timestamp(iso_string):
    """Parse an ISO 8601 timestamp string into a ``datetime``.

    A trailing "Z" is rewritten to "+00:00" because ``datetime.fromisoformat``
    only accepts "Z" natively from Python 3.11 on.

    Args:
        iso_string: e.g. "2022-04-17T12:57:00.000-04:00". May be None, since a
            Spark UDF receives SQL NULLs as None.

    Returns:
        A ``datetime`` (timezone-aware when the string carries an offset), or
        None for None / blank input so a NULL maps to NULL instead of failing
        the task.

    Raises:
        ValueError: if ``datetime.fromisoformat`` cannot parse a non-blank string.
    """
    from datetime import datetime

    if iso_string is None:
        return None
    text = iso_string.strip()
    if not text:
        return None
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    return datetime.fromisoformat(text)

# Split the raw "||"-delimited segment arrival/departure strings into string arrays.
df = split_to_array(df, "arrivalTimeArray", "segmentsArrivalTimeRaw")
df = split_to_array(df, "departureTimeArray", "segmentsDepartureTimeRaw")

# NOTE(review): this Python UDF is registered but NOT applied below; the arrays
# are converted with Spark SQL's built-in timestamp() cast instead. It is kept
# in case a later cell references `iso_to_timestamp_udf`.
iso_to_timestamp_udf = udf(iso_to_timestamp, TimestampType())

# Cast every element of each array to a timestamp with the native timestamp()
# function inside a transform() higher-order expression (no Python UDF involved).
df = df.withColumn("arrivalTimeArray", expr("transform(arrivalTimeArray, x -> timestamp(x))"))
df = df.withColumn("departureTimeArray", expr("transform(departureTimeArray, x -> timestamp(x))"))

In [None]:
# Convert the airport, airline, equipment and cabin segment columns into arrays.
# Each pair is (new array column, raw "||"-delimited source column); the list
# order matches the order the columns are added to the schema.
segment_array_columns = [
    ("arrivalAirportArray", "segmentsArrivalAirportCode"),
    ("departureAirportArray", "segmentsDepartureAirportCode"),
    ("airlineCodeArray", "segmentsAirlineCode"),
    ("airlineNameArray", "segmentsAirlineName"),
    ("equipDescriptionArray", "segmentsEquipmentDescription"),
    # NOTE(review): capitalised unlike its siblings; kept as-is for schema compatibility.
    ("CabinCodeArray", "segmentsCabinCode"),
]
for array_column, source_column in segment_array_columns:
    df = split_to_array(df, array_column, source_column)


In [None]:
# Split the "||"-delimited per-segment durations into a string array.
df = split_to_array(df, "durationSecondsArray", "segmentsDurationInSeconds")

# Despite its name this is a Spark SQL expression, not a Python UDF:
# NULL / 'None' / 'null' elements become 0, every other element is cast to int.
replace_and_convert_udf = expr(
    "transform(durationSecondsArray, x -> if(x is null or trim(x) in ('None', 'null'), 0, cast(x as int)))"
)
df = df.withColumn("durationSecondsArray", replace_and_convert_udf)

In [None]:
# Split the "||"-delimited per-segment distances into a string array.
df = split_to_array(df, "distanceArray", "segmentsDistance")

# Despite its name this is a Spark SQL expression, not a Python UDF:
# NULL / 'None' / 'null' elements become 0, every other element is cast to int.
replace_and_convert_udf = expr(
    "transform(distanceArray, x -> if(x is null or trim(x) in ('None', 'null'), 0, cast(x as int)))"
)
df = df.withColumn("distanceArray", replace_and_convert_udf)

In [None]:
# Drop every column whose name starts with "segment" — the raw pipe-delimited
# segment fields, several of which were split into *Array columns above.
segment_columns = list(filter(lambda name: name.startswith("segment"), df.columns))
df = df.drop(*segment_columns)

In [None]:
# Append the transformed rows to the silver table in chunks of `chunk_size`
# consecutive `index` values.
chunk_size = 1000000

# Bounds of the `index` column, computed by Spark (both NULL when df is empty).
# selectExpr is used because the builtin Python min() shadows Spark's here.
index_bounds = df.selectExpr("min(index) AS lo", "max(index) AS hi").first()
first_index, last_index = index_bounds["lo"], index_bounds["hi"]

# NOTE(review): view name keeps its original spelling in case other cells query it.
df.createOrReplaceTempView("fligths")

# NOTE(review): partitionBy needs year/month/day columns, which no visible cell
# creates — presumably they come from the bronze partition layout. Confirm, or
# derive them (e.g. from flightDate) before this cell.
# NOTE(review): each chunk query likely re-evaluates df's full lineage
# (including the Python UDF); consider persisting df or a single-pass write.
if first_index is not None:
    min_index = first_index
    # Iterate up to the real maximum instead of stopping at the first empty
    # chunk, so gaps in `index` cannot end the load early.
    while min_index <= last_index:
        # BETWEEN is inclusive: exactly `chunk_size` index values, no overlap.
        max_index = min_index + chunk_size - 1
        # Rebuilt every iteration so each pass reads a new range.
        tblQuery = f"SELECT * FROM fligths WHERE index BETWEEN {min_index} AND {max_index}"

        # `time` is the function imported via `from time import time`.
        start = time()
        output_df = spark.sql(tblQuery)

        # write data to datalake
        output_df.write.partitionBy("year", "month", "day").mode("append").parquet("/datalake/silver/flights")
        print("It tooks " + str(time() - start) + " seconds to insert another chunks")

        min_index = max_index + 1