In [2]:
# modules and functions
# 1. 전체 기업 정보 불러오기 -> 파일럿 데이터: stock 있는 기업만 따로 분류하기(parse2)
# 2. 기업들 최근 공시 불러오기 -> 최근 문서 번호 가지고 오기(parse1)
# 3. 최근 문서 불러오기 -> 파싱하기(parse2)
# 모듈

# request 모듈. argument(url, params). 특이사항: josn과 xml이 다르다. json은 josn으로 xml은 io byte으로 변환해야 한다

import requests
import xml.etree.ElementTree as ET
import zipfile
import io
from bs4 import BeautifulSoup
import lxml
import time
import tqdm.notebook as tq
import pickle
from pathos.pools import ProcessPool
import re

# api_key = 'd81e78aa719d1c1e4ec7867ef22a737ab6cbb4c7' # 어디서 받아온 거 ㅎㅎ

# api_key = 'ea372803285b0209349791434379d3fa748ae416' # 내꺼
# api_key = 'c7ed49e31def2fbd7708c8e8684631dde89cb572' # 호영님꺼
# api_key = '894583c60a20e9c904eb149616830888bfcc653d' # 요한님꺼

def init_bs4(xml_str):
    return BeautifulSoup(xml_str, "lxml")

def generate_parser(response, hook):
    """
        받아온 다트 데이터를 parsing할 generator입니다. xml을 파싱하기도 하고 json을 파싱하기도 해서 factory 패턴을 적용했습니다.
    """
    zf = zipfile.ZipFile(io.BytesIO(response.content))
    info_list = zf.infolist()
    xml_data = zf.read(hook(info_list))

    try:
        xml_text = xml_data.decode('euc-kr')
        # print("xml paring case 1")
    except UnicodeDecodeError as e:
        xml_text = xml_data.decode('utf-8')
        # print("xml paring case 2")
    except UnicodeDecodeError as e:
        # print("xml paring case #")
        xml_text = xml_data
    return init_bs4(xml_text)

def create_get_corp_xml_parser(response):
    def hook(info_list):
        return info_list[0].filename
    return generate_parser(response, hook)

def create_report_xml_parser(response, report_idx):

    def hook(info_list):
        target_doc = None
        for i in range(len(info_list)):
            filename = info_list[i].filename.split(".")[0]
            if report_idx == filename:
                target_doc = info_list[i]
                break
        if target_doc == None:
            print(target_doc, info_list)
            assert target_doc != None, "ERROR"
        return target_doc

    return generate_parser(response, hook)


# 기본 적인 다트 api 전송함수. 이 함수를 사용해서 다른 api 요청.
def request_dart(url, params):
    r = requests.get(url, params=params)
    return r

# api에 따라 파라미터 요청하는 경우가 있음. 거기에 맞춰서 요청. key와 value에 list 형태로 넣음.
def create_params(keys = None, values= None):
    params = {
        'crtfc_key': api_key,
    }

    if keys:
        assert len(keys) == len(values), "key and value lengths must be same."
        for i in range(len(keys)):
            params[keys[i]] = values[i]

    return params




# 다트 데이터 받아옴

## 다트의 전체 기업 리스트 받아오기

In [None]:

def request_dart_corps_info():
    url = 'https://opendart.fss.or.kr/api/corpCode.xml'
    params = create_params()
    res = request_dart(url, params)
    return res
    
def get_corp(only_stock = False):
    res = request_dart_corps_info()

    parser = create_get_corp_xml_parser(res)
    corps_xml = parser.find_all("list")

    corps = []
    num_stock = 0

    for l in corps_xml:
        corp_dict = {}
        stock = None
        if l.stock_code.string != " ":
            stock = l.stock_code.string
            num_stock += 1

        if ( only_stock == True and stock ) or only_stock == False:
            corp_dict["주식 코드"] = str(stock)
            corp_dict["기업 코드"] = str(l.corp_code.string)
            corp_dict["기업 이름"] = str(l.corp_name.string)
            
            corp_dict["수정 일자"] = str(l.modify_date.string)
            corps.append( corp_dict )
    return corps

In [None]:
# 기업 정보
# corps = get_corp(only_stock = True)
# len(corps)
# with open("all_corp_list.pickle", "wb") as fp:   #Pickling
  # pickle.dump(corps, fp)

## 받아온 기업 리스트에서 상장된 것만 추리기. 그리고 그 기업들에게서 최신 보고서 id을 받아온다.

1. 그러기 위해서 각 기업의 정보를 받아옴
2. 받아온 정보에서 stock code가 있으면 상장된 것
3. 그 기업들에서 보고서 정보를 다시 받아옴
4. 그 정보를 취합해서 다시 저장

In [None]:
# 기업 보고서 id 받아오기 위해서 query 날릴 파라미터 설정
def create_report_idx_params():
    keys = ['bgn_de', 'pblntf_ty']
    values = ['20210101', 'A']

    params = create_params(keys, values)
    return params

def request_dart_corps_report_index(corp):
    params = create_report_idx_params()
    
    code = corp['기업 코드']
    params['corp_code'] = code
    url = 'https://opendart.fss.or.kr/api/list.json'

    res = request_dart(url, params)
    return res

def add_report_index_to_corp(latest_doc, corp):
    act_corp = {}
    for k, v in corp.items():
        act_corp[k] = v
        act_corp['report_idx'] = latest_doc['rcept_no']
    return act_corp

def get_repot_index_single(corp:str):
    res = request_dart_corps_report_index(corp)
    res = res.json()
    if res['status'] == "000":
        latest_doc = res['list'][0]
        act_corp = add_report_index_to_corp(latest_doc, corp)
        
    else: # when the corp does not have report in range year 2021
        # print(f"{res['status']}: {res['message']}: {corp} ")
        act_corp = corp
    return act_corp

def get_report_indexes(corps):
    active_corps = []
    non_active_corps = []
    
    for i, corp in tq.tqdm(enumerate(corps)):
        try:
            corp_result = get_repot_index_single(corp)
        except Exception as e: # catch network error
            print(e)
            print(corp)

        if corp_result:
            active_corps.append( corp_result )
        else:# when the corp does not have report in range year 2021
            non_active_corps.append( corp_result )
        LIMIT = 800
        if i > 0 and i % LIMIT == 0:
            print(f"so far {len(active_corps)} has been collected...")
            print(f"{LIMIT} requests has done. sleep 60 secs...")

            time.sleep(60)
          
    return active_corps

In [None]:

# # from copr information -> get latest report id
# active_corps = get_report_index(corps)
# len(active_corps)
# with open("active_corps.pickle", "wb") as fp:   #Pickling
#   pickle.dump(active_corps, fp)

## 상장된 기업에서 기업 보고서 요청해서 xml 받아오기.
그리고 dict 처리해서 저장함.

In [None]:
# 실제로 요청하는 함수
def request_dart_report(corp):
    r = corp['report_idx']
    keys = ['rcept_no']
    values = [r]

    params = create_params(keys, values)

    url = 'https://opendart.fss.or.kr/api/document.xml'
    r = requests.get(url, params=params)  
    return r

# 각 기업 마다 보고서 요청. 
def get_report(corp):
    """corp 1개에 대해서 공시 보고서 내용 긁어서 corp dict에 저장해서 반환"""
      
    try:
        r = request_dart_report(corp)
        soup = create_report_xml_parser(r, corp['report_idx'])
        corp['original_xml'] = str(soup)
        remove_table(soup)
        # contents = parse_content_from_xml(soup)
        # corp['contents'] = contents
    except Exception as e:
        print(e)
        print(corp)
        corp['original_xml'] = None
        # corp['contents'] = None
    return corp
    
# 병렬화 하기 위해서 1개씩 들어오는지 확인
def parallel_get_doc(active_corps):
    assert isinstance(active_corps, dict)
    active_corps = get_report(active_corps)
    return active_corps

In [None]:
# from latest report id -> get xml and parse them. 
# without multiprocess, it takes about 1 hour...
# pool = ProcessPool()
# LIMIT = 900
# content_orig_total = []
# for i in tq.tqdm(range(0, len(active_corps), LIMIT)):
#     print(f"from {i} to {i+LIMIT}")
#     content_orig = pool.map(parallel_get_doc, active_corps[i:i+LIMIT])
#     print(len(content_orig))
#     content_orig_total.extend(content_orig)
#     time.sleep(60)
# with open("content_orig_total.pickle", "wb") as fp:   #Pickling
    # pickle.dump(content_orig_total, fp)