In [1]:
import gdown
import os
import json
import pandas as pd
import mysql.connector
import pymysql 
import re
import itertools

from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyMuPDFLoader, UnstructuredHTMLLoader
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser, PydanticOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from pydantic import BaseModel, Field, create_model
from typing import List, Dict
from openai import OpenAI
from tiktoken import get_encoding

원본 감사보고서

In [85]:
file_path = "크래프톤_2024.htm"

loader = UnstructuredHTMLLoader(file_path)
docs = loader.load()

print(docs)



In [86]:
# 출력 구조 정의
class FieldWithReason(BaseModel):
    reason: str = Field(description="value를 도출한 근거")
    value: str = Field(description="value 값")
    
# 출력 파서
parser = PydanticOutputParser(pydantic_object=FieldWithReason)

NICE 정답지 11번 파싱

In [87]:
with open("report/krafton_11.json", encoding="utf-8") as f:
    data = json.load(f)

print("🔍 전체 길이:", len(data))
print(data)

🔍 전체 길이: 201
{'114000': {'label': '자산총계|비유동자산(계)'}, '113200': {'label': '자산총계|비유동자산(계)|유형자산(계)'}, '113203': {'label': '(유형자산정부보조금계)'}, '113201': {'label': '(유형자산감가상각누계액계)'}, '113205': {'label': '(유형자산손상차손누계액계)'}, '113310': {'label': '자산총계|비유동자산(계)|유형자산(계)|리스자산'}, '113311': {'label': '(유형자산감가상각누계액계)|(리스자산감가상각누계액)'}, '112690': {'label': '자산총계|비유동자산(계)|유형자산(계)|리스자산|기타리스자산'}, '112699': {'label': '(유형자산감가상각누계액계)|(리스자산감가상각누계액)|(기타리스자산감가상각누계액)'}, '113110': {'label': '자산총계|비유동자산(계)|유형자산(계)|토지'}, '113146': {'label': '자산총계|비유동자산(계)|유형자산(계)|시설장치'}, '113148': {'label': '(유형자산정부보조금계)|(시설장치정부보조금)'}, '113149': {'label': '(유형자산감가상각누계액계)|(시설장치감가상각누계액)'}, '113180': {'label': '자산총계|비유동자산(계)|유형자산(계)|비품'}, '113181': {'label': '(유형자산감가상각누계액계)|(비품감가상각누계액)'}, '113199': {'label': '자산총계|비유동자산(계)|유형자산(계)|건설중인자산'}, '113196': {'label': '(유형자산손상차손누계액계)|(건설중인자산손상차손누계액)'}, '112150': {'label': '자산총계|비유동자산(계)|투자부동산'}, '112175': {'label': '(투자부동산감가상각누계액)'}, '112143': {'label': '자산총계|비유동자산(계)|투자부동산|토지'}, '112144': {'labe

In [88]:
# 전체 출력
for label in data:
    print(label)
    
labels = data.values()
label_list = [item['label'] for item in labels if isinstance(item, dict) and 'label' in item]

114000
113200
113203
113201
113205
113310
113311
112690
112699
113110
113146
113148
113149
113180
113181
113199
113196
112150
112175
112143
112144
112178
113400
113355
113356
113299
113007
113008
113009
113010
113220
113221
113240
113241
113290
113291
113297
113346
113347
113348
113300
113349
113383
112108
112109
112113
112111
112106
112114
112121
112147
112117
112118
112119
112631
112180
112241
112242
112245
112210
112215
112211
112212
112213
112261
113604
112610
112270
112636
112251
112252
112000
111156
111249
111150
111159
111152
111147
111180
111189
111183
111190
111199
111192
111197
111170
111179
111171
111174
111591
111594
111233
111230
111232
111220
111225
111484
111162
111421
111401
111410
111419
111415
111420
111422
111430
111200
111100
111135
111130
111134
111137
115000
118060
118100
118110
118120
118211
118300
118330
118630
118600
118470
118530
118531
118533
118540
118541
118621
118090
118214
118236
118238
118430
118570
118599
118900
117000
117112
116400
116409
116805
116800

In [89]:
label_list

['자산총계|비유동자산(계)',
 '자산총계|비유동자산(계)|유형자산(계)',
 '(유형자산정부보조금계)',
 '(유형자산감가상각누계액계)',
 '(유형자산손상차손누계액계)',
 '자산총계|비유동자산(계)|유형자산(계)|리스자산',
 '(유형자산감가상각누계액계)|(리스자산감가상각누계액)',
 '자산총계|비유동자산(계)|유형자산(계)|리스자산|기타리스자산',
 '(유형자산감가상각누계액계)|(리스자산감가상각누계액)|(기타리스자산감가상각누계액)',
 '자산총계|비유동자산(계)|유형자산(계)|토지',
 '자산총계|비유동자산(계)|유형자산(계)|시설장치',
 '(유형자산정부보조금계)|(시설장치정부보조금)',
 '(유형자산감가상각누계액계)|(시설장치감가상각누계액)',
 '자산총계|비유동자산(계)|유형자산(계)|비품',
 '(유형자산감가상각누계액계)|(비품감가상각누계액)',
 '자산총계|비유동자산(계)|유형자산(계)|건설중인자산',
 '(유형자산손상차손누계액계)|(건설중인자산손상차손누계액)',
 '자산총계|비유동자산(계)|투자부동산',
 '(투자부동산감가상각누계액)',
 '자산총계|비유동자산(계)|투자부동산|토지',
 '자산총계|비유동자산(계)|투자부동산|건물',
 '(투자부동산감가상각누계액)|(건물감가상각누계액)',
 '자산총계|비유동자산(계)|무형자산(계)',
 '(상각누계액)',
 '(손상차손누계액)',
 '(정부보조금)',
 '자산총계|비유동자산(계)|무형자산(계)|기타의무형자산(계)',
 '(상각누계액)|(기타의무형자산상각누계액)',
 '(손상차손누계액)|(기타의무형자산손상차손누계액)',
 '(정부보조금)|(기타의무형자산정부보조금)',
 '자산총계|비유동자산(계)|무형자산(계)|기타의무형자산(계)|산업재산권',
 '(상각누계액)|(기타의무형자산상각누계액)|(산업재산권상각누계액)',
 '자산총계|비유동자산(계)|무형자산(계)|기타의무형자산(계)|저작권',
 '(상각누계액)|(기타의무형자산상각누계액)|(저작권상각누계액)',
 '자산총계|비유동자산(계)|무형자산(계)|

In [90]:
class FieldWithReason(BaseModel):
    reason: str
    value: str

def make_report_model(labels: list[str]) -> type[BaseModel]:
    fields = {lbl: FieldWithReason for lbl in labels}
    return create_model('ReportLabels', **fields)

ReportLabels = make_report_model(label_list)

전년도 답지

In [102]:
df_prior = pd.read_excel("answer/krafton_11_2023.xlsx")

prior_answer_text = "\n".join(
    f"- {row['계정명']}: {row['20231231 K 원']}"
    for _, row in df_prior.iterrows()
    if pd.notnull(row.get('계정명')) and pd.notnull(row.get('20231231 K 원'))
)

전년도 감사보고서

In [103]:
# 파일 로딩
prior_file_path = "크래프톤_2023.htm"
loader = UnstructuredHTMLLoader(prior_file_path)
prior_docs_raw = loader.load()

#. 테이블 단위로 분할
def split_by_table(text):
    return re.split(r'(?=(표\s*\d+[^\n]*|Table\s*\d+[^\n]*))', text)

# 텍스트 변환
prior_text = "\n".join([doc.page_content for doc in prior_docs_raw])
prior_docs_chunks = split_by_table(prior_text)

chunks = split_by_table(prior_text)

# 토큰 계산
encoding = get_encoding("cl100k_base")
def count_tokens(text): return len(encoding.encode(text))

4개의 input으로 GPT 추론

In [None]:
def chunk_labels(labels, chunk_size=100):
    for i in range(0, len(labels), chunk_size):
        yield labels[i:i + chunk_size]
        
all_results = []
label_chunks = list(chunk_labels(label_list, chunk_size=100))

client = OpenAI(api_key='')

for i, chunk in enumerate(label_chunks):
   prior_chunk_text = ""
   prior_chunk_tokens = 0
   
   
    # prior_doc 중 토큰 제한 안 넘는 선까지 누적
   for table in prior_docs_chunks:
      tokens = count_tokens(table)
      if prior_chunk_tokens + tokens <= 40000:  # prior_docs에 할당 가능한 최대치
         prior_chunk_text += table
         prior_chunk_tokens += tokens
      else:
         break

   print(f"{i + 1}번째 체크 처리 중...")
   
   ReportLabelsPartial = make_report_model(chunk)    
   completion = client.beta.chat.completions.parse(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": f"""
당신은 한국 감사보고서 분석 전문가입니다.

📂 주어진 정보:
- `docs`: 올해 감사보고서 일부 (값은 반드시 여기서만 추출)
- `prior_docs`: 작년 감사보고서 (절대 복사 금지, 참고만 가능)
- `prior_answer_text`: 작년 정답값 (추출 방식 학습용)

---

🎯 목적:
- 정확한 값을 `docs`에서 추출
- 표의 **단위 (천원, 백만원, 원)** 를 **반드시** 감지하고 변환
- `value`는 **쉼표 + "원" 포함된 원 단위 숫자**
- `reason`은 **한국어**로 작성하며 단위 감지·변환 과정 포함

---

📏 단위 감지 우선순위:
1. 표 위 또는 캡션에 “단위: ○○” 명시 여부
2. 표 상단 셀 또는 열 제목
3. 계정명 안에 단위 포함
4. 추론 가능할 경우 명확한 근거 포함 (`reason`에 명시)
- 예: "값이 작고, 자산 항목이므로 천원 단위로 추정함"

🧮 변환 방식:
- "1,000천원" → "1,000,000원"
- "2,345백만원" → "2,345,000,000원"

---

📘 Reason 예시 (한국어):
- `"자본금은 표 위에 '단위: 백만원'이 있었고, 값은 2,345이므로 2,345 × 1,000,000 = 2,345,000,000원입니다."`
- `"단위가 명시되지 않았지만, 자산 항목이며 값이 작아 천원 단위로 추정하여 100 × 1,000 = 100,000원입니다."`

❗ 금지사항:
- 단위 무시
- '원' 생략, 쉼표 없음
- 소수점 값
- 단위 추론 근거 없이 변환

---

📊 특수처리:
- "기초/기말" 포함된 항목은 둘 다 추출
- 부모 항목 = 자식 합 (또는 역산)
- 추출 불가 시 `"value": "알 수 없음"`, `"reason": "값을 명확히 찾을 수 없음"`

---

지금부터 단위 감지 및 변환을 정확히 수행하여 value와 reason을 JSON 형태로 반환하세요.

# 올해 보고서 docs 일부
{docs[0].page_content}

# prior_docs (참고용)
{prior_chunk_text}

# prior_answer_text (추출 방식 학습용)
{prior_answer_text}
"""
},
            {"role": "user", "content": "2024년의 값들을 구해줘."}
        ],
        temperature=0.2,
        response_format=ReportLabelsPartial,
    )

   parsed = completion.choices[0].message.parsed
   all_results.append(parsed)

1번째 체크 처리 중...
2번째 체크 처리 중...
3번째 체크 처리 중...


추론 결과 (계정코드, 계정명, reason, value)

In [105]:
merged = {}
for result in all_results:
    merged.update(result)
    
# DataFrame으로 변환
df = pd.DataFrame.from_dict(merged, orient="index")
df.reset_index(inplace = True)
df.rename(columns={"index": "계정명"}, inplace=True)
df.columns = ["계정명", "reason", "value"]

# 열 이름 확인 및 필요시 재정의
expected_cols = ["계정명", "reason", "value"]
if not all(col in df.columns for col in expected_cols):
    df.columns = expected_cols

# 계정명 → 계정코드 매핑
name_to_code = {
    info["label"]: code
    for code, info in data.items()
    if isinstance(info.get("label"), str)
}

# 계정코드 열 생성
def find_account_code(name):
    return name_to_code.get(name)

df["계정코드"] = df["계정명"].map(find_account_code)

# 열 순서 재정렬
df = df[["계정코드", "계정명", "reason", "value"]]

# 저장
df.to_csv("2024/parsed_11.csv", index=False, encoding="utf-8-sig")

DB 연결

In [106]:
# MySQL 연결
conn = mysql.connector.connect(
    host='localhost',
    user='root',
    password='ljm03',
    database='NICE'
)
cursor = conn.cursor(dictionary=True)

# DB에서 계산식 기반 계정명 불러오기
def fetch_formula_info():
    conn = pymysql.connect(
        host = 'localhost',
        user = 'root',
        password = 'ljm03',
        db = 'NICE',
        charset = 'utf8'
    )
    sql = "SELECT account_name, account_code, formula_text, formula_code FROM krafton"
    df = pd.read_sql(sql, conn)
    conn.close()
    return df

# DB 확인
df_formula = fetch_formula_info()
print(df_formula.head())

  account_name account_code  \
0         자산총계       115000   
1     비유동자산(계)       114000   
2      유형자산(계)       113200   
3         리스자산       113310   
4         시설장치       113146   

                                        formula_text  \
0                          자산총계 = 비유동자산(계) + 유동자산(계)   
1  비유동자산(계) = 유형자산(계) + 투자부동산 + 무형자산(계) + 장기투자자산 ...   
2           유형자산(계) = 리스자산 + 토지 + 시설장치 + 비품 + 건설중인자산   
3                                      리스자산 = 기타리스자산   
4              시설장치 = 시설장치 - 시설장치감가상각누계액 - 시설장치정부보조금   

                                        formula_code  
0                           115000 = 114000 + 112000  
1  114000 = 113200 + 112150 + 113400 + 113383 + 1...  
2  113200 = 113310 + 113110 + 113146 + 113180 + 1...  
3                                    113310 = 112690  
4                  113146 = 113146 - 113149 - 113148  


  df = pd.read_sql(sql, conn)


전년도 matched

In [107]:
# 작년 보간 결과 또는 matched 결과 불러오기
last_year_matched_df = pd.read_csv("2023/matched_11.csv", dtype=str)

# 계정코드와 계정명만 남기고 중복 제거
last_year_matched_df = last_year_matched_df[["계정코드", "계정명"]].drop_duplicates()

value 정리

In [108]:
# value 필드에서 튜플형 문자 처리
def extract_number(val):
    if isinstance(val,str) and "'" in val:
        try:
            val = eval(val)
            val = val[1] if isinstance(val, tuple) and len(val) > 1 else val
        except:
            pass
    return str(val)

df["value"] = df["value"].apply(extract_number)

# 숫자 전처리 함수
def clean_number(series):
    return pd.to_numeric(
        series.astype(str)
              .str.replace(",", "")
              .str.replace("(", "-")
              .str.replace(")", ""),
        errors="coerce"
    ).dropna().astype("int64")

# 숫자 전처리
df["clean_value"] = clean_number(df["value"])

DB 계산식 활용하여 보간

In [118]:
def interpolate_from_last_year(df, df_formula, last_year_matched_df, max_iter=3):
    def extract_clean_int(val):
        try:
            if isinstance(val, str) and "'" in val and "(" in val:
                val = eval(val)
                if isinstance(val, tuple):
                    val = val[1] if len(val) > 1 else val[0]
            return int(str(val).replace(",", "").replace("원", "").strip())
        except:
            return None

    # 전년도 matched 계정코드 → 믿을 수 있는 것들
    trusted_codes = set(last_year_matched_df["계정코드"].astype(str).str.zfill(6))

    df["계정코드"] = df["계정코드"].astype(str).str.zfill(6)
    name_map = dict(zip(df["계정코드"], df["계정명"]))

    # GPT 추론 결과가 있어도 trusted가 아니면 reference에 넣지 않음
    reference_values = {
        row["계정코드"]: extract_clean_int(row["value"])
        for _, row in df.iterrows()
        if row["계정코드"] in trusted_codes and extract_clean_int(row["value"]) is not None
    }

    # 계산식 정리
    formula_map = {}
    reverse_map = {}

    for _, row in df_formula.iterrows():
        formula = str(row["formula_code"])
        lhs_match = re.match(r"^\s*(\d{6})\s*=", formula)
        rhs_matches = re.findall(r"\d{6}", formula)

        if lhs_match and len(rhs_matches) > 1:
            lhs = lhs_match.group(1).zfill(6)
            rhs = [code.zfill(6) for code in rhs_matches if code != lhs]
            formula_map.setdefault(lhs, []).append(rhs)

            for missing_code in rhs:
                others = [c for c in rhs if c != missing_code]
                reverse_map.setdefault(missing_code, []).append((lhs, others))

    interpolated_rows = []
    already_interpolated = set()

    for loop in range(max_iter):
        updated = False

        # 정방향
        for target, rhs_list_set in formula_map.items():
            if target in trusted_codes or target in already_interpolated:
                continue
            for rhs_list in rhs_list_set:
                if all(code in reference_values for code in rhs_list):
                    value = sum(reference_values[code] for code in rhs_list)
                    reference_values[target] = value
                    already_interpolated.add(target)
                    interpolated_rows.append({
                        "계정코드": target,
                        "계정명": name_map.get(target, ""),
                        "보간값": value,
                        "보간방향": f"정방향({' + '.join(rhs_list)})",
                        "근거": "계산식 기반"
                    })
                    updated = True
                    break

        # 역방향
        for target, cases in reverse_map.items():
            if target in trusted_codes or target in already_interpolated:
                continue
            for lhs, siblings in cases:
                lhs = lhs.zfill(6)
                siblings = [s.zfill(6) for s in siblings]
                if lhs in reference_values and all(s in reference_values for s in siblings):
                    value = reference_values[lhs] - sum(reference_values[s] for s in siblings)
                    reference_values[target] = value
                    already_interpolated.add(target)
                    interpolated_rows.append({
                        "계정코드": target,
                        "계정명": name_map.get(target, ""),
                        "보간값": value,
                        "보간방향": f"역방향({lhs} - [{' + '.join(siblings)}])",
                        "근거": "계산식 기반"
                    })
                    updated = True
                    break

        if not updated:
            break

    interpolated_df = pd.DataFrame(interpolated_rows)
    return interpolated_df

interpolated_df = interpolate_from_last_year(df, df_formula, last_year_matched_df)
interpolated_df.to_csv("2024/interpolated_11.csv", index=False, encoding="utf-8-sig")

parsed + interpolated

In [119]:
# 계정코드 6자리로 맞추기
df["계정코드"] = df["계정코드"].astype(str).str.zfill(6)
interpolated_df["계정코드"] = interpolated_df["계정코드"].astype(str).str.zfill(6)

df["value"] = df["value"].apply(lambda x: str(int(x)) if pd.notnull(x) and str(x).replace("-", "").isdigit() else x)
# '예측값'과 '보간방향'을 각각 'value'와 'reason'으로 매핑
overwrite_map = interpolated_df.set_index("계정코드")[["보간값", "보간방향"]]

# df에서 해당 계정코드가 있는 경우 덮어쓰기
for code, row in overwrite_map.iterrows():
    int_value = int(row["보간값"])
    df.loc[df["계정코드"] == code, "value"] = row["보간값"]
    df.loc[df["계정코드"] == code, "reason"] = f"보간값 ({row['보간방향']})"

df["clean_value"] = df["clean_value"].apply(lambda x: str(int(x)) if pd.notnull(x) and x == int(x) else x)

# 저장 
df.to_csv("2024/results_11.csv", index=False, encoding="utf-8-sig")

채점

In [120]:
# 파일 경로
csv_path = "2024/results_11.csv"
excel_path = "answer/krafton_11_2024.xlsx"

# 데이터 불러오기
test_df = pd.read_csv(csv_path)
answer_df = pd.read_excel(excel_path)

def extract_number(val):
    if isinstance(val,str) and "'" in val:
        try:
            val = eval(val)
            val = val[1] if isinstance(val, tuple) and len(val) > 1 else val
        except:
            pass
    return str(val)

test_df["value"] = test_df["value"].apply(extract_number)

# 숫자 전처리 함수
def clean_number(series):
    return pd.to_numeric(
        series.astype(str)
              .str.replace("원", "", regex=False)
              .str.replace(",", "", regex=False)
              .str.replace("(", "-", regex=False)
              .str.replace(")", "", regex=False),
        errors="coerce"
    )

# 전처리된 숫자 열 생성 (단위: 원)
test_df["clean_value"] = clean_number(test_df["value"])
answer_df["clean_answer"] = clean_number(answer_df["20241231 K 원"])

# 천원 단위로 변환
test_df["value_thousand"] = (test_df["clean_value"] // 1000).astype("Int64")
answer_df["answer_thousand"] = (answer_df["clean_answer"] // 1000).astype("Int64")

# 문자열 정제
test_df["계정명"] = test_df["계정명"].astype(str).str.strip()
test_df["계정코드"] = test_df["계정코드"].astype(str).str.strip()
answer_df["계정명"] = answer_df["계정명"].astype(str).str.strip()
answer_df["계정코드"] = answer_df["계정코드"].astype(str).str.strip()

# 매칭 수행 (±5천원 오차 허용)
matched_rows = []

for _, row in test_df.iterrows():
    test_val_k = row["value_thousand"]
    test_label = str(row["계정명"])
    test_code = str(row["계정코드"])
    
    # 숫자와 계정명 모두 일치하는 행만 필터링
    matched = answer_df[
        (answer_df["계정명"] == test_label) &
        (answer_df["계정코드"] == test_code) &
        (answer_df["answer_thousand"].between(test_val_k - 5, test_val_k + 5))
    ]
    if not matched.empty:
        for _, ans_row in matched.iterrows():
            matched_rows.append({
                "계정코드": test_code,
                "계정명": test_label,
                "예측값": int(row["clean_value"])
            })

# 결과 출력
matched_df = pd.DataFrame(matched_rows)

answer_count = answer_df["answer_thousand"].notnull().sum()
# 매칭률
print('*****************************')
print(f"📋 정답 계정명 개수: {answer_count}")
print(f"📍 매칭 계정명 개수: {len(matched_df)}")
match_rate = len(matched_df) / answer_count
print(f"\n 🎯 매칭률: {match_rate:.2%}")
print('*****************************')

matched_df.to_csv("2024/matched_11.csv", index=False)

*****************************
📋 정답 계정명 개수: 139
📍 매칭 계정명 개수: 94

 🎯 매칭률: 67.63%
*****************************


unmatched

In [2]:
# 파일 경로
nice_path = "answer/krafton_11_2024.xlsx"
matched_path = "2024/matched_11.csv"

# 데이터 불러오기
nice_df = pd.read_excel(nice_path)
matched_df = pd.read_csv(matched_path)

# 계정코드 정제 (문자열, 공백 제거, 6자리 패딩)
nice_df["계정코드"] = nice_df["계정코드"].astype(str).str.strip().str.zfill(6)
matched_df["계정코드"] = matched_df["계정코드"].astype(str).str.strip().str.zfill(6)

# 제외할 계정코드 목록
exclude_codes = set(matched_df["계정코드"])

# 필터링: 제외된 계정코드를 포함하지 않는 행
unmatched_df = nice_df[~nice_df["계정코드"].isin(exclude_codes)]

# 결과 저장
unmatched_df.to_csv("2024/unmatched_11.csv", index=False, encoding="utf-8-sig")