In [1]:
import requests, time, re
import pandas as pd

USER_AGENT = "KangTae MySecNotebook/1.0 (honeypipeline@gmail.com)"  # 본인 것으로!
BASE = "https://data.sec.gov"

def make_session():
    s = requests.Session()
    s.headers.update({"User-Agent": USER_AGENT, "Accept-Encoding":"gzip, deflate"})
    s.trust_env = False  # 회사 프록시 쓰면 True로 바꾸세요
    return s

def rate_sleep():
    time.sleep(0.3)  # 너무 빠르면 429

In [2]:
def get_cik_from_ticker(session, ticker: str) -> str:
    url = "https://www.sec.gov/files/company_tickers.json"
    rate_sleep()
    r = session.get(url, timeout=30)
    r.raise_for_status()
    data = r.json()
    lookup = {row["ticker"].lower(): str(row["cik_str"]) for row in data.values()}
    cik = lookup.get(ticker.lower())
    if not cik:
        raise ValueError(f"Ticker not found: {ticker}")
    return cik.zfill(10)

def get_company_facts(session, cik10: str) -> dict:
    url = f"{BASE}/api/xbrl/companyfacts/CIK{cik10}.json"
    rate_sleep()
    r = session.get(url, timeout=60)
    r.raise_for_status()
    return r.json()

In [None]:
# 원하는 개념(필요시 추가/수정)
CONCEPTS = [
    # ── 매출/이익/비용 (P&L)
    "us-gaap:RevenueFromContractWithCustomerExcludingAssessedTax",
    "us-gaap:SalesRevenueNet",
    "us-gaap:Revenues",
    "us-gaap:CostOfGoodsAndServicesSold",
    "us-gaap:GrossProfit",
    "us-gaap:ResearchAndDevelopmentExpense",
    "us-gaap:SellingGeneralAndAdministrativeExpense",
    "us-gaap:OperatingIncomeLoss",
    "us-gaap:DepreciationDepletionAndAmortization",
    "us-gaap:InterestExpense",
    "us-gaap:IncomeTaxExpenseBenefit",
    "us-gaap:IncomeLossFromContinuingOperationsBeforeIncomeTaxesExtraordinaryItemsNoncontrollingInterest",
    "us-gaap:NetIncomeLoss",

    # ── EPS / 주식수 / 배당
    "us-gaap:EarningsPerShareBasic",
    "us-gaap:EarningsPerShareDiluted",
    "us-gaap:WeightedAverageNumberOfSharesOutstandingBasic",
    "us-gaap:WeightedAverageNumberOfDilutedSharesOutstanding",
    "us-gaap:CommonStockDividendsPerShareDeclared",

    # ── 대차대조표 (재무상태표)
    "us-gaap:Assets",
    "us-gaap:AssetsCurrent",
    "us-gaap:CashAndCashEquivalentsAtCarryingValue",
    "us-gaap:AccountsReceivableNetCurrent",
    "us-gaap:InventoryNet",
    "us-gaap:PropertyPlantAndEquipmentNet",
    "us-gaap:Goodwill",
    "us-gaap:IntangibleAssetsNetExcludingGoodwill",
    "us-gaap:Liabilities",
    "us-gaap:LiabilitiesCurrent",
    "us-gaap:LongTermDebtNoncurrent",
    "us-gaap:LongTermDebtCurrent",
    "us-gaap:ShortTermBorrowings",
    "us-gaap:StockholdersEquity",

    # ── 현금흐름표
    "us-gaap:NetCashProvidedByUsedInOperatingActivities",
    "us-gaap:NetCashProvidedByUsedInInvestingActivities",
    "us-gaap:NetCashProvidedByUsedInFinancingActivities",
    "us-gaap:PaymentsToAcquirePropertyPlantAndEquipment",  # CapEx (지출, 음수)
    "us-gaap:PaymentsForRepurchaseOfCommonStock",
    "us-gaap:PaymentsOfDividendsCommonStock",
]

FORMS = {"10-Q","10-K"}
START_FY = 2021  # 회계연도 기준 필터

def extract_facts_df(companyfacts: dict, concepts=CONCEPTS, forms=FORMS, start_fy=START_FY) -> pd.DataFrame:
    facts = companyfacts.get("facts", {})
    rows = []
    for concept in concepts:
        ns, name = concept.split(":")
        if ns not in facts or name not in facts[ns]:
            continue
        # units: { "USD": [ { val, fy, fp, form, end, ... }, ... ], "USD/shares": [...], ... }
        for unit, items in (facts[ns][name].get("units") or {}).items():
            for it in items:
                form = it.get("form")
                fy = it.get("fy")
                if form not in forms: 
                    continue
                if fy is not None and fy < start_fy:
                    continue
                rows.append({
                    "cik": companyfacts.get("cik"),
                    "entityName": companyfacts.get("entityName"),
                    "concept": concept,
                    "unit": unit,
                    "val": it.get("val"),
                    "end": it.get("end"),       # 기간 종료일(yyyy-mm-dd)
                    "fy": fy,                   # 회계연도
                    "fp": it.get("fp"),         # Q1/Q2/Q3/Q4 또는 FY
                    "form": form,               # 10-Q / 10-K
                    "accn": it.get("accn"),     # 접수번호
                    "filed": it.get("filed"),  # ← 추가
                })
    df = pd.DataFrame(rows)
    # 중복 개념 우선순위(매출 여러 태그일 때): RevenueFrom... > SalesRevenueNet > Revenues
    if not df.empty:
        concept_rank = {
            "us-gaap:RevenueFromContractWithCustomerExcludingAssessedTax": 1,
            "us-gaap:SalesRevenueNet": 2,
            "us-gaap:Revenues": 3
        }
        df["concept_rank"] = df["concept"].map(concept_rank).fillna(10)
    return df

In [4]:
sess = make_session()
cik10 = get_cik_from_ticker(sess, "AAPL")
facts = get_company_facts(sess, cik10)
df = extract_facts_df(facts)

print("rows:", len(df))
df.head(10)

rows: 1635


Unnamed: 0,cik,entityName,concept,unit,val,end,fy,fp,form,accn,concept_rank
0,320193,Apple Inc.,us-gaap:RevenueFromContractWithCustomerExcludi...,USD,260174000000.0,2019-09-28,2021,FY,10-K,0000320193-21-000105,1.0
1,320193,Apple Inc.,us-gaap:RevenueFromContractWithCustomerExcludi...,USD,91819000000.0,2019-12-28,2021,Q1,10-Q,0000320193-21-000010,1.0
2,320193,Apple Inc.,us-gaap:RevenueFromContractWithCustomerExcludi...,USD,150132000000.0,2020-03-28,2021,Q2,10-Q,0000320193-21-000056,1.0
3,320193,Apple Inc.,us-gaap:RevenueFromContractWithCustomerExcludi...,USD,58313000000.0,2020-03-28,2021,Q2,10-Q,0000320193-21-000056,1.0
4,320193,Apple Inc.,us-gaap:RevenueFromContractWithCustomerExcludi...,USD,209817000000.0,2020-06-27,2021,Q3,10-Q,0000320193-21-000065,1.0
5,320193,Apple Inc.,us-gaap:RevenueFromContractWithCustomerExcludi...,USD,59685000000.0,2020-06-27,2021,Q3,10-Q,0000320193-21-000065,1.0
6,320193,Apple Inc.,us-gaap:RevenueFromContractWithCustomerExcludi...,USD,274515000000.0,2020-09-26,2021,FY,10-K,0000320193-21-000105,1.0
7,320193,Apple Inc.,us-gaap:RevenueFromContractWithCustomerExcludi...,USD,274515000000.0,2020-09-26,2022,FY,10-K,0000320193-22-000108,1.0
8,320193,Apple Inc.,us-gaap:RevenueFromContractWithCustomerExcludi...,USD,111439000000.0,2020-12-26,2021,Q1,10-Q,0000320193-21-000010,1.0
9,320193,Apple Inc.,us-gaap:RevenueFromContractWithCustomerExcludi...,USD,111439000000.0,2020-12-26,2022,Q1,10-Q,0000320193-22-000007,1.0


In [5]:
# 같은 날짜/폼에 매출 개념이 여러 개 있으면 rank로 하나만 선택
def choose_best_revenue(df):
    rev = df[df["concept"].str.contains(r"us-gaap:(Revenue|SalesRevenueNet|Revenues)", case=False, regex=True)].copy()
    if rev.empty: 
        return rev
    rev = rev.sort_values(["end","form","concept_rank"]).drop_duplicates(subset=["end","form"], keep="first")
    rev["metric"] = "Revenue"
    return rev

rev = choose_best_revenue(df)
ni  = df[df["concept"]=="us-gaap:NetIncomeLoss"].assign(metric="NetIncome")
eps_d = df[df["concept"]=="us-gaap:EarningsPerShareDiluted"].assign(metric="EPS_Diluted")
eps_b = df[df["concept"]=="us-gaap:EarningsPerShareBasic"].assign(metric="EPS_Basic")
assets = df[df["concept"]=="us-gaap:Assets"].assign(metric="Assets")
liabs  = df[df["concept"]=="us-gaap:Liabilities"].assign(metric="Liabilities")
cash   = df[df["concept"]=="us-gaap:CashAndCashEquivalentsAtCarryingValue"].assign(metric="CashAndCashEq")

keep = pd.concat([rev, ni, eps_d, eps_b, assets, liabs, cash], ignore_index=True)
keep["end"] = pd.to_datetime(keep["end"])
keep = keep.sort_values(["end","metric"])

pivot = keep.pivot_table(index=["end","form","fy","fp"], columns="metric", values="val", aggfunc="first").reset_index()
pivot = pivot.sort_values("end")
pivot
# pivot.head(20)

  rev = df[df["concept"].str.contains(r"us-gaap:(Revenue|SalesRevenueNet|Revenues)", case=False, regex=True)].copy()


metric,end,form,fy,fp,Assets,CashAndCashEq,EPS_Basic,EPS_Diluted,Liabilities,NetIncome,Revenue
0,2019-09-28,10-K,2021,FY,,,2.99,2.97,,55256000000.0,260174000000.0
1,2019-12-28,10-Q,2021,Q1,,,1.26,1.25,,22236000000.0,91819000000.0
2,2020-03-28,10-Q,2021,Q2,,,1.91,1.89,,33485000000.0,150132000000.0
3,2020-06-27,10-Q,2021,Q3,,,2.56,2.54,,44738000000.0,209817000000.0
4,2020-09-26,10-K,2021,FY,323888000000.0,38016000000.0,3.31,3.28,258549000000.0,57411000000.0,274515000000.0
5,2020-09-26,10-K,2022,FY,,,3.31,3.28,,57411000000.0,
6,2020-09-26,10-Q,2021,Q1,323888000000.0,38016000000.0,,,258549000000.0,,
7,2020-09-26,10-Q,2021,Q2,323888000000.0,38016000000.0,,,258549000000.0,,
8,2020-09-26,10-Q,2021,Q3,323888000000.0,38016000000.0,,,258549000000.0,,
9,2020-12-26,10-Q,2021,Q1,354054000000.0,36010000000.0,1.7,1.68,287830000000.0,28755000000.0,111439000000.0


In [6]:
def fetch_metrics_for_tickers(tickers, start_fy=2021):
    sess = make_session()
    all_tables = []
    for t in tickers:
        try:
            cik10 = get_cik_from_ticker(sess, t)
            facts = get_company_facts(sess, cik10)
            df = extract_facts_df(facts, start_fy=start_fy)
            # 위와 동일한 요약
            rev = choose_best_revenue(df)
            ni  = df[df["concept"]=="us-gaap:NetIncomeLoss"].assign(metric="NetIncome")
            eps = df[df["concept"]=="us-gaap:EarningsPerShareDiluted"].assign(metric="EPS_Diluted")
            tbl = pd.concat([rev, ni, eps], ignore_index=True)
            tbl["ticker"] = t.upper()
            all_tables.append(tbl)
        except Exception as e:
            print(f"[{t}] FAIL:", e)
        time.sleep(0.3)
    if not all_tables: 
        return pd.DataFrame()
    out = pd.concat(all_tables, ignore_index=True)
    out["end"] = pd.to_datetime(out["end"])
    return out

tick_tbl = fetch_metrics_for_tickers(["AAPL","MSFT","NVDA"], start_fy=2021)
tick_pivot = tick_tbl.pivot_table(index=["ticker","end","form","fy","fp"],
                                  columns="concept", values="val", aggfunc="first").reset_index()
tick_pivot.head(15)

  rev = df[df["concept"].str.contains(r"us-gaap:(Revenue|SalesRevenueNet|Revenues)", case=False, regex=True)].copy()
  rev = df[df["concept"].str.contains(r"us-gaap:(Revenue|SalesRevenueNet|Revenues)", case=False, regex=True)].copy()
  rev = df[df["concept"].str.contains(r"us-gaap:(Revenue|SalesRevenueNet|Revenues)", case=False, regex=True)].copy()


concept,ticker,end,form,fy,fp,us-gaap:EarningsPerShareDiluted,us-gaap:NetIncomeLoss,us-gaap:RevenueFromContractWithCustomerExcludingAssessedTax,us-gaap:Revenues
0,AAPL,2019-09-28,10-K,2021,FY,2.97,55256000000.0,260174000000.0,
1,AAPL,2019-12-28,10-Q,2021,Q1,1.25,22236000000.0,91819000000.0,
2,AAPL,2020-03-28,10-Q,2021,Q2,1.89,33485000000.0,150132000000.0,
3,AAPL,2020-06-27,10-Q,2021,Q3,2.54,44738000000.0,209817000000.0,
4,AAPL,2020-09-26,10-K,2021,FY,3.28,57411000000.0,274515000000.0,
5,AAPL,2020-09-26,10-K,2022,FY,3.28,57411000000.0,,
6,AAPL,2020-12-26,10-Q,2021,Q1,1.68,28755000000.0,111439000000.0,
7,AAPL,2020-12-26,10-Q,2022,Q1,1.68,28755000000.0,,
8,AAPL,2021-03-27,10-Q,2021,Q2,3.08,52385000000.0,201023000000.0,
9,AAPL,2021-03-27,10-Q,2022,Q2,3.08,52385000000.0,,


In [7]:
import pandas as pd
import numpy as np
import re
from typing import List, Tuple

# 매출 우선순위 유지
_REVENUE_PREF = {
    "us-gaap:RevenueFromContractWithCustomerExcludingAssessedTax": 1,
    "us-gaap:SalesRevenueNet": 2,
    "us-gaap:Revenues": 3
}

def choose_best_revenue(df: pd.DataFrame) -> pd.DataFrame:
    if df.empty or "concept" not in df.columns:
        return df.iloc[0:0]
    rev = df[df["concept"].isin(_REVENUE_PREF.keys())].copy()
    if rev.empty:
        return rev
    if "concept_rank" not in rev.columns:
        rev["concept_rank"] = rev["concept"].map(_REVENUE_PREF).fillna(10)
    rev = (rev.sort_values(["end","form","concept_rank"])
             .drop_duplicates(subset=["end","form"], keep="first"))
    rev["metric"] = "Revenue"
    return rev

def _pick(df: pd.DataFrame, concept: str, metric_name: str, unit: str | None = None) -> pd.DataFrame:
    sub = df[df["concept"] == concept].copy()
    if unit and "unit" in sub.columns:
        sub = sub[sub["unit"] == unit]
    if sub.empty: 
        return sub
    sub["metric"] = metric_name
    return sub

def build_metrics_pivot(df: pd.DataFrame,
                        unit_whitelist: List[str] | None = None
                       ) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    companyfacts df(단일 기업) -> (long keep, wide pivot)
    - unit_whitelist: ["USD","USD/shares","shares"] 등 필터링
    - 파생지표: GrossMargin, OpMargin, FCF 등 계산
    """
    if df.empty:
        return df, df

    use = df.copy()
    if unit_whitelist is not None and "unit" in use.columns:
        use = use[use["unit"].isin(unit_whitelist)].copy()

    # ── 개별 지표 추출
    rev   = choose_best_revenue(use)
    cogs  = _pick(use, "us-gaap:CostOfGoodsAndServicesSold",                "COGS",           unit="USD")
    gp    = _pick(use, "us-gaap:GrossProfit",                               "GrossProfit",    unit="USD")
    rnd   = _pick(use, "us-gaap:ResearchAndDevelopmentExpense",             "R&D",            unit="USD")
    sga   = _pick(use, "us-gaap:SellingGeneralAndAdministrativeExpense",    "SG&A",           unit="USD")
    opinc = _pick(use, "us-gaap:OperatingIncomeLoss",                       "OperatingIncome",unit="USD")
    d_and_a = _pick(use,"us-gaap:DepreciationDepletionAndAmortization",     "D&A",            unit="USD")
    int_exp = _pick(use,"us-gaap:InterestExpense",                          "InterestExpense",unit="USD")
    taxexp  = _pick(use,"us-gaap:IncomeTaxExpenseBenefit",                  "IncomeTax",      unit="USD")
    pretx   = _pick(use,"us-gaap:IncomeLossFromContinuingOperationsBeforeIncomeTaxesExtraordinaryItemsNoncontrollingInterest", "PretaxIncome", unit="USD")
    ni    = _pick(use, "us-gaap:NetIncomeLoss",                              "NetIncome",      unit="USD")

    eps_b = _pick(use, "us-gaap:EarningsPerShareBasic",                     "EPS_Basic",      unit="USD/shares")
    eps_d = _pick(use, "us-gaap:EarningsPerShareDiluted",                   "EPS_Diluted",    unit="USD/shares")
    wab   = _pick(use, "us-gaap:WeightedAverageNumberOfSharesOutstandingBasic",   "WASO_Basic",   unit="shares")
    wad   = _pick(use, "us-gaap:WeightedAverageNumberOfDilutedSharesOutstanding", "WASO_Diluted", unit="shares")
    divps = _pick(use, "us-gaap:CommonStockDividendsPerShareDeclared",      "DivPS",          unit="USD/shares")

    assets   = _pick(use, "us-gaap:Assets",                                 "Assets",         unit="USD")
    assets_c = _pick(use, "us-gaap:AssetsCurrent",                          "AssetsCurrent",  unit="USD")
    cash     = _pick(use, "us-gaap:CashAndCashEquivalentsAtCarryingValue",  "CashAndCashEq",  unit="USD")
    ar       = _pick(use, "us-gaap:AccountsReceivableNetCurrent",           "AR_Net",         unit="USD")
    inv      = _pick(use, "us-gaap:InventoryNet",                           "Inventory",      unit="USD")
    ppe      = _pick(use, "us-gaap:PropertyPlantAndEquipmentNet",           "PPE_Net",        unit="USD")
    goodwill = _pick(use, "us-gaap:Goodwill",                               "Goodwill",       unit="USD")
    intang   = _pick(use, "us-gaap:IntangibleAssetsNetExcludingGoodwill",   "Intangibles",    unit="USD")
    liab     = _pick(use, "us-gaap:Liabilities",                            "Liabilities",    unit="USD")
    liab_c   = _pick(use, "us-gaap:LiabilitiesCurrent",                     "LiabilitiesCurrent", unit="USD")
    ltd_nc   = _pick(use, "us-gaap:LongTermDebtNoncurrent",                 "LongTermDebt_NC",unit="USD")
    ltd_c    = _pick(use, "us-gaap:LongTermDebtCurrent",                    "LongTermDebt_Current", unit="USD")
    stb      = _pick(use, "us-gaap:ShortTermBorrowings",                    "ShortTermBorrowings", unit="USD")
    equity   = _pick(use, "us-gaap:StockholdersEquity",                     "Equity",         unit="USD")

    cfo   = _pick(use, "us-gaap:NetCashProvidedByUsedInOperatingActivities","CFO",            unit="USD")
    cfi   = _pick(use, "us-gaap:NetCashProvidedByUsedInInvestingActivities","CFI",            unit="USD")
    cff   = _pick(use, "us-gaap:NetCashProvidedByUsedInFinancingActivities","CFF",            unit="USD")
    capex = _pick(use, "us-gaap:PaymentsToAcquirePropertyPlantAndEquipment","CapEx",          unit="USD")
    buybk = _pick(use, "us-gaap:PaymentsForRepurchaseOfCommonStock",        "Buybacks",       unit="USD")
    dvd   = _pick(use, "us-gaap:PaymentsOfDividendsCommonStock",            "DividendsPaid",  unit="USD")

    keep = pd.concat([
        rev, cogs, gp, rnd, sga, opinc, d_and_a, int_exp, taxexp, pretx, ni,
        eps_b, eps_d, wab, wad, divps,
        assets, assets_c, cash, ar, inv, ppe, goodwill, intang, liab, liab_c, ltd_nc, ltd_c, stb, equity,
        cfo, cfi, cff, capex, buybk, dvd
    ], ignore_index=True)

    if keep.empty:
        return keep, keep

    # ── 파생지표 계산용 피벗 (동일 index로 숫자 결합)
    base = (keep.pivot_table(index=["end","form","fy","fp"], columns="metric", values="val", aggfunc="first")
                 .reset_index())
    # 비율/파생치: NaN-safe 계산
    def _ratio(num, den):
        return np.where((den==0) | pd.isna(den), np.nan, num/den)

    # GrossMargin / OpMargin / NetMargin
    if "GrossProfit" in base and "Revenue" in base:
        base["GrossMargin"] = _ratio(base["GrossProfit"], base["Revenue"])
    if "OperatingIncome" in base and "Revenue" in base:
        base["OpMargin"] = _ratio(base["OperatingIncome"], base["Revenue"])
    if "NetIncome" in base and "Revenue" in base:
        base["NetMargin"] = _ratio(base["NetIncome"], base["Revenue"])

    # Free Cash Flow = CFO - CapEx (CapEx는 보통 음수 값; 부호 처리 원하는 대로 조정 가능)
    if "CFO" in base and "CapEx" in base:
        base["FCF"] = base["CFO"] - base["CapEx"]

    # NetDebt = (LongTermDebt + ShortTermBorrowings + LongTermDebt_Current) - Cash
    for col in ["LongTermDebt_NC","ShortTermBorrowings","LongTermDebt_Current","CashAndCashEq"]:
        if col not in base.columns:
            base[col] = np.nan
    base["NetDebt"] = (base["LongTermDebt_NC"].fillna(0) 
                       + base["ShortTermBorrowings"].fillna(0)
                       + base["LongTermDebt_Current"].fillna(0)
                       - base["CashAndCashEq"].fillna(0))

    # 정렬
    keep["end"] = pd.to_datetime(keep["end"])
    keep = keep.sort_values(["end","metric"])
    base = base.sort_values("end")

    return keep, base

def fetch_metrics_for_tickers(tickers: list[str],
                              start_fy: int = 2021,
                              unit_whitelist: list[str] | None = None
                             ) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    여러 티커에 대해 companyfacts를 가져와 요약(long)과 피벗(wide)을 생성.
    반환: (all_keep_long, all_pivot_wide)
    - unit_whitelist로 단위 필터링 가능(예: ["USD", "USD/shares"])
    - 이 함수는 외부에 정의된:
        - make_session(), get_cik_from_ticker(), get_company_facts(), extract_facts_df()
      를 사용한다고 가정.
    """
    sess = make_session()
    all_keep, all_pivot = [], []
    for t in tickers:
        try:
            cik10 = get_cik_from_ticker(sess, t)
            facts = get_company_facts(sess, cik10)
            df    = extract_facts_df(facts, start_fy=start_fy)  # ← 앞에서 만든 함수 재사용
            if df.empty:
                continue
            keep, pivot = build_metrics_pivot(df, unit_whitelist=unit_whitelist)
            if not keep.empty:
                keep = keep.assign(ticker=t.upper(), cik=df["cik"].iloc[0], entityName=df["entityName"].iloc[0])
                all_keep.append(keep)
            if not pivot.empty:
                pivot = pivot.assign(ticker=t.upper())
                all_pivot.append(pivot)
        except Exception as e:
            print(f"[{t}] FAIL:", e)
        time.sleep(0.3)

    keep_all  = pd.concat(all_keep, ignore_index=True)  if all_keep  else pd.DataFrame()
    pivot_all = pd.concat(all_pivot, ignore_index=True) if all_pivot else pd.DataFrame()
    # 보기 좋게 정렬
    if not keep_all.empty:
        keep_all = keep_all.sort_values(["ticker","end","metric"])
    if not pivot_all.empty:
        pivot_all = pivot_all.sort_values(["ticker","end"])
    return keep_all, pivot_all

In [None]:
# 단일 회사 df → 피벗
_, pivot_aapl = build_metrics_pivot(df, unit_whitelist=["USD","USD/shares"])
pivot_aapl.head()

# 여러 티커 한 번에
keep_all, pivot_all = fetch_metrics_for_tickers(["AAPL","MSFT","NVDA"],
                                                start_fy=2021,
                                                unit_whitelist=["USD","USD/shares"])
# pivot_all.head(20)

metric,end,form,fy,fp,AR_Net,Assets,AssetsCurrent,Buybacks,CFF,CFI,...,GrossMargin,OpMargin,NetMargin,FCF,ShortTermBorrowings,NetDebt,ticker,DividendsPaid,Goodwill,Intangibles
0,2018-09-29,10-K,2021,FY,,,,,,,...,,,,,,0.0,AAPL,,,
1,2019-09-28,10-K,2021,FY,,,,66897000000.0,-90976000000.0,45896000000.0,...,0.378178,0.24572,0.212381,58896000000.0,,0.0,AAPL,,,
2,2019-09-28,10-K,2022,FY,,,,,,,...,,,,,,0.0,AAPL,,,
3,2019-09-28,10-Q,2021,Q1,,,,,,,...,,,,,,0.0,AAPL,,,
4,2019-09-28,10-Q,2021,Q2,,,,,,,...,,,,,,0.0,AAPL,,,
5,2019-09-28,10-Q,2021,Q3,,,,,,,...,,,,,,0.0,AAPL,,,
6,2019-12-28,10-Q,2021,Q1,,,,20706000000.0,-25407000000.0,-13668000000.0,...,0.383548,0.278472,0.242172,28409000000.0,,0.0,AAPL,,,
7,2019-12-28,10-Q,2021,Q2,,,,,,,...,,,,,,0.0,AAPL,,,
8,2020-03-28,10-Q,2021,Q2,,,,39280000000.0,-46347000000.0,-4655000000.0,...,0.383576,0.255921,0.223037,39867000000.0,,0.0,AAPL,,,
9,2020-03-28,10-Q,2021,Q3,,,,,,,...,,,,,,0.0,AAPL,,,


In [9]:
import pandas as pd
import numpy as np

def dedupe_companyfacts(df: pd.DataFrame) -> pd.DataFrame:
    """
    companyfacts 추출 df를 개념/단위/기간(end) 기준으로 한 건만 남기기.
    우선순위: form(10-K > 10-Q) → filed(최신)
    df에는 extract_facts_df에서 넣은 열: concept, unit, end, form, accn, filed(추가 필요) 등이 있다고 가정.
    """
    if df.empty:
        return df

    # form 우선순위 점수
    form_rank = {"10-K": 0, "10-Q": 1}
    df = df.copy()
    df["form_rank"] = df["form"].map(form_rank).fillna(9)
    # filed가 없다면 accn으로 대체 정렬(문자열이지만 최신 accn이 보통 사전순으로도 뒤쪽)
    if "filed" in df.columns:
        df["filed"] = pd.to_datetime(df["filed"], errors="coerce")
    else:
        df["filed"] = pd.NaT

    # 정렬: 10-K 우선, filed 최신이 뒤로 오게
    df = df.sort_values(["concept","unit","end","form_rank","filed"], ascending=[True, True, True, True, True])
    # 같은 (concept, unit, end) 그룹에서 마지막(=우선순위 높고 가장 최근)을 취함
    keep = df.groupby(["concept","unit","end"], as_index=False).tail(1).reset_index(drop=True)
    return keep