In [1]:
import io
import json
import os

import pandas as pd
import requests
import yfinance as yf

from edgar_functions import *

In [2]:
# Notebook configuration: FMP credentials and the company being inspected.
# The API key is read from the environment so the secret never lives in the
# notebook; export FMP_API_KEY before starting the kernel.
# NOTE(review): the key that used to be hard-coded here should be treated as
# compromised and rotated.
api_key = os.environ.get("FMP_API_KEY")
if not api_key:
    raise RuntimeError(
        "Set the FMP_API_KEY environment variable to your "
        "Financial Modeling Prep API key before running this notebook."
    )
statement = "income-statement"
period = "annual"
ticker = "HD"
yf_ticker = yf.Ticker(ticker)
# CIK for the ticker; used below to locate the downloaded company-facts file.
cik = get_cik_matching_ticker(ticker)
# FMP statements are downloaded through get_fmp_statement (defined further
# down). Other endpoints explored earlier with the same URL shape:
#   {statement}-as-reported and financial-statement-full-as-reported.

In [3]:
# Read the locally downloaded company-facts file for the selected CIK and
# parse it into a plain Python dict.
json_file = f"companyfacts/CIK{cik}.json"
with open(json_file) as facts_file:
    json_data = json.loads(facts_file.read())

In [12]:
# Split every us-gaap fact that carries a "frame" label into three tables
# according to that label:
#   instantaneous -> label contains "I"
#   quarterly     -> label contains "Q" but not "I"
#   annual        -> label contains "CY" but not "Q"
us_gaap_facts = json_data.get("facts", {}).get("us-gaap", {})

# Collect the per-fact slices and concatenate once at the end; calling
# pd.concat inside the loop re-copies every previously gathered row.
instantaneous_parts = []
quarterly_parts = []
annual_parts = []

for fact, details in us_gaap_facts.items():
    units = details.get("units", {})
    # Only the first unit of measure listed for a fact is used.
    unit_key = next(iter(units), None)
    if not unit_key:
        continue

    temp_df = pd.DataFrame(units[unit_key])
    if "frame" not in temp_df.columns:
        continue

    temp_df = temp_df[temp_df["frame"].notnull()]
    has_i = temp_df["frame"].str.contains("I")
    has_q = temp_df["frame"].str.contains("Q")
    has_cy = temp_df["frame"].str.contains("CY")

    for parts, mask in (
        (instantaneous_parts, has_i),
        (quarterly_parts, has_q & ~has_i),
        (annual_parts, has_cy & ~has_q),
    ):
        subset = temp_df[mask]
        if not subset.empty:
            # assign() returns a new frame, so no write happens on a slice.
            parts.append(subset.assign(fact=fact))

facts_df_instantaneous_fixed = (
    pd.concat(instantaneous_parts, ignore_index=True)
    if instantaneous_parts
    else pd.DataFrame()
)
facts_df_quarterly_fixed = (
    pd.concat(quarterly_parts, ignore_index=True)
    if quarterly_parts
    else pd.DataFrame()
)
facts_df_annual_fixed = (
    pd.concat(annual_parts, ignore_index=True) if annual_parts else pd.DataFrame()
)

In [None]:
def get_fmp_statement(cik, statement, period, api_key):
    """Download one Financial Modeling Prep statement as a DataFrame.

    Args:
        cik: Company identifier placed in the URL path (callers in this
            notebook pass the value returned by ``get_cik_matching_ticker``).
        statement: FMP endpoint name, e.g. ``"income-statement"``.
        period: Value for the ``period`` query parameter, e.g. ``"annual"``.
        api_key: FMP API key sent as the ``apikey`` query parameter.

    Returns:
        pandas.DataFrame built from the decoded JSON response body.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If no response arrives within 30 seconds.
    """
    url = f"https://financialmodelingprep.com/api/v3/{statement}/{cik}?period={period}&apikey={api_key}"
    # Without a timeout a stalled connection would hang the notebook forever.
    response = requests.get(url, timeout=30)
    # Fail loudly on auth/rate-limit errors instead of feeding an error
    # payload into the DataFrame constructor.
    response.raise_for_status()
    return pd.DataFrame(response.json())


def get_edgar_facts_from_downloaded_statement(cik):
    """Load framed 10-K us-gaap facts from a downloaded company-facts file.

    Reads ``companyfacts/CIK{cik}.json`` (relative to the working directory),
    and for every us-gaap fact takes the records of its first unit of measure,
    keeping only rows that have a non-null ``frame`` and ``form == "10-K"``.
    Kept rows get a ``fact`` column holding the fact name.

    Args:
        cik: Identifier interpolated into the file name.

    Returns:
        pandas.DataFrame with the kept rows of all facts stacked together
        (original per-fact row indices are preserved). Empty DataFrame when
        no fact has a ``frame`` column.

    Raises:
        FileNotFoundError: If the company-facts file does not exist.
        KeyError: If a fact has ``frame`` data but no ``form`` column.
    """
    json_file = f"companyfacts/CIK{cik}.json"
    with open(json_file, "r", encoding="utf-8") as f:
        json_data = json.load(f)
    us_gaap_facts = json_data.get("facts", {}).get("us-gaap", {})

    # Gather per-fact frames and concatenate once; concatenating inside the
    # loop copies all previously accumulated rows on every iteration.
    fact_frames = []
    for fact, details in us_gaap_facts.items():
        units = details.get("units", {})
        # Only the first unit of measure listed for a fact is used.
        unit_key = next(iter(units), None)
        if not unit_key:
            continue

        temp_df = pd.DataFrame(units[unit_key])
        if "frame" not in temp_df.columns:
            continue

        temp_df = temp_df[temp_df["frame"].notnull()]
        temp_df = temp_df[temp_df["form"] == "10-K"]
        if not temp_df.empty:
            # assign() builds a new frame instead of writing into a slice.
            temp_df = temp_df.assign(fact=fact)
        # Empty frames are kept too so the result's column set matches what
        # incremental concatenation produced.
        fact_frames.append(temp_df)

    if not fact_frames:
        return pd.DataFrame()
    return pd.concat(fact_frames)


def create_statement_mapping(edgar_df, statement):
    """Map statement columns to EDGAR fact names by matching reported values.

    For every ``float64``/``int64`` column of ``statement``, the non-null,
    non-zero values are looked up in ``edgar_df["val"]``; the names in
    ``edgar_df["fact"]`` of all matching rows become that column's mapping.

    Args:
        edgar_df: DataFrame with ``val`` and ``fact`` columns.
        statement: DataFrame whose numeric columns are to be mapped.

    Returns:
        dict mapping column name -> sorted list of unique matching fact
        names. Columns without any match are omitted. An ``edgar_df`` that
        lacks ``val`` or ``fact`` yields an empty dict.
    """
    # Nothing can match without both columns (e.g. an empty facts frame).
    if "val" not in edgar_df.columns or "fact" not in edgar_df.columns:
        return {}

    # Loop-invariant: filter the EDGAR facts once, not once per column.
    usable_facts = edgar_df.dropna(subset=["val"])
    usable_facts = usable_facts[usable_facts["val"] != 0]

    filtered_mapping_dict = {}
    for col in statement.columns:
        if statement[col].dtype not in ["float64", "int64"]:
            continue
        reported_values = statement[col].dropna()
        reported_values = reported_values[reported_values != 0]
        # Vectorised membership test instead of one scan per reported value.
        matching_facts = usable_facts.loc[
            usable_facts["val"].isin(reported_values), "fact"
        ].dropna()
        if not matching_facts.empty:
            # Sorted so repeated runs yield identical (diff-friendly) output.
            filtered_mapping_dict[col] = sorted(set(matching_facts))
    return filtered_mapping_dict


def create_three_statement_naming_mappings(cik, period, api_key):
    """Build FMP-column -> EDGAR-fact mappings for the three statements.

    Downloads the balance sheet, income statement and cash-flow statement
    through ``get_fmp_statement``, loads the local EDGAR facts for ``cik``,
    and runs ``create_statement_mapping`` on each statement.

    Returns:
        Tuple ``(balance_sheet_mapping, income_statement_mapping,
        cash_flow_statement_mapping)``.
    """
    endpoints = (
        "balance-sheet-statement",
        "income-statement",
        "cash-flow-statement",
    )
    # All three downloads happen before the local facts file is read.
    fmp_statements = [
        get_fmp_statement(cik, endpoint, period, api_key) for endpoint in endpoints
    ]
    edgar_facts = get_edgar_facts_from_downloaded_statement(cik)
    balance_map, income_map, cash_flow_map = (
        create_statement_mapping(edgar_facts, fmp_statement)
        for fmp_statement in fmp_statements
    )
    return balance_map, income_map, cash_flow_map


def update_three_statement_mappings(
    existing_balance_mapping,
    existing_income_mapping,
    existing_cash_flow_mapping,
    tickers,
    period,
    api_key,
):
    """Extend existing statement mappings with matches found for more tickers.

    For every ticker the CIK is resolved with ``get_cik_matching_ticker`` and
    fresh mappings are produced by ``create_three_statement_naming_mappings``;
    the new fact names are unioned into the existing ones per column.

    Args:
        existing_balance_mapping: dict of column -> list of fact names.
        existing_income_mapping: dict of column -> list of fact names.
        existing_cash_flow_mapping: dict of column -> list of fact names.
        tickers: Iterable of ticker symbols to process.
        period: FMP period, e.g. ``"annual"``.
        api_key: FMP API key.

    Returns:
        Tuple ``(balance, income, cash_flow)`` of updated mappings. The input
        dicts and their lists are left unmodified; lists of updated columns
        are de-duplicated and sorted.
    """
    updated_mappings = [
        existing_balance_mapping.copy(),
        existing_income_mapping.copy(),
        existing_cash_flow_mapping.copy(),
    ]
    for ticker in tickers:
        cik = get_cik_matching_ticker(ticker)
        new_mappings = create_three_statement_naming_mappings(cik, period, api_key)
        for merged, additions in zip(updated_mappings, new_mappings):
            for key, facts in additions.items():
                # Build a fresh list: the shallow copies above still share
                # list objects with the caller's dicts, so extending in place
                # would silently modify the inputs.
                merged[key] = sorted(set(merged.get(key, ())) | set(facts))

    updated_balance_mapping, updated_income_mapping, updated_cash_flow_mapping = (
        updated_mappings
    )
    return updated_balance_mapping, updated_income_mapping, updated_cash_flow_mapping

In [None]:
def update_three_statement_mappings_from_json(
    existing_balance_mapping_json,
    existing_income_mapping_json,
    existing_cash_flow_mapping_json,
    tickers,
    period,
    api_key,
    output_folder="updated_mapping",
):
    """Load mappings from JSON, extend them for ``tickers`` and save them.

    The three input files are parsed, passed to
    ``update_three_statement_mappings`` and the results are written to
    ``updated_balance_mapping.json``, ``updated_income_mapping.json`` and
    ``updated_cash_flow_mapping.json`` inside ``output_folder``.

    Args:
        existing_balance_mapping_json: Path of the balance-sheet mapping file.
        existing_income_mapping_json: Path of the income-statement mapping file.
        existing_cash_flow_mapping_json: Path of the cash-flow mapping file.
        tickers: Iterable of ticker symbols to process.
        period: FMP period, e.g. ``"annual"``.
        api_key: FMP API key.
        output_folder: Directory for the written files; created if missing.

    Returns:
        A confirmation message naming the output folder.

    Raises:
        FileNotFoundError: If one of the input files does not exist.
    """
    existing_mappings = []
    for mapping_path in (
        existing_balance_mapping_json,
        existing_income_mapping_json,
        existing_cash_flow_mapping_json,
    ):
        with open(mapping_path, "r", encoding="utf-8") as f:
            existing_mappings.append(json.load(f))

    # Reuse the in-memory updater rather than repeating its merge loop here.
    updated_balance_mapping, updated_income_mapping, updated_cash_flow_mapping = (
        update_three_statement_mappings(*existing_mappings, tickers, period, api_key)
    )

    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(output_folder, exist_ok=True)

    outputs = (
        ("updated_balance_mapping.json", updated_balance_mapping),
        ("updated_income_mapping.json", updated_income_mapping),
        ("updated_cash_flow_mapping.json", updated_cash_flow_mapping),
    )
    for filename, mapping in outputs:
        with open(os.path.join(output_folder, filename), "w", encoding="utf-8") as f:
            json.dump(mapping, f)

    return f"Mappings updated and saved to JSON files in the '{output_folder}' folder."

In [None]:
# Small starter universe plus the paths of the previously saved mapping files.
# Despite their names, the three existing_* variables hold file paths.
tickers = ["AAPL", "TSLA", "MSFT", "AMD"]
(
    existing_balance_mapping,
    existing_income_mapping,
    existing_cash_flow_mapping,
) = (
    "updated_mapping/updated_balance_mapping.json",
    "updated_mapping/updated_income_mapping.json",
    "updated_mapping/updated_cash_flow_mapping.json",
)

In [None]:
# Replace the ticker universe with the S&P 100 constituents from Wikipedia.
url = "https://en.wikipedia.org/wiki/S%26P_100"
response = requests.get(url, timeout=30)
# Stop here on an HTTP error instead of parsing an error page.
response.raise_for_status()
# read_html expects a path/URL or file-like object; passing literal HTML as a
# string is deprecated, so wrap the text in StringIO.
wiki_tables = pd.read_html(io.StringIO(response.text))
# Select the table by its "Symbol" column rather than by a fixed position,
# which breaks whenever the page layout changes.
constituents = next(
    (table for table in wiki_tables if "Symbol" in table.columns), None
)
if constituents is None:
    raise ValueError(
        "No table with a 'Symbol' column found on the S&P 100 Wikipedia page."
    )
tickers = constituents["Symbol"].tolist()

In [None]:
# Extend the saved mappings with every ticker collected above; the returned
# confirmation message is shown as the cell output.
update_three_statement_mappings_from_json(
    existing_balance_mapping_json=existing_balance_mapping,
    existing_income_mapping_json=existing_income_mapping,
    existing_cash_flow_mapping_json=existing_cash_flow_mapping,
    tickers=tickers,
    period="annual",
    api_key=api_key,
)

In [13]:
# Reported value to look up in the three fact tables below.
val = 157_403_000_000

In [14]:
# Instantaneous-frame facts whose value equals `val`.
facts_df_instantaneous_fixed.loc[facts_df_instantaneous_fixed["val"].eq(val)]

Unnamed: 0,end,val,accn,fy,fp,form,filed,frame,fact


In [15]:
# Quarterly-frame facts whose value equals `val`.
facts_df_quarterly_fixed.loc[facts_df_quarterly_fixed["val"].eq(val)]

Unnamed: 0,start,end,val,accn,fy,fp,form,filed,frame,fact


In [16]:
# Annual-frame facts whose value equals `val`.
facts_df_annual_fixed.loc[facts_df_annual_fixed["val"].eq(val)]

Unnamed: 0,start,end,val,accn,fy,fp,form,filed,frame,fact
1349,2022-01-31,2023-01-29,157403000000.0,0000354950-23-000059,2022,FY,10-K,2023-03-15,CY2022,RevenueFromContractWithCustomerExcludingAssess...
