In [13]:
# This script was written with the assistance of an AI model.

import pandas as pd
import glob
import pathlib
import torchaudio
import torch
import numpy as np
import matplotlib.pyplot as plt
import uuid
from tqdm import tqdm

# --- Blind Test Configuration ---

# 1. Path to the original 5-minute source audio files.
SOURCE_AUDIO_FOLDER = pathlib.Path("/run/user/1000/gvfs/smb-share:server=sl-nas.local,share=geomatics/LEG9_S20250708_PSTATSRAADLEHMKUHL/ACOUSTIC/HYDROPHONES")

# 2. Path to the output folder from your main analysis script (where logs are).
BASE_OUTPUT_PATH = pathlib.Path("/run/user/1000/gvfs/smb-share:server=sl-nas.local,share=shared_all/_AZORES_NUUK/Natacha_Group/anomaly_files/Anomalies_Out/anomalies_output_refactored_filtered_new_instrument_detection")

# 3. Total number of samples to draw for each category from the ENTIRE dataset.
#    The script will try to spread these samples across as many days as possible.
TOTAL_POSITIVE_SAMPLES = 25  # How many "High Quality" samples to select in total.
TOTAL_NEGATIVE_SAMPLES = 25  # How many "Discarded" samples to select in total.

# 4. Settings for the generated files.
CLIP_DURATION_SECONDS = 20.0 # Duration for all generated clips.
BLIND_TEST_FOLDER = "blind_test_data" # Subfolder for all generated files.
BLIND_SHEET_FILENAME = "blind_test_sheet.csv" # The sheet for the expert to fill out.
ANSWER_KEY_FILENAME = "answer_key.csv" # The sheet with the true labels and source info.


# --- Helper Functions ---

def get_loudness_threshold(output_path):
    """Determines the loudness threshold based on folder existence."""
    return 2.5 if (output_path / "high_quality_clips").exists() else float('inf')

def categorize_for_blind_test(row, hq_threshold):
    """Categorizes anomalies into 'Positive', 'Negative', or 'Ignore' for sampling."""
    status, reason = row['status'], row['reason']
    if status == 'KEPT':
        try:
            loudness_ratio = float(str(row['details']).split('=')[-1])
            return 'Positive' if loudness_ratio >= hq_threshold else 'Ignore'
        except (ValueError, IndexError):
            return 'Ignore' # Ignore low volume or unparsable 'KEPT' events
    elif status == 'DISCARDED':
        return 'Negative'
    return 'Ignore'

def stratified_sample_total(df, n_total):
    """
    Samples a total of n_total items, ensuring maximum diversity across days.
    """
    if n_total == 0:
        return pd.DataFrame()
    if n_total >= len(df):
        return df.copy()

    # Create a list of (day, original_index) tuples for round-robin sampling
    samples_by_day = df.groupby('day').apply(lambda x: list(x.index)).to_dict()
    day_keys = list(samples_by_day.keys())
    
    final_indices = []
    day_idx = 0
    while len(final_indices) < n_total and any(samples_by_day.values()):
        current_day = day_keys[day_idx % len(day_keys)]
        if samples_by_day[current_day]:
            # Pop a sample from this day's list
            final_indices.append(samples_by_day[current_day].pop(0))
        day_idx += 1
        
    return df.loc[final_indices].copy()


def generate_clip_and_spectrogram(row, source_folder, output_dir, event_id):
    """
    Generates a standardized audio clip and a spectrogram image for a given event.
    Returns the basenames of the created files.
    """
    source_audio_path = source_folder / row['file']
    if not source_audio_path.exists():
        print(f"⚠️ Source audio not found: {source_audio_path}")
        return None, None

    try:
        info = torchaudio.info(source_audio_path)
        sr = info.sample_rate
        ts = float(row['timestamp_s'])
        
        # Calculate start/end for a centered clip
        half_duration_samples = int(CLIP_DURATION_SECONDS / 2 * sr)
        center_sample = int(ts * sr)
        start_sample = max(0, center_sample - half_duration_samples)
        end_sample = min(info.num_frames, center_sample + half_duration_samples)
        
        waveform, _ = torchaudio.load(source_audio_path, frame_offset=start_sample, num_frames=(end_sample - start_sample))

        # Save audio clip
        clip_path = output_dir / f"{event_id}.wav"
        torchaudio.save(clip_path, waveform, sr)
        
        # Generate and save spectrogram
        mel_spectrogram_transform = torchaudio.transforms.MelSpectrogram(sample_rate=sr, n_fft=2048, hop_length=512, n_mels=128)
        melspec = mel_spectrogram_transform(waveform)
        
        # Convert to dB scale for better visualization
        melspec_db = torchaudio.transforms.AmplitudeToDB()(melspec)

        spec_path = output_dir / f"{event_id}.png"
        plt.figure(figsize=(10, 4))
        plt.imshow(melspec_db.squeeze().numpy(), aspect='auto', origin='lower', cmap='viridis')
        plt.axis('off')
        plt.savefig(spec_path, bbox_inches='tight', pad_inches=0)
        plt.close()
        
        return clip_path.name, spec_path.name

    except Exception as e:
        print(f"❌ Failed to generate files for {event_id} from {row['file']}: {e}")
        return None, None


def main():
    """Generates a blind test dataset with anonymized files and separate answer key."""
    print("🚀 Starting blind test dataset generation...")

    # --- 1. Load, Categorize, and Filter ---
    LOUDNESS_THRESHOLD = get_loudness_threshold(BASE_OUTPUT_PATH)
    log_files = glob.glob(str(BASE_OUTPUT_PATH / "*_detailed_log.csv"))
    if not log_files:
        print(f"❌ Error: No log files found in '{BASE_OUTPUT_PATH}'."); return

    df = pd.concat((pd.read_csv(f) for f in log_files), ignore_index=True)
    df['day'] = df['file'].str.extract(r'_(\d{8})_')
    df['blind_category'] = df.apply(categorize_for_blind_test, axis=1, hq_threshold=LOUDNESS_THRESHOLD)
    
    # Keep only rows that are clearly positive or negative
    df_filtered = df[df['blind_category'].isin(['Positive', 'Negative'])].copy()
    print(f"Loaded and filtered {len(df_filtered)} records ({len(df_filtered[df_filtered['blind_category']=='Positive'])} Positive, {len(df_filtered[df_filtered['blind_category']=='Negative'])} Negative).")

    # --- 2. Perform Stratified Sampling ---
    print(f"\n🔍 Sampling {TOTAL_POSITIVE_SAMPLES} Positive and {TOTAL_NEGATIVE_SAMPLES} Negative events...")
    positive_df = df_filtered[df_filtered['blind_category'] == 'Positive']
    negative_df = df_filtered[df_filtered['blind_category'] == 'Negative']

    positive_samples = stratified_sample_total(positive_df, TOTAL_POSITIVE_SAMPLES)
    negative_samples = stratified_sample_total(negative_df, TOTAL_NEGATIVE_SAMPLES)
    
    if positive_samples.empty and negative_samples.empty:
        print("❌ Error: No samples were collected. Check your data and sample counts."); return

    # Combine and shuffle for blindness
    final_samples_df = pd.concat([positive_samples, negative_samples]).sample(frac=1).reset_index(drop=True)
    print(f"Total samples for blind test: {len(final_samples_df)}")

    # --- 3. Generate Files and Collect Data for CSVs ---
    print("\n🔊 Generating anonymized audio clips and spectrograms...")
    output_data_dir = BASE_OUTPUT_PATH.parent / BLIND_TEST_FOLDER
    output_data_dir.mkdir(exist_ok=True)
    
    blind_sheet_data = []
    answer_key_data = []

    for _, row in tqdm(final_samples_df.iterrows(), total=len(final_samples_df), desc="Processing events"):
        event_id = uuid.uuid4().hex[:10] # Short, unique ID
        
        audio_file, spec_file = generate_clip_and_spectrogram(row, SOURCE_AUDIO_FOLDER, output_data_dir, event_id)
        
        if audio_file and spec_file:
            # Data for the expert's sheet
            blind_sheet_data.append({'event_id': event_id, 'label (INPUT)': ''})
            
            # Data for the answer key
            answer_key_data.append({
                'event_id': event_id,
                'audio_file': audio_file,
                'spectrogram_file': spec_file,
                'true_category': row['blind_category'],
                'original_file': row['file'],
                'timestamp_s': row['timestamp_s'],
                'day': row['day'],
                'original_status': row['status'],
                'original_reason': row['reason'],
                'original_details': row['details']
            })

    # --- 4. Create and Save Final CSVs ---
    if not blind_sheet_data:
        print("❌ Error: Failed to generate any valid data. Halting."); return

    # Create the blind sheet for the expert
    blind_df = pd.DataFrame(blind_sheet_data)
    blind_sheet_path = BASE_OUTPUT_PATH.parent / BLIND_SHEET_FILENAME
    blind_df.to_csv(blind_sheet_path, index=False)
    
    # Create the answer key for validation
    key_df = pd.DataFrame(answer_key_data)
    key_sheet_path = BASE_OUTPUT_PATH.parent / ANSWER_KEY_FILENAME
    key_df.to_csv(key_sheet_path, index=False)

    print("\n✅ Success! Blind test dataset generated.")
    print(f"  - Data folder: {output_data_dir.resolve()}")
    print(f"  - Sheet for expert: {blind_sheet_path.resolve()}")
    print(f"  - Answer key: {key_sheet_path.resolve()}")

#if __name__ == '__main__':
main()

🚀 Starting blind test dataset generation...


  samples_by_day = df.groupby('day').apply(lambda x: list(x.index)).to_dict()
  samples_by_day = df.groupby('day').apply(lambda x: list(x.index)).to_dict()


Loaded and filtered 84190 records (789 Positive, 83401 Negative).

🔍 Sampling 25 Positive and 25 Negative events...
Total samples for blind test: 50

🔊 Generating anonymized audio clips and spectrograms...


Processing events: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 50/50 [02:01<00:00,  2.43s/it]


✅ Success! Blind test dataset generated.
  - Data folder: /run/user/1000/gvfs/smb-share:server=sl-nas.local,share=shared_all/_AZORES_NUUK/Natacha_Group/anomaly_files/Anomalies_Out/blind_test_data
  - Sheet for expert: /run/user/1000/gvfs/smb-share:server=sl-nas.local,share=shared_all/_AZORES_NUUK/Natacha_Group/anomaly_files/Anomalies_Out/blind_test_sheet.csv
  - Answer key: /run/user/1000/gvfs/smb-share:server=sl-nas.local,share=shared_all/_AZORES_NUUK/Natacha_Group/anomaly_files/Anomalies_Out/answer_key.csv



