# Assignment 5: CRM Cleanup @ **DalaShop**

_Files (Ch.10), Exceptions (Ch.10), Unit Tests (Ch.11), and Regular Expressions_  

**Total: 3 points** (two questions, 1.5 pts each)

> This assignment is scenario-based and aligned with Python Crash Course Ch.10 (files & exceptions), Ch.11 (unit testing with `unittest`), and Regular Expressions.


## Scenario

You are a data intern at an online retailer called **DalaShop**.  
Sales exported a **raw contacts** file from the CRM. It contains customer names, emails, and phone numbers, but the formatting is messy and some emails are invalid.  
Your tasks:

1. **Clean** the contacts (Files + Exceptions + Regex).
2. **Write unit tests** to make sure your helper functions work correctly and keep working in the future.


## Data file (given by the company): `contacts_raw.txt`

Use this exact sample data (you may extend it for your own testing, but do **not** change it when submitting).  
Run the next cell once to create the file beside your notebook.


In [12]:
# Create the provided company dataset file
with open("contacts_raw.txt", "w", encoding="utf-8") as f:
    f.write('Alice Johnson <alice@example.com> , +1 (469) 555-1234\nBob Roberts <bob[at]example.com> , 972-555-777\nSara M. , sara@mail.co , 214 555 8888\n"Mehdi A." <mehdi.ay@example.org> , (469)555-9999\nDelaram <delaram@example.io>, +1-972-777-2121\nNima <NIMA@example.io> , 972.777.2121\nduplicate <Alice@Example.com> , 469 555 1234')
print("Wrote contacts_raw.txt with sample DalaShop data.")

Wrote contacts_raw.txt with sample DalaShop data.


## Q1 (1.5 pts) — CRM cleanup with Files, Exceptions, and Regex

Implement `q1_crm_cleanup.py` to:

1. **Read** `contacts_raw.txt` using `pathlib` and `with`. If the file is missing, **handle** it gracefully with `try/except FileNotFoundError` (print a friendly message; do not crash).
2. **Validate emails** with a simple regex (`r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"`).
   - Trim whitespace with `strip()` before checking.
   - Use **full** matching (not partial).
3. **Normalize phone numbers:** remove all non-digits (e.g., with `re.sub(r"\D", "", raw)`).
   - If the result has **≥ 10 digits**, keep the **last 10 digits**.
   - Otherwise, return an **empty string** (`""`).
4. **Filter rows:** keep **only** rows with a valid email.
5. **Deduplicate:** remove duplicates by **email** using **case-insensitive** comparison (e.g., `email.casefold()`). **Keep the first occurrence** and drop later duplicates.
6. **Output CSV:** write to `contacts_clean.csv` with **columns exactly** `name,email,phone` (UTF-8).
7. **Preserve input order:** the order of rows in `contacts_clean.csv` must match the **first appearance** order from the input file. **Do not sort** the rows.
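
To see why step 2 asks for full matching, the sketch below compares `re.search` (partial) with `re.fullmatch` using the regex given in the instructions. The helper name `is_valid_email` and the sample strings are illustrative assumptions, not part of the required solution.

```python
import re

# Regex copied from step 2 of the instructions
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")

def is_valid_email(raw: str) -> bool:
    """Illustrative helper: strip whitespace, then require a full match."""
    return EMAIL_RE.fullmatch(raw.strip()) is not None

candidate = "alice@example.com extra"

# A partial search finds the email-like prefix, so it would wrongly accept this string
partial_hit = EMAIL_RE.search(candidate) is not None

# A full match requires the whole (stripped) string to be an email
full_hit = is_valid_email(candidate)

print(partial_hit, full_hit)                 # True False
print(is_valid_email("  sara@mail.co  "))    # True: surrounding spaces are stripped first
print(is_valid_email("bob[at]example.com"))  # False: no "@" in the string
```

With partial matching, a line such as `alice@example.com extra` would slip through the filter in step 4, which is why the stripped string is checked as a whole.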

**Grading rubric (1.5 pts):**

- (0.4) File read/write via `pathlib` + graceful `FileNotFoundError` handling
- (0.5) Correct email regex validation + filtering
- (0.4) Phone normalization + case-insensitive de-dup (keep first)
- (0.2) Clean code, clear names, minimal docstrings/comments


In [13]:
import re
import csv
from pathlib import Path

# 1) email regex from instructions
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")

def validate_email(raw: str):
    """Return cleaned email if valid, else None."""
    if not raw:
        return None
    email = raw.strip()
    return email if EMAIL_RE.fullmatch(email) else None

def normalize_phone(raw: str):
    """Strip non-digits; return the last 10 digits if at least 10 remain, else ''."""
    if not raw:
        return ""
    digits = re.sub(r"\D", "", raw)
    if len(digits) < 10:
        return ""
    return digits[-10:]

def deduplicate_contacts(rows):
    """Case-insensitive dedup by email, keep first, preserve order."""
    seen = set()
    result = []
    for row in rows:
        key = row["email"].casefold()
        if key in seen:
            continue
        seen.add(key)
        result.append(row)
    return result

def parse_contacts_from_string(text: str):
    """
    Handles BOTH:
      - 'Name, email, phone'
      - 'Name <email> , phone'
    Drops rows with invalid email.
    """
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    rows = []

    for line in lines:
        # Split into at most 3 comma-separated parts: name, email, phone.
        # str.split always returns at least one element, so no emptiness check is needed.
        parts = [p.strip() for p in line.split(",", maxsplit=2)]

        name_part = parts[0]
        email_part = parts[1] if len(parts) > 1 else ""
        phone_part = parts[2] if len(parts) > 2 else ""

        # case: name contains <email>
        extracted_email = None
        extracted_name = name_part
        if "<" in name_part and ">" in name_part:
            inside = name_part[name_part.find("<")+1 : name_part.find(">")]
            maybe_email = validate_email(inside)
            if maybe_email:
                extracted_email = maybe_email
                extracted_name = name_part[: name_part.find("<")].strip().strip('"')

        if extracted_email:
            email = extracted_email
            # in given sample, after the name-with-email, the next part is the phone
            phone_raw = phone_part if phone_part else email_part
        else:
            # normal: name, email, phone
            email = validate_email(email_part)
            phone_raw = phone_part

        if not email:
            # skip rows with bad email
            continue

        phone = normalize_phone(phone_raw)
        rows.append({
            "name": extracted_name,
            "email": email,
            "phone": phone,
        })

    return deduplicate_contacts(rows)

def parse_contacts_from_file(path_str: str):
    """Read contacts_raw.txt with pathlib + with; handle missing file."""
    path = Path(path_str)
    try:
        with path.open("r", encoding="utf-8") as f:
            text = f.read()
    except FileNotFoundError:
        print(f"File not found: {path}")
        return []
    return parse_contacts_from_string(text)

def write_contacts_csv(rows, path_str: str):
    """Write to contacts_clean.csv with columns exactly name,email,phone."""
    path = Path(path_str)
    with path.open("w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["name", "email", "phone"])
        for row in rows:
            writer.writerow([row["name"], row["email"], row["phone"]])

# run Q1 pipeline
cleaned = parse_contacts_from_file("contacts_raw.txt")
write_contacts_csv(cleaned, "contacts_clean.csv")
cleaned



[{'name': 'Alice Johnson',
  'email': 'alice@example.com',
  'phone': '4695551234'},
 {'name': 'Sara M.', 'email': 'sara@mail.co', 'phone': '2145558888'},
 {'name': 'Mehdi A.', 'email': 'mehdi.ay@example.org', 'phone': '4695559999'},
 {'name': 'Delaram', 'email': 'delaram@example.io', 'phone': '9727772121'},
 {'name': 'Nima', 'email': 'NIMA@example.io', 'phone': '9727772121'}]
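
One way to sanity-check the CSV step is to write a few rows and read them back with `csv.DictReader`. The sketch below is an assumption-laden example: it uses two rows shaped like the output above and writes to a temporary directory (a hypothetical location) so the real `contacts_clean.csv` is left untouched. It uses `csv.DictWriter` rather than the `csv.writer` used in the cell above; both produce the same header and rows here.

```python
import csv
import tempfile
from pathlib import Path

# Example rows shaped like the cleaned output above (name, email, phone)
rows = [
    {"name": "Alice Johnson", "email": "alice@example.com", "phone": "4695551234"},
    {"name": "Sara M.", "email": "sara@mail.co", "phone": "2145558888"},
]

with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical path inside a temp directory, so the real output file is untouched
    csv_path = Path(tmp) / "contacts_clean.csv"

    # Write the header and rows with the exact column order name,email,phone
    with csv_path.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "email", "phone"])
        writer.writeheader()
        writer.writerows(rows)

    # Read the file back and capture both the header and the data rows
    with csv_path.open("r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        header = reader.fieldnames
        read_back = list(reader)

print(header)             # ['name', 'email', 'phone']
print(read_back == rows)  # True
```

Because every value is written as text, the phone numbers come back as strings, which matches how `normalize_phone` returns them.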

## Q2 (1.5 pts) — Unit testing with `unittest`

Create tests in `test_crm_cleanup.py` that cover at least:

1. **Email validation**: valid/invalid variations.
2. **Phone normalization**: parentheses, dashes, spaces, country code; too-short cases.
3. **Parsing**: from a small multi-line string (not from a file), assert the exact structured rows (name/email/phone).
4. **De-duplication**: demonstrate that a case-variant duplicate email is dropped (first occurrence kept).
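
As a rough sketch of how such tests might be organized, the example below uses `subTest` (a standard `unittest` feature) to run several phone cases in one test method, and adds a check that a missing file is handled without raising. The stand-in `normalize_phone`, the helper `read_text_or_none`, and the class name `TestSketch` are assumptions made so the block runs on its own; they are not the required function names.

```python
import re
import tempfile
import unittest
from pathlib import Path

def normalize_phone(raw: str) -> str:
    """Stand-in with the behavior described in Q1, so this sketch runs alone."""
    digits = re.sub(r"\D", "", raw or "")
    return digits[-10:] if len(digits) >= 10 else ""

def read_text_or_none(path: Path):
    """Hypothetical helper: return the file text, or None if the file is missing."""
    try:
        return path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return None

class TestSketch(unittest.TestCase):
    def test_phone_cases(self):
        cases = [
            ("+1 (469) 555-1234", "4695551234"),  # country code is dropped
            ("972.777.2121", "9727772121"),       # dots are removed
            ("972-555-777", ""),                  # only 9 digits, so rejected
        ]
        for raw, expected in cases:
            # subTest reports each failing case separately instead of stopping at the first
            with self.subTest(raw=raw):
                self.assertEqual(normalize_phone(raw), expected)

    def test_missing_file_does_not_raise(self):
        with tempfile.TemporaryDirectory() as tmp:
            missing = Path(tmp) / "no_such_file.txt"
            self.assertIsNone(read_text_or_none(missing))

suite = unittest.TestLoader().loadTestsFromTestCase(TestSketch)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

Loading the suite explicitly, rather than calling `unittest.main()`, keeps the sketch usable inside a notebook cell.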


In [14]:
import unittest

class TestCrmCleanup(unittest.TestCase):
    def test_email_validation(self):
        self.assertEqual(validate_email("test@example.com"), "test@example.com")
        self.assertEqual(validate_email(" user+tag@domain.co.uk "), "user+tag@domain.co.uk")
        self.assertIsNone(validate_email("bad-email"))
        self.assertIsNone(validate_email("user@domain"))
        self.assertIsNone(validate_email("user@domain.c"))
        self.assertIsNone(validate_email("user@domain.com extra"))

    def test_phone_normalization(self):
        self.assertEqual(normalize_phone("(214) 555-1212"), "2145551212")
        self.assertEqual(normalize_phone("214-555-1212"), "2145551212")
        self.assertEqual(normalize_phone("214 555 1212"), "2145551212")
        self.assertEqual(normalize_phone("+1 (214) 555-1212"), "2145551212")
        self.assertEqual(normalize_phone("001-214-555-1212"), "2145551212")
        self.assertEqual(normalize_phone("123456789"), "")
        self.assertEqual(normalize_phone(""), "")

    def test_parsing_from_string(self):
        text = (
            "Alice, alice@example.com, (214) 555-1212\n"
            "Bob, bob@example.com, 972-777-8888\n"
            "Bad, bad-email, 111-222-3333\n"
        )
        rows = parse_contacts_from_string(text)
        expected = [
            {"name": "Alice", "email": "alice@example.com", "phone": "2145551212"},
            {"name": "Bob", "email": "bob@example.com", "phone": "9727778888"},
        ]
        self.assertEqual(rows, expected)

    def test_deduplication_case_insensitive(self):
        rows = [
            {"name": "Sam", "email": "sam@example.com", "phone": "2145551212"},
            {"name": "SAM DUP", "email": "SAM@example.com", "phone": "9727778888"},
            {"name": "Other", "email": "other@example.com", "phone": "4690001111"},
        ]
        deduped = deduplicate_contacts(rows)
        # Assert the exact rows: the first "sam" row is kept, the case-variant
        # duplicate is dropped, and the original order is preserved.
        self.assertEqual(deduped, [rows[0], rows[2]])

suite = unittest.TestLoader().loadTestsFromTestCase(TestCrmCleanup)
unittest.TextTestRunner().run(suite)



....
----------------------------------------------------------------------
Ran 4 tests in 0.002s

OK


<unittest.runner.TextTestResult run=4 errors=0 failures=0>

## Grading rubric (total 3 pts)

- **Q1 (1.5 pts)**
  - (0.4) File I/O with `pathlib` + graceful `FileNotFoundError` handling
  - (0.5) Email validation (regex + strip + full match) and filtering
  - (0.4) Phone normalization and **case-insensitive** de-duplication (keep first)
  - (0.2) Code clarity (names, minimal docstrings/comments)
- **Q2 (1.5 pts)**
  - (0.6) Meaningful coverage for email/phone functions (valid & invalid)
  - (0.6) Parsing & de-dup tests that assert exact expected rows
  - (0.3) Standard `unittest` structure and readable test names
