[Python：EDINET企業データの自動取得･スクリーニングを5分ではじめる](https://investment.abbamboo.com/programming/google-colab-edinet-screening/)

In [None]:
# Mount Google Drive at /content/drive so the notebook can read and write files there.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os

# Working folder on the mounted Drive; every CSV of this notebook is read from / written to it.
# The trailing slash is part of the value.
PROJECT_NAME = 'EDINETScraping'
BASE_DIR = f'/content/drive/MyDrive/Purchased_products/csv/{PROJECT_NAME}/'

# Create the folder (and any missing parents); an existing folder is not an error.
os.makedirs(BASE_DIR, exist_ok=True)

### EDINETAPIからURLを取得

In [None]:
import csv ,time ,re ,os ,json ,requests
from tqdm import tqdm
from datetime import datetime ,timedelta

import urllib3
from urllib3.exceptions import InsecureRequestWarning
urllib3.disable_warnings(InsecureRequestWarning)

# Period to process, as ISO dates (YYYY-MM-DD).
start_date = '2019-06-26'
end_date = '2019-06-26'

# Compact YYYYMMDD forms and the document-list file name are derived from the
# two dates above so that they can never get out of sync with them.
start_date_8 = start_date.replace('-', '')
end_date_8 = end_date.replace('-', '')
download_file = f'dat_download_{start_date_8}_{end_date_8}.csv'

class catcher() :
    """Build a CSV index of annual securities reports from the EDINET document list.

    For each day from ``since`` through ``until`` the document list is requested
    from ``base_url``. Entries whose title looks like a company's annual
    securities report are kept and written to ``file_name`` inside ``base_path``.
    """

    def __init__(self, since, until, base_dir=None, wait_time=2):
        """Store the period and output location.

        Args:
            since: first day to query (``datetime``).
            until: last day to query (``datetime``).
            base_dir: directory for the output CSV; the current working
                directory is used when ``None``.
            wait_time: seconds to sleep after each daily request.
        """
        self.csv_tag = ['id', 'title', 'url', 'code', 'update']
        self.encode_type = 'utf-8'
        self.wait_time = wait_time
        self.base_url = 'https://disclosure.edinet-fsa.go.jp/api/v1/documents'
        self.out_of_since = False
        self.since = since
        self.until = until
        # Four-digit years: the rest of the notebook (download_file, the eggs_*
        # and com_indices_* names) is built from YYYYMMDD strings, so the file
        # written here must use the same form to be found by the later steps.
        self.file_info_str = since.strftime('_%Y%m%d_') + until.strftime('%Y%m%d')
        self.file_name = f'dat_download{self.file_info_str}.csv'
        self.base_path = f'{os.getcwd() if base_dir is None else base_dir}'

    def __get_link_info_str(self, target_date):
        """Return the raw response text of the document list for ``target_date``.

        A failed request is retried up to 3 more times, 3 seconds apart; the
        last failure is re-raised.
        """
        str_datetime = target_date.strftime('%Y-%m-%d')
        params = {"date": str_datetime, "type": 2}
        count, retry = 0, 3
        while True:
            try:
                # NOTE(review): verify=False disables TLS certificate checks; it
                # is kept because the notebook deliberately silences the matching
                # warning, but confirm it is still required.
                # The timeout keeps a stalled connection from hanging the cell.
                response = requests.get(
                    f'{self.base_url}.json', params=params, verify=False, timeout=30
                )
                return response.text
            except requests.RequestException:
                print(f'{str_datetime} のアクセスに失敗しました。[ {count} ]')
                if count >= retry:
                    raise
                count += 1
                time.sleep(3)

    def __parse_json(self, string):
        """Decode the response body and return its ``results`` list."""
        res_dict = json.loads(string)
        return res_dict["results"]

    def __get_link(self, target_list):
        """Return ``{docID: row}`` for the entries that pass ``__is_yuho``."""
        edinet_dict = {}
        for target_dict in target_list:
            title = f'{target_dict["filerName"]} {target_dict["docDescription"]}'
            if not self.__is_yuho(title):
                continue
            doc_id = target_dict["docID"]
            edinet_dict[doc_id] = {
                'id': doc_id,
                'title': title,
                'url': f'{self.base_url}/{doc_id}',
                'code': target_dict['edinetCode'],
                'update': target_dict['submitDateTime'],
            }
        return edinet_dict

    def __is_yuho(self, title):
        """Tell whether ``title`` contains both required words and not the excluded one."""
        text = str(title)
        has_required = all(word in text for word in ['有価証券報告書', '株式会社'])
        return has_required and '受益証券' not in text

    def __dump_file(self, result_dict):
        """Write every collected row to the output CSV (header first)."""
        out_path = os.path.join(self.base_path, self.file_name)
        # newline='' lets the csv writer control line endings itself, so the
        # requested '\n' terminator is not translated on any platform.
        with open(out_path, 'w', encoding=self.encode_type, newline='') as of:
            writer = csv.DictWriter(of, self.csv_tag, lineterminator='\n')
            writer.writeheader()
            writer.writerows(result_dict.values())

    def create_xbrl_url_csv(self):
        """Query every day of the period and save the matching documents."""
        print(f'since: {self.since.strftime("%Y-%m-%d")} ,until: {self.until.strftime("%Y-%m-%d")} ({self.file_info_str})')
        target_date, result_dict = self.since, {}
        # The first day is always queried, even when since is later than until.
        while True:
            print(f'date {target_date.strftime("%Y-%m-%d")}, loading...')
            response_string = self.__get_link_info_str(target_date)
            target_list = self.__parse_json(response_string)
            result_dict.update(self.__get_link(target_list))
            time.sleep(self.wait_time)
            target_date = target_date + timedelta(days=1)
            if target_date > self.until:
                break
        self.__dump_file(result_dict)
        print('complete a download!!')

def edinet_operator( since ,until ,base_dir=None ) :
    """Collect the document list for the period since..until and save it as a CSV in base_dir."""
    catcher( since ,until ,base_dir ).create_xbrl_url_csv()

In [None]:
from datetime import datetime

# Turn the ISO date strings into datetime objects and build the document-list CSV.
since, until = (
    datetime.strptime(date_text, '%Y-%m-%d') for date_text in (start_date, end_date)
)
edinet_operator(since, until, base_dir=BASE_DIR)

since: 2019-06-26 ,until: 2019-06-26 (_190626_190626)
date 2019-06-26, loading...
complete a download!!


### ZIPファイルをDLして全ての財務データをCSVに保存

In [None]:
!pip install python-xbrl

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import os ,re ,csv ,io ,time ,requests
import pandas as pd
from tqdm import tqdm
from zipfile import ZipFile
from xbrl import XBRLParser

# Columns always written for a parsed XBRL fact.
default_tag = 'file_nm element_id amount'.split()
# XBRL attributes copied into additional columns when present on the fact.
custom_tag = 'unit_ref decimals contextref'.split()
# Text encoding of the generated CSV files.
encode_type = 'utf-8'



class downloader() :
    """Download and unpack the ZIP archive of every document listed in a CSV.

    The document-list CSVs are read from ``base_path``; the archives are
    extracted below ``<current working directory>/xbrl_files_<period>/``,
    one folder per ``code`` and ``id`` column value.
    """

    def __init__(self, wait_time=1, base_dir=None):
        """
        Args:
            wait_time: seconds to sleep after each download request.
            base_dir: directory holding the document-list CSVs; the current
                working directory is used when ``None``.
        """
        self.wait_time = wait_time
        self.base_path = f'{os.getcwd() if base_dir is None else base_dir}'

    def __make_directory(self, dir_path):
        """Create ``dir_path`` (and parents) unless it already exists."""
        os.makedirs(dir_path, exist_ok=True)

    def __download_all_xbrl_files(self, info_df):
        """Download the archive of every row of ``info_df``."""
        records = [row.to_dict() for _, row in info_df.iterrows()]
        self.__download_xbrl_file(records)

    def __download_xbrl_file(self, info_dicts):
        """Create the target folder of each record and fetch its archive into it."""
        for info_dict in tqdm(info_dicts):
            company_path = f'{self.directory_path}{info_dict["code"]}/'
            ir_path = f'{company_path}{info_dict["id"]}'
            self.__make_directory(company_path)
            self.__make_directory(ir_path)
            self.__download_and_unzip(info_dict['url'], ir_path)

    @staticmethod
    def _extract_zip(content, dir_path):
        """Extract the ZIP archive held in the bytes ``content`` into ``dir_path``."""
        # The context manager closes the archive even if extraction fails.
        with ZipFile(io.BytesIO(content)) as archive:
            archive.extractall(dir_path)

    def __download_and_unzip(self, url, dir_path):
        """Download ``url`` and extract the returned archive into ``dir_path``.

        A response whose status is not 200 is retried up to 3 more times.

        Raises:
            RuntimeError: every attempt returned a non-200 status.
        """
        count, retry = 0, 3
        while True:
            # The timeout keeps a stalled connection from hanging the cell.
            r = requests.get(url, params={'type': 1}, timeout=60)
            time.sleep(self.wait_time)
            if r.status_code == 200:
                self._extract_zip(r.content, dir_path)
                return
            print(f'download failed [{ count }]_{ url }')
            if count >= retry:
                # There is no active exception here, so a bare `raise` would only
                # produce an uninformative RuntimeError; say what failed instead.
                raise RuntimeError(
                    f'download failed after {retry + 1} attempts '
                    f'(last HTTP status {r.status_code}): {url}'
                )
            count += 1

    def download(self, list_dat_csv):
        """Download the documents of every CSV file named in ``list_dat_csv``."""
        for dat_csv in list_dat_csv:
            info_df = pd.read_csv(os.path.join(self.base_path, dat_csv), parse_dates=['update'])
            if len(info_df) > 0:
                period = dat_csv.replace(".csv", "").replace("dat_download_", "")
                self.directory_path = f'{os.getcwd()}/xbrl_files_{period}/'
                self.__make_directory(self.directory_path)
                self.__download_all_xbrl_files(info_df)
        print('complete a download!!')



class XbrlParser( XBRLParser ) :
    """Turn the facts of one XBRL file into CSV-ready row dictionaries."""

    def __init__( self ,xbrl_filepath ) :
        # Path of the .xbrl file that parse_xbrl() will read.
        self.xbrl_filepath = xbrl_filepath

    def parse_xbrl( self ) :
        """Parse the file and return ``{0: row, 1: row, ...}`` for the kept nodes.

        Each row holds the file name, the node name, the node text and the
        attributes named in ``custom_tag``.
        """
        with open( self.xbrl_filepath ,'r' ,encoding='utf-8' ) as xbrl_file :
            document = XBRLParser.parse( xbrl_file )

        # NOTE(review): this pattern is used as a search, so it matches any tag
        # name containing "j" -- presumably meant to select the jp* namespaces.
        tag_pattern = re.compile( 'jp*:*' )

        rows = {}
        for node in document.find_all( name=tag_pattern ) :
            if self.ignore_pattern( node ) :
                continue
            row = {
                'file_nm'    : self.xbrl_filepath.rsplit( os.sep ,1 )[1] ,
                'element_id' : node.name ,
                'amount'     : node.string ,
            }
            row.update( ( tag ,self.get_attrib_value( node ,tag ) ) for tag in custom_tag )
            # Keys are consecutive integers counting only the kept nodes.
            rows[ len( rows ) ] = row
        return rows

    def ignore_pattern( self ,node ) :
        """Return True when ``node`` should be left out of the result."""
        # Explicitly nil value.
        if node.attrs.get( 'xsi:nil' ) == 'true' :
            return True
        # No single text value (empty node or nested content).
        text = node.string
        if not isinstance( text ,str ) :
            return True
        # Multi-line text, or a text-block element.
        return '\n' in text or 'textblock' in str( node.name )

    def get_attrib_value( self ,node ,attrib ) :
        """Return attribute ``attrib`` of ``node``, or None when it is absent."""
        return node.attrs[ attrib ] if attrib in node.attrs.keys() else None



class parse_operator() :
    """Convert the downloaded XBRL files of each period into one "eggs" CSV.

    XBRL files are searched below ``<current working directory>/xbrl_files_<period>/``
    and the resulting ``eggs_<period>.csv`` is written to ``base_path``.
    """

    def __init__(self, list_dat_csv, base_dir=None):
        """
        Args:
            list_dat_csv: names of the document-list CSVs; the period is taken
                from each name.
            base_dir: output directory; the current working directory is used
                when ``None``.
        """
        self.list_dat_csv = list_dat_csv
        self.base_path = f'{os.getcwd() if base_dir is None else base_dir}'

    def __find_all_files(self):
        """Return the paths of all XBRL files to parse below ``search_path``."""
        return [
            os.path.join(root, file_name)
            for root, _dirs, files in os.walk(self.search_path)
            for file_name in files
            if self.__is_xbrl_file(root, file_name)
        ]

    def __is_xbrl_file(self, root_path, file_name):
        """Tell whether the file is an XBRL file of the current period to parse."""
        if not file_name.endswith('.xbrl'):
            return False  # not an XBRL file
        if 'AuditDoc' in str(root_path):
            return False  # files under AuditDoc are excluded
        # Always answer with a bool instead of falling through to None.
        return 'xbrl_files_' + self.str_period in str(root_path)

    def __dump_file(self, writer, dicts_info):
        """Write the rows stored under the keys 0..len-1 of ``dicts_info``, in order."""
        writer.writerows(dicts_info[i] for i in range(len(dicts_info)))

    def xbrl_to_csv(self):
        """Parse every XBRL file of every period and write the eggs CSVs."""
        for dat_csv in self.list_dat_csv:
            self.str_period = dat_csv.replace(".csv", "").replace("dat_download_", "")
            eggs_file = f'eggs_{self.str_period}.csv'
            self.search_path = f'{os.getcwd()}/xbrl_files_{self.str_period}/'
            # newline='' lets the csv writer control line endings itself, so the
            # requested '\n' terminator is not translated on any platform.
            with open(os.path.join(self.base_path, eggs_file), 'w',
                      encoding=encode_type, newline='') as of:
                result_csv_writer = csv.DictWriter(of, default_tag + custom_tag, lineterminator='\n')
                result_csv_writer.writeheader()
                for xbrl_file in tqdm(self.__find_all_files()):
                    dicts_info = XbrlParser(xbrl_file).parse_xbrl()
                    self.__dump_file(result_csv_writer, dicts_info)
        print('completed conversions!!')



def xbrl_to_csv_operator( list_dat_csv ,base_dir=None ) :
    """Download the XBRL archives listed in each CSV, then convert them to eggs CSVs."""
    # Step 1: fetch and unpack every listed document.
    downloader( base_dir=base_dir ).download( list_dat_csv )
    # Step 2: parse the unpacked XBRL files into CSV.
    parse_operator( list_dat_csv ,base_dir=base_dir ).xbrl_to_csv()

In [None]:
# Download and convert the documents listed in the CSV named by download_file.
dat_list = [download_file]
xbrl_to_csv_operator( dat_list ,base_dir=BASE_DIR )

100%|██████████| 1/1 [00:04<00:00,  4.41s/it]


complete a download!!


100%|██████████| 1/1 [00:01<00:00,  1.08s/it]

completed conversions!!





### 全財務データのCSVから必要なデータを抽出

In [None]:
import re
import pandas as pd
from tqdm import tqdm

def get_dict_edinet_codes( DIR ,index='証券コード' ) :
    """Load ``EdinetcodeDlInfo.csv`` from ``DIR`` as a dict of listed companies.

    Only rows with a positive numeric securities code are kept; the code is
    stored without its last digit (integer division by 10).

    Args:
        DIR: directory containing ``EdinetcodeDlInfo.csv`` (with or without a
            trailing path separator).
        index: column whose values become the keys of the result.

    Returns:
        ``{index value: {remaining column: value}}``.
    """
    # os.path.join works whether or not DIR ends with a separator.
    file_path = os.path.join( DIR ,'EdinetcodeDlInfo.csv' )

    # cp932 is the Windows superset of Shift-JIS. With the plain Shift-JIS codec
    # the extension characters (e.g. "髙" in company names) cannot be decoded and
    # were silently dropped by errors='ignore'.
    with open( file_path ,'r' ,encoding='cp932' ,errors='ignore' ) as file :
        df = pd.read_csv( file ,skiprows=[0] ,usecols=['ＥＤＩＮＥＴコード','提出者名','証券コード','提出者業種'] )

    # Codes that are not plain numbers become NaN and are filtered out below,
    # because the result stores the code as an int.
    # NOTE(review): this also drops alphanumeric securities codes -- confirm
    # that skipping those companies is acceptable.
    df['証券コード'] = pd.to_numeric( df['証券コード'] ,errors='coerce' )
    # .copy() so the assignment below modifies an independent frame, not a slice.
    df = df.loc[ df['証券コード'] > 0 ,: ].copy()
    df['証券コード'] = ( df['証券コード'] // 10 ).astype( int )
    result = df.set_index( index ).T.to_dict()
    print( f'対象(EDINET)： { len( result ) }銘柄' )
    return result

def make_element_ids_csv( egg_file_name ,base_dir=None ) :
    """Write the distinct element ids found in an eggs CSV to a new CSV.

    Both files live in base_dir (the current working directory when None).
    """
    directory = f'{ os.getcwd() if base_dir==None else base_dir }'
    source_path = os.path.join( directory ,egg_file_name )
    target_path = os.path.join( directory ,f'element_ids ( { egg_file_name } ).csv' )

    eggs = pd.read_csv( source_path ).drop_duplicates()
    unique_ids = eggs.loc[ : ,'element_id' ].drop_duplicates()
    unique_ids.to_csv( target_path )

def make_sample_data_csv( egg_file_name ,edinet_code ,base_dir=None ) :
    """Save the rows of an eggs CSV whose file name contains edinet_code.

    Both files live in base_dir (the current working directory when None).
    When no row matches, a message is printed and no file is written.
    """
    directory = f'{ os.getcwd() if base_dir==None else base_dir }'
    target_path = os.path.join( directory ,f'sample_data ( { egg_file_name } { edinet_code } ).csv' )

    eggs = pd.read_csv( os.path.join( directory ,egg_file_name ) ).drop_duplicates()
    is_target = eggs['file_nm'].str.contains( edinet_code )
    matches = eggs[ is_target ]

    if len( matches ) == 0 :
        print( f'{ edinet_code }のデータはありませんでした' )
        return
    matches.to_csv( target_path )

class eggs_operator() :
    """Extract one row of selected items per filing from the eggs CSVs.

    ``dict_cols`` maps an output column to ``{'element_id': [...]}`` and, for
    all but the cover-page columns, a ``'contextref'``. The result is written
    to ``result_file_name`` inside ``base_path``.
    """

    # Columns looked up by element id alone (no contextref filter).
    _CONTEXT_FREE_COLS = ('会社名', '提出書類', '提出日', '年度開始日', '年度終了日')
    # Marker for "no matching row", distinct from every real amount.
    _NOT_FOUND = object()

    def __init__(self, list_eggs, dict_codes, dict_cols, result_file_name='com_indices.csv', base_dir=None):
        """
        Args:
            list_eggs: names of the eggs CSV files to read from ``base_dir``.
            dict_codes: ``{EDINET code: {'証券コード': ..., '提出者業種': ...}}``.
            dict_cols: column definitions (see the class description).
            result_file_name: name of the CSV written by ``get_elements``.
            base_dir: working directory; the current working directory is used
                when ``None``.
        """
        self.base_path = f'{os.getcwd() if base_dir is None else base_dir}'
        self.result_file_name = result_file_name
        self.list_df_eggs = [
            pd.read_csv(os.path.join(self.base_path, egg_file_name)).drop_duplicates()
            for egg_file_name in list_eggs
        ]
        self.dict_codes = dict_codes
        self.dict_cols = dict_cols

    @staticmethod
    def _correction_flag(doc_title):
        """Return 1 when the document title contains '訂正', else 0.

        The title is 0 (an int) when it was not found, so it is converted to
        text first instead of raising TypeError on the ``in`` test.
        """
        return 1 if '訂正' in str(doc_title) else 0

    @classmethod
    def _pick_amount(cls, df, element_ids, contextref):
        """Return the amount of the first element id that has rows for ``contextref``.

        Element ids are tried in order. When several rows match one id, the
        row with the shortest element id wins. Returns ``_NOT_FOUND`` when no
        id matches.
        """
        for element_id in element_ids:
            by_id = df[df['element_id'].str.contains(element_id.lower())]
            hits = by_id[by_id['contextref'] == contextref]
            if len(hits) == 1:
                return hits['amount'].values[0]
            if len(hits) > 1:
                id_lengths = hits['element_id'].apply(lambda x: len(str(x)))
                return hits.loc[id_lengths == id_lengths.min(), 'amount'].values[0]
        return cls._NOT_FOUND

    def __get_element(self, df, col):
        """Return the value of column ``col`` for one filing, or 0 when absent."""
        element_ids = self.dict_cols[col]['element_id']
        if col in self._CONTEXT_FREE_COLS:
            hits = df[df['element_id'].str.contains(element_ids[0].lower())]
            return hits['amount'].values[0] if len(hits) == 1 else 0

        contextref = self.dict_cols[col]['contextref']
        # Try the plain context first, then the same context with the
        # _NonConsolidatedMember suffix.
        for ref in (contextref, f'{contextref}_NonConsolidatedMember'):
            amount = self._pick_amount(df, element_ids, ref)
            if amount is not self._NOT_FOUND:
                return amount
        return 0

    def get_elements(self):
        """Build one row per filing of a known company and save them as CSV."""
        rows = []
        for df_eggs in self.list_df_eggs:
            cover_rows = df_eggs[df_eggs['element_id'] == 'jpcrp_cor:companynamecoverpage']
            file_nms = cover_rows.drop_duplicates()['file_nm'].values
            for file_nm in tqdm(file_nms):
                matched = re.search(r'E[0-9]{5}', file_nm)
                if matched is None:
                    continue  # file name carries no EDINET code
                edinet_code = matched.group(0)
                if edinet_code not in self.dict_codes:
                    continue
                df_target = df_eggs[df_eggs['file_nm'] == file_nm]
                data = {col: self.__get_element(df_target, col) for col in self.dict_cols}
                data['証券コード'] = self.dict_codes[edinet_code]['証券コード']
                data['業種'] = self.dict_codes[edinet_code]['提出者業種']
                data['訂正'] = self._correction_flag(data['提出書類'])
                data['file_nm'] = file_nm
                rows.append(pd.DataFrame(data, index=[edinet_code]))
        # Concatenate once at the end instead of growing a frame row by row.
        com_indices = pd.concat(rows) if rows else pd.DataFrame()
        com_indices.to_csv(os.path.join(self.base_path, self.result_file_name))

In [None]:
# Load the EDINET code list from BASE_DIR, keyed by the ＥＤＩＮＥＴコード column.
dict_codes = get_dict_edinet_codes( BASE_DIR ,'ＥＤＩＮＥＴコード' )

対象(EDINET)： 3890銘柄


In [None]:
##
# Input settings
#

list_eggs = [ 'eggs_' + start_date_8 + '_' + end_date_8 + '.csv']
# list_eggs = [ 'eggs_200601_200602.csv' ]
# A single file name (str), not a list: eggs_operator hands it to os.path.join,
# which raises TypeError for a list.
result_file_name = 'com_indices_' + start_date_8 + '_' + end_date_8 + '.csv'
# result_file_name = 'com_indices_200601_200602.csv'

# Output column -> XBRL element ids to look for.
# (When several element ids are listed, the earlier ones take priority.)
dict_cols = {
    '会社名': {'element_id': ['companynamecoverpage']},
    '提出書類': {'element_id': ['documenttitlecoverpage']},
    '提出日': {'element_id': ['filingdatecoverpage']},
    '年度開始日': {'element_id': ['currentfiscalyearstartdatedei']},
    '年度終了日': {'element_id': ['currentfiscalyearenddatedei']},
    # --- the entries above are required ---
    # The entries below also require 'contextref'.
    '発行済み株式数': {
        'element_id': [
            'totalnumberofissuedsharessummaryofbusinessresults',
            'totalnumberofissuedsharescommonstocksummaryofbusinessresults',
            'totalnumberofissuedshares',
        ],
        'contextref': 'CurrentYearInstant',
    },
    '営業CF': {
        'element_id': [
            'netcashprovidedbyusedinoperatingactivitiessummaryofbusinessresults',
            'cashflowsfromusedinoperatingactivitiesifrssummaryofbusinessresults',
            'CashFlowsFromUsedInOperatingActivitiesUSGAAPSummaryOfBusinessResults',
        ],
        'contextref': 'CurrentYearDuration',
    },
    '財務CF': {
        'element_id': [
            'netcashprovidedbyusedinfinancingactivitiessummaryofbusinessresults',
            'cashflowsfromusedinfinancingactivitiesifrssummaryofbusinessresults',
            'CashFlowsFromUsedInFinancingActivitiesUSGAAPSummaryOfBusinessResults',
        ],
        'contextref': 'CurrentYearDuration',
    },
    '投資CF': {
        'element_id': [
            'netcashprovidedbyusedininvestingactivitiessummaryofbusinessresults',
            'cashflowsfromusedininvestingactivitiesifrssummaryofbusinessresults',
            'CashFlowsFromUsedInInvestingActivitiesUSGAAPSummaryOfBusinessResults',
        ],
        'contextref': 'CurrentYearDuration',
    },
    '純利益': {
        'element_id': [
            'profitlossattributabletoownersofparentsummaryofbusinessresults',
            'ProfitLossAttributableToOwnersOfParentIFRSSummaryOfBusinessResults',
            'NetIncomeLossAttributableToOwnersOfParentUSGAAPSummaryOfBusinessResults',
            'netincomelosssummaryofbusinessresults',
        ],
        'contextref': 'CurrentYearDuration',
    },
    '売上高': {
        'element_id': [
            'netsalessummaryofbusinessresults',
            'NetSalesIFRSSummaryOfBusinessResults',
            'RevenuesUSGAAPSummaryOfBusinessResults',
            'operatingrevenue1summaryofbusinessresults',
            'revenueifrssummaryofbusinessresults',
            'netoperatingrevenuesummaryofbusinessresults',
            'businessrevenuesummaryofbusinessresults',
        ],
        'contextref': 'CurrentYearDuration',
    },
    'BS 現金預金': {
        'element_id': ['cashanddeposits'],
        'contextref': 'CurrentYearInstant',
    },
    'BS 負債合計': {
        'element_id': ['liabilities'],
        'contextref': 'CurrentYearInstant',
    },
}

In [None]:
# Keep the instance under its own name: assigning it to `eggs_operator` would
# replace the class, so running this cell a second time would fail.
indices_extractor = eggs_operator( list_eggs ,dict_codes ,dict_cols ,result_file_name=result_file_name ,base_dir=BASE_DIR )
indices_extractor.get_elements()