In [1]:
import os
import cv2
import random
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision.transforms import transforms

In [None]:
def preprocess_video(video_path, fps_tolerance=0.01):
    """Read every frame of a 1280x720, 29.97 fps video into a list.

    Args:
        video_path: Path of the video file, handed to ``cv2.VideoCapture``.
        fps_tolerance: Largest accepted absolute difference between the
            frame rate reported for the file and 29.97. NTSC "29.97" is
            normally reported as 30000/1001 (29.97002997...), so an exact
            float comparison would reject genuine 29.97 fps files.

    Returns:
        The frames in the order ``video.read()`` produced them, unmodified,
        or ``None`` (after printing the reason) when the file cannot be
        opened or its resolution / frame rate is not accepted.
    """
    # Open the video file
    video = cv2.VideoCapture(video_path)

    try:
        if not video.isOpened():
            print(f"Could not open video: {video_path}")
            return None

        # Get the video properties
        width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = video.get(cv2.CAP_PROP_FPS)

        # Check if the video properties meet the requirements
        if width != 1280 or height != 720:
            print("Video resolution is not 720p. Please provide a video with resolution 1280x720.")
            return None

        # Compare with a tolerance: the reported rate is a float and is
        # rarely bit-for-bit equal to the literal 29.97.
        if abs(fps - 29.97) > fps_tolerance:
            print("Video frame rate is not 29.97 fps. Please provide a video with frame rate 29.97.")
            return None

        # Preprocess the video frames
        frames = []
        while True:
            ret, frame = video.read()
            if not ret:
                break

            # Perform preprocessing on the frame (e.g., resize, normalize, etc.)
            # ...

            frames.append(frame)

        return frames
    finally:
        # Release the video file on every path, including the early
        # rejections above and any exception raised while reading.
        video.release()


In [None]:
import torch
import torch.nn as nn

class VoYAGER(nn.Module):
    """Three-stage convolutional image classifier.

    Each stage is a 3x3 convolution (padding 1), ReLU and a 2x2 max-pool that
    halves height and width, taking channels 3 -> 64 -> 128 -> 256. The
    feature map is then reduced to 4x4, flattened to 256 * 4 * 4 values and
    passed through a 1024-unit hidden layer with dropout to ``num_classes``
    logits.

    Args:
        num_classes: Size of the output logit vector.
    """

    def __init__(self, num_classes):
        super().__init__()

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)
        self.relu1 = nn.ReLU(inplace=True)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU(inplace=True)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)
        self.relu3 = nn.ReLU(inplace=True)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)

        # fc1 needs exactly 256 * 4 * 4 features, which the three poolings
        # only produce for 32x32 inputs. Pooling adaptively to 4x4 keeps
        # 32x32 inputs unchanged (4x4 -> 4x4 is the identity) and lets any
        # other input size through. The layer has no parameters, so
        # state_dict keys are unaffected.
        self.adaptive_pool = nn.AdaptiveAvgPool2d((4, 4))

        self.fc1 = nn.Linear(256 * 4 * 4, 1024)
        self.relu4 = nn.ReLU(inplace=True)
        self.dropout = nn.Dropout(p=0.5)

        self.fc2 = nn.Linear(1024, num_classes)

    def forward(self, x):
        """Compute class logits.

        Args:
            x: Tensor of shape (batch, 3, height, width). Height and width
                must be at least 8 so the three 2x2 poolings leave a
                non-empty feature map.

        Returns:
            Tensor of shape (batch, num_classes) holding unnormalised logits.
        """
        x = self.pool1(self.relu1(self.conv1(x)))
        x = self.pool2(self.relu2(self.conv2(x)))
        x = self.pool3(self.relu3(self.conv3(x)))

        x = self.adaptive_pool(x)
        # flatten() also copes with non-contiguous tensors, unlike view().
        x = torch.flatten(x, 1)

        x = self.dropout(self.relu4(self.fc1(x)))

        return self.fc2(x)


In [None]:
# Define the dataset class
class VideoDataset(torch.utils.data.Dataset):
    """Dataset over the video files stored directly inside one directory.

    Args:
        video_dir: Directory whose regular files are treated as videos.
        transform: Optional callable applied to the whole list of frames
            returned by ``preprocess_video`` for one video.
        labels: Optional mapping from video file name to class index. When
            given, every file in ``video_dir`` must have an entry and items
            are ``(frames, label)`` pairs; when omitted, items are just the
            frames.

    Raises:
        ValueError: If ``labels`` is given but lacks an entry for a file.
    """

    def __init__(self, video_dir, transform=None, labels=None):
        self.video_dir = video_dir
        # os.listdir order is arbitrary; sort so indices (and therefore a
        # seeded random_split) are reproducible, and skip sub-directories.
        self.video_files = sorted(
            name
            for name in os.listdir(video_dir)
            if os.path.isfile(os.path.join(video_dir, name))
        )
        self.transform = transform
        self.labels = labels

        if labels is not None:
            missing = [name for name in self.video_files if name not in labels]
            if missing:
                raise ValueError(f"No label provided for: {missing}")

    def __len__(self):
        return len(self.video_files)

    def __getitem__(self, idx):
        """Load one video.

        Returns:
            The (optionally transformed) frames, or ``(frames, label)`` when
            the dataset was built with ``labels``.

        Raises:
            ValueError: If ``preprocess_video`` rejected the file (returned
                ``None``).
        """
        name = self.video_files[idx]
        video_path = os.path.join(self.video_dir, name)
        frames = preprocess_video(video_path)

        # Fail here with the offending path instead of handing None to the
        # transform or to the DataLoader's collate step.
        if frames is None:
            raise ValueError(f"Video rejected by preprocess_video: {video_path}")

        if self.transform:
            frames = self.transform(frames)

        if self.labels is None:
            return frames
        return frames, self.labels[name]


def _collate_clips(batch):
    """Collate ``(clip, label)`` pairs into ``(list_of_clips, label_tensor)``.

    Clips stay in a list because videos may have different frame counts and
    therefore cannot be stacked into one tensor.
    """
    clips, labels = zip(*batch)
    return list(clips), torch.tensor(labels, dtype=torch.long)


def _run_epoch(model, loader, criterion, optimizer=None):
    """Run one pass over ``loader`` and return ``(mean_loss, accuracy)``.

    The model is trained when ``optimizer`` is given; otherwise it is put in
    eval mode and gradients are disabled. Both metrics are averaged over the
    number of videos seen and are ``0.0`` when the loader yields nothing.
    """
    training = optimizer is not None
    model.train(training)

    total_loss = 0.0
    correct = 0
    seen = 0

    with torch.set_grad_enabled(training):
        for clips, labels in loader:
            # Forward pass: one logit vector per video.
            # NOTE(review): the model scores single frames, so frame logits
            # are averaged per video here; replace with another aggregation
            # if that is not the intended behaviour.
            outputs = torch.stack([model(clip).mean(dim=0) for clip in clips])

            # Compute the loss
            loss = criterion(outputs, labels)

            if training:
                # Backward pass and optimization
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            batch_size = labels.size(0)
            total_loss += loss.item() * batch_size
            correct += (outputs.argmax(dim=1) == labels).sum().item()
            seen += batch_size

    # A split can be empty for small datasets; avoid dividing by zero.
    if seen == 0:
        return 0.0, 0.0
    return total_loss / seen, correct / seen


if __name__ == "__main__":
    # Set the random seed for reproducibility
    random.seed(42)
    torch.manual_seed(42)

    # Define the data directory
    data_dir = "/path/to/data"

    # Class index (0 .. num_classes - 1) for every video file name in data_dir.
    # NOTE(review): the original loops read a `labels` name that was never
    # defined and the dataset carried no targets; fill this mapping in.
    video_labels = {}

    # Define the transformations for preprocessing a single frame.
    # VoYAGER's first linear layer expects 256 * 4 * 4 features, i.e. 32x32
    # inputs after its three 2x poolings, so frames are resized to 32x32.
    frame_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((32, 32), antialias=True),
        # Add more transformations as needed
    ])

    # VideoDataset passes the whole list of frames to its transform, so apply
    # the per-frame pipeline to each frame and stack the results into one
    # clip tensor.
    transform = transforms.Lambda(
        lambda frames: torch.stack([frame_transform(frame) for frame in frames])
    )

    # Create the dataset
    dataset = VideoDataset(data_dir, transform=transform, labels=video_labels)

    # Split the dataset into train, validation, and test sets
    train_size = int(0.7 * len(dataset))
    val_size = int(0.2 * len(dataset))
    test_size = len(dataset) - train_size - val_size
    train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

    # Create the data loaders
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=_collate_clips)
    val_loader = DataLoader(val_dataset, batch_size=32, collate_fn=_collate_clips)
    test_loader = DataLoader(test_dataset, batch_size=32, collate_fn=_collate_clips)

    # Define the model (VoYAGER is the network defined in this file)
    num_classes = 10
    model = VoYAGER(num_classes)

    # Define the loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Training loop
    num_epochs = 10
    for epoch in range(num_epochs):
        train_loss, _ = _run_epoch(model, train_loader, criterion, optimizer)

        # Validation loop
        val_loss, val_accuracy = _run_epoch(model, val_loader, criterion)

        # Print the training and validation metrics for the epoch
        print(f"Epoch {epoch+1}/{num_epochs}: Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")

    # Evaluation loop
    test_loss, test_accuracy = _run_epoch(model, test_loader, criterion)

    # Print the test metrics
    print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")

