# Libraries

In [None]:
import os
import librosa
import numpy as np
import pywt
import scipy
from scipy.signal.windows import hamming 
import pandas as pd

from tqdm import tqdm

# Paths

In [None]:
# Hold-out split: dataset root and its train / val / test partitions
base_holdout_path = '/kaggle/input/truthlie-clean-split/TruthLie_Holdout_Stratified'
train_data = f"{base_holdout_path}/train"
val_data = f"{base_holdout_path}/val"
test_data = f"{base_holdout_path}/test"

# Each partition has an Audio folder and a Transcripts workbook
train_audio_dir = f"{train_data}/Audio"
train_transcript_file = f"{train_data}/Transcripts/Transcripts.xlsx"

val_audio_dir = f"{val_data}/Audio"
val_transcript_file = f"{val_data}/Transcripts/Transcripts.xlsx"

test_audio_dir = f"{test_data}/Audio"
test_transcript_file = f"{test_data}/Transcripts/Transcripts.xlsx"

# Cross-validation split: one sub-folder per fold (fold_0 .. fold_3)
cross_validation_path = '/kaggle/input/truthlie-clean-crossvalidation/TruthLie_CrossVal_Stratified'
folds = [cross_validation_path + "/fold_" + str(i) for i in range(4)]

# Feature Extraction

In [None]:
def pad_audio(signal, target_length):
    """Return `signal` zero-padded on the right up to `target_length` samples.

    A signal that is already `target_length` samples or longer gets no
    padding; it is never truncated.
    """
    missing = target_length - len(signal)
    if missing < 0:
        missing = 0
    return np.pad(signal, (0, missing), mode="constant")
        
def _wavelet_band_stats(coeff):
    """Return [energy, entropy, kurtosis, skewness, std_dev] for one DWT coefficient band."""
    variance = np.var(coeff)
    if variance == 0:
        # Constant band: the standardized moments are undefined, report 0
        kurtosis = 0
        skewness = 0
    else:
        centered = coeff - np.mean(coeff)
        kurtosis = np.mean(centered**4) / (variance**2)
        skewness = np.mean(centered**3) / (variance**1.5)
    energy = np.sum(np.square(coeff))
    # NOTE(review): this is not a normalized Shannon entropy (coefficients are
    # signed and not turned into probabilities); the formula is kept unchanged
    # so the feature stays comparable with previously extracted data.
    entropy = -np.sum(coeff * np.log2(np.abs(coeff) + 1e-12))
    std_dev = np.std(coeff)
    return [energy, entropy, kurtosis, skewness, std_dev]


def preprocess_audio_sequential_fixed_segments(signal, sample_rate, num_segments=20, n_mfcc=13):
    """
    Pre-processes an audio signal into a fixed number of per-segment feature vectors:
    - Noise filtering via STFT thresholding (bins at or below the median magnitude are zeroed)
    - Peak normalization of the signal between -1 and 1
    - Splitting into `num_segments` equal-length, Hamming-windowed segments
    - Feature extraction per segment: DWT band statistics and mean MFCCs

    Parameters:
    - signal: 1-D array of audio samples
    - sample_rate: sampling rate of `signal`, forwarded to the MFCC computation
    - num_segments: number of equal-length segments to split the signal into
    - n_mfcc: number of MFCC coefficients to extract per segment
    Returns:
    - numpy array with one row per segment; each row holds 5 statistics for
      every wavelet band returned by the 5-level decomposition, followed by
      `n_mfcc` frame-averaged MFCCs
    Raises:
    - ValueError: if `num_segments` is not positive or the signal has fewer
      samples than `num_segments`
    """
    if num_segments < 1:
        raise ValueError(f"num_segments must be >= 1, got {num_segments}")
    window_length = len(signal) // num_segments
    if window_length == 0:
        raise ValueError(
            f"Signal with {len(signal)} samples is too short for {num_segments} segments"
        )

    # Noise filtering via STFT thresholding. The mask is applied to the
    # complex STFT so the phase is preserved; inverting the bare magnitude
    # would reconstruct a zero-phase signal instead of a denoised one.
    stft = librosa.stft(signal)
    magnitude = np.abs(stft)
    threshold = np.median(magnitude)
    stft_filtered = np.where(magnitude > threshold, stft, 0)
    # `length` keeps the reconstructed signal the same size as the input
    signal_filtered = librosa.istft(stft_filtered, length=len(signal))

    # Normalize signal between -1 and 1 (a silent signal is left as all zeros
    # instead of dividing by zero and producing NaNs)
    peak = np.max(np.abs(signal_filtered))
    normalized_signal = signal_filtered / peak if peak > 0 else signal_filtered

    # Split the signal into equal segments; trailing samples that do not fill
    # a whole segment (len % num_segments) are dropped
    usable_length = num_segments * window_length
    segmented_signal = normalized_signal[:usable_length].reshape(num_segments, window_length)

    # Apply Hamming window to each segment
    windowed_signal = segmented_signal * hamming(window_length)

    # All segments share one length, so the MFCC FFT size is computed once:
    # the largest power of two not exceeding the segment, clamped to [256, 2048]
    n_fft = min(2048, max(256, 2 ** int(np.floor(np.log2(window_length)))))

    sequence_features = []
    for segment in windowed_signal:
        # Wavelet decomposition (5 levels, Daubechies-4)
        coeffs = pywt.wavedec(segment, 'db4', level=5)
        segment_features = []
        for coeff in coeffs:
            segment_features.extend(_wavelet_band_stats(coeff))

        # MFCC feature extraction, averaged over frames to get stable features
        mfccs = librosa.feature.mfcc(y=segment, sr=sample_rate, n_mfcc=n_mfcc, n_fft=n_fft)
        segment_features.extend(np.mean(mfccs, axis=1).tolist())

        # Add this segment's features to the sequence
        sequence_features.append(segment_features)

    return np.array(sequence_features)


def create_sequences_fixed_segments_noise(data_dir, transcript_file, num_segments=20, n_mfcc=13):
    """
    Function to create sequences of features and labels from audio files with fixed segmentation.

    Files that cannot be loaded or processed are reported and skipped, so the
    returned arrays may contain fewer entries than the transcript file has rows.

    Parameters:
    - data_dir: folder containing the audio files
    - transcript_file: Excel file with an 'audio name' column (file name inside
      `data_dir`) and a 'label' column
    - num_segments: fixed number of segments per audio
    - n_mfcc: number of MFCC coefficients to extract per segment
    Returns:
    - features: numpy array with sequential features
    - labels: numpy array with labels
    """
    transcripts_df = pd.read_excel(transcript_file)
    features = []
    labels = []

    rows = zip(transcripts_df['audio name'], transcripts_df['label'])
    for audio_name, label in tqdm(rows, total=transcripts_df.shape[0]):
        try:
            input_audio_path = os.path.join(data_dir, audio_name)

            # Load audio file at its native sampling rate
            signal, sample_rate = librosa.load(input_audio_path, sr=None)

            # Extract sequential features with fixed segmentation
            sequence_features = preprocess_audio_sequential_fixed_segments(
                signal, sample_rate, num_segments=num_segments, n_mfcc=n_mfcc
            )
        except Exception as exc:
            # Best effort: skip the broken file but say which one and why.
            # `Exception` (not a bare except) lets Ctrl-C / SystemExit through.
            print(f"Error while reading audio {audio_name!r}: {exc}")
            continue

        # Append only after success so features and labels stay aligned
        features.append(sequence_features)
        labels.append(label)

    return np.array(features), np.array(labels)

## Holdout

In [None]:
# Every audio clip is split into this many fixed-length segments
num_segments = 20

# Training split
train_features, train_labels = create_sequences_fixed_segments_noise(
    train_audio_dir, train_transcript_file, num_segments=num_segments
)
print(f"Train features shape: {train_features.shape}")
print(f"Train labels shape: {train_labels.shape}")

# Validation split
val_features, val_labels = create_sequences_fixed_segments_noise(
    val_audio_dir, val_transcript_file, num_segments=num_segments
)
print(f"Validation features shape: {val_features.shape}")
print(f"Validation labels shape: {val_labels.shape}")

# Test split
test_features, test_labels = create_sequences_fixed_segments_noise(
    test_audio_dir, test_transcript_file, num_segments=num_segments
)
print(f"Test features shape: {test_features.shape}")
print(f"Test labels shape: {test_labels.shape}")

In [None]:
# Save features in npy files
folder_name = f"audio_{num_segments}"
# Write into the same absolute directory that is created here, so the saves
# do not depend on the notebook's current working directory
output_dir = os.path.join("/kaggle/working", folder_name)
os.makedirs(output_dir, exist_ok=True)

# Save the arrays so they can be reused later
np.save(os.path.join(output_dir, "train_features.npy"), train_features)
np.save(os.path.join(output_dir, "val_features.npy"), val_features)
np.save(os.path.join(output_dir, "test_features.npy"), test_features)
np.save(os.path.join(output_dir, "train_labels.npy"), train_labels)
np.save(os.path.join(output_dir, "val_labels.npy"), val_labels)
np.save(os.path.join(output_dir, "test_labels.npy"), test_labels)

## Cross-Validation

In [None]:
# One (features, labels) pair per cross-validation fold, in fold order
features_by_fold = []

for fold in folds:
    # Each fold folder has the same layout as a hold-out partition
    audio_dir = f"{fold}/Audio"
    transcript_file = f"{fold}/Transcripts/Transcripts.xlsx"

    features, labels = create_sequences_fixed_segments_noise(
        audio_dir,
        transcript_file,
        num_segments=num_segments,
    )

    features_by_fold += [(features, labels)]

In [None]:
# Save features in npy files
# Use the absolute output directory (and make sure it exists) so this cell
# does not depend on the current working directory
folds_output_dir = os.path.join("/kaggle/working", folder_name)
os.makedirs(folds_output_dir, exist_ok=True)

# Iterate over what was actually extracted instead of a hard-coded fold count
for i, (fold_features, fold_labels) in enumerate(features_by_fold):
    np.save(os.path.join(folds_output_dir, f"fold_{i}_features.npy"), fold_features)
    np.save(os.path.join(folds_output_dir, f"fold_{i}_labels.npy"), fold_labels)