Create a function to measure the financial health of a company

In [1]:
import OpenDartReader 
import json

# Read the DART API keys from the shared config file (kept out of the notebook itself).
with open('../../config/config.json', 'r') as json_file:
    config = json.load(json_file)

# Individual keys stay available by name; dart_apis is the rotation pool used by later cells.
dart_api_1 = config['dart_api_1']
dart_api_2 = config['dart_api_2']
dart_api_3 = config['dart_api_3']
dart_apis = [dart_api_1, dart_api_2, dart_api_3]


import pandas as pd
import datetime
import sys, os
sys.path.append(os.path.dirname(os.getcwd()))  
from tools.dictionary import ACCOUNT_NAME_DICTIONARY, BS_ACCOUNTS, IS_ACCOUNTS

def sj_div(account_nm):
    """Classify an account name as a 'BS' or 'IS' item.

    Args:
        account_nm: account name to look up in BS_ACCOUNTS / IS_ACCOUNTS.

    Returns:
        'BS' if the name is in BS_ACCOUNTS, otherwise 'IS' if it is in IS_ACCOUNTS.

    Raises:
        Exception: if the name is in neither collection.
    """
    # BS membership is checked first, so a name present in both is reported as 'BS'.
    if account_nm in BS_ACCOUNTS:
        return 'BS'
    if account_nm in IS_ACCOUNTS:
        return 'IS'
    raise Exception('No BS, IS account exception')

def post_process(rec):
    """Normalize a raw report frame before its amounts are parsed.

    Args:
        rec: DataFrame of report rows; it is modified in place.

    Returns:
        The same ``rec`` object. Empty input is returned untouched.
    """
    if len(rec) == 0:
        return rec

    # in some cases, certain data is '-'; turn those placeholders into missing values
    rec.replace('-', pd.NA, inplace=True)
    # add more logic for post_process if needed
    # ...
    return rec

def collect_financial_reports(dart, code, duration=None): # duration as years
    """Collect annual and quarterly figures for one stock code into a wide table.

    The most recent available report is located by probing up to 8 quarters
    back from the current calendar quarter. Annual ('FY<year>') columns are
    then collected walking backwards year by year, followed by quarterly
    ('<year>_<n>Q') columns. A '<year>_4Q' column is derived when both the
    annual figure and the Q3 report's additional (`*_add_amount`) column are
    available.

    Args:
        dart: client object exposing ``finstate(code, year, reprt_code=...)``
            that returns a DataFrame (an OpenDartReader instance in this notebook).
        code: stock code string.
        duration: number of years to collect; ``None`` or a non-positive value
            means "as far back as data is available".

    Returns:
        ``(record, message)`` where ``message`` is one of ``'success'``,
        ``'Data Not Available'`` or ``'Currency Not in KRW'``. ``record`` is an
        empty DataFrame unless the message is ``'success'``.

    Raises:
        Exception: propagated from ``sj_div`` if an account in
            BS_ACCOUNTS + IS_ACCOUNTS cannot be classified.
    """
    # find initial report
    now = datetime.datetime.now()
    year = now.year
    # Calendar quarter in 1..4. The previous `month//3 + 1` mapped March to 2
    # and December to 5, which is outside the domain the helpers below expect.
    quarter = (now.month - 1)//3 + 1

    # report codes keyed by quarter; key 4 is the report used for the 'FY' columns
    reprt_code_dict = {1: '11013', 2: '11012', 3: '11014', 4: '11011'}
    key_cols = ['stock_code', 'fs_div', 'account_nm']

    def get_prev_quarter(yr, qtr): # qtr in [1, 2, 3, 4]
        if qtr == 1:
            return yr-1, 4
        else:
            return yr, qtr-1

    def get_prev_quarter_except_FY(yr, qtr): # qtr in [1, 2, 3, 4]
        # same as get_prev_quarter but never lands on the annual report (4)
        if qtr == 1:
            return yr-1, 3
        else:
            return yr, qtr-1

    def to_int(amounts):
        # amounts arrive as strings with thousands separators
        return amounts.str.replace(',','').astype('Int64')

    def merge_terms(record, rec, terms):
        # outer merge keeps accounts that are not part of the template
        return pd.merge(record, rec[key_cols + terms], how='outer', on=key_cols)

    def add_fourth_quarter(record, rec, yr, add_amount_col):
        # Q4 for IS rows = full-year figure minus the `*_add_amount` figure of the
        # Q3 report (presumably the year-to-date cumulative amount - TODO confirm);
        # BS rows simply take the year-end (FY) value.
        # NOTE:
        # some sj_div items are neither IS or BS, and None...
        # which leaves, 4Q data as sum of 1-3Q...
        # you may consider correct this in the future
        fy_term = 'FY'+str(yr)
        q4_term = str(yr)+'_4Q'
        rec.loc[rec['sj_div']=='IS', q4_term] = to_int(rec.loc[rec['sj_div']=='IS', add_amount_col])
        record = merge_terms(record, rec, [q4_term])
        record.loc[record['sj_div']=='IS', q4_term] = record.loc[record['sj_div']=='IS', fy_term]-record.loc[record['sj_div']=='IS', q4_term]
        record.loc[record['sj_div']=='BS', q4_term] = record.loc[record['sj_div']=='BS', fy_term]
        return record

    y = year
    q = quarter
    ind = 0

    while True:
        ind += 1
        y, q = get_prev_quarter(y, q)
        rec = dart.finstate(code, y, reprt_code=reprt_code_dict[q])
        if len(rec) > 0:
            break
        if ind == 8: # search initial data within last 8 quarters
            # raise Exception('Data not available for code:', code)
            return pd.DataFrame(), 'Data Not Available'

    # template: one row per (fs_div, account) so every known account appears in the output
    accounts = BS_ACCOUNTS + IS_ACCOUNTS
    template_rows = [
        [code, fs_div, sj_div(account), account]
        for fs_div in ('CFS', 'OFS')
        for account in accounts
    ]
    record = pd.DataFrame(template_rows, columns=['stock_code', 'fs_div', 'sj_div', 'account_nm'])

    # positional access: do not rely on the frame having a default 0-based index
    if rec['currency'].iloc[0] != 'KRW':
        # raise Exception('Currency is not in KRW for code: ', code)
        return pd.DataFrame(), 'Currency Not in KRW'

    y_init = y
    q_init = q

    # data collection method
    # Step 1. collect annual data (and for the last report, collect y-1 and y-2 data)
    # Step 2. collect quarterly data (and if data for the same quarter in the last year), and add 4Q quarterly data column if possible (i.e., if full year data is available)
    #     note that for BS items, no need to collect previous year data in quarterly data collecting (not provided in Dart)

    # Step 1:
    if q_init != 4:
        # latest report is a quarterly one, so the latest annual report is for the year before
        y = y_init-1
    rec = dart.finstate(code, y, reprt_code=reprt_code_dict[4])
    rec = post_process(rec)

    if duration is None or duration <= 0:
        duration = 10000 # a large number enough

    ind = 0
    while len(rec)>0:
        data_term = 'FY'+str(y)
        # print(data_term)
        rec[data_term] = to_int(rec['thstrm_amount'])
        record = merge_terms(record, rec, [data_term])

        ind += 1
        if ind >= duration:
            break

        # check if there is data in prev year
        prev_year_rec = dart.finstate(code, y-1, reprt_code=reprt_code_dict[4])
        prev_year_rec = post_process(prev_year_rec)
        if len(prev_year_rec) == 0:
            # oldest report reached: backfill the two earlier years from its
            # comparative columns, when the report actually carries them
            backfill_terms = []
            for term, source_col in (('FY'+str(y-1), 'frmtrm_amount'), ('FY'+str(y-2), 'bfefrmtrm_amount')):
                if source_col in rec.columns:
                    rec[term] = to_int(rec[source_col])
                    backfill_terms.append(term)
            if backfill_terms:
                record = merge_terms(record, rec, backfill_terms)
            break
        else:
            y = y-1
            rec = prev_year_rec

    # Step 2:
    y = y_init
    q = 3 if q_init == 4 else q_init
    rec = dart.finstate(code, y, reprt_code=reprt_code_dict[q])
    rec = post_process(rec)

    ind = 0
    while len(rec)>0:
        data_term = str(y)+'_'+str(q)+'Q'
        # print(data_term)
        rec[data_term] = to_int(rec['thstrm_amount'])
        record = merge_terms(record, rec, [data_term])

        # adding 4Q data if 'thstrm_add_amount' is available and the FY column exists
        if q == 3 and 'thstrm_add_amount' in rec.columns:
            if 'FY'+str(y) in record.columns:
                record = add_fourth_quarter(record, rec, y, 'thstrm_add_amount')

        ind += 1
        if ind >= duration*3:
            break

        # check if there is data in the prev year, the same quarter
        prev_year_rec = dart.finstate(code, y-1, reprt_code=reprt_code_dict[q])
        prev_year_rec = post_process(prev_year_rec)
        if len(prev_year_rec) == 0 and 'frmtrm_amount' in rec.columns:
            prev_term = str(y-1)+'_'+str(q)+'Q'
            # add only IS data
            rec.loc[rec['sj_div']=='IS', prev_term] = to_int(rec.loc[rec['sj_div']=='IS', 'frmtrm_amount'])
            record = merge_terms(record, rec, [prev_term])

            if q == 3 and 'frmtrm_add_amount' in rec.columns:
                if 'FY'+str(y-1) in record.columns:
                    record = add_fourth_quarter(record, rec, y-1, 'frmtrm_add_amount')

        y, q = get_prev_quarter_except_FY(y, q)
        rec = dart.finstate(code, y, reprt_code=reprt_code_dict[q])
        rec = post_process(rec)

    # post process
    record = record.rename(columns={'stock_code': 'code'})
    # translated account label; unknown names are kept as they are
    record['account'] = record['account_nm'].apply(lambda x: ACCOUNT_NAME_DICTIONARY.get(x, x))
    static_columns = ['code', 'fs_div', 'sj_div', 'account_nm', 'account']
    # identifying columns first, then the period columns in sorted order
    record = pd.concat([record[static_columns], record[record.columns.difference(static_columns)].sort_index(axis=1)], axis=1)

    message = 'success'
    return record, message

In [2]:
import time

# Batch driver: walk every code in the KRX listing, collect its reports and
# checkpoint the accumulated table to a feather file after each success.
df_krx = pd.read_feather('data/df_krx.feather')
sector = df_krx.index

dart_ind = 0
dart = OpenDartReader(dart_apis[dart_ind])

financial_reports = pd.DataFrame()
save_file_name = 'data/financial_reports_upto_'+str(datetime.date.today())+'.feather'
log_fie = 'data/data_collection_log.txt'

# start a fresh log on every run of this cell
with open(log_fie, 'w') as f:
    f.write('Financial data collection log\n')


def log_progress(message):
    """Print a progress line and append it to the collection log file."""
    print(message)
    with open(log_fie, 'a') as f:
        f.write(message+'\n')


current_target_indicator = 0  # position in `sector` of the code being processed
error_trial = 0               # consecutive failures for the current code
error_trial_limit = 10
sleep_time = 5                # seconds between requests; scaled by error_trial on retry

ind = 0
# duration = None
duration = 1
# NOTE: the loop is deliberately disabled (`ind < 0` is never true). To run the
# collection, use the bounded condition in the trailing comment so the loop ends
# after the last code instead of failing with an IndexError.
while ind < 0:# while current_target_indicator < len(sector):
    ind += 1
    # defined before the try so the except handler can always report them
    code = None
    name = ''
    try:
        code = sector[current_target_indicator]
        name = df_krx['Name'][code]
        log_progress(str(datetime.datetime.now()) + ', no: ' + str(current_target_indicator) + ', code ' + code+' in process / '+name)

        if dart.find_corp_code(code) is None:
            log_progress('----> no: ' + str(current_target_indicator) + ', code ' + code+' not in corp_code, and therefore data not available / '+name)
            current_target_indicator += 1
            continue

        record, message = collect_financial_reports(dart, code, duration)
        if message == 'success':
            financial_reports = pd.concat([financial_reports, record], ignore_index=True)
            # checkpoint after every company so an interrupted run keeps its progress
            financial_reports.to_feather(save_file_name)
        elif message == 'Data Not Available':
            log_progress('----> no: ' + str(current_target_indicator) + ', code ' + code+' data not available, could be a financial institution / '+name)
        elif message == 'Currency Not in KRW':
            log_progress('----> no: ' + str(current_target_indicator) + ', code ' + code+' currency not in KRW, skipping / '+name)
        else:
            raise Exception('ERROR in execution loop')

        time.sleep(sleep_time)
        current_target_indicator += 1
        error_trial = 0 # reset

    except Exception as e:
        if error_trial < error_trial_limit:
            error_trial += 1
            # rotate to the next API key; the same code is retried on the next iteration
            dart_ind += 1
            dart = OpenDartReader(dart_apis[dart_ind % len(dart_apis)])

        else:
            # keep the original failure attached as the cause
            raise Exception('ERROR TRIAL LIMIT REACHED - Entire Process Halted') from e
            # break

        log_progress('----> no: ' + str(current_target_indicator) + ', code ' + str(code)+' unknown exception; process suspended and to be re-tried / '+str(name))
        print(e)

        # back off longer after each consecutive failure
        time.sleep(sleep_time*error_trial)

# display(financial_reports)

In [None]:
import pandas as pd
# Reload a previously saved collection snapshot (file name carries its save date)
# together with the KRX listing whose index supplies the codes.
data = pd.read_feather('data/financial_reports_upto_2023-11-01.feather')
df_krx = pd.read_feather('data/df_krx.feather')
sector = df_krx.index
display(data)

In [4]:
import datetime
# Take the timestamp once so both dates come from the same instant.
now = datetime.datetime.today()
today = now.strftime('%Y-%m-%d')
# seven days back; only printed below, the query itself uses `today` for both ends
yes = (now - datetime.timedelta(days = 7)).strftime('%Y-%m-%d')

dart_ind = 0
dart = OpenDartReader(dart_apis[dart_ind])
# 2022-01-01 ~ 2022-01-10: all filings of all companies (3,139 items)
# dart.list(start='2022-01-01', end='2022-01-10')

# 2022-01-01 ~ 2022-01-10: all filings of all companies, including corrected filings (3,587 items)
# dart.list(start='2022-01-01', end='2022-01-10', final=False)

# 2022-01-01 ~ 2022-03-30: periodic reports of all companies (at most 3 months when corp is not specified) (2,352 items)
print(today)
print(yes)
ls = dart.list(start=today, end=today, kind='A', final=False)
display(ls)

# Split today's filings into corrected reports (a) and all others (b).
# Guarded because a day without filings may yield a frame without these columns.
if len(ls) > 0 and 'report_nm' in ls.columns:
    is_correction = ls['report_nm'].str.contains('기재정정', na=False)
    a = ls.loc[is_correction, 'stock_code'].values
    b = ls.loc[~is_correction, 'stock_code'].values
else:
    a = pd.Series(dtype=object).values
    b = pd.Series(dtype=object).values
print(a)
print(b)

# Re-collect the full history for every company that filed a correction today.
records = []
for i in a:
    # filings can come without a usable stock code (presumably unlisted filers); skip those
    if not isinstance(i, str) or not i.strip():
        continue
    record, message = collect_financial_reports(dart, i, duration=None)
    if message == 'success':
        records.append(record)
# concatenate once instead of growing the frame inside the loop
financial_reports = pd.concat(records, ignore_index=True) if records else pd.DataFrame()
display(financial_reports)

2023-11-02
2023-10-26


Unnamed: 0,corp_code,corp_name,stock_code,corp_cls,report_nm,rcept_no,flr_nm,rcept_dt,rm
0,370200,와이오엠,66430,K,[기재정정]반기보고서 (2023.06),20231102000039,와이오엠,20231102,
1,1207716,앱코,129890,K,분기보고서 (2023.09),20231102000033,앱코,20231102,


['066430']
['129890']


Unnamed: 0,code,fs_div,sj_div,account_nm,account,2015_1Q,2015_2Q,2015_3Q,2015_4Q,2016_1Q,...,FY2013,FY2014,FY2015,FY2016,FY2017,FY2018,FY2019,FY2020,FY2021,FY2022
0,66430,CFS,BS,유동자산,liquid_assets,,,,12949023591.0,,...,989778995.0,10818892819.0,12949023591.0,12185872816.0,,,,,,
1,66430,CFS,BS,비유동자산,illiquid_assets,,,,12748666749.0,,...,6310669968.0,12337979428.0,12748666749.0,2557144675.0,,,,,,
2,66430,CFS,BS,자산총계,assets,,,,25697690340.0,,...,7300448963.0,23156872247.0,25697690340.0,14743017491.0,,,,,,
3,66430,CFS,BS,유동부채,liquid_debts,,,,9650957726.0,,...,2492601637.0,14488009165.0,9650957726.0,5495299819.0,,,,,,
4,66430,CFS,BS,비유동부채,illiquid_debts,,,,1786182439.0,,...,426763898.0,2453029576.0,1786182439.0,19804360.0,,,,,,
5,66430,CFS,BS,부채총계,debts,,,,11437140165.0,,...,2919365535.0,16941038741.0,11437140165.0,5515104179.0,,,,,,
6,66430,CFS,BS,자본금,capital_stock,,,,13069303000.0,,...,11902589500.0,6454602000.0,13069303000.0,15689445500.0,,,,,,
7,66430,CFS,BS,이익잉여금,retained_earnings,,,,-21780709678.0,,...,-15557851135.0,-15123101389.0,-21780709678.0,-32303721404.0,,,,,,
8,66430,CFS,BS,자본총계,equity,,,,14260550175.0,,...,4381083428.0,6215833506.0,14260550175.0,9227913312.0,,,,,,
9,66430,CFS,IS,매출액,revenue,,2144278760.0,340607054.0,3510695959.0,,...,5888243057.0,1315118478.0,6725464600.0,14579405827.0,,,,,,


In [14]:
# Column labels of the collected table (not displayed: it is not the cell's last expression).
financial_reports.columns
# Periodic-report filings (kind "A") for code '373220'.
ls = dart.list('373220', kind="A")
# ls = dart.list('373220')
# display(ls)

# finding index...
# Row labels of the rows for code '066430' whose fs_div is 'CFS'.
financial_reports[
    financial_reports['code'].eq('066430') & financial_reports['fs_div'].eq('CFS')
].index


Index([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 27], dtype='int64')