### MQTT FUNCTIONS

In [6]:
# !pip install paho-mqtt
# !pip install python-dotenv

Collecting python-dotenv
  Using cached python_dotenv-1.0.0-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.0.0



[notice] A new release of pip is available: 23.1.2 -> 23.2.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
import paho.mqtt.client as mqtt

import os
from dotenv import load_dotenv

load_dotenv()
client_name = os.environ.get('client')
token = os.environ.get('token')
server_name = os.environ.get('server')
port = os.environ.get('port')

In [2]:
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to MQTT broker")
    else:
        print(f"Connection failed with code {rc}")
        exit()


In [3]:
def mqtt_connects(data ):
        
    # Create an MQTT client instance
    client = mqtt.Client()
    # Set username and password for authentication
    username = client_name
    password = token
    client.username_pw_set(username, password)
    # Set the callback function
    client.on_connect = on_connect

    # Connect to the MQTT broker (replace with your broker's IP/hostname)
    broker_address = server_name
    client.connect(broker_address, int(port), 60)



    # Start the MQTT client loop in a non-blocking thread
    client.loop_start()

    try:
        previous_message = ""
        message = data


        # Publish a message to a specific topic

        if message != previous_message:
            # Publish a message to a specific topic
            topic = "/hand_d/get/data"
            client.publish(topic, message)
            
            print(f"Message published: {message}")
            previous_message = message
            # return True
        # topic = "test/topic"
        # client.publish(topic, message)
        else:
            print("Message unchanged. Not publishing.")

            # return False
        print("Message published. Disconnecting...")

    except Exception as e:
        print("An error occurred:", str(e))
        return False

    finally:
        # Disconnect the MQTT client
        client.loop_stop()
        client.disconnect()
        print("Disconnected from MQTT broker")



## Hand Detection Send Data

In [4]:
import cv2
import mediapipe as mp
from google.protobuf.json_format import MessageToDict

# MediaPipe hand detection module
mp_drawing = mp.solutions.drawing_utils
mp_hands = mp.solutions.hands
# hand = mp_hands.Hands()



## Function Count And Send to MQTT

In [9]:
def process_frame(frame  ,old_count):
    # Flip the frame horizontally

    frame = cv2.flip(frame, 1)

    # Process the frame with MediaPipe
    results = hands.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

    # Count the number of fingers
    finger_count = 0
    

    if results.multi_hand_landmarks:
        for hand_landmarks in results.multi_hand_landmarks:
            # Calculate the finger count
            if   hand_landmarks.landmark[mp_hands.HandLandmark.THUMB_TIP].y < hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_MCP].y:
                finger_count += 1
            if hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_TIP].y < hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_PIP].y:
                finger_count += 1
            if hand_landmarks.landmark[mp_hands.HandLandmark.MIDDLE_FINGER_TIP].y < hand_landmarks.landmark[mp_hands.HandLandmark.MIDDLE_FINGER_PIP].y:
                finger_count += 1
            if hand_landmarks.landmark[mp_hands.HandLandmark.RING_FINGER_TIP].y < hand_landmarks.landmark[mp_hands.HandLandmark.RING_FINGER_PIP].y:
                finger_count += 1
            if hand_landmarks.landmark[mp_hands.HandLandmark.PINKY_TIP].y < hand_landmarks.landmark[mp_hands.HandLandmark.PINKY_PIP].y:
                finger_count += 1

            # Draw hand landmarks on the frame
            mp_drawing.draw_landmarks(frame, hand_landmarks, mp_hands.HAND_CONNECTIONS)

    # Draw finger count on the frame
    cv2.putText(frame, f"Finger Count: {finger_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
     

    
    if (finger_count == old_count[0] ):
        already = True

    elif  (finger_count != old_count[0] ):
        old_count[0] = finger_count
        mqtt_connects(old_count[0])

    else:
        print( " - else - ")

    
    #Display the resulting frame
    cv2.imshow('Finger Count', frame)

## Functions ON Lamp


In [5]:
def process_frame_on_lamp(frame ,old_state  ):
    # Flip the frame horizontally
    # label_hand = ""
    frame = cv2.flip(frame, 1)

    # Process the frame with MediaPipe
    results = hands.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

    # Count the number of fingers
    
    

    if results.multi_hand_landmarks:


        if len(results.multi_handedness) == 2:
                # Display 'Both Hands' on the image
            # send_mqtt_one_time(old_state,'Both Hands') 
            cv2.putText(frame, 'Both Hands', (250, 50),
            cv2.FONT_HERSHEY_COMPLEX,0.9, (0, 255, 0), 2)
        else:
            for i in results.multi_handedness:
                
                # Return whether it is Right or Left Hand
                label = MessageToDict(i)['classification'][0]['label']
  
                if label == 'Left':
                    
                    # Display 'Left Hand' on
                    # left side of window
                    cv2.putText(frame, label+' Hand',
                                (20, 50),
                                cv2.FONT_HERSHEY_COMPLEX, 
                                0.9, (0, 255, 0), 2)
                    
                    send_mqtt_one_time(old_state,label)
  
                if label == 'Right':
                      
                    # Display 'Left Hand'
                    # on left side of window
                    cv2.putText(frame, label+' Hand', (460, 50),
                                cv2.FONT_HERSHEY_COMPLEX,
                                0.9, (0, 255, 0), 2)
 
                    
                    send_mqtt_one_time(old_state,label)

        for handLms in results.multi_hand_landmarks:
            mp_drawing.draw_landmarks(frame, handLms,mp_hands.HAND_CONNECTIONS )
    

    cv2.imshow('Hand ',frame)



In [6]:
def check_status_change(previous_status, current_status):
    if previous_status != current_status:
        return True
    return False
def send_mqtt_one_time(old_state , now_state ):

    if check_status_change(old_state[0], now_state):
        if now_state == '':
            return 
        else:
            # print(old_state[0])

            old_state[0] = now_state 
            # print("Status has changed!")
            mqtt_connects(now_state)
            return 


## Run Functions

In [8]:
# OpenCV video capture
cap = cv2.VideoCapture(1)
old_count = [0]
old_state = ["hand"]

# Create a hands object
with mp_hands.Hands(
    static_image_mode=False,
    max_num_hands=2,
    min_detection_confidence=0.90,
    min_tracking_confidence=0.90) as hands:


    # Main program loop
    while cap.isOpened():
        ret, frame = cap.read()

        if not ret:
            break

        # process_frame(frame  ,old_count ) # Finger Count
        process_frame_on_lamp(frame,old_state ) # Check Hand

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()

Message published: Left
Message published. Disconnecting...
Connected to MQTT broker
Disconnected from MQTT broker
Message published: Right
Message published. Disconnecting...
Connected to MQTT broker
Disconnected from MQTT broker
Message published: Left
Message published. Disconnecting...
Connected to MQTT broker
Disconnected from MQTT broker
Message published: Right
Message published. Disconnecting...
Connected to MQTT broker
Disconnected from MQTT broker
Message published: Left
Message published. Disconnecting...
Connected to MQTT broker
Disconnected from MQTT broker
Message published: Right
Message published. Disconnecting...
Connected to MQTT broker
Disconnected from MQTT broker
Message published: Left
Message published. Disconnecting...
Disconnected from MQTT broker
Message published: Right
Message published. Disconnecting...
Disconnected from MQTT broker
Message published: Left
Message published. Disconnecting...
Connected to MQTT broker
Disconnected from MQTT broker
Message pub

## TEST SCRIPS

In [None]:
mqtt_connects("Test 22")

Message published. Disconnecting...
Connected to MQTT broker
Disconnected from MQTT broker


In [None]:
def process_frame(frame ,already ,old_count):
    # Flip the frame horizontally

    frame = cv2.flip(frame, 1)

    # Process the frame with MediaPipe
    results = hands.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

    # Count the number of fingers
    finger_count = 0
    

    if results.multi_hand_landmarks:
        for hand_landmarks in results.multi_hand_landmarks:
            # Calculate the finger count
            if   hand_landmarks.landmark[mp_hands.HandLandmark.THUMB_TIP].y < hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_MCP].y:
                finger_count += 1
            if hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_TIP].y < hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_PIP].y:
                finger_count += 1
            if hand_landmarks.landmark[mp_hands.HandLandmark.MIDDLE_FINGER_TIP].y < hand_landmarks.landmark[mp_hands.HandLandmark.MIDDLE_FINGER_PIP].y:
                finger_count += 1
            if hand_landmarks.landmark[mp_hands.HandLandmark.RING_FINGER_TIP].y < hand_landmarks.landmark[mp_hands.HandLandmark.RING_FINGER_PIP].y:
                finger_count += 1
            if hand_landmarks.landmark[mp_hands.HandLandmark.PINKY_TIP].y < hand_landmarks.landmark[mp_hands.HandLandmark.PINKY_PIP].y:
                finger_count += 1

            # Draw hand landmarks on the frame
            mp_drawing.draw_landmarks(frame, hand_landmarks, mp_hands.HAND_CONNECTIONS)

    # Draw finger count on the frame
    cv2.putText(frame, f"Finger Count: {finger_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
     
    # print( " - O - ")
    # print(old_count[0])
    # print( " - F - ")
    # print(old_count[1])

    
    if (finger_count == old_count[0] ):
        already = True
        # old_count[0] = finger_count
        # print( " - Old - ")
        # print(old_count[0])
    elif  (finger_count != old_count[0] ):
        old_count[0] = finger_count
    # if already:
        # print( " - xx - ")
        # print(old_count[0])

        mqtt_connects(old_count[0])
        # already = False
    else:
        print( " - else - ")

    
    #Display the resulting frame
    cv2.imshow('Finger Count', frame)
# OpenCV video capture
cap = cv2.VideoCapture(0)
data_already = False
old_count = [0]
# Create a hands object
with mp_hands.Hands(
    static_image_mode=False,
    max_num_hands=2,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5) as hands:


    # Main program loop
    while cap.isOpened():
        ret, frame = cap.read()

        if not ret:
            break

        process_frame(frame ,data_already ,old_count )

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()

Printing data: 1


In [93]:
class StatusStateMachine:
    def __init__(self):
        self.state = "On"

    def update_status(self, new_status):
        if new_status == "On" and self.state != "On":
            print("Status has changed to On!")
        elif new_status == "Off" and self.state != "Off":
            print("Status has changed to Off!")

In [92]:
# Example usage
status_machine = StatusStateMachine()
status_machine.update_status("On")
status_machine.update_status("Off")

Status has changed to Off!


In [97]:
def status_changed_handler(new_status):
    print(f"Status has changed to {new_status}!")

# Example usage
new_status = "Off"
status_changed_handler(new_status)

Status has changed to Off!
