### **Imports**

In [None]:
%pip install pandas librosa soundfile numpy

In [3]:
import os
import pandas as pd
import librosa
import soundfile as sf
import numpy as np

### **Folder Paths**

In [4]:
# Input/output locations. These are absolute paths on the author's machine;
# point them at your own folders before running.

# Folder that is listed for the .wav recordings to segment.
AUDIO_FOLDER = "D:/_3rd Year Class/1st Sem/Machine Learning/_ForLE/For DataSet/TestInput" 
# Folder that is listed for the .txt annotation tables paired with the recordings.
ANNOTATION_FOLDER = "D:/_3rd Year Class/1st Sem/Machine Learning/_ForLE/For DataSet/TestTable"
# Root folder that receives the per-label subfolders, segment files and manifest CSVs.
OUTPUT_FOLDER = "D:/_3rd Year Class/1st Sem/Machine Learning/_ForLE/For DataSet/TestOutput"

### **Segmentation Settings**

In [None]:
# Length of the window cut around each annotation (and of each random
# no-eagle window), in seconds.
SEGMENT_LENGTH = 5  # seconds
# Name of the annotation-table column that holds the call type label.
COL_TYPE = "Type"
# Name of the optional annotation-table column that holds the quality rating.
COL_QUALITY = "Quality"

# HYBRID APPROACH SETTINGS
# Placement of the window relative to the triggering annotation:
# "start", "center", or any other value (handled as "end").
WINDOW_POSITION = "center"  # Center the call for better CNN context
# An annotation that begins more than 0 but less than this many seconds after
# the end of the preceding annotation is skipped rather than given its own segment.
MIN_ANNOTATION_GAP = 1.0  # Minimum seconds between annotations to create separate segments
# A candidate segment is dropped when its overlap ratio with an already
# created segment exceeds this value.
OVERLAP_THRESHOLD = 0.8  # Only skip if >80% overlap (vs 50% before)

### **Setup MainLoop**

In [None]:
# Make sure the destination exists before any segment or manifest is written.
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

# os.listdir() returns entries in arbitrary order. The processing order decides
# how the no-eagle deficit is carried from file to file and which group IDs are
# handed out, so the listings are sorted to make that order deterministic.
audio_files = sorted(f for f in os.listdir(AUDIO_FOLDER) if f.endswith(".wav"))
annotation_files = sorted(f for f in os.listdir(ANNOTATION_FOLDER) if f.endswith(".txt"))

def get_base(filename):
    """Return the shared base name used to pair a recording with its table.

    The name is cut at the first occurrence of ".Table", then of ".txt",
    then of ".wav" (in that order); a marker that does not occur leaves the
    name unchanged.
    """
    stem = filename
    for marker in (".Table", ".txt", ".wav"):
        # partition() keeps everything before the first occurrence of marker
        stem = stem.partition(marker)[0]
    return stem

# Lookup tables: base name -> file name, used to pair each recording with
# its annotation table. A later file with the same base replaces an earlier one.
audio_bases = {}
for audio_name in audio_files:
    audio_bases[get_base(audio_name)] = audio_name

annot_bases = {}
for annotation_name in annotation_files:
    annot_bases[get_base(annotation_name)] = annotation_name

# Run-wide state shared by every file processed in the main loop:
# - all_segments: manifest rows of every saved segment (master manifest)
# - no_eagle_deficit: no-eagle segments still owed from earlier files (carry-over)
# - segment_group_id: next unused group ID for train/test splitting
all_segments = []
no_eagle_deficit = 0
segment_group_id = 0

# ===============================
# HELPER FUNCTIONS
# ===============================
def calculate_overlap(start1, end1, start2, end2):
    """Return how strongly two time ranges overlap.

    The shared duration is divided by the length of each range and the
    larger of the two ratios is returned, so a short range lying entirely
    inside a long one scores 1.0. Returns 0 when either range has zero
    length.
    """
    length_a = end1 - start1
    length_b = end2 - start2
    if length_a == 0 or length_b == 0:
        return 0

    # Shared span, clamped at zero for ranges that do not touch
    shared = max(0, min(end1, end2) - max(start1, start2))

    return max(shared / length_a, shared / length_b)

# ===============================
# MAIN LOOP
# ===============================
def _normalize_quality(raw):
    """Map a raw Quality cell to 'High', 'Medium' or 'Low'.

    Matching ignores case and surrounding whitespace; every other value
    (including a missing cell) falls back to 'Medium'. Returning one
    canonical spelling keeps label folders and summaries consistent.
    """
    text = str(raw).strip().lower()
    return text.capitalize() if text in ("high", "medium", "low") else "Medium"


def _call_type(type_text):
    """Classify an annotation Type value as 'long', 'short' or 'unknown'."""
    lowered = str(type_text).lower()
    if "long" in lowered:
        return "long"
    if "short" in lowered:
        return "short"
    return "unknown"


def _time_intervals(table):
    """Return [(begin, end), ...] for an annotation table, [] when it is empty."""
    if table.empty:
        return []
    return list(zip(table["Begin Time (s)"], table["End Time (s)"]))


def _hits_any(start, end, intervals):
    """Return True when the range start..end intersects any (begin, end) interval."""
    return any(start < stop and end > begin for begin, stop in intervals)


# Per-file pipeline:
#   1. pair the recording with its annotation table and load both
#   2. drop ambiguous rows, separate "unnecessary" regions from eagle calls
#   3. cut one SEGMENT_LENGTH window per eagle annotation (with skip rules)
#   4. draw random no-eagle windows to match the eagle count (+ carried deficit)
#   5. write the segments, then a per-file manifest CSV
for base_name, audio_file in audio_bases.items():
    if base_name not in annot_bases:
        print(f"‚ö†Ô∏è No annotation file for {audio_file}, skipping...")
        continue

    annotation_file = annot_bases[base_name]
    audio_path = os.path.join(AUDIO_FOLDER, audio_file)
    annot_path = os.path.join(ANNOTATION_FOLDER, annotation_file)

    print(f"\nüéß Processing: {audio_file}")
    print(f"   ‚Ü≥ Using annotations from: {annotation_file}")

    # Load audio (sr=None keeps the file's own sample rate)
    y, sr = librosa.load(audio_path, sr=None)
    total_duration = librosa.get_duration(y=y, sr=sr)

    # Load annotation table (tab-separated; text after '#' is treated as a comment)
    annotations = pd.read_csv(
        annot_path,
        sep="\t",
        engine="python",
        comment="#",
        skip_blank_lines=True
    )
    annotations.columns = [c.strip() for c in annotations.columns]

    if COL_TYPE not in annotations.columns:
        print(f"‚ö†Ô∏è No '{COL_TYPE}' column found in {annotation_file}, skipping...")
        continue

    # Check if Quality column exists
    has_quality = COL_QUALITY in annotations.columns

    print(f"   üìã Annotation types found: {annotations[COL_TYPE].unique()}")
    if has_quality:
        print(f"   üìã Quality levels found: {annotations[COL_QUALITY].unique()}")

    # Remove ambiguous labels (Type contains '?' or 'ambiguous')
    annotations = annotations[
        ~annotations[COL_TYPE].astype(str).str.contains(r'\?|ambiguous', case=False, na=False)
    ].copy()

    # Rows whose Type contains "necessar" mark regions to avoid; all other
    # rows are treated as eagle annotations.
    is_unnecessary = annotations[COL_TYPE].astype(str).str.lower().str.contains("necessar", na=False)
    unnecessary = annotations[is_unnecessary].copy()
    valid_annots = annotations[~is_unnecessary].copy()

    print(f"   üö´ Unnecessary regions: {len(unnecessary)}")
    print(f"   ‚úÖ Valid eagle annotations: {len(valid_annots)}")

    # Convert times to float
    for col in ["Begin Time (s)", "End Time (s)"]:
        if not valid_annots.empty:
            valid_annots[col] = valid_annots[col].astype(float)
        if not unnecessary.empty:
            unnecessary[col] = unnecessary[col].astype(float)

    if 'Selection' in valid_annots.columns:
        valid_annots['Selection'] = valid_annots['Selection'].astype(int)

    # Both tables are fixed from here on, so build the interval lists once
    # instead of re-iterating the DataFrames for every candidate window.
    unnecessary_intervals = _time_intervals(unnecessary)
    eagle_intervals = _time_intervals(valid_annots)

    seg_idx = 0
    stats = {}
    file_segments = []

    # Process each eagle annotation to create segments
    eagle_segments = []
    created_segments = []  # Track all created segments for overlap detection

    if not valid_annots.empty:
        # Sort annotations by start time
        valid_annots_sorted = valid_annots.sort_values('Begin Time (s)')

        for idx, annot in valid_annots_sorted.iterrows():
            annot_start = annot["Begin Time (s)"]
            annot_end = annot["End Time (s)"]
            annot_type = str(annot[COL_TYPE])
            annot_selection = int(annot['Selection']) if 'Selection' in annot else idx

            # Skip an annotation that starts shortly after the previous one ended
            # (previous = last row, in start order, that begins strictly earlier).
            prev_annots = valid_annots_sorted[valid_annots_sorted['Begin Time (s)'] < annot_start]
            if not prev_annots.empty:
                gap = annot_start - prev_annots.iloc[-1]['End Time (s)']
                if 0 < gap < MIN_ANNOTATION_GAP:
                    print(f"   ‚è≠Ô∏è  Skipping Selection {annot_selection} (too close to previous: {gap:.2f}s gap)")
                    continue

            # Calculate segment window, clamped to the recording
            if WINDOW_POSITION == "start":
                seg_start = annot_start
                seg_end = min(annot_start + SEGMENT_LENGTH, total_duration)
            elif WINDOW_POSITION == "center":
                annot_mid = (annot_start + annot_end) / 2
                seg_start = max(0, annot_mid - SEGMENT_LENGTH / 2)
                seg_end = min(seg_start + SEGMENT_LENGTH, total_duration)
                # Adjust if we hit the end: shift the window back to keep its length
                if seg_end >= total_duration:
                    seg_end = total_duration
                    seg_start = max(0, seg_end - SEGMENT_LENGTH)
            else:  # "end"
                seg_end = annot_end
                seg_start = max(0, seg_end - SEGMENT_LENGTH)

            # Check if this segment overlaps with unnecessary regions
            if _hits_any(seg_start, seg_end, unnecessary_intervals):
                print(f"   ‚è≠Ô∏è  Skipping annotation at {annot_start:.2f}s (overlaps unnecessary region)")
                continue

            # Check overlap with already created segments (SMART SKIPPING)
            skip_overlap = False
            overlapping_group = None
            for existing_seg in created_segments:
                overlap_ratio = calculate_overlap(seg_start, seg_end,
                                                  existing_seg['start'], existing_seg['end'])
                if overlap_ratio > OVERLAP_THRESHOLD:
                    skip_overlap = True
                    print(f"   ‚è≠Ô∏è  Skipping Selection {annot_selection} ({overlap_ratio*100:.0f}% overlap with existing segment)")
                    break
                elif overlap_ratio > 0.3:  # Moderate overlap - same group for splitting
                    overlapping_group = existing_seg.get('group_id')

            if skip_overlap:
                continue

            # Assign group ID for train/test splitting
            if overlapping_group is None:
                current_group_id = segment_group_id
                segment_group_id += 1
            else:
                current_group_id = overlapping_group

            # Find all annotations that overlap with this segment window
            overlapped = valid_annots[
                (valid_annots["Begin Time (s)"] < seg_end) &
                (valid_annots["End Time (s)"] > seg_start)
            ]

            # Collect annotation details
            annotation_details = []
            types = []
            selection_numbers = []
            fully_captured_selections = []

            for _, overlap_annot in overlapped.iterrows():
                overlap_start = overlap_annot["Begin Time (s)"]
                overlap_end = overlap_annot["End Time (s)"]
                overlap_selection = int(overlap_annot['Selection']) if 'Selection' in overlap_annot else None

                # Check if this annotation is FULLY contained in the segment
                is_fully_contained = (overlap_start >= seg_start and overlap_end <= seg_end)
                if is_fully_contained and overlap_selection is not None:
                    fully_captured_selections.append(overlap_selection)

                call_type = _call_type(overlap_annot[COL_TYPE])
                types.append(call_type)

                # Get quality (canonical spelling, 'Medium' when absent/unknown)
                quality = _normalize_quality(overlap_annot[COL_QUALITY]) if has_quality else "Medium"

                if overlap_selection is not None:
                    selection_numbers.append(overlap_selection)

                annotation_details.append({
                    'selection': overlap_selection,
                    'type': overlap_annot[COL_TYPE],
                    'call_type': call_type,
                    'quality': quality,
                    'begin': overlap_start,
                    'end': overlap_end,
                    'fully_contained': is_fully_contained
                })

            # Determine label based on PRIMARY annotation (the trigger)
            label_base = {
                "long": "Eagle_long",
                "short": "Eagle_short",
                "unknown": "EagleSound",
            }[_call_type(annot_type)]

            # If multiple different types, mark as mixed
            if len(set(types)) > 1:
                label_base = "EagleMixed"

            # Determine quality (use quality of PRIMARY annotation)
            segment_quality = _normalize_quality(annot[COL_QUALITY]) if has_quality else "Medium"

            label_text = f"{label_base}_{segment_quality}"

            segment_info = {
                'start': seg_start,
                'end': seg_end,
                'label_base': label_base,
                'label_full': label_text,
                'quality': segment_quality,
                'selection_numbers': selection_numbers,
                'annotation_details': annotation_details,
                'fully_captured_selections': fully_captured_selections,
                'group_id': current_group_id,
                'trigger_annotation': {
                    'start': annot_start,
                    'end': annot_end,
                    'type': annot_type,
                    'selection': annot_selection
                }
            }

            eagle_segments.append(segment_info)
            created_segments.append(segment_info)

            # Log what was captured
            fully_msg = f" (fully captured: {', '.join(map(str, fully_captured_selections))})" if fully_captured_selections else " (partial)"
            overlap_msg = f" [Group {current_group_id}]" if overlapping_group is not None else ""
            print(f"   ‚úÖ Created segment at {seg_start:.2f}-{seg_end:.2f}s for Selection {annot_selection}{fully_msg}{overlap_msg}")

    print(f"   üìä Eagle segments created: {len(eagle_segments)}")

    # CARRY-OVER SYSTEM: Calculate total needed including deficit
    num_no_eagle_needed = len(eagle_segments) + no_eagle_deficit
    print(f"   üìä Deficit from previous files: {no_eagle_deficit}")
    print(f"   ‚öñÔ∏è  Total no-eagle segments needed: {num_no_eagle_needed}")

    # Create no-eagle segments by rejection sampling
    no_eagle_segments = []

    if num_no_eagle_needed > 0:
        attempts = 0
        max_attempts = num_no_eagle_needed * 10  # bound the search on crowded recordings
        latest_start = max(0, total_duration - SEGMENT_LENGTH)

        while len(no_eagle_segments) < num_no_eagle_needed and attempts < max_attempts:
            attempts += 1

            # Random start time
            random_start = np.random.uniform(0, latest_start)
            random_end = min(random_start + SEGMENT_LENGTH, total_duration)

            # Reject windows touching an eagle annotation or an unnecessary region
            if _hits_any(random_start, random_end, eagle_intervals):
                continue
            if _hits_any(random_start, random_end, unnecessary_intervals):
                continue

            # Reject windows touching an already accepted no-eagle segment
            if any(random_start < existing['end'] and random_end > existing['start']
                   for existing in no_eagle_segments):
                continue

            # Valid no-eagle segment
            no_eagle_segments.append({
                'start': random_start,
                'end': random_end,
                'label': "NoEagleSound",
                'group_id': segment_group_id
            })
            segment_group_id += 1

        print(f"   üìä No-eagle segments created: {len(no_eagle_segments)}")

        # CARRY-OVER SYSTEM: Update deficit
        if len(no_eagle_segments) >= num_no_eagle_needed:
            no_eagle_deficit = 0
            print(f"   ‚úÖ Deficit cleared! All {num_no_eagle_needed} no-eagle segments created.")
        else:
            no_eagle_deficit = num_no_eagle_needed - len(no_eagle_segments)
            print(f"   ‚ö†Ô∏è  Could only create {len(no_eagle_segments)}/{num_no_eagle_needed} segments.")
            print(f"   ‚ö†Ô∏è  Carrying over deficit of {no_eagle_deficit} to next file.")

    # Save all segments
    all_selected_segments = eagle_segments + no_eagle_segments

    for segment_data in all_selected_segments:
        start = segment_data['start']
        end = segment_data['end']
        group_id = segment_data['group_id']

        if 'label_full' in segment_data:
            # Eagle segment
            label_text = segment_data['label_full']
            label_base = segment_data['label_base']
            quality = segment_data['quality']
            selection_numbers = segment_data.get('selection_numbers', [])
            annotation_details = segment_data['annotation_details']
            trigger = segment_data['trigger_annotation']
        else:
            # No eagle segment
            label_text = segment_data['label']
            label_base = "NoEagleSound"
            quality = None
            selection_numbers = []
            annotation_details = []
            trigger = None

        # Extract segment
        start_sample = int(start * sr)
        end_sample = int(end * sr)
        segment = y[start_sample:end_sample]

        # Skip if too short (under half a second of samples)
        if len(segment) < 0.5 * sr:
            continue

        # Prepare subfolder only once the segment is known to be kept, so a
        # skipped segment never leaves an empty label folder behind
        label_folder = os.path.join(OUTPUT_FOLDER, label_text)
        os.makedirs(label_folder, exist_ok=True)

        # Save segment
        filename = f"{base_name}_seg{seg_idx:04d}.wav"
        save_path = os.path.join(label_folder, filename)
        sf.write(save_path, segment, sr)

        stats[label_text] = stats.get(label_text, 0) + 1

        # Create manifest entry
        selection_list_str = ', '.join(map(str, selection_numbers)) if selection_numbers else 'None'

        individual_times = []
        for detail in annotation_details:
            sel_num = detail.get('selection', '?')
            containment_marker = "‚úì" if detail.get('fully_contained', False) else "‚ö†"
            # Mixed segments additionally record each call's type
            type_note = f"({detail['call_type']})" if label_base == "EagleMixed" else ""
            individual_times.append(
                f"Sel{sel_num}{type_note}{containment_marker}: {detail['begin']:.3f}-{detail['end']:.3f}s"
            )
        individual_times_str = '; '.join(individual_times) if individual_times else 'N/A'

        segment_info = {
            'source_audio': audio_file,
            'segment_filename': filename,
            'label': label_text,
            'label_category': label_base,
            'quality': quality if quality else 'N/A',
            'output_folder': label_text,
            'segment_start_time': round(start, 3),
            'segment_end_time': round(end, 3),
            'segment_duration': round(end - start, 3),
            'trigger_annotation_start': round(trigger['start'], 3) if trigger else 'N/A',
            'trigger_annotation_end': round(trigger['end'], 3) if trigger else 'N/A',
            'trigger_selection': trigger['selection'] if trigger else 'N/A',
            'num_annotations': len(annotation_details),
            'selection_numbers': selection_list_str,
            # str(): a Type cell may be a non-string (e.g. NaN for an empty cell),
            # which str.join would reject with a TypeError
            'annotation_types': '; '.join([str(a['type']) for a in annotation_details]) if annotation_details else 'None',
            'annotation_times': '; '.join([f"{a['begin']:.3f}-{a['end']:.3f}" for a in annotation_details]) if annotation_details else 'None',
            'individual_call_details': individual_times_str,
            'overlap_group_id': group_id,  # For train/test splitting
            'window_position': WINDOW_POSITION
        }

        file_segments.append(segment_info)
        all_segments.append(segment_info)

        seg_idx += 1

        trigger_info = f"triggered by Sel{trigger['selection']}" if trigger else "random"
        print(f"‚úÖ Saved: {label_text}/{filename} ({trigger_info}, Group {group_id})")

    # Print summary
    print(f"\nüìä Summary for {audio_file}:")
    for label, count in stats.items():
        if count > 0:
            print(f"   ‚Ä¢ {label}: {count} segments")

    eagle_count = len(eagle_segments)
    no_eagle_count = len(no_eagle_segments)
    print(f"   ‚Ä¢ Total Eagle: {eagle_count} segments")
    print(f"   ‚Ä¢ Total No Eagle: {no_eagle_count} segments")
    print(f"   ‚Ä¢ Balance for this file: {eagle_count}/{no_eagle_count} (Eagle/NoEagle)")

    # Save per-file manifest
    if file_segments:
        file_manifest_df = pd.DataFrame(file_segments)
        manifest_filename = f"{base_name}_manifest.csv"
        manifest_path = os.path.join(OUTPUT_FOLDER, manifest_filename)
        file_manifest_df.to_csv(manifest_path, index=False)
        print(f"üìã Manifest saved: {manifest_filename}")

# Save master manifest
# Write the combined manifest and print run-wide statistics; nothing is
# written or reported when no segment was saved.
if all_segments:
    master_manifest_df = pd.DataFrame(all_segments)
    master_manifest_path = os.path.join(OUTPUT_FOLDER, "master_manifest.csv")
    master_manifest_df.to_csv(master_manifest_path, index=False)
    print(f"\nüìã Master manifest saved: master_manifest.csv")

    print(f"\nüìä Overall Statistics:")
    print(f"   ‚Ä¢ Total segments created: {len(all_segments)}")
    print(f"   ‚Ä¢ Total overlap groups: {master_manifest_df['overlap_group_id'].nunique()}")

    # Count rows per quality level; levels with no rows are not printed
    print(f"\n   Eagle segments by quality:")
    quality_column = master_manifest_df['quality']
    for quality in ["High", "Medium", "Low"]:
        matching = int((quality_column == quality).sum())
        if matching > 0:
            print(f"   ‚Ä¢ {quality}: {matching} segments")

    print(f"\n   By label:")
    label_counts = master_manifest_df['label'].value_counts()
    for label, count in label_counts.items():
        print(f"   ‚Ä¢ {label}: {count} segments")

    # Every row is either a no-eagle row or counts as an eagle row
    no_eagle_total = int((master_manifest_df['label_category'] == 'NoEagleSound').sum())
    eagle_total = len(master_manifest_df) - no_eagle_total
    grand_total = eagle_total + no_eagle_total
    print(f"\n   ‚Ä¢ Total Eagle: {eagle_total} segments")
    print(f"   ‚Ä¢ Total No Eagle: {no_eagle_total} segments")

    # Final deficit warning, or the balance line when no deficit remains
    if no_eagle_deficit > 0:
        print(f"\n   ‚ö†Ô∏è  FINAL DEFICIT: {no_eagle_deficit} no-eagle segments still needed!")
        print(f"   ‚ö†Ô∏è  Dataset is imbalanced by {no_eagle_deficit} segments.")
        if grand_total > 0:
            actual_balance = (eagle_total / grand_total) * 100
        else:
            actual_balance = 0
        print(f"   ‚Ä¢ Actual Balance: {actual_balance:.1f}% Eagle / {100-actual_balance:.1f}% No Eagle")
    elif grand_total > 0:
        eagle_pct = (eagle_total / grand_total) * 100
        print(f"   ‚úÖ Perfect Balance: {eagle_pct:.1f}% Eagle / {100-eagle_pct:.1f}% No Eagle")

    # Important note about train/test splitting
    print(f"\nüí° IMPORTANT NOTE FOR ML TRAINING:")
    print(f"   ‚Ä¢ Use 'overlap_group_id' for stratified splitting")
    print(f"   ‚Ä¢ This ensures overlapping segments stay together in train/val/test")
    print(f"   ‚Ä¢ Prevents data leakage between splits")

print("\nüéâ All audio files segmented successfully!")