# Multimodal Review Summarizer

#### Part I: Data Collection and Preprocessing

In [3]:
# Importing the required libraries
import json
import logging
import os
from datetime import datetime
from urllib.request import urlretrieve

import pandas as pd
from bson.json_util import dumps
from pymongo import MongoClient

# Root logger setup: INFO and above, written to "data_processor.log" and to
# a default StreamHandler, using a "time - level - message" layout
logging.basicConfig(
    handlers=[
        logging.FileHandler("data_processor.log"),
        logging.StreamHandler(),
    ],
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

In [4]:
# Define a class for data processing
class DataProcessor:
    """Fetch, clean, and export product metadata and reviews from MongoDB.

    The instance owns a ``MongoClient``. Call :meth:`close_connection` when
    finished, or use the instance as a context manager so the client is
    closed automatically.
    """

    # Columns kept by ``clean_reviews``; every other field is discarded.
    _REVIEW_COLUMNS = [
        "parent_asin",
        "rating",
        "title",
        "text",
        "images",
        "user_id",
        "timestamp",
        "verified_purchase",
    ]

    def __init__(self, db_uri, db_name, data_dir):
        """Connect to MongoDB and select the working database.

        Args:
            db_uri: MongoDB connection URI.
            db_name: Name of the database to query.
            data_dir: Stored as ``self.data_dir``; no method of this class
                reads it.
        """
        self.client = MongoClient(db_uri)
        self.db = self.client[db_name]
        self.data_dir = data_dir
        # Keep pymongo's own log records below WARNING out of the output.
        logging.getLogger("pymongo").setLevel(logging.WARNING)
        logging.info("Initialized DataProcessor with database: %s", db_name)

    def __enter__(self):
        """Return the processor itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the MongoDB connection when leaving the ``with`` block."""
        self.close_connection()

    @staticmethod
    def _ensure_parent_dir(filename):
        """Create the parent directory of *filename* if it has one."""
        parent = os.path.dirname(filename)
        # dirname is "" for a bare filename, and os.makedirs("") raises
        # FileNotFoundError, so only create a directory when one is named.
        if parent:
            os.makedirs(parent, exist_ok=True)

    def get_categories_with_min_ratings(self, collection_name, min_ratings):
        """Count products per category among products with many ratings.

        Args:
            collection_name: Collection holding the product documents.
            min_ratings: Only documents whose ``rating_number`` is strictly
                greater than this value are considered.

        Returns:
            A list of ``{"_id": <category>, "count": <int>}`` dicts sorted by
            ``count`` in descending order.
        """
        logging.info("Fetching categories with minimum ratings > %d", min_ratings)
        pipeline = [
            {"$match": {"rating_number": {"$gt": min_ratings}}},
            # One output document per (product, category) pair.
            {"$unwind": "$categories"},
            {"$group": {"_id": "$categories", "count": {"$sum": 1}}},
            {"$sort": {"count": -1}},
        ]

        collection = self.db[collection_name]
        categories = list(collection.aggregate(pipeline))
        logging.info("Found %d categories", len(categories))

        return categories

    def save_to_json_file(self, json_data, filename):
        """Write an already-serialized JSON string to *filename*.

        Args:
            json_data: The JSON text to write.
            filename: Destination path; its parent directory is created when
                missing. A bare filename is written to the current directory.
        """
        logging.info("Saving data to JSON file: %s", filename)
        self._ensure_parent_dir(filename)
        # Explicit encoding so the output does not depend on the platform
        # default (matches save_cleaned_reviews).
        with open(filename, "w", encoding="utf-8") as outfile:
            outfile.write(json_data)

        logging.info("Data saved to %s", filename)

    def filter_meta_products(self, collection_name, min_ratings, category):
        """Return product documents of one category with many ratings.

        Args:
            collection_name: Collection holding the product documents.
            min_ratings: ``rating_number`` must be strictly greater than this.
            category: Value that must appear in the document's ``categories``.

        Returns:
            A list with every matching document.
        """
        logging.info(
            "Filtering meta products for category '%s' with ratings > %d",
            category,
            min_ratings,
        )

        collection = self.db[collection_name]
        cursor = collection.find(
            {"rating_number": {"$gt": min_ratings}, "categories": {"$in": [category]}}
        )

        filtered_meta = list(cursor)
        logging.info("Filtered %d meta products", len(filtered_meta))

        return filtered_meta

    def filter_reviews(self, collection_name, parent_asins, start_date, end_date):
        """Return reviews of the given products inside a date window.

        Args:
            collection_name: Collection holding the review documents.
            parent_asins: Iterable of ``parent_asin`` values to keep.
            start_date: Inclusive lower bound (``datetime``).
            end_date: Inclusive upper bound (``datetime``).

        Returns:
            A list with every matching review document.
        """
        logging.info("Filtering reviews between %s and %s", start_date, end_date)
        collection = self.db[collection_name]
        # The "timestamp" field is compared as epoch milliseconds.
        # NOTE: datetime.timestamp() interprets naive datetimes in the local
        # time zone; pass timezone-aware values for a fixed window.
        start_timestamp = start_date.timestamp() * 1000
        end_timestamp = end_date.timestamp() * 1000

        cursor = collection.find(
            {
                # Materialize so any iterable (set, generator, ...) is accepted.
                "parent_asin": {"$in": list(parent_asins)},
                "timestamp": {"$gte": start_timestamp, "$lte": end_timestamp},
            }
        )

        filtered_reviews = list(cursor)
        logging.info("Filtered %d reviews", len(filtered_reviews))

        return filtered_reviews

    def clean_reviews(self, reviews):
        """Deduplicate reviews, keep verified purchases, and parse timestamps.

        Steps, in order: keep only the relevant columns; drop exact repeats of
        (product, user, timestamp); keep the first review per (product, user);
        drop reviews that are not verified purchases; convert ``timestamp``
        from epoch milliseconds to datetime.

        Args:
            reviews: List of review dicts.

        Returns:
            A new ``pandas.DataFrame`` with the cleaned reviews. An empty
            input yields an empty frame with the expected columns.

        Raises:
            KeyError: If non-empty input lacks one of the required columns.
        """
        logging.info("Cleaning reviews data")
        reviews_df = pd.DataFrame(reviews)

        # An empty frame has no columns, so the column selection below would
        # raise KeyError; return a correctly shaped empty result instead.
        if reviews_df.empty:
            logging.warning("No reviews to clean")
            return pd.DataFrame(columns=list(self._REVIEW_COLUMNS))

        # Select relevant columns
        reviews_df = reviews_df[list(self._REVIEW_COLUMNS)]

        # Remove duplicates
        initial_count = len(reviews_df)
        reviews_df = reviews_df.drop_duplicates(
            subset=["parent_asin", "user_id", "timestamp"]
        )
        logging.info("Removed %d duplicate reviews", initial_count - len(reviews_df))

        # Remove multiple reviews by the same user for the same product
        initial_count = len(reviews_df)
        reviews_df = reviews_df.drop_duplicates(subset=["parent_asin", "user_id"])
        logging.info(
            "Removed %d reviews from the same user for the same product",
            initial_count - len(reviews_df),
        )

        # Count verified and unverified reviews
        verified_counts = reviews_df["verified_purchase"].value_counts()
        verified_reviews = verified_counts.get(True, 0)
        unverified_reviews = verified_counts.get(False, 0)
        logging.info("Number of verified reviews: %d", verified_reviews)
        logging.info("Number of unverified reviews: %d", unverified_reviews)

        # Drop unverified reviews. The explicit copy makes the result an
        # independent frame, so the column assignment below cannot trigger
        # pandas' SettingWithCopyWarning.
        initial_count = len(reviews_df)
        is_verified = reviews_df["verified_purchase"].eq(True)
        reviews_df = reviews_df.loc[is_verified].copy()
        logging.info("Removed %d unverified reviews", initial_count - len(reviews_df))

        # Transform timestamp to datetime
        reviews_df["timestamp"] = pd.to_datetime(reviews_df["timestamp"], unit="ms")

        return reviews_df

    def save_cleaned_reviews(self, reviews_df, filename):
        """Write a reviews DataFrame to *filename* as a JSON array of records.

        Datetime columns are rendered as ``YYYY-MM-DD HH:MM:SS`` strings. The
        caller's DataFrame is not modified.

        Args:
            reviews_df: DataFrame to serialize.
            filename: Destination path; its parent directory is created when
                missing.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            logging.info("Saving cleaned reviews to file: %s", filename)

            # Ensure the directory exists
            self._ensure_parent_dir(filename)

            # Convert datetime columns to strings on a copy; "datetime"
            # selects datetime64 columns regardless of their resolution.
            reviews_df = reviews_df.copy()
            for col in reviews_df.select_dtypes(include=["datetime"]).columns:
                reviews_df[col] = reviews_df[col].dt.strftime("%Y-%m-%d %H:%M:%S")

            # Convert DataFrame to list of dictionaries
            reviews_list = reviews_df.to_dict(orient="records")

            # Save as JSON file
            with open(filename, "w", encoding="utf-8") as f:
                json.dump(reviews_list, f, indent=4, ensure_ascii=False)

            logging.info("Cleaned reviews saved to %s", filename)
        except Exception as e:
            # logging.exception records the traceback alongside the message.
            logging.exception("Failed to save cleaned reviews: %s", e)
            raise

    def download_images(self, items, img_url_field, output_folder):
        """Download every image referenced by *items* into *output_folder*.

        Best effort: images without a usable URL are skipped with a warning,
        and a failed download is logged without stopping the loop.

        Args:
            items: Iterable of dicts, each optionally holding an ``images``
                list of dicts.
            img_url_field: Key inside each image dict that holds the URL.
            output_folder: Destination directory (created when missing).
        """
        # Local import: only this method needs it.
        from urllib.parse import urlsplit

        logging.info("Downloading images to folder: %s", output_folder)
        os.makedirs(output_folder, exist_ok=True)
        for item in items:
            item_id = item.get("_id")
            # "images" may be absent or explicitly None.
            for image in item.get("images") or []:
                image_url = image.get(img_url_field) if isinstance(image, dict) else None
                if not image_url:
                    logging.warning(
                        "No '%s' URL for an image of %s; skipping",
                        img_url_field,
                        item_id,
                    )
                    continue

                # Name the file after the URL path only, so a query string or
                # fragment never ends up in the filename.
                filename = os.path.basename(urlsplit(image_url).path)
                if not filename:
                    logging.warning(
                        "Cannot derive a filename from %s for %s; skipping",
                        image_url,
                        item_id,
                    )
                    continue
                save_path = os.path.join(output_folder, filename)

                try:
                    urlretrieve(image_url, save_path)
                except Exception as e:
                    # Deliberately broad: one bad image must not abort the run.
                    logging.error(
                        "Error downloading %s for %s: %s", filename, item_id, str(e)
                    )
                else:
                    logging.info("Downloaded image: %s", filename)

    def close_connection(self):
        """Close the underlying MongoDB client."""
        logging.info("Closing database connection")
        self.client.close()
        logging.info("Database connection closed")

In [5]:
# Initialize the processor
# NOTE(review): data_dir here is "data" while DATA_DIR below is "../data";
# confirm which location is intended.
processor = DataProcessor(
    db_uri="mongodb://localhost:27017/", db_name="amazon_23", data_dir="data"
)

# Configuration
MIN_RATINGS = 20000  # products must have strictly more ratings than this
CATEGORY = "Humidifiers"  # category used to select products
# Review date window, bounds inclusive.
# NOTE(review): END_DATE is midnight at the start of Oct 31, so reviews posted
# later that day fall outside the window -- confirm this is intended.
START_DATE = datetime(2022, 10, 1)
END_DATE = datetime(2023, 10, 31)
META_IMG = "large"  # key of the product image URL to download
REVIEW_IMG = "medium"  # size prefix of the review image URL field

# Output locations (plain string: there is nothing to interpolate)
DATA_DIR = "../data"
META_IMG_DIR = f"{DATA_DIR}/meta_images/{META_IMG}"
REVIEW_IMG_DIR = f"{DATA_DIR}/review_images/{REVIEW_IMG}"

2025-09-17 00:35:55,378 - INFO - Initialized DataProcessor with database: amazon_23


In [6]:
# Step 1: Count products per category among products that have more than
# MIN_RATINGS ratings, then log each category with its count
categories = processor.get_categories_with_min_ratings(
    collection_name="meta",
    min_ratings=MIN_RATINGS,
)
for category in categories:
    logging.info(
        "Category: %s, Count: %d",
        category["_id"],
        category["count"],
    )

2025-09-17 00:36:03,627 - INFO - Fetching categories with minimum ratings > 20000
2025-09-17 00:37:15,716 - INFO - Found 578 categories
2025-09-17 00:37:15,732 - INFO - Category: Home & Kitchen, Count: 1244
2025-09-17 00:37:15,734 - INFO - Category: Kitchen & Dining, Count: 498
2025-09-17 00:37:15,735 - INFO - Category: Bedding, Count: 221
2025-09-17 00:37:15,736 - INFO - Category: Storage & Organization, Count: 196
2025-09-17 00:37:15,736 - INFO - Category: Home Décor Products, Count: 139
2025-09-17 00:37:15,738 - INFO - Category: Kitchen Utensils & Gadgets, Count: 129
2025-09-17 00:37:15,739 - INFO - Category: Small Appliances, Count: 94
2025-09-17 00:37:15,739 - INFO - Category: Bath, Count: 87
2025-09-17 00:37:15,740 - INFO - Category: Furniture, Count: 81
2025-09-17 00:37:15,747 - INFO - Category: Sheets & Pillowcases, Count: 71
2025-09-17 00:37:15,749 - INFO - Category: Heating, Cooling & Air Quality, Count: 63
2025-09-17 00:37:15,749 - INFO - Category: Bathroom Accessories, Coun

In [7]:
# Step 2: Select the products in CATEGORY that have more than MIN_RATINGS
# ratings, and collect their parent ASINs for the review query
filtered_meta = processor.filter_meta_products(
    collection_name="meta",
    min_ratings=MIN_RATINGS,
    category=CATEGORY,
)
filtered_meta_asins = [doc["parent_asin"] for doc in filtered_meta]
logging.info("Number of selected products: %d", len(filtered_meta))

# Persist the selected product documents as indented JSON
processor.save_to_json_file(
    json_data=dumps(filtered_meta, indent=4),
    filename=f"{DATA_DIR}/filtered_meta.json",
)

2025-09-17 00:40:31,357 - INFO - Filtering meta products for category 'Humidifiers' with ratings > 20000
2025-09-17 00:42:11,259 - INFO - Filtered 11 meta products
2025-09-17 00:42:11,271 - INFO - Number of selected products: 11
2025-09-17 00:42:11,538 - INFO - Saving data to JSON file: ../data/filtered_meta.json
2025-09-17 00:42:11,725 - INFO - Data saved to ../data/filtered_meta.json


In [8]:
# Step 3: Fetch the reviews of the selected products written between
# START_DATE and END_DATE
filtered_reviews = processor.filter_reviews(
    collection_name="reviews",
    parent_asins=filtered_meta_asins,
    start_date=START_DATE,
    end_date=END_DATE,
)
logging.info("Number of filtered reviews: %d", len(filtered_reviews))

# Persist the raw filtered reviews as indented JSON
processor.save_to_json_file(
    json_data=dumps(filtered_reviews, indent=4),
    filename=f"{DATA_DIR}/filtered_reviews.json",
)

# Deduplicate, keep verified purchases only, and write the cleaned set
cleaned_reviews = processor.clean_reviews(reviews=filtered_reviews)
processor.save_cleaned_reviews(
    reviews_df=cleaned_reviews,
    filename=f"{DATA_DIR}/filtered_reviews_cleaned.json",
)

2025-09-17 00:42:41,707 - INFO - Filtering reviews between 2022-10-01 00:00:00 and 2023-10-31 00:00:00
2025-09-17 00:51:18,517 - INFO - Filtered 11552 reviews
2025-09-17 00:51:18,538 - INFO - Number of filtered reviews: 11552
2025-09-17 00:51:24,237 - INFO - Saving data to JSON file: ../data/filtered_reviews.json
2025-09-17 00:51:24,304 - INFO - Data saved to ../data/filtered_reviews.json
2025-09-17 00:51:24,305 - INFO - Cleaning reviews data
2025-09-17 00:51:25,869 - INFO - Removed 112 duplicate reviews
2025-09-17 00:51:25,873 - INFO - Removed 3 reviews from the same user for the same product
2025-09-17 00:51:25,992 - INFO - Number of verified reviews: 10799
2025-09-17 00:51:25,993 - INFO - Number of unverified reviews: 638
2025-09-17 00:51:25,996 - INFO - Removed 638 unverified reviews
2025-09-17 00:51:26,315 - INFO - Saving cleaned reviews to file: ../data/filtered_reviews_cleaned.json
2025-09-17 00:51:27,013 - INFO - Cleaned reviews saved to ../data/filtered_reviews_cleaned.json


In [9]:
# Step 4: Fetch the product images, reading each URL from the META_IMG key
processor.download_images(
    items=filtered_meta,
    img_url_field=META_IMG,
    output_folder=META_IMG_DIR,
)

2025-09-17 00:53:07,969 - INFO - Downloading images to folder: ../data/meta_images/large
2025-09-17 00:53:08,358 - INFO - Downloaded image: 418n7FGqEyL._AC_.jpg
2025-09-17 00:53:08,644 - INFO - Downloaded image: 51iTmm7ZTnL._AC_.jpg
2025-09-17 00:53:08,888 - INFO - Downloaded image: 41Rf9AYQvgS._AC_.jpg
2025-09-17 00:53:09,043 - INFO - Downloaded image: 41s3sP4BZVS._AC_.jpg
2025-09-17 00:53:09,271 - INFO - Downloaded image: 51qEKXCFE3L._AC_.jpg
2025-09-17 00:53:09,510 - INFO - Downloaded image: 51KlnnL4b5L._AC_.jpg
2025-09-17 00:53:09,682 - INFO - Downloaded image: 51v1ADwYj-L._AC_.jpg
2025-09-17 00:53:10,115 - INFO - Downloaded image: 51NTOa2p0GL._AC_.jpg
2025-09-17 00:53:10,209 - INFO - Downloaded image: 413A3O1oMML._AC_.jpg
2025-09-17 00:53:10,364 - INFO - Downloaded image: 41HoxISSkOS._AC_.jpg
2025-09-17 00:53:10,526 - INFO - Downloaded image: 31c9DzzJBWS._AC_.jpg
2025-09-17 00:53:11,739 - INFO - Downloaded image: 51359p3gSxL._AC_.jpg
2025-09-17 00:53:11,963 - INFO - Downloaded ima

In [10]:
# Step 5: Fetch the review images, reading each URL from the
# "<REVIEW_IMG>_image_url" key
processor.download_images(
    items=filtered_reviews,
    img_url_field=f"{REVIEW_IMG}_image_url",
    output_folder=REVIEW_IMG_DIR,
)

2025-09-17 00:53:38,237 - INFO - Downloading images to folder: ../data/review_images/medium
2025-09-17 00:53:38,516 - INFO - Downloaded image: 41AiZulKABL._SL800_.jpg
2025-09-17 00:53:38,673 - INFO - Downloaded image: 410tQ5qADNL._SL800_.jpg
2025-09-17 00:53:38,827 - INFO - Downloaded image: 51MTMKxaCpL._SL800_.jpg
2025-09-17 00:53:38,925 - INFO - Downloaded image: 71unFm5OeeL._SL800_.jpg
2025-09-17 00:53:39,012 - INFO - Downloaded image: 61EXioQ735L._SL800_.jpg
2025-09-17 00:53:39,619 - INFO - Downloaded image: 61hBOHXHdWL._SL800_.jpg
2025-09-17 00:53:39,789 - INFO - Downloaded image: 71+lr8MBcFL._SL800_.jpg
2025-09-17 00:53:40,074 - INFO - Downloaded image: 613TuOgPMYL._SL800_.jpg
2025-09-17 00:53:40,351 - INFO - Downloaded image: 61VEbC6Cv6L._SL800_.jpg
2025-09-17 00:53:40,633 - INFO - Downloaded image: 71tRu9GKXLL._SL800_.jpg
2025-09-17 00:53:40,721 - INFO - Downloaded image: 51drE+o-5eL._SL800_.jpg
2025-09-17 00:53:40,837 - INFO - Downloaded image: 61dsyEA2h7L._SL800_.jpg
2025-09-

In [11]:
# Close the database connection (releases the MongoDB client held by the
# processor; the processor cannot run further queries afterwards)
processor.close_connection()

2025-09-17 01:05:29,691 - INFO - Closing database connection
2025-09-17 01:05:29,906 - INFO - Database connection closed
