# Fourier Transform using neural networks

-   First we generated a dataset of 2000 input functions of various types (2nd cell)
-   $80:20$ split of train-test data
-   Every function is defined on the interval $(-1, 1)$. Number of points chosen $N = 512$
-   The following neural networks are chosen
    -   Layers: 3, Activation: RELU
    -   Layers: 4, Activation: RELU
    -   Layers: 3, Activation: Tanh
    -   Layers: 4, Activation: Tanh
    -   Layers: 3, Activation: Sigmoid
    -   Layers: 4, Activation: Sigmoid
-   Training and Test Losses are calculated in the end

In [26]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.special import legendre, chebyt, jv, hermite, eval_laguerre
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import sys
import time

In [3]:
function_types = [
    "fourier",          # sum of sinusoids
    "poly",             # standard polynomial
    "gaussian",         # single Gaussian
    "gaussian_mixture", # sum of Gaussians
    "damped_sine",      # exponentially damped sinusoid
    "exp_decay",        # exponential decay
    "piecewise",        # piecewise linear/quadratic
    "trig_combo",       # combination of sin and cos
    "legendre",         # Legendre polynomials
    "chebyshev",        # Chebyshev polynomials (1st kind)
    "bessel",           # Bessel function of first kind
    "hermite",          # Hermite polynomials
    "laguerre",         # Laguerre polynomials
    "windowed_sine",    # sinusoid multiplied by Gaussian
    "rect_pulse",       # rectangular pulse
    "sawtooth",         # sawtooth wave
    "triangle",         # triangle wave
    "modulated",        # product of sinusoids (beats)
    "chirp",            # frequency-increasing sinusoid
    "spikes",           # sparse impulses
    "wavelet"           # Mexican hat wavelet
]

In [4]:
def make_function(x, kind="random", max_freq=10):
    """
    Generate a diverse set of functions for Fourier testing.

    Parameters:
        x        : array of input points
        kind     : type of function to generate; if 'random', one is picked randomly
        max_freq : maximum frequency for Fourier-type functions

    Returns:
        f        : array of function values
    """
    
    if kind == "random":
        kind = np.random.choice(function_types)
    
    # ----------------- Standard types -----------------
    if kind == "fourier":
        coeffs = np.random.randn(max_freq)
        f = np.zeros_like(x, dtype=float)
        for n, a in enumerate(coeffs, start=1):
            f += a * np.sin(np.pi * n * x)
        return f

    elif kind == "poly":
        coeffs = np.random.randn(5)
        return sum(c * x**i for i, c in enumerate(coeffs))

    elif kind == "gaussian":
        mu, sigma = np.random.uniform(-0.5, 0.5), np.random.uniform(0.05, 0.5)
        return np.exp(-((x - mu) ** 2) / (2 * sigma ** 2))

    elif kind == "gaussian_mixture":
        num_gaussians = np.random.randint(2, 4)
        f = np.zeros_like(x)
        for _ in range(num_gaussians):
            mu, sigma, amp = np.random.uniform(-0.5,0.5), np.random.uniform(0.05,0.3), np.random.uniform(0.5,2.0)
            f += amp * np.exp(-((x - mu)**2) / (2 * sigma**2))
        return f

    elif kind == "damped_sine":
        freq = np.random.uniform(1, max_freq)
        decay = np.random.uniform(0.5, 2.0)
        phase = np.random.uniform(0, 2*np.pi)
        return np.exp(-decay * np.abs(x)) * np.sin(2 * np.pi * freq * x + phase)

    elif kind == "exp_decay":
        lam = np.random.uniform(0.5, 2.0)
        return np.exp(-lam * np.abs(x))

    elif kind == "piecewise":
        split = np.random.uniform(x[0], x[-1])
        return np.piecewise(x, [x < split, x >= split],
                            [lambda t: t**2, lambda t: -t + split])

    elif kind == "trig_combo":
        f = np.zeros_like(x)
        num_terms = np.random.randint(2, 5)
        for _ in range(num_terms):
            amp = np.random.uniform(0.5, 2.0)
            freq = np.random.randint(1, max_freq)
            phase = np.random.uniform(0, 2*np.pi)
            f += amp * (np.sin(2*np.pi*freq*x + phase) + np.cos(2*np.pi*freq*x + phase))
        return f

    # ----------------- Special polynomials -----------------
    elif kind == "legendre":
        deg = np.random.randint(1, 6)
        P = legendre(deg)
        return P(x)

    elif kind == "chebyshev":
        deg = np.random.randint(1, 6)
        T = chebyt(deg)
        return T(x)

    elif kind == "bessel":
        order = np.random.randint(0, 6)
        k = np.random.uniform(1, 10)
        return jv(order, k * x)

    elif kind == "hermite":
        deg = np.random.randint(1,5)
        H = hermite(deg)
        return H(x)

    elif kind == "laguerre":
        deg = np.random.randint(1,5)
        return eval_laguerre(deg, np.abs(x))  # Laguerre defined on [0,∞)

    # ----------------- Windowed / localized functions -----------------
    elif kind == "windowed_sine":
        freq = np.random.uniform(1, max_freq)
        alpha = np.random.uniform(1,5)
        return np.sin(2*np.pi*freq*x) * np.exp(-alpha*x**2)

    elif kind == "rect_pulse":
        start, end = np.random.uniform(-0.5, 0), np.random.uniform(0,0.5)
        return np.where((x>=start) & (x<=end), 1.0, 0.0)

    elif kind == "sawtooth":
        return 2*(x - np.floor(x + 0.5))  # normalized sawtooth

    elif kind == "triangle":
        return 2*np.abs(2*(x - np.floor(x + 0.5))) - 1

    elif kind == "modulated":
        f1 = np.sin(5*np.pi*x)
        f2 = np.cos(2*np.pi*x)
        return f1*f2

    elif kind == "chirp":
        return np.sin(2*np.pi*(x + x**2))

    elif kind == "spikes":
        f = np.zeros_like(x)
        num_spikes = np.random.randint(3,8)
        indices = np.random.choice(len(x), num_spikes, replace=False)
        f[indices] = np.random.uniform(1,3, size=num_spikes)
        return f

    elif kind == "wavelet":
        return (1 - x**2) * np.exp(-x**2 / 2)  # Mexican hat

    else:
        raise ValueError(f"Unknown function type '{kind}'")

In [5]:
def load_data(function_name, N):
    # Load CSV (skip header)
    table = np.loadtxt(function_name, delimiter=",", skiprows=2)
    
    num_samples = 2500
    
    # Split columns
    f_flat = table[:,0]
    Re_flat = table[:,1]
    Im_flat = table[:,2]
    
    # Reshape into original shapes
    f_data = f_flat.reshape(num_samples, N)
    F_data = np.stack([Re_flat.reshape(num_samples, N),
                       Im_flat.reshape(num_samples, N)], axis=-1)  # shape (num_samples, N, 2)
    
    return f_data, F_data

In [15]:
def data_loaders(f_data, F_data):
    train_frac = 0.8
    split_idx = int(train_frac * len(f_data))
    
    f_train, f_test = f_data[:split_idx], f_data[split_idx:]
    F_train, F_test = F_data[:split_idx], F_data[split_idx:]
    
    # Create DataLoaders
    batch_size = 32
    train_ds = TensorDataset(torch.tensor(f_train, dtype = torch.float32), torch.tensor(F_train, dtype = torch.float32))
    test_ds = TensorDataset(torch.tensor(f_test, dtype = torch.float32), torch.tensor(F_test, dtype = torch.float32))
    train_loader = DataLoader(train_ds, batch_size = batch_size, shuffle = True)
    test_loader = DataLoader(test_ds, batch_size = batch_size)

    return train_ds, test_ds, train_loader, test_loader

In [16]:
class FourierNet(nn.Module):
    def __init__(self, N, layers=3, activation='ReLU', prelu_alpha=0.01):
        """
        N           : input/output size
        layers      : number of hidden layers
        activation  : 'ReLU', 'LeakyReLU', 'PReLU', 'Sigmoid', 'Tanh'
        prelu_alpha : negative slope for PReLU (if used)
        """
        super().__init__()
        self.N = N
        self.layers = layers
        self.activation = activation
        self.prelu_alpha = prelu_alpha

        self.model = self._build_model()
        self.name = f"FourierNet(Layers: {self.layers}, Activation: {self.activation})"

    def _build_model(self):
        layers = []
        in_features = self.N

        # First layer
        layers.append(nn.Linear(in_features, in_features*2))

        for _ in range(self.layers - 1):
            # Add activation
            if self.activation == 'ReLU':
                layers.append(nn.ReLU())
            elif self.activation == 'Tanh':
                layers.append(nn.Tanh())
            elif self.activation == 'Sigmoid':
                layers.append(nn.Sigmoid())
            elif self.activation == 'LeakyReLU':
                layers.append(nn.LeakyReLU(negative_slope=0.01))
            elif self.activation == 'PReLU':
                layers.append(nn.PReLU(init=self.prelu_alpha))
            else:
                raise ValueError(f"Unknown activation: {self.activation}")

            # Add linear layer
            layers.append(nn.Linear(in_features*2, in_features*2))
            # Note: we keep the layer size constant (in_features*2)

        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.model(x)
        return out.view(-1, self.N, 2)  # reshape to (batch, N, 2)


In [42]:
# Parameters to sweep
Ns = [256, 512, 1024]
layers_list = [2, 3, 4, 5]
activations = ['ReLU', 'LeakyReLU', 'PReLU', 'Sigmoid', 'Tanh']

# Initialize dictionaries
models = {}
metrics = {}

# Loop through all combinations
for N in Ns:
    for num_layers in layers_list:
        for act in activations:
            # Create a descriptive name
            model_name = f"N{N}_L{num_layers}_{act}"
            
            # Instantiate the FourierNet
            model = FourierNet(N=N, layers=num_layers, activation=act)
            
            # Store in dictionaries
            models[model_name] = model
            metrics[model_name] = {"Train Loss": None, "Test Loss": None, "Training Time": None, "Epochs": None}

# Check number of models
print(f"Total models created: {len(models)}")


Total models created: 60


In [41]:
def load_csv_dataset(filename, N):
    """
    Reads a CSV file saved in the format:
    First row: header containing N and domain (ignored here, but could parse)
    Remaining rows: columns: f(x), Re, Im
    Returns:
        f_data: shape (num_samples, N)
        F_data: shape (num_samples, N, 2)
    """
    # Load table skipping first row (header)
    table = np.loadtxt(filename, delimiter=",", skiprows=2)
    
    f_flat = table[:,0]
    Re_flat = table[:,1]
    Im_flat = table[:,2]

    num_samples = 2500
    f_data = f_flat.reshape(num_samples, N)
    F_data = np.stack([Re_flat.reshape(num_samples, N),
                       Im_flat.reshape(num_samples, N)], axis=-1)
    
    return f_data, F_data

# --- Load all datasets ---
f_data_256, F_data_256 = load_csv_dataset("datasets/discrete_256.csv", 256)
f_data_512, F_data_512 = load_csv_dataset("datasets/discrete_512.csv", 512)
f_data_1024, F_data_1024 = load_csv_dataset("datasets/discrete_1024.csv", 1024)

In [50]:
# EarlyStopping helper
class EarlyStopping:
    def __init__(self, patience=5, min_delta=1e-4):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = float("inf")
        self.counter = 0
        self.should_stop = False

    def __call__(self, val_loss):
        if val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.should_stop = True

# Device
device = "cuda" if torch.cuda.is_available() else "cpu"

# Example dictionaries containing datasets
datasets = {
    256: (f_data_256, F_data_256),
    512: (f_data_512, F_data_512),
    1024: (f_data_1024, F_data_1024)
}

patience_vals = {
    256: 5,
    512: 10,
    1024: 20
}

# Hyperparameters
batch_size = 32
epochs = 30
learning_rate = 1e-3
i = 0

best_model_name = None
lowest_test_loss = float('inf')
best_model_state = None
start_time = time.time()
model_num = -1

for N in [256, 512, 1024]:
    f_data, F_data = datasets[N]
    epochs += 10 * i
    train_ds, test_ds, train_loader, test_loader = data_loaders(f_data, F_data)
    j = 0
    
    for layers in [2, 3, 4, 5]:
        epochs += 10 * j
        for act in ['ReLU', 'LeakyReLU', 'PReLU', 'Sigmoid', 'Tanh']:
            model_num += 1
            model_name = f"N{N}_L{layers}_{act}"
            model = models[model_name].to(device)
            optimizer = optim.Adam(model.parameters(), lr=learning_rate)
            criterion = nn.MSELoss()
            
            early_stopper = EarlyStopping(patience=patience_vals[N], min_delta=1e-4)
            model_start_time = time.time()
            epoch = 0
            for epoch in range(epochs):
                model.train()
                total_loss = 0
                for xb, yb in train_loader:
                    xb, yb = xb.to(device), yb.to(device)
                    optimizer.zero_grad()
                    pred = model(xb)
                    loss = criterion(pred, yb)
                    loss.backward()
                    optimizer.step()
                    total_loss += loss.item() * xb.size(0)
                avg_loss = total_loss / len(train_loader.dataset)
                
                # Compute test/validation loss
                model.eval()
                total_test_loss = 0
                with torch.no_grad():
                    for xb, yb in test_loader:
                        xb, yb = xb.to(device), yb.to(device)
                        pred = model(xb)
                        loss = criterion(pred, yb)
                        total_test_loss += loss.item() * xb.size(0)
                test_loss = total_test_loss / len(test_loader.dataset)
                
                # Early stopping check
                early_stopper(test_loss)
                if early_stopper.should_stop:
                    print(f"\nEarly stopping triggered at epoch {epoch+1} for {model_name}")
                    break
                
                elapsed = time.time() - start_time
                mins, secs = divmod(int(elapsed), 60)
                print(f"\rModel {model_num}, Epoch {epoch+1}/{epochs}, "
                      f"Train Loss={avg_loss:.6f}, Test Loss={test_loss:.6f}, "
                      f"Elapsed: {mins}m {secs}s", end='', flush=True)
            
            # Record training time
            train_time = time.time() - model_start_time
            mins, secs = divmod(int(train_time), 60)
            print()
            
            # Save metrics
            metrics[model_name]["Train Loss"] = avg_loss
            metrics[model_name]["Test Loss"] = test_loss
            metrics[model_name]["Training Time"] = train_time
            metrics[model_name]["Epochs"] = epoch + 1
            
            # Update best model
            if test_loss < lowest_test_loss:
                lowest_test_loss = test_loss
                best_model_name = model_name
                best_model_state = model.state_dict()
            
            print(f"Finished training {model_name}, Test Loss={test_loss:.6f}, Training Time = {mins}m {secs}s, Epochs = {epoch + 1}\n")
        j += 1
    i += 1

print(f"Best model: {best_model_name}, Test Loss = {lowest_test_loss:.6f}")


Model 0, Epoch 5/30, Train Loss=0.003776, Test Loss=0.003714, Elapsed: 0m 1s
Early stopping triggered at epoch 6 for N256_L2_ReLU

Finished training N256_L2_ReLU, Test Loss=0.001295, Training Time = 0m 1s, Epochs = 6

Model 1, Epoch 15/30, Train Loss=0.000109, Test Loss=0.000381, Elapsed: 0m 4s
Early stopping triggered at epoch 16 for N256_L2_LeakyReLU

Finished training N256_L2_LeakyReLU, Test Loss=0.000312, Training Time = 0m 3s, Epochs = 16

Model 2, Epoch 9/30, Train Loss=0.000105, Test Loss=0.000251, Elapsed: 0m 7s
Early stopping triggered at epoch 10 for N256_L2_PReLU

Finished training N256_L2_PReLU, Test Loss=0.000253, Training Time = 0m 2s, Epochs = 10

Model 3, Epoch 5/30, Train Loss=0.000100, Test Loss=0.000155, Elapsed: 0m 8s
Early stopping triggered at epoch 6 for N256_L2_Sigmoid

Finished training N256_L2_Sigmoid, Test Loss=0.000186, Training Time = 0m 1s, Epochs = 6

Model 4, Epoch 9/30, Train Loss=0.000072, Test Loss=0.000148, Elapsed: 0m 10s
Early stopping triggered at

In [51]:
for name, model in models.items():
    torch.save(model.state_dict(), f"{name}.pth")

import json

with open("metrics.json", "w") as f:
    json.dump(metrics, f, indent=4)