# Full pipeline implementation

**Description**: This notebook implements a full pipeline for extracting text from a PDF file, formatting it, and then using it to generate audio using a text-to-speech (TTS) model.

## Imports

In [1]:
import json
import os
from pathlib import Path
from typing import List

from huggingface_hub import hf_hub_download
from IPython.display import display, Audio
from kokoro import KModel, KPipeline
import numpy as np
from openai import OpenAI
import pypdfium2 as pdfium
import soundfile as sf

from src.io_schemas.output_schemas import FormattedPageText
from src.io_schemas.prompts import FORMAT_TEXT_FOR_TTS
from src.pdf_reader.helpers import detect_header_footer
from src.openai_api_utils.controller import OpenAIAPIController
from src.utils.custom_exceptions import OpenAIInvalidResponseFormatError

In [2]:
DATA_DIR = Path("./data")

## Set up Gemini client and OpenAI API controller

In [3]:
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
model_id = "gemini-2.5-flash-preview-04-17"  # "gemini-2.5-pro-exp-03-25"

In [4]:
# The OpenAI SDK client is pointed at the Gemini API through the OpenAI-style
# endpoint given in `base_url`, authenticating with the Gemini API key.
client = OpenAI(
    api_key=GEMINI_API_KEY,
    base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
)
# Display the models returned for this API key as the cell output.
client.models.list().to_dict()

{'data': [{'id': 'models/chat-bison-001',
   'object': 'model',
   'owned_by': 'google'},
  {'id': 'models/text-bison-001', 'object': 'model', 'owned_by': 'google'},
  {'id': 'models/embedding-gecko-001',
   'object': 'model',
   'owned_by': 'google'},
  {'id': 'models/gemini-1.0-pro-vision-latest',
   'object': 'model',
   'owned_by': 'google'},
  {'id': 'models/gemini-pro-vision', 'object': 'model', 'owned_by': 'google'},
  {'id': 'models/gemini-1.5-pro-latest',
   'object': 'model',
   'owned_by': 'google'},
  {'id': 'models/gemini-1.5-pro-001', 'object': 'model', 'owned_by': 'google'},
  {'id': 'models/gemini-1.5-pro-002', 'object': 'model', 'owned_by': 'google'},
  {'id': 'models/gemini-1.5-pro', 'object': 'model', 'owned_by': 'google'},
  {'id': 'models/gemini-1.5-flash-latest',
   'object': 'model',
   'owned_by': 'google'},
  {'id': 'models/gemini-1.5-flash-001',
   'object': 'model',
   'owned_by': 'google'},
  {'id': 'models/gemini-1.5-flash-001-tuning',
   'object': 'model',

In [5]:
openai_api_controller = OpenAIAPIController(
    openai_client=client,
    model_name=model_id,
)

In [6]:
openai_api_kwargs = {
    # "max_completion_tokens": 30_000,
    "temperature": 0.0,
}

## Download and initialize TTS model

In [7]:
REPO_ID = "hexgrad/Kokoro-82M"

In [8]:
tts_model_path = hf_hub_download(
    repo_id=REPO_ID,
    filename=KModel.MODEL_NAMES[REPO_ID],
    local_dir="./models/kokoro",
    force_download=False,  # Set to True to force redownload even if the file exists
)
tts_model_path

'models/kokoro/kokoro-v1_0.pth'

In [9]:
tts_model = KModel(repo_id=REPO_ID, model=tts_model_path)



## Load PDF document

In [10]:
pdf_path = DATA_DIR / "pdf_docs/a-practical-guide-to-building-agents.pdf"
pdf = pdfium.PdfDocument(pdf_path)
print(f"Length of PDF: {len(pdf)} pages")

Length of PDF: 34 pages


## Extract text from document pages

In [11]:
header_footer_lines = detect_header_footer(document=pdf)
list(header_footer_lines)[:3]

['33 A practical guide to building agents',
 '4 A practical guide to building agents',
 '53']

In [12]:
text_from_pages = []

for page_id in range(len(pdf)):
    # "pypdfium2" seems to separate lines with "\r\n" by default; `splitlines()`
    # handles that as well as a plain "\n"
    raw_lines = pdf[page_id].get_textpage().get_text_bounded().splitlines()

    # Keep only the lines that were not detected as header/footer content
    body_lines = [
        raw_line
        for raw_line in raw_lines
        if raw_line.strip() not in header_footer_lines
    ]

    text_from_pages.append("\n".join(body_lines))

## Use the LLM to format the extracted text into text suitable for TTS

In [13]:
SILENCE_KEYWORD = "[SILENCE]"

In [14]:
formatted_document_text = ""

# Only the first pages of the document are formatted here; widen the slice to
# process more of it
pages_to_format = text_from_pages[:4]

for page_id, page_text in enumerate(pages_to_format):
    print("-------------------------------------------------------------------")
    # Report progress against the pages actually processed, not the whole document
    print(f"Processing page {page_id + 1}/{len(pages_to_format)}")

    # Get input texts that are needed to build the prompt: the tail of what has been
    # formatted so far, the current page and the extracted text of the following page
    previous_fragment = (
        f"... {formatted_document_text[-100:]}" if formatted_document_text else ""
    )
    current_page = page_text
    next_preview = (
        text_from_pages[page_id + 1] if page_id + 1 < len(text_from_pages) else ""
    )

    # Build the prompt object as the OpenAI API controller expects
    prompt = {
        "system_msg": FORMAT_TEXT_FOR_TTS.system_msg.format(
            silence_keyword=SILENCE_KEYWORD,
        ),
        "user_msg": FORMAT_TEXT_FOR_TTS.user_msg.format(
            previous_fragment=previous_fragment,
            current_page=current_page,
            next_preview=next_preview,
        ),
    }

    # Send the request to the OpenAI/Gemini API
    chat_completion, elapsed_time_s, retries_taken = openai_api_controller.send_request(
        prompt=prompt,
        response_format=FORMAT_TEXT_FOR_TTS.output_json,
        **openai_api_kwargs,
    )
    num_attempts = retries_taken + 1
    print(
        f"Received response from OpenAI API. Response: {chat_completion}\n"
        f"num_attempts: {num_attempts}\n"
        f"response_time (s): {elapsed_time_s:.2f}"
    )

    # Validate response format. Raises custom exception if the response does not match
    # the `FormattedPageText` schema
    response_msg = chat_completion.choices[0].message.content
    try:
        formatted_page_text = FormattedPageText(**json.loads(response_msg))
    except Exception as e:
        print(f"Error validating response format: {e}")
        # Chain the original error so the traceback shows the underlying cause
        raise OpenAIInvalidResponseFormatError() from e

    formatted_document_text += f" {formatted_page_text.text}"

-------------------------------------------------------------------
Processing page 1/34
Received response from OpenAI API. Response: ParsedChatCompletion[FormattedPageText](id=None, choices=[ParsedChoice[FormattedPageText](finish_reason='stop', index=0, logprobs=None, message=ParsedChatCompletionMessage[FormattedPageText](content='{\n  "text": "[SILENCE] A practical guide to building agents."\n}', refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=None, parsed=FormattedPageText(text='[SILENCE] A practical guide to building agents.')))], created=1747481717, model='gemini-2.5-flash-preview-04-17', object='chat.completion', service_tier=None, system_fingerprint=None, usage=CompletionUsage(completion_tokens=19, prompt_tokens=813, total_tokens=1063, completion_tokens_details=None, prompt_tokens_details=None))
num_attempts: 1
response_time (s): 2.29
-------------------------------------------------------------------
Processing page 2/34
Received res

In [15]:
# Split the formatted document at every silence marker, trim each piece and drop
# the ones that end up empty
text_chunks = list(
    filter(None, map(str.strip, formatted_document_text.split(SILENCE_KEYWORD)))
)
text_chunks

['A practical guide to building agents.',
 "This guide will cover several key areas. We'll start by defining what an agent is, then discuss when you should consider building one. We'll delve into the foundations of agent design, explore guardrails, and finally, offer a conclusion.",
 "Let's begin with the introduction. Large language models are becoming increasingly capable of handling complex, multi-step tasks. Advances in reasoning, multimodality, and tool use have unlocked a new category of LLM-powered systems known as agents. This guide is designed for product and engineering teams exploring how to build their first agents, distilling insights from numerous customer deployments into practical and actionable best practices. It includes frameworks for identifying promising use cases, clear patterns for designing agent logic and orchestration, and best practices to ensure your agents run safely, predictably, and effectively. After reading this guide, you'll have the foundational knowl

## Pass the formatted text to the TTS model

### Initialize pipeline

In [16]:
pipeline = KPipeline(lang_code="a", repo_id=REPO_ID, model=tts_model, device="cpu")

Collecting en-core-web-sm==3.8.0
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl (12.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m17.1 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: en-core-web-sm
Successfully installed en-core-web-sm-3.8.0
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


### Generate audio from text

In [17]:
# Pause inserted between paragraphs, expressed in seconds. The generation loop turns
# it into a sample count with SILENCE_DURATION * SAMPLE_RATE
# (0.3 s at 24,000 samples/s = 7,200 samples).
SILENCE_DURATION = 0.3  # seconds
SAMPLE_RATE = 24_000
VOICE_ID = "am_liam"  # "am_puck" is another cool male voice.
SPEED = 1.0

**Note:** The `split_into_chunks` function below is needed to keep the TTS model from speeding up the voice too much. The pipeline processes the text in chunks that each produce approximately 25 seconds of audio; if a fragment is slightly longer than that, the TTS model may slightly accelerate the voice.

In [18]:
def split_into_chunks(text: str, max_words: int = 50) -> List[str]:
    """
    Splits the text into chunks formed by whole sentences, packing as many
    consecutive sentences as possible into each chunk without exceeding the
    specified number of words.

    Sentences are delimited by a period followed by a space (". "). The period
    consumed by the split is restored on every sentence, and a period is added to
    the last sentence only when it does not already end with ".", "!" or "?".

    Parameters
    ----------
    text : str
        The text to be split into chunks.
    max_words : int
        The maximum number of words allowed in each chunk.

    Returns
    -------
    List[str]
        A list of non-empty text chunks. Each chunk contains at most `max_words`
        words, except when a single sentence is longer than `max_words`: such a
        sentence is never cut and is returned as a chunk of its own. An empty or
        whitespace-only `text` yields an empty list.
    """
    terminal_punctuation = (".", "!", "?")

    fragments = text.split(". ")
    last_index = len(fragments) - 1

    # Rebuild the sentences exactly as they appeared in the text
    sentences = []
    for index, fragment in enumerate(fragments):
        fragment = fragment.strip()
        if not fragment:
            # Empty text, or a trailing/duplicated ". " separator: nothing to keep
            continue
        if index < last_index:
            # Every non-final fragment was followed by ". " in the original text
            sentences.append(fragment + ".")
        elif fragment.endswith(terminal_punctuation):
            sentences.append(fragment)
        else:
            sentences.append(fragment + ".")

    # Greedily pack consecutive sentences into chunks of at most `max_words` words
    chunks = []
    current_sentences = []
    current_word_count = 0
    for sentence in sentences:
        sentence_word_count = len(sentence.split())
        # Flush only when there is something to flush, so that an over-long first
        # sentence does not produce an empty chunk
        if current_sentences and current_word_count + sentence_word_count > max_words:
            chunks.append(" ".join(current_sentences))
            current_sentences = []
            current_word_count = 0
        current_sentences.append(sentence)
        current_word_count += sentence_word_count

    if current_sentences:
        chunks.append(" ".join(current_sentences))

    return chunks

In [None]:
audio_chunks = []

# Pause placed in front of every text chunk that follows already-generated audio.
# It is the same for all chunks, so it is built once.
silence = np.zeros(int(SILENCE_DURATION * SAMPLE_RATE), dtype=np.float32)

for text_chunk_id, text_chunk in enumerate(text_chunks):
    print(
        "-------------------------------\n"
        f"Processing text chunk {text_chunk_id + 1}/{len(text_chunks)}\n"
        f" Number of sentences: {text_chunk.count('.')} |"
        f" Number of words: {len(text_chunk.split())}"
    )

    smaller_text_chunks = split_into_chunks(text=text_chunk, max_words=50)

    # Audio pieces generated for this text chunk, in playback order
    audio_segments = []
    for small_text_chunk in smaller_text_chunks:
        if not small_text_chunk:
            # Nothing to synthesize for an empty piece of text
            continue

        generator = pipeline(text=small_text_chunk, voice=VOICE_ID, speed=SPEED)

        for audio_chunk_id, (graphemes, phonemes, audio_chunk) in enumerate(generator):
            print(
                f"++++ Processing audio chunk {audio_chunk_id + 1}\n"
                f" Number of words: {len(graphemes.split())}\n"
                f" Graphemes: {graphemes}\n"
                f" Phonemes: {phonemes}"
            )
            display(Audio(data=audio_chunk, rate=SAMPLE_RATE))
            audio_segments.append(audio_chunk)

    if not audio_segments:
        # `np.concatenate` raises on an empty list, so skip chunks without audio
        print(f"No audio was generated for text chunk {text_chunk_id + 1}; skipping it")
        continue

    text_chunk_audio = np.concatenate(audio_segments)
    if audio_chunks:
        # Add silence between chunks (never before the first piece of audio)
        text_chunk_audio = np.concatenate([silence, text_chunk_audio])

    audio_chunks.append(text_chunk_audio)

if not audio_chunks:
    raise ValueError(
        "No audio was generated: `text_chunks` is empty or the TTS pipeline "
        "produced no output"
    )

final_audio = np.concatenate(audio_chunks)

### Save final audio

In [None]:
# Create the destination folder (and any missing parents) before writing
output_dir = DATA_DIR /  "output_audio/"
output_dir.mkdir(parents=True, exist_ok=True)

# Write the complete audio to a WAV file inside that folder
output_path = output_dir / "HEARME_en.wav"
sf.write(file=output_path, data=final_audio, samplerate=SAMPLE_RATE)

In [None]:
display(Audio(data=final_audio, rate=SAMPLE_RATE))