# Imports and Configrations 

In [3]:
import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException
import time
import logging
import re

# URLs Used in Project

In [28]:
# Constants for URLs and selectors
YAHOO_FINANCE_URL = "https://finance.yahoo.com/"
TRENDING_STOCKS_URL = "https://finance.yahoo.com/trending-tickers"
STOCK_DATA_XPATH = "//table[contains(@class, 'W(100%)')]//tbody//tr"
OUTPUT_EXCEL_FILE = "yahoo_stocks_data.xlsx"

# WebDriver Initialization Function

In [9]:
def initialize_webdriver(headless: bool = True) -> webdriver.Chrome:
    # Initializes and returns a Chrome WebDriver instance.
    options = Options()
    if headless:
        options.add_argument("--headless")
        options.add_argument("--disable-gpu")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")
    options.add_argument("--window-size=1920,1080") # Set a default window size for headless mode

    try:
        # Assuming chromedriver is in PATH or provide the path explicitly
        driver = webdriver.Chrome(options=options)
        print("WebDriver initialized successfully.")
        return driver
    except WebDriverException as e:
        logging.error(f"Error initializing WebDriver: {e}")
        raise

# Navigation Function

In [12]:
def navigate_to_page(driver: webdriver.Chrome, url: str, wait_time: int = 10, page_title: str = "") -> bool:
    # Navigates to a given URL and waits for the page to load.
    try:
        print(f"Navigating to {url}")
        driver.get(url)
        WebDriverWait(driver, wait_time).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
        if page_title:
            WebDriverWait(driver, wait_time).until(EC.title_contains(page_title))
        print(f"Page '{driver.title}' loaded successfully.")
        return True
    except TimeoutException:
        print(f"Timeout while loading page: {url}. Title: {driver.title}")
        return False
    except WebDriverException as e:
        print(f"WebDriver error navigating to {url}: {e}")
        return False

# Data Extraction Function

In [15]:
def extract_trending_stock_data(driver: webdriver.Chrome, xpath: str, max_retries: int = 3) -> list[list[str]]:
    # Extracts trending stock data from a table using the specified XPath.
    all_stock_data = []
    for attempt in range(max_retries):
        try:
            WebDriverWait(driver, 20).until(EC.presence_of_all_elements_located((By.XPATH, xpath)))
            rows = driver.find_elements(By.XPATH, xpath)
            if not rows:
                logging.warning(f"No rows found with XPath: {xpath} on attempt {attempt + 1}")
                time.sleep(2) # Wait before retrying
                continue

            for row in rows:
                try:
                    cells = row.find_elements(By.TAG_NAME, "td")
                    # Extract text from each cell, strip whitespace
                    row_data = [cell.text.strip() for cell in cells]
                    if row_data:
                        all_stock_data.append(row_data)
                except NoSuchElementException:
                    logging.warning("Skipping a row due to missing cells.")
                    continue
            logging.info(f"Successfully extracted {len(all_stock_data)} stock entries.")
            break # Exit loop if data extracted
        except TimeoutException:
            logging.error(f"Timeout waiting for stock data table on attempt {attempt + 1}.")
            time.sleep(2)
        except Exception as e:
            logging.error(f"An unexpected error occurred during data extraction on attempt {attempt + 1}: {e}")
            time.sleep(2)
    return all_stock_data

# Data Cleaning and Conversion Function

In [20]:
def clean_and_convert_data(raw_data: list[list[str]]) -> pd.DataFrame:
    # Cleans and converts raw scraped stock data into a Pandas DataFrame.

    # Define columns based on Yahoo Finance trending tickers table
    # This might need adjustment if the actual scraped data columns vary
    columns = [
        "Symbol", "Name", "Last Price (Intraday)", "Change", "% Change",
        "Volume", "Avg Vol (3 month)", "Market Cap", "PE Ratio (TTM)",
         "% Change (YTD)", "Graph" # "Graph" column is usually an image/sparkline
    ]

    # Handle cases where rows might have fewer columns than expected
    processed_data = []
    for row in raw_data:
        if len(row) == len(columns):
            processed_data.append(row)
        elif len(row) > len(columns):
            # If more columns, truncate to expected number
            processed_data.append(row[:len(columns)])
            logging.warning(f"Row truncated: {row}")
        else:
            # If fewer columns, pad with None or empty string
            padded_row = row + [''] * (len(columns) - len(row))
            processed_data.append(padded_row)
            logging.warning(f"Row padded: {row}")


    stocks_df = pd.DataFrame(processed_data, columns=columns)

    # Data type conversion and cleaning
    # Remove characters like 'M', 'B', 'T', '%' and convert to numeric
    def clean_numeric_string(s):
        if pd.isna(s) or not isinstance(s, str):
            return None
        s = s.replace(',', '').strip()
        if 'T' in s:
            return float(s.replace('T', '')) * 1_000_000_000_000
        elif 'B' in s:
            return float(s.replace('B', '')) * 1_000_000_000
        elif 'M' in s:
            return float(s.replace('M', '')) * 1_000_000
        elif '%' in s:
            return float(s.replace('%', '')) / 100
        try:
            return float(s)
        except ValueError:
            return None

    # Apply cleaning to relevant columns
    numeric_cols = [
        "Last Price (Intraday)", "Change", "% Change", "Volume",
        "Avg Vol (3 month)", "Market Cap", "PE Ratio (TTM)", "% Change (YTD)"
    ]

    for col in numeric_cols:
        if col in stocks_df.columns:
            stocks_df[col] = stocks_df[col].apply(clean_numeric_string)
        else:
            logging.warning(f"Column '{col}' not found in DataFrame for cleaning.")

    # Drop the 'Graph' column as it's not data
    if "Graph" in stocks_df.columns:
        stocks_df.drop(columns=["Graph"], inplace=True)

    logging.info("Data cleaning and conversion complete.")
    return stocks_df

# Data Export Function

In [23]:
def export_dataframe_to_excel(df: pd.DataFrame, file_path: str):
    # Exports a Pandas DataFrame to an Excel file.
    try:
        df.to_excel(file_path, index=False)
        logging.info(f"Data successfully exported to {file_path}")
    except Exception as e:
        logging.error(f"Error exporting data to Excel: {e}")

# Main Execution Logic

In [32]:
# --- Main Execution Flow ---
if __name__ == "__main__":
    driver = None
    try:
        logging.info("Starting Yahoo Finance Trending Stocks Scraper.")
        driver = initialize_webdriver(headless=True) # Run in headless mode

        # Navigate to Trending Tickers page
        if not navigate_to_page(driver, TRENDING_STOCKS_URL, page_title="Trending Stocks"):
            logging.error("Failed to load Trending Stocks page. Exiting.")
            exit()

        # Extract data
        raw_stocks_data = extract_trending_stock_data(driver, STOCK_DATA_XPATH)

        if raw_stocks_data:
            # Process data
            stocks_df = clean_and_convert_data(raw_stocks_data)

            # Display head of the processed DataFrame
            logging.info("Preview of the processed DataFrame:")
            print(stocks_df.head().to_markdown(index=False)) # Use to_markdown for clean print in console

            # Export to Excel
            export_dataframe_to_excel(stocks_df, OUTPUT_EXCEL_FILE)
        else:
            logging.warning("No stock data extracted. Skipping data processing and export.")

    except Exception as e:
        logging.critical(f"An unhandled error occurred during the scraping process: {e}")
    finally:
        if driver:
            driver.quit()
            logging.info("WebDriver closed.")
        logging.info("Scraping process finished.")

WebDriver initialized successfully.
Navigating to https://finance.yahoo.com/trending-tickers
Page 'Top Trending Stocks: US stocks with the highest interest today - Yahoo Finance' loaded successfully.


ERROR:root:Timeout waiting for stock data table on attempt 1.
ERROR:root:Timeout waiting for stock data table on attempt 2.
ERROR:root:Timeout waiting for stock data table on attempt 3.
