# 재무제표 크롤링


## 기업정보 불러오기

In [2]:
import pandas as pd

import warnings 
warnings.filterwarnings("ignore")


from tqdm import tqdm_notebook

In [2]:
# 고유번호 불러오기
code = pd.read_csv("고유번호.csv", index_col=0, dtype = {'corp_code':'str', 'stock_code':'str', 'modify_date':'str'})

In [3]:
# ESG 기업 목록 불러오기
corp_esg = pd.read_csv('esg기업 목록.csv', encoding='utf-8')
corp_esg['기업코드'] = corp_esg.기업코드.str.replace("A", "")
corp_esg.columns = ['corp_name', 'stock_code']

In [4]:
# 고유번호와 ESG 기업목록 합치기
codes = pd.merge(code, corp_esg, how='right', on='stock_code')

code_api = codes[codes.corp_code.notnull()]
code_api.drop('corp_name_y', axis=1, inplace=True)
code_api.rename(columns={'corp_name_x':'corp_name'}, inplace=True)

code_sel = codes[codes.corp_code.isnull()]
code_sel = code_sel[['corp_name_y', 'stock_code']]
code_sel.rename(columns={'corp_name_y':'corp_name'}, inplace=True)

In [6]:
display(code_api.head(3))

display(code_sel.head(3))

Unnamed: 0,corp_code,corp_name,stock_code,modify_date
0,159698,지역난방공사,71320,20210127
1,458234,아시아나IDT,267850,20220114
2,164706,현대약품,4310,20220114


Unnamed: 0,corp_name,stock_code
967,BNK캐피탈,801190
968,DB생명보험,818158
969,DGB생명보험,813363


## open API 사용

- 2015년까지의 데이터만 불러올 수 있음

In [5]:
import requests
from bs4 import BeautifulSoup as bs

In [6]:
# 불러올 정보 세팅
crtfc_key = '93a471927e5df8c8dd171591b9b80e38f24039fe'
corp_code = ','.join(code_api['corp_code'].values[:100])
bsns_year = '2015'
reprt_code = '11011'

url = f'https://opendart.fss.or.kr/api/fnlttMultiAcnt.json?crtfc_key={crtfc_key}&corp_code={corp_code}&bsns_year={bsns_year}&reprt_code={reprt_code}'
soup_json = requests.get(url)
json_obj = json.loads(soup_json.text)

In [251]:
# json을 데이터프레임으로 바꾸기
df = pd.DataFrame(json_obj['list'])

# 찾고자 하는 계정만 불러오기
df = df[df.account_nm.str.contains('자산총계|부채총계|당기순|자본총계')].drop_duplicates(['corp_code', 'account_nm'], keep = 'first')

# 기준 연도 정하기
year1 = int(df.thstrm_dt.str[0:4].values[0])

# 데이터 정리
df = df[['corp_code', 'account_nm', 'thstrm_amount', 'frmtrm_amount', 'bfefrmtrm_amount']]
df['thstrm_amount'] = df['thstrm_amount'].str.replace(',', '').replace("-", '')
df['frmtrm_amount'] = df['frmtrm_amount'].str.replace(',', '').replace("-", '')
df['bfefrmtrm_amount'] = df['bfefrmtrm_amount'].str.replace(',', '').replace("-", '')
df.rename(columns = {'thstrm_amount':year1, 'frmtrm_amount':year1-1, 'bfefrmtrm_amount':year1-2}, inplace=True)

In [335]:
# 연도별로 만들어주는 함수
def get_cfs_api(crtfc_key, corp_code, end_year, account_nm):
    data = pd.DataFrame()
    ac_name = '|'.join(s for s in account_nm)
    for bsns_year in range(end_year, 2014, -1):
        url = f'https://opendart.fss.or.kr/api/fnlttMultiAcnt.json?crtfc_key={crtfc_key}&corp_code={corp_code}&bsns_year={bsns_year}&reprt_code=11011'
        soup_json = requests.get(url)
        json_obj = json.loads(soup_json.text)
        
        df = pd.DataFrame(json_obj['list'])
        
        df = df[df.account_nm.str.contains(ac_name)].drop_duplicates(['corp_code','account_nm'], keep = 'first')

        df = df[['corp_code', 'account_nm', 'thstrm_amount', 'frmtrm_amount', 'bfefrmtrm_amount']]
        df['thstrm_amount'] = df['thstrm_amount'].str.replace(',', '').replace("-", '')
        df['frmtrm_amount'] = df['frmtrm_amount'].str.replace(',', '').replace("-", '')
        df['bfefrmtrm_amount'] = df['bfefrmtrm_amount'].str.replace(',', '').replace("-", '')
        df.rename(columns = {'thstrm_amount':bsns_year, 'frmtrm_amount':bsns_year-1, 'bfefrmtrm_amount':bsns_year-2}, inplace=True)

        if len(data) == 0:
            data = df
        else:
            data = pd.merge(data, df, how='outer')

    data.reset_index(drop=True, inplace=True)
    return data

In [336]:
# 코드를 100개 단위로 나누기
n = 100

codes = [code_api['corp_code'].values[i * n:(i + 1) * n] for i in range((len(code_api) - 1 + n) // n )] 

In [337]:
crtfc_key = '93a471927e5df8c8dd171591b9b80e38f24039fe'

data = pd.DataFrame()

for code in tqdm_notebook(codes):
    corp_code = ','.join(code)
    df = get_cfs_api(crtfc_key, corp_code, 2021, ['자산총계', '자본총계', '부채총계', '당기순'])
    data = pd.concat([data, df])

HBox(children=(FloatProgress(value=0.0, max=10.0), HTML(value='')))




In [338]:
data = pd.merge(code_api[['corp_code', 'corp_name']], data)

In [339]:
자산총계 = data[data['account_nm']=='자산총계']
부채총계 = data[data['account_nm']=='부채총계']
자본총계 = data[data['account_nm']=='자본총계']
당기순이익 = data[data['account_nm'].str.contains('당기순')]

In [340]:
자산총계_중복 = 자산총계[자산총계.duplicated(['corp_code'], keep=False)]

lst = pd.DataFrame()

for code in 자산총계_중복['corp_code'].unique():
    df = 자산총계_중복[자산총계_중복.corp_code == code]
    for num in range(0, len(df)):
        na_num = df[df.corp_code == code].isnull().sum(axis=1).values[0]
        df.iloc[0, -na_num:] = df.iloc[num, -na_num:]
    lst = pd.concat([lst, df.iloc[0]], axis=1)
    
자산총계1 = 자산총계.drop_duplicates(['corp_code'], keep=False)
자산총계 = pd.concat([자산총계1, lst.T])
자산총계.to_csv("자산총계.csv")

In [None]:
부채총계_중복 = 부채총계[부채총계.duplicated(['corp_code'], keep=False)]

lst = pd.DataFrame()

for code in 부채총계_중복['corp_code'].unique():
    df = 부채총계_중복[부채총계_중복.corp_code == code]
    for num in range(0, len(df)):
        na_num = df[df.corp_code == code].isnull().sum(axis=1).values[0]
        df.iloc[0, -na_num:] = df.iloc[num, -na_num:]
    lst = pd.concat([lst, df.iloc[0]], axis=1)
    
부채총계1 = 부채총계.drop_duplicates(['corp_code'], keep=False)
부채총계 = pd.concat([부채총계1, lst.T])
부채총계.to_csv("부채총계.csv")

In [341]:
자본총계_중복 = 자본총계[자본총계.duplicated(['corp_code'], keep=False)]

lst = pd.DataFrame()

for code in 자본총계_중복['corp_code'].unique():
    df = 자본총계_중복[자본총계_중복.corp_code == code]
    for num in range(0, len(df)):
        na_num = df[df.corp_code == code].isnull().sum(axis=1).values[0]
        df.iloc[0, -na_num:] = df.iloc[num, -na_num:]
    lst = pd.concat([lst, df.iloc[0]], axis=1)
    
자본총계1 = 자본총계.drop_duplicates(['corp_code'], keep=False)
자본총계 = pd.concat([자본총계1, lst.T])
자본총계.to_csv("자본총계.csv")

In [367]:
당기순이익_중복 = 당기순이익[당기순이익.duplicated(['corp_code'], keep=False)]

lst = pd.DataFrame()

for code in 당기순이익_중복['corp_code'].unique():
    df = 당기순이익_중복[당기순이익_중복.corp_code == code]
    for num in range(0, len(df)):
        na_num = df[df.corp_code == code].isnull().sum(axis=1).values[0]
        df.iloc[0, -na_num:] = df.iloc[num, -na_num:]
    lst = pd.concat([lst, df.iloc[0]], axis=1)
    
당기순이익1 = 당기순이익.drop_duplicates(['corp_code'], keep=False)
당기순이익 = pd.concat([당기순이익1, lst.T])
당기순이익.to_csv("당기순이익.csv")

## Selenium으로 2012년 이후 재무제표 불러오기


### 사업보고서 link 불러오기


In [5]:
import selenium
from selenium import webdriver

from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By

from bs4 import BeautifulSoup as bs
import re

from time import sleep

In [419]:
# 기업별 보고서 주소 불러오는 함수
def sel_CFSlink(corp_name):
    search_box = driver.find_element_by_xpath('//*[@id="textCrpNm"]')
    search_box.send_keys(corp_name)
    search_box.send_keys(Keys.RETURN)
    search_box.clear()
    
    sleep(0.5)
    
    html = driver.page_source
    soup = bs(html, "html.parser")

    table_soup = soup.find("table", class_ = 'tbList')
    a_soup = table_soup.find_all('a', {'title': '사업보고서 공시뷰어 새창'})

    links = []
    for i in a_soup:
        links.append('https://dart.fss.or.kr/'+i['href'])
        
    return links

In [424]:
import chromedriver_autoinstaller as chr
chr.install()

'C:\\Users\\min\\Anaconda3\\lib\\site-packages\\chromedriver_autoinstaller\\101\\chromedriver.exe'

In [426]:
URL = 'https://dart.fss.or.kr/dsab007/main.do'

driver = webdriver.Chrome(executable_path='C:\\Users\\min\\Anaconda3\\lib\\site-packages\\chromedriver_autoinstaller\\101\\chromedriver.exe')
driver.get(url=URL)

driver.find_element_by_xpath('//*[@id="dsab007_main"]/a').click()  # 보고서 보기 클릭
driver.find_element_by_xpath('//*[@id="date7"]').click()    # 10년 클릭
driver.find_element_by_xpath('//*[@id="li_01"]/label[1]').click()    # 정기공시 클릭
driver.find_element_by_xpath('//*[@id="publicTypeDetail_A001"]').click()   # 사업계획서 클릭

In [1]:
cfs_link2 = []
for stock in tqdm_notebook(code_api['stock_code']):
    cfs_link2.append([code_api.loc[i, 'corp_name'], sel_CFSlink(stock)])

NameError: name 'tqdm_notebook' is not defined

### 재무제표 link로 변환

In [16]:
import re
import requests
import json

from tqdm import tqdm_notebook

In [732]:
import copy
test_link = copy.deepcopy(cfs_link2)

In [733]:
for corp in tqdm_notebook(test_link):
    url = []
    for link in corp[1]:
        page_source = requests.get(link).text
        
        rcpNo = link.split('rcpNo=')[1]
        dcmNo = re.sub('[^가-힣A-Za-z0-9]', '', page_source).split('dcmNo')[1].split("node")[0]
        eleId = re.sub('[^가-힣A-Za-z0-9]', '', page_source).split('연결재무제표')
        
        try:
            if len(eleId) != 1:
                eleId = eleId[1].split('id')[1].split('node')[0]
                offset = re.sub('[^가-힣A-Za-z0-9]', '', page_source).split('연결재무제표')[1].split('offset')[1].split('node')[0]
                length = re.sub('[^가-힣A-Za-z0-9]', '', page_source).split('연결재무제표')[1].split('length')[1].split('node')[0]
            else:
                eleId = re.sub('[^가-힣A-Za-z0-9]', '', page_source).split('재무제표등')[1].split('id')[1].split('node')[0]
                offset = re.sub('[^가-힣A-Za-z0-9]', '', page_source).split('재무제표등')[1].split('offset')[1].split('node')[0]
                length = re.sub('[^가-힣A-Za-z0-9]', '', page_source).split('재무제표등')[1].split('length')[1].split('node')[0]
        except:
            continue
        html = f'https://dart.fss.or.kr//report/viewer.do?rcpNo={rcpNo}&dcmNo={dcmNo}&eleId={eleId}&offset={offset}&length={length}&dtd=dart3.xsd'
        url.append(html)
    corp.insert(2, url)

HBox(children=(FloatProgress(value=0.0, max=967.0), HTML(value='')))




In [734]:
# 리스트 저장하기
import pickle
with open('api기업링크.pkl','wb') as f:
     pickle.dump(test_link,f)

In [7]:
import pickle

# 저장한 리스트 불러오기
with open('api기업링크.pkl','rb') as f:
    cfs_link2 = pickle.load(f)

### 재무제표에서 필요한 데이터 불러오기

In [235]:
# 함수만들기
def get_data(report_list):
    data = pd.DataFrame()
    for num, link in enumerate(report_list):
        res = requests.get(link)
        soup = bs(res.text, 'html.parser')

        table1 = soup.find_all("thead")[0].find_parent('table').find_previous_siblings()[0]
        table2 = soup.find_all("thead")[0].find_parent('table')
        table3 = soup.find_all("thead")[1].find_parent('table').find_previous_siblings()[0]
        table4 = soup.find_all("thead")[1].find_parent('table')

        tr_soup = table1.find_all('tr')
        A = [list(filter(None, re.sub('[^가-힣0-9\n]', '', tr.text).split('\n'))) for tr in tr_soup]
        A = [a[0].split('기')[1][:4] for a in A  if re.match('제.*기', a[0])]
        A.insert(0, '과목')

        tr_soup = table2.find_all('tr')
        B = [list(filter(None, re.sub('[^가-힣0-9\n(]', '', tr.text).split('\n'))) for tr in tr_soup]
        
        tr_soup = table3.find_all('tr')
        C = [list(filter(None, re.sub('[^가-힣0-9\n]', '', tr.text).split('\n'))) for tr in tr_soup]
        C = [a[0].split('기')[1][:4] for a in C  if re.match('제.*기', a[0])]
        C.insert(0, '과목')

        tr_soup = table4.find_all("tr")
        D = [list(filter(None, re.sub('[^가-힣0-9\n(]', '', tr.text).split('\n'))) for tr in tr_soup]
        
        df1 = pd.DataFrame(B[1:], columns = A)
        df2 = pd.DataFrame(D[1:], columns = C)
        df = pd.concat([df1, df2])
        
        if num == 0:
            data = df
        else:
            data = pd.merge(data, df, how='outer')
    return data

In [215]:
total = pd.DataFrame()
for link in tqdm_notebook(cfs_link2):
    try:
        df2 = get_data(link[2])
        df2.insert(0, '기업명', link[0])
        total = pd.concat([total, df2], axis=0)
    except:
        pass

HBox(children=(FloatProgress(value=0.0, max=967.0), HTML(value='')))




In [216]:
total.rename(columns = {'기업명':'corp_name'}, inplace=True)
total.to_csv("셀레.csv")

In [405]:
자산총계_sel = total[total.과목 == '자산총계']
자산총계_sel = 자산총계_sel[자산총계_sel['2012'].notnull()][['corp_name', '과목', '2012', '2011']]

자산총계 = pd.read_csv("자산총계.csv", index_col = 0)
자산총계 = 자산총계.astype("str")

자산총계_sel['2012'] = 자산총계_sel['2012'].str.replace("(", '-')
자산총계_sel['2012'] = 자산총계_sel['2012'].str.replace(")", '')

자산총계_sel['2011'] = 자산총계_sel['2011'].str.replace("(", '-')
자산총계_sel['2011'] = 자산총계_sel['2011'].str.replace(")", '')

자산총계 = pd.merge(자산총계, 자산총계_sel[['corp_name', '2012', '2011']], how='left')
자산총계 = 자산총계.drop_duplicates(['corp_name'], keep='last')
자산총계.reset_index(drop=True, inplace=True)
자산총계.to_csv("자산총계_최종.csv")

In [410]:
부채총계_sel = total[total.과목 == '부채총계']
부채총계_sel = 부채총계_sel[부채총계_sel['2012'].notnull()][['corp_name', '과목', '2012', '2011']]

부채총계 = pd.read_csv("부채총계.csv", index_col = 0)
부채총계 = 부채총계.astype("str")

부채총계_sel['2012'] = 부채총계_sel['2012'].str.replace("(", '-')
부채총계_sel['2012'] = 부채총계_sel['2012'].str.replace(")", '')

부채총계_sel['2011'] = 부채총계_sel['2011'].str.replace("(", '-')
부채총계_sel['2011'] = 부채총계_sel['2011'].str.replace(")", '')

부채총계 = pd.merge(부채총계, 부채총계_sel[['corp_name', '2012', '2011']], how='left')
부채총계 = 부채총계.drop_duplicates(['corp_name'], keep='last')
부채총계.reset_index(drop=True, inplace=True)
부채총계.to_csv("부채총계_최종.csv")

In [407]:
자본총계_sel = total[total.과목 == '자본총계']
자본총계_sel = 자본총계_sel[자본총계_sel['2012'].notnull()][['corp_name', '과목', '2012', '2011']]

자본총계 = pd.read_csv("자본총계.csv", index_col = 0)
자본총계 = 자본총계.astype("str")

자본총계_sel['2012'] = 자본총계_sel['2012'].str.replace("(", '-')
자본총계_sel['2012'] = 자본총계_sel['2012'].str.replace(")", '')

자본총계_sel['2011'] = 자본총계_sel['2011'].str.replace("(", '-')
자본총계_sel['2011'] = 자본총계_sel['2011'].str.replace(")", '')

자본총계 = pd.merge(자본총계, 자본총계_sel[['corp_name', '2012', '2011']], how='left')
자본총계 = 자본총계.drop_duplicates(['corp_name'], keep='last')
자본총계.reset_index(drop=True, inplace=True)
자본총계.to_csv("자본총계_최종.csv")

In [None]:
당기순이익_sel = total[(total.과목 == '당기순이익') | (total.과목 == '당기순이익(손실)')]
당기순이익_sel = 당기순이익_sel[당기순이익_sel['2011'].notnull()][['corp_name', '과목', '2011', '2010']]
당기순이익_sel.columns = ['corp_name', '과목', '2012', '2011']

당기순이익_sel['2012'] = 당기순이익_sel['2012'].str.replace("(", '-')
당기순이익_sel['2012'] = 당기순이익_sel['2012'].str.replace(")", '')

당기순이익_sel['2011'] = 당기순이익_sel['2011'].str.replace("(", '-')
당기순이익_sel['2011'] = 당기순이익_sel['2011'].str.replace(")", '')

당기순이익 = pd.read_csv("당기순이익.csv", index_col = 0)
당기순이익 = 당기순이익.astype("str")

당기순이익 = pd.merge(당기순이익, 당기순이익_sel[['corp_name', '2012', '2011']], how='left')
당기순이익 = 당기순이익.drop_duplicates(['corp_name'], keep='first')
당기순이익.reset_index(drop=True, inplace=True)
당기순이익.to_csv("당기순이익_최종.csv")

In [421]:
자산총계 = pd.read_csv("자산총계_최종.csv", index_col=0)
부채총계 = pd.read_csv("부채총계_최종.csv", index_col=0)
자본총계 = pd.read_csv("자본총계_최종.csv", index_col=0)
당기순이익 = pd.read_csv("당기순이익_최종.csv", index_col=0)