In [13]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [14]:
import csv
import datetime
import os
import random
import time
from dataclasses import dataclass
from math import ceil
from typing import Any, Literal

import pandas as pd
import requests
from bs4 import BeautifulSoup
from hydra import initialize, initialize_config_module, initialize_config_dir, compose
from hydra.core.config_store import ConfigStore
from omegaconf import MISSING, OmegaConf, DictConfig
from requests import Response
from tqdm import tqdm

import webscraping_lib


In [15]:
@dataclass
class Config:
    """Root structured-config schema for this notebook.

    The class is stored in Hydra's ConfigStore under the name ``base_config``
    in the statements that follow it.
    """

    # Web-scraping settings; MISSING is OmegaConf's marker for a mandatory value
    # that has to be supplied by the composed configuration.
    web: webscraping_lib.CompaniesMarketCapConfig = MISSING
    # Debug switch, off by default. No code visible in this notebook reads it.
    debug: bool = False


# Register the Config schema with Hydra's ConfigStore under the name "base_config".
cs: ConfigStore = ConfigStore.instance()
cs.store(name="base_config", node=Config)

# webscraping_lib registers its own configs with the ConfigStore.
# NOTE(review): the group they are stored under is decided inside the library
# (presumably "web", matching the Config.web field) - confirm in webscraping_lib.
webscraping_lib.register_configs()


In [16]:
# Compose the run configuration named "config" from the "conf" directory.
# CFG is read as a default argument by the functions defined in later cells,
# so this cell has to run before those definitions are executed.
with initialize(version_base=None, config_path="conf"):
    CFG: DictConfig = compose(config_name="config")
    # Echo the composed configuration so the run settings are visible in the output.
    print(CFG)


{'web': {'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:92.0) Gecko/20100101 Firefox/92.0', 'parser': 'lxml', 'companies_url': 'https://companiesmarketcap.com/?page=', 'ticker_url': 'https://finance.yahoo.com/quote/', 'output_filename': '_companies_', 'max_companies': 10}, 'debug': True}


In [17]:
def float_or_na(value: Any) -> float | Literal[0]:
    try:
        return float(value)
    except (ValueError, TypeError):
        return 0


In [18]:
def get_url_page(
    url_link,
    user_agent=CFG.web.user_agent,
    parser=CFG.web.parser,
    timeout: float = 10.0,
) -> BeautifulSoup | Literal[""]:
    """Download a web page and return it as a parsed BeautifulSoup document.

    Args:
        url_link: URL of the page to fetch.
        user_agent: value sent in the ``user-agent`` request header.
        parser: name of the parser handed to BeautifulSoup.
        timeout: seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on an unresponsive host.

    Returns:
        The parsed document, or the empty string ``""`` when the page could not
        be fetched (network error, timeout, or a non-OK HTTP status). The reason
        is printed in every failure case.
    """
    try:
        stock_page_response: Response = requests.get(
            url_link, headers={"user-agent": user_agent}, timeout=timeout
        )
    except requests.RequestException as error:
        # Connection problems and timeouts follow the same "return ''" contract
        # as a bad status code, so callers only have one failure value to check.
        print("Request for {} failed: {}".format(url_link, error))
        return ""

    if not stock_page_response.ok:
        print(
            "Status code for {}: {}".format(url_link, stock_page_response.status_code)
        )
        return ""

    # Successful response: build and return the parsed document.
    return BeautifulSoup(stock_page_response.text, parser)


In [19]:
def get_stocks(num_stocks: int = 10, cfg=CFG):
    """Collect the ticker symbols of the first ``num_stocks`` listed companies.

    Listing pages are fetched from ``cfg.web.companies_url`` and the text of
    every ``<div class="company-code">`` element is taken as a ticker symbol.

    Args:
        num_stocks: how many symbols to return; values below 1 yield ``[]``.
        cfg: configuration providing ``web.companies_url``, ``web.user_agent``
            and ``web.parser``.

    Returns:
        A list with at most ``num_stocks`` ticker symbols. Pages that cannot be
        fetched are skipped, so the list may be shorter than requested.
    """
    if num_stocks < 1:
        # Nothing requested; a negative value must not reach the final slice,
        # where it would silently drop symbols from the end instead.
        return []

    # Each listing page is expected to hold 100 companies.
    stocks_per_page = 100
    page_numbers: int = ceil(num_stocks / stocks_per_page)

    stocks_symbols = []
    for page_number in range(1, page_numbers + 1):
        # NOTE(review): the trailing "/" ends up inside the query value
        # ("?page=1/"). The site accepted it for page 1 in the recorded run;
        # confirm it is also honoured for later pages.
        popular_stocks_url = cfg.web.companies_url + str(page_number) + "/"
        print("Web Page: ", popular_stocks_url)

        # Use the settings of the cfg that was passed in, not the defaults
        # captured by get_url_page when it was defined.
        page_doc = get_url_page(
            popular_stocks_url,
            user_agent=cfg.web.user_agent,
            parser=cfg.web.parser,
        )
        if not page_doc:
            # get_url_page returns "" (and prints the reason) on failure.
            continue

        stocks_symbols.extend(
            symbol_tag.text.strip()
            for symbol_tag in page_doc.find_all("div", {"class": "company-code"})
        )

    # Trim to exactly the requested number of symbols.
    return stocks_symbols[:num_stocks]


In [20]:
def get_name_n_symbol(companyName: str) -> tuple[str, str]:
    """Split a heading such as ``"Apple Inc. (AAPL)"`` into name and ticker.

    The split happens at the LAST opening parenthesis, so company names that
    themselves contain parentheses are kept intact.

    Args:
        companyName: heading text in the form ``"<name> (<symbol>)"``.

    Returns:
        ``(name, symbol)`` with surrounding whitespace removed. When the text
        contains no ``"("`` the whole text is returned as the name and the
        symbol is the empty string.
    """
    name_part, separator, symbol_part = companyName.rpartition("(")
    if not separator:
        # No "(<symbol>)" suffix present at all.
        return companyName.strip(), ""
    return name_part.strip(), symbol_part.strip().rstrip(")").strip()


def get_ticker_details(ticker_symbol: str, cfg=None):
    """Scrape the quote page of one ticker and return its details.

    Args:
        ticker_symbol: symbol appended to ``cfg.web.ticker_url`` to build the
            quote-page URL.
        cfg: configuration providing ``web.ticker_url``, ``web.user_agent`` and
            ``web.parser``; when omitted the notebook-level ``CFG`` is used.

    Returns:
        A dict with the keys ``Company``, ``Symbol``, ``Marketprice``,
        ``previousClosePrice``, ``changeInPrice``, ``pe_ratio``, ``eps_ratio``,
        ``Volume`` and ``MarketCap``. The empty string ``""`` is returned when
        the page could not be fetched or has no ``<h1>`` heading.
        Individual fields that are missing or not numeric become 0
        (``MarketCap`` becomes the string ``"0"``).
    """
    if cfg is None:
        # Resolved at call time so defining this function does not require CFG.
        cfg = CFG

    ticker_url: str = cfg.web.ticker_url + ticker_symbol

    # get html parsed document ("" signals a failed download).
    stock_page_doc: BeautifulSoup | Literal[""] = get_url_page(
        ticker_url, user_agent=cfg.web.user_agent, parser=cfg.web.parser
    )
    if len(stock_page_doc) == 0:
        return ""

    heading = stock_page_doc.h1
    if heading is None:
        # Without the heading neither company name nor symbol can be derived.
        print("No company heading found for {}".format(ticker_symbol))
        return ""
    cName, ticker = get_name_n_symbol(heading.text)

    # CSS class shared by all value cells of the quote summary table.
    summary_cell_class = "Ta(end) Fw(600) Lh(14px)"

    MarketPrice = _tag_text(
        stock_page_doc,
        "fin-streamer",
        {"class": "Fw(b) Fz(36px) Mb(-4px) D(ib)", "data-field": "regularMarketPrice"},
    )
    previousClosePrice = _tag_text(
        stock_page_doc,
        "td",
        {"class": summary_cell_class, "data-test": "PREV_CLOSE-value"},
    )
    Volume = _tag_text(
        stock_page_doc,
        "td",
        {"class": summary_cell_class, "data-test": "TD_VOLUME-value"},
    )
    pe_ratio = _tag_text(
        stock_page_doc,
        "td",
        {"class": summary_cell_class, "data-test": "PE_RATIO-value"},
    )
    eps_ratio = _tag_text(
        stock_page_doc,
        "td",
        {"class": summary_cell_class, "data-test": "EPS_RATIO-value"},
    )
    # Some of the stocks (ex. S&P) do not have a market capital; report "0" then.
    MarketCap = _tag_text(
        stock_page_doc,
        "td",
        {"class": summary_cell_class, "data-test": "MARKET_CAP-value"},
        default="0",
    )

    try:
        volume_value = int(Volume)
    except ValueError:
        # Missing cell or placeholder text such as "N/A".
        volume_value = 0

    ticker_dict = {
        "Company": cName.replace(",", ""),
        "Symbol": ticker,
        "Marketprice": float_or_na(MarketPrice),
        "previousClosePrice": float_or_na(previousClosePrice),
        "changeInPrice": round(
            float_or_na(MarketPrice) - float_or_na(previousClosePrice), 2
        ),
        "pe_ratio": float_or_na(pe_ratio),
        "eps_ratio": float_or_na(eps_ratio),
        "Volume": volume_value,
        "MarketCap": MarketCap,
    }

    # Return Dictionary with stock details
    return ticker_dict


def _tag_text(page_doc, tag_name: str, attrs: dict, default: str = "") -> str:
    """Return the comma-free text of the first matching tag, or ``default`` if none matches."""
    tag = page_doc.find(tag_name, attrs)
    if tag is None:
        return default
    return tag.text.replace(",", "")


In [21]:
def write_csv(dict_items, file_name: str) -> None:
    """Write a list of stock-detail dictionaries to a CSV file.

    The column order is taken from the keys of the first dictionary. Keys
    missing from a later dictionary are written as empty fields, and keys not
    present in the first dictionary are ignored. Entries that are not
    dictionaries (such as the ``""`` placeholder produced for tickers that
    could not be scraped) are skipped.

    Args:
        dict_items: list of dictionaries with stock details.
        file_name: path of the CSV file to create or overwrite.

    Raises:
        ValueError: if ``dict_items`` contains no dictionary at all. The check
            happens before the file is opened, so an existing file is not
            truncated in that case.
    """
    records = [item for item in dict_items if isinstance(item, dict)]
    if not records:
        raise ValueError("No stock records to write to '{}'".format(file_name))

    headers = list(records[0].keys())

    # newline="" is what the csv module expects; UTF-8 matches what
    # pandas.read_csv assumes by default when the file is read back.
    with open(file_name, "w", newline="", encoding="utf-8") as f:
        # csv.writer quotes fields containing commas, quotes or line breaks.
        writer = csv.writer(f, lineterminator="\n")
        writer.writerow(headers)
        for record in records:
            writer.writerow([str(record.get(header, "")) for header in headers])

    print("Writing to file '{}' completed".format(file_name))


In [22]:
def verify_results(file_name: str) -> None:
    """Read the CSV file back and show a short summary of it.

    Prints the number of data rows and displays the first four rows together
    with the headers.

    Args:
        file_name: path of the CSV file to check.
    """
    # Create the dataFrame with the csv file
    stocks_df: pd.DataFrame = pd.read_csv(file_name)

    print("")
    print("Checking Output written to the file")
    print("---------------------------------------")
    # len() counts the rows directly. Indexing the result of count() by
    # position relied on deprecated Series behaviour and needed a second column.
    print("Number of records written to the file : ", len(stocks_df))
    print("")
    # Sample of the first 4 rows in the file along with its headers
    print("Sample Output : ")
    display(stocks_df.head(4))


In [23]:
def scrape_stocks_info(num_stocks: int = 10, cfg=CFG) -> None:
    """Scrape details for the top ``num_stocks`` companies and save them as CSV.

    The output file is named
    ``<num_stocks><cfg.web.output_filename><YYYY-MM-DD>.csv``. After writing,
    the file is read back and summarised by ``verify_results``.

    Args:
        num_stocks: number of companies to process.
        cfg: configuration providing the ``web`` settings.
    """
    print("Start processing Stock symbols...")
    stocks_info = []
    pbar = tqdm(get_stocks(num_stocks=num_stocks, cfg=cfg))
    for ticker_name in pbar:
        pbar.set_description(f"Processing {ticker_name}")
        ticker_details = get_ticker_details(ticker_name)
        # get_ticker_details returns "" for pages it could not fetch; keep
        # only real records so the CSV writer never sees the placeholder.
        if ticker_details:
            stocks_info.append(ticker_details)
    print("End processing Stock symbols...")

    if not stocks_info:
        # Nothing to write; avoid creating and then verifying an empty file.
        print("No stock details could be retrieved; nothing was written.")
        return

    # Pass the list of dictionaries to 'write_csv', which writes it to the file.
    today: datetime.datetime = datetime.datetime.now()
    file_name: str = (
        str(num_stocks) + cfg.web.output_filename + today.strftime("%Y-%m-%d") + ".csv"
    )
    write_csv(stocks_info, file_name)

    # Verify Results:
    verify_results(file_name)


In [24]:
# Run the full scrape for the number of companies set in the composed configuration.
scrape_stocks_info(num_stocks=CFG.web.max_companies, cfg=CFG)


Start processing Stock symbols...
Web Page:  https://companiesmarketcap.com/?page=1/


Processing V: 100%|██████████| 10/10 [00:09<00:00,  1.09it/s]     

End processing Stock symbols...
Writing to file '10_companies_2022-12-31.csv' completed

Checking Output written to the file
---------------------------------------
Number of records written to the file :  10

Sample Output : 





Unnamed: 0,Company,Symbol,Marketprice,previousClosePrice,changeInPrice,pe_ratio,eps_ratio,Volume,MarketCap
0,Apple Inc.,AAPL,129.93,129.61,0.32,21.27,6.11,76874922,2.067T
1,Saudi Arabian Oil Company,2222.SR,32.1,32.2,-0.1,17.83,1.8,5222570,7.059T
2,Microsoft Corporation,MSFT,239.82,241.01,-1.19,25.81,9.29,21938472,1.788T
3,Alphabet Inc.,GOOG,88.73,88.95,-0.22,17.16,5.17,19169187,1.145T
