In [None]:
# API KEY를 환경변수로 관리하기 위한 설정 파일
from dotenv import load_dotenv
import os
import requests
import pandas as pd

# API KEY 정보로드
load_dotenv()
DART_API_KEY = os.getenv("DART_API_KEY")

In [None]:
url_json = "https://opendart.fss.or.kr/api/list.json"
params = {
    "crtfc_key": DART_API_KEY,
    "corp_code": "00149655",
    "bgn_de": "20230601",
    "end_de": "20240630",
    # "pblntf_ty": "A",
    # "pblntf_detail_ty": "A001",
}

response = requests.get(url_json, params=params)
print(response)
data = response.json()
print(data)
data_list = data.get("list")
df_list = pd.DataFrame(data_list)
print(df_list)

In [None]:
df_list

In [None]:
# %pip install langchain_teddynote
# %pip install langchain_community
# %pip install langchain
# %pip install openai
# %pip install tiktoken
# %pip install langchain_anthropic
# %pip install transformers
# %pip install PyTorch
# %pip install TensorFlow

In [None]:
# LangSmith 추적을 설정합니다. https://smith.langchain.com
# !pip install -qU langchain-teddynote
from langchain_teddynote import logging

# 프로젝트 이름을 입력합니다.
logging.langsmith("Spoon")

In [None]:
import os
import requests
import zipfile
import io
from typing import List, Dict, Any
from lxml import etree
from langchain_community.document_loaders import BSHTMLLoader
import tempfile
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.prompts import PromptTemplate
from langchain.chains.llm import LLMChain
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
import pandas as pd
import logging

# 로깅 설정
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# 상수 정의
API_URL = "https://opendart.fss.or.kr/api/document.xml"
CHUNK_SIZE = 4000
CHUNK_OVERLAP = 0
LLM_MODEL = "gpt-4-0125-preview"
LLM_TEMPERATURE = 0


def fetch_document(api_key: str, rcp_no: str) -> bytes:
    """DART API를 통해 문서를 가져옵니다."""
    params = {"crtfc_key": api_key, "rcept_no": rcp_no}
    response = requests.get(API_URL, params=params)
    response.raise_for_status()
    return response.content


def extract_section(
    root: etree.Element, start_aassocnote: str, end_aassocnote: str
) -> str:
    """XML에서 특정 섹션을 추출합니다."""
    start_element = root.xpath(
        f"//TITLE[@ATOC='Y' and @AASSOCNOTE='{start_aassocnote}']"
    )[0]
    end_element = root.xpath(f"//TITLE[@ATOC='Y' and @AASSOCNOTE='{end_aassocnote}']")[
        0
    ]

    extracted_elements = []
    current_element = start_element
    while current_element is not None:
        extracted_elements.append(
            etree.tostring(current_element, encoding="unicode", with_tail=True)
        )
        if current_element == end_element:
            break
        current_element = current_element.getnext()

    return "".join(extracted_elements)


def extract_audit_report(zip_content: bytes, rcp_no: str) -> str:
    """ZIP 파일에서 감사보고서를 추출합니다."""
    try:
        with zipfile.ZipFile(io.BytesIO(zip_content)) as zf:
            audit_fnames = [
                info.filename
                for info in zf.infolist()
                if rcp_no in info.filename and info.filename.endswith(".xml")
            ]
            if not audit_fnames:
                raise ValueError("감사보고서 파일을 찾을 수 없습니다.")

            xml_data = zf.read(audit_fnames[0])
            parser = etree.XMLParser(recover=True, encoding="utf-8")
            root = etree.fromstring(xml_data, parser)

            return extract_section(root, "D-0-11-2-0", "D-0-11-3-0")

    except (zipfile.BadZipFile, etree.XMLSyntaxError, IndexError) as e:
        logging.error(f"감사보고서 추출 실패: {str(e)}")
        raise


def parse_html_from_xml(xml_data: str) -> etree.Element:
    """XML 데이터를 HTML로 파싱합니다."""
    parser = etree.HTMLParser()
    return etree.fromstring(f"<html><body>{xml_data}</body></html>", parser)


def load_html_with_langchain(html_string: str) -> List[Dict[str, Any]]:
    """HTML 문자열을 LangChain 문서로 로드합니다."""
    with tempfile.NamedTemporaryFile(
        mode="w", encoding="utf-8", suffix=".html", delete=False
    ) as temp_file:
        temp_file.write(html_string)
        temp_file_path = temp_file.name

    try:
        loader = BSHTMLLoader(temp_file_path, open_encoding="utf-8")
        return loader.load()
    finally:
        os.unlink(temp_file_path)


def tsv_string_to_dataframe(tsv_string: str) -> pd.DataFrame:
    """TSV 문자열을 pandas DataFrame으로 변환합니다."""
    return pd.read_csv(io.StringIO(tsv_string), sep="\t", encoding="utf-8")


def process_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """DataFrame의 데이터 타입을 적절히 변환합니다."""
    for col in df.columns:
        if "금액" in col or "건수" in col:
            df[col] = pd.to_numeric(df[col].replace("-", "0"), errors="coerce")
        elif "일" in col:
            df[col] = pd.to_datetime(df[col], errors="coerce")
    return df


def summarize_report(api_key: str, rcp_no: str) -> pd.DataFrame:
    """보고서를 요약하여 DataFrame으로 반환합니다."""
    try:
        zip_content = fetch_document(api_key, rcp_no)
        logging.info(f"API 응답 크기: {len(zip_content)} 바이트")

        extracted_content = extract_audit_report(zip_content, rcp_no)
        logging.info("XML 섹션 추출 완료")

        root = parse_html_from_xml(extracted_content)
        logging.info("HTML 파싱 완료")

        html_string = etree.tostring(
            root, pretty_print=True, method="html", encoding="unicode"
        )
        docs = load_html_with_langchain(html_string)
        logging.info(f"추출된 문서 수: {len(docs)}")

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP
        )
        split_docs = text_splitter.split_documents(docs)

        embeddings = OpenAIEmbeddings()
        vectorstore = FAISS.from_documents(documents=split_docs, embedding=embeddings)
        retriever = vectorstore.as_retriever()

        template = """이 파일에서 '해외채무보증' 또는 '채무보증내역' 제목 아래에 있는 표를 읽어 TSV(Tab-Separated Values) 형식으로 변환해주세요. 다음 지침을 따라주세요:

        1. 표의 두 겹 칼럼 구조를 단일 행 헤더로 변환하세요. 상위 칼럼과 하위 칼럼을 언더스코어(_)로 결합하여 새로운 칼럼 이름을 만드세요.
           예: '채무보증금액'의 하위 칼럼 '제59기말'은 '채무보증금액_제59기말'로 변환

        2. 결과 TSV의 헤더는 다음과 같은 형식이어야 합니다 (실제 칼럼 이름은 원본 표에 따라 다를 수 있음):
           성명    관계    채권자    보증건수    보증기간_시작일    보증기간_종료일    채무보증금액_제59기말    채무보증금액_증가    채무보증금액_감소    채무보증금액_제60기말    채무금액

        3. 데이터 행은 각 칼럼에 해당하는 값을 포함해야 합니다. 값이 없는 경우 빈 칸으로 두지 말고 '-'로 표시하세요.

        4. 숫자 데이터는 쉼표나 기타 구분자 없이 순수한 숫자로 표현하세요.

        5. 날짜는 'YYYY-MM-DD' 형식으로 통일하세요.

        6. TSV 데이터만 반환하세요. 추가 설명이나 주석은 포함하지 마세요.

        7. 결과에 따옴표(''')나 기타 구분자를 포함하지 말고, 순수한 TSV 데이터만 반환하세요.

        #Context:
        {context}

        #Answer:
        """
        prompt = PromptTemplate.from_template(template)
        llm = ChatOpenAI(model=LLM_MODEL, temperature=LLM_TEMPERATURE)
        chain = LLMChain(llm=llm, prompt=prompt)

        context = retriever.get_relevant_documents("")
        tsv_result = chain.run(context=context)

        df = tsv_string_to_dataframe(tsv_result)
        return process_dataframe(df)

    except Exception as e:
        logging.error(f"보고서 요약 중 오류 발생: {str(e)}")
        raise


if __name__ == "__main__":
    api_key = "b0f7f31f54a0f96561f361c405caa204e64c81a1"  # DART API 키
    rcp_no = "20240312000736"  # 문서 번호

    try:
        result_df = summarize_report(api_key, rcp_no)
        print(result_df.head())
        print(result_df.dtypes)
    except Exception as e:
        logging.error(f"프로그램 실행 중 오류 발생: {str(e)}")

In [None]:
result_df

In [None]:
import os
import requests
import zipfile
import io
from lxml import etree
import xml.etree.ElementTree as ET
from langchain_community.document_loaders import BSHTMLLoader
import tempfile
from bs4 import BeautifulSoup
import pandas as pd


def fetch_document(api_key, rcp_no):
    url = "https://opendart.fss.or.kr/api/document.xml"
    params = {"crtfc_key": api_key, "rcept_no": rcp_no}
    response = requests.get(url, params=params)
    if response.status_code != 200:
        raise Exception(f"API 요청 실패: 상태 코드 {response.status_code}")
    return response.content


def extract_section(root, start_aassocnote, end_aassocnote):
    start_element = root.xpath(
        f"//TITLE[@ATOC='Y' and @AASSOCNOTE='{start_aassocnote}']"
    )[0]
    end_element = root.xpath(f"//TITLE[@ATOC='Y' and @AASSOCNOTE='{end_aassocnote}']")[
        0
    ]

    extracted_elements = []
    current_element = start_element
    while current_element is not None:
        extracted_elements.append(
            etree.tostring(current_element, encoding="unicode", with_tail=True)
        )
        if current_element == end_element:
            break
        current_element = current_element.getnext()

    return "".join(extracted_elements)


def extract_audit_report(zip_content, rcp_no):
    try:
        with zipfile.ZipFile(io.BytesIO(zip_content)) as zf:
            print("ZIP 파일 내용:")
            for file_info in zf.infolist():
                print(file_info.filename)

            audit_fnames = [
                info.filename
                for info in zf.infolist()
                if rcp_no in info.filename and info.filename.endswith(".xml")
            ]
            if not audit_fnames:
                raise ValueError("감사보고서 파일을 찾을 수 없습니다.")
            xml_data = zf.read(audit_fnames[0])

            # XML 파싱
            parser = etree.XMLParser(recover=True, encoding="utf-8")
            root = etree.fromstring(xml_data, parser)

            # 세 부분 추출
            part1 = extract_section(root, "D-0-11-2-0", "D-0-11-3-0")

            # 세 부분 합치기
            extracted_xml = part1

            return extracted_xml

    except zipfile.BadZipFile:
        raise ValueError("ZIP 파일이 손상되었거나 유효하지 않습니다.")
    except etree.XMLSyntaxError as e:
        raise ValueError(f"XML 파싱 실패: {str(e)}")
    except IndexError:
        raise ValueError("필요한 TITLE 요소를 찾을 수 없습니다.")


def parse_html_from_xml(xml_data):
    parser = etree.HTMLParser()
    root = etree.fromstring(f"<html><body>{xml_data}</body></html>", parser)
    print(root)
    return root


def load_html_with_langchain(html_string):
    with tempfile.NamedTemporaryFile(
        mode="w", encoding="utf-8", suffix=".html", delete=False
    ) as temp_file:
        temp_file.write(html_string)
        temp_file_path = temp_file.name

    try:
        loader = BSHTMLLoader(temp_file_path, open_encoding="utf-8")
        documents = loader.load()
        return documents
    finally:
        import os

        os.unlink(temp_file_path)


def extract_specific_table(html_string, table_title):
    soup = BeautifulSoup(html_string, "html.parser")

    # "채무보증내역" 제목을 찾습니다
    title_element = soup.find(
        "p", string=lambda text: table_title in text if text else False
    )

    if title_element:
        # 제목 다음에 오는 첫 번째 테이블을 찾습니다
        table = title_element.find_next("table")

        if table:
            # 테이블을 DataFrame으로 변환합니다
            df = pd.read_html(str(table))[0]
            return df

    return None


def summarize_report(api_key, rcp_no):
    # 문서 가져오기
    zip_content = fetch_document(api_key, rcp_no)
    print("API 응답 크기:", len(zip_content), "바이트")

    # XML 데이터 추출 및 특정 섹션 파싱
    extracted_content = extract_audit_report(zip_content, rcp_no)
    print("XML 섹션 추출 완료")

    # HTML 파싱
    root = parse_html_from_xml(extracted_content)
    print("HTML 파싱 완료")

    # HTML을 문자열로 변환
    html_string = etree.tostring(
        root, pretty_print=True, method="html", encoding="unicode"
    )

    # 특정 테이블 추출
    table_df = extract_specific_table(html_string, "채무보증내역")

    if table_df is not None:
        print("'채무보증내역' 테이블을 추출했습니다.")
        print(table_df)
    else:
        print("'채무보증내역' 테이블을 찾을 수 없습니다.")

    # LangChain을 사용하여 HTML 로드
    docs = load_html_with_langchain(html_string)
    print(f"추출된 문서 수: {len(docs)}")

    return extracted_content, table_df


# 사용 예시
api_key = "b0f7f31f54a0f96561f361c405caa204e64c81a1"  # dart api
rcp_no = "20240312000736"

extracted_content, table_df = summarize_report(api_key, rcp_no)

# 추출된 XML 데이터를 파일로 저장
with open("extracted_sections.xml", "w", encoding="utf-8") as f:
    f.write(extracted_content)
print("추출된 섹션들이 'extracted_sections.xml' 파일로 저장되었습니다.")

# 추출된 테이블을 CSV 파일로 저장
if table_df is not None:
    table_df.to_csv("채무보증내역.csv", index=False, encoding="utf-8-sig")
    print("'채무보증내역' 테이블이 '채무보증내역.csv' 파일로 저장되었습니다.")

In [None]:
# TSV 파일 읽기
df = pd.read_csv("채무보증내역.tsv", sep="\t", encoding="utf-8")

In [None]:
import pandas as pd

df = pd.read_csv(
    "C:\\langchain-kr\\01-Basic\\Spoon\\채무보증내역.csv", encoding="UTF-8", sep=","
)
df.head()

## -Anthropic 이용

In [None]:
import os
import requests
import zipfile
import io
from lxml import etree
import xml.etree.ElementTree as ET
from langchain_community.document_loaders import BSHTMLLoader
import tempfile
from langchain.chains import MapReduceDocumentsChain, ReduceDocumentsChain
from langchain.text_splitter import CharacterTextSplitter
from langchain.prompts import PromptTemplate
from langchain.chains.llm import LLMChain
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain_anthropic import ChatAnthropic
from langchain.schema import Document


def fetch_document(api_key, rcp_no):
    url = "https://opendart.fss.or.kr/api/document.xml"
    params = {"crtfc_key": api_key, "rcept_no": rcp_no}
    response = requests.get(url, params=params)
    if response.status_code != 200:
        raise Exception(f"API 요청 실패: 상태 코드 {response.status_code}")
    return response.content


def extract_section(root, start_aassocnote, end_aassocnote):
    start_element = root.xpath(
        f"//TITLE[@ATOC='Y' and @AASSOCNOTE='{start_aassocnote}']"
    )[0]
    end_element = root.xpath(f"//TITLE[@ATOC='Y' and @AASSOCNOTE='{end_aassocnote}']")[
        0
    ]

    extracted_elements = []
    current_element = start_element
    while current_element is not None:
        extracted_elements.append(
            etree.tostring(current_element, encoding="unicode", with_tail=True)
        )
        if current_element == end_element:
            break
        current_element = current_element.getnext()

    return "".join(extracted_elements)


def extract_audit_report(zip_content, rcp_no):
    try:
        with zipfile.ZipFile(io.BytesIO(zip_content)) as zf:
            print("ZIP 파일 내용:")
            for file_info in zf.infolist():
                print(file_info.filename)

            audit_fnames = [
                info.filename
                for info in zf.infolist()
                if rcp_no in info.filename and info.filename.endswith(".xml")
            ]
            if not audit_fnames:
                raise ValueError("감사보고서 파일을 찾을 수 없습니다.")
            xml_data = zf.read(audit_fnames[0])

            # XML 파싱
            parser = etree.XMLParser(recover=True, encoding="utf-8")
            root = etree.fromstring(xml_data, parser)

            # 세 부분 추출
            part1 = extract_section(root, "D-0-2-0-0", "D-0-3-0-0")
            part2 = extract_section(root, "D-0-3-1-0", "D-0-3-2-0")
            part3 = extract_section(root, "D-0-3-2-0", "D-0-3-3-0")

            # 세 부분 합치기
            extracted_xml = part1 + part2 + part3

            return extracted_xml

    except zipfile.BadZipFile:
        raise ValueError("ZIP 파일이 손상되었거나 유효하지 않습니다.")
    except etree.XMLSyntaxError as e:
        raise ValueError(f"XML 파싱 실패: {str(e)}")
    except IndexError:
        raise ValueError("필요한 TITLE 요소를 찾을 수 없습니다.")


def parse_html_from_xml(xml_data):
    parser = etree.HTMLParser()
    root = etree.fromstring(f"<html><body>{xml_data}</body></html>", parser)
    return root


def load_html_with_langchain(html_string):
    with tempfile.NamedTemporaryFile(
        mode="w", encoding="utf-8", suffix=".html", delete=False
    ) as temp_file:
        temp_file.write(html_string)
        temp_file_path = temp_file.name

    try:
        loader = BSHTMLLoader(temp_file_path, open_encoding="utf-8")
        documents = loader.load()
        return documents
    finally:
        import os

        os.unlink(temp_file_path)


# Map 프롬프트 설정
map_template = """다음은 문서의 일부입니다:
{docs}
이 부분에서 주요 주제와 재무 정보를 포함한 핵심 내용을 100단어 이내로 요약해주세요.
요약:"""

map_prompt = PromptTemplate.from_template(map_template)

# LLM 모델 설정 (Claude 3.5 Sonnet 사용)
llm = ChatAnthropic(model="claude-3-5-sonnet-20240620", temperature=0)
map_chain = LLMChain(llm=llm, prompt=map_prompt)

# Reduce 프롬프트 설정
reduce_template = """
당신은 은행에서 대출을 심사하는 역할입니다.
당신은 대출 심사에 대한 판단 전에 신용평가보고서를 작성하고 있습니다.

다음은 요약들의 집합입니다: {docs}
이것들을 가져다가 최종적으로 통합하여 1.기업체개요 2.산업분석 3.영업현황 및 수익구조 4.재무구조 및 현금흐름 5.신용등급 부여의견으로 구분해서 요약해주세요.
각 섹션에 관련 재무 수치를 포함시켜 주세요.

예시 :
1. 기업체 개요 : 동사 부동산 임대업 등의 사업목적으로 2001.10.16. 설립된 2023년말 기준 총자산 42,615백만원, 자본총계 22,335백만원, 매출액 3,502백만원,
당기순이익 37백만원 규모의 외감 소기업임.
2. 산업분석 : 최근 전방 산업 경기침체로 공실률 확대 기조 지속되어 매매가력 하락 및 임대소득 하락이 동시에 일어나 부동산 임대업 업황에 부정적인 영향을 미칠 가능성이 높음
3. 영업현황 및 수익구조 : 동사 2023년도 기준 매출액 전년도 대비 증가하였는 바, 안정적인 임대수입 영위 중에 있어 향후에도 구준한 매출액 시현에 따른 영업이익 지속 가능시됨.
4. 재무구조 및 현금흐름 : 동사 2023년말 기준 차입금 다소 증가하는 등 재무안정성 지표 상 미흡한 수준을 나타내고 있으나, 최근 3년간 무난한 현금흐름 나타내고 있으며, 지속적인 순이익 시현의 내부 유보로 자기자본 규모 확대되고 있음
5. 신용등급 부여의견 : 동사 최근 3년간 순이익 지속에 다른 영업활동 상 현금창출 지속되고, 순이익 시현의 내부유보로 자기자본 규모 확대되어 재무구조 개선되고 있으며, 향후에도 안정적인 영업실적 유지에 따른 수익성 유지로 채무상환 능력 인정됨.

요약된 내용:
"""

reduce_prompt = PromptTemplate.from_template(reduce_template)

# Reduce 체인 설정
reduce_chain = LLMChain(llm=llm, prompt=reduce_prompt)
combine_documents_chain = StuffDocumentsChain(
    llm_chain=reduce_chain, document_variable_name="docs"
)

reduce_documents_chain = ReduceDocumentsChain(
    combine_documents_chain=combine_documents_chain,
    collapse_documents_chain=combine_documents_chain,
    token_max=4000,
)

# MapReduce 체인 설정
map_reduce_chain = MapReduceDocumentsChain(
    llm_chain=map_chain,
    reduce_documents_chain=reduce_documents_chain,
    document_variable_name="docs",
    return_intermediate_steps=False,
)

# 텍스트 분할기 설정
text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=4000, chunk_overlap=0
)


def summarize_report(api_key, rcp_no):
    # 문서 가져오기
    zip_content = fetch_document(api_key, rcp_no)
    print("API 응답 크기:", len(zip_content), "바이트")

    # XML 데이터 추출 및 특정 섹션 파싱
    extracted_content = extract_audit_report(zip_content, rcp_no)
    print("XML 섹션 추출 완료")

    # HTML 파싱
    root = parse_html_from_xml(extracted_content)
    print("HTML 파싱 완료")

    # HTML을 문자열로 변환
    html_string = etree.tostring(
        root, pretty_print=True, method="html", encoding="unicode"
    )

    # LangChain을 사용하여 HTML 로드
    docs = load_html_with_langchain(html_string)
    print(f"추출된 문서 수: {len(docs)}")

    # 문서 분할
    split_docs = text_splitter.split_documents(docs)

    # MapReduce 체인 실행
    summary = map_reduce_chain.run(split_docs)
    return summary


# 사용 예시
api_key = "b0f7f31f54a0f96561f361c405caa204e64c81a1"
rcp_no = "20240516001638"

summary = summarize_report(api_key, rcp_no)
print(summary)

# 추출된 XML 데이터를 파일로 저장 (옵션)
# with open("extracted_sections.xml", "w", encoding="utf-8") as f:
#     f.write(extracted_content)
# print("추출된 섹션들이 'extracted_sections.xml' 파일로 저장되었습니다.")