## Graham Valuation Method

## DCF: Discounted Cash Flows

In [None]:
import yfinance as yf
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import date, timedelta
import pandas as pd
import numpy as np
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class FinancialRatios(BaseModel):
    ticker: str
    company_name: str
    day_date: date = Field(default_factory=date.today)
    pe_ratio: Optional[float] = None
    price_to_book: Optional[float] = None
    dividend_yield: Optional[float] = None
    debt_to_equity: Optional[float] = None
    current_ratio: Optional[float] = None
    return_on_equity: Optional[float] = None
    market_cap: Optional[float] = None
    earnings_growth: Optional[float] = None

class Ticker:
    def __init__(self, symbol: str):
        self.symbol = symbol
        self.ticker_obj = yf.Ticker(symbol)
        self.info = self.ticker_obj.info
        self.historical_data = self.get_historical_data()

    def get_financial_ratios(self) -> FinancialRatios:
        return FinancialRatios(
            ticker=self.symbol,
            company_name=self.info.get('longName', 'N/A'),
            pe_ratio=self.info.get('trailingPE'),
            price_to_book=self.info.get('priceToBook'),
            dividend_yield=self.info.get('dividendYield'),
            debt_to_equity=self.info.get('debtToEquity'),
            current_ratio=self.info.get('currentRatio'),
            return_on_equity=self.info.get('returnOnEquity'),
            market_cap=self.info.get('marketCap'),
            earnings_growth=self.info.get('earningsQuarterlyGrowth')
        )

    def get_historical_data(self, years=10):
        try:
            end_date = date.today()
            start_date = end_date - timedelta(days=years*365)
            return self.ticker_obj.history(start=start_date, end=end_date)
        except Exception as e:
            logger.error(f"Error fetching historical data for {self.symbol}: {e}")
            return pd.DataFrame()

    def calculate_earnings_stability(self):
        try:
            earnings = self.ticker_obj.income_stmt.T
            if earnings is not None and not earnings.empty:
                yearly_earnings = earnings['Net Income'].tail(10)
                return (yearly_earnings > 0).all() if len(yearly_earnings) > 3 else None
            return None
        except Exception as e:
            logger.error(f"Error calculating earnings stability for {self.symbol}: {e}")
            return None

    def calculate_dividend_stability(self):
        try:
            dividends = self.historical_data['Dividends']
            yearly_dividends = dividends.resample('YE').sum()
            return (yearly_dividends > 0).all() and len(yearly_dividends) >= 3
        except Exception as e:
            logger.error(f"Error calculating dividend stability for {self.symbol}: {e}")
            return None

    def calculate_earnings_growth(self):
        try:
            earnings = self.ticker_obj.income_stmt.T
            if earnings is not None and not earnings.empty:
                yearly_earnings = earnings['Net Income'].tail(10)
                if len(yearly_earnings) >= 3:
                    growth_rate = (yearly_earnings.iloc[0] / yearly_earnings.iloc[-1]) ** (1/10) - 1
                    return growth_rate > 0.01  # 7% annual growth
            return None
        except Exception as e:
            logger.error(f"Error calculating earnings growth for {self.symbol}: {e}")
            return None

    def graham_analysis(self) -> dict:
        ratios = self.get_financial_ratios()
        analysis = {
            'ticker': self.symbol,
            'company_name': ratios.company_name,
            'pe_ratio': ratios.pe_ratio,
            'price_to_book': ratios.price_to_book,
            'dividend_yield': ratios.dividend_yield,
            'debt_to_equity': ratios.debt_to_equity,
            'current_ratio': ratios.current_ratio,
            'return_on_equity': ratios.return_on_equity,
            'market_cap': ratios.market_cap,
            'earnings_growth': ratios.earnings_growth,
            'adequate_size': ratios.market_cap > 2e9 if ratios.market_cap else None,
            'strong_financial_condition': ratios.current_ratio >= 2 if ratios.current_ratio else None,
            'earnings_stability': self.calculate_earnings_stability(),
            'dividend_stability': self.calculate_dividend_stability(),
            'earnings_growth': self.calculate_earnings_growth(),
            'low_pe_ratio': ratios.pe_ratio < 10 if ratios.pe_ratio else None,
            'low_pb_ratio': ratios.price_to_book < 5 if ratios.price_to_book else None
        }
        return analysis

def analyze_multiple_tickers(symbols: List[str]) -> pd.DataFrame:
    analyses = [Ticker(symbol).graham_analysis() for symbol in symbols]
    df = pd.DataFrame(analyses)
    return df

# Example usage
if __name__ == "__main__":
    tickers = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]
    df = analyze_multiple_tickers(tickers)
    
    logger.info("Analysis complete. DataFrame shape: %s", df.shape)
    
    # Example of filtering with missing data handling
    graham_compliant = df[
        (df['adequate_size'] == True) &
        (df['strong_financial_condition'] == True) &
        (df['earnings_stability'] == True) &
        (df['dividend_stability'] == True) &
        (df['earnings_growth'] == True) &
        (df['low_pe_ratio'] == True) &
        (df['low_pb_ratio'] == True)
    ]
    
    print("Stocks meeting all Graham criteria:\n%s", 
                graham_compliant[['ticker', 'company_name']])
    
    # Example of custom filtering with missing data handling
    value_stocks = df[
        (df['pe_ratio'].notna() & (df['pe_ratio'] < 15)) &
        (df['price_to_book'].notna() & (df['price_to_book'] < 1.5)) &
        (df['dividend_yield'].notna() & (df['dividend_yield'] > 0.02))  # 2% dividend yield
    ]
    
    print("Value stocks based on P/E, P/B, and dividend yield:\n%s", 
                value_stocks[['ticker', 'company_name', 'pe_ratio', 'price_to_book', 'dividend_yield']])
    
    # Summary of missing data
    missing_data = df.isnull().sum()
    logger.info("Missing data summary:\n%s", missing_data[missing_data > 0])

2024-09-21 19:59:02,689 - ERROR - $FB: possibly delisted; no timezone found
2024-09-21 19:59:02,691 - ERROR - Error calculating dividend stability for FB: 'Dividends'
2024-09-21 19:59:03,294 - INFO - Analysis complete. DataFrame shape: (10, 16)
2024-09-21 19:59:03,299 - INFO - Missing data summary:
pe_ratio                      1
price_to_book                 1
dividend_yield                4
debt_to_equity                2
current_ratio                 2
return_on_equity              1
market_cap                    1
earnings_growth               1
adequate_size                 1
strong_financial_condition    2
earnings_stability            1
dividend_stability            1
low_pe_ratio                  1
low_pb_ratio                  1
dtype: int64


Stocks meeting all Graham criteria:
%s Empty DataFrame
Columns: [ticker, company_name]
Index: []
Value stocks based on P/E, P/B, and dividend yield:
%s Empty DataFrame
Columns: [ticker, company_name, pe_ratio, price_to_book, dividend_yield]
Index: []


In [47]:
import yfinance as yf
import pandas as pd
from abc import ABC, abstractmethod
import logging

# Set up logging
logging.basicConfig(filename='stock_valuation.log', level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class StockData:
    def __init__(self, ticker):
        self.ticker = ticker
        self.ticker_obj = yf.Ticker(ticker)
        self.info = self.ticker_obj.info
        self.financials = self.ticker_obj.financials
        self.balance_sheet = self.ticker_obj.balance_sheet
        self.cash_flow = self.ticker_obj.cashflow

    def get_eps(self):
        try:
            return self.info.get('trailingEps', 0)
        except Exception as e:
            logger.error(f"Error getting EPS for {self.ticker}: {str(e)}")
            return 0

    def get_book_value_per_share(self):
        try:
            total_assets = self.balance_sheet.loc['Total Assets'].iloc[0]
            total_liabilities = self.balance_sheet.loc['Total Liabilities Net Minority Interest'].iloc[0]
            shares_outstanding = self.info.get('sharesOutstanding', 1)
            return (total_assets - total_liabilities) / shares_outstanding
        except Exception as e:
            logger.error(f"Error calculating book value per share for {self.ticker}: {str(e)}")
            return 0

    def get_free_cash_flow(self):
        try:
            return self.cash_flow.loc['Free Cash Flow'].iloc[0]
        except Exception as e:
            logger.error(f"Error getting free cash flow for {self.ticker}: {str(e)}")
            return 0

    def get_current_price(self):
        try:
            return self.info.get('currentPrice', 0)
        except Exception as e:
            logger.error(f"Error getting current price for {self.ticker}: {str(e)}")
            return 0

class ValuationBase(ABC):
    def __init__(self, stock_data):
        self.stock_data = stock_data

    @abstractmethod
    def calculate_intrinsic_value(self):
        pass

    def calculate_margin_of_safety(self):
        try:
            intrinsic_value = self.calculate_intrinsic_value()
            current_price = self.stock_data.get_current_price()
            if intrinsic_value > 0:
                return (intrinsic_value - current_price) / intrinsic_value
            return 0
        except Exception as e:
            logger.error(f"Error calculating margin of safety: {str(e)}")
            return 0

class GrahamValuation(ValuationBase):
    def calculate_intrinsic_value(self):
        try:
            eps = self.stock_data.get_eps()
            bvps = self.stock_data.get_book_value_per_share()
            intrinsic_value = (eps * bvps * 22.5) ** 0.5
            logger.info(f"Graham Valuation for {self.stock_data.ticker}: {intrinsic_value}")
            return intrinsic_value
        except Exception as e:
            logger.error(f"Error calculating Graham Valuation for {self.stock_data.ticker}: {str(e)}")
            return 0

class DCFValuation(ValuationBase):
    def __init__(self, stock_data, growth_rate=0.05, discount_rate=0.1, years=5):
        super().__init__(stock_data)
        self.growth_rate = growth_rate
        self.discount_rate = discount_rate
        self.years = years

    def calculate_intrinsic_value(self):
        try:
            fcf = self.stock_data.get_free_cash_flow()
            terminal_value = fcf * (1 + self.growth_rate) / (self.discount_rate - self.growth_rate)
            
            present_value = 0
            for year in range(1, self.years + 1):
                present_value += fcf * (1 + self.growth_rate) ** year / (1 + self.discount_rate) ** year
            
            present_value += terminal_value / (1 + self.discount_rate) ** self.years
            shares_outstanding = self.stock_data.info.get('sharesOutstanding', 1)
            intrinsic_value = present_value / shares_outstanding
            logger.info(f"DCF Valuation for {self.stock_data.ticker}: {intrinsic_value}")
            return intrinsic_value
        except Exception as e:
            logger.error(f"Error calculating DCF Valuation for {self.stock_data.ticker}: {str(e)}")
            return 0

class ComparableCompanyAnalysis(ValuationBase):
    def __init__(self, stock_data, comparable_tickers):
        super().__init__(stock_data)
        self.comparable_tickers = comparable_tickers

    def calculate_intrinsic_value(self):
        try:
            comparable_data = []
            for ticker in self.comparable_tickers:
                try:
                    comp = yf.Ticker(ticker)
                    pe_ratio = comp.info.get('trailingPE', 0)
                    market_cap = comp.info.get('marketCap', 0)
                    revenue = comp.info.get('totalRevenue', 0)
                    comparable_data.append({
                        'ticker': ticker,
                        'pe_ratio': pe_ratio,
                        'price_to_sales': market_cap / revenue if revenue else 0
                    })
                except Exception as e:
                    logger.error(f"Error fetching data for comparable company {ticker}: {str(e)}")

            df = pd.DataFrame(comparable_data)
            median_pe = df['pe_ratio'].median()
            median_ps = df['price_to_sales'].median()

            target_stock_eps = self.stock_data.get_eps()
            target_stock_revenue = self.stock_data.financials.loc['Total Revenue'].iloc[0]

            pe_valuation = median_pe * target_stock_eps
            ps_valuation = median_ps * target_stock_revenue / self.stock_data.info.get('sharesOutstanding', 1)

            intrinsic_value = (pe_valuation + ps_valuation) / 2
            logger.info(f"Comparable Company Analysis Valuation for {self.stock_data.ticker}: {intrinsic_value}")
            return intrinsic_value
        except Exception as e:
            logger.error(f"Error calculating Comparable Company Analysis Valuation for {self.stock_data.ticker}: {str(e)}")
            return 0

class PERatioValuation(ValuationBase):
    def __init__(self, stock_data, target_pe):
        super().__init__(stock_data)
        self.target_pe = target_pe

    def calculate_intrinsic_value(self):
        try:
            eps = self.stock_data.get_eps()
            intrinsic_value = eps * self.target_pe
            logger.info(f"PE Ratio Valuation for {self.stock_data.ticker}: {intrinsic_value}")
            return intrinsic_value
        except Exception as e:
            logger.error(f"Error calculating PE Ratio Valuation for {self.stock_data.ticker}: {str(e)}")
            return 0

class EVToEBITDAValuation(ValuationBase):
    def __init__(self, stock_data, target_ev_to_ebitda):
        super().__init__(stock_data)
        self.target_ev_to_ebitda = target_ev_to_ebitda

    def calculate_intrinsic_value(self):
        try:
            ebitda = self.stock_data.financials.loc['EBITDA'].iloc[0]
            target_ev = ebitda * self.target_ev_to_ebitda
            
            total_debt = self.stock_data.balance_sheet.loc['Total Debt'].iloc[0]
            cash = self.stock_data.balance_sheet.loc['Cash And Cash Equivalents'].iloc[0]
            
            target_market_cap = target_ev - total_debt + cash
            shares_outstanding = self.stock_data.info.get('sharesOutstanding', 1)
            
            intrinsic_value = target_market_cap / shares_outstanding
            logger.info(f"EV to EBITDA Valuation for {self.stock_data.ticker}: {intrinsic_value}")
            return intrinsic_value
        except Exception as e:
            logger.error(f"Error calculating EV to EBITDA Valuation for {self.stock_data.ticker}: {str(e)}")
            return 0

In [50]:
import pandas as pd

class StockScanner:
    def __init__(self, tickers):
        self.tickers = tickers
        self.results = pd.DataFrame(
            columns=[
                'Ticker', 
                'Current Price', 
                'Valuation Method', 
                'Intrinsic Value', 
                'Margin of Safety'
            ]
        )
        
    def scan(self, valuation_methods):
        for ticker in self.tickers:
            stock_data = StockData(ticker)
            for method, params in valuation_methods:
                valuation = method(stock_data, **params)
                intrinsic_value = valuation.calculate_intrinsic_value()
                margin_of_safety = valuation.calculate_margin_of_safety()
                self.results = pd.concat([self.results, pd.DataFrame([{
                    'Ticker': ticker,
                    'Current Price': stock_data.get_current_price(),
                    'Valuation Method': method.__name__,
                    'Intrinsic Value': intrinsic_value,
                    'Margin of Safety': margin_of_safety
                }])], ignore_index=True)

    def rank_stocks(self, method='average_margin_of_safety'):
        if method == 'average_margin_of_safety':
            ranked = self.results.groupby('Ticker')['Margin of Safety'].mean().sort_values(ascending=False)
        elif method == 'highest_margin_of_safety':
            ranked = self.results.groupby('Ticker')['Margin of Safety'].max().sort_values(ascending=False)
        else:
            raise ValueError("Invalid ranking method")

        return ranked

    def get_detailed_results(self):
        return self.results.sort_values('Margin of Safety', ascending=False)

# Example usage
tickers = ['AAPL', 'MSFT', 'GOOGL', 'AMZN','2222.SR']
scanner = StockScanner(tickers)

valuation_methods = [
    (GrahamValuation, {}),
    (DCFValuation, {'growth_rate': 0.05, 'discount_rate': 0.1, 'years': 5}),
    (ComparableCompanyAnalysis, {'comparable_tickers': ['IBM', 'ORCL', 'CSCO']}),
    (PERatioValuation, {'target_pe': 15}),
    (EVToEBITDAValuation, {'target_ev_to_ebitda': 12})
]

scanner.scan(valuation_methods)

# Get ranked stocks
ranked_stocks = scanner.rank_stocks()
print("Ranked Stocks by Average Margin of Safety:")
display(ranked_stocks)
print()

# Get detailed results
detailed_results = scanner.get_detailed_results()
print("Detailed Results:")
display(detailed_results)

  self.results = pd.concat([self.results, pd.DataFrame([{


Ranked Stocks by Average Margin of Safety:


Ticker
2222.SR    0.010698
GOOGL     -0.187825
MSFT      -1.726147
AMZN      -1.868232
AAPL      -2.554750
Name: Margin of Safety, dtype: float64


Detailed Results:


Unnamed: 0,Ticker,Current Price,Valuation Method,Intrinsic Value,Margin of Safety
24,2222.SR,27.45,EVToEBITDAValuation,48.909178,0.438756
22,2222.SR,27.45,ComparableCompanyAnalysis,36.420327,0.2463
11,GOOGL,163.59,DCFValuation,206.355542,0.207242
14,GOOGL,163.59,EVToEBITDAValuation,199.896917,0.181628
12,GOOGL,163.59,ComparableCompanyAnalysis,184.860802,0.115064
21,2222.SR,27.45,DCFValuation,27.300537,-0.005475
23,2222.SR,27.45,PERatioValuation,27.0,-0.016667
17,AMZN,191.6,ComparableCompanyAnalysis,156.012331,-0.228108
13,GOOGL,163.59,PERatioValuation,104.55,-0.564706
20,2222.SR,27.45,GrahamValuation,17.055803,-0.609423


In [29]:
stock_data = StockData('AAPL')

In [43]:
stock_data.ticker_obj.balance_sheet#.loc['Cash']

Unnamed: 0,2023-09-30,2022-09-30,2021-09-30,2020-09-30,2019-09-30
Treasury Shares Number,0.0,,,,
Ordinary Shares Number,15550061000.0,15943425000.0,16426786000.0,16976763000.0,
Share Issued,15550061000.0,15943425000.0,16426786000.0,16976763000.0,
Net Debt,81123000000.0,96423000000.0,89779000000.0,74420000000.0,
Total Debt,123930000000.0,132480000000.0,136522000000.0,122278000000.0,
...,...,...,...,...,...
Cash Cash Equivalents And Short Term Investments,61555000000.0,48304000000.0,62639000000.0,90943000000.0,
Other Short Term Investments,31590000000.0,24658000000.0,27699000000.0,52927000000.0,
Cash And Cash Equivalents,29965000000.0,23646000000.0,34940000000.0,38016000000.0,
Cash Equivalents,1606000000.0,5100000000.0,17635000000.0,20243000000.0,
