<a href="https://colab.research.google.com/github/Vtheonly/Junk-Yard/blob/main/Get%20trasncribtions/arabic_to_Text.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Arabic to English Audio Translation with Whisper

This notebook uses OpenAI's Whisper model to transcribe and translate audio files from Arabic to English text.

### Instructions:
1.  **Connect to a GPU Runtime**: Go to `Runtime` -> `Change runtime type` and select `T4 GPU` as the hardware accelerator.
2.  **Run the first code cell** to install the necessary libraries (`openai-whisper` and `ffmpeg-python`).
3.  **Place your audio files**: Create a folder in your Google Drive (e.g., `Arabic_Audio_Files`) and upload your MP3 files there.
4.  **Update the folder name** (`input_folder_name`) in the main code cell so that it matches the folder you created.
5.  **Run all cells**.

The English translations will be saved as `.txt` files in a new subfolder named `english_translations` inside your input directory.

In [None]:
!pip install -U openai-whisper ffmpeg-python

In [None]:
from IPython import get_ipython
from IPython.display import display
import os
from google.colab import drive
import whisper
import concurrent.futures
import time
import threading

# --- 1. Mount Google Drive ---
# Any mount failure is reported; it is tolerated only when the error text
# says the drive is already mounted, otherwise it is re-raised.
try:
    drive.mount('/content/drive')
except Exception as mount_error:
    print(f"Error mounting Google Drive: {mount_error}")
    if "already mounted" not in str(mount_error).lower():
        raise
    print("Drive seems to be already mounted.")
else:
    print("Google Drive mounted successfully.")

# --- 2. Configure Paths ---
# IMPORTANT: Change 'Arabic_Audio_Files' to the name of the folder in your Google Drive containing the MP3s.
base_drive_path = '/content/drive/MyDrive/'
input_folder_name = 'Arabic_Audio_Files'  # <<< CHANGE THIS TO YOUR FOLDER NAME
output_subfolder_name = 'english_translations' # Output text files will be saved here

input_folder_path = os.path.join(base_drive_path, input_folder_name)
output_folder_path = os.path.join(input_folder_path, output_subfolder_name)

# Create the output folder only inside an input folder that already exists.
# os.makedirs also creates missing parent directories, so calling it
# unconditionally would silently create a mistyped input folder and make the
# mistake look like "folder exists but contains no MP3s".
if not os.path.isdir(input_folder_path):
    print(f"Input folder not found, output directory not created: {input_folder_path}")
elif os.path.exists(output_folder_path):
    print(f"Output directory already exists: {output_folder_path}")
else:
    os.makedirs(output_folder_path)
    print(f"Created output directory: {output_folder_path}")

# --- 3. Load Whisper Model ---
# Available sizes: "tiny", "base", "small", "medium", "large".
# Bigger models are more accurate but slower; "base" or "small" is a sensible first choice.
# The GPU is tried first; on any failure the CPU is tried next, and `model`
# stays None when both attempts fail.
model = None
print("Loading Whisper model (this may take a moment)...")
try:
    model = whisper.load_model("base", device="cuda")
except Exception as gpu_error:
    print(f"Error loading Whisper model on GPU: {gpu_error}")
    print("Attempting to load on CPU...")
    try:
        model = whisper.load_model("base", device="cpu")
    except Exception as cpu_error:
        print(f"Failed to load model on CPU as well: {cpu_error}")
    else:
        print("Whisper model loaded successfully on CPU.")
else:
    print("Whisper model loaded successfully on GPU.")

# --- 4. Define Translation Function ---
def translate_audio_file(audio_file_path, model_instance, output_dir, use_fp16_if_cuda, lock):
    """
    Translate a single audio file to English and save the text.

    Args:
        audio_file_path: Path of the audio file to process.
        model_instance: Loaded model. Its ``device.type`` is read and its
            ``transcribe`` method is called with ``task="translate"``.
        output_dir: Directory that receives ``<audio file stem>.txt``.
        use_fp16_if_cuda: Request fp16 inference; only honoured when
            ``model_instance.device.type`` is ``'cuda'``.
        lock: Lock held around the ``transcribe`` call so that only one
            thread uses the shared model at a time.

    Returns:
        True if non-empty text was written to the output file. False if the
        result had no usable text or any exception occurred; exceptions are
        printed, not propagated.
    """
    mp3_filename = os.path.basename(audio_file_path)
    # Fallback start time in case something fails before the lock is acquired.
    start_time = time.time()
    try:
        fp16_setting = model_instance.device.type == 'cuda' and bool(use_fp16_if_cuda)

        with lock:
            # Restart the clock once the lock is held: time spent queued
            # behind other files is not processing time for this file.
            start_time = time.time()
            print(f"[Thread] Starting locked translation for: {mp3_filename}")
            # task="translate" tells Whisper to output English
            result = model_instance.transcribe(audio_file_path, task="translate", fp16=fp16_setting)

        if 'text' in result and result['text'].strip():
            transcript_text = result["text"]
            txt_filename = os.path.splitext(mp3_filename)[0] + ".txt"
            output_txt_path = os.path.join(output_dir, txt_filename)
            with open(output_txt_path, "w", encoding="utf-8") as txt_file:
                txt_file.write(transcript_text)
            elapsed_time = time.time() - start_time
            print(f"  [Thread] SUCCESS: English translation for {mp3_filename} saved to {output_txt_path} (took {elapsed_time:.2f}s)")
            return True

        # No usable text: report what the model returned to help diagnosis.
        elapsed_time = time.time() - start_time
        print(f"  [Thread] WARNING: Translation for {mp3_filename} was empty or no text found (took {elapsed_time:.2f}s).")
        if result:
            print(f"    Result keys: {result.keys()}")
            if 'segments' in result and not result['segments']:
                print("    Note: No speech segments were detected.")
        return False
    except Exception as e:
        # Worker boundary: one bad file must not abort the whole batch.
        elapsed_time = time.time() - start_time
        print(f"  [Thread] ERROR processing {mp3_filename} (took {elapsed_time:.2f}s): {e}")
        return False

# --- 5. Process Files ---
# Collect the MP3 files in the input folder, run each one through
# translate_audio_file on a thread pool, and print a success/failure summary.
if model:
    mp3_file_paths = []
    if os.path.isdir(input_folder_path):
        print(f"\nLooking for MP3 files in: {input_folder_path}")
        # os.listdir returns entries in arbitrary order; sorting makes the
        # processing order reproducible between runs.
        mp3_file_paths = [
            os.path.join(input_folder_path, filename)
            for filename in sorted(os.listdir(input_folder_path))
            if filename.lower().endswith(".mp3")
        ]
    else:
        print(f"ERROR: Input folder not found: {input_folder_path}")
        print("Please check the 'base_drive_path' and 'input_folder_name' variables.")

    if not mp3_file_paths:
        print("No MP3 files found in the specified directory.")
    else:
        print(f"Found {len(mp3_file_paths)} MP3 file(s) to process.")

        # Configure parallel processing. All workers share one model and the
        # lock admits a single transcribe call at a time, so the extra threads
        # only overlap the work done outside the lock (e.g. writing results).
        if model.device.type == 'cuda':
            USE_FP16_ON_CUDA = False # Set to True for potential speedup, might reduce accuracy slightly
            # os.cpu_count() may return None; fall back to a single worker.
            num_workers = min(4, os.cpu_count() or 1)
            print(f"Using {num_workers} worker threads (GPU detected, fp16 on CUDA: {USE_FP16_ON_CUDA}). Lock will serialize GPU access.")
        else:
            USE_FP16_ON_CUDA = False
            num_workers = os.cpu_count() or 2
            print(f"Using {num_workers} worker threads (CPU detected). Lock will serialize model access.")

        transcription_lock = threading.Lock()
        overall_start_time = time.time()
        successful_translations = 0
        failed_translations = 0

        with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
            # Map each future back to its file so failures can be reported by name.
            future_to_file = {
                executor.submit(translate_audio_file, audio_path, model, output_folder_path, USE_FP16_ON_CUDA, transcription_lock): audio_path
                for audio_path in mp3_file_paths
            }

            for future in concurrent.futures.as_completed(future_to_file):
                file_path = future_to_file[future]
                try:
                    success = future.result()
                except Exception as exc:
                    failed_translations += 1
                    print(f"  [Main] Generated an exception for {os.path.basename(file_path)}: {exc}")
                else:
                    if success:
                        successful_translations += 1
                    else:
                        failed_translations += 1

        overall_elapsed_time = time.time() - overall_start_time
        print("\n--- All file processing complete. ---")
        print(f"Total time taken: {overall_elapsed_time:.2f} seconds.")
        print(f"Successful translations: {successful_translations}")
        print(f"Failed/Empty translations: {failed_translations}")
else:
    print("Whisper model could not be loaded. Cannot proceed with translation.")