# Task 3 ‚Äî PostgreSQL Storage Layer (Bank Reviews)

This notebook implements Task 3 end-to-end using imports from `src/`.

## Inputs
### Task 1 (cleaned)
Expected columns:
- `review, rating, date, bank, source`

### Task 2A (theme examples ‚Äî QA only)
File: `data/processed/task2/task2_theme_examples.csv`  
Columns:
- `bank, theme, rating, date, sentiment_label, review`

This file is **not** used to update the DB (it‚Äôs for quick QA/sanity only).

### Task 2B (scored ‚Äî main enrichment source)
File: `data/processed/task2/reviews_task2_scored.csv`  
Columns:
- `review,rating,date,bank,source,review_id,p_pos,p_neg,sentiment_score,sentiment_label,theme_primary,themes`

## Approach
Because Task 1 has no `review_id`, we match Task 1 ‚Üî Task 2 rows using a deterministic `review_hash`
computed from: `bank + review_text + date + rating + source`.

## Outputs / Requirements
- Normalized schema applied from `sql/schema.sql`
- Idempotent load:
  - Task 1 inserts by `review_hash` with `ON CONFLICT DO NOTHING`
  - Task 2 updates enrichment fields by `review_hash`
- Theme normalization via `themes` and `review_themes`
- Verification queries: total reviews (>1000), reviews per bank, average rating per bank, enrichment coverage, data quality checks


In [1]:
import sys
from pathlib import Path

NOTEBOOK_DIR = Path.cwd()

# Detect repo root by searching for "src" in current or parent directory
candidates = [NOTEBOOK_DIR, NOTEBOOK_DIR.parent]
REPO_ROOT = None
for c in candidates:
    if (c / "src").exists():
        REPO_ROOT = c
        break

if REPO_ROOT is None:
    raise RuntimeError(
        "Could not locate repo root (folder containing 'src'). "
        "Run from repo root or from notebooks/."
    )

if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))

print("Repo root:", REPO_ROOT)
print("sys.path[0]:", sys.path[0])


Repo root: d:\Python\Week 12\Customer-Experience-Analytics-for-Fintech-Apps-Updated
sys.path[0]: d:\Python\Week 12\Customer-Experience-Analytics-for-Fintech-Apps-Updated


## Load .envand create engine

In [2]:
from src.bank_reviews.db.engine import get_engine
import os
from dotenv import load_dotenv

load_dotenv()

DATABASE_URL = os.getenv("DATABASE_URL", "").strip()
TASK1_CLEAN_CSV = os.getenv(
    "TASK1_CLEAN_CSV", "data/processed/reviews_task1_clean.csv").strip()
TASK2_SCORED_CSV = os.getenv(
    "TASK2_SCORED_CSV", "data/processed/task2/reviews_task2_scored.csv").strip()
TASK2_THEME_EXAMPLES = os.getenv(
    "TASK2_THEME_EXAMPLES", "data/processed/task2/task2_theme_examples.csv").strip()
SCHEMA_SQL_PATH = os.getenv("SCHEMA_SQL_PATH", "sql/schema.sql").strip()

print("DATABASE_URL set:", bool(DATABASE_URL))
print("TASK1_CLEAN_CSV:", TASK1_CLEAN_CSV)
print("TASK2_SCORED_CSV:", TASK2_SCORED_CSV)
print("TASK2_THEME_EXAMPLES:", TASK2_THEME_EXAMPLES)
print("SCHEMA_SQL_PATH:", SCHEMA_SQL_PATH)

assert DATABASE_URL, "DATABASE_URL must be set in .env for src.bank_reviews.db.engine.get_engine()"

engine = get_engine()

DATABASE_URL set: True
TASK1_CLEAN_CSV: data/processed/reviews_task1_clean.csv
TASK2_SCORED_CSV: data/processed/task2/reviews_task2_scored.csv
TASK2_THEME_EXAMPLES: data/processed/task2/task2_theme_examples.csv
SCHEMA_SQL_PATH: sql/schema.sql


## Apply schema ( sql/schema.sql) safely

In [3]:
from sqlalchemy import text

if REPO_ROOT is None:
    raise RuntimeError(
        "REPO_ROOT is not set. Run the repo root detection cell before this cell."
    )

schema_path = REPO_ROOT / SCHEMA_SQL_PATH
assert schema_path.exists(), f"Schema SQL not found: {schema_path.resolve()}"

schema_sql = schema_path.read_text(encoding="utf-8")

with engine.begin() as conn:
    conn.execute(text(schema_sql))

print("Schema applied successfully.")

Schema applied successfully.


## Run Task 3 pipeline using your existing loader functions

In [4]:
from src.bank_reviews.db.load import load_task1_clean_csv, update_from_task2_scored_csv

if REPO_ROOT is None:
	raise RuntimeError(
		"REPO_ROOT is not set. Run the repo root detection cell before this cell."
	)

task1_path = REPO_ROOT / TASK1_CLEAN_CSV
task2_path = REPO_ROOT / TASK2_SCORED_CSV

assert task1_path.exists(), f"Task1 CSV not found: {task1_path.resolve()}"
assert task2_path.exists(), f"Task2 CSV not found: {task2_path.resolve()}"

out1 = load_task1_clean_csv(engine, task1_path, min_rows_required=400)
out2 = update_from_task2_scored_csv(engine, task2_path, load_theme_links=True)

out1, out2

({'task1_rows': 1800,
  'task1_insert_attempted': 1800,
  'task1_insert_rowcount_reported': 0,
  'total_reviews_in_db': 1786},
 {'task2_rows': 1800,
  'update_rowcount_reported': 1800,
  'task2_missing_hashes_in_db': 0,
  'themes_seen_insert_attempt': 6,
  'review_theme_links_inserted_rowcount_reported': 1794,
  'total_reviews_in_db': 1786,
  'reviews_with_any_enrichment': 1786})

Your error means `REPO_ROOT` is `None` when you do `REPO_ROOT / ...`. Run the repo‚Äëroot detection cell first, or add a safe fallback before any path joins.



In [5]:
# ...existing code...
if REPO_ROOT is None:
    NOTEBOOK_DIR = Path.cwd()
    candidates = [NOTEBOOK_DIR, NOTEBOOK_DIR.parent]
    for c in candidates:
        if (c / "src").exists():
            REPO_ROOT = c
            break
    if REPO_ROOT is None:
        raise RuntimeError("REPO_ROOT is not set. Run the repo root detection cell before this cell.")
# ...existing code...



Re-run the detection cell, then re-run the failing cell.

## KPI verification queries

In [6]:
import pandas as pd
from sqlalchemy import text

with engine.begin() as conn:
    total_reviews = conn.execute(
        text("SELECT COUNT(*) FROM reviews;")).scalar_one()

    per_bank = conn.execute(text("""
        SELECT b.bank_name, COUNT(*) AS n_reviews
        FROM reviews r
        JOIN banks b ON b.bank_id = r.bank_id
        GROUP BY b.bank_name
        ORDER BY n_reviews DESC;
    """)).mappings().all()

    avg_rating = conn.execute(text("""
        SELECT b.bank_name, AVG(r.rating)::numeric(10,3) AS avg_rating
        FROM reviews r
        JOIN banks b ON b.bank_id = r.bank_id
        GROUP BY b.bank_name
        ORDER BY avg_rating DESC;
    """)).mappings().all()

    coverage = conn.execute(text("""
        SELECT
          COUNT(*) FILTER (WHERE sentiment_label IS NOT NULL) AS has_sentiment_label,
          COUNT(*) FILTER (WHERE sentiment_score IS NOT NULL) AS has_sentiment_score,
          COUNT(*) FILTER (WHERE theme_primary IS NOT NULL) AS has_theme_primary,
          COUNT(*) AS total
        FROM reviews;
    """)).mappings().one()

print("TOTAL REVIEWS:", total_reviews)
print("KPI > 1000:", total_reviews > 1000)

print("\nReviews per bank:")
display(pd.DataFrame(per_bank))

print("\nAvg rating per bank:")
display(pd.DataFrame(avg_rating))

print("\nEnrichment coverage:")
coverage

TOTAL REVIEWS: 1786
KPI > 1000: True

Reviews per bank:


Unnamed: 0,bank_name,n_reviews
0,BOA,599
1,DASHEN,599
2,CBE,588



Avg rating per bank:


Unnamed: 0,avg_rating,bank_name
0,4.048,DASHEN
1,4.043,CBE
2,3.242,BOA



Enrichment coverage:


{'has_sentiment_label': 1786, 'has_sentiment_score': 1786, 'has_theme_primary': 1786, 'total': 1786}

## Data quality checks (duplicates/orphans/nulls)

In [7]:
with engine.begin() as conn:
    dup_hash_groups = conn.execute(text("""
        SELECT COUNT(*)
        FROM (
          SELECT review_hash, COUNT(*)
          FROM reviews
          GROUP BY review_hash
          HAVING COUNT(*) > 1
        ) t;
    """)).scalar_one()

    orphan_links = conn.execute(text("""
        SELECT COUNT(*)
        FROM review_themes rt
        LEFT JOIN reviews r ON r.review_id = rt.review_id
        LEFT JOIN themes  t ON t.theme_id = rt.theme_id
        WHERE r.review_id IS NULL OR t.theme_id IS NULL;
    """)).scalar_one()

    null_bank = conn.execute(
        text("SELECT COUNT(*) FROM reviews WHERE bank_id IS NULL;")).scalar_one()
    null_text = conn.execute(text("""
        SELECT COUNT(*)
        FROM reviews
        WHERE review_text IS NULL OR length(trim(review_text)) = 0;
    """)).scalar_one()

print("Duplicate review_hash groups:", dup_hash_groups)
print("Orphan review_themes rows:", orphan_links)
print("Null bank_id rows:", null_bank)
print("Null/empty review_text rows:", null_text)

Duplicate review_hash groups: 0
Orphan review_themes rows: 0
Null bank_id rows: 0
Null/empty review_text rows: 0


## Quick sample (sanity view)

In [8]:
with engine.begin() as conn:
    sample = conn.execute(text("""
        SELECT
          r.review_id, b.bank_name, r.rating, r.review_date, r.source,
          r.sentiment_label, r.sentiment_score, r.theme_primary,
          left(r.review_text, 180) AS review_preview
        FROM reviews r
        JOIN banks b ON b.bank_id = r.bank_id
        ORDER BY r.review_date DESC NULLS LAST, r.review_id DESC
        LIMIT 15;
    """)).mappings().all()

display(pd.DataFrame(sample))

Unnamed: 0,bank_name,rating,review_date,review_id,review_preview,sentiment_label,sentiment_score,source,theme_primary
0,BOA,1,2026-02-17,602,it crush every time you opend it it is hard to...,NEGATIVE,-0.998586,Google Play,OTHER
1,BOA,5,2026-02-17,601,Good,POSITIVE,0.999632,Google Play,OTHER
2,CBE,5,2026-02-17,4,goodüëç,NEGATIVE,-0.394113,Google Play,OTHER
3,CBE,5,2026-02-17,3,I like it very much.Thank you,POSITIVE,0.999746,Google Play,OTHER
4,CBE,5,2026-02-17,2,it's nice app for me,POSITIVE,0.999668,Google Play,OTHER
5,CBE,5,2026-02-17,1,"Great App!!! Some times it may become busy,but...",POSITIVE,0.986176,Google Play,OTHER
6,DASHEN,2,2026-02-16,1201,Mostly slow or doesn't open altogether. A clum...,NEGATIVE,-0.999505,Google Play,OTHER
7,BOA,5,2026-02-16,603,it is a best app,POSITIVE,0.999634,Google Play,OTHER
8,CBE,5,2026-02-16,13,it a best app,POSITIVE,0.999679,Google Play,OTHER
9,CBE,5,2026-02-16,12,technitional,NEGATIVE,-0.969318,Google Play,OTHER


## load Task 2 theme examples

In [9]:
if REPO_ROOT is None:
    raise RuntimeError(
        "REPO_ROOT is not set. Run the repo root detection cell before this cell."
    )

examples_path = REPO_ROOT / TASK2_THEME_EXAMPLES

if examples_path.exists():
    ex = pd.read_csv(examples_path)
    print("theme_examples rows:", len(ex))
    display(ex.head(20))

    if "theme" in ex.columns:
        display(ex["theme"].value_counts().head(25))
else:
    print("Not found (skipping):", examples_path.resolve())

theme_examples rows: 51


Unnamed: 0,bank,theme,rating,date,sentiment_label,review
0,CBE,OTHER,5,2026-02-17,POSITIVE,"Great App!!! Some times it may become busy,but..."
1,CBE,OTHER,5,2026-02-17,POSITIVE,it's nice app for me
2,CBE,OTHER,5,2026-02-17,POSITIVE,I like it very much.Thank you
3,CBE,STABILITY_BUGS,1,2026-02-14,NEGATIVE,"since the application updated, i couldn't use ..."
4,CBE,UX_UI,5,2026-02-12,POSITIVE,"CBE Mobile Banking: Easy to Use! Fast, Efficie..."
5,CBE,STABILITY_BUGS,2,2026-02-12,NEGATIVE,update error waiting the next update
6,CBE,STABILITY_BUGS,1,2026-02-10,NEGATIVE,the latest version is worse. not working at all
7,CBE,UX_UI,5,2026-02-06,POSITIVE,its great and easy to use app i appreciate
8,CBE,SUPPORT_SERVICE,1,2026-02-02,NEGATIVE,The previous version of the CBE application wa...
9,CBE,SUPPORT_SERVICE,4,2026-01-30,NEGATIVE,"very easy, helpful and safe for everyday life ..."


theme
OTHER              9
STABILITY_BUGS     9
UX_UI              9
ACCESS_AUTH        9
TXN_RELIABILITY    9
SUPPORT_SERVICE    6
Name: count, dtype: int64

## Troubleshooting: diagnose Task 2 hash mismatches 

In [11]:
import pandas as pd
from src.bank_reviews.db.load import _make_review_hash

t1 = pd.read_csv(task1_path)
t2 = pd.read_csv(task2_path)

t1["review"] = t1["review"].astype(str).str.strip()
t2["review"] = t2["review"].astype(str).str.strip()

t1["date"] = pd.to_datetime(t1["date"], errors="coerce").dt.date
t2["date"] = pd.to_datetime(t2["date"], errors="coerce").dt.date

t1["review_hash"] = t1.apply(lambda r: _make_review_hash(
    r["bank"], r["review"], r["date"], r["rating"], r["source"]), axis=1)
t2["review_hash"] = t2.apply(lambda r: _make_review_hash(
    r["bank"], r["review"], r["date"], r["rating"], r["source"]), axis=1)

missing = t2[~t2["review_hash"].isin(set(t1["review_hash"]))].copy()

print("Unmatched Task2 rows (hash not found in Task1):", len(missing))
display(missing[["bank", "rating", "date", "source", "review"]].head(20))

Unmatched Task2 rows (hash not found in Task1): 0


Unnamed: 0,bank,rating,date,source,review


## Final run summary (for README / submission)

In [12]:
summary = {"task1_load": out1, "task2_update": out2}
summary

{'task1_load': {'task1_rows': 1800,
  'task1_insert_attempted': 1800,
  'task1_insert_rowcount_reported': 0,
  'total_reviews_in_db': 1786},
 'task2_update': {'task2_rows': 1800,
  'update_rowcount_reported': 1800,
  'task2_missing_hashes_in_db': 0,
  'themes_seen_insert_attempt': 6,
  'review_theme_links_inserted_rowcount_reported': 1794,
  'total_reviews_in_db': 1786,
  'reviews_with_any_enrichment': 1786}}