<a href="https://colab.research.google.com/github/hmin27/Anticipate-Accident/blob/main/model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setting the Environment

In [None]:
from google.colab import drive

# Mount Google Drive; the dataset paths used below live under this mount point.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import torch
from torch import nn, Tensor
import torchvision
import torchvision.transforms as transforms
from torchvision.transforms.functional import resize
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler
import os

import cv2
from google.colab.patches import cv2_imshow  # for colab env.

import numpy as np
from PIL import Image
from typing import Dict, Iterable, Callable
from tqdm.notebook import tqdm

# Training hyper-parameters shared by the data loaders and the training loop.
EPOCH, BATCH_SIZE = 3, 16

# Prefer the GPU when the runtime provides one, otherwise fall back to the CPU.
_device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(_device_name)
print(device)

cuda


# Load Dataset


## (1) OpenCV
- Use OpenCV
- Capture the individual frames of a video
- Write the result video with cv2.VideoWriter

In [None]:
# Sample clip: training-positive-000001
PATH = '000001.mp4'

cap = cv2.VideoCapture(PATH)
# Fail fast: the properties read below are meaningless when the file could not
# be opened, and the writer would be created with a 0x0 frame size.
if not cap.isOpened():
  raise IOError(f"Can't open video: {PATH}")

frame_cnt = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print('Frame 갯수:', frame_cnt)

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(frame_width, frame_height)

# Reuse the source frame rate; fall back to the previous fixed 20 fps when the
# container does not report one (cap.get returns 0 in that case).
fps = cap.get(cv2.CAP_PROP_FPS) or 20

# splitext strips only the extension, so dots in directory names are preserved.
save_name = f"{os.path.splitext(PATH)[0]}_result"
# 'mp4v' is the codec tag that matches the .mp4 container used for the output.
out = cv2.VideoWriter(f"{save_name}.mp4",
                      cv2.VideoWriter_fourcc(*'mp4v'), fps,
                      (frame_width, frame_height))


Frame 갯수: 100
1280 720


## (2) Torchvision
- read_video

In [None]:
!pip install av

Collecting av
  Downloading av-12.0.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (33.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.8/33.8 MB[0m [31m45.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: av
Successfully installed av-12.0.0


In [None]:
class DashcamDataset(Dataset):
  """Video-level dataset over ``<data_path>/{training,testing}/{positive,negative}``.

  Each item is ``(video, label)``: ``video`` is the clip decoded by
  ``torchvision.io.read_video`` in TCHW layout with every frame resized to
  ``frame_size``; ``label`` is 1 for clips under ``positive`` and 0 for clips
  under ``negative``.

  Args:
    data_path: Root directory containing the ``training`` and ``testing`` folders.
    train: Use the ``training`` split when truthy, otherwise ``testing``.
    transform: Optional callable applied to the resized video tensor.
    frame_size: ``(height, width)`` every frame is resized to.
  """

  def __init__(self, data_path, train=True, transform=None, frame_size=(180, 320)):
    self.data_path = data_path
    self.transform = transform
    self.frame_size = tuple(frame_size)

    self.base_path = os.path.join(data_path, "training" if train else "testing")
    self.positive_path = os.path.join(self.base_path, "positive")
    self.negative_path = os.path.join(self.base_path, "negative")

    self.positive_videos = self._list_videos(self.positive_path)
    self.negative_videos = self._list_videos(self.negative_path)

    # Positives first, then negatives; labels are fixed once here so that
    # __getitem__ does not have to scan the positive list for every sample.
    self.video_paths = self.positive_videos + self.negative_videos
    self.labels = [1] * len(self.positive_videos) + [0] * len(self.negative_videos)

  @staticmethod
  def _list_videos(folder):
    """Return the sorted full paths of the entries in ``folder``, skipping hidden files."""
    return [os.path.join(folder, name)
            for name in sorted(os.listdir(folder))
            if not name.startswith(".")]

  def __len__(self):
    """Number of clips (positive + negative) in the selected split."""
    return len(self.video_paths)

  def __getitem__(self, idx):
    """Decode, resize and (optionally) transform the clip at ``idx``."""
    video_path = self.video_paths[idx]
    video = torchvision.io.read_video(video_path, output_format = 'TCHW')[0]
    # Resize frame by frame to keep peak memory low for full-resolution clips.
    video = torch.stack([resize(frame, self.frame_size) for frame in video])

    if self.transform is not None:
      video = self.transform(video)

    return video, self.labels[idx]


In [None]:
seed = 42
np.random.seed(seed)
torch.manual_seed(seed)

# Random Sampling
# path = "/content/drive/MyDrive/Study/2024-S/Car_Accident_Anticipation/프로젝트/Dashcam_dataset/videos"
path = "/content/drive/MyDrive/Study/Car_Accident_Anticipation/프로젝트/Dashcam_dataset/videos"

train_sampled_dataset = DashcamDataset(path, train=True)

# The dataset lists every positive clip before the negatives, so taking the
# first N indices would over-represent positives. Draw a seeded random subset
# instead, never asking for more clips than the split contains.
NUM_SAMPLED_VIDEOS = 642
num_sampled = min(NUM_SAMPLED_VIDEOS, len(train_sampled_dataset))
indices = np.random.choice(len(train_sampled_dataset), size=num_sampled, replace=False).tolist()

sampler = SubsetRandomSampler(indices)
train_sampled_loader = DataLoader(train_sampled_dataset, batch_size=BATCH_SIZE, sampler=sampler)

In [None]:
# Full train / test splits. The commented-out line is the previous Drive location.
# path = "/content/drive/MyDrive/Study/2024-S/Car_Accident_Anticipation/프로젝트/Dashcam_dataset/videos"
path = "/content/drive/MyDrive/Study/Car_Accident_Anticipation/프로젝트/Dashcam_dataset/videos"

train_dataset, test_dataset = (DashcamDataset(path, train=is_train) for is_train in (True, False))

# Only the training loader shuffles; evaluation keeps the dataset order.
loader_kwargs = dict(batch_size=BATCH_SIZE)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_kwargs)

In [None]:
# Number of clips per dataset, then number of batches per loader, one per line.
for sized in (train_dataset, test_dataset, train_loader, test_loader, train_sampled_loader):
  print(len(sized))

1284
466
321
117
41


In [None]:
# Pull a single batch from the sampled loader for inspection.
sampled_batches = iter(train_sampled_loader)
video, label = next(sampled_batches)

In [None]:
# Batch size, batch shape (batch, frames, C, H, W) and the per-clip labels.
for item in (len(video), video.shape, label):
  print(item)

16
torch.Size([16, 100, 3, 180, 320])
tensor([0, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1])


In [None]:
import matplotlib.pyplot as plt

# First frame of the first clip in the batch; matplotlib wants HWC, the tensor is CHW.
frame = video[0, 0]
frame_hwc = frame.permute(1, 2, 0)

plt.imshow(frame_hwc)
plt.show()

# Object Detection
- Faster R-CNN pretrained model with a ResNet-50 FPN backbone


In [None]:
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection import FasterRCNN_ResNet50_FPN_Weights

# Pretrained detector capped at 20 proposals after NMS and 20 final detections
# per image, so every frame contributes at most 20 object candidates.
detector_kwargs = dict(rpn_post_nms_top_n_train=20,
                       rpn_post_nms_top_n_test=20,
                       box_detections_per_img=20)

weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT
fasterrcnn = fasterrcnn_resnet50_fpn(weights=weights, **detector_kwargs)
fasterrcnn = fasterrcnn.to(device).eval()

Downloading: "https://download.pytorch.org/models/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth" to /root/.cache/torch/hub/checkpoints/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth
100%|██████████| 160M/160M [00:01<00:00, 157MB/s]


## Inference with Faster R-CNN


In [None]:
# Category names shipped with the pretrained weights; the list index is the label id.
class_labels = weights.meta["categories"]
num_labels = len(class_labels)

print(class_labels)
print("The number of labels: ", num_labels)  # TODO: narrow this down to the labels we actually use

['__background__', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'N/A', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe', 'N/A', 'backpack', 'umbrella', 'N/A', 'N/A', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket', 'bottle', 'N/A', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed', 'N/A', 'dining table', 'N/A', 'N/A', 'toilet', 'N/A', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', 'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'N/A', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush']
The number of labels:  91


In [None]:
transform = transforms.Compose([transforms.ToTensor(),])

def predict(image, model, detection_threshold):
  """Run a detection model on one image and keep detections above a score threshold.

  Args:
    image: A PIL image / HxWxC uint8 ndarray, or a CxHxW tensor. Integer
      tensors are scaled to [0, 1]; floating-point tensors are used as given.
    model: Detection model called as ``model(batch)`` that returns a list with
      one dict holding ``'boxes'``, ``'scores'`` and ``'labels'`` per image.
    detection_threshold: Minimum score a detection needs to be kept.

  Returns:
    ``(boxes, pred_classes, labels)`` for the kept detections only, index-aligned:
    ``boxes`` is an ndarray of box coordinates, ``pred_classes`` a list of names
    looked up in ``class_labels``, and ``labels`` the label-id tensor.
  """
  # Convert locally instead of through the notebook-level `transform`, which a
  # later cell may rebind to a different pipeline.
  if torch.is_tensor(image):
    image = image if image.is_floating_point() else image.float() / 255
  else:
    image = transforms.functional.to_tensor(image)
  image = image.unsqueeze(0).to(device)  # batch of one: (1, C, H, W)

  with torch.no_grad():
    outputs = model(image)
  # Output: [{'scores': [score1, ...], 'boxes': [box1, ...], 'labels': [label1, ...]}]
  detections = outputs[0]

  # One mask for all three fields so boxes, class names and label ids stay aligned.
  keep = detections['scores'] >= detection_threshold
  boxes = detections['boxes'][keep].detach().cpu().numpy()
  labels = detections['labels'][keep]
  pred_classes = [class_labels[i] for i in labels.cpu().numpy()]  # class_labels is defined separately (car, motorbike, ...)

  print("# of candidate objects: ", len(boxes))

  return boxes, pred_classes, labels


In [None]:
# One random colour per class id, reused for both the box and its caption.
COLORS = np.random.uniform(0, 255, size=(len(class_labels), 3))

def draw_boxes(boxes, classes, labels, image):
  """Draw each box and its class name on a channel-swapped copy of ``image``.

  ``boxes[i]``, ``classes[i]`` and ``labels[i]`` describe the same detection;
  ``labels[i]`` selects the colour from ``COLORS``. Returns the annotated image.
  """
  # cvtColor swaps the first and third channels and returns a new array.
  # NOTE(review): frames read through cv2 are already BGR - confirm this swap is intended.
  canvas = cv2.cvtColor(np.asarray(image), cv2.COLOR_BGR2RGB)

  for idx in range(len(boxes)):
    box = boxes[idx]
    box_color = COLORS[labels[idx]]
    top_left = (int(box[0]), int(box[1]))  # (x1, y1)
    bottom_right = (int(box[2]), int(box[3]))  # (x2, y2)

    cv2.rectangle(canvas, top_left, bottom_right, box_color, 2)  # thickness 2

    # Caption sits 5 px above the top-left corner of the box.
    caption_origin = (int(box[0]), int(box[1]-5))
    cv2.putText(canvas, classes[idx], caption_origin,
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, box_color, 2,
                lineType=cv2.LINE_AA)

  return canvas


In [None]:
# Read the clip frame by frame, run the detector on every frame and keep the
# raw frames in memory for the later cells.
total_frame = 0
frame_list = []

while cap.isOpened():
  ret, img_frame = cap.read()

  if not ret:
    print('남은 프레임이 없습니다.')
    break

  total_frame += 1
  print("Frame: ", total_frame)

  frame_list.append(img_frame)

  with torch.no_grad():
    boxes, classes, labels = predict(img_frame, fasterrcnn, 0.8)

  image = draw_boxes(boxes, classes, labels, img_frame)

  # cv2_imshow(image)
  # out.write(image)

# Release both the reader and the writer so the output file is finalised.
cap.release()
out.release()
cv2.destroyAllWindows()

print(total_frame)
# Show the frame shape rather than dumping the raw pixel array; the list is
# empty when no frame could be decoded.
if frame_list:
  print(frame_list[0].shape)


In [None]:
# Decode the sample clip as a (T, C, H, W) uint8 tensor, e.g. (100, 3, 720, 1280),
# so this cell no longer depends on a `frames` variable defined further down.
frames = torchvision.io.read_video(PATH, output_format='TCHW')[0]

for i in range(frames.size(0)):
  frame = frames[i]
  print(frame.shape)

  # predict() accepts an HxWxC uint8 array, so hand it the frame in that layout.
  with torch.no_grad():
    boxes, classes, labels = predict(frame.permute(1, 2, 0).numpy(), fasterrcnn, 0.7)

# cv2_imshow(image)

# Feature Extraction


In [None]:
class FeatureExtractor(nn.Module):
    """Wrap ``model`` and capture the outputs of selected sub-modules with forward hooks.

    Args:
        model: The module to run.
        layers: Names (as produced by ``model.named_modules()``) of the
            sub-modules whose outputs should be captured.

    Raises:
        KeyError: If a requested layer name does not exist in ``model``.
    """

    def __init__(self, model: nn.Module, layers: Iterable[str]):
        super().__init__()
        self.model = model
        # Materialise once: a one-shot iterable would otherwise be exhausted by
        # the dict below and no hook would ever be registered.
        self.layers = list(layers)
        self._features = {layer: torch.empty(0) for layer in self.layers}
        # Handles are kept so the hooks can be detached from the shared model again.
        self._hook_handles = []

        modules = dict(self.model.named_modules())
        for layer_id in self.layers:
            if layer_id not in modules:
                raise KeyError(f"Layer '{layer_id}' not found in model")
            handle = modules[layer_id].register_forward_hook(self.save_outputs_hook(layer_id))
            self._hook_handles.append(handle)

    def save_outputs_hook(self, layer_id: str) -> Callable:
        """Build the forward hook that stores the output of ``layer_id``."""
        def fn(_, __, output):
            self._features[layer_id] = output
        return fn

    def remove_hooks(self) -> None:
        """Detach every hook this extractor registered on the wrapped model."""
        for handle in self._hook_handles:
            handle.remove()
        self._hook_handles.clear()

    def forward(self, x):
        """Run the model and return ``(captured_features, model_output)``."""
        result = self.model(x)
        # Return a snapshot so a later forward pass cannot overwrite the
        # dictionary the caller is still holding.
        return dict(self._features), result

### Try

In [None]:
## Example test

# Capture the output of the detector's second box-head FC layer for every frame.
fc7_layer = "roi_heads.box_head.fc7"
features_extractor = FeatureExtractor(fasterrcnn, [fc7_layer]).eval()
frames = frames.float().to(device)

per_frame_rois = []

with torch.no_grad():
  # unsqueeze(1) makes every iterated element a batch of one: (1, C, H, W)
  for frame in frames.unsqueeze(1):
    feature, result = features_extractor(frame)
    per_frame_rois.append(feature[fc7_layer])

roi_features = torch.stack(per_frame_rois, dim=0)

# Full-frame feature
- pre-trained VGG model

In [None]:
# ImageNet-pretrained VGG16 used as a fixed full-frame feature extractor.
# `weights=` replaces the deprecated `pretrained=True` and loads the same checkpoint.
vgg = torchvision.models.vgg16(weights=torchvision.models.VGG16_Weights.IMAGENET1K_V1).to(device)
print(vgg)

vgg.classifier = vgg.classifier[:-1]  # drop the last layer so the output is the 4096-d feature

# eval() disables the classifier's dropout; without it the extracted features
# change randomly from call to call.
vgg = vgg.eval()

In [None]:
# Use a dedicated name so the detection `transform` defined earlier is not overwritten.
vgg_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.CenterCrop((224, 224)),  # central 224x224 patch, the VGG input size
    transforms.ToTensor(),
    # ImageNet statistics the pretrained VGG weights were trained with.
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Frames decoded by OpenCV are BGR, while the pretrained VGG expects RGB.
sample_img = cv2.cvtColor(frame_list[0], cv2.COLOR_BGR2RGB)
sample_img = vgg_transform(sample_img).unsqueeze(0).to(device)
print(sample_img.shape)

with torch.no_grad():
  feature = vgg(sample_img)

print(feature)
print(feature.shape)

torch.Size([1, 3, 224, 224])
tensor([[0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.4001]],
       device='cuda:0')
torch.Size([1, 4096])


# Model

In [None]:
class Attention(nn.Module):
  """Soft attention over the object candidates of every frame.

  For each object feature ``x`` and hidden state ``h`` the score is
  ``w^T tanh(W x + U h)``. Scores are normalised over the objects of a frame
  and used to form a weighted sum of that frame's object features.

  Args:
    encoder_dim: Size of each object feature vector.
    hidden_dim: Size of the hidden state passed to ``forward``.
    attention_dim: Size of the shared projection space.
  """

  def __init__(self, encoder_dim, hidden_dim=1024, attention_dim=512):
    super(Attention, self).__init__()
    self.U = nn.Linear(hidden_dim, attention_dim)
    self.W = nn.Linear(encoder_dim, attention_dim)
    self.w = nn.Linear(attention_dim, 1)
    self.tanh = nn.Tanh()
    # Normalise over the object axis (dim 2), the axis summed out below.
    # Normalising over dim 1 would mix scores across frames instead.
    self.softmax = nn.Softmax(dim=2)

  def forward(self, img_features, hidden_state):
    """Return the attended frame features.

    Args:
      img_features: ``(batch, frames, objects, encoder_dim)``.
      hidden_state: ``(1, batch, hidden_dim)``.

    Returns:
      ``(batch, frames, encoder_dim)`` weighted sum over objects.
    """
    W_e = self.W(img_features)  # (batch, frames, objects, attention_dim)
    U_e = self.U(hidden_state).unsqueeze(0).permute(2, 0, 1, 3)  # (batch, 1, 1, attention_dim)
    att = self.tanh(W_e + U_e)  # (batch, frames, objects, attention_dim)
    e = self.w(att).squeeze(3)  # (batch, frames, objects)
    alpha = self.softmax(e)  # (batch, frames, objects), sums to 1 over objects
    phi = (img_features * alpha.unsqueeze(3)).sum(2)  # phi(x, alpha), (batch, frames, encoder_dim)
    return phi

### Try

In [None]:
## Example test

# Random stand-ins: (batch, frames, objects, feature) features and a (1, batch, feature) hidden state.
n_videos, n_frames, n_objects, feat_dim = 4, 100, 20, 1024
features = torch.randn(n_videos, n_frames, n_objects, feat_dim).to(device)
hidden = torch.randn(1, n_videos, feat_dim).to(device)

attention = Attention(encoder_dim=feat_dim).to(device)
phi = attention(features, hidden)

print(phi.shape)
# print(phi)

torch.Size([4, 100, 20, 512])
torch.Size([4, 1, 1, 512])
torch.Size([4, 100, 20, 512])
torch.Size([4, 100, 20])
torch.Size([4, 100, 1024])


## Encoder


In [None]:
# for a batch of videos

class Encoder(nn.Module):
  """Extract per-frame object features from a batch of videos.

  Every frame is passed through the shared ``fasterrcnn`` detector and the
  output of its ``roi_heads.box_head.fc7`` layer is collected.
  """

  def __init__(self):
    super(Encoder, self).__init__()
    self.model = fasterrcnn
    self.layer = "roi_heads.box_head.fc7"
    self.features_extractor = FeatureExtractor(self.model, [self.layer]).eval()

  def forward(self, videos):
    """Return the stacked features, e.g. [batch, 100, 20, 1024] for 100-frame clips.

    Args:
      videos: Tensor of shape (batch, frames, C, H, W).
    """
    # Sizes come from the argument itself, not from a notebook-level `frames` variable.
    batch_size = videos.size(0)
    frames_size = videos.size(1)
    video_features = []

    # The detector is only used as a fixed feature extractor, so no graph is built.
    with torch.no_grad():
      for i in range(batch_size):
        video = videos[i]
        # print(f"{i+1}th video: {video.shape}")

        roi_features = []

        for j in range(frames_size):
          frame = video[j, :, :, :].unsqueeze(0)  # batch of one: (1, C, H, W)
          feature, _ = self.features_extractor(frame)

          roi_features.append(feature[self.layer])

        roi_features = torch.stack(roi_features, dim=0)  # all frames of one video in one tensor, [100, 20, 1024]
        video_features.append(roi_features)

    video_features = torch.stack(video_features, dim=0)  # all videos in one tensor, [batch, 100, 20, 1024]
    # print(f"Video Features: {video_features.shape}")

    return video_features

### Try

In [None]:
# for one video

class Encoder_one(nn.Module):
  """Single-video encoder: collects the fc7 box-head features of every frame."""

  def __init__(self):
    super(Encoder_one, self).__init__()
    self.model = fasterrcnn
    self.layer = "roi_heads.box_head.fc7"
    extractor = FeatureExtractor(self.model, [self.layer])
    self.features_extractor = extractor.eval()

  def forward(self, frames):
    """Return the per-frame features of ``frames`` (frames, C, H, W) stacked along dim 0."""
    collected = []

    with torch.no_grad():
      # unsqueeze(1) turns every iterated frame into a batch of one: (1, C, H, W)
      for frame in frames.unsqueeze(1):
        feature, _ = self.features_extractor(frame)
        collected.append(feature[self.layer])

    return torch.stack(collected, dim=0)

In [None]:
# Smoke test with random clips: 4 videos x 100 frames of 3x360x640.
dummy_shape = (4, 100, 3, 360, 640)
frames = torch.randn(*dummy_shape).to(device)

encoder = Encoder()
features = encoder(frames)

print(features.shape)  # (batch_size, 100, 20, 1024)

1th video: torch.Size([100, 3, 360, 640])
2th video: torch.Size([100, 3, 360, 640])
3th video: torch.Size([100, 3, 360, 640])
4th video: torch.Size([100, 3, 360, 640])
total video: torch.Size([4, 100, 20, 1024])
torch.Size([4, 100, 20, 1024])


# Decoder
- LSTM

In [None]:
class DSA_LSTM(nn.Module):
  """Attention + LSTM decoder producing one accident probability per frame.

  ``forward`` takes object features of shape (batch, frames, objects, 1024)
  and returns sigmoid outputs of shape (batch, frames, 1).
  """

  def __init__(self):
    super(DSA_LSTM, self).__init__()

    self.dsa = Attention(encoder_dim = 1024).to(device)
    self.lstm = nn.LSTM(input_size = 1024,
                        hidden_size = 1024,
                        batch_first = True).to(device)
                        # batch_first = [batch, time_step, input]

    self.prediction = nn.Linear(1024, 1).to(device)
    self.sigmoid = nn.Sigmoid()

  def forward(self, features):
    # Zero initial state, created on the same device/dtype as the input. A
    # random state would make the output differ between identical calls, even
    # in eval mode.
    state_shape = (self.lstm.num_layers, features.size(0), self.lstm.hidden_size)
    h0 = features.new_zeros(state_shape)
    c0 = features.new_zeros(state_shape)

    # NOTE(review): attention is conditioned on the initial state only, not on
    # the per-step hidden state - confirm this simplification is intended.
    attended = self.dsa(features, h0)  # (batch, frames, 1024)
    out, _ = self.lstm(attended, (h0, c0))
    out = self.prediction(out)
    out = self.sigmoid(out)

    return out

### Try


In [None]:
# Smoke test of the decoder on random object features (batch, frames, objects, feature).
dummy_feature_shape = (4, 100, 20, 1024)
features = torch.randn(*dummy_feature_shape).to(device)

lstm = DSA_LSTM()
result = lstm(features)

for shown in (result, result.shape):
  print(shown)


# Loss

In [None]:
# a_0 : Probability of Accident
# a_1 : Probability of Non-Accident
# p(prob, frame)
# t(prob, frame)
import torch.nn.functional as F

_EPS = 1e-7  # keeps log() finite when a probability saturates at exactly 0 or 1


def prediction(pred):  # [100]
  """Pair every per-frame probability with its frame index.

  Args:
    pred: 1-D tensor of per-frame accident probabilities.

  Returns:
    Tensor of shape (frames, 2) with rows ``[probability, frame_index]``. The
    probability column stays attached to the autograd graph of ``pred``;
    rebuilding the values with ``torch.tensor`` would detach them.
  """
  pred = torch.as_tensor(pred)
  frame_idx = torch.arange(pred.size(0), dtype=pred.dtype, device=pred.device)
  return torch.stack([pred, frame_idx], dim=1)


def CrossEntropyLoss(pred):  # for negative
  """Loss for a non-accident clip: ``-sum_t log(1 - p_t)``.

  Args:
    pred: (frames, 2) tensor as returned by ``prediction``.
  """
  probs = pred[:, 0]
  # No requires_grad override: the loss must inherit its graph from the model output.
  return -torch.sum(torch.log((1 - probs).clamp_min(_EPS)))


def AnticipationLoss(pred, accident_frame=90):  # for positive
  """Loss for an accident clip: ``-sum_t exp(-max(0, y - t)) * log(p_t)``.

  Frames far before the accident frame ``y`` are weighted exponentially less;
  frames at or after it get weight 1.

  Args:
    pred: (frames, 2) tensor as returned by ``prediction``.
    accident_frame: Frame index ``y`` at which the accident occurs.
  """
  probs = pred[:, 0]  # predicted probability per frame
  timestep = pred[:, 1]  # frame index per row
  weights = torch.exp(-torch.clamp(accident_frame - timestep, min=0))
  return -torch.sum(weights * torch.log(probs.clamp_min(_EPS)))

In [None]:
# Frozen feature extractor in eval mode; only the decoder is optimised.
encoder = Encoder()
encoder = encoder.eval()

lstm = DSA_LSTM().to(device)
lstm = lstm.train()

trainable_params = lstm.parameters()
optimizer = torch.optim.Adam(trainable_params, lr=0.005)

# lr = optimizer.param_groups[0]['lr']
# print("Learning rate:", lr)   #default = 0.001

# Train

In [None]:
# Train the decoder on the sampled loader; the encoder only extracts features.
train_loss_save = []


for epoch in range(EPOCH):
  print(f"Epoch: {epoch+1}")

  train_loss = 0
  step = 0

  pbar = tqdm(train_sampled_loader, total=len(train_sampled_loader))
  for videos, labels in pbar:
    step += 1
    optimizer.zero_grad()

    # prediction
    # The torchvision detector expects images in the [0, 1] range, so decoded
    # integer frames (0-255) are rescaled; float input is passed through as is.
    frames = videos.to(device)
    if not torch.is_floating_point(frames):
      frames = frames.float() / 255
    labels = labels.to(device)

    features = encoder(frames)
    preds = lstm(features).squeeze(-1)  # [batch_size, frames=100]

    # Loss: anticipation loss for accident clips, cross entropy for the rest
    loss = 0

    for i in range(preds.size(0)):
      pred = prediction(preds[i])

      if labels[i] == 1:
        pos_loss = AnticipationLoss(pred)
        loss += pos_loss

      else:
        neg_loss = CrossEntropyLoss(pred)
        loss += neg_loss

    # Train loss
    # Same divisor as before (frames per clip), read from `preds` directly
    # instead of the variable left over from the inner loop.
    # NOTE(review): confirm whether dividing by the batch size was intended.
    total_loss = loss / preds.size(1)
    train_loss += total_loss.item()  # Tensor to Scalar

    # back propagation
    total_loss.backward()
    optimizer.step()

    pbar.set_description(f"Epoch {epoch+1}/{EPOCH}, Train Loss: {total_loss.item():.4f}", refresh=True)

    if step % 5 == 0:
      print(f"Step {step}/{len(train_sampled_loader)}, Train Loss: {total_loss.item():.4f}")

  # max(..., 1) avoids a ZeroDivisionError when the loader yields no batch.
  avg_train_loss = train_loss / max(step, 1)

  train_loss_save.append(avg_train_loss)
  print(f"Average Train Loss: {avg_train_loss:.4f}")

  torch.save(lstm.state_dict(), f'DSA_LSTM_train_epoch{epoch+1}.pth')


Epoch: 1


  0%|          | 0/41 [00:00<?, ?it/s]



Step 5/41, Train Loss: 4.2594
Step 10/41, Train Loss: 4.2716
Step 15/41, Train Loss: 4.4426
Step 20/41, Train Loss: 2.4063
Step 25/41, Train Loss: 3.6741
Step 30/41, Train Loss: 3.2847
Step 35/41, Train Loss: 3.7180
Step 40/41, Train Loss: 5.5769
Average Train Loss: 4.0824
Epoch: 2


  0%|          | 0/41 [00:00<?, ?it/s]

Step 5/41, Train Loss: 4.3769
Step 10/41, Train Loss: 3.1621
Step 15/41, Train Loss: 5.6410
Step 20/41, Train Loss: 3.9277
Step 25/41, Train Loss: 6.3997
Step 30/41, Train Loss: 4.4853
Step 35/41, Train Loss: 3.8139


In [None]:
# Save the decoder weights to the runtime's working directory and to Google Drive.
# NOTE(review): this Drive path still contains the "2024-S" folder that the
# dataset path no longer uses - confirm the directory exists.
model_checkpoint_path = "/content/drive/MyDrive/Study/2024-S/Car_Accident_Anticipation/DSA_LSTM.pth"

for checkpoint_target in ('DSA_LSTM_1.pth', model_checkpoint_path):
  torch.save(lstm.state_dict(), checkpoint_target)

In [None]:
import matplotlib.pyplot as plt

# Average training loss per epoch (x axis is the 0-based epoch index).
fig, ax = plt.subplots()
ax.plot(train_loss_save)

ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')

plt.show()

NameError: name 'train_loss_save' is not defined