# 01 — Data Validation & Merge Pipeline

## HumanForYou — Employee Attrition Prediction

---

### Objective

Establish a **reproducible and traceable workflow** to:

1. Validate the integrity of all 4 source datasets (general_data, employee_survey, manager_survey, and the badge data stored as in_time / out_time)
2. Check schema consistency, missing values, and data quality
3. Merge into a single analysis-ready DataFrame
4. Export the clean dataset for downstream notebooks

> **Adapted from** the pipeline architecture of a previous YOLO detection project — same rigor, different domain.

## Section 1: Configuration and Environment Setup

In [None]:
# ==============================================================================
# CONFIGURATION — Centralized paths and parameters
# ==============================================================================

import os
import sys
import warnings
import pandas as pd
import numpy as np
from datetime import datetime

# Silence all Python warnings for the rest of the session.
warnings.filterwarnings("ignore")

# --- Path Configuration ---
# Point DATA_DIR at the folder that holds the source CSVs in your environment.
DATA_DIR = "../data"          # input CSVs
OUTPUT_DIR = "../outputs"     # merged / cleaned artefacts
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Logical dataset name -> CSV path, in the order the files are validated.
FILES = {
    dataset: os.path.join(DATA_DIR, filename)
    for dataset, filename in (
        ("general", "general_data.csv"),
        ("employee_survey", "employee_survey_data.csv"),
        ("manager_survey", "manager_survey_data.csv"),
        ("in_time", "in_time.csv"),
        ("out_time", "out_time.csv"),
    )
}

# Join key shared by the general and survey datasets.
EMPLOYEE_ID_COL = "EmployeeID"

# Echo the resolved configuration so each run records where it read and wrote.
print(f"Configuration loaded — {datetime.now():%Y-%m-%d %H:%M}")
print(f"Data directory : {os.path.abspath(DATA_DIR)}")
print(f"Output directory: {os.path.abspath(OUTPUT_DIR)}")

## Section 2: File Existence & Schema Validation

**Purpose**: Verify every source file exists, is non-empty, and has the expected columns.

In [None]:
# ==============================================================================
# FILE EXISTENCE CHECK
# ==============================================================================

def validate_file(name, path):
    """Load one source CSV after checking that it exists and holds data.

    Args:
        name: Label used in the printed status line.
        path: Location of the CSV file.

    Returns:
        The parsed DataFrame, or None when the file is missing, is zero
        bytes long, has no parseable columns, or has a header but no data
        rows. Exactly one status line is printed for every outcome.
    """
    if not os.path.isfile(path):
        print(f"  [FAIL] {name}: file not found at {path}")
        return None

    size_bytes = os.path.getsize(path)
    if size_bytes == 0:
        # read_csv would raise on a zero-byte file; report it like any other failure.
        print(f"  [FAIL] {name}: file is empty at {path}")
        return None

    try:
        df = pd.read_csv(path)
    except pd.errors.EmptyDataError:
        # The file has bytes (e.g. blank lines) but nothing pandas can parse.
        print(f"  [FAIL] {name}: no columns to parse in {path}")
        return None

    if df.empty:
        # A header without rows cannot feed the later audits or the merge.
        print(f"  [FAIL] {name}: no data rows in {path}")
        return None

    size_kb = size_bytes / 1024
    print(f"  [OK]   {name:20s} — {df.shape[0]:>5} rows × {df.shape[1]:>3} cols  ({size_kb:,.0f} KB)")
    return df

# Banner for the file-validation section of the run log.
banner = "=" * 65
print(banner)
print("FILE VALIDATION")
print(banner)

# Keep only the datasets that validate_file managed to load; failures are
# reported by validate_file itself and simply left out of `raw`.
raw = {}
for source_name, source_path in FILES.items():
    loaded = validate_file(source_name, source_path)
    if loaded is None:
        continue
    raw[source_name] = loaded

print(f"\nLoaded {len(raw)}/{len(FILES)} files successfully.")

In [None]:
# ==============================================================================
# SCHEMA VALIDATION — Expected columns
# ==============================================================================

# Columns each tabular dataset must contain. The badge files are not listed
# here, so the loop below does not check them.
EXPECTED_SCHEMAS = {
    "general": [
        "Age", "Attrition", "BusinessTravel", "Department", "DistanceFromHome",
        "Education", "EducationField", "EmployeeCount", "EmployeeID", "Gender",
        "JobLevel", "JobRole", "MaritalStatus", "MonthlyIncome",
        "NumCompaniesWorked", "Over18", "PercentSalaryHike", "StandardHours",
        "StockOptionLevel", "TotalWorkingYears", "TrainingTimesLastYear",
        "YearsAtCompany", "YearsSinceLastPromotion", "YearsWithCurrManager"
    ],
    "employee_survey": ["EmployeeID", "EnvironmentSatisfaction", "JobSatisfaction", "WorkLifeBalance"],
    "manager_survey":  ["EmployeeID", "JobInvolvement", "PerformanceRating"],
}

print("SCHEMA VALIDATION")
print("=" * 65)
for key, expected_cols in EXPECTED_SCHEMAS.items():
    if key not in raw:
        # The loader skips files that failed validation; report that here
        # instead of crashing with a KeyError.
        print(f"  [FAIL] {key}")
        print("         Dataset was not loaded — schema cannot be checked")
        continue

    actual = list(raw[key].columns)
    actual_lookup = set(actual)
    expected_lookup = set(expected_cols)
    # Lists in schema / file order keep the printed report identical from
    # run to run (set ordering of strings is not stable across sessions).
    missing = [col for col in expected_cols if col not in actual_lookup]
    extra = [col for col in actual if col not in expected_lookup]

    # Only missing columns fail the check; extra columns are informational.
    status = "OK" if not missing else "FAIL"
    print(f"  [{status}] {key}")
    if missing:
        print(f"         Missing columns: {missing}")
    if extra:
        print(f"         Extra columns  : {extra}")

## Section 3: Data Quality Audit

**Purpose**: For each dataset, check missing values, duplicates, constant columns, and basic statistics.

In [None]:
# ==============================================================================
# MISSING VALUES REPORT
# ==============================================================================

def missing_report(df, name):
    """Summarise and print the columns of `df` that contain missing values.

    Args:
        df: DataFrame to audit.
        name: Label used in the printed lines.

    Returns:
        DataFrame indexed by column name with `missing` (count) and `pct`
        (percentage of rows, rounded to 2 decimals), restricted to columns
        with at least one missing value and sorted by `pct` descending.
    """
    na_counts = df.isnull().sum()
    na_share = (na_counts / len(df) * 100).round(2)
    summary = pd.DataFrame({"missing": na_counts, "pct": na_share})
    summary = summary.loc[summary["missing"] > 0].sort_values("pct", ascending=False)

    if not summary.empty:
        print(f"  {name}: {len(summary)} column(s) with missing values")
        for col, n_missing, share in zip(summary.index, summary["missing"], summary["pct"]):
            print(f"    → {col}: {int(n_missing)} ({share:.1f}%)")
    else:
        print(f"  {name}: No missing values ✓")

    return summary

print("MISSING VALUES AUDIT")
print("=" * 65)

# One report per tabular dataset; the badge tables are skipped here because
# they are handled separately.
missing_reports = {
    dataset: missing_report(table, dataset)
    for dataset, table in raw.items()
    if dataset not in ("in_time", "out_time")
}

In [None]:
# ==============================================================================
# DUPLICATE & CONSTANT COLUMN CHECK
# ==============================================================================

print("DUPLICATE EMPLOYEES CHECK")
print("=" * 65)
# Count repeated EmployeeIDs in each dataset that carries the key column.
for key in ("general", "employee_survey", "manager_survey"):
    n_dup = raw[key][EMPLOYEE_ID_COL].duplicated().sum()
    flag = "✓" if n_dup == 0 else "⚠"
    print(f"  {key}: {n_dup} duplicate EmployeeIDs {flag}")

print("\nCONSTANT COLUMNS CHECK")
print("=" * 65)
# A column with at most one distinct value (as counted by nunique) is
# reported as a candidate for removal.
df_gen = raw["general"]
constant_cols = []
for col in df_gen.columns:
    if df_gen[col].nunique() <= 1:
        constant_cols.append(col)

if not constant_cols:
    print("  No constant columns found.")
else:
    print(f"  Constant columns found (candidates for removal): {constant_cols}")
    for col in constant_cols:
        print(f"    → {col}: unique value = {df_gen[col].unique()}")

In [None]:
# ==============================================================================
# EMPLOYEE ID CONSISTENCY ACROSS DATASETS
# ==============================================================================

print("EMPLOYEE ID CROSS-FILE CONSISTENCY")
print("=" * 65)

# ID sets of the three datasets that expose the EmployeeID column by name.
ids_general = set(raw["general"][EMPLOYEE_ID_COL])
ids_employee = set(raw["employee_survey"][EMPLOYEE_ID_COL])
ids_manager = set(raw["manager_survey"][EMPLOYEE_ID_COL])

# The badge files are read positionally: their first column is taken as the
# employee ID and cast to int.
ids_in = set(raw["in_time"].iloc[:, 0].astype(int))
ids_out = set(raw["out_time"].iloc[:, 0].astype(int))

all_sets = dict(
    general=ids_general,
    employee_survey=ids_employee,
    manager_survey=ids_manager,
    in_time=ids_in,
    out_time=ids_out,
)

# Every dataset (including general itself) is compared against general.
ref = ids_general
for name, s in all_sets.items():
    only_ref = ref.difference(s)
    only_this = s.difference(ref)
    if only_ref or only_this:
        print(f"  {name:20s} ↔ general: {len(only_ref)} missing, {len(only_this)} extra ⚠")
    else:
        print(f"  {name:20s} ↔ general: Perfect match ({len(s)} IDs) ✓")

## Section 4: Badge Data Processing

**Purpose**: Transform raw badge timestamps (in_time / out_time) into meaningful features per employee:

- Average arrival & departure hours
- Average daily working hours
- Absence rate (fraction of working days with no badge)
- Punctuality indicators

In [None]:
# ==============================================================================
# BADGE DATA — Parse and compute features
# ==============================================================================

def _badge_clock_hours(stamps):
    """Parse a frame of badge timestamps into (whole hour, decimal hour) arrays, NaN where unusable."""
    flat = stamps.to_numpy(dtype=object).ravel()
    # errors="coerce" maps NaN, "NA" and anything unparseable to NaT, so
    # such cells end up as NaN in both returned arrays.
    parsed = pd.Series(pd.to_datetime(flat, errors="coerce"))
    whole = parsed.dt.hour.to_numpy(dtype=float)
    decimal = whole + parsed.dt.minute.to_numpy(dtype=float) / 60
    return whole.reshape(stamps.shape), decimal.reshape(stamps.shape)


def process_badge_data(df_in, df_out, id_col=None):
    """
    Transform raw badge in/out timestamps into employee-level features.

    The first column of each input is read as the employee ID (cast to int);
    every remaining column of `df_in` is treated as one day. A day counts as
    "present" only when both the in and the out cell parse as timestamps;
    every other day counts as absent.

    Args:
        df_in:  Badge-in table, one row per employee, one column per day.
        df_out: Badge-out table with the same day columns. Rows are matched
                to `df_in` by employee ID, so the two tables may be ordered
                differently. Employees absent from `df_out` are treated as
                absent on every day.
        id_col: Name of the ID column in the result. Defaults to the
                module-level EMPLOYEE_ID_COL.

    Returns:
        DataFrame with one row per row of `df_in` and the columns:
          - <id_col>
          - avg_arrival_hour, avg_departure_hour: mean clock time in decimal
            hours (hour + minute / 60, seconds ignored) over present days
          - avg_working_hours: mean of (departure - arrival) over present
            days; both are clock times, so a shift that crosses midnight
            would contribute a negative value
          - absence_rate: absent days / number of day columns (every day
            column counts, including columns where nobody badged)
          - late_arrival_rate: present days with arrival hour >= 10, divided
            by present days
        Averages and late_arrival_rate are NaN for employees with no present
        day; absence_rate is NaN when there are no day columns.

    Raises:
        KeyError: if a day column of `df_in` is missing from `df_out`.
        ValueError: if `df_out` must be re-aligned but has duplicate IDs.
    """
    if id_col is None:
        id_col = EMPLOYEE_ID_COL

    emp_ids = df_in.iloc[:, 0].astype(int).to_numpy()
    date_cols = df_in.columns[1:]
    n_days = len(date_cols)

    # Select the out-table by column label so a different column order in
    # the out file cannot shift days.
    out_ids = df_out.iloc[:, 0].astype(int).to_numpy()
    out_stamps = df_out[date_cols]
    if not np.array_equal(out_ids, emp_ids):
        # Rows differ in order or content: pair them by employee ID rather
        # than by position, which would mix up employees.
        if len(set(out_ids.tolist())) != len(out_ids):
            raise ValueError(
                "out_time contains duplicate employee IDs; cannot align it with in_time"
            )
        out_stamps = out_stamps.set_axis(out_ids, axis=0).reindex(index=emp_ids)

    # Whole tables are parsed in one call each instead of cell by cell.
    in_hour, arrival = _badge_clock_hours(df_in.iloc[:, 1:])
    _, departure = _badge_clock_hours(out_stamps)

    present = ~(np.isnan(arrival) | np.isnan(departure))
    n_present = present.sum(axis=1)
    with np.errstate(invalid="ignore"):
        # NaN hours compare False; they are also excluded by `present`.
        n_late = (present & (in_hour >= 10)).sum(axis=1)

    has_presence = n_present > 0

    def _mean_over_present(values):
        """Row-wise mean of `values` restricted to present days (NaN if none)."""
        totals = np.where(present, values, 0.0).sum(axis=1)
        return np.divide(
            totals, n_present, out=np.full(n_present.shape, np.nan), where=has_presence
        )

    if n_days > 0:
        absence_rate = (n_days - n_present) / n_days
    else:
        absence_rate = np.full(n_present.shape, np.nan)

    late_rate = np.divide(
        n_late, n_present, out=np.full(n_present.shape, np.nan), where=has_presence
    )

    return pd.DataFrame({
        id_col: emp_ids,
        "avg_arrival_hour": _mean_over_present(arrival),
        "avg_departure_hour": _mean_over_present(departure),
        "avg_working_hours": _mean_over_present(departure - arrival),
        "absence_rate": absence_rate,
        "late_arrival_rate": late_rate,
    })

# Build the per-employee badge features from the two raw timestamp tables.
print("Processing badge data (this may take ~30s)...")
df_badge = process_badge_data(df_in=raw["in_time"], df_out=raw["out_time"])
print(f"Badge features computed for {len(df_badge)} employees.")

# Last expression of the cell: summary statistics rounded to three decimals,
# rendered by the notebook.
badge_stats = df_badge.describe()
badge_stats.round(3)

## Section 5: Dataset Merge

**Purpose**: Left-join the survey and badge datasets onto general_data by EmployeeID to create the unified analysis DataFrame (every employee in general_data is kept).

In [None]:
# ==============================================================================
# MERGE ALL DATASETS
# ==============================================================================

# Start from the general dataset; every row in it is kept by the left joins.
df = raw["general"].copy()

# Attach survey and badge features. validate="one_to_one" makes pandas raise
# a MergeError if EmployeeID is duplicated on either side, instead of
# silently multiplying rows and distorting the attrition figures below.
for right in (raw["employee_survey"], raw["manager_survey"], df_badge):
    df = df.merge(right, on=EMPLOYEE_ID_COL, how="left", validate="one_to_one")

# Drop the hard-coded list of constant / uninformative columns; names that
# are not present in the merged frame are skipped.
cols_to_drop = ["EmployeeCount", "Over18", "StandardHours"]
df.drop(columns=[c for c in cols_to_drop if c in df.columns], inplace=True)

print(f"Merged dataset: {df.shape[0]} rows × {df.shape[1]} columns")
print("Target distribution (Attrition):")
print(df["Attrition"].value_counts())
# Share of rows whose Attrition value is exactly "Yes".
print(f"\nAttrition rate: {(df['Attrition'] == 'Yes').mean() * 100:.1f}%")

In [None]:
# ==============================================================================
# FINAL QUALITY CHECK ON MERGED DATA
# ==============================================================================

print("POST-MERGE QUALITY CHECK")
print("=" * 65)

# Per-column count of missing values in the merged frame.
total_missing = df.isnull().sum()
cols_with_na = total_missing[total_missing > 0]
if not cols_with_na.empty:
    print(f"  {len(cols_with_na)} columns with missing values:")
    for col, count in cols_with_na.items():
        print(f"    → {col}: {count} ({count/len(df)*100:.1f}%)")
else:
    print("  No missing values in merged dataset ✓")

# Column counts by dtype family.
numeric_cols = df.select_dtypes(include='number').columns
object_cols = df.select_dtypes(include='object').columns
print(f"\nColumn types:")
print(f"  Numeric : {len(numeric_cols)}")
print(f"  Object  : {len(object_cols)}")
print(f"  Total   : {df.shape[1]}")

## Section 6: Export

In [None]:
# ==============================================================================
# EXPORT CLEAN DATASET
# ==============================================================================

# Write the merged frame without the pandas index.
output_path = os.path.join(OUTPUT_DIR, "merged_data.csv")
df.to_csv(output_path, index=False)
print(f"Merged dataset exported to: {os.path.abspath(output_path)}")
print(f"Shape: {df.shape}")

# Also save a quick plain-text summary of this run.
summary_path = os.path.join(OUTPUT_DIR, "data_validation_summary.txt")
summary_lines = [
    f"Data Validation Summary — {datetime.now():%Y-%m-%d %H:%M}\n",
    f"{'='*50}\n",
    f"Total employees: {len(df)}\n",
    f"Total features:  {df.shape[1]}\n",
    f"Attrition rate:  {(df['Attrition']=='Yes').mean()*100:.1f}%\n",
    f"Columns dropped: {cols_to_drop}\n",
    "Badge features added: avg_arrival_hour, avg_departure_hour, avg_working_hours, absence_rate, late_arrival_rate\n",
]
# The header contains a non-ASCII dash, so the encoding is fixed explicitly;
# otherwise the bytes written would depend on the platform's default encoding.
with open(summary_path, "w", encoding="utf-8") as f:
    f.writelines(summary_lines)

print(f"Summary saved to: {os.path.abspath(summary_path)}")
print("\n✓ Pipeline complete — proceed to 02_EDA_Explorer.ipynb")