In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import col, sum as _sum, round
from pyspark.sql.functions import when

# Obtain the Spark session for this notebook (reuses the active one if present).
spark = SparkSession.builder.getOrCreate()

# Storage details for the Silver layer.
container_name = "silver"
storage_account_name = "coindeckostorage"

# SAS token: fetched from a Databricks secret scope at run time instead of being
# embedded in the notebook, so the credential never lands in source control or in
# exported notebook files.
# NOTE(review): the scope/key names below are placeholders - create them (or point
# them at an existing scope) before running. Any token that was previously pasted
# into this notebook should be treated as leaked and revoked. This cell only reads
# the container, so a read/list-only token is sufficient.
sas_token = dbutils.secrets.get(scope="coindecko", key="silver-container-sas")

# Register the SAS token with the wasbs:// driver for this container/account pair.
spark.conf.set(
    f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net",
    sas_token
)

# Define Silver Delta path (use wasbs:// for SAS)
silver_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/cleanedData/"

# Read the cleaned Delta table; every later cell derives its output from silver_df.
silver_df = spark.read.format("delta").load(silver_path)



In [0]:
# Storage details for the Gold layer.
gold_container = "gold"
storage_account_name = "coindeckostorage"

# SAS token: fetched from a Databricks secret scope at run time instead of being
# embedded in the notebook, so the credential never lands in source control or in
# exported notebook files.
# NOTE(review): the scope/key names below are placeholders - create them (or point
# them at an existing scope) before running. Any token that was previously pasted
# into this notebook should be treated as leaked and revoked.
sas_token = dbutils.secrets.get(scope="coindecko", key="gold-container-sas")

# Register the SAS token with the wasbs:// driver for this container/account pair.
spark.conf.set(
    f"fs.azure.sas.{gold_container}.{storage_account_name}.blob.core.windows.net",
    sas_token
)

# Define Gold Delta path (use wasbs:// for SAS)
gold_path = f"wasbs://{gold_container}@{storage_account_name}.blob.core.windows.net/volume_to_market_ratio/"

# ---- Gold Transformation: Volume-to-MarketCap Ratio ----
# The division is wrapped in when(...) so rows whose market_cap is 0 (or NULL)
# produce a NULL ratio. Without the guard, a zero market_cap raises a
# divide-by-zero error when Spark runs with ANSI SQL mode enabled; with ANSI mode
# off the result is NULL either way, so existing output is unchanged.
gold_df = silver_df.withColumn(
    "volume_to_mcap_ratio",
    round(
        when(col("market_cap") != 0, col("total_volume") / col("market_cap")),
        4
    )
).select(
    col("coin_name"),
    col("symbol"),
    col("current_price"),
    col("market_cap"),
    col("total_volume"),
    col("volume_to_mcap_ratio")
)

# ---- Write to Gold Layer as Delta ----
# NOTE(review): partitioning by symbol creates one partition directory per coin;
# for a table this small that is likely many tiny files. Kept as-is because
# changing the partitioning of an existing Delta table needs a schema overwrite.
gold_df.write.format("delta").mode("overwrite").partitionBy("symbol").save(gold_path)

# Display sample for analysts
display(gold_df.limit(20))


coin_name,symbol,current_price,market_cap,total_volume,volume_to_mcap_ratio
BlackRock USD Institutional Digital Liquidity Fund,buidl,1.0,1840586028,0.0,0.0
Bitcoin Cash,bch,564.49,11288756747,273201580.0,0.0242
Bitget Token,bgb,2.42,1695549690,22672869.0,0.0134
Algorand,algo,0.099694,884509578,31911902.0,0.0361
Avalanche,avax,9.69,4180618219,267176844.0,0.0639
Stacks,stx,0.277984,493924298,13467189.0,0.0273
Canton,cc,0.164065,6201808058,9675870.0,0.0016
Chainlink,link,9.19,6502666837,326839817.0,0.0503
Bittensor,tao,190.45,1825981433,210725488.0,0.1154
Beldex,bdx,0.080254,610380642,12921402.0,0.0212


Databricks visualization. Run in Databricks to view.

In [0]:
# ---- Gold Transformation: Opening & Closing Price ----
# Opening price: current price with the absolute 24h change backed out.
opening_expr = col("current_price") - col("price_change_24h")
# Closing price: opening price scaled by (1 + 24h percentage change / 100).
closing_expr = opening_expr * (1 + col("price_change_percentage_24h") / 100)
# Percentage change rounded to two decimals for readability.
pct_expr = round(col("price_change_percentage_24h"), 2)

gold_df = silver_df.select(
    col("coin_name"),
    col("symbol"),
    opening_expr.alias("open_price"),
    closing_expr.alias("close_price"),
    pct_expr.alias("price_change_pct"),
    col("current_price"),
    col("market_cap"),
    col("total_volume"),
)

# ---- Write Gold DataFrame to ADLS as Delta ----
# Relies on gold_container / storage_account_name (and the SAS configuration for
# the gold container) having been set by an earlier cell.
gold_path = f"wasbs://{gold_container}@{storage_account_name}.blob.core.windows.net/open_close_price/"
(
    gold_df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("symbol")
    .save(gold_path)
)

# ---- Display sample for analysts ----
display(gold_df.limit(30))
gold_df.printSchema()

coin_name,symbol,open_price,close_price,price_change_pct,current_price,market_cap,total_volume
BlackRock USD Institutional Digital Liquidity Fund,buidl,1.0,1.0,0.0,1.0,1840586028,0.0
Spiko EU T-Bills Money Market Fund,eutbl,1.23979049,1.2399995186766142,0.02,1.24,759044594,0.0
Ethereum Classic,etc,8.653814,9.1400120973434,5.62,9.14,1420766255,49036040.0
Bitcoin Cash,bch,560.95,564.489314025,0.63,564.49,11288756747,273201580.0
Bitget Token,bgb,2.37472247,2.419957845914289,1.9,2.42,1695549690,22672869.0
Ethena USDe,usde,0.99898138,0.9991510070383242,0.02,0.999151,6298843086,78676964.0
Cronos,cro,0.08099749,0.083151998934753,2.66,0.083152,3314078924,17761329.0
Algorand,algo,0.09608951,0.099693990872267,3.75,0.099694,884509578,31911902.0
Cosmos Hub,atom,2.131502,2.210170410065,3.69,2.21,1087166058,36805788.0
Avalanche,avax,9.176176,9.6901464644064,5.6,9.69,4180618219,267176844.0


Databricks visualization. Run in Databricks to view.

root
 |-- coin_name: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- open_price: double (nullable = true)
 |-- close_price: double (nullable = true)
 |-- price_change_pct: double (nullable = true)
 |-- current_price: double (nullable = true)
 |-- market_cap: long (nullable = true)
 |-- total_volume: double (nullable = true)



In [0]:
from pyspark.sql.functions import col, sum as _sum, round

# Sum market_cap over every coin and pull the single aggregate value to the driver.
total_row = silver_df.agg(_sum("market_cap").alias("total_market_cap")).first()
total_market_cap = total_row["total_market_cap"]

# Each coin's share of the total, as a percentage rounded to two decimals.
# `round` here is the Spark SQL function imported above, not Python's builtin.
share_pct = round((col("market_cap") / total_market_cap) * 100, 2)

dominance_df = silver_df.withColumn("dominance_pct", share_pct).select(
    col("coin_name"),
    col("symbol"),
    col("market_cap"),
    col("dominance_pct"),
    col("market_cap_rank"),
)

# Gold Delta destination (wasbs:// scheme, as used with SAS-token access).
gold_container = "gold"
storage_account_name = "coindeckostorage"
dominance_path = f"wasbs://{gold_container}@{storage_account_name}.blob.core.windows.net/market_dominance/"

# Overwrite the Gold table, partitioned by symbol.
(
    dominance_df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("symbol")
    .save(dominance_path)
)

# Show the first ten rows for analysts.
display(dominance_df.limit(10))
