In [22]:
import cv2
import gradio as gr
import numpy as np
import base64
from io import BytesIO
from PIL import Image
import requests

################################### YOLO V3 ###################################

# Paths to the YOLOv3 weights, network config and class-name files.
# NOTE(review): `config_paht` / `names_paht` look like typos of `*_path`;
# the names are kept as-is because other code may refer to them.
weights_path = 'yolo3/yolov3.weights'
config_paht = 'yolo3/yolov3.cfg'
names_paht = 'yolo3/coco.names'

# Load the YOLOv3 model once at module level; detect_objects() reuses `net`.
net = cv2.dnn.readNet(weights_path, config_paht)

# Load the label names: the file is split on newlines, one label per line.
with open(names_paht, 'r') as f:
    labels = f.read().strip().split('\n')
    print('Labels Length: ', len(labels))

# Object detection function
def detect_objects(image, conf_threshold=0.5, nms_threshold=0.4):
    """Run the module-level YOLOv3 network on ``image`` and draw the results.

    Detections whose best class score exceeds ``conf_threshold`` are collected,
    filtered with non-maximum suppression, and drawn (rectangle + label text)
    directly onto ``image``.

    Args:
        image: Image array whose first two dimensions are (height, width), or
            ``None`` when no frame is available.
        conf_threshold: Minimum class score for a detection to be kept; also
            passed to ``cv2.dnn.NMSBoxes`` as its score threshold.
        nms_threshold: Overlap threshold passed to ``cv2.dnn.NMSBoxes``.

    Returns:
        The same ``image`` object with annotations drawn in place, or ``None``
        if ``image`` was ``None``.
    """
    # A UI callback can be invoked without a frame; avoid crashing on `.shape`.
    if image is None:
        return None

    height, width = image.shape[:2]
    # NOTE(review): swapRB=True assumes the input channel order needs swapping
    # (e.g. BGR input); confirm the channel order of the frames passed in.
    blob = cv2.dnn.blobFromImage(image, 1/255.0, (416, 416), swapRB=True, crop=False)
    net.setInput(blob)

    # Ask OpenCV for the output layer names directly. Indexing getLayerNames()
    # with getUnconnectedOutLayers() breaks on builds where the latter returns
    # an Nx1 array instead of a flat one.
    output_layers = net.getUnconnectedOutLayersNames()
    detections = net.forward(output_layers)

    box_list = []
    confidence_list = []
    class_id_list = []

    # Box coordinates come out relative to the image size; scale to pixels.
    scale = np.array([width, height, width, height])

    for output in detections:
        for detection in output:
            # Elements 0-3 are the box, element 4 is skipped, 5+ are class scores.
            scores = detection[5:]
            class_id = int(np.argmax(scores))
            confidence = scores[class_id]

            if confidence > conf_threshold:
                (center_x, center_y, w, h) = (detection[0:4] * scale).astype("int")
                # Convert centre-based box to top-left corner form.
                x = int(center_x - (w / 2))
                y = int(center_y - (h / 2))

                box_list.append([x, y, int(w), int(h)])
                confidence_list.append(float(confidence))
                class_id_list.append(class_id)

    index_list = cv2.dnn.NMSBoxes(box_list, confidence_list, conf_threshold, nms_threshold)

    # NMSBoxes may return an empty tuple, a flat array or an Nx1 array
    # depending on the OpenCV version; normalise to a flat sequence.
    for i in np.asarray(index_list).reshape(-1):
        i = int(i)
        x, y, w, h = box_list[i]
        label = str(labels[class_id_list[i]])
        confidence = confidence_list[i]

        # Draw the bounding rectangle
        cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2)

        # Draw the label and its confidence just above the box
        cv2.putText(image, f"{label} {confidence:.2f}", (x, y - 10), cv2.FONT_HERSHEY_SCRIPT_COMPLEX, 2, (0, 0, 255), 2)

    return image

#########################################################################################################

################################ chatgpt_response ################################

def chatgpt_response(image_array, history):
    """Send an image to the Azure OpenAI chat deployment and record the reply.

    The image is PNG-encoded, base64-embedded in a chat-completions request
    together with a fixed system and user prompt, and the model's answer is
    appended to ``history``.

    Args:
        image_array: Image array accepted by ``PIL.Image.fromarray``, or
            ``None`` when nothing has been captured yet.
        history: List of ``(first, second)`` chat tuples; ``None`` is treated
            as an empty history. The list is appended to in place.

    Returns:
        The history list with exactly one new entry: ``('User', reply)`` on
        success, ``(status_code, body)`` for a non-200 response, or
        ``('Error', message)`` when no image was given, the request failed, or
        the response could not be parsed.
    """
    import os  # local import, matching the file's existing function-scope import style

    if history is None:
        history = []

    # Without a captured image there is nothing to send; report it instead of
    # crashing inside Image.fromarray.
    if image_array is None:
        history.append(('Error', 'No captured image to analyze. Capture a frame first.'))
        return history

    endpoint = "https://fimtrus-openai.openai.azure.com"
    # SECURITY(review): a credential is embedded in source. It is kept only as
    # a backward-compatible fallback; set AZURE_OPENAI_API_KEY, then rotate and
    # remove the literal.
    api_key = os.environ.get("AZURE_OPENAI_API_KEY", "310a6832c2394daf97a3f446cc86ce20")
    deployment_name = "fitmrus-gpt4o"

    headers = {
        'Content-Type': 'application/json',
        'api-key': api_key
    }

    # Encode the image as a base64 PNG so it can be sent as a data URL.
    image = Image.fromarray(image_array)
    buffered_io = BytesIO()
    image.save(buffered_io, format='png')
    base64_image = base64.b64encode(buffered_io.getvalue()).decode("utf-8")

    messages = [
        # System
        {
            "role": "system",
            "content": [{
                "type":"text",
                "text": "너는 사진 속에서 감지된 물체에 대해서 분석하는 봇이야."
            }]
        },
        # User
        {
            "role": "user",
            "content": [{
                "type":"text",
                "text": "이 사진에서 감지된 물체에 대해 감지 확률과 함께 자세하게 설명해줘."
            },{
                "type": "image_url",
                "image_url": {
                    "url": f"data:image/png;base64,{base64_image}"
                }
            }]
        },
    ]

    payload = {
        "messages": messages,
        "temperature": 0.7,
        "top_p": 0.95,
        "max_tokens": 800
    }

    try:
        response = requests.post(
            f"{endpoint}/openai/deployments/{deployment_name}/chat/completions?api-version=2024-02-15-preview",
            headers=headers,
            json=payload,
            timeout=60,  # never let a stalled connection hang the UI callback
        )
    except requests.RequestException as exc:
        history.append(('Error', str(exc)))
        return history

    if response.status_code != 200:
        history.append((str(response.status_code), response.text))
        return history

    try:
        content = response.json()['choices'][0]['message']['content']
    except (ValueError, KeyError, IndexError, TypeError):
        # Body was not the expected chat-completions JSON shape.
        history.append(('Error', response.text))
        return history

    # `content` may be null in the response; treat that as an empty reply.
    bot_response = (content or '').strip()
    history.append(('User', bot_response))
    return history

##################################################################################

################################### TTS ###################################
def get_token():
    """Request an access token from the Azure Cognitive Services token endpoint.

    Returns:
        The token text on an HTTP 200 response, otherwise an empty string
        (including when the request itself fails or times out).
    """
    import os  # local import, matching the file's existing function-scope import style

    endpoint = "https://eastus.api.cognitive.microsoft.com/sts/v1.0/issueToken"
    # SECURITY(review): a credential is embedded in source. It is kept only as
    # a backward-compatible fallback; set AZURE_SPEECH_KEY, then rotate and
    # remove the literal.
    api_key = os.environ.get("AZURE_SPEECH_KEY", "8e5931bade634c4e859a8e7544f87ff7")

    headers = {
        "Ocp-Apim-Subscription-Key": api_key,
    }

    try:
        response = requests.post(endpoint, headers=headers, timeout=10)
    except requests.RequestException:
        # Keep the original "empty string means no token" contract.
        return ''

    if response.status_code == 200:
        return response.text
    return ''

def request_tts(text):
    """Synthesize ``text`` to speech and save it as ``response_audio.wav``.

    The text is wrapped in an SSML document using the ``ko-KR-JiMinNeural``
    voice and posted to the TTS endpoint with a bearer token obtained from
    ``get_token()``.

    Args:
        text: Plain text to speak. ``None``, empty or whitespace-only text is
            rejected without contacting the service.

    Returns:
        The file name ``'response_audio.wav'`` when the service answers with
        HTTP 200, otherwise ``None`` (no text, no token, request failure, or a
        non-200 response).
    """
    from xml.sax.saxutils import escape  # local import; stdlib only

    # Nothing to synthesize: skip the token and TTS round-trips entirely.
    if not text or not text.strip():
        return None

    endpoint = "https://eastus.tts.speech.microsoft.com/cognitiveservices/v1"
    token = get_token()
    # get_token() returns '' on failure; the request could not be authorized.
    if not token:
        return None

    headers = {
        "Content-Type": "application/ssml+xml",
        "User-Agent": "testForEducation",
        "X-Microsoft-OutputFormat": "riff-24khz-16bit-mono-pcm",
        "Authorization": f"Bearer {token}"
    }

    # Escape &, < and > so arbitrary text cannot break or alter the SSML markup.
    data = f"""
        <speak version='1.0' xml:lang='ko-KR'><voice xml:lang='ko-KR' xml:gender='Female' name='ko-KR-JiMinNeural'>
            {escape(text)}
        </voice></speak>
    """

    try:
        # Send bytes: a str body may be encoded as Latin-1 by the HTTP stack,
        # which fails on non-Latin-1 text such as Korean.
        response = requests.post(endpoint,
                                 headers=headers,
                                 data=data.encode("utf-8"),
                                 timeout=30)
    except requests.RequestException:
        return None

    if response.status_code != 200:
        return None

    file_name = 'response_audio.wav'
    with open(file_name, "wb") as audio_file:
        audio_file.write(response.content)
    return file_name

###########################################################################

def stream_webcam(image):
    """Handle one streamed webcam frame by passing it to ``detect_objects``."""
    annotated_frame = detect_objects(image)
    return annotated_frame

def click_capture(image):
    """Return the given image unchanged, so it can be shown as the capture."""
    captured = image
    return captured

def click_send_gpt(image_array, history):
    """Forward the captured image and chat history to ``chatgpt_response``."""
    updated_history = chatgpt_response(image_array, history)
    return updated_history

def change_chatbot(chatbot):
    """Convert the latest chatbot reply to speech.

    Takes the second element of the last chat entry, replaces every character
    that is not Hangul, an ASCII letter, a digit or whitespace with a space,
    and passes the result to ``request_tts``.

    Args:
        chatbot: Sequence of chat entries indexable as ``entry[1]``; may be
            empty or ``None`` (e.g. when the chat has no messages).

    Returns:
        The audio file name returned by ``request_tts``, or ``None`` when
        there is no text to speak.
    """
    import re

    # The change event can fire with no messages; there is nothing to read out.
    if not chatbot:
        return None

    text = chatbot[-1][1]
    # The reply slot may be empty or hold a non-text value.
    if not isinstance(text, str) or not text.strip():
        return None

    # TTS: strip punctuation/symbols so they are not read aloud or injected
    # into the speech markup.
    pattern = r'[^가-힣a-zA-Z0-9\s]'
    cleaned_text = re.sub(pattern, ' ', text)

    # Returns the path of the generated speech (wave) file.
    return request_tts(cleaned_text)

# Build the Gradio UI: webcam / detection / capture images on top, action
# buttons below them, then the chatbot and the audio player for the reply.
with gr.Blocks() as demo:
    gr.Markdown("# Fimtru's AI World!!!")

    with gr.Column():
        # Live view, live detection, captured frame
        with gr.Row():
            webcam_input = gr.Image(label="실시간 화면", sources="webcam")
            output_image = gr.Image(label="실시간 감지", interactive=False)
            output_capture_image = gr.Image(label="캡쳐 화면", interactive=False)

        # Capture button, and the button that sends the capture to GPT
        with gr.Row():
            capture_button = gr.Button('캡처')
            send_gpt_button = gr.Button('GPT')

    with gr.Column():
        # chatbot, audio
        chatbot = gr.Chatbot(label="분석 결과")
        chatbot_audio = gr.Audio(label='GPT', interactive=False)
    
    # Event wiring:
    #   webcam stream -> stream_webcam   -> live detection image
    #   capture click -> click_capture   -> captured image (copy of the detection image)
    #   GPT click     -> click_send_gpt  -> chatbot history
    #   chatbot change-> change_chatbot  -> audio player
    webcam_input.stream(fn=stream_webcam, inputs=[webcam_input], outputs=[output_image])
    capture_button.click(fn=click_capture, inputs=[output_image], outputs=[output_capture_image])
    send_gpt_button.click(fn=click_send_gpt, inputs=[output_capture_image, chatbot], outputs=[chatbot])
    chatbot.change(fn=change_chatbot, inputs=[chatbot], outputs=[chatbot_audio])
    # Stream event for the live view.
    # Various event listeners are needed.

# share=True asks Gradio to also create a public share link.
demo.launch(share=True)



Labels Length:  80
Running on local URL:  http://127.0.0.1:7879

Could not create share link. Please check your internet connection or our status page: https://status.gradio.app.


