# Retail Orders Data — Cleaning, Validation, and Quality Report

**Author:** Mustafa 
**Purpose:** Prepare monthly retail orders for analytics by:
- Selecting **latest snapshot per period** from drop folder  
- Loading `|`-delimited CSVs with **ANSI codepage** handling  
- Standardizing types (dates, numerics, text)  
- Running **data quality checks** (duplicates, logic, constraints, consistency)  
- Exporting a professional **Quality Report** for stakeholders

_Notes:_ Source files may be re-extracts for the same month. We retain only the latest snapshot to prevent double counting.


In [238]:
#!pip install pandas openpyxl python-dateutil charset-normalizer

import os, re, io
from glob import glob
from datetime import datetime
from dateutil import parser as dparser

import pandas as pd
import numpy as np


## Configuration

We expect `|`-delimited CSVs inside `Case_Study_Data_For_Share`.  
“ANSI” typically maps to **Windows code pages** (e.g., `cp1252` Western European, `cp1256` Arabic).  
If unsure, we’ll **auto-detect** and fall back to a sensible default.


In [239]:
DATA_DIR   = "Case_Study_Data_For_Share"
DELIM      = "|"


DEFAULT_ANSI = "cp1252"

EXPECTED_COLS = [
    "Row ID","Order ID","Order Date","Ship Date","Ship Mode",
    "Customer ID","Customer Name","Segment",
    "Country","City","State","Postal Code","Region",
    "Product ID","Category","Sub-Category","Product Name",
    "Sales","Quantity","Discount","Profit","__period_yyyymm","__source_file"
]

DATE_COLS = ["Order Date","Ship Date"]
NUM_COLS  = ["Sales","Quantity","Discount","Profit"]

pd.set_option("display.float_format", lambda v: f"{v:,.2f}")


In [None]:
def sniff_encoding(path, default=DEFAULT_ANSI, sample_bytes=100_000):

    try:
        import charset_normalizer as cn
        with open(path, "rb") as f:
            raw = f.read(sample_bytes)
        res = cn.from_bytes(raw).best()
        if res and res.encoding:
            return res.encoding
    except Exception:
        pass
    return default


def parse_file_meta(fname):
    """
    Parse period (YYYYMM) and optional snapshot timestamp from filename.
    Examples:
      202101_Orders_2021_02_03_12_30_05.csv
      Orders_2023-06-01_18-22-40.csv
    """
    base = os.path.basename(fname)

    # period: strict YYYYMM anywhere
    m_period = re.search(r"(?P<yyyymm>20\d{2}[01]\d)", base)

    m_snap = re.search(
        r"(?P<y>20\d{2})[^\d]?(?P<m>[01]?\d)[^\d]?(?P<d>[0-3]?\d)[^\d_ -]?"
        r"(?P<H>[0-2]?\d)?[^\d]?(?P<M>[0-5]?\d)?[^\d]?(?P<S>[0-5]?\d)?",
        base
    )

    period = m_period.group("yyyymm") if m_period else None
    snap_dt = None
    if m_snap:
        y = int(m_snap.group("y"))
        m = int(m_snap.group("m"))
        d = int(m_snap.group("d"))
        H = int(m_snap.group("H") or 0)
        M = int(m_snap.group("M") or 0)
        S = int(m_snap.group("S") or 0)
        try:
            snap_dt = datetime(y, m, d, H, M, S)
        except ValueError:
            snap_dt = None

    return {
        "fname": base,
        "period_yyyymm": period,
        "snapshot_dt": snap_dt
    }


## File Selection — Latest Snapshot per Period

We index all CSVs, parse `(period, snapshot)` from the filename, and keep the **latest** file per period.
If `snapshot` is missing, we use file **modified time** as a proxy.


In [None]:
all_files = sorted(glob(os.path.join(DATA_DIR, "*.csv")))
if not all_files:
    raise FileNotFoundError(f"No CSV files found in {DATA_DIR}")

meta_rows = []
for path in all_files:
    meta = parse_file_meta(path)
    meta["path"] = path
    meta_rows.append(meta)

meta_df = pd.DataFrame(meta_rows)

meta_df["fallback_mtime"] = meta_df["path"].apply(lambda p: datetime.fromtimestamp(os.path.getmtime(p)))
meta_df["effective_snapshot"] = meta_df["snapshot_dt"].fillna(meta_df["fallback_mtime"])

latest_per_period = (
    meta_df[meta_df["period_yyyymm"].notna()]
    .sort_values(["period_yyyymm", "effective_snapshot"])
    .groupby("period_yyyymm", as_index=False)
    .tail(1)
    .reset_index(drop=True)
)

print("Selected latest snapshot per period:")
for _, r in latest_per_period.iterrows():
    print(f" • {r['period_yyyymm']} → {r['fname']} ({r['effective_snapshot']})")


✅ Selected latest snapshot per period:
 • 201901 → 201901_Orders_2019_02_04_15_41_32.csv (2025-09-18 10:34:38.942024)
 • 201902 → 201902_Orders_2019_03_04_02_26_31.csv (2025-09-18 10:34:38.948954)
 • 201903 → 201903_Orders_2019_04_10_03_28_07.csv (2025-09-18 10:34:38.973642)
 • 201904 → 201904_Orders_2019_05_05_14_49_28.csv (2025-09-18 10:34:38.986784)
 • 201905 → 201905_Orders_2019_06_13_00_44_36.csv (2025-09-18 10:34:39.021011)
 • 201906 → 201906_Orders_2019_07_05_15_36_23.csv (2025-09-18 10:34:39.037537)
 • 201907 → 201907_Orders_2019_08_07_06_17_00.csv (2025-09-18 10:34:39.057819)
 • 201908 → 201908_Orders_2019_09_01_04_02_26.csv (2025-09-18 10:34:39.058920)
 • 201909 → 201909_Orders_2019_10_14_22_06_48.csv (2025-09-18 10:34:39.101699)
 • 201910 → 201910_Orders_2019_11_01_17_08_29.csv (2025-09-18 10:34:39.109143)
 • 201911 → 201911_Orders_2019_12_02_13_58_17.csv (2019-01-01 00:00:00)
 • 201912 → 201912_Orders_2020_01_13_08_37_08.csv (2019-01-02 00:00:00)
 • 202001 → 202001_Orders_2

## Data Loading — Robust to ANSI & Quotes

CSV quirks addressed:
- `|` delimiter
- ANSI codepage (auto-detected with fallback)
- Irregular quoting or bad lines → we **skip** malformed lines (logged)


In [None]:
def read_csv_robust(path, delim=DELIM):
    enc = sniff_encoding(path)
    try_orders = [
        dict(encoding=enc, engine="python", on_bad_lines="skip"),
        dict(encoding=enc, engine="c", on_bad_lines="skip"),
    ]

    last_err = None
    for opts in try_orders:
        try:
            return pd.read_csv(path, delimiter=delim, **opts)
        except Exception as e:
            last_err = e
    raise last_err


parts = []
for _, row in latest_per_period.iterrows():
    df_part = read_csv_robust(row["path"])
    df_part["__source_file"]   = row["fname"]
    df_part["__period_yyyymm"] = row["period_yyyymm"]
    parts.append(df_part)

raw = pd.concat(parts, ignore_index=True)
print(f"Loaded rows: {len(raw):,} from {len(parts)} latest files.")
raw.head(3)


✅ Loaded rows: 9,994 from 48 latest files.


Unnamed: 0,Row ID,Order ID,Order Date,Ship Date,Ship Mode,Customer ID,Customer Name,Segment,Country,City,...,Product ID,Category,Sub-Category,Product Name,Sales,Quantity,Discount,Profit,__source_file,__period_yyyymm
0,7981,CA-2019-103800,04-01-2019,08-01-2019,Standard Class,DP-13000,Darren Powers,Consumer,United States,Houston,...,OFF-PA-10000174,Office Supplies,Paper,"Message Book, Wirebound, Four 5 1/2"" X 4"" Form...",16.45,2,0.2,5.55,201901_Orders_2019_02_04_15_41_32.csv,201901
1,740,CA-2019-112326,05-01-2019,09-01-2019,Standard Class,PO-19195,Phillina Ober,Home Office,United States,Naperville,...,OFF-LA-10003223,Office Supplies,Labels,Avery 508,11.78,3,0.2,4.27,201901_Orders_2019_02_04_15_41_32.csv,201901
2,741,CA-2019-112326,05-01-2019,09-01-2019,Standard Class,PO-19195,Phillina Ober,Home Office,United States,Naperville,...,OFF-ST-10002743,Office Supplies,Storage,SAFCO Boltless Steel Shelving,272.74,3,0.2,-64.77,201901_Orders_2019_02_04_15_41_32.csv,201901


## Standardization

- Keep only **expected columns** (in order)  
- Trim text, unify whitespace  
- Coerce numerics (`Sales`, `Quantity`, `Discount`, `Profit`)  
- Parse dates (`Order Date`, `Ship Date`) robustly  


In [None]:
# Ensure expected columns exist; if extra columns, ignore
missing = [c for c in EXPECTED_COLS if c not in raw.columns]
if missing:
    print("⚠️ Missing columns found:", missing)

df = raw.copy()
df = df[[c for c in EXPECTED_COLS if c in df.columns]]  # keep order

# Trim string columns
for col in df.select_dtypes(include="object"):
    df[col] = df[col].astype(str).str.strip().str.replace(r"\s+", " ", regex=True)

# Numeric coercion
for col in [c for c in NUM_COLS if c in df.columns]:
    df[col] = (
        df[col]
        .astype(str)
        .str.replace(",", "", regex=False)
        .str.replace(" ", "", regex=False)
    )
    df[col] = pd.to_numeric(df[col], errors="coerce")

# Date parsing (tolerant to multiple formats)
def parse_date_safe(s):
    if pd.isna(s) or str(s).strip() == "":
        return pd.NaT
    try:
        return dparser.parse(str(s), dayfirst=False, yearfirst=False, fuzzy=True)
    except Exception:
        return pd.NaT

for col in [c for c in DATE_COLS]:
    df[col] = df[col].apply(parse_date_safe).dt.normalize()



if all(c in df.columns for c in ["Order Date","Ship Date"]):
    df["_DeliveryDays"] = (df["Ship Date"] - df["Order Date"]).dt.days


In [None]:


def _classify_date_format(s: str) -> str:
    """
    Classifies a date string (after '/' -> '-' normalization) into a token order:
      - 'YMD'  e.g., 2024-09-19
      - 'YDM'  e.g., 2024-19-09
      - 'DMY'  e.g., 19-09-2024
      - 'MDY'  e.g., 09-19-2024
      - 'AMBIG' (cannot distinguish reliably, e.g., 05-06-2024 with both <=12)
      - 'MISSING' / 'UNKNOWN'
    """
    if s is None:
        return "MISSING"
    s = str(s).strip()
    if s == "" or s.lower() in {"nan", "nat", "none"}:
        return "MISSING"

    parts = re.findall(r"\d+", s)
    if len(parts) != 3:
        return "UNKNOWN"

    a, b, c = parts  # a-b-c
    # Convert safely
    try:
        ai, bi, ci = int(a), int(b), int(c)
    except Exception:
        return "UNKNOWN"

    # Heuristics for yyyy-mm-dd / yyyy-dd-mm
    if len(a) == 4:  # starts with year
        # b could be month or day
        b_is_month = 1 <= bi <= 12
        b_is_day   = 1 <= bi <= 31
        # c must be day or month
        c_is_month = 1 <= ci <= 12
        c_is_day   = 1 <= ci <= 31

        # Prefer unambiguous:
        if b_is_month and c_is_day and not c_is_month:
            return "YMD"
        if b_is_day and c_is_month and not b_is_month:
            return "YDM"
        # If both month/day plausible, ambiguous
        if b_is_month and c_is_day and c_is_month:
            return "AMBIG"
        if b_is_day and c_is_month and b_is_month:
            return "AMBIG"
        # Fallback guesses
        if b_is_month and c_is_day:
            return "YMD"
        if b_is_day and c_is_month:
            return "YDM"
        return "UNKNOWN"

    # Heuristics for dd-mm-yyyy or mm-dd-yyyy
    if len(c) == 4:  # ends with year
        a_is_month = 1 <= ai <= 12
        a_is_day   = 1 <= ai <= 31
        b_is_month = 1 <= bi <= 12
        b_is_day   = 1 <= bi <= 31

        # If unambiguous (e.g., a>12 so it's day)
        if a_is_day and not a_is_month and b_is_month:
            return "DMY"
        # If unambiguous (e.g., b>12 so it's day)
        if a_is_month and b_is_day and not b_is_month:
            return "MDY"
        # Both <=12 -> ambiguous (could be DMY or MDY)
        if a_is_month and b_is_month:
            return "AMBIG"
        # Fallbacks
        if a_is_day and b_is_month:
            return "DMY"
        if a_is_month and b_is_day:
            return "MDY"
        return "UNKNOWN"

    # If neither first nor last look like a 4-digit year
    return "UNKNOWN"

for col in ["Order Date", "Ship Date"]:
    if col in df.columns:
        df[col] = (
            df[col]
            .astype(str)
            .str.strip()
            .str.replace("/", "-", regex=False)
        )

# Classify formats
df["_OrderDate_fmt"] = df["Order Date"].apply(_classify_date_format) if "Order Date" in df.columns else "MISSING"
df["_ShipDate_fmt"]  = df["Ship Date"].apply(_classify_date_format)  if "Ship Date" in df.columns  else "MISSING"

# Two flags:
# 1) Strict mismatch: both present (not MISSING/UNKNOWN) and different
df["_DateFmtMismatch_strict"] = (
    (df["_OrderDate_fmt"].isin(["MISSING","UNKNOWN"]) == False) &
    (df["_ShipDate_fmt"].isin(["MISSING","UNKNOWN"]) == False) &
    (df["_OrderDate_fmt"] != df["_ShipDate_fmt"])
)

# 2) Broad mismatch: different OR one side missing/unknown (useful for quality scans)
df["_DateFmtMismatch_broad"] = (
    (df["_OrderDate_fmt"] != df["_ShipDate_fmt"]) |
    (df["_OrderDate_fmt"].isin(["MISSING","UNKNOWN"])) |
    (df["_ShipDate_fmt"].isin(["MISSING","UNKNOWN"]))
)

# Quick overview
print("Format label counts (Order Date):")
print(df["_OrderDate_fmt"].value_counts(dropna=False))
print("\nFormat label counts (Ship Date):")
print(df["_ShipDate_fmt"].value_counts(dropna=False))
print("\nStrict mismatches:", int(df["_DateFmtMismatch_strict"].sum()))
print("Broad mismatches :", int(df["_DateFmtMismatch_broad"].sum()))

mismatch_sample = df[df["_DateFmtMismatch_strict"]][
    ["Order ID","Order Date","Ship Date","_OrderDate_fmt","_ShipDate_fmt"]
]
mismatch_sample


Format label counts (Order Date):
_OrderDate_fmt
YMD      6020
AMBIG    3974
Name: count, dtype: int64

Format label counts (Ship Date):
_ShipDate_fmt
YMD      6116
AMBIG    3878
Name: count, dtype: int64

Strict mismatches: 2552
Broad mismatches : 2552


Unnamed: 0,Order ID,Order Date,Ship Date,_OrderDate_fmt,_ShipDate_fmt
4,CA-2019-141817,2019-06-01,2019-01-13,AMBIG,YMD
14,CA-2019-105417,2019-08-01,2019-01-13,AMBIG,YMD
15,CA-2019-105417,2019-08-01,2019-01-13,AMBIG,YMD
16,CA-2019-135405,2019-10-01,2019-01-14,AMBIG,YMD
17,CA-2019-135405,2019-10-01,2019-01-14,AMBIG,YMD
...,...,...,...,...,...
9989,CA-2022-156720,2022-12-31,2023-04-01,YMD,AMBIG
9990,CA-2022-143259,2022-12-31,2023-04-01,YMD,AMBIG
9991,CA-2022-143259,2022-12-31,2023-04-01,YMD,AMBIG
9992,CA-2022-115427,2022-12-31,2023-04-01,YMD,AMBIG


In [None]:

for col in ["Order Date", "Ship Date"]:
    if col in df.columns:
        df[col] = (
            df[col].astype(str).str.strip().str.replace("/", "-", regex=False)
        )

def derive_period_from_fname(fname: str):
    if not fname or not isinstance(fname, str):
        return np.nan
    m = re.match(r"^(20\d{2}[01]\d)", os.path.basename(fname))  # leading yyyymm
    return int(m.group(1)) if m else np.nan

if "__period_yyyymm" not in df.columns:
    if "__source_file" in df.columns:
        df["__period_yyyymm"] = df["__source_file"].apply(derive_period_from_fname)
    else:
        raise AssertionError("Missing __period_yyyymm and __source_file; cannot infer period from filename.")

# Format classifier
def classify_date_format(s: str) -> str:
    if s is None: return "MISSING"
    s = str(s).strip()
    if s == "" or s.lower() in {"nan","nat","none"}: return "MISSING"
    parts = re.findall(r"\d+", s)
    if len(parts) != 3: return "UNKNOWN"
    a,b,c = parts

    # yyyy-??-??
    if len(a) == 4:
        bi,ci = int(b), int(c)
        b_m, c_m = 1<=bi<=12, 1<=ci<=12
        b_d, c_d = 1<=bi<=31, 1<=ci<=31
        if b_m and c_d and not c_m: return "YMD"
        if b_d and c_m and not b_m: return "YDM"
        if (b_m and c_m) or (b_d and c_d): return "AMBIG"
        return "UNKNOWN"

    # ??-??-yyyy
    if len(c) == 4:
        ai,bi = int(a), int(b)
        a_m, b_m = 1<=ai<=12, 1<=bi<=12
        a_d, b_d = 1<=ai<=31, 1<=bi<=31
        if a_d and not a_m and b_m: return "DMY"
        if a_m and b_d and not b_m: return "MDY"
        if a_m and b_m: return "AMBIG"
        if a_d and b_m: return "DMY"
        if a_m and b_d: return "MDY"
        return "UNKNOWN"

    return "UNKNOWN"

# 3) Helpers
FORMAT_MAP = {
    "YMD": "%Y-%m-%d",
    "YDM": "%Y-%d-%m",
    "DMY": "%d-%m-%Y",
    "MDY": "%m-%d-%Y",
}

def parse_by_fmt(val, fmt_label):
    if pd.isna(val) or fmt_label not in FORMAT_MAP: return pd.NaT
    return pd.to_datetime(val, format=FORMAT_MAP[fmt_label], errors="coerce")

def _coerce_ts(y,m,d):
    try: return pd.Timestamp(int(y), int(m), int(d))
    except Exception: return pd.NaT

# Order date ambiguity: use period month from filename
def resolve_ambiguous_order_date(date_str, period_yyyymm):
    if pd.isna(date_str) or pd.isna(period_yyyymm): return pd.NaT
    s = str(date_str).strip()
    parts = re.findall(r"\d+", s)
    if len(parts) != 3: return pd.NaT

    p = int(period_yyyymm)
    p_year, p_month = divmod(p, 100)
    a,b,c = parts

    # yyyy-??-??
    if len(a) == 4:
        y = int(a); bi,ci = int(b), int(c)
        if 1<=bi<=12 and 1<=ci<=12:
            if bi == p_month: return _coerce_ts(y, bi, ci)   # Y-M-D
            if ci == p_month: return _coerce_ts(y, ci, bi)   # Y-D-M
        cand_ymd = _coerce_ts(y, bi, ci)
        cand_ydm = _coerce_ts(y, ci, bi)
        if pd.notna(cand_ymd) and cand_ymd.month == p_month: return cand_ymd
        if pd.notna(cand_ydm) and cand_ydm.month == p_month: return cand_ydm
        return pd.NaT

    # ??-??-yyyy
    if len(c) == 4:
        y = int(c); ai,bi = int(a), int(b)
        if 1<=ai<=12 and 1<=bi<=12:
            if ai == p_month: return _coerce_ts(y, ai, bi)   # MDY
            if bi == p_month: return _coerce_ts(y, bi, ai)   # DMY
        if ai>12 and 1<=bi<=12:
            return _coerce_ts(y, bi, ai) if bi == p_month else pd.NaT
        if bi>12 and 1<=ai<=12:
            return _coerce_ts(y, ai, bi) if ai == p_month else pd.NaT
        return pd.NaT

    return pd.NaT

# Ship date ambiguity: choose candidate closest to order date
def ship_candidates_from_ambiguous(s):
    if s is None: return []
    s = str(s).strip()
    parts = re.findall(r"\d+", s)
    if len(parts) != 3: return []
    a,b,c = parts
    cands = []
    if len(a) == 4:      # yyyy-??-??
        y,bi,ci = int(a), int(b), int(c)
        cands += [_coerce_ts(y, bi, ci), _coerce_ts(y, ci, bi)]  # YMD vs YDM
    elif len(c) == 4:    # ??-??-yyyy
        y,ai,bi = int(c), int(a), int(b)
        cands += [_coerce_ts(y, ai, bi), _coerce_ts(y, bi, ai)]  # MDY vs DMY
    return [ts for ts in cands if pd.notna(ts)]

def choose_closest_to_order(order_dt, candidates):
    if pd.isna(order_dt) or not candidates:
        return pd.NaT
    diffs = [abs((cand - order_dt).days) for cand in candidates]
    min_diff = min(diffs)
    best_idxs = [i for i,d in enumerate(diffs) if d == min_diff]
    if len(best_idxs) == 1:
        return candidates[best_idxs[0]]
    # tie-breaker: prefer >= order_dt (non-negative delivery days)
    nn = [i for i in best_idxs if (candidates[i] - order_dt).days >= 0]
    return candidates[nn[0]] if nn else candidates[best_idxs[0]]

# ORDER DATE: classify, parse, resolve AMBIG, overwrite
df["_OrderDate_fmt"] = df["Order Date"].apply(classify_date_format)

df["_OrderDate_dt"] = pd.NaT
for lbl in ["YMD","YDM","DMY","MDY"]:
    m = df["_OrderDate_fmt"].eq(lbl)
    if m.any():
        df.loc[m, "_OrderDate_dt"] = pd.to_datetime(df.loc[m, "Order Date"], format=FORMAT_MAP[lbl], errors="coerce")

# resolve ambiguous using period
ambig_order_mask = df["_OrderDate_fmt"].eq("AMBIG") & df["_OrderDate_dt"].isna()
if ambig_order_mask.any():
    df.loc[ambig_order_mask, "_OrderDate_dt"] = df.loc[ambig_order_mask, ["Order Date","__period_yyyymm"]].apply(
        lambda r: resolve_ambiguous_order_date(r["Order Date"], r["__period_yyyymm"]), axis=1
    )

# overwrite original column in ISO; keep helper
df["Order Date"] = df["_OrderDate_dt"].dt.strftime("%Y-%m-%d")
df["_OrderDate_ambig_resolved"] = ambig_order_mask & df["_OrderDate_dt"].notna()
df["_OrderDate_unparsed"] = df["_OrderDate_dt"].isna()

# SHIP DATE: classify, parse deterministic, resolve AMBIG by proximity
df["_ShipDate_fmt"] = df["Ship Date"].apply(classify_date_format)

df["_ShipDate_dt"] = pd.NaT
# parse deterministic formats first
for lbl in ["YMD","YDM","DMY","MDY"]:
    m = df["_ShipDate_fmt"].eq(lbl)
    if m.any():
        df.loc[m, "_ShipDate_dt"] = pd.to_datetime(df.loc[m, "Ship Date"], format=FORMAT_MAP[lbl], errors="coerce")

# resolve ambiguous by picking candidate closest to Order Date
ambig_ship_mask = df["_ShipDate_fmt"].eq("AMBIG") & df["_ShipDate_dt"].isna()
if ambig_ship_mask.any():
    df.loc[ambig_ship_mask, "_ShipDate_dt"] = df.loc[ambig_ship_mask, ["Ship Date","_OrderDate_dt"]].apply(
        lambda r: choose_closest_to_order(r["_OrderDate_dt"], ship_candidates_from_ambiguous(r["Ship Date"])),
        axis=1
    )

# overwrite original Ship Date in ISO
df["Ship Date"] = df["_ShipDate_dt"].dt.strftime("%Y-%m-%d")
df["_ShipDate_ambig_resolved"] = ambig_ship_mask & df["_ShipDate_dt"].notna()
df["_ShipDate_unparsed"] = df["_ShipDate_dt"].isna()

# 6) Quick summary
print("Order Date — AMBIG resolved:", int(df["_OrderDate_ambig_resolved"].sum()))
print("Order Date — unparsed (NaT):", int(df["_OrderDate_unparsed"].sum()))
print("Ship Date  — AMBIG resolved:", int(df["_ShipDate_ambig_resolved"].sum()))
print("Ship Date  — unparsed (NaT):", int(df["_ShipDate_unparsed"].sum()))


Order Date — AMBIG resolved: 3974
Order Date — unparsed (NaT): 0
Ship Date  — AMBIG resolved: 3878
Ship Date  — unparsed (NaT): 0


## Data Quality Checks

We produce dedicated tables for:
- **Duplicates**:
  - `Order ID + Product ID` duplicates  
  - `Order ID` with different `Order Date`
- **Logic**:
  - `Ship Date` earlier than `Order Date`
  - `Same Day` shipments where dates differ
- **Ranges**:
  - `Sales <= 0`, `Quantity == 0`, `Discount < 0 or > 1`, `Profit < 0`
- **Consistency**:
  - `Product ID → {Product Name, Category, Sub-Category}` stable mapping  
  - `Customer ID → {Customer Name, Segment}` stable mapping
  - Geography note: multiple cities can share a postal code; we **don’t** enforce one-to-one here.


In [None]:
issues = {}




# 1) Duplicates: Order ID + Product ID
if all(c in df.columns for c in ["Order ID","Product ID"]):
    dup_order_product = (
        df[df.duplicated(["Order ID","Product ID"], keep=False)]
        .sort_values(["Order ID","Product ID"])
    )
    issues["Duplicates_OrderProduct"] = dup_order_product

# 2) Order ID with different Order Date
if all(c in df.columns for c in ["Order ID","Order Date"]):
    g = df.groupby("Order ID")["Order Date"].nunique()
    orderid_diffdate = df[df["Order ID"].isin(g[g > 1].index)].sort_values(["Order ID","Order Date"])
    issues["Duplicates_OrderID_DifferentOrderDate"] = orderid_diffdate

# 3) Logic: Ship Date earlier than Order Date
if all(c in df.columns for c in ["Order Date","Ship Date"]):
    logic_ship_earlier = df[(df["Order Date"].notna()) & (df["Ship Date"].notna()) & (df["Ship Date"] < df["Order Date"])]
    issues["Logic_ShipEarlierThanOrder"] = logic_ship_earlier

# 4) Same Day mismatch
if all(c in df.columns for c in ["Ship Mode","Order Date","Ship Date"]):
    same_day_mismatch = df[(df["Ship Mode"].str.lower()=="same day") & (df["Order Date"] != df["Ship Date"])]
    issues["Logic_SameDay_DatesNotEqual"] = same_day_mismatch

# 5) Ranges: Sales, Quantity, Discount, Profit
issues_other = []
if "Sales" in df.columns:
    issues_other.append(df[df["Sales"] <= 0].assign(_Issue="Sales <= 0"))
if "Quantity" in df.columns:
    issues_other.append(df[df["Quantity"] <= 0].assign(_Issue="Quantity == 0"))
if "Discount" in df.columns:
    issues_other.append(df[(df["Discount"] < 0) | (df["Discount"] >= 1)].assign(_Issue="Discount out of [0,1]"))
issues["Issues_Other"] = pd.concat(issues_other, ignore_index=True) if issues_other else pd.DataFrame()

# Profit-only table (separate per your spec)
if "Profit" in df.columns:
    issues["Issues_ProfitOnly"] = df[df["Profit"].lt(0) | df["Profit"].isna()].copy()

# 6) Consistency checks — product mapping
def inconsistent_mapping(df, key_col, attrs):
    g = df.groupby(key_col)[attrs].nunique(dropna=False)
    mask = np.any(g.values > 1, axis=1)
    bad_keys = g[mask].index
    if len(bad_keys) == 0:
        return pd.DataFrame()
    return df[df[key_col].isin(bad_keys)].sort_values([key_col] + attrs)

if all(c in df.columns for c in ["Product ID","Product Name","Category","Sub-Category"]):
    issues["Consistency_Product"] = inconsistent_mapping(
        df, "Product ID", ["Product Name","Category","Sub-Category"]
    )

# 7) Consistency checks — customer mapping
if all(c in df.columns for c in ["Customer ID","Customer Name","Segment"]):
    issues["Consistency_Customer"] = inconsistent_mapping(
        df, "Customer ID", ["Customer Name","Segment"]
    )

summary_rows = []
for name, tbl in issues.items():
    summary_rows.append({"Check": name, "Rows": 0 if tbl is None else len(tbl)})
quality_summary = pd.DataFrame(summary_rows).sort_values("Rows", ascending=False).reset_index(drop=True)

print("Quality checks complete.")
quality_summary


✅ Quality checks complete.


Unnamed: 0,Check,Rows
0,Issues_ProfitOnly,2058
1,Consistency_Product,176
2,Consistency_Customer,45
3,Logic_SameDay_DatesNotEqual,24
4,Duplicates_OrderProduct,16
5,Issues_Other,4
6,Logic_ShipEarlierThanOrder,0
7,Duplicates_OrderID_DifferentOrderDate,0


In [247]:
all_issues_df = pd.concat(
    [df.assign(IssueType=key) for key, df in issues.items()],
    ignore_index=True
)


In [248]:
all_issues_df.to_csv("x.xlsx")

## Quality Summary

The code below fixes/removes some malformed rows.


In [234]:
df.drop_duplicates(["Order ID","Product ID"], keep="first",inplace=True)
df = df[~df["Quantity"]<=0]
df = df[~df["Profit"].isna()]

## Export — Quality Report (Excel)

We create a multi-sheet Excel file using **XlsxWriter**:

- **Overview** — counts by issue type  
- **Sample** — first N rows of the standardized dataset  
- Individual sheets for each issue table  


In [None]:
report_path = os.path.join( "Quality_Report.xlsx")

with pd.ExcelWriter(report_path, engine="xlsxwriter", datetime_format="yyyy-mm-dd") as xw:
    # Overview
    quality_summary.to_excel(xw, sheet_name="Overview", index=False)

    # Sample (keeps the doc readable)
    df.head(200).to_excel(xw, sheet_name="Data_Sample", index=False)

    # Issues
    for name, tbl in issues.items():
        if tbl is None or len(tbl) == 0:
            # write an empty table with a note
            pd.DataFrame({"Note":[f"No rows for {name}"]}).to_excel(xw, sheet_name=name[:31], index=False)
        else:
            tbl.to_excel(xw, sheet_name=name[:31], index=False)

    # Aesthetics: autofit columns
    for sheet_name, worksheet in xw.sheets.items():
        # best-effort autofit: look at DataFrame in writer book
        try:
            df_sheet = (
                quality_summary if sheet_name=="Overview"
                else df.head(200) if sheet_name=="Data_Sample"
                else issues.get(sheet_name, None)
            )
            if isinstance(df_sheet, pd.DataFrame) and not df_sheet.empty:
                for i, col in enumerate(df_sheet.columns):
                    maxw = max(10, min(60, df_sheet[col].astype(str).str.len().max() + 2))
                    worksheet.set_column(i, i, int(maxw))
            else:
                worksheet.set_column(0, 5, 24)
        except Exception:
            pass

print(f"  Exported Quality Report → {report_path}")


✅ Exported Quality Report → Quality_Report.xlsx


## Dim/Fact Prep

If you’re generating star-schema tables, here’s a clean starting point.


In [195]:
# Product Dim (dedupe by Product ID)
dim_product = (
    df[["Product ID","Product Name","Category","Sub-Category"]]
    .dropna(subset=["Product ID"])
    .drop_duplicates("Product ID")
    .reset_index(drop=True)
)
dim_product["product_sk"] = pd.factorize(dim_product["Product ID"])[0] + 1

#Geography Dim
geo_cols = ["Country","State","City","Postal Code","Region"]
present_geo = [c for c in geo_cols if c in df.columns]

dim_geography = (
    df[present_geo]
    .dropna(how="all")
    .drop_duplicates()
    .reset_index(drop=True)
)

# Natural key string for stable factorization
dim_geography["_geo_nk"] = dim_geography.astype(str).agg("|".join, axis=1)
dim_geography["geography_sk"] = pd.factorize(dim_geography["_geo_nk"])[0] + 1
dim_geography.drop(columns=["_geo_nk"], inplace=True)

#Ship Mode Dim
dim_shipmode = (
        df[["Ship Mode"]]
        .dropna()
        .drop_duplicates()
        .rename(columns={"Ship Mode":"ship_mode"})
        .reset_index(drop=True)
)
dim_shipmode["ship_mode_sk"] = pd.factorize(dim_shipmode["ship_mode"])[0] + 1

# Customer Dim
dim_customer = (
    df[["Customer ID","Customer Name","Segment"]]
    .dropna(subset=["Customer ID"])
    .drop_duplicates("Customer ID")
    .reset_index(drop=True)
)
dim_customer["customer_sk"] = pd.factorize(dim_customer["Customer ID"])[0] + 1


def build_dim_date(dates: pd.Series) -> pd.DataFrame:
    dmin, dmax = pd.to_datetime(dates.min()), pd.to_datetime(dates.max())
    idx = pd.date_range(dmin, dmax, freq="D")
    dim = pd.DataFrame({"date": idx})
    dim["date_sk"] = dim["date"].dt.strftime("%Y%m%d").astype(int)
    dim["day"] = dim["date"].dt.day
    dim["month"] = dim["date"].dt.month
    dim["month_name"] = dim["date"].dt.month_name()
    dim["quarter"] = dim["date"].dt.quarter
    dim["year"] = dim["date"].dt.year
    dim["dow"] = dim["date"].dt.dayofweek
    dim["is_weekend"] = dim["dow"].isin([5,6]).astype(int)
    return dim[["date_sk","date","day","month","month_name","quarter","year","dow","is_weekend"]]

all_dates = pd.concat([df["Order Date"].dropna(), df["Ship Date"].dropna()])
dim_date = build_dim_date(all_dates).sort_values("date_sk").reset_index(drop=True)

# convenient lookups
orderdate_map = dim_date.set_index("date")["date_sk"]
shipdate_map  = dim_date.set_index("date")["date_sk"]




print(
    f"Dims/Facts ready — dim_product:{len(dim_product):,}, "
    f"dim_customer:{len(dim_customer):,}, fact_orders:{len(fact_orders):,}"
)


Dims/Facts ready — dim_product:1,885, dim_customer:793, fact_orders:9,748


In [196]:
fact = df.copy()

# Map Date SKs
fact = fact.merge(dim_date[["date","date_sk"]], left_on="_OrderDate_dt", right_on="date", how="left").rename(columns={"date_sk":"order_date_sk"}).drop(columns=["date"])
fact = fact.merge(dim_date[["date","date_sk"]], left_on="_ShipDate_dt",  right_on="date", how="left").rename(columns={"date_sk":"ship_date_sk"}).drop(columns=["date"])

# Map Product SK
fact = fact.merge(dim_product[["Product ID","product_sk"]], on="Product ID", how="left")

# Map Customer SK
fact = fact.merge(dim_customer[["Customer ID","customer_sk"]], on="Customer ID", how="left")

# Map Geography SK (join on all present geo columns)
if not dim_geography.empty:
    fact = fact.merge(dim_geography.assign(_join_key=1), how="left", on=[c for c in dim_geography.columns if c in geo_cols])

# Map Ship Mode SK
if not dim_shipmode.empty:
    fact = fact.merge(dim_shipmode, left_on="Ship Mode", right_on="ship_mode", how="left").drop(columns=["ship_mode"])



# Keep only needed analytics columns + degenerate Order ID
fact_orders = fact[[
    # Keys
    "order_date_sk","ship_date_sk","product_sk","customer_sk","geography_sk",
    "ship_mode_sk",
    # Degenerate
    "Order ID",
    # Measures
    "Sales","Quantity","Discount","Profit"
]].copy()

# Drop the None column if segment_sk not present
fact_orders = fact_orders[[c for c in fact_orders.columns if c is not None]]

# Basic QC
print(f"fact_orders rows: {len(fact_orders):,}")
for k in ["order_date_sk","product_sk","customer_sk"]:
    if k in fact_orders.columns:
        nulls = fact_orders[k].isna().sum()
        if nulls:
            print(f"⚠️  {k}: {nulls:,} nulls (check source mappings)")


fact_orders rows: 9,748


In [197]:
from pathlib import Path
import zipfile

out_dir = Path("Task_6_Data_Marts")
out_dir.mkdir(exist_ok=True)

dim_date.to_csv(out_dir/"dim_date.csv", index=False)
dim_shipmode.to_csv(out_dir/"dim_ship_mode.csv", index=False)
dim_customer.to_csv(out_dir/"dim_customer.csv", index=False)
dim_product.to_csv(out_dir/"dim_product.csv", index=False)
dim_geography.to_csv(out_dir/"dim_geography.csv", index=False)
fact_orders.to_csv(out_dir/"fact_order_lines.csv", index=False)

# Counts summary CSV
def counts_summary(name, df, pk_col):
    d = {
        "Data Mart System Name": name,
        "Count Rows": len(df),
        "Count Distinct Primary Key": df[pk_col].nunique() if pk_col in df.columns else np.nan
    }
    # Special rule: if a table has "row_id" also add distinct count of that field
    if "row_id" in df.columns:
        d["Count Distinct Row ID"] = df["row_id"].nunique()
    return d

rows = []
rows.append(counts_summary("dim_date", dim_date, "date_sk"))
rows.append(counts_summary("dim_ship_mode", dim_shipmode, "ship_mode_sk"))
rows.append(counts_summary("dim_customer", dim_customer, "customer_sk"))
rows.append(counts_summary("dim_product", dim_product, "product_sk"))
rows.append(counts_summary("dim_geography", dim_geography, "geography_sk"))
rows.append(counts_summary("fact_order_lines", fact_orders, "order_id"))  # natural key as PK proxy

pd.DataFrame(rows).to_csv("Task_6_2_Data_Marts_Rows.csv", index=False)

# Zip archive
with zipfile.ZipFile("Task_6_1_Data_Marts.zip", "w", zipfile.ZIP_DEFLATED) as zf:
    for p in out_dir.glob("*.csv"):
        zf.write(p, arcname=p.name)

print("Exported Task_6_1_Data_Marts.zip and Task_6_2_Data_Marts_Rows.csv")


Exported Task_6_1_Data_Marts.zip and Task_6_2_Data_Marts_Rows.csv
