In [None]:
!pip install -q flask pyngrok
from pyngrok import ngrok

# Replace this with your actual token from the ngrok dashboard
ngrok.set_auth_token("your-ngrok-auth-token")


In [None]:
# Mount Google Drive at /content/drive; Config.MODEL_PATH points at a file
# under /content/drive/MyDrive, so this must run before the model is loaded.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# prompt: unzip rar file /content/RPSENSE BACKEND.rar

!sudo apt-get install unrar
# !unrar x "/content/RPSENSE BACKEND.rar" "/content/"
!unrar x "/content/bacnew.rar" "/content/"

In [None]:
# Install the remaining Python packages: flask_cors, python-dotenv, mediapipe.
# (The commented variant additionally installed flask_socketio.)
# ! pip install flask_cors python-dotenv mediapipe flask_socketio
! pip install flask_cors python-dotenv mediapipe

In [None]:
# Upgrade flask-socketio, python-socketio and python-engineio in a single pip invocation.
!pip install --upgrade flask-socketio python-socketio python-engineio

In [None]:
# Print the installed package entries whose names contain "socketio" / "engineio".
!pip list | grep socketio
!pip list | grep engineio

In [None]:
# Run the backend's app.py with the runtime's `python` executable.
# (The commented line is the same command for the alternative archive layout.)
# !python "/content/RPSENSE BACKEND/app.py"
!python "/content/bacnew/app.py"

In [None]:
import os


class Config:
    """Central settings for the RPSense inference pipeline.

    Every value is a class attribute and is read as ``Config.NAME``.
    """

    # --- Model ---------------------------------------------------------------
    # Location of the fine-tuned Keras model file. Can be overridden with the
    # RPSENSE_MODEL_PATH environment variable so the notebook does not need to
    # be edited when the weights live elsewhere; the default is unchanged.
    MODEL_PATH = os.environ.get(
        'RPSENSE_MODEL_PATH',
        '/content/drive/MyDrive/RPSense_Dataset/finetuned_after100layers_mobilenetv2_rpsense.h5',
    )

    # Size passed to cv2.resize before inference.
    MODEL_INPUT_SIZE = (224, 224)
    # Class names indexed by the model's argmax output, so the order matters.
    CLASSES = ['invalid', 'paper', 'rock', 'scissors']
    # Predictions below this confidence are not buffered for aggregation.
    CONFIDENCE_THRESHOLD = 0.75

    # --- Frame processing ----------------------------------------------------
    INFERENCE_WINDOW_DURATION = 2.0  # seconds
    FRAMES_PER_SECOND = 10  # Expected frames per second from frontend
    # Number of frames expected within one inference window.
    MAX_FRAMES_IN_WINDOW = int(INFERENCE_WINDOW_DURATION * FRAMES_PER_SECOND)

    # --- MediaPipe -----------------------------------------------------------
    HAND_DETECTION_CONFIDENCE = 0.5
    HAND_TRACKING_CONFIDENCE = 0.5
    MAX_HANDS = 2  # Detect up to 2 so that "more than one hand" can be rejected

    # --- Server --------------------------------------------------------------
    # NOTE(review): DEBUG=True together with HOST='0.0.0.0' is risky if these
    # values are handed to a publicly tunnelled web server -- confirm how the
    # backend consumes them before exposing it.
    DEBUG = True
    HOST = '0.0.0.0'
    PORT = 5000

    # --- Image processing ----------------------------------------------------
    HAND_BBOX_PADDING = 30  # Pixels to add around detected hand

In [None]:
import cv2
import numpy as np
import base64
from io import BytesIO
from PIL import Image


def decode_frame_from_base64(base64_string):
    """Turn base64 image text (optionally a data URL) into an OpenCV image.

    Args:
        base64_string: Base64 text. If it contains "data:image" it is treated
            as a data URL and only the segment after the first comma is
            decoded.

    Returns:
        The result of ``cv2.imdecode(..., cv2.IMREAD_COLOR)`` on the decoded
        bytes, or ``None`` when any step raises (the error is printed).
    """
    try:
        payload = base64_string
        if "data:image" in payload:
            # Drop the "data:image/...;base64," header.
            payload = payload.split(",")[1]

        # base64 text -> raw bytes -> flat uint8 buffer -> decoded image
        raw = np.frombuffer(base64.b64decode(payload), np.uint8)
        return cv2.imdecode(raw, cv2.IMREAD_COLOR)
    except Exception as err:
        print(f"Error decoding frame: {str(err)}")
        return None


def encode_frame_to_base64(img):
    """Encode an OpenCV image as a JPEG data URL.

    Args:
        img: Image passed to ``cv2.imencode`` (JPEG quality 90).

    Returns:
        A ``"data:image/jpeg;base64,..."`` string, or ``None`` when encoding
        fails -- either because ``cv2.imencode`` raises or because it reports
        failure through its boolean return flag. The error is printed.
    """
    try:
        # Encode image to JPEG
        success, buffer = cv2.imencode(".jpg", img, [cv2.IMWRITE_JPEG_QUALITY, 90])
        if not success:
            # Without this check a failed encode would yield an empty data URL.
            print("Error encoding frame: JPEG encoding failed")
            return None

        # Convert to base64
        img_base64 = base64.b64encode(buffer).decode("utf-8")

        return f"data:image/jpeg;base64,{img_base64}"
    except Exception as e:
        print(f"Error encoding frame: {str(e)}")
        return None


def extract_hand_roi(image, hand_landmarks, padding=20):
    """Crop the padded bounding box of a hand's landmarks out of ``image``.

    Args:
        image: Array with a 3-element ``shape`` (height, width, channels).
        hand_landmarks: Object exposing ``.landmark``, a sequence of points
            with ``x`` / ``y`` attributes that are multiplied by the image
            width / height to obtain pixel positions.
        padding: Pixels added on every side before clamping to the image.

    Returns:
        ``(roi, (x_min, y_min, x_max, y_max))``, or ``(None, None)`` when
        there are no landmarks or the clamped box has no area.
    """
    if hand_landmarks is None:
        return None, None
    if len(hand_landmarks.landmark) == 0:
        return None, None

    height, width, _ = image.shape

    # Collect landmark positions in pixel units.
    xs, ys = [], []
    for point in hand_landmarks.landmark:
        xs.append(point.x * width)
        ys.append(point.y * height)

    # Pad the tight box, then clamp it to the image borders.
    left = max(0, int(min(xs)) - padding)
    right = min(width, int(max(xs)) + padding)
    top = max(0, int(min(ys)) - padding)
    bottom = min(height, int(max(ys)) + padding)

    # A box lying entirely outside the image collapses to zero/negative size.
    if right <= left or bottom <= top:
        return None, None

    return image[top:bottom, left:right], (left, top, right, bottom)



def draw_prediction_overlay(image, bbox, prediction, confidence):
    """Draw a class-coloured bounding box and a "LABEL: conf" tag on ``image``.

    The tag normally sits directly above the box. When the box is so close to
    the top edge that the tag would fall outside the image, it is drawn just
    inside the top of the box instead so it stays visible.

    Args:
        image: Image handed to the OpenCV drawing calls; modified in place.
        bbox: ``(x_min, y_min, x_max, y_max)`` pixel coordinates.
        prediction: Class name; upper-cased for the label and used as the
            colour lookup key (unknown names fall back to white).
        confidence: Number rendered with two decimals in the label.

    Returns:
        The same ``image`` object that was passed in.
    """
    x_min, y_min, x_max, y_max = bbox

    # Define colors for each class (tuples in OpenCV's B, G, R order)
    colors = {
        "rock": (0, 255, 0),  # Green
        "paper": (255, 0, 0),  # Blue
        "scissors": (0, 0, 255),  # Red
        "invalid": (128, 128, 128),  # Gray
    }

    color = colors.get(prediction, (255, 255, 255))

    # Draw bounding box
    cv2.rectangle(image, (x_min, y_min), (x_max, y_max), color, 2)

    # Measure the label so its background can be sized
    label = f"{prediction.upper()}: {confidence:.2f}"
    (label_width, label_height), _ = cv2.getTextSize(
        label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2
    )

    # Preferred position: a strip ending at the top edge of the box.
    label_top = y_min - label_height - 10
    if label_top < 0:
        # No room above the box: place the strip inside it instead.
        label_top = y_min
    label_bottom = label_top + label_height + 10

    # Draw label background
    cv2.rectangle(
        image,
        (x_min, label_top),
        (x_min + label_width, label_bottom),
        color,
        -1,
    )

    # Draw label text, 5 px above the bottom of the strip
    cv2.putText(
        image,
        label,
        (x_min, label_bottom - 5),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.6,
        (255, 255, 255),
        2,
    )

    return image


In [None]:
import mediapipe as mp
import cv2

class HandDetector:
    """Thin wrapper around MediaPipe Hands that accepts exactly one hand."""

    def __init__(self):
        # Keep handles to the MediaPipe sub-modules used by this notebook.
        self.mp_hands = mp.solutions.hands
        self.mp_drawing = mp.solutions.drawing_utils

        # Detector settings all come from Config.
        detector_options = dict(
            static_image_mode=False,
            max_num_hands=Config.MAX_HANDS,
            min_detection_confidence=Config.HAND_DETECTION_CONFIDENCE,
            min_tracking_confidence=Config.HAND_TRACKING_CONFIDENCE,
        )
        self.hands = self.mp_hands.Hands(**detector_options)

    def detect_hands(self, image):
        """Look for a single hand in a BGR image.

        Returns:
            ``(status, message, landmarks)`` where ``status`` is one of
            ``"success"``, ``"no_hands"``, ``"invalid"`` (more than one hand)
            or ``"error"``; ``landmarks`` is the first detected hand on
            success and ``None`` otherwise.
        """
        try:
            # The detector is fed an RGB conversion of the BGR input.
            detection = self.hands.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
            found = detection.multi_hand_landmarks

            if not found:
                return "no_hands", "No hands detected", None
            if len(found) > 1:
                return "invalid", "Multiple hands detected", None
            return "success", "Single hand detected", found[0]
        except Exception as err:
            return "error", f"Hand detection error: {str(err)}", None

    def __del__(self):
        # `hands` may be missing if __init__ failed before assigning it.
        if hasattr(self, 'hands'):
            self.hands.close()

In [None]:
# Smoke test: run HandDetector on a single image loaded from disk.
image_path = "/content/RPSENSE BACKEND/WIN_20250712_22_09_29_Pro.jpg"
image = cv2.imread(image_path)
detector = HandDetector()

# The load is treated as failed when cv2.imread hands back None.
if image is None:
    print(f"Error: Could not load image from {image_path}")
else:
    # Detect hands in the loaded image
    status, message, hand_landmarks = detector.detect_hands(image.copy()) # Use a copy to avoid modifying original

    print(f"Detection Status: {status}")
    print(f"Detection Message: {message}")
    print(f"Hand Landmarks: {hand_landmarks}")

    # When exactly one hand was found, draw its landmarks and show the image
    if status == "success":
        # Draw landmarks (and their connections) onto `image`
        detector.mp_drawing.draw_landmarks(
            image, hand_landmarks, detector.mp_hands.HAND_CONNECTIONS)

        # Display inline through the Colab helper google.colab.patches.cv2_imshow,
        # so this cell requires a Colab runtime.
        from google.colab.patches import cv2_imshow
        cv2_imshow(image)

# Preprocessing

In [None]:
import cv2
import numpy as np
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input


class ImagePreprocessor:
    """Converts a cropped hand image into a model-ready input batch."""

    def __init__(self):
        # Target size handed to cv2.resize.
        self.input_size = Config.MODEL_INPUT_SIZE

    def preprocess_for_model(self, roi_image):
        """Resize, recolour and normalise a hand ROI for MobileNetV2.

        Steps: resize to ``self.input_size`` -> BGR to RGB -> add a leading
        batch axis -> cast to float32 -> ``preprocess_input``.

        Returns:
            The preprocessed batch, or ``None`` if any step raises (the
            error is printed).
        """
        try:
            scaled = cv2.resize(roi_image, self.input_size)
            rgb = cv2.cvtColor(scaled, cv2.COLOR_BGR2RGB)

            # Leading axis of length 1: the model consumes batches.
            batch = np.expand_dims(rgb, axis=0)

            return preprocess_input(batch.astype(np.float32))
        except Exception as err:
            print(f"Preprocessing error: {str(err)}")
            return None


# Model Inference

In [None]:
import tensorflow as tf
import numpy as np


class ModelInference:
    """Loads the Keras classifier once and turns its output into labels."""

    def __init__(self):
        self.model = None
        self.classes = Config.CLASSES
        self.load_model()

    def load_model(self):
        """Load the model from ``Config.MODEL_PATH``.

        On failure the error is printed and ``self.model`` stays ``None``,
        which makes ``predict`` return its fallback result.
        """
        try:
            self.model = tf.keras.models.load_model(Config.MODEL_PATH)
        except Exception as err:
            print(f"❌ Error loading model: {str(err)}")
            self.model = None
        else:
            print(f"✅ Model loaded successfully from {Config.MODEL_PATH}")

    def predict(self, preprocessed_image):
        """Classify one preprocessed batch.

        Returns:
            ``(class_name, confidence, all_predictions)``; when no model is
            loaded or inference raises, ``("invalid", 0.0, None)``.
        """
        fallback = ("invalid", 0.0, None)
        if self.model is None:
            return fallback

        try:
            predictions = self.model.predict(preprocessed_image, verbose=0)
            print(f"✅ Predictions: {predictions}")

            # Index of the highest-scoring class in the first batch row
            best_idx = np.argmax(predictions[0])
            print(f"✅ Predicted class index: {best_idx}")

            confidence = float(predictions[0][best_idx])
            print(f"✅ Confidence: {confidence}")

            # self.classes is indexed by the model's output position
            label = self.classes[best_idx]
            print(f"✅ Predicted class: {label}")

            # Per-class scores keyed by class name
            scores = dict(zip(self.classes, map(float, predictions[0])))
            print(f"✅ All predictions: {scores}")

            return label, confidence, scores
        except Exception as err:
            print(f"Inference error: {str(err)}")
            return fallback


In [None]:
from google.colab.patches import cv2_imshow
import numpy as np
import cv2

# Single-image classifier smoke test.
# NOTE: the full image is passed to the classifier here -- no hand ROI is
# cropped first, unlike FrameProcessor.process_frame.

# scissor
image_path = "/content/RPSENSE BACKEND/WIN_20250712_22_09_29_Pro.jpg" # 90% paper WRONG
#image_path = "/content/RPSENSE BACKEND/scissor_back.jpg" # 82% scissors
#image_path = "/content/RPSENSE BACKEND/scissor_front.jpg"  #93% sc


#paper
#image_path = "/content/RPSENSE BACKEND/paper_front.jpg"  # 79 paper 16 scissor
#image_path = "/content/RPSENSE BACKEND/paper_back.jpg"  # 100%

#rock
#image_path = "/content/RPSENSE BACKEND/rock.jpg" #99.50 %


img = cv2.imread(image_path)

preprocess = ImagePreprocessor()

try:
    # Fail early with a clear message instead of an unrelated downstream error.
    if img is None:
        raise FileNotFoundError(f"Could not load image from {image_path}")

    # Preprocess for model
    image = preprocess.preprocess_for_model(img)  # Shape: (1, 224, 224, 3)
    if image is None:
        raise ValueError("Preprocessing failed")

    # Remove batch dimension for display
    image_to_display = np.squeeze(image, axis=0)  # Shape: (224, 224, 3)

    # Convert preprocessed image back to uint8 for display (from [-1, 1] to [0, 255])
    image_displayable = ((image_to_display + 1.0) * 127.5).astype(np.uint8)
    cv2_imshow(image_displayable)

    # Run inference
    model_infer = ModelInference()
    label, confidence, all_preds = model_infer.predict(image)
    # predict() returns None for the score dict when the model is missing
    # or inference raised.
    if all_preds is None:
        raise RuntimeError("Model is unavailable or inference failed")

    print("✅ Prediction Results:")
    print(f"Predicted class: {label}")
    print(f"Confidence: {confidence:.2f}")
    print("All class probabilities:")
    print("\n📊 Raw Predictions:")
    for cls, prob in all_preds.items():
        bar = "█" * int(prob * 40)  # Visual bar
        print(f"{cls:10}: {prob:.4f} {bar}")

except Exception as e:
    print(f"❌ Error during prediction test: {str(e)}")


# Postprocess

In [None]:
from collections import Counter, defaultdict
import numpy as np


class PredictionPostprocessor:
    """Buffers confident per-frame predictions and reduces them to one result."""

    def __init__(self):
        self.confidence_threshold = Config.CONFIDENCE_THRESHOLD
        self.frame_buffer = []
        self.max_frames = Config.MAX_FRAMES_IN_WINDOW

    def add_prediction(self, prediction, confidence, frame_data):
        """Store a prediction if its confidence reaches the threshold."""
        if not (confidence >= self.confidence_threshold):
            return

        entry = {
            "prediction": prediction,
            "confidence": confidence,
            "frame_data": frame_data,
            "timestamp": frame_data.get("timestamp"),
        }
        self.frame_buffer.append(entry)

    def get_aggregated_result(self):
        """Summarise the buffered predictions.

        Returns:
            ``(result, best_frame)`` where ``result`` describes the most
            frequent prediction and ``best_frame`` is the buffered entry of
            that class with the highest confidence; ``(None, None)`` when
            the buffer is empty.
        """
        buffered = self.frame_buffer
        if not buffered:
            return None, None

        # Tally how often each class was predicted.
        tally = Counter(entry["prediction"] for entry in buffered)
        winner = tally.most_common(1)[0][0]

        # Among the winner's frames, keep the most confident one.
        best_frame = max(
            (entry for entry in buffered if entry["prediction"] == winner),
            key=lambda entry: entry["confidence"],
        )

        total_frames = len(buffered)
        share = (tally[winner] / total_frames) * 100

        result = {
            "final_prediction": winner,
            "confidence": best_frame["confidence"],
            "frame_count": total_frames,
            "prediction_percentage": share,
            "all_predictions": dict(tally),
            "best_frame": best_frame,
        }
        return result, best_frame

    def clear_buffer(self):
        """Drop every buffered prediction."""
        self.frame_buffer.clear()

    def should_send_final_result(self):
        """Return True once the buffer holds at least 80% of ``max_frames``."""
        required = self.max_frames * 0.8
        return len(self.frame_buffer) >= required


In [None]:
import time
from google.colab.patches import cv2_imshow
import cv2
class FrameProcessor:
    """Runs one frame through detection, cropping, inference and aggregation.

    Intermediate images and values are shown with ``cv2_imshow`` / ``print``
    for inspection in the notebook.
    """

    def __init__(self):
        self.hand_detector = HandDetector()
        self.preprocessor = ImagePreprocessor()
        self.model_inference = ModelInference()
        self.postprocessor = PredictionPostprocessor()

    @staticmethod
    def _failure(status, message, timestamp):
        """Build the 4-tuple returned when a frame yields no prediction."""
        return (
            status,
            {"status": status, "message": message, "timestamp": timestamp},
            False,
            None,
        )

    def _build_final_result(self, timestamp):
        """Aggregate the buffer into a final-result dict (or None) and clear it."""
        aggregated_result, best_frame = self.postprocessor.get_aggregated_result()
        if not (aggregated_result and best_frame):
            return None

        # Create final overlay with best frame
        final_overlay = best_frame["frame_data"]["original_image"].copy()
        final_overlay = draw_prediction_overlay(
            final_overlay,
            best_frame["frame_data"]["bbox"],
            aggregated_result["final_prediction"],
            aggregated_result["confidence"],
        )
        final_overlay_base64 = encode_frame_to_base64(final_overlay)

        final_result = {
            "status": "final_result",
            "final_prediction": aggregated_result["final_prediction"],
            "confidence": aggregated_result["confidence"],
            "frame_count": aggregated_result["frame_count"],
            "prediction_percentage": aggregated_result["prediction_percentage"],
            "all_predictions": aggregated_result["all_predictions"],
            "final_overlay_image": final_overlay_base64,
            "timestamp": timestamp,
        }

        # Clear buffer after building the final result so the next window
        # starts empty.
        self.postprocessor.clear_buffer()
        return final_result

    def process_frame(self, image, frame_metadata=None):
        """Process a single frame through the entire pipeline.

        Args:
            image: BGR frame to analyse.
            frame_metadata: Optional dict stored alongside the buffered frame.

        Returns:
            ``(status, real_time_result, should_send_final, final_result)``.
            ``should_send_final`` is True only when the postprocessor reports
            that enough confident frames were buffered; ``final_result`` is
            then the aggregated payload, otherwise ``None``.
        """
        timestamp = time.time()

        # 1. Hand Detection
        hand_status, hand_message, hand_landmarks = self.hand_detector.detect_hands(
            image
        )
        if hand_status != "success":
            return self._failure(hand_status, hand_message, timestamp)

        try:
            # 2. Extract hand ROI, using the configured padding rather than
            #    the helper's built-in default.
            roi_image, bbox = extract_hand_roi(
                image, hand_landmarks, padding=Config.HAND_BBOX_PADDING
            )
            if roi_image is None:
                # Landmarks produced an empty box (e.g. hand outside the frame).
                return self._failure(
                    "error", "Hand region could not be extracted", timestamp
                )
            cv2_imshow(roi_image)
            print("bbox", bbox)

            # 3. Preprocess for model
            preprocessed_roi = self.preprocessor.preprocess_for_model(roi_image)
            if preprocessed_roi is None:
                return self._failure("error", "Preprocessing failed", timestamp)

            # Remove batch dimension for display
            image_to_display = np.squeeze(preprocessed_roi, axis=0)  # Shape: (224, 224, 3)

            # Convert preprocessed image back to uint8 for display (from [-1, 1] to [0, 255])
            image_displayable = ((image_to_display + 1.0) * 127.5).astype(np.uint8)
            cv2_imshow(image_displayable)

            # 4. Run inference
            prediction, confidence, all_predictions = self.model_inference.predict(
                preprocessed_roi
            )
            print(prediction)
            print(confidence)
            print(all_predictions)

            # 5. Create frame data
            frame_data = {
                "timestamp": timestamp,
                "bbox": bbox,
                "original_image": image.copy(),
                "roi": roi_image,
                "metadata": frame_metadata or {},
            }

            # 6. Add to postprocessor buffer (it drops low-confidence frames)
            self.postprocessor.add_prediction(prediction, confidence, frame_data)

            # 7. Create overlay image for real-time feedback
            overlay_image = image.copy()
            overlay_image = draw_prediction_overlay(
                overlay_image, bbox, prediction, confidence
            )
            cv2_imshow(overlay_image)
            overlay_base64 = encode_frame_to_base64(overlay_image)

            # 8. Real-time result
            real_time_result = {
                "status": "success",
                "prediction": prediction,
                "confidence": confidence,
                "all_predictions": all_predictions,
                "overlay_image": overlay_base64,
                "timestamp": timestamp,
                "buffer_size": len(self.postprocessor.frame_buffer),
            }

            # 9. Only aggregate once the postprocessor has buffered enough
            #    frames; a hard-coded True here would emit a one-frame "final"
            #    result and wipe the buffer on every call.
            should_send_final = self.postprocessor.should_send_final_result()
            final_result = None
            if should_send_final:
                final_result = self._build_final_result(timestamp)

            return "success", real_time_result, should_send_final, final_result

        except Exception as e:
            return self._failure("error", f"Processing error: {str(e)}", timestamp)


In [None]:
import cv2

# End-to-end smoke test: push one image through FrameProcessor.
# Image path to test
image_path = "/content/RPSENSE BACKEND/scissor_back.jpg"


# Load image using OpenCV
image = cv2.imread(image_path)
if image is None:
    print(f"❌ Failed to load image from: {image_path}")
else:
    # Initialize FrameProcessor
    processor = FrameProcessor()

    # Process the frame
    status, real_time_result, should_send_final, final_result = processor.process_frame(image)

    # Print the results
    print(f"\n✅ Processing Status: {status}")
    print("🕒 Timestamp:", real_time_result.get("timestamp"))

    if status == "success":
        print("🔮 Prediction:", real_time_result.get("prediction"))
        print("📈 Confidence:", f"{real_time_result.get('confidence'):.2f}")
        print("📊 All Probabilities:", real_time_result.get("all_predictions"))

        # final_result is None when nothing was aggregated (for example the
        # frame's confidence was below the buffering threshold), so it must
        # be checked before it is indexed.
        if should_send_final and final_result is not None:
            print("\n✅ Final Aggregated Result:")
            print("🔁 Final Prediction:", final_result["final_prediction"])
            print("📈 Confidence:", f"{final_result['confidence']:.2f}")
            print("🧠 Prediction Percentage:", final_result["prediction_percentage"])
            print("📦 Frame Count Used:", final_result["frame_count"])
        else:
            print("\nℹ️ No aggregated result available for this frame.")

    else:
        print("⚠️ Failed to process frame:", real_time_result.get("message"))
