In [None]:
import random
import numpy as np
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt

### Constants and hyperparameters

In [None]:
epochs = 300 # Maximum number of training epochs (the early stopping callback may end training sooner)
batch_size = 12 # Batch size passed to model.fit
split_percentage = 0.8 # Fraction of the dataset used for training; the remaining samples form the test set
validation_split = 0.2 # Fraction of the training data passed to model.fit as validation split
early_stopping_patience = 20 # Number of epochs of patience before triggering early stopping
naca_numbers = ['maximum_camber', 'maximum_camber_position', 'maximum_thickness'] # Names of the NACA numbers to predict; the list length sets the number of model outputs

In [None]:
dataset_path = "../Dataset/Flow signals/flow_signals_128bins.npz" # Path of the .npz archive read with np.load
section_indices = [1] # Indices selected along the third axis of the features; an empty list keeps every section
flow_quantity = "p" # Key of the archive array used as input feature (also shown as y-label of the sample plot)

### Data loading

In [None]:
# Loading the data; the archive is opened in a `with` block so that its file handle is closed after reading
with np.load(dataset_path) as archive:
    # Pairing each flow signal with its NACA numbers so that both are shuffled together later on
    dataset = list(zip(archive[flow_quantity], archive["naca_numbers"]))

### Shuffling the dataset

In [None]:
# Shuffling the dataset with a fixed seed so that the training/test split is identical on every run
# (only NumPy's global generator is seeded here)
shuffle_seed = 42
np.random.seed(shuffle_seed)
np.random.shuffle(dataset)

### Features and labels

In [None]:
# Transposing the list of (features, labels) pairs into one array of features and one array of labels
X, Y = (np.array(column) for column in zip(*dataset))

In [None]:
# Extracting the requested sections from the features (every section is kept when no index is given)
if len(section_indices) > 0:
    section_X = X[:, :, section_indices]
else:
    section_X = X

### Training and test set

In [None]:
# Number of training samples: the requested fraction of the dataset size, rounded down
num_training_samples = int(np.floor(len(dataset) * split_percentage))

In [None]:
# Splitting the features at the training boundary: first part for training, remainder for testing
X_train, X_test = section_X[:num_training_samples], section_X[num_training_samples:]

# Splitting the labels at the same boundary so that they stay aligned with the features
Y_train, Y_test = Y[:num_training_samples], Y[num_training_samples:]

### Data normalization

In [None]:
# Computing the mean and standard deviation of the training features (statistics taken over the sample axis)
mean = X_train.mean(axis=0)
std = X_train.std(axis=0)

# A feature that is constant over the training set has zero standard deviation;
# a unit scale is used for it so that normalize() does not divide by zero
std = np.where(std == 0, 1.0, std)

In [None]:
# Function to standardize the features with the training statistics
def normalize(x):
    """Return `x` shifted by the global `mean` and divided by the global `std`."""
    return (x - mean) / std

In [None]:
# Plotting a random sample
# The upper bound of randint is exclusive, so len(X_train) lets every sample (the last one included) be drawn
choice = np.random.randint(0, len(X_train))
plt.title(f"NACA: {''.join(str(int(y)) for y in Y_train[choice])}")
plt.ylabel(flow_quantity)
plt.plot(X_train[choice])
plt.show()

### Building the model

In [None]:

# BEST MODEL FOR SECTIONS X = [-1c, 2c]
def buildModel():
  """Build and compile the 1D CNN with a single convolutional stage.

  The input shape is read from the global `X_train`, the features are
  standardized by the global `normalize` function inside a Lambda layer and
  the output layer has one unit per entry of the global `naca_numbers`.

  Returns:
    The compiled `keras.Sequential` model (MSE loss, Adam optimizer, MAE metric).
  """
  sample_shape = np.shape(X_train)[1:]
  num_outputs = len(naca_numbers)

  # Feature extraction: normalization, one convolution and one pooling step
  feature_layers = [
    keras.layers.InputLayer(input_shape=sample_shape),
    keras.layers.Lambda(normalize),
    keras.layers.Conv1D(filters=24, kernel_size=3, activation=tf.nn.tanh),
    keras.layers.MaxPool1D(pool_size=2),
    keras.layers.Flatten(),
  ]

  # Regression head: tanh dense layers of decreasing width and an output layer without activation
  head_layers = [keras.layers.Dense(units, tf.nn.tanh) for units in (40, 30, 20)]
  head_layers.append(keras.layers.Dense(num_outputs))

  model = keras.Sequential(feature_layers + head_layers)

  # Compiling the model
  model.compile(loss='mse', optimizer="adam", metrics=['mae'])
  return model

In [None]:

# BEST MODEL FOR SECTION X = 11c
def buildModel2():
  """Build and compile the 1D CNN with two convolutional stages.

  The input shape is read from the global `X_train`, the features are
  standardized by the global `normalize` function inside a Lambda layer and
  the output layer has one unit per entry of the global `naca_numbers`.

  Returns:
    The compiled `keras.Sequential` model (MSE loss, Adam optimizer, MAE metric).
  """
  sample_shape = np.shape(X_train)[1:]
  num_outputs = len(naca_numbers)

  # Input and normalization
  layers = [
    keras.layers.InputLayer(input_shape=sample_shape),
    keras.layers.Lambda(normalize),
  ]

  # Two convolution + pooling stages with a decreasing number of filters
  for num_filters in (12, 8):
    layers.append(keras.layers.Conv1D(filters=num_filters, kernel_size=3, activation=tf.nn.tanh))
    layers.append(keras.layers.MaxPool1D(pool_size=2))

  # Regression head: tanh dense layers of decreasing width and an output layer without activation
  layers.append(keras.layers.Flatten())
  for units in (30, 20, 10):
    layers.append(keras.layers.Dense(units, tf.nn.tanh))
  layers.append(keras.layers.Dense(num_outputs))

  model = keras.Sequential(layers)

  # Compiling the model
  model.compile(loss='mse', optimizer='adam', metrics=['mae'])
  return model

In [None]:
# Building the single-convolution model (buildModel2 is the alternative architecture) and printing its layer summary
model = buildModel()
model.summary()

### Model training

In [None]:
# Early stopping on the validation loss: training halts after `early_stopping_patience`
# epochs without improvement and the best weights are restored
early_stopping = keras.callbacks.EarlyStopping(
    verbose=True,
    restore_best_weights=True,
    patience=early_stopping_patience,
    monitor='val_loss'
)

# Training the model on the training set, holding out part of it for validation
history = model.fit(
    x=X_train,
    y=Y_train,
    batch_size=batch_size,
    epochs=epochs,
    callbacks=[early_stopping],
    validation_split=validation_split,
    shuffle=True,
    verbose=1
)

In [None]:
# Function to plot the metrics of training and validation
def plotHistory(history, training_metric, validation_metric, ylabel):
  """Plot a training metric and its validation counterpart against the epoch index.

  Args:
    history: object whose `history` attribute maps metric names to the recorded
      values (presumably one list per metric, as returned by `model.fit`).
    training_metric: key of the training curve, also used as its legend label.
    validation_metric: key of the validation curve, also used as its legend label.
    ylabel: label of the y axis.
  """
  curves = history.history

  for metric_name in (training_metric, validation_metric):
    plt.plot(curves[metric_name], label=metric_name)

  # The y axis spans from zero to the largest value reached by either curve
  highest_value = np.max(curves[training_metric] + curves[validation_metric])
  plt.ylim([0, highest_value])

  plt.xlabel('Epoch')
  plt.ylabel(ylabel)
  plt.grid(True)
  plt.legend()
  plt.show()

In [None]:
# Plotting the training and validation curves of the loss and of the mean absolute error
plotHistory(history, 'loss', 'val_loss', "Loss")
plotHistory(history, 'mae', 'val_mae', "Mean Absolute Error")

### Model evaluation

In [None]:
# Computing the predictions of the test set (these samples were not passed to model.fit)
predictions = model.predict(X_test)

In [None]:
# Function to compute the classification accuracy
def classificationMetrics(predictions, labels):
    """Compute the per-output accuracy obtained after rounding to the closest integer.

    A sample counts as correctly classified for an output when its rounded
    prediction equals its rounded label (halves are rounded to the even integer).

    Args:
        predictions: 2D array-like of shape (samples, outputs) with the predicted values.
        labels: 2D array-like with one row per sample and at least as many
            columns as `predictions`; extra columns are ignored.

    Returns:
        1D float array with one accuracy in [0, 1] per column of `predictions`.

    Raises:
        ValueError: if no sample is given, since the accuracy is undefined.
    """
    predictions = np.asarray(predictions)
    labels = np.asarray(labels)

    if len(labels) == 0:
        raise ValueError("classificationMetrics requires at least one sample")

    # The number of outputs comes from the predictions themselves rather than from a global
    num_outputs = predictions.shape[1]

    # Converting the NACA values to the closest integer and comparing them element-wise
    correctly_classified = np.equal(np.rint(predictions), np.rint(labels[:, :num_outputs]))

    # The mean of the boolean matches over the samples is the accuracy of each NACA number
    return correctly_classified.mean(axis=0)

In [None]:
def regressionMetrics(predictions, labels):
    """Compute the Mean Squared Error and the Mean Absolute Error of each output.

    Args:
        predictions: 2D array-like of shape (samples, outputs) with the predicted values.
        labels: 2D array-like with one row per sample and at least as many
            columns as `predictions`; extra columns are ignored.

    Returns:
        Tuple `(mses, maes)` of 1D float arrays with one entry per column of `predictions`.
    """
    predictions = np.asarray(predictions, dtype=float)

    # The number of outputs comes from the predictions themselves rather than from a global
    num_outputs = predictions.shape[1]
    labels = np.asarray(labels, dtype=float)[:, :num_outputs]

    # Per-sample, per-output prediction errors
    errors = predictions - labels

    # Averaging over the samples gives one MSE and one MAE per NACA number
    mses = np.mean(errors ** 2, axis=0)
    maes = np.mean(np.absolute(errors), axis=0)

    return mses, maes

In [None]:
# Test-set metrics of each NACA number: accuracy after rounding, then MSE and MAE
accuracy = classificationMetrics(predictions, Y_test)
mse, mae = regressionMetrics(predictions, Y_test)

# Regression report: averages over the NACA numbers first, then one line per NACA number
print("REGRESSION")
print(f" - Loss (Mean Square Error) --> {np.mean(mse)}")
print(f" - Mean Absolute Error --> {np.mean(mae)}")
for name, name_mse, name_mae in zip(naca_numbers, mse, mae):
    print(f"   • {name} --> MSE: {name_mse} | MAE: {name_mae}")

# Classification report with the same layout
print("\nCLASSIFICATION")
print(f" - Accuracy --> {np.mean(accuracy)}")
for name, name_accuracy in zip(naca_numbers, accuracy):
    print(f"   • {name} --> {name_accuracy}")


In [None]:
# Function to plot the predicted values
def plotPredictions(test_labels, test_predictions, label, color):
  """Scatter the predictions against the true values on the current axes.

  The axes are square, start at zero and extend far enough to show every
  label, every prediction and whatever earlier calls already drew on the same
  axes. A black diagonal marks the perfect-prediction line.

  Args:
    test_labels: true values (x coordinates).
    test_predictions: predicted values (y coordinates).
    label: legend entry of the scattered points.
    color: color of the scattered points.
  """
  axes = plt.gca()

  # Upper limit left by previous calls on these axes (0 when nothing has been drawn yet),
  # so that overlaying several series never shrinks the view
  shown_limit = max(axes.get_xlim()[1], axes.get_ylim()[1]) if axes.has_data() else 0

  # The limit covers the predictions too, otherwise over-predicted samples would be cut off
  upper_limit = max(np.max(test_labels), np.max(test_predictions), shown_limit)

  plt.scatter(test_labels, test_predictions, label=label, color=color)
  plt.xlabel('True values')
  plt.ylabel('Predictions')
  plt.axis('equal')
  plt.axis('square')
  plt.xlim([0, upper_limit])
  plt.ylim([0, upper_limit])

  # Perfect-prediction line spanning the visible range
  plt.plot([0, upper_limit], [0, upper_limit], color="black")
  plt.legend()

In [None]:
# One color per NACA number, in the same order as `naca_numbers`
colors = ["blue", "green", "orange"]

# Plotting the obtained results: true values against predictions for each NACA number
for column, naca_name in enumerate(naca_numbers):
  plotPredictions(Y_test[:, column], predictions[:, column], label=naca_name, color=colors[column])