# Indonesia Stock Exchange

This will scrape two Indonesia Stock Exchange websites to categorise their announcements by financial event type and convert the contents of their PDFs to a table format.

* idx.co.id
* ksei.co.id

## Libraries

In [None]:
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from time import sleep
from selenium.webdriver.support.ui import Select
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import pyautogui
import ctypes
import PyPDF2
from pathlib import Path

In [None]:
import pandas as pd
pd.options.mode.chained_assignment = None # filter SettingWithCopyWarning
from pandas.tseries.offsets import BDay
from datetime import datetime as dt
from datetime import timedelta
import re
import warnings
warnings.filterwarnings("ignore", category=UserWarning) # filter UserWarningSettingWithCopyWarning
import requests
from base64 import b64encode
import json
import os
from pytz import timezone

## Function 1 - run_table

Get announcements from idx.co.id and filter & group them based on the keywords list.

In [1]:
def run_table(mydate, checks = 'all'):
    if mydate == "today":
        mydate = dt.today().strftime("%Y%m%d")
    elif mydate == 'yesterday':
        mydate = (dt.today() - BDay(1)).strftime("%Y%m%d")
    else:
        mydate = dt.strptime(mydate, "%d%m%Y").strftime("%Y%m%d")

    # to count the total to check
    total = 0
    # collect history
    history = []
        
    if checks == 'all':
        with pd.ExcelFile("./backend/indonesia_keys.xlsx") as keyfile:
            checks = pd.read_excel(keyfile, 'Checks', header = None)[0].tolist()
            keywords = "|".join(pd.read_excel(keyfile, 'Keywords', header = None)[0].tolist())
            ignore = "|".join(pd.read_excel(keyfile, 'Ignore', header = None)[0].tolist())
    else:
        # since we wont be using the default keylist and inserting from the user's given list
        with pd.ExcelFile("./backend/indonesia_keys.xlsx") as keyfile:
            keywords = "|".join(pd.read_excel(keyfile, 'Keywords', header = None)[0].tolist())
            ignore = "|".join(pd.read_excel(keyfile, 'Ignore', header = None)[0].tolist())

    # - Exporting as multiple sheets within a single excel
    workbook = pd.ExcelWriter(f'output/Indonesia_Auto_{mydate}.xlsx', engine='xlsxwriter')

    for ori_check in checks:
        check = ori_check.strip().replace(" ", "+")
        # set link 
        the_link = f"https://www.idx.co.id/primary/NewsAnnouncement/GetAllAnnouncement?keywords={check}&pageNumber=1&pageSize=2000&dateFrom={mydate}&dateTo={mydate}&lang=id"

        # request page with selenium
        chrome_options = Options()
        chrome_options.headless = True

        # in case website error
        active = True

        while active:

            dr = webdriver.Chrome(options=chrome_options)
            dr.get(the_link)

            sleep(5)

            # Read HTML File
            soup = BeautifulSoup(dr.page_source, "html.parser")

            #dr.quit()

            if ("Verify you are human" not in soup.find_all('body')[0].text) and ("blocked" not in soup.find_all('body')[0].text) and ("Backend fetch failed" not in soup.find_all('body')[0].text) and (len(soup.find_all('body')) != 0):
                active = False

        # get table
        the_table = json.loads(soup.find_all('body')[0].text)['Items']
        the_table = pd.DataFrame(the_table)

        if len(the_table) != 0:
            # change name of columns
            col_name = {'Code':'symbol', 'PublishDate':'datetime', 'Title':'headline', 'Attachments':'det_desc'}
            the_table = the_table.rename(columns = col_name)

            # select relevant columns
            the_table = the_table[['datetime', 'symbol', 'headline', 'det_desc']]

            # change format of urls
            for each in range(0, len(the_table)):
                temp_links = ""
                for i, link in enumerate(the_table.loc[each, 'det_desc']):
                    if i == 0 and (link['FullSavePath'] not in history):
                        the_table.loc[each, 'main_link'] = link['FullSavePath']
                        history.append(link['FullSavePath'])
                    elif i == 0 and (link['FullSavePath'] in history):
                        the_table.loc[each, 'main_link'] = "remove"
                    else:
                        temp_links += link['FullSavePath']
                        temp_links += "\n"

                    the_table.loc[each, 'other_links'] = temp_links

            # select relevant columns
            the_table = the_table[['datetime', 'symbol', 'headline', 'main_link', 'other_links']]
                    
            # trim whitespaces from symbol column
            the_table['symbol'] = the_table['symbol'].str.strip()
            
            # remove those with no main link
            the_table = the_table.loc[the_table['main_link'] != "remove"].reset_index(drop = True)

            # get only those that contains keywords
            the_table = the_table.loc[the_table['headline'].str.count(keywords, flags=re.IGNORECASE) != 0].reset_index(drop = True)

            # get rid those with ignore
            the_table = the_table.loc[the_table['headline'].str.count(ignore, flags=re.IGNORECASE) == 0].reset_index(drop = True)

            if len(the_table) != 0:

                the_table.to_excel(workbook, sheet_name=f'{ori_check[:30]}', index=False)
                total += len(the_table)
                print(f"~ {ori_check}: {len(the_table)}")

    dr.quit()

    workbook.close()
    
    print(f"Total checks: {total}")


## Function 2 - get_ksei

For KSEI, we just need to get the meetings announcement and create a template to ingest.

In [None]:
def get_ksei(mydate):

    data = pd.DataFrame()

    date = mydate.strftime('%m')
    year = mydate.strftime('%Y')

    the_link = f"https://www.ksei.co.id/publications/corporate-action-schedules/meeting-announcement?Month={date}&Year={year}"

    # request page with selenium
    chrome_options = Options()
    chrome_options.headless = True

    dr = webdriver.Chrome(options=chrome_options)
    dr.maximize_window()
    dr.get(the_link)

    soup = BeautifulSoup(dr.page_source, "html.parser")

    dr.close()

    for tr in soup.find_all('table')[0].find_all('tr')[1:]:
        link = "https://www.ksei.co.id/" + tr.find_all('td')[0].a['href']
        date = tr.find_all('td')[2].text
        the_list = [[link, date]]
        data = pd.concat([data, pd.DataFrame(the_list, columns=['link', 'date'])], ignore_index = True)

    # filter data
    # Translate
    # lower all text
    data['date'] = data['date'].str.lower()
    # change date language
    # replace month from indonesian to english
    ind_eng_months = {"januari":"January",
                     "februari":"February",
                     "maret":"March",
                     #"April":"April",
                     "mei":"May",
                     "juni":"June",
                     "JUNI":"June",
                     "juli":"July",
                     "agustus":"August",
                     #"September":"September",
                     "oktober":"October",
                     #"November":"November",
                     "desember":"December"}

    for key, value in ind_eng_months.items():
        # Replace key character with value character in string
        try:
            if len(data) != 0:
                data['date'] = data['date'].str.replace(key, value)

        except AttributeError:
            break

    # Convert Dates
    if len(data) != 0:
        data['date'] = pd.to_datetime(data['date'], format = 'mixed', dayfirst = True)
        #filter
        data = data.loc[data['date'].dt.date == mydate.date()].reset_index(drop = True)

    links = data['link'].tolist()

    return links

## Function 3 - get_docs

In [None]:
def get_docs(issued_capital, idx_rups, ksei_rups, download_addr):

    if len(issued_capital) != 0:

        for link in issued_capital:
            # request page with selenium
            chrome_options = Options()
            chrome_options.headless = True

            #driver = webdriver.Chrome(options=chrome_options)
            # in case website error
            active = True

            while active:
                driver = webdriver.Edge()
                driver.get(link)

                time.sleep(3)
                
                # Read HTML File
                soup = BeautifulSoup(driver.page_source, "html.parser")
                #print(soup)
                
                if ("Verify you are human" not in soup.find_all('body')[0].text) and ("blocked" not in soup.find_all('body')[0].text) and ("Backend fetch failed" not in soup.find_all('body')[0].text) and (len(soup.find_all('body')) != 0):
                    active = False

                    pyautogui.hotkey('ctrl', 's')
                    time.sleep(2)
                    path = os.path.abspath(os.getcwd())+"\\data\\Issued Capital"
                    path_and_filename = path+"\\"+link[-25:]
                    #print(path_and_filename)
                    pyautogui.typewrite(path_and_filename)
                    pyautogui.press('enter')
                    while any(filename.endswith("tmp") for filename in os.listdir(download_addr)) or any(filename.endswith("tmp") for filename in os.listdir(path)):
                        sleep(1)

        driver.close()

    if len(idx_rups) != 0:

        for link in idx_rups:
            # request page with selenium
            chrome_options = Options()
            chrome_options.headless = True

            #driver = webdriver.Chrome(options=chrome_options)
            # in case website error
            active = True

            while active:
                driver = webdriver.Edge()
                driver.get(link)

                time.sleep(3)
                
                # Read HTML File
                soup = BeautifulSoup(driver.page_source, "html.parser")
                
                if ("Verify you are human" not in soup.find_all('body')[0].text) and ("blocked" not in soup.find_all('body')[0].text) and ("Backend fetch failed" not in soup.find_all('body')[0].text) and (len(soup.find_all('body')) != 0):
                    active = False

                    pyautogui.hotkey('ctrl', 's')
                    time.sleep(2)
                    path = os.path.abspath(os.getcwd())+"\\data\\IDX Meetings"
                    path_and_filename = path+"\\"+link[-25:]
                    #print(path_and_filename)
                    pyautogui.typewrite(path_and_filename)
                    pyautogui.press('enter')
                    while any(filename.endswith("tmp") for filename in os.listdir(download_addr)) or any(filename.endswith("tmp") for filename in os.listdir(path)):
                        sleep(1)

        driver.close()

    if len(ksei_rups) != 0:

        for link in ksei_rups:
            # request page with selenium
            chrome_options = Options()
            chrome_options.headless = True

            #driver = webdriver.Chrome(options=chrome_options)
            # in case website error
            active = True

            while active:
                driver = webdriver.Edge()
                driver.get(link)

                time.sleep(3)
                
                # Read HTML File
                soup = BeautifulSoup(driver.page_source, "html.parser")
                
                if ("Verify you are human" not in soup.find_all('body')[0].text) and ("blocked" not in soup.find_all('body')[0].text) and ("Backend fetch failed" not in soup.find_all('body')[0].text) and (len(soup.find_all('body')) != 0):
                    active = False

                    pyautogui.hotkey('ctrl', 's')
                    time.sleep(2)
                    path = os.path.abspath(os.getcwd())+"\\data\\KSEI Meetings"
                    path_and_filename = path+"\\"+link[-37:]
                    pyautogui.typewrite(path_and_filename)
                    pyautogui.press('enter')
                    while any(filename.endswith("tmp") for filename in os.listdir(download_addr)) or any(filename.endswith("tmp") for filename in os.listdir(path)):
                        sleep(1)

        driver.close()
        
    print("Acquired Documents")

## Function 4 - conn_sql_inc

In [None]:
# get ID and issued capital details
def conn_sql_isc(sql_username, sql_password, hostname = 'examplehost', portno = '0000', servicename = 'exampleservice'):
    
    # surpress warning
    warnings.filterwarnings("ignore", category=UserWarning)
    
    # connect to SQL
    dsn_tns = oracle.makedsn(hostname, portno, service_name=servicename) 
    conn = oracle.connect(user=sql_username, password=sql_password, dsn=dsn_tns) 
    
    print("Username & Password Accepted")
    print("Generating Table")
    
    c = conn.cursor()
    
    query = """
    SELECT *
    FROM random_table
    """ 

    isscap_list = pd.read_sql_query(query, con=conn)
    conn.close()

    return isscap_list

## Function 5 - gettable

In [None]:
def gettable():
    # folder path
    dir_path = os.path.join(os.getcwd(), "data/Issued Capital")

    # list to store files
    res = []

    # Iterate directory
    for path in os.listdir(dir_path):
        # check if current path is a file
        if os.path.isfile(os.path.join(dir_path, path)):
            res.append(path)

    # create empty dataframe
    data = pd.DataFrame()

    if len(res) != 0:
        try:
            for filename in res:
                the_file = open(f"./data/Issued Capital/{filename}", 'rb')
                the_pdf = PyPDF2.PdfReader(the_file)

                #print(filename)

                for i in range(0, len(the_pdf.pages)):
                    #print(i)
                    the_page = the_pdf.pages[i].extract_text().split('\n')

                    if i == 0 and ('Pencatatan Saham' == (the_page[1]).strip()):
                        # Increase and Decrease

                        try:
                            the_dict = {'company': the_page[0],
                                        'code': re.findall(r"\((\S+)\)", the_page[0])[0], 
                                        'isc_change' : re.findall(r"([\d\.]+)", the_page[6])[0].replace(".", ""),
                                        'isc_total' : re.findall(r"([\d\.]+)", the_page[7])[0].replace(".", ""),
                                        'isc_date': the_page[8].replace("Tanggal Pencatatan dan Perdagangan","").strip(), } # Increase
                            data = pd.concat([data, pd.DataFrame([the_dict])], ignore_index = True)

                            the_dict = {'company': the_page[0],
                                        'code': re.findall(r"^([\S\-]+) [\d\,]+ [\d\,]+", the_page[11])[0], 
                                        'isc_change' : "-"+re.findall(r"^[\S\-]+ ([\d\,]+) [\d\,]+", the_page[11])[0].replace(",", ""),
                                        'isc_total' : re.findall(r"^[\S\-]+ [\d\,]+ ([\d\,]+)", the_page[11])[0].replace(",", ""),
                                        'isc_date': the_page[8].replace("Tanggal Pencatatan dan Perdagangan","").strip(), } # Decrease
                            data = pd.concat([data, pd.DataFrame([the_dict])], ignore_index = True)
                        except:
                            # limitation - other files that are not simple increase decrease from conversion updated manually
                            t = re.findall(r"\((\S+)\)", the_page[0])[0]
                            print(f"~ Manual Entry {t} - {filename}")

                    elif i == 0 and ('Pencatatan Saham' == (the_page[2]).strip()):
                        # PUB
                        break
                    elif i == 0 and ('Pencatatan Saham' != (the_page[1]).strip()):
                        break

                the_file.close()
            
        except:
            print(f"Error in {filename}")
        
        # lower all string
        data['isc_date'] = data['isc_date'].str.lower()

        # change date language
        # replace month from indonesian to english
        ind_eng_months = {"januari":"January",
                         "februari":"February",
                         "maret":"March",
                         #"April":"April",
                         "mei":"May",
                         "juni":"June",
                         "JUNI":"June",
                         "juli":"July",
                         "agustus":"August",
                         #"September":"September",
                         "oktober":"October",
                         #"November":"November",
                         "desember":"December"}

        for key, value in ind_eng_months.items():
            # Replace key character with value character in string
            try:
                data['isc_date'] = data['isc_date'].str.replace(key, value)
            except AttributeError:
                break

        # set datetime
        # get datetime object for indo
        indodate = dt.now(timezone("Asia/Jakarta"))
        # get today's date for issued capital
        data['date'] = indodate.strftime("%d/%m/%Y")
        # set indo time -20 mins
        data['time'] = (indodate - timedelta(minutes = 20)).strftime("%H%M")
    
    return data

## Define Main Function

In [None]:
def indonesia_se(mydate, checks, sql_username, sql_password, docs_only = False):

    print("_"*50)
    isc_sql = conn_sql_isc(sql_username, sql_password)

    # get download address
    the_path = os.getcwd()
    while "\\Parent Folder" in the_path:
        the_path = os.path.split(the_path)[0]

    download_addr = the_path + "\\Downloads"
    
    if docs_only == False:
        run_table(mydate, checks)
        
    if mydate == "today":
        mydate = dt.today()
    elif mydate == 'yesterday':
        mydate = (dt.today() - BDay(1))
    else:
        mydate = dt.strptime(mydate, "%d%m%Y")
            
    if docs_only == False:
        # remove docs in file
        folder_names = ['IDX Meetings', 'Issued Capital', 'KSEI Meetings']
        for folder in folder_names:
            for file in os.listdir(f"./data/{folder}/"):
                os.remove(f"./data/{folder}/" + file)
        
        doc_date = mydate.strftime("%Y%m%d")
        with pd.ExcelFile(f'output/Indonesia_Auto_{doc_date}.xlsx') as excel_file:
            if 'Pencatatan Saham  ' in excel_file.sheet_names:
                issued_capital = pd.read_excel(excel_file, 'Pencatatan Saham  ', header = 0)['main_link'].tolist()
            else:
                issued_capital = []
            if 'Rapat Umum' in excel_file.sheet_names:
                idx_rups = pd.read_excel(excel_file, 'Rapat Umum', header = 0)['main_link'].tolist()
            else:
                idx_rups = []

        ksei_rups = get_ksei(mydate)

        print(f"KSEI Meetings: {len(ksei_rups)}")
        ctypes.windll.user32.MessageBoxW(None, "Please step away from the computer so we can get documents", "Indonesia Auto Validation", 1)

        print("_"*50)
        get_docs(issued_capital, idx_rups, ksei_rups, download_addr)
        
    print("Creating Bulks")
    thedate = mydate.strftime("%Y%m%d")
    # check if today's compare file exists
    whole_path = Path(f"./output/Indonesia_Auto_{thedate}.xlsx")

    if whole_path.exists():
        with pd.ExcelWriter(f"./output/Indonesia_Auto_{thedate}.xlsx", mode="a", engine="openpyxl", if_sheet_exists="overlay") as writer:
            guide = pd.DataFrame(['Manual Updates are all changes and cancellation', 'Cancellation does not have Record Date', 'There is no cancellation cases for IDX', 'Those that have no ID or ID is code means its not found'])
            guide.to_excel(writer, sheet_name='Guidance', index=False, header = None)
    else:
        workbook = pd.ExcelWriter(f'./output/Indonesia_Auto_{thedate}.xlsx', engine='xlsxwriter')
        guide = pd.DataFrame(['Manual Updates are all changes and cancellation', 'Cancellation does not have Record Date', 'There is no cancellation cases for IDX', 'Those that have no ID or ID is code means its not found'])
        guide.to_excel(workbook, sheet_name='Guidance', index=False, header = None)
        workbook.close()

    print("Process complete")