In [0]:
# Install the required libraries
%pip install azure-eventhub
%pip install nest-asyncio
%pip install vaderSentiment

import asyncio
import nest_asyncio
from azure.eventhub.aio import EventHubConsumerClient
from pyspark.sql.functions import col, udf, avg
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, TimestampType
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from pyspark.sql import SparkSession

# Patch asyncio with nest_asyncio. apply() does not install anything: it
# patches the event loop so that asyncio.run() (called at the bottom of this
# file) can be used while a loop is already running, as in a notebook kernel.
nest_asyncio.apply()

# Create Spark Session if not already created.
# getOrCreate() returns the active session when one exists; the app name only
# applies when a new session is actually created.
spark = SparkSession.builder.appName("EventHubSentimentAnalysis").getOrCreate()

# Setting up the Event Hub consumer.
# NOTE(review): the connection string embeds the shared access key directly in
# source ("ACCESS_KEY" here is a placeholder). A real key should not be
# committed; consider loading it from a secret store instead.
# The EntityPath inside the connection string names the same hub as
# EVENTHUB_NAME below; keep the two in sync if either changes.
CONNECTION_STR = "Endpoint=sb://youtube-comments-namespace.servicebus.windows.net/;SharedAccessKeyName=ListenPolicy;SharedAccessKey=ACCESS_KEY;EntityPath=youtube-comment-stream"
EVENTHUB_NAME = "youtube-comment-stream"
CONSUMER_GROUP = "$Default"

# Define schema for incoming Event Hub messages.
# Each message body is parsed as one JSON object with these fields (see
# on_event). Every field is declared nullable; rows with a null textDisplay
# are filtered out in on_event before sentiment scoring.
schema = StructType([
    StructField("channelId", StringType(), True),
    StructField("videoId", StringType(), True),
    StructField("textDisplay", StringType(), True),  # comment text that gets scored
    StructField("authorDisplayName", StringType(), True),
    StructField("likeCount", IntegerType(), True),
    StructField("publishedAt", TimestampType(), True),
])

# VADER Sentiment Analyzer.
# Single module-level instance, used by get_sentiment (and therefore captured
# by the Spark UDF built from it).
analyzer = SentimentIntensityAnalyzer()

# Delta Lake path that on_event appends scored comments to.
output_delta_path = "/mnt/delta/youtube_comments"

# Define the message limit if you want to stop after a specific number.
# message_count is a global counter incremented by on_event for every event it
# handles; once it reaches message_limit, on_event requests a stop.
message_limit = 1000
message_count = 0

# UDF for sentiment analysis
def get_sentiment(text):
    """Return the VADER compound sentiment score for *text*.

    Args:
        text: The comment text to score, or ``None``.

    Returns:
        The ``'compound'`` entry of ``analyzer.polarity_scores(text)``, or
        ``None`` when *text* is ``None`` so that a null input produces a null
        score instead of raising inside the Spark UDF.
    """
    # Guard against nulls: the analyzer expects a string, and an exception
    # raised here would fail the whole Spark task rather than a single row.
    if text is None:
        return None

    scores = analyzer.polarity_scores(text)
    return scores['compound']  # Only return the compound score

# Spark UDF wrapper around get_sentiment, applied to the textDisplay column in
# on_event.
# NOTE(review): get_sentiment returns VADER's numeric compound score, but the
# UDF's return type is declared as StringType, so the resulting "sentiment"
# column has string type. A numeric type (e.g. DoubleType) looks like the
# intent, but switching changes the schema of the Delta table being appended
# to - confirm with downstream consumers before changing it.
get_sentiment_udf = udf(get_sentiment, StringType())

# Stop signal shared by on_event (which sets it) and receive_events (which
# waits on it). It is created inside receive_events so that it belongs to the
# event loop that is actually running the consumer.
_stop_event = None


def _request_stop():
    """Signal that no more events should be consumed."""
    if _stop_event is None:
        # on_event is being driven by something other than receive_events;
        # keep the previous behaviour of signalling via CancelledError.
        raise asyncio.CancelledError()
    _stop_event.set()


# Async function to fetch and process each message
async def on_event(partition_context, event):
    """Process one Event Hub event: parse it, score it, append it to Delta.

    The event body is parsed as JSON with the module-level ``schema``, rows
    with a null ``textDisplay`` are dropped, a ``sentiment`` column is added
    with ``get_sentiment_udf`` and the result is appended to
    ``output_delta_path``. The global ``message_count`` is incremented for
    every event handled, and a stop is requested once it reaches
    ``message_limit``. Events delivered after the limit has been reached are
    ignored.

    Args:
        partition_context: Context of the partition the event came from; used
            for its ``partition_id`` and to checkpoint the event.
        event: The received event; its body is read with ``body_as_str()``.

    Raises:
        asyncio.CancelledError: Only when the limit is reached and no stop
            signal has been set up by ``receive_events``.
    """
    global message_count

    # Events can still be delivered (for example from other partitions)
    # between the limit being reached and the client being closed. Ignore
    # them so that no more than message_limit messages are written.
    if message_count >= message_limit:
        _request_stop()
        return

    message_count += 1

    event_data = event.body_as_str()  # Extract event data
    print(f"Received message {message_count} from partition {partition_context.partition_id}: {event_data}")

    # NOTE(review): the Spark calls below are synchronous and run one job per
    # message, so they block the event loop while they execute. Batching
    # events before writing would be cheaper - left as-is to keep behaviour.

    # Parse the received message into a DataFrame for processing
    received_df = spark.read.json(spark.sparkContext.parallelize([event_data]), schema=schema)

    # Data cleaning: Remove null values and duplicates
    cleaned_comments_df = received_df.filter(col("textDisplay").isNotNull()).dropDuplicates()

    # Apply Sentiment Analysis on comments
    sentiment_comments_df = cleaned_comments_df.withColumn("sentiment", get_sentiment_udf(col("textDisplay")))

    # Write the DataFrame to Delta table
    sentiment_comments_df.write.format("delta").mode("append").save(output_delta_path)
    print(f"Data written to Delta Lake for message {message_count}")

    # Update checkpoint after processing each message.
    # NOTE(review): the client in receive_events is created without a
    # checkpoint store, so this checkpoint is presumably not persisted
    # anywhere - confirm against the azure-eventhub documentation.
    await partition_context.update_checkpoint(event)

    # Check if message limit is reached, stop receiving more events
    if message_count >= message_limit:
        print(f"Message limit {message_limit} reached. Stopping event processing.")
        _request_stop()


# Function to handle events
async def receive_events():
    """Consume events with ``on_event`` until the message limit is reached.

    ``client.receive()`` keeps running until the client is closed, and an
    exception raised inside the ``on_event`` callback is not a reliable way to
    make it return. It is therefore started as a task, and this coroutine
    waits for either the stop signal set by ``on_event`` or for ``receive()``
    to end by itself, then closes the client.

    Raises:
        Exception: Any error ``client.receive()`` ended with is re-raised
            after the client has been closed.
    """
    global _stop_event
    _stop_event = asyncio.Event()

    client = EventHubConsumerClient.from_connection_string(
        conn_str=CONNECTION_STR,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENTHUB_NAME,
    )

    # Receive events continuously and process them in real-time
    receive_task = asyncio.ensure_future(
        client.receive(
            on_event=on_event,
            starting_position="-1",  # Start from the earliest available event
            starting_position_inclusive=True,
        )
    )
    stop_task = asyncio.ensure_future(_stop_event.wait())

    try:
        # Wake up on the stop signal, or if receive() finishes or fails first.
        await asyncio.wait(
            {receive_task, stop_task},
            return_when=asyncio.FIRST_COMPLETED,
        )
        if _stop_event.is_set():
            print("Event receiving stopped due to message limit.")
    finally:
        stop_task.cancel()
        print("Closing Event Hub consumer.")
        await client.close()
        # Closing the client makes receive() return; awaiting the task
        # surfaces any error it ended with instead of leaving it unretrieved.
        await receive_task

# Entry-point coroutine
async def main():
    """Run the Event Hub consumer and return once ``receive_events`` finishes."""
    consumer_run = receive_events()
    await consumer_run

# Start the process.
# asyncio.run() is invoked at module level; the nest_asyncio.apply() call near
# the top of this file is what permits this when an event loop is already
# running (e.g. inside a notebook). This call blocks until main() returns.
asyncio.run(main())
