In [None]:
import time
from typing import Dict, List
import numpy as np
import pandas as pd
import requests
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
import os


# ---------- CONFIG ----------
# Headers sent with every SEC request (contact details in the User-Agent).
UA = {"User-Agent": "Giulia Petrilli giuliapetrilli2000@gmail.com"}
# Root of the SEC data API; endpoint paths are appended to it.
BASE = "https://data.sec.gov/api"

# --- TAG GROUPS ---
# Each list is tried left to right by the fetch helper; the first tag that
# returns data wins. Original entries keep their position, so any metric that
# was already resolving is unaffected. Standard us-gaap element names are
# appended as fallbacks where the original names do not appear to exist in
# the taxonomy.
# NOTE(review): confirm the appended names against the us-gaap taxonomy
# release that matches the filings you pull.

# Income statement
REVENUE_TAGS = [
    "RevenueFromContractWithCustomerExcludingAssessedTax",
    "Revenues",
    "SalesRevenueNet"
]
GROSS_PROFIT_TAGS = ["GrossProfit"]
COST_TAGS = ["CostOfRevenue", "CostOfGoodsAndServicesSold"]
NET_INCOME_TAGS = ["NetIncomeLoss"]

OPERATING_EXP_TAGS = ["OperatingExpenses", "SellingGeneralAndAdministrativeExpenses"]
R_AND_D_TAGS = ["ResearchAndDevelopmentExpense"]
DEPR_AMORT_TAGS = [
    "DepreciationAndAmortization",
    "AmortizationExpense",
    "DepreciationDepletionAndAmortization",  # fallback
]
INTEREST_EXP_TAGS = ["InterestExpense"]
INTEREST_INC_TAGS = [
    "InterestIncome",
    "InvestmentIncomeInterest",  # fallback
]
INCOMEBEFORETAX_TAGS = [
    "IncomeBeforeTax",
    # fallbacks: pre-tax income from continuing operations
    "IncomeLossFromContinuingOperationsBeforeIncomeTaxesExtraordinaryItemsNoncontrollingInterest",
    "IncomeLossFromContinuingOperationsBeforeIncomeTaxesMinorityInterestAndIncomeLossFromEquityMethodInvestments",
]
TAX_TAGS = ["IncomeTaxExpenseBenefit", "ProvisionForIncomeTaxes"]

# Per-share data
EPS_BASIC_TAGS = ["EarningsPerShareBasic"]
EPS_DILUTED_TAGS = ["EarningsPerShareDiluted"]
SHARES_BASIC_TAGS = ["WeightedAverageNumberOfSharesOutstandingBasic"]
SHARES_DILUTED_TAGS = ["WeightedAverageNumberOfDilutedSharesOutstanding"]

# Balance sheet
ASSETS_TAGS = ["Assets"]
ASSETS_CURR_TAGS = ["AssetsCurrent"]
LIAB_TAGS = ["Liabilities"]
LIAB_CURR_TAGS = ["LiabilitiesCurrent"]
EQUITY_TAGS = ["StockholdersEquity", "Equity"]
CASH_TAGS = ["CashAndCashEquivalentsAtCarryingValue"]
RECEIVABLES_TAGS = ["AccountsReceivableNetCurrent"]
INVENTORY_TAGS = [
    "InventoriesNet",
    "InventoryNet",  # fallback
]
DEBT_TAGS = ["LongTermDebt"]

# Cash flow statement
CAPEX_TAGS = [
    "CapitalExpenditures",
    "PaymentsToAcquirePropertyPlantAndEquipment",  # fallback
]
OPER_CASHFLOW_TAGS = [
    "NetCashProvidedByOperatingActivities",
    "NetCashProvidedByUsedInOperatingActivities",  # fallback
]
INV_CASHFLOW_TAGS = [
    "NetCashUsedForInvestingActivities",
    "NetCashProvidedByUsedInInvestingActivities",  # fallback
]
FIN_CASHFLOW_TAGS = [
    "NetCashProvidedByFinancingActivities",
    "NetCashProvidedByUsedInFinancingActivities",  # fallback
]

# NOTE(review): the fetch helper in this file queries the "us-gaap" taxonomy
# only; employee counts may be reported under a different taxonomy — verify.
EMPLOYEE_TAGS = ["NumberOfEmployees", "WeightedAverageNumberOfEmployees"]

# ---------- UTIL / FETCH ----------
def load_ticker_map() -> Dict[str, str]:
    """Download the SEC ticker listing and index it by ticker symbol.

    Returns:
        Mapping of upper-cased ticker -> CIK rendered as a 10-digit,
        zero-padded string.

    Raises:
        requests.HTTPError: if the SEC endpoint answers with an error status.
    """
    response = requests.get(
        "https://www.sec.gov/files/company_tickers.json",
        headers=UA,
        timeout=30,
    )
    response.raise_for_status()

    ticker_to_cik: Dict[str, str] = {}
    for entry in response.json().values():
        # Later entries overwrite earlier ones that share a ticker.
        ticker_to_cik[entry["ticker"].upper()] = f'{int(entry["cik_str"]):010d}'
    return ticker_to_cik

def _get_json(url: str):
    """GET `url` with the shared headers and decode the body as JSON.

    Returns:
        The decoded JSON payload, or None when the server replies 404.

    Raises:
        requests.HTTPError: for any other error status.
    """
    response = requests.get(url, headers=UA, timeout=30)
    if response.status_code != 404:
        response.raise_for_status()
        return response.json()
    # 404 is reported to the caller as None rather than as an exception.
    return None

def company_concept(cik10: str, taxonomy: str, tag: str):
    """Fetch the company-concept JSON for one (CIK, taxonomy, tag) triple.

    Args:
        cik10: CIK string placed directly after the "CIK" prefix in the URL.
        taxonomy: Taxonomy path segment (e.g. "us-gaap").
        tag: Concept name path segment.

    Returns:
        Whatever `_get_json` returns for the built URL (None on 404).
    """
    return _get_json(f"{BASE}/xbrl/companyconcept/CIK{cik10}/{taxonomy}/{tag}.json")

def concept_to_df(j: dict, prefer_units=("USD", "USD$", "USD (in millions)")) -> pd.DataFrame:
    """Flatten one company-concept payload into a tidy frame.

    Args:
        j: Decoded payload with a "units" mapping of unit name -> list of
            fact records, or None.
        prefer_units: Unit names tried in order; when none is present the
            first unit key in the payload is used instead.

    Returns:
        DataFrame with columns fy (nullable Int64), fp, end (datetime) and
        val, sorted by end then fy with missing values last. Empty, with the
        same four columns, when there is no payload or no records.
    """
    wanted = ["fy", "fp", "end", "val"]
    if j is None:
        return pd.DataFrame(columns=wanted)

    units = j.get("units", {})
    # First preferred unit that exists; otherwise fall back to the first key.
    chosen = next((name for name in prefer_units if name in units), None)
    if chosen is None and units:
        chosen = next(iter(units))

    records = units.get(chosen, [])
    if not records:
        return pd.DataFrame(columns=wanted)

    frame = pd.DataFrame(records)
    # Guarantee the four output columns even if the records lack some keys.
    for col in wanted:
        if col not in frame.columns:
            frame[col] = pd.NA

    frame["fy"] = pd.to_numeric(frame["fy"], errors="coerce").astype("Int64")
    frame["end"] = pd.to_datetime(frame["end"], errors="coerce")

    ordered = frame.sort_values(["end", "fy"], na_position="last")
    return ordered.reset_index(drop=True)[wanted]

def quarterly_series(df: pd.DataFrame) -> pd.DataFrame:
    """Reduce a fact frame to one value per (fiscal year, quarter).

    Only rows whose `fp` starts with "Q" (case-insensitive) are kept; rows
    with a missing `fp` are ignored instead of breaking the boolean mask.
    Within each (fy, quarter) group the last non-null `val` and the last
    non-null `end`, in ascending `end` order, are reported. The sort is
    stable, so rows that share an `end` keep their input order and the pick
    is deterministic.

    NOTE(review): rows for the same (fy, quarter) and the same `end` are not
    distinguished further (e.g. by period length) — confirm that the last row
    is the figure you want for duration-type metrics.

    Args:
        df: Frame with at least the columns fy, fp, end and val.

    Returns:
        DataFrame with columns fy, fp ("Q<n>"), end, value, sorted by fy and
        fp. Empty, with the same columns, when no quarterly rows exist.
    """
    out_cols = ["fy", "fp", "end", "value"]

    # na=False: a missing fp must count as "not a quarter"; otherwise the mask
    # holds NaN and boolean indexing raises ValueError.
    is_quarter = df["fp"].str.upper().str.startswith("Q", na=False)
    q = df[is_quarter].copy()
    if q.empty:
        return pd.DataFrame(columns=out_cols)

    # Labels without a digit after "Q" become <NA> and are dropped by the
    # groupby below (dropna=True).
    digits = q["fp"].str.upper().str.extract(r"Q(\d)", expand=False)
    q["qnum"] = pd.to_numeric(digits, errors="coerce").astype("Int64")

    q = (q.sort_values("end", kind="mergesort")  # stable: ties keep input order
           .groupby(["fy", "qnum"], dropna=True)
           .agg(value=("val", "last"), end=("end", "last"))
           .reset_index())
    q["fp"] = "Q" + q["qnum"].astype("Int64").astype(str)
    return q[out_cols].sort_values(["fy", "fp"])

def _fetch_first_available(cik10: str, tags: List[str], label: str = "") -> pd.DataFrame:
    """Return the fact frame for the first tag in `tags` that has data.

    Tags are requested one by one from the "us-gaap" taxonomy. A tag is
    skipped when the API has no payload for it (None) or when the payload
    converts to an empty frame. The chosen tag, or the failure to find any,
    is printed with `label` as prefix.

    Args:
        cik10: CIK string passed through to `company_concept`.
        tags: Candidate concept names, in priority order.
        label: Prefix used in the progress messages.

    Returns:
        The non-empty frame from `concept_to_df` for the first tag that has
        data, or an empty frame with columns fy, fp, end, val.
    """
    for tag in tags:
        j = company_concept(cik10, "us-gaap", tag)
        # Pause after every request. Previously the pause was only reached
        # when a tag returned a payload with no rows, so runs of missing
        # tags and successful fetches went out back to back.
        time.sleep(0.2)
        if j is None:
            continue
        df = concept_to_df(j)
        if not df.empty:
            print(f"[{label}] Using tag: {tag}")
            return df
    print(f"[{label}] No data for tags: {tags}")
    return pd.DataFrame(columns=["fy","fp","end","val"])

# ---------- MAIN FUNCTION ----------
def get_quarterly_financials(ticker: str, last_n_quarters: int = 16) -> pd.DataFrame:
    """Assemble a wide quarterly table of financial metrics for one ticker.

    Each metric is fetched via `_fetch_first_available`, reduced with
    `quarterly_series`, and outer-merged on (fy, fp, end). Gross profit is
    derived as revenue minus cost when no gross-profit tag has data. Two
    ratio columns are added: gross_margin_pct and net_margin_pct (plain
    quotients, not multiplied by 100).

    Args:
        ticker: Ticker symbol; matched case-insensitively.
        last_n_quarters: Keep only this many rows with the latest `end`
            dates. A falsy value keeps every row.

    Returns:
        One row per (fy, fp, end) with one column per metric.

    Raises:
        ValueError: if the ticker is absent from the SEC ticker mapping.
    """
    tmap = load_ticker_map()
    symbol = ticker.upper()
    if symbol not in tmap:
        raise ValueError(f"Ticker {ticker} not in SEC mapping.")
    cik10 = tmap[symbol]

    join_keys = ["fy", "fp", "end"]

    def _metric(tags, colname, label):
        # Fetch -> quarterly reduction -> name the value column after the metric.
        facts = _fetch_first_available(cik10, tags, label=label)
        return quarterly_series(facts).rename(columns={"value": colname})

    # Revenue and gross profit come first because of the derivation fallback.
    revenue = _metric(REVENUE_TAGS, "revenue", "Revenue")
    gross = _metric(GROSS_PROFIT_TAGS, "gross_profit", "Gross Profit")
    if gross.empty:
        cost = _metric(COST_TAGS, "cost", "Cost (for GP derivation)")
        gross = revenue.merge(cost, on=join_keys, how="left")
        gross["gross_profit"] = gross["revenue"] - gross["cost"]
        gross = gross[join_keys + ["gross_profit"]]
        print("[Gross Profit] Derived from Revenue – Cost")

    # (tags, output column, log label) in fetch order, which is also the
    # column order of the merged result.
    remaining_specs = [
        (NET_INCOME_TAGS, "net_income", "Net Income"),
        # Expanded income-statement metrics
        (OPERATING_EXP_TAGS, "operating_expenses", "Operating Exp"),
        (R_AND_D_TAGS, "r_and_d", "R&D"),
        (DEPR_AMORT_TAGS, "depr_amort", "D&A"),
        (INTEREST_EXP_TAGS, "interest_expense", "Interest Exp"),
        (INTEREST_INC_TAGS, "interest_income", "Interest Inc"),
        (INCOMEBEFORETAX_TAGS, "income_before_tax", "Income Before Tax"),
        (TAX_TAGS, "tax_expense", "Tax Expense"),
        # Per-share data
        (EPS_BASIC_TAGS, "eps_basic", "EPS Basic"),
        (EPS_DILUTED_TAGS, "eps_diluted", "EPS Diluted"),
        (SHARES_BASIC_TAGS, "shares_basic", "Shares Basic"),
        (SHARES_DILUTED_TAGS, "shares_diluted", "Shares Diluted"),
        # Balance sheet
        (ASSETS_TAGS, "assets", "Assets"),
        (ASSETS_CURR_TAGS, "assets_current", "Assets Current"),
        (LIAB_TAGS, "liabilities", "Liabilities"),
        (LIAB_CURR_TAGS, "liabilities_current", "Liabilities Current"),
        (EQUITY_TAGS, "equity", "Equity"),
        (CASH_TAGS, "cash", "Cash"),
        (RECEIVABLES_TAGS, "accounts_receivable", "AR"),
        (INVENTORY_TAGS, "inventory", "Inventory"),
        (DEBT_TAGS, "long_term_debt", "Debt"),
        # Cash flow
        (CAPEX_TAGS, "capex", "CapEx"),
        (OPER_CASHFLOW_TAGS, "operating_cf", "Op Cash Flow"),
        (INV_CASHFLOW_TAGS, "investing_cf", "Inv Cash Flow"),
        (FIN_CASHFLOW_TAGS, "financing_cf", "Fin Cash Flow"),
        # Headcount
        (EMPLOYEE_TAGS, "employees", "Employees"),
    ]

    frames = [revenue, gross]
    for tags, colname, label in remaining_specs:
        frames.append(_metric(tags, colname, label))

    # Outer merge keeps every period reported by any metric.
    merged = frames[0]
    for frame in frames[1:]:
        merged = merged.merge(frame, on=join_keys, how="outer")

    # Margins
    merged["gross_margin_pct"] = merged["gross_profit"] / merged["revenue"]
    merged["net_margin_pct"] = merged["net_income"] / merged["revenue"]

    if last_n_quarters:
        merged = merged.sort_values("end").tail(last_n_quarters).reset_index(drop=True)
    return merged


In [None]:

if __name__ == "__main__":
    # Batch export: fetch each ticker's quarterly table and write it to a
    # per-ticker CSV under the boycott-target or control-group folder.

    # --- USER INPUTS ---
    boycotted = False  # True -> boycott targets, False -> control group
    tickers = ["SN", "WEN", "GIS"]  # tickers to process
    last_n_quarters = 200

    base_path = "/Users/giuliamariapetrilli/Documents/GitHub/masters_thesis/data"

    # --- LOOP THROUGH TICKERS ---
    for TICKER in tickers:
        print(f"\n===== Processing {TICKER.upper()} =====")

        try:
            df = get_quarterly_financials(TICKER, last_n_quarters=last_n_quarters)
            if df.empty:
                print(f"⚠️ No data returned for {TICKER}. Skipping.")
                continue

            # Metadata columns end up as: boycotted, ticker, <metrics...>
            df.insert(0, 'ticker', TICKER.upper())
            df.insert(0, 'boycotted', int(boycotted))

            # Destination depends on the group flag
            folder = (
                f"{base_path}/boycott_target/{TICKER.lower()}"
                if boycotted
                else f"{base_path}/control_group/{TICKER.lower()}"
            )
            os.makedirs(folder, exist_ok=True)

            output_path = f"{folder}/{TICKER.lower()}_quarterly.csv"
            df.to_csv(output_path, index=False)

            # Confirmation summary
            print(f"✅ Saved {TICKER} data to: {output_path}")
            print(f"   → Shape: {df.shape}")
            print(f"   → Columns: {list(df.columns)}")

        except Exception as e:
            # Best effort: report the failure and move on to the next ticker.
            print(f"❌ Error processing {TICKER}: {e}")


In [None]:
# check if the df gets generated
if __name__ == "__main__":
    # Smoke check: build the table for a single ticker and inspect it.
    TICKER = "NKE"  # replace with any ticker
    df = get_quarterly_financials(TICKER, last_n_quarters=200)

    # Full frame, then its column index, then (rows, columns) — one per line.
    print(df, df.columns, df.shape, sep="\n")

In [None]:
# single company

if __name__ == "__main__":
    # Single-company export: fetch one ticker, tag it with its group flag and
    # write it to the matching folder as CSV.
    boycotted = False  # False -> control group, True -> boycott target
    TICKER = "SN"  # replace with any ticker
    df = get_quarterly_financials(TICKER, last_n_quarters=200)

    print(df)

    # Metadata columns end up as: boycotted, ticker, <metrics...>
    df.insert(0, 'ticker', TICKER.upper())
    df.insert(0, 'boycotted', 1 if boycotted else 0)

    # Destination depends on the group flag
    base_path = "/Users/giuliamariapetrilli/Documents/GitHub/masters_thesis/data"
    folder = (
        f"{base_path}/boycott_target/{TICKER.lower()}"
        if boycotted
        else f"{base_path}/control_group/{TICKER.lower()}"
    )

    # Make sure the folder exists, then write the file
    os.makedirs(folder, exist_ok=True)
    output_path = f"{folder}/{TICKER.lower()}_quarterly.csv"
    df.to_csv(output_path, index=False)

    # Column index and (rows, columns), one per line
    print(df.columns, df.shape, sep="\n")

    print(f"✅ File saved to: {output_path}")
