In [2]:
import os
from pyspark.sql import functions as F
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, approx_count_distinct, avg, max, min, row_number
from pyspark.sql.window import Window

# Environment variables for Azure Data Lake Storage
storage_account_name = os.getenv("STORAGE_ACCOUNT_NAME")
storage_container_name = os.getenv("STORAGE_CONTAINER_NAME")
tf_var_client_id = os.getenv("TF_VAR_CLIENT_ID")
tf_var_client_secret = os.getenv("TF_VAR_CLIENT_SECRET")
tf_var_tenant_id = os.getenv("TF_VAR_TENANT_ID")

if not all(
    [
        storage_account_name,
        storage_container_name,
        tf_var_client_id,
        tf_var_client_secret,
        tf_var_tenant_id,
    ]
):
    raise ValueError("One or more environment variables are missing")

# Configure Spark session
spark = (
    SparkSession.builder.appName("IncrementalProcessing")
    .config("spark.sql.parquet.int96RebaseModeInWrite", "LEGACY")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")
spark.conf.set(
    f"fs.azure.account.auth.type.{storage_account_name}.dfs.core.windows.net",
    "OAuth",
)
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{storage_account_name}.dfs.core.windows.net",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.id.{storage_account_name}.dfs.core.windows.net",
    tf_var_client_id,
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.secret.{storage_account_name}.dfs.core.windows.net",
    tf_var_client_secret,
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.endpoint.{storage_account_name}.dfs.core.windows.net",
    f"https://login.microsoftonline.com/{tf_var_tenant_id}/oauth2/token",
)

# Load Parquet files from ADLS
incremental_hotel_weather_path = f"abfss://{storage_container_name}@{storage_account_name}.dfs.core.windows.net/incremental-hotel-weather"
df = spark.read.format("parquet").parquet(incremental_hotel_weather_path)

# Show the loaded DataFrame
df.show(truncate=False)

ValueError: One or more environment variables are missing

In [None]:
df_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .schema(df.schema)
    .load(incremental_hotel_weather_path)
)

result_df = df_stream.groupBy("city", "wthr_date").agg(
    F.approx_count_distinct("id").alias("distinct_hotels"),
    F.avg("avg_tmpr_c").alias("avg_temperature"),
    F.max("avg_tmpr_c").alias("max_temperature"),
    F.min("avg_tmpr_c").alias("min_temperature"),
)

result_df.explain(extended=True)

== Parsed Logical Plan ==
'Aggregate ['city, 'wthr_date], ['city, 'wthr_date, approx_count_distinct('id, 0.05, 0, 0) AS distinct_hotels#467, avg('avg_tmpr_c) AS avg_temperature#469, max('avg_tmpr_c) AS max_temperature#471, min('avg_tmpr_c) AS min_temperature#473]
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@1a77144e,cloudFiles,List(),Some(StructType(StructField(address,StringType,true),StructField(avg_tmpr_c,DoubleType,true),StructField(avg_tmpr_f,DoubleType,true),StructField(city,StringType,true),StructField(country,StringType,true),StructField(geoHash,StringType,true),StructField(id,StringType,true),StructField(latitude,DoubleType,true),StructField(longitude,DoubleType,true),StructField(name,StringType,true),StructField(wthr_date,StringType,true),StructField(processed_date,TimestampType,true),StructField(year,IntegerType,true),StructField(month,IntegerType,true),StructField(day,IntegerType,true))),List(),None,Map(cloudFiles.format -> parquet, path -> abfss://data

In [None]:
query = (
    result_df.writeStream.outputMode("complete")
    .format("memory")
    .queryName("result_df_query")
    .start()
)

In [None]:
# Window specification to rank cities by distinct hotels and weather date
window_spec = Window.partitionBy("city").orderBy(col("distinct_hotels").desc(), col("wthr_date").desc())

# Add a row number to each partition
result_df_with_row_num = result_df.withColumn("row_number", row_number().over(window_spec))

# Filter to get the top 1 row per city
unique_cities_df = result_df_with_row_num.filter(col("row_number") == 1).drop("row_number")

# Get the top 10 cities
window_spec_top_10 = Window.orderBy(col("distinct_hotels").desc())
top_10_cities_df = unique_cities_df.withColumn("rank", row_number().over(window_spec_top_10)).filter(col("rank") <= 10).drop("rank")

# Show the resulting DataFrame
top_10_cities_df.show(truncate=False)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-682172774778850>:15[0m
[1;32m     12[0m top_10_cities_df [38;5;241m=[39m unique_cities_df[38;5;241m.[39mwithColumn([38;5;124m"[39m[38;5;124mrank[39m[38;5;124m"[39m, row_number()[38;5;241m.[39mover(window_spec_top_10))[38;5;241m.[39mfilter(col([38;5;124m"[39m[38;5;124mrank[39m[38;5;124m"[39m) [38;5;241m<[39m[38;5;241m=[39m [38;5;241m10[39m)[38;5;241m.[39mdrop([38;5;124m"[39m[38;5;124mrank[39m[38;5;124m"[39m)
[1;32m     14[0m [38;5;66;03m# Show the resulting DataFrame[39;00m
[0;32m---> 15[0m top_10_cities_df[38;5;241m.[39mshow(truncate[38;5;241m=[39m[38;5;28;01mFalse[39;00m)

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [