In [None]:
import os
import torch
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.model_selection import train_test_split
from scipy.stats import pearsonr


In [None]:
def load_features(directory):
    """Load every ``.pt`` file in ``directory`` into a DataFrame.

    Args:
        directory: Folder that is scanned (non-recursively) for ``.pt`` files.

    Returns:
        pandas.DataFrame with one row per ``.pt`` file and the columns
        ``file_name`` (file name without the ``.pt`` extension),
        ``vgg_features`` (whatever ``torch.load`` returns for the file),
        ``culture_id`` (first ``_``-separated part of the name starting with
        ``'C'``, or None) and four label columns initialised to None.
    """
    data = {
        'file_name': [],
        'vgg_features': [],
        'culture_id': [],
        'Arousal_A_labels': [],
        'Arousal_V_labels': [],
        'Valence_A_labels': [],
        'Valence_V_labels': []
    }

    # Sort the listing: os.listdir returns entries in arbitrary order, which
    # would make the row order (and any later split) differ between machines.
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith('.pt'):
            continue

        features = torch.load(os.path.join(directory, filename))

        # Parse the culture tag from the name *without* the extension so a
        # trailing tag such as 'x_C1.pt' yields 'C1' rather than 'C1.pt'.
        stem = filename[:-3]  # Remove the '.pt' extension
        culture_id = next(
            (part for part in stem.split('_') if part.startswith('C')), None
        )

        data['file_name'].append(stem)
        data['vgg_features'].append(features)
        data['culture_id'].append(culture_id)
        # Placeholder values for the label columns
        data['Arousal_A_labels'].append(None)
        data['Arousal_V_labels'].append(None)
        data['Valence_A_labels'].append(None)
        data['Valence_V_labels'].append(None)

    return pd.DataFrame(data)


In [None]:
# Folder scanned by load_features for '.pt' files; each loaded object is stored
# in the 'vgg_features' column of the resulting DataFrame.
directory_path = 'vggpooled'
result_df = load_features(directory_path)

In [None]:
# Display the DataFrame returned by load_features.
result_df

In [None]:
def load_and_adjust_features(df, base_directory):
    """Pair each row's VGG features with the ``features.npy`` stored on disk.

    For every row of ``df`` the file
    ``<base_directory>/<culture_id>/<file_name>/features.npy`` is loaded with
    ``np.load``. Rows without a culture id, without a matching folder, or
    without a ``features.npy`` file are reported with ``print`` and skipped.

    Args:
        df: DataFrame with the columns ``culture_id``, ``file_name`` and
            ``vgg_features``.
        base_directory: Root folder containing one sub-folder per culture id.

    Returns:
        A new pandas.DataFrame with the columns ``vgg_features``,
        ``hubert_features``, ``culture_id`` and ``filename``, holding only the
        rows for which a ``features.npy`` file was loaded.
    """
    kept = {
        'vgg_features': [],
        'hubert_features': [],
        'culture_id': [],
        'filename': []
    }

    for _, row in df.iterrows():
        culture_id = row['culture_id']
        filename = row['file_name']

        # A missing culture id (None/NaN) would make os.path.join raise
        # TypeError; treat it like any other unmatched row instead.
        if not isinstance(culture_id, str):
            print(f"Error: No culture id for {filename}")
            continue

        culture_path = os.path.join(base_directory, culture_id)
        target_folder = os.path.join(culture_path, filename)

        try:
            # Check the expected folder directly instead of listing the whole
            # culture directory once per row.
            if not os.path.isdir(target_folder):
                raise FileNotFoundError(f"No matching folder for {filename} in {culture_path}")
            hubert_features = np.load(os.path.join(target_folder, 'features.npy'))
        except FileNotFoundError as e:
            # Folder or features.npy not found: skip this row
            print(f"Error: {e}")
            continue

        # Collect all four columns together so they cannot get out of step
        # (the index of ``df`` is not used, so duplicate labels are harmless).
        kept['vgg_features'].append(row['vgg_features'])
        kept['hubert_features'].append(hubert_features)
        kept['culture_id'].append(culture_id)
        kept['filename'].append(filename)

    return pd.DataFrame(kept)


In [None]:
# Set base_directory_path to the 'output_features_full_len' folder. It must contain one
# sub-folder per culture id, each holding per-recording folders with a 'features.npy' file
# (the layout load_and_adjust_features reads).
# NOTE(review): result_df is overwritten here and the new frame has a 'filename' column
# instead of 'file_name', so this cell cannot be re-run without re-running load_features.
base_directory_path = 'output_features_full_len'
result_df = load_and_adjust_features(result_df, base_directory_path)


In [None]:
# Display the rows that were matched with a features.npy file.
result_df

In [None]:
import os
import pandas as pd

def _load_csv_label(folder_path, file_suffix):
    """Return the second CSV column of the file in ``folder_path`` ending with ``file_suffix``.

    Returns None when the folder does not exist (a message is printed) or when
    no file name ends with ``file_suffix``.
    """
    try:
        # Sorted so the choice is deterministic if several files match.
        matches = sorted(f for f in os.listdir(folder_path) if f.endswith(file_suffix))
    except FileNotFoundError:
        print(f"File not found for {file_suffix} in {folder_path}")
        return None
    if not matches:
        return None
    # Assuming there is only one such file per folder
    label_df = pd.read_csv(os.path.join(folder_path, matches[0]), usecols=[1])
    return label_df.iloc[:, 0].values


def load_labels(df, base_directory):
    """Attach the four aligned label sequences to every row of ``df``.

    For each row, the folder ``<base_directory>/<filename>`` is searched for
    files ending in ``Arousal_A_Aligned.csv``, ``Arousal_V_Aligned.csv``,
    ``Valence_A_Aligned.csv`` and ``Valence_V_Aligned.csv``; the second column
    of each file is stored as a NumPy array in the matching ``*_labels`` column.
    Missing folders or files leave None in that cell.

    Args:
        df: DataFrame with a ``filename`` column. It is modified in place.
        base_directory: Root folder containing one sub-folder per file name.

    Returns:
        The same ``df`` object, with the four label columns (re)written.
    """
    label_suffixes = {
        'Arousal_A_labels': 'Arousal_A_Aligned.csv',
        'Arousal_V_labels': 'Arousal_V_Aligned.csv',
        'Valence_A_labels': 'Valence_A_Aligned.csv',
        'Valence_V_labels': 'Valence_V_Aligned.csv',
    }
    loaded = {column: [] for column in label_suffixes}

    for filename in df['filename']:
        folder_path = os.path.join(base_directory, filename)
        for column, suffix in label_suffixes.items():
            loaded[column].append(_load_csv_label(folder_path, suffix))

    for column, values in loaded.items():
        # Assign whole columns at once; the explicit object dtype keeps each
        # cell as an array (or None) even when the sequences differ in length.
        df[column] = pd.Series(values, index=df.index, dtype=object)

    return df


In [None]:
# Set base_directory_path to the 'SEWAv02' folder; load_labels looks for
# <base_directory_path>/<filename>/*_Aligned.csv label files under it.
base_directory_path = 'SEWAv02'
result_df = load_labels(result_df, base_directory_path)


In [None]:
# Per-culture subsets of result_df, selected on the 'culture_id' column.
# NOTE(review): df_C1 and df_C1_C4 are the same object and hold both the C1 and
# the C4 rows, while df_C4 below holds the C4 rows again — confirm that df_C1
# is really meant to include C4.
df_C1 = df_C1_C4 = result_df[result_df['culture_id'].isin(['C1', 'C4'])]

df_C2 = result_df.loc[result_df['culture_id'] == 'C2']
df_C3 = result_df.loc[result_df['culture_id'] == 'C3']
df_C4 = result_df.loc[result_df['culture_id'] == 'C4']
df_C5 = result_df.loc[result_df['culture_id'] == 'C5']
df_C6 = result_df.loc[result_df['culture_id'] == 'C6']

In [None]:


def adjust_features(features, max_length=750):
    """Trim or zero-pad ``features`` along its first axis to ``max_length``.

    Args:
        features: Array-like whose first axis is the one being adjusted; any
            number of trailing axes (including none) is kept unchanged.
        max_length: Target size of the first axis.

    Returns:
        numpy.ndarray whose first axis has length ``max_length``. Longer inputs
        are truncated; shorter inputs are padded at the end with zeros of the
        input's own dtype, so padded and unpadded samples share one dtype.
    """
    features = np.asarray(features)
    current_length = features.shape[0]

    if current_length > max_length:
        return features[:max_length]
    if current_length == max_length:
        return features

    # Build the padding from the trailing shape so 1-D and >2-D inputs work,
    # and from the input dtype so float32 features are not upcast to float64.
    pad_shape = (max_length - current_length,) + features.shape[1:]
    padding = np.zeros(pad_shape, dtype=features.dtype)
    return np.concatenate((features, padding), axis=0)


In [None]:
def adjust_labels(labels, desired_length=750):
    """Crop or zero-pad every label sequence to ``desired_length``.

    Args:
        labels: Iterable of 1-D label sequences (e.g. a DataFrame column).
        desired_length: Target length of every sequence.

    Returns:
        numpy.ndarray built from the adjusted sequences; for a non-empty input
        its shape is ``(len(labels), desired_length)``.

    Raises:
        ValueError: If an entry is None, i.e. no label sequence is available
            for that sample.
    """
    adjusted_labels = []

    for position, label in enumerate(labels):
        if label is None:
            # Fail with a clear message instead of "object of type 'NoneType'
            # has no len()"; silently skipping would misalign X and y.
            raise ValueError(
                f"Missing label sequence at position {position}; "
                "drop rows without labels before calling adjust_labels"
            )

        label = np.asarray(label)
        if len(label) >= desired_length:
            # Longer (or exact) sequences are cropped to the desired length
            adjusted_label = label[:desired_length]
        else:
            # Shorter sequences are padded with zeroes at the end
            adjusted_label = np.pad(label, (0, desired_length - len(label)), 'constant')

        adjusted_labels.append(adjusted_label)

    # Convert the list of arrays to a 2D NumPy array
    return np.array(adjusted_labels)

In [None]:
def get_data(result_df, label_column='Arousal_A_labels'):
    """Build the model inputs and targets from a feature/label DataFrame.

    Args:
        result_df: DataFrame with a ``hubert_features`` column and the column
            named by ``label_column``.
        label_column: Name of the label column to use as target. Defaults to
            ``'Arousal_A_labels'``, the column this function has always used.

    Returns:
        Tuple ``(X, y)``: ``X`` is the array of features after
        ``adjust_features`` was applied to every row, ``y`` is the result of
        ``adjust_labels`` on the chosen label column.
    """
    padded_features = [adjust_features(features) for features in result_df['hubert_features']]
    X = np.array(padded_features)
    y = adjust_labels(result_df[label_column])

    return X, y

In [None]:
# Padded features and Arousal_A label targets for the df_C1 subset.
X, y = get_data(df_C1)

In [None]:
# Sanity check of the target array's shape.
y.shape

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

class BiLSTM(nn.Module):
    """Bidirectional LSTM that maps a feature sequence to one output vector.

    The last time step of the forward direction and the first time step of the
    backward direction are concatenated, passed through dropout and projected
    to ``output_size`` values by a linear layer.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout=0.2):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=dropout,
            bidirectional=True,
        )
        # Both directions are concatenated, so the head sees 2 * hidden_size inputs.
        self.fc_arousal = nn.Linear(2 * hidden_size, output_size)
        self.relu = nn.ReLU()  # registered but not used in forward()
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return the ``(batch, output_size)`` prediction for a batch-first input ``x``."""
        # One zero state per layer and direction.
        state_shape = (2 * self.num_layers, x.size(0), self.hidden_size)
        initial_hidden = torch.zeros(state_shape, device=x.device)
        initial_cell = torch.zeros(state_shape, device=x.device)

        sequence_out, _ = self.lstm(x, (initial_hidden, initial_cell))

        # Final state of each direction: forward half at the last step,
        # backward half at the first step.
        forward_last = sequence_out[:, -1, :self.hidden_size]
        backward_last = sequence_out[:, 0, self.hidden_size:]
        summary = torch.cat((forward_last, backward_last), dim=1)

        summary = self.dropout(summary)
        return self.fc_arousal(summary)

# Define CCCLoss function
class CCCLoss(nn.Module):
    """Loss equal to ``1 - CCC`` (concordance correlation coefficient).

    The coefficient is computed over all elements of ``y_pred`` and ``y_true``
    taken together (one value per call, not per sequence).
    """

    def __init__(self):
        super(CCCLoss, self).__init__()

    def forward(self, y_pred, y_true):
        """Return ``1 - CCC(y_pred, y_true)`` as a scalar tensor."""
        mean_pred = torch.mean(y_pred)
        mean_true = torch.mean(y_true)
        # Use per-element means (not sums) for covariance and variances so they
        # are on the same scale as the squared mean difference in the
        # denominator; with sums that term was under-weighted by a factor N.
        cov_pred = torch.mean((y_pred - mean_pred) * (y_true - mean_true))
        var_pred = torch.mean((y_pred - mean_pred) ** 2)
        var_true = torch.mean((y_true - mean_true) ** 2)
        # Small epsilon avoids division by zero for constant inputs
        ccc = 2 * cov_pred / (var_pred + var_true + (mean_pred - mean_true) ** 2 + 1e-8)
        return 1 - ccc

def train_bilstm(X_train, X_test, y_train_arousal, y_test_arousal):
    """Train a BiLSTM with the CCC loss and print train/test loss every epoch.

    Args:
        X_train, X_test: Feature arrays; the size of the last axis is used as
            the LSTM input size.
        y_train_arousal, y_test_arousal: Target arrays
            (assumes shape ``(num_samples, 750)`` — TODO confirm with caller).

    Returns:
        The trained ``BiLSTM`` model.
    """
    input_size = X_train.shape[-1]  # Input size from the last dimension of X
    hidden_size = 256
    num_layers = 2
    output_size = 750  # Number of frames
    dropout = 0.2

    # Convert NumPy arrays to PyTorch tensors
    X_train_tensor = torch.FloatTensor(X_train)
    X_test_tensor = torch.FloatTensor(X_test)
    y_train_arousal_tensor = torch.FloatTensor(y_train_arousal)
    y_test_arousal_tensor = torch.FloatTensor(y_test_arousal)

    # Create DataLoader for training and testing sets
    train_dataset = TensorDataset(X_train_tensor, y_train_arousal_tensor)
    test_dataset = TensorDataset(X_test_tensor, y_test_arousal_tensor)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32)

    model = BiLSTM(input_size, hidden_size, num_layers, output_size, dropout=dropout)
    criterion = CCCLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    num_epochs = 50
    for epoch in range(num_epochs):
        # ---- Training ----
        model.train()
        arousal_train_loss = 0.0

        for batch_features, batch_arousal_labels in train_loader:
            optimizer.zero_grad()
            # Output is already (batch, output_size); no squeeze, which would
            # drop the batch axis for a final batch of size 1.
            arousal_output = model(batch_features)

            loss = criterion(arousal_output, batch_arousal_labels)

            loss.backward()
            optimizer.step()

            # Weight by batch size so the epoch value is a per-sample average
            arousal_train_loss += loss.item() * batch_features.size(0)

        arousal_train_loss /= len(train_loader.dataset)

        print(f"Epoch {epoch+1}/{num_epochs}, Arousal Train Loss: {arousal_train_loss:.4f}")

        # ---- Evaluation ----
        model.eval()
        arousal_test_loss = 0.0
        ccc_arousal = 0.0

        with torch.no_grad():
            for batch_features, batch_arousal_labels in test_loader:
                arousal_output = model(batch_features)

                # Evaluate the criterion on the *test* batch. Previously the
                # stale ``loss`` from the last training batch was accumulated
                # as the test loss.
                batch_loss = criterion(arousal_output, batch_arousal_labels).item()

                ccc_arousal += batch_loss * batch_features.size(0)
                arousal_test_loss += batch_loss * batch_features.size(0)

        # Average over the entire test dataset
        arousal_test_loss /= len(test_loader.dataset)
        ccc_arousal /= len(test_loader.dataset)

        print(f"Epoch {epoch+1}/{num_epochs}, Arousal Test Loss: {arousal_test_loss:.4f}")
        print(f"Epoch {epoch+1}/{num_epochs}, Arousal CCC Loss: {ccc_arousal:.4f}")
    return model


In [None]:
from sklearn.model_selection import train_test_split

# Requires train_bilstm (defined in an earlier cell) and train_test_split.

# X and y come from the get_data(df_C1) cell above; 20% of the samples are held out
# and random_state=42 keeps the split reproducible.
X_train, X_test, y_train_arousal, y_test_arousal = train_test_split(X, y, test_size=0.2, random_state=42)

# Train the BiLSTM on this split and keep whatever train_bilstm returns.
bilstm_model = train_bilstm(X_train, X_test, y_train_arousal, y_test_arousal)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import mean_squared_error
import numpy as np

class BiLSTM(nn.Module):
    """Bidirectional LSTM followed by dropout and a linear output head.

    NOTE(review): a class with the same name is defined in an earlier cell;
    once this cell runs, this definition is the one in effect.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout=0.2):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=dropout,
            bidirectional=True,
        )
        # Head input is 2 * hidden_size because both directions are concatenated.
        self.fc_arousal = nn.Linear(2 * hidden_size, output_size)
        self.relu = nn.ReLU()  # registered but not used in forward()
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Map a batch-first sequence batch ``x`` to ``(batch, output_size)`` outputs."""
        n_states = 2 * self.num_layers  # one state per layer and direction
        zeros = (n_states, x.size(0), self.hidden_size)
        h_init = torch.zeros(zeros, device=x.device)
        c_init = torch.zeros(zeros, device=x.device)
        states, _ = self.lstm(x, (h_init, c_init))

        # Forward direction finishes at the last step, backward at the first.
        last_fwd = states[:, -1, :self.hidden_size]
        last_bwd = states[:, 0, self.hidden_size:]
        pooled = self.dropout(torch.cat((last_fwd, last_bwd), dim=1))

        return self.fc_arousal(pooled)

# Define CCCLoss function
class CCCLoss(nn.Module):
    """Loss equal to ``1 - CCC`` (concordance correlation coefficient).

    The coefficient is computed over all elements of ``y_pred`` and ``y_true``
    taken together (one value per call, not per sequence).
    """

    def __init__(self):
        super(CCCLoss, self).__init__()

    def forward(self, y_pred, y_true):
        """Return ``1 - CCC(y_pred, y_true)`` as a scalar tensor."""
        mean_pred = torch.mean(y_pred)
        mean_true = torch.mean(y_true)
        # Per-element means (not sums) keep covariance and variances on the
        # same scale as the squared mean difference in the denominator; with
        # sums that term was under-weighted by a factor N.
        cov_pred = torch.mean((y_pred - mean_pred) * (y_true - mean_true))
        var_pred = torch.mean((y_pred - mean_pred) ** 2)
        var_true = torch.mean((y_true - mean_true) ** 2)
        # Small epsilon avoids division by zero for constant inputs
        ccc = 2 * cov_pred / (var_pred + var_true + (mean_pred - mean_true) ** 2 + 1e-8)
        return 1 - ccc

def train_bilstm(X_train, X_test, y_train_arousal, y_test_arousal):
    """Train a BiLSTM on a combined CCC + RMSE loss and report test metrics.

    Args:
        X_train, X_test: Feature arrays; the size of the last axis is used as
            the LSTM input size.
        y_train_arousal, y_test_arousal: Target arrays
            (assumes shape ``(num_samples, 750)`` — TODO confirm with caller).

    Returns:
        The trained ``BiLSTM`` model.
    """
    input_size = X_train.shape[-1]  # Input size from the last dimension of X
    hidden_size = 256
    num_layers = 2
    output_size = 750  # Number of frames
    dropout = 0.2

    # Convert NumPy arrays to PyTorch tensors
    X_train_tensor = torch.FloatTensor(X_train)
    X_test_tensor = torch.FloatTensor(X_test)
    y_train_arousal_tensor = torch.FloatTensor(y_train_arousal)
    y_test_arousal_tensor = torch.FloatTensor(y_test_arousal)

    # Create DataLoader for training and testing sets
    train_dataset = TensorDataset(X_train_tensor, y_train_arousal_tensor)
    test_dataset = TensorDataset(X_test_tensor, y_test_arousal_tensor)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32)

    model = BiLSTM(input_size, hidden_size, num_layers, output_size, dropout=dropout)
    ccc_criterion = CCCLoss()
    rmse_criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    num_epochs = 30
    best_rmse = float('inf')
    best_ccc = float('-inf')

    for epoch in range(num_epochs):
        # ---- Training ----
        model.train()
        arousal_train_loss = 0.0
        arousal_train_rmse = 0.0

        for batch_features, batch_arousal_labels in train_loader:
            optimizer.zero_grad()
            # Output is already (batch, output_size); no squeeze, which would
            # drop the batch axis for a final batch of size 1.
            arousal_output = model(batch_features)

            ccc_loss = ccc_criterion(arousal_output, batch_arousal_labels)
            rmse_loss = torch.sqrt(rmse_criterion(arousal_output, batch_arousal_labels))

            # Total loss is the sum of both terms
            loss = ccc_loss + rmse_loss

            loss.backward()
            optimizer.step()

            arousal_train_loss += loss.item() * batch_features.size(0)
            arousal_train_rmse += rmse_loss.item() * batch_features.size(0)

        # Average over the entire training dataset
        arousal_train_loss /= len(train_loader.dataset)
        arousal_train_rmse /= len(train_loader.dataset)

        print(f"Epoch {epoch+1}/{num_epochs}, Arousal Train Loss: {arousal_train_loss:.4f}, Arousal Train RMSE: {arousal_train_rmse:.4f}")

        # ---- Evaluation ----
        model.eval()
        arousal_test_loss = 0.0
        arousal_test_rmse = 0.0
        ccc_arousal = 0.0

        with torch.no_grad():
            for batch_features, batch_arousal_labels in test_loader:
                arousal_output = model(batch_features)
                batch_size = batch_features.size(0)

                batch_ccc_loss = ccc_criterion(arousal_output, batch_arousal_labels).item()
                # Same RMSE definition as in training. The former
                # sklearn call used ``squared=False``, which newer
                # scikit-learn releases no longer accept.
                batch_rmse = torch.sqrt(rmse_criterion(arousal_output, batch_arousal_labels)).item()

                ccc_arousal += batch_ccc_loss * batch_size
                arousal_test_rmse += batch_rmse * batch_size
                # The test loss was previously never accumulated (always 0).
                arousal_test_loss += (batch_ccc_loss + batch_rmse) * batch_size

        # Average over the entire test dataset
        arousal_test_loss /= len(test_loader.dataset)
        arousal_test_rmse /= len(test_loader.dataset)
        ccc_arousal /= len(test_loader.dataset)

        print(f"Epoch {epoch+1}/{num_epochs}, Arousal Test Loss: {arousal_test_loss:.4f}, Arousal Test RMSE: {arousal_test_rmse:.4f}")
        print(f"Epoch {epoch+1}/{num_epochs}, Arousal CCC Loss: {ccc_arousal:.4f}")

        # ccc_arousal is a loss (1 - CCC); convert it so that "best" really is
        # the highest concordance rather than the highest loss.
        epoch_ccc = 1.0 - ccc_arousal
        if arousal_test_rmse < best_rmse:
            best_rmse = arousal_test_rmse
        if epoch_ccc > best_ccc:
            best_ccc = epoch_ccc

    # Report best RMSE and CCC
    print(f"Best Arousal RMSE: {best_rmse:.4f}")
    print(f"Best Arousal CCC: {best_ccc:.4f}")
    # Callers assign the result (model = train_bilstm(...)), so return the model.
    return model

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import mean_squared_error
import numpy as np

class BiGRU(nn.Module):
    """Bidirectional GRU followed by dropout and a linear output head.

    The last time step of the forward direction and the first time step of the
    backward direction are concatenated and projected to ``output_size`` values.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout=0.2):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.gru = nn.GRU(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=dropout,
            bidirectional=True,
        )
        # Head input is 2 * hidden_size because both directions are concatenated.
        self.fc_arousal = nn.Linear(2 * hidden_size, output_size)
        self.relu = nn.ReLU()  # registered but not used in forward()
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Map a batch-first sequence batch ``x`` to ``(batch, output_size)`` outputs."""
        # One zero hidden state per layer and direction.
        initial_hidden = torch.zeros(
            (2 * self.num_layers, x.size(0), self.hidden_size), device=x.device
        )
        states, _ = self.gru(x, initial_hidden)

        # Forward direction finishes at the last step, backward at the first.
        last_fwd = states[:, -1, :self.hidden_size]
        last_bwd = states[:, 0, self.hidden_size:]
        pooled = self.dropout(torch.cat((last_fwd, last_bwd), dim=1))

        return self.fc_arousal(pooled)

# Define CCCLoss function
class CCCLoss(nn.Module):
    """Loss equal to ``1 - CCC`` (concordance correlation coefficient).

    The coefficient is computed over all elements of ``y_pred`` and ``y_true``
    taken together (one value per call, not per sequence).
    """

    def __init__(self):
        super(CCCLoss, self).__init__()

    def forward(self, y_pred, y_true):
        """Return ``1 - CCC(y_pred, y_true)`` as a scalar tensor."""
        mean_pred = torch.mean(y_pred)
        mean_true = torch.mean(y_true)
        # Per-element means (not sums) keep covariance and variances on the
        # same scale as the squared mean difference in the denominator; with
        # sums that term was under-weighted by a factor N.
        cov_pred = torch.mean((y_pred - mean_pred) * (y_true - mean_true))
        var_pred = torch.mean((y_pred - mean_pred) ** 2)
        var_true = torch.mean((y_true - mean_true) ** 2)
        # Small epsilon avoids division by zero for constant inputs
        ccc = 2 * cov_pred / (var_pred + var_true + (mean_pred - mean_true) ** 2 + 1e-8)
        return 1 - ccc

def train_bigru(X_train, X_test, y_train_arousal, y_test_arousal):
    """Train a BiGRU on a combined CCC + RMSE loss and report test metrics.

    Args:
        X_train, X_test: Feature arrays; the size of the last axis is used as
            the GRU input size.
        y_train_arousal, y_test_arousal: Target arrays
            (assumes shape ``(num_samples, 750)`` — TODO confirm with caller).

    Returns:
        The trained ``BiGRU`` model.
    """
    input_size = X_train.shape[-1]  # Input size from the last dimension of X
    hidden_size = 256
    num_layers = 2
    output_size = 750  # Number of frames
    dropout = 0.2

    # Convert NumPy arrays to PyTorch tensors
    X_train_tensor = torch.FloatTensor(X_train)
    X_test_tensor = torch.FloatTensor(X_test)
    y_train_arousal_tensor = torch.FloatTensor(y_train_arousal)
    y_test_arousal_tensor = torch.FloatTensor(y_test_arousal)

    # Create DataLoader for training and testing sets
    train_dataset = TensorDataset(X_train_tensor, y_train_arousal_tensor)
    test_dataset = TensorDataset(X_test_tensor, y_test_arousal_tensor)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32)

    model = BiGRU(input_size, hidden_size, num_layers, output_size, dropout=dropout)
    ccc_criterion = CCCLoss()
    rmse_criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    num_epochs = 20
    best_rmse = float('inf')
    best_ccc = float('-inf')

    for epoch in range(num_epochs):
        # ---- Training ----
        model.train()
        arousal_train_loss = 0.0
        arousal_train_rmse = 0.0

        for batch_features, batch_arousal_labels in train_loader:
            optimizer.zero_grad()
            # Output is already (batch, output_size); no squeeze, which would
            # drop the batch axis for a final batch of size 1.
            arousal_output = model(batch_features)

            ccc_loss = ccc_criterion(arousal_output, batch_arousal_labels)
            rmse_loss = torch.sqrt(rmse_criterion(arousal_output, batch_arousal_labels))

            # Total loss is the sum of both terms
            loss = ccc_loss + rmse_loss

            loss.backward()
            optimizer.step()

            arousal_train_loss += loss.item() * batch_features.size(0)
            arousal_train_rmse += rmse_loss.item() * batch_features.size(0)

        # Average over the entire training dataset
        arousal_train_loss /= len(train_loader.dataset)
        arousal_train_rmse /= len(train_loader.dataset)

        print(f"Epoch {epoch+1}/{num_epochs}, Arousal Train Loss: {arousal_train_loss:.4f}, Arousal Train RMSE: {arousal_train_rmse:.4f}")

        # ---- Evaluation ----
        model.eval()
        arousal_test_loss = 0.0
        arousal_test_rmse = 0.0
        ccc_arousal = 0.0

        with torch.no_grad():
            for batch_features, batch_arousal_labels in test_loader:
                arousal_output = model(batch_features)
                batch_size = batch_features.size(0)

                batch_ccc_loss = ccc_criterion(arousal_output, batch_arousal_labels).item()
                # Same RMSE definition as in training. The former
                # sklearn call used ``squared=False``, which newer
                # scikit-learn releases no longer accept.
                batch_rmse = torch.sqrt(rmse_criterion(arousal_output, batch_arousal_labels)).item()

                ccc_arousal += batch_ccc_loss * batch_size
                arousal_test_rmse += batch_rmse * batch_size
                # The test loss was previously never accumulated (always 0).
                arousal_test_loss += (batch_ccc_loss + batch_rmse) * batch_size

        # Average over the entire test dataset
        arousal_test_loss /= len(test_loader.dataset)
        arousal_test_rmse /= len(test_loader.dataset)
        ccc_arousal /= len(test_loader.dataset)

        print(f"Epoch {epoch+1}/{num_epochs}, Arousal Test Loss: {arousal_test_loss:.4f}, Arousal Test RMSE: {arousal_test_rmse:.4f}")
        print(f"Epoch {epoch+1}/{num_epochs}, Arousal CCC Loss: {ccc_arousal:.4f}")

        # ccc_arousal is a loss (1 - CCC); convert it so that "best" really is
        # the highest concordance rather than the highest loss.
        epoch_ccc = 1.0 - ccc_arousal
        if arousal_test_rmse < best_rmse:
            best_rmse = arousal_test_rmse
        if epoch_ccc > best_ccc:
            best_ccc = epoch_ccc

    # Report best RMSE and CCC
    print(f"Best Arousal RMSE: {best_rmse:.4f}")
    print(f"Best Arousal CCC: {best_ccc:.4f}")
    return model


# Arousal

## C1 Arousal (BiLSTM and BiGRU)

In [None]:
# Hold out 20% of the samples for testing; random_state=42 keeps the split reproducible.
# X and y are the arrays built from df_C1 in an earlier cell.
X_train, X_test, y_train_arousal, y_test_arousal = train_test_split(X, y, test_size=0.2, random_state=42)

# Train the BiLSTM on this split; `model` is reused later by process_and_predict.
model = train_bilstm(X_train, X_test, y_train_arousal, y_test_arousal)

In [None]:
# Rebuild the padded features and Arousal_A targets for df_C1.
X, y = get_data(df_C1)

In [None]:
# Hold out 20% of the samples for testing; random_state=42 keeps the split reproducible.
X_train, X_test, y_train_arousal, y_test_arousal = train_test_split(X, y, test_size=0.2, random_state=42)

# Train the BiGRU (not the BiLSTM) on this split and keep the returned model.
model_gru = train_bigru(X_train, X_test, y_train_arousal, y_test_arousal)

## C2 Arousal

In [None]:
# Padded features and Arousal_A targets for culture C2.
X, y = get_data(df_C2)

In [None]:
# 80/20 train/test split of the C2 data (fixed seed for reproducibility).
X_train, X_test, y_train_arousal, y_test_arousal = train_test_split(X, y, test_size=0.2, random_state=42)

# Train and evaluate the BiLSTM on this split.
train_bilstm(X_train, X_test, y_train_arousal, y_test_arousal)

In [None]:
# Train and evaluate the BiGRU on the same C2 split as the cell above.
train_bigru(X_train, X_test, y_train_arousal, y_test_arousal)

## C3 Arousal

In [None]:
# Padded features and Arousal_A targets for culture C3.
X, y = get_data(df_C3)

In [None]:
# 80/20 train/test split of the C3 data (fixed seed for reproducibility).
X_train, X_test, y_train_arousal, y_test_arousal = train_test_split(X, y, test_size=0.2, random_state=42)

# Train and evaluate the BiLSTM on this split.
train_bilstm(X_train, X_test, y_train_arousal, y_test_arousal)

In [None]:
# Train and evaluate the BiGRU on the same C3 split as the cell above.
train_bigru(X_train, X_test, y_train_arousal, y_test_arousal)

## C4 Arousal

In [None]:
# Padded features and Arousal_A targets for culture C4.
X, y = get_data(df_C4)

In [None]:
# 80/20 train/test split of the C4 data (fixed seed for reproducibility).
X_train, X_test, y_train_arousal, y_test_arousal = train_test_split(X, y, test_size=0.2, random_state=42)

# Train and evaluate the BiLSTM on this split.
train_bilstm(X_train, X_test, y_train_arousal, y_test_arousal)

In [None]:
# Train and evaluate the BiGRU on the same C4 split as the cell above.
train_bigru(X_train, X_test, y_train_arousal, y_test_arousal)

## C5 Arousal

In [None]:
# Padded features and Arousal_A targets for culture C5.
X, y = get_data(df_C5)

In [None]:
# 80/20 train/test split of the C5 data (fixed seed for reproducibility).
X_train, X_test, y_train_arousal, y_test_arousal = train_test_split(X, y, test_size=0.2, random_state=42)

# Train and evaluate the BiLSTM on this split.
train_bilstm(X_train, X_test, y_train_arousal, y_test_arousal)

In [None]:
# Train and evaluate the BiGRU on the same C5 split as the cell above.
train_bigru(X_train, X_test, y_train_arousal, y_test_arousal)

## C6 Arousal

In [None]:
# Padded features and Arousal_A targets for culture C6.
X, y = get_data(df_C6)

In [None]:
# 80/20 train/test split of the C6 data (fixed seed for reproducibility).
X_train, X_test, y_train_arousal, y_test_arousal = train_test_split(X, y, test_size=0.2, random_state=42)

# Train and evaluate the BiLSTM on this split.
train_bilstm(X_train, X_test, y_train_arousal, y_test_arousal)

In [None]:
# Train and evaluate the BiGRU on the same C6 split as the cell above.
train_bigru(X_train, X_test, y_train_arousal, y_test_arousal)

# Valence

In [None]:
def get_data_valence(result_df):
    """Build the model inputs and valence targets from a feature/label DataFrame.

    Args:
        result_df: DataFrame with the columns ``hubert_features`` and
            ``Valence_A_labels``.

    Returns:
        Tuple ``(X, y)``: ``X`` is the array of features after
        ``adjust_features`` was applied to every row, ``y`` is the result of
        ``adjust_labels`` on the ``Valence_A_labels`` column.
    """
    padded_features = [adjust_features(sequence) for sequence in result_df['hubert_features']]
    inputs = np.array(padded_features)
    valence_targets = adjust_labels(result_df['Valence_A_labels'])
    return inputs, valence_targets

## C1

In [None]:
# Padded features and Valence_A targets for the df_C1 subset.
X, y = get_data_valence(df_C1)

In [None]:
# 80/20 train/test split (fixed seed for reproducibility).
# NOTE: the *_arousal variable names are reused, but y holds the Valence_A labels here.
X_train, X_test, y_train_arousal, y_test_arousal = train_test_split(X, y, test_size=0.2, random_state=42)

# Train and evaluate the BiLSTM on this split (its printed messages still say "Arousal").
train_bilstm(X_train, X_test, y_train_arousal, y_test_arousal)

In [None]:
# Train and evaluate the BiGRU on the same valence split as the cell above.
train_bigru(X_train, X_test, y_train_arousal, y_test_arousal)

## C2

In [None]:
# Padded features and Valence_A targets for culture C2.
X, y = get_data_valence(df_C2)

In [None]:
# 80/20 train/test split (fixed seed for reproducibility).
# NOTE: the *_arousal variable names are reused, but y holds the Valence_A labels here.
X_train, X_test, y_train_arousal, y_test_arousal = train_test_split(X, y, test_size=0.2, random_state=42)

# Train and evaluate the BiLSTM on this split (its printed messages still say "Arousal").
train_bilstm(X_train, X_test, y_train_arousal, y_test_arousal)

In [None]:
# Train and evaluate the BiGRU on the same valence split as the cell above.
train_bigru(X_train, X_test, y_train_arousal, y_test_arousal)

## C3

In [None]:
# Padded features and Valence_A targets for culture C3.
X, y = get_data_valence(df_C3)

In [None]:
# 80/20 train/test split (fixed seed for reproducibility).
# NOTE: the *_arousal variable names are reused, but y holds the Valence_A labels here.
X_train, X_test, y_train_arousal, y_test_arousal = train_test_split(X, y, test_size=0.2, random_state=42)

# Train and evaluate the BiLSTM on this split (its printed messages still say "Arousal").
train_bilstm(X_train, X_test, y_train_arousal, y_test_arousal)

In [None]:
# Train and evaluate the BiGRU on the same valence split as the cell above.
train_bigru(X_train, X_test, y_train_arousal, y_test_arousal)

## C4

In [None]:
# Padded features and Valence_A targets for culture C4.
X, y = get_data_valence(df_C4)

In [None]:
# 80/20 train/test split (fixed seed for reproducibility).
# NOTE: the *_arousal variable names are reused, but y holds the Valence_A labels here.
X_train, X_test, y_train_arousal, y_test_arousal = train_test_split(X, y, test_size=0.2, random_state=42)

# Train and evaluate the BiLSTM on this split (its printed messages still say "Arousal").
train_bilstm(X_train, X_test, y_train_arousal, y_test_arousal)

In [None]:
# Train and evaluate the BiGRU on the same valence split as the cell above.
train_bigru(X_train, X_test, y_train_arousal, y_test_arousal)

## C5

In [None]:
# Padded features and Valence_A targets for culture C5.
X, y = get_data_valence(df_C5)

In [None]:
# 80/20 train/test split (fixed seed for reproducibility).
# NOTE: the *_arousal variable names are reused, but y holds the Valence_A labels here.
X_train, X_test, y_train_arousal, y_test_arousal = train_test_split(X, y, test_size=0.2, random_state=42)

# Train and evaluate the BiLSTM on this split (its printed messages still say "Arousal").
train_bilstm(X_train, X_test, y_train_arousal, y_test_arousal)

In [None]:
# Train and evaluate the BiGRU on the same valence split as the cell above.
train_bigru(X_train, X_test, y_train_arousal, y_test_arousal)

## C6

In [None]:
# Padded features and Valence_A targets for culture C6.
X, y = get_data_valence(df_C6)

In [None]:
# 80/20 train/test split (fixed seed for reproducibility).
# NOTE: the *_arousal variable names are reused, but y holds the Valence_A labels here.
X_train, X_test, y_train_arousal, y_test_arousal = train_test_split(X, y, test_size=0.2, random_state=42)

# Train and evaluate the BiLSTM on this split (its printed messages still say "Arousal").
train_bilstm(X_train, X_test, y_train_arousal, y_test_arousal)

In [None]:
# Train and evaluate the BiGRU on the same valence split as the cell above.
train_bigru(X_train, X_test, y_train_arousal, y_test_arousal)

# Predictions

In [None]:
# Padded features and Arousal_A targets for culture C6.
X, y = get_data(df_C6)

In [None]:
import torch

def calc_pred(model, X, y, output_file):
    """Run ``model`` on ``X`` and save the predictions as a CSV file.

    Args:
        model: Either a module/callable to evaluate, or the path of a saved
            ``BiGRU`` state dict (hidden size 256, 2 layers, 750 outputs,
            matching ``train_bigru``) that is loaded before predicting.
        X: Array-like input batch; its last axis is used as the input size when
            a checkpoint path is given.
        y: Ground-truth labels. Accepted for call compatibility; not used.
        output_file: Destination path. ``.csv`` is appended when the path has
            no extension.

    Returns:
        numpy.ndarray with the predictions that were written to disk.
    """
    X_tensor = torch.FloatTensor(X)

    if isinstance(model, (str, os.PathLike)):
        # A checkpoint path was passed: rebuild the network and load its
        # weights. Previously the argument was ignored and 'arousal_c1.pt' was
        # always loaded.
        # NOTE(review): no visible cell saves such a checkpoint — confirm where
        # the file comes from.
        checkpoint_path = model
        model = BiGRU(X_tensor.shape[-1], 256, 2, 750, dropout=0.2)
        model.load_state_dict(torch.load(checkpoint_path, map_location='cpu'))

    # Disable dropout for inference
    model.eval()

    with torch.no_grad():
        y_pred = model(X_tensor)

    predictions = y_pred.cpu().numpy()

    # Honour output_file (it used to be ignored in favour of a fixed
    # "predictions.csv", so successive calls overwrote each other).
    output_path = os.fspath(output_file)
    if not os.path.splitext(output_path)[1]:
        output_path += '.csv'
    np.savetxt(output_path, predictions, delimiter=",")

    return predictions





In [None]:
# Use get_data so the C2 sequences are padded/trimmed to a common length first;
# building np.array directly from variable-length sequences gives a ragged
# array that cannot be converted to a tensor.
calc_pred('arousal_c1.pt', *get_data(df_C2), 'arousal_c1_audio')

In [None]:
def get_data(result_df, label_column='Arousal_A_labels'):
    """Build the model inputs and targets from a feature/label DataFrame.

    NOTE(review): get_data is also defined in an earlier cell; this definition
    replaces it once the cell runs.

    Args:
        result_df: DataFrame with a ``hubert_features`` column and the column
            named by ``label_column``.
        label_column: Name of the label column to use as target. Defaults to
            ``'Arousal_A_labels'``, the column this function has always used.

    Returns:
        Tuple ``(X, y)``: ``X`` is the array of features after
        ``adjust_features`` was applied to every row, ``y`` is the result of
        ``adjust_labels`` on the chosen label column.
    """
    padded_features = [adjust_features(features) for features in result_df['hubert_features']]
    X = np.array(padded_features)
    y = adjust_labels(result_df[label_column])

    return X, y

In [None]:
import numpy as np



# Generating predictions based on RMSE as noise standard deviation
# NOTE(review): true_value, num_instances, rmse_audio and rmse_video are not defined in
# any visible cell, so this cell raises NameError on a fresh kernel unless they are
# defined elsewhere.
# NOTE(review): these "predictions" are true_value plus Gaussian noise scaled by the
# RMSE values (and np.random is not seeded), i.e. they are simulated rather than
# produced by a trained model — confirm this is intended.
audio_predictions = true_value + np.random.randn(num_instances, len(rmse_audio)) * rmse_audio
video_predictions = true_value + np.random.randn(num_instances, len(rmse_video)) * rmse_video

# Simple average fusion
average_fusion = (audio_predictions + video_predictions) / 2

# Weighted average fusion, weights based on inverse of RMSE
# (a lower RMSE gives that modality a larger weight)
weights_audio = 1 / rmse_audio
weights_video = 1 / rmse_video
total_weights = weights_audio + weights_video
weighted_fusion = (audio_predictions * weights_audio + video_predictions * weights_video) / total_weights

# Display the four estimates as the cell output
audio_predictions, video_predictions, average_fusion, weighted_fusion


In [None]:
def predict_with_model(model, feature_array):
    """Run ``model`` on ``feature_array`` without tracking gradients.

    Args:
        model: Callable (typically a ``torch.nn.Module``) applied to the input.
        feature_array: Array-like converted to a float32 tensor.

    Returns:
        Whatever ``model`` returns for the float32 input tensor.
    """
    # Evaluate modules in eval mode so dropout is disabled, then restore the
    # previous mode so calling this in the middle of training has no side effect.
    is_module = isinstance(model, torch.nn.Module)
    was_training = is_module and model.training
    if is_module:
        model.eval()
    try:
        with torch.no_grad():  # Ensuring no gradients are computed
            feature_tensor = torch.tensor(feature_array, dtype=torch.float32)
            return model(feature_tensor)
    finally:
        if was_training:
            model.train()

# Function to load features and perform predictions for a given DataFrame
def process_and_predict(df, model):
    """Predict with ``model`` for every row of ``df``.

    Args:
        df: DataFrame with a ``hubert_features`` column holding one feature
            array per row.
        model: Model passed on to ``predict_with_model``.

    Returns:
        The value returned by ``predict_with_model`` for the whole batch.
    """
    # Pad/trim every sequence with adjust_features and stack along a new batch
    # axis. np.vstack would instead concatenate all sequences along the time
    # axis into one 2-D array, losing the per-sample structure.
    features = np.array([adjust_features(sequence) for sequence in df['hubert_features']])
    predictions = predict_with_model(model, features)
    return predictions


In [None]:
# Predict for the C2 subset with `model`, the value assigned from the earlier
# `model = train_bilstm(...)` cell.
# NOTE(review): verify that `model` is a trained network (not None) before running.
process_and_predict(df_C2,model)