In [6]:
import torch
import os
import pandas as pd
import torchaudio
from datasets import Dataset
from transformers import Wav2Vec2Processor, Wav2Vec2ForSequenceClassification
import gc
import numpy as np
import psutil
from pathlib import Path
import warnings
import json
from sklearn.model_selection import train_test_split
warnings.filterwarnings('ignore')



In [7]:
# ============================================================================
# OPTIMIZED CONFIGURATION FOR MEMORY EFFICIENCY
# ============================================================================

def check_resources():
    """Check available resources and recommend settings"""
    print("🔍 System Resources:")
    print(f"  CPU cores: {psutil.cpu_count()}")
    ram_gb = psutil.virtual_memory().total / (1024**3)
    available_ram_gb = psutil.virtual_memory().available / (1024**3)
    print(f"  RAM: {ram_gb:.1f} GB total, {available_ram_gb:.1f} GB available")
    
    # Memory-based batch size recommendation
    if available_ram_gb < 4:
        recommended_batch = 4
    elif available_ram_gb < 8:
        recommended_batch = 8
    elif available_ram_gb < 16:
        recommended_batch = 16
    else:
        recommended_batch = 32
    
    print(f"  Recommended batch size: {recommended_batch}")
    
    # GPU check
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    if device.type == 'cuda':
        print(f"  GPU: Available - {torch.cuda.get_device_name()}")
        print(f"  GPU Memory: {torch.cuda.get_device_properties(0).total_memory / (1024**3):.1f} GB")
    else:
        print("  GPU: Not available")
    
    return device, recommended_batch

device, RECOMMENDED_BATCH_SIZE = check_resources()

# OPTIMIZED SETTINGS
BATCH_SIZE = min(RECOMMENDED_BATCH_SIZE, 8)  # Conservative for stability
MAX_AUDIO_LENGTH = 16000 * 4  # Reduced to 4 seconds (was 5)
TARGET_SAMPLING_RATE = 16000
SAVE_EVERY_N_BATCHES = 10  # Save progress more frequently

# Output directory
OUTPUT_DIR = Path("/teamspace/studios/this_studio/speechSentimentAnalysis/processed_data")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

print(f"🎯 Using device: {device}")
print(f"📊 Optimized settings:")
print(f"  Batch size: {BATCH_SIZE}")
print(f"  Max audio length: {MAX_AUDIO_LENGTH/16000:.1f} seconds")
print(f"  Save frequency: Every {SAVE_EVERY_N_BATCHES} batches")

🔍 System Resources:
  CPU cores: 8
  RAM: 15.7 GB total, 2.3 GB available
  Recommended batch size: 4
  GPU: Not available
🎯 Using device: cpu
📊 Optimized settings:
  Batch size: 4
  Max audio length: 4.0 seconds
  Save frequency: Every 10 batches


In [8]:
# ============================================================================
# STREAMING PROCESSOR (MEMORY EFFICIENT)
# ============================================================================

class StreamingAudioProcessor:
    """
    Memory-efficient streaming processor that saves intermediate results
    """
    def __init__(self, output_dir, batch_size=BATCH_SIZE):
        self.output_dir = Path(output_dir)
        self.batch_size = batch_size
        self.processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-large-960h-lv60-self")
        self.processed_samples = []
        self.batch_count = 0
        
    def process_single_file(self, file_path, label_id, label, dataset):
        """Process a single audio file"""
        try:
            # Load audio
            speech_array, sr = load_audio_memory_efficient(file_path)
            
            # Process with Wav2Vec2 - WITHOUT truncation parameters
            inputs = self.processor(
                speech_array, 
                sampling_rate=sr, 
                return_tensors="pt",
                padding=True
            )
            
            # Extract and convert to list (save memory)
            input_values = inputs['input_values'].squeeze().tolist()
            
            # Clean up tensors immediately
            del inputs
            del speech_array
            
            return {
                'input_values': input_values,
                'labels': int(label_id),
                'label': label,
                'dataset': dataset,
                'path': file_path
            }
            
        except Exception as e:
            print(f"❌ Error processing {file_path}: {e}")
            return None
    
    def save_batch(self, batch_samples, batch_idx):
        """Save a batch of processed samples"""
        if not batch_samples:
            return
        
        batch_df = pd.DataFrame(batch_samples)
        batch_file = self.output_dir / f"batch_{batch_idx:04d}.parquet"
        batch_df.to_parquet(batch_file)
        
        # Clear the batch from memory
        del batch_df
        del batch_samples
        gc.collect()
        
        return batch_file
    
    def process_dataset_streaming(self, dataset_df, dataset_name):
        """Process dataset in streaming fashion"""
        print(f"\n🔄 Processing {dataset_name} in streaming mode...")
        
        total_samples = len(dataset_df)
        processed_count = 0
        batch_files = []
        current_batch = []
        
        for idx, row in dataset_df.iterrows():
            # Process single file
            sample = self.process_single_file(
                row['path'], 
                row['label_id'], 
                row['label'], 
                row['dataset']
            )
            
            if sample:
                current_batch.append(sample)
                processed_count += 1
            
            # Save batch when full or at end
            if len(current_batch) >= self.batch_size or idx == total_samples - 1:
                if current_batch:
                    batch_file = self.save_batch(current_batch, self.batch_count)
                    batch_files.append(batch_file)
                    current_batch = []
                    self.batch_count += 1
                    
                    # Progress update
                    progress = (processed_count / total_samples) * 100
                    print(f"  Processed {processed_count}/{total_samples} ({progress:.1f}%)")
                    
                    # Aggressive memory cleanup
                    if self.batch_count % SAVE_EVERY_N_BATCHES == 0:
                        gc.collect()
                        if device.type == 'cuda':
                            torch.cuda.empty_cache()
        
        print(f"✅ {dataset_name}: {processed_count} samples processed in {len(batch_files)} batches")
        return batch_files, processed_count
    
    def combine_batches(self, batch_files, output_name):
        """Combine batch files into final dataset"""
        print(f"\n🔗 Combining {len(batch_files)} batches into {output_name}...")
        
        all_data = []
        for batch_file in batch_files:
            try:
                batch_df = pd.read_parquet(batch_file)
                all_data.append(batch_df)
                
                # Delete batch file to save space
                batch_file.unlink()
                
            except Exception as e:
                print(f"❌ Error reading batch {batch_file}: {e}")
        
        if all_data:
            combined_df = pd.concat(all_data, ignore_index=True)
            output_path = self.output_dir / f"{output_name}.parquet"
            combined_df.to_parquet(output_path)
            
            print(f"✅ Combined dataset saved: {output_path}")
            print(f"   Final size: {len(combined_df)} samples")
            
            # Cleanup
            del all_data
            del combined_df
            gc.collect()
            
            return output_path
        
        return None

In [None]:
# ============================================================================
# DATASET LOADING (SAME AS ORIGINAL)
# ============================================================================

def load_datasets():
    """Load all datasets with file existence checking"""
    BASE_DIR = "/teamspace/studios/this_studio/speechSentimentAnalysis/data/input"
    all_entries = []
    
    print(f"📂 Loading datasets from: {BASE_DIR}")
    
    # RAVDESS
    ravdess_path = os.path.join(BASE_DIR, "Ravdess/audio_speech_actors_01-24")
    if os.path.exists(ravdess_path):
        print("Loading RAVDESS...")
        for actor in os.listdir(ravdess_path):
            actor_path = os.path.join(ravdess_path, actor)
            if os.path.isdir(actor_path):
                for file in os.listdir(actor_path):
                    if file.endswith(".wav"):
                        file_path = os.path.join(actor_path, file)
                        if os.path.exists(file_path):  # Check file exists
                            parts = file.split('.')[0].split('-')
                            if len(parts) >= 3:
                                emotion_id = int(parts[2])
                                label_map = {
                                    1: 'neutral', 2: 'calm', 3: 'happy', 4: 'sad',
                                    5: 'angry', 6: 'fear', 7: 'disgust', 8: 'surprise'
                                }
                                label = label_map.get(emotion_id, 'unknown')
                                all_entries.append({
                                    "path": file_path,
                                    "label": label,
                                    "dataset": "ravdess"
                                })
    
    # CREMA-D
    crema_path = os.path.join(BASE_DIR, "Crema")
    if os.path.exists(crema_path):
        print("Loading CREMA-D...")
        for file in os.listdir(crema_path):
            if file.endswith(".wav"):
                file_path = os.path.join(crema_path, file)
                if os.path.exists(file_path):  # Check file exists
                    parts = file.split('_')
                    if len(parts) >= 3:
                        emotion_code = parts[2]
                        label_map = {
                            'SAD': 'sad', 'ANG': 'angry', 'DIS': 'disgust',
                            'FEA': 'fear', 'HAP': 'happy', 'NEU': 'neutral'
                        }
                        label = label_map.get(emotion_code, 'unknown')
                        all_entries.append({
                            "path": file_path,
                            "label": label,
                            "dataset": "crema"
                        })
    
    # TESS
    tess_path = os.path.join(BASE_DIR, "Tess")
    if os.path.exists(tess_path):
        print("Loading TESS...")
        for speaker_dir in os.listdir(tess_path):
            speaker_path = os.path.join(tess_path, speaker_dir)
            if os.path.isdir(speaker_path):
                for file in os.listdir(speaker_path):
                    if file.endswith(".wav"):
                        file_path = os.path.join(speaker_path, file)
                        if os.path.exists(file_path):  # Check file exists
                            parts = file.split('.')[0].split('_')
                            if len(parts) >= 3:
                                emotion_code = parts[2]
                                label = 'surprise' if emotion_code == 'ps' else emotion_code.lower()
                                all_entries.append({
                                    "path": file_path,
                                    "label": label,
                                    "dataset": "tess"
                                })
    
    # SAVEE
    savee_path = os.path.join(BASE_DIR, "Savee")
    if os.path.exists(savee_path):
        print("Loading SAVEE...")
        for file in os.listdir(savee_path):
            if file.endswith(".wav"):
                file_path = os.path.join(savee_path, file)
                if os.path.exists(file_path):  # Check file exists
                    parts = file.split('_')
                    if len(parts) >= 2:
                        emo = parts[1]
                        code = emo[:2] if emo.startswith('sa') else emo[0]
                        label_map = {
                            'a': 'angry', 'd': 'disgust', 'f': 'fear', 'h': 'happy',
                            'n': 'neutral', 'sa': 'sad', 'su': 'surprise'
                        }
                        label = label_map.get(code, 'unknown')
                        all_entries.append({
                            "path": file_path,
                            "label": label,
                            "dataset": "savee"
                        })
    
    # Create DataFrame and clean
    df = pd.DataFrame(all_entries)
    df = df[df["label"] != "unknown"]
    
    # Create label mappings
    label_list = sorted(df["label"].unique())
    label2id = {label: idx for idx, label in enumerate(label_list)}
    id2label = {idx: label for label, idx in label2id.items()}
    
    df["label_id"] = df["label"].map(label2id)
    
    print(f"✅ Loaded {len(df)} total samples")
    print(f"📊 Labels: {label_list}")
    
    return df, label2id, id2label

In [10]:
# ============================================================================
# MAIN PROCESSING PIPELINE
# ============================================================================

def main():
    """Main processing pipeline"""
    print("🚀 Starting Memory-Optimized Processing Pipeline")
    print("=" * 60)
    
    # Load datasets
    df, label2id, id2label = load_datasets()
    
    # Save metadata
    metadata_path = OUTPUT_DIR / "metadata.csv"
    df.to_csv(metadata_path, index=False)
    
    # Save label mappings
    with open(OUTPUT_DIR / "label_mappings.json", "w") as f:
        json.dump({"label2id": label2id, "id2label": id2label}, f, indent=2)
    
    # Split dataset
    print("\n📊 Splitting dataset...")
    train_df, temp_df = train_test_split(df, test_size=0.2, random_state=42, stratify=df['label'])
    valid_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42, stratify=temp_df['label'])
    
    print(f"  Train: {len(train_df)} samples")
    print(f"  Validation: {len(valid_df)} samples")
    print(f"  Test: {len(test_df)} samples")
    
    # Initialize processor
    processor = StreamingAudioProcessor(OUTPUT_DIR, BATCH_SIZE)
    
    # Process each split
    datasets = [
        (train_df, "train"),
        (valid_df, "valid"),
        (test_df, "test")
    ]
    
    for dataset_df, name in datasets:
        print(f"\n{'='*60}")
        print(f"Processing {name.upper()} dataset")
        print('='*60)
        
        # Process in streaming mode
        batch_files, processed_count = processor.process_dataset_streaming(dataset_df, name)
        
        # Combine batches
        if batch_files:
            final_path = processor.combine_batches(batch_files, f"{name}_processed")
            print(f"✅ {name} dataset: {processed_count} samples saved to {final_path}")
        
        # Reset batch counter for next dataset
        processor.batch_count = 0
    
    print(f"\n🎉 All processing complete!")
    print(f"📁 Output directory: {OUTPUT_DIR}")
    print(f"📄 Files created:")
    print(f"  - metadata.csv")
    print(f"  - label_mappings.json")
    print(f"  - train_processed.parquet")
    print(f"  - valid_processed.parquet")
    print(f"  - test_processed.parquet")

# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================

def load_processed_datasets():
    """Load processed datasets for training"""
    try:
        train_df = pd.read_parquet(OUTPUT_DIR / "train_processed.parquet")
        valid_df = pd.read_parquet(OUTPUT_DIR / "valid_processed.parquet")
        test_df = pd.read_parquet(OUTPUT_DIR / "test_processed.parquet")
        
        train_dataset = Dataset.from_pandas(train_df)
        valid_dataset = Dataset.from_pandas(valid_df)
        test_dataset = Dataset.from_pandas(test_df)
        
        print(f"✅ Loaded processed datasets:")
        print(f"  Train: {len(train_dataset)} samples")
        print(f"  Validation: {len(valid_dataset)} samples")
        print(f"  Test: {len(test_dataset)} samples")
        
        return train_dataset, valid_dataset, test_dataset
        
    except Exception as e:
        print(f"❌ Error loading processed datasets: {e}")
        return None, None, None

def get_memory_usage():
    """Check current memory usage"""
    process = psutil.Process(os.getpid())
    memory_mb = process.memory_info().rss / 1024 / 1024
    print(f"💾 Current memory usage: {memory_mb:.1f} MB")
    return memory_mb

if __name__ == "__main__":
    main()

🚀 Starting Memory-Optimized Processing Pipeline
📂 Loading datasets from: data\input


KeyError: 'label'