# Notebook 4: Earnings Quality Measures
## Accrual-Based Quality Metrics from Financial Statements

---

**Research Project:** Retail Sentiment, Earnings Quality, and Stock Returns

**Purpose:** Compute firm-level earnings quality measures using financial statement data from SEC EDGAR.

**Methodology:**
- Dechow & Dichev (2002) accruals quality model
- McNichols (2002) modification
- Modified Jones discretionary accruals (building on Jones, 1991)

**Data Sources:**
- SEC EDGAR 10-K and 10-Q filings
- Financial statement variables

**Output:** Firm-level panel with earnings quality metrics

---

**References:**
- Dechow, P., & Dichev, I. (2002). The quality of accruals and earnings. The Accounting Review, 77(s-1), 35-59.
- McNichols, M. (2002). Discussion of the quality of accruals and earnings. The Accounting Review, 77(s-1), 61-69.
- Jones, J. (1991). Earnings management during import relief investigations. Journal of Accounting Research, 29(2), 193-228.

## 1. Environment Setup

In [None]:
# =============================================================================
# INSTALL REQUIRED PACKAGES
# =============================================================================

!pip install pandas==2.0.3
!pip install numpy==1.24.3
!pip install scipy==1.11.3
!pip install statsmodels==0.14.0
!pip install requests==2.31.0
!pip install beautifulsoup4==4.12.2
!pip install lxml==4.9.3
!pip install sec-edgar-downloader==5.0.0
!pip install tqdm==4.66.1
!pip install pyarrow==14.0.1
!pip install yfinance==0.2.31

print("All packages installed successfully.")

In [None]:
# =============================================================================
# IMPORT LIBRARIES
# =============================================================================

import os
import re
import json
import time
import warnings
from datetime import datetime, timedelta
from typing import List, Dict, Tuple, Optional
from collections import defaultdict

import pandas as pd
import numpy as np
from scipy import stats
import statsmodels.api as sm
from statsmodels.regression.rolling import RollingOLS
from tqdm.notebook import tqdm

import requests
from bs4 import BeautifulSoup
import yfinance as yf

warnings.filterwarnings('ignore')
pd.set_option('display.max_columns', None)
pd.set_option('display.float_format', '{:.4f}'.format)

print(f"Environment setup complete. Timestamp: {datetime.now()}")

In [None]:
# =============================================================================
# CONFIGURATION
# =============================================================================

class EarningsQualityConfig:
    """Configuration for earnings quality analysis."""
    
    # Data paths
    BASE_PATH = "/content/drive/MyDrive/Research/RetailSentiment/"
    RAW_DATA_PATH = BASE_PATH + "data/raw/"
    PROCESSED_DATA_PATH = BASE_PATH + "data/processed/"
    
    # Sample period
    START_YEAR = 2015  # Extra years for rolling estimation
    END_YEAR = 2023
    
    # Dechow-Dichev model parameters
    DD_ROLLING_WINDOW = 5  # Years for rolling regression
    DD_MIN_OBSERVATIONS = 3  # Minimum years required
    
    # Industry classification
    INDUSTRY_MIN_FIRMS = 10  # Minimum firms for industry regression
    
    # Winsorization
    WINSORIZE_LEVEL = 0.01  # 1% and 99%
    
    # SEC EDGAR
    SEC_EMAIL = "research@university.edu"  # Required for SEC API
    SEC_RATE_LIMIT = 10  # Requests per second
    
    @classmethod
    def print_config(cls):
        print("="*60)
        print("EARNINGS QUALITY CONFIGURATION")
        print("="*60)
        print(f"Period: {cls.START_YEAR} to {cls.END_YEAR}")
        print(f"DD Rolling Window: {cls.DD_ROLLING_WINDOW} years")
        print(f"Industry Min Firms: {cls.INDUSTRY_MIN_FIRMS}")
        print(f"Winsorization: {cls.WINSORIZE_LEVEL*100}%")
        print("="*60)

config = EarningsQualityConfig()
config.print_config()

In [None]:
# =============================================================================
# MOUNT GOOGLE DRIVE
# =============================================================================

from google.colab import drive
drive.mount('/content/drive')

os.makedirs(config.RAW_DATA_PATH, exist_ok=True)
os.makedirs(config.PROCESSED_DATA_PATH, exist_ok=True)
print("Data directories ready.")

## 2. Financial Statement Data Collection

### 2.1 SEC EDGAR Data Retrieval
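
The collector in this section reads the SEC `companyfacts` JSON, which nests values as facts → taxonomy → tag → units → unit → list of entries. The following is a minimal offline sketch of flattening one tag. The `sample_facts` payload and the `flatten_tag` helper are hypothetical, and the values are made up. Because the same period end can be reported in more than one filing, the sketch keeps the most recently filed value per period; that rule is an assumption for illustration, not something the collector itself applies.

```python
import pandas as pd

# Hypothetical payload that mimics the nesting of a companyfacts response.
sample_facts = {
    "facts": {
        "us-gaap": {
            "Assets": {
                "units": {
                    "USD": [
                        {"val": 100, "end": "2022-12-31", "filed": "2023-02-15",
                         "form": "10-K", "fy": 2022, "fp": "FY"},
                        # Same period end, reported again in the next year's 10-K
                        {"val": 105, "end": "2022-12-31", "filed": "2024-02-14",
                         "form": "10-K", "fy": 2023, "fp": "FY"},
                        {"val": 120, "end": "2023-03-31", "filed": "2023-05-01",
                         "form": "10-Q", "fy": 2023, "fp": "Q1"},
                        # Not a 10-K/10-Q, so it is filtered out
                        {"val": 999, "end": "2023-03-31", "filed": "2023-05-02",
                         "form": "8-K", "fy": 2023, "fp": "Q1"},
                    ]
                }
            }
        }
    }
}


def flatten_tag(facts, taxonomy, tag, unit="USD"):
    """Return one row per period end for a single tag (hypothetical helper)."""
    entries = (
        facts.get("facts", {})
        .get(taxonomy, {})
        .get(tag, {})
        .get("units", {})
        .get(unit, [])
    )
    rows = [e for e in entries if e.get("form") in ("10-K", "10-Q")]
    df = pd.DataFrame(rows)
    if df.empty:
        return df
    df["end"] = pd.to_datetime(df["end"])
    df["filed"] = pd.to_datetime(df["filed"])
    # Assumed rule: keep the most recently filed value for each period end
    df = df.sort_values(["end", "filed"]).drop_duplicates("end", keep="last")
    return df.reset_index(drop=True)


assets = flatten_tag(sample_facts, "us-gaap", "Assets")
print(assets[["end", "val", "form"]])
```

Without a rule like this, repeated period ends remain in the long table and the later pivot has to pick one of them implicitly.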

In [None]:
# =============================================================================
# SEC EDGAR DATA COLLECTOR
# =============================================================================

class SECEdgarCollector:
    """Collects financial statement data from SEC EDGAR.
    
    Uses the SEC EDGAR API to retrieve:
    - Balance sheet items
    - Income statement items
    - Cash flow statement items
    """
    
    BASE_URL = "https://data.sec.gov"
    COMPANY_FACTS_URL = BASE_URL + "/api/xbrl/companyfacts/CIK{cik}.json"
    
    # Key XBRL tags for earnings quality calculation
    REQUIRED_TAGS = {
        # Balance Sheet
        'Assets': ['Assets'],
        'CurrentAssets': ['AssetsCurrent'],
        'Cash': ['CashAndCashEquivalentsAtCarryingValue', 'Cash'],
        'Receivables': ['AccountsReceivableNetCurrent', 'ReceivablesNetCurrent'],
        'Inventory': ['InventoryNet'],
        'CurrentLiabilities': ['LiabilitiesCurrent'],
        'AccountsPayable': ['AccountsPayableCurrent'],
        'DebtCurrent': ['DebtCurrent', 'ShortTermBorrowings'],
        'PPE': ['PropertyPlantAndEquipmentNet'],
        
        # Income Statement
        'Revenue': ['Revenues', 'RevenueFromContractWithCustomerExcludingAssessedTax', 'SalesRevenueNet'],
        'NetIncome': ['NetIncomeLoss', 'ProfitLoss'],
        'DepreciationAmortization': ['DepreciationDepletionAndAmortization', 'Depreciation'],
        
        # Cash Flow
        'OperatingCashFlow': ['NetCashProvidedByUsedInOperatingActivities'],
    }
    
    def __init__(self, email: str):
        """Initialize collector with email for SEC API."""
        self.headers = {
            'User-Agent': f'Academic Research ({email})',
            'Accept-Encoding': 'gzip, deflate'
        }
        self.cik_mapping = {}
        
    def get_cik_from_ticker(self, ticker: str) -> Optional[str]:
        """Get the zero-padded 10-digit CIK for a ticker symbol."""
        # The submissions endpoint is keyed by CIK, not by ticker, so the
        # ticker is resolved through the ticker-to-CIK mapping instead.
        if not self.cik_mapping:
            self.load_cik_mapping()
        return self.cik_mapping.get(ticker.upper())
    
    def load_cik_mapping(self) -> Dict[str, str]:
        """Load ticker to CIK mapping from SEC."""
        print("Loading CIK mapping from SEC...")
        url = "https://www.sec.gov/files/company_tickers.json"  # Served from www.sec.gov, not data.sec.gov
        
        try:
            response = requests.get(url, headers=self.headers)
            data = response.json()
            
            for item in data.values():
                ticker = item['ticker']
                cik = str(item['cik_str']).zfill(10)
                self.cik_mapping[ticker] = cik
            
            print(f"Loaded {len(self.cik_mapping)} ticker-CIK mappings")
        except Exception as e:
            print(f"Error loading CIK mapping: {e}")
        
        return self.cik_mapping
    
    def get_company_facts(self, cik: str) -> Optional[Dict]:
        """Get all financial facts for a company."""
        url = self.COMPANY_FACTS_URL.format(cik=cik)
        
        try:
            response = requests.get(url, headers=self.headers)
            if response.status_code == 200:
                return response.json()
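        except (requests.RequestException, ValueError):
            # Network error or malformed JSON: treat the company as unavailable
            pass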
        return None
    
    def extract_financial_data(self, facts: Dict, ticker: str) -> pd.DataFrame:
        """Extract relevant financial data from company facts."""
        if not facts or 'facts' not in facts:
            return pd.DataFrame()
        
        all_data = []
        
        # Try both US-GAAP and IFRS
        for taxonomy in ['us-gaap', 'ifrs-full']:
            if taxonomy not in facts['facts']:
                continue
            
            taxonomy_facts = facts['facts'][taxonomy]
            
            for var_name, tag_list in self.REQUIRED_TAGS.items():
                for tag in tag_list:
                    if tag in taxonomy_facts:
                        tag_data = taxonomy_facts[tag]
                        
                        if 'units' not in tag_data:
                            continue
                        
                        # Get USD values
                        for unit_type in ['USD', 'USD/shares']:
                            if unit_type in tag_data['units']:
                                for entry in tag_data['units'][unit_type]:
                                    # Only quarterly/annual filings
                                    form = entry.get('form', '')
                                    if form not in ['10-Q', '10-K']:
                                        continue
                                    
                                    all_data.append({
                                        'ticker': ticker,
                                        'variable': var_name,
                                        'value': entry.get('val'),
                                        'end_date': entry.get('end'),
                                        'filed_date': entry.get('filed'),
                                        'form': form,
                                        'fiscal_year': entry.get('fy'),
                                        'fiscal_period': entry.get('fp'),
                                        'frame': entry.get('frame')
                                    })
                        break  # Found tag, move to next variable
        
        return pd.DataFrame(all_data)
    
    def collect_financials(self, tickers: List[str]) -> pd.DataFrame:
        """Collect financial data for multiple tickers."""
        print(f"Collecting financial data for {len(tickers)} tickers...")
        
        if not self.cik_mapping:
            self.load_cik_mapping()
        
        all_financials = []
        failed_tickers = []
        
        for ticker in tqdm(tickers, desc="Fetching SEC data"):
            cik = self.cik_mapping.get(ticker)
            if not cik:
                failed_tickers.append(ticker)
                continue
            
            facts = self.get_company_facts(cik)
            if facts:
                df = self.extract_financial_data(facts, ticker)
                if len(df) > 0:
                    all_financials.append(df)
            else:
                failed_tickers.append(ticker)
            
            time.sleep(0.1)  # Rate limiting
        
        if all_financials:
            result = pd.concat(all_financials, ignore_index=True)
            result['end_date'] = pd.to_datetime(result['end_date'])
            result['filed_date'] = pd.to_datetime(result['filed_date'])
            
            print(f"\nCollection complete:")
            print(f"  Tickers collected: {result['ticker'].nunique()}")
            print(f"  Failed tickers: {len(failed_tickers)}")
            print(f"  Total observations: {len(result):,}")
            
            return result
        else:
            return pd.DataFrame()

# Initialize collector
sec_collector = SECEdgarCollector(config.SEC_EMAIL)
sec_collector.load_cik_mapping()

### 2.2 Alternative: Yahoo Finance Financials
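
The Yahoo Finance collector in this section reshapes each statement from wide to long format before stacking tickers. The following is a minimal sketch of that reshaping on a hand-made table. The `wide` frame, its values, and the `DEMO` ticker are hypothetical. It assumes line items sit in rows and period-end dates in columns, which is what the transpose in the collector implies.

```python
import pandas as pd

# Hypothetical statement: line items as rows, period-end dates as columns.
wide = pd.DataFrame(
    {
        pd.Timestamp("2023-06-30"): [500.0, 200.0],
        pd.Timestamp("2023-03-31"): [480.0, 190.0],
    },
    index=["Total Assets", "Accounts Payable"],
)

# Transpose so dates become rows, name the date axis, then melt to long format.
long_df = (
    wide.T
    .rename_axis("period_end")
    .reset_index()
    .melt(id_vars="period_end", var_name="variable", value_name="value")
)
long_df["ticker"] = "DEMO"

print(long_df)
```

Naming the date axis before `reset_index()` avoids relying on the default `index` column label.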

In [None]:
# =============================================================================
# YAHOO FINANCE FINANCIAL STATEMENT COLLECTOR
# =============================================================================

class YahooFinancialsCollector:
    """Alternative financial data collection using Yahoo Finance.
    
    Simpler interface but may have less historical depth.
    """
    
    def __init__(self):
        pass
    
    def get_financials(self, ticker: str) -> Dict[str, pd.DataFrame]:
        """Get quarterly and annual financial statements.
        
        Args:
            ticker: Stock ticker
            
        Returns:
            Dictionary with financial statements
        """
        try:
            stock = yf.Ticker(ticker)
            
            return {
                'quarterly_financials': stock.quarterly_financials,
                'quarterly_balance_sheet': stock.quarterly_balance_sheet,
                'quarterly_cashflow': stock.quarterly_cashflow,
                'annual_financials': stock.financials,
                'annual_balance_sheet': stock.balance_sheet,
                'annual_cashflow': stock.cashflow
            }
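        except Exception as e:
            # Report the failure instead of silently returning nothing
            print(f"Could not fetch financials for {ticker}: {e}")
            return {}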
    
    def collect_all_financials(self, tickers: List[str]) -> pd.DataFrame:
        """Collect financial data for multiple tickers."""
        print(f"Collecting Yahoo Finance data for {len(tickers)} tickers...")
        
        all_data = []
        
        for ticker in tqdm(tickers, desc="Fetching financials"):
            financials = self.get_financials(ticker)
            
            if financials:
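                # Reshape each statement (quarterly and annual) to long format
                for key, df in financials.items():
                    if df is not None and not df.empty:
                        # Statements arrive with line items as rows and period-end
                        # dates as columns; name the date axis explicitly so the
                        # melt does not depend on the default 'index' label.
                        df_long = df.T.rename_axis('period_end').reset_index()
                        df_long = df_long.melt(
                            id_vars=['period_end'],
                            var_name='variable',
                            value_name='value'
                        )
                        df_long['ticker'] = ticker
                        df_long['source'] = key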
                        all_data.append(df_long)
            
            time.sleep(0.2)
        
        if all_data:
            result = pd.concat(all_data, ignore_index=True)
            print(f"Collected data for {result['ticker'].nunique()} tickers")
            return result
        else:
            return pd.DataFrame()

# Initialize Yahoo collector
yahoo_fin_collector = YahooFinancialsCollector()

In [None]:
# =============================================================================
# LOAD TICKER UNIVERSE AND COLLECT DATA
# =============================================================================

# Load tickers from previous notebooks
def load_tickers():
    filepath = os.path.join(config.PROCESSED_DATA_PATH, 'wsb_firm_day_panel.parquet')
    if os.path.exists(filepath):
        df = pd.read_parquet(filepath)
        return df['ticker'].unique().tolist()
    else:
        # Fallback to S&P 500
        tables = pd.read_html('https://en.wikipedia.org/wiki/List_of_S%26P_500_companies')
        return tables[0]['Symbol'].str.replace('.', '-', regex=False).tolist()

tickers = load_tickers()
print(f"Loaded {len(tickers)} tickers")

# Collect financial data (using Yahoo Finance for simplicity)
# For production, use SEC EDGAR for more complete data
financial_data = yahoo_fin_collector.collect_all_financials(tickers[:100])  # Subset for demo

## 3. Compute Accruals

### 3.1 Working Capital Accruals
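
The calculator in this section defines working capital accruals as ΔCA − ΔCash − ΔCL + ΔDebt and total accruals (cash flow approach) as net income minus operating cash flow, both scaled by average total assets. The following is a small worked sketch on a hypothetical two-firm table with made-up values in arbitrary units. It computes changes within each firm, so the first period of every firm is missing rather than differenced against another firm.

```python
import pandas as pd

# Hypothetical two-firm, three-period panel (arbitrary units).
bs = pd.DataFrame({
    "ticker": ["AAA"] * 3 + ["BBB"] * 3,
    "period_end": pd.to_datetime(["2022-12-31", "2023-03-31", "2023-06-30"] * 2),
    "current_assets": [100.0, 120.0, 130.0, 50.0, 55.0, 52.0],
    "cash": [20.0, 25.0, 24.0, 10.0, 12.0, 11.0],
    "current_liabilities": [60.0, 70.0, 72.0, 30.0, 31.0, 33.0],
    "short_term_debt": [10.0, 15.0, 15.0, 5.0, 5.0, 6.0],
    "total_assets": [400.0, 420.0, 440.0, 200.0, 210.0, 205.0],
    "net_income": [8.0, 9.0, 7.0, 3.0, 4.0, 2.0],
    "operating_cf": [10.0, 6.0, 9.0, 4.0, 3.0, 5.0],
}).sort_values(["ticker", "period_end"])

# Period-over-period changes, computed separately for each firm.
cols = ["current_assets", "cash", "current_liabilities", "short_term_debt"]
delta = bs.groupby("ticker")[cols].diff()

# WC accruals = dCA - dCash - dCL + dDebt
bs["wc_accruals"] = (
    delta["current_assets"]
    - delta["cash"]
    - delta["current_liabilities"]
    + delta["short_term_debt"]
)

# Scale by the average of current and lagged total assets.
bs["avg_assets"] = (
    bs["total_assets"] + bs.groupby("ticker")["total_assets"].shift(1)
) / 2
bs["wc_accruals_scaled"] = bs["wc_accruals"] / bs["avg_assets"]

# Total accruals, cash flow approach: net income minus operating cash flow.
bs["total_accruals_scaled"] = (bs["net_income"] - bs["operating_cf"]) / bs["avg_assets"]

print(bs[["ticker", "period_end", "wc_accruals", "wc_accruals_scaled",
          "total_accruals_scaled"]])
```

For firm AAA in the second period, ΔCA = 20, ΔCash = 5, ΔCL = 10, and ΔDebt = 5, so working capital accruals are 20 − 5 − 10 + 5 = 10, or 10 / 410 once scaled by average assets.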

In [None]:
# =============================================================================
# ACCRUALS CALCULATOR
# =============================================================================

class AccrualsCalculator:
    """Calculates various accruals measures from financial statements.
    
    Implements:
    1. Total Accruals (balance sheet approach)
    2. Working Capital Accruals (for Dechow-Dichev)
    3. Cash Flow-based Accruals
    """
    
    def __init__(self):
        pass
    
    def prepare_panel_data(self, raw_data: pd.DataFrame) -> pd.DataFrame:
        """Convert raw financial data to panel format.
        
        Args:
            raw_data: Long-format financial data
            
        Returns:
            Wide-format panel with one row per ticker-period
        """
        print("Preparing panel data...")
        
        # Standardize variable names before pivoting to wide format.
        # Statement labels differ across yfinance versions, so both older
        # and newer labels are mapped where known.
        var_mapping = {
            'Total Assets': 'total_assets',
            'Total Current Assets': 'current_assets',
            'Current Assets': 'current_assets',
            'Cash And Cash Equivalents': 'cash',
            'Cash Cash Equivalents And Short Term Investments': 'cash',
            'Accounts Receivable': 'receivables',
            'Net Receivables': 'receivables',
            'Inventory': 'inventory',
            'Total Current Liabilities': 'current_liabilities',
            'Current Liabilities': 'current_liabilities',
            'Accounts Payable': 'accounts_payable',
            'Short Long Term Debt': 'short_term_debt',
            'Current Debt': 'short_term_debt',
            'Property Plant Equipment Net': 'ppe_net',
            'Net PPE': 'ppe_net',
            'Total Revenue': 'revenue',
            'Net Income': 'net_income',
            'Depreciation And Amortization': 'depreciation',
            'Depreciation': 'depreciation',
            'Operating Cash Flow': 'operating_cf',
            'Cash Flow From Continuing Operating Activities': 'operating_cf'
        }
        
        df = raw_data.copy()
        
        # Keep quarterly statements only: mixing annual and quarterly flows
        # (revenue, net income, cash flow) in one panel distorts the changes.
        if 'source' in df.columns:
            df = df[df['source'].str.startswith('quarterly')].copy()
        
        df['var_std'] = df['variable'].map(var_mapping)
        df = df[df['var_std'].notna()]
        
        # Pivot
        panel = df.pivot_table(
            index=['ticker', 'period_end'],
            columns='var_std',
            values='value',
            aggfunc='first'
        ).reset_index()
        
        panel['period_end'] = pd.to_datetime(panel['period_end'])
        panel = panel.sort_values(['ticker', 'period_end'])
        
        print(f"Panel created: {len(panel)} observations, {panel['ticker'].nunique()} firms")
        return panel
    
    def calculate_working_capital_accruals(self, panel: pd.DataFrame) -> pd.DataFrame:
        """Calculate working capital accruals.
        
        WC_Accruals = ΔCA - ΔCash - ΔCL + ΔDebt
        
        where:
        - ΔCA = change in current assets
        - ΔCash = change in cash
        - ΔCL = change in current liabilities
        - ΔDebt = change in short-term debt
        """
        print("Calculating working capital accruals...")
        df = panel.sort_values(['ticker', 'period_end']).copy()
        
        # Treat a missing short-term debt column as zero debt
        if 'short_term_debt' not in df.columns:
            df['short_term_debt'] = 0.0
        
        # Lagged values (within firm)
        for col in ['current_assets', 'cash', 'current_liabilities', 'short_term_debt', 'total_assets']:
            df[f'{col}_lag'] = df.groupby('ticker')[col].shift(1)
        
        # Period-over-period changes; the first period of each firm is NaN
        df['delta_ca'] = df['current_assets'] - df['current_assets_lag']
        df['delta_cash'] = df['cash'] - df['cash_lag']
        df['delta_cl'] = df['current_liabilities'] - df['current_liabilities_lag']
        df['delta_debt'] = df['short_term_debt'] - df['short_term_debt_lag']
        
        # Working capital accruals
        df['wc_accruals'] = (
            df['delta_ca'] - 
            df['delta_cash'] - 
            df['delta_cl'] + 
            df['delta_debt'].fillna(0)
        )
        
        # Scale by average total assets
        df['avg_assets'] = (df['total_assets'] + df.get('total_assets_lag', df['total_assets'])) / 2
        df['wc_accruals_scaled'] = df['wc_accruals'] / df['avg_assets']
        
        return df
    
    def calculate_total_accruals(self, panel: pd.DataFrame) -> pd.DataFrame:
        """Calculate total accruals using balance sheet approach.
        
        Total_Accruals = (ΔCA - ΔCash) - (ΔCL - ΔDebt - ΔTaxes) - Depreciation
        
        Or using cash flow approach:
        Total_Accruals = Net Income - Operating Cash Flow
        """
        print("Calculating total accruals...")
        df = panel.copy()
        
        # Cash flow approach (more reliable); requires 'avg_assets', which
        # calculate_working_capital_accruals() adds to the panel
        if 'net_income' in df.columns and 'operating_cf' in df.columns:
            df['total_accruals'] = df['net_income'] - df['operating_cf']
            df['total_accruals_scaled'] = df['total_accruals'] / df['avg_assets']
        
        return df

# Initialize calculator
accruals_calc = AccrualsCalculator()

In [None]:
# =============================================================================
# PREPARE DATA AND CALCULATE ACCRUALS
# =============================================================================

# Prepare panel data
if len(financial_data) > 0:
    panel_data = accruals_calc.prepare_panel_data(financial_data)
    
    # Calculate accruals
    panel_data = accruals_calc.calculate_working_capital_accruals(panel_data)
    panel_data = accruals_calc.calculate_total_accruals(panel_data)
    
    print("\nAccruals Summary Statistics:")
    if 'wc_accruals_scaled' in panel_data.columns:
        print(panel_data['wc_accruals_scaled'].describe())
else:
    print("No financial data available - creating synthetic data for demonstration")
    # Create synthetic panel for demonstration
    np.random.seed(42)
    n_firms = 100
    n_periods = 20
    
    panel_data = pd.DataFrame({
        'ticker': np.repeat([f'TICK{i}' for i in range(n_firms)], n_periods),
        'period_end': np.tile(pd.date_range('2019-01-01', periods=n_periods, freq='Q'), n_firms),
        'wc_accruals_scaled': np.random.normal(0, 0.05, n_firms * n_periods),
        'total_accruals_scaled': np.random.normal(-0.05, 0.08, n_firms * n_periods),
        'operating_cf': np.random.normal(0.1, 0.15, n_firms * n_periods) * 1e9,
        'avg_assets': np.random.uniform(1e9, 50e9, n_firms * n_periods),
        'revenue': np.random.uniform(0.5e9, 20e9, n_firms * n_periods),
        'ppe_net': np.random.uniform(0.2e9, 10e9, n_firms * n_periods)
    })
    
    # Add lagged/lead cash flows for DD model
    panel_data = panel_data.sort_values(['ticker', 'period_end'])
    panel_data['cfo_scaled'] = panel_data['operating_cf'] / panel_data['avg_assets']
    panel_data['cfo_lag'] = panel_data.groupby('ticker')['cfo_scaled'].shift(1)
    panel_data['cfo_lead'] = panel_data.groupby('ticker')['cfo_scaled'].shift(-1)
    
    # Add industry (one fixed industry per firm, assigned by ticker)
    industries = ['Technology', 'Healthcare', 'Finance', 'Consumer', 'Industrial']
    industry_map = {
        f'TICK{i}': industries[i * len(industries) // n_firms]
        for i in range(n_firms)
    }
    panel_data['industry'] = panel_data['ticker'].map(industry_map)
    
    print(f"Created synthetic panel: {len(panel_data)} observations")

## 4. Dechow-Dichev Earnings Quality Model
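
Before estimation, the model inputs are winsorized at the level set by `WINSORIZE_LEVEL` (0.01, i.e. the 1st and 99th percentiles). The following is a minimal sketch of that step on simulated data. The series `x` and the injected outliers are hypothetical, and the standalone `winsorize` function mirrors the quantile-and-clip approach used in the model class.

```python
import numpy as np
import pandas as pd


def winsorize(series: pd.Series, level: float = 0.01) -> pd.Series:
    """Clip values below the `level` quantile and above the `1 - level` quantile."""
    lower = series.quantile(level)
    upper = series.quantile(1 - level)
    return series.clip(lower=lower, upper=upper)


# Simulated scaled accruals with two injected outliers (hypothetical data).
rng = np.random.default_rng(0)
x = pd.Series(rng.normal(0.0, 0.05, 1000))
x.iloc[0] = 5.0
x.iloc[1] = -5.0

w = winsorize(x, 0.01)

print(f"Raw range:        [{x.min():.3f}, {x.max():.3f}]")
print(f"Winsorized range: [{w.min():.3f}, {w.max():.3f}]")
```

Winsorizing keeps every observation and only pulls the tails in, so the sample size and the median are unchanged.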

### 4.1 Model Specification

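The Dechow-Dichev (2002) model measures earnings quality by how closely working capital accruals map into past, current, and future operating cash flows:

$$\Delta WC_t = \beta_0 + \beta_1 CFO_{t-1} + \beta_2 CFO_t + \beta_3 CFO_{t+1} + \epsilon_t$$

Earnings quality is measured by the standard deviation of the residuals, $\sigma(\epsilon)$, over a rolling window. A higher $\sigma(\epsilon)$ indicates lower earnings quality.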
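
The specification above can be sketched on simulated data as follows. The coefficients, the noise level `noise_sd`, and the sample size are assumptions chosen for illustration, and the regression uses plain least squares from NumPy rather than the estimation code in this notebook. The residual standard deviation it reports is the quantity the model treats as an inverse measure of earnings quality.

```python
import numpy as np

rng = np.random.default_rng(42)
n = 200

# Simulated scaled operating cash flows, with one extra observation at each
# end so that every period has both a lag and a lead.
cfo = rng.normal(0.10, 0.05, n + 2)
cfo_lag, cfo_now, cfo_lead = cfo[:-2], cfo[1:-1], cfo[2:]

# Assumed data-generating process (illustrative coefficients only).
noise_sd = 0.02
wc = (
    0.01
    + 0.20 * cfo_lag
    - 0.50 * cfo_now
    + 0.15 * cfo_lead
    + rng.normal(0.0, noise_sd, n)
)

# OLS: regress accruals on a constant and lagged, current, and lead cash flows.
X = np.column_stack([np.ones(n), cfo_lag, cfo_now, cfo_lead])
beta, *_ = np.linalg.lstsq(X, wc, rcond=None)
resid = wc - X @ beta

# Residual standard deviation, adjusted for the four estimated parameters.
resid_std = resid.std(ddof=X.shape[1])

print("Estimated coefficients:", np.round(beta, 3))
print(f"Residual std: {resid_std:.4f}")
```

In this simulation the residual standard deviation recovers roughly the noise level that was built in, which is the sense in which a larger value signals accruals that map less precisely into cash flows.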

In [None]:
# =============================================================================
# DECHOW-DICHEV EARNINGS QUALITY MODEL
# =============================================================================

class DechowDichevModel:
    """Implements the Dechow-Dichev (2002) accruals quality model.
    
    Model: WC_Accruals_t = a + b1*CFO_{t-1} + b2*CFO_t + b3*CFO_{t+1} + e_t
    
    Earnings Quality = σ(residuals) over rolling window
    Higher residual std = Lower earnings quality
    """
    
    def __init__(self, config: EarningsQualityConfig):
        self.config = config
        self.model_results = {}
        
    def prepare_variables(self, panel: pd.DataFrame) -> pd.DataFrame:
        """Prepare variables for DD model.
        
        Args:
            panel: Panel data with accruals and cash flows
            
        Returns:
            DataFrame with DD model variables
        """
        df = panel.sort_values(['ticker', 'period_end']).copy()
        
        # Ensure CFO variables exist
        if 'cfo_scaled' not in df.columns:
            df['cfo_scaled'] = df['operating_cf'] / df['avg_assets']
        
        # Create lags and leads
        if 'cfo_lag' not in df.columns:
            df['cfo_lag'] = df.groupby('ticker')['cfo_scaled'].shift(1)
        if 'cfo_lead' not in df.columns:
            df['cfo_lead'] = df.groupby('ticker')['cfo_scaled'].shift(-1)
        
        # Winsorize
        for col in ['wc_accruals_scaled', 'cfo_scaled', 'cfo_lag', 'cfo_lead']:
            if col in df.columns:
                df[col] = self._winsorize(df[col], self.config.WINSORIZE_LEVEL)
        
        return df
    
    def _winsorize(self, series: pd.Series, level: float) -> pd.Series:
        """Winsorize series at specified level."""
        lower = series.quantile(level)
        upper = series.quantile(1 - level)
        return series.clip(lower=lower, upper=upper)
    
    def estimate_firm_level(self, panel: pd.DataFrame) -> pd.DataFrame:
        """Estimate DD model at firm level using time series.
        
        For each firm, fit one regression over its full time series, then
        compute the rolling standard deviation of the residuals.
        
        Args:
            panel: Prepared panel data
            
        Returns:
            DataFrame with firm-level earnings quality
        """
        print("Estimating firm-level Dechow-Dichev model...")
        df = self.prepare_variables(panel)
        
        results = []
        
        for ticker in tqdm(df['ticker'].unique(), desc="Estimating DD model"):
            firm_data = df[df['ticker'] == ticker].sort_values('period_end')
            
            # Need sufficient observations
            valid_data = firm_data.dropna(
                subset=['wc_accruals_scaled', 'cfo_scaled', 'cfo_lag', 'cfo_lead']
            )
            
            if len(valid_data) < self.config.DD_MIN_OBSERVATIONS * 4:
                continue
            
            # Prepare regression variables
            y = valid_data['wc_accruals_scaled']
            X = sm.add_constant(valid_data[['cfo_lag', 'cfo_scaled', 'cfo_lead']])
            
            try:
                # Estimate model
                model = sm.OLS(y, X).fit()
                
                # Get residuals
                residuals = model.resid
                
                # Rolling std of residuals (earnings quality measure)
                window = self.config.DD_ROLLING_WINDOW * 4  # Quarterly
                if len(residuals) >= window:
                    rolling_std = residuals.rolling(window=window, min_periods=window//2).std()
                else:
                    rolling_std = pd.Series([residuals.std()] * len(residuals), index=residuals.index)
                
                # Store results for each period
                for period, resid, resid_std in zip(
                    valid_data['period_end'], residuals, rolling_std
                ):
                    results.append({
                        'ticker': ticker,
                        'period_end': period,
                        'dd_residual': resid,
                        'dd_residual_std': resid_std,
                        'dd_r_squared': model.rsquared,
                        'dd_n_obs': len(valid_data)
                    })
                    
            except Exception:
                # Skip firms where estimation fails (e.g., singular design matrix)
                continue
        
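        result_df = pd.DataFrame(results)
        
        # No firm met the minimum-observation requirement: return early
        # instead of failing on the missing columns below
        if result_df.empty:
            print("No firms with sufficient observations for the DD model.")
            return result_df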
        
        # Higher residual std = lower quality, so invert for intuitive interpretation
        result_df['earnings_quality_dd'] = -result_df['dd_residual_std']
        
        # Standardize
        mean_eq = result_df['earnings_quality_dd'].mean()
        std_eq = result_df['earnings_quality_dd'].std()
        result_df['earnings_quality_dd_std'] = (
            (result_df['earnings_quality_dd'] - mean_eq) / std_eq
        )
        
        print(f"\nDD Model Results:")
        print(f"  Firms: {result_df['ticker'].nunique()}")
        print(f"  Observations: {len(result_df)}")
        print(f"  Avg R-squared: {result_df['dd_r_squared'].mean():.3f}")
        
        return result_df
    
    def estimate_cross_sectional(self, panel: pd.DataFrame,
                                 group_col: str = 'industry') -> pd.DataFrame:
        """Estimate DD model cross-sectionally by industry-period.
        
        Alternative approach that estimates within industry-quarter groups.
        
        Args:
            panel: Prepared panel data
            group_col: Column for grouping (e.g., 'industry')
            
        Returns:
            DataFrame with earnings quality based on cross-sectional residuals
        """
        print(f"Estimating cross-sectional DD model by {group_col}...")
        df = self.prepare_variables(panel)
        
        # Add period identifier
        df['year_quarter'] = df['period_end'].dt.to_period('Q')
        
        results = []
        
        # Group by industry-quarter
        for (group, period), group_data in tqdm(
            df.groupby([group_col, 'year_quarter']),
            desc="Processing groups"
        ):
            valid_data = group_data.dropna(
                subset=['wc_accruals_scaled', 'cfo_scaled', 'cfo_lag', 'cfo_lead']
            )
            
            if len(valid_data) < self.config.INDUSTRY_MIN_FIRMS:
                continue
            
            y = valid_data['wc_accruals_scaled']
            X = sm.add_constant(valid_data[['cfo_lag', 'cfo_scaled', 'cfo_lead']])
            
            try:
                model = sm.OLS(y, X).fit()
                
                # Pair each residual with its own ticker and period end
                for ticker, period_end, resid in zip(
                    valid_data['ticker'], valid_data['period_end'], model.resid
                ):
                    results.append({
                        'ticker': ticker,
                        'period_end': period_end,
                        group_col: group,
                        'year_quarter': period,
                        'dd_residual_cs': resid,
                        'dd_r_squared_cs': model.rsquared
                    })
            except Exception:
                continue
        
        result_df = pd.DataFrame(results)
        
        # Calculate firm-level quality as std of residuals over time
        firm_quality = result_df.groupby('ticker').agg({
            'dd_residual_cs': 'std'
        }).reset_index()
        firm_quality.columns = ['ticker', 'eq_dd_cs_std']
        
        result_df = result_df.merge(firm_quality, on='ticker', how='left')
        result_df['earnings_quality_dd_cs'] = -result_df['eq_dd_cs_std']
        
        return result_df

# Initialize DD model
dd_model = DechowDichevModel(config)

In [None]:
# =============================================================================
# ESTIMATE DECHOW-DICHEV MODEL
# =============================================================================

# Estimate firm-level DD model
dd_results = dd_model.estimate_firm_level(panel_data)

print("\nEarnings Quality (DD) Distribution:")
print(dd_results['earnings_quality_dd_std'].describe())

## 5. McNichols Modification

### 5.1 Extended Model with Growth and PPE
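
The extension adds the change in revenue and PPE as regressors to the Dechow-Dichev specification. The following is a minimal sketch on simulated data of why that matters for the residual-based quality measure. The coefficients, distributions, and the `resid_std` helper are assumptions for illustration, and the fit uses NumPy least squares rather than the notebook's estimation code. When accruals depend on revenue growth and PPE, leaving those terms out pushes their variation into the residual.

```python
import numpy as np

rng = np.random.default_rng(7)
n = 300

# Simulated regressors, all scaled by assets (hypothetical distributions).
cfo = rng.normal(0.10, 0.05, n + 2)
cfo_lag, cfo_now, cfo_lead = cfo[:-2], cfo[1:-1], cfo[2:]
d_rev = rng.normal(0.02, 0.10, n)   # change in revenue
ppe = rng.uniform(0.10, 0.60, n)    # net PPE

# Assumed data-generating process in which revenue growth and PPE matter.
wc = (
    0.01
    + 0.20 * cfo_lag
    - 0.50 * cfo_now
    + 0.15 * cfo_lead
    + 0.20 * d_rev
    - 0.05 * ppe
    + rng.normal(0.0, 0.02, n)
)


def resid_std(y, regressors):
    """Residual std from an OLS fit with a constant (hypothetical helper)."""
    X = np.column_stack([np.ones(len(y))] + list(regressors))
    beta, *_ = np.linalg.lstsq(X, y, rcond=None)
    resid = y - X @ beta
    return resid.std(ddof=X.shape[1])


dd_std = resid_std(wc, [cfo_lag, cfo_now, cfo_lead])
mcn_std = resid_std(wc, [cfo_lag, cfo_now, cfo_lead, d_rev, ppe])

print(f"Residual std, cash flows only:      {dd_std:.4f}")
print(f"Residual std, with dRev and PPE:    {mcn_std:.4f}")
```

Under these assumptions the extended specification leaves a smaller residual standard deviation, so less of the accrual variation explained by growth and capital intensity is attributed to low earnings quality.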

In [None]:
# =============================================================================
# McNICHOLS EXTENDED MODEL
# =============================================================================

class McNicholsModel:
    """Implements the McNichols (2002) modification of DD model.
    
    Extends DD model with:
    - Change in revenue (growth)
    - PPE (property, plant, equipment)
    
    Model: WC_Accruals_t = a + b1*CFO_{t-1} + b2*CFO_t + b3*CFO_{t+1} 
                          + b4*ΔRev_t + b5*PPE_t + e_t
    """
    
    def __init__(self, config: EarningsQualityConfig):
        self.config = config
        
    def prepare_variables(self, panel: pd.DataFrame) -> pd.DataFrame:
        """Prepare additional variables for McNichols model."""
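        df = panel.sort_values(['ticker', 'period_end']).copy()
        
        # Cash flow variables (created here if the panel does not carry them)
        if 'cfo_scaled' not in df.columns:
            df['cfo_scaled'] = df['operating_cf'] / df['avg_assets']
        if 'cfo_lag' not in df.columns:
            df['cfo_lag'] = df.groupby('ticker')['cfo_scaled'].shift(1)
        if 'cfo_lead' not in df.columns:
            df['cfo_lead'] = df.groupby('ticker')['cfo_scaled'].shift(-1)
        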
        # Revenue change
        df['revenue_lag'] = df.groupby('ticker')['revenue'].shift(1)
        df['delta_revenue'] = df['revenue'] - df['revenue_lag']
        df['delta_revenue_scaled'] = df['delta_revenue'] / df['avg_assets']
        
        # PPE scaled
        df['ppe_scaled'] = df['ppe_net'] / df['avg_assets']
        
        # Winsorize
        for col in ['delta_revenue_scaled', 'ppe_scaled']:
            if col in df.columns:
                lower = df[col].quantile(self.config.WINSORIZE_LEVEL)
                upper = df[col].quantile(1 - self.config.WINSORIZE_LEVEL)
                df[col] = df[col].clip(lower=lower, upper=upper)
        
        return df
    
    def estimate(self, panel: pd.DataFrame) -> pd.DataFrame:
        """Estimate McNichols model.
        
        Args:
            panel: Panel data with accruals and financial variables
            
        Returns:
            DataFrame with McNichols earnings quality
        """
        print("Estimating McNichols model...")
        df = self.prepare_variables(panel)
        
        results = []
        
        for ticker in tqdm(df['ticker'].unique(), desc="Estimating McNichols"):
            firm_data = df[df['ticker'] == ticker].sort_values('period_end')
            
            valid_data = firm_data.dropna(subset=[
                'wc_accruals_scaled', 'cfo_scaled', 'cfo_lag', 'cfo_lead',
                'delta_revenue_scaled', 'ppe_scaled'
            ])
            
            if len(valid_data) < self.config.DD_MIN_OBSERVATIONS * 4:
                continue
            
            y = valid_data['wc_accruals_scaled']
            X = sm.add_constant(valid_data[[
                'cfo_lag', 'cfo_scaled', 'cfo_lead',
                'delta_revenue_scaled', 'ppe_scaled'
            ]])
            
            try:
                model = sm.OLS(y, X).fit()
                residuals = model.resid
                
                # Rolling std
                window = self.config.DD_ROLLING_WINDOW * 4
                if len(residuals) >= window:
                    rolling_std = residuals.rolling(window=window, min_periods=window//2).std()
                else:
                    rolling_std = pd.Series([residuals.std()] * len(residuals), index=residuals.index)
                
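                # Residuals and rolling std share the index of valid_data,
                # so the three sequences can be iterated in lockstep
                for period, resid, resid_std in zip(
                    valid_data['period_end'], residuals, rolling_std
                ):
                    results.append({
                        'ticker': ticker,
                        'period_end': period,
                        'mcnichols_residual': resid,
                        'mcnichols_residual_std': resid_std,
                        'mcnichols_r_squared': model.rsquared
                    })
            except Exception as exc:
                # Skip firms whose regression fails, but report why
                warnings.warn(f"McNichols regression failed for {ticker}: {exc}")
                continue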
        
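        result_df = pd.DataFrame(results)
        if result_df.empty:
            # No firm met the minimum-observation requirement
            print("McNichols Model: no firms with enough observations.")
            return result_df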
        result_df['earnings_quality_mcn'] = -result_df['mcnichols_residual_std']
        
        # Standardize
        result_df['earnings_quality_mcn_std'] = (
            (result_df['earnings_quality_mcn'] - result_df['earnings_quality_mcn'].mean()) /
            result_df['earnings_quality_mcn'].std()
        )
        
        print(f"McNichols Model Complete:")
        print(f"  Firms: {result_df['ticker'].nunique()}")
        print(f"  Avg R-squared: {result_df['mcnichols_r_squared'].mean():.3f}")
        
        return result_df

# Initialize and estimate
mcn_model = McNicholsModel(config)
mcn_results = mcn_model.estimate(panel_data)

## 6. Modified Jones Model

### 6.1 Discretionary Accruals
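
As a rough illustration of the cross-sectional regression used here, the sketch below fits the Modified Jones specification without an intercept for a single synthetic industry-year and splits scaled total accruals into a fitted (nondiscretionary) part and a residual (discretionary) part. The function name `modified_jones` and all simulated values are hypothetical, and NumPy least squares is used in place of `statsmodels` to keep the example self-contained.

```python
import numpy as np

def modified_jones(total_accruals, assets_lag, delta_rev, delta_rec, ppe):
    """Return (discretionary, nondiscretionary) accruals scaled by lagged assets."""
    a = np.asarray(assets_lag, dtype=float)
    y = np.asarray(total_accruals, dtype=float) / a
    # Regressors: 1/A, (dRev - dRec)/A, PPE/A; no separate intercept column
    X = np.column_stack([
        1.0 / a,
        (np.asarray(delta_rev, dtype=float) - np.asarray(delta_rec, dtype=float)) / a,
        np.asarray(ppe, dtype=float) / a,
    ])
    coef, *_ = np.linalg.lstsq(X, y, rcond=None)
    nondiscretionary = X @ coef
    discretionary = y - nondiscretionary
    return discretionary, nondiscretionary

# One synthetic industry-year with 25 firms (illustrative values only)
rng = np.random.default_rng(1)
n_firms = 25
assets_lag = rng.uniform(100, 1000, n_firms)
delta_rev = rng.normal(0, 50, n_firms)
delta_rec = rng.normal(0, 10, n_firms)
ppe = rng.uniform(20, 400, n_firms)
total_accruals = (0.05 * (delta_rev - delta_rec) - 0.08 * ppe
                  + rng.normal(0, 5, n_firms))

da, nda = modified_jones(total_accruals, assets_lag, delta_rev, delta_rec, ppe)
abs_da = np.abs(da)
earnings_quality_jones = -abs_da  # lower |DA| means higher quality
```

Because the regression has no intercept, the residuals are orthogonal to each regressor but need not average to zero within the industry-year.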

In [None]:
# =============================================================================
# MODIFIED JONES MODEL FOR DISCRETIONARY ACCRUALS
# =============================================================================

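class ModifiedJonesModel:
    """Implements the Modified Jones model.
    
    Dechow, Sloan & Sweeney (1995) modify the Jones (1991) model by subtracting
    the change in receivables from the change in revenue.
    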
    Model: TA_t/A_{t-1} = a*(1/A_{t-1}) + b*(ΔRev_t - ΔRec_t)/A_{t-1} + c*(PPE_t/A_{t-1}) + e_t
    
    Discretionary Accruals = Residuals
    """
    
    def __init__(self, config: EarningsQualityConfig):
        self.config = config
        
    def prepare_variables(self, panel: pd.DataFrame) -> pd.DataFrame:
        """Prepare variables for Modified Jones model."""
        df = panel.sort_values(['ticker', 'period_end']).copy()
        
        # Lagged assets
        df['assets_lag'] = df.groupby('ticker')['total_assets'].shift(1) if 'total_assets' in df.columns else df['avg_assets']
        
        # Inverse of lagged assets
        df['inv_assets'] = 1 / df['assets_lag']
        
        # Change in revenue minus change in receivables (scaled)
        if 'receivables' in df.columns:
            df['receivables_lag'] = df.groupby('ticker')['receivables'].shift(1)
            df['delta_rec'] = df['receivables'] - df['receivables_lag']
        else:
            df['delta_rec'] = 0
        
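        # Compute the revenue change here if an upstream step has not already done so;
        # otherwise the revenue term would silently default to zero
        if 'delta_revenue' not in df.columns:
            df['delta_revenue'] = df['revenue'] - df.groupby('ticker')['revenue'].shift(1)
        
        df['delta_rev_adj'] = (df['delta_revenue'] - df['delta_rec']) / df['assets_lag']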
        
        # PPE scaled
        df['ppe_jones'] = df['ppe_net'] / df['assets_lag'] if 'ppe_net' in df.columns else 0
        
        # Total accruals scaled
        if 'total_accruals' in df.columns:
            df['ta_scaled'] = df['total_accruals'] / df['assets_lag']
        else:
            df['ta_scaled'] = df['wc_accruals_scaled']
        
        return df
    
    def estimate_by_industry(self, panel: pd.DataFrame,
                            industry_col: str = 'industry') -> pd.DataFrame:
        """Estimate Modified Jones model by industry-year.
        
        Args:
            panel: Prepared panel data
            industry_col: Column with industry classification
            
        Returns:
            DataFrame with discretionary accruals
        """
        print("Estimating Modified Jones model by industry...")
        df = self.prepare_variables(panel)
        df['year'] = df['period_end'].dt.year
        
        results = []
        
        for (industry, year), group_data in tqdm(
            df.groupby([industry_col, 'year']),
            desc="Processing industry-years"
        ):
            valid_data = group_data.dropna(subset=['ta_scaled', 'inv_assets', 'delta_rev_adj', 'ppe_jones'])
            
            # Count distinct firms, not firm-quarter rows, against the minimum
            if valid_data['ticker'].nunique() < self.config.INDUSTRY_MIN_FIRMS:
                continue
            
            y = valid_data['ta_scaled']
            X = valid_data[['inv_assets', 'delta_rev_adj', 'ppe_jones']]
            
            try:
                model = sm.OLS(y, X).fit()
                
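                # Pair each residual with its own row's period_end; a firm can
                # appear several times within one industry-year
                for ticker, period, resid, fitted in zip(
                    valid_data['ticker'], valid_data['period_end'],
                    model.resid, model.fittedvalues
                ):
                    results.append({
                        'ticker': ticker,
                        'period_end': period,
                        industry_col: industry,
                        'year': year,
                        'discretionary_accruals': resid,
                        'nondiscretionary_accruals': fitted,
                        'jones_r_squared': model.rsquared
                    })
            except Exception as exc:
                # Skip industry-years whose regression fails, but report why
                warnings.warn(f"Jones regression failed for {industry}, {year}: {exc}")
                continue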
        
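        result_df = pd.DataFrame(results)
        if result_df.empty:
            # No industry-year met the minimum sample size
            print("Modified Jones Model: no industry-year with enough firms.")
            return result_df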
        
        # Absolute discretionary accruals (common measure)
        result_df['abs_discretionary_accruals'] = np.abs(result_df['discretionary_accruals'])
        
        # Quality measure (lower abs DA = higher quality)
        result_df['earnings_quality_jones'] = -result_df['abs_discretionary_accruals']
        
        print(f"Modified Jones Model Complete:")
        print(f"  Firms: {result_df['ticker'].nunique()}")
        
        return result_df

# Initialize and estimate
jones_model = ModifiedJonesModel(config)

# Only run if industry data available
if 'industry' in panel_data.columns:
    jones_results = jones_model.estimate_by_industry(panel_data)
else:
    print("Industry column not available - skipping Jones model")
    jones_results = pd.DataFrame()

## 7. Combine Earnings Quality Measures
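
The composite built in this section is a row-wise average of z-scored measures. The toy example below sketches that idea under stated assumptions: the `zscore` helper and the four-row panel are hypothetical, and the z-scores are pooled over all rows, as in the standardization steps earlier in the notebook.

```python
import numpy as np
import pandas as pd

def zscore(s: pd.Series) -> pd.Series:
    """Pooled z-score; NaN values are ignored in the mean and std."""
    return (s - s.mean()) / s.std()

# Toy panel: the third row has no McNichols estimate
toy = pd.DataFrame({
    'earnings_quality_dd': [-0.02, -0.05, -0.03, -0.08],
    'earnings_quality_mcn': [-0.01, -0.04, np.nan, -0.06],
})

z = toy.apply(zscore)

# DataFrame.mean(axis=1) skips NaN by default, so a row with one missing
# measure falls back to the remaining one instead of becoming NaN
toy['earnings_quality_composite'] = z.mean(axis=1)
```

One consequence of this design is that the composite for a firm-period with a single available measure equals that measure's z-score, so composites are not built from the same inputs across all rows.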

In [None]:
# =============================================================================
# COMBINE ALL EARNINGS QUALITY MEASURES
# =============================================================================

def combine_earnings_quality(
    dd_results: pd.DataFrame,
    mcn_results: pd.DataFrame,
    jones_results: pd.DataFrame
) -> pd.DataFrame:
    """Combine all earnings quality measures into one panel.
    
    Args:
        dd_results: Dechow-Dichev results
        mcn_results: McNichols results
        jones_results: Modified Jones results
        
    Returns:
        Combined earnings quality panel
    """
    print("Combining earnings quality measures...")
    
    # Start with DD results
    combined = dd_results[['ticker', 'period_end', 
                          'dd_residual_std', 'earnings_quality_dd', 
                          'earnings_quality_dd_std', 'dd_r_squared']].copy()
    
    # Merge McNichols
    if len(mcn_results) > 0:
        combined = combined.merge(
            mcn_results[['ticker', 'period_end', 
                        'mcnichols_residual_std', 'earnings_quality_mcn', 
                        'earnings_quality_mcn_std', 'mcnichols_r_squared']],
            on=['ticker', 'period_end'],
            how='left'
        )
    
    # Merge Jones
    if len(jones_results) > 0:
        combined = combined.merge(
            jones_results[['ticker', 'period_end',
                          'discretionary_accruals', 'abs_discretionary_accruals',
                          'earnings_quality_jones']],
            on=['ticker', 'period_end'],
            how='left'
        )
    
    # Create composite measure (average of the standardized measures; the Jones
    # measure has no standardized column, so it does not enter the composite)
    eq_cols = [col for col in combined.columns
               if col.startswith('earnings_quality_') and col.endswith('_std')]
    if eq_cols:
        combined['earnings_quality_composite'] = combined[eq_cols].mean(axis=1)
    else:
        combined['earnings_quality_composite'] = combined['earnings_quality_dd_std']
    
    # Add year/quarter
    combined['year'] = combined['period_end'].dt.year
    combined['quarter'] = combined['period_end'].dt.quarter
    
    print(f"\nCombined Panel:")
    print(f"  Observations: {len(combined):,}")
    print(f"  Firms: {combined['ticker'].nunique()}")
    print(f"  Measures: {eq_cols}")
    
    return combined

# Combine results
earnings_quality_panel = combine_earnings_quality(
    dd_results,
    mcn_results,
    jones_results if len(jones_results) > 0 else pd.DataFrame()
)

In [None]:
# =============================================================================
# EARNINGS QUALITY SUMMARY STATISTICS
# =============================================================================

def print_eq_summary(eq_panel: pd.DataFrame):
    """Print summary statistics for earnings quality measures."""
    
    print("\n" + "="*70)
    print("EARNINGS QUALITY SUMMARY STATISTICS")
    print("="*70)
    
    eq_cols = [col for col in eq_panel.columns if 'earnings_quality' in col]
    
    for col in eq_cols:
        print(f"\n{col}:")
        print(eq_panel[col].describe().to_string())
    
    # Correlations
    if len(eq_cols) > 1:
        print("\nCorrelations between EQ measures:")
        print(eq_panel[eq_cols].corr().to_string())
    
    print("\n" + "="*70)

print_eq_summary(earnings_quality_panel)

## 8. Save Outputs
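
Since the saved data dictionary is maintained by hand, a small consistency check before writing can flag panel columns that lack a description. The helper below is a hypothetical sketch (the name `undocumented_columns` and the toy inputs are not part of the notebook).

```python
import pandas as pd

def undocumented_columns(panel: pd.DataFrame, data_dict: dict) -> list:
    """Return panel columns that have no entry in the data dictionary."""
    return [col for col in panel.columns if col not in data_dict]

# Toy inputs for illustration only
toy_panel = pd.DataFrame(
    columns=['ticker', 'period_end', 'earnings_quality_dd', 'extra_metric']
)
toy_dict = {
    'ticker': 'Stock ticker symbol',
    'period_end': 'Fiscal period end date',
    'earnings_quality_dd': 'DD earnings quality (inverted: higher = better)',
}

missing = undocumented_columns(toy_panel, toy_dict)
print(missing)  # ['extra_metric']
```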

In [None]:
# =============================================================================
# SAVE EARNINGS QUALITY DATA
# =============================================================================

def save_earnings_quality(eq_panel: pd.DataFrame, output_dir: str):
    """Save earnings quality data with documentation."""
    
    os.makedirs(output_dir, exist_ok=True)
    
    # Main dataset
    filepath = os.path.join(output_dir, 'earnings_quality_panel.parquet')
    eq_panel.to_parquet(filepath, index=False)
    print(f"Saved: {filepath}")
    
    # CSV sample
    csv_path = os.path.join(output_dir, 'earnings_quality_sample.csv')
    eq_panel.head(5000).to_csv(csv_path, index=False)
    print(f"Saved: {csv_path}")
    
    # Data dictionary
    data_dict = {
        'ticker': 'Stock ticker symbol',
        'period_end': 'Fiscal period end date',
        'year': 'Calendar year of the fiscal period end date',
        'quarter': 'Calendar quarter of the fiscal period end date',
        'dd_residual_std': 'Dechow-Dichev residual standard deviation (raw)',
        'earnings_quality_dd': 'DD earnings quality (inverted: higher = better)',
        'earnings_quality_dd_std': 'DD earnings quality (standardized)',
        'dd_r_squared': 'R-squared from DD model',
        'mcnichols_residual_std': 'McNichols residual standard deviation',
        'earnings_quality_mcn': 'McNichols earnings quality',
        'earnings_quality_mcn_std': 'McNichols earnings quality (standardized)',
        'mcnichols_r_squared': 'R-squared from McNichols model',
        'discretionary_accruals': 'Modified Jones discretionary accruals',
        'abs_discretionary_accruals': 'Absolute discretionary accruals',
        'earnings_quality_jones': 'Jones model earnings quality',
        'earnings_quality_composite': 'Composite EQ (average of standardized measures)'
    }
    
    dict_path = os.path.join(output_dir, 'earnings_quality_dictionary.json')
    with open(dict_path, 'w') as f:
        json.dump(data_dict, f, indent=2)
    print(f"Saved: {dict_path}")
    
    # Summary
    summary = {
        'total_observations': len(eq_panel),
        'unique_firms': int(eq_panel['ticker'].nunique()),
        'date_range': [str(eq_panel['period_end'].min()), str(eq_panel['period_end'].max())],
        'measures': list(data_dict.keys()),
        'created_at': datetime.now().isoformat()
    }
    
    summary_path = os.path.join(output_dir, 'earnings_quality_summary.json')
    with open(summary_path, 'w') as f:
        json.dump(summary, f, indent=2)
    print(f"Saved: {summary_path}")

# Save outputs
save_earnings_quality(earnings_quality_panel, config.PROCESSED_DATA_PATH)

## 9. Summary

In [None]:
# =============================================================================
# NOTEBOOK SUMMARY
# =============================================================================

print("""
╔══════════════════════════════════════════════════════════════════╗
║       NOTEBOOK 4: EARNINGS QUALITY MEASURES COMPLETE             ║
╚══════════════════════════════════════════════════════════════════╝

OUTPUT FILES:
─────────────
• earnings_quality_panel.parquet   - Firm-period earnings quality data
• earnings_quality_sample.csv      - First 5,000 rows of the panel as CSV
• earnings_quality_dictionary.json - Variable definitions
• earnings_quality_summary.json    - Summary statistics

EARNINGS QUALITY MEASURES:
─────────────────────────
1. Dechow-Dichev (2002):
   • Mapping of accruals to cash flows
   • σ(residuals) from WC_Accruals ~ CFO_{t-1}, CFO_t, CFO_{t+1}

2. McNichols (2002) Modification:
   • Adds revenue growth and PPE
   • Better controls for growth and capital intensity

3. Modified Jones (Jones, 1991; Dechow, Sloan & Sweeney, 1995):
   • Discretionary accruals
   • Estimated by industry-year

4. Composite Measure:
   • Average of the standardized DD and McNichols measures
   • Higher values = Better earnings quality

INTERPRETATION:
───────────────
• All measures scaled so HIGHER = BETTER quality
• Standardized versions have mean=0, std=1
• Composite averages whichever standardized measures are available per firm-period

NEXT STEPS:
───────────
→ Notebook 5: Data Merging & Final Dataset Construction
  - Merge social media, financial, and EQ data
  - Create event-level analysis dataset
  - Add control variables

""")