Index to ticker mapper

In [None]:
import pandas as pd
import requests
from bs4 import BeautifulSoup
import os
import logging
import io

# --- Logging and output location ---
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
OUTPUT_DIR = "index_market_data"


# --- Ticker cleaners (each is applied to every scraped symbol string) ---

def _dots_to_dashes(symbol):
    """Replace every '.' in the symbol with '-' (e.g. 'BRK.B' -> 'BRK-B')."""
    return symbol.replace('.', '-')


def _unchanged(symbol):
    """Return the symbol exactly as scraped."""
    return symbol


def _with_ns_suffix(symbol):
    """Append the '.NS' suffix used here for Indian stocks."""
    return f"{symbol}.NS"


def _with_london_suffix(symbol):
    """Append '.L' unless the symbol already contains a dot."""
    # NOTE(review): symbols that already contain a dot are passed through
    # untouched -- confirm that is the form the price provider expects.
    return symbol if '.' in symbol else f"{symbol}.L"


# --- Configuration-driven ticker scraping ---
# One entry per index scraped from Wikipedia:
#   name             - label used in log messages and in the output mapping
#   url              - page that holds the constituents table
#   table_identifier - attribute filter used to locate the <table> element
#   ticker_column    - header of the column holding the ticker symbols
#   clean_fn         - callable applied to each raw symbol string
INDEX_CONFIG = {
    "sp500": {
        "name": "S&P 500",
        "url": "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies",
        "table_identifier": {'id': 'constituents'},
        "ticker_column": "Symbol",
        "clean_fn": _dots_to_dashes,
    },
    "dowjones": {
        "name": "Dow Jones",
        "url": "https://en.wikipedia.org/wiki/Dow_Jones_Industrial_Average",
        "table_identifier": {'class': 'wikitable'},
        "ticker_column": "Symbol",
        "clean_fn": _unchanged,
    },
    "nasdaq100": {
        "name": "NASDAQ 100",
        "url": "https://en.wikipedia.org/wiki/Nasdaq-100",
        "table_identifier": {'id': 'constituents'},
        "ticker_column": "Ticker",
        "clean_fn": _dots_to_dashes,
    },
    "nifty50": {
        "name": "Nifty 50",
        "url": "https://en.wikipedia.org/wiki/NIFTY_50",
        "table_identifier": {'class': 'wikitable sortable'},
        "ticker_column": "Symbol",
        "clean_fn": _with_ns_suffix,
    },
    "ftse100": {
        "name": "FTSE 100",
        "url": "https://en.wikipedia.org/wiki/FTSE_100_Index",
        "table_identifier": {'id': 'constituents'},
        "ticker_column": "EPIC",
        "clean_fn": _with_london_suffix,
    },
}

def get_tickers_from_wikipedia(config):
    """Scrape the ticker symbols of one index from its Wikipedia page.

    Args:
        config: mapping with the keys 'name', 'url', 'table_identifier'
            (attribute filter passed to BeautifulSoup to locate the table),
            'ticker_column' (header of the symbol column) and 'clean_fn'
            (callable applied to every raw symbol string).

    Returns:
        A list of cleaned ticker strings in table order. An empty list is
        returned when the table or column cannot be found or when any
        error occurs while fetching or parsing (the error is logged).
    """
    name = config['name']
    url = config['url']
    logging.info(f"Scraping {name} tickers from Wikipedia...")

    try:
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'lxml')

        table = soup.find('table', config['table_identifier'])
        if table is None:
            logging.error(f"Could not find the specified table for {name}.")
            return []

        # Parse only the located table rather than every table on the page.
        df = pd.read_html(io.StringIO(str(table)))[0]

        # Flatten multi-row headers into single 'level0_level1' strings.
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = ['_'.join(map(str, col)).strip() for col in df.columns.values]

        ticker_col = config['ticker_column']
        if ticker_col not in df.columns:
            logging.error(f"Ticker column '{ticker_col}' not found for {name}.")
            return []

        # Drop missing cells BEFORE converting to str: astype(str) would turn
        # NaN into the literal text 'nan', which clean_fn would then decorate
        # into a bogus ticker such as 'nan.NS'.
        symbols = df[ticker_col].dropna().astype(str).str.strip()
        symbols = symbols[symbols != '']
        tickers = symbols.apply(config['clean_fn']).tolist()
        logging.info(f"Found {len(tickers)} tickers for {name}.")
        return tickers

    except Exception as e:
        # Best-effort scrape: one failing index must not abort the others.
        logging.error(f"An error occurred while scraping {name}: {e}")
        return []

def get_russell_2000_tickers():
    """Scrape Russell 2000 tickers from a non-Wikipedia source.

    The first <table> on the page is read; the header row is skipped and the
    text of the first cell of each remaining row is taken as the ticker.

    Returns:
        A list of non-empty ticker strings, or an empty list when the page
        has no table or when fetching/parsing fails (the error is logged).
    """
    logging.info("Scraping Russell 2000 tickers...")
    url = 'https://www.lazyfa.com/screener/russell-2000'
    try:
        # A timeout is required: without one, requests waits indefinitely
        # on an unresponsive server and the whole script hangs.
        response = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'lxml')
        table = soup.find('table')
        tickers = []
        if table is not None:
            for row in table.find_all('tr')[1:]:  # [1:] skips the header row
                cells = row.find_all('td')
                if not cells:
                    continue  # row without data cells (e.g. a nested header)
                ticker = cells[0].text.strip()
                if ticker:  # ignore blank cells instead of emitting '' tickers
                    tickers.append(ticker)
        logging.info(f"Found {len(tickers)} Russell 2000 tickers.")
        return tickers
    except Exception as e:
        # Best-effort scrape: report the failure and let the caller continue.
        logging.error(f"Failed to scrape Russell 2000 tickers: {e}")
        return []


if __name__ == "__main__":
    # Script 1 entry point: scrape every configured index and write two CSVs
    # into OUTPUT_DIR -- the ticker-to-index mapping and the unique ticker list.
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    all_ticker_mappings = []

    # Scrape tickers from Wikipedia config, tagging each with its index name
    for config in INDEX_CONFIG.values():
        index_name = config['name']
        all_ticker_mappings.extend(
            {"Ticker": ticker, "Index": index_name}
            for ticker in get_tickers_from_wikipedia(config)
        )

    # Scrape Russell 2000 tickers
    all_ticker_mappings.extend(
        {"Ticker": ticker, "Index": "Russell 2000"}
        for ticker in get_russell_2000_tickers()
    )

    # --- Process and Save the final list ---
    # Name the columns explicitly: if every scrape failed the list is empty,
    # and a column-less DataFrame would raise KeyError on 'Ticker' below.
    df_mappings = pd.DataFrame(all_ticker_mappings, columns=["Ticker", "Index"])
    df_mappings = df_mappings.drop_duplicates()
    logging.info(f"Found a total of {len(df_mappings)} ticker-to-index mappings.")
    if df_mappings.empty:
        logging.warning("No tickers were scraped; the output files will contain headers only.")

    # Save the mapping file (for Script 3)
    output_path = os.path.join(OUTPUT_DIR, "ticker_to_index_map.csv")
    df_mappings.to_csv(output_path, index=False)
    logging.info(f"Successfully saved ticker-to-index mappings to: {output_path}")

    # Save the unique list (for Script 2)
    unique_tickers = sorted(df_mappings['Ticker'].unique())
    output_path_unique = os.path.join(OUTPUT_DIR, "all_scraped_tickers.csv")
    ticker_df = pd.DataFrame(unique_tickers, columns=["Ticker"])
    ticker_df.to_csv(output_path_unique, index=False)
    logging.info(f"Saved unique ticker list to: {output_path_unique}")

    logging.info("Ticker scraping process complete.")

Stock values finder

In [None]:
import yfinance as yf
import pandas as pd
import os
import logging
import time

# --- Logging and data directory ---
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
TICKER_DIR = "index_market_data"

# --- Input / output files, all located inside TICKER_DIR ---
# TICKER_FILE is read; the two *_OUTPUT_FILE paths are written.
TICKER_FILE, PRICE_OUTPUT_FILE, INFO_OUTPUT_FILE = (
    os.path.join(TICKER_DIR, filename)
    for filename in (
        "all_scraped_tickers.csv",
        "ticker_historical_prices.csv",
        "company_info.csv",
    )
)

# --- Tunables ---
# Seconds to pause between individual .info requests (rate-limit courtesy).
SLEEP_TIMER = 0.1


def get_specific_prices(tickers_list):
    """Fetch recent closing prices for many tickers in one batch and save them.

    Downloads two months of daily data, then writes one row per ticker to
    PRICE_OUTPUT_FILE with the closes at T-0, T-1, T-5 and T-21, counted in
    rows of the combined price table. If the newest row is incomplete (some
    ticker has no value yet) it is dropped so that T-0 is the last full day.

    Args:
        tickers_list: list of ticker symbols to download.

    Returns:
        None. Results are written to PRICE_OUTPUT_FILE; problems are logged.
    """
    logging.info("--- Starting Price Data Fetch (Batch Download) ---")
    if not tickers_list:
        logging.warning("No tickers provided for price fetch.")
        return

    try:
        # Download '2mo' of data to ensure we have ~21 trading days
        data = yf.download(tickers_list, period="2mo", interval="1d", progress=True)
    except Exception as e:
        logging.error(f"An error occurred during yfinance price download: {e}")
        return

    if data is None or data.empty:
        logging.warning("No price data was returned from yfinance.")
        return

    logging.info("Price download complete. Processing data...")

    close_prices = data['Close']

    # A single-ticker download may come back as a Series; normalise it to a
    # one-column frame so the per-ticker summary below still has a ticker index.
    if isinstance(close_prices, pd.Series):
        only_ticker = tickers_list if isinstance(tickers_list, str) else tickers_list[0]
        close_prices = close_prices.to_frame(name=str(only_ticker))

    # Tickers whose download failed entirely show up as all-NaN columns. Remove
    # them first: otherwise a single dead ticker makes the "incomplete last
    # row" check below fire on every run and throws away a perfectly good day.
    failed = close_prices.columns[close_prices.isna().all()]
    if len(failed) > 0:
        logging.warning(f"Ignoring {len(failed)} tickers with no price data at all.")
        close_prices = close_prices.drop(columns=failed)

    # If the last row (today's live data) has ANY NaN values, at least one
    # market has not reported yet. Drop it to use the last *full* trading
    # day as our T-0.
    if not close_prices.empty and close_prices.iloc[-1].isnull().any():
        logging.warning("Dropping last row due to NaN values (market likely open/data incomplete).")
        close_prices = close_prices.iloc[:-1]

    # Check if we still have enough data after (potentially) dropping a row
    if len(close_prices) < 22:
        logging.warning(f"Not enough data (need ~22 days, got {len(close_prices)}). Price output may be incomplete.")
        df_summary = pd.DataFrame(columns=[
            'T-0 (Most Recent)', 'T-1 (Previous Day)', 'T-5 (~1 Week Ago)', 'T-21 (~1 Month Ago)'
        ])
    else:
        # Negative positions count back from the last full day: -1 is T-0,
        # -2 is one row earlier, -6 five rows earlier, -22 twenty-one earlier.
        df_summary = pd.DataFrame({
            'T-0 (Most Recent)': close_prices.iloc[-1],
            'T-1 (Previous Day)': close_prices.iloc[-2],
            'T-5 (~1 Week Ago)': close_prices.iloc[-6],
            'T-21 (~1 Month Ago)': close_prices.iloc[-22]
        })

    df_summary = df_summary.dropna(how='all').round(2)
    df_summary.index.name = "Ticker"

    try:
        df_summary.to_csv(PRICE_OUTPUT_FILE)
        logging.info(f"--- Successfully saved price data to: {PRICE_OUTPUT_FILE} ---")
    except Exception as e:
        logging.error(f"Failed to save price data. Error: {e}")

def fetch_company_info(tickers_list):
    """Fetch sector, industry and market cap for every ticker and save them.

    This is a SLOW process: each ticker needs its own request, followed by a
    SLEEP_TIMER pause. Rows with a non-positive market cap or an unknown
    sector are filtered out before writing INFO_OUTPUT_FILE.

    Args:
        tickers_list: list of ticker symbols to look up.

    Returns:
        None. Results are written to INFO_OUTPUT_FILE; problems are logged.
    """
    logging.info("--- Starting Company Info Fetch (Sector/MarketCap) ---")
    if not tickers_list:
        logging.warning("No tickers provided for company info fetch.")
        return

    all_info_data = []
    total_tickers = len(tickers_list)
    # The estimate covers only the deliberate sleeps, not the request time.
    logging.warning(f"This will take a long time (Approx { (total_tickers * SLEEP_TIMER) / 60 :.1f} minutes).")

    for position, ticker_symbol in enumerate(tickers_list, start=1):
        if position % 50 == 0:
            logging.info(f"Progress: {position} / {total_tickers} tickers processed.")

        try:
            info = yf.Ticker(ticker_symbol).info or {}

            # Use `or` rather than .get(key, default): a key that is present
            # with value None would bypass the default, and a None market cap
            # makes the `> 0` filter below raise TypeError.
            all_info_data.append({
                "Ticker": ticker_symbol,
                "Sector": info.get('sector') or 'N/A',
                # NOTE(review): 'N_A' differs from the 'N/A' used for Sector.
                # pandas.read_csv parses 'N/A' as NaN by default, so switching
                # would change what a later dropna() removes; confirm first.
                "Industry": info.get('industry') or 'N_A',
                "MarketCap": info.get('marketCap') or 0
            })
        except Exception as e:
            # Best-effort per ticker: report why it failed and move on.
            logging.warning(f"Could not get .info for {ticker_symbol}. Skipping. ({e})")
        finally:
            # Pause after failures too; errors are often rate-limit responses
            # and retrying immediately would make them worse.
            time.sleep(SLEEP_TIMER)

    logging.info("--- Info fetch complete, saving data ---")

    if not all_info_data:
        logging.error("No company info was fetched.")
        return

    df_info = pd.DataFrame(all_info_data)
    # Coerce to numbers so a stray non-numeric value cannot break the filter.
    df_info['MarketCap'] = pd.to_numeric(df_info['MarketCap'], errors='coerce').fillna(0)
    df_info = df_info[df_info['MarketCap'] > 0]  # Filter out bad data
    df_info = df_info[df_info['Sector'] != 'N/A']

    df_info.to_csv(INFO_OUTPUT_FILE, index=False)
    logging.info(f"--- Successfully saved company info to {INFO_OUTPUT_FILE} ---")

if __name__ == "__main__":
    # Script 2 entry point: load the scraped ticker list, then fetch prices
    # and company info for it.
    os.makedirs(TICKER_DIR, exist_ok=True)

    # --- Load Ticker List ---
    try:
        df_tickers = pd.read_csv(TICKER_FILE)
        # Blank cells are read back as NaN; drop them so only real symbol
        # strings are handed to yfinance.
        tickers_list = df_tickers['Ticker'].dropna().astype(str).tolist()
        logging.info(f"Loaded {len(tickers_list)} unique tickers from {TICKER_FILE}.")
    except FileNotFoundError:
        logging.error(f"Could not find {TICKER_FILE}. Please run Script 1 first.")
        # SystemExit instead of exit(): exit() is an interactive-shell helper
        # that may be absent, and it would report success (status 0).
        raise SystemExit(1) from None
    except Exception as e:
        logging.error(f"Error loading ticker file: {e}")
        raise SystemExit(1) from None

    # --- Run Data Fetching Processes ---
    get_specific_prices(tickers_list)
    fetch_company_info(tickers_list)

    logging.info("All data fetching complete.")

Plot

In [None]:
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import os
import logging
import numpy as np

# --- Logging and data directory ---
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
TICKER_DIR = "index_market_data"

# --- COLOR CONTRAST TUNING ---

# Fractional change that maps to the ends of the colour scale; it is used as
# the colour axis minimum (negated) and maximum. 0.04 = 4%.
COLOR_RANGE_LIMIT = 0.04

# Custom high-contrast Red-White-Green scale as [position, colour] pairs.
# Positions are normalised: 0.0 is the colour-axis minimum, 0.5 the midpoint
# and 1.0 the maximum.
HIGH_CONTRAST_SCALE = [
    [position, colour]
    for position, colour in (
        (0.0, '#CC0000'),    # dark red at the minimum (-COLOR_RANGE_LIMIT)
        (0.499, '#FF8888'),  # light red just below the midpoint
        (0.5, '#FFFFFF'),    # white at the midpoint (zero change)
        (0.501, "#00FF00"),  # bright green just above the midpoint
        (1.0, "#006D00"),    # dark green at the maximum (+COLOR_RANGE_LIMIT)
    )
]
# --- END COLOR CONTRAST TUNING ---


# --- File Paths (all inside TICKER_DIR) ---
# The three inputs are read by create_treemap; OUTPUT_HTML_FILE is written.
MAP_FILE, INFO_FILE, PRICE_FILE, OUTPUT_HTML_FILE = (
    os.path.join(TICKER_DIR, filename)
    for filename in (
        "ticker_to_index_map.csv",
        "company_info.csv",
        "ticker_historical_prices.csv",
        "market_treemap.html",
    )
)

def create_treemap():
    """Build the interactive market treemap and save it as an HTML file.

    Reads the ticker-to-index map, company info and price summary CSVs,
    derives the 1-day, 1-week and 1-month fractional price changes,
    inner-joins everything on 'Ticker' and writes a Plotly treemap
    (All Indices > Index > Sector > Ticker, sized by MarketCap) to
    OUTPUT_HTML_FILE. A dropdown switches the colouring between the periods.

    Returns:
        None. Logs an error and returns early when an input file is missing
        or empty, or when no rows survive merging and cleaning.
    """
    logging.info("--- Creating Treemap ---")

    # --- 1. Load all data sources ---
    try:
        df_map = pd.read_csv(MAP_FILE)
        df_info = pd.read_csv(INFO_FILE)
        df_prices = pd.read_csv(PRICE_FILE)
    except FileNotFoundError as e:
        logging.error(f"Error loading file: {e}")
        logging.error("Please make sure you have run Script 1 and Script 2 (and let Script 2 finish!)")
        return
    except pd.errors.EmptyDataError as e:
        # A zero-byte CSV (e.g. from an interrupted earlier run).
        logging.error(f"Error loading file (no data): {e}")
        return
    logging.info(f"Loaded {len(df_map)} mappings, {len(df_info)} company infos, {len(df_prices)} prices.")

    # --- 2. Prepare and Merge Data ---

    # (change column, baseline price column, dropdown label) for each period
    periods = [
        ('1-Day PctChange', 'T-1 (Previous Day)', "1-Day Change"),
        ('1-Week PctChange', 'T-5 (~1 Week Ago)', "1-Week Change"),
        ('1-Month PctChange', 'T-21 (~1 Month Ago)', "1-Month Change"),
    ]
    change_cols = [change_col for change_col, _, _ in periods]

    latest = df_prices['T-0 (Most Recent)']
    for change_col, baseline_col, _ in periods:
        baseline = df_prices[baseline_col]
        df_prices[change_col] = (latest - baseline) / baseline

    # A zero baseline yields +/-inf, which dropna() would not remove; turn it
    # into NaN so the clean-up step below discards those rows.
    df_prices[change_cols] = df_prices[change_cols].replace([np.inf, -np.inf], np.nan)

    df_prices_final = df_prices[['Ticker', *change_cols]]

    # Merge the data
    logging.info("Merging dataframes...")
    df_merged_1 = pd.merge(df_map, df_info, on="Ticker", how="inner")
    logging.info(f"After merging map + info: {len(df_merged_1)} rows remaining.")

    df_final = pd.merge(df_merged_1, df_prices_final, on="Ticker", how="inner")
    logging.info(f"After merging prices: {len(df_final)} rows remaining.")

    # Sanity check: a very small result usually means an incomplete input file
    if len(df_final) < 500:
        logging.warning("--- WARNING ---")
        logging.warning(f"Final dataset is very small ({len(df_final)} rows).")
        logging.warning("This strongly suggests your 'company_info.csv' file is incomplete.")
        logging.warning("Please re-run Script 2 and let it finish fetching all tickers.")
        logging.warning("--- END WARNING ---")

    # Clean up: require values only in the columns the plot actually uses, so
    # a gap in an unused column (e.g. Industry) does not discard a company.
    plot_columns = ['Index', 'Sector', 'Ticker', 'MarketCap', *change_cols]
    df_final = df_final.dropna(subset=plot_columns)
    df_final = df_final[df_final['MarketCap'] > 0]

    if df_final.empty:
        logging.error("After merging and cleaning, the final DataFrame is empty. Cannot plot.")
        return

    logging.info(f"Final data prepared for plotting with {len(df_final)} valid entries.")

    # --- 3. Plot the Treemap ---
    logging.info("Generating Plotly treemaps...")

    treemap_path = [px.Constant("All Indices"), 'Index', 'Sector', 'Ticker']
    hover_data_formats = {
        'MarketCap': ':,.0f',
        '1-Day PctChange': ':.2%',
        '1-Week PctChange': ':.2%',
        '1-Month PctChange': ':.2%'
    }

    # --- 4. Combine into a single figure with a dropdown ---
    # One treemap trace per period. No colour scale is set per figure: the
    # layout-level coloraxis configured below is intended to style them all.
    fig = go.Figure()
    for change_col in change_cols:
        period_fig = px.treemap(
            df_final, path=treemap_path, values='MarketCap', color=change_col,
            hover_data=hover_data_formats
        )
        fig.add_trace(period_fig.data[0])

    # Only the first (1-day) trace is shown initially.
    for hidden_trace in fig.data[1:]:
        hidden_trace.visible = False

    # Each button shows exactly one trace and hides the other two.
    buttons = [
        dict(
            label=label,
            method="update",
            args=[{"visible": [i == selected for i in range(len(periods))]}]
        )
        for selected, (_, _, label) in enumerate(periods)
    ]

    # --- 5. Customize and Save ---

    fig.update_layout(
        updatemenus=[
            dict(
                active=0,
                buttons=buttons,
                direction="down",
                pad={"r": 10, "t": 10},
                showactive=True,
                x=0.01,
                xanchor="left",
                y=1.1,
                yanchor="top"
            )
        ],
        title='Global Market Treemap by Index and Sector',
        margin=dict(t=80, l=25, r=25, b=25),

        # A single shared colour axis with fixed limits keeps the scale
        # stationary and centred at 0 whichever period is selected.
        coloraxis=dict(
            colorscale=HIGH_CONTRAST_SCALE, # Custom Red-White-Green scale
            cmin=-COLOR_RANGE_LIMIT,        # Stationary minimum
            cmax=COLOR_RANGE_LIMIT,         # Stationary maximum
            cmid=0,                         # Force 0 to be the center (White)
            showscale=True,                 # Show the color bar
            colorbar_title='Pct. Change'
        )
    )

    fig.write_html(OUTPUT_HTML_FILE)
    logging.info(f"--- Successfully saved interactive treemap to: {OUTPUT_HTML_FILE} ---")

    # Optional: Also display the figure now
    # fig.show()

# Script 3 entry point: build and save the treemap only when this code is
# executed directly, not when it is imported.
if __name__ == "__main__":
    create_treemap()