# Data Analysis & Visualization Assignment # 0
# Muhammad Kashif
# 21i-0851
# -----------------------------------------
# Muhammad Huzaifa
# 21i-2460

## Setting Up & Downloading All Necessary Libraries & Datasets

In [1]:
# Install Java and Spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -qO - https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz | tar xz
!pip install findspark pyspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
# Downloading the Amazon Review Dataset
!wget https://datarepo.eng.ucsd.edu/mcauley_group/data/amazon_v2/categoryFiles/All_Amazon_Review.json.gz

--2025-03-02 10:25:49--  https://datarepo.eng.ucsd.edu/mcauley_group/data/amazon_v2/categoryFiles/All_Amazon_Review.json.gz
Resolving datarepo.eng.ucsd.edu (datarepo.eng.ucsd.edu)... 132.239.8.30
Connecting to datarepo.eng.ucsd.edu (datarepo.eng.ucsd.edu)|132.239.8.30|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 35974228440 (34G) [application/x-gzip]
Saving to: ‘All_Amazon_Review.json.gz’


2025-03-02 11:17:38 (11.0 MB/s) - ‘All_Amazon_Review.json.gz’ saved [35974228440/35974228440]



## All Libraries

In [None]:
# Pyspark Function Modules
from pyspark.sql.functions import approx_count_distinct
from pyspark.sql.functions import col
from pyspark.sql.functions import sum

# Pyspark Types
from pyspark.sql.types import StructField
from pyspark.sql.types import IntegerType
from pyspark.sql.types import BooleanType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructType

# Pyspark Modules
from pyspark.sql import SparkSession
import findspark

# Google Colab Specific Library
from google.colab import files

# Other Necessary Libraries
import logging
import time
import os

### All Declarations

In [None]:
file_path = "/content/All_Amazon_Review.json.gz"

## All Functions

### Setup Functions

In [29]:
# Set up Logging to a File
def setup_logging():
    log_filename = "logs.txt"
    logging.basicConfig(filename=log_filename, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
    return log_filename

# Set up Spark Environment
def setup_spark():
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
    os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"
    findspark.init()

    spark = SparkSession.builder \
        .appName("AmazonReviews") \
        .config("spark.driver.memory", "4g") \
        .config("spark.sql.shuffle.partitions", "200") \
        .getOrCreate()

    logging.info("Spark Session Created")
    return spark

### Utility Functions

In [30]:
# Define Schema for Faster Loading
def get_schema():
    return StructType([
        StructField("reviewerID", StringType(), True),
        StructField("asin", StringType(), True),
        StructField("reviewText", StringType(), True),
        StructField("overall", IntegerType(), True),
        StructField("unixReviewTime", IntegerType(), True),
        StructField("summary", StringType(), True),
        StructField("verified", BooleanType(), True),
        StructField("vote", StringType(), True),
        StructField("reviewTime", StringType(), True)
    ])

# Load Data with Logging & Error Handling
def load_data(spark, file_path, schema):
    start_time = time.time()
    logging.info(f"Data loading started at {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))}")

    try:
        df = spark.read.json(file_path, schema=schema, mode="PERMISSIVE")
        record_count = df.count()

        logging.info(f"Data successfully loaded! Total records: {record_count}")

    except Exception as e:
        logging.error(f"Error during loading: {e}")
        return None, 0

    end_time = time.time()
    elapsed_time = end_time - start_time

    logging.info(f"Data loading completed at {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))}")
    logging.info(f"Total time taken: {elapsed_time:.2f} seconds")

    return df, record_count

# Partition Data for Performance
def partition_data(df):
    partitioned_path = "partitioned_reviews"

    # Partitioning by 'overall' rating
    df.write.mode("overwrite").partitionBy("overall").parquet(partitioned_path)

    logging.info(f"Data partitioned and saved to: {partitioned_path}")
    print(f"Partitioned data saved at: {partitioned_path}")

# Save Logs to a File in Colab
def download_logs(log_filename):
    print("\nDownloading log file...")
    from google.colab import files
    files.download(log_filename)

### Main Driver Function

In [31]:
# Validate Data + Memory Usage Stats
def validate_data(df):
    logging.info("Validating data...")

    # Display sample rows
    df.show(5, truncate=False)

    # Check for null values in key columns
    important_columns = ["reviewerID", "asin", "reviewText", "overall"]
    null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in important_columns])

    print("\nChecking for Null Values:")
    null_counts.show()

    # Count malformed records
    corrupt_records = df.filter(df.reviewerID.isNull() & df.asin.isNull() & df.reviewText.isNull())
    corrupt_count = corrupt_records.count()

    logging.info(f"Number of malformed records: {corrupt_count}")

    # 🆕 Approximate memory usage
    bytes_per_record = 100  # Rough estimation
    estimated_size_mb = (df.count() * bytes_per_record) / (1024 * 1024)

    logging.info(f"Estimated DataFrame Size: {estimated_size_mb:.2f} MB")

## Main

### Setting Up Spark & Schema

In [4]:
log_filename = setup_logging()
spark = setup_spark()
schema = get_schema()

### Loading Data & Displaying The First 5 Rows

In [5]:
df, record_count = load_data(spark, file_path, schema)

print(record_count, "records are found!")

df.head(5)

233055327 records are found!


[Row(reviewerID='A27BTSGLXK2C5K', asin='B017O9P72A', reviewText="Alexa is not able to control my lights. If I ask her to tell me what LIFX can do, she will give me an example with one of my group names. If I use that exact same group name in a new request, she'll await that she doesn't recognize the name. This skill is VERY buggy and has not yet worked for me. I even rest Alexa, uninstalled LIFX, and set everything up again.", overall=None, unixReviewTime=1449792000, summary="VERY Buggy, doesn't work.", verified=False, vote=None, reviewTime='12 11, 2015'),
 Row(reviewerID='A27ZJ1NCBFP1HZ', asin='B017O9P72A', reviewText='Alexa works great for me so far, but I\'m also only controlling a single bulb at the moment. Turning on/off, changing colors and adjusting brightness are all easy and quick. That being said, I\'m expecting complications as I add more bulbs (hope for the best prepare for the worst, right?)\n\nI\'d speculate that some other users\' frustrations might stem from Alexa not r

### Validating The Data & Applying Partitioning

In [6]:
if df is not None and record_count > 0:
    validate_data(df)
    partition_data(df)  # Partitioning for performance

+--------------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+--------------+-------------------------+--------+----+-----------+
|reviewerID    |asin      |reviewText                    

Py4JJavaError: An error occurred while calling o80.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:288)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:793)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 1 times, most recent failure: Lost task 0.0 in stage 14.0 (TID 10) (3d403495e2e9 executor driver): org.apache.spark.memory.SparkOutOfMemoryError: error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@6cc6ace2 : No space left on device
	at org.apache.spark.memory.TaskMemoryManager.trySpillAndAcquire(TaskMemoryManager.java:245)
	at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:188)
	at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:306)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:116)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:431)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:450)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:485)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
	at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:255)
	... 42 more
Caused by: org.apache.spark.memory.SparkOutOfMemoryError: error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@6cc6ace2 : No space left on device
	at org.apache.spark.memory.TaskMemoryManager.trySpillAndAcquire(TaskMemoryManager.java:245)
	at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:188)
	at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:306)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:116)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:431)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:450)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:485)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
	at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


### Automatically Downloading The Log File

In [28]:
# Download logs file after execution
download_logs(log_filename)


Downloading log file...


# Data Analysis & Visualization Assignment # 1 - 2

## Downloading All Necessary Libraries

In [10]:
!pip install dash

Collecting dash
  Downloading dash-2.18.2-py3-none-any.whl.metadata (10 kB)
Collecting Flask<3.1,>=1.0.4 (from dash)
  Downloading flask-3.0.3-py3-none-any.whl.metadata (3.2 kB)
Collecting Werkzeug<3.1 (from dash)
  Downloading werkzeug-3.0.6-py3-none-any.whl.metadata (3.7 kB)
Collecting dash-html-components==2.0.0 (from dash)
  Downloading dash_html_components-2.0.0-py3-none-any.whl.metadata (3.8 kB)
Collecting dash-core-components==2.0.0 (from dash)
  Downloading dash_core_components-2.0.0-py3-none-any.whl.metadata (2.9 kB)
Collecting dash-table==5.0.0 (from dash)
  Downloading dash_table-5.0.0-py3-none-any.whl.metadata (2.4 kB)
Collecting retrying (from dash)
  Downloading retrying-1.3.4-py3-none-any.whl.metadata (6.9 kB)
Downloading dash-2.18.2-py3-none-any.whl (7.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.8/7.8 MB[0m [31m29.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dash_core_components-2.0.0-py3-none-any.whl (3.8 kB)
Downloading dash_html_compo

## All Libraries

In [11]:
# Pyspark Types
from pyspark.sql.types import StructField
from pyspark.sql.types import IntegerType
from pyspark.sql.types import BooleanType
from pyspark.sql.types import StructType
from pyspark.sql.types import StringType

# Pyspark Function Modules
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import explode
from pyspark.sql.functions import to_date
from pyspark.sql.functions import stddev
from pyspark.sql.functions import length
from pyspark.sql.functions import split
from pyspark.sql.functions import count
from pyspark.sql.functions import mean
from pyspark.sql.functions import when
from pyspark.sql.functions import expr
from pyspark.sql.functions import col
from pyspark.sql.functions import min
from pyspark.sql.functions import max
from pyspark.sql.functions import avg

# Pyspark Machine Learning Feature Modules
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import RegexTokenizer

# Pyspark Machine Learning Clustering Modules
from pyspark.ml.clustering import KMeans
from pyspark.ml.clustering import LDA

# Pyspark Modules
from pyspark.sql import SparkSession
import findspark

# Dash Dependencies
from dash.dependencies import Output
from dash.dependencies import Input

# Dash Modules
from dash import html
from dash import dcc
import dash

# Visualization Libraries
import matplotlib.pyplot as plt
from wordcloud import WordCloud
import plotly.express as px
import seaborn as sns

# Other Necessary Libraries
import pandas as pd
import shutil
import time
import os

## All Functions

### Utility Functions

In [12]:
# Load the processed data
def load_transformed_data(spark, path="/content/processed_output/"):
    return spark.read.parquet(path)

# Get Dataset Shape
def get_shape(df):
    print(f"Dataset contains {df.count()} rows and {len(df.columns)} columns.")

# Check Column Types
def check_schema(df):
    print("Schema:")
    df.printSchema()

### Streaming Data Pipeline Functions

#### Setting Up Functions

In [13]:
# Set Up Spark Streaming
def setup_spark():
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
    os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"
    findspark.init()

    spark = SparkSession.builder \
        .appName("AmazonReviewsStreaming") \
        .config("spark.driver.memory", "4g") \
        .getOrCreate()

    print("Spark Streaming Session Created!")
    return spark


# Define Schema for Structured Streaming
def get_schema():
    return StructType([
        StructField("reviewerID", StringType(), True),
        StructField("asin", StringType(), True),
        StructField("reviewText", StringType(), True),
        StructField("overall", IntegerType(), True),
        StructField("unixReviewTime", IntegerType(), True),
        StructField("summary", StringType(), True),
        StructField("verified", BooleanType(), True),
        StructField("vote", StringType(), True),
        StructField("reviewTime", StringType(), True)
    ])

#### Necessary Functions

In [14]:
# Preprocess the Data (Handle Missing Values & Duplicates)
def clean_data(df):
    return df.dropDuplicates(["reviewerID", "asin"]) \
             .dropna(subset=["reviewerID", "asin", "reviewText", "overall"])


# Detect Anomalies in Review Ratings (Using Z-score)
def detect_anomalies(df):
    stats = df.select(mean(col("overall")).alias("mean_rating"),
                      stddev(col("overall")).alias("stddev_rating")).collect()

    if stats:
        mean_rating = stats[0]["mean_rating"]
        stddev_rating = stats[0]["stddev_rating"]

        df = df.withColumn("is_anomaly", when((col("overall") - mean_rating) / stddev_rating > 2, True).otherwise(False))

    return df

#### Main Driver Function

In [26]:
# Streaming Function to Read Data in Real-Time
def start_streaming(spark, schema):
    input_dir = "/content/partitioned_reviews"
    output_dir = "/content/processed_output"

    # Create directories for streaming simulation
    shutil.rmtree(input_dir, ignore_errors=True)
    shutil.rmtree(output_dir, ignore_errors=True)
    os.makedirs(input_dir)
    os.makedirs(output_dir)

    print("Streaming directories created!")

    # Read new files as they arrive
    streaming_df = spark.readStream.schema(schema).json(input_dir)

    # Apply data cleaning and anomaly detection
    cleaned_df = clean_data(streaming_df)
    processed_df = detect_anomalies(cleaned_df)

    # Write output as Parquet files (real-time updates)
    query = processed_df.writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/content/checkpoints") \
        .option("path", output_dir) \
        .trigger(processingTime="10 seconds") \
        .start()

    print("Streaming started! Place new JSON files in `/content/partitioned_reviews`.")

    return query

### Advanced EDA Functions

#### Data Understanding & Summary Statistics Functions

#### Necessary Functions

In [16]:
# Count Missing Values in Each Column
def check_missing_values(df):
    print("Missing Values Count:")
    df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# Compute Summary Statistics
def compute_statistics(df):
    print("Summary Statistics:")
    stats_df = df.select(
        mean(col("overall")).alias("Mean"),
        expr("percentile_approx(overall, 0.5)").alias("Median"),
        expr("mode(overall)").alias("Mode"),  # Requires extra handling in Spark
        min(col("overall")).alias("Min"),
        max(col("overall")).alias("Max"),
        stddev(col("overall")).alias("Std Dev")
    )
    stats_df.show()

#### Main Driver Function

In [17]:
# Run All EDA Steps
def run_eda(spark):
    df = load_transformed_data(spark)

    get_shape(df)
    check_schema(df)
    check_missing_values(df)
    compute_statistics(df)

### Bonus Feature Functions

#### Load Bonus Dataset Function

In [18]:
# Incorporating bonus dataset
def load_bonus_dataset(spark, path="/content/omp_serial.json"):
    print("\nLoading and analyzing bonus dataset...")

    schema = StructType([
        StructField("ID", StringType(), True),
        StructField("realId", StringType(), True),
        StructField("pragma", StringType(), True),
        StructField("label_para", IntegerType(), True),
        StructField("label_function_call", IntegerType(), True),
        StructField("label_nested_loop", IntegerType(), True),
        StructField("label_reduction", IntegerType(), True),
        StructField("label_task", IntegerType(), True),
        StructField("label_simd", IntegerType(), True),
        StructField("label_target", IntegerType(), True)
    ])

    df_bonus = spark.read.json(path, schema=schema)

    # Count interactions per product
    df_bonus.groupBy("pragma").count().orderBy(col("count").desc()).show(5)

    return df_bonus

#### Anomaly Detection Using Machine Learning Function

In [19]:
# Detect anomalies using machine learning
def detect_anomalies_mllib(df):
    print("\nDetecting anomalies using Spark MLlib...")

    assembler = VectorAssembler(inputCols=["overall"], outputCol="features")
    df_vectorized = assembler.transform(df)

    kmeans = KMeans(k=2, seed=1, featuresCol="features", predictionCol="cluster")
    model = kmeans.fit(df_vectorized)

    df_clustered = model.transform(df_vectorized)
    df_anomalies = df_clustered.filter(df_clustered["cluster"] == 1)  # Assuming cluster 1 contains anomalies

    print(f"Anomalies detected: {df_anomalies.count()}")
    return df_anomalies

#### Creating Interactive Dashboard Function

In [20]:
# Interactive dashboard
def start_dashboard(df):
    df_pandas = df.toPandas()

    app = dash.Dash(__name__)

    app.layout = html.Div([
        html.H1("Amazon Reviews Interactive Dashboard"),
        dcc.Graph(id="review-distribution", figure=px.box(df_pandas, y="overall", title="Review Rating Distribution")),
        dcc.Graph(id="top-products", figure=px.bar(df_pandas.groupby("asin").size().reset_index(name="review_count").nlargest(10, "review_count"),
                                                   x="asin", y="review_count", title="Top 10 Most Reviewed Products")),
        dcc.Graph(id="sentiment-distribution", figure=px.pie(df_pandas, names="sentiment", title="Sentiment Distribution"))
    ])

    app.run_server(debug=False, port=8050)

#### Performance Optimization Function

In [21]:
# Performance optimization
def optimize_spark(df):
    print("\nOptimizing Spark jobs with caching and partitioning...")

    df.cache()  # Cache for faster access
    df = df.repartition(10, "overall")  # Partition data by rating for parallel processing

    return df

## Main

### Run The Streaming Pipeline

In [34]:
# Run the Streaming Pipeline

spark = setup_spark()
schema = get_schema()
query = start_streaming(spark, schema)

# Let the streaming run for 2 minutes before stopping (for testing)
query.awaitTermination(120)
query.stop()

print("Streaming stopped.")

Spark Streaming Session Created!
Streaming directories created!
Streaming started! Place new JSON files in `/content/partitioned_reviews`.
Streaming stopped.


### Run Data Understanding & Summary Statistics EDA

In [35]:
# Execute EDA
run_eda(spark)

Dataset contains 233055327 rows and 308 columns.


### Do Query Based EDA

In [None]:
df = load_transformed_data(spark)

# Top 5 Most Reviewed Products
print("\nTop 5 Most Reviewed Products:")
df.groupBy("asin").agg(count("reviewText").alias("review_count")) \
  .orderBy(col("review_count").desc()) \
  .show(5, truncate=False)

# Average Review Ratings by Category
print("\nAverage Review Ratings by Category:")
df.groupBy("asin").agg(avg("overall").alias("avg_rating")) \
  .orderBy(col("avg_rating").desc()) \
  .show(10, truncate=False)

# Correlation Between Review Length & Rating
print("\nCorrelation Between Review Length & Rating:")
df = df.withColumn("review_length", length(col("reviewText")))
df.select(F.corr("review_length", "overall").alias("correlation_review_length_rating")).show()

# Review Trends Over Time (Time-Series Analysis)
print("\nReview Trends Over Time:")
df.withColumn("review_date", to_date(col("reviewTime"), "MM dd, yyyy")) \
  .groupBy("review_date") \
  .agg(count("reviewText").alias("review_count")) \
  .orderBy(col("review_date")) \
  .show(10, truncate=False)

# Percentage of Reviews Mentioning 'Refund', 'Return', or 'Defective'
print("\nPercentage of Reviews Mentioning 'Refund', 'Return', or 'Defective':")
keyword_df = df.withColumn("contains_keyword",
                           when(col("reviewText").rlike("(?i)refund|return|defective"), 1).otherwise(0))
total_reviews = df.count()
keyword_reviews = keyword_df.filter(col("contains_keyword") == 1).count()
percentage = (keyword_reviews / total_reviews) * 100 if total_reviews > 0 else 0
print(f"🔸 {percentage:.2f}% of reviews mention refund, return, or defective.")

# Most Polarized Products (1-Star vs. 5-Star)
print("\nMost Polarized Products (Highest 1-Star and 5-Star Reviews):")
df.filter((col("overall") == 1) | (col("overall") == 5)) \
  .groupBy("asin") \
  .agg(count(when(col("overall") == 1, True)).alias("one_star_reviews"),
       count(when(col("overall") == 5, True)).alias("five_star_reviews")) \
  .orderBy((col("one_star_reviews") + col("five_star_reviews")).desc()) \
  .show(5, truncate=False)

# Verified vs. Non-Verified Purchase Ratings
print("\nVerified vs. Non-Verified Purchase Ratings:")
df.groupBy("verified") \
  .agg(avg("overall").alias("avg_rating")) \
  .show()

# Fake-Looking Reviews (Excessive Word Repetition)
print("\nFake-Looking Reviews (Excessive Word Repetition):")
df.withColumn("word_count", expr("size(split(reviewText, ' '))")) \
  .withColumn("unique_word_count", expr("size(array_distinct(split(reviewText, ' ')))")) \
  .withColumn("word_repetition_ratio", col("word_count") / col("unique_word_count")) \
  .orderBy(col("word_repetition_ratio").desc()) \
  .select("asin", "reviewText", "word_repetition_ratio") \
  .show(5, truncate=False)

### Do Text Analysis EDA (NLP)

In [None]:
# Load Processed Data
df = load_transformed_data(spark)

# 1. Most Frequent Words (Word Cloud)
print("\nGenerating Word Cloud for Most Frequent Words...")

# Tokenize words
tokenizer = RegexTokenizer(inputCol="reviewText", outputCol="words", pattern="\\W")
df_tokenized = tokenizer.transform(df)

# Remove stopwords
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
df_cleaned = stopwords_remover.transform(df_tokenized)

# Explode words into separate rows for frequency counting
df_exploded = df_cleaned.select(explode(col("filtered_words")).alias("word"))
df_word_count = df_exploded.groupBy("word").count().orderBy(col("count").desc())

# Convert to Pandas for WordCloud
top_words = df_word_count.limit(100).toPandas()
word_freq_dict = dict(zip(top_words["word"], top_words["count"]))

# Generate Word Cloud
plt.figure(figsize=(10, 5))
wordcloud = WordCloud(width=800, height=400, background_color="white").generate_from_frequencies(word_freq_dict)
plt.imshow(wordcloud, interpolation="bilinear")
plt.axis("off")
plt.show()

# Sentiment Analysis (Positive, Neutral, Negative)
print("\nPerforming Sentiment Analysis...")

df_sentiment = df.withColumn("sentiment",
                             when(col("overall") >= 4, "Positive")
                             .when(col("overall") == 3, "Neutral")
                             .otherwise("Negative"))

# Display Sentiment Counts
df_sentiment.groupBy("sentiment").count().show()

# Topic Modeling using LDA (Bonus)
print("\nPerforming Topic Modeling (LDA)...")

# Convert text into numerical format for LDA
vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="features", vocabSize=1000, minDF=5)
df_vectorized = vectorizer.fit(df_cleaned).transform(df_cleaned)

# Train LDA Model
lda = LDA(k=5, maxIter=10, featuresCol="features")
lda_model = lda.fit(df_vectorized)

# Show topics
topics = lda_model.describeTopics(5)
topics.show(truncate=False)

### Do Correlation & Business Insights

In [None]:
# Load Processed Data
df = load_transformed_data(spark)

# Highly Reviewed Products & Top-Rated Categories
print("\nHighly Reviewed Products:")
df.groupBy("asin") \
  .agg(count("reviewText").alias("review_count"), avg("overall").alias("avg_rating")) \
  .orderBy(col("review_count").desc()) \
  .show(10, truncate=False)

# Detect Words That Correlate with Positive/Negative Reviews
print("\nFinding Words That Correlate with Review Sentiment...")

# Tokenize words
df_tokenized = df.withColumn("words", split(col("reviewText"), " "))

# Explode words into separate rows
df_exploded = df_tokenized.select(col("overall"), explode(col("words")).alias("word"))

# Aggregate word frequency per sentiment
df_word_sentiment = df_exploded.groupBy("word") \
    .agg(avg("overall").alias("avg_rating"), count("word").alias("word_count")) \
    .filter(col("word_count") > 50) \
    .orderBy(col("avg_rating").desc())

df_word_sentiment.show(10, truncate=False)

# Correlation Heatmap for Product Success Factors
print("\nCorrelation Heatmap for Product Success Factors")

# Convert Spark DataFrame to Pandas
df_corr = df.select("overall", "verified", "vote").toPandas()

# Convert categorical boolean to numeric
df_corr["verified"] = df_corr["verified"].astype(int)
df_corr["vote"] = pd.to_numeric(df_corr["vote"], errors="coerce")

# Compute Correlation Matrix
correlation_matrix = df_corr.corr()

# Plot Heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(correlation_matrix, annot=True, cmap="coolwarm", fmt=".2f")
plt.title("Correlation Heatmap - Product Success Factors")
plt.show()

### Creating Interactive Dash App With 8 Visualizations

In [None]:
# Load Processed Data
df = load_transformed_data(spark).toPandas()

# Convert review time to date format
df["reviewTime"] = pd.to_datetime(df["reviewTime"], errors="coerce")

# Create output folder
os.makedirs("visualizations", exist_ok=True)

# Bar Chart - Top 10 Most Reviewed Products
fig1 = px.bar(
    df.groupby("asin").size().reset_index(name="review_count").nlargest(10, "review_count"),
    x="asin", y="review_count", title="Top 10 Most Reviewed Products"
)
fig1.write_html("visualizations/top_reviewed_products.html")

# Box Plot - Distribution of Ratings
fig2 = px.box(df, y="overall", title="Distribution of Review Ratings")
fig2.write_html("visualizations/review_distribution.html")

# Scatter Plot - Review Length vs. Rating
df["review_length"] = df["reviewText"].str.len()
fig3 = px.scatter(df, x="review_length", y="overall", title="Review Length vs. Rating")
fig3.write_html("visualizations/review_length_vs_rating.html")

# Time-Series - Review Trends Over Time
fig4 = px.line(
    df.groupby(df["reviewTime"].dt.to_period("M")).size().reset_index(name="review_count"),
    x="reviewTime", y="review_count", title="Review Trends Over Time"
)
fig4.write_html("visualizations/review_trends.html")

# Pie Chart - Sentiment Distribution
df["sentiment"] = df["overall"].apply(lambda x: "Positive" if x >= 4 else "Neutral" if x == 3 else "Negative")
fig5 = px.pie(df, names="sentiment", title="Sentiment Distribution in Reviews")
fig5.write_html("visualizations/sentiment_distribution.html")

# Bar Chart - Verified vs. Non-Verified Purchase Ratings
fig6 = px.bar(
    df.groupby("verified")["overall"].mean().reset_index(),
    x="verified", y="overall", title="Verified vs. Non-Verified Purchase Ratings"
)
fig6.write_html("visualizations/verified_vs_non_verified.html")

# Heatmap - Correlation of Features
fig7 = px.imshow(df[["overall", "review_length"]].corr(), text_auto=True, title="Feature Correlation Heatmap")
fig7.write_html("visualizations/correlation_heatmap.html")

# Word Cloud - Most Frequent Words


wordcloud = WordCloud(width=800, height=400, background_color="white").generate(" ".join(df["reviewText"].dropna()))
plt.figure(figsize=(10, 5))
plt.imshow(wordcloud, interpolation="bilinear")
plt.axis("off")
plt.savefig("visualizations/word_cloud.png", bbox_inches="tight")

# Create Dash App for Interactive Exploration
app = dash.Dash(__name__)

app.layout = html.Div([
    html.H1("Amazon Reviews - Interactive Data Analysis"),
    dcc.Graph(figure=fig1),
    dcc.Graph(figure=fig2),
    dcc.Graph(figure=fig3),
    dcc.Graph(figure=fig4),
    dcc.Graph(figure=fig5),
    dcc.Graph(figure=fig6),
    dcc.Graph(figure=fig7),
    html.Img(src="visualizations/word_cloud.png", style={"width": "70%"})
])

# Run the Dash App
app.run_server(debug=True, port=8050)

### Run The Bonus Feature Pipeline

In [None]:
spark = setup_spark()
schema = get_schema()
df = load_transformed_data(spark)

# Apply optimizations
df = optimize_spark(df)

# Detect anomalies
anomalies_df = detect_anomalies_mllib(df)

# Load bonus dataset
df_bonus = load_bonus_dataset(spark)

# Start interactive dashboard
start_dashboard(df)

# ***Thank You!***