Model nhận biết hành động đánh hay không đánh

In [1]:
import os
import kagglehub

# Download latest version
dataset_path = kagglehub.dataset_download("naveenk903/movies-fight-detection-dataset") + "/Peliculas"

print("Path to dataset files:", dataset_path)


Path to dataset files: /Users/phamdinhtrunghieu/.cache/kagglehub/datasets/naveenk903/movies-fight-detection-dataset/versions/1/Peliculas


In [2]:
labels = ['noFights', 'fights']
labels2id = {'noFights': 0, 'fights': 1}

print(labels)

number_of_labels = 2

['noFights', 'fights']


In [3]:
import torch
from torch import nn
import torch.nn.functional as f
from PIL import Image
from torchvision import models, transforms, datasets
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader

INPUTSIZE = 224


In [4]:
import cv2
import numpy as np
import matplotlib.pyplot as plt


def load_video_to_tensors(video_path):
    # Open the video file
    video = cv2.VideoCapture(video_path)

    frames = []

    while True:
        ret, frame = video.read()
        if not ret:
            break

        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frame = cv2.resize(frame, (INPUTSIZE, INPUTSIZE), interpolation=cv2.INTER_LINEAR)
        # plt.figure()
        # plt.imshow(frame)

        frame = torch.from_numpy(frame).float().permute(2,0,1)

        # print(frame.size())
        frames.append(frame)


    video.release()

    return frames

dataset = []

for label in labels:
    label_path = f"{dataset_path}/{label}"

    for video in os.listdir(label_path):
        video_path = f"{label_path}/{video}"
        print(video_path)
        x = load_video_to_tensors(video_path)
        y = torch.tensor(labels2id[label])
        # print(x, y)
        # break

        dataset.append((x,y))


/Users/phamdinhtrunghieu/.cache/kagglehub/datasets/naveenk903/movies-fight-detection-dataset/versions/1/Peliculas/noFights/17.mpg
/Users/phamdinhtrunghieu/.cache/kagglehub/datasets/naveenk903/movies-fight-detection-dataset/versions/1/Peliculas/noFights/16.mpg
/Users/phamdinhtrunghieu/.cache/kagglehub/datasets/naveenk903/movies-fight-detection-dataset/versions/1/Peliculas/noFights/28.mpg
/Users/phamdinhtrunghieu/.cache/kagglehub/datasets/naveenk903/movies-fight-detection-dataset/versions/1/Peliculas/noFights/14.mpg
/Users/phamdinhtrunghieu/.cache/kagglehub/datasets/naveenk903/movies-fight-detection-dataset/versions/1/Peliculas/noFights/100.mpg
/Users/phamdinhtrunghieu/.cache/kagglehub/datasets/naveenk903/movies-fight-detection-dataset/versions/1/Peliculas/noFights/15.mpg
/Users/phamdinhtrunghieu/.cache/kagglehub/datasets/naveenk903/movies-fight-detection-dataset/versions/1/Peliculas/noFights/29.mpg
/Users/phamdinhtrunghieu/.cache/kagglehub/datasets/naveenk903/movies-fight-detection-data

In [5]:
import random

random.shuffle(dataset)

In [6]:
class GetFeature(nn.Module):
    def __init__(self):
        super().__init__()
        input_size  = INPUTSIZE

        self.conv1 = nn.Conv2d(3, 64, kernel_size = 3, stride = 1, padding = 1)
        self.maxpool1 = nn.MaxPool2d(2,2)
        input_size //= 2
        self.conv2 = nn.Conv2d(64, 128, kernel_size = 3, stride = 1, padding = 1)
        self.maxpool2 = nn.MaxPool2d(2,2)
        input_size //= 2
        self.conv3 = nn.Conv2d(128, 256, kernel_size = 3, stride = 1, padding = 1)
        self.maxpool3 = nn.MaxPool2d(2,2)
        input_size //= 2
        self.conv4 = nn.Conv2d(256, 512, kernel_size = 3, stride = 1, padding = 1)
        self.maxpool4 = nn.MaxPool2d(2,2)
        input_size //= 2

        self.fc1 = nn.Linear(input_size * input_size * 512, INPUTSIZE)

    def forward(self, x):

        x = f.relu(self.conv1(x))
        x = self.maxpool1(x)
        x = f.relu(self.conv2(x))
        x = self.maxpool2(x)
        x = f.relu(self.conv3(x))
        x = self.maxpool3(x)
        x = f.relu(self.conv4(x))
        x = self.maxpool4(x)

        x = torch.flatten(x)

        x = f.tanh(self.fc1(x))

        return x

# RNN

In [13]:
class RNNCell(nn.Module):
    def __init__(self):
        super().__init__()
        self.Wx = nn.Linear(INPUTSIZE, 128)
        self.Ws = nn.Linear(128, 128)

    def forward(self, x, s = None):
        z = self.Wx(x)
        if not s == None:
            z = z + self.Ws(s)

        return f.tanh(z)

class RNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.cell = RNNCell()
        self.get_feature = GetFeature()

        self.fc1 = nn.Linear(128, 64)
        self.fc2 = nn.Linear(64, 2)


    def forward(self, frames):
        last_s = None
        for frame in frames:
            feature_vector = self.get_feature(frame)
            last_s = self.cell(feature_vector, last_s)


        last_s = f.relu(self.fc1(last_s))
        last_s = self.fc2(last_s)

        return last_s

model = RNN()

import torch.optim as optim

optimizer = optim.SGD(model.parameters(), lr = 0.01, momentum = 0.9)

loss_calculator = nn.CrossEntropyLoss()

for epoch in range(100):
    for id, (x, y) in enumerate(dataset):
        y_hat = model(x)
        loss = loss_calculator(y_hat, y)
        print(f"Epoch: {epoch} || example: {id}")
        print(y.numpy())
        print(torch.argmax(y_hat).numpy())
        print(loss.item())

        optimizer.zero_grad()

        loss.backward()

        optimizer.step()


Epoch: 0 || example: 0
0
1
0.7621915936470032
Epoch: 0 || example: 1
0
0
0.43971604108810425
Epoch: 0 || example: 2
0
0
0.316957950592041
Epoch: 0 || example: 3
0
0
0.2311413288116455
Epoch: 0 || example: 4
1
0
1.7848124504089355
Epoch: 0 || example: 5
1
0
1.5745452642440796
Epoch: 0 || example: 6
0
0
0.35038867592811584
Epoch: 0 || example: 7
0
0
0.4374980628490448
Epoch: 0 || example: 8
1
0
0.9729369878768921
Epoch: 0 || example: 9
1
0
0.8053944706916809
Epoch: 0 || example: 10
0
1
0.7786017656326294
Epoch: 0 || example: 11
1
1
0.5532603859901428
Epoch: 0 || example: 12
1
1
0.47397682070732117
Epoch: 0 || example: 13
0
1
1.1245734691619873
Epoch: 0 || example: 14
0
1
1.116947889328003
Epoch: 0 || example: 15
1
1
0.4651242196559906
Epoch: 0 || example: 16
0
1
0.9396517276763916
Epoch: 0 || example: 17
1
1
0.582838773727417
Epoch: 0 || example: 18
1
1
0.6208463907241821
Epoch: 0 || example: 19
1
1
0.612571120262146
Epoch: 0 || example: 20
1
1
0.564384400844574
Epoch: 0 || example: 21
0

KeyboardInterrupt: 

# LSTM

In [24]:
class LSTMCell(nn.Module):
    def __init__(self):
        super().__init__()
        self.sigmoid1 = nn.Linear(INPUTSIZE + 128, 128)
        
        self.sigmoid2 = nn.Linear(INPUTSIZE + 128, 128)
        self.tanh1 = nn.Linear(INPUTSIZE + 128, 128)
        
        self.sigmoid3 = nn.Linear(INPUTSIZE + 128, 128)
        self.tanh2 = nn.Linear(128, 128)
        
        

    def forward(self, x, s = None, c = None):
        if s == None:
            s = torch.zeros(128)
        if c == None:
            c = torch.zeros(128)
        
        #sigmoid 1
        # để bỏ bớt đi mấy cái feature không quan trọng trong c
        # dùng sigmoid để bỏ đi mấy cái feature không quan trọng, vì sigmoid chạy từ [0,1] (bỏ hoàn toàn, giữ hoàn toàn)
        xs = torch.cat((x,s))
        sig1_xs = f.sigmoid(self.sigmoid1(xs))
        c = c * sig1_xs
        
        #sigmoid 2
        # thêm các feature quan trọng vào c, hàm tanh được dùng để tạo sự tích cực hay tiêu cực cho feature
        # dùng sigmoid và tanh để tạo sự phức tạp cho mạng
        sig2_xs = f.sigmoid(self.sigmoid2(xs))
        tanh1_xs = f.tanh(self.tanh1(xs))
        sig2tanh1_xs = sig2_xs * tanh1_xs
        c = c + sig2tanh1_xs
        
        #sigmoid 3
        # kết hợp c vào việc tính s ở output
        sig3_xs = f.sigmoid(self.sigmoid3(xs))
        tanh2_xs = f.tanh(self.tanh2(c))
        
        s = sig3_xs * tanh2_xs
        

        return s, c

class LTSM(nn.Module):
    def __init__(self):
        super().__init__()
        self.cell = LSTMCell()
        self.get_feature = GetFeature()

        self.fc1 = nn.Linear(128, 64)
        self.fc2 = nn.Linear(64, 2)


    def forward(self, frames):
        last_s = None
        last_c = None
        for frame in frames:
            feature_vector = self.get_feature(frame)
            last_s, last_c = self.cell(feature_vector, last_s, last_c)

        last_s = f.relu(self.fc1(last_s))
        last_s = self.fc2(last_s)

        return last_s

model = LTSM()

import torch.optim as optim

optimizer = optim.SGD(model.parameters(), lr = 0.01, momentum = 0.9)

loss_calculator = nn.CrossEntropyLoss()

for epoch in range(100):
    for id, (x, y) in enumerate(dataset):
        y_hat = model(x)
        loss = loss_calculator(y_hat, y)
        print(f"Epoch: {epoch} || example: {id}")
        print(y.numpy())
        print(torch.argmax(y_hat).numpy())
        print(loss.item())

        optimizer.zero_grad()

        loss.backward()

        optimizer.step()

Epoch: 0 || example: 0
1
0
0.718099057674408
Epoch: 0 || example: 1
0
1
0.7839987874031067
Epoch: 0 || example: 2
0
1
0.7300007939338684
Epoch: 0 || example: 3
1
1
0.6871210336685181
Epoch: 0 || example: 4
1
1
0.6790741682052612
Epoch: 0 || example: 5
1
1
0.6491162180900574
Epoch: 0 || example: 6
0
1
0.7632565498352051
Epoch: 0 || example: 7
0
1
0.7593437433242798
Epoch: 0 || example: 8
0
1
0.7502493262290955
Epoch: 0 || example: 9
0
1
0.7332251667976379
Epoch: 0 || example: 10
1
1
0.6738114356994629
Epoch: 0 || example: 11
0
1
0.697658360004425
Epoch: 0 || example: 12
0
0
0.6778993606567383
Epoch: 0 || example: 13
1
0
0.730548620223999
Epoch: 0 || example: 14
1
0
0.7411055564880371
Epoch: 0 || example: 15
1
0
0.7481496930122375
Epoch: 0 || example: 16
1
0
0.7317287921905518
Epoch: 0 || example: 17
1
0
0.7166278958320618
Epoch: 0 || example: 18
0
1
0.6992001533508301
Epoch: 0 || example: 19
1
1
0.6727598309516907
Epoch: 0 || example: 20
0
1
0.7352619767189026
Epoch: 0 || example: 21
1


KeyboardInterrupt: 

# Peephole Architecture
Nối thêm vector c vào xs trước các hàm sigmoid để tăng tính ảnh hưởng của c lên kết quả

In [25]:
class LSTMCell(nn.Module):
    def __init__(self):
        super().__init__()
        self.sigmoid1 = nn.Linear(INPUTSIZE + 128 + 128, 128)
        
        self.sigmoid2 = nn.Linear(INPUTSIZE + 128 + 128, 128)
        self.tanh1 = nn.Linear(INPUTSIZE + 128, 128)
        
        self.sigmoid3 = nn.Linear(INPUTSIZE + 128 + 128, 128)
        self.tanh2 = nn.Linear(128, 128)
        
        

    def forward(self, x, s = None, c = None):
        if s == None:
            s = torch.zeros(128)
        if c == None:
            c = torch.zeros(128)
        
        xs = torch.cat((x,s))
        xsc = torch.cat((xs, c))
        sig1_xs = f.sigmoid(self.sigmoid1(xsc))
        c = c * sig1_xs
        
        sig2_xs = f.sigmoid(self.sigmoid2(xsc))
        tanh1_xs = f.tanh(self.tanh1(xs))
        sig2tanh1_xs = sig2_xs * tanh1_xs
        c = c + sig2tanh1_xs
        
        sig3_xs = f.sigmoid(self.sigmoid3(torch.cat((xs, c))))
        tanh2_xs = f.tanh(self.tanh2(c))
        
        s = sig3_xs * tanh2_xs
        

        return s, c

class LTSM(nn.Module):
    def __init__(self):
        super().__init__()
        self.cell = LSTMCell()
        self.get_feature = GetFeature()

        self.fc1 = nn.Linear(128, 64)
        self.fc2 = nn.Linear(64, 2)


    def forward(self, frames):
        last_s = None
        last_c = None
        for frame in frames:
            feature_vector = self.get_feature(frame)
            last_s, last_c = self.cell(feature_vector, last_s, last_c)

        last_s = f.relu(self.fc1(last_s))
        last_s = self.fc2(last_s)

        return last_s

model = LTSM()

import torch.optim as optim

optimizer = optim.SGD(model.parameters(), lr = 0.01, momentum = 0.9)

loss_calculator = nn.CrossEntropyLoss()

for epoch in range(100):
    for id, (x, y) in enumerate(dataset):
        y_hat = model(x)
        loss = loss_calculator(y_hat, y)
        print(f"Epoch: {epoch} || example: {id}")
        print(y.numpy())
        print(torch.argmax(y_hat).numpy())
        print(loss.item())

        optimizer.zero_grad()

        loss.backward()

        optimizer.step()

Epoch: 0 || example: 0
1
0
0.7019950747489929
Epoch: 0 || example: 1
0
1
0.7731455564498901
Epoch: 0 || example: 2
0
1
0.724738359451294
Epoch: 0 || example: 3
1
0
0.6981870532035828
Epoch: 0 || example: 4
1
1
0.6906003952026367
Epoch: 0 || example: 5
1
1
0.6657988429069519
Epoch: 0 || example: 6
0
1
0.7439027428627014
Epoch: 0 || example: 7
0
1
0.7489844560623169
Epoch: 0 || example: 8
0
1
0.736472487449646
Epoch: 0 || example: 9
0
1
0.7191302180290222
Epoch: 0 || example: 10
1
1
0.6911607980728149
Epoch: 0 || example: 11
0
0
0.6851422786712646
Epoch: 0 || example: 12
0
0
0.6675310134887695
Epoch: 0 || example: 13
1
0
0.7452117800712585
Epoch: 0 || example: 14
1
0
0.7533529996871948
Epoch: 0 || example: 15
1
0
0.7543352246284485
Epoch: 0 || example: 16
1
0
0.7466088533401489
Epoch: 0 || example: 17
1
0
0.7313017845153809
Epoch: 0 || example: 18
0
0
0.6766842603683472
Epoch: 0 || example: 19
1
0
0.6991128921508789
Epoch: 0 || example: 20
0
1
0.7035485506057739
Epoch: 0 || example: 21
1

KeyboardInterrupt: 

# Gated Recurrent Unit
Bỏ đi C, thay vào đó là cho cell có khả năng quyết định quên đi và thêm vào feature cần tiếp trực tiếp vào s
Giảm độ phức tạp tính toán nhưng không chính xác bằng (chỉ có 3 hàm phi tuyến tính và fully connected)

In [7]:
class RNNCell(nn.Module):
    def __init__(self):
        super().__init__()
        
        self.sigmoid_forget = nn.Linear(INPUTSIZE + 128, 128)
        
        self.sigmoid_1  = nn.Linear(INPUTSIZE + 128, 128)
        self.tanh_1 = nn.Linear(INPUTSIZE + 128, 128)
        

    def forward(self, x, s = None):
        if s == None:
            s = torch.zeros(128)
        
        xs = torch.cat((x,s))
        
        xs_1 = torch.cat((f.sigmoid(self.sigmoid_1(xs)) * s, x))
        xs_1_tanh = f.tanh(self.tanh_1(xs_1))
        
        xs_sigmoid_forget = f.sigmoid(self.sigmoid_forget(xs))
        s = s * (1 - xs_sigmoid_forget)
        
        xs_2 = xs_1_tanh * xs_sigmoid_forget
        
        s = s * xs_2

        return s

class RNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.cell = RNNCell()
        self.get_feature = GetFeature()

        self.fc1 = nn.Linear(128, 64)
        self.fc2 = nn.Linear(64, 2)


    def forward(self, frames):
        last_s = None
        for frame in frames:
            feature_vector = self.get_feature(frame)
            last_s = self.cell(feature_vector, last_s)


        last_s = f.relu(self.fc1(last_s))
        last_s = self.fc2(last_s)

        return last_s

model = RNN()

import torch.optim as optim

optimizer = optim.SGD(model.parameters(), lr = 0.01, momentum = 0.9)

loss_calculator = nn.CrossEntropyLoss()

for epoch in range(100):
    for id, (x, y) in enumerate(dataset):
        y_hat = model(x)
        loss = loss_calculator(y_hat, y)
        print(f"Epoch: {epoch} || example: {id}")
        print(y.numpy())
        print(torch.argmax(y_hat).numpy())
        print(loss.item())

        optimizer.zero_grad()

        loss.backward()

        optimizer.step()


Epoch: 0 || example: 0
1
1
0.6230216026306152
Epoch: 0 || example: 1
1
1
0.6175631284713745
Epoch: 0 || example: 2
0
1
0.7870140671730042
Epoch: 0 || example: 3
1
1
0.6045029759407043
Epoch: 0 || example: 4
0
1
0.7998218536376953
Epoch: 0 || example: 5
0
1
0.8006166815757751
Epoch: 0 || example: 6
1
1
0.601833701133728
Epoch: 0 || example: 7
0
1
0.7936781048774719
Epoch: 0 || example: 8
0
1
0.78611159324646
Epoch: 0 || example: 9
0
1
0.7718812823295593
Epoch: 0 || example: 10
1
1
0.6375934481620789
Epoch: 0 || example: 11
0
1
0.740597665309906
Epoch: 0 || example: 12
1
1
0.6636122465133667
Epoch: 0 || example: 13
0
1
0.7147839665412903
Epoch: 0 || example: 14
1
1
0.68599534034729
Epoch: 0 || example: 15
1
1
0.6924451589584351
Epoch: 0 || example: 16
0
1
0.6944000720977783
Epoch: 0 || example: 17
1
0
0.6977977156639099
Epoch: 0 || example: 18
1
0
0.6966854333877563
Epoch: 0 || example: 19
1
1
0.6892927885055542
Epoch: 0 || example: 20
1
1
0.6764387488365173
Epoch: 0 || example: 21
0
1
0