<a href="https://colab.research.google.com/github/Netdrum/MARIA/blob/main/20241119_step_1_transcribe.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import json
import logging
import os
import shutil

In [None]:
# Notebook setup cell: the leading "!" runs each line as a shell command.
# Installs torch (unpinned) and the `datasets` package pinned to 2.12.0.
!pip install torch
!pip install datasets==2.12.0

In [None]:
from google.colab import userdata

# Read the Colab secret named 'IRCG_VHF' and keep it in a variable.
# A bare `userdata.get(...)` as the last expression of a cell is echoed as
# the cell's output, which would display the secret value in the notebook,
# and the value would not be available to later cells.
ircg_vhf_secret = userdata.get('IRCG_VHF')

In [None]:
import os
import json
import csv
import re  # Optional for filename parsing (if speaker ID needed)
from transformers import WhisperProcessor, WhisperForConditionalGeneration
import torch  # Import torch explicitly
import librosa  # Import librosa for audio loading

def prepare_VHF_data(data_folder="/content/drive/MyDrive/Dataset", output_dir="manifest_output"):
  """
  This function processes VHF audio files in a folder using Whisper for transcription and generates:
      - JSON manifest files for each audio file with transcription and audio information.
      - A single CSV file summarizing the processed audio data.

  Args:
      data_folder: Path to the folder containing VHF audio files (default: "/content/drive/MyDrive/Dataset").
      output_dir: Path to the directory where manifest files and the CSV will be saved (default: "manifest_output").
  """

  # Create the output directory if it doesn't exist
  os.makedirs(output_dir, exist_ok=True)

  # Load Whisper model and processor
  model_name = "openai/whisper-small"
  processor = WhisperProcessor.from_pretrained(model_name)
  model = WhisperForConditionalGeneration.from_pretrained(model_name)
  try:
      print("Whisper model loaded successfully.")
  except Exception as e:
      print(f"Error loading Whisper model: {e}")
      return

  # Create an empty list to store CSV data
  csv_data = []

  for filename in os.listdir(data_folder):
    if filename.endswith(".wav"):  # Check for audio file extension

      # Print file paths for verification
      print(f"Processing file: {filename}")
      filepath = os.path.join(data_folder, filename)

      # Load audio data using librosa and retrieve sampling rate (if available)
      try:
          audio_data, sample_rate = librosa.load(filepath, sr=None)  # Load with native sample rate
      except Exception as e:
          print(f"Error loading audio file: {filename} ({e})")
          continue  # Skip to the next file if loading fails

      # Transcribe audio using Whisper (passing the sampling rate)
      inputs = processor(audio_data, sampling_rate=sample_rate, return_tensors="pt")
      with torch.no_grad():
          outputs = model.generate(**inputs)
      transcription = processor.batch_decode(outputs, skip_special_tokens=True)[0]

      # Extract speaker ID from filename (optional, adjust regex if needed)
      speaker_id = None  # Replace with appropriate regex if you want speaker ID
      # You can uncomment and modify the following line for speaker ID extraction:
      # match = re.search(r"speaker_(\d+)", filename)  # Example regex for "speaker_XX.wav" format
      # if match:
      #     speaker_id = match.group(1)

      # Create dictionary entry with desired format
      entry = {
          "wav": os.path.join("{data_root}", filename),  # Placeholder for data_root
          "length": len(audio_data) / sample_rate,  # Duration
          "spk_id": speaker_id,  # Optional speaker ID from filename
          "ali": None,  # Phoneme alignment information (if you have it)
          "phn": None,  # Phoneme information (if you have it)
          "text": transcription  # Text information
      }

      if sample_rate is not None:
          entry["frame_rate"] = sample_rate  # Add frame rate if retrieved

        # Split into train and test sets
    all_entries = []  # Collect all entries for dataset creation
    for filename in os.listdir(data_folder):
        if filename.endswith(".wav"):

            # Append the entry to all_entries
            all_entries.append(entry)

    # Create a Hugging Face Dataset from the entries
    dataset = Dataset.from_list(all_entries)

    # Split the dataset into train and test
    dataset = dataset.train_test_split(test_size=test_size)

    # Rename the splits to "train" and "test"
    dataset = DatasetDict({"train": dataset["train"], "test": dataset["test"]})

    # Upload to Hugging Face
    dataset.push_to_hub("IRCG_VHF")  # Replace with your details

      # Generate output filename (replace ".wav" with ".json")
    output_filename = os.path.splitext(filename)[0] + ".json"  # Create JSON filename from audio filename
    output_path = os.path.join(output_dir, output_filename)

      # Write entry to a JSON file for each audio file
    with open(output_path, "w") as f:
          json.dump(entry, f, indent=4)

      # Create a CSV row for each audio file and append to csv_data
    csv_row = {
          "filename": filename,
          "duration": entry["length"],
          "speaker_id": entry["spk_id"] if entry["spk_id"] else None,  # Include speaker ID if it exists
      } # Add this closing curly brace