In [4]:
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor, IsolationForest
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.tree import DecisionTreeRegressor
from sklearn.base import clone
import random

In [5]:
# Load data
train_set = pd.read_csv("train_cleandata.csv")
test_set = pd.read_csv("test_cleandata.csv")

# Convert categorical variables to categorical dtype
categorical_cols = ["quarter", "day", "team", "department"]
train_set[categorical_cols] = train_set[categorical_cols].astype("category")
test_set[categorical_cols] = test_set[categorical_cols].astype("category")

# Define predictors
department_predictors = [
    "targeted_productivity", "smv", "over_time", "incentive",
    "idle_time", "idle_men", "no_of_style_change", "no_of_workers", "team"
]

overall_predictors = department_predictors + ["department"]

In [10]:
# Configuration
random.seed(42)
np.random.seed(42)

DEPARTMENTS = {
    "Sewing": department_predictors,
    "Finishing": department_predictors,
    "Overall": overall_predictors
}

OUTLIER_METHODS = {
    "isolation_forest": IsolationForest(
        contamination=0.1,
        n_estimators=200,
        max_features=0.8,
        random_state=42
    ),
    "none": None
}

def detect_outliers(data, method):
    """Detect outliers using specified method"""
    if method == "none":
        return data
    
    detector = OUTLIER_METHODS[method]
    numeric_cols = data.select_dtypes(include=np.number).columns
    detector.fit(data[numeric_cols])
    inliers = detector.predict(data[numeric_cols]) == 1
    return data[inliers]

def prepare_data(train, test, predictors, outlier_method):
    """Preprocess data with specified outlier handling"""
    # Detect and remove outliers
    train_clean = detect_outliers(train, outlier_method)
    test_clean = detect_outliers(test, outlier_method)
    
    # Separate features and target
    X_train = train_clean[predictors]
    X_test = test_clean[predictors]
    y_train = train_clean["actual_productivity"]
    y_test = test_clean["actual_productivity"]
    
    return X_train, X_test, y_train, y_test

# Enhanced Parameter Grid with Pruning Focus
RANDOM_FOREST_PARAMS = {
    'n_estimators': [500],
    'max_depth': [8, 10],
    'min_samples_split': [15, 20],
    'min_samples_leaf': [10, 15],
    'max_features': [0.5, 0.6],
    'max_leaf_nodes': [50, 100],
    'ccp_alpha': np.linspace(0, 0.1, 20),
    'max_samples': [0.7, 0.8]
}

def prune_forest(forest, X_train, y_train):
    """Advanced post-hoc pruning with cross-validated alpha selection"""
    pruned_forest = clone(forest)
    best_alphas = []
    
    # Prune each tree individually
    for idx, estimator in enumerate(pruned_forest.estimators_):
        # Get pruning path
        path = estimator.cost_complexity_pruning_path(X_train, y_train)
        ccp_alphas = path.ccp_alphas
        
        # Find optimal alpha using validation set
        best_alpha = ccp_alphas[np.argmin(path.impurities)]
        best_alphas.append(best_alpha)
        
        # Clone and prune tree
        pruned_tree = DecisionTreeRegressor(random_state=42)
        pruned_tree.set_params(**estimator.get_params())
        pruned_tree.set_params(ccp_alpha=best_alpha)
        pruned_tree.fit(X_train, y_train)
        
        pruned_forest.estimators_[idx] = pruned_tree
    
    return pruned_forest

def train_rf_model(X_train, y_train):
    """Advanced training with two-stage tuning and pruning"""
    # Stage 1: Initial parameter search
    rf = RandomForestRegressor(
        random_state=42,
        oob_score=True,
        bootstrap=True
    )
    
    search = RandomizedSearchCV(
        rf, RANDOM_FOREST_PARAMS,
        n_iter=50,
        cv=5,
        scoring='neg_mean_squared_error',
        n_jobs=-1,
        error_score='raise'
    )
    search.fit(X_train, y_train)
    
    # Stage 2: Post-hoc pruning
    best_model = search.best_estimator_
    pruned_model = prune_forest(best_model, X_train, y_train)
    
    return pruned_model, search.best_params_

def run_analysis(department, outlier_method):
    """Enhanced analysis pipeline with double regularization"""
    # Data preparation
    if department == "Overall":
        train = train_set
        test = test_set
    else:
        dept_lower = department.lower()
        train = train_set[train_set["department"] == dept_lower]
        test = test_set[test_set["department"] == dept_lower]
    
    X_train, X_test, y_train, y_test = prepare_data(
        train, test, DEPARTMENTS[department], outlier_method
    )
    
    # Preprocessing pipeline
    categorical_cols = X_train.select_dtypes(include=['category']).columns
    preprocessor = ColumnTransformer([
        ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_cols)
    ], remainder='passthrough')
    
    # Transform data
    X_train_processed = preprocessor.fit_transform(X_train)
    X_test_processed = preprocessor.transform(X_test)
    
    # Train model with pruning
    final_model, best_params = train_rf_model(X_train_processed, y_train)
    
    # Evaluate
    metrics = {
        "train_mse": mean_squared_error(y_train, final_model.predict(X_train_processed)),
        "test_mse": mean_squared_error(y_test, final_model.predict(X_test_processed)),
        "train_r2": r2_score(y_train, final_model.predict(X_train_processed)),
        "test_r2": r2_score(y_test, final_model.predict(X_test_processed)),
        "oob_score": final_model.oob_score_,
        "best_params": best_params
    }
    
    return {
        "department": department,
        "outlier_method": outlier_method,
        **metrics
    }



In [11]:
finishing_no_outlier_result = run_analysis("Finishing", "isolation_forest")

AttributeError: 'RandomForestRegressor' object has no attribute 'estimators_'

In [None]:
# Example Usage
results = []
for dept in ["Sewing", "Finishing", "Overall"]:
    for method in ["isolation_forest", "none"]:
        result = run_analysis(dept, method)
        results.append(result)

results_df = pd.DataFrame(results)
print(results_df[['department', 'outlier_method', 'train_r2', 'test_r2', 'oob_score']])