### Add configuration

In [0]:
%run ./includes/configuration

### Cleaning previous data to consistency

In [0]:
dbutils.fs.rm(raw_path, True)
dbutils.fs.rm(bronze_path, True)
dbutils.fs.rm(silver_path, True)
dbutils.fs.rm(gold_path, True)
dbutils.fs.rm(checkpoint_path, True)

### Create stream

In [0]:
raw_df = (
  spark.readStream.format("cloudFiles")
  .option("cloudFiles.partitionColumns", "year, month, day")
  .option("cloudFiles.format", "parquet")
  .option("cloudFiles.includeExistingFiles", True)
  .option("cloudFiles.maxFilesPerTrigger", 10)
  .option("cloudFiles.maxBytesPerTrigger", "10m")
  .schema(raw_data_schema)
  .load(raw_data_dir)
)
display(raw_df)

address,avg_tmpr_c,avg_tmpr_f,city,country,geoHash,id,latitude,longitude,name,wthr_date,year,month,day
Homewood Suites - Mall of America,15.9,60.7,Minneapolis,US,9zvx,1202590842885,44.85201,-93.23993,2261 Killebrew Dr,2016-10-01,2016,10,1
Little Belt Inn Neihart,10.8,51.5,Neihart,US,c837,1571958030336,46.933598,-110.735886,316 N Main St,2016-10-01,2016,10,1
Great American Inn Suites,16.6,61.8,Devils Lake,US,cb99,944892805127,48.10113,-98.84847,1116 Highway 2 E,2016-10-01,2016,10,1
Pine Needles Golf Course,20.9,69.6,Southern Pines,US,dnr8,1460288880641,35.197103,-79.392412,1005 Midland Rd,2016-10-01,2016,10,1
Microtel Inn By Wyndham Southern Pines,20.9,69.6,Southern Pines,US,dnr8,197568495620,35.159924,-79.410489,205 Windstar Pl,2016-10-01,2016,10,1
Super 8 Blackwell Ok,18.6,65.4,Blackwell,US,9ydc,730144440325,36.8115,-97.29803,1014 W Doolin Ave,2016-10-01,2016,10,1
Clarion Suites Philadelphia,15.4,59.7,Philadelphia,US,dr4e,1151051235333,39.95546,-75.15647,1010 Race St,2016-10-01,2016,10,1
Holiday Inn,15.4,59.7,Philadelphia,US,dr4e,609885356033,39.907322,-75.1502,10 Packer Ave,2016-10-01,2016,10,1
Cambria Hotel & Suites,15.7,60.2,Roanoke,US,dnx5,1305670057989,37.256149,-79.947005,301 Reserve Ave,2016-10-01,2016,10,1
Hampton Inn Roanoke/salem,15.7,60.2,Salem,US,dnx5,1331439861763,37.265157,-80.03187,1886 Electric Rd,2016-10-01,2016,10,1


### Create Bronze stream from raw data stream

In [0]:
from pyspark.sql.functions import lit, current_timestamp

bronze_stream = (
    raw_df.select(
        "*", 
        lit("bd201stacc/m13sparkstreaming").alias("datasource"),
        current_timestamp().alias("ingesttime")
    )
    .writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", bronze_checkpoint)
    .partitionBy("wthr_date")
    .queryName("write_raw_to_bronze")
    .start(bronze_path)
)

### Check stream status

In [0]:
bronze_stream.status

### Create Delta table

In [0]:
spark.sql(f"DROP TABLE IF EXISTS hotel_weather_silver")
spark.sql(f"""
CREATE TABLE `hotel_weather_silver` ( 
    `wthr_date` STRING, 
    `country` STRING, 
    `city` STRING, 
    `num_distinct_hotels` BIGINT, 
    `avg_tmpr` DOUBLE, 
    `max_tmpr` DOUBLE, 
    `min_tmpr` DOUBLE) 
    USING delta PARTITIONED BY (wthr_date) 
    LOCATION '{silver_path}'
""")


In [0]:
from delta.tables import DeltaTable

silver_table = DeltaTable.forName(spark, "hotel_weather_silver")

### Task 1. Using Spark calculate in Databricks Notebooks for each city each day:

    * Number of distinct hotels in the city.
    * Average/max/min temperature in the city.

Note. There is no function Distinct on streaming DF. Need to use approx_count_distinct()

In [0]:
from pyspark.sql.functions import approx_count_distinct, avg, max, min, round

silver_stream_df = (
  spark.readStream
  .format("delta")
  .load(bronze_path)
  .groupBy("wthr_date", "country", "city")
  .agg(
    approx_count_distinct("id", 0.03).alias("num_distinct_hotels"),
    round(avg("avg_tmpr_c"), 1).alias("avg_tmpr"),
    max("avg_tmpr_c").alias("max_tmpr"),
    min("avg_tmpr_c").alias("min_tmpr"))
)


### Update silver table by batches

In [0]:
def upsert_to_delta(micro_batch_df, batch_id):
  (silver_table.alias("t").merge(micro_batch_df.alias("s"), "s.wthr_date = t.wthr_date AND s.country = t.country AND s.city = t.city")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())

(silver_stream_df.writeStream
  .format("delta")
  .foreachBatch(upsert_to_delta)
  .outputMode("update")
  .option("checkpointLocation", silver_checkpoint) 
  .start()
)

In [0]:
silver_stream_df.explain(True)

In [0]:
dbutils.fs.ls(silver_path)

### Task 2. Visualize incoming data in Databricks Notebook for 10 biggest cities (the biggest number of hotels in the city, one chart for one city):

    * X-axis: date (date of observation).
    * Y-axis: number of distinct hotels, average/max/min temperature.

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import *
 
window_func = Window.partitionBy("wthr_date").orderBy(desc("num_distinct_hotels"))

In [0]:
from pyspark.sql.types import IntegerType

top_ten_biggest_cities = (silver_table.toDF()
  .select("wthr_date", "country", "city", "num_distinct_hotels", "avg_tmpr", "max_tmpr", "min_tmpr", row_number().over(window_func).cast(IntegerType()).alias("rnum"))
  .filter(col("rnum") <= 10)
)

In [0]:
display(top_ten_biggest_cities)

wthr_date,country,city,num_distinct_hotels,avg_tmpr,max_tmpr,min_tmpr,rnum
2016-10-01,US,Albuquerque,4,18.9,18.9,18.9,1
2016-10-01,US,Virginia Beach,3,24.7,24.7,24.7,2
2016-10-01,US,Enterprise,2,3.8,3.8,3.8,3
2016-10-01,US,Mobile,2,20.5,20.5,20.5,4
2016-10-01,US,Fryeburg,2,11.2,11.2,11.2,5
2016-10-01,US,Lancaster,2,14.0,14.0,14.0,6
2016-10-01,US,Southern Pines,2,20.9,20.9,20.9,7
2016-10-01,US,Rockford,2,15.8,15.8,15.8,8
2016-10-01,US,San Francisco,2,13.2,13.2,13.2,9
2016-10-01,US,Philadelphia,2,15.4,15.4,15.4,10
