In [1]:
#importing required models
import os
import shutil
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import cv2
from PIL import Image

### Preparing Data for Deep Learning

In [2]:
path = "curve_images"
for folder in os.listdir(path):
    for img_file in os.listdir(os.path.join(path, folder)):
        img_file = os.path.join(path, folder, img_file)
        
        try:
            img = Image.open(img_file)
            if img.mode != "RGB":
                os.remove(img_file)
        except:
            pass
        

In [3]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5]),
])

In [6]:
# creating an image dataset folder for classification
dataset = datasets.ImageFolder("curve_images", transform=transform)
dataset_len = len(dataset)

In [7]:
dataset_len

4517

In [8]:
# Divide dataset into train and test set
train_len, test_len = dataset_len-1000, 1000
train_set, test_set = torch.utils.data.random_split(dataset, [train_len, test_len])
batch_size = 192
len(train_set)

3517

In [9]:
# Create dataloader from train and test sets
train_set = DataLoader(dataset=train_set, shuffle=True, batch_size=batch_size)
test_set = DataLoader(dataset=test_set, shuffle=True, batch_size=batch_size)
len(train_set)

19

### Build Deep Learning Model

In [10]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("using device", device)

using device cpu


In [11]:
learning_rate = 0.001

### Deep Learning Model

In [12]:
class Model(nn.Module): 
    def __init__(self):
        super(Model, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=10, kernel_size=3)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=3)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(58320, 1024)
        self.fc2 = nn.Linear(1024, 2)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(x.shape[0],-1)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return x

In [13]:
model = Model().to(device)
print(model)

Model(
  (conv1): Conv2d(3, 10, kernel_size=(3, 3), stride=(1, 1))
  (conv2): Conv2d(10, 20, kernel_size=(3, 3), stride=(1, 1))
  (conv2_drop): Dropout2d(p=0.5, inplace=False)
  (fc1): Linear(in_features=58320, out_features=1024, bias=True)
  (fc2): Linear(in_features=1024, out_features=2, bias=True)
)


In [14]:
# Define loss function
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(),lr = learning_rate)

### Train model

In [15]:
train_losses = []
for epoch in range(10):
    train_loss = 0.0
    total_correct = 0
    model.train()
    for data, target in train_set:
  
        optimizer.zero_grad()
        #forward-pass
        output = model(data)
        
        output_idx = torch.argmax(output, dim=1)
        total_correct += (target == output_idx).sum().item()
        
        loss = criterion(output, target)
        #backward-pass
        loss.backward()
        # Update the parameters
        optimizer.step()
        # Update the Training loss
        train_loss += loss.item() * data.size(0)
    
    print(f"Epoch: {epoch}, loss = {train_loss/train_len}, Accuracy = {(total_correct/train_len)*100}%")
print("Finished Training")    

Epoch: 0, loss = 1.7767515265707743, Accuracy = 57.00881433039522%
Epoch: 1, loss = 0.6096417224471596, Accuracy = 68.83707705430766%
Epoch: 2, loss = 0.519409320877803, Accuracy = 76.25817458060847%
Epoch: 3, loss = 0.4016234049886541, Accuracy = 83.76457207847598%
Epoch: 4, loss = 0.31115366141837464, Accuracy = 87.65993744668752%
Epoch: 5, loss = 0.23643236888854857, Accuracy = 91.47000284333238%
Epoch: 6, loss = 0.19014201459218263, Accuracy = 92.86323571225476%
Epoch: 7, loss = 0.15867235221102186, Accuracy = 94.54080181973272%
Epoch: 8, loss = 0.1279676636325591, Accuracy = 95.67813477395507%
Epoch: 9, loss = 0.09343711479350116, Accuracy = 96.73016775661075%
Finished Training


### Test model

In [16]:
with torch.no_grad():
    model.eval()
    total_loss = 0
    total_correct = 0
    for data, target in test_set:
        data, target = data.to(device), target.to(device)
        outputs = model(data)
        loss = criterion(outputs, target)
        total_loss += loss.item() * data.size(0)
        output_idx = torch.argmax(outputs, dim=1)
        total_correct += (target == output_idx).sum()
    
    print(f"loss = {total_loss/test_len}, Accuracy = {(total_correct/test_len)*100}%")

loss = 0.0994840259552002, Accuracy = 97.19999694824219%


### Save Model

In [17]:
# Save model
model_scripted = torch.jit.script(model) # Export to TorchScript
model_scripted.save("aegis_curve.pt") 

In [18]:
# Load model
model = torch.jit.load("aegis_curve.pt")
model.eval()

RecursiveScriptModule(
  original_name=Model
  (conv1): RecursiveScriptModule(original_name=Conv2d)
  (conv2): RecursiveScriptModule(original_name=Conv2d)
  (conv2_drop): RecursiveScriptModule(original_name=Dropout2d)
  (fc1): RecursiveScriptModule(original_name=Linear)
  (fc2): RecursiveScriptModule(original_name=Linear)
)

In [25]:
#Function to check if a frame is part of the curve frames 
def check_speed(path):
    img = Image.open(path)
    img = transform(img).unsqueeze(dim=0).to(device)
    prediction = model(img)
    v = prediction.detach().numpy()
    y = v.tolist()
    if y[0][0] > y[0][-1] :
        return True
    return False

In [26]:
video = cv2.VideoCapture("test_videos/bern_3.mp4")
width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))

In [27]:
#writer object to write selected frames
writer = cv2.VideoWriter("sum_vid/final_curve.mp4", cv2.VideoWriter_fourcc(*"DIVX"), 25, (width, height))

In [28]:
ret, frame1 = video.read()
prev_frame = frame1

In [29]:
currentframe = 0
a = 0
b = 0
c = 0

In [30]:
while ret:
    ret, frame = video.read()
    if ret is True:
        name = 'frame' + str(currentframe) + '.jpg'
        a = cv2.imwrite(name, frame)
        currentframe += 1
        if check_speed(name) == True: #Check if a frame is part of speed frame
            writer.write(frame) #Write the frame into the writer
            prev_frame = frame
            a += int(1)
            
        else:
            prev_frame = frame
            b += 1
            
            
        c += 1
        os.remove(name)
        
    #if cv2.waitKey(1) & 0xFF == ord("q"):
       # break
        
print("Total Frames: ", c)
print("Unique Frames: ", a)
print("Common Frames: ", b)
video.release()
writer.release()
cv2.destroyAllWindows()

Total Frames:  2997
Unique Frames:  True
Common Frames:  2517
