In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from itertools import chain
# Switch Spark's datetime parsing to the LEGACY policy for this session; this affects the
# to_timestamp / to_date calls made later in the notebook.
# NOTE(review): presumably required because some strings built later (e.g. a date assembled
# with an unpadded day number) do not strictly match their patterns — confirm before removing.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 3, Finished, Available, Finished)

# Flight Data

In [2]:
# Load the raw flight records from the "flight_table" table and preview 10 rows
df = spark.read.table("flight_table")
flight_preview = df.limit(10)
display(flight_preview)

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2dabcffc-1dc7-4d5b-91ca-d1ad8b962d9e)

In [3]:
# Remove fully duplicated rows, printing the frame size before and after
rows_before, cols_before = df.count(), len(df.columns)
print(f"DataFrame Size Before Drop Duplicates: ({rows_before} rows, {cols_before} columns)")

df = df.dropDuplicates()

rows_after, cols_after = df.count(), len(df.columns)
print(f"DataFrame Size After Drop Duplicates: ({rows_after} rows, {cols_after} columns)")

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 5, Finished, Available, Finished)

DataFrame Size Before Drop Duplicates: (10683 rows, 11 columns)
DataFrame Size After Drop Duplicates: (10463 rows, 11 columns)


In [4]:
# Create Route_Start and Route_Destination
#
# Route lists the stops of a flight joined by a separator written as " ? " in this notebook
# (presumably a "→" that was turned into "?" by an encoding conversion — TODO confirm
# against the source data).
#
# split() interprets its pattern as a Java regex, where a bare "?" is a quantifier, so the
# previous pattern " ? " matched "one or two spaces" rather than the separator itself; the
# first/last tokens only came out right because the stop codes contain no spaces.
# Inside a character class "?" is literal, so the pattern below matches the real separator
# (either spelling) together with its surrounding whitespace.
route_stops = split(trim(col("Route")), r"\s*[?→]\s*")

# First stop of the route
df = df.withColumn("Route_Start", route_stops[0])

# Last stop of the route (element_at with a negative index counts from the end)
df = df.withColumn("Route_Destination", element_at(route_stops, -1))

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 6, Finished, Available, Finished)

In [5]:
# Convert Dep_Time to a timestamp: join the departure time with the journey date
# into "HH:mm dd-MM-yyyy" text, then parse that text.
dep_time_text = concat(
    col("Dep_Time"),
    lit(" "),
    date_format(col("Date_of_Journey"), "dd-MM-yyyy"),
)
df = df.withColumn("Dep_Time", to_timestamp(dep_time_text, "HH:mm dd-MM-yyyy"))

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 7, Finished, Available, Finished)

In [6]:
# Convert Arrival_Time to timestamp format
#
# The regexes below accept two shapes of Arrival_Time: a bare clock time ("HH:mm"), taken to
# mean arrival on Date_of_Journey, or a clock time followed by a day and a three-letter month
# ("HH:mm dd MMM"), which carries no year.
#
# Changes compared with the earlier version of this cell:
#   * the year used to be copied from Date_of_Journey unconditionally, so a flight leaving in
#     late December and landing in January was stamped a year too early; the explicit date is
#     now moved forward 12 months when it would otherwise fall before the journey date;
#   * the day number is zero-padded so the assembled date matches "yyyy-MM-dd" exactly instead
#     of relying on lenient parsing;
#   * the pieces are built as column expressions, so no temporary columns are added or dropped.
# NOTE(review): assumes Date_of_Journey is a date (or date-like) column — confirm.

journey_date = col("Date_of_Journey")
arrival_raw = col("Arrival_Time")

# Clock part, and the optional day / month abbreviation ("" when Arrival_Time has no date part)
hour_minute = regexp_extract(arrival_raw, r"(\d{1,2}:\d{2})", 1)
arrival_day = regexp_extract(arrival_raw, r"(\d{1,2}) [A-Za-z]{3}", 1)
arrival_month = regexp_extract(arrival_raw, r"\d{1,2} ([A-Za-z]{3})", 1)

# Map month abbreviation to numbers
month_mapping = {
    "Jan": "01", "Feb": "02", "Mar": "03", "Apr": "04", "May": "05", "Jun": "06",
    "Jul": "07", "Aug": "08", "Sep": "09", "Oct": "10", "Nov": "11", "Dec": "12"
}

month_map_expr = create_map([lit(x) for x in chain(*month_mapping.items())])
arrival_month_num = month_map_expr[arrival_month]

# Explicit arrival date, first assumed to be in the same year as the journey
explicit_date = to_date(
    concat(
        date_format(journey_date, "yyyy"), lit("-"),
        arrival_month_num, lit("-"),
        lpad(arrival_day, 2, "0"),
    ),
    "yyyy-MM-dd",
)

# An arrival cannot precede the journey date: if it appears to, the year rolled over
explicit_date = (
    when(explicit_date < to_date(journey_date), add_months(explicit_date, 12))
    .otherwise(explicit_date)
)

# Determine the correct date for Arrival_Time (fall back to the journey date)
arrival_date = (
    when(arrival_day != "", date_format(explicit_date, "yyyy-MM-dd"))
    .otherwise(date_format(journey_date, "yyyy-MM-dd"))
)

# Concatenate to form the final timestamp
df = df.withColumn(
    "Arrival_Time",
    to_timestamp(concat(arrival_date, lit(" "), hour_minute), "yyyy-MM-dd HH:mm"),
)

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 8, Finished, Available, Finished)

In [7]:
# Convert Duration to total minutes
#
# The hour count is the number in front of "h" and the minute count the number in front
# of "m"; a part that is missing (or does not cast to int) counts as 0.
duration_text = col("Duration")
hours_part = coalesce(regexp_extract(duration_text, r"(\d+)h", 1).cast("int"), lit(0))
minutes_part = coalesce(regexp_extract(duration_text, r"(\d+)m", 1).cast("int"), lit(0))

# Add the numeric column and retire the original text column in one step
df = df.withColumn("Duration_Minutes", hours_part * 60 + minutes_part).drop("Duration")

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 9, Finished, Available, Finished)

In [8]:
# Add Id column
#
# Rows are numbered in Date_of_Journey order. Date_of_Journey on its own is not unique, so
# ordering by it alone lets row_number() number tied rows differently from run to run.
# Every other column is appended as a tie-breaker so the ordering — and therefore each
# row's Id — is reproducible for the same input data.
# NOTE: a window without partitionBy evaluates over all rows in a single partition.
tie_breakers = [name for name in df.columns if name != "Date_of_Journey"]
window_spec = Window.orderBy("Date_of_Journey", *tie_breakers)

# Add an 'Id' column starting from 1
df = df.withColumn("Id", row_number().over(window_spec))

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 10, Finished, Available, Finished)

In [9]:
# Put the columns into their final order, with Id first
flight_column_order = [
    "Id",
    "Airline",
    "Date_of_Journey",
    "Source",
    "Destination",
    "Route",
    "Route_Start",
    "Route_Destination",
    "Dep_Time",
    "Arrival_Time",
    "Duration_Minutes",
    "Total_Stops",
    "Additional_Info",
    "Price",
]
df = df.select(*flight_column_order)

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 11, Finished, Available, Finished)

In [11]:
# Preview the first 10 rows of the transformed flight data
transformed_preview = df.limit(10)
display(transformed_preview)

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b812d0be-e8dd-4dc0-8104-bd57481aec84)

In [12]:
# Save the data to delta table as managed table,
# replacing any existing data and schema at Tables/flights
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("Tables/flights")
)

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 14, Finished, Available, Finished)

In [13]:
# Check the data: read the saved "flights" table back and preview it.
# The read-back frame (df_check) is what must be displayed — showing the in-memory `df`
# would not verify what was actually written. Limited to 10 rows like the other previews.
df_check = spark.read.table("flights")
display(df_check.limit(10))

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 21c74377-cae4-4bc3-b4fa-0ffd688f0fb5)

# Payment Data

In [14]:
# Load the payments CSV (first line is the header, column types inferred) and preview 10 rows
payments_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('Files/payments.csv')
)
display(payments_df.limit(10))

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 16, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7a7d0092-3d5a-4737-8fe0-9d9b4cacdda9)

In [15]:
# Parse transaction_date with the "yyyy-MM-dd" pattern so it is stored as a date
parsed_transaction_date = to_date(col("transaction_date"), "yyyy-MM-dd")
payments_df = payments_df.withColumn("transaction_date", parsed_transaction_date)
display(payments_df.limit(10))

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 17, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5507bf90-5700-41c1-87d2-628107496bc7)

In [16]:
# Write payments as a Delta table at Tables/payments, replacing existing data and schema
(
    payments_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("Tables/payments")
)

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 18, Finished, Available, Finished)

# Passenger Data

In [18]:
# Load the passengers CSV (first line is the header, column types inferred) and preview 10 rows
passengers_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('Files/passengers.csv')
)
display(passengers_df.limit(10))

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 20, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c3b8ef2e-57f0-455a-81c1-653295237ef3)

In [20]:
# Parse date_of_birth with the "yyyy-MM-dd" pattern so it is stored as a date
parsed_date_of_birth = to_date(col("date_of_birth"), "yyyy-MM-dd")
passengers_df = passengers_df.withColumn("date_of_birth", parsed_date_of_birth)
display(passengers_df.limit(10))

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 22, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c894438a-c5f6-48ae-b0bf-5241be504722)

In [21]:
# Write passengers as a Delta table at Tables/passengers, replacing existing data and schema
(
    passengers_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("Tables/passengers")
)

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 23, Finished, Available, Finished)

# Booking Data

In [22]:
# Load the bookings CSV (first line is the header, column types inferred) and preview 10 rows
bookings_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('Files/bookings.csv')
)
display(bookings_df.limit(10))

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 24, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 33cee128-4561-41bf-b8eb-85f3a8a8905a)

In [23]:
# Parse booking_date with the "yyyy-MM-dd" pattern so it is stored as a date
parsed_booking_date = to_date(col("booking_date"), "yyyy-MM-dd")
bookings_df = bookings_df.withColumn("booking_date", parsed_booking_date)
display(bookings_df.limit(10))

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 25, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 80c446e5-cc82-43bc-8cc5-21a915c0e8f3)

In [24]:
# Write bookings as a Delta table at Tables/bookings, replacing existing data and schema
(
    bookings_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("Tables/bookings")
)

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 26, Finished, Available, Finished)

# Airport Data

In [26]:
# Load the airports CSV (first line is the header, column types inferred) and preview 10 rows
airports_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('Files/airports.csv')
)
display(airports_df.limit(10))

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 28, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 761bdf16-ec99-4918-9944-19105c241737)

In [27]:
# Write airports as a Delta table at Tables/airports, replacing existing data and schema
(
    airports_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("Tables/airports")
)

StatementMeta(, 0adc0ae7-b7de-409b-8edd-3e4736f98213, 29, Finished, Available, Finished)