In [None]:
# 이걸 잠시 하다보니 다른 생각이 든다..
# 필터링을 위한 메타데이터..가 존재해야하나? 벡터 db에서 필터링을 해야하나?
# 단순히 연산이 작아보이고 쉬워보여서.. 대충 메타데이터에 넣고, 벡터db에서 필터링 하게 한 게 잘못된 건 아닐까?
# 그냥 이러한 필터링 자체는 최대한 rdb 단에서 수행하고, 메타데이터와 벡터db의 사용을 최적화 하는 게 더 좋지 않을까?
# => 그렇게 하자. 처음부터 다시하자.

In [None]:
def populate_code_tables(cursor):
    """
    엑셀 파일의 각 시트를 읽어 코드 테이블에 데이터를 삽입합니다.
    """
    print("🚀 코드 테이블 데이터 이관 시작...")

    try:
        # 테이블과 시트 이름, 그리고 해당 시트 내의 컬럼명을 매핑합니다.
        # 시트 이름은 실제 엑셀 파일의 시트 이름과 정확히 일치해야 합니다.
        code_sheets_map = {
            'major_codes': ('코드정보', 'plcyMajorCd', '코드', '코드내용'),
            'job_status_codes': ('코드정보', 'jobCd', '코드', '코드내용'),
            'education_level_codes': ('코드정보', 'schoolCd', '코드', '코드내용'),
            'specialization_codes': ('코드정보', 'sBizCd', '코드', '코드내용'),
            'category_codes': ('정책대분류', None, '번호', '정책대분류명(lclsfNm)'),
            'subcategory_codes': ('정책중분류', None, '번호', '정책중분류명(mclsfNm)'),
            'keywords': ('정책키워드', None, None, '정책키워드명(plcyKywdNm)')
        }
        
        for table_name, (sheet_name, filter_col, code_col, name_col) in code_sheets_map.items():
            print(f"  - 테이블 '{table_name}' 작업 중 (시트: '{sheet_name}')...")
            # read_excel을 사용하여 특정 시트를 읽어옵니다.
            df = pd.read_excel(excel_file_path, sheet_name=sheet_name)
            
            # '코드정보' 시트의 경우, 특정 분류(jobCd 등)로 필터링
            if filter_col:
                df = df[df['분류'] == filter_col].copy()
            
            insert_sql = ""
            # 각 테이블 구조에 맞게 데이터 준비 및 INSERT
            if table_name == 'keywords':
                insert_sql = f"INSERT INTO {table_name} (name) VALUES (%s)"
                data_to_insert = [(row[name_col],) for index, row in df.iterrows() if pd.notna(row[name_col])]
            else:
                insert_sql = f"INSERT INTO {table_name} (code, name) VALUES (%s, %s)"
                data_to_insert = [(str(row[code_col]), row[name_col]) for index, row in df.iterrows() if pd.notna(row[code_col]) and pd.notna(row[name_col])]
            
            cursor.executemany(insert_sql, data_to_insert)
            print(f"    ✅ {cursor.rowcount}개 행 삽입 완료.")

    except FileNotFoundError:
        print(f"🚨 오류: 엑셀 파일('{excel_file_path}')을 찾을 수 없습니다. 파일 이름과 경로를 확인하세요.")
        raise
    except Exception as e:
        print(f"🚨 오류: 코드 테이블 이관 중 문제 발생 - {e}")
        raise


In [None]:
db_config = {
    'host': 'localhost',
    'user': 'root',
    'password': '1234',
    'database': 'toyprj4'
}

# 엑셀 파일 이름 (사용자님이 변경하신 이름)
excel_file_path = './code_table.xlsx'

In [None]:
import 
connection = None
connection = mysql.connector.connect(**db_config)
cursor = connection.cursor()
print("✅ 데이터베이스 연결 성공!")

populate_code_tables(cursor)

In [None]:
connection.commit()
print("\n🎉 모든 작업이 성공적으로 완료되었습니다.")

In [None]:
# connection.rollback()

In [None]:
# 1. 데이터 불러오기

import pandas as pd
import numpy as np

# 데이터가 담긴 CSV 파일 경로
csv_file_path = './policy_data.csv' # 실제 파일 이름으로 변경해주세요.

try:
    # CSV 파일을 DataFrame으로 읽어옵니다.
    # 한글 깨짐 방지를 위해 encoding='utf-8'을 사용합니다.
    df_raw = pd.read_csv(csv_file_path, encoding='utf-8')
    
    # 데이터 로딩 성공 확인
    print("✅ 1단계 성공: 데이터 로딩 완료!")
    print(f"전체 정책 데이터 수: {len(df_raw)}개")
    
    # 데이터 구조 확인을 위해 상위 3개 행을 출력합니다.
    print("\n--- 원본 데이터 샘플 ---")
    print(df_raw.head(3))
    
except FileNotFoundError:
    print(f"❌ 오류: '{csv_file_path}' 파일을 찾을 수 없습니다. 파일 경로를 확인해주세요.")
except Exception as e:
    print(f"❌ 오류: 파일을 읽는 중 문제가 발생했습니다. (오류: {e})")

In [None]:
# 2. 데이터 정제 및 가공
# 숫자 형 변환 : 나이, 소득 등 숫자여야 하는 열들의 데이터 타입을 숫자로 확실하게 변환, 숫자로 변환 불가한 경우 NaN 처리
# 날짜 형 변환 : yymmdd 형식의 날짜륻릉ㄹ 실제 날짜 형으로 변환하고, 구간 정보의 경우 분리.

# --- 2.1. 컬럼 선택 및 이름 변경 (수정) ---
column_mapping = {
    'plcyNo': 'policy_id',
    'plcyNm': 'policy_name',
    'plcyExplnCn': 'policy_summary',
    'refUrlAddr1': 'source_url',
    'sprtTrgtMinAge': 'min_age',
    'sprtTrgtMaxAge': 'max_age',
    'earnMinAmt': 'income_min',
    'earnMaxAmt': 'income_max',
    'bizPrdBgngYmd': 'biz_start_date',
    'bizPrdEndYmd': 'biz_end_date',
    'aplyYmd': 'aply_period',
    'mrgSttsCd': 'marriage_status',
    'aplyPrdSeCd': 'application_status', 
    'jobCd': 'job_status_codes',
    'schoolCd': 'education_level_codes',
    'plcyMajorCd': 'major_codes',
    'plcyKywdNm': 'keywords',
    'zipCd': 'region_codes',
    'lclsfNm': 'category_names',
    'mclsfNm': 'subcategory_names',
    'rgtrInstCdNm: Operation_Inst'
}

df_selected = df_raw[list(column_mapping.keys())].copy()
df_processed = df_selected.rename(columns=column_mapping)

# --- 2.2. 숫자 형식 변환 (변경 없음) ---
numeric_cols = ['min_age', 'max_age', 'income_min', 'income_max']
for col in numeric_cols:
    df_processed[col] = pd.to_numeric(df_processed[col], errors='coerce')
    df_processed[col] = df_processed[col].replace(0, np.nan)

# --- 2.3. 날짜 형식 변환 (변경 없음) ---
def to_datetime_safe(date_str):
    if pd.isna(date_str) or not isinstance(date_str, str) or not date_str.strip():
        return pd.NaT
    return pd.to_datetime(date_str, format='%Y%m%d', errors='coerce')

def parse_apply_period(period_str):
    if pd.isna(period_str) or '~' not in str(period_str):
        return pd.NaT, pd.NaT
    try:
        start_str, end_str = [s.strip() for s in period_str.split('~')]
        return to_datetime_safe(start_str), to_datetime_safe(end_str)
    except ValueError:
        return pd.NaT, pd.NaT

df_processed['biz_start_date'] = df_processed['biz_start_date'].apply(to_datetime_safe)
df_processed['biz_end_date'] = df_processed['biz_end_date'].apply(to_datetime_safe)
df_processed[['aply_start_date', 'aply_end_date']] = df_processed['aply_period'].apply(
    lambda x: pd.Series(parse_apply_period(x))
)
df_processed = df_processed.drop(columns=['aply_period'])

print("✅ 수정된 2단계 성공: `application_status` 포함하여 정제 완료!")
print(df_processed[['policy_id', 'application_status']].head())

In [None]:
# 3. policies 테이블용 데이터 프레임 생성

# 1. 'policies' 테이블 컬럼 목록 정의 (수정)
policy_table_cols = [
    'policy_id',
    'policy_name',
    'policy_summary',
    'Operation_Inst',
    'source_url',
    'min_age',
    'max_age',
    'income_min',
    'income_max',
    'biz_start_date',
    'biz_end_date',
    'aply_start_date',
    'aply_end_date',
    'marriage_status',
    'application_status' 
]

# 2. df_processed에서 해당 컬럼들만 선택하여 최종 policies_df 생성
policies_df = df_processed[policy_table_cols].copy()

# 3. 결과 확인
print("\n✅ 수정된 3단계 성공: `application_status` 포함하여 `policies_df` 생성 완료!")
print("\n--- policies_df 정보 ---")
policies_df.info()

In [None]:
# 순서 재정렬(DB 스키마 순서와 동일하게)
# (선택 사항) DB 스키마 순서와 동일하게 컬럼 순서 재정렬
db_schema_order = [
    'policy_id', 'min_age', 'max_age', 'income_min', 'income_max',
    'biz_start_date', 'biz_end_date', 'aply_start_date', 'aply_end_date',
    'application_status', 'marriage_status', 'policy_name', 
    'policy_summary', 'source_url'
]
policies_df = policies_df[db_schema_order]

# 재정렬된 순서 확인
print(policies_df.head())

In [None]:
df_processed.info()

# df_processed에서 'keywords' 열이 비어있지 않은 행의 개수를 세어봅니다.
policies_with_keywords = df_processed['keywords'].notna().sum()

print(f"전체 {len(df_processed)}개 정책 중, 키워드가 할당된 정책의 수: {policies_with_keywords}개")

In [None]:
# 4. 매핑 테이블용 데이터 프레임 생성


# 1. 매핑 테이블 생성을 위한 범용 함수 정의
def create_mapping_df(df, id_col, value_col, new_value_col_name):
    """
    쉼표로 구분된 값을 가진 컬럼을 분리하여,
    정책 ID와 값이 1:1로 매핑되는 '길쭉한' 데이터프레임을 만듭니다.
    
    :param df: 원본 데이터프레임 (df_processed)
    :param id_col: 기준 ID 컬럼명 ('policy_id')
    :param value_col: 쉼표로 구분된 값이 있는 컬럼명
    :param new_value_col_name: 최종 매핑 테이블의 값 컬럼명
    :return: 매핑 테이블용 데이터프레임
    """
    # 1. 필요한 컬럼만 선택하고, 값이 비어있는 행은 제외
    temp_df = df[[id_col, value_col]].dropna(subset=[value_col])
    
    # 2. 쉼표(,)를 기준으로 값을 분리하여 리스트로 만듦
    temp_df[value_col] = temp_df[value_col].str.split(',')
    
    # 3. .explode() 함수로 리스트의 각 항목을 별도의 행으로 펼침
    mapping_df = temp_df.explode(value_col)
    
    # 4. 각 값의 앞뒤 공백 제거
    mapping_df[value_col] = mapping_df[value_col].str.strip()
    
    # 5. 최종 컬럼 이름 변경 (예: value_col -> keyword_name)
    mapping_df = mapping_df.rename(columns={value_col: new_value_col_name})
    
    # 6. 인덱스 초기화
    mapping_df = mapping_df.reset_index(drop=True)
    
    return mapping_df

# 2. 'keywords' 컬럼을 사용하여 'policy_keywords' 매핑 테이블 생성
policy_keywords_df = create_mapping_df(
    df=df_processed, 
    id_col='policy_id', 
    value_col='keywords', 
    new_value_col_name='keyword_name' # DB 스키마에 맞게 이름 지정
)

# 3. 결과 확인
print("✅ 4-1단계 성공: `policy_keywords_df` 생성 완료!")
print(f"생성된 키워드 매핑 수: {len(policy_keywords_df)}개")

print("\n--- policy_keywords_df 데이터 샘플 ---")
print(policy_keywords_df.head(10))

In [None]:

print("✅ 4-2단계 시작: 나머지 매핑 테이블들을 생성합니다.\n")

# 1. 취업 상태(job_status) 매핑 테이블 생성
policy_job_status_df = create_mapping_df(
    df=df_processed, 
    id_col='policy_id', 
    value_col='job_status_codes', 
    new_value_col_name='job_status_code'
)
print("--- `policy_job_status_df` 생성 완료 ---")
print(policy_job_status_df.head())


# 2. 학력(education_levels) 매핑 테이블 생성
policy_education_levels_df = create_mapping_df(
    df=df_processed, 
    id_col='policy_id', 
    value_col='education_level_codes', 
    new_value_col_name='education_level_code'
)
print("\n--- `policy_education_levels_df` 생성 완료 ---")
print(policy_education_levels_df.head())


# 3. 전공(majors) 매핑 테이블 생성
policy_majors_df = create_mapping_df(
    df=df_processed, 
    id_col='policy_id', 
    value_col='major_codes', 
    new_value_col_name='major_code'
)
print("\n--- `policy_majors_df` 생성 완료 ---")
print(policy_majors_df.head())


# 4. 지역(regions) 매핑 테이블 생성
policy_regions_df = create_mapping_df(
    df=df_processed, 
    id_col='policy_id', 
    value_col='region_codes', 
    new_value_col_name='region_code'
)
print("\n--- `policy_regions_df` 생성 완료 ---")
print(policy_regions_df.head())


# 5. 카테고리(categories) 매핑 테이블 생성
policy_categories_df = create_mapping_df(
    df=df_processed, 
    id_col='policy_id', 
    value_col='category_names', 
    new_value_col_name='category_name'
)
print("\n--- `policy_categories_df` 생성 완료 ---")
print(policy_categories_df.head())


# 6. 서브카테고리(subcategories) 매핑 테이블 생성
policy_subcategories_df = create_mapping_df(
    df=df_processed, 
    id_col='policy_id', 
    value_col='subcategory_names', 
    new_value_col_name='subcategory_name'
)
print("\n--- `policy_subcategories_df` 생성 완료 ---")
print(policy_subcategories_df.head())

print("\n\n✅ 4단계 성공: 모든 매핑 테이블용 데이터프레임 생성 완료!")

In [None]:
# 타입 디버깅을 위한 확인용 콛,

import pandas as pd

# 엑셀 파일 경로
CODE_EXCEL_PATH = './code_table.xlsx'

try:
    # 엑셀 파일의 '코드정보' 시트를 읽어옵니다.
    df_codes_debug = pd.read_excel(CODE_EXCEL_PATH, sheet_name='코드정보')

    print("--- 1. 실제 컬럼 이름 확인 ---")
    print(df_codes_debug.columns.tolist())
    print("\n" + "="*30 + "\n")

    # '분류'라는 컬럼이 있는지 확인하고, 있다면 그 안의 고유한 값들을 확인합니다.
    if '분류' in df_codes_debug.columns:
        print("--- 2. '분류' 컬럼의 고유값 확인 ---")
        # ffill()을 먼저 적용해서 비어있는 셀을 채운 후 고유값을 확인합니다.
        unique_values = df_codes_debug['분류'].ffill().unique()
        print(unique_values)
    else:
        print(">>> '분류'라는 이름의 컬럼을 찾을 수 없습니다!")
    
    print("\n" + "="*30 + "\n")
    
    print("--- 3. 파일 상위 5줄 내용 확인 ---")
    print(df_codes_debug.head())

except FileNotFoundError:
    print(f"오류: '{CODE_EXCEL_PATH}' 파일을 찾을 수 없습니다.")
except Exception as e:
    print(f"오류 발생: {e}")

In [None]:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import logging

# --- 설정 ---
# 로깅 설정
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 파일 경로
POLICY_DATA_PATH = './policy_data.csv'
CODE_EXCEL_PATH = './code_table.xlsx'

# DB 연결 정보 (!!!반드시 수정!!!)
DB_USER = "root"
DB_PASSWORD = "1234"
DB_HOST = "localhost"
DB_PORT = "3306"
DB_NAME = "toyprj4"
DB_URL = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"


# --- 헬퍼 함수 정의 (변경 없음) ---
def to_datetime_safe(date_str):
    if pd.isna(date_str) or not isinstance(date_str, str) or not date_str.strip():
        return pd.NaT
    return pd.to_datetime(date_str, format='%Y%m%d', errors='coerce')

def parse_apply_period(period_str):
    if pd.isna(period_str) or '~' not in str(period_str):
        return pd.NaT, pd.NaT
    try:
        start_str, end_str = [s.strip() for s in period_str.split('~')]
        return to_datetime_safe(start_str), to_datetime_safe(end_str)
    except ValueError:
        return pd.NaT, pd.NaT

def create_mapping_df(df, id_col, value_col, new_value_col_name):
    temp_df = df[[id_col, value_col]].dropna(subset=[value_col])
    if temp_df.empty:
        return pd.DataFrame(columns=[id_col, new_value_col_name])
    temp_df[value_col] = temp_df[value_col].astype(str).str.split(',')
    mapping_df = temp_df.explode(value_col)
    mapping_df[value_col] = mapping_df[value_col].str.strip()
    mapping_df = mapping_df.rename(columns={value_col: new_value_col_name})
    return mapping_df.reset_index(drop=True)


# --- 메인 ETL 로직 ---
def run_etl():
    """전체 ETL 프로세스를 실행합니다."""
    
    # --- 1. 추출 (Extract) ---
    try:
        df_raw = pd.read_csv(POLICY_DATA_PATH, encoding='utf-8')
        df_codes = pd.read_excel(CODE_EXCEL_PATH, sheet_name='코드정보', dtype={'코드': str})
        logging.info(f"✅ 1단계 성공: 데이터 및 엑셀 코드 시트 로드 완료.")
    except FileNotFoundError as e:
        logging.error(f"❌ 오류: 파일을 찾을 수 없습니다. ({e})")
        return

    # --- 2. 변환 (Transform) ---
    # 2.1. 코드-자연어 매핑 딕셔너리 생성
    code_map = {}
    df_codes['분류'] = df_codes['분류'].ffill() 
    df_codes['코드'] = df_codes['코드'].str.zfill(7)
    
    code_map['marriage_status'] = df_codes[df_codes['분류'] == 'mrgSttsCd'].set_index('코드')['코드내용'].to_dict()
    code_map['application_status'] = df_codes[df_codes['분류'] == 'aplyPrdSeCd'].set_index('코드')['코드내용'].to_dict()
    logging.info("코드-자연어 매핑 생성 완료.")
    
    # 2.2. 기본 정제
    column_mapping = {
        'plcyNo': 'policy_id', 'plcyNm': 'policy_name', 'plcyExplnCn': 'policy_summary',
        'refUrlAddr1': 'source_url', 'sprtTrgtMinAge': 'min_age', 'sprtTrgtMaxAge': 'max_age',
        'earnMinAmt': 'income_min', 'earnMaxAmt': 'income_max', 'bizPrdBgngYmd': 'biz_start_date',
        'bizPrdEndYmd': 'biz_end_date', 'aplyYmd': 'aply_period', 'mrgSttsCd': 'marriage_status',
        'aplyPrdSeCd': 'application_status', 'jobCd': 'job_status_codes', 'schoolCd': 'education_level_codes',
        'plcyMajorCd': 'major_codes', 'sbizCd': 'specialization_codes', 'plcyKywdNm': 'keywords',
        'zipCd': 'region_codes', 'lclsfNm': 'category_names', 'mclsfNm': 'subcategory_names',
        'rgtrInstCdNm': 'operation_inst' # rgtrInstCdNm 컬럼 추가
    }
    df_processed = df_raw[list(column_mapping.keys())].copy().rename(columns=column_mapping)

    # 2.3. 코드를 자연어로 변환
    def code_to_natural(series, mapping_dict):
        """숫자/문자열 코드를 안전하게 자연어로 변환하는 함수"""
        # 1. 비어있는 값(NaN)은 그대로 둠
        # 2. pd.to_numeric으로 숫자 변환 시도, 실패 시 NaN
        # 3. 정수형으로 변환하여 .0 제거
        # 4. 문자열로 변환
        # 5. 7자리로 맞춤
        # 6. 최종 매핑
        return pd.to_numeric(series, errors='coerce').astype('Int64').astype(str).str.zfill(7).map(mapping_dict)

    df_processed['marriage_status'] = code_to_natural(df_processed['marriage_status'], code_map['marriage_status'])
    df_processed['application_status'] = code_to_natural(df_processed['application_status'], code_map['application_status'])
    logging.info("결혼상태, 신청상태 코드 -> 자연어 변환 완료.")

    # 2.4. 숫자 및 날짜 변환
    numeric_cols = ['min_age', 'max_age', 'income_min', 'income_max']
    for col in numeric_cols:
        df_processed[col] = pd.to_numeric(df_processed[col], errors='coerce').replace(0, np.nan)

    df_processed['biz_start_date'] = df_processed['biz_start_date'].apply(to_datetime_safe)
    df_processed['biz_end_date'] = df_processed['biz_end_date'].apply(to_datetime_safe)
    df_processed[['aply_start_date', 'aply_end_date']] = df_processed['aply_period'].apply(lambda x: pd.Series(parse_apply_period(x)))
    df_processed = df_processed.drop(columns=['aply_period'])
    logging.info("✅ 2단계 성공: 전체 데이터 정제 및 가공 완료.")

    # --- 3. 변환 (Transform) - `policies` 테이블용 데이터프레임 생성 ---
    policy_table_cols = [
        'policy_id', 'policy_name', 'policy_summary', 'source_url', 'min_age', 'max_age',
        'income_min', 'income_max', 'biz_start_date', 'biz_end_date', 'aply_start_date',
        'aply_end_date', 'marriage_status', 'application_status',
        'operation_inst' # `policies` 테이블에 추가된 컬럼
    ]
    policies_df = df_processed[policy_table_cols].copy()
    logging.info("✅ 3단계 성공: `policies` 테이블용 데이터프레임 생성 완료.")

    # --- 4. 변환 (Transform) - 매핑 테이블용 데이터프레임 생성 ---
    policy_keywords_df = create_mapping_df(df_processed, 'policy_id', 'keywords', 'keyword_name')
    policy_job_status_df = create_mapping_df(df_processed, 'policy_id', 'job_status_codes', 'job_status_code')
    policy_education_levels_df = create_mapping_df(df_processed, 'policy_id', 'education_level_codes', 'education_level_code')
    policy_majors_df = create_mapping_df(df_processed, 'policy_id', 'major_codes', 'major_code')
    policy_regions_df = create_mapping_df(df_processed, 'policy_id', 'region_codes', 'region_code')
    policy_categories_df = create_mapping_df(df_processed, 'policy_id', 'category_names', 'category_name')
    policy_subcategories_df = create_mapping_df(df_processed, 'policy_id', 'subcategory_names', 'subcategory_name')
    policy_specializations_df = create_mapping_df(df_processed, 'policy_id', 'specialization_codes', 'specialization_code')
    logging.info("✅ 4단계 성공: 모든 매핑 테이블용 데이터프레임 생성 완료.")

    # --- 5. 적재 (Load) ---
    try:
        engine = create_engine(DB_URL)
        logging.info(f"데이터베이스 '{DB_NAME}'에 연결되었습니다.")
        
        dataframes_to_load = {
            'policies': policies_df,
            'policy_keywords': policy_keywords_df,
            'policy_job_status': policy_job_status_df,
            'policy_education_levels': policy_education_levels_df,
            'policy_majors': policy_majors_df,
            'policy_regions': policy_regions_df,
            'policy_categories': policy_categories_df,
            'policy_subcategories': policy_subcategories_df,
            'policy_specializations': policy_specializations_df
        }

        for table_name, df in dataframes_to_load.items():
            df.to_sql(name=table_name, con=engine, if_exists='replace', index=False)
            logging.info(f"-> 테이블 '{table_name}'에 {len(df)}개 행 적재 완료.")
        
        logging.info("🎉 5단계 성공: 모든 데이터 적재 작업이 완료되었습니다!")

    except Exception as e:
        logging.error(f"❌ 5단계 오류: 데이터베이스 연결 또는 적재 중 오류 발생: {e}")

# --- 스크립트 실행 ---
if __name__ == "__main__":
    run_etl()

In [None]:
import pandas as pd

# 파일 경로
POLICY_DATA_PATH = './policy_data.csv'
CODE_EXCEL_PATH = './code_table.xlsx'

try:
    # 1. 각 파일을 읽어옵니다.
    df_raw_debug = pd.read_csv(POLICY_DATA_PATH, encoding='utf-8')
    df_codes_debug = pd.read_excel(CODE_EXCEL_PATH, sheet_name='코드정보', dtype={'코드': str})

    # --- marriage_status 매핑 과정만 집중 분석 ---

    # 2. 코드 매핑 딕셔너리를 만듭니다.
    df_codes_debug['분류'] = df_codes_debug['분류'].ffill()
    df_codes_debug['코드'] = df_codes_debug['코드'].str.zfill(7)
    
    marriage_map = df_codes_debug[df_codes_debug['분류'] == 'mrgSttsCd'].set_index('코드')['코드내용'].to_dict()

    print("--- 1. 생성된 결혼상태(marriage_status) 매핑 딕셔너리 ---")
    print(marriage_map)
    print("\n" + "="*50 + "\n")


    # 3. 원본 데이터의 'mrgSttsCd' 값들을 확인합니다.
    #    값이 비어있지 않은 샘플 5개를 가져옵니다.
    sample_codes = df_raw_debug['mrgSttsCd'].dropna().head(5)
    
    print("--- 2. 원본 데이터의 'mrgSttsCd' 샘플 ---")
    print(sample_codes)
    print("\n" + "="*50 + "\n")
    
    
    # 4. 이 샘플들이 어떻게 변환되는지 확인합니다.
    print("--- 3. 샘플 코드 변환 과정 및 매핑 결과 ---")
    for code in sample_codes:
        # 원본 값을 문자열로 바꾸고 7자리로 만듭니다.
        processed_code = str(code).zfill(7)
        # 매핑 딕셔너리에서 값을 찾아봅니다.
        mapped_value = marriage_map.get(processed_code, ">>> 매핑 실패! <<<")
        
        print(f"원본: {code}  ->  변환 후: '{processed_code}'  ->  매핑 결과: {mapped_value}")


except FileNotFoundError as e:
    print(f"오류: 파일을 찾을 수 없습니다. ({e})")
except Exception as e:
    print(f"오류 발생: {e}")