## ✅ Summary of the Approach

1. **Input**: Bank call center **audio recordings**
2. **Step 1 – Transcription**: Use **Whisper Large** to transcribe calls → Get **text data**
3. **Step 2 – Labeling**: Use ***Clustering*** and ***ChatGPT or another LLM*** to infer **topics/labels** from transcriptions
4. **Step 3 – Dataset Creation**: Use transcribed and labeled examples as your **training data**
5. **Step 4 – Vectorization**: Vectorize the transcriptions with **TF-IDF** or **BERT embeddings**
6. **Step 5 – Classification**: Train a **classical ML model** (Logistic Regression, SVM, etc.) to classify future calls by topic
7. **Step 6 – Inference**: For new calls → vectorize → classify

## Transcribing

In [None]:
import os
import csv
from pathlib import Path
import torch
from stable_whisper import load_model

# === SETTINGS ===
# Folder tree holding the call recordings, and the CSV the transcripts go to.
root_dir = r"C:\Pasha-PoC\Audio-Data"
output_csv = r"C:\Pasha-PoC\Topic-Modeling\transcriptions.csv"

# Prefer the GPU whenever PyTorch can see one.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"🚀 Using device: {device}")

# === LOAD MODEL ===
# Speech-to-text model used by transcribe_audio_files below.
model = load_model("large-v3", device=device)

# === HELPER FUNCTION ===
def transcribe_audio_files(directory):
    """Transcribe every audio file found under *directory*.

    Walks the directory tree and runs the module-level ``model`` on each
    file whose name ends in .wav, .mp3 or .m4a (case-insensitive). Each
    non-empty transcription becomes one ``[text, 1]`` row; the ``1`` is a
    placeholder label. A file whose transcription raises is reported on
    stdout and skipped.

    Returns:
        A list of ``[transcription_text, 1]`` rows.
    """
    audio_suffixes = (".wav", ".mp3", ".m4a")
    rows = []

    for folder, _subdirs, names in os.walk(directory):
        for name in names:
            if not name.lower().endswith(audio_suffixes):
                continue

            audio_path = os.path.join(folder, name)
            print(f"🎧 Transcribing: {audio_path}")

            try:
                # The result exposes the transcript as an attribute, not a dict key.
                transcript = model.transcribe(audio_path, language="az").text.strip()
            except Exception as e:
                print(f"⚠️ Failed to transcribe {name}: {e}")
                continue

            # Empty transcriptions are dropped.
            if transcript:
                rows.append([transcript, 1])
    return rows

# === PROCESS ===
transcriptions = transcribe_audio_files(root_dir)

# === SAVE TO CSV ===
if not transcriptions:
    print("❌ No transcriptions were generated.")
else:
    # Make sure the destination folder exists before writing.
    target_dir = os.path.dirname(output_csv)
    os.makedirs(target_dir, exist_ok=True)

    # Header row first, then one row per transcription.
    with open(output_csv, mode='w', newline='', encoding='utf-8') as csv_file:
        csv.writer(csv_file).writerows([["Transcription", "Label"], *transcriptions])

    print(f"✅ Transcriptions saved to {output_csv}")

## Preprocessing

In [None]:
import pandas as pd

# Load the transcriptions written by the transcription step.
df = pd.read_csv(r"C:\Pasha-PoC\Topic-Modeling\transcriptions.csv")

# Show the table, the first transcription and the index, each followed by a blank line.
for preview in (df, df['Transcription'].iloc[0], df.index):
    display(preview)
    print()

# Column dtypes and non-null counts.
df.info()

In [None]:
# Number of rows that exactly repeat an earlier row.
duplicate_rows = df.duplicated().sum()
print(duplicate_rows)

In [None]:
# List empty transcriptions. pandas.read_csv parses empty fields as NaN, so a
# plain `== ''` comparison never matches after a reload; treat missing values
# and whitespace-only strings as empty too.
blank_mask = df['Transcription'].fillna('').astype(str).str.strip() == ''
df['Transcription'].loc[blank_mask]

In [None]:
# Flag transcriptions that are too short to carry a recognisable topic:
# fewer than 24 whitespace-separated words. Missing values compare False here.
word_counts = df["Transcription"].str.split().str.len()
short_mask = word_counts < 24
short_transcriptions = df[short_mask]

# Also flag strings of at most 10 characters made up only of word characters,
# whitespace and . , - '  (na=False keeps missing values from producing a
# NaN-containing mask, which cannot be used for boolean indexing).
suspicious_mask = df["Transcription"].str.fullmatch(r'[\w\s\.\,\-\']{0,10}', na=False)
suspicious_transcriptions = df[suspicious_mask]

# Combine the two filters by row, not by row content. De-duplicating on
# content would discard the index of a second call with identical text, and
# that row would then survive the later drop-by-index.
potentially_meaningless = df[short_mask | suspicious_mask]

potentially_meaningless

In [None]:
# Spot-check a single transcription by its position in the table.
row_position = 41
display(df['Transcription'].iloc[row_position])

In [None]:
# Drop every flagged row, then renumber the remaining rows from zero.
flagged_index = potentially_meaningless.index
cleaned_df = df.drop(index=flagged_index).reset_index(drop=True)
cleaned_df

In [None]:
# Write the cleaned table without the index column.
# NOTE: this is the same file the transcriptions were loaded from, so it is overwritten.
cleaned_file_path = "C:/Pasha-PoC/Topic-Modeling/transcriptions.csv"
cleaned_df.to_csv(path_or_buf=cleaned_file_path, index=False)

## Labeling

In [None]:
import pandas as pd

# Load the labeled transcriptions.
df = pd.read_csv(r"C:\Pasha-PoC\Topic-Modeling\Labeled_Transcriptions.csv")

# Show the table, the first transcription and the index, each followed by a blank line.
for preview in (df, df['Transcription'].iloc[0], df.index):
    display(preview)
    print()

# Column dtypes and non-null counts.
df.info()

In [None]:
# Distinct label values, in order of first appearance.
label_column = df['Label']
label_column.unique()

In [None]:
from collections import Counter
# How many transcriptions carry each label.
label_counts = Counter(df['Label'])
print(label_counts)

In [None]:
# Fold the 'General Inquiry' label into 'Other'.
label_merges = {'General Inquiry': 'Other'}
df['Label'] = df['Label'].replace(label_merges)

In [None]:
from collections import Counter
# Re-count the labels after the merge.
label_counts = Counter(df['Label'])
print(label_counts)

In [None]:
# Show the transcription and the label stored at position n.
n = 9
transcription_text = df['Transcription'].iloc[n]
label_text = df['Label'].iloc[n]
display(transcription_text)
print()
display(label_text)

In [None]:
# Manually correct the label of the row whose index label is N, then show
# the row at position N (the same row while the index is the default 0..n-1).
N = 1
df.loc[N, 'Label'] = "Account Statement"
display(df.iloc[N])

In [None]:
# Write the corrected labels back to the labeled-transcriptions file (no index column).
cleaned_file_path = "C:/Pasha-PoC/Topic-Modeling/Labeled_Transcriptions.csv"
df.to_csv(path_or_buf=cleaned_file_path, index=False)

## Feature Extraction

In [None]:
import pandas as pd

# Reload the labeled transcriptions for feature extraction.
df = pd.read_csv(r"C:\Pasha-PoC\Topic-Modeling\Labeled_Transcriptions.csv")

# Show the table, a blank line, then its index.
display(df)
print()
display(df.index)

In [None]:
import torch
import numpy as np
from transformers import BertTokenizer, BertModel

# Run on the GPU when PyTorch can see one.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"🚀 Using device: {device}")

# Load the multilingual BERT tokenizer/encoder pair and switch to inference mode.
bert_name = 'bert-base-multilingual-uncased'
tokenizer = BertTokenizer.from_pretrained(bert_name)
model = BertModel.from_pretrained(bert_name).to(device)
model.eval()

# One pooled-output vector per transcription, computed one text at a time.
embeddings = []
with torch.no_grad():
    for text in df['Transcription']:
        # Tokenize (truncating over-long inputs) and move the tensors to the device.
        encoded = tokenizer(text, return_tensors='pt', truncation=True, padding=True).to(device)
        pooled = model(**encoded).pooler_output
        # Back to the CPU for NumPy; squeeze drops the batch axis.
        embeddings.append(pooled.detach().cpu().numpy().squeeze())

X = np.vstack(embeddings)  # Feature matrix: one row per transcription
y = df['Label'].values      # Labels, in the same row order

In [None]:
import pandas as pd
import numpy as np
import os

# One column per embedding dimension, followed by the label column.
df_embeddings = pd.DataFrame(X).assign(Label=y)

# Destination file for the training table.
output_path = r"C:/Pasha-PoC/Topic-Modeling/train_data.csv"

# Create the destination folder if it is missing.
output_dir = os.path.dirname(output_path)
os.makedirs(output_dir, exist_ok=True)

# Write the table without the index column.
df_embeddings.to_csv(path_or_buf=output_path, index=False)

print(f"✅ Embeddings saved to: {output_path}")

In [None]:
import pandas as pd

# Read the saved training table back and show it, followed by a blank line.
train_data_path = r"C:/Pasha-PoC/Topic-Modeling/train_data.csv"
df = pd.read_csv(train_data_path)
display(df)
print()

## Model Training

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, LabelEncoder

# Load the saved embedding table (feature columns plus a "Label" column).
df = pd.read_csv(r"C:/Pasha-PoC/Topic-Modeling/train_data.csv")

# Feature matrix: every column except the label.
X = df.drop(columns=["Label"]).values

# Integer-encode the string labels; keep the class names for the reports.
le = LabelEncoder()
y = le.fit_transform(df["Label"])
class_names = le.classes_

# Stratified 70/30 split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, stratify=y, random_state=42
)


def _with_scaling(classifier):
    """Wrap *classifier* in a pipeline that standardizes the features first."""
    return Pipeline([("scaler", StandardScaler()), ("clf", classifier)])


# Candidate models; the first three are wrapped with feature scaling.
models = {
    "Logistic Regression": _with_scaling(LogisticRegression(max_iter=1_000, random_state=42)),
    "SVM (RBF)": _with_scaling(SVC(random_state=42)),
    "K-NN": _with_scaling(KNeighborsClassifier()),
    "Random Forest": RandomForestClassifier(random_state=42),
    "Gradient Boosting": GradientBoostingClassifier(random_state=42),
    "Naive Bayes": GaussianNB(),
}

# Fit each candidate on the training split and report on the held-out split.
for name, model in models.items():
    print(f"\n🔍 Evaluating: {name}")
    y_pred = model.fit(X_train, y_train).predict(X_test)

    acc = accuracy_score(y_test, y_pred)
    print(f"✅ Accuracy: {acc:.4f}")
    print(classification_report(y_test, y_pred, target_names=class_names))

In [None]:
import joblib

# Persist the fitted logistic-regression pipeline (scaler + classifier).
logistic_pipeline = models["Logistic Regression"]
joblib.dump(logistic_pipeline, "C:/Pasha-PoC/Topic-Modeling/lrm.pkl")

# Persist the label encoder so predictions can be mapped back to label names.
joblib.dump(le, "C:/Pasha-PoC/Topic-Modeling/label_encoder.pkl")

print("✅ Model and LabelEncoder saved successfully!")

### FFNN

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import classification_report, accuracy_score
import numpy as np
import pandas as pd

# Load dataset
df = pd.read_csv("C:/Pasha-PoC/Topic-Modeling/train_data.csv")

# Features and labels
X = df.drop(columns=["Label"]).values

le = LabelEncoder()
y = le.fit_transform(df["Label"])

# Train/test split. Done BEFORE scaling so that no statistic of the held-out
# rows reaches the scaler (fitting on the full matrix leaks test data).
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, stratify=y, random_state=42)

# Standardize features: fit on the training rows only, then apply the same
# transform to the held-out rows. `scaler` is what gets saved for inference.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Convert to PyTorch tensors (float32 features, int64 class indices)
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

# Neural network model
class FeedforwardNN(nn.Module):
    """Single-hidden-layer classifier: Linear -> ReLU -> Dropout(0.3) -> Linear.

    The layers live in ``self.model`` (an ``nn.Sequential``), so the
    state-dict keys are ``model.0.*`` and ``model.3.*``.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        layers = [
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(p=0.3),
            nn.Linear(hidden_dim, output_dim),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the raw class scores (logits) for the batch *x*."""
        logits = self.model(x)
        return logits

import copy  # needed to snapshot the best weights (see below)

# Hyperparameters
input_dim = X.shape[1]
hidden_dim = 64
output_dim = len(np.unique(y))
epochs = 300
lr = 0.01
batch_size = 27

# Model, loss, optimizer
model = FeedforwardNN(input_dim, hidden_dim, output_dim)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

# Best-checkpoint tracking. state_dict() returns references to the live
# parameter tensors, which the optimizer keeps updating in place, so every
# snapshot must be a deep copy. Starting from a copy of the initial weights
# also guarantees there is always something to restore after training.
best_val_acc = 0.0
best_model_state = copy.deepcopy(model.state_dict())

# Training loop
for epoch in range(epochs):
    model.train()
    # Fresh shuffle of the training rows every epoch.
    permutation = torch.randperm(X_train_tensor.size()[0])
    for i in range(0, X_train_tensor.size()[0], batch_size):
        indices = permutation[i:i+batch_size]
        batch_X, batch_y = X_train_tensor[indices], y_train_tensor[indices]

        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

    # Evaluate on validation (test) set every epoch
    model.eval()
    with torch.no_grad():
        y_val_pred_probs = model(X_test_tensor)
        y_val_pred = torch.argmax(y_val_pred_probs, axis=1).numpy()
        val_acc = accuracy_score(y_test, y_val_pred)

    if val_acc > best_val_acc:
        best_val_acc = val_acc
        best_model_state = copy.deepcopy(model.state_dict())

    # The printed loss is that of the last mini-batch of the epoch.
    if (epoch+1) % 10 == 0:
        print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}, Val Acc: {val_acc:.4f}")

# Restore the weights of the best-scoring epoch.
model.load_state_dict(best_model_state)

# Evaluation
# NOTE(review): the checkpoint was selected on this same split, so the
# accuracy below is an optimistic estimate; a separate validation split
# would give an unbiased number.
model.eval()
with torch.no_grad():
    y_pred_probs = model(X_test_tensor)
    y_pred = torch.argmax(y_pred_probs, axis=1).numpy()
    acc = accuracy_score(y_test, y_pred)
    print(f"\n✅ Test Accuracy: {acc:.4f}")
    print(classification_report(y_test, y_pred, target_names=np.unique(df["Label"])))

In [None]:
import joblib

# Persist everything inference needs: network weights, fitted scaler, label encoder.
ffnn_weights = model.state_dict()
torch.save(ffnn_weights, "C:/Pasha-PoC/Topic-Modeling/ffnn_model.pth")

for artifact, artifact_path in (
    (scaler, "C:/Pasha-PoC/Topic-Modeling/scaler.pkl"),
    (le, "C:/Pasha-PoC/Topic-Modeling/label_encoder.pkl"),
):
    joblib.dump(artifact, artifact_path)

## Inference

In [None]:
import pandas as pd

# Reload the labeled transcriptions to pick sample inputs for inference.
df = pd.read_csv(r"C:\Pasha-PoC\Topic-Modeling\Labeled_Transcriptions.csv")

# Show the table, a blank line, then its index.
display(df)
print()
display(df.index)

In [None]:
# Text of the transcription at position 84.
sample_position = 84
df['Transcription'].iloc[sample_position]

### FFNN

In [None]:
import pandas as pd

# Load and show the topics table.
topics_path = r"C:\Pasha-PoC\Topic-Modeling\topics.csv"
df = pd.read_csv(topics_path)
df

In [None]:
import os
import uuid
import shutil
import torch
import joblib
import numpy as np
import torchaudio
import gradio as gr
import webbrowser
import threading
import time

from transformers import BertTokenizer, BertModel
from torch import nn
from denoiser import pretrained
from denoiser.dsp import convert_audio
from stable_whisper import load_model as load_sw_model
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# ========== Setup ==========
# Run on the GPU when PyTorch can see one.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Working folders: debug artifacts, watched recordings, and archived recordings.
DEBUG_DIR = "debug/"
RECORDER_DIR = "records"
PROCESSED_DIR = os.path.join(RECORDER_DIR, "processed")

# Create all of them up front so later code can write without checking.
for _folder in (DEBUG_DIR, RECORDER_DIR, PROCESSED_DIR):
    os.makedirs(_folder, exist_ok=True)

# ========== Models ==========
# Load BERT (same checkpoint name for tokenizer and encoder), in inference mode.
_bert_name = "bert-base-multilingual-uncased"
tokenizer = BertTokenizer.from_pretrained(_bert_name)
bert_model = BertModel.from_pretrained(_bert_name).to(device)
bert_model.eval()

# Load the scaler and label encoder saved by the FFNN training step.
scaler = joblib.load("C:/Pasha-PoC/Topic-Modeling/scaler.pkl")
label_encoder = joblib.load("C:/Pasha-PoC/Topic-Modeling/label_encoder.pkl")

# Load FFNN model
class FeedforwardNN(nn.Module):
    """Single-hidden-layer classifier: Linear -> ReLU -> Dropout(0.3) -> Linear.

    The layers live in ``self.model`` (an ``nn.Sequential``), so the
    state-dict keys are ``model.0.*`` and ``model.3.*`` — the layout the
    saved weights are loaded into.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        layers = [
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(p=0.3),
            nn.Linear(hidden_dim, output_dim),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the raw class scores (logits) for the batch *x*."""
        logits = self.model(x)
        return logits

# Rebuild the classifier and restore its trained weights.
# NOTE(review): 768 is presumably the width of the BERT pooled output; it
# must equal the feature count used at training time — confirm.
input_dim = 768
hidden_dim = 64
output_dim = len(label_encoder.classes_)
model = FeedforwardNN(input_dim, hidden_dim, output_dim)
ffnn_weights = torch.load("C:/Pasha-PoC/Topic-Modeling/ffnn_model.pth", map_location=device)
model.load_state_dict(ffnn_weights)
model.eval()

# Load Whisper
sw_model = load_sw_model("large-v3", device=device)

# Load Denoiser
denoise_model = pretrained.dns64().to(device)

def denoise_audio(audio_path):
    """Denoise the recording at *audio_path* and return the path of the result.

    The audio is converted to the denoiser's sample rate and channel count,
    passed through ``denoise_model`` without gradient tracking, and saved
    under DEBUG_DIR as ``denoised_<random hex>.wav``.
    """
    waveform, source_rate = torchaudio.load(audio_path)
    target_rate = denoise_model.sample_rate
    waveform = convert_audio(waveform, source_rate, target_rate, denoise_model.chin)

    with torch.no_grad():
        cleaned = denoise_model(waveform.to(device))
    # Drop the leading axis and bring the audio back to the CPU for saving.
    cleaned = cleaned.squeeze(0).cpu()

    denoised_name = f"denoised_{uuid.uuid4().hex}.wav"
    denoised_path = os.path.join(DEBUG_DIR, denoised_name)
    torchaudio.save(denoised_path, cleaned, target_rate)
    return denoised_path

# ========== Classification ==========
def classify_with_ffnn(text):
    """Return the predicted topic label for the transcription *text*.

    The text is lower-cased and stripped, embedded with BERT (pooled
    output), standardized with the saved scaler, scored by the feed-forward
    network, and the top-scoring class index is mapped back to its label.
    """
    normalized = text.lower().strip()
    with torch.no_grad():
        tokens = tokenizer(normalized, return_tensors='pt', truncation=True, padding=True).to(device)
        pooled = bert_model(**tokens).pooler_output.squeeze().cpu().numpy()
        # The scaler expects a 2-D array, hence the single-row list.
        features = torch.tensor(scaler.transform([pooled]), dtype=torch.float32)
        class_index = torch.argmax(model(features), axis=1).item()
    return label_encoder.inverse_transform([class_index])[0]

# ========== Full Pipeline ==========
def process_audio_and_classify(audio_path):
    """Run the full pipeline on one recording and return an HTML report.

    Steps: keep a copy of the original under DEBUG_DIR, denoise the audio,
    transcribe the denoised file, classify the transcription, and render
    the denoised audio player, the transcription and the predicted topic
    as an HTML fragment.

    Args:
        audio_path: path of the recording to analyse.

    Returns:
        The HTML fragment as a string.

    Raises:
        ValueError: if *audio_path* is empty or None (e.g. no file uploaded).
    """
    # Local import: the enclosing cell does not import html, and the name
    # `escape` avoids clashing with a variable called `html`.
    from html import escape

    if not audio_path:
        raise ValueError("No audio file was provided.")

    # Keep an untouched copy of the input for debugging.
    raw_copy = os.path.join(DEBUG_DIR, f"original_{uuid.uuid4().hex}.wav")
    shutil.copy(audio_path, raw_copy)

    denoised_path = denoise_audio(audio_path)

    result = sw_model.transcribe(denoised_path, language="azerbaijani", word_timestamps=False)
    full_text = result.text.strip()

    label = classify_with_ffnn(full_text)

    # Transcription and label are free text: escape them so characters such
    # as < > & are displayed literally instead of being parsed as markup.
    result_html = f"""
    <h3>🔊 Denoised Audio</h3>
    <audio controls src='{denoised_path}' style='width:100%; margin-bottom:16px;'></audio>
    <h3>📄 Transcription</h3>
    <div style='white-space: pre-wrap; border:1px solid #ccc; padding:8px;'>{escape(full_text)}</div>
    <h3>🤖 Topic Prediction</h3>
    <div style='font-size: 1.2em; font-weight: bold;'>{escape(str(label))}</div>
    """
    return result_html

# ========== Watchdog ==========
class NewWavHandler(FileSystemEventHandler):
    """Watchdog handler that classifies each newly created .wav file and archives it."""

    def on_created(self, event):
        """Process a newly created recording.

        Directories, non-WAV files and anything inside a ``processed``
        folder are ignored. For every other file the full pipeline runs,
        the HTML result is written into DEBUG_DIR and opened in the
        browser, and the recording is moved under PROCESSED_DIR, keeping
        its path relative to RECORDER_DIR. Any failure is printed and
        swallowed so one bad file does not stop the handler.
        """
        # Local import: the enclosing cell does not import pathlib.
        from pathlib import Path

        # Compare the extension case-insensitively so ".WAV" files are not skipped.
        if event.is_directory or not event.src_path.lower().endswith(".wav"):
            return
        # Skip anything under a "processed" folder — presumably to avoid
        # re-processing recordings that were just archived there.
        if "processed" in os.path.normpath(event.src_path).split(os.sep):
            return

        print(f"[Watcher] Detected new file: {event.src_path}")
        try:
            result_html = process_audio_and_classify(event.src_path)
            result_filename = f"result_{uuid.uuid4().hex}.html"
            result_path = os.path.join(DEBUG_DIR, result_filename)
            with open(result_path, "w", encoding="utf-8") as f:
                f.write(result_html)
            # as_uri() yields a well-formed file:// URL (forward slashes,
            # percent-encoding) instead of "file://" glued to an OS path.
            webbrowser.open(Path(result_path).resolve().as_uri())
            # Archive the recording, mirroring its sub-folder layout.
            relative_path = os.path.relpath(event.src_path, RECORDER_DIR)
            processed_path = os.path.join(PROCESSED_DIR, relative_path)
            os.makedirs(os.path.dirname(processed_path), exist_ok=True)
            shutil.move(event.src_path, processed_path)
        except Exception as e:
            print(f"[Watcher] Error: {e}")

def start_file_watcher():
    """Watch RECORDER_DIR (recursively) for new files until interrupted.

    Starts a watchdog observer with a NewWavHandler, then sleeps in
    one-second steps. A KeyboardInterrupt stops the observer, after which
    the function waits for the observer thread to finish.
    """
    watcher = Observer()
    watcher.schedule(NewWavHandler(), path=RECORDER_DIR, recursive=True)
    watcher.start()
    print("[Watcher] Started.")

    try:
        # Keep this thread alive; the observer does its work on its own thread.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        watcher.stop()
    watcher.join()

# Run the folder watcher on a background daemon thread so it does not block the UI below.
threading.Thread(target=start_file_watcher, daemon=True).start()

# ========== Gradio UI with Text and Audio ==========
# Two tabs: one classifies an uploaded recording, the other classifies pasted text.
with gr.Blocks() as demo:
    gr.Markdown("## 🧠 Bank Call Classifier")
    gr.Markdown("Classify topics either by uploading a call recording or by entering a call transcription.")

    with gr.Tab("🎙️ Audio Upload"):
        # type="filepath": the callbacks receive the uploaded file's path.
        audio_input = gr.Audio(type="filepath", label="Upload Call Audio (WAV)")
        status_message = gr.HTML()
        result_output = gr.HTML()
        run_btn = gr.Button("Analyze Audio")

        def show_processing_msg(audio_path):
            """Return a 'processing' status and an empty result to clear the previous one."""
            return "<b>⏳ Processing, please wait...</b>", ""

        def analyze(audio_path):
            """Run the full audio pipeline and return a 'done' status plus the HTML report."""
            result = process_audio_and_classify(audio_path)
            return "<b>✅ Done!</b>", result

        # Two chained steps: show the status message first, then run the analysis.
        run_btn.click(fn=show_processing_msg, inputs=audio_input, outputs=[status_message, result_output]) \
               .then(fn=analyze, inputs=audio_input, outputs=[status_message, result_output])

    with gr.Tab("📝 Text Input"):
        text_input = gr.Textbox(lines=4, placeholder="Paste or type the call transcription here...", label="Call Transcription")
        text_output = gr.Text(label="Predicted Topic")
        text_btn = gr.Button("Classify Text")

        # The text tab skips denoising/transcription and classifies the text directly.
        text_btn.click(fn=classify_with_ffnn, inputs=text_input, outputs=text_output)

# Start the web app.
demo.launch()

### Logistic Regression

In [None]:
import torch
from transformers import BertTokenizer, BertModel
import joblib
import numpy as np
import gradio as gr

# === Load BERT tokenizer and model ===
# Same checkpoint name for both; the encoder stays on the CPU here and is put in inference mode.
_bert_name = 'bert-base-multilingual-uncased'
tokenizer = BertTokenizer.from_pretrained(_bert_name)
bert_model = BertModel.from_pretrained(_bert_name)
bert_model.eval()

# === Load trained classifier and LabelEncoder ===
classifier_model = joblib.load("C:/Pasha-PoC/Topic-Modeling/lrm.pkl")
label_encoder = joblib.load("C:/Pasha-PoC/Topic-Modeling/label_encoder.pkl")

# === Prediction function ===
def classify_call(text):
    """Return a display string with the predicted topic for *text*.

    The text is lower-cased and stripped, embedded with BERT (pooled
    output), passed to the saved classifier, and the predicted class code
    is mapped back to its label name.
    """
    cleaned = text.lower().strip()
    with torch.no_grad():
        tokens = tokenizer(cleaned, return_tensors='pt', truncation=True, padding=True)
        pooled = bert_model(**tokens).pooler_output
    # squeeze drops the batch axis; predict() gets a single-row list.
    features = pooled.detach().numpy().squeeze()

    predicted_code = classifier_model.predict([features])[0]
    topic = label_encoder.inverse_transform([predicted_code])[0]
    return f"🔍 Topic: {topic}"

# === Gradio UI ===
# Single text box in, plain-text prediction out.
transcript_box = gr.Textbox(lines=3, placeholder="Enter bank call transcription...")

demo = gr.Interface(
    title="📞 Bank Call Classifier",
    description="Enter a call transcript to predict its topic (e.g., Card Issues, Other)",
    fn=classify_call,
    inputs=transcript_box,
    outputs="text",
)

# Start the web app.
demo.launch()