In [None]:
import torch
torch.set_printoptions(threshold=torch.inf)
import torch.nn as nn
from torchvision import transforms
from PIL import Image
import numpy as np
import random
import glob, os, re
import matplotlib.pyplot as plt, matplotlib.pylab as pylab
import pickle


In [None]:
class Network:

  def __init__(self, model_type, training, kwargs):

    self.model_type = model_type

    if training:
      hyperparameters = kwargs.get("hyperparameters")
      self.learn_rate = hyperparameters.get("learn_rate")
      self.batch_size = hyperparameters.get("batch_size")
      self.loss_func = hyperparameters.get("loss_func")
      self.reduction = hyperparameters.get("reduction")
      self.optimizer = hyperparameters.get("optimizer")
      self.lambda_L2 = hyperparameters.get("lambda_L2") 
      self.dropout_rate = hyperparameters.get("dropout_rate")
      self.total_num_correct = 0





  def setOptimizer(self):

    if self.optimizer == "adam":
        self.t = 0
        self.weight_moment_list = [[0, 0]]*self.num_layers
        self.bias_moment_list = [[0, 0]]*self.num_layers




  def inference(self, data):
    return self.forward(data, training=False)



  
  def train(self, data, target, save_params=True, epochs=None):

    epoch_plt = []
    loss_plt = []

    if not epochs: 
      epochs = data.size(dim=1)/self.batch_size



    for epoch in range(1, int(epochs+1)):

      data_batch, target_batch = self.batch(data, target)
      pred_batch = self.forward(data_batch, training=True)

      loss = getattr(self, self.loss_func)(pred_batch, target_batch)

      if self.lambda_L2:
        loss += self.L2Regularization()
      self.backprop(loss)


      epoch_plt.append(epoch)
      loss_plt.append(loss.item())
      train_accuracy = self.calc_train_accuracy(pred_batch, target_batch, epoch)
      print(f"epoch = {epoch}, loss = {loss}, train_accuracy = {train_accuracy} ")
      print(f"__________________________________________")
      
    os.makedirs('params/parametersCNN', exist_ok=True)
    os.makedirs('params/parametersMLP', exist_ok=True)
    self.saveParameters() if save_params else None

    

    return epoch_plt, loss_plt





  def calc_train_accuracy(self, pred_batch, target_batch, epoch):
    num_correct = torch.sum(torch.abs(pred_batch[:, 0:epoch] - target_batch[0:epoch]) <= 0.5)
    self.total_num_correct += num_correct 
    return (self.total_num_correct/(epoch*self.batch_size))*100






  def reduce(self, x):
    if self.reduction == "mean":
      return x.mean()
    elif self.reduction == "sum":
      return x.sum()





  def batch(self, data, target):

      batch_indicies = torch.randperm(n=data.size(dim=0))[:self.batch_size]  # stochastic

      data_batch = data[batch_indicies]
      target_batch = target.T[batch_indicies].T

      return data_batch, target_batch



  def BCELoss(self, pred_batch, target_batch):

    epsilon = 1e-8
    pred_batch = torch.clamp(pred_batch, epsilon, 1 - epsilon)
    errs = target_batch * torch.log(pred_batch) + (1 - target_batch) * torch.log(1 - pred_batch)

    bce_loss = -(1/self.batch_size)*torch.sum(errs, dim=0)  # BCE (Binary Cross Entropy) Loss
    bce_loss_reduced = self.reduce(bce_loss)

    return bce_loss_reduced




  def update(self):

    for layer in self.layers:

      if self.model_type == "CNN" and layer.is_conv_layer:
        layer.kernels -= self.learn_rate * layer.kernels.grad
        layer.biases -= self.learn_rate * layer.biases.grad

      elif self.model_type == "MLP":
        layer.weights -= self.learn_rate * layer.weights.grad
        layer.biases -= self.learn_rate * layer.biases.grad





  def adam(self, layer_index, gt, param_type, *args):

    moment_list = self.weight_moment_list if param_type == "weight" else self.bias_moment_list

    mt_1, vt_1 = moment_list[layer_index]

    """Hyperparameter"""
    beta1 = 0.9  # first moment estimate decay rate (smaller = more aggressive)
    beta2 = 0.999  # second moment estimate decay rate (smaller = more aggressive)
    epsilon = 1e-8

    mt = beta1*mt_1 + (1-beta1)*gt
    vt = beta2*vt_1 + (1-beta2)*gt**2
    mt_hat = mt/(1-beta1**self.t)
    vt_hat = vt/(1-beta2**self.t)

    moment_list[layer_index] = [mt, vt]

    adam_grad = (self.learn_rate*mt_hat)/(torch.sqrt(vt_hat) + epsilon)

    return adam_grad




  def optimizerUpdate(self):

    optimizer_func = getattr(self, self.optimizer)

    for layer in self.layers:

      if self.model_type == "CNN" and layer.is_conv_layer:
        layer_index = self.layers.index(layer)
        layer.kernels -= optimizer_func(layer_index=layer_index, gt=layer.kernels.grad, param_type="weight")
        layer.biases -= optimizer_func(layer_index=layer_index, gt=layer.biases.grad, param_type="bias")

      elif self.model_type == "MLP":
        layer_index = self.layers.index(layer)
        layer.weights -= optimizer_func(layer_index=layer_index, gt=layer.weights.grad, param_type="weight")
        layer.biases -= optimizer_func(layer_index=layer_index, gt=layer.biases.grad, param_type="bias")






  def L2Regularization(self):

      weight_sum = 0

      for layer in self.layers:
        if self.model_type == "CNN" and layer.is_conv_layer:
          weight_sum += (torch.sum(layer.kernels ** 2))
        elif self.model_type == "MLP":
          weight_sum += (torch.sum(layer.weights ** 2))


      regularization = self.lambda_L2*weight_sum

      return regularization







  def checkConfig(self, model_config):

    config_lengths = [len(v) for k, v in model_config.items()]
    all_same_length = all(config_length == config_lengths[0] for config_length in config_lengths)

    if not all_same_length:
      raise IndexError(f"{self.model_type} Configuration Error. Recheck sizes of configuration objects")






  def printLayers(self):
    for layer in self.layers:
      print(layer)

    if self.model_type == "CNN":
      for layer in self.MLP.layers:
        print(layer)




  def zerograd(self):

    for layer in self.layers:

      if self.model_type == "CNN" and layer.is_conv_layer:
        layer.kernels.grad = None
        layer.biases.grad = None

      elif self.model_type == "MLP":
        layer.weights.grad = None
        layer.biases.grad = None





In [None]:
class Layer:


  @staticmethod
  def reLU(k):
    return k * (k > 0)

  @staticmethod
  def leakyReLU(k):
    alpha = 0.01
    k[k < 0] *= alpha
    return k

  @staticmethod
  def sigmoid(k):
    return 1/(1 + torch.exp(-k))



  @staticmethod
  def softmax(k):
    k = k - torch.max(k)  # Subtract the max value from the logits to avoid overflow
    exp_k = torch.exp(k)
    return exp_k / torch.sum(exp_k)



  @staticmethod
  def none(k):
    return k


  def activate(self, z, nonlinearity):
    return getattr(self, nonlinearity)(z)






In [None]:

class FullyConnectedLayer(Layer):

  def __init__(self, pretrained, device_type, **kwargs):

    self.nonlinearity = kwargs.get("nonlinearity")
    self.index = int(kwargs.get("index"))


    self.device_type = device_type


    if not pretrained:

      input_count = kwargs.get("input_count")
      neuron_count = kwargs.get("neuron_count")

      ##### Initialize weights
      # self.weights = torch.rand(size=(neuron_count, input_count), dtype=torch.float32)  # Random Initialization

      # stddev = np.sqrt(2 / (input_count + neuron_count))
      # self.weights = torch.normal(0, stddev, size=(neuron_count, input_count), dtype=torch.float32, device=self.device_type)  # Xavier Initialization

      stddev = np.sqrt(2 / input_count)
      self.weights = torch.normal(0, stddev, size=(neuron_count, input_count), dtype=torch.float32)  # He Initialization

      self.biases = torch.zeros(size=(neuron_count, 1), dtype=torch.float32, device=self.device_type)

    else:
      self.weights = kwargs.get("pretrained_weights").to(device=self.device_type)
      self.biases = kwargs.get("pretrained_biases").to(device=self.device_type)



    self.weights.requires_grad_()
    self.biases.requires_grad_()





  def __repr__(self):
    return (f"__________________________________________\n"
            f"MLP Layer {self.index}\nWeights:\n{self.weights}\nBiases:\n{self.biases}\nActivation:\n{self.nonlinearity}\n"
            f"__________________________________________")



  def feed(self, x):

    z = torch.matmul(self.weights, x) + self.biases

    # ####### TESTING THIS ############################
    if x.size(dim=1) > 1:
      bn1 = nn.BatchNorm1d(num_features=self.weights.size(dim=0), dtype=torch.float32, device=self.device_type)
      z = bn1(z.T).T
    # ####### TESTING THIS ############################


    activations = self.activate(z, self.nonlinearity)


    return activations














In [None]:

class ConvolutionalLayer(Layer):


  def __init__(self, pretrained, is_conv_layer, device_type, **kwargs):

    self.is_conv_layer = is_conv_layer
    self.padding = 0
    self.nonlinearity = kwargs.get("nonlinearity")
    self.index = int(kwargs.get("index"))
    self.kernel_stride = int(kwargs.get("kernel_stride"))


    self.device_type = device_type



    if not pretrained:
      self.filter_count = kwargs.get("filter_count")
      self.kernel_height, self.kernel_width = kwargs.get("kernel_shape")

      # Random Initilization
      self.kernels = torch.rand(size=(1, self.filter_count, self.kernel_height, self.kernel_width), dtype=torch.float32, device=self.device_type) if is_conv_layer else torch.empty(size=(1, self.filter_count, self.kernel_height, self.kernel_width), dtype=torch.float32, device=self.device_type)
      self.biases = torch.rand(size=(1, self.filter_count, 1), dtype=torch.float32, device=self.device_type) if is_conv_layer else None

    else:
      self.kernels = kwargs.get("pretrained_kernels")
      self.biases = kwargs.get("pretrained_biases")
      self.filter_count, self.kernel_height, self.kernel_width = self.kernels.size()[-3:]


    if is_conv_layer:
      self.kernels.requires_grad_()
      self.biases.requires_grad_()









  def __repr__(self):

    return (f"__________________________________________\n"
            f"CNN Layer {self.index}\nKernels:\n{self.kernels}\nKernel Size: {self.kernels.size()}\nBiases:\n{self.biases}\nBias Size: {self.biases.size() if self.biases is not None else None}\nActivation: {self.nonlinearity}\n"
            f"__________________________________________")





  def traverse(self, imgs, func):

    if imgs.ndim == 3:
      imgs = imgs.unsqueeze(dim=1)

    img_batch_size, img_channel_count, img_height, img_width = imgs.size()

    img_slices_stack = imgs.unfold(dimension=2, size=self.kernel_height, step=self.kernel_stride).unfold(dimension=3, size=self.kernel_width, step=self.kernel_stride).reshape(img_batch_size, img_channel_count, -1, self.kernel_height, self.kernel_width)
    img_slices_stack = img_slices_stack.to(self.device_type)

    result = func(img_slices_stack)

    feature_map_rows = int(((img_height - self.kernel_height + 2*self.padding)/self.kernel_stride) + 1)
    feature_map_cols = int(((img_width - self.kernel_width + 2*self.padding)/self.kernel_stride) + 1)
    feature_map = result.reshape(img_batch_size, self.filter_count, feature_map_rows, feature_map_cols)




    if self.is_conv_layer:

      # ####### TESTING THIS ############################
      # if img_batch_size > 1:
      bn2 = nn.BatchNorm2d(num_features=self.filter_count, dtype=torch.float32, device=self.device_type)
      feature_map = bn2(feature_map)
      # ####### TESTING THIS ############################

      feature_map = self.activate(feature_map, self.nonlinearity)

    feature_map = feature_map.to(self.device_type)


    return feature_map




  def convolve(self, x):

    def f(img_slices_stack):

      result = torch.einsum('bcshw,bfhw->bfshw', img_slices_stack, self.kernels)
      result = torch.sum(result, dim=(3, 4)) + self.biases

      return result

    return self.traverse(x, func=f)



  def maxpool(self, x):

    def f(img_slices_stack):

      result = img_slices_stack.max(dim=3)[0].max(dim=3)[0]

      return result

    return self.traverse(x, func=f)











In [None]:


class MLP(Network):

  def __init__(self, pretrained, training, device_type, **kwargs):

    super().__init__(model_type="MLP", training=training, kwargs=kwargs)

    self.device_type = device_type

    if not pretrained:
      mlp_model_config = kwargs.get("mlp_model_config")
      self.checkConfig(model_config=mlp_model_config)
      self.layers = self.buildLayers(mlp_model_config=mlp_model_config, input_feature_count=kwargs.get("input_feature_count"))
    else:
      self.layers = self.loadLayers(mlp_model_params=kwargs.get("mlp_model_params"))

    self.num_layers = len(self.layers)

    if training and self.optimizer:
      self.setOptimizer()






  def loadLayers(self, mlp_model_params):
    layers = [FullyConnectedLayer(pretrained=True, device_type=self.device_type, pretrained_weights=weights, pretrained_biases=biases, nonlinearity=nonlinearity, index=index) for (weights, biases, nonlinearity, index) in mlp_model_params.values()]
    return layers


  def buildLayers(self, mlp_model_config, input_feature_count):

    neuron_counts = mlp_model_config.get("neuron_counts")
    activation_functions = mlp_model_config.get("MLP_activation_functions")

    neuron_counts.insert(0, input_feature_count)

    layers = [FullyConnectedLayer(pretrained=False, device_type=self.device_type, input_count=neuron_counts[i], neuron_count=neuron_counts[i+1], nonlinearity=activation_functions[i], index=i+2) for i in range(len(neuron_counts)-1)]

    return layers


  def saveParameters(self):
    for layer in self.layers:
      layer.index = "0" + str(layer.index) if layer.index < 10 else layer.index
      torch.save(layer.weights, f"params/parametersMLP/layer_{layer.index}_weights_{layer.nonlinearity}.pth")
      torch.save(layer.biases, f"params/parametersMLP/layer_{layer.index}_biases_{layer.nonlinearity}.pth")





  def forward(self, curr_input, training):
    for layer in self.layers:

      curr_input = layer.feed(curr_input)

      if training and self.dropout_rate and layer != self.layers[-1]:
        curr_input = self.dropout(curr_input)

    return curr_input




  def dropout(self, curr_input):

    drop_count = int(self.dropout_rate * curr_input.numel())
    dropout_row_indicies = torch.randint(low=0, high=curr_input.size(dim=0), size=(drop_count,))
    dropout_col_indicies = torch.randint(low=0, high=curr_input.size(dim=1), size=(drop_count,))

    curr_input[dropout_row_indicies, dropout_col_indicies] = 0

    return curr_input






  def backprop(self, loss):

    self.zerograd()

    loss.backward()

    with torch.no_grad():

      if not self.optimizer:
        for layer in self.layers:
          self.update(layer)
      else:
        self.t += 1
        for layer in self.layers:
          self.optimizerUpdate(layer)





In [None]:

class CNN(Network):

  def __init__(self, pretrained, device_type, training, **kwargs):

    super().__init__(model_type="CNN", training=training, kwargs=kwargs)

    self.device_type = device_type

    if not pretrained:
      cnn_model_config = kwargs.get("cnn_model_config")
      self.checkConfig(model_config=cnn_model_config)
      self.layers = self.buildLayers(cnn_model_config=cnn_model_config)

      MLP_input_feature_count = self.calcMLPInputSize(kwargs.get("input_data_dim"))
      self.MLP = MLP(pretrained=False, device_type=self.device_type, training=training, input_feature_count=MLP_input_feature_count, mlp_model_config=kwargs.get("mlp_model_config"), hyperparameters=kwargs.get("mlp_hyperparameters"))
    else:
      self.layers = self.loadLayers(kwargs.get("cnn_model_params"))
      self.MLP = MLP(pretrained=True, device_type=self.device_type, training=training, mlp_model_params=kwargs.get("mlp_model_params"), hyperparameters=kwargs.get("mlp_hyperparameters"))


    self.num_layers = len(self.layers)

    if training and self.optimizer:
      self.setOptimizer()





  def loadLayers(self, cnn_model_params):

    layers = [ConvolutionalLayer(pretrained=True, device_type=self.device_type, is_conv_layer=(is_conv_layer == "True"), pretrained_kernels=kernels, pretrained_biases=biases, nonlinearity=nonlinearity, kernel_stride=stride, index=index) for (is_conv_layer, kernels, biases, nonlinearity, stride, index) in cnn_model_params.values()]

    return layers


  def buildLayers(self, cnn_model_config):

    is_conv_layer = cnn_model_config.get("is_conv_layer")
    filter_counts = cnn_model_config.get("filter_counts")
    kernel_shapes = cnn_model_config.get("kernel_shapes")
    kernel_strides = cnn_model_config.get("kernel_strides")
    activation_functions = cnn_model_config.get("CNN_activation_functions")
    num_layers = len(is_conv_layer)

    layers = [ConvolutionalLayer(pretrained=False, device_type=self.device_type, is_conv_layer=is_conv_layer[i], filter_count=filter_counts[i], kernel_shape=kernel_shapes[i], kernel_stride=kernel_strides[i], nonlinearity=activation_functions[i], index=i+1) for i in range(num_layers)]

    return layers



  def saveParameters(self):
    for layer in self.layers:
      layer.index = "0" + str(layer.index) if layer.index < 10 else layer.index
      torch.save(layer.kernels, f"params/parametersCNN/cnn_layer_{layer.index}_kernels_{layer.nonlinearity}_{layer.is_conv_layer}_{layer.kernel_stride}.pth")
      torch.save(layer.biases, f"params/parametersCNN/cnn_layer_{layer.index}_biases_{layer.nonlinearity}_{layer.is_conv_layer}_{layer.kernel_stride}.pth")

    self.MLP.saveParameters()



  def calcMLPInputSize(self, input_data_dim):

    print("calculating MLP input size.......")

    input_data_dim = (1, ) + input_data_dim

    dummy_data = torch.empty(size=input_data_dim, device=self.device_type)
    dummy_MLP_input = self.forward(dummy_data, training=True, dummy=True)
    dummy_MLP_input_feature_count = dummy_MLP_input.size(dim=0)

    return dummy_MLP_input_feature_count






  def forward(self, curr_input, training, dummy=False):

    for layer in self.layers:
      if layer.is_conv_layer:

        curr_input = layer.convolve(curr_input)

      else:
        curr_input = layer.maxpool(curr_input)


    curr_input_batch_size = curr_input.size(dim=0)
    flattened_feature_map = curr_input.view(curr_input_batch_size, -1).to(torch.float32).T

    if dummy:
      return flattened_feature_map
    else:
      return self.MLP.forward(flattened_feature_map, training=training)





  def backprop(self, loss):

    self.zerograd()
    self.MLP.zerograd()


    loss.backward()

    with torch.no_grad():

      if not self.optimizer:
        self.update()
      else:
        self.t += 1
        self.optimizerUpdate()


      if not self.MLP.optimizer:
        self.MLP.update()
      else:
        self.MLP.t += 1
        self.MLP.optimizerUpdate()



In [None]:

def calculate_accuracy(dataset_size, img_batch, label_batch, prediction_batch, display_results=True):


  if dataset_size == 1:

    predicted_animal = "dog" if prediction_batch.item() >= 0.5 else "cat"
    animal_label = "dog" if label_batch.item() == 1 else "cat"

    # print(f"prediction = {predicted_animal}")
    # print(f"label      = {animal_label}")

    img_batch = img_batch.cpu()

    plt.imshow(img_batch.squeeze(), cmap='gray')
    plt.title(f'Prediction: {predicted_animal}, Label: {animal_label}')
    plt.show()



  else:

    predictions = ["dog" if pet >= 0.5 else "cat" for pet in prediction_batch[0].tolist()]
    labels = ["dog" if pet_label == 1 else "cat" for pet_label in label_batch.tolist()]
    
    # print(f"predictions = {predictions}")
    # print(f"labels      = {labels}")

    num_correct = torch.sum(torch.abs(prediction_batch - label_batch) <= 0.5)
    print(f"number correct = {num_correct.item()}/{dataset_size}")

    percent_correct = (num_correct/dataset_size)*100
    print(f"accuracy = {percent_correct.item()}%\n\n")

    if display_results:
      for (img, pred, lbl) in zip(img_batch, predictions, labels):
        img = img.cpu()

        plt.imshow(img.squeeze(), cmap='gray') # grey images
        plt.title(f'Prediction: {pred}, Label: {lbl}')

        plt.show(block=False)
        plt.close()







def plotTrainingResults(epoch_plt, loss_plt):

  epoch_plt = torch.tensor(epoch_plt)
  loss_plt = torch.tensor(loss_plt)
  print(f"mean loss = {loss_plt.mean()}")

  plt.figure(1)
  marker_size = 1
  f = plt.scatter(epoch_plt[:], loss_plt[:], s=marker_size)
  plt.xlabel("epoch")
  plt.ylabel("loss")
  plt.title(f"mean loss = {loss_plt.mean()}")
  plt.grid()

  z = np.polyfit(epoch_plt, loss_plt, 5)
  p = np.poly1d(z)
  pylab.plot(epoch_plt, p(epoch_plt), "r--")

  plt.savefig('lossPlot.pdf')

  plt.show()


def fetchMLPParametersFromFile(device_type, directory):

  modelParams = {}

  weight_pattern = "layer_*_weights_*.pth"  # Pattern to match
  weight_files = glob.glob(os.path.join(directory, weight_pattern))
  weight_files.sort()

  bias_pattern = "layer_*_biases_*.pth"  # Pattern to match
  bias_files = glob.glob(os.path.join(directory, bias_pattern))
  bias_files.sort()


  for (w_file, b_file) in zip(weight_files, bias_files):

    weights = torch.load(w_file, map_location=device_type, weights_only=True)
    biases = torch.load(b_file, map_location=device_type, weights_only=True)

    regex_pattern = r"layer_(\d+)_weights_(.*?)\.pth"
    match = re.search(regex_pattern, w_file)

    index = match.group(1)
    activation = match.group(2)

    modelParams.update({f"Layer {index}": [weights, biases, activation, index] })

  return modelParams



def fetchCNNParametersFromFile(device_type, directory):

  modelParams = {}

  kernel_pattern = "cnn_layer_*_kernels_*_*_*.pth"  # Pattern to match
  kernel_files = glob.glob(os.path.join(directory, kernel_pattern))
  kernel_files.sort()

  bias_pattern = "cnn_layer_*_biases_*_*_*.pth"  # Pattern to match
  bias_files = glob.glob(os.path.join(directory, bias_pattern))
  bias_files.sort()


  for (k_file, b_file) in zip(kernel_files, bias_files):

    kernels = torch.load(k_file, map_location=device_type, weights_only=True)
    biases = torch.load(b_file, map_location=device_type, weights_only=True)

    regex_pattern = r"cnn_layer_(\d+)_kernels_(\w+)_([\w]+)_(\d+)\.pth"

    match = re.search(regex_pattern, k_file)

    index = match.group(1)
    activation = match.group(2)
    is_conv = match.group(3)
    stride = match.group(4)

    modelParams.update({f"CNN Layer {index}": [is_conv, kernels, biases, activation, stride, index] })

  return modelParams




def select_random_file(label, transform, use):

  folder_path = f"{use}dataset/{label}sfolder"

  files = [
    f for f in os.listdir(folder_path)
    if os.path.isfile(os.path.join(folder_path, f)) 
    and not f.startswith('.')
    and not f.startswith('._')
  ]

  # Select a random file
  random_file = random.choice(files)
  image_path = os.path.join(folder_path, random_file)

  image = Image.open(image_path)

  image_tensor = transform(image)
  return image_tensor



def genEE364PetImageStack(dataset_size, img_height, img_width, color_channels, use, device_type):

  # Create dataset transformation
  transformations = [
    transforms.ToTensor(),
    transforms.Resize((img_height, img_width)),
    # transforms.Normalize(mean=[0.5], std=[0.5])
  ]

  if color_channels == 1:
    transformations.append(transforms.Grayscale(num_output_channels=1))
    transform = transforms.Compose(transformations)

  # Preallocate tensors for the entire dataset
  data_tensors = torch.zeros((dataset_size, color_channels, img_height, img_width), device=device_type)  # Assuming RGB images
  target_tensors = torch.zeros(dataset_size, device=device_type)

  for n in range(dataset_size):
    # print(f"preparing data.... {((n/dataset_size)*100):.2f}%") if n % 20 == 0 else None
    # Generate random index and select seed

    label_int = random.randint(0, 1)
    label = "dog" if label_int == 1 else "cat"

    image_tensor = select_random_file(label, transform, use)

    # Insert into preallocated tensor
    data_tensors[n] = image_tensor
    target_tensors[n] = label_int

  return data_tensors, target_tensors



In [None]:
def pth_to_pkl(source_folder, target_folder):

    os.makedirs(target_folder, exist_ok=True)

    for filename in os.listdir(source_folder):
        source_path = os.path.join(source_folder, filename)
        
        if os.path.isfile(source_path):

            data = torch.load(source_path)
            
            target_filename = f"{os.path.splitext(filename)[0]}.pkl"
            target_path = os.path.join(target_folder, target_filename)
            
            with open(target_path, 'wb') as f:
                pickle.dump(data, f)
            
            print(f"Converted {filename} to {target_filename}")
            
