One key idea in Microsoft’s [Graph RAG for Summarization](https://www.microsoft.com/en-us/research/publication/from-local-to-global-a-graph-rag-approach-to-query-focused-summarization/) is to group content by community prior to summarizing. This technique may be applied without storing everything as a knowledge graph. In this post we’ll demonstrate a generic LangChain summarizer which returns community summaries based on extracted links between documents. We'll apply this to retrieval results both with and without persisted links. The same techniques could be applied to summarize communities in large documents during indexing without building an explicit graph.

# Background: From Local to Global

In [“From Local to Global: A Graph RAG Approach to Query-Focused Summarization”](https://www.microsoft.com/en-us/research/publication/from-local-to-global-a-graph-rag-approach-to-query-focused-summarization/), Microsoft presents a technique for answering queries over an entire dataset. The idea boils down to extracting an entity knowledge graph from the documents and pre-generating community summaries for groups of closely related entities. The community summaries are then used for generating initial per-community responses to a question which are then combined (hierarchically) to generate the final answer.

The use of communities for summarization has wider applicability than just a starting point for global summarization. Any time the context exceeds the maximum length supported by the LLM, summarization may be necessary. Similarly, any time the content to be summarized exceeds the context length, the summarization has to be done on subgroups of the content. In both cases, grouping the content into communities first is likely to improve the results, since each of the initial calls receives information related to a specific topic (based on the community).

Consider a case where you have 10 documents about topic A and 10 documents about topic B. If we split them into two groups of 10 documents, each group could easily end up with 5 documents from topic A and 5 from topic B. In that case, there is a risk that a generated summary only addresses one of the topics. Even if it doesn’t, we’re likely generating output pertaining to each topic multiple times. If we instead group the documents by topic, we generate a summary of everything related to A and a summary of everything related to B (which should have minimal overlap) and then a combined summary. This is the benefit of community summarization.
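
To make the difference concrete, here is a small sketch contrasting arbitrary batching with topic-aware batching. The document labels and the `batch` helper are made up for illustration, and no LLM is called; the code only shows which topics each summarization call would see.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical corpus of (topic, document id) pairs, interleaved to mimic
# the order in which documents might arrive from retrieval.
docs: List[Tuple[str, str]] = [
    (topic, f"{topic}{i}") for i in range(10) for topic in ("A", "B")
]


def batch(items: List[Tuple[str, str]], size: int) -> List[List[Tuple[str, str]]]:
    """Split items into consecutive groups of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


# Arbitrary batching: groups of 10 in arrival order.
naive_topics = [sorted({topic for topic, _ in group}) for group in batch(docs, 10)]

# Community-style batching: group by topic before summarizing.
by_topic: Dict[str, List[Tuple[str, str]]] = defaultdict(list)
for topic, doc_id in docs:
    by_topic[topic].append((topic, doc_id))
grouped_topics = [sorted({topic for topic, _ in group}) for group in by_topic.values()]

print(naive_topics)    # every batch mixes both topics
print(grouped_topics)  # every batch covers a single topic
```

In a real pipeline the topic labels are not known up front, which is why the rest of this post infers the groups from links between documents.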


# Environment / Dependencies

In [1]:
#@ Install modules
%pip install -U -r requirements.txt



In [2]:
#@ Configure import paths.
import sys
sys.path.append("../../")

# Initialize environment variables.
from utils import initialize_environment
initialize_environment()

In [3]:
#@ Create GraphVectorStore
from langchain_openai import OpenAIEmbeddings
from langchain_community.graph_vectorstores.cassandra import CassandraGraphVectorStore
import cassio

cassio.init(auto=True)
store = CassandraGraphVectorStore(
    embedding = OpenAIEmbeddings(),
    node_table="summarize",
    insert_timeout = 1000.0,
)

In [None]:
from cassio.config import check_resolve_session, check_resolve_keyspace
session = check_resolve_session()
keyspace = check_resolve_keyspace()


session.execute(f"TRUNCATE TABLE {keyspace}.summarize;")

## Load Data
The following loads Wikipedia data from the [2wikimultihop](https://github.com/Alab-NII/2wikimultihop) dataset. To execute it, you will need to download [para_with_hyperlink.zip](https://www.dropbox.com/s/wlhw26kik59wbh8/para_with_hyperlink.zip) to the `wikimultihop` directory. The code is set up to automatically resume from where it left off in case you encounter a transient failure and need to restart. On my machine, the full load takes about 2.5 hours.

In [5]:
#@ Load Data Into the Graph VectorStore

if input("load data (y/N): ").lower() == "y":
    print("Loading data...")
    from datasets.wikimultihop.load import load_2wikimultihop
    load_2wikimultihop(store)
else:
    print("Skipped loading data")

Loading data...
Resuming loading with 4972 completed, 1018 remaining


100%|██████████| 1018/1018 [20:20<00:00,  1.20s/it]


# Local Community Summarization, In Memory

Detecting communities can be done easily using a library such as `networkx`. In the following code, LangChain `Document`s are turned into nodes, with edges derived from the links attached to each document. This allows the documents to be grouped by community for accurate summarization.

The following also includes a `CommunitySummarizer` which can be used to automatically extract links from documents (based on keywords, named entities, links, etc.) as part of building the graph. This allows using the community summarization even on chunks that don't already contain links.
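
Before looking at the full implementation, it may help to see how links become edges. The sketch below is a simplified, standard-library-only illustration: `ToyLink` and the three example documents are hypothetical stand-ins (not the LangChain `Link` class), and the matching rule assumed here is that an outgoing link connects to every document with an incoming link of the same kind and tag. Unlike the notebook code, this toy version also skips self-loops.

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple, Set, Tuple


class ToyLink(NamedTuple):
    """Hypothetical stand-in for a link: a kind, a direction, and a tag."""
    kind: str
    direction: str  # "in", "out", or "bidir"
    tag: str


# Hypothetical documents keyed by ID, each with the links attached to it.
links_by_doc: Dict[str, List[ToyLink]] = {
    "intro": [ToyLink("hyperlink", "out", "https://example.com/details")],
    "details": [
        ToyLink("hyperlink", "in", "https://example.com/details"),
        ToyLink("keyword", "bidir", "graphs"),
    ],
    "appendix": [ToyLink("keyword", "bidir", "graphs")],
}

# Pass 1: index documents by the (kind, tag) pairs they accept.
incoming: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
for doc_id, links in links_by_doc.items():
    for link in links:
        if link.direction in ("in", "bidir"):
            incoming[(link.kind, link.tag)].add(doc_id)

# Pass 2: connect each outgoing link to every document accepting that pair.
edges: Set[Tuple[str, str]] = set()
for doc_id, links in links_by_doc.items():
    for link in links:
        if link.direction in ("out", "bidir"):
            for target in incoming.get((link.kind, link.tag), set()):
                if target != doc_id:  # skip self-loops in this toy version
                    edges.add((doc_id, target))

print(sorted(edges))
```

Here the hyperlink yields one directed edge, while the shared `graphs` keyword connects `details` and `appendix` in both directions.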

In [6]:
from langchain_core.documents import Document
from langchain_core.graph_vectorstores.links import get_links, copy_with_links
from langchain_core.runnables import Runnable
from langchain_community.graph_vectorstores.extractors.link_extractor import (
    LinkExtractor,
)
from typing import Any, Dict, Iterable, List, Set, Tuple
import networkx as nx

def _best_communities(graph: nx.DiGraph) -> Tuple[Set[str], ...] | None:
    """Compute the best communities.

    Iteratively applies Girvan-Newman algorithm as long as the modularity improves.

    Returns:
        The communities from the Girvan-Newman iteration with the highest
        modularity seen before the modularity stopped improving.
    """

    # TODO: Also continue running until the size of communities is below
    # a specified threshold?

    best_modularity = float("-inf")
    best_communities = None
    for new_communities in nx.algorithms.community.girvan_newman(graph):
        new_modularity = nx.algorithms.community.modularity(graph, new_communities)
        if new_modularity > best_modularity:
            best_modularity = new_modularity
            best_communities = new_communities
        else:
            break
    return best_communities

def group_by_community(documents: Iterable[Document]) -> List[List[Document]]:
    """Group documents by community inferred from the links."""

    graph = nx.DiGraph()

    # First pass -- map from (kind, tag) to the IDs of nodes with that incoming link.
    documents_by_id = {}
    documents_by_incoming = {}
    for document in documents:
        # Add the node to the graph
        graph.add_node(document.id)
        documents_by_id[document.id] = document

        # Record the incoming edges.
        for link in get_links(document):
            if link.direction == "in" or link.direction == "bidir":
                documents_by_incoming.setdefault((link.kind, link.tag), set()).add(document.id)


    # Second pass -- add edges for each outgoing edge.
    for document in documents_by_id.values():
        for link in get_links(document):
            if link.direction == "out" or link.direction == "bidir":
                for target in documents_by_incoming.get((link.kind, link.tag), set()):
                    graph.add_edge(document.id, target)

    # Find communities and output documents grouped by community.
    # The algorithm returns an iterator over iterations.
    # Iterate until the modularity no longer increases.
    return [
        [documents_by_id[id] for id in community]
        for community in _best_communities(graph)
    ]

class CommunitySummarizer:
  def __init__(self,
               summarize: Runnable[Dict[str, Any], Any],
               *,
               link_extractors: Tuple[LinkExtractor[Document], ...] = ()) -> None:
    """Create a community summarizer.

    Parameters:
      summarize: Chain to use for summarization. Must accept documents to
          be summarized in the `"context"` key. Other arguments may be passed
          to the summarization chain when `summarize` is invoked.
      link_extractors: If specified, links will be added to input documents
          using the link extractors. If empty, only the links already present
          in the documents will be used.
    """
    # Store the chain under a private name so it does not clash with the
    # `summarize` method defined below.
    self._summarize_chain = summarize
    self.link_extractors = link_extractors

  def summarize(self,
                documents: List[Document],
                *,
                summarize_dict: Dict[str, Any] | None = None) -> Tuple[List[Document], List[Any]]:
    """Return the summaries of the communities in the given documents.

    If `link_extractors` were specified when constructed, each document
    in `documents` will have links added based on the selected extractors.

    Parameters:
      documents: The documents to summarize.
      summarize_dict: Dictionary containing additional key/value pairs to
        pass to the summarization chain.

    Returns:
      A tuple containing the original documents (with additional links, if any) and
      the list of community summaries.
    """

    # If necessary, run the link extractors to add links.
    if self.link_extractors:
        # Run each extractor over all documents.
        links_per_extractor = [e.extract_many(documents) for e in self.link_extractors]

        # Transpose the list of lists to pair each document with the tuple of links.
        links_per_document = zip(*links_per_extractor)

        documents = [
            copy_with_links(document, *links)
            for document, links in zip(documents, links_per_document)
        ]

    # Generate the communities
    communities = group_by_community(documents)

    # Summarize each community, passing along any extra arguments.
    extra_args = summarize_dict or {}
    summaries = [
        self._summarize_chain.invoke({**extra_args, "context": community})
        for community in communities
    ]

    return (documents, summaries)
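
The stopping rule in `_best_communities` relies on modularity, which scores how much denser the edges inside each community are than would be expected by chance. As a worked example, the sketch below computes the undirected form of modularity, Q = Σ_c [L_c / m − (d_c / 2m)²], by hand for a toy graph of two triangles joined by a bridge. This is a simplified illustration with a hypothetical `modularity` helper written from scratch; `networkx` applies a directed variant when given a `DiGraph`.

```python
from typing import Dict, List, Set, Tuple


def modularity(edges: List[Tuple[int, int]], communities: List[Set[int]]) -> float:
    """Modularity Q of a partition of an undirected, unweighted graph without self-loops."""
    m = len(edges)
    degree: Dict[int, int] = {}
    for u, v in edges:
        degree[u] = degree.get(u, 0) + 1
        degree[v] = degree.get(v, 0) + 1

    q = 0.0
    for community in communities:
        # L_c: number of edges with both endpoints inside the community.
        internal = sum(1 for u, v in edges if u in community and v in community)
        # d_c: sum of the degrees of the nodes in the community.
        total_degree = sum(degree.get(node, 0) for node in community)
        q += internal / m - (total_degree / (2 * m)) ** 2
    return q


# Two triangles, {0, 1, 2} and {3, 4, 5}, joined by the bridge edge (2, 3).
edges = [(0, 1), (1, 2), (0, 2), (3, 4), (4, 5), (3, 5), (2, 3)]

q_triangles = modularity(edges, [{0, 1, 2}, {3, 4, 5}])
q_mixed = modularity(edges, [{0, 3}, {1, 4}, {2, 5}])

print(round(q_triangles, 4))  # 0.3571, i.e. 5/14
print(q_mixed < 0)            # True: this partition has no internal edges
```

Splitting along the bridge scores well, while a partition that cuts across the triangles scores below zero, which is why a drop in modularity is a reasonable signal to stop splitting.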

# Example: Retrieve Chunks and Summarize Communities

Incorporating summarization into a RAG pipeline is useful when there may be a large number of relevant chunks. In this example, we'll retrieve a large number of chunks based on the question and then group them into communities to be summarized.

Since this happens "late" -- after the question is known -- we're able to ask that the summaries be generated in a way that focuses on information relevant to the question. We can then combine the summaries hierarchically to produce the final answer.

The implementation uses LangGraph and is based on [this LangChain example showing how to orchestrate summarization using LangGraph](https://python.langchain.com/v0.2/docs/tutorials/summarization/#orchestration-via-langgraph).
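
The hierarchical combination can be sketched without any LLM. In the toy below, `split_by_budget`, `summarize_hierarchically`, `count_words` and `first_two_words` are hypothetical helpers: word counts stand in for token counts, and "keep the first two words of each text" stands in for the summarization call. It is meant only to show the control flow (batch under a budget, collapse each batch, repeat until everything fits), not the LangChain implementation.

```python
from typing import Callable, List


def split_by_budget(
    texts: List[str], length_fn: Callable[[List[str]], int], budget: int
) -> List[List[str]]:
    """Greedily pack consecutive texts into batches that fit the budget."""
    batches: List[List[str]] = []
    current: List[str] = []
    for text in texts:
        if current and length_fn(current + [text]) > budget:
            batches.append(current)
            current = []
        current.append(text)
    if current:
        batches.append(current)
    return batches


def summarize_hierarchically(
    texts: List[str],
    summarize: Callable[[List[str]], str],
    length_fn: Callable[[List[str]], int],
    budget: int,
) -> str:
    """Collapse batches of texts until they fit the budget, then combine them."""
    while length_fn(texts) > budget:
        collapsed = [summarize(b) for b in split_by_budget(texts, length_fn, budget)]
        if length_fn(collapsed) >= length_fn(texts):
            raise ValueError("summaries are not getting shorter")
        texts = collapsed
    return summarize(texts)


def count_words(texts: List[str]) -> int:
    """Stand-in for a token counter."""
    return sum(len(text.split()) for text in texts)


def first_two_words(texts: List[str]) -> str:
    """Stand-in for an LLM summarizer: keep the first two words of each text."""
    return " ".join(" ".join(text.split()[:2]) for text in texts)


community_summaries = [f"{c}1 {c}2 {c}3 {c}4" for c in "abcdef"]
answer = summarize_hierarchically(
    community_summaries, first_two_words, count_words, budget=10
)
print(answer)  # a1 a2 e1 e2
```

With a budget of 10 words, the six inputs need two collapse rounds before the final combination, which mirrors the collapse loop in the LangGraph code.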

In [7]:
#@ Create the retriever.
# For summarization, we use a higher `k` so we have more chunks.
retriever = store.as_retriever(
    search_type = "mmr_traversal",
    search_kwargs = {
        "k": 20,
        "fetch_k": 50,
        "depth": 2,
        # "score_threshold": 0.2,
    },
)

In [8]:
#@ Community Summarization using LangGraph

import operator
from typing import Annotated, List, Literal, TypedDict

from langchain.chains.combine_documents.reduce import (
    acollapse_docs,
    split_list_of_docs,
)
from langchain_core.documents import Document
from langchain_openai import ChatOpenAI
from langgraph.constants import Send
from langgraph.graph import END, START, StateGraph

token_max = 1000

llm = ChatOpenAI(model="gpt-4o", temperature=0)


def length_function(documents: List[Document]) -> int:
    """Get number of tokens for input contents."""
    return sum(llm.get_num_tokens(doc.page_content) for doc in documents)


# This will be the overall state of the main graph.
# It will contain the input document contents, corresponding
# summaries, and a final summary.
class OverallState(TypedDict):
    question: str
    communities: List[List[Document]]
    # Notice here we use operator.add.
    # This is because we want to combine all the summaries we generate
    # from individual nodes back into one list - this is essentially
    # the "reduce" part.
    summaries: Annotated[list, operator.add]
    collapsed_summaries: List[Document]
    final_summary: str


# This will be the state of the node that we will "map" each
# community to in order to generate summaries
class SummaryState(TypedDict):
    content: List[str]
    question: str

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

map_template = """
The following is a set of documents:
{content}

First, describe the common topic in the documents.

Then, write a concise summary of that topic based on the documents related to the question:
{question}
"""

map_prompt = ChatPromptTemplate.from_messages(
    [("human", map_template)]
)

map_chain = map_prompt | llm | StrOutputParser()

# Here we generate a summary, given the contents of one community
async def generate_summary(state: SummaryState):
    response = await map_chain.ainvoke(state)
    return {"summaries": [response]}


# Here we define the logic to map out over the documents
# We will use this as an edge in the graph
def map_summaries(state: OverallState):
    # We will return a list of `Send` objects
    # Each `Send` object consists of the name of a node in the graph
    # as well as the state to send to that node
    return [
        Send("generate_summary", {"content": [doc.page_content for doc in community],
                                  "question": state["question"]})
        for community in state["communities"]
    ]


def collect_summaries(state: OverallState):
    return {
        "collapsed_summaries": [Document(summary) for summary in state["summaries"]]
    }

reduce_template = """
The following is a set of summaries:
{docs}
Take these and distill it into a final, consolidated summary
of the main themes answering the following question:
{question}
"""

reduce_prompt = ChatPromptTemplate.from_messages([("human", reduce_template)])

reduce_chain = reduce_prompt | llm | StrOutputParser()

# Add node to collapse summaries
async def collapse_summaries(state: OverallState):
    doc_lists = split_list_of_docs(
        state["collapsed_summaries"], length_function, token_max
    )
    results = []
    for doc_list in doc_lists:
        results.append(await acollapse_docs(doc_list,
                                            lambda docs: reduce_chain.ainvoke({
                                                "docs": docs,
                                                "question": state["question"],
                                            })))

    return {"collapsed_summaries": results}


# This represents a conditional edge in the graph that determines
# if we should collapse the summaries or not
def should_collapse(
    state: OverallState,
) -> Literal["collapse_summaries", "generate_final_summary"]:
    num_tokens = length_function(state["collapsed_summaries"])
    if num_tokens > token_max:
        return "collapse_summaries"
    else:
        return "generate_final_summary"


# Here we will generate the final summary
async def generate_final_summary(state: OverallState):
    response = await reduce_chain.ainvoke({
        "docs": state["collapsed_summaries"],
        "question": state["question"],
    })
    return {"final_summary": response}


# Construct the graph
# Nodes:
graph = StateGraph(OverallState)
graph.add_node("generate_summary", generate_summary)
graph.add_node("collect_summaries", collect_summaries)
graph.add_node("collapse_summaries", collapse_summaries)
graph.add_node("generate_final_summary", generate_final_summary)

# Edges:
graph.add_conditional_edges(START, map_summaries, ["generate_summary"])
graph.add_edge("generate_summary", "collect_summaries")
graph.add_conditional_edges("collect_summaries", should_collapse)
graph.add_conditional_edges("collapse_summaries", should_collapse)
graph.add_edge("generate_final_summary", END)

community_summarizer = graph.compile()

from operator import itemgetter
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
summary_retriever = {
    "question": RunnablePassthrough(),
    "communities": RunnablePassthrough() | retriever | RunnableLambda(group_by_community),
    } | community_summarizer | itemgetter("final_summary")
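
The `Annotated[list, operator.add]` annotation on `summaries` is what lets the parallel `generate_summary` calls accumulate their results instead of overwriting each other. The toy below emulates only that merge behavior with plain dictionaries. `apply_update` and `reducers` are hypothetical names, and LangGraph's real state handling is more involved, so treat this as an illustration of the semantics rather than of the library internals.

```python
import operator
from functools import reduce
from typing import Any, Callable, Dict, List

# Reducers keyed by state field. Fields without a reducer are overwritten.
reducers: Dict[str, Callable[[Any, Any], Any]] = {"summaries": operator.add}


def apply_update(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Merge one node's partial update into the state."""
    merged = dict(state)
    for key, value in update.items():
        if key in reducers and key in merged:
            merged[key] = reducers[key](merged[key], value)
        else:
            merged[key] = value
    return merged


initial_state: Dict[str, Any] = {"question": "Tell me about Russia", "summaries": []}

# Each mapped node returns a one-element list, as `generate_summary` does.
updates: List[Dict[str, Any]] = [
    {"summaries": ["summary of community 1"]},
    {"summaries": ["summary of community 2"]},
    {"final_summary": "combined summary"},
]

final_state = reduce(apply_update, updates, initial_state)
print(final_state["summaries"])
```

Because list concatenation is used as the reducer, each node returns a list containing its single summary rather than a bare string.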


In [10]:
await summary_retriever.ainvoke("Tell me about Russia")

'Russia, the largest country in the world, is known for its vast and diverse environment, rich history, and significant cultural and geopolitical influence. The landscape features expansive forests, tundras, and mountain ranges, with climates ranging from the frigid Arctic in the north to more temperate zones in the south. Russia is abundant in natural resources like oil, natural gas, and minerals but faces environmental challenges such as pollution, deforestation, and climate change.\n\nHistorically, Russia has seen significant territorial expansion, especially during the Tsardom and Soviet eras, becoming a multi-ethnic state. The Soviet Union (1922-1991) was a major global power, playing crucial roles in World War II and the Cold War, and achieving technological milestones like launching the first human-made satellite and sending the first humans into space.\n\nCulturally, Russia is deeply influenced by the Russian language and Orthodox Christianity, with roots tracing back to Kievan

# Conclusion
Summarization is useful in a variety of places beyond requests to summarize a dataset. During indexing, summaries can be persisted and retrieved separately, or used to filter retrieved content down to documents whose summary is closest to the question. Summaries can also be used for computing other embeddings -- for instance, a document could be subdivided into chunks and communities, with the summary of each community being used to create an embedding for a multi-vector retriever. During retrieval, if the retrieved content is too large, it can be useful to summarize the retrieval results and use the summary to answer the question.

In this post we demonstrated the application of community-based summarization to the graph of links between documents. This is an easy way to increase the quality of summarization steps in your LangChain applications, and can be used with or without a `GraphVectorStore`. It is possible to extract links from any document chunk – for instance, based on the links and/or structure of the documents – and use those links to generate and summarize communities. This provides the benefits of local community summarization without needing to extract a knowledge graph.