In [564]:
import librosa
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, GlobalAveragePooling2D, Flatten, Concatenate, Softmax, Reshape, Lambda
import tensorflow as tf
from tensorflow.keras.applications import ResNet50
import cv2
import moviepy.editor as mp
import os


In [565]:
# Split the video data in 2 seconds frames and audio
class VideoProcessor:
    def __init__(self, video_path):
        self.video_path = video_path
        self.output_path = 'preprocessed_data/' + video_path.split('/')[1]
    # Split the video into frames and save them in the output folder
    def split_video(self, split_duration=2):
        """Split the video into frames and save them as an object variable"""
        video = mp.VideoFileClip(self.video_path)
        # Split the video to 2 second clips
        for i in range(0, int(video.duration), split_duration):
            # If the last clip is shorter than 2 seconds, ignore it
            if i + split_duration > video.duration:
                break
            output = self.output_path + '/clip' + str(i)
            if not os.path.exists(output):
                os.makedirs(output)
            subclip = (video.subclip(i, i + split_duration))
            subclip.write_images_sequence(output + '/frame%0d.jpg')
        print('Video split into', int(split_duration), 'second clips and saved in', self.output_path)

    def split_audio(self, split_duration=2):
        """Split the audio from the video and save it in the output path"""
        video = mp.VideoFileClip(self.video_path)
        # Split the audio to 2 second clips
        for i in range(0, int(video.duration), split_duration):
            # If the last clip is shorter than 2 seconds, ignore it
            if i + split_duration > video.duration:
                break
            output = self.output_path + '/clip' + str(i)
            if not os.path.exists(output):
                os.makedirs(output)
            subclip = video.subclip(i, i + split_duration)
            subclip.audio.write_audiofile(output + '/audio.wav')
        print('Audio split into', int(split_duration), 'second clips and saved in', self.output_path)

In [566]:
# Audio Encoder
# 1. Prétraitement de l'audio
# max_length : longueur maximale des MFCCs, correspond à 2 secondes d'audio
def preprocess_audio(file_path, n_mfcc=40, max_length=173):
    # Charger l'audio
    audio, sr = librosa.load(file_path, sr=None)
    # Extraire les MFCCs
    mfccs = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=n_mfcc)
    # Transposer pour avoir la forme (timesteps, features)
    mfccs = mfccs.T
    # Troncature ou padding pour uniformiser la longueur
    if mfccs.shape[0] > max_length:
        mfccs = mfccs[:max_length, :]
    else:
        padding = max_length - mfccs.shape[0]
        mfccs = np.pad(mfccs, ((0, padding), (0, 0)), mode='constant')
    return mfccs
    # La sortie doit être de forme (None, timesteps, features)
    # return mfccs[np.newaxis, ...]

def AudioEncoder(input_shape=(1,173,40), encoded_dim=128):
    """
    Modèle d'encodage audio avec un LSTM.
    Arguments:
    - input_shape : forme de l'entrée audio (timesteps, features) ou (features).
    - encoded_dim : dimensions du vecteur encodé.
    """
    inputs = Input(shape=input_shape)

    # Adapter les dimensions si l'entrée est 2D (batch_size, features)
    if len(input_shape) == 1:
        reshaped = Reshape((1, input_shape[0]))(inputs)  # (batch_size, 1, features)
    else:
        reshaped = inputs

    # LSTM pour encoder
    x = LSTM(64, return_sequences=True)(reshaped)
    x = LSTM(64)(x)
    encoded = Dense(encoded_dim, activation='relu')(x)

    # Modèle
    return Model(inputs, encoded, name="AudioEncoder")


In [567]:
# Video Encoder
# 1. Prétraitement des frames
def preprocess_frames(frame_paths, target_size=(224, 224)):
    frames = []
    for path in frame_paths:
        # Charger l'image
        img = cv2.imread(path)
        # Redimensionner l'image
        img = cv2.resize(img, target_size)
        # Normalisation pour ResNet
        img = tf.keras.applications.resnet.preprocess_input(img)
        frames.append(img)
    return np.array(frames)
    # La sortie doit être de forme (None, height, width, channels)
    # Il faut donc ajouter une dimension pour le batch (None)
    # return np.array(frames)[np.newaxis, ...]

# 2. Création du modèle ResNet pour l'encodage
def VideoEncoder(output_dim=128, input_shape=(224, 224, 3)):
    base_model = ResNet50(weights="imagenet", include_top=False, input_shape=input_shape)
    for layer in base_model.layers:
        layer.trainable = False  # Geler les poids du modèle pré-entraîné
    
    # Ajouter des couches pour obtenir un vecteur encodé
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(output_dim, activation='relu')(x)
    
    # Construire le modèle
    model = Model(inputs=base_model.input, outputs=x, name="VideoEncoder")
    return model

In [568]:
# MLP encoder
# 1. Création du MLP Encoder
def MLPEncoder(audio_dim=128, frame_dim=128, num_frames=60, output_dim=128):
    # Entrée pour le vecteur audio
    audio_input = Input(shape=(audio_dim,), name="audio_input")
    audio_dense = Dense(64, activation="relu")(audio_input)

    # Entrée pour les vecteurs des frames
    # ajouter une dimension batch au début 
    frames_input = Input(shape=(num_frames, frame_dim), name="frames_input")
    frames_flattened = Flatten()(frames_input)  # Aplatir les frames (60x128 -> 7680)
    frames_dense = Dense(256, activation="relu")(frames_flattened)

    # Fusionner les deux entrées
    merged = Concatenate()([audio_dense, frames_dense])  # Fusionner (64 + 256 = 320)

    # Passages dans des couches MLP pour obtenir le vecteur utilisateur
    x = Dense(256, activation="relu")(merged)
    x = Dense(128, activation="relu")(x)
    user_vector = Dense(output_dim, activation="relu", name="user_vector")(x)

    # Modèle
    model = Model(inputs=[audio_input, frames_input], outputs=user_vector, name="MLPEncoder")
    return model

In [569]:
def Caracterizer(audio_dim=128, frame_dim=128, num_frames=60, user_dim=128):
    # Entrées
    # audio input shape: (None, 173, 40)
    audio_input = Input(shape=(audio_dim,), name="audio_input")
    # frames input shape: (60, 224, 224, 3)
    frames_input = Input(shape=(num_frames, 224, 224, 3), name="frames_input")

    # définir les encodeurs
    audio_encoder = AudioEncoder((audio_dim,))
    video_encoder = VideoEncoder(output_dim=frame_dim)
    mlp_encoder = MLPEncoder(audio_dim=audio_dim, frame_dim=frame_dim, num_frames=num_frames, output_dim=user_dim)

    # Créer les encodeurs
    audio_encoded = audio_encoder(audio_input)
    print('audio_encoded shape:', audio_encoded.shape)
    # frames_input shape: (60, 224, 224, 3)
    frames_encoded_list = []
    for i in range(num_frames):
        frame = frames_input[:, i, :, :, :]
        frame_encoded = video_encoder(frame)
        frames_encoded_list.append(frame_encoded)
    # Utilisez une couche Lambda pour empiler les tensors le long de la dimension des frames
    frames_encoded = Lambda(lambda x: tf.stack(x, axis=1), name="frames_concatenation")(frames_encoded_list)
    # ajouter une dimension batch au début afin d'avoir la forme (None, 60, 128)
    frames_encoded = Reshape((num_frames, frame_dim))(frames_encoded)
    
    # frames_encoded shape: (None, 60, 128)
    print('frames_encoded shape:', frames_encoded.shape)

    # frames_encoded = video_encoder(frames_input)
    print('frames_encoded shape:', frames_encoded.shape)
    # MLP Encoder input shape: None, 60, 128
    user_vector = mlp_encoder([audio_encoded, frames_encoded])

    # Modèle final
    model = Model(inputs=[audio_input, frames_input], outputs=user_vector)
    return model


In [570]:
# MLP Decoder
def MLPDecoder(input_dim=128, num_users=10):
    """
    Crée un modèle MLP Decoder prenant en entrée un vecteur utilisateur (user_vector)
    et renvoyant la probabilité d'appartenance à un utilisateur via une couche softmax.
    
    Arguments :
    - input_dim : Dimension du vecteur utilisateur en entrée.
    - num_users : Nombre maximal d'utilisateurs (#usersmax).
    
    Retour :
    - Modèle Keras du MLP Decoder.
    """
    # Entrée : vecteur utilisateur
    user_vector_input = Input(shape=(input_dim,), name="user_vector_input")
    
    # Couches Dense avec activations ReLU
    x = Dense(128, activation="relu")(user_vector_input)
    x = Dense(64, activation="relu")(x)
    x = Dense(32, activation="relu")(x)
    x = Dense(num_users, activation="relu")(x)  # Dernière couche dense
    
    # Couche Softmax pour obtenir les probabilités
    output = Softmax(name="user_probabilities")(x)
    
    # Modèle
    model = Model(inputs=user_vector_input, outputs=output, name="MLP_Decoder")
    return model


In [571]:
# 3. Exemple d'utilisation pour le Characterizer (l'audio + la vidéo + le MLP Encoder)
audio_path = 'C:/Users/yass_/OneDrive - HESSO/MASTER/MachLeData/MachLeData_Project/model/preprocessed_data/Yann_Zurbrugg/clip0/audio.wav'  # Remplacez par le chemin de votre fichier audio
# Chemins des frames
frame_paths = [f"C:/Users/yass_/OneDrive - HESSO/MASTER/MachLeData/MachLeData_Project/model/preprocessed_data/Yann_Zurbrugg/clip0/frame{i}.jpg" for i in range(60)]  # Remplacez par vos chemins de frames

# Prétraitement
audio = preprocess_audio(audio_path)
frames = preprocess_frames(frame_paths)
print("Audio input shape:", audio.shape)
print("Frames input shape:", frames.shape)

# Création du modèle
caracterizer = Caracterizer()

# Résumé du modèle
caracterizer.summary()

# Prédiction
user_vector = caracterizer.predict([audio, frames])
print("User vector shape:", user_vector.shape)

Audio input shape: (173, 40)
Frames input shape: (60, 224, 224, 3)
audio_encoded shape: (None, 128)
frames_encoded shape: (None, 60, 128)
frames_encoded shape: (None, 60, 128)


ValueError: Data cardinality is ambiguous. Make sure all arrays contain the same number of samples.'x' sizes: 173, 60


In [572]:
# Execute the video processor to extract the frames and audio
# Define data path
video_path = 'data/Yann_Zurbrugg/1.mp4'
# Create the video processor
video_processor = VideoProcessor(video_path)
# Split the video into frames and audio
video_processor.split_video()
video_processor.split_audio()

Moviepy - Writing frames preprocessed_data/Yann_Zurbrugg/clip0/frame%0d.jpg.
Epoch 0:   0%|          | 0/3 [3:34:30<?, ?it/s]



Moviepy - Done writing frames preprocessed_data/Yann_Zurbrugg/clip0/frame%0d.jpg.
Epoch 0:   0%|          | 0/3 [3:34:31<?, ?it/s]Video split into 2 second clips and saved in preprocessed_data/Yann_Zurbrugg
MoviePy - Writing audio in preprocessed_data/Yann_Zurbrugg/clip0/audio.wav
Epoch 0:   0%|          | 0/3 [3:34:31<?, ?it/s]



MoviePy - Done.                                 
Epoch 0:   0%|          | 0/3 [3:34:31<?, ?it/s]Audio split into 2 second clips and saved in preprocessed_data/Yann_Zurbrugg


In [573]:
# 3. Exemple d'utilisation pour le Characterizer (l'audio + la vidéo + le MLP Encoder)
audio_path = 'C:/Users/yass_/OneDrive - HESSO/MASTER/MachLeData/MachLeData_Project/model/preprocessed_data/Yann_Zurbrugg/clip0/audio.wav'  # Remplacez par le chemin de votre fichier audio
# Chemins des frames
frame_paths = [f"C:/Users/yass_/OneDrive - HESSO/MASTER/MachLeData/MachLeData_Project/model/preprocessed_data/Yann_Zurbrugg/clip0/frame{i}.jpg" for i in range(60)]  # Remplacez par vos chemins de frames

# Prétraiter le fichier audio
mfccs = preprocess_audio(audio_path)
# Prétraiter les frames
frames = preprocess_frames(frame_paths)

# Créer le modèle
input_shape = (mfccs.shape[0], mfccs.shape[1])
audio_encoder = AudioEncoder(input_shape)
video_encoder = VideoEncoder(output_dim=128)
mlp_encoder = MLPEncoder(audio_dim=128, frame_dim=128, num_frames=60, output_dim=128)

# Résumer les modèles
# audio_encoder.summary()
# video_encoder.summary()
# mlp_encoder.summary()

# Passer les données dans le modèle
mfccs = np.expand_dims(mfccs, axis=0)  # Ajouter une dimension batch
audio_vector = audio_encoder.predict(mfccs)
frames_vectors = video_encoder.predict(frames)
# Ajouter une dimension batch pour les frames, cela est important pour que le MLP traite les frames comme une séquence et non pas comme 60 images indépendantes
frames_vectors = np.expand_dims(frames_vectors, axis=0)
user_vector = mlp_encoder.predict([audio_vector, frames_vectors])

print("Vecteur audio encodé:", audio_vector)
print("Forme des vecteurs vidéo encodés :", frames_vectors.shape)  # (60, 128)
print("Vecteur utilisateur :", user_vector)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 458ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 8s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 100ms/step
Vecteur audio encodé: [[0.08698702 0.         0.         0.17423376 0.         0.14290261
  0.         0.         0.27892783 0.01799424 0.1806303  0.
  0.02438486 0.         0.         0.         0.01730265 0.16722848
  0.24810827 0.         0.11134604 0.11064702 0.05901519 0.
  0.         0.         0.07044807 0.         0.12469877 0.
  0.         0.         0.1874195  0.04758587 0.26992327 0.
  0.09297589 0.0032363  0.         0.         0.14817253 0.10058922
  0.         0.01922811 0.         0.         0.07943948 0.0946884
  0.09998079 0.         0.35954946 0.02706683 0.0971486  0.
  0.         0.         0.10614376 0.         0.         0.09887897
  0.0193208  0.06367833 0.         0.09263084 0.         0.
  0.         0.         0.30480975 0.3054374  0.5729572  0.145

In [574]:
# Exemple d'utilisation pour le MLP Decoder
num_users = 5  # Nombre maximal d'utilisateurs
decoder = MLPDecoder(input_dim=128, num_users=num_users)

# Résumé du modèle
decoder.summary()

# Prédire les probabilités pour chaque utilisateur
user_probabilities = decoder.predict(user_vector)
print("Probabilités utilisateur :", user_probabilities)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 82ms/step
Probabilités utilisateur : [[0.25819477 0.21578553 0.18498316 0.16931407 0.17172246]]
