<a href="https://colab.research.google.com/github/aggarwaldimple/Speech-Intent-Recognition-App/blob/main/Speech_to_intent_recognition_using_wav2vec2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:

!pip install torchaudio pydub scikit-learn numpy

!pip install transformers==4.41.1 --upgrade --quiet

!pip install qdrant-client




In [4]:
from typing import Dict, List, Tuple
import io
import os
import numpy as np
import torch
from torch.nn.functional import cosine_similarity
from google.colab import files

# Audio utils
from pydub import AudioSegment
from pydub.silence import detect_nonsilent

# HF model
from transformers import Wav2Vec2FeatureExtractor, Wav2Vec2Model

# -----------------------------
# Config
# -----------------------------
TARGET_SR = 16000  # Wav2Vec2 expects 16 kHz
MIN_SILENCE_LEN_MS = 150  # consecutive ms of silence to consider as silence
SILENCE_DB_OFFSET = 16    # silence threshold = (audio.dBFS - this)

# -----------------------------
# Model loading (do once)
# -----------------------------
feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained("facebook/wav2vec2-large-xlsr-53")
model = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-large-xlsr-53")
model.eval()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


preprocessor_config.json:   0%|          | 0.00/212 [00:00<?, ?B/s]

config.json: 0.00B [00:00, ?B/s]

pytorch_model.bin:   0%|          | 0.00/1.27G [00:00<?, ?B/s]

Some weights of Wav2Vec2Model were not initialized from the model checkpoint at facebook/wav2vec2-large-xlsr-53 and are newly initialized: ['wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Wav2Vec2Model(
  (feature_extractor): Wav2Vec2FeatureEncoder(
    (conv_layers): ModuleList(
      (0): Wav2Vec2LayerNormConvLayer(
        (conv): Conv1d(1, 512, kernel_size=(10,), stride=(5,))
        (layer_norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (activation): GELUActivation()
      )
      (1-4): 4 x Wav2Vec2LayerNormConvLayer(
        (conv): Conv1d(512, 512, kernel_size=(3,), stride=(2,))
        (layer_norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (activation): GELUActivation()
      )
      (5-6): 2 x Wav2Vec2LayerNormConvLayer(
        (conv): Conv1d(512, 512, kernel_size=(2,), stride=(2,))
        (layer_norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (activation): GELUActivation()
      )
    )
  )
  (feature_projection): Wav2Vec2FeatureProjection(
    (layer_norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
    (projection): Linear(in_features=512, out_features=1024, bias=True)
    (dropout)

In [6]:
# -----------------------------
# Audio preprocessing
# -----------------------------

def load_and_preprocess(path: str,
                        target_sr: int = TARGET_SR,
                        remove_silence: bool = False) -> torch.Tensor:
    """
    Load an audio file of any common type (mp3/m4a/wav),
    convert to mono 16 kHz, optionally remove silence,
    and return a 1-D float32 waveform tensor in [-1, 1].
    """
    # 1) Load with pydub (handles many formats)
    audio = AudioSegment.from_file(path)

    # 2) Standardize sample rate + channels
    audio = audio.set_frame_rate(target_sr).set_channels(1)

    # 3) Remove silence using energy thresholding
    if remove_silence:
        # threshold below which audio is considered silent
        silence_thresh = audio.dBFS - SILENCE_DB_OFFSET
        ranges = detect_nonsilent(audio, min_silence_len=MIN_SILENCE_LEN_MS,
                                  silence_thresh=silence_thresh)
        if ranges:
            trimmed = AudioSegment.silent(duration=0, frame_rate=target_sr)
            for start_ms, end_ms in ranges:
                trimmed += audio[start_ms:end_ms]
            audio = trimmed
        # if no ranges found, keep original audio

    # 4) Convert to numpy float32 in [-1, 1]
    samples = np.array(audio.get_array_of_samples())
    # Scale based on sample width (e.g., 16-bit -> 32768)
    max_val = float(1 << (8 * audio.sample_width - 1))
    samples = (samples.astype(np.float32) / max_val)

    # 5) To torch tensor, shape [time]; keep as 1-D for HF extractor
    waveform = torch.from_numpy(samples)
    return waveform

# -----------------------------
# Average Embeddings
# -----------------------------

def average_embeddings(emb_list: List[torch.Tensor], l2_normalize: bool = False) -> torch.Tensor:
    """Average multiple 1024-d embeddings and (optionally) L2-normalize the result."""
    if len(emb_list) == 1:
        avg = emb_list[0]
    else:
        avg = torch.stack(emb_list, dim=0).mean(dim=0)
    if l2_normalize:
        avg = avg / avg.norm(p=2).clamp_min(1e-12)
    return avg

import torch
import torchaudio
from transformers import Wav2Vec2Processor, Wav2Vec2Model

# Attention pooling layer
class AttentionPooling(torch.nn.Module):
    def __init__(self, hidden_size: int):
        super().__init__()
        self.attention = torch.nn.Linear(hidden_size, 1)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        # hidden_states: [seq_len, hidden_size]
        attn_scores = self.attention(hidden_states)  # [seq_len, 1]
        attn_weights = torch.softmax(attn_scores, dim=0)  # [seq_len, 1]
        pooled = torch.sum(attn_weights * hidden_states, dim=0)  # [hidden_size]
        return pooled

attention_pooling = AttentionPooling(hidden_size=model.config.hidden_size)

def audio_to_embedding(wav: torch.Tensor, l2_normalize: bool = True) -> torch.Tensor:
    """
    Converts waveform to embedding using attention pooling.
    """
    # if wav.ndim > 1:
    #   wav = wav.mean(dim=0)
    #   wav = wav.squeeze()

    if wav.ndim == 2 and wav.shape[0] > 1:
        wav = wav.mean(dim=0)  # convert to mono

    wav = wav.float()
    input_values = feature_extractor(wav, sampling_rate=16000, return_tensors="pt").input_values

    with torch.no_grad():
        outputs = model(input_values)
        hidden_states = outputs.last_hidden_state.squeeze(0)  # [seq_len, hidden_size]

        embedding = attention_pooling(hidden_states)  # [hidden_size]

        if l2_normalize:
            embedding = embedding / embedding.norm(p=2)

    return embedding

import torch

def augment_embedding(emb: torch.Tensor, noise_level: float = 0.01) -> torch.Tensor:
    """
    Returns a slightly perturbed version of the embedding.
    Small Gaussian noise is added to simulate variation in speech.
    """
    noise = torch.randn_like(emb) * noise_level
    return emb + noise

import torch
import torchaudio
import torchaudio.transforms as T
import random
import gc
from typing import List, Dict

# ---------- Utility ----------
def _ensure_ct(x: torch.Tensor) -> torch.Tensor:
    """Ensure shape is (channels, time)."""
    x = x.squeeze()
    if x.dim() == 1:
        x = x.unsqueeze(0)
    elif x.dim() > 2:
        new_c = int(torch.prod(torch.tensor(x.shape[:-1])).item())
        x = x.reshape(new_c, x.shape[-1])
    return x.contiguous()

def _ensure_mono(x: torch.Tensor) -> torch.Tensor:
    """Convert (channels, time) -> (time,) mono."""
    x = _ensure_ct(x)
    if x.shape[0] > 1:
        x = x.mean(dim=0)
    else:
        x = x.squeeze(0)
    return x.contiguous()

def _ensure_bct(x: torch.Tensor) -> torch.Tensor:
    """Convert (channels, time) -> (batch=1, channels, time)."""
    x = _ensure_ct(x)
    return x.unsqueeze(0).contiguous()

# ---------- Chunked transform ----------
def apply_in_chunks(waveform: torch.Tensor, transform, chunk_size: int) -> torch.Tensor:
    """
    Apply a torchaudio transform in memory-safe chunks.
    Input/output are (channels, time).
    """
    wf = _ensure_ct(waveform)
    num_samples = wf.shape[1]
    if num_samples == 0:
        return wf

    chunk_size = max(1, int(chunk_size))
    chunks = []

    with torch.no_grad():  # prevent computation graph buildup
        for start in range(0, num_samples, chunk_size):
            end = min(start + chunk_size, num_samples)
            chunk = wf[:, start:end].contiguous()

            out = transform(chunk)
            out = _ensure_ct(out)
            chunks.append(out)

            del chunk, out
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    return torch.cat(chunks, dim=1)

# ---------- Augmentation ----------
def create_audio_variations(
    waveform: torch.Tensor,
    sample_rate: int,
    num_variations: int = 2,
    max_chunk_seconds: float = 1.0
) -> List[torch.Tensor]:
    """
    Returns a list of augmented mono waveforms (1D) ready for embedding.
    Each tensor shape: [T]
    """
    base = _ensure_ct(waveform).cpu()
    chunk_size = max(1, int(sample_rate * max_chunk_seconds))
    variations = []

    for i in range(num_variations):
        aug = base.clone().detach()

        with torch.no_grad():
            # Pitch shift
            if random.random() < 0.5:
                n_steps = random.uniform(-2.0, 2.0)
                print(f"  Pitch: {n_steps:+.2f} semitones")
                pitch_shift = T.PitchShift(sample_rate, n_steps=n_steps)
                aug = apply_in_chunks(aug, pitch_shift, chunk_size)
                del pitch_shift
                gc.collect()

            # Noise
            if random.random() < 0.5:
                sigma = random.uniform(0.001, 0.01)
                print(f"  Noise sigma={sigma:.4f}")
                aug.add_(torch.randn_like(aug) * sigma)
                gc.collect()

            # Gain
            if random.random() < 0.5:
                gain_db = random.uniform(-2.0, 2.0)
                print(f"  Gain: {gain_db:+.2f} dB")
                aug.mul_(10.0 ** (gain_db / 20.0))

            # Speed change
            if random.random() < 0.5:
                speed = random.uniform(0.95, 1.05)
                print(f"  Speed factor: {speed:.3f}")
                new_sr = max(1, int(sample_rate * speed))
                resample_up = T.Resample(sample_rate, new_sr)
                resample_down = T.Resample(new_sr, sample_rate)
                aug = apply_in_chunks(aug, resample_up, chunk_size/2)
                aug = apply_in_chunks(aug, resample_down, chunk_size/2)
                del resample_up, resample_down
                gc.collect()

        # Convert to mono 1D [T]
        mono_aug = _ensure_mono(aug)
        variations.append(mono_aug)

        del aug
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    return variations

gc.collect()

import torch
import numpy as np
from scipy.io import wavfile
import io
import base64

def audio_to_base64(waveform: torch.Tensor, sample_rate: int) -> str:
    """
    Convert a waveform tensor to WAV and encode as Base64.
    """
    # Detach and move to CPU
    wav = waveform.detach().cpu()
    wav_np = wav.numpy()

    # If multi-channel, make sure shape is (N,) or (N, C)
    if wav_np.ndim > 1:
        wav_np = wav_np.T  # scipy expects shape (N, C)

    # Convert to float32 if needed
    if not np.issubdtype(wav_np.dtype, np.floating):
        wav_np = wav_np.astype(np.float32)

    # Write to in-memory WAV file
    buf = io.BytesIO()
    wavfile.write(buf, sample_rate, wav_np)
    buf.seek(0)
    b64 = base64.b64encode(buf.read()).decode("utf-8")
    return b64

from typing import List, Dict
from google.colab import files
import torch

# ---------- Reference index builder ----------
def build_reference_index_from_list(
    intents,
    samples_per_intent: int = 3,
    augment: bool = True,
    augment_factor: int = 3
) -> Dict[str, Dict[str, List[torch.Tensor]]]:
    """
    Builds reference index with original + augmented embeddings.
    Processes each file individually to minimize memory footprint.
    """
    reference_index: Dict[str, Dict[str, List[torch.Tensor]]] = {}

    for intent in intents:
        print(f"\nPlease upload {samples_per_intent} audio samples for intent: '{intent}'")
        uploaded = files.upload()

        emb_list: List[torch.Tensor] = []
        individual_list = []
        original_base64_list = []

        for fname in uploaded.keys():
            wav = load_and_preprocess(fname, remove_silence=False)  # should return mono or stereo tensor
            wav_mono = _ensure_mono(wav)  # ensure mono before embedding

            # Original embedding
            emb = audio_to_embedding(wav_mono.detach(), l2_normalize=True)
            emb_base64 = audio_to_base64(wav_mono.detach(), sample_rate=TARGET_SR)
            individual_list.append({"embedding": emb, "audio_base64": emb_base64})
            original_base64_list.append(emb_base64)
            del wav, emb
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

            # Augmentation
            if augment:
                print(f"Creating variations for '{fname}'...")
                variations = create_audio_variations(
                    wav_mono,
                    sample_rate=TARGET_SR,
                    num_variations=augment_factor,
                    max_chunk_seconds=1.0
                )
                for i, var_wav in enumerate(variations, start=1):
                    aug_emb = audio_to_embedding(var_wav.detach(), l2_normalize=True)
                    aug_base64 = audio_to_base64(var_wav.detach(), sample_rate=TARGET_SR)
                    individual_list.append({"embedding": aug_emb, "audio_base64": aug_base64})
                    del var_wav, aug_emb
                    gc.collect()
                    if torch.cuda.is_available():
                        torch.cuda.empty_cache()
                    print(f"Processed variation {i}.")

        # Average embedding from originals
        avg_emb = average_embeddings(
            [d["embedding"] for d in individual_list[:samples_per_intent]],
            l2_normalize=True
        )

        reference_index[intent] = {
            "individual": individual_list,
            "average": {"embedding": avg_emb, "audio_base64_list": original_base64_list}
        }

        print(f"Intent '{intent}': {len(individual_list)} embeddings stored.")

    print("\n✅ Reference index ready!")
    return reference_index

# Call it to start collecting audio samples

# Predefined intents/phrases (expand this to 40–50)
COMMON_INTENTS = [
    "Thank you",
    "I need help",
    "I need Water"
    # … add all the rest
]


# qdrant Part

from qdrant_client import QdrantClient
from qdrant_client.http import models
import torch

def save_reference_index_to_qdrant(
    reference_index: dict,
    username: str,
    collection_name: str = "speech_intents_for_attention_polling",
    qdrant_url: str = "https://7d3e9db7-bab2-4f55-952c-fde18ffb7d98.eu-west-1-0.aws.cloud.qdrant.io",
    api_key: str = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2Nlc3MiOiJtIn0.vWJNPK2iJIZoYAsBuuqD03ZvkxhKAhaDaB9UujOPv3s"
):
    """
    Saves both individual and average embeddings for each intent to Qdrant.
    """

    client = QdrantClient(url=qdrant_url, api_key=api_key)

    # Ensure collection exists
    first_intent = next(iter(reference_index.values()))
    first_vector = first_intent["average"]["embedding"]
    vector_size = first_vector.shape[0]

    if not client.collection_exists(collection_name):
        client.create_collection(
            collection_name=collection_name,
            vectors_config=models.VectorParams(size=vector_size, distance=models.Distance.COSINE),
        )

    payloads = []
    vectors = []

    for intent, data in reference_index.items():
        # Save individual vectors
        for item in data["individual"]:
            vec = item["embedding"]
            audio_b64 = item["audio_base64"]
            vector_list = vec.detach().cpu().tolist()
            payloads.append({"username": username, "intent": intent, "type": "individual","audio_b64": None})
            vectors.append(vector_list)

        # Save average vector

        avg_vec = data["average"]["embedding"]
        avg_vec_list = avg_vec.detach().cpu().tolist()
        payloads.append({"username": username, "intent": intent, "type": "average","audio_b64": None})
        vectors.append(avg_vec_list)

    # Insert into Qdrant
    client.upload_collection(
        collection_name=collection_name,
        payload=payloads,
        vectors=vectors,
    )

    print(f"✅ Saved {len(vectors)} vectors for user '{username}' into Qdrant.")


"""You only need to run this once for the collection."""

# You only need to run this once for the collection.
# This tells Qdrant to treat username as a keyword and allows filtering on it efficiently.

from qdrant_client import QdrantClient
from qdrant_client.http import models

from qdrant_client.http import models as rest

qdrant_url = "https://7d3e9db7-bab2-4f55-952c-fde18ffb7d98.eu-west-1-0.aws.cloud.qdrant.io"
api_key = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2Nlc3MiOiJtIn0.vWJNPK2iJIZoYAsBuuqD03ZvkxhKAhaDaB9UujOPv3s"

client = QdrantClient(url=qdrant_url, api_key=api_key)

# Create index for 'username' as a keyword
client.create_payload_index(
    collection_name="speech_intents_for_attention_polling",
    field_name="username",
    field_schema=rest.PayloadSchemaType.KEYWORD
)

# Create index for 'type' as a keyword
client.create_payload_index(
    collection_name="speech_intents_for_attention_polling",
    field_name="type",
    field_schema=rest.PayloadSchemaType.KEYWORD
)

"""Fetch Vector Data From Qdrant for given user"""

def classify_audio(embedding, username, top_k=5, confidence_threshold=0.6, avg_weight=1.5):

    qdrant_url: str = "https://7d3e9db7-bab2-4f55-952c-fde18ffb7d98.eu-west-1-0.aws.cloud.qdrant.io"
    api_key: str = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2Nlc3MiOiJtIn0.vWJNPK2iJIZoYAsBuuqD03ZvkxhKAhaDaB9UujOPv3s"
    client = QdrantClient(url=qdrant_url, api_key=api_key)

    # Query individual vectors first
    search_results = client.query_points(
        collection_name="speech_intents_for_attention_polling",
        query=embedding.detach().cpu().tolist(),
        limit=top_k,
        query_filter=models.Filter(
            must=[
                models.FieldCondition(key="username", match=models.MatchValue(value=username)),
                models.FieldCondition(key="type", match=models.MatchValue(value="individual"))
            ]
        )
    ).points

    if not search_results:
        return None, 0.0

    # Weighted voting for individual vectors
    intent_scores = {}
    for hit in search_results:
        lbl = hit.payload["intent"]
        score = hit.score
        intent_scores[lbl] = intent_scores.get(lbl, 0) + score
        scores = [hit.score for hit in search_results]  # cosine similarity score

    predicted_intent = max(intent_scores, key=intent_scores.get)
    confidence = intent_scores[predicted_intent] /  sum(intent_scores.values())  # sum(scores)

    # Fallback to average vectors if confidence is low
    if confidence < confidence_threshold:
        avg_results = client.query_points(
            collection_name="speech_intents_for_attention_polling",
            query=embedding.detach().cpu().tolist(),
            limit=top_k,
            query_filter=models.Filter(
                must=[
                    models.FieldCondition(key="username", match=models.MatchValue(value=username)),
                    models.FieldCondition(key="type", match=models.MatchValue(value="average"))
                ]
            )
        ).points

        if avg_results:
            intent_scores = {}
            for hit in avg_results:
                lbl = hit.payload["intent"]
                score = hit.score * avg_weight
                intent_scores[lbl] = intent_scores.get(lbl, 0) + score

            predicted_intent = max(intent_scores, key=intent_scores.get)
            confidence = intent_scores[predicted_intent] / sum([h.score for h in avg_results])

    return predicted_intent, confidence



#------------------------------------------------------------------------------------------------------------------------------------------------




create reference_index

In [8]:

#create reference_index

reference_index = build_reference_index_from_list(
    intents=COMMON_INTENTS,
    samples_per_intent=3  # or 2, depending on your choice
)


# Loop over each item in the reference_index dictionary
for intent, ref in reference_index.items():
    print(f"Intent: {intent}")

    # Average embedding info
    avg_emb = ref['average']['embedding']
    avg_audio_list = ref['average']['audio_base64_list']
    print(f"Average embedding shape: {avg_emb.shape}")
    print(f"Average embedding as list: {avg_emb.tolist()}")
    print(f"Number of original audio Base64 clips: {len(avg_audio_list)}")

    # Individual embeddings info
    for i, item in enumerate(ref['individual'], start=1):
        emb = item['embedding']
        audio_base64 = item['audio_base64']
        # print(f"Individual #{i} embedding shape: {emb.shape}")
        print(f"Individual #{i} embedding as list: {emb.tolist()}")
        print(f"Individual #{i} audio Base64 length: {len(audio_base64)}")

    print("-" * 40)  # Separator for clarity


Please upload 3 audio samples for intent: 'Thank you'


Saving Thank you 1.mp3 to Thank you 1 (1).mp3
Creating variations for 'Thank you 1 (1).mp3'...
  Pitch: -0.43 semitones


KeyboardInterrupt: 

save to Qdrant

In [None]:
# Step 2: save to Qdrant
save_reference_index_to_qdrant(reference_index, username="dimple")

Classify a new audio clip

In [9]:
print("\n=== Classify a new audio clip ===")
print("Please upload the audio file you want to classify…")

uploaded = files.upload()  # User uploads file
fname = next(iter(uploaded.keys()))

# Convert to embedding
wav = load_and_preprocess(fname, remove_silence=False)
emb = audio_to_embedding(wav, l2_normalize=True)


# Classify via Qdrant
predicted, conf = classify_audio(emb, username="dimple")

if predicted:
    print(f"\n✅ Predicted Intent: {predicted}  |  Confidence: {conf:.2f}")
else:
    print(f"\n⚠️ No confident match found. Confidence: {conf:.2f}")


=== Classify a new audio clip ===
Please upload the audio file you want to classify…


Saving I need Help 1.mp3 to I need Help 1.mp3

✅ Predicted Intent: I need help  |  Confidence: 0.60
