In [None]:
# lakehouse/notebooks/transform_erc20_data.ipynb
# Databricks notebook source
# MAGIC %md
# MAGIC # ERC20 Transfer Data Transformation
# MAGIC 
# MAGIC This notebook processes raw ERC20 transfer data and creates enriched datasets
# MAGIC for analytics and real-time dashboards.

# COMMAND ----------

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from delta.tables import *
import json

In [None]:
# Build (or reuse) the Spark session with the Delta Lake SQL extension
# and the Delta catalog registered as the default spark_catalog.
spark = (
    SparkSession.builder
    .appName("ERC20Transformation")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)


In [None]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## Token Configuration and Metadata

# COMMAND ----------

# Token metadata keyed by contract address (mixed-case hex, "0x" + 40 hex digits).
# Each entry carries the display symbol, the full name, and the number of
# decimals used to scale raw on-chain integer values.
token_metadata = {
    "0xdAC17F958D2ee523a2206206994597C13D831ec7": {"symbol": "USDT", "name": "Tether USD", "decimals": 6},
    # USDC: the previous key had only 39 hex digits and could never match a real address.
    "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48": {"symbol": "USDC", "name": "USD Coin", "decimals": 6},
    "0x6B175474E89094C44Da98b954EedeAC495271d0F": {"symbol": "DAI", "name": "Dai Stablecoin", "decimals": 18},
    "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": {"symbol": "WETH", "name": "Wrapped Ether", "decimals": 18}
}

In [None]:
# Broadcast the token_metadata dict through the SparkContext so that code
# holding this handle can read the dict back via token_metadata_broadcast.value.
token_metadata_broadcast = spark.sparkContext.broadcast(token_metadata)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Read Raw ERC20 Transfer Data

# COMMAND ----------

# Load the raw ERC20 transfer events from their Delta table
raw_transfers_df = spark.read.format("delta").load("Tables/raw/erc20_transfers_raw")

# Report the schema and the total number of raw rows
print("Raw ERC20 transfers schema:")
raw_transfers_df.printSchema()
raw_transfer_total = raw_transfers_df.count()
print(f"\nTotal raw transfers: {raw_transfer_total}")

# Preview the first five rows without truncating long values
raw_transfers_df.show(n=5, truncate=False)

In [None]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## Data Quality and Validation

# COMMAND ----------

# Basic data quality counts on the raw transfers: total rows, then one
# filtered count per condition listed below.
print("=== DATA QUALITY CHECKS ===")
print(f"Total records: {raw_transfers_df.count()}")

quality_checks = [
    ("Records with null transaction_hash", col("transaction_hash").isNull()),
    ("Records with null contract_address", col("contract_address").isNull()),
    ("Records with zero value", col("value") == 0),
]
for check_label, check_condition in quality_checks:
    matching_rows = raw_transfers_df.filter(check_condition).count()
    print(f"{check_label}: {matching_rows}")


In [None]:
# Check address formats: "0x" followed by exactly 40 hex digits.
# rlike() on a NULL address evaluates to NULL, and filter() drops rows whose
# predicate is NULL, so a missing address would silently escape the check.
# Coalescing the match result to False makes NULL addresses count as invalid.
address_pattern = "^0x[0-9a-fA-F]{40}$"
invalid_addresses = raw_transfers_df.filter(
    ~coalesce(col("from_address").rlike(address_pattern), lit(False)) |
    ~coalesce(col("to_address").rlike(address_pattern), lit(False))
).count()
print(f"Records with invalid address format: {invalid_addresses}")

In [None]:
# Count the (hash, sender, receiver, value) combinations that occur more than once
duplicate_key_columns = ["transaction_hash", "from_address", "to_address", "value"]
duplicates = (
    raw_transfers_df
    .groupBy(*duplicate_key_columns)
    .count()
    .where(col("count") > 1)
    .count()
)
print(f"Duplicate transfers: {duplicates}")

In [None]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## Transform Raw Data to Curated Format

# COMMAND ----------

# UDF to get token metadata
def get_token_info(contract_address):
    """Look up the symbol, name and decimals for a token contract address.

    The match is case-insensitive: the broadcast metadata is keyed by
    mixed-case addresses, while callers may pass a lower-cased address, so
    both sides are lower-cased before comparing. The metadata dict is small,
    so a linear scan per call is acceptable.

    Args:
        contract_address: Hex address string in any letter case, or None.

    Returns:
        A (symbol, name, decimals) tuple. Falls back to
        ("UNKNOWN", "Unknown Token", 18) when the address is None or is not
        present in the broadcast metadata.
    """
    unknown_token = ("UNKNOWN", "Unknown Token", 18)
    if contract_address is None:
        # NULL column values reach a Python UDF as None; avoid calling .lower() on it
        return unknown_token

    wanted = contract_address.lower()
    for address, metadata in token_metadata_broadcast.value.items():
        if address.lower() == wanted:
            return metadata["symbol"], metadata["name"], metadata["decimals"]
    return unknown_token

# Struct returned by the UDF: one nullable field per element of the
# (symbol, name, decimals) tuple produced by get_token_info.
token_info_schema = (
    StructType()
    .add("symbol", StringType(), True)
    .add("name", StringType(), True)
    .add("decimals", IntegerType(), True)
)

get_token_info_udf = udf(get_token_info, token_info_schema)

In [None]:
# Build the curated transfer set from the raw rows: parse the timestamp,
# attach token metadata via the UDF, scale the raw value by the token's
# decimals, classify the transfer, and derive the gas cost.
zero_address = "0x0000000000000000000000000000000000000000"

# Transfers from the zero address are mints, transfers to it are burns
transfer_type_expr = (
    when(col("from_address") == zero_address, "mint")
    .when(col("to_address") == zero_address, "burn")
    .otherwise("transfer")
)

# No otherwise() branch: gas_cost_eth stays NULL when either gas field is missing
has_gas_fields = col("gas_price").isNotNull() & col("gas_used").isNotNull()
gas_cost_expr = when(has_gas_fields, col("gas_price") * col("gas_used") / pow(10, 18))

curated_transfers_df = (
    raw_transfers_df
    .withColumn("timestamp_dt", to_timestamp(col("timestamp")))
    .withColumn("date", date_format(col("timestamp_dt"), "yyyy-MM-dd"))
    .withColumn("hour", hour(col("timestamp_dt")))
    .withColumn("contract_address_lower", lower(col("contract_address")))
    .withColumn("token_info", get_token_info_udf(col("contract_address_lower")))
    .withColumn("token_symbol", col("token_info.symbol"))
    .withColumn("token_name", col("token_info.name"))
    .withColumn("token_decimals", col("token_info.decimals"))
    .withColumn("value_normalized", col("value") / pow(10, col("token_decimals")))
    .withColumn("transfer_type", transfer_type_expr)
    .withColumn("gas_cost_eth", gas_cost_expr)
    .drop("token_info")  # the struct has been flattened into the token_* columns
)

In [None]:
# Keep only transfers of recognised tokens that carry a positive normalized value
is_known_token = col("token_symbol") != "UNKNOWN"
has_positive_value = col("value_normalized") > 0
curated_transfers_df = curated_transfers_df.where(is_known_token & has_positive_value)

# Report the resulting schema and row count
print("Curated transfers schema:")
curated_transfers_df.printSchema()
curated_transfer_total = curated_transfers_df.count()
print(f"Curated transfers count: {curated_transfer_total}")

In [None]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## Add Transfer Analytics

# COMMAND ----------

# Bucket each transfer by normalized value; the thresholds are checked in
# ascending order, so the first matching branch wins.
size_category_expr = (
    when(col("value_normalized") < 100, "Small")
    .when(col("value_normalized") < 10000, "Medium")
    .when(col("value_normalized") < 100000, "Large")
    .otherwise("Whale")
)

curated_transfers_df = (
    curated_transfers_df
    .withColumn("transfer_size_category", size_category_expr)
    .withColumn("is_large_transfer", col("value_normalized") >= 100000)
)

In [None]:
# Derive calendar features from the parsed timestamp and the hour column
is_weekend_expr = when(col("day_of_week").isin([1, 7]), True).otherwise(False)

# Six-hour buckets; any hour not in 0-17 falls through to "Evening"
time_period_expr = (
    when(col("hour").between(0, 5), "Night")
    .when(col("hour").between(6, 11), "Morning")
    .when(col("hour").between(12, 17), "Afternoon")
    .otherwise("Evening")
)

curated_transfers_df = (
    curated_transfers_df
    .withColumn("day_of_week", dayofweek(col("timestamp_dt")))
    .withColumn("is_weekend", is_weekend_expr)
    .withColumn("time_period", time_period_expr)
)

In [None]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## Write Curated Data

# COMMAND ----------

# Persist the curated transfers as a Delta table, replacing any existing
# data and partitioning the files by date and token symbol.
curated_writer = (
    curated_transfers_df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("date", "token_symbol")
)
curated_writer.save("Tables/curated/erc20_transfers_curated")

print("Curated ERC20 transfer data written successfully")

In [None]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## Create Aggregated Views

# COMMAND ----------

# Per-day, per-token metrics: counts, distinct participants, volume
# statistics, whale/mint/burn totals and the average gas cost.
daily_metrics = [
    count("*").alias("transfer_count"),
    countDistinct("from_address").alias("unique_senders"),
    countDistinct("to_address").alias("unique_receivers"),
    countDistinct("transaction_hash").alias("unique_transactions"),
    sum("value_normalized").alias("total_volume"),
    avg("value_normalized").alias("avg_transfer_size"),
    min("value_normalized").alias("min_transfer"),
    max("value_normalized").alias("max_transfer"),
    stddev("value_normalized").alias("transfer_size_stddev"),
    sum(when(col("transfer_size_category") == "Whale", 1).otherwise(0)).alias("whale_transfers"),
    sum(when(col("transfer_type") == "mint", col("value_normalized")).otherwise(0)).alias("minted_amount"),
    sum(when(col("transfer_type") == "burn", col("value_normalized")).otherwise(0)).alias("burned_amount"),
    avg("gas_cost_eth").alias("avg_gas_cost_eth"),
]

# Label each day/token row by its total volume
volume_category_expr = (
    when(col("total_volume") < 1000000, "Low")
    .when(col("total_volume") < 10000000, "Medium")
    .when(col("total_volume") < 100000000, "High")
    .otherwise("Very High")
)

daily_token_agg = (
    curated_transfers_df
    .groupBy("date", "token_symbol", "token_name")
    .agg(*daily_metrics)
    .withColumn("volume_category", volume_category_expr)
)

In [None]:
# Per-hour, per-token metrics for the real-time dashboard
hourly_metrics = [
    count("*").alias("transfer_count"),
    sum("value_normalized").alias("total_volume"),
    avg("value_normalized").alias("avg_transfer_size"),
    countDistinct("from_address").alias("unique_senders"),
    countDistinct("to_address").alias("unique_receivers"),
    sum(when(col("is_large_transfer"), 1).otherwise(0)).alias("large_transfers"),
]

hourly_token_agg = (
    curated_transfers_df
    .groupBy("date", "hour", "token_symbol")
    .agg(*hourly_metrics)
)

In [None]:
# Per-day, per-address, per-token activity. Every transfer contributes two
# rows: an "outbound" row for the sender and an "inbound" row for the receiver.
outbound_flows = curated_transfers_df.select(
    "date",
    col("from_address").alias("address"),
    "token_symbol",
    "value_normalized",
    lit("outbound").alias("flow_type"),
)
inbound_flows = curated_transfers_df.select(
    "date",
    col("to_address").alias("address"),
    "token_symbol",
    "value_normalized",
    lit("inbound").alias("flow_type"),
)

is_inbound = col("flow_type") == "inbound"
is_outbound = col("flow_type") == "outbound"

daily_address_agg = (
    outbound_flows
    .union(inbound_flows)  # positional union; both sides select the same column order
    .groupBy("date", "address", "token_symbol")
    .agg(
        sum(when(is_inbound, col("value_normalized")).otherwise(0)).alias("inbound_volume"),
        sum(when(is_outbound, col("value_normalized")).otherwise(0)).alias("outbound_volume"),
        count(when(is_inbound, 1)).alias("inbound_count"),
        count(when(is_outbound, 1)).alias("outbound_count"),
    )
    .withColumn("net_volume", col("inbound_volume") - col("outbound_volume"))
    .withColumn("total_activity", col("inbound_volume") + col("outbound_volume"))
    .filter(col("total_activity") > 1000)  # keep only addresses with significant activity
)

In [None]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## Write Aggregated Data

# COMMAND ----------

# Persist the three aggregate tables as Delta, replacing existing data.
# Each entry is (DataFrame, partition columns, target path); the daily token
# table is written unpartitioned, the other two are partitioned by date.
aggregate_outputs = [
    (daily_token_agg, [], "Tables/aggregated/erc20_daily_tokens"),
    (hourly_token_agg, ["date"], "Tables/aggregated/erc20_hourly_tokens"),
    (daily_address_agg, ["date"], "Tables/aggregated/erc20_address_activity"),
]

for aggregate_df, partition_columns, target_path in aggregate_outputs:
    aggregate_writer = aggregate_df.write.format("delta").mode("overwrite")
    if partition_columns:
        aggregate_writer = aggregate_writer.partitionBy(*partition_columns)
    aggregate_writer.save(target_path)

print("All aggregated ERC20 data written successfully")

In [None]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## Cross-Token Analysis

# COMMAND ----------

# Wide view of daily volume: one row per date, one column per token symbol
daily_volumes = daily_token_agg.select("date", "token_symbol", "total_volume")
token_correlation = (
    daily_volumes
    .groupBy("date")
    .pivot("token_symbol")
    .agg(first("total_volume"))
)

# Each token's percentage share of the total volume traded on the same date
per_date_window = Window.partitionBy("date")
market_share = (
    daily_token_agg
    .withColumn("date_volume", sum("total_volume").over(per_date_window))
    .withColumn("market_share", col("total_volume") / col("date_volume") * 100)
    .select("date", "token_symbol", "total_volume", "market_share", "transfer_count")
)

# Persist the market share table as Delta, replacing existing data
market_share.write.format("delta").mode("overwrite").save("Tables/aggregated/erc20_market_share")

In [None]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## Summary and Validation

# COMMAND ----------

# Row counts for every dataset produced by this notebook
print("=== TRANSFORMATION SUMMARY ===")
summary_counts = [
    ("Raw transfers processed", raw_transfers_df),
    ("Curated transfers created", curated_transfers_df),
    ("Daily token aggregations", daily_token_agg),
    ("Hourly token aggregations", hourly_token_agg),
    ("Address activity records", daily_address_agg),
]
for summary_label, summary_df in summary_counts:
    print(f"{summary_label}: {summary_df.count()}")

# Per-token totals over rows dated within the last seven days, largest volume first
print("\n=== TOKEN SUMMARY (Last 7 Days) ===")
week_start = date_sub(current_date(), 7)
recent_summary = (
    daily_token_agg
    .filter(col("date") >= week_start)
    .groupBy("token_symbol")
    .agg(
        sum("total_volume").alias("weekly_volume"),
        sum("transfer_count").alias("weekly_transfers"),
        avg("avg_transfer_size").alias("avg_transfer_size"),
    )
    .orderBy(col("weekly_volume").desc())
)

recent_summary.show()

# Ten most active address/token rows for yesterday's date
print("\n=== TOP ADDRESSES BY ACTIVITY ===")
yesterday = date_sub(current_date(), 1)
top_addresses = (
    daily_address_agg
    .filter(col("date") == yesterday)
    .orderBy(col("total_activity").desc())
    .limit(10)
)

top_addresses.show(truncate=False)


In [None]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## Create Views for Dashboard

# COMMAND ----------

# Register session-scoped temporary views (view name -> DataFrame) for dashboard queries
dashboard_views = {
    "erc20_transfers_realtime": curated_transfers_df,
    "erc20_daily_summary": daily_token_agg,
    "erc20_hourly_summary": hourly_token_agg,
    "erc20_market_share": market_share,
}
for view_name, view_df in dashboard_views.items():
    view_df.createOrReplaceTempView(view_name)

print("Views created successfully for dashboard queries")


In [None]:
# COMMAND ----------

# MAGIC %sql
# MAGIC -- Optimize delta tables for better query performance.
# MAGIC -- ZORDER BY accepts data columns only. Partition columns are left out because Delta
# MAGIC -- rejects them: date and token_symbol on the curated table, date on the hourly table.
# MAGIC -- The daily table is written unpartitioned, so both of its columns remain valid.
# MAGIC OPTIMIZE delta.`Tables/curated/erc20_transfers_curated` ZORDER BY (timestamp_dt);
# MAGIC OPTIMIZE delta.`Tables/aggregated/erc20_daily_tokens` ZORDER BY (date, token_symbol);
# MAGIC OPTIMIZE delta.`Tables/aggregated/erc20_hourly_tokens` ZORDER BY (hour, token_symbol);

# COMMAND ----------

# MAGIC %md
# MAGIC ## Data Quality Metrics Export

# COMMAND ----------

# Collect monitoring metrics as (table_name, metric_name, metric_value) rows
curated_record_count = curated_transfers_df.count()
distinct_token_count = curated_transfers_df.select("token_symbol").distinct().count()
# avg() yields None when there are no rows; fall back to 0 in that case
mean_daily_volume = daily_token_agg.agg(avg("total_volume")).collect()[0][0] or 0

metric_rows = [
    ("erc20_transfers", "record_count", float(curated_record_count)),
    ("erc20_transfers", "unique_tokens", float(distinct_token_count)),
    ("erc20_transfers", "avg_daily_volume", float(mean_daily_volume)),
    ("erc20_transfers", "data_freshness_hours", float(24)),  # Placeholder - would calculate actual freshness
]
data_quality_metrics = spark.createDataFrame(
    metric_rows,
    ["table_name", "metric_name", "metric_value"],
)

# Persist the metrics as a Delta table, replacing existing data
data_quality_metrics.write.format("delta").mode("overwrite").save("Tables/monitoring/data_quality_metrics")

print("Data quality metrics exported for monitoring")