In [1]:
import cv2 #OpenCV 라이브러리 사용
import mediapipe as mp
import paho.mqtt.client as mqtt
import threading # 동시에 여러 작업을 수행하기 위한 기술
import base64 # 바이너리 데이터를 텍스트로 인코딩

# ImageMqttPublisher : MQTT를 통해 이미지 전송
# init으로 클래스 초기화
class ImageMqttPublisher: # MQTT Broker와 통신하여 이미지 전송
    def __init__(self, brokerIp=None, brokerPort=1883, pubTopic=None):
        # 인스턴스 변수 초기화
        self.brokerIp = brokerIp # MQTT Broker IP주소
        self.brokerPort = brokerPort # 포트 번호
        self.pubTopic = pubTopic
        self.client = None # MQTT 클라이언트 생성하고 연결할 때 사용하는 변수

    # ImageMqttPublisher 클래스의 인스턴스를 MQTT 브로커에 연결
    def connect(self):
        thread = threading.Thread(target=self.__run, daemon=True) # daemon : 메인 프로그램이 종료될 때 자동으로 종료
        thread.start()
        
    # MQTT 클라이언트를 설정하고 브로커에 연결
    def __run(self):
        self.client = mqtt.Client() # MQTT 클라이언트 객체 생성
        self.client.on_connect = self.__on_connect # 연결
        self.client.on_disconnect = self.__on_disconnect # 연결 해제
        self.client.connect(self.brokerIp, self.brokerPort) # 브로커에 연결
        self.client.loop_start()
        
    # 클라이언트가 브로커에 연결되었을 때
    def __on_connect(self, client, userdata, flags, rc):
        print("ImageMqttClient mqtt broker connected")

    # 클라이언트가 브로커와 연결이 끊어졌을 때
    def __on_disconnect(self, client, userdata, rc):
        print("ImageMqttClient mqtt broker disconnected")

    def disconnect(self):
        self.client.loop_stop() # MQTT 클라이언트의 네트워크 루프 중지
        self.client.disconnect() # 클라이언트와 브로커 간의 연결 해제

    # ImageMqttPublisher 클래스에서 MQTT 브로커에 이미지 전송
    def sendBase64(self, frame):
        # 클라이언트 체크
        if self.client is None: # MQTT 클라이언트가 초기화되지 않았다면 함수 종료
            return
        # 연결 상태 체크
        if not self.client.is_connected():
            return
        # JPEG 포맷으로 이미지 인코딩
        retval, bytes = cv2.imencode(".jpg", frame)
        # 인코딩 실패 체크
        if not retval:
            print("image encoding fail")
            return
        # Base64 문자열로 인코딩
        b64_bytes = base64.b64encode(bytes)
        # Base64로 인코딩된 이미지 데이터를 MQTT Broker에 보내기
        self.client.publish(self.pubTopic, b64_bytes, retain=True)

# Mediapipe 및 OpenCV 설정
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
mp_hands = mp.solutions.hands

cap = cv2.VideoCapture(0)

# localhost : MQTT Broker 서버 주소
imageMqttPublisher = ImageMqttPublisher("localhost", 1883, "/camerapub")
imageMqttPublisher.connect()

# MediaPipe Hands 설정 및 비디오 캡처 루프
with mp_hands.Hands(
    model_complexity=0, # 모델의 복잡도 설정
    min_detection_confidence=0.5, # 손 검출에 대한 최소 신뢰도를 설정
    min_tracking_confidence=0.5) as hands: # 손 추적에 대한 최소 신뢰도를 설정
    # 비디오 캡처 장치가 열려 있는 동안 루프 실행
    while cap.isOpened():
        retval, frame = cap.read() # 웹캠에서 프레임 읽기
        if not retval:
            print("video capture fail")
            break

        frame.flags.writeable = False # 성능 향상을 위해 프레임 쓰기 불가능하게 설정
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # BGR -> RGB 변환
        results = hands.process(frame)

        # 프레임 다시 쓰기 가능 설정 및 색 공간 변환
        frame.flags.writeable = True # 프레임 다시 쓰기 가능
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) # RGB -> BGR
        # 손 랜드마크 그리기
        if results.multi_hand_landmarks:
            for hand_landmarks in results.multi_hand_landmarks:
                # 프레임에 손 랜드마크 그리기
                mp_drawing.draw_landmarks(
                    frame,
                    hand_landmarks, # 그릴 랜드마크 좌표
                    mp_hands.HAND_CONNECTIONS, # 손가락 관절 간의 연결선을 그리기 위해 사용
                    mp_drawing_styles.get_default_hand_landmarks_style(), # 랜드마크의 스타일
                    mp_drawing_styles.get_default_hand_connections_style()) # 랜드마크 연결선의 스타일

        # 보기 편하게 이미지 좌우 반전
        cv2.imshow('MediaPipe Hands', cv2.flip(frame, 1))
        
        # 프레임 크기 조정
        frame = cv2.resize(frame, (640, 480))
        # 프레임을 base64 문자열로 인코딩해서 보냄
        imageMqttPublisher.sendBase64(frame)
        
        if cv2.waitKey(1) & 0xFF == 27:  # ESC 키를 누르면 종료
            break

imageMqttPublisher.disconnect()
cap.release()
cv2.destroyAllWindows()

  self.client = mqtt.Client() # MQTT 클라이언트 객체 생성


ImageMqttClient mqtt broker connected




ImageMqttClient mqtt broker disconnected
