In [1]:
import asyncio
import json
from io import StringIO
from dotenv import load_dotenv
from typing import Any, Dict, List, TypedDict
import pandas as pd

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langchain.text_splitter import RecursiveCharacterTextSplitter

from prompt_templates import name_extraction_template, name_translation_template
from utilities import batch_list, extract_csv, extract_json

load_dotenv() # Load variables from a local .env file into the environment; the API key for the LLM is expected to be defined there.

True

In [2]:
# Cost and latency matter little at this text size, which is why this model was chosen.
llm = ChatOpenAI(model="gpt-5")

### Text Processing

- The book is split into ca. 40 chunks using recursive character splitting.
- The paragraphs of the text are relatively short, so the chosen chunk size keeps them intact.

In [3]:
# Read the cleaned book text. The encoding is pinned so that the result does
# not depend on the platform's default locale encoding.
# NOTE(review): assumes text_clean.txt is UTF-8 encoded — confirm.
with open("text_clean.txt", "r", encoding="utf-8") as file:
    text = file.read()

# Separators are tried from coarsest to finest, so a chunk is cut at the
# largest natural boundary that still fits within chunk_size.
text_splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n\n", "\n\n", "\n", "."],
    chunk_size=2_500,  # upper bound per chunk, measured with length_function
    chunk_overlap=0,  # consecutive chunks share no text
    length_function=len,
)

chunks = text_splitter.split_text(text)
print(f"Number of text chunks: {len(chunks)}")

Number of text chunks: 39


### Name Extraction and Initial Translation

 - Each chunk passes through chunk_app.
 - An LLM extracts the names of people, peoples and places and makes an initial translation.
 - The output for each chunk is converted into a tabular format for easier human review later.
 - The chain is not executed directly; it is a component of global_app, which executes chunk_app in parallel.
 - global_app processes text chunks in batches via chunk_app.
 - The translations are aggregated into a single DataFrame.

In [4]:
class ChunkState(TypedDict):
    """State carried through the per-chunk graph that is compiled into `chunk_app`."""
    # Raw text of one chunk; the only key supplied when the graph is invoked.
    text: str
    # Parsed result of the name-extraction LLM call (written by `extract_names`).
    extracted_json: dict
    # Raw content of the translation LLM response (written by `translate_names`).
    translation_csv: str
    # Table read from the CSV text of the translation (written by `parse_translation`).
    parsed_table: pd.DataFrame

async def extract_names(state: ChunkState) -> Dict[str, dict]:
    """Ask the LLM to extract the names found in the chunk's text.

    The chunk text is inserted into ``name_extraction_template``, the
    resulting messages are sent to the model, and the reply content is
    parsed with ``extract_json``.

    Args:
        state: Chunk state; only ``state["text"]`` is read.

    Returns:
        A state update with the parsed reply under ``"extracted_json"``.
    """
    messages = name_extraction_template.format_messages(text=state["text"])
    llm_reply = await llm.ainvoke(messages)
    return {"extracted_json": extract_json(llm_reply.content)}

async def translate_names(state: ChunkState) -> Dict[str, str]:
    """Ask the LLM to translate the previously extracted names.

    The extracted names are serialised to pretty-printed JSON (non-ASCII
    characters are kept as-is) and inserted into
    ``name_translation_template`` before the model is called.

    Args:
        state: Chunk state; only ``state["extracted_json"]`` is read.

    Returns:
        A state update with the raw reply content under ``"translation_csv"``.
    """
    serialized_names = json.dumps(
        state["extracted_json"],
        indent=2,
        ensure_ascii=False,
    )
    messages = name_translation_template.format_messages(
        names_in_english=serialized_names
    )
    llm_reply = await llm.ainvoke(messages)
    return {"translation_csv": llm_reply.content}

async def parse_translation(state: ChunkState) -> Dict[str, pd.DataFrame]:
    """Read the model's CSV reply into a DataFrame.

    Args:
        state: Chunk state; only ``state["translation_csv"]`` is read.

    Returns:
        A state update with the parsed table under ``"parsed_table"``.
    """
    raw_reply = state["translation_csv"]
    # The API response occasionally starts with "```csv"; replies containing
    # "csv" are therefore passed through extract_csv before parsing.
    # NOTE(review): this is a plain substring test, so any reply that contains
    # "csv" anywhere takes this path — confirm extract_csv tolerates that.
    csv_text = extract_csv(raw_reply) if "csv" in raw_reply else raw_reply
    return {"parsed_table": pd.read_csv(StringIO(csv_text))}

# Per-chunk pipeline: a linear graph START -> extract -> translate -> parse -> END.
chunk_graph = StateGraph(ChunkState)

# Register the nodes first, then wire them together in execution order.
chunk_nodes = (
    ("extract_names", extract_names),
    ("translate_names", translate_names),
    ("parse_translation", parse_translation),
)
for node_name, node_fn in chunk_nodes:
    chunk_graph.add_node(node_name, node_fn)

chunk_edges = (
    (START, "extract_names"),
    ("extract_names", "translate_names"),
    ("translate_names", "parse_translation"),
    ("parse_translation", END),
)
for edge_start, edge_end in chunk_edges:
    chunk_graph.add_edge(edge_start, edge_end)

chunk_app = chunk_graph.compile()

In [5]:
class GlobalState(TypedDict):
    """State carried through the top-level graph that is compiled into `global_app`."""
    # All text chunks to process; the only key supplied when the graph is invoked.
    texts: List[str]
    # One result per chunk, as returned by `chunk_app.ainvoke` (written by `process_chunks`).
    chunk_results: List[ChunkState]
    # Concatenation of every chunk's parsed table (written by `aggregate_results`).
    aggregated: pd.DataFrame

async def aggregate_results(state: GlobalState) -> Dict[str, pd.DataFrame]:
    """Stack the per-chunk translation tables into a single DataFrame.

    Args:
        state: Global state; only ``state["chunk_results"]`` is read.

    Returns:
        A state update with the concatenated table (re-indexed from zero)
        under ``"aggregated"``.
    """
    tables = []
    for chunk_result in state["chunk_results"]:
        tables.append(chunk_result["parsed_table"])
    return {"aggregated": pd.concat(tables, ignore_index=True)}

async def run_chunk_pipeline(state: GlobalState, batch_size: int = 10) -> Dict[str, list]:
    """Run every text chunk through ``chunk_app``, one batch at a time.

    Args:
        state: Global state; only ``state["texts"]`` is read.
        batch_size: Batch size handed to ``batch_list``.

    Returns:
        A state update with all chunk results, in input order, under
        ``"chunk_results"``.
    """
    collected: list = []
    for text_batch in batch_list(state["texts"], batch_size):
        # Chunks of one batch are awaited together; batches run sequentially.
        batch_results = await asyncio.gather(
            *(chunk_app.ainvoke({"text": chunk_text}) for chunk_text in text_batch)
        )
        collected += batch_results
    return {"chunk_results": collected}

async def process_chunks(state: GlobalState) -> Dict[str, list]:
    """Graph node: delegate to ``run_chunk_pipeline`` with a batch size of 10."""
    chunk_update = await run_chunk_pipeline(state, batch_size=10)
    return chunk_update

# Build the top-level graph: START -> process_chunks -> aggregate -> END.
global_graph = StateGraph(GlobalState)

global_nodes = (
    ("process_chunks", process_chunks),
    ("aggregate", aggregate_results),
)
for node_name, node_fn in global_nodes:
    global_graph.add_node(node_name, node_fn)

global_edges = (
    (START, "process_chunks"),
    ("process_chunks", "aggregate"),
    ("aggregate", END),
)
for edge_start, edge_end in global_edges:
    global_graph.add_edge(edge_start, edge_end)

global_app = global_graph.compile()

In [6]:
# Graph execution.
names_translation = await global_app.ainvoke({"texts": chunks})

In [7]:
# Final DataFrame of all translations, keeping the first row for each English name.
# drop_duplicates is used without inplace=True so that the raw aggregate stored in
# names_translation["aggregated"] is not mutated by this cell.
df = names_translation["aggregated"].drop_duplicates(subset="Name in English")
print(f"Names extracted: {df.shape[0]}")
df.sort_values("Name in English").head()

Names extracted: 174


Unnamed: 0,Name in English,Name in Bulgarian,Category,Translation type
244,AEstyan nations,Естийски народи,names_of_peoples,translation
28,Abnoba,Абноба,geographical_references,transliteration
44,Africa,Африка,geographical_references,translation
4,Agricola,Агрикола,names_of_people,transliteration
213,Alcis,Алкис,names_of_people,transliteration


In [8]:
# Write the de-duplicated table to an Excel file (index omitted) for the audit step.
audit_path = "names_translation_unaudited.xlsx"
df.to_excel(audit_path, index=False)