In [1]:
import os
import time
import warnings
import numpy as np
import torch
import math
import torch.nn as nn
import torch.nn.functional as F
from typing import List
import argparse
import matplotlib
import matplotlib.pyplot as plt
import datetime
import re
from torch.utils.data import DataLoader, Dataset, Subset, TensorDataset
from torch.utils.data.sampler import Sampler
from torchvision import datasets, transforms

#   Default configuration parameter settings.

In [2]:
parser = argparse.ArgumentParser(
    description='PyTorch Training for Frequency Principle')


parser.add_argument('--lr', default=1e-3, type=float, help='learning rate')
parser.add_argument('--optimizer', default='nesterov',
                    help='optimizer: sgd | adam | nesterov')
parser.add_argument('--epochs', default=100, type=int,
                    metavar='N', help='number of total epochs to run')
parser.add_argument('--training_batch_size',   default=50, type=int,
                    help='the batch size for model (default: 1000)')
parser.add_argument('--training_size',   default=5000, type=int,
                    help='the training size for model (default: 1000)')
parser.add_argument('--test_batch_size',   default=50, type=int,
                    help='the test size for model (default: 10000)')
parser.add_argument('--act_func_name', default='ReLU',
                    help='activation function')
parser.add_argument('--in_channel',   default=3, type=int,
                    help='the input channel for model (default: 3)')
parser.add_argument('--num_classes',   default=10, type=int,
                    help='the output dimension for model (default: 10)')
parser.add_argument('--device',   default='cuda', type=str,
                    help='device used to train (cpu or cuda)')
parser.add_argument('--plot_epoch',   default=1, type=int,
                    help='step size of plotting interval (default: 1000)')
parser.add_argument('--ini_path', type=str,
                    default='')
parser.add_argument('--start_filter',   default=2, type=float,
                    help='the start point of the filter (default: 2)')
parser.add_argument('--end_filter',   default=100, type=float,
                    help='the end point of the filter (default: 100)')
parser.add_argument('--num_filter',   default=20, type=int,
                    help='the point number of the filter (default: 20)')


args, _ = parser.parse_known_args()

#   The storage path of the output file.

In [3]:
def mkdirs(fn):
    """
    Create directories if they don't exist.

    Args:
    fn: The directory path to create.
    """

    if not os.path.isdir(fn):
        os.makedirs(fn)


def create_save_dir(path_ini):
    """
    Create a new directory with the current date and time as its name and return the path of the new directory.

    Args:
    path_ini: The initial path to create the new directory.

    Return:
    The path of the new directory.
    """
    subFolderName = re.sub(r'[^0-9]', '', str(datetime.datetime.now()))
    path = os.path.join(path_ini, subFolderName)
    mkdirs(path)
    mkdirs(os.path.join(path, 'output'))
    return path


args.path = create_save_dir(args.ini_path)
print('save path: %s' % (args.path))

save path: 20251013225708452580


# Generation of dataloader.

A dataloader is a utility in PyTorch that helps to load and preprocess data for machine learning models. It is used to efficiently load large datasets and feed them to the model in batches during training or inference. 

The `load_data` function defines a dataloader for the CIFAR10 dataset. The function loads the CIFAR10 dataset using the `datasets.CIFAR10` class and applies the `transform` object to the data. It creates a `Subset` object from the training dataset, which selects a subset of the data based on the `training_size` argument. 

The `DataLoader` class is then used to create dataloaders for the training and test datasets. The `train_loader` loads the training data in batches of size `training_batch_size`, while the `test_loader` loads the test data in batches of size `test_batch_size`. The `num_workers` argument specifies the number of subprocesses to use for data loading, while the `shuffle` argument specifies whether to shuffle the data before each epoch. 


This will return two dataloaders: `train_loader` and `test_loader`, which can be used to iterate over the training and test data in batches during training or inference.



In [4]:
def load_data(training_size, training_batch_size, test_batch_size):
    transform = transforms.Compose([transforms.ToTensor()])

    train_dataset = datasets.CIFAR10(root="./",
                                    train=True,
                                    transform=transform,
                                    download=True
                                    )

    test_dataset = datasets.CIFAR10(root="./",
                                    train=False,
                                    transform=transform,
                                    download=True)

    train_dataset=Subset(train_dataset,range(training_size))

    train_loader = DataLoader(train_dataset, batch_size=training_batch_size, num_workers=2, shuffle=False, drop_last=True, pin_memory=True)

    test_loader = DataLoader(dataset=test_dataset,
                                 batch_size=test_batch_size,
                                 shuffle=False, num_workers=2,drop_last=True, pin_memory=True)

    return train_loader, test_loader

In [5]:
train_loader, test_loader = load_data(
    args.training_size, args.training_batch_size, args.test_batch_size)

train_loader, test_loader=list(train_loader), list(test_loader)

Files already downloaded and verified
Files already downloaded and verified


# How to create a 1D datalodaer

In [6]:
class DealDataset(Dataset):
    def __init__(self, train_X, train_y):
        self.x_data = train_X
        self.y_data = train_y
        self.len = train_X.shape[0]

    def __getitem__(self, index):
        return self.x_data[index], self.y_data[index]

    def __len__(self):
        return self.len


class get_1D_data:
    def __init__(self, args):
        self.args = args

    def get_target_func(self, x):
        return torch.sin(x)+2*torch.sin(3*x)+3*torch.sin(5*x)
    
    def get_inputs(self):
        args = self.args
        bound=1

        for i in range(2):
            if isinstance(args.data_boundary[i], str):
                args.data_boundary[i]=eval(args.data_boundary[i])

        if args.input_dim == 1:
            test_inputs = torch.reshape(torch.linspace(
                bound*args.data_boundary[0], bound*args.data_boundary[1], args.test_size), [-1, 1])
            train_inputs = torch.reshape(torch.linspace(
                args.data_boundary[0], args.data_boundary[1], args.training_size), [-1, 1])
        else:
            test_inputs = (torch.rand(args.test_size, args.input_dim)
                           )*(bound*args.data_boundary[1]-bound*args.data_boundary[0])+bound*args.data_boundary[0]
            train_inputs = torch.rand(args.training_size, args.input_dim) *(args.data_boundary[1]-args.data_boundary[0])+args.data_boundary[0]
        return test_inputs, train_inputs

    def get_data(self):
        test_inputs, train_inputs = self.get_inputs()
        test_targets, train_targets = self.get_target_func(
            test_inputs), self.get_target_func(train_inputs)
        train_dataset = DealDataset(
            train_inputs, train_targets)
        test_dataset = DealDataset(
            test_inputs, test_targets)
        return train_dataset, test_dataset, test_inputs, train_inputs, test_targets, train_targets


def load_data(training_batch_size, test_batch_size, args):
    Get_data = get_1D_data(args)
    train_dataset, test_dataset, _, _, _, _ = Get_data.get_data()
    train_loader = DataLoader(dataset=train_dataset,
                                batch_size=training_batch_size,
                                shuffle=False,drop_last=True)
    test_loader = DataLoader(dataset=test_dataset,
                                batch_size=test_batch_size,
                                shuffle=False,drop_last=True)
    
    return train_loader, test_loader

# Network model

In [7]:
class My_CNN(nn.Module):
    def __init__(self, in_channels=3, num_classes: int = 10, act_layer: nn.Module = nn.ReLU()):
        super(My_CNN, self).__init__()
        self.num_classes = num_classes
        self.in_channels = in_channels
        conv_layers: List[nn.Module] = []

        conv_layers += [nn.Conv2d(self.in_channels,
                             32, kernel_size=3,stride=1,padding=0), act_layer]
        conv_layers += [nn.Conv2d(32, 64, kernel_size=3,stride=1,padding=0), act_layer]
        conv_layers += [nn.MaxPool2d(kernel_size=(2, 2))]
        self.conv = nn.Sequential(*conv_layers)
        mlp_layers: List[nn.Module] = []
        mlp_layers += [nn.Linear(14*14*64, 400), act_layer]
        mlp_layers += [nn.Linear(400, self.num_classes), nn.Softmax(dim=1)]
        self.mlp = nn.Sequential(*mlp_layers)
        self._initialize_weights()

    def forward(self, x):
        x = self.conv(x)
        x = x.view(x.size(0), -1)
        x = self.mlp(x)
        return x
    
    def _initialize_weights(self) -> None:
        for obj in self.modules():
            if isinstance(obj, (nn.Linear, nn.Conv2d)):
                nn.init.xavier_uniform_(obj.weight.data)


def get_act_func(act_func):
    """
    Get activation function.

    Args:
        act_func (str): activation function name.

    Returns:
        nn.Module: activation function.
    """
    if act_func == 'tanh':
        return nn.Tanh()
    elif act_func == 'ReLU':
        return nn.ReLU()
    elif act_func == 'Sigmoid':
        return nn.Sigmoid()
    else:
        raise NameError('No such act func!')


act_func = get_act_func(args.act_func_name)

model = My_CNN(args.in_channel, args.num_classes, act_func).to(args.device)
print("The network structure:")
print(model)

The network structure:
My_CNN(
  (conv): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1))
    (1): ReLU()
    (2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
    (3): ReLU()
    (4): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  )
  (mlp): Sequential(
    (0): Linear(in_features=12544, out_features=400, bias=True)
    (1): ReLU()
    (2): Linear(in_features=400, out_features=10, bias=True)
    (3): Softmax(dim=1)
  )
)


In [8]:
model

My_CNN(
  (conv): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1))
    (1): ReLU()
    (2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
    (3): ReLU()
    (4): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  )
  (mlp): Sequential(
    (0): Linear(in_features=12544, out_features=400, bias=True)
    (1): ReLU()
    (2): Linear(in_features=400, out_features=10, bias=True)
    (3): Softmax(dim=1)
  )
)

# One-step training function.

The training data set is denoted as  $S=\{(x_i,y_i)\}_{i=1}^n$, where $x_i\in\mathbb{R}^d$ and $y_i\in \mathbb{R}^{d'}$. For simplicity, we assume an unknown function $y$ satisfying $y(x_i)=y_i$ for $i\in[n]$. The empirical risk reads as
\begin{equation*}
    R_S(\theta)=\frac{1}{n}\sum_{i=1}^n\ell(f(x_i,\theta),y(x_i)),
\end{equation*}
where the loss function $\ell(\cdot,\cdot)$ is differentiable and the derivative of $\ell$ with respect to its first argument is denoted by $\nabla\ell(y,y^*)$. 

For a one-step gradient descent, we have, 

\begin{equation*}
    \theta_{t+1}=\theta_t-\eta\nabla R_S(\theta).
\end{equation*}

In [8]:
def train_one_step(model, optimizer, loss_fn,  train_loader, args):
    """
    It takes in a model, optimizer, loss function, resultsaver, train_loader, and args, and returns the
    average loss and accuracy for the training set

    :param model: the model we're training
    :param optimizer: the optimizer for training model
    :param loss_fn: the loss function
    :param train_loader: the training data loader
    :param args: a dictionary containing all the parameters for the training process
    :return: The average loss and the accuracy
    """
    model.train()
    train_loss = 0.0
    correct = 0
    total = 0
    device = args.device

    for inputs, targets in train_loader:
        batch_size = inputs.size(0)
        total += batch_size

        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        inputs, targets = torch.autograd.Variable(
            inputs), torch.autograd.Variable(targets)
        outputs = model(inputs)
        loss = loss_fn(torch.log(outputs), targets)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()*batch_size
        _, predicted = torch.max(outputs.data, 1)
        correct += predicted.eq(targets.data).cpu().sum().item()

    acc = 100*correct/total
    return train_loss/total, acc

One-step test function

In [9]:
def test(model, loss_fn, test_loader,  args):
    """
    It takes a model, a test_loader, a loss function, and some arguments, and returns the average loss
    and accuracy of the model on the test set.

    :param model: the model
    :param loss_fn: the loss function
    :param test_loader: a DataLoader object
    :param args: a dictionary containing all the parameters for the training process
    :return: The loss and accuracy of the model on the test set.
    """
    model.eval()
    train_loss = 0.0
    correct = 0
    total = 0
    device = args.device
    with torch.no_grad():
        for inputs, targets in test_loader:
            batch_size = inputs.size(0)
            total += batch_size
            inputs, targets = inputs.to(device), targets.to(device)
            inputs, targets = torch.autograd.Variable(
                inputs), torch.autograd.Variable(targets)
            outputs = model(inputs)
            loss = loss_fn(torch.log(outputs), targets)
            train_loss += loss.item() * batch_size
            _, predicted = torch.max(outputs.data, 1)
            correct += predicted.eq(targets.data).cpu().sum().item()
        acc = 100 * correct / total
    return train_loss / total, acc

# Function to obtain model output on the full training set

In [10]:
def val(model, training_loader,  args):
    """
    It takes a model, a test_loader, a loss function, and some arguments, and returns the average loss
    and accuracy of the model on the test set.

    :param model: the model
    :param training_loader: a DataLoader object
    :param args: a dictionary containing all the parameters for the training process
    :return: the outputs of the model on the training set
    """
    model.eval()

    y_pred =[]

    device = args.device
    with torch.no_grad():
        for inputs, targets in training_loader:

            inputs, targets = inputs.to(device), targets.to(device)
            inputs, targets = torch.autograd.Variable(
                inputs), torch.autograd.Variable(targets)

            outputs = model(inputs)

            y_pred.append(outputs)

    return torch.cat(y_pred)

# Plot the loss value.

In [11]:
def plot_loss(path, loss_train, x_log=False):
    """
    Plot loss.

    Args:
        path (str): path.
        loss_train (list): list of training loss.
        x_log (bool): whether to use log scale for x-axis.

    Returns: None.
    """
    plt.figure()
    ax = plt.gca()
    y2 = np.asarray(loss_train)
    plt.plot(y2, 'k-', label='Train')
    plt.xlabel('epoch', fontsize=18)
    ax.tick_params(labelsize=18)
    plt.yscale('log')
    if x_log == False:
        fntmp = os.path.join(path, 'loss.jpg')
    else:
        plt.xscale('log')
        fntmp = os.path.join(path, 'loss_log.jpg')
    plt.tight_layout()
    plt.savefig(fntmp, dpi=300)

    plt.close()

# Filtering
For the original dataset $\left\{\left(x_i, y_i\right)\right\}_{i=0}^{n-1}$, such as MNIST or CIFAR10. $x_i$ is an image vector, $y_i$ is a one-hot vector. The low frequency part can be derived by
$$
y_i^{\mathrm{low}, \delta}=\frac{1}{C_i} \sum_{j=0}^{n-1} y_j G^\delta\left(x_i-x_j\right),
$$
where $C_i=\sum_{j=0}^{n-1} G^\delta\left(x_i-x_j\right)$ is a normalization factor and
$$
G^\delta\left(x_i-x_j\right)=\exp \left(-\left|x_i-x_j\right|^2 /(2 \delta)\right).
$$
The high frequency part can be derived by $\boldsymbol{y}_i^{\text {high, } \delta} \triangleq \boldsymbol{y}_i-\boldsymbol{y}_i^{\text {low }, \delta}$. We also compute $\boldsymbol{h}_i^{\text {low, } \delta}$ and $\boldsymbol{h}_i^{\text {high, } \delta}$ for each DNN output $\boldsymbol{h}_i$.
Then, we can examine
$$
\begin{aligned}
& e_{\text {low }}=\left(\frac{\sum_i\left|\boldsymbol{y}_i^{\text {low }, \delta}-\boldsymbol{h}_i^{\text {low }, \delta}\right|^2}{\sum_i\left|\boldsymbol{y}_i^{\text {low }, \delta}\right|^2}\right)^{\frac{1}{2}}, \\
& e_{\text {high }}=\left(\frac{\sum_i\left|\boldsymbol{y}_i^{\text {high }, \delta}-\boldsymbol{h}_i^{\text {high }, \delta}\right|^2}{\sum_i\left|y_i^{\text {high }, \delta}\right|^2}\right)^{\frac{1}{2}}.
\end{aligned}
$$

In [12]:
def compute_distance_for_training_loader(training_loader):
    """
    Computes the pairwise Euclidean distance between all pairs of flattened images in a training loader.

    Args:
        training_loader (DataLoader): A DataLoader object containing the training data.

    Returns:
        np.ndarray: A 2D NumPy array of shape [N, N], where N is the number of images in the training set.
    """

    data_list = [data for data, _ in training_loader]

    # Concatenate the list of data into a single tensor
    data = torch.cat(data_list)
    # Reshape data into [B, C*H*W]
    flattened_images = data.view(data.shape[0], -1).numpy()

    dist = -2 * np.dot(flattened_images, flattened_images.T) + np.sum(flattened_images**2, axis=1) + \
        np.sum(flattened_images**2, axis=1)[:, np.newaxis]
    return dist


def normal_kernel(dist, filter_dict):

    """
    Computes the normalized Gaussian kernel for each filter in the filter dictionary.

    Args:
        dist (np.ndarray): A 2D NumPy array of pairwise distances between data points.
        filter_dict (list): A list of filter values to use for each kernel.

    Returns:
        list: A list of 2D NumPy arrays, where each array is a normalized Gaussian kernel for a filter in the filter dictionary.
    """

    kernel_dict = []
    for filter in filter_dict:
        kernel = np.exp(-dist / 2 / filter)
        mean = np.sum(kernel, axis=1, keepdims=True)
        kernel_dict.append(kernel/mean)
    return kernel_dict


def gauss_filiter(f_orig, kernel):

    """
    Applies a Gaussian filter to an image output.

    Args:
        f_orig (np.ndarray): A 2D NumPy array representing the model output.
        kernel (np.ndarray): A 2D NumPy array representing the Gaussian kernel.

    Returns:
        np.ndarray: A 2D NumPy array representing the filtered output.
    """

    return np.matmul(kernel, f_orig)


def get_freq_low_high(yy, kernel_dict):

    """
    Computes the low and high frequency components of the model output using a set of Gaussian filters.

    Args:
        yy (np.ndarray): NumPy array representing the model output.
        kernel_dict (list): A list of 2D NumPy arrays representing the Gaussian kernels to use for filtering.

    Returns:
        tuple: A tuple of two lists, where the first list contains the low frequency components of
                the model output and the second list contains the high frequency components of the model output.
    """
    f_low = []
    f_high = []
    for filter in range(len(kernel_dict)):
        kernel = kernel_dict[filter]
        f_new_norm = gauss_filiter(yy, kernel)
        f_low.append(f_new_norm)
        f_high_tmp = yy - f_new_norm
        f_high.append(f_high_tmp)

    return f_low, f_high 


def get_target_freq_distr(train_labels, dist, filter_start, filter_end, filter_num):
    """
    Computes the target frequency distribution of a set of training labels using a set of Gaussian filters.

    Args:
        train_labels (np.ndarray): NumPy array representing the training labels.
        dist (np.ndarray): A 2D NumPy array of pairwise Euclidean distance between all pairs of flattened images in a training loader.
        filter_start (float): The starting value of the filter range.
        filter_end (float): The ending value of the filter range.
        filter_num (int): The number of filters to use in the filter range.

    Returns:
        tuple: A tuple of four elements, where the first element is a 1D NumPy array of filter values, 
                the second element is a list of 2D NumPy arrays representing the Gaussian kernels used for filtering, 
                the third element is a list of 1D NumPy arrays representing the low frequency components of the training labels, 
                and the fourth element is a list of 1D NumPy arrays representing the high frequency components of the training labels.
    """

    filter_dict = np.linspace(
        filter_start, filter_end, num=filter_num)

    kernel_dict = normal_kernel(dist, filter_dict)
    f_low, f_high = get_freq_low_high(train_labels, kernel_dict)

    return filter_dict, kernel_dict, f_low, f_high


# Examination. 

Plot the relative error $e_{\text {low }}$ and $e_{\text {high }}$ at each training epoch.

In [13]:
def plot_diff_distr(filter, lowdiff, highdiff, args):
    """
    Plots the difference between the low and high frequency components of the predicted labels and the targets.

    Args:
        filter (float): The filter value used for filtering.
        lowdiff (list): A list of relative distances between the low frequency components of the predicted labels and targets.
        highdiff (list): A list of relative distances between the high frequency components of the predicted labels and targets.
        args (argparse.Namespace): An argparse Namespace object containing the path to save the plot.

    Returns:
        None
    """
    plt.figure(figsize=(10, 4))
    plt.title('freq with filter {:.02f}'.format(filter))
    plt.subplot(121)
    plt.plot(lowdiff, 'r-', label='low_{:.02f}'.format(filter))
    plt.plot(highdiff, 'b-', label='high_{:0.2f}'.format(filter))
    plt.legend()
    plt.xlabel('epoch')
    plt.ylabel('freq')
    plt.subplot(122)
    tmp = np.stack([lowdiff, highdiff])
    plt.pcolor(tmp, cmap='RdBu', vmin=0.1, vmax=1)
    plt.colorbar()
    plt.yticks([0.6, 1.6], ('low freq', 'high freq'), rotation='vertical')
    plt.xlabel('epoch')
    plt.savefig(args.path + '/hot_{:0.2f}.png'.format(filter))
    plt.close()

# Compute the high and low frequency distribution of targets.

In [14]:
dist = compute_distance_for_training_loader(train_loader)

target_list = [target for data, target in train_loader]

targets = torch.cat(target_list)

train_labels = F.one_hot(targets, num_classes=10).detach().cpu().numpy()

filter_dict, kernel_dict, f_low, f_high=get_target_freq_distr(train_labels, dist, args.start_filter, args.end_filter, args.num_filter)


: 

# Main

In [None]:

loss_fn = nn.NLLLoss()

if args.optimizer == 'sgd':
    optimizer = torch.optim.SGD(model.parameters(), lr=args.lr)
elif args.optimizer == 'nesterov':
    optimizer = torch.optim.SGD(
        model.parameters(), lr=args.lr, momentum=0.9, nesterov=True)
else:
    optimizer = torch.optim.Adam(model.parameters(), lr=args.lr)

lowdiff = [[] for _ in range(len(filter_dict))]
highdiff = [[] for _ in range(len(filter_dict))]

t0 = time.time()

args.loss_training_lst = []

for epoch in range(args.epochs+1):
    model.train()
    loss, acc = train_one_step(
        model, optimizer, loss_fn, train_loader, args)
    loss_test, acc_test = test(
        model, loss_fn, test_loader, args)
    y_pred=val(model, train_loader, args)
    f_train_low, f_train_high=get_freq_low_high(y_pred.detach().cpu().numpy(), kernel_dict)

    for i in range(len(filter_dict)):
        lowdiff[i].append(np.linalg.norm(
            f_train_low[i] - f_low[i])/np.linalg.norm(f_low[i]))
        highdiff[i].append(np.linalg.norm(
            f_train_high[i] - f_high[i])/np.linalg.norm(f_high[i]))

    args.loss_training_lst.append(loss)

    if epoch % args.plot_epoch == 0:
          print("[%d] loss: %.6f, acc: %.2f, val loss: %.6f, val acc: %.2f, time: %.2f s" %
                (epoch + 1, loss, acc, loss_test, acc_test, (time.time()-t0)))

    if (epoch+1) % (args.plot_epoch) == 0:
          plot_loss(path=args.path,
                    loss_train=args.loss_training_lst, x_log=True)
          plot_loss(path=args.path,
                    loss_train=args.loss_training_lst, x_log=False)

: 

# Plot the frequency difference distribution between the predicted labels and the targets

In [None]:
for filter_index, filter in enumerate(filter_dict):
    lowdiff_ind, highdiff_ind = lowdiff[filter_index], highdiff[filter_index]
    plot_diff_distr(filter, lowdiff_ind, highdiff_ind, args)

: 