# Improve on the deep learning appraoch

## Load model and dataset

In [1]:
import pandas as pd
import math
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
from scipy.integrate import solve_ivp

In [2]:
## Define the Lorenz system
sigma = 10
beta = 8/3
r = 10

def lorenz(t, X, sigma, beta, r):
    """The Lorenz equations."""
    x, y, z = X
    xp = sigma*(y - x)
    yp = r*x - y - x*z
    zp = -beta*z + x*y
    return xp, yp, zp

def lorenz_data(u):
    x, y, z = u[0], u[1], u[2]
    return [sigma*(y - x), r*x - y - x*z, -beta*z + x*y]

In [3]:
# Check if the trajectory is attracted to the concerned Lorenz attractor
def is_attracted(x, y, z):
    return (abs(x-math.sqrt(24))<0.01) and (abs(y-math.sqrt(24))<0.01) and (abs(z-9)<0.01)

In [4]:
## Implement the simulation process and decide if the trajectory is attracted by the Lorenz attractor
def simulation(x0 ,y0 ,z0):
    tmax, n = 1500, 100000
    soln = solve_ivp(lorenz, (0, tmax), (x0, y0, z0), args=(sigma, beta, r),dense_output=True)
    t = np.linspace(0, tmax, n)
    x, y, z = soln.sol(t)
    return is_attracted(x[n-1], y[n-1], z[n-1])

In [5]:
## Define the neural network
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(3, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.sigmoid(self.fc3(x))
        return x
    
net = Net()

In [6]:
## Load the dataset
class LorenzDataset(Dataset):
    def __init__(self, csv_file):
        self.data = pd.read_csv(csv_file)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        features = torch.tensor([self.data.iloc[idx].x0, self.data.iloc[idx].y0, self.data.iloc[idx].z0]).float()
        label = torch.tensor(self.data.iloc[idx].attracted).float()
        return features, label
    
dataset = LorenzDataset('dataset_large.csv')

dataset_train, dataset_test = torch.utils.data.random_split(dataset, [80000, 20000])

dataloader_train = DataLoader(dataset_train, batch_size=32, shuffle=True)
dataloader_test = DataLoader(dataset_test, batch_size=20, shuffle=True)

# New data generation and additional penalty

We choose an adaptive approach to enhance our model with new data: After a training phase, we consider the datapoints used as an estimation of the decision boundary, which are the points with probability predicted close to 0.5. We create new datapoints by adding small perturbations to such points with the following scheme:
1. For such a datapoint $x=(x_{0},y_{0},z_{0})$, we create $y=(x_{0}+\delta_{0},y_{0}+\delta_{1},z_{0}+\delta_{2})$, where $\delta_{i} \sim \mathbb{N}(0, \sigma^{2})$ is a Gaussian noise, and the variance $\sigma^{2}$ is positively related with the probability of $x$.
2. We add the created datapoints to the dataset and train again.

Furthermore, we expect that at the basin boundary, the direction of the dynamical system should be perpendicular to the gradient vector of the level set $f(x)=0$, we add an penalty for violation.

Let the direction of the dynamical system at point $\mathbf{x}$ be $b(\mathbf{x})$, then we expect that $p = \nabla f(\mathbf{x})^{T}b(\mathbf{x})=0$. We add penalty for violation:

1. $p$ (linear penalty)
2. $e^{p}$ (exponential penalty)
3. $\tan{\frac{\pi}{2}p}$ (tangent penalty)

Remark: When we compute $p$, both vectors are normalized as we are interested with the angular violation. If we do not normalize both vectors, $p$ may be large even if two vectors are almost perpendicular, as long as one of them has a large norm.

Furthermore, if a point is far away from the estimated boundary(probability far away from 0.5), then it should not contribute to penalty as the normality condition is mainly for points close to the boundary. Therefore, the penalty term is formulated as follows:
$$\sum_{\mathbf{x}}l(\mathbf{x})+c(\mathbf{x})m(p)$$

where $l(\mathbf{x})$ is the logistic loss, $m(p)=p,e^{p},\tan{\frac{\pi}{2}p}$, and $c(\mathbf{x})$ is an indicator function to only filter out those points that the predicted values are within $[0.5-\delta, 0.5+\delta]$, where $\delta$ is a hyperparameter.

In [7]:
def data_perturbation(data, probability):
    diff = abs(probability - 0.5)
    k = 2 * diff ## factor 2 is a hyperparameter
    noise = torch.randn_like(data) * torch.sqrt(torch.tensor(k))
    perturbed_data = data + noise
    return perturbed_data

In [8]:
class CustomLossLinear(nn.Module):
    def __init__(self):
        super(CustomLossLinear, self).__init__()
        self.bce_loss = nn.BCELoss()

    def forward(self, net, x, y, v):
        x.requires_grad_(True)  # Ensure x requires gradients

        # Compute the output and the BCE loss
        out = net(x).squeeze()
        bce_loss = self.bce_loss(out, y)

        # Zero the gradients of x
        if x.grad is not None:
            x.grad.zero_()

        # Compute the gradient of the output with respect to the input
        out.backward(torch.ones_like(out), retain_graph=True)
        grad = x.grad
        # Normalize the gradient and v
        grad_norm = grad / grad.norm()
        v = v / v.norm()

        # Compute the inner product of the normalized gradient and v
        inner_product = torch.abs(torch.dot(grad_norm.view(-1), v.view(-1)))

        # The final loss is the sum of the BCE loss and the inner product
        factor = 0
        if out < 0.51 and out > 0.49:
            factor = 1
        loss = bce_loss + factor * inner_product

        return loss
    
class CustomLossExponential(nn.Module):
    def __init__(self):
        super(CustomLossExponential, self).__init__()
        self.bce_loss = nn.BCELoss()

    def forward(self, net, x, y, v):
        # Compute the output and the BCE loss
        out = net(x).squeeze()
        bce_loss = self.bce_loss(out, y)

        # Compute the gradient of the output with respect to the input
        out.backward(torch.ones_like(out), retain_graph=True)
        grad = x.grad
        # Normalize the gradient
        grad_norm = grad / grad.norm()
        v = v / v.norm()

        # Compute the inner product of the normalized gradient and v
        inner_product = torch.abs(torch.dot(grad_norm.view(-1), v.view(-1)))

        # The final loss is the sum of the BCE loss and the inner product
        factor = 0
        if out < 0.51 and out > 0.49:
            factor = 1
        loss = bce_loss + factor * math.exp(inner_product)


        return loss
    
class CustomLossTangent(nn.Module):
    def __init__(self):
        super(CustomLossTangent, self).__init__()
        self.bce_loss = nn.BCELoss()

    def forward(self, net, x, y, v):
        # Compute the output and the BCE loss
        out = net(x).squeeze()
        bce_loss = self.bce_loss(out, y)

        # Compute the gradient of the output with respect to the input
        out.backward(torch.ones_like(out), retain_graph=True)
        grad = x.grad
        # Normalize the gradient
        grad_norm = grad / grad.norm()
        v = v / v.norm()

        # Compute the inner product of the normalized gradient and v
        inner_product = torch.abs(torch.dot(grad_norm.view(-1), v.view(-1)))

        # The final loss is the sum of the BCE loss and the inner product
        factor = 0
        if out < 0.51 and out > 0.49:
            factor = 1
        loss = bce_loss + factor * math.tan(inner_product* math.pi / 2)

        return loss

We start to train the model in the following scheme: For each epoch, we first iterate over all data in the current dataset to optimize the model. After each epoch is done, each point in the training set should receive a probability between 0 and 1, and we take out those points with probability in the range $[0.25, 0.75]$, and apply the perturbation method to generate new datapoints and add to the dataset, and we start the next epoch. We repeat the training process with each of the loss.

In [9]:
criterion_linear = CustomLossLinear()
criterion_exponential = CustomLossExponential()
criterion_tangent = CustomLossTangent()

In [17]:
# Initialize the loss and the optimizer
loss_fn = criterion_linear
optimizer = optim.Adam(net.parameters(), lr=0.001)  

# For each epoch
for epoch in range(10): 
    running_loss = 0.0
    for x, y in dataloader_train: 
        # Compute v by running the vector over lorenz
        v = lorenz_data(x)  # replace lorenz with your lorenz function

        # Convert v to a tensor if it's not already one
        if not isinstance(v, torch.Tensor):
            v = [item for sublist in v for item in sublist]  # Flatten the list
            v = [float(i) for i in v]
            v = torch.tensor(v)

        # Zero the gradients
        net.zero_grad()
        
        # Compute the loss
        y = torch.tensor([k==1 for k in y], dtype=torch.float32)
        loss = loss_fn(net, x, y, v)

        # Backpropagate the loss
        loss.backward()

        # Update the weights
        optimizer.step()

        # Add the loss of this batch to the running loss
        running_loss += loss.item()

    # Print average loss per epoch
    print(f"Epoch {epoch+1}, Loss: {running_loss/len(dataloader_train)}")

    # After each epoch, augment the dataset
    with torch.no_grad():
        # Get the predictions for the entire dataset
        predictions = net(dataset_train)  # replace x_all with your entire dataset

        # Find the indices of the data points with predicted probability within the range [0.25, 0.75]
        indices = ((predictions >= 0.25) & (predictions <= 0.75)).nonzero(as_tuple=True)[0]

        # Take out these data points
        x_to_augment = dataset_train[indices]

        # Apply data_perturbation to each of them to generate a new set of data points
        x_augmented = data_perturbation(x_to_augment)  # replace data_perturbation with your data perturbation function
        # Apply simulation to each point in x_augmented
        simulation_results = [simulation(x[0], x[1], x[2]) for x in x_augmented]

        # Convert the boolean results to integers (1 for True, -1 for False)
        y_augmented = [int(result)*2 - 1 for result in simulation_results]

        # Convert the list to a PyTorch tensor
        y_augmented = torch.tensor(y_augmented)


        # Add these data points to the original training set to form an enlarged training set
        x_all = torch.cat([x_all, x_augmented])
        y_all = torch.cat([y_all, y_augmented])

        # Create a new DataLoader for the enlarged training set
        dataset = TensorDataset(x_all, y_all)
        dataloader_train = DataLoader(dataset, batch_size=32)  # replace 32 with your batch size

RuntimeError: inconsistent tensor size, expected tensor [96] and src [9] to have the same number of elements, but got 96 and 9 elements respectively