In [5]:
import pandas as pd

import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin

from threading import Thread
from tqdm import tqdm

import os
import io
from datetime import date
import re
import unicodedata

import pymupdf

In [6]:
def get_text_from_pdf_url(pdf_url):
    """Download a PDF and return its text as one cleaned string per page.

    For each page the extracted text has its line breaks removed, is
    NFKC-normalized, and then has every U+0020 space removed. A single space
    is prepended to each page string, so a page without any extractable text
    yields " " rather than "".

    Args:
        pdf_url: URL of the PDF document to download.

    Returns:
        list[str]: One entry per page, in document order.

    Raises:
        requests.exceptions.RequestException: On connection failure, timeout,
            or an HTTP error status (4xx/5xx).
        Exception: Whatever PyMuPDF raises when the payload cannot be opened.
    """
    # (connect, read) timeouts in seconds so a stalled server cannot block the caller forever.
    web_response = requests.get(pdf_url, timeout = (3.0, 7.5))
    # Fail fast on an error status instead of handing an HTML error page to the PDF parser.
    web_response.raise_for_status()
    pdf_datastream = io.BytesIO(web_response.content)

    # An in-memory stream has no file extension, so state the document type explicitly.
    with pymupdf.open(stream = pdf_datastream, filetype = "pdf") as pdf_doc:
        pdf_text = []
        for pdf_page in pdf_doc:
            page_text = "".join(pdf_page.get_text().splitlines())
            page_text = unicodedata.normalize("NFKC", page_text).replace(" ", "")
            pdf_text.append(" " + page_text)
    return pdf_text

def get_list_emptiness(list_targets, threshold = 0.5):
    """Tell whether more than `threshold` of the entries are non-empty.

    Args:
        list_targets: Sequence of sized objects (anything `len` accepts).
        threshold: Fraction of entries that must be strictly exceeded.

    Returns:
        bool: True when the count of entries with length > 0 is strictly
        greater than `threshold * len(list_targets)`; an empty input gives False.
    """
    filled_count = 0
    for entry in list_targets:
        if len(entry) > 0:
            filled_count += 1
    required_count = threshold * len(list_targets)
    return filled_count > required_count

def search_ir_reports(stock_code, search_words = ("中期経営", "中期戦略")):
    """Search irbank.net for one stock code and return one row per search hit.

    For every search word the page
    `https://irbank.net/td/search?q=<word> <stock_code>` is fetched and each
    anchor matching `td:last-child > a` is treated as a hit. For each hit the
    linked subpage is fetched and the `href` of the element wrapping the first
    `.fa-file-pdf` node is recorded as the PDF link ("" when there is none).

    Only the first hit of each search word has its PDF downloaded and parsed;
    every later hit gets an empty `file_content` and `file_content_readable`
    set to False. A PDF that cannot be downloaded or parsed is recorded the
    same way instead of aborting the search.

    Args:
        stock_code: Code appended to the search query (converted with `str`).
        search_words: Iterable of query keywords; each one is searched.

    Returns:
        pandas.DataFrame: Columns `stock_code`, `search_word`, `file_name`,
        `file_date` (datetime.date), `file_name_num` (digit groups of the
        title joined by "_"), `file_link`, `file_content` (list of page
        strings) and `file_content_readable` (True when more than half of the
        pages contain non-whitespace text). Rows are de-duplicated on
        `file_link`. The frame is empty, with the same columns, when nothing
        was found.

    Raises:
        requests.exceptions.RequestException: If a search page or subpage
            request fails or times out.
    """
    search_url = "https://irbank.net/td/search?q="
    # (connect, read) timeouts in seconds -- the same values get_text_from_pdf_url uses --
    # so a stalled listing request cannot hang the calling thread forever.
    request_timeout = (3.0, 7.5)
    result_columns = ["stock_code", "search_word", "file_name", "file_date", "file_name_num", "file_link", "file_content", "file_content_readable"]
    records = []

    # NOTE(review): every search word is always queried. The original loop ended with
    # `if len(df_temp) > 0: continue`, which had no effect; if the intent was to stop at
    # the first search word that returns hits, a `break` is needed here -- confirm.
    for search_word in search_words:
        search_response = requests.get(search_url + search_word + " " + str(stock_code), timeout = request_timeout)
        search_soup = BeautifulSoup(search_response.content, "html.parser")
        search_links = search_soup.select("td:last-child > a")

        for hit_position, search_link in enumerate(search_links):
            file_name = search_link.text
            file_name_num = "_".join(re.findall(r"\d+", file_name))

            # The date is read from the closest preceding `tr.occ` row of the hit's own row.
            date_row_text = search_link.parent.parent.find_previous_sibling("tr", class_ = "occ").text
            date_parts = [int(s) for s in re.findall(r"\d+", date_row_text)]
            file_date = date(date_parts[0], date_parts[1], date_parts[2])

            subpage_link = urljoin(search_url, search_link.get("href"))
            subpage_response = requests.get(subpage_link, timeout = request_timeout)
            subpage_soup = BeautifulSoup(subpage_response.content, "html.parser")

            # A missing icon, parent, or href all collapse to "" (no PDF available).
            pdf_icons = subpage_soup.select(".fa-file-pdf")
            if pdf_icons and pdf_icons[0].parent is not None:
                file_link = pdf_icons[0].parent.get("href") or ""
            else:
                file_link = ""

            file_content = []
            file_content_readable = False
            # Only the first hit of each search word is downloaded.
            if hit_position == 0 and file_link:
                try:
                    file_content = get_text_from_pdf_url(file_link)
                    # Every page string starts with a padding space, so strip before testing
                    # for emptiness; otherwise any parsed PDF would count as readable.
                    file_content_readable = get_list_emptiness([page_text.strip() for page_text in file_content])
                except Exception:
                    # Best effort: an unreadable PDF must not abort the whole search.
                    # `Exception` (not a bare except) lets KeyboardInterrupt/SystemExit through.
                    file_content = []
                    file_content_readable = False

            records.append({
                "stock_code": stock_code,
                "search_word": search_word,
                "file_name": file_name,
                "file_date": file_date,
                "file_name_num": file_name_num,
                "file_link": file_link,
                "file_content": file_content,
                "file_content_readable": file_content_readable,
            })

    # Build the frame once; `columns` also fixes the schema when there are no hits at all.
    df_result = pd.DataFrame(records, columns = result_columns)
    df_result = df_result.drop_duplicates(subset = "file_link")

    return df_result

def get_irbank_reports(list_stock_code, track_progress = False):
    """Run search_ir_reports for every stock code and stack the results.

    Args:
        list_stock_code: Iterable of stock codes to search.
        track_progress: When True, wrap the iteration in a tqdm progress bar.

    Returns:
        pandas.DataFrame: The per-code result frames concatenated in input
        order (row indexes are not reset). When no code produced any row, the
        last (empty) per-code frame is returned so its columns are kept; when
        `list_stock_code` is empty, a bare empty DataFrame is returned.
    """
    stock_codes = tqdm(list_stock_code) if track_progress else list_stock_code
    report_frames = [search_ir_reports(stock_code) for stock_code in stock_codes]

    # Concatenate once at the end: re-concatenating a growing frame on every
    # iteration copies all earlier rows again each time.
    filled_frames = [df_report for df_report in report_frames if len(df_report) > 0]
    if filled_frames:
        return pd.concat(filled_frames)
    if report_frames:
        return report_frames[-1]
    return pd.DataFrame()

def irbank_thread(thread_num, thread_count = 10, company_list_file = "00_company_list.xlsx", company_list_sheet = "対象企業リスト"):
    """Process one worker's slice of the company list and save it as a feather file.

    The stock codes are read from the `証券コード` column of the given Excel
    sheet and split into `thread_count` consecutive chunks of
    `len(codes) // thread_count` entries; the last worker
    (`thread_num == thread_count`) additionally takes the remainder. The
    result is written to `99_irbank_thread_<thread_num>.arrow`.

    Args:
        thread_num: 1-based index of this worker.
        thread_count: Total number of workers the list is split across.
        company_list_file: Path of the Excel workbook holding the company list.
        company_list_sheet: Name of the sheet to read.

    Returns:
        None. The output is the feather file written as a side effect.
    """
    df_company = pd.read_excel(company_list_file, sheet_name = company_list_sheet)
    df_company = df_company.rename(columns = {"証券コード": "stock_code", "企業名": "company_name"})
    # The codes keep whatever dtype the sheet provides. The original called
    # `df_company.astype({"stock_code": "str"})` and discarded the result, so it never
    # had any effect; the line is dropped rather than "fixed" because
    # combine_irbank_threads merges on the unconverted column read from the same sheet.
    list_stock_code = list(df_company["stock_code"])

    thread_length = len(list_stock_code) // thread_count
    chunk_start = thread_length * (thread_num - 1)
    if thread_num != thread_count:
        list_stock_code_thread = list_stock_code[chunk_start:thread_length * thread_num]
    else:
        # Last worker: run to the end so the division remainder is not lost.
        list_stock_code_thread = list_stock_code[chunk_start:]

    # Only the first worker draws a progress bar.
    df_thread = get_irbank_reports(list_stock_code_thread, track_progress = (thread_num == 1))
    df_thread.to_feather("99_irbank_thread_" + str(thread_num) + ".arrow")

class ThreadClass:
    """Namespace of zero-argument worker entry points, one per worker number.

    combine_irbank_threads lists the public callables of this class, so the
    number of methods here is the number of workers it runs. The ten entries
    match irbank_thread's default `thread_count` of 10.

    The entry points are static methods: they take no `self`, so without the
    decorator they could only be called through the class, never an instance.
    """
    @staticmethod
    def irbank_thread_1():  irbank_thread(1)
    @staticmethod
    def irbank_thread_2():  irbank_thread(2)
    @staticmethod
    def irbank_thread_3():  irbank_thread(3)
    @staticmethod
    def irbank_thread_4():  irbank_thread(4)
    @staticmethod
    def irbank_thread_5():  irbank_thread(5)
    @staticmethod
    def irbank_thread_6():  irbank_thread(6)
    @staticmethod
    def irbank_thread_7():  irbank_thread(7)
    @staticmethod
    def irbank_thread_8():  irbank_thread(8)
    @staticmethod
    def irbank_thread_9():  irbank_thread(9)
    @staticmethod
    def irbank_thread_10(): irbank_thread(10)

def combine_irbank_threads(company_list_file = "00_company_list.xlsx", company_list_sheet = "対象企業リスト"):
    """Run all workers in parallel, merge their outputs, and write the result files.

    One worker thread is started per public callable on ThreadClass. Each
    worker writes `99_irbank_thread_<n>.arrow`; these are combined into:

    * a "long" table: every hit, without the `file_content` and
      `file_content_readable` columns;
    * a "short" table: the company list left-joined with the first hit of each
      stock code, exploded to one row per element of `file_content`.

    Both tables are written to `99_irbank_thread_long.arrow` /
    `99_irbank_thread_short.arrow` and to the sheets `df_long` / `df_short`
    of `01_irbank_results.xlsx`. The per-worker files are deleted afterwards.

    Args:
        company_list_file: Path of the Excel workbook holding the company list.
        company_list_sheet: Name of the sheet to read.

    Returns:
        None.

    Raises:
        FileNotFoundError: If a worker did not produce its output file.
    """
    thread_functions = [method for method in dir(ThreadClass) if callable(getattr(ThreadClass, method)) and not method.startswith("__")]
    thread_count = len(thread_functions)
    thread_files = ["99_irbank_thread_" + str(thread_num) + ".arrow" for thread_num in range(1, thread_count + 1)]

    # Remove leftovers of an aborted earlier run, so a worker that fails now cannot be
    # silently replaced by stale data.
    for thread_file in thread_files:
        if os.path.exists(thread_file): os.remove(thread_file)

    # Start irbank_thread directly (rather than through the zero-argument ThreadClass
    # wrappers) so the workers read the same workbook and sheet that is merged below.
    threads = [
        Thread(target = irbank_thread, args = (thread_num, thread_count, company_list_file, company_list_sheet))
        for thread_num in range(1, thread_count + 1)
    ]
    for thread in threads: thread.start()
    for thread in threads: thread.join()

    # An exception inside a Thread does not propagate to the caller; a missing file is the only trace.
    missing_files = [thread_file for thread_file in thread_files if not os.path.exists(thread_file)]
    if missing_files:
        raise FileNotFoundError("Worker output not found (the worker probably raised): " + ", ".join(missing_files))

    thread_frames = [pd.read_feather(thread_file) for thread_file in thread_files]
    filled_frames = [df_thread for df_thread in thread_frames if len(df_thread) > 0]
    if filled_frames: df_combined = pd.concat(filled_frames)
    elif thread_frames: df_combined = thread_frames[-1]
    else: df_combined = pd.DataFrame()

    # Short view: the first row per stock code; long view: every row without the bulky text.
    df_short = df_combined.drop_duplicates(subset = "stock_code")
    df_combined = df_combined.drop(["file_content", "file_content_readable"], axis = 1)

    df_company = pd.read_excel(company_list_file, sheet_name = company_list_sheet)
    df_company = df_company.rename(columns = {"証券コード": "stock_code", "企業名": "company_name"})
    # Left join keeps companies without any hit.
    df_short = pd.merge(df_company, df_short, on = "stock_code", how = "left")
    df_short = df_short.explode("file_content")

    df_combined.to_feather("99_irbank_thread_long.arrow")
    df_short.to_feather("99_irbank_thread_short.arrow")
    for thread_file in thread_files:
        os.remove(thread_file)

    with pd.ExcelWriter("01_irbank_results.xlsx") as writer:
        df_combined.to_excel(writer, sheet_name = "df_long", index = False)
        df_short.to_excel(writer, sheet_name = "df_short", index = False)

In [7]:
# Run the whole pipeline with the default company list: start the worker threads,
# merge their feather outputs, and write 01_irbank_results.xlsx.
combine_irbank_threads()

  2%|▏         | 9/393 [00:16<08:46,  1.37s/it]

MuPDF error: format error: cycle in structure tree



 30%|██▉       | 117/393 [03:31<04:55,  1.07s/it]

MuPDF error: format error: cycle in structure tree



100%|██████████| 393/393 [11:02<00:00,  1.68s/it]
