In [13]:
import socket
import asyncio
from bleak import BleakScanner
from datetime import datetime
import asyncio
import cv2 
import numpy as np
import json
import time
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from bluetooth import discover_devices, BluetoothSocket, RFCOMM
from roboflow import Roboflow
import mediapipe as mp
from ultralytics import YOLO
import keyboard
import select

In [14]:
host = 'DESKTOP-28SQ46K'    
port = 8000
message_to_client = None
message_from_clinet = None

In [15]:
async def detect_objects_on_frame(model, frame, conf_threshold=0.5):

    results = model.predict(source=frame, conf=conf_threshold, verbose=False)

    detections = []  # To store detected objects
    # Extract detection results and draw on the frame
    for box in results[0].boxes:
        x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())  # Bounding box coordinates
        confidence = box.conf[0].item()  # Confidence score
        class_id = int(box.cls[0].item())  # Class ID
        class_name = model.names[class_id]  # Class name

        # Append the detected object details to the list
        detections.append({
            "class_name": class_name,
            "confidence": confidence,
            "bbox": [x1, y1, x2, y2]
        })

        # Draw bounding box and label on the frame
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)  # Green box
        label = f"{class_name} {confidence:.2f}"
        cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    return detections

In [16]:
# Function to send "pinching" message to the server
async def pinching_status_callback(pinching_detected,pinch_coordinates=None):
    if pinching_detected:
        if pinch_coordinates:
            return(f"Pinching:true,{pinch_coordinates[0]},{pinch_coordinates[1]}")
    # else:
        # print("Not pinching")

In [17]:
async def getPointsRealTime(pinching_detected_callback,frame):

    mp_holistic = mp.solutions.holistic
    mp_hands = mp.solutions.hands
    drawing_utils = mp.solutions.drawing_utils

    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        
            # Flip the frame horizontally to mirror the camera feed
            frame = cv2.flip(frame, 1)
            
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image.flags.writeable = False

            results = holistic.process(image)
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

            # Draw hand landmarks
            drawing_utils.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
            drawing_utils.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)

            # Check for pinch gesture
            pinching_detected = False
            pinch_coordinates = None
            if results.right_hand_landmarks or results.left_hand_landmarks:
                try:
                    hand_landmarks = results.right_hand_landmarks or results.left_hand_landmarks
                    index_tip = hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_TIP]
                    thumb_tip = hand_landmarks.landmark[mp_hands.HandLandmark.THUMB_TIP]
                    
                    # Calculate distance between thumb and index fingertip to detect pinch
                    distance = ((thumb_tip.x - index_tip.x)**2 + (thumb_tip.y - index_tip.y)**2) ** 0.5

                    if distance < 0.08:  # Adjust threshold as needed
                        pinching_detected = True
                        # Calculate pinch coordinates
                        pinch_x = int(index_tip.x * image.shape[1])
                        pinch_y = int(index_tip.y * image.shape[0])
                        pinch_coordinates = (pinch_x, pinch_y)
                        # Draw a green dot at the pinch location
                        cv2.circle(image, (pinch_x, pinch_y), 10, (0, 255, 0), -1)  # Draw green dot

                except Exception as e:
                    print(f"Error processing hand landmarks: {e}")

            return await pinching_detected_callback(pinching_detected,pinch_coordinates)
            

In [18]:
async def receive_data(client_socket):
    while True:
        
        data = await asyncio.to_thread(client_socket.recv, 1024)
        global message_from_clinet
        message_from_clinet = data.decode()

        if data:
            print(f"Received from client: {data.decode()}")
        else:
            print("Client disconnected")
            break

async def send_data_to_client(client_socket, message_queue):
    while True:
        # Wait for a message from the queue
        message_to_client = await message_queue.get()
        if message_to_client:
            #print(f"Sending message to client: {message_to_client}")  # Add logging for sent messages
            client_socket.sendall(message_to_client.encode())
        message_queue.task_done()  # Mark the message as processed

async def start_server(host, port, message_queue):
    # Create a TCP/IP socket
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    # Bind the socket to the address and port
    server_socket.bind((host, port))
    
    # Enable the server to listen for incoming connections
    server_socket.listen(5)
    print(f"Server listening on {host}:{port}...")
    
    # Wait for a client to connect
    client_socket, client_address = await asyncio.to_thread(server_socket.accept)
    print(f"Client connected: {client_address}")
    
    try:
        # Start both receiving and sending concurrently
        await asyncio.gather(
            receive_data(client_socket),
            send_data_to_client(client_socket, message_queue)
        )
    finally:
        # Clean up the connection
        client_socket.close()
        server_socket.close()


In [19]:
client_sockets = {}
# Initialize MediaPipe Hands module
mp_hands = mp.solutions.hands
hands = mp_hands.Hands()

In [20]:
async def track_hand_and_get_index_finger(frame):
    
    # Convert the image color to RGB
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Process the frame and detect hands
    results = hands.process(rgb_frame)

    index_finger_coords = None
    name = "No hand detected"

    if results.multi_hand_landmarks:
        for hand_landmarks in results.multi_hand_landmarks:
            # Get the x and y coordinates of the index finger tip (landmark 8)
            index_finger_tip = hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_TIP]

            # Convert the normalized coordinates to pixel values
            h, w, c = frame.shape
            index_x = int(index_finger_tip.x * w)
            index_y = int(index_finger_tip.y * h)

            # Draw a circle at the index finger tip
            cv2.circle(frame, (index_x, index_y), 10, (255, 0, 0), -1)
            
            # Determine if it's a left or right hand based on landmarks
            if hand_landmarks.landmark[mp_hands.HandLandmark.WRIST].x < hand_landmarks.landmark[mp_hands.HandLandmark.PINKY_MCP].x:
                name = "Left hand"
            else:
                name = "Right hand"
                
            index_finger_coords = (index_x, index_y)

    return name, index_finger_coords

In [21]:
async def initialize_emotion_model():
    model = Sequential()
    model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(48, 48, 1)))
    model.add(Conv2D(64, kernel_size=(3, 3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.25))

    model.add(Conv2D(128, kernel_size=(3, 3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Conv2D(128, kernel_size=(3, 3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.25))

    model.add(Flatten())
    model.add(Dense(1024, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(7, activation='softmax'))
    
    # Load the trained weights
    await asyncio.to_thread(model.load_weights, 'model.h5')
    
    return model

In [22]:
async def predict_emotion(frame, model, emotion_dict):
    # Convert the frame to grayscale
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # Initialize the face cascade for face detection
    facecasc = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')

    # Detect faces in the image
    faces = facecasc.detectMultiScale(gray, scaleFactor=1.3, minNeighbors=5)

    # Process the faces
    for (x, y, w, h) in faces:
        
        # Extract the region of interest (ROI) for emotion prediction
        roi_gray = gray[y:y + h, x:x + w]
        cropped_img = np.expand_dims(np.expand_dims(cv2.resize(roi_gray, (48, 48)), -1), 0)

        # Suppress TensorFlow/Keras verbosity
        tf.get_logger().setLevel('ERROR')  # Suppress verbose logging

         # If model is async, we need to await it
        if asyncio.iscoroutinefunction(model.predict):
            prediction = await model.predict(cropped_img, verbose=0)
        else:
            # If it's not async, use asyncio.to_thread to run it in a separate thread
            prediction = await asyncio.to_thread(model.predict, cropped_img, verbose=0)
        
        maxindex = int(np.argmax(prediction))

        # Return the predicted emotion label
        return emotion_dict[maxindex]
    
    return "No Face"  # If no face is detected

In [23]:
async def face_recognition(frame, recognizer, faceCascade, names):
    
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = faceCascade.detectMultiScale(gray, scaleFactor=1.2, minNeighbors=5, minSize=(30, 30))

    if len(faces) == 0:
        return "No Face", None  # Return default values if no faces are detected
    
    for (x, y, w, h) in faces:
        
        # Recognize the face using the trained model
        id, confidence = recognizer.predict(gray[y:y + h, x:x + w])

        if confidence > 10:
            name = names[id]
        else:
            name = "Unknown"

        # Return the name and the coordinates of the face
        return name, (x, y, w, h)
    
    return "No Face", None  # In case the loop ends without detecting a face

In [24]:
async def main():
    message_queue = asyncio.Queue()  # Create a queue to send messages to the server
    
    socket_task = asyncio.create_task(start_server(host, port,message_queue))
    # Create LBPH Face Recognizer
    recognizer = cv2.face.LBPHFaceRecognizer_create()
    # Load the trained model
    recognizer.read('trainer.yml')
    modelObject = YOLO('C:/Users/medoa/Desktop/HCI-Project-master/Python-Code-HCI/object detection/runs/detect/custom_yolo_model/weights/best.pt')  # Update with your model's path

    # Path to the Haar cascade file for face detection
    face_cascade_Path = "haarcascade_frontalface_default.xml"
    # Create a face cascade classifier
    faceCascade = cv2.CascadeClassifier(face_cascade_Path)

    # Initialize user IDs and associated names
    names = ['None']
    is_pinching=None
    with open('names.json', 'r') as fs:
        names = json.load(fs)
        names = list(names.values())
    
    # Initialize emotion model
    emotion_model = await initialize_emotion_model()
    
    # Define emotion dictionary
    emotion_dict = {0: "Angry", 1: "Disgusted", 2: "Fearful", 3: "Happy", 4: "Neutral", 5: "Sad", 6: "Surprised"}

    # Video Capture from the default camera (camera index 0)
    cam = cv2.VideoCapture(0)
    cam.set(3, 640)  # Set width
    cam.set(4, 480)  # Set height

    # Create the window
    cv2.namedWindow('camera', cv2.WINDOW_NORMAL)
    # Resize the window
    cv2.resizeWindow('camera', 1700, 980)  # Change the values as needed (e.g., 320, 240 for a smaller window)
    last_sent_time = time.time()  # Initialize the timer
    prev_time = time.time()

    emotion = "unkown"
    index_finger_coords = "unkown"
    Object_Detected = "unkown"
    face_coords = "unkown"
    is_logged_in = False
    lgoin_face = False
    login_bluetooth = False
    use_hand_index = False
    Object_detection = False
    is_playing = False
    name = "unkown"
    is_pinching = "unkown"
    
    while True:

        global message_from_clinet
        
        if(message_from_clinet == "face_recogention"):
            lgoin_face = True

        if(message_from_clinet == "not_face_recogention"):
            lgoin_face = False

        # Read a frame from the camera
        ret, frame = cam.read()
        

        current_time = time.time()
        fps = 1 / (current_time - prev_time)
        fps = int(fps)  # Make it a whole number
        prev_time = current_time

        if not ret:
            print("[ERROR] Failed to read from the camera.")
            break

        frame = cv2.flip(frame,1)
        resized_frame = cv2.resize(frame, (224, 224))
        # Call face recognition function
        
        if(lgoin_face):
            name, face_coords = await asyncio.create_task(face_recognition(resized_frame, recognizer, faceCascade, names))
            
        
        index_finger_coords = await asyncio.create_task(track_hand_and_get_index_finger(frame))

        
        #is_pinching = await asyncio.create_task(getPointsRealTime(pinching_status_callback,resized_frame))

        if(Object_detection):
            Object_Detected = await asyncio.create_task(detect_objects_on_frame(modelObject,resized_frame))


        # Display the frame with landmarks and index finger tip
        if index_finger_coords and isinstance(index_finger_coords, tuple) and len(index_finger_coords) == 2:
            x, y = index_finger_coords
            # Ensure that x and y are integers
            if isinstance(x, int) and isinstance(y, int):
                cv2.circle(frame, (x, y), 10, (128, 0, 128), 3)  # Draw the circle
       
        # Call emotion prediction function
        if(is_playing):
            emotion = await predict_emotion(resized_frame, emotion_model, emotion_dict)

        cv2.putText(frame, f"FPS : {fps}", (5, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (128, 0, 128), 2)
        cv2.putText(frame, f"Name : {name}", (5, 70), cv2.FONT_HERSHEY_SIMPLEX, 1, (128, 0, 128), 2)
        cv2.putText(frame, f"Emotion : {emotion}", (5, 120), cv2.FONT_HERSHEY_SIMPLEX, 1, (128, 0, 128), 2)
        cv2.putText(frame, f"IndexFinger : {index_finger_coords}", (5, 170), cv2.FONT_HERSHEY_SIMPLEX, 1, (128, 0, 128), 2)
        cv2.putText(frame,f"Pinching : {is_pinching}", (5, 230), cv2.FONT_HERSHEY_SIMPLEX, 1, (128, 0, 128), 2)
        cv2.putText(frame, f"Object : {Object_Detected}", (5, 290), cv2.FONT_HERSHEY_SIMPLEX, 1, (128, 0, 128), 2)
        # Check if 5 seconds have passed since the last recognition
        
        current_time = time.time()
        
        if current_time - last_sent_time >= 0.01:
            #print(f"Recognized: {name}, Emotion: {emotion},Object_Detected:{Object_Detected},Pinching:{is_pinching},Index Finger coordinates: {index_finger_coords}")
            last_sent_time = current_time  # Update the timer
         # Send the message to the server via the queue
            message_to_client = f" {name or 'None'},{emotion or 'None'},{Object_Detected or 'None'},{is_pinching or 'None'},{index_finger_coords or 'None'}"
            await asyncio.create_task (message_queue.put(message_to_client) ) # Put the message in the queue

        # Display the image with rectangles around faces and objects

        cv2.imshow('camera', frame)
        
        # Press Escape or 'q' to exit the webcam/program
        k = cv2.waitKey(5) & 0xff
        if k == 27 or k == ord('q'):  # 27 is the Escape key; ord('q') checks for 'q'
            break
        

    print("\n [INFO] Exiting Program.")
    # Release the camera
    cam.release()
    # Close all OpenCV windows
    cv2.destroyAllWindows()

if __name__ == "__main__":
    await main()

Server listening on DESKTOP-28SQ46K:8000...
Client connected: ('192.168.1.6', 62926)
Received from client: Hello, Server!
Received from client: face_recogention
Received from client: face_recogention
Received from client: face_recogention
Received from client: face_recogention
Received from client: face_recogention
Received from client: face_recogention
Received from client: face_recogention
Received from client: face_recogention
Received from client: face_recogention
Received from client: face_recogention
Received from client: face_recogention
Received from client: face_recogention
Received from client: face_recogention
Received from client: face_recogention
Received from client: face_recogention
Received from client: face_recogention
Received from client: face_recogention
Received from client: face_recogention
Client disconnected


Task exception was never retrieved
future: <Task finished name='Task-2172' coro=<start_server() done, defined at C:\Users\medoa\AppData\Local\Temp\ipykernel_18996\4168792974.py:23> exception=ConnectionAbortedError(10053, 'An established connection was aborted by the software in your host machine', None, 10053, None)>
Traceback (most recent call last):
  File "C:\Users\medoa\AppData\Local\Temp\ipykernel_18996\4168792974.py", line 40, in start_server
    await asyncio.gather(
  File "C:\Users\medoa\AppData\Local\Temp\ipykernel_18996\4168792974.py", line 20, in send_data_to_client
    client_socket.sendall(message_to_client.encode())
ConnectionAbortedError: [WinError 10053] An established connection was aborted by the software in your host machine



 [INFO] Exiting Program.
