### Rename columns and save data frames to dict

In [0]:
import re


def camel_case_to_snake_case(column_name):
    """Convert a camelCase / PascalCase column name to snake_case.

    An underscore is inserted at each word boundary and the result is
    lowercased. A boundary is either:

    * a lowercase letter or digit followed by an uppercase letter
      (``totalPrice`` -> ``total_price``), or
    * the last capital of an acronym that is followed by a new capitalised
      word (``HTTPResponse`` -> ``http_response``).

    Trailing acronyms stay together (``customerID`` -> ``customer_id``), and
    names that already contain underscores do not get doubled ones
    (``Snake_Case`` -> ``snake_case``).

    >>> camel_case_to_snake_case('reservationCreatedAt')
    'reservation_created_at'
    >>> camel_case_to_snake_case('customerID')
    'customer_id'
    """
    return re.sub(
        r'(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])',
        '_',
        column_name,
    ).lower()


# Bronze tables to load. Every key starts out as None and is replaced by the
# corresponding DataFrame (with snake_case column names) in the loop below.
tables_dict = dict.fromkeys(
    ('reservations', 'flights', 'customers', 'hotels', 'eur_exchange_rates')
)

for table_name in tables_dict:
    df = spark.read.parquet(f'{BRONZE_LAYER_PATH}/{table_name}')
    # Rename every column to snake_case in a single projection.
    df = df.toDF(*(camel_case_to_snake_case(name) for name in df.columns))
    tables_dict[table_name] = df

### Reservations Data Frame transformations

In [0]:
from pyspark.sql.functions import col, date_format, regexp_extract, to_timestamp, when, lit, round


reservations_df = tables_dict['reservations']

# Parse the raw creation timestamp (Spark's default timestamp parsing, no
# explicit pattern) and derive separate date and time string columns from it.
reservations_df = reservations_df.withColumn(
    'reservation_created_at',
    to_timestamp('reservation_created_at')
)
reservations_df = reservations_df.withColumn(
    'reservation_date',
    date_format('reservation_created_at', 'yyyy-MM-dd')
)
reservations_df = reservations_df.withColumn(
    'reservation_time',
    date_format('reservation_created_at', 'HH:mm')
)

# Extract the numeric price amount and round it to two decimal places.
# The fractional part is optional so that integer amounts ("120 EUR") are
# parsed as well as decimals ("120.50 EUR"). Requiring a '.' would give an
# empty match for integer prices, which the cast silently turns into NULL.
# NOTE(review): thousands separators (e.g. "1,234.56") are not handled;
# confirm the bronze price format.
reservations_df = reservations_df.withColumn(
    'total_price_amount',
    round(
        regexp_extract(col('total_price'), r'(\d+(?:\.\d+)?)', 1).cast('float'),
        2,
    )
)

# Extract the currency code: the first run of uppercase letters in the text.
reservations_df = reservations_df.withColumn(
    'total_price_currency',
    regexp_extract(col('total_price'), r'([A-Z]+)', 1)
)

# Attach the exchange rate for each reservation's currency.
# Only the two columns used below are taken from the rate table, so the join
# cannot bring clashing column names into the reservations frame.
# The left join keeps reservations whose currency has no rate; their EUR
# amount ends up NULL unless the currency is EUR itself.
# NOTE(review): the join assumes currency_code is unique in the rate table;
# duplicate codes would duplicate reservation rows.
exchange_rates_df = tables_dict['eur_exchange_rates']
rates_df = exchange_rates_df.select('currency_code', 'currency_rate_to_eur')

reservations_df = reservations_df.join(
    rates_df,
    reservations_df['total_price_currency'] == rates_df['currency_code'],
    "left"
)

# Convert the amount to EUR and round to two decimal places.
# EUR prices are copied unchanged. Other currencies are multiplied by
# currency_rate_to_eur, presumably "EUR per unit of the source currency"
# (confirm against the rate table).
reservations_df = reservations_df.withColumn(
    'total_price_amount_in_eur',
    round(
        when(
            col('total_price_currency') == 'EUR',
            col('total_price_amount')
        ).otherwise(
            col('total_price_amount') * col('currency_rate_to_eur')
        ),
        2,
    )
)

# Keep only the columns published to the silver layer.
reservations_df = reservations_df.select(
    'reservation_id',
    'reservation_date',
    'reservation_time',
    'customer_id',
    'flight_id',
    'hotel_id',
    'room_type',
    'total_price_amount',
    'total_price_currency',
    'total_price_amount_in_eur',
    'payment_method',
)

# Overwrite the reservations data frame
tables_dict['reservations'] = reservations_df

### Flights Data Frame transformations

In [0]:
from pyspark.sql.functions import from_unixtime, date_format, col, split, regexp_replace


flights_df = tables_dict['flights']

# Reusable column expressions. The splits below expect ``airport`` to look like
# "<code>|<name>" and ``flight`` to look like "<number>-<airline>(<plane model>)".
airport_parts = split(col('airport'), r'\|')
flight_parts = split(col('flight'), '-')
carrier_parts = split(flight_parts.getItem(1), r'\(')

flights_df = (
    flights_df
    # flight_departure is in milliseconds; from_unixtime takes seconds.
    .withColumn('flight_departure', from_unixtime(col('flight_departure') / 1000))
    # Separate date and time strings derived from the departure moment.
    .withColumn('flight_departure_date', date_format('flight_departure', 'yyyy-MM-dd'))
    .withColumn('flight_departure_time', date_format('flight_departure', 'HH:mm'))
    .withColumn('airport_code', airport_parts.getItem(0))
    .withColumn('airport_name', airport_parts.getItem(1))
    .withColumn('flight_number', flight_parts.getItem(0))
    .withColumn('flight_airline_name', carrier_parts.getItem(0))
    # Text after '(' with the closing ')' at the end removed.
    .withColumn(
        'flight_plane_model',
        regexp_replace(carrier_parts.getItem(1), r'\)$', ''),
    )
    .select(
        'flight_id',
        'flight_departure_date',
        'flight_departure_time',
        'flight_number',
        'flight_airline_name',
        'flight_plane_model',
        'airport_code',
        'airport_name',
    )
)

tables_dict['flights'] = flights_df

### Hotels Data Frame transformations

In [0]:
from pyspark.sql.functions import split


hotels_df = tables_dict['hotels']

# ``hotel`` is split on a literal '*'. The '*' is escaped because split()
# treats its pattern as a regex. A raw string is used so Python does not
# treat '\*' as an invalid escape sequence (a SyntaxWarning on Python 3.12+).
# The text before the first '*' becomes the name; the text between the first
# and second '*' becomes the star value (kept as a string).
hotel_parts = split(hotels_df['hotel'], r'\*')

hotels_df = (
    hotels_df
    .withColumn('hotel_name', hotel_parts.getItem(0))
    .withColumn('hotel_stars', hotel_parts.getItem(1))
    .select(
        'hotel_id',
        'hotel_name',
        'hotel_stars'
    )
)

# Overwrite the hotels data frame
tables_dict['hotels'] = hotels_df

### Customers Data Frame transformations

In [0]:
from pyspark.sql.functions import when, split, concat_ws, to_date, date_format, current_date, datediff


from pyspark.sql.functions import col, floor, months_between, regexp_extract

customers_df = tables_dict['customers']

# Column references use col(...) so that each step of the chain resolves
# against the frame produced by the previous step (needed because
# customer_date_of_birth is rewritten twice).
customers_df = (
    customers_df
    # First name: the first whitespace-delimited token of customer_name.
    .withColumn(
        'customer_first_name',
        regexp_extract(col('customer_name'), r'^\s*(\S+)', 1),
    )
    # Last name: everything after the first token, with surrounding whitespace
    # dropped. Multi-word surnames ("van der Berg") are kept whole instead of
    # being cut to the second token; any middle names are included here too.
    # A single-word name yields an empty string.
    .withColumn(
        'customer_last_name',
        regexp_extract(col('customer_name'), r'^\s*\S+\s+(.*?)\s*$', 1),
    )
    # Map 'M'/'F' to 'Male'/'Female'. Any other value (or NULL) becomes NULL.
    .withColumn(
        'customer_gender',
        when(col('customer_gender') == 'M', 'Male')
        .when(col('customer_gender') == 'F', 'Female'),
    )
    # Parse the birth date from month/day/year text.
    .withColumn(
        'customer_date_of_birth',
        to_date(col('customer_date_of_birth'), 'M/d/yyyy'),
    )
    # Age in completed years. months_between tracks calendar months, so the
    # age increments exactly on the birthday. Dividing a day count by 365.25
    # would under-report the age on many birthdays (e.g. 365 days -> 0).
    .withColumn(
        'customer_age',
        floor(
            months_between(current_date(), col('customer_date_of_birth')) / 12
        ).cast('int'),
    )
    # Re-render the birth date as a 'yyyy-MM-dd' string.
    .withColumn(
        'customer_date_of_birth',
        date_format(col('customer_date_of_birth'), 'yyyy-MM-dd'),
    )
    .select(
        'customer_id',
        'customer_first_name',
        'customer_last_name',
        'customer_gender',
        'customer_email',
        'customer_date_of_birth',
        'customer_age',
    )
)

# Overwrite the customers data frame
tables_dict['customers'] = customers_df

### Save Data Frames to Silver layer

In [0]:
import datetime


# Write every transformed table to the silver layer as a Delta table,
# replacing the previous contents, and log a timestamped confirmation.
for table_name, df in tables_dict.items():
    silver_path = f'{SILVER_LAYER_PATH}/{table_name}'
    (
        df.write
        .format('delta')
        .mode('overwrite')
        .option('mergeSchema', 'true')
        .save(silver_path)
    )

    saved_at = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f'{saved_at} | Table "{table_name}" saved to silver layer.')
