In [1]:
import logging
from pathlib import Path

import pandas as pd
import torch
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType, MapType, FloatType, StructType, StructField
from tqdm import tqdm  # Import tqdm
from transformers import pipeline

In [2]:
# Mount Google Drive; the input CSV is read from, and the result written to, paths below this mount point.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [3]:
# Start (or reuse) the Spark session used by the rest of the notebook
spark = (
    SparkSession.builder
    .appName("SentimentAnalysis")
    .getOrCreate()
)

# Return type of the sentiment UDF: predicted label, per-label scores,
# and a free-text debug message describing what happened for the row.
# All three fields are nullable.
schema = (
    StructType()
    .add("sentiment", StringType(), True)
    .add("probabilities", MapType(StringType(), FloatType()), True)
    .add("debug_message", StringType(), True)
)

# Send INFO-and-above log records to a file next to the notebook
logging.basicConfig(
    filename='sentiment_analysis.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

@pandas_udf(schema)
def get_sentiment_probabilities_udf(review_text_series):
    """Classify one batch of review texts with the SST-2 DistilBERT pipeline.

    Args:
        review_text_series: pandas Series of review texts. Entries that are
            None, not a string, or blank are not sent to the model.

    Returns:
        pandas.DataFrame with one row per input entry and three columns:
        ``sentiment`` (highest-scoring label, or None), ``probabilities``
        (dict mapping every label to its score, or None) and
        ``debug_message`` (success note, skip reason, or error text).
    """
    # NOTE(review): presumably imported locally so the names resolve inside
    # the Spark Python worker that executes the UDF — confirm before removing.
    from transformers import pipeline
    import torch

    # Use the first GPU when one is visible, otherwise run on CPU.
    device = 0 if torch.cuda.is_available() else -1
    # NOTE(review): the pipeline is rebuilt on every call of this function.
    classifier = pipeline(
        "sentiment-analysis",
        model="distilbert-base-uncased-finetuned-sst-2-english",
        return_all_scores=True,
        framework="pt",
        device=device
    )

    # The model accepts at most 512 *tokens*. Let the tokenizer enforce that
    # limit instead of slicing the string to 512 characters: character slicing
    # throws away most of a long review and still does not guarantee the
    # token limit for text that tokenizes densely.
    max_length = 512
    tokenizer_kwargs = {"truncation": True, "max_length": max_length}

    sentiments = []
    probabilities_list = []
    debug_messages = []

    for review_text in tqdm(review_text_series, desc="Processing reviews", unit="review"):
        # Missing, non-string, or whitespace-only input: emit a null result row.
        if review_text is None or not isinstance(review_text, str) or review_text.strip() == '':
            sentiments.append(None)
            probabilities_list.append(None)
            debug_messages.append("Input was None or empty")
            continue

        try:
            # With return_all_scores=True the pipeline yields, for a single
            # string, a one-element list holding a list of {label, score} dicts.
            result = classifier(review_text, **tokenizer_kwargs)[0]
            best_result = max(result, key=lambda x: x['score'])  # Highest-scoring label wins
            sentiment = best_result['label']
            probabilities = {item['label']: item['score'] for item in result}
            debug_messages.append(f"Success: {review_text[:30]} -> {sentiment}")
        except Exception as e:
            # Best effort: one bad review must not fail the whole batch, so the
            # error is recorded in the debug column and the row gets null results.
            sentiment = None
            probabilities = None
            debug_messages.append(f"Error: {str(e)}")

        sentiments.append(sentiment)
        probabilities_list.append(probabilities)

    return pd.DataFrame({'sentiment': sentiments, 'probabilities': probabilities_list, 'debug_message': debug_messages})

In [4]:
# Load the ratings CSV from Drive with pandas, then hand the frame to Spark
df_pandas = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/data/rating.csv')
df_spark = spark.createDataFrame(df_pandas)

# Run the sentiment UDF over the review text; its struct result lands in one temporary column
struct_column_name = "sentiment_probabilities"
df_with_sentiments = df_spark.withColumn(
    struct_column_name,
    get_sentiment_probabilities_udf(df_spark["review_text"]),
)

# Promote each struct field to a top-level column of the same name, then discard the struct
struct_column = df_with_sentiments[struct_column_name]
flattened_fields = [
    struct_column[field_name].alias(field_name)
    for field_name in ('sentiment', 'probabilities', 'debug_message')
]
df_with_sentiments = df_with_sentiments.select('*', *flattened_fields).drop(struct_column_name)

# Bring the labelled result back as a pandas DataFrame
df_with_sentiments_pd = df_with_sentiments.toPandas()

In [5]:
# Inspect the labelled frame: column names first, then its (rows, columns) shape
labelled_columns = df_with_sentiments_pd.columns
display(labelled_columns)
print(f"Dataset shape: {df_with_sentiments_pd.shape}")

Index(['reviewid', 'userid', 'movieid', 'movie_title', 'rating', 'review_date',
       'review_text', 'user_href', 'review_href', 'download_flag', 'sentiment',
       'probabilities', 'debug_message'],
      dtype='object')

Dataset shape: (10468, 13)


In [6]:
# Remove the helper columns, then keep only rows that have both a review text
# and a sentiment label (rows the model skipped or failed on have a null sentiment).
# errors='ignore' and the absence of inplace=True make this cell safe to re-run:
# a second run no longer fails because the helper columns are already gone.
df_with_sentiments_pd = (
    df_with_sentiments_pd
    .drop(columns=['probabilities', 'debug_message'], errors='ignore')
    .dropna(subset=['review_text', 'sentiment'])
)

In [7]:
# Check which columns remain and how many rows are left after the filtering step
remaining_columns, remaining_shape = df_with_sentiments_pd.columns, df_with_sentiments_pd.shape
display(remaining_columns)
print(remaining_shape)

Index(['reviewid', 'userid', 'movieid', 'movie_title', 'rating', 'review_date',
       'review_text', 'user_href', 'review_href', 'download_flag',
       'sentiment'],
      dtype='object')

(10468, 11)


In [8]:
# Save the labelled reviews to a new CSV file on Drive
OUTPUT_CSV = Path('/content/drive/MyDrive/Colab Notebooks/output/rating_auto_label_sentiment_two_classes.csv')
# to_csv does not create missing folders; make sure the output directory exists
# so a fresh Drive does not fail only after the expensive labelling has finished.
OUTPUT_CSV.parent.mkdir(parents=True, exist_ok=True)
df_with_sentiments_pd.to_csv(OUTPUT_CSV, index=False)
# The debug_message column was dropped before saving, so the message must not claim it is included.
print("The sentiment analysis has been added to the CSV file.")


The sentiment analysis has been added to the CSV file, along with debug messages.
