In [19]:
import os
import io
import re
import argparse
from datetime import datetime
import requests
import pandas as pd
from zipfile import ZipFile, BadZipFile
from openpyxl import load_workbook, Workbook
from openpyxl.styles import Alignment
from pandas.api.types import (
    is_integer_dtype,
    is_float_dtype
)
from bs4 import BeautifulSoup
from dotenv import load_dotenv         

load_dotenv()
API_KEY = os.getenv("DART_API_KEY")

def fetch_sales(session) -> pd.DataFrame:
    url = 'https://opendart.fss.or.kr/api/list.json'
    base_params = {
        'crtfc_key': API_KEY,
        'bgn_de': 20250601,
        'end_de': 20250623,
        'pblntf_detail_ty': 'I001',
        'page_count': 100,
        'last_reprt_at': 'Y',
    }

    all_reports = []
    resp = session.get(url, params={**base_params, 'page_no': 1}, timeout=10)
    resp.raise_for_status()
    data = resp.json()
    total_page = int(data.get('total_page', 1))
    all_reports.extend(data.get('list', []))

    for page in range(2, total_page + 1):
        resp = session.get(url, params={**base_params, 'page_no': page}, timeout=10)
        resp.raise_for_status()
        reports = resp.json().get('list', [])
        if not reports:
            break
        all_reports.extend(reports)

    df = pd.DataFrame(all_reports)

    if df.empty or 'report_nm' not in df.columns:
        return pd.DataFrame()
    
    # return df
    return df[
        df['report_nm'].str.contains('신규시설') 
    ].reset_index(drop=True)

In [20]:
session = requests.Session()
session.headers.update({"User-Agent": "Mozilla/5.0"})
df = fetch_sales(session)
df

Unnamed: 0,corp_code,corp_name,stock_code,corp_cls,report_nm,rcept_no,flr_nm,rcept_dt,rm
0,677334,아하,102950,N,신규시설투자등,20250620600515,아하,20250620,넥
1,117577,오리온홀딩스,1800,Y,신규시설투자등(자회사의 주요경영사항),20250620800332,오리온홀딩스,20250620,유
2,1238169,오리온,271560,Y,신규시설투자등,20250620800297,오리온,20250620,유
3,161383,한미반도체,42700,Y,신규시설투자등(자율공시),20250620800077,한미반도체,20250620,유
4,1357765,큐라티스,348080,K,신규시설투자등,20250619900605,큐라티스,20250619,코
5,653024,진에어,272450,Y,신규시설투자등,20250617800459,진에어,20250617,유
6,105873,LG디스플레이,34220,Y,신규시설투자등,20250617800220,LG디스플레이,20250617,유
7,642541,제이에스링크,127120,K,[기재정정]신규시설투자등,20250616900561,제이에스링크,20250616,코
8,351375,뉴보텍,60260,K,신규시설투자등(자율공시),20250613900596,뉴보텍,20250613,코
9,1383779,에스켐,475660,K,신규시설투자등,20250613900288,에스켐,20250613,코


In [None]:
def parse_contract(session, rcept_no: str) -> dict:
    resp = session.get(
        'https://opendart.fss.or.kr/api/document.xml',
        params={'crtfc_key': API_KEY, 'rcept_no': rcept_no},
        timeout=10
    )
    resp.raise_for_status()
    try:
        z = ZipFile(io.BytesIO(resp.content))
    except BadZipFile:
        return {}
    fname = next(f for f in z.namelist() if f.lower().endswith(('.xml', '.html')))
    html = z.read(fname).decode('utf-8', errors='ignore')
    soup = BeautifulSoup(html, 'lxml')

    contract_table = None
    for tbl in soup.find_all('table'):
        text = tbl.get_text()
        if any(k in text for k in ['투자구분', '판매ㆍ공급계약', '세부내용', '계약내역']):
            contract_table = tbl
            break
    if contract_table is None:
        return {}
    return contract_table

In [22]:
html = parse_contract(session, 20250620600515)

In [27]:
html

<table border="1" bordercolordark="white" bordercolorlight="#666666" cellpadding="1" cellspacing="0" id="XFormD1_Form0_Table0" style="margin:0px 0px 20px 0px;width:598px;font-size:10pt;border:1px solid #7f7f7f;">
<tbody>
<tr>
<td colspan="2" width="241"> <span style="width:241px;font-size:10pt;">1. 투자구분</span> </td>
<td width="357"> <span class="xforms_input" style="width:357px;font-size:10pt;">해외(우즈베키스탄) 신규 생산공장 설립</span> </td>
</tr>
<tr>
<td rowspan="5" width="91"> <span style="width:91px;font-size:10pt;">2. 투자내역</span> </td>
<td width="150"> <span style="width:150px;font-size:10pt;">투자금액(원)</span> </td>
<td width="357"> <span class="xforms_input" style="width:357px;font-size:10pt;text-align:right;">27,592,000,000</span> </td>
</tr>
<tr>
<td width="150"> <span style="width:150px;font-size:10pt;">자기자본(원)</span> </td>
<td width="357"> <span class="xforms_input" style="width:357px;font-size:10pt;text-align:right;">39,907,985,862</span> </td>
</tr>
<tr>
<td width="150"> <span style="widt

In [47]:
from bs4 import BeautifulSoup, element
import re
from datetime import datetime
from zipfile import ZipFile, BadZipFile
import io

# ───────────────────────────────────────────────
# 1) 이전에 만든 헬퍼 함수를 그대로 가져옵니다.
def parse_investment_with_helpers(html) -> dict:
    # html이 Tag면 그대로 table으로, 아니면 문자열로 파싱
    if isinstance(html, element.Tag):
        table = html
    elif isinstance(html, str):
        soup = BeautifulSoup(html, 'html.parser')
        table = soup.find(id="XFormD1_Form0_Table0")
        if table is None:
            raise ValueError("ID 'XFormD1_Form0_Table0'인 <table>을 찾을 수 없습니다.")
    else:
        raise TypeError("html은 str 또는 bs4.element.Tag 이어야 합니다.")

    def get_val(keys):
        for tr in table.find_all('tr'):
            tds = tr.find_all('td')
            if len(tds) == 2:
                label = re.sub(r'\s+', '', tds[0].get_text())
                val    = tds[1].get_text(strip=True)
            elif len(tds) >= 3:
                label = re.sub(r'\s+', '', tds[0].get_text() + tds[1].get_text())
                val    = tds[-1].get_text(strip=True)
            else:
                continue
            for k in keys:
                if k.replace('ㆍ','') in label or k in label:
                    return val
        return None

    def get_int(keys):
        raw = get_val(keys)
        return None if not raw or raw=='-' else int(raw.replace(',', ''))

    def get_float(keys):
        raw = get_val(keys)
        return None if not raw or raw=='-' else float(raw.replace(',', ''))

    def get_date(keys):
        raw = get_val(keys)
        if not raw: 
            return None
        try:
            return datetime.strptime(raw, '%Y-%m-%d').date()
        except ValueError:
            return raw

    return {
        '투자구분':   get_val(['투자구분']),
        '투자금액(원)': get_int(['투자금액','투자금액(원)']),
        '자기자본(원)':    get_int(['자기자본','자기자본(원)']),
        '자기자본대비(%)':      get_float(['자기자본대비','자기자본대비(%)']),
        '자산총액':      get_int(['자산총액','최근사업연도말자산총액(원)']),
        '자산총액대비(%)':      get_float(['자산총액대비','자산총액대비(%)']),
        '결정일':        get_date(['이사회결의일(결정일)', '이사회결의일', '이사회결정일']),
        '시작일':        get_date(['시작일']),
        '종료일':          get_date(['종료일']),
    }

# ───────────────────────────────────────────────
# 2) parse_contract() 에 헬퍼 호출을 추가합니다.
def parse_contract(session, rcept_no: str) -> dict:
    resp = session.get(
        'https://opendart.fss.or.kr/api/document.xml',
        params={'crtfc_key': API_KEY, 'rcept_no': rcept_no},
        timeout=10
    )
    resp.raise_for_status()
    try:
        z = ZipFile(io.BytesIO(resp.content))
    except BadZipFile:
        return {}
    # xml/html 파일 이름 추출
    fname = next(f for f in z.namelist() if f.lower().endswith(('.xml', '.html')))
    html = z.read(fname).decode('utf-8', errors='ignore')
    soup = BeautifulSoup(html, 'lxml')

    # 투자·계약 테이블 찾기
    contract_table = None
    for tbl in soup.find_all('table'):
        text = tbl.get_text()
        if any(k in text for k in ['신규 시설투자 등', '신규 시설투자', '신규시설투자']):
            contract_table = tbl
            break
    if contract_table is None:
        return {}

    # ── 여기서 바로 파싱 헬퍼를 호출해 결과 dict 리턴
    data = parse_investment_with_helpers(contract_table)
    return pd.DataFrame([data])

In [48]:
import requests
session = requests.Session()
data = parse_contract(session, '20250620800297')

data

Unnamed: 0,투자구분,투자금액(원),자기자본(원),자기자본대비(%),자산총액,자산총액대비(%),결정일,시작일,종료일
0,신규시설투자,228000000000,3574045867619,6.38,,,2025-06-20,2025-08-01,2027-06-30


## 테스트

In [68]:
import os
import io
import re
from datetime import datetime
from zipfile import ZipFile, BadZipFile

import requests
import pandas as pd
from bs4 import BeautifulSoup, element
from dotenv import load_dotenv
from tqdm import tqdm

load_dotenv()
API_KEY = os.getenv("DART_API_KEY")


def fetch_sales(session) -> pd.DataFrame:
    """DART에서 '신규시설' 공시 목록을 가져와 DataFrame으로 반환."""
    url = 'https://opendart.fss.or.kr/api/list.json'
    base_params = {
        'crtfc_key': API_KEY,
        'bgn_de': 20250620,
        'end_de': 20250623,
        'pblntf_detail_ty': 'I001',
        'page_count': 100,
        'last_reprt_at': 'Y',
    }

    all_reports = []
    resp = session.get(url, params={**base_params, 'page_no': 1}, timeout=10)
    resp.raise_for_status()
    data = resp.json()
    total_page = int(data.get('total_page', 1))
    all_reports.extend(data.get('list', []))

    for page in range(2, total_page + 1):
        resp = session.get(url, params={**base_params, 'page_no': page}, timeout=10)
        resp.raise_for_status()
        reports = resp.json().get('list', [])
        if not reports:
            break
        all_reports.extend(reports)

    df = pd.DataFrame(all_reports)
    if df.empty or 'report_nm' not in df.columns:
        return pd.DataFrame()

    # '신규시설'이 포함된 보고서명만 필터
    return df[df['report_nm'].str.contains('신규시설', na=False)].reset_index(drop=True)


def parse_investment_with_helpers(html) -> dict:
    """공시 HTML 테이블(Tag 또는 문자열)에서 투자정보 8개 필드를 추출."""
    if isinstance(html, element.Tag):
        table = html
    elif isinstance(html, str):
        soup = BeautifulSoup(html, 'html.parser')
        table = soup.find(id="XFormD1_Form0_Table0")
        if table is None:
            raise ValueError("ID 'XFormD1_Form0_Table0'인 <table>을 찾을 수 없습니다.")
    else:
        raise TypeError("html은 str 또는 bs4.element.Tag 이어야 합니다.")

    def get_val(keys):
        # 키워드는 길이 순으로 정렬 (긴 것 먼저)
        sorted_keys = sorted(keys, key=lambda x: len(x), reverse=True)
        for tr in table.find_all('tr'):
            tds = tr.find_all('td')
            if len(tds) == 2:
                label = re.sub(r'\s+', '', tds[0].get_text())
                val   = tds[1].get_text(strip=True)
            elif len(tds) >= 3:
                label = re.sub(r'\s+', '', tds[0].get_text() + tds[1].get_text())
                val   = tds[-1].get_text(strip=True)
            else:
                continue

            for k in sorted_keys:
                if k.replace('ㆍ','') in label or k in label:
                    return val
        return None

    def get_int(keys):
        raw = get_val(keys)
        return None if not raw or raw == '-' else int(raw.replace(',', ''))

    def get_float(keys):
        raw = get_val(keys)
        return None if not raw or raw == '-' else float(raw.replace(',', ''))

    def get_date(keys):
        raw = get_val(keys)
        if not raw:
            return None
        try:
            return datetime.strptime(raw, '%Y-%m-%d').date()
        except ValueError:
            return raw

    return {
        '투자구분':             get_val(['투자구분']),
        '투자금액(원)':         get_int(['투자금액', '투자금액(원)']),
        '자기자본(원)':         get_int(['자기자본', '자기자본(원)']),
        '자기자본대비(%)':      get_float(['자기자본대비', '자기자본대비(%)']),
        '최근사업연도말 자산총액(원)': get_int(['최근사업연도말자산총액(원)']),
        '자산총액대비(%)':      get_float(['자산총액대비', '자산총액대비(%)']),
        '결정일':               get_date(['이사회결의일(결정일)', '이사회결의일', '결정일', '이사회결정일']),
        '시작일':               get_date(['시작일']),
        '종료일':               get_date(['종료일']),
    }


def parse_contract(session, rcept_no: str) -> dict:
    """단일 rcept_no에 대해 DART 문서를 내려받아 투자 테이블을 파싱."""
    resp = session.get(
        'https://opendart.fss.or.kr/api/document.xml',
        params={'crtfc_key': API_KEY, 'rcept_no': rcept_no},
        timeout=10
    )
    resp.raise_for_status()
    try:
        z = ZipFile(io.BytesIO(resp.content))
    except BadZipFile:
        return {}
    fname = next(f for f in z.namelist() if f.lower().endswith(('.xml', '.html')))
    html = z.read(fname).decode('utf-8', errors='ignore')
    soup = BeautifulSoup(html, 'lxml')

    # '투자구분' 키워드가 있는 테이블을 찾아 파싱
    for tbl in soup.find_all('table'):
        if '투자구분' in tbl.get_text():
            return parse_investment_with_helpers(tbl)
    return {}


if __name__ == '__main__':
    session = requests.Session()

    # 1) 검색: 신규시설 공시 목록
    sales_df = fetch_sales(session)

    # 2) 파싱: rcept_no 별로 투자정보 추출
    parsed_list = []
    for no in tqdm(sales_df['rcept_no'], desc="Parsing"):
        parsed = parse_contract(session, str(no)) or {}
        parsed['rcept_no'] = no
        parsed_list.append(parsed)

    parsed_df = pd.DataFrame(parsed_list)

    # 3) 병합: 원본 목록 + 파싱 결과
    result_df = sales_df[['corp_name', 'rcept_no']].merge(parsed_df, on='rcept_no', how='left')
    # result_df = parsed_df
    # 4) 공시회사 컬럼 추가 및 컬럼명 한글화
    # result_df['공시회사'] = result_df['corp_name']
    result_df = result_df.rename(columns={
        'corp_name': '공시회사',
        '투자구분': '투자구분',
        '투자금액(원)': '투자금액(원)',
        '자기자본(원)': '자기자본(원)',
        '자기자본대비(%)': '자기자본대비(%)',
        '최근사업연도말 자산총액(원)': '자산총액(원)',
        '자산총액대비(%)': '자산총액대비(%)',
        '시작일': '시작일',
        '종료일': '종료일',
    })

Parsing: 100%|██████████| 4/4 [00:00<00:00, 38.78it/s]


In [69]:
result_df

Unnamed: 0,공시회사,rcept_no,투자구분,투자금액(원),자기자본(원),자기자본대비(%),자산총액(원),자산총액대비(%),결정일,시작일,종료일
0,아하,20250620600515,해외(우즈베키스탄) 신규 생산공장 설립,27592000000,39907985862,69.14,109299100000.0,25.24,2025-06-20,2025-06-20,2027-02-28
1,오리온홀딩스,20250620800332,신규시설투자,228000000000,3574045867619,6.38,,,2025-06-20,2025-08-01,2027-06-30
2,오리온,20250620800297,신규시설투자,228000000000,3574045867619,6.38,,,2025-06-20,2025-08-01,2027-06-30
3,한미반도체,20250620800077,신규시설투자,28480000000,540887863389,5.26,,,2025-06-20,2025-08-01,2026-11-30


In [111]:
import os
import io
import re
from datetime import datetime
from zipfile import ZipFile, BadZipFile

import requests
import pandas as pd
from bs4 import BeautifulSoup, element
from dotenv import load_dotenv
from tqdm import tqdm

load_dotenv()
API_KEY = os.getenv("DART_API_KEY")


def fetch_sales(session) -> pd.DataFrame:
    """DART에서 '신규시설' 공시 목록을 가져와 DataFrame으로 반환."""
    url = 'https://opendart.fss.or.kr/api/list.json'
    base_params = {
        'crtfc_key': API_KEY,
        'bgn_de': 20250620,
        'end_de': 20250623,
        'pblntf_detail_ty': 'I001',
        'page_count': 100,
        'last_reprt_at': 'Y',
    }

    all_reports = []
    resp = session.get(url, params={**base_params, 'page_no': 1}, timeout=10)
    resp.raise_for_status()
    data = resp.json()
    total_page = int(data.get('total_page', 1))
    all_reports.extend(data.get('list', []))

    for page in range(2, total_page + 1):
        resp = session.get(url, params={**base_params, 'page_no': page}, timeout=10)
        resp.raise_for_status()
        reports = resp.json().get('list', [])
        if not reports:
            break
        all_reports.extend(reports)

    df = pd.DataFrame(all_reports)
    if df.empty or 'report_nm' not in df.columns:
        return pd.DataFrame()
    
    return df[
        df['report_nm'].str.contains('신규시설', na=False) &
        ~df['report_nm'].str.contains('자회사')
    ].reset_index(drop=True)    


def parse_investment_with_helpers(html) -> dict:
    """공시 HTML 테이블(Tag 또는 문자열)에서 투자정보 8개 필드를 추출."""
    if isinstance(html, element.Tag):
        table = html
    elif isinstance(html, str):
        soup = BeautifulSoup(html, 'html.parser')
        table = soup.find(id="XFormD1_Form0_Table0")
        if table is None:
            raise ValueError("ID 'XFormD1_Form0_Table0'인 <table>을 찾을 수 없습니다.")
    else:
        raise TypeError("html은 str 또는 bs4.element.Tag 이어야 합니다.")

    def get_val(keys):
        sorted_keys = sorted(keys, key=len, reverse=True)
        for tr in table.find_all('tr'):
            tds = tr.find_all('td')
            if len(tds) == 2:
                label = re.sub(r'\s+', '', tds[0].get_text())
                val   = tds[1].get_text(strip=True)
            elif len(tds) >= 3:
                label = re.sub(r'\s+', '', tds[0].get_text() + tds[1].get_text())
                val   = tds[-1].get_text(strip=True)
            else:
                continue
            for k in sorted_keys:
                if k.replace('ㆍ', '') in label or k in label:
                    return val
        return None

    def get_int(keys):
        raw = get_val(keys)
        return None if not raw or raw == '-' else int(raw.replace(',', ''))

    def get_float(keys):
        raw = get_val(keys)
        return None if not raw or raw == '-' else float(raw.replace(',', ''))

    def get_date(keys):
        raw = get_val(keys)
        if not raw:
            return None
        try:
            return datetime.strptime(raw, '%Y-%m-%d').date()
        except ValueError:
            return raw

    return {
        '투자구분':                  get_val(['투자대상']) or get_val(['투자구분']),
        '투자금액(원)':              get_int(['투자금액(원)', '투자금액']),
        '자기자본(원)':              get_int(['자기자본(원)', '자기자본']),
        '자기자본대비(%)':           get_float(['자기자본대비(%)']),
        '최근사업연도말 자산총액(원)': get_int(['최근사업연도말자산총액(원)']),
        '자산총액대비(%)':           get_float(['자산총액대비(%)']),
        '결정일':                    get_date(['이사회결의일(결정일)','결정일']),
        '시작일':                    get_date(['시작일']),
        '종료일':                    get_date(['종료일']),
    }


def parse_contract(session, rcept_no: str) -> dict:
    """단일 rcept_no에 대해 DART 문서를 내려받아 투자 테이블을 파싱."""
    resp = session.get(
        'https://opendart.fss.or.kr/api/document.xml',
        params={'crtfc_key': API_KEY, 'rcept_no': rcept_no},
        timeout=10
    )
    resp.raise_for_status()
    try:
        z = ZipFile(io.BytesIO(resp.content))
    except BadZipFile:
        return {}
    fname = next(f for f in z.namelist() if f.lower().endswith(('.xml', '.html')))
    html = z.read(fname).decode('utf-8', errors='ignore')
    soup = BeautifulSoup(html, 'lxml')

    for tbl in soup.find_all('table'):
        if '투자구분' in tbl.get_text():
            return parse_investment_with_helpers(tbl)
    return {}


if __name__ == '__main__':
    session = requests.Session()

    # 1) 검색: '신규시설' 공시 목록
    sales_df = fetch_sales(session)

    # 2) 순회: corp_name, rcept_no 가져와서 파싱 후 즉시 합치기
    parsed_list = []
    for rec in tqdm(
        sales_df[['corp_name','rcept_no', 'rcept_dt', 'stock_code']].to_dict(orient='records'),
        desc="Parsing contracts"
    ):
        parsed = parse_contract(session, str(rec['rcept_no'])) or {}
        formatted_date = datetime.strptime(str(rec['rcept_dt']), '%Y%m%d').strftime('%Y-%m-%d')
        # corp_name 과 rcept_no 를 바로 병합
        parsed.update({
            '공시회사': rec['corp_name'],
            '공시일': formatted_date,
            '종목코드': rec['stock_code']
        })
        parsed_list.append(parsed)

    # 3) DataFrame 생성 & 컬럼 순서 지정
    final_df = pd.DataFrame(parsed_list, columns=[
        '공시회사', '공시일', '종목코드',
        '투자구분','투자금액(원)','자기자본(원)','자기자본대비(%)',
        '최근사업연도말 자산총액(원)','자산총액대비(%)',
        '결정일','시작일','종료일'
    ])
    
    final_df

Parsing contracts: 100%|██████████| 3/3 [00:00<00:00, 45.88it/s]


In [112]:
final_df

Unnamed: 0,공시회사,공시일,종목코드,투자구분,투자금액(원),자기자본(원),자기자본대비(%),최근사업연도말 자산총액(원),자산총액대비(%),결정일,시작일,종료일
0,아하,2025-06-20,102950,해외(우즈베키스탄) 신규 생산공장 설립,27592000000,39907985862,69.14,109299100000.0,25.24,2025-06-20,2025-06-20,2027-02-28
1,오리온,2025-06-20,271560,오리온 진천공장 건립,228000000000,3574045867619,6.38,,,2025-06-20,2025-08-01,2027-06-30
2,한미반도체,2025-06-20,42700,한미반도체 7공장,28480000000,540887863389,5.26,,,2025-06-20,2025-08-01,2026-11-30


In [105]:
import requests
import pandas as pd
from bs4 import BeautifulSoup

def fetch_history(code: str, page: int = 1) -> pd.DataFrame:
    """
    네이버 금융에서 지정한 종목코드(code)의 일별 시세 페이지(page)를 긁어와
    ['date','close'] 컬럼 DataFrame으로 반환.
    """
    url = f"https://finance.naver.com/item/sise_day.naver?code={code}&page={page}"
    resp = requests.get(url, headers={"User-Agent": "Mozilla/5.0"})
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, 'html.parser')

    table = soup.select_one("table.type2")
    rows = table.find_all("tr")

    records = []
    for row in rows:
        cols = row.find_all("td")
        if len(cols) < 7:
            continue  # 데이터가 없는 빈 행 skip
        date_txt  = cols[0].get_text(strip=True)  # '2025.06.20'
        close_txt = cols[1].get_text(strip=True).replace(",", "")  # '12345'
        if not date_txt or not close_txt:
            continue

        records.append({
            "date": pd.to_datetime(date_txt, format="%Y.%m.%d").date(),
            "close": int(close_txt)
        })

    return pd.DataFrame(records)

# 사용 예시
df1 = fetch_history("102950", page=1)
# print(df1.head())

In [106]:
df1

Unnamed: 0,date,close
0,2025-06-24,2265
1,2025-06-23,2270
2,2025-06-20,2340
3,2025-06-19,2210
4,2025-06-18,2295
5,2025-06-17,2335
6,2025-06-16,2405
7,2025-06-13,2375
8,2025-06-12,2435
9,2025-06-11,2450


In [113]:
import requests
import pandas as pd
from bs4 import BeautifulSoup

# 1) page 단위로 일별 시세를 가져오는 함수
def fetch_history(code: str, page: int = 1) -> pd.DataFrame:
    url = f"https://finance.naver.com/item/sise_day.naver?code={code}&page={page}"
    resp = requests.get(url, headers={"User-Agent":"Mozilla/5.0"})
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, 'html.parser')
    table = soup.select_one("table.type2")
    records = []
    for row in table.find_all("tr"):
        cols = row.find_all("td")
        if len(cols) < 7: 
            continue
        date_txt  = cols[0].get_text(strip=True)
        close_txt = cols[1].get_text(strip=True).replace(",", "")
        if date_txt and close_txt:
            records.append({
                "date": pd.to_datetime(date_txt, format="%Y.%m.%d").date(),
                "close": int(close_txt)
            })
    return pd.DataFrame(records)

# 2) target_date 전일·당일·익일 종가를 뽑아주는 함수
def get_prev_curr_next(df: pd.DataFrame, target_date: datetime.date) -> dict:
    df2 = df.sort_values("date").reset_index(drop=True)
    idx = df2.index[df2["date"] == target_date]
    if idx.empty:
        return {"prev_close": None, "curr_close": None, "next_close": None}
    i = idx[0]
    return {
        "전일종가": df2.loc[i-1, "close"] if i-1 >= 0 else None,
        "당일종가": df2.loc[i,   "close"],
        "익일종가": df2.loc[i+1, "close"] if i+1 < len(df2) else None
    }

# 3) final_df에 컬럼 추가
price_cols = []
for rec in tqdm(final_df.to_dict(orient="records"), desc="Fetching prices"):
    code = rec["종목코드"]
    target = datetime.strptime(rec["공시일"], "%Y-%m-%d").date()
    # 충분한 과거·미래 데이터를 보려면 여러 페이지 읽기
    hist = fetch_history(code, page=1)
    # 필요에 따라 page=2,3... 도 추가로 concat 하셔도 됩니다.

    p = get_prev_curr_next(hist, target)
    price_cols.append(p)

# 4) 만들어진 리스트를 DataFrame으로 합치기
price_df = pd.DataFrame(price_cols)
final_df = pd.concat([final_df, price_df], axis=1)

# 이제 final_df에 '전일종가','당일종가','익일종가'가 붙어 있습니다.
# print(final_df.head())

Fetching prices: 100%|██████████| 3/3 [00:00<00:00, 34.30it/s]


In [114]:
final_df

Unnamed: 0,공시회사,공시일,종목코드,투자구분,투자금액(원),자기자본(원),자기자본대비(%),최근사업연도말 자산총액(원),자산총액대비(%),결정일,시작일,종료일,전일종가,당일종가,익일종가
0,아하,2025-06-20,102950,해외(우즈베키스탄) 신규 생산공장 설립,27592000000,39907985862,69.14,109299100000.0,25.24,2025-06-20,2025-06-20,2027-02-28,2210,2340,2270
1,오리온,2025-06-20,271560,오리온 진천공장 건립,228000000000,3574045867619,6.38,,,2025-06-20,2025-08-01,2027-06-30,107000,107200,105500
2,한미반도체,2025-06-20,42700,한미반도체 7공장,28480000000,540887863389,5.26,,,2025-06-20,2025-08-01,2026-11-30,92500,93000,90400


## 최최종

In [None]:
import os
import io
import re
from datetime import datetime
from zipfile import ZipFile, BadZipFile
from openpyxl import load_workbook
from openpyxl.styles import Alignment, Font
from pandas.api.types import is_integer_dtype, is_float_dtype

import requests
import pandas as pd
from bs4 import BeautifulSoup, element
from dotenv import load_dotenv
from tqdm import tqdm

load_dotenv()
API_KEY = os.getenv("DART_API_KEY")


def fetch_sales(session) -> pd.DataFrame:
    """DART에서 '신규시설' 공시 목록을 가져와 DataFrame으로 반환."""
    url = 'https://opendart.fss.or.kr/api/list.json'
    base_params = {
        'crtfc_key': API_KEY,
        'bgn_de': 20250601,
        'end_de': 20250619,
        'pblntf_detail_ty': 'I001',
        'page_count': 100,
        'last_reprt_at': 'Y',
    }

    all_reports = []
    resp = session.get(url, params={**base_params, 'page_no': 1}, timeout=10)
    resp.raise_for_status()
    data = resp.json()
    total_page = int(data.get('total_page', 1))
    all_reports.extend(data.get('list', []))

    for page in range(2, total_page + 1):
        resp = session.get(url, params={**base_params, 'page_no': page}, timeout=10)
        resp.raise_for_status()
        reports = resp.json().get('list', [])
        if not reports:
            break
        all_reports.extend(reports)

    df = pd.DataFrame(all_reports)
    if df.empty or 'report_nm' not in df.columns:
        return pd.DataFrame()
    
    return df[
        df['report_nm'].str.contains('신규시설', na=False) &
        ~df['report_nm'].str.contains('자회사')
    ].reset_index(drop=True)


def parse_investment_with_helpers(html) -> dict:
    """공시 HTML 테이블에서 투자정보 8개 필드를 추출."""
    if isinstance(html, element.Tag):
        table = html
    elif isinstance(html, str):
        soup = BeautifulSoup(html, 'html.parser')
        table = soup.find(id="XFormD1_Form0_Table0")
        if table is None:
            raise ValueError("ID 'XFormD1_Form0_Table0'인 <table>을 찾을 수 없습니다.")
    else:
        raise TypeError("html은 str 또는 bs4.element.Tag 이어야 합니다.")

    def get_val(keys):
        sorted_keys = sorted(keys, key=len, reverse=True)
        for tr in table.find_all('tr'):
            tds = tr.find_all('td')
            if len(tds) == 2:
                label = re.sub(r'\s+', '', tds[0].get_text())
                val   = tds[1].get_text(strip=True)
            elif len(tds) >= 3:
                label = re.sub(r'\s+', '', tds[0].get_text() + tds[1].get_text())
                val   = tds[-1].get_text(strip=True)
            else:
                continue
            for k in sorted_keys:
                if k.replace('ㆍ','') in label or k in label:
                    return val
        return None

    def get_int(keys):
        raw = get_val(keys)
        return None if not raw or raw=='-' else int(raw.replace(',',''))

    def get_float(keys):
        raw = get_val(keys)
        return None if not raw or raw=='-' else float(raw.replace(',',''))

    def get_date(keys):
        raw = get_val(keys)
        if not raw:
            return None
        try:
            return datetime.strptime(raw, '%Y-%m-%d').date()
        except ValueError:
            return raw

    return {
        '투자구분':                  get_val(['투자대상']) or get_val(['투자구분']),
        '투자금액(백만원)':              get_int(['투자금액(원)','투자금액'])/1_000_000,
        '자기자본(백만원)':              get_int(['자기자본(원)','자기자본'])/1_000_000,
        '자기자본대비(%)':           get_float(['자기자본대비(%)']),
        # '최근사업연도말 자산총액(원)': get_int(['최근사업연도말자산총액(원)']),
        # '자산총액대비(%)':           get_float(['자산총액대비(%)']),
        '결정일':                    get_date(['이사회결의일(결정일)','결정일']),
        '시작일':                    get_date(['시작일']),
        '종료일':                    get_date(['종료일']),
    }


def parse_contract(session, rcept_no: str) -> dict:
    """단일 rcept_no에 대해 DART 문서를 내려받아 투자 테이블을 파싱."""
    resp = session.get(
        'https://opendart.fss.or.kr/api/document.xml',
        params={'crtfc_key': API_KEY, 'rcept_no': rcept_no},
        timeout=10
    )
    resp.raise_for_status()
    try:
        z = ZipFile(io.BytesIO(resp.content))
    except BadZipFile:
        return {}
    fname = next(f for f in z.namelist() if f.lower().endswith(('.xml','.html')))
    html = z.read(fname).decode('utf-8', errors='ignore')
    soup = BeautifulSoup(html, 'lxml')

    for tbl in soup.find_all('table'):
        if '투자구분' in tbl.get_text():
            return parse_investment_with_helpers(tbl)
    return {}


# ── 아래부터 신규 기능 통합 ──

def fetch_history(code: str, page: int = 1) -> pd.DataFrame:
    """네이버 금융에서 종목코드의 일별 시세표(page)를 반환."""
    url = f"https://finance.naver.com/item/sise_day.naver?code={code}&page={page}"
    resp = requests.get(url, headers={"User-Agent":"Mozilla/5.0"})
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, 'html.parser')
    table = soup.select_one("table.type2")
    records = []
    for row in table.find_all("tr"):
        cols = row.find_all("td")
        if len(cols) < 7:
            continue
        date_txt  = cols[0].get_text(strip=True)
        close_txt = cols[1].get_text(strip=True).replace(",","")
        if date_txt and close_txt:
            records.append({
                "date": pd.to_datetime(date_txt, format="%Y.%m.%d").date(),
                "close": int(close_txt)
            })
    return pd.DataFrame(records)


def get_prev_curr_next(df: pd.DataFrame, target_date: datetime.date) -> dict:
    """DataFrame에서 target_date 전일·당일·익일 종가를 반환."""
    df2 = df.sort_values("date").reset_index(drop=True)
    idx = df2.index[df2["date"] == target_date]
    if idx.empty:
        return {"전일종가":None, "당일종가":None, "익일종가":None}
    i = idx[0]
    return {
        "전일종가": df2.loc[i-1, "close"] if i-1>=0 else None,
        "당일종가": df2.loc[i,   "close"],
        "익일종가": df2.loc[i+1, "close"] if i+1<len(df2) else None
    }


# if __name__ == '__main__':
#     session = requests.Session()

#     # 1) 검색: '신규시설' 공시 목록
#     sales_df = fetch_sales(session)

#     # 2) 순회 + 파싱 + 시세 크롤링
#     parsed_list = []
#     for rec in tqdm(
#         sales_df[['corp_name','rcept_no','rcept_dt','stock_code']]
#                 .to_dict(orient='records'),
#         desc="Processing"
#     ):
#         parsed = parse_contract(session, str(rec['rcept_no'])) or {}

#         # 공시회사/공시일/종목코드
#         parsed.update({
#             '공시회사': rec['corp_name'],
#             '공시일':   datetime.strptime(str(rec['rcept_dt']), '%Y%m%d').strftime('%Y-%m-%d'),
#             '종목코드': rec['stock_code']
#         })

#         # 3) 해당 종목 시세 크롤링 & 전/당/익일 종가 가져오기
#         hist = fetch_history(rec['stock_code'], page=1)
#         prices = get_prev_curr_next(hist, datetime.strptime(parsed['공시일'], '%Y-%m-%d').date())
#         parsed.update(prices)

#         parsed_list.append(parsed)

#     # 4) 최종 DataFrame 생성
#     final_df = pd.DataFrame(parsed_list, columns=[
#         '공시회사','공시일','종목코드',
#         '투자구분','투자금액(백만원)','자기자본(백만원)','자기자본대비(%)',
#         '결정일','시작일','종료일',
#         '전일종가','당일종가','익일종가'
#     ])

if __name__ == '__main__':
    session = requests.Session()

    # 1) 검색: '신규시설' 공시 목록
    sales_df = fetch_sales(session)

    # 2) 순회 + 파싱 + 시세 크롤링
    parsed_list = []
    for rec in tqdm(
        sales_df[['corp_name','rcept_no','rcept_dt','stock_code']]
                .to_dict(orient='records'),
        desc="Processing"
    ):
        parsed = parse_contract(session, str(rec['rcept_no'])) or {}

        # 공시회사/공시일/종목코드
        parsed.update({
            '공시회사': rec['corp_name'],
            '공시일':   datetime.strptime(str(rec['rcept_dt']), '%Y%m%d').strftime('%Y-%m-%d'),
            '종목코드': rec['stock_code']
        })

        # 3) 해당 종목 시세 크롤링 & 전/당/익일 종가 가져오기
        hist   = fetch_history(rec['stock_code'], page=1)
        prices = get_prev_curr_next(hist, datetime.strptime(parsed['공시일'], '%Y-%m-%d').date())
        parsed.update(prices)

        parsed_list.append(parsed)

    # 4) 최종 DataFrame 생성
    final_df = pd.DataFrame(parsed_list, columns=[
        '공시회사','공시일','종목코드',
        '투자구분','투자금액(백만원)','자기자본(백만원)','자기자본대비(%)',
        '결정일','시작일','종료일',
        '전일종가','당일종가','익일종가'
    ])

from openpyxl import Workbook, load_workbook
from openpyxl.styles import Alignment, Font
from pandas.api.types import is_integer_dtype, is_float_dtype

def update_excel(result_df: pd.DataFrame, excel_path: str, sheet_name: str = 'main'):
    # 1) 워크북 준비
    if os.path.exists(excel_path):
        try:
            wb = load_workbook(excel_path)
        except BadZipFile:
            wb = Workbook()
    else:
        wb = Workbook()

    # 2) 시트 준비
    if sheet_name in wb.sheetnames:
        ws = wb[sheet_name]
        existing = pd.read_excel(
            excel_path,
            sheet_name=sheet_name,
            parse_dates=['공시일','시작일','종료일'],
            usecols=list(result_df.columns)
        )
    else:
        ws = wb.create_sheet(sheet_name)
        ws.append(list(result_df.columns))  # 헤더
        existing = pd.DataFrame(columns=result_df.columns)

    # 3) 중복 제거
    def filter_new(df_new, df_old):
        if df_old.empty:
            return df_new
        mask = ~df_new.apply(
            lambda r: ((df_old['공시회사']==r['공시회사']) &
                       (df_old['공시일']==r['공시일'])).any(),
            axis=1
        )
        return df_new[mask]

    new_rows = filter_new(result_df, existing)

    # 4) 스타일링용 준비
    header = [cell.value for cell in ws[1]]
    num_cols = [c for c in result_df.columns
                if is_integer_dtype(result_df[c]) or is_float_dtype(result_df[c])]
    col_map = {c: header.index(c)+1 for c in num_cols if c in header}
    alignment = Alignment(horizontal='center', vertical='center')
    font      = Font(size=10)

    # 5) 새 행 append 및 스타일 적용
    for _, r in new_rows.iterrows():
        ws.append([r.get(c, '') for c in result_df.columns])
        row_idx = ws.max_row
        for col_idx, col_name in enumerate(header, start=1):
            cell = ws.cell(row=row_idx, column=col_idx)
            cell.alignment = alignment
            cell.font      = font

            if col_name in ['공시일','시작일','종료일'] and pd.notna(r.get(col_name)):
                cell.number_format = 'yyyy-mm-dd'
            elif col_name in col_map:
                if is_float_dtype(result_df[col_name]):
                    cell.number_format = '#,##0'
                else:
                    cell.number_format = '#,##0'

    # 6) 저장
    wb.save(excel_path)

Processing: 100%|██████████| 12/12 [00:00<00:00, 17.56it/s]


In [143]:
if __name__ == '__main__':
    session = requests.Session()

    # 1) 검색
    sales_df = fetch_sales(session)

    # 2) 파싱 + 시세 크롤링
    parsed_list = []
    for rec in tqdm(
        sales_df[['corp_name','rcept_no','rcept_dt','stock_code']].to_dict(orient='records'),
        desc="Processing"
    ):
        parsed = parse_contract(session, str(rec['rcept_no'])) or {}
        parsed.update({
            '공시회사': rec['corp_name'],
            '공시일':   datetime.strptime(str(rec['rcept_dt']), '%Y%m%d').strftime('%Y-%m-%d'),
            '종목코드': rec['stock_code']
        })
        hist   = fetch_history(rec['stock_code'], page=1)
        prices = get_prev_curr_next(hist, datetime.strptime(parsed['공시일'], '%Y-%m-%d').date())
        parsed.update(prices)
        parsed_list.append(parsed)

    # 3) DataFrame 생성
    final_df = pd.DataFrame(parsed_list, columns=[
        '공시회사','공시일','종목코드',
        '투자구분','투자금액(백만원)','자기자본(백만원)','자기자본대비(%)',
        '결정일','시작일','종료일',
        '전일종가','당일종가','익일종가'
    ])

    # 4) 업데이트 함수로 엑셀에 append
    excel_path = "신규시설_투자내역.xlsx"
    update_excel(final_df, excel_path, sheet_name="Sheet1")
    print(f"업데이트 완료: '{excel_path}'의 'main' 시트에 새 데이터가 추가되었습니다.")

Processing: 100%|██████████| 12/12 [00:00<00:00, 17.94it/s]

업데이트 완료: '신규시설_투자내역.xlsx'의 'main' 시트에 새 데이터가 추가되었습니다.





In [132]:
final_df

Unnamed: 0,공시회사,공시일,종목코드,투자구분,투자금액(백만원),자기자본(백만원),자기자본대비(%),결정일,시작일,종료일,전일종가,당일종가,익일종가
0,큐라티스,2025-06-19,348080,신규시설투자,6891.0,30465.11,22.62,2025-06-19,2025-06-19,2026-02-28,1048.0,1028.0,975.0
1,진에어,2025-06-17,272450,항공기 운항훈련장비,22646.51,52200.0,43.4,2025-06-17,2025-06-17,2026-10-31,8900.0,9000.0,9010.0
2,LG디스플레이,2025-06-17,34220,OLED 생산 시설,1260000.0,8072807.0,15.6,2025-06-17,2025-06-17,2027-06-30,8790.0,8940.0,9210.0
3,제이에스링크,2025-06-16,127120,제이에스링크 예산공장 공장동 대수선공사,4200.0,36618.64,11.47,2025-02-28,2025-03-01,2025-08-31,9490.0,9540.0,9540.0
4,뉴보텍,2025-06-13,60260,신규 시설투자 등(제설제사업부 공장증설 및 이전),1535.5,20950.99,7.33,2025-06-13,2025-06-16,2026-09-30,1869.0,1759.0,1799.0
5,에스켐,2025-06-13,475660,신규 시설투자 등,4000.0,38247.31,10.46,2025-06-13,2025-06-13,2025-12-31,5180.0,5200.0,5220.0
6,진바이오텍,2025-06-10,86060,신규시설투자,3290.0,48304.75,6.81,2024-10-08,2024-10-10,2025-06-10,,,
7,와이엠티,2025-06-10,251370,신규시설투자,58608.0,151365.7,38.72,2023-04-18,2023-06-20,2025-06-30,,,
8,SK케미칼,2025-06-09,285130,Multi-Utility 사업투자,672600.0,958932.8,70.1,2021-09-13,2022-05-01,2025-06-30,,,
9,티에스아이,2025-06-09,277880,신규시설(공장신축)투자,17600.0,97410.02,18.1,2024-04-08,2024-04-08,2025-08-30,,,


## 최최최종

In [1]:
import os
import io
import re
from datetime import datetime
from zipfile import ZipFile, BadZipFile

import requests
import pandas as pd
from bs4 import BeautifulSoup, element
from dotenv import load_dotenv
from tqdm import tqdm
from openpyxl import Workbook, load_workbook
from openpyxl.styles import Alignment, Font
from pandas.api.types import is_integer_dtype, is_float_dtype

load_dotenv()
API_KEY = os.getenv("DART_API_KEY")


def fetch_sales(session) -> pd.DataFrame:
    """DART에서 '신규시설' 공시 목록을 가져와 DataFrame으로 반환."""
    url = 'https://opendart.fss.or.kr/api/list.json'
    params = {
        'crtfc_key': API_KEY,
        'bgn_de': 20250620,
        'end_de': 20250625,
        'pblntf_detail_ty': 'I001',
        'page_count': 100,
        'last_reprt_at': 'Y',
    }
    all_reports = []
    resp = session.get(url, params={**params, 'page_no': 1}, timeout=10)
    resp.raise_for_status()
    data = resp.json()
    total_page = int(data.get('total_page', 1))
    all_reports.extend(data.get('list', []))
    for page in range(2, total_page + 1):
        resp = session.get(url, params={**params, 'page_no': page}, timeout=10)
        resp.raise_for_status()
        reports = resp.json().get('list', [])
        if not reports:
            break
        all_reports.extend(reports)
    df = pd.DataFrame(all_reports)
    if df.empty or 'report_nm' not in df.columns:
        return pd.DataFrame()
    return df[ df['report_nm'].str.contains('신규시설', na=False) & ~df['report_nm'].str.contains('자회사') ].reset_index(drop=True)


def parse_investment_with_helpers(html) -> dict:
    """공시 HTML 테이블에서 투자정보 필드를 추출."""
    table = html if isinstance(html, element.Tag) else BeautifulSoup(html, 'html.parser').find(id="XFormD1_Form0_Table0")
    if table is None:
        raise ValueError("투자 테이블을 찾을 수 없습니다.")
    def get_val(keys):
        keys = sorted(keys, key=len, reverse=True)
        for tr in table.find_all('tr'):
            tds = tr.find_all('td')
            if len(tds) < 2:
                continue
            raw_label = tds[0].get_text() + (tds[1].get_text() if len(tds) >= 3 else '')
            label = re.sub(r'\s+', '', raw_label)
            val = tds[-1].get_text(strip=True)
            for k in keys:
                if k.replace('ㆍ','') in label or k in label:
                    return val
        return None
    def get_int(keys):
        v = get_val(keys)
        return None if not v or v=='-' else int(v.replace(',',''))
    def get_float(keys):
        v = get_val(keys)
        return None if not v or v=='-' else float(v.replace(',',''))
    def get_date(keys):
        v = get_val(keys)
        if not v:
            return None
        try:
            return datetime.strptime(v, '%Y-%m-%d').date()
        except ValueError:
            return None
    return {
        '투자구분':    get_val(['투자대상']) or get_val(['투자구분']),
        '투자금액(백만원)': get_int(['투자금액(원)','투자금액'])/1_000_000,
        '자기자본(백만원)': get_int(['자기자본(원)','자기자본'])/1_000_000,
        '자기자본대비(%)': get_float(['자기자본대비(%)']),
        '결정일':      get_date(['이사회결의일(결정일)','결정일']),
        '시작일':      get_date(['시작일']),
        '종료일':      get_date(['종료일']),
    }


def parse_contract(session, rcept_no: str) -> dict:
    resp = session.get(
        'https://opendart.fss.or.kr/api/document.xml',
        params={'crtfc_key': API_KEY, 'rcept_no': rcept_no},
        timeout=10
    )
    resp.raise_for_status()
    try:
        z = ZipFile(io.BytesIO(resp.content))
    except BadZipFile:
        return {}
    fname = next(f for f in z.namelist() if f.lower().endswith(('.xml','.html')))
    html = z.read(fname).decode('utf-8', errors='ignore')
    soup = BeautifulSoup(html, 'lxml')
    for tbl in soup.find_all('table'):
        if '투자구분' in tbl.get_text():
            return parse_investment_with_helpers(tbl)
    return {}


def fetch_history(code: str, page: int =1) -> pd.DataFrame:
    url = f"https://finance.naver.com/item/sise_day.naver?code={code}&page={page}"
    resp = requests.get(url, headers={"User-Agent":"Mozilla/5.0"})
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text,'html.parser')
    table = soup.select_one('table.type2')
    recs=[]
    for row in table.find_all('tr'):
        cols=row.find_all('td')
        if len(cols)<7: continue
        date_txt=cols[0].get_text(strip=True)
        close_txt=cols[1].get_text(strip=True).replace(',','')
        if date_txt and close_txt:
            recs.append({'date':pd.to_datetime(date_txt,'%Y.%m.%d').date(),'close':int(close_txt)})
    return pd.DataFrame(recs)


def get_prev_curr_next(df: pd.DataFrame, target_date: datetime.date) -> dict:
    df2=df.sort_values('date').reset_index(drop=True)
    idx=df2.index[df2['date']==target_date]
    if idx.empty:
        return {'전일종가':None,'당일종가':None,'익일종가':None}
    i=idx[0]
    return {
        '전일종가':df2.loc[i-1,'close'] if i-1>=0 else None,
        '당일종가':df2.loc[i  ,'close'],
        '익일종가':df2.loc[i+1,'close'] if i+1<len(df2) else None
    }


def update_excel(result_df: pd.DataFrame, excel_path: str, sheet_name: str='Sheet1'):
    # 워크북 준비
    if os.path.exists(excel_path):
        try:
            wb=load_workbook(excel_path)
        except BadZipFile:
            wb=Workbook()
    else:
        wb=Workbook()
    # 시트 준비
    if sheet_name in wb.sheetnames:
        ws=wb[sheet_name]
        existing=pd.read_excel(excel_path,sheet_name=sheet_name,parse_dates=['공시일','시작일','종료일'],usecols=list(result_df.columns))
    else:
        ws=wb.create_sheet(sheet_name)
        ws.append(list(result_df.columns))
        existing=pd.DataFrame(columns=result_df.columns)
    # 중복 제거
    if existing.empty:
        new_rows=result_df
    else:
        mask=~result_df.apply(lambda r:((existing['공시회사']==r['공시회사'])&(existing['공시일']==r['공시일'])).any(),axis=1)
        new_rows=result_df[mask]
    # 스타일 준비
    header=[c.value for c in ws[1]]
    num_cols=[c for c in result_df.columns if is_integer_dtype(result_df[c]) or is_float_dtype(result_df[c])]
    col_map={c:header.index(c)+1 for c in num_cols if c in header}
    align=Alignment(horizontal='center',vertical='center')
    font=Font(size=10)
    # append 및 스타일
    for _,r in new_rows.iterrows():
        ws.append([r.get(c,'') for c in result_df.columns])
        row=ws.max_row
        for ci,cname in enumerate(header,1):
            cell=ws.cell(row=row,column=ci)
            cell.alignment=align
            cell.font=font
            if cname in ['공시일','시작일','종료일'] and pd.notna(r.get(cname)):
                cell.number_format='yyyy-mm-dd'
            elif cname in col_map:
                cell.number_format='#,##0' if is_integer_dtype(result_df[cname]) else '#,##0'
    wb.save(excel_path)


if __name__=='__main__':
    sess=requests.Session()
    sales=fetch_sales(sess)
    parsed=[]
    for rec in tqdm(sales[['corp_name','rcept_no','rcept_dt','stock_code']].to_dict('records')):
        d=parse_contract(sess,str(rec['rcept_no'])) or {}
        d.update({
            '공시회사':rec['corp_name'], '공시일':datetime.strptime(str(rec['rcept_dt']),'%Y%m%d').strftime('%Y-%m-%d'),
            '종목코드':rec['stock_code']})
        h=fetch_history(rec['stock_code'])
        d.update(get_prev_curr_next(h,datetime.strptime(d['공시일'],'%Y-%m-%d').date()))
        parsed.append(d)
    final_df=pd.DataFrame(parsed,columns=[
        '공시회사','공시일','종목코드','투자구분','투자금액(백만원)','자기자본(백만원)','자기자본대비(%)',
        '결정일','시작일','종료일','전일종가','당일종가','익일종가'
    ])
    update_excel(final_df,'신규시설_투자내역.xlsx')


  0%|          | 0/7 [00:00<?, ?it/s]


AssertionError: 

In [2]:
import os
import io
import re
from datetime import datetime
from zipfile import ZipFile, BadZipFile

import requests
import pandas as pd
from bs4 import BeautifulSoup, element
from dotenv import load_dotenv
from tqdm import tqdm
from openpyxl import Workbook, load_workbook
from openpyxl.styles import Alignment, Font
from pandas.api.types import is_integer_dtype, is_float_dtype

load_dotenv()
API_KEY = os.getenv("DART_API_KEY")


def fetch_sales(session) -> pd.DataFrame:
    """DART에서 '신규시설' 공시 목록을 가져와 DataFrame으로 반환."""
    url = 'https://opendart.fss.or.kr/api/list.json'
    params = {
        'crtfc_key': API_KEY,
        'bgn_de': 20250620,
        'end_de': 20250625,
        'pblntf_detail_ty': 'I001',
        'page_count': 100,
        'last_reprt_at': 'Y',
    }
    all_reports = []
    resp = session.get(url, params={**params, 'page_no': 1}, timeout=10)
    resp.raise_for_status()
    data = resp.json()
    total_page = int(data.get('total_page', 1))
    all_reports.extend(data.get('list', []))
    for page in range(2, total_page + 1):
        resp = session.get(url, params={**params, 'page_no': page}, timeout=10)
        resp.raise_for_status()
        reports = resp.json().get('list', [])
        if not reports:
            break
        all_reports.extend(reports)
    df = pd.DataFrame(all_reports)
    if df.empty or 'report_nm' not in df.columns:
        return pd.DataFrame()
    return df[ df['report_nm'].str.contains('신규시설', na=False) & ~df['report_nm'].str.contains('자회사') & ~df['report_nm'].str.contains('철회')].reset_index(drop=True)


def parse_investment_with_helpers(html) -> dict:
    """공시 HTML 테이블에서 투자정보 필드를 추출."""
    table = html if isinstance(html, element.Tag) else BeautifulSoup(html, 'html.parser').find(id="XFormD1_Form0_Table0")
    if table is None:
        raise ValueError("투자 테이블을 찾을 수 없습니다.")
    def get_val(keys):
        keys = sorted(keys, key=len, reverse=True)
        for tr in table.find_all('tr'):
            tds = tr.find_all('td')
            if len(tds) < 2:
                continue
            raw_label = tds[0].get_text() + (tds[1].get_text() if len(tds) >= 3 else '')
            label = re.sub(r'\s+', '', raw_label)
            val = tds[-1].get_text(strip=True)
            for k in keys:
                if k.replace('ㆍ','') in label or k in label:
                    return val
        return None
    def get_int(keys):
        v = get_val(keys)
        return None if not v or v=='-' else int(v.replace(',',''))
    def get_float(keys):
        v = get_val(keys)
        return None if not v or v=='-' else float(v.replace(',',''))
    def get_date(keys):
        v = get_val(keys)
        if not v:
            return None
        try:
            return datetime.strptime(v, '%Y-%m-%d').date()
        except ValueError:
            return None
    try:
        raw_amount = get_int(['투자금액(원)', '투자금액'])
        raw_equity = get_int(['자기자본(원)', '자기자본'])
    except ValueError:
        return None 
       
    return {
        '투자구분':    get_val(['투자대상']) or get_val(['투자구분']),
        '투자금액(백만원)': raw_amount / 1_000_000 if raw_amount is not None else None,
        '자기자본(백만원)': raw_equity / 1_000_000 if raw_equity is not None else None,
        '자기자본대비(%)': get_float(['자기자본대비(%)']),
        '결정일':      get_date(['이사회결의일(결정일)','결정일']),
        '시작일':      get_date(['시작일']),
        '종료일':      get_date(['종료일']),
    }


def parse_contract(session, rcept_no: str) -> dict:
    resp = session.get(
        'https://opendart.fss.or.kr/api/document.xml',
        params={'crtfc_key': API_KEY, 'rcept_no': rcept_no},
        timeout=10
    )
    resp.raise_for_status()
    try:
        z = ZipFile(io.BytesIO(resp.content))
    except BadZipFile:
        return {}
    fname = next(f for f in z.namelist() if f.lower().endswith(('.xml','.html')))
    html = z.read(fname).decode('utf-8', errors='ignore')
    soup = BeautifulSoup(html, 'lxml')
    for tbl in soup.find_all('table'):
        if '투자구분' in tbl.get_text():
            return parse_investment_with_helpers(tbl)
    return {}


def fetch_history(code: str, page: int =1) -> pd.DataFrame:
    """네이버 금융에서 종목코드의 일별 시세표(page)를 반환."""
    url = f"https://finance.naver.com/item/sise_day.naver?code={code}&page={page}"
    resp = requests.get(url, headers={"User-Agent":"Mozilla/5.0"})
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text,'html.parser')
    table = soup.select_one('table.type2')
    recs = []
    for row in table.find_all('tr'):
        cols = row.find_all('td')
        if len(cols) < 7:
            continue
        date_txt  = cols[0].get_text(strip=True)
        close_txt = cols[1].get_text(strip=True).replace(',','')
        if not date_txt or not close_txt:
            continue
        # 날짜 파싱 안정화: 오류 시 건너뜀
        try:
            dt = pd.to_datetime(date_txt, format="%Y.%m.%d", errors='raise').date()
        except Exception:
            continue
        try:
            cl = int(close_txt)
        except ValueError:
            continue
        recs.append({'date': dt, 'close': cl})
    return pd.DataFrame(recs)


def get_prev_curr_next(df: pd.DataFrame, target_date: datetime.date) -> dict:
    df2=df.sort_values('date').reset_index(drop=True)
    idx=df2.index[df2['date']==target_date]
    if idx.empty:
        return {'전일종가':None,'당일종가':None,'익일종가':None}
    i=idx[0]
    return {
        '전일종가':df2.loc[i-1,'close'] if i-1>=0 else None,
        '당일종가':df2.loc[i  ,'close'],
        '익일종가':df2.loc[i+1,'close'] if i+1<len(df2) else None
    }


# def update_excel(result_df: pd.DataFrame, excel_path: str, sheet_name: str='신규투자'):
#     # 워크북 준비
#     if os.path.exists(excel_path):
#         try:
#             wb=load_workbook(excel_path)
#         except BadZipFile:
#             wb=Workbook()
#     else:
#         wb=Workbook()
#     # 시트 준비
#     if sheet_name in wb.sheetnames:
#         ws=wb[sheet_name]
#         existing=pd.read_excel(excel_path,sheet_name=sheet_name,parse_dates=['공시일','시작일','종료일'],usecols=list(result_df.columns))
#     else:
#         ws=wb.create_sheet(sheet_name)
#         ws.append(list(result_df.columns))
#         existing=pd.DataFrame(columns=result_df.columns)
#     # 중복 제거
#     if existing.empty:
#         new_rows=result_df
#     else:
#         mask=~result_df.apply(lambda r:((existing['공시회사']==r['공시회사'])&(existing['공시일']==r['공시일'])).any(),axis=1)
#         new_rows=result_df[mask]
#     # 스타일 준비
#     header=[c.value for c in ws[1]]
#     num_cols=[c for c in result_df.columns if is_integer_dtype(result_df[c]) or is_float_dtype(result_df[c])]
#     col_map={c:header.index(c)+1 for c in num_cols if c in header}
#     align=Alignment(horizontal='center',vertical='center')
#     font=Font(size=10)
#     # append 및 스타일
#     for _,r in new_rows.iterrows():
#         ws.append([r.get(c,'') for c in result_df.columns])
#         row=ws.max_row
#         for ci,cname in enumerate(header,1):
#             cell=ws.cell(row=row,column=ci)
#             cell.alignment=align
#             cell.font=font
#             if cname in ['공시일','시작일','종료일'] and pd.notna(r.get(cname)):
#                 cell.number_format='yyyy-mm-dd'
#             elif cname in col_map:
#                 cell.number_format='#,##0' if is_integer_dtype(result_df[cname]) else '#,##0'
#     wb.save(excel_path)

def update_excel(result_df: pd.DataFrame, excel_path: str, sheet_name: str='신규투자'):
    # 워크북 준비
    if os.path.exists(excel_path):
        try:
            wb = load_workbook(excel_path)
        except BadZipFile:
            wb = Workbook()
    else:
        wb = Workbook()

    # 시트 준비
    if sheet_name in wb.sheetnames:
        ws = wb[sheet_name]
        existing = pd.read_excel(excel_path, sheet_name=sheet_name,
                                 parse_dates=['공시일','시작일','종료일'],
                                 usecols=list(result_df.columns))
    else:
        ws = wb.create_sheet(sheet_name)
        ws.append(list(result_df.columns))
        existing = pd.DataFrame(columns=result_df.columns)

    # 읽어온 헤더(컬럼 순서)
    header = [cell.value for cell in ws[1]]

    # 중복 제거
    if existing.empty:
        new_rows = result_df
    else:
        mask = ~result_df.apply(
            lambda r: ((existing['공시회사'] == r['공시회사']) &
                       (existing['공시일'] == r['공시일'])).any(),
            axis=1
        )
        new_rows = result_df[mask]

    # 스타일 준비
    num_cols = [c for c in result_df.columns
                if is_integer_dtype(result_df[c]) or is_float_dtype(result_df[c])]
    # header → 엑셀 col index map (1-based)
    col_index = {col: idx+1 for idx, col in enumerate(header)}
    align = Alignment(horizontal='center', vertical='center')
    font = Font(size=10)

    # append 및 스타일링
    for _, r in new_rows.iterrows():
        # header 순서에 맞춰 값 채우기
        row_vals = [r.get(col, '') for col in header]
        ws.append(row_vals)
        row_no = ws.max_row
        for col_name, idx in col_index.items():
            cell = ws.cell(row=row_no, column=idx)
            cell.alignment = align
            cell.font = font
            # 날짜 형식 적용
            if col_name in ['공시일','시작일','종료일'] and pd.notna(r.get(col_name)):
                cell.number_format = 'yyyy-mm-dd'
            elif col_name == '자기자본대비(%)' and pd.notna(r.get(col_name)):
                cell.number_format = '#,##0.00'
            # 숫자 형식 적용
            elif col_name in num_cols:
                cell.number_format = '#,##0'

    wb.save(excel_path)


if __name__=='__main__':
    sess=requests.Session()
    sales=fetch_sales(sess)
    parsed=[]
    for rec in tqdm(sales[['corp_name','rcept_no','rcept_dt','stock_code']].to_dict('records')):
        d=parse_contract(sess,str(rec['rcept_no'])) or {}
        d.update({
            '공시회사':rec['corp_name'], '공시일':datetime.strptime(str(rec['rcept_dt']),'%Y%m%d').strftime('%Y-%m-%d'),
            '종목코드':rec['stock_code']})
        h=fetch_history(rec['stock_code'])
        d.update(get_prev_curr_next(h,datetime.strptime(d['공시일'],'%Y-%m-%d').date()))
        parsed.append(d)
    final_df=pd.DataFrame(parsed,columns=[
        '공시회사','공시일','종목코드','투자구분','투자금액(백만원)','자기자본(백만원)','자기자본대비(%)',
        '결정일','시작일','종료일','전일종가','당일종가','익일종가'
    ]).sort_values(by='공시일')
    update_excel(final_df,'국내 주요 공시 정리.xlsx')


100%|██████████| 5/5 [00:00<00:00, 18.02it/s]


In [1]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time

def fetch_page(code: str, page: int):
    """
    Naver 금융 일별 시세 페이지에서
    [날짜, 종가, 등락률, 시가, 고가, 저가, 거래량] 리스트를 반환
    """
    url = f'https://finance.naver.com/item/sise_day.naver?code={code}&page={page}'
    resp = requests.get(url)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, 'lxml')
    rows = soup.select('table.type2 tr')
    data = []
    for row in rows[2:]:
        cols = row.find_all('td')
        if len(cols) != 7 or not cols[0].text.strip():
            continue
        # 날짜 포맷: '2025.05.13'
        date = cols[0].text.strip()
        close = cols[1].text.strip().replace(',', '')
        data.append([date, close])
    return data

def get_closes(code: str, announce_date: str, max_pages: int = 5, delay: float = 0.3):
    """
    code: 종목코드 (예: '457190')
    announce_date: 'YYYY-MM-DD' 형식 (예: '2025-05-13')
    max_pages: 탐색할 최대 페이지 수
    delay: 페이지별 요청 사이 대기 시간(초)
    
    반환: (prev_close, today_close, next_close)
        prev_close: 공시일 전일 종가 (없으면 None)
        today_close: 공시일 종가 (없으면 None)
        next_close: 공시일 익일 종가 (없으면 None)
    """
    target = announce_date.replace('-', '.')  # '2025.05.13'
    for page in range(1, max_pages + 1):
        data = fetch_page(code, page)
        # data: [[date, close], [date, close], ...] (최신 순)
        for idx, (dt, cl) in enumerate(data):
            if dt == target:
                today = cl
                prev_ = data[idx + 1][1] if idx + 1 < len(data) else None
                next_ = data[idx - 1][1] if idx - 1 >= 0 else None
                return prev_, today, next_
        time.sleep(delay)
    # 못 찾으면
    return None, None, None

# --------------------------------------------------
# (예시) 위 함수를 이용해 DataFrame으로 결과 모으기
# --------------------------------------------------

# 1) 회사 리스트 불러오기 (예: 엑셀 또는 CSV에서)
#    최소한 '종목코드', '공시일' 컬럼이 필요합니다.
#    여기서는 예시로 직접 딕셔너리 만들어서 DataFrame 생성
df_info = pd.DataFrame([
    {'종목코드':'317870', '공시일':'2025-05-13'},
    {'종목코드':'304360', '공시일':'2025-05-14'},
    {'종목코드':'230240', '공시일':'2025-05-20'},
    {'종목코드':'241710', '공시일':'2025-05-20'},
    {'종목코드':'357780', '공시일':'2025-05-20'},
    {'종목코드':'052400', '공시일':'2025-05-22'},
    {'종목코드':'051360', '공시일':'2025-05-26'},
    {'종목코드':'060370', '공시일':'2025-05-26'},
    {'종목코드':'086710', '공시일':'2025-05-28'},
    {'종목코드':'105760', '공시일':'2025-05-28'},
    {'종목코드':'457190', '공시일':'2025-05-28'},
    {'종목코드':'028670', '공시일':'2025-05-29'},
    {'종목코드':'357550', '공시일':'2025-05-30'},
    {'종목코드':'001570', '공시일':'2025-05-30'},
    {'종목코드':'285130', '공시일':'2025-06-09'},
    {'종목코드':'277880', '공시일':'2025-06-09'},
    {'종목코드':'036930', '공시일':'2025-06-09'},
    {'종목코드':'317330', '공시일':'2025-06-09'},
    {'종목코드':'086060', '공시일':'2025-06-10'},
    {'종목코드':'251370', '공시일':'2025-06-10'}
])

# 2) get_closes 호출 후 컬럼 추가
results = []
for _, row in df_info.iterrows():
    code = row['종목코드']
    adate = row['공시일']
    prev_c, today_c, next_c = get_closes(code, adate, max_pages=10)
    results.append({
        '종목코드': code,
        '공시일': adate,
        '전일종가': prev_c,
        '당일종가': today_c,
        '익일종가': next_c,
    })

df_prices = pd.DataFrame(results)
print(df_prices)

      종목코드         공시일  전일종가  당일종가  익일종가
0   317870  2025-05-13  None  None  None
1   304360  2025-05-14  None  None  None
2   230240  2025-05-20  None  None  None
3   241710  2025-05-20  None  None  None
4   357780  2025-05-20  None  None  None
5   052400  2025-05-22  None  None  None
6   051360  2025-05-26  None  None  None
7   060370  2025-05-26  None  None  None
8   086710  2025-05-28  None  None  None
9   105760  2025-05-28  None  None  None
10  457190  2025-05-28  None  None  None
11  028670  2025-05-29  None  None  None
12  357550  2025-05-30  None  None  None
13  001570  2025-05-30  None  None  None
14  285130  2025-06-09  None  None  None
15  277880  2025-06-09  None  None  None
16  036930  2025-06-09  None  None  None
17  317330  2025-06-09  None  None  None
18  086060  2025-06-10  None  None  None
19  251370  2025-06-10  None  None  None


In [None]:
import re
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time

HEADERS = {
    'User-Agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/115.0.0.0 Safari/537.36'
    )
}

def fetch_page(code: str, page: int):
    url = f'https://finance.naver.com/item/sise_day.naver?code={code}&page={page}'
    resp = requests.get(url, headers=HEADERS, timeout=10)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, 'lxml')

    table = soup.find('table', class_='type2')
    if not table:
        print(f'⚠️ Page {page}: table.type2 없음')
        return []

    data = []
    for row in table.find_all('tr'):
        cols = row.find_all('td')
        if len(cols) == 7:
            date = cols[0].get_text(strip=True)
            if re.match(r'^\d{4}\.\d{2}\.\d{2}$', date):
                close = cols[1].get_text(strip=True).replace(',', '')
                data.append((date, close))
    print(f'Page {page} 날짜 목록 → {[d for d,_ in data]}')
    return data

def get_closes(code: str, announce_date: str,
               max_pages: int = 5, delay: float = 0.2):
    target = announce_date.replace('-', '.')
    for page in range(1, max_pages+1):
        records = fetch_page(code, page)
        for idx, (dt, cl) in enumerate(records):
            if dt == target:
                prev_  = records[idx+1][1] if idx+1 < len(records) else None
                today_ = cl
                next_  = records[idx-1][1] if idx-1 >= 0 else None
                return prev_, today_, next_
        time.sleep(delay)
    print(f'⚠️ {code} {announce_date}을(를) {max_pages}페이지 내에서 못 찾음')
    return None, None, None

# 예시: df_info에서 결과 모으기
df_info = pd.DataFrame([
    {'종목코드':'317870', '공시일':'2025-05-13'},
    {'종목코드':'304360', '공시일':'2025-05-14'},
    {'종목코드':'230240', '공시일':'2025-05-20'},
    {'종목코드':'241710', '공시일':'2025-05-20'},
    {'종목코드':'357780', '공시일':'2025-05-20'},
    {'종목코드':'052400', '공시일':'2025-05-22'},
    {'종목코드':'051360', '공시일':'2025-05-26'},
    {'종목코드':'060370', '공시일':'2025-05-26'},
    {'종목코드':'086710', '공시일':'2025-05-28'},
    {'종목코드':'105760', '공시일':'2025-05-28'},
    {'종목코드':'457190', '공시일':'2025-05-28'},
    {'종목코드':'028670', '공시일':'2025-05-29'},
    {'종목코드':'357550', '공시일':'2025-05-30'},
    {'종목코드':'001570', '공시일':'2025-05-30'},
    {'종목코드':'285130', '공시일':'2025-06-09'},
    {'종목코드':'277880', '공시일':'2025-06-09'},
    {'종목코드':'036930', '공시일':'2025-06-09'},
    {'종목코드':'317330', '공시일':'2025-06-09'},
    {'종목코드':'086060', '공시일':'2025-06-10'},
    {'종목코드':'251370', '공시일':'2025-06-10'}
])

results = []
for _, r in df_info.iterrows():
    prev_c, today_c, next_c = get_closes(r['종목코드'], r['공시일'], max_pages=3)
    results.append({
        '종목코드': r['종목코드'],
        '공시일':   r['공시일'],
        '전일종가': prev_c,
        '당일종가': today_c,
        '익일종가': next_c,
    })

df_prices = pd.DataFrame(results)
print(df_prices)


Page 1 날짜 목록 → ['2025.06.25', '2025.06.24', '2025.06.23', '2025.06.20', '2025.06.19', '2025.06.18', '2025.06.17', '2025.06.16', '2025.06.13', '2025.06.12']
Page 2 날짜 목록 → ['2025.06.11', '2025.06.10', '2025.06.09', '2025.06.05', '2025.06.04', '2025.06.02', '2025.05.30', '2025.05.29', '2025.05.28', '2025.05.27']
Page 3 날짜 목록 → ['2025.05.26', '2025.05.23', '2025.05.22', '2025.05.21', '2025.05.20', '2025.05.19', '2025.05.16', '2025.05.15', '2025.05.14', '2025.05.13']
Page 1 날짜 목록 → ['2025.06.25', '2025.06.24', '2025.06.23', '2025.06.20', '2025.06.19', '2025.06.18', '2025.06.17', '2025.06.16', '2025.06.13', '2025.06.12']
Page 2 날짜 목록 → ['2025.06.11', '2025.06.10', '2025.06.09', '2025.06.05', '2025.06.04', '2025.06.02', '2025.05.30', '2025.05.29', '2025.05.28', '2025.05.27']
Page 3 날짜 목록 → ['2025.05.26', '2025.05.23', '2025.05.22', '2025.05.21', '2025.05.20', '2025.05.19', '2025.05.16', '2025.05.15', '2025.05.14', '2025.05.13']
Page 1 날짜 목록 → ['2025.06.25', '2025.06.24', '2025.06.23', '2025.

In [5]:
df_prices.to_excel('종가.xlsx')