In [None]:
!pip install -r ../requirements.txt -q


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.3[0m[39;49m -> [0m[32;49m26.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


In [7]:
import pandas as pd
import numpy as np
import os

In [14]:
# --- EXTRACT: read the raw reviews CSV into `dataset` ---
# Any failure is reported on stdout and then re-raised, so the run stops
# here instead of continuing without data.
CSV_PATH = "../data/reviews.csv"

try:
    if os.path.exists(CSV_PATH):
        dataset = pd.read_csv(CSV_PATH)
        print(f"Successfully loaded {len(dataset)} rows from {CSV_PATH}")
    else:
        raise FileNotFoundError(f"CSV file not found at: {CSV_PATH}")
except Exception as err:
    # Choose the message by failure type; the bare `raise` below keeps the
    # original exception and traceback intact.
    if isinstance(err, FileNotFoundError):
        print(f"ERROR: {err}")
    elif isinstance(err, pd.errors.ParserError):
        print(f"ERROR: Failed to parse CSV — {err}")
    else:
        print(f"ERROR: Unexpected error reading CSV — {err}")
    raise

Successfully loaded 850 rows from ../data/reviews.csv


In [9]:
def _report_unparseable(frame, column, parser, label):
    """Print and return the mask of rows whose `column` value fails `parser`.

    A row is flagged when its raw value is present (not null) yet
    ``parser(values, errors="coerce")`` yields NaN/NaT for it.

    Args:
        frame: DataFrame holding `column` and a "review_id" column.
        column: Name of the column to validate.
        parser: Coercing converter such as pd.to_numeric or pd.to_datetime.
        label: Text printed in front of the number of flagged rows.

    Returns:
        Boolean Series that is True for every flagged row.
    """
    unparseable = parser(frame[column], errors="coerce").isna() & frame[column].notna()
    print(f"{label}: {unparseable.sum()}")
    if unparseable.sum() > 0:
        print(frame.loc[unparseable, ["review_id", column]])
    return unparseable


# ---------------------------------------------------------------- 1. nulls
print("=" * 60)
print("1. NULL VALUES")
print("=" * 60)
print(f"\nTotal rows: {len(dataset)}")
print("\nNull counts per column:")
null_counts = dataset.isnull().sum()
print(null_counts[null_counts > 0] if null_counts.sum() > 0 else "No null values found.")
# Reuse the per-column counts instead of scanning the frame a second time.
print(f"\nTotal nulls: {null_counts.sum()}")

# ----------------------------------------------------------- 2. duplicates
print("\n" + "=" * 60)
print("2. DUPLICATE ENTRIES")
print("=" * 60)
full_dupes = dataset.duplicated().sum()
print(f"\nFully duplicate rows: {full_dupes}")
id_dupes = dataset.duplicated(subset=["review_id"]).sum()
print(f"Duplicate review_ids: {id_dupes}")
if id_dupes > 0:
    # keep=False marks every member of a duplicate group, not only the repeats.
    dup_ids = dataset[dataset.duplicated(subset=["review_id"], keep=False)].sort_values("review_id")
    print(dup_ids[["review_id", "product_id", "customer_id", "rating", "review_date"]])

# ------------------------------------------------------- 3. type mismatches
print("\n" + "=" * 60)
print("3. TYPE MISMATCHES")
print("=" * 60)
print("\nColumn dtypes:")
print(dataset.dtypes)

# Check specific columns for values that cannot be converted to the expected type.
# Only convertibility is tested here; value ranges (e.g. rating 1-5) are not.
print("\n--- Checking for type issues ---")
non_numeric_price = _report_unparseable(
    dataset, "price", pd.to_numeric, "Non-numeric price values")
non_numeric_rating = _report_unparseable(
    dataset, "rating", pd.to_numeric, "Non-numeric rating values")
non_numeric_age = _report_unparseable(
    dataset, "customer_age", pd.to_numeric, "Non-numeric customer_age values")

# review_date should be parseable as date
invalid_dates = _report_unparseable(
    dataset, "review_date", pd.to_datetime, "Invalid date values")

# verified_purchase should be 0/1 (shown for inspection only)
print(f"Unique verified_purchase values: {dataset['verified_purchase'].unique()}")

non_numeric_votes = _report_unparseable(
    dataset, "helpful_votes", pd.to_numeric, "Non-numeric helpful_votes values")

1. NULL VALUES

Total rows: 850

Null counts per column:
No null values found.

Total nulls: 0

2. DUPLICATE ENTRIES

Fully duplicate rows: 0
Duplicate review_ids: 0

3. TYPE MISMATCHES

Column dtypes:
review_id              int64
product_id             int64
product_name             str
brand                    str
category                 str
price                float64
customer_id            int64
customer_name            str
customer_email           str
customer_age           int64
customer_country         str
customer_city            str
rating                 int64
review_title             str
review_text              str
review_date              str
verified_purchase      int64
helpful_votes          int64
dtype: object

--- Checking for type issues ---
Non-numeric price values: 0
Non-numeric rating values: 0
Non-numeric customer_age values: 0
Invalid date values: 0
Unique verified_purchase values: [0 1]
Non-numeric helpful_votes values: 0


In [15]:
print(f"Shape before cleaning: {dataset.shape}")

# --- 1. Handle Missing Values ---
# Nulls become "" in text columns and 0 in numeric columns.
# NOTE(review): a 0 fill also applies to columns such as rating/price, where 0
# is not a natural value — confirm this is the intended placeholder.
text_cols = dataset.select_dtypes(include=["object", "string"]).columns
num_cols = dataset.select_dtypes(include=["number"]).columns
for _cols, _filler in ((text_cols, ""), (num_cols, 0)):
    dataset[_cols] = dataset[_cols].fillna(_filler)
print(f"[1] Missing values handled. Remaining nulls: {dataset.isnull().sum().sum()}")

# --- 2. Normalize Text Fields ---
# Strip whitespace, normalize unicode dashes/special chars, lowercase where appropriate
import unicodedata

# Dash-like code points that NFKC leaves untouched (hyphen, non-breaking hyphen,
# figure dash, en dash, em dash, horizontal bar, minus sign) -> ASCII "-".
_DASHES_TO_HYPHEN = dict.fromkeys(
    map(ord, "\u2010\u2011\u2012\u2013\u2014\u2015\u2212"), "-"
)


def normalize_text(s):
    """Normalize unicode characters (e.g. en-dash to hyphen) and strip whitespace.

    Strings are NFKC-normalized (folding compatibility forms such as
    full-width letters), then typographic dashes are replaced with the ASCII
    hyphen, then surrounding whitespace is removed. NFKC alone does not
    convert en/em dashes, hence the explicit translation step.

    Args:
        s: Any value; only ``str`` instances are modified.

    Returns:
        The cleaned string, or `s` unchanged when it is not a string.
    """
    if isinstance(s, str):
        s = unicodedata.normalize("NFKC", s)  # fold compatibility characters
        s = s.translate(_DASHES_TO_HYPHEN)    # must run after NFKC (e.g. U+FE58 -> U+2014 -> "-")
        s = s.strip()
    return s

# Run normalize_text over every text column.
for col in text_cols:
    dataset[col] = dataset[col].apply(normalize_text)

# Lowercase identifier-like text fields only; free text (review_text/title) keeps its casing.
normalize_lower_cols = ["customer_email"]
for col in normalize_lower_cols:
    if col not in dataset.columns:
        continue
    dataset[col] = dataset[col].str.lower()

# Title-case name-like fields.
name_cols = ["product_name", "brand", "customer_name", "customer_country", "customer_city", "category"]
for col in name_cols:
    if col not in dataset.columns:
        continue
    dataset[col] = dataset[col].str.strip().str.title()

print("[2] Text fields normalized (unicode, whitespace, casing).")

# --- 3. Parse and Validate Date Formats ---
# errors="coerce" turns anything unparseable into NaT instead of raising.
dataset["review_date"] = pd.to_datetime(dataset["review_date"], errors="coerce")
invalid_date_count = dataset["review_date"].isna().sum()
if invalid_date_count == 0:
    print(f"[3] All review_date values parsed successfully. Dtype: {dataset['review_date'].dtype}")
else:
    print(f"[3] WARNING: {invalid_date_count} rows with unparseable dates (set to NaT).")

# --- 4. Remove Duplicates ---
# Two passes: exact row copies first, then repeated review_id values
# (the first occurrence of each id is kept).
before = len(dataset)
dataset = dataset.drop_duplicates()
after_full = len(dataset)
dataset = dataset.drop_duplicates(subset=["review_id"], keep="first")
after_id = len(dataset)
print(
    f"[4] Duplicates removed: {before - after_full} full duplicates, "
    f"{after_full - after_id} duplicate review_ids."
)

print(f"\nShape after cleaning: {dataset.shape}")
print(f"\nCleaned dtypes:\n{dataset.dtypes}")

Shape before cleaning: (850, 18)
[1] Missing values handled. Remaining nulls: 0
[2] Text fields normalized (unicode, whitespace, casing).
[3] All review_date values parsed successfully. Dtype: datetime64[us]
[4] Duplicates removed: 0 full duplicates, 0 duplicate review_ids.

Shape after cleaning: (850, 18)

Cleaned dtypes:
review_id                     int64
product_id                    int64
product_name                    str
brand                           str
category                        str
price                       float64
customer_id                   int64
customer_name                   str
customer_email                  str
customer_age                  int64
customer_country                str
customer_city                   str
rating                        int64
review_title                    str
review_text                     str
review_date          datetime64[us]
verified_purchase             int64
helpful_votes                 int64
dtype: object


In [16]:
# === TRANSFORMATION: Calculate Sentiment Scores ===
from textblob import TextBlob

def get_sentiment(text):
    """Score `text` with TextBlob and return its polarity in [-1.0, 1.0].

    Empty values and anything that is not a ``str`` score a neutral 0.0.
    """
    if text and isinstance(text, str):
        return TextBlob(text).sentiment.polarity
    return 0.0

# Score every review_text and store the result in a new sentiment_score column.
dataset["sentiment_score"] = dataset["review_text"].apply(get_sentiment)

print("Sentiment score statistics:")
print(dataset["sentiment_score"].describe())

# Preview the first ten rows next to their rating for a quick sanity check.
print("\nSample rows:")
print(
    dataset.head(10)[
        ["review_id", "product_name", "rating", "sentiment_score", "review_text"]
    ].to_string()
)

Sentiment score statistics:
count    850.000000
mean       0.336828
std        0.221072
min       -0.375000
25%        0.350000
50%        0.412500
75%        0.458333
max        0.550000
Name: sentiment_score, dtype: float64

Sample rows:
   review_id      product_name  rating  sentiment_score                                                                                                                                review_text
0     900001     Beard Trimmer       3         0.458333                                                                     Met my needs. The irritation is fine, but the results could be better.
1     900002   Design Handbook       3         0.354167                                                        It’s okay overall. The organization is fine, but the writing style could be better.
2     900003       Moisturizer       5         0.460000        Exceeded my expectations. Packaging is excellent and the texture is better than expected. Shipping was fast and 

In [17]:
# === TRANSFORMATION: Rolling Average Sentiment per Product ===

# Order rows by product, then by date, so each product's reviews are contiguous
# and chronological before the window is applied; the index is renumbered.
dataset = dataset.sort_values(by=["product_id", "review_date"], ignore_index=True)

# Window of 3 reviews; min_periods=1 yields a value from the very first review.
WINDOW_SIZE = 3


def _trailing_mean(scores):
    """Mean over the current and up to WINDOW_SIZE - 1 preceding scores."""
    return scores.rolling(window=WINDOW_SIZE, min_periods=1).mean()


dataset["rolling_avg_sentiment"] = (
    dataset.groupby("product_id")["sentiment_score"].transform(_trailing_mean)
)

print(f"Rolling average sentiment (window={WINDOW_SIZE}) calculated per product.\n")

# Show the product with the most reviews as a worked example.
sample_product = dataset["product_name"].value_counts().index[0]
sample = dataset.loc[
    dataset["product_name"] == sample_product,
    ["review_id", "product_name", "review_date", "rating", "sentiment_score", "rolling_avg_sentiment"],
]
print(f"Sample: '{sample_product}' ({len(sample)} reviews)")
print(sample.to_string(index=False))

print("\n--- Summary statistics for rolling_avg_sentiment ---")
print(dataset["rolling_avg_sentiment"].describe())

Rolling average sentiment (window=3) calculated per product.

Sample: 'Camping Lantern' (27 reviews)
 review_id    product_name review_date  rating  sentiment_score  rolling_avg_sentiment
    900101 Camping Lantern  2024-02-27       5         0.350000               0.350000
    900381 Camping Lantern  2024-05-03       5         0.460000               0.405000
    900768 Camping Lantern  2024-05-08       3         0.458333               0.422778
    900039 Camping Lantern  2024-06-02       3         0.422619               0.446984
    900106 Camping Lantern  2024-07-14       5         0.438095               0.439683
    900832 Camping Lantern  2024-08-12       5         0.550000               0.470238
    900259 Camping Lantern  2024-09-11       5         0.460000               0.482698
    900223 Camping Lantern  2024-11-06       5         0.412500               0.474167
    900779 Camping Lantern  2024-11-09       4         0.412500               0.428333
    900837 Camping Lantern  2

In [18]:
# === LOAD: Insert cleaned data into SQLite database ===
import sqlite3

DB_PATH = "../data/reviews_db.sqlite"

# Bound before the try block so the finally clause can tell whether
# sqlite3.connect() succeeded and there is a connection to release.
conn = None
try:
    # --- 1. Write the full cleaned reviews table ---
    conn = sqlite3.connect(DB_PATH)

    # Convert review_date to string for SQLite storage
    load_df = dataset.copy()
    load_df["review_date"] = load_df["review_date"].dt.strftime("%Y-%m-%d")

    load_df.to_sql("reviews", conn, if_exists="replace", index=False)
    print(f"[LOAD] 'reviews' table: {len(load_df)} rows written.")

    # --- 2. Create product_rolling_sentiment table (schema matches app.py API) ---
    # API expects: product_id, product_name, rating, rolling_average_sentiment, date
    rolling_df = load_df[["product_id", "product_name", "rating",
                          "rolling_avg_sentiment", "review_date"]].copy()
    rolling_df = rolling_df.rename(columns={
        "rolling_avg_sentiment": "rolling_average_sentiment",
        "review_date": "date"
    })

    rolling_df.to_sql("product_rolling_sentiment", conn, if_exists="replace", index=False)
    print(f"[LOAD] 'product_rolling_sentiment' table: {len(rolling_df)} rows written.")

    # --- 3. Verify tables ---
    tables = pd.read_sql("SELECT name FROM sqlite_master WHERE type='table'", conn)
    print(f"\nTables in database: {tables['name'].tolist()}")

    # Verify schema of product_rolling_sentiment
    schema = pd.read_sql("PRAGMA table_info(product_rolling_sentiment)", conn)
    print(f"\nproduct_rolling_sentiment schema:")
    print(schema[["name", "type"]].to_string(index=False))

    # Quick row count check
    for tbl in tables["name"]:
        # Identifiers cannot be bound as SQL parameters, so the name is quoted
        # (with embedded quotes doubled) rather than interpolated raw.
        quoted_tbl = '"' + tbl.replace('"', '""') + '"'
        count = pd.read_sql(f"SELECT COUNT(*) as cnt FROM {quoted_tbl}", conn).iloc[0, 0]
        print(f"  {tbl}: {count} rows")

    print(f"\nDatabase saved to: {os.path.abspath(DB_PATH)}")

except sqlite3.Error as e:
    print(f"ERROR: SQLite error — {e}")
    raise
except Exception as e:
    print(f"ERROR: Unexpected error during LOAD — {e}")
    raise
finally:
    # Release the connection on success and on failure alike; previously an
    # exception skipped conn.close() and left the database handle open.
    if conn is not None:
        conn.close()

[LOAD] 'reviews' table: 850 rows written.
[LOAD] 'product_rolling_sentiment' table: 850 rows written.

Tables in database: ['reviews', 'product_rolling_sentiment']

product_rolling_sentiment schema:
                     name    type
               product_id INTEGER
             product_name    TEXT
                   rating INTEGER
rolling_average_sentiment    REAL
                     date    TEXT
  reviews: 850 rows
  product_rolling_sentiment: 850 rows

Database saved to: /workspaces/Sprints-capstone-project/data/reviews_db.sqlite
