## RAW

In [0]:
import requests
import pandas as pd
from pyspark.sql.functions import current_timestamp

# 1. Fetch market data directly into memory (no intermediate file is written).
#    The query asks for up to 50 rows, priced in USD, ordered by market cap
#    descending.
URL = "https://api.coingecko.com/api/v3/coins/markets"
params = {'vs_currency': 'usd', 'order': 'market_cap_desc', 'per_page': 50}

# requests applies no timeout by default, so a stalled connection would hang
# this cell indefinitely. With a timeout, requests.exceptions.Timeout is
# raised instead and the failure is visible.
REQUEST_TIMEOUT_SECONDS = 30
response = requests.get(URL, params=params, timeout=REQUEST_TIMEOUT_SECONDS)

if response.status_code == 200:
    # 2. Convert the JSON payload to a Spark DataFrame via pandas, without
    #    saving a file.
    json_data = response.json()
    spark_df = spark.createDataFrame(pd.DataFrame(json_data))

    # 3. Stamp each row with the ingestion time and save as a managed Delta
    #    table. mode("overwrite") replaces the previous contents on every run.
    bronze_df = spark_df.withColumn("ingested_at", current_timestamp())
    bronze_df.write.format("delta").mode("overwrite").saveAsTable("bronze_crypto")

    print("Table 'bronze_crypto' created successfully!")
else:
    # NOTE: on a non-200 response the table is left untouched, so anything
    # reading 'bronze_crypto' afterwards sees data from an earlier run (or
    # fails if the table has never been created).
    print(f"Failed to fetch data: {response.status_code}")

In [0]:
# Preview the current contents of the Bronze table.
bronze_preview = spark.table("bronze_crypto")
display(bronze_preview)

## Cleanse

In [0]:
from pyspark.sql.functions import col, when, current_timestamp

def transform_bronze_to_silver(
    bronze_table="bronze_crypto",
    silver_table="silver_crypto",
    high_volume_threshold=100_000_000,
):
    """Clean the Bronze table and write the result as the Silver Delta table.

    Selects and renames the relevant columns, casts them to explicit types,
    drops rows without a positive price, flags high-volume coins, and stamps
    each row with the processing time. The target table is overwritten.

    Args:
        bronze_table: Name of the source table to read.
        silver_table: Name of the Delta table to (over)write.
        high_volume_threshold: A coin is flagged ``is_high_volume`` when its
            ``daily_volume`` is strictly greater than this value. The default
            is 100,000,000.

    Returns:
        None. The result is persisted with ``saveAsTable``.
    """
    # 1. Read from the Bronze table
    bronze_df = spark.table(bronze_table)

    # 2. Clean and transform
    silver_df = (bronze_df
        .select(
            col("id").alias("coin_id"),
            col("symbol"),
            col("name"),
            col("current_price").cast("double").alias("price_usd"),
            col("market_cap").cast("long"),
            col("total_volume").cast("long").alias("daily_volume"),
            col("last_updated").cast("timestamp")
        )
        # Keep only coins with a valid price. Rows whose price is NULL are
        # dropped too, because a NULL comparison never satisfies a filter.
        .filter(col("price_usd") > 0)
        # when/otherwise (rather than the bare comparison) makes the flag a
        # strict True/False: a NULL volume yields False instead of NULL.
        .withColumn(
            "is_high_volume",
            when(col("daily_volume") > high_volume_threshold, True).otherwise(False),
        )
        # Processing timestamp for the Silver layer
        .withColumn("silver_processed_at", current_timestamp())
    )

    # 3. Save as the Silver table, replacing any previous contents
    silver_df.write.format("delta").mode("overwrite").saveAsTable(silver_table)

    print(f"Successfully created '{silver_table}' table!")


transform_bronze_to_silver()

In [0]:
# Preview the current contents of the Silver table.
silver_preview = spark.table("silver_crypto")
display(silver_preview)

Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import avg, count, round

def create_gold_layer():
    """Aggregate the Silver table into per-volume-bucket summary stats.

    Groups 'silver_crypto' by ``is_high_volume`` and, for each group, computes
    the number of non-null ``coin_id`` values and the average ``price_usd``
    rounded to two decimals. The result overwrites the 'gold_crypto_stats'
    Delta table. Returns None.
    """
    # Metrics computed for every is_high_volume group.
    coins_per_group = count("coin_id").alias("coin_count")
    mean_price = round(avg("price_usd"), 2).alias("avg_price_usd")

    # Read Silver and aggregate.
    stats_by_volume = (
        spark.table("silver_crypto")
        .groupBy("is_high_volume")
        .agg(coins_per_group, mean_price)
    )

    # Persist as the Gold table, replacing any previous contents.
    gold_writer = stats_by_volume.write.format("delta").mode("overwrite")
    gold_writer.saveAsTable("gold_crypto_stats")

    print("Gold Layer Created: Summary stats ready for reporting.")


create_gold_layer()


In [0]:
# Preview the current contents of the Gold summary table.
gold_preview = spark.table("gold_crypto_stats")
display(gold_preview)

Databricks visualization. Run in Databricks to view.

Author : Subham Nanda