<a href="https://colab.research.google.com/github/AtulAravindDas/FinVAR_LLM_Version/blob/main/Risk_Section_Extraction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get install -y wkhtmltopdf

!pip install pdfkit

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  avahi-daemon geoclue-2.0 glib-networking glib-networking-common
  glib-networking-services gsettings-desktop-schemas iio-sensor-proxy
  libavahi-core7 libavahi-glib1 libdaemon0 libevdev2 libgudev-1.0-0 libhyphen0
  libinput-bin libinput10 libjson-glib-1.0-0 libjson-glib-1.0-common
  libmbim-glib4 libmbim-proxy libmd4c0 libmm-glib0 libmtdev1 libnl-genl-3-200
  libnotify4 libnss-mdns libproxy1v5 libqmi-glib5 libqmi-proxy libqt5core5a
  libqt5dbus5 libqt5gui5 libqt5network5 libqt5positioning5 libqt5printsupport5
  libqt5qml5 libqt5qmlmodels5 libqt5quick5 libqt5sensors5 libqt5svg5
  libqt5webchannel5 libqt5webkit5 libqt5widgets5 libsoup2.4-1
  libsoup2.4-common libudev1 libwacom-bin libwacom-common libwacom9 libwoff1
  libxcb-icccm4 libxcb-image0 libxcb-keysyms1 libxcb-render-util0 libxcb-util1
  libxcb-xinerama0 libxcb-xinput0 libxcb-xkb1 

In [None]:
pip install pymupdf

Collecting pymupdf
  Downloading pymupdf-1.26.3-cp39-abi3-manylinux_2_28_x86_64.whl.metadata (3.4 kB)
Downloading pymupdf-1.26.3-cp39-abi3-manylinux_2_28_x86_64.whl (24.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.1/24.1 MB[0m [31m43.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pymupdf
Successfully installed pymupdf-1.26.3


In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
%%writefile /content/drive/MyDrive/risk_section_extractor.py
import requests
import pandas as pd
import pdfkit
import fitz
import re

# Headings detection (used by QQDR offset + compositor)
# Matches an "Item 1 <separator> Business" heading at the start of any line
# (multiline, case-insensitive). At least one separator character out of
# ".", ")", ":", "-" or space is required between "1" and "business".
ITEM1_BUSINESS_HDR = re.compile(
    r'(?m)^\s*item\s*1\s*[\.\):\- ]+\s*business\b', re.IGNORECASE
)

# Inline-heading detection for make_composite_from_text
# CAPS_LINE: an entire string of at least three characters that starts with an
# upper-case letter or digit and otherwise contains only upper-case letters,
# digits, spaces and the characters "/", ",", "&", "-".
CAPS_LINE = re.compile(r'^[A-Z0-9][A-Z0-9 /,&\-]{2,}$')
# CAPS_BLOCK_AT_LINE_START: the same kind of upper-case run (captured as the
# named group "head") at the start of a string, not directly followed by "."
# or ":", then whitespace, then an upper-case letter that begins the rest of
# the line (checked with a lookahead, so it is not consumed).
CAPS_BLOCK_AT_LINE_START = re.compile(
    r'^(?P<head>[A-Z0-9][A-Z0-9 /,&\-]{2,})(?![.:])\s+(?=[A-Z])'
)

class RiskSectionExtractor:
    def __init__(self, ticker):
        self.ticker = ticker
        self.url = "https://www.sec.gov/files/company_tickers.json"
        self.headers = {'User-Agent': 'Atul A Das - atularavinddas@gmail.com'}

    def get_10K_pdfs(self, output_path='10-K.pdf', timeout=30):
        """Download the company's most recent 10-K and render it to a PDF file.

        The ticker is resolved to a CIK through the listing at ``self.url``,
        the company's recent filings are fetched from data.sec.gov, the first
        row whose form is ``10-K`` is selected, and its primary document is
        converted to PDF with pdfkit (wkhtmltopdf).

        Args:
            output_path: Where the rendered PDF is written. Defaults to the
                ``10-K.pdf`` file name the rest of this class reads.
            timeout: Seconds to wait for each SEC HTTP request before giving
                up, so a stalled connection cannot hang the call forever.

        Raises:
            requests.HTTPError: If either SEC endpoint returns an error status.
            IndexError: If the ticker is not in the listing, or the recent
                filings contain no 10-K.
        """
        response = requests.get(self.url, headers=self.headers, timeout=timeout)
        # Fail here with the HTTP status instead of a confusing JSON decode
        # error on an error page.
        response.raise_for_status()
        data = response.json()

        df = pd.DataFrame.from_dict(data, orient='index')
        df.columns = ['CIK', 'Ticker', 'Title']

        # Compare case-insensitively so "aapl" and "AAPL" resolve alike.
        wanted = str(self.ticker).strip().upper()
        matches = df[df['Ticker'].astype(str).str.upper() == wanted]
        if matches.empty:
            # IndexError is kept (the unguarded lookup raised it too) so
            # existing callers that catch it keep working.
            raise IndexError(f"Ticker {self.ticker!r} not found in {self.url}")

        cik = str(matches['CIK'].values[0]).zfill(10)
        url = f'https://data.sec.gov/submissions/CIK{cik}.json'
        response = requests.get(url, headers=self.headers, timeout=timeout)
        response.raise_for_status()
        central_index_key_details = response.json()

        print(f"Essential keys for {cik}", central_index_key_details.keys())

        recent_filings = central_index_key_details['filings']['recent']
        df_filings = pd.DataFrame(recent_filings)
        df_10K = df_filings[df_filings['form'] == '10-K']
        if df_10K.empty:
            raise IndexError(f"No 10-K found in the recent filings of CIK {cik}")

        # First matching row; presumably the newest filing — confirm the
        # ordering of 'recent' against the EDGAR submissions API.
        latest = df_10K.iloc[0]
        accession = latest['accessionNumber'].replace('-', '')
        filing_url = (
            f"https://www.sec.gov/Archives/edgar/data/{int(cik)}/"
            f"{accession}/{latest['primaryDocument']}"
        )

        options = {
            'custom-header': [
                # Reuse the same identifying header as the JSON requests.
                ('User-Agent', self.headers['User-Agent']),
                ('Referer', 'https://www.sec.gov/'),
            ],
            'custom-header-propagation': '',
            'encoding': 'UTF-8',
            'quiet': '',
            'javascript-delay': 2000,
            'load-error-handling': 'ignore',
        }

        pdfkit.from_url(filing_url, output_path, options=options)
        print(f"Saved: {output_path}")

    # === Risk Factors (Item 1A) extraction ===
    @staticmethod
    def find_toc_page(doc):
        for i in range(1, len(doc)):
            text = doc.load_page(i).get_text().lower()
            if "table of contents" in text or "index" in text:
                print(f"TOC found on page {i + 1}")
                return i
        print("TOC not found")
        return None

    @staticmethod
    def generate_risk_factors_pdf(input_pdf_path, output_path="Risk_Factors.pdf"):
        """Copy the "Risk Factors" page range listed in the TOC into its own PDF.

        The TOC page is located with ``find_toc_page``. Its text is split into
        lines and searched for a line that is exactly "risk factors",
        optionally prefixed by "item 1a". The first purely numeric line within
        the next four lines is taken as the start page, and the next purely
        numeric line after that as the end page. Both are turned into page
        indices relative to the TOC page and the range is saved.

        Args:
            input_pdf_path: Path of the full 10-K PDF.
            output_path: Destination of the extracted section. Defaults to the
                ``Risk_Factors.pdf`` name read by ``extract_risk_sections``.

        Returns:
            None. Progress and failures are reported with ``print``; nothing
            is written when the TOC or the page numbers cannot be determined.
        """
        # The context manager closes the source document on every exit path,
        # including the early returns below.
        with fitz.open(input_pdf_path) as doc:
            toc_index = RiskSectionExtractor.find_toc_page(doc)

            if toc_index is None:
                print("Cannot proceed without TOC.")
                return

            toc_lines = doc.load_page(toc_index).get_text().split("\n")

            start_page = None
            end_page = None
            start_line_idx = None

            for i, line in enumerate(toc_lines):
                if re.match(r"^(item\s*1a\.?\s*[:\-]?\s*)?risk factors$", line.strip().lower()):
                    print(f"Found line: {line}")
                    for j in range(i + 1, min(i + 5, len(toc_lines))):
                        next_line = toc_lines[j].strip()
                        # isdecimal() (unlike isdigit()) only accepts text
                        # that int() can parse, e.g. it rejects "²".
                        if next_line.isdecimal():
                            print(f"Start page number found: {next_line}")
                            start_page = int(next_line)
                            start_line_idx = j
                            break
                        else:
                            print(f"Skipped non-digit line: {next_line}")
                    # Only the first matching TOC entry is considered.
                    break

            if start_line_idx is not None:
                # The next numeric line is the page of the following TOC entry.
                for later_line in toc_lines[start_line_idx + 1:]:
                    later_line = later_line.strip()
                    if later_line.isdecimal():
                        end_page = int(later_line)
                        print(f"End page of Risk Factors section: {end_page}")
                        break

            if start_page is None or end_page is None:
                print("Could not determine start or end page properly.")
                return

            start_idx = toc_index + start_page - 1
            end_idx = toc_index + end_page - 1
            print(start_idx)
            print(end_idx)

            # Copies from one page before start_idx through end_idx.
            # NOTE(review): the extra leading page is presumably padding for
            # page-number drift — confirm.
            with fitz.open() as risk_doc:
                risk_doc.insert_pdf(doc, from_page=start_idx - 1, to_page=end_idx)
                risk_doc.save(output_path)
            print(f"Saved Risk Factors section as {output_path}")

    # === QQDR (Quantitative and Qualitative Disclosures, Item 7A) extraction ===
    @staticmethod
    def get_post_toc_offset(doc, toc_index, max_scan=15):
        offset = 0
        i = toc_index + 1
        while i < len(doc) and offset < max_scan:
            t = (doc.load_page(i).get_text() or "").strip()
            if ITEM1_BUSINESS_HDR.search(t):
                break
            offset += 1
            i += 1
        return offset

    @staticmethod
    def generate_qqdr_section(input_pdf_path, ticker="", output_path="QQDR_Factors.pdf"):
        """Copy the "Quantitative and Qualitative Disclosures" pages into a PDF.

        The TOC page is located with ``find_toc_page`` and split into lines.
        The start page is the first purely numeric line within four lines
        after a line containing "quantitative and qualitative disclosures".
        The end page is the first number greater than the start page among
        the numeric lines within five lines after the first line containing
        "financial statements and supplementary data".

        Args:
            input_pdf_path: Path of the full 10-K PDF.
            ticker: Company ticker. For "TSLA" (any case) the page indices are
                additionally shifted by ``get_post_toc_offset``.
            output_path: Destination of the extracted section. Defaults to the
                ``QQDR_Factors.pdf`` name read by ``extract_risk_sections``.

        Returns:
            None. Progress and failures are reported with ``print``; nothing
            is written when the TOC or the page numbers cannot be determined.
        """
        # The context manager closes the source document on every exit path.
        with fitz.open(input_pdf_path) as doc:
            toc_index = RiskSectionExtractor.find_toc_page(doc)

            if toc_index is None:
                print("Cannot proceed without TOC.")
                return

            toc_text = doc.load_page(toc_index).get_text().split("\n")

            qqdr_start_page = None
            qqdr_end_page = None

            # NOTE: there is no break on this outer loop, so when several TOC
            # lines match, the last one that yields a page number wins.
            for i, line in enumerate(toc_text):
                if "quantitative and qualitative disclosures" in line.strip().lower():
                    print(f"Found line: {line}")
                    for candidate in toc_text[i + 1:i + 5]:
                        candidate = candidate.strip()
                        # isdecimal() only accepts text that int() can parse.
                        if candidate.isdecimal():
                            print(f"Page number found: {candidate}")
                            qqdr_start_page = int(candidate)
                            break

            for i, line in enumerate(toc_text):
                if "financial statements and supplementary data" in line.strip().lower():
                    print(f"Found line: {line}")
                    page_numbers = [
                        int(candidate.strip())
                        for candidate in toc_text[i + 1:i + 6]
                        if candidate.strip().isdecimal()
                    ]
                    print(page_numbers)

                    if page_numbers:
                        # Without a start page there is nothing to compare
                        # against; comparing an int with None would raise
                        # TypeError.
                        if qqdr_start_page is not None:
                            qqdr_end_page = next(
                                (page for page in page_numbers if page > qqdr_start_page),
                                None,
                            )
                        print(f"Chosen end page: {qqdr_end_page}")
                    else:
                        print("No valid page numbers found after end section.")
                    # Only the first matching TOC entry is considered.
                    break

            if qqdr_start_page is None or qqdr_end_page is None:
                # Same message as generate_risk_factors_pdf, so a failed
                # extraction is no longer silent.
                print("Could not determine start or end page properly.")
                return

            if (ticker or "").upper() == "TSLA":
                offset = RiskSectionExtractor.get_post_toc_offset(doc, toc_index)
            else:
                offset = 0
            start_idx = toc_index + qqdr_start_page - 1 + offset
            end_idx = toc_index + qqdr_end_page - 1 + offset
            print(start_idx)
            print(end_idx)

            # Copies from one page before start_idx through end_idx.
            with fitz.open() as risk_doc:
                risk_doc.insert_pdf(doc, from_page=start_idx - 1, to_page=end_idx)
                risk_doc.save(output_path)
            print(f"Saved QQDR section as {output_path}")

    # === Text extraction, cleaning and compositing helpers ===
    @staticmethod
    def clean_qqdr(text):
        cleaned_text = re.sub(r'\s+', ' ', text)
        cleaned_text = re.sub(r'\n', ' ', cleaned_text)
        cleaned_text = re.sub(r'\t', ' ', cleaned_text)
        cleaned_text = re.sub(r'Table of Contents', '', cleaned_text, flags=re.IGNORECASE)
        cleaned_text = re.sub(r'Index', '', cleaned_text, flags=re.IGNORECASE)
        cleaned_text = re.sub(r'©', '', cleaned_text)
        cleaned_text = re.sub(r'(PART\s*II\s*Item\s*7A\s*\d+\s*)?ITEM\s*7A\.', '', cleaned_text, flags=re.IGNORECASE)
        return cleaned_text

    @staticmethod
    def extract_text_from_pdf(pdf_path):
        """Return the cleaned text of every page of a PDF as one string.

        Args:
            pdf_path: Path of the PDF to read.

        Returns:
            The page texts joined with newlines and passed through
            ``clean_qqdr``.
        """
        # Close the document as soon as the page texts have been read.
        with fitz.open(pdf_path) as doc:
            page_texts = [doc.load_page(i).get_text() for i in range(len(doc))]
        combined_text = '\n'.join(page_texts)
        return RiskSectionExtractor.clean_qqdr(combined_text)

    @staticmethod
    def enforce_sentence_breaks(text: str) -> str:
        return re.sub(r'([.?!:])\s+(?=\S)', r'\1\n', text)

    @staticmethod
    def isolate_inline_headings(line: str) -> list[str]:
        m = CAPS_BLOCK_AT_LINE_START.match(line.strip())
        if m:
            head = m.group('head').strip()
            rest = line[len(m.group(0)):].strip()
            return [head, rest] if rest else [head]
        return [line]

    @staticmethod
    def coalesce_caps_headings(lines: list[str]) -> list[str]:
        out, i = [], 0
        while i < len(lines):
            cur = lines[i].strip()
            if CAPS_LINE.match(cur) and not cur.endswith(('.', ':')):
                parts, j = [cur], i + 1
                while j < len(lines):
                    nxt = lines[j].strip()
                    if nxt and CAPS_LINE.match(nxt) and not nxt.endswith(('.', ':')):
                        parts.append(nxt); j += 1
                    elif nxt == "":
                        if j + 1 < len(lines) and CAPS_LINE.match(lines[j+1].strip()):
                            j += 1
                            continue
                        break
                    else:
                        break
                out.append(" ".join(parts))
                i = j
            else:
                out.append(lines[i]); i += 1
        return out

    @staticmethod
    def make_composite_from_text(raw: str) -> str:
        with_breaks = RiskSectionExtractor.enforce_sentence_breaks(raw)
        lines = []
        for ln in with_breaks.splitlines():
            lines.extend(RiskSectionExtractor.isolate_inline_headings(ln))
        lines = RiskSectionExtractor.coalesce_caps_headings(lines)

        composite = []
        prev_was_heading = False
        for ln in lines:
            s = ln.strip()
            if not s:
                continue
            if CAPS_LINE.match(s) and not s.endswith(('.', ':')):
                if composite and not composite[-1].endswith("\n\n"):
                    composite.append("\n")
                composite.append(s + "\n")
                prev_was_heading = True
            else:
                if prev_was_heading:
                    composite.append("\n")
                    prev_was_heading = False
                composite.append(s + "\n")
        return "".join(composite).strip()

    @staticmethod
    def slice_from_risk_factors(text: str) -> str:
        pattern = re.compile(r'(ITEM\s*1A\.?\s+RISK\s+FACTORS|RISK\s+FACTORS)', re.IGNORECASE)
        match = pattern.search(text)
        if match:
            return text[match.start():]
        return text

    def extract_risk_sections(self):
        self.get_10K_pdfs()
        RiskSectionExtractor.generate_risk_factors_pdf('10-K.pdf')
        RiskSectionExtractor.generate_qqdr_section('10-K.pdf', self.ticker)
        risk_factors_text = RiskSectionExtractor.extract_text_from_pdf('Risk_Factors.pdf')
        qqdr_text = RiskSectionExtractor.extract_text_from_pdf('QQDR_Factors.pdf')
        return [risk_factors_text, qqdr_text]
