In [3]:
import pandas as pd
import re
from bs4 import BeautifulSoup
import requests
from urllib.request import urlopen
import time

## Dart 전자공시의 사업보고서 중  '이사의 경영진단 및 분석의견' 크롤링

해당 프로젝트의 데이터는 kospi200기업들의 10년치 사업보고서 중 텍스트 분석을 하기 위해 '이사의 경영진단 및 분석의견' 파트만을 크롤링하여 만들었습니다. 전자공시 사이트에서 문서검색에 도움을 주는 api를 제공하기 때문에 input으로 필요한 기업의 종목코드를 row로 가지고 있는 데이터프레임을 넣을 시 dart에서 제공하는 사업보고서에 대한 정보와 그 내용을 데이터프레임 형식으로 산출하는 함수를 만들었습니다. 

사업보고서의 url은 일관적으로 'https://dart.fss.or.kr/dsaf001/main.do?rcpNo=' + rcp_no 의 형식을 취하기 때문에 발급받은 개인 api 키를 이용하여 해당기업의 rcp_no를 가져오고 , html 태그를 검색해 최종적으로 '이사의 경영진단 및 분석의견' 파트의 url 만  가져오는 식으로 만들었고, 다시 본문 내용을 request하여 최종적으로 텍스트를 저장합니다. 이 문서에서는 전처리가 포함되지않은 크롤링 작업만 하고 있습니다.

api는 json과 xml의 형식을 제공하며 json으로 크롤링했을 때 실제 남은 사업보고서가 존재하지만 6-7년치 데이터만 크롤링되는 경우가 있었습니다. 따라서 코드에는 존재하지 않지만 xml과 json으로 모두 request하고 비교하여 채워넣는 방법을 택했습니다.

Dart에서 1분에 100건 이상 검색을 할 시 사이트를 24시간동안 차단하고 있습니다. 이를 피하기 위해 user-agent를 추가하고 time sleep을 줘봤지만 차단당하는 경우가 많아서 파티션을 나눠서 크롤링하였습니다. 

아래는 예시입니다.

In [4]:
df = pd.read_csv('./data/crawling/preprocessed/df.csv',index_col=0)
df.head()

FileNotFoundError: [Errno 2] File ./data/crawling/preprocessed/df.csv does not exist: './data/crawling/preprocessed/df.csv'

In [5]:
api_code = 'api_code' # 개인 api 키를 발급받아서 사용
start_date = '20100101'
end_date = '20191231'
crp_code = ''
page_set = '100'
url = 'http://dart.fss.or.kr/api/search.xml?auth='+api_code+'&crp_cd='+crp_code+'&start_dt='+start_date+'&bsn_tp=A001&page_set=' + page_set

In [4]:
def get_df(crp_code): #검색된 회사의 사업보고서 내용을 데이터프레임으로 저장
  
    url = 'http://dart.fss.or.kr/api/search.xml?auth='+api_code+'&crp_cd='+crp_code+'&start_dt='+start_date+'&bsn_tp=A001&page_set=' + page_set
    headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) AppleWebKit 537.36 (KHTML, like Gecko) Chrome",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8"}
    
    req = requests.get(url,headers = headers)
    html = req.text
    xmlsoup = BeautifulSoup(html, 'html.parser')
    te = xmlsoup.findAll('list')
    data = pd.DataFrame()
    
    for t in te:
                temp = pd.DataFrame(([[t.crp_cls.string, t.crp_nm.string, t.crp_cd.string, t.rpt_nm.string,
                                  t.rcp_no.string, t.flr_nm.string, t.rcp_dt.string, t.rmk.string]]),
                                  columns = ["crp_cls", "crp_nm", "crp_cd", "rpt_nm", "rcp_no", "flr_nm", "rcp_dt", "rmk"])
                data = pd.concat([data,temp])
    data.reset_index(drop=True,inplace=True)
    
    return(data)


In [5]:
def get_df2(crp_code): #검색된 회사의 사업보고서 내용을 데이터프레임으로 저장
  
    url = 'http://dart.fss.or.kr/api/search.xml?auth='+api_code+'&crp_cd='+crp_code+'&start_dt='+start_date+'&bsn_tp=A001&page_set=' + page_set
    xmlsoup = BeautifulSoup(urlopen(url).read(), 'html.parser')
    te = xmlsoup.findAll('list')
    data = pd.DataFrame()
    
    for t in te:
                temp = pd.DataFrame(([[t.crp_cls.string, t.crp_nm.string, t.crp_cd.string, t.rpt_nm.string,
                                  t.rcp_no.string, t.flr_nm.string, t.rcp_dt.string, t.rmk.string]]),
                                  columns = ["crp_cls", "crp_nm", "crp_cd", "rpt_nm", "rcp_no", "flr_nm", "rcp_dt", "rmk"])
                data = pd.concat([data,temp])
    data.reset_index(drop=True,inplace=True)
    
    return(data)


In [6]:
def get_url(data): #dataframe을 넣었을 때 해당 사업보고서의 주소를 저장
    rcp_ls=[]
    for row in data['rcp_no']:
        rcp_ls.append(row)
        urls = []
        for i in rcp_ls:
            urls.append('https://dart.fss.or.kr/dsaf001/main.do?rcpNo='+i)
    
    return urls

In [7]:
def final_url(url): #사업보고서 주소를 넣으면 '이사의 경영진단 및 분석' 주소 반환
    try:
        req2 = requests.get(url)
        html2 = req2.text
        soup2 = BeautifulSoup(html2, 'html.parser')
        body = str(soup2.find('head'))

        a = re.search('이사의 경영진단 및 분석의견',body).span()
        b= re.search(r'viewDoc(.*);',body[a[0]:]).group()
        list = b[8:-2].split(',')
        list = [i[1:-1] for i in list]
        list[1] = list[1][1:] #드러움
        list[2] = list[2][1:]
        list[3] = list[3][1:]
        list[4] = list[4][1:]
        list[5] = list[5][1:]    

        url_final = 'http://dart.fss.or.kr/report/viewer.do?rcpNo='+ list[0] + '&dcmNo='+ list[1]+'&eleId=' +list[2] + '&offset=' + list[3] + '&length=' + list[4] + '&dtd=dart3.xsd'
        return(url_final)
    
    except AttributeError as e:
        print(e)

In [8]:
def extracting_text(url): #경영진단 url 넣었을 시 해당 본문 텍스트 반환
    req3 = requests.get(url)
    html3 = req3.text
    soup3 = BeautifulSoup(html3,'html.parser')
    tables = []
    ka = soup3.find_all('p') #table을 제외한 본문 text 부분 찾아서 리스트안에저장
    for k in ka:
        tables.append(k.get_text())
    tables = tables[4:] #개요부분 삭제
    table = ''.join(tables) #리스트를 string으로 바꾼다
    table = table.replace('\xa0','') #정리
    table = table.replace('\n','')
    # table = re.sub('[[a-zA-Z]',"",table) #전처리
    # table = re.sub('[-=+,#/\?:^$@*\"※~&%ㆍ!』\\‘|\(\)\[\]\<\>`\'…》]','',table)
    #table = re.sub('[0-9]+','num',table) #숫자를 special token num으로 치환
    # pattern = re.compile(r'\s+') #중복띄어쓰기 제거
    # table = re.sub(pattern,' ',table)
    return table

In [9]:
def final(crp_code): #기업의 종목 코드 넣었을 시 근 10개년 종목내용을 저장한 dataframe 반환
    try:
        df = get_df(crp_code)
        urls = get_url(df)
        strs=[]
        for i in urls:
            strs.append(extracting_text(final_url(i)))
        str_series = pd.Series(strs)
        df['str']= str_series
        return(df)
    
    except:
        pass

In [9]:
def make_df(df): #종목코드가 들어간 리스트가 있는 데이터프레임을 넣었을 시 str이 모두 추출된 데이터프레임 반환
    # start_vect=time.time()                                                                                                                                                                 
    ls = []
    body = pd.DataFrame()
    
    try:
        
        for i in df['Symbol']:
            ls.append(str(i).zfill(6))
        for process,i in enumerate(ls):
            
            print("Process : {} | Total : {}".format(process+1,len(ls)),end = '\r')
            temp = final(i)
            body = pd.concat([body,temp],ignore_index=True)
            
            rand_value = randint(1, MAX_SLEEP_TIME)
           
  
        
    except AttributeError as e:
        print(e)
    
    # print("training Runtime: %0.2f Minutes"%((time.time() - start_vect)/60))
    return(body)



In [10]:
a = 1
b = 2