<a href="https://colab.research.google.com/github/YangChaoChung/US-stock/blob/main/Financial_Recovery_Stock_Metrics_II.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

1. Add the industry to the table
2. Calculate the expected prices via EPS, etc.

In [None]:
pip install yfinance



In [None]:
pip install pandas



In [None]:
import pandas as pd
import numpy as np
import yfinance as yf  # For fetching financial data
import logging
import time  # To handle API rate limits
import datetime
from google.colab import files

In [None]:
# Minimum value each metric must reach (compared with >=) to count toward the
# recovery score. A criterion set to None is not scored by the evaluate_*
# functions.
recovery_criteria = {
    'revenue_growth': 0.0,             # Growth of at least 0 (zero growth also passes)
    'net_income_growth': 0.0,        # Growth of at least 0 (zero growth also passes)
    'ebitda_growth': 0.0,            # Growth of at least 0 (zero growth also passes)
    'eps_growth': 0.0,               # EPS growth of at least 0 (zero growth also passes)
    'gross_profit_margin': 0.30,     # At least 30%
    'operating_margin': 0.10,        # At least 10%
    'roe': None,                     # No threshold set, so ROE is not scored. NOTE(review): comment used to say "at least 10%" - confirm whether 0.10 was intended
    'roa': 0.05,                     # At least 5%
}

# Default tickers to analyze
stock_list = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA']
# stock_list = ['ABOS', 'BIIB'] # speculative stocks

# Minimum RecoveryScore (a percentage, 0-100) for a stock to be listed as recovering
recovery_score_threshold = 10  # Stocks scoring 10 or higher are kept. NOTE(review): comment used to say 70% - confirm the intended value

In [None]:
import pandas as pd
from google.colab import files

def upload_csv():
    """Prompt for a CSV upload in Colab and return its ``Symbol`` column.

    Only the first uploaded file is read; any further files are ignored.

    Returns:
        list: Non-missing values of the ``Symbol`` column of the first
        uploaded file, or an empty list when no file was uploaded.

    Raises:
        KeyError: If the uploaded CSV has no ``Symbol`` column.
    """
    import io  # local import: only needed to wrap the uploaded bytes

    uploaded = files.upload()

    for fn, content in uploaded.items():
        print(f'User uploaded file "{fn}" with length {len(content)} bytes')

        # Parse the bytes returned by the upload widget directly instead of
        # re-opening a file by name from the working directory.
        df = pd.read_csv(io.BytesIO(content))
        # Blank rows become NaN, which is not a usable ticker symbol.
        return df['Symbol'].dropna().tolist()

    # Nothing was uploaded: return an empty list rather than an implicit None
    # so callers can always iterate over the result.
    return []


In [None]:
def get_financial_data(ticker):
    """Collect the statements and info of ``ticker`` from yfinance.

    Returns:
        dict | None: A dict with the keys ``income_statement``,
        ``quarterly_income_statement``, ``balance_sheet``,
        ``quarterly_balance_sheet``, ``cash_flow``, ``quarterly_cash_flow``
        and ``info``; or None when any exception is raised while fetching
        (the error is logged).
    """
    # (result key, attribute of the yfinance Ticker object), in fetch order.
    sources = (
        ('income_statement', 'financials'),                        # Annual
        ('quarterly_income_statement', 'quarterly_financials'),
        ('balance_sheet', 'balance_sheet'),                        # Annual
        ('quarterly_balance_sheet', 'quarterly_balance_sheet'),
        ('cash_flow', 'cashflow'),                                 # Annual # Need to check
        ('quarterly_cash_flow', 'quarterly_cashflow'),             # Need to check
        ('info', 'info'),
    )
    try:
        stock = yf.Ticker(ticker)
        return {key: getattr(stock, attribute) for key, attribute in sources}
    except Exception as e:
        logging.error(f"Error fetching data for {ticker}: {e}")
        return None


In [None]:
# Sectors / industries whose members are always flagged as speculative.
# Example: speculative_sectors = ['Utilities']
speculative_sectors = []
speculative_industries = ['Biotechnology']

# Thresholds used by is_speculative().
# NOTE(review): an earlier comment described small caps as "under $250
# million"; the market-cap value actually used is the one below - confirm.
SPECULATIVE_BETA_THRESHOLD = 1.5
SPECULATIVE_MARKET_CAP_THRESHOLD = 154999870


def is_speculative(financials):
    """Return True when the stock described by ``financials`` looks speculative.

    A stock is speculative when at least one of these holds:
    its beta is above ``SPECULATIVE_BETA_THRESHOLD``, its market cap is below
    ``SPECULATIVE_MARKET_CAP_THRESHOLD``, its sector is listed in
    ``speculative_sectors``, or its industry is listed in
    ``speculative_industries``.

    Args:
        financials: Mapping with an ``'info'`` entry (a dict of company facts).

    Returns:
        bool: False when ``'info'`` is missing or empty, otherwise whether any
        speculative condition is met.
    """
    info = financials.get('info')
    if not info:
        return False  # Nothing to judge the stock on

    # A key can be present with a None value; info.get(key, 0) would then
    # return None and the comparison would raise TypeError.
    beta = info.get('beta')
    market_cap = info.get('marketCap')
    if market_cap is None:
        # Unknown market cap counts as 0 (i.e. below the threshold), which is
        # how a missing key has always been treated.
        market_cap = 0

    conditions = {
        'high_volatility': beta is not None and beta > SPECULATIVE_BETA_THRESHOLD,
        'market_capitalization': market_cap < SPECULATIVE_MARKET_CAP_THRESHOLD,
        'speculative_sector': info.get('sector') in speculative_sectors,
        'speculative_industry': info.get('industry') in speculative_industries,
    }

    # A stock is considered speculative if any of the conditions are met
    return any(conditions.values())

In [None]:
def _statement_row(statement, label):
    """Return row ``label`` of ``statement`` without NaNs, oldest period first, or None if absent."""
    if label not in statement.index:
        return None
    # Sorting by period makes iloc[-1] the most recent value whatever the
    # column order of the statement is (yfinance typically lists the newest
    # period first).
    return statement.loc[label].dropna().sort_index()


def _period_growth(series, metric_label, noun, ticker):
    """Return the relative change between the two most recent values of ``series``, or None."""
    if series is None or len(series) < 2:
        return None
    previous, latest = series.iloc[-2], series.iloc[-1]
    if previous == 0:
        logging.warning(f"{metric_label} growth calculation for {ticker} skipped due to zero {noun} in the previous period.")
        return None
    # Dividing by abs() keeps the sign meaningful when the base is negative
    # (e.g. a loss shrinking from -100 to -50 is +50%, not -50%).
    return (latest - previous) / abs(previous)


def _latest_margin(numerator, revenue, metric_label, ticker):
    """Return numerator / revenue for the most recent period both series cover, or None."""
    if numerator is None or revenue is None:
        return None
    shared_periods = numerator.index.intersection(revenue.index)
    if len(shared_periods) == 0:
        return None
    latest_period = shared_periods.max()
    latest_revenue = revenue.loc[latest_period]
    if latest_revenue == 0:
        logging.warning(f"{metric_label} calculation for {ticker} skipped due to zero revenue.")
        return None
    return numerator.loc[latest_period] / latest_revenue


def analyze_financials(financials, ticker):
    """Derive growth, margin and return metrics for one stock.

    Args:
        financials: Dict as built by ``get_financial_data``; the annual
            ``'income_statement'`` DataFrame and the ``'info'`` dict are used.
        ticker: Symbol, used in log messages only.

    Returns:
        dict | None: Keys ``is_speculative``, ``revenue_growth``,
        ``net_income_growth``, ``ebitda_growth``, ``eps_growth``,
        ``gross_profit_margin``, ``operating_margin``, ``roe`` and ``roa``.
        A metric that cannot be computed is None. Returns None when an
        unexpected error occurs (the error is logged).
    """
    analysis = {}
    try:
        income_statement = financials['income_statement']
        info = financials.get('info') or {}

        # A stock is considered speculative if any of the conditions are met
        analysis['is_speculative'] = is_speculative(financials)

        # Revenue Growth
        revenue = _statement_row(income_statement, 'Total Revenue')
        if revenue is None:
            logging.warning(f"Total Revenue not found for {ticker}; revenue-based metrics are unavailable.")
        analysis['revenue_growth'] = _period_growth(revenue, 'Revenue', 'revenue', ticker)

        # Net Income Growth
        net_income = _statement_row(income_statement, 'Net Income')
        analysis['net_income_growth'] = _period_growth(net_income, 'Net income', 'net income', ticker)

        # EBITDA Growth
        possible_ebitda_keys = ['Normalized EBITDA', 'EBITDA']
        found_ebitda_key = next((key for key in possible_ebitda_keys if key in income_statement.index), None)
        if found_ebitda_key:
            ebitda = _statement_row(income_statement, found_ebitda_key)
            if len(ebitda) >= 2:
                analysis['ebitda_growth'] = _period_growth(ebitda, 'EBITDA', 'EBITDA', ticker)
            else:
                analysis['ebitda_growth'] = None
                logging.warning(f"Not enough data to calculate EBITDA growth for {ticker}.")
        else:
            analysis['ebitda_growth'] = None
            logging.warning(f"EBITDA data not found for {ticker}. Available keys: {income_statement.index.tolist()}")

        # Earnings Per Share (EPS) Growth
        eps = _statement_row(income_statement, 'Diluted EPS')
        analysis['eps_growth'] = _period_growth(eps, 'EPS', 'EPS', ticker)

        # Gross Profit Margin
        gross_profit = _statement_row(income_statement, 'Gross Profit')
        analysis['gross_profit_margin'] = _latest_margin(gross_profit, revenue, 'Gross profit margin', ticker)

        # Operating Margin: use the operating-income row when the statement
        # has one; fall back to EBITDA (the previous proxy) otherwise.
        operating_key = next(
            (key for key in ['Operating Income', *possible_ebitda_keys] if key in income_statement.index),
            None,
        )
        operating_income = _statement_row(income_statement, operating_key) if operating_key else None
        analysis['operating_margin'] = _latest_margin(operating_income, revenue, 'Operating margin', ticker)

        # Return on Equity (ROE) and Return on Assets (ROA) from 'info' dictionary
        for metric, info_key, label in (('roe', 'returnOnEquity', 'ROE'), ('roa', 'returnOnAssets', 'ROA')):
            value = info.get(info_key)
            analysis[metric] = value
            if value is None:
                logging.warning(f"{label} data not available in 'info' for {ticker}.")

        return analysis

    except Exception as e:
        logging.error(f"Error analyzing financials for {ticker}: {e}")
        return None


In [None]:
# Metrics scored by evaluate_recovery(), each with equal weight.
metrics = [
    'revenue_growth',
    'net_income_growth',
    'ebitda_growth',
    'eps_growth',
    'gross_profit_margin',
    'operating_margin',
    'roe',
    'roa',
]


def evaluate_recovery(analysis, criteria):
    """Score ``analysis`` against ``criteria`` with equal weight per metric.

    A metric is evaluated only when both its value and its criterion are not
    None; it passes when ``value >= criterion``.

    Args:
        analysis: Mapping of metric name to value (missing or None means the
            data is unavailable).
        criteria: Mapping of metric name to minimum value (missing or None
            means the metric is not scored).

    Returns:
        tuple: ``(recovery_score, reasons)``. ``recovery_score`` is the
        percentage (0-100) of evaluated metrics that passed, or None when no
        metric could be evaluated. ``reasons`` lists metrics that failed or
        had no data.
    """
    score = 0
    reasons = []
    total_metrics = 0  # Count of metrics evaluated

    for metric in metrics:
        label = metric.replace('_', ' ').capitalize()
        # .get() so a metric absent from `analysis` counts as unavailable
        # instead of raising KeyError.
        value = analysis.get(metric)
        threshold = criteria.get(metric)

        if value is None:
            reasons.append(f"{label} data unavailable")
            continue
        if threshold is None:
            # Criterion disabled: the data exists, so it is neither a failure
            # nor "unavailable" - simply not scored.
            continue

        total_metrics += 1
        if value >= threshold:
            score += 1
        else:
            reasons.append(f"{label} below threshold")

    # Calculate recovery score as a percentage
    if total_metrics > 0:
        recovery_score = (score / total_metrics) * 100
    else:
        recovery_score = None
        reasons.append('Insufficient data for scoring')

    return recovery_score, reasons


In [None]:
# Weight of each metric in evaluate_recovery_v2(); the weights sum to 100.
metrics_priority = {
    'net_income_growth': 30,  # Related to Net Profit Margin
    'eps_growth': 20,         # Earnings Per Share (EPS)
    'roe': 15,                # Return on Equity (ROE)
    'revenue_growth': 15,     # Revenue Growth Rate
    'cash_flow': 10,          # Cash Flow (assumed as placeholder)
    'roa': 3,                 # Return on Assets (ROA)
    'operating_margin': 5,    # Operating Profit Margin
    'gross_profit_margin': 2  # Gross Profit Margin
}


def evaluate_recovery_v2(analysis, criteria):
    """Score ``analysis`` against ``criteria`` using ``metrics_priority`` weights.

    A metric is evaluated only when both its value and its criterion are not
    None; a metric passing ``value >= criterion`` contributes its weight.
    The score is normalised by the sum of ALL weights, so metrics that are
    unavailable or not scored lower the maximum reachable score.

    Args:
        analysis: Mapping of metric name to value (missing or None means the
            data is unavailable).
        criteria: Mapping of metric name to minimum value (missing or None
            means the metric is not scored).

    Returns:
        tuple: ``(recovery_score, reasons)``. ``recovery_score`` is a
        percentage (0-100), 0 when nothing could be evaluated. ``reasons``
        lists metrics that failed or had no data, plus data-quality notes.
    """
    score = 0
    reasons = []
    total_metrics = 0  # Count of metrics evaluated

    for metric, weight in metrics_priority.items():
        label = metric.replace('_', ' ').capitalize()
        value = analysis.get(metric)
        threshold = criteria.get(metric)

        if value is None:
            reasons.append(f"{label} data unavailable")
            continue
        if threshold is None:
            # Criterion disabled: the data exists, so reporting it as
            # "unavailable" would be misleading - it is simply not scored.
            continue

        total_metrics += 1
        if value >= threshold:
            score += weight  # Give more weight to higher-priority metrics
        else:
            reasons.append(f"{label} below threshold")

    # Calculate recovery score as a percentage
    max_possible_score = sum(metrics_priority.values())

    if total_metrics > 0:
        recovery_score = (score / max_possible_score) * 100
    else:
        recovery_score = 0  # Set default recovery score to 0 if no metrics were evaluated
        reasons.append('Insufficient data for scoring')

    # Handle cases where too much data is unavailable
    if total_metrics < len(metrics_priority) / 2:
        reasons.append('Too much data is unavailable to provide a reliable score. Score may not be accurate.')

    return recovery_score, reasons


In [None]:
def analyze_stock(ticker, filter_speculative=False):
    """Fetch, analyze and score a single ticker.

    Args:
        ticker: Symbol to analyze.
        filter_speculative: When true, stocks for which ``is_speculative``
            returns true are skipped.

    Returns:
        dict | None: ``'Ticker'``, every metric from ``analyze_financials``,
        ``'RecoveryScore'`` and ``'Reasons'``; or None when data could not be
        fetched, the stock was skipped as speculative, or analysis failed.
    """
    logging.info(f"Analyzing {ticker}")

    data = get_financial_data(ticker)
    if not data:
        logging.error(f"No financial data for {ticker}")
        return None

    if filter_speculative:
        if is_speculative(data):
            logging.info(f"{ticker} is a speculative stock. Skipping analysis.")
            return None

    computed = analyze_financials(data, ticker)
    if not computed:
        logging.error(f"Failed to analyze financials for {ticker}")
        return None

    score_and_reasons = evaluate_recovery_v2(computed, recovery_criteria)

    # Build the row in column order: ticker, metrics, then score and reasons.
    row = {'Ticker': ticker}
    row.update(computed)
    row['RecoveryScore'] = score_and_reasons[0]
    row['Reasons'] = score_and_reasons[1]
    return row


In [None]:
def main():
    """Analyze every ticker in ``stock_list`` and export the recovering ones.

    Prints the stocks whose ``RecoveryScore`` is at least
    ``recovery_score_threshold``, writes them to
    ``stock_recovery_analysis_<YYYY-MM-DD>.csv`` and triggers a Colab download
    of that file. When no ticker could be analyzed at all, a message is
    printed and no file is written.
    """
    # Initialize logging
    logging.basicConfig(level=logging.INFO)

    # Run analysis
    analysis_results = []

    # `or []` tolerates stock_list being None (e.g. no CSV was uploaded).
    for ticker in stock_list or []:
        print(f"Analyzing {ticker}.")
        result = analyze_stock(ticker, True)
        if result:
            analysis_results.append(result)
        time.sleep(2)  # Respect API rate limits

    # An empty result list would build a DataFrame without columns, and the
    # 'RecoveryScore' lookup below would raise KeyError.
    if not analysis_results:
        print("No stocks could be analyzed.")
        return

    df_results = pd.DataFrame(analysis_results)
    # Filter recovering stocks
    recovering_stocks = df_results[df_results['RecoveryScore'] >= recovery_score_threshold]

    # Display results
    if not recovering_stocks.empty:
        print("Recovering stocks:")
        print(recovering_stocks[['Ticker', 'RecoveryScore'] + metrics])
    else:
        print("No stocks met the recovery criteria.")

    # Save results to a CSV file named with today's date
    today = datetime.datetime.now().strftime("%Y-%m-%d")
    filename = f"stock_recovery_analysis_{today}.csv"
    recovering_stocks.to_csv(filename, index=False)

    # Download the CSV file
    files.download(filename)


In [None]:
if __name__ == "__main__":
    # Optional: load the tickers from an uploaded CSV file. Keep the default
    # stock_list when nothing usable was uploaded, instead of overwriting it
    # with None or an empty list.
    uploaded_symbols = upload_csv()
    if uploaded_symbols:
        stock_list = uploaded_symbols
    main()