# Import Libraries

In [1]:
from pathlib import Path
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Set Parameters & Paths

In [2]:
### 파라미터 설정

SEED = 42
TARGET_YEAR = 2023                    # use info up to 2023 for valuation

# Relative valuation (3y weighted cheapness) weights
W_2023, W_2022, W_2021 = 0.5, 0.3, 0.2

MIN_INDUSTRY_SIZE = 100               # 중분류별 표본 최소 기업 수
SAMPLE_PER_INDUSTRY = 5               # 중분류별 랜덤 샘플 수

# Valuation model assumptions
RISK_FREE = 0.03                      # 3% risk-free
EQUITY_RISK_PREMIUM = 0.05            # 5% equity risk premium (conservative)
BETA_DEFAULT = 1.0                    # if beta unavailable
COST_OF_EQUITY_CAP = 0.20             # ceiling for cost of equity to avoid extreme
LONG_TERM_GROWTH_CAP = 0.05           # cap LT growth at 5%
DEFAULT_PAYOUT = 0.30                 # 30% payout ratio if dividends unknown
PAYOUT_UP = 0.35                      # if cashflow quality strong
PAYOUT_DOWN = 0.25                    # if cashflow quality weak
RI_HORIZON_YEARS = 5                  # projection horizon for RIM
FADE_TO_G = True                      # fade NI growth to terminal g linearly

In [3]:
### 경로 설정

top10_fp = Path("outputs/top10.csv")
bottom10_fp = Path("outputs/bottom10.csv")
all_fp = Path("data/preprocessed_accounts_data.csv")  # <-- main dataset provided by user

# Defensive file checks
missing = [p.name for p in [top10_fp, bottom10_fp, all_fp] if not p.exists()]
if missing:
    raise FileNotFoundError(f"다음 파일을 찾을 수 없습니다: {missing}. "
                            f"경로 또는 파일명을 확인해주세요.")

# Load Data

In [4]:
# Try a couple of encodings defensively
def _read_csv_safe(path):
    for enc in ("utf-8-sig", "utf-8", "cp949"):
        try:
            return pd.read_csv(path, dtype={"Symbol":"string"}, encoding=enc)
        except Exception:
            continue
    # fallback
    return pd.read_csv(path, dtype={"Symbol":"string"})

top10 = _read_csv_safe(top10_fp)
bottom10 = _read_csv_safe(bottom10_fp)
df = _read_csv_safe(all_fp)


In [5]:
# Ensure numeric for selected cols
def to_num(df, cols):
    for c in cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")
    return df

numeric_cols = [
    "연도","PER","PBR","EV_EBITDA","총자산","자기자본비율","당기순이익","기말발행주식수",
    "SalesCAGR3Y","NIGrowth"
]
df = to_num(df, numeric_cols)

In [6]:
# Keep only data up to TARGET_YEAR; pick the latest available per firm for cross-sectional valuation
df = df[df["연도"] <= TARGET_YEAR].copy()
df.sort_values(["Symbol", "연도"], inplace=True)
latest = df.groupby("Symbol").tail(1).copy()

# Build target universe

In [7]:
chosen = pd.concat([top10[["Symbol"]], bottom10[["Symbol"]]]).drop_duplicates()
chosen = chosen.merge(latest[["Symbol","Name","연도","산업분류(중분류)"]], on="Symbol", how="left")

In [None]:
# MIN_INDUSTRY_SIZE 이상의 기업이 속한 산업군 추출(극단값 처리)
ind_counts = latest.groupby("산업분류(중분류)")["Symbol"].nunique().reset_index(name="n_firms")
big_inds = ind_counts[ind_counts["n_firms"] >= MIN_INDUSTRY_SIZE]["산업분류(중분류)"]

In [9]:
# 추출된 산업군에서 랜덤 샘플링
rng = np.random.default_rng(SEED)
samples = []
for ind in big_inds:
    pool = latest[latest["산업분류(중분류)"] == ind]
    if len(pool) == 0:
        continue
    k = min(SAMPLE_PER_INDUSTRY, len(pool))
    idx = rng.choice(pool.index, size=k, replace=False)
    samples.append(pool.loc[idx, ["Symbol"]])
samples = pd.concat(samples) if samples else pd.DataFrame(columns=["Symbol"])


In [10]:
# 최종 평가 대상 기업 선정
target_syms = pd.concat([chosen[["Symbol"]], samples]).drop_duplicates()
target = target_syms.merge(latest, on="Symbol", how="left")

In [11]:
target.shape

(85, 66)

# 3-Years Normalization 

In [12]:
panel3 = target[target["연도"].between(2021, 2023)].copy()

# 3y averages for normalization
agg_dict = {}
if "당기순이익" in panel3.columns: agg_dict["당기순이익"] = "mean"
if "매출액" in panel3.columns: agg_dict["매출액"] = "mean"
if not agg_dict:
    raise ValueError("패널 데이터에 필요한 컬럼(당기순이익, 매출액)이 없습니다.")

avg3 = panel3.groupby("Symbol").agg(agg_dict)
avg3 = avg3.rename(columns={"당기순이익":"NI_avg3","매출액":"Sales_avg3"}).reset_index()

In [13]:
# NI CAGR proxy from 2021 to 2023 if available (2-year CAGR)
def cagr_2y(group, col):
    g = group.sort_values("연도")
    if {2021, 2023}.issubset(set(g["연도"])):
        v1 = g.loc[g["연도"]==2021, col].values[0]
        v3 = g.loc[g["연도"]==2023, col].values[0]
        if pd.notna(v1) and pd.notna(v3) and v1 not in [0, np.nan]:
            return (v3 / v1) ** (1/2) - 1
    return np.nan

ni_cagr2y = panel3.groupby("Symbol").apply(lambda g: cagr_2y(g, "당기순이익")).rename("NI_CAGR2Y").reset_index()

In [14]:
# Merge normalization info into latest
latest_norm = latest.merge(avg3, on="Symbol", how="left").merge(ni_cagr2y, on="Symbol", how="left")

# 상대가치 평가

In [15]:
peer_cols = [c for c in ["PER","PBR","EV_EBITDA"] if c in df.columns]

def year_industry_percentiles(subdf, year):
    tmp = subdf[subdf["연도"]==year][["Symbol","산업분류(중분류)"] + peer_cols].copy()
    for col in peer_cols:
        tmp[f"{col}_prank_{year}"] = tmp.groupby("산업분류(중분류)")[col].transform(lambda s: s.rank(pct=True))
        tmp[f"{col}_cheap_{year}"] = 1.0 - tmp[f"{col}_prank_{year}"]
    return tmp[["Symbol","산업분류(중분류)"] + [f"{c}_cheap_{year}" for c in peer_cols]]

def safe_year_df(year):
    if (df["연도"]==year).any():
        return year_industry_percentiles(df, year)
    else:
        # empty frame with needed columns
        cols = ["Symbol","산업분류(중분류)"] + [f"{c}_cheap_{year}" for c in peer_cols]
        return pd.DataFrame(columns=cols)

cheap_2023 = safe_year_df(2023)
cheap_2022 = safe_year_df(2022)
cheap_2021 = safe_year_df(2021)

cheap_all = latest[["Symbol","산업분류(중분류)"]].merge(cheap_2023, on=["Symbol","산업분류(중분류)"], how="left") \
    .merge(cheap_2022, on=["Symbol","산업분류(중분류)"], how="left") \
    .merge(cheap_2021, on=["Symbol","산업분류(중분류)"], how="left")

In [16]:
# Weighted average with available weights
for col in peer_cols:
    cols_year = [f"{col}_cheap_2023", f"{col}_cheap_2022", f"{col}_cheap_2021"]
    weights = np.array([W_2023, W_2022, W_2021], dtype=float)

    def _weighted(row):
        vals = np.array([row.get(c) for c in cols_year], dtype=float)
        mask = ~np.isnan(vals)
        if not mask.any():
            return np.nan
        w = weights[mask]
        w = w / w.sum()
        return float(np.dot(vals[mask], w))

    cheap_all[f"{col}_cheap_3y"] = cheap_all.apply(_weighted, axis=1)

In [17]:
# 2023 gap-to-median for interpretability
rel2023 = df[df["연도"]==2023][["Symbol","산업분류(중분류)"] + peer_cols].copy()
for col in peer_cols:
    rel2023[f"{col}_gap_med_2023"] = rel2023.groupby("산업분류(중분류)")[col].transform(
        lambda s: s / np.nanmedian(s.values) - 1.0
    )

rel_merge = latest[["Symbol","Name","산업분류(중분류)"]].merge(cheap_all, on=["Symbol","산업분류(중분류)"], how="left") \
    .merge(rel2023[["Symbol"] + [f"{c}_gap_med_2023" for c in peer_cols]], on="Symbol", how="left")

# Only target firms
rel_tgt = latest[["Symbol","Name","산업분류(중분류)"]].merge(rel_merge, on=["Symbol","Name","산업분류(중분류)"], how="left")

  r, k = function_base._ureduce(a, func=_nanmedian, axis=axis, out=out,
  r, k = function_base._ureduce(a, func=_nanmedian, axis=axis, out=out,


# 절대가치 평가

In [18]:
# Build working frame with normalization features
work = latest.merge(latest_norm[["Symbol","NI_avg3","NI_CAGR2Y"]], on="Symbol", how="left")

def estimate_cost_of_equity(row):
    beta = BETA_DEFAULT
    coe = RISK_FREE + beta * EQUITY_RISK_PREMIUM
    return float(min(coe, COST_OF_EQUITY_CAP))

def estimate_lt_growth(row):
    g = row.get("SalesCAGR3Y", np.nan)
    if pd.isna(g) or not np.isfinite(g):
        g = row.get("NI_CAGR2Y", np.nan)  # fallback
    if pd.isna(g) or not np.isfinite(g):
        g = 0.02
    g = float(min(g, LONG_TERM_GROWTH_CAP))
    return g

def choose_payout(row):
    # Adjust payout using cashflow quality heuristics if available
    payout = DEFAULT_PAYOUT
    cfo_ni = row.get("CFO_NI", np.nan)
    accr = row.get("Accruals", np.nan)
    if pd.notna(cfo_ni) and pd.notna(accr):
        if (cfo_ni >= 1.0) and (accr <= 0):
            payout = PAYOUT_UP
        elif (cfo_ni < 0.5) or (accr > 0):
            payout = PAYOUT_DOWN
    return float(payout)

def estimate_dividend(row):
    # DPS0 based on NI_avg3 for normalization
    ni = row.get("NI_avg3", np.nan)
    if pd.isna(ni):
        ni = row.get("당기순이익", np.nan)
    shares = row.get("기말발행주식수", np.nan)
    if pd.isna(shares) or shares == 0:
        return np.nan, np.nan, np.nan
    payout = choose_payout(row)
    dps0 = (ni * payout) / shares
    return dps0, payout, shares

def ddm_valuation(row):
    r = estimate_cost_of_equity(row)
    g = estimate_lt_growth(row)
    dps0, payout, shares = estimate_dividend(row)
    if any(pd.isna(x) for x in [dps0, r, g, shares]) or (r <= g):
        return np.nan, np.nan, r, g, payout
    d1 = dps0 * (1.0 + g)
    price = d1 / (r - g)
    equity_value = price * shares
    return price, equity_value, r, g, payout

def rim_valuation(row):
    total_assets = row.get("총자산", np.nan)
    eq_ratio = row.get("자기자본비율", np.nan)
    ni0 = row.get("NI_avg3", np.nan)
    if pd.isna(ni0):
        ni0 = row.get("당기순이익", np.nan)
    shares = row.get("기말발행주식수", np.nan)

    if any(pd.isna(x) for x in [total_assets, eq_ratio, ni0]) or total_assets <= 0 or eq_ratio <= 0:
        return np.nan, np.nan, np.nan, np.nan, np.nan

    B0 = total_assets * eq_ratio
    r = estimate_cost_of_equity(row)
    g = estimate_lt_growth(row)
    H = RI_HORIZON_YEARS

    gi0 = row.get("NIGrowth", np.nan)
    if pd.isna(gi0) or not np.isfinite(gi0):
        gi0 = row.get("NI_CAGR2Y", np.nan)
    if pd.isna(gi0) or not np.isfinite(gi0):
        gi0 = g
    gi0 = float(np.clip(gi0, -0.5, 0.5))

    # project NI
    ni_path = []
    for t in range(1, H+1):
        gt = gi0 + (g - gi0) * (t / H) if FADE_TO_G else gi0
        last = ni0 if t == 1 else ni_path[-1]
        ni_path.append(last * (1.0 + gt))

    payout = choose_payout(row)
    B = B0
    PV_RI = 0.0
    for t, NI in enumerate(ni_path, start=1):
        RI_t = NI - r * B
        PV_RI += RI_t / ((1.0 + r) ** t)
        div = payout * NI
        B = B + NI - div

    NI_T = ni_path[-1] if ni_path else np.nan
    if pd.isna(NI_T) or r <= g:
        V0 = np.nan
    else:
        div_T = payout * NI_T
        B_T = B + NI_T - div_T
        RI_T1 = (NI_T * (1.0 + g)) - r * B_T
        term_val = RI_T1 / ((r - g) * ((1.0 + r) ** H))
        V0 = B0 + PV_RI + term_val

    price = np.nan
    if not pd.isna(V0) and not pd.isna(shares) and shares > 0:
        price = V0 / shares
    return V0, price, r, g, payout

In [19]:
abs_rows = []
for _, row in work.iterrows():
    ddm_price, ddm_equity, r1, g1, payout1 = ddm_valuation(row)
    rim_equity, rim_price, r2, g2, payout2 = rim_valuation(row)
    abs_rows.append({
        "Symbol": row["Symbol"],
        "Name": row.get("Name", np.nan),
        # DDM
        "DDM_price": ddm_price,
        "DDM_equity_value": ddm_equity,
        "DDM_r_used": r1,
        "DDM_g_used": g1,
        "DDM_payout_used": payout1,
        # RIM
        "RIM_price": rim_price,
        "RIM_equity_value": rim_equity,
        "RIM_r_used": r2,
        "RIM_g_used": g2,
        "RIM_payout_used": payout2,
        # Normalization
        "NI_avg3": row.get("NI_avg3", np.nan),
        "NI_CAGR2Y": row.get("NI_CAGR2Y", np.nan),
        "SalesCAGR3Y": row.get("SalesCAGR3Y", np.nan),
    })
abs_df = pd.DataFrame(abs_rows)

# Combine relative + absolute for target firms

In [20]:
out = rel_tgt.merge(abs_df, on=["Symbol","Name"], how="left")

# Add 2023 raw peer metrics for reference
cols_add = [c for c in ["PER","PBR","EV_EBITDA","기말발행주식수","총자산","자기자본비율","당기순이익","CFO_NI","Accruals"] if c in latest.columns]
out = out.merge(latest[["Symbol"] + cols_add], on="Symbol", how="left")


In [21]:
# Sort and save
out.sort_values(["산업분류(중분류)","Name"], inplace=True)

out_fp = Path("outputs/valuation_results_3y.csv")
out.to_csv(out_fp, index=False, encoding="utf-8-sig")

# Scenarios & Sensitivity

In [22]:
def dps0_unit(row):
    ni = row.get("NI_avg3", np.nan)
    if pd.isna(ni): ni = row.get("당기순이익", np.nan)
    shares = row.get("기말발행주식수", np.nan)
    if pd.isna(ni) or pd.isna(shares) or shares == 0: return np.nan
    return float(ni / shares)

def ddm_price_from_row(row, r, g, payout):
    dps0 = dps0_unit(row)
    if any(pd.isna(x) for x in [dps0, r, g, payout]): return np.nan
    g = clamp_g(r, g)
    d1 = dps0 * payout * (1.0 + g)
    if r <= g: return np.nan
    return float(d1 / (r - g))

def rim_price_from_row(row, r, g, payout):
    # reuse the RIM function pieces above for price only
    ta = row.get("총자산", np.nan)
    eqr = row.get("자기자본비율", np.nan)
    shares = row.get("기말발행주식수", np.nan)
    ni0 = row.get("NI_avg3", np.nan)
    if pd.isna(ni0): ni0 = row.get("당기순이익", np.nan)
    if any(pd.isna(x) for x in [ta, eqr, shares, ni0]): return np.nan
    B0 = ta * eqr
    H = RI_HORIZON_YEARS
    gi0 = row.get("NI_CAGR2Y", np.nan)
    if pd.isna(gi0): gi0 = row.get("RIM_g_used", np.nan)
    if pd.isna(gi0): gi0 = g
    gi0 = float(np.clip(gi0, -0.5, 0.5))
    g = clamp_g(r, g)

    ni_path = []
    for t in range(1, H+1):
        gt = gi0 + (g - gi0) * (t / H) if FADE_TO_G else gi0
        last = ni0 if t == 1 else ni_path[-1]
        ni_path.append(last * (1.0 + gt))

    B = B0
    PV_RI = 0.0
    for t, NI in enumerate(ni_path, start=1):
        RI_t = NI - r * B
        PV_RI += RI_t / ((1.0 + r) ** t)
        div = payout * NI
        B = B + NI - div

    if r <= g: return np.nan
    NI_T = ni_path[-1]
    div_T = payout * NI_T
    B_T = B + NI_T - div_T
    RI_T1 = (NI_T * (1.0 + g)) - r * B_T
    term_val = RI_T1 / ((r - g) * ((1.0 + r) ** H))
    V0 = B0 + PV_RI + term_val
    return float(V0 / shares)

def clamp_g(r, g, margin=0.002):
    g = min(g, LONG_TERM_GROWTH_CAP)
    if r <= g + margin:
        g = max(-0.10, r - margin - 1e-6)
    return g

def choose_base(row, key_r, key_g, key_p):
    r = row.get(key_r, np.nan)
    g = row.get(key_g, np.nan)
    p = row.get(key_p, np.nan)
    if pd.isna(r): r = RISK_FREE + BETA_DEFAULT * EQUITY_RISK_PREMIUM
    if pd.isna(g): g = 0.02
    if pd.isna(p): p = DEFAULT_PAYOUT
    return float(r), float(g), float(np.clip(p, 0.05, 0.8))

In [23]:
scen_records = []
for _, row in out.iterrows():
    r_base, g_base, p_ddm_base = choose_base(row, "DDM_r_used", "DDM_g_used", "DDM_payout_used")
    _, _, p_rim_base = choose_base(row, "RIM_r_used", "RIM_g_used", "RIM_payout_used")

    scenarios = {
        "Conservative": {"dr": +0.015, "dg": -0.010, "dp": -0.05},
        "Base":         {"dr": +0.000, "dg": +0.000, "dp": +0.00},
        "Optimistic":   {"dr": -0.015, "dg": +0.010, "dp": +0.05},
    }

    cheap_mean = np.nanmean([row.get("PER_cheap_3y", np.nan),
                             row.get("PBR_cheap_3y", np.nan),
                             row.get("EV_EBITDA_cheap_3y", np.nan)])

    for scen, d in scenarios.items():
        r = max(0.01, r_base + d["dr"])
        g = g_base + d["dg"]
        p_ddm = float(np.clip(p_ddm_base + d["dp"], 0.05, 0.8))
        p_rim = float(np.clip(p_rim_base + d["dp"], 0.05, 0.8))

        scen_records.append({
            "Symbol": row["Symbol"], "Name": row.get("Name"), "Scenario": scen,
            "r": r, "g": clamp_g(r, g), "payout_DDM": p_ddm, "payout_RIM": p_rim,
            "DDM_price": ddm_price_from_row(row, r, g, p_ddm),
            "RIM_price": rim_price_from_row(row, r, g, p_rim),
            "cheap_mean_3y": cheap_mean
        })

scen_df = pd.DataFrame(scen_records)
scen_fp = Path("outputs/valuation_scenarios.csv")
scen_df.to_csv(scen_fp, index=False, encoding="utf-8-sig")


  cheap_mean = np.nanmean([row.get("PER_cheap_3y", np.nan),


In [24]:
# sensitivity grid (top-K by cheapness)
K = 12
cheap_cols = [c for c in ["PER_cheap_3y","PBR_cheap_3y","EV_EBITDA_cheap_3y"] if c in out.columns]
cheap_score = out[["Symbol","Name"]].copy()
cheap_score["cheap_mean_3y"] = np.nanmean(out[cheap_cols].values, axis=1)
top_syms = cheap_score.sort_values("cheap_mean_3y", ascending=False).head(K)["Symbol"].tolist()

sens_records = []
for _, row in out[out["Symbol"].isin(top_syms)].iterrows():
    r_base = row.get("RIM_r_used", row.get("DDM_r_used", RISK_FREE + EQUITY_RISK_PREMIUM))
    g_base = row.get("RIM_g_used", row.get("DDM_g_used", 0.02))
    p_ddm = row.get("DDM_payout_used", DEFAULT_PAYOUT)
    p_rim = row.get("RIM_payout_used", DEFAULT_PAYOUT)

    r_grid = np.clip(np.array([r_base-0.02, r_base-0.01, r_base, r_base+0.01, r_base+0.02]), 0.01, 0.25)
    g_grid = np.clip(np.array([g_base-0.01, g_base-0.005, g_base, g_base+0.005, g_base+0.01]), -0.10, 0.05)

    for r in r_grid:
        for g in g_grid:
            g_eff = clamp_g(r, g)
            sens_records.append({
                "Symbol": row["Symbol"], "Name": row.get("Name"),
                "Model": "DDM", "r": float(r), "g": float(g_eff), "payout": float(np.clip(p_ddm, 0.05, 0.8)),
                "Price": ddm_price_from_row(row, r, g_eff, np.clip(p_ddm, 0.05, 0.8))
            })
            sens_records.append({
                "Symbol": row["Symbol"], "Name": row.get("Name"),
                "Model": "RIM", "r": float(r), "g": float(g_eff), "payout": float(np.clip(p_rim, 0.05, 0.8)),
                "Price": rim_price_from_row(row, r, g_eff, np.clip(p_rim, 0.05, 0.8))
            })

sens_df = pd.DataFrame(sens_records)
sens_fp = Path("outputs/valuation_sensitivity_long.csv")
sens_df.to_csv(sens_fp, index=False, encoding="utf-8-sig")

  cheap_score["cheap_mean_3y"] = np.nanmean(out[cheap_cols].values, axis=1)
