In [1]:
# On importe les bibliotèques pour la gestion d'objet et le traitement de données
import matlab
import matlab.engine as engine
import numpy as np
import scipy.io as scipy
import tensorflow as tf

In [2]:
# On importe les bibliotèques associées aux méthodes d'apprentissage supervisé
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping

In [3]:
# On démarre l'environnement virtuel MATLAB
eng = engine.start_matlab()

In [4]:
# On ajoute le chemin d'accès vers l'emplacement de la fonction 
eng.addpath(r"C:\Users\Utilisateur\OneDrive - ETS\CRIAQ-REAR\Maitrise LB\Functions", nargout = 0)
eng.add_all_paths(r"C:\Users\Utilisateur\OneDrive - ETS\CRIAQ-REAR\Maitrise LB", nargout = 0)

In [5]:
# on définit les paramètres de l'environnement
T = 23 # température
P = 100800 # static pressure
H = 50 # relative humidity

# on définit les paramètres du support fréquentiel
fmin = 1 # fréquence minimale (Hz)
fmax = 3000 # fréquence maximale (Hz)
points = 7 # nombre de points sur le spectre

# On convertit les données en format MATLAB
T_matlab = matlab.double([T])  # Convertir en tableau MATLAB
P_matlab = matlab.double([P])
H_matlab = matlab.double([H])
fmin_matlab = matlab.double([fmin])
fmax_matlab = matlab.double([fmax])
points_matlab = matlab.double([points])

# Appeler la fonction MATLAB (par exemple create_environnement)
air_matlab, w_matlab = eng.create_environnement(T_matlab[0], P_matlab[0], H_matlab[0], fmin_matlab[0], fmax_matlab[0], points_matlab[0], nargout = 2)

In [6]:
# On importe le dataset (création du trainset et du testset)
eng.load(r'C:\Users\Utilisateur\OneDrive - ETS\CRIAQ-REAR\Maitrise LB\Caractérisation inverse\CI pour configurations\CI multiQWL\data.mat', nargout = 0)
data_matlab  = eng.workspace['data']
data = {field: np.array(data_matlab[field]) for field in data_matlab.keys()}
x = data['X']
y = data['Y']

In [7]:
# On prépare les données pour l'apprentissage
x_train, x_test, y_train, y_test = train_test_split(x, y, train_size = 0.8, random_state = 42)

# On convertit les données en float32
x_train = tf.cast(x_train, tf.float32)
y_train = tf.cast(y_train, tf.float32)
x_test = tf.cast(x_test, tf.float32)
y_test = tf.cast(y_test, tf.float32)

# On crée les datasets TensorFlow à partir des valeurs extraites
trainset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(buffer_size = 100).batch(32)
testset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).shuffle(buffer_size = 100).batch(32)

In [56]:
# On teste les données du trainset
for x_batch, y_batch in trainset.take(1):
    print(x_batch.shape)  # Devrait afficher (32, 7)
    print(y_batch.shape)  # Devrait afficher (32, 8)
    print("x_batch sample:", x_batch.numpy()[0])
    print("y_batch sample:", y_batch.numpy()[0])

(32, 7)
(32, 8)
x_batch sample: [0.00596544 0.6744992  0.5438734  0.48202226 0.77146727 0.862798
 0.6283986 ]
y_batch sample: [0.00505857 0.00154253 0.0039564  0.00125924 0.6067501  0.37713042
 0.50102377 0.5613443 ]


In [58]:
# On définit un modèle personnalisé
class CustomModel(tf.keras.Model):
    def __init__(self, hidden_layers_dim, input_dim, output_dim, air_matlab, w_matlab):
        super(CustomModel, self).__init__()
        self.hidden_layers_dim = hidden_layers_dim
        self.air_matlab = air_matlab
        self.w_matlab = w_matlab
        self.input_dim = input_dim
        self.output_dim = output_dim

        self.input_layer = tf.keras.layers.Dense(self.input_dim, input_shape(self.input_dim,))
        self.dense1 = tf.keras.layers.Dense(hidden_units, activation = 'relu')
        self.dense2 = tf.keras.layers.Dense(hidden_units, activation = 'relu')
        self.dense3 = tf.keras.layers.Dense(hidden_units, activation = 'relu')
        self.dense4 = tf.keras.layers.Dense(hidden_units, activation = 'relu')
        self.output_layer = tf.keras.layers.Dense(self.output_dim, activation = 'linear')

    def call(self, inputs):
        print("Shape of inputs:", inputs.shape)
        x = self.input_layer(inputs)
        print("Shape after input layer:", x.shape)
        x = self.dense1(x)
        print("Shape after dense1:", x.shape)
        x = self.dense2(x)
        print("Shape after dense2:", x.shape)
        x = self.dense3(x)
        print("Shape after dense3:", x.shape)
        x = self.dense4(x)
        print("Shape after dense4:", x.shape)
        output = self.output_layer(x)
        print("Shape of output:", output.shape)
        return output

    def train_step(self, data):
        x_batch, y_batch = data  # Obtenez les entrées et les sorties du batch
        
        # Vérifiez les dimensions des batchs
        print("Shape of x_batch:", x_batch.shape)
        print("Shape of y_batch:", y_batch.shape)
        
        with tf.GradientTape() as tape:
            y_pred = self.call(x_batch)
            print("Shape of y_pred:", y_pred.shape)
            loss = custom_loss(x_batch, y_pred, self.air_matlab, self.w_matlab)
            print("loss dans trainstep: ", loss)
            

        # Calculer les gradients et les appliquer
        gradients = tape.gradient(loss, self.trainable_variables)
        print("Gradients:", gradients)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        return {"loss": loss}

In [50]:
def multiQWL_cost_function_python_wrapper(x_batch_tf_symb, y_pred_tf_symb, air_matlab, w_matlab):
    def wrapped_cost_function(x_batch_tf, y_pred_tf):
        return multiQWL_cost_function_python(x_batch_tf, y_pred_tf, air_matlab, w_matlab)
    return wrapped_cost_function

def custom_loss(x_batch_tf_symb, y_pred_tf_symb, air_matlab, w_matlab):
    # On évalue la fonction d'appel après avoir évaluer les tenseurs symboliques
    wrapper = multiQWL_cost_function_python_wrapper(x_batch_tf_symb, y_pred_tf_symb, air_matlab, w_matlab)
    loss_value = tf.py_function(func = wrapper, inp = [x_batch_tf_symb, y_pred_tf_symb], Tout = tf.float32)
    return loss_value

In [51]:
# On définit la fonction d'appel de la fonction de perte définit sur matlab qui prend en entrée des tenseurs statiques
def multiQWL_cost_function_python(x_batch_tf_stat, y_pred_tf_stat, air_matlab, w_matlab):
    x_batch_np = x_batch_tf_stat.numpy()  # Cette opération devrait être effectuée ici
    y_batch_np = y_pred_tf_stat.numpy()
    
    # Convertir en format MATLAB
    x_batch_matlab = matlab.double(x_batch_np.tolist())
    y_batch_matlab = matlab.double(y_batch_np.tolist())
    
    # Appeler la fonction MATLAB pour calculer la perte
    loss = eng.multiQWL_cost_function(x_batch_matlab, y_batch_matlab, air_matlab, w_matlab)
    print("loss", loss)
    return loss

In [52]:
# On définit les hyperparamètres nécessaires à l'apprentissage
input_dim = x_train.shape[1]  # Dimension de l'entrée (nombre de colonnes)
output_dim = y_train.shape[1] # Dimension de la sortie (nombre de colonnes)
hidden_units = 10  # Nombre de neurones dans la couche cachée
learning_rate = 0.01  # Taux d'apprentissage
epochs = 5  # Nombre maximum d'itérations
goal = 1e-5  # Seuil d'erreur (utilisé pour l'early stopping)
max_fail = 6  # Nombre maximum d'échecs de validation (patience pour l'early stopping)

In [53]:
output_dim

8

In [54]:
# On crée et compile le modèle personnalisé
model = CustomModel(hidden_units, input_dim, output_dim, air_matlab, w_matlab)
model.compile(optimizer = tf.keras.optimizers.Adam(learning_rate = learning_rate))

In [55]:
# On implémente un callback d'early stopping
early_stopping = tf.keras.callbacks.EarlyStopping(monitor = 'loss', patience = max_fail, min_delta = goal)

# On crée et on compile le modèle personnalisé
history = model.fit(trainset, epochs = epochs, callbacks = [early_stopping])

# On affiche le suivi de l'apprentissage
plt.plot(history.history['loss'], label='loss')
plt.plot(history.history['val_loss'], label='val_loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

Epoch 1/5
Shape of x_batch: (None, 7)
Shape of y_batch: (None, 8)
Shape of inputs: (None, 7)
Shape after input layer: (None, 7)
Shape after dense1: (None, 10)
Shape after dense2: (None, 10)
Shape after dense3: (None, 10)
Shape after dense4: (None, 10)
Shape of output: (None, 8)
Shape of y_pred: (None, 8)
Gradients: [<tf.Tensor 'gradient_tape/dense_35_1/MatMul/MatMul:0' shape=(7, 7) dtype=float32>, <tf.Tensor 'gradient_tape/dense_35_1/Add/Reshape:0' shape=(7,) dtype=float32>, <tf.Tensor 'gradient_tape/dense_36_1/MatMul/MatMul_1:0' shape=(7, 10) dtype=float32>, <tf.Tensor 'gradient_tape/dense_36_1/Add/Reshape:0' shape=(10,) dtype=float32>, <tf.Tensor 'gradient_tape/dense_37_1/MatMul/MatMul_1:0' shape=(10, 10) dtype=float32>, <tf.Tensor 'gradient_tape/dense_37_1/Add/Reshape:0' shape=(10,) dtype=float32>, <tf.Tensor 'gradient_tape/dense_38_1/MatMul/MatMul_1:0' shape=(10, 10) dtype=float32>, <tf.Tensor 'gradient_tape/dense_38_1/Add/Reshape:0' shape=(10,) dtype=float32>, <tf.Tensor 'gradient

InvalidArgumentError: Graph execution error:

Detected at node StatefulPartitionedCall/gradient_tape/dense_40_1/MatMul/MatMul_1 defined at (most recent call last):
<stack traces unavailable>
In[1] ndims must be >= 2
	 [[{{node StatefulPartitionedCall/gradient_tape/dense_40_1/MatMul/MatMul_1}}]] [Op:__inference_one_step_on_iterator_13080]

In [None]:
# On évalue le modèle sur le testset
loss = model.evaluate(testset)
print("Test Loss:", loss.numpy())