In [2]:

import os
import pandas as pd
import pandera as pa
from pandera import Column, Check, DataFrameSchema

# Locations of the raw export and of the cleaned copy (both in the same folder).
base_dir = r"C:\Users\chang\Desktop\MS2 Platform Technology"
raw_path = os.path.join(base_dir, "trend_report.csv")
clean_path = os.path.join(base_dir, "CLEANED_trend_report.csv")

# 1 Read the raw CSV, then rewrite every header as snake_case:
#   trim surrounding whitespace -> lowercase -> each run of spaces/hyphens becomes "_".
df = pd.read_csv(raw_path)
print("Columns before normalization:", df.columns.tolist())
df.columns = df.columns.str.strip().str.lower().str.replace(r"[ \-]+", "_", regex=True)
print("Columns after normalization:", df.columns.tolist())

# 2 Detect date column
def find_column(candidates, cols):
    """Return the first name in ``candidates`` that is also in ``cols``.

    Candidates are tried in the order given, so earlier names take priority.

    Args:
        candidates: Iterable of column names to look for, in priority order.
        cols: Container of available column names (anything supporting ``in``).

    Returns:
        The first matching candidate, or ``None`` when none of them is present.
    """
    return next((name for name in candidates if name in cols), None)

# Pick the date column: the first of the known aliases that exists in the frame.
date_col = find_column(["trend_date", "date", "report_date", "week"], df.columns)
if date_col is None:
    raise KeyError(f"Date column missing. Available columns: {df.columns.tolist()}")
print(f"Using date column: '{date_col}'")

# 3 Initial shape and missing-value overview
# Per-column fraction of nulls; computed once and reused for the drop rule below.
null_fraction = df.isnull().mean()
print(f"▶️ Raw data shape: {df.shape}")
print("Missing % per column:")
print((null_fraction * 100).sort_values(ascending=False))

# 4 Drop columns that are more than half empty, plus placeholder "col_*" columns.
high_null = df.columns[null_fraction > 0.5]
pattern_cols = [name for name in df.columns if name.startswith("col_")]
# The date column is always kept, even if it matches one of the rules above.
drop_cols = [name for name in [*high_null, *pattern_cols] if name != date_col]
df = df.drop(columns=drop_cols, errors="ignore")
print(f"After dropping cols, shape: {df.shape}")

# 5 Remove fully duplicated rows
df = df.drop_duplicates()
print(f"After duplicate removal, shape: {df.shape}")

# 6 Numeric & categorical fixes
# Numeric columns are whatever pandas inferred as a number dtype; categorical
# columns are the remaining object (text) columns, excluding the date column.
numeric_cols = df.select_dtypes(include=["number"]).columns.tolist()
cat_cols = [c for c in df.select_dtypes(include=["object"]).columns if c != date_col]
print(f"Numeric columns: {numeric_cols}")
print(f"Categorical columns: {cat_cols}")
# Numeric: cast to float (allow negatives)
for col in numeric_cols:
    df[col] = pd.to_numeric(df[col], errors="coerce").astype(float)
# Categorical: strip, lower, cast to category.
# Missing values must stay missing: a plain astype(str) would turn NaN into the
# literal text "nan", which would then become a real category and be written
# to the cleaned CSV as data. Normalize the text, then blank out the rows that
# were null to begin with.
for col in cat_cols:
    was_present = df[col].notna()
    normalized = df[col].astype(str).str.strip().str.lower()
    df[col] = normalized.where(was_present).astype("category")
print(f"After numeric & category clean, shape: {df.shape}")

# 7 (Optional) Outlier filtering -- intentionally not applied to this dataset.
print("⚠️ Skipping outlier filtering for trend report.")

# 8 Define & apply Pandera schema
# Date column: required string. Numeric columns: nullable float.
# Categorical columns: nullable category. Later entries replace earlier ones
# when a column name repeats.
schema_cols = {date_col: Column(str, nullable=False)}
schema_cols.update((name, Column(float, nullable=True)) for name in numeric_cols)
schema_cols.update((name, Column(pa.Category, nullable=True)) for name in cat_cols)
schema = DataFrameSchema(schema_cols)
# lazy=True collects every failure before raising instead of stopping at the first.
df = schema.validate(df, lazy=True)
print("✔️ Pandera validation passed")

# 9 Save cleaned data (row index is not written)
df.to_csv(clean_path, index=False)
print(f"✅ Cleaned data saved to: {clean_path}")


Columns before normalization: ['week', 'avg_users', 'sales_growth_rate', 'col_4', 'col_5', 'col_6', 'col_7', 'col_8', 'col_9', 'col_10', 'col_11', 'col_12', 'col_13', 'col_14', 'col_15', 'col_16', 'col_17', 'col_18', 'col_19', 'col_20', 'col_21', 'col_22', 'col_23', 'col_24', 'col_25', 'col_26', 'col_27', 'col_28', 'col_29', 'col_30', 'col_31', 'col_32', 'col_33', 'col_34', 'col_35', 'col_36', 'col_37', 'col_38', 'col_39', 'col_40', 'col_41', 'col_42', 'col_43', 'col_44', 'col_45', 'col_46', 'col_47', 'col_48', 'col_49', 'col_50']
Columns after normalization: ['week', 'avg_users', 'sales_growth_rate', 'col_4', 'col_5', 'col_6', 'col_7', 'col_8', 'col_9', 'col_10', 'col_11', 'col_12', 'col_13', 'col_14', 'col_15', 'col_16', 'col_17', 'col_18', 'col_19', 'col_20', 'col_21', 'col_22', 'col_23', 'col_24', 'col_25', 'col_26', 'col_27', 'col_28', 'col_29', 'col_30', 'col_31', 'col_32', 'col_33', 'col_34', 'col_35', 'col_36', 'col_37', 'col_38', 'col_39', 'col_40', 'col_41', 'col_42', 'col_43

In [5]:


import logging
import pandera as pa
from pandera import Column, Check, DataFrameSchema


logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger = logging.getLogger(__name__)

# 1 Upstream error status
# Nothing in this cell measures it; it is a fixed value unless the loading
# step is wrapped in try/except and updates it.
error_handling = "Passed"

# 2 Required columns = the date column plus every numeric metric.
required = [date_col, *numeric_cols]
missing = [name for name in required if name not in df.columns]
report_missing = missing if missing else []
if not missing:
    logger.info("All required columns present")
else:
    logger.warning(f"Required columns missing: {missing}")

# 3 Pandera schema for the double-check (date + numeric columns only).
schema = DataFrameSchema(
    {
        date_col: Column(str, nullable=False),
        **{name: Column(float, nullable=True) for name in numeric_cols},
    }
)

# 4 Validate and record the outcome; an empty frame is reported as skipped.
report_schema = "Skipped (empty DataFrame)"
if not df.empty:
    try:
        schema.validate(df, lazy=True)
    except pa.errors.SchemaErrors as err:
        # failure_cases holds one row per failed check.
        report_schema = f"Failed: {len(err.failure_cases)} errors"
        logger.error("Schema validation failed:\n%s", err.failure_cases)
    else:
        report_schema = "Passed"
        logger.info("Schema validation passed")

# 5 Consolidated health report
print("\n=== trend_report Pipeline Health Report ===")
print(f"Missing/Broken columns detected: {report_missing}")
print(f"Schema double check:             {report_schema}")
print(f"Error Handling check:            {error_handling}")




INFO: All required columns present
INFO: Schema validation passed



=== trend_report Pipeline Health Report ===
Missing/Broken columns detected: []
Schema double check:             Passed
Error Handling check:            Passed
