In [0]:
# --- Target storage -------------------------------------------------------
# Identifiers and secret used to authenticate against the target container.
# The client secret is read from a Databricks secret scope rather than being
# written into the notebook.
TARGET_DIRECTORY_ID = "b41b72d0-4e9f-4c26-8a69-f949f367c91d"
TARGET_SECRET = dbutils.secrets.get(scope="araev20113ss", key="araev20113secret")
TARGET_APP_ID = "e4d19ced-72c2-45b8-ba29-ad173c95b384"
TARGET_PATH = "abfss://data@staraev20113westeurope.dfs.core.windows.net/"
MOUNT_POINT = "/mnt/m13_spark_streaming"

# OAuth client-credentials settings passed to the mount as extra configs.
TARGET_CONFIGS = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": TARGET_APP_ID,
    "fs.azure.account.oauth2.client.secret": TARGET_SECRET,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{TARGET_DIRECTORY_ID}/oauth2/token",
}

# Mount the target storage only when MOUNT_POINT is not mounted yet, so that
# re-running this cell (or "Run All") does not fail on an existing mount.
# To force a remount (e.g. after changing TARGET_CONFIGS), run
# dbutils.fs.unmount(MOUNT_POINT) first.
_is_mounted = any(mount.mountPoint == MOUNT_POINT for mount in dbutils.fs.mounts())
if not _is_mounted:
    dbutils.fs.mount(
        source=TARGET_PATH,
        mount_point=MOUNT_POINT,
        extra_configs=TARGET_CONFIGS,
    )

# Data and checkpoint locations for each layer, all rooted at the mount point.
_bronze_root = MOUNT_POINT + '/bronze_data'
_silver_root = MOUNT_POINT + '/silver_data'

BRONZE_PATH = _bronze_root + '/data'
BRONZE_CHECKPOINTS = _bronze_root + '/checkpoint'
SILVER_PATH = _silver_root + '/data'
SILVER_CHECKPOINTS = _silver_root + '/checkpoint'

# --- Source storage -------------------------------------------------------
# Host of the storage account that holds the source data. It is used both to
# build SOURCE_PATH and to scope the credentials below to this account only.
SOURCE_ACCOUNT_HOST = "bd201stacc.dfs.core.windows.net"
SOURCE_PATH = f"abfss://m13sparkstreaming@{SOURCE_ACCOUNT_HOST}/hotel-weather/"

# OAuth client-credentials settings for the source account. The client secret
# is read from a Databricks secret scope.
SOURCE_CONFIGS = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "f3905ff9-16d4-43ac-9011-842b661d556d",
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="araev20113ss", key="araev20113sourcesecret"),
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/b41b72d0-4e9f-4c26-8a69-f949f367c91d/oauth2/token",
}

# Set credentials to connect to source storage.
# Each key is suffixed with the storage account host so the settings apply to
# the source account only, instead of becoming the session-wide default for
# every ABFS account accessed from this Spark session.
for key, value in SOURCE_CONFIGS.items():
    spark.conf.set(f"{key}.{SOURCE_ACCOUNT_HOST}", value)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
# Explicit schema for the source records. Every column is declared nullable.
# Listed as (column name, Spark type) pairs in the order the fields are defined.
_schema_columns = [
    ("address", StringType()),
    ("avg_tmpr_c", DoubleType()),
    ("avg_tmpr_f", DoubleType()),
    ("city", StringType()),
    ("country", StringType()),
    ("geoHash", StringType()),
    ("id", StringType()),
    ("latitude", DoubleType()),
    ("longitude", DoubleType()),
    ("name", StringType()),
    ("wthr_date", StringType()),
    ("year", IntegerType()),
    ("month", IntegerType()),
    ("day", IntegerType()),
]

schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in _schema_columns]
)

In [0]:
# Bronze layer: stream the source parquet files with Auto Loader ("cloudFiles")
# using the explicit schema defined above.
# - The reader format is set once (the original set it twice with different casing).
# - Partition columns are given as a comma-separated list without spaces, which
#   is the documented form for cloudFiles.partitionColumns.
bronze_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.partitionColumns", "year,month,day")
    .schema(schema)
    .load(SOURCE_PATH)
)

# Write the stream unchanged to the bronze Delta location; progress is tracked
# in BRONZE_CHECKPOINTS.
(
    bronze_stream.writeStream.format("delta")
    .option("checkpointLocation", BRONZE_CHECKPOINTS)
    .queryName("bronze_stream")
    .start(BRONZE_PATH)
)

In [0]:
from pyspark.sql import functions as F

# Silver layer: read the bronze Delta data back as a stream.
bronze_rows = spark.readStream.format("delta").load(BRONZE_PATH)

# Aggregate per (wthr_date, country, city): approximate number of distinct
# hotel ids plus max / min / average of avg_tmpr_c.
daily_city_stats = bronze_rows.groupBy("wthr_date", "country", "city").agg(
    F.approx_count_distinct("id").alias("distinct_hotels"),
    F.max("avg_tmpr_c").alias("max_tmpr"),
    F.min("avg_tmpr_c").alias("min_tmpr"),
    F.avg("avg_tmpr_c").alias("avg_tmpr"),
)

# Keep the reporting columns only. Note that `country` is part of the grouping
# key above but is not carried into the output.
silver_stream = daily_city_stats.select(
    "wthr_date",
    "city",
    "distinct_hotels",
    "max_tmpr",
    "min_tmpr",
    "avg_tmpr",
)

# Write the aggregate to the silver Delta location in "complete" output mode;
# progress is tracked in SILVER_CHECKPOINTS.
(
    silver_stream.writeStream.format('delta')
    .outputMode("complete")
    .option("checkpointLocation", SILVER_CHECKPOINTS)
    .queryName("silver_stream")
    .start(SILVER_PATH)
)

In [0]:
# Render the aggregated silver DataFrame in the notebook output for a quick visual check.
display(silver_stream)

wthr_date,city,distinct_hotels,max_tmpr,min_tmpr,avg_tmpr
2016-10-24,Ferriday,1,16.7,16.7,16.7
2017-08-12,Finleyville,1,20.5,20.5,20.5
2016-10-21,Franklin,1,12.6,12.6,12.6
2017-09-11,Santa Maria,2,22.2,22.2,22.2
2016-10-07,Millington,1,22.5,22.5,22.5
2017-09-11,Lawrence,1,20.4,20.4,20.4
2017-09-21,Somerset,2,23.1,23.1,23.1
2017-08-30,Clatskanie,1,18.4,18.4,18.4
2016-10-22,Kansas City,2,14.4,14.4,14.4
2016-10-18,Moorcroft,1,5.7,5.7,5.7


### Create SQL table and views for analysis and visualization

In [0]:
%sql
-- Create the m13_db database if it does not exist yet, then make it the
-- current database for the statements that follow.
CREATE DATABASE IF NOT EXISTS m13_db;
USE m13_db;

In [0]:
# Register the silver Delta location as a table in the current database.
# IF NOT EXISTS keeps the cell re-runnable: without it the statement fails
# once silver_table has already been created.
spark.sql(
    f'''
    CREATE TABLE IF NOT EXISTS silver_table
    USING DELTA
    LOCATION "{SILVER_PATH}"'''
)

In [0]:
%sql
-- View: the 10 cities with the highest per-row distinct_hotels value found in
-- silver_table. Cities with equal maxima are not ordered by any further key.
CREATE OR REPLACE VIEW top_10_cities AS
WITH city_peaks AS (
  SELECT
    city,
    MAX(distinct_hotels) AS max_distinct_hotels
  FROM silver_table
  GROUP BY city
)
SELECT
  city,
  max_distinct_hotels
FROM city_peaks
ORDER BY max_distinct_hotels DESC
LIMIT 10

In [0]:
%sql
-- View: silver_table rows restricted to the cities listed in top_10_cities,
-- with the average temperature rounded to two decimals.
CREATE OR REPLACE VIEW top_10_cities_by_date AS
SELECT
  s.wthr_date,
  s.city,
  s.distinct_hotels,
  s.max_tmpr,
  s.min_tmpr,
  ROUND(s.avg_tmpr, 2) AS avg_tmpr
FROM silver_table AS s
JOIN top_10_cities AS t
  ON s.city = t.city

In [0]:
%sql
-- List the cities in the top_10_cities view with their maximum distinct-hotel count.
SELECT
  city,
  max_distinct_hotels
FROM top_10_cities

city,max_distinct_hotels
Paris,444
London,250
Barcelona,211
Milan,165
Amsterdam,85
Paddington,19
New York,6
San Diego,6
Vienna,5
Houston,5


### Visualize results for each of the top 10 cities

In [0]:
%sql
-- Per-date hotel count and temperature figures for Paris, oldest date first.
SELECT
  wthr_date,
  city,
  distinct_hotels,
  max_tmpr,
  min_tmpr,
  avg_tmpr
FROM top_10_cities_by_date
WHERE city = 'Paris'
ORDER BY wthr_date

wthr_date,city,distinct_hotels,max_tmpr,min_tmpr,avg_tmpr
2016-10-03,Paris,228,10.7,10.7,10.7
2016-10-09,Paris,228,8.7,8.7,8.7
2016-10-13,Paris,233,8.4,8.4,8.4
2016-10-19,Paris,233,9.4,9.4,9.4
2016-10-22,Paris,233,7.2,7.2,7.2
2016-10-23,Paris,228,7.1,7.1,7.1
2016-10-27,Paris,228,11.4,11.4,11.4
2016-10-31,Paris,233,10.7,10.7,10.7
2017-08-14,Paris,228,17.6,17.6,17.6
2017-08-15,Paris,228,19.4,19.4,19.4


In [0]:
%sql
-- Per-date hotel count and temperature figures for London, oldest date first.
SELECT
  wthr_date,
  city,
  distinct_hotels,
  max_tmpr,
  min_tmpr,
  avg_tmpr
FROM top_10_cities_by_date
WHERE city = 'London'
ORDER BY wthr_date

wthr_date,city,distinct_hotels,max_tmpr,min_tmpr,avg_tmpr
2016-10-09,London,1,13.6,13.6,13.6
2016-10-10,London,250,8.4,8.4,8.4
2016-10-12,London,123,10.4,10.4,10.4
2016-10-16,London,250,12.1,12.1,12.1
2016-10-23,London,1,9.2,9.2,9.2
2016-10-29,London,1,12.9,12.9,12.9
2017-08-01,London,123,16.1,16.1,16.1
2017-08-02,London,1,15.9,15.9,15.9
2017-08-03,London,7,16.7,16.2,16.63
2017-08-06,London,123,14.9,14.9,14.9


In [0]:
%sql
-- Per-date hotel count and temperature figures for Barcelona, oldest date first.
SELECT
  wthr_date,
  city,
  distinct_hotels,
  max_tmpr,
  min_tmpr,
  avg_tmpr
FROM top_10_cities_by_date
WHERE city = 'Barcelona'
ORDER BY wthr_date

wthr_date,city,distinct_hotels,max_tmpr,min_tmpr,avg_tmpr
2016-10-03,Barcelona,1,16.7,16.7,16.7
2016-10-13,Barcelona,1,13.6,13.6,13.6
2016-10-24,Barcelona,1,16.6,16.6,16.6
2016-10-28,Barcelona,211,17.1,16.7,17.1
2017-08-10,Barcelona,1,18.0,18.0,18.0
2017-08-27,Barcelona,210,24.5,24.5,24.5
2017-09-10,Barcelona,1,16.7,16.7,16.7
2017-09-21,Barcelona,1,15.3,15.3,15.3
2017-09-27,Barcelona,1,15.0,15.0,15.0


In [0]:
%sql
-- Per-date hotel count and temperature figures for Milan, oldest date first.
SELECT
  wthr_date,
  city,
  distinct_hotels,
  max_tmpr,
  min_tmpr,
  avg_tmpr
FROM top_10_cities_by_date
WHERE city = 'Milan'
ORDER BY wthr_date

wthr_date,city,distinct_hotels,max_tmpr,min_tmpr,avg_tmpr
2016-10-06,Milan,157,12.1,12.1,12.1
2016-10-16,Milan,157,12.3,12.3,12.3
2016-10-18,Milan,1,11.8,11.8,11.8
2016-10-24,Milan,157,10.7,10.7,10.7
2016-10-31,Milan,8,10.0,10.0,10.0
2017-08-04,Milan,1,27.1,27.1,27.1
2017-08-06,Milan,157,23.9,23.9,23.9
2017-08-11,Milan,8,17.2,17.2,17.2
2017-08-12,Milan,8,19.5,19.5,19.5
2017-08-14,Milan,157,22.4,22.4,22.4


In [0]:
%sql
-- Per-date hotel count and temperature figures for Amsterdam, oldest date first.
SELECT
  wthr_date,
  city,
  distinct_hotels,
  max_tmpr,
  min_tmpr,
  avg_tmpr
FROM top_10_cities_by_date
WHERE city = 'Amsterdam'
ORDER BY wthr_date

wthr_date,city,distinct_hotels,max_tmpr,min_tmpr,avg_tmpr
2016-10-02,Amsterdam,85,13.7,13.7,13.7
2017-08-03,Amsterdam,85,18.8,18.8,18.8
2017-08-04,Amsterdam,85,18.3,18.3,18.3
2017-08-05,Amsterdam,8,16.3,16.3,16.3
2017-08-08,Amsterdam,85,16.2,16.2,16.2
2017-08-10,Amsterdam,6,16.4,16.4,16.4
2017-08-11,Amsterdam,85,16.9,16.9,16.9
2017-08-17,Amsterdam,6,17.1,17.1,17.1
2017-08-27,Amsterdam,8,19.0,19.0,19.0
2017-08-31,Amsterdam,85,14.4,14.4,14.4


In [0]:
%sql
-- Per-date hotel count and temperature figures for Paddington, oldest date first.
SELECT
  wthr_date,
  city,
  distinct_hotels,
  max_tmpr,
  min_tmpr,
  avg_tmpr
FROM top_10_cities_by_date
WHERE city = 'Paddington'
ORDER BY wthr_date

wthr_date,city,distinct_hotels,max_tmpr,min_tmpr,avg_tmpr
2016-10-10,Paddington,19,8.4,8.4,8.4
2016-10-16,Paddington,19,12.1,12.1,12.1
2017-08-07,Paddington,19,15.6,15.6,15.6
2017-08-25,Paddington,19,17.1,17.1,17.1
2017-09-03,Paddington,19,13.9,13.9,13.9
2017-09-19,Paddington,19,10.5,10.5,10.5
2017-09-23,Paddington,19,14.2,14.2,14.2


In [0]:
%sql
-- Per-date hotel count and temperature figures for New York, oldest date first.
SELECT
  wthr_date,
  city,
  distinct_hotels,
  max_tmpr,
  min_tmpr,
  avg_tmpr
FROM top_10_cities_by_date
WHERE city = 'New York'
ORDER BY wthr_date

wthr_date,city,distinct_hotels,max_tmpr,min_tmpr,avg_tmpr
2016-10-05,New York,1,26.5,26.5,26.5
2016-10-07,New York,1,10.7,10.7,10.7
2016-10-21,New York,6,19.1,19.1,19.1
2016-10-23,New York,1,26.6,26.6,26.6
2016-10-26,New York,1,12.9,12.9,12.9
2017-08-02,New York,1,27.6,27.6,27.6
2017-08-10,New York,1,21.2,21.2,21.2
2017-08-11,New York,6,23.3,23.3,23.3
2017-08-13,New York,1,21.1,21.1,21.1
2017-08-21,New York,1,27.1,27.1,27.1


In [0]:
%sql
-- Per-date hotel count and temperature figures for San Diego, oldest date first.
SELECT
  wthr_date,
  city,
  distinct_hotels,
  max_tmpr,
  min_tmpr,
  avg_tmpr
FROM top_10_cities_by_date
WHERE city = 'San Diego'
ORDER BY wthr_date

wthr_date,city,distinct_hotels,max_tmpr,min_tmpr,avg_tmpr
2017-08-16,San Diego,6,19.9,19.9,19.9
2017-08-17,San Diego,6,19.6,19.6,19.6
2017-09-16,San Diego,6,20.2,20.2,20.2
2017-09-17,San Diego,6,20.4,20.4,20.4
2017-09-20,San Diego,6,20.2,20.2,20.2


In [0]:
%sql
-- Per-date hotel count and temperature figures for Vienna, oldest date first.
SELECT
  wthr_date,
  city,
  distinct_hotels,
  max_tmpr,
  min_tmpr,
  avg_tmpr
FROM top_10_cities_by_date
WHERE city = 'Vienna'
ORDER BY wthr_date

wthr_date,city,distinct_hotels,max_tmpr,min_tmpr,avg_tmpr
2016-10-23,Vienna,5,6.2,6.2,6.2
2016-10-30,Vienna,5,8.8,8.8,8.8
2017-08-27,Vienna,5,26.7,26.7,26.7
2017-09-10,Vienna,5,16.3,16.3,16.3
2017-09-19,Vienna,5,9.9,9.9,9.9


In [0]:
%sql
-- Per-date hotel count and temperature figures for Houston, oldest date first.
SELECT
  wthr_date,
  city,
  distinct_hotels,
  max_tmpr,
  min_tmpr,
  avg_tmpr
FROM top_10_cities_by_date
WHERE city = 'Houston'
ORDER BY wthr_date

wthr_date,city,distinct_hotels,max_tmpr,min_tmpr,avg_tmpr
2016-10-05,Houston,1,27.6,27.6,27.6
2016-10-07,Houston,1,26.9,26.9,26.9
2016-10-10,Houston,1,21.0,21.0,21.0
2016-10-13,Houston,5,26.9,25.8,26.68
2016-10-16,Houston,1,26.7,26.7,26.7
2016-10-21,Houston,1,18.7,18.7,18.7
2017-08-04,Houston,1,28.1,28.1,28.1
2017-08-06,Houston,3,28.6,28.6,28.6
2017-08-08,Houston,2,25.4,25.4,25.4
2017-08-10,Houston,2,29.7,29.4,29.55
