In [5]:
import os
import pandas as pd
import numpy as np
import re
import csv
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Load CSV files from a directory based on a filename pattern
def load_csv_files(directory, filename_pattern, type="position"):
    """
    Load prediction CSV files from a directory into DataFrames.

    Args:
        directory (str): Directory to scan (not recursive).
        filename_pattern (str): Substring a filename must contain to be loaded.
        type (str): Prediction kind; the text between ``_{type}_`` and
            ``.csv`` in the filename becomes the dictionary key. The name
            shadows the builtin but is kept because callers pass it by keyword.

    Returns:
        dict: Base name -> DataFrame with missing values replaced by 0,
        keyed in sorted filename order.
    """
    marker = f"_{type}_"
    files_data = {}
    # os.listdir() order is arbitrary; sorting makes the key order reproducible.
    for filename in sorted(os.listdir(directory)):
        if filename_pattern not in filename:
            continue
        parts = filename.split(marker)
        if len(parts) < 2:
            # Matches the pattern but has no "_<type>_" marker, so no base
            # name can be derived; skip instead of raising IndexError.
            continue
        base_name = parts[1].split(".csv")[0]
        df = pd.read_csv(os.path.join(directory, filename))
        files_data[base_name] = df.fillna(0)
    return files_data

# Find corresponding phoneme files based on the base names of position filenames
def find_phoneme_files(directory, base_names):
    """
    Locate the phoneme label file for each base name.

    Args:
        directory (str): Directory expected to hold ``<base_name>.csv`` files.
        base_names (iterable): Base names to look up.

    Returns:
        dict: Base name -> file path, only for paths that exist on disk.
    """
    candidates = (
        (name, os.path.join(directory, f'{name}.csv')) for name in base_names
    )
    return {name: path for name, path in candidates if os.path.exists(path)}


In [3]:
import os
import re
import csv
import numpy as np
import torch
import pandas as pd

# ==========================================================
# Helper Functions
# ==========================================================


def pad_sequences(sequences, max_length, pad_value=0):
    """
    Bring every 2-D sequence to exactly ``max_length`` rows.

    Shorter sequences get rows filled with ``pad_value`` appended; sequences
    that are already long enough are cut to their first ``max_length`` rows.

    Args:
        sequences (list): 2-D arrays (rows x features) to pad or truncate.
        max_length (int): Number of rows each result must have.
        pad_value (int): Fill value for the appended rows.

    Returns:
        np.ndarray: The adjusted sequences stacked into one array.
    """

    def _fit(seq):
        missing = max_length - len(seq)
        if missing <= 0:
            return seq[:max_length]
        filler = np.full((missing, seq.shape[1]), pad_value)
        return np.vstack((seq, filler))

    return np.array([_fit(seq) for seq in sequences])


def extract_probabilities(data, columns):
    """
    Stack the selected columns of several DataFrames row-wise.

    Missing values are replaced by 0 before the columns are selected.

    Args:
        data (list): List of DataFrames.
        columns (list): Column names to pull from every DataFrame.

    Returns:
        np.ndarray: Rows of all DataFrames, in order, restricted to ``columns``.
    """
    cleaned_frames = [frame.fillna(0) for frame in data]
    blocks = []
    for frame in cleaned_frames:
        blocks.append(frame[columns].to_numpy())
    return np.concatenate(blocks, axis=0)


def apply_phonotactic_rules(combinations):
    """
    Keep only the combinations that pass the phonotactic filters.

    A combination is dropped when it contains either:
      * two adjacent characters from the set ``aeiouy``, or
      * a doubled ``s`` ("ss").

    Args:
        combinations (list): Candidate phoneme strings.

    Returns:
        list: The candidates that passed both filters, in their original order.
    """
    adjacent_vowels = re.compile(r"[aeiouy]{2}")
    doubled_s = re.compile(r"([s])\1")
    return [
        candidate
        for candidate in combinations
        if not adjacent_vowels.search(candidate)
        and not doubled_s.search(candidate)
    ]


def combine_sequences_with_padding(video_data):
    """
    Pad every video's features and labels to the longest feature sequence.

    Features are padded through ``pad_sequences``; label lists are extended
    with the index that ``phoneme_to_index`` assigns to ``" "``.

    Args:
        video_data (dict): Video name -> {"X": 2-D array, "y": list of indices}.

    Returns:
        tuple: (list of padded feature arrays, list of padded label lists),
        both in the iteration order of ``video_data``.
    """
    entries = list(video_data.values())
    max_length = max(len(entry["X"]) for entry in entries)

    X_padded = [pad_sequences([entry["X"]], max_length)[0] for entry in entries]

    y_padded = []
    for entry in entries:
        missing = max_length - len(entry["y"])
        y_padded.append(entry["y"] + [phoneme_to_index[" "]] * missing)
    return X_padded, y_padded


# ==========================================================
# Data Preparation Functions
# ==========================================================

# Load phoneme-to-index mapping
# The vocabulary is decoded as UTF-8 because the per-video phoneme label files
# in this file are also opened with encoding="utf-8"; decoding both the same
# way keeps the symbols comparable. newline="" is what the csv module asks for.
with open(
    r"C:\Users\bouba\OneDrive\Documents\ACSR\ACSR-main\data\phonelist.csv",
    "r",
    encoding="utf-8",
    newline="",
) as file:
    reader = csv.reader(file)
    # csv.reader yields [] for blank lines; skip them instead of failing on row[0].
    vocabulary_list = [row[0] for row in reader if row]


phoneme_to_index = {phoneme: idx for idx, phoneme in enumerate(vocabulary_list)}
index_to_phoneme = {idx: phoneme for phoneme, idx in phoneme_to_index.items()}
# " " is used as the label padding symbol. Add it only when the vocabulary does
# not already contain it: re-adding it would move it to index
# len(phoneme_to_index), the same value later used as the CTC blank index.
if " " not in phoneme_to_index:
    phoneme_to_index[" "] = len(phoneme_to_index)
    index_to_phoneme[phoneme_to_index[" "]] = " "


def prepare_data_for_videos_no_sliding_windows(
    hand_position_data, hand_shape_data, phoneme_files
):
    """
    Prepare data for all videos without sliding windows.

    For each video present in all three inputs, the 5 hand-position class
    probabilities and the 8 hand-shape class probabilities are joined
    column-wise (13 features per row), and the phoneme labels are read from
    the first column of the video's CSV file and mapped through
    ``phoneme_to_index``.

    Args:
        hand_position_data (dict): Base name -> hand position DataFrame.
        hand_shape_data (dict): Base name -> hand shape DataFrame.
        phoneme_files (dict): Base name -> phoneme CSV file path.

    Returns:
        dict: Base name -> {"X": combined probabilities, "y": phoneme indices}.
        Videos missing from ``hand_shape_data`` or ``phoneme_files`` are skipped.

    Raises:
        ValueError: If a video's position and shape tables differ in row count.
        KeyError: If a phoneme in a label file is not in ``phoneme_to_index``.
    """
    position_columns = [f"p_class_{i}" for i in range(1, 6)]
    shape_columns = [f"p_class_{i}" for i in range(1, 9)]

    all_videos_data = {}
    for base_name, position_df in hand_position_data.items():
        # A video is usable only when all three sources are available.
        if base_name not in phoneme_files or base_name not in hand_shape_data:
            continue

        # Extract probabilities
        hand_position_probs = extract_probabilities([position_df], position_columns)
        hand_shape_probs = extract_probabilities(
            [hand_shape_data[base_name]], shape_columns
        )
        if len(hand_position_probs) != len(hand_shape_probs):
            # np.concatenate would fail anyway; name the offending video.
            raise ValueError(
                f"Row count mismatch for '{base_name}': "
                f"{len(hand_position_probs)} position rows vs "
                f"{len(hand_shape_probs)} shape rows"
            )
        combined_probs = np.concatenate(
            (hand_position_probs, hand_shape_probs), axis=1
        )

        # Read phoneme sequences (blank lines come back as empty rows; skip them)
        with open(phoneme_files[base_name], "r", encoding="utf-8", newline="") as f:
            phoneme_sequence = [row[0] for row in csv.reader(f) if row]

        # Convert phoneme sequence to indices
        phoneme_indices = [phoneme_to_index[phoneme] for phoneme in phoneme_sequence]
        all_videos_data[base_name] = {"X": combined_probs, "y": phoneme_indices}
    return all_videos_data

In [9]:
# ==========================================================
# Main Script
# ==========================================================

# Directories
data_dir = r'C:\Users\bouba\OneDrive\Documents\ACSR\ACSR\output\predictions'
phoneme_dir = r'C:\Users\bouba\Downloads\CSF22\CSF22\CSF22_train\train_labels'

# Load position and shape data
hand_position_data = load_csv_files(data_dir, 'predictions_rf_position', type='position')
hand_shape_data = load_csv_files(data_dir, 'predictions_rf_shape', type='shape')

# Find phoneme files
base_names = hand_position_data.keys()
phoneme_files = find_phoneme_files(phoneme_dir, base_names)

# Print the number of files found
print(f"Number of position files: {len(hand_position_data)}")
print(f"Number of shape files: {len(hand_shape_data)}")
print(f"Number of phoneme files: {len(phoneme_files)}")

# Take only the first 5 videos for demonstration.
# The subset is chosen once, from videos present in all three dicts, so the
# three dicts end up with the same keys. Slicing each dict on its own could
# pick different videos from each.
demo_keys = [
    key
    for key in hand_position_data
    if key in hand_shape_data and key in phoneme_files
][:5]
hand_position_data = {key: hand_position_data[key] for key in demo_keys}
hand_shape_data = {key: hand_shape_data[key] for key in demo_keys}
phoneme_files = {key: phoneme_files[key] for key in demo_keys}

# Prepare data
all_videos_data = prepare_data_for_videos_no_sliding_windows(
    hand_position_data, hand_shape_data, phoneme_files
)
X_combined, y_combined = combine_sequences_with_padding(all_videos_data)

# Convert phoneme sequences to tensors (unpadded, one tensor per video).
# NOTE: this per-video dict is replaced by the final assignment at the end of
# this cell.
y_tensors = [
    torch.tensor(video_data["y"], dtype=torch.long)
    for video_data in all_videos_data.values()
]
all_videos_data = {
    key: {"X": video_data["X"], "y": y_tensors[i]}
    for i, (key, video_data) in enumerate(all_videos_data.items())
}

# Combine all data into tensors
X_combined = torch.tensor(np.array(X_combined), dtype=torch.float32)
y_combined = torch.tensor(y_combined, dtype=torch.long)

# Normalize data (optional)
# X_combined = (X_combined - X_combined.mean()) / X_combined.std()

# Final organized data
all_videos_data = {"X": X_combined, "y": y_combined}

Number of position files: 2
Number of shape files: 2
Number of phoneme files: 2


In [10]:
import torch
from torch.utils.data import DataLoader, TensorDataset

# Function to split data into training and validation sets
def train_val_split(data, train_ratio=0.8):
    """
    Split inputs and labels into a leading training part and a trailing
    validation part, without shuffling.

    Args:
        data (dict): Holds the inputs under 'X' and the labels under 'y'.
        train_ratio (float): Fraction of samples (rounded down) for training.

    Returns:
        tuple: (train_data, val_data), each a dict with 'X' and 'y'.
    """
    features, labels = data['X'], data['y']
    cut = int(len(features) * train_ratio)

    train_data = {'X': features[:cut], 'y': labels[:cut]}
    val_data = {'X': features[cut:], 'y': labels[cut:]}
    return train_data, val_data

# Convert data to DataLoader format
def data_to_dataloader(data, batch_size=4, shuffle=True):
    """
    Wrap the 'X' and 'y' tensors of ``data`` in a DataLoader.

    Args:
        data (dict): Holds the input tensor under 'X' and labels under 'y'.
        batch_size (int): Number of samples per batch.
        shuffle (bool): Whether the loader shuffles the samples.

    Returns:
        DataLoader: Yields (inputs, labels) batches.
    """
    paired = TensorDataset(data['X'], data['y'])
    return DataLoader(paired, batch_size=batch_size, shuffle=shuffle)


# Split data
train_data, val_data = train_val_split(all_videos_data)

# Prepare DataLoaders (only the training loader shuffles)
train_loader = data_to_dataloader(train_data, batch_size=4, shuffle=True)
val_loader = data_to_dataloader(val_data, batch_size=4, shuffle=False)

# Check the DataLoader output: show the shapes of the first training batch, if any
first_batch = next(iter(train_loader), None)
if first_batch is not None:
    batch_X, batch_y = first_batch
    print("Batch X shape:", batch_X.shape)
    print("Batch y shape:", batch_y.shape)

Batch X shape: torch.Size([1, 257, 13])
Batch y shape: torch.Size([1, 257])


In [11]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Model Definition
class CuedSpeechRNN(nn.Module):
    """
    Bidirectional LSTM followed by a linear layer applied at every time step.

    The network returns log-probabilities over ``output_dim + 1`` classes;
    the extra class is reserved for the CTC blank token.
    """

    def __init__(self, input_dim, output_dim, hidden_dim=128, num_layers=2):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
        )
        # Both directions are concatenated, hence 2 * hidden_dim input features;
        # +1 output for the CTC blank token.
        self.fc = nn.Linear(2 * hidden_dim, output_dim + 1)

    def forward(self, x):
        """Map a batch-first input sequence to per-step log-probabilities."""
        hidden_states, _ = self.lstm(x)
        logits = self.fc(hidden_states)
        return F.log_softmax(logits, dim=-1)

# Training Function
def train_model(model, train_loader, criterion, optimizer, num_epochs=50, pad_index=None):
    """
    Train ``model`` with a CTC-style criterion and print the mean loss per epoch.

    Args:
        model (torch.nn.Module): Maps (batch, time, features) inputs to
            (batch, time, classes) log-probabilities.
        train_loader (iterable): Yields (X_batch, y_batch) pairs; y_batch holds
            label indices padded with ``pad_index``.
        criterion (callable): Called as criterion(log_probs, targets,
            input_lengths, target_lengths) with the first two dimensions of
            the model output swapped (time first).
        optimizer (torch.optim.Optimizer): Optimizer over the model parameters.
        num_epochs (int): Number of passes over ``train_loader``.
        pad_index (int, optional): Label value that marks padding. Defaults to
            ``phoneme_to_index[' ']``.
    """
    if pad_index is None:
        pad_index = phoneme_to_index[' ']

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        num_batches = 0
        for X_batch, y_batch in train_loader:
            # Ensure X_batch is 3D: (batch_size, sequence_length, feature_dimension)
            if X_batch.dim() == 2:
                X_batch = X_batch.unsqueeze(0)

            # Clear gradients BEFORE the backward pass so that gradients left
            # over from earlier calls (e.g. a re-run cell) cannot leak into
            # the first update.
            optimizer.zero_grad()

            outputs = model(X_batch)
            # Every sample is treated as using the full (padded) output length.
            input_lengths = torch.full((X_batch.size(0),), outputs.size(1), dtype=torch.long)
            # Target length = number of labels that are not padding.
            target_lengths = torch.tensor(
                [len(y[y != pad_index]) for y in y_batch], dtype=torch.long
            )

            # Compute CTC loss
            loss = criterion(outputs.transpose(0, 1), y_batch, input_lengths, target_lengths)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1

        # An empty loader yields nan instead of a ZeroDivisionError.
        mean_loss = total_loss / num_batches if num_batches else float("nan")
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {mean_loss}")

# Evaluation Function
def evaluate_model(model, val_loader, criterion, pad_index=None):
    """
    Print the mean criterion value of ``model`` over ``val_loader``.

    Runs in eval mode with gradient tracking disabled; model parameters are
    not updated.

    Args:
        model (torch.nn.Module): Maps (batch, time, features) inputs to
            (batch, time, classes) log-probabilities.
        val_loader (iterable): Yields (X_batch, y_batch) pairs; y_batch holds
            label indices padded with ``pad_index``.
        criterion (callable): Called as criterion(log_probs, targets,
            input_lengths, target_lengths) with the first two dimensions of
            the model output swapped (time first).
        pad_index (int, optional): Label value that marks padding. Defaults to
            ``phoneme_to_index[' ']``.
    """
    if pad_index is None:
        pad_index = phoneme_to_index[' ']

    model.eval()
    total_val_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            if X_batch.dim() == 2:
                X_batch = X_batch.unsqueeze(0)

            outputs = model(X_batch)
            # Every sample is treated as using the full (padded) output length.
            input_lengths = torch.full((X_batch.size(0),), outputs.size(1), dtype=torch.long)
            # Target length = number of labels that are not padding.
            target_lengths = torch.tensor(
                [len(y[y != pad_index]) for y in y_batch], dtype=torch.long
            )

            val_loss = criterion(outputs.transpose(0, 1), y_batch, input_lengths, target_lengths)
            total_val_loss += val_loss.item()
            num_batches += 1

    # The validation split can be empty for tiny datasets; report nan rather
    # than raising ZeroDivisionError.
    mean_val_loss = total_val_loss / num_batches if num_batches else float("nan")
    print(f"Validation Loss: {mean_val_loss}")


# Instantiate and Train Model
# The model emits output_dim + 1 classes; the last class index (== output_dim)
# is the one handed to CTCLoss as the blank.
input_dim = X_combined.shape[-1]
output_dim = len(phoneme_to_index)
model = CuedSpeechRNN(input_dim=input_dim, output_dim=output_dim)
criterion = nn.CTCLoss(blank=output_dim)
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Train the model
train_model(model, train_loader, criterion, optimizer, num_epochs=1000)

# Evaluate the model
evaluate_model(model, val_loader, criterion)

Epoch 1/1000, Loss: 60.435726165771484
Epoch 2/1000, Loss: 59.404659271240234
Epoch 3/1000, Loss: 58.17195510864258
Epoch 4/1000, Loss: 56.66895294189453
Epoch 5/1000, Loss: 54.7713737487793
Epoch 6/1000, Loss: 52.272186279296875
Epoch 7/1000, Loss: 48.819602966308594
Epoch 8/1000, Loss: 43.8242301940918
Epoch 9/1000, Loss: 36.3961181640625
Epoch 10/1000, Loss: 25.830293655395508
Epoch 11/1000, Loss: 14.042709350585938
Epoch 12/1000, Loss: 6.298023700714111
Epoch 13/1000, Loss: 3.7357428073883057
Epoch 14/1000, Loss: 3.502570867538452
Epoch 15/1000, Loss: 4.014628887176514
Epoch 16/1000, Loss: 4.529301166534424
Epoch 17/1000, Loss: 4.8412604331970215
Epoch 18/1000, Loss: 4.997861385345459
Epoch 19/1000, Loss: 5.043270111083984
Epoch 20/1000, Loss: 5.0059027671813965
Epoch 21/1000, Loss: 4.905983924865723
Epoch 22/1000, Loss: 4.758982181549072
Epoch 23/1000, Loss: 4.577328681945801
Epoch 24/1000, Loss: 4.371266841888428
Epoch 25/1000, Loss: 4.149406909942627
Epoch 26/1000, Loss: 3.91947

In [17]:
def greedy_decoder(output, blank):
    """
    Greedy (best-path) decoding of per-step class scores.

    At each time step the highest-scoring class is taken. A class is emitted
    only if it is not the blank and differs from the class chosen at the
    immediately preceding step, so a blank between two equal classes keeps
    both of them.

    Args:
        output (torch.Tensor): Scores of shape (batch_size, sequence_length, num_classes).
        blank (int): Index of the blank token.

    Returns:
        list: One list of class indices (ints) per batch element.
    """
    best_paths = torch.argmax(output, dim=2).tolist()
    decodes = []
    for path in best_paths:
        collapsed = []
        last = None  # class picked at the previous step (blank included)
        for token in path:
            if token != blank and token != last:
                collapsed.append(token)
            last = token
        decodes.append(collapsed)
    return decodes


def decode_loader(model, loader, blank, index_to_phoneme):
    """
    Run the model over every batch of a DataLoader and collect predictions
    next to the reference labels, both as phoneme sequences.

    Args:
        model (torch.nn.Module): Trained model.
        loader (torch.utils.data.DataLoader): Yields (inputs, labels) batches.
        blank (int): Index of the blank token.
        index_to_phoneme (dict): Mapping from indices to phonemes.

    Returns:
        tuple: (decoded_sequences, true_sequences), where:
            - decoded_sequences: greedy-decoded phoneme lists, one per sample.
            - true_sequences: reference phoneme lists with blank indices and
              " " entries removed.
    """
    model.eval()  # inference mode
    all_decoded_sequences = []
    all_true_sequences = []

    with torch.no_grad():  # no gradients needed for decoding
        for X_batch, y_batch in loader:
            predicted_ids = greedy_decoder(model(X_batch), blank=blank)
            for ids in predicted_ids:
                all_decoded_sequences.append([index_to_phoneme[i] for i in ids])

            # Reference labels: drop blank indices first, then padding (" ")
            for labels in y_batch:
                phonemes = []
                for label in labels.tolist():
                    if label == blank:
                        continue
                    symbol = index_to_phoneme[label]
                    if symbol != " ":
                        phonemes.append(symbol)
                all_true_sequences.append(phonemes)

    return all_decoded_sequences, all_true_sequences


# Example usage
blank_token = len(phoneme_to_index)  # Index of the blank token
decoded_train_sequences, true_train_sequences = decode_loader(
    model, train_loader, blank_token, index_to_phoneme
)
decoded_val_sequences, true_val_sequences = decode_loader(
    model, val_loader, blank_token, index_to_phoneme
)

# Print results
for label, sequences in (
    ("Decoded training phoneme sequences:", decoded_train_sequences),
    ("True training phoneme sequences:", true_train_sequences),
    ("Decoded validation phoneme sequences:", decoded_val_sequences),
    ("True validation phoneme sequences:", true_val_sequences),
):
    print(label, sequences)



Decoded training phoneme sequences: [['<start>', 'm', 'a', 's^', 'x', 'm', 'i', 'r', 'u', 's', 'i', '<end>']]
True training phoneme sequences: [['<start>', 'm', 'a', 's^', 'x', 'm', 'i', 'z', 'e^', 'r', 'u', 's', 'i', '<end>']]
Decoded validation phoneme sequences: [['<start>', 'm', 'a', 's^', 'x', 'm', 'i', 'r', 'u', 's', 'i', '<end>']]
True validation phoneme sequences: [['<start>', 'i', 'l', 's', 'x', 'g', 'a', 'r', 'a~', 't', 'i', 'r', 'a', 'd', 'y', 'f', 'r', 'w', 'a', 'a', 'v', 'e^', 'k', 's', 'x', 'b', 'o~', 'k', 'a', 'p', 'y', 's^', 'o~', '<end>']]
