In [0]:
STORAGE_ACCOUNT = "cxdlbbronze"
CONTAINER = "landingzone"
Folder = "BTC-USD-Partitioned"

lakehouse_path = f"abfss://{CONTAINER}@{STORAGE_ACCOUNT}.dfs.core.windows.net/{Folder}"
checkpoint_path = lakehouse_path + "_checkpoint/last_trade.json"

client_id = dbutils.secrets.get(scope="landingzone-secret", key="client-id")
client_secret = dbutils.secrets.get(scope="landingzone-secret", key="client-secret")
tenant_id = dbutils.secrets.get(scope="landingzone-secret", key="tenant-id")

account_host = f"{STORAGE_ACCOUNT}.dfs.core.windows.net"
spark.conf.set(f"fs.azure.account.auth.type.{account_host}", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{account_host}",
               "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{account_host}", client_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{account_host}", client_secret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{account_host}",
               f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")

In [0]:
dbutils.fs.rm("abfss://landingzone@cxdlbbronze.dfs.core.windows.net/BTC-USD-Partitioned/", recurse=True)
dbutils.fs.rm("abfss://landingzone@cxdlbbronze.dfs.core.windows.net/BTC-USD-Partitioned_checkpoint/", recurse=True)
dbutils.fs.rm("abfss://landingzone@cxdlbbronze.dfs.core.windows.net/BTC-USD-Partitioned_checkpoint/", recurse=True)
dbutils.fs.rm("abfss://landingzone@cxdlbbronze.dfs.core.windows.net/BTC-USD-Partitioned_parquet/", recurse=True)


True

In [0]:
import requests
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, TimestampType, DecimalType, StringType
from pyspark.sql.functions import col, lit, max as spark_max, to_timestamp, date_format
from datetime import datetime
import time

spark = SparkSession.builder.getOrCreate()

# -------------------------
# Config
# -------------------------
api_url = "https://api.exchange.coinbase.com/products/BTC-USD/trades"


STORAGE_ACCOUNT = "cxdlbbronze"
CONTAINER = "landingzone"
Folder = "BTC-USD-Partitioned"

lakehouse_path = f"abfss://{CONTAINER}@{STORAGE_ACCOUNT}.dfs.core.windows.net/{Folder}"
checkpoint_path = lakehouse_path + "_checkpoint/last_trade.json"

parquet_path = lakehouse_path + "_parquet"

client_id = dbutils.secrets.get(scope="landingzone-secret", key="client-id")
client_secret = dbutils.secrets.get(scope="landingzone-secret", key="client-secret")
tenant_id = dbutils.secrets.get(scope="landingzone-secret", key="tenant-id")

account_host = f"{STORAGE_ACCOUNT}.dfs.core.windows.net"
spark.conf.set(f"fs.azure.account.auth.type.{account_host}", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{account_host}",
               "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{account_host}", client_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{account_host}", client_secret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{account_host}",
               f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")

#By default Spark uses 200 shuffle partitions — too high
spark.conf.set("spark.sql.shuffle.partitions", 8)


def benchmark(name, action):
    start = time.time()
    result = action()
    duration = time.time() - start
    return (name, duration, result if isinstance(result, int) else None)

# -------------------------
# Get last watermark (trade_id)
# -------------------------
def get_last_trade_id():
    try:
        df = spark.read.json(checkpoint_path)
        return df.collect()[0]["last_trade_id"]
    except Exception:
        return 0   # start from 0 for first run

last_trade_id = get_last_trade_id()
print(f"Last processed trade_id = {last_trade_id}")

# -------------------------
# Define standardized schema
# -------------------------
trade_schema = StructType([
    StructField("trade_id", LongType(), False),
    StructField("trade_time", TimestampType(), False),
    StructField("price", DecimalType(18,2), False),
    StructField("size", DecimalType(18,8), False),
    StructField("side", StringType(), False),
    StructField("ingest_ts", TimestampType(), False)
])

# -------------------------
# Fetch trades from Coinbase API
# -------------------------
response = requests.get(api_url, headers={"Accept": "application/json"})
if response.status_code != 200:
    raise Exception(f"API call failed: {response.text}")

data = response.json()

# Convert to Spark DataFrame
raw_df = spark.read.json(spark.sparkContext.parallelize([json.dumps(d) for d in data]))

# Coinbase API returns fields: time, trade_id, price, size, side
raw_df.show(5, truncate=False)

# -------------------------
# Transform + Standardize schema
# -------------------------
df = (raw_df
    .withColumn("trade_id", col("trade_id").cast(LongType()))
    .withColumn("trade_time", to_timestamp(col("time")))
    .withColumn("price", col("price").cast(DecimalType(18,2)))
    .withColumn("size", col("size").cast(DecimalType(18,8)))
    .withColumn("ingest_ts", lit(datetime.utcnow()))
    .withColumn("trade_date", date_format(col("trade_time"), "yyyy-MM-dd"))  # string date
    .select("trade_id", "trade_time", "price", "size", "side", "ingest_ts","trade_date")
)

# -------------------------
# Filter Incremental Records
# -------------------------
df_incremental = df.filter(col("trade_id") > last_trade_id)

if df_incremental.count() == 0:
    print("No new trades to load.")
else:
    # Add load timestamp for auditing
    df_incremental = df_incremental.withColumn("ingest_ts", lit(datetime.now().isoformat()))

    # ---------------------------------------------
    # Write to Azure Data Lakehouse (Delta format)
    # ---------------------------------------------
    df_incremental.write.format("delta").partitionBy("trade_date").mode("append").save(lakehouse_path)

    # Write as Parquet
    df_incremental.write.mode("append").parquet(parquet_path)
    benchmarks = []
    # Count trades
    benchmarks.append(benchmark("Delta Count", lambda: spark.read.format("delta").load(lakehouse_path).count()))
    benchmarks.append(benchmark("Parquet Count", lambda: spark.read.parquet(parquet_path).count()))

    # Partition filter
    benchmarks.append(benchmark("Delta Filter", lambda: spark.read.format("delta").load(lakehouse_path).filter("trade_date='2025-09-21'").count()))
    benchmarks.append(benchmark("Parquet Filter", lambda: spark.read.parquet(parquet_path).filter("trade_date='2025-09-21'").count()))
    bench_df = spark.createDataFrame(benchmarks, ["Test", "Duration_sec", "RowCount"])
    bench_df.show(truncate=False)

    # -------------------------
    # Update watermark
    # -------------------------
    new_trade_id = df_incremental.agg(spark_max("trade_id")).collect()[0][0]
    spark.createDataFrame([{"last_trade_id": new_trade_id}]).write.mode("append").json(checkpoint_path)

    print(f"✅ Loaded {df_incremental.count()} new trades. Updated watermark = {new_trade_id}")


Last processed trade_id = 876394297
+---------------+----+----------+---------------------------+---------+
|price          |side|size      |time                       |trade_id |
+---------------+----+----------+---------------------------+---------+
|115704.39000000|sell|0.00021391|2025-09-21T11:18:50.317826Z|876394379|
|115704.39000000|sell|0.00004064|2025-09-21T11:18:47.374875Z|876394378|
|115704.39000000|sell|0.00004098|2025-09-21T11:18:46.539722Z|876394377|
|115704.38000000|buy |0.00013658|2025-09-21T11:18:43.976789Z|876394376|
|115704.39000000|sell|0.00020997|2025-09-21T11:18:43.200237Z|876394375|
+---------------+----+----------+---------------------------+---------+
only showing top 5 rows


  .withColumn("ingest_ts", lit(datetime.utcnow()))


+--------------+------------------+--------+
|Test          |Duration_sec      |RowCount|
+--------------+------------------+--------+
|Delta Count   |0.7224810123443604|1098    |
|Parquet Count |0.7677757740020752|1098    |
|Delta Filter  |0.8253052234649658|1098    |
|Parquet Filter|0.6873869895935059|1098    |
+--------------+------------------+--------+

✅ Loaded 82 new trades. Updated watermark = 876394379
