<a href="https://colab.research.google.com/github/Krishnenduanitha/Cuisine-recommender/blob/main/iiith_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# =============================
# Load + Prep + Robust Split
# =============================


import re, random, numpy as np
from typing import List, Optional
from collections import Counter, defaultdict
from datasets import load_dataset, Audio, Dataset, DatasetDict

# ---- Repro & lightweight config (tune later) ----
# One seed feeds Python's `random`, NumPy's global RNG and the split helpers below.
SEED = 42
random.seed(SEED); np.random.seed(SEED)

# Target sampling rate used when the audio column is cast, and by the feature extractors.
SAMPLE_RATE = 16_000
MIN_DURATION_S = 0.25     # keep short clips
MAX_DURATION_S = 8.0      # cap long clips (saves RAM/time)

# Word/sentence fallback heuristics (used only if no level field)
# Durations strictly between the two thresholds are reported as "unknown".
WORD_MAX_S = 1.5
SENT_MIN_S = 1.8

# How many of the most frequent labels to keep, and the per-label budget for each split.
TOP_K_LABELS = 4
SAMPLES_PER_CLASS_TRAIN = 6
SAMPLES_PER_CLASS_VAL   = 2
SAMPLES_PER_CLASS_TEST  = 2

# NOTE(review): in the recorded run the canonical labels are digit strings ('0'..'5')
# because the label column is a ClassLabel; language names given here would then match
# no rows — confirm before setting this.
TARGET_LABELS = None  # or set like ["hindi", "tamil", "telugu", "malayalam"]

# ----------------- Helpers -----------------
def autodetect_field(ds: Dataset, candidates: List[str]) -> Optional[str]:
    """Pick the dataset column that corresponds to one of ``candidates``.

    Every candidate is first compared exactly against ``ds.column_names``
    (in candidate order). Only when no exact match exists is a second,
    case-insensitive pass made. The result is always spelled the way the
    column is spelled in the dataset; ``None`` means nothing matched.
    """
    available = set(ds.column_names)
    exact_hits = [cand for cand in candidates if cand in available]
    if exact_hits:
        return exact_hits[0]

    # Case-insensitive pass: index the columns by their lowercase spelling.
    # When two columns differ only by case, the later one is kept.
    by_lower = {}
    for column in ds.column_names:
        by_lower[column.lower()] = column

    for cand in candidates:
        folded = cand.lower()
        if folded in by_lower:
            return by_lower[folded]
    return None

def unify_label(x):
    """Canonicalise a raw label; the result is a non-empty string or ``None``.

    * ``None`` stays ``None``.
    * For a list/tuple the first truthy element is used (``None`` if there is none).
    * The value is stringified, stripped and lower-cased; blank text gives ``None``.
    * Everything except ``a-z`` is removed; if that leaves nothing (for example a
      purely numeric label) the stripped lowercase text is kept instead.
    * Known abbreviations and alternative spellings are mapped to one canonical name;
      anything else is returned as normalised.
    """
    if isinstance(x, (list, tuple)):
        x = next((item for item in x if item), None)
    if x is None:
        return None

    text = str(x).strip().lower()
    if not text:
        return None

    letters_only = re.sub(r'[^a-z]+', '', text)  # drop digits, spaces, punctuation
    key = letters_only or text                   # keep the raw text if nothing survived

    aliases = {
        "hin": "hindi",
        "hindi": "hindi",
        "tam": "tamil",
        "tamil": "tamil",
        "tel": "telugu",
        "telugu": "telugu",
        "mal": "malayalam",
        "ml": "malayalam",
        "malayalam": "malayalam",
        "kan": "kannada",
        "kannada": "kannada",
        "pun": "punjabi",
        "punjabi": "punjabi",
        "ben": "bengali",
        "bengali": "bengali",
        "mar": "marathi",
        "marathi": "marathi",
        "guj": "gujarati",
        "gujarati": "gujarati",
        "oriya": "odia",
        "odia": "odia",
        "urdu": "urdu",
    }
    if key in aliases:
        return aliases[key]
    return key

def guess_age_group(val):
    """Map a free-form age value to ``"child"``, ``"adult"`` or ``None``.

    The value is stringified and lower-cased, then checked in this order:

    1. child keywords (substring match) -> ``"child"``
    2. adult keywords (substring match) -> ``"adult"``
    3. the first run of digits, read as an age: below 18 -> ``"child"``,
       otherwise ``"adult"``

    Returns ``None`` for ``None`` input or when nothing above applies.
    """
    if val is None:
        return None
    s = str(val).lower()
    if any(k in s for k in ("child", "kids", "kid", "teen", "minor", "u16", "u18")):
        return "child"
    if any(k in s for k in ("adult", "adlt", "grown", "senior")):
        return "adult"

    match = re.search(r'\d+', s)
    if match is None:
        return None
    try:
        age = int(match.group())
    except ValueError:
        # Only a conversion failure is treated as "no usable age"; anything else
        # (including KeyboardInterrupt) must propagate instead of being swallowed.
        return None
    return "child" if age < 18 else "adult"

def guess_level_from_row(dur_s: float, level_value):
    """Classify a clip as ``"word"``, ``"sentence"`` or ``"unknown"``.

    An explicit ``level_value`` wins when its lowercase text contains ``"word"``
    (-> word) or ``"sent"`` / ``"phrase"`` (-> sentence). Otherwise the clip
    duration is compared against ``WORD_MAX_S`` and ``SENT_MIN_S``; durations
    between the two thresholds are ``"unknown"``.
    """
    if level_value is not None:
        text = str(level_value).lower()
        if "word" in text:
            return "word"
        if any(marker in text for marker in ("sent", "phrase")):
            return "sentence"

    # No usable explicit level: fall back to the duration heuristic.
    if dur_s <= WORD_MAX_S:
        return "word"
    return "sentence" if dur_s >= SENT_MIN_S else "unknown"

def label_counts(dataset, label_col="label_canon"):
    """Print a frequency table of ``dataset[label_col]`` and return it.

    At most 30 rows are printed, ordered by descending count and then by label.
    The full ``Counter`` is returned regardless of how many rows were printed.
    """
    tally = Counter(dataset[label_col])
    print("Label counts (post-filter):")
    ranked = sorted(tally.items(), key=lambda item: (-item[1], item[0]))
    for label, count in ranked[:30]:
        print(f"  {label:>12s} : {count}")
    return tally

def try_build_split(dataset, label_col, per_train, per_val, per_test, seed):
    """Draw a small per-label train/validation/test split from ``dataset``.

    Row indices are grouped by their ``label_col`` value and each group is shuffled
    with a NumPy generator seeded by ``seed``. From every group, up to ``per_train``
    rows go to train, then up to ``per_val`` of the remainder to validation, then
    up to ``per_test`` of what is left to test, so the three parts never share a row.

    Returns a ``DatasetDict`` with keys ``"train"``, ``"validation"`` and ``"test"``,
    each additionally shuffled with ``seed``.
    """
    rows_by_label = defaultdict(list)
    for row_idx, label in enumerate(dataset[label_col]):
        rows_by_label[label].append(row_idx)

    rng = np.random.default_rng(seed)
    for rows in rows_by_label.values():
        rng.shuffle(rows)

    picked = {"train": [], "validation": [], "test": []}
    for rows in rows_by_label.values():
        available = len(rows)
        n_train = min(per_train, available)
        n_val = min(per_val, max(0, available - n_train))
        n_test = min(per_test, max(0, available - n_train - n_val))
        val_end = n_train + n_val
        picked["train"].extend(rows[:n_train])
        picked["validation"].extend(rows[n_train:val_end])
        picked["test"].extend(rows[val_end:val_end + n_test])

    return DatasetDict({
        part: dataset.select(indices).shuffle(seed=seed)
        for part, indices in picked.items()
    })

def ensure_multiclass_split(ds_filtered, label_col, want_tr, want_va, want_te, max_tries=40):
    """Search for a split whose train and validation parts both hold >= 2 labels.

    Per-label sizes start at the requested values and are progressively relaxed
    (test fastest, then validation, then train). For each size combination up to
    ``max_tries`` differently seeded attempts (``SEED``, ``SEED + 1``, ...) are made
    through ``try_build_split``. The first acceptable split is returned.

    Raises:
        RuntimeError: if no combination produces a multiclass train and validation set.
    """
    train_sizes = range(max(want_tr, 1), 0, -1)
    val_sizes = range(max(want_va, 1), 0, -1)
    test_sizes = range(max(want_te, 0), -1, -1)

    for per_tr in train_sizes:
        for per_va in val_sizes:
            for per_te in test_sizes:
                for attempt in range(max_tries):
                    seed = SEED + attempt
                    candidate = try_build_split(
                        ds_filtered.shuffle(seed=seed), label_col, per_tr, per_va, per_te, seed
                    )
                    train_labels = set(candidate["train"][label_col])
                    val_labels = set(candidate["validation"][label_col])
                    if len(train_labels) < 2 or len(val_labels) < 2:
                        continue
                    print(f"[split] success: train={per_tr}, val={per_va}, test={per_te} (try {attempt+1})")
                    return candidate
    raise RuntimeError("Could not build a multiclass split. Loosen filters or include more labels.")

# ----------------- Load + Autodetect -----------------
# Load the single "train" split of the hub dataset; the notebook builds its own
# train/validation/test partition further down.
print("Loading IndicAccentDb‚Ä¶")
ds = load_dataset("DarshanaS/IndicAccentDb", split="train")
print(ds)

# Candidate column names are tried in order; audio and label are mandatory,
# age and level are optional (None when absent).
audio_field = autodetect_field(ds, ["audio","path","file","audio_path","wav","waveform"])
label_field = autodetect_field(ds, ["l1","native_language","label","accent","language"])
age_field   = autodetect_field(ds, ["age_group","age","speaker_age","speaker_age_group","agegroup"])
level_field = autodetect_field(ds, ["level","unit","linguistic_level","linguistic_unit","type"])

if audio_field is None:
    raise ValueError(f"Could not find an audio field. Columns: {ds.column_names}")
if label_field is None:
    raise ValueError(f"Could not detect label field. Columns: {ds.column_names}")

print("Detected fields:")
print(" audio_field:", audio_field)
print(" label_field:", label_field)
print(" age_field  :", age_field)
print(" level_field:", level_field)

# Cast audio to unified feature
# (decoded audio is delivered at SAMPLE_RATE from here on).
ds = ds.cast_column(audio_field, Audio(sampling_rate=SAMPLE_RATE))
print(ds.features)

# ----------------- Prep rows -----------------
def _prep(ex):
    """Attach derived columns to a single dataset row and return it.

    Added keys:
      * ``duration_s``      - clip length in seconds, or 0.0 when the audio entry is
                              not a dict holding a NumPy ``array`` and a truthy
                              ``sampling_rate``
      * ``label_canon``     - ``unify_label`` applied to the detected label column
      * ``age_group_guess`` - ``guess_age_group`` result, ``None`` without an age column
      * ``level_guess``     - ``guess_level_from_row`` result for this clip
    """
    audio = ex[audio_field]
    decodable = (
        isinstance(audio, dict)
        and isinstance(audio.get("array", None), np.ndarray)
        and audio.get("sampling_rate")
    )
    if decodable:
        seconds = audio["array"].shape[-1] / float(audio["sampling_rate"])
    else:
        seconds = 0.0

    canon_label = unify_label(ex.get(label_field))
    age_bucket = guess_age_group(ex.get(age_field)) if age_field else None
    raw_level = ex.get(level_field) if level_field else None
    level = guess_level_from_row(seconds, raw_level)

    ex["duration_s"] = float(seconds)
    ex["label_canon"] = canon_label
    ex["age_group_guess"] = age_bucket
    ex["level_guess"] = level
    return ex

# Add duration_s / label_canon / age_group_guess / level_guess to every row.
ds = ds.map(_prep, remove_columns=[], num_proc=2)
print("Rows after prep:", len(ds))

# Duration & label filters
# Rows whose duration could not be computed carry 0.0 and are dropped here as well.
ds = ds.filter(lambda x: x["duration_s"] is not None and MIN_DURATION_S <= x["duration_s"] <= MAX_DURATION_S, num_proc=2)
print("Rows after duration filter:", len(ds))

ds = ds.filter(lambda x: isinstance(x["label_canon"], str) and x["label_canon"] != "", num_proc=2)
print("Rows after non-empty label filter:", len(ds))

# ----------------- Label selection -----------------
cnt = label_counts(ds, "label_canon")

if TARGET_LABELS is None:
    # Most frequent labels first.
    labels_sorted = [k for k,_ in sorted(cnt.items(), key=lambda kv: -kv[1])]
    # pick top-K that have at least 2 total examples
    label_list = [l for l in labels_sorted if cnt[l] >= 2][:max(TOP_K_LABELS, 2)]
else:
    # User-supplied labels go through the same normalisation as the data.
    label_list = [unify_label(z) for z in TARGET_LABELS if z]

# final fallback: ensure >= 2 labels
if len(label_list) < 2:
    label_list = [k for k,_ in Counter(ds["label_canon"]).most_common(2)]
    if len(label_list) < 2:
        raise ValueError("Need at least 2 distinct labels after filtering. "
                         "Loosen MIN/MAX duration or verify detected label_field.")

print("Using labels:", label_list)

# Keep only rows belonging to the selected labels.
label_set = set(label_list)
ds = ds.filter(lambda x: x["label_canon"] in label_set, num_proc=2)
print("Rows after label filter:", len(ds))

# ----------------- Build robust small split -----------------
# `small` is the DatasetDict (train/validation/test) used by all later cells.
small = ensure_multiclass_split(
    ds.shuffle(seed=SEED),
    "label_canon",
    SAMPLES_PER_CLASS_TRAIN,
    SAMPLES_PER_CLASS_VAL,
    SAMPLES_PER_CLASS_TEST
)

# Report size and label coverage of each part.
for k in small:
    labs = set(small[k]["label_canon"]) if len(small[k]) else set()
    print(f"{k}: {len(small[k])} rows, {len(labs)} unique labels -> {sorted(list(labs))}")


Loading IndicAccentDb‚Ä¶


Downloading readme: 0.00B [00:00, ?B/s]

Downloading data:   0%|          | 0.00/3.20G [00:00<?, ?B/s]

Generating train split:   0%|          | 0/8116 [00:00<?, ? examples/s]

Dataset({
    features: ['audio', 'label'],
    num_rows: 8116
})
Detected fields:
 audio_field: audio
 label_field: label
 age_field  : None
 level_field: None
{'audio': Audio(sampling_rate=16000, mono=True, decode=True, id=None), 'label': ClassLabel(names=['andhra_pradesh', 'gujrat', 'jharkhand', 'karnataka', 'kerala', 'tamil'], id=None)}


Map (num_proc=2):   0%|          | 0/8116 [00:00<?, ? examples/s]

Rows after prep: 8116


Filter (num_proc=2):   0%|          | 0/8116 [00:00<?, ? examples/s]

Rows after duration filter: 7975


Filter (num_proc=2):   0%|          | 0/7975 [00:00<?, ? examples/s]

Rows after non-empty label filter: 7975
Label counts (post-filter):
             5 : 1812
             0 : 1766
             4 : 1639
             3 : 1636
             2 : 824
             1 : 298
Using labels: ['5', '0', '4', '3']


Filter (num_proc=2):   0%|          | 0/7975 [00:00<?, ? examples/s]

Rows after label filter: 6853
[split] success: train=6, val=2, test=2 (try 1)
train: 24 rows, 4 unique labels -> ['0', '3', '4', '5']
validation: 8 rows, 4 unique labels -> ['0', '3', '4', '5']
test: 8 rows, 4 unique labels -> ['0', '3', '4', '5']


In [None]:
# --- Label mapping helpers (pretty names) ---
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import numpy as np
import torch, torchaudio
import matplotlib.pyplot as plt

# Grab original ClassLabel names from the dataset schema
# (None when there is no "label" feature or that feature exposes no `.names`).
# NOTE(review): the column name "label" is hard-coded here rather than taken from
# the autodetected label_field — confirm they always coincide.
orig_names = ds.features["label"].names if "label" in ds.features and hasattr(ds.features["label"], "names") else None

def pretty_label(s: str) -> str:
    """Translate a digit-string label into its ClassLabel name when possible.

    Labels look like '0', '3', '4', '5'. If ``orig_names`` is available and ``s`` is
    an in-range index into it, the corresponding name is returned; in every other
    case ``s`` is returned unchanged.
    """
    if orig_names is None or not s.isdigit():
        return s
    position = int(s)
    in_range = 0 <= position < len(orig_names)
    return orig_names[position] if in_range else s

# Use the labels selected in your previous cell
# (taken from the train part and sorted, so class ids follow sorted label order).
label_list = sorted(list(set(small["train"]["label_canon"])))
label_to_id = {l:i for i,l in enumerate(label_list)}   # canonical label -> class id
id_to_label = {i:l for l,i in label_to_id.items()}     # class id -> canonical label
pretty_names = [pretty_label(l) for l in label_list]   # display names, same order as class ids

print("Model classes:", label_list, "->", pretty_names)


Model classes: ['0', '3', '4', '5'] -> ['andhra_pradesh', 'karnataka', 'kerala', 'tamil']


In [None]:
# ------- MFCC config -------
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
N_MFCC = 40        # number of cepstral coefficients per frame
MFCC_WIN_MS = 25   # analysis window length in milliseconds
MFCC_HOP_MS = 10   # hop between frames in milliseconds

# Window/hop are converted from milliseconds to samples at SAMPLE_RATE.
mfcc = torchaudio.transforms.MFCC(
    sample_rate=SAMPLE_RATE,
    n_mfcc=N_MFCC,
    melkwargs={
        "n_fft": int(SAMPLE_RATE*MFCC_WIN_MS/1000),
        "hop_length": int(SAMPLE_RATE*MFCC_HOP_MS/1000),
        "n_mels": 64
    }
).to(DEVICE)

def audio_to_tensor(wav_np):
    """Convert a NumPy waveform to a float32 tensor.

    Input with more than one dimension is averaged over its first axis; a 1-D
    input is returned as is (after the dtype conversion).
    """
    tensor = torch.from_numpy(wav_np).float()
    return tensor.mean(dim=0) if tensor.ndim > 1 else tensor

@torch.no_grad()
def mfcc_stats_from_example(ex):
    """Return ``(features, class_id)`` for one dataset row.

    The waveform from ``ex["audio"]["array"]`` is truncated to ``MAX_DURATION_S``
    seconds, passed through the module-level ``mfcc`` transform, and summarised by
    concatenating the per-coefficient mean and standard deviation over time into a
    single NumPy vector. The class id comes from ``label_to_id``.
    """
    signal = audio_to_tensor(ex["audio"]["array"]).to(DEVICE)

    # Cap the length so long clips do not dominate run time.
    cap = int(SAMPLE_RATE*MAX_DURATION_S)
    if signal.numel() > cap:
        signal = signal[:cap]

    coeffs = mfcc(signal.unsqueeze(0)).squeeze(0)   # (n_mfcc, T)
    mean_vec = coeffs.mean(dim=1)
    std_vec = coeffs.std(dim=1)
    feat = torch.cat([mean_vec, std_vec], dim=0).cpu().numpy()  # (2*n_mfcc,)
    return feat, label_to_id[ex["label_canon"]]

def build_mfcc_matrix(dataset_split):
    """Stack MFCC summary vectors for a whole split.

    Returns ``(X, y)``: ``X`` has one row per example (features from
    ``mfcc_stats_from_example``), ``y`` is the matching array of class ids.
    """
    pairs = [mfcc_stats_from_example(example) for example in dataset_split]
    features = [feat for feat, _ in pairs]
    targets = [lab for _, lab in pairs]
    return np.vstack(features), np.array(targets)

# Build matrices
from sklearn.preprocessing import StandardScaler
X_tr, y_tr = build_mfcc_matrix(small["train"])
X_va, y_va = build_mfcc_matrix(small["validation"])
X_te, y_te = build_mfcc_matrix(small["test"])

# The scaler is fitted on the training features only and then applied to all
# three parts, so validation/test statistics do not leak into the scaling.
sc_mfcc = StandardScaler().fit(X_tr)
X_tr_s = sc_mfcc.transform(X_tr)
X_va_s = sc_mfcc.transform(X_va)
X_te_s = sc_mfcc.transform(X_te)

# ----- Tiny MLP -----
class MLP(torch.nn.Module):
    """Small feed-forward classifier: d_in -> 256 -> 128 -> n_classes.

    Each hidden layer is followed by ReLU and Dropout(0.2); the output layer
    produces raw logits. All layers live in ``self.seq``.
    """

    def __init__(self, d_in, n_classes):
        super().__init__()
        nn = torch.nn
        widths = (d_in, 256, 128)
        layers = []
        for fan_in, fan_out in zip(widths, widths[1:]):
            layers += [nn.Linear(fan_in, fan_out), nn.ReLU(), nn.Dropout(0.2)]
        layers.append(nn.Linear(widths[-1], n_classes))
        self.seq = nn.Sequential(*layers)

    def forward(self, x):
        return self.seq(x)

def train_mlp(Xtr, ytr, Xval, yval, epochs=15, lr=2e-3, bs=32):
    """Train an ``MLP`` on pre-computed features and return the best checkpoint.

    Args:
        Xtr, ytr: training features (NumPy, one row per example) and integer class ids.
        Xval, yval: validation features and class ids, used only for model selection.
        epochs: number of passes over the training data.
        lr: AdamW learning rate (weight decay is fixed at 1e-3).
        bs: mini-batch size.

    Returns:
        The model with the weights from the first epoch that reached the highest
        validation accuracy. If no epoch recorded a best state (``epochs`` is 0, or
        the validation accuracy was never comparable, e.g. NaN for an empty
        validation set) the model is returned with its current weights instead of
        failing inside ``load_state_dict``.

    The number of output classes is taken from the module-level ``label_list``.
    """
    model = MLP(Xtr.shape[1], len(label_list)).to(DEVICE)
    opt = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-3)
    crit = torch.nn.CrossEntropyLoss()

    def to_t(x): return torch.from_numpy(x).float().to(DEVICE)
    def to_y(y): return torch.from_numpy(y).long().to(DEVICE)

    best_state, best_va = None, -1
    for ep in range(1, epochs+1):
        model.train()
        # Fresh sample order every epoch (uses NumPy's global RNG).
        idx = np.arange(len(Xtr)); np.random.shuffle(idx)
        for i in range(0, len(idx), bs):
            sel = idx[i:i+bs]
            xb, yb = to_t(Xtr[sel]), to_y(ytr[sel])
            opt.zero_grad()
            loss = crit(model(xb), yb)
            loss.backward()
            # Clip the global gradient norm to 1.0 before the optimizer step.
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            opt.step()
        # val
        model.eval()
        with torch.no_grad():
            pv = model(to_t(Xval)).argmax(1).cpu().numpy()
            acc = (pv == yval).mean()
        print(f"[MFCC-MLP] Epoch {ep:02d}  val_acc={acc:.3f}")
        if acc > best_va:
            best_va = acc
            # Snapshot on CPU so later epochs cannot mutate the saved tensors.
            best_state = {k:v.detach().cpu().clone() for k,v in model.state_dict().items()}
    if best_state is not None:
        model.load_state_dict(best_state)
    return model

# Train on the standardised MFCC features; validation accuracy picks the checkpoint.
mfcc_model = train_mlp(X_tr_s, y_tr, X_va_s, y_va, epochs=12, lr=2e-3, bs=32)

# Evaluate
mfcc_model.eval()
with torch.no_grad():
    # Predicted class id = index of the largest logit.
    yhat = mfcc_model(torch.from_numpy(X_te_s).float().to(DEVICE)).argmax(1).cpu().numpy()

print("\n[MFCC] Test accuracy:", accuracy_score(y_te, yhat))
print(classification_report(y_te, yhat, target_names=pretty_names))

# Optional: confusion matrix
cm = confusion_matrix(y_te, yhat)
print("Confusion matrix:\n", cm)


[MFCC-MLP] Epoch 01  val_acc=0.375
[MFCC-MLP] Epoch 02  val_acc=0.625
[MFCC-MLP] Epoch 03  val_acc=0.750
[MFCC-MLP] Epoch 04  val_acc=0.875
[MFCC-MLP] Epoch 05  val_acc=0.875
[MFCC-MLP] Epoch 06  val_acc=0.875
[MFCC-MLP] Epoch 07  val_acc=0.875
[MFCC-MLP] Epoch 08  val_acc=1.000
[MFCC-MLP] Epoch 09  val_acc=1.000
[MFCC-MLP] Epoch 10  val_acc=1.000
[MFCC-MLP] Epoch 11  val_acc=1.000
[MFCC-MLP] Epoch 12  val_acc=1.000

[MFCC] Test accuracy: 0.5
                precision    recall  f1-score   support

andhra_pradesh       0.50      0.50      0.50         2
     karnataka       0.50      0.50      0.50         2
        kerala       0.67      1.00      0.80         2
         tamil       0.00      0.00      0.00         2

      accuracy                           0.50         8
     macro avg       0.42      0.50      0.45         8
  weighted avg       0.42      0.50      0.45         8

Confusion matrix:
 [[1 0 0 1]
 [1 1 0 0]
 [0 0 2 0]
 [0 1 1 0]]


In [None]:
from transformers import AutoFeatureExtractor, AutoModel
from sklearn.linear_model import LogisticRegression

# Pretrained checkpoint used as a frozen feature extractor.
HUBERT_MODEL = "facebook/hubert-base-ls960"
feature_extractor = AutoFeatureExtractor.from_pretrained(HUBERT_MODEL)
# output_hidden_states=True makes the forward pass return every layer's activations.
hubert = AutoModel.from_pretrained(HUBERT_MODEL, output_hidden_states=True).to(DEVICE)
hubert.eval()

@torch.no_grad()
def hubert_layer_means(ex):
    """Return ``(E, class_id)`` for one example.

    The waveform is truncated to ``MAX_DURATION_S`` seconds, run through the
    module-level ``feature_extractor`` and ``hubert`` model, and every entry of
    ``hidden_states`` is averaged over its time axis. ``E`` stacks those per-layer
    mean vectors as a NumPy array of shape (layers, hidden_dim). The class id is
    looked up in ``label_to_id`` from ``ex["label_canon"]``.
    """
    waveform = audio_to_tensor(ex["audio"]["array"])
    limit = int(SAMPLE_RATE*MAX_DURATION_S)
    if waveform.numel() > limit:
        waveform = waveform[:limit]

    batch = feature_extractor(waveform.numpy(), sampling_rate=SAMPLE_RATE, return_tensors="pt")
    batch = {name: tensor.to(DEVICE) for name, tensor in batch.items()}

    # hidden_states: one (B, T, D) tensor per layer
    hidden = hubert(**batch).hidden_states
    per_layer = [h.mean(dim=1).squeeze(0).detach().cpu().numpy() for h in hidden]  # each (D,)
    E = np.stack(per_layer, axis=0)  # (L, D)
    return E, label_to_id[ex["label_canon"]]

def build_hubert_cube(dataset_split, bs=2):
    """Compute per-layer HuBERT embeddings for every example of a split.

    Args:
        dataset_split: iterable of examples accepted by ``hubert_layer_means``.
        bs: kept for backward compatibility and ignored. Examples have always been
            embedded one at a time; the previous buffering by ``bs`` only delayed
            that per-example call and never formed a real batch.

    Returns:
        ``(X, y)`` where ``X`` stacks the per-example (layers, hidden_dim) arrays
        along a new first axis and ``y`` is the array of class ids, both in the
        iteration order of ``dataset_split``.
    """
    X_list, y_list = [], []
    for ex in dataset_split:
        E, y = hubert_layer_means(ex)
        X_list.append(E)
        y_list.append(y)
    return np.stack(X_list, axis=0), np.array(y_list)

# Build features (keep batch small to avoid OOM)
# NOTE: y_tr / y_va / y_te are reassigned here, replacing the arrays from the MFCC cell.
X_tr_h, y_tr = build_hubert_cube(small["train"], bs=2)
X_va_h, y_va = build_hubert_cube(small["validation"], bs=2)
X_te_h, y_te = build_hubert_cube(small["test"], bs=2)
num_layers, hid_dim = X_tr_h.shape[1], X_tr_h.shape[2]
print(f"HuBERT: layers={num_layers}, dim={hid_dim}")

# Train a linear probe per layer; pick best by validation accuracy
from sklearn.preprocessing import StandardScaler
layer_accs, probes = [], []
for L in range(num_layers):
    # Features of layer L for every example.
    Xtr = X_tr_h[:, L, :]
    Xva = X_va_h[:, L, :]
    # Each layer gets its own scaler, fitted on training data only.
    sc = StandardScaler().fit(Xtr)
    clf = LogisticRegression(max_iter=300, n_jobs=1)
    clf.fit(sc.transform(Xtr), y_tr)
    pred = clf.predict(sc.transform(Xva))
    acc = accuracy_score(y_va, pred)
    layer_accs.append(acc)
    probes.append((clf, sc))
    print(f"Layer {L:02d}  val_acc={acc:.3f}")

# np.argmax returns the first maximum, so ties go to the lowest layer index.
best_layer = int(np.argmax(layer_accs))
clf_best, sc_best = probes[best_layer]
print(f"\n[HuBERT] Best layer: {best_layer}  val_acc={layer_accs[best_layer]:.3f}")

# Test
yhat = clf_best.predict(sc_best.transform(X_te_h[:, best_layer, :]))
print("[HuBERT] Test accuracy:", accuracy_score(y_te, yhat))
print(classification_report(y_te, yhat, target_names=pretty_names))
cm = confusion_matrix(y_te, yhat)
print("Confusion matrix:\n", cm)


preprocessor_config.json:   0%|          | 0.00/213 [00:00<?, ?B/s]

config.json: 0.00B [00:00, ?B/s]

pytorch_model.bin:   0%|          | 0.00/378M [00:00<?, ?B/s]

Some weights of the model checkpoint at facebook/hubert-base-ls960 were not used when initializing HubertModel: ['encoder.pos_conv_embed.conv.weight_g', 'encoder.pos_conv_embed.conv.weight_v']
- This IS expected if you are initializing HubertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing HubertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of HubertModel were not initialized from the model checkpoint at facebook/hubert-base-ls960 and are newly initialized: ['encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'encoder.pos_conv_embed.conv.parametrizations.weight.original1']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.

HuBERT: layers=13, dim=768
Layer 00  val_acc=0.625
Layer 01  val_acc=0.625
Layer 02  val_acc=0.875
Layer 03  val_acc=0.875
Layer 04  val_acc=1.000
Layer 05  val_acc=0.875
Layer 06  val_acc=0.875
Layer 07  val_acc=0.750
Layer 08  val_acc=0.625
Layer 09  val_acc=0.625
Layer 10  val_acc=0.625
Layer 11  val_acc=0.750
Layer 12  val_acc=0.875

[HuBERT] Best layer: 4  val_acc=1.000
[HuBERT] Test accuracy: 0.75
                precision    recall  f1-score   support

andhra_pradesh       0.67      1.00      0.80         2
     karnataka       0.67      1.00      0.80         2
        kerala       1.00      0.50      0.67         2
         tamil       1.00      0.50      0.67         2

      accuracy                           0.75         8
     macro avg       0.83      0.75      0.73         8
  weighted avg       0.83      0.75      0.73         8

Confusion matrix:
 [[2 0 0 0]
 [0 2 0 0]
 [0 1 1 0]
 [1 0 0 1]]


In [None]:
# Display-name accent (as produced by pretty_label) -> dishes to recommend.
# Accents missing from this table fall back to a generic suggestion at lookup time.
ACCENT_TO_FOOD = {
    "andhra_pradesh": ["Gongura pachadi", "Pesarattu", "Kodi pulao"],
    "karnataka": ["Bisi bele bath", "Neer dosa", "Ragi mudde"],
    "kerala": ["Appam", "Puttu", "Avial"],
    "tamil": ["Kothu parotta", "Sambar", "Idiyappam"],
    # add more if needed
}

def recommend_from_example(ex):
    """Predict the accent of one dataset example and look up matching dishes.

    Uses the best-layer HuBERT embedding with the selected scaler and probe.
    Returns ``(accent_name, dishes)``; accents without an entry in
    ``ACCENT_TO_FOOD`` get the generic fallback list.
    """
    layer_embeddings, _ = hubert_layer_means(ex)
    features = layer_embeddings[best_layer, :].reshape(1,-1)
    class_id = clf_best.predict(sc_best.transform(features))[0]
    accent_name = pretty_label(id_to_label[class_id])
    dishes = ACCENT_TO_FOOD.get(accent_name, ["Chef's special"])
    return accent_name, dishes

# Demo on one held-out example. The index is hard-coded, so the test part must
# contain at least 7 rows (it had 8 in the recorded run).
ex = small["test"][6]
accent, foods = recommend_from_example(ex)
print(f"Predicted accent: {accent}")
print("Recommended dishes:", ", ".join(foods))


Predicted accent: karnataka
Recommended dishes: Bisi bele bath, Neer dosa, Ragi mudde


In [None]:
# Upload audio if not already uploaded
from google.colab import files
import torchaudio

# Opens the Colab upload dialog; the result is keyed by file name.
uploaded = files.upload()  # select your .wav/.mp3
# Only the first uploaded file is used.
file_name = list(uploaded.keys())[0]
print("Loaded:", file_name)

# --- Show recommended foods for the predicted accent (same cell) ---
def load_and_resample(path, target_sr=16000):
    """Load an audio file as a mono waveform at ``target_sr``.

    The file is read with ``torchaudio.load``, averaged over its first (channel)
    axis, and resampled only when its native rate differs from ``target_sr``.
    """
    waveform, source_sr = torchaudio.load(path)
    mono = waveform.mean(dim=0)  # convert to mono
    if source_sr == target_sr:
        return mono
    return torchaudio.functional.resample(mono, source_sr, target_sr)

def predict_accent_from_wav(wav_tensor):
    """Predict the display-name accent for a waveform tensor.

    The tensor is wrapped in a dataset-style example so ``hubert_layer_means`` can
    be reused; that helper requires a label, so the first known label is supplied
    as a placeholder and the returned class id is discarded.
    """
    placeholder_example = {
        "audio": {"array": wav_tensor.cpu().numpy(), "sampling_rate": SAMPLE_RATE},
        "label_canon": label_list[0]
    }
    layer_embeddings, _ = hubert_layer_means(placeholder_example)
    features = layer_embeddings[best_layer, :].reshape(1,-1)
    class_id = clf_best.predict(sc_best.transform(features))[0]
    return pretty_label(id_to_label[class_id])

# Run the uploaded file through the same pipeline as the dataset examples.
wav = load_and_resample(file_name, SAMPLE_RATE)
accent = predict_accent_from_wav(wav)
print("Predicted Accent:", accent)

# ---------- FOOD SUGGESTIONS ----------
# Alternative spellings / abbreviations -> key used in ACCENT_TO_FOOD.
# "gujrat" is intentionally spelled the way the dataset's ClassLabel names spell it.
_alias = {
    "kannada": "karnataka",
    "gujarat": "gujrat",
    "andhra": "andhra_pradesh",
    "ap": "andhra_pradesh",
    "tn": "tamil",
    "kl": "kerala",
}

# Accent key -> dishes to recommend. This redefines the smaller table from the
# earlier cell and covers all six dataset classes.
ACCENT_TO_FOOD = {
    "andhra_pradesh": ["Gongura Pachadi", "Pesarattu", "Kodi Pulao"],
    "karnataka":      ["Bisi Bele Bath", "Neer Dosa", "Ragi Mudde"],
    "kerala":         ["Appam", "Puttu", "Avial"],
    "tamil":          ["Kothu Parotta", "Sambar", "Idiyappam"],
    "jharkhand":      ["Thekua", "Rugra", "Handia"],
    "gujrat":         ["Dhokla", "Undhiyu", "Thepla"],
}

def normalize_key(name: str) -> str:
    """Return the ``ACCENT_TO_FOOD`` lookup key for an accent name.

    The name is stripped and lower-cased; known aliases in ``_alias`` are replaced
    by their canonical key, everything else is returned as normalised.
    """
    cleaned = name.strip().lower()
    if cleaned in _alias:
        return _alias[cleaned]
    return cleaned

# Unknown accents get a single generic suggestion instead of raising KeyError.
foods = ACCENT_TO_FOOD.get(normalize_key(accent), ["Chef's Special ü§å"])

print(f"\nüçΩÔ∏è Recommended dishes for **{accent}**:")
# Numbered list starting at 1.
for i, dish in enumerate(foods, 1):
    print(f"{i}. {dish}")


Saving untitled.wav to untitled.wav
Loaded: untitled.wav
Predicted Accent: kerala

üçΩÔ∏è Recommended dishes for **kerala**:
1. Appam
2. Puttu
3. Avial


In [None]:
import json, joblib

# expects these to exist from your notebook:
# best_layer, clf_best, sc_best, label_list, orig_names, SAMPLE_RATE, MAX_DURATION_S

# Persist the selected probe and its scaler; app.py loads both by these file names.
joblib.dump(clf_best, "clf_best.joblib")
joblib.dump(sc_best,  "scaler_best.joblib")
# Everything else the app needs to reproduce the preprocessing goes into meta.json.
with open("meta.json","w") as f:
    json.dump({
        "best_layer": int(best_layer),
        "label_list": label_list,            # e.g. ['0','3','4','5']
        "orig_names": orig_names,            # e.g. ['andhra_pradesh','gujrat','jharkhand','karnataka','kerala','tamil']
        "SAMPLE_RATE": int(SAMPLE_RATE),
        "MAX_DURATION_S": float(MAX_DURATION_S)
    }, f)

print("Saved: clf_best.joblib, scaler_best.joblib, meta.json")


Saved: clf_best.joblib, scaler_best.joblib, meta.json


In [None]:
%%writefile app.py
import streamlit as st
import torch, torchaudio, joblib, json
from transformers import AutoFeatureExtractor, AutoModel

st.set_page_config(page_title="Accent ‚Üí Food", page_icon="üé§")

# Settings exported by the notebook alongside the trained probe.
with open("meta.json","r") as f:
    meta = json.load(f)
best_layer     = int(meta["best_layer"])       # index into the model's hidden_states
label_list     = meta["label_list"]            # class id -> canonical label
orig_names     = meta["orig_names"]            # ClassLabel names, may be null
SAMPLE_RATE    = int(meta["SAMPLE_RATE"])
MAX_DURATION_S = float(meta["MAX_DURATION_S"])

# Logistic-regression probe and the scaler it was trained with.
clf_best = joblib.load("clf_best.joblib")
sc_best  = joblib.load("scaler_best.joblib")

# NOTE(review): these loads sit at module level; if Streamlit re-executes the script
# on each interaction the model would be reloaded every time — consider caching.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
feature_extractor = AutoFeatureExtractor.from_pretrained("facebook/hubert-base-ls960")
hubert = AutoModel.from_pretrained("facebook/hubert-base-ls960", output_hidden_states=True).to(DEVICE)
hubert.eval()

def pretty_label(s: str) -> str:
    """Map a digit-string label to its name in ``orig_names`` when possible.

    ``s`` is returned unchanged when ``orig_names`` is empty/missing, when ``s`` is
    not purely digits, or when the index falls outside ``orig_names``.
    """
    if not orig_names or not s.isdigit():
        return s
    position = int(s)
    in_range = 0 <= position < len(orig_names)
    return orig_names[position] if in_range else s

# Alternative spellings / abbreviations -> key used in ACCENT_TO_FOOD.
_alias = {"kannada":"karnataka","gujarat":"gujrat","andhra":"andhra_pradesh","ap":"andhra_pradesh","tn":"tamil","kl":"kerala"}
# Accent key -> dishes shown to the user.
ACCENT_TO_FOOD = {
    "andhra_pradesh": ["Gongura Pachadi", "Pesarattu", "Kodi Pulao"],
    "karnataka":      ["Bisi Bele Bath", "Neer Dosa", "Ragi Mudde"],
    "kerala":         ["Appam", "Puttu", "Avial"],
    "tamil":          ["Kothu Parotta", "Sambar", "Idiyappam"],
    "jharkhand":      ["Thekua", "Rugra", "Handia"],
    "gujrat":         ["Dhokla", "Undhiyu", "Thepla"],
}
def normalize_key(name: str) -> str:
    """Return the ``ACCENT_TO_FOOD`` lookup key for an accent name.

    The name is stripped and lower-cased; aliases listed in ``_alias`` are replaced
    by their canonical key, anything else is returned as normalised.
    """
    cleaned = name.strip().lower()
    if cleaned in _alias:
        return _alias[cleaned]
    return cleaned

@torch.no_grad()
def predict(path):
    """Predict the display-name accent for the audio file at ``path``.

    Steps: load with torchaudio, average over the channel axis, resample to
    ``SAMPLE_RATE`` if needed, truncate to ``MAX_DURATION_S`` seconds, take the
    time-averaged hidden state of layer ``best_layer``, scale it and classify it
    with the saved probe.
    """
    signal, source_sr = torchaudio.load(path)
    signal = signal.mean(dim=0)
    if source_sr != SAMPLE_RATE:
        signal = torchaudio.functional.resample(signal, source_sr, SAMPLE_RATE)

    limit = int(SAMPLE_RATE * MAX_DURATION_S)
    if signal.numel() > limit:
        signal = signal[:limit]

    batch = feature_extractor(signal.numpy(), sampling_rate=SAMPLE_RATE, return_tensors="pt")
    batch = {name: tensor.to(DEVICE) for name, tensor in batch.items()}
    layer_output = hubert(**batch).hidden_states[best_layer]
    embedding = layer_output.mean(dim=1).squeeze(0).cpu().numpy().reshape(1,-1)

    class_id = clf_best.predict(sc_best.transform(embedding))[0]
    return pretty_label(label_list[class_id])

st.title("üé§ Indian Accent ‚Üí üçΩÔ∏è Food Recommender")
# NOTE(review): the prompt mentions .wav/.mp3 while m4a and ogg are accepted too.
up = st.file_uploader("Upload a short English clip (.wav/.mp3)", type=["wav","mp3","m4a","ogg"])
if up:
    # The upload is written to a fixed path in the working directory so it can be
    # handed to torchaudio and the audio player by file name.
    with open("user_audio.tmp","wb") as f:
        f.write(up.read())
    st.audio("user_audio.tmp")
    accent = predict("user_audio.tmp")
    st.subheader(f" Predicted Accent: **{accent}**")
    # Unknown accents fall back to a single generic suggestion.
    foods = ACCENT_TO_FOOD.get(normalize_key(accent), ["Chef's Special ü§å"])
    st.subheader(" Recommended Dishes")
    for d in foods: st.write(f"- {d}")


Overwriting app.py


In [None]:
!pip -q install streamlit pyngrok
from pyngrok import ngrok

# get your free token: https://dashboard.ngrok.com/get-started/your-authtoken
ngrok.set_auth_token("353Uzo3PvxhBSbj3Cmm2MVd3Xpq_4LWeMa4v32pDV6zJZ1Rdq")

public_url = ngrok.connect(8501)
print("üåê Public URL:", public_url)

!streamlit run app.py --server.port 8501 &>/dev/null &


üåê Public URL: NgrokTunnel: "https://unpessimistically-unambulant-artie.ngrok-free.dev" -> "http://localhost:8501"


In [None]:
import json, joblib

# Same export as the earlier cell: probe, scaler and preprocessing settings.
joblib.dump(clf_best, "clf_best.joblib")
joblib.dump(sc_best, "scaler_best.joblib")

with open("meta.json","w") as f:
    json.dump({
        "best_layer": int(best_layer),
        "label_list": label_list,
        "orig_names": orig_names,
        "SAMPLE_RATE": int(SAMPLE_RATE),
        "MAX_DURATION_S": float(MAX_DURATION_S),
    }, f)

print("‚úÖ Saved: clf_best.joblib, scaler_best.joblib, meta.json")


‚úÖ Saved: clf_best.joblib, scaler_best.joblib, meta.json


In [None]:
from google.colab import files
# Trigger a browser download for each exported artefact.
files.download("clf_best.joblib")
files.download("scaler_best.joblib")
files.download("meta.json")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>