<a href="https://colab.research.google.com/github/henryonomakpo/The-Impact-of-ESG-Ratings-on-EV-Manufacturing-Industry/blob/main/EV_ESG_PANEL_DATA_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
!pip install yfinance
!pip install yesg
!pip install statsmodels
!pip install xlsxwriter
!pip install linearmodels



In [17]:
import yesg
import pandas as pd
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')

# List of tickers for which ESG scores are needed
tickers = ["TSLA", "1211.HK", "VOW3.DE", "NIO", "RIVN", "GWLLF", "LCID", "XPEV",
           "LI", "GM", "F", "005380.KS", "BMW.DE"]

# Initialize an empty DataFrame to store ESG data
esg_data = pd.DataFrame()

# Loop through each ticker and fetch historic ESG scores
for ticker in tickers:
    try:
        # Fetch all available historic ESG ratings for the ticker
        esg_scores = yesg.get_historic_esg(ticker)

        if not esg_scores.empty:
            # Add a column for the ticker symbol
            esg_scores['Ticker'] = ticker

            # Append to the main DataFrame
            esg_data = pd.concat([esg_data, esg_scores])
        else:
            print(f"No ESG data found for {ticker}")
    except Exception as e:
        print(f"Error fetching ESG data for {ticker}: {e}")

# Reset index for the final DataFrame
esg_data.reset_index(inplace=True)

# Save ESG data to a CSV file in Google Drive
file_path = '/content/drive/My Drive/historic_esg_scores.csv'
esg_data.to_csv(file_path, index=False)

print(f"ESG data saved to: {file_path}")


Mounted at /content/drive
ESG data saved to: /content/drive/My Drive/historic_esg_scores.csv


### 1) Fixed Effect 2-way

In [18]:
# Import necessary libraries
import pandas as pd
import numpy as np
import yfinance as yf
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.linear_model import BayesianRidge
import statsmodels.api as sm
# from statsmodels.stats.outliers_influence import variance_inflation_factor # Not used
import warnings
import sys
import matplotlib.pyplot as plt
import matplotlib.style as style
import xlsxwriter
from linearmodels.panel import PanelOLS # Import panel models
import statsmodels.formula.api as smf
import re

# --- Settings and Configuration ---
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
from statsmodels.tools.sm_exceptions import ValueWarning
warnings.simplefilter('ignore', ValueWarning)
# from linearmodels.shared.exceptions import AbsorbingEffectWarning
# warnings.simplefilter('ignore', AbsorbingEffectWarning)


style.use("default")

# --- Define Tickers and ESG Risk Groups ---
# Add GWLLF to the main list
TICKERS_ALL_POTENTIAL = [
    "TSLA", "1211.HK", "VOW3.DE", "NIO", "RIVN", "LCID", "XPEV",
    "LI", "GM", "F", "005380.KS", "PSNY", "BMW.DE", "MBG.DE", "GWLLF" # Added GWLLF
]
TICKERS_LOW_RISK = ["PSNY", "MBG.DE"]
TICKERS_HIGH_RISK = ["RIVN", "GWLLF"] # Added GWLLF
# Medium risk tickers will be determined dynamically AFTER checking data availability

# Define Date Range
START_DATE_PRICES = "2019-10-30"
END_DATE_PRICES = "2024-12-31"
START_DATE_ANALYSIS = "2020-01-01" # Start analysis after lag allows first month
END_DATE_ANALYSIS = "2024-12-31"
FF_FACTORS_PATH = "/content/gd_Developed_5_Factors.csv" # MODIFY IF NEEDED

print("--- Script Started ---")
print(f"Potential Tickers: {TICKERS_ALL_POTENTIAL}")
print(f"Low ESG Risk Definition: {TICKERS_LOW_RISK}")
print(f"High ESG Risk Definition: {TICKERS_HIGH_RISK}")
print(f"Analysis Period: {START_DATE_ANALYSIS} to {END_DATE_ANALYSIS}")
print(f"Factor File: {FF_FACTORS_PATH}")

# --- Advanced Imputation Function (Defined Once) ---
def advanced_imputation(df_input):
    df = df_input.copy(); original_index = df.index
    all_missing_cols = df.columns[df.isnull().all()]
    df_imputable = df.copy()
    if len(all_missing_cols) > 0:
        print(f"  -> Imputation: Dropping {len(all_missing_cols)} all-NaN columns: {list(all_missing_cols)}")
        df_imputable = df_imputable.drop(columns=all_missing_cols)
    cols_to_impute = df_imputable.columns[df_imputable.isnull().any()]
    if df_imputable.isnull().sum().sum() == 0:
        print("  -> Imputation: 🟢 No missing values found. Skipping imputation.")
        imputed_data = df_imputable
    elif len(cols_to_impute) > 0:
        print(f"  -> Imputation: Imputing {len(cols_to_impute)} columns using IterativeImputer (BayesianRidge)...")
        n_features = min(10, df_imputable.shape[1] - 1) if df_imputable.shape[1] > 1 else 1
        imputer = IterativeImputer(estimator=BayesianRidge(), max_iter=30, random_state=42, tol=1e-4, n_nearest_features=n_features, verbose=0)
        try:
            with warnings.catch_warnings(): warnings.simplefilter("ignore")
            imputed_values = imputer.fit_transform(df_imputable[cols_to_impute])
            imputed_subset = pd.DataFrame(imputed_values, columns=cols_to_impute, index=original_index)
            imputed_data = df_imputable.copy(); imputed_data[cols_to_impute] = imputed_subset
            print(f"  -> Imputation: Completed for {len(cols_to_impute)} columns.")
        except Exception as e:
            print(f"  -> Imputation: 🔴 Error during imputation: {e}. Returning data with NaNs.")
            imputed_data = df_imputable
    else:
        print("  -> Imputation: 🟡 No columns required imputation.")
        imputed_data = df_imputable
    try: imputed_data = imputed_data.astype(float)
    except Exception as e: print(f"  -> Imputation: 🔴 Warning: Could not convert imputed data to float: {e}")
    return imputed_data

# --- Step 1: Download and Prepare ACTUAL Stock Returns ---
print("\n--- Downloading and Preparing Stock Returns ---")
tickers_available = [] # Initialize list of tickers with actual data
try:
    # Download using the potential list
    all_stock_data = yf.download(TICKERS_ALL_POTENTIAL, start=START_DATE_PRICES, end=END_DATE_PRICES, progress=False)
    if all_stock_data.empty: raise ValueError("No stock price data downloaded.")

    # --- Robustly Select Price Data ---
    if isinstance(all_stock_data.columns, pd.MultiIndex):
        if 'Adj Close' in all_stock_data.columns.levels[0]: price_data = all_stock_data['Adj Close']
        elif 'Close' in all_stock_data.columns.levels[0]: price_data = all_stock_data['Close']; print("  -> Warning: Using 'Close' price as 'Adj Close' not found.")
        else: raise ValueError("Neither 'Adj Close' nor 'Close' found.")
    else: raise ValueError("Expected MultiIndex columns from yfinance.") # Assume multiple tickers expected

    price_data = price_data.ffill().bfill()
    price_data = price_data.dropna(axis=1, how='all')
    if price_data.empty: raise ValueError("All stock price columns NaN after fill.")
    tickers_available = list(price_data.columns) # FINAL list of tickers with price data

    print(f"  -> Final stock price data has {len(tickers_available)} valid tickers.")
    # Re-define groups based on tickers that actually have data available AFTER download
    TICKERS_LOW_RISK_FINAL = [t for t in TICKERS_LOW_RISK if t in tickers_available]
    TICKERS_HIGH_RISK_FINAL = [t for t in TICKERS_HIGH_RISK if t in tickers_available]
    TICKERS_MEDIUM_RISK_FINAL = [t for t in tickers_available if t not in TICKERS_LOW_RISK_FINAL and t not in TICKERS_HIGH_RISK_FINAL]
    print(f"  -> Low Risk Group (Data Available): {TICKERS_LOW_RISK_FINAL}")
    print(f"  -> High Risk Group (Data Available): {TICKERS_HIGH_RISK_FINAL}") # Should now include GWLLF if download was successful
    print(f"  -> Medium Risk Group (Data Available): {TICKERS_MEDIUM_RISK_FINAL}")

    price_data.index = pd.to_datetime(price_data.index)
    monthly_prices = price_data.resample('ME').last()
    stock_monthly_returns = monthly_prices.pct_change().dropna(how='all', axis=0)
    if stock_monthly_returns.empty: raise ValueError("Monthly returns empty.")
    print("Actual stock monthly returns calculated.")
except Exception as e: print(f"🔴 FATAL ERROR downloading/processing stock returns: {e}"); sys.exit()


# --- Step 2: Load and Prepare Factors Data ---
# (Keep the same as before)
print("\n--- Loading and Preparing Factors Data ---")
ff_factors_monthly = pd.DataFrame()
try:
    ff_factors_monthly_raw = pd.read_csv(FF_FACTORS_PATH, index_col=0, header=0, dtype='object')
    ff_factors_monthly_raw.index = pd.to_datetime(ff_factors_monthly_raw.index, errors='coerce')
    ff_factors_monthly_raw = ff_factors_monthly_raw[pd.notna(ff_factors_monthly_raw.index)]
    if ff_factors_monthly_raw.empty: raise ValueError("All rows dropped factors date parse.")
    for col in ff_factors_monthly_raw.columns: ff_factors_monthly_raw[col] = pd.to_numeric(ff_factors_monthly_raw[col], errors='coerce')
    if pd.api.types.is_datetime64_any_dtype(ff_factors_monthly_raw.index): ff_factors_monthly_raw.index = ff_factors_monthly_raw.index + pd.offsets.MonthEnd(0)
    ff_factors_monthly_raw = ff_factors_monthly_raw[~ff_factors_monthly_raw.index.duplicated(keep='last')]
    ff_factors_monthly_raw = ff_factors_monthly_raw[(ff_factors_monthly_raw.index >= START_DATE_PRICES) & (ff_factors_monthly_raw.index <= END_DATE_PRICES)]
    if ff_factors_monthly_raw.empty: raise ValueError("No factor data remains after filtering for price period.")
    print("Factors data loaded, cleaned, date-filtered for lag.")
    if ff_factors_monthly_raw.isnull().sum().sum() > 0:
        print("\n--- Imputing Factors Data ---")
        ff_factors_monthly = advanced_imputation(ff_factors_monthly_raw)
    else: print("\n--- No missing values in factors data. ---"); ff_factors_monthly = ff_factors_monthly_raw.astype(float)
    factor_cols_to_convert = ["Mkt-RF", "SMB", "HML", "RMW", "CMA", "RF", "MOM"]
    for col in factor_cols_to_convert:
        if col in ff_factors_monthly.columns and pd.api.types.is_numeric_dtype(ff_factors_monthly[col]):
            max_abs_val = ff_factors_monthly[col].abs().max()
            if not pd.isna(max_abs_val) and max_abs_val > 1:
                 print(f"  -> Converting factor '{col}' assuming percentage points.")
                 ff_factors_monthly[col] = ff_factors_monthly[col] / 100.0
except FileNotFoundError: print(f"\n🔴 FATAL ERROR: Factors file not found: {FF_FACTORS_PATH}"); sys.exit()
except Exception as e: print(f"\n🔴 FATAL ERROR loading/processing Factors Data: {e}"); sys.exit()


# --- Step 3: Create Lagged Factor Variables ---
# (Keep the same as before - it iterates through tickers_available)
print("\n--- Creating Lagged Factor Variables ---")
factors_lagged = ff_factors_monthly.copy()
lagged_cols_required = []
esg_components = ["E", "S", "G"]
for ticker in tickers_available:
    for comp in esg_components:
        col = f"{ticker}_{comp}"
        lag_col = f"{ticker}_{comp}_lag"
        if col in factors_lagged.columns:
            factors_lagged[lag_col] = factors_lagged[col].shift(1)
            lagged_cols_required.append(lag_col)
if not lagged_cols_required: print("🔴 FATAL ERROR: No lagged ESG component columns created."); sys.exit()
factors_lagged = factors_lagged.dropna(subset=lagged_cols_required, how='any')
factors_lagged = factors_lagged[(factors_lagged.index >= START_DATE_ANALYSIS) & (factors_lagged.index <= END_DATE_ANALYSIS)]
if factors_lagged.empty: print(f"🔴 FATAL ERROR: Data empty after lagging/filtering."); sys.exit()
print(f"Lagged factors created & filtered. Data available from: {factors_lagged.index.min().date()}")

# --- Step 4: Prepare Data for Panel Regression ---
# (Keep the same as before - uses the dynamically determined FINAL group lists)
print("\n--- Preparing Data for Panel Analysis ---")
panel_data = pd.DataFrame()
try:
    common_index = stock_monthly_returns.index.intersection(factors_lagged.index)
    if len(common_index) == 0: raise ValueError("No overlapping dates after lagging.")
    aligned_returns = stock_monthly_returns.loc[common_index, tickers_available].copy().astype(float)
    aligned_factors = factors_lagged.loc[common_index].copy().astype(float)
    print(f"Aligned data for {len(common_index)} months.")
    if "RF" not in aligned_factors.columns: raise ValueError("Risk-Free rate ('RF') not found.")
    risk_free_rate = aligned_factors["RF"]
    excess_returns = aligned_returns.subtract(risk_free_rate, axis=0)
    excess_returns_long = excess_returns.stack().reset_index()
    excess_returns_long.columns = ['Date', 'Ticker', 'ExcessReturn']
    market_factors = ['Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA', 'MOM']
    market_factors_present = [f for f in market_factors if f in aligned_factors.columns]
    factors_market_wide = aligned_factors[market_factors_present].reset_index()
    entity_specific_lagged_cols = [col for col in aligned_factors.columns if col.endswith(('_E_lag', '_S_lag', '_G_lag'))]
    if not entity_specific_lagged_cols: raise ValueError("No lagged E, S, G columns found after alignment.")
    factors_entity_specific_wide = aligned_factors[entity_specific_lagged_cols].reset_index()
    factors_entity_specific_long = factors_entity_specific_wide.melt(id_vars='Date', var_name='FactorName', value_name='FactorValue')
    factors_entity_specific_long[['Ticker', 'FactorType']] = factors_entity_specific_long['FactorName'].str.extract(r'(.+?)_(E_lag|S_lag|G_lag)$')
    factors_entity_specific_long = factors_entity_specific_long.dropna(subset=['Ticker'])
    factors_entity_specific_pivot = factors_entity_specific_long.pivot_table(index=['Date', 'Ticker'], columns='FactorType', values='FactorValue').reset_index()
    panel_data = pd.merge(excess_returns_long, factors_market_wide, on='Date', how='left')
    panel_data = pd.merge(panel_data, factors_entity_specific_pivot, on=['Date', 'Ticker'], how='left')
    group_map = {t: 'Medium' for t in TICKERS_MEDIUM_RISK_FINAL} # Use FINAL list
    group_map.update({t: 'Low' for t in TICKERS_LOW_RISK_FINAL})   # Use FINAL list
    group_map.update({t: 'High' for t in TICKERS_HIGH_RISK_FINAL}) # Use FINAL list
    panel_data['ESG_Group'] = panel_data['Ticker'].map(group_map)
    if panel_data['ESG_Group'].isnull().any(): print("🔴 Warning: Some tickers were not assigned an ESG group.")
    panel_data.dropna(subset=['ESG_Group'], inplace=True)
    panel_data['Date'] = pd.to_datetime(panel_data['Date'])
    panel_data = panel_data.set_index(['Ticker', 'Date']).sort_index()
    if 'Mkt-RF' in panel_data.columns: panel_data.rename(columns={'Mkt-RF': 'Mkt_RF'}, inplace=True)
    else: print("🔴 WARNING: 'Mkt-RF' column not found for renaming.")
    base_factors_renamed = ['Mkt_RF', 'SMB', 'HML', 'RMW', 'CMA', 'MOM']
    esg_lags = ['E_lag', 'S_lag', 'G_lag']
    essential_cols_check = ['ExcessReturn'] + [f for f in base_factors_renamed if f in panel_data.columns] + [f for f in esg_lags if f in panel_data.columns]
    initial_rows = len(panel_data)
    panel_data = panel_data.dropna(subset=essential_cols_check)
    if len(panel_data) < initial_rows: print(f"Dropped {initial_rows - len(panel_data)} rows due to NaNs in essential model columns.")
    if panel_data.empty: raise ValueError("Panel data empty after final NaN drop.")
    print("Panel data prepared successfully.")
except Exception as e: print(f"🔴 FATAL ERROR preparing panel data: {e}"); import traceback; traceback.print_exc(); sys.exit()


# --- Step 5: Run Panel Regressions for Each Group Separately ---
print("\n--- Running Panel Regressions Separately for Each ESG Risk Group ---")

# Define base formula parts
base_factors_present_final = [f for f in ['Mkt_RF', 'SMB', 'HML', 'RMW', 'CMA', 'MOM'] if f in panel_data.columns]
esg_components_present_final = [f for f in ['E_lag', 'S_lag', 'G_lag'] if f in panel_data.columns]

if not base_factors_present_final: print("🔴 FATAL ERROR: No base factors found for regression."); sys.exit()
if not esg_components_present_final: print("🔴 FATAL ERROR: No lagged ESG components found for regression."); sys.exit()

formula_group_run = f"ExcessReturn ~ 1 + {' + '.join(base_factors_present_final + esg_components_present_final)}"
print(f"\nBase Formula for Group Regressions:\n{formula_group_run}")

group_results = {}
all_model_summaries = {}

groups_to_run = panel_data['ESG_Group'].unique()

for group_name in groups_to_run:
    print(f"\n--- Fitting Models for Group: {group_name} ---")
    group_data = panel_data[panel_data['ESG_Group'] == group_name].copy()

    if group_data.empty:
        print(f"  -> No data available for group {group_name}. Skipping.")
        group_results[group_name] = {"Entity FE": "No Data", "Two-Way FE": "No Data"}
        continue

    entities_in_group = len(group_data.index.get_level_values('Ticker').unique())
    time_periods_in_group = len(group_data.index.get_level_values('Date').unique())
    num_regressors = len(base_factors_present_final) + len(esg_components_present_final)

    print(f"  -> Group contains {entities_in_group} entities and {time_periods_in_group} time periods.")
    if time_periods_in_group <= num_regressors :
         print(f"  -> Skipping Group {group_name}: Not enough time periods ({time_periods_in_group}) relative to regressors ({num_regressors}).")
         group_results[group_name] = {"Entity FE": "Skipped: T <= K", "Two-Way FE": "Skipped: T <= K"}
         continue

    group_results[group_name] = {}

    # --- Model: Entity Fixed Effects for the group ---
    # NOW HIGH RISK GROUP SHOULD HAVE N=2 (or more if GWLLF data exists)
    model_key_fe = f"{group_name}_Entity_FE"
    try:
        if entities_in_group < 1: # Should not happen if group_data is not empty, but safety check
             raise ValueError("No entities found in group data.")

        group_data_fe = group_data[['ExcessReturn'] + base_factors_present_final + esg_components_present_final].dropna()
        if group_data_fe.empty or len(group_data_fe) < num_regressors + 2:
             raise ValueError("Not enough valid observations for Entity FE after NaN drop.")
        if (group_data_fe[base_factors_present_final + esg_components_present_final].var(ddof=0) == 0).any():
             zero_var_cols = group_data_fe[base_factors_present_final + esg_components_present_final].columns[(group_data_fe[base_factors_present_final + esg_components_present_final].var(ddof=0) == 0)]
             print(f"  -> Warning: Zero variance in predictor(s) for {group_name} group FE: {list(zero_var_cols)}.")

        # Always try EntityEffects if N>=1, but clustering requires N>1
        cluster_entity_flag_fe = entities_in_group > 1
        model_fe = PanelOLS.from_formula(formula_group_run + " + EntityEffects", data=group_data_fe, drop_absorbed=True)
        fe_res = model_fe.fit(cov_type='clustered', cluster_entity=cluster_entity_flag_fe)
        print(f"\n--- {group_name} Group: Fixed Effects (Entity) ---")
        print(fe_res)
        group_results[group_name]['Entity FE'] = fe_res.summary # Store summary object
        all_model_summaries[model_key_fe] = fe_res.summary

    except Exception as e:
        print(f"🔴 ERROR fitting Entity FE for {group_name} group: {e}")
        group_results[group_name]['Entity FE'] = f"Error: {e}"
        all_model_summaries[model_key_fe] = f"Error: {e}"


    # --- Model: Two-Way Fixed Effects for the group ---
    model_key_twfe = f"{group_name}_TwoWay_FE"
    try:
        num_exog_twfe = len(esg_components_present_final)
        # TWFE requires N > k_non_absorbed AND T > k_non_absorbed AND N > 1
        if entities_in_group <= num_exog_twfe or time_periods_in_group <= num_exog_twfe or entities_in_group <= 1 :
             print(f"  -> Skipping Two-Way FE for {group_name}: Insufficient N({entities_in_group}) or T({time_periods_in_group}) relative to non-absorbed regressors({num_exog_twfe}) or N <= 1.")
             group_results[group_name]['Two-Way FE'] = "Skipped: Insufficient N/T or N=1"
        else:
            group_data_twfe = group_data[['ExcessReturn'] + base_factors_present_final + esg_components_present_final].dropna()
            if group_data_twfe.empty or len(group_data_twfe) < num_regressors + 2:
                 raise ValueError("Not enough valid observations for Two-Way FE.")
            if (group_data_twfe[esg_components_present_final].var(ddof=0) == 0).any():
                 zero_var_cols_twfe = group_data_twfe[esg_components_present_final].columns[(group_data_twfe[esg_components_present_final].var(ddof=0) == 0)]
                 print(f"  -> Warning: Zero variance in ESG components for {group_name} group TWFE: {list(zero_var_cols_twfe)}.")

            model_twfe = PanelOLS.from_formula(formula_group_run + " + EntityEffects + TimeEffects", data=group_data_twfe, drop_absorbed=True)
            with warnings.catch_warnings(): warnings.simplefilter("ignore")
            twfe_res = model_twfe.fit(cov_type='clustered', cluster_entity=True, cluster_time=True)
            print(f"\n--- {group_name} Group: Fixed Effects (Two-Way) ---")
            print(twfe_res)
            group_results[group_name]['Two-Way FE'] = twfe_res.summary # Store summary object
            all_model_summaries[model_key_twfe] = twfe_res.summary

    except Exception as e:
        print(f"🔴 ERROR fitting Two-Way FE for {group_name} group: {e}")
        group_results[group_name]['Two-Way FE'] = f"Error: {e}"
        all_model_summaries[model_key_twfe] = f"Error: {e}"


# --- Step 6: Save Group Results ---
print("\n--- Saving Group Regression Results ---")
output_excel_path_groups = 'panel_regression_results_by_group_incl_GWLLF.xlsx' # New filename
try:
    with pd.ExcelWriter(output_excel_path_groups, engine='xlsxwriter') as writer:
        workbook = writer.book
        for model_name, summary_obj in all_model_summaries.items():
            sheet_name = model_name.replace(" ","_")[:31]
            worksheet = workbook.add_worksheet(sheet_name)
            if isinstance(summary_obj, str): worksheet.write_string(0, 0, summary_obj)
            elif hasattr(summary_obj, 'as_text'):
                  summary_text = summary_obj.as_text()
                  worksheet.set_column(0, 0, 120); worksheet.write_string(0, 0, summary_text)
            else: worksheet.write_string(0, 0, "Unexpected result object format.")
    print(f"Group regression summaries saved to '{output_excel_path_groups}'.")
except Exception as e:
    print(f"🔴 Error saving group results to Excel: {e}")

print("\n--- Script Finished ---")

--- Script Started ---
Potential Tickers: ['TSLA', '1211.HK', 'VOW3.DE', 'NIO', 'RIVN', 'LCID', 'XPEV', 'LI', 'GM', 'F', '005380.KS', 'PSNY', 'BMW.DE', 'MBG.DE', 'GWLLF']
Low ESG Risk Definition: ['PSNY', 'MBG.DE']
High ESG Risk Definition: ['RIVN', 'GWLLF']
Analysis Period: 2020-01-01 to 2024-12-31
Factor File: /content/gd_Developed_5_Factors.csv

--- Downloading and Preparing Stock Returns ---
  -> Final stock price data has 15 valid tickers.
  -> Low Risk Group (Data Available): ['PSNY', 'MBG.DE']
  -> High Risk Group (Data Available): ['RIVN', 'GWLLF']
  -> Medium Risk Group (Data Available): ['005380.KS', '1211.HK', 'BMW.DE', 'F', 'GM', 'LCID', 'LI', 'NIO', 'TSLA', 'VOW3.DE', 'XPEV']
Actual stock monthly returns calculated.

--- Loading and Preparing Factors Data ---
Factors data loaded, cleaned, date-filtered for lag.

--- Imputing Factors Data ---
  -> Imputation: Imputing 16 columns using IterativeImputer (BayesianRidge)...
  -> Imputation: Completed for 16 columns.
  -> Conver

Variables have been fully absorbed and have removed from the regression:

Mkt_RF, SMB, HML, RMW, CMA, MOM

  twfe_res = model_twfe.fit(cov_type='clustered', cluster_entity=True, cluster_time=True)



--- Medium Group: Fixed Effects (Two-Way) ---
                          PanelOLS Estimation Summary                           
Dep. Variable:           ExcessReturn   R-squared:                        0.0027
Estimator:                   PanelOLS   R-squared (Between):              0.0911
No. Observations:                 540   R-squared (Within):              -0.0022
Date:                Tue, Apr 01 2025   R-squared (Overall):             -0.0019
Time:                        17:29:34   Log-likelihood                    179.09
Cov. Estimator:             Clustered                                           
                                        F-statistic:                      0.4189
Entities:                           9   P-value                           0.7395
Avg Obs:                       60.000   Distribution:                   F(3,469)
Min Obs:                       60.000                                           
Max Obs:                       60.000   F-statistic (robust): 

### New REGRESSION[link text](https:// [link text](https://))
### Explanation of Steps
#### Import Libraries: The necessary libraries for data manipulation, visualization, and statistical analysis are imported.
#### Load Data: The CSV file containing the Fama-French factors and ESG data is loaded.
#### Check for Missing Columns: The code checks if all expected columns are present in the DataFrame.
#### Multiple Imputation: Missing values are filled using multiple imputation with IterativeImputer.
#### Calculate Excess Returns: Excess returns for each EV automaker are calculated by subtracting the risk-free rate.
#### Calculate VIF: VIF is calculated to check for multicollinearity among the independent variables.
#### Run Regression: The regression model is fitted for each automaker, and the results are displayed.


####* This code will analyze the impact of ESG ratings and Fama-French factors on the excess returns of the specified EV automakers.

In [21]:
# Import necessary libraries
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns
import yfinance as yf
import statsmodels.api as sm
from statsmodels.stats.outliers_influence import variance_inflation_factor
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.linear_model import LassoCV
from sklearn.model_selection import KFold

# Set the plotting style
style.use("default")
params = {
    "axes.labelsize": 8, "font.size": 8, "legend.fontsize": 8,
    "xtick.labelsize": 8, "ytick.labelsize": 8, "text.usetex": False,
    "font.family": "sans-serif", "axes.spines.top": False, "axes.spines.right": False,
    "grid.color": "grey", "axes.grid": True, "grid.alpha": 0.5, "grid.linestyle": ":",
}
plt.rcParams.update(params)

# List of EV tickers including MBG.DE for Mercedes-Benz
ev_tickers = [
    "TSLA", "1211.HK", "VOW3.DE", "NIO", "RIVN",
    "LCID", "XPEV", "LI", "GM", "F",
    "005380.KS", "PSNY", "BMW.DE", "MBG.DE"
]

# Load Fama-French factors with ESG data
ff_factors_monthly = pd.read_csv(
    "/content/gd_Developed_5_Factors.csv",
    index_col=0,
    header=0,
    parse_dates=True,
    dayfirst=True,
    date_format="%m/%d/%y"
)

# Filter for specified date range
ff_factors_monthly = ff_factors_monthly[(ff_factors_monthly.index >= '2019-12-01') & (ff_factors_monthly.index <= '2024-12-31')]

# Check for new columns in the dataset
expected_columns = [
    'Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA', 'RF', 'MOM',
    'TSLA_ESG', 'TSLA_E', 'TSLA_S', 'TSLA_G',
    'NIO_ESG', 'NIO_E', 'NIO_S', 'NIO_G',
    'RIVN_ESG', 'RIVN_E', 'RIVN_S', 'RIVN_G',
    'LCID_ESG', 'LCID_E', 'LCID_S', 'LCID_G',
    'XPEV_ESG', 'XPEV_E', 'XPEV_S', 'XPEV_G',
    'LI_ESG', 'LI_E', 'LI_S', 'LI_G',
    'GM_ESG', 'GM_E', 'GM_S', 'GM_G',
    'F_ESG', 'F_E', 'F_S', 'F_G',
    '005380.KS_ESG', '005380.KS_E', '005380.KS_S', '005380.KS_G',
    'BMW.DE_ESG', 'BMW.DE_E', 'BMW.DE_S', 'BMW.DE_G',
    'MBG.DE_ESG', 'MBG.DE_E', 'MBG.DE_S', 'MBG.DE_G',
    'PSNY_ESG', 'PSNY_E', 'PSNY_S', 'PSNY_G'
]

# Ensure all expected columns are in the DataFrame
missing_columns = [col for col in expected_columns if col not in ff_factors_monthly.columns]
if missing_columns:
    print(f"Missing columns in dataset: {missing_columns}")

# Drop empty columns before imputation
ff_factors_monthly = ff_factors_monthly.dropna(axis=1, how='all')

# Handle missing data with multiple imputation using Bayesian Ridge
imputer = IterativeImputer(estimator=BayesianRidge(), max_iter=20, random_state=0)
ff_factors_imputed = pd.DataFrame(imputer.fit_transform(ff_factors_monthly), columns=ff_factors_monthly.columns, index=ff_factors_monthly.index)

# Calculate excess returns for each EV ticker
excess_returns = pd.DataFrame(index=ff_factors_imputed.index)

for ticker in ev_tickers:
    if ticker + "_ESG" in ff_factors_imputed.columns:
        excess_returns[ticker] = ff_factors_imputed[ticker + "_ESG"] - ff_factors_imputed["RF"]

# Combine excess returns with ESG data
final_data = excess_returns.join(ff_factors_imputed, how='outer')

# Step 1: Calculate VIF for multicollinearity
def calculate_vif(X):
    vif_data = pd.DataFrame()
    vif_data["feature"] = X.columns
    vif_data["VIF"] = [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]
    return vif_data

# Step 2: Run the Regression Model for each ticker with ESG variables
regression_results = {}

for ticker in ev_tickers:
    if ticker in excess_returns.columns:
        esg_column = f"{ticker}_ESG"

        if esg_column not in ff_factors_imputed.columns:
            print(f"ESG column for {ticker} not found. Skipping regression.")
            continue

        X = sm.add_constant(ff_factors_imputed[["Mkt-RF", "SMB", "HML", "RMW", "CMA", "MOM", esg_column]])

        y = excess_returns[ticker].dropna()
        if len(y) == 0:
            print(f"No data for regression on {ticker}.")
            continue

        X = X.loc[y.index].dropna()
        y = y.loc[X.index]

        if len(y) == 0 or X.empty:
            print(f"No valid data for regression on {ticker} after cleaning.")
            continue

        # Calculate VIF before running the regression
        vif_data = calculate_vif(X)
        print(f"VIF for {ticker}:\n{vif_data}\n")

        # Remove variables with high VIF (> 10) before running regression
        high_vif_columns = vif_data[vif_data['VIF'] > 10]['feature'].tolist()
        if high_vif_columns:
            print(f"Removing columns with high VIF for {ticker}: {high_vif_columns}")
            X = X.drop(columns=high_vif_columns)

        # Regularization: Lasso regression with cross-validation
        lasso = LassoCV(alphas=[0.01, 0.1, 1.0, 10.0], cv=5, random_state=0).fit(X, y)
        selected_features = X.columns[lasso.coef_ != 0]  # Keep only non-zero coefficients

        # Check if there are any selected features
        if len(selected_features) == 0:
            print(f"No features selected for regression on {ticker}.")
            continue

        # Run the regression model with selected features
        model = sm.OLS(y, X[selected_features]).fit()

        # Store results in the dictionary
        regression_results[ticker] = {
            "model_summary": model.summary(),
            "vif_data": vif_data
        }

        # Display the regression results
        print(f"Regression results for {ticker}:\n{model.summary()}\n")

# Save all regression results to an Excel file
with pd.ExcelWriter('regression_results.xlsx', engine='xlsxwriter') as writer:
    for ticker, results in regression_results.items():
        summary_df = pd.DataFrame(results["model_summary"].tables[1].data[1:], columns=results["model_summary"].tables[1].data[0])
        summary_df.to_excel(writer, sheet_name=f'{ticker}_summary', index=False)

        results["vif_data"].to_excel(writer, sheet_name=f'{ticker}_vif', index=False)

print("All regression results saved to 'regression_results.xlsx'.")

VIF for TSLA:
    feature       VIF
0     const  1.808893
1    Mkt-RF  1.532477
2       SMB  1.412534
3       HML  6.045289
4       RMW  2.076981
5       CMA  4.540515
6       MOM  1.565706
7  TSLA_ESG  1.156833

Regression results for TSLA:
                                 OLS Regression Results                                
Dep. Variable:                   TSLA   R-squared (uncentered):                   1.000
Model:                            OLS   Adj. R-squared (uncentered):              1.000
Method:                 Least Squares   F-statistic:                          2.272e+05
Date:                Mon, 31 Mar 2025   Prob (F-statistic):                   4.60e-109
Time:                        08:56:55   Log-Likelihood:                         -5.8978
No. Observations:                  61   AIC:                                      13.80
Df Residuals:                      60   BIC:                                      15.91
Df Model:                           1                 

### Store Imputed dataset in CSV

In [22]:
# Import necessary libraries
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns
import yfinance as yf
import statsmodels.api as sm
from statsmodels.stats.outliers_influence import variance_inflation_factor
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.linear_model import LassoCV

# Set the plotting style
style.use("default")
params = {
    "axes.labelsize": 8, "font.size": 8, "legend.fontsize": 8,
    "xtick.labelsize": 8, "ytick.labelsize": 8, "text.usetex": False,
    "font.family": "sans-serif", "axes.spines.top": False, "axes.spines.right": False,
    "grid.color": "grey", "axes.grid": True, "grid.alpha": 0.5, "grid.linestyle": ":",
}
plt.rcParams.update(params)

# List of EV tickers including MBG.DE for Mercedes-Benz
ev_tickers = [
    "TSLA", "1211.HK", "VOW3.DE", "NIO", "RIVN",
    "LCID", "XPEV", "LI", "GM", "F",
    "005380.KS", "PSNY", "BMW.DE", "MBG.DE"
]

# Load Fama-French factors with ESG data
ff_factors_monthly = pd.read_csv(
    "/content/gd_Developed_5_Factors.csv",
    index_col=0,
    header=0,
    parse_dates=True,
    dayfirst=True,
    date_format="%m/%d/%y"
)

# Filter for specified date range
ff_factors_monthly = ff_factors_monthly[(ff_factors_monthly.index >= '2019-12-01') & (ff_factors_monthly.index <= '2024-12-31')]

# Check for new columns in the dataset
expected_columns = [
    'Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA', 'RF', 'MOM',
    'TSLA_ESG', 'TSLA_E', 'TSLA_S', 'TSLA_G',
    'NIO_ESG', 'NIO_E', 'NIO_S', 'NIO_G',
    'RIVN_ESG', 'RIVN_E', 'RIVN_S', 'RIVN_G',
    'LCID_ESG', 'LCID_E', 'LCID_S', 'LCID_G',
    'XPEV_ESG', 'XPEV_E', 'XPEV_S', 'XPEV_G',
    'LI_ESG', 'LI_E', 'LI_S', 'LI_G',
    'GM_ESG', 'GM_E', 'GM_S', 'GM_G',
    'F_ESG', 'F_E', 'F_S', 'F_G',
    '005380.KS_ESG', '005380.KS_E', '005380.KS_S', '005380.KS_G',
    'BMW.DE_ESG', 'BMW.DE_E', 'BMW.DE_S', 'BMW.DE_G',
    'MBG.DE_ESG', 'MBG.DE_E', 'MBG.DE_S', 'MBG.DE_G',
    'PSNY_ESG', 'PSNY_E', 'PSNY_S', 'PSNY_G'
]

# Ensure all expected columns are in the DataFrame
missing_columns = [col for col in expected_columns if col not in ff_factors_monthly.columns]
if missing_columns:
    print(f"Missing columns in dataset: {missing_columns}")

# Drop empty columns before imputation
ff_factors_monthly = ff_factors_monthly.dropna(axis=1, how='all')

# Handle missing data with multiple imputation using Bayesian Ridge
imputer = IterativeImputer(estimator=BayesianRidge(), max_iter=20, random_state=0)
ff_factors_imputed = pd.DataFrame(imputer.fit_transform(ff_factors_monthly), columns=ff_factors_monthly.columns, index=ff_factors_monthly.index)

# Save the complete imputed dataset to CSV
ff_factors_imputed.to_csv('imputed_fama_french_factors.csv')

# Calculate excess returns for each EV ticker
excess_returns = pd.DataFrame(index=ff_factors_imputed.index)

for ticker in ev_tickers:
    if ticker + "_ESG" in ff_factors_imputed.columns:
        excess_returns[ticker] = ff_factors_imputed[ticker + "_ESG"] - ff_factors_imputed["RF"]

# Combine excess returns with ESG data
final_data = excess_returns.join(ff_factors_imputed, how='outer')

# Step 1: Calculate VIF for multicollinearity
def calculate_vif(X):
    vif_data = pd.DataFrame()
    vif_data["feature"] = X.columns
    vif_data["VIF"] = [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]
    return vif_data

# Step 2: Run the Regression Model for each ticker with ESG variables
regression_results = {}

for ticker in ev_tickers:
    if ticker in excess_returns.columns:
        esg_column = f"{ticker}_ESG"

        if esg_column not in ff_factors_imputed.columns:
            print(f"ESG column for {ticker} not found. Skipping regression.")
            continue

        X = sm.add_constant(ff_factors_imputed[["Mkt-RF", "SMB", "HML", "RMW", "CMA", "MOM", esg_column]])

        y = excess_returns[ticker].dropna()
        if len(y) == 0:
            print(f"No data for regression on {ticker}.")
            continue

        X = X.loc[y.index].dropna()
        y = y.loc[X.index]

        if len(y) == 0 or X.empty:
            print(f"No valid data for regression on {ticker} after cleaning.")
            continue

        # Calculate VIF before running the regression
        vif_data = calculate_vif(X)
        print(f"VIF for {ticker}:\n{vif_data}\n")

        # Remove variables with high VIF (> 10) before running regression
        high_vif_columns = vif_data[vif_data['VIF'] > 10]['feature'].tolist()
        if high_vif_columns:
            print(f"Removing columns with high VIF for {ticker}: {high_vif_columns}")
            X = X.drop(columns=high_vif_columns)

        # Regularization: Lasso regression with cross-validation
        lasso = LassoCV(alphas=[0.01, 0.1, 1.0, 10.0], cv=5, random_state=0).fit(X, y)
        selected_features = X.columns[lasso.coef_ != 0]  # Keep only non-zero coefficients

        # Check if there are any selected features
        if len(selected_features) == 0:
            print(f"No features selected for regression on {ticker}.")
            continue

        # Run the regression model with selected features
        model = sm.OLS(y, X[selected_features]).fit()

        # Store results in the dictionary
        regression_results[ticker] = {
            "model_summary": model.summary(),
            "vif_data": vif_data
        }

        # Display the regression results
        print(f"Regression results for {ticker}:\n{model.summary()}\n")

        # Save VIF results to CSV
        vif_data.to_csv(f'vif_results_{ticker}.csv', index=False)

# Save all regression results to an Excel file
with pd.ExcelWriter('regression_results.xlsx', engine='xlsxwriter') as writer:
    for ticker, results in regression_results.items():
        summary_df = pd.DataFrame(results["model_summary"].tables[1].data[1:], columns=results["model_summary"].tables[1].data[0])
        summary_df.to_excel(writer, sheet_name=f'{ticker}_summary', index=False)

print("All regression results saved to 'regression_results.xlsx'.")

VIF for TSLA:
    feature       VIF
0     const  1.808893
1    Mkt-RF  1.532477
2       SMB  1.412534
3       HML  6.045289
4       RMW  2.076981
5       CMA  4.540515
6       MOM  1.565706
7  TSLA_ESG  1.156833

Regression results for TSLA:
                                 OLS Regression Results                                
Dep. Variable:                   TSLA   R-squared (uncentered):                   1.000
Model:                            OLS   Adj. R-squared (uncentered):              1.000
Method:                 Least Squares   F-statistic:                          2.272e+05
Date:                Mon, 31 Mar 2025   Prob (F-statistic):                   4.60e-109
Time:                        08:57:53   Log-Likelihood:                         -5.8978
No. Observations:                  61   AIC:                                      13.80
Df Residuals:                      60   BIC:                                      15.91
Df Model:                           1                 

### *3. Perform Panel Regression Analysis and stored CSV complete file

In [46]:
# Import necessary libraries
import pandas as pd
import numpy as np
import yfinance as yf
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.linear_model import BayesianRidge
import statsmodels.api as sm
from statsmodels.stats.outliers_influence import variance_inflation_factor
import warnings
import sys
import matplotlib.pyplot as plt
import matplotlib.style as style
import xlsxwriter
from linearmodels.panel import PanelOLS # Import panel models
import statsmodels.formula.api as smf # For easier formula specification with interactions
import re # Import regex for formula parsing

# --- Settings and Configuration ---
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
# from statsmodels.tools.sm_exceptions import ValueWarning, ConvergenceWarning, EstimationWarning
# warnings.simplefilter('ignore', (ValueWarning, ConvergenceWarning, EstimationWarning))

style.use("default") # Reset style if needed

# --- Define Tickers and ESG Risk Groups ---
TICKERS_ALL = [
    "TSLA", "1211.HK", "VOW3.DE", "NIO", "RIVN",
    "LCID", "XPEV", "LI", "GM", "F",
    "005380.KS", "PSNY", "BMW.DE", "MBG.DE"
]
TICKERS_LOW_RISK = ["PSNY", "MBG.DE"]
TICKERS_HIGH_RISK = ["RIVN"]
# Medium risk tickers will be determined dynamically

# Define Date Range
START_DATE_PRICES = "2019-10-30" # Start earlier for return calculation + lag
END_DATE_PRICES = "2024-12-31"
START_DATE_ANALYSIS = "2020-01-01" # Start analysis after lag allows first month
END_DATE_ANALYSIS = "2024-12-31"
FF_FACTORS_PATH = "/content/gd_Developed_5_Factors.csv" # MODIFY IF NEEDED

print("--- Script Started ---")
print(f"All Tickers: {TICKERS_ALL}")
print(f"Low ESG Risk: {TICKERS_LOW_RISK}")
print(f"High ESG Risk: {TICKERS_HIGH_RISK}")
print(f"Analysis Period: {START_DATE_ANALYSIS} to {END_DATE_ANALYSIS}")
print(f"Factor File: {FF_FACTORS_PATH}")

# --- Advanced Imputation Function (Defined Once) ---
def advanced_imputation(df_input):
    df = df_input.copy(); original_index = df.index
    all_missing_cols = df.columns[df.isnull().all()]
    df_imputable = df.copy()
    if len(all_missing_cols) > 0:
        print(f"  -> Imputation: Dropping {len(all_missing_cols)} all-NaN columns: {list(all_missing_cols)}")
        df_imputable = df_imputable.drop(columns=all_missing_cols)
    cols_to_impute = df_imputable.columns[df_imputable.isnull().any()]
    if df_imputable.isnull().sum().sum() == 0:
        print("  -> Imputation: 🟢 No missing values found. Skipping imputation.")
        imputed_data = df_imputable
    elif len(cols_to_impute) > 0:
        print(f"  -> Imputation: Imputing {len(cols_to_impute)} columns using IterativeImputer (BayesianRidge)...")
        n_features = min(10, df_imputable.shape[1] - 1) if df_imputable.shape[1] > 1 else 1
        imputer = IterativeImputer(estimator=BayesianRidge(), max_iter=30, random_state=42, tol=1e-4, n_nearest_features=n_features, verbose=0)
        try:
            with warnings.catch_warnings(): warnings.simplefilter("ignore")
            imputed_values = imputer.fit_transform(df_imputable[cols_to_impute])
            imputed_subset = pd.DataFrame(imputed_values, columns=cols_to_impute, index=original_index)
            imputed_data = df_imputable.copy(); imputed_data[cols_to_impute] = imputed_subset
            print(f"  -> Imputation: Completed for {len(cols_to_impute)} columns.")
        except Exception as e:
            print(f"  -> Imputation: 🔴 Error during imputation: {e}. Returning data with NaNs.")
            imputed_data = df_imputable
    else:
        print("  -> Imputation: 🟡 No columns required imputation.")
        imputed_data = df_imputable
    try: imputed_data = imputed_data.astype(float)
    except Exception as e: print(f"  -> Imputation: 🔴 Warning: Could not convert imputed data to float: {e}")
    return imputed_data

# --- Step 1: Download and Prepare ACTUAL Stock Returns ---
print("\n--- Downloading and Preparing Stock Returns ---")
tickers_available = [] # Initialize list of tickers with data
try:
    all_stock_data = yf.download(TICKERS_ALL, start=START_DATE_PRICES, end=END_DATE_PRICES, progress=False)
    if all_stock_data.empty: raise ValueError("No stock price data downloaded.")
    if isinstance(all_stock_data.columns, pd.MultiIndex):
        if 'Adj Close' in all_stock_data.columns.levels[0]: price_data = all_stock_data['Adj Close']
        elif 'Close' in all_stock_data.columns.levels[0]: price_data = all_stock_data['Close']; print("  -> Warning: Using 'Close' price as 'Adj Close' not found.")
        else: raise ValueError("Neither 'Adj Close' nor 'Close' found.")
    elif isinstance(all_stock_data.columns, pd.Index):
        if 'Adj Close' in all_stock_data.columns: target_col = 'Adj Close'
        elif 'Close' in all_stock_data.columns: target_col = 'Close'; print("  -> Warning: Using 'Close' price.")
        else: raise ValueError("Neither 'Adj Close' nor 'Close' found.")
        ticker_name = TICKERS_ALL[0] if len(TICKERS_ALL)==1 else "SINGLE_TICKER"
        price_data = all_stock_data[[target_col]].rename(columns={target_col: ticker_name})
    else: raise ValueError("Unexpected column structure.")

    price_data = price_data.ffill().bfill()
    price_data = price_data.dropna(axis=1, how='all')
    if price_data.empty: raise ValueError("All stock price columns NaN after fill.")
    tickers_available = list(price_data.columns) # UPDATE available tickers
    print(f"  -> Final stock price data has {len(tickers_available)} valid tickers: {tickers_available}")
    price_data.index = pd.to_datetime(price_data.index)
    monthly_prices = price_data.resample('ME').last()
    stock_monthly_returns = monthly_prices.pct_change().dropna(how='all', axis=0)
    if stock_monthly_returns.empty: raise ValueError("Monthly returns empty.")
    print("Actual stock monthly returns calculated.")
except Exception as e: print(f"🔴 FATAL ERROR downloading/processing stock returns: {e}"); sys.exit()


# --- Step 2: Load and Prepare Factors Data ---
print("\n--- Loading and Preparing Factors Data ---")
ff_factors_monthly = pd.DataFrame() # Initialize
try:
    ff_factors_monthly_raw = pd.read_csv(FF_FACTORS_PATH, index_col=0, header=0, dtype='object')
    ff_factors_monthly_raw.index = pd.to_datetime(ff_factors_monthly_raw.index, errors='coerce')
    ff_factors_monthly_raw = ff_factors_monthly_raw[pd.notna(ff_factors_monthly_raw.index)]
    if ff_factors_monthly_raw.empty: raise ValueError("All rows dropped factors date parse.")
    for col in ff_factors_monthly_raw.columns: ff_factors_monthly_raw[col] = pd.to_numeric(ff_factors_monthly_raw[col], errors='coerce')
    if pd.api.types.is_datetime64_any_dtype(ff_factors_monthly_raw.index): ff_factors_monthly_raw.index = ff_factors_monthly_raw.index + pd.offsets.MonthEnd(0)
    ff_factors_monthly_raw = ff_factors_monthly_raw[~ff_factors_monthly_raw.index.duplicated(keep='last')]
    ff_factors_monthly_raw = ff_factors_monthly_raw[(ff_factors_monthly_raw.index >= START_DATE_ANALYSIS) & (ff_factors_monthly_raw.index <= END_DATE_ANALYSIS)]
    if ff_factors_monthly_raw.empty: raise ValueError("No factor data remains after filtering.")
    print("Factors data loaded, cleaned, filtered.")
    if ff_factors_monthly_raw.isnull().sum().sum() > 0:
        print("\n--- Imputing Factors Data ---")
        ff_factors_monthly = advanced_imputation(ff_factors_monthly_raw)
    else: print("\n--- No missing values in factors data. ---"); ff_factors_monthly = ff_factors_monthly_raw.astype(float)
    factor_cols_to_convert = ["Mkt-RF", "SMB", "HML", "RMW", "CMA", "RF", "MOM"]
    for col in factor_cols_to_convert:
        if col in ff_factors_monthly.columns and pd.api.types.is_numeric_dtype(ff_factors_monthly[col]):
            max_abs_val = ff_factors_monthly[col].abs().max()
            if not pd.isna(max_abs_val) and max_abs_val > 1: # Using 1 as threshold for % (e.g., 1.5 = 1.5%)
                 print(f"  -> Converting factor '{col}' assuming percentage points.")
                 ff_factors_monthly[col] = ff_factors_monthly[col] / 100.0
except FileNotFoundError: print(f"\n🔴 FATAL ERROR: Factors file not found: {FF_FACTORS_PATH}"); sys.exit()
except Exception as e: print(f"\n🔴 FATAL ERROR loading/processing Factors Data: {e}"); sys.exit()


# --- Step 3: Create Lagged Factor Variables ---
print("\n--- Creating Lagged Factor Variables ---")
factors_lagged = ff_factors_monthly.copy()
lagged_cols_required = []
for ticker in tickers_available: # Use only tickers we have data for
    for comp in ["ESG", "E", "S", "G"]:
        col = f"{ticker}_{comp}"
        lag_col = f"{ticker}_{comp}_lag"
        if col in factors_lagged.columns:
            factors_lagged[lag_col] = factors_lagged[col].shift(1)
            lagged_cols_required.append(lag_col)

if not lagged_cols_required: print("  -> Warning: No lagged ESG/component columns were created.")
factors_lagged = factors_lagged.dropna(subset=lagged_cols_required, how='any')
if factors_lagged.empty: print(f"🔴 FATAL ERROR: Data empty after lagging."); sys.exit()
print(f"Lagged factors created. Data available from: {factors_lagged.index.min().date()}")


# --- Step 4: Prepare Data for Panel Regression ---
print("\n--- Preparing Data for Panel Analysis ---")
panel_data = pd.DataFrame() # Initialize
try:
    common_index = stock_monthly_returns.index.intersection(factors_lagged.index)
    if len(common_index) == 0: raise ValueError("No overlapping dates after lagging.")
    aligned_returns = stock_monthly_returns.loc[common_index, tickers_available].copy().astype(float)
    aligned_factors = factors_lagged.loc[common_index].copy().astype(float)
    print(f"Aligned data for {len(common_index)} months.")

    if "RF" not in aligned_factors.columns: raise ValueError("Risk-Free rate ('RF') not found.")
    risk_free_rate = aligned_factors["RF"]
    excess_returns = aligned_returns.subtract(risk_free_rate, axis=0)
    excess_returns_long = excess_returns.stack().reset_index()
    excess_returns_long.columns = ['Date', 'Ticker', 'ExcessReturn']

    market_factors = ['Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA', 'MOM']
    market_factors_present = [f for f in market_factors if f in aligned_factors.columns]
    factors_market_wide = aligned_factors[market_factors_present].reset_index()

    entity_specific_lagged_cols = [col for col in aligned_factors.columns if '_lag' in col]
    factors_entity_specific_wide = aligned_factors[entity_specific_lagged_cols].reset_index()
    factors_entity_specific_long = factors_entity_specific_wide.melt(id_vars='Date', var_name='FactorName', value_name='FactorValue')
    factors_entity_specific_long[['Ticker', 'FactorType']] = factors_entity_specific_long['FactorName'].str.extract(r'([^_.-]+(?:[.][A-Z]{2})?)_(.*_lag)') # Improved Regex for tickers like 1211.HK
    factors_entity_specific_long = factors_entity_specific_long.dropna(subset=['Ticker'])
    factors_entity_specific_pivot = factors_entity_specific_long.pivot_table(index=['Date', 'Ticker'], columns='FactorType', values='FactorValue').reset_index()

    panel_data = pd.merge(excess_returns_long, factors_market_wide, on='Date', how='left')
    panel_data = pd.merge(panel_data, factors_entity_specific_pivot, on=['Date', 'Ticker'], how='left')

    tickers_medium_risk = [t for t in tickers_available if t not in TICKERS_LOW_RISK and t not in TICKERS_HIGH_RISK]
    print(f"Medium ESG Risk: {tickers_medium_risk}")
    panel_data['Group_LowRisk'] = panel_data['Ticker'].apply(lambda x: 1 if x in TICKERS_LOW_RISK else 0)
    panel_data['Group_HighRisk'] = panel_data['Ticker'].apply(lambda x: 1 if x in TICKERS_HIGH_RISK else 0)

    panel_data['Date'] = pd.to_datetime(panel_data['Date'])
    panel_data = panel_data[(panel_data['Date'] >= START_DATE_ANALYSIS) & (panel_data['Date'] <= END_DATE_ANALYSIS)]
    panel_data = panel_data.set_index(['Ticker', 'Date']).sort_index()

    # --- RENAME Mkt-RF column ---
    if 'Mkt-RF' in panel_data.columns:
        print("DEBUG: Renaming 'Mkt-RF' column to 'Mkt_RF'")
        panel_data.rename(columns={'Mkt-RF': 'Mkt_RF'}, inplace=True)
    else: print("🔴 WARNING: 'Mkt-RF' column not found in panel_data for renaming.")

    panel_data = panel_data.dropna()
    if panel_data.empty: raise ValueError("Panel data empty after merging and final NaN drop.")

    print("Panel data prepared successfully.")
    print("Panel Data (Head with Groups - Renamed Column):")
    print(panel_data.head().to_string(float_format="%.4f"))

except Exception as e:
    print(f"🔴 FATAL ERROR preparing panel data: {e}")
    import traceback; traceback.print_exc()
    sys.exit()


# --- Step 5: Define Variables and Run Panel Models ---
print("\n--- Running Panel Regression Models ---")

# --- Prepare Base Data ---
dependent = panel_data['ExcessReturn']
base_factors_present = [f for f in ['Mkt_RF', 'SMB', 'HML', 'RMW', 'CMA', 'MOM'] if f in panel_data.columns]
esg_components_present = [f for f in ['E_lag', 'S_lag', 'G_lag'] if f in panel_data.columns]

# --- Model 1: Pooled OLS (Keep interactions as before) ---
print("\n--- Fitting Pooled OLS with Interactions ---")
# (Keep the full formula and fitting code from the previous correct block)
# ... Pooled OLS fitting code ...
# print(pooled_interact_res) # Assuming it ran okay

# --- Model 2: Fixed Effects (Entity) - SIMPLIFIED ---
print("\n--- Fitting Fixed Effects (Entity) - No Interactions ---")
try:
    # Formula without group dummies or interactions
    fe_entity_formula = f"ExcessReturn ~ 1 + {' + '.join(base_factors_present + esg_components_present)} + EntityEffects"
    print(f"Entity FE Formula: {fe_entity_formula}")
    mod_fe_entity_simple = PanelOLS.from_formula(fe_entity_formula, data=panel_data, drop_absorbed=True) # Try drop_absorbed
    fe_entity_simple_res = mod_fe_entity_simple.fit(cov_type='clustered', cluster_entity=True)
    print(fe_entity_simple_res)
except Exception as e:
    print(f"🔴 ERROR fitting simplified Entity FE model: {e}")
    # import traceback; traceback.print_exc()


# --- Model 3: Fixed Effects (Two-Way) - SIMPLIFIED ---
print("\n--- Fitting Fixed Effects (Two-Way: Entity + Time) - No Interactions ---")
try:
    # Formula without group dummies or interactions
    fe_tw_formula = f"ExcessReturn ~ 1 + {' + '.join(base_factors_present + esg_components_present)} + EntityEffects + TimeEffects"
    print(f"Two-Way FE Formula: {fe_tw_formula}")
    mod_fe_tw_simple = PanelOLS.from_formula(fe_tw_formula, data=panel_data, drop_absorbed=True) # Try drop_absorbed
    fe_tw_simple_res = mod_fe_tw_simple.fit(cov_type='clustered', cluster_entity=True, cluster_time=True)
    print(fe_tw_simple_res)
except Exception as e:
    print(f"🔴 ERROR fitting simplified Two-Way FE model: {e}")
    # import traceback; traceback.print_exc()

print("\n--- Script Finished ---")
# --- Saving code would need adjustment ---

# --- Saving (Commented Out until models run successfully) ---
# output_filename = 'panel_regression_results_groups_components.xlsx'
# print(f"\nSaving all regression results to '{output_filename}'...")
# try:
#     with pd.ExcelWriter(output_filename, engine='xlsxwriter') as writer:
#         workbook = writer.book
#         models_to_save = {
#             "Pooled_OLS_Interact": pooled_interact_res,
#             "Entity_FE_Interact": fe_interact_res,
#             "TwoWay_FE_Interact": twfe_interact_res
#         }
#         for model_name, model_result in models_to_save.items():
#             summary_text = model_result.summary.as_text()
#             worksheet = workbook.add_worksheet(model_name)
#             worksheet.set_column(0, 0, 120) # Wider column
#             worksheet.write_string(0, 0, summary_text)
#     print(f"Panel regression summaries saved to '{output_filename}'.")
# except Exception as e:
#     print(f"🔴 Error saving panel results to Excel: {e}")

--- Script Started ---
All Tickers: ['TSLA', '1211.HK', 'VOW3.DE', 'NIO', 'RIVN', 'LCID', 'XPEV', 'LI', 'GM', 'F', '005380.KS', 'PSNY', 'BMW.DE', 'MBG.DE']
Low ESG Risk: ['PSNY', 'MBG.DE']
High ESG Risk: ['RIVN']
Analysis Period: 2020-01-01 to 2024-12-31
Factor File: /content/gd_Developed_5_Factors.csv

--- Downloading and Preparing Stock Returns ---
  -> Final stock price data has 14 valid tickers: ['005380.KS', '1211.HK', 'BMW.DE', 'F', 'GM', 'LCID', 'LI', 'MBG.DE', 'NIO', 'PSNY', 'RIVN', 'TSLA', 'VOW3.DE', 'XPEV']
Actual stock monthly returns calculated.

--- Loading and Preparing Factors Data ---
Factors data loaded, cleaned, filtered.

--- Imputing Factors Data ---
  -> Imputation: Imputing 12 columns using IterativeImputer (BayesianRidge)...
  -> Imputation: Completed for 12 columns.
  -> Converting factor 'Mkt-RF' assuming percentage points.
  -> Converting factor 'SMB' assuming percentage points.
  -> Converting factor 'HML' assuming percentage points.
  -> Converting factor 'R

Variables have been fully absorbed and have removed from the regression:

Mkt_RF, SMB, HML, RMW, CMA, MOM

  fe_tw_simple_res = mod_fe_tw_simple.fit(cov_type='clustered', cluster_entity=True, cluster_time=True)


[link text](https://)### Fama-MacBeth  
### ESG lags for EV Automakers

In [47]:
# Import necessary libraries
import pandas as pd
import numpy as np
import yfinance as yf
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.linear_model import BayesianRidge
import statsmodels.api as sm
from statsmodels.stats.outliers_influence import variance_inflation_factor
import warnings
import sys
import matplotlib.pyplot as plt
import matplotlib.style as style
import xlsxwriter
from linearmodels.panel import PanelOLS # Import panel models
import statsmodels.formula.api as smf
import re

# --- Settings and Configuration ---
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
# from statsmodels.tools.sm_exceptions import ValueWarning, ConvergenceWarning, EstimationWarning, AbsorbingEffectWarning
# warnings.simplefilter('ignore', (ValueWarning, ConvergenceWarning, EstimationWarning, AbsorbingEffectWarning))

style.use("default")

# --- Define Tickers and ESG Risk Groups ---
TICKERS_ALL = [
    "TSLA", "1211.HK", "VOW3.DE", "NIO", "RIVN",
    "LCID", "XPEV", "LI", "GM", "F",
    "005380.KS", "PSNY", "BMW.DE", "MBG.DE"
]
TICKERS_LOW_RISK = ["PSNY", "MBG.DE"]
TICKERS_HIGH_RISK = ["RIVN"]
# Medium risk tickers will be determined dynamically

# Define Date Range
START_DATE_PRICES = "2019-10-30"
END_DATE_PRICES = "2024-12-31"
START_DATE_ANALYSIS = "2020-01-01"
END_DATE_ANALYSIS = "2024-12-31"
FF_FACTORS_PATH = "/content/gd_Developed_5_Factors.csv" # MODIFY IF NEEDED

print("--- Script Started ---")
print(f"All Tickers: {TICKERS_ALL}")
print(f"Low ESG Risk: {TICKERS_LOW_RISK}")
print(f"High ESG Risk: {TICKERS_HIGH_RISK}")
print(f"Analysis Period: {START_DATE_ANALYSIS} to {END_DATE_ANALYSIS}")
print(f"Factor File: {FF_FACTORS_PATH}")

# --- Advanced Imputation Function ---
def advanced_imputation(df_input):
    df = df_input.copy(); original_index = df.index
    all_missing_cols = df.columns[df.isnull().all()]
    df_imputable = df.copy()
    if len(all_missing_cols) > 0:
        print(f"  -> Imputation: Dropping {len(all_missing_cols)} all-NaN columns: {list(all_missing_cols)}")
        df_imputable = df_imputable.drop(columns=all_missing_cols)
    cols_to_impute = df_imputable.columns[df_imputable.isnull().any()]
    if df_imputable.isnull().sum().sum() == 0:
        print("  -> Imputation: 🟢 No missing values found. Skipping imputation.")
        imputed_data = df_imputable
    elif len(cols_to_impute) > 0:
        print(f"  -> Imputation: Imputing {len(cols_to_impute)} columns using IterativeImputer (BayesianRidge)...")
        n_features = min(10, df_imputable.shape[1] - 1) if df_imputable.shape[1] > 1 else 1
        imputer = IterativeImputer(estimator=BayesianRidge(), max_iter=30, random_state=42, tol=1e-4, n_nearest_features=n_features, verbose=0)
        try:
            with warnings.catch_warnings(): warnings.simplefilter("ignore")
            imputed_values = imputer.fit_transform(df_imputable[cols_to_impute])
            imputed_subset = pd.DataFrame(imputed_values, columns=cols_to_impute, index=original_index)
            imputed_data = df_imputable.copy(); imputed_data[cols_to_impute] = imputed_subset
            print(f"  -> Imputation: Completed for {len(cols_to_impute)} columns.")
        except Exception as e:
            print(f"  -> Imputation: 🔴 Error during imputation: {e}. Returning data with NaNs.")
            imputed_data = df_imputable
    else:
        print("  -> Imputation: 🟡 No columns required imputation.")
        imputed_data = df_imputable
    try: imputed_data = imputed_data.astype(float)
    except Exception as e: print(f"  -> Imputation: 🔴 Warning: Could not convert imputed data to float: {e}")
    return imputed_data

# --- Step 1: Download and Prepare ACTUAL Stock Returns ---
print("\n--- Downloading and Preparing Stock Returns ---")
tickers_available = []
try:
    all_stock_data = yf.download(TICKERS_ALL, start=START_DATE_PRICES, end=END_DATE_PRICES, progress=False)
    if all_stock_data.empty: raise ValueError("No stock price data downloaded.")
    if isinstance(all_stock_data.columns, pd.MultiIndex):
        if 'Adj Close' in all_stock_data.columns.levels[0]: price_data = all_stock_data['Adj Close']
        elif 'Close' in all_stock_data.columns.levels[0]: price_data = all_stock_data['Close']; print("  -> Warning: Using 'Close' price as 'Adj Close' not found.")
        else: raise ValueError("Neither 'Adj Close' nor 'Close' found.")
    elif isinstance(all_stock_data.columns, pd.Index):
        if 'Adj Close' in all_stock_data.columns: target_col = 'Adj Close'
        elif 'Close' in all_stock_data.columns: target_col = 'Close'; print("  -> Warning: Using 'Close' price.")
        else: raise ValueError("Neither 'Adj Close' nor 'Close' found.")
        ticker_name = TICKERS_ALL[0] if len(TICKERS_ALL)==1 else "SINGLE_TICKER"
        price_data = all_stock_data[[target_col]].rename(columns={target_col: ticker_name})
    else: raise ValueError("Unexpected column structure.")

    price_data = price_data.ffill().bfill()
    price_data = price_data.dropna(axis=1, how='all')
    if price_data.empty: raise ValueError("All stock price columns NaN after fill.")
    tickers_available = list(price_data.columns)
    print(f"  -> Final stock price data has {len(tickers_available)} valid tickers: {tickers_available}")
    price_data.index = pd.to_datetime(price_data.index)
    monthly_prices = price_data.resample('ME').last()
    stock_monthly_returns = monthly_prices.pct_change().dropna(how='all', axis=0)
    if stock_monthly_returns.empty: raise ValueError("Monthly returns empty.")
    print("Actual stock monthly returns calculated.")
except Exception as e: print(f"🔴 FATAL ERROR downloading/processing stock returns: {e}"); sys.exit()

# --- Step 2: Load and Prepare Factors Data ---
print("\n--- Loading and Preparing Factors Data ---")
ff_factors_monthly = pd.DataFrame()
try:
    ff_factors_monthly_raw = pd.read_csv(FF_FACTORS_PATH, index_col=0, header=0, dtype='object')
    ff_factors_monthly_raw.index = pd.to_datetime(ff_factors_monthly_raw.index, errors='coerce')
    ff_factors_monthly_raw = ff_factors_monthly_raw[pd.notna(ff_factors_monthly_raw.index)]
    if ff_factors_monthly_raw.empty: raise ValueError("All rows dropped factors date parse.")
    for col in ff_factors_monthly_raw.columns: ff_factors_monthly_raw[col] = pd.to_numeric(ff_factors_monthly_raw[col], errors='coerce')
    if pd.api.types.is_datetime64_any_dtype(ff_factors_monthly_raw.index): ff_factors_monthly_raw.index = ff_factors_monthly_raw.index + pd.offsets.MonthEnd(0)
    ff_factors_monthly_raw = ff_factors_monthly_raw[~ff_factors_monthly_raw.index.duplicated(keep='last')]
    # Filter factors for the broader period needed for lagging
    ff_factors_monthly_raw = ff_factors_monthly_raw[(ff_factors_monthly_raw.index >= START_DATE_PRICES) & (ff_factors_monthly_raw.index <= END_DATE_PRICES)]
    if ff_factors_monthly_raw.empty: raise ValueError("No factor data remains after filtering for price period.")
    print("Factors data loaded, cleaned, date-filtered.")
    if ff_factors_monthly_raw.isnull().sum().sum() > 0:
        print("\n--- Imputing Factors Data ---")
        ff_factors_monthly = advanced_imputation(ff_factors_monthly_raw)
    else: print("\n--- No missing values in factors data. ---"); ff_factors_monthly = ff_factors_monthly_raw.astype(float)
    factor_cols_to_convert = ["Mkt-RF", "SMB", "HML", "RMW", "CMA", "RF", "MOM"]
    for col in factor_cols_to_convert:
        if col in ff_factors_monthly.columns and pd.api.types.is_numeric_dtype(ff_factors_monthly[col]):
            max_abs_val = ff_factors_monthly[col].abs().max()
            if not pd.isna(max_abs_val) and max_abs_val > 1:
                 print(f"  -> Converting factor '{col}' assuming percentage points.")
                 ff_factors_monthly[col] = ff_factors_monthly[col] / 100.0
except FileNotFoundError: print(f"\n🔴 FATAL ERROR: Factors file not found: {FF_FACTORS_PATH}"); sys.exit()
except Exception as e: print(f"\n🔴 FATAL ERROR loading/processing Factors Data: {e}"); sys.exit()

# --- Step 3: Create Lagged Factor Variables ---
print("\n--- Creating Lagged Factor Variables ---")
factors_lagged = ff_factors_monthly.copy()
lagged_cols_required = []
for ticker in tickers_available:
    for comp in ["ESG", "E", "S", "G"]:
        col = f"{ticker}_{comp}"
        lag_col = f"{ticker}_{comp}_lag"
        if col in factors_lagged.columns:
            factors_lagged[lag_col] = factors_lagged[col].shift(1)
            # Only mark ESG components as required for dropping NaNs if they will be used
            if comp in ["E", "S", "G"]:
                lagged_cols_required.append(lag_col)

if not lagged_cols_required: print("  -> Warning: No lagged ESG component columns were created/found.")
factors_lagged = factors_lagged.dropna(subset=lagged_cols_required, how='any')
if factors_lagged.empty: print(f"🔴 FATAL ERROR: Data empty after lagging."); sys.exit()
# Filter *after* lagging to the analysis period
factors_lagged = factors_lagged[(factors_lagged.index >= START_DATE_ANALYSIS) & (factors_lagged.index <= END_DATE_ANALYSIS)]
if factors_lagged.empty: print(f"🔴 FATAL ERROR: Data empty after filtering lagged data to analysis period."); sys.exit()
print(f"Lagged factors created & filtered. Data available from: {factors_lagged.index.min().date()}")

# --- Step 4: Prepare Data for Panel Regression ---
print("\n--- Preparing Data for Panel Analysis ---")
panel_data = pd.DataFrame()
try:
    common_index = stock_monthly_returns.index.intersection(factors_lagged.index)
    if len(common_index) == 0: raise ValueError("No overlapping dates after lagging.")
    aligned_returns = stock_monthly_returns.loc[common_index, tickers_available].copy().astype(float)
    aligned_factors = factors_lagged.loc[common_index].copy().astype(float)
    print(f"Aligned data for {len(common_index)} months.")

    if "RF" not in aligned_factors.columns: raise ValueError("Risk-Free rate ('RF') not found.")
    risk_free_rate = aligned_factors["RF"]
    excess_returns = aligned_returns.subtract(risk_free_rate, axis=0)
    excess_returns_long = excess_returns.stack().reset_index()
    excess_returns_long.columns = ['Date', 'Ticker', 'ExcessReturn']

    market_factors = ['Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA', 'MOM']
    market_factors_present = [f for f in market_factors if f in aligned_factors.columns]
    factors_market_wide = aligned_factors[market_factors_present].reset_index()

    entity_specific_lagged_cols = [col for col in aligned_factors.columns if '_lag' in col]
    factors_entity_specific_wide = aligned_factors[entity_specific_lagged_cols].reset_index()
    factors_entity_specific_long = factors_entity_specific_wide.melt(id_vars='Date', var_name='FactorName', value_name='FactorValue')
    factors_entity_specific_long[['Ticker', 'FactorType']] = factors_entity_specific_long['FactorName'].str.extract(r'([^_.-]+(?:[.][A-Z]{2})?)_(.*_lag)')
    factors_entity_specific_long = factors_entity_specific_long.dropna(subset=['Ticker'])
    factors_entity_specific_pivot = factors_entity_specific_long.pivot_table(index=['Date', 'Ticker'], columns='FactorType', values='FactorValue').reset_index()

    panel_data = pd.merge(excess_returns_long, factors_market_wide, on='Date', how='left')
    panel_data = pd.merge(panel_data, factors_entity_specific_pivot, on=['Date', 'Ticker'], how='left')

    tickers_medium_risk = [t for t in tickers_available if t not in TICKERS_LOW_RISK and t not in TICKERS_HIGH_RISK]
    print(f"Medium ESG Risk: {tickers_medium_risk}")
    # Create group mapping dictionary
    group_map = {t: 'Medium' for t in tickers_medium_risk}
    group_map.update({t: 'Low' for t in TICKERS_LOW_RISK if t in tickers_available})
    group_map.update({t: 'High' for t in TICKERS_HIGH_RISK if t in tickers_available})
    panel_data['ESG_Group'] = panel_data['Ticker'].map(group_map)

    panel_data['Date'] = pd.to_datetime(panel_data['Date'])
    # Filter should not be needed here again if done correctly after lagging factors
    # panel_data = panel_data[(panel_data['Date'] >= START_DATE_ANALYSIS) & (panel_data['Date'] <= END_DATE_ANALYSIS)]
    panel_data = panel_data.set_index(['Ticker', 'Date']).sort_index()

    # Rename Mkt-RF column (Must happen AFTER merging)
    if 'Mkt-RF' in panel_data.columns: panel_data.rename(columns={'Mkt-RF': 'Mkt_RF'}, inplace=True)
    else: print("🔴 WARNING: 'Mkt-RF' column not found in panel_data for renaming.")

    # Define required columns based on potential models
    base_factors_renamed = ['Mkt_RF', 'SMB', 'HML', 'RMW', 'CMA', 'MOM']
    esg_lags = ['E_lag', 'S_lag', 'G_lag']
    essential_cols_check = ['ExcessReturn'] + [f for f in base_factors_renamed if f in panel_data.columns] + [f for f in esg_lags if f in panel_data.columns]
    panel_data = panel_data.dropna(subset=essential_cols_check) # Drop based on cols actually used
    if panel_data.empty: raise ValueError("Panel data empty after final NaN drop.")

    print("Panel data prepared successfully.")
    print("Panel Data (Head with Group):")
    print(panel_data.head().to_string(float_format="%.4f"))

except Exception as e: print(f"🔴 FATAL ERROR preparing panel data: {e}"); import traceback; traceback.print_exc(); sys.exit()


# --- Step 5: Run Panel Regressions for Each Group Separately ---
print("\n--- Running Panel Regressions Separately for Each ESG Risk Group ---")

# Define base formula parts (use renamed Mkt_RF)
base_factors_present_final = [f for f in ['Mkt_RF', 'SMB', 'HML', 'RMW', 'CMA', 'MOM'] if f in panel_data.columns]
esg_components_present_final = [f for f in ['E_lag', 'S_lag', 'G_lag'] if f in panel_data.columns]

if not base_factors_present_final: print("🔴 Warning: No base factors found for regression."); sys.exit()
if not esg_components_present_final: print("🔴 Warning: No lagged ESG components found for regression."); sys.exit()

# Construct the formula (same for each group regression, no interactions here)
formula_group_run = f"ExcessReturn ~ 1 + {' + '.join(base_factors_present_final + esg_components_present_final)}"
print(f"\nBase Formula for Group Regressions:\n{formula_group_run}")

group_results = {}

for group_name, group_tickers in [('Low', TICKERS_LOW_RISK), ('Medium', tickers_medium_risk), ('High', TICKERS_HIGH_RISK)]:
    print(f"\n--- Fitting Models for Group: {group_name} ---")
    # Filter data for the current group
    group_data = panel_data[panel_data['ESG_Group'] == group_name]

    if group_data.empty:
        print(f"  -> No data available for group {group_name}. Skipping.")
        continue
    if len(group_data.index.get_level_values('Ticker').unique()) < 2 and group_name != 'High': # Allow High risk group with 1 ticker for FE
         print(f"  -> Only one entity in group {group_name}. Cannot reliably estimate Entity FE.")
         # Optionally run Pooled OLS only for this group
         # continue

    group_results[group_name] = {}

    # --- Model: Entity Fixed Effects for the group ---
    try:
        model_fe = PanelOLS.from_formula(formula_group_run + " + EntityEffects", data=group_data, drop_absorbed=True)
        # Check rank before fitting if possible (though drop_absorbed helps)
        # rank = np.linalg.matrix_rank(model_fe.exog.ndarray)
        # if rank < model_fe.exog.shape[1]: print(f"  -> Warning: Potential collinearity in FE exog for {group_name}")

        fe_res = model_fe.fit(cov_type='clustered', cluster_entity=True)
        print(f"\n--- {group_name} Group: Fixed Effects (Entity) ---")
        print(fe_res)
        group_results[group_name]['Entity FE'] = fe_res.summary
    except Exception as e:
        print(f"🔴 ERROR fitting Entity FE for {group_name} group: {e}")
        # import traceback; traceback.print_exc()
        group_results[group_name]['Entity FE'] = f"Error: {e}"

    # --- Model: Two-Way Fixed Effects for the group ---
    try:
        # Check if enough time periods relative to number of entities
        entities_in_group = len(group_data.index.get_level_values('Ticker').unique())
        time_periods_in_group = len(group_data.index.get_level_values('Date').unique())
        # Heuristic: Need T > k and N > k for TWFE (k = num regressors)
        num_regressors = len(base_factors_present_final) + len(esg_components_present_final)
        if time_periods_in_group <= num_regressors or entities_in_group <= num_regressors :
             print(f"  -> Skipping Two-Way FE for {group_name}: Not enough entities({entities_in_group}) or time periods({time_periods_in_group}) relative to regressors({num_regressors}).")
        else:
            model_twfe = PanelOLS.from_formula(formula_group_run + " + EntityEffects + TimeEffects", data=group_data, drop_absorbed=True)
            # rank = np.linalg.matrix_rank(model_twfe.exog.ndarray) # Exog after absorbing is harder to get directly
            # if rank < model_twfe.exog.shape[1]: print(f"  -> Warning: Potential collinearity in TWFE exog for {group_name}")

            with warnings.catch_warnings(): # Suppress AbsorbingEffectWarning if drop_absorbed=True handles it
                 warnings.simplefilter("ignore")
                 twfe_res = model_twfe.fit(cov_type='clustered', cluster_entity=True, cluster_time=True)
            print(f"\n--- {group_name} Group: Fixed Effects (Two-Way) ---")
            print(twfe_res)
            group_results[group_name]['Two-Way FE'] = twfe_res.summary

    except Exception as e:
        print(f"🔴 ERROR fitting Two-Way FE for {group_name} group: {e}")
        # import traceback; traceback.print_exc()
        group_results[group_name]['Two-Way FE'] = f"Error: {e}"


# --- Step 6: Save Group Results ---
print("\n--- Saving Group Regression Results ---")
output_excel_path_groups = 'panel_regression_results_by_group.xlsx'
try:
    with pd.ExcelWriter(output_excel_path_groups, engine='xlsxwriter') as writer:
        workbook = writer.book
        for group, results in group_results.items():
            for model_type, summary_obj in results.items():
                 sheet_name = f"{group}_{model_type}".replace(" ","_")[:31] # Max 31 char sheet name
                 worksheet = workbook.add_worksheet(sheet_name)
                 if isinstance(summary_obj, str): # Error message
                      worksheet.write_string(0, 0, summary_obj)
                 elif hasattr(summary_obj, 'as_text'):
                      summary_text = summary_obj.as_text()
                      worksheet.set_column(0, 0, 120)
                      worksheet.write_string(0, 0, summary_text)
                 else: # Fallback if object is unexpected
                      worksheet.write_string(0, 0, "Unexpected result object format.")

    print(f"Group regression summaries saved to '{output_excel_path_groups}'.")
except Exception as e:
    print(f"🔴 Error saving group results to Excel: {e}")


print("\n--- Script Finished ---")

--- Script Started ---
All Tickers: ['TSLA', '1211.HK', 'VOW3.DE', 'NIO', 'RIVN', 'LCID', 'XPEV', 'LI', 'GM', 'F', '005380.KS', 'PSNY', 'BMW.DE', 'MBG.DE']
Low ESG Risk: ['PSNY', 'MBG.DE']
High ESG Risk: ['RIVN']
Analysis Period: 2020-01-01 to 2024-12-31
Factor File: /content/gd_Developed_5_Factors.csv

--- Downloading and Preparing Stock Returns ---
  -> Final stock price data has 14 valid tickers: ['005380.KS', '1211.HK', 'BMW.DE', 'F', 'GM', 'LCID', 'LI', 'MBG.DE', 'NIO', 'PSNY', 'RIVN', 'TSLA', 'VOW3.DE', 'XPEV']
Actual stock monthly returns calculated.

--- Loading and Preparing Factors Data ---
Factors data loaded, cleaned, date-filtered.

--- Imputing Factors Data ---
  -> Imputation: Imputing 12 columns using IterativeImputer (BayesianRidge)...
  -> Imputation: Completed for 12 columns.
  -> Converting factor 'Mkt-RF' assuming percentage points.
  -> Converting factor 'SMB' assuming percentage points.
  -> Converting factor 'HML' assuming percentage points.
  -> Converting fact

### Fetch ESG Data using yesg

In [38]:


# Import necessary libraries
import pandas as pd
import numpy as np
import yfinance as yf
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.linear_model import BayesianRidge
import statsmodels.api as sm
from statsmodels.stats.outliers_influence import variance_inflation_factor
import warnings
import sys # To exit script on fatal errors
import matplotlib.pyplot as plt # For potential plotting if needed
import matplotlib.style as style
import xlsxwriter # For saving results

# --- Settings and Configuration ---
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
# from statsmodels.tools.sm_exceptions import ValueWarning, ConvergenceWarning, EstimationWarning
# warnings.simplefilter('ignore', (ValueWarning, ConvergenceWarning, EstimationWarning))

# Set the plotting style
style.use("default")
params = {
    "axes.labelsize": 8, "font.size": 8, "legend.fontsize": 8,
    "xtick.labelsize": 8, "ytick.labelsize": 8, "text.usetex": False,
    "font.family": "sans-serif", "axes.spines.top": False, "axes.spines.right": False,
    "grid.color": "grey", "axes.grid": True, "grid.alpha": 0.5, "grid.linestyle": ":",
}
plt.rcParams.update(params)


# List of EV tickers (Ensure these are valid on Yahoo Finance)
EV_TICKERS = [
    "TSLA", "1211.HK", "VOW3.DE", "NIO", "RIVN",
    "LCID", "XPEV", "LI", "GM", "F",
    "005380.KS", "PSNY", "BMW.DE", "MBG.DE"
]

# Define Date Range (Adjust start date if needed for pct_change and lag)
START_DATE_PRICES = "2019-10-30" # Start earlier for return calculation + lag
END_DATE_PRICES = "2024-12-31" # Use actual end date needed for factors
START_DATE_ANALYSIS = "2020-01-01" # Start analysis after lag allows first month
END_DATE_ANALYSIS = "2024-12-31" # End date for the analysis period


# Path to your Fama-French + ESG factors file
FF_FACTORS_PATH = "/content/gd_Developed_5_Factors.csv" # MODIFY IF NEEDED

# VIF Threshold for multicollinearity warning (excluding constant)
VIF_THRESHOLD = 10

print("--- Script Started ---")
print(f"Tickers: {EV_TICKERS}")
print(f"Price Data Range: {START_DATE_PRICES} to {END_DATE_PRICES}")
print(f"Analysis Period: {START_DATE_ANALYSIS} to {END_DATE_ANALYSIS}")
print(f"Factor File: {FF_FACTORS_PATH}")

# --- Advanced Imputation Function (Defined Once) ---
def advanced_imputation(df_input):
    """
    Perform advanced imputation using Iterative Imputer. Handles all-NaN columns.
    Returns a DataFrame with the same index and columns (minus all-NaN cols) and float dtype.
    """
    df = df_input.copy() # Work on a copy
    original_index = df.index

    all_missing_cols = df.columns[df.isnull().all()]
    df_imputable = df.copy()

    if len(all_missing_cols) > 0:
        print(f"  -> Dropping {len(all_missing_cols)} columns with all missing values: {list(all_missing_cols)}")
        df_imputable = df_imputable.drop(columns=all_missing_cols)

    cols_to_impute = df_imputable.columns[df_imputable.isnull().any()]

    if df_imputable.isnull().sum().sum() == 0:
         print("  -> 🟢 No missing values found after dropping all-NaN columns. Skipping imputation.")
         imputed_data = df_imputable
    elif len(cols_to_impute) > 0:
        print(f"  -> Imputing {len(cols_to_impute)} columns using IterativeImputer (BayesianRidge)...")
        n_features = min(10, df_imputable.shape[1] - 1) if df_imputable.shape[1] > 1 else 1
        # Increase iterations and adjust tolerance if convergence issues persist
        imputer = IterativeImputer(estimator=BayesianRidge(), max_iter=30, random_state=42, tol=1e-4, n_nearest_features=n_features, verbose=0)
        try:
            imputed_values = imputer.fit_transform(df_imputable[cols_to_impute])
            imputed_subset = pd.DataFrame(imputed_values, columns=cols_to_impute, index=original_index)
            imputed_data = df_imputable.copy()
            imputed_data[cols_to_impute] = imputed_subset
            print(f"  -> Imputation completed for {len(cols_to_impute)} columns.")
        except Exception as e:
            print(f"  -> 🔴 Error during imputation: {e}. Returning data with NaNs.")
            imputed_data = df_imputable # Return unimputed (but potentially with cols dropped)
    else:
        print("  -> 🟡 No columns required imputation after dropping all-NaN columns.")
        imputed_data = df_imputable # Return the data with all-NaN columns dropped
    try:
        imputed_data = imputed_data.astype(float)
    except Exception as e:
         print(f"  -> 🔴 Warning: Could not convert imputed data to float: {e}")
    return imputed_data




# --- Step 1: Download and Prepare ACTUAL Stock Returns ---
print("\n--- Downloading and Preparing Stock Returns ---")
try:
    # Download ALL available data first
    all_stock_data = yf.download(EV_TICKERS, start=START_DATE_PRICES, end=END_DATE_PRICES)

    if all_stock_data.empty:
        print("🔴 FATAL ERROR: No stock price data downloaded. Check tickers and date range.")
        sys.exit()

    print(f"Downloaded data shape: {all_stock_data.shape}")

    # --- Robustly Select Price Data ---
    # Check if columns are a MultiIndex
    if isinstance(all_stock_data.columns, pd.MultiIndex):
        print("  -> MultiIndex columns detected.")
        # --- MODIFICATION START ---
        # Prioritize 'Adj Close', but fall back to 'Close' if not available
        if 'Adj Close' in all_stock_data.columns.levels[0]:
            price_data = all_stock_data['Adj Close']
            print(f"  -> Selected 'Adj Close' data for {price_data.shape[1]} tickers.")
        elif 'Close' in all_stock_data.columns.levels[0]:
            price_data = all_stock_data['Close']
            print(f"  -> Warning: 'Adj Close' not found. Selected 'Close' price data instead for {price_data.shape[1]} tickers.")
        else:
            print("🔴 FATAL ERROR: Neither 'Adj Close' nor 'Close' level found in MultiIndex columns.")
            print("   Available top levels:", all_stock_data.columns.levels[0])
            sys.exit()
        # --- MODIFICATION END ---

    # Handle case where only ONE ticker was downloaded (results in simple Index)
    elif isinstance(all_stock_data.columns, pd.Index):
         print("  -> Simple Index columns detected (likely only one ticker downloaded successfully).")
         # --- MODIFICATION START ---
         # Prioritize 'Adj Close', fall back to 'Close'
         if 'Adj Close' in all_stock_data.columns:
            target_col = 'Adj Close'
            print("  -> Selecting 'Adj Close' for single ticker.")
         elif 'Close' in all_stock_data.columns:
             target_col = 'Close'
             print("  -> Warning: Selecting 'Close' price for single ticker.")
         else:
            print("🔴 FATAL ERROR: Neither 'Adj Close' nor 'Close' column found in simple Index columns.")
            print("   Available columns:", all_stock_data.columns)
            sys.exit()

         # Try to find the ticker name from the original list
         ticker_name = "UNKNOWN_TICKER"
         for t in EV_TICKERS:
             if any(t.upper() in str(col).upper() for col in all_stock_data.columns):
                  ticker_name = t
                  break
         price_data = all_stock_data[[target_col]].rename(columns={target_col: ticker_name})
         print(f"  -> Selected '{target_col}' data for single ticker: {ticker_name}")
         # --- MODIFICATION END ---

    else: # Should not happen, but include for completeness
        print("🔴 FATAL ERROR: Unexpected column structure in downloaded data.")
        sys.exit()

    # --- Process Selected Price Data ---
    price_data = price_data.ffill().bfill() # Fill gaps first
    initial_cols = price_data.shape[1]
    price_data = price_data.dropna(axis=1, how='all') # Drop fully empty cols
    final_cols = price_data.shape[1]
    if final_cols < initial_cols: print(f"  -> Dropped {initial_cols - final_cols} ticker columns with all NaN values after fill.")
    if price_data.empty:
        print("🔴 FATAL ERROR: All stock price columns were NaN after fill."); sys.exit()
    else: print(f"  -> Final stock price data has {final_cols} valid ticker columns.")

    # Ensure index is DatetimeIndex
    price_data.index = pd.to_datetime(price_data.index)
    print("\nStock Price Data (Selected, Processed, Head):")
    print(price_data.head().to_string(float_format="%.2f")) # Use the selected price_data

    # --- Calculate ACTUAL Monthly Stock Returns ---
    monthly_prices = price_data.resample('ME').last() # Use the selected price_data
    stock_monthly_returns = monthly_prices.pct_change()
    stock_monthly_returns = stock_monthly_returns.dropna(how='all', axis=0)
    print("\nActual Stock Monthly Returns (Head):")
    print(stock_monthly_returns.head().to_string(float_format="%.4f"))
    if stock_monthly_returns.empty:
        print("🔴 FATAL ERROR: Stock monthly returns DataFrame is empty."); sys.exit()

except KeyError as e:
    print(f"🔴 FATAL ERROR: KeyError encountered during stock data processing: {e}")
    sys.exit()
except Exception as e:
    print(f"🔴 FATAL ERROR downloading or processing stock returns: {e}")
    # import traceback; traceback.print_exc()
    sys.exit()


# --- REST OF THE SCRIPT REMAINS THE SAME ---
# Step 2: Load Factors
# Step 3: Create Lagged Factors
# Step 4: Align Returns and Factors
# Step 5: Prepare Factors for Regression Models (Uses X_factors_aligned derived from factors file)
# Step 6: Check VIF for Base Factors
# Step 7: Run Time-Series Regressions (Uses excess_returns derived from stock_monthly_returns)
# Step 8: Save Results




# --- Step 2: Load and Prepare Factors Data ---
print("\n--- Loading and Preparing Factors Data ---")
ff_factors_monthly = pd.DataFrame()
try:
    print(f"Attempting to load factors data from: {FF_FACTORS_PATH}")
    ff_factors_monthly_raw = pd.read_csv(
        FF_FACTORS_PATH, index_col=0, header=0, dtype='object'
    )
    print("CSV loaded successfully.")

    # --- Robust Date Conversion for INDEX ---
    original_index_name = ff_factors_monthly_raw.index.name
    ff_factors_monthly_raw.index = pd.to_datetime(ff_factors_monthly_raw.index, errors='coerce')

    # --- Drop Rows with Invalid Dates ---
    rows_before_drop = len(ff_factors_monthly_raw)
    ff_factors_monthly_raw = ff_factors_monthly_raw[pd.notna(ff_factors_monthly_raw.index)]
    rows_after_drop = len(ff_factors_monthly_raw)
    if rows_after_drop < rows_before_drop: print(f"  -> Warning: Dropped {rows_before_drop - rows_after_drop} rows from factors due to invalid date index.")
    if rows_after_drop == 0:
        print(f"  -> 🔴 FATAL ERROR: All rows dropped from factors after date parsing."); sys.exit()
    else: print(f"  -> {rows_after_drop} factor rows remain after date parsing check.")
    if original_index_name: ff_factors_monthly_raw.index.name = original_index_name

    # --- Convert data columns to numeric ---
    for col in ff_factors_monthly_raw.columns:
        ff_factors_monthly_raw[col] = pd.to_numeric(ff_factors_monthly_raw[col], errors='coerce')
    print("  -> Converted factor data columns to numeric.")

    # --- Standardize index to month-end ---
    if pd.api.types.is_datetime64_any_dtype(ff_factors_monthly_raw.index):
        ff_factors_monthly_raw.index = ff_factors_monthly_raw.index + pd.offsets.MonthEnd(0)
        print("  -> Standardized factor index to month-end.")
    else: print("  -> 🔴 Warning: Factor index is not datetime type, cannot standardize.")

    # --- Remove duplicate month-end index entries ---
    initial_len = len(ff_factors_monthly_raw)
    ff_factors_monthly_raw = ff_factors_monthly_raw[~ff_factors_monthly_raw.index.duplicated(keep='last')]
    if len(ff_factors_monthly_raw) < initial_len: print(f"  -> Removed {initial_len - len(ff_factors_monthly_raw)} duplicate factor index entries.")

    # --- Filter factors for the analysis date range ---
    ff_factors_monthly_raw = ff_factors_monthly_raw[
        (ff_factors_monthly_raw.index >= START_DATE_ANALYSIS) & # Use analysis start date
        (ff_factors_monthly_raw.index <= END_DATE_ANALYSIS)   # Use analysis end date
    ]
    print(f"  -> Filtered factors data to analysis period: {START_DATE_ANALYSIS} to {END_DATE_ANALYSIS}.")
    if ff_factors_monthly_raw.empty:
        print("  -> 🔴 FATAL ERROR: No factor data remains after filtering for analysis period."); sys.exit()

    print("\nFactors Data Before Imputation (Cleaned & Filtered, Head):")
    print(ff_factors_monthly_raw.head().to_string(float_format="%.4f"))

    # --- Impute Factors Data ---
    print("\n--- Imputing Factors Data ---")
    if ff_factors_monthly_raw.isnull().sum().sum() > 0:
        ff_factors_monthly = advanced_imputation(ff_factors_monthly_raw)
    else:
        print("  -> 🟢 No missing values in loaded factors data. Skipping imputation.")
        ff_factors_monthly = ff_factors_monthly_raw.astype(float)

    print("\nFactors Data After Imputation (Head):")
    print(ff_factors_monthly.head().to_string(float_format="%.4f"))

    # --- Convert specific factors from percentage points to decimals ---
    factor_cols_to_convert = ["Mkt-RF", "SMB", "HML", "RMW", "CMA", "RF", "MOM"]
    print("\nChecking factor scales for potential conversion (Threshold > 5%):")
    for col in factor_cols_to_convert:
        if col in ff_factors_monthly.columns:
             if pd.api.types.is_numeric_dtype(ff_factors_monthly[col]):
                 max_abs_val = ff_factors_monthly[col].abs().max()
                 if not pd.isna(max_abs_val) and max_abs_val > 5:
                     print(f"  -> Converting factor '{col}' from percentage points to decimals.")
                     ff_factors_monthly[col] = ff_factors_monthly[col] / 100.0

except FileNotFoundError: print(f"\n🔴 FATAL ERROR: Factors file not found: {FF_FACTORS_PATH}"); sys.exit()
except KeyError as e: print(f"\n🔴 FATAL ERROR: Problem accessing column/index during factors processing: {e}"); sys.exit()
except Exception as e: print(f"\n🔴 FATAL ERROR loading/processing Factors Data: {e}"); sys.exit()


# --- Step 3: Create Lagged Factor Variables ---
print("\n--- Creating Lagged Factor Variables ---")
factors_lagged = ff_factors_monthly.copy()
lagged_cols = []
# Lag ESG factors
for ticker in EV_TICKERS: # Use original list to catch all potential columns
    for comp in ["ESG", "E", "S", "G"]:
        col = f"{ticker}_{comp}"
        lag_col = f"{ticker}_{comp}_lag"
        if col in factors_lagged.columns:
            factors_lagged[lag_col] = factors_lagged[col].shift(1)
            lagged_cols.append(lag_col)

# Drop initial rows with NaNs due to lagging
factors_lagged = factors_lagged.dropna(subset=lagged_cols, how='any') # Drop if ANY lagged ESG is NaN
if factors_lagged.empty:
    print(f"🔴 FATAL ERROR: Data is empty after creating lagged variables and dropping NaNs. Check input data / date range.")
    sys.exit()
print(f"Lagged factors created. Data available from: {factors_lagged.index.min().date()}")


# --- Step 4: Align Returns and Lagged Factors for Final Analysis Period ---
print("\n--- Aligning Returns and Lagged Factors ---")
# Align using the index of the lagged factors (which already accounts for lag NaNs and date filter)
common_index = stock_monthly_returns.index.intersection(factors_lagged.index)
# No need to filter dates again, as factors_lagged is already filtered

if len(common_index) == 0:
    print("🔴 FATAL ERROR: No overlapping dates after creating lagged factors.")
    sys.exit()

aligned_returns = stock_monthly_returns.loc[common_index].copy().astype(float)
aligned_factors = factors_lagged.loc[common_index].copy().astype(float)
print(f"Aligned data for {len(common_index)} months from {common_index.min().date()} to {common_index.max().date()}.")

# --- Calculate Excess Returns using Aligned Data ---
print("\n--- Calculating Excess Stock Returns ---")
if "RF" not in aligned_factors.columns:
    print("\n🔴 FATAL ERROR: Risk-Free rate ('RF') not found in aligned factors data.")
    sys.exit()
risk_free_rate = aligned_factors["RF"]
excess_returns = aligned_returns.subtract(risk_free_rate, axis=0) # Stock Return - RF
print("\nAligned Monthly EXCESS STOCK Returns (Head):")
print(excess_returns.head().to_string(float_format="%.4f"))


# --- Step 5: Prepare Factors for Regression Models ---
print("\n--- Preparing Factors for Regression ---")
# Define base factors and find available lagged ESG columns
base_factor_columns = ['Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA', 'MOM']
available_lagged_esg_cols = {
    ticker: [f"{ticker}_{comp}_lag" for comp in ["ESG", "E", "S", "G"] if f"{ticker}_{comp}_lag" in aligned_factors.columns]
    for ticker in EV_TICKERS # Iterate through original list
}
print("Available lagged ESG component columns per ticker identified.")


# --- Step 6: Check VIF for Base Factors ---
# VIF is usually checked on the common predictors used across models
print("\n--- Checking for Multicollinearity (VIF) among Base Factors ---")
try:
    base_factors_present = [f for f in base_factor_columns if f in aligned_factors.columns]
    if len(base_factors_present) > 1:
        X_vif_base = aligned_factors[base_factors_present].dropna()
        if not X_vif_base.empty and X_vif_base.shape[0] >= X_vif_base.shape[1]:
            X_vif_const = sm.add_constant(X_vif_base, prepend=True, has_constant='skip')
            vif_data_base = pd.DataFrame()
            vif_data_base["Feature"] = X_vif_const.columns
            vif_data_base["VIF"] = [variance_inflation_factor(X_vif_const.values, i) for i in range(X_vif_const.shape[1])]
            print("VIF for Base Factors:")
            print(vif_data_base[vif_data_base["Feature"] != "const"].to_string(index=False, float_format="%.2f"))
            high_vif_base = vif_data_base[(vif_data_base["Feature"] != "const") & (vif_data_base["VIF"] > VIF_THRESHOLD)]["Feature"].tolist()
            if high_vif_base: print(f"  -> ⚠️ Potential High VIF detected (>{VIF_THRESHOLD}) for base factors: {high_vif_base}")
        else: print("  -> Skipping VIF check for base factors: Not enough data after NaN drop.")
    else: print("  -> Skipping VIF check for base factors: Less than 2 base factors present.")
except Exception as e: print(f"  -> 🔴 Error calculating VIF for base factors: {e}")


# --- Step 7: Run Time-Series Factor Regressions for Each Ticker ---
print("\n--- Running Time-Series Factor Regressions for Each Ticker ---")
regression_results = {}
final_vif_data = {}

# Use only tickers present in the calculated excess returns
tickers_to_regress = [t for t in EV_TICKERS if t in excess_returns.columns]
if not tickers_to_regress:
    print("🔴 FATAL ERROR: No tickers remaining in excess returns data.")
    sys.exit()

for ticker in tickers_to_regress:
    print(f"\n--- Regression for: {ticker} ---")

    # Define predictors for this specific ticker
    predictors = base_factor_columns[:] # Start with base FF+MOM
    # Add available *lagged* ESG columns for this ticker
    if ticker in available_lagged_esg_cols:
        predictors.extend(available_lagged_esg_cols[ticker])

    # Ensure all selected predictors exist in the aligned factors data
    valid_predictors = [p for p in predictors if p in aligned_factors.columns]
    if not valid_predictors:
        print(f"  -> 🔴 Skipping {ticker}: No valid predictor columns found.")
        continue

    # Prepare X and y for this ticker
    X_unaligned = aligned_factors[valid_predictors]
    y_unaligned = excess_returns[ticker] # Use CORRECT excess returns

    # Align X and y, drop NaNs for this specific regression
    data_ticker = pd.concat([y_unaligned, X_unaligned], axis=1).dropna()

    # Check for sufficient data AFTER dropping NaNs
    min_obs = max(len(valid_predictors) + 10, 24) # Heuristic min observations
    if data_ticker.shape[0] < min_obs:
        print(f"  -> 🔴 Skipping {ticker}: Insufficient observations ({data_ticker.shape[0]} < {min_obs}) after NaN drop.")
        continue

    y = data_ticker[ticker]
    X = data_ticker[valid_predictors]
    X = sm.add_constant(X, prepend=True) # Add constant

    # --- Calculate VIF specific to THIS model's predictors ---
    try:
        vif_model_specific = pd.DataFrame()
        vif_model_specific["Feature"] = X.columns
        # Check for constant variance columns before VIF calculation
        constant_cols = X.columns[X.std() == 0]
        if len(constant_cols) > 0:
             print(f"  -> Warning: Constant column(s) detected in predictors for {ticker}: {list(constant_cols)}. Skipping VIF for these.")
             X_vif_calc = X.drop(columns=constant_cols)
             # Recalculate VIF only on non-constant columns if any remain
             if X_vif_calc.shape[1] > 0:
                  vif_values = [variance_inflation_factor(X_vif_calc.values, i) for i in range(X_vif_calc.shape[1])]
                  vif_subset_df = pd.DataFrame({'Feature': X_vif_calc.columns, 'VIF': vif_values})
                  # Merge back Nans for constant columns
                  vif_model_specific = pd.merge(vif_model_specific[['Feature']], vif_subset_df, on='Feature', how='left')
             else: # No non-constant predictors left
                  vif_model_specific['VIF'] = np.nan
        else: # No constant columns
            vif_model_specific["VIF"] = [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]

        print(f"VIF for {ticker} model predictors:")
        print(vif_model_specific.to_string(index=False, float_format="%.2f"))
        final_vif_data[ticker] = vif_model_specific # Store it
        # Check VIF again here, focusing on predictors (exclude 'const')
        high_vif_predictors = vif_model_specific[(vif_model_specific["Feature"] != "const") & (vif_model_specific["VIF"] > VIF_THRESHOLD)]["Feature"].tolist()
        if high_vif_predictors: print(f"  -> ⚠️ High VIF detected (>{VIF_THRESHOLD}) for predictors in {ticker} model: {high_vif_predictors}")

    except Exception as e:
        print(f"  -> 🔴 Error calculating VIF for {ticker} model: {e}")
        final_vif_data[ticker] = pd.DataFrame({'Feature': X.columns, 'VIF': [np.nan]*len(X.columns)})


    # --- Run the OLS regression ---
    try:
        model = sm.OLS(y, X).fit()
        print(f"\nRegression results for {ticker}:")
        print(model.summary())

        # Store results (store the fitted model object)
        regression_results[ticker] = model

    except Exception as e:
        print(f"  -> 🔴 Error running regression for {ticker}: {e}")


# --- Step 8: Save Results ---
print("\n--- Saving Regression Results ---")
output_excel_path = 'ev_factor_regression_results_ts.xlsx' # Changed filename slightly
try:
    with pd.ExcelWriter(output_excel_path, engine='xlsxwriter') as writer:
        workbook = writer.book
        # --- Save Model Summaries as Text ---
        for ticker, model in regression_results.items():
            summary_text = model.summary().as_text()
            worksheet = workbook.add_worksheet(f'{ticker}_Summary')
            # Increase column width for readability
            worksheet.set_column(0, 0, 100)
            worksheet.write_string(0, 0, summary_text)

        # --- Save VIF Data ---
        for ticker, vif_df in final_vif_data.items():
             # Check if vif_df is a DataFrame before saving
             if isinstance(vif_df, pd.DataFrame):
                  vif_df.to_excel(writer, sheet_name=f'{ticker}_VIF', index=False)
             else:
                  print(f"  -> Skipping VIF save for {ticker}, data not in expected format.")


    print(f"Regression results saved to '{output_excel_path}'.")

except Exception as e:
    print(f"🔴 Error saving results to Excel: {e}")


print("\n--- Script Finished ---")


[**************        29%                       ]  4 of 14 completed

--- Script Started ---
Tickers: ['TSLA', '1211.HK', 'VOW3.DE', 'NIO', 'RIVN', 'LCID', 'XPEV', 'LI', 'GM', 'F', '005380.KS', 'PSNY', 'BMW.DE', 'MBG.DE']
Price Data Range: 2019-10-30 to 2024-12-31
Analysis Period: 2020-01-01 to 2024-12-31
Factor File: /content/gd_Developed_5_Factors.csv

--- Downloading and Preparing Stock Returns ---


[*********************100%***********************]  14 of 14 completed


Downloaded data shape: (1342, 70)
  -> MultiIndex columns detected.
  -> Final stock price data has 14 valid ticker columns.

Stock Price Data (Selected, Processed, Head):
Ticker      005380.KS  1211.HK  BMW.DE    F    GM  LCID    LI  MBG.DE  NIO  PSNY   RIVN  TSLA  VOW3.DE  XPEV
Date                                                                                                        
2019-10-30   99150.29    36.55   50.53 6.63 36.03  9.89 16.46   33.36 1.43 10.00 100.73 21.00   107.08 21.22
2019-10-31   98745.59    36.06   50.11 6.67 35.32  9.89 16.46   32.93 1.45 10.00 100.73 20.99   104.70 21.22
2019-11-01  100364.37    36.69   50.64 6.90 36.09  9.89 16.46   33.11 1.52 10.00 100.73 20.89   105.92 21.22
2019-11-04  100364.37    37.28   52.22 6.98 36.49  9.89 16.46   33.39 1.71 10.00 100.73 21.16   109.00 21.22
2019-11-05  101173.75    37.43   53.09 7.00 36.33  9.89 16.46   33.50 2.34 10.00 100.73 21.15   109.30 21.22

Actual Stock Monthly Returns (Head):
Ticker      005380.KS  1211

  return 1 - self.ssr/self.centered_tss
  vif = 1. / (1. - r_squared_i)
  vif = 1. / (1. - r_squared_i)
  vif = 1. / (1. - r_squared_i)



Regression results for NIO:
                            OLS Regression Results                            
Dep. Variable:                    NIO   R-squared:                       0.227
Model:                            OLS   Adj. R-squared:                  0.138
Method:                 Least Squares   F-statistic:                     2.542
Date:                Mon, 31 Mar 2025   Prob (F-statistic):             0.0311
Time:                        09:44:36   Log-Likelihood:                -17.553
No. Observations:                  59   AIC:                             49.11
Df Residuals:                      52   BIC:                             63.65
Df Model:                           6                                         
Covariance Type:            nonrobust                                         
                  coef    std err          t      P>|t|      [0.025      0.975]
-------------------------------------------------------------------------------
Mkt-RF          1.363

  vif = 1. / (1. - r_squared_i)



Regression results for BMW.DE:
                            OLS Regression Results                            
Dep. Variable:                 BMW.DE   R-squared:                       0.399
Model:                            OLS   Adj. R-squared:                  0.273
Method:                 Least Squares   F-statistic:                     3.182
Date:                Mon, 31 Mar 2025   Prob (F-statistic):            0.00333
Time:                        09:44:37   Log-Likelihood:                 20.269
No. Observations:                  59   AIC:                            -18.54
Df Residuals:                      48   BIC:                             4.316
Df Model:                          10                                         
Covariance Type:            nonrobust                                         
                     coef    std err          t      P>|t|      [0.025      0.975]
----------------------------------------------------------------------------------
const       

### This code saves the Regression and Fama MacBeth Two-way regression resutlts in a CSV file


--- Saving Regression Results and Tables to Excel ---
Regression results and tables saved to 'ev_factor_regression_results_ts.xlsx'.


### Panael Regression Analysis

In [40]:
# Import necessary libraries
import pandas as pd
import numpy as np
import yfinance as yf
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.linear_model import BayesianRidge
import statsmodels.api as sm
from statsmodels.stats.outliers_influence import variance_inflation_factor
import warnings
import sys
import matplotlib.pyplot as plt
import matplotlib.style as style
import xlsxwriter
from linearmodels.panel import PanelOLS, RandomEffects # Import panel models

# --- Settings and Configuration ---
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
# from statsmodels.tools.sm_exceptions import ValueWarning, ConvergenceWarning, EstimationWarning
# warnings.simplefilter('ignore', (ValueWarning, ConvergenceWarning, EstimationWarning))

style.use("default") # Reset style if needed

# List of EV tickers
EV_TICKERS = [
    "TSLA", "1211.HK", "VOW3.DE", "NIO", "RIVN",
    "LCID", "XPEV", "LI", "GM", "F",
    "005380.KS", "PSNY", "BMW.DE", "MBG.DE"
]
START_DATE_PRICES = "2019-10-30"
END_DATE_PRICES = "2024-12-31"
START_DATE_ANALYSIS = "2020-01-01" # Start analysis after lag allows first month
END_DATE_ANALYSIS = "2024-12-31"
FF_FACTORS_PATH = "/content/gd_Developed_5_Factors.csv" # MODIFY IF NEEDED

print("--- Script Started ---")

# --- Advanced Imputation Function (Defined Once) ---
def advanced_imputation(df_input):
    df = df_input.copy(); original_index = df.index
    all_missing_cols = df.columns[df.isnull().all()]
    df_imputable = df.copy()
    if len(all_missing_cols) > 0:
        print(f"  -> Imputation: Dropping {len(all_missing_cols)} all-NaN columns: {list(all_missing_cols)}")
        df_imputable = df_imputable.drop(columns=all_missing_cols)
    cols_to_impute = df_imputable.columns[df_imputable.isnull().any()]
    if df_imputable.isnull().sum().sum() == 0:
        print("  -> Imputation: 🟢 No missing values found. Skipping imputation.")
        imputed_data = df_imputable
    elif len(cols_to_impute) > 0:
        print(f"  -> Imputation: Imputing {len(cols_to_impute)} columns...")
        n_features = min(10, df_imputable.shape[1] - 1) if df_imputable.shape[1] > 1 else 1
        imputer = IterativeImputer(estimator=BayesianRidge(), max_iter=30, random_state=42, tol=1e-4, n_nearest_features=n_features, verbose=0)
        try:
            imputed_values = imputer.fit_transform(df_imputable[cols_to_impute])
            imputed_subset = pd.DataFrame(imputed_values, columns=cols_to_impute, index=original_index)
            imputed_data = df_imputable.copy(); imputed_data[cols_to_impute] = imputed_subset
            print(f"  -> Imputation: Completed for {len(cols_to_impute)} columns.")
        except Exception as e:
            print(f"  -> Imputation: 🔴 Error during imputation: {e}. Returning data with NaNs.")
            imputed_data = df_imputable
    else:
        print("  -> Imputation: 🟡 No columns required imputation.")
        imputed_data = df_imputable
    try: imputed_data = imputed_data.astype(float)
    except Exception as e: print(f"  -> Imputation: 🔴 Warning: Could not convert imputed data to float: {e}")
    return imputed_data

# --- Step 1: Download and Prepare ACTUAL Stock Returns ---
print("\n--- Downloading and Preparing Stock Returns ---")
try:
    all_stock_data = yf.download(EV_TICKERS, start=START_DATE_PRICES, end=END_DATE_PRICES, progress=False)
    if all_stock_data.empty: raise ValueError("No stock price data downloaded.")
    if isinstance(all_stock_data.columns, pd.MultiIndex):
        if 'Adj Close' in all_stock_data.columns.levels[0]: price_data = all_stock_data['Adj Close']
        elif 'Close' in all_stock_data.columns.levels[0]: price_data = all_stock_data['Close']; print("  -> Warning: Using 'Close' price as 'Adj Close' not found.")
        else: raise ValueError("Neither 'Adj Close' nor 'Close' found in MultiIndex.")
    elif isinstance(all_stock_data.columns, pd.Index):
        if 'Adj Close' in all_stock_data.columns: target_col = 'Adj Close'
        elif 'Close' in all_stock_data.columns: target_col = 'Close'; print("  -> Warning: Using 'Close' price for single ticker.")
        else: raise ValueError("Neither 'Adj Close' nor 'Close' found in simple Index.")
        ticker_name = EV_TICKERS[0] if len(EV_TICKERS)==1 else "SINGLE_TICKER"
        price_data = all_stock_data[[target_col]].rename(columns={target_col: ticker_name})
    else: raise ValueError("Unexpected column structure in downloaded data.")

    price_data = price_data.ffill().bfill()
    price_data = price_data.dropna(axis=1, how='all')
    if price_data.empty: raise ValueError("All stock price columns were NaN after fill.")
    print(f"  -> Final stock price data has {price_data.shape[1]} valid tickers.")
    price_data.index = pd.to_datetime(price_data.index)
    monthly_prices = price_data.resample('ME').last()
    stock_monthly_returns = monthly_prices.pct_change().dropna(how='all', axis=0)
    if stock_monthly_returns.empty: raise ValueError("Stock monthly returns DataFrame is empty.")
    print("Actual stock monthly returns calculated.")

except Exception as e: print(f"🔴 FATAL ERROR downloading/processing stock returns: {e}"); sys.exit()


# --- Step 2: Load and Prepare Factors Data ---
print("\n--- Loading and Preparing Factors Data ---")
try:
    ff_factors_monthly_raw = pd.read_csv(FF_FACTORS_PATH, index_col=0, header=0, dtype='object')
    ff_factors_monthly_raw.index = pd.to_datetime(ff_factors_monthly_raw.index, errors='coerce')
    ff_factors_monthly_raw = ff_factors_monthly_raw[pd.notna(ff_factors_monthly_raw.index)]
    if ff_factors_monthly_raw.empty: raise ValueError("All rows dropped from factors after date parsing.")
    for col in ff_factors_monthly_raw.columns: ff_factors_monthly_raw[col] = pd.to_numeric(ff_factors_monthly_raw[col], errors='coerce')
    if pd.api.types.is_datetime64_any_dtype(ff_factors_monthly_raw.index): ff_factors_monthly_raw.index = ff_factors_monthly_raw.index + pd.offsets.MonthEnd(0)
    ff_factors_monthly_raw = ff_factors_monthly_raw[~ff_factors_monthly_raw.index.duplicated(keep='last')]
    ff_factors_monthly_raw = ff_factors_monthly_raw[(ff_factors_monthly_raw.index >= START_DATE_ANALYSIS) & (ff_factors_monthly_raw.index <= END_DATE_ANALYSIS)]
    if ff_factors_monthly_raw.empty: raise ValueError("No factor data remains after filtering for analysis period.")
    print("Factors data loaded, cleaned, filtered.")
    if ff_factors_monthly_raw.isnull().sum().sum() > 0:
        print("\n--- Imputing Factors Data ---")
        ff_factors_monthly = advanced_imputation(ff_factors_monthly_raw)
    else: print("\n--- No missing values in factors data. Skipping imputation. ---"); ff_factors_monthly = ff_factors_monthly_raw.astype(float)
    factor_cols_to_convert = ["Mkt-RF", "SMB", "HML", "RMW", "CMA", "RF", "MOM"]
    for col in factor_cols_to_convert:
        if col in ff_factors_monthly.columns and pd.api.types.is_numeric_dtype(ff_factors_monthly[col]):
            max_abs_val = ff_factors_monthly[col].abs().max()
            if not pd.isna(max_abs_val) and max_abs_val > 1: # Use 1 as threshold now (e.g., 1.5% is > 1)
                 print(f"  -> Converting factor '{col}' assuming percentage points.")
                 ff_factors_monthly[col] = ff_factors_monthly[col] / 100.0
except FileNotFoundError: print(f"\n🔴 FATAL ERROR: Factors file not found: {FF_FACTORS_PATH}"); sys.exit()
except Exception as e: print(f"\n🔴 FATAL ERROR loading/processing Factors Data: {e}"); sys.exit()


# --- Step 3: Create Lagged Factor Variables ---
print("\n--- Creating Lagged Factor Variables ---")
factors_lagged = ff_factors_monthly.copy()
lagged_esg_cols = []
# Lag ONLY the overall ESG score for this model run
for ticker in EV_TICKERS:
    col = f"{ticker}_ESG"
    lag_col = f"{ticker}_ESG_lag"
    if col in factors_lagged.columns:
        factors_lagged[lag_col] = factors_lagged[col].shift(1)
        lagged_esg_cols.append(lag_col)
    # else: # Optional: track which tickers lack ESG data
    #     print(f"  -> No ESG data found for {ticker} to create lag.")

if not lagged_esg_cols: print("  -> Warning: No lagged ESG columns were created.")
factors_lagged = factors_lagged.dropna(subset=lagged_esg_cols, how='any') # Drop rows where ANY lagged ESG is NaN
if factors_lagged.empty: print(f"🔴 FATAL ERROR: Data empty after lagging. Check input data/dates."); sys.exit()
print(f"Lagged factors created. Data available from: {factors_lagged.index.min().date()}")


# --- Step 4: Prepare Data for Panel Regression ---
print("\n--- Preparing Data for Panel Analysis ---")
try:
    # --- Align Returns and Factors based on Lagged Factors Index ---
    common_index = stock_monthly_returns.index.intersection(factors_lagged.index)
    if len(common_index) == 0: raise ValueError("No overlapping dates after creating lagged factors.")

    aligned_returns = stock_monthly_returns.loc[common_index].copy().astype(float)
    aligned_factors = factors_lagged.loc[common_index].copy().astype(float)
    print(f"Aligned data for {len(common_index)} months.")

    # --- Calculate Excess Returns ---
    if "RF" not in aligned_factors.columns: raise ValueError("Risk-Free rate ('RF') not found.")
    risk_free_rate = aligned_factors["RF"]
    excess_returns = aligned_returns.subtract(risk_free_rate, axis=0)

    # --- Reshape Data to Long Format ---
    # 1. Stack excess returns
    excess_returns_long = excess_returns.stack().reset_index()
    excess_returns_long.columns = ['Date', 'Ticker', 'ExcessReturn']

    # 2. Prepare Market-Wide Factors
    market_factors = ['Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA', 'MOM']
    market_factors_present = [f for f in market_factors if f in aligned_factors.columns]
    factors_market_wide = aligned_factors[market_factors_present].reset_index()

    # 3. Prepare Entity-Specific Factors (Lagged ESG)
    factors_esg_specific_wide = aligned_factors[lagged_esg_cols].reset_index()
    # Melt/Stack ESG factors
    factors_esg_specific_long = factors_esg_specific_wide.melt(id_vars='Date', var_name='FactorName', value_name='FactorValue')
    # Extract Ticker and create standardized ESG column name
    factors_esg_specific_long['Ticker'] = factors_esg_specific_long['FactorName'].str.split('_').str[0]
    factors_esg_specific_long['ESG_Factor_Type'] = 'ESG_lag' # Standardize name
    factors_esg_specific_long = factors_esg_specific_long.pivot_table(index=['Date', 'Ticker'], columns='ESG_Factor_Type', values='FactorValue').reset_index()

    # 4. Merge everything together
    panel_data = pd.merge(excess_returns_long, factors_market_wide, on='Date', how='left')
    panel_data = pd.merge(panel_data, factors_esg_specific_long, on=['Date', 'Ticker'], how='left')

    # 5. Set MultiIndex
    panel_data['Date'] = pd.to_datetime(panel_data['Date'])
    panel_data = panel_data.set_index(['Ticker', 'Date']).sort_index()

    # 6. Handle potential NaNs from merging/lagging before analysis
    panel_data = panel_data.dropna() # Drop rows with any NaN in factors or return

    if panel_data.empty: raise ValueError("Panel data is empty after merging and NaN drop.")

    print("Panel data prepared successfully (long format).")
    print("Panel Data (Head):")
    print(panel_data.head().to_string(float_format="%.4f"))

except Exception as e:
    print(f"🔴 FATAL ERROR preparing panel data: {e}")
    # import traceback; traceback.print_exc()
    sys.exit()

# --- Step 5: Define Variables and Run Panel Models ---
print("\n--- Running Panel Regression Models ---")

try:
    # Define dependent and independent variables
    dependent = panel_data['ExcessReturn']
    exog_vars = market_factors_present + ['ESG_lag'] # Use base factors + overall lagged ESG
    exog = sm.add_constant(panel_data[exog_vars]) # Add constant

    print(f"\nModel Specification: ExcessReturn ~ const + {' + '.join(exog_vars)}")

    # --- Model 1: Pooled OLS ---
    print("\n--- Fitting Pooled OLS ---")
    mod_pooled = PanelOLS(dependent, exog, check_rank=False) # check_rank=False can help with potential collinearity issues sometimes
    # Use clustered standard errors (robust to heteroskedasticity and within-entity correlation)
    pooled_res = mod_pooled.fit(cov_type='clustered', cluster_entity=True)
    print(pooled_res)

    # --- Model 2: Fixed Effects (Entity) ---
    print("\n--- Fitting Fixed Effects (Entity) ---")
    mod_fe_entity = PanelOLS(dependent, exog, entity_effects=True, check_rank=False)
    fe_entity_res = mod_fe_entity.fit(cov_type='clustered', cluster_entity=True)
    print(fe_entity_res)

    # --- Model 3: Fixed Effects (Time) ---
    print("\n--- Fitting Fixed Effects (Time) ---")
    mod_fe_time = PanelOLS(dependent, exog, time_effects=True, check_rank=False)
    # Cluster by time if using time effects (or use other robust types)
    fe_time_res = mod_fe_time.fit(cov_type='clustered', cluster_time=True)
    print(fe_time_res)

    # --- Model 4: Fixed Effects (Two-Way: Entity + Time) ---
    print("\n--- Fitting Fixed Effects (Two-Way: Entity + Time) ---")
    mod_fe_tw = PanelOLS(dependent, exog, entity_effects=True, time_effects=True, check_rank=False)
     # Use two-way clustered standard errors
    fe_tw_res = mod_fe_tw.fit(cov_type='clustered', cluster_entity=True, cluster_time=True)
    print(fe_tw_res)

    # --- Model 5: Random Effects ---
    # print("\n--- Fitting Random Effects ---")
    # mod_re = RandomEffects(dependent, exog, check_rank=False)
    # re_res = mod_re.fit(cov_type='clustered', cluster_entity=True)
    # print(re_res)

    # --- (Optional) Hausman Test ---
    # Compares FE vs RE. Requires fitting RE model above.
    # try:
    #     import scipy.stats
    #     # Get params and cov for FE (Entity) and RE
    #     fe_params = fe_entity_res.params
    #     re_params = re_res.params
    #     fe_cov = fe_entity_res.cov
    #     re_cov = re_res.cov
    #
    #     # Common parameters (exclude constant if FE includes it implicitly)
    #     common_params = fe_params.index.intersection(re_params.index)
    #     if 'const' in common_params and 'const' not in fe_params: # Adjust if FE absorbs const
    #          common_params = common_params.drop('const')
    #
    #     # Calculate Hausman statistic
    #     diff = fe_params[common_params] - re_params[common_params]
    #     cov_diff = fe_cov.loc[common_params, common_params] - re_cov.loc[common_params, common_params]
    #     # Ensure cov_diff is invertible
    #     try:
    #         inv_cov_diff = np.linalg.inv(cov_diff)
    #         hausman_stat = diff.T @ inv_cov_diff @ diff
    #         df = len(common_params)
    #         p_value = scipy.stats.chi2.sf(hausman_stat, df)
    #         print("\n--- Hausman Test (FE vs RE) ---")
    #         print(f"Chi-squared statistic: {hausman_stat:.4f}")
    #         print(f"Degrees of freedom: {df}")
    #         print(f"P-value: {p_value:.4f}")
    #         if p_value < 0.05:
    #             print(" -> Reject null hypothesis: Fixed Effects model is preferred.")
    #         else:
    #             print(" -> Fail to reject null hypothesis: Random Effects model may be suitable.")
    #     except np.linalg.LinAlgError:
    #         print("\n--- Hausman Test (FE vs RE) ---")
    #         print(" -> Could not perform test: Covariance difference matrix is singular.")
    #
    # except ImportError:
    #     print("\n--- Hausman Test ---")
    #     print(" -> Install scipy to run the Hausman test: pip install scipy")
    # except Exception as e:
    #      print(f"\n--- Hausman Test ---")
    #      print(f" -> Error running Hausman test: {e}")

except Exception as e:
    print(f"🔴 FATAL ERROR during panel regression model fitting: {e}")
    # import traceback; traceback.print_exc()


print("\n--- Script Finished ---")

--- Script Started ---

--- Downloading and Preparing Stock Returns ---
  -> Final stock price data has 14 valid tickers.
Actual stock monthly returns calculated.

--- Loading and Preparing Factors Data ---
Factors data loaded, cleaned, filtered.

--- Imputing Factors Data ---
  -> Imputation: Imputing 12 columns...
  -> Imputation: Completed for 12 columns.
  -> Converting factor 'Mkt-RF' assuming percentage points.
  -> Converting factor 'SMB' assuming percentage points.
  -> Converting factor 'HML' assuming percentage points.
  -> Converting factor 'RMW' assuming percentage points.
  -> Converting factor 'CMA' assuming percentage points.
  -> Converting factor 'MOM' assuming percentage points.

--- Creating Lagged Factor Variables ---
Lagged factors created. Data available from: 2020-02-29

--- Preparing Data for Panel Analysis ---
Aligned data for 59 months.
Panel data prepared successfully (long format).
Panel Data (Head):
                      ExcessReturn  Mkt-RF     SMB     HML

In [61]:

import yesg
import pandas as pd
import numpy as np


# All available historic ESG rating for THE Critical Component COMPANY
TSLA_ev = yesg.get_historic_esg('TSLA')
TSLA_ev.columns = ['Guan_ESG', 'Guan_E', 'Guan_S', 'Guang_G']

# Fill na with 0
TSLA_ev = pd.DataFrame(TSLA_ev)

# inplace
TSLA_ev.replace(np.nan, 0, inplace=True)

# Guangzhou_high_ev.tail()


# Merge and save as csv
#eighteenth_high_ev = seventeenth_high_ev.join(Guangzhou_high_ev, how='outer')

# Save to csv
#all_low_bat5.to_csv(r'/Users/henryefeonomakpo/Downloads/1-Indra-H-Thesis idea/1-saved csv from python/all18_ev.csv')

#display
display(TSLA_ev)

Unnamed: 0_level_0,Guan_ESG,Guan_E,Guan_S,Guang_G
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2014-09-01,56.00,62.00,50.00,52.0
2014-10-01,56.00,62.00,50.00,52.0
2014-11-01,56.00,62.00,50.00,52.0
2014-12-01,56.00,62.00,50.00,52.0
2015-01-01,56.00,62.00,50.00,55.0
...,...,...,...,...
2024-11-01,24.73,3.20,14.13,7.4
2024-12-01,24.73,3.20,14.13,7.4
2025-01-01,24.73,3.20,14.13,7.4
2025-02-01,24.73,3.20,14.13,7.4


### Scrape ESG risk rating dataset

In [57]:
# Import necessary libraries
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.style as style
import yfinance as yf
import statsmodels.api as sm
import numpy as np
import xlsxwriter
import warnings
from io import StringIO

# Import PanelOLS and RandomEffects from linearmodels
from linearmodels.panel import PanelOLS, RandomEffects

# Import PCA and StandardScaler from sklearn
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

# --- Standard Setup (Warnings, Style) ---
warnings.filterwarnings("ignore", category=FutureWarning) # Suppress various FutureWarnings

# Set plotting style (optional)
style.use("default")
params = {"axes.labelsize": 8, "font.size": 8, "legend.fontsize": 8, "xtick.labelsize": 8,
          "ytick.labelsize": 8, "text.usetex": False, "font.family": "sans-serif",
          "axes.spines.top": False, "axes.spines.right": False, "grid.color": "grey",
          "axes.grid": True, "grid.alpha": 0.5, "grid.linestyle": ":"}
plt.rcParams.update(params)

# --- Constants ---
#print(yesg.get_historic_esg('GWLLF'))
ev_tickers_returns = ["TSLA", "1211.HK", "VOW3.DE", "NIO", "RIVN", "GWLLF", "LCID", "XPEV",
                      "LI", "GM", "F", "005380.KS", "PSNY", "BMW.DE"]
ev_tickers_esg_components = ["TSLA", "1211.HK", "VOW3.DE", "NIO", "RIVN", "LCID", "XPEV",
                             "LI", "GM", "F", "BMW.DE", "PSNY"] # Tickers expected to have E,S,G cols
csv_file_path = "/content/gd_Developed_5_Factors.csv" # MAKE SURE THIS PATH IS CORRECT

# --- Data Downloading (Returns) ---
adj_close_data = pd.DataFrame()
print("Downloading historical stock data...")
download_start_date = "2021-11-01" # Start earlier for lagging
download_end_date = "2025-03-27"
for ticker in ev_tickers_returns:
    try:
        data = yf.download(ticker, start=download_start_date, end=download_end_date, progress=False)
        if data.empty:
            print(f"No data downloaded for {ticker}. Skipping.")
            continue
        adj_close_data[ticker] = data.get("Adj Close", data["Close"])
        # print(f"Successfully downloaded data for {ticker}.") # Keep less verbose
    except Exception as e:
        print(f"Error downloading data for {ticker}: {e}")
print("Stock data download complete.")

# --- Calculate Monthly Returns ---
print("\nCalculating monthly returns...")
if adj_close_data.empty: raise ValueError("No stock data downloaded.")
monthly_returns = adj_close_data.resample("ME").last().pct_change() * 100
print("Monthly returns calculated.")

# --- Load Fama-French 6-Factor and ESG data ---
print(f"\nLoading Fama-French 6-Factor and ESG data from '{csv_file_path}'...")
try:
    _ff_esg_data_raw = pd.read_csv(csv_file_path, index_col=0, header=0, on_bad_lines='warn')
    print("Data loaded successfully.")
except FileNotFoundError: raise FileNotFoundError(f"CSV file not found: {csv_file_path}")
except Exception as e: raise Exception(f"Error loading CSV file: {e}")

# Define expected factor columns (FF5 + MOM + RF)
factor_columns = ['Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA', 'RF', 'MOM']
esg_suffixes = ['_E', '_S', '_G', '_ESG'] # Base suffixes to look for

# Build list of all expected columns
expected_columns = factor_columns[:]
for ticker in ev_tickers_esg_components:
    for suffix in esg_suffixes:
        expected_columns.append(f"{ticker}{suffix}")

# Filter the loaded data
actual_columns_in_csv = [col for col in expected_columns if col in _ff_esg_data_raw.columns]
essential_factors = ['Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA', 'MOM', 'RF']
if not all(f in actual_columns_in_csv for f in essential_factors):
     missing_factors = [f for f in essential_factors if f not in actual_columns_in_csv]
     raise ValueError(f"Core Fama-French/Momentum factors missing: {missing_factors}")

ff_esg_data = _ff_esg_data_raw[actual_columns_in_csv].copy()
print(f"Using {len(ff_esg_data.columns)} columns from CSV.")

# Convert factor data index to datetime
try:
    ff_esg_data.index = pd.to_datetime(ff_esg_data.index.astype(str), format="%Y%m", errors='coerce')
    ff_esg_data.index = ff_esg_data.index + pd.offsets.MonthEnd(0)
    ff_esg_data.dropna(axis=0, how='all', subset=factor_columns[:-1], inplace=True)
except ValueError as e: raise ValueError(f"Error converting factor data index to datetime: {e}")


# --- Impute Missing ESG Component Data (BEFORE Stacking/Merging) ---
print("\nImputing missing ESG component data (E, S, G) using ticker-specific medians...")
imputation_count = 0
for ticker in ev_tickers_esg_components:
    for suffix in ['_E', '_S', '_G']:
        col_name = f"{ticker}{suffix}"
        if col_name in ff_esg_data.columns:
            series = ff_esg_data[col_name]
            if series.isnull().any():
                median_val = series.median()
                if pd.isna(median_val):
                    print(f"Warning: Median for {col_name} is NaN. Filling missing values with 0.")
                    fill_value = 0
                else:
                    fill_value = median_val
                num_filled = series.isnull().sum()
                ff_esg_data[col_name] = series.fillna(fill_value)
                imputation_count += num_filled
print(f"ESG component imputation complete. Imputed {imputation_count} values.")

# --- Data Preparation for Panel ---

# 1. Align Returns and Imputed Factor/ESG Data by Date
print("\nAligning returns and factor/ESG data by date...")
common_index_dates = monthly_returns.index.intersection(ff_esg_data.index)
if common_index_dates.empty: raise ValueError("No overlapping dates found after loading/imputing.")
start_date, end_date = common_index_dates.min(), common_index_dates.max()
print(f"Common date range for panel: {start_date.date()} to {end_date.date()}")
monthly_returns_aligned = monthly_returns.loc[common_index_dates]
ff_esg_data_aligned = ff_esg_data.loc[common_index_dates]

# 2. Prepare Dependent Variable (Excess Returns) in Long Format
print("Preparing excess returns in long format...")
excess_returns_long_list = []
rf_aligned = ff_esg_data_aligned["RF"]
for ticker in ev_tickers_returns:
    if ticker in monthly_returns_aligned.columns:
        temp_ret = pd.DataFrame({'Return': monthly_returns_aligned[ticker], 'Ticker': ticker}, index=monthly_returns_aligned.index)
        temp_ret = temp_ret.join(rf_aligned.rename('RF'), how='left')
        temp_ret['excess_return'] = temp_ret['Return'] - temp_ret['RF']
        excess_returns_long_list.append(temp_ret[['Ticker', 'excess_return']])
if not excess_returns_long_list: raise ValueError("No excess returns calculated.")
excess_returns_with_index = pd.concat(excess_returns_long_list)
excess_returns_long = excess_returns_with_index.reset_index().rename(columns={'index': 'Date'})
excess_returns_long.dropna(subset=['excess_return'], inplace=True)

# 3. Prepare Imputed ESG Component Data (E, S, G) in Long Format
print("Preparing imputed ESG component data (E, S, G) in long format...")
esg_components_long_list = []
for ticker in ev_tickers_esg_components:
    component_data_df = pd.DataFrame({'Ticker': ticker}, index=ff_esg_data_aligned.index)
    processed_any_component = False
    for suffix in ['_E', '_S', '_G']:
        col_name = f"{ticker}{suffix}"
        if col_name in ff_esg_data_aligned.columns:
            component_data_df[suffix[1:]] = ff_esg_data_aligned[col_name]
            processed_any_component = True
        else: component_data_df[suffix[1:]] = np.nan
    if processed_any_component: esg_components_long_list.append(component_data_df)
if not esg_components_long_list: print("Warning: No ESG component data processed.")
esg_components_with_index = pd.concat(esg_components_long_list) if esg_components_long_list else pd.DataFrame(columns=['Ticker','E','S','G'])
esg_components_long = esg_components_with_index.reset_index().rename(columns={'index': 'Date'})

# 4. Merge Returns, Factors, and ESG Components
print("Merging returns, factors, and ESG component data...")
panel_data = pd.merge(excess_returns_long, ff_esg_data_aligned[factor_columns].reset_index().rename(columns={'index':'Date'}), on='Date', how='left')
panel_data = pd.merge(panel_data, esg_components_long[['Date', 'Ticker', 'E', 'S', 'G']], on=['Date', 'Ticker'], how='left')

# 5. Create Lagged ESG Component Variables
print("Creating lagged ESG component variables...")
panel_data.sort_values(by=['Ticker', 'Date'], inplace=True)
lagged_esg_cols = ['E_lagged', 'S_lagged', 'G_lagged']
panel_data['E_lagged'] = panel_data.groupby('Ticker')['E'].shift(1)
panel_data['S_lagged'] = panel_data.groupby('Ticker')['S'].shift(1)
panel_data['G_lagged'] = panel_data.groupby('Ticker')['G'].shift(1)

# 6. Handle Final Missing Values
print("Handling final missing values after merging and lagging...")
essential_factors_no_rf = ['Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA', 'MOM'] # 6 factors
essential_cols = ['excess_return'] + lagged_esg_cols + essential_factors_no_rf
initial_rows = len(panel_data)
# Drop rows where ANY essential variable for ANY model is missing
panel_data.dropna(subset=essential_cols, inplace=True)
rows_after_na = len(panel_data)
print(f"Dropped {initial_rows - rows_after_na} rows due to missing values.")
if panel_data.empty: raise ValueError("Panel data empty after handling NaNs.")

# --- Prepare for PCA (Run BEFORE setting index) ---
# Uses the final cleaned panel_data
print("\nPreparing data for PCA analysis...")
pca_data_final = panel_data[lagged_esg_cols].copy()
if pca_data_final.isnull().any().any():
    print("Warning: NaNs found before PCA. Applying mean imputation...")
    pca_data_final = pca_data_final.fillna(pca_data_final.mean())
print("Scaling data for PCA...")
scaler = StandardScaler()
scaled_data = scaler.fit_transform(pca_data_final)
print("Running PCA...")
pca = PCA(n_components=3)
pca_components = pca.fit_transform(scaled_data)
print("Explained Variance Ratio by PCA components:", pca.explained_variance_ratio_)
panel_data[['PC1', 'PC2', 'PC3']] = pca_components # Add PCA components to main dataframe
pca_cols = ['PC1', 'PC2', 'PC3']

# --- Set Panel MultiIndex ---
print("\nSetting Panel MultiIndex (Ticker, Date)...")
try:
    panel_data = panel_data.set_index(['Ticker', 'Date'])
except KeyError: raise KeyError("Columns 'Ticker' or 'Date' not found.")
print(f"Final Panel Data Ready. Shape: {panel_data.shape}")

# --- Panel Regression Analysis ---
all_results = {} # Dictionary to store results

# --- Model 1: Separate Fixed Effects for E, S, G ---
print("\n--- Running Separate FE Models for E, S, G ---")
for component in lagged_esg_cols: # ['E_lagged', 'S_lagged', 'G_lagged']
    print(f"\n--- FE Model: {component} ---")
    model_key = f"FE_{component.split('_')[0]}"
    try:
        dependent = panel_data['excess_return']
        exog_vars = [component]
        if not all(v in panel_data.columns for v in exog_vars): raise ValueError(f"{exog_vars} not found.")
        exog_df_final = panel_data[exog_vars]
        if exog_df_final.empty or exog_df_final.var(ddof=0).iloc[0] == 0: raise ValueError(f"{component} has no data or no variance.")

        exog = exog_df_final
        mod = PanelOLS(dependent, exog, entity_effects=True, time_effects=True)
        fe_res = mod.fit(cov_type='clustered', cluster_entity=True, cluster_time=True)
        print(fe_res)
        all_results[model_key] = fe_res.summary

    except ValueError as ve: print(f"MODEL ESTIMATION ERROR for {component}: {ve}"); all_results[model_key] = f"Error: {ve}"
    except Exception as e: print(f"UNEXPECTED ERROR for {component}: {e}"); all_results[model_key] = f"Error: {e}"

# --- Model 2: Fixed Effects with PCA Components ---
print("\n--- Running FE Model with PCA Components ---")
model_key = "FE_PCA"
try:
    dependent = panel_data['excess_return']
    exog_vars = pca_cols # ['PC1', 'PC2', 'PC3']
    if not all(v in panel_data.columns for v in exog_vars): raise ValueError(f"PCA columns not found.")
    exog_df_final = panel_data[exog_vars]
    if exog_df_final.empty or (exog_df_final.var(ddof=0) == 0).any(): raise ValueError("PCA components have no data or variance.")

    exog = exog_df_final
    mod = PanelOLS(dependent, exog, entity_effects=True, time_effects=True)
    fe_pca_res = mod.fit(cov_type='clustered', cluster_entity=True, cluster_time=True)
    print(fe_pca_res)
    all_results[model_key] = fe_pca_res.summary

except ValueError as ve: print(f"MODEL ESTIMATION ERROR for PCA: {ve}"); all_results[model_key] = f"Error: {ve}"
except Exception as e: print(f"UNEXPECTED ERROR for PCA: {e}"); all_results[model_key] = f"Error: {e}"

# --- Model 3: Random Effects with E, S, G ---
print("\n--- Running Random Effects Model with E, S, G ---")
print("Note: RE assumes predictors uncorrelated with unobserved entity effects.")
model_key = "RE_ESG_Components"
try:
    dependent = panel_data['excess_return']
    exog_vars = ['E_lagged', 'S_lagged', 'G_lagged']
    if not all(v in panel_data.columns for v in exog_vars): raise ValueError(f"{exog_vars} not found for RE.")
    exog_df_final = panel_data[exog_vars]
    if exog_df_final.empty or (exog_df_final.var(ddof=0) == 0).any(): raise ValueError("ESG components have no data/variance for RE.")

    # Add constant for RE
    exog = sm.add_constant(exog_df_final)

    mod_re = RandomEffects(dependent, exog)
    re_res = mod_re.fit(cov_type='clustered', cluster_entity=True, cluster_time=True)
    print(re_res)
    all_results[model_key] = re_res.summary

except ValueError as ve: print(f"MODEL ESTIMATION ERROR for RE: {ve}"); all_results[model_key] = f"Error: {ve}"
except Exception as e: print(f"UNEXPECTED ERROR for RE: {e}"); all_results[model_key] = f"Error: {e}"


# --- Save All Results to Excel ---
output_filename = 'panel_regression_results_revised_strategies.xlsx' # Updated filename
print(f"\nSaving all regression results to '{output_filename}'...")
try:
    with pd.ExcelWriter(output_filename, engine='xlsxwriter') as writer:
        for model_name, summary_obj in all_results.items():
            print(f"Saving results for {model_name}...")
            if isinstance(summary_obj, str): # Error message
                pd.DataFrame({'Error': [summary_obj]}).to_excel(writer, sheet_name=f'{model_name}_Error', index=False)
            elif hasattr(summary_obj, 'tables') and len(summary_obj.tables) >= 3:
                try:
                    df_top = pd.read_html(StringIO(summary_obj.tables[0].as_html()), header=None, index_col=0)[0]
                    df_coeffs = pd.read_html(StringIO(summary_obj.tables[1].as_html()), header=0, index_col=0)[0]
                    df_bottom = pd.read_html(StringIO(summary_obj.tables[2].as_html()), header=None, index_col=0)[0]

                    df_top.to_excel(writer, sheet_name=f'{model_name}_Model', header=False)
                    df_coeffs.reset_index().to_excel(writer, sheet_name=f'{model_name}_Coefs', index=False)
                    df_bottom.to_excel(writer, sheet_name=f'{model_name}_Diag', header=False)
                except Exception as e_parse:
                    print(f"  Error parsing/saving tables for {model_name}: {e_parse}")
                    pd.DataFrame({'Error': [f"Error parsing/saving tables: {e_parse}"]}).to_excel(writer, sheet_name=f'{model_name}_Error', index=False)
            else:
                print(f"  Warning: Summary object format unexpected for {model_name}.")
                pd.DataFrame({'Error': ["Summary object format unexpected."]}).to_excel(writer, sheet_name=f'{model_name}_Error', index=False)
    print(f"All attempted model results saved to '{output_filename}'.")
except Exception as save_err:
    print(f"ERROR: Could not write to Excel file '{output_filename}': {save_err}")

Downloading historical stock data...
Stock data download complete.

Calculating monthly returns...
Monthly returns calculated.

Loading Fama-French 6-Factor and ESG data from '/content/gd_Developed_5_Factors.csv'...
Data loaded successfully.
Using 55 columns from CSV.

Imputing missing ESG component data (E, S, G) using ticker-specific medians...
ESG component imputation complete. Imputed 0 values.

Aligning returns and factor/ESG data by date...
Common date range for panel: 2022-01-31 to 2024-12-31
Preparing excess returns in long format...
Preparing imputed ESG component data (E, S, G) in long format...
Merging returns, factors, and ESG component data...
Creating lagged ESG component variables...
Handling final missing values after merging and lagging...
Dropped 48 rows due to missing values.

Preparing data for PCA analysis...
Scaling data for PCA...
Running PCA...
Explained Variance Ratio by PCA components: [0.69397629 0.20844301 0.09758071]

Setting Panel MultiIndex (Ticker, Date)

  return Series(np.sqrt(np.diag(self.cov)), self._var_names, name="std_error")


                          PanelOLS Estimation Summary                           
Dep. Variable:          excess_return   R-squared:                        0.0009
Estimator:                   PanelOLS   R-squared (Between):             -0.3215
No. Observations:                 420   R-squared (Within):               0.0006
Date:                Fri, Mar 28 2025   R-squared (Overall):             -0.0037
Time:                        18:37:51   Log-likelihood                   -1749.2
Cov. Estimator:             Clustered                                           
                                        F-statistic:                      0.1172
Entities:                          12   P-value                           0.9500
Avg Obs:                       35.000   Distribution:                   F(3,371)
Min Obs:                       35.000                                           
Max Obs:                       35.000   F-statistic (robust):            -0.7652
                            

# Panel Data model (specifically, a Two-Way Fixed Effects model using the linearmodels library) with the Fama-French 5 factors and lagged aggregate ESG scores.
### Fetch EV Automakers ESG risk rating data using Python's yesg library

### Step-by-Step Implementation
#### Import Required Libraries
#### Fetch Data
#### Calculate Monthly Returns
#### Prepare the Dataset
#### Implement Multiple Linear Regression Without Addressing Multicollinearity
#### Evaluate and Display Results
#### Address Multicollinearity
#### Implement Multiple Linear Regression After Addressing Multicollinearity
#### Evaluate and Display Results Again

### Reliable result