# Automated HR Analytics Data Pipeline
**Project:** IBM HR Analytics - Attrition Analysis  
**Process:** Data Engineering & Automated Pipeline  
**Author:** [Your Name]

---
### Purpose
This notebook automates the extraction, cleaning, and transformation of HR datasets. 
It batch-processes every raw CSV file in the input folder, checks that each file contains 
the required columns, and merges the cleaned results into a single CSV export for Power BI visualization.

In [1]:
import pandas as pd
import numpy as np
import os
from datetime import datetime

# Pipeline locations. All paths are relative, so they resolve against the
# working directory the notebook kernel was started in.
RAW_DATA_DIR = '../data/raw/'  # folder scanned for raw .csv inputs
PROCESSED_DATA_PATH = '../data/processed/HR_Attrition_Cleaned.csv'  # merged, cleaned export
LOG_FILE_PATH = '../docs/pipeline_log.txt'  # run history written by the pipeline

# Status marker repaired: the original literal was a mis-decoded check-mark emoji.
print("✅ Environment initialized.")

✅ Environment initialized.


In [5]:
def _yes_no_to_binary(series):
    """Encode a Yes/No column as 1/0, ignoring case and surrounding whitespace."""
    if pd.api.types.is_numeric_dtype(series):
        # Already numeric (for example previously encoded 1/0): nothing to translate.
        return series
    normalized = series.astype(str).str.strip().str.lower()
    # Anything that is not a recognisable yes/no becomes NaN, as before.
    return normalized.map({'yes': 1, 'no': 0})


def clean_hr_data(df):
    """
    Standardizes and cleans the input dataframe.

    - Removes columns that carry no analytical value
    - Maps categorical Yes/No columns to integers (1/0)
    - Adds 'AgeGroup' and 'DistanceGroup' bin columns

    Args:
        df: Raw HR dataframe. Must contain 'Age' and 'DistanceFromHome';
            'Attrition' and 'OverTime' are encoded only when present.

    Returns:
        A new dataframe with the changes applied. The input frame is not
        modified, because all work is done on the copy returned by ``drop``.

    Raises:
        KeyError: If 'Age' or 'DistanceFromHome' is missing.
    """
    # 1. Drop columns that are constant across employees (Over18, EmployeeCount,
    #    StandardHours) or are pure identifiers (EmployeeNumber).
    #    errors='ignore' lets the function run on frames where they are already gone.
    redundant_cols = ['Over18', 'EmployeeCount', 'StandardHours', 'EmployeeNumber']
    cleaned = df.drop(columns=redundant_cols, errors='ignore')

    # 2. Map categorical 'Yes/No' to binary (1/0) so rates can be computed as means.
    for column in ('Attrition', 'OverTime'):
        if column in cleaned.columns:
            cleaned[column] = _yes_no_to_binary(cleaned[column])

    # 3. Feature engineering: age binning.
    #    The labels name inclusive upper bounds ('18-25', '26-35', ...), so the bins
    #    must be right-closed; include_lowest keeps age 18 in the first group.
    #    Age 55 is reported under '46-55' and '55+' holds everything above 55.
    #    The open-ended top edge avoids silently producing NaN for large values.
    age_bins = [18, 25, 35, 45, 55, float('inf')]
    age_labels = ['18-25', '26-35', '36-45', '46-55', '55+']
    cleaned['AgeGroup'] = pd.cut(
        cleaned['Age'],
        bins=age_bins,
        labels=age_labels,
        right=True,
        include_lowest=True,
    )

    # 4. Feature engineering: distance binning.
    #    Left-closed groups: [0, 5) Near, [5, 15) Far, [15, inf) Very Far.
    dist_bins = [0, 5, 15, float('inf')]
    dist_labels = ['Near', 'Far', 'Very Far']
    cleaned['DistanceGroup'] = pd.cut(
        cleaned['DistanceFromHome'],
        bins=dist_bins,
        labels=dist_labels,
        right=False,
    )

    return cleaned

In [6]:
def run_pipeline():
    """
    Orchestrates the data pipeline: scans the raw folder, validates, cleans, and merges data.

    Each ``.csv`` file found in ``RAW_DATA_DIR`` is loaded, checked for the
    required columns, and passed through ``clean_hr_data``. The cleaned frames
    are concatenated, written to ``PROCESSED_DATA_PATH``, and a one-line summary
    is appended to ``LOG_FILE_PATH``.

    A file that cannot be read, or that lacks a required column, is reported
    and skipped so that one bad input does not abort the whole batch.

    Returns:
        None. Progress and problems are reported with ``print``.
    """
    if not os.path.isdir(RAW_DATA_DIR):
        print(f"❌ Error: Directory '{RAW_DATA_DIR}' not found.")
        return

    # os.listdir() returns names in arbitrary order; sorting keeps the row order
    # of the merged export stable between runs. The extension test is
    # case-insensitive so files named '*.CSV' are picked up as well.
    csv_files = sorted(
        name for name in os.listdir(RAW_DATA_DIR) if name.lower().endswith('.csv')
    )

    if not csv_files:
        print("ℹ️ No new raw files found to process.")
        return

    required_columns = ['Age', 'Attrition', 'DistanceFromHome']
    processed_list = []

    for file in csv_files:
        full_path = os.path.join(RAW_DATA_DIR, file)

        try:
            raw_df = pd.read_csv(full_path)
        except (OSError, UnicodeDecodeError,
                pd.errors.EmptyDataError, pd.errors.ParserError) as exc:
            # An unreadable, empty, or malformed file should not stop the batch.
            print(f"⚠️ Warning: Could not read {file} ({exc}). Skipping...")
            continue

        # Integrity check: the cleaning step needs these columns to exist.
        if not all(col in raw_df.columns for col in required_columns):
            print(f"⚠️ Warning: Missing required columns in {file}. Skipping...")
            continue

        processed_list.append(clean_hr_data(raw_df))
        print(f"✅ Successfully processed: {file}")

    if not processed_list:
        print("❌ Pipeline failed: No valid data found.")
        return

    final_dataset = pd.concat(processed_list, ignore_index=True)

    # Create the output folders on first run; to_csv() and open() do not do so.
    for target in (PROCESSED_DATA_PATH, LOG_FILE_PATH):
        parent = os.path.dirname(target)
        if parent:
            os.makedirs(parent, exist_ok=True)

    final_dataset.to_csv(PROCESSED_DATA_PATH, index=False)

    # Logging
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_entry = f"Pipeline Execution: {timestamp} | Total Rows: {len(final_dataset)}"
    print(f"\n🚀 {log_entry}")

    # Append to the run history; explicit encoding keeps the file portable.
    with open(LOG_FILE_PATH, 'a', encoding='utf-8') as log_file:
        log_file.write(log_entry + '\n')
# Trigger the pipeline. The guard means the run happens only when this code is
# executed as the main program (the recorded cell output shows it does run in
# the notebook), not when the code is imported as a module.
if __name__ == "__main__":
    run_pipeline()

✅ Successfully processed: WA_Fn-UseC_-HR-Employee-Attrition.csv

🚀 Pipeline Execution: 2026-01-02 09:49:48 | Total Rows: 1470
