In [1]:
import pandas as pd
from sklearn.model_selection import KFold
from torchvision import transforms  # If you're dealing with image data
import torch.nn as nn
from mkit.torch_support.nn_utils import training_loop
from mkit.torch_support.tensor_utils import xy_to_tensordataset, one_cut_split, sequential_x_y_split
from mkit.torch_support.predict import autoregressive
from mkit.torch_support.model.Autoencoder import GANEncoder
from mkit.torch_support.model.CNN import AdjustableCNN
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
from tqdm import tqdm

In [2]:

class Predictor(nn.Module):
    def __init__(self, width, height, hidden_dims, output_dim):
        """
        Predictor class with GANEncoder and fully connected layers for prediction.

        Args:
            width (int): Width used to size the first linear layer.
            height (int): Height used to size the first linear layer.
            hidden_dims (list[int]): List of dimensions for convolutional layers in GANEncoder.
                Forwarded unchanged to ``GANEncoder(hidden_dims=...)``.
            output_dim (int): Dimension of the prediction output.

        Note:
            The first linear layer expects ``hidden_dims[-1] * width * height``
            input features, i.e. it assumes the encoder emits ``hidden_dims[-1]``
            channels on a ``width`` x ``height`` grid -- TODO confirm against
            GANEncoder's actual output shape.
        """
        super().__init__()

        # Encoder
        self.encoder = GANEncoder(hidden_dims=hidden_dims)

        # Fully connected layers for prediction
        self.fc_layers = nn.Sequential(
            nn.Linear(hidden_dims[-1] * width * height, 128),  # Flattened encoder features to 128
            nn.ReLU(),
            nn.Linear(128, 64),              # From 128 to 64
            nn.ReLU(),
            nn.Linear(64, output_dim)        # From 64 to output dimension
        )

    def forward(self, x):
        """
        Forward pass for the predictor.

        Args:
            x (torch.Tensor): Input tensor; the first dimension is treated as the batch.

        Returns:
            torch.Tensor: Prediction output of the fully connected head.
        """
        x = self.encoder(x)  # Pass through the encoder
        # reshape (not view) so non-contiguous encoder outputs -- e.g. channels_last
        # or permuted tensors -- are flattened instead of raising a RuntimeError.
        x = x.reshape(x.size(0), -1)
        x = self.fc_layers(x)  # Pass through the fully connected layers
        return x

In [3]:
from torchvision.datasets import MNIST
from torch.utils.data import Subset

from torch.utils.data import DataLoader, random_split

# Preprocessing applied to every image: convert to a tensor, then standardise
# the single channel with a fixed mean / standard deviation.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.1307,), std=(0.3081,)),
])

# NOTE(review): train=False is passed, so the cross-validation below runs on the
# non-training MNIST split only -- confirm this is intentional.
dataset = MNIST(root='./data', transform=transform, train=False, download=True)


In [None]:

from tqdm import tqdm
from time import sleep
import torch

def training_procedure(train_loader):
    """
    Build a fresh Predictor and fit it on ``train_loader`` via ``training_loop``.

    The model is configured for 28x28 inputs and 10 output classes, optimised
    with Adam (default hyper-parameters) against a cross-entropy loss.

    Args:
        train_loader: Iterable of training batches handed to ``training_loop``.

    Returns:
        tuple: ``(model, device, criterion)`` -- the model returned by
        ``training_loop``, the device that was selected, and the loss function,
        so the caller can evaluate with the same settings.
    """
    NUM_OF_CLASSES = 10
    WIDTH, HEIGHT = 28, 28
    model = Predictor(width=WIDTH, height=HEIGHT, hidden_dims=[1, 16], output_dim=NUM_OF_CLASSES)
    # Use the GPU when one is present, otherwise fall back to the CPU so the
    # notebook still runs on machines without CUDA.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    criterion = torch.nn.CrossEntropyLoss()
    # keep_losses=True makes training_loop hand back the loss history as well;
    # it is not used here, hence the underscore-prefixed name.
    model, _losses = training_loop(
        model=model,
        device=device,
        train_loader=train_loader,
        optimizer=torch.optim.Adam(model.parameters()),
        criterion=criterion,
        keep_losses=True,
    )
    return model, device, criterion

def testing_procedure(
        model,
        test_loader,
        device,
        criterion
    ):
    """
    Evaluate ``model`` on ``test_loader`` and report loss and accuracy.

    The model is put into evaluation mode for the duration of the pass (and its
    previous train/eval mode is restored afterwards), gradients are disabled,
    and the predicted class is the index of the largest output along dim 1.

    Args:
        model (torch.nn.Module): Model to evaluate.
        test_loader: Iterable yielding ``(inputs, labels)`` batches.
        device (torch.device): Device the batches are moved to before the forward pass.
        criterion: Callable ``criterion(outputs, labels)`` returning a scalar loss tensor.

    Returns:
        tuple[float, float]: ``(average_loss, accuracy)`` where ``average_loss``
        is the mean of the per-batch loss values and ``accuracy`` is the
        fraction of correctly classified samples.

    Raises:
        ValueError: If ``test_loader`` yields no batches or no samples.
    """
    total_loss = 0.0
    correct_predictions = 0
    total_samples = 0
    num_batches = 0  # counted manually so loaders without __len__ also work

    # Evaluation mode matters for mode-dependent layers (e.g. dropout, batch norm);
    # remember the previous mode so this function has no lasting side effect.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for inputs, labels in tqdm(test_loader):
                inputs, labels = inputs.to(device), labels.to(device)  # Move data to the appropriate device

                outputs = model(inputs)  # Forward pass
                loss = criterion(outputs, labels)  # Compute loss
                total_loss += loss.item()
                num_batches += 1

                # Compute accuracy
                _, predicted = torch.max(outputs, 1)
                correct_predictions += (predicted == labels).sum().item()
                total_samples += labels.size(0)
    finally:
        model.train(was_training)

    if num_batches == 0 or total_samples == 0:
        raise ValueError("test_loader produced no samples; cannot compute evaluation metrics.")

    average_loss = total_loss / num_batches
    accuracy = correct_predictions / total_samples

    print(f'Average Loss: {average_loss:.4f}, Accuracy: {accuracy:.4%}')
    return average_loss, accuracy

# Define the procedure function
def procedure(train_subset, test_subset, **kwargs):
    """
    Train on ``train_subset`` and evaluate on ``test_subset`` for one fold.

    Args:
        train_subset: Dataset used to build the training DataLoader.
        test_subset: Dataset used to build the evaluation DataLoader.
        **kwargs: Optional settings; unrecognised keys are ignored.
            - batch_size (int): Batch size for both loaders. Defaults to 1,
              which matches the previous behaviour but is slow; pass a larger
              value (e.g. 64) for practical run times.
            - shuffle (bool): Whether to shuffle the training loader each
              epoch. Defaults to False. The evaluation loader is never shuffled.
    """
    # Previously **kwargs was accepted but silently dropped; honour the loader
    # settings while keeping DataLoader's own defaults when nothing is passed.
    batch_size = kwargs.get("batch_size", 1)
    shuffle = kwargs.get("shuffle", False)

    train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=shuffle)
    test_loader = DataLoader(test_subset, batch_size=batch_size)
    print("Data Parsed.")
    model, device, criterion = training_procedure(
        train_loader
    )
    testing_procedure(
        model,
        test_loader,
        device,
        criterion
    )

# Define the K-Fold validation function
def k_fold_validation(
        dataset: torch.utils.data.Dataset = None,
        n_splits: int = 5,
        procedure: callable = None,
        **kwargs
    ):
    """
    Performs K-Fold Cross Validation on a given dataset.

    The dataset indices are split with a shuffled ``KFold`` (fixed
    ``random_state=42``, so the folds are the same on every run) and
    ``procedure`` is invoked once per fold with the corresponding subsets.

    Parameters:
    - dataset (torch.utils.data.Dataset): The dataset to split.
    - n_splits (int): Number of folds.
    - procedure (callable): Function to execute on each fold. Should accept (train_subset, test_subset, **kwargs).
    - **kwargs: Additional keyword arguments to pass to the procedure.

    Raises:
    - ValueError: If ``dataset`` is None.
    - TypeError: If ``procedure`` is not callable.

    Examples:
    >>> N_SPLITS = 5
    >>> dataset = MNIST(root='./data', train=False, download=True, transform=transform)
    >>> def procedure(train_subset, test_subset, **kwargs):
    ...     ...
    >>> k_fold_validation(dataset, n_splits=N_SPLITS, procedure=procedure)
    """
    if dataset is None:
        raise ValueError("Dataset must be provided.")
    # Fail before any splitting happens rather than with an opaque
    # "'NoneType' object is not callable" inside the fold loop.
    if not callable(procedure):
        raise TypeError("procedure must be a callable accepting (train_subset, test_subset, **kwargs).")

    kfold = KFold(n_splits=n_splits, shuffle=True, random_state=42)

    for fold, (train_ids, val_ids) in enumerate(kfold.split(dataset)):
        tqdm.write(f"Current Fold: [{fold + 1}/{n_splits}]")
        tqdm.write(f"Training Data Size: {len(train_ids)}; Testing Data Size: {len(val_ids)}")
        train_subset = Subset(dataset, train_ids)
        test_subset = Subset(dataset, val_ids)
        procedure(train_subset, test_subset, **kwargs)

# Run 5-fold cross-validation, training and evaluating a fresh model per fold.
k_fold_validation(dataset=dataset, n_splits=5, procedure=procedure)


K-Fold Validation: 0it [00:00, ?it/s]

Current Fold: [1/5]
Training Data Size: 8000; Testing Data Size: 2000
Data Parsed.


EPOCH 1/5: 100%|██████████| 8000/8000 [01:33<00:00, 85.15it/s]
K-Fold Validation: 0it [01:34, ?it/s]

Epoch [1/5] Training Loss: 0.5030 

EPOCH 2/5: 100%|██████████| 8000/8000 [01:22<00:00, 96.82it/s]
K-Fold Validation: 0it [02:56, ?it/s]

Epoch [2/5] Training Loss: 0.2750 

EPOCH 3/5:  35%|███▌      | 2837/8000 [00:28<00:52, 99.20it/s] 
K-Fold Validation: 0it [03:25, ?it/s]


KeyboardInterrupt: 