# **Project 2**


## Task 1: Data Loading, Cleansing, and Exploration

In [None]:
# Step 1: Import Required Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, count, when, regexp_extract

# Step 2: Initialize PySpark Session
# getOrCreate() hands back the already-active session if one exists.
spark = (
    SparkSession.builder
    .appName("Retail and Sentiment Analytics")
    .getOrCreate()
)

# Step 3: Load Datasets
# Both files carry a header row; column types are inferred from the data.
csv_read_options = {"header": True, "inferSchema": True}
sentiment_df = spark.read.csv("reviews.csv", **csv_read_options)
retail_df = spark.read.csv("Copy of Online Retail.csv", **csv_read_options)

# Step 4: Preview the Data -- first five rows of each raw frame, untruncated
for heading, frame in (
    ("Initial Data Preview - Reviews DataFrame:", sentiment_df),
    ("Initial Data Preview - Online Retail DataFrame:", retail_df),
):
    print(heading)
    frame.show(5, truncate=False)

# Step 5: Confirm Schema -- check what inferSchema decided for each column
for heading, frame in (
    ("Schema for Reviews DataFrame:", sentiment_df),
    ("Schema for Online Retail DataFrame:", retail_df),
):
    print(heading)
    frame.printSchema()

# Step 6: Handle Missing Values
def _null_counts(frame):
    """Return a single-row DataFrame holding the number of NULL cells in each column of *frame*."""
    return frame.select(
        [count(when(col(name).isNull(), name)).alias(name) for name in frame.columns]
    )


print("Missing values in Reviews DataFrame:")
_null_counts(sentiment_df).show()

print("Missing values in Online Retail DataFrame:")
_null_counts(retail_df).show()

# Drop every row that has a NULL in any column, then
# Step 7: Remove Duplicates -- identical rows are kept only once.
sentiment_df = sentiment_df.dropna(how='any').dropDuplicates()
retail_df = retail_df.dropna(how='any').dropDuplicates()

# Step 8: Validate and Convert Dates in Online Retail DataFrame
# Expected InvoiceDate layout: "dd-MM-yyyy HH:mm".
# The pattern is anchored (^...$) so that strings which merely *contain* a
# date-like substring (extra digits, trailing text) are rejected instead of
# silently passing the check.
INVOICE_DATE_PATTERN = r"^\d{2}-\d{2}-\d{4} \d{2}:\d{2}$"
INVOICE_DATE_FORMAT = "dd-MM-yyyy HH:mm"

# regexp_extract returns "" when the whole value does not match the pattern.
retail_df = retail_df.withColumn(
    "ValidDate", regexp_extract(col("InvoiceDate"), INVOICE_DATE_PATTERN, 0)
)

# Report the distinct InvoiceDate values that do not have the expected layout
invalid_dates = retail_df.filter(col("ValidDate") == "").select("InvoiceDate").distinct()
if invalid_dates.count() > 0:
    print("Rows with invalid InvoiceDate format:")
    invalid_dates.show()

# Keep only well-formed rows and discard the helper column
retail_df = retail_df.filter(col("ValidDate") != "").drop("ValidDate")

# Convert InvoiceDate to Timestamp
retail_df = retail_df.withColumn("InvoiceDate", to_timestamp(col("InvoiceDate"), INVOICE_DATE_FORMAT))

# A value can have the right shape and still not be a real date/time. When the
# parser does not raise, to_timestamp returns NULL for such values; drop them so
# NULL dates cannot leak into the YEAR()/MONTH() aggregations downstream.
unparseable_dates = retail_df.filter(col("InvoiceDate").isNull()).count()
if unparseable_dates > 0:
    print("Rows dropped because InvoiceDate could not be parsed:", unparseable_dates)
retail_df = retail_df.filter(col("InvoiceDate").isNotNull())

# Step 9: Post-Cleansing Preview -- first five rows of each cleaned frame
for heading, frame in (
    ("Data Preview after Cleansing - Reviews DataFrame:", sentiment_df),
    ("Data Preview after Cleansing - Online Retail DataFrame:", retail_df),
):
    print(heading)
    frame.show(5, truncate=False)

# Step 10: Post-Cleansing Row Count
for heading, frame in (
    ("Post-cleansing number of rows in Reviews DataFrame:", sentiment_df),
    ("Post-cleansing number of rows in Online Retail DataFrame:", retail_df),
):
    print(heading, frame.count())

# Step 11: Summary Statistics for Validation (count/mean/stddev/min/max per column)
for heading, frame in (
    ("Summary Statistics for Reviews DataFrame:", sentiment_df),
    ("Summary Statistics for Online Retail DataFrame:", retail_df),
):
    print(heading)
    frame.describe().show()

# Step 12: Additional Checks -- distinct values of the main categorical column of each frame
for heading, frame, category_column in (
    ("Distinct Sentiments in Reviews DataFrame:", sentiment_df, "Sentiment"),
    ("Distinct Countries in Online Retail DataFrame:", retail_df, "Country"),
):
    print(heading)
    frame.select(category_column).distinct().show()


Initial Data Preview - Reviews DataFrame:
+-------------------------------------------------------------------------------------------+---------+
|Review                                                                                     |Sentiment|
+-------------------------------------------------------------------------------------------+---------+
|This product exceeded my expectations! It's high-quality and performs exceptionally well.  |Positive |
|The product was decent. It worked fine, but it wasn't anything special.                    |Neutral  |
|I had a terrible experience with this company. The customer service was rude and unhelpful.|Negative |
|It's an okay product. Nothing to write home about.                                         |Neutral  |
|Disappointed with the product. It didn't meet my expectations.                             |Negative |
+-------------------------------------------------------------------------------------------+---------+
only showing top 5 rows

## Task 2: Sales Data Aggregation and Feature Engineering

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Step 1: Register DataFrame as SQL Temporary View
# Makes the cleaned retail data queryable from Spark SQL as `online_retail`.
retail_df.createOrReplaceTempView("online_retail")

# Step 2: Total Sales Per Product Per Month
# Revenue (Quantity * UnitPrice) summed per product and calendar month, listed
# chronologically with the highest-grossing product first inside each month.
monthly_product_sales_query = """
    SELECT
        StockCode,
        YEAR(InvoiceDate) AS Year,
        MONTH(InvoiceDate) AS Month,
        SUM(Quantity * UnitPrice) AS TotalSales
    FROM online_retail
    GROUP BY StockCode, Year, Month
    ORDER BY Year, Month, TotalSales DESC
"""
sales_per_product_month = spark.sql(monthly_product_sales_query)

print("Total Sales Per Product Per Month:")
sales_per_product_month.show(10, truncate=False)

# Step 3: Average Revenue Per Customer Segment
# Customers are segmented by how much they actually spent. (Segmenting on the
# CustomerID value itself is meaningless: IDs are arbitrary identifiers, so a
# threshold such as "ID < 18000" says nothing about customer value.)
# Each customer's total revenue is ranked and NTILE(3) splits the customers into
# three equally sized groups: lowest third, middle third, top third.
# NOTE: the window has no PARTITION BY, so Spark evaluates it in a single
# partition -- acceptable while the number of distinct customers is modest.
customer_segments = spark.sql("""
    WITH customer_revenue AS (
        SELECT
            CustomerID,
            SUM(Quantity * UnitPrice) AS TotalRevenue
        FROM online_retail
        GROUP BY CustomerID
    ),
    ranked_customers AS (
        SELECT
            CustomerID,
            TotalRevenue,
            NTILE(3) OVER (ORDER BY TotalRevenue) AS RevenueTercile
        FROM customer_revenue
    )
    SELECT
        CustomerID,
        TotalRevenue,
        CASE RevenueTercile
            WHEN 1 THEN 'Low-Value Customers'
            WHEN 2 THEN 'Mid-Value Customers'
            ELSE 'High-Value Customers'
        END AS CustomerSegment
    FROM ranked_customers
""")

# Average total revenue inside each segment, lowest segment first.
avg_revenue_per_segment = (
    customer_segments
    .groupBy("CustomerSegment")
    .agg(F.avg("TotalRevenue").alias("AvgRevenue"))
    .orderBy("AvgRevenue")
)
print("Average Revenue Per Customer Segment:")
avg_revenue_per_segment.show()

# Step 4: Seasonal Patterns for Top-Selling Products
# Revenue per (product, calendar month), largest first. The query groups by
# MONTH() only, so the same month from different years is pooled into one total.
seasonal_patterns_query = """
    SELECT
        StockCode,
        MONTH(InvoiceDate) AS Month,
        SUM(Quantity * UnitPrice) AS MonthlySales
    FROM online_retail
    GROUP BY StockCode, Month
    ORDER BY MonthlySales DESC
"""
seasonal_patterns = spark.sql(seasonal_patterns_query)

print("Seasonal Patterns for Top-Selling Products:")
seasonal_patterns.show(10, truncate=False)

# Step 5: Feature Engineering

# Derive Customer Lifetime Value (CLV): here simply the customer's total
# historical spend, i.e. the sum of Quantity * UnitPrice over all their lines.
line_revenue = F.col("Quantity") * F.col("UnitPrice")
customer_lifetime_value = (
    retail_df
    .groupBy("CustomerID")
    .agg(F.sum(line_revenue).alias("CustomerLifetimeValue"))
)
print("Customer Lifetime Value:")
customer_lifetime_value.show(10, truncate=False)

# Derive Product Popularity Score
# Popularity = Total Quantity Sold + Number of Unique Customers + Total Revenue
# NOTE(review): the three terms are in different units (items, head-count,
# currency) and are added without normalisation, so the larger-magnitude terms
# dominate the score.
line_revenue = F.col("Quantity") * F.col("UnitPrice")
product_popularity = (
    retail_df
    .groupBy("StockCode")
    .agg(
        F.sum("Quantity").alias("TotalQuantitySold"),
        F.countDistinct("CustomerID").alias("UniqueCustomers"),
        F.sum(line_revenue).alias("TotalRevenue"),
    )
    .withColumn(
        "PopularityScore",
        F.col("TotalQuantitySold") + F.col("UniqueCustomers") + F.col("TotalRevenue"),
    )
)
print("Product Popularity Score:")
product_popularity.orderBy(F.desc("PopularityScore")).show(10, truncate=False)

# Derive Seasonal Trends
# Month -> season lookup (Dec-Feb Winter, Mar-May Spring, Jun-Aug Summer,
# Sep-Nov Autumn), folded into a single CASE WHEN ... expression.
season_months = (
    ("Winter", [12, 1, 2]),
    ("Spring", [3, 4, 5]),
    ("Summer", [6, 7, 8]),
    ("Autumn", [9, 10, 11]),
)
season_expr = None
for season_name, months in season_months:
    in_season = F.col("Month").isin(months)
    if season_expr is None:
        season_expr = F.when(in_season, season_name)
    else:
        season_expr = season_expr.when(in_season, season_name)

# Revenue per (product, calendar month), tagged with the month's season.
seasonal_trends = (
    retail_df
    .withColumn("Month", F.month("InvoiceDate"))
    .groupBy("StockCode", "Month")
    .agg(F.sum(F.col("Quantity") * F.col("UnitPrice")).alias("MonthlyRevenue"))
    .withColumn("Trend", season_expr)
)
print("Seasonal Trends:")
seasonal_trends.show(10, truncate=False)


Total Sales Per Product Per Month:
+---------+----+-----+------------------+
|StockCode|Year|Month|TotalSales        |
+---------+----+-----+------------------+
|22423    |2010|12   |16608.900000000005|
|85123A   |2010|12   |8245.649999999994 |
|21623    |2010|12   |6938.489999999998 |
|82484    |2010|12   |6672.330000000001 |
|21137    |2010|12   |6248.820000000001 |
|79321    |2010|12   |6199.34           |
|22189    |2010|12   |5532.630000000002 |
|22188    |2010|12   |5157.790000000001 |
|22328    |2010|12   |4769.999999999997 |
|22086    |2010|12   |4575.799999999996 |
+---------+----+-----+------------------+
only showing top 10 rows

Average Revenue Per Customer Segment:
+-------------------+-----------------+
|    CustomerSegment|       AvgRevenue|
+-------------------+-----------------+
|Low-Value Customers|619.9361326530609|
|Mid-Value Customers|970.1955319148936|
+-------------------+-----------------+

Seasonal Patterns for Top-Selling Products:
+---------+-----+-----------


## Task 3: Demand Forecasting Model

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Step 1: Prepare the Data for Time Series Forecasting
# The five StockCodes with the highest total revenue (Quantity * UnitPrice).
popular_products = (
    retail_df
    .groupBy("StockCode")
    .agg(F.sum(F.col("Quantity") * F.col("UnitPrice")).alias("TotalRevenue"))
    .orderBy(F.desc("TotalRevenue"))
    .limit(5)
    .select("StockCode")
)

# Restrict the transactions to those products and truncate the timestamp to a
# date, then collapse to one row per (StockCode, day) with the summed Quantity.
popular_data = (
    retail_df
    .join(popular_products, on="StockCode", how="inner")
    .withColumn("InvoiceDate", F.to_date("InvoiceDate"))
)
daily_sales = popular_data.groupBy("StockCode", "InvoiceDate").agg(
    F.sum("Quantity").alias("DailySales")
)

# Lag features lag_1 .. lag_7: DailySales of the 1st .. 7th *preceding row* of
# the same product. Rows exist only for days with at least one transaction, so
# these are the previous seven transaction days -- not necessarily the previous
# seven calendar days.
window_spec = Window.partitionBy("StockCode").orderBy("InvoiceDate")
daily_sales = daily_sales.select(
    "*",
    *[F.lag("DailySales", k).over(window_spec).alias(f"lag_{k}") for k in range(1, 8)],
)

# The earliest rows of each product have incomplete lags (NULL); drop them.
daily_sales = daily_sales.dropna()

# Step 2: Assemble Features and Labels
# The seven lag columns become one "features" vector; DailySales is the label.
assembler = VectorAssembler(
    inputCols=[f"lag_{lag}" for lag in range(1, 8)],
    outputCol="features"
)
# StockCode/InvoiceDate are kept so the split below can be done by date.
daily_sales = assembler.transform(daily_sales).select(
    "StockCode", "InvoiceDate", "features", "DailySales"
)

# Step 3: Split Data into Training and Testing -- chronologically.
# A random split leaks the future into training: a row's DailySales is the next
# row's lag_1 (and later rows' lag_2..lag_7), so randomly mixing rows lets the
# model see test labels as training features. Train on the earliest ~80% of the
# distinct dates and evaluate on the later ones instead.
train_fraction = 0.8
distinct_dates = sorted(
    row["InvoiceDate"] for row in daily_sales.select("InvoiceDate").distinct().collect()
)
if len(distinct_dates) < 2:
    raise ValueError("Not enough distinct dates for a chronological train/test split")
# int(n * 0.8) <= n - 1 for n >= 2, so both sides receive at least one date.
cutoff_date = distinct_dates[int(len(distinct_dates) * train_fraction)]
train_data = daily_sales.filter(F.col("InvoiceDate") < cutoff_date).select("features", "DailySales")
test_data = daily_sales.filter(F.col("InvoiceDate") >= cutoff_date).select("features", "DailySales")

# Step 4: Build the Regression Model Pipeline
# Standardise the lag vector, then fit a (regularised) linear regression on it.
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
lr = LinearRegression(
    featuresCol="scaledFeatures", labelCol="DailySales", predictionCol="prediction"
)
pipeline = Pipeline(stages=[scaler, lr])

# Step 5: Hyperparameter Tuning with CrossValidator
# 3 x 3 grid over the regularisation strength and the elastic-net mixing parameter.
param_grid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 1.0])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

evaluator = RegressionEvaluator(
    labelCol="DailySales", predictionCol="prediction", metricName="rmse"
)

# NOTE(review): CrossValidator assigns rows to folds at random, ignoring time
# order, so hyperparameter selection is not time-aware.
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
)

# Step 6: Train the Model (grid search with 3-fold cross-validation)
cv_model = crossval.fit(train_data)

# Step 7: Evaluate the Model on Test Data
# The same evaluator is reused; the param-map override only applies to that call.
predictions = cv_model.transform(test_data)
rmse, mae = [
    evaluator.evaluate(predictions, {evaluator.metricName: metric_name})
    for metric_name in ("rmse", "mae")
]

print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"Mean Absolute Error (MAE): {mae}")

# Step 8: Display Predictions
print("Sample Predictions:")
predictions.select("features", "DailySales", "prediction").show(10, truncate=False)


Root Mean Squared Error (RMSE): 109.97027900797575
Mean Absolute Error (MAE): 86.26772043009171
Sample Predictions:
+------------------------------------------+----------+------------------+
|features                                  |DailySales|prediction        |
+------------------------------------------+----------+------------------+
|[0.0,192.0,562.0,63.0,24.0,33.0,24.0]     |8         |90.70934065160615 |
|[1.0,13.0,48.0,16.0,97.0,37.0,149.0]      |2         |66.97378520138594 |
|[2.0,1.0,2.0,2.0,1.0,2.0,10.0]            |1         |64.28218979835289 |
|[2.0,10.0,1.0,10.0,2.0,3.0,1008.0]        |1         |62.32870598085804 |
|[10.0,1.0,10.0,2.0,3.0,1008.0,2.0]        |2         |160.86110754405468|
|[14.0,6.0,6.0,2.0,10.0,27.0,18.0]         |24        |66.06005655063524 |
|[18.0,26.0,3.0,30.0,4.0,18.0,20.0]        |27        |65.40887860570464 |
|[24.0,328.0,69.0,80.0,58.0,1.0,14.0]      |24        |95.69075514482255 |
|[42.0,151.0,188.0,149.0,321.0,136.0,198.0]|303       |78.3

## Task 4: Sentiment Analysis on Customer Reviews

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, when, regexp_replace, lower
from pyspark.ml.feature import Tokenizer, StopWordsRemover, NGram, CountVectorizer, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.pipeline import Pipeline
from pyspark.sql.types import DoubleType

# Initialize Spark session (getOrCreate returns the running session if there is one)
spark = SparkSession.builder \
    .appName("Customer Reviews Sentiment Analysis") \
    .getOrCreate()

# Load the reviews dataset
file_path = "/content/reviews.csv"  # Replace with your dataset path
data = spark.read.csv(file_path, header=True, inferSchema=True)

# Preprocessing: drop incomplete rows, strip non-letters, lowercase, de-duplicate.
# - NULL Review/Sentiment values would break the tokenizer and the StringIndexer
#   (whose default handleInvalid is "error"), so they are removed first.
# - De-duplicating *after* normalisation matters: identical reviews that end up in
#   both the training and the test split would inflate the reported accuracy.
#   Task 1 applies the same dropna/dropDuplicates cleansing to this dataset.
data_cleaned = (
    data
    .dropna(subset=["Review", "Sentiment"])
    .withColumn("Review", regexp_replace(col("Review"), "[^a-zA-Z\\s]", ""))
    .withColumn("Review", lower(col("Review")))
    .dropDuplicates()
)

# Text-classification pipeline:
#   Review -> tokens -> filtered_tokens (stop words removed) -> ngrams (bigrams)
#   -> features (term counts);  Sentiment -> label;  LogisticRegression(features, label).
# NOTE(review): only bigrams are used, so single-word cues are not features on their
# own, and a review with fewer than two remaining tokens gets an empty vector.
tokenizer = Tokenizer(inputCol="Review", outputCol="tokens")
stop_words_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
ngram = NGram(n=2, inputCol="filtered_tokens", outputCol="ngrams")

# Sentiment strings become numeric class indices for the classifier.
label_indexer = StringIndexer(inputCol="Sentiment", outputCol="label")

vectorizer = CountVectorizer(inputCol="ngrams", outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label")

pipeline_stages = [tokenizer, stop_words_remover, ngram, label_indexer, vectorizer, lr]
pipeline = Pipeline(stages=pipeline_stages)

# Hold out 20% of the reviews for evaluation (fixed seed -> reproducible split).
train_data, test_data = data_cleaned.randomSplit([0.8, 0.2], seed=42)

# Every pipeline stage is fitted on the training split only.
model = pipeline.fit(train_data)

# Score the held-out reviews and report plain classification accuracy.
predictions = model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print(f"Test Set Accuracy: {accuracy:.2f}")


from pyspark.sql import functions as F

# Numeric sentiment score per row: Positive -> 1.0, Negative -> -1.0, anything
# else (Neutral, unexpected labels, NULL) -> 0.0.
# A native column expression replaces the former Python UDF: same values, but it
# is evaluated inside the JVM instead of shipping every row to a Python worker.
# It also makes a separate NULL check unnecessary, because comparing NULL yields
# NULL, which falls through to otherwise(0.0).
sentiment_score_expr = (
    when(col("Sentiment") == "Positive", 1.0)
    .when(col("Sentiment") == "Negative", -1.0)
    .otherwise(0.0)
)

# Add sentiment_score column
scored_data = data_cleaned.withColumn("sentiment_score", sentiment_score_expr)

# Debug: Verify sentiment scores
scored_data.select("Review", "Sentiment", "sentiment_score").show()

# Sum the scores of all rows that share the same (cleaned) Review text
aggregate_score = scored_data.groupBy("Review").agg(
    F.sum("sentiment_score").alias("aggregate_sentiment_score")
)

# Display aggregate sentiment scores
aggregate_score.show()

# Stop the Spark session
spark.stop()


Test Set Accuracy: 0.91
+--------------------+---------+---------------+
|              Review|Sentiment|sentiment_score|
+--------------------+---------+---------------+
|this product exce...| Positive|            1.0|
|the product was d...|  Neutral|            0.0|
|i had a terrible ...| Negative|           -1.0|
|its an okay produ...|  Neutral|            0.0|
|disappointed with...| Negative|           -1.0|
|avoid this compan...| Negative|           -1.0|
|i had a terrible ...| Negative|           -1.0|
|avoid this compan...| Negative|           -1.0|
|this product exce...| Positive|            1.0|
|this product is o...| Positive|            1.0|
|absolutely horren...| Negative|           -1.0|
|i had a terrible ...| Negative|           -1.0|
|this experience w...|  Neutral|            0.0|
|topquality produc...| Positive|            1.0|
|im extremely diss...| Negative|           -1.0|
|the service was s...|  Neutral|            0.0|
|the product was d...|  Neutral|            0