In [1]:
import os
import io
import datetime
import requests
import random
import speech_recognition as sr
from pydub import AudioSegment

In [2]:
def ensure_dir(directory: str) -> None:
    """
    Make sure a directory exists on disk.

    Missing parent directories are created as well; a directory that is
    already present is left untouched.
    Params:
      :directory: The path of the directory to check/create.
    """
    # Only touch the filesystem when the directory is not there yet
    if not os.path.isdir(directory):
        os.makedirs(directory, exist_ok=True)

def get_data(url: str, timeout: float = 30.0) -> dict:
    """
    Fetch JSON data from a given URL.
    Params:
        :url: The URL to fetch data from.
        :timeout: Seconds to wait for the server before giving up (default is 30).
    Returns: 
        A dictionary containing the JSON response.
    Raises:
        RuntimeError if the request fails (connection error, timeout or
        HTTP error status) or JSON parsing fails. The original exception
        is attached as the cause.
    """
    try:
        # Without a timeout, requests waits indefinitely on an unresponsive server
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except (requests.RequestException, ValueError) as e:
        # Chain explicitly so the underlying network/JSON error stays visible
        raise RuntimeError(f"Failed to retrieve data from {url}: {e}") from e

def get_audio_url(api_url: str) -> str | None:
    """
    Extract the first chapter's audio URL from the API response.
    Params:
        :api_url: The API URL to fetch audiobook data from.
    Returns:
        The audio URL if available, otherwise None.
    """
    data = get_data(api_url)
    # Get the first book in the response [0]
    book = data['books'][0]
    sections = book.get('sections', [])
    if sections:
        return sections[0].get('listen_url')  # Return the 'listen_url' of the first chapter
    return None

def convert_mp3_to_wav(audio_data: bytes, output_file: str) -> None:
    """
    Decode in-memory MP3 data with pydub and write it out as a WAV file.
    Params:
        :audio_data: Raw bytes of the MP3 audio.
        :output_file: Path of the WAV file to write.
    """
    # Wrap the raw bytes in a buffer so they can be read like a file
    mp3_buffer = io.BytesIO(audio_data)
    decoded = AudioSegment.from_file(mp3_buffer, format="mp3")

    decoded.export(output_file, format="wav")
    print(f"Conversion complete: MP3 bytes -> {output_file}")

def transcribe_chunks(audio_file: str, recogniser: sr.Recognizer, chunk_length: int = 20, total_chunks: int = 4) -> str:
    """
    Transcribe consecutive fixed-length chunks of an audio file, skipping the first chunk.

    Chunk 0 (the first `chunk_length` seconds) is always skipped, so the
    chunks numbered 1 .. total_chunks - 1 are transcribed. With the
    defaults that is 20-40s, 40-60s and 60-80s (three chunks).

    Params:
        :audio_file: Path to the input WAV audio file.
        :recogniser: An instance of the Recognizer class for transcription.
        :chunk_length: Length of each chunk in seconds (default is 20 seconds).
        :total_chunks: Number of chunk slots counted from the start of the
            audio, including the skipped first one (default is 4).

    Returns:
        A formatted string with one "[Chunk i]: ..." line per transcribed
        chunk, including error messages if the API couldn't transcribe a
        chunk. Chunks that start beyond the end of the audio are omitted.
    """
    # Load the entire audio file into an AudioSegment
    audio_segment = AudioSegment.from_file(audio_file)
    audio_length_ms = len(audio_segment)  # pydub reports length in milliseconds
    lines = []  # One formatted line per transcribed chunk

    # Loop through each chunk, skip the first chunk (20-40, 40-60 etc)
    for i in range(1, total_chunks):
        # Calculate start and end times (milliseconds)
        start_ms = i * chunk_length * 1000
        end_ms = (i + 1) * chunk_length * 1000

        # Nothing left to transcribe once we run past the end of the audio
        if start_ms >= audio_length_ms:
            break

        chunk_audio = audio_segment[start_ms:end_ms]

        # Export to an in-memory WAV buffer instead of a fixed-name temp file:
        # nothing is left behind on errors and parallel runs cannot collide
        wav_buffer = io.BytesIO()
        chunk_audio.export(wav_buffer, format="wav")
        wav_buffer.seek(0)

        # Transcribe the audio chunk using the recogniser
        with sr.AudioFile(wav_buffer) as source:
            chunk_data = recogniser.record(source)
            try:
                # Google's API to transcribe this chunk
                text = recogniser.recognize_google(chunk_data)
                lines.append(f"[Chunk {i}]: {text}")
            except sr.UnknownValueError:
                lines.append(f"[Chunk {i}]: [Could not understand chunk.]")
            except sr.RequestError as e:
                lines.append(f"[Chunk {i}]: [API error: {e}]")

    # Return the complete transcription for all chunks
    return "\n".join(lines).strip()

def _safe_filename(name: str) -> str:
    """Replace characters that are unsafe in file names (path separators, reserved and control characters) with underscores."""
    cleaned = "".join(
        "_" if (ch in '<>:"/\\|?*' or ord(ch) < 32) else ch
        for ch in name
    ).strip()
    # Never return an empty name, which would produce a file called ".wav"
    return cleaned or "untitled"


def download_and_process_audio(audio_url: str, book_title: str) -> None:
    """
    Download audio from a URL, save it as WAV, transcribe 3 chunks, and save the transcription.

    The WAV file is written to processed_audio/<YYYY-MM-DD>/ and the
    transcription to transcriptions/<YYYY-MM-DD>/, both named after the
    book title (with characters that are unsafe in file names replaced).

    Params:
        :audio_url: URL of the MP3 audio to download.
        :book_title: Title of the book, used for the output file names.
    Raises:
        requests.RequestException if the download fails, times out or
        returns an HTTP error status.
    """
    print(f"Fetching audio from: {audio_url}")
    # Fail early on HTTP errors instead of feeding an error page to the MP3 decoder
    response = requests.get(audio_url, timeout=60)
    response.raise_for_status()
    audio_data = response.content

    # Titles may contain '/', ':' etc., which must not end up in a path
    file_stem = _safe_filename(book_title)

    today_date = datetime.datetime.now().strftime("%Y-%m-%d")
    output_dir = os.path.join("processed_audio", today_date)
    ensure_dir(output_dir)
    wav_file = os.path.join(output_dir, f"{file_stem}.wav")

    # Convert MP3 bytes to WAV
    convert_mp3_to_wav(audio_data, wav_file)

    # Transcribe only the first 4 chunks of 20 seconds, skipping the first chunk
    recogniser = sr.Recognizer()
    transcription = transcribe_chunks(wav_file, recogniser, chunk_length=20, total_chunks=4)

    # Save transcription (explicit UTF-8 so non-ASCII text works on every platform)
    transcriptions_dir = os.path.join("transcriptions", today_date)
    ensure_dir(transcriptions_dir)
    txt_file = os.path.join(transcriptions_dir, f"{file_stem}.txt")
    with open(txt_file, "w", encoding="utf-8") as f:
        f.write(transcription)
    print(f"Transcription saved to {txt_file}")

In [3]:
# Ask the LibriVox API for its first 10 audiobooks
all_books_data = get_data("https://librivox.org/api/feed/audiobooks/?format=json&extended=1&limit=10")
books = all_books_data.get('books', [])

if books:
    # Choose one audiobook from the list at random
    random_book = random.choice(books)
    random_id = random_book['id']
    print(f"Selected random audiobook ID: {random_id} - {random_book['title']}")

    # Query the API again for just the chosen book, then take the
    # audio URL of its first chapter
    api_url = f"https://librivox.org/api/feed/audiobooks/?id={random_id}&format=json&extended=1"
    audio_url = get_audio_url(api_url)

    if not audio_url:
        print("No audio URL found for the first chapter.")
    else:
        download_and_process_audio(audio_url, random_book['title'])
else:
    print("No books found in API response.")


Selected random audiobook ID: 52 - Letters of Two Brides
Fetching audio from: https://www.archive.org/download/letters_brides_0709_librivox/letters_of_two_brides_01_debalzac_64kb.mp3
Conversion complete: MP3 bytes -> processed_audio/2025-06-05/Letters of Two Brides.wav
Transcription saved to transcriptions/2025-06-05/Letters of Two Brides.txt
