In [None]:
import base64
import html
import io
import os
import shutil
import warnings
from pathlib import Path
from typing import Optional, Dict, Union

import numpy as np
import pandas as pd
import soundfile as sf
from scipy.io import wavfile
# Silence every warning for the rest of the session.
# NOTE(review): this blanket filter also hides deprecation warnings from
# pandas / numpy; consider narrowing it once the notebook is stable.
warnings.filterwarnings('ignore')

# Project root. Defaults to the original checkout location; set the
# ADV_SR_ROOT environment variable to run the notebook from another machine.
ROOT_DIR = os.environ.get('ADV_SR_ROOT', '/home/aloradi/adversarial-robustness-for-sr')
DATA_DIR = os.path.join(ROOT_DIR, 'data')
VPC_DIR = os.path.join(DATA_DIR, 'vpc2025_official')
LIBRI_DIR = DATA_DIR

# Relative paths used below (BASE_DIR, the report's default output directory)
# are resolved against the project root.
os.chdir(ROOT_DIR)

# Location of the per-system VPC folders, relative to ROOT_DIR.
BASE_DIR = Path('data/vpc2025_official')
# Prefix, relative to LIBRI_DIR, under which the LibriSpeech subsets live.
LIBRI_PATH = 'librispeech/test-clean/LibriSpeech'
LIBRI_SETS = {
    'TRAIN': 'train-clean-360',
    'DEV': 'dev-clean',
    'TEST': 'test-clean'
}

# Metadata CSV read from every system folder, and the column in it that holds
# the path of the anonymized audio file.
csv_name = 'test_enrolls.csv'
WAV_COL_ID = 'enrollment_path' # 'wav_path'

# LibriSpeech subset that the original (non-anonymized) audio is taken from.
SETNAME = LIBRI_SETS['TEST']

In [None]:
def sample_speakers_and_utterances(df, num_speakers, num_utts_per_spk, random_seed=None):
    """
    Sample up to a fixed number of utterances from a fixed number of speakers.

    Args:
        df: pandas DataFrame with one row per utterance and a ``speaker_id`` column.
        num_speakers: number of distinct speakers to draw (without replacement).
        num_utts_per_spk: number of rows to draw for each selected speaker. A
            speaker with fewer rows contributes all of them and a warning is
            printed.
        random_seed: optional integer seed. When given, all draws come from a
            private ``np.random.RandomState`` so the result is reproducible and
            NumPy's global random state is left untouched. When ``None``, the
            global NumPy random state is used.

    Returns:
        pandas DataFrame with the sampled rows, re-indexed from 0. The frame is
        empty (same columns as ``df``) when ``num_speakers`` is 0.

    Raises:
        ValueError: if ``df`` holds fewer distinct speakers than ``num_speakers``.
    """
    # A private generator yields the same draws as seeding the global state
    # would, without disturbing other code that relies on np.random.
    rng = np.random.RandomState(random_seed) if random_seed is not None else None
    chooser = rng if rng is not None else np.random

    # Get unique speaker IDs and check if we have enough speakers
    unique_speakers = df['speaker_id'].unique()
    if len(unique_speakers) < num_speakers:
        raise ValueError(f"Not enough speakers in dataset. Requested {num_speakers} but only found {len(unique_speakers)}")

    # Randomly sample speakers
    sampled_speakers = chooser.choice(unique_speakers, size=num_speakers, replace=False)

    sampled_data = []
    for speaker in sampled_speakers:
        speaker_utts = df[df['speaker_id'] == speaker]

        if len(speaker_utts) < num_utts_per_spk:
            print(f"Warning: Speaker {speaker} has fewer utterances ({len(speaker_utts)}) than requested ({num_utts_per_spk})")
            sampled_utts = speaker_utts  # Take all available utterances
        else:
            # random_state=None makes pandas fall back to the global NumPy state
            sampled_utts = speaker_utts.sample(n=num_utts_per_spk, random_state=rng)

        sampled_data.append(sampled_utts)

    if not sampled_data:
        # pd.concat refuses an empty list; return an empty frame with df's columns.
        return df.iloc[0:0].reset_index(drop=True)

    return pd.concat(sampled_data, ignore_index=True)

In [None]:
def process_vpc_data(num_utts_per_spk=3, num_speakers=9, sep="|"):
    """
    Sample utterances from every VPC system's metadata CSV and combine them.

    For each system folder the file ``BASE_DIR / <system> / "data/metadata" /
    csv_name`` is read, speakers and utterances are sampled with a fixed seed
    (42), and the rows are tagged with the system name. Systems whose CSV is
    missing are skipped with a printed warning.

    Args:
        num_utts_per_spk: utterances to sample per speaker.
        num_speakers: speakers to sample per system.
        sep: field separator of the metadata CSV files.

    Returns:
        pandas DataFrame with the sampled rows of all systems plus the columns
        ``system_name``, ``basename`` (file name taken from the ``WAV_COL_ID``
        column) and ``libri_path`` (path of the matching LibriSpeech FLAC file,
        relative to the LibriSpeech root). A ``model`` column, if present, is
        dropped because ``system_name`` carries the same information.

    Raises:
        ValueError: if none of the system folders contains the metadata CSV.
    """
    # List of directories to process (B4 and T10-2 do not follow the standard structure)
    target_dirs = ['B3', 'B4', 'B5', 'T10-2', 'T12-5', 'T25-1', 'T8-5']
    all_samples = []

    for dir_name in target_dirs:
        csv_path = BASE_DIR / dir_name / "data/metadata" / csv_name

        if not csv_path.is_file():
            print(f"Warning: Could not find CSV file for {dir_name}")
            continue

        df = pd.read_csv(csv_path, sep=sep)

        sampled_df = sample_speakers_and_utterances(
            df,
            num_speakers=num_speakers,
            num_utts_per_spk=num_utts_per_spk,
            random_seed=42  # For reproducibility
        )

        # Report statistics once, for the first system that could be loaded.
        if not all_samples:
            print(f"Original dataset size: {len(df)}")
            print(f"Sampled dataset size: {len(sampled_df)}")
            print(f"Number of unique speakers in sample: {sampled_df['speaker_id'].nunique()}")
            print("\nUtterances per speaker in sample:")
            print(sampled_df.groupby('speaker_id').size())
            print('=====================================\n')

        sampled_df['system_name'] = dir_name
        all_samples.append(sampled_df)

    if not all_samples:
        raise ValueError(
            f"No '{csv_name}' metadata file found under {BASE_DIR} for any of: {', '.join(target_dirs)}"
        )

    combined_df = pd.concat(all_samples, ignore_index=True)

    def _to_libri_path(basename):
        # "<a>-<b>-<rest>.<ext>" -> LIBRI_PATH/SETNAME/<a>/<b>/<a>-<b>-<rest>.flac
        # NOTE(review): assumes file names have at least two '-' separated
        # fields (presumably speaker and chapter ids) - confirm against the CSVs.
        fields = basename.split('-')
        stem = os.path.splitext(basename)[0]
        return str(Path(LIBRI_PATH) / SETNAME / fields[0] / fields[1] / f'{stem}.flac')

    combined_df['basename'] = combined_df[WAV_COL_ID].apply(os.path.basename)
    combined_df['libri_path'] = combined_df['basename'].apply(_to_libri_path)

    # Made redundant by system_name; tolerate metadata files without the column.
    combined_df = combined_df.drop(columns=['model'], errors='ignore')
    return combined_df

In [None]:
def _apply_dither(x, bits=16):
    """Add triangular-PDF noise to ``x`` ahead of quantization.

    The noise is the difference of two uniform draws, scaled by ``2 ** -bits``
    and by the absolute peak of ``x`` (so an all-zero signal stays all-zero).
    """
    peak = np.max(np.abs(x))
    q = 2.0 ** (-bits)
    noise = (np.random.random(len(x)) - np.random.random(len(x))) * q
    return x + noise * peak


def process_audio_file(file_path, root_dir, normalization_method='loudness', target_lufs=-23.0):
    """
    Load an audio file, normalize it and return it as a base64 WAV data URI.

    The audio is down-mixed to mono, optionally normalized, dithered, converted
    to 16-bit PCM and encoded in memory (no temporary file is written).

    Args:
        file_path: Path to the audio file relative to root_dir
        root_dir: Root directory containing the audio files
        normalization_method: String indicating normalization method
            'loudness': loudness normalization to target_lufs (needs pyloudnorm)
            'peak': scale so the absolute peak is 0.99
            'none': No normalization
        target_lufs: Target loudness in LUFS (default: -23.0 LUFS, the EBU R128
            broadcast level). Only used by the 'loudness' method.

    Returns:
        str: base64 encoded audio data with data URI scheme
            ("data:audio/wav;base64,...").

    Raises:
        ValueError: if the file is missing, the normalization method is unknown,
            or any step of reading/processing fails (the original exception is
            chained as the cause).
    """
    try:
        full_path = os.path.join(root_dir, file_path)

        if not os.path.isfile(full_path):
            raise ValueError(f"Audio file not found: {full_path}")

        if normalization_method not in ('loudness', 'peak', 'none'):
            raise ValueError(f"Unknown normalization method: {normalization_method}")

        data, sample_rate = sf.read(full_path)

        # Convert to mono if multi-channel
        if data.ndim > 1:
            data = np.mean(data, axis=1)

        # Ensure the audio is float32 for loudnorm processing
        data = data.astype(np.float32)

        if normalization_method == 'loudness':
            # Imported here so that 'peak' and 'none' work without pyloudnorm.
            import pyloudnorm as pyln

            meter = pyln.Meter(sample_rate)
            input_loudness = meter.integrated_loudness(data)
            print(f"Input loudness: {input_loudness:.1f} LUFS")

            if not np.isfinite(input_loudness):
                # e.g. digital silence: a gain computed from this value would be
                # infinite and turn the samples into NaN.
                print("Warning: Input loudness is not finite, skipping loudness normalization")
            else:
                if input_loudness < -70.0:
                    print("Warning: Input audio very quiet, measurements may be inaccurate")

                # Only normalize if the difference exceeds 0.1 LUFS
                if abs(input_loudness - target_lufs) > 0.1:
                    data = pyln.normalize.loudness(data, input_loudness, target_lufs)

                    # Verify the normalization
                    final_loudness = meter.integrated_loudness(data)
                    print(f"Output loudness: {final_loudness:.1f} LUFS")

            # Scale down if the gain pushed samples past full scale. This is a
            # plain sample-peak rescale of the whole signal, not a limiter.
            peak = np.max(np.abs(data))
            if peak > 1.0:
                data = data / peak * 0.99  # just under full scale
                print(f"Applied true-peak limiting. Peak value: {20 * np.log10(peak):.1f} dBFS")

        elif normalization_method == 'peak':
            peak = np.max(np.abs(data))
            if peak > 0:
                data = data / peak * 0.99  # just under full scale

        # Dither, then clip so out-of-range samples saturate instead of
        # wrapping around when cast to int16.
        data = _apply_dither(data)
        pcm = np.int16(np.clip(data, -1.0, 1.0) * 32767)

        # scipy can write the WAV container straight into an in-memory buffer.
        buffer = io.BytesIO()
        wavfile.write(buffer, sample_rate, pcm)
        audio_data = base64.b64encode(buffer.getvalue()).decode('utf-8')

        return f"data:audio/wav;base64,{audio_data}"

    except Exception as e:
        raise ValueError(f"Error processing audio file {file_path}: {str(e)}") from e

In [None]:
def create_html_report(
    df: pd.DataFrame,
    enrollment_root: str,
    libri_root: str,
    output_dir: str = "local",
    output_file: Optional[str] = None
) -> str:
    """
    Create an HTML report with audio files from enrollment and LibriSpeech roots.

    Every row of ``df`` becomes a table row. The ``libri_path`` and
    ``enrollment_path`` columns are rendered as audio players: the referenced
    file is copied (WAV) or converted to WAV (FLAC) into ``output_dir/audio``
    and linked with a path relative to the report. All other columns are shown
    as HTML-escaped text. A missing file or a per-file failure is shown as an
    error cell instead of aborting the report.

    Args:
        df: Table to render.
        enrollment_root: Directory that relative ``enrollment_path`` values are
            resolved against.
        libri_root: Directory that relative ``libri_path`` values are resolved
            against.
        output_dir: Directory for the report and the ``audio`` sub-folder;
            created if needed.
        output_file: Report file name. Defaults to a timestamped
            ``report_YYYYmmdd_HHMMSS.html``.

    Returns:
        Path of the written HTML file, as a string.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    if output_file is None:
        from datetime import datetime
        output_file = f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"

    # Map columns to their root directories, audio types and display names
    AUDIO_ROOTS = {
        'libri_path': {'root': Path(libri_root), 'type': 'flac', 'display': 'Original'},
        'enrollment_path': {'root': Path(enrollment_root), 'type': 'wav', 'display': 'Anonymized'}
    }

    # Map for display names of all columns
    COLUMN_DISPLAY_NAMES = {
        'libri_path': 'Original',
        'enrollment_path': 'Anonymized'
    }

    def display_name(col) -> str:
        """HTML-safe header/label text for a column."""
        return html.escape(str(COLUMN_DISPLAY_NAMES.get(col, col)))

    def handle_audio_file(src_path: Path, root_path: Path, audio_type: str) -> Path:
        """Copy (WAV) or convert (FLAC -> WAV) a file into output_dir/audio."""
        rel_path = src_path.relative_to(root_path)
        dest_path = output_dir / 'audio' / rel_path

        # FLAC sources are stored as WAV in the report folder
        if audio_type == 'flac':
            dest_path = dest_path.with_suffix('.wav')

        dest_path.parent.mkdir(parents=True, exist_ok=True)

        # Skip work already done by a previous run
        if not dest_path.exists():
            if audio_type == 'flac':
                audio_data, samplerate = sf.read(str(src_path))
                sf.write(str(dest_path), audio_data, samplerate)
            else:
                shutil.copy2(src_path, dest_path)

        return dest_path

    header_cells = ' '.join(f"<th>{display_name(col)}</th>" for col in df.columns)
    systems_analyzed = df['system_name'].nunique() if 'system_name' in df.columns else 'N/A'

    # Collect fragments and join once instead of growing a string in the loop.
    parts = [f"""
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="utf-8">
        <title>VPC Dataset Report</title>
        <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet">
        <style>
            .audio-cell {{ width: 250px; }}
            .audio-player {{ width: 100%; }}
            .error {{ color: red; }}
            .meta-data {{ font-size: 0.9em; color: #666; }}
        </style>
    </head>
    <body class="container-fluid p-4">
        <h1>VPC Dataset Report</h1>
        <div class="row mb-4">
            <div class="col">
                <div class="card">
                    <div class="card-body">
                        <h5 class="card-title">Dataset Summary</h5>
                        <p class="card-text">Total samples: {len(df)}</p>
                        <p class="card-text">Systems analyzed: {systems_analyzed}</p>
                    </div>
                </div>
            </div>
        </div>
        <div class="table-responsive">
            <table class="table table-striped table-bordered">
                <thead class="table-light">
                    <tr>
                        {header_cells}
                    </tr>
                </thead>
                <tbody>
    """]

    for _, row in df.iterrows():
        parts.append("<tr>")
        for col in df.columns:
            if col not in AUDIO_ROOTS:
                # Plain data cell; escape so values cannot inject markup.
                parts.append(f"<td>{html.escape(str(row[col]))}</td>")
                continue

            try:
                audio_info = AUDIO_ROOTS[col]
                root_path = audio_info['root']
                audio_path = Path(str(row[col]))

                if not audio_path.is_absolute():
                    audio_path = root_path / audio_path

                if not audio_path.is_file():
                    parts.append(
                        f'<td class="error">Audio file not found: {html.escape(str(audio_path))}</td>'
                    )
                    continue

                dest_path = handle_audio_file(audio_path, root_path, audio_info['type'])
                rel_path = dest_path.relative_to(output_dir)
                # Forward slashes keep the link valid regardless of the OS separator.
                src = html.escape(rel_path.as_posix(), quote=True)

                parts.append(f'''
                            <td class="audio-cell">
                                <audio controls class="audio-player">
                                    <source src="{src}" type="audio/wav">
                                    Your browser does not support audio.
                                </audio>
                                <div class="meta-data mt-1">
                                    {display_name(col)}: {html.escape(rel_path.name)}
                                </div>
                            </td>
                        ''')
            except Exception as e:
                # Best effort: one bad file must not abort the whole report.
                parts.append(f'<td class="error">Error: {html.escape(str(e))}</td>')
        parts.append("</tr>")

    parts.append("""
                </tbody>
            </table>
        </div>
        <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/js/bootstrap.bundle.min.js"></script>
    </body>
    </html>
    """)

    output_path = output_dir / output_file
    # Explicit encoding so the file matches the declared charset on every platform.
    output_path.write_text(''.join(parts), encoding='utf-8')

    print(f"Report generated: {output_path}")
    return str(output_path)

In [None]:
# Building the report reads the VPC and LibriSpeech audio from disk, so it is
# opt-in: set the flag to True to sample the data and regenerate the report.
GENERATE_REPORT = False

report_file = ''
if GENERATE_REPORT:
    # Process the data
    processed_df = process_vpc_data(num_speakers=29, num_utts_per_spk=2)

    # Create the HTML report (the function prints where the file was written)
    report_file = create_html_report(processed_df, enrollment_root=VPC_DIR, libri_root=LIBRI_DIR)

if report_file:
    print("To view the report, start a local server:")
    print("python -m http.server 8001")
    print(f"Then open http://localhost:8001/{report_file} in your browser")
else:
    # Avoid announcing a report (and an empty URL) when nothing was built.
    print("No report was generated; set GENERATE_REPORT = True to build one.")