In [1]:
import torch
import torch.nn as nn
from torchvision import models, transforms
from ultralytics import YOLO
import gradio as gr
from PIL import Image, ImageDraw, ImageFont
import cv2
import numpy as np


class CNNLSTMClassifier(nn.Module):
    """ShuffleNetV2 feature extractor followed by an LSTM and a linear classifier.

    Args:
        num_classes: Output size of the final linear layer.
        hidden_dim: Size of the per-frame CNN embedding and of the LSTM state.
        lstm_layers: Number of stacked LSTM layers.
        lstm_dropout: Dropout applied between the stacked LSTM layers.
    """

    def __init__(self, num_classes, hidden_dim=128, lstm_layers=2, lstm_dropout=0.5):
        super().__init__()
        # `pretrained=True` is deprecated in torchvision; the weights enum is
        # the supported replacement and selects the same ImageNet weights.
        self.cnn = models.shufflenet_v2_x1_0(weights=models.ShuffleNet_V2_X1_0_Weights.DEFAULT)
        num_ftrs = self.cnn.fc.in_features
        # Swap the backbone's classification head for dropout + a projection
        # to hidden_dim so the CNN emits one embedding per frame.
        self.cnn.fc = nn.Sequential(
            nn.Dropout(p=0.5),
            nn.Linear(num_ftrs, hidden_dim)
        )
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, lstm_layers, batch_first=True, dropout=lstm_dropout)
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Run the CNN on every frame, the LSTM over the frames, then classify.

        Args:
            x: Either a 5-D tensor indexed as (batch, sequence, c, h, w) or a
                4-D tensor indexed as (batch, c, h, w); the 4-D case is
                handled as a sequence of length one.

        Returns:
            The linear layer's output for the last LSTM time step.

        Raises:
            ValueError: If `x` is neither 4-D nor 5-D.
        """
        if x.dim() == 4:
            # A single image per sample is a one-step sequence.
            x = x.unsqueeze(1)
        elif x.dim() != 5:
            raise ValueError("Unsupported input shape")

        frame_features = [self.cnn(x[:, t]) for t in range(x.size(1))]
        sequence = torch.stack(frame_features, dim=1)
        lstm_out, _ = self.lstm(sequence)
        # Only the final time step feeds the classifier.
        return self.fc(lstm_out[:, -1, :])

# Initialise the CNN+LSTM model and restore its trained weights
num_classes_cnnlstm = 4  # Adjust based on your dataset
cnn_lstm_model = CNNLSTMClassifier(num_classes=num_classes_cnnlstm)
# map_location="cpu": the model is never moved off the CPU here, so a
# checkpoint saved from a CUDA run must be remapped or loading fails on a
# CPU-only machine.
# NOTE(review): torch.load unpickles the file; only load checkpoints you trust.
cnn_lstm_model.load_state_dict(
    torch.load(r"C:\Users\ime203\Desktop\Graduation\cnn_lstm_model.pth", map_location="cpu")
)
cnn_lstm_model.eval()




CNNLSTMClassifier(
  (cnn): ShuffleNetV2(
    (conv1): Sequential(
      (0): Conv2d(3, 24, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(24, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU(inplace=True)
    )
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (stage2): Sequential(
      (0): InvertedResidual(
        (branch1): Sequential(
          (0): Conv2d(24, 24, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=24, bias=False)
          (1): BatchNorm2d(24, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): Conv2d(24, 58, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (3): BatchNorm2d(58, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (4): ReLU(inplace=True)
        )
        (branch2): Sequential(
          (0): Conv2d(24, 58, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (1): Batc

In [4]:
import os
import base64
import requests
import torch
import torch.nn as nn
from torchvision import models, transforms
from ultralytics import YOLO
import gradio as gr
from PIL import Image, ImageDraw, ImageFont
import cv2
import numpy as np
from dotenv import load_dotenv

# Load the OpenAI API key from the local .env file
load_dotenv('.env')
openai_api_key = os.environ.get('OPENAI_API_KEY')

# Load the YOLO model, placing it on the GPU when CUDA is available
yolo_model = YOLO(
    r"C:\Users\ime203\Desktop\Graduation\runs\detect\Epochs80test\weights\best.pt"
).to('cuda' if torch.cuda.is_available() else 'cpu')

# Definition of the CNN+LSTM model (its weights are loaded further below)
class CNNLSTMClassifier(nn.Module):
    """ShuffleNetV2 per-frame encoder, LSTM over the frames, linear classifier.

    Args:
        num_classes: Output size of the final linear layer.
        hidden_dim: Size of the CNN embedding and of the LSTM hidden state.
        lstm_layers: Number of stacked LSTM layers.
        lstm_dropout: Dropout applied between the stacked LSTM layers.
    """

    def __init__(self, num_classes, hidden_dim=128, lstm_layers=2, lstm_dropout=0.5):
        super().__init__()
        backbone = models.shufflenet_v2_x1_0(weights=models.ShuffleNet_V2_X1_0_Weights.DEFAULT)
        # Replace the backbone head with dropout + a projection to hidden_dim.
        backbone.fc = nn.Sequential(
            nn.Dropout(p=0.5),
            nn.Linear(backbone.fc.in_features, hidden_dim),
        )
        self.cnn = backbone
        self.lstm = nn.LSTM(
            hidden_dim,
            hidden_dim,
            lstm_layers,
            batch_first=True,
            dropout=lstm_dropout,
        )
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Classify a 5-D (batch, sequence, c, h, w) or 4-D (batch, c, h, w) input.

        Returns the linear layer's output for the last LSTM time step and
        raises ValueError for any other number of dimensions.
        """
        rank = x.dim()
        if rank not in (4, 5):
            raise ValueError("Unsupported input shape")

        if rank == 5:
            # Encode each time step separately, then rebuild the sequence axis.
            per_step = [self.cnn(x[:, step]) for step in range(x.size(1))]
            features = torch.stack(per_step, dim=1)
        else:
            # A single image becomes a sequence of length one.
            features = self.cnn(x).unsqueeze(1)

        hidden_seq, _ = self.lstm(features)
        return self.fc(hidden_seq[:, -1, :])

# Instantiate the model and restore the trained weights
num_classes = 4  # Adjust based on your dataset
cnn_lstm_model = CNNLSTMClassifier(num_classes=num_classes)
# map_location="cpu": the model stays on the CPU in this notebook, so a
# checkpoint saved from a CUDA run must be remapped or loading fails on a
# CPU-only machine.
# NOTE(review): torch.load unpickles the file; only load checkpoints you trust.
cnn_lstm_model.load_state_dict(
    torch.load(r"C:\Users\ime203\Desktop\Graduation\cnn_lstm_model.pth", map_location="cpu")
)
cnn_lstm_model.eval()

# Inference-time preprocessing. The random ColorJitter augmentation was
# removed: it is a training-time augmentation and made the prediction for the
# same frame vary from call to call.
transform = transforms.Compose([
    transforms.Resize((640, 640)),
    transforms.Lambda(lambda img: img.convert('RGB')),  # convert the image to RGB
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Image classification function
def classify_image(image):
    """Classify one image with the module-level CNN+LSTM model.

    Args:
        image: Input accepted by the module-level `transform` (this notebook
            calls it with a PIL image).

    Returns:
        The entry of ["Normal", 1, 2, 3] at the index of the largest model
        output.
    """
    class_names = ["Normal", 1, 2, 3]  # Adjust based on your dataset
    batch = transform(image).unsqueeze(0)  # add the batch dimension
    # Inference only: disable autograd so no graph is built for every frame.
    with torch.no_grad():
        outputs = cnn_lstm_model(batch)
    _, preds = torch.max(outputs, 1)
    return class_names[preds.item()]

# Object detection function
def detect_objects(image):
    """Run the YOLO model on an image and return the annotated picture.

    Args:
        image: Input passed straight to the module-level `yolo_model`.

    Returns:
        A tuple `(annotated_image, boxes)`: the first result rendered with
        its detections as an RGB PIL image, and that result's `boxes`.
    """
    results = yolo_model(image)
    first = results[0]
    # Ultralytics' plot() returns a BGR-ordered array; reverse the channel
    # axis so PIL (which assumes RGB) does not show red and blue swapped.
    annotated_bgr = first.plot()
    annotated_rgb = np.ascontiguousarray(annotated_bgr[..., ::-1])
    return Image.fromarray(annotated_rgb), first.boxes


# Image classification via the OpenAI API
def classify_with_openai(api_key, img_path):
    """Ask the OpenAI chat completions endpoint to classify an accident image.

    Args:
        api_key: OpenAI API key, sent as a Bearer token.
        img_path: Path of the image file; its bytes are sent as a base64
            JPEG data URL.

    Returns:
        1 for a car-to-car answer, 3 for car-to-human, 2 for car-to-motorcycle
        or car-to-bicycle, and "Normal" when the reply contains none of the
        digits 1-4, the key is missing, or the request fails.
    """
    if not api_key:
        # Without a key the request can only be rejected; skip the round trip.
        print("Error in OpenAI API response: missing API key")
        return "Normal"  # No accident

    with open(img_path, "rb") as img_file:
        base64_image = base64.b64encode(img_file.read()).decode('utf-8')

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}"
    }

    # The image must be a separate `image_url` content part; pasting the
    # base64 string into the prompt text sends it as plain text instead.
    payload = {
        "model": "gpt-4o",
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": "Classify this image into: (1) Car-to-car accident, (2) Car-to-human accident, (3) Car-to-motorcycle, (4) Car-to-bicycle accident. Answer with the number."
                    },
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}
                    }
                ]
            }
        ],
        "max_tokens": 300
    }

    try:
        # The timeout keeps one stalled request from hanging the whole video run.
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers,
            json=payload,
            timeout=60,
        )
        response_json = response.json()
    except (requests.RequestException, ValueError) as e:
        # Network failure or a non-JSON body: same fallback as an API error.
        print(f"Error in OpenAI API response: {e}")
        return "Normal"  # No accident

    if response.status_code == 200 and 'choices' in response_json:
        message_content = response_json['choices'][0]['message']['content'] or ""
        # Prompt option number -> label used by the rest of the notebook.
        label_by_answer = {"1": 1, "2": 3, "3": 2, "4": 2}
        # Use the first option digit in the reply, so "3 (not 1)" maps to
        # option 3 rather than to whichever digit is tested first.
        for ch in message_content:
            if ch in label_by_answer:
                return label_by_answer[ch]
        return "Normal"  # No accident

    print(f"Error in OpenAI API response: {response_json}")
    return "Normal"  # No accident

# Detect objects in an image and classify it
def detect_and_classify(image):
    """Annotate an image with detections and an accident label.

    Runs `detect_objects` and `classify_image` on the image, asks
    `classify_with_openai` for a second opinion, and draws the accident
    description on the annotated image when the final label is 1, 2 or 3.

    Args:
        image: Image object that is passed to the detector and classifier
            and that provides a `save(path)` method (this notebook calls it
            with a PIL image).

    Returns:
        A tuple `(annotated_image, label)`. The label is the OpenAI result
        unless that is "Normal", in which case it is the local classifier's
        result. On any exception the original image and "Error" are returned.
    """
    import tempfile  # local import: only needed for the scratch JPEG below

    accident_text = {
        1: "Car-to-car Accident",
        3: "Car-to-human Accident",
        2: "Car-to-motorcycle/Bicycle Accident",
    }

    try:
        # Object detection
        detected_image, _ = detect_objects(image)
        # Local image classification
        classification_result = classify_image(image)

        # Classification through the OpenAI API. The frame goes through a
        # uniquely named scratch file that is removed afterwards, instead of
        # a fixed "temp_frame.jpg" left behind in the working directory.
        fd, temp_image_path = tempfile.mkstemp(suffix=".jpg")
        os.close(fd)
        try:
            image.save(temp_image_path)
            openai_classification_result = classify_with_openai(openai_api_key, temp_image_path)
        finally:
            try:
                os.remove(temp_image_path)
            except OSError:
                pass  # best-effort cleanup; a stale temp file is harmless

        # Ensemble: the OpenAI answer wins unless it reports no accident
        if openai_classification_result != "Normal":
            final_classification_result = openai_classification_result
        else:
            final_classification_result = classification_result

        # Overlay the accident description for accident labels (1, 2 or 3)
        text = accident_text.get(final_classification_result)
        if text is not None:
            try:
                draw = ImageDraw.Draw(detected_image)
                try:
                    font = ImageFont.truetype("arial.ttf", 36)
                except OSError:
                    # arial.ttf is not installed everywhere; still draw the
                    # label with Pillow's built-in font.
                    font = ImageFont.load_default()

                # Centre the text horizontally, one tenth down from the top
                text_bbox = draw.textbbox((0, 0), text, font=font)
                text_width = text_bbox[2] - text_bbox[0]
                width, height = detected_image.size
                x = (width - text_width) // 2
                y = height // 10
                draw.text((x, y), text, font=font, fill=(255, 0, 0))
            except Exception as e:
                print(f"Error drawing text: {e}")

        return detected_image, final_classification_result
    except Exception as e:
        print(f"Error in detect_and_classify: {e}")
        return image, "Error"

# Video processing function
def process_video(video_path):
    """Annotate every frame of a video and write the result to an MP4 file.

    Each frame is converted to an RGB PIL image, passed through
    `detect_and_classify`, converted back to BGR and written to
    'output_video.mp4'. Frames whose processing raises are reported and
    skipped.

    Args:
        video_path: Path of the input video, passed to `cv2.VideoCapture`.

    Returns:
        'output_video.mp4' when at least one frame was written, otherwise
        the string "Error: No frames were processed.".
    """
    output_path = 'output_video.mp4'
    cap = cv2.VideoCapture(video_path)
    writer = None
    try:
        # Keep the source frame rate so playback speed is preserved; fall
        # back to 30 when the capture does not report a usable value.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not (fps and fps > 0):
            fps = 30

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            try:
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                pil_image = Image.fromarray(frame_rgb)
                detected_image, _ = detect_and_classify(pil_image)
                annotated = cv2.cvtColor(np.array(detected_image), cv2.COLOR_RGB2BGR)

                # Open the writer lazily from the first annotated frame's
                # size and stream frames out instead of buffering the whole
                # video in memory.
                if writer is None:
                    height, width = annotated.shape[:2]
                    writer = cv2.VideoWriter(
                        output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height)
                    )
                writer.write(annotated)
            except Exception as e:
                print(f"Error processing frame: {e}")
    finally:
        # Release the capture and writer even if something unexpected raises.
        cap.release()
        if writer is not None:
            writer.release()

    if writer is None:
        # NOTE(review): the Gradio output is a file component; confirm it
        # copes with this non-path string.
        return "Error: No frames were processed."
    return output_path

# Gradio UI: upload a video, run process_video on it, return the result file
interface = gr.Interface(
    title="YOLO Object Detection and Image Classification in Videos",
    description="Upload a video to detect objects and classify images within the video.",
    fn=process_video,
    inputs=gr.Video(),
    outputs="file",  # offers the result as a downloadable file
)

# Start the local web server for the interface
interface.launch()


Running on local URL:  http://127.0.0.1:7862

To create a public link, set `share=True` in `launch()`.





0: 256x416 4 Cars, 167.1ms
Speed: 0.0ms preprocess, 167.1ms inference, 7.0ms postprocess per image at shape (1, 3, 256, 416)

0: 256x416 3 Cars, 27.8ms
Speed: 18.8ms preprocess, 27.8ms inference, 7.8ms postprocess per image at shape (1, 3, 256, 416)

0: 256x416 5 Cars, 52.0ms
Speed: 0.0ms preprocess, 52.0ms inference, 9.0ms postprocess per image at shape (1, 3, 256, 416)

0: 256x416 4 Cars, 27.1ms
Speed: 19.7ms preprocess, 27.1ms inference, 0.0ms postprocess per image at shape (1, 3, 256, 416)

0: 256x416 2 Cars, 33.7ms
Speed: 8.1ms preprocess, 33.7ms inference, 0.0ms postprocess per image at shape (1, 3, 256, 416)

0: 256x416 2 Cars, 22.9ms
Speed: 8.0ms preprocess, 22.9ms inference, 8.0ms postprocess per image at shape (1, 3, 256, 416)

0: 256x416 3 Cars, 31.1ms
Speed: 3.3ms preprocess, 31.1ms inference, 0.8ms postprocess per image at shape (1, 3, 256, 416)

0: 256x416 5 Cars, 28.1ms
Speed: 14.0ms preprocess, 28.1ms inference, 0.0ms postprocess per image at shape (1, 3, 256, 416)

0: