# Crypto Delta Lake Star Schema Pipeline

End-to-end Databricks workflow that lands raw CoinGecko pricing JSON, normalizes it, and promotes it into a star schema ready for BI consumption.

## Workflow

- Bronze: land raw CoinGecko JSON (one row per asset).
- Silver: normalize into ingestion batches, cryptocurrencies, price snapshots, market metrics.
- Gold: build star schema dimensions and fact table with Delta Lake merges.
- Quality checks: confirm deduplication, row counts, and snapshot recency before publishing.

In [None]:
# COMMAND ----------
from delta.tables import DeltaTable
from pyspark.sql import Window
from pyspark.sql import functions as F

# Notebook parameters: each widget is declared with its default value and label, then
# read back below so a job run can override any of them.
dbutils.widgets.text('raw_input_path', 'dbfs:/mnt/crypto/landing_zone/crypto_prices_sample.json', 'Raw JSON Path')
dbutils.widgets.text('bronze_table', 'crypto_raw.prices_bronze', 'Bronze Table')
dbutils.widgets.text('silver_db', 'crypto_silver', 'Silver Database')
dbutils.widgets.text('gold_db', 'crypto_gold', 'Gold Database')
dbutils.widgets.text('snapshot_interval_minutes', '5', 'Snapshot Interval (min)')

landing_path = dbutils.widgets.get('raw_input_path')
bronze_table = dbutils.widgets.get('bronze_table')
silver_db = dbutils.widgets.get('silver_db')
gold_db = dbutils.widgets.get('gold_db')

# The silver layer divides epoch seconds by snapshot_seconds to bucket snapshots, so a
# zero or negative interval must be rejected here rather than producing null buckets.
snapshot_interval = int(dbutils.widgets.get('snapshot_interval_minutes'))
if snapshot_interval <= 0:
    raise ValueError(
        f'snapshot_interval_minutes must be a positive integer, got {snapshot_interval}'
    )
snapshot_seconds = snapshot_interval * 60

# Create the database that actually owns the bronze table. It is derived from the
# widget value (everything before the last dot) so that overriding `bronze_table`
# does not leave its database missing; with the default value this is `crypto_raw`.
# An unqualified table name has no database part and is left to the current database.
bronze_db = bronze_table.rpartition('.')[0]
if bronze_db:
    spark.sql(f'CREATE DATABASE IF NOT EXISTS {bronze_db}')
spark.sql(f'CREATE DATABASE IF NOT EXISTS {silver_db}')
spark.sql(f'CREATE DATABASE IF NOT EXISTS {gold_db}')

# Session-level settings applied to every write in this notebook.
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
spark.conf.set('spark.databricks.delta.schema.autoMerge.enabled', 'true')

def upsert_to_delta(df, table_name, key_cols):
    """Merge ``df`` into the Delta table ``table_name``, creating it on first use.

    When the table already exists, rows of ``df`` whose ``key_cols`` all equal
    those of a target row overwrite every column present in ``df``; rows with no
    match are inserted. When the table does not exist yet, ``df`` is written as a
    new Delta table instead.

    Args:
        df: Source DataFrame. It should hold at most one row per key, because a
            Delta merge fails when several source rows match the same target row.
        table_name: Name of the target table as understood by the Spark catalog.
        key_cols: Column name, or list/tuple of column names, identifying a row.
            Keys are compared with ``=``, so rows with a null key never match.

    Raises:
        ValueError: If ``key_cols`` is empty or names a column missing from ``df``.
    """
    # Accept a bare string so 'id' is not silently iterated character by character.
    if isinstance(key_cols, str):
        key_cols = [key_cols]
    key_cols = list(key_cols)

    # Validate before touching Spark: an empty or unknown key would otherwise only
    # surface as an opaque SQL parse/analysis error from the merge condition.
    if not key_cols:
        raise ValueError(f'upsert_to_delta({table_name!r}): key_cols must not be empty')
    missing = [name for name in key_cols if name not in df.columns]
    if missing:
        raise ValueError(
            f'upsert_to_delta({table_name!r}): key columns not in source DataFrame: {missing}'
        )

    # First load: nothing to merge into, so the DataFrame becomes the table.
    if not spark.catalog.tableExists(table_name):
        df.write.format('delta').mode('overwrite').saveAsTable(table_name)
        return

    # The same column -> source-value mapping serves both the update and insert clauses.
    assignments = {name: F.col(f'source.{name}') for name in df.columns}
    condition = ' AND '.join(f'target.{name} = source.{name}' for name in key_cols)
    (
        DeltaTable.forName(spark, table_name).alias('target')
        .merge(df.alias('source'), condition)
        .whenMatchedUpdate(set=assignments)
        .whenNotMatchedInsert(values=assignments)
        .execute()
    )

In [None]:
# COMMAND ----------
# Read the landing JSON (multi-line documents) and derive the batch timestamp and
# the number of elements in the `records` array.
raw_df = (
    spark.read.option('multiLine', True).json(landing_path)
    .withColumn('ingested_at_ts', F.to_timestamp('ingested_at'))
    .withColumn('record_count', F.size('records'))
)

# One bronze row per element of `records` (posexplode_outer keeps a single row with a
# null record when the array is null or empty). `record_hash` fingerprints the row
# from source, batch timestamp and the record's id.
bronze_rows = (
    raw_df.select(
        'ingested_at_ts',
        'source',
        'record_count',
        F.posexplode_outer('records').alias('record_index', 'record')
    )
    .withColumn('landing_ts', F.current_timestamp())
    .withColumn(
        'record_hash',
        F.sha2(
            F.concat_ws('|', F.col('source'), F.col('ingested_at_ts').cast('string'), F.col('record.id')),
            256
        )
    )
)

# Make the load idempotent: a plain append would duplicate the whole batch every time
# the notebook is re-run on the same file. Rows whose record_hash is already present
# in bronze are skipped; the trailing select restores the original column order.
if spark.catalog.tableExists(bronze_table):
    landed_hashes = spark.table(bronze_table).select('record_hash').distinct()
    new_bronze_rows = (
        bronze_rows.join(landed_hashes, 'record_hash', 'left_anti')
        .select(*bronze_rows.columns)
    )
else:
    new_bronze_rows = bronze_rows

# Count before writing and reuse the cached result for the write: counting after the
# append would re-run the anti-join against the freshly written table and report 0.
new_bronze_rows = new_bronze_rows.cache()
new_bronze_count = new_bronze_rows.count()
new_bronze_rows.write.format('delta').mode('append').saveAsTable(bronze_table)
new_bronze_rows.unpersist()
print(f'Bronze upsert complete: {new_bronze_count} raw rows written.')

In [None]:
# COMMAND ----------
bronze_df = spark.table(bronze_table)

# Column expressions are declared first and attached below in the original order.

# batch_id: fingerprint of (source, batch timestamp).
_batch_id_expr = F.sha2(
    F.concat_ws('|', F.col('source'), F.col('ingested_at_ts').cast('string')),
    256
)

# snapshot_time: last_updated_ts rounded to the nearest multiple of snapshot_seconds;
# rows without a last_updated_ts fall back to the (un-rounded) batch timestamp.
_bucketed_epoch = F.round(F.unix_timestamp(F.col('last_updated_ts')) / snapshot_seconds) * snapshot_seconds
_snapshot_time_expr = F.when(
    F.col('last_updated_ts').isNotNull(),
    F.from_unixtime(_bucketed_epoch).cast('timestamp')
).otherwise(F.col('ingested_at_ts'))

# snapshot_key: fingerprint of (record id, snapshot_time to the second, batch_id).
_snapshot_key_expr = F.sha2(
    F.concat_ws(
        '|',
        F.col('record.id'),
        F.date_format('snapshot_time', 'yyyy-MM-dd HH:mm:ss'),
        F.col('batch_id')
    ),
    256
)

# Rows with no record (null or empty `records` array in bronze) are dropped up front.
silver_enriched = bronze_df.filter(F.col('record').isNotNull()).withColumn('batch_id', _batch_id_expr)

# Parse the three timestamp strings carried on the record struct into *_ts columns.
for _field in ('last_updated', 'ath_date', 'atl_date'):
    silver_enriched = silver_enriched.withColumn(f'{_field}_ts', F.to_timestamp(F.col(f'record.{_field}')))

silver_enriched = (
    silver_enriched
    .withColumn('snapshot_time', _snapshot_time_expr)
    .withColumn('snapshot_key', _snapshot_key_expr)
)

# Columns carried over from silver_enriched unchanged.
_silver_passthrough = [
    'batch_id',
    'source',
    'ingested_at_ts',
    'landing_ts',
    'snapshot_time',
    'snapshot_key',
    'last_updated_ts',
    'ath_date_ts',
    'atl_date_ts',
]

# (path inside the `record` struct, flat output column name), in output order.
_silver_record_fields = [
    ('id', 'crypto_id'),
    ('symbol', 'symbol'),
    ('name', 'name'),
    ('image', 'image_url'),
    ('current_price', 'current_price'),
    ('high_24h', 'high_24h'),
    ('low_24h', 'low_24h'),
    ('price_change_24h', 'price_change_24h'),
    ('price_change_percentage_24h', 'price_change_pct_24h'),
    ('ath', 'ath'),
    ('ath_change_percentage', 'ath_change_pct'),
    ('atl', 'atl'),
    ('atl_change_percentage', 'atl_change_pct'),
    ('market_cap', 'market_cap'),
    ('market_cap_rank', 'market_cap_rank'),
    ('fully_diluted_valuation', 'fully_diluted_valuation'),
    ('total_volume', 'total_volume'),
    ('market_cap_change_24h', 'market_cap_change_24h'),
    ('market_cap_change_percentage_24h', 'market_cap_change_pct_24h'),
    ('circulating_supply', 'circulating_supply'),
    ('total_supply', 'total_supply'),
    ('max_supply', 'max_supply'),
    ('roi.times', 'roi_times'),
    ('roi.currency', 'roi_currency'),
    ('roi.percentage', 'roi_percentage'),
]

# Flatten the nested record into one wide row per snapshot, keeping only rows that
# have a snapshot_key.
silver_flat = silver_enriched.select(
    *_silver_passthrough,
    *[F.col(f'record.{_path}').alias(_alias) for _path, _alias in _silver_record_fields]
).filter(F.col('snapshot_key').isNotNull())

# One row per ingestion batch: how many flattened rows it contributed and when the
# earliest of them landed in bronze.
_batch_rollup = silver_flat.groupBy('batch_id', 'source', 'ingested_at_ts').agg(
    F.count('*').alias('record_count'),
    F.min('landing_ts').alias('landed_at'),
)
ingestion_df = _batch_rollup.withColumnRenamed('ingested_at_ts', 'ingested_at')

# One row per crypto_id. The table is merged on crypto_id alone, so the aggregation
# must also be keyed on crypto_id alone: grouping by symbol/name/image_url as well
# would emit several rows for an asset whose metadata changed between batches, and
# the merge would then fail (or insert duplicates).
#
# The descriptive attributes are taken from the most recently landed row: max() over
# a struct compares field by field, so landing_ts decides first, last_updated_ts
# breaks ties, and the remaining fields make the pick deterministic.
cryptocurrencies_df = (
    silver_flat.groupBy('crypto_id')
    .agg(
        F.max(
            F.struct('landing_ts', 'last_updated_ts', 'symbol', 'name', 'image_url')
        ).alias('latest'),
        F.min('snapshot_time').alias('first_snapshot_at'),
        F.max('last_updated_ts').alias('last_source_update_at'),
        F.max('landing_ts').alias('last_landed_at')
    )
    .select(
        'crypto_id',
        F.col('latest.symbol').alias('symbol'),
        F.col('latest.name').alias('name'),
        F.col('latest.image_url').alias('image_url'),
        'first_snapshot_at',
        # Fall back to the landing time when the source never reported an update time.
        F.coalesce(F.col('last_source_update_at'), F.col('last_landed_at')).alias('last_source_update_at')
    )
)

# Price-side attributes of a snapshot; the *_ts helper columns are exposed under
# their silver-table names.
_price_snapshot_cols = [
    'snapshot_key',
    'crypto_id',
    'batch_id',
    'snapshot_time',
    F.col('last_updated_ts').alias('last_updated_at'),
    'current_price',
    'high_24h',
    'low_24h',
    'price_change_24h',
    'price_change_pct_24h',
    'ath',
    'ath_change_pct',
    F.col('ath_date_ts').alias('ath_date'),
    'atl',
    'atl_change_pct',
    F.col('atl_date_ts').alias('atl_date'),
]

# Market-side attributes of the same snapshot, linked by snapshot_key.
_market_metric_cols = [
    'snapshot_key',
    'market_cap',
    'market_cap_rank',
    'fully_diluted_valuation',
    'total_volume',
    'market_cap_change_24h',
    'market_cap_change_pct_24h',
    'circulating_supply',
    'total_supply',
    'max_supply',
    'roi_times',
    'roi_currency',
    'roi_percentage',
]

# Both frames keep a single row per snapshot_key.
price_snapshots_df = silver_flat.select(*_price_snapshot_cols).dropDuplicates(['snapshot_key'])

market_metrics_df = silver_flat.select(*_market_metric_cols).dropDuplicates(['snapshot_key'])

# (source frame, silver table name, merge keys) -- merged in this order.
_silver_targets = [
    (
        ingestion_df.select('batch_id', 'source', 'ingested_at', 'record_count', 'landed_at'),
        'ingestion_batches',
        ['batch_id'],
    ),
    (
        cryptocurrencies_df.select(
            'crypto_id', 'symbol', 'name', 'image_url', 'first_snapshot_at', 'last_source_update_at'
        ),
        'cryptocurrencies',
        ['crypto_id'],
    ),
    (price_snapshots_df, 'price_snapshots', ['snapshot_key']),
    (market_metrics_df, 'market_metrics', ['snapshot_key']),
]

for _frame, _table, _keys in _silver_targets:
    upsert_to_delta(_frame, f'{silver_db}.{_table}', _keys)

print('Silver layer refreshed.')

In [None]:
# COMMAND ----------
# Gold-layer DDL. Every statement is CREATE TABLE IF NOT EXISTS, so re-running this
# cell leaves existing tables (and their data) untouched.

# Cryptocurrency dimension: identity surrogate key `crypto_key` alongside the
# source identifier `crypto_id`.
spark.sql(f'''
CREATE TABLE IF NOT EXISTS {gold_db}.dim_cryptocurrency (
    crypto_key BIGINT GENERATED BY DEFAULT AS IDENTITY,
    crypto_id STRING NOT NULL,
    symbol STRING NOT NULL,
    name STRING NOT NULL,
    image_url STRING,
    first_snapshot_at TIMESTAMP,
    last_source_update_at TIMESTAMP,
    updated_at TIMESTAMP
) USING DELTA
TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true')
''')

# Source dimension: identity surrogate key `source_key` per `source_name`, with the
# first and last ingestion timestamps.
spark.sql(f'''
CREATE TABLE IF NOT EXISTS {gold_db}.dim_source (
    source_key BIGINT GENERATED BY DEFAULT AS IDENTITY,
    source_name STRING NOT NULL,
    first_ingested_at TIMESTAMP,
    last_ingested_at TIMESTAMP,
    updated_at TIMESTAMP
) USING DELTA
TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true')
''')

# Date dimension: `date_key` is a plain INT supplied by the loader (no identity).
spark.sql(f'''
CREATE TABLE IF NOT EXISTS {gold_db}.dim_date (
    date_key INT,
    full_date DATE,
    day_of_month INT,
    day_name STRING,
    iso_week INT,
    month_of_year INT,
    month_name STRING,
    quarter_of_year INT,
    calendar_year INT,
    is_weekend BOOLEAN
) USING DELTA
TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true')
''')

# Time-of-day dimension: `time_key` is a plain INT supplied by the loader and
# `full_time` is stored as a string.
spark.sql(f'''
CREATE TABLE IF NOT EXISTS {gold_db}.dim_time (
    time_key INT,
    full_time STRING,
    hour_24 INT,
    minute_of_hour INT,
    second_of_minute INT
) USING DELTA
TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true')
''')

# Fact table: one identity `fact_id` per row, a string `business_key`, the four
# dimension keys, and the price / market measures. Note the NOT NULL constraints on
# the keys, batch_id, snapshot_timestamp, last_updated_at, current_price and
# load_timestamp -- rows that lack any of these cannot be written.
# Unlike the dimensions, this table sets autoCompact rather than optimizeWrite.
spark.sql(f'''
CREATE TABLE IF NOT EXISTS {gold_db}.fact_crypto_price_metrics (
    fact_id BIGINT GENERATED BY DEFAULT AS IDENTITY,
    business_key STRING NOT NULL,
    crypto_key BIGINT NOT NULL,
    date_key INT NOT NULL,
    time_key INT NOT NULL,
    source_key BIGINT NOT NULL,
    batch_id STRING NOT NULL,
    snapshot_timestamp TIMESTAMP NOT NULL,
    last_updated_at TIMESTAMP NOT NULL,
    current_price DECIMAL(20,8) NOT NULL,
    high_24h DECIMAL(20,8),
    low_24h DECIMAL(20,8),
    price_change_24h DECIMAL(20,8),
    price_change_pct_24h DECIMAL(10,5),
    ath DECIMAL(20,8),
    ath_change_pct DECIMAL(10,5),
    atl DECIMAL(20,8),
    atl_change_pct DECIMAL(10,5),
    market_cap BIGINT,
    market_cap_rank INT,
    fully_diluted_valuation BIGINT,
    total_volume BIGINT,
    market_cap_change_24h BIGINT,
    market_cap_change_pct_24h DECIMAL(10,5),
    circulating_supply DECIMAL(30,8),
    total_supply DECIMAL(30,8),
    max_supply DECIMAL(30,8),
    roi_times DECIMAL(20,8),
    roi_percentage DECIMAL(20,8),
    load_timestamp TIMESTAMP NOT NULL,
    updated_at TIMESTAMP
) USING DELTA
TBLPROPERTIES ('delta.autoOptimize.autoCompact' = 'true')
''')

print('Gold schema ensured.')

In [None]:
# COMMAND ----------
# Load the four silver tables that feed the gold layer.
silver_cryptos, silver_batches, silver_prices, silver_metrics = (
    spark.table(f'{silver_db}.{_silver_table}')
    for _silver_table in ('cryptocurrencies', 'ingestion_batches', 'price_snapshots', 'market_metrics')
)

# Cryptocurrency dimension rows: silver attributes stamped with the refresh time.
dim_crypto_df = silver_cryptos.select(
    'crypto_id',
    'symbol',
    'name',
    'image_url',
    'first_snapshot_at',
    'last_source_update_at',
).withColumn('updated_at', F.current_timestamp())

# Source dimension rows: first/last ingestion time per source, stamped likewise.
_ingestion_span = (
    silver_batches.withColumnRenamed('source', 'source_name')
    .groupBy('source_name')
    .agg(
        F.min('ingested_at').alias('first_ingested_at'),
        F.max('ingested_at').alias('last_ingested_at'),
    )
)
dim_source_df = _ingestion_span.withColumn('updated_at', F.current_timestamp())

# Distinct calendar dates on which at least one price snapshot exists.
_snapshot_dates = (
    silver_prices.select(F.to_date('snapshot_time').alias('full_date'))
    .dropna()
    .distinct()
)

_full_date = F.col('full_date')

# Date dimension rows, derived entirely from full_date.
dim_date_df = _snapshot_dates.select(
    # yyyymmdd packed into a single integer.
    (F.year(_full_date) * 10000 + F.month(_full_date) * 100 + F.dayofmonth(_full_date)).alias('date_key'),
    _full_date,
    F.dayofmonth(_full_date).alias('day_of_month'),
    F.date_format(_full_date, 'E').alias('day_name'),
    F.weekofyear(_full_date).alias('iso_week'),
    F.month(_full_date).alias('month_of_year'),
    F.date_format(_full_date, 'MMM').alias('month_name'),
    F.quarter(_full_date).alias('quarter_of_year'),
    F.year(_full_date).alias('calendar_year'),
    # Spark's dayofweek numbers Sunday as 1 and Saturday as 7.
    F.dayofweek(_full_date).isin(1, 7).cast('boolean').alias('is_weekend'),
)

# Time-of-day dimension rows, one per distinct (hour, minute) seen in the snapshots.
#
# De-duplication has to happen AFTER the time-of-day columns are derived: minute_ts
# still carries the calendar date, so de-duplicating on it would emit the same
# time_key once per day. Duplicate time_key rows break the merge on time_key and
# multiply fact rows in any join against this dimension.
dim_time_df = (
    silver_prices.select(F.date_trunc('minute', 'snapshot_time').alias('minute_ts'))
    .dropna()
    .select(
        # HHMM packed into a single integer, e.g. 13:05 -> 1305.
        (F.hour('minute_ts') * 100 + F.minute('minute_ts')).alias('time_key'),
        F.date_format('minute_ts', 'HH:mm:ss').alias('full_time'),
        F.hour('minute_ts').alias('hour_24'),
        F.minute('minute_ts').alias('minute_of_hour'),
        # Always 0 because minute_ts is truncated to the minute.
        F.second('minute_ts').alias('second_of_minute'),
    )
    .distinct()
)

# Merge each dimension frame into its gold table on its natural key, in this order.
for _dim_frame, _dim_table, _dim_key in (
    (dim_crypto_df, 'dim_cryptocurrency', 'crypto_id'),
    (dim_source_df, 'dim_source', 'source_name'),
    (dim_date_df, 'dim_date', 'date_key'),
    (dim_time_df, 'dim_time', 'time_key'),
):
    upsert_to_delta(_dim_frame, f'{gold_db}.{_dim_table}', [_dim_key])

# Re-read the dimensions after the merge, keeping only the key column and the
# attribute the fact build joins on.
_gold_dim_crypto = spark.table(f'{gold_db}.dim_cryptocurrency')
_gold_dim_source = spark.table(f'{gold_db}.dim_source')
_gold_dim_date = spark.table(f'{gold_db}.dim_date')
_gold_dim_time = spark.table(f'{gold_db}.dim_time')

dim_crypto = _gold_dim_crypto.select('crypto_key', 'crypto_id')
dim_source = _gold_dim_source.select('source_key', 'source_name')
dim_date = _gold_dim_date.select('date_key', 'full_date')
dim_time = _gold_dim_time.select('time_key', 'full_time')

_snapshot_ts = F.col('ps.snapshot_time')

# Join silver prices and metrics to their batch and to the four dimensions.
#
# The time dimension is joined on the HHMM integer key rather than on an 'HH:mm:ss'
# string: dim_time rows are built from minute-truncated timestamps, while
# snapshot_time can carry non-zero seconds (it falls back to the raw batch timestamp
# when the source gives no last_updated value). A string comparison would silently
# drop those snapshots from the inner join.
fact_candidates = (
    silver_prices.alias('ps')
    .join(silver_metrics.alias('mm'), 'snapshot_key', 'inner')
    .join(silver_batches.alias('ib'), 'batch_id', 'inner')
    .join(dim_crypto.alias('dc'), F.col('ps.crypto_id') == F.col('dc.crypto_id'), 'inner')
    .join(dim_source.alias('ds'), F.col('ib.source') == F.col('ds.source_name'), 'inner')
    .join(dim_date.alias('dd'), F.to_date('ps.snapshot_time') == F.col('dd.full_date'), 'inner')
    .join(
        dim_time.alias('dt'),
        (F.hour(_snapshot_ts) * 100 + F.minute(_snapshot_ts)) == F.col('dt.time_key'),
        'inner'
    )
    .select(
        # Fact grain: one row per (cryptocurrency, snapshot time to the second).
        F.sha2(
            F.concat_ws('|', F.col('dc.crypto_key'), F.date_format('ps.snapshot_time', 'yyyy-MM-dd HH:mm:ss')),
            256
        ).alias('business_key'),
        F.col('dc.crypto_key'),
        F.col('dd.date_key'),
        F.col('dt.time_key'),
        F.col('ds.source_key'),
        F.col('ps.batch_id'),
        F.col('ps.snapshot_time').alias('snapshot_timestamp'),
        F.col('ps.last_updated_at'),
        F.col('ps.current_price'),
        F.col('ps.high_24h'),
        F.col('ps.low_24h'),
        F.col('ps.price_change_24h'),
        F.col('ps.price_change_pct_24h'),
        F.col('ps.ath'),
        F.col('ps.ath_change_pct'),
        F.col('ps.atl'),
        F.col('ps.atl_change_pct'),
        F.col('mm.market_cap'),
        F.col('mm.market_cap_rank'),
        F.col('mm.fully_diluted_valuation'),
        F.col('mm.total_volume'),
        F.col('mm.market_cap_change_24h'),
        F.col('mm.market_cap_change_pct_24h'),
        F.col('mm.circulating_supply'),
        F.col('mm.total_supply'),
        F.col('mm.max_supply'),
        F.col('mm.roi_times'),
        F.col('mm.roi_percentage'),
        F.current_timestamp().alias('load_timestamp'),
        F.current_timestamp().alias('updated_at'),
        # Helper column, used only to rank competing batches below.
        F.col('ib.ingested_at').alias('_batch_ingested_at')
    )
)

# Silver keeps one snapshot per batch (its snapshot_key includes batch_id), but the
# fact business_key does not. Two batches reporting the same asset at the same
# snapshot time therefore collide on business_key, which makes the merge fail on an
# existing row or insert duplicates on a new one. Keep only the row from the most
# recently ingested batch; batch_id breaks ties so the choice is deterministic.
_latest_batch_first = Window.partitionBy('business_key').orderBy(
    F.col('_batch_ingested_at').desc_nulls_last(),
    F.col('batch_id').desc()
)

fact_df = (
    fact_candidates
    .withColumn('_batch_rank', F.row_number().over(_latest_batch_first))
    .filter(F.col('_batch_rank') == 1)
    .drop('_batch_rank', '_batch_ingested_at')
)

upsert_to_delta(fact_df, f'{gold_db}.fact_crypto_price_metrics', ['business_key'])

print('Gold layer refreshed.')

In [None]:
# COMMAND ----------
# Fully qualified gold table names used by the three report queries below.
_qc_fact_table = f'{gold_db}.fact_crypto_price_metrics'
_qc_crypto_dim_table = f'{gold_db}.dim_cryptocurrency'

# 1) Number of distinct business keys in the fact table.
fact_rows = spark.sql(f'SELECT COUNT(DISTINCT business_key) AS fact_rows FROM {_qc_fact_table}')
fact_rows.show()

# 2) Business keys that occur more than once (an empty result means no duplicates).
duplicates = spark.sql(f'''
SELECT business_key, COUNT(*) AS cnt
FROM {_qc_fact_table}
GROUP BY business_key
HAVING COUNT(*) > 1
''')
duplicates.show()

# 3) Latest snapshot and fact row count per symbol, most recent first.
_recency_by_symbol = spark.sql(f'''
SELECT d.symbol,
       MAX(f.snapshot_timestamp) AS latest_snapshot,
       COUNT(*) AS row_count
FROM {_qc_fact_table} f
JOIN {_qc_crypto_dim_table} d
  ON f.crypto_key = d.crypto_key
GROUP BY d.symbol
ORDER BY latest_snapshot DESC
''')
_recency_by_symbol.show()

In [None]:
# COMMAND ----------
# Data quality checks for negative magnitudes and missing critical attributes.
# Each check counts the offending rows; all failures are collected and reported
# together in a single ValueError so one run surfaces every problem.
silver_prices = spark.table(f"{silver_db}.price_snapshots")
silver_metrics = spark.table(f"{silver_db}.market_metrics")
gold_fact = spark.table(f"{gold_db}.fact_crypto_price_metrics")

dq_issues = []

# Prices: current_price must be strictly positive; 24h high/low must not be negative.
negative_price_count = silver_prices.filter("current_price <= 0 OR high_24h < 0 OR low_24h < 0").count()
if negative_price_count > 0:
    dq_issues.append(f"Found {negative_price_count} price records with invalid negative values.")

# Market metrics: market cap, volume and circulating supply must not be negative.
negative_metric_count = silver_metrics.filter("market_cap < 0 OR total_volume < 0 OR circulating_supply < 0").count()
if negative_metric_count > 0:
    dq_issues.append(f"Found {negative_metric_count} market metric records with negative magnitudes.")

# Fact rows must always carry a snapshot timestamp and a price.
null_fact_count = gold_fact.filter("snapshot_timestamp IS NULL OR current_price IS NULL").count()
if null_fact_count > 0:
    dq_issues.append(f"Found {null_fact_count} fact rows missing critical attributes.")

if dq_issues:
    raise ValueError("; ".join(dq_issues))
# The check mark was previously stored as mis-decoded UTF-8 and printed as garbage.
print("✅ Data quality checks passed (no negative magnitudes, required fields present).")