In [2]:
import socket
import threading
import json
import time
import cv2
import numpy as np

In [3]:
import face_recognition
from deepface import DeepFace

In [4]:
connections = {}
server_sockets = []

In [None]:
def handle_client(conn, addr, client_name):
    """
    Handles communication with a connected client.
    """
    print(f"{client_name} connected from {addr}")
    connections[client_name] = conn
    try:
        while True:
            data = conn.recv(1024)  # Receive data
            if not data:
                break

            message = data[2:].decode('utf-8')

            message_dict = json.loads(message)

            if client_name == "Java":
                modelIndex = message_dict.get("ModelIndex", None)

                if modelIndex:
                    send_structured_message("Unity", "ChangeModel", modelIndex)

            print(f"Received from {client_name}: {message_dict}")
    except ConnectionResetError:
        print(f"{client_name} disconnected.")
    finally:
        conn.close()
        if client_name in connections:
            del connections[client_name]

In [6]:
def start_server(port, client_name):
    """
    Starts a server socket listening on a specified port for a specific client.
    """
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(('127.0.0.1', port))
    server_socket.listen(5)
    server_sockets.append(server_socket)  # Track the server socket
    print(f"Server listening for {client_name} on port {port}...")
    try:
        while True:
            conn, addr = server_socket.accept()
            threading.Thread(target=handle_client, args=(conn, addr, client_name)).start()
    except Exception as e:
        print(f"Server for {client_name} on port {port} encountered an error: {e}")
    finally:
        server_socket.close()

In [7]:
def send_message_to(client_name, message):
    """
    Sends a message to a connected client.
    """
    if client_name in connections:
        try:
            encodedMessage = message.encode()
            connections[client_name].send(encodedMessage)
            print(f"Sending message: {encodedMessage}")
        except BrokenPipeError:
            print(f"Failed to send message to {client_name}. Connection might be closed.")
    else:
        print(f"{client_name} is not connected.")

In [33]:
def send_structured_message(client_name, action, value):
    obj = {"Action": action, "Value": value}
    jsonData = json.dumps(obj)
    send_message_to(client_name, jsonData + "\n")

In [8]:
def close_all_connections():
    """
    Closes all client connections and server sockets.
    """
    print("Closing all connections...")
    for client_name, conn in list(connections.items()):
        try:
            conn.close()
            print(f"Closed connection for {client_name}")
        except Exception as e:
            print(f"Error closing connection for {client_name}: {e}")
        finally:
            del connections[client_name]

    for server_socket in server_sockets:
        try:
            server_socket.close()
            print("Server socket closed.")
        except Exception as e:
            print(f"Error closing server socket: {e}")

In [9]:
lower_red = np.array([0, 0, 0])
upper_red = np.array([0, 95, 10])

def detect_led(frame, lower_red, upper_red):
    """
    Detects an LED in the given frame and returns its bounding rectangle
    along with the modified frame with a rectangle drawn around the detected LED, if any.
    """
    # Convert the frame to the HSV color space
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    # Create a mask for the LED light color using the threshold values
    mask = cv2.inRange(hsv, lower_red, upper_red)

    # Apply morphological operations to remove noise
    mask = cv2.erode(mask, None, iterations=2)
    mask = cv2.dilate(mask, None, iterations=2)

    # Find contours in the mask
    contours, _ = cv2.findContours(mask.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # Check if any contours were found
    if len(contours) > 0:
        # Find the largest contour
        largest_contour = max(contours, key=cv2.contourArea)

        # Get the bounding rectangle for the contour
        x, y, w, h = cv2.boundingRect(largest_contour)

        # Draw a rectangle around the LED light
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

        # Return the position of the LED as the center of the bounding box
        return frame, (x + w // 2, y + h // 2)

    return frame, None

In [20]:
ali_image = face_recognition.load_image_file("people/ali.jpg")
ali_face_encoding = face_recognition.face_encodings(ali_image)[0]

known_face_encodings = [
    ali_face_encoding,
]
known_face_names = [
    "Ali ElRogbany",
]

face_locations = []
face_encodings = []
face_names = []

# Facial Recognition Function
def detect_faces(frame, known_face_encodings, known_face_names):
    rgb_small_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) 

    face_locations = []
    face_names = []

    # Detect faces and their encodings
    face_locations = face_recognition.face_locations(rgb_small_frame)
    face_encodings = face_recognition.face_encodings(rgb_small_frame, face_locations)

    for face_encoding in face_encodings:
        matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
        name = "Unknown"

        # Find the best match for the detected face
        face_distances = face_recognition.face_distance(known_face_encodings, face_encoding)
        best_match_index = np.argmin(face_distances)
        if matches[best_match_index]:
            name = known_face_names[best_match_index]

        face_names.append(name)

    return face_locations, face_names, len(face_encodings) > 0

In [None]:
def run_servers():
    """
    Starts server threads and keeps them running in a while loop.
    """
    threading.Thread(target=start_server, args=(3000, "Java")).start()
    threading.Thread(target=start_server, args=(4200, "Unity")).start()

    cap = cv2.VideoCapture(0)
    framecnt = 0

    last_position = None

    face_found = False
    frame_has_faces = False

    current_user_age = 18
    current_user_expression = "None"

    try:
        while True:
            # Read the frame from the camera
            ret, frame = cap.read()

            if not ret:
                print("Failed to read frame from camera.")
                break

            frame = cv2.resize(frame, (0, 0), fx=0.25, fy=0.25)
            framecnt += 1

            if framecnt % 23 == 0:
                # LED Tracking
                frame, current_position = detect_led(frame, lower_red, upper_red)

                if current_position is not None:
                    if last_position is not None:
                        # Calculate the position difference
                        dx = current_position[0] - last_position[0]
                        dy = current_position[1] - last_position[1]
                        send_structured_message("Unity", "RotateModel", dx)

                    # Update the last position
                    last_position = current_position

                # Facial Recognition
                face_locations, face_names, frame_has_faces = detect_faces(frame, known_face_encodings, known_face_names)
                new_face_found = len(face_names) > 0
                if new_face_found != face_found:
                    face_found = new_face_found
                    send_structured_message("Java", "FaceIdentification", face_found)

            if framecnt % 69 == 0 and frame_has_faces:
                framecnt = 0

                required_outputs =  ['age', 'emotion']
                result = DeepFace.analyze(frame, actions = required_outputs, enforce_detection = False)
                if len(result) > 0:
                    current_user_age = result[0]["age"]
                    current_user_expression = result[0]["dominant_emotion"]

            cv2.putText(frame, f"Expression: {current_user_expression}", (5, 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA)

            # Display the frame
            cv2.imshow('Python Server', frame)

            # Break the loop if 'q' is pressed
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    except KeyboardInterrupt:
        print("Shutting down servers...")
    finally:
        close_all_connections()
        cap.release()
        cv2.destroyAllWindows()

In [32]:
run_servers()

Server listening for Java on port 3000...
Server listening for Unity on port 4200...
Java is not connected.


Action: emotion: 100%|██████████| 2/2 [00:00<00:00,  5.75it/s]
Action: emotion: 100%|██████████| 2/2 [00:00<00:00,  5.92it/s]


Closing all connections...
Server socket closed.
Server socket closed.
Server socket closed.
Server socket closed.
Server socket closed.
Server socket closed.
Server socket closed.
Server socket closed.
Server socket closed.
Server socket closed.
Server socket closed.
Server socket closed.
Server socket closed.
Server socket closed.
Server socket closed.
Server socket closed.
Server socket closed.
Server socket closed.
Server for Unity on port 4200 encountered an error: [WinError 10038] An operation was attempted on something that is not a socket
Server for Java on port 3000 encountered an error: [WinError 10038] An operation was attempted on something that is not a socket
