# NBA Best Performance using Real-time Data Processing with Azure Databricks (and Event Hubs)

This notebook demonstrates a real-time data processing workflow in Databricks that uses Structured Streaming to ingest data from Event Hubs. A Bronze-Silver-Gold architecture refines and transforms the data to find the best performance in a basketball game, and an automated email alert notifies stakeholders of the top results.

- Data Sources: Streaming data from IoT devices or social media feeds (simulated in Event Hubs).
- Ingestion: Azure Event Hubs captures the real-time data.
- Processing: Azure Databricks processes the stream with Structured Streaming.
- Storage: Processed data is stored in Azure Data Lake Storage (Delta format).

### Azure Services Required
- Databricks Workspace
- Azure Data Lake Storage
- Azure Event Hubs

### Azure Databricks Configuration Required
- Single Node Compute Cluster: `12.2 LTS (includes Apache Spark 3.3.2, Scala 2.12)`
- Maven Library installed on Compute Cluster: `com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22`

## Sample Data

```json
{
    "gameId": "G78901",
    "date": "2025-03-18",
    "team_stats": {
        "team": "Lakers",
        "total_points": 120,
        "total_assists": 30,
        "total_rebounds": 55,
        "total_steals": 8,
        "total_blocks": 7,
        "total_turnovers": 9,
        "total_fouls": 17
    },
    "players": [
        {"player": "LeBron James", "points": 32, "assists": 8, "rebounds": 10, "steals": 2, "blocks": 2, "fouls": 2, "turnovers": 2},
        {"player": "Anthony Davis", "points": 25, "assists": 4, "rebounds": 12, "steals": 1, "blocks": 4, "fouls": 3, "turnovers": 1},
        {"player": "D'Angelo Russell", "points": 18, "assists": 9, "rebounds": 4, "steals": 2, "blocks": 1, "fouls": 2, "turnovers": 2},
        {"player": "Austin Reaves", "points": 16, "assists": 6, "rebounds": 5, "steals": 3, "blocks": 0, "fouls": 3, "turnovers": 2}
    ],
    "quarter": 4,
    "time": "00:00"
}
```
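The sample data is simulated. One way to produce such events is a small producer-side helper; the sketch below is plain Python and entirely hypothetical: `make_event`, the random value ranges, and the choice to derive team totals from the listed players are assumptions (in the sample above the totals cover the whole roster, not just the four players shown). The returned JSON string could then be published with the `azure-eventhub` package's `EventHubProducerClient`.

```python
import json
import random
from typing import List, Optional

# Stat fields carried by every player entry in the sample payload
PLAYER_STATS = ["points", "assists", "rebounds", "steals", "blocks", "fouls", "turnovers"]


def make_event(game_id: str, game_date: str, team: str, players: List[str],
               quarter: int, seed: Optional[int] = None) -> str:
    """Hypothetical simulator: build one JSON event shaped like the sample payload."""
    rng = random.Random(seed)  # seeded so a run can be reproduced
    player_rows = []
    for name in players:
        row = {"player": name}
        for stat in PLAYER_STATS:
            # Assumption: points range 0-40, every other stat 0-12
            row[stat] = rng.randint(0, 40 if stat == "points" else 12)
        player_rows.append(row)

    # Assumption: team totals are the sums over the listed players only
    team_stats = {"team": team}
    for stat in ["points", "assists", "rebounds", "steals", "blocks", "turnovers", "fouls"]:
        total = 0
        for row in player_rows:
            total += row[stat]
        team_stats["total_" + stat] = total

    event = {
        "gameId": game_id,
        "date": game_date,
        "team_stats": team_stats,
        "players": player_rows,
        "quarter": quarter,
        "time": "00:00",
    }
    return json.dumps(event)


print(make_event("G78901", "2025-03-18", "Lakers", ["LeBron James", "Anthony Davis"], quarter=4, seed=42))
```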


%md
The code block below creates the schemas for our solution in the `hive_metastore` catalog.

The approach uses a multi-hop (medallion) data storage architecture consisting of bronze, silver, and gold schemas.

In [0]:
# CREATE SCHEMA IF NOT EXISTS is idempotent, so re-running this cell is safe;
# the except branch only reports real failures (e.g. missing permissions)
for schema in ["bronze", "silver", "gold"]:
    try:
        spark.sql(f"CREATE SCHEMA IF NOT EXISTS hive_metastore.{schema}")
    except Exception as e:
        print(f"Could not create schema hive_metastore.{schema}: {e}")

%md
Importing the libraries.

In [0]:
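# The wildcard import keeps Spark function calls short, but it shadows Python
# built-ins such as sum, max, round and abs for the rest of the notebook
from pyspark.sql.functions import *
from pyspark.sql.types import *
import smtplib
from email.mime.text import MIMEText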


%md
#### Bronze Layer

%md
* Set up the Azure Event Hubs connection string
* Define the JSON schema
* Read the stream and write it to the Bronze layer

In [0]:
# Event Hubs Configuration
connectionString = "Endpoint=sb://jack-namespace-demo.servicebus.windows.net/;SharedAccessKeyName=databricks;SharedAccessKey=##############################################;EntityPath=eh_nba"

ehConf = {
    "eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
}
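The connection string is a `;`-separated list of `Key=Value` pairs, and its `EntityPath` names the event hub (`eh_nba`) that the stream reads from. A small, hypothetical helper (`parse_connection_string` is not part of the Event Hubs connector) can sanity-check a string before it is passed to `EventHubsUtils.encrypt`:

```python
def parse_connection_string(conn_str: str) -> dict:
    """Split an Event Hubs connection string into its Key=Value parts."""
    parts = {}
    for segment in conn_str.split(";"):
        if not segment:
            continue  # tolerate a trailing ';'
        # Split on the first '=' only: base64 access keys can end with '=' padding
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts


example = ("Endpoint=sb://jack-namespace-demo.servicebus.windows.net/;"
           "SharedAccessKeyName=databricks;SharedAccessKey=<key>;EntityPath=eh_nba")
parts = parse_connection_string(example)
print(parts["EntityPath"])  # eh_nba
```

Splitting on the first `=` rather than every `=` matters because a real `SharedAccessKey` value usually contains `=` characters itself.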

In [0]:
# JSON schema of the event payload.
# It is not applied in the Bronze layer, which stores the raw data; the Silver layer uses it to parse the body.
player_schema = ArrayType(StructType([
    StructField("player", StringType()),
    StructField("points", IntegerType()),
    StructField("assists", IntegerType()),
    StructField("rebounds", IntegerType()),
    StructField("steals", IntegerType()),
    StructField("blocks", IntegerType()),
    StructField("fouls", IntegerType()),
    StructField("turnovers", IntegerType())
]))

team_stats_schema = StructType([
    StructField("team", StringType()),
    StructField("total_points", IntegerType()),
    StructField("total_assists", IntegerType()),
    StructField("total_rebounds", IntegerType()),
    StructField("total_steals", IntegerType()),
    StructField("total_blocks", IntegerType()),
    StructField("total_turnovers", IntegerType()),
    StructField("total_fouls", IntegerType())
])

json_schema = StructType([
    StructField("gameId", StringType()),
    StructField("date", StringType()),
    StructField("team_stats", team_stats_schema),
    StructField("players", player_schema),
    StructField("quarter", IntegerType()),
    StructField("time", StringType())
])

In [0]:
# Read raw streaming data from Event Hubs

df_bronze = spark.readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()

# Write to Bronze Table
df_bronze.writeStream \
    .option("checkpointLocation", "/mnt/checkpoints/bronze/nba") \
    .outputMode("append") \
    .format("delta") \
    .toTable("hive_metastore.bronze.nba")

df_bronze.display()

%md
Checking whether data is stored in the Bronze NBA table

In [0]:
%sql 

select * from hive_metastore.bronze.nba


%md
#### Silver Layer

%md
* Read the stream from the Bronze table
* Add an `eventId` so that duplicate events can be dropped
* Flatten the data into player and team stats (one row per player)
* Write the transformed stream to the Silver layer
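Deduplication only works if identical events receive identical ids. The plain-Python sketch below mirrors the Silver step under that assumption: the id is a hex SHA-256 hash of the raw body (the same idea as Spark's `sha2(col("body"), 256)`), repeated bodies are skipped, and each event is flattened into one row per player, as `explode` does. The function name `to_silver_rows` is hypothetical.

```python
import hashlib
import json
from typing import List


def to_silver_rows(raw_bodies: List[str]) -> List[dict]:
    """Deduplicate raw JSON bodies by content hash and flatten to one row per player."""
    seen_ids = set()
    rows = []
    for body in raw_bodies:
        # Identical payloads hash to the same id (hex SHA-256 of the UTF-8 bytes)
        event_id = hashlib.sha256(body.encode("utf-8")).hexdigest()
        if event_id in seen_ids:
            continue  # duplicate event: drop it
        seen_ids.add(event_id)

        event = json.loads(body)
        for player in event["players"]:  # one output row per player, like explode()
            row = {"eventId": event_id, "gameId": event["gameId"], "date": event["date"]}
            row.update(event["team_stats"])  # team, total_points, total_assists, ...
            row.update(player)               # player, points, assists, ...
            row["quarter"] = event["quarter"]
            row["time"] = event["time"]
            rows.append(row)
    return rows
```

Feeding the same body twice yields the player rows only once, which is the behaviour `dropDuplicates(["eventId"])` is meant to provide.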

In [0]:
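# Read raw streaming data from the Bronze table.
# eventId is a SHA-256 hash of the raw body, so identical events get identical ids
# (a random uuid() differs on every row, so dropDuplicates would never drop anything).
# Note: without a watermark, dropDuplicates keeps every eventId it has seen in state.
df_silver = spark.readStream \
    .format("delta") \
    .table("hive_metastore.bronze.nba") \
    .withColumn("body", col("body").cast("string")) \
    .withColumn("eventId", sha2(col("body"), 256)) \
    .withColumn("body", from_json(col("body"), json_schema)) \
    .select("eventId", "body.*", col("enqueuedTime").alias("timestamp")) \
    .dropDuplicates(["eventId"])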

# Flatten data in Silver Layer
df_silver_flat = df_silver.withColumn("player", explode(col("players"))) \
    .select("eventId", "gameId", "date", "team_stats.team", "team_stats.total_points", "team_stats.total_assists", 
            "team_stats.total_rebounds", "team_stats.total_steals", "team_stats.total_blocks", 
            "team_stats.total_turnovers", "team_stats.total_fouls", "player.player", "player.points", 
            "player.assists", "player.rebounds", "player.steals", "player.blocks", "player.fouls", 
            "player.turnovers", "quarter", "time", "timestamp")

# Write to Silver Table
df_silver_flat.writeStream \
    .option("checkpointLocation", "/mnt/checkpoints/silver/nba_cleaned") \
    .outputMode("append") \
    .format("delta") \
    .toTable("hive_metastore.silver.nba_cleaned")

df_silver_flat.display()

%md
Checking whether data is stored in the Silver NBA table

In [0]:
%sql

select * from hive_metastore.silver.nba_cleaned


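%md
#### Gold Layer

%md
* Read from the Silver layer
* Aggregate player statistics per game and compute a performance rating
* Send an email notification with the top performer and best team
* Add a `flag` column that marks results already merged into the Gold table, intended to ensure only new results are reported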
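The `performance_rating` computed below is a weighted sum of each player's aggregated stats: 1.5 × points + 1.2 × assists + 1.3 × rebounds + 2 × steals + 2 × blocks − 1.5 × turnovers. Fouls are aggregated but do not affect the rating. A standalone Python sketch of the same formula (the names `WEIGHTS` and `performance_rating` are illustrative), applied to LeBron James's line from the sample data:

```python
import math

# Weights used by the performance_rating column in the Gold layer (fouls are not weighted)
WEIGHTS = {
    "points": 1.5,
    "assists": 1.2,
    "rebounds": 1.3,
    "steals": 2.0,
    "blocks": 2.0,
    "turnovers": -1.5,  # turnovers lower the rating
}


def performance_rating(stats: dict) -> float:
    """Weighted sum of a player's stats; missing stats count as 0."""
    return math.fsum(weight * stats.get(stat, 0) for stat, weight in WEIGHTS.items())


lebron = {"player": "LeBron James", "points": 32, "assists": 8, "rebounds": 10,
          "steals": 2, "blocks": 2, "fouls": 2, "turnovers": 2}
print(f"{performance_rating(lebron):.1f}")  # 75.6
```

Worked by hand: 48 + 9.6 + 13 + 4 + 4 − 3 = 75.6.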

In [0]:
# Aggregation in Gold Layer with Date-based Grouping, reading the stream from the Silver table
df_gold = spark.readStream \
    .table("hive_metastore.silver.nba_cleaned") \
    .groupBy("date", "gameId", "team", "player") \
    .agg(
        sum("points").alias("total_points"),
        sum("assists").alias("total_assists"),
        sum("rebounds").alias("total_rebounds"),
        sum("steals").alias("total_steals"),
        sum("blocks").alias("total_blocks"),
        sum("turnovers").alias("total_turnovers"),
        sum("fouls").alias("total_fouls"),
        count("eventId").alias("game_count")
    ) \
    .withColumn("flag", lit(0)) \
    .withColumn(
        "performance_rating",
        col("total_points") * 1.5 +
        col("total_assists") * 1.2 +
        col("total_rebounds") * 1.3 +
        col("total_steals") * 2 +
        col("total_blocks") * 2 -
        col("total_turnovers") * 1.5
    )


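def send_email(subject, body):
    sender_email = "###########"
    receiver_email = "##############"
    # Read the Gmail app password from a Databricks secret scope instead of hard-coding it
    # (the scope and key names are hypothetical; create them before running)
    app_password = dbutils.secrets.get(scope="nba-alerts", key="gmail-app-password")

    msg = MIMEText(body)
    msg["Subject"] = subject
    msg["From"] = sender_email
    msg["To"] = receiver_email

    try:
        # The context manager closes the SMTP connection even if login or sending fails
        with smtplib.SMTP("smtp.gmail.com", 587) as server:
            server.starttls()
            server.login(sender_email, app_password)
            server.sendmail(sender_email, receiver_email, msg.as_string())
    except Exception as e:
        print(f"Error sending email: {e}")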

# Store Top Performer and Best Team
def log_star_performances(batch_df, batch_id):
    top_performer = batch_df.orderBy(col("performance_rating").desc()).limit(1)
    best_team = batch_df.groupBy("team") \
        .agg(sum("performance_rating").alias("team_score")) \
        .orderBy(col("team_score").desc()) \
        .limit(1)

    # first() returns None for an empty batch, so no separate count() is needed
    top_player = top_performer.first()
    best_team_stats = best_team.first()

    if top_player is not None and best_team_stats is not None:
        email_body = f"""
        NBA Best Performance for {top_player['date']}

        🏆 Top Performer: {top_player['player']} ({top_player['team']})
        Performance Rating: {top_player['performance_rating']}

        🔥 Best Team: {best_team_stats['team']}
        """
        send_email("NBA Daily Performance Report", email_body)

    # Write Top Performer and Best Team to Gold Tables
    top_performer.write.mode("overwrite").saveAsTable("hive_metastore.gold.nba_top_performer")
    best_team.write.mode("overwrite").saveAsTable("hive_metastore.gold.nba_best_team")

    # Write to a staging table before merging
    batch_df.write.mode("overwrite").saveAsTable("hive_metastore.gold.nba_staging")

    # Create the MERGE target on the first batch; mode("ignore") is a no-op once it exists
    batch_df.limit(0).write.mode("ignore").saveAsTable("hive_metastore.gold.nba")

    # Join condition for flag updates: match on the full aggregation key, so each
    # target row is matched by at most one source row (Delta rejects multiple matches)
    spark.sql("""
    MERGE INTO hive_metastore.gold.nba g
    USING hive_metastore.gold.nba_staging t
    ON g.date = t.date AND g.gameId = t.gameId AND g.team = t.team AND g.player = t.player
    WHEN MATCHED THEN UPDATE SET g.flag = 1
    WHEN NOT MATCHED THEN INSERT *
    """)


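# Use foreachBatch for Gold Table Writes
# (the checkpoint path follows the Bronze/Silver convention)
df_gold.writeStream \
    .foreachBatch(log_star_performances) \
    .outputMode("complete") \
    .option("checkpointLocation", "/mnt/checkpoints/gold/nba") \
    .start()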

df_gold.display()
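The MERGE keeps `hive_metastore.gold.nba` in step with each batch: rows whose key already exists get `flag = 1`, and unseen rows are inserted with the `flag = 0` set by `lit(0)`. The plain-Python sketch below models those semantics, assuming the match key is the Gold grouping key (`date`, `gameId`, `team`, `player`). With a coarser key such as `team` and `date` alone, several player rows would match the same target row, which Delta Lake's MERGE rejects as an error. Returning the newly inserted rows is one hypothetical way to decide which results are new and worth reporting; `merge_with_flag` is not part of the notebook.

```python
from typing import Dict, List, Tuple

# Match key: the Gold aggregation's groupBy columns
KEY_COLUMNS = ("date", "gameId", "team", "player")


def merge_with_flag(gold: Dict[Tuple, dict], staging: List[dict]) -> List[dict]:
    """Upsert staging rows into gold (keyed by KEY_COLUMNS); return the rows that were new."""
    new_rows = []
    for row in staging:
        key = tuple(row[c] for c in KEY_COLUMNS)
        if key in gold:
            gold[key]["flag"] = 1  # WHEN MATCHED THEN UPDATE SET flag = 1
        else:
            gold[key] = dict(row, flag=0)  # WHEN NOT MATCHED THEN INSERT *
            new_rows.append(row)
    return new_rows
```

Run twice on the same batch, the first call returns every row and the second returns none, while the stored rows flip to `flag = 1`.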




%md
Checking whether data is stored in the Gold NBA tables

In [0]:
%sql
-- Player performance
select * from hive_metastore.gold.nba_top_performer

In [0]:
%sql
-- Team performance
select * from hive_metastore.gold.nba_best_team
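The two Gold tables hold the result of two selections over each batch: the row with the highest `performance_rating`, and the team whose summed rating is highest. A standalone sketch of the same logic in plain Python (the function name is hypothetical) can help check the expected contents of `nba_top_performer` and `nba_best_team` against a handful of rows:

```python
from collections import defaultdict
from typing import List, Optional, Tuple


def top_performer_and_best_team(rows: List[dict]) -> Tuple[Optional[dict], Optional[Tuple[str, float]]]:
    """Return the highest-rated row and the (team, team_score) with the largest summed rating."""
    if not rows:
        return None, None  # empty batch: nothing to report
    top = rows[0]
    team_scores = defaultdict(float)
    for row in rows:
        if row["performance_rating"] > top["performance_rating"]:
            top = row  # ties keep the first row seen
        team_scores[row["team"]] += row["performance_rating"]

    best_team = None
    for team, score in team_scores.items():
        if best_team is None or score > team_scores[best_team]:
            best_team = team
    return top, (best_team, team_scores[best_team])
```

One difference to keep in mind: Spark's `orderBy(...).limit(1)` does not guarantee which row wins a tie, whereas this sketch always keeps the first one seen.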