## Data Collection

##### Import Python packages

In [27]:
import requests
import pandas as pd
import numpy as np
import datetime
import time
from io import StringIO

### Financial Statement Crawler
Income Statement | Balance Sheet | Financial Ratios
#### Statement of one period

In [2]:
def income_statement(year, season):
    """
    Download the income-statement tables of one season from MOPS.

    year:   AD year (e.g. 2019) or national year (e.g. 108); a value of
            1000 or more is treated as AD and reduced by 1911.
    season: season number; it is sent zero-padded to two digits.

    Returns the list of DataFrames parsed from the response, without the
    first table.
    """
    # Transform the AD year input to National year system
    if year >= 1000:
        year -= 1911

    # starting from 102 national year (2013 AD year), IFRSs system is applied
    # before then, was GAAP; each system is served by its own endpoint
    uses_ifrs = year >= 102
    if uses_ifrs:
        url = 'https://mops.twse.com.tw/mops/web/ajax_t163sb04'
    else:
        url = 'https://mops.twse.com.tw/mops/web/ajax_t51sb13'

    r = requests.post(url,
                      {'encodeURIComponent':1,
                       'step':1,
                       'firstin':1,
                       'off':1,
                       'TYPEK':'sii',
                       'year':str(year).zfill(3),
                       'season':str(season).zfill(2)})
    r.encoding = 'utf8'

    # receive a list of DataFrames
    # (wrapped in StringIO: recent pandas deprecates passing literal HTML)
    DFs = pd.read_html(StringIO(r.text))

    # data with GAAP system sometimes returns duplicate columns, i.e. a
    # multi-level header; drop the top level only where one exists
    if not uses_ifrs:
        for table in DFs:
            if table.columns.nlevels > 1:
                table.columns = table.columns.droplevel()

    # the first element is some notification
    return DFs[1:]


In [3]:
def balance_sheet(year, season):
    """
    Download the balance-sheet tables of one season from MOPS.

    year:   AD year (e.g. 2019) or national year (e.g. 108); a value of
            1000 or more is treated as AD and reduced by 1911.
    season: season number; it is sent zero-padded to two digits.

    Returns a list of DataFrames: every table after the first one for
    IFRSs periods, every second table starting at index 1 for GAAP periods.
    """
    # Transform the AD year input to National year system
    if year >= 1000:
        year -= 1911

    # starting from 102 national year (2013 AD year), IFRSs system is applied
    # before then, was GAAP; each system is served by its own endpoint
    uses_ifrs = year >= 102
    if uses_ifrs:
        url = 'https://mops.twse.com.tw/mops/web/ajax_t163sb05'
    else:
        url = 'https://mops.twse.com.tw/mops/web/ajax_t51sb12'

    r = requests.post(url,
                      {'encodeURIComponent':1,
                       'step':1,
                       'firstin':1,
                       'off':1,
                       'TYPEK':'sii',
                       'year':str(year).zfill(3),
                       'season':str(season).zfill(2)})
    r.encoding = 'utf8'

    # receive a list of DataFrames
    # (wrapped in StringIO: recent pandas deprecates passing literal HTML)
    DFs = pd.read_html(StringIO(r.text))

    # index 0 is skipped in both layouts; the GAAP layout additionally keeps
    # only every other table
    return DFs[1:] if uses_ifrs else DFs[1::2]


In [4]:
def financial_rate(year):
    """
    Download the yearly financial-ratio tables from MOPS.

    year: AD year (e.g. 2019) or national year (e.g. 108); a value of
          1000 or more is treated as AD and reduced by 1911.

    Returns the list of DataFrames parsed from the response, without the
    first table.
    """
    url = "https://mops.twse.com.tw/mops/web/ajax_t51sb02"

    # Transform the AD year input to National year system
    if year >= 1000:
        year -= 1911

    # starting from 102 national year (2013 AD year), IFRSs system is applied
    # before then, was GAAP; the IFRSs form carries two extra fields
    if year >= 102:
        form = {
            'encodeURIComponent':1,
            'step':1,
            'run':"Y",
            'firstin':1,
            'off':1,
            'TYPEK':'sii',
            'year':str(year).zfill(3),
            'ifrs':"Y",
            }
    else:
        form = {
            'encodeURIComponent':1,
            'step':1,
            'firstin':1,
            'off':1,
            'TYPEK':'sii',
            'year':str(year).zfill(3),
            }
    r = requests.post(url, form)
    r.encoding = 'utf8'

    # receive a list of DataFrames
    # (wrapped in StringIO: recent pandas deprecates passing literal HTML)
    DFs = pd.read_html(StringIO(r.text))

    # the first element is some notification
    return DFs[1:]


#### Statement for range of periods

In [5]:
def financial_deadline(data, year, season, DFs):
    """
    File one period's statement tables into ``data`` under their release
    deadline.

    data are split into different tables based on industries
    deadlines for releasing financial statement for financial industry are
    different from other industries, so for some seasons particular tables
    are taken out of the list and stored under their own date while the
    remaining tables are concatenated under the general date.

    data:   dict mapping datetime.date to DataFrame; updated in place and
            also returned.
    year:   AD year of the period (compared against 2013).
    season: 1, 2, 3 or 4; any other value leaves ``data`` untouched.
    DFs:    list of DataFrames for the period; it is not modified.
    """
    # work on a copy so that popping tables does not alter the caller's list
    tables = list(DFs)

    if year >= 2013:
        if season == 1:
            data[datetime.date(year, 5, 30)] = tables.pop(-3)
            data[datetime.date(year, 5, 15)] = pd.concat(tables)
        elif season == 2:
            # pop(2) runs first, so -1 refers to the already shortened list
            data[datetime.date(year, 8, 14)] = pd.concat([tables.pop(2), tables.pop(-1)])
            data[datetime.date(year, 8, 31)] = pd.concat(tables)
        elif season == 3:
            data[datetime.date(year, 11, 29)] = tables.pop(-3)
            data[datetime.date(year, 11, 14)] = pd.concat(tables)
        elif season == 4:
            # annual figures are keyed to 31 March of the following year
            data[datetime.date(year+1, 3, 31)] = pd.concat(tables)
    elif year < 2013:
        if season == 1:
            data[datetime.date(year, 5, 15)] = pd.concat(tables)
        elif season == 2:
            data[datetime.date(year, 8, 31)] = tables.pop(-4)
            data[datetime.date(year, 9, 13)] = pd.concat(tables)
        elif season == 3:
            data[datetime.date(year, 11, 14)] = pd.concat(tables)
        elif season == 4:
            data[datetime.date(year+1, 3, 31)] = pd.concat(tables)
    return data

In [26]:
def statementCrawler(end_year,end_quater, start_year, statement_type, allow_continuous_fail_count=1):
    """
    Crawl one kind of statement backwards in time, one period per request.

    The crawl starts at (end_year, end_quater) and moves back one season
    after every successful download (one whole year for "financial_rate")
    until the year drops below start_year.  A failed period is retried;
    ten seconds are slept after every attempt.

    end_year, start_year: AD years; they are used to build datetime.date keys.
    end_quater:     season (1-4) of end_year to start from.
    statement_type: "balance_sheet", "income statement" (the spelling
                    "income_statement" is accepted as well) or
                    "financial_rate".  Any other value prints a message
                    and stops the crawl.
    allow_continuous_fail_count: number of consecutive failures after
                    which the most recent exception is re-raised.

    Returns a dict mapping datetime.date to DataFrame.
    """
    # init variables
    data = {}
    fail_count = 0
    count_period = 0

    n_years = int(start_year)
    year = int(end_year)
    season = int(end_quater)

    while year >= n_years:
        print('parsing', str(year)+str(season).zfill(2))

        try:
            if statement_type == "balance_sheet":
                dfs = balance_sheet(year,season)
                data = financial_deadline(data, year, season, dfs)
            elif statement_type in ("income statement", "income_statement"):
                dfs = income_statement(year,season)
                data = financial_deadline(data, year, season, dfs)
            elif statement_type == "financial_rate":
                dfs = pd.concat(financial_rate(year))
                dfs["財報日期"] = str(year)+"04"
                data[datetime.date(year+1, 3, 31)] = dfs
                # ratios are yearly: pretend season 1 so the step below
                # moves straight to the previous year
                season = 1
            else:
                print("invalid statement input")
                break
        except Exception:
            # Exception (not a bare except) so Ctrl-C still stops the crawl
            print('Failed')
            fail_count += 1
            # >= rather than == so a limit of 0 cannot retry forever
            if fail_count >= allow_continuous_fail_count:
                raise
        else:
            print('success!')
            count_period +=1
            print(count_period)
            fail_count = 0

            # only continues if success
            if season == 1:
                year -= 1
                season = 4
            else:
                season -= 1

        time.sleep(10)
    return data

### Stock Trade Price Crawler

In [28]:
def crawl_price(date):
    """
    Fetch the MI_INDEX (type=ALL) CSV report for one date and return it as
    a DataFrame indexed by '證券代號'.

    date: anything whose str() is the wanted date; dashes are removed
          before it is placed in the query string.

    Only response lines that split into exactly 17 pieces on '",' and do
    not begin with '=' are parsed; spaces are stripped from those lines.
    Thousands separators are removed from '成交金額' and '成交股數'
    (the values stay strings).
    """
    date_token = str(date).replace('-','')
    response = requests.post('http://www.twse.com.tw/exchangeReport/MI_INDEX?response=csv&date=' + date_token + '&type=ALL')

    kept_lines = []
    for line in response.text.split('\n'):
        if len(line.split('",')) != 17:
            continue
        if line[0] == '=':
            continue
        kept_lines.append(line.replace(' ', ''))

    prices = pd.read_csv(StringIO("\n".join(kept_lines)), header=0)
    prices = prices.set_index('證券代號')
    for column in ('成交金額', '成交股數'):
        prices[column] = prices[column].str.replace(',','')
    return prices


In [31]:
def stockpriceCrawler(startDate, endDate=None):
    """
    Crawl daily prices backwards from endDate down to startDate.

    startDate: first day of the range; anything pd.to_datetime can parse
               from its str() form, e.g. "2020/10/15".
    endDate:   last day of the range in the same formats; None (the
               default) means today, evaluated at call time.

    Days on which crawl_price fails are skipped; after 25 consecutive
    failures the most recent exception is re-raised.  Fifteen seconds are
    slept after every attempt.

    Returns a DataFrame indexed by ("日期", "證券代號") with the last
    column dropped, or an empty DataFrame when no day could be collected.
    """
    # init variables
    data = {}
    fail_count = 0
    count_day = 0

    # avoiding long holidays
    allow_continuous_fail_count = 25

    startDate = pd.to_datetime(str(startDate)).date()
    # resolve the default here: a default of date.today() in the signature
    # would be frozen at the moment the function was defined
    if endDate is None:
        endDate = datetime.date.today()
    else:
        endDate = pd.to_datetime(str(endDate)).date()

    while startDate <= endDate:

        print('parsing', endDate)

        try:
            data[endDate] = crawl_price(endDate)
        except Exception:
            # Exception (not a bare except) so Ctrl-C still stops the crawl
            print('fail! check the date is holiday')
            fail_count += 1
            if fail_count >= allow_continuous_fail_count:
                raise
        else:
            print('success!')
            count_day +=1
            print(count_day)
            fail_count = 0

        # backward one day
        endDate -= datetime.timedelta(days=1)
        time.sleep(15)

    # pd.concat raises on an empty mapping (empty range or only holidays)
    if not data:
        return pd.DataFrame()
    return pd.concat(data).rename_axis(["日期","證券代號"]).iloc[:,:-1]

In [32]:
# Trial run: crawl prices from 2020/10/15 up to the default end date
test1 = stockpriceCrawler("2020/10/15")

parsing 2020-10-18
fail! check the date is holiday
parsing 2020-10-17
fail! check the date is holiday
parsing 2020-10-16
success!
1
parsing 2020-10-15
success!
2


## Data Cleansing

In [None]:
# Stack the balance-sheet tables and turn the '--' placeholder into NaN
all_bs = pd.concat(bs_df)
all_bs = all_bs.applymap(lambda cell: np.nan if cell == '--' else cell)
# discard repeated header rows and rows without a company code
all_bs = all_bs[(all_bs['公司代號'] != '公司代號') & all_bs['公司代號'].notnull()]
# name the two index levels, move them into columns, re-index by date and
# company name, then drop the leftover '沒用' column (first one remaining)
all_bs = all_bs.rename_axis(["日期","沒用"]).reset_index()
all_bs = all_bs.set_index(["日期","公司名稱"]).iloc[:,1:]

In [None]:
# Stack the income-statement tables and turn the '--' placeholder into NaN
all_is = pd.concat(is_df)
all_is = all_is.applymap(lambda cell: np.nan if cell == '--' else cell)
# discard repeated header rows and rows without a company code
all_is = all_is[(all_is['公司代號'] != '公司代號') & all_is['公司代號'].notnull()]
# name the two index levels, move them into columns, re-index by date and
# company name, then drop the leftover '沒用' column (first one remaining)
all_is = all_is.rename_axis(["日期","沒用"]).reset_index()
all_is = all_is.set_index(["日期","公司名稱"]).iloc[:,1:]

In [42]:
# Stack the financial-ratio tables and turn the '--' placeholder into NaN
all_fr = pd.concat(testFR)
all_fr = all_fr.applymap(lambda cell: np.nan if cell == '--' else cell)
# discard repeated header rows and rows without a company code
# (the columns are two-level here, hence the tuple key)
all_fr = all_fr[(all_fr[('公司代號','公司代號')] != '公司代號') & all_fr[('公司代號','公司代號')].notnull()]
# name the two index levels, move them into columns, re-index by date and
# company code, then drop the first remaining column
all_fr = all_fr.rename_axis(["日期","沒用"]).reset_index()
all_fr = all_fr.set_index(["日期",('公司代號','公司代號')]).iloc[:,1:]
# keep only the lower header level and give the last column its name
all_fr.columns = all_fr.columns.droplevel(0)
a = all_fr.columns.to_list()
a[-1] = "財報日期"
all_fr.columns = a
all_fr = all_fr.rename_axis(["日期","公司代號"])

In [None]:
# column names for Total Asset were not matched
# checked the sum data amount of all different  Total Asset which matched the total data amount
len(bs_df["資產合計"][~bs_df["資產合計"].isna()])+len(bs_df["資產總計"][~bs_df["資產總計"].isna()])+len(bs_df["資產總額"][~bs_df["資產總額"].isna()])
# combined as a new column (a missing variant contributes 0)
bs_df["總資產"] = bs_df["資產合計"].fillna(0.0)+bs_df["資產總計"].fillna(0.0)+bs_df["資產總額"].fillna(0.0)

# same for the Total Liabilities
len(bs_df["負債合計"][~bs_df["負債合計"].isna()])+len(bs_df["負債總計"][~bs_df["負債總計"].isna()])+len(bs_df["負債總額"][~bs_df["負債總額"].isna()])
bs_df["總負債"] = bs_df["負債合計"].fillna(0.0)+bs_df["負債總計"].fillna(0.0)+bs_df["負債總額"].fillna(0.0)

# same for the Total Equities
len(bs_df["股東權益合計"][~bs_df["股東權益合計"].isna()])+len(bs_df["股東權益總計"][~bs_df["股東權益總計"].isna()])+len(bs_df["權益合計"][~bs_df["權益合計"].isna()])+len(bs_df["權益總額"][~bs_df["權益總額"].isna()])+len(bs_df["權益總計"][~bs_df["權益總計"].isna()])
# the combined column has to be created before it can be read; build it
# from the five variants counted above, the same way as the two totals above
bs_df["總權益"] = (bs_df["股東權益合計"].fillna(0.0)+bs_df["股東權益總計"].fillna(0.0)
                +bs_df["權益合計"].fillna(0.0)+bs_df["權益總額"].fillna(0.0)+bs_df["權益總計"].fillna(0.0))
bs_df["總權益"]

In [None]:
# number of non-null entries across the two net-income column variants
len(is_df["本期淨利（淨損）"][~is_df["本期淨利（淨損）"].isna()])+len(is_df["本期稅後淨利（淨損）"][~is_df["本期稅後淨利（淨損）"].isna()])