In [None]:
!pip install pandas pyspark sqlalchemy openpyxl



In [None]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, trim, regexp_replace, to_date, month

In [None]:
# Task 1: Data Extraction
# The Google Sheet is fetched through its CSV export endpoint so pandas can read it directly.
sheet_url = "https://docs.google.com/spreadsheets/d/1B2fF-9GKOk3HBb6I40kNnfbaNOaTeDJjhht-iDSz9r0/export?format=csv"
df = pd.read_csv(filepath_or_buffer=sheet_url)

In [None]:
# Start (or reuse) the Spark session, then lift the pandas frame into a Spark DataFrame
# so the cleaning steps below run as Spark transformations.
spark = (
    SparkSession.builder
    .appName("AmazonReviewETL")
    .getOrCreate()
)
df_spark = spark.createDataFrame(data=df)

In [None]:
# Task 2: Data Cleaning
# Remove rows that are exact duplicates across every column.
df_spark = df_spark.drop_duplicates()

In [None]:
# Handle missing values
# A row missing any of the key fields is discarded; absent vote counts default to zero.
df_spark = (
    df_spark
    .dropna(how="any", subset=["review_id", "product_id", "review_body", "star_rating"])
    .fillna({"helpful_votes": 0, "total_votes": 0})
)

In [None]:
# Convert data types
# star_rating -> integer; review_date text in "yyyy-MM-dd" form -> Spark date.
df_spark = (
    df_spark
    .withColumn("star_rating", col("star_rating").cast("int"))
    .withColumn("review_date", to_date(col("review_date"), "yyyy-MM-dd"))
)

In [None]:
# Standardize fields
# Lowercase and strip surrounding spaces so the same category label always groups together.
df_spark = df_spark.withColumn("product_category", trim(lower(col("product_category"))))

In [None]:
# Task 3: Data Transformation
# Normalize text fields: lowercase, turn HTML tags (e.g. "<br />") and every whitespace run
# (newlines, tabs) into a single space, strip the remaining non-alphanumeric characters,
# then collapse repeated spaces and trim. Replacing separators with spaces *before* stripping
# punctuation keeps words on either side of a line break or tag from being glued together.
def normalize_text(column_name):
    """Return a Column holding the lowercase, alphanumeric-plus-space form of `column_name`.

    Nulls stay null (Spark string functions propagate null).
    """
    normalized = lower(col(column_name))
    # Tags must start with a letter after "<" so stray comparisons like "a < b" survive.
    normalized = regexp_replace(normalized, r"</?[a-z][^<>]*>|\s+", " ")
    normalized = regexp_replace(normalized, "[^a-z0-9 ]", "")
    return trim(regexp_replace(normalized, " {2,}", " "))


for text_column in ["review_body", "review_headline"]:
    df_spark = df_spark.withColumn(text_column, normalize_text(text_column))

In [None]:
# Preview the first five rows after cleaning and text normalization.
df_spark.show(n=5)

+-----------+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----+-----------------+--------------------+--------------------+
|review_date|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes| vine|verified_purchase|     review_headline|         review_body|
+-----------+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----+-----------------+--------------------+--------------------+
| 1995-08-07|         US|   53096398| R61VUUBXYT6EL|042510107X|   3.4724671E7|    Red Storm Rising|           books|          5|            0|          1|false|            false|the single best m...|red storm rising ...|
| 1995-09-29|         US|   53094816| RT94SPZ5CHLK6| 133627659|  3.20981057E8|Criminal Justice ...|           books|

In [None]:
# Extract additional features
# Calendar month number of each review, derived from the parsed review_date.
df_spark = df_spark.withColumn("review_month", month("review_date"))

In [None]:
# Show cleaned and transformed data (first five rows, long values truncated).
df_spark.show(n=5, truncate=True)

+-----------+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----+-----------------+--------------------+--------------------+------------+
|review_date|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes| vine|verified_purchase|     review_headline|         review_body|review_month|
+-----------+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----+-----------------+--------------------+--------------------+------------+
| 1995-08-07|         US|   53096398| R61VUUBXYT6EL|042510107X|   3.4724671E7|    Red Storm Rising|           books|          5|            0|          1|false|            false|the single best m...|red storm rising ...|           8|
| 1995-09-29|         US|   53094816| RT94SPZ5CHLK6| 133627659| 

In [None]:
!pip install pymysql



In [None]:
import os
from getpass import getpass
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine

# Database connection details.
# The password is read from the environment (or prompted for once per kernel) instead of being
# hard-coded. NOTE(review): the password previously committed in this notebook should be rotated.
DB_HOST = os.environ.get("DB_HOST", "database-1.cjqu44c44elv.eu-north-1.rds.amazonaws.com")
DB_PORT = os.environ.get("DB_PORT", "3306")
DB_USER = os.environ.get("DB_USER", "DineshAlagesan")
if not os.environ.get("DB_PASS"):
    # Store the prompted value so later cells in this kernel do not ask again.
    os.environ["DB_PASS"] = getpass("MySQL password: ")
DB_PASS = os.environ["DB_PASS"]
DB_NAME = os.environ.get("DB_NAME", "amazon_reviews")

# Create the MySQL engine (pymysql driver). quote_plus keeps characters such as '@' or '/'
# in the credentials from corrupting the connection URL.
engine = create_engine(
    f"mysql+pymysql://{quote_plus(DB_USER)}:{quote_plus(DB_PASS)}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

# Load the *cleaned* Spark data, not the raw `df` read from the sheet.
# review_month is not persisted: it is derivable from review_date, and a physical column with
# that name would take precedence over the `review_month` alias used in a GROUP BY below
# (MySQL resolves GROUP BY names against table columns before select aliases).
reviews_clean = df_spark.drop("review_month").toPandas()
try:
    reviews_clean.to_sql(name="reviews", con=engine, if_exists="replace", index=False)
finally:
    engine.dispose()

print("✅ Data successfully loaded into AWS RDS!")

✅ Data successfully loaded into AWS RDS!


In [None]:
!pip install mysql-connector-python



In [None]:
import os
from getpass import getpass

import pandas as pd
import mysql.connector

# Google Sheets CSV link
sheet_url = "https://docs.google.com/spreadsheets/d/1B2fF-9GKOk3HBb6I40kNnfbaNOaTeDJjhht-iDSz9r0/export?format=csv"

# Read dataset
df = pd.read_csv(sheet_url)

# MySQL database connection details.
# The password is read from the environment (or prompted for once per kernel) instead of being
# hard-coded. NOTE(review): the password previously committed in this notebook should be rotated.
DB_HOST = os.environ.get("DB_HOST", "database-1.cjqu44c44elv.eu-north-1.rds.amazonaws.com")
DB_USER = os.environ.get("DB_USER", "DineshAlagesan")
if not os.environ.get("DB_PASS"):
    os.environ["DB_PASS"] = getpass("MySQL password: ")
DB_PASS = os.environ["DB_PASS"]
DB_NAME = os.environ.get("DB_NAME", "amazon_reviews")

# Columns this loader writes, in the same order as the INSERT statement below.
insert_columns = ["product_title", "star_rating", "review_date", "review_body"]

# Connect to MySQL
conn = mysql.connector.connect(
    host=DB_HOST,
    user=DB_USER,
    password=DB_PASS,
    database=DB_NAME,
)
cursor = conn.cursor()
try:
    # Create table if not exists.
    # NOTE(review): this schema holds only four data columns; the analytics queries in this
    # notebook also read review_id, customer_id, product_category and the vote columns.
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS reviews (
            id INT AUTO_INCREMENT PRIMARY KEY,
            product_title TEXT,
            star_rating INT,
            review_date DATE,
            review_body TEXT
        );
    """)

    # Appending into a table that already holds reviews (e.g. from the `to_sql` load) would
    # duplicate every row and inflate every per-product count, so only fill an empty table.
    cursor.execute("SELECT COUNT(*) FROM reviews")
    (existing_rows,) = cursor.fetchone()
    if existing_rows:
        print(f"ℹ Table 'reviews' already contains {existing_rows} rows; skipping insert to avoid duplicates.")
    else:
        # NaN is not a valid SQL value: cast to object (plain Python scalars) and map missing
        # cells to None so the driver sends NULL.
        subset = df[insert_columns].astype(object)
        rows = list(subset.where(subset.notna(), None).itertuples(index=False, name=None))
        # executemany lets the driver batch the INSERTs instead of one round-trip per row.
        cursor.executemany(
            """
            INSERT INTO reviews (product_title, star_rating, review_date, review_body)
            VALUES (%s, %s, %s, %s)
            """,
            rows,
        )
        conn.commit()
        print("✅ Data loaded successfully into AWS RDS!")
finally:
    # Release the cursor and connection even if a statement above raised.
    cursor.close()
    conn.close()


✅ Data loaded successfully into AWS RDS!


In [None]:
!pip install pymysql sqlalchemy pandas



In [None]:
import os
from getpass import getpass
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine

# Database connection details.
# The password comes from the environment (or a one-time prompt per kernel), never from the source.
DB_USER = os.environ.get("DB_USER", "DineshAlagesan")
if not os.environ.get("DB_PASS"):
    os.environ["DB_PASS"] = getpass("MySQL password: ")
DB_PASS = os.environ["DB_PASS"]
DB_HOST = os.environ.get("DB_HOST", "database-1.cjqu44c44elv.eu-north-1.rds.amazonaws.com")
DB_PORT = os.environ.get("DB_PORT", "3306")
DB_NAME = os.environ.get("DB_NAME", "amazon_reviews")

# Create the MySQL engine; quote_plus keeps special characters in credentials from breaking the URL.
engine = create_engine(
    f"mysql+pymysql://{quote_plus(DB_USER)}:{quote_plus(DB_PASS)}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

# Define Queries.
# Literal percent signs are written as `%%` because the pymysql driver applies %-style
# formatting to statement text.
queries = {
    "Top 10 Products with Most Reviews": """
        SELECT product_title, COUNT(*) AS review_count
        FROM reviews
        GROUP BY product_title
        ORDER BY review_count DESC
        LIMIT 10;
    """,
    "Average Review Ratings Per Month": """
        SELECT product_title, DATE_FORMAT(review_date, '%%Y-%%m') AS review_month,
               AVG(star_rating) AS avg_rating
        FROM reviews
        GROUP BY product_title, review_month
        ORDER BY review_month DESC;
    """,
    "Total Votes Per Product Category": """
        SELECT product_category, SUM(total_votes) AS total_votes
        FROM reviews
        GROUP BY product_category
        ORDER BY total_votes DESC;
    """,
    "Products with 'Awful' in Reviews": """
        SELECT product_title, COUNT(*) AS awful_count
        FROM reviews
        WHERE LOWER(review_body) LIKE '%%awful%%'
           OR LOWER(review_body) LIKE '%%terrible%%'
           OR LOWER(review_body) LIKE '%%bad%%'
           OR LOWER(review_body) LIKE '%%horrible%%'
        GROUP BY product_title
        ORDER BY awful_count DESC
        LIMIT 10;
    """,
    "Products with 'Awesome' in Reviews": """
        SELECT product_title, COUNT(*) AS awesome_count
        FROM reviews
        WHERE LOWER(review_body) LIKE '%%awesome%%'
           OR LOWER(review_body) LIKE '%%fantastic%%'
           OR LOWER(review_body) LIKE '%%excellent%%'
           OR LOWER(review_body) LIKE '%%great%%'
        GROUP BY product_title
        ORDER BY awesome_count DESC
        LIMIT 10;
    """,
    "Most Controversial Reviews": """
        SELECT review_id, product_title, total_votes, helpful_votes,
               IFNULL((helpful_votes / NULLIF(total_votes, 0)) * 100, 0) AS helpful_percentage
        FROM reviews
        WHERE total_votes >= 20  -- Reduced threshold from 50 to 20
        ORDER BY helpful_percentage ASC  -- Sort by least helpful first
        LIMIT 10;
    """,
    "Most Reviewed Product Per Year": """
        SELECT YEAR(review_date) AS review_year, product_title, COUNT(*) AS review_count
        FROM reviews
        GROUP BY review_year, product_title
        ORDER BY review_year DESC, review_count DESC;
    """,
    "Users Who Wrote the Most Reviews": """
        SELECT COALESCE(CAST(customer_id AS CHAR), 'Unknown') AS customer_id, COUNT(*) AS review_count
        FROM reviews
        GROUP BY customer_id
        ORDER BY review_count DESC
        LIMIT 10;
    """
}

# Run Queries and Display Results.
# Each query is attempted independently so one failure does not hide the others' results.
# Results go into `result` rather than `df`, so the dataset loaded earlier is not overwritten.
for name, query in queries.items():
    try:
        result = pd.read_sql(query, engine)
        print(f"\n🔹 {name}")
        if result.empty:
            print("⚠ No results found.")
        else:
            print(result)
    except Exception as e:
        print(f"❌ Error in {name}: {e}")

# Release pooled connections held by this engine.
engine.dispose()


🔹 Top 10 Products with Most Reviews
                                       product_title  review_count
0   Ufo Retrievals: The Recovery of Alien Spacecraft             4
1              Blue Highways: A Journey into America             2
2  Monsters Of The Sea: The History, Natural Hist...             2
3             Essential Client Server Survival Guide             2
4        The Hikers Guide to O'Ahu (A Kolowalu Book)             2
5                               Palimpsest: A Memoir             2
6                                      The Gun Ketch             2
7                            To Live and Die in L.A.             2
8                              The Big Book of Death             2
9               The Celestine Prophecy: An Adventure             2

🔹 Average Review Ratings Per Month
                                         product_title review_month  \
0           The Roaches Have No King (High Risk Books)      1995-11   
1                                      The Doors

In [None]:
import os
from getpass import getpass
from urllib.parse import quote_plus

from sqlalchemy import create_engine, text
import pymysql

# Database credentials: the password comes from the environment (or a one-time prompt per kernel).
DB_HOST = os.environ.get("DB_HOST", "database-1.cjqu44c44elv.eu-north-1.rds.amazonaws.com")
DB_USER = os.environ.get("DB_USER", "DineshAlagesan")
if not os.environ.get("DB_PASS"):
    os.environ["DB_PASS"] = getpass("MySQL password: ")
DB_PASSWORD = os.environ["DB_PASS"]
DB_NAME = os.environ.get("DB_NAME", "amazon_reviews")

# Create the database engine (no explicit port in the URL, so the driver default is used).
engine = create_engine(
    f"mysql+pymysql://{quote_plus(DB_USER)}:{quote_plus(DB_PASSWORD)}@{DB_HOST}/{DB_NAME}"
)

# Ensure 'review_date' is a DATE column
alter_column_query = """
ALTER TABLE reviews
MODIFY COLUMN review_date DATE;
"""

# Indexes to create: (index name, column spec). TEXT columns need a prefix length, hence (255).
index_queries = [
    ("idx_product_title", "product_title(255)"),
    ("idx_review_date", "review_date"),
]

# Existence check for an index; the name is a bound parameter instead of being spliced into SQL.
check_index_query = text("""
SELECT COUNT(1)
FROM information_schema.statistics
WHERE table_schema = DATABASE()
  AND table_name = 'reviews'
  AND index_name = :index_name
""")

# An unpartitioned table reports a single row here; more than one row means partitions exist.
check_partition_query = """
SELECT COUNT(*)
FROM information_schema.partitions
WHERE table_schema = DATABASE() AND table_name = 'reviews';
"""

# NOTE(review): MySQL rejects this if any PRIMARY/UNIQUE key omits review_date (for example an
# `id` primary key); such a failure is caught and reported below.
partition_query = """
ALTER TABLE reviews
PARTITION BY RANGE( YEAR(review_date) ) (
    PARTITION p_before_2000 VALUES LESS THAN (2000),
    PARTITION p_2000_2010 VALUES LESS THAN (2010),
    PARTITION p_2010_2020 VALUES LESS THAN (2020),
    PARTITION p_2020_present VALUES LESS THAN MAXVALUE
);
"""

# The `with` block closes the connection even if an unhandled statement raises.
with engine.connect() as connection:
    try:
        connection.execute(text(alter_column_query))
        print("✅ Column 'review_date' converted to DATE type successfully.")
    except Exception as e:
        print(f"⚠ Error modifying column: {e}")

    for index_name, column in index_queries:
        index_exists = connection.execute(check_index_query, {"index_name": index_name}).scalar()

        if index_exists:
            print(f"ℹ Index '{index_name}' already exists. Skipping creation.")
        else:
            # Identifiers cannot be bound parameters; both come from the fixed list above.
            create_query = f"CREATE INDEX {index_name} ON reviews ({column});"
            try:
                connection.execute(text(create_query))
                print(f"✅ Index '{index_name}' created successfully.")
            except Exception as e:
                print(f"⚠ Error creating index '{index_name}': {e}")

    partition_count = connection.execute(text(check_partition_query)).scalar()

    if partition_count > 1:  # Already partitioned
        print("✅ Table 'reviews' is already partitioned. Skipping partitioning.")
    else:
        try:
            connection.execute(text(partition_query))
            print("✅ Partitioning applied successfully.")
        except Exception as e:
            print(f"⚠ Error applying partitioning: {e}")

# Release pooled connections held by this engine.
engine.dispose()
print("✅ Index creation and partitioning process completed.")


✅ Column 'review_date' converted to DATE type successfully.
✅ Index 'idx_product_title' created successfully.
✅ Index 'idx_review_date' created successfully.
✅ Partitioning applied successfully.
✅ Index creation and partitioning process completed.
