In [1]:
import numpy as np
import torch 
import cv2 as cv
import os 
import matplotlib.pyplot as plt
from torchinfo import summary


In [None]:
from typing import Literal

def capture_video_frame(video:str, videoName:str, number_of_frames:int , class_label:Literal[0,1]): 
    vid_path = os.path.join(video , videoName)
    cap = capture_video_frame(vid_path)
    total_frames = cv.get(cv.CAP_PROP_FRAME_COUNT)
    frame_indices=  np.random.uniform(1, total_frames, number_of_frames)
    img_samples = []
    frame_count = 0 
    while cap : 
        is_playing , frame = cap.read()
        if not is_playing : 
            break 
        frame_count +=1 

        if frame_count in frame_indices: 
            img_samples.append(frame)

        if cv.waitKey(10) & 0xff == ord('q'): 
            break
    cap.relase()
    cv.destroyAllWindows()

    resized_image = [cv.resize(img , 224, 224) for img in img_samples]
    for idx , img in enumerate(resized_image): 
        if class_label: 
            path = os.path.join('model_data/fire', videoName)
            os.makedirs(path, exist_ok=True)
            file_name = f'{img}.jpg'
            cv.imwrite(os.path.join(path, file_name), img)
        else: 
            




_IncompleteInputError: incomplete input (371091138.py, line 15)

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class Conv2Plus1D(nn.Module):

    def __init__(self, in_channels, out_channels, kernel_size, padding):
        super().__init__()
        # kernel_size is a tuple like (T, H, W)
        T, H, W = kernel_size
        
        self.spatial_conv = nn.Conv3d(
            in_channels, out_channels, kernel_size=(1, H, W), padding=(0, padding, padding)
        )
        self.temporal_conv = nn.Conv3d(
            out_channels, out_channels, kernel_size=(T, 1, 1), padding=(padding, 0, 0)
        )
        
    def forward(self, x):
        x = self.spatial_conv(x)
        x = F.relu(x)
        x = self.temporal_conv(x)
        return x


class ResidualMain(nn.Module):

    def __init__(self, channels, kernel_size, padding=1):
        super().__init__()
        self.conv1 = Conv2Plus1D(channels, channels, kernel_size, padding)
        self.norm1 = nn.LayerNorm([channels, 1, 1, 1])  # LayerNorm over C, D, H, W
        self.conv2 = Conv2Plus1D(channels, channels, kernel_size, padding)
        self.norm2 = nn.LayerNorm([channels, 1, 1, 1])
        self.relu = nn.ReLU()
        
    def forward(self, x):
        residual = x
        out = self.conv1(x)
        # LayerNorm expects (N, C, D, H, W), but works on last dims. Need permute:
        # We'll permute to (N, D, H, W, C), apply LayerNorm over last dim, then permute back
        out = out.permute(0, 2, 3, 4, 1)  # N, D, H, W, C
        out = self.norm1(out)
        out = out.permute(0, 4, 1, 2, 3)  # N, C, D, H, W
        
        out = self.relu(out)
        out = self.conv2(out)
        
        out = out.permute(0, 2, 3, 4, 1)
        out = self.norm2(out)
        out = out.permute(0, 4, 1, 2, 3)
        
        out += residual
        out = self.relu(out)
        return out


# Example usage similar to your FireDetector conv_block

class FireDetectorWithResidual(nn.Module):
    def __init__(self, in_channels=3, num_classes=1):
        super().__init__()
        self.initial_conv = Conv2Plus1D(in_channels, 16, kernel_size=(3,7,7), padding=1)
        self.bn = nn.BatchNorm3d(16)
        self.relu = nn.ReLU()
        self.pool1 = nn.MaxPool3d((1, 2, 2))
        
        self.res_block1 = ResidualMain(16, kernel_size=(3,3,3))
        self.pool2 = nn.MaxPool3d((2, 2, 2))
        
        self.res_block2 = ResidualMain(16, kernel_size=(3,3,3))
        self.adaptive_pool = nn.AdaptiveAvgPool3d((1, 1, 1))
        
        self.classifier = nn.Linear(16, num_classes)
        
    def forward(self, x):
        x = self.initial_conv(x)
        x = self.bn(x)
        x = self.relu(x)
        x = self.pool1(x)
        
        x = self.res_block1(x)
        x = self.pool2(x)
        
        x = self.res_block2(x)
        x = self.adaptive_pool(x)
        
        x = x.flatten(1)  # flatten all except batch dim
        x = self.classifier(x)
        return x


In [None]:
import numpy as np
import torch 
import cv2 as cv
import os 
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader , random_split
from PIL import Image
from torchvision import transforms 

In [None]:
from typing import Dict
import json
import urllib
from torchvision.transforms import Compose, Lambda
from torchvision.transforms._transforms_video import (
    CenterCropVideo,
    NormalizeVideo,
)
from pytorchvideo.data.encoded_video import EncodedVideo
# from pytorchvideo.transforms import (
#     ApplyTransformToKey,
#     ShortSideScale,
#     UniformTemporalSubsample,
#     UniformCropVideo
# ) 



In [3]:
from typing import Dict
import json
import urllib

from torchvision.transforms import Compose, Lambda, CenterCrop, Normalize
from pytorchvideo.data.encoded_video import EncodedVideo
# Optional: if needed
# from pytorchvideo.transforms import (
#     ApplyTransformToKey,
#     ShortSideScale,
#     UniformTemporalSubsample,
#     UniformCropVideo
# )


In [14]:
video = EncodedVideo.from_path('data/videos/fire/fire_2.mp4')
clip = video.get_clip(start_sec=0, end_sec=1.0)  # Extract a 2-second segment
clip['video'].shape

torch.Size([3, 30, 720, 1280])

In [24]:
video = EncodedVideo.from_path('data/videos/fire/fire_3.mp4')
clip = video.get_clip(start_sec=0, end_sec=1.0)  # Extract a 2-second segment
clip['video'].shape

torch.Size([3, 30, 720, 1280])

In [None]:
# 1. Load video & extract clip
video = EncodedVideo.from_path('data/videos/fire/fire_2.mp4')
clip = video.get_clip(start_sec=0, end_sec=1.0)

# 2. Transform pipeline
transform = ApplyTransformToKey(
    key="video",
    transform=Compose([
        UniformTemporalSubsample(30),
        Lambda(lambda x: x / 255.0),
        NormalizeVideo(mean=[0.45] * 3, std=[0.225] * 3),
        ShortSideScale(256),
        CenterCropVideo(224),
    ])
)

# 3. Apply transform
video_tensor = {'video': clip['video']}
ts_image = transform(video_tensor)
final_ts_image = ts_image['video'].unsqueeze(0)  # [1, 3, 30, 224, 224]

# 4. Pack into SlowFast input
def pack_pathway_output(frames, alpha=4):
    fast_pathway = frames
    slow_pathway = frames[:, :, ::alpha, :, :]
    return [slow_pathway, fast_pathway]

inputs = pack_pathway_output(final_ts_image)

# 5. Load and run model
import torch
slow_fast_model = torch.hub.load('facebookresearch/pytorchvideo', 'slowfast_r50', pretrained=True)
slow_fast_model.eval()

with torch.no_grad():
    preds = slow_fast_model(inputs)


In [1]:
# side_size = 256
# mean = [0.45, 0.45, 0.45]
# std = [0.225, 0.225, 0.225]
# crop_size = 256
# num_frames = 32
# sampling_rate = 2
# frames_per_second = 30
# slowfast_alpha = 4
# num_clips = 10
# num_crops = 3

# class PackPathway(torch.nn.Module):
#     """
#     Transform for converting video frames as a list of tensors. 
#     """
#     def __init__(self):
#         super().__init__()
        
#     def forward(self, frames: torch.Tensor):
#         fast_pathway = frames
#         # Perform temporal sampling from the fast pathway.
#         slow_pathway = torch.index_select(
#             frames,
#             1,
#             torch.linspace(
#                 0, frames.shape[1] - 1, frames.shape[1] // slowfast_alpha
#             ).long(),
#         )
#         frame_list = [slow_pathway, fast_pathway]
#         return frame_list

# transform =  ApplyTransformToKey(
#     key="video",
#     transform=Compose(
#         [
#             UniformTemporalSubsample(num_frames),
#             Lambda(lambda x: x/255.0),
#             NormalizeVideo(mean, std),
#             ShortSideScale(
#                 size=side_size
#             ),
#             CenterCropVideo(crop_size),
#             PackPathway()
#         ]
#     ),
# )

# # The duration of the input clip is also specific to the model.
# clip_duration = (num_frames * sampling_rate)/frames_per_second

In [None]:
# from typing import Tuple
# from tqdm import tqdm 

# def train_the_model(model , number_of_epochs:int ,
#                      data_loader:DataLoader, loss_fn:torch.nn.Module , 
#                      ) -> Tuple[float, float] :
    
#     optimizer = torch.optim.Adam(params=model.parameters() , lr=1e-1, weight_decay=1e-3)
    
#     device = 'cuda' if torch.cuda.is_available() else 'cpu'
#     model.to(device)
    
#     for epoch in range(number_of_epochs): 
#         model.train()
#         batch_loss = 0
#         batch_acc = 0 
#         batch_recall = 0

#         for x , y in tqdm(data_loader) : 
#             x = x.to(device)
#             y = y.to(device)
#             ypred = model(x)
#             # ypred = ypred.squeeze()
#             loss = loss_fn(ypred, y)
            
#             batch_loss += loss.item() 
#             acc , recall = measure_acc(ypred, y) 
#             batch_acc += acc.item()
#             batch_recall += recall.item()
#             loss.backward()
#             optimizer.step()
#         batch_loss /= len(data_loader)
#         batch_acc /= len(data_loader)
#         batch_recall /= len(data_loader)

        
#     return batch_loss , batch_acc, batch_recall


# def eval_the_model(model:torch.nn.Module ,data_loader:DataLoader, loss_fn:torch.nn.Module ):
#     test_loss = 0 
#     test_acc = 0 
#     batch_recall = 0

#     model.eval()
#     with torch.inference_mode(): 
#         for x , y in data_loader: 
#             x = x.to(device)
#             y = y.to(device)
#             ypred = model(x)
#             ypred = np.squeeze(ypred)
#             loss = loss_fn(ypred , y)
#             test_loss+=loss.item()
#             acc , recall = measure_acc(ypred , y)
#             test_acc += acc.item()
#             batch_recall += recall.item()
#         test_loss /= len(data_loader)
#         test_acc /= len(data_loader)
#         batch_recall /= len(data_loader)
    
#     return test_loss, test_acc , batch_recall


In [None]:
from torch.utils.data import Subset, DataLoader
import random
from collections import defaultdict

def get_balanced_subset(dataset, samples_per_class):
    class_indices = defaultdict(list)

    # Group indices by class label
    for idx, (_, label) in enumerate(dataset):
        class_indices[label].append(idx)

    # Sample 'samples_per_class' items from each class
    balanced_indices = []
    for label, indices in class_indices.items():
        # Only sample up to available samples
        sampled = random.sample(indices, min(samples_per_class, len(indices)))
        balanced_indices.extend(sampled)

    # Create a subset
    return Subset(dataset, balanced_indices)

# Example usage:
balanced_subset = get_balanced_subset(dataset, samples_per_class=10)

check_data_loader = DataLoader(balanced_subset, batch_size=2, shuffle=True, num_workers=8, drop_last=True)


In [5]:
from collections import defaultdict

d = defaultdict(list)
d['cat'].append(1)
d['cat'].append(2)
d['dog'].append(3)

print(d)  # {'cat': [1, 2], 'dog': [3]}

defaultdict(<class 'list'>, {'cat': [1, 2], 'dog': [3]})


In [15]:
import torch
import numpy as np

In [7]:
test = torch.rand((2, 3, 30, 224, 224))
test.shape

torch.Size([2, 3, 30, 224, 224])

In [25]:
padding = test[:, :, -1:, :, :].repeat(1, 1, 2, 1, 1)
padding.shape

torch.Size([2, 3, 2, 224, 224])

In [29]:
new_test = torch.cat([padding , test] , dim=2)
new_test.shape


torch.Size([2, 3, 32, 224, 224])

In [19]:
np.repeat(test, 2, axis=2).shape

torch.Size([2, 3, 60, 224, 224])

In [36]:
test[:,:,-1:,:,:].repeat(1, 1, 2, 1, 1).shape


torch.Size([2, 3, 2, 224, 224])

In [24]:
array = np.array([1, 2, 3,4])
array[-1:]

array([4])

In [53]:
def padd_to_32(data:torch.tensor) : 
        T = data.size(2)
        if T == 32 : 
            return data 
        elif T < 32: 
            padd = 32 - T 
            st1 =  data[:,:,-1:,:,:].repeat(1, 1, padd , 1, 1)
            stacked = torch.cat([st1, data] , dim=2)
            return stacked
        else: 
            return data[:, :, :32, : , :]


In [54]:
test = torch.rand((2, 3, 45, 224, 224))
test.shape

torch.Size([2, 3, 45, 224, 224])

In [55]:
padd_to_32(test).shape

torch.Size([2, 3, 32, 224, 224])

In [25]:
# # train.py
# import argparse

# def get_default_config():
#     return {
#         "learning_rate": 0.001,
#         "epochs": 10,
#         "batch_size": 32,
#         "dataset_path": "data/fire_videos",
#         "save_dir": "checkpoints/"
#     }

# def parse_args(): 
#     default_cfg = get_default_config()

#     parser = argparse.ArgumentParser(description="Fire detection trainer")

#     # Add arguments with default values from config
#     parser.add_argument('--learning_rate', type=float, default=default_cfg['learning_rate'])
#     parser.add_argument('--epochs', type=int, default=default_cfg['epochs'])
#     parser.add_argument('--batch_size', type=int, default=default_cfg['batch_size'])
#     parser.add_argument('--dataset_path', type=str, default=default_cfg['dataset_path'])
#     parser.add_argument('--save_dir', type=str, default=default_cfg['save_dir'])

#     return parser.parse_args()


# result = parse_args()
# print(result(args=[]))

In [16]:
import argparse 
parser = argparse.ArgumentParser()

parser.add_argument('--epochs' , type=int , default=10)

args =  parser.parse_args(args=[])
print(args) 


Namespace(epochs=10)
