In [31]:
!pip install pandas numpy matplotlib librosa tqdm scikit-learn

Collecting librosa
  Using cached librosa-0.10.2.post1-py3-none-any.whl.metadata (8.6 kB)
Collecting audioread>=2.1.9 (from librosa)
  Using cached audioread-3.0.1-py3-none-any.whl.metadata (8.4 kB)
Collecting numba>=0.51.0 (from librosa)
  Using cached numba-0.60.0.tar.gz (2.7 MB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'error'


  error: subprocess-exited-with-error
  
  × python setup.py egg_info did not run successfully.
  │ exit code: 1
  ╰─> [18 lines of output]
      Traceback (most recent call last):
        File "<string>", line 2, in <module>
          exec(compile('''
          ~~~~^^^^^^^^^^^^
          # This is <pip-setuptools-caller> -- a caller that pip uses to run setup.py
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
          ...<31 lines>...
          exec(compile(setup_py_code, filename, "exec"))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
          ''' % ('C:\\Users\\raksh\\AppData\\Local\\Temp\\pip-install-rzlvt5wi\\numba_18e8dcac6d3c4c4e875af3affc9a79fb\\setup.py',), "<pip-setuptools-caller>", "exec"))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        File "<pip-setuptools-caller>", line 34, in <module>
        

In [32]:
import os
import wave
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import shutil
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from scipy.io import wavfile
from random import sample
from scipy.signal import spectrogram, resample
import random
import scipy.io.wavfile as wav

In [33]:
def get_audio_length(file_path):
    """Return the duration of a WAV file in seconds, or None if it is unreadable.

    Args:
        file_path: Path of the WAV file to inspect.

    Returns:
        Number of frames divided by the frame rate, as a float, or None when
        the file is missing, is not a valid WAV file, is truncated, or
        reports a non-positive frame rate.
    """
    try:
        with wave.open(file_path, 'rb') as wave_file:
            frames = wave_file.getnframes()
            rate = wave_file.getframerate()
    except (wave.Error, EOFError, OSError):
        # Only "this file cannot be read as WAV" is treated as best-effort;
        # KeyboardInterrupt and programming errors are allowed to propagate.
        return None

    if rate <= 0:
        # A zero frame rate would otherwise raise ZeroDivisionError.
        return None
    return frames / float(rate)

def create_dataset_df(root_dir):
    """Build a DataFrame describing every readable .wav file under root_dir.

    Each non-hidden sub-directory of root_dir is treated as one class and
    every file in it whose name ends with '.wav' becomes one row. Files for
    which get_audio_length returns None are skipped.

    Args:
        root_dir: Directory whose immediate sub-directories are the classes.

    Returns:
        DataFrame with columns 'file_path', 'class' and 'length' (seconds).
        The columns are present even when no file is found, and rows are
        ordered by class name then file name so the result does not depend
        on the order in which the OS lists directory entries.
    """
    columns = ['file_path', 'class', 'length']
    records = []
    for class_name in sorted(os.listdir(root_dir)):
        class_path = os.path.join(root_dir, class_name)
        if class_name.startswith('.') or not os.path.isdir(class_path):
            continue
        for file_name in sorted(os.listdir(class_path)):
            if not file_name.endswith('.wav'):
                continue
            file_path = os.path.join(class_path, file_name)
            audio_length = get_audio_length(file_path)
            if audio_length is None:
                # Unreadable file: leave it out instead of failing the scan.
                continue
            records.append({
                'file_path': file_path,
                'class': class_name,
                'length': audio_length
            })
    return pd.DataFrame(records, columns=columns)

def safe_split(df, split_ratio=0.8):
    """Split df into two parts, coping with very small inputs.

    Args:
        df: DataFrame to split. The index labels are used to remove the
            sampled rows, so they are expected to be unique.
        split_ratio: Fraction of the rows that goes to the first part.

    Returns:
        Tuple (first_part, second_part):
          * 0 rows -> two empty frames,
          * 1 row  -> the row goes to the first part, second part is empty,
          * 2 rows -> one row each, in original order,
          * otherwise a random sample (fixed random_state=42) of
            int(len(df) * split_ratio) rows, clamped to [1, len(df) - 1] so
            that neither part is empty.
        Empty parts keep the columns of df.
    """
    n_rows = len(df)
    # Slicing instead of pd.DataFrame() keeps the column schema, so callers
    # can still access e.g. part['class'] on an empty part.
    no_rows = df.iloc[0:0]

    if n_rows == 0:
        return no_rows, no_rows.copy()
    if n_rows == 1:
        return df, no_rows
    if n_rows == 2:
        return df.iloc[[0]], df.iloc[[1]]

    # Clamp on both sides: at least one row in the first part and at least
    # one row left over for the second part, whatever split_ratio is.
    train_size = min(n_rows - 1, max(1, int(n_rows * split_ratio)))
    train_df = df.sample(n=train_size, random_state=42)
    test_df = df.drop(train_df.index)
    return train_df, test_df

def create_split_directories(base_path):
    """Create the 'train', 'validation' and 'test' directories under base_path.

    Directories that already exist are left untouched, so the call can be
    repeated safely.

    Args:
        base_path: Directory that will contain the three split directories.
    """
    for split in ('train', 'validation', 'test'):
        # exist_ok avoids the check-then-create race of os.path.exists().
        os.makedirs(os.path.join(base_path, split), exist_ok=True)

def copy_files_to_split(df, split_name, base_path):
    """Copy every file listed in df into base_path/split_name/<class>/.

    Args:
        df: DataFrame with a 'file_path' and a 'class' column.
        split_name: Name of the split directory (e.g. 'train').
        base_path: Directory that contains the split directories.

    Files keep their base name; the class directory is created on demand.
    When df is empty a message is printed and nothing is copied.
    """
    if len(df) == 0:
        print(f"No files to copy for {split_name} split")
        return

    split_dir = os.path.join(base_path, split_name)
    # iterrows() is a generator without a length, so the total is passed
    # explicitly to let the progress bar show a percentage and an ETA.
    rows = tqdm(df.iterrows(), total=len(df), desc=f"Copying {split_name} files")
    for _, row in rows:
        class_dir = os.path.join(split_dir, row['class'])
        os.makedirs(class_dir, exist_ok=True)
        source = row['file_path']
        shutil.copy2(source, os.path.join(class_dir, os.path.basename(source)))

def plot_average_lengths(df):
    """Save a bar chart of the mean audio length per class and return the means.

    Args:
        df: DataFrame with a 'class' column and a 'length' column.

    Returns:
        Series of mean 'length' per 'class', sorted from longest to shortest.
        The chart is written to 'average_audio_lengths.png'.
    """
    per_class_mean = df.groupby('class')['length'].mean()
    avg_lengths = per_class_mean.sort_values(ascending=False)

    fig = plt.figure(figsize=(15, 8))
    # Series.plot draws on the current figure and hands back its Axes.
    ax = avg_lengths.plot(kind='bar')
    ax.set_title('Average Audio Length per Class')
    ax.set_xlabel('Class')
    ax.set_ylabel('Average Length (seconds)')
    plt.xticks(rotation=45, ha='right')
    fig.tight_layout()
    fig.savefig('average_audio_lengths.png')
    plt.close(fig)

    return avg_lengths

def split_dataset(df):
    """Split df per class into train, validation and test DataFrames.

    For every distinct value of the 'class' column (in order of first
    appearance) the rows are split with safe_split: first 80% / 20% into
    train and remainder, then the remainder 50% / 50% into validation and
    test. The per-class pieces are concatenated with a fresh index.

    Returns:
        Tuple (train_df, val_df, test_df).
    """
    pieces = {'train': [], 'val': [], 'test': []}

    for label in df['class'].unique():
        rows_for_class = df[df['class'] == label]

        # Stage 1: carve off the training share.
        train_part, remainder = safe_split(rows_for_class, split_ratio=0.8)
        # Stage 2: halve what is left between validation and test.
        val_part, test_part = safe_split(remainder, split_ratio=0.5)

        pieces['train'].append(train_part)
        pieces['val'].append(val_part)
        pieces['test'].append(test_part)

    train_all = pd.concat(pieces['train'], ignore_index=True)
    val_all = pd.concat(pieces['val'], ignore_index=True)
    test_all = pd.concat(pieces['test'], ignore_index=True)
    return (train_all, val_all, test_all)



In [34]:
def load_audio(file_path):
    """Load a WAV file as a mono, peak-normalised float signal.

    Args:
        file_path: Path of the WAV file.

    Returns:
        Tuple (sample_rate, data). Multi-channel audio is averaged over the
        channel axis, then the samples are divided by the largest absolute
        value so the peak magnitude becomes 1. A silent or empty file is
        returned unscaled (all zeros / empty) instead of NaN.
    """
    sample_rate, data = wavfile.read(file_path)
    data = np.asarray(data, dtype=float)
    # Convert to mono if the file has more than one channel.
    if data.ndim > 1:
        data = data.mean(axis=1)
    # Guard the normalisation: np.max raises on an empty array, and a silent
    # file would divide 0 by 0 and fill the signal with NaN.
    peak = np.max(np.abs(data)) if data.size else 0.0
    if peak > 0:
        data = data / peak
    return sample_rate, data

def compute_zcr(audio_data, frame_length=2048, hop_length=512):
    """Return the mean zero-crossing rate over sliding frames of a 1-D signal.

    Args:
        audio_data: 1-D sequence of samples.
        frame_length: Number of samples per analysis frame.
        hop_length: Step between the starts of consecutive frames.

    Returns:
        Float: the number of sign changes in a frame divided by the frame
        size, averaged over all frames. A signal shorter than frame_length
        is analysed as one frame of its own length; a signal with fewer
        than two samples yields 0.0.
    """
    samples = np.asarray(audio_data)
    n_samples = len(samples)
    if n_samples < 2:
        # No pair of neighbouring samples, hence no crossing to count.
        return 0.0

    if n_samples < frame_length:
        # Too short for a full frame: use the whole signal rather than
        # averaging an empty list (which yields NaN).
        frame_size = n_samples
        starts = [0]
    else:
        frame_size = frame_length
        # The +1 keeps the last frame that still fits entirely in the signal.
        starts = range(0, n_samples - frame_length + 1, hop_length)

    rates = []
    for start in starts:
        negative = np.signbit(samples[start:start + frame_size])
        # A crossing is a change of sign bit between neighbouring samples.
        zero_crossings = np.count_nonzero(negative[1:] != negative[:-1])
        rates.append(zero_crossings / frame_size)
    return float(np.mean(rates))

def compute_autocorrelation(audio_data, max_lag=100):
    """Return the normalised autocorrelation of a 1-D signal for small lags.

    Args:
        audio_data: 1-D sequence of samples.
        max_lag: Number of lags to return, starting at lag 0.

    Returns:
        Array of min(max_lag, len(audio_data)) values. The signal mean is
        removed first and every value is divided by the lag-0 value, so
        element 0 is 1. An empty signal or a non-positive max_lag gives an
        empty array; a constant signal (zero energy after mean removal)
        gives all zeros instead of NaN.
    """
    signal = np.asarray(audio_data, dtype=float)
    n_samples = len(signal)
    if n_samples == 0 or max_lag <= 0:
        return np.zeros(0)

    # Remove the mean so a DC offset does not dominate the correlation.
    centered = signal - signal.mean()

    # Only the first few lags are needed, so they are computed directly as
    # dot products instead of building the full 2n-1 point correlation.
    n_lags = min(max_lag, n_samples)
    correlation = np.array([
        np.dot(centered[:n_samples - lag], centered[lag:])
        for lag in range(n_lags)
    ])

    energy = correlation[0]
    if energy <= 0:
        # Constant signal: the normalised autocorrelation is undefined.
        return np.zeros(n_lags)
    return correlation / energy

def plot_features(audio_data, sample_rate, zcr, autocorr, filename):
    """Save a three-panel figure: waveform, mean ZCR and autocorrelation.

    Args:
        audio_data: 1-D sequence of samples.
        sample_rate: Samples per second, used to build the time axis.
        zcr: Scalar zero-crossing rate, drawn as a horizontal line.
        autocorr: Sequence of autocorrelation values indexed by lag.
        filename: Path of the analysed file; only its base name is used, in
            the figure title and in the output file name.

    The figure is written to './zcrAC/analysis_<basename>.png'; the
    directory is created if it does not exist.
    """
    fig, axes = plt.subplots(3, 1, figsize=(12, 10))
    fig.suptitle(f'Audio Analysis for {os.path.basename(filename)}')

    # Waveform against time in seconds.
    time = np.arange(len(audio_data)) / sample_rate
    axes[0].plot(time, audio_data)
    axes[0].set_title('Waveform')
    axes[0].set_xlabel('Time (s)')
    axes[0].set_ylabel('Amplitude')

    # zcr is a single number, so it is shown as a horizontal reference line.
    axes[1].axhline(y=zcr, color='r', linestyle='-')
    axes[1].set_title(f'Zero Crossing Rate (Mean: {zcr:.4f})')
    axes[1].set_ylabel('ZCR')

    # Autocorrelation against lag index.
    lags = np.arange(len(autocorr))
    axes[2].plot(lags, autocorr)
    axes[2].set_title('Autocorrelation')
    axes[2].set_xlabel('Lag')
    axes[2].set_ylabel('Correlation')

    # savefig does not create missing directories, so make sure it exists.
    output_dir = './zcrAC'
    os.makedirs(output_dir, exist_ok=True)

    plt.tight_layout()
    plt.savefig(f'{output_dir}/analysis_{os.path.basename(filename)}.png')
    # Close this figure explicitly so repeated calls do not accumulate figures.
    plt.close(fig)

def analyze_random_files(train_dir, n_files=5):
    """Compute and plot ZCR / autocorrelation for randomly chosen .wav files.

    Args:
        train_dir: Directory that is walked recursively for '.wav' files.
        n_files: Number of files to draw; when fewer are available a warning
            is printed and all of them are used.

    Returns:
        DataFrame with one row per analysed file and the columns 'file',
        'class' (name of the parent directory), 'zcr', 'autocorr_peak'
        (largest autocorrelation value excluding lag 0), 'sample_rate' and
        'duration' (seconds). The frame is also printed.
    """
    # Every .wav file below train_dir, at any depth.
    audio_files = [
        os.path.join(root, name)
        for root, _, names in os.walk(train_dir)
        for name in names
        if name.endswith('.wav')
    ]

    if len(audio_files) >= n_files:
        selected_files = sample(audio_files, n_files)
    else:
        print(f"Warning: Only {len(audio_files)} files available")
        selected_files = audio_files

    def summarize(file_path):
        """Analyse one file, save its figure and return its result row."""
        print(f"\nAnalyzing {os.path.basename(file_path)}...")

        sample_rate, audio_data = load_audio(file_path)
        zcr = compute_zcr(audio_data)
        autocorr = compute_autocorrelation(audio_data)
        plot_features(audio_data, sample_rate, zcr, autocorr, file_path)

        return {
            'file': os.path.basename(file_path),
            'class': os.path.basename(os.path.dirname(file_path)),
            'zcr': zcr,
            'autocorr_peak': np.max(autocorr[1:]),  # lag 0 is always the maximum
            'sample_rate': sample_rate,
            'duration': len(audio_data) / sample_rate
        }

    results_df = pd.DataFrame([summarize(path) for path in selected_files])
    print("\nAnalysis Results:")
    print(results_df)
    return results_df


In [35]:
def load_audio(file_path):
    """Load a WAV file as a mono, peak-normalised float signal.

    NOTE(review): an identical load_audio is already defined in an earlier
    cell of this notebook; this definition replaces it when the cell runs.

    Args:
        file_path: Path of the WAV file.

    Returns:
        Tuple (sample_rate, data). Multi-channel audio is averaged over the
        channel axis, then divided by its largest absolute value. A silent
        or empty file is returned unscaled instead of producing NaN.
    """
    sample_rate, data = wavfile.read(file_path)
    data = np.asarray(data, dtype=float)
    # Convert to mono if the file has more than one channel.
    if data.ndim > 1:
        data = data.mean(axis=1)
    # A zero peak (silence) or an empty array cannot be normalised.
    peak = np.max(np.abs(data)) if data.size else 0.0
    if peak > 0:
        data = data / peak
    return sample_rate, data

def create_spectrogram(audio_data, sample_rate, title):
    """Compute a spectrogram of audio_data and save it as a PNG image.

    Args:
        audio_data: Samples passed to scipy.signal.spectrogram.
        sample_rate: Sampling frequency in Hz.
        title: Text appended to the plot title; with spaces replaced by
            underscores it also forms the output file name.

    The image is written to './spectrogram/spectrogram_<title>.png'; the
    directory is created if it does not exist.
    """
    # Segments of 256 samples overlapping by 128.
    frequencies, times, Sxx = spectrogram(audio_data, fs=sample_rate,
                                          nperseg=256, noverlap=128)

    # Convert to dB; the small offset keeps log10 finite where Sxx is 0.
    Sxx_db = 10 * np.log10(Sxx + 1e-10)

    fig = plt.figure(figsize=(10, 6))
    plt.pcolormesh(times, frequencies, Sxx_db, shading='gouraud')
    plt.title(f'Spectrogram - {title}')
    plt.ylabel('Frequency [Hz]')
    plt.xlabel('Time [sec]')
    plt.colorbar(label='Intensity [dB]')
    plt.tight_layout()

    # savefig does not create missing directories, so make sure it exists.
    output_dir = './spectrogram'
    os.makedirs(output_dir, exist_ok=True)
    plt.savefig(f'{output_dir}/spectrogram_{title.replace(" ", "_")}.png')
    plt.close(fig)

def get_class_directories(train_dir):
    """Return the names of the class directories directly under train_dir.

    Entries whose name starts with '.' (for example '.ipynb_checkpoints')
    are skipped, matching how create_dataset_df treats hidden directories.
    The names are sorted so the result does not depend on the order in
    which the OS lists directory entries.

    Args:
        train_dir: Directory containing one sub-directory per class.

    Returns:
        Sorted list of sub-directory names (not full paths).
    """
    return sorted(
        name for name in os.listdir(train_dir)
        if not name.startswith('.')
        and os.path.isdir(os.path.join(train_dir, name))
    )

def get_random_audio_file(class_dir):
    """Pick one '.wav' file from class_dir at random.

    Returns:
        Path of the chosen file (class_dir joined with the file name), or
        None when class_dir contains no entry ending in '.wav'.
    """
    candidates = []
    for entry in os.listdir(class_dir):
        if entry.endswith('.wav'):
            candidates.append(entry)

    if not candidates:
        return None
    return os.path.join(class_dir, random.choice(candidates))

In [36]:
def load_audio(file_path):
    """Read a WAV file and return (sample_rate, audio_data) exactly as stored.

    No mono conversion or normalisation is applied, so audio_data keeps the
    dtype and shape produced by scipy.io.wavfile.read.

    NOTE(review): earlier cells of this notebook define a normalising
    function under the same name; once this cell has run, every later call
    to load_audio uses this raw version.
    """
    return wav.read(file_path)

def save_audio(file_path, sample_rate, audio_data):
    """Write audio_data to file_path as a WAV file with the given sample rate."""
    wav.write(filename=file_path, rate=sample_rate, data=audio_data)

def change_pitch(audio_data, sample_rate, pitch_factor):
    """Resample audio_data by picking every pitch_factor-th sample.

    Read positions 0, pitch_factor, 2 * pitch_factor, ... are rounded to the
    nearest index and positions that fall past the end are dropped, so the
    result has roughly len(audio_data) / pitch_factor samples.

    Args:
        audio_data: Array of samples, indexed along its first axis.
        sample_rate: Not used by this function.
        pitch_factor: Step between consecutive read positions.

    Returns:
        Array holding the selected samples.
    """
    total = len(audio_data)
    positions = np.arange(0, total, pitch_factor)
    nearest = np.round(positions)
    in_range = nearest < total
    return audio_data[nearest[in_range].astype(int)]

def add_background_noise(audio_data, noise_factor=0.005):
    """Add Gaussian noise scaled to the signal's peak magnitude.

    Args:
        audio_data: Array of samples (any shape, e.g. (n,) or (n, channels)).
        noise_factor: Standard deviation of the noise as a fraction of the
            largest absolute sample value.

    Returns:
        Array with the same shape and dtype as audio_data. For integer
        dtypes the result is rounded and clipped to the dtype's range, so
        samples near full scale saturate instead of wrapping around.
    """
    samples = np.asarray(audio_data)
    if samples.size == 0:
        return samples.copy()

    # Work in float64: abs() of the most negative integer overflows in the
    # integer dtype, and the noise itself is fractional.
    as_float = samples.astype(np.float64)
    # Peak magnitude rather than np.max, so a signal whose largest value is
    # zero or negative still gets correctly scaled noise.
    peak = np.max(np.abs(as_float))

    # Noise has the same shape as the signal, so multi-channel data works.
    noise = np.random.normal(0, 1, samples.shape)
    noisy = as_float + noise_factor * noise * peak

    if np.issubdtype(samples.dtype, np.integer):
        limits = np.iinfo(samples.dtype)
        noisy = np.clip(np.rint(noisy), limits.min, limits.max)
    return noisy.astype(samples.dtype)

def time_stretch(audio_data, stretch_factor):
    """Resample audio_data to int(len(audio_data) * stretch_factor) samples.

    Args:
        audio_data: Array of samples, resampled along its first axis.
        stretch_factor: Ratio of output length to input length; must be
            positive.

    Returns:
        Array with the dtype of audio_data. For integer dtypes the result is
        rounded and clipped to the dtype's range, because the FFT-based
        resampling can overshoot the original peak values and a plain cast
        would wrap around.

    Raises:
        ValueError: If stretch_factor is not positive.
    """
    if stretch_factor <= 0:
        raise ValueError(f"stretch_factor must be positive, got {stretch_factor}")

    samples = np.asarray(audio_data)
    num_samples = int(len(samples) * stretch_factor)
    if num_samples < 1:
        # Nothing to resample to (empty or extremely short input).
        return samples[:0].copy()

    stretched_audio = resample(samples, num_samples)

    if np.issubdtype(samples.dtype, np.integer):
        limits = np.iinfo(samples.dtype)
        stretched_audio = np.clip(np.rint(stretched_audio), limits.min, limits.max)
    return stretched_audio.astype(samples.dtype)

def apply_augmentations(train_dir, output_dir):
    """Create pitch-changed, noisy and time-stretched copies of a few files.

    One random '.wav' file is drawn from every class directory under
    train_dir; from those candidates at most five are kept. For each kept
    file three augmented versions are written to output_dir/<class>/ as
    '<name>_pitch.wav', '<name>_noise.wav' and '<name>_stretch.wav', all
    saved with the sample rate of the source file.

    Args:
        train_dir: Directory containing one sub-directory per class.
        output_dir: Directory that receives the augmented files; created if
            missing.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Sorted listings make the random draws repeatable once `random` is
    # seeded, independent of the order in which the OS lists entries.
    class_dirs = sorted(
        d for d in os.listdir(train_dir)
        if os.path.isdir(os.path.join(train_dir, d))
    )

    # One random candidate per class ...
    selected_files = []
    for class_dir in class_dirs:
        class_path = os.path.join(train_dir, class_dir)
        audio_files = sorted(f for f in os.listdir(class_path) if f.endswith('.wav'))
        if audio_files:
            selected_files.append(os.path.join(class_path, random.choice(audio_files)))

    # ... then keep at most five of them.
    selected_files = random.sample(selected_files, min(5, len(selected_files)))

    print("Selected files for augmentation:")
    for file in selected_files:
        print(file)

    for file_path in selected_files:
        sample_rate, audio_data = load_audio(file_path)

        class_name = os.path.basename(os.path.dirname(file_path))
        # splitext removes only the extension; str.replace would also strip
        # a '.wav' that happens to occur inside the name.
        file_name = os.path.splitext(os.path.basename(file_path))[0]

        class_output_dir = os.path.join(output_dir, class_name)
        os.makedirs(class_output_dir, exist_ok=True)

        # Pitch change
        pitch_changed = change_pitch(audio_data, sample_rate, pitch_factor=1.2)
        pitch_file_path = os.path.join(class_output_dir, f"{file_name}_pitch.wav")
        save_audio(pitch_file_path, sample_rate, pitch_changed)
        print(f"Pitch-changed audio saved: {pitch_file_path}")

        # Background noise
        noisy_audio = add_background_noise(audio_data)
        noise_file_path = os.path.join(class_output_dir, f"{file_name}_noise.wav")
        save_audio(noise_file_path, sample_rate, noisy_audio)
        print(f"Noisy audio saved: {noise_file_path}")

        # Time stretch
        stretched_audio = time_stretch(audio_data, stretch_factor=1.5)
        stretch_file_path = os.path.join(class_output_dir, f"{file_name}_stretch.wav")
        save_audio(stretch_file_path, sample_rate, stretched_audio)
        print(f"Time-stretched audio saved: {stretch_file_path}")

In [39]:
def _prepare_dataset_split(dataset_path, output_base_path):
    """Parts A/B: scan the dataset, plot class lengths and write the splits.

    Returns:
        True when the split directories were written, False when no .wav
        file was found under dataset_path.
    """
    print("Creating dataset DataFrame...")
    df = create_dataset_df(dataset_path)

    if len(df) == 0:
        print("No .wav files found in the current directory structure!")
        return False

    print("Calculating average lengths per class...")
    avg_lengths = plot_average_lengths(df)
    print("\nAverage audio lengths per class:")
    print(avg_lengths)

    print("\nInitial class distribution:")
    print(df['class'].value_counts())

    print("\nSplitting dataset...")
    train_df, val_df, test_df = split_dataset(df)

    create_split_directories(output_base_path)

    print("\nCopying files to split directories...")
    copy_files_to_split(train_df, 'train', output_base_path)
    copy_files_to_split(val_df, 'validation', output_base_path)
    copy_files_to_split(test_df, 'test', output_base_path)

    print("\nDataset split statistics:")
    print(f"Total samples: {len(df)}")
    print(f"Training samples: {len(train_df)} ({len(train_df)/len(df)*100:.1f}%)")
    print(f"Validation samples: {len(val_df)} ({len(val_df)/len(df)*100:.1f}%)")
    print(f"Testing samples: {len(test_df)} ({len(test_df)/len(df)*100:.1f}%)")

    splits = {'Training': train_df, 'Validation': val_df, 'Testing': test_df}
    for split_name, split_df in splits.items():
        print(f"\n{split_name} class distribution:")
        if 'class' not in split_df.columns:
            # An entirely empty split may carry no columns at all.
            print("(no samples)")
            continue
        print(split_df['class'].value_counts())
    return True


def main(prepare_split=False, dataset_path='./archive', output_base_path='./seg'):
    """Run the analysis (Part C), spectrogram (Part D) and augmentation (Part E) steps.

    Args:
        prepare_split: When True, first rebuild the train/validation/test
            directories from dataset_path (Parts A/B). Defaults to False,
            which expects '<output_base_path>/train' to exist already.
        dataset_path: Root directory of the raw dataset (one directory per
            class); only used when prepare_split is True.
        output_base_path: Directory holding the 'train', 'validation' and
            'test' split directories.
    """
    # File and class selection below uses the `random` module, so it has to
    # be seeded as well as NumPy for the run to be repeatable.
    np.random.seed(42)
    random.seed(42)

    if prepare_split and not _prepare_dataset_split(dataset_path, output_base_path):
        return

    train_dir = f"{output_base_path}/train"

    # NOTE(review): load_audio is defined in several cells of this notebook;
    # the calls below use whichever definition was executed last.

    # PART C: ZCR and autocorrelation of random training files.
    os.makedirs('./zcrAC', exist_ok=True)
    results_df = analyze_random_files(train_dir)
    results_df.to_csv('./zcrAC/audio_analysis_results.csv', index=False)

    # PART D: one spectrogram for each of (up to) five random classes.
    os.makedirs('./spectrogram', exist_ok=True)
    class_dirs = get_class_directories(train_dir)
    if len(class_dirs) < 5:
        print(f"Not enough classes found. Only {len(class_dirs)} classes available.")
        selected_classes = class_dirs
    else:
        selected_classes = random.sample(class_dirs, 5)
    print("\nGenerating spectrograms for the following classes:")
    for class_name in selected_classes:
        class_dir = os.path.join(train_dir, class_name)
        audio_file = get_random_audio_file(class_dir)
        if audio_file:
            print(f"Processing class: {class_name}")
            sample_rate, audio_data = load_audio(audio_file)
            create_spectrogram(audio_data, sample_rate,
                               f"Class {class_name}")
        else:
            print(f"No audio files found in class: {class_name}")

    # PART E: augmented copies of a few training files.
    output_dir = "./augmented"
    apply_augmentations(train_dir, output_dir)


if __name__ == "__main__":
    main()


Analyzing 8494fba8_nohash_0.wav...

Analyzing 8910e5ef_nohash_1.wav...

Analyzing b4ea0d9a_nohash_6.wav...

Analyzing be7a5b2d_nohash_4.wav...

Analyzing b0f5b16d_nohash_3.wav...

Analysis Results:
                    file    class       zcr  autocorr_peak  sample_rate  \
0  8494fba8_nohash_0.wav      yes  0.209880       0.674165        16000   
1  8910e5ef_nohash_1.wav    eight  0.075422       0.941178        16000   
2  b4ea0d9a_nohash_6.wav      one  0.141950       0.948823        16000   
3  be7a5b2d_nohash_4.wav     nine  0.053031       0.980683        16000   
4  b0f5b16d_nohash_3.wav  forward  0.231410       0.959451        16000   

   duration  
0     0.896  
1     1.000  
2     1.000  
3     1.000  
4     1.000  

Generating spectrograms for the following classes:
Processing class: nine
Processing class: visual
Processing class: seven
Processing class: one
Processing class: two
Selected files for augmentation:
./seg/train\learn\51f7a034_nohash_4.wav
./seg/train\right\ee6163d