In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import col, udf, array_join
from pyspark.sql.types import StringType
import html
import re

# Initialize Spark session
spark = SparkSession.builder.appName("SentimentAnalysis").getOrCreate()

# Load CSV file into DataFrame
file_path = "/FileStore/tables/cleaned_youtube_comments.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Inspect the schema of the DataFrame
df.printSchema()

# Function to clean HTML tags and decode characters
def clean_comment(comment):
    if comment is None:
        return ""
    # Decode HTML entities
    comment = html.unescape(comment)
    # Remove HTML tags
    comment = re.sub(r'<.*?>', '', comment)
    return comment

# Register UDF
clean_comment_udf = udf(clean_comment, StringType())

# Apply UDF to clean comments
df_cleaned = df.withColumn("cleaned_comment", clean_comment_udf(col("comment"))).cache()

# Tokenization
tokenizer = Tokenizer(inputCol="cleaned_comment", outputCol="words")
wordsData = tokenizer.transform(df_cleaned).cache()

# Stop word removal
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filteredData = remover.transform(wordsData).cache()

# Join the elements of the array into a single string
finalData = filteredData.withColumn("filtered_words_str", array_join(col("filtered_words"), " ")).cache()

# Remove duplicates in the final DataFrame
finalDataDistinct = finalData.dropDuplicates()

# Define output path
output_path = "/FileStore/tables/processed_youtube_comments.csv"

# Remove existing directory if exists
try:
    dbutils.fs.rm(output_path, recurse=True)
except Exception as e:
    print(f"Error removing existing directory: {e}")

# Save processed data to CSV
try:
    finalDataDistinct.select("author", "comment", "filtered_words_str", "published_at").write.mode("overwrite").csv(output_path, header=True)
    print("Data saved successfully.")
except Exception as e:
    print(f"Error writing data to CSV: {e}")

# Verify record counts at each stage
initial_count = df.count()
cleaned_count = df_cleaned.count()
tokenized_count = wordsData.count()
filtered_count = filteredData.count()
final_count = finalDataDistinct.count()

print(f"Initial record count: {initial_count}")
print(f"Record count after cleaning: {cleaned_count}")
print(f"Record count after tokenization: {tokenized_count}")
print(f"Record count after stop word removal: {filtered_count}")
print(f"Final distinct record count: {final_count}")


root
 |-- author: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- published_at: string (nullable = true)

Data saved successfully.
Initial record count: 8489
Record count after cleaning: 8489
Record count after tokenization: 8489
Record count after stop word removal: 8489
Final distinct record count: 8479


In [0]:
pip install nltk

Python interpreter will be restarted.
Collecting nltk
  Downloading nltk-3.8.1-py3-none-any.whl (1.5 MB)
Collecting tqdm
  Downloading tqdm-4.66.4-py3-none-any.whl (78 kB)
Collecting regex>=2021.8.3
  Downloading regex-2024.5.15-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (774 kB)
Installing collected packages: tqdm, regex, nltk
Successfully installed nltk-3.8.1 regex-2024.5.15 tqdm-4.66.4
Python interpreter will be restarted.


In [0]:
processed_file_path = "/FileStore/tables/processed_youtube_comments.csv"
processed_df = spark.read.csv(processed_file_path, header=True, inferSchema=True)

# Display the schema and a few rows to verify the data
processed_df.printSchema()
processed_df.show(5)

# Import necessary libraries for sentiment analysis
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Download VADER lexicon
nltk.download('vader_lexicon')

# Initialize VADER sentiment analyzer
sid = SentimentIntensityAnalyzer()

# Define a function to classify sentiment
def analyze_sentiment(text):
    if text is None:
        return "neutral"
    scores = sid.polarity_scores(text)
    if scores['compound'] >= 0.05:
        return "positive"
    elif scores['compound'] <= -0.05:
        return "negative"
    else:
        return "neutral"

# Register the function as a UDF
analyze_sentiment_udf = udf(analyze_sentiment, StringType())

# Apply the UDF to classify sentiment
sentiment_df = processed_df.withColumn("sentiment", analyze_sentiment_udf(col("filtered_words_str")))

# Show some examples
sentiment_df.select("author", "comment", "filtered_words_str", "sentiment").show(5)

# Group by sentiment and count the occurrences
sentiment_counts = sentiment_df.groupBy("sentiment").count()

# Show the aggregated result
sentiment_counts.show()

root
 |-- author: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- filtered_words_str: string (nullable = true)
 |-- published_at: string (nullable = true)

+--------------------+--------------------+--------------------+--------------------+
|              author|             comment|  filtered_words_str|        published_at|
+--------------------+--------------------+--------------------+--------------------+
|     @jameswalsh2427|The Best Presente...|best presenter yo...|2024-06-09T00:47:58Z|
|@circleinforthecu...|i hope we see stu...|hope see stuff ba...|2024-06-01T12:39:58Z|
|       @kristversoza|"<a href=""https:...|"30:01 get rick r...|2024-05-23T11:11:10Z|
|           @nuk4lear|Me: ferrofluid lo...|me: ferrofluid lo...|2024-04-26T02:19:52Z|
|@BeautifulRecitat...|Sir can you pleas...|sir please give u...|2024-04-15T15:05:23Z|
+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



[nltk_data] Downloading package vader_lexicon to /root/nltk_data...


+--------------------+--------------------+--------------------+---------+
|              author|             comment|  filtered_words_str|sentiment|
+--------------------+--------------------+--------------------+---------+
|     @jameswalsh2427|The Best Presente...|best presenter yo...| positive|
|@circleinforthecu...|i hope we see stu...|hope see stuff ba...| positive|
|       @kristversoza|"<a href=""https:...|"30:01 get rick r...|  neutral|
|           @nuk4lear|Me: ferrofluid lo...|me: ferrofluid lo...| positive|
|@BeautifulRecitat...|Sir can you pleas...|sir please give u...| positive|
+--------------------+--------------------+--------------------+---------+
only showing top 5 rows

+---------+-----+
|sentiment|count|
+---------+-----+
| positive| 3740|
|  neutral| 3660|
| negative| 1079|
+---------+-----+



# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import col, udf, array_join
from pyspark.sql.types import StringType
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import html
import re

# Initialize Spark session
spark = SparkSession.builder.appName("SentimentAnalysis").getOrCreate()

# Load CSV file into DataFrame
file_path = "/FileStore/tables/cleaned_youtube_comments.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Function to clean HTML tags and decode characters
def clean_comment(comment):
    if comment is None:
        return ""
    # Decode HTML entities
    comment = html.unescape(comment)
    # Remove HTML tags
    comment = re.sub(r'<.*?>', '', comment)
    return comment

# Register UDF for cleaning comments
clean_comment_udf = udf(clean_comment, StringType())

# Apply UDF to clean comments
df_cleaned = df.withColumn("cleaned_comment", clean_comment_udf(col("comment")))

# Tokenization
tokenizer = Tokenizer(inputCol="cleaned_comment", outputCol="words")
wordsData = tokenizer.transform(df_cleaned)

# Stop word removal
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filteredData = remover.transform(wordsData)

# Join the elements of the array into a single string
finalData = filteredData.withColumn("filtered_words_str", array_join(col("filtered_words"), " "))

# Sentiment analysis function using VaderSentiment
def analyze_sentiment(comment):
    analyzer = SentimentIntensityAnalyzer()
    scores = analyzer.polarity_scores(comment)
    return scores['compound']

# Register UDF for sentiment analysis
analyze_sentiment_udf = udf(analyze_sentiment, StringType())

# Apply the sentiment analysis UDF to the filtered comments
sentiment_df = finalData.withColumn("sentiment_score", analyze_sentiment_udf(col("filtered_words_str")))

# Calculate overall sentiment statistics
sentiment_stats = sentiment_df.agg({
    "sentiment_score": "avg",
    "sentiment_score": "min",
    "sentiment_score": "max"
}).first()

# Print the correct aggregated values
print(f"Average Sentiment Score: {sentiment_stats['avg(sentiment_score)']}")
print(f"Minimum Sentiment Score: {sentiment_stats['min(sentiment_score)']}")
print(f"Maximum Sentiment Score: {sentiment_stats['max(sentiment_score)']}")