In [6]:
# Author: Ferangiz Abdurakhmonova

from __future__ import annotations

import math
import os
import sys
from typing import Dict, List, Optional, Tuple

import pandas as pd
import numpy as np

try:
    from scipy.stats import chi2
    _HAS_SCIPY = True
except Exception:
    _HAS_SCIPY = False


# -------------------------- Utility helpers -------------------------- #

def safe_div(n: float | int, d: float | int) -> float:
    """Divide ``n`` by ``d``, returning NaN instead of raising.

    NaN is returned when the denominator is ``None``, NaN or zero, and also
    when either operand cannot be converted to ``float`` (any exception raised
    during the checks or the division is swallowed on purpose).
    """
    nan = float('nan')
    try:
        # The numerator is only converted when the denominator is usable.
        unusable = d is None or pd.isna(d) or float(d) == 0.0
        quotient = nan if unusable else float(n) / float(d)
    except Exception:
        return nan
    return quotient


def first_digit(x: float | int) -> Optional[int]:
    """Return the leading significant digit (1-9) of ``abs(x)``.

    Returns ``None`` when ``x`` is missing (``None``/NaN), cannot be converted
    to ``float``, is zero, or is not finite.

    Infinite values must be rejected explicitly: they are not NaN, their text
    form contains no digit, and repeatedly dividing infinity by ten never
    brings it below ten, so a scale-down loop would never terminate.
    """
    if x is None or pd.isna(x):
        return None
    try:
        v = abs(float(x))
    except (TypeError, ValueError, OverflowError):
        return None
    if v <= 0 or not math.isfinite(v):
        return None

    # 12 significant digits; the first non-zero digit of the rendered text is
    # the leading digit in both fixed ("0.00123") and exponent ("1e-05") form.
    for ch in f"{v:.12g}":
        if ch in "123456789":
            return int(ch)
    return None


# -------------------------- Benford -------------------------- #

def compute_benford(df: pd.DataFrame, amount_col: str) -> Dict[str, object]:
    """Compare the leading digits of ``df[amount_col]`` with Benford's law.

    Values for which ``first_digit`` returns ``None`` are ignored.

    Returns a dict with the expected and observed digit probabilities, the
    per-digit counts, the sample size ``n``, the mean absolute deviation
    (``mad``), the chi-square statistic (``chi2``), its ``p_value`` on 8
    degrees of freedom (``None`` when scipy is unavailable) and a
    ``conformity`` label derived from the MAD.

    Raises:
        KeyError: ``amount_col`` is not a column of ``df``.
        ValueError: no value yields a leading digit.
    """
    if amount_col not in df.columns:
        raise KeyError(f"Amount column '{amount_col}' not in transactions DataFrame")

    leading = [d for d in map(first_digit, df[amount_col].values) if d is not None]
    n = len(leading)
    if n == 0:
        raise ValueError("No positive numeric amounts found for Benford analysis")

    # Slot 0 of the tally is never used because digits run from 1 to 9.
    counts = np.bincount(leading, minlength=10)[1:].astype(float)
    observed = counts / counts.sum()
    expected = np.array([math.log10(1 + 1/d) for d in range(1, 10)], dtype=float)

    mad = float(np.abs(observed - expected).mean())

    expected_counts = expected * n
    squared_gap = (counts - expected_counts) ** 2
    chi2_stat = float((squared_gap / expected_counts).sum())

    # Nine digit classes leave eight degrees of freedom.
    p_value = float(chi2.sf(chi2_stat, df=8)) if _HAS_SCIPY else None

    # Upper MAD bounds, checked from strictest to loosest.
    for bound, label in ((0.006, "Close"), (0.012, "Acceptable"), (0.015, "Marginal")):
        if mad < bound:
            conformity = label
            break
    else:
        conformity = "Nonconformity"

    return {
        "expected_probs": expected,
        "observed_probs": observed,
        "counts": counts.astype(int),
        "n": n,
        "mad": mad,
        "chi2": chi2_stat,
        "p_value": p_value,
        "conformity": conformity,
    }


# -------------------------- Beneish M-Score -------------------------- #

def _beneish_components(curr: pd.Series, prev: pd.Series) -> Dict[str, float]:
    """Compute the eight Beneish ratio indices from two period rows.

    ``curr`` is the later period and ``prev`` the earlier one. Every ratio is
    formed with ``safe_div``, so an unusable denominator yields NaN for that
    index. The returned dict is keyed DSRI, GMI, AQI, SGI, DEPI, SGAI, TATA,
    LVGI, in that order.
    """

    def pair(field: str):
        # (current, prior) value of one field; .get yields None when absent.
        return curr.get(field), prev.get(field)

    sales_c, sales_p = pair("Sales")
    cogs_c, cogs_p = pair("COGS")
    recv_c, recv_p = pair("Receivables")
    cur_assets_c, cur_assets_p = pair("CurrentAssets")
    ppe_c, ppe_p = pair("PPE")
    assets_c, assets_p = pair("TotalAssets")
    dep_c, dep_p = pair("Depreciation")
    sga_c, sga_p = pair("SGA")
    liab_c, liab_p = pair("TotalLiabilities")
    net_income_c = curr.get("NetIncome")
    cfo_c = curr.get("CFO")

    indices: Dict[str, float] = {}

    # Receivables relative to sales, current over prior.
    recv_share_c = safe_div(recv_c, sales_c)
    recv_share_p = safe_div(recv_p, sales_p)
    indices["DSRI"] = safe_div(recv_share_c, recv_share_p)

    # Gross margin, prior over current.
    margin_c = safe_div(sales_c - cogs_c, sales_c)
    margin_p = safe_div(sales_p - cogs_p, sales_p)
    indices["GMI"] = safe_div(margin_p, margin_c)

    # Share of assets that is neither current assets nor PPE, current over prior.
    other_share_c = safe_div(assets_c - (cur_assets_c + ppe_c), assets_c)
    other_share_p = safe_div(assets_p - (cur_assets_p + ppe_p), assets_p)
    indices["AQI"] = safe_div(other_share_c, other_share_p)

    # Sales growth.
    indices["SGI"] = safe_div(sales_c, sales_p)

    # Depreciation rate, prior over current.
    dep_rate_c = safe_div(dep_c, ppe_c + dep_c)
    dep_rate_p = safe_div(dep_p, ppe_p + dep_p)
    indices["DEPI"] = safe_div(dep_rate_p, dep_rate_c)

    # SG&A relative to sales, current over prior.
    sga_share_c = safe_div(sga_c, sales_c)
    sga_share_p = safe_div(sga_p, sales_p)
    indices["SGAI"] = safe_div(sga_share_c, sga_share_p)

    # Net income minus operating cash flow, scaled by total assets.
    indices["TATA"] = safe_div(net_income_c - cfo_c, assets_c)

    # Liabilities relative to assets, current over prior.
    leverage_c = safe_div(liab_c, assets_c)
    leverage_p = safe_div(liab_p, assets_p)
    indices["LVGI"] = safe_div(leverage_c, leverage_p)

    return indices


def beneish_m_score(curr: pd.Series, prev: pd.Series) -> Tuple[float, Dict[str, float]]:
    """Return the Beneish M-Score and the component indices it is built from.

    The score is the intercept -4.84 plus a weighted sum of the eight indices
    produced by ``_beneish_components``; a NaN index makes the score NaN.
    """
    comps = _beneish_components(curr, prev)

    coefficients = (
        ("DSRI", 0.92),
        ("GMI", 0.528),
        ("AQI", 0.404),
        ("SGI", 0.892),
        ("DEPI", 0.115),
        ("SGAI", -0.172),
        ("TATA", 4.679),
        ("LVGI", -0.327),
    )

    # Plain left-to-right accumulation keeps the floating-point result
    # independent of how the built-in sum() adds floats.
    score = -4.84
    for name, weight in coefficients:
        score += weight * comps[name]
    return score, comps


# -------------------------- Altman Z (non-mfg) -------------------------- #

def altman_z_non_mfg(curr: pd.Series) -> float:
    """Compute the four-factor Altman Z score for a single period row.

    Working capital is CurrentAssets minus CurrentLiabilities. When
    ShareholdersEquity is missing it is derived as TotalAssets minus
    TotalLiabilities if both are present. Ratios go through ``safe_div``, so
    any unusable input makes the score NaN.
    """
    total_assets = curr.get("TotalAssets")
    total_liabs = curr.get("TotalLiabilities")
    current_assets = curr.get("CurrentAssets")
    current_liabs = curr.get("CurrentLiabilities")

    if pd.isna(current_assets) or pd.isna(current_liabs):
        working_capital = None  # safe_div maps this to NaN
    else:
        working_capital = current_assets - current_liabs

    wc_to_assets = safe_div(working_capital, total_assets)
    re_to_assets = safe_div(curr.get("RetainedEarnings", np.nan), total_assets)
    ebit_to_assets = safe_div(curr.get("EBIT"), total_assets)

    equity = curr.get("ShareholdersEquity", np.nan)
    if pd.isna(equity) and not (pd.isna(total_assets) or pd.isna(total_liabs)):
        # Fall back to book equity implied by the balance sheet.
        equity = total_assets - total_liabs
    equity_to_liabs = safe_div(equity, total_liabs)

    return (
        6.56 * wc_to_assets
        + 3.26 * re_to_assets
        + 6.72 * ebit_to_assets
        + 1.05 * equity_to_liabs
    )


# -------------------------- Piotroski F-Score -------------------------- #

def piotroski_f_score(curr: pd.Series, prev: pd.Series) -> Tuple[int, Dict[str, int]]:
    """Score nine binary signals comparing ``curr`` with ``prev``.

    Returns ``(total, parts)`` where ``parts`` maps "S1".."S9" to 0 or 1 and
    ``total`` is their sum. A signal whose inputs are missing scores 0.
    Leverage uses LongTermDebt when it is present for both periods and
    TotalLiabilities otherwise.
    """

    def known(*values) -> bool:
        # True only when none of the values is None/NaN.
        return not any(pd.isna(v) for v in values)

    def both(field: str):
        return curr.get(field), prev.get(field)

    assets_c, assets_p = both("TotalAssets")
    income_c, income_p = both("NetIncome")
    cfo_c = curr.get("CFO")

    roa_c = safe_div(income_c, assets_c)
    roa_p = safe_div(income_p, assets_p)

    signals: List[int] = []

    # S1-S4: profitability and cash backing of earnings.
    signals.append(int(roa_c > 0))
    signals.append(int(known(cfo_c) and cfo_c > 0))
    signals.append(int(roa_c > roa_p))
    signals.append(int((cfo_c - income_c) > 0) if known(cfo_c, income_c) else 0)

    # S5: lower leverage than the prior period.
    debt_c = curr.get("LongTermDebt", np.nan)
    debt_p = prev.get("LongTermDebt", np.nan)
    if known(debt_c, debt_p):
        leverage_c = safe_div(debt_c, assets_c)
        leverage_p = safe_div(debt_p, assets_p)
    else:
        leverage_c = safe_div(curr.get("TotalLiabilities"), assets_c)
        leverage_p = safe_div(prev.get("TotalLiabilities"), assets_p)
    signals.append(int(leverage_c < leverage_p) if known(leverage_c, leverage_p) else 0)

    # S6: higher current ratio.
    liquidity_c = safe_div(curr.get("CurrentAssets"), curr.get("CurrentLiabilities"))
    liquidity_p = safe_div(prev.get("CurrentAssets"), prev.get("CurrentLiabilities"))
    signals.append(int(liquidity_c > liquidity_p) if known(liquidity_c, liquidity_p) else 0)

    # S7: share count did not increase.
    shares_c = curr.get("SharesOutstanding", np.nan)
    shares_p = prev.get("SharesOutstanding", np.nan)
    signals.append(int(shares_c <= shares_p) if known(shares_c, shares_p) else 0)

    # S8: higher gross margin.
    sales_c, sales_p = both("Sales")
    cogs_c, cogs_p = both("COGS")
    margin_c = safe_div(sales_c - cogs_c, sales_c)
    margin_p = safe_div(sales_p - cogs_p, sales_p)
    signals.append(int(margin_c > margin_p) if known(margin_c, margin_p) else 0)

    # S9: higher asset turnover.
    turnover_c = safe_div(sales_c, assets_c)
    turnover_p = safe_div(sales_p, assets_p)
    signals.append(int(turnover_c > turnover_p) if known(turnover_c, turnover_p) else 0)

    parts = {f"S{idx}": flag for idx, flag in enumerate(signals, start=1)}
    return int(sum(parts.values())), parts

def earnings_quality(curr: pd.Series) -> float:
    """Return CFO divided by NetIncome for the row (NaN when not computable)."""
    return safe_div(curr.get("CFO"), curr.get("NetIncome"))


# -------------------------- Data model setup -------------------------- #
# Columns the financials CSV must contain before any score is computed.
REQUIRED_FINANCIALS = [
    "Company",
    "Period",
    "TotalAssets",
    "TotalLiabilities",
    "CurrentAssets",
    "CurrentLiabilities",
    "Receivables",
    "Inventory",
    "PPE",
    "Depreciation",
    "Sales",
    "COGS",
    "SGA",
    "EBIT",
    "NetIncome",
    "CFO",
]

# Columns that are used when present but are not mandatory.
OPTIONAL_FINANCIALS = [
    "LongTermDebt",
    "SharesOutstanding",
    "Intangibles",
    "ShareholdersEquity",
    "RetainedEarnings",
]

# Column layout of the transactions template.
TRANSACTION_COLS = [
    "Company",
    "Date",
    "Amount",
]


def create_synthetic_data(outdir: str = ".") -> Tuple[str, str]:
    """Write the two sample CSV files into ``outdir`` and return their paths.

    The directory is created when it does not exist. The return value is
    ``(transactions_path, financials_path)`` as absolute paths.
    """
    target_dir = os.path.abspath(outdir)
    os.makedirs(target_dir, exist_ok=True)

    tx_path = os.path.join(target_dir, "transactions_template.csv")
    fin_path = os.path.join(target_dir, "financials_template.csv")

    # Two sample transactions for a single company.
    tx_rows = [
        ["ACME Inc", "2024-12-01", 1250.75],
        ["ACME Inc", "2024-12-02", 987.30],
    ]
    tx_frame = pd.DataFrame(tx_rows, columns=["Company", "Date", "Amount"])
    tx_frame.to_csv(tx_path, index=False)

    # Two consecutive periods so that period-over-period scores can be run.
    fin_columns = [
        "Company", "Period", "TotalAssets", "TotalLiabilities",
        "CurrentAssets", "CurrentLiabilities", "Receivables", "Inventory",
        "PPE", "Depreciation", "Sales", "COGS", "SGA",
        "EBIT", "NetIncome", "CFO", "LongTermDebt",
        "SharesOutstanding", "Intangibles", "ShareholdersEquity", "RetainedEarnings",
    ]
    fin_rows = [
        [
            "ACME Inc", 2023, 1000000, 600000,
            300000, 200000, 80000, 120000,
            400000, 50000, 900000, 600000, 120000,
            100000, 70000, 85000, 300000,
            1000000, 50000, 400000, 150000,
        ],
        [
            "ACME Inc", 2024, 1100000, 630000,
            320000, 210000, 90000, 130000,
            450000, 52000, 950000, 620000, 130000,
            110000, 76000, 90000, 310000,
            1000000, 55000, 470000, 180000,
        ],
    ]
    fin_frame = pd.DataFrame(fin_rows, columns=fin_columns)
    fin_frame.to_csv(fin_path, index=False)

    return tx_path, fin_path


def read_csv_maybe(path: str) -> Optional[pd.DataFrame]:
    """Load ``path`` as a CSV, or return ``None`` when that is not possible.

    A blank path returns ``None`` silently. A path that does not exist, a file
    pandas cannot read, or a file that yields an empty frame prints a notice
    and returns ``None``.
    """
    notice = "No csv file was found"

    if not path:
        return None

    if not os.path.exists(path):
        print(notice)
        return None

    try:
        frame = pd.read_csv(path)
    except Exception:
        # Any read/parse failure is reported the same way as a missing file.
        print(notice)
        return None

    if frame.empty:
        print(notice)
        return None
    return frame


def validate_financials_columns(df: pd.DataFrame) -> List[str]:
    """List the REQUIRED_FINANCIALS columns absent from ``df``.

    The result keeps the order of REQUIRED_FINANCIALS and is empty when every
    required column is present.
    """
    available = df.columns
    return [name for name in REQUIRED_FINANCIALS if name not in available]


def latest_pair_by_company(fin: pd.DataFrame) -> Dict[str, Tuple[pd.Series, pd.Series]]:
    """Map each company to its ``(latest, previous)`` period rows.

    Rows are ordered by "Period" and rows without a period are discarded.
    Companies left with fewer than two rows are omitted from the result.
    """
    result: Dict[str, Tuple[pd.Series, pd.Series]] = {}
    for company, group in fin.groupby("Company"):
        # Ascending order, so the newest period sits in the last position.
        history = group.sort_values("Period").dropna(subset=["Period"])
        if len(history) >= 2:
            result[company] = (history.iloc[-1], history.iloc[-2])
    return result


def summarize_methods(
    benford: Optional[Dict[str, object]],
    company_rows: List[Dict[str, object]],
) -> pd.DataFrame:
    """Build the human-readable summary table, one row per method result.

    Args:
        benford: Benford result dict (keys ``mad``, ``chi2``, ``p_value``,
            ``n``, ``conformity``) or ``None`` to omit the Benford row.
        company_rows: one dict per company holding ``Company`` and, when
            available, ``MScore``, ``AltmanZ``, ``PiotroskiF`` and ``QoE``.

    Returns:
        DataFrame with columns Method, Result, Rule/Threshold, Interpretation
        and Notes; empty when there is nothing to report.

    A score that is missing (``None``/NaN) is never given a substantive
    verdict: NaN compares false against every threshold, which would
    otherwise fall through to "Grey", "Neutral" or "OK/High". A missing
    Piotroski score is shown as "N/A" because NaN cannot be converted to int.
    """

    def missing(value: object) -> bool:
        return value is None or bool(pd.isna(value))

    def as_number(value: object) -> object:
        # Normalise None to NaN so the ":.3f" format renders it as "nan".
        return np.nan if missing(value) else value

    rows = []

    if benford is not None:
        result = f"MAD={benford['mad']:.4f}; chi2={benford['chi2']:.2f}"
        if benford['p_value'] is not None:
            result += f"; p={benford['p_value']:.4f}"
        rows.append({
            "Method": "Benford's Compliance",
            "Result": result,
            "Rule/Threshold": "MAD < 0.006 close; < 0.012 acceptable; < 0.015 marginal",
            "Interpretation": f"{benford['conformity']} conformity",
            "Notes": f"n={benford['n']} transactions; amount column analyzed"
        })

    for row in company_rows:
        company = row['Company']
        m_score = as_number(row.get('MScore', np.nan))
        z_score = as_number(row.get('AltmanZ', np.nan))
        f_score = row.get('PiotroskiF', 0)
        qoe = as_number(row.get('QoE', np.nan))

        rows.append({
            "Method": f"Beneish M-Score ({company})",
            "Result": f"{m_score:.3f}",
            "Rule/Threshold": "Manipulation risk if M > -2.22",
            "Interpretation": "Risk" if m_score > -2.22 else "Low/Unknown",
            "Notes": "Computed from two periods"
        })

        if missing(z_score):
            z_verdict = "Unknown"
        elif z_score < 1.1:
            z_verdict = "Distress"
        elif z_score > 2.6:
            z_verdict = "Safe"
        else:
            z_verdict = "Grey"
        rows.append({
            "Method": f"Altman Z (non-mfg) ({company})",
            "Result": f"{z_score:.3f}",
            "Rule/Threshold": "Distress < 1.1; Safe > 2.6",
            "Interpretation": z_verdict,
            "Notes": "Per 4-factor model"
        })

        if missing(f_score):
            # The caller stores NaN when scoring failed; int(NaN) would raise.
            f_result, f_verdict = "N/A", "Unknown"
        else:
            f_result = f"{int(f_score)}/9"
            if f_score >= 7:
                f_verdict = "Strong"
            elif f_score <= 3:
                f_verdict = "Weak"
            else:
                f_verdict = "Neutral"
        rows.append({
            "Method": f"Piotroski F-Score ({company})",
            "Result": f_result,
            "Rule/Threshold": "Higher is stronger (0–9)",
            "Interpretation": f_verdict,
            "Notes": "9 binary signals"
        })

        if missing(qoe):
            qoe_verdict = "Unknown"
        elif qoe < 1:
            qoe_verdict = "Low"
        else:
            qoe_verdict = "OK/High"
        rows.append({
            "Method": f"Earnings Quality QoE ({company})",
            "Result": f"{qoe:.3f}",
            "Rule/Threshold": "~1 or higher preferred; <1 may be weak",
            "Interpretation": qoe_verdict,
            "Notes": "CFO / NetIncome"
        })

    return pd.DataFrame(rows)


def main() -> None:
    """Run the interactive command-line workflow.

    Option 2 writes the two template CSV files via ``create_synthetic_data``
    and exits. Option 1 asks for a financials CSV and a transactions CSV
    (either may be left blank), runs the Benford analysis on the transactions
    and the four financial scores on each company's latest two periods, and
    prints a summary table.

    Files written with relative names (so into the current working directory),
    each only when there is data for it: ``benford_details.csv``,
    ``company_metrics.csv`` and ``results_summary.csv``.
    """
    print("This program calculates 5 methods for financial capitals.")
    print("Choose an action:\n  [1] Run program based on input data \n  [2] Generate synthetic data")
    action = input("Enter 1 or 2: ").strip()

    # Option 2: only generate the template files, then stop.
    if action == "2":
        tx_path, fin_path = create_synthetic_data()
        print(f"Synthetic data is generated. Paths:\n - {tx_path}\n - {fin_path}")
        return

    # Anything other than "1" or "2" ends the program.
    if action != "1":
        print("Exiting.")
        return 

    fin_path = input("Path to FINANCIALS input data (format: .csv (leave blank to skip): ").strip()
    tx_path = input("Path to TRANSACTIONS input data for Benford (format: .csv (leave blank to skip): ").strip()

    # Each frame is None when its path was blank, missing, unreadable or empty.
    fin_df = read_csv_maybe(fin_path)
    tx_df = read_csv_maybe(tx_path)

    benford_result: Optional[Dict[str, object]] = None
    company_rows: List[Dict[str, object]] = []

    # ---- Benford analysis on the transactions file ----
    if tx_df is not None:
        amount_col = None
 
        # Try the common spellings first; fall back to asking the user.
        for cand in ["Amount", "amount", "AMOUNT", "Value", "value"]:
            if cand in tx_df.columns:
                amount_col = cand
                break
        if not amount_col:
            print("Columns available in transactions CSV:", list(tx_df.columns))
            amount_col = input("Enter the column name that holds transaction amounts: ").strip()
        if amount_col not in tx_df.columns:
            # NOTE(review): this branch means the amount column is unknown,
            # yet the message talks about a missing file - confirm the wording.
            print("No csv file was found")
        else:
            print(f"Using transactions CSV: {tx_path}")
            print(f"Amount column: {amount_col}")
            try:
                benford_result = compute_benford(tx_df, amount_col)

                # Per-digit breakdown saved next to the summary.
                observed = pd.DataFrame({
                    "Digit": list(range(1, 10)),
                    "ExpectedProb": benford_result["expected_probs"],
                    "ObservedProb": benford_result["observed_probs"],
                    "Count": benford_result["counts"],
                })
                observed.to_csv("benford_details.csv", index=False)
            except Exception as e:
                # A failure here still lets the financial methods run. If only
                # the CSV write failed, benford_result is already set.
                print(f"Benford computation error: {e}")


    # ---- Financial-statement scores per company ----
    if fin_df is not None:
        missing = validate_financials_columns(fin_df)
        if missing:
            print("Financials CSV is missing required columns:", missing)
            print("No csv file was found")
        else:
            print(f"Using financials CSV: {fin_path}")
            pairs = latest_pair_by_company(fin_df)
            if not pairs:
                print("No company has two periods — skipping financial-based methods.")
            for comp, (curr, prev) in pairs.items():
                row: Dict[str, object] = {"Company": comp, "Period": curr.get("Period")}

                # Each method is isolated: a failure stores NaN plus the error
                # text under a "_<method>_error" key and the next method runs.
                try:
                    mscore, comps = beneish_m_score(curr, prev)
                    row["MScore"] = mscore
                    for k, v in comps.items():
                        row[k] = v
                except Exception as e:
                    row["MScore"] = np.nan
                    row["_beneish_error"] = str(e)

                try:
                    row["AltmanZ"] = altman_z_non_mfg(curr)
                except Exception as e:
                    row["AltmanZ"] = np.nan
                    row["_altman_error"] = str(e)

                try:
                    f, parts = piotroski_f_score(curr, prev)
                    row["PiotroskiF"] = f
                    # Individual signals are stored as F_S1 .. F_S9.
                    for k, v in parts.items():
                        row[f"F_{k}"] = v
                except Exception as e:
                    row["PiotroskiF"] = np.nan
                    row["_piotroski_error"] = str(e)

                try:
                    row["QoE"] = earnings_quality(curr)
                except Exception as e:
                    row["QoE"] = np.nan
                    row["_qoe_error"] = str(e)

                company_rows.append(row)

            if company_rows:
                pd.DataFrame(company_rows).to_csv("company_metrics.csv", index=False)

    # Neither input could be loaded: nothing to summarise.
    if (tx_df is None) and (fin_df is None):
        print("No csv file was found")
        return

    summary = summarize_methods(benford_result, company_rows)
    if summary.empty:
        print("No results to display (insufficient data).")
        return

    summary.to_csv("results_summary.csv", index=False)

    # Show every row and allow wider cells so the table is not truncated.
    with pd.option_context('display.max_rows', None, 'display.max_colwidth', 60):
        print("\n=== Results Summary (Table Style) ===")
        print(summary.to_string(index=False))
        print("\nSaved: results_summary.csv, company_metrics.csv, benford_details.csv (if applicable)")


# Start the interactive workflow only when the file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()


This program calculates 5 methods for financial capitals.
Choose an action:
  [1] Run program based on input data 
  [2] Generate synthetic data
Enter 1 or 2: 2
Synthetic data is generated. Paths:
 - C:\Users\legio\Documents\transactions_template.csv
 - C:\Users\legio\Documents\financials_template.csv
