In [1]:
import os
import traceback
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import Select
from webdriver_manager.chrome import ChromeDriverManager
import pandas as pd
import time
import calendar
from datetime import datetime
import datetime
import requests
from bs4 import BeautifulSoup
from multiprocessing.pool import ThreadPool
import multiprocessing

In [2]:
import warnings; warnings.simplefilter('ignore')

In [3]:
path = os.getcwd()
path

'f:\\PracticumProject\\stock-analysis-tool'

# Scraping of Equity Data

In [4]:
def download_equity():
    """
    download the equity file.

    if file already exists, returns None

    security_url = "https://www.bseindia.com/corporates/List_Scrips.aspx"
    
    creates the driver.

    opens the security_url.
    
    Sets Active and Equity fields.
    
    downloads the file.

    """
    
    path = os.path.join(os.getcwd(),"Data")
    security_url = "https://www.bseindia.com/corporates/List_Scrips.aspx"

    if os.path.exists(os.path.join(path,"Equity.csv")):
        print("Equity.csv exists")
        return

    chromeOptions = webdriver.ChromeOptions()
    chromeOptions.add_argument("--headless")
    chromeOptions.add_experimental_option("prefs",{"download.default_directory":path})
    driver = webdriver.Chrome(ChromeDriverManager().install(),options = chromeOptions)
    driver.get(security_url)
    
    # to select Equity
    equity = driver.find_element_by_xpath('//*[@id="ContentPlaceHolder1_ddSegment"]')
    equity = Select(equity)
    equity.select_by_visible_text("Equity")  

    # to select Active
    active = driver.find_element_by_xpath('//*[@id="ContentPlaceHolder1_ddlStatus"]')
    active = Select(active)
    active.select_by_visible_text("Active") 
    
    # to click submit 
    submit = driver.find_element_by_xpath('//*[@id="ContentPlaceHolder1_btnSubmit"]')
    submit.send_keys(Keys.RETURN)

    # to download csv file
    driver.find_element_by_xpath("/html/body/div[1]/form/div[4]/div/div/div[2]/div/div/div[2]/a/i").click()
    time.sleep(3)
    driver.quit()

# Scraping of stock data

In [5]:
def download_stocks(security_id):
    """
    Downloads the Stock data file.
    
    stock_url = "https://www.bseindia.com/markets/equity/EQReports/StockPrcHistori.aspx?flag=0"

    creates the driver.

    opens the stock_url.

    sets the security id.

    if file already exists

        sets the from date by taking the last date from the file.
        sets the to date.
        downloads the file.

    if file doesnt exists

        sets the from date
        sets the to date.
        downloads the file.

    Parameters
    ----------
    security_id : string
        security_id of the company
        
    Returns
    -------
    stock : dataframe

    Methods:
    --------
    create_driver : creates the chrome driver.

    set_to_date : Sets the TO date.

    set_from_date : Sets the FROM date.

    set_security_id : sets the security id.

    download : downloads the file.

    convert_date_to_unix_timestamp : Adds a new Unix Date column to the given dataframe.

    """ 

    path = os.path.join(os.getcwd(),"Data\\Stock")
    stock_url = "https://www.bseindia.com/markets/equity/EQReports/StockPrcHistori.aspx?flag=0"

    def convert_date_to_unix_timestamp(stock_df):
        """
        Adds a new Unix Date column to the given dataframe.

        Parameters
        ----------
        stock_df : dataframe

        Returns
        -------
        stock_df : dataframe
            updated dataframe with a new Unix Date column.
        """
        stock_df["Unix Date"] = stock_df["Date"].apply(lambda x : time.mktime(x.timetuple()))
        return stock_df

    def set_from_date(d,m,y):
        """
        Sets the FROM date.

        Parameters
        ----------
        d : string
            day

        m : string 
            month

        y : string
            year 

        """
        from_date = driver.find_element_by_xpath('//*[@id="ContentPlaceHolder1_txtFromDate"]')
        from_date.clear()
        from_date.click()
        year = driver.find_element_by_xpath('/html/body/div[1]/div/div/select[2]') 
        year = Select(year)
        while year.options[0].text > y:
            year.select_by_visible_text(year.options[0].text) 
            year = driver.find_element_by_xpath('/html/body/div[1]/div/div/select[2]') 
            year = Select(year)

        year.select_by_visible_text(y) 

        month = driver.find_element_by_xpath('/html/body/div[1]/div/div/select[1]') 
        month = Select(month)
        month.select_by_visible_text(m)  

        days=driver.find_element_by_xpath("//table/tbody/tr/td/a[text()="+str(d)+"]")
        days.click()

    def set_to_date(d,m,y):
        """
        Sets the TO date.

        Parameters
        ----------
        d : string
            day

        m : string 
            month

        y : string
            year 

        """
        to_date = driver.find_element_by_xpath('//*[@id="ContentPlaceHolder1_txtToDate"]')
        to_date.clear()
        to_date.click()
        year = driver.find_element_by_xpath('/html/body/div[1]/div/div/select[2]') 
        year = Select(year)
        while year.options[0].text > y:
            year.select_by_visible_text(year.options[0].text) 
            year = driver.find_element_by_xpath('/html/body/div[1]/div/div/select[2]') 
            year = Select(year)

        year.select_by_visible_text(y) 

        month = driver.find_element_by_xpath('/html/body/div[1]/div/div/select[1]') 
        month = Select(month)
        month.select_by_visible_text(m)  

        days=driver.find_element_by_xpath("//table/tbody/tr/td/a[text()="+str(d)+"]")
        days.click()
    

    def set_security_id(security):
        """
        sets the secuirty id to the input field.

        Parameters
        -----------

        security : string
            security id of the company.

        """
        element = driver.find_element_by_xpath('//*[@id="ContentPlaceHolder1_smartSearch"]')
        element.clear()
        element.send_keys(security)
        element.send_keys(Keys.ENTER)

    def download():
        """
        downloads the file.
        """
        submit = driver.find_element_by_xpath('//*[@id="ContentPlaceHolder1_btnSubmit"]')
        submit.click()
        time.sleep(1)
        driver.find_element_by_xpath("/html/body/form/div[4]/div/div/div[1]/div/div[2]/div/div[1]/div[2]/span/a/i").click()
        time.sleep(3)
        driver.quit()

    def create_driver():
        """
        Creates a Chrome Driver.

        Returns
        --------
        driver : driver
            chrome web driver.
        """
        chromeOptions = webdriver.ChromeOptions()
        # chromeOptions.add_argument("--headless")
        chromeOptions.add_experimental_option("prefs",{"download.default_directory":path})
        driver = webdriver.Chrome(ChromeDriverManager().install(), options = chromeOptions)
        return driver

    if os.path.exists(os.path.join(path,str(security_id)+".csv")):
        driver = create_driver()
        driver.get(stock_url)
        old_df = pd.read_csv(os.path.join(path,str(security_id)+".csv"))
        old_df["Date"] = pd.to_datetime(old_df["Date"])
        last = old_df["Date"].head(1)[0]
        set_security_id(str(security_id))

        set_from_date(last.day,calendar.month_abbr[last.month],str(last.year))
        today = datetime.date.today()
        # today = last+datetime.timedelta(365)
        set_to_date(today.day,calendar.month_abbr[today.month],str(today.year))
        download()
        new_df = pd.read_csv(os.path.join(path,str(security_id)+" (1).csv"))
        new_df["Date"] = pd.to_datetime(new_df["Date"],errors="coerce")
        new_df = new_df.drop(columns = ["Unnamed: 13"],axis=1,errors='ignore')
        new_df = new_df.dropna(how='all')
        new_df = convert_date_to_unix_timestamp(new_df)
        res = new_df.append(old_df,ignore_index=True)
        res.to_csv(os.path.join(path,str(security_id)+".csv"),index=None)
        os.remove(os.path.join(path,str(security_id)+" (1).csv"))
        new_df.to_csv(os.path.join(path,"new"+str(security_id)+".csv"),index=None)
        return new_df
    else:
        driver = create_driver()
        driver.get(stock_url)
        set_security_id(str(security_id))
        set_from_date("02","Aug","2007")
        today = datetime.date.today()
        # start = datetime.datetime.strptime("01 Jan 2000","%d %b %Y")
        # today = start+datetime.timedelta(365)
        set_to_date(today.day,calendar.month_abbr[today.month],str(today.year))
        download()
        stock =  pd.read_csv(os.path.join(path,str(security_id)+".csv"))
        stock.Date = pd.to_datetime(stock.Date,errors="coerce")
        stock = stock.drop(columns = ["Unnamed: 13"],axis=1,errors='ignore')
        stock = stock.dropna(how='all')
        stock = convert_date_to_unix_timestamp(stock)
        stock.to_csv(os.path.join(path,str(security_id)+".csv"),index=None)
        return stock

In [6]:
download_stocks("500002")

[WDM] - Current google-chrome version is 90.0.4430
[WDM] - Get LATEST driver version for 90.0.4430


[WDM] - There is no [win32] chromedriver for browser 90.0.4430 in cache
[WDM] - Get LATEST driver version for 90.0.4430
[WDM] - Trying to download new driver from https://chromedriver.storage.googleapis.com/90.0.4430.24/chromedriver_win32.zip
[WDM] - Driver has been saved in cache [C:\Users\VenkataSaiKrishna\.wdm\drivers\chromedriver\win32\90.0.4430.24]


FileNotFoundError: [Errno 2] No such file or directory: 'f:\\PracticumProject\\stock-analysis-tool\\Data\\Stock\\500002 (1).csv'

# Scraping of Corporate Actions Data

In [None]:
def download_corporate_actions(security_id):

    """
    Downloads the corporate actions of the give security id.

    corporate_url = "https://www.bseindia.com/corporates/corporate_act.aspx"

    creates the driver.
    opens the corporate_url.
    sets the from date.
    sets the to date.
    downloads the file.
    replaces the if already downloaded.

    Parameters
    ----------

    security_id : string
        security id of the company.

    Methods:
    --------
    create_driver : creates the chrome driver.

    set_security_id : sets the security id.
    
    set_to_date : Sets the TO date.

    set_from_date : Sets the FROM date.

    download : downloads the file.

    """

    path = os.path.join(os.getcwd(),"Data\\CorporateActions")
    corporate_url = "https://www.bseindia.com/corporates/corporate_act.aspx"

    def set_from_date(d,m,y):
        """
        Sets the FROM date.

        Parameters
        ----------
        d : string
            day

        m : string 
            month

        y : string
            year 

        """
        from_date = driver.find_element_by_xpath('//*[@id="ContentPlaceHolder1_txtDate"]')
        from_date.clear()
        from_date.click()
        year = driver.find_element_by_xpath('/html/body/div[2]/div/div/select[2]') 
        year = Select(year)
        while year.options[0].text > y:
            year.select_by_visible_text(year.options[0].text) 
            year = driver.find_element_by_xpath('/html/body/div[2]/div/div/select[2]') 
            year = Select(year)

        year.select_by_visible_text(y) 

        month = driver.find_element_by_xpath('/html/body/div[2]/div/div/select[1]') 
        month = Select(month)
        month.select_by_visible_text(m)  

        days=driver.find_element_by_xpath("//table/tbody/tr/td/a[text()="+str(d)+"]")
        days.click()

    def set_to_date(d,m,y):
        """
        Sets the TO date.

        Parameters
        ----------
        d : string
            day

        m : string 
            month

        y : string
            year 

        """
        to_date = driver.find_element_by_xpath('//*[@id="ContentPlaceHolder1_txtTodate"]')
        to_date.clear()
        to_date.click()
        year = driver.find_element_by_xpath('/html/body/div[2]/div/div/select[2]') 
        year = Select(year)
        while year.options[0].text > y:
            print(year.options[0].text,y)
            year.select_by_visible_text(year.options[0].text) 
            year = driver.find_element_by_xpath('/html/body/div[2]/div/div/select[2]') 
            year = Select(year)

        year.select_by_visible_text(y) 

        month = driver.find_element_by_xpath('/html/body/div[2]/div/div/select[1]') 
        month = Select(month)
        month.select_by_visible_text(m)  

        days=driver.find_element_by_xpath("//table/tbody/tr/td/a[text()="+str(d)+"]")
        days.click()

    def set_security_id(security):
        """
        sets the secuirty id to the input field.

        Parameters
        -----------

        security : string
            security id of the company.

        """
        element = driver.find_element_by_xpath('//*[@id="ContentPlaceHolder1_SmartSearch_smartSearch"]')
        element.clear()
        element.send_keys(security)
        element.send_keys(Keys.ENTER)
    

    def download():
        """
        downloads the file.
        """
        submit = driver.find_element_by_xpath('//*[@id="ContentPlaceHolder1_btnSubmit"]')
        submit.click()
        time.sleep(1)
        driver.find_element_by_xpath("/html/body/div[1]/form/div[4]/div/div/div[2]/div/div/div[2]/a/i").click()
        time.sleep(4)
        driver.quit()

    def create_driver():
        """
        Creates a Chrome Driver.

        Returns:
        --------
        driver : driver
            chrome web driver.
        """
        chromeOptions = webdriver.ChromeOptions()
        # chromeOptions.add_argument("--headless")
        chromeOptions.add_experimental_option("prefs",{"download.default_directory":path})
        driver = webdriver.Chrome(ChromeDriverManager().install(), options = chromeOptions)
        return driver

    driver = create_driver()
    driver.get(corporate_url)
    set_security_id(str(security_id))
    # set_from_date("01","Jan","1991")
    set_from_date("02","Aug","2007")
    today = datetime.date.today()
    set_to_date(today.day,calendar.month_abbr[today.month],str(today.year))
    download()
    if os.path.exists(os.path.join(path,str(security_id)+".csv")):
        os.remove(os.path.join(path,str(security_id)+".csv"))
    os.rename(os.path.join(path,"Corporate_Actions.csv"),os.path.join(path,str(security_id)+".csv"))
    

# Scraping of Index Data

In [None]:
def download_index():

    """
    Downloads the index data file.

    index_url = "https://www.bseindia.com/indices/IndexArchiveData.html"
    index = "S&P BSE 500"

    creates the driver.

    opens the index_url.

    sets the index.

    if file already exists

        sets the from date by taking the last date from the file.
        sets the to date.
        downloads the file.
        renames the file to Index.csv

    if file doesnt exists

        sets the from date
        sets the to date.
        downloads the file.
        renames the file to Index.csv

    Methods:
    --------
    create_driver : creates the chrome driver.

    set_to_date : Sets the TO date.

    set_from_date : Sets the FROM date.

    set_index : sets the index.

    download : downloads the file.

    """

    
    index_url = "https://www.bseindia.com/indices/IndexArchiveData.html"
    index = "S&P BSE 500"
    path = os.path.join(os.getcwd(),"Data")

    def set_from_date(d,m,y):
        """
        Sets the FROM date.

        Parameters
        ----------
        d : string
            day

        m : string 
            month

        y : string
            year 

        """

        from_date = driver.find_element_by_xpath('//*[@id="txtFromDt"]')
        from_date.clear()
        from_date.click()
        year = driver.find_element_by_xpath('/html/body/div[4]/div/div/select[2]') 
        year = Select(year)
        while year.options[0].text > y:
            year.select_by_visible_text(year.options[0].text) 
            year = driver.find_element_by_xpath('/html/body/div[4]/div/div/select[2]') 
            year = Select(year)

        year.select_by_visible_text(y) 

        month = driver.find_element_by_xpath('/html/body/div[4]/div/div/select[1]') 
        month = Select(month)
        month.select_by_visible_text(m)  

        days=driver.find_element_by_xpath("//table/tbody/tr/td/a[text()="+str(d)+"]")
        days.click()

    def set_to_date(d,m,y):
        """
        Sets the TO date.

        Parameters
        ----------
        d : string
            day

        m : string 
            month

        y : string
            year 

        """

        to_date = driver.find_element_by_xpath('//*[@id="txtToDt"]')
        to_date.clear()
        to_date.click()
        year = driver.find_element_by_xpath('/html/body/div[4]/div/div/select[2]') 
        year = Select(year)
        while year.options[0].text > y:
            print(year.options[0].text,y)
            year.select_by_visible_text(year.options[0].text) 
            year = driver.find_element_by_xpath('/html/body/div[4]/div/div/select[2]') 
            year = Select(year)

        year.select_by_visible_text(y) 

        month = driver.find_element_by_xpath('/html/body/div[4]/div/div/select[1]') 
        month = Select(month)
        month.select_by_visible_text(m)  

        days=driver.find_element_by_xpath("//table/tbody/tr/td/a[text()="+str(d)+"]")
        days.click()


    def set_index(index_):
        """
        Sets the index field.

        Parameters
        ----------
        index_ : string
            index value
        """

        indexes = driver.find_element_by_xpath('//*[@id="ddlIndex"]')
        indexes = Select(indexes)
        indexes.select_by_visible_text(index_)  

    def download():
        """
        downloads the file.
        """
        submit = driver.find_element_by_xpath('/html/body/div[2]/div/div[2]/div[5]/div/input')
        submit.click()
        time.sleep(1)
        driver.find_element_by_xpath("/html/body/div[2]/div/div[1]/div/div[1]/div[2]/i").click()
        time.sleep(3)
        driver.quit()

    def create_driver():
        """
        Creates a Chrome Driver.

        Returns
        --------
        driver : driver
            chrome web driver.
        """
        chromeOptions = webdriver.ChromeOptions()
        chromeOptions.add_argument("--headless")
        chromeOptions.add_experimental_option("prefs",{"download.default_directory":path})
        driver = webdriver.Chrome(ChromeDriverManager().install(), options = chromeOptions)
        return driver

    driver = create_driver()
    driver.get(index_url)
    set_index("S&P BSE 500")
    set_from_date("2","Aug","2007")
    today = datetime.date.today()
    set_to_date(today.day,calendar.month_abbr[today.month],str(today.year))
    download()
    res = pd.read_csv(os.path.join(path,"CSVForDate.csv"),names=["Date","Open","High","Low","Close"])
    res = res.iloc[1:]
    res["Date"] = pd.to_datetime(res["Date"])
    os.remove(os.path.join(path,"CSVForDate.csv"))
    res.to_csv(os.path.join(path,"Index.csv"),index=None)

# Scraping Risk Free Rate Data

In [None]:

def download_risk_free_rate():
    """
    Downloads the Risk Free Rate file.

    risk_free_rate_url = "https://www.treasury.gov/resource-center/data-chart-center/interest-rates/pages/textview.aspx?data=yield"

    creates the driver.
    opens the risk_free_rate_url.
    downloads the file.

    Methods:
    --------

    create_driver : creates the chrome driver.

    download : extracts the data from the page and saves to a csv file.

    """

    path = os.path.join(os.getcwd(),"Data")
    risk_free_rate_url = "https://www.treasury.gov/resource-center/data-chart-center/interest-rates/pages/textview.aspx?data=yield"

    def create_driver():
        """
        Creates a Chrome Driver.

        Returns
        --------
        driver : driver
            chrome web driver.
        """
        chromeOptions = webdriver.ChromeOptions()
        chromeOptions.add_argument("--headless")
        chromeOptions.add_experimental_option("prefs",{"download.default_directory":path})
        driver = webdriver.Chrome(ChromeDriverManager().install(), options = chromeOptions)
        return driver

    def download():
        """
        downloads the file.
        """
        
        ele = driver.find_element_by_xpath('//*[@id="interestRateTimePeriod"]')
        ele = Select(ele)
        ele.select_by_visible_text("All") 

        btn = driver.find_element_by_xpath('/html/body/form/div[8]/div/div[1]/div/div[2]/div/div/div/div[1]/div[2]/div/table/tbody/tr/td/div/div[3]/div[2]/input')
        btn.click()
        time.sleep(15)
        soup = BeautifulSoup(driver.page_source,'html')
        driver.quit()
        table = soup.find_all("table", {"class": "t-chart"})
        risk_free_rate = pd.read_html(str(table))[0]
        risk_free["Date"] = pd.to_datetime(risk_free["Date"])
        risk_free_rate.to_csv(os.path.join(path,"inRiskFreeRate.csv"),index=None)
        risk_free = risk_free_rate[["Date","3 mo"]]
        risk_free_rate["Rate"] = risk_free_rate["3 mo"]
        risk_free.columns = ["Date","Rate"]
        risk_free["Date"] = pd.to_datetime(risk_free["Date"])
        risk_free.dropna(inplace=True)
        risk_free.to_csv(os.path.join(path,"RiskFreeRate.csv"),index=None)
    driver = create_driver()
    driver.get(risk_free_rate_url)
    download()

# Scraping of Revenue Profit Data

In [None]:
def download_revenue_profit(code,name): 
    """
    Creates the revenue profit file.

    Parameters
    ----------
    code : string
        security code of the company.
    name : 
        security id of the company.

    Methods:
    --------

    create_driver : creates the chrome driver.

    download : extracts the data from the page and saves to a csv file.

    """

    path = os.path.join(os.getcwd(),"Data\\Revenue")

    def create_driver():
        """
        Creates a Chrome Driver.

        Returns
        --------
        driver : driver
            chrome web driver.
        """
        chromeOptions = webdriver.ChromeOptions()
        # chromeOptions.add_argument("--headless")
        chromeOptions.add_experimental_option("prefs",{"download.default_directory":path})
        driver = webdriver.Chrome(ChromeDriverManager().install(), options = chromeOptions)
        return driver

    def download():
        """
        downloads the file.
        """
        columns = ["security code","security name",'revenue','income','expenditure','profit','eps',"year","quartile"]
        code_df = pd.DataFrame(columns=columns)
        for q in range(55,108):
            url = "https://www.bseindia.com/corporates/results.aspx?Code=" + str(code) +"&Company="+ str(name) +"&qtr="+ str(q) +"&RType=D"
            driver.get(url)
            html = driver.page_source
            soup = BeautifulSoup(html, "html")

            table = soup.find_all("table",attrs={"id":"ContentPlaceHolder1_tbl_typeID"})
            table = pd.read_html(str(table))[0]
            table = table[[0,1]]
            table.dropna(inplace=True)
            table = table.transpose()
            table.columns = table.iloc[0]
            table = table[1:]
            table.columns = map(str.lower, table.columns)
            table.drop(["description"],inplace=True,axis=1)
            try:
                table["date begin"] = pd.to_datetime(table["date begin"])
                date = table.iloc[0]["date begin"]
                table["quartile"] =  (date.month-1)//3 + 1
                table["year"] = date.year
                table["security name"] = name
                table["security code"] = code
                cols = table.columns
                mycols = ['revenue','income','expenditure','profit','eps']
                row = {}
                row["security name"] = name
                row["security code"] = code
                row["year"] = date.year
                row["quartile"] = (date.month-1)//3 + 1
                for my in mycols:
                    try:
                        res = [c for c in cols if my in c]
                        if my == "income":
                            p = [c for c in res if "total income" == c ] 
                            res = p or res
                        elif my == "profit":
                            p = [c for c in res if "net profit" == c]
                            res = p or res
                        elif my == "expenditure":
                            p = [c for c in cols if "expenses" in c]
                            res = p or res
                        elif my =="eps":
                            a = "Basic for discontinued & continuing operation"
                            b = "Diluted for discontinued & continuing operation"
                            p = [c for c in cols if a.lower() in c or b.lower() in c]
                            res = p or res
                        elif my == "revenue":
                            p = [c for c in cols if "sales" in c]
                            res = p or res
                            # row["revenue"] = table[res].values[0][0]
                            # continue
                            pass
                        row[my] = table[res].values[0][0]
                    except :
                        row[my] = ""
                        traceback.print_exc()
                code_df = code_df.append(row,ignore_index=True)
            except Exception as e:
                traceback.print_exc()
        code_df.to_csv(os.path.join(path,str(code)+".csv"),index=None)
    driver = create_driver()
    download()
    driver.quit()

In [None]:
my = pd.read_csv("my.csv")

In [None]:
download_risk_free_rate()

In [None]:
%%time
for _,row in my.iterrows():
    try:
        security_id = row["Security Code"]
        name = row["Security Id"]
        print(security_id,name)
        download_stocks(security_id)
        download_revenue_profit(security_id,name)
        download_corporate_actions(security_id)
    except Exception as e:
        traceback.print_exc()