In [8]:
import os
import cv2
import pickle
from datetime import datetime
import mysql.connector
from deepface import DeepFace
from imgaug import augmenters as iaa
import numpy as np
from collections import Counter
import time

class DatabaseManager:
    def __init__(self, db):
        self.db = db

    def upload_student_db(self, db_folder='db'):
        if not os.path.exists(db_folder):
            os.makedirs(db_folder)
        # Additional code to upload student images to db_folder
        return {"status": "success", "message": "Student database uploaded"}


In [9]:
class Trainer:
    def __init__(self, augment=False, save_augments=False, n_samples=2):
        self.augment = augment
        self.save_augments = save_augments
        self.n_samples = n_samples

    def load_encodings(self):
        if os.path.exists('known_encodings.pkl'):
            with open('known_encodings.pkl', 'rb') as f:
                self.results = pickle.load(f)
                self.known_encodings = [result[1] for result in self.results]
                self.file_paths = [result[0] for result in self.results]
        else:
            self.results = []
            self.known_encodings = []
            self.file_paths = []

    def augment_image(self, img):
        seq = iaa.Sequential([
            iaa.Affine(rotate=(-15, 15)),  # Random rotation
            iaa.Fliplr(0.5),  # Horizontal flip
            iaa.MultiplyBrightness((0.4, 1.5)),  # Random brightness
            iaa.GammaContrast((0.5, 1.5)),  # Gamma contrast
            iaa.Affine(scale={"x": (0.9, 1.1), "y": (0.9, 1.1)}),  # Random scaling
            iaa.Affine(shear=(-7, 7))  # Random shear
        ])
        if self.augment:
            augmented_img = seq.augment_image(img)
            return augmented_img
        else:
            return img

    def process_folder(self, student_folder):
        student_id = os.path.basename(student_folder)
        image_files = [os.path.join(student_folder, file) for file in os.listdir(student_folder) if file.lower().endswith(('.png', '.jpg', '.jpeg'))]
        encodings = []
        for img_path in image_files:
            img = cv2.imread(img_path)
            if self.augment:
                augmented_images = [img]  # Include original image
                for _ in range(self.n_samples):  # Generate additional samples
                    augmented_images.append(self.augment_image(img))
            else:
                augmented_images = [img]

            for idx, augmented_img in enumerate(augmented_images):
                encoding = DeepFace.represent(augmented_img, model_name='Facenet', enforce_detection=False)
                if encoding:
                    encodings.append((img_path, encoding[0]['embedding']))
                    if self.save_augments and self.augment:  # Save augmented images if enabled
                        filename = os.path.splitext(os.path.basename(img_path))[0]
                        augmented_filename = f"augmented_{filename}_{idx}.jpg"
                        cv2.imwrite(os.path.join(student_folder, augmented_filename), augmented_img)

        return encodings

    def train(self, db_folder='db'):
        student_folders = [os.path.join(db_folder, folder) for folder in os.listdir(db_folder) if os.path.isdir(os.path.join(db_folder, folder))]

        self.results = []
        for student_folder in student_folders:
            self.results.extend(self.process_folder(student_folder))

        # Save computed embeddings and corresponding file paths
        with open('known_encodings.pkl', 'wb') as f:
            pickle.dump(self.results, f)

        self.load_encodings()
        return {"status": "success", "message": "Training completed"}


In [10]:
class Recognizer:
    def __init__(self, db, semester_id, course_id, lecture_id, status, confidence_threshold=0.4, buffer_size=5):
        self.db = db
        self.semester_id = semester_id
        self.course_id = course_id
        self.lecture_id = lecture_id
        self.status = status
        self.confidence_threshold = confidence_threshold
        self.buffer_size = buffer_size
        self.predictions_buffer = []
        self.last_marked_time = {}
        self.mark_interval = 300  # 5 minutes interval
        self.load_encodings()

    def load_encodings(self):
        if os.path.exists('known_encodings.pkl'):
            with open('known_encodings.pkl', 'rb') as f:
                self.results = pickle.load(f)
                self.known_encodings = [result[1] for result in self.results]
                self.file_paths = [result[0] for result in self.results]
        else:
            self.results = []
            self.known_encodings = []
            self.file_paths = []

    def mark_attendance(self, student_id):
        try:
            current_time = datetime.now()
            if student_id in self.last_marked_time:
                time_diff = (current_time - self.last_marked_time[student_id]).total_seconds()
                if time_diff < self.mark_interval:
                    return  # Skip marking if within the interval

            cursor = self.db.cursor()
            sql = "SELECT fullname FROM student_details WHERE student_id = %s"
            val = (student_id,)
            cursor.execute(sql, val)
            result = cursor.fetchone()
            if result:
                student_name = result[0]
                sql = "SELECT * FROM attendance WHERE student_id = %s AND course_id = %s AND semester_id = %s AND lecture_id = %s"
                val = (student_id, self.course_id, self.semester_id, self.lecture_id)
                cursor.execute(sql, val)
                result = cursor.fetchone()
                if not result:
                    sql = "INSERT INTO attendance (course_id, semester_id, lecture_id, timestamp, status, student_id) VALUES (%s, %s, %s, %s, %s, %s)"
                    val = (self.course_id, self.semester_id, self.lecture_id, current_time, self.status, student_id)
                    cursor.execute(sql, val)
                    self.db.commit()
                    self.last_marked_time[student_id] = current_time
            cursor.close()
        except mysql.connector.Error as err:
            print(f"Database error: {err}")

    def recognize(self, video_source=0):
        start_time = time.time()
        fps = 0
        frames = 0
        cap = cv2.VideoCapture(video_source)
        face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        consecutive_detections = 0

        while True:
            ret, img = cap.read()
            if not ret:
                break
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(gray, 1.3, 5)
            for (x, y, w, h) in faces:
                face_img = img[y:y + h, x:x + w]
                face_encoding = DeepFace.represent(face_img, model_name='Facenet', enforce_detection=False, detector_backend='mtcnn')
                if face_encoding:
                    face_encoding = face_encoding[0]['embedding']
                    best_match_id = None
                    confidence = 0.0

                    for j, known_encoding in enumerate(self.known_encodings):
                        if len(known_encoding) == len(face_encoding):
                            result = DeepFace.verify(known_encoding, face_encoding, model_name='Facenet', enforce_detection=False, distance_metric='cosine')
                            if result['verified'] and result['distance'] < self.confidence_threshold:
                                best_match_id = os.path.basename(os.path.dirname(self.file_paths[j]))
                                confidence = 1 - result['distance']
                                break

                    if best_match_id:
                        self.predictions_buffer.append(best_match_id)
                        consecutive_detections += 1

                        if consecutive_detections >= self.buffer_size:
                            most_common_prediction = Counter(self.predictions_buffer).most_common(1)[0][0]
                            cursor = self.db.cursor()
                            sql = "SELECT fullname FROM student_details WHERE student_id = %s"
                            val = (most_common_prediction,)
                            cursor.execute(sql, val)
                            result = cursor.fetchone()
                            if result:
                                student_name = result[0]
                            self.mark_attendance(most_common_prediction)
                            consecutive_detections = 0
                            self.predictions_buffer.clear()
                            cv2.putText(img, f"Recognized: {student_name}", (x, y + h + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

                    cv2.rectangle(img, (x, y), (x + w, y + h), (0, 255, 0), 2)
                    cv2.putText(img, "Face Detected", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
                else:
                    cv2.rectangle(img, (x, y), (x + w, y + h), (0, 0, 255), 2)
                    cv2.putText(img, "Face Not Recognized", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)
            frames += 1
            current_time = time.time()
            if current_time - start_time >= 1:
                fps = frames
                frames = 0
                start_time = current_time
            cv2.putText(img, f"FPS: {fps}", (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 2)
            cv2.imshow("Faces with Rectangles", img)

            if cv2.waitKey(1) == ord('q'):
                break
        cap.release()
        cv2.destroyAllWindows()

In [11]:
import mysql.connector

# Configure your database connection here
db_config = {
    'user': 'root',
    'password': 'e@123321xzX',
    'host': '127.0.0.1',
    'database': 'attendance'
}
db = mysql.connector.connect(**db_config)

# Initialize classes
db_manager = DatabaseManager(db)
trainer = Trainer(augment=True, save_augments=True, n_samples=5)
recognizer = Recognizer(db, semester_id='1', course_id='1', lecture_id='1', status='present')

In [12]:
# Upload student database
upload_result = db_manager.upload_student_db()
print(upload_result)

# Train the model
train_result = trainer.train()
print(train_result)

# Recognize faces and mark attendance
recognizer.recognize(video_source=0)

{'status': 'success', 'message': 'Student database uploaded'}
{'status': 'success', 'message': 'Training completed'}
24-05-24 01:44:53 - ⚠️ You passed 1st image as pre-calculated embeddings.Please ensure that embeddings have been calculated for the Facenet model.
24-05-24 01:44:53 - ⚠️ You passed 2nd image as pre-calculated embeddings.Please ensure that embeddings have been calculated for the Facenet model.
24-05-24 01:44:53 - ⚠️ You passed 1st image as pre-calculated embeddings.Please ensure that embeddings have been calculated for the Facenet model.
24-05-24 01:44:53 - ⚠️ You passed 2nd image as pre-calculated embeddings.Please ensure that embeddings have been calculated for the Facenet model.
24-05-24 01:44:53 - ⚠️ You passed 1st image as pre-calculated embeddings.Please ensure that embeddings have been calculated for the Facenet model.
24-05-24 01:44:53 - ⚠️ You passed 2nd image as pre-calculated embeddings.Please ensure that embeddings have been calculated for the Facenet model.
2