In [None]:
from pyspark.sql.functions import *
from pyspark.sql.functions import col, isnan, count,round
from datetime import datetime
import pandas as pd
import logging
from utils import DataPreprocessor



# Extraction Process

In [None]:
# Coordinates of the Azure Blob Storage container that holds the bookings data
blob_account_name = "experimentaldata"
blob_container_name = "bookings"

# The storage key is read from the workspace secret scope instead of being hard-coded
blob_access_key = dbutils.secrets.get(
    scope="intellishore",
    key="storage",
)

# Earlier ways of addressing the data, kept for reference:
#blob_path = 'marketing_sample_for_booking_com-travel_hotels__20191001_20191231__30k_data.json'
#blob_path = "wasbs://"+blob_container_name+"@"+blob_account_name+".blob.core.windows.net"

In [None]:
# One-time setup, intentionally left commented out: the container is already
# mounted at /mnt/myblob. Uncomment only when the mount has to be re-created.
# dbutils.fs.mount(
#   source="wasbs://"+blob_container_name+"@"+blob_account_name+".blob.core.windows.net",
#   mount_point="/mnt/myblob",
#   extra_configs={
#     "fs.azure.account.key."+blob_account_name+".blob.core.windows.net": blob_access_key
#   }
# )



Out[19]: True

In [None]:
# Load the most recently modified JSON file from the mounted container into `df`.

# Define the path to the mounted directory
path = "/mnt/myblob/"

# Get a list of all files in the mounted directory
files = dbutils.fs.ls(path)

# Filter the list to include only JSON files
json_files = [f for f in files if f.name.endswith(".json")]

# Fail with a clear message instead of an IndexError further down when the
# mount contains no JSON file at all.
if not json_files:
    raise FileNotFoundError(f"No .json files found under {path}")

# Sort the list by the modification time of each file, in descending order
sorted_files = sorted(json_files, key=lambda f: f.modificationTime, reverse=True)

# Get the path to the latest modified file
latest_file_path = sorted_files[0].path

# Read the data from the latest modified file into a DataFrame
# ("multiline" lets a single JSON document span several lines)
df = spark.read.option("inferSchema", "true").option("multiline", "true").json(latest_file_path)


In [None]:
# Flatten the nested JSON: root -> page[] -> record -> room_type[]

# Hotel-level columns carried along while the room_type array is exploded
hotel_cols = [
    "pageurl", "uniq_id", "hotel_id", "hotel_name", "review_count",
    "rating_count", "default_rank", "price_rank", "ota",
    "checkin_date", "crawled_date",
]

df = (
    df.select('root.*')
    # one row per element of the `page` array
    .withColumn("page", explode(col("page")))
    .select("page.*")
    .select('pageurl', "record.*")
    # one row per room type, then promote the struct fields to top-level columns
    .select(*hotel_cols, explode("room_type").alias("room_type"))
    .select(*hotel_cols, "room_type.*")
)
df.show()


+--------------------+--------------------+--------+--------------------+------------+------------+------------+----------+-----------+------------+--------------------+-------------------+----------------------+--------------------+-------------------+---------------+
|             pageurl|             uniq_id|hotel_id|          hotel_name|review_count|rating_count|default_rank|price_rank|        ota|checkin_date|        crawled_date|room_type_breakfast|room_type_cancellation|      room_type_name|room_type_occupancy|room_type_price|
+--------------------+--------------------+--------+--------------------+------------+------------+------------+----------+-----------+------------+--------------------+-------------------+----------------------+--------------------+-------------------+---------------+
|https://www.booki...|62ff4459d734cbddc...| 1469636|  OYO Rooms Madhapur|           4|         4.6|         226|       123|booking.com|  2019-11-03|2019-11-03 11:14:...|          breakfast| 

In [None]:
# Give every column its proper type; the result is the "silver" frame.

# Plain casts: column name -> target Spark type
simple_casts = {
    'hotel_id': 'int',
    'review_count': 'float',
    'rating_count': 'float',
    'default_rank': 'int',
    'price_rank': 'int',
    'room_type_price': 'float',
}

df_silver = df
for column_name, spark_type in simple_casts.items():
    df_silver = df_silver.withColumn(column_name, col(column_name).cast(spark_type))

# Date/time columns need an explicit pattern to be parsed
df_silver = (
    df_silver
    .withColumn('checkin_date', to_date(col('checkin_date'), 'yyyy-MM-dd'))
    .withColumn("crawled_date", to_timestamp(col("crawled_date"), "yyyy-MM-dd HH:mm:ss Z"))
)


In [None]:
# Build the project's DataPreprocessor helper (imported from utils) around the typed frame.
# NOTE(review): threshold=10 is presumably the null-percentage cut-off also passed
# explicitly in the cells below — confirm against utils.DataPreprocessor.
preprocessor = DataPreprocessor(df_silver,threshold=10)

In [None]:
#Data checks
# Validate the typed frame before cleaning: a row-count check, then a check
# against 16 expected columns (11 hotel-level columns + 5 room_type fields).
# NOTE(review): what the helpers do on failure is not visible here — see utils.
preprocessor.count_rows(df_silver)
preprocessor.num_columns(df_silver,16)

2023-02-27 21:37:19,552 - __main__ - INFO - Valid Input
2023-02-27 21:37:19,553 - __main__ - INFO - Valid number of columns


In [None]:
# Check for nulls
# get_nulls returns two collections; in the recorded run they hold
# (column, null percentage) pairs above and below the 10% threshold.
above,below = preprocessor.get_nulls(df_silver,threshold=10)
# Get null report
preprocessor.null_status(10,above,below)

2023-02-27 21:39:33,316 - __main__ - INFO - Getting null report
2023-02-27 21:39:38,616 - __main__ - INFO - Columns with high number of nulls are [('review_count', 23.81838695536568), ('rating_count', 37.96324908553894), ('default_rank', 12.940432473608187), ('price_rank', 19.49580677383887)]
2023-02-27 21:39:38,617 - __main__ - INFO - The number of nulls in columns below 10% are [('hotel_id', 0.017965836165946835), ('room_type_price', 3.6988063498451345)]


In [None]:
# Remove the columns whose share of nulls is above the 10% threshold (those listed in `above`).
# NOTE(review): rows=False presumably selects column-wise removal — confirm in utils.
no_nulls = preprocessor.remove_nulls(df_silver,above,rows=False)

2023-02-27 21:40:47,115 - __main__ - INFO - The columns removed [('review_count', 23.81838695536568), ('rating_count', 37.96324908553894), ('default_rank', 12.940432473608187), ('price_rank', 19.49580677383887)]


In [None]:
#Data checks
# Re-run the checks after the column removal.
preprocessor.count_rows(no_nulls)
# Expected width = original column count minus the columns reported in `above`
no_columns = len(df_silver.columns)-len(above)
preprocessor.num_columns(no_nulls,no_columns)

2023-02-27 21:41:04,797 - __main__ - INFO - Valid Input
2023-02-27 21:41:04,798 - __main__ - INFO - Valid number of columns


In [None]:
# Remove the rows with null values
# `below` holds the columns whose null share is under the threshold.
# NOTE(review): rows=True presumably drops rows that are null in those columns — confirm in utils.
no_nulls_rows = preprocessor.remove_nulls(no_nulls,below,rows=True)

2023-02-27 21:42:27,167 - __main__ - INFO - The number of rows removed is 5169 which is the 3.7146162856711675%


In [None]:
# Collect the distinct hotel ids of the cleaned data.
# Reads from `no_nulls_rows`, which exists at this point; `no_duplicates` is only
# created in a later cell, so referencing it here fails on Restart & Run All.
# NOTE(review): assumes check_duplicates only drops repeated rows, so the set of
# hotel ids is the same before and after de-duplication — confirm in utils.
unique = no_nulls_rows.select(col('hotel_id')).distinct().collect()

In [None]:
# Number of distinct hotel ids held in `unique`
len(unique)

Out[132]: 14884

In [None]:
# Remove duplicates
# The returned frame is what gets written to the silver table further down.
# NOTE(review): which columns define a duplicate is decided inside check_duplicates — see utils.
no_duplicates = preprocessor.check_duplicates(no_nulls_rows)

2023-02-27 21:45:33,818 - __main__ - INFO - Number of duplicates removed is 583 which is the 0.435126582278481% of dataset


In [None]:
# Last check for remaining nulls
# Threshold 0: any column that still contains nulls would be reported.
last_above,last_below = preprocessor.get_nulls(no_duplicates,0)
preprocessor.null_status(0,last_above,last_below)

2023-02-27 21:46:33,624 - __main__ - INFO - Getting null report
2023-02-27 21:46:41,427 - __main__ - INFO - No nulls above threshold detected
2023-02-27 21:46:41,428 - __main__ - INFO - No nulls below threshold


In [None]:
# Display the final frame's schema (column names and types) before writing it out
no_duplicates

Out[91]: DataFrame[pageurl: string, uniq_id: string, hotel_id: int, hotel_name: string, ota: string, checkin_date: date, crawled_date: timestamp, room_type_breakfast: string, room_type_cancellation: string, room_type_name: string, room_type_occupancy: bigint, room_type_price: float]

In [None]:
%sql
-- Register the silver bookings table as a Delta table at a fixed DBFS location.
-- The columns mirror the cleaned DataFrame that is written to the same path.
-- NOTE(review): PRIMARY_KEY is passed as a plain table option; whether anything
-- enforces uniqueness of uniq_id is not visible here — confirm.
CREATE TABLE IF NOT EXISTS silver.bookings_silver (
  pageurl STRING,
  uniq_id STRING,
  hotel_id INT NOT NULL,
  hotel_name STRING NOT NULL,
  ota STRING,
  crawled_date TIMESTAMP,
  checkin_date DATE,
  room_type_breakfast STRING,
  room_type_cancellation STRING,
  room_type_name STRING,
  room_type_occupancy BIGINT,
  room_type_price FLOAT
)
USING delta
LOCATION 'dbfs:/user/hive/warehouse/silver/bookings_silver'
OPTIONS (
  PRIMARY_KEY 'uniq_id'
)


In [None]:
# Persist the cleaned frame to the silver table's Delta location,
# replacing whatever is currently stored there.
silver_path = "dbfs:/user/hive/warehouse/silver/bookings_silver"
(
    no_duplicates.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .save(silver_path)
)

# Gold tables (Aggregations)

In [None]:
%sql
-- Gold table: one revenue figure per hotel.
-- NOTE(review): PRIMARY_KEY is passed as a plain table option; whether it is
-- enforced is not visible here — confirm.
CREATE TABLE IF NOT EXISTS gold.revenue (
 
  hotel_id INT NOT NULL,
  hotel_name STRING NOT NULL,
  revenue int
)
USING delta
LOCATION 'dbfs:/user/hive/warehouse/gold/revenue'
OPTIONS (
  PRIMARY_KEY 'hotel_id'
)


In [None]:
%sql
-- Total income per hotel, summed over every row of the silver bookings table
-- (the query itself applies no date filter).
-- Hotels that already have a row in gold.revenue are skipped, so re-running
-- the cell does not insert them again.
WITH rev AS (
  SELECT
    s.hotel_id AS id,
    s.hotel_name AS name,
    round(SUM(s.room_type_price), 0) AS revenue
  FROM silver.bookings_silver AS s
  GROUP BY s.hotel_id, s.hotel_name
  ORDER BY SUM(s.room_type_price) DESC
)
INSERT INTO gold.revenue (hotel_id, hotel_name, revenue)
SELECT
  rev.id,
  rev.name,
  rev.revenue
FROM rev
WHERE NOT EXISTS (
  SELECT 1
  FROM gold.revenue AS g
  WHERE g.hotel_id = rev.id
    AND g.hotel_name = rev.name
)

num_affected_rows,num_inserted_rows
0,0


In [None]:
%sql
-- Gold table: bookings per hotel and check-in month, with the quarter label and
-- the change in bookings (demand).
-- NOTE(review): rows are per hotel AND month, yet PRIMARY_KEY names only
-- hotel_id; it is passed as a plain table option — confirm whether it matters.
CREATE TABLE IF NOT EXISTS gold.month_quarter_demand (
 
  hotel_id INT NOT NULL,
  hotel_name STRING NOT NULL,
  bookings INT,
  month INT,
  quarter STRING,
  month_quarter STRING,
  demand int
)
USING delta
LOCATION 'dbfs:/user/hive/warehouse/gold/month_quarter_demand'
OPTIONS (
  PRIMARY_KEY 'hotel_id'
)

In [None]:
%sql
-- Bookings per hotel and check-in month, labelled with the quarter, plus the
-- change in bookings versus the hotel's previous month (demand), loaded into
-- gold.month_quarter_demand.
-- Hotels that already have rows in gold.month_quarter_demand are skipped, so
-- re-running the cell does not insert them again.
WITH monthly_bookings AS (
  SELECT
    hotel_id AS Id,
    hotel_name AS Name,
    count(*) AS Bookings,
    date_format(checkin_date, 'MM') AS Month,
    -- 'MM' is zero-padded, so string ranges compare correctly; the ranges are
    -- kept non-overlapping so each month visibly belongs to one quarter.
    CASE
      WHEN date_format(checkin_date, 'MM') BETWEEN '01' AND '03' THEN 'Q1'
      WHEN date_format(checkin_date, 'MM') BETWEEN '04' AND '06' THEN 'Q2'
      WHEN date_format(checkin_date, 'MM') BETWEEN '07' AND '09' THEN 'Q3'
      ELSE 'Q4'
    END AS Quarter
  FROM silver.bookings_silver
  GROUP BY 1, 2, 4, 5
)
-- The column list uses month_quarter, the name declared in the CREATE TABLE cell.
INSERT INTO gold.month_quarter_demand (hotel_id, hotel_name, bookings, month, quarter, month_quarter, demand)
SELECT
  q.Id,
  q.Name,
  q.Bookings,
  -- Month is a 'MM' string here; the target column is INT
  CAST(q.Month AS INT),
  q.Quarter,
  CONCAT(q.Month, '_', q.Quarter),
  -- First month of a hotel has no predecessor, so its demand equals its bookings
  q.Bookings - COALESCE(LAG(q.Bookings) OVER (PARTITION BY q.Id ORDER BY q.Month, q.Quarter), 0)
FROM monthly_bookings AS q
WHERE NOT EXISTS (
  -- Guard against the target table itself (not gold.revenue)
  SELECT 1
  FROM gold.month_quarter_demand AS t
  WHERE t.hotel_id = q.Id
    AND t.hotel_name = q.Name
)


Id,Name,Bookings,Month,Quarter,"concat(Month, _, Quarter)",Difference
74729,"Taj Palace, New Delhi",58,1,Q1,01_Q1,58
74729,"Taj Palace, New Delhi",85,11,Q4,11_Q4,27
74729,"Taj Palace, New Delhi",348,12,Q4,12_Q4,263


In [None]:
%sql
-- Drop the table definition so that the following CREATE TABLE registers it afresh.
-- NOTE(review): the table is created with an explicit LOCATION; confirm whether
-- the Delta files at that path are also meant to be cleared by this step.
DROP TABLE IF EXISTS gold.rooms_aggr

In [None]:
%sql
-- Gold table: per hotel and room offering (breakfast / cancellation / room name),
-- the average price, average occupancy, number of bookings and the average
-- number of days between crawl date and check-in date.
-- NOTE(review): `cancelation` and `avg_days_dif` are the column names the INSERT
-- cell targets; PRIMARY_KEY is passed as a plain table option — confirm.
CREATE TABLE IF NOT EXISTS gold.rooms_aggr(
 
  hotel_id INT NOT NULL,
  hotel_name STRING NOT NULL,
  breakfast STRING,
  cancelation STRING,
  room_name STRING,
  avg_price INT,
  avg_num_persons INT,
  bookings INT,
  avg_days_dif INT
)
USING delta
LOCATION 'dbfs:/user/hive/warehouse/gold/rooms_aggr'
OPTIONS (
  PRIMARY_KEY 'hotel_id'
)

In [None]:
%sql
-- Aggregates per hotel and room offering (breakfast / cancellation / room name),
-- loaded into gold.rooms_aggr.
-- All four aggregates share the same grouping keys, so they are computed in a
-- single pass over the silver table instead of joining the table to a second
-- aggregation of itself.
-- NOTE(review): a group whose key contains NULL is kept here, whereas an
-- equality self-join would drop it; the notebook's null checks remove such rows
-- before the silver write.
-- Hotels that already have rows in gold.rooms_aggr are skipped, so re-running
-- the cell does not insert them again.
WITH rooms AS (
  SELECT
    hotel_id AS id,
    hotel_name,
    room_type_breakfast AS breakfast,
    room_type_cancellation AS cancellation,
    room_type_name AS room_name,
    ROUND(AVG(room_type_price), 0) AS avg_price,
    ROUND(AVG(room_type_occupancy), 0) AS avg_num_persons,
    COUNT(crawled_date) AS bookings,
    -- Average lead time in days between the crawl date and the check-in date
    ROUND(AVG(DATEDIFF(checkin_date, CAST(crawled_date AS DATE))), 0) AS avg_days_diff
  FROM silver.bookings_silver
  GROUP BY 1, 2, 3, 4, 5
)
-- Target column names follow the table definition (cancelation, avg_days_dif)
INSERT INTO gold.rooms_aggr (hotel_id, hotel_name, breakfast, cancelation, room_name, avg_price, avg_num_persons, bookings, avg_days_dif)
SELECT
  r.id,
  r.hotel_name,
  r.breakfast,
  r.cancellation,
  r.room_name,
  r.avg_price,
  r.avg_num_persons,
  r.bookings,
  r.avg_days_diff
FROM rooms AS r
WHERE NOT EXISTS (
  -- Both sides are qualified: the CTE exposes hotel_name, there is no `name` column
  SELECT 1
  FROM gold.rooms_aggr AS t
  WHERE t.hotel_id = r.id
    AND t.hotel_name = r.hotel_name
)


num_affected_rows,num_inserted_rows
45948,45948
