In [None]:

from pyspark.sql import SparkSession

# Column functions
from pyspark.sql.functions import (
    col,
    from_unixtime,
    length,
    lower,
    regexp_replace,
    trim,
    when,
)


In [None]:
# Initialize Spark Session (reuses an existing session if one is active).
# Adaptive query execution lets Spark re-plan at runtime and coalesce
# small shuffle partitions after a stage completes.
spark = (
    SparkSession.builder
    .appName("AmazonReviewsSentimentAnalysis")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)

In [None]:
# Data cleaning pipeline
def clean_review_data(df):
    """
    Comprehensive data cleaning for Amazon reviews.

    Filters out unusable rows, adds derived feature columns, and produces a
    normalised copy of the review text.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Raw reviews. Columns read: ``reviewText``, ``overall``,
        ``unixReviewTime`` and ``helpful``. ``helpful`` is accessed with
        ``getItem(0)`` / ``getItem(1)``; presumably it is
        ``[helpful_votes, total_votes]`` -- TODO confirm against the data.

    Returns
    -------
    pyspark.sql.DataFrame
        Rows whose ``reviewText`` contains at least one non-whitespace
        character and whose ``overall`` lies in [1, 5] (inclusive), with these
        added columns:

        - ``review_length``: character length of the original ``reviewText``.
        - ``review_date``: ``from_unixtime(unixReviewTime)``, i.e. a
          ``yyyy-MM-dd HH:mm:ss`` *string* in the session time zone (not a
          date/timestamp type).
        - ``sentiment_label``: ``"positive"`` if overall >= 4, ``"negative"``
          if overall <= 2, otherwise ``"neutral"``.
        - ``helpful_ratio``: helpful[0] / helpful[1] when helpful[1] > 0,
          else 0.0.
        - ``cleaned_text``: lower-cased text with every character that is not
          an ASCII letter, digit or whitespace removed, whitespace runs
          collapsed to a single space, and no leading/trailing space.
    """
    # Keep only reviews with real text and a valid rating. rlike("\\S")
    # requires at least one non-whitespace character, so whitespace-only
    # reviews (which a plain != "" check lets through) are dropped too.
    # A null text or null rating makes the row fail the filter.
    cleaned_df = df.filter(
        col("reviewText").isNotNull()
        & col("reviewText").rlike("\\S")
        & col("overall").between(1, 5)
    )

    # Derived features
    cleaned_df = (
        cleaned_df
        .withColumn("review_length", length(col("reviewText")))
        .withColumn("review_date", from_unixtime(col("unixReviewTime")))
        .withColumn(
            "sentiment_label",
            when(col("overall") >= 4, "positive")
            .when(col("overall") <= 2, "negative")
            .otherwise("neutral"),
        )
        .withColumn(
            "helpful_ratio",
            # Avoid dividing by zero: rows where helpful[1] is not > 0
            # (including null) fall through to 0.0.
            when(
                col("helpful").getItem(1) > 0,
                col("helpful").getItem(0) / col("helpful").getItem(1),
            ).otherwise(0.0),
        )
    )

    # Text preprocessing: lower-case, strip punctuation/symbols, collapse
    # whitespace, then trim. Collapsing alone leaves a single leading or
    # trailing space (e.g. "  Great!  " -> " great "), which whitespace-split
    # tokenizers turn into empty tokens. After the collapse every remaining
    # whitespace character is a plain space, which is what trim() removes.
    text = lower(col("reviewText"))
    text = regexp_replace(text, "[^a-zA-Z0-9\\s]", "")
    text = regexp_replace(text, "\\s+", " ")
    cleaned_df = cleaned_df.withColumn("cleaned_text", trim(text))

    return cleaned_df
    
# Location of the raw review dump (JSON lines on HDFS); kept in one named
# constant so it is easy to point the notebook at a different dataset.
REVIEWS_PATH = "hdfs://namenode:9000/user/data/amazon_reviews/"

# No schema is supplied, so spark.read.json scans the input to infer one.
raw_reviews = spark.read.json(REVIEWS_PATH)

# cache() only marks the DataFrame for caching and returns that same
# DataFrame; the count() below is the first action, so it both materialises
# the cache and reports how many rows survived cleaning.
processed_reviews = clean_review_data(raw_reviews).cache()

cleaned_count = processed_reviews.count()
print(f"Records after cleaning: {cleaned_count}")