In [1]:
from datasets import load_dataset

# Stream the dataset so rows are fetched lazily while iterating, instead of
# preparing the whole corpus up front. With streaming=True the result is an
# iterable dataset: it can be looped over, but not indexed or len()'d.
ds = load_dataset("eloukas/edgar-corpus", "full", split="train", streaming=True)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Keep only AIG's filings and buffer them on disk as gzipped, newline-delimited
# JSON: rows are written one at a time and the gzip keeps the file small.
import json, gzip, os

AIG_CIK = "0000005272"   # AIG
OUT_PATH = "aig_edgar.jsonl.gz"  # compact on-disk buffer for Spark

# # Stream the full split; no huge RAM spikes
# ds = load_dataset("eloukas/edgar-corpus", "full", split="train", streaming=True)

# Rows have keys like: filename, cik, year, section_1, section_1A, ...
# The CIK is left-padded with zeros to 10 characters before it is compared.
count = 0
with gzip.open(OUT_PATH, "wt", encoding="utf-8") as out_file:
    aig_rows = (rec for rec in ds if str(rec.get("cik", "")).zfill(10) == AIG_CIK)
    for count, rec in enumerate(aig_rows, start=1):
        out_file.write(json.dumps(rec, ensure_ascii=False) + "\n")

print(f"Wrote {count} AIG rows to {OUT_PATH}")

Wrote 22 AIG rows to aig_edgar.jsonl.gz


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lpad, col

# Start (or reuse) the Spark session for this notebook.
# Driver memory is tunable; more of it helps when inspecting a lot at once.
session_builder = SparkSession.builder.appName("AIG-EDGAR")
session_builder = session_builder.config("spark.driver.memory", "6g")
spark = session_builder.getOrCreate()

# Spark reads the gzipped JSONL buffer directly
raw_aig_df = spark.read.json(OUT_PATH)

# (Optional) normalize CIK to a zero-padded 10-character string for consistency
padded_cik = lpad(col("cik").cast("string"), 10, "0")
aig_df = raw_aig_df.withColumn("cik", padded_cik)

# Show the identifying columns of the first few rows
aig_df.select("filename", "cik", "year").show(10, truncate=False)

# Persist the subset as Parquet, replacing any earlier copy in that directory
PARQUET_DIR = "parquet_aig_edgar"
aig_df.write.mode("overwrite").parquet(PARQUET_DIR)

print(f"Saved AIG subset to {PARQUET_DIR}")


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/08 20:23:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+-------------+----------+----+
|filename     |cik       |year|
+-------------+----------+----+
|5272_1994.txt|0000005272|1994|
|5272_1995.txt|0000005272|1995|
|5272_1998.txt|0000005272|1998|
|5272_1999.txt|0000005272|1999|
|5272_2000.txt|0000005272|2000|
|5272_2001.txt|0000005272|2001|
|5272_2003.htm|0000005272|2003|
|5272_2004.htm|0000005272|2004|
|5272_2005.htm|0000005272|2005|
|5272_2006.htm|0000005272|2006|
+-------------+----------+----+
only showing top 10 rows
Saved AIG subset to parquet_aig_edgar


                                                                                

In [4]:
# Preview two rows. limit(2) runs in Spark, so only those two rows (including
# their long section texts) are collected to the driver rather than the whole frame.
aig_df.limit(2).toPandas()

Unnamed: 0,cik,filename,section_1,section_10,section_11,section_12,section_13,section_14,section_15,section_1A,...,section_4,section_5,section_6,section_7,section_7A,section_8,section_9,section_9A,section_9B,year
0,5272,5272_1994.txt,ITEM 1. BUSINESS\nAmerican International Group...,ITEM 10. DIRECTORS AND EXECUTIVE OFFICERS OF T...,ITEM 11. EXECUTIVE COMPENSATION\nThis item is ...,ITEM 12. SECURITY OWNERSHIP OF CERTAIN BENEFIC...,ITEM 13. CERTAIN RELATIONSHIPS AND RELATED TRA...,"ITEM 14. EXHIBITS, FINANCIAL STATEMENT SCHEDUL...",,,...,ITEM 4. SUBMISSION OF MATTERS TO A VOTE OF SEC...,ITEM 5. MARKET FOR THE REGISTRANT'S COMMON STO...,ITEM 6. SELECTED FINANCIAL DATA\nAMERICAN INTE...,ITEM 7. MANAGEMENT'S DISCUSSION AND ANALYSIS O...,,ITEM 8. FINANCIAL STATEMENTS AND SUPPLEMENTARY...,ITEM 9. CHANGES IN AND DISAGREEMENTS WITH ACCO...,,,1994
1,5272,5272_1995.txt,ITEM 1. BUSINESS\nAmerican International Group...,ITEM 10. DIRECTORS AND EXECUTIVE OFFICERS OF T...,ITEM 11. EXECUTIVE COMPENSATION\nThis item is ...,ITEM 12. SECURITY OWNERSHIP OF CERTAIN BENEFIC...,ITEM 13. CERTAIN RELATIONSHIPS AND RELATED TRA...,"ITEM 14. EXHIBITS, FINANCIAL STATEMENT SCHEDUL...",,,...,ITEM 4. SUBMISSION OF MATTERS TO A VOTE OF SEC...,ITEM 5. MARKET FOR THE REGISTRANT'S COMMON STO...,ITEM 6. SELECTED FINANCIAL DATA AMERICAN INTER...,ITEM 7. MANAGEMENT'S DISCUSSION AND ANALYSIS O...,,ITEM 8. Financial Statements and Supplementary...,ITEM 9. CHANGES IN AND DISAGREEMENTS WITH ACCO...,,,1995


In [15]:
import os
from getpass import getpass

import google.generativeai as genai

# SECURITY: the API key must never be hardcoded in a notebook cell. Any key that
# was previously saved in this notebook should be treated as leaked and revoked.
# The key is read from the GOOGLE_API_KEY environment variable; when that is not
# set, it is requested interactively so it never ends up in the saved file.
if not os.environ.get("GOOGLE_API_KEY"):
    os.environ["GOOGLE_API_KEY"] = getpass("GOOGLE_API_KEY: ")

api_key = os.environ["GOOGLE_API_KEY"]
genai.configure(api_key=api_key)

In [16]:
from langchain_google_genai import GoogleGenerativeAIEmbeddings

# Embedding model handed to the FAISS vector stores in later cells.
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")

# Embed a throwaway string once to learn the vector length; the FAISS index
# is created with this dimension.
probe_vector = embeddings.embed_query("dimension probe")
dim = len(probe_vector)
print(f"Embedding dimension: {dim}")

Embedding dimension: 768


In [17]:
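# Disabled alternative: local HuggingFace sentence-transformer embeddings.
# Uncomment to use these instead of the Google embeddings above; `dim` is then
# re-probed for the new model.
# from langchain_community.embeddings import HuggingFaceEmbeddings
# embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
# dim = len(embeddings.embed_query("dimension probe"))
# print("Embedding dimension:", dim)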

In [32]:
# Collect the Spark DataFrame to the driver as a pandas DataFrame.
df = aig_df.toPandas()

                                                                                

In [34]:
# Preview the first two rows of the pandas copy.
df.head(2)

Unnamed: 0,cik,filename,section_1,section_10,section_11,section_12,section_13,section_14,section_15,section_1A,...,section_4,section_5,section_6,section_7,section_7A,section_8,section_9,section_9A,section_9B,year
0,5272,5272_1994.txt,ITEM 1. BUSINESS\nAmerican International Group...,ITEM 10. DIRECTORS AND EXECUTIVE OFFICERS OF T...,ITEM 11. EXECUTIVE COMPENSATION\nThis item is ...,ITEM 12. SECURITY OWNERSHIP OF CERTAIN BENEFIC...,ITEM 13. CERTAIN RELATIONSHIPS AND RELATED TRA...,"ITEM 14. EXHIBITS, FINANCIAL STATEMENT SCHEDUL...",,,...,ITEM 4. SUBMISSION OF MATTERS TO A VOTE OF SEC...,ITEM 5. MARKET FOR THE REGISTRANT'S COMMON STO...,ITEM 6. SELECTED FINANCIAL DATA\nAMERICAN INTE...,ITEM 7. MANAGEMENT'S DISCUSSION AND ANALYSIS O...,,ITEM 8. FINANCIAL STATEMENTS AND SUPPLEMENTARY...,ITEM 9. CHANGES IN AND DISAGREEMENTS WITH ACCO...,,,1994
1,5272,5272_1995.txt,ITEM 1. BUSINESS\nAmerican International Group...,ITEM 10. DIRECTORS AND EXECUTIVE OFFICERS OF T...,ITEM 11. EXECUTIVE COMPENSATION\nThis item is ...,ITEM 12. SECURITY OWNERSHIP OF CERTAIN BENEFIC...,ITEM 13. CERTAIN RELATIONSHIPS AND RELATED TRA...,"ITEM 14. EXHIBITS, FINANCIAL STATEMENT SCHEDUL...",,,...,ITEM 4. SUBMISSION OF MATTERS TO A VOTE OF SEC...,ITEM 5. MARKET FOR THE REGISTRANT'S COMMON STO...,ITEM 6. SELECTED FINANCIAL DATA AMERICAN INTER...,ITEM 7. MANAGEMENT'S DISCUSSION AND ANALYSIS O...,,ITEM 8. Financial Statements and Supplementary...,ITEM 9. CHANGES IN AND DISAGREEMENTS WITH ACCO...,,,1995


In [36]:
import math
from typing import List

# Imported in this cell so it also runs on a fresh kernel executed top to bottom
# (otherwise `Document` is only imported by a cell further down the notebook).
from langchain_core.documents import Document

# Identify section columns dynamically
section_cols: List[str] = [c for c in df.columns if c.startswith("section_")]

# Build parent docs (one per non-empty section cell)
parents: List[Document] = []
parent_ids: List[str] = []

for row in df.itertuples(index=False):
    # Provenance shared by every section of this filing
    base_meta = {
        "filename": row.filename,
        "cik": row.cik,
        "year": row.year,
    }
    for sec in section_cols:
        content = getattr(row, sec)
        # Skip missing cells: pandas NaN (a float that would otherwise be indexed
        # as the literal text "nan"), None / empty values, and whitespace-only text.
        is_nan = isinstance(content, float) and math.isnan(content)
        if is_nan or not content or not str(content).strip():
            continue

        parent_id = f"{base_meta['filename']}#{sec}"   # stable per (file, section)
        doc = Document(
            page_content=str(content),
            metadata={
                **base_meta,
                "section": sec,
                "parent_id": parent_id,
            },
        )
        parents.append(doc)
        parent_ids.append(parent_id)

print(f"Prepared {len(parents)} parent docs across {len(section_cols)} sections.")

Prepared 378 parent docs across 20 sections.


In [37]:
# Imported in this cell so it also runs on a fresh kernel executed top to bottom.
import faiss
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores import FAISS

# Inner-product index. Inner product only equals cosine similarity for unit-length
# vectors, so normalize_L2=True asks the store to L2-normalize vectors, matching
# the other FAISS store built in this notebook.
index = faiss.IndexFlatIP(dim)
child_docstore = InMemoryDocstore()  # FAISS keeps its own docstore for children
vectorstore = FAISS(
    embedding_function=embeddings,
    index=index,
    docstore=child_docstore,
    index_to_docstore_id={},  # filled as we add children
    normalize_L2=True,
)

In [38]:
# Imported in this cell so it also runs on a fresh kernel executed top to bottom.
from langchain.storage import InMemoryStore
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Docstore for PARENTS
parent_docstore = InMemoryStore()

# Optional splitter that would cut parents into ~2000-character pieces.
# NOTE: the retriever built in this notebook leaves its `parent_splitter` argument
# commented out, so this splitter is currently unused.
parent_splitter = RecursiveCharacterTextSplitter(
    chunk_size=2000,
    separators=["\n\n", "\n", " ", ""],
)

# Splitter for children (tune sizes for your data)
child_splitter = RecursiveCharacterTextSplitter(
    chunk_size=400,
    separators=["\n\n", "\n", " ", ""],
)

In [46]:
# Imported in this cell so it also runs on a fresh kernel executed top to bottom.
from langchain.retrievers import ParentDocumentRetriever

# Build the ParentDocumentRetriever: small child chunks are embedded and searched,
# and the parent documents they came from are what gets returned.
retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,       # children live here
    docstore=parent_docstore,      # parents live here
    child_splitter=child_splitter, # how to chunk parents into children
    # parent_splitter=parent_splitter,
    search_kwargs={"k": 6},        # tune recall
)


In [47]:
# Sanity check: the id list and the document list should have the same length.
len(parent_ids), len(parents)

(378, 378)

In [48]:
# Inspect one parent document (its metadata and page content).
parents[1]

Document(metadata={'filename': '5272_1994.txt', 'cik': '0000005272', 'year': '1994', 'section': 'section_10', 'parent_id': '5272_1994.txt#section_10'}, page_content='ITEM 10. DIRECTORS AND EXECUTIVE OFFICERS OF THE REGISTRANT\nExcept for the information provided in Part I under the heading "Directors and Executive Officers of the Registrant", this item is omitted because a definitive proxy statement which involves the election of directors will be filed with the Securities and Exchange Commission not later than 120 days after the close of the fiscal year pursuant to Regulation 14A.\nITEM 11.')

In [49]:
# Add parent docs; children are auto-chunked by the child splitter and indexed in FAISS
retriever.add_documents(parents, ids=parent_ids)
print("Loaded parents and auto-chunked children into FAISS.")

Loaded parents and auto-chunked children into FAISS.


In [50]:
# Peek at the first five child chunks held in the FAISS docstore and show which
# parent, section and file each one is linked to.
child_store_dict = vectorstore.docstore._dict  # {child_id: Document}
example_items = list(child_store_dict.items())[:5]
for child_id, child_doc in example_items:
    meta = child_doc.metadata
    summary = {"child_id": child_id}
    summary["parent_id"] = meta.get("doc_id")    # parent_id you provided above
    summary["section"] = meta.get("section")     # often propagated, but not guaranteed
    summary["filename"] = meta.get("filename")
    print(summary)

{'child_id': 'ab88a0fb-d5e7-4bdb-a0c4-600cd67e5d89', 'parent_id': '5272_1994.txt#section_1', 'section': 'section_1', 'filename': '5272_1994.txt'}
{'child_id': 'df803804-b52c-4435-9e90-27a4064f4757', 'parent_id': '5272_1994.txt#section_1', 'section': 'section_1', 'filename': '5272_1994.txt'}
{'child_id': '2cd05536-2290-4639-925f-e343b31dac2b', 'parent_id': '5272_1994.txt#section_1', 'section': 'section_1', 'filename': '5272_1994.txt'}
{'child_id': 'e24b2e88-2e7b-4d4e-9803-69e015da1d22', 'parent_id': '5272_1994.txt#section_1', 'section': 'section_1', 'filename': '5272_1994.txt'}
{'child_id': '6dd7fb5d-7aaa-4249-8146-460d5c841c65', 'parent_id': '5272_1994.txt#section_1', 'section': 'section_1', 'filename': '5272_1994.txt'}


In [51]:
# Backfill provenance metadata on any child chunk that is missing it, copying the
# values from the chunk's parent document. Existing values are never overwritten.
for cid, cdoc in child_store_dict.items():
    if "filename" in cdoc.metadata and "section" in cdoc.metadata:
        continue

    # Children reference their parent's key under "doc_id"
    parent_id = cdoc.metadata.get("doc_id")
    if parent_id is None:
        continue

    # parent_docstore is an InMemoryStore (key-value store). It exposes mget()/mset()
    # and has no Docstore-style search() method, so the parent is fetched with mget,
    # which yields None for a key that is not present.
    parent_doc = parent_docstore.mget([parent_id])[0]
    if parent_doc is None:
        continue

    for key in ("filename", "section", "cik", "year"):
        cdoc.metadata.setdefault(key, parent_doc.metadata.get(key))

In [54]:
# -------------------------------------------------------------------
# Retrieval example (returns full parent docs for the matching child chunks)
PREVIEW_CHARS = 800  # parent documents can be very long; print only a short preview

query = "what is the declared a cash dividend"
# invoke() is the current retriever entry point; get_relevant_documents() is deprecated.
results = retriever.invoke(query)
print(f"\nTop {len(results)} parent results for: {query!r}")
for i, d in enumerate(results, 1):
    print(f"\n[{i}] parent_id={d.metadata.get('parent_id')} | file={d.metadata.get('filename')} | section={d.metadata.get('section')}")
    # Append the ellipsis only when the text was actually cut
    preview = d.page_content[:PREVIEW_CHARS]
    suffix = " ..." if len(d.page_content) > PREVIEW_CHARS else ""
    print(preview + suffix)


Top 6 parent results for: 'what is the declared a cash dividend'

[1] parent_id=5272_2016.htm#section_7 | file=5272_2016.htm | section=section_7
ITEM 7 | Use of Non-GAAP Measures
comparisons with our insurance competitors. When we use these measures, reconciliations to the most comparable GAAP measure are provided on a consolidated basis in the Results of Operations section of this MD&A.
Operating revenues exclude Net realized capital gains (losses), income from non-operating litigation settlements (included in Other income for GAAP purposes) and changes in fair value of securities used to hedge guaranteed living benefits (included in Net investment income for GAAP purposes). Operating revenues are a GAAP measure for our operating segments.
Pre-tax operating income is derived by excluding the following items from income from continuing operations before income tax. This definition is consistent across our modules (including geography). These items generally fall into one or more of th

In [18]:
# --- Spark: imports used while building the parent rows ---
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lpad, trim, length, expr, concat_ws

# --- Build parent docs (Spark side) ---
# 1) collect every column whose name starts with "section_"; fail fast if there are none
section_cols = list(filter(lambda name: name.startswith("section_"), aig_df.columns))
if len(section_cols) == 0:
    raise ValueError("No section_* columns in AIG DF.")

In [19]:
# 2) unpivot the wide section_* columns into long (section, content) rows with stack()
stack_pairs = [f"'{name}', `{name}`" for name in section_cols]
stack_expr = "stack({}, {}) as (section, content)".format(
    len(section_cols), ", ".join(stack_pairs)
)

long_df = aig_df.select("filename", "cik", "year", expr(stack_expr))

# Keep only rows whose content is non-null and not blank after trimming,
# and key each one as "<filename>#<section>".
has_text = col("content").isNotNull() & (length(trim(col("content"))) > 0)
parents_df = (
    long_df
    .where(has_text)
    .withColumn("parent_id", concat_ws("#", col("filename"), col("section")))
    .select("parent_id", "filename", "cik", "year", "section", "content")
    .persist()   # cached so the count below and the later iteration do not recompute it
)

print(f"Parent rows (Spark): {parents_df.count()}")

Parent rows (Spark): 378


25/09/08 20:31:56 WARN CacheManager: Asked to cache already cached data.


In [25]:
# --- LangChain retriever: stream from Spark to FAISS ---
from langchain_core.documents import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain_community.vectorstores import FAISS
from langchain_community.docstore.in_memory import InMemoryDocstore
import faiss

# Parents are kept whole in a key-value store; their child chunks are embedded
# and searched in FAISS.
parent_store = InMemoryStore()        # parents live here

child_splitter = RecursiveCharacterTextSplitter(chunk_size=400)
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000)  # defined but not passed to the retriever

faiss_settings = dict(
    embedding_function=embeddings,
    index=faiss.IndexFlatIP(dim),     # inner product; cosine if normalize_L2=True
    docstore=InMemoryDocstore(),      # children live here
    index_to_docstore_id={},
    normalize_L2=True,
)
vectorstore = FAISS(**faiss_settings)

retriever = ParentDocumentRetriever(
    docstore=parent_store,
    vectorstore=vectorstore,
    child_splitter=child_splitter,
    search_kwargs={"k": 6},
)

In [27]:
# Pull parent rows from Spark one at a time and hand them to the retriever in
# batches of BATCH documents; a partial batch is left in the buffers afterwards.
BATCH = 1
docs_batch, ids_batch, added = [], [], 0
META_FIELDS = ("filename", "cik", "year", "section", "parent_id")

for spark_row in parents_df.toLocalIterator():
    # Each Spark Row becomes one parent Document carrying its provenance metadata
    docs_batch.append(
        Document(
            page_content=str(spark_row["content"]),
            metadata={field: spark_row[field] for field in META_FIELDS},
        )
    )
    ids_batch.append(spark_row["parent_id"])

    if len(docs_batch) < BATCH:
        continue

    # Buffer is full: index it, then empty both buffers in place
    retriever.add_documents(docs_batch, ids=ids_batch)
    added += len(docs_batch)
    docs_batch.clear()
    ids_batch.clear()

In [29]:
# Flush the remainder: index whatever is still buffered after the batching loop.
if docs_batch:
    retriever.add_documents(docs_batch, ids=ids_batch)
    added += len(docs_batch)
    # Empty the buffers so re-running this cell cannot index the same documents twice.
    docs_batch.clear()
    ids_batch.clear()

print(f"Loaded {added} parent docs; children auto-chunked and indexed in FAISS.")


Loaded 378 parent docs; children auto-chunked and indexed in FAISS.


In [30]:
# (Optional) look at one stored child chunk to confirm it carries its metadata
child_store_dict = vectorstore.docstore._dict  # {child_id: Document}
first_child = list(child_store_dict.items())[:1]
print("Example child:", first_child)


Example child: [('fc60eeb9-04e8-4d7c-924a-04cfe82df180', Document(id='fc60eeb9-04e8-4d7c-924a-04cfe82df180', metadata={'filename': '5272_1994.txt', 'cik': '0000005272', 'year': '1994', 'section': 'section_1', 'parent_id': '5272_1994.txt#section_1', 'doc_id': '5272_1994.txt#section_1'}, page_content='ITEM 1. BUSINESS'))]


In [31]:
# Retrieval demo (returns full parents for the best child chunks)
PREVIEW_CHARS = 800  # parent documents can be very long; print only a short preview

query = "what is administrative expense"
# invoke() is the current retriever entry point; get_relevant_documents() is deprecated.
results = retriever.invoke(query)
print(f"\nTop {len(results)} parents for: {query!r}")
for i, d in enumerate(results, 1):
    print(f"[{i}] file={d.metadata.get('filename')} | section={d.metadata.get('section')} | parent_id={d.metadata.get('parent_id')}")
    # Append the ellipsis only when the text was actually cut
    preview = d.page_content[:PREVIEW_CHARS]
    suffix = " ..." if len(d.page_content) > PREVIEW_CHARS else ""
    print(preview + suffix + "\n")


Top 6 parents for: 'what is administrative expense'
[1] file=5272_2016.htm | section=section_6 | parent_id=5272_2016.htm#section_6
Item 6. Selected Financial Data and throughout this MD&A, we present our financial condition and results of operations in the way we believe will be most meaningful and representative of our business results. Some of the measurements we use are “non-GAAP financial measures” under SEC rules and regulations. GAAP is the acronym for “accounting principles generally accepted in the United States.” The non-GAAP financial measures we present may not be comparable to similarly-named measures reported by other companies.
Book Value Per Common Share Excluding Accumulated Other Comprehensive Income (AOCI), Book Value Per Common Share Excluding AOCI and Deferred Tax Assets (DTA) (Adjusted Book Value Per Common Share) and Adjusted Book Value Per Common Share Including Dividend Growth are used to show the amount of our net worth on a per-share basis. We believe these m