In [None]:
# Google Colab notebook cell: when run, it prompts you to manually upload your events file ('Cleaned_Sovereign_Rating_Events.xlsx')


!pip install yfinance pandas numpy openpyxl --quiet
!pip install --upgrade yfinance --quiet

import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import time
import os
from pathlib import Path
import requests
import logging
from typing import Optional, Dict
from google.colab import files


# Send INFO-and-above records to a stream handler using a timestamped
# "<time> - <level> - <message>" layout, then expose a module-level logger.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format=_LOG_FORMAT,
    level=logging.INFO,
)
logger = logging.getLogger(__name__)


# Directory holding one pickle of downloaded price history per ticker.
CACHE_DIR = Path("/content/cache/")
# parents=True: also create missing parent directories, so the script does not
# fail with FileNotFoundError when "/content" is absent (e.g. outside Colab).
CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Seconds slept before every download attempt.
REQUEST_DELAY = 3
# Maximum number of download attempts per request.
MAX_RETRIES = 5


# Display name -> ticker symbol. main() iterates over every entry and passes
# the symbol to the downloader (which hands it to yf.download); the display
# name is only used for logging and for the 'Bank' column of the results.
# NOTE(review): 'STOXX Banks EUR' looks like a sector index rather than a
# single bank; it is processed exactly like the other entries -- confirm that
# this is intended and that the symbol resolves on Yahoo Finance.
BANK_TICKERS = {
    'Deutsche Bank': 'DBK.DE',
    'BNP Paribas': 'BNP.PA',
    'Banco Santander': 'SAN.MC',
    'Intesa Sanpaolo': 'ISP.MI',
    'Bank of Ireland': 'BIRG.IR',
    'STOXX Banks EUR': 'SX7E.BE'
}


# Event-study window lengths.
# NOTE(review): calculate_car() applies these through timedelta(days=...), so
# the windows are measured in CALENDAR days, while its boundary check compares
# the PRE/POST values against row positions (trading days). Confirm which unit
# is intended.
ESTIMATION_WINDOW = 250  # length of the estimation window that precedes the event window
EVENT_WINDOW_PRE = 10    # days before the event day included in the event window
EVENT_WINDOW_POST = 10   # days after the event day included in the event window

def upload_events_file():
    """Ask the user to upload the events workbook and return its filename.

    Opens the Colab upload dialog, writes the first uploaded entry to a file
    of the same name in the working directory and returns that name. Any
    further uploaded entries are ignored. Returns None when nothing was
    uploaded.
    """
    logger.info("Please upload your 'Cleaned_Sovereign_Rating_Events.xlsx' file") #Manual upload of file required
    first_upload = next(iter(files.upload().items()), None)
    if first_upload is None:
        return None

    name, payload = first_upload
    with open(name, 'wb') as handle:
        handle.write(payload)
    return name

class DataDownloader:
    """Download daily price history via yfinance, with a per-ticker disk cache.

    Every ticker owns one pickle file under ``CACHE_DIR``. The cached frame
    has a date index and two columns: ``'Adj Close'`` and ``'Daily_Return'``
    (simple percentage change of ``'Adj Close'``).

    The cache is only trusted when it spans the requested date range.
    Otherwise the union of the cached and the requested range is downloaded
    again, so the cached history stays contiguous and a later request is never
    answered with a silently truncated slice.
    """

    # Calendar-day slack allowed between a requested boundary and the first /
    # last cached row. It absorbs weekends, market holidays and the first row
    # that is lost because its pct_change() is NaN.
    COVERAGE_TOLERANCE_DAYS = 7

    def __init__(self):
        # NOTE(review): this session is never handed to yfinance inside this
        # class; the attribute is kept so existing callers can still read it.
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': 'Mozilla/5.0'})
        # ticker -> (start, end) of the range downloaded and cached during
        # this session. Lets a ticker whose history begins later than the
        # requested start (so the index alone never "covers" the range) be
        # served from the cache instead of being downloaded again.
        self._session_ranges: Dict[str, tuple] = {}

    def get_cache_path(self, ticker: str) -> Path:
        """Return the pickle path for *ticker* (dots replaced by underscores)."""
        return CACHE_DIR / f"{ticker.replace('.', '_')}.pkl"

    def load_from_cache(self, ticker: str) -> Optional[pd.DataFrame]:
        """Return the cached frame for *ticker*, or None if absent/unreadable."""
        cache_path = self.get_cache_path(ticker)
        if cache_path.exists():
            try:
                # The pickle is written by save_to_cache() below; never point
                # CACHE_DIR at files from an untrusted source.
                cached = pd.read_pickle(cache_path)
                if isinstance(cached, pd.DataFrame):
                    return self._flatten_columns(cached)
                logger.warning(f"Cache load failed for {ticker}: unexpected object type")
            except Exception as e:
                logger.warning(f"Cache load failed for {ticker}: {str(e)}")
        return None

    def save_to_cache(self, ticker: str, data: pd.DataFrame):
        """Pickle *data* for *ticker*; return True on success, False on failure."""
        cache_path = self.get_cache_path(ticker)
        try:
            data.to_pickle(cache_path)
            return True
        except Exception as e:
            logger.warning(f"Cache save failed for {ticker}: {str(e)}")
            return False

    @staticmethod
    def _flatten_columns(data: pd.DataFrame) -> pd.DataFrame:
        """Return *data* with single-level columns named by price field.

        Recent yfinance releases return MultiIndex columns (field, ticker)
        even for one ticker; older caches written by this script may carry
        the same layout. The level holding the field names is kept.
        """
        columns = data.columns
        if not isinstance(columns, pd.MultiIndex):
            return data
        known_fields = ('Adj Close', 'Close', 'Daily_Return')
        chosen_level = 0
        for level in range(columns.nlevels):
            labels = columns.get_level_values(level)
            if any(field in labels for field in known_fields):
                chosen_level = level
                break
        flat = data.copy()
        flat.columns = columns.get_level_values(chosen_level)
        return flat

    def _index_covers(self, index: pd.Index, start: pd.Timestamp, end: pd.Timestamp) -> bool:
        """Tell whether *index* spans [start, end] within the tolerance."""
        tolerance = pd.Timedelta(days=self.COVERAGE_TOLERANCE_DAYS)
        # Data cannot exist beyond today, so a future end date is clipped.
        latest_possible = min(end, pd.Timestamp.now().normalize())
        return index.min() <= start + tolerance and index.max() >= latest_possible - tolerance

    def download_with_retry(self, ticker: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
        """Download *ticker* between the two dates, retrying on failure.

        Returns a frame with ``'Adj Close'`` and ``'Daily_Return'`` columns
        (rows with NaN dropped), or None when every one of ``MAX_RETRIES``
        attempts fails, comes back empty, lacks a price column, or yields
        fewer than 50 rows.
        """
        for attempt in range(MAX_RETRIES):
            try:
                time.sleep(REQUEST_DELAY)
                # auto_adjust is passed explicitly: its default differs between
                # yfinance releases, which changes the returned columns and
                # triggers a warning on every call when left implicit.
                raw = yf.download(ticker, start=start_date, end=end_date,
                                  progress=False, auto_adjust=False)

                if raw is None or raw.empty:
                    logger.warning(f"Empty data for {ticker}, attempt {attempt + 1}")
                    continue

                raw = self._flatten_columns(raw)
                price_column = next(
                    (name for name in ('Adj Close', 'Close') if name in raw.columns),
                    None,
                )
                if price_column is None:
                    logger.warning(f"No price column for {ticker}, attempt {attempt + 1}")
                    continue

                prices = raw[price_column]
                data = pd.DataFrame({
                    'Adj Close': prices,
                    'Daily_Return': prices.pct_change(),
                }).dropna()

                if len(data) < 50:
                    logger.warning(f"Insufficient data points for {ticker}")
                    continue

                return data

            except Exception as e:
                logger.warning(f"Attempt {attempt + 1} failed for {ticker}: {str(e)}")
                # Back off progressively, but do not sleep after the last try.
                if attempt < MAX_RETRIES - 1:
                    time.sleep(10 * (attempt + 1))

        return None

    def get_bank_data(self, ticker: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
        """Return price data for *ticker* within [start_date, end_date].

        Served from the cache when the cache spans the requested range;
        otherwise downloaded (and cached). Returns a non-empty copy, or None
        when no data could be obtained.
        """
        start = pd.Timestamp(start_date)
        end = pd.Timestamp(end_date)

        cached = self.load_from_cache(ticker)
        usable_cache = (
            cached is not None
            and not cached.empty
            and isinstance(cached.index, pd.DatetimeIndex)
        )

        if usable_cache:
            known = self._session_ranges.get(ticker)
            trusted = known is not None and known[0] <= start and end <= known[1]
            if trusted or self._index_covers(cached.index, start, end):
                window = cached.loc[(cached.index >= start) & (cached.index <= end)]
                if not window.empty:
                    return window.copy()

        # Fetch the union of the cached and requested ranges so the cache
        # never ends up with a hole between two disjoint downloads.
        fetch_start, fetch_end = start, end
        if usable_cache:
            fetch_start = min(start, cached.index.min())
            # One extra day so the last cached row is inside the request even
            # if the download treats its end date as exclusive.
            fetch_end = max(end, cached.index.max() + pd.Timedelta(days=1))

        data = self.download_with_retry(
            ticker,
            fetch_start.strftime('%Y-%m-%d'),
            fetch_end.strftime('%Y-%m-%d'),
        )
        if data is None:
            logger.error(f"All download attempts failed for {ticker}")
            return None

        if self.save_to_cache(ticker, data):
            self._session_ranges[ticker] = (fetch_start, fetch_end)

        window = data.loc[(data.index >= start) & (data.index <= end)]
        if window.empty:
            logger.warning(f"No data for {ticker} between {start_date} and {end_date}")
            return None
        return window.copy()

class EventStudyAnalyzer:
    """Compute cumulative abnormal returns (CAR) around rating events.

    Uses a constant-mean model: the expected return is the mean daily return
    over an estimation window that precedes the event window, after removing
    days close to other rating events of the same country.
    """

    # Largest calendar-day distance tolerated between the announced event date
    # and the trading day it is mapped to. Beyond this the price history does
    # not really contain the event and the result would be meaningless.
    MAX_EVENT_DATE_GAP_DAYS = 5

    # Calendar days on either side of another same-country event that are
    # removed from the estimation window.
    CONTAMINATION_BUFFER_DAYS = 5

    def __init__(self, downloader: DataDownloader):
        self.downloader = downloader

    def calculate_car(self, bank_name: str, ticker: str, event_date: datetime,
                      all_events: pd.DataFrame,
                      event: Optional[pd.Series] = None) -> Optional[Dict]:
        """Calculate the CAR of one bank around one rating event.

        Args:
            bank_name: Display name, copied into the result.
            ticker: Symbol passed to the downloader.
            event_date: Announcement date of the event.
            all_events: All events; needs 'Date', 'Country', 'Actions' and
                'Agency' columns.
            event: The row of ``all_events`` being analysed. Pass it whenever
                several events can share a date: without it the first row
                matching ``event_date`` is used, which attributes every
                same-day event to that first row's country/action/agency.

        Returns:
            A dict with the CAR figures and event metadata, or None when the
            event cannot be analysed (no data, event outside the price
            history, too little clean estimation data).

        NOTE(review): window lengths are applied with timedelta(days=...),
        i.e. in calendar days, while the boundary check below counts rows
        (trading days). Behaviour is kept as is -- confirm the intended unit.
        """
        event_date = pd.to_datetime(event_date)
        if pd.isna(event_date):
            logger.warning(f"Skipping {bank_name} - event has no valid date")
            return None

        if event is None:
            # Legacy lookup by date only; ambiguous when dates repeat.
            same_day = all_events.loc[all_events['Date'] == event_date]
            if same_day.empty:
                logger.warning(f"Skipping {bank_name} - no event found on {event_date}")
                return None
            event = same_day.iloc[0]
        country = event['Country']

        start_date = (event_date - timedelta(days=ESTIMATION_WINDOW + EVENT_WINDOW_PRE + 365)).strftime('%Y-%m-%d')
        end_date = (event_date + timedelta(days=EVENT_WINDOW_POST + 365)).strftime('%Y-%m-%d')

        stock_data = self.downloader.get_bank_data(ticker, start_date, end_date)
        if stock_data is None or stock_data.empty:
            return None

        trading_days = stock_data.index
        event_idx = trading_days.get_indexer([event_date], method='nearest')[0]
        event_day = trading_days[event_idx]

        # 'nearest' always finds something; reject matches that are far from
        # the announcement, which happens when the history has a gap there.
        if abs(event_day - event_date) > timedelta(days=self.MAX_EVENT_DATE_GAP_DAYS):
            logger.warning(f"Skipping {bank_name} - no trading day near {event_date}")
            return None

        if (event_idx < EVENT_WINDOW_PRE) or (event_idx + EVENT_WINDOW_POST >= len(trading_days)):
            logger.warning(f"Skipping {bank_name} - event too close to data boundaries")
            return None

        estimation_start = event_day - timedelta(days=ESTIMATION_WINDOW + EVENT_WINDOW_PRE)
        estimation_end = event_day - timedelta(days=EVENT_WINDOW_PRE + 1)
        event_start = event_day - timedelta(days=EVENT_WINDOW_PRE)
        event_end = event_day + timedelta(days=EVENT_WINDOW_POST)

        estimation_period = stock_data[(stock_data.index >= estimation_start) &
                                       (stock_data.index <= estimation_end)]

        # Dates of the other events of the same country. Missing dates are
        # dropped: comparing against NaT is always False and would otherwise
        # wipe out the whole estimation window.
        other_dates = pd.DatetimeIndex(
            pd.to_datetime(
                all_events.loc[
                    (all_events['Country'] == country) &
                    (all_events['Date'] != event_date),
                    'Date'
                ]
            ).dropna().unique()
        )

        buffer = timedelta(days=self.CONTAMINATION_BUFFER_DAYS)
        estimation_days = estimation_period.index
        contaminated = np.zeros(len(estimation_period), dtype=bool)
        for other_date in other_dates:
            contaminated |= ((estimation_days >= other_date - buffer) &
                             (estimation_days <= other_date + buffer))
        clean_estimation_data = estimation_period[~contaminated]

        if len(clean_estimation_data) < 50:
            logger.warning(f"Insufficient clean data for {bank_name} around {event_date}")
            return None

        expected_return = clean_estimation_data['Daily_Return'].mean()
        event_data = stock_data[(stock_data.index >= event_start) &
                                (stock_data.index <= event_end)].copy()
        event_data['Abnormal_Return'] = event_data['Daily_Return'] - expected_return

        # The event day itself counts towards CAR_Total only.
        car_pre = event_data[event_data.index < event_day]['Abnormal_Return'].sum()
        car_post = event_data[event_data.index > event_day]['Abnormal_Return'].sum()
        car_total = event_data['Abnormal_Return'].sum()

        return {
            'Bank': bank_name,
            'Ticker': ticker,
            'Event_Date': event_day,
            'Country': country,
            'Action': event['Actions'],
            'Rating_Agency': event['Agency'],
            'CAR_Pre_Event': car_pre,
            'CAR_Post_Event': car_post,
            'CAR_Total': car_total,
            'Expected_Return': expected_return,
            'Estimation_Window_Size': len(clean_estimation_data),
            'Event_Window_Size': len(event_data)
        }


def main():
    """Run the event study end to end inside Google Colab.

    Uploads the events workbook, computes the CAR of every bank in
    ``BANK_TICKERS`` for every event, writes the results to
    ``/content/CAR_Results.xlsx``, triggers a browser download and logs a
    summary per action type. Any fatal error is logged and re-raised.
    """
    required_columns = ('Date', 'Country', 'Actions', 'Agency')

    try:
        downloader = DataDownloader()
        analyzer = EventStudyAnalyzer(downloader)

        events_file = upload_events_file()
        if not events_file:
            logger.error("No events file uploaded. Exiting.")
            return

        try:
            events_df = pd.read_excel(events_file)

            # Fail once, up front, instead of once per bank and event.
            missing = [name for name in required_columns if name not in events_df.columns]
            if missing:
                raise ValueError(f"Events file is missing required column(s): {', '.join(missing)}")

            events_df['Date'] = pd.to_datetime(events_df['Date'])

            undated = int(events_df['Date'].isna().sum())
            if undated:
                logger.warning(f"Ignoring {undated} event(s) without a date")
                events_df = events_df.dropna(subset=['Date']).reset_index(drop=True)

            logger.info(f"Loaded {len(events_df)} events from {events_file}")
        except Exception as e:
            logger.error(f"Failed to load events data: {str(e)}")
            raise

        results = []
        total_banks = len(BANK_TICKERS)
        total_events = len(events_df)

        for bank_num, (bank_name, ticker) in enumerate(BANK_TICKERS.items(), 1):
            logger.info(f"\nProcessing {bank_name} ({bank_num}/{total_banks})")

            for event_num, (_, event) in enumerate(events_df.iterrows(), 1):
                try:
                    logger.info(f"  Event {event_num}/{total_events}: {event['Date'].date()}...")
                    # Hand over the row itself so that events sharing a date
                    # keep their own country, action and agency.
                    car_result = analyzer.calculate_car(
                        bank_name, ticker, event['Date'], events_df, event=event
                    )

                    if car_result is not None:
                        results.append(car_result)
                        logger.info(f"    CAR: {car_result['CAR_Total']:.4f}")
                    else:
                        logger.warning("    Skipped - no valid data")

                except Exception as e:
                    # One bad event must not abort the remaining ones.
                    logger.error(f"Error processing event {event_num} for {bank_name}: {str(e)}")
                    continue

        if results:
            results_df = pd.DataFrame(results)
            output_path = "/content/CAR_Results.xlsx"
            results_df.to_excel(output_path, index=False)
            logger.info(f"\nAnalysis completed. Results saved to {output_path}")

            # 'files' is the google.colab module imported at the top of the file.
            files.download(output_path)

            logger.info("\nSummary by Action Type:")
            logger.info(results_df.groupby('Action')['CAR_Total'].describe().to_string())
        else:
            logger.error("\nNo valid results were generated.")

    except Exception as e:
        logger.error(f"Fatal error in main execution: {str(e)}")
        raise

# Entry point: log the start, run the whole pipeline, then log completion.
# main() re-raises fatal errors, so "Process completed" is only logged when
# main() returns normally (including its early return when no file is uploaded).
if __name__ == "__main__":
    logger.info("Starting event study analysis in Google Colab")
    main()
    logger.info("Process completed")

Saving Cleaned_Sovereign_Rating_Events.xlsx to Cleaned_Sovereign_Rating_Events.xlsx


  data = yf.download(ticker, start=start_date, end=end_date, progress=False)
  data = yf.download(ticker, start=start_date, end=end_date, progress=False)
  data = yf.download(ticker, start=start_date, end=end_date, progress=False)
  data = yf.download(ticker, start=start_date, end=end_date, progress=False)
  data = yf.download(ticker, start=start_date, end=end_date, progress=False)
  data = yf.download(ticker, start=start_date, end=end_date, progress=False)
  data = yf.download(ticker, start=start_date, end=end_date, progress=False)
  data = yf.download(ticker, start=start_date, end=end_date, progress=False)
  data = yf.download(ticker, start=start_date, end=end_date, progress=False)
  data = yf.download(ticker, start=start_date, end=end_date, progress=False)
  data = yf.download(ticker, start=start_date, end=end_date, progress=False)
  data = yf.download(ticker, start=start_date, end=end_date, progress=False)
  data = yf.download(ticker, start=start_date, end=end_date, progress=False)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>