In [0]:
# Streaming Hotel/Weather Aggregation with Auto Loader
# ----------------------------------------
# 1) Imports & Helpers
# ----------------------------------------
from pyspark.sql.functions import (
    year, month, dayofmonth, col, to_date,
    approx_count_distinct, avg, max, min, round, desc
)
from pyspark.sql.types import (
    StructType, StringType, DoubleType, TimestampType
)

In [0]:
# ----------------------------------------
# 2) Read Secrets & Configure Spark
# ----------------------------------------
# Fetch the three storage settings from the "streaming" secret scope. They are
# collected in `config` and also exposed as flat variables, which are the names
# the following cells use.
config = {
    "storage_account": dbutils.secrets.get("streaming", "AZURE_STORAGE_ACCOUNT_NAME"),
    "storage_key": dbutils.secrets.get("streaming", "AZURE_STORAGE_ACCOUNT_KEY"),
    "container": dbutils.secrets.get("streaming", "AZURE_CONTAINER_NAME"),
}
storage_account = config["storage_account"]
storage_key     = config["storage_key"]
container       = config["container"]

# Register the account key under the per-account Spark setting so reads and
# writes against this account's blob endpoint can authenticate.
spark.conf.set(
    "fs.azure.account.key.{}.blob.core.windows.net".format(storage_account),
    storage_key,
)


In [0]:
# ----------------------------------------
# 3) Build Paths
# ----------------------------------------
# Source data sits at the container root; the Delta output goes to a sub-folder
# of the same container, while streaming checkpoints are kept on DBFS.
container_root    = "wasbs://{}@{}.blob.core.windows.net/".format(container, storage_account)
input_path        = container_root
checkpoint_prefix = "dbfs:/checkpoints/hotel-weather/"
console_ckpt      = f"{checkpoint_prefix}console-hotel-weather-agg"
delta_ckpt        = f"{checkpoint_prefix}delta-agg-city"
delta_path        = f"{container_root}delta/agg_city"

# ----------------------------------------
# Cleanup any old checkpoints (on DBFS)
# ----------------------------------------
# Each run starts both sinks from scratch: remove whatever checkpoint state a
# previous run left behind (recursive delete of each checkpoint directory).
for ckpt_dir in [console_ckpt, delta_ckpt]:
    dbutils.fs.rm(ckpt_dir, recurse=True)
    print(f"Deleted old checkpoint directory (if existed): {ckpt_dir}")

In [0]:
# ----------------------------------------
# 4) Sanity-Check Your Paths
# ----------------------------------------
# List both locations before starting any stream so a wrong container or
# account name fails here rather than inside a streaming query.
# (input_path is currently the same location as container_root.)
for label, location in (
    ("Container root listing:", container_root),
    ("Input path listing:", input_path),
):
    print(label)
    display(dbutils.fs.ls(location))


path,name,size,modificationTime
wasbs://[REDACTED]@[REDACTED].blob.core.windows.net/year=2016/,year=2016/,0,0
wasbs://[REDACTED]@[REDACTED].blob.core.windows.net/year=2017/,year=2017/,0,0


path,name,size,modificationTime
wasbs://[REDACTED]@[REDACTED].blob.core.windows.net/year=2016/,year=2016/,0,0
wasbs://[REDACTED]@[REDACTED].blob.core.windows.net/year=2017/,year=2017/,0,0


In [0]:
# ----------------------------------------
# 5) Define Schema & Start Auto Loader
# ----------------------------------------
# Explicit schema for the hotel/weather records, declared as (name, type)
# pairs in column order. The date parts and wthr_date are kept as strings here.
schema_fields = [
    ("address",    StringType()),
    ("avg_tmpr_c", DoubleType()),
    ("avg_tmpr_f", DoubleType()),
    ("city",       StringType()),
    ("country",    StringType()),
    ("geoHash",    StringType()),
    ("id",         StringType()),
    ("latitude",   DoubleType()),
    ("longitude",  DoubleType()),
    ("name",       StringType()),
    ("wthr_date",  StringType()),
    ("wthr_year",  StringType()),
    ("wthr_month", StringType()),
    ("wthr_day",   StringType()),
]

schema = StructType()
for field_name, field_type in schema_fields:
    # StructType.add appends the field and returns the same StructType
    schema = schema.add(field_name, field_type)



In [0]:
# Incrementally ingest the parquet files under input_path with Auto Loader
# (the "cloudFiles" source), using the explicit schema defined in step 5.
#
# Fixes relative to the previous version of this cell:
#   * the schema variable is `schema` (step 5); the old reference to
#     `hotel_schema` was never defined and raised NameError on a fresh kernel
#   * the "header" option was dropped: it is a CSV reader option and has no
#     meaning for a parquet source
raw_stream = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "parquet")
      .schema(schema)
      .load(input_path)
)
print("Raw stream schema:")
raw_stream.printSchema()

In [0]:
# Preview the raw records as ingested, before any casting or enrichment.
# raw_stream is a streaming DataFrame (built from spark.readStream).
display(raw_stream)

address,avg_tmpr_c,avg_tmpr_f,city,country,geoHash,id,latitude,longitude,name,wthr_date,wthr_year,wthr_month,wthr_day
Travelodge,9.2,48.6,Oswego,US,dr9x,25769803777,43.45161,-76.53235,309 W Seneca St,2016-10-10,,,
Quality Inn & Suites - Riverfront,9.2,48.6,Oswego,US,dr9x,369367187459,43.45818526,-76.50861875,70 E 1st St,2016-10-10,,,
Country Inn & Suites By Carlson,15.8,60.5,Clinton,US,dnkj,1047972020226,36.16315,-84.08504,710 Park Pl,2016-10-10,,,
Acorn Motor Inn,9.9,49.8,Oak Harbor,US,c28f,343597383685,48.288952,-122.657842,31530 State Route 20,2016-10-10,,,
Econo Lodge St Robert,15.9,60.7,St. Robert,US,9ywr,876173328384,37.8228,-92.14079,309 Highway Z,2016-10-10,,,
Hampton Inn-st Robert,15.9,60.7,Saint Robert,US,9ywr,3,37.82382,-92.149721,103 Saint Robert Plaza Dr,2016-10-10,,,
The Montcalm Marble Arch,8.4,47.2,London,GB,gcpv,3143916060674,51.5150522,-0.159239,2 Wallenberg Place Westminster Borough London W1H 7TN United Kingdom,2016-10-10,,,
DoubleTree by Hilton London West End,8.4,47.2,London,GB,gcpv,3221225472004,51.5201065,-0.1221393,92 Southampton Row Camden London WC1B 4BH United Kingdom,2016-10-10,,,
Montcalm Royal London House City of London,8.4,47.2,London,GB,gcpv,3332894621696,51.5218066,-0.0856081,22 25 Finsbury Square City Islington London EC2A 1DX United Kingdom,2016-10-10,,,
Radisson Blu Edwardian Berkshire,8.4,47.2,London,GB,gcpv,2508260900866,51.5146025,-0.1481978,350 Oxford Street Westminster Borough London W1C 1BY United Kingdom,2016-10-10,,,


In [0]:
# ----------------------------------------
# 6) Enrich with Timestamp, Date & Watermark
# ----------------------------------------
# wthr_date arrives as a string: cast it to a timestamp so it can serve as the
# event-time column, then derive a plain calendar `date` from it.
event_time = col("wthr_date").cast(TimestampType())

stream = raw_stream.withColumn("wthr_date", event_time)
stream = stream.withColumn("date", to_date(col("wthr_date")))
# handle up-to-1-day late data
stream = stream.withWatermark("wthr_date", "1 day")

In [0]:
# Preview the enriched stream: wthr_date is now a timestamp and the derived
# `date` column has been added.
display(stream)

address,avg_tmpr_c,avg_tmpr_f,city,country,geoHash,id,latitude,longitude,name,wthr_date,wthr_year,wthr_month,wthr_day,date
Travelodge,9.2,48.6,Oswego,US,dr9x,25769803777,43.45161,-76.53235,309 W Seneca St,2016-10-10T00:00:00Z,,,,2016-10-10
Quality Inn & Suites - Riverfront,9.2,48.6,Oswego,US,dr9x,369367187459,43.45818526,-76.50861875,70 E 1st St,2016-10-10T00:00:00Z,,,,2016-10-10
Country Inn & Suites By Carlson,15.8,60.5,Clinton,US,dnkj,1047972020226,36.16315,-84.08504,710 Park Pl,2016-10-10T00:00:00Z,,,,2016-10-10
Acorn Motor Inn,9.9,49.8,Oak Harbor,US,c28f,343597383685,48.288952,-122.657842,31530 State Route 20,2016-10-10T00:00:00Z,,,,2016-10-10
Econo Lodge St Robert,15.9,60.7,St. Robert,US,9ywr,876173328384,37.8228,-92.14079,309 Highway Z,2016-10-10T00:00:00Z,,,,2016-10-10
Hampton Inn-st Robert,15.9,60.7,Saint Robert,US,9ywr,3,37.82382,-92.149721,103 Saint Robert Plaza Dr,2016-10-10T00:00:00Z,,,,2016-10-10
The Montcalm Marble Arch,8.4,47.2,London,GB,gcpv,3143916060674,51.5150522,-0.159239,2 Wallenberg Place Westminster Borough London W1H 7TN United Kingdom,2016-10-10T00:00:00Z,,,,2016-10-10
DoubleTree by Hilton London West End,8.4,47.2,London,GB,gcpv,3221225472004,51.5201065,-0.1221393,92 Southampton Row Camden London WC1B 4BH United Kingdom,2016-10-10T00:00:00Z,,,,2016-10-10
Montcalm Royal London House City of London,8.4,47.2,London,GB,gcpv,3332894621696,51.5218066,-0.0856081,22 25 Finsbury Square City Islington London EC2A 1DX United Kingdom,2016-10-10T00:00:00Z,,,,2016-10-10
Radisson Blu Edwardian Berkshire,8.4,47.2,London,GB,gcpv,2508260900866,51.5146025,-0.1481978,350 Oxford Street Westminster Borough London W1C 1BY United Kingdom,2016-10-10T00:00:00Z,,,,2016-10-10


In [0]:
# ----------------------------------------
# 7) Aggregate: distinct hotels & rounded temps
# ----------------------------------------
# Uses the enriched `stream` built in step 6. This cell previously re-declared
# an identical copy of that pipeline; the duplicate was removed so the
# enrichment logic lives in exactly one place.
#
# Per (city, date):
#   distinct_hotels - approximate number of distinct hotel ids
#   avg/max/min_temp_c - Celsius temperature statistics, rounded to 2 decimals
#
# `round`, `max` and `min` below are the pyspark.sql.functions versions pulled
# in by the import cell (they shadow the Python builtins in this notebook).
#
# NOTE(review): the grouping keys are `city` and `date`, not the watermarked
# `wthr_date` column or a window over it, and both sinks use "complete" output
# mode - so the 1-day watermark presumably does not bound the aggregation
# state. Confirm whether that is intended.
aggregated = (
    stream
      .groupBy("city", "date")
      .agg(
          approx_count_distinct("id").alias("distinct_hotels"),
          round(avg("avg_tmpr_c"), 2).alias("avg_temp_c"),
          round(max("avg_tmpr_c"), 2).alias("max_temp_c"),
          round(min("avg_tmpr_c"), 2).alias("min_temp_c"),
      )
)
print("Aggregated schema:")
aggregated.printSchema()

In [0]:
# Preview the per-(city, date) aggregates computed from the stream.
display(aggregated)

city,date,distinct_hotels,avg_temp_c,max_temp_c,min_temp_c
Exton,2017-09-27,1,23.6,23.6,23.6
Beaumont,2017-09-16,1,28.7,28.7,28.7
Midvale,2016-10-27,1,16.7,16.7,16.7
Stevens,2017-08-25,1,19.7,19.7,19.7
Stirling City,2017-08-23,1,23.2,23.2,23.2
Ramey,2017-08-25,1,24.7,24.7,24.7
Woodburn,2017-09-09,1,19.9,19.9,19.9
Celina,2016-10-31,1,10.7,10.7,10.7
Bloomington,2017-08-04,1,19.6,19.6,19.6
Houston,2016-10-07,1,26.9,26.9,26.9


In [0]:
# ----------------------------------------
# 8) Start Streaming Queries (separate checkpoints)
# ----------------------------------------
# 8a) Console sink (for debug)
# Every 20 seconds the full aggregate table is re-emitted ("complete" mode)
# with untruncated column values. This query has its own checkpoint directory,
# separate from the Delta sink's.
console_writer = (
    aggregated.writeStream
      .format("console")
      .outputMode("complete")
      .trigger(processingTime="20 seconds")
      .option("checkpointLocation", console_ckpt)
      .option("truncate", False)
)
console_q = console_writer.start()

In [0]:
# Show the StreamingQuery handle returned by start() for the console sink.
display(console_q)

<pyspark.sql.streaming.query.StreamingQuery at 0x7fc6c4617e50>

In [0]:
# 8b) Delta Lake sink (for downstream queries & viz)
# Same aggregate as the console query, written as a Delta table at delta_path.
# "complete" mode re-emits the whole result on every 20-second trigger; the
# query tracks its progress in its own checkpoint directory (delta_ckpt).
delta_writer = (
    aggregated.writeStream
      .format("delta")
      .outputMode("complete")
      .trigger(processingTime="20 seconds")
      .option("checkpointLocation", delta_ckpt)
)
# Passing the location to start() is equivalent to setting the "path" option.
delta_q = delta_writer.start(delta_path)

In [0]:
# ----------------------------------------
# 9) Read Back Delta & Register View
# ----------------------------------------
# start() in step 8b returns immediately, so under "Run All" this cell could
# execute before the first micro-batch has been committed - the batch read
# would then fail (no Delta table yet) or see a stale/partial table.
# Block until the Delta query has processed everything currently available in
# the source; this also re-raises the error here if the query has failed.
delta_q.processAllAvailable()

# Static (batch) snapshot of the aggregated Delta table, exposed to SQL as
# the temporary view `agg_view`.
agg_batch_df = spark.read.format("delta").load(delta_path)
agg_batch_df.createOrReplaceTempView("agg_view")

In [0]:
# ----------------------------------------
# 10) Identify Top 10 Cities by Hotel Count
# ----------------------------------------
# Rank cities by the sum of their per-date distinct_hotels values and keep the
# ten largest.
# NOTE(review): summing daily distinct counts adds the same hotel once per
# date it appears on, so `total_hotels` looks like hotel-days rather than a
# count of distinct hotels - confirm this is the intended ranking metric.
top10_df = spark.sql("""
  SELECT city, SUM(distinct_hotels) AS total_hotels
    FROM agg_view
   GROUP BY city
   ORDER BY total_hotels DESC
   LIMIT 10
""")
display(top10_df)

# City names in ranking order, used by the per-city cells further down.
top_cities = [row["city"] for row in top10_df.collect()]

city,total_hotels
Paris,3421
London,2395
Milan,1464
Amsterdam,566
Barcelona,428
Paddington,133
Springfield,50
Houston,42
Memphis,33
Columbus,30


In [0]:
# Print the extended query plans for a single-city slice of the aggregates.
#
# `df_city` was not defined anywhere in the notebook (it only existed as
# leftover kernel state), so this cell raised NameError on a fresh run.
# It is rebuilt here for the top-ranked city.
# NOTE(review): the original definition of df_city is not available - confirm
# this is the query whose plan was meant to be inspected.
if top_cities:
    df_city = (
        spark.table("agg_view")
             .where(col("city") == top_cities[0])
             .orderBy("date")
    )
    df_city.explain(True)
else:
    print("No cities available in agg_view yet - nothing to explain.")

In [0]:
from pyspark.sql.functions import col, to_date

# For every top-10 city: take its aggregates for the first three dates and
# reshape them to long form (one row per date/metric) for charting.
for city in top_cities:
    # Filter with the DataFrame API rather than interpolating `city` into a
    # SQL string literal: a name containing an apostrophe would otherwise
    # break the statement (and the value would be injectable).
    full = (
        spark.table("agg_view")
             .where(col("city") == city)
             .select(
                 "date",
                 "distinct_hotels",
                 "avg_temp_c",
                 "max_temp_c",
                 "min_temp_c",
             )
             .orderBy("date")
    )

    # get first 3 dates
    first3 = [
        r.date
        for r in full.select("date").orderBy("date").limit(3).collect()
    ]

    # keep only the rows belonging to those dates (empty if the city has none)
    small = full.where(col("date").isin(first3))

    # unpivot with all doubles: stack() needs a single value type, so the
    # integer hotel count is cast to double alongside the temperatures
    unpivot = small.selectExpr(
      "date",
      """
      stack(
        4,
        'Hotels',          cast(distinct_hotels as double),
        'Avg Temperature', avg_temp_c,
        'Max Temperature', max_temp_c,
        'Min Temperature', min_temp_c
      ) as (metric, value)
      """
    )

    # heading first, then the table/chart for this city
    displayHTML(f"<h2 style='margin-top:2em'>{city}</h2>")
    display(unpivot)


date,metric,value
2016-10-03,Hotels,228.0
2016-10-09,Hotels,228.0
2016-10-03,Avg Temperature,10.7
2016-10-09,Avg Temperature,8.7
2016-10-03,Max Temperature,10.7
2016-10-09,Max Temperature,8.7
2016-10-03,Min Temperature,10.7
2016-10-09,Min Temperature,8.7


Databricks visualization. Run in Databricks to view.

date,metric,value
2016-10-09,Hotels,1.0
2016-10-10,Hotels,250.0
2016-10-09,Avg Temperature,13.6
2016-10-10,Avg Temperature,8.4
2016-10-09,Max Temperature,13.6
2016-10-10,Max Temperature,8.4
2016-10-09,Min Temperature,13.6
2016-10-10,Min Temperature,8.4


Databricks visualization. Run in Databricks to view.

date,metric,value
2016-10-06,Hotels,157.0
2016-10-16,Hotels,157.0
2016-10-06,Avg Temperature,12.1
2016-10-16,Avg Temperature,12.3
2016-10-06,Max Temperature,12.1
2016-10-16,Max Temperature,12.3
2016-10-06,Min Temperature,12.1
2016-10-16,Min Temperature,12.3


date,metric,value
2016-10-02,Hotels,85.0
2017-08-03,Hotels,85.0
2016-10-02,Avg Temperature,13.7
2017-08-03,Avg Temperature,18.8
2016-10-02,Max Temperature,13.7
2017-08-03,Max Temperature,18.8
2016-10-02,Min Temperature,13.7
2017-08-03,Min Temperature,18.8


Databricks visualization. Run in Databricks to view.

date,metric,value
2016-10-03,Hotels,1.0
2016-10-13,Hotels,1.0
2016-10-03,Avg Temperature,16.7
2016-10-13,Avg Temperature,13.6
2016-10-03,Max Temperature,16.7
2016-10-13,Max Temperature,13.6
2016-10-03,Min Temperature,16.7
2016-10-13,Min Temperature,13.6


date,metric,value
2016-10-10,Hotels,19.0
2016-10-16,Hotels,19.0
2016-10-10,Avg Temperature,8.4
2016-10-16,Avg Temperature,12.1
2016-10-10,Max Temperature,8.4
2016-10-16,Max Temperature,12.1
2016-10-10,Min Temperature,8.4
2016-10-16,Min Temperature,12.1


date,metric,value
2016-10-01,Hotels,1.0
2016-10-02,Hotels,2.0
2016-10-01,Avg Temperature,14.8
2016-10-02,Avg Temperature,10.0
2016-10-01,Max Temperature,14.8
2016-10-02,Max Temperature,10.0
2016-10-01,Min Temperature,14.8
2016-10-02,Min Temperature,10.0


date,metric,value
2016-10-05,Hotels,1.0
2016-10-07,Hotels,1.0
2016-10-05,Avg Temperature,27.6
2016-10-07,Avg Temperature,26.9
2016-10-05,Max Temperature,27.6
2016-10-07,Max Temperature,26.9
2016-10-05,Min Temperature,27.6
2016-10-07,Min Temperature,26.9


date,metric,value
2016-10-12,Hotels,3.0
2016-10-21,Hotels,5.0
2016-10-12,Avg Temperature,21.9
2016-10-21,Avg Temperature,12.64
2016-10-12,Max Temperature,21.9
2016-10-21,Max Temperature,12.7
2016-10-12,Min Temperature,21.9
2016-10-21,Min Temperature,12.6


date,metric,value
2016-10-04,Hotels,1.0
2016-10-10,Hotels,2.0
2016-10-04,Avg Temperature,18.6
2016-10-10,Avg Temperature,10.5
2016-10-04,Max Temperature,18.6
2016-10-10,Max Temperature,10.5
2016-10-04,Min Temperature,18.6
2016-10-10,Min Temperature,10.5
