In [None]:
from functools import partial
import math
import urllib
from pathlib import Path
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import torch
import tqdm
import torchvision
from torchvision import datasets, transforms

# Useful functions
- load()
- distance_from_rotor(X,Y)

In [None]:
def load():
    
    """ Load the trajectories X_1.csv, Y_1.csv and the two directional
        aerodynamic forces FX_1.csv and FY_1.csv.
        It translates these files into np.ndarray variables of shape (N, T)
        where N = number of trajectories (int) and T = time steps (int) """
   
    X = np.genfromtxt("X_1.csv", delimiter=",")
    Y = np.genfromtxt("Y_1.csv", delimiter=",")
    Fx = np.genfromtxt("FX_1.csv", delimiter=",")
    Fy = np.genfromtxt("FY_1.csv", delimiter=",")
    
    
    return X.T, Y.T, Fx.T, Fy.T

###############################################################################

def distance_from_rotor(X, Y):
    
    """ Given the two datasets containing the coordinates of all the trajectory,
        it computes a new variable np.ndarray (N, T) containing, for all the 
        trajectories and for every time instant, the clearance parameter (i.e.
        the distance of the center of mass of the rotor from the bearing)
        
        Input: X = shape(N, T), float
               Y = shape(N, T), float
               
        Output: R-r = shape(N, T), float """
    
    R = 0.5
    r = np.sqrt(X**2 + Y**2)
    return R - r


# Loading of the dataset
- over 500 trajectories, the first 50 (10%) are taken as testing set, 400 (90%) are used as training set and the remaining 50 (10%) are the validation set used duering the training process.

In [None]:
# loading data
X, Y, Fx, Fy = load()

# take the first 50 trajectories (10%) that will be eventually tested
X_to_test = X[:50, :]
Y_to_test = Y[:50, :]
Fx_to_test = Fx[:50, :]
Fy_to_test = Fy[:50, :]
dist_to_test = distance_from_rotor(X_to_test, Y_to_test)

# take the rest of the trajectories (90%) to train and validate the model
X = X[50:, :]
Y = Y[50:, :]
Fx = Fx[50:, :]
Fy = Fy[50:, :]

N = X.shape[0]
D = X.shape[1]
dist = distance_from_rotor(X, Y)

print(X.shape)

# Useful functions for the creation of the dataset for FNNs

Functions:
- create_dataset_3feat
- create_dataset_3feat_onetraj
- random_permutation
- tensorize
- split_train_val


In [None]:
def create_dataset_3feat(N, D, dd, X, Y, dist, Fx, Fy):
    
    """ This function creates a dataset that, for each observation (i.e. row),
        contains the X and Y coordinates and the clearance parameter of the dd 
        time steps prior to the time instant t the observation itself refers to.
        To be more clear, the dimensions of the input and the output of the funztion are:
        
        Input: N = scalar, int (Number of trajectories)
               D = scalar, int (Number of overall time steps)
               d = scalar, int (Delay parameter)
               X = shape(N, T), float (X coordinates of all the trajectories)
               Y = shape(N, T), float (Y coordinates of all the trajectories)
               dist = shape(N, T), float (Clearance of all the trajectories)
               Fx = shape(N, T), float (Aerodynamic forces along x of all the trajectories)
               Fy = shape(N, T), float (Aerodynamic forces along y of all the trajectories)
        
        Output: df_input = shape(N*(D-dd), dd*3), float (Reorganized input dataset of all the trajectories)
                df_output = shape(N*(D-dd), 2), float (Reorganized output dataset of all the trajectories) """
    
    
    df_input = np.zeros((N*(D-dd), dd*3))
    df_output = np.zeros((N*(D-dd), 2))
    i = 0

    for t in range(N):
    
        for n in range(D-dd):

            row = np.zeros(dd*3)

            for d in range(dd):
            
                row[3*d] = X[t, n+d]
                row[3*d + 1] = Y[t, n+d]
                row[3*d + 2] = dist[t, n+d]
        
            df_input[i, :] = row
            df_output[i, :] = np.array([Fx[t, n+dd], Fy[t, n+dd]])
            i = i+1
    
    return df_input, df_output

#################################################################################

def create_dataset_3feat_onetraj(D, dd, X, Y, dist, Fx, Fy):
    
    """ This function creates a dataset that, for each observation (i.e. row),
        contains the X and Y coordinates and the clearance parameter of the dd 
        time steps prior to the time instant t the observation itself refers to.
        In particular, this is done for just one trajectory (and not for all the 
        trajectories as the function above)
        
        Input: D = scalar, int (Number of overall time steps)
               d = scalar, int (Delay parameter)
               X = shape(T, ), float (X coordinates of the given trajectory)
               Y = shape(T, ), float (Y coordinates of the given trajectory)
               dist = shape(T, ), float (Clearance of the given trajectory)
               Fx = shape(T, ), float (Aerodynamic forces along x of the given trajectory)
               Fy = shape(T, ), float (Aerodynamic forces along y of the given trajectory)
        
        Output: df_input = shape(D-dd, dd*3), float (Reorganized input dataset of the given trajectory)
                df_output = shape(D-dd, 2), float (Reorganized output dataset of the given trajectory) """
    
    
    df_input = np.zeros((D-dd, dd*3))
    df_output = np.zeros((D-dd, 2))
    i = 0

    for n in range(D-dd):
        
        row = np.zeros(dd*3)

        for d in range(dd):
            
            row[3*d] = X[n+d]
            row[3*d + 1] = Y[n+d]
            row[3*d + 2] = dist[n+d]
        
        df_input[i, :] = row
        df_output[i, :] = np.array([Fx[n+dd], Fy[n+dd]])
        i = i+1
    
    return df_input, df_output


In [None]:
def random_permutation(df_input, df_output):
    
    """ Random permutation of the observations (i.e. rows) of the
        input and output dataset.
        R = number of rows and C = number of columns of the input
        
        Input: df_input = shape(R, C), float (Input dataset)
               df_output = shape(R, 2), float (Output output)
        
        Output: df_in_shuff = shape(R, C), float (Shuffled input dataset)
                df_out_shuff = shape(R, 2), float (Shuffled output dataset) """
    

    N = df_input.shape[0]
    shuffle_indices = np.random.permutation(np.arange(N))
    df_in_shuff = df_input[shuffle_indices]
    df_out_shuff = df_output[shuffle_indices]

    return df_in_shuff, df_out_shuff


In [None]:
def tensorize(df_in_shuff, df_out_shuff):
    
    """ Transform a np.ndarray into a torch.Tensor variable.
        R = number of rows and C = number of columns of the input
        
        Input: df_in_shuff = shape(R, C), float (Input np.ndarray)
               df_out_shuff = shape(R, 2), float (Output np.ndarray)
        
        Output: df_input_tensor = shape(R, C), float (Input torch.Tensor)
                df_output_tensor = shape(R, 2), float (Output torch.Tensor) """
    
    df_input_tensor = torch.Tensor(df_in_shuff)
    df_output_tensor = torch.Tensor(df_out_shuff)

    return df_input_tensor, df_output_tensor


In [None]:
def split_train_val(df_input_tensor, df_output_tensor, p):
    
    """ Split the input dataset and the output dataset into a fraction p
        of training set and 1-p of validation test. 
        In particular the first (100*p)% of samples are taken as training
        and the remaining 100*(1-p)% of them is the validation set.
        R = number of rows and C = number of columns of the input
        
        Input: df_input_tensor = shape(R, C), float (Input torch.Tensor)
               df_output_tensor = shape(R, 2), float (Output torch.Tensor)
        
        Output: df_in_valid = shape(int(R*(1-p)), C), float (Input Validation Set torch.Tensor)
                df_out_valid = shape(int(R*(1-p)), 2), float (Output Validation Set torch.Tensor)
                df_in_train = shape(int(R*p), C), float (Input Training Set torch.Tensor)
                df_out_train = shape(int(R*p), 2), float (Output Training Set torch.Tensor) """
  
    # Take the first p% of the dataset as training set and (1-p)% as validation set
    N = df_input_tensor.shape[0]
    df_in_train = df_input_tensor[:int(N*p), :]
    df_in_valid = df_input_tensor[int(N*p):, :]

    df_out_train = df_output_tensor[:int(N*p), :]
    df_out_valid = df_output_tensor[int(N*p):, :]

    return df_in_train, df_in_valid, df_out_train, df_out_valid


# Creation of the datasets

In [None]:
dd = 100
p = 0.9

df_input, df_output = create_dataset_3feat(N, D, dd, X, Y, dist, Fx, Fy)
df_in_shuff, df_out_shuff = random_permutation(df_input, df_output)
df_input_tensor, df_output_tensor = tensorize(df_in_shuff, df_out_shuff)
df_in_train, df_in_valid, df_out_train, df_out_valid = split_train_val(df_input_tensor, df_output_tensor, p)

train = torch.utils.data.TensorDataset(df_in_train, df_out_train)
test = torch.utils.data.TensorDataset(df_in_valid, df_out_valid) #VALIDATION

# Didn't change name not to change everything afterwords

# Definition of the Feedforward Neural Network
Architecture: 
- 3 hidden fully connected layers ('hidden1', 'hidden2' and 'hidden3' neurons, respectively).
- Activation function: ReLu for each layer

In [None]:
class Aerospace_Bearing_FNN(torch.nn.Module):
    # Models in PyTorch usually inherit from this Module
    def __init__(self, feat, d, hidden1, hidden2, hidden3):
        super().__init__()
        
        self.input_layer = torch.nn.Linear(feat*d, hidden1)
        self.input_phi = torch.nn.ReLU()
        self.layer1 = torch.nn.Linear(hidden1, hidden2)
        self.phi1 = torch.nn.ReLU()
        self.layer2 = torch.nn.Linear(hidden2, hidden3)
        self.phi2 = torch.nn.ReLU()
        self.output_layer = torch.nn.Linear(hidden3, 2)

    def forward(self, Z):
        # Z = torch.flatten(Z, 1)  # Flatten (n, 28, 28) to (n, 784)
        Z = self.input_layer(Z)
        Z = self.input_phi(Z)
        Z = self.layer1(Z)
        Z = self.phi1(Z)
        Z = self.layer2(Z)
        Z = self.phi2(Z)
        Z = self.output_layer(Z)

        return Z

# Training process
- Optimizer: Adam's
- Scheduler: CosineAnnealingLR
- Criterion: MSE Loss Function

In [None]:
def train_epoch(model, optimizer, scheduler, criterion, train_loader, epoch, device):
    
    # Set model to training mode (affects dropout, batch norm e.g.)
    model.train()
    loss_history = []
    accuracy_history = []
    lr_history = []
    
    # Change the loop to get batch_idx, data and target from train_loader
    for batch_idx, (data, target) in enumerate(train_loader):
        
        # Move the data to the device
        data = data.to(device)
        target = target.to(device)
        
        # Zero the gradients
        optimizer.zero_grad()
        
        # Compute model output
        output = model(data)
        
        # Compute loss
        loss = criterion(output, target)
        
        # Backpropagate loss
        loss.backward()
        
        # Perform an optimizer step
        optimizer.step()
        
        # Perform a learning rate scheduler step
        scheduler.step()

        # Compute loss_float (float value, not a tensor)
        loss_float = loss.item()

        # Add loss_float to loss_history
        loss_history.append(loss_float)

        lr_history.append(scheduler.get_last_lr()[0])
        if batch_idx % (len(train_loader.dataset) // len(data) // 10) == 0:
            print(
                f"Train Epoch: {epoch}-{batch_idx:03d} "
                f"batch_loss={loss_float:0.2e} "
                # f"batch_acc={accuracy_float:0.3f} "
                f"lr={scheduler.get_last_lr()[0]:0.3e} "
            )

    return loss_history, lr_history


@torch.no_grad()
def validate(model, device, val_loader, criterion):
    model.eval()  # Important: eval mode (affects dropout, batch norm etc)
    test_loss = 0
    
    for data, target in val_loader:
        data, target = data.to(device), target.to(device)
        output = model(data)
        test_loss += criterion(output, target).item() * len(data)
        
    test_loss /= len(val_loader.dataset)

    print(
        "Test set: Average loss: {:.4f}".format(test_loss)
    )
    return test_loss


@torch.no_grad()
def get_predictions(model, device, val_loader, criterion, num=None):
    model.eval()
    points = []
    for data, target in val_loader:
        data, target = data.to(device), target.to(device)
        output = model(data)
        loss = criterion(output, target)
        
        data = np.split(data.cpu().numpy(), len(data))
        loss = np.split(loss.cpu().numpy(), len(data))
        target = np.split(target.cpu().numpy(), len(data))
        
        points.extend(zip(data, loss, target))

        if num is not None and len(points) > num:
            break

    return points


def run_aerobearing_training(feat, ddd, hidden1, hidden2, hidden3, num_epochs, lr, batch_size, device="cpu"):
    # ===== Data Loading =====
    transform = transforms.ToTensor()
    train_set = train
    val_set = test

    train_loader = torch.utils.data.DataLoader(
        train_set,
        batch_size=batch_size,
        shuffle=False,  # Can be important for training
        pin_memory=torch.cuda.is_available(),
        drop_last=False,
        num_workers=2,
    )

    val_loader = torch.utils.data.DataLoader(
        val_set,
        batch_size=batch_size,
    )

    # ===== Model, Optimizer and Criterion =====

    model = Aerospace_Bearing_FNN(feat, ddd, hidden1, hidden2, hidden3)
    model = model.to(device=device)
    
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    
    criterion = torch.nn.functional.mse_loss
    
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=(len(train_loader.dataset) * num_epochs) // train_loader.batch_size)
    
    # ===== Train Model =====
    lr_history = []
    train_loss_history = []
    val_loss_history = []
    
    for epoch in range(1, num_epochs + 1):
        train_loss, lrs = train_epoch(model, optimizer, scheduler, criterion, train_loader, epoch, device)
        train_loss_history.extend(train_loss)
        lr_history.extend(lrs)

        val_loss = validate(model, device, val_loader, criterion)
        val_loss_history.append(val_loss)
        
    # ===== Plot training curves =====
    n_train = len(train_loss_history)
    t_train = num_epochs * np.arange(n_train) / n_train
    t_val = np.arange(1, num_epochs + 1)

    plt.figure()
    plt.plot(t_train, train_loss_history, label="Train")
    plt.plot(t_val, val_loss_history, label="Val")
    plt.legend()
    plt.xlabel("Epoch")
    plt.ylabel("Loss")

    plt.figure()
    plt.plot(t_train, lr_history)
    plt.xlabel("Epoch")
    plt.ylabel("Learning Rate")

    return model

# Set-up of the parameters

The parameters which characterize the model are the following:
- The learning rate and the number of hidden layers which were obtained from the file `FNN_coord_tuning` (respectively `lr`, `hidden1`, `hidden2`, `hidden3`)
- The size of the bacth and number of epochs (`batch_size`, `num_epochs`)

In [None]:
lr = 0.01
batch_size = 500
num_epochs = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

feat = 3
hidden1 = 256
hidden2 = 256
hidden3 = 64

model = run_aerobearing_training(feat, dd, hidden1, hidden2, hidden3, num_epochs, lr, batch_size, device)

# Testing the model

In [None]:
def get_MSE(df_in_test_tensor, df_out_test_tensor):
    
    """ It computes the MSE (Mean Square Error) between the prediction
        of the model and the true value of the output.
        In particular, the prediction of a given observation consists of
        the forces along x and y directions. Hence, the MSE wrt to the
        actual value of the forces is computed and averaged over the two 
        directions: this is the error taken for a single observation.
        Then the mean over all samples is returned.
        R = number of rows and C = number of columns of the input
        
        Input: df_in_test_tensor = shape(R, C), float (Input Testing Set torch.Tensor)
               df_in_test_tensor = shape(R, 2), float (Output Testing Set torch.Tensor)
        
    """
    
    losses = []
    for i in tqdm.tqdm(range(len(df_in_test_tensor))):
        
        row = df_in_test_tensor[i, :]
        
        pred = model(row.to(device))
        pred = pred.to('cpu')
        
        loss = float(torch.mean((df_out_test_tensor[i, :]-pred)**2))
        losses.append(loss)
    
    mse = np.mean(losses)

    return mse


In [None]:
def get_RE(df_in_test_tensor, df_out_test_tensor):
    
    """ It computes the Relative Error (Absolute Error / Magnitude of the Output)
        between the prediction of the model and the true value of the output.
        In particular, the prediction of a given observation consists of
        the forces along x and y directions. Hence, the RE wrt to the
        actual value of the forces is computed and averaged over the two 
        directions: this is the error taken for a single observation.
        Then the mean over all samples is returned.
        R = number of rows and C = number of columns of the input
        
        Input: df_in_test_tensor = shape(R, C), float (Input Testing Set torch.Tensor)
               df_in_test_tensor = shape(R, 2), float (Output Testing Set torch.Tensor)
        
    """
    
    losses = []
    for i in tqdm.tqdm(range(len(df_in_test_tensor))):
        
        row = df_in_test_tensor[i, :]
        
        pred = model(row.to(device))
        pred = pred.to('cpu')
        
        loss = float(torch.mean(torch.abs(df_out_test_tensor[i, :]-pred)/df_out_test_tensor[i, :]))
        losses.append(loss)
    
    rel_err = np.mean(losses)

    return rel_err


Test the model and compute the error

In [None]:
# create the dataset to test
nn = X_to_test.shape[0]
df_in_test, df_out_test = create_dataset_3feat(nn, D, dd, X_to_test, Y_to_test, dist_to_test, Fx_to_test, Fy_to_test)
df_in_test_tensor, df_out_test_tensor = tensorize(df_in_test, df_out_test)

mse = get_MSE(df_in_test_tensor, df_out_test_tensor)
rel_err = get_RE(df_in_test_tensor, df_out_test_tensor)
print(mse)
print(rel_err)

# Plot of a given trajectory of aerodynamic forces and its prediction

In [None]:
def plot_trajectory_coord(dd, traj):
  
    # save one trajectory to be plotted (tbp)
    tbp_X = X_to_test[traj,:]
    tbp_Y = Y_to_test[traj,:]
    tbp_Fx = Fx_to_test[traj,:]
    tbp_Fy = Fy_to_test[traj,:]
    tbp_d = dist_to_test[traj,:]
    
    traj_in, traj_out = create_dataset_3feat_onetraj(D, dd, tbp_X, tbp_Y, tbp_d, tbp_Fx, tbp_Fy)

    fx_pred_tbp = []
    fy_pred_tbp = []

    for i in range(len(traj_in)):
        row = torch.Tensor(traj_in[i,:])
        row = torch.unsqueeze(row,0)
        pred = model(row.to(device))
        fy_pred_tbp.append(float(pred[0][0]))
        fx_pred_tbp.append(float(pred[0][1]))

    plt.figure(figsize=(25, 15))
    
    plt.plot(fx_pred_tbp,
             fy_pred_tbp,
    #         marker="X",
    #         markersize=20,
    #         linestyle="None",
    #         color="blue",
             label="Predicted",
             linewidth=8,
            )

    plt.plot(traj_out[:,1],
             traj_out[:,0],
    #         marker="X",
    #         markersize=20,
    #         linestyle="None",
    #         color="blue",
             label="True",
             linewidth=8,
            )

    plt.grid(True)
    plt.legend(fontsize=25)
    plt.xlabel("F_x", fontsize=30)
    plt.ylabel("F_y", fontsize=30)

    plt.tick_params(axis='x', labelsize=25)
    plt.tick_params(axis='y', labelsize=25)



Choose the trajectory to plot. Notice that only the first 50 trajectories are available to plot, hence the variable 'traj' can only take values in [0,49].

In [None]:
traj = 40
plot_trajectory_coord(50, traj)

plt.savefig("FNN_traj.png")