# Data Preprocessing Pipeline: Complete Step-by-Step Build

I'll guide you through building this **production-grade pipeline** in your `/Users/ankitkumar/Desktop/all_code_of_collage/learning/datapipline` folder.

---

## 🧱 STEP 0: Create Project Structure

Open Terminal and run:



In [None]:
cd /Users/ankitkumar/Desktop/all_code_of_collage/learning/datapipline
mkdir -p src/{ingestion,validation,cleaning,transformation,features,orchestration}
mkdir -p configs data/{raw,processed,artifacts} logs reports
touch main.py requirements.txt README.md



Your structure is now:


In [None]:
datapipline/
├── src/
│   ├── ingestion/
│   ├── validation/
│   ├── cleaning/
│   ├── transformation/
│   ├── features/
│   └── orchestration/
├── configs/
├── data/
│   ├── raw/
│   ├── processed/
│   └── artifacts/
├── logs/
├── reports/
├── main.py
├── README.md
└── requirements.txt



---

## 🧱 STEP 1: Add Real CSV Data

Create the file `data/raw/user_activity_data.csv`:



In [None]:
user_id,signup_date,last_login,age,gender,city,device,session_count,avg_session_duration,transactions,total_spent,is_active
U1001,2023-01-15,2026-02-01,21,male,Bangalore,Android,145,18.4,12,4599.50,true
U1002,2023-02-03,2026-01-28,34,female,Mumbai,iOS,89,12.7,7,2899.00,true
U1003,2022-11-21,2025-12-12,29,male,Delhi,Web,34,6.1,1,499.00,false
U1004,2023-05-18,2026-02-02,,female,Chennai,Android,203,21.9,18,7999.99,true
U1005,2024-01-09,2026-01-30,41,other,Kolkata,iOS,12,4.2,0,0.00,false
U1006,2023-08-27,2026-02-02,25,male,Bangalore,Android,301,33.5,26,12999.00,true


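Before wiring up the pipeline, it can help to confirm how pandas reads this sample. The sketch below parses three of the rows from an in-memory string (an assumption made so the snippet runs on its own; in the project you would read the file from disk) and reports the columns with gaps and the inferred dtypes.

```python
import io

import pandas as pd

# Three rows copied from the sample data, including U1004 with a blank age.
SAMPLE = """user_id,signup_date,last_login,age,gender,city,device,session_count,avg_session_duration,transactions,total_spent,is_active
U1001,2023-01-15,2026-02-01,21,male,Bangalore,Android,145,18.4,12,4599.50,true
U1004,2023-05-18,2026-02-02,,female,Chennai,Android,203,21.9,18,7999.99,true
U1005,2024-01-09,2026-01-30,41,other,Kolkata,iOS,12,4.2,0,0.00,false
"""

df = pd.read_csv(io.StringIO(SAMPLE))

# Count nulls per column and keep only the columns that actually have gaps.
missing_per_column = df.isnull().sum()
print(missing_per_column[missing_per_column > 0])
print(df.dtypes)
```

The blank `age` forces that column to `float64`, and the lowercase `true`/`false` values are parsed as booleans. Both details matter in the cleaning step.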

---

## 🧱 STEP 2: Define Schema (Configuration Contract)

Create `configs/schema.yaml`:



In [None]:
columns:
  user_id:
    type: string
    nullable: false

  signup_date:
    type: date
    nullable: false

  last_login:
    type: date
    nullable: false

  age:
    type: int
    nullable: true
    min: 0
    max: 120

  gender:
    type: category
    allowed: [male, female, other]

  city:
    type: string
    nullable: false

  device:
    type: category
    allowed: [Android, iOS, Web]

  session_count:
    type: int
    min: 0

  avg_session_duration:
    type: float
    min: 0

  transactions:
    type: int
    min: 0

  total_spent:
    type: float
    min: 0

  is_active:
    type: bool


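The schema declares a `type` for every column. One possible way to check a column against such a declared type is sketched below. The helper `matches_type` is hypothetical and not part of the pipeline code in this guide, and its rules (for example, treating whole-number floats as valid `int` values) are assumptions you may want to tighten.

```python
import pandas as pd
from pandas.api import types as pdt

def matches_type(series: pd.Series, type_name: str) -> bool:
    """Return True if every non-null value fits the schema type (hypothetical helper)."""
    values = series.dropna()
    if type_name == "bool":
        return bool(pdt.is_bool_dtype(values))
    if type_name in ("int", "float"):
        # Booleans count as numeric in pandas, so exclude them explicitly.
        if not pdt.is_numeric_dtype(values) or pdt.is_bool_dtype(values):
            return False
        # A nullable int column is read as float, so test for whole numbers instead.
        return type_name == "float" or bool((values % 1 == 0).all())
    if type_name == "date":
        if pdt.is_numeric_dtype(values):
            return False
        # Unparseable entries become NaT when errors="coerce" is used.
        return bool(pd.to_datetime(values, errors="coerce").notna().all())
    if type_name in ("string", "category"):
        return bool(values.map(lambda v: isinstance(v, str)).all())
    raise ValueError(f"Unknown schema type: {type_name}")

ages = pd.Series([21, None, 34])  # stored as float because of the gap
dates = pd.Series(["2023-01-15", "not-a-date"])
print(matches_type(ages, "int"))    # True
print(matches_type(dates, "date"))  # False
```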

---

## 🧱 STEP 3: Pipeline Configuration

Create `configs/pipeline.yaml`:



In [None]:
steps:
  ingestion: true
  validation: true
  cleaning: true
  transformation: false
  feature_engineering: false

cleaning:
  missing_values:
    age: median
  remove_duplicates: true

output:
  format: csv
  save_report: true


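The `steps` block acts as a set of on/off switches. As a rough sketch of reading those switches defensively, the hypothetical helper `step_enabled` below treats any step that is absent from the file as disabled. The config is inlined as a string only so that the snippet runs on its own.

```python
import yaml

CONFIG_TEXT = """
steps:
  ingestion: true
  validation: true
  cleaning: true
  transformation: false
  feature_engineering: false
"""

def step_enabled(config: dict, name: str) -> bool:
    """Hypothetical helper: a step runs only if it is explicitly set to true."""
    return bool(config.get("steps", {}).get(name, False))

config = yaml.safe_load(CONFIG_TEXT)
enabled = [name for name in config["steps"] if step_enabled(config, name)]
print(enabled)  # ['ingestion', 'validation', 'cleaning']
```

Looking flags up with `.get` avoids a `KeyError` when a step is missing from the YAML, at the cost of silently skipping a step whose name is misspelled.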

---

## 🧱 STEP 4: Install Dependencies

Add the following to `requirements.txt`:



In [None]:
pandas>=1.3.0
numpy>=1.21.0
pyyaml>=5.4.0
loguru>=0.6.0



Run in Terminal:


In [None]:
pip install -r requirements.txt



---

## 🧱 STEP 5: Data Ingestion Module

Create `src/ingestion/loader.py`:



In [None]:
import pandas as pd
from loguru import logger

def load_data(file_path: str) -> pd.DataFrame:
    """Load CSV file with validation."""
    logger.info(f"Loading data from: {file_path}")
    
    if not file_path.endswith(".csv"):
        logger.error("Only CSV files are supported")
        raise ValueError("Only CSV files are supported")

    df = pd.read_csv(file_path)

    if df.empty:
        logger.error("Input dataset is empty")
        raise ValueError("Input dataset is empty")

    logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns")
    return df


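One edge case worth knowing about: `pd.read_csv` raises `pandas.errors.EmptyDataError` for a zero-byte file, so the `df.empty` check only catches files that have a header row but no data rows. The sketch below shows one way to report both cases with the same message. `read_csv_or_fail` is a hypothetical name, not part of the loader above.

```python
import tempfile
from pathlib import Path

import pandas as pd

def read_csv_or_fail(file_path: str) -> pd.DataFrame:
    """Hypothetical variant of the loader's read step with uniform error reporting."""
    try:
        df = pd.read_csv(file_path)
    except pd.errors.EmptyDataError as exc:
        # Raised by pandas when the file has no header row and no data.
        raise ValueError("Input dataset is empty") from exc
    if df.empty:
        # Header row present but no data rows.
        raise ValueError("Input dataset is empty")
    return df

with tempfile.TemporaryDirectory() as tmp:
    empty_file = Path(tmp) / "empty.csv"
    empty_file.write_text("")
    try:
        read_csv_or_fail(str(empty_file))
    except ValueError as err:
        print(f"Rejected: {err}")
```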

---

## 🧱 STEP 6: Data Validation Engine (CORE)

Create `src/validation/validator.py`:



In [None]:
import pandas as pd
import yaml
from loguru import logger

def validate_schema(df: pd.DataFrame, schema_path: str) -> dict:
    """Validate DataFrame against schema rules."""
    logger.info(f"Validating against schema: {schema_path}")
    
    with open(schema_path, "r") as f:
        schema = yaml.safe_load(f)

    report = {"errors": [], "warnings": [], "passed": True}

    for column, rules in schema["columns"].items():
        if column not in df.columns:
            report["errors"].append(f"❌ Missing column: {column}")
            report["passed"] = False
            continue

        # Check nullable constraint
        if not rules.get("nullable", True):
            null_count = df[column].isnull().sum()
            if null_count > 0:
                report["errors"].append(f"❌ {null_count} null values in non-nullable column: {column}")
                report["passed"] = False

        # Check allowed values
        if "allowed" in rules:
            invalid = df[~df[column].isin(rules["allowed"]) & df[column].notna()]
            if not invalid.empty:
                report["warnings"].append(f"⚠️ {len(invalid)} invalid values in column: {column}")

        # Check min constraint
        if "min" in rules:
            below_min = (df[column].dropna() < rules["min"]).sum()
            if below_min > 0:
                report["warnings"].append(f"⚠️ {below_min} values below min ({rules['min']}) in: {column}")

        # Check max constraint
        if "max" in rules:
            above_max = (df[column].dropna() > rules["max"]).sum()
            if above_max > 0:
                report["warnings"].append(f"⚠️ {above_max} values above max ({rules['max']}) in: {column}")

    logger.info(f"Validation complete: {len(report['errors'])} errors, {len(report['warnings'])} warnings")
    return report


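To see what the allowed-value and range checks compute, the following sketch applies the same expressions to a small in-memory DataFrame. The data is made up for illustration and the limits mirror the `gender` and `age` rules from the schema.

```python
import pandas as pd

df = pd.DataFrame({
    "gender": ["male", "unknown", None, "female"],
    "age": [21, 130, None, -3],
})

allowed = ["male", "female", "other"]
# Values outside the allowed list, ignoring nulls (nullability is checked separately).
invalid_gender = df[~df["gender"].isin(allowed) & df["gender"].notna()]

# Range checks drop nulls first so that NaN never counts as a violation.
below_min = int((df["age"].dropna() < 0).sum())
above_max = int((df["age"].dropna() > 120).sum())

print(len(invalid_gender), below_min, above_max)  # 1 1 1
```

Note that these findings go into `warnings`, not `errors`, so out-of-range or unexpected values do not stop the pipeline by themselves.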

---

## 🧱 STEP 7: Data Cleaning Engine

Create `src/cleaning/cleaner.py`:



In [None]:
import pandas as pd
from loguru import logger

def clean_data(df: pd.DataFrame, cleaning_config: dict) -> pd.DataFrame:
    """Apply cleaning transformations based on config."""
    logger.info("Starting data cleaning...")
    df = df.copy()

    initial_rows = len(df)

    # Handle missing values with the strategy configured for each column
    for column, strategy in cleaning_config.get("missing_values", {}).items():
        if column not in df.columns:
            logger.warning(f"Skipping missing-value handling for unknown column: {column}")
            continue
        if df[column].isnull().any():
            if strategy == "median":
                fill_value = df[column].median()
            elif strategy == "mean":
                fill_value = df[column].mean()
            else:
                logger.warning(f"Unknown strategy '{strategy}' for column: {column}")
                continue
            df[column] = df[column].fillna(fill_value)
            logger.info(f"Filled {column} with {strategy}: {fill_value}")

    # Remove duplicates
    if cleaning_config.get("remove_duplicates", False):
        before = len(df)
        df = df.drop_duplicates()
        logger.info(f"Removed {before - len(df)} duplicate rows")

    # Convert data types (read_csv already parses true/false as booleans)
    df['is_active'] = df['is_active'].astype(bool)
    df['signup_date'] = pd.to_datetime(df['signup_date'])
    df['last_login'] = pd.to_datetime(df['last_login'])

    logger.info(f"Cleaning complete: {initial_rows} → {len(df)} rows")
    return df


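Two dtype details in the cleaning step are easy to miss, and the sketch below illustrates them on made-up Series. First, a column with a gap is stored as float, so `age` stays float after imputation unless it is cast back. Second, `astype(bool)` is only safe because `read_csv` has already parsed `is_active`; applied to raw strings it would mark every non-empty value as `True`.

```python
import pandas as pd

# The ages from the sample data, with U1004's missing value as None.
ages = pd.Series([21, 34, 29, None, 41, 25], name="age")
filled = ages.fillna(ages.median())
print(filled.dtype)                 # float64: the gap forced a float column
print(filled.astype(int).tolist())  # [21, 34, 29, 29, 41, 25]

# Casting strings with astype(bool) marks every non-empty string as True.
flags = pd.Series(["true", "false"], dtype=object)
print(flags.astype(bool).tolist())  # [True, True]

# An explicit mapping keeps the intended meaning for string input.
print(flags.map({"true": True, "false": False}).tolist())  # [True, False]
```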

---

## 🧱 STEP 8: Orchestration (Pipeline Brain)

Create `src/orchestration/pipeline.py`:



In [None]:
import yaml
from loguru import logger
from src.ingestion.loader import load_data
from src.validation.validator import validate_schema
from src.cleaning.cleaner import clean_data

def run_pipeline(input_path, schema_path, pipeline_config_path, output_path):
    """Execute complete preprocessing pipeline."""
    logger.info("=" * 60)
    logger.info("STARTING DATA PREPROCESSING PIPELINE")
    logger.info("=" * 60)

    # Load config
    with open(pipeline_config_path) as f:
        config = yaml.safe_load(f)

    # Default report so the return value is defined even if validation is disabled
    report = {"errors": [], "warnings": [], "passed": True}

    # STEP 1: Ingestion
    if config["steps"]["ingestion"]:
        df = load_data(input_path)
    else:
        logger.warning("Ingestion skipped; nothing to process")
        return report

    # STEP 2: Validation
    if config["steps"]["validation"]:
        report = validate_schema(df, schema_path)
        if report["errors"]:
            logger.error(f"Validation failed with errors: {report['errors']}")
            raise ValueError("Pipeline halted due to validation errors")
        logger.info(f"Validation passed with {len(report['warnings'])} warnings")
    
    # STEP 3: Cleaning
    if config["steps"]["cleaning"]:
        df = clean_data(df, config.get("cleaning", {}))

    # STEP 4: Save
    df.to_csv(output_path, index=False)
    logger.info(f"Cleaned data saved to: {output_path}")

    logger.info("=" * 60)
    logger.info("PIPELINE COMPLETED SUCCESSFULLY")
    logger.info("=" * 60)
    return report


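The save step assumes that the output folder already exists, which holds here because STEP 0 creates `data/processed`. If the output path might point somewhere else, a small wrapper can create the parent folder first. The helper `save_csv` below is a hypothetical sketch, not part of the orchestrator in this guide.

```python
import tempfile
from pathlib import Path

import pandas as pd

def save_csv(df: pd.DataFrame, output_path: str) -> Path:
    """Hypothetical helper: create the parent folder, then write the CSV."""
    path = Path(output_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(path, index=False)
    return path

with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "data" / "processed" / "clean_data.csv"
    written = save_csv(pd.DataFrame({"user_id": ["U1001"]}), str(target))
    print(written.exists())  # True
```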

---

## 🧱 STEP 9: Main Entry Point

Create `main.py` in the project root:



In [None]:
from src.orchestration.pipeline import run_pipeline
from loguru import logger
import sys

if __name__ == "__main__":
    logger.remove()
    logger.add(sys.stdout, format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | {message}")
    logger.add("logs/pipeline.log", format="{time} | {level: <8} | {message}")

    try:
        report = run_pipeline(
            input_path="data/raw/user_activity_data.csv",
            schema_path="configs/schema.yaml",
            pipeline_config_path="configs/pipeline.yaml",
            output_path="data/processed/clean_data.csv"
        )

        print("\n✅ Pipeline Status:")
        print(f"   Warnings: {len(report['warnings'])}")
        print(f"   Errors: {len(report['errors'])}")

    except Exception as e:
        logger.error(f"Pipeline failed: {str(e)}")
        sys.exit(1)



---

## 🧱 STEP 10: Create `__init__.py` files

Create an empty `__init__.py` in `src/` and in each of its subfolders so that the modules are importable:


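One way to create these files is a short script. The sketch below assumes the folder layout from STEP 0. The function name `create_init_files` and the `PACKAGES` list are illustrative choices, not something the pipeline itself requires.

```python
from pathlib import Path

# Package folders from the project structure created in STEP 0.
PACKAGES = [
    "src",
    "src/ingestion",
    "src/validation",
    "src/cleaning",
    "src/transformation",
    "src/features",
    "src/orchestration",
]

def create_init_files(project_root: str) -> list:
    """Create an empty __init__.py in each package folder and return the paths."""
    created = []
    for package in PACKAGES:
        init_file = Path(project_root) / package / "__init__.py"
        init_file.parent.mkdir(parents=True, exist_ok=True)
        init_file.touch(exist_ok=True)  # leaves an existing file untouched
        created.append(init_file)
    return created
```

Calling `create_init_files(".")` from the project root creates the seven files. Running it again is harmless because existing files are left as they are.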











---

## 🚀 STEP 11: Run the Pipeline

In Terminal:



In [None]:
cd /Users/ankitkumar/Desktop/all_code_of_collage/learning/datapipline
python main.py



**Expected output:**
- ✅ Data loaded
- ✅ Schema validated
- ✅ Missing values filled (age → median)
- ✅ Clean CSV saved to `data/processed/clean_data.csv`
- 📊 Log file created in `logs/pipeline.log`

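To confirm the result beyond the log messages, you could inspect the output file directly. The sketch below uses a hypothetical helper, `summarize_output`, and runs it on a stand-in file with made-up rows so that it works on its own. In the project you would pass `data/processed/clean_data.csv` instead.

```python
import tempfile
from pathlib import Path

import pandas as pd

def summarize_output(csv_path: str) -> dict:
    """Hypothetical check: row count plus remaining nulls in the age column."""
    df = pd.read_csv(csv_path)
    return {"rows": len(df), "missing_age": int(df["age"].isnull().sum())}

# Demo on a stand-in file; in the project, pass data/processed/clean_data.csv.
with tempfile.TemporaryDirectory() as tmp:
    demo = Path(tmp) / "clean_data.csv"
    demo.write_text("user_id,age\nU1001,21.0\nU1004,29.0\n")
    print(summarize_output(str(demo)))  # {'rows': 2, 'missing_age': 0}
```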
---

## ✅ What You've Built

| Component | Purpose |
|-----------|---------|
| **Loader** | Safe CSV ingestion with validation |
| **Validator** | Schema enforcement (nullable, range, category checks) |
| **Cleaner** | Missing value imputation, deduplication, type conversion |
| **Orchestrator** | Coordinated pipeline execution |
| **Config-driven** | Steps and cleaning strategies are set in YAML rather than in code |

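This gives you a working foundation for a **production-grade** pipeline. The `transformation` and `feature_engineering` steps are still switched off in the pipeline config and have no code behind them yet, so they are the natural next additions. 🎯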