In [1]:
pip install langgraph langchain langchain_openai

Defaulting to user installation because normal site-packages is not writeable
Collecting langgraph
  Downloading langgraph-0.2.22-py3-none-any.whl.metadata (13 kB)
Collecting langchain-core<0.4,>=0.2.39 (from langgraph)
  Downloading langchain_core-0.3.2-py3-none-any.whl.metadata (6.3 kB)
Collecting langgraph-checkpoint<2.0.0,>=1.0.2 (from langgraph)
  Downloading langgraph_checkpoint-1.0.10-py3-none-any.whl.metadata (4.6 kB)
INFO: pip is looking at multiple versions of langchain to determine which version is compatible with other requirements. This could take a while.
Collecting langchain
  Downloading langchain-0.3.0-py3-none-any.whl.metadata (7.1 kB)
Collecting langchain-text-splitters<0.4.0,>=0.3.0 (from langchain)
  Downloading langchain_text_splitters-0.3.0-py3-none-any.whl.metadata (2.3 kB)
Collecting pydantic<3.0.0,>=2.7.4 (from langchain)
  Downloading pydantic-2.9.2-py3-none-any.whl.metadata (149 kB)
INFO: pip is looking at multiple versions of langchain-openai to determine w

In [3]:
import os

os.environ['AZURE_OPENAI_API_KEY'] = 'your_api_key_here'
os.environ['AZURE_OPENAI_MODEL'] = 'your_deployment_name_here'
os.environ['AZURE_OPENAI_ENDPOINT'] = 'deployment_endpoint'

In [5]:
import operator
from typing import Annotated, List, TypedDict, Optional
class Doc(TypedDict):
    id: str
    content: str
    summary: Optional[str]
    explanation: Optional[str]
    category: Optional[str]
class TaxonomyGenerationState(TypedDict):
    # The raw docs; we inject summaries within them in the first step
    documents: List[Doc]
    # Indices to be concise
    minibatches: List[List[int]]
    # Candidate Taxonomies (full trajectory)
    clusters: Annotated[List[List[dict]], operator.add]

In [8]:
import pandas as pd
from datasets import load_dataset
# Load the dataset
dataset = load_dataset("okite97/news-data")
# Access the different splits if available (e.g., train, test, validation)
train_data = dataset['train']
df = pd.DataFrame(train_data)
df = df.dropna()
df.reset_index(drop=True, inplace=True)
def run_to_doc(df: pd.DataFrame) -> Doc:
    all_data = []
    for i in range(len(df)):
        d = df.iloc[i]
        all_data.append({
            "id": i,
            "content": d['Title'] + "\\n\\n" + d['Excerpt']
        })
    
    return all_data
# Only clustering 100 documents
docs = run_to_doc(df[:100])
print(docs[0])

{'id': 0, 'content': 'Uefa Opens Proceedings against Barcelona, Juventus and Real Madrid Over European Super League Plan\\n\\nUefa has opened disciplinary proceedings against Barcelona, Juventus and Real Madrid over their involvement in the proposed European Super League.'}


In [12]:
import os
from langchain_openai import AzureChatOpenAI

model = AzureChatOpenAI(
    openai_api_version="2023-06-01-preview",
    azure_deployment="gpt-4o-2024-05-13",
    temperature=0.0
)

In [16]:
import re
import random
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableConfig, RunnableLambda, RunnablePassthrough
summary_prompt = hub.pull("wfh/tnt-llm-summary-generation").partial(
    summary_length=20, explanation_length=30
)
def parse_summary(xml_string: str) -> dict:
    summary_pattern = r"<summary>(.*?)</summary>"
    explanation_pattern = r"<explanation>(.*?)</explanation>"
    summary_match = re.search(summary_pattern, xml_string, re.DOTALL)
    explanation_match = re.search(explanation_pattern, xml_string, re.DOTALL)
    summary = summary_match.group(1).strip() if summary_match else ""
    explanation = explanation_match.group(1).strip() if explanation_match else ""
    return {"summary": summary, "explanation": explanation}
summary_llm_chain = (
    summary_prompt | model | StrOutputParser()
).with_config(run_name="GenerateSummary")
summary_chain = summary_llm_chain | parse_summary
# Input: state
# Output: state and summaries
# Processes docs in parallel
def get_content(state: TaxonomyGenerationState):
    docs = state["documents"]
    return [{"content": doc["content"]} for doc in docs]
map_step = RunnablePassthrough.assign(
    summaries=get_content
    | RunnableLambda(func=summary_chain.batch, afunc=summary_chain.abatch)
)
def reduce_summaries(combined: dict) -> TaxonomyGenerationState:
    summaries = combined["summaries"]
    documents = combined["documents"]
    return {
        "documents": [
            {
                "id": doc["id"],
                "content": doc["content"],
                "summary": summ_info["summary"],
                "explanation": summ_info["explanation"],
            }
            for doc, summ_info in zip(documents, summaries)
        ]
    }
# This is the summary node
map_reduce_chain = map_step | reduce_summaries
print(map_reduce_chain)

first=RunnableAssign(mapper={
  summaries: RunnableLambda(get_content)
             | RunnableLambda(batch)
}) middle=[] last=RunnableLambda(reduce_summaries)


In [18]:
def get_minibatches(state: TaxonomyGenerationState, config: RunnableConfig):
    batch_size = config["configurable"].get("batch_size", 200)
    original = state["documents"]
    indices = list(range(len(original)))
    random.shuffle(indices)
    if len(indices) < batch_size:
        # Don't pad needlessly if we can't fill a single batch
        return [indices]
    num_full_batches = len(indices) // batch_size
    batches = [
        indices[i * batch_size : (i + 1) * batch_size] for i in range(num_full_batches)
    ]
    leftovers = len(indices) % batch_size
    if leftovers:
        last_batch = indices[num_full_batches * batch_size :]
        elements_to_add = batch_size - leftovers
        last_batch += random.sample(indices, elements_to_add)
        batches.append(last_batch)
    return {
        "minibatches": batches,
    }

In [20]:
from typing import Dict
from langchain_core.runnables import Runnable
def parse_taxa(output_text: str) -> Dict:
    """Extract the taxonomy from the generated output."""
    cluster_pattern = r"<cluster>\\s*<id>(.*?)</id>\\s*<name>(.*?)</name>\\s*<description>(.*?)</description>\\s*</cluster>"
    cluster_matches = re.findall(cluster_pattern, output_text, re.DOTALL)
    clusters = [
        {"id": id.strip(), "name": name.strip(), "description": description.strip()}
        for id, name, description in cluster_matches
    ]
    return {"clusters": clusters}
def format_docs(docs: List[Doc]) -> str:
    xml_table = "\\n"
    for doc in docs:
        xml_table += f'{doc["summary"]}\\n'
    xml_table += ""
    return xml_table
def format_taxonomy(clusters):
    xml = "\\n"
    for label in clusters:
        xml += "  \\n"
        xml += f'    {label["id"]}\\n'
        xml += f'    {label["name"]}\\n'
        xml += f'    {label["description"]}\\n'
        xml += "  \\n"
    xml += ""
    return xml
def invoke_taxonomy_chain(
    chain: Runnable,
    state: TaxonomyGenerationState,
    config: RunnableConfig,
    mb_indices: List[int],
) -> TaxonomyGenerationState:
    configurable = config["configurable"]
    docs = state["documents"]
    minibatch = [docs[idx] for idx in mb_indices]
    data_table_xml = format_docs(minibatch)
    previous_taxonomy = state["clusters"][-1] if state["clusters"] else []
    cluster_table_xml = format_taxonomy(previous_taxonomy)
    updated_taxonomy = chain.invoke(
        {
            "data_xml": data_table_xml,
            "use_case": configurable["use_case"],
            "cluster_table_xml": cluster_table_xml,
            "suggestion_length": configurable.get("suggestion_length", 30),
            "cluster_name_length": configurable.get("cluster_name_length", 10),
            "cluster_description_length": configurable.get(
                "cluster_description_length", 30
            ),
            "explanation_length": configurable.get("explanation_length", 20),
            "max_num_clusters": configurable.get("max_num_clusters", 25),
        }
    )
    return {
        "clusters": [updated_taxonomy["clusters"]],
    }

In [22]:
# We will share an LLM for each step of the generate -> update -> review cycle
# You may want to consider using Opus or another more powerful model for this
taxonomy_generation_llm = model
## Initial generation
taxonomy_generation_prompt = hub.pull("wfh/tnt-llm-taxonomy-generation").partial(
    use_case="Generate the taxonomy that can be used to label the user intent in the conversation.",
)
taxa_gen_llm_chain = (
    taxonomy_generation_prompt | taxonomy_generation_llm | StrOutputParser()
).with_config(run_name="GenerateTaxonomy")
generate_taxonomy_chain = taxa_gen_llm_chain | parse_taxa
def generate_taxonomy(
    state: TaxonomyGenerationState, config: RunnableConfig
) -> TaxonomyGenerationState:
    return invoke_taxonomy_chain(
        generate_taxonomy_chain, state, config, state["minibatches"][0]
    )

In [23]:
taxonomy_update_prompt = hub.pull("wfh/tnt-llm-taxonomy-update")
taxa_update_llm_chain = (
    taxonomy_update_prompt | taxonomy_generation_llm | StrOutputParser()
).with_config(run_name="UpdateTaxonomy")
update_taxonomy_chain = taxa_update_llm_chain | parse_taxa
def update_taxonomy(
    state: TaxonomyGenerationState, config: RunnableConfig
) -> TaxonomyGenerationState:
    which_mb = len(state["clusters"]) % len(state["minibatches"])
    return invoke_taxonomy_chain(
        update_taxonomy_chain, state, config, state["minibatches"][which_mb]
    )

In [24]:
taxonomy_review_prompt = hub.pull("wfh/tnt-llm-taxonomy-review")
taxa_review_llm_chain = (
    taxonomy_review_prompt | taxonomy_generation_llm | StrOutputParser()
).with_config(run_name="ReviewTaxonomy")
review_taxonomy_chain = taxa_review_llm_chain | parse_taxa
def review_taxonomy(
    state: TaxonomyGenerationState, config: RunnableConfig
) -> TaxonomyGenerationState:
    batch_size = config["configurable"].get("batch_size", 200)
    original = state["documents"]
    indices = list(range(len(original)))
    random.shuffle(indices)
    return invoke_taxonomy_chain(
        review_taxonomy_chain, state, config, indices[:batch_size]
    )

In [25]:
from langgraph.graph import StateGraph
graph = StateGraph(TaxonomyGenerationState)
graph.add_node("summarize", map_reduce_chain)
graph.add_node("get_minibatches", get_minibatches)
graph.add_node("generate_taxonomy", generate_taxonomy)
graph.add_node("update_taxonomy", update_taxonomy)
graph.add_node("review_taxonomy", review_taxonomy)
graph.add_edge("summarize", "get_minibatches")
graph.add_edge("get_minibatches", "generate_taxonomy")
graph.add_edge("generate_taxonomy", "update_taxonomy")
def should_review(state: TaxonomyGenerationState) -> str:
    num_minibatches = len(state["minibatches"])
    num_revisions = len(state["clusters"])
    if num_revisions < num_minibatches:
        return "update_taxonomy"
    return "review_taxonomy"
graph.add_conditional_edges(
    "update_taxonomy",
    should_review,
    # Optional (but required for the diagram to be drawn correctly below)
    {"update_taxonomy": "update_taxonomy", "review_taxonomy": "review_taxonomy"},
)
graph.set_finish_point("review_taxonomy")
graph.set_entry_point("summarize")
app = graph.compile()

In [27]:
use_case = (
    "Generate the taxonomy that can be used to label the news article that would benefit the user."
)
try:
    stream = app.stream(
        {"documents": docs},
        {
            "configurable": {
                "use_case": use_case,
                # Optional:
                "batch_size": 10,
                "suggestion_length": 30,
                "cluster_name_length": 10,
                "cluster_description_length": 30,
                "explanation_length": 20,
                "max_num_clusters": 25,
            },
            "max_concurrency": 2,
            "recursion_limit": 50,
        },
    )
    for step in stream:
        node, state = next(iter(step.items()))
        print(node, str(state)[:20] + " ...")
    from IPython.display import Markdown
    def format_taxonomy_md(clusters):
        md = "## Final Taxonomy\\n\\n"
        md += "| ID | Name | Description |\\n"
        md += "|----|------|-------------|\\n"
        # Iterate over each inner list of dictionaries
        for cluster_list in clusters:
            for label in cluster_list:
                id = label["id"]
                name = label["name"].replace("|", "\\\\|")  # Escape any pipe characters within the content
                description = label["description"].replace("|", "\\\\|")  # Escape any pipe characters
                md += f"| {id} | {name} | {description} |\\n"
        return md
    markdown_table = format_taxonomy_md(step['review_taxonomy']['clusters'])
    display(Markdown(markdown_table))
except Exception as e:
    print(f"An error occurred: {e}")

An error occurred: Connection error.
