## Импорт необходимых модулей

In [1]:
import os
import cv2
import torch
import numpy as np
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

## Решение

In [2]:
def load_dataset_from_folder(data_folder):
    video_paths = []
    labels = []
    
    for filename in os.listdir(data_folder):
        if filename.endswith(".mpeg"): 
            file_path = os.path.join(data_folder, filename)
            video_paths.append(file_path)
            
            if "fight" in filename.lower():
                labels.append(1)  
            else:
                labels.append(0)  
    
    return video_paths, labels

In [3]:
data_folder = "/kaggle/input/cctv-fights-dataset/dataset/CCTV_DATA/training"
video_paths, labels = load_dataset_from_folder(data_folder)

In [4]:
def video_to_frames(video_path, frame_count=16):
    cap = cv2.VideoCapture(video_path)
    frames = []
    while len(frames) < frame_count:
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, (112, 112))  
        frames.append(frame)
    cap.release()
    frames = np.array(frames)
    return frames if len(frames) == frame_count else None 

In [5]:
class FightDataset(Dataset):
    def __init__(self, video_paths, labels, frame_count=8):
        self.video_paths = video_paths
        self.labels = labels
        self.frame_count = frame_count

    def __len__(self):
        return len(self.video_paths)

    def __getitem__(self, idx):
        video_path = self.video_paths[idx]
        label = self.labels[idx]
        
        frames = video_to_frames(video_path, self.frame_count)
        
        if frames is None: 
            raise ValueError(f"Недостаточно кадров в {video_path}")
        
        # Преобразуем кадры в формат CxDxHxW
        frames = np.array(frames)  # форма (D, H, W, C)
        frames = frames.transpose(3, 0, 1, 2)  # теперь форма будет (C, D, H, W)
        frames = torch.tensor(frames, dtype=torch.float32) / 255.0
        
        return frames, torch.tensor(label, dtype=torch.long)

In [6]:
dataset = FightDataset(video_paths, labels)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True)

In [7]:
class FightDetection3DCNN(nn.Module):
    def __init__(self):
        super(FightDetection3DCNN, self).__init__()
        self.conv1 = nn.Conv3d(3, 64, kernel_size=(3, 3, 3), stride=(1, 2, 2), padding=(1, 1, 1))
        self.conv2 = nn.Conv3d(64, 128, kernel_size=(3, 3, 3), stride=(1, 2, 2), padding=(1, 1, 1))
        self.conv3 = nn.Conv3d(128, 256, kernel_size=(3, 3, 3), stride=(1, 2, 2), padding=(1, 1, 1))
        self.fc1 = nn.Linear(8192, 512)
        self.fc2 = nn.Linear(512, 2)  

        self.pool = nn.MaxPool3d((1, 2, 2))
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = self.relu(self.pool(self.conv1(x)))
        x = self.relu(self.pool(self.conv2(x)))
        x = self.relu(self.pool(self.conv3(x)))
        x = x.view(x.size(0), -1)  
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [8]:
model = FightDetection3DCNN()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [9]:
def train_model(model, dataloader, optimizer, criterion, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        for frames, labels in dataloader:
            optimizer.zero_grad()
            outputs = model(frames)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
        #print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item()}")

In [10]:
train_model(model, dataloader, optimizer, criterion, num_epochs=10)

In [11]:
def evaluate_model(model, dataloader, criterion):
    model.eval()  
    total_loss = 0
    correct = 0
    total = 0
    
    with torch.no_grad(): 
        for frames, labels in dataloader:
            outputs = model(frames)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    #print(f"Средняя потеря: {total_loss / len(dataloader)}")
    #print(f"Точность на тестовом наборе: {accuracy}%")

In [12]:
evaluate_model(model, dataloader, criterion)

In [13]:
torch.save(model.state_dict(), "fight_detection_model.pth")


In [14]:
def detect_fights_in_video(video_path, model):
    cap = cv2.VideoCapture(video_path)
    frames_buffer = []
    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, (112, 112))
        frames_buffer.append(frame)
        
        if len(frames_buffer) == 8:  
            input_frames = np.array(frames_buffer).transpose(3, 0, 1, 2)
            input_frames = torch.tensor(input_frames, dtype=torch.float32).unsqueeze(0)

            with torch.no_grad():
                output = model(input_frames)
                _, pred = torch.max(output, 1)
                #if pred.item() == 1:  
                    #print("Драка обнаружена!")
                    
            frames_buffer.pop(0)  

    cap.release()

In [15]:
def run_inference_on_folder(folder_path, model):
    for filename in os.listdir(folder_path):
        if filename.endswith(".mpeg") or filename.endswith(".mp4"): 
            video_path = os.path.join(folder_path, filename)
            detect_fights_in_video(video_path, model)

In [None]:
test_folder = "/kaggle/input/cctv-fights-dataset/dataset/CCTV_DATA/testing"
run_inference_on_folder(test_folder, model)

In [18]:
def detect_and_save_first_fight_video(folder_path, output_path, model, frame_count=8):
    for filename in os.listdir(folder_path):
        if filename.endswith(".mpeg") or filename.endswith(".mp4"):
            video_path = os.path.join(folder_path, filename)
            cap = cv2.VideoCapture(video_path)
            frames_buffer = []
            fight_detected = False

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = int(cap.get(cv2.CAP_PROP_FPS))
            
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                
                resized_frame = cv2.resize(frame, (112, 112))
                frames_buffer.append(resized_frame)
                
                if len(frames_buffer) == frame_count:
                    input_frames = np.array(frames_buffer).transpose(3, 0, 1, 2)
                    input_frames = torch.tensor(input_frames, dtype=torch.float32).unsqueeze(0)

                    with torch.no_grad():
                        output = model(input_frames)
                        _, pred = torch.max(output, 1)
                        
                        if pred.item() == 1:
                            fight_detected = True
                            # Рисуем красную рамку на кадре
                            cv2.rectangle(frame, (10, 10), (width - 10, height - 10), (0, 0, 255), 3)
                    
                    frames_buffer.pop(0)  

                out.write(frame)

            cap.release()
            out.release()
            
            if fight_detected:
                print(f"Драка обнаружена в видео: {filename}. Сохранено как {output_path}.")
                break

In [19]:
test_folder = "/kaggle/input/cctv-fights-dataset/dataset/CCTV_DATA/testing"
output_video_path = "/kaggle/working/output_fight_video.mp4"

detect_and_save_first_fight_video(test_folder, output_video_path, model)

Драка обнаружена в видео: fight_0117.mpeg. Сохранено как /kaggle/working/output_fight_video.mp4.


### Выгрузка видео для презентации

In [21]:
def detect_and_save_fight_videos(folder_path, output_folder, model, frame_count=8, num_videos_to_save=9):
    os.makedirs(output_folder, exist_ok=True)  
    saved_videos = 0  

    for filename in os.listdir(folder_path):
        if filename.endswith(".mpeg") or filename.endswith(".mp4"):
            video_path = os.path.join(folder_path, filename)
            cap = cv2.VideoCapture(video_path)
            frames_buffer = []
            fight_detected = False

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = int(cap.get(cv2.CAP_PROP_FPS))
            
            output_video_path = os.path.join(output_folder, f"fight_video_{saved_videos}.mp4")
            out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                
                resized_frame = cv2.resize(frame, (112, 112))
                frames_buffer.append(resized_frame)
                
                if len(frames_buffer) == frame_count:
                    input_frames = np.array(frames_buffer).transpose(3, 0, 1, 2)
                    input_frames = torch.tensor(input_frames, dtype=torch.float32).unsqueeze(0)

                    with torch.no_grad():
                        output = model(input_frames)
                        _, pred = torch.max(output, 1)
                        
                        if pred.item() == 1:
                            fight_detected = True
                            cv2.rectangle(frame, (10, 10), (width - 10, height - 10), (0, 0, 255), 3)
                    
                    frames_buffer.pop(0)  

                out.write(frame)

            cap.release()
            out.release()
            
            if fight_detected:
                saved_videos += 1
                print(f"Драка обнаружена в видео: {filename}. Сохранено как {output_video_path}.")
            
            if saved_videos >= num_videos_to_save:
                print(f"Сохранено {saved_videos} видео с обнаруженной дракой.")
                break

test_folder = "/kaggle/input/cctv-fights-dataset/dataset/CCTV_DATA/testing"
output_folder = "/kaggle/working/fight_videos"
detect_and_save_fight_videos(test_folder, output_folder, model)

Драка обнаружена в видео: fight_0117.mpeg. Сохранено как /kaggle/working/fight_videos/fight_video_0.mp4.
Драка обнаружена в видео: fight_0079.mpeg. Сохранено как /kaggle/working/fight_videos/fight_video_1.mp4.
Драка обнаружена в видео: fight_0943.mpeg. Сохранено как /kaggle/working/fight_videos/fight_video_2.mp4.
Драка обнаружена в видео: fight_0114.mpeg. Сохранено как /kaggle/working/fight_videos/fight_video_3.mp4.
Драка обнаружена в видео: fight_0183.mpeg. Сохранено как /kaggle/working/fight_videos/fight_video_4.mp4.
Драка обнаружена в видео: fight_0941.mpeg. Сохранено как /kaggle/working/fight_videos/fight_video_5.mp4.
Драка обнаружена в видео: fight_0005.mpeg. Сохранено как /kaggle/working/fight_videos/fight_video_6.mp4.
Драка обнаружена в видео: fight_0206.mpeg. Сохранено как /kaggle/working/fight_videos/fight_video_7.mp4.
Драка обнаружена в видео: fight_0029.mpeg. Сохранено как /kaggle/working/fight_videos/fight_video_8.mp4.
Сохранено 9 видео с обнаруженной дракой.
