# 📘 Final Project: Data Analysis using MongoDB and Apache Spark

This project demonstrates how to work with a real-world dataset (Amazon Books Reviews Dataset) using MongoDB for storage and Apache Spark (PySpark) for processing. We explore schema design, querying, performance optimization, and visual insights.

---

In [None]:
# Step 1: Import libraries
import pandas as pd
import numpy as np
import json
from pymongo import MongoClient

In [None]:
# Step 2: Configure Pandas display options
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_colwidth', None)

In [None]:
# Step 3: Load CSV files
books_df = pd.read_csv("books_data.csv")
ratings_df = pd.read_csv("Books_rating.csv")

## 📂 Dataset Overview

We use the **Amazon Books Reviews** dataset from Kaggle, which contains user-generated reviews for a wide variety of books available on Amazon. This real-world dataset includes valuable attributes such as:

- **Title**: The name of the book.
- **Author(s)**: The author(s) of the book.
- **Categories**: Genre or subject of the book (e.g., Comics, Fiction, Education).
- **Rating**: User rating scores, typically from 1 to 5.
- **Review Text**: Actual written review provided by users.
- **Review Date**: When the review was posted.
- **ASIN**: Unique Amazon product identifier for each book.

Because it combines free-text reviews with structured book metadata at scale, this dataset is a good foundation for exploring data storage, processing, and analysis with MongoDB and Apache Spark.

In [None]:
# Step 4: Basic exploration
print("📘 Books Data Sample:")
print(books_df.head(), "\n")

print("📝 Ratings Data Sample:")
print(ratings_df.head(), "\n")

print("📊 Shapes:")
print("Books Data Shape:", books_df.shape)
print("Ratings Data Shape:", ratings_df.shape, "\n")

print("🔍 Missing Values in Books Data:")
print(books_df.isnull().sum(), "\n")

print("🔍 Missing Values in Ratings Data:")
print(ratings_df.isnull().sum(), "\n")

In [None]:
# Step 5: Optional Cleaning
# Drop rows with missing essential review text
ratings_df = ratings_df.dropna(subset=["review/text"])


# Fill missing summaries with a placeholder (safe version)
ratings_df["review/summary"] = ratings_df["review/summary"].fillna("No summary provided")


# You can apply similar cleaning to books_df if needed
# books_df = books_df.dropna()  # example

## 🗃 Storing Dataset in MongoDB
We connect to MongoDB and insert the dataset into two collections, `books` and `ratings`, with one document per CSV row.

In [None]:
# Step 6: Connect to MongoDB
client = MongoClient("mongodb://localhost:27017/")  # Change if hosted elsewhere
db = client["books_database"]

In [None]:
# Step 7: Convert DataFrames to list of dictionaries
books_data = books_df.to_dict("records")
ratings_data = ratings_df.to_dict("records")

In [None]:
from pymongo.errors import BulkWriteError

def insert_in_batches(collection, data, batch_size=100):
    for i in range(0, len(data), batch_size):
        batch = data[i:i+batch_size]
        try:
            collection.insert_many(batch)
        except BulkWriteError as bwe:
            print(f"❌ Bulk write error: {bwe.details}")
        except Exception as e:
            print(f"❌ Error inserting batch {i // batch_size}: {e}")

db["books"].drop()
db["ratings"].drop()

db["books"].insert_many(books_data)
insert_in_batches(db["ratings"], ratings_data)

print("✅ Data inserted into MongoDB successfully.")

## ⚙ Data Processing with PySpark
We use PySpark to read, transform, and analyze the dataset loaded from MongoDB.

In [None]:
from pyspark.sql import SparkSession

# Step 1: Create a Spark session
spark = SparkSession.builder \
    .appName("BooksMiniProject") \
    .getOrCreate()

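# Step 2: Load the CSV files into Spark DataFrames
# multiLine and escape let Spark parse quoted review text that contains commas, quotes, or newlines
books_df = spark.read.csv("books_data.csv", header=True, inferSchema=True, multiLine=True, escape='"')
ratings_df = spark.read.csv("Books_rating.csv", header=True, inferSchema=True, multiLine=True, escape='"')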

# Optional: Show first few rows for confirmation
books_df.show(5)
ratings_df.show(5)

In [None]:
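# Read documents from MongoDB, exclude '_id'
books_docs = list(db["books"].find({}, {"_id": 0}))
ratings_docs = list(db["ratings"].find({}, {"_id": 0}).limit(10000))  # 🔁 Adjust limit as needed

# Pandas stored missing values as float NaN; replace them with None so Spark's
# schema inference does not see a float in an otherwise string column
def nan_to_none(doc):
    return {k: (None if isinstance(v, float) and v != v else v) for k, v in doc.items()}

# Convert to Spark DataFrames
df_books = spark.createDataFrame([nan_to_none(d) for d in books_docs])
df_ratings = spark.createDataFrame([nan_to_none(d) for d in ratings_docs])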

# Show sample rows
df_books.show(3)
df_ratings.show(3)

## 🔍 Querying with Spark SQL
Analyze average ratings with Spark SQL's DataFrame API and display the top-rated books.

In [None]:
from pyspark.sql.functions import avg

# Rename the score column: later cells refer to it as rating_score, and the new name avoids the '/'
df_ratings = df_ratings.withColumnRenamed("review/score", "rating_score")

# Perform inner join on Title
df_joined = df_ratings.join(df_books, on="Title", how="inner")

# Select relevant columns
df_selected = df_joined.select(
    "Title",
    "authors",
    "categories",
    "rating_score"
)

# Compute average rating per book
df_avg = df_selected.groupBy("Title", "authors", "categories") \
    .agg(avg("rating_score").alias("avg_rating")) \
    .orderBy("avg_rating", ascending=False)

# Show top 10 rated books
df_avg.show(10, truncate=False)
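
A plain average can rank a book with a single five-star review above a book with hundreds of reviews. One common guard is to count reviews alongside the average and filter on that count. The sketch below shows the idea in plain Python; the threshold of 2 reviews, the helper name `top_rated`, and the sample rows are all hypothetical. The same idea would translate to Spark by adding a count aggregate next to `avg` and filtering on it.

```python
from collections import defaultdict

def top_rated(rows, min_reviews=2, limit=10):
    """Rank titles by average score, ignoring titles with too few reviews.

    rows: iterable of (title, score) pairs.
    min_reviews and limit are illustrative defaults, not values from the project.
    """
    totals = defaultdict(lambda: [0.0, 0])  # title -> [sum of scores, review count]
    for title, score in rows:
        totals[title][0] += score
        totals[title][1] += 1

    ranked = [
        (title, total / count, count)
        for title, (total, count) in totals.items()
        if count >= min_reviews
    ]
    # Highest average first; ties are broken by review count
    ranked.sort(key=lambda r: (r[1], r[2]), reverse=True)
    return ranked[:limit]

# Hypothetical sample: "B" has a perfect score but only one review
sample = [("A", 5.0), ("A", 4.0), ("B", 5.0), ("C", 3.0), ("C", 4.0), ("C", 5.0)]
result = top_rated(sample)
print(result)
```

With this guard, "B" drops out of the ranking even though its single review is a 5.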

In [None]:
# Get top 10 books from df_avg
top_books = df_avg.limit(10).toPandas().to_dict("records")

# Save to MongoDB
db["top_books"].drop()  # Drop existing collection if it exists
db["top_books"].insert_many(top_books)  # Insert top books

print("✅ Top 10 books saved to MongoDB collection 'top_books'")

## 📊 Genre-Based Filtering
We filter and analyze specific genres like Comics.

In [None]:
# Filter books where categories contain "Comics"
df_comics = df_avg.filter(df_avg.categories.contains("Comics"))
df_comics.show(truncate=False)

### Indexing for Performance

Create indexes on the fields that queries filter or join on most often.

In [None]:
db["books"].create_index("Title")
db["books"].create_index("publishedDate")
db["books"].create_index("categories")

In [None]:
list(db["books"].list_indexes())

## 📈 Visualization of Top-Rated Books
Using Matplotlib to show top-rated books.

In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# Convert to Pandas DataFrame from Spark DataFrame
top_books_df = df_avg.limit(10).toPandas()

# Check if the DataFrame is not empty and contains required columns
if not top_books_df.empty and 'Title' in top_books_df.columns and 'avg_rating' in top_books_df.columns:
    # Plot horizontal bar chart
    plt.figure(figsize=(10, 6))
    plt.barh(top_books_df['Title'], top_books_df['avg_rating'], color='skyblue')
    plt.xlabel("Average Rating")
    plt.title("Top 10 Rated Books")
    plt.gca().invert_yaxis()  # Puts highest-rated book at the top
    plt.tight_layout()
    plt.show()
else:
    print("⚠ DataFrame is empty or missing required columns.")

## 📆 Year-Based Analysis
Analyzing books by year of publication.

### Filter by Year

In [None]:
# Join df_avg with df_books to access publishedDate
df_avg_with_date = df_avg.join(df_books.select('Title', 'publishedDate'), on='Title', how='inner')

# Filter books published in or after 2010 (string comparison on the leading year)
df_recent_books = df_avg_with_date.filter(df_avg_with_date.publishedDate >= '2010')

# Show result
df_recent_books.show(truncate=False)
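
Comparing `publishedDate` to `'2010'` is a string comparison, which only behaves like a date filter when every value starts with a four-digit year. A more defensive approach is to extract the leading year with a regular expression and compare integers. The sketch below assumes values look like `1996`, `2005-02`, or `2011-07-15`; these sample formats and the helper name `extract_year` are assumptions, not taken from the data. In Spark the same pattern could be applied with `regexp_extract` from `pyspark.sql.functions`.

```python
import re

# Leading four-digit year, allowing for stray leading whitespace
YEAR_PATTERN = re.compile(r"^\s*(\d{4})")

def extract_year(published_date):
    """Return the leading four-digit year as an int, or None if absent."""
    if not isinstance(published_date, str):
        return None
    match = YEAR_PATTERN.match(published_date)
    return int(match.group(1)) if match else None

# Hypothetical sample values in the formats assumed above
samples = ["1996", "2005-02", "2011-07-15", "unknown", None]
years = [extract_year(s) for s in samples]

# Keep only values whose year is 2010 or later; unparseable values are dropped
recent = [s for s in samples if (extract_year(s) or 0) >= 2010]
print(years)
print(recent)
```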

## Genre Distribution (Pie Chart)

In [None]:
# Get the count of books by genre
genre_counts = df_avg.select("categories").rdd.flatMap(lambda x: x).countByValue()

# Sort by count and take top N
top_n = 10
sorted_genre_counts = sorted(genre_counts.items(), key=lambda x: x[1], reverse=True)

# Separate top N genres and the "Others"
top_genres = sorted_genre_counts[:top_n]
other_genres = sorted_genre_counts[top_n:]

# Combine remaining genres into "Other"
if other_genres:
    top_genres.append(("Other", sum([count for _, count in other_genres])))

# Prepare data for plotting
genre_labels = [str(k) for k, _ in top_genres]
genre_sizes = [v for _, v in top_genres]

# Plot pie chart
plt.figure(figsize=(8, 8))
plt.pie(genre_sizes, labels=genre_labels, autopct='%1.1f%%', startangle=140, colors=plt.cm.Paired.colors)
plt.title("Top Genres in Books (Others Combined)")
plt.axis('equal')  # Equal aspect ratio ensures that pie is drawn as a circle.
plt.show()
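
Genre labels in the chart are shown exactly as stored. If the `categories` values turn out to be list-like strings such as `"['Fiction']"` (an assumption about the CSV, worth checking with `df_books.show()`), a small parser could clean them before counting. The sketch below uses only the standard library; `parse_categories` is a hypothetical helper name.

```python
import ast

def parse_categories(raw):
    """Turn a list-like string such as "['Fiction']" into a list of labels."""
    if not isinstance(raw, str) or not raw.strip():
        return []  # None, NaN, or empty string: no labels
    try:
        value = ast.literal_eval(raw)
    except (ValueError, SyntaxError):
        return [raw.strip()]  # plain text: treat the whole string as one label
    if isinstance(value, (list, tuple)):
        return [str(v).strip() for v in value]
    return [str(value).strip()]

# Hypothetical sample values
print(parse_categories("['Comics & Graphic Novels']"))
print(parse_categories("Fiction"))
print(parse_categories(None))
```

Counting the parsed labels instead of the raw strings would also split multi-genre entries into separate slices.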

## Books Published by Decade (Bar Plot)

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Convert Spark DataFrame to Pandas (limit to avoid memory overload)
year_counts_df = df_books.select("publishedDate").limit(100000).toPandas()

# Extract the year from the publishedDate
year_counts_df['Year'] = pd.to_datetime(year_counts_df['publishedDate'], errors='coerce').dt.year

# Drop rows with NaN Year values
year_counts_df = year_counts_df.dropna(subset=['Year'])

# Create a new column for decade categorization
year_counts_df['Decade'] = (year_counts_df['Year'] // 10) * 10

# Group by Decade and count the number of books
decade_counts = year_counts_df.groupby('Decade').size().reset_index(name='Book Count')

# Sort the decades for the plot
decade_counts = decade_counts.sort_values(by='Decade')

# Plot the bar chart
plt.figure(figsize=(10, 6))
plt.bar(decade_counts['Decade'].astype(str), decade_counts['Book Count'], color='skyblue')

# Add labels and title
plt.xlabel("Decade")
plt.ylabel("Number of Books Published")
plt.title("Books Published by Decade")
plt.xticks(rotation=45)  # Rotate labels for better readability
plt.tight_layout()

# Show the plot
plt.show()

## Average Rating by Genre (Box Plot)

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Convert the Spark DataFrame to Pandas for Seaborn plotting
df_genre_ratings = df_avg.select("categories", "avg_rating").toPandas()

# Group genres by frequency and get top 10
top_genres = df_genre_ratings['categories'].value_counts().nlargest(10).index

# Filter the dataframe to include only the top genres
df_top_genres = df_genre_ratings[df_genre_ratings['categories'].isin(top_genres)]

# Plot horizontal box plot with top genres
plt.figure(figsize=(12, 8))
sns.boxplot(x="avg_rating", y="categories", data=df_top_genres, palette="Set2")
plt.title("Rating Distribution by Top 10 Genres")
plt.tight_layout()
plt.show()

## Rating Distribution (Histogram)

In [None]:
# Convert ratings to Pandas for plotting
ratings = df_avg.select("avg_rating").toPandas()

# Plot histogram
plt.figure(figsize=(10, 6))
plt.hist(ratings['avg_rating'], bins=20, color='lightblue', edgecolor='black')
plt.xlabel("Average Rating")
plt.ylabel("Frequency")
plt.title("Distribution of Average Ratings")
plt.tight_layout()
plt.show()

## Average Rating by Year (Line Plot)

In [None]:
# Select the rating and 'publishedDate' fields and convert to Pandas
df_books_with_year = df_avg_with_date.select("Title", "avg_rating", "publishedDate").toPandas()

# Extract year from the 'publishedDate'
df_books_with_year['Year'] = pd.to_datetime(df_books_with_year['publishedDate'], errors='coerce').dt.year

# Group by year and calculate average rating
avg_ratings_by_year = df_books_with_year.groupby('Year')['avg_rating'].mean().reset_index()

# Plot line plot
plt.figure(figsize=(12, 6))
sns.lineplot(x='Year', y='avg_rating', data=avg_ratings_by_year, marker='o', color='green')
plt.title("Average Rating by Year")
plt.xlabel("Year")
plt.ylabel("Average Rating")
plt.tight_layout()
plt.show()


## Rating Distribution by Top 10 Authors (Box Plot)

In [None]:
# Calculate average rating per author
df_author_avg_rating = df_avg.groupBy("authors").agg({"avg_rating": "avg"}).withColumnRenamed("avg(avg_rating)", "avg_author_rating")

# Convert the Spark DataFrame to Pandas and keep the 10 authors with the highest average
df_author_ratings = df_author_avg_rating.toPandas()
top_authors = df_author_ratings.sort_values(by="avg_author_rating", ascending=False).head(10)

# A box plot needs several values per author, so plot the per-book averages
# of those authors instead of one aggregated number each
df_top_author_books = df_avg.select("authors", "avg_rating").toPandas()
df_top_author_books = df_top_author_books[df_top_author_books["authors"].isin(top_authors["authors"])]

# Plot box plot for ratings by author
plt.figure(figsize=(12, 6))
sns.boxplot(x="authors", y="avg_rating", data=df_top_author_books)
plt.xticks(rotation=45, ha='right')  # Rotate labels and align them
plt.title("Rating Distribution by Top 10 Authors", fontsize=14)
plt.xlabel("Authors", fontsize=12)
plt.ylabel("Average Rating", fontsize=12)
plt.tight_layout()
plt.show()


## 📊 Performance Evaluation

### 1. Spark SQL Execution Time

In [None]:
import time

start = time.time()

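# Example: Compute average rating per genre
# (stored under a new name so the per-book df_avg used by earlier cells is not overwritten)
df_genre_avg = df_joined.groupBy("categories").agg(avg("rating_score").alias("avg_rating"))
df_genre_avg.show()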

end = time.time()
print(f"⏱ Spark query time: {end - start:.4f} seconds")


### 2. MongoDB Aggregation Execution Time

In [None]:
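start = time.time()

pipeline = [
    # Match the 10,000-document sample that was loaded into Spark
    { "$limit": 10000 },
    # Ratings documents have no genre field, so bring it in from the matching book
    { "$lookup": {
        "from": "books",
        "localField": "Title",
        "foreignField": "Title",
        "as": "book"
    }},
    { "$unwind": "$book" },
    { "$group": {
        "_id": "$book.categories",
        "avg_rating": { "$avg": "$review/score" }
    }},
    { "$sort": { "avg_rating": -1 } }
]

results = list(db["ratings"].aggregate(pipeline))

end = time.time()
print(f"⏱ MongoDB aggregation time: {end - start:.4f} seconds")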
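
A single wall-clock measurement mixes in one-off costs such as Spark's lazy planning, JVM warm-up, and cold caches. A fairer harness runs each query several times with `time.perf_counter` and reports the best and median times. The sketch below uses only the standard library; the helper name `time_it` and the repeat count are illustrative choices, and the workload is a stand-in for the Spark and MongoDB calls.

```python
import statistics
import time

def time_it(func, repeats=5):
    """Call func repeatedly and return (best, median) elapsed seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        func()
        durations.append(time.perf_counter() - start)
    return min(durations), statistics.median(durations)

# Stand-in workload; in the notebook this could instead be
#   lambda: df_joined.groupBy("categories").count().collect()
# or
#   lambda: list(db["ratings"].aggregate(pipeline))
best, median = time_it(lambda: sum(range(100_000)))
print(f"best: {best:.4f}s, median: {median:.4f}s")
```

Reporting the median alongside the best run makes it easier to see whether the first, slower execution is skewing the comparison.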
