In [None]:
import cv2
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from PIL import Image
from torchvision import transforms
import os 

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Create a dummy video for testing
dummy_video_path = "D:\AI Video detector\dummy_video.mp4"
os.makedirs(os.path.dirname(dummy_video_path), exist_ok=True)

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(dummy_video_path, fourcc, 30.0, (256, 256))
for i in range(150): 
    img = np.zeros((256, 256, 3), dtype=np.uint8)
    cv2.putText(img, str(i), (100, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
    out.write(img)
out.release()

print(f"Created dummy video at: {dummy_video_path}")

Using device: cpu
Created dummy video at: D:\AI Video detector\dummy_video.mp4


In [None]:
def get_video_frames(video_path: str, num_frames: int = 24, window_sec: int = 2) -> torch.Tensor:
    """
    Loads a video, samples num_frames evenly over a window_sec duration,
    and returns them as a pre-processed tensor.
    """

    preprocess = transforms.Compose([
        transforms.Resize((224, 224)), 
        transforms.ToTensor(),         
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406], 
            std=[0.229, 0.224, 0.225]  
        ),
    ])

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise IOError(f"Cannot open video file {video_path}")


    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    
    if fps == 0:
        cap.release()
        raise ValueError(f"Video file {video_path} has 0 FPS.")


    frames_in_window = int(fps * window_sec)
    end_frame_idx = min(frames_in_window, total_frames - 1)
    

    if end_frame_idx <= 0:
        end_frame_idx = total_frames - 1

    indices = np.linspace(0, end_frame_idx, num=num_frames, dtype=int)

    frames = []
    for idx in indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = cap.read()
        
        if not ret:
            if len(frames) > 0:
                frames.append(frames[-1]) 
            else:
                cap.release()
                raise IOError(f"Could not read any frames from {video_path}")
            continue

        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frame_pil = Image.fromarray(frame_rgb)
        frame_tensor = preprocess(frame_pil)
        frames.append(frame_tensor)

    cap.release()
    return torch.stack(frames)

print("Component 1 (get_video_frames) defined.")

Component 1 (get_video_frames) defined.


In [None]:
class ReStraVFeatureExtractor(nn.Module):
    def __init__(self):
        super().__init__()
        self.dinov2 = torch.hub.load('facebookresearch/dinov2', 'dinov2_vits14')
        
        self.dinov2.eval()
        for param in self.dinov2.parameters():
            param.requires_grad = False
            
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Extracts features from the final block (block 11) of DINOv2
        and flattens them.
        """
        T = x.shape[0] 
        features_dict = self.dinov2.forward_features(x)
        
        cls_token = features_dict['x_norm_clstoken']       
        patch_tokens = features_dict['x_norm_patchtokens'] 
        

        cls_token = cls_token.unsqueeze(1)                
        trajectory = torch.cat([cls_token, patch_tokens], dim=1) 

        return trajectory.flatten(start_dim=1)

print("Component 2 (ReStraVFeatureExtractor) defined.")

Component 2 (ReStraVFeatureExtractor) defined.


In [None]:
def calculate_geometry(trajectory: torch.Tensor) -> (torch.Tensor, torch.Tensor):
    """
    Calculates the stepwise distance and curvature from the trajectory.
    """
    
    displacement_vectors = trajectory[1:] - trajectory[:-1] 
    
    distances = torch.norm(displacement_vectors, p=2, dim=1) 
    
    delta_z_i = displacement_vectors[:-1] 
    delta_z_ip1 = displacement_vectors[1:]

    cosine_sim = F.cosine_similarity(delta_z_i, delta_z_ip1, dim=1)
    
    cosine_sim = torch.clamp(cosine_sim, -1.0, 1.0)
    
    curvature_rad = torch.acos(cosine_sim)
    curvature_deg = curvature_rad * (180.0 / np.pi)
    
    return distances, curvature_deg

print("Component 3 (calculate_geometry) defined.")

Component 3 (calculate_geometry) defined.


In [None]:
def aggregate_features(distances: torch.Tensor, curvatures: torch.Tensor) -> torch.Tensor:
    """
    Aggregates distance and curvature time-series into a single 
    21-dimensional feature vector.
    """
    
    d_early = distances[:7]
    if len(d_early) < 7:
        d_early = F.pad(d_early, (0, 7 - len(d_early)), 'constant', 0)
        
    c_early = curvatures[:6]
    if len(c_early) < 6:
        c_early = F.pad(c_early, (0, 6 - len(c_early)), 'constant', 0)

    d_stats = torch.stack([
        torch.mean(distances),
        torch.var(distances, unbiased=True),
        torch.min(distances),
        torch.max(distances)
    ]) if distances.numel() > 0 else torch.zeros(4)

    c_stats = torch.stack([
        torch.mean(curvatures),
        torch.var(curvatures, unbiased=True),
        torch.min(curvatures),
        torch.max(curvatures)
    ]) if curvatures.numel() > 0 else torch.zeros(4)
        
    feature_vector = torch.cat([d_early, c_early, d_stats, c_stats])
    
    return torch.nan_to_num(feature_vector, nan=0.0)

print("Component 4 (aggregate_features) defined.")

Component 4 (aggregate_features) defined.


In [None]:
class ReStraVClassifier(nn.Module):
    def __init__(self, input_dim=21, h1=64, h2=32):
        """
        [cite_start]Defines the 2-layer MLP classifier as described in the paper[cite: 228].
        """
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(input_dim, h1),  
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(h1, h2),         
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(h2, 1)           
        )
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.network(x)

print("Component 5 (ReStraVClassifier) defined.")

Component 5 (ReStraVClassifier) defined.


In [None]:
print("--- üöÄ Starting Full Pipeline Test ---")

try:
    feature_extractor = ReStraVFeatureExtractor().to(device)
    classifier = ReStraVClassifier().to(device)
    classifier.eval() 
    print("Models loaded successfully.")

    video_tensor = get_video_frames(dummy_video_path).to(device)
    print(f"C1 Output (Video Tensor):    {video_tensor.shape}")
    
    with torch.no_grad():
        trajectory = feature_extractor(video_tensor)
    print(f"C2 Output (Trajectory 'Z'):  {trajectory.shape}")
    
    distances, curvatures = calculate_geometry(trajectory.cpu())
    print(f"C3 Output (Dist/Curv):     {distances.shape}, {curvatures.shape}")

    final_feature_vector = aggregate_features(distances, curvatures).to(device)
    print(f"C4 Output (21D Vector 'y'):  {final_feature_vector.shape}")
    
    with torch.no_grad():
        logit = classifier(final_feature_vector)
    
    probability = torch.sigmoid(logit)
    prediction = "AI-Generated" if probability.item() > 0.5 else "Natural"
    
    print("\n--- ‚úÖ Test Complete ---")
    print(f"Final Prediction (Untrained): {prediction} ({probability.item():.4f})")
    
except Exception as e:
    print(f"\n--- ‚ùå Test Failed ---")
    print(f"An error occurred: {e}")

--- üöÄ Starting Full Pipeline Test ---


Using cache found in C:\Users\Randinu/.cache\torch\hub\facebookresearch_dinov2_main


Models loaded successfully.
C1 Output (Video Tensor):    torch.Size([24, 3, 224, 224])
C2 Output (Trajectory 'Z'):  torch.Size([24, 98688])
C3 Output (Dist/Curv):     torch.Size([23]), torch.Size([22])
C4 Output (21D Vector 'y'):  torch.Size([21])

--- ‚úÖ Test Complete ---
Final Prediction (Untrained): Natural (0.0000)


In [None]:
import os
import glob
import pandas as pd
from tqdm import tqdm 

base_data_path = "D:\\AI Video detector\\data\\raw"
natural_dir = os.path.join(base_data_path, "natural")
ai_dir = os.path.join(base_data_path, "ai_generated")

output_dir = "D:\\AI Video detector\\data\\processed"
output_csv = os.path.join(output_dir, "features.csv")

os.makedirs(output_dir, exist_ok=True)

natural_files = glob.glob(os.path.join(natural_dir, "*.*"))
ai_files = glob.glob(os.path.join(ai_dir, "*.*"))

file_list = [(f, 0) for f in natural_files] + [(f, 1) for f in ai_files]
print(f"Found {len(natural_files)} natural videos.")
print(f"Found {len(ai_files)} AI-generated videos.")
print(f"Total videos to process: {len(file_list)}")

feature_extractor = ReStraVFeatureExtractor().to(device)

results = []
for file_path, label in tqdm(file_list, desc="Processing Videos"):
    try:
        
        video_tensor = get_video_frames(file_path).to(device)
        
        with torch.no_grad():
            trajectory = feature_extractor(video_tensor)

        distances, curvatures = calculate_geometry(trajectory.cpu())

        final_feature_vector = aggregate_features(distances, curvatures)

        results.append({
            "file": os.path.basename(file_path),
            "label": label,
            "features": final_feature_vector.numpy() 
        })
        
    except Exception as e:
        print(f"Warning: Failed to process {file_path}. Error: {e}")
        print("Skipping this file.")

if results:

    feature_names = [f"f_{i}" for i in range(21)]
    
    df_records = []
    for res in results:
        record = {
            "file": res["file"],
            "label": res["label"]
        }
        for i, val in enumerate(res["features"]):
            record[feature_names[i]] = val
        df_records.append(record)

    df = pd.DataFrame(df_records)
    
    df.to_csv(output_csv, index=False)
    
    print("\n--- üöÄ Processing Complete! ---")
    print(f"Successfully processed {len(df)} videos.")
    print(f"Features saved to: {output_csv}")
    
    print("\n--- Data Head: ---")
    print(df.head())
    
else:
    print("\n--- ‚ùå No results processed. Check your data paths. ---")

In [None]:
import torch

if torch.cuda.is_available():
    print("--- üöÄ CUDA is Available! ---")
    print(f"Device Name: {torch.cuda.get_device_name(0)}")
    print(f"Number of GPUs: {torch.cuda.device_count()}")
    print(f"Current Device Index: {torch.cuda.current_device()}")
    
    # We can also create a tensor and send it to the GPU
    print("\nTesting tensor allocation...")
    try:
        x = torch.tensor([1.0, 2.0]).to("cuda")
        print(f"Successfully allocated tensor on GPU: {x}")
        print("Your environment is ready for GPU training.")
    except Exception as e:
        print(f"\n--- ‚ùå Error allocating tensor to GPU ---")
        print(f"Error: {e}")

else:
    print("\n--- ‚ùå CUDA is NOT Available ---")
    print("PyTorch cannot detect your GPU.")
    print("Training will run on the CPU (which will be much slower).")
    print("Please check your PyTorch installation and CUDA driver versions.")

--- üöÄ CUDA is Available! ---
Device Name: NVIDIA GeForce RTX 3050 Ti Laptop GPU
Number of GPUs: 1
Current Device Index: 0

Testing tensor allocation...
Successfully allocated tensor on GPU: tensor([1., 2.], device='cuda:0')
Your environment is ready for GPU training.
