In [None]:
import pandas as pd
import requests
from bs4 import BeautifulSoup
import time
import re
import random
import tqdm
from tqdm.contrib import tzip

In [None]:
# 전처리를 위한 함수 구현
def replacer(text, input):
    for garbage in input:
        text = text.replace(garbage, "")
    return text

In [None]:
class KIND_REPORTS:
    __kind_url = "https://kind.krx.co.kr/disclosure/details.do"
    __kind_header = {
        'authority': 'kind.krx.co.kr',
        'method': 'POST',
        'path': '/disclosure/details.do',
        'scheme': 'https',
        'accept': 'text/html, */*; q=0.01',
        'accept-encoding': 'gzip, deflate, br',
        'accept-language': 'ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7',
        'content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'origin': 'https://kind.krx.co.kr',
        'referer': 'https://kind.krx.co.kr/disclosure/details.do?method=searchDetailsMain',
        'sec-ch-ua': """"Google Chrome";v="105", "Not)A;Brand";v="8", "Chromium";v="105""""",
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': "Windows",
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-origin',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
        'x-requested-with': 'XMLHttpRequest'
    }
    __kind_params ={
        'method': 'searchDetailsSub',
        'currentPageSize': '100',
        'pageIndex': '1',
        'orderMode': '1',
        'orderStat': 'D',
        'forward': 'details_sub',
        'disclosureType01': '',
        'disclosureType02': '',
        'disclosureType03': '',
        'disclosureType04': '',
        'disclosureType05': '',
        'disclosureType06': '',
        'disclosureType07': '',
        'disclosureType08': '',
        'disclosureType09': '',
        'disclosureType10': '',
        'disclosureType11': '',
        'disclosureType13': '',
        'disclosureType14': '',
        'disclosureType20': '',
        'pDisclosureType01': '',
        'pDisclosureType02': '',
        'pDisclosureType03': '',
        'pDisclosureType04': '',
        'pDisclosureType05': '',
        'pDisclosureType06': '',
        'pDisclosureType07': '',
        'pDisclosureType08': '',
        'pDisclosureType09': '',
        'pDisclosureType10': '',
        'pDisclosureType11': '',
        'pDisclosureType13': '',
        'pDisclosureType14': '',
        'pDisclosureType20': '',
        'searchCodeType': '',
        'repIsuSrtCd': '',
        'allRepIsuSrtCd': '',
        'oldSearchCorpName': '',
        'disclosureType': '',
        'disTypevalue': '',
        'reportNm': '',
        'reportCd': '',
        'searchCorpName': '',
        'business': '',
        'marketType': '',
        'settlementMonth': '',
        'securities': '',
        'submitOblgNm': '',
        'enterprise': '',
        'fromDate': '2',
        'toDate': '',
        'reportNmTemp': '',
        'reportNmPop': '',
        'bfrDsclsType': 'on'
    }

    def __init__(self):
        self.__sleep_min = 0.2
        self.__sleep_max = 0.5
        self.report_list = []
        self.report_content = []
        pass
    
    def __isResultExist(self, what_result):
        if len(what_result) != 0:
            print("Warning! Previous "+ str(what_result)+" remains")
            answer = input("Do you want to continue and replace the result? [y/n] :")
            if answer == 'y':
                pass
            else:
                raise Exception("Process closed")
        else:
            pass

    def set_sleep(self, min=0.2, max=0.5):
        self.__sleep_min = min
        self.__sleep_max = max
    
    def get_list(self, report, start, end, market_type, submit_by=""):
        '''
        marker_type = 1 : 코스피, 2 코스닥 / 검색어는 report에 입력
        '''
        self.__isResultExist(self.report_list)
        self.market_type = market_type
        params = self.__kind_params   # __kind_params 인수를 복제해서 사용
        params['reportNm'] = report
        params['fromDate'] = start
        params['toDate'] = end
        params['marketType'] = market_type
        params['reportNmTemp'] = report

        if submit_by == 'krx':
            submit_by = "유가증권시장본부"
        params['submitOblgNm'] = submit_by
        res = requests.post(self.__kind_url, headers=self.__kind_header, data=params) # 쿼리 날려요
        bs = BeautifulSoup(res.text, 'html.parser')

        end_page_loc = str(bs).find('</strong>/')
        page_length = int(str(bs)[end_page_loc+len('</strong>/'):end_page_loc+str(bs)[end_page_loc:].find('\xa0')])

        report_date = []
        company_name = []
        report_code = []
        report_name = []
        isCorrection = []

        for i in tqdm.tqdm(range(page_length)):
            time.sleep(random.uniform(self.__sleep_min, self.__sleep_max))
            params['pageIndex'] = str(i+1)
            res = requests.post(self.__kind_url, headers=self.__kind_header, data=params) # 쿼리 날려요
            bs = BeautifulSoup(res.text, 'html.parser')

            tags = bs.find_all("a")

            for tag in tags:
                if tag.get('href') == "#companysum":
                    company_name.append(tag.get('title'))

                elif tag.get('href') == "#viewer":
                    tmp_code = re.findall(r'\d+', str(tag.get('onclick')))[0]
                    report_code.append(tmp_code)
                    report_date.append(tmp_code[:4]+'-'+tmp_code[4:6]+'-'+tmp_code[6:8])                
                    report_name.append(tag.get('title'))

                    if '정정' in str(tag):
                        isCorrection.append('YES')
                    else:
                        isCorrection.append('NO')

        self.report_list = pd.DataFrame([report_date,company_name,report_code,report_name,isCorrection]).T
        self.report_list.columns = ['공시일','회사명','코드','보고서명','정정신고']
        # 자회사인 경우는 drop
        self.report_list = self.report_list.loc[~self.report_list["보고서명"].str.contains("자회사"), :].reset_index(drop=True)

        #if report == '상호변경':
        #    self.report_list = self.report_list.loc[(self.report_list['보고서명'] == '상호변경안내') | (self.report_list['보고서명'] == '변경상장(상호변경)')]
        #    self.report_list.reset_index(drop=True)

        # 회사명이 None인 경우를 해결해보자
        if self.report_list['회사명'].isnull().sum() >= 1:
            df_null = self.report_list.loc[self.report_list['회사명'].isnull()]
            index_list = df_null.index
            date_list = df_null['공시일'].values
            report_l = df_null['보고서명'].values

            find_name = []
            for date, r in zip(date_list, report_l):
                name_tag = self.__find_none(report = r, start=date,end = date, market_type=market_type)
                find_name.append(name_tag)
            
            for indx, name in zip(index_list, find_name):
                self.report_list['회사명'].loc[indx] = name
        print()
        print('Jobs Done')

    def __engine_read_report(self, doc):
        url = "https://kind.krx.co.kr/common/disclsviewer.do?method=search&acptno=" + doc + "&docno=&viewerhost=&viewerport="
        res = requests.get(url) # 쿼리 날려요
        bs = BeautifulSoup(res.text, 'html.parser')

        # 기업공시코드를 찾는다
        h1_tag = bs.find_all("h1")[0]
        company_code = re.findall(r"\d{6,6}", str(h1_tag))[0]

        # url을 찾는다
        option_tag = bs.find_all("option")
        select_str = ''

        for tag in option_tag:
            if str(tag).startswith('<option selected="selected"'):
                select_str = str(tag)
                if "[정정]" in select_str:
                    return ('No','No','No', 'No') #정정공시일 경우에는 읽지 않는다(기존공시의 부수로 읽을 거기 때문에)
                else:
                    new_doc = re.findall(r'\d{6,}', str(option_tag))
                    doc_code = None  # 현재 읽은 공시의 코드를 리턴한다
                    url = "https://kind.krx.co.kr/common/disclsviewer.do?method=searchContents&docNo=" + str(new_doc[0]) #최초의 공시로 쿼리
                    doc_code = new_doc[0]
                    after_list = new_doc[1:] # 쿼리를 날린 0번째를 제외한 나머지를

                    # 횡령오류 수정하기 위해서 추가함 :  정정이 없는데 있는 걸로 인식함 -> 20120906000228(셀바스AI) 20140310001866(디지텍시스템) 20190730001174(바이오빌) 20200721001091(코오롱티슈진)
                    if ("20140310001866" in after_list) or ("20120906000228" in after_list) or  ("20190730001174" in after_list) or ("20200721001091" in after_list ) or ("20210409001774" in after_list):
                        after_list = ''  

                    res = requests.get(url) # 쿼리 날려요
                    bs = str(BeautifulSoup(res.text, 'html.parser'))
                    inner_url = re.findall('https://[a-z0-9/.]+\.htm', bs)[0] #v7코드개선
                    res = requests.get(inner_url) # 쿼리 날려요
                    html = res.content.decode('utf-8','replace')
                    bs = BeautifulSoup(html, 'html.parser')

                    text_only_list = []

                    if self.read_what == 'sales':
                        text_only_list.append([tag.get_text() for tag in bs.find_all('span')])
                    else:
                        if '▶ 업종코드 :' not in str(bs):
                            tags = bs.find_all('td')
                            for tag in tags:
                                text = tag.get_text()
                                text = text.replace('\r\n', ', ')
                                text = replacer(text, [", \n","\r",'\n'])
                                text_only_list.append(text)
                        elif '▶ 업종코드 :' in str(bs): # 유가증권시장본부가 공시하는 '변경상장(상호변경)'
                            text = replacer(bs.get_text(), [", \n","\r",'\n','   → ','- '])
                            text = text.split('    ')

                            if len(text) <=4:
                                return "No","No","No","No"
                            
                            text_only_list = text_only_list + replacer(text[3], ['(영문명',')','보통주','우선주']).split(': ')
                            text_only_list = text_only_list + replacer(text[5], ['(영문명',')','보통주','우선주']).split(': ')
                    if len(after_list) <=0:
                        after_list = '없음'
                        if self.read_what =='embezzle':
                            after_list=''
                    return text_only_list, after_list, doc_code, company_code # after_list는 이후 정정된 공시코드 리스트임 / doc_code는 지금 읽은 공시의 코드
    
    # v7 추가
    def __doc_filter(self, doc_list):    # 필터 기업코드 찾는 걸로 손봐야한다
        '''정정된 공시를 제외한 첫번째 공시만 읽어온다'''
        self.__isResultExist(self.report_content)
        self.read_what = 'filter'

        announce_date = doc_list['공시일'].values
        name_ar = doc_list['회사명'].values
        code_ar = doc_list['코드'].values
        report_list = doc_list['보고서명'].values
        rereport_list = doc_list['정정신고'].values

        preprocessed_data = []

        for code, name, date, report, rereport in tzip(code_ar, name_ar, announce_date, report_list, rereport_list):
            time.sleep(random.uniform(self.__sleep_min, self.__sleep_max))
            url = "https://kind.krx.co.kr/common/disclsviewer.do?method=search&acptno=" + code + "&docno=&viewerhost=&viewerport="
            res = requests.get(url) # 쿼리 날려요
            bs = BeautifulSoup(res.text, 'html.parser')
            option_tag = bs.find_all("option")
            select_str = ''

            for tag in option_tag:
                if str(tag).startswith('<option selected="selected"'):
                    select_str = str(tag)
                    if "[정정]" in select_str:
                        continue #정정공시일 경우에는 읽지 않는다(기존공시의 부수로 읽을 거기 때문에)
                    preprocessed_data.append([date, name, code, report, rereport])
        return pd.DataFrame(preprocessed_data, columns=["공시일","회사명","코드","보고서명","정정신고"])
        
    def read_embezzle(self, doc_list):   # '횡령ㆍ배임사실확인' 로 검색해야함
        '''횡령ㆍ배임사실확인  / 횡령배임혐의발생일 경우 ->"1. 사고발생내용" 으로 코드 수정해야함'''
        self.__isResultExist(self.report_content)
        self.read_what = 'embezzle'
        doc_list = doc_list.loc[~ doc_list["보고서명"].str.contains("기타"), :]

        code_ar = doc_list['코드'].values
        name_ar = doc_list["회사명"].values
        announce_date = doc_list["공시일"].values
        report_name = doc_list["보고서명"].values
        preprocessed_data = []

        for code, name, date,report_n in tzip(code_ar, name_ar, announce_date, report_name):
            time.sleep(random.uniform(self.__sleep_min, self.__sleep_max))            
            text_only_list, after_list, now_code, company_code = self.__engine_read_report(code) # 쿼리 날려요 두번 날려요
            list_of_corp = []

            if (text_only_list == 'No') or (text_only_list == []): # 정정공시일 경우에는 No을 리턴해서 SKIP
                continue
            
            # 정정공시가 존재하는 경우 마지막 공시로 쿼리를 바로 날린다(엔진을 거치지 않고)
            if len(after_list)>=1:
                after_code = str(after_list[-1])
                after_url = "https://kind.krx.co.kr/common/disclsviewer.do?method=searchContents&docNo=" + after_code
                res = requests.get(after_url)
                bs = str(BeautifulSoup(res.text, 'html.parser'))

                inner_url = re.findall('https://[a-z0-9/.]+\.htm', bs)[0] #v7코드개선

                res = requests.get(inner_url) # 쿼리 날려요
                html = res.content.decode('utf-8','replace')
                bs = BeautifulSoup(html, 'html.parser')

                text_only_list = []

                if '▶ 업종코드 :' not in str(bs):
                    tags = bs.find_all('td')
                    for tag in tags:
                        text = tag.get_text()
                        text = text.replace('\r\n', ', ')
                        text = replacer(text, [", \n","\r",'\n'])
                        text_only_list.append(text)
                elif '▶ 업종코드 :' in str(bs): # 유가증권시장본부가 공시하는 '변경상장(상호변경)'
                    text = replacer(bs.get_text(), [", \n","\r",'\n','   → ','- '])
                    text = text.split('    ')
                    if len(text) <=4:
                        return "No","No","No","No"
                    
                    text_only_list = text_only_list + replacer(text[3], ['(영문명',')','보통주','우선주']).split(': ')
                    text_only_list = text_only_list + replacer(text[5], ['(영문명',')','보통주','우선주']).split(': ')
                # 정정공시인 경우 정정공시 이후 따라오는 공시를 읽기 위해서 따라오는 공시의 인덱스를 찾는다
                idx_lst = [i for i, string in enumerate(text_only_list) if ("1. 사실확인 내용" in string) or ("1. 사고발생내용" in string)] # 1. 사실확인 내용 / 1. 사고발생내용
                start_idx = idx_lst[-1]

                list_of_corp.extend([name,company_code,date,text_only_list[start_idx+1], text_only_list[start_idx+4], text_only_list[start_idx+8],text_only_list[start_idx+18],text_only_list[start_idx+20], "Yes"])
                preprocessed_data.append(list_of_corp)
            else: # 정정공시가 존재하지 않는 경우
                list_of_corp.extend([name,company_code,date,text_only_list[1], text_only_list[4], text_only_list[8],text_only_list[18], text_only_list[20], "No"])
                preprocessed_data.append(list_of_corp)

        df = pd.DataFrame(preprocessed_data, columns=["회사명","기업공시코드","공시일","확인내용","사실확인금액(원)","자기자본대비(%)","기타사항","관련공시", "정정인가?"])
        self.report_content = df
        print("Jobs Done")
        print("Check .report_content")

In [None]:
# 검색어 : [단일판매ㆍ공급계약체결 / 업종변경 / 상호변경 / 추가상장 유상증자 /전환사채권발행결정 / 신주인수권부사채권발행결정/투자주의환기종목지정 / 최대주주변경 / 불성실공시법인지정/조회공시요구/타법인주식및출자증권취득결정]
test20 = KIND_REPORTS()
test20.get_list('횡령ㆍ배임사실확인', start='2014-01-01', end='2014-12-31', market_type='2')

In [None]:
test20.report_list

In [None]:
test20.read_embezzle(test20.report_list)#.to_csv("횡령예시.csv", encoding="utf-8-sig")

In [None]:
test20.report_content

In [None]:
start_index = pd.date_range(start="2012-01-01",end="2022-12-31",freq="YS")   
end_index = pd.date_range(start="2012-01-01",end="2022-12-31",freq="Y")

for kor, num in zip(["코스피","코스닥"],["1","2"]):
    for start_idx,end_idx in zip(start_index, end_index): 
        start_idx,end_idx = str(start_idx)[:10], str(end_idx)[:10]
        save_kind = KIND_REPORTS()
        save_kind.get_list("횡령ㆍ배임혐의발생",start=start_idx, end=end_idx, market_type=num)
        save_kind.read_embezzle(save_kind.report_list)       # 340이 한솔?
        save_kind.report_content.to_csv("횡령배임혐의발생_{}_{}-{}.csv".format(kor, start_idx, end_idx))