# In [1]:
import cv2
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import time
from datetime import datetime
import random
from tqdm import tqdm

# ==================== DATASET COLLECTOR ====================
class DatasetCollector:
    """Generate a synthetic facial-expression dataset on disk.

    Each image is a 48x48 grayscale drawing: a white filled circle (the
    face) on a black background with dark eyes, brows and a mouth whose
    shape depends on the expression. Images are written as JPEG files to
    ``<dataset_dir>/<expression>/<expression>_NNNN.jpg``.

    Args:
        target_images: Number of images to generate per expression.
    """

    def __init__(self, target_images=500):
        # Menu key -> expression name; the values double as sub-directory names.
        self.expressions = {
            '1': 'angry',
            '2': 'disgust',
            '3': 'fear',
            '4': 'happy',
            '5': 'neutral',
            '6': 'sad',
            '7': 'surprise'
        }
        self.dataset_dir = 'dataset'
        self.target_images = target_images
        # NOTE: face_cascade, current_expression, cap and running are not read
        # by any method of this class; they are kept so that existing callers
        # which access these attributes keep working.
        self.face_cascade = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        )
        self.current_expression = 'neutral'
        # Number of images successfully written so far, per expression.
        self.counters = {expr: 0 for expr in self.expressions.values()}
        self.cap = None
        self.running = True

        self.setup_directories()

    def setup_directories(self):
        """Create the dataset root and one sub-directory per expression."""
        print("üìÅ Creating dataset directories...")
        os.makedirs(self.dataset_dir, exist_ok=True)
        for expression in self.expressions.values():
            os.makedirs(os.path.join(self.dataset_dir, expression), exist_ok=True)
        print(f"‚úÖ Created directories for {len(self.expressions)} expressions")

    def capture_dataset(self):
        """Generate and save ``target_images`` images for every expression.

        Raises:
            OSError: If an image cannot be written to disk. Failing fast
                avoids reporting (and counting) images that do not exist.
        """
        print("üöÄ Creating facial expression dataset...")
        print(f"üéØ Target: {self.target_images} images per expression")

        for expression in self.expressions.values():
            print(f"üé≠ Generating {expression} images...")
            expr_dir = os.path.join(self.dataset_dir, expression)
            # Re-create the directory in case it was removed, or dataset_dir
            # was changed, after __init__ ran.
            os.makedirs(expr_dir, exist_ok=True)

            for i in tqdm(range(self.target_images), desc=f"  {expression}"):
                img = self.create_dataset(expression)
                filepath = os.path.join(expr_dir, f"{expression}_{i:04d}.jpg")

                # cv2.imwrite signals failure through its return value, so it
                # has to be checked explicitly.
                if not cv2.imwrite(filepath, img):
                    raise OSError(f"Failed to write image: {filepath}")

                self.counters[expression] += 1

            print(f"‚úÖ {expression}: {self.target_images} images created")

        total_images = sum(self.counters.values())
        print("\nüéâ Dataset created successfully!")
        print(f"üìä Total images: {total_images}")
        print(f"üíæ Location: {self.dataset_dir}/")

    def create_dataset(self, expression):
        """Draw one synthetic face image for ``expression``.

        Every known expression gets its own combination of eyes, brows and
        mouth so that the seven classes are visually distinguishable. Any
        other value is rendered like ``'neutral'``.

        Args:
            expression: Expression name, e.g. ``'happy'``.

        Returns:
            A ``(48, 48)`` ``uint8`` array with brightness and noise variation
            applied by :meth:`add_variations`.
        """
        img_size = 48
        image = np.zeros((img_size, img_size), dtype=np.uint8)
        cx = cy = img_size // 2

        # Face: white filled circle with a slightly randomised radius.
        face_radius = random.randint(18, 22)
        cv2.circle(image, (cx, cy), face_radius, 255, -1)

        # Eyes.
        if expression == 'disgust':
            # Squinted eyes drawn as short horizontal bars.
            cv2.line(image, (cx - 10, cy - 4), (cx - 6, cy - 4), 0, 2)
            cv2.line(image, (cx + 6, cy - 4), (cx + 10, cy - 4), 0, 2)
        else:
            # Wide-open eyes for surprise and fear, regular eyes otherwise.
            eye_radius = 3 if expression in ('surprise', 'fear') else 2
            cv2.circle(image, (cx - 8, cy - 4), eye_radius, 0, -1)
            cv2.circle(image, (cx + 8, cy - 4), eye_radius, 0, -1)

        # Brows and mouth.
        if expression == 'angry':
            # Brows slanting down towards the nose, straight wide mouth.
            cv2.line(image, (cx - 12, cy - 11), (cx - 5, cy - 8), 0, 2)
            cv2.line(image, (cx + 12, cy - 11), (cx + 5, cy - 8), 0, 2)
            cv2.line(image, (cx - 6, cy + 8), (cx + 6, cy + 8), 0, 2)
        elif expression == 'happy':
            # Lower half of an ellipse: a smile.
            cv2.ellipse(image, (cx, cy + 8), (6, 3), 0, 0, 180, 0, 2)
        elif expression == 'sad':
            # Upper half of an ellipse: a frown.
            cv2.ellipse(image, (cx, cy + 10), (5, 2), 0, 180, 360, 0, 2)
        elif expression == 'surprise':
            # Small round open mouth.
            cv2.circle(image, (cx, cy + 8), 3, 0, 2)
        elif expression == 'fear':
            # Raised flat brows and a wide, filled open mouth.
            cv2.line(image, (cx - 11, cy - 10), (cx - 5, cy - 10), 0, 2)
            cv2.line(image, (cx + 5, cy - 10), (cx + 11, cy - 10), 0, 2)
            cv2.ellipse(image, (cx, cy + 9), (6, 3), 0, 0, 360, 0, -1)
        elif expression == 'disgust':
            # Lopsided mouth.
            cv2.line(image, (cx - 6, cy + 10), (cx + 6, cy + 6), 0, 2)
        else:  # neutral (and any unknown expression)
            cv2.line(image, (cx - 5, cy + 8), (cx + 5, cy + 8), 0, 2)

        return self.add_variations(image)

    def add_variations(self, image):
        """Apply a random brightness scale and small additive noise.

        Args:
            image: ``uint8`` grayscale image.

        Returns:
            A new ``uint8`` image of the same shape.
        """
        # Brightness: scale every pixel by a factor in [0.8, 1.2].
        brightness = random.uniform(0.8, 1.2)
        image = cv2.convertScaleAbs(image, alpha=brightness)

        # Noise in [-5, 5]; randint's upper bound is exclusive, hence 6.
        # Work in int16 so the sum cannot wrap around before clipping.
        noise = np.random.randint(-5, 6, image.shape, dtype=np.int16)
        image = np.clip(image.astype(np.int16) + noise, 0, 255).astype(np.uint8)

        return image

# ==================== MODEL TRAINER ====================
class ExpressionTrainer:
    """Build, train and save a small CNN that classifies facial expressions.

    The dataset is expected as one sub-directory per class name below
    ``data_path``, each containing image files.
    """

    # File extensions (compared case-insensitively) treated as dataset images.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def __init__(self):
        self.model = None
        self.img_size = 48
        self.num_classes = 7
        # The index of a name in this list is the integer label of the class.
        self.class_names = ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']

    def create_model(self):
        """Create and compile the CNN.

        Returns:
            A compiled ``keras.Sequential`` model taking
            ``(img_size, img_size, 1)`` inputs and producing ``num_classes``
            softmax probabilities.
        """
        model = keras.Sequential([
            # Explicit Input object instead of the `input_shape` argument on
            # the first layer, which current Keras versions warn about.
            keras.Input(shape=(self.img_size, self.img_size, 1)),
            layers.Conv2D(32, (3, 3), activation='relu'),
            layers.MaxPooling2D(2, 2),
            layers.Conv2D(64, (3, 3), activation='relu'),
            layers.MaxPooling2D(2, 2),
            layers.Conv2D(128, (3, 3), activation='relu'),
            layers.MaxPooling2D(2, 2),
            layers.Flatten(),
            layers.Dense(256, activation='relu'),
            layers.Dropout(0.5),
            layers.Dense(128, activation='relu'),
            layers.Dropout(0.3),
            layers.Dense(self.num_classes, activation='softmax')
        ])

        model.compile(
            optimizer='adam',
            loss='categorical_crossentropy',
            metrics=['accuracy']
        )

        return model

    def load_dataset(self, data_path):
        """Load all class images below ``data_path`` into arrays.

        Class entries that are missing or are not directories are reported
        and skipped; image files that cannot be decoded are skipped.

        Args:
            data_path: Directory containing one sub-directory per class name.

        Returns:
            ``(X, y)`` where ``X`` is a float32 array of shape
            ``(n, img_size, img_size, 1)`` scaled to [0, 1] and ``y`` is the
            one-hot label array of shape ``(n, num_classes)``; or
            ``(None, None)`` when no image could be loaded.
        """
        print("üìä Loading dataset...")

        images = []
        labels = []

        for class_idx, expression in enumerate(self.class_names):
            expr_path = os.path.join(data_path, expression)
            # isdir (not exists): a regular file with the class name would
            # otherwise make os.listdir raise.
            if not os.path.isdir(expr_path):
                print(f"‚ùå Directory not found: {expr_path}")
                continue

            # os.listdir order is arbitrary; sorting keeps the sample order,
            # and therefore the seeded train/validation split, reproducible.
            image_files = sorted(
                name for name in os.listdir(expr_path)
                if name.lower().endswith(self.IMAGE_EXTENSIONS)
            )
            print(f"üìÅ {expression}: {len(image_files)} images")

            for img_file in image_files:
                img = cv2.imread(os.path.join(expr_path, img_file), cv2.IMREAD_GRAYSCALE)
                if img is None:
                    # cv2.imread returns None for unreadable files.
                    continue

                img = cv2.resize(img, (self.img_size, self.img_size))
                images.append(img.astype('float32') / 255.0)
                labels.append(class_idx)

        if not images:
            print("‚ùå No images found in dataset!")
            return None, None

        # Add the trailing channel axis expected by Conv2D.
        X = np.array(images).reshape(-1, self.img_size, self.img_size, 1)
        y = keras.utils.to_categorical(np.array(labels), self.num_classes)

        print(f"‚úÖ Dataset loaded: {X.shape[0]} images")
        return X, y

    def train(self, data_path, epochs=20, model_path='models/expression_model.h5'):
        """Train the model on the dataset and save it.

        Args:
            data_path: Dataset root passed to :meth:`load_dataset`.
            epochs: Maximum number of training epochs (early stopping may end
                training sooner).
            model_path: Where the trained model is saved. The default is the
                path ``RealTimeDetector.load_model`` reads from.

        Returns:
            ``True`` when training finished and the model was saved,
            ``False`` when no dataset could be loaded.
        """
        print("üß† Starting model training...")

        X, y = self.load_dataset(data_path)
        if X is None:
            return False

        # Imported here because scikit-learn is only needed for training.
        from sklearn.model_selection import train_test_split
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        print(f"üìä Training samples: {X_train.shape[0]}")
        print(f"üìä Validation samples: {X_test.shape[0]}")

        self.model = self.create_model()

        print("üìã Model architecture:")
        self.model.summary()

        callbacks = [
            keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True),
            keras.callbacks.ReduceLROnPlateau(patience=3, factor=0.5)
        ]

        print("üöÄ Training model...")
        self.model.fit(
            X_train, y_train,
            epochs=epochs,
            validation_data=(X_test, y_test),
            batch_size=32,
            callbacks=callbacks,
            verbose=1
        )

        # os.makedirs('') raises, so only create a directory when the path
        # actually has one.
        model_dir = os.path.dirname(model_path)
        if model_dir:
            os.makedirs(model_dir, exist_ok=True)
        self.model.save(model_path)
        print(f"‚úÖ Model saved as: {model_path}")

        # The held-out split doubles as the validation set used during fit.
        test_loss, test_accuracy = self.model.evaluate(X_test, y_test, verbose=0)
        print(f"üìä Final Validation Accuracy: {test_accuracy:.4f}")
        print(f"üìä Final Validation Loss: {test_loss:.4f}")

        return True

# ==================== REAL-TIME DETECTOR ====================
class RealTimeDetector:
    """Detect faces in the webcam stream and label each with an expression.

    Faces are located with OpenCV's frontal-face Haar cascade and classified
    with the Keras model stored at ``models/expression_model.h5``.
    """

    def __init__(self):
        self.face_cascade = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        )
        self.expression_model = None
        self.img_size = 48
        # Index in this list == class index predicted by the model.
        self.class_names = ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']

        # Colors for expressions (OpenCV channel order: blue, green, red)
        self.expression_colors = {
            'angry': (0, 0, 255),        # Red
            'disgust': (0, 128, 0),      # Dark Green
            'fear': (128, 0, 128),       # Purple
            'happy': (0, 255, 255),      # Yellow
            'neutral': (255, 255, 255),  # White
            'sad': (255, 0, 0),          # Blue
            'surprise': (0, 165, 255)    # Orange
        }

        self.load_model()

    def load_model(self):
        """Load the trained expression model from disk.

        Returns:
            ``True`` when the model was loaded into ``expression_model``,
            ``False`` when the file is missing or could not be loaded.
        """
        model_path = 'models/expression_model.h5'
        if not os.path.exists(model_path):
            print("‚ùå No trained model found. Please train the model first.")
            return False

        try:
            self.expression_model = keras.models.load_model(model_path)
        except Exception as e:
            # Report the real cause and stop here; the file exists, so the
            # "no trained model found" message would be misleading.
            print(f"‚ùå Error loading model: {e}")
            return False

        print("‚úÖ Facial expression model loaded successfully!")
        return True

    def _preprocess_face(self, face_roi):
        """Resize and scale a grayscale face crop to one model input sample."""
        resized = cv2.resize(face_roi, (self.img_size, self.img_size))
        scaled = resized.astype('float32') / 255.0
        return scaled.reshape(self.img_size, self.img_size, 1)

    def _annotate_face(self, frame, box, probabilities):
        """Draw the bounding box and predicted label for one face on ``frame``."""
        x, y, w, h = (int(v) for v in box)
        predicted_class = int(np.argmax(probabilities))
        confidence = float(probabilities[predicted_class])
        expression = self.class_names[predicted_class]
        color = self.expression_colors.get(expression, (255, 255, 255))

        cv2.rectangle(frame, (x, y), (x + w, y + h), color, 3)

        label = f"{expression} ({confidence:.2f})"
        # Keep the label inside the frame for faces touching the top edge.
        cv2.putText(frame, label, (x, max(y - 10, 20)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

    def detect_expressions(self):
        """Run the webcam detection loop until the user quits.

        Shows the mirrored camera image with a labelled box per detected
        face. Pressing ``q`` ends the loop and ``s`` saves the current
        annotated frame as ``detection_<timestamp>.jpg`` in the working
        directory. Returns ``None``; does nothing when no model is loaded or
        the camera cannot be opened.
        """
        if self.expression_model is None:
            print("‚ùå Cannot start detection without a trained model.")
            return

        cap = cv2.VideoCapture(0)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

        if not cap.isOpened():
            # Release the handle on this early exit as well.
            cap.release()
            print("‚ùå Error: Could not open camera")
            return

        print("üöÄ Starting real-time facial expression detection...")
        print("üéÆ Press 'Q' to quit, 'S' to save image")

        # perf_counter is monotonic, so the frame interval is never negative.
        prev_time = time.perf_counter()
        fps = 0.0

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                now = time.perf_counter()
                elapsed = now - prev_time
                prev_time = now
                # Keep the previous reading if two frames share a timestamp
                # instead of dividing by zero.
                if elapsed > 0:
                    fps = 1.0 / elapsed

                # Flip frame for mirror effect
                frame = cv2.flip(frame, 1)
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

                faces = self.face_cascade.detectMultiScale(gray, 1.3, 5)

                # len() works for both the empty result and the box array.
                if len(faces) > 0:
                    # Classify all faces of the frame in one predict call
                    # instead of one call per face.
                    batch = np.stack([
                        self._preprocess_face(gray[y:y + h, x:x + w])
                        for (x, y, w, h) in faces
                    ])
                    predictions = self.expression_model.predict(batch, verbose=0)
                    for box, probabilities in zip(faces, predictions):
                        self._annotate_face(frame, box, probabilities)

                cv2.putText(frame, f"FPS: {fps:.1f}", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
                cv2.putText(frame, "Press 'Q' to quit", (10, 60),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1)

                cv2.imshow('Real-time Facial Expression Detection', frame)

                # Accept both cases: the on-screen hint shows capital letters
                # and Caps Lock may be on.
                key = cv2.waitKey(1) & 0xFF
                if key in (ord('q'), ord('Q')):
                    break
                if key in (ord('s'), ord('S')):
                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    filename = f"detection_{timestamp}.jpg"
                    cv2.imwrite(filename, frame)
                    print(f"üíæ Saved: {filename}")

        except KeyboardInterrupt:
            print("\n‚èπÔ∏è Detection stopped by user")

        finally:
            cap.release()
            cv2.destroyAllWindows()
            print("‚úÖ Real-time detection stopped.")

# ==================== MAIN PIPELINE ====================
def main():
    """Run the interactive console menu until the user chooses to exit.

    Options: 1 builds the synthetic dataset, 2 trains the model, 3 starts
    real-time detection, 4 runs all three steps in sequence, 0 exits. Any
    exception raised by an option is printed and the menu is shown again.
    """
    print("üé≠ COMPLETE FACIAL EXPRESSION RECOGNITION SYSTEM")
    print("=" * 60)

    # Working directories used by the dataset and model steps.
    for folder in ('dataset', 'models'):
        os.makedirs(folder, exist_ok=True)

    def build_dataset():
        # Option 1: ask for the per-expression image count, then generate.
        print("\nüöÄ Creating synthetic dataset...")
        try:
            answer = input("Enter images per expression (default 500): ")
            DatasetCollector(target_images=int(answer or "500")).capture_dataset()
        except Exception as e:
            print(f"‚ùå Error: {e}")

    def train_model():
        # Option 2: train on the 'dataset' directory.
        print("\nüöÄ Training model...")
        try:
            ExpressionTrainer().train('dataset', epochs=2)
        except Exception as e:
            print(f"‚ùå Error: {e}")

    def run_detection():
        # Option 3: webcam detection with the saved model.
        print("\nüöÄ Starting real-time detection...")
        try:
            RealTimeDetector().detect_expressions()
        except Exception as e:
            print(f"‚ùå Error: {e}")

    def run_pipeline():
        # Option 4: dataset -> training -> detection, stopping if training fails.
        print("\nüöÄ RUNNING COMPLETE PIPELINE...")
        try:
            print("\nüìù Step 1: Creating Dataset")
            DatasetCollector(target_images=500).capture_dataset()

            print("\nüìù Step 2: Training Model")
            trained = ExpressionTrainer().train('dataset', epochs=10)

            if not trained:
                print("‚ùå Model training failed. Cannot start detection.")
            else:
                print("\nüìù Step 3: Real-time Detection")
                RealTimeDetector().detect_expressions()
        except Exception as e:
            print(f"‚ùå Pipeline error: {e}")

    actions = {
        '1': build_dataset,
        '2': train_model,
        '3': run_detection,
        '4': run_pipeline,
    }
    menu_lines = (
        "\nüìã MAIN MENU:",
        "1. üé® Create Dataset (Synthetic - Instant)",
        "2. üß† Train Model",
        "3. üîç Real-time Detection",
        "4. üöÄ Run Complete Pipeline",
        "0. ‚ùå Exit",
    )

    while True:
        for line in menu_lines:
            print(line)

        choice = input("\nEnter your choice (0-4): ").strip()

        if choice == '0':
            print("üëã Goodbye!")
            return

        action = actions.get(choice)
        if action is None:
            print("‚ùå Invalid choice. Please try again.")
        else:
            action()

# Start the interactive menu only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

# --- Recorded console output of a sample run (choices: 3, then 0) ---
# üé≠ COMPLETE FACIAL EXPRESSION RECOGNITION SYSTEM
#
# üìã MAIN MENU:
# 1. üé® Create Dataset (Synthetic - Instant)
# 2. üß† Train Model
# 3. üîç Real-time Detection
# 4. üöÄ Run Complete Pipeline
# 0. ‚ùå Exit
#
# Enter your choice (0-4): 3
#
# üöÄ Starting real-time detection...
#
#
#
#
# ‚úÖ Facial expression model loaded successfully!
# üöÄ Starting real-time facial expression detection...
# üéÆ Press 'Q' to quit, 'S' to save image
# ‚úÖ Real-time detection stopped.
#
# üìã MAIN MENU:
# 1. üé® Create Dataset (Synthetic - Instant)
# 2. üß† Train Model
# 3. üîç Real-time Detection
# 4. üöÄ Run Complete Pipeline
# 0. ‚ùå Exit
#
# Enter your choice (0-4): 0
# üëã Goodbye!
