In [None]:
# 1. EDA

In [1]:
%%writefile requirements.txt

--extra-index-url https://download.pytorch.org/whl/cu118
torch==2.5.1+cu118  # Explicit CUDA suffix
torchaudio==2.5.1+cu118
librosa>=0.9.0
numpy>=1.20.0
soundfile>=0.10.3
boto3>=1.24.0

Overwriting requirements.txt


In [2]:
!pip install -r requirements.txt

Looking in indexes: https://pypi.org/simple, https://download.pytorch.org/whl/cu118


In [None]:

# CODE (That maybe has a memory leak?)

import librosa
import matplotlib.pyplot as plt
import numpy as np
import torch
import soundfile as sf
import boto3
from io import BytesIO
import gc  # For garbage collection

# Initialize S3 client
s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')

input_bucket = 'cs401finalpipelineinput'
output_bucket = 'cs401finalpipelineprocessingdata'
output_prefix = 'data'

TEST_COUNT_LIMIT = 7

# Load VAD model
torch.set_num_threads(1)
model, (get_speech_timestamps, _, read_audio, _, _) = torch.hub.load(
    repo_or_dir="snakers4/silero-vad", model="silero_vad"
)



import psutil
import os
def memory_usage_mb():
    process = psutil.Process(os.getpid())
    mem_bytes = process.memory_info().rss  # Resident Set Size
    return f"{mem_bytes / (1024 * 1024)} mb"





def list_s3_files(bucket, prefix):
    """List files in an S3 bucket with given prefix"""
    paginator = s3_client.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
    
    file_list = []
    for page in pages:
        if "Contents" in page:
            for obj in page["Contents"]:
                file_list.append(obj["Key"])
    
    return file_list

def compute_melspec(audio, sr, n_mels, fmin, fmax):
    """Compute a mel-spectrogram."""
    melspec = librosa.feature.melspectrogram(
        y=audio,
        sr=sr,
        n_mels=n_mels,
        fmin=fmin,
        fmax=fmax,
    )

    # Convert to log scale (dB)
    melspec = librosa.power_to_db(melspec)
    return melspec

def mono_to_color(X, eps=1e-6, mean=None, std=None):
    """Convert mono audio to color image format"""
    mean = mean or X.mean()
    std = std or X.std()
    X = (X - mean) / (std + eps)

    # Normalize to [0, 255]
    _min, _max = X.min(), X.max()
    if (_max - _min) > eps:
        V = np.clip(X, _min, _max)
        V = 255 * (V - _min) / (_max - _min)
    else:
        V = np.zeros_like(X)

    # Convert to uint8
    V = V.astype(np.uint8)

    # Create RGB channels (stack the same array 3 times)
    return np.stack([V, V, V], axis=2)

def crop_or_pad(audio, length):
    """Crop or pad an audio sample to a fixed length."""
    if len(audio) < length:
        # Pad with zeros
        audio = np.pad(audio, (0, length - len(audio)))
    else:
        # Crop to length
        audio = audio[:length]
    return audio

def process_single_file(s3_key, max_size_mb=1.2):
    print(f"Memory at very start: {memory_usage_mb()}")
    
    try:
        # Extract class from directory structure
        path_parts = s3_key.split('/')
        parent_dir = path_parts[-2]  # Class name
        base_filename = path_parts[-1]  # Filename
        base_name = base_filename.split('.')[0] if '.' in base_filename else base_filename

        print(f"Memory Before Downloading Audio File: {memory_usage_mb()}")
        
        # Download audio file to memory using context manager
        with BytesIO() as audio_buffer:
            s3_client.download_fileobj(input_bucket, s3_key, audio_buffer)
            audio_buffer.seek(0)

            print(f"Memory After Downloading Audio File: {memory_usage_mb()}")
            
            # Check file size
            file_size_mb = len(audio_buffer.getvalue()) / (1024 * 1024)
            if file_size_mb > max_size_mb:
                print(f"Skipping {base_filename}: {file_size_mb:.1f}MB exceeds {max_size_mb}MB limit")
                return
            
            # Use BytesIO for VAD model
            print(f"Memory Before audio_buffer and wav: {memory_usage_mb()}")
            
            audio_buffer.seek(0)
            wav = read_audio(audio_buffer)

            print(f"Memory After audio_buffer and wav: {memory_usage_mb()}")
            
            # Get speech timestamps
            print(f"Memory Before speech_timestamps: {memory_usage_mb()}")
            
            speech_timestamps = get_speech_timestamps(
                wav, model, return_seconds=True, threshold=0.4
            )

            print(f"Memory After speech_timestamps: {memory_usage_mb()}")
            
            # Skip if no speech found
            if not speech_timestamps:
                print(f"No speech found in {base_filename}")
                return

            print(f"Memory Before librosa.load: {memory_usage_mb()}")
            
            # Load audio at original sample rate for processing
            audio_buffer.seek(0)
            y, sr = librosa.load(audio_buffer, sr=None)

            print(f"Memory After librosa.load: {memory_usage_mb()}")
            
            # Free memory
            del wav
            gc.collect()

        # Note: audio_buffer automatically closed after the with block
        print(f"Memory After closing audio_buffer: {memory_usage_mb()}")

        print(f"Memory Before Mask Stuff: {memory_usage_mb()}")

        # Create clean audio by removing voice segments
        keep_mask = np.ones(len(y), dtype=bool)
        for segment in speech_timestamps:
            buffer = 0.5
            start_with_buffer = max(0, segment["start"] - buffer)
            end_with_buffer = min(len(y) / sr, segment["end"] + buffer)
            start_sample = int(start_with_buffer * sr)
            end_sample = int(end_with_buffer * sr)
            keep_mask[start_sample:end_sample] = False
            
        # Apply mask to get clean audio
        clean_audio = y[keep_mask]
        
        # Calculate percentage of audio retained
        percent_retained = (np.sum(keep_mask) / len(keep_mask)) * 100

        print(f"Memory After Mask Stuff: {memory_usage_mb()}")
        
        # Free memory
        del y, keep_mask
        gc.collect()

        print(f"Memory After deleting mask data: {memory_usage_mb()}")

        print(f"Memory Before saving audio to s3: {memory_usage_mb()}")
        
        # Save clean audio to S3 using context manager
        with BytesIO() as clean_audio_buffer:
            sf.write(clean_audio_buffer, clean_audio, sr, format='ogg')
            clean_audio_buffer.seek(0)
            clean_s3_key = f"{output_prefix}/clean_audio/{parent_dir}/{base_filename}"
            s3_client.upload_fileobj(clean_audio_buffer, output_bucket, clean_s3_key)

        print(f"Memory After saving audio to s3: {memory_usage_mb()}")
        
        # Configuration for spectrogram generation
        config = {
            'sampling_rate': 32000,
            'duration': 5,
            'fmin': 0,
            'fmax': None,
            'n_mels': 128,
            'res_type': "kaiser_fast"
        }

        print(f"Memory Before resample if necessary: {memory_usage_mb()}")
        
        # Resample if necessary
        if sr != config['sampling_rate']:
            clean_audio = librosa.resample(
                clean_audio, sr, config['sampling_rate'], res_type=config['res_type']
            )
            sr = config['sampling_rate']

        print(f"Memory After resample if necessary: {memory_usage_mb()}")
        
        # Calculate step size (for overlapping windows)
        audio_length = config['duration'] * sr
        step = int(config['duration'] * 0.666 * sr)

        print(f"Memory Before Split into Chunks: {memory_usage_mb()}")
        
        # Split audio into chunks
        audio_chunks = [
            clean_audio[i:i + audio_length]
            for i in range(0, max(1, len(clean_audio) - audio_length + 1), step)
        ]
        
        print(f"Memory After Split into Chunks: {memory_usage_mb()}")
        
        # Ensure last chunk has correct length
        if audio_chunks and len(audio_chunks[-1]) < audio_length:
            audio_chunks[-1] = crop_or_pad(audio_chunks[-1], audio_length)
            
        # Free memory
        del clean_audio
        gc.collect()

        print(f"Memory After delete clean_audio: {memory_usage_mb()}")

        print(f"Memory Before audio_chunks for loop: {memory_usage_mb()}")
        
        # Process each chunk
        for i, chunk in enumerate(audio_chunks):
            # Create spectrogram
            melspec = compute_melspec(
                chunk, sr, config['n_mels'], config['fmin'], 
                config['fmax'] or sr//2
            )
            image = mono_to_color(melspec)
            
            # Save as npy file and upload to S3 using context manager
            with BytesIO() as npy_buffer:
                np.save(npy_buffer, image)
                npy_buffer.seek(0)
                
                # Upload to S3
                spec_s3_key = f"{output_prefix}/audio_specs/{parent_dir}/{base_name}_chunk_{i}.npy"
                s3_client.upload_fileobj(npy_buffer, output_bucket, spec_s3_key)
            
            # Free memory
            print(f"Memory Before delete stuff in for loop: {memory_usage_mb()}")
            del melspec, image, chunk
            gc.collect()
            print(f"Memory After delete stuff in for loop: {memory_usage_mb()}")

        print(f"Memory After audio_chunks for loop: {memory_usage_mb()}")
        
        # Free memory for audio chunks
        print(f"Memory Before delete audio_chunks: {memory_usage_mb()}")
        del audio_chunks
        gc.collect()
        print(f"Memory After delete audio_chunks: {memory_usage_mb()}")
        
    except Exception as e:
        print(f"Error processing {s3_key}: {e}")
    finally:
        # Clean up memory explicitly
        print(f"Memory Before final garbage collection: {memory_usage_mb()}")
        gc.collect()
        print(f"Memory After final garbage collection: {memory_usage_mb()}")
        print()
        print()


    
def main():
    # List files from S3
    s3_files = list_s3_files(input_bucket, 'train_audio/')
    files = sorted([f for f in s3_files if f.endswith('.ogg')])
    
    # Process files one by one
    count = 0
    for s3_key in files:
        count += 1
        print(f"Processing file {count}/{len(files)}")
        if count > TEST_COUNT_LIMIT:
            break
            
        process_single_file(s3_key)
        
        # Explicitly clean up memory after each file
        gc.collect()

if __name__ == "__main__":
    main()


In [None]:
# CREATES NEW PROCESS EACH TIME AND WORKS (I THINK, NEED TO TEST IT AGAIN)


import librosa
import matplotlib.pyplot as plt
import numpy as np
import torch
import soundfile as sf
import boto3
from io import BytesIO
import gc
import multiprocessing
import psutil
import os
import sys
import time
from functools import partial

def memory_usage_mb():
    process = psutil.Process(os.getpid())
    mem_bytes = process.memory_info().rss
    return f"{mem_bytes / (1024 * 1024)} mb"

def process_single_file_wrapper(s3_key):
    """
    Wrapper function that spawns a completely new process for each file.
    This ensures complete memory cleanup after each file.
    """
    # Create and start a new process for this specific file
    p = multiprocessing.Process(target=process_single_file, args=(s3_key,))
    p.start()
    p.join()
    
    # Check if process exited normally
    if p.exitcode != 0:
        print(f"Warning: Process for {s3_key} exited with code {p.exitcode}")
    
    

def process_single_file(s3_key, max_size_mb=1.2):
    """Function to process a single audio file"""
    
    # This function now should run in its own isolated process
    print(f"Memory at start: {memory_usage_mb()}")
    
    # Initialize resources needed for this process
    s3_client = boto3.client('s3')
    input_bucket = 'cs401finalpipelineinput'
    output_bucket = 'cs401finalpipelineprocessingdata'
    output_prefix = 'data'
    
    try:
        # Initialize VAD model - keep this inside the try block to ensure cleanup
        torch.set_num_threads(1)
        model, (get_speech_timestamps, _, read_audio, _, _) = torch.hub.load(
            repo_or_dir="snakers4/silero-vad", model="silero_vad"
        )
        
        # Extract class from directory structure
        path_parts = s3_key.split('/')
        parent_dir = path_parts[-2]  # Class name
        base_filename = path_parts[-1]  # Filename
        base_name = base_filename.split('.')[0] if '.' in base_filename else base_filename

        print(f"Process {os.getpid()} - Processing {base_filename}")
        
        # Download audio file to memory
        with BytesIO() as audio_buffer:
            s3_client.download_fileobj(input_bucket, s3_key, audio_buffer)
            audio_buffer.seek(0)
            
            # Check file size
            file_size_mb = len(audio_buffer.getvalue()) / (1024 * 1024)
            if file_size_mb > max_size_mb:
                print(f"Skipping {base_filename}: {file_size_mb:.1f}MB exceeds {max_size_mb}MB limit")
                return
            
            # Use BytesIO for VAD model
            audio_buffer.seek(0)
            wav = read_audio(audio_buffer)
            
            # Get speech timestamps
            speech_timestamps = get_speech_timestamps(
                wav, model, return_seconds=True, threshold=0.4
            )
            
            # Skip if no speech found
            if not speech_timestamps:
                print(f"No speech found in {base_filename}")
                return
            
            # Load audio at original sample rate for processing
            audio_buffer.seek(0)
            y, sr = librosa.load(audio_buffer, sr=None)
            
            # Free memory
            del wav
            gc.collect()

        # Create clean audio by removing voice segments
        keep_mask = np.ones(len(y), dtype=bool)
        for segment in speech_timestamps:
            buffer = 0.5
            start_with_buffer = max(0, segment["start"] - buffer)
            end_with_buffer = min(len(y) / sr, segment["end"] + buffer)
            start_sample = int(start_with_buffer * sr)
            end_sample = int(end_with_buffer * sr)
            keep_mask[start_sample:end_sample] = False
            
        # Apply mask to get clean audio
        clean_audio = y[keep_mask]
        
        # Calculate percentage of audio retained
        percent_retained = (np.sum(keep_mask) / len(keep_mask)) * 100
        
        # Free memory
        del y, keep_mask, speech_timestamps
        gc.collect()

        # Save clean audio to S3
        with BytesIO() as clean_audio_buffer:
            sf.write(clean_audio_buffer, clean_audio, sr, format='ogg')
            clean_audio_buffer.seek(0)
            clean_s3_key = f"{output_prefix}/clean_audio/{parent_dir}/{base_filename}"
            s3_client.upload_fileobj(clean_audio_buffer, output_bucket, clean_s3_key)

        # Configuration for spectrogram generation
        config = {
            'sampling_rate': 32000,
            'duration': 5,
            'fmin': 0,
            'fmax': None,
            'n_mels': 128,
            'res_type': "kaiser_fast"
        }
        
        # Resample if necessary
        if sr != config['sampling_rate']:
            clean_audio = librosa.resample(
                clean_audio, sr, config['sampling_rate'], res_type=config['res_type']
            )
            sr = config['sampling_rate']
        
        # Calculate step size (for overlapping windows)
        audio_length = config['duration'] * sr
        step = int(config['duration'] * 0.666 * sr)
        
        # Split audio into chunks
        audio_chunks = [
            clean_audio[i:i + audio_length]
            for i in range(0, max(1, len(clean_audio) - audio_length + 1), step)
        ]
        
        # Ensure last chunk has correct length
        if audio_chunks and len(audio_chunks[-1]) < audio_length:
            audio_chunks[-1] = crop_or_pad(audio_chunks[-1], audio_length)
            
        # Free memory
        del clean_audio
        gc.collect()

        # Process each chunk
        for i, chunk in enumerate(audio_chunks):
            # Create spectrogram
            melspec = compute_melspec(
                chunk, sr, config['n_mels'], config['fmin'], 
                config['fmax'] or sr//2
            )
            image = mono_to_color(melspec)
            
            # Save as npy file and upload to S3
            with BytesIO() as npy_buffer:
                np.save(npy_buffer, image)
                npy_buffer.seek(0)
                
                # Upload to S3
                spec_s3_key = f"{output_prefix}/audio_specs/{parent_dir}/{base_name}_chunk_{i}.npy"
                s3_client.upload_fileobj(npy_buffer, output_bucket, spec_s3_key)
            
            # Free memory
            del melspec, image
            gc.collect()

        # Free memory for audio chunks
        del audio_chunks
        gc.collect()
        
        print(f"Completed {base_filename}, memory: {memory_usage_mb()}")
        
    except Exception as e:
        print(f"Error processing {s3_key}: {e}")
        # In case of error, ensure the process exits with non-zero code
        sys.exit(1)
    finally:
        # Clean up memory explicitly
        gc.collect()
        
        # Clean up PyTorch resources
        if 'model' in locals():
            del model
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

            
def list_s3_files(bucket, prefix):
    """List files in an S3 bucket with given prefix"""
    s3_client = boto3.client('s3')
    paginator = s3_client.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
    
    file_list = []
    for page in pages:
        if "Contents" in page:
            for obj in page["Contents"]:
                file_list.append(obj["Key"])
    
    return file_list

def compute_melspec(audio, sr, n_mels, fmin, fmax):
    """Compute a mel-spectrogram."""
    melspec = librosa.feature.melspectrogram(
        y=audio,
        sr=sr,
        n_mels=n_mels,
        fmin=fmin,
        fmax=fmax,
    )

    # Convert to log scale (dB)
    melspec = librosa.power_to_db(melspec)
    return melspec

def mono_to_color(X, eps=1e-6, mean=None, std=None):
    """Convert mono audio to color image format"""
    mean = mean or X.mean()
    std = std or X.std()
    X = (X - mean) / (std + eps)

    # Normalize to [0, 255]
    _min, _max = X.min(), X.max()
    if (_max - _min) > eps:
        V = np.clip(X, _min, _max)
        V = 255 * (V - _min) / (_max - _min)
    else:
        V = np.zeros_like(X)

    # Convert to uint8
    V = V.astype(np.uint8)

    # Create RGB channels (stack the same array 3 times)
    return np.stack([V, V, V], axis=2)

def crop_or_pad(audio, length):
    """Crop or pad an audio sample to a fixed length."""
    if len(audio) < length:
        # Pad with zeros
        audio = np.pad(audio, (0, length - len(audio)))
    else:
        # Crop to length
        audio = audio[:length]
    return audio

def main():
    # Constants
    input_bucket = 'cs401finalpipelineinput'
    
    TEST_COUNT_LIMIT = 500
    
    # List files from S3
    s3_files = list_s3_files(input_bucket, 'train_audio/')
    files = sorted([f for f in s3_files if f.endswith('.ogg')])
    
    if len(files) > TEST_COUNT_LIMIT:
        files = files[:TEST_COUNT_LIMIT]
    
    print(f"Processing {len(files)} files with separate processes")
    
    # Process each file in its own process
    for i, s3_key in enumerate(files):
        print(f"Starting file {i+1}/{len(files)}: {s3_key.split('/')[-1]}")
        process_single_file_wrapper(s3_key)
    
    print("All processing complete")


if __name__ == "__main__":
    print("here")
    multiprocessing.freeze_support()
    main()

In [None]:
# 2 TRAINING MODEL

In [None]:
# Some usless experimentation pipeline stuff

In [67]:
# 1. IMPORTS


import sagemaker
import boto3
import os

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep

from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterBoolean

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.pytorch.processing import PyTorchProcessor



In [68]:
# 2. CONSTANTS

pipeline_name = "cs401finalpipeline"

write_bucket = "cs401finalpipelineprocessingdata"

instance_type = "ml.t3.medium"

In [69]:
# 3. SAGEMAKER SESSION SETUP

sess = sagemaker.Session()
sagemaker_role = sagemaker.get_execution_role()

region = sess.boto_region_name
s3_client = boto3.client("s3", region_name=region)
sm_client = boto3.client("sagemaker", region_name=region)
sm_runtime_client = boto3.client("sagemaker-runtime")


In [1]:
%%writefile local_preprocessing.py

# import argparse
# import logging


# if __name__ == "__main__":
#     parser = argparse.ArgumentParser()
#     parser.add_argument("--message", type=str, default="N/A")
#     args, _ = parser.parse_known_args()
#     logger.info("Received arguments {}".format(args))
#     logger.info(f"Message: {args.message}")



Writing local_preprocessing.py


In [None]:
%%writefile local_requirements.txt

librosa>=0.9.2
soundfile>=0.10.0
pandas>=1.3.0

In [71]:
s3_client.upload_file(
    Filename="local_preprocessing.py", Bucket=write_bucket, Key="scripts/preprocessing/preprocessing.py"
)

s3_client.upload_file(
    Filename="local_requirements.txt", Bucket=write_bucket, Key="scripts/preprocessing/requirements.txt"
)

In [72]:
# 4. DEFINE DATA PROCESSING ACTUAL STEP
pytorch_processor = PyTorchProcessor(
    framework_version="1.12.0",
    role=sagemaker_role,
    instance_count=1,
    instance_type=instance_type,
    base_job_name=f"{pipeline_name}-preprocessing",
    py_version="py38",
    source_dir=["librosa>=0.9.2", "soundfile>=0.10.0", "pandas>=1.3.0"]
)

# Define pipeline processing step
preprocess_step = ProcessingStep(
    name="PreProcessing",
    processor=pytorch_processor,
    inputs=[],
    outputs=[],
    job_arguments=[
        "--message", "Hello World"
    ],
    code=f"s3://{write_bucket}/scripts/preprocessing/preprocessing.py"
)

TypeError: PyTorchProcessor.__init__() got an unexpected keyword argument 'requirements'

In [64]:
# 4. CREATE PIPELINE
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[],
    steps=[
        preprocess_step,
    ],
    sagemaker_session=sess
)

# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)

[2;36m                    [0m         from the pipeline definition by    [2m                [0m
[2;36m                    [0m         default since it will be           [2m                [0m
[2;36m                    [0m         overridden at pipeline execution   [2m                [0m
[2;36m                    [0m         time. Please utilize the           [2m                [0m
[2;36m                    [0m         PipelineDefinitionConfig to        [2m                [0m
[2;36m                    [0m         persist this field in the pipeline [2m                [0m
[2;36m                    [0m         definition if desired.             [2m                [0m
[2;36m                    [0m         from the pipeline definition by    [2m                [0m
[2;36m                    [0m         default since it will be           [2m                [0m
[2;36m                    [0m         overridden at pipeline execution   [2m                [0m


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:084375543672:pipeline/cs401finalpipeline',
 'ResponseMetadata': {'RequestId': '9623aa6a-ea51-4475-bc04-839953bf5910',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '9623aa6a-ea51-4475-bc04-839953bf5910',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '86',
   'date': 'Thu, 17 Apr 2025 06:56:54 GMT'},
  'RetryAttempts': 0}}

In [65]:
# 5. RUN PIPELINE

pipeline.start()

_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:084375543672:pipeline/cs401finalpipeline/execution/pzsbk1atso3g', sagemaker_session=<sagemaker.session.Session object at 0x7fd4a16cb740>)