## Utility Functions

In [0]:
def write_latest(table_name):
    """Create a batch handler that replaces ``table_name`` with each batch.

    The returned callable takes ``(batch_df, batch_id)`` and is passed to
    ``writeStream.foreachBatch`` elsewhere in this notebook.

    Args:
        table_name: Name of the Delta table to overwrite.

    Returns:
        A function ``_write(batch_df, batch_id)`` that saves ``batch_df`` to
        ``table_name`` in overwrite mode and returns ``None``.
    """
    def _write(batch_df, batch_id):
        # batch_id is accepted to match the callback signature; it is unused.
        writer = batch_df.write.format("delta")
        # Replace existing contents, and let the schema be replaced as well.
        writer = writer.mode("overwrite").option("overwriteSchema", "true")
        writer.saveAsTable(table_name)

    return _write


In [0]:
import os

def get_eventhub_conf(eventhub_name):
    """Build the Event Hubs reader options for one event hub.

    The base connection string is read from the ``EVENTHUB_CONN`` environment
    variable, ``EntityPath=<eventhub_name>`` is appended, and the result is
    encrypted via ``EventHubsUtils.encrypt`` through the JVM gateway of the
    notebook-provided ``sc``.

    Args:
        eventhub_name: Name of the event hub to put in ``EntityPath``.

    Returns:
        A dict with the single key ``"eventhubs.connectionString"`` mapped to
        the encrypted connection string.

    Raises:
        KeyError: If ``EVENTHUB_CONN`` is not set.
        RuntimeError: If encryption raises (the original error is chained as
            ``__cause__``) or returns ``None``.
    """
    # Normalise the separator so the final string is well-formed whether or
    # not the configured value already ends with ';'.
    base_conn_str = os.environ["EVENTHUB_CONN"].strip().rstrip(";")
    raw_conn_str = f"{base_conn_str};EntityPath={eventhub_name}"

    try:
        encrypted_conf = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(raw_conn_str)
    except Exception as e:
        # Surface the real failure instead of printing it and raising an
        # unrelated "is None" error afterwards.
        raise RuntimeError(f"Encryption failed: {e}") from e

    if encrypted_conf is None:
        raise RuntimeError("Encrypted EventHub connection string is None")

    return {"eventhubs.connectionString": encrypted_conf}

## Reading the Stream of data

In [0]:
NEWS_EVENT_HUB_NAME = "news-eventhub"
news_eventhub_conf = get_eventhub_conf(NEWS_EVENT_HUB_NAME)

# Streaming reader over the news event hub, configured from the options above.
news_raw = spark.readStream.format("eventhubs").options(**news_eventhub_conf).load()

# Debug view: expose the message body as text so the raw JSON can be inspected.
debug_news_df = news_raw.selectExpr("cast(body as string) as raw_json")

display(debug_news_df)

raw_json
"{""symbol"": ""ETH"", ""name"": ""Ethereum"", ""description"": ""Tom Lee defended ETH treasury drawdowns as structural, not a flaw."", ""prediction"": ""Positive"", ""published_at"": ""2026-02-07T14:30:55Z""}"
"{""symbol"": ""BTC"", ""name"": ""Bitcoin"", ""description"": ""Nic Carter showed most Bitcoin devs see quantum risk as distant, but legitimate."", ""prediction"": ""Neutral"", ""published_at"": ""2026-02-07T14:30:55Z""}"
"{""symbol"": ""DOGE"", ""name"": ""Dogecoin"", ""description"": ""Major indicators like Bollinger Bands hint the downtrend is almost over for Dogecoin."", ""prediction"": ""Positive"", ""published_at"": ""2026-02-07T14:26:00Z""}"
"{""symbol"": ""ETH"", ""name"": ""Ethereum"", ""description"": ""Ethereum (ETH) had one of its sharpest historic declines over the past 10 days, shedding 40% of its value and briefly sliding below $2,000."", ""prediction"": ""Negative"", ""published_at"": ""2026-02-07T14:22:21Z""}"
"{""symbol"": ""BTC"", ""name"": ""Bitcoin"", ""description"": ""Bitcoin (BTC) price is hovering near $68,890, but the Sharpe Ratio shows intact caution, indicating rising risk relative to returns, slipping into a historical bear-market zone."", ""prediction"": ""Negative"", ""published_at"": ""2026-02-07T14:18:40Z""}"
"{""symbol"": ""BTC"", ""name"": ""Bitcoin"", ""description"": ""Scott Melker discussed Bitcoin's resilience through past crashes but also highlighted potential systemic threats to its stability."", ""prediction"": ""Neutral"", ""published_at"": ""2026-02-07T14:15:43Z""}"
"{""symbol"": ""ONDO"", ""name"": ""Ondo"", ""description"": ""21Shares advanced an Ondo ETF filing, and the token price shows signs of recovery."", ""prediction"": ""Positive"", ""published_at"": ""2026-02-07T14:11:05Z""}"
"{""symbol"": ""BTC"", ""name"": ""Bitcoin"", ""description"": ""Debate ensues about whether the crypto market has crashed or is merely experiencing a dip, with Bitcoin's behavior surprising analysts."", ""prediction"": ""Neutral"", ""published_at"": ""2026-02-07T14:10:40Z""}"
"{""symbol"": ""SOL"", ""name"": ""Solana"", ""description"": ""Solana (SOL) staged a sharp intraday recovery, posting a 12% daily gain despite lingering market uncertainty, though long-term holder buying momentum is slowing."", ""prediction"": ""Positive"", ""published_at"": ""2026-02-07T14:06:53Z""}"
"{""symbol"": ""SHIB"", ""name"": ""Shiba Inu"", ""description"": ""The Shiba Inu team issued a crucial wallet security notice to the SHIB community due to a new emerging threat."", ""prediction"": ""Negative"", ""published_at"": ""2026-02-07T14:05:00Z""}"


In [0]:
PRICE_EVENT_HUB_NAME = "price-eventhub"
price_eventhub_conf = get_eventhub_conf(PRICE_EVENT_HUB_NAME)

# Streaming reader over the price event hub, configured from the options above.
price_raw = spark.readStream.format("eventhubs").options(**price_eventhub_conf).load()

# Debug view: expose the message body as text so the raw JSON can be inspected.
debug_price_df = price_raw.selectExpr("cast(body as string) as raw_json")

display(debug_price_df)


raw_json
"{""symbol"": ""ADA"", ""price"": 0.27381256039175883, ""volume_24h"": 1061909859.6656979, ""percent_change_1h"": -0.37771354, ""percent_change_24h"": 1.03673843, ""percent_change_7d"": -6.08514992, ""cmc_rank"": 11, ""last_updated"": ""2026-02-08T14:31:00.000Z"", ""ingested_at"": ""2026-02-08T14:31:43Z""}"
"{""symbol"": ""AVAX"", ""price"": 9.243254527448247, ""volume_24h"": 304212062.75301474, ""percent_change_1h"": -0.15117315, ""percent_change_24h"": 1.29899695, ""percent_change_7d"": -7.32292671, ""cmc_rank"": 24, ""last_updated"": ""2026-02-08T14:30:00.000Z"", ""ingested_at"": ""2026-02-08T14:31:43Z""}"
"{""symbol"": ""BNB"", ""price"": 643.7943907515754, ""volume_24h"": 1860304310.6880085, ""percent_change_1h"": -0.08611107, ""percent_change_24h"": 0.99016663, ""percent_change_7d"": -14.82577413, ""cmc_rank"": 5, ""last_updated"": ""2026-02-08T14:31:00.000Z"", ""ingested_at"": ""2026-02-08T14:31:43Z""}"
"{""symbol"": ""BTC"", ""price"": 71184.8649707489, ""volume_24h"": 44447605920.33668, ""percent_change_1h"": -0.31340765, ""percent_change_24h"": 3.23136686, ""percent_change_7d"": -8.54772637, ""cmc_rank"": 1, ""last_updated"": ""2026-02-08T14:30:00.000Z"", ""ingested_at"": ""2026-02-08T14:31:43Z""}"
"{""symbol"": ""CMC20"", ""price"": 145.87145996431073, ""volume_24h"": 5510111.44739485, ""percent_change_1h"": 0.0117525, ""percent_change_24h"": 3.32337675, ""percent_change_7d"": -9.23952748, ""cmc_rank"": 8834, ""last_updated"": ""2026-02-08T14:30:00.000Z"", ""ingested_at"": ""2026-02-08T14:31:43Z""}"
"{""symbol"": ""DOGE"", ""price"": 0.097937677332417, ""volume_24h"": 1135918997.6999178, ""percent_change_1h"": -0.3450108, ""percent_change_24h"": 1.82412894, ""percent_change_7d"": -6.6335076, ""cmc_rank"": 9, ""last_updated"": ""2026-02-08T14:30:00.000Z"", ""ingested_at"": ""2026-02-08T14:31:43Z""}"
"{""symbol"": ""DOT"", ""price"": 1.3628771096147025, ""volume_24h"": 127304972.57746078, ""percent_change_1h"": -0.07675709, ""percent_change_24h"": 1.39123517, ""percent_change_7d"": -10.12536904, ""cmc_rank"": 33, ""last_updated"": ""2026-02-08T14:30:00.000Z"", ""ingested_at"": ""2026-02-08T14:31:43Z""}"
"{""symbol"": ""ENS"", ""price"": 6.023535923379377, ""volume_24h"": 26155480.18624322, ""percent_change_1h"": -0.41945786, ""percent_change_24h"": 0.06160907, ""percent_change_7d"": -12.10788156, ""cmc_rank"": 128, ""last_updated"": ""2026-02-08T14:30:00.000Z"", ""ingested_at"": ""2026-02-08T14:31:43Z""}"
"{""symbol"": ""ETH"", ""price"": 2118.629549450064, ""volume_24h"": 32943822587.794125, ""percent_change_1h"": -0.62566569, ""percent_change_24h"": 3.75026439, ""percent_change_7d"": -10.33118284, ""cmc_rank"": 2, ""last_updated"": ""2026-02-08T14:30:00.000Z"", ""ingested_at"": ""2026-02-08T14:31:43Z""}"
"{""symbol"": ""LINK"", ""price"": 8.92583523371451, ""volume_24h"": 720055017.7729955, ""percent_change_1h"": -0.72390771, ""percent_change_24h"": 1.69884009, ""percent_change_7d"": -7.94144795, ""cmc_rank"": 16, ""last_updated"": ""2026-02-08T14:31:00.000Z"", ""ingested_at"": ""2026-02-08T14:31:43Z""}"


## Clean & Transform the Data

In [0]:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (
    StructType, StructField,
    StringType, TimestampType
)

# Layout of one news message; every field is read as a string, including
# published_at.
news_schema = (
    StructType()
    .add("symbol", StringType(), False)
    .add("name", StringType(), True)
    .add("description", StringType(), True)
    .add("prediction", StringType(), True)
    .add("published_at", StringType(), True)
)

# Parse the body as JSON, flatten the struct into columns, and keep only rows
# that carry a usable symbol (not null and not the 'N/A' placeholder).
parsed_news = from_json(col("body").cast("string"), news_schema).alias("data")
df_news = (
    news_raw
    .select(parsed_news)
    .select("data.*")
    .filter(col("symbol").isNotNull() & (col("symbol") != 'N/A'))
)


In [0]:
# Keep one row per (symbol, description) pair.
# NOTE(review): no watermark is set on this stream, so deduplication state is
# presumably kept for every key seen so far — confirm this is acceptable.
df_news_bronze = df_news.dropDuplicates(["symbol", "description"])

display(df_news_bronze)


symbol,name,description,prediction,published_at
DOGE,Dogecoin,Major indicators like Bollinger Bands hint the downtrend is almost over for Dogecoin.,Positive,2026-02-07T14:26:00Z
ETH,Ethereum,"Ethereum (ETH) had one of its sharpest historic declines over the past 10 days, shedding 40% of its value and briefly sliding below $2,000.",Negative,2026-02-07T14:22:21Z
SHIB,Shiba Inu,The Shiba Inu team issued a crucial wallet security notice to the SHIB community due to a new emerging threat.,Negative,2026-02-07T14:05:00Z
BTC,Bitcoin,Scott Melker discussed Bitcoin's resilience through past crashes but also highlighted potential systemic threats to its stability.,Neutral,2026-02-07T14:15:43Z
ENS,Ethereum Name Service,ENS drops its L2.,Negative,2026-02-07T14:02:59Z
BTC,Bitcoin,"Discussion on the end of the monetary system and collapsing fiat currencies, implying a positive outlook for Bitcoin as an alternative.",Positive,2026-02-07T14:01:22Z
BTC,Bitcoin,Macro investor Jordi Visser discusses why Bitcoin is selling off and capital shifting towards scarce assets.,Negative,2026-02-07T14:01:13Z
ETH,Ethereum,"Vitalik Buterin calls for L2 shift as Ethereum L1 scales, indicating continued development and scaling efforts.",Positive,2026-02-07T14:02:59Z
BTC,Bitcoin,"Debate ensues about whether the crypto market has crashed or is merely experiencing a dip, with Bitcoin's behavior surprising analysts.",Neutral,2026-02-07T14:10:40Z
ETH,Ethereum,"Tom Lee defended ETH treasury drawdowns as structural, not a flaw.",Positive,2026-02-07T14:30:55Z


In [0]:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (
    StructType, StructField,
    StringType, DoubleType, IntegerType
)

# Layout of one price message. Numeric measures are doubles, the rank is an
# integer, and both timestamps (last_updated, ingested_at) are kept as strings.
price_schema = StructType([
    StructField("symbol", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("volume_24h", DoubleType(), True),
    StructField("percent_change_1h", DoubleType(), True),
    StructField("percent_change_24h", DoubleType(), True),
    StructField("percent_change_7d", DoubleType(), True),
    StructField("cmc_rank", IntegerType(), True),
    StructField("last_updated", StringType(), True),
    StructField("ingested_at", StringType(), True)
])

# Parse the body as JSON, flatten the struct into columns, and drop rows where
# symbol or price is null after parsing.
df_price = (
    price_raw
      .select(from_json(col("body").cast("string"), price_schema).alias("data"))
      .select("data.*")
      .filter(col("symbol").isNotNull())
      .filter(col("price").isNotNull())
)



In [0]:
# Keep one row per (symbol, last_updated) pair.
# NOTE(review): no watermark is set on this stream, so deduplication state is
# presumably kept for every key seen so far — confirm this is acceptable.
df_price_bronze = df_price.dropDuplicates(["symbol", "last_updated"])

## Storing Delta tables in Bronze layer

In [0]:
# Start the news stream: each micro-batch is handed to the write_latest
# handler for the bronze news table; progress is tracked at the checkpoint path.
news_query = (
    df_news_bronze.writeStream
    .option("checkpointLocation", "/chk/crypto_sentiment_bronze_news")
    .foreachBatch(write_latest("crypto_sentiment_bronze_news"))
    .start()
)

In [0]:
# Start the price stream: each micro-batch is handed to the write_latest
# handler for the bronze price table; progress is tracked at the checkpoint path.
price_query = (
    df_price_bronze.writeStream
    .option("checkpointLocation", "/chk/crypto_sentiment_bronze_price")
    .foreachBatch(write_latest("crypto_sentiment_bronze_price"))
    .start()
)

## Verifying the records

In [0]:
from delta.tables import DeltaTable

try:
    dt = DeltaTable.forName(spark, "crypto_sentiment_bronze_news")
    # Only the numFiles column of the table detail is selected and printed;
    # this shows a file count, not a record count.
    news_detail = dt.detail()
    print(news_detail.select("numFiles").collect())
except Exception as e:
    print(f"Error: {e}")

[Row(numFiles=7)]


In [0]:
try:
    dt = DeltaTable.forName(spark, "crypto_sentiment_bronze_price")
    # Only the numFiles column of the table detail is selected and printed;
    # this shows a file count, not a record count.
    price_detail = dt.detail()
    print(price_detail.select("numFiles").collect())
except Exception as e:
    print(f"Error: {e}")

[Row(numFiles=15)]


In [0]:
# %sql
# drop table crypto_sentiment_silver_price;

# drop table crypto_sentiment_bronze_news;

In [0]:
# dbutils.fs.rm("/chk/crypto_sentiment_silver_price", recurse=True)
# dbutils.fs.rm("/chk/crypto_sentiment_bronze_news", recurse=True)