In [1]:
#テキストの属性をもとに段落を判別しつつテキストデータを取得
#ヘッダ・フッタ等をテキストブロックの属性情報（特に位置情報）で除外
#テキストを取得する範囲も、テキストブロックの開始位置の座標情報をもとに、強制的に読み込み順序を制御
#（リスト形式で、x軸の閾値＝段組みの境界線を設定：ブランクのリストであればブロック・リストの順番通りに読み込み）
#各ブロック・リストの座標の分布は、Extract_Pdf_Attrib.ipynbで確認可能
import fitz  # PyMuPDF
import pandas as pd
import re
import sys
sys.path.append('/work_dir') 
import csv
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import html
import xml.etree.ElementTree as ET
import xml.sax.saxutils as saxutils  # XMLのエスケープ処理
import re
from xml.dom import minidom  # XML整形用

# ログファイルの設定
log_filename = "data/logfile_Pdf2Txt.log"
logging.basicConfig(
    level=logging.INFO,  # ログレベルの設定 (DEBUG, INFO, WARNING, ERROR, CRITICAL)
    format="%(asctime)s - %(levelname)s - %(message)s",  # ログのフォーマット
    handlers=[
        logging.FileHandler(log_filename, mode="a", encoding="utf-8"),  # ファイルに記録
        #logging.StreamHandler()  # コンソールにも出力
    ]
)


In [2]:
# CSVを読み込む関数
def load_csv(file_path):
    """
    CSVを読み込み、DataFrameを返す
    file_path: CSVファイルのパス
    """
    try:
        return pd.read_csv(file_path)
    except FileNotFoundError:
        print(f"Error: CSV file not found at {file_path}")
        logging.error(f"Error: CSV file not found at {file_path}")
        return None
    except pd.errors.EmptyDataError:
        print(f"Error: CSV file at {file_path} is empty or invalid")
        logging.error(f"Error: CSV file at {file_path} is empty or invalid")
        return None
    except Exception as e:
        print(f"Error reading CSV file: {e}")
        logging.error(f"Error reading CSV file: {e}")
        return None

In [3]:
def clean_text_by_attributes(page, header_y, footer_y, left_margin_x, right_margin_x, column_thresholds, line_spacing_factor):
    """
    指定されたページのテキストを、指定したヘッダー・フッター・マージンの範囲外を除外しながら整形する。
    - ヘッダー、フッター、左右マージンを考慮し、不要なテキストを除去。
    - 複数列のテキストを適切に処理する。
    - フォントサイズに応じて適切な行間を設定し、改行を挿入。
    - XMLエスケープ処理を行う。
    """
    text = ""  # 抽出されたテキストを格納する変数
    blocks = page.get_text("dict")["blocks"]  # ページからテキストブロックを取得

    def get_range_index(x0, ranges):
        """ 指定された x0 座標がどのカラム範囲に属するかを判定 """
        for i, bound in enumerate(ranges):
            if x0 < bound:
                return i
        return len(ranges)  # 範囲外は最後の区分として扱う

    # `column_thresholds` が None の場合は空のリストに初期化
    if column_thresholds is None:
        column_thresholds = []

    # 列ごとにテキストブロックを分類する辞書を初期化
    sorted_blocks = {i: [] for i in range(len(column_thresholds) + 1)}

    # 各テキストブロックを処理
    for block_index, block in enumerate(blocks):
        if "lines" not in block or not block["lines"]:
            continue  # 空のブロックをスキップ
        for line_index, line in enumerate(block["lines"]):
            if not line["spans"]:
                continue  # 空の行をスキップ
            line_bbox_x0 = line["spans"][0]["bbox"][0]  # 行の最初のスパンのX座標を取得
            range_index = get_range_index(line_bbox_x0, column_thresholds)  # カラムを判定
            sorted_blocks.setdefault(range_index, []).append((block_index, block, line))  # カテゴリごとに分類

    # 各範囲のブロックを処理
    previous_font = None
    previous_size = None
    previous_color = None
    previous_bbox = None
    previous_text_type = None

    # `line_spacing_factor` が None の場合、デフォルト値を設定
    if line_spacing_factor is None:
        line_spacing_factor = 2

    for range_index, blocks in sorted_blocks.items():  # カラムごとに処理
        for block_index, block, line in blocks:
            for span in line["spans"]:
                bbox = span["bbox"]  # テキストの座標情報
                content = span["text"]  # 実際のテキスト内容
                font = span["font"]  # フォント情報
                size = span["size"]  # 文字サイズ
                color = span["color"]  # 文字色
                text_type = span.get("text_type", "text")  # 文字種別

                # 指定されたヘッダー・フッター・マージンの範囲外を除外
                if bbox[3] is not None and header_y is not None and bbox[3] < header_y:
                    continue
                if bbox[1] is not None and footer_y is not None and bbox[1] > footer_y:
                    continue
                if bbox[2] is not None and left_margin_x is not None and bbox[2] < left_margin_x:
                    continue
                if bbox[0] is not None and right_margin_x is not None and bbox[0] > right_margin_x:
                    continue

                # 行間の閾値をフォントサイズに基づいて設定
                line_threshold = size * line_spacing_factor

                # 前の行との行間を比較し、新しい段落とみなすか判定
                if (
                    (previous_bbox and abs(bbox[3] - previous_bbox[3]) > line_threshold) or
                    previous_text_type != text_type
                ):
                    text += "\n\n"  # 新しい段落を開始
                else:
                    text += ""  # 既存の段落に追加

                # 不要なスペースの削除
                content = re.sub(r" \n", "\n", content)  # 行末の半角スペースを削除
                content = re.sub(r"^ ", "", content)  # 行頭のスペースを削除

                text += content  # 整形済みテキストを追加

                # 前の状態を更新
                previous_font = font
                previous_size = size
                previous_color = color
                previous_bbox = bbox
                previous_text_type = text_type

    # 余分な改行を調整
    text = re.sub(r"\n{3,}", "\n\n", text).strip()

    # XML/HTML特殊文字のエスケープ処理
    escaped_text = html.escape(text)

    return escaped_text  # 整形されたテキストを返す


In [4]:
# 1. 目次データをCSVから読み込む
def load_toc_mapping(csv_path):
    df = pd.read_csv(csv_path)  # "page" 列に開始ページ, "keyword" 列にタグ
    return df.sort_values(by="page").reset_index(drop=True)

# 2. PDFからページごとのテキストを抽出
def extract_text_from_pdf(pdf_path):
    doc = fitz.open(pdf_path)
    extracted_text = {}

    for page_num in range(len(doc)):
        text = doc[page_num].get_text("text")
        text = clean_text(text)  # テキストのクリーンアップ（エンコード, 制御文字削除）
        extracted_text[page_num + 1] = text

    return extracted_text

# 3. 特殊文字をエスケープ & 制御文字を削除
def clean_text(text):
    # 制御文字（不可視文字）を削除
    text = re.sub(r'[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F]', '', text)

    # XMLで問題になる特殊文字をエスケープ (&, <, >)
    text = saxutils.escape(text)

    return text

# 4. 目次キーワードをページごとに適用
def assign_tags_from_csv(extracted_text, toc_mapping):
    """
    抽出されたPDFのテキストデータに対して、CSVで指定された目次キーワードを適用し、
    各ページに対応するタグを付与する。
    
    - 同じページに複数のキーワードが存在する場合、元の `toc_mapping` の順序を維持して格納する。
    - 目次データのページ範囲を考慮し、次の見出しが始まる前まで適用。
    - `toc_mapping` に記載のないページには、直前のページのタグを適用する。
    
    Parameters:
        extracted_text (dict): {ページ番号: テキスト} の辞書
        toc_mapping (DataFrame): 目次データ（'page'列と'keyword'列を含む）
    
    Returns:
        dict: {ページ番号: {"text": テキスト, "tag": [キーワード]}} の辞書
    """
    tagged_data = {}  # タグ付けされたデータを格納する辞書
    pages = sorted(extracted_text.keys())  # ページ番号をソート

    # `page` 列を整数型に変換
    toc_mapping["page"] = toc_mapping["page"].astype(int)
    
    # `toc_mapping` の順序を維持したまま、ページごとのキーワードリストを作成
    sorted_toc = toc_mapping.sort_values(by=["page"], ascending=True)
    sorted_pages = sorted_toc["page"].tolist()  # 目次のページリスト
    prev_tags = []  # 直前のタグを保持するリスト
    
    for page_num in pages:  # すべてのページに対して処理
        if page_num in sorted_pages:
            # `toc_mapping` に記載があるページはそのキーワードを取得
            prev_tags = sorted_toc.loc[sorted_toc["page"] == page_num, "keyword"].dropna().tolist()
        
        if not prev_tags:
            prev_tags = ["unknown"]  # どのタグも適用されていない場合、'unknown' を設定
        
        tagged_data[page_num] = {"text": extracted_text[page_num], "tag": prev_tags.copy()}  # タグをリストとして格納
        #logging.info(f"Page {page_num}: Assigned tags {prev_tags}")
    
    return tagged_data  # タグ付けされたデータを返す

    
# 5. XMLを生成（タグの順序変更 + 可読性向上）
def create_pretty_xml(tagged_data, output_file):
    """
    タグ付けされたデータをXML形式で出力し、適切に整形する。
    
    Parameters:
        tagged_data (dict): {ページ番号: {"text": テキスト, "tag": [キーワード]}} の辞書
        output_file (str): 出力するXMLファイルのパス
    """
    root = ET.Element("document")

    for page_num, data in tagged_data.items():
        page_elem = ET.SubElement(root, "page", number=str(page_num))

        tag_elem = ET.SubElement(page_elem, "tag")
        tag_elem.text = ", ".join(data["tag"]) if isinstance(data["tag"], list) else "Unknown"  # タグをカンマ区切りで結合

        content_elem = ET.SubElement(page_elem, "content")
        content_elem.text = f"<![CDATA[{data['text']}]]>" if isinstance(data["text"], str) else ""

    rough_string = ET.tostring(root, encoding="utf-8")

    try:
        parsed_xml = minidom.parseString(rough_string)
        pretty_xml = parsed_xml.toprettyxml(indent="  ")
    except Exception as e:
        logging.error(f"XML Parsing Error: {e}")
        pretty_xml = rough_string.decode("utf-8")

    with open(output_file, "w", encoding="utf-8") as f:
        f.write("\n".join(line for line in pretty_xml.split("\n") if line.strip()))


In [5]:
def main():

    #実行モードを設定
    #複数企業の情報をcsvから読み込んで一括処理する場合、特定の企業のみ処理する場合を選択
    #0:ファイルから読込、1:直接指定
    Process_Mode=1 

    if Process_Mode == 0:
        # CSVファイルのパスを指定
        #csv_path = 'data/Attrib_Analysis_Target_20250114_test.csv'  # CSVのパスを指定
        #csv_path = 'data/Attrib_Analysis_Target_20250219_Denka.csv'  # CSVのパスを指定
        csv_path = 'data/Attrib_Analysis_Target_20250228_temp.csv'  # CSVのパスを指定

        # 統合報告書がリストされたCSVを読み込む
        df_target = load_csv(csv_path)
        if df_target is None:
            print("Failed to load CSV. Exiting program.")
            logging.error("Failed to load CSV. Exiting program.")
            return

    elif Process_Mode ==1:
        #=========================================================================================
        #-----------------------------------------------------------------------------------------
        #単独処理：格納フォルダ、統合報告書PDFのファイル名、行間設定を指定
        temp_list = ['79360_アシックス_2022', 'アシックス統合報告書2022.pdf', 2]
        df_target = pd.DataFrame([temp_list],columns=['格納フォルダ','統合報告書PDF','行間設定'])
        #-----------------------------------------------------------------------------------------
        #=========================================================================================

    else:
        print('Process_Modeは "0" または "1" を指定してください。')
        return
    
    # 必要な列名を指定
    folder_col = '格納フォルダ'
    file_col = '統合報告書PDF'
    #target_col = 'target_page'
    line_spacing_factor_col = '行間設定'

    # CSVに必要な列がなければエラー表示
    if folder_col not in df_target.columns or file_col not in df_target.columns not in df_target.columns:
        #print(f"Error: CSV file must contain '{folder_col}' and '{file_col}' columns")
        logging.error(f"Error: CSV file must contain '{folder_col}' and '{file_col}' columns")
        return

    # リストの企業ごとに、pdfのテキストデータを取得・保存
    #file_paths = []
    for index, row in df_target.iterrows():

        #================================================================
        #前処理（フォルダの確認・パス名の定義　等）
        folder_name = 'ir/2025/' + row[folder_col]
        file_name = row[file_col]
        line_spacing_factor = row[line_spacing_factor_col]

        info_file_path = folder_name + '/' + 'info'
        pdf_path = folder_name + "/" + file_name
        margin_path = info_file_path  + "/" + "margin.csv"
        multicolumn_path = info_file_path  + "/" + "multicolumn.txt"
        # 目次見出し出力ファイルパス
        contents_csv_path = info_file_path  + "/" + "contents.csv"
        # 出力ファイルパス
        output_file_path = pdf_path[:len(pdf_path)-4] + ".xml"
        raw_file_path = pdf_path[:len(pdf_path)-4] + ".txt"

        
        # パスが有効かチェック
        if pd.isna(folder_name) or pd.isna(file_name):
            print(f"Skipping row {index}: Folder or File is missing")
            logging.warning(f"Skipping row {index}: Folder or File is missing")
            file_path =''
            return
        #属性情報を格納するフォルダ（info）の有無を確認
        if not os.path.exists(info_file_path):
            logging.warning(f"Skipping row {index}: No info Folder")
            return

        print(f"Processing {folder_name}/{file_name}")
        logging.info(f"Processing {folder_name}/{file_name}")

        # フィルタリング範囲の指定（適宜調整）
        # 初期化 (ファイルが存在しない場合もNoneになるようにする)
        header_y = None
        footer_y = None
        left_margin_x = None
        right_margin_x = None

        try:
            # margin.csvの読み込み
            df_margin = pd.read_csv(margin_path)
            
            # 変数への値の格納 (ブランクの場合はNoneに)
            header_y = df_margin.iloc[0]['header_y'] if 'header_y' in df_margin.columns and pd.notnull(df_margin.iloc[0]['header_y']) else None
            footer_y = df_margin.iloc[0]['footer_y'] if 'footer_y' in df_margin.columns and pd.notnull(df_margin.iloc[0]['footer_y']) else None
            left_margin_x = df_margin.iloc[0]['left_margin_x'] if 'left_margin_x' in df_margin.columns and pd.notnull(df_margin.iloc[0]['left_margin_x']) else None
            right_margin_x = df_margin.iloc[0]['right_margin_x'] if 'right_margin_x' in df_margin.columns and pd.notnull(df_margin.iloc[0]['right_margin_x']) else None
        
        except FileNotFoundError:
            # ファイルが存在しない場合、全ての変数をNoneのままにする
            logging.warning(f"File 'margin.csv' is missing, default values were applied.")
            pass
    
        # 結果を確認
        #print(f"header_y: {header_y}")
        #print(f"footer_y: {footer_y}")
        #print(f"left_margin_x: {left_margin_x}")
        #print(f"right_margin_x: {right_margin_x}")
        
        #段組み情報を設定
        # 段組みの閾値を x0 で設定
        column_thresholds = []  
        try:
            # ファイルを開いて読み込む
            with open(multicolumn_path, 'r', encoding='utf-8-sig') as mc_file:
                for line in mc_file:
                    # 各行をカンマで分割し、数値に変換してリストに格納
                    column_thresholds.extend([float(value) for value in line.strip().split(',') if value])
        except FileNotFoundError:
            logging.warning(f"File 'multicolumn.txt' is missing.")
            pass

        except ValueError:
            logging.warning(f"File 'multicolumn.txt' includes non-value data.")
            pass
        
        # 読み込んだ段組み閾値情報を確認
        #print(column_thresholds)
    
        # PDFファイルのオープン
        doc = fitz.open(pdf_path)
        
        # PDFからテキストを抽出し、ヘッダ等を除去
        cleaned_text = {}
        tmp_text = ""
        cleaned_raw_text =""
        
        for page_number, page in enumerate(doc, start=0):

            tmp_text= clean_text_by_attributes(page, header_y, footer_y, left_margin_x, right_margin_x, column_thresholds, line_spacing_factor).encode("utf-8", errors="ignore").decode("utf-8") + "\n"
            
            #====================================
            #不要な特定文字列の除去
            #XML作成時にエラー（ExpatError: not well-formed (invalid token)）が起こる時は、raw_textから該当文字列を探して除去
            cleaned_text[page_number +1] = re.sub(r"[ʝ]+", "", tmp_text)
            #cleaned_text[page_number +1] = re.sub(r"[ʝػମ࣌ࣄۀʢ݄ൃද࣌]+", "", tmp_text)

            #====================================
            
            cleaned_raw_text += cleaned_text[page_number +1]
        
        #ファイル出力
        #テキストファイル出力：エスケープ文字を元に戻してから出力
        cleaned_raw_text = html.unescape(cleaned_raw_text)
        with open(raw_file_path, "w", encoding="utf-8") as f:
            f.write(cleaned_raw_text)
        #XMLファイルを生成：エラーが出たら都度検証・対応
        toc_mapping = load_toc_mapping(contents_csv_path)  # 目次情報を読み込み
        tagged_data = assign_tags_from_csv(cleaned_text, toc_mapping)  # タグを適用
        create_pretty_xml(tagged_data, output_file_path)  # XMLを生成（可読性向上）

if __name__ == "__main__":
    main()
    print("処理が終了しました。")

Processing ir/2025/79360_アシックス_2022/アシックス統合報告書2022.pdf
処理が終了しました。
