%md
# For visits with an extended stay (more than 7 days), calculate the weather trend (the difference in daily temperature between the last and the first day of the stay) and the average temperature during the stay.

In [1]:
# Read Azure Blob Storage data into Spark DataFrames hw_df (hotel-weather) and ex_df (expedia)
%run 00_setup.ipynb

23/11/22 23:02:39 WARN SSLSocketFactoryEx: Failed to load OpenSSL. Falling back to the JSSE default.
                                                                                

In [2]:
from pyspark.sql.window import Window
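# max, min and last are not imported: they are unused here, and importing max/min would shadow the Python built-ins
from pyspark.sql.functions import sequence, explode, to_date, datediff, monotonically_increasing_id, avg, col, row_number, first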

In [3]:
hw_df.printSchema()

root
 |-- address: string (nullable = true)
 |-- avg_tmpr_c: double (nullable = true)
 |-- avg_tmpr_f: double (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- geoHash: string (nullable = true)
 |-- id: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- wthr_date: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)



In [4]:
ex_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- date_time: string (nullable = true)
 |-- site_name: integer (nullable = true)
 |-- posa_continent: integer (nullable = true)
 |-- user_location_country: integer (nullable = true)
 |-- user_location_region: integer (nullable = true)
 |-- user_location_city: integer (nullable = true)
 |-- orig_destination_distance: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- is_mobile: integer (nullable = true)
 |-- is_package: integer (nullable = true)
 |-- channel: integer (nullable = true)
 |-- srch_ci: string (nullable = true)
 |-- srch_co: string (nullable = true)
 |-- srch_adults_cnt: integer (nullable = true)
 |-- srch_children_cnt: integer (nullable = true)
 |-- srch_rm_cnt: integer (nullable = true)
 |-- srch_destination_id: integer (nullable = true)
 |-- srch_destination_type_id: integer (nullable = true)
 |-- hotel_id: long (nullable = true)



In [5]:
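# keep only stays longer than 7 days
over7d_notGrupped = ex_df.filter(datediff(to_date("srch_co"), to_date("srch_ci")) > 7)
# collapse identical (hotel, check-in, check-out) bookings into one stay
over7d = over7d_notGrupped.groupBy('hotel_id', 'srch_ci', 'srch_co').count()
# add stay_id here (monotonically_increasing_id yields unique, but not consecutive, values)
over7d = over7d.withColumn('stay_id', monotonically_increasing_id())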
#over7d.count()

In [6]:
# expand each stay's date range into one row per calendar day (check-in and check-out days included)
over7 = over7d.select(
    'stay_id',
    'hotel_id',
    'srch_ci',
    'srch_co',
    explode(sequence(to_date('srch_ci'),to_date('srch_co'))).alias('date')
)
# over7.show(3)
# over7.count()
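
%md
To make the expansion step concrete, here is a minimal plain-Python sketch of what `explode(sequence(start, stop))` produces for a single stay: one row per calendar day, with both endpoints included. The helper name `expand_stay` is hypothetical and only illustrates the logic; it is not part of the Spark job. The dates are those of stay_id 9 in the output further down.

```python
from datetime import date, timedelta
from typing import List


def expand_stay(check_in: date, check_out: date) -> List[date]:
    """Return every calendar day from check_in to check_out, both inclusive."""
    n_days = (check_out - check_in).days
    return [check_in + timedelta(days=i) for i in range(n_days + 1)]


# A stay from 2017-08-21 to 2017-08-29 spans 8 nights, so it passes the "> 7" filter
days = expand_stay(date(2017, 8, 21), date(2017, 8, 29))
print(len(days), days[0], days[-1])
```

Because both endpoints are included, a stay of N nights expands to N + 1 rows.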

In [7]:
# Join weather data. Joining on hotel_id and day gives only 5% temperature coverage, so a workaround is needed.

over7weather = over7.join(hw_df, [over7.hotel_id == hw_df.id, over7.date == hw_df.wthr_date] , "inner")
# over7weather.count()

In [8]:
# Let's try joining on geoHash instead.
# Add the hotel's geoHash to over7 (so we can use temperature data from nearby hotels), then join weather data on geoHash and date.
s = over7.join(hw_df, over7.hotel_id == hw_df.id, "inner") \
    .select(
        over7.stay_id,
        over7.hotel_id,
        over7.srch_ci,
        over7.srch_co,
        over7.date,
        hw_df['geoHash'].alias('hotel_geoHash')
    ).dropDuplicates()
# s.filter('stay_id = 1').show(2)
# s.count()

In [9]:
w = hw_df.groupBy('geoHash', 'wthr_date').agg(avg('avg_tmpr_c').alias('tmpr'))
# w.show(3)

In [10]:
sw = s.join(w, [s.hotel_geoHash == w.geoHash, s.date == w.wthr_date] , "left") \
    .select(s['*'], w.tmpr)
sw.filter('stay_id = 1').show()

                                                                                

+-------+------------+----------+----------+----------+-------------+----+
|stay_id|    hotel_id|   srch_ci|   srch_co|      date|hotel_geoHash|tmpr|
+-------+------------+----------+----------+----------+-------------+----+
|      1|824633720836|2017-09-12|2017-10-28|2017-09-12|         dr4j|NULL|
|      1|824633720836|2017-09-12|2017-10-28|2017-09-13|         dr4j|NULL|
|      1|824633720836|2017-09-12|2017-10-28|2017-09-14|         dr4j|NULL|
|      1|824633720836|2017-09-12|2017-10-28|2017-09-15|         dr4j|NULL|
|      1|824633720836|2017-09-12|2017-10-28|2017-09-16|         dr4j|NULL|
|      1|824633720836|2017-09-12|2017-10-28|2017-09-17|         dr4j|NULL|
|      1|824633720836|2017-09-12|2017-10-28|2017-09-18|         dr4j|NULL|
|      1|824633720836|2017-09-12|2017-10-28|2017-09-19|         dr4j|NULL|
|      1|824633720836|2017-09-12|2017-10-28|2017-09-20|         dr4j|23.4|
|      1|824633720836|2017-09-12|2017-10-28|2017-09-21|         dr4j|NULL|
|      1|824633720836|201

In [11]:
# count distinct (date, geoHash) pairs for which we have no temperature:
sw.select('date', 'hotel_geoHash').where(col('tmpr').isNull()).dropDuplicates().count()

                                                                                

90840
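
%md
A rough way to express the gap as a coverage ratio is sketched below in plain Python. It uses only the ten rows of stay_id 1 that are visible in the output above; the variable names are illustrative and this is not the Spark computation itself.

```python
# (date, temperature) pairs copied from the visible rows of stay_id 1; None stands for NULL
rows = [
    ("2017-09-12", None),
    ("2017-09-13", None),
    ("2017-09-14", None),
    ("2017-09-15", None),
    ("2017-09-16", None),
    ("2017-09-17", None),
    ("2017-09-18", None),
    ("2017-09-19", None),
    ("2017-09-20", 23.4),
    ("2017-09-21", None),
]

# share of stay-days that carry a temperature reading
covered = sum(1 for _, tmpr in rows if tmpr is not None)
coverage = covered / len(rows)
print(f"{covered} of {len(rows)} days covered ({coverage:.0%})")
```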

%md
Joining on geoHash does not help much either.
Unfortunately, for most visits we have no temperature for the first and last day of the visit, so we cannot perform the requested analysis.

Still, the code would look like this:

In [13]:
# first-day, last-day and average temperature during the stay.
windowSpec  = Window.partitionBy("stay_id").orderBy(col("date"))
windowSpecDesc  = Window.partitionBy("stay_id").orderBy(col("date").desc())
windowSpecAgg  = Window.partitionBy("stay_id")

result = sw.withColumn("first_day_tmpr", first("tmpr").over(windowSpec)) \
    .withColumn("last_day_tmpr", first("tmpr").over(windowSpecDesc)) \
    .withColumn("avg_tmpr", avg("tmpr").over(windowSpecAgg)) \
    .withColumn("row",row_number().over(windowSpec)) \
    .where("row = 1") \
    .select("stay_id", "hotel_id", "srch_ci", "srch_co", "first_day_tmpr", "last_day_tmpr", "avg_tmpr")

result.show()


+-------+-------------+----------+----------+--------------+-------------+------------------+
|stay_id|     hotel_id|   srch_ci|   srch_co|first_day_tmpr|last_day_tmpr|          avg_tmpr|
+-------+-------------+----------+----------+--------------+-------------+------------------+
|      0|1468878815233|2016-10-13|2016-11-24|          NULL|         NULL|              NULL|
|      6|2173253451777|2017-08-13|2017-08-22|          NULL|         NULL|              NULL|
|      7|3143916060675|2017-08-23|2017-08-31|          NULL|         NULL|17.099999999999973|
|      9|3332894621698|2017-08-21|2017-08-29|          NULL|         NULL|              24.5|
|     19|3264175144962|2017-08-01|2017-08-16|          NULL|         NULL| 15.59999999999994|
|     22|2834678415361|2017-09-17|2017-10-07|          NULL|         NULL| 18.30000000000008|
|     25|2241972928516|2016-10-21|2016-11-02|          NULL|         NULL| 17.09999999999994|
|     26|2516850835457|2017-09-04|2017-09-19|          NULL|

                                                                                

%md
As mentioned above, we cannot obtain first_day_tmpr and last_day_tmpr because they are absent from the source data.
Please refer to 04-demo.ipynb for an example of how similar code works on valid data.
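
For reference, the weather trend asked for in the task is the last-day temperature minus the first-day temperature. The plain-Python sketch below shows that arithmetic for one stay, under the assumption that a missing endpoint makes the trend undefined, while the average skips missing days (as Spark's `avg` does with NULLs). The function name `stay_summary` and the sample temperatures are hypothetical.

```python
from statistics import mean
from typing import Dict, List, Optional


def stay_summary(temps: List[Optional[float]]) -> Dict[str, Optional[float]]:
    """temps: daily temperatures of one stay ordered by date, None where missing."""
    first_day, last_day = temps[0], temps[-1]
    known = [t for t in temps if t is not None]
    # the trend needs both endpoints; otherwise it stays undefined
    trend = None if first_day is None or last_day is None else last_day - first_day
    return {"trend": trend, "avg_tmpr": mean(known) if known else None}


# hypothetical stay with complete endpoints
complete = stay_summary([20.0, None, 22.0, 23.5])
# hypothetical stay with missing endpoints, similar to most stays in this dataset
partial = stay_summary([None, 23.4, None])
print(complete, partial)
```

The second call mirrors the situation in the output above: an average can be computed, but the trend cannot.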

In [14]:
# Execution plan.
result.explain(mode="formatted")

== Physical Plan ==
AdaptiveSparkPlan (44)
+- Project (43)
   +- Filter (42)
      +- Window (41)
         +- WindowGroupLimit (40)
            +- Sort (39)
               +- Project (38)
                  +- Window (37)
                     +- Window (36)
                        +- Sort (35)
                           +- Window (34)
                              +- Sort (33)
                                 +- Exchange (32)
                                    +- Project (31)
                                       +- BroadcastHashJoin LeftOuter BuildRight (30)
                                          :- HashAggregate (20)
                                          :  +- HashAggregate (19)
                                          :     +- Project (18)
                                          :        +- BroadcastHashJoin Inner BuildRight (17)
                                          :           :- Project (10)
                                          :           :  +- Generate (9)
 

%md
## Execution plan analysis
The most expensive operations are:
- *BroadcastHashJoin Inner BuildRight (17)*: enriches the visit data with geoHash by joining on hotel_id.
- *BroadcastHashJoin LeftOuter BuildRight (30)*: adds temperature to the visit data by joining on geoHash and date.

These redundant steps come from my attempt to work around the low temperature coverage by using weather data from nearby hotels.
Since that brought little improvement, the query can be optimized to use a single join that fetches temperature by hotel and date.
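
As a rough illustration of the single-join idea, the plain-Python sketch below mimics what a hash join does conceptually: the smaller side is turned into a lookup table keyed on (hotel_id, date), and each stay-day probes it once. It assumes left-join semantics so that days without a reading are kept with a missing temperature. All values and names are hypothetical, and this is not how Spark implements the join internally.

```python
# hypothetical weather readings keyed by (hotel_id, date): the "build" side of the join
weather = {
    (1, "2017-08-21"): 24.5,
    (1, "2017-08-22"): 25.0,
}

# hypothetical expanded stay-days (stay_id, hotel_id, date): the "probe" side
stay_days = [
    (9, 1, "2017-08-21"),
    (9, 1, "2017-08-22"),
    (9, 1, "2017-08-23"),
]

# one lookup per stay-day; a missing key yields None, like NULL in a left join
joined = [
    (stay_id, hotel_id, day, weather.get((hotel_id, day)))
    for stay_id, hotel_id, day in stay_days
]
for row in joined:
    print(row)
```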

In [None]:
result.write.parquet(data_path  + "/ouput/over7d_stay.parquet")