In [42]:
import pandas as pd
import numpy as np
import os
import chardet
from sklearn.preprocessing import OneHotEncoder
from xgboost import XGBRegressor

class GAProcessOptimizer:
    """Pipeline that gates process data by a pressure safety curve and models high-yield ranges.

    The pipeline loads one raw process CSV and one safety-curve CSV, keeps the
    rows whose ``pressure_bar_abs`` does not exceed the interpolated safety
    limit for their ``temperature_C``, extracts the parameter ranges of the
    high-yield rows per ``plant_name``, fits one ``XGBRegressor`` per range
    bound and prints a report.

    Typical use is ``GAProcessOptimizer(raw_csv, safety_csv).run()``.
    """

    # Process parameters whose min/max ranges are extracted, modelled and reported.
    _PARAMS = ('time_min', 'temperature_C', 'pressure_bar_abs', 'pH_neutral_est')

    def __init__(self, path_raw, path_safety, safety_column="Assurance_Bar"):
        """Load both CSV files and prepare the raw frame.

        Args:
            path_raw: Path of the raw process-data CSV. The plant name is
                derived from this file name.
            path_safety: Path of the safety-curve CSV. It is read with a
                ``"T (°C)"`` column and the column named by ``safety_column``.
            safety_column: Name of the safety-curve column used as the
                pressure limit.
        """
        # Load raw plant data and safety curve data with encoding detection
        self.df_raw = self._read_csv_detect_encoding(path_raw)
        self.df_safety = self._read_csv_detect_encoding(path_safety)

        # Store selected safety margin column
        self.safety_column_used = safety_column

        # Extract plant name from filename
        self.df_raw['plant_name'] = self._extract_plant_name(path_raw)

        # Standardize column names and add temporal derivatives
        self.standardize_columns()
        self.add_temporal_derivatives()

        # Initialize placeholders
        self.df_filtered = None
        self.df_ranges = None
        self.X = None
        self.y = None
        self.models = {}
        self.y_pred = None
        self.outlier_warnings = []
        self.best_config = None
        self.safe_row_count = 0
        self.total_row_count = 0
        self.used_fallback = False

    @staticmethod
    def _read_csv_detect_encoding(path):
        """Read a CSV after letting ``chardet`` guess its encoding from the raw bytes."""
        with open(path, 'rb') as f:
            encoding = chardet.detect(f.read())['encoding']
        return pd.read_csv(path, encoding=encoding)

    def _extract_plant_name(self, path):
        """Return the file stem with '_' turned into spaces, minus bookkeeping tokens.

        Tokens equal (case-insensitively) to 'filled', 'root', 'data' or 'csv'
        are dropped.
        """
        ignored = {'filled', 'root', 'data', 'csv'}
        stem = os.path.splitext(os.path.basename(path))[0]
        tokens = [t for t in stem.split('_') if t.lower() not in ignored]
        return ' '.join(tokens).strip()

    def standardize_columns(self):
        """Rename known raw column headers in ``df_raw`` to the names used by the pipeline."""
        rename_map = {
            'GA_content_%': 'Yield',
            'epsilon_r_ext': 'epsilon_r_est',
            'pH_neutral_ext': 'pH_neutral_est',
            'Temperature °C': 'temperature_C',
            'Time min': 'time_min',
            'Time s': 'time_s',
            'concen tar dis': 'concentration_est',
            'option 1 est': 'option1_est',
            'rbd marker est': 'rbd_marker_est'
        }
        self.df_raw.rename(columns=rename_map, inplace=True)

    def add_temporal_derivatives(self):
        """Add ``delta_<col>`` row-to-row differences (first row = 0) to ``df_raw``.

        Columns that are absent are skipped instead of raising ``KeyError``:
        this method runs from ``__init__``, and ``validate_and_filter`` is the
        place that reports missing required columns with a clear message.
        """
        for col in ('temperature_C', 'pressure_bar_abs', 'pH_neutral_est'):
            if col not in self.df_raw.columns:
                continue
            self.df_raw[f'delta_{col}'] = self.df_raw[col].diff().fillna(0)

    def safe_curve(self, temp):
        """Interpolate the pressure limit at ``temp`` from the safety table.

        Args:
            temp: A scalar temperature or an array of temperatures, in the
                units of the table's ``"T (°C)"`` column.

        Returns:
            The linearly interpolated value(s) of the selected safety column.
            Temperatures outside the table are clamped to the end values,
            which is ``np.interp``'s default behaviour.
        """
        # np.interp does not verify that the x-coordinates are increasing and
        # returns nonsense when they are not, so sort the table first.
        curve = self.df_safety.sort_values("T (°C)", kind="stable")
        return np.interp(
            temp,
            curve["T (°C)"].to_numpy(dtype=float),
            curve[self.safety_column_used].to_numpy(dtype=float),
        )

    def validate_and_filter(self):
        """Check required columns, record outlier warnings and apply the safety gate.

        Rows with a missing required value are dropped from ``df_raw`` in
        place. ``df_filtered`` receives the rows whose pressure is at or below
        the interpolated limit; if no row qualifies, the full dataset is kept
        and ``used_fallback`` is set.

        Raises:
            ValueError: If a required column is missing from ``df_raw``.
        """
        print(f"[INFO] Using safety margin column: '{self.safety_column_used}' for subcritical pressure gating")
        required = ['plant_name', 'Yield', *self._PARAMS]
        missing = [col for col in required if col not in self.df_raw.columns]
        if missing:
            raise ValueError(f"Missing required columns: {missing}")
        self.df_raw.dropna(subset=required, inplace=True)

        # Start from a clean list so a repeated call does not duplicate warnings.
        self.outlier_warnings = []
        for col in required[1:]:
            mean = self.df_raw[col].mean()
            std = self.df_raw[col].std(ddof=0)
            if not std > 0:
                # Constant (or empty) column: the z-score is undefined, nothing to flag.
                continue
            z = (self.df_raw[col] - mean) / std
            flagged = self.df_raw.loc[z.abs() > 3, col].round(2).tolist()
            if flagged:
                self.outlier_warnings.append(
                    f"{len(flagged)} rows in '{col}' had extreme values: {flagged}. Mean = {mean:.2f}"
                )

        df = self.df_raw.copy()
        # np.interp is vectorized, so evaluate the whole column in one call.
        df["safe_pressure_limit"] = self.safe_curve(df["temperature_C"].to_numpy())
        safe_mask = df["pressure_bar_abs"] <= df["safe_pressure_limit"]
        self.safe_row_count = int(safe_mask.sum())
        self.total_row_count = len(df)
        self.used_fallback = self.safe_row_count == 0

        if self.used_fallback:
            print("[Fallback] No subcritical-safe rows found. Using full dataset with caution. ⚠️")
            self.df_filtered = df.copy()
        else:
            self.df_filtered = df[safe_mask].copy()

    def extract_optimal_ranges(self, threshold_ratio=0.85):
        """Build ``df_ranges``: per plant, the min/max of each parameter over high-yield rows.

        Args:
            threshold_ratio: A row counts as high-yield when its ``Yield`` is
                at least this fraction of the plant's maximum ``Yield``.
        """
        rows = []
        for plant, group in self.df_filtered.groupby('plant_name'):
            y_max = group['Yield'].max()
            y_thresh = threshold_ratio * y_max
            high_yield = group[group['Yield'] >= y_thresh]
            if high_yield.empty:
                continue
            row = {'plant_name': plant, 'Yield_high': y_max, 'Yield_threshold': y_thresh}
            row.update({f"{col}_min": high_yield[col].min() for col in self._PARAMS})
            row.update({f"{col}_max": high_yield[col].max() for col in self._PARAMS})
            rows.append(row)
        self.df_ranges = pd.DataFrame(rows)

    def extract_best_configuration(self):
        """Store the highest-``Yield`` row as ``best_config``, with a safety-check flag.

        Uses ``df_filtered`` unless it is empty, in which case ``df_raw`` is
        used. Ties on ``Yield`` resolve to the earliest row.
        """
        df = self.df_filtered if not self.df_filtered.empty else self.df_raw.copy()
        best_row = df.sort_values("Yield", ascending=False, kind="stable").iloc[0]
        limit = self.safe_curve(best_row["temperature_C"])
        self.best_config = {
            "plant_name": best_row["plant_name"],
            "Yield_best": best_row["Yield"],
            "time_min": best_row["time_min"],
            "temperature_C": best_row["temperature_C"],
            "pressure_bar_abs": best_row["pressure_bar_abs"],
            "pH_neutral_est": best_row["pH_neutral_est"],
            # Plain bool rather than numpy.bool_ so the value serializes and compares predictably.
            "within_safety": bool(best_row["pressure_bar_abs"] <= limit)
        }

    def prepare_features(self):
        """Build the feature matrix ``X``: one-hot plant name plus available optional features.

        For each optional feature present in ``df_filtered``, the first
        non-null value per plant is used.
        """
        # Select optional features available in the dataset
        # NOTE(review): 'equipment_type' looks categorical; if it holds strings it is
        # passed through un-encoded — confirm the regressor accepts it.
        optional = ['epsilon_r_est', 'Brix_No', 'ambient_temp', 'humidity', 'equipment_type',
                    'delta_temperature_C', 'delta_pressure_bar_abs', 'delta_pH_neutral_est']
        available = [f for f in optional if f in self.df_filtered.columns]

        # Build feature matrix with one-hot encoding for plant name
        feature_df = self.df_ranges[['plant_name']].copy()
        if available:
            # Group once for all features and align to the row order of df_ranges.
            firsts = (
                self.df_filtered.groupby('plant_name')[available]
                .first()
                .reindex(self.df_ranges['plant_name'])
            )
            for f in available:
                feature_df[f] = firsts[f].values

        encoder = OneHotEncoder(sparse_output=False)
        encoded = encoder.fit_transform(feature_df[['plant_name']])
        self.X = pd.DataFrame(encoded, columns=encoder.get_feature_names_out(['plant_name']))
        for f in available:
            self.X[f] = feature_df[f].values

    def train_model(self):
        """Fit one ``XGBRegressor`` per range bound and store in-sample predictions in ``y_pred``.

        Column ``2*i`` / ``2*i + 1`` of ``y_pred`` holds the predicted min / max
        of the i-th parameter in ``_PARAMS``.
        """
        # Train XGBoost regressors for each parameter range
        targets = [f"{col}_{ext}" for col in self._PARAMS for ext in ('min', 'max')]
        self.y = self.df_ranges[targets]
        # Explicit float buffer: zeros_like would inherit an integer dtype from
        # all-integer targets and silently truncate the predictions.
        self.y_pred = np.zeros(self.y.shape, dtype=float)

        for i, col in enumerate(targets):
            model = XGBRegressor(n_estimators=100, learning_rate=0.1, max_depth=6, objective='reg:squarederror', random_state=42)
            model.fit(self.X, self.y[col])
            self.models[col] = model
            self.y_pred[:, i] = model.predict(self.X)

    def build_output(self):
        """Print the per-plant report and return the formatted range table.

        Returns:
            A DataFrame with ``plant_name``, ``Yield_high``, ``Yield_threshold``
            and one ``<param>_range`` string column per parameter.
        """
        # Format model-inferred ranges
        output = self.df_ranges[['plant_name', 'Yield_high', 'Yield_threshold']].copy()
        for i, name in enumerate(self._PARAMS):
            bounds = self.y_pred[:, [2 * i, 2 * i + 1]]
            output[f'{name}_range'] = [f"{lo:.2f} – {hi:.2f}" for lo, hi in bounds]

        # Print safety limits and results per plant
        for plant in output['plant_name'].unique():
            row = output[output['plant_name'] == plant].iloc[0]
            print(f"\n[DEBUG] Safety Limits Used for '{plant}':")
            temp_range = self.df_filtered[self.df_filtered['plant_name'] == plant]['temperature_C']
            if not temp_range.empty:
                t_min, t_max = temp_range.min(), temp_range.max()
                p_min = self.safe_curve(t_min)
                p_max = self.safe_curve(t_max)
                print(f"  Temperature Range: {t_min:.2f} – {t_max:.2f} °C")
                print(f"  Pressure Safety Limits: {p_min:.2f} – {p_max:.2f} bar abs (from '{self.safety_column_used}')")

            print(f'\nProcess: "{plant}"')
            print("\nData Summary:")
            print(f"  Total datapoints: {self.total_row_count}")
            print(f"  Subcritical-safe datapoints: {self.safe_row_count}")
            if self.used_fallback:
                print("  ⚠️ No safe configurations available — fallback to full dataset used")

            print("\nRecommended Ranges (Machine-Inferred via XGBoost):")
            print(f"  Highest Observed Yield (%): {row['Yield_high']:.3f}%")
            print(f"  High-Yield Threshold (%): {row['Yield_threshold']:.3f}%")
            print(f"  Reaction Time Range (min): {row['time_min_range']}")
            print(f"  Reaction Temperature Range (°C): {row['temperature_C_range']}")
            print(f"  Reaction Pressure Range (bar abs): {row['pressure_bar_abs_range']}")
            print(f"  Solution pH Range: {row['pH_neutral_est_range']}")

            if self.best_config and self.best_config["plant_name"] == plant:
                print("\nBest-Performing Configuration (Observed from Hard Data):")
                print(f"  Best Yield Achieved (%): {self.best_config['Yield_best']:.3f}%")
                print(f"  Reaction Time (min): {self.best_config['time_min']:.2f}")
                print(f"  Reaction Temperature (°C): {self.best_config['temperature_C']:.2f}")
                print(f"  Reaction Pressure (bar abs): {self.best_config['pressure_bar_abs']:.2f}")
                print(f"  Solution pH: {self.best_config['pH_neutral_est']:.2f}")
                if self.best_config["within_safety"]:
                    print("  ✅ Within safety limits: True")
                else:
                    print("  ❌ Within safety limits: False")

        return output

    def run(self):
        """Execute the full pipeline, print any data-quality warnings and return the range table."""
        # Execute full pipeline
        self.validate_and_filter()
        self.extract_optimal_ranges()
        self.extract_best_configuration()
        self.prepare_features()
        self.train_model()
        output = self.build_output()

        # Print grouped warnings per plant
        if self.outlier_warnings:
            plant_name = self.df_raw['plant_name'].iloc[0]
            print(f"\nData Quality Warnings ({plant_name}):")
            for w in self.outlier_warnings:
                print(f"  - {w}")

        return output

# Execution block
# Runs the pipeline once per plant file against one shared safety curve, then
# prints a cross-plant recommendation taken from the best observed datapoint.
plant_files = [
    r"C:\Users\Syeed\Desktop\Innovo\filled_Ellagic_Acid_Peel.csv",
    r"C:\Users\Syeed\Desktop\Innovo\filled_Glycyrrhiza_Glabra_Root.csv"
]
safety_curve_file = r"C:\Users\Syeed\Desktop\Innovo\Clean_Temp_P_Safety.csv"
safety_column = "Assurance_Bar"  # Change to "50% Safety Margin BAR" or "20% Safety Margin BAR" as needed

print(f"\nProcessed files: {', '.join([os.path.basename(p) for p in plant_files])}")
print(f"[INFO] Applying safety gating using shared file: {os.path.basename(safety_curve_file)}")

# One (df_ranges, best_config, outlier_warnings) tuple per successfully processed file.
results = []
for path in plant_files:
    if not os.path.exists(path):
        print(f"[ERROR] File not found: {path}")
        continue

    print(f"\n[INFO] Running pipeline for: {path}")
    pipeline = GAProcessOptimizer(path, safety_curve_file, safety_column=safety_column)
    output = pipeline.run()
    results.append((pipeline.df_ranges, pipeline.best_config, pipeline.outlier_warnings))

# Combine all ranges and identify best config
range_frames = [r for r, _, _ in results if r is not None]
# pd.concat raises ValueError on an empty list, which happens when every input
# file was missing; fall back to an empty frame so the script ends cleanly.
all_ranges = pd.concat(range_frames, ignore_index=True) if range_frames else pd.DataFrame()
best_configs = [cfg for _, cfg, _ in results if cfg]

if best_configs:
    best = max(best_configs, key=lambda x: x["Yield_best"])
    print("\nSummary Recommendation (Based Solely on Observed Data):\n")
    print(f"The highest yield was achieved by '{best['plant_name']}' using a real-world configuration.\n")
    print(f"Observed Yield: {best['Yield_best']:.3f}%\n")
    print("Recommended configuration (from actual datapoint):")
    print(f"  Reaction Time (min): {best['time_min']:.2f}")
    print(f"  Reaction Temperature (°C): {best['temperature_C']:.2f}")
    print(f"  Reaction Pressure (bar abs): {best['pressure_bar_abs']:.2f}")
    print(f"  Solution pH: {best['pH_neutral_est']:.2f}")
    if best["within_safety"]:
        print("  ✅ Within safety limits: True")
        print("Consider prioritizing this extraction setup for future optimization.")
    else:
        print("  ❌ Within safety limits: False")
        print("⚠️ This configuration exceeds safety limits and should not be prioritized without further validation.")

# Print grouped warnings per plant
# NOTE(review): GAProcessOptimizer.run() already prints these warnings for each
# pipeline, so they appear twice in the output — confirm which copy is wanted.
for _, cfg, warnings in results:
    if warnings:
        print(f"\nData Quality Warnings ({cfg['plant_name']}):")
        for w in warnings:
            print(f"  - {w}")



Processed files: filled_Ellagic_Acid_Peel.csv, filled_Glycyrrhiza_Glabra_Root.csv
[INFO] Applying safety gating using shared file: Clean_Temp_P_Safety.csv

[INFO] Running pipeline for: C:\Users\Syeed\Desktop\Innovo\filled_Ellagic_Acid_Peel.csv
[INFO] Using safety margin column: 'Assurance_Bar' for subcritical pressure gating

[DEBUG] Safety Limits Used for 'Ellagic Acid Peel':
  Temperature Range: 234.79 – 250.00 °C
  Pressure Safety Limits: 32.01 – 41.26 bar abs (from 'Assurance_Bar')

Process: "Ellagic Acid Peel"

Data Summary:
  Total datapoints: 61
  Subcritical-safe datapoints: 6

Recommended Ranges (Machine-Inferred via XGBoost):
  Highest Observed Yield (%): 0.230%
  High-Yield Threshold (%): 0.196%
  Reaction Time Range (min): 55.00 – 60.00
  Reaction Temperature Range (°C): 234.79 – 250.00
  Reaction Pressure Range (bar abs): 31.95 – 35.00
  Solution pH Range: 5.50 – 5.59

Best-Performing Configuration (Observed from Hard Data):
  Best Yield Achieved (%): 0.230%
  Reaction Ti