In [None]:
# Colab-only setup: mount Google Drive at /content/drive so that the archives
# and annotation CSVs referenced below under /content/drive/MyDrive/Data can
# be read (and the final predictions written back).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install keras-tuner
!pip install joblib
!pip install scikeras

In [None]:
import zipfile
import os

# Each archive on Drive is listed next to the local folder it is extracted to,
# so the two parallel lists derived below cannot drift out of step.
_ARCHIVE_TARGETS = (
    ('/content/drive/MyDrive/Data/Facial_key_point_data_second.zip', '/content/training_data'),
    ('/content/drive/MyDrive/Data/val_data.zip', '/content/val_data'),
    ('/content/drive/MyDrive/Data/test_data.zip', '/content/testing_data'),
)

# Compressed file paths
zip_file_paths = [archive for archive, _ in _ARCHIVE_TARGETS]

# Corresponding unzipped folder paths
extracted_folder_paths = [folder for _, folder in _ARCHIVE_TARGETS]

def unzip_file(zip_file, extract_to):
    """Extract every member of the archive `zip_file` into the directory `extract_to`."""
    archive = zipfile.ZipFile(zip_file, 'r')
    with archive:
        archive.extractall(path=extract_to)

# Unzip the files: archives and target folders are paired by position
for archive_path, target_dir in zip(zip_file_paths, extracted_folder_paths):
    unzip_file(archive_path, target_dir)
    print(f"Decompression completed for {archive_path}!")

print("All decompressions completed!")

In [None]:
import os
os.environ['PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION'] = 'python'

import json
import csv
import joblib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error

import tensorflow as tf
from tensorflow.keras.layers import (
    Input, Conv1D, MaxPooling1D, Flatten, Dense, Dropout, BatchNormalization,
    LSTM, GRU, Bidirectional, MultiHeadAttention, LayerNormalization, GlobalAveragePooling1D, RepeatVector
)
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, LearningRateScheduler

# Suppress warnings and debug messages
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Function to retrieve all file names in the specified folder
def get_file_names(folder_path):
    """Return the names (not full paths) of the regular files directly inside `folder_path`.

    Sub-directories are skipped. The order is whatever `os.listdir` yields.
    """
    return [
        name
        for name in os.listdir(folder_path)
        if os.path.isfile(os.path.join(folder_path, name))
    ]

# Function to retrieve file names without their extension
def get_file_names_without_extension(folder_path, extension=".json"):
    """Return the extension-less names of the entries in `folder_path` that end with `extension`.

    Only the final suffix is stripped (``'a.b.json'`` -> ``'a.b'``). Entries are
    matched purely by name, in `os.listdir` order.
    """
    stems = (
        os.path.splitext(entry)[0]
        for entry in os.listdir(folder_path)
        if entry.endswith(extension)
    )
    return list(stems)

# Load annotation data from CSV
def load_annotation_data(csv_path, file_names):
    """Load the five trait scores for the requested clips from an annotation CSV.

    Args:
        csv_path: Path to a CSV whose first column is a video file name and
            whose next five columns are the extraversion, neuroticism,
            agreeableness, conscientiousness and openness scores, in that order.
        file_names: Clip names without extension; each one is matched against
            the first CSV column as ``name + '.mp4'``.

    Returns:
        dict mapping ``'<name>.mp4'`` to a dict of float scores keyed by trait
        name, in the order of `file_names`. Names with no row in the CSV are
        omitted. Rows that are not requested (including any header row) are
        never parsed as numbers.
    """
    # newline='' is what the csv module expects so quoted fields parse correctly.
    with open(csv_path, 'r', newline='') as csvfile:
        # Index rows by video name once instead of rescanning the whole CSV for
        # every file name. Blank lines come back as empty rows and are skipped.
        # If a video appears twice, the later row wins.
        rows_by_video = {row[0]: row for row in csv.reader(csvfile) if row}

    annotation_dict = {}
    for name in file_names:
        row = rows_by_video.get(name + '.mp4')
        if row is None:
            continue
        annotation_dict[row[0]] = {
            'extraversion': float(row[1]),
            'neuroticism': float(row[2]),
            'agreeableness': float(row[3]),
            'conscientiousness': float(row[4]),
            'openness': float(row[5])
        }
    return annotation_dict

In [None]:
# Load facial landmarks data from JSON files
def load_facial_landmarks_data(folder_path, file_list):
    """Read per-clip landmark JSON files into one (frames, coordinates) matrix each.

    Each file is expected to hold a list of frames, every frame carrying a
    'landmarks' list of dicts with 'index', 'x', 'y' and 'z'. Landmarks are
    ordered by 'index' and flattened to x, y, z triples, one row per frame.

    Files that cannot be read or parsed are reported on stdout and skipped, as
    are files that produce an empty matrix.

    Returns:
        (matrices, kept_files): the NumPy matrices and, index-aligned with
        them, the names of the files they came from.
    """
    def _frame_to_row(frame):
        # Ensure landmarks are sorted by index before flattening
        ordered = sorted(frame['landmarks'], key=lambda point: point['index'])
        row = []
        for point in ordered:
            row.extend((point['x'], point['y'], point['z']))
        return row

    matrices = []
    kept_files = []  # files that actually contributed a matrix
    for file_name in file_list:
        try:
            with open(os.path.join(folder_path, file_name), 'r') as handle:
                frames = json.load(handle)
            matrix = np.array([_frame_to_row(frame) for frame in frames])
        except Exception as err:
            print(f"Error reading {file_name}: {err}")
            continue

        if matrix.size > 0:
            matrices.append(matrix)
            kept_files.append(file_name)
    return matrices, kept_files

In [None]:
# Function to preprocess the dataset
def preprocess_dataset(folder_path, csv_path, scaler=None, max_length=None):
    """Load one data split and turn it into padded, standardised arrays.

    Steps: list the landmark files, load their annotations and landmark
    matrices, plot the label distributions, standardise every frame, then
    zero-pad (or truncate) each clip to a common number of frames.

    Args:
        folder_path: Directory holding one landmark JSON file per clip.
        csv_path: Annotation CSV for the same split.
        scaler: An already fitted ``StandardScaler`` to reuse (validation and
            test splits). When ``None`` a new scaler is fitted once over all
            frames of this split, so the returned scaler carries statistics of
            the whole split rather than of whichever clip happened to be last.
        max_length: Number of frames every clip is padded or truncated to.
            When ``None`` the longest clip of this split is used. Pass the
            training value for the other splits to get identical shapes.

    Returns:
        Tuple ``(padded_data_array, labels_array, valid_file_list, scaler,
        max_length, num_features)``. Row ``i`` of both arrays belongs to
        ``valid_file_list[i]``.

    Raises:
        ValueError: If no clip has both landmark data and labels.
    """
    traits = ['extraversion', 'neuroticism', 'agreeableness', 'conscientiousness', 'openness']

    # Get the list of file names
    file_list = get_file_names(folder_path)
    print("Files found in the folder:", file_list)

    # Get the list of file names without extension
    file_names = get_file_names_without_extension(folder_path)

    # Load annotation data
    annotation_dict = load_annotation_data(csv_path, file_names)
    print("Annotation dictionary:", annotation_dict)

    # Load facial landmarks data
    data_list, valid_file_list = load_facial_landmarks_data(folder_path, file_list)
    print("Shape of the matrix:", len(data_list))

    # Keep only clips that have labels, filtering data, file names and labels
    # together so the three stay index-aligned instead of failing an assert.
    labelled = []
    for matrix, file_ in zip(data_list, valid_file_list):
        video_name = os.path.splitext(file_)[0] + '.mp4'
        labels = annotation_dict.get(video_name)
        if labels is None:
            print(f"Warning: No labels found for {video_name}")
            continue
        labelled.append((matrix, file_, labels))

    if not labelled:
        raise ValueError(f"No clips with both landmarks and labels found in {folder_path}")

    data_list = [matrix for matrix, _, _ in labelled]
    valid_file_list = [file_ for _, file_, _ in labelled]
    label_list = [labels for _, _, labels in labelled]

    # Now data_list contains the matrices and label_list contains the corresponding labels
    print("First label:", label_list[0])

    # Plot the distribution of each personality trait
    _plot_trait_distributions(pd.DataFrame(label_list), traits)

    # Common sequence length: caller-supplied, or the longest clip in this split
    if max_length is None:
        max_length = max(len(matrix) for matrix in data_list)

    # Every matrix kept by the loader is non-empty, so the first one defines the width
    num_features = data_list[0].shape[1]

    # Fit the scaler once over all frames of the split (incrementally, to avoid
    # stacking every clip into one huge array), then apply the same transform
    # to every clip. Re-fitting per clip would leave the returned scaler holding
    # only the last clip's statistics.
    if scaler is None:
        scaler = StandardScaler()
        for matrix in data_list:
            scaler.partial_fit(matrix)
    normalized_data_list = [scaler.transform(matrix) for matrix in data_list]

    # Bring every clip to exactly max_length frames
    padded_data_array = np.array(
        [_pad_or_truncate(matrix, max_length) for matrix in normalized_data_list]
    )
    labels_array = np.array(
        [[float(labels[trait]) for trait in traits] for labels in label_list]
    )

    return padded_data_array, labels_array, valid_file_list, scaler, max_length, num_features


def _plot_trait_distributions(labels_df, traits):
    """Show one histogram (with KDE) per trait on a 3x2 grid."""
    fig, axes = plt.subplots(3, 2, figsize=(15, 15))
    flat_axes = axes.ravel()
    for ax, trait in zip(flat_axes, traits):
        sns.histplot(labels_df[trait], bins=30, kde=True, ax=ax)
        ax.set_title(f'Distribution of {trait}')
        ax.set_xlabel('Score')
        ax.set_ylabel('Frequency')

    # Remove the subplots left over when there are fewer traits than axes
    for ax in flat_axes[len(traits):]:
        fig.delaxes(ax)

    plt.tight_layout()
    plt.show()


def _pad_or_truncate(matrix, target_length):
    """Return `matrix` with exactly `target_length` rows: zero-padded at the end, or cut."""
    n_frames = matrix.shape[0]
    if n_frames >= target_length:
        return matrix[:target_length]
    return np.pad(matrix, ((0, target_length - n_frames), (0, 0)), 'constant', constant_values=0)


In [None]:
# Preprocess training set
train_folder_path = '/content/training_data'
# NOTE(review): this file name contains a space before ".csv" - confirm it matches the file on Drive.
train_csv_path = '/content/drive/MyDrive/Data/annotation_training .csv'
X_train, y_train, train_file_list, scaler, max_length, num_features = preprocess_dataset(train_folder_path, train_csv_path)

# Preprocess validation set (reusing the scaler fitted on the training set)
val_folder_path = '/content/val_data'
val_csv_path = '/content/drive/MyDrive/Data/annotation_validation.csv'
X_val, y_val, val_file_list, _, _, _ = preprocess_dataset(val_folder_path, val_csv_path, scaler)

# Preprocess testing set (reusing the scaler fitted on the training set)
test_folder_path = '/content/testing_data'
test_csv_path = '/content/drive/MyDrive/Data/annotation_testing.csv'
X_test, y_test, test_file_list, _, _, _ = preprocess_dataset(test_folder_path, test_csv_path, scaler)


def _match_sequence_length(batch, target_length):
    """Zero-pad or truncate axis 1 (frames) of a 3-D array to `target_length`."""
    current_length = batch.shape[1]
    if current_length >= target_length:
        return batch[:, :target_length, :]
    return np.pad(batch, ((0, 0), (0, target_length - current_length), (0, 0)),
                  'constant', constant_values=0)


# Each split is padded to its own longest clip, but the models are built for
# the training sequence length, so bring validation and test to that length.
X_val = _match_sequence_length(X_val, max_length)
X_test = _match_sequence_length(X_test, max_length)
assert X_val.shape[1:] == X_train.shape[1:], "Validation shape does not match training shape"
assert X_test.shape[1:] == X_train.shape[1:], "Test shape does not match training shape"

# Convert data to TensorFlow tensors
X_train = tf.convert_to_tensor(X_train, dtype=tf.float32)
X_val = tf.convert_to_tensor(X_val, dtype=tf.float32)
X_test = tf.convert_to_tensor(X_test, dtype=tf.float32)
y_train = tf.convert_to_tensor(y_train, dtype=tf.float32)
y_val = tf.convert_to_tensor(y_val, dtype=tf.float32)
y_test = tf.convert_to_tensor(y_test, dtype=tf.float32)

print("Preprocessing complete.")

In [None]:
import numpy as np
import tensorflow as tf
from sklearn.metrics import mean_absolute_error, mean_squared_error
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, Flatten, Dense, Dropout, BatchNormalization, LSTM, GRU, MultiHeadAttention, GlobalAveragePooling1D
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, LearningRateScheduler
import matplotlib.pyplot as plt
import pandas as pd
import itertools
import os
import csv

def plot_loss(history, model_name):
    """Plot the training and validation loss curves recorded in `history.history`.

    `history` must expose a `history` mapping with 'loss' and 'val_loss'
    sequences; `model_name` is used as the title prefix.
    """
    curves = (
        ('loss', 'Train Loss'),
        ('val_loss', 'Validation Loss'),
    )

    plt.figure(figsize=(10, 6))
    for metric_key, legend_label in curves:
        plt.plot(history.history[metric_key], label=legend_label)
    plt.title(f'{model_name} Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

def lr_schedule(epoch):
    """Return the learning rate for a 0-based epoch index.

    The rate ramps linearly up to 0.001 over the first 10 epochs; afterwards it
    is 0.001 scaled by 0.1 for every completed block of 30 epochs.
    """
    base_rate = 0.001
    in_warmup = epoch < 10
    if in_warmup:
        return base_rate * (epoch + 1) / 10

    completed_blocks = epoch // 30
    return base_rate * 0.1 ** completed_blocks

def build_model(model_type, max_length, num_features, config):
    """Build and compile a sequence regressor with five sigmoid outputs.

    Args:
        model_type: One of 'CNN', 'LSTM', 'TCN', 'Transformer' or 'GRU'.
        max_length: Number of time steps in the input.
        num_features: Number of features per time step.
        config: For 'CNN' and 'TCN' an iterable of (filters, kernel_size)
            pairs, one per convolution layer; for the other types an iterable
            of unit counts, one per layer.

    Returns:
        A Keras `Model` compiled with Adam (learning rate 0.0003), mean
        absolute error as loss and 'mae' as metric.

    Raises:
        ValueError: If `model_type` is not one of the supported names.
    """
    def conv_pool_stack(x):
        # Conv -> batch norm -> pooling -> dropout per configured layer
        for n_filters, width in config:
            x = Conv1D(filters=n_filters, kernel_size=width, activation='relu', kernel_regularizer=l2(0.01))(x)
            x = BatchNormalization()(x)
            x = MaxPooling1D(pool_size=2)(x)
            x = Dropout(0.5)(x)
        return Flatten()(x)

    def causal_conv_stack(x):
        # Causally padded convolutions, no pooling
        for n_filters, width in config:
            x = Conv1D(filters=n_filters, kernel_size=width, activation='relu', padding='causal', kernel_regularizer=l2(0.01))(x)
            x = Dropout(0.5)(x)
        return Flatten()(x)

    def recurrent_stack(cell, x):
        # Shared by LSTM and GRU: every layer returns the full sequence
        for n_units in config:
            x = cell(units=n_units, return_sequences=True, kernel_regularizer=l2(0.01))(x)
            x = Dropout(0.5)(x)
        return Flatten()(x)

    def attention_stack(x):
        # Dense projection followed by single-head self-attention per layer
        for n_units in config:
            x = Dense(n_units, activation='relu', kernel_regularizer=l2(0.01))(x)
            x = MultiHeadAttention(num_heads=1, key_dim=n_units)(x, x)
            x = BatchNormalization()(x)
            x = Dropout(0.5)(x)
        return GlobalAveragePooling1D()(x)

    encoders = {
        'CNN': conv_pool_stack,
        'LSTM': lambda x: recurrent_stack(LSTM, x),
        'TCN': causal_conv_stack,
        'Transformer': attention_stack,
        'GRU': lambda x: recurrent_stack(GRU, x),
    }

    inputs = Input(shape=(max_length, num_features))
    if model_type not in encoders:
        raise ValueError("Invalid model type")
    encoded = encoders[model_type](inputs)

    # Shared regression head
    hidden = Dense(units=32, activation='relu', kernel_regularizer=l2(0.01))(encoded)
    hidden = Dropout(0.5)(hidden)
    outputs = Dense(units=5, activation='sigmoid')(hidden)  # one output per trait

    model = Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0003), loss='mean_absolute_error', metrics=['mae'])
    return model

def train_model(model, X_train, y_train, X_val, y_val, model_name):
    """Fit `model` with early stopping and the `lr_schedule` learning-rate schedule.

    Training runs for at most 200 epochs with batch size 64. Early stopping
    watches the validation loss (patience 20) and restores the best weights.
    The loss curves are plotted once fitting ends.

    Learning-rate note: `LearningRateScheduler` sets the optimizer's learning
    rate from `lr_schedule` at the start of every epoch. That overrides the
    rate the model was compiled with, and it also undid any reduction made by
    the `ReduceLROnPlateau` callback that used to be passed alongside it. The
    plateau callback therefore never influenced a training step and has been
    dropped; the effective learning rate is unchanged.

    Args:
        model: Compiled Keras model to train (fitted in place).
        X_train, y_train: Training inputs and targets.
        X_val, y_val: Validation inputs and targets.
        model_name: Label used in the loss plot title.

    Returns:
        The same `model` object after fitting.
    """
    early_stopping = EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True)
    lr_scheduler = LearningRateScheduler(lr_schedule)

    history = model.fit(X_train, y_train, epochs=200, batch_size=64, validation_data=(X_val, y_val),
                        callbacks=[early_stopping, lr_scheduler])
    plot_loss(history, model_name)
    return model

# Load annotation data from CSV
def load_annotation_data(csv_path, file_names=None):
    """Load trait scores from an annotation CSV, skipping its header row.

    This definition replaces the two-argument `load_annotation_data` defined
    earlier in the notebook. The optional `file_names` filter keeps it
    call-compatible with that version, so cells that call
    ``load_annotation_data(csv_path, file_names)`` still work when they are
    re-run after this cell.

    Args:
        csv_path: Path to a CSV whose first row is a header, whose first column
            is a video file name and whose next five columns are the
            extraversion, neuroticism, agreeableness, conscientiousness and
            openness scores, in that order.
        file_names: Optional clip names without extension. When given, only
            rows whose first column equals ``name + '.mp4'`` for one of them
            are kept. When ``None`` every data row is loaded.

    Returns:
        dict mapping the video file name to a dict of float scores keyed by
        trait name, in CSV row order.
    """
    wanted = None if file_names is None else {name + '.mp4' for name in file_names}

    annotation_dict = {}
    # newline='' is what the csv module expects so quoted fields parse correctly.
    with open(csv_path, 'r', newline='') as csvfile:
        datareader = csv.reader(csvfile)
        next(datareader, None)  # Skip the header row; tolerate an empty file
        for row in datareader:
            if not row:
                continue  # blank line in the CSV
            if wanted is not None and row[0] not in wanted:
                continue
            annotation_dict[row[0]] = {
                'extraversion': float(row[1]),
                'neuroticism': float(row[2]),
                'agreeableness': float(row[3]),
                'conscientiousness': float(row[4]),
                'openness': float(row[5])
            }
    return annotation_dict

# Function to retrieve file names without their extension
def get_file_names_without_extension(folder_path, extension=".json"):
    """List the entries of `folder_path` ending in `extension`, with the final suffix removed.

    NOTE(review): this repeats the function of the same name defined earlier in
    the notebook; one of the two copies is likely redundant.
    """
    matching = filter(lambda entry: entry.endswith(extension), os.listdir(folder_path))
    return [os.path.splitext(entry)[0] for entry in matching]

# Assuming X_train, X_val, X_test, y_train, y_val, y_test are already defined
# Sequence length and per-step feature count are read back from the training data
max_length, num_features = X_train.shape[1], X_train.shape[2]

# Four configurations per model type: (filters, kernel_size) pairs for the
# convolutional models, unit counts per layer for the others
configurations = dict(
    CNN=[[(16, 3)], [(32, 3)], [(16, 3), (32, 3)], [(32, 3), (64, 3)]],
    LSTM=[[16], [32], [16, 32], [32, 64]],
    TCN=[[(16, 3)], [(32, 3)], [(16, 3), (32, 3)], [(32, 3), (64, 3)]],
    Transformer=[[16], [32], [16, 32], [32, 64]],
    GRU=[[16], [32], [16, 32], [32, 64]],
)

best_models = {}
best_model_mae = {}
# Most recently trained model per type. It has to exist before the loop
# assigns into it, otherwise the first iteration raises NameError.
trained_models = {}

# Train models with the defined configurations and select the best one for each type
for model_type in configurations:
    best_mae = float('inf')
    best_model = None
    print(f"\nTraining {model_type} models with different configurations...")

    for i, config in enumerate(configurations[model_type]):
        print(f"\nTraining {model_type} model with configuration {i+1}...")
        model = build_model(model_type, max_length, num_features, config)
        trained_model = train_model(model, X_train, y_train, X_val, y_val, f"{model_type}_config_{i+1}")
        trained_models[model_type] = trained_model

        # Evaluate the model on the validation set
        val_predictions = trained_model.predict(X_val)
        val_mae = mean_absolute_error(y_val, val_predictions)

        # Keep the configuration with the lowest validation MAE for this type
        if val_mae < best_mae:
            best_mae = val_mae
            best_model = trained_model

    best_models[model_type] = best_model
    best_model_mae[model_type] = best_mae

# Generate predictions on the test set using the best models
test_predictions = {}
for model_name, fitted_model in best_models.items():
    test_predictions[model_name] = fitted_model.predict(X_test)

# Evaluate each model's MAE for each trait (one column of y_test per trait)
trait_count = y_test.shape[1]
model_mae = {
    model_name: [
        mean_absolute_error(y_test[:, column], test_predictions[model_name][:, column])
        for column in range(trait_count)
    ]
    for model_name in best_models
}

# Print each model's MAE
trait_names = ['extraversion', 'neuroticism', 'agreeableness', 'conscientiousness', 'openness']
for model_name in best_models:
    print(f"\n{model_name} MAE for each trait:")
    per_trait_mae = model_mae[model_name]
    for position, trait in enumerate(trait_names):
        print(f"  {trait}: {per_trait_mae[position]:.4f}")

# Simple averaging: unweighted mean of all models' test predictions
simple_avg_predictions = np.mean(list(test_predictions.values()), axis=0)
simple_avg_mae = mean_absolute_error(y_test, simple_avg_predictions)
print(f"\nSimple average MAE: {simple_avg_mae:.4f}")

# Evaluate different weight combinations
def evaluate_combination(weights, predictions=None, targets=None):
    """Return the MAE of a weighted sum of per-model predictions.

    Args:
        weights: One weight per model, in the iteration order of `predictions`.
        predictions: Mapping of model name to prediction array. Defaults to the
            module-level `test_predictions`.
        targets: Ground-truth array to score against. Defaults to the
            module-level `y_test`.

    Returns:
        The mean absolute error between `targets` and the weighted sum.

    Raises:
        ValueError: If `predictions` is empty or the number of weights does not
            match the number of models.
    """
    if predictions is None:
        predictions = test_predictions
    if targets is None:
        targets = y_test

    if len(predictions) == 0:
        raise ValueError("No model predictions to combine")
    if len(weights) != len(predictions):
        raise ValueError(
            f"Expected {len(predictions)} weights, got {len(weights)}"
        )

    # Shape the accumulator after whichever model comes first instead of
    # relying on a model called 'CNN' being present.
    first_prediction = next(iter(predictions.values()))
    combined_prediction = np.zeros_like(first_prediction)
    for weight, model_name in zip(weights, predictions):
        combined_prediction += weight * predictions[model_name]
    return mean_absolute_error(targets, combined_prediction)

# Generate all weight combinations on a 0.1 grid that sum to exactly 1.
# Integer steps are enumerated and divided afterwards, so the "sums to 1" test
# is exact and the weights are plain Python floats.
ensemble_model_names = list(best_models)
grid_steps = 10
weight_combinations = [
    tuple(step / grid_steps for step in steps)
    for steps in itertools.product(range(grid_steps + 1), repeat=len(ensemble_model_names))
    if sum(steps) == grid_steps
]


def _blend(weights, per_model_predictions):
    """Weighted sum of equally shaped prediction arrays, accumulated in float64."""
    blended = np.zeros_like(per_model_predictions[0], dtype=np.float64)
    for weight, prediction in zip(weights, per_model_predictions):
        blended += weight * prediction
    return blended


# The weights are chosen on the VALIDATION set. Picking them by test-set MAE
# would tune the ensemble on the data used for the final report and make the
# reported test error optimistically biased.
val_prediction_list = [best_models[name].predict(X_val) for name in ensemble_model_names]
test_prediction_list = [test_predictions[name] for name in ensemble_model_names]

# Try different weight combinations and find the top 5 combinations with the lowest MAE
best_combinations = []
for weights in weight_combinations:
    mae = mean_absolute_error(y_val, _blend(weights, val_prediction_list))
    best_combinations.append((weights, mae))

best_combinations = sorted(best_combinations, key=lambda x: x[1])[:5]

print("\nTop 5 weight combinations (validation MAE):")
for weights, mae in best_combinations:
    print(f"Weights: {weights}, MAE: {mae:.4f}")

# Use the best weight combination to generate the final combined prediction on the test set
best_weights = best_combinations[0][0]
final_prediction = _blend(best_weights, test_prediction_list)

# Calculate MAE for each trait
final_mae_per_trait = [mean_absolute_error(y_test[:, i], final_prediction[:, i]) for i in range(y_test.shape[1])]
print("\nFinal MAE per trait:")
for i, trait in enumerate(['extraversion', 'neuroticism', 'agreeableness', 'conscientiousness', 'openness']):
    print(f"  {trait}: {final_mae_per_trait[i]:.4f}")

In [None]:
# Calculate MAE for each trait
trait_columns = ['extraversion', 'neuroticism', 'agreeableness', 'conscientiousness', 'openness']
final_mae_per_trait = [mean_absolute_error(y_test[:, i], final_prediction[:, i]) for i in range(y_test.shape[1])]
print("\nFinal MAE per trait:")
for i, trait in enumerate(trait_columns):
    print(f"  {trait}: {final_mae_per_trait[i]:.4f}")

# File names for the final predictions CSV.
# Row i of X_test (and therefore of final_prediction) was built from
# test_file_list[i], so the names must come from that list. Taking them from
# the annotation CSV in CSV order, and padding or truncating to fit, would
# attach predictions to the wrong videos.
file_names = [os.path.splitext(landmark_file)[0] + '.mp4' for landmark_file in test_file_list]

if len(file_names) != final_prediction.shape[0]:
    raise ValueError(
        f"Got {final_prediction.shape[0]} predictions for {len(file_names)} test files; "
        "re-run preprocessing and prediction so they come from the same test set"
    )

# Save the final prediction to CSV
final_prediction_df = pd.DataFrame(final_prediction, columns=trait_columns)
final_prediction_df['file_name'] = file_names  # Add the file names to the DataFrame
final_prediction_df.to_csv('/content/drive/MyDrive/Data/final_predictions.csv', index=False)

print("Final predictions saved to 'final_predictions.csv'")