<a href="https://colab.research.google.com/github/dlwltn0430/autonomous-driving-samples/blob/main/car_movement_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install tensorflow opencv-python-headless numpy
!wget https://pjreddie.com/media/files/yolov3.weights
!wget https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3.cfg
!wget https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names

--2024-06-25 17:00:48--  https://pjreddie.com/media/files/yolov3.weights
Resolving pjreddie.com (pjreddie.com)... 162.0.215.52
Connecting to pjreddie.com (pjreddie.com)|162.0.215.52|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 248007048 (237M) [application/octet-stream]
Saving to: ‘yolov3.weights’


2024-06-25 17:01:03 (16.3 MB/s) - ‘yolov3.weights’ saved [248007048/248007048]

--2024-06-25 17:01:03--  https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3.cfg
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.110.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8342 (8.1K) [text/plain]
Saving to: ‘yolov3.cfg’


2024-06-25 17:01:04 (51.2 MB/s) - ‘yolov3.cfg’ saved [8342/8342]

--2024-06-25 17:01:04--  https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.na

In [None]:
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM
from google.colab.patches import cv2_imshow
import time

# YOLO 모델과 클래스 파일 로드
def load_yolo():
    net = cv2.dnn.readNet("yolov3.weights", "yolov3.cfg")
    layer_names = net.getLayerNames()
    output_layers = [layer_names[i - 1] for i in net.getUnconnectedOutLayers()]
    return net, output_layers

# 차량 탐지 함수
def detect_cars(frame, net, output_layers):
    height, width, channels = frame.shape
    blob = cv2.dnn.blobFromImage(frame, 0.00392, (416, 416), (0, 0, 0), True, crop=False)
    net.setInput(blob)
    outs = net.forward(output_layers)

    boxes = []
    confidences = []
    class_ids = []

    for out in outs:
        for detection in out:
            scores = detection[5:]
            class_id = np.argmax(scores)
            confidence = scores[class_id]
            if confidence > 0.5 and class_id == 2:  # class_id == 2 means 'car'
                center_x = int(detection[0] * width)
                center_y = int(detection[1] * height)
                w = int(detection[2] * width)
                h = int(detection[3] * height)
                x = int(center_x - w / 2)
                y = int(center_y - h / 2)
                boxes.append([x, y, w, h])
                confidences.append(float(confidence))
                class_ids.append(class_id)

    indexes = cv2.dnn.NMSBoxes(boxes, confidences, 0.5, 0.4)
    return boxes, confidences, class_ids, indexes

# 차량 출발 감지 모델 구축
def build_movement_model():
    model = Sequential()
    model.add(LSTM(50, activation='relu', input_shape=(None, 1)))
    model.add(Dense(1))
    model.compile(optimizer='adam', loss='mean_squared_error')
    return model

# 비디오 프레임에서 차량 출발 여부를 감지하는 함수
def process_video(video_path, movement_model):
    net, output_layers = load_yolo()
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)

    positions = []
    prev_center_y = None

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        boxes, confidences, class_ids, indexes = detect_cars(frame, net, output_layers)

        for i in range(len(boxes)):
            if i in indexes:
                x, y, w, h = boxes[i]
                center_y = y + h // 2

                if prev_center_y is not None:
                    positions.append(center_y - prev_center_y)

                prev_center_y = center_y

                cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

        if len(positions) > 30:  # 30 프레임 이상 기록되면
            input_data = np.array(positions[-30:]).reshape(1, 30, 1)
            movement_prediction = movement_model.predict(input_data)
            if movement_prediction > 0.5:
                cv2.putText(frame, "Car Started Moving", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            positions.pop(0)

        cv2_imshow(frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

# 모델 훈련 (여기서는 가상의 데이터를 사용, 실제 사용시에는 실제 주행 데이터를 사용)
movement_model = build_movement_model()
# 가상 데이터로 모델 훈련 (실제 데이터로 대체 필요)
x_train = np.random.randn(1000, 30, 1)
y_train = np.random.randint(0, 2, 1000)
movement_model.fit(x_train, y_train, epochs=10, batch_size=32)

# 비디오 처리
process_video('car_movement.mp4', movement_model)