<a href="https://colab.research.google.com/github/NadineML/MAML-Pytorch/blob/master/ProtoNet_create_data_split_csv.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# The purpose of this *.ipynb* notebook is to create *.csv* files that list filenames and their corresponding labels (deduced from the directory structure) for a specified split of a dataset stored in a specified folder, so that they can be used as training and validation datasets for machine learning algorithms.
Depending on how your ML algorithm implements its dataset, you might be required to move the actual files into a specific directory structure. This notebook does not move any files; it only creates *.csv* files and, if specified, a directory to hold them.

In [1]:
#@title import necessary modules { form-width: "15%", display-mode: "form" }
import torch
from torch.utils.data import Dataset, IterableDataset
from torchvision import transforms, datasets
import random
import numpy as np
from typing import List, TypeVar, Iterable
import os
import csv
import bisect
import functools

# Seed the global RNGs of Python's `random` module and of NumPy with the same
# fixed value, so that random operations performed after this cell are repeatable.
random.seed(222)
np.random.seed(222)

In [2]:
#@title Mount Google Drive { form-width: "15%", display-mode: "form" }
#@markdown This block requires you to go through the login process of your Google Account to access Google Drive where your dataset should be stored.
# `google.colab` is imported here rather than in the import cell; it is the
# module that provides `drive.mount`.
from google.colab import drive
# Directory inside the runtime at which Google Drive is mounted.
base_path = '/content/data'
# force_remount=True requests a fresh mount each time this cell is run.
drive.mount(base_path, force_remount=True)
# Working directory: later cells join the user-supplied relative dataset path onto it.
wd = os.path.join(base_path, "MyDrive")

Mounted at /content/data


In [3]:
#@title CombinedDataset Class  { form-width: "15%", display-mode: "form" }
T_co = TypeVar('T_co', covariant=True)


class CombinedDataset(Dataset[T_co]):
    r"""Dataset as a combination of multiple image-folder style datasets.

    This class is a modified version of torch.utils.data.dataset.ConcatDataset.
    Besides concatenating the items of all datasets it builds one *global*
    label space: the classes of the first dataset get the labels
    ``0 .. n0-1``, the classes of the second dataset ``n0 .. n0+n1-1`` and so on.

    Every dataset passed in must provide ``classes`` (list of class names),
    ``imgs`` (list of ``(path, local_label)`` pairs), ``root`` and ``__len__``.
    ``len(dataset)`` is assumed to equal ``len(dataset.imgs)``.

    Indexing returns ``(path, global_label)``; no image data is loaded.

    Args:
        datasets (sequence): List of datasets to be concatenated
    """
    # The wrapped datasets, in the order they were passed in.
    datasets: List[Dataset[T_co]]
    # Running totals of the dataset lengths; the last entry is the total length.
    cumulative_sizes: List[int]

    @staticmethod
    def cumsum(sequence):
        """Return the running totals of ``len(e)`` for every ``e`` in ``sequence``."""
        totals, running = [], 0
        for element in sequence:
            running += len(element)
            totals.append(running)
        return totals

    @staticmethod
    def labels_classes(sequence):
        """Return ``(global_labels, grouped_classes)`` for the given datasets.

        ``global_labels`` is ``[0, 1, ..., total_number_of_classes - 1]`` and
        ``grouped_classes`` holds the ``classes`` list of each dataset, in order.
        """
        grouped = [ds.classes for ds in sequence]
        total = sum(len(class_names) for class_names in grouped)
        return list(range(total)), grouped

    @staticmethod
    def mapped_labels(labels, classes):
        """Map every global label to ``(class_name, local_label)``.

        ``classes`` is a list of class-name lists (one per dataset). Labels are
        consumed from ``labels`` in order; mapping stops early when ``labels``
        runs out.
        """
        label_map = {}
        cursor = 0
        available = len(labels)
        for class_names in classes:
            for local_label, class_name in enumerate(class_names):
                if cursor >= available:
                    return label_map
                label_map[labels[cursor]] = (class_name, local_label)
                cursor += 1
        return label_map

    @staticmethod
    def mapped_items_per_label(datasets, map):
        """Return ``{global_label: [combined_item_index, ...]}``.

        ``map`` is the dictionary produced by :meth:`mapped_labels` (the
        parameter name shadows the builtin but is kept for compatibility).
        Classes without any item get an empty list.
        """
        items_per_label = {}
        first_label = 0   # global label of the current dataset's first class
        item_offset = 0   # combined index of the current dataset's first item

        for ds in datasets:
            local_labels = [instance[1] for instance in ds.imgs]
            class_count = len(ds.classes)
            for global_label in range(first_label, first_label + class_count):
                wanted = map[global_label][1]
                items_per_label[global_label] = [
                    position + item_offset
                    for position, local_label in enumerate(local_labels)
                    if local_label == wanted
                ]
            item_offset += len(local_labels)
            # Advance by this dataset's own class count (the original reused a
            # value that was only refreshed inside the inner loop).
            first_label += class_count
        return items_per_label

    @staticmethod
    def modified_classes(datasets, grouped_classes):
        """Return class names prefixed with their dataset folder: ``"folder / class"``."""
        mod_classes = []
        for position, class_names in enumerate(grouped_classes):
            # normpath drops a trailing separator so basename is never empty.
            prefix = os.path.basename(os.path.normpath(datasets[position].root))
            for class_name in class_names:
                mod_classes.append(prefix + " / " + class_name)
        return mod_classes

    @staticmethod
    def _global_labels(datasets):
        """Return the global label of every item, ordered by combined index."""
        labels = []
        class_offset = 0
        for ds in datasets:
            labels.extend(class_offset + instance[1] for instance in ds.imgs)
            class_offset += len(ds.classes)
        return labels

    def __init__(self, datasets: Iterable[Dataset]) -> None:
        super(CombinedDataset, self).__init__()
        self.datasets = list(datasets)
        assert len(self.datasets) > 0, 'datasets should not be an empty iterable'  # type: ignore[arg-type]
        for d in self.datasets:
            assert not isinstance(d, IterableDataset), "CobinedDataset does not support IterableDataset"
        self.cumulative_sizes = self.cumsum(self.datasets)

        self.possible_labels, self.grouped_classes = self.labels_classes(self.datasets)
        # NOTE: this instance attribute intentionally keeps its historical name
        # and therefore hides the static method of the same name on instances;
        # the static method stays reachable as CombinedDataset.mapped_labels.
        self.mapped_labels = type(self).mapped_labels(self.possible_labels, self.grouped_classes)
        self.items_per_label = self.mapped_items_per_label(self.datasets, self.mapped_labels)
        self.mod_classes = self.modified_classes(self.datasets, self.grouped_classes)
        # Global label per item. __getitem__ reads this attribute; it was never
        # assigned before, so indexing the dataset raised AttributeError.
        self.labels = self._global_labels(self.datasets)

    def __len__(self):
        return self.cumulative_sizes[-1]

    def __getitem__(self, idx):
        """Return ``(path, global_label)`` of the item at combined index ``idx``.

        Negative indices count from the end. Raises ``ValueError`` for a
        negative index whose absolute value exceeds the dataset length and
        ``IndexError`` for a positive index past the end.
        """
        if idx < 0:
            if -idx > len(self):
                raise ValueError("absolute value of index should not exceed dataset length")
            idx = len(self) + idx
        # Find the dataset that contains the combined index.
        dataset_idx = bisect.bisect_right(self.cumulative_sizes, idx)
        if dataset_idx == 0:
            sample_idx = idx
        else:
            sample_idx = idx - self.cumulative_sizes[dataset_idx - 1]

        return (self.datasets[dataset_idx].imgs[sample_idx][0], self.labels[idx])
        



In [4]:
#@title def create_data_csv{ form-width: "15%", display-mode: "form" }
#@markdown The version **create_data_csv()** saves the relevant data in the form of **rel_path/filename, label**
def create_data_csv(save_to_path, _filename, _label, start=None):
    """Write a ``filename,label`` CSV whose filenames are relative paths.

    Args:
        save_to_path: Path of the CSV file to create (an existing file is overwritten).
        _filename: Sequence of file paths.
        _label: Sequence of labels; ``_label[i]`` belongs to ``_filename[i]``.
        start: Directory the written paths are made relative to. When omitted,
            the notebook-level ``drive_path`` is used, as before.

    Raises:
        IndexError: If ``_label`` has fewer entries than ``_filename``.
    """
    if start is None:
        # Backward-compatible default: the dataset root configured in the notebook.
        start = drive_path
    with open(save_to_path, 'w', newline='') as csvfile:
        header_key = ['filename', 'label']
        writer = csv.DictWriter(csvfile, fieldnames=header_key)
        writer.writeheader()
        for idx, path in enumerate(_filename):
            writer.writerow({
                'filename': os.path.relpath(path, start=start),
                'label': _label[idx],
            })

#@markdown The version **create_data_csv_1** saves the relevant data in the form of **filename, label**
def create_data_csv_1(save_to_path, _filename, _label):
    """Write a ``filename,label`` CSV that keeps only the base name of each file.

    Args:
        save_to_path: Path of the CSV file to create (an existing file is overwritten).
        _filename: Sequence of file paths; only their last path component is written.
        _label: Sequence of labels; ``_label[i]`` belongs to ``_filename[i]``.
    """
    columns = ['filename', 'label']
    with open(save_to_path, 'w', newline='') as out_file:
        writer = csv.DictWriter(out_file, fieldnames=columns)
        writer.writeheader()
        for position, full_path in enumerate(_filename):
            row = {
                'filename': os.path.basename(full_path),
                'label': _label[position],
            }
            writer.writerow(row)

#@markdown By selecting the *keep_rel_path_in_filename checkbox* you switch from using **create_data_csv_1** (default) to using **create_data_csv()**

In [5]:
#@title Provide a path to the dataset { form-width: "15%", display-mode: "form" }
#@markdown If your dataset consists of multiple folders for different classes with subfolders for their respective labels, please choose CombinedDataset. If you have one folder with one class containing subfolders for labels, choose ImageFolder.
dataset_type = "ImageFolder" #@param ["ImageFolder", "CombinedDataset"]
#@markdown Please provide a path to your dataset, relative to your Google Drive "root" level.
rel_data_path = "images" #@param {type:"string"}
#@markdown Please select how to split your dataset into training and validation set
max_train_data = 10 #@param {type:"integer"}
max_val_data = 30 #@param {type:"integer"}
keep_rel_path_in_filename = True #@param {type:"boolean"}
#@markdown Please select this checkbox if you want to create a folder for the *.csv* files under the **parent directory** of *rel_data_path*
create_folder = True #@param {type:"boolean"}

# Location of the dataset: the user-supplied relative path joined onto the working directory.
drive_path = os.path.join(wd, rel_data_path)
# True exactly when the "CombinedDataset" option was selected in the form above.
custom_mode = (dataset_type == "CombinedDataset")




In [6]:
#@title Create datasets { form-width: "15%", display-mode: "form" }


def _list_subfolders(parent):
    """Return the sorted sub-directories of ``parent`` as joined paths.

    Plain files are skipped: each returned path is handed to ``ImageFolder``,
    which needs a directory (e.g. *.csv* files written into the dataset folder
    by an earlier run must not be treated as datasets).
    """
    candidates = (os.path.join(parent, entry) for entry in sorted(os.listdir(parent)))
    return [path for path in candidates if os.path.isdir(path)]


# Empty transform pipeline; the cells below only use file paths and labels.
d_transform = transforms.Compose([])

if custom_mode:
    all_folders = _list_subfolders(drive_path)

    # If the dataset root contains both a "train" and a "test" folder, use
    # their sub-folders as datasets instead of the two folders themselves.
    test_fp = os.path.join(drive_path, "test")
    train_fp = os.path.join(drive_path, "train")

    if train_fp in all_folders and test_fp in all_folders:
        all_folders.remove(train_fp)
        all_folders.remove(test_fp)
        all_folders.extend(_list_subfolders(train_fp))
        all_folders.extend(_list_subfolders(test_fp))

    # One ImageFolder per folder, combined under a single global label space.
    dataset_list = [
        datasets.ImageFolder(root=folder, transform=d_transform)
        for folder in all_folders
    ]
    ds = CombinedDataset(dataset_list)

    # Collect path and "folder / class" label of every item in combined order
    # (dataset order, then item order inside each dataset). The per-dataset
    # class index is shifted by the number of classes of all earlier datasets,
    # which is how CombinedDataset numbers its global labels.
    _filename = []
    _label = []
    class_offset = 0
    for part in dataset_list:
        for path, local_label in part.imgs:
            _filename.append(path)
            _label.append(ds.mod_classes[class_offset + local_label])
        class_offset += len(part.classes)

    items_per_label = ds.items_per_label

else:
    ds = datasets.ImageFolder(root=drive_path, transform=d_transform)

    # Group item indices by class index in a single pass; classes without
    # items keep an empty list.
    items_per_label = {class_idx: [] for class_idx in range(len(ds.classes))}
    for item_idx, target in enumerate(ds.targets):
        items_per_label[target].append(item_idx)

    _filename = [path for path, _ in ds.imgs]
    _label = [ds.classes[class_idx] for _, class_idx in ds.imgs]

# Number of items available for every label, in label order.
lens = [len(items) for items in items_per_label.values()]




In [7]:
#@title Split datasets { form-width: "15%", display-mode: "form" }
# Re-seed so that the split produced by this cell does not depend on what ran before it.
random.seed(222)
np.random.seed(222)

# CombinedDataset has no `classes` attribute; its class names ("folder / class")
# live in `mod_classes`. ImageFolder exposes them as `classes`.
class_names = ds.mod_classes if custom_mode else ds.classes

# Negative limits are treated as 0 so file and label lists always stay aligned.
train_limit = max(0, max_train_data)
val_limit = max(0, max_val_data)

# Item indices (into `_filename`) and labels selected for each split.
_val_filename = []
val_label = []
_train_filename = []
train_label = []

for idx in range(len(lens)):
    l = lens[idx]
    k = min(l, train_limit)        # number of training items for this class
    m = min(l - k, val_limit)      # number of validation items for this class
    cur_class = class_names[idx]

    # Shuffle a copy: shuffling the stored list in place would change
    # `items_per_label` and make re-running this cell give a different split.
    data = list(items_per_label[idx])
    np.random.shuffle(data)
    train_slice = data[:k]
    val_slice = data[k:k + m]

    _train_filename.extend(train_slice)
    train_label.extend([cur_class] * k)
    _val_filename.extend(val_slice)
    val_label.extend([cur_class] * m)

# Translate the selected item indices into file paths.
val_filename = [_filename[i] for i in _val_filename]
train_filename = [_filename[i] for i in _train_filename]

In [8]:
#@title Save *.csv files { form-width: "15%", display-mode: "form" }
if create_folder:
    # normpath drops a trailing separator; without it basename would be empty
    # and dirname would be the dataset folder itself.
    dataset_root = os.path.normpath(drive_path)
    parent_dir = os.path.dirname(dataset_root)
    dataset_name = os.path.basename(dataset_root)
    save_to = os.path.join(parent_dir, "ProtoNet: "+dataset_name+" {}-{}".format(max_train_data, max_val_data))

    os.makedirs(save_to, exist_ok=True)
else:
    save_to = drive_path

all_sample_path = os.path.join(save_to, "all_samples.csv")
val_sample_path = os.path.join(save_to, "val.csv")
train_sample_path = os.path.join(save_to, "train.csv")

# Pick the writer once: relative paths or bare file names in the "filename" column.
write_csv = create_data_csv if keep_rel_path_in_filename else create_data_csv_1

# (target file, file paths, labels) for every CSV that is written.
csv_jobs = [
    (all_sample_path, _filename, _label),
    (val_sample_path, val_filename, val_label),
    (train_sample_path, train_filename, train_label),
]
for csv_path, csv_filenames, csv_labels in csv_jobs:
    write_csv(csv_path, csv_filenames, csv_labels)

# Report the real number of files (the message used to say 2 although 3 are written).
print(f"Created {len(csv_jobs)} CSV files at \"{save_to}\".\n")

Created 2 CSV files at "/content/data/MyDrive/ProtoNet: images 10-30".

