In [0]:
# Azure Storage settings
output_storage_account = "e6o"
output_container = "data"

spark.conf.set(
    f"fs.azure.account.key.{output_storage_account}.blob.core.windows.net",
    dbutils.secrets.get(scope="hw2secret", key="AZURE_STORAGE_ACCOUNT_KEY_DESTINATION"))

# Read Hotel-Weather data and save it in Delta format
hotel_weather_df = spark.read.format("parquet").load(f"wasbs://{output_container}@{output_storage_account}.blob.core.windows.net/joined_data/")
hotel_weather_df.show()

+-------+-------------------+---------+--------------+---------------------+--------------------+------------------+-------------------------+-------+---------+----------+-------+----------+----------+---------------+-----------------+-----------+-------------------+------------------------+-------------+--------------------+----------+----------+--------------+-------+-------+---------------+----------+---------+--------------------+----------+----+-----+---+
|     id|          date_time|site_name|posa_continent|user_location_country|user_location_region|user_location_city|orig_destination_distance|user_id|is_mobile|is_package|channel|   srch_ci|   srch_co|srch_adults_cnt|srch_children_cnt|srch_rm_cnt|srch_destination_id|srch_destination_type_id|     hotel_id|             address|avg_tmpr_c|avg_tmpr_f|          city|country|geoHash|accomodation_id|  latitude|longitude|                name| wthr_date|year|month|day|
+-------+-------------------+---------+--------------+----------------

In [0]:
# Azure Storage settings
input_storage_account = "homework2corvin"
output_storage_account = "developmentwesteurope6o"
input_container = "hw2"
output_container = "data"

# Setting up Storage account keys
spark.conf.set(
    f"fs.azure.account.key.{input_storage_account}.blob.core.windows.net",
    dbutils.secrets.get(scope="hw2secret", key="AZURE_STORAGE_ACCOUNT_KEY_SOURCE"))

spark.conf.set(
    f"fs.azure.account.key.{output_storage_account}.blob.core.windows.net",
    dbutils.secrets.get(scope="hw2secret", key="AZURE_STORAGE_ACCOUNT_KEY_DESTINATION"))

# Create database if it doesn't exist
spark.sql("CREATE DATABASE IF NOT EXISTS mydatabase")

# Read Expedia data from the source container and save it in Delta format to the data output container 
expedia_df = spark.read.format("avro").load(f"wasbs://{input_container}@{input_storage_account}.blob.core.windows.net/expedia/")

expedia_df.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save(f"wasbs://{output_container}@{output_storage_account}.blob.core.windows.net/delta/expedia/")

# Register the Expedia Delta table in the Metastore
spark.sql("DROP TABLE IF EXISTS mydatabase.expedia")
spark.sql(f"""
    CREATE TABLE mydatabase.expedia
    USING DELTA
    LOCATION 'wasbs://{output_container}@{output_storage_account}.blob.core.windows.net/delta/expedia/'
""")

# Read Hotel-Weather data from the source container and save it in Delta format to the data output container, also partitioning is applied
hotel_weather_df = spark.read.format("parquet").load(f"wasbs://{input_container}@{input_storage_account}.blob.core.windows.net/hotel-weather/hotel-weather/")

hotel_weather_df.write.format("delta").mode("overwrite") \
    .partitionBy("year", "month", "day") \
    .option("overwriteSchema", "true") \
    .save(f"wasbs://{output_container}@{output_storage_account}.blob.core.windows.net/delta/hotel-weather/")

# Register the Hotel-Weather Delta table in the Metastore
spark.sql("DROP TABLE IF EXISTS mydatabase.hotel_weather")
spark.sql(f"""
    CREATE TABLE mydatabase.hotel_weather
    USING DELTA
    LOCATION 'wasbs://{output_container}@{output_storage_account}.blob.core.windows.net/delta/hotel-weather/'
""")

# Refresh cache to see the most up-to-date data
spark.sql("REFRESH TABLE mydatabase.expedia")
spark.sql("REFRESH TABLE mydatabase.hotel_weather")

#Due to the same column name in the two dataframes, we need to rename the column
hotel_weather_df = hotel_weather_df.withColumnRenamed("id", "accomodation_id")
# Join the Expedia and Hotel Weather data
joined_df = expedia_df.join(hotel_weather_df, expedia_df.hotel_id == hotel_weather_df.accomodation_id, "left")

# Save the intermediate DataFrame partitioned
joined_df.write.format("parquet") \
    .mode("overwrite") \
    .partitionBy("year", "month", "day") \
    .save(f"wasbs://{output_container}@{output_storage_account}.blob.core.windows.net/joined_data/")


com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
%sql
SELECT 
    address,    -- Select the hotel address.
    year,       -- Select the year from the data.
    month,      -- Select the month from the data.
    -- Calculate the temperature difference within each group:
    --   1. Find the maximum average temperature (avg_tmpr_c) in the group.
    --   2. Find the minimum average temperature (avg_tmpr_c) in the group.
    --   3. Compute the absolute difference between these values.
    --   4. Round the result to 2 decimal places and alias it as 'temp_diff'.
    ROUND(ABS(MAX(avg_tmpr_c) - MIN(avg_tmpr_c)), 2) AS temp_diff
FROM mydatabase.hotel_weather   -- Data is sourced from the hotel_weather table.
GROUP BY address, year, month    -- Group the records by hotel address, year, and month.
ORDER BY temp_diff DESC          -- Order the groups by temperature difference in descending order.
LIMIT 10;                       -- Limit the result to the top 10 groups with the largest temperature differences.


address,year,month,temp_diff
Comfort Inn,2016,10,23.5
Studio 6,2016,10,23.0
Quality Inn and Suites,2017,9,21.7
Motel 6,2016,10,21.7
Quality Inn and Suites,2016,10,21.1
Americas Best Value Inn,2016,10,20.3
Quality Inn & Suites,2017,9,20.2
Rodeway Inn,2016,10,20.2
Comfort Inn,2017,9,19.9
Americas Best Value Inn,2017,9,19.6


In [0]:
%sql
EXPLAIN
SELECT 
    address,
    year, 
    month, 
    ROUND(ABS(MAX(avg_tmpr_c) - MIN(avg_tmpr_c)), 2) AS temp_diff
FROM mydatabase.hotel_weather
GROUP BY address, year, month
ORDER BY temp_diff DESC
LIMIT 10;


plan
"== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- == Initial Plan ==  ColumnarToRow  +- PhotonResultStage  +- PhotonTopK(sortOrder=[temp_diff#361 DESC NULLS LAST], partitionOrderCount=0)  +- PhotonShuffleExchangeSource  +- PhotonShuffleMapStage  +- PhotonShuffleExchangeSink SinglePartition  +- PhotonTopK(sortOrder=[temp_diff#361 DESC NULLS LAST], partitionOrderCount=0)  +- PhotonGroupingAgg(keys=[address#391, year#402, month#403], functions=[finalmerge_max(merge max#409) AS max(avg_tmpr_c#392)#405, finalmerge_min(merge min#411) AS min(avg_tmpr_c#392)#406])  +- PhotonShuffleExchangeSource  +- PhotonShuffleMapStage  +- PhotonShuffleExchangeSink hashpartitioning(address#391, year#402, month#403, 200)  +- PhotonGroupingAgg(keys=[address#391, year#402, month#403], functions=[partial_max(avg_tmpr_c#392) AS max#409, partial_min(avg_tmpr_c#392) AS min#411])  +- PhotonProject [address#391, avg_tmpr_c#392, year#402, month#403]  +- PhotonScan parquet spark_catalog.mydatabase.hotel_weather[address#391,avg_tmpr_c#392,year#402,month#403,day#404] DataFilters: [], DictionaryFilters: [], Format: parquet, Location: PreparedDeltaFileIndex(1 paths)[wasbs://data@developmentwesteurope6o.blob.core.windows.net/delta/..., OptionalDataFilters: [], PartitionFilters: [], ReadSchema: struct, RequiredDataFilters: [] == Photon Explanation == The query is fully supported by Photon."


In [0]:
%sql
-- First CTE: Generate one row per day for each hotel booking.
WITH exploded_dates AS (
    SELECT
        ex.hotel_id,          -- The hotel ID from the Expedia bookings table.
        hw.address,           -- The hotel address from the hotel weather table.
        -- Generate a sequence of dates from the check-in date (inclusive) to the day before check-out (inclusive)
        -- and then create a separate row for each date.
        explode(
            sequence(
                CAST(ex.srch_ci AS DATE), 
                CAST(ex.srch_co AS DATE) - INTERVAL 1 DAY, 
                interval 1 day
            )
        ) AS visit_date
    FROM mydatabase.expedia ex
    LEFT JOIN mydatabase.hotel_weather hw
        ON ex.hotel_id = hw.id  -- Join the hotel weather data on matching hotel IDs.
    WHERE 
        CAST(ex.srch_ci AS DATE) < CAST(ex.srch_co AS DATE)  -- Only consider bookings with a valid date range.
        AND hw.address IS NOT NULL                           -- Only include records where the hotel address is available.
),
-- Second CTE: Aggregate visit counts by hotel address for each month.
monthly_visits AS (
    SELECT
        address,
        YEAR(visit_date) AS year,   -- Extract the year from the visit date.
        MONTH(visit_date) AS month, -- Extract the month from the visit date.
        COUNT(*) AS visits_count    -- Count the number of visit days per address in the given month.
    FROM exploded_dates
    GROUP BY address, YEAR(visit_date), MONTH(visit_date)
),
-- Third CTE: Rank hotels within each month based on the number of visits.
ranked_hotels AS (
    SELECT *,
        -- Use DENSE_RANK to assign a rank to each hotel within each year and month partition,
        -- ordering by visits_count in descending order so that hotels with the most visits rank highest.
        DENSE_RANK() OVER (PARTITION BY year, month ORDER BY visits_count DESC) AS rank
    FROM monthly_visits
)
-- Final selection: Return hotels that are among the top 10 for each month.
SELECT * 
FROM ranked_hotels 
WHERE rank <= 10;

address,year,month,visits_count,rank
Americas Best Value Inn,2016,10,58490,1
Econo Lodge,2016,10,39688,2
Quality Inn,2016,10,39560,3
Motel 6,2016,10,38296,4
Comfort Inn,2016,10,37805,5
Comfort Suites,2016,10,28450,6
Holiday Inn Express,2016,10,25479,7
Best Western,2016,10,25202,8
Days Inn,2016,10,23373,9
Budget Inn,2016,10,20359,10


In [0]:
%sql
EXPLAIN
WITH exploded_dates AS (
    SELECT
        ex.hotel_id,
        hw.address,
        explode(sequence(
            CAST(ex.srch_ci AS DATE), 
            CAST(ex.srch_co AS DATE) - INTERVAL 1 DAY, 
            interval 1 day)) AS visit_date
    FROM mydatabase.expedia ex
    LEFT JOIN mydatabase.hotel_weather hw
    ON ex.hotel_id = hw.id
    WHERE CAST(ex.srch_ci AS DATE) < CAST(ex.srch_co AS DATE)
    AND hw.address IS NOT NULL
),
monthly_visits AS (
    SELECT
        address,
        YEAR(visit_date) AS year,
        MONTH(visit_date) AS month,
        COUNT(*) AS visits_count
    FROM exploded_dates
    GROUP BY address, YEAR(visit_date), MONTH(visit_date)
),
ranked_hotels AS (
    SELECT *,
        DENSE_RANK() OVER (PARTITION BY year, month ORDER BY visits_count DESC) AS rank
    FROM monthly_visits
)
SELECT * FROM ranked_hotels WHERE rank <= 10;


plan
"== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- == Initial Plan ==  Filter (rank#670 <= 10)  +- RunningWindowFunction [address#740, year#667, month#668, visits_count#669L, dense_rank(visits_count#669L) windowspecdefinition(year#667, month#668, visits_count#669L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#670], [year#667, month#668], [visits_count#669L DESC NULLS LAST], false  +- Sort [year#667 ASC NULLS FIRST, month#668 ASC NULLS FIRST, visits_count#669L DESC NULLS LAST], false, 0  +- Exchange hashpartitioning(year#667, month#668, 200), ENSURE_REQUIREMENTS, [plan_id=917]  +- Project [address#740, year#667, month#668, visits_count#669L]  +- Filter (_local_dense_rank#808 <= 10)  +- RunningWindowFunction [address#740, year#667, month#668, visits_count#669L, dense_rank(visits_count#669L) windowspecdefinition(year#667, month#668, visits_count#669L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _local_dense_rank#808], [year#667, month#668], [visits_count#669L DESC NULLS LAST], true  +- Sort [year#667 ASC NULLS FIRST, month#668 ASC NULLS FIRST, visits_count#669L DESC NULLS LAST], false, 0  +- HashAggregate(keys=[address#740, _groupingexpression#769, _groupingexpression#770], functions=[finalmerge_count(merge count#772L) AS count(1)#754L])  +- Exchange hashpartitioning(address#740, _groupingexpression#769, _groupingexpression#770, 200), ENSURE_REQUIREMENTS, [plan_id=911]  +- HashAggregate(keys=[address#740, _groupingexpression#769, _groupingexpression#770], functions=[partial_count(1) AS count#772L])  +- Project [address#740, year(visit_date#759) AS _groupingexpression#769, month(visit_date#759) AS _groupingexpression#770]  +- Generate explode(sequence(cast(srch_ci#732 as date), date_add(cast(srch_co#733 as date), -1), Some(INTERVAL '1' DAY), Some(Etc/UTC))), [address#740], false, [visit_date#759]  +- ColumnarToRow  +- PhotonResultStage  +- PhotonProject [srch_ci#732, srch_co#733, address#740]  +- PhotonBroadcastHashJoin [hotel_id#739L], [cast(id#746 as bigint)], Inner, BuildRight, false, true  :- PhotonScan parquet spark_catalog.mydatabase.expedia[srch_ci#732,srch_co#733,hotel_id#739L] DataFilters: [isnotnull(hotel_id#739L), isnotnull(srch_ci#732), isnotnull(srch_co#733), (cast(srch_ci#732 as d..., DictionaryFilters: [], Format: parquet, Location: PreparedDeltaFileIndex(1 paths)[wasbs://data@developmentwesteurope6o.blob.core.windows.net/delta/..., OptionalDataFilters: [hashedrelationcontains(hotel_id#739L)], PartitionFilters: [], ReadSchema: struct, RequiredDataFilters: [isnotnull(hotel_id#739L), isnotnull(srch_ci#732), isnotnull(srch_co#733), (cast(srch_ci#732 as d...  +- PhotonShuffleExchangeSource  +- PhotonShuffleMapStage  +- PhotonShuffleExchangeSink SinglePartition  +- PhotonProject [address#740, id#746]  +- PhotonScan parquet spark_catalog.mydatabase.hotel_weather[address#740,id#746,year#751,month#752,day#753] DataFilters: [isnotnull(address#740), isnotnull(id#746)], DictionaryFilters: [], Format: parquet, Location: PreparedDeltaFileIndex(1 paths)[wasbs://data@developmentwesteurope6o.blob.core.windows.net/delta/..., OptionalDataFilters: [], PartitionFilters: [], ReadSchema: struct, RequiredDataFilters: [isnotnull(address#740), isnotnull(id#746)] == Photon Explanation == Photon does not fully support the query because: 	sequence(cast(srch_ci#732 as date), date_add(cast(srch_co#733 as date), -1), Some(INTERVAL '1' DAY), Some(Etc/UTC)) is not supported:  Expression sequence does not support DateType. Reference node: 	Generate explode(sequence(cast(srch_ci#732 as date), date_add(cast(srch_co#733 as date), -1), Some(INTERVAL '1' DAY), Some(Etc/UTC))), [address#740], false, [visit_date#759]"


In [0]:
%sql
WITH exploded_dates AS (
    SELECT
        ex.id AS booking_id,
        ex.hotel_id,
        ex.srch_ci,
        ex.srch_co,
        explode(sequence(
            CAST(ex.srch_ci AS DATE), 
            CAST(ex.srch_co AS DATE) - INTERVAL 1 DAY,
            INTERVAL 1 DAY
        )) AS visit_date
    FROM mydatabase.expedia ex
    WHERE DATEDIFF(ex.srch_co, ex.srch_ci) BETWEEN 7 AND 30
      AND ex.srch_co > ex.srch_ci
),
joined_weather AS (
    SELECT 
        ed.booking_id,
        ed.hotel_id,
        ed.visit_date,
        hw.avg_tmpr_c,
        hw.address
    FROM exploded_dates ed
    LEFT JOIN mydatabase.hotel_weather hw
      ON ed.hotel_id = hw.id 
         AND CAST(hw.wthr_date AS DATE) = ed.visit_date
    WHERE hw.avg_tmpr_c IS NOT NULL
),
windowed_temps AS (
    SELECT
        booking_id,
        hotel_id,
        address,
        visit_date,
        avg_tmpr_c,
        FIRST_VALUE(avg_tmpr_c) OVER (PARTITION BY booking_id ORDER BY visit_date) AS first_temp,
        LAST_VALUE(avg_tmpr_c) OVER (PARTITION BY booking_id ORDER BY visit_date 
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_temp
    FROM joined_weather
),
temp_calculations AS (
    SELECT
        booking_id,
        hotel_id,
        address,
        MIN(visit_date) AS first_day,
        MAX(visit_date) AS last_day,
        ROUND(last_temp - first_temp, 2) AS temp_trend,
        ROUND(AVG(avg_tmpr_c), 2) AS avg_temperature
    FROM windowed_temps
    GROUP BY booking_id, hotel_id, address, first_temp, last_temp
)
SELECT *
FROM temp_calculations
WHERE temp_trend IS NOT NULL
  AND avg_temperature IS NOT NULL
  AND DATEDIFF(last_day, first_day) >= 7
ORDER BY ABS(temp_trend) DESC;


booking_id,hotel_id,address,first_day,last_day,temp_trend,avg_temperature
560359,369367187456,Greektown Casino,2016-10-06,2016-10-26,-16.2,11.23
432180,137438953473,Country Inn and Suites By Carlson,2016-10-17,2016-10-27,-14.4,12.57
1345044,137438953473,Country Inn and Suites By Carlson,2016-10-17,2016-10-27,-14.4,12.57
1378048,335007449088,Americinn Lodge & Suites Appleton,2016-10-06,2016-10-13,-13.5,10.55
2321750,472446402561,Meadow Farm Bed and Breakfast,2016-10-19,2016-10-26,-13.4,9.1
49062,833223655425,Fairbridge Inn,2017-09-11,2017-09-19,-13.1,13.13
1735184,1151051235329,Red Banks Motel,2017-09-22,2017-09-30,-12.7,18.75
1956439,1151051235329,Red Banks Motel,2017-09-22,2017-09-30,-12.7,18.75
601476,395136991232,Parkway Inn Jellico,2017-08-20,2017-09-11,-11.7,18.65
1138791,240518168576,Hotel Deca - A Noble House Hotel,2017-09-06,2017-09-20,-11.7,16.37


In [0]:
%sql
-- First CTE: Create one row per day for each booking that lasts between 7 and 30 days.
WITH exploded_dates AS (
    SELECT
        ex.id AS booking_id,     -- Unique booking identifier.
        ex.hotel_id,             -- Hotel identifier.
        ex.srch_ci,              -- Check-in date.
        ex.srch_co,              -- Check-out date.
        -- Generate a sequence of dates from the check-in date to the day before the check-out date.
        -- The 'explode' function creates a separate row for each date in the sequence.
        explode(sequence(
            CAST(ex.srch_ci AS DATE), 
            CAST(ex.srch_co AS DATE) - INTERVAL 1 DAY,
            INTERVAL 1 DAY
        )) AS visit_date
    FROM mydatabase.expedia ex
    WHERE 
        -- Only include bookings with a duration between 7 and 30 days.
        DATEDIFF(ex.srch_co, ex.srch_ci) BETWEEN 7 AND 30
        AND ex.srch_co > ex.srch_ci  -- Ensure the check-out date is after the check-in date.
),
-- Second CTE: Join the exploded booking dates with hotel weather data.
joined_weather AS (
    SELECT 
        ed.booking_id,   -- Booking identifier from the exploded_dates CTE.
        ed.hotel_id,     -- Hotel identifier.
        ed.visit_date,   -- The individual visit date generated earlier.
        hw.avg_tmpr_c,   -- The average temperature from the hotel_weather table.
        hw.address       -- Hotel address.
    FROM exploded_dates ed
    LEFT JOIN mydatabase.hotel_weather hw
      -- Join on matching hotel IDs and where the weather date corresponds to the visit_date.
      ON ed.hotel_id = hw.id 
         AND CAST(hw.wthr_date AS DATE) = ed.visit_date
    WHERE hw.avg_tmpr_c IS NOT NULL  -- Only include records with temperature data.
),
-- Third CTE: Apply window functions to capture the first and last temperature of each booking.
windowed_temps AS (
    SELECT
        booking_id,
        hotel_id,
        address,
        visit_date,
        avg_tmpr_c,
        -- Get the first average temperature for the booking (earliest visit_date).
        FIRST_VALUE(avg_tmpr_c) OVER (PARTITION BY booking_id ORDER BY visit_date) AS first_temp,
        -- Get the last average temperature for the booking (latest visit_date)
        -- using an unbounded frame to cover the full partition.
        LAST_VALUE(avg_tmpr_c) OVER (
            PARTITION BY booking_id 
            ORDER BY visit_date 
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) AS last_temp
    FROM joined_weather
),
-- Fourth CTE: Calculate temperature trend and average temperature for each booking.
temp_calculations AS (
    SELECT
        booking_id,
        hotel_id,
        address,
        MIN(visit_date) AS first_day,      -- The first day of the booking.
        MAX(visit_date) AS last_day,         -- The last day of the booking.
        -- Calculate the temperature trend as the difference between last and first temperatures.
        ROUND(last_temp - first_temp, 2) AS temp_trend,
        -- Calculate the average temperature during the booking.
        ROUND(AVG(avg_tmpr_c), 2) AS avg_temperature
    FROM windowed_temps
    GROUP BY booking_id, hotel_id, address, first_temp, last_temp
)
-- Final query: Retrieve bookings with valid temperature trends and a minimum duration of 7 days,
-- and order the results by the absolute temperature trend in descending order.
SELECT *
FROM temp_calculations
WHERE temp_trend IS NOT NULL
  AND avg_temperature IS NOT NULL
  AND DATEDIFF(last_day, first_day) >= 7
ORDER BY ABS(temp_trend) DESC;

In [0]:
%sql
EXPLAIN
WITH exploded_dates AS (
    SELECT
        ex.id AS booking_id,
        ex.hotel_id,
        ex.srch_ci,
        ex.srch_co,
        explode(sequence(
            CAST(ex.srch_ci AS DATE), 
            CAST(ex.srch_co AS DATE) - INTERVAL 1 DAY,
            INTERVAL 1 DAY
        )) AS visit_date
    FROM mydatabase.expedia ex
    WHERE DATEDIFF(ex.srch_co, ex.srch_ci) BETWEEN 7 AND 30
      AND ex.srch_co > ex.srch_ci
),
joined_weather AS (
    SELECT 
        ed.booking_id,
        ed.hotel_id,
        ed.visit_date,
        hw.avg_tmpr_c,
        hw.address
    FROM exploded_dates ed
    LEFT JOIN mydatabase.hotel_weather hw
      ON ed.hotel_id = hw.id 
         AND CAST(hw.wthr_date AS DATE) = ed.visit_date
    WHERE hw.avg_tmpr_c IS NOT NULL
),
windowed_temps AS (
    SELECT
        booking_id,
        hotel_id,
        address,
        visit_date,
        avg_tmpr_c,
        FIRST_VALUE(avg_tmpr_c) OVER (PARTITION BY booking_id ORDER BY visit_date) AS first_temp,
        LAST_VALUE(avg_tmpr_c) OVER (PARTITION BY booking_id ORDER BY visit_date 
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_temp
    FROM joined_weather
),
temp_calculations AS (
    SELECT
        booking_id,
        hotel_id,
        address,
        MIN(visit_date) AS first_day,
        MAX(visit_date) AS last_day,
        ROUND(last_temp - first_temp, 2) AS temp_trend,
        ROUND(AVG(avg_tmpr_c), 2) AS avg_temperature
    FROM windowed_temps
    GROUP BY booking_id, hotel_id, address, first_temp, last_temp
)
SELECT *
FROM temp_calculations
WHERE temp_trend IS NOT NULL
  AND avg_temperature IS NOT NULL
  AND DATEDIFF(last_day, first_day) >= 7
ORDER BY ABS(temp_trend) DESC;


plan
"== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- == Initial Plan ==  Sort [abs(temp_trend#871) DESC NULLS LAST], true, 0  +- Exchange rangepartitioning(abs(temp_trend#871) DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=1246]  +- Filter (((isnotnull(last_day#870) AND isnotnull(first_day#869)) AND isnotnull(avg_temperature#872)) AND (datediff(last_day#870, first_day#869) >= 7))  +- HashAggregate(keys=[booking_id#865L, hotel_id#927L, address#942, knownfloatingpointnormalized(normalizenanandzero(first_temp#867)) AS first_temp#867, knownfloatingpointnormalized(normalizenanandzero(last_temp#868)) AS last_temp#868], functions=[min(visit_date#957), max(visit_date#957), avg(avg_tmpr_c#943)])  +- Filter isnotnull(round((last_temp#868 - first_temp#867), 2))  +- Window [booking_id#865L, hotel_id#927L, address#942, visit_date#957, avg_tmpr_c#943, first_value(avg_tmpr_c#943, false) windowspecdefinition(booking_id#865L, visit_date#957 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS first_temp#867, last_value(avg_tmpr_c#943, false) windowspecdefinition(booking_id#865L, visit_date#957 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS last_temp#868], [booking_id#865L], [visit_date#957 ASC NULLS FIRST]  +- Sort [booking_id#865L ASC NULLS FIRST, visit_date#957 ASC NULLS FIRST], false, 0  +- Exchange hashpartitioning(booking_id#865L, 200), ENSURE_REQUIREMENTS, [plan_id=1240]  +- Project [booking_id#865L, hotel_id#927L, address#942, visit_date#957, avg_tmpr_c#943]  +- BroadcastHashJoin [hotel_id#927L, visit_date#957], [cast(id#948 as bigint), cast(wthr_date#952 as date)], Inner, BuildRight, false, true  :- Project [id#908L AS booking_id#865L, hotel_id#927L, visit_date#957]  : +- Generate explode(sequence(cast(srch_ci#920 as date), date_add(cast(srch_co#921 as date), -1), Some(INTERVAL '1' DAY), Some(Etc/UTC))), [id#908L, hotel_id#927L], false, [visit_date#957]  : +- ColumnarToRow  : +- PhotonResultStage  : +- PhotonScan parquet spark_catalog.mydatabase.expedia[id#908L,srch_ci#920,srch_co#921,hotel_id#927L] DataFilters: [isnotnull(hotel_id#927L), isnotnull(srch_co#921), isnotnull(srch_ci#920), (srch_co#921 > srch_ci..., DictionaryFilters: [], Format: parquet, Location: PreparedDeltaFileIndex(1 paths)[wasbs://data@developmentwesteurope6o.blob.core.windows.net/delta/..., OptionalDataFilters: [], PartitionFilters: [], ReadSchema: struct, RequiredDataFilters: [isnotnull(hotel_id#927L), isnotnull(srch_co#921), isnotnull(srch_ci#920), (srch_co#921 > srch_ci...  +- Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=1237]  +- ColumnarToRow  +- PhotonResultStage  +- PhotonProject [address#942, avg_tmpr_c#943, id#948, wthr_date#952]  +- PhotonScan parquet spark_catalog.mydatabase.hotel_weather[address#942,avg_tmpr_c#943,id#948,wthr_date#952,year#953,month#954,day#955] DataFilters: [isnotnull(avg_tmpr_c#943), isnotnull(id#948), isnotnull(wthr_date#952)], DictionaryFilters: [], Format: parquet, Location: PreparedDeltaFileIndex(1 paths)[wasbs://data@developmentwesteurope6o.blob.core.windows.net/delta/..., OptionalDataFilters: [], PartitionFilters: [], ReadSchema: struct, RequiredDataFilters: [isnotnull(avg_tmpr_c#943), isnotnull(id#948), isnotnull(wthr_date#952)] == Photon Explanation == Photon does not fully support the query because: 	sequence(cast(srch_ci#920 as date), date_add(cast(srch_co#921 as date), -1), Some(INTERVAL '1' DAY), Some(Etc/UTC)) is not supported:  Expression sequence does not support DateType. Reference node: 	Generate explode(sequence(cast(srch_ci#920 as date), date_add(cast(srch_co#921 as date), -1), Some(INTERVAL '1' DAY), Some(Etc/UTC))), [id#908L, hotel_id#927L], false, [visit_date#957] Photon does not fully support the query because:  Unsupported node: Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=1207]. Reference node: 	Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=1237]"


In [0]:
%sql

OPTIMIZE mydatabase.expedia
ZORDER BY (id, srch_ci);

OPTIMIZE mydatabase.hotel_weather
ZORDER BY (id, wthr_date);

CREATE TABLE mydatabase.date_sequence AS 
SELECT explode(sequence(
    to_date('2016-10-01'), 
    to_date('2018-10-06'), 
    INTERVAL 1 DAY)) AS visit_date;

CREATE TABLE mydatabase.exploded_dates AS
SELECT 
    ex.id AS booking_id,
    ex.hotel_id,
    ds.visit_date
FROM mydatabase.expedia ex
JOIN mydatabase.date_sequence ds
  ON ds.visit_date BETWEEN CAST(ex.srch_ci AS DATE) AND CAST(ex.srch_co AS DATE) - INTERVAL 1 DAY
WHERE DATEDIFF(ex.srch_co, ex.srch_ci) BETWEEN 7 AND 30;

CREATE TABLE mydatabase.joined_weather AS
SELECT 
    ed.booking_id,
    ed.hotel_id,
    ed.visit_date,
    hw.avg_tmpr_c,
    hw.address
FROM mydatabase.exploded_dates ed
LEFT JOIN mydatabase.hotel_weather hw
  ON ed.hotel_id = hw.id 
     AND hw.wthr_date = ed.visit_date
WHERE hw.avg_tmpr_c IS NOT NULL;

CREATE TABLE mydatabase.windowed_temps AS 
SELECT
    booking_id,
    hotel_id,
    address,
    visit_date,
    avg_tmpr_c,
    FIRST_VALUE(avg_tmpr_c) OVER (
        PARTITION BY booking_id ORDER BY visit_date
    ) AS first_temp,
    LAST_VALUE(avg_tmpr_c) OVER (
        PARTITION BY booking_id ORDER BY visit_date 
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) AS last_temp
FROM mydatabase.joined_weather;

OPTIMIZE mydatabase.exploded_dates
ZORDER BY (booking_id, visit_date);

OPTIMIZE mydatabase.joined_weather
ZORDER BY (booking_id, visit_date);

OPTIMIZE mydatabase.windowed_temps
ZORDER BY (booking_id, visit_date);


WITH temp_calculations AS (
    SELECT
        booking_id,
        hotel_id,
        address,
        MIN(visit_date) AS first_day,
        MAX(visit_date) AS last_day,
        ROUND(last_temp - first_temp, 2) AS temp_trend,
        ROUND(AVG(avg_tmpr_c), 2) AS avg_temperature
    FROM mydatabase.windowed_temps
    GROUP BY booking_id, hotel_id, address, first_temp, last_temp
)
SELECT *
FROM temp_calculations
WHERE temp_trend IS NOT NULL
  AND avg_temperature IS NOT NULL
  AND DATEDIFF(last_day, first_day) >= 7
ORDER BY ABS(temp_trend) DESC;



com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 87.0 failed 4 times, most recent failure: Lost task 0.3 in stage 87.0 (TID 119) (10.139.64.4 executor driver): shaded.databricks.org.apache.hadoop.fs.azure.AzureException: hadoop_azure_shaded.com.microsoft.azure.storage.StorageException: The specified resource does not exist.
	at shaded.databricks.org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.updateFolderLastModifiedTime(AzureNativeFileSystemStore.java:3626)
	at shaded.databricks.org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.updateFolderLastModifiedTime(AzureNativeFileSystemStore.java:3637)
	at shaded.databricks.org.apache.hadoop.fs.azure.NativeAzureFileSystem.updateParentFolderLastModifiedTime(NativeAzureFileSystem.java:3378)
	at shaded.databricks.org.apache.hadoop.fs.azure.NativeAzureFileSystem.create(NativeAzureFileSy

In [0]:
df_first = spark.sql("""
	SELECT 
    address,    -- Select the hotel address.
    year,       -- Select the year from the data.
    month,      -- Select the month from the data.
    -- Calculate the temperature difference within each group:
    --   1. Find the maximum average temperature (avg_tmpr_c) in the group.
    --   2. Find the minimum average temperature (avg_tmpr_c) in the group.
    --   3. Compute the absolute difference between these values.
    --   4. Round the result to 2 decimal places and alias it as 'temp_diff'.
    ROUND(ABS(MAX(avg_tmpr_c) - MIN(avg_tmpr_c)), 2) AS temp_diff
	FROM mydatabase.hotel_weather   -- Data is sourced from the hotel_weather table.
	GROUP BY address, year, month    -- Group the records by hotel address, year, and month.
	ORDER BY temp_diff DESC          -- Order the groups by temperature difference in descending order.
	LIMIT 10;                       -- Limit the result to the top 10 groups with the largest temperature differences.
""")

df_first.write.format("parquet") \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .save("dbfs:/mnt/data/final_datamarts/first")

In [0]:
df_check = spark.read.parquet("dbfs:/mnt/data/final_datamarts/first")
df_check.show(10)

+--------------------+---------+----+-----+
|             address|temp_diff|year|month|
+--------------------+---------+----+-----+
|         Rodeway Inn|     20.2|2016|   10|
|Americas Best Val...|     20.3|2016|   10|
|Quality Inn and S...|     21.1|2016|   10|
|             Motel 6|     21.7|2016|   10|
|            Studio 6|     23.0|2016|   10|
|         Comfort Inn|     23.5|2016|   10|
|Americas Best Val...|     19.6|2017|    9|
|         Comfort Inn|     19.9|2017|    9|
|Quality Inn & Suites|     20.2|2017|    9|
|Quality Inn and S...|     21.7|2017|    9|
+--------------------+---------+----+-----+



In [0]:
df_second = spark.sql("""
	WITH exploded_dates AS (
		SELECT
			ex.hotel_id,
			hw.address,
			explode(sequence(
				CAST(ex.srch_ci AS DATE), 
				CAST(ex.srch_co AS DATE) - INTERVAL 1 DAY, 
				interval 1 day)) AS visit_date
		FROM mydatabase.expedia ex
		LEFT JOIN mydatabase.hotel_weather hw
		ON ex.hotel_id = hw.id
		WHERE CAST(ex.srch_ci AS DATE) < CAST(ex.srch_co AS DATE)
		AND hw.address IS NOT NULL
	),
	monthly_visits AS (
		SELECT
			address,
			YEAR(visit_date) AS year,
			MONTH(visit_date) AS month,
			COUNT(*) AS visits_count
		FROM exploded_dates
		GROUP BY address, YEAR(visit_date), MONTH(visit_date)
	),
	ranked_hotels AS (
		SELECT *,
			DENSE_RANK() OVER (PARTITION BY year, month ORDER BY visits_count DESC) AS rank
		FROM monthly_visits
	)
	SELECT * FROM ranked_hotels WHERE rank <= 10;
""")
df_second.write.format("parquet") \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .save("dbfs:/mnt/data/final_datamarts/second")

In [0]:
df_check = spark.read.parquet("dbfs:/mnt/data/final_datamarts/second")
df_check.show(10)

+--------------------+------------+----+----+-----+
|             address|visits_count|rank|year|month|
+--------------------+------------+----+----+-----+
|Sofitel Paris Le ...|         420|   1|2017|   11|
|The Pelham Starho...|         300|   2|2017|   11|
|   Hotel Millersburg|         300|   2|2017|   11|
|Best Western Hote...|         270|   3|2017|   11|
|     TH Street Duomo|         270|   3|2017|   11|
|IH Hotels Milano ...|         270|   3|2017|   11|
|      Shoshone Lodge|         270|   3|2017|   11|
|            Nu Hotel|         270|   3|2017|   11|
|Conservatorium Hotel|         246|   4|2017|   11|
|Mercure Vaugirard...|         240|   5|2017|   11|
+--------------------+------------+----+----+-----+
only showing top 10 rows



In [0]:
df_third = spark.sql("""
	WITH exploded_dates AS (
		SELECT
			ex.id AS booking_id,
			ex.hotel_id,
			ex.srch_ci,
			ex.srch_co,
			explode(sequence(
				CAST(ex.srch_ci AS DATE), 
				CAST(ex.srch_co AS DATE) - INTERVAL 1 DAY,
				INTERVAL 1 DAY
			)) AS visit_date
		FROM mydatabase.expedia ex
		WHERE DATEDIFF(ex.srch_co, ex.srch_ci) BETWEEN 7 AND 30
		AND ex.srch_co > ex.srch_ci
	),
	joined_weather AS (
		SELECT 
			ed.booking_id,
			ed.hotel_id,
			ed.visit_date,
			hw.avg_tmpr_c,
			hw.address
		FROM exploded_dates ed
		LEFT JOIN mydatabase.hotel_weather hw
		ON ed.hotel_id = hw.id 
			AND CAST(hw.wthr_date AS DATE) = ed.visit_date
		WHERE hw.avg_tmpr_c IS NOT NULL
	),
	windowed_temps AS (
		SELECT
			booking_id,
			hotel_id,
			address,
			visit_date,
			avg_tmpr_c,
			FIRST_VALUE(avg_tmpr_c) OVER (PARTITION BY booking_id ORDER BY visit_date) AS first_temp,
			LAST_VALUE(avg_tmpr_c) OVER (PARTITION BY booking_id ORDER BY visit_date 
				ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_temp
		FROM joined_weather
	),
	temp_calculations AS (
		SELECT
			booking_id,
			hotel_id,
			address,
			MIN(visit_date) AS first_day,
			MAX(visit_date) AS last_day,
			ROUND(last_temp - first_temp, 2) AS temp_trend,
			ROUND(AVG(avg_tmpr_c), 2) AS avg_temperature
		FROM windowed_temps
		GROUP BY booking_id, hotel_id, address, first_temp, last_temp
	)
	SELECT *
	FROM temp_calculations
	WHERE temp_trend IS NOT NULL
	AND avg_temperature IS NOT NULL
	AND DATEDIFF(last_day, first_day) >= 7
	ORDER BY ABS(temp_trend) DESC;
""")

df_third.write.format("parquet") \
    .mode("overwrite") \
    .partitionBy("first_day", "last_day") \
    .save("dbfs:/mnt/data/final_datamarts/third")

In [0]:
df_check = spark.read.parquet("dbfs:/mnt/data/final_datamarts/third")
df_check.show(10)
df_check.count()

+----------+-------------+--------------------+----------+---------------+----------+----------+
|booking_id|     hotel_id|             address|temp_trend|avg_temperature| first_day|  last_day|
+----------+-------------+--------------------+----------+---------------+----------+----------+
|    660388|1262720385024|Holiday Inn Expre...|      -1.6|           34.1|2017-08-25|2017-09-03|
|     22002|1262720385024|Holiday Inn Expre...|      -1.6|           34.1|2017-08-25|2017-09-03|
|   2484059|1812476198912|  Staunton Hotel B B|      -3.2|           15.5|2017-08-25|2017-09-03|
|   2456610|1855425871872|           Avo Hotel|      -3.2|           15.5|2017-08-25|2017-09-03|
|   2418879|2310692405248|DoubleTree by Hil...|      -3.2|           15.5|2017-08-25|2017-09-03|
|   2389551|2602750181378|Old Ship Inn Hackney|      -3.2|           15.5|2017-08-25|2017-09-03|
|   2338050|2834678415363|Club Quarters Hot...|      -3.2|           15.5|2017-08-25|2017-09-03|
|   2318405|2276332666883|Holi

1410