In [None]:
import re
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin

UA = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}

# -------------------------
# 인코딩 감지 & 디코딩
# -------------------------
_META_CHARSET_RE = re.compile(br'charset\s*=\s*["\']?([\w\-]+)', re.I)

def _smart_text(resp: requests.Response, fallback_domain_hint: str | None = None) -> str:
    """
    응답 바이트에서 meta/http 헤더의 charset을 우선 반영하고,
    없으면 apparent_encoding을 쓰고, 마지막으로 도메인 힌트로 보정.
    """
    raw = resp.content
    enc = None

    # 1) HTTP 헤더 우선
    ct = resp.headers.get("Content-Type", "")
    m = re.search(r'charset=([\w\-]+)', ct, flags=re.I)
    if m:
        enc = m.group(1).lower()

    # 2) <meta ... charset=...> 스캔
    if not enc:
        m2 = _META_CHARSET_RE.search(raw[:4096])  # 초반부만 보면 충분
        if m2:
            enc = m2.group(1).decode("ascii", "ignore").lower()

    # 3) requests의 추정
    if not enc:
        enc = (resp.apparent_encoding or "").lower() or None

    # 4) 도메인 힌트 (KIND 기본 cp949)
    if not enc and fallback_domain_hint and "kind.krx.co.kr" in fallback_domain_hint.lower():
        enc = "cp949"

    # 5) 표준화
    if enc in ("euc-kr", "ks_c_5601-1987", "ks_c_5601-1989"):
        enc = "cp949"

    try:
        return raw.decode(enc or "utf-8", errors="strict")
    except Exception:
        # 최후의 보루: cp949 → utf-8 순서로 시도
        for e in (enc, "cp949", "utf-8", "latin1"):
            if not e:
                continue
            try:
                return raw.decode(e, errors="strict")
            except Exception:
                pass
        # 그래도 실패하면 느슨하게
        return raw.decode(enc or "utf-8", errors="replace")

def _get(url: str, timeout: int = 20, hint: str | None = None) -> str:
    r = requests.get(url, timeout=timeout, headers=UA)
    r.raise_for_status()
    return _smart_text(r, fallback_domain_hint=hint or url)

# -------------------------
# 태그 제거 유틸
# -------------------------
def _strip_tags(html: str) -> str:
    soup = BeautifulSoup(html, "lxml")
    table = soup.find("table", {"id": "XFormD1_Form0_Table0"})
    rows = table.find_all("tr")
    
    if table:
        print(table)
    
    data = {}

    # 각 항목별로 필요한 값 추출
    for row in rows:
        cols = row.find_all("span")
        if not cols:
            continue
    
        
        
        label = cols[0].get_text(strip=True)
        
        
        if "계약상대" in label:
            data["계약상대"] = cols[-1].get_text(strip=True)
        elif "계약금액" in label:
            data["계약금액"] = cols[-1].get_text(strip=True)
        elif "매출액대비" in label:
            data["매출대비"] = cols[-1].get_text(strip=True)
        elif "공급지역" in label or "판매ㆍ공급지역" in label:
            data["공급지역"] = cols[-1].get_text(strip=True)
        elif "계약기간" in label and "시작일" in cols[1].get_text(strip=True):
            # 시작/종료일은 rowspan이므로 다음 row도 봐야 함
            data["계약시작"] = cols[-1].get_text(strip=True)
        elif "계약기간" in label and "종료일" in cols[1].get_text(strip=True):
            data["계약종료"] = cols[-1].get_text(strip=True)
        elif "계약(수주)일자" in label:
            data["계약일자"] = cols[-1].get_text(strip=True)
        elif "체결계약명" in label:
            data["계약내용"] = cols[-1].get_text(strip=True)
            
    # 예시 기업명, 보고서명, 시가총액 등은 실제로는 외부에서 받아와야 함
    기업명 = "파미셀"
    시가총액 = "7,394억"
    보고서명 = "단일판매ㆍ공급계약체결"

    # 계약기간 계산 (예시: 1개월)
    data["계약기간"] = "1개월"

    # 예시값이 없을 경우 기본값 처리
    계약상대 = data.get("계약상대", "")
    계약내용 = data.get("계약내용", "")
    공급지역 = data.get("공급지역", "")
    계약금액 = data.get("계약금액", "")
    계약시작 = data.get("계약시작", "")
    계약종료 = data.get("계약종료", "")
    계약기간 = data.get("계약기간", "")
    매출대비 = data.get("매출대비", "")

    result = (
        f"기업명: {기업명}(시가총액: {시가총액})\n"
        f"보고서명: {보고서명}\n\n"
        f"계약상대 : {계약상대}\n"
        f"계약내용 : {계약내용}\n"
        f"공급지역 : {공급지역}\n"
        f"계약금액 : {계약금액}\n\n"
        f"계약시작 : {계약시작}\n"
        f"계약종료 : {계약종료}\n"
        f"계약기간 : {계약기간}\n"
        f"매출대비 : {매출대비}\n"
    )

    print(result)
    
    for t in soup(["script", "style", "noscript"]): t.decompose()
    for br in soup.find_all("br"): br.replace_with("\n")
    for p in soup.find_all("p"):
        p.insert_before("\n"); p.insert_after("\n")
    text = soup.get_text("\n", strip=True)
    text = re.sub(r"\r\n?", "\n", text)
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()

def fetch_viewer_html(url: str, timeout: int = 20) -> str:
    return _get(url, timeout=timeout, hint=url)

# -------------------------
# KIND: acptNo/docNo → searchContents → setPath(...)에서 docLocPath 추출
# -------------------------
def _kind_pick_acpt_doc(viewer_html: str) -> tuple[str | None, str | None]:
    soup = BeautifulSoup(viewer_html, "lxml")
    acpt = None
    el = soup.select_one("#acptNo")
    if el and el.has_attr("value"):
        acpt = el["value"].strip() or None

    doc = None
    opt = soup.select_one("#mainDoc option[selected]") or soup.select_one("#mainDoc option:nth-of-type(2)")
    if opt and opt.get("value"):
        doc = opt["value"].split("|", 1)[0].strip() or None
    return acpt, doc

def _kind_fetch_doclocpath(kind_base_url: str, doc_no: str, timeout: int = 20) -> str | None:
    sc_url = urljoin(kind_base_url, f"/common/disclsviewer.do?method=searchContents&docNo={doc_no}")
    text = _get(sc_url, timeout=timeout, hint=kind_base_url)

    # setPath('tocLocPath','docLocPath','server',...)
    m = re.search(r"setPath\(\s*['\"][^'\"]*['\"]\s*,\s*['\"]([^'\"]+)['\"]\s*,", text)
    if m:
        return m.group(1).strip()
    # 백업: setPath2(...)
    m2 = re.search(r"setPath2\(\s*['\"][^'\"]*['\"]\s*,\s*['\"]([^'\"]+)['\"]\s*,", text)
    return m2.group(1).strip() if m2 else None

def _kind_fetch_text_from_docloc(kind_base_url: str, doc_loc_path: str, timeout: int = 20) -> str:
    doc_url = urljoin(kind_base_url, doc_loc_path)
    html = _get(doc_url, timeout=timeout, hint=kind_base_url)
    return _strip_tags(html)

# -------------------------
# iframe 백업 로직 (DART/KIND 공통)
# -------------------------
def _find_iframe_src(viewer_html: str) -> str | None:
    soup = BeautifulSoup(viewer_html, "lxml")
    iframe = soup.find("iframe", id="docViewFrm")
    if iframe:
        src = (iframe.get("src") or "").strip()
        if src:
            return src
    m = re.search(r'docViewFrm\s*\.src\s*=\s*[\'"]([^\'"]+)[\'"]', viewer_html, flags=re.I)
    if m:
        return m.group(1).strip()
    m2 = re.search(r"setPath\(\s*['\"][^'\"]*['\"]\s*,\s*['\"]([^'\"]+)['\"]", viewer_html)
    if m2:
        return m2.group(1).strip()
    return None

def _fetch_iframe_text(viewer_html: str, viewer_url: str, timeout: int = 20) -> str:
    src = _find_iframe_src(viewer_html)
    if not src:
        return ""
    iframe_url = urljoin(viewer_url, src)
    html = _get(iframe_url, timeout=timeout, hint=viewer_url)
    return _strip_tags(html)

# -------------------------
# 최종 진입점
# -------------------------
def extract_disclosure_text(url: str, timeout: int = 20) -> str:
    viewer_html = fetch_viewer_html(url, timeout=timeout)

    if "kind.krx.co.kr" in url.lower():
        acpt, doc = _kind_pick_acpt_doc(viewer_html)
        if doc:
            doc_loc = _kind_fetch_doclocpath(url, doc, timeout=timeout)
            if doc_loc:
                text = _kind_fetch_text_from_docloc(url, doc_loc, timeout=timeout)
                if text and len(text) > 50:
                    return text
        return _fetch_iframe_text(viewer_html, url, timeout=timeout)

    # DART 등은 기본적으로 iframe 따라가기 (DART도 EUC-KR 페이지가 있음 → _get 사용)
    return _fetch_iframe_text(viewer_html, url, timeout=timeout)



In [2]:
kind_url = "https://kind.krx.co.kr/common/disclsviewer.do?method=search&acptno=20220630001042&docno=&viewerhost=&viewerport="
print("KIND:", extract_disclosure_text(kind_url), "\n---\n")

<table border="1" bordercolordark="white" bordercolorlight="#666666" cellpadding="1" cellspacing="0" id="XFormD1_Form0_Table0" style="margin:0px 0px 20px 0px;width:596px;font-size:10pt;border:1px solid #7f7f7f;">
<tbody>
<tr>
<td colspan="2" width="261"> <span style="width:261px;font-size:10pt;">1. 판매ㆍ공급계약 구분</span> </td>
<td colspan="2" width="335"> <span class="xforms_input" style="width:335px;font-size:10pt;">공사수주</span> </td>
</tr>
<tr>
<td colspan="2" width="261"> <span style="width:261px;font-size:10pt;">- 체결계약명</span> </td>
<td colspan="2" width="335"> <span class="xforms_input" style="width:335px;font-size:10pt;">대명2동 명덕지구 주택재개발정비사업</span> </td>
</tr>
<tr>
<td rowspan="4" width="153"> <span style="width:153px;font-size:10pt;">2. 계약내역</span> </td>
<td width="108"> <span style="width:108px;font-size:10pt;text-align:center;">계약금액(원)</span> </td>
<td colspan="2" width="335"> <span class="xforms_input" style="width:335px;font-size:10pt;text-align:right;">237,082,257,380</span> </td>

In [3]:
import pandas as pd
import os

# 현재 디렉토리 확인
current_dir = os.getcwd()
print(f"현재 디렉토리: {current_dir}")

# 입력/출력 파일 경로 (절대 경로로 설정)
base_dir = "/Users/taechanan/dart-sentiment-analysis/dags/data"
in_path = os.path.join(base_dir, "kind_disclosures_20250822_070850.csv")  # 원본 CSV
out_path = os.path.join(base_dir, "kind_disclosures_20250822_070850.with_text.csv")  # 저장할 CSV

print(f"입력 파일 경로: {in_path}")
print(f"출력 파일 경로: {out_path}")

# 파일 존재 확인
if not os.path.exists(in_path):
    raise FileNotFoundError(f"입력 파일을 찾을 수 없습니다: {in_path}")

# CSV 읽기 (인코딩 가볍게 시도)
df = None
for enc in ("utf-8-sig", "utf-8", "cp949", "euc-kr"):
    try:
        df = pd.read_csv(in_path, encoding=enc)
        print(f"CSV 파일을 {enc} 인코딩으로 성공적으로 읽었습니다.")
        break
    except Exception as e:
        print(f"{enc} 인코딩 시도 실패: {e}")
        pass
if df is None:
    raise ValueError("CSV를 읽을 수 없습니다. 인코딩을 확인하세요.")

# detail_url → raw_text 생성 (extract_disclosure_text는 이미 정의되어 있다고 가정)
if "detail_url" not in df.columns:
    raise ValueError("CSV에 'detail_url' 컬럼이 없습니다.")

df["raw_text"] = df["detail_url"].fillna("").astype(str).apply(
    lambda u: extract_disclosure_text(u) if u else ""
)

# 저장
df.to_csv(out_path, index=False, encoding="utf-8-sig")
print("saved ->", out_path)


현재 디렉토리: /Users/antaechan/dart-sentiment-analysis/analysis
입력 파일 경로: /Users/taechanan/dart-sentiment-analysis/dags/data/kind_disclosures_20250822_070850.csv
출력 파일 경로: /Users/taechanan/dart-sentiment-analysis/dags/data/kind_disclosures_20250822_070850.with_text.csv


FileNotFoundError: 입력 파일을 찾을 수 없습니다: /Users/taechanan/dart-sentiment-analysis/dags/data/kind_disclosures_20250822_070850.csv

In [None]:
pd.read_csv(out_path)

Unnamed: 0,disclosed_at,company_name,stock_code,short_code,market,title,disclosure_id,detail_url,raw_text
0,2022-06-30 18:48,이원컴포텍,KR7088290002,88290.0,KOSDAQ,유상증자결정(자율공시)(종속회사의 주요경영사항)(제3자배정),20220630001079,https://kind.krx.co.kr/common/disclsviewer.do?...,:: 71776_유상증자결정(자율공시)(종속회사의 주요경영사항)\n유상증자결정(자율...
1,2022-06-30 18:24,코아시아씨엠,KR7196450001,196450.0,KOSDAQ,증권 발행결과(자율공시),20220630001078,https://kind.krx.co.kr/common/disclsviewer.do?...,:: 70065_증권 발행결과(자율공시)\n증권 발행결과(자율공시)\n1. 증권의 ...
2,2022-06-30 18:14,코아시아,KR7045970001,45970.0,KOSDAQ,주권 관련 사채권의 취득결정,20220630001053,https://kind.krx.co.kr/common/disclsviewer.do?...,:: 71988_주권 관련 사채권의 취득결정\n주권 관련 사채권의 취득결정\n1. ...
3,2022-06-30 18:14,코아시아씨엠,KR7196450001,196450.0,KOSDAQ,타법인주식및출자증권취득결정,20220630001052,https://kind.krx.co.kr/common/disclsviewer.do?...,:: 71381_타법인 주식 및 출자증권 취득결정\n타법인 주식 및 출자증권 취득결...
4,2022-06-30 18:14,코아시아씨엠,KR7196450001,196450.0,KOSDAQ,전환사채권발행결정(제10회차),20220630001036,https://kind.krx.co.kr/common/disclsviewer.do?...,주요사항보고서 / 거래소 신고의무 사항\n금융위원회 / 한국거래소 귀중\n2022 ...
...,...,...,...,...,...,...,...,...,...
495,2022-06-29 17:34,골프존데카,,,KONEX,주식등의대량보유상황보고서(일반),20220629000784,https://kind.krx.co.kr/common/disclsviewer.do?...,주식등의 대량보유상황보고서\n(일반서식 : 자본시장과 금융투자업에 관한 법률 제14...
496,2022-06-29 17:31,GS,KR7078930005,78930.0,KOSPI,주식등의대량보유상황보고서(일반),20220629000780,https://kind.krx.co.kr/common/disclsviewer.do?...,주식등의 대량보유상황보고서\n(일반서식 : 자본시장과 금융투자업에 관한 법률 제14...
497,2022-06-29 17:27,오늘이엔엠,KR7192410009,192410.0,KOSDAQ,주식등의대량보유상황보고서(일반),20220629000776,https://kind.krx.co.kr/common/disclsviewer.do?...,주식등의 대량보유상황보고서\n(일반서식 : 자본시장과 금융투자업에 관한 법률 제14...
498,2022-06-29 17:26,케이피에프,KR7024880007,24880.0,KOSDAQ,타인에대한채무보증결정,20220629000770,https://kind.krx.co.kr/common/disclsviewer.do?...,:: 71314_타인에 대한 채무보증 결정\n타인에 대한 채무보증 결정\n1. 채무...


In [None]:
def get_disclosure_supply(kind_url: str, timeout: int = 20) -> dict:
    """
    KIND URL에서 판매ㆍ공급계약 관련 정보를 추출하는 함수
    extract_disclosure_text의 HTML 추출 로직을 참고하여 직접 테이블 파싱
    
    Args:
        kind_url: KIND 공시 URL
        timeout: 요청 타임아웃 (초)
    
    Returns:
        dict: 추출된 공급계약 정보
    """
    try:
        # extract_disclosure_text의 HTML 추출 로직 참고
        viewer_html = fetch_viewer_html(kind_url, timeout=timeout)
        
        if "kind.krx.co.kr" in kind_url.lower():
            acpt, doc = _kind_pick_acpt_doc(viewer_html)
            if doc:
                doc_loc = _kind_fetch_doclocpath(kind_url, doc, timeout=timeout)
                if doc_loc:
                    html_text = _kind_fetch_text_from_docloc(kind_url, doc_loc, timeout=timeout)
                    if html_text and len(html_text) > 50:
                        # HTML 원본을 직접 파싱
                        doc_url = urljoin(kind_url, doc_loc)
                        raw_html = _get(doc_url, timeout=timeout, hint=kind_url)
                    else:
                        raw_html = _fetch_iframe_text(viewer_html, kind_url, timeout=timeout)
                else:
                    raw_html = _fetch_iframe_text(viewer_html, kind_url, timeout=timeout)
            else:
                raw_html = _fetch_iframe_text(viewer_html, kind_url, timeout=timeout)
        else:
            raw_html = _fetch_iframe_text(viewer_html, kind_url, timeout=timeout)
        
        if not raw_html:
            return {"error": "HTML을 추출할 수 없습니다."}
        
        # BeautifulSoup으로 파싱
        soup = BeautifulSoup(raw_html, "lxml")
        
        # 제공된 테이블 구조에 맞게 테이블 찾기
        table = soup.find("table", {"id": "XFormD1_Form0_Table0"})
        if not table:
            # 다른 테이블 구조도 시도
            tables = soup.find_all("table")
            table = None
            for t in tables:
                if "판매" in str(t) and "공급" in str(t):
                    table = t
                    break
        
        if not table:
            return {"error": "공급계약 테이블을 찾을 수 없습니다."}
        
        # 결과 딕셔너리 초기화
        result = {}
        
        # 테이블의 모든 행(tr) 처리
        rows = table.find_all("tr")
        
        for i, row in enumerate(rows):
            cells = row.find_all("td")
            if len(cells) < 2:
                continue
            
            # 각 행의 구조에 따라 정보 추출
            # 제공된 HTML 구조를 정확히 분석하여 파싱
            
            # 첫 번째 셀에서 라벨 추출
            first_cell_text = cells[0].get_text(strip=True)
            
            # 제공된 테이블 구조에 맞게 정확한 매핑
            if "1. 판매ㆍ공급계약 구분" in first_cell_text:
                if len(cells) >= 3:
                    result["contract_type"] = cells[2].get_text(strip=True)
            
            elif "- 체결계약명" in first_cell_text:
                if len(cells) >= 3:
                    result["contract_name"] = cells[2].get_text(strip=True)
            
            elif "2. 계약내역" in first_cell_text:
                # 계약내역 섹션 - 다음 행들에서 세부 정보 추출
                continue
            
            elif "계약금액(원)" in first_cell_text:
                if len(cells) >= 3:
                    result["contract_amount"] = cells[2].get_text(strip=True)
            
            elif "최근매출액(원)" in first_cell_text:
                if len(cells) >= 3:
                    result["recent_revenue"] = cells[2].get_text(strip=True)
            
            elif "매출액대비(%)" in first_cell_text:
                if len(cells) >= 3:
                    result["revenue_ratio"] = cells[2].get_text(strip=True)
            
            elif "대규모법인여부" in first_cell_text:
                if len(cells) >= 3:
                    result["large_corporation"] = cells[2].get_text(strip=True)
            
            elif "3. 계약상대" in first_cell_text:
                if len(cells) >= 3:
                    result["contract_counterparty"] = cells[2].get_text(strip=True)
            
            elif "- 회사와의 관계" in first_cell_text:
                if len(cells) >= 3:
                    result["relationship"] = cells[2].get_text(strip=True)
            
            elif "4. 판매ㆍ공급지역" in first_cell_text:
                if len(cells) >= 3:
                    result["supply_region"] = cells[2].get_text(strip=True)
            
            elif "5. 계약기간" in first_cell_text:
                # 계약기간 섹션 - 다음 행들에서 세부 정보 추출
                continue
            
            elif "시작일" in first_cell_text:
                if len(cells) >= 3:
                    result["start_date"] = cells[2].get_text(strip=True)
            
            elif "종료일" in first_cell_text:
                if len(cells) >= 3:
                    result["end_date"] = cells[2].get_text(strip=True)
            
            elif "6. 주요 계약조건" in first_cell_text:
                if len(cells) >= 3:
                    result["main_contract_terms"] = cells[2].get_text(strip=True)
            
            elif "7. 계약(수주)일자" in first_cell_text:
                if len(cells) >= 3:
                    result["contract_date"] = cells[2].get_text(strip=True)
            
            elif "8. 공시유보 관련내용" in first_cell_text:
                # 공시유보 섹션 - 다음 행들에서 세부 정보 추출
                continue
            
            elif "유보사유" in first_cell_text:
                if len(cells) >= 3:
                    result["reserve_reason"] = cells[2].get_text(strip=True)
            
            elif "유보기한" in first_cell_text:
                if len(cells) >= 3:
                    result["reserve_deadline"] = cells[2].get_text(strip=True)
            
            elif "9. 기타 투자판단과 관련한 중요사항" in first_cell_text:
                # 다음 행에서 실제 내용 추출
                if i + 1 < len(rows):
                    next_row = rows[i + 1]
                    next_cells = next_row.find_all("td")
                    if len(next_cells) >= 1:
                        result["important_matters"] = next_cells[0].get_text(strip=True)
            
            elif "※ 관련공시" in first_cell_text:
                if len(cells) >= 2:
                    result["related_disclosure"] = cells[1].get_text(strip=True)
        
        # URL 정보 추가
        result["source_url"] = kind_url
        
        return result
        
    except Exception as e:
        return {"error": f"공급계약 정보 추출 중 오류 발생: {str(e)}"}

# 테스트용 함수
def test_get_disclosure_supply():
    """테스트용 함수"""
    test_url = "https://kind.krx.co.kr/common/disclsviewer.do?method=search&acptno=20210105000343"
    result = get_disclosure_supply(test_url)
    print("추출 결과:")
    for key, value in result.items():
        print(f"{key}: {value}")
    return result


In [None]:
# 수정된 함수 테스트
test_result = test_get_disclosure_supply()


추출 결과:
error: 공급계약 테이블을 찾을 수 없습니다.
