# Production-Ready Preprocessing & Feature Engineering Module

This module implements a robust, scikit-learn compatible preprocessing pipeline for agricultural datasets.
It is designed to be modular, testable, and production-ready.

**Key Components:**
1.  **Global Configuration:** Centralized definition of feature constraints and binning logic.
2.  **Custom Transformers:** Scikit-learn compatible classes for cleaning and feature engineering.
3.  **Pipeline Construction:** Automated assembly of preprocessing steps.
4.  **Verification:** Built-in unit tests to validate logic.



In [ ]:
import logging
import os
import json
import numpy as np
import pandas as pd
import joblib
from typing import Dict, List, Optional, Union, Tuple
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, PowerTransformer
from sklearn.utils.validation import check_is_fitted

# Configure Logging
# Pick the first log directory that already exists: the local "logs" folder,
# then the parent's (covers running from the notebooks/ directory). When
# neither exists, create a local "logs" folder.
log_dir = next(
    (candidate for candidate in ("logs", "../logs") if os.path.exists(candidate)),
    None,
)
if log_dir is None:
    log_dir = "logs"
    os.makedirs(log_dir, exist_ok=True)

# Every record goes both to the log file and to the console stream.
logging.basicConfig(
    handlers=[
        logging.FileHandler(os.path.join(log_dir, 'preprocessing.log')),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)



## 1. Global Configuration Block

Constraints are now loaded from `config/constraints.json` to allow for flexibility across soil types.
- **FEATURE_CONSTRAINTS**: Loaded dynamically.
- **BIN_EDGES**: Categories for binning continuous variables.



In [ ]:
def load_constraints(config_path: str = '../config/constraints.json', profile: str = 'default') -> Dict[str, Tuple[float, float]]:
    """Load per-feature (min, max) constraints for one profile from a JSON config.

    Args:
        config_path: Path to the JSON file. A relative path is resolved against
            this module's directory, or against the current working directory
            when ``__file__`` is not defined (e.g. inside a notebook kernel).
        profile: Top-level key of the config whose constraints are returned.

    Returns:
        Mapping of feature name to a ``(min, max)`` tuple. Built-in fallback
        defaults are returned when the file does not exist. An empty mapping
        is returned (with a warning) when the file lacks ``profile``.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    if not os.path.isabs(config_path):
        # __file__ does not exist in interactive/notebook sessions; fall back
        # to the working directory instead of raising NameError.
        if "__file__" in globals():
            base_dir = os.path.dirname(os.path.abspath(__file__))
        else:
            base_dir = os.getcwd()
        config_path = os.path.join(base_dir, config_path)

    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except FileNotFoundError:
        logger.warning(f"Config file {config_path} not found. Using fallback defaults.")
        # Fallback defaults
        return {
            'N': (0, 140), 'P': (5, 145), 'K': (5, 205),
            'temperature': (8, 45), 'humidity': (10, 100),
            'ph': (3.5, 10.0), 'rainfall': (20, 300)
        }

    if profile not in config:
        # An unknown profile disables every range check downstream; make that
        # visible instead of returning an empty mapping silently.
        logger.warning(f"Profile '{profile}' not found in {config_path}. No constraints will be applied.")
        return {}

    # JSON has no tuple type: convert [min, max] lists so the result matches
    # the annotated return type (and the shape of the fallback defaults).
    return {
        feature: tuple(bounds) if isinstance(bounds, list) else bounds
        for feature, bounds in config[profile].items()
    }

# Module-wide default constraints, resolved once at import time.
FEATURE_CONSTRAINTS = load_constraints()

# Bin edges and category labels used when deriving categorical features.
# Each entry supplies the `bins` and `labels` arguments for a pd.cut call.
BIN_EDGES = dict(
    # Using humidity as proxy if soil_moisture missing, or for specific columns
    moisture=dict(
        bins=[0, 30, 60, 100],
        labels=['Low', 'Medium', 'High'],
    ),
    # Open-ended outer bins so that any temperature receives a label.
    temperature=dict(
        bins=[-np.inf, 20, 30, np.inf],
        labels=['Cool', 'Moderate', 'Warm'],
    ),
)



## 2. Step 1: Custom Transformers (The Logic)

### DataCleaner
Handles data integrity issues:
- Removes duplicate rows (Critical for preventing leakage).
- Clips values to physically possible ranges (Sensor error handling).



In [ ]:
class DataCleaner(BaseEstimator, TransformerMixin):
    """Stateless cleaner: optional duplicate-row removal plus range clipping.

    Args:
        constraints: Mapping of column name to ``(min, max)``. Columns that
            are present in the input are clipped to that closed range;
            constrained columns that are absent are skipped. ``None`` is
            treated as "no constraints".
        drop_duplicates: When True (the default, matching the original
            behaviour) exact duplicate rows are removed in ``transform``.
            Set to False when the number and order of rows must be kept,
            e.g. at inference time or when ``y`` has to stay aligned with X.
    """

    def __init__(
        self,
        constraints: Dict[str, Tuple[float, float]] = FEATURE_CONSTRAINTS,
        drop_duplicates: bool = True,
    ):
        # Parameters are stored unmodified under their own names so that
        # BaseEstimator.get_params / clone keep working.
        self.constraints = constraints
        self.drop_duplicates = drop_duplicates

    def fit(self, X, y=None):
        """No-op: the cleaner learns nothing from the data."""
        return self

    def transform(self, X, y=None):
        """
        Clean data by handling duplicates and clipping values.

        Note: Dropping duplicates in a transformer within a pipeline that requires
        aligned X and y (like cross_validate) can be problematic; pass
        ``drop_duplicates=False`` in that situation. Surviving rows keep their
        original index labels.

        Args:
            X: Input DataFrame (not modified; a copy is cleaned).
            y: Ignored.

        Returns:
            The cleaned DataFrame.
        """
        X = X.copy()

        # 1. Handle Duplicates
        if self.drop_duplicates:
            rows_before = X.shape[0]
            X = X.drop_duplicates()
            if X.shape[0] < rows_before:
                logger.info(f"DataCleaner: Dropped {rows_before - X.shape[0]} duplicate rows.")

        # 2. Range Validation (Clipping)
        for col, (min_val, max_val) in (self.constraints or {}).items():
            if col not in X.columns:
                continue
            # Log if clipping happens
            outliers = ((X[col] < min_val) | (X[col] > max_val)).sum()
            if outliers > 0:
                logger.debug(f"DataCleaner: Clipping {outliers} values in {col} to [{min_val}, {max_val}]")
            X[col] = X[col].clip(lower=min_val, upper=max_val)

        logger.info(f"DataCleaner: Output shape {X.shape}")
        return X

    def set_output(self, *, transform=None):
        """
        Enable pandas output configuration.
        Since this transformer always returns a DataFrame (if input is DF),
        we just return self to allow the pipeline validation to pass.
        """
        return self



### FeatureEngineer
Implements domain-specific feature extraction:
- **NPK Ratios**: Critical for understanding nutrient balance.
- **Climate Interaction**: Captures the combined effect of heat and moisture on plant stress.
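- **Nutrient Balance Score**: A heuristic soil-fertility score based on Liebig's Law of the Minimum — the smallest of the normalized N, P and K values (the limiting nutrient).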



In [ ]:
class FeatureEngineer(BaseEstimator, TransformerMixin):
    """Stateless transformer that appends domain-specific features.

    Requires the columns ``N``, ``P``, ``K``, ``temperature`` and ``humidity``.
    When all are present the following columns are added:
    ``ratio_N_P``, ``ratio_N_K``, ``ratio_P_K``, ``total_nutrients``,
    ``climate_stress_index``, ``moisture_cat``, ``temp_cat``,
    ``nutrient_limiting_factor`` and ``nutrient_balance_score``.
    """

    def __init__(self):
        pass

    def fit(self, X, y=None):
        """No-op: nothing is learned from the data."""
        return self

    def transform(self, X, y=None):
        """Return a copy of ``X`` with the engineered columns appended.

        If any required column is missing, a warning is logged and the copy
        is returned without any engineered column.
        """
        X = X.copy()
        n_input_cols = len(X.columns)

        # Validate columns exist
        required_cols = ['N', 'P', 'K', 'temperature', 'humidity']
        missing = [c for c in required_cols if c not in X.columns]
        if missing:
            logger.warning(f"FeatureEngineer: Missing columns {missing}. Some features won't be generated.")
            return X  # Return as is or handle gracefully

        # 1. NPK Ratios (with zero-division protection)
        # Agricultural Logic: The ratio of nutrients defines vegetative vs reproductive growth.
        epsilon = 1e-6
        X['ratio_N_P'] = X['N'] / (X['P'] + epsilon)
        X['ratio_N_K'] = X['N'] / (X['K'] + epsilon)
        X['ratio_P_K'] = X['P'] / (X['K'] + epsilon)

        # 2. Total NPK
        # Agricultural Logic: Total ionic concentration indicator.
        X['total_nutrients'] = X['N'] + X['P'] + X['K']

        # 3. Climate Interaction
        # Agricultural Logic: High Temp + High Humidity = High Disease Pressure / Heat Stress.
        X['climate_stress_index'] = (X['temperature'] * X['humidity']) / 100.0

        # 4. Binning Logic
        # Categorize continuous variables for decision tree stability / interpretability.
        # include_lowest=True puts a value equal to the first edge (humidity == 0)
        # into the first bin; pd.cut would otherwise return NaN for it.
        # Results are kept as object dtype for OneHotEncoder.
        X['moisture_cat'] = pd.cut(
            X['humidity'],
            bins=BIN_EDGES['moisture']['bins'],
            labels=BIN_EDGES['moisture']['labels'],
            include_lowest=True,
        ).astype(object)
        X['temp_cat'] = pd.cut(
            X['temperature'],
            bins=BIN_EDGES['temperature']['bins'],
            labels=BIN_EDGES['temperature']['labels'],
            include_lowest=True,
        ).astype(object)

        # 5. Nutrient Balance Score (Liebig's Law of Minimum)
        # Instead of a weighted sum (which implies compensation), we find the limiting factor.
        # We normalize roughly by max expected values: N/140, P/145, K/205
        X['nutrient_limiting_factor'] = np.minimum(
            X['N'] / 140.0,
            np.minimum(X['P'] / 145.0, X['K'] / 205.0)
        )

        # 'nutrient_balance_score' is kept as an alias of the limiting factor so
        # that consumers selecting the column by that name keep working.
        X['nutrient_balance_score'] = X['nutrient_limiting_factor']

        # Count against the input width: subtracting len(required_cols) would
        # also count untouched input columns such as 'ph' as "new".
        logger.info(f"FeatureEngineer: Generated {len(X.columns) - n_input_cols} new features.")
        return X

    def set_output(self, *, transform=None):
        """Accept sklearn's set_output call; the output container is not changed."""
        return self



## 3. Step 2: The Scikit-Learn Pipeline (The Engine)

We coordinate the preprocessing steps:
1. **Cleaner**: Initial sanitization.
2. **Engineer**: Feature creation.
3. **ColumnTransformer**: Specific handling for Numeric vs Categorical data.
   - **Numeric**: KNN-impute missing values -> Yeo-Johnson power transform (with standardization).
   - **Categorical**: Impute missing -> One-Hot Encode.



In [ ]:
def create_training_pipeline(numeric_features: List[str], categorical_features: List[str]) -> Pipeline:
    """
    Assemble the end-to-end preprocessing pipeline: cleaner -> engineer -> column preprocessor.

    Args:
        numeric_features: Raw numeric column names to impute and power-transform.
        categorical_features: Raw categorical column names to impute and one-hot encode.

    Returns:
        An unfitted Pipeline configured to emit pandas DataFrames.
    """
    # ColumnTransformer needs its column lists up front, so the columns that
    # FeatureEngineer appends are spelled out here and added to the caller's lists.
    derived_numeric = [
        'ratio_N_P',
        'ratio_N_K',
        'ratio_P_K',
        'total_nutrients',
        'climate_stress_index',
        'nutrient_balance_score',
    ]
    derived_categorical = ['moisture_cat', 'temp_cat']

    # Numeric branch: KNN imputation followed by a standardized Yeo-Johnson
    # power transform.
    numeric_steps = [
        ('imputer', KNNImputer(n_neighbors=5)),
        ('transformer', PowerTransformer(method='yeo-johnson', standardize=True)),
    ]

    # Categorical branch: fill gaps with the mode, then dense one-hot encoding
    # that ignores categories not seen during fit.
    categorical_steps = [
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
    ]

    column_stage = ColumnTransformer(
        transformers=[
            ('num', Pipeline(steps=numeric_steps), numeric_features + derived_numeric),
            ('cat', Pipeline(steps=categorical_steps), categorical_features + derived_categorical),
        ],
        verbose_feature_names_out=False,
    )

    # The cleaner and the engineer see the whole DataFrame; only the last
    # stage selects specific columns to scale or encode.
    assembled = Pipeline(steps=[
        ('cleaner', DataCleaner()),
        ('engineer', FeatureEngineer()),
        ('preprocessor', column_stage),
    ])

    # Output as Pandas DataFrame for readability and easier debugging
    assembled.set_output(transform="pandas")
    return assembled



## 4. Step 3: Logging & Persistence

Utilities to save the artifacts and track execution.



In [ ]:
class LoggingTransformer(BaseEstimator, TransformerMixin):
    """Pass-through pipeline step that logs the shape of the data it receives.

    Args:
        name: Tag placed in square brackets at the start of each log line.
    """

    def __init__(self, name=""):
        self.name = name

    def fit(self, X, y=None):
        """No-op; present only to satisfy the transformer interface."""
        return self

    def transform(self, X, y=None):
        """Log ``X.shape`` at INFO level and hand ``X`` back unchanged."""
        logger.info(f"[{self.name}] Shape: {X.shape}")
        return X

def save_pipeline(pipeline: Pipeline, filepath: str) -> bool:
    """Persist the pipeline to disk with joblib (best effort).

    The parent directory of ``filepath`` is created when it does not exist.
    Failures are logged with their traceback and are not re-raised.

    Args:
        pipeline: The pipeline object to serialize.
        filepath: Destination file path.

    Returns:
        True when the file was written, False when saving failed.
    """
    try:
        parent_dir = os.path.dirname(filepath)
        # dirname is '' for a bare file name, and os.makedirs('') raises.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        joblib.dump(pipeline, filepath)
    except Exception as e:
        # Top-level boundary for a best-effort save: keep the traceback in the log.
        logger.exception(f"Failed to save pipeline: {e}")
        return False
    logger.info(f"Pipeline successfully saved to {filepath}")
    return True



## 5. Step 4: Unit Tests (The Verification)

This section uses `pytest` style assertions to verify logic correctness.
It can be run interactively or via a test runner.



In [ ]:
def test_pipeline_logic():
    """Smoke-test DataCleaner, FeatureEngineer and the assembled pipeline on a tiny frame."""
    print("Running Unit Tests...")

    # Four mock samples. The first and the last are identical (duplicate),
    # and the third carries an N reading of 400.
    # NOTE(review): the assertions below assume the active constraints cap N
    # at 300 or lower — confirm against the loaded configuration.
    data = pd.DataFrame({
        'N': [60, 50, 400, 60],
        'P': [20, 40, 40, 20],
        'K': [55, 60, 60, 55],
        'temperature': [25, 35, 10, 25],
        'humidity': [50, 80, 45, 50],
        'ph': [6.5, 7.0, 6.0, 6.5],
        'rainfall': [500, 600, 450, 500],
    })

    # --- 1. DataCleaner: duplicate removal and clipping ---------------------
    cleaned = DataCleaner().transform(data)

    # Rows 0 and 3 are identical, so exactly one of them must be gone.
    assert len(cleaned) == 3, f"Expected 3 rows after duplicate drop, got {len(cleaned)}"

    # The N=400 reading must have been clipped down to at most 300.
    assert cleaned['N'].max() <= 300, "DataCleaner constraint failed"

    # --- 2. FeatureEngineer: ratios and binning -----------------------------
    engineered = FeatureEngineer().transform(cleaned)
    first_row = engineered.iloc[0]

    # Row 0 has N=60 and P=20; the ratio uses the same epsilon as the transformer.
    expected_ratio = 60 / (20 + 1e-6)
    assert np.isclose(first_row['ratio_N_P'], expected_ratio), f"N/P Ratio calculation incorrect. Expected {expected_ratio}, got {engineered.iloc[0]['ratio_N_P']}"

    # Row 0 has temperature 25, which falls in the 20-30 'Moderate' bin.
    assert first_row['temp_cat'] == 'Moderate', f"Expected 'Moderate' temp, got {engineered.iloc[0]['temp_cat']}"

    # --- 3. Full pipeline ----------------------------------------------------
    num_vars = ['N', 'P', 'K', 'temperature', 'humidity', 'ph', 'rainfall']
    cat_vars = []  # No raw categoricals; only the engineered ones are encoded.

    transformed_df = create_training_pipeline(num_vars, cat_vars).fit_transform(data)

    assert isinstance(transformed_df, pd.DataFrame), "Pipeline output is not a DataFrame"

    # One transformed numeric column and one one-hot column must be present,
    # with or without a 'num__' / 'cat__' prefix.
    output_columns = set(transformed_df.columns)
    for col in ('ratio_N_P', 'temp_cat_Moderate'):
        accepted_names = {col, f"num__{col}", f"cat__{col}"}
        assert not accepted_names.isdisjoint(output_columns), f"Missing column {col} in output"

    print("[PASS] All Tests Passed!")


def process_dataset(input_path: str, output_path: str, model_save_path: Optional[str] = None):
    """
    Load data, process it using the pipeline, and save the result.

    Args:
        input_path: CSV file with the raw data. If it does not exist, an
            error is logged and the function returns without doing anything.
        output_path: Destination CSV for the processed features (plus the
            'label' column when the input has a target). Its parent
            directory is created if needed.
        model_save_path: Optional destination for the fitted pipeline.
    """
    logger.info(f"Loading data from {input_path}...")
    try:
        df = pd.read_csv(input_path)
    except FileNotFoundError:
        logger.error(f"Input file not found: {input_path}")
        return

    # Define Feature Groups based on Dataset
    # Raw data columns: Temparature, Humidity, Moisture, Soil Type, Crop Type, Nitrogen, Potassium, Phosphorous, Fertilizer Name
    # We need to map these to the expected N, P, K, temperature, humidity
    rename_map = {
        'Nitrogen': 'N',
        'Phosphorous': 'P',
        'Potassium': 'K',
        'Temparature': 'temperature', # Note the spelling error in source 'Temparature'
        'Humidity': 'humidity',
        'Moisture': 'moisture',
        'Soil Type': 'soil_type',
        'Crop Type': 'crop_type',
        'Fertilizer Name': 'label' # Assuming this is the target
    }

    df = df.rename(columns=rename_map)

    # Separate Target if present
    target_col = 'label'
    if target_col in df.columns:
        y = df[target_col]
        X = df.drop(columns=[target_col])
    else:
        y = None
        X = df

    numeric_features = ['N', 'P', 'K', 'temperature', 'humidity']
    # If we want to use moisture, we should add it.
    if 'moisture' in X.columns:
        numeric_features.append('moisture')

    # Optional raw categorical columns, used only when the dataset has them.
    categorical_features = [
        col for col in ('soil_type', 'crop_type') if col in X.columns
    ]

    # Create Pipeline
    pipeline = create_training_pipeline(numeric_features, categorical_features)

    # Fit & Transform
    logger.info("Running preprocessing pipeline...")
    X_processed = pipeline.fit_transform(X)

    # Re-attach the target so the processed file holds features + label.
    # The target is left un-encoded; label encoding belongs to model training.
    if y is not None:
        # Column assignment aligns y to X_processed on index labels, so target
        # values whose rows were removed by the pipeline are discarded here.
        X_processed[target_col] = y

    # Save Data
    output_dir = os.path.dirname(output_path)
    # dirname is '' for a bare file name, and os.makedirs('') raises.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    X_processed.to_csv(output_path, index=False)
    logger.info(f"Processed data saved to {output_path}")

    # Save Pipeline
    if model_save_path:
        # fit_transform above already fitted the pipeline; no refit is needed.
        save_pipeline(pipeline, model_save_path)

# Run tests if executed as script
if __name__ == "__main__":
    # 1. Run Tests
    test_pipeline_logic()

    # 2. Process Actual Data
    # Paths are resolved relative to this file. In a notebook kernel __name__
    # is "__main__" but __file__ is undefined, so fall back to the working
    # directory there instead of raising NameError.
    if "__file__" in globals():
        base_dir = os.path.dirname(os.path.abspath(__file__))
    else:
        base_dir = os.getcwd()

    raw_data_path = os.path.join(base_dir, '../data/raw/crop_data.csv')
    processed_data_path = os.path.join(base_dir, '../data/processed/processed_crop_data.csv')
    pipeline_save_path = os.path.join(base_dir, '../models/preprocessing_pipeline.joblib')

    if os.path.exists(raw_data_path):
        process_dataset(raw_data_path, processed_data_path, pipeline_save_path)
    else:
        logger.warning(f"Raw data not found at {raw_data_path}. Skipping data processing.")
