
# Assignment 5: CRM Cleanup @ **DalaShop**
*Files (Ch.10), Exceptions (Ch.10), Unit Tests (Ch.11), and Regular Expressions*  

**Total: 3 points**  (Two questions, 1.5 pts each)  

> This assignment is scenario-based and aligned with Python Crash Course Ch.10 (files & exceptions), Ch.11 (unit testing with `unittest`), and Regular Expressions.

## Scenario
You are a data intern at an online retailer called **DalaShop**.  
Sales exported a **raw contacts** file from the CRM. It contains customer names, emails, and phone numbers, but the formatting is messy and some emails are invalid.  
Your tasks:

1. **Clean** the contacts (Files + Exceptions + Regex).  
2. **Write unit tests** to make sure your helper functions work correctly and keep working in the future.

## Data file (given by the company): `contacts_raw.txt`
Use this exact sample data (you may extend it for your own testing, but do **not** change it when submitting).  
Run the next cell once to create the file beside your notebook.

In [None]:
# Create the provided company dataset file
with open("contacts_raw.txt", "w", encoding="utf-8") as f:
    f.write('Alice Johnson <alice@example.com> , +1 (469) 555-1234\nBob Roberts <bob[at]example.com> , 972-555-777\nSara M. , sara@mail.co , 214 555 8888\n"Mehdi A." <mehdi.ay@example.org> , (469)555-9999\nDelaram <delaram@example.io>, +1-972-777-2121\nNima <NIMA@example.io> , 972.777.2121\nduplicate <Alice@Example.com> , 469 555 1234')
print("Wrote contacts_raw.txt with sample DalaShop data.")

## Q1 (1.5 pts) — CRM cleanup with Files, Exceptions, and Regex
Implement `q1_crm_cleanup.py` to:

1. **Read** `contacts_raw.txt` using `pathlib` and `with`. If the file is missing, **handle** it gracefully with `try/except FileNotFoundError` (print a friendly message; do not crash).
2. **Validate emails** with a simple regex (`r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"`).  
   - Trim whitespace with `strip()` before checking.  
   - Use **full** matching (not partial).
3. **Normalize phone numbers:** remove all non-digits (e.g., with `re.sub(r"\D", "", raw)`).  
   - If the result has **≥ 10 digits**, keep the **last 10 digits**.  
   - Otherwise, return an **empty string** (`""`).
4. **Filter rows:** keep **only** rows with a valid email.
5. **Deduplicate:** remove duplicates by **email** using **case-insensitive** comparison (e.g., `email.casefold()`). **Keep the first occurrence** and drop later duplicates.
6. **Output CSV:** write to `contacts_clean.csv` with **columns exactly** `name,email,phone` (UTF-8).  
7. **Preserve input order:** the order of rows in `contacts_clean.csv` must match the **first appearance** order from the input file. **Do not sort** the rows.
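
The difference between full and partial matching in step 2 can be illustrated with a minimal sketch. It uses the regex from the task description; the `candidates` list and the `results` dictionary are made up for illustration and are not part of the assignment files.

```python
import re

# Same pattern as in step 2 of the task description.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")

# Illustrative inputs (assumed for this sketch, not taken from the grader).
candidates = [
    "alice@example.com",
    "  sara@mail.co  ",
    "bob[at]example.com",
    "x alice@example.com!",
]

results = {}
for raw in candidates:
    trimmed = raw.strip()                                # trim before checking
    partial = EMAIL_RE.search(trimmed) is not None       # match anywhere in the string
    full = EMAIL_RE.fullmatch(trimmed) is not None       # the whole string must match
    results[raw] = (partial, full)
    print(repr(raw), "search:", partial, "fullmatch:", full)
```

The last candidate contains a valid address surrounded by extra characters, so `search` accepts it while `fullmatch` rejects it. That is the case the "full matching" requirement is meant to catch.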

**Grading rubric (1.5 pts):**
- (0.4) File read/write via `pathlib` + graceful `FileNotFoundError` handling  
- (0.5) Correct email regex validation + filtering  
- (0.4) Phone normalization + case-insensitive de-dup (keep first)  
- (0.2) Clean code, clear names, minimal docstrings/comments

In [5]:
#!/usr/bin/env python3
"""
q1_crm_cleanup.py

DalaShop CRM cleanup — Chapters 10 & 11, Q1

SPEC:
- Read contacts_raw.txt using pathlib + with; handle FileNotFoundError gracefully.
- Validate emails via FULL match using the provided regex (trim first).
- Normalize phones: remove all non-digits; keep last 10 if >=10; else "".
- Keep only rows with a valid email.
- De-duplicate by email (case-insensitive), keeping the first occurrence.
- Preserve input order; do not sort.
- Output UTF-8 CSV with columns exactly: name,email,phone
"""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
import csv
import logging
import re
from typing import Iterable, List, Optional

# ------------------------------------------------------------
# Paths: support both "run as script" and "run inside notebook"
# ------------------------------------------------------------
try:
    # When executed as a .py script
    BASE_DIR = Path(__file__).parent
except NameError:
    # When executed inside a notebook cell
    BASE_DIR = Path.cwd()

INPUT_PATH = BASE_DIR / "contacts_raw.txt"
OUTPUT_PATH = BASE_DIR / "contacts_clean.csv"

# ------------------------------------------------------------
# Config / Regex
# ------------------------------------------------------------
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")

logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")


# ------------------------------------------------------------
# Data model
# ------------------------------------------------------------
@dataclass(frozen=True)
class Contact:
    name: str
    email: str
    phone: str


# ------------------------------------------------------------
# Validation & normalization
# ------------------------------------------------------------
def is_valid_email(raw_email: str) -> bool:
    """Return True iff the entire trimmed string FULL-matches EMAIL_RE."""
    if not raw_email:
        return False
    email = raw_email.strip()
    return EMAIL_RE.fullmatch(email) is not None


def normalize_phone(raw_phone: Optional[str]) -> str:
    """
    Remove all non-digits. If >=10 digits, keep LAST 10; otherwise return "".
    """
    if raw_phone is None:
        return ""
    digits = re.sub(r"\D", "", raw_phone)
    return digits[-10:] if len(digits) >= 10 else ""


# ------------------------------------------------------------
# Parsing
# ------------------------------------------------------------
def parse_line(line: str) -> Optional[Contact]:
    """
    Parse a raw CRM line into (name, email, phone).

    Supported shapes (per provided sample):
      - 'Alice Johnson <alice@example.com> , +1 (469) 555-1234'
      - 'Sara M. , sara@mail.co , 214 555 8888'
      - '"Mehdi A." <mehdi.ay@example.org> , (469)555-9999'
      - 'Delaram <delaram@example.io>, +1-972-777-2121'
      - 'Nima <NIMA@example.io> , 972.777.2121'
      - 'duplicate <Alice@Example.com> , 469 555 1234'
    """
    line = line.strip()
    if not line:
        return None

    # Shape 1: 'Name <email> , phone' (the email sits inside <...>).
    # Splitting on "," alone would leave '<email>' glued to the name and
    # lose the phone, so slice around the angle brackets instead.
    m = re.search(r"<([^>]+)>", line)
    if m:
        name = line[:m.start()].strip().rstrip(",").strip()
        email = m.group(1).strip()
        # Everything after '>' is the phone, minus the separating comma.
        phone = line[m.end():].strip().lstrip(",").strip()
        return Contact(name=name, email=email, phone=phone)

    # Shape 2: 'name , email , phone' (plain comma-separated fields).
    parts = [p.strip() for p in line.split(",")]
    if len(parts) < 2:
        return None
    name, email = parts[0], parts[1]
    phone = parts[2] if len(parts) >= 3 else ""
    return Contact(name=name, email=email, phone=phone)


# ------------------------------------------------------------
# Pipeline helpers
# ------------------------------------------------------------
def load_lines(path: Path) -> List[str]:
    """Read all lines from path, handling FileNotFoundError as required."""
    try:
        with path.open("r", encoding="utf-8") as f:
            return f.readlines()
    except FileNotFoundError:
        print(f"{path.name} not found. Please ensure the file exists beside the notebook/script.")
        return []


def clean_contacts(lines: Iterable[str]) -> List[Contact]:
    """
    Parse → validate email → normalize phone → collect (order preserved).
    """
    acc: List[Contact] = []
    for line in lines:
        parsed = parse_line(line)
        if not parsed:
            continue
        if not is_valid_email(parsed.email):
            continue
        normalized_phone = normalize_phone(parsed.phone)
        acc.append(Contact(name=parsed.name, email=parsed.email, phone=normalized_phone))
    return acc


def dedupe_by_email_casefold(contacts: Iterable[Contact]) -> List[Contact]:
    """
    Remove duplicates by email using case-insensitive comparison, keeping first.
    Order preserved.
    """
    seen: set[str] = set()
    out: List[Contact] = []
    for c in contacts:
        key = c.email.casefold()
        if key in seen:
            continue
        seen.add(key)
        out.append(c)
    return out


def write_csv(contacts: Iterable[Contact], path: Path) -> None:
    """Write UTF-8 CSV with columns exactly: name,email,phone."""
    with path.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "email", "phone"])
        writer.writeheader()
        for c in contacts:
            writer.writerow({"name": c.name, "email": c.email, "phone": c.phone})


# ------------------------------------------------------------
# Entrypoint
# ------------------------------------------------------------
def main() -> None:
    lines = load_lines(INPUT_PATH)
    if not lines:
        return  # graceful no-op if file missing or empty

    cleaned = clean_contacts(lines)
    deduped = dedupe_by_email_casefold(cleaned)
    write_csv(deduped, OUTPUT_PATH)
    logging.info("Wrote %d cleaned contacts to %s", len(deduped), OUTPUT_PATH.name)


if __name__ == "__main__":
    main()



contacts_raw.txt not found. Please ensure the file exists beside the notebook.
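
One way to sanity-check the output format from step 6 is to write a couple of rows to a temporary CSV and read them back. The sketch below is only an illustration: the `rows` list is hypothetical sample data, and it writes to a temporary directory rather than to the real `contacts_clean.csv`.

```python
import csv
import tempfile
from pathlib import Path

# Hypothetical sample rows standing in for the cleaned contacts.
rows = [
    {"name": "Alice Johnson", "email": "alice@example.com", "phone": "4695551234"},
    {"name": "Sara M.", "email": "sara@mail.co", "phone": "2145558888"},
]

with tempfile.TemporaryDirectory() as tmp:
    out_path = Path(tmp) / "contacts_clean.csv"

    # newline="" lets the csv module control line endings itself.
    with out_path.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "email", "phone"])
        writer.writeheader()
        writer.writerows(rows)

    # Read the file back: capture the header and the rows in file order.
    with out_path.open("r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        header = reader.fieldnames
        read_back = list(reader)

print(header)
print([row["email"] for row in read_back])
```

Comparing `header` against the required column names and `read_back` against the input rows confirms both the exact columns and that the row order was preserved.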


## Q2 (1.5 pts) — Unit testing with `unittest`
Create tests in `test_crm_cleanup.py` that cover at least:

1. **Email validation**: valid/invalid variations.  
2. **Phone normalization**: parentheses, dashes, spaces, country code; too-short cases.  
3. **Parsing**: from a small multi-line string (not from a file), assert the exact structured rows (name/email/phone).  
4. **De-duplication**: demonstrate that a case-variant duplicate email is dropped (first occurrence kept).
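
The list above is a minimum. An optional extra test could cover the graceful missing-file path from Q1. The following is a sketch under stated assumptions: the `load_lines` function here is a simplified stand-in that mirrors the Q1 reader (without the printed message) so the block runs on its own, and `TestLoadLines` is a hypothetical test class name.

```python
import tempfile
import unittest
from pathlib import Path


def load_lines(path):
    """Simplified stand-in for the Q1 reader: return [] if the file is missing."""
    try:
        with path.open("r", encoding="utf-8") as f:
            return f.readlines()
    except FileNotFoundError:
        return []


class TestLoadLines(unittest.TestCase):
    def test_missing_file_returns_empty_list(self):
        # A fresh temporary directory guarantees the file does not exist.
        with tempfile.TemporaryDirectory() as tmp:
            missing = Path(tmp) / "does_not_exist.txt"
            self.assertEqual(load_lines(missing), [])

    def test_existing_file_returns_lines(self):
        with tempfile.TemporaryDirectory() as tmp:
            path = Path(tmp) / "contacts_raw.txt"
            path.write_text("Sara M. , sara@mail.co , 214 555 8888\n", encoding="utf-8")
            self.assertEqual(
                load_lines(path),
                ["Sara M. , sara@mail.co , 214 555 8888\n"],
            )
```

Using `tempfile.TemporaryDirectory` keeps the test independent of whatever files happen to sit beside the notebook.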




In [16]:
import unittest

from q1_crm_cleanup import (
    is_valid_email,
    normalize_phone,
    parse_line,
    clean_contacts,
    dedupe_by_email_casefold,
    Contact,
)

class TestEmailValidation(unittest.TestCase):
    def test_valid_emails(self):
        valid = [
            "alice@example.com",
            "ALICE@EXAMPLE.COM",             # case OK
            "mehdi.ay@example.org",
            "delaram@example.io",
            "name.surname+tag@sub.domain.co",
            "  alice@example.com  ",         # surrounding spaces trimmed
        ]
        for e in valid:
            with self.subTest(email=e):
                self.assertTrue(is_valid_email(e))

    def test_invalid_emails(self):
        invalid = [
            "bob[at]example.com",            # bracketed at
            "no-at-symbol.example.com",
            "trailing-dot@domain.com.",
            "@no-local-part.com",
            "name@domain",                   # no TLD
            "name@domain.c",                 # TLD too short
            "",
            "   ",
        ]
        for e in invalid:
            with self.subTest(email=e):
                self.assertFalse(is_valid_email(e))


class TestPhoneNormalization(unittest.TestCase):
    def test_various_formats(self):
        cases = {
            "(469) 555-1234": "4695551234",
            "+1 (469) 555-1234": "4695551234",
            "214 555 8888": "2145558888",
            "972-777-2121": "9727772121",
            "972.777.2121": "9727772121",
            "1-972-777-2121": "9727772121",
            "  +1-469-555-0000  ": "4695550000",
        }
        for raw, expected in cases.items():
            with self.subTest(phone=raw):
                self.assertEqual(normalize_phone(raw), expected)

    def test_too_short_returns_empty(self):
        self.assertEqual(normalize_phone("972-555-777"), "")
        self.assertEqual(normalize_phone("123456789"), "")
        self.assertEqual(normalize_phone("abc-def"), "")
        self.assertEqual(normalize_phone(""), "")
        self.assertEqual(normalize_phone(None), "")


class TestParsing(unittest.TestCase):
    def test_parse_from_multiline_string(self):
        # No files: parse_line only
        raw = (
            "Alice Johnson <alice@example.com> , +1 (469) 555-1234\n"
            "Sara M. , sara@mail.co , 214 555 8888\n"
            "\"Mehdi A.\" <mehdi.ay@example.org> , (469)555-9999\n"
            "Delaram <delaram@example.io>, +1-972-777-2121\n"
            "Nima <NIMA@example.io> , 972.777.2121\n"
            "duplicate <Alice@Example.com> , 469 555 1234\n"
        )
        lines = raw.splitlines()

        parsed = [parse_line(line) for line in lines]
        parsed = [p for p in parsed if p is not None]

        expected = [
            Contact(name="Alice Johnson", email="alice@example.com",       phone="+1 (469) 555-1234"),
            Contact(name="Sara M.",       email="sara@mail.co",            phone="214 555 8888"),
            Contact(name='"Mehdi A."',    email="mehdi.ay@example.org",    phone="(469)555-9999"),
            Contact(name="Delaram",       email="delaram@example.io",      phone="+1-972-777-2121"),
            Contact(name="Nima",          email="NIMA@example.io",         phone="972.777.2121"),
            Contact(name="duplicate",     email="Alice@Example.com",       phone="469 555 1234"),
        ]

        self.assertEqual(parsed, expected)


class TestDeduplication(unittest.TestCase):
    def test_case_insensitive_dedup_keeps_first(self):
        contacts = [
            Contact(name="Alice Johnson", email="alice@example.com", phone="4695551234"),
            Contact(name="Duplicate",     email="ALICE@EXAMPLE.COM", phone="4695551234"),
            Contact(name="Sara",          email="sara@mail.co",      phone="2145558888"),
        ]
        deduped = dedupe_by_email_casefold(contacts)
        self.assertEqual(
            deduped,
            [
                Contact(name="Alice Johnson", email="alice@example.com", phone="4695551234"),
                Contact(name="Sara",          email="sara@mail.co",      phone="2145558888"),
            ],
        )

    def test_end_to_end_clean_and_dedupe(self):
        """
        lines -> clean_contacts (valid email + normalized phone) -> dedupe
        """
        raw = (
            "Alice Johnson <alice@example.com> , +1 (469) 555-1234\n"
            "Bob Roberts <bob[at]example.com> , 972-555-777\n"  # invalid email -> dropped
            "Sara M. , sara@mail.co , 214 555 8888\n"
            "\"Mehdi A.\" <mehdi.ay@example.org> , (469)555-9999\n"
            "Delaram <delaram@example.io>, +1-972-777-2121\n"
            "Nima <NIMA@example.io> , 972.777.2121\n"
            "duplicate <Alice@Example.com> , 469 555 1234\n"   # duplicate of alice -> dropped
        )
        lines = raw.splitlines()

        cleaned = clean_contacts(lines)                # validates + normalizes phones
        deduped = dedupe_by_email_casefold(cleaned)    # drops duplicate alice

        expected = [
            Contact(name="Alice Johnson", email="alice@example.com",    phone="4695551234"),
            Contact(name="Sara M.",       email="sara@mail.co",         phone="2145558888"),
            Contact(name='"Mehdi A."',    email="mehdi.ay@example.org", phone="4695559999"),
            Contact(name="Delaram",       email="delaram@example.io",   phone="9727772121"),
            Contact(name="Nima",          email="NIMA@example.io",      phone="9727772121"),
        ]
        self.assertEqual(deduped, expected)


if __name__ == "__main__":
    unittest.main()


ModuleNotFoundError: No module named 'q1_crm_cleanup'
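
Python raises the `ModuleNotFoundError` above when `q1_crm_cleanup` cannot be found as an importable module, for example when the Q1 code exists only in a notebook cell and not as a `q1_crm_cleanup.py` file on the import path. One possible workaround inside a notebook is to test helpers that are already defined in the session and to run the test case with an explicit loader and runner. In this sketch, `normalize_phone` is a stand-in copy of the Q1 helper and `TestPhoneInNotebook` is a hypothetical class name.

```python
import re
import unittest


def normalize_phone(raw_phone):
    """Stand-in copy of the Q1 helper, defined directly in the notebook."""
    digits = re.sub(r"\D", "", raw_phone or "")
    return digits[-10:] if len(digits) >= 10 else ""


class TestPhoneInNotebook(unittest.TestCase):
    def test_country_code_is_dropped(self):
        self.assertEqual(normalize_phone("+1 (469) 555-1234"), "4695551234")

    def test_too_short_returns_empty(self):
        self.assertEqual(normalize_phone("972-555-777"), "")


# Load and run the test case explicitly instead of calling unittest.main(),
# which by default parses sys.argv (the kernel's arguments in a notebook)
# and calls sys.exit() when it finishes.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestPhoneInNotebook)
result = unittest.TextTestRunner(verbosity=2).run(suite)
print("All passed:", result.wasSuccessful())
```

When the code is saved as real `q1_crm_cleanup.py` and `test_crm_cleanup.py` files in the same directory, the import-based test cell shown earlier should work as written.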

## Grading rubric (total 3 pts)
- **Q1 (1.5 pts)**  
  - (0.4) File I/O with `pathlib` + graceful `FileNotFoundError` handling  
  - (0.5) Email validation (regex + strip + full match) and filtering  
  - (0.4) Phone normalization and **case-insensitive** de-duplication (keep first)  
  - (0.2) Code clarity (names, minimal docstrings/comments)
- **Q2 (1.5 pts)**  
  - (0.6) Meaningful coverage for email/phone functions (valid & invalid)  
  - (0.6) Parsing & de-dup tests that assert exact expected rows  
  - (0.3) Standard `unittest` structure and readable test names
