# Welcome to HW4!

# Setup

First, let's load our model and initialize three global variables: SAMPLE_RATE (the samples per second of the audio, in this case 44100), SAMPLE_SIZE (the *number* of audio samples we generate with the model; the code below uses one eighth of the model's configured sample size, which is approximately 47.55*44100), and SEED (controls randomness, DO NOT CHANGE).
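
As a quick sanity check on clip length, the arithmetic can be sketched as below. The value 2097152 for the configured sample size is an assumption inferred from the "approximately 47.55*44100" figure; the real value comes from `model_config["sample_size"]`.

```python
# Assumed values for illustration; the notebook reads them from model_config.
CONFIG_SAMPLE_SIZE = 2097152   # assumption: roughly 47.55 s at 44.1 kHz
SAMPLE_RATE = 44100            # samples per second

# The setup cell keeps one eighth of the configured length.
sample_size = CONFIG_SAMPLE_SIZE // 8

full_seconds = CONFIG_SAMPLE_SIZE / SAMPLE_RATE
clip_seconds = sample_size / SAMPLE_RATE

print(f"full length: {full_seconds:.2f} s, clip length: {clip_seconds:.2f} s")
```

Under these assumptions each generated clip is a little under six seconds long, which is why the conditioning later in the notebook asks for 5 seconds.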

In [None]:
import torch
import torchaudio              # For audio-specific operations
from einops import rearrange   # Easy tensor reshaping
from stable_audio_tools import get_pretrained_model  # Load pretrained Stable Audio model
import IPython.display as ipd  # For playing audio in notebooks
from tqdm.auto import trange, tqdm  # Progress bars for loops
from stable_audio_tools.inference.generation import (
    generate_diffusion_cond_and_sampler_setup,
    generate_diffusion_cond_decode
)
import gc  # Garbage collection (helps manage memory in big models)

# This is to choose device: GPU if available, otherwise CPU
device = "cuda" if torch.cuda.is_available() else "cpu"

# Load pretrained diffusion model + config info (e.g., sample rate)
model, model_config = get_pretrained_model("stabilityai/stable-audio-open-1.0")
SAMPLE_RATE = model_config["sample_rate"]             # Samples per second (e.g. 44100 Hz)
SAMPLE_SIZE = model_config["sample_size"] // 8       # Audio samples per clip (1/8 of the configured length)
SEED = 456                                            # For reproducibility, this controls randomness

# Moves model to your device
model = model.to(device)


In [None]:
# If you are using Colab, uncomment the following lines

# from google.colab import drive
# drive.mount('/content/drive')
# %cd /content/drive/MyDrive/[path to your folder]
# !pip install -e .
# !pip install numpy==1.26.4
# !pip install protobuf==3.20.1

# Q1 Simple Sampler

Here you should implement the to_d and simple_sample functions:

In [None]:
# === Q1: Convert model output (x-prediction) to a derivative (gradient direction) ===

def to_d(x, sigma, denoised):
    # SAO predicts the denoised x₀ directly (x-prediction), not the noise.
    # To get the derivative (dx/dt), we use: derivative = (x - x₀) / sigma
    return (x - denoised) / sigma

In [None]:
@torch.no_grad()  # We don't need gradients during inference (an optimization technique)
def simple_sample(model, x, sigmas, extra_args=None):
    
    extra_args = {} if extra_args is None else extra_args
    
    # s_in is a tensor of ones (one per batch item) that broadcasts the scalar
    # sigma to the batch; SAO expects sigma as a tensor, not a float
    s_in = x.new_ones([x.shape[0]])
    
    for i in trange(len(sigmas) - 1):
        # === Step 1: Denoise current x ===
        # Model expects (x, sigmas[i], **kwargs) format
        denoised = model(x, sigmas[i] * s_in, **extra_args)
        
        # === Step 2: Estimate the derivative ===
        d = to_d(x, sigmas[i], denoised)

        # === Step 3: Compute step size (Euler integration) ===
        dt = sigmas[i + 1] - sigmas[i]
        
        # === Step 4: Take a step in the reverse diffusion process ===
        x = x + d * dt

    # Clean up memory after sampling
    del extra_args
    torch.cuda.empty_cache()
    return x

With your implementation in place, you can run it using the block below. Feel free to play with the prompt in the conditioning list, the number of steps, and cfg_scale to explore different outputs. This also helps you test your code: if the output sounds bad, something is probably wrong!

In [None]:
def generate(prompt="128 BPM electronic drum loop", steps=50, cfg_scale=7, return_latents=False):

    # Set up text and timing conditioning
    conditioning = [{
        "prompt": prompt,
        "seconds_start": 0, 
        "seconds_total": 5
    }]

    # Generate diffusion setup params
    denoiser, x_T, sigmas, extra_args = generate_diffusion_cond_and_sampler_setup(
        model,
        steps=steps, # number of steps, more = better quality
        cfg_scale=cfg_scale, # Classifier-Free Guidance Scale, higher = better text relevance / quality but less diversity
        conditioning=conditioning,
        sample_size=SAMPLE_SIZE, # number of audio samples to generate, DON'T CHANGE
        device=device, # cuda device
        seed=SEED # random seed, DON'T CHANGE
    )

    # Sample
    samples = simple_sample(denoiser, x_T, sigmas, extra_args=extra_args)
    del x_T
    del sigmas
    del extra_args
    torch.cuda.empty_cache()
    gc.collect()

    if return_latents:
        return samples

    # Decode
    audio = generate_diffusion_cond_decode(
        model,
        samples
    ).cpu()
    return audio



In [None]:
# === Optional: Run and listen to generation results (GPU recommended) ===
# NOTE: Running on CPU is very slow and not required for this assignment.

for ix, prompt in enumerate([
    "lo-fi jazz piano in a rainy cafe",
    "deep ambient wash with ocean sounds"
]):
    # === Generate audio from prompt using simple sampler ===
    audio = generate(
        prompt=prompt,
        steps=50,
        cfg_scale=7,
        return_latents=False
    )

    # === Play the generated audio ===
    ipd.display(ipd.Audio(audio.cpu().numpy()[0], rate=SAMPLE_RATE))


### Q1 Summary – What are we doing?

In this question, we are implementing the **core sampler** for generating audio using diffusion.

- `to_d`: Converts the model's prediction (the denoised sample) into the derivative dx/dsigma, the direction we step along to denoise. It is proportional to the negative "score".
- `simple_sample`: The main loop that starts with noise and walks it backward using the model, step-by-step.
- The model uses **x-prediction**, so we have to convert its output to the derivative using `(x - denoised) / sigma`.
- We use a basic numerical solver (Euler method) to go from pure noise to a clean sample.
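
To see why the Euler loop lands on a clean sample, here is a minimal toy sketch in plain Python. It replaces the real model with a hypothetical "ideal" denoiser that always predicts the same target value (as if the dataset contained a single point), and the sigma schedule is made up for illustration.

```python
def to_d(x, sigma, denoised):
    # Same conversion as in Q1: x-prediction -> derivative dx/dsigma
    return (x - denoised) / sigma


def euler_sample(denoiser, x, sigmas):
    # Walk from the largest sigma to the smallest, one Euler step at a time
    for i in range(len(sigmas) - 1):
        denoised = denoiser(x, sigmas[i])
        d = to_d(x, sigmas[i], denoised)
        dt = sigmas[i + 1] - sigmas[i]   # negative: noise level decreases
        x = x + d * dt
    return x


TARGET = 0.25                            # the only "clean" value in this toy world


def ideal_denoiser(x, sigma):
    # Hypothetical stand-in for the model: always predicts the clean target
    return TARGET


sigmas = [10.0, 5.0, 1.0, 0.1, 0.0]      # made-up decreasing schedule ending at 0
result = euler_sample(ideal_denoiser, 3.7, sigmas)
print(result)
```

Each step shrinks the distance to the target by the factor `sigmas[i + 1] / sigmas[i]`, so the final step to sigma 0 removes the remaining noise. A real model's prediction changes from step to step, which is why more steps generally help.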


# Q2 - Inpainting Mask

In [None]:
# === LOAD AND ENCODE REFERENCE AUDIO ===
def load_and_encode_audio(path, model):
    audio, sr = torchaudio.load(path)
    
    # === Resample audio to model's expected SAMPLE_RATE ===
    resampler = torchaudio.transforms.Resample(sr, SAMPLE_RATE)
    audio = resampler(audio)

    # === Normalize audio to [-1, 1] by peak value ===
    audio = audio / audio.abs().max()

    # === Ensure audio length matches SAMPLE_SIZE ===
    # If too short, tile the audio; if too long, crop
    if audio.shape[1] < SAMPLE_SIZE:
        while audio.shape[1] < SAMPLE_SIZE:
            audio = torch.cat((audio, audio), dim=1)

    audio = audio[:, :SAMPLE_SIZE][None].to(device)

    # === Encode into latent space (for inpainting) ===
    reference = model.pretransform.encode(audio)
    return reference


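def load_encoded_audio(path):
    # Load a pre-encoded latent; map_location lets GPU-saved files load on CPU-only machines
    encoded_latent = torch.load(path, map_location=device)
    # Move to the active device (dtype is left unchanged)
    return encoded_latent.to(device)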
    


In [None]:
def generate_inpainting_mask(reference, mask_start_s, mask_end_s):
    # Convert mask start/end times in seconds to latent indices
    mask_start = int(mask_start_s * SAMPLE_RATE // model.pretransform.downsampling_ratio)
    mask_end = int(mask_end_s * SAMPLE_RATE // model.pretransform.downsampling_ratio)

    # Create a zero mask of the same shape as the reference latent
    mask = torch.zeros_like(reference)

    # Set mask = 1 in the region to be inpainted
    mask[..., mask_start:mask_end] = 1
    return mask


### Q2 Summary – What’s going on here?

We are preparing reference audio for **inpainting**, where we "fill in" a missing segment using a diffusion model.

- First, we **load and normalize the reference audio**.
- Then, we **encode it into latent space**, since all our operations are done on latents (to save compute).
- Next, we create an **inpainting mask**:
  - The mask has `1`s where we want the model to generate new content.
  - And `0`s where we want to keep the original audio untouched.
  - The time ranges are converted from seconds → latent indices using the sample rate and downsampling ratio.

We'll use this mask in the next question to guide the generation process.
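
The seconds-to-latent-index conversion can be sketched in plain Python as follows. The downsampling ratio of 2048 and the latent length of 128 are assumptions for illustration; in the notebook they come from `model.pretransform.downsampling_ratio` and the shape of the reference latent.

```python
def seconds_to_latent_index(seconds, sample_rate, downsampling_ratio):
    # seconds -> audio samples -> latent frames (floor division, as in Q2)
    return int(seconds * sample_rate // downsampling_ratio)


def make_mask(latent_length, start_s, end_s, sample_rate, downsampling_ratio):
    start = seconds_to_latent_index(start_s, sample_rate, downsampling_ratio)
    end = seconds_to_latent_index(end_s, sample_rate, downsampling_ratio)
    # 1 = region to regenerate, 0 = region to keep from the reference
    return [1 if start <= i < end else 0 for i in range(latent_length)]


SAMPLE_RATE = 44100
DOWNSAMPLING_RATIO = 2048   # assumed value for illustration
LATENT_LENGTH = 128         # assumed: 262144 audio samples // 2048

start_idx = seconds_to_latent_index(2.0, SAMPLE_RATE, DOWNSAMPLING_RATIO)
end_idx = seconds_to_latent_index(3.0, SAMPLE_RATE, DOWNSAMPLING_RATIO)
mask = make_mask(LATENT_LENGTH, 2.0, 3.0, SAMPLE_RATE, DOWNSAMPLING_RATIO)

print(start_idx, end_idx, sum(mask))
```

Because of the floor division, mask boundaries snap to latent frames, so the masked region can differ from the requested times by up to one frame (about 46 ms under these assumed values).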


# Q3 - Inpainting

In [None]:
@torch.no_grad()
def simple_sample_inpaint(model, x, sigmas, reference, mask, extra_args=None):
    """Implements Algorithm 2 (Euler steps) from Karras et al. (2022)."""
    extra_args = {} if extra_args is None else extra_args
    s_in = x.new_ones([x.shape[0]])  # Ones per batch item; broadcasts the scalar sigma to a tensor

    for i in trange(len(sigmas) - 1):
        # === 1. Denoise current sample
        denoised = model(x, sigmas[i] * s_in, **extra_args)

        # === 2. Convert model output to derivative
        d = to_d(x, sigmas[i], denoised)

        # === 3. Euler step update
        dt = sigmas[i + 1] - sigmas[i]
        x = x + d * dt

        # === 4. Inpainting step
        # Add noise to reference to match current noise level
        ref = reference + torch.randn_like(reference) * sigmas[i + 1]

        # Replace unmasked regions in x with noisy reference
        x = x * mask + (1 - mask) * ref

    del extra_args
    torch.cuda.empty_cache()
    return x


In [None]:
def inpaint(prompt="128 BPM house drum loop", steps=50, cfg_scale=7,
            reference=None, mask_start_s=2.0, mask_end_s=3.0,
            return_latents=False):

    # === 1. Text + timing conditioning ===
    conditioning = [{
        "prompt": prompt,
        "seconds_start": 0,
        "seconds_total": 5
    }]

    # === 2. Generate binary mask for inpainting
    mask = generate_inpainting_mask(reference, mask_start_s, mask_end_s)

    # === 3. Setup denoiser and diffusion parameters
    denoiser, x_T, sigmas, extra_args = generate_diffusion_cond_and_sampler_setup(
        model,
        steps=steps,
        cfg_scale=cfg_scale,
        conditioning=conditioning,
        sample_size=SAMPLE_SIZE,
        device=device,
        seed=SEED
    )

    # === 4. Run inpainting-aware sampler
    inp_samples = simple_sample_inpaint(denoiser, x_T, sigmas, reference, mask, extra_args=extra_args)

    # Clean up unused memory
    del x_T, sigmas, extra_args
    torch.cuda.empty_cache()
    gc.collect()

    # Optional return: raw latents
    if return_latents:
        return inp_samples

    # === 5. Decode latents to audio
    inpainted_audio = generate_diffusion_cond_decode(model, inp_samples).cpu()
    return inpainted_audio



In [None]:
# === Optional: Run and listen to inpainting results (GPU recommended) ===
# This is slow on CPU and not required for submission.

for ix, prompt in enumerate([
    "lo-fi jazz piano in a rainy cafe",
    "deep ambient wash with ocean sounds"
]):
    # === Load reference latent from file ===
    reference = load_encoded_audio(f"testing_files/q1_{ix}.pt")

    # === Define inpainting mask (in seconds) ===
    mask = generate_inpainting_mask(reference, mask_start_s=0, mask_end_s=3)

    # === Perform inpainting using mask and prompt ===
    audio = inpaint(
        prompt=prompt,
        steps=50,
        cfg_scale=7,
        reference=reference,
        mask_start_s=0,
        mask_end_s=3,
        return_latents=False
    )

    # === Listen to the output ===
    ipd.display(ipd.Audio(audio.cpu().numpy()[0], rate=SAMPLE_RATE))


### Q3 Summary – What’s happening?

This is your first true “guided generation” task where you’ll fill in only a **masked segment** of audio while preserving the rest.

Key logic:
- Add noise to the reference latent to match the current noise level.
- Use the binary mask to **only update the masked region**.
- Outside the mask, we just reuse the noisy reference every step.

This allows creative audio editing with full control over what gets changed and what stays!
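
The blending step can be illustrated on plain Python lists. This is only a toy sketch: the lists stand in for latent tensors, and `inpaint_blend` is a hypothetical helper name, not part of the assignment code.

```python
import random


def inpaint_blend(x, reference, mask, sigma, rng):
    """Keep x where mask == 1; elsewhere substitute the reference noised to level sigma."""
    noisy_ref = [r + rng.gauss(0.0, 1.0) * sigma for r in reference]
    return [xi * m + (1 - m) * ri for xi, m, ri in zip(x, mask, noisy_ref)]


rng = random.Random(0)
reference = [0.5, -0.2, 0.8, 0.1]
x = [9.0, 9.0, 9.0, 9.0]      # stand-in for the sampler's current state
mask = [0, 1, 1, 0]           # 1 = regenerate, 0 = keep the reference

# Mid-sampling: positions outside the mask receive a noisy copy of the reference
mid_step = inpaint_blend(x, reference, mask, sigma=1.0, rng=rng)

# Final step: sigma is 0, so positions outside the mask equal the clean reference
last_step = inpaint_blend(x, reference, mask, sigma=0.0, rng=rng)

print(mid_step)
print(last_step)
```

Since the last sigma in the schedule is 0, the final blend copies the clean reference into the unmasked region, so that region of the output matches the reference exactly.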


# Q4 Painting with Starting and Stopping Times

In [None]:
@torch.no_grad()
def simple_sample_variable_inpaint(model, x, sigmas, reference, mask, extra_args=None, paint_start=None, paint_end=None):
    """
    Implements Algorithm 2 (Euler steps) from Karras et al. (2022).
    Diffusion sampler with selective inpainting steps.
    Inpaints only during [paint_start, paint_end] timesteps.
    """
    # Default to full range if not specified
    if paint_start is None:
        paint_start = 0
    if paint_end is None:
        paint_end = len(sigmas) - 1

    extra_args = {} if extra_args is None else extra_args
    s_in = x.new_ones([x.shape[0]])

    for i in trange(len(sigmas) - 1):
        # === 1. Denoise
        denoised = model(x, sigmas[i] * s_in, **extra_args)

        # === 2. Convert to derivative
        d = to_d(x, sigmas[i], denoised)

        # === 3. Euler step
        dt = sigmas[i + 1] - sigmas[i]
        x = x + d * dt

        # === 4. Apply inpainting ONLY during chosen step range
        if paint_start <= i <= paint_end:
            # Add matching noise to reference
            ref = reference + torch.randn_like(reference) * sigmas[i + 1]
            x = x * mask + (1 - mask) * ref

    del extra_args
    torch.cuda.empty_cache()
    return x


In [None]:
def variable_inpaint(prompt="128 BPM house drum loop", steps=50, cfg_scale=7,
                     reference=None, mask_start_s=2.0, mask_end_s=3.0,
                     paint_start=None, paint_end=None, return_latents=False):
    # === 1. Prompt + timing conditioning ===
    conditioning = [{
        "prompt": prompt,
        "seconds_start": 0, 
        "seconds_total": 5
    }]
    # === 2. Create inpainting mask
    mask = generate_inpainting_mask(reference, mask_start_s, mask_end_s)

    # === 3. Prepare diffusion setup params
    denoiser, x_T, sigmas, extra_args = generate_diffusion_cond_and_sampler_setup(
        model,
        steps=steps,
        cfg_scale=cfg_scale,
        conditioning=conditioning,
        sample_size=SAMPLE_SIZE,
        device=device,
        seed=SEED
    )

    # === 4. Sample with inpainting only between [paint_start, paint_end] steps
    inp_samples = simple_sample_variable_inpaint(denoiser, x_T, sigmas, reference, mask, extra_args=extra_args, paint_start=paint_start, paint_end=paint_end)
    del x_T
    del sigmas
    del extra_args
    torch.cuda.empty_cache()
    gc.collect()

    if return_latents:
        return inp_samples

    # === 5. Decode to audio
    inpainted_audio = generate_diffusion_cond_decode(
        model,
        inp_samples
    ).cpu()
    return inpainted_audio



In [None]:
# === Optional: Run full audio inpainting if you have a GPU ===
# NOTE: This is slow on CPU and NOT required for the homework.
# This is just for listening to results.

from IPython.display import Audio, display

for ix, prompt in enumerate([
    "lo-fi jazz piano in a rainy cafe",
    "deep ambient wash with ocean sounds"
]):
    reference = load_encoded_audio(f"testing_files/q1_{ix}.pt")   # Load latent reference
    mask = generate_inpainting_mask(reference, 0, 3)              # Create inpainting mask

    if ix == 0:
        paint_start = 0
        paint_end = 20
    else:
        paint_start = 15
        paint_end = 45

    # Run inpainting restricted to sampler steps [paint_start, paint_end]
    audio = variable_inpaint(
        prompt=prompt,
        steps=50,
        cfg_scale=7,
        reference=reference,
        mask_start_s=0,
        mask_end_s=3,
        paint_start=paint_start,
        paint_end=paint_end
    )

    # Playback
    display(Audio(audio.cpu().numpy()[0], rate=SAMPLE_RATE))


### Q4 Summary – What’s happening?

This question adds more control by letting you choose **when** inpainting happens during the diffusion process.

Key logic:
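- Only apply the inpainting step on sampler steps `i` with `paint_start <= i <= paint_end` (both ends inclusive).
- This limits how strongly the reference is enforced: on steps outside the range, the region outside the mask is not reset to the noisy reference.
- Outside this step range, the whole sample evolves normally, so the model is free to alter the unmasked region as well.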

Useful for achieving smoother transitions and less aggressive edits.
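
A small sketch of which sampler steps receive the inpainting blend for a given `paint_start` and `paint_end`. The helper name `inpainting_steps` is hypothetical, and `num_steps` here means the number of loop iterations, i.e. `len(sigmas) - 1`.

```python
def inpainting_steps(num_steps, paint_start=None, paint_end=None):
    # Same defaults as the Q4 sampler: cover the full range when unspecified
    if paint_start is None:
        paint_start = 0
    if paint_end is None:
        paint_end = num_steps
    # Both bounds are inclusive, matching `paint_start <= i <= paint_end`
    return [i for i in range(num_steps) if paint_start <= i <= paint_end]


early = inpainting_steps(50, paint_start=0, paint_end=20)
middle = inpainting_steps(50, paint_start=15, paint_end=45)
everything = inpainting_steps(50)

print(len(early), len(middle), len(everything))
```

With 50 iterations, the range 0 to 20 blends on 21 steps and then lets the last 29 run freely, so the unmasked region is only loosely tied to the reference in the final output.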


# Q5 Style Transfer

In [None]:
def simple_sample_style_transfer(model, sigmas, reference, extra_args=None, transfer_strength=0):
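    # === Convert transfer strength (0.0–1.0) to a step index ===
    # Clamp so transfer_strength=1.0 maps to the last sigma instead of indexing out of range
    inv_step = min(int(transfer_strength * len(sigmas)), len(sigmas) - 1)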

    # === Add noise to reference to simulate partial destruction ===
    x_t = reference + torch.randn_like(reference) * sigmas[inv_step]

    # === Run normal sampling starting from that noisy reference ===
    return simple_sample(model, x_t, sigmas[inv_step:], extra_args=extra_args)


In [None]:
def style_transfer(prompt="128 BPM house drum loop", steps=50, cfg_scale=7, reference=None, transfer_strength=0, return_latents=False):
    # === Set up prompt and duration ===
    conditioning = [{
        "prompt": prompt,
        "seconds_start": 0,
        "seconds_total": 5
    }]

    # === Get initial noise, denoiser, sigmas, etc. ===
    denoiser, x_T, sigmas, extra_args = generate_diffusion_cond_and_sampler_setup(
        model,
        steps=steps,
        cfg_scale=cfg_scale,
        conditioning=conditioning,
        sample_size=SAMPLE_SIZE,
        device=device,
        seed=SEED
    )

    # === Run sampling using style-transferred starting point ===
    inp_samples = simple_sample_style_transfer(
        denoiser, sigmas, reference,
        extra_args=extra_args,
        transfer_strength=transfer_strength
    )

    # === Clean up unused memory ===
    del x_T, sigmas, extra_args
    torch.cuda.empty_cache()
    gc.collect()

    if return_latents:
        return inp_samples

    # === Decode to waveform ===
    inpainted_audio = generate_diffusion_cond_decode(model, inp_samples).cpu()
    return inpainted_audio


In [None]:
# === For those running with a GPU, you can also generate and listen to audio samples ===
# This is NOT recommended for CPU-only users due to long runtimes and memory requirements.

# Loop over two style prompts
# Each prompt has a different reference audio and transfer strength
for ix, prompt in enumerate([
    "deep ambient wash with ocean sounds",
    "lo-fi jazz piano in a rainy cafe",
]):
    # === Load encoded latent reference ===
    # These are pre-encoded .pt files with reference style audio in latent space
    reference = load_encoded_audio(f"testing_files/q1_{ix}.pt")

    # === Define how much of the reference style to inject ===
    # 0.0 = no influence from reference (pure prompt)
    # 1.0 = fully follow reference audio, less prompt effect
    if ix == 0:
        transfer_strength = 0.2  # Mostly prompt-driven
    else:
        transfer_strength = 0.5  # More reference-style preserved

    # === Generate audio with style transfer ===
    audio = style_transfer(
        prompt=prompt,
        steps=50,
        cfg_scale=7,
        reference=reference,
        transfer_strength=transfer_strength,
        return_latents=False
    )

    # === Listen to the result directly in notebook ===
    ipd.display(ipd.Audio(audio.cpu().numpy()[0], rate=SAMPLE_RATE))


### Q5 Summary – What’s happening?

This task performs style transfer by combining a reference audio with a text prompt.

Key logic:
- Add noise to the reference latent up to a certain level based on `transfer_strength`.
- Use that noisy latent as the starting point for generation.
- Guide the rest of the process using the prompt and the diffusion model.

Higher transfer strength keeps more of the reference’s characteristics, while lower values let the prompt have more influence.
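
The mapping from `transfer_strength` to a starting noise level can be sketched as below. The sigma schedule is made up for illustration, `style_transfer_start` is a hypothetical helper, and clamping the index to the last entry is a choice made in this sketch so that a strength of 1.0 stays in range.

```python
def style_transfer_start(sigmas, transfer_strength):
    # Later start index -> smaller starting sigma -> more of the reference survives
    start = min(int(transfer_strength * len(sigmas)), len(sigmas) - 1)
    remaining_steps = len(sigmas) - 1 - start
    return start, sigmas[start], remaining_steps


# Made-up decreasing schedule with 10 steps (11 values, ending at 0)
sigmas = [10.0, 8.0, 6.0, 4.5, 3.0, 2.0, 1.2, 0.6, 0.3, 0.1, 0.0]

for strength in (0.0, 0.2, 0.5, 1.0):
    start, sigma, remaining = style_transfer_start(sigmas, strength)
    print(f"strength={strength}: start index {start}, sigma {sigma}, {remaining} steps left")
```

At strength 0.0 the reference is buried under the largest noise level and all steps run, while at 1.0 no noise is added and no steps remain, so the reference is returned unchanged.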
