<a href="https://colab.research.google.com/github/AdamPrzychodniPrivate/somali-radios-with-ai-for-food-security/blob/main/1_phase/Radio_Ergo_Somali_Speech_to_Text_for_Food_Security.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 📻 Radio Ergo Somali Speech-to-Text for Food Security

## 🎯 Project Overview

**Radio Ergo SoundCloud Audio Downloader and Transcriber**

This notebook automates the end-to-end process of downloading Somali-language audio from **Radio Ergo’s SoundCloud channel** and transcribing it into readable text. It also includes a comprehensive evaluation of three different **speech-to-text models** to determine the most reliable solution for Somali transcription in support of food security analysis.

📘 **Full Project Report:**  
  - 📄 [Leveraging Local Radio for Real-Time Food Security Insights: An AI-Powered Approach](https://docs.google.com/document/d/1CvSWsIIZN1jriA02DhczZwuu_-vq87tMO50ZeyycRzY/edit?usp=sharing)

---

## 1. 📥 Data Collection – Audio Download System

The notebook includes a fully automated system for acquiring audio data from Radio Ergo:

### ✅ Features:

- **🔗 URL Validation:**  
  Ensures SoundCloud links are properly formatted.

- **📆 Date-Based Extraction:**  
  Extracts track links from Radio Ergo’s profile within a specified date range.

- **📡 Automated Download:**  
  Downloads all matching audio files in **MP3** format.

- **💾 Storage Options:**  
  Saves audio locally or to **Google Drive** for persistent access and processing.

---

## 2. 🧠 Transcription Model Evaluation

The notebook tests **three speech-to-text models** to find the most accurate transcription system for Somali-language audio.

### 🧪 Model 1: OpenAI Whisper (Standard)

- **🔍 Description:** A general-purpose transcription model by OpenAI (the notebook loads the `small` checkpoint).
- **✅ Language Support:** Accepts the Somali language code (`so`).
- **❌ Major Issue:** Outputs **Arabic script** instead of the **Somali Latin script**.
- **📉 Result:** Unusable transcription with repeated nonsensical Arabic words.

---

### 🧪 Model 2: Whisper Small Somali (`steja/whisper-small-somali`)

- **🔍 Description:** A specialized model from Hugging Face fine-tuned for Somali.
- **✅ Improvement:** Correctly uses **Somali Latin script**.
- **❌ Major Issue:** Exhibits **hallucinatory repetition**, e.g., *"iyo iyo iyo"*, *"dhul dhul dhul"*.
- **📉 Result:** Partially successful, but unreliable due to extreme repetition.

---

### 🧪 Model 3: Gemini 2.0 Flash (Google)

- **🔍 Description:** A multimodal large language model from Google that accepts audio input and can be prompted to transcribe it.
- **✅ Improvement:** Produces **accurate**, **coherent**, and **well-structured** text.
- **✅ Script:** Correct **Somali Latin script**.
- **✅ Repetition:** Minimal and natural.
- **📈 Result:** **Success.** Best-performing model for Somali transcription.

---

## 3. 📊 Transcription Analysis

The final section of the notebook performs a detailed evaluation of each model’s output using quantitative metrics.

### 🧮 Metrics Analyzed:

- **🔠 Word Frequencies:** Frequency of unique and repeated words.
- **🔁 Repetition Patterns:** Identification of hallucinated or natural repetitions.
- **📏 Line Length Statistics:** Structural consistency of transcription lines.
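
As a rough illustration of the repetition metrics listed above, the sketch below counts adjacent repeated words and repeated two-word segments. The function names `adjacent_repetitions` and `repeated_ngrams` are hypothetical and the sample string is built only from the repeated words reported for Model 2; the notebook's own analysis function may differ in detail.

```python
from collections import Counter

def adjacent_repetitions(text):
    """Count positions where a word is immediately followed by the same word."""
    words = text.lower().split()
    return sum(1 for a, b in zip(words, words[1:]) if a == b)

def repeated_ngrams(text, n=2, min_count=2):
    """Return the n-word segments that occur at least min_count times."""
    words = text.lower().split()
    grams = [" ".join(words[i:i + n]) for i in range(len(words) - n + 1)]
    return {g: c for g, c in Counter(grams).items() if c >= min_count}

# Sample built from the hallucinated repetitions reported for Model 2
sample = "iyo iyo iyo dhul dhul dhul"
print(adjacent_repetitions(sample))  # 4
print(repeated_ngrams(sample))       # {'iyo iyo': 2, 'dhul dhul': 2}
```

A high adjacent-repetition count relative to the total word count is a simple warning sign of hallucinated output, although natural speech also contains some repetition.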

### ✅ Outcome:
- **Model 1 & 2:** Failed to deliver reliable transcriptions.
- **Model 3 (Gemini 2.0 Flash):** Produced coherent Somali Latin-script text with minimal repetition, the best output of the three models.

---

## ✅ Final Recommendation

For any future transcription of Somali-language audio — especially in humanitarian or food security contexts — **Gemini 2.0 Flash** is currently the most effective and reliable solution. It significantly outperforms general and specialized Whisper models in both accuracy and formatting.



#### 📋 INSTRUCTION

## ⚡️ How to Speed Up Speech-to-Text Models:

1. 🖱️ Click **Runtime** in the menu
2. 🔄 Select **Change runtime type**
3. 🖥️ Choose a **GPU-enabled runtime** 🚀

> ℹ️ **Note**: Following these steps will significantly improve processing speed for speech recognition tasks.
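
To confirm that the runtime change took effect, a quick check like the one below may help. It is a minimal sketch that assumes an NVIDIA GPU runtime exposes the `nvidia-smi` command-line tool; the helper name `gpu_runtime_available` is hypothetical and not part of the notebook.

```python
import shutil
import subprocess

def gpu_runtime_available():
    """Return True if the `nvidia-smi` tool exists and exits successfully."""
    if shutil.which("nvidia-smi") is None:
        return False
    try:
        result = subprocess.run(["nvidia-smi"], capture_output=True, timeout=10)
    except (OSError, subprocess.TimeoutExpired):
        return False
    return result.returncode == 0

if gpu_runtime_available():
    print("GPU runtime detected")
else:
    print("No GPU detected; models will run on CPU")
```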

# 1. Data Collection - Audio Download System

The code below automates downloading audio from SoundCloud by validating URLs, extracting track links from a profile page within a specified date range, and downloading the audio files using `yt-dlp`.

It first scrapes a given SoundCloud profile for track links, filters them based on date patterns in the URLs, and downloads matching audio tracks in MP3 format.

Additionally, it includes an option to save downloaded files to Google Drive for storage.
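
The date filter relies on Radio Ergo track slugs that embed the broadcast date, such as `idaacadda-09-mar-2025`. The sketch below shows the idea in isolation. The helper `date_from_slug` is hypothetical, and reducing every month word to its first three letters is a simplification assumed here for brevity.

```python
import re
from datetime import datetime

# Month abbreviations; a full name such as "march" is cut to "mar" before lookup
MONTHS = {name: number for number, name in enumerate(
    ["jan", "feb", "mar", "apr", "may", "jun",
     "jul", "aug", "sep", "oct", "nov", "dec"], start=1)}

def date_from_slug(url):
    """Return the datetime encoded in the last URL segment, or None."""
    slug = url.rstrip("/").split("/")[-1]
    match = re.search(r"(\d{1,2})-([a-z]+)-(\d{4})", slug, re.IGNORECASE)
    if not match:
        return None
    day, month_str, year = match.groups()
    month = MONTHS.get(month_str.lower()[:3])
    if month is None:
        return None
    try:
        return datetime(int(year), month, int(day))
    except ValueError:  # impossible calendar date, e.g. 31-feb-2025
        return None

start, end = datetime(2025, 3, 15), datetime(2025, 3, 16)
track_date = date_from_slug("https://soundcloud.com/radio-ergo/idaacadda-15-mar-2025")
print(track_date is not None and start <= track_date <= end)  # True
```

Links without a date in the slug, such as the profile's `likes` or `tracks` pages, return `None` and are skipped.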


In [1]:
# Install required packages
!pip install yt-dlp requests beautifulsoup4

import os
import re
from datetime import datetime, timedelta
import yt_dlp
import requests
from bs4 import BeautifulSoup
import time

def validate_soundcloud_url(url):
    """Validate if the provided URL is a SoundCloud URL."""
    pattern = r'^https?://(?:www\.)?soundcloud\.com/[\w-]+/[\w-]+'
    return bool(re.match(pattern, url))

def download_soundcloud_audio(url, output_dir="downloads"):
    """Download audio from SoundCloud URL."""
    if not validate_soundcloud_url(url):
        print(f"Invalid SoundCloud URL: {url}")
        return False

    # Create output directory if it doesn't exist
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Configure yt-dlp options
    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': os.path.join(output_dir, '%(title)s.%(ext)s'),
        'noplaylist': True,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'quiet': False,
        'verbose': False
    }

    # Download the audio
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            filename = ydl.prepare_filename(info)
            base, _ = os.path.splitext(filename)
            mp3_file = f"{base}.mp3"
            print(f"Downloaded: {mp3_file}")
            return mp3_file
    except Exception as e:
        print(f"Error downloading {url}: {str(e)}")
        return False

def get_soundcloud_urls_by_date_range(profile_url, start_date, end_date):
    """
    Get SoundCloud URLs within a specific date range.

    Args:
        profile_url: The SoundCloud profile URL (e.g., "https://soundcloud.com/radio-ergo")
        start_date: Start date (datetime object)
        end_date: End date (datetime object)

    Returns:
        A list of SoundCloud URLs within the specified date range
    """
    if not profile_url.endswith('/'):
        profile_url = profile_url + '/'

    # Fetch the profile page
    try:
        response = requests.get(profile_url, headers={'User-Agent': 'Mozilla/5.0'})
        response.raise_for_status()
    except Exception as e:
        print(f"Error fetching profile page: {str(e)}")
        return []

    soup = BeautifulSoup(response.text, 'html.parser')

    # Find all track links
    track_urls = []
    for link in soup.find_all('a', href=True):
        href = link['href']
        if href.startswith('/') and profile_url.split('/')[3] in href and '/sets/' not in href:
            full_url = f"https://soundcloud.com{href}"
            if validate_soundcloud_url(full_url):
                track_urls.append(full_url)

    # Filter URLs by date
    urls_in_range = []

    # Month names mapping (both full and abbreviated)
    month_names = {
        # Full month names
        'january': 1, 'february': 2, 'march': 3, 'april': 4, 'may': 5, 'june': 6,
        'july': 7, 'august': 8, 'september': 9, 'october': 10, 'november': 11, 'december': 12,
        # Abbreviated month names
        'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'jun': 6, 'jul': 7,
        'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12
    }

    for url in track_urls:
        # Try to extract date from URL using regex for both formats
        # Format 1: DD-month-YYYY (e.g., 11-march-2025)
        # Format 2: DD-mon-YYYY (e.g., 09-mar-2025)
        date_match = re.search(r'(\d{1,2})-([a-z]+)-(\d{4})', url.split('/')[-1], re.IGNORECASE)

        if date_match:
            day, month_str, year = date_match.groups()

            # Convert month name to number
            month = month_names.get(month_str.lower())

            if month:
                try:
                    track_date = datetime(int(year), month, int(day))
                    if start_date <= track_date <= end_date:
                        urls_in_range.append(url)
                        print(f"Found matching URL: {url}")
                except ValueError:
                    # Skip invalid dates
                    continue
            else:
                print(f"Could not parse month: {month_str} in URL: {url}")
        else:
            # Print URLs that don't match the pattern for debugging
            print(f"URL does not match date pattern: {url}")

    return urls_in_range

def download_by_date_range(profile_url, start_date_str, end_date_str, output_dir=None, save_to_drive=True):
    """
    Download SoundCloud tracks within a date range.

    Args:
        profile_url: The SoundCloud profile URL
        start_date_str: Start date in format 'YYYY-MM-DD'
        end_date_str: End date in format 'YYYY-MM-DD'
        output_dir: Output directory (default: None, which uses date range as folder name)
        save_to_drive: Whether to save files to Google Drive (default: True)
    """
    # Parse dates
    start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
    end_date = datetime.strptime(end_date_str, '%Y-%m-%d')

    # Create output directory name based on date range if not provided
    if not output_dir:
        output_dir = f"soundcloud_{start_date_str}_to_{end_date_str}"

    print(f"Searching for tracks from {start_date_str} to {end_date_str}...")

    # Option 1: Try to find using the profile page
    urls = get_soundcloud_urls_by_date_range(profile_url, start_date, end_date)

    # Option 2: If specific URLs are known to exist but weren't found, add them manually
    manual_urls = []

    # Check if a specific URL was requested but not found
    check_url = "https://soundcloud.com/radio-ergo/idaacadda-09-mar-2025"
    if check_url not in urls:
        current_date = start_date
        while current_date <= end_date:
            day = current_date.day
            month = current_date.strftime('%b').lower()  # Get abbreviated month
            year = current_date.year

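            # Try zero-padded and unpadded day formats; the set drops the
            # duplicate that arises when the day already has two digits
            potential_urls = sorted({
                f"https://soundcloud.com/radio-ergo/idaacadda-{day:02d}-{month}-{year}",
                f"https://soundcloud.com/radio-ergo/idaacadda-{day}-{month}-{year}"
            })

            for url in potential_urls:
                try:
                    response = requests.head(url, timeout=10)
                    if response.status_code == 200:
                        manual_urls.append(url)
                        print(f"Manually found URL: {url}")
                except requests.RequestException:
                    # Network error or timeout: treat the URL as not found
                    pass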

            current_date += timedelta(days=1)

    # Combine both lists and remove duplicates
    all_urls = list(set(urls + manual_urls))

    if not all_urls:
        print("No tracks found in the specified date range.")
        return []

    print(f"Found {len(all_urls)} tracks in the specified date range.")
    print("URLs found:")
    for url in all_urls:
        print(f"- {url}")

    print(f"\nStarting download of {len(all_urls)} SoundCloud audios to: {output_dir}")
    downloaded_files = []
    for url in all_urls:
        result = download_soundcloud_audio(url, output_dir)
        if result:
            downloaded_files.append(result)
        # Add a small delay to avoid overloading the server
        time.sleep(1)

    print("\nSummary:")
    print(f"Total URLs processed: {len(all_urls)}")
    print(f"Successfully downloaded: {len(downloaded_files)}")
    if downloaded_files:
        print("\nDownloaded files:")
        for file in downloaded_files:
            print(f"- {file}")

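    # Save to Google Drive
    if save_to_drive and downloaded_files:
        # Mount Drive first: writing under /content/drive before mounting creates
        # local folders there, and a later mount then fails with
        # "Mountpoint must not already contain files"
        try:
            from google.colab import drive
            drive.mount('/content/drive')
        except Exception as e:
            print(f"Could not mount Google Drive ({e}); files remain in {output_dir}")
            return downloaded_files

        drive_path = "/content/drive/MyDrive/" + output_dir
        print(f"\nSaving files to Google Drive at: {drive_path}")

        # Create directory in Drive if it doesn't exist
        os.makedirs(drive_path, exist_ok=True)

        # Copy files to Drive (paths quoted in case they contain spaces)
        os.system(f'cp -r "{output_dir}"/* "{drive_path}"/')
        print("Files successfully saved to Google Drive")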

    return downloaded_files

# You can also directly specify URLs if the automatic search doesn't work
def download_specific_urls(urls, output_dir=None, save_to_drive=True):
    """
    Download specific SoundCloud URLs.

    Args:
        urls: List of SoundCloud URLs to download
        output_dir: Output directory (default: None, which uses today's date)
        save_to_drive: Whether to save files to Google Drive (default: True)
    """
    if not output_dir:
        output_dir = f"soundcloud_radio_ergo__downloads_{datetime.now().strftime('%Y-%m-%d')}"

    print(f"Starting download of {len(urls)} specific SoundCloud URLs to: {output_dir}")
    downloaded_files = []
    for url in urls:
        result = download_soundcloud_audio(url, output_dir)
        if result:
            downloaded_files.append(result)
        # Add a small delay to avoid overloading the server
        time.sleep(1)

    print("\nSummary:")
    print(f"Total URLs processed: {len(urls)}")
    print(f"Successfully downloaded: {len(downloaded_files)}")
    if downloaded_files:
        print("\nDownloaded files:")
        for file in downloaded_files:
            print(f"- {file}")

    # Save to Google Drive
    if save_to_drive and downloaded_files:
        try:
            from google.colab import drive
            drive.mount('/content/drive')
            print("Google Drive mounted successfully")
        except:
            print("Not running in Google Colab or Drive already mounted")
        drive_path = "/content/drive/MyDrive/" + output_dir
        print(f"\nSaving files to Google Drive at: {drive_path}")

        # Create directory in Drive if it doesn't exist
        if not os.path.exists(drive_path):
            os.makedirs(drive_path)

        # Copy files to Drive
        os.system(f"cp -r {output_dir}/* {drive_path}/")
        print(f"Files successfully saved to Google Drive")

    return downloaded_files

Collecting yt-dlp
  Downloading yt_dlp-2025.6.30-py3-none-any.whl.metadata (174 kB)
Downloading yt_dlp-2025.6.30-py3-none-any.whl (3.3 MB)
Installing collected packages: yt-dlp
Successfully installed yt-dlp-2025.6.30


#### 📋 INSTRUCTION
**Run the cell below to download Radio Ergo broadcasts from SoundCloud within a specific date range and optionally save them to Google Drive.**

In [2]:
# Example usage
profile_url = "https://soundcloud.com/radio-ergo"  # SoundCloud profile/channel URL
start_date = "2025-03-15"  # Format: YYYY-MM-DD
end_date = "2025-03-16"    # Format: YYYY-MM-DD
save_to_drive = True       # Set to False if you don't want to save to Google Drive

downloaded_files = download_by_date_range(profile_url, start_date, end_date, save_to_drive=save_to_drive)

Searching for tracks from 2025-03-15 to 2025-03-16...
URL does not match date pattern: https://soundcloud.com/radio-ergo/likes
URL does not match date pattern: https://soundcloud.com/radio-ergo/sets
URL does not match date pattern: https://soundcloud.com/radio-ergo/tracks
URL does not match date pattern: https://soundcloud.com/radio-ergo/comments
Manually found URL: https://soundcloud.com/radio-ergo/idaacadda-15-mar-2025
Manually found URL: https://soundcloud.com/radio-ergo/idaacadda-15-mar-2025
Manually found URL: https://soundcloud.com/radio-ergo/idaacadda-16-mar-2025
Manually found URL: https://soundcloud.com/radio-ergo/idaacadda-16-mar-2025
Found 2 tracks in the specified date range.
URLs found:
- https://soundcloud.com/radio-ergo/idaacadda-16-mar-2025
- https://soundcloud.com/radio-ergo/idaacadda-15-mar-2025

Starting download of 2 SoundCloud audios to: soundcloud_2025-03-15_to_2025-03-16
[soundcloud] Extracting URL: https://soundcloud.com/radio-ergo/idaacadda-16-mar-2025
[soundcl

# 2. Transcription with Whisper Models

## Regular OpenAI Whisper Model

In [3]:
def transcribe_audio_files(input_dir, output_dir=None, language="so", save_to_drive=False,
                        audio_formats=None):
    """
    Transcribe audio files using OpenAI's Whisper model.

    Args:
        input_dir: Directory containing audio files
        output_dir: Directory to save transcripts (default: "whisper_transcripts_" + input_dir)
        language: Language code for transcription (default: "so" for Somali)
        save_to_drive: Whether to save transcripts to Google Drive (default: False)
        audio_formats: List of audio formats to process (default: ["wav", "mp3", "m4a", "flac", "ogg"])

    Returns:
        List of transcript file paths
    """
    # Mount Google Drive at the beginning if save_to_drive is True
    if save_to_drive:
        try:
            from google.colab import drive
            drive.mount('/content/drive')
            print("Google Drive mounted successfully")
        except ImportError:
            print("Not running in Google Colab or Drive module not available")
        except Exception as e:
            print(f"Error mounting Google Drive: {str(e)}")
            print("Continuing without Google Drive. Files will only be saved locally.")
            save_to_drive = False  # Disable save_to_drive if mounting fails

    # Install whisper if not already installed
    try:
        import whisper
    except ImportError:
        print("Installing OpenAI Whisper...")
        !pip install -q openai-whisper
        import whisper

    # Install ffmpeg if needed (required for handling different audio formats)
    try:
        import subprocess
        result = subprocess.run(['ffmpeg', '-version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if result.returncode != 0:
            raise ImportError
        print("ffmpeg is already installed.")
    except (ImportError, FileNotFoundError):
        print("Installing ffmpeg (required for audio format conversion)...")
        !apt-get update -qq && apt-get install -qq ffmpeg
        print("ffmpeg installed successfully.")

    import os
    import glob

    # Define default audio formats if none provided
    if audio_formats is None:
        audio_formats = ["wav", "mp3", "m4a", "flac", "ogg"]

    # Ensure formats have the dot prefix for glob patterns
    formats_pattern = [f"*.{fmt}" for fmt in audio_formats]

    # Set output directory
    if not output_dir:
        output_dir = "whisper_transcripts_" + input_dir

    # Create output directory if it doesn't exist
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    print(f"Loading Whisper model...")
    # Load the Whisper model (options: tiny, base, small, medium, large)
    model = whisper.load_model("small")

    # Get all audio files with specified formats in the input directory
    audio_files = []
    for pattern in formats_pattern:
        audio_files.extend(glob.glob(os.path.join(input_dir, pattern)))

    if not audio_files:
        print(f"No audio files with formats {audio_formats} found in {input_dir}")
        return []

    print(f"Found {len(audio_files)} audio files to transcribe.")
    print(f"Formats to process: {', '.join(audio_formats)}")

    transcript_files = []

    for audio_file in audio_files:
        filename = os.path.basename(audio_file)
        base_name = os.path.splitext(filename)[0]
        file_format = os.path.splitext(filename)[1][1:].lower()  # Get format without dot
        transcript_file = os.path.join(output_dir, f"{base_name}.txt")

        print(f"\nTranscribing: {filename} (Format: {file_format})")

        try:
            # OpenAI's Whisper can handle various audio formats through ffmpeg
            # Transcribe audio with specified language
            result = model.transcribe(audio_file, language=language, fp16=False, temperature=0.2)

            # Save transcript
            with open(transcript_file, "w", encoding="utf-8") as f:
                f.write(result["text"])

            print(f"Transcript saved to: {transcript_file}")
            transcript_files.append(transcript_file)

        except Exception as e:
            print(f"Error transcribing {filename}: {str(e)}")

            # If there's an error, try converting to WAV and retry
            try:
                import subprocess
                print(f"Attempting to convert {file_format} to WAV format and retry...")

                # Create a temporary directory for conversion if it doesn't exist
                temp_dir = os.path.join(input_dir, "_temp_conversion")
                if not os.path.exists(temp_dir):
                    os.makedirs(temp_dir)

                temp_wav = os.path.join(temp_dir, f"{base_name}.wav")

                # Convert using ffmpeg
                cmd = [
                    'ffmpeg', '-y', '-i', audio_file,
                    '-ar', '16000', '-ac', '1', '-c:a', 'pcm_s16le',
                    temp_wav
                ]

                process = subprocess.run(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE
                )

                if process.returncode == 0:
                    print(f"Successfully converted to WAV: {temp_wav}")

                    # Try transcription again with the WAV file
                    result = model.transcribe(temp_wav, language=language, fp16=False, temperature=0.2)

                    # Save transcript
                    with open(transcript_file, "w", encoding="utf-8") as f:
                        f.write(result["text"])

                    print(f"Transcript saved to: {transcript_file} (after conversion)")
                    transcript_files.append(transcript_file)
                else:
                    print(f"Conversion failed: {process.stderr.decode()}")

            except Exception as retry_err:
                print(f"Retry after conversion failed: {str(retry_err)}")

    # Clean up temporary conversion directory if it exists
    temp_dir = os.path.join(input_dir, "_temp_conversion")
    if os.path.exists(temp_dir):
        import shutil
        try:
            shutil.rmtree(temp_dir)
            print(f"Removed temporary conversion directory: {temp_dir}")
        except Exception as e:
            print(f"Failed to remove temporary directory: {str(e)}")

    # Save to Google Drive if requested
    if save_to_drive and transcript_files:
        # Use the same directory name for Google Drive saving
        drive_dir_name = os.path.basename(output_dir)
        drive_path = "/content/drive/MyDrive/" + drive_dir_name
        print(f"\nSaving transcripts to Google Drive at: {drive_path}")

        # Create directory in Drive if it doesn't exist
        if not os.path.exists(drive_path):
            os.makedirs(drive_path)

        # Copy files to Drive
        os.system(f"cp -r {output_dir}/* {drive_path}/")
        print(f"Transcripts successfully saved to Google Drive")

    print(f"\nTranscription Summary:")
    print(f"Total files processed: {len(audio_files)}")
    print(f"Transcripts created: {len(transcript_files)}")

    return transcript_files

#### 📋 INSTRUCTION
**Run the cell below to use Whisper to transcribe all Somali audio files in the specified directory and optionally save the transcripts to Google Drive.**

In [4]:
# Example usage
input_directory = "soundcloud_2025-03-15_to_2025-03-16"

# Transcribe all audio files in the directory
transcripts = transcribe_audio_files(
    input_dir=input_directory,
    language="so",  # Somali language code
    save_to_drive=True  # Set to False to skip saving to Google Drive
)

Error mounting Google Drive: Mountpoint must not already contain files
Continuing without Google Drive. Files will only be saved locally.
Installing OpenAI Whisper...
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Preparing metadata (pyproject.toml) ... done

100%|███████████████████████████████████████| 461M/461M [00:05<00:00, 84.1MiB/s]


Found 2 audio files to transcribe.
Formats to process: wav, mp3, m4a, flac, ogg

Transcribing: IDAACADDA 16-MAR-2025.mp3 (Format: mp3)
Transcript saved to: whisper_transcripts_soundcloud_2025-03-15_to_2025-03-16/IDAACADDA 16-MAR-2025.txt

Transcribing: IDAACADDA 15-MAR-2025.mp3 (Format: mp3)
Transcript saved to: whisper_transcripts_soundcloud_2025-03-15_to_2025-03-16/IDAACADDA 15-MAR-2025.txt

Transcription Summary:
Total files processed: 2
Transcripts created: 2


### Performance Overview  

In [5]:
import os
import re
from collections import Counter
import textwrap
from nltk.tokenize import sent_tokenize, word_tokenize
import nltk

# Download required NLTK data
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt')

def analyze_somali_transcript(file_path, max_lines=None, sample_size=10):
    """
    Analyze a Somali transcript file with various metrics and display results without any visualizations.

    Parameters:
    file_path (str): Path to the transcript file
    max_lines (int, optional): Maximum number of lines to process (None for all)
    sample_size (int, optional): Number of sample lines to display

    Returns:
    dict: Analysis results
    """
    print(f"Analyzing file: {os.path.basename(file_path)}")

    # Read the file
    try:
        with open(file_path, "r", encoding="utf-8") as file:
            if max_lines:
                lines = [file.readline().strip() for _ in range(max_lines)]
                lines = [line for line in lines if line]  # Remove empty lines
            else:
                lines = [line.strip() for line in file.readlines()]
                lines = [line for line in lines if line]  # Remove empty lines
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        return None
    except Exception as e:
        print(f"Error reading file: {e}")
        return None

    # Basic statistics
    num_lines = len(lines)
    total_words = sum(len(line.split()) for line in lines)
    avg_words_per_line = total_words / num_lines if num_lines > 0 else 0
    avg_word_length = sum(len(word) for line in lines for word in line.split()) / total_words if total_words > 0 else 0

    # Word frequency analysis
    all_words = ' '.join(lines).lower().split()
    word_freq = Counter(all_words)
    most_common_words = word_freq.most_common(20)

    # Check for repetition patterns
    repetition_count = 0
    for line in lines:
        words = line.split()
        for i in range(len(words) - 1):
            if words[i] == words[i+1]:
                repetition_count += 1

    # Detect repeating segments (like "daadii ergo" in the example)
    text = ' '.join(lines)
    repeated_segments = []
    for segment_len in range(2, 5):  # Check for segments of 2 to 4 words
        words = text.split()
        segments = [' '.join(words[i:i+segment_len]) for i in range(len(words)-segment_len+1)]
        segment_counts = Counter(segments)
        repeated = [(segment, count) for segment, count in segment_counts.items() if count > 3]
        repeated_segments.extend(repeated)

    # Display sample lines with formatting
    print("\n==== Sample Lines ====")
    sample_indices = list(range(min(sample_size, num_lines)))
    for i in sample_indices:
        wrapped_text = textwrap.fill(lines[i], width=80)
        print(f"Line {i+1}: {wrapped_text}")

    # Display statistics
    print("\n==== Basic Statistics ====")
    print(f"Total lines: {num_lines}")
    print(f"Total words: {total_words}")
    print(f"Average words per line: {avg_words_per_line:.2f}")
    print(f"Average word length: {avg_word_length:.2f}")
    print(f"Adjacent word repetitions: {repetition_count}")

    # Display most common words
    print("\n==== Most Common Words ====")
    for word, count in most_common_words[:10]:
        print(f"{word}: {count}")

    # Display repeated segments
    if repeated_segments:
        print("\n==== Commonly Repeated Segments ====")
        sorted_segments = sorted(repeated_segments, key=lambda x: x[1], reverse=True)
        for segment, count in sorted_segments[:10]:
            print(f"'{segment}' appears {count} times")

    # Detect potential sentence boundaries (basic approach for Somali)
    try:
        # This is a very basic approach - better sentence detection would require Somali-specific tools
        sentences = []
        current_sentence = ""

        for line in lines:
            # Split by common Somali sentence-ending punctuation
            parts = re.split(r'[.!?]', line)
            for i, part in enumerate(parts):
                if part.strip():
                    current_sentence += " " + part.strip()
                    if i < len(parts) - 1 or any(line.endswith(p) for p in ['.', '!', '?']):
                        sentences.append(current_sentence.strip())
                        current_sentence = ""

        if current_sentence:
            sentences.append(current_sentence.strip())

        print(f"\nApproximate number of sentences: {len(sentences)}")
        print("\n==== Sample Sentences ====")
        for i, sentence in enumerate(sentences[:5]):
            wrapped_text = textwrap.fill(sentence, width=80)
            print(f"Sentence {i+1}: {wrapped_text}")
    except Exception as e:
        print(f"Error detecting sentences: {e}")

    # Analyze line lengths
    line_lengths = [len(line.split()) for line in lines]
    min_length = min(line_lengths) if line_lengths else 0
    max_length = max(line_lengths) if line_lengths else 0

    print("\n==== Line Length Analysis ====")
    print(f"Shortest line: {min_length} words")
    print(f"Longest line: {max_length} words")

    # Return analysis results
    results = {
        'num_lines': num_lines,
        'total_words': total_words,
        'avg_words_per_line': avg_words_per_line,
        'avg_word_length': avg_word_length,
        'most_common_words': most_common_words,
        'repeated_segments': sorted_segments[:10] if repeated_segments else [],
        'line_length_range': (min_length, max_length)
    }

    return results

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


#### 📋 INSTRUCTION
Run the cell below to analyze the transcript generated by Whisper and print its statistics.

In [6]:
# Example usage
file_path = "whisper_transcripts_soundcloud_2025-03-15_to_2025-03-16/IDAACADDA 15-MAR-2025.txt"
# file_path = "whisper_transcripts_soundcloud_2025-03-15_to_2025-03-16/IDAACADDA 16-MAR-2025.txt"

results = analyze_somali_transcript(file_path)

Analyzing file: IDAACADDA 15-MAR-2025.txt

==== Sample Lines ====
Line 1: موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع
موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع
موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع
موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع
موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع
موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع
موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع
موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع
موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع
موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع
موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع
موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع موضوع مو

### Problem ❌
OpenAI's Whisper model detects Somali (`so`) but **transcribes it in Arabic script** instead of **Somali Latin script**, which makes the output unusable for Somali text analysis.  

## Observations  
- Produces Arabic text instead of Latin script.  
- Users report **repeated phrases** and **incorrect words**.  
- Confirmed in multiple GitHub discussions:  
  - [GitHub Discussion #234](https://github.com/openai/whisper/discussions/234)  
  - [GitHub Discussion #2110](https://github.com/openai/whisper/discussions/2110)  

## Possible Causes  
- Whisper may **lack sufficient training on Somali Latin script**.  
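
The script problem described above can be checked automatically before any further analysis. The following is a minimal sketch, not part of the original pipeline: the helper name `script_shares` is hypothetical, and it relies on the fact that Unicode character names begin with the script name (for example `ARABIC LETTER MEEM`).

```python
import unicodedata
from collections import Counter


def script_shares(text: str) -> dict:
    """Return the share of alphabetic characters per script (e.g. ARABIC, LATIN)."""
    counts = Counter()
    for ch in text:
        if ch.isalpha():
            # Unicode names start with the script, e.g. "ARABIC LETTER MEEM"
            name = unicodedata.name(ch, "UNKNOWN")
            counts[name.split(" ")[0]] += 1
    total = sum(counts.values())
    return {script: n / total for script, n in counts.items()} if total else {}


print(script_shares("موضوع موضوع"))  # {'ARABIC': 1.0}
```

A transcript whose `ARABIC` share is high can be flagged as unusable without reading it line by line. Any cut-off value for "high" would be a project-specific assumption.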

## Whisper Small Somali (Specialized Model)

### Why?  
OpenAI’s Whisper model **transcribes Somali in Arabic script** instead of **Latin script**, making it unsuitable for Somali. Additionally, it produces **inaccurate and repetitive transcriptions**.

### Solution
We are switching to a **Whisper model fine-tuned specifically for Somali**, hosted on Hugging Face:  
➡️ **Model:** `steja/whisper-small-somali`  
➡️ **Benefit:** Intended to produce Somali transcription in **Latin script**  

### Reference  
📄 [Hugging Face Model Docs](https://huggingface.co/steja/whisper-small-somali)


In [7]:
def transcribe_audio_files(input_dir, output_dir=None, language="so", save_to_drive=False,
                        audio_formats=None):
    """
    Transcribe audio files using a Whisper model fine-tuned for Somali (hosted on Hugging Face).

    Args:
        input_dir: Directory containing the audio files
        output_dir: Directory to save transcripts
                    (default: "whisper_small_somali_transcripts_" followed by the input directory name)
        language: Language code for transcription (default: "so" for Somali).
                  Note: this value is currently not forwarded to the pipeline.
        save_to_drive: Whether to save transcripts to Google Drive (default: False)
        audio_formats: List of audio formats to process (default: ["wav", "mp3", "m4a", "flac", "ogg"])

    Returns:
        List of transcript file paths
    """
    # Install required packages if not already installed
    try:
        import transformers
    except ImportError:
        print("Installing transformers...")
        !pip install -q transformers
        import transformers

    try:
        import torch
    except ImportError:
        print("Installing PyTorch...")
        !pip install -q torch
        import torch

    try:
        import torchaudio
    except ImportError:
        print("Installing torchaudio...")
        !pip install -q torchaudio
        import torchaudio

    try:
        import soundfile
    except ImportError:
        print("Installing soundfile for additional audio format support...")
        !pip install -q soundfile
        import soundfile

    import os
    import glob
    from transformers import pipeline, AutoModelForSpeechSeq2Seq, AutoProcessor

    # Define default audio formats if none provided
    if audio_formats is None:
        audio_formats = ["wav", "mp3", "m4a", "flac", "ogg"]

    # Ensure formats have the dot prefix for glob patterns
    formats_pattern = [f"*.{fmt}" for fmt in audio_formats]

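    # Set output directory (use only the last path component so nested
    # input paths do not produce an invalid directory name)
    if not output_dir:
        output_dir = "whisper_small_somali_transcripts_" + os.path.basename(input_dir.rstrip('/'))

    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)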

    print(f"Loading Hugging Face Whisper model (steja/whisper-small-somali)...")
    # Load the Whisper model from Hugging Face
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using device: {device}")

    # Load model and processor
    model_id = "steja/whisper-small-somali"
    model = AutoModelForSpeechSeq2Seq.from_pretrained(model_id)
    processor = AutoProcessor.from_pretrained(model_id)

    # Move model to appropriate device
    model.to(device)

    # Create the pipeline for automatic speech recognition
    transcriber = pipeline(
        "automatic-speech-recognition",
        model=model,
        tokenizer=processor.tokenizer,
        feature_extractor=processor.feature_extractor,
        max_new_tokens=128,
        chunk_length_s=30,
        batch_size=16,
        device=device
    )

    # Get all audio files with specified formats in the input directory
    audio_files = []
    for pattern in formats_pattern:
        audio_files.extend(glob.glob(os.path.join(input_dir, pattern)))

    if not audio_files:
        print(f"No audio files with formats {audio_formats} found in {input_dir}")
        return []

    print(f"Found {len(audio_files)} audio files to transcribe.")
    print(f"Formats to process: {', '.join(audio_formats)}")

    transcript_files = []

    for audio_file in audio_files:
        filename = os.path.basename(audio_file)
        base_name = os.path.splitext(filename)[0]
        transcript_file = os.path.join(output_dir, f"{base_name}.txt")
        file_format = os.path.splitext(filename)[1][1:].lower()  # Get format without dot

        print(f"\nTranscribing: {filename} (Format: {file_format})")

        try:
            # Transcribe audio without requesting timestamps
            result = transcriber(audio_file)

            # Extract the transcript text
            if isinstance(result, dict) and "text" in result:
                transcript_text = result["text"]
            else:
                # Handle different return formats
                transcript_text = result

            # Save full transcript
            with open(transcript_file, "w", encoding="utf-8") as f:
                f.write(transcript_text)

            print(f"Transcript saved to: {transcript_file}")
            transcript_files.append(transcript_file)

        except Exception as e:
            print(f"Error transcribing {filename}: {str(e)}")

            # If there's an error with non-WAV formats, try to convert to WAV first
            if file_format != "wav":
                try:
                    print(f"Attempting to convert {file_format} to WAV format and retry...")

                    # Create a temporary directory for conversion if it doesn't exist
                    temp_dir = os.path.join(input_dir, "_temp_conversion")
                    if not os.path.exists(temp_dir):
                        os.makedirs(temp_dir)

                    temp_wav = os.path.join(temp_dir, f"{base_name}.wav")

                    # Load and resave as WAV using torchaudio
                    try:
                        waveform, sample_rate = torchaudio.load(audio_file)
                        torchaudio.save(temp_wav, waveform, sample_rate)
                        print(f"Successfully converted to WAV: {temp_wav}")

                        # Try transcription again with the WAV file
                        result = transcriber(temp_wav)

                        if isinstance(result, dict) and "text" in result:
                            transcript_text = result["text"]
                        else:
                            transcript_text = result

                        with open(transcript_file, "w", encoding="utf-8") as f:
                            f.write(transcript_text)

                        print(f"Transcript saved to: {transcript_file} (after conversion)")
                        transcript_files.append(transcript_file)

                    except Exception as conv_err:
                        print(f"Conversion failed: {str(conv_err)}")

                except Exception as retry_err:
                    print(f"Retry failed: {str(retry_err)}")

    # Clean up temporary conversion directory if it exists
    temp_dir = os.path.join(input_dir, "_temp_conversion")
    if os.path.exists(temp_dir):
        import shutil
        try:
            shutil.rmtree(temp_dir)
            print(f"Removed temporary conversion directory: {temp_dir}")
        except Exception as e:
            print(f"Failed to remove temporary directory: {str(e)}")

    # Save to Google Drive if requested
    if save_to_drive and transcript_files:
        try:
            from google.colab import drive
            drive.mount('/content/drive')
            print("Google Drive mounted successfully")
        except Exception as e:
            # ImportError outside Colab; other errors if mounting fails
            print(f"Could not mount Google Drive: {e}")

        # Only copy if the Drive folder is actually available
        if os.path.isdir("/content/drive/MyDrive"):
            drive_path = os.path.join("/content/drive/MyDrive", os.path.basename(output_dir.rstrip('/')))
            print(f"\nSaving transcripts to Google Drive at: {drive_path}")
            os.makedirs(drive_path, exist_ok=True)

            # Copy each transcript; shutil copes with spaces in file names
            import shutil
            for path in transcript_files:
                shutil.copy(path, drive_path)
            print("Transcripts successfully saved to Google Drive")
        else:
            print("Google Drive is not available; transcripts were saved locally only.")

    print(f"\nTranscription Summary:")
    print(f"Total files processed: {len(audio_files)}")
    print(f"Transcripts created: {len(transcript_files)}")

    return transcript_files

#### 📋 INSTRUCTION   
**Run the cell below to use Whisper Small Somali to transcribe all Somali audio files in the specified directory and optionally save the transcripts to Google Drive.**

In [8]:
# Example usage
input_directory = "soundcloud_2025-03-15_to_2025-03-16"

# Transcribe all audio files in the directory
transcripts = transcribe_audio_files(
    input_dir=input_directory,
    language="so",  # Somali language code
    save_to_drive=True  # Set to False to keep transcripts local only
)

## Evaluation of Whisper Small Somali (Specialized Model)

### Transcription

#### 📋 INSTRUCTION
Run the cell below to inspect the transcript generated by Whisper Small Somali.

In [9]:
# Example usage
file_path = "/content/whisper_small_somali_transcripts_soundcloud_2025-03-15_to_2025-03-16/IDAACADDA 15-MAR-2025.txt"
# file_path = "/content/whisper_small_somali_transcripts_soundcloud_2025-03-15_to_2025-03-16/IDAACADDA 16-MAR-2025.txt"

results = analyze_somali_transcript(file_path)

Analyzing file: IDAACADDA 15-MAR-2025.txt
Error: File not found at /content/whisper_small_somali_transcripts_soundcloud_2025-03-15_to_2025-03-16/IDAACADDA 15-MAR-2025.txt


## Gemini 2.0 Flash – Somali Transcription Update

### Why?  
On the FLEURS benchmark for Somali, the top-performing model, Scribe v1, reaches a 45.9% Word Error Rate (WER). Gemini 2.0 Flash ranks second with a 54.4% WER.  
🔗 [See Benchmark Results](https://elevenlabs.io/speech-to-text/somali)
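
For readers unfamiliar with the metric: WER is the number of word substitutions, deletions, and insertions needed to turn the model output into the reference transcript, divided by the number of reference words. The sketch below implements that standard definition with a word-level edit distance. It is illustrative only: the function name `word_error_rate` is hypothetical, and the text normalization used by the linked benchmark is not described here, so its numbers will not be reproduced exactly by this code.

```python
def word_error_rate(reference: str, hypothesis: str) -> float:
    """WER = (substitutions + deletions + insertions) / number of reference words."""
    ref = reference.split()
    hyp = hypothesis.split()
    if not ref:
        raise ValueError("reference must contain at least one word")

    # prev[j] holds the edit distance between the reference prefix
    # processed so far and the first j hypothesis words.
    prev = list(range(len(hyp) + 1))
    for i, ref_word in enumerate(ref, start=1):
        curr = [i]
        for j, hyp_word in enumerate(hyp, start=1):
            cost = 0 if ref_word == hyp_word else 1
            curr.append(min(
                prev[j] + 1,         # deletion
                curr[j - 1] + 1,     # insertion
                prev[j - 1] + cost,  # substitution or match
            ))
        prev = curr
    return prev[-1] / len(ref)


# One substituted word out of three reference words
print(word_error_rate("waa radiyo ergo", "waa radio ergo"))
```

Because insertions count as errors, WER can exceed 100% when the output is much longer than the reference, which is what happens with looping, repetitive transcriptions.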

### Solution
We now use Google's Gemini 2.0 Flash, a general-purpose multimodal model that accepts audio input, and prompt it to transcribe Somali:  
➡️ **Model:** `gemini-2.0-flash`  
➡️ **Benefit:** Second-best Somali WER on the benchmark above, with output in Latin script.

### Reference  
📄 [Gemini 2.0 Flash documentation](https://cloud.google.com/vertex-ai/generative-ai/docs/models/gemini/2-0-flash)

In [10]:
import os
import glob
from typing import Optional, List, Dict
from google import genai
from google.genai import types
from datetime import datetime
import json
import time
import subprocess # For ffmpeg
import shutil     # For directory cleanup

def transcribe_audio_files(
    input_dir: str,
    api_key: str, # API key is now passed as an argument
    output_dir: Optional[str] = None,
    language: str = "so", # Language hint for Gemini, not a strict filter
    save_to_drive: bool = False,
    audio_formats: Optional[List[str]] = None,
    model: str = "gemini-2.0-flash",
    retry_count: int = 3,
    delay_between_failures: int = 10
) -> List[str]:
    """
    Transcribe audio files using Google Gemini 2.0 Flash.

    Args:
        input_dir (str): Directory containing audio files.
        api_key (str): Your Google Gemini API key.
        output_dir (str, optional): Directory to save transcripts.
                                    Defaults to "gemini_transcripts_" + the input directory name.
        language (str, optional): Language code for transcription (e.g., "so" for Somali).
                                  This acts as a hint for the model. Defaults to "so".
        save_to_drive (bool, optional): Whether to save transcripts to Google Drive.
                                        Defaults to False.
        audio_formats (list, optional): List of audio formats to process.
                                        Defaults to ["wav", "mp3", "m4a", "flac", "ogg"].
        model (str, optional): The Gemini model to use for transcription.
                               Defaults to "gemini-2.0-flash".
        retry_count (int, optional): Number of times to retry failed transcriptions.
                                     Defaults to 3.
        delay_between_failures (int, optional): Seconds to wait between retry attempts.
                                                Defaults to 10.

    Returns:
        List[str]: List of transcript file paths created.
    """
    # An empty key is an invalid argument, so raise ValueError
    if not api_key:
        raise ValueError("API key cannot be empty. Please provide your Gemini API key.")

    # Mount Google Drive at the beginning if save_to_drive is True
    if save_to_drive:
        try:
            from google.colab import drive
            drive.mount('/content/drive')
            print("Google Drive mounted successfully")
        except ImportError:
            print("Not running in Google Colab or Drive module not available")
            save_to_drive = False # Disable if not in Colab
        except Exception as e:
            print(f"Error mounting Google Drive: {str(e)}")
            print("Continuing without Google Drive. Files will only be saved locally.")
            save_to_drive = False  # Disable save_to_drive if mounting fails

    # Check and install ffmpeg if needed (required for handling different audio formats)
    try:
        subprocess.run(['ffmpeg', '-version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
        print("ffmpeg is already installed.")
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("Installing ffmpeg (required for audio format conversion)...")
        # For Colab/Debian-based systems
        subprocess.run(['apt-get', 'update', '-qq'], check=True)
        subprocess.run(['apt-get', 'install', '-qq', 'ffmpeg'], check=True)
        print("ffmpeg installed successfully.")

    # Initialize Gemini client
    client = genai.Client(api_key=api_key)

    # Define default audio formats if none provided
    if audio_formats is None:
        audio_formats = ["wav", "mp3", "m4a", "flac", "ogg"]

    # Ensure formats have the dot prefix for glob patterns
    formats_pattern = [f"*.{fmt}" for fmt in audio_formats]

    # Set output directory
    if not output_dir:
        output_dir = "gemini_transcripts_" + os.path.basename(input_dir.rstrip('/'))

    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    print(f"Using Gemini model: {model}")

    # Get all audio files with specified formats in the input directory
    audio_files = []
    for pattern in formats_pattern:
        audio_files.extend(glob.glob(os.path.join(input_dir, pattern)))

    if not audio_files:
        print(f"No audio files with formats {audio_formats} found in {input_dir}")
        return []

    print(f"Found {len(audio_files)} audio files to transcribe.")
    print(f"Formats to process: {', '.join(audio_formats)}")

    transcript_files = []
    temp_conversion_dir = os.path.join(input_dir, "_temp_gemini_conversion")
    os.makedirs(temp_conversion_dir, exist_ok=True) # Create temp dir once

    for audio_file in audio_files:
        filename = os.path.basename(audio_file)
        base_name = os.path.splitext(filename)[0]
        file_extension = os.path.splitext(filename)[1][1:].lower() # Get format without dot
        transcript_file = os.path.join(output_dir, f"{base_name}.txt")

        # Skip if already transcribed
        if os.path.exists(transcript_file):
            print(f"Skipping {filename} - already transcribed")
            transcript_files.append(transcript_file) # Add to list even if skipped
            continue

        print(f"\nTranscribing: {filename} (Format: {file_extension})")

        current_audio_path = audio_file
        needs_conversion = False

        if file_extension != "mp3":
            needs_conversion = True
            temp_mp3_path = os.path.join(temp_conversion_dir, f"{base_name}.mp3")
            print(f"Converting {file_extension} to MP3: {audio_file} -> {temp_mp3_path}")
            try:
                cmd = [
                    'ffmpeg', '-y', '-i', audio_file,
                    '-vn', # no video
                    '-acodec', 'libmp3lame', # encode to mp3
                    '-ar', '16000', # 16 kHz sample rate (common for speech)
                    '-ac', '1', # mono audio
                    '-b:a', '32k', # bitrate, adjust as needed for quality vs size
                    temp_mp3_path
                ]
                process = subprocess.run(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    check=True # Raise CalledProcessError if command fails
                )
                current_audio_path = temp_mp3_path
                print(f"Successfully converted to MP3.")
            except subprocess.CalledProcessError as e:
                print(f"FFmpeg conversion failed for {filename}: {e.stderr.decode()}")
                print(f"Skipping {filename} due to conversion error.")
                # Log failure
                with open(os.path.join(output_dir, "failed_transcriptions.json"), "a") as f:
                    failure_info = {
                        "filename": filename,
                        "path": audio_file,
                        "timestamp": datetime.now().isoformat(),
                        "error": f"FFmpeg conversion failed: {e.stderr.decode()}"
                    }
                    f.write(json.dumps(failure_info) + "\n")
                continue # Skip to next audio file

        # Try transcription with retries
        for attempt in range(retry_count):
            try:
                # Read audio file bytes (from original or converted MP3)
                with open(current_audio_path, "rb") as f:
                    audio_bytes = f.read()

                # Create a Part with correct MIME type
                audio_part = types.Part.from_bytes(
                    data=audio_bytes,
                    mime_type="audio/mp3"
                )

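                # Create prompt - include language hint. A bare code such as "so" is
                # ambiguous inside an English sentence, so label it as an ISO 639-1 code.
                prompt = f"Generate a transcript of the speech. The spoken language has the ISO 639-1 code '{language}'."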

                # Request transcript
                response = client.models.generate_content(
                    model=model,
                    contents=[prompt, audio_part]
                )

                # Extract the transcript text (response.text can be None or empty
                # when the model returns no usable output)
                transcript_text = getattr(response, "text", None)
                if not transcript_text:
                    raise ValueError("No transcript text found in the response.")

                # Save the transcript
                with open(transcript_file, "w", encoding="utf-8") as f:
                    f.write(transcript_text)

                print(f"Successfully transcribed {filename}")
                transcript_files.append(transcript_file)
                break # Exit retry loop on success

            except Exception as e:
                print(f"Attempt {attempt+1}/{retry_count} failed for {filename}: {str(e)}")
                if attempt < retry_count - 1:
                    print(f"Waiting {delay_between_failures} seconds before retrying...")
                    time.sleep(delay_between_failures)
                else:
                    print(f"All attempts failed for {filename}")
                    # Log failure
                    with open(os.path.join(output_dir, "failed_transcriptions.json"), "a") as f:
                        failure_info = {
                            "filename": filename,
                            "path": audio_file,
                            "timestamp": datetime.now().isoformat(),
                            "error": str(e)
                        }
                        f.write(json.dumps(failure_info) + "\n")

    # Clean up temporary conversion directory
    if os.path.exists(temp_conversion_dir):
        try:
            shutil.rmtree(temp_conversion_dir)
            print(f"Removed temporary conversion directory: {temp_conversion_dir}")
        except Exception as e:
            print(f"Failed to remove temporary directory: {str(e)}")

    # Save to Google Drive if requested
    if save_to_drive and transcript_files:
        # Use the same directory name for Google Drive saving
        drive_dir_name = os.path.basename(output_dir.rstrip('/'))
        drive_path = os.path.join("/content/drive/MyDrive/", drive_dir_name)
        print(f"\nSaving transcripts to Google Drive at: {drive_path}")

        # Create directory in Drive if it doesn't exist
        os.makedirs(drive_path, exist_ok=True)

        # Copy files to Drive
        copied_count = 0
        for f_path in transcript_files:
            try:
                shutil.copy(f_path, os.path.join(drive_path, os.path.basename(f_path)))
                copied_count += 1
            except Exception as e:
                print(f"Failed to copy {f_path} to Drive: {e}")
        print(f"Successfully copied {copied_count} transcripts to Google Drive.")

    print(f"\nTranscription Summary:")
    print(f"Total files processed: {len(audio_files)}")
    print(f"Transcripts created: {len(transcript_files)}")

    return transcript_files
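
The function above appends one JSON object per line to `failed_transcriptions.json`, so despite its extension the file is in JSON Lines form rather than a single JSON document, and `json.load` on the whole file will fail once it holds more than one entry. A minimal sketch for reading it back follows; the helper name `load_failed_transcriptions` is hypothetical and not part of the original notebook.

```python
import json
import os
from typing import Dict, List


def load_failed_transcriptions(output_dir: str) -> List[Dict]:
    """Read the line-delimited failure log written by transcribe_audio_files."""
    log_path = os.path.join(output_dir, "failed_transcriptions.json")
    if not os.path.exists(log_path):
        return []  # nothing failed, or nothing has been run yet

    failures = []
    with open(log_path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:  # skip blank lines
                failures.append(json.loads(line))
    return failures
```

Each returned dictionary carries the `filename`, `path`, `timestamp`, and `error` fields written by the logging code, which is enough to build a list of files to retry.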


In [17]:
# Uncomment the line below to enter the API key manually.
# `gemini_api_key` must be defined before running the transcription cell.
# gemini_api_key = input("Please enter your Gemini API Key: ")

In [14]:
# Example usage
input_directory = "soundcloud_2025-03-15_to_2025-03-16"

# Transcribe all audio files in the directory
transcripts = transcribe_audio_files(
    input_dir=input_directory,
    api_key=gemini_api_key, # Pass the manually entered API key
    language="so", # Somali language code
    save_to_drive=True
)

Error mounting Google Drive: Mountpoint must not already contain files
Continuing without Google Drive. Files will only be saved locally.
ffmpeg is already installed.
Using Gemini model: gemini-2.0-flash
Found 2 audio files to transcribe.
Formats to process: wav, mp3, m4a, flac, ogg
Skipping IDAACADDA 16-MAR-2025.mp3 - already transcribed

Transcribing: IDAACADDA 15-MAR-2025.mp3 (Format: mp3)
Successfully transcribed IDAACADDA 15-MAR-2025.mp3
Removed temporary conversion directory: soundcloud_2025-03-15_to_2025-03-16/_temp_gemini_conversion

Transcription Summary:
Total files processed: 2
Transcripts created: 2


In [15]:
# Example usage
file_path = "/content/gemini_transcripts_soundcloud_2025-03-15_to_2025-03-16/IDAACADDA 15-MAR-2025.txt"
# file_path = "/content/gemini_transcripts_soundcloud_2025-03-15_to_2025-03-16/IDAACADDA 16-MAR-2025.txt"

results = analyze_somali_transcript(file_path)

Analyzing file: IDAACADDA 15-MAR-2025.txt

==== Sample Lines ====
Line 1: Halkanina waa Radiyo Ergo ee codka arimaha bini'aadanimada oo fadhigiisu yahay
magalada Nairobi ee dalka kenya waxad naga dhageysanaysaan mowjado gaaban ee
dherer keedu yahay 22 ka mitir baan una dhiganta 25,670.00 maga hertz. Saacada
geeska afrika dhabari marka ay tahay sedexda ilaa afarta gelabnimo
Line 2: Waxaad sidoo kale naga dhagaysanaysaan qaara ka tirsan idaacadaha dalka iyo
barta aanu ku leenahay internetka ee fadhigeedu yahay radiyo ergo dot o w r g.
Line 3: Kulanti wanaag san eedageystayal waa sabti ay bisha maarso sanadku waa 2022 ku
soo dhawaada idaca Ergo ee maanta anigu ahsan madaale ayaa la socodsiinayaa
qodobada aadka macquul ka doontan waxa ka mid ah xooldhaqatada noolol xumagaa ku
waajahay iyo diganka cali waal ee gobal ku nado o markii daqalaah aan iyo biyo
yari ay u dhinten in ka badan xoolahoodi. Qoraal sokono wax magalada fasah dherr
e gobal ka baydh waa ugu dilah maareynta dhaarasho quusas

# 3. Performance Overview  

# 📄 Somali Audio Transcription Model Evaluation

This document outlines the performance of three AI models used to transcribe Somali-language audio. The objective was to identify a model capable of accurately converting spoken Somali into text using the **correct Latin script**. The evaluation started with the general-purpose Whisper model, moved to a Somali fine-tune of Whisper, and ended with **Gemini 2.0 Flash**, which gave the best results.

---

## 1. 🧠 OpenAI Whisper (Standard Model)

### 🔍 Summary:
OpenAI's standard Whisper model was the initial choice, given its general-purpose transcription capability.

- **✅ Language Detection:** Correctly identified Somali.
- **❌ Script Used:** Incorrectly transcribed text using the **Arabic script**, not the standard **Somali Latin script**.

### ⚠️ Key Issue:
The transcription output was nonsensical and unusable. It was filled with repeated Arabic phrases like:

> **"موضوع موضوع موضوع..."** (Arabic for "subject")

### 📉 Transcription Quality:
- Repetitive
- Unintelligible
- Wrong script

### 🧾 Conclusion:
**❌ Failure.** The model was unable to produce text in the appropriate script, making it ineffective for this task.

---

## 2. 🧬 Whisper Small Somali (`steja/whisper-small-somali`)

### 🔍 Summary:
A specialized Whisper model hosted on Hugging Face, fine-tuned for Somali, was tested to address the script issue.

- **✅ Script Used:** Correctly used **Somali Latin script**
- **✅ Language:** Identified and transcribed Somali

### ⚠️ Key Issue:
The model produced **hallucinated repetitions**, frequently looping on phrases like:

> *"iyo iyo iyo," "dhul dhul dhul," "dheesho dheesho..."*

### 📉 Transcription Quality:
- Correct script, but
- High frequency of repetitive nonsense
- Difficult to interpret meaningful content

### 🧾 Conclusion:
**⚠️ Partial Success.** Script issue resolved, but the hallucinations rendered the transcriptions unreliable for practical use.
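
Loops such as "iyo iyo iyo" can be detected mechanically by measuring the longest run of one word repeated back to back. The sketch below is a simple heuristic under stated assumptions, not the method used in this evaluation: the helper name `longest_word_run` is hypothetical, and what run length counts as a hallucination is left to the reader.

```python
import string
from itertools import groupby


def longest_word_run(text: str):
    """Return (word, run_length) for the longest run of one word repeated back to back."""
    # Lowercase and strip surrounding punctuation so "iyo," matches "iyo"
    words = [w.strip(string.punctuation) for w in text.lower().split()]
    words = [w for w in words if w]

    best_word, best_len = None, 0
    for word, group in groupby(words):
        run = len(list(group))
        if run > best_len:
            best_word, best_len = word, run
    return best_word, best_len


print(longest_word_run("iyo iyo iyo, dhul dhul"))  # ('iyo', 3)
```

Natural speech rarely repeats the same word many times in a row, so a long run is a reasonable warning sign, although short runs also occur in legitimate Somali text.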

---

## 3. 🚀 Gemini 2.0 Flash (Google)

### 🔍 Summary:
The final model tested, **Gemini 2.0 Flash**, exhibited strong performance and significantly improved output quality.

- **✅ Script:** Somali Latin script
- **✅ Structure:** Coherent sentences and paragraphs
- **✅ Repetition:** Natural and minimal
- **✅ Accuracy:** High contextual relevance

### ✅ Transcription Quality:
- Well-structured
- Accurate
- Readable and ready for analysis

### 🧾 Conclusion:
**✅ Success.** Gemini 2.0 Flash delivered the best results. It provided **accurate**, **natural**, and **usable** transcriptions aligned with the project’s goals.

---


