
# Assignment 5: CRM Cleanup @ **DalaShop**
*Files (Ch.10), Exceptions (Ch.10), Unit Tests (Ch.11), and Regular Expressions*  

**Total: 3 points**  (Two questions, 1.5 pts each)  

> This assignment is scenario-based and aligned with Python Crash Course Ch.10 (files & exceptions), Ch.11 (unit testing with `unittest`), and Regular Expressions.

## Scenario
You are a data intern at an online retailer called **DalaShop**.  
Sales exported a **raw contacts** file from the CRM. It contains customer names, emails, and phone numbers, but the formatting is messy and some emails are invalid.  
Your tasks:

1. **Clean** the contacts (Files + Exceptions + Regex).  
2. **Write unit tests** to make sure your helper functions work correctly and keep working in the future.

## Data file (given by the company): `contacts_raw.txt`
Use this exact sample data (you may extend it for your own testing, but do **not** change it when submitting).  
Run the next cell once to create the file beside your notebook.

In [None]:
# Create the provided company dataset file
with open("contacts_raw.txt", "w", encoding="utf-8") as f:
    f.write('Alice Johnson <alice@example.com> , +1 (469) 555-1234\nBob Roberts <bob[at]example.com> , 972-555-777\nSara M. , sara@mail.co , 214 555 8888\n"Mehdi A." <mehdi.ay@example.org> , (469)555-9999\nDelaram <delaram@example.io>, +1-972-777-2121\nNima <NIMA@example.io> , 972.777.2121\nduplicate <Alice@Example.com> , 469 555 1234')
print("Wrote contacts_raw.txt with sample DalaShop data.")

Wrote contacts_raw.txt with sample DalaShop data.


## Q1 (1.5 pts) — CRM cleanup with Files, Exceptions, and Regex
Implement `q1_crm_cleanup.py` to:

1. **Read** `contacts_raw.txt` using `pathlib` and `with`. If the file is missing, **handle** it gracefully with `try/except FileNotFoundError` (print a friendly message; do not crash).
2. **Validate emails** with a simple regex (`r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"`).  
   - Trim whitespace with `strip()` before checking.  
   - Use **full** matching (not partial).
3. **Normalize phone numbers:** remove all non-digits (e.g., with `re.sub(r"\D", "", raw)`).  
   - If the result has **≥ 10 digits**, keep the **last 10 digits**.  
   - Otherwise, return an **empty string** (`""`).
4. **Filter rows:** keep **only** rows with a valid email.
5. **Deduplicate:** remove duplicates by **email** using **case-insensitive** comparison (e.g., `email.casefold()`). **Keep the first occurrence** and drop later duplicates.
6. **Output CSV:** write to `contacts_clean.csv` with **columns exactly** `name,email,phone` (UTF-8).  
7. **Preserve input order:** the order of rows in `contacts_clean.csv` must match the **first appearance** order from the input file. **Do not sort** the rows.
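
Steps 2 and 3 above can be illustrated with a minimal sketch. It shows why full matching matters (a partial match accepts junk around the address) and how the last-10-digits rule behaves on two phone numbers from the sample data. The helper names `is_valid_email` and `last_ten_digits` are hypothetical and chosen only for this illustration; the assignment does not prescribe them.

```python
import re

# Pattern copied from the task description.
EMAIL_PATTERN = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")


def is_valid_email(raw: str) -> bool:
    """Return True only if the whole stripped string is an email (hypothetical helper)."""
    return EMAIL_PATTERN.fullmatch(raw.strip()) is not None


def last_ten_digits(raw: str) -> str:
    """Strip non-digits; keep the last 10 digits, or return "" if fewer (hypothetical helper)."""
    digits = re.sub(r"\D", "", raw)
    return digits[-10:] if len(digits) >= 10 else ""


# A partial match accepts junk around the address; a full match does not.
candidate = "<alice@example.com>"
print(EMAIL_PATTERN.search(candidate) is not None)  # True
print(is_valid_email(candidate))                    # False
print(is_valid_email("  alice@example.com  "))      # True

print(last_ten_digits("+1 (469) 555-1234"))  # 4695551234
print(repr(last_ten_digits("972-555-777")))  # ''
```

Because `fullmatch` must consume the entire string, any angle brackets or quotes have to be removed during parsing, before validation.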

**Grading rubric (1.5 pts):**
- (0.4) File read/write via `pathlib` + graceful `FileNotFoundError` handling  
- (0.5) Correct email regex validation + filtering  
- (0.4) Phone normalization + case-insensitive de-dup (keep first)  
- (0.2) Clean code, clear names, minimal docstrings/comments
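
Steps 5 to 7 of the task list (case-insensitive de-duplication, the exact CSV columns, and preserved input order) can be sketched as follows. This is an illustration under stated assumptions, not the required solution: `dedupe_by_email` is a hypothetical helper, the rows are a few already-parsed entries from the sample data, and the CSV is written to an in-memory buffer so the sketch does not touch any files.

```python
import csv
import io


def dedupe_by_email(rows):
    """Keep the first row for each email, compared case-insensitively (hypothetical helper)."""
    seen = set()
    unique = []
    for row in rows:
        key = row["email"].casefold()
        if key not in seen:
            seen.add(key)
            unique.append(row)  # appending preserves first-appearance order
    return unique


# A few already-parsed rows based on the sample data.
rows = [
    {"name": "Alice Johnson", "email": "alice@example.com", "phone": "4695551234"},
    {"name": "Nima", "email": "NIMA@example.io", "phone": "9727772121"},
    {"name": "duplicate", "email": "Alice@Example.com", "phone": "4695551234"},
]

# In-memory buffer for illustration; a real script would open the output
# file with newline="" and encoding="utf-8" instead.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=["name", "email", "phone"], lineterminator="\n")
writer.writeheader()
writer.writerows(dedupe_by_email(rows))
print(buffer.getvalue())
```

A set gives constant-time membership checks, while the separate list keeps the rows in first-appearance order, so no sorting is involved. This sketch leaves the email's original casing in the output; whether to lowercase the stored email is a separate design choice.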

In [None]:
# Write your answer here
import re
import csv
from pathlib import Path

EMAIL_REGEX = r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"

def validate_email(raw_email, email_regex=EMAIL_REGEX):
    """Return the stripped, case-folded email if it fully matches the regex, else None."""
    cleaned_email = raw_email.strip().casefold()
    if re.fullmatch(email_regex, cleaned_email):
        return cleaned_email
    return None

def normalize_phone(raw_phone):
    """Remove non-digits and return the last 10 digits, or "" if there are fewer than 10."""
    digits = re.sub(r"\D", "", raw_phone)
    if len(digits) >= 10:
        return digits[-10:]
    return ""

def parse_line(raw_line, email_regex=EMAIL_REGEX):
    """Split one raw line into (name, email, phone); return (None, None, None) if no email is found."""
    # Name, an optional "<" or "," separator, the email, an optional ">" or ",", then the rest.
    search_pattern = r"(.*?)(?:<|,\s*|\s*)(" + email_regex + r")\s*(?:>|,\s*)?(.*)"
    match = re.search(search_pattern, raw_line, re.IGNORECASE)

    if match:
        name = match.group(1).strip().strip('<>"').strip()
        raw_email = match.group(2).strip()
        phone_suffix = match.group(3).strip()

        # Drop the comma that separates the email from the phone, if present.
        if ',' in phone_suffix:
            raw_phone = phone_suffix.split(',', 1)[1].strip()
        else:
            raw_phone = phone_suffix

        # Fall back to a name derived from the email's local part.
        if not name:
            name = raw_email.split('@')[0].replace('.', ' ').title()

        return name, raw_email, raw_phone

    return None, None, None

def crm_cleanup(input_filepath, output_filepath):
    """Read raw contacts, keep rows with valid and unique emails, and write them to a CSV file."""
    input_path = Path(input_filepath)
    output_path = Path(output_filepath)

    try:
        raw_lines = input_path.read_text(encoding="utf-8").splitlines()
    except FileNotFoundError:
        print(f"Error: Input file not found at '{input_path.name}'. Please create it first.")
        return

    cleaned_contacts = []
    seen_emails = set()

    for raw_line in raw_lines:
        name, raw_email, raw_phone = parse_line(raw_line)
        if name is None:
            continue

        validated_email = validate_email(raw_email)
        # Skip invalid emails and case-insensitive duplicates (the first occurrence wins).
        if not validated_email or validated_email in seen_emails:
            continue

        seen_emails.add(validated_email)
        cleaned_contacts.append({
            'name': name,
            'email': validated_email,
            'phone': normalize_phone(raw_phone)
        })

    if cleaned_contacts:
        with output_path.open('w', newline='', encoding='utf-8') as outfile:
            writer = csv.DictWriter(outfile, fieldnames=['name', 'email', 'phone'])
            writer.writeheader()
            writer.writerows(cleaned_contacts)

        print(f"Successfully wrote {len(cleaned_contacts)} clean contacts to '{output_path.name}'.")

def print_clean_contacts(filepath="contacts_clean.csv"):
    """Print each row of the cleaned CSV, or a friendly message if the file is missing."""
    output_path = Path(filepath)
    try:
        with output_path.open('r', newline='', encoding='utf-8') as infile:
            for row in csv.reader(infile):
                print(', '.join(row))
    except FileNotFoundError:
        print(f"Error: Output file not found at '{output_path.name}'. Run crm_cleanup first.")


if __name__ == '__main__':
    crm_cleanup("contacts_raw.txt", "contacts_clean.csv")
    print_clean_contacts()

Successfully wrote 5 clean contacts to 'contacts_clean.csv'.
name, email, phone
Alice Johnson, alice@example.com, 4695551234
Sara M., sara@mail.co, 2145558888
Mehdi A., mehdi.ay@example.org, 4695559999
Delaram, delaram@example.io, 9727772121
Nima, nima@example.io, 9727772121


## Q2 (1.5 pts) — Unit testing with `unittest`
Create tests in `test_crm_cleanup.py` that cover at least:

1. **Email validation**: valid/invalid variations.  
2. **Phone normalization**: parentheses, dashes, spaces, country code; too-short cases.  
3. **Parsing**: from a small multi-line string (not from a file), assert the exact structured rows (name/email/phone).  
4. **De-duplication**: demonstrate that a case-variant duplicate email is dropped (first occurrence kept).
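
Item 3 above asks for a parsing test that works on a multi-line string rather than a file. A minimal sketch of that shape follows. It assumes a hypothetical `parse_contact` helper and `LINE_PATTERN` regex that only handle the `Name <email> , phone` layout; they stand in for whatever parser the Q1 solution provides and are not part of the assignment.

```python
import re
import unittest

# Hypothetical pattern for this sketch: optional quotes around the name,
# the email in angle brackets, then a comma and the raw phone.
LINE_PATTERN = re.compile(
    r'\s*"?(?P<name>[^"<]*?)"?\s*<(?P<email>[^>]*)>\s*,\s*(?P<phone>.*)'
)


def parse_contact(line):
    """Return (name, email, raw_phone), or None if the line does not fit the layout."""
    match = LINE_PATTERN.fullmatch(line)
    if match is None:
        return None
    return match["name"].strip(), match["email"].strip(), match["phone"].strip()


class ParseFromStringTests(unittest.TestCase):
    def test_rows_from_multiline_string(self):
        raw = (
            'Alice Johnson <alice@example.com> , +1 (469) 555-1234\n'
            '"Mehdi A." <mehdi.ay@example.org> , (469)555-9999\n'
            'not a contact line\n'
        )
        parsed = [parse_contact(line) for line in raw.splitlines()]
        self.assertEqual(
            parsed,
            [
                ("Alice Johnson", "alice@example.com", "+1 (469) 555-1234"),
                ("Mehdi A.", "mehdi.ay@example.org", "(469)555-9999"),
                None,
            ],
        )


# Build and run the suite explicitly so the sketch works in a notebook cell too.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(ParseFromStringTests)
result = unittest.TextTestRunner(verbosity=2).run(suite)
```

Testing the parser on an in-memory string keeps the test independent of the file system, so no temporary files need to be created or cleaned up.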




In [None]:
# Write your answer here
import unittest
import os
import csv
from pathlib import Path

# Let an ImportError surface so that a missing q1_crm_cleanup.py is obvious.
from q1_crm_cleanup import (
    validate_email,
    normalize_phone,
    parse_line,
    crm_cleanup,
)

class CRMCleanupTests(unittest.TestCase):

    def test_email_validation(self):
        self.assertEqual(validate_email(" Valid.Email@Example.Com  "), "valid.email@example.com")
        self.assertEqual(validate_email("sara@mail.co"), "sara@mail.co")
        self.assertEqual(validate_email("m1-2_d3@sub.domain.org"), "m1-2_d3@sub.domain.org")
        self.assertIsNone(validate_email("bob[at]example.com"))
        self.assertIsNone(validate_email("noat.com"))
        self.assertIsNone(validate_email("test@.c"))
        self.assertIsNone(validate_email("test@space .com"))

    def test_phone_normalization(self):
        self.assertEqual(normalize_phone("+1 (469) 555-1234"), "4695551234")
        self.assertEqual(normalize_phone("972.777.2121"), "9727772121")
        self.assertEqual(normalize_phone("1234567890"), "1234567890")
        self.assertEqual(normalize_phone("(12)345-6789-0123"), "4567890123")
        self.assertEqual(normalize_phone("123-456"), "")
        self.assertEqual(normalize_phone(""), "")

    def test_parsing_and_filtering(self):

        raw_data = (
            'Alice Johnson <alice@example.com> , +1 (469) 555-1234\n'
            'Bob Roberts <bob[at]example.com> , 972-555-777\n'
            'Sara M. , sara@mail.co , 214 555 8888\n'
            'duplicate <Alice@Example.com> , 469 555 1234\n'
            '"Mehdi A." <mehdi.ay@example.org> , (469)555-9999'
        )

        temp_input_file = Path("temp_raw.txt")
        temp_output_file = Path("temp_clean.csv")
        temp_input_file.write_text(raw_data, encoding='utf-8')

        try:
            crm_cleanup(temp_input_file.name, temp_output_file.name)

            with open(temp_output_file, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                cleaned_rows = list(reader)

        finally:
            temp_input_file.unlink(missing_ok=True)
            temp_output_file.unlink(missing_ok=True)

        expected_rows = [
            {'name': 'Alice Johnson', 'email': 'alice@example.com', 'phone': '4695551234'},
            {'name': 'Sara M.', 'email': 'sara@mail.co', 'phone': '2145558888'},
            {'name': 'Mehdi A.', 'email': 'mehdi.ay@example.org', 'phone': '4695559999'},
        ]

        self.assertEqual(cleaned_rows, expected_rows, "The final list of cleaned and deduplicated rows does not match the expected output.")

    def test_deduplication_case_insensitivity(self):

        raw_data = (
            'First Contact <Unique@test.io>, 111-111-1111\n'
            'Primary Contact <test@email.com> , 999-999-9999\n'
            'Duplicate Case <TeSt@EmAiL.cOm> , 888-888-8888\n'
            'Last Contact <last@test.io> , 222-222-2222\n'
        )

        temp_input_file = Path("temp_dedup_raw.txt")
        temp_output_file = Path("temp_dedup_clean.csv")
        temp_input_file.write_text(raw_data, encoding='utf-8')

        try:
            crm_cleanup(temp_input_file.name, temp_output_file.name)
            with open(temp_output_file, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                cleaned_rows = list(reader)
        finally:
            temp_input_file.unlink(missing_ok=True)
            temp_output_file.unlink(missing_ok=True)

        expected_emails = [
            'unique@test.io',
            'test@email.com',
            'last@test.io'
        ]
        actual_emails = [row['email'] for row in cleaned_rows]

        self.assertEqual(actual_emails, expected_emails, "Deduplication failed to be case-insensitive or did not keep the first occurrence.")

if __name__ == '__main__':
    unittest.main(argv=['first-arg-is-ignored'], exit=False)

....
----------------------------------------------------------------------
Ran 4 tests in 0.007s

OK


Successfully wrote 3 clean contacts to 'temp_dedup_clean.csv'.
Successfully wrote 3 clean contacts to 'temp_clean.csv'.


## Grading rubric (total 3 pts)
- **Q1 (1.5 pts)**  
  - (0.4) File I/O with `pathlib` + graceful `FileNotFoundError` handling  
  - (0.5) Email validation (regex + strip + full match) and filtering  
  - (0.4) Phone normalization and **case-insensitive** de-duplication (keep first)  
  - (0.2) Code clarity (names, minimal docstrings/comments)
- **Q2 (1.5 pts)**  
  - (0.6) Meaningful coverage for email/phone functions (valid & invalid)  
  - (0.6) Parsing & de-dup tests that assert exact expected rows  
  - (0.3) Standard `unittest` structure and readable test names
