# In [69]:
import re
import json
import numpy as np
import pandas as pd
from bs4 import BeautifulSoup, Tag
from typing import List, Dict, Any
from datetime import datetime

class FinancialStatementParser:
    """Extract the tables of an HTML financial statement and export them.

    The direct children of ``<body>`` are walked in document order. Tables
    seen before a main note heading is known are collected in
    ``parsed_tables``; once the cash-flow title table has been seen and a
    numbered note heading is active, tables are grouped per note (and
    subsection) in ``annotation_table_map``. The result can be written to an
    Excel workbook (one sheet per table) and to a JSON file.
    """

    # Numbered note heading: "<num>[.<num>...]<sep><title>", sep in ": - _ .".
    _MAIN_NOTE_RE = re.compile(r'^(\d+(\.\d+)*)([:\-_\.]\s*)(.*)')
    # Subsection marker: a Hangul consonant followed by "." or "(<digits>)".
    _SUBSECTION_RE = re.compile(r'^([\u3131-\u314e]\.|\(\d+\))\s*(.*)')
    # Characters Excel does not accept in a worksheet name.
    _INVALID_SHEET_CHARS_RE = re.compile(r'[\[\]:*?/\\]')
    # Excel's maximum worksheet-name length.
    _MAX_SHEET_NAME_LEN = 31

    def __init__(self, html_content: str):
        """Parse *html_content* and initialise empty parsing state.

        Args:
            html_content: Full HTML text of the report.
        """
        self.soup = BeautifulSoup(html_content, 'html.parser')
        # html.parser does not synthesise a <body>; fall back to the document
        # root so a fragment without <body> does not raise AttributeError.
        root = self.soup.body if self.soup.body is not None else self.soup
        self.body_elements = [el for el in root.children if isinstance(el, Tag)]
        # Tables found while no note grouping applies.
        self.parsed_tables: List[pd.DataFrame] = []
        # Note label -> tables found under that note/subsection.
        self.annotation_table_map: Dict[str, List[pd.DataFrame]] = {}
        # Cache of the last parse_to_json() result.
        self.parsed_json: Dict[str, Any] = {}
        # True once a table containing the cash-flow title has been seen.
        self.found_cashflow = False
        # True when the next table must be skipped (follows the equity title).
        self.skip_next = False
        # Label of the current numbered note, e.g. "3. Title".
        self.last_main_note = None
        # Subsection markers currently in effect below the main note.
        self.subsection_chain: List[str] = []

    @staticmethod
    def _span(cell, attr: str) -> int:
        """Return the ``rowspan``/``colspan`` of *cell* as an int >= 1.

        Missing, non-numeric or non-positive values are treated as 1 so that
        one malformed attribute does not make the whole table fail.
        """
        try:
            value = int(cell.get(attr, 1))
        except (TypeError, ValueError):
            return 1
        return max(value, 1)

    @staticmethod
    def _cell_text(cell) -> str:
        """Return the stripped text of *cell* with non-breaking spaces removed."""
        return cell.get_text(strip=True).replace('\xa0', '')

    def _expand_table_header(self, header_rows):
        """Flatten multi-row header cells into one label per column.

        Each cell is copied into every grid slot covered by its rowspan and
        colspan; the labels of one column are then joined top to bottom with
        ``"|"`` (blank labels are left out).

        Args:
            header_rows: ``<tr>`` elements that make up the header.

        Returns:
            A list of column labels; empty when there are no header rows.
        """
        if not header_rows:
            return []

        max_cols = max(
            sum(self._span(cell, "colspan") for cell in row.find_all(['th', 'td']))
            for row in header_rows
        )
        # None marks a free slot. Using "" for that would make a blank cell
        # with a rowspan look free, and cells of the next row would slide
        # into its column.
        grid = [[None] * max_cols for _ in header_rows]

        for row_idx, row in enumerate(header_rows):
            col_idx = 0
            for cell in row.find_all(['th', 'td']):
                # Skip slots already claimed by a rowspan from a row above.
                while col_idx < max_cols and grid[row_idx][col_idx] is not None:
                    col_idx += 1
                rowspan = self._span(cell, "rowspan")
                colspan = self._span(cell, "colspan")
                text = self._cell_text(cell)
                for i in range(rowspan):
                    for j in range(colspan):
                        if row_idx + i < len(grid) and col_idx + j < max_cols:
                            grid[row_idx + i][col_idx + j] = text
                col_idx += colspan

        return ["|".join(grid[r][c] for r in range(len(grid)) if grid[r][c])
                for c in range(max_cols)]

    def _expand_body_row(self, row, col_len):
        """Return the cell texts of *row*, padded with ``""`` to *col_len*.

        A cell with ``colspan=n`` contributes its text followed by ``n - 1``
        blanks. A row wider than *col_len* is returned unshortened.

        NOTE: ``rowspan`` on body cells is not taken into account here.
        """
        cells = []
        for cell in row.find_all(['td', 'th']):
            colspan = self._span(cell, "colspan")
            cells.extend([self._cell_text(cell)] + [''] * (colspan - 1))
        return cells + [""] * (col_len - len(cells))

    def parse_tables(self):
        """Walk the body elements and collect every table as a DataFrame.

        Results are appended to ``parsed_tables`` and
        ``annotation_table_map``; ``subsection_chain`` is reset first.
        """
        self.subsection_chain = []

        for el in self.body_elements:
            if el.name == 'table':
                self._handle_table(el)
            elif el.name in ('p', 'span', 'div'):
                self._handle_heading(el)

    def _handle_table(self, el):
        """Convert one ``<table>`` element and file it under the right key."""
        if self.skip_next:
            self.skip_next = False
            return
        table_text = el.get_text()
        # Equity-statement title: skip this table and the one after it.
        if '자 본 변 동 표' in table_text:
            self.skip_next = True
            return
        # Cash-flow title: from here on note headings are tracked.
        if '현 금 흐 름 표' in table_text:
            self.found_cashflow = True

        thead, tbody = el.find("thead"), el.find("tbody")
        if thead and tbody:
            header_rows = thead.find_all("tr")
            body_rows = tbody.find_all("tr")
        else:
            # Without explicit sections the first two rows are the header.
            rows = el.find_all("tr")
            header_rows, body_rows = rows[:2], rows[2:]

        if not header_rows and not body_rows:
            return  # no rows at all, nothing to extract

        try:
            headers = self._expand_table_header(header_rows)
            data = [self._expand_body_row(row, len(headers)) for row in body_rows]
            # A body row may be wider than the header; widen everything to
            # the widest row instead of losing the table to a shape error.
            width = max([len(headers)] + [len(cells) for cells in data])
            headers = headers + [""] * (width - len(headers))
            data = [cells + [""] * (width - len(cells)) for cells in data]
            df = pd.DataFrame(data, columns=headers)
            df = df.replace('', np.nan).dropna(how='all')
        except Exception as e:
            # Best effort: report the failing table and carry on.
            print(f"❌ 표 처리 오류: {e}")
            return

        if self.found_cashflow and self.last_main_note:
            note_label = self.last_main_note
            if self.subsection_chain:
                note_label += '_' + '_'.join(self.subsection_chain)
            self.annotation_table_map.setdefault(note_label, []).append(df)
        else:
            self.parsed_tables.append(df)

    def _handle_heading(self, el):
        """Update the current note/subsection from a text element."""
        if not self.found_cashflow:
            return
        text = self._cell_text(el)
        # "continued" markers keep the current note untouched.
        if '계속' in text and self.last_main_note:
            return

        main_match = self._MAIN_NOTE_RE.match(text)
        if main_match:
            num = main_match.group(1)
            title = main_match.group(4).split(':')[0].strip()
            self.last_main_note = f"{num}. {title}"
            self.subsection_chain = []
            return

        sub_match = self._SUBSECTION_RE.match(text)
        if sub_match:
            self._enter_subsection(sub_match.group(1).strip())

    @staticmethod
    def _marker_kind(marker: str) -> str:
        """Classify a subsection marker as ``'paren'`` ("(1)") or ``'letter'``."""
        return 'paren' if marker.startswith('(') else 'letter'

    def _enter_subsection(self, marker: str):
        """Make *marker* the active subsection at its own level.

        A marker of a kind already present in the chain replaces that entry
        and everything below it, so sibling sections ("(1)" then "(2)") do
        not pile up as if they were nested.
        """
        kind = self._marker_kind(marker)
        for depth, existing in enumerate(self.subsection_chain):
            if self._marker_kind(existing) == kind:
                del self.subsection_chain[depth:]
                break
        self.subsection_chain.append(marker)

    @classmethod
    def _make_sheet_name(cls, label: str, used: set) -> str:
        """Return a valid worksheet name for *label* that is not in *used*.

        Invalid characters become ``"_"``, the name is cut to the maximum
        length, and a ``"~<n>"`` suffix is added on a collision. Names are
        compared case-insensitively. The chosen name is recorded in *used*.
        """
        limit = cls._MAX_SHEET_NAME_LEN
        base = cls._INVALID_SHEET_CHARS_RE.sub('_', label)[:limit] or 'Sheet'
        name = base
        attempt = 1
        while name.lower() in used:
            attempt += 1
            suffix = f"~{attempt}"
            name = base[:limit - len(suffix)] + suffix
        used.add(name.lower())
        return name

    def save_tables(self, output_excel: str):
        """Write every collected table to *output_excel*, one sheet each.

        Main tables go to sheets ``Table_<n>``; note tables use the note
        label (with a ``_<n>`` suffix when a note holds several tables).
        """
        used_names = set()
        with pd.ExcelWriter(output_excel) as writer:
            for i, df in enumerate(self.parsed_tables):
                sheet_name = self._make_sheet_name(f'Table_{i+1}', used_names)
                df.to_excel(writer, sheet_name=sheet_name, index=False)
            for note, dfs in self.annotation_table_map.items():
                for idx, df in enumerate(dfs):
                    label = note[:25] + f"_{idx+1}" if len(dfs) > 1 else note[:31]
                    # Truncated labels of different notes can be identical;
                    # the helper keeps them from colliding on one sheet.
                    sheet_name = self._make_sheet_name(label, used_names)
                    df.to_excel(writer, sheet_name=sheet_name, index=False)

    @staticmethod
    def _table_structure(df: pd.DataFrame) -> Dict[str, Any]:
        """Return ``{'headers': [...], 'data': [[...], ...]}`` for *df*."""
        return {
            'headers': df.columns.tolist(),
            'data': df.fillna("").values.tolist(),
        }

    def parse_to_json(self) -> Dict[str, Any]:
        """Build, cache and return a JSON-serialisable view of all tables.

        Returns:
            A dict with ``metadata`` and ``tables``. Main tables carry a
            ``table_index``; note tables carry a ``note`` label.
            ``metadata['total_tables']`` counts the main tables only.
        """
        result = {
            'metadata': {
                'parsed_at': datetime.now().isoformat(),
                'total_tables': len(self.parsed_tables)
            },
            'tables': []
        }
        for i, df in enumerate(self.parsed_tables):
            result['tables'].append({
                'table_index': i,
                'structure': self._table_structure(df)
            })
        for note, dfs in self.annotation_table_map.items():
            for idx, df in enumerate(dfs):
                label = f"{note}_{idx+1}" if len(dfs) > 1 else note
                result['tables'].append({
                    'note': label,
                    'structure': self._table_structure(df)
                })
        self.parsed_json = result
        return result

    def save_json(self, output_json: str, pretty: bool = True):
        """Write the JSON view to *output_json* as UTF-8.

        Args:
            output_json: Destination path.
            pretty: Indent the output by two spaces when true.
        """
        if not self.parsed_json:
            self.parse_to_json()
        with open(output_json, 'w', encoding='utf-8') as f:
            json.dump(self.parsed_json, f, ensure_ascii=False, indent=2 if pretty else None)


def parse_financial_statement(input_html_path: str, output_excel: str, output_json: str,
                              encoding: str = 'cp949'):
    """Parse one HTML financial statement and write Excel and JSON outputs.

    Args:
        input_html_path: Path of the HTML report to read.
        output_excel: Path of the Excel workbook to create.
        output_json: Path of the JSON file to create.
        encoding: Text encoding of the input file. Defaults to ``'cp949'``,
            the value that used to be hard-coded; pass e.g. ``'utf-8'`` for
            reports saved in another encoding.
    """
    with open(input_html_path, 'r', encoding=encoding) as f:
        html = f.read()
    parser = FinancialStatementParser(html)
    parser.parse_tables()
    parser.save_tables(output_excel)
    # Build the JSON view explicitly so it reflects the tables just parsed.
    parser.parse_to_json()
    parser.save_json(output_json)
    print(f"✅ 저장 완료: {output_excel}, {output_json}")

# In [70]:
# Run: parse the audit-report HTML and write the Excel and JSON outputs.
parse_financial_statement(
    input_html_path='samsung/감사보고서_4496517.htm',
    output_excel='삼성전자_2014.xlsx',
    output_json='삼성전자_2014.json',
)

# Output: ✅ 저장 완료: 삼성전자_2014.xlsx, 삼성전자_2014.json
