# Configure connection with Azure storage and Load data

In [0]:
# Define Azure Blob Storage container names
container_name = 'tfstate'  # Input container name
container_out_name = 'data'  # Output container name

# Define Azure Storage account names
storage_account_name = 'sparkexercises'  # Input storage account name
storage_out_account_name = 'stdevelopmenttyga'  # Output storage account name

# Retrieve SAS tokens from Databricks secrets
sas_token = dbutils.secrets.get(scope="AzureConnection", key="SAS_in")  # SAS token for input storage
sas_token_out = dbutils.secrets.get(scope="AzureConnection", key="SAS_out")  # SAS token for output storage

# Construct paths for input and output storage
main_path = f'wasbs://{container_name}@{storage_account_name}.blob.core.windows.net'  # Path for input data
main_out_path = f'abfss://{container_out_name}@{storage_out_account_name}.dfs.core.windows.net'  # Path for output data

# Configure Spark to use SAS tokens for input storage (Blob Storage)
spark.conf.set(f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net", sas_token)

# Configure Spark to use SAS tokens for output storage (Data Lake Storage Gen2)
spark.conf.set(f"fs.azure.account.auth.type.{storage_out_account_name}.dfs.core.windows.net", "SAS")  # Set authentication type to SAS
spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_out_account_name}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")  # Specify SAS token provider
spark.conf.set(f"fs.azure.sas.fixed.token.{storage_out_account_name}.dfs.core.windows.net", sas_token_out)  # Set the SAS token for output storage

**Load avro data and transform it to delta table**

In [0]:
# Define the path to the Avro files
avro_file_path = main_path + '/m07sparksql/expedia/*.avro'  # Path to Avro files in Azure Blob Storage

# SQL query to create a Delta table from Avro files
sql = f"""
CREATE OR REPLACE TABLE expedia_bronze 
  USING DELTA AS  
  SELECT * FROM AVRO.`{avro_file_path}`  
"""

# Execute the SQL query to create the Delta table
spark.sql(sql)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

**Load parquet data and transform it to delta table**

In [0]:
# Define the path to the Parquet files for hotel weather data
hotel_weather_file_path = main_path + '/m07sparksql/hotel-weather/'  # Path to Parquet files in Azure Blob Storage

# SQL query to create a Delta table from Parquet files
sql = f"""
CREATE OR REPLACE TABLE HotelWeather_bronze 
  USING DELTA AS  
  SELECT * FROM PARQUET.`{hotel_weather_file_path}`  
"""

# Execute the SQL query to create the Delta table
spark.sql(sql)


DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

**Clean data to silver layer**

In [0]:
%sql
-- Create a silver table for expedia data
CREATE OR REPLACE TABLE expedia_silver
USING DELTA AS  -- Use Delta Lake as the storage format
SELECT DISTINCT 
  user_id,  
  srch_ci, 
  srch_co, 
  date_diff(srch_co, srch_ci) AS NumberOfDaysInHotel,  -- Calculate the number of days in the hotel
  srch_adults_cnt + srch_children_cnt AS NumberOfVisitors,  -- Calculate the total number of visitors
  hotel_id 
FROM expedia_bronze  
WHERE 
  date_diff(srch_co, srch_ci) >= 0  -- Ensure check-out date is after check-in date
  AND srch_adults_cnt + srch_children_cnt > 0;  -- Ensure there are visitors

-- Create a silver table for hotel data
CREATE OR REPLACE TABLE hotel_silver
USING DELTA AS  -- Use Delta Lake as the storage format
SELECT DISTINCT 
  id,  
  name, 
  address,  
  city, 
  country 
FROM HotelWeather_bronze; 

-- Create a silver table for hotel weather data
CREATE OR REPLACE TABLE HotelWeather_silver
USING DELTA AS  -- Use Delta Lake as the storage format
SELECT DISTINCT 
  id AS hotelId,  
  avg_tmpr_c,  
  wthr_date, 
  year, 
  month,  
  day 
FROM HotelWeather_bronze; 

num_affected_rows,num_inserted_rows


# Top 10 hotels with max absolute temperature difference by month.

**Query analyze** - The most time (resource) consuming in below query is execution of the windows function

In [0]:
%sql
EXPLAIN
SELECT H.*,HW.MaxAbsoluteDifferenceByMonth,HW.rank,HW.year,HW.month
FROM (
       Select HW.hotelId
              ,HW.year
              ,HW.month
              ,ROUND(ABS(max(HW.avg_tmpr_c) - min(HW.avg_tmpr_c)),2) MaxAbsoluteDifferenceByMonth
              ,DENSE_RANK() OVER (ORDER BY ROUND(ABS(max(HW.avg_tmpr_c) - min(HW.avg_tmpr_c)),2) DESC) rank
       FROM HotelWeather_silver HW
       GROUP BY HW.hotelId,HW.year,HW.month
       ) hW
JOIN hotel_silver H
       ON H.id = HW.hotelId
WHERE rank <= 10

plan
"== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- == Initial Plan ==  Project [id#15073, name#15074, address#15075, city#15076, country#15077, MaxAbsoluteDifferenceByMonth#15039, rank#15040, year#15070, month#15071]  +- BroadcastHashJoin [hotelId#15067], [id#15073], Inner, BuildLeft, false, true  :- Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=9219]  : +- Filter ((rank#15040 <= 10) AND isnotnull(hotelId#15067))  : +- RunningWindowFunction [hotelId#15067, year#15070, month#15071, _w1#15097 AS MaxAbsoluteDifferenceByMonth#15039, dense_rank(_w1#15097) windowspecdefinition(_w1#15097 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#15040], [_w1#15097 DESC NULLS LAST], false  : +- WindowGroupLimit [_w1#15097 DESC NULLS LAST], dense_rank(_w1#15097), 10, Final  : +- Sort [_w1#15097 DESC NULLS LAST], false, 0  : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=9213]  : +- WindowGroupLimit [_w1#15097 DESC NULLS LAST], dense_rank(_w1#15097), 10, Partial  : +- Sort [_w1#15097 DESC NULLS LAST], false, 0  : +- HashAggregate(keys=[hotelId#15067, year#15070, month#15071], functions=[finalmerge_max(merge max#15105) AS max(avg_tmpr_c#15068)#15082, finalmerge_min(merge min#15107) AS min(avg_tmpr_c#15068)#15083])  : +- Exchange hashpartitioning(hotelId#15067, year#15070, month#15071, 200), ENSURE_REQUIREMENTS, [plan_id=9207]  : +- HashAggregate(keys=[hotelId#15067, year#15070, month#15071], functions=[partial_max(avg_tmpr_c#15068) AS max#15105, partial_min(avg_tmpr_c#15068) AS min#15107])  : +- Project [hotelId#15067, avg_tmpr_c#15068, year#15070, month#15071]  : +- Filter if (isnotnull(_databricks_internal_edge_computed_column_skip_row#15128)) (_databricks_internal_edge_computed_column_skip_row#15128 = false) else isnotnull(raise_error(DELTA_SKIP_ROW_COLUMN_NOT_FILLED, map(keys: [], values: []), NullType))  : +- FileScan parquet spark_catalog.default.hotelweather_silver[hotelId#15067,avg_tmpr_c#15068,year#15070,month#15071,_databricks_internal_edge_computed_column_skip_row#15128] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[dbfs:/user/hive/warehouse/hotelweather_silver], PartitionFilters: [], PushedFilters: [], ReadSchema: struct"


In [0]:
%sql
-- Create a golden table for top ten hotels with maximum absolute temperature difference
CREATE OR REPLACE TABLE TopTenHotelsWihMaxAbsTemp_golden
USING DELTA AS  -- Use Delta Lake as the storage format
SELECT 
  H.*,  -- 
  HW.MaxAbsoluteDifferenceByMonth,  
  HW.rank, 
  HW.year,  
  HW.month  
FROM (-- Join hotel data with aggregated hotel weather data
  SELECT  -- Aggregate hotel weather data by hotel ID, year, and month
    HW.hotelId,  
    HW.year,  
    HW.month,  
    ROUND(ABS(max(HW.avg_tmpr_c) - min(HW.avg_tmpr_c)), 2) AS MaxAbsoluteDifferenceByMonth,-- Calculate maximum absolute temperature difference for each month
    DENSE_RANK() OVER (ORDER BY ROUND(ABS(max(HW.avg_tmpr_c) - min(HW.avg_tmpr_c)), 2) DESC) AS rank -- Rank hotels by temperature difference in descending order
  FROM HotelWeather_silver HW
  GROUP BY HW.hotelId, HW.year, HW.month  
) HW
JOIN hotel_silver H
ON H.id = HW.hotelId
WHERE rank <= 10; -- Filter to include only top ten hotels by temperature difference


num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM TopTenHotelsWihMaxAbsTemp_golden

id,name,address,city,country,MaxAbsoluteDifferenceByMonth,rank,year,month
1571958030336,316 N Main St,Little Belt Inn Neihart,Neihart,US,19.1,1,2017,9
1571958030336,316 N Main St,Little Belt Inn Neihart,Neihart,US,18.5,2,2016,10
146028888072,2799 W Airport Way,Rodeway Inn Airport,Boise,US,18.5,2,2017,9
77309411328,Po Box 1327,The Point,Saranac Lake,US,18.2,3,2016,10
695784701957,2011 Rodgers Dr,Rodeway Inn,Gillette,US,16.3,4,2016,10
369367187456,555 E Lafayette Blvd,Greektown Casino,Detroit,US,16.2,5,2016,10
146028888068,501 National St,Super 8 Belle Fourche,Belle Fourche,US,15.5,6,2017,9
25769803779,663 E Cloverland Dr,Advance Motel,Ironwood,US,15.1,7,2017,9
77309411335,1400 Historic Dr,Clarion Inn Strasburg - Lancaster,Strasburg,US,14.9,8,2016,10
1520418422787,2 Little Beaver Rd,Brooks Donald L Jr,Strasburg,US,14.9,8,2016,10


**Save top ten hotels with max absolute temperature to Azure storage**

In [0]:
# Define the output path for the Parquet file
TopTenHotelsWihMaxAbsTemp_file_path = main_out_path + '/TopTenHotelsWihMaxAbsTemp'  # Output path in Azure Data Lake Storage Gen2

# SQL query to create a Parquet table from the golden table
sql = f"""
CREATE TABLE IF NOT EXISTS TopTenHotelsWihMaxAbsTemp  
USING PARQUET 
LOCATION '{TopTenHotelsWihMaxAbsTemp_file_path}'  
PARTITIONED BY (year, month)  
AS 
SELECT * 
FROM TopTenHotelsWihMaxAbsTemp_golden  
"""

# Execute the SQL query to create the Parquet table
spark.sql(sql)

DataFrame[]

# Top 10 busy hotels for each month.

**Query Analyze** - The most time (resource) consuming in below query is execution of the windows function

In [0]:
%sql
EXPLAIN
SELECT DISTINCT 
        hotel_id
        , name
        , address
        , city
        , country
        , date_format(srch_date, 'yyyy-MM') srch_month
        , srch_count
        ,year(srch_date) year
        , month(srch_date) month
        , ranking 
FROM (
    SELECT hotel_id
          , srch_date
          , count(srch_date) srch_count
          , row_number() over (partition by srch_date order by count(srch_date) desc) ranking
    FROM (     
        select hotel_id
               , explode(sequence(trunc(srch_ci, 'month'),trunc(srch_co, 'month'), interval 1 month)) as srch_date
        from expedia_silver
    )
    GROUP BY hotel_id, srch_date
) V 
LEFT JOIN hotel_silver H 
    on V.hotel_id = H.id
where ranking <= 10
order by year, month, ranking;


plan
"== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- == Initial Plan ==  Sort [year#16186 ASC NULLS FIRST, month#16187 ASC NULLS FIRST, ranking#16184 ASC NULLS FIRST], true, 0  +- Exchange rangepartitioning(year#16186 ASC NULLS FIRST, month#16187 ASC NULLS FIRST, ranking#16184 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=10499]  +- HashAggregate(keys=[hotel_id#16219L, name#16221, address#16222, city#16223, country#16224, srch_month#16185, srch_count#16183L, year#16186, month#16187, ranking#16184], functions=[])  +- Exchange hashpartitioning(hotel_id#16219L, name#16221, address#16222, city#16223, country#16224, srch_month#16185, srch_count#16183L, year#16186, month#16187, ranking#16184, 200), ENSURE_REQUIREMENTS, [plan_id=10496]  +- HashAggregate(keys=[hotel_id#16219L, name#16221, address#16222, city#16223, country#16224, srch_month#16185, srch_count#16183L, year#16186, month#16187, ranking#16184], functions=[])  +- Project [hotel_id#16219L, name#16221, address#16222, city#16223, country#16224, srch_month#16185, srch_count#16183L, year#16186, month#16187, ranking#16184]  +- BroadcastHashJoin [hotel_id#16219L], [cast(id#16220 as bigint)], LeftOuter, BuildRight, false, true  :- HashAggregate(keys=[hotel_id#16219L, date_format(cast(srch_date#16226 as timestamp), yyyy-MM, Some(Etc/UTC)) AS srch_month#16185, srch_count#16183L, year(srch_date#16226) AS year#16186, month(srch_date#16226) AS month#16187, ranking#16184], functions=[])  : +- Filter (ranking#16184 <= 10)  : +- RunningWindowFunction [hotel_id#16219L, srch_date#16226, srch_count#16183L, row_number() windowspecdefinition(srch_date#16226, _w3#16230L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ranking#16184], [srch_date#16226], [_w3#16230L DESC NULLS LAST], false  : +- WindowGroupLimit [srch_date#16226], [_w3#16230L DESC NULLS LAST], row_number(), 10, Final  : +- Sort [srch_date#16226 ASC NULLS FIRST, _w3#16230L DESC NULLS LAST], false, 0  : +- Exchange hashpartitioning(srch_date#16226, 200), ENSURE_REQUIREMENTS, [plan_id=10484]  : +- WindowGroupLimit [srch_date#16226], [_w3#16230L DESC NULLS LAST], row_number(), 10, Partial  : +- Sort [srch_date#16226 ASC NULLS FIRST, _w3#16230L DESC NULLS LAST], false, 0  : +- HashAggregate(keys=[hotel_id#16219L, srch_date#16226], functions=[finalmerge_count(merge count#16233L) AS count(1)#16228L])  : +- Exchange hashpartitioning(hotel_id#16219L, srch_date#16226, 200), ENSURE_REQUIREMENTS, [plan_id=10478]  : +- HashAggregate(keys=[hotel_id#16219L, srch_date#16226], functions=[partial_count(1) AS count#16233L])  : +- Generate explode(sequence(trunc(cast(srch_ci#16215 as date), month), trunc(cast(srch_co#16216 as date), month), Some(INTERVAL '1' MONTH), Some(Etc/UTC))), [hotel_id#16219L], false, [srch_date#16226]  : +- Project [srch_ci#16215, srch_co#16216, hotel_id#16219L]  : +- Filter if (isnotnull(_databricks_internal_edge_computed_column_skip_row#16254)) (_databricks_internal_edge_computed_column_skip_row#16254 = false) else isnotnull(raise_error(DELTA_SKIP_ROW_COLUMN_NOT_FILLED, map(keys: [], values: []), NullType))  : +- FileScan parquet spark_catalog.default.expedia_silver[srch_ci#16215,srch_co#16216,hotel_id#16219L,_databricks_internal_edge_computed_column_skip_row#16254] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[dbfs:/user/hive/warehouse/expedia_silver], PartitionFilters: [], PushedFilters: [], ReadSchema: struct"


In [0]:
%sql
-- Create a golden table for top ten busy hotels per month
CREATE OR REPLACE TABLE TopTenBusyHotelsPerMonth_golden
USING DELTA AS  -- Use Delta Lake as the storage format
SELECT DISTINCT -- Select distinct columns from both hotel search data and hotel details
  hotel_id,  
  name,  
  address,  
  city, 
  country, 
  date_format(srch_date, 'yyyy-MM') AS srch_month, -- Format search date as 'yyyy-MM' for grouping by month
  srch_count,  
  year(srch_date) AS year,  
  month(srch_date) AS month,  
  ranking  
FROM ( -- Aggregate search counts by hotel ID and search date
  SELECT 
    hotel_id, 
    srch_date,  
    count(srch_date) AS srch_count, 
    row_number() OVER (PARTITION BY srch_date ORDER BY count(srch_date) DESC) AS ranking -- Rank hotels by search count in descending order within each date
  FROM (
    SELECT  -- Generate a sequence of dates between check-in and check-out for each hotel
      hotel_id,  
      explode(sequence(trunc(srch_ci, 'month'), trunc(srch_co, 'month'), interval 1 month)) AS srch_date
    FROM expedia_silver 
  )
  GROUP BY hotel_id, srch_date  -- Group by hotel ID and search date
) V
LEFT JOIN hotel_silver H 
ON V.hotel_id = H.id
WHERE ranking <= 10 -- Filter to include only top ten hotels by search count per date
ORDER BY year, month, ranking;


num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM TopTenBusyHotelsPerMonth_golden

hotel_id,name,address,city,country,srch_month,srch_count,year,month,ranking
1864015806473,South Quay Marsh Wall Tower Hamlets London E14 9SH United Kingdom,Hilton London Canary Wharf,London,GB,2016-10,405,2016,10,1
1958505086979,Llan 24 Eixample 08015 Barcelona Spain,Ofelias Hotel 4 Sup,Barcelona,ES,2016-10,396,2016,10,2
936302870530,40 Apollonos St,Arion,Ramey,US,2016-10,395,2016,10,3
1151051235330,200 Cowboy Ln,Pillar Hotels Resorts,Pleasanton,US,2016-10,393,2016,10,4
575525617665,485 Douglas Ave,Alpine Motel,Providence,US,2016-10,391,2016,10,5
695784701953,162 Henry Martin Trl,Miss Kitty's Boarding House,Statesville,US,2016-10,390,2016,10,6
103079215105,115 E James St,Websters Manor,Mullins,US,2016-10,390,2016,10,7
206158430209,250 Odlin Rd,Days Inn Bangor Airport,Bangor,US,2016-10,389,2016,10,8
2568390443017,Via della Giustizia 10 D Bicocca Zara 20125 Milan Italy,IH Hotels Milano Gioia,Milan,IT,2016-10,389,2016,10,9
2920577761284,Calabria 115 117 Eixample 08015 Barcelona Spain,Hotel Villa Emilia,Barcelona,ES,2016-10,389,2016,10,10


**Save Top ten busy hotels for each month**

In [0]:
# Define the output path for the Parquet file
TopTenBusyHotelsPerMonth_file_path = main_out_path + '/TopTenBusyHotelsPerMonth'  # Output path in Azure Data Lake Storage Gen2

# SQL query to create a Parquet table from the golden table
sql = f"""
CREATE TABLE IF NOT EXISTS TopTenBusyHotelsPerMonth 
USING PARQUET 
LOCATION '{TopTenBusyHotelsPerMonth_file_path}'  
PARTITIONED BY (year, month)  
AS 
SELECT *  
FROM TopTenBusyHotelsPerMonth_golden  
"""

# Execute the SQL query to create the Parquet table
spark.sql(sql)


DataFrame[]

# For visits with extended stay (more than 7 days) calculate weather trend (the day temperature difference between last and first day of stay) and average temperature during stay

**Query Analyze** - The most time (resource) consuming in below query is execution of the windows function and sorting

In [0]:
%sql
EXPLAIN
SELECT DISTINCT 
      H.name
      ,H.ID
      , srch_ci StartStay
      , srch_co EndStay
      , avg_stay_tmpr_c AverageTemperatureC
      , (last_day_tmpr_c - first_day_tmpr_c) TrendTemperatureC
      ,year(srch_ci) year
      , month(srch_ci) month
FROM (
      SELECT hotel_id
            , srch_ci
            , srch_co
            ,AVG(avg_tmpr_c) OVER stay AS avg_stay_tmpr_c
            ,FIRST_VALUE(avg_tmpr_c) OVER stay AS first_day_tmpr_c
            ,LAST_VALUE(avg_tmpr_c) OVER stay AS last_day_tmpr_c
      FROM expedia_silver e
      LEFT JOIN HotelWeather_silver hw 
          ON e.hotel_id = hw.hotelId
          AND wthr_date BETWEEN srch_ci AND srch_co
      WHERE e.NumberOfDaysInHotel > 7
      WINDOW stay AS (
            PARTITION BY hotel_id, srch_ci, srch_co 
            ORDER BY wthr_date 
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
      ) ES
JOIN Hotel_silver H
    ON ES.hotel_id = H.id

plan
"== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- == Initial Plan ==  HashAggregate(keys=[name#16897, ID#16896, StartStay#16846, EndStay#16847, AverageTemperatureC#16848, TrendTemperatureC#16849, year#16850, month#16851], functions=[])  +- Exchange hashpartitioning(name#16897, ID#16896, StartStay#16846, EndStay#16847, AverageTemperatureC#16848, TrendTemperatureC#16849, year#16850, month#16851, 200), ENSURE_REQUIREMENTS, [plan_id=11388]  +- HashAggregate(keys=[name#16897, ID#16896, StartStay#16846, EndStay#16847, knownfloatingpointnormalized(normalizenanandzero(AverageTemperatureC#16848)) AS AverageTemperatureC#16848, knownfloatingpointnormalized(normalizenanandzero(TrendTemperatureC#16849)) AS TrendTemperatureC#16849, year#16850, month#16851], functions=[])  +- Project [name#16897, ID#16896, srch_ci#16885 AS StartStay#16846, srch_co#16886 AS EndStay#16847, avg_stay_tmpr_c#16843 AS AverageTemperatureC#16848, (last_day_tmpr_c#16845 - first_day_tmpr_c#16844) AS TrendTemperatureC#16849, year(cast(srch_ci#16885 as date)) AS year#16850, month(cast(srch_ci#16885 as date)) AS month#16851]  +- BroadcastHashJoin [hotel_id#16889L], [cast(id#16896 as bigint)], Inner, BuildRight, false, true  :- Window [hotel_id#16889L, srch_ci#16885, srch_co#16886, avg(avg_tmpr_c#16891) windowspecdefinition(hotel_id#16889L, srch_ci#16885, srch_co#16886, wthr_date#16892 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS avg_stay_tmpr_c#16843, first_value(avg_tmpr_c#16891, false) windowspecdefinition(hotel_id#16889L, srch_ci#16885, srch_co#16886, wthr_date#16892 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS first_day_tmpr_c#16844, last_value(avg_tmpr_c#16891, false) windowspecdefinition(hotel_id#16889L, srch_ci#16885, srch_co#16886, wthr_date#16892 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS last_day_tmpr_c#16845], [hotel_id#16889L, srch_ci#16885, srch_co#16886], [wthr_date#16892 ASC NULLS FIRST]  : +- Sort [hotel_id#16889L ASC NULLS FIRST, srch_ci#16885 ASC NULLS FIRST, srch_co#16886 ASC NULLS FIRST, wthr_date#16892 ASC NULLS FIRST], false, 0  : +- Exchange hashpartitioning(hotel_id#16889L, srch_ci#16885, srch_co#16886, 200), ENSURE_REQUIREMENTS, [plan_id=11379]  : +- Project [hotel_id#16889L, srch_ci#16885, srch_co#16886, wthr_date#16892, avg_tmpr_c#16891]  : +- BroadcastHashJoin [hotel_id#16889L], [cast(hotelId#16890 as bigint)], LeftOuter, BuildRight, ((wthr_date#16892 >= srch_ci#16885) AND (wthr_date#16892 <= srch_co#16886)), false, true  : :- Project [srch_ci#16885, srch_co#16886, hotel_id#16889L]  : : +- Filter (((if (isnotnull(_databricks_internal_edge_computed_column_skip_row#16946)) (_databricks_internal_edge_computed_column_skip_row#16946 = false) else isnotnull(raise_error(DELTA_SKIP_ROW_COLUMN_NOT_FILLED, map(keys: [], values: []), NullType)) AND isnotnull(NumberOfDaysInHotel#16887)) AND (NumberOfDaysInHotel#16887 > 7)) AND isnotnull(hotel_id#16889L))  : : +- FileScan parquet spark_catalog.default.expedia_silver[srch_ci#16885,srch_co#16886,NumberOfDaysInHotel#16887,hotel_id#16889L,_databricks_internal_edge_computed_column_skip_row#16946] Batched: true, DataFilters: [isnotnull(NumberOfDaysInHotel#16887), (NumberOfDaysInHotel#16887 > 7), isnotnull(hotel_id#16889L)], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[dbfs:/user/hive/warehouse/expedia_silver], PartitionFilters: [], PushedFilters: [IsNotNull(NumberOfDaysInHotel), GreaterThan(NumberOfDaysInHotel,7), IsNotNull(hotel_id)], ReadSchema: struct"


In [0]:
%sql
-- Create a golden table for stays exceeding 7 days
CREATE OR REPLACE TABLE ExceededStay_golden
USING DELTA AS  -- Use Delta Lake as the storage format
SELECT DISTINCT -- Select distinct columns from both hotel search data and hotel details
  H.name,  
  H.ID,  
  srch_ci AS StartStay, 
  srch_co AS EndStay,  
  avg_stay_tmpr_c AS AverageTemperatureC,  -- Average temperature during the stay
  (last_day_tmpr_c - first_day_tmpr_c) AS TrendTemperatureC,  -- Temperature trend during the stay
  year(srch_ci) AS year, 
  month(srch_ci) AS month  
FROM (
  SELECT  -- Calculate average and trend temperatures for stays exceeding 7 days
    hotel_id,  -- Hotel identifier
    srch_ci,  -- Start date of the stay
    srch_co,  -- End date of the stay
    AVG(avg_tmpr_c) OVER stay AS avg_stay_tmpr_c,-- Calculate average temperature over the entire stay
    FIRST_VALUE(avg_tmpr_c) OVER stay AS first_day_tmpr_c,-- Get the temperature on the first day of the stay
    LAST_VALUE(avg_tmpr_c) OVER stay AS last_day_tmpr_c-- Get the temperature on the last day of the stay
  FROM expedia_silver e 
  LEFT JOIN HotelWeather_silver hw 
    ON e.hotel_id = hw.hotelId  
    AND wthr_date BETWEEN srch_ci AND srch_co  -- Ensure weather data is within the stay period
  WHERE e.NumberOfDaysInHotel > 7  -- Filter stays exceeding 7 days
  WINDOW stay AS (  -- Define a window over the stay period to calculate averages and trends
    PARTITION BY hotel_id, srch_ci, srch_co  
    ORDER BY wthr_date 
    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING -- Consider all rows in the window
  )
) ES
JOIN Hotel_silver H
ON ES.hotel_id = H.id

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM ExceededStay_golden

name,ID,StartStay,EndStay,AverageTemperatureC,TrendTemperatureC,year,month
1401 W 22nd St,1,2016-10-15,2016-10-23,16.6,0.0,2016,10
1705 S State St,6,2017-08-04,2017-08-17,,,2017,8
555 Clover Ln,8589934594,2016-10-06,2016-10-19,,,2016,10
Veitshoechheimer Str 5b,25769803782,2017-08-28,2017-09-07,,,2017,8
7063 Carroll Ave,34359738368,2016-10-27,2016-11-05,,,2016,10
2700 W Atlantic Blvd,34359738369,2016-10-13,2016-10-24,24.4,-2.0,2016,10
24001 Lorenzo Rd,34359738372,2017-08-19,2017-08-29,23.7,0.0,2017,8
5460 N River Rd,60129542147,2017-08-20,2017-09-03,25.3,0.0,2017,8
1914 Poplar Ave,85899345924,2017-09-26,2017-10-11,,,2017,9
50 N E Catawba Rd,111669149699,2016-10-23,2016-11-06,,,2016,10


**Save extended stay file**

In [0]:
# Define the output path for the Parquet file
ExceededStay_file_path = main_out_path + '/ExceededStay'  # Output path in Azure Data Lake Storage Gen2

# SQL query to create a Parquet table from the golden table
sql = f"""
CREATE TABLE IF NOT EXISTS ExceededStay 
USING PARQUET 
LOCATION '{ExceededStay_file_path}'  
PARTITIONED BY (year, month) 
AS 
SELECT * 
FROM ExceededStay_golden 
"""

# Execute the SQL query to create the Parquet table
spark.sql(sql)


DataFrame[]