In [1]:
# %pip install websockets --user
# %pip install torch torchvision torchaudio --user
# %pip install facenet-pytorch --user
# %pip install matplotlib --user
# %pip install scikit-learn --user
# %pip install numpy --user
# %pip install opencv-python --user
# %pip install google-auth gspread --user

In [11]:
import os
import gspread
from google.oauth2.service_account import Credentials
import asyncio
import websockets
import json
import base64
import io as BytesIO
import numpy as np
import cv2 as cv
from datetime import datetime
from sklearn.svm import SVC
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import pickle
from facenet_pytorch import InceptionResnetV1, MTCNN



In [3]:
import torch

In [4]:

# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# MTCNN face detector, created on the selected device.
detector = MTCNN(device=device, keep_all=False)
# FaceNet (InceptionResnetV1) with the pretrained 'vggface2' weights, put into
# evaluation mode.
embedder = InceptionResnetV1(pretrained='vggface2')
embedder = embedder.eval()

In [13]:
def base64_to_opencv_image(base64_str):
    """Decode a base64-encoded picture into an OpenCV image.

    Accepts either a data URI (``data:image/jpeg;base64,<payload>``) or a bare
    base64 payload without any prefix.

    Args:
        base64_str: The encoded image as text.

    Returns:
        The array returned by ``cv.imdecode(..., cv.IMREAD_COLOR)``, or ``None``
        when the input cannot be decoded (the reason is printed).
    """
    try:
        # A data URI carries its payload after the first comma; a bare payload
        # contains no comma at all, so use the whole string in that case.
        _, separator, payload = base64_str.partition(',')
        if not separator:
            payload = base64_str
        img_data = base64.b64decode(payload)
        # Convert bytes to numpy array
        np_array = np.frombuffer(img_data, np.uint8)
        # Decode numpy array to image
        image = cv.imdecode(np_array, cv.IMREAD_COLOR)
        if image is None:
            raise ValueError("Could not decode image.")
        return image
    except Exception as e:
        # Best effort by design: callers get None instead of an exception.
        print(f"Error in base64_to_opencv_image: {e}")
        return None


def extract_date_time(iso_string):
    """Split an ISO 8601 timestamp into a date string and a time string.

    Trailing 'Z' characters are stripped before the text is parsed with
    ``datetime.fromisoformat``.

    Args:
        iso_string: Timestamp text, e.g. ``"2024-03-05T14:07:09.123Z"``.

    Returns:
        A ``(date, time)`` tuple formatted as ``dd/mm/yy`` and ``hh:mm:ss``.

    Raises:
        ValueError: If the text cannot be parsed.
    """
    trimmed = iso_string.rstrip('Z')

    try:
        parsed = datetime.fromisoformat(trimmed)
    except ValueError as e:
        raise ValueError("Invalid ISO 8601 format") from e

    return parsed.strftime("%d/%m/%y"), parsed.strftime("%H:%M:%S")


def addName(newInfo, sheet):
    """Write ``newInfo`` into the first empty cell at the bottom of column A.

    Args:
        newInfo: Value to store in the new cell.
        sheet: Worksheet-like object providing ``col_values(col)`` and
            ``update(range_name=..., values=...)``.

    Errors raised by the update call are printed, not propagated.
    """
    # Find the last row with data in column A
    column_values = sheet.col_values(1)  # Fetch all values in column A
    last_row = len(column_values) + 1  # Index of the next free row

    # Data to be added (a 1x1 grid)
    new_data = [[newInfo]]

    cell_range = f'A{last_row}'  # Adjust the column letter if needed
    try:
        # Pass both arguments by keyword: the positional order of `update`
        # differs between gspread releases, keywords are unambiguous.
        sheet.update(range_name=cell_range, values=new_data)
        print(f"Added data to row {last_row} in column A.")
    except Exception as e:
        print(f"Error updating Google Sheet: {e}")


In [6]:

class FACELOADING:
    """Read an image from disk and crop every face MTCNN detects in it.

    Args:
        target_size: Size passed to ``cv.resize``; applied to the whole image
            before detection and to every face crop that is returned.
    """

    def __init__(self, target_size=(160, 160)):
        self.target_size = target_size
        self.detector = MTCNN()

    def resize_image(self, img):
        """Return ``img`` resized to ``self.target_size``."""
        return cv.resize(img, self.target_size)

    def extract_face(self, filename):
        """Detect faces in the image file and return them as resized crops.

        Args:
            filename: Path of the image to read with ``cv.imread``.

        Returns:
            A list of RGB face crops, each resized to ``self.target_size``, or
            ``None`` when no usable face is found.

        Raises:
            FileNotFoundError: If ``cv.imread`` cannot read the file.
        """
        img = cv.imread(filename)
        if img is None:
            # cv.imread signals a missing/unreadable file by returning None.
            raise FileNotFoundError(f"Could not read image: {filename}")
        img = cv.cvtColor(img, cv.COLOR_BGR2RGB)

        # Resize the image before face detection
        # NOTE(review): this ignores the aspect ratio of the input - confirm
        # that the distortion is acceptable for detection.
        img = self.resize_image(img)

        # detect() returns a tuple whose first item holds the boxes; that item
        # is None when nothing was found, so the tuple itself is never empty.
        results = self.detector.detect(img, landmarks=False)
        boxes = results[0] if results else None
        if boxes is None:
            return None

        height, width = img.shape[:2]
        faces = []
        for box in boxes:
            x1, y1, x2, y2 = [int(num) for num in box]
            # Boxes may reach outside the frame; clamp them so a negative
            # coordinate is not read as a from-the-end index.
            x1, y1 = max(x1, 0), max(y1, 0)
            x2, y2 = min(x2, width), min(y2, height)
            if x2 <= x1 or y2 <= y1:
                continue  # nothing left to crop
            faces.append(cv.resize(img[y1:y2, x1:x2], self.target_size))

        return faces or None

    def process_image(self, filename):
        """Return the face crops for ``filename``, or ``None`` (with a printed
        notice) when no face is detected."""
        faces = self.extract_face(filename)
        if faces is not None:
            return faces
        print("No face detected.")
        return None


In [7]:
# Load the pretrained FaceNet model once and expose it under both names used
# in this notebook, instead of loading the same weights twice.
embedder = InceptionResnetV1(pretrained='vggface2').eval()
# NOTE(review): this rebinds `detector` from the earlier setup cell, this time
# without a device argument - confirm that is intended.
detector = MTCNN(keep_all=False)  # Initialize MTCNN for face detection
model = embedder

def get_embedding(face_img):
    """Compute the FaceNet embedding of one face image.

    The network is created on the first call and cached on the function
    object, so later calls reuse it instead of reloading the weights.

    Args:
        face_img: Image array laid out as (height, width, channels). It is
            passed through a ``cv.COLOR_BGR2RGB`` conversion before use.

    Returns:
        The network output as a NumPy array with singleton dimensions removed.
    """
    net = getattr(get_embedding, "_net", None)
    if net is None:
        net = InceptionResnetV1(pretrained='vggface2').eval()
        get_embedding._net = net

    # NOTE(review): FACELOADING.extract_face already returns RGB crops, so
    # images coming from it get their channels swapped a second time here.
    # Confirm which channel order the stored embeddings were built with before
    # changing this line.
    face_img = cv.cvtColor(face_img, cv.COLOR_BGR2RGB)  # Convert BGR to RGB
    face_img = torch.tensor(face_img).float()  # Convert to PyTorch tensor
    face_img = face_img.permute(2, 0, 1)  # Change dimensions to (C, H, W)
    face_img = face_img.unsqueeze(0)  # Add batch dimension
    face_img = (face_img - 127.5) / 128.0  # Normalize image

    with torch.no_grad():
        embedding = net(face_img)  # Get the embedding
    return embedding.squeeze().numpy()  # Convert to NumPy array

def update_npz(npz_file, new_embeddings, new_labels):
    """Append embeddings and their labels to an ``.npz`` archive.

    The archive stores two arrays, ``embeddings`` and ``labels``. It is created
    when it does not exist yet.

    Args:
        npz_file: Path of the archive.
        new_embeddings: One embedding (1-D) or a batch of embeddings (2-D, one
            per row).
        new_labels: One label per embedding, or a single label that is applied
            to every new embedding.

    Raises:
        ValueError: If the number of labels does not match the number of
            embeddings.
    """
    new_embeddings = np.asarray(new_embeddings)
    if new_embeddings.size == 0:
        return  # nothing to append
    new_embeddings = np.atleast_2d(new_embeddings)

    new_labels = np.atleast_1d(np.asarray(new_labels))
    if new_labels.shape[0] == 1 and new_embeddings.shape[0] > 1:
        new_labels = np.repeat(new_labels, new_embeddings.shape[0])
    if new_labels.shape[0] != new_embeddings.shape[0]:
        raise ValueError(
            f"Got {new_labels.shape[0]} labels for {new_embeddings.shape[0]} embeddings."
        )

    # Empty placeholders typed like the new data, so a fresh archive keeps the
    # incoming dtypes instead of widening to float64 / object.
    existing_embeddings = np.empty((0, new_embeddings.shape[1]), dtype=new_embeddings.dtype)
    existing_labels = np.empty((0,), dtype=new_labels.dtype)

    if os.path.exists(npz_file):
        # Close the archive before it is overwritten below; an open handle on
        # the same path can make the write fail.
        with np.load(npz_file, allow_pickle=True) as data:
            if 'embeddings' in data.files:
                existing_embeddings = data['embeddings']
            if 'labels' in data.files:
                existing_labels = data['labels']

    # Append new data
    updated_embeddings = np.concatenate((existing_embeddings, new_embeddings), axis=0)
    updated_labels = np.concatenate((existing_labels, new_labels), axis=0)

    # Save the updated arrays back into the .npz file
    np.savez_compressed(npz_file, embeddings=updated_embeddings, labels=updated_labels)

def retrain_model(npz_file, model_file):
    """Train a linear SVM on the stored embeddings and pickle it.

    Args:
        npz_file: Archive holding the ``embeddings`` and ``labels`` arrays.
        model_file: Path the trained classifier is pickled to.

    Raises:
        ValueError: If the archive holds no embeddings or no labels.
    """
    # Read both arrays inside a context manager so the archive is closed again
    # before anything else touches the file.
    with np.load(npz_file, allow_pickle=True) as data:
        embeddings = data['embeddings']
        labels = data['labels']

    # Check if data is valid
    if embeddings.size == 0 or labels.size == 0:
        raise ValueError("Loaded data is empty. Ensure the .npz file contains embeddings and labels.")

    # Encode labels
    encoder = LabelEncoder()
    encoded_labels = encoder.fit_transform(labels)

    # Train the SVM model. Named `classifier` so it does not shadow the
    # notebook-level `model`.
    classifier = SVC(kernel='linear', probability=True)  # You can adjust the kernel and parameters
    classifier.fit(embeddings, encoded_labels)

    # Save the trained model
    with open(model_file, 'wb') as f:
        pickle.dump(classifier, f)

    print(f"Model retrained and saved to {model_file}")

def recognise(pkl_file, np_file, embeddings):
    """Predict the stored label for each embedding.

    Args:
        pkl_file: Path of the pickled classifier.
        np_file: Archive whose ``labels`` array the classifier was trained on.
        embeddings: One embedding (1-D) or a batch (2-D, one per row).

    Returns:
        Array with one predicted label per embedding.
    """
    # Only the labels are needed here. allow_pickle matches how the other
    # functions in this notebook load the same archive.
    with np.load(np_file, allow_pickle=True) as faces_embeddings:
        Y = faces_embeddings['labels']

    # SECURITY: pickle.load can execute arbitrary code from the file; only load
    # classifier files produced by this notebook.
    with open(pkl_file, 'rb') as f:
        model = pickle.load(f)

    # Fit the label encoder on existing labels
    encoder = LabelEncoder()
    encoder.fit(Y)

    # predict() needs one row per sample, so promote a single embedding.
    embeddings = np.atleast_2d(np.asarray(embeddings))
    predictions = model.predict(embeddings)

    # Convert predictions to class names
    final_names = encoder.inverse_transform(predictions)

    print(f"Predicted labels: {predictions}")
    print(f"Final names: {final_names}")

    return final_names

In [8]:
# Google Sheets API authentication

# OAuth scopes requested for the service account: Sheets and Drive.
scopes = [
    "https://www.googleapis.com/auth/spreadsheets",
    "https://www.googleapis.com/auth/drive",
]

# Key of the spreadsheet this notebook works with.
sheet_id = "1nzL6dv1ue9dwXdosWSLAeos_dPSttadJOiL9G8DgU80"

# Build credentials from the service-account key file, then authorise a
# gspread client with them.
creds = Credentials.from_service_account_file("proj-sheets.json", scopes=scopes)
client = gspread.authorize(creds)

# Open the spreadsheet by key.
sheets = client.open_by_key(sheet_id)

In [9]:
# NOTE(review): a bare `break` outside any loop is a SyntaxError, so this cell
# always fails. Presumably it is a deliberate stopper that keeps "Run All" from
# reaching the server cell further down - confirm, and prefer a clearer
# mechanism if so.
break

SyntaxError: 'break' outside loop (668683560.py, line 1)

In [14]:
def _load_lines(path):
    """Return the lines of the text file at ``path``, or an empty list when the
    file does not exist yet (first run, before anything was saved)."""
    if not os.path.exists(path):
        return []
    with open(path, 'r') as fh:
        return fh.read().splitlines()


# Parallel lists: courses[i] is the course of names[i].
names = _load_lines('names.txt')
courses = _load_lines('courses.txt')


_EMBEDDINGS_FILE = 'faces_embeddings.npz'
# NOTE(review): recognition used to read 'svm_model.pkl' while retraining wrote
# 'model.pkl'. recognise() maps predictions through labels re-read from the
# embeddings archive, so the classifier must be the one trained on that
# archive. Both steps now share this single path - confirm the file name.
_CLASSIFIER_FILE = 'svm_model.pkl'
_CAPTURE_FILE = 'captured_image.jpg'
_DATASET_DIR = 'dataset'


def _is_safe_name(name):
    """Tell whether ``name`` can be used as a folder/file name and list entry."""
    if not isinstance(name, str) or not name.strip() or name in ('.', '..'):
        return False
    # Path separators would escape the dataset folder; line breaks would
    # corrupt the one-name-per-line text files.
    return not any(ch in name for ch in ('/', '\\', '\n', '\r'))


def _embed_faces(image_path):
    """Return one embedding per detected face (2-D array), or None if no face."""
    faces = FACELOADING().process_image(image_path)
    if not faces:
        return None
    return np.asarray([get_embedding(face) for face in faces])


def _write_lines(path, lines):
    """Write ``lines`` to ``path``, one entry per line."""
    with open(path, 'w') as fh:
        fh.write(''.join(f'{line}\n' for line in lines))


def _mark_present(data):
    """Handle a 'photo' message: recognise the face and mark it 'Present'."""
    image = base64_to_opencv_image(data['image'])
    if image is None:
        raise ValueError("Could not decode the received image.")
    cv.imwrite(_CAPTURE_FILE, image)

    date, _ = extract_date_time(data['date'])

    face_embeddings = _embed_faces(_CAPTURE_FILE)
    if face_embeddings is None:
        print("No face found in the captured image.")
        return

    # Only the first detected face is used for attendance.
    name = str(recognise(_CLASSIFIER_FILE, _EMBEDDINGS_FILE, face_embeddings[:1])[0])
    print(f"Recognised name: {name}")

    if name not in names:
        print(f"Name not found in list: {name}")
        return

    sheet = sheets.worksheet(courses[names.index(name)])
    row_cell = sheet.find(name)
    column_cell = sheet.find(date)
    if row_cell and column_cell:
        # Address the cell by row/column index: the date header text is not a
        # valid A1 column reference.
        sheet.update_cell(row_cell.row, column_cell.col, 'Present')
    else:
        print(f"Row or column header not found: {name}, {date}")


def _enroll(data):
    """Handle a 'photo_with_info' message: register a person and retrain."""
    name = data['name']
    course = data['course']
    if not _is_safe_name(name):
        raise ValueError(f"Invalid name: {name!r}")

    image = base64_to_opencv_image(data['image'])
    if image is None:
        raise ValueError("Could not decode the received image.")

    person_dir = os.path.join(_DATASET_DIR, name)
    os.makedirs(person_dir, exist_ok=True)
    image_path = os.path.join(person_dir, f'{name}.jpg')
    cv.imwrite(image_path, image)

    # Embed before registering anything, so a photo without a usable face
    # leaves no half-registered person behind.
    face_embeddings = _embed_faces(image_path)
    if face_embeddings is None:
        raise ValueError("No face found in the enrolment photo.")

    addName(name, sheets.worksheet(course))
    names.append(name)
    courses.append(course)
    _write_lines('names.txt', names)
    _write_lines('courses.txt', courses)

    # One label per embedding.
    labels = np.array([name] * len(face_embeddings))
    update_npz(_EMBEDDINGS_FILE, face_embeddings, labels)
    retrain_model(_EMBEDDINGS_FILE, _CLASSIFIER_FILE)


async def handle_connection(websocket, path=None):
    """Serve one WebSocket client.

    Every message is expected to be a JSON object with a ``type`` key:

    * ``'photo'`` (keys ``image``, ``date``): recognise the face and mark the
      person present in the course worksheet.
    * ``'photo_with_info'`` (keys ``image``, ``name``, ``course``): register a
      new person, store the embedding and retrain the classifier.

    Other messages are ignored. Failures are printed and never close the
    connection.

    Args:
        websocket: The connection; iterated asynchronously for messages.
        path: Request path. Optional because newer websockets releases call
            the handler with the connection only. Unused.
    """
    # NOTE(review): the helpers below run synchronously, so the event loop is
    # blocked while one message is being processed.
    async for message in websocket:
        try:
            data = json.loads(message)
        except (TypeError, ValueError) as e:
            print(f"Ignoring malformed message: {e}")
            continue
        if not isinstance(data, dict):
            print("Ignoring message that is not a JSON object.")
            continue

        message_type = data.get('type')
        # Log the type only: the payload contains the whole base64 image.
        print(f"Received message of type: {message_type}")

        if message_type == 'photo':
            try:
                _mark_present(data)
            except Exception as e:
                print(f"An error occurred while processing photo: {e}")

        elif message_type == 'photo_with_info':
            try:
                _enroll(data)
            except Exception as e:
                print(f"An error occurred while processing photo_with_info: {e}")


async def start_server(host='localhost', port=8765):
    """Run the WebSocket server until it is closed.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.

    The server is used as an async context manager, so the listening socket is
    closed again when this coroutine is cancelled or fails. That lets the cell
    be re-run without the address still being in use.
    """
    async with websockets.serve(handle_connection, host, port) as server:
        print(f"WebSocket server started on ws://{host}:{port}")
        await server.wait_closed()

# Run the server
# Top-level `await` relies on the notebook kernel; a plain Python script would
# need asyncio.run(start_server()) instead.
# Running this cell while another server is still bound to the same port fails
# with the "[Errno 10048] ... only one usage of each socket address" OSError.
await start_server()


OSError: [Errno 10048] error while attempting to bind on address ('127.0.0.1', 8765): only one usage of each socket address (protocol/network address/port) is normally permitted