In [1]:
import json
import re

from collections import Counter, defaultdict
from typing import Dict, List, Tuple, Literal, Any
from dotenv import load_dotenv, find_dotenv
from pathlib import Path

import LLM_setting
import Load_Data
import Chunking


## API Setting

In [2]:
# Load environment variables from the .env file located by find_dotenv().
# The return value is bound to `_` so the cell produces no output.
# Presumably this supplies the API credentials used by the LLM client - confirm.
_ = load_dotenv(find_dotenv())

In [3]:
# LLM settings: model identifier and sampling temperature, wrapped in the
# project-local LLMSetting object and turned into the `llm` client that the
# extraction functions call through `llm.invoke(...)`.
model = "openai/gpt-oss-120b"
temperature = 0
llm_setting= LLM_setting.LLMSetting(MODEL=model, TEMPERATURE=temperature)
llm = LLM_setting.setting(llm_setting)

In [4]:
# Read data: load the raw filing at `file_path` through the project-local
# Load_Data module. `docs` is only used for the count printed in the chunking
# cell; chunking itself re-reads from `file_path`.
file_path = Path("data/raw_filings/320193_000032019325000079_aapl-20250927.htm")
cfg = Load_Data.LoadData(file_path=file_path)
docs = Load_Data.load_data(cfg)

In [5]:
# Chunking: split the filing into overlapping chunks with the project-local
# Chunking module. `chunk_size` / `chunk_overlap` are passed positionally;
# their unit (characters vs. tokens) is defined by Chunking - TODO confirm.
chunk_size= 2500
chunk_overlap = 250
data_processing=Chunking.DataProcessing(file_path,chunk_size,chunk_overlap)
doc_chunk =Chunking.chunks(data_processing)
# Report how many documents were loaded and how many chunks were produced.
print(f"Loaded docs: {len(docs)} | Chunks: {len(doc_chunk)}")

Loaded docs: 1 | Chunks: 85


In [6]:
# Sanity check: number of chunks (one LLM call is made per chunk in run_baseline).
len(doc_chunk)

85

In [7]:
# Number of topics requested from the LLM for every chunk; replies with a
# different number of valid topics are discarded by extract_topics_for_chunk.
TOPICS_PER_CHUNK = 8
# Number of keywords required per topic (also the cap on merged keywords).
KEYWORDS_PER_TOPIC = 8
# Number of merged topics kept in the final result.
FINAL_K_TOPICS = 10

In [8]:
# The two supported prompt variants; build_baseline_prompt rejects anything else.
BaselineType = Literal["zero_shot", "minimal_template"]

In [9]:
# Instruction used as the prompt header for the "zero_shot" baseline.
ZERO_SHOT_INSTRUCTION = "Identify and label the main topics in this corpus."
# Role sentence prepended to the instruction for the "minimal_template" baseline.
MINIMAL_TEMPLATE_ROLE = "You are a financial analyst."

In [10]:
# Output-format rules appended to every prompt. This is a str.format template:
# {topics_per_chunk} and {keywords_per_topic} are filled in by
# build_baseline_prompt, and the doubled braces in the schema line are escapes
# that render as literal "{" / "}" after formatting.
FORMAT_CONSTRAINTS = (
    "Use ONLY the text below.\n"
    "Return exactly {topics_per_chunk} topics.\n"
    "Each topic must have exactly {keywords_per_topic} keywords.\n"
    "Output MUST be valid JSON only (no Markdown, no extra text).\n"
    'Schema: {{"topics":[{{"label":str,"keywords":[str,...]}}]}}\n'
)

In [11]:
# Fallback pattern for pulling a JSON object out of a noisy reply. The match is
# greedy and DOTALL, so it spans from the first "{" to the last "}" in the
# text, newlines included.
_JSON_OBJ_RE = re.compile(r"\{.*\}", flags=re.DOTALL)

In [12]:
def _parse_json_loose(s: str) -> Dict[str, Any]:
    s = (s or "").strip()
    if not s:
        return {}

    try:
        return json.loads(s)
    except Exception:
        pass

    m = _JSON_OBJ_RE.search(s)
    if m:
        try:
            return json.loads(m.group(0))
        except Exception:
            return {}

    return {}

In [13]:
def build_baseline_prompt(
    baseline: BaselineType,
    chunk_text: str,
    topics_per_chunk: int,
    keywords_per_topic: int,
) -> str:
    """Assemble the full prompt for one chunk.

    The prompt is: header, blank line, the formatted ``FORMAT_CONSTRAINTS``,
    a ``TEXT:`` marker, then the chunk text.

    Args:
        baseline: ``"zero_shot"`` uses the bare instruction as header;
            ``"minimal_template"`` prefixes it with the analyst role sentence.
        chunk_text: Text of the chunk to analyse.
        topics_per_chunk: Value substituted into the format constraints.
        keywords_per_topic: Value substituted into the format constraints.

    Returns:
        The prompt string.

    Raises:
        ValueError: If ``baseline`` is neither of the two supported values.
    """
    is_zero_shot = baseline == "zero_shot"
    if not (is_zero_shot or baseline == "minimal_template"):
        raise ValueError("baseline must be 'zero_shot' or 'minimal_template'")

    if is_zero_shot:
        instruction = ZERO_SHOT_INSTRUCTION
    else:
        instruction = f"{MINIMAL_TEMPLATE_ROLE} {ZERO_SHOT_INSTRUCTION}"

    constraints = FORMAT_CONSTRAINTS.format(
        topics_per_chunk=topics_per_chunk,
        keywords_per_topic=keywords_per_topic,
    )

    # Joined piecewise so a non-str chunk_text still raises TypeError.
    return "".join((f"{instruction}\n\n", constraints, "\nTEXT:\n", chunk_text))

In [14]:
def _as_text(chunk: Any) -> str:
    return getattr(chunk, "page_content", None) or str(chunk)

In [15]:
def extract_topics_for_chunk(
    llm,
    baseline: BaselineType,
    chunk_text,
    topics_per_chunk: int,
    keywords_per_topic: int,
) -> List[Dict[str, Any]]:
    """Ask the LLM for the topics of one chunk and validate its JSON reply.

    Args:
        llm: Model object exposing ``invoke(prompt)``. The reply's ``content``
            attribute is used when present, otherwise ``str(reply)``.
        baseline: Prompt variant, forwarded to ``build_baseline_prompt``.
        chunk_text: Raw text or a document-like object; converted with
            ``_as_text``.
        topics_per_chunk: Exact number of topics the reply must contain.
        keywords_per_topic: Exact number of keywords each topic must contain.

    Returns:
        A list of ``{"label": str, "keywords": [str, ...]}`` dicts. The
        validation is all-or-nothing: ``[]`` is returned when the reply cannot
        be parsed into a JSON object, when ``"topics"`` is not a list, or when
        the number of well-formed topics differs from ``topics_per_chunk``.
    """
    text = _as_text(chunk_text)

    prompt = build_baseline_prompt(
        baseline=baseline,
        chunk_text=text,
        topics_per_chunk=topics_per_chunk,
        keywords_per_topic=keywords_per_topic,
    )

    resp = llm.invoke(prompt)  # ChatGroq / LangChain
    content = resp.content if hasattr(resp, "content") else str(resp)
    if isinstance(content, list):
        # Some chat models return content as a list of blocks instead of one
        # string. Assumed shape: str items or dicts carrying a "text" key -
        # TODO confirm against the client in use.
        pieces: List[str] = []
        for block in content:
            if isinstance(block, str):
                pieces.append(block)
            elif isinstance(block, dict) and isinstance(block.get("text"), str):
                pieces.append(block["text"])
        content = "".join(pieces)
    elif content is not None and not isinstance(content, str):
        content = str(content)

    data = _parse_json_loose(content)
    # A reply can be valid JSON without being an object (e.g. a bare array);
    # guard here so such a reply counts as unusable instead of crashing.
    if not isinstance(data, dict):
        return []

    topics = data.get("topics", [])
    if not isinstance(topics, list):
        return []

    cleaned: List[Dict[str, Any]] = []
    for t in topics:
        if not isinstance(t, dict):
            continue
        raw_label = t.get("label")
        # A JSON null must not become the literal label "None".
        label = "" if raw_label is None else str(raw_label).strip()
        kws = t.get("keywords", [])
        if not label or not isinstance(kws, list):
            continue

        # Drop nulls and blanks; a topic left with the wrong count is rejected.
        stripped = (str(k).strip() for k in kws if k is not None)
        keywords = [kw for kw in stripped if kw]
        if len(keywords) != keywords_per_topic:
            continue

        cleaned.append({"label": label, "keywords": keywords})

    if len(cleaned) != topics_per_chunk:
        return []

    return cleaned

In [16]:
def _norm_label(label: str) -> str:
    label = label.strip().lower()
    label = re.sub(r"\s+", " ", label)
    label = re.sub(r"[^\w\s\-&/]", "", label)
    return label.strip()

In [17]:
def merge_topics_frequency_based(
    per_chunk_topics: List[List[Dict[str, Any]]],
    final_k_topics: int,
    keywords_per_topic: int,
) -> List[Dict[str, Any]]:
    """Merge per-chunk topics into a ranked list of corpus-level topics.

    Labels are grouped by their ``_norm_label`` form. Each group is ranked by
    the number of chunks it appears in (descending), with ties broken by the
    normalised label (ascending). The keywords of a group are its most
    frequent keywords across all chunks.

    Args:
        per_chunk_topics: One list of ``{"label", "keywords"}`` dicts per
            chunk; empty lists (and ``None`` entries) contribute nothing.
        final_k_topics: Number of merged topics to keep. Values below 1 are
            treated as 1.
        keywords_per_topic: Maximum number of keywords kept per merged topic.

    Returns:
        A list of dicts with ``"label"`` (first-seen spelling of the group),
        ``"keywords"`` and ``"support_chunks"`` (number of distinct chunks that
        produced the label).
    """
    label_support = Counter()
    label_canonical: Dict[str, str] = {}
    kw_counts: Dict[str, Counter] = defaultdict(Counter)

    for chunk_topics in per_chunk_topics:
        # Labels already credited to this chunk. Support counts chunks, so two
        # topics of one chunk that normalise identically count only once.
        seen_in_chunk = set()
        for t in chunk_topics or []:
            if not isinstance(t, dict):
                continue
            label = str(t.get("label", "")).strip()
            if not label:
                continue
            norm = _norm_label(label)
            if not norm:
                continue

            if norm not in seen_in_chunk:
                seen_in_chunk.add(norm)
                label_support[norm] += 1
            label_canonical.setdefault(norm, label)

            for kw in t.get("keywords", []) or []:
                kw_s = str(kw).strip()
                if kw_s:
                    kw_counts[norm][kw_s] += 1

    ranked_labels = sorted(label_support.items(), key=lambda x: (-x[1], x[0]))
    selected = ranked_labels[: max(1, final_k_topics)]

    merged: List[Dict[str, Any]] = []
    for norm, support in selected:
        top_keywords = [k for k, _ in kw_counts[norm].most_common(keywords_per_topic)]
        merged.append(
            {
                "label": label_canonical.get(norm, norm),
                "keywords": top_keywords,
                "support_chunks": int(support),
            }
        )
    return merged


In [18]:
def run_baseline(
    llm,
    doc_chunks,
    baseline: BaselineType,
    topics_per_chunk: int,
    keywords_per_topic: int,
    final_k_topics: int,
) -> Dict[str, Any]:
    """Run one baseline end to end: per-chunk extraction, then merging.

    Args:
        llm: Model object passed through to ``extract_topics_for_chunk``.
        doc_chunks: Sized collection of chunks; each is sent to the LLM once.
        baseline: Prompt variant to use for every chunk.
        topics_per_chunk: Topics requested (and required) per chunk.
        keywords_per_topic: Keywords requested per topic and kept per merged topic.
        final_k_topics: Number of merged topics to keep.

    Returns:
        A dict echoing the run parameters (``"baseline"``, ``"num_chunks"``,
        ``"topics_per_chunk"``, ``"keywords_per_topic"``, ``"final_k_topics"``)
        plus the merged ``"topics"`` list.
    """
    # One entry per chunk, in chunk order; failed chunks contribute [].
    extracted: List[List[Dict[str, Any]]] = [
        extract_topics_for_chunk(
            llm=llm,
            baseline=baseline,
            chunk_text=piece,  # Document or str; the extractor converts it
            topics_per_chunk=topics_per_chunk,
            keywords_per_topic=keywords_per_topic,
        )
        for piece in doc_chunks
    ]

    corpus_topics = merge_topics_frequency_based(
        per_chunk_topics=extracted,
        final_k_topics=final_k_topics,
        keywords_per_topic=keywords_per_topic,
    )

    summary: Dict[str, Any] = {
        "baseline": baseline,
        "num_chunks": len(doc_chunks),
        "topics_per_chunk": topics_per_chunk,
        "keywords_per_topic": keywords_per_topic,
        "final_k_topics": final_k_topics,
        "topics": corpus_topics,
    }
    return summary

In [19]:
# Run the zero-shot baseline over every chunk (one LLM call per chunk) and
# merge the per-chunk topics into the final top-K list.
baseline_type: BaselineType = "zero_shot"

baseline_result = run_baseline(
    llm=llm,
    doc_chunks=doc_chunk,
    baseline=baseline_type,
    topics_per_chunk=TOPICS_PER_CHUNK,
    keywords_per_topic=KEYWORDS_PER_TOPIC,
    final_k_topics=FINAL_K_TOPICS,
)

# Pretty-print the full result; ensure_ascii=False keeps non-ASCII text readable.
print(json.dumps(baseline_result, indent=2, ensure_ascii=False))

{
  "baseline": "zero_shot",
  "num_chunks": 85,
  "topics_per_chunk": 8,
  "keywords_per_topic": 8,
  "final_k_topics": 10,
  "topics": [
    {
      "label": "Financial Impact",
      "keywords": [
        "financial condition",
        "stock price",
        "results of operations",
        "business",
        "costs",
        "liabilities",
        "expenses",
        "revenue"
      ],
      "support_chunks": 8
    },
    {
      "label": "Regulatory Compliance",
      "keywords": [
        "compliance",
        "filing requirements",
        "legal obligations",
        "regulatory changes",
        "regulations",
        "SEC",
        "Securities Exchange Act",
        "Section 13(a)"
      ],
      "support_chunks": 8
    },
    {
      "label": "Corporate Governance",
      "keywords": [
        "board",
        "governance",
        "policies",
        "ethics",
        "oversight",
        "compliance",
        "board of directors",
        "shareholder rights"
      ],
   

In [20]:
# save JSON
out_path = Path("outputs") / f"baseline_{baseline_type}_FINAL_K_{FINAL_K_TOPICS}.json"
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(json.dumps(baseline_result, indent=2, ensure_ascii=False), encoding="utf-8")
print(f"Saved: {out_path}")

Saved: outputs/baseline_zero_shot_FINAL_K_10.json


In [19]:
# Run the minimal-template baseline (role sentence + instruction) with the same
# parameters as the zero-shot run.
# NOTE(review): this cell is a copy of the zero-shot run cell with only
# `baseline_type` changed, it rebinds `baseline_type` / `baseline_result`, and
# its execution count repeats In [19] - consider a loop over both baselines and
# a Restart & Run All before sharing.
baseline_type: BaselineType = "minimal_template"

baseline_result = run_baseline(
    llm=llm,
    doc_chunks=doc_chunk,
    baseline=baseline_type,
    topics_per_chunk=TOPICS_PER_CHUNK,
    keywords_per_topic=KEYWORDS_PER_TOPIC,
    final_k_topics=FINAL_K_TOPICS,
)

# Pretty-print the full result; ensure_ascii=False keeps non-ASCII text readable.
print(json.dumps(baseline_result, indent=2, ensure_ascii=False))

{
  "baseline": "minimal_template",
  "num_chunks": 85,
  "topics_per_chunk": 8,
  "keywords_per_topic": 8,
  "final_k_topics": 10,
  "topics": [
    {
      "label": "Legal Proceedings",
      "keywords": [
        "litigation",
        "lawsuits",
        "claims",
        "legal proceedings",
        "regulatory",
        "court",
        "compliance",
        "investigations"
      ],
      "support_chunks": 6
    },
    {
      "label": "Financial Impact",
      "keywords": [
        "stock price",
        "financial condition",
        "results of operations",
        "earnings",
        "revenue",
        "profitability",
        "revenue impact",
        "cost of litigation"
      ],
      "support_chunks": 5
    },
    {
      "label": "Company Identification",
      "keywords": [
        "Apple Inc.",
        "Company",
        "Apple",
        "wholly owned subsidiaries",
        "collectively",
        "refers",
        "business",
        "used"
      ],
      "support_chu

In [21]:
# save JSON
out_path = Path("outputs") / f"baseline_{baseline_type}_FINAL_K_{FINAL_K_TOPICS}.json"
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(json.dumps(baseline_result, indent=2, ensure_ascii=False), encoding="utf-8")
print(f"Saved: {out_path}")

Saved: outputs/baseline_minimal_template_FINAL_K_10.json
