In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, avg, min, max, stddev, count, first, last,
    row_number
)
from pyspark.sql.window import Window
import datetime

# Initialize Spark session (getOrCreate returns the active session if one exists)
spark = SparkSession.builder.getOrCreate()

# Secret-based config: the storage account name and key are read from the
# "cryptoSecret" secret scope so no credentials appear in the notebook.
storage_account_name = dbutils.secrets.get(scope="cryptoSecret", key="azure-storage-account-name")
storage_account_key = dbutils.secrets.get(scope="cryptoSecret", key="azure-storage-account-key")
container_name = "crypto-data"

# Register the account key with Spark so wasbs:// paths on this account can be accessed.
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)

# Today's date (UTC), used to name the output file.
# datetime.utcnow() is deprecated since Python 3.12; the timezone-aware call
# below formats to the same "%Y-%m-%d" string.
today_str = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")

# Define paths
base_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net"
daily_summary_root = f"{base_path}/output/daily_summary"
overall_out_path   = f"{base_path}/output/overall_summary/{today_str}_overall_summary.parquet"

# ── 1. Discover valid daily summary folders ────────────────────────────────
# Keep every entry under daily_summary_root whose listing contains at least one
# file name ending in ".parquet".
summary_dirs = []
for item in dbutils.fs.ls(daily_summary_root):
    try:
        inner_files = dbutils.fs.ls(item.path)
    except Exception as exc:
        # Discovery stays best-effort, but the skip is reported so that a
        # permissions/network problem does not silently shrink the input set.
        print(f"⚠️ Skipping unreadable path {item.path}: {exc}")
        continue
    if any(f.name.endswith(".parquet") for f in inner_files):
        summary_dirs.append(item.path)

# Fail fast: there is nothing to aggregate without at least one daily summary.
if not summary_dirs:
    raise FileNotFoundError("No daily summary parquet directories found!")

# ── 2 & 3. Load every discovered daily summary and normalise the date ──────
# All discovered paths are passed to a single parquet read, which returns one
# combined DataFrame; the "date" column is then cast to the date type.
daily_summary_reader = spark.read
df_all = daily_summary_reader.parquet(*summary_dirs).withColumn(
    "date", col("date").cast("date")
)

# ── 4. Calculate first opening and last closing price per coin across all dates ──
# An ordered window with no explicit frame defaults to "unbounded preceding ..
# current row", so last() would just return each row's own closing_price and
# the later per-coin aggregate would depend on row order. Framing the window
# over the whole partition makes every row of a coin carry the same
# earliest-date opening price and latest-date closing price.
w_date = (
    Window.partitionBy("coin")
    .orderBy("date")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

df_with_open_close = (
    df_all
    .withColumn("first_open", first("opening_price").over(w_date))
    .withColumn("last_close", last("closing_price").over(w_date))
)

# ── 5. Per-coin roll-up of the daily summaries ─────────────────────────────
# The aggregate expressions are listed up front so the groupBy below reads as
# a single step; their order fixes the column order of the result.
overall_stat_exprs = [
    count("*").alias("total_days"),
    avg("avg_price").alias("overall_avg_price"),
    min("min_price").alias("overall_min_price"),
    max("max_price").alias("overall_max_price"),
    stddev("avg_price").alias("stddev_avg_price"),
    first("first_open").alias("opening_price"),
    last("last_close").alias("closing_price"),
]

df_overall = df_with_open_close.groupBy("coin").agg(*overall_stat_exprs)

# Absolute change over the whole period, then the same change as a percentage
# of the opening price.
df_overall = df_overall.withColumn(
    "price_change", col("closing_price") - col("opening_price")
)
df_overall = df_overall.withColumn(
    "percent_change", (col("price_change") / col("opening_price")) * 100
)

# ── 6. Number the rows (s_no) alphabetically by coin ───────────────────────
# The window has no partitionBy, so row_number() runs over the whole result
# ordered by coin.
w_serial = Window.orderBy("coin")

# Final column layout: s_no first, then the aggregate columns in their existing order.
ordered_cols = ["s_no", *df_overall.columns]

df_overall_final = (
    df_overall
    .withColumn("s_no", row_number().over(w_serial))
    .select(*ordered_cols)
)

# ── 7. Persist the result ──────────────────────────────────────────────────
# Uncomment to preview the table in the notebook before writing:
# df_overall_final.show(truncate=False)
overall_writer = df_overall_final.write.mode("overwrite")  # replaces existing output at this path
overall_writer.parquet(overall_out_path)
print(f"✅ Saved overall summary with s_no to: {overall_out_path}")




+----+--------+----------+------------------+-----------------+-----------------+------------------+-------------+-------------+------------+-------------------+
|s_no|coin    |total_days|overall_avg_price |overall_min_price|overall_max_price|stddev_avg_price  |opening_price|closing_price|price_change|percent_change     |
+----+--------+----------+------------------+-----------------+-----------------+------------------+-------------+-------------+------------+-------------------+
|1   |bitcoin |3         |9051455.26758016  |8989647.0        |9152960.0        |77450.61804051751 |9144583.0    |9000746.0    |-143837.0   |-1.5729202742213615|
|2   |dogecoin|3         |16.306880684435907|15.66            |16.96            |0.6115487896069971|16.95        |15.7         |-1.25       |-7.374631268436578 |
|3   |ethereum|3         |221096.25422813787|213520.0         |226039.0         |6373.263550331786 |226034.0     |213688.0     |-12346.0    |-5.462010140067424 |
+----+--------+----------+--