In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import joblib
import pandas as pd

In [2]:
cnn_model_path = "G:\\Intern_Project_2025\\models\\Unified_classifier.keras"
xgb_model_path = "G:\\Intern_Project_2025\\models\\Unified_regressor.pkl"

In [3]:
try:
    cnn_model = tf.keras.models.load_model(cnn_model_path)
    xgb_model = joblib.load(xgb_model_path)
except ValueError:
    print("Models not could not be loaded.")

In [4]:
img_folder = "G:\\Intern_Project_2025\\combined_images"
csv_path = "G:\\Intern_Project_2025\\Csv_files\\combined_data.csv"

In [5]:
df = pd.read_csv(csv_path)
df = df.drop('Tearing', axis=1)

In [6]:
import os

df['ImagePath'] = df['ImagePath'].apply(os.path.basename)

In [7]:
 def mask_background(img: tf.Tensor) -> tf.Tensor:
        """Mask background to prevent model from focusing on sharp gradient
        near edges.

        Args:
            img: Image tensor of shape (H, W, C)

        Returns:
            tf.Tensor: Image with circular mask applied
        """
        h = tf.shape(img)[0]
        w = tf.shape(img)[1]
        y_range = tf.range(h)
        x_range = tf.range(w)
        yy, xx = tf.meshgrid(y_range, x_range, indexing="ij")
        center_x = tf.cast(w, tf.float32) / 2.0
        center_y = tf.cast(h, tf.float32) / 2.0
        radius = tf.minimum(center_x, center_y)
        dist_from_center = tf.sqrt(
            (tf.cast(xx, tf.float32) - center_x) ** 2
            + (tf.cast(yy, tf.float32) - center_y) ** 2
        )

        mask = tf.cast(dist_from_center <= radius, tf.float32)
        mask = tf.expand_dims(mask, axis=-1)
        return img * mask

In [8]:
def load_process_images(filename: str, set_mask: bool) -> "tf.Tensor":
    """Load and preprocess image from file path.

    Args:
        filename: Image filename or path

    Returns:
        tf.Tensor: Preprocessed image tensor
    """

    if tf is None:
        raise ImportError("TensorFlow is required for image processing")

    def load_image(file):
        """Load an image and process using same preprocessing as backbone.

        Args:
            file: path to image
            preprocess_input: processing from backbone model

        Returns:
            loaded and resized image
        """
        full_path = os.path.join(img_folder, file)

        try:
            img_raw = tf.io.read_file(full_path)
        except FileNotFoundError:
            print(f"Image file not found: {full_path}")
            return None
        except Exception as e:
            print(f"Error loading image {full_path}: {e}")
            return None

        try:
            img = tf.image.decode_png(img_raw, channels=1)
            img = tf.image.resize(img, [224, 224])
            img = tf.image.grayscale_to_rgb(img)
            if set_mask == True:
                img = mask_background(img)
            return img
        except Exception as e:
            print(f"Error processing image {full_path}: {e}")
            return None

    img = load_image(filename)
    img.set_shape([224, 224, 3])
    return img

In [9]:
def test_prediction(
        cnn_model,
        image_path: str,
        feature_vector: "np.ndarray | None" = None,
    ) -> "np.ndarray":
        """Generate prediction for a single image (and features if not
        image_only).

        Args:
            image_path (str): Path to image to predict.
            feature_vector (np.ndarray | None): Numerical features (ignored if image_only).

        Returns:
            np.ndarray: Model prediction.
        """
        image = load_process_images(image_path, set_mask=False)
        image = np.expand_dims(image, axis=0)
        feature_vector = np.expand_dims(feature_vector, axis=0)
        feature_vector = np.zeros_like(feature_vector)
        prediction = cnn_model.predict([image, feature_vector])
        return prediction

In [10]:
def gather_predictions(df, cnn_model
    ) -> "tuple[np.ndarray, list, list] | tuple[None, None, None]":
        """Gather multiple predictions from test data.

        Returns:
            tuple: (true_labels, pred_labels, predictions) or (None, None, None) if no data.
        """
        if df is None:
            return None, None, None
        pred_image_paths = df["ImagePath"].values
        pred_features = df[
                [
                    "CleaveAngle",
                    "CleaveTension",
                    "ScribeDiameter",
                    "Misting",
                    "Hackle",
                ]
            ].values
        predictions = {}
        if pred_features is not None:
            for img_path, feature_vector in zip(
                pred_image_paths, pred_features
            ):
                prediction = test_prediction(cnn_model, img_path, feature_vector)
                predictions[img_path] = (prediction[0, 0] > 0.63).astype(int)
        else:
            print("No features available for prediction.")
            return None, None, None

        return predictions

In [11]:
predictions = gather_predictions(df, cnn_model)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 63ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 48ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 44ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 48ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 54ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 56ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 60ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 52ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 59ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 44ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 54ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 48ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 59ms

In [12]:
df['CNN_Predicition'] = df['ImagePath'].map(predictions)

In [13]:
from typing import Optional
from tensorflow.keras.models import Model

class XGBoostPredictor:
    """This class implements basic logic for predicting and testing the change
    in tensions."""

    def __init__(
        self,
        csv_path: str,
        cnn_model_path: str,
        angle_threshold: float,
        diameter_threshold: float,
        xgb_path: Optional[str] = None,
        scaler_path: Optional[str] = None,
    ):
        self.csv_path = csv_path
        self.xgb_path = xgb_path
        self.scaler_path = scaler_path
        self.model = None
        self.scaler = None
        self.angle_threshold = angle_threshold
        self.diameter_threshold = diameter_threshold

        # Load CNN
        try:
            self.cnn_model = tf.keras.models.load_model(cnn_model_path)
            self.feature_extractor = Model(
                inputs=self.cnn_model.input[0],
                outputs=self.cnn_model.get_layer("global_avg").output,
            )
        except (OSError, ValueError) as e:
            print(f"[CNN Load Error]: {e}")
            self.feature_extractor = None

    def _extract_cnn_features(self, image_path: str) -> np.ndarray:
        """Extract CNN features from a grayscale image.

        Args:
            image_path: Path to image to extract features

        Returns:
            Extracted features from image.
        """
        img_raw = tf.io.read_file(image_path)
        img = tf.image.decode_png(img_raw, channels=1)
        img = tf.image.resize(img, [224, 224])
        img = tf.image.grayscale_to_rgb(img)
        img = tf.cast(img, tf.float32)
        img = tf.expand_dims(img, axis=0)
        # remove single dimensional entries
        return self.feature_extractor(img).numpy().squeeze()

    def _extract_data(self, angle_threshold: float, diameter_threshold: float):
        """Load and filter dataset for prediction (only bad cleaves).

        Args:
            angle_threshold: Maximum angle to be considered good cleave
            diameter_threshold: Maximum diameter of scribe mark to be considered good cleave

        Returns:
            Df of bad cleaves and the mean of the good cleaves

        """
        try:
            df = pd.read_csv(self.csv_path)
        except Exception as e:
            raise RuntimeError(f"Failed to load CSV: {e}") from e

        df["CleaveCategory"] = df.apply(
            lambda row: (
                1
                if row["CleaveAngle"] <= angle_threshold
                and row["ScribeDiameter"]
                < diameter_threshold * row["Diameter"]
                and not row["Hackle"]
                and not row["Misting"]
                else 0
            ),
            axis=1,
        )
        # Compute mean tension from good cleaves
        # good_mean = df[df["CleaveCategory"] == 1]["CleaveTension"].mean()
        good_cleaves_df = df[df["CleaveCategory"] == 1]
        mean_tension_per_type = (
            good_cleaves_df.groupby("FiberType")["CleaveTension"]
            .mean()
            .to_dict()
        )
        # Keep only bad cleaves
        bad_df = df[df["CleaveCategory"] == 0].copy()
        bad_df["FiberTypeMeanTension"] = bad_df["FiberType"].map(
            mean_tension_per_type
        )

        # Compute true delta (label) = good_mean - current
        bad_df["TrueDelta"] = (
            bad_df["FiberTypeMeanTension"] - bad_df["CleaveTension"]
        )

        return bad_df, mean_tension_per_type

    def load(self):
        """Load trained model and scaler."""
        if not self.xgb_path or not self.scaler_path:
            raise ValueError("Paths for model and scaler must be provided.")

        try:
            self.model = joblib.load(self.xgb_path)
            self.scaler = joblib.load(self.scaler_path)
        except Exception as e:
            raise RuntimeError(f"Failed to load model or scaler: {e}") from e

    def predict(self):
        """Run tension predictions on filtered cleave data."""
        if not self.model or not self.scaler:
            raise RuntimeError(
                "Model and scaler must be loaded before prediction."
            )

        df, mean_tensions = self._extract_data(
            angle_threshold=self.angle_threshold,
            diameter_threshold=self.diameter_threshold,
        )
        image_paths = df["ImagePath"]
        tensions = df["CleaveTension"]
        true_delta = df["TrueDelta"]
        fiber_type = df["FiberType"]

        predictions = []
        predicted_deltas = []
        pred_ts = []
        predictions_results = {}

        for img_path in image_paths:
            features = self._extract_cnn_features(img_path)
            pred_scaled = self.model.predict(features.reshape(1, -1))[0]
            delta = self.scaler.inverse_transform([[pred_scaled]])[0][0]
            predicted_deltas.append(delta)
            predictions_results[img_path] = delta
            predictions.append(delta + tensions.iloc[len(predictions)])

        for true_t, delta_pred, current_t, fiber in zip(
            true_delta, predicted_deltas, tensions, fiber_type
        ):
            pred_t = current_t + delta_pred
            pred_ts.append(pred_t)

            current_mean = mean_tensions[fiber]
            #print(
             #   f"Current: {current_t:.2f} | True delta: {true_t:.2f} | Pred delta: {delta_pred:.2f} | Pred T: {pred_t:.2f} | Target T: {current_mean:.2f}"
            #)

        df = pd.DataFrame(
            {
                "Current Tension": np.array(tensions).round(2),
                "True Delta": np.array(true_delta).round(2),
                "Predicted Tension": np.array(predictions).round(2),
                "Predicted Delta": np.array(predicted_deltas).round(2),
            }
        )
        basepath = self.xgb_path.strip(".pkl")
        csv_path = f"{basepath}_performance.csv"
        # df.to_csv(csv_path, index=False)

        return predictions_results

In [14]:
xgb_predicter = XGBoostPredictor(
            xgb_path=xgb_model_path,
            csv_path=csv_path,
            angle_threshold=0.49,
            diameter_threshold=0.25,
            scaler_path="G:\\Intern_Project_2025\\models\\Unified_regressor_scaler.pkl",
            cnn_model_path=cnn_model_path
        )

xgb_predicter.load()
xgb_predictions = (
    xgb_predicter.predict()
)

In [15]:
xgb_preds = {}
for key, value in xgb_predictions.items():
    key = os.path.basename(key)
    xgb_preds[key] = value

In [16]:
df['XGB_Prediction'] = df['ImagePath'].map(xgb_preds)

In [17]:
df.fillna(0, inplace=True)

In [18]:
df.loc[df['XGB_Prediction'] > 0, 'XGB_Prediction'] = 1
df.loc[df['XGB_Prediction'] < 0, 'XGB_Prediction'] = -1

In [19]:
df['XGB_Prediction'].value_counts()

XGB_Prediction
 0.0    105
-1.0     77
 1.0     24
Name: count, dtype: int64

In [20]:
df

Unnamed: 0,ImagePath,FiberType,DateCreated,Diameter,CleaveAngle,CleaveTension,TensionVelocity,FHBOffset,ScribeDiameter,Misting,Hackle,CNN_Predicition,XGB_Prediction
0,Fiber-179Plus.png,PM15U25d,2025-06-09 15:37,123.5,0.22,193,60,2552.0,17.28,0,0.0,1,0.0
1,Fiber-183Plus.png,PM15U25d,2025-06-09 15:47,123.2,0.17,190,60,2552.0,16.70,0,0.0,1,0.0
2,Fiber-184Plus.png,PM15U25d,2025-06-09 15:50,123.4,0.33,191,60,2552.0,18.26,0,0.0,1,0.0
3,Fiber-185Plus.png,PM15U25d,2025-06-09 15:53,123.3,0.26,192,60,2552.0,20.61,0,0.0,1,0.0
4,Fiber-186Plus.png,PM15U25d,2025-06-09 15:56,123.9,0.38,193,60,2552.0,18.63,1,0.0,0,-1.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
201,Fiber-476Plus.png,400LA,2025-07-16 15:37,251.3,0.22,429,60,2520.0,29.65,0,0.0,1,0.0
202,Fiber-477Plus.png,400LA,2025-07-16 15:40,251.4,0.31,587,60,2487.0,31.08,0,0.0,1,0.0
203,Fiber-478Plus.png,400LA,2025-07-16 15:42,251.3,0.23,583,60,2487.0,24.87,0,0.0,0,0.0
204,Fiber-479Plus.png,400LA,2025-07-16 15:44,251.0,0.20,583,60,2487.0,30.00,0,0.0,1,0.0


In [21]:
csv_path = "C:\\Users\\clombardi\\RL\\data.csv"
df.to_csv(csv_path)

In [145]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

In [241]:
X_cnn = df[['CleaveTension']]
y_cnn = df['CNN_Predicition']

In [242]:
X_test_cnn, X_train_cnn, y_test_cnn, y_train_cnn = train_test_split(X_cnn, y_cnn, test_size = 0.2, random_state=42)

In [243]:
cnn_surrogate_model = RandomForestClassifier(n_estimators=1000, max_depth=10, random_state=42)

In [244]:
cnn_surrogate_model.fit(X_train_cnn, y_train_cnn)

0,1,2
,n_estimators,1000
,criterion,'gini'
,max_depth,10
,min_samples_split,2
,min_samples_leaf,1
,min_weight_fraction_leaf,0.0
,max_features,'sqrt'
,max_leaf_nodes,
,min_impurity_decrease,0.0
,bootstrap,True


In [245]:
print(f"CNN Surrogate Model Accuracy: {cnn_surrogate_model.score(X_test_cnn, y_test_cnn):.2f}")

CNN Surrogate Model Accuracy: 0.77


In [246]:
X_xgb = df[['CleaveTension']]
y_xgb = df['XGB_Prediction']

In [247]:
X_train_xgb, X_test_xgb, y_train_xgb, y_test_xgb = train_test_split(X_xgb, y_xgb, test_size=0.2, random_state=42)

In [248]:
xgb_surrogate_model = RandomForestClassifier(n_estimators=1000, random_state=42)

In [249]:
xgb_surrogate_model.fit(X_train_xgb, y_train_xgb)

0,1,2
,n_estimators,1000
,criterion,'gini'
,max_depth,
,min_samples_split,2
,min_samples_leaf,1
,min_weight_fraction_leaf,0.0
,max_features,'sqrt'
,max_leaf_nodes,
,min_impurity_decrease,0.0
,bootstrap,True


In [250]:
print(f"XGB Surrogate Model Accuracy: {xgb_surrogate_model.score(X_test_xgb, y_test_xgb):.2f}")

XGB Surrogate Model Accuracy: 0.76
