# Data Engineering Assessment (Python)

In [0]:
import csv
import os
import logging

# === Configure ===
FILE_PATH = "/Volumes/workspace/customers/data/data.csv"   # customer CSV inside the Volume
# Directory that holds FILE_PATH. Derived (not re-typed) so the sanity checks
# below always inspect the same location as the file being processed.
DATA_DIR = os.path.dirname(FILE_PATH)
DBFS_DATA_DIR = f"dbfs:{DATA_DIR}"

LOG_FORMAT = "%(asctime)s | %(levelname)s | %(message)s"

# Root logging config. NOTE: basicConfig silently does nothing when the root
# logger already has handlers (which a notebook runtime may have installed),
# leaving the default WARNING level in force and hiding every INFO message.
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

# Give the ingestion logger its own level and handler so its output stays
# visible in the cell output regardless of how the root logger was set up.
logger = logging.getLogger("customer_ingestion")
logger.setLevel(logging.INFO)
if not logger.handlers:  # re-running the cell must not stack duplicate handlers
    _handler = logging.StreamHandler()
    _handler.setFormatter(logging.Formatter(LOG_FORMAT))
    logger.addHandler(_handler)
    logger.propagate = False  # avoid printing each record twice via a root handler

logger.info("Notebook started")

# Optional sanity checks — helpful when debugging path issues in Databricks.
# os.path.exists checks the driver's local filesystem view of the path and
# returns False (rather than raising) when the path cannot be accessed.
logger.info(f"Local driver path exists? {os.path.exists(FILE_PATH)}")

# Databricks Filesystem (DBFS) listing for the Volume directory
# (This uses the DBFS API path style; it will work if UC Volumes are enabled.)
# `dbutils` only exists inside a Databricks runtime; elsewhere the NameError
# is caught and reported as a warning like any other failure.
try:
    _ = dbutils.fs.ls(DBFS_DATA_DIR)  # noqa: F821 - injected by Databricks
    logger.info(f"DBFS listing successful for {DBFS_DATA_DIR}")
except Exception as e:
    logger.warning(f"DBFS path check failed (this can be normal if UC is not enabled): {e}")


In [0]:
REQUIRED_COLUMNS = ["Customer Id", "First Name", "Email"]  # minimal required fields
OPTIONAL_COLUMNS = [
    "Last Name", "Company", "City", "Country", "Phone 1", "Phone 2", "Subscription Date"
]

def _to_bool(value):
    """Robust boolean parser for typical CSV values."""
    if value is None:
        return None
    v = str(value).strip().lower()
    if v in ("true", "t", "yes", "y", "1"):
        return True
    if v in ("false", "f", "no", "n", "0"):
        return False
    return None  # Unknown → leave as None

def validate_and_transform(row: dict) -> dict:
    """
    Business validation + normalization.
    - Ensures required fields exist and are non-empty
    - Validates Customer Id is an integer
    - Minimal email sanity check
    - Coerces Subscription Date to bool when present
    Raises:
        ValueError: if the row violates business rules (ingestion-safe — caught by caller)
    """
    # 1) Required presence
    for col in REQUIRED_COLUMNS:
        if col not in row or str(row[col]).strip() == "":
            raise ValueError(f"Missing required '{col}'")

    # 2) Type + format checks
    try:
        row["Customer Id"] = int(str(row["Customer Id"]).strip())
    except Exception:
        raise ValueError(f"Invalid Customer Id: {row.get('Customer Id')}")

    email = str(row["Email"]).strip()
    if "@" not in email or "." not in email.split("@")[-1]:
        raise ValueError(f"Invalid Email: {email}")
    row["Email"] = email

    # 3) Optional coercions
    if "Subscription Date" in row:
        parsed = _to_bool(row["Subscription Date"])
        # Keep None if unknown, otherwise store True/False
        row["Subscription Date"] = parsed if parsed is not None else row["Subscription Date"]

    # Trim simple text fields
    for k in ("First Name", "Last Name", "Company", "City", "Country", "Phone 1", "Phone 2"):
        if k in row and row[k] is not None:
            row[k] = str(row[k]).strip()

    return row


In [0]:
from datetime import datetime
import re

REQUIRED_COLUMNS = ["Index", "Customer Id", "First Name", "Last Name", "Email"]

def validate_and_transform(row: dict) -> dict:
    """
    Validates and normalizes a customer record row against business rules.

    The row is modified in place and also returned.

    Rules:
    - Every column in REQUIRED_COLUMNS is present and non-blank. A None value
      (csv.DictReader fills missing trailing fields with None) counts as blank.
    - "Index" is converted to int.
    - "Customer Id" is kept as a stripped string (ids such as "CUST999" are
      not numeric).
    - "Email" must contain "@" and a "." in its domain part.
    - "Subscription Date", when non-blank, is parsed as YYYY-MM-DD into a
      datetime.date.
    - "Website", when non-blank, must start with http:// or https://.
    - All remaining string values are stripped of surrounding whitespace.

    Raises:
        ValueError: if the row violates any rule (callers count these as invalid rows).
    """
    # 1) Required presence. str(None) == "None" would otherwise pass a blank check.
    for col in REQUIRED_COLUMNS:
        value = row.get(col)
        if value is None or str(value).strip() == "":
            raise ValueError(f"Missing required '{col}'")

    # 2) Index must be integer
    try:
        row["Index"] = int(row["Index"])
    except (TypeError, ValueError) as err:
        raise ValueError(f"Invalid Index value: {row.get('Index')}") from err

    # 3) Customer Id → keep as string (no int conversion)
    row["Customer Id"] = str(row["Customer Id"]).strip()

    # 4) Email validation
    email = str(row["Email"]).strip()
    if "@" not in email or "." not in email.split("@")[-1]:
        raise ValueError(f"Invalid Email: {email}")
    row["Email"] = email

    # 5) Subscription Date → parse as date if not blank. `or ""` tolerates None
    #    (short CSV rows), str() tolerates an already-parsed date, and the value
    #    is stripped first because strptime rejects surrounding whitespace.
    sub_text = str(row.get("Subscription Date") or "").strip()
    if sub_text:
        try:
            row["Subscription Date"] = datetime.strptime(sub_text, "%Y-%m-%d").date()
        except ValueError as err:
            raise ValueError(f"Invalid Subscription Date: {row['Subscription Date']}") from err

    # 6) Website validation (basic URL check); None is treated as blank.
    website = str(row.get("Website") or "").strip()
    if website and not re.match(r"^https?://", website):
        raise ValueError(f"Invalid Website URL: {row['Website']}")

    # 7) Clean up other string fields
    for k, v in row.items():
        if isinstance(v, str):
            row[k] = v.strip()

    return row


In [0]:
import shutil
import tempfile

new_customer = {
    "Index": 999,
    "Customer Id": "CUST999",
    "First Name": "Akash",
    "Last Name": "Patro",
    "Company": "Self",
    "City": "Bhubaneswar",
    "Country": "India",
    "Phone 1": "",
    "Phone 2": "",
    "Email": "akash@example.com",
    "Subscription Date": "2025-09-04",
    "Website": "https://akashpatro.com"
}

# Path of the temp file while it exists, so it can be removed if anything
# fails before it has been moved over FILE_PATH.
tmp_path = None

try:
    # Step 0: Reject an invalid record before touching the file. A copy is
    # validated because validate_and_transform mutates its argument (e.g. it
    # converts "Subscription Date" to a date object); the CSV gets raw values.
    validate_and_transform(dict(new_customer))

    # Step 1: Read existing data
    with open(FILE_PATH, mode="r", newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        fieldnames = reader.fieldnames
        if not fieldnames:
            raise ValueError("CSV has no header row")
        rows = list(reader)  # load all rows

    # Step 2: Skip the append when this Customer Id already exists, so that
    # re-running the cell does not create duplicate records.
    new_id = str(new_customer.get("Customer Id", "")).strip()
    if any((r.get("Customer Id") or "").strip() == new_id for r in rows):
        logger.warning(f"Customer Id {new_id!r} already present in {FILE_PATH}; nothing appended")
    else:
        # Step 3: Add the new record (ensuring schema alignment). Keys that
        # are not CSV columns cannot be written; report them instead of
        # dropping them silently.
        unknown = sorted(set(new_customer) - set(fieldnames))
        if unknown:
            logger.warning(f"Ignoring fields not in CSV header: {unknown}")
        row_out = {col: new_customer.get(col, "") for col in fieldnames}
        rows.append(row_out)

        # Step 4: Write everything to a temp file first (overwrite mode).
        with tempfile.NamedTemporaryFile(
            delete=False, mode="w", newline="", encoding="utf-8"
        ) as tmp_file:
            tmp_path = tmp_file.name
            writer = csv.DictWriter(tmp_file, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)

        # Step 5: Replace original file. NOTE: across filesystems shutil.move
        # falls back to copy-then-delete, so this replacement is not atomic.
        shutil.move(tmp_path, FILE_PATH)
        tmp_path = None  # moved into place; nothing left to clean up
        logger.info(f"✅ Successfully rewrote file with new record: {row_out}")

except FileNotFoundError:
    logger.error(f"File not found: {FILE_PATH}")
except Exception as e:
    logger.exception(f"❌ Failed to update file: {e}")
finally:
    # delete=False means a failed write/move would otherwise leak the temp file.
    if tmp_path is not None and os.path.exists(tmp_path):
        try:
            os.remove(tmp_path)
        except OSError as cleanup_err:
            logger.warning(f"Could not remove temp file {tmp_path}: {cleanup_err}")


In [0]:
post_good = 0
post_bad = 0
MAX_INVALID_SAMPLES = 5  # invalid rows logged individually; the rest are only counted

try:
    with open(FILE_PATH, mode="r", newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        # Header sanity: every required column must be present. An empty file
        # yields fieldnames None, which reports all required columns missing.
        header = reader.fieldnames or []
        missing = [c for c in REQUIRED_COLUMNS if c not in header]
        if missing:
            raise ValueError(f"CSV header missing required columns: {missing}")

        for row in reader:
            try:
                validate_and_transform(row)
            except ValueError as err:
                post_bad += 1
                if post_bad <= MAX_INVALID_SAMPLES:
                    # line_num is the physical line on which this record ends;
                    # unlike a row counter it stays correct for quoted
                    # multi-line fields.
                    logger.warning(f"Invalid row ending at line {reader.line_num}: {err}")
            else:
                post_good += 1

    if post_bad > MAX_INVALID_SAMPLES:
        logger.warning(f"{post_bad - MAX_INVALID_SAMPLES} more invalid rows not shown individually")
    logger.info(f"Post-append recheck → valid: {post_good} | invalid: {post_bad}")

except FileNotFoundError:
    logger.error(f"File not found on recheck: {FILE_PATH}")
except ValueError as ve:
    logger.error(f"ValueError on recheck: {ve}")
except Exception as e:
    logger.exception(f"Unexpected error on recheck: {e}")
