In [None]:
from itertools import product
from collections import defaultdict
import numpy as np

class stratifiedKFold:
    """Stratified K-fold splitter that yields positional train/test indices.

    Samples are grouped by label and each group is dealt out as evenly as
    possible across the folds, so every fold has roughly the same class
    proportions as the full label vector.

    Args:
        n_split: Number of folds (at least 2). May also be given through the
            keyword-only alias ``n_splits``.
        shuffle: If true, shuffle the samples of each class before they are
            assigned to folds; otherwise the original order is kept.
        random_state: Seed for the shuffle (``None`` for a fresh, unseeded
            generator). Ignored when ``shuffle`` is false.
        n_splits: Keyword-only alias for ``n_split``.

    Raises:
        TypeError: If neither ``n_split`` nor ``n_splits`` is given.
        ValueError: If both are given and disagree, or the fold count is not
            an integer >= 2.
    """

    def __init__(self, n_split=None, shuffle=False, random_state=None, *,
                 n_splits=None):
        if n_split is None:
            n_split = n_splits
        elif n_splits is not None and n_splits != n_split:
            raise ValueError(
                "n_split and n_splits disagree; pass only one of them")
        if n_split is None:
            raise TypeError(
                "the number of folds is required (n_split / n_splits)")
        if int(n_split) != n_split or n_split < 2:
            raise ValueError(
                f"the number of folds must be an integer >= 2, got {n_split!r}")

        self.n_split = int(n_split)
        # Same value under the plural name, so either spelling can be read.
        self.n_splits = self.n_split
        self.shuffle = shuffle
        self.random_state = random_state

    def split(self, X, y):
        """Yield ``(train_idx, test_idx)`` integer index arrays, one per fold.

        Args:
            X: Unused; accepted so the call signature mirrors the usual
                ``split(X, y)`` convention.
            y: Label sequence (assumed one-dimensional with hashable values).

        Yields:
            Tuples ``(train_idx, test_idx)`` of integer NumPy arrays holding
            positions into ``y``. The test arrays of all folds together cover
            every position exactly once.
        """
        labels = np.asarray(y).tolist()
        # A private generator keeps the split reproducible without touching
        # NumPy's global random state.
        rng = np.random.default_rng(self.random_state) if self.shuffle else None

        # Group positions by the label found AT that position.
        positions_by_label = defaultdict(list)
        for position, label in enumerate(labels):
            positions_by_label[label].append(position)

        folds = [[] for _ in range(self.n_split)]
        for positions in positions_by_label.values():
            members = np.asarray(positions, dtype=np.intp)
            if rng is not None:
                rng.shuffle(members)
            # array_split hands the first (len % n_split) chunks one extra
            # element, so fold sizes per class differ by at most one.
            for fold, chunk in zip(folds, np.array_split(members, self.n_split)):
                fold.extend(chunk.tolist())

        # Explicit integer dtype so an empty fold does not become a float array.
        fold_arrays = [np.asarray(fold, dtype=np.intp) for fold in folds]
        for k, test_idx in enumerate(fold_arrays):
            train_idx = np.concatenate(
                [fold for j, fold in enumerate(fold_arrays) if j != k])
            yield train_idx, test_idx

    def get_n_splits(self):
        """Return the number of folds this splitter produces."""
        return self.n_split

In [None]:
# Hyperparameter search space: one single-key dict per hyperparameter, mapping
# its name to the list of candidate values. Fill in the lists and add further
# entries (e.g. batch size) as needed.
hypers = [
    {"lr": []},
    {"epochs": []},
]

# Calling the function: `results` is a list of (best_params, test_acc) tuples,
# one per outer fold, where best_params is the best hyperparameter combination
# found for that fold and test_acc is its accuracy on the fold's test split.

# results = cross_validate(model, metadata, metadata['dx_encoded'], hypers)

def _select_rows(data, positions):
    """Return the rows of ``data`` at the given integer positions.

    Objects exposing ``.iloc`` (pandas) are indexed through it so the lookup
    is positional rather than label/column based; lists and tuples are
    converted to arrays first; anything else is indexed directly.
    """
    if hasattr(data, "iloc"):
        return data.iloc[positions]
    if isinstance(data, (list, tuple)):
        return np.asarray(data)[positions]
    return data[positions]


def _expand_grid(hyperparameters):
    """Expand ``[{name: [values, ...]}, ...]`` into one dict per combination."""
    grid = {}
    for entry in hyperparameters:
        grid.update(entry)
    names = list(grid)
    value_lists = [grid[name] for name in names]
    return [dict(zip(names, combo)) for combo in product(*value_lists)]


def cross_validate(model, X, y, hyperparameters, fit_and_score=None):
    """Run nested stratified cross-validation (3 outer x 3 inner folds).

    For each outer fold, every hyperparameter combination is scored on the
    inner folds of the outer training split; the combination with the highest
    mean inner score is then fitted on the whole outer training split and
    scored on the outer test split.

    Args:
        model: Passed through unchanged to ``fit_and_score``.
        X: Samples; rows are selected by position (pandas objects via
            ``.iloc``).
        y: Labels aligned with ``X``; used for stratification.
        hyperparameters: List of dicts mapping a hyperparameter name to its
            list of candidate values, e.g. ``[{'lr': [1e-4]}, {'epochs': [3]}]``.
        fit_and_score: Callable
            ``fit_and_score(model, X_fit, y_fit, X_eval, y_eval, params)``
            that trains on the ``*_fit`` data with the hyperparameter dict
            ``params`` and returns the accuracy on the ``*_eval`` data.

    Returns:
        A list with one ``(best_params, test_acc)`` tuple per outer fold.

    Raises:
        ValueError: If ``fit_and_score`` is missing, if ``hyperparameters``
            expands to no combinations (e.g. an empty value list), or if no
            combination produced a comparable score.

    Example ``fit_and_score`` (sketch condensed from the original in-function
    template; adapt the names to your own implementation)::

        def fit_and_score(model, X_fit, y_fit, X_eval, y_eval, params):
            # NOTE: re-create / reset the model weights here; reusing an
            # already-trained model across folds leaks information.
            BATCH_SIZE = 128
            train_dataset = SkinCancerDataset(
                X_fit, IMAGE_DIRS, feature_extractor, transform=train_transform)
            eval_dataset = SkinCancerDataset(
                X_eval, IMAGE_DIRS, feature_extractor, transform=val_test_transform)
            train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE,
                                      shuffle=True, num_workers=0, pin_memory=True)
            eval_loader = DataLoader(eval_dataset, batch_size=BATCH_SIZE,
                                     shuffle=False, num_workers=0, pin_memory=True)

            optimizer = AdamW(model.parameters(), lr=params['lr'])
            total_steps = len(train_loader) * params['epochs']
            scheduler = get_linear_schedule_with_warmup(
                optimizer,
                num_warmup_steps=int(0.1 * total_steps),
                num_training_steps=total_steps,
            )
            for epoch in range(params['epochs']):
                train_epoch(model, train_loader, optimizer, scheduler, DEVICE)
                eval_loss, eval_acc = eval_epoch(model, eval_loader, DEVICE)
            return eval_acc
    """
    if fit_and_score is None:
        raise ValueError(
            "cross_validate needs a fit_and_score callable to train and "
            "evaluate the model; see the docstring for a sketch")

    # The grid does not depend on the fold, so build it once.
    candidates = _expand_grid(hyperparameters)
    if not candidates:
        raise ValueError(
            "hyperparameters expands to no combinations; give every entry "
            "at least one candidate value")

    # Fold count passed positionally so the call does not depend on the
    # parameter's spelling.
    outer_fold = stratifiedKFold(3, shuffle=True, random_state=42)
    outer_results = []

    for train_idx, test_idx in outer_fold.split(X, y):
        X_train, X_test = _select_rows(X, train_idx), _select_rows(X, test_idx)
        y_train, y_test = _select_rows(y, train_idx), _select_rows(y, test_idx)

        inner_fold = stratifiedKFold(3, shuffle=True, random_state=42)
        best_acc = -np.inf
        best_params = None

        for params in candidates:
            inner_acc = []
            for inner_train_idx, inner_val_idx in inner_fold.split(X_train, y_train):
                inner_acc.append(fit_and_score(
                    model,
                    _select_rows(X_train, inner_train_idx),
                    _select_rows(y_train, inner_train_idx),
                    _select_rows(X_train, inner_val_idx),
                    _select_rows(y_train, inner_val_idx),
                    params,
                ))

            mean_inner_acc = np.mean(inner_acc)
            # Strict comparison: on ties the earliest combination wins.
            if mean_inner_acc > best_acc:
                best_acc = mean_inner_acc
                best_params = params

        if best_params is None:
            raise ValueError(
                "no hyperparameter combination produced a comparable score "
                "(did fit_and_score return NaN?)")

        # Refit with the winning combination on the full outer training split
        # and score once on the held-out outer test split.
        best_test_acc = fit_and_score(
            model, X_train, y_train, X_test, y_test, best_params)
        outer_results.append((best_params, best_test_acc))

    return outer_results