In [1]:
%load_ext autoreload
%autoreload 2

import logging
import requests
import json
import os
import time
import pandas as pd
import numpy as np

wd = os.path.abspath("__file__").replace("/__file__", "").replace("notebooks", "")
os.chdir(wd)

from datetime import datetime, timedelta
from logging.handlers import TimedRotatingFileHandler
from dart.api import Dart_main_api
from dart.utils import get_jinja_yaml_conf, create_db_engine, Postgres_connect
from dart.processing import *

now = datetime.now()

In [2]:
conf = get_jinja_yaml_conf('./conf/api.yml', './conf/logging.yml')
end_date = datetime.now().date() 

stream = logging.StreamHandler()
stream.setLevel(logging.DEBUG)
# logger 설정
logger = logging.getLogger('main')
logging.basicConfig(level=eval(conf['logging']['level']),
    format=conf['logging']['format'],
    handlers = [TimedRotatingFileHandler(filename =  conf['logging']['file_name'],
                                when=conf['logging']['when'],
                                interval=conf['logging']['interval'],
                                backupCount=conf['logging']['backupCount']), #logging.StreamHandler()
                                   stream]
                )


In [3]:
# Only for notebooks
import re

os.environ['_ts'] = datetime.astimezone(datetime.now()).strftime('%Y-%m-%d %H:%M:%S %z')

with open('./conf/credentials', "r") as file:
    # 각 라인 읽기
    for line in file:
        # 주석(#) 또는 빈 줄은 무시
        if line.strip() == '' or line.startswith('#'):
            continue

        # 각 라인을 '='를 기준으로 key와 value로 분리
        key, value = line.strip().split('=', 1)

        # $ENV 형식의 환경변수가 있을 경우 해당 값을 가져와서 설정
        env_var_pattern = re.compile(r'\$(\w+)')
        matches = env_var_pattern.findall(value)
        for match in matches:
            value = value.replace(f"${match}", os.environ.get(match, "")).replace('"', '')

        # 환경변수로 설정
        os.environ[key] = value

In [4]:
engine = create_db_engine(os.environ)
postgres_conn = Postgres_connect(engine)
dart_api = Dart_main_api(auth_key = os.environ['auth_key'])

2024-04-08 00:25:48,007 (utils.py 54) INFO ::: Connect to 172.20.10.3. DB_NAME is stocks
2024-04-08 00:25:48,008 (api.py 23) INFO ::: ### dart main api is initialized! ###


# 1. 공시정보 목록

In [18]:
import pandas as pd
import re
a = pd.DataFrame([['2023.01.22', '2023.01.23'], ['2022년 03월 21일', '2022년 03월 22일']], columns = ['aa', 'bb'])

In [19]:
pattern = r'(\d{4})[년.-] ?(\d{1,2})[월.-] ?(\d{1,2})[일]?'

# apply 내에서 정규식 패턴에 따라 변환 수행
a[['aa', 'bb']] = a[['aa', 'bb']].map(lambda x: '-'.join(re.findall(pattern, x)[0]) if re.match(pattern, x) else x)
a

Unnamed: 0,aa,bb
0,2023-01-22,2023-01-23
1,2022-03-21,2022-03-22


In [5]:
sub = 'disc'

### 고유번호

In [None]:
detail = 'corp_code'
data = dart_api.get_data(conf[sub][detail]['detail_url'], data_type = 'xml', is_zip = True, rename = conf[sub][detail]['rename'])
data[conf[sub][detail]['date_col']] = data[conf[sub][detail]['date_col']].map(lambda x: pd.to_datetime(x, format = '%Y%m%d').date())
data['고유번호'] = data['고유번호'].map(lambda x: str(x).zfill(8))
data.replace('', pd.NA, inplace = True)


max_db_date = postgres_conn.get_max_col(schema_name = conf['schema_name'],
            table_name = conf[sub][detail]['table_name'],
            colname = conf[sub][detail]['date_col'])


if max_db_date:
    data = data[data[conf[sub][detail]['date_col']] >= max_db_date]
    data =  postgres_conn.ext_notin_db(data, schema_name = conf['schema_name'], 
                                       table_name = conf[sub][detail]['table_name'], 
                                       subset = conf[sub][detail]['dup_cols'])


postgres_conn.insert_db(data, schema_name = conf['schema_name'], table_name = conf[sub][detail]['table_name'])



### 공시검색

In [None]:
detail = 'list'
max_db_date = postgres_conn.get_max_col(schema_name = conf['schema_name'],
            table_name = conf[sub][detail]['table_name'],
                         colname = conf[sub][detail]['date_col'])

start_date = max_db_date if max_db_date else conf['api_start_date']
end_date = conf['api_end_date'] if 'api_end_date' in conf else pd.to_datetime(os.environ['_ts']).date() + timedelta(days = 1)

In [None]:
data = pd.DataFrame()
date_list = pd.date_range(start_date, end_date, periods = int((end_date - start_date) / timedelta(days = 90)) + 1)
logger.info(f"data load will be start from {date_list[0].date()} to {date_list[-1].date()}.")

for idx in range(len(date_list) - 1):
    bgn_de, end_de = date_list[idx].strftime('%Y%m%d'), (date_list[idx+1] - timedelta(days = 1)).strftime('%Y%m%d')
    logger.info(f"data load starts from {bgn_de} to {end_de} ({idx+1}/{len(date_list) - 1}).")
    
    while True:
        try:
            data = dart_api.get_data(detail_url = conf['disc'][detail]['detail_url'], 
                              params = {'bgn_de': bgn_de,
                                        'end_de': end_de },
                                rename = conf['disc'][detail]['rename']).replace('', pd.NA)

            data =  postgres_conn.ext_notin_db(data, schema_name = conf['schema_name'], 
                                       table_name = conf[sub][detail]['table_name'], 
                                       subset = conf[sub][detail]['dup_cols'])


            postgres_conn.upsert(data, schema_name = conf['schema_name'], table_name = conf[sub][detail]['table_name'])

            break
        
        except Exception as e:
            logger.warning(e)
            logger.warning("data re-load would be start...")
        

In [34]:
detail = 'corp_code'
data = dart_api.get_data('fnlttXbrl.xml', params = {'rcept_no': 20190401004781}, data_type = 'xml', is_zip = True)
data[conf[sub][detail]['date_col']] = data[conf[sub][detail]['date_col']].map(lambda x: pd.to_datetime(x, format = '%Y%m%d').date())
data['고유번호'] = data['고유번호'].map(lambda x: str(x).zfill(8))
data.replace('', pd.NA, inplace = True)

BadZipFile: File is not a zip file

In [6]:
db_list = postgres_conn.get_data('dart', 'disc_docfile', columns = '접수번호', is_distinct = True)
rcept_list = postgres_conn.get_data('dart', 'disc_list', columns = '접수번호', orderby_cols = '접수번호', where = [f"접수번호 > '{db_list.max()}'"])['접수번호']

In [7]:
from IPython.display import clear_output
detail = 'document'
for idx, rcept_no in enumerate(rcept_list):
    logger.info(f"'{rcept_no}' doc upload starts! ({idx+1}/{len(rcept_list)})")

    res = None
    while res is None or '사용한도를 초과' in res.text:
        try:
            res = dart_api.get_api_data('document', params = {'rcept_no': rcept_no}, data_type = 'xml')
            
            file = zipfile.ZipFile(
                        io.BytesIO(res.content))
    
        except Exception as e:
            if res is not None and '사용한도를 초과' in res.text:
                dart_api.chg_auth_key()


    if '파일이 존재' in res.text:
        logger.warning(f"There doesn't exist rcept_no: {rcept_no}; skip this report.")
        continue
    
    data = pd.DataFrame([[now, rcept_no, file_name, BeautifulSoup(file.read(file_name), features = 'xml').prettify()] for file_name in file.namelist()], columns = ['_ts', '접수번호', '파일이름', '파일내용'])
    postgres_conn.insert_df(data,
    schema_name = 'dart',
    table_name = 'disc_docfile')
    clear_output(wait=True)





2024-03-10 16:32:25,834 (553741849.py 4) INFO ::: '20020814000758' doc upload starts! (1/3741708)


DataError: (psycopg2.errors.InvalidXmlContent) invalid XML content
LINE 1: ...imestamp, '20020814000758', '20020814000758.xml', '<?xml ver...
                                                             ^
DETAIL:  line 30584: Excessive depth in document: 256 use XML_PARSE_HUGE option
                                                                         <주석
                                                                               ^

[SQL: INSERT INTO dart.disc_docfile (_ts, "접수번호", "파일이름", "파일내용") VALUES (%(_ts)s, %(접수번호)s, %(파일이름)s, %(파일내용)s)]
[parameters: {'_ts': datetime.datetime(2024, 3, 10, 16, 31, 55, 526563), '접수번호': '20020814000758', '파일이름': '20020814000758.xml', '파일내용': '<?xml version="1.0" encoding="utf-8"?>\n<DOCUMENT>\n <DOCUMENT-NAME ACODE="00084">\n  분기보고서(일반법인)\n </DOCUMENT-NAME>\n <FORMULA-VERSION ADATE="200204 ... (10077270 characters truncated) ...    </TR>\n        </TR>\n       </TBODY>\n      </TABLE>\n     </SECTION-3>\n    </SECTION-2>\n   </SECTION-1>\n  </LIBRARY>\n </BODY>\n</DOCUMENT>\n'}]
(Background on this error at: https://sqlalche.me/e/20/9h9h)

### 기업개황

In [6]:
detail = 'company'

corp_list = postgres_conn.get_data(schema_name = conf['schema_name'],
            table_name = conf[sub]['list']['table_name'],
                        columns = ['고유번호'],
                      is_distinct = True)

db_list = postgres_conn.get_data(schema_name = conf['schema_name'],
                        table_name = conf[sub][detail]['table_name'],
                        columns = ['고유번호']).to_numpy().ravel()

corp_list = corp_list.loc[~corp_list.isin(db_list).iloc[:, 0]].to_numpy().ravel()

In [7]:
for idx, corp in enumerate(corp_list):
    logger.info(f"Upload company info {corp} ({idx+1}/{len(corp_list)})...")

    is_db = postgres_conn.get_count(schema_name = conf['schema_name'],
                        table_name = conf[sub][detail]['table_name'],
                        where = [f"고유번호 = '{corp}'"])

    if is_db > 0:
        logger.info(f"'{corp}' is existing in db. skip api call.")
        break
        continue
    
    data = dart_api.get_data(detail_url = conf[sub][detail]['detail_url'], 
                              params = {'corp_code': corp},
                                rename = conf[sub][detail]['rename']).drop(columns = ['status', 'message'])

    time.sleep(.01)
    
    postgres_conn.insert_db(data, schema_name = conf['schema_name'], table_name = conf[sub][detail]['table_name'])


2024-04-08 00:26:09,259 (1774576145.py 2) INFO ::: Upload company info 01249448 (1/87335)...
2024-04-08 00:26:09,261 (1774576145.py 9) INFO ::: '01249448' is existing in db. skip api call.


In [20]:
db_list = postgres_conn.get_data(schema_name = conf['schema_name'],
                        table_name = conf[sub][detail]['table_name'],
                        columns = ['고유번호']).to_numpy().ravel()

In [37]:
corp_list.loc[~corp_list.isin(db_list).iloc[:, 0]]

Unnamed: 0,고유번호


# 2. 사업보고서 주요정보

In [339]:
sub = 'br'
report_type = [['1분기보고서', 11013],
                ['반기보고서', 11012],
                ['3분기보고서', 11014],
                ['사업보고서', 11011]]

[autoreload of dart.processing failed: Traceback (most recent call last):
  File "/home/heenj/anaconda3/envs/jupyter/lib/python3.12/site-packages/IPython/extensions/autoreload.py", line 276, in check
    superreload(m, reload, self.old_objects)
  File "/home/heenj/anaconda3/envs/jupyter/lib/python3.12/site-packages/IPython/extensions/autoreload.py", line 500, in superreload
    update_generic(old_obj, new_obj)
  File "/home/heenj/anaconda3/envs/jupyter/lib/python3.12/site-packages/IPython/extensions/autoreload.py", line 397, in update_generic
    update(a, b)
  File "/home/heenj/anaconda3/envs/jupyter/lib/python3.12/site-packages/IPython/extensions/autoreload.py", line 365, in update_class
    update_instances(old, new)
  File "/home/heenj/anaconda3/envs/jupyter/lib/python3.12/site-packages/IPython/extensions/autoreload.py", line 319, in update_instances
    refs = gc.get_referrers(old)
           ^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
]


### 조건부 자본증권 미상환 잔액

In [None]:
detail = 'cndl_capl_scrits_nrdmp_blce'

In [None]:
logger.info(f"{'#' * 15} Starts {conf['br']['cndl_capl_scrits_nrdmp_blce']['table_name']} upload! {'#' * 15}")
corp_codes = postgres_conn.get_data(schema_name = conf['schema_name'], 
                       table_name = conf['disc']['corp_code']['table_name'],
                       columns = ['고유번호'],
                       is_orderby = True).to_numpy().ravel()

idx = 0
corp_code = '00155373'
logger.info(f"data upload starts for '{corp_code}'({idx+1}/{len(corp_codes)})")

In [None]:
report_name = '분기보고서'
report_code = 11013

logger.info(f"starts for '{report_name}'.")
report_list = postgres_conn.get_data(conf['schema_name'], 
                       table_name = conf['disc']['list']['table_name'],
                        columns = ['보고서명', '접수일자', '접수번호'],
                        where = [f"고유번호='{corp_code}'", 
                                 f"보고서명 SIMILAR TO '%%{report_name}%%'", r"보고서명 LIKE '%%(____.__)'"],
                          )

report_list['사업연도'] = report_list['보고서명'].map(lambda x: pd.to_datetime(x[-8:-1], format = '%Y.%m').year)
if len(report_list) == 0:
    logger.info(f"There is no data in {report_name}. Skip data upload.")

In [None]:
report_list

In [None]:
db_date = postgres_conn.get_data(conf['schema_name'], 
                       table_name = conf['br']['cndl_capl_scrits_nrdmp_blce']['table_name'],
                        columns = ['_ts', '사업연도'],
                       where = [f"보고서코드={report_code}"])

if db_date.shape[0] == 0:
    target_year = report_list.loc[report_list['사업연도'] >= conf['api_report_start_year'], '사업연도']
    target_year = target_year.sort_values().unique()
    
else:
    updated_report_year = report_list.loc[report_list['접수일자'] > db_date['_ts'].dt.date.max(), '사업연도']
    notindb_report_year = report_list.loc[~report_list['사업연도'].isin(db_date['사업연도']), '사업연도']
    target_years = pd.concat([updated_report_year, notindb_report_year]).sort_values().unique()
    target_years = target_years[target_years >= conf['api_report_start_year']]

    if len(target_years) == 0:
        logger.info(f"All data is uploaded in db. Skip data upload.")



In [None]:

year = 2024

data = dart_api.get_data(conf[sub][detail]['detail_url'], 
                  rename = conf[sub][detail]['rename'],
                 params = {'corp_code': corp_code, 'bsns_year': year, 'reprt_code': report_code})

data['사업연도'] = year
data['보고서코드'] = report_code

# postgres_conn.upsert(data,
#                      conf['schema_name'], 
#                        table_name = conf['br']['cndl_capl_scrits_nrdmp_blce']['table_name'])

In [None]:
conf['br']['']

In [None]:
p.dtypes

In [None]:
p.apply(lambda x: x.str.replace(r'^([0-9]{2}),([0-9]{2})%', r'\1.\2%', regex = True) if x.dtype == 'object' else x)

In [None]:
p['보유주식비율'].str.replace(r'^([0-9]{2}),([0-9]{2})%', r'\1.\2%', regex = True)

In [None]:
p = dart_api.get_data(detail_url = conf['br']['mrhl_sttus']['detail_url'],
                  params = {'corp_code': '00108241', 'bsns_year': '2020', 'reprt_code': '11014'},
                 rename = conf['br']['mrhl_sttus']['rename'])
p

In [None]:
p

In [None]:
data[['년1초과2이하', '년2초과3이하']].apply(lambda x: x.replace('-', '').replace(',', '')).astype("Int64", inplace = True)

#### 모듈

In [5]:
br_processor = Report_processor(db_conn = postgres_conn,
                 api_conn = dart_api,
                 conf = conf)

2024-03-09 00:13:27,062 (processing.py 253) INFO ::: Initialize business report data processer!
2024-03-09 00:13:27,197 (processing.py 265) INFO ::: There are 105676 corp_codes in db.


In [7]:
detail = 'otr_cpr_invstmnt_sttus'
p = br_processor.func(detail)

2024-03-09 00:14:31,322 (processing.py 292) INFO ::: ############### Starts br_othcorp_inv upload! ###############
2024-03-09 00:14:31,897 (processing.py 287) INFO ::: Run only for corp_code not in DB: (nums: 2994)
2024-03-09 00:14:31,899 (processing.py 297) INFO ::: data upload starts for '00372226'(1/2994)


In [None]:
postgres_conn.upsert(data.iloc[:1],
                                    conf['schema_name'], 
                                    table_name = conf['fn'][detail]['table_name'])

In [29]:
p.iloc[224:225]

Unnamed: 0,접수번호,법인구분,고유번호,회사명,법인명,최초취득일자,출자목적,최초취득금액,기초수량,기초지분율,...,증감금액,증감평가손,기말수량,기말지분율,기말장부가,최근사업년_총자산,최근사업년_당기순이익,_ts,사업연도,보고서코드
3,20171114002158,K,372226,티에스이,(주)이노글로벌(9.29%),2017-08-16,설립출자,300000000.0,,-,...,300000000.0,,6000.0,9.29,300000000.0,,,2024-03-09 00:13:06.591848+09:00,2017,11014


In [39]:
with postgres_conn.engine.begin() as conn:
    conn.exec_driver_sql("""DELETE FROM dart.br_othcorp_inv
                                            WHERE (접수번호, 법인명) IN ((':1', '%%s'))""")

In [48]:
del_sql = "DELETE FROM test"
where = ('%', "'확률을 의미합니다.'")
sql = f"{del_sql} WHERE (이름, 설명) IN ({where})"
sql

'DELETE FROM test WHERE (이름, 설명) IN ((\'%\', "\'확률을 의미합니다.\'"))'

In [33]:
postgres_conn.upsert(p.iloc[224:225],
                       conf['schema_name'], 
                                    table_name = conf['br'][detail]['table_name'])

DELETE FROM dart.br_othcorp_inv
                                            WHERE (접수번호, 법인명) IN (('20171114002158', '(주)이노글로벌(9.29%)'))


TypeError: sqlalchemy.cyextension.immutabledict.immutabledict is not a sequence

# 3. 재무정보

In [5]:
fn_processor = Finance_processor(db_conn = postgres_conn,
                 api_conn = dart_api,
                 conf = conf)


2024-03-06 08:30:41,979 (processing.py 333) INFO ::: Initialize finance report data processer!
2024-03-06 08:30:42,126 (processing.py 345) INFO ::: There are 105617 corp_codes in db.


In [59]:
db_list = postgres_conn.get_data('dart', 'fn_xbrl', columns = '접수번호', is_distinct = True)
where = [f"접수번호 > '{db_list.max()}'"] if db_list.shape[0] > 0 else []
where.append(f"보고서명 SIMILAR TO '%%사업보고서|분기보고서|반기보고서%%'")
rcept_list = postgres_conn.get_data('dart', 'disc_list', columns = '접수번호', orderby_cols = '접수번호', where = where)['접수번호']

In [61]:
from IPython.display import clear_output
detail = 'document'
for idx, rcept_no in enumerate(rcept_list):
    logger.info(f"'{rcept_no}' doc upload starts! ({idx+1}/{len(rcept_list)})")

    res = None
    while res is None or '사용한도를 초과' in res.text:
        try:
            res = dart_api.get_api_data('fnlttXbrl', params = {'rcept_no': rcept_no}, data_type = 'xml')
            
            file = zipfile.ZipFile(
                        io.BytesIO(res.content))
    
        except Exception as e:
            if res is not None and '사용한도를 초과' in res.text:
                dart_api.chg_auth_key()


    if '파일이 존재' in res.text:
        logger.warning(f"There doesn't exist rcept_no: {rcept_no}; skip this report.")
        continue
    
    data = pd.DataFrame([[now, rcept_no, file_name, BeautifulSoup(file.read(file_name), features = 'xml').prettify()] for file_name in file.namelist()], columns = ['_ts', '접수번호', '파일이름', '파일내용'])
    postgres_conn.insert_df(data,
    schema_name = 'dart',
    table_name = 'fn_xbrl')
    clear_output(wait=True)

2024-03-10 22:49:01,436 (817872090.py 4) INFO ::: '19990807000001' doc upload starts! (1/58486)
2024-03-10 22:49:01,540 (817872090.py 4) INFO ::: '19990807000002' doc upload starts! (2/58486)
2024-03-10 22:49:01,637 (817872090.py 4) INFO ::: '19990809000001' doc upload starts! (3/58486)
2024-03-10 22:49:01,734 (817872090.py 4) INFO ::: '19990810000001' doc upload starts! (4/58486)
2024-03-10 22:49:01,821 (817872090.py 4) INFO ::: '19990810000002' doc upload starts! (5/58486)
2024-03-10 22:49:01,921 (817872090.py 4) INFO ::: '19990811000002' doc upload starts! (6/58486)
2024-03-10 22:49:02,013 (817872090.py 4) INFO ::: '19990811000003' doc upload starts! (7/58486)
2024-03-10 22:49:02,102 (817872090.py 4) INFO ::: '19990811000004' doc upload starts! (8/58486)
2024-03-10 22:49:02,185 (817872090.py 4) INFO ::: '19990811000005' doc upload starts! (9/58486)
2024-03-10 22:49:02,273 (817872090.py 4) INFO ::: '19990811000006' doc upload starts! (10/58486)
2024-03-10 22:49:02,354 (817872090.py 4

IndexError: pop from empty list

In [12]:
fn_types = ['BS1', 'BS2', 'BS3', 'BS4', 
'IS1', 'IS2', 'IS3', 'IS4',
'CIS1', 'CIS2', 'CIS3', 'CIS4',
'DCIS1', 'DCIS2', 'DCIS3', 'DCIS4',
'DCIS5', 'DCIS6', 'DCIS7', 'DCIS8',
'CF1', 'CF2', 'CF3', 'CF4',
'SCE1', 'SCE2']

In [40]:
def idx_func(self, detail):
        logger.info(f"{'#' * 15} Starts {self.conf[self.sub][detail]['table_name']} upload! {'#' * 15}")
        corp_codes = self.check_corp_list(detail)


        for corp_idx, corp_code in enumerate(corp_codes):
            logger.info(f"data upload starts for '{corp_code}'({corp_idx+1}/{len(corp_codes)})")
            cnt = 0
            corp_data = pd.DataFrame()


            for rep_idx, report_info in enumerate(self.report_type):
                logger.debug(f"Upload report type: '{report_info[1]}' ({rep_idx+1}/{len(self.report_type)}).")
                target_years = super().get_upload_target_year(detail, corp_code, report_info)
                target_years = [year for year in target_years if year >= 2023]

                for yr_idx, year in enumerate(target_years):
                    logger.debug(f"Upload {year} starts! ({yr_idx+1}/{len(target_years)})")

                    for idx_idx, idx_code in enumerate(['M210000', 'M220000', 'M230000', 'M240000']):

                        try:
                            params = {'corp_code': corp_code, 'bsns_year': year, 'reprt_code': report_info[1], 'idx_cl_code': idx_code}
                            data = self.api_conn.get_data(self.conf[self.sub][detail]['detail_url'], 
                                        rename = self.conf[self.sub][detail]['rename'],
                                        params = params)
                
                        except Exception as e:
                            if isinstance(e.args[0], dict) and e.args[0]['status'] == '013':
                                logger.warning(f"There is some error in {detail}, params: {params}")
                                logger.warning(e)
                                continue
                            raise Exception(e)
                    
                        
                        data['사업연도'] = year
                        data['보고서코드'] = report_info[1]
                        
    
                        if 'no_cols' in self.conf[self.sub][detail]:
                            no_cols = [col for col in self.conf[self.sub][detail]['no_cols'] if col not in data.columns]
                            data[no_cols] = np.nan
    
                        if 'fillna_cols' in self.conf[self.sub][detail]:
                            super().processing_fillna_cols(data, self.conf[self.sub][detail]['fillna_cols'])
    
                        if 'num_cols' in self.conf[self.sub][detail]:
                            super().processing_num_cols(data, self.conf[self.sub][detail]['num_cols'])
    
                        if 'date_cols' in self.conf[self.sub][detail]:
                            super().processing_date_cols(data, self.conf[self.sub][detail]['date_cols'])
    
                        if 'dup_cols' in self.conf[self.sub][detail]:
                            dup_cols = data.columns if self.conf[self.sub][detail]['dup_cols'] == 'all' else self.conf[self.sub][detail]['dup_cols']
                            data.drop_duplicates(subset = dup_cols,
                                inplace = True)
                        
                        if 'dropna_cols' in self.conf[self.sub][detail]:
                            data.dropna(subset = self.conf[self.sub][detail]['dropna_cols'], axis = 'rows', inplace = True)
     
    
                        
                        corp_data = pd.concat([corp_data, data])

            if corp_data.shape[0] > 0:
                self.db_conn.upsert(corp_data,
                                    self.conf['schema_name'], 
                                    table_name = self.conf[self.sub][detail]['table_name'])



            super()._write_history(table_name = self.conf[self.sub][detail]['table_name'],
                           corp_code = corp_code)

2024-03-11 09:14:37,883 (utils.py 123) INFO ::: Upload data successfully (rows: 0).
2024-03-11 09:14:38,075 (utils.py 123) INFO ::: Upload data successfully (rows: 0).
2024-03-11 09:14:38,227 (utils.py 123) INFO ::: Upload data successfully (rows: 0).
2024-03-11 09:14:38,394 (utils.py 123) INFO ::: Upload data successfully (rows: 0).
2024-03-11 09:14:38,545 (utils.py 123) INFO ::: Upload data successfully (rows: 0).
2024-03-11 09:14:38,697 (utils.py 123) INFO ::: Upload data successfully (rows: 0).
2024-03-11 09:14:38,847 (utils.py 123) INFO ::: Upload data successfully (rows: 0).
2024-03-11 09:14:39,107 (utils.py 123) INFO ::: Upload data successfully (rows: 0).
2024-03-11 09:14:39,220 (utils.py 123) INFO ::: Upload data successfully (rows: 0).
2024-03-11 09:14:39,334 (utils.py 123) INFO ::: Upload data successfully (rows: 0).
2024-03-11 09:14:39,447 (utils.py 123) INFO ::: Upload data successfully (rows: 0).
2024-03-11 09:14:39,550 (utils.py 123) INFO ::: Upload data successfully (ro

# 4. 지분공시

In [5]:
eq_processor = Equity_processor(db_conn = postgres_conn,
                 api_conn = dart_api,
                 conf = conf)


2024-03-01 14:13:03,128 (processing.py 321) INFO ::: Initialize business report data processer!
2024-03-01 14:13:03,262 (processing.py 329) INFO ::: There are 105540 corp_codes in db.


In [None]:
detail = 'majorstock'
eq_processor.func(detail)

# 5. 증권신고서

In [184]:
class Regist_processor(Base_processor):
    def __init__(self, db_conn, api_conn, conf, sub = 'rs'):
        logger.info('Initialize registration report data processer!')
        super().__init__(db_conn, api_conn, conf, sub)
        
        self.corp_codes = db_conn.get_data(schema_name = conf['schema_name'], 
                    table_name = conf['disc']['corp_code']['table_name'],
                    columns = ['고유번호'],
                    orderby_cols = '고유번호').to_numpy().ravel()

        logger.info(f"There are {len(self.corp_codes)} corp_codes in db.")


    def check_corp_list(self, detail):
        db_history = self.db_conn.get_data(self.conf['schema_name'], 
                    table_name = self.conf['history_table_name'],
                        columns = ['고유번호', 'latest_ins_date'],
                        where = [f"테이블이름='{self.conf[self.sub][detail]['table_name']}'"],
                        )

        report_list = self.db_conn.get_data(self.conf['schema_name'], 
                    table_name = self.conf['disc']['list']['table_name'],
                        columns = ['고유번호', '접수일자'],
                        where = [f"보고서명 LIKE '%%{self.conf[self.sub][detail]['report_name']}%%'"],
                    is_distinct = True
                        )
        


        merge_data = report_list.merge(db_history, on = '고유번호', how = 'left')
        corp_idx = merge_data.apply(lambda x: x['접수일자'] >= x['latest_ins_date'] if not pd.isna(x['latest_ins_date']) else True , axis = 1)
        corp_idx &= merge_data['접수일자'] >= datetime(self.conf['api_report_start_year'], 1, 1).date()
        final_data = merge_data.loc[corp_idx, ['고유번호', '접수일자']].reset_index(drop = True)
        logger.info(f"Run only for corp_code not in DB: (nums: {final_data.shape[0]})")
        return final_data

    def func(self, detail):
        logger.info(f"{'#' * 15} Starts {self.conf[self.sub][detail]['table_name']} upload! {'#' * 15}")
        report_list = self.check_corp_list(detail)
        corp_list = report_list.loc[:, '고유번호'].to_numpy().ravel()


        for report_idx, report_info in report_list.iterrows():
            corp_code, date = report_info
            logger.info(f"data upload starts for '{corp_code}'({report_idx+1}/{report_list.shape[0]})")


            try:
                params = {'corp_code': corp_code,
                         'bgn_de': date,
                         'end_de': date}
                
                data = self.api_conn.get_data(self.conf[self.sub][detail]['detail_url'],
                                              content_key = 'group',
                            params = params).T
    
            except Exception as e:
                if isinstance(e.args[0], dict) and e.args[0]['status'] == '013':
                    logger.warning(f"There is some error in {detail}, params: {params}")
                    logger.warning(e)
                    if corp_code not in corp_list[report_idx+1:]:
                        super()._write_history(table_name = self.conf[self.sub][detail]['table_name'],
                           corp_code = corp_code)
                    continue
                
                else:
                    raise Exception(e)

            data.columns = data.loc['title', :]

            data['_ts'] = data.loc['_ts', :].iloc[0]
            data['고유번호'] = corp_code
            data['접수일자'] = date

            data = data.loc['list']

            
            for group in self.conf[self.sub][detail]['group']:
                content = pd.DataFrame(data[group]).rename(columns = self.conf[self.sub][detail]['rename'][group])
                
                this_conf = {key: self.conf[self.sub][detail][key][group] 
                             for key in self.conf[self.sub][detail].keys() 
                             if isinstance(self.conf[self.sub][detail][key], dict) and group in self.conf[self.sub][detail][key]}

                self.processing_cols(content, this_conf)


                data[group] = content.to_json(orient = 'records', force_ascii = False)

            data = pd.DataFrame(data).T
            print(data)


            if data.shape[0] > 0:
                self.db_conn.upsert(data,
                                    self.conf['schema_name'], 
                                    table_name = self.conf[self.sub][detail]['table_name'])

            if corp_code not in corp_list[report_idx+1:]:
                super()._write_history(table_name = self.conf[self.sub][detail]['table_name'],
                           corp_code = corp_code)

In [185]:
rs_processor = Regist_processor(postgres_conn, dart_api, conf)

2024-03-17 01:39:55,245 (2725169331.py 3) INFO ::: Initialize registration report data processer!
2024-03-17 01:39:55,327 (2725169331.py 11) INFO ::: There are 105898 corp_codes in db.


In [186]:
rs_processor.func('estk_rs')

2024-03-17 01:39:55,689 (2725169331.py 38) INFO ::: ############### Starts rs_sec_ess upload! ###############
2024-03-17 01:39:56,196 (2725169331.py 34) INFO ::: Run only for corp_code not in DB: (nums: 7132)
2024-03-17 01:39:56,197 (2725169331.py 45) INFO ::: data upload starts for '00100258'(1/7132)


title                                               일반사항  \
list   [{"접수번호":"20151125000155","법인구분":"E","고유번호":"0...   

title                                              증권의종류  \
list   [{"접수번호":"20151125000155","법인구분":"E","고유번호":"0...   

title                                              인수인정보  \
list   [{"접수번호":"20151125000155","법인구분":"E","고유번호":"0...   

title                                            자금의사용목적  \
list   [{"접수번호":"20151125000155","법인구분":"E","고유번호":"0...   

title                                           매출인에관한사항 일반청약자환매청구권      고유번호  \
list   [{"접수번호":"20151125000155","법인구분":"E","고유번호":"0...         []  00100258   

title        접수일자  
list   2015-09-23  


IntegrityError: (psycopg2.errors.NotNullViolation) null value in column "_ts" of relation "rs_sec_ess" violates not-null constraint
DETAIL:  Failing row contains (null, 00100258, 2015-09-23, [{"접수번호":"20151125000155","법인구분":"E","고유번..., [{"접수번호":"20151125000155","법인구분":"E","고유번..., [{"접수번호":"20151125000155","법인구분":"E","고유번..., [{"접수번호":"20151125000155","법인구분":"E","고유번..., [{"접수번호":"20151125000155","법인구분":"E","고유번..., []).

[SQL: INSERT INTO dart.rs_sec_ess ("일반사항", "증권의종류", "인수인정보", "자금의사용목적", "매출인에관한사항", "일반청약자환매청구권", "고유번호", "접수일자") VALUES (%(일반사항)s, %(증권의종류)s, %(인수인정보)s, %(자금의사용목적)s, %(매출인에관한사항)s, %(일반청약자환매청구권)s, %(고유번호)s, %(접수일자)s)]
[parameters: {'일반사항': '[{"접수번호":"20151125000155","법인구분":"E","고유번호":"00100258","회사명":"에스마크","청약기일":"2015년 12월 16일 ~ 2015년 12월 17일","납입기일":"2015년 12월 24일","청약공고일":"2015년 12월 21일","배정공고일":"2015년 12월 23일","배정기준일":"2015년 11월 11일","행사대상증권":"-","행사가격":null,"행사기간":"-","주요사항보고서_접수번호":"20151023000170"}]', '증권의종류': '[{"접수번호":"20151125000155","법인구분":"E","고유번호":"00100258","회사명":"에스마크","증권종류":"기명식보통주","증권수량":12400000,"액면가액":500,"모집가액":1420,"모집총액":17608000000,"모집방법":"주주배정후 실권주 일반공모"}]', '인수인정보': '[{"접수번호":"20151125000155","법인구분":"E","고유번호":"00100258","회사명":"에스마크","증권의종류":"기명식보통주","인수인구분":"대표","인수인명":"KTB투자증권","인수수량":null,"인수금액":null,"인수대가":"- 대표주관수수료: 금 50,000,000원\\n- 기본인수수수료: 모집총액의 2.5%\\n- 실권수수료: 잔액인수금액의 18.0%","인수방법":"잔액인수"}]', '자금의사용목적': '[{"접수번호":"20151125000155","법인구분":"E","고유번호":"00100258","회사명":"에스마크","구분":"운영자금","금액":17608000000},{"접수번호":"20151125000155","법인구분":"E","고유번호":"00100258","회사명":"에스마크","구분":"발행제비용","금액":563629440}]', '매출인에관한사항': '[{"접수번호":"20151125000155","법인구분":"E","고유번호":"00100258","회사명":"에스마크","보유자":"-","회사와의관계":"-","매출전보유증권수":null,"매출증권수":null,"매출후보유증권수":null}]', '일반청약자환매청구권': '[]', '고유번호': '00100258', '접수일자': datetime.date(2015, 9, 23)}]
(Background on this error at: https://sqlalche.me/e/20/gkpj)

# 6. 주요사항보고서

In [83]:
km_processor = keymatter_processor(postgres_conn, dart_api, conf)

2024-03-28 15:47:54,036 (processing.py 723) INFO ::: Initialize key matter report data processer!
2024-03-28 15:47:54,213 (processing.py 731) INFO ::: There are 106315 corp_codes in db.


In [84]:
conf['km'].keys()

dict_keys(['df_ocr', 'bsn_sp', 'ctrcvs_bgrq', 'ds_rs_ocr', 'piic_decsn', 'fric_decsn', 'pifric_decsn', 'cr_decsn', 'bnk_mngt_pcbg', 'lwst_lg', 'ov_lst_decsn', 'ov_dlst_decsn', 'ov_lst', 'ov_dlst', 'cvbd_is_decsn', 'bdwt_is_decsn', 'exbd_is_decsn', 'bnk_mngt_pcsp', 'wd_cocobd_is_decsn', 'ast_inhtrf_etc_ptbk_opt', 'otcpr_stk_invscr_trf_decsn', 'tgast_trf_decsn', 'tgast_inh_decsn', 'otcpr_stk_invscr_inh_decsn', 'bsn_trf_decsn', 'bsn_inh_decsn', 'tsstk_aq_trctr_cc_decsn', 'tsstk_aq_trctr_cns_decsn', 'tsstk_dp_decsn', 'tsstk_aq_decsn', 'stk_extr_decsn', 'cmp_dvmg_decsn', 'cmp_dv_decsn', 'cmp_mg_decsn'])

In [85]:
p = km_processor.func('cmp_mg_decsn')

2024-03-28 15:47:57,341 (processing.py 761) INFO ::: ############### Starts km_merge_firm upload! ###############
2024-03-28 15:47:57,554 (processing.py 757) INFO ::: Run only for corp_code not in DB: (nums: 3082)
2024-03-28 15:47:57,554 (processing.py 768) INFO ::: data upload starts for '00104810'(1/3082)


In [88]:
t

Unnamed: 0,0
접수번호,14
법인구분,1
고유번호,8
회사명,6
이사회결의일,13
사외이사참석수,1
사외이사불참수,1
감사위원참석여부,2
증권신고서제출대상여부,3
제출면제사유,85


In [87]:
pd.set_option('display.max_rows', 500)
t = p.map(lambda x: len(x) if type(x) is str else 3).T
t[t[0]>300]

Unnamed: 0,0


In [43]:
p[p == '해당사항없음'].dropna(axis = 1)

Unnamed: 0,우회상장여부,타법인우회상장요건충족여부,합병형태,감자비율,분할회사_상장유지여부
0,해당사항없음,해당사항없음,해당사항없음,해당사항없음,해당사항없음


In [11]:
dart_api.get_data(detail_url = 'pifricDecsn', params = {'corp_code': '01051092',
                                                       'bgn_de': '19990101',
                                                       'end_de': '20240320'})

Unnamed: 0,rcept_no,corp_cls,corp_code,corp_name,piic_nstk_ostk_cnt,piic_nstk_estk_cnt,piic_fv_ps,piic_bfic_tisstk_ostk,piic_bfic_tisstk_estk,piic_fdpp_fclt,...,fric_nstk_dlprd,fric_nstk_lstprd,fric_bddd,fric_od_a_at_t,fric_od_a_at_b,fric_adt_a_atn,ssl_at,ssl_bgd,ssl_edd,_ts
0,20230831000458,K,1051092,피씨엘,5200000,-,500,11976236,-,-,...,-,2023년 10월 11일,2023년 05월 30일,2,-,참석,Y,20230531,20230830,2024-03-20 10:28:02.076934+09:00


In [19]:
p = km_processor.func('cr_decsn')

2024-03-20 10:41:20,399 (processing.py 757) INFO ::: ############### Starts km_wrt_down upload! ###############
2024-03-20 10:41:20,613 (processing.py 753) INFO ::: Run only for corp_code not in DB: (nums: 1090)
2024-03-20 10:41:20,614 (processing.py 764) INFO ::: data upload starts for '00100258'(1/1090)


In [26]:
cols = ['보통주수', '기타주수', '주당액면가', '감자전자본금', '감자후자본금', '감자전보통주수',
                    '감자후보통주수', '감자전기타주수', '감자후보통주수', '사외이사참석수', '사외이사불참수']
p[cols]

Unnamed: 0,보통주수,기타주수,주당액면가,감자전자본금,감자후자본금,감자전보통주수,감자후보통주수,감자후보통주수.1,감자전기타주수,감자후보통주수.2,감자후보통주수.3,사외이사참석수,사외이사불참수
0,143691325,-,500,75627013000,3781350500,151254026,7562701,-,-,7562701,-,3,1


In [15]:
corp_code, date = report_list.iloc[0]
params = {'corp_code': corp_code,
                         'bgn_de': date,
                         'end_de': date}
                
data = dart_api.get_data('dfOcr',
            params = params)

In [16]:
data

Unnamed: 0,rcept_no,corp_cls,corp_code,corp_name,df_cn,df_amt,df_bnk,dfd,df_rs,_ts
0,20230317001060,K,104573,국일제지,당사발행 만기어음 부도,356799732,신한은행 기업영업부,-,- 부도사유 : 법적 지급제한\n- 부도경위 : 「채무자회생 및 파산에 관한 법률」...,2024-03-17 16:46:01.882739+09:00
