In [None]:
# Mount Google Drive at /content/drive so the Drive-hosted input CSV and output
# folder configured later in this notebook are reachable from this runtime.

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install requests pandas openpyxl



In [None]:
import pandas as pd
import requests
import subprocess
import os
import time
from uuid import uuid4

# === CONFIGURATION ===
BASE_URL = "https://asr.vaani-artpark.in"
# SECURITY: a live credential should not be stored in the notebook. The key is
# taken from the VAANI_ASR_API_KEY environment variable when it is set; the
# literal fallback only keeps existing runs working.
# FIXME: rotate this key and delete the fallback.
API_KEY = os.environ.get("VAANI_ASR_API_KEY", "GCTYVSHR5609345XCSMV")
LANGUAGE = "hi"  # language value sent with every /transcribe request
INPUT_CSV = "/content/drive/MyDrive/Tech Team Task/IISC API evaluation/New IISC Transcription from Vaani /hindi_processed - hindi_processed (2).csv"
OUTPUT_DIR = "/content/drive/MyDrive/Tech Team Task/IISC API evaluation/New IISC Transcription from Vaani /Transcription"
BATCH_SIZE = 50    # rows written per Transcription_batch_<start>.csv
MAX_ROWS = 10000   # upper bound on rows read from INPUT_CSV
SKIPPED_ROWS_PATH = os.path.join(OUTPUT_DIR, "skipped_rows.csv")

# Scratch folders (relative to the working directory) for intermediate audio.
DOWNLOAD_DIR = "downloaded_ogg"
CONVERTED_DIR = "converted_wav"
headers = {"x-api-key": API_KEY}

# === PREPARE FOLDERS ===
for _folder in (DOWNLOAD_DIR, CONVERTED_DIR, OUTPUT_DIR):
    os.makedirs(_folder, exist_ok=True)

# === UTILITY FUNCTIONS ===

def download_file(url, output_path, timeout=60):
    """Stream the resource at ``url`` into the file ``output_path``.

    Args:
        url: Address fetched with an HTTP GET.
        output_path: Local path the response body is written to.
        timeout: Value forwarded to ``requests.get`` as ``timeout`` so a
            stalled server cannot block the run indefinitely.

    Raises:
        Exception: If the server answers with any status other than 200.
            Errors raised by ``requests`` itself (connection failure,
            timeout) propagate unchanged.
    """
    # The context manager closes the streamed response on every path,
    # including the non-200 branch where the body is never read.
    with requests.get(url, stream=True, timeout=timeout) as response:
        if response.status_code != 200:
            raise Exception(f"Failed to download {url} (status {response.status_code})")
        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(1024):
                if chunk:  # ignore empty keep-alive chunks
                    f.write(chunk)
    print(f"✅ Downloaded: {url}")

def convert_ogg_to_wav(input_path, output_path):
    """Transcode ``input_path`` into ``output_path`` by invoking ``ffmpeg``.

    The command asks for one audio channel (``-ac 1``) at 16000 Hz
    (``-ar 16000``) and overwrites an existing output file (``-y``).
    Because ``check=True`` is used, a non-zero ffmpeg exit status raises
    ``subprocess.CalledProcessError``.
    """
    ffmpeg_args = ['ffmpeg', '-y', '-i', input_path]
    ffmpeg_args += ['-ac', '1', '-ar', '16000']
    ffmpeg_args.append(output_path)
    subprocess.run(ffmpeg_args, check=True)
    print(f"🎧 Converted to WAV: {output_path}")

def enable_asr_service(wait_seconds=420):
    """Ask the ASR backend to start and wait for it to come up.

    POSTs to ``{BASE_URL}/enable-asr``. On a 200 response the function
    blocks for ``wait_seconds`` in total, printing a countdown roughly every
    30 seconds. On any other status it prints a warning and returns
    immediately so the caller can continue.

    Args:
        wait_seconds: Total number of seconds to wait after a successful
            enable request.
    """
    print("⚙️ Enabling ASR service...")
    response = requests.post(f"{BASE_URL}/enable-asr", headers=headers)
    if response.status_code != 200:
        print(f"⚠️ Failed to enable ASR: {response.status_code} — {response.text}")
        print("⏭️ Skipping ASR enabling and continuing anyway (assuming it's already active).")
        return

    print(f"🟢 ASR service is starting. Waiting {wait_seconds // 60} minutes...")
    remaining = wait_seconds
    while remaining > 0:
        print(f"⏳ Waiting... {remaining} seconds left")
        # Cap the last step so the total wait equals wait_seconds even when
        # it is not a multiple of 30.
        step = min(30, remaining)
        time.sleep(step)
        remaining -= step

def transcribe_audio(audio_file_path, language="hi"):
    """Send one audio file to ``{BASE_URL}/transcribe`` and return its text.

    The file is uploaded as the multipart field ``file`` (declared as
    ``audio/wav``) together with a ``language`` field.

    Returns:
        The ``"transcription"`` value of the JSON reply, or ``""`` when the
        reply has no such key.

    Raises:
        Exception: If the service answers with a status other than 200.
    """
    with open(audio_file_path, "rb") as wav_stream:
        upload_name = os.path.basename(audio_file_path)
        multipart_fields = {
            "file": (upload_name, wav_stream, "audio/wav"),
            "language": (None, language),
        }
        endpoint = f"{BASE_URL}/transcribe"
        reply = requests.post(endpoint, headers=headers, files=multipart_fields)
        # Guard clause: anything but 200 is reported with the server's own text.
        if reply.status_code != 200:
            raise Exception(f"Transcription error: {reply.status_code} {reply.text}")
        return reply.json().get("transcription", "")

# === MAIN PROCESSING FUNCTION ===

def _remove_if_exists(path):
    """Delete ``path``; do nothing if the file was never created."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def process_batches(input_csv, output_dir, batch_size=50, max_rows=10000):
    """Transcribe the audio URLs listed in ``input_csv`` batch by batch.

    For each of the first ``max_rows`` rows, the ``Filename`` column is used
    as a download URL. The audio is downloaded, converted to WAV, sent to the
    ASR service, and the text is stored in a ``Transcription`` column.

    Args:
        input_csv: Path of the CSV to read; must contain a ``Filename`` column.
        output_dir: Folder that receives one
            ``Transcription_batch_<start>.csv`` per batch with at least one
            successful row.
        batch_size: Number of rows handled per output file.
        max_rows: Maximum number of rows read from ``input_csv``.

    Side effects:
        Calls ``enable_asr_service()`` once before processing. Rows that
        cannot be processed get a ``Skip_Reason`` column and are written to
        ``SKIPPED_ROWS_PATH``. That file is rewritten after every batch so an
        interrupted run still leaves a record of the failures so far. The
        temporary ``.ogg``/``.wav`` files of each row are deleted as soon as
        the row is finished.
    """
    df = pd.read_csv(input_csv).head(max_rows)
    skipped_rows = []  # rows that failed, each carrying a Skip_Reason

    enable_asr_service()

    for start in range(0, len(df), batch_size):
        end = min(start + batch_size, len(df))
        batch_df = df.iloc[start:end].copy()
        successful_rows = []

        print(f"\n🚀 Processing batch {start} to {end}")

        for idx, row in batch_df.iterrows():
            url = row.get("Filename")
            print(f"\n🔹 Row {idx + 1}: {url}")

            if pd.isna(url) or not isinstance(url, str) or url.strip() == "":
                print("⚠️ Skipped: Missing or invalid URL")
                row["Skip_Reason"] = "Missing or invalid URL"
                skipped_rows.append(row)
                continue

            # Random names avoid collisions between rows; the files are
            # scratch data only and are removed in the finally clause.
            file_id = str(uuid4())
            ogg_path = os.path.join(DOWNLOAD_DIR, f"{file_id}.ogg")
            wav_path = os.path.join(CONVERTED_DIR, f"{file_id}.wav")

            try:
                download_file(url, ogg_path)
                convert_ogg_to_wav(ogg_path, wav_path)

                transcription = transcribe_audio(wav_path, LANGUAGE)
                print(f"✅ Transcription: {transcription}")

                row["Transcription"] = transcription
                successful_rows.append(row)

            except Exception as e:
                # Best effort: one bad row must not abort the whole run.
                print(f"❌ Skipping Row {idx + 1} due to error: {e}")
                row["Skip_Reason"] = str(e)
                skipped_rows.append(row)

            finally:
                # Without this, every processed row leaves two audio files
                # behind and long runs can exhaust the disk.
                _remove_if_exists(ogg_path)
                _remove_if_exists(wav_path)

        # Save successful rows of this batch
        if successful_rows:
            output_batch = pd.DataFrame(successful_rows)
            batch_filename = os.path.join(output_dir, f"Transcription_batch_{start}.csv")
            output_batch.to_csv(batch_filename, index=False)
            print(f"📄 Saved {len(successful_rows)} rows to {batch_filename}")
        else:
            print("⚠️ No successful rows in this batch.")

        # Checkpoint the skipped rows collected so far.
        if skipped_rows:
            pd.DataFrame(skipped_rows).to_csv(SKIPPED_ROWS_PATH, index=False)

    if skipped_rows:
        print(f"\n📁 Skipped rows saved to: {SKIPPED_ROWS_PATH}")
    else:
        print("\n✅ No rows were skipped!")

# === RUN SCRIPT ===

# Run the whole pipeline with the configuration constants defined above.
# NOTE(review): in a notebook kernel __name__ is presumably "__main__", so this
# guard would always pass and the run starts as soon as the cell executes — confirm.
if __name__ == "__main__":
    process_batches(INPUT_CSV, OUTPUT_DIR, BATCH_SIZE, MAX_ROWS)


### Tokenization

In [None]:
import pandas as pd
# CSV whose 'reference' and 'hypothesis' columns are read by the cells below.
sheet_path = "/content/sarvam_hindi_transcription - sarvam_hindi_transcription (1).csv"
# Loads every row of the sheet; no row limit is applied here.
df = pd.read_csv(sheet_path)
# Preview the first rows.
df.head()

In [None]:
# One entry per sheet row, in row order, taken straight from the two columns.
hindi_corpus_reference = df['reference'].tolist()
hindi_corpus_hypothesis = df['hypothesis'].tolist()

In [None]:
# Append the hypotheses to the reference list so a single list holds both.
# This mutates the list in place: running the cell again appends the
# hypotheses a second time.
hindi_corpus_reference += hindi_corpus_hypothesis
len(hindi_corpus_reference)

In [None]:
from tokenizers import Tokenizer, models, trainers, pre_tokenizers, decoders, normalizers
from tokenizers.normalizers import NFKC, Lowercase, Sequence, Replace
from tokenizers.pre_tokenizers import Whitespace
import os

# Write the corpus to disk, one sentence per line, as the training input.
# Non-string entries are written in their str() form.
os.makedirs("hindi_data", exist_ok=True)
with open("hindi_data/corpus.txt", "w", encoding="utf-8") as f:
    for line in hindi_corpus_reference:
        f.write(str(line) + "\n")

# Word-level model: every whitespace/punctuation-delimited token is one
# vocabulary entry, and unseen tokens map to [UNK].
tokenizer = Tokenizer(models.WordLevel(unk_token="[UNK]"))

# Literal strings (not regexes) deleted from the text before tokenization.
# The multi-character rules "):", "='" and ". " were dropped: each came after
# rules that had already deleted one of its characters, so it could never match.
PUNCTUATION_TO_STRIP = [
    "।",        # Devanagari danda
    "?", "#", ":", "\\", "‘", ",", "-", "!", "(", ")", "'",
    ".", "%", "/", ";",
    "《", "》", "・", "。",
    "<", ">",
    "\ufffd",   # Unicode replacement character
    "[", "]",
]

# NFKC must run first: it folds compatibility forms (e.g. fullwidth "？")
# into the plain characters listed above, so the Replace rules then remove
# them. Running it last would let those characters survive into the vocabulary.
remove_sentence_enders = Sequence(
    [NFKC()]
    + [Replace(pattern=symbol, content="") for symbol in PUNCTUATION_TO_STRIP]
    + [Lowercase()]
)

# Setup normalizer, pre-tokenizer, and decoder
tokenizer.normalizer = remove_sentence_enders
tokenizer.pre_tokenizer = Whitespace()
# NOTE(review): a WordPiece decoder is paired with a WordLevel model here;
# kept unchanged — confirm it is intended.
tokenizer.decoder = decoders.WordPiece(prefix="##")

# Trainer config: reserve the usual special tokens in the vocabulary.
trainer = trainers.WordLevelTrainer(
    special_tokens=["[PAD]", "[UNK]", "[CLS]", "[SEP]", "[MASK]"]
)

# Train tokenizer
tokenizer.train(["hindi_data/corpus.txt"], trainer)

# Save tokenizer to JSON (relative to the current working directory)
tokenizer.save("hindi_data_tokenizer.json")
print("✅ Saved tokenizer as 'hindi_data_tokenizer.json'")

In [None]:
from transformers import PreTrainedTokenizerFast
# Load the trained tokenizer JSON through the transformers fast-tokenizer wrapper.
# NOTE(review): the training cell saves to the relative path
# "hindi_data_tokenizer.json"; this absolute path only matches when the
# working directory is /content — confirm.
tokenizer = PreTrainedTokenizerFast(tokenizer_file="/content/hindi_data_tokenizer.json")

In [None]:
import unicodedata
# Smoke test: tokenize one sample sentence that contains a comma and a question mark.
text = "गेहूं का जो बीज बोया गया है वह पता पीला हो रहा है, उसका कोई दवा?"
# Apply NFKC to the raw string before handing it to the tokenizer.
normalized_text = unicodedata.normalize("NFKC", str(text))
print(tokenizer.tokenize(normalized_text))

# WER (Word Error Rate)

In [None]:
import pandas as pd
import numpy as np
import unicodedata
from transformers import PreTrainedTokenizerFast

# Alternative input sheet on Google Drive (disabled). While these two lines
# stay commented out, the `df` loaded earlier in the notebook is what the loop
# below scores.
# sheet_path = "/content/drive/MyDrive/Tech Team Task/IISC API evaluation/IISC_WER/Vaani ASR transcription.xlsx - Sheet1.csv"
# df = pd.read_csv(sheet_path)

# Load the trained tokenizer from its JSON file.
tokenizer = PreTrainedTokenizerFast(tokenizer_file="/content/hindi_data_tokenizer.json")

# Tokenization function
def tokenize(text, tokenizer):
    """Return ``tokenizer.tokenize`` of ``text`` after NFKC normalization.

    ``text`` is passed through ``str`` first, so non-string values are
    tokenized in their string form.
    """
    normalized_text = unicodedata.normalize("NFKC", str(text))
    return tokenizer.tokenize(normalized_text)

# WER with error counts
def wer_errors(reference, hypothesis, tokenizer):
    """Count token-level edit errors between a reference and a hypothesis.

    Args:
        reference: Ground-truth text.
        hypothesis: Text to be scored against the reference.
        tokenizer: Object exposing ``tokenize(str) -> list``.

    Returns:
        ``(errors, reference_length)`` as ``np.int64`` values, where
        ``errors`` is the minimum number of token insertions, deletions and
        substitutions turning the reference into the hypothesis.
    """
    r = tokenize(reference, tokenizer)
    h = tokenize(hypothesis, tokenizer)

    # Rolling-row Levenshtein on plain Python ints. A uint8 table cannot
    # represent distances above 255, which breaks for long utterances.
    previous = list(range(len(h) + 1))
    for i, ref_token in enumerate(r, start=1):
        current = [i]
        for j, hyp_token in enumerate(h, start=1):
            cost = 0 if ref_token == hyp_token else 1
            current.append(min(
                previous[j] + 1,         # deletion
                current[j - 1] + 1,      # insertion
                previous[j - 1] + cost,  # substitution / match
            ))
        previous = current

    total_errors = previous[-1]
    return np.int64(total_errors), np.int64(len(r))

# Initialize error counters
total_errors = np.int64(0)
total_reference_tokens = np.int64(0)

# Only the first MAX_WER_ROWS rows are scored; set it to None to score every row.
MAX_WER_ROWS = 200
wer_rows = df if MAX_WER_ROWS is None else df.head(MAX_WER_ROWS)

# Accumulate errors and reference lengths over the selected rows only, instead
# of walking the whole frame and ignoring everything past the limit.
for _, row in wer_rows.iterrows():
    ref = row['reference']
    hyp = row['hypothesis']
    print(f"hyp-----{hyp}")
    print(f"ref-----{ref}")
    errors, ref_len = wer_errors(ref, hyp, tokenizer)
    total_errors += errors
    total_reference_tokens += ref_len


# Calculate final WER (total errors over total reference tokens)
total_wer = (total_errors / total_reference_tokens) * 100 if total_reference_tokens > 0 else 0.0

# Print final WER, stating how many rows actually contributed to it
print(f"Total WER across {len(wer_rows)} of {len(df)} rows: {total_wer:.2f}%")


# CER (Character Error Rate)

In [None]:
import pandas as pd
import numpy as np
import unicodedata
from transformers import PreTrainedTokenizerFast

# Alternative input sheet (disabled). While these two lines stay commented
# out, the `df` loaded earlier in the notebook is what the loop below scores.
# sheet_path = "/content/Copy of Kikuyu_ASR_Report - Kikuyu_Transcription.csv"
# df = pd.read_csv(sheet_path)

# Load the tokenizer from its JSON file. The CER code in this cell splits text
# into characters itself and does not reference `tokenizer`.
tokenizer = PreTrainedTokenizerFast(tokenizer_file="/content/hindi_data_tokenizer.json")

# Normalize text and tokenize character-wise
def tokenize_odiya_char(text):
    """Split ``text`` into a list of single characters after NFKC normalization.

    ``text`` is converted with ``str`` first, so non-string values are split
    in their string form. Despite the name, nothing here is specific to one
    language.
    """
    as_string = str(text)
    return [character for character in unicodedata.normalize("NFKC", as_string)]

# Levenshtein distance between two sequences
def levenshtein_distance(r, h):
    """Unit-cost edit distance between the sequences ``r`` and ``h``.

    Insertions, deletions and substitutions each cost 1. The result is read
    from an int64 table, so it is returned as a NumPy integer.
    """
    rows, cols = len(r) + 1, len(h) + 1
    table = np.zeros((rows, cols), dtype=np.int64)
    # First column / first row: distance to and from the empty prefix.
    table[:, 0] = np.arange(rows)
    table[0, :] = np.arange(cols)

    for row_idx, ref_item in enumerate(r, start=1):
        for col_idx, hyp_item in enumerate(h, start=1):
            diagonal = table[row_idx - 1, col_idx - 1]
            if ref_item != hyp_item:
                diagonal = diagonal + 1  # substitution
            table[row_idx, col_idx] = min(
                table[row_idx - 1, col_idx] + 1,  # deletion
                table[row_idx, col_idx - 1] + 1,  # insertion
                diagonal,
            )
    return table[rows - 1, cols - 1]

# Sum character edit distances and reference lengths over every row, then
# divide once at the end.
total_distance = 0
total_chars = 0

for ref_text, hyp_text in zip(df['reference'], df['hypothesis']):
    ref = tokenize_odiya_char(ref_text)
    hyp = tokenize_odiya_char(hyp_text)

    total_distance = total_distance + levenshtein_distance(ref, hyp)
    total_chars = total_chars + len(ref)

if total_chars > 0:
    total_cer = (total_distance / total_chars) * 100
else:
    total_cer = 0.0  # no reference characters at all

# Report the corpus-level rate as a percentage
print(f"Total Character Error Rate (CER): {total_cer:.2f}%")


# MER (Match Error Rate)

In [None]:
import pandas as pd
import numpy as np
import unicodedata
from transformers import PreTrainedTokenizerFast

# Alternative input sheet on Google Drive (disabled). While these two lines
# stay commented out, the `df` loaded earlier in the notebook is what the loop
# below scores.
# sheet_path = "/content/drive/MyDrive/Tech Team Task/IISC API evaluation/IISC_WER/Vaani ASR transcription.xlsx - Sheet1.csv"
# df = pd.read_csv(sheet_path)

# Load the trained tokenizer from its JSON file.
tokenizer = PreTrainedTokenizerFast(tokenizer_file="/content/hindi_data_tokenizer.json")

# Tokenization function
def tokenize_odiya(text, tokenizer):
    """Return ``tokenizer.tokenize`` of ``text`` after NFKC normalization.

    ``text`` is passed through ``str`` first, so non-string values are
    tokenized in their string form.
    """
    normalized_text = unicodedata.normalize("NFKC", str(text))
    return tokenizer.tokenize(normalized_text)

# Function to calculate MER errors
def mer_errors(reference, hypothesis, tokenizer):
    """Count edit errors and correct matches for the match error rate.

    MER is ``(S + D + I) / (H + S + D + I)``, where ``H`` is the number of
    reference tokens matched unchanged in the alignment. ``H`` is obtained
    by backtracing the edit-distance table; ``len(r) + len(h) - errors``
    would equal ``2H + S`` and therefore understate the rate.

    Args:
        reference: Ground-truth text.
        hypothesis: Text to be scored against the reference.
        tokenizer: Object exposing ``tokenize(str) -> list``.

    Returns:
        ``(total_errors, correct_matches, reference_length)``; the first two
        are ``np.int64``, the last a plain ``int``. When several alignments
        share the minimum cost, the backtrace prefers match/substitution,
        then deletion, then insertion.
    """
    r = tokenize_odiya(reference, tokenizer)
    h = tokenize_odiya(hypothesis, tokenizer)
    n_ref, n_hyp = len(r), len(h)

    # int64 table so long utterances cannot overflow.
    d = np.zeros((n_ref + 1, n_hyp + 1), dtype=np.int64)
    d[:, 0] = np.arange(n_ref + 1)
    d[0, :] = np.arange(n_hyp + 1)

    for i in range(1, n_ref + 1):
        for j in range(1, n_hyp + 1):
            cost = 0 if r[i - 1] == h[j - 1] else 1
            d[i][j] = min(
                d[i - 1][j] + 1,      # deletion
                d[i][j - 1] + 1,      # insertion
                d[i - 1][j - 1] + cost  # substitution
            )

    total_errors = d[n_ref][n_hyp]

    # Walk one optimal alignment backwards and count the exact matches.
    hits = 0
    i, j = n_ref, n_hyp
    while i > 0 and j > 0:
        if r[i - 1] == h[j - 1]:
            # Equal tokens: the diagonal step is always optimal and free.
            hits += 1
            i -= 1
            j -= 1
        elif d[i][j] == d[i - 1][j - 1] + 1:
            i -= 1  # substitution
            j -= 1
        elif d[i][j] == d[i - 1][j] + 1:
            i -= 1  # deletion
        else:
            j -= 1  # insertion
    # Whatever remains on either side is pure deletions or insertions.

    correct_matches = np.int64(hits)
    return total_errors, correct_matches, len(r)

# Running totals over the whole sheet
total_errors = 0
total_correct_matches = np.int64(0)
total_reference_tokens = 0

# Score every (reference, hypothesis) pair and accumulate the counts
for ref, hyp in zip(df['reference'], df['hypothesis']):
    errors, correct_matches, ref_len = mer_errors(ref, hyp, tokenizer)
    total_errors = total_errors + errors
    total_correct_matches = total_correct_matches + correct_matches
    total_reference_tokens = total_reference_tokens + ref_len

# MER = errors / (errors + correct matches), as a percentage
if (total_errors + total_correct_matches) > 0:
    total_mer = (total_errors / (total_errors + total_correct_matches)) * 100
else:
    total_mer = 0.0

# Print final MER
print(f"Total MER across all rows: {total_mer:.2f}%")