### Let's create validation dataloaders for unseen objects

In [40]:
import os
import shutil
import uuid
import random

import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from torchvision.io import read_video
from torchvision.transforms import Compose, Resize, ToTensor
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms

from training import VideoDataset, data_transform

from vit_pytorch.vivit import ViT


In [9]:
# Root folder of the unseen-object recordings, plus one target folder per class.
root_validation_data_dir = './dataset/unseen_data'
slip_dir, wriggle_dir = (
    os.path.join(root_validation_data_dir, class_name)
    for class_name in ('slip', 'wriggle')
)

In [10]:

# Make sure both per-class target folders exist; already-present folders are left as they are.
for _class_dir in (slip_dir, wriggle_dir):
    os.makedirs(_class_dir, exist_ok=True)

In [11]:
def copy_or_move_with_uuid(src, dst, is_copy=True):
    """Place ``src`` inside directory ``dst`` under a freshly generated UUID4 name.

    The file extension reported by ``os.path.splitext`` is kept, so only the
    stem of the file name changes.

    Args:
        src: Path of the file to transfer.
        dst: Directory that receives the file.
        is_copy: When truthy the file is copied with ``shutil.copy``;
            otherwise it is moved with ``shutil.move``.
    """
    extension = os.path.splitext(src)[-1]
    target = os.path.join(dst, f"{uuid.uuid4()}{extension}")
    transfer = shutil.copy if is_copy else shutil.move
    transfer(src, target)


In [12]:
# Recursively search root_validation_data_dir for recording folders that hold
# "left.avi" and/or "right.avi". A folder that also contains "slip.txt" is
# labelled 'slip'; every other folder is labelled 'wriggle'. The videos are
# copied (never moved) under UUID names so recordings cannot overwrite each other.
# NOTE: re-running this cell adds another UUID-named copy of every video.
for root, _, files in os.walk(root_validation_data_dir):
    target_dir = slip_dir if "slip.txt" in files else wriggle_dir
    for video_name in ("left.avi", "right.avi"):
        # Handle each camera view on its own: a folder holding only one of the
        # two files must not abort the whole run with FileNotFoundError.
        if video_name in files:
            copy_or_move_with_uuid(os.path.join(root, video_name), target_dir, is_copy=True)
print("Organizing complete.")

Organizing complete.


In [18]:
def make_datasets(root_dir='./dataset/unseen_data'):
    """Copy the class-sorted videos into ``<root_dir>/<subset>/<class>/``.

    For every subset (currently only ``'validation'``) and every class
    (``'slip'``, ``'wriggle'``) the ``.avi`` files found in
    ``<root_dir>/<class>`` are shuffled and the leading share given by the
    subset's split ratio is copied into ``<root_dir>/<subset>/<class>``.
    With the single ratio of 1.0 every video is copied. File names are kept,
    so running the function again overwrites the same destination files.

    Args:
        root_dir: Folder containing the ``slip`` and ``wriggle`` source
            folders; the subset folders are created beneath it. Defaults to
            the notebook's unseen-data location.

    Raises:
        FileNotFoundError: If a class source folder does not exist
            (raised by ``os.listdir``).
    """
    classes = ['slip', 'wriggle']
    subsets = ['validation']
    split_ratios = [1.0]  # One ratio per entry in `subsets`; 1.0 keeps every video

    for subset, split_ratio in zip(subsets, split_ratios):
        for class_name in classes:
            source_videos = os.path.join(root_dir, class_name)
            class_dir = os.path.join(root_dir, subset, class_name)
            # Creates root_dir and the subset folder as needed; no-op if present.
            os.makedirs(class_dir, exist_ok=True)

            video_files = [f for f in os.listdir(source_videos) if f.endswith('.avi')]
            random.shuffle(video_files)  # Only matters when split_ratio < 1.0
            num_videos = int(len(video_files) * split_ratio)

            for video_file in video_files[:num_videos]:
                shutil.copy(
                    os.path.join(source_videos, video_file),
                    os.path.join(class_dir, video_file),
                )

    print("Directory structure and video copying completed.")


In [19]:
# Build the validation/<class>/ folders under ./dataset/unseen_data from the sorted videos.
make_datasets()

Directory structure and video copying completed.


In [25]:
class VideoDataset(Dataset):
    """Map-style dataset of ``.avi`` clips stored as ``data_dir/<class>/<clip>.avi``.

    Class labels are the positions of the class sub-directory names in sorted
    order. Each item is a ``(video_tensor, label)`` pair, see ``__getitem__``.

    NOTE: this definition shadows the ``VideoDataset`` imported from
    ``training`` in the notebook's import cell.
    """

    def __init__(self, data_dir, transform=None):
        """Index every ``.avi`` file below ``data_dir``.

        Args:
            data_dir: Directory holding one sub-directory per class.
            transform: Optional callable applied to every decoded frame
                (an H x W array with at most 3 channels). It is expected to
                return a (C, H, W) tensor, as ``ToTensor`` does.
        """
        self.data_dir = data_dir
        # Only directories count as classes: a stray file in data_dir would
        # otherwise shift the label indices and crash _load_videos.
        self.classes = sorted(
            entry for entry in os.listdir(data_dir)
            if os.path.isdir(os.path.join(data_dir, entry))
        )
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}
        self.videos = self._load_videos()
        self.transform = transform

    def _load_videos(self):
        """Return a list of ``(video_path, class_index)`` for every ``.avi`` file."""
        videos = []
        for class_name in self.classes:
            class_dir = os.path.join(self.data_dir, class_name)
            # Sorted so that a given index always maps to the same clip.
            for video_file in sorted(os.listdir(class_dir)):
                if video_file.endswith('.avi'):
                    video_path = os.path.join(class_dir, video_file)
                    videos.append((video_path, self.class_to_idx[class_name]))
        return videos

    def __len__(self):
        """Number of indexed clips."""
        return len(self.videos)

    def __getitem__(self, idx):
        """Decode clip ``idx`` and return ``(video_tensor, label)``.

        ``video_tensor`` has layout (channels, frames, height, width); the
        batch axis is added later by the ``DataLoader``. With a transform the
        dtype is whatever the transform produces; without one the raw decoded
        frames are stacked unchanged.

        Raises:
            RuntimeError: If no frame could be decoded from the file.
        """
        # The notebook's import cell never imports imageio, so bring it into
        # scope here instead of relying on leftover kernel state.
        import imageio

        video_path, label = self.videos[idx]

        video = imageio.get_reader(video_path, 'ffmpeg')  # Open video with imageio
        try:
            # Keep only the first three channels (RGB) of every frame.
            frames = [frame[:, :, :3] for frame in video]
        finally:
            # Release the reader even when decoding fails part-way through.
            video.close()

        if not frames:
            raise RuntimeError(f"No frames could be decoded from {video_path}")

        if self.transform:
            # (frames, C, H, W), assuming the transform returns (C, H, W).
            video_tensor = torch.stack([self.transform(frame) for frame in frames])
        else:
            # No transform: stack the raw H x W x C frames and move the channel
            # axis forward so both branches share the (frames, C, H, W) layout.
            video_tensor = torch.from_numpy(np.stack(frames)).permute(0, 3, 1, 2)

        # Swap the first two axes: (frames, C, H, W) -> (C, frames, H, W).
        return video_tensor.permute(1, 0, 2, 3), label

# Per-frame preprocessing pipeline; ToTensor is its only step.
data_transform = Compose([ToTensor()])

In [26]:

# Folder holding the validation split, one sub-directory per class.
validation_data_dir = os.path.join(root_validation_data_dir, 'validation')

In [28]:
# Index the validation clips; data_transform is applied to every decoded frame.
validation_dataset = VideoDataset(validation_data_dir, transform=data_transform)


In [29]:
batch_size = 4

# Batches of `batch_size` clips, loaded in the main process (num_workers=0)
# and drawn in shuffled order.
validation_loader = DataLoader(
    validation_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=0,
)

In [37]:
# Show the class names found by the dataset; a name's list position is its label index.
validation_loader.dataset.classes

['slip', 'wriggle']