In [313]:
import numpy as np
import pandas as pd
import tensorflow as tf
import glob
import os
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from sklearn.utils import compute_class_weight
from collections import Counter

Get the input and label files from CSVs

In [339]:
def filter_sequences(x_sequences, y_sequences, model):
    filtered_x_sequences = []
    filtered_y_sequences = []
    if model == 'Model I':
        for x_seq, y_seq in zip(x_sequences, y_sequences):
                if np.any(y_seq[:, 1:] != 0) or (np.sum(y_seq[:, 0] != 0) < 2 and np.any(y_seq[:, 0] != 0)):
                    filtered_x_sequences.append(x_seq)
                    filtered_y_sequences.append(y_seq)
    else:
        for x_seq, y_seq in zip(x_sequences, y_sequences):
                if np.any(y_seq[:, 1:] != 0):
                    filtered_x_sequences.append(x_seq)
                    filtered_y_sequences.append(y_seq)
            
    return filtered_x_sequences,filtered_y_sequences
    

In [340]:
def get_train_test_splitted_data(label_files, input_files, global_mean, global_std, test_size=0.2, random_state=42):
    # Initialize lists to hold all sequences
    all_x_sequences = []
    all_y_sequences = []
    # Process each pair of input and label files
    for input_file, label_file in zip(input_files, label_files):
        # Load data
        input_df = pd.read_csv(input_file)
        if label_file.endswith('BORIS_method_II.csv'):
            model = 'Model II'
            label_df = pd.read_csv(label_file)
            labels = label_df.values / 100
        else:
            model = 'Model I'
            label_df = pd.read_csv(label_file, dtype=str, na_values=[])   
            column_names = ['Happy', 'Sad', 'Scared', 'Disgusted', 'Surprised', 'Angry']

            # Create a OneHotEncoder with predefined categories
            encoder = OneHotEncoder(categories=[column_names], handle_unknown='ignore')

            # Fit and transform the label data
            labels = pd.DataFrame(
                encoder.fit_transform(label_df).toarray(),
                columns=encoder.get_feature_names_out()
            )
        # Prepare features and labels
        features = (input_df.values - global_mean) / global_std
        
        # Ensure alignment of frames
        if features.shape[0] != labels.shape[0]:
            print(f"Mismatch in frames: {input_file}, {label_file}")
            continue
            
        # Sample sequences
        x_sequences, y_sequences = create_sequences(features, labels, SEQUENCE_LENGTH, STRIDE)
        filtered_x_sequences, filtered_y_sequences = filter_sequences(x_sequences, y_sequences, model)
        
        # Append to global lists
        if filtered_x_sequences and filtered_y_sequences:
            all_x_sequences.append(filtered_x_sequences)
            all_y_sequences.append(filtered_y_sequences)
            
    all_x_sequences = np.concatenate(all_x_sequences, axis=0)
    all_y_sequences = np.concatenate(all_y_sequences, axis=0)
    # Split into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        all_x_sequences, all_y_sequences, test_size=test_size, random_state=random_state
    )

    # Convert to TensorFlow datasets
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
    test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test))

    # Shuffle, batch, and prefetch
    train_dataset = train_dataset.shuffle(buffer_size=10000).batch(BATCH_SIZE).prefetch(tf.data.experimental.AUTOTUNE)
    test_dataset = test_dataset.batch(BATCH_SIZE).prefetch(tf.data.experimental.AUTOTUNE)

    return train_dataset, test_dataset

In [221]:
def count_method_I(all_y_sequences, emotion_labels):
    label_counts = Counter()
    
    # Loop through each y_sequence in all_y_sequences
    for y_sequence in all_y_sequences:
        # Loop through each label_vector (timestamp) in the sequence
        for label_vector in y_sequence:
            # Check if all values are zero (i.e., no emotion)
            if np.all(label_vector == 0):
                # Count as 'Neutral' if all values are zero
                label_counts['Neutral'] += 1
            else:
                # Find the label corresponding to the max value (one-hot or normalized)
                label_index = np.argmax(label_vector)
                label_counts[emotion_labels[label_index]] += 1
    
    # Compute the total number of labels
    total_labels = sum(label_counts.values())
    
    # Convert the Counter to a DataFrame
    df = pd.DataFrame.from_dict(label_counts, orient='index', columns=['Count'])
    
    # Add a new column for percentage
    df['Percentage'] = (df['Count'] / total_labels * 100).round(2).astype(str) + '%'  # Format as percentage
    
    # Reset the index to have labels as a column
    df = df.reset_index().rename(columns={'index': 'Label'})
    
    return df

In [308]:
def count_method_II(all_y_sequences):
    
    combined_df = pd.DataFrame(all_y_sequences)

    emotion_labels = ['Happy', 'Sad', 'Disgusted', 'Surprised', 'Angry', 'Scared']
    
    # Calculate the number of rows where each emotion is non-zero
    non_zero_counts = (combined_df != 0).sum()  # Counts of non-zero values per column
    total_rows = len(combined_df)  # Total number of rows
    print(total_rows)
    # Calculate the "Neutral" count (rows where all values are zero)
    neutral_count = (combined_df.sum(axis=1) == 0).sum()
    
    # Add the "Neutral" category to the counts
    non_zero_counts["Neutral"] = neutral_count
    
    result_df = pd.DataFrame(non_zero_counts.items(), columns=['Label', 'Count'])
    # Calculate the percentage for each label
    result_df['Percentage'] = ((result_df['Count'] / total_rows) * 100).round(2).astype(str) + '%'

        # Create a mapping from numbers to emotion labels
    label_mapping = {i: emotion_labels[i] for i in range(len(emotion_labels))}
    
    # Replace the numerical labels with the corresponding emotion labels
    result_df['Label'] = result_df['Label'].replace(label_mapping)
    
    # Display the result
    return(result_df)

In [12]:
# Constants
SEQUENCE_LENGTH = 20
STRIDE = 10
BATCH_SIZE = 32
INPUT_DIM = 515  # Number of features per frame (e.g., biosignals + embeddings)
OUTPUT_DIM = 6 

In [314]:
# Helper function to create random sequences
def create_sequences(features, labels, sequence_length, stride):
    x_sequences, y_sequences = [], []
    for i in range(0, len(features) - sequence_length + 1, stride):
        x_sequences.append(features[i:i + sequence_length])
        y_sequences.append(labels[i:i + sequence_length])
    return np.array(x_sequences), np.array(y_sequences)

# Initialize lists to hold all sequences
all_x_sequences = []
all_y_sequences = []
all_features = []

sources = ["GUT", "ITU-YU", "MAAP"]
base_path = "//153.19.52.107/emboa/IO3-sessions/NEW STRUCTURE/de-earlyfusionthesis/"
input_files, label_files = [], []

for source in sources:
    input_files.extend(glob.glob(os.path.join(base_path, source, '*_input.csv')))
    label_files.extend(glob.glob(os.path.join(base_path, source, '*_BORIS.csv')))

input_files.sort()
label_files.sort()

i = 0
for input_file in input_files:
    input_df = pd.read_csv(input_file)
    all_features.append(input_df.values)

# Concatenate all features from all files to compute global mean and std
all_features = np.concatenate(all_features, axis=0)

num_columns = all_features.shape[1]
global_mean_zero = np.zeros(num_columns)
global_std_zero = np.zeros(num_columns)

# Compute column-wise statistics ignoring zeros
for col in range(num_columns):
    non_zero_col = all_features[:, col][all_features[:, col] != 0]  # Filter non-zero values
    if non_zero_col.size > 0:  # Avoid empty arrays
        global_mean_zero[col] = np.mean(non_zero_col)
        global_std_zero[col] = np.std(non_zero_col, ddof=1)  # Use sample std dev
    else:
        global_mean_zero[col] = 0  # Default if no non-zero elements
        global_std_zero[col] = 0

# Output results as ndarray
global_mean = np.array(global_mean_zero)
global_std = np.array(global_std_zero)


In [341]:
GUT_path_input = sorted(glob.glob('//153.19.52.107/emboa/IO3-sessions/NEW STRUCTURE/de-earlyfusionthesis/GUT/*_input.csv'))
ITU_YU_path_input = sorted(glob.glob('//153.19.52.107/emboa/IO3-sessions/NEW STRUCTURE/de-earlyfusionthesis/ITU-YU/*_input.csv'))
MAAP_path_input = sorted(glob.glob('//153.19.52.107/emboa/IO3-sessions/NEW STRUCTURE/de-earlyfusionthesis/MAAP/*_input.csv'))

GUT_path_label_method_I = sorted(glob.glob('//153.19.52.107/emboa/IO3-sessions/NEW STRUCTURE/de-earlyfusionthesis/GUT/*_BORIS_method_I.csv'))
ITU_YU_path_label_method_I = sorted(glob.glob('//153.19.52.107/emboa/IO3-sessions/NEW STRUCTURE/de-earlyfusionthesis/ITU-YU/*_BORIS_method_I.csv'))
MAAP_path_label_method_I = sorted(glob.glob('//153.19.52.107/emboa/IO3-sessions/NEW STRUCTURE/de-earlyfusionthesis/MAAP/*_BORIS_method_I.csv'))

GUT_path_label_method_II = sorted(glob.glob('//153.19.52.107/emboa/IO3-sessions/NEW STRUCTURE/de-earlyfusionthesis/GUT/*_BORIS_method_II.csv'))
ITU_YU_path_label_method_II = sorted(glob.glob('//153.19.52.107/emboa/IO3-sessions/NEW STRUCTURE/de-earlyfusionthesis/ITU-YU/*_BORIS_method_II.csv'))
MAAP_path_label_method_II = sorted(glob.glob('//153.19.52.107/emboa/IO3-sessions/NEW STRUCTURE/de-earlyfusionthesis/MAAP/*_BORIS_method_II.csv'))

GUT_train_method_I, GUT_test_method_I = get_train_test_splitted_data(GUT_path_label_method_I, GUT_path_input, global_mean, global_std, test_size=0.3)
ITU_YU_train_method_I, ITU_YU_test_method_I = get_train_test_splitted_data(ITU_YU_path_label_method_I, ITU_YU_path_input, global_mean, global_std, test_size=0.3)
MAAP_train_method_I, MAAP_test_method_I = get_train_test_splitted_data(MAAP_path_label_method_I, MAAP_path_input, global_mean, global_std, test_size=0.3)

GUT_train_method_II, GUT_test_method_II = get_train_test_splitted_data(GUT_path_label_method_II, GUT_path_input, global_mean, global_std, test_size=0.3)
ITU_YU_train_method_II, ITU_YU_test_method_II = get_train_test_splitted_data(ITU_YU_path_label_method_II, ITU_YU_path_input, global_mean, global_std, test_size=0.3)
MAAP_train_method_II, MAAP_test_method_II = get_train_test_splitted_data(MAAP_path_label_method_II, MAAP_path_input, global_mean, global_std, test_size=0.3)

dataset_method_I = GUT_train_method_I.concatenate(ITU_YU_train_method_I).concatenate(MAAP_train_method_I)
dataset_method_II = GUT_train_method_II.concatenate(ITU_YU_train_method_II).concatenate(MAAP_train_method_II)

In [16]:
import pandas as pd
import numpy as np

all_features = []

for input_file in input_files:
    input_df = pd.read_csv(input_file)
    
    # Check if the DataFrame contains any NaN
    if input_df.isnull().values.any():
        print(f"NaN found in file: {input_file}")
    
    all_features.append(input_df.values)

In [13]:
# Zakomentowane bo się cykam
# path = r'S:\IO3-sessions\NEW STRUCTURE\de-earlyfusionthesis\Datasets'
# 
# tf.data.Dataset.save(dataset_method_I ,os.path.join(path, 'train_dataset_method_I'))
# tf.data.Dataset.save(GUT_train_method_I, os.path.join(path, 'GUT_train_method_I'))
# tf.data.Dataset.save(GUT_test_method_I, os.path.join(path, 'GUT_test_method_I'))
# tf.data.Dataset.save(ITU_YU_train_method_I, os.path.join(path, 'ITU_YU_train_method_I'))
# tf.data.Dataset.save(ITU_YU_test_method_I, os.path.join(path, 'ITU_YU_test_method_I'))
# tf.data.Dataset.save(MAAP_train_method_I, os.path.join(path, 'MAAP_train_method_I'))
# tf.data.Dataset.save(MAAP_test_method_I, os.path.join(path, 'MAAP_test_method_I'))
# 
# tf.data.Dataset.save(dataset_method_II ,os.path.join(path, 'train_dataset_method_II'))
# tf.data.Dataset.save(GUT_train_method_II, os.path.join(path, 'GUT_train_method_II'))
# tf.data.Dataset.save(GUT_test_method_II, os.path.join(path, 'GUT_test_method_II'))
# tf.data.Dataset.save(ITU_YU_train_method_II, os.path.join(path, 'ITU_YU_train_method_II'))
# tf.data.Dataset.save(ITU_YU_test_method_II, os.path.join(path, 'ITU_YU_test_method_II'))
# tf.data.Dataset.save(MAAP_train_method_II, os.path.join(path, 'MAAP_train_method_II'))
# tf.data.Dataset.save(MAAP_test_method_II, os.path.join(path, 'MAAP_test_method_II'))

In [342]:
path = r'S:\IO3-sessions\NEW STRUCTURE\de-earlyfusionthesis\Datasets'

tf.data.Dataset.save(dataset_method_I ,os.path.join(path, 'train_dataset_method_I_balanced'))
tf.data.Dataset.save(GUT_train_method_I, os.path.join(path, 'GUT_train_method_I_balanced'))
tf.data.Dataset.save(GUT_test_method_I, os.path.join(path, 'GUT_test_method_I_balanced'))
tf.data.Dataset.save(ITU_YU_train_method_I, os.path.join(path, 'ITU_YU_train_method_I_balanced'))
tf.data.Dataset.save(ITU_YU_test_method_I, os.path.join(path, 'ITU_YU_test_method_I_balanced'))
tf.data.Dataset.save(MAAP_train_method_I, os.path.join(path, 'MAAP_train_method_I_balanced'))
tf.data.Dataset.save(MAAP_test_method_I, os.path.join(path, 'MAAP_test_method_I_balanced'))

tf.data.Dataset.save(dataset_method_II ,os.path.join(path, 'train_dataset_method_II_balanced'))
tf.data.Dataset.save(GUT_train_method_II, os.path.join(path, 'GUT_train_method_II_balanced'))
tf.data.Dataset.save(GUT_test_method_II, os.path.join(path, 'GUT_test_method_II_balanced'))
tf.data.Dataset.save(ITU_YU_train_method_II, os.path.join(path, 'ITU_YU_train_method_II_balanced'))
tf.data.Dataset.save(ITU_YU_test_method_II, os.path.join(path, 'ITU_YU_test_method_II_balanced'))
tf.data.Dataset.save(MAAP_train_method_II, os.path.join(path, 'MAAP_train_method_II_balanced'))
tf.data.Dataset.save(MAAP_test_method_II, os.path.join(path, 'MAAP_test_method_II_balanced'))

In [343]:
path = r'S:\IO3-sessions\NEW STRUCTURE\de-earlyfusionthesis\Models'

## MODEL I

In [344]:
model_I = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(SEQUENCE_LENGTH, INPUT_DIM)),    # Input shape: (sequence_length, features)
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64, return_sequences=True)),  # LSTM layer to capture temporal patterns
    tf.keras.layers.Dense(32, activation='relu'),                 # Dense layer to reduce dimensionality
    tf.keras.layers.Dense(OUTPUT_DIM, activation='softmax')       # Output layer with sigmoid for continuous values between 0 and 1
])

In [15]:
def custom_masking(inputs):
    # Create a mask where any feature being 0 masks the entire time step
    mask = tf.reduce_all(tf.not_equal(inputs, 0.0), axis=-1)  # True if no feature is 0
    return tf.where(mask[:, :, tf.newaxis], inputs, tf.zeros_like(inputs))

# Model with custom masking
model_I = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(SEQUENCE_LENGTH, INPUT_DIM)),   # Input shape
    tf.keras.layers.Lambda(custom_masking),                      # Custom masking layer
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64, return_sequences=True)),  # Bidirectional LSTM
    tf.keras.layers.Dense(32, activation='relu'),                # Dense layer to reduce dimensionality
    tf.keras.layers.Dense(OUTPUT_DIM, activation='softmax')      # Output layer with softmax for class probabilities
])

In [345]:
model_I.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [118]:
# Iterate through the dataset to extract labels
labels = []

for inputs, label in dataset_method_I:
    labels.append(label)

# Convert to a tensor or numpy array
y_train = np.concatenate(labels, axis=0)  # Stack labels into a single array
y_train_flat = y_train.reshape(-1, 6)  # Flattening the labels: (num_samples * 10, 6)
y_train_classes = np.argmax(y_train_flat, axis=-1)  # Get the class labels for each timestep (0 to 5)

# Calculate class weights based on the frequency of each class
class_weights = compute_class_weight('balanced', classes=np.unique(y_train_classes), y=y_train_classes)

# Convert class_weights into a dictionary format for fit() function
class_weight_dict = {i: class_weights[i] for i in range(len(class_weights))}

In [119]:
print(class_weights)

[1.69060938e-01 2.54473304e+01 2.90288066e+01 5.34393939e+02
 2.02701149e+02 2.26089744e+02]


In [120]:
print(class_weight_dict)

{0: 0.1690609376722605, 1: 25.447330447330447, 2: 29.02880658436214, 3: 534.3939393939394, 4: 202.70114942528735, 5: 226.0897435897436}


In [346]:
history = model_I.fit(dataset_method_I, epochs=50, batch_size=BATCH_SIZE)

Epoch 1/50
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 19ms/step - accuracy: 0.5008 - loss: 0.3837
Epoch 2/50
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step - accuracy: 0.1967 - loss: 0.3447
Epoch 3/50
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 14ms/step - accuracy: 0.1297 - loss: 0.3384
Epoch 4/50
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step - accuracy: 0.1577 - loss: 0.3030
Epoch 5/50
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step - accuracy: 0.1169 - loss: 0.3111
Epoch 6/50
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step - accuracy: 0.1272 - loss: 0.3336
Epoch 7/50
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step - accuracy: 0.0789 - loss: 0.3335
Epoch 8/50
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step - accuracy: 0.0410 - loss: 0.3431
Epoch 9/50
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━

In [347]:
model_I.summary()

In [348]:
# Evaluate the model
loss, acc = model_I.evaluate(MAAP_test_method_I, verbose=2)
print("Untrained model, accuracy: {:5.2f}%".format(acc))

3/3 - 1s - 468ms/step - accuracy: 0.0736 - loss: 0.4225
Untrained model, accuracy:  0.07%


In [349]:
model_I.save(os.path.join(path, 'model_method_I/model.keras'))

In [115]:
for inputs, labels in dataset_method_I.take(1):  # Take a small batch
    preds = model_I.predict(inputs)
    print("Predictions:", preds)
    print("Ground Truth:", labels)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
Predictions: [[[0.18423472 0.18805367 0.15358369 0.15975992 0.14348555 0.17088243]
  [0.18988968 0.13390423 0.15667434 0.17703208 0.17288329 0.16961627]
  [0.15876924 0.12157965 0.1510373  0.21069701 0.19562545 0.16229136]
  ...
  [0.2360522  0.11189648 0.14936432 0.21476454 0.17604093 0.11188145]
  [0.19055042 0.13608056 0.1481003  0.2175018  0.17827131 0.1294956 ]
  [0.1851553  0.13742118 0.17442626 0.21284981 0.16557454 0.1245729 ]]

 [[0.12943289 0.15690541 0.24211912 0.20530048 0.16510516 0.10113697]
  [0.1359072  0.12828828 0.29404885 0.22961743 0.13318458 0.07895364]
  [0.15133195 0.1261013  0.30664647 0.21946089 0.11806839 0.07839099]
  ...
  [0.16341293 0.13049836 0.26789552 0.14440988 0.19457927 0.099204  ]
  [0.16080208 0.12511742 0.27074748 0.20283565 0.16643985 0.07405749]
  [0.2062359  0.11890065 0.2678376  0.18631993 0.14456865 0.07613719]]

 [[0.16589867 0.12919189 0.14554487 0.19906464 0.21806522 0.1

## MODEL II

In [333]:
model_II = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(SEQUENCE_LENGTH, INPUT_DIM)),    # Input shape: (sequence_length, features)
    tf.keras.layers.LSTM(64, return_sequences=True),              # LSTM layer to capture temporal patterns
    tf.keras.layers.Dense(32, activation='relu'),                 # Dense layer to reduce dimensionality
    tf.keras.layers.Dense(OUTPUT_DIM, activation='sigmoid')       # Output layer with sigmoid for continuous values between 0 and 1
])

In [None]:
def mask_timestep(inputs):
    # Mask timesteps where any feature is 0
    # tf.reduce_any checks if any feature in the timestep is 0
    mask = tf.reduce_any(tf.equal(inputs, 0.0), axis=-1)  # Create mask for timesteps with any 0
    return mask

# Input Layer
inputs = tf.keras.Input(shape=(SEQUENCE_LENGTH, INPUT_DIM))

# Apply the custom masking logic
mask = tf.keras.layers.Lambda(mask_timestep)(inputs)

# Apply the mask to the input sequence (this will zero out timesteps that have missing values)
masked_inputs = tf.keras.layers.Masking(mask_value=0.0)(inputs)

# LSTM layers to process the masked inputs
lstm_output = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64, return_sequences=True))(masked_inputs)

# Dense layers
dense_output = tf.keras.layers.Dense(32, activation='relu')(lstm_output)
output = tf.keras.layers.Dense(OUTPUT_DIM, activation='sigmoid')(dense_output)

# Build and compile the model
model_I = tf.keras.Model(inputs=inputs, outputs=output)

In [334]:
model_II.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])

In [335]:
history = model_II.fit(dataset_method_II, epochs=50)

Epoch 1/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 40ms/step - loss: 0.1417 - mae: 0.3394
Epoch 2/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 21ms/step - loss: 0.0313 - mae: 0.1330
Epoch 3/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 21ms/step - loss: 0.0199 - mae: 0.0854
Epoch 4/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 24ms/step - loss: 0.0186 - mae: 0.0752
Epoch 5/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 42ms/step - loss: 0.0172 - mae: 0.0691
Epoch 6/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 39ms/step - loss: 0.0170 - mae: 0.0683
Epoch 7/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 25ms/step - loss: 0.0160 - mae: 0.0657
Epoch 8/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 24ms/step - loss: 0.0162 - mae: 0.0648
Epoch 9/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 20ms/step - l

In [336]:
model_II.summary()

In [337]:
# Evaluate the model
loss, acc = model_II.evaluate(MAAP_test_method_II, verbose=2)
print("Untrained model, coherence: {:5.2f}%".format(100 * (1-acc)))

9/9 - 1s - 103ms/step - loss: 0.0179 - mae: 0.0647
Untrained model, coherence: 93.53%


In [338]:
model_II.save(os.path.join(path, 'model_method_II/model.keras'))

## Transformer

In [87]:
from tensorflow.keras import layers, Model
from tensorflow.keras.layers import Input, Dense, LayerNormalization, MultiHeadAttention, Dropout

In [102]:
def get_positional_encoding(sequence_length, input_dim):
    """
    Generate a positional encoding matrix for the transformer model.
    This encoding is added to the input data to give the model an understanding of the order of the sequence.
    """
    position = np.arange(sequence_length)[:, np.newaxis]  # Shape: (sequence_length, 1)
    div_term = np.exp(np.arange(0, input_dim, 2) * -(np.log(10000.0) / input_dim))  # Shape: (input_dim // 2,)
    
    pos_enc = np.zeros((sequence_length, input_dim))  # Shape: (sequence_length, input_dim)
    
    # Apply sine to even indices (0, 2, 4, ...)
    pos_enc[:, 0::2] = np.sin(position * div_term)  # Apply sine to even indices (0, 2, 4, ...)
    pos_enc[:, 1::2] = np.cos(position * div_term)
    # Apply cosine to odd indices (1, 3, 5, ...)
    if input_dim % 2 != 0:  # Even input_dim
        pos_enc[:, -1] = np.sin(position * div_term[-1])  # If odd, handle the last dimension separately

    return tf.constant(pos_enc, dtype=tf.float32)



In [103]:
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout_rate):
    # Multi-Head Self Attention Layer
    attention_output = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=head_size, dropout=dropout_rate)(inputs, inputs)
    
    # Skip connection and normalization
    x = tf.keras.layers.Add()([inputs, attention_output])
    x = tf.keras.layers.LayerNormalization(epsilon=1e-6)(x)
    
    # Feed-forward network
    ffn_output = tf.keras.layers.Dense(ff_dim, activation='relu')(x)
    ffn_output = tf.keras.layers.Dense(inputs.shape[-1])(ffn_output)
    
    # Skip connection and normalization
    x = tf.keras.layers.Add()([x, ffn_output])
    x = tf.keras.layers.LayerNormalization(epsilon=1e-6)(x)
    
    return x

In [104]:
def build_transformer_model(sequence_length, input_dim, output_dim, num_heads=8, ff_dim=128, num_layers=4, dropout_rate=0.1):
    inputs = tf.keras.Input(shape=(sequence_length, input_dim))  # Input shape
    
    # Add positional encoding to inputs
    pos_encoding = get_positional_encoding(sequence_length, input_dim)
    x = tf.keras.layers.Add()([inputs, pos_encoding])  # Adding positional encoding to input
    
    # Pass through multiple Transformer encoder blocks
    for _ in range(num_layers):
        x = transformer_encoder(x, head_size=input_dim, num_heads=num_heads, ff_dim=ff_dim, dropout_rate=dropout_rate)
    
    # Global Average Pooling
    x = tf.keras.layers.GlobalAveragePooling1D()(x)
    
    # Dense layers
    x = tf.keras.layers.Dense(64, activation='relu')(x)
    x = tf.keras.layers.Dropout(0.5)(x)
    output = tf.keras.layers.Dense(output_dim, activation='sigmoid')(x)  # Output layer with 6 emotions
    
    # Build the model
    model = tf.keras.Model(inputs=inputs, outputs=output)
    return model

In [105]:
# Dataset dimensions (example values)
sequence_length = 10  # Length of each sequence (e.g., 10 frames per video)
input_dim = 515  # Number of features per timestep (e.g., face embeddings with 515 features)
output_dim = 6  # Number of classes (emotions)

# Build the model
model = build_transformer_model(sequence_length, input_dim, output_dim)

ValueError: could not broadcast input array from shape (10,258) into shape (10,257)

In [None]:
# Summary of the model
model.summary()

# Assuming you have `X_train` and `y_train` loaded from your dataset
# Model training
history = model.fit(dataset_method_I,epochs=50, batch_size=32)