In [1]:
# Create the package directory that the %%writefile cells below write into (-p: no error if it already exists).
!mkdir -p utils

In [2]:
%%writefile utils/__init__.py

UsageError: %%writefile is a cell magic, but the cell body is empty.


In [3]:
%%writefile utils/ecb_scraper.py
import time
import pandas as pd
import requests
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.common.exceptions import WebDriverException
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import os
import pickle
import aiohttp
import asyncio
from aiofiles import open as aio_open
from aiohttp import ClientSession, ClientTimeout


class ECBScraper:
    """
    Scraper for European Central Bank (ECB) press releases.

    The scraper keeps an index of press releases (date, title, URL) in a
    pandas DataFrame that is persisted as a pickle file. On construction it
    loads the previously saved index (if any); ``scrape_and_update`` adds
    newly published articles to it, and the ``scrape_all_texts_to_files*``
    methods download the body text of every indexed article into one
    ``.txt`` file per article.
    """
    BASE_URL = "https://www.ecb.europa.eu"
    START_URL = (
        "https://www.ecb.europa.eu/press/pubbydate/html/index.en.html?"
        "name_of_publication=Press%20release"
    )
    # Column layout of the article index held in ``self.df``.
    COLUMNS = ("Date", "Title", "URL")
    # Upper bound (in characters) for a generated file-name stem, so that
    # "<stem>.txt" stays below common 255-byte file-name limits.
    _MAX_STEM_CHARS = 200

    def __init__(self, pickle_path="ecb_press_releases_df.pkl",
                 scroll_pause_time=0.1, scroll_increment=50,
                 max_scroll_attempts=False, initial_wait=10):
        """
        Initialize the scraper and load any previously saved index.

        Args:
            pickle_path (str): Path to the pickle file for loading/saving the DataFrame.
            scroll_pause_time (float): Pause between scrolls (seconds).
            scroll_increment (int): Pixels scrolled each time.
            max_scroll_attempts (int | bool): Maximum number of scroll steps.
                Any falsy value (the default ``False``, ``0`` or ``None``)
                disables the limit; scrolling then stops only at the bottom
                of the page or on error.
            initial_wait (int): Seconds to wait for the initial page load.
        """
        self.pickle_path = pickle_path
        self.scroll_pause_time = scroll_pause_time
        self.scroll_increment = scroll_increment
        self.max_scroll_attempts = max_scroll_attempts
        self.initial_wait = initial_wait
        self.df = self._empty_frame()
        self.existing_urls = set()
        self.load_data()

    @classmethod
    def _empty_frame(cls):
        """Return an empty article index with the expected columns."""
        return pd.DataFrame(columns=list(cls.COLUMNS))

    def load_data(self):
        """
        Load existing press releases from the pickle file if available.

        ``self.existing_urls`` is always rebuilt from scratch, so calling this
        method again never leaves URLs from an earlier load behind. A file
        that cannot be read, does not hold a DataFrame, or lacks one of the
        expected columns results in an empty index ("starting fresh").
        """
        self.existing_urls = set()

        if not os.path.exists(self.pickle_path):
            print(f"Pickle file '{self.pickle_path}' not found. Starting fresh.")
            return

        try:
            # SECURITY: unpickling can execute arbitrary code. Only point
            # pickle_path at files this scraper (or a trusted source) wrote.
            loaded = pd.read_pickle(self.pickle_path)
        except Exception as e:
            print(f"Error loading data from '{self.pickle_path}': {e}. Starting fresh.")
            self.df = self._empty_frame()
            return

        if not isinstance(loaded, pd.DataFrame):
            print(f"Warning: Pickle file '{self.pickle_path}' did not contain a DataFrame. Starting fresh.")
            loaded = self._empty_frame()
        else:
            missing = [col for col in self.COLUMNS if col not in loaded.columns]
            if missing:
                # Every other method indexes these columns directly, so a
                # frame without them cannot be used as the article index.
                print(f"Warning: Pickle file '{self.pickle_path}' is missing columns {missing}. Starting fresh.")
                loaded = self._empty_frame()

        self.df = loaded
        # dropna() keeps NaN out of the membership set used for de-duplication.
        self.existing_urls = set(self.df["URL"].dropna())
        print(f"Loaded {len(self.df)} existing articles from '{self.pickle_path}'. Found {len(self.existing_urls)} unique URLs.")

    def _setup_driver(self):
        """
        Initialize and return a headless Selenium Chrome WebDriver.

        The chromedriver binary path comes from
        ``ChromeDriverManager().install()``.

        Returns:
            The WebDriver instance, or ``None`` when setup failed (the error
            is printed rather than raised).
        """
        try:
            service = ChromeService(executable_path=ChromeDriverManager().install())
            options = webdriver.ChromeOptions()
            options.add_argument("--headless")
            options.add_argument("--disable-gpu")
            return webdriver.Chrome(service=service, options=options)
        except WebDriverException as e:
            print(f"Error setting up WebDriver: {e}")
            print("Ensure ChromeDriver matches your local Chrome version.")
            return None
        except Exception as e:
            print(f"Unexpected error setting up WebDriver: {e}")
            return None

    def _scroll_page(self, driver):
        """
        Scroll incrementally down the loaded webpage.

        Each step scrolls by ``scroll_increment`` pixels and pauses for
        ``scroll_pause_time`` seconds. Scrolling stops when the vertical
        offset no longer changes (bottom reached), when a script call fails,
        or after ``max_scroll_attempts`` steps if that limit is enabled.

        Args:
            driver: Selenium WebDriver with the target page already loaded.
        """
        print("Scrolling incrementally...")
        attempt = 0
        while True:
            try:
                last_scroll_position = driver.execute_script("return window.pageYOffset;")
                driver.execute_script(f"window.scrollBy(0, {self.scroll_increment});")
                time.sleep(self.scroll_pause_time)
                new_scroll_position = driver.execute_script("return window.pageYOffset;")
            except Exception as e:
                print(f"Error while scrolling: {e}")
                break

            # An unchanged offset after scrolling means there is nothing left below.
            if new_scroll_position == last_scroll_position:
                print("✅ Reached bottom of page.")
                break

            attempt += 1
            if self.max_scroll_attempts and attempt >= self.max_scroll_attempts:
                print(f"Stopped scrolling after {attempt} attempts (max_scroll_attempts reached).")
                break

    def _extract_articles(self, html_content):
        """
        Parse the listing page HTML and extract article metadata.

        The listing is expected to be a ``<dl>`` directly under
        ``div.sort-wrapper``: each ``<dt>`` carries a date that applies to the
        ``<dd>`` entries following it. Only entries whose category text is
        exactly "Press release" and whose link contains ``/press/pr/`` are kept.

        Args:
            html_content (str): Full HTML of the (scrolled) listing page.

        Returns:
            list[dict]: One ``{"Date", "Title", "URL"}`` dict per article, in
            page order. Empty when the expected structure is not found.
        """
        soup = BeautifulSoup(html_content, "html.parser")

        main_content = soup.find("div", id="ecb-content-col") or soup.find("main")
        if not main_content:
            print("Main content area ('ecb-content-col' or 'main') not found.")
            return []

        sort_wrapper_div = main_content.find("div", class_="sort-wrapper")
        if not sort_wrapper_div:
            print("No <div class='sort-wrapper'> found inside main content.")
            return []

        main_dl = sort_wrapper_div.find("dl", recursive=False)
        if not main_dl:
            print("Main <dl> element not found under sort-wrapper.")
            return []

        articles_data = []
        current_date = None
        for tag in main_dl.find_all(["dt", "dd"], recursive=False):
            if tag.name == "dt":
                current_date = tag.get_text(strip=True)
                continue
            if not current_date:
                # A <dd> that appears before any <dt> has no date to attach.
                continue

            category_div = tag.find("div", class_="category")
            title_div = tag.find("div", class_="title")
            if not (category_div and title_div):
                continue
            if category_div.get_text(strip=True) != "Press release":
                continue

            link_tag = title_div.find("a", href=True)
            if not link_tag:
                continue

            absolute_url = urljoin(self.BASE_URL, link_tag["href"])
            if "/press/pr/" not in absolute_url:
                continue

            articles_data.append({
                "Date": current_date,
                "Title": link_tag.get_text(strip=True),
                "URL": absolute_url
            })
        return articles_data

    def save_data(self):
        """
        Save the current DataFrame to the pickle file.

        The frame is written to a temporary sibling file first and then moved
        into place with ``os.replace``, so a failed or interrupted write does
        not destroy the previously saved index. Errors are printed, not raised.
        """
        if self.df.empty:
            print("DataFrame is empty. Nothing to save.")
            return

        # Keep the original extension last so pandas infers the same
        # compression for the temporary file as for the final one.
        root, ext = os.path.splitext(self.pickle_path)
        tmp_path = f"{root}.tmp{ext}"
        try:
            self.df.to_pickle(tmp_path)
            os.replace(tmp_path, self.pickle_path)
            print(f"Successfully saved {len(self.df)} total articles to '{self.pickle_path}'.")
        except Exception as e:
            print(f"Error saving data to '{self.pickle_path}': {e}")
            if os.path.exists(tmp_path):
                try:
                    os.remove(tmp_path)
                except OSError as cleanup_error:
                    print(f"Could not remove temporary file '{tmp_path}': {cleanup_error}")

    def _merge_new_articles(self, scraped_articles):
        """
        Add not-yet-known articles to the index and persist it.

        New rows are placed in front of the existing ones, duplicates (by URL)
        are dropped and the index is renumbered 0..n-1.

        Args:
            scraped_articles (list[dict]): Dicts with "Date", "Title", "URL".

        Returns:
            int: Number of articles that were added.
        """
        known_urls = set(self.existing_urls)
        new_articles = []
        for article in scraped_articles:
            if article["URL"] in known_urls:
                continue
            known_urls.add(article["URL"])
            new_articles.append(article)

        if not new_articles:
            print("No new articles found.")
            return 0

        print(f"Found {len(new_articles)} new articles.")
        new_df = pd.DataFrame(new_articles, columns=list(self.COLUMNS))
        # Skip concat for an empty index: concatenating empty frames is
        # deprecated in recent pandas and adds nothing.
        combined = new_df if self.df.empty else pd.concat([new_df, self.df], ignore_index=True)
        self.df = combined.drop_duplicates(subset=["URL"], keep="first").reset_index(drop=True)
        self.existing_urls.update(new_df["URL"])
        self.save_data()
        return len(new_articles)

    def scrape_and_update(self):
        """
        Run the full scraping pipeline and update stored data.

        Opens the listing page in headless Chrome, scrolls to the bottom,
        extracts the article metadata and merges unseen articles into the
        index (which is then saved). Returns ``None``; failures are printed.
        """
        driver = self._setup_driver()
        if not driver:
            return

        try:
            print(f"Navigating to: {self.START_URL}")
            driver.get(self.START_URL)
            print(f"Waiting {self.initial_wait} seconds for page to load...")
            time.sleep(self.initial_wait)

            self._scroll_page(driver)

            print("\nExtracting article data...")
            html_content = driver.page_source
        except Exception as e:
            print(f"Error during navigation or scrolling: {e}")
            return
        finally:
            # Always release the browser, including on the early return above.
            driver.quit()

        scraped_articles = self._extract_articles(html_content)
        print(f"Scraped {len(scraped_articles)} potential articles.")
        self._merge_new_articles(scraped_articles)

    def get_dataframe(self):
        """Return the current DataFrame."""
        return self.df

    @staticmethod
    def _parse_article_html(html):
        """
        Extract the body text of a press release from its HTML.

        Collects the text of every ``<p>`` under ``main div.section`` followed
        by every ``<ul>`` there (rendered as bullet lines). When more than one
        text block is found, the first one (the date paragraph) is dropped.

        NOTE(review): list blocks are appended after all paragraphs rather
        than at their position in the document; confirm whether document
        order matters downstream before changing this.

        Args:
            html (str): HTML of an article page.

        Returns:
            str: Text blocks joined by blank lines, stripped.
        """
        soup = BeautifulSoup(html, "html.parser")

        texts = [p.get_text(strip=True) for p in soup.select("main div.section p")]
        for ul in soup.select("main div.section ul"):
            list_items = [li.get_text(strip=True) for li in ul.find_all("li")]
            if list_items:
                texts.append("\n".join(f"• {item}" for item in list_items))

        body_blocks = texts[1:] if len(texts) > 1 else texts
        return "\n\n".join(body_blocks).strip()

    @staticmethod
    def extract_article_text(url):
        """
        Fetch the main body text from a press release page.
        Includes both <p> and <ul> contents.
        The first paragraph (date) is removed.

        Args:
            url (str): Absolute URL of the press release.

        Returns:
            str: The article text, or "" when the request or parsing failed
            (the error is printed).
        """
        try:
            resp = requests.get(url, timeout=15)
            resp.raise_for_status()
            return ECBScraper._parse_article_html(resp.text)
        except Exception as e:
            print(f"❌ Error fetching {url}: {e}")
            return ""

    @classmethod
    def _safe_filename_stem(cls, text, max_length=None):
        """
        Turn arbitrary text into a file-name stem.

        Every character that is not alphanumeric, a space, "-" or "_" is
        replaced by "_"; the result is cut to ``max_length`` characters
        (default: ``_MAX_STEM_CHARS``).
        """
        limit = cls._MAX_STEM_CHARS if max_length is None else max_length
        cleaned = "".join(c if c.isalnum() or c in " -_" else "_" for c in str(text))
        return cleaned[:limit]

    @classmethod
    def _url_slug(cls, url):
        """Return a sanitized form of the last path segment of ``url`` (extension removed)."""
        path_only = str(url).split("?", 1)[0].split("#", 1)[0]
        tail = path_only.rstrip("/").rsplit("/", 1)[-1]
        stem, _ext = os.path.splitext(tail)
        return cls._safe_filename_stem(stem or tail, max_length=60)

    def _planned_downloads(self, folder):
        """
        Map every indexed article to a unique output file path.

        A title that belongs to a single URL keeps the plain
        ``<sanitized title>.txt`` name. When several different URLs share the
        same sanitized title, each of them gets a
        ``<sanitized title>__<url slug>.txt`` name instead; otherwise all but
        one would be skipped as "already exists". The naming depends only on
        the set of rows, not on their order.

        Args:
            folder (str): Target directory for the text files.

        Returns:
            list[tuple[str, str, str]]: ``(title, url, file_path)`` per
            distinct output file, in index order.
        """
        rows = [(str(title), str(url)) for title, url in zip(self.df["Title"], self.df["URL"])]

        urls_by_stem = {}
        for title, url in rows:
            urls_by_stem.setdefault(self._safe_filename_stem(title), set()).add(url)

        plan = []
        planned_paths = set()
        for title, url in rows:
            stem = self._safe_filename_stem(title)
            if len(urls_by_stem[stem]) > 1:
                slug = self._url_slug(url)
                # Shorten the title part so the suffixed name respects the cap.
                room = max(1, self._MAX_STEM_CHARS - len(slug) - 2)
                stem = f"{stem[:room]}__{slug}"
            file_path = os.path.join(folder, f"{stem}.txt")
            if file_path in planned_paths:
                # Same URL listed twice: one download is enough.
                continue
            planned_paths.add(file_path)
            plan.append((title, url, file_path))
        return plan

    def scrape_all_texts_to_files(self, folder="ecb_press_release"):
        """
        Download all press release texts and save each to a .txt file.
        File names are sanitized from article titles (see
        ``_planned_downloads`` for how repeated titles are kept apart).
        Articles whose file already exists are skipped.

        Args:
            folder (str): Output directory; created if missing.
        """
        os.makedirs(folder, exist_ok=True)
        plan = self._planned_downloads(folder)
        n_total = len(plan)
        print(f"📄 Starting extraction of {n_total} articles...")

        for position, (title, url, file_path) in enumerate(plan, start=1):
            if os.path.exists(file_path):
                print(f"⏩ Skipping '{title}' (already exists)")
                continue

            print(f"📰 [{position}/{n_total}] Fetching: {title}")
            text = self.extract_article_text(url)
            if text:
                try:
                    with open(file_path, "w", encoding="utf-8") as f:
                        f.write(text)
                except OSError as e:
                    # One unwritable file must not abort the whole run.
                    print(f"❌ Error writing '{file_path}': {e}")
            else:
                print(f"⚠️ No text found for {url}")

            time.sleep(0.5)  # polite delay for ECB servers, also after failures

        print(f"✅ All available articles saved in '{folder}/'")

    async def _fetch_article_text(self, session, url):
        """
        Asynchronously download the body text of a press release (includes <p> and <ul>).

        Args:
            session: An open ``aiohttp.ClientSession``.
            url (str): Absolute URL of the press release.

        Returns:
            str: The article text, or "" on any error (the error is printed).
        """
        try:
            # aiohttp expects a ClientTimeout object; a bare number is deprecated.
            async with session.get(url, timeout=ClientTimeout(total=20)) as resp:
                resp.raise_for_status()
                html = await resp.text()
            return self._parse_article_html(html)
        except Exception as e:
            print(f"❌ Error fetching {url}: {e}")
            return ""

    async def scrape_all_texts_to_files_async(self, folder="ecb_press_release", concurrency=10):
        """
        Asynchronously download and save all press release texts.

        Args:
            folder (str): Output directory; created if missing.
            concurrency (int): Maximum number of requests in flight at once
                (values below 1 are treated as 1).
        """
        os.makedirs(folder, exist_ok=True)
        plan = self._planned_downloads(folder)
        print(f"📄 Starting async extraction of {len(plan)} articles...")

        headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/58.0.3029.110 Safari/537.36"
            )
        }

        concurrency = max(1, int(concurrency))
        # The semaphore bounds requests *before* they start. Relying on the
        # connector limit alone lets queued requests burn their total timeout
        # while they wait for a free connection.
        semaphore = asyncio.Semaphore(concurrency)
        connector = aiohttp.TCPConnector(limit_per_host=concurrency)
        async with aiohttp.ClientSession(connector=connector, headers=headers) as session:
            tasks = []
            for title, url, file_path in plan:
                if os.path.exists(file_path):
                    print(f"⏩ Skipping '{title}' (already exists)")
                    continue

                tasks.append(self._save_article(session, url, file_path, title, semaphore))

            # return_exceptions=True: one failing article must not close the
            # session underneath all downloads that are still pending.
            results = await asyncio.gather(*tasks, return_exceptions=True)

        for result in results:
            if isinstance(result, Exception):
                print(f"❌ Unexpected error while saving an article: {result}")

        print(f"✅ All available articles saved in '{folder}/'")

    async def _save_article(self, session, url, file_path, title, semaphore=None):
        """
        Download and save an individual article (async).

        Args:
            session: An open ``aiohttp.ClientSession``.
            url (str): Absolute URL of the press release.
            file_path (str): Destination ``.txt`` path.
            title (str): Article title, used for log messages only.
            semaphore (asyncio.Semaphore | None): Optional limiter held for
                the duration of the download; ``None`` means unlimited.
        """
        if semaphore is None:
            text = await self._fetch_article_text(session, url)
        else:
            async with semaphore:
                text = await self._fetch_article_text(session, url)

        if not text:
            print(f"⚠️ No text found for {title}")
            return

        try:
            async with aio_open(file_path, "w", encoding="utf-8") as f:
                await f.write(text)
        except OSError as e:
            print(f"❌ Error writing '{file_path}': {e}")
            return

        print(f"✅ Saved '{title}'")


Overwriting utils/ecb_scraper.py


In [4]:
import time
import pickle as pkl
import pandas as pd
import requests
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from bs4 import BeautifulSoup
from urllib.parse import urljoin

from utils.ecb_scraper import ECBScraper

In [5]:
# Build the scraper; the constructor calls load_data(), which reads the pickled article index if the file exists.
scraper = ECBScraper(pickle_path="ecb_press_releases_df.pkl")

# Only run the Selenium scrape of the ECB listing page when no cached index was loaded.
if scraper.df.empty:
    scraper.scrape_and_update()

# Top-level `await` relies on the notebook kernel's support for awaiting in cells;
# in a plain script this call would need asyncio.run(...).
await scraper.scrape_all_texts_to_files_async(concurrency=10)

Loaded 2136 existing articles from 'ecb_press_releases_df.pkl'. Found 2136 unique URLs.
📄 Starting async extraction of 2136 articles...
✅ Saved 'ECB and People’s Bank of China extend bilateral euro-renminbi currency swap arrangement'
✅ Saved 'ECB and Frankfurt Radio Symphony to hold Europa Open Air concert on 21 August 2025'
✅ Saved 'ECB presents findings from digital euro innovation platform and announces second round of experimentation'
✅ Saved 'European System of Central Banks renews Statements of Commitment to FX Global Code'
✅ Saved 'New data release: Early signals from ECB wage tracker suggest lower and more stable wage pressures in first half of 2026'
✅ Saved 'Survey on the Access to Finance of Enterprises: lending conditions tightened marginally, while financing needs and availability remained broadly unchanged'
✅ Saved 'ECB sets transitional provisions for minimum reserve requirements following introduction of euro in Bulgaria'
✅ Saved 'ECB Consumer Expect