# Speaker Voice Extraction

This notebook extracts a specific person's voice from multi-speaker audio:

1. **Load Models**: Speech separation (Sepformer) and speaker recognition (ECAPA-VDNN)
2. **Provide Inputs**: Target speaker sample + conversation audio
3. **Separate**: Split conversation into individual speakers
4. **Match**: Compare each separated voice with target sample
5. **Extract**: Save only the matching speaker's voice

**What happens:**
- `separate_file()` splits audio into individual speaker streams (typically 2 sources)
- Each source is compared to your target sample using speaker verification
- The source with highest similarity score is your target speaker

In [54]:
import torchaudio
from speechbrain.pretrained import SepformerSeparation as separator
from speechbrain.pretrained import SpeakerRecognition

In [55]:
# 1Ô∏è‚É£ Load models
# Force reload with explicit parameters
import torch

# Clear any cached model
if 'separation_model' in globals():
    del separation_model
if 'spkrec' in globals():
    del spkrec
torch.cuda.empty_cache() if torch.cuda.is_available() else None

# Load separation model - use WSJ mix instead of WHAMR
print("Loading separation model...")
separation_model = separator.from_hparams(
    source="speechbrain/sepformer-wsjmix",  # Different pretrained model
    savedir="pretrained_models/sepformer-wsjmix"
)

# Load speaker recognition model
print("Loading speaker recognition model...")
spkrec = SpeakerRecognition.from_hparams(
    source="speechbrain/spkrec-ecapa-voxceleb",
    savedir="pretrained_models/spkrec-ecapa-voxceleb"
)

print("‚úÖ Models loaded successfully")

INFO:speechbrain.utils.fetching:Fetch hyperparams.yaml: Fetching from HuggingFace Hub 'speechbrain/sepformer-wsjmix' if not cached


Loading separation model...


RepositoryNotFoundError: 401 Client Error. (Request ID: Root=1-691196f4-57d926c51f90878579b8a63c;5325aa7b-6e42-480b-806c-48ae33f8376d)

Repository Not Found for url: https://huggingface.co/speechbrain/sepformer-wsjmix/resolve/main/hyperparams.yaml.
Please make sure you specified the correct `repo_id` and `repo_type`.
If you are trying to access a private or gated repo, make sure you are authenticated.
Invalid username or password.

In [None]:
# File paths
target_sample = r"C:\Users\User_1\Desktop\speeches\1\3000_sample.wav"
conversation = r"C:\Users\User_1\Desktop\speeches\1\concat_1.wav"
output_file = r"C:\Users\User_1\Desktop\speeches\1\target_voice_only.wav"

In [None]:
# Verify files exist
import os

files_to_check = [target_sample, conversation]
for file in files_to_check:
    if os.path.exists(file):
        size_mb = os.path.getsize(file) / (1024 * 1024)
        print(f"‚úÖ Found: {os.path.basename(file)} ({size_mb:.2f} MB)")
    else:
        print(f"‚ùå Missing: {file}")

‚úÖ Found: 3000_sample.wav (0.12 MB)
‚úÖ Found: concat_1.wav (2.92 MB)


In [None]:
# 2Ô∏è‚É£ Load audios
target_ref, sr_target = torchaudio.load(target_sample)
mixture, sr_mix = torchaudio.load(conversation)

# Convert to mono if stereo (take first channel)
if target_ref.shape[0] > 1:
    target_ref = target_ref[0:1, :]
    print(f"Target converted to mono")
    
if mixture.shape[0] > 1:
    mixture = mixture[0:1, :]
    print(f"Conversation converted to mono")

print(f"Target shape: {target_ref.shape}, SR: {sr_target}Hz")
print(f"Conversation shape: {mixture.shape}, SR: {sr_mix}Hz")

Target shape: torch.Size([1, 64720]), SR: 16000Hz
Conversation shape: torch.Size([1, 1529760]), SR: 16000Hz


In [None]:
# 3Ô∏è‚É£ Separate mixture into sources
print(f"Original mixture shape: {mixture.shape}")

# The model expects [batch, time] where batch=1
# mixture is currently [channels, samples] = [1, samples]
# We need to pass just [samples] and add batch dimension

# Extract the audio samples (remove channel dimension)
mixture_mono = mixture.squeeze(0)  # [1, samples] -> [samples]
print(f"Mono audio: {mixture_mono.shape}")

# Add batch dimension
mixture_batch = mixture_mono.unsqueeze(0)  # [samples] -> [1, samples]
print(f"Batched input: {mixture_batch.shape}")

# IMPORTANT: Move to same device as model
device = next(separation_model.mods.encoder.parameters()).device
print(f"Model device: {device}")
mixture_batch = mixture_batch.to(device)

# Separate the sources
print("\nSeparating sources (this may take a couple minutes)...")
est_sources = separation_model.separate_batch(mixture_batch)

print(f"Separator output shape: {est_sources.shape}")

# The output should be [batch, num_sources, samples]
if est_sources.dim() == 3 and est_sources.shape[0] == 1:
    # Remove batch dimension: [1, num_sources, samples] -> [num_sources, samples]
    est_sources = est_sources.squeeze(0)
    num_sources = est_sources.shape[0]
    num_samples = est_sources.shape[1]
    
    print(f"\n‚úÖ Successfully separated into {num_sources} sources")
    print(f"   Each source: {num_samples} samples ({num_samples/sr_mix:.1f} seconds)")
    
    if num_sources != 2:
        print(f"‚ö†Ô∏è  WARNING: Expected 2 sources, got {num_sources}")
        print("   This may indicate the model isn't working correctly")
else:
    print(f"\n‚ùå ERROR: Unexpected output shape: {est_sources.shape}")
    print("   Expected: [1, 2, samples] or similar")
    print("\nTrying to reshape...")
    
    # Last resort: if output is completely wrong, we can't proceed
    if est_sources.numel() < sr_mix * 10:  # Less than 10 seconds total
        raise ValueError(
            f"Separation failed completely. Output shape {est_sources.shape} "
            f"doesn't contain enough data. The model may not be compatible."
        )

# Move back to CPU for processing
est_sources = est_sources.cpu()

# 4Ô∏è‚É£ Compare each source with target using embeddings
print("\n" + "="*60)
print("Comparing sources with target speaker...")
print("="*60)

scores = []
for i in range(est_sources.shape[0]):
    print(f"\nSource {i}:")
    
    # Get this source's audio: [samples]
    source_audio = est_sources[i]
    
    # Prepare inputs for speaker recognition
    # Target: [1, samples] -> [1, 1, samples]
    target_input = target_ref.unsqueeze(0)
    
    # Source: [samples] -> [1, samples] -> [1, 1, samples]
    source_input = source_audio.unsqueeze(0).unsqueeze(0)
    
    print(f"  Target shape: {target_input.shape}")
    print(f"  Source shape: {source_input.shape}")
    
    try:
        # Compute embeddings
        emb_target = spkrec.encode_batch(target_input)
        emb_source = spkrec.encode_batch(source_input)
        
        # Compute cosine similarity
        from speechbrain.processing.PLDA_LDA import cosine_similarity
        score = cosine_similarity(emb_target, emb_source).item()
        
        scores.append(score)
        print(f"  ‚úì Similarity: {score:.4f}")
    except Exception as e:
        print(f"  ‚úó Error computing similarity: {e}")
        scores.append(0.0)

# 5Ô∏è‚É£ Find the best match
if scores:
    best_index = scores.index(max(scores))
    print(f"\n{'='*60}")
    print(f"‚úÖ BEST MATCH: Source {best_index}")
    print(f"   Similarity score: {scores[best_index]:.4f}")
    print(f"   Threshold: >0.25 = same speaker")
    print('='*60)
    
    # 6Ô∏è‚É£ Save the matched speaker's voice
    torchaudio.save(output_file, est_sources[best_index].unsqueeze(0), sr_mix)
    print(f"\nüíæ Saved to: {output_file}")
    print(f"   Duration: {est_sources[best_index].shape[0]/sr_mix:.1f} seconds")
else:
    print("\n‚ùå No valid similarity scores computed")

Mixture shape before: torch.Size([1, 1529760])
Input to separator: torch.Size([1, 1529760])
Raw separator output shape: torch.Size([1, 1529760, 1])
ERROR: Expected 2 sources, got shape torch.Size([1, 1529760, 1])
The model output is malformed. Trying alternative approach...
Raw separator output shape: torch.Size([1, 1529760, 1])
ERROR: Expected 2 sources, got shape torch.Size([1, 1529760, 1])
The model output is malformed. Trying alternative approach...


ValueError: Cannot handle separator output shape: torch.Size([1, 1529760, 1])

In [None]:
## Optional: Save Both Separated Sources

If you want to manually listen to both separated sources:

In [None]:
# Save all separated sources for manual inspection
output_dir = os.path.dirname(output_file)

for i in range(est_sources.shape[0]):
    source_file = os.path.join(output_dir, f"source_{i}.wav")
    torchaudio.save(source_file, est_sources[i].unsqueeze(0), sr_mix)
    print(f"Saved source {i} to: {source_file}")
    print(f"  Similarity with target: {scores[i]:.4f}")
    if i == best_index:
        print(f"  ‚≠ê This is the best match!")