In [0]:
import string
from pyspark.sql import DataFrame
from pyspark.sql.functions import current_timestamp
from pyspark import SparkFiles

In [0]:
# Show the runtime type of `spark`. The name is not defined in this notebook;
# presumably it is the session object injected by the notebook runtime.
type(spark)


In [0]:
# Load the raw hotel-bookings CSV from the `raw` container over wasbs.
#
# SECURITY: the SAS token must never be pasted into the notebook. A token that
# has been saved in a notebook should be treated as leaked and rotated. It is
# read at run time from a Databricks secret scope instead.
# Prerequisite: secret scope "hotel-storage" containing key "raw-read-sas"
# whose value is a read-only SAS token for the source blob/container.
spark.conf.set(
    "fs.azure.account.auth.type.hotelstorage5.blob.core.windows.net",
    "SAS",
)
spark.conf.set(
    "fs.azure.sas.raw.hotelstorage5.blob.core.windows.net",
    dbutils.secrets.get(scope="hotel-storage", key="raw-read-sas"),
)

# Access file
file_path = "wasbs://raw@hotelstorage5.blob.core.windows.net/hotel_bookings.csv"

# The first line is used as the header. No schema is supplied or inferred, so
# the CSV reader returns every column as a string.
df = spark.read.option("header", "true").csv(file_path)
df.show(5)


In [0]:
# Render the raw DataFrame loaded above with the notebook's display() helper.
display(df)

In [0]:
# Print the summary statistics that describe() computes for the raw DataFrame.
df.describe().show()

In [0]:
# Import Spark's aggregate under an alias. A bare `import sum` rebinds the name
# `sum` for the whole notebook and silently breaks any later use of Python's
# built-in sum().
from pyspark.sql.functions import col, sum as spark_sum

# One-row result: for every column, how many values are null.
# isNull() yields a boolean, which is cast to 1/0 so that summing counts the nulls.
null_counts = df.select(
    [spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]
)
display(null_counts)


In [0]:
from pyspark.sql.functions import when, col

# Replace nulls column by column: 0 for children and agent, "Unknown" for country.
# All other columns are left untouched.
df_cleaned = df.na.fill({"children": 0, "country": "Unknown", "agent": 0})


In [0]:
from pyspark.sql.functions import concat_ws, to_date

# Glue year, month and day-of-month together as "<year>-<month>-<day>" and
# parse the result with the pattern "yyyy-MMMM-d" into one arrival_date column.
df_cleaned = df_cleaned.withColumn(
    "arrival_date",
    to_date(
        concat_ws(
            "-",
            "arrival_date_year",
            "arrival_date_month",
            "arrival_date_day_of_month",
        ),
        format="yyyy-MMMM-d",
    ),
)


In [0]:
# Inspect df_cleaned after the null fill and the new arrival_date column.
display(df_cleaned)

In [0]:
# Convert the is_canceled flag to an integer column (same column name, new type).
df_cleaned = df_cleaned.withColumn(
    "is_canceled",
    df_cleaned["is_canceled"].cast("integer"),
)


In [0]:
from pyspark.sql.functions import expr

# Derived measures:
#   total_nights  = weekend nights + week nights
#   total_revenue = adr * total_nights (SQL expression, so it can reference the
#                   total_nights column created one step earlier in the chain)
df_cleaned = (
    df_cleaned
    .withColumn(
        "total_nights",
        col("stays_in_weekend_nights") + col("stays_in_week_nights"),
    )
    .withColumn("total_revenue", expr("adr * total_nights"))
)


In [0]:
# Inspect df_cleaned with the derived total_nights / total_revenue columns.
display(df_cleaned)

In [0]:
# Keep only bookings that were not cancelled (is_canceled equal to 0).
df_cleaned = df_cleaned.where(col("is_canceled") == 0)


In [0]:
# Persist the cleaned, non-cancelled bookings as CSV, replacing earlier output.
# A header row is written so the column names survive a round trip. Without it
# a later read with header=True would consume the first data row as column
# names. This also matches the other CSV exports in this notebook.
df_cleaned.write.mode("overwrite").option("header", True).csv("dbfs:/mnt/hoteldata/processed/")

In [0]:
# List the contents of the processed output folder to confirm what is stored there.
display(dbutils.fs.ls("dbfs:/mnt/hoteldata/processed/"))


In [0]:
#df_check = spark.read.csv("dbfs:/mnt/hoteldata/processed/")
#display(df_check)

In [0]:
#df_cleaned = spark.read.option("header", True).csv("dbfs:/mnt/hoteldata/processed/")
#display(df_cleaned)

In [0]:
# Inspect the in-memory df_cleaned (cancelled bookings have already been filtered out).
display(df_cleaned)

In [0]:
from pyspark.sql.functions import concat_ws, sha2, col

# Customer dimension: one row per distinct combination of the five
# customer-describing attributes.
dim_customer = (
    df_cleaned
    .select("country", "market_segment", "adults", "children", "babies")
    .distinct()
)

# Synthetic key: SHA-256 over the attribute values joined with "-".
# dim_customer.columns is read before customer_id exists, so only the five
# attributes feed the hash.
# NOTE(review): concat_ws skips null inputs, so rows that differ only in which
# attribute is null can hash to the same id. Confirm this is acceptable.
dim_customer = dim_customer.withColumn(
    "customer_id",
    sha2(concat_ws("-", *dim_customer.columns), 256),
)


In [0]:
# Inspect the customer dimension, including the hashed customer_id.
display(dim_customer)

In [0]:
from pyspark.sql.functions import year, month, dayofweek, monotonically_increasing_id, to_date

# Time dimension: one row per distinct arrival date, with calendar parts
# extracted from it and a generated numeric key.
dim_time = df_cleaned.select("arrival_date").distinct()

# monotonically_increasing_id() yields unique, increasing ids that are not
# consecutive and depend on how the data is partitioned.
dim_time = (
    dim_time
    .withColumn("year", year("arrival_date"))
    .withColumn("month", month("arrival_date"))
    .withColumn("weekday", dayofweek("arrival_date"))
    .withColumn("time_id", monotonically_increasing_id())
)


In [0]:
# Inspect the time dimension (arrival_date, year, month, weekday, time_id).
display(dim_time)

In [0]:
# Hotel dimension: one row per distinct (hotel, meal, distribution_channel).
dim_hotel = (
    df_cleaned
    .select("hotel", "meal", "distribution_channel")
    .distinct()
)

# Synthetic key: SHA-256 over the three attribute values joined with "-".
# The column list is read before hotel_id is added, so the key does not hash itself.
dim_hotel = dim_hotel.withColumn(
    "hotel_id",
    sha2(concat_ws("-", *dim_hotel.columns), 256),
)


In [0]:
# Inspect the hotel dimension, including the hashed hotel_id.
display(dim_hotel)

In [0]:
# Build the booking fact table.
#
# Step 1: attach the surrogate keys by left-joining each dimension on the same
# attribute columns it was built from.
# NOTE(review): a plain equi-join never matches NULL to NULL, so a booking with
# a null in any join column gets a null key. Upstream only children, country
# and agent are null-filled; confirm the remaining join columns are never null.
df_fact = (
    df_cleaned
    .join(
        dim_customer,
        on=["country", "market_segment", "adults", "children", "babies"],
        how="left",
    )
    .join(dim_hotel, on=["hotel", "meal", "distribution_channel"], how="left")
)

# Step 2: measures and a generated row id.
df_fact = (
    df_fact
    .withColumn("stay_days", col("stays_in_week_nights") + col("stays_in_weekend_nights"))
    .withColumn("revenue", col("adr") * col("stay_days"))
    .withColumn("booking_id", monotonically_increasing_id())
)

# Step 3: project the fact columns.
# arrival_date is carried as the last column because it is the only link
# between a booking and dim_time; without it the time dimension cannot be
# joined to the facts. The original six columns keep their order.
fact_bookings = df_fact.select(
    "booking_id", "hotel_id", "customer_id", "revenue", "stay_days", "adr",
    "arrival_date",
)


In [0]:
# Inspect the joined frame: the booking columns plus customer_id, hotel_id,
# stay_days, revenue and booking_id.
display(df_fact)

In [0]:
# Inspect the final fact-table projection.
display(fact_bookings)

In [0]:
# Export the fact table and the three dimensions to DBFS as CSV with a header
# row, replacing whatever an earlier run left at each path.
for _frame, _target in (
    (fact_bookings, "dbfs:/mnt/hoteldata/fact_bookings"),
    (dim_customer, "dbfs:/mnt/hoteldata/dim_customer"),
    (dim_hotel, "dbfs:/mnt/hoteldata/dim_hotel"),
    (dim_time, "dbfs:/mnt/hoteldata/dim_time"),
):
    _frame.write.mode("overwrite").option("header", True).csv(_target)


In [0]:
def save_df_to_adls(df, folder_name, secret_scope="hotel-storage", secret_key="raw-write-sas"):
    """Write ``df`` as headered CSV to ``hoteldata/<folder_name>`` in the ``raw`` ADLS container.

    Existing output at the target path is overwritten.

    Args:
        df: DataFrame to export.
        folder_name: Sub-folder under ``hoteldata/`` that receives the files.
        secret_scope: Databricks secret scope holding the SAS token.
        secret_key: Key inside ``secret_scope`` whose value is a SAS token with
            write access to the ``raw`` container.

    Notes:
        * The SAS token is read from a secret scope instead of being embedded
          in the notebook. A token that has been saved in source must be
          treated as leaked and rotated.
        * ``abfss://`` paths are served by the ABFS driver, which takes a fixed
          SAS token through the ``fs.azure.sas.fixed.token.<account>`` key
          together with the SAS auth type and the fixed-token provider class.
          The WASB-style ``fs.azure.sas.<container>.<account>`` key is not read
          by that driver.
    """
    account_host = "hotelstorage5.dfs.core.windows.net"
    abfs_path = f"abfss://raw@{account_host}/hoteldata/{folder_name}"

    spark.conf.set(f"fs.azure.account.auth.type.{account_host}", "SAS")
    spark.conf.set(
        f"fs.azure.sas.token.provider.type.{account_host}",
        "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider",
    )
    spark.conf.set(
        f"fs.azure.sas.fixed.token.{account_host}",
        dbutils.secrets.get(scope=secret_scope, key=secret_key),
    )

    (
        df.write
        .format("csv")
        .option("header", True)
        .mode("overwrite")
        .save(abfs_path)
    )

# Push the fact table and each dimension to ADLS, one folder per table.
for _folder, _frame in (
    ("fact_bookings", fact_bookings),
    ("dim_customer", dim_customer),
    ("dim_time", dim_time),
    ("dim_hotel", dim_hotel),
):
    save_df_to_adls(_frame, _folder)
