In [2]:
import os
import pandas as pd
import yaml

def get_document_files(config_path, patients_parquet, output_path):
    # 設定ファイルを読み込む
    with open(config_path, 'r') as file:
        config = yaml.safe_load(file)
    
    # 患者フォルダ一覧をロード
    patients_df = pd.read_parquet(patients_parquet)
    
    # ベースフォルダの設定（../mydata/00_working）
    base_folder = os.path.normpath('../mydata/00_working')
    
    # 収集する拡張子
    target_extensions = {'.ppt', '.doc', '.xls', '.pptx', '.docx', '.xlsx'}
    
    # ファイル情報を収集
    document_files = []
    for _, row in patients_df.iterrows():
        # 患者フォルダのパスを取得
        dir_path = os.path.join(base_folder, row['path'])
        dirname = os.path.basename(dir_path)
        
        if not os.path.isdir(dir_path):
            print(f"Warning: '{dir_path}' is not a valid directory.")
            continue
        
        # フォルダ内のファイルを探索
        for root, _, files in os.walk(dir_path):
            for file in files:
                ext = os.path.splitext(file)[1].lower()
                if ext in target_extensions:  # 指定された拡張子の場合
                    # 相対パスを計算
                    file_rel_path = os.path.relpath(os.path.join(root, file), base_folder)
                    document_files.append({
                        "dirname": dirname,
                        "dir_path": row['path'],
                        "filename": file,
                        "ext": ext,
                        "file_path": file_rel_path
                    })
    
    # データフレームに変換
    df_documents = pd.DataFrame(document_files)
    df_documents.index.name = 'index'  # インデックス列
    df_documents = df_documents.reset_index()
    
    # データフレームをParquet形式で保存
    df_documents.to_parquet(output_path, index=False)
    print(f"Document files saved to {output_path}")

# 設定ファイルのパス
config_path = './config2.yaml'

# 患者フォルダ一覧のParquetファイル
patients_parquet = '../01_Process_Exceptions/patients_dirs.parquet'

# 出力先のParquetファイル
output_path = './document_files.parquet'

# 実行
get_document_files(config_path, patients_parquet, output_path)


Document files saved to ./document_files.parquet


In [None]:
# データフレームの全列の文字列表示幅を設定
pd.set_option('display.max_colwidth', None)

df_docs = pd.read_parquet('./document_files.parquet')
print(len(df_docs))
df_docs[:10]

In [None]:
# 古いドキュメントファイルを新しいドキュメントファイルに変換

import subprocess

# LibreOfficeのパス（適宜変更してください）
LIBREOFFICE_PATH = '/Applications/LibreOffice.app/Contents/MacOS/soffice'

def get_unique_file_path(base_path):
    """
    ファイルパスが重複する場合に連番を付けて一意なパスを返す
    """
    if not os.path.exists(base_path):
        return base_path
    base, ext = os.path.splitext(base_path)
    counter = 1
    while os.path.exists(f"{base}_{counter}{ext}"):
        counter += 1
    return f"{base}_{counter}{ext}"

def convert_to_new_format(old_path, new_ext, convert_command):
    """
    LibreOfficeを使用してファイルを新しい形式に変換
    """
    # 新しいファイルパスを作成（同階層に保存）
    new_path = os.path.splitext(old_path)[0] + new_ext
    new_path = get_unique_file_path(new_path)  # ファイル名の重複を防ぐ

    # LibreOfficeを使用して変換
    command = [LIBREOFFICE_PATH, '--headless', '--convert-to', new_ext[1:], '--outdir', os.path.dirname(old_path), old_path]
    result = subprocess.run(command, capture_output=True, text=True)

    # エラー時の処理
    if result.returncode != 0:
        print(f"Error converting {old_path}: {result.stderr}")
        return None

    # 新しいファイルが正しく作成されたか確認
    if not os.path.exists(new_path) or os.path.getsize(new_path) == 0:
        print(f"Conversion failed or resulted in an empty file for {old_path}")
        return None

    return new_path

def process_documents(parquet_path, base_folder='../mydata/00_working'):
    """
    .doc, .ppt, .xls ファイルを新形式に変換し、失敗したファイルをリスト化
    """
    df_documents = pd.read_parquet(parquet_path)
    failed_files = []  # 変換に失敗したファイルのリスト

    # 対象拡張子と変換コマンドをマッピング
    converters = {
        '.ppt': '.pptx',
        '.doc': '.docx',
        '.xls': '.xlsx'
    }

    # 各ファイルを処理
    for _, row in df_documents.iterrows():
        ext = row['ext'].lower()
        if ext in converters:
            old_file_path = os.path.join(base_folder, row['file_path'])
            if not os.path.exists(old_file_path):
                print(f"File not found: {old_file_path}")
                failed_files.append(old_file_path)
                continue

            print(f"Converting {old_file_path}...")
            new_file_path = convert_to_new_format(
                old_path=old_file_path,
                new_ext=converters[ext],
                convert_command=LIBREOFFICE_PATH
            )

            if new_file_path:
                print(f"Converted to: {new_file_path}")
            else:
                print(f"Failed to convert: {old_file_path}")
                failed_files.append(old_file_path)

    # 開けなかったファイルを出力
    print("\nFiles that could not be converted:")
    for failed_file in failed_files:
        print(failed_file)

    return failed_files

# Parquetファイルのパス
parquet_path = './document_files.parquet'

# 実行
failed_files = process_documents(parquet_path)


In [None]:
# 新しいドキュメント形式にしたものだけを抽出する

def collect_files_by_extensions(config_path, patients_parquet, target_extensions, output_path):
    """
    指定した拡張子のファイルを収集し、Parquet形式で保存する関数

    Parameters:
        config_path (str): 設定ファイルのパス
        patients_parquet (str): 患者フォルダ一覧のParquetファイルパス
        target_extensions (set): 収集したい拡張子のセット
        output_path (str): 出力先のParquetファイルパス
    """
    # 設定ファイルを読み込む
    with open(config_path, 'r') as file:
        config = yaml.safe_load(file)
    
    # 患者フォルダ一覧をロード
    patients_df = pd.read_parquet(patients_parquet)
    
    # ベースフォルダの設定（../mydata/00_working）
    base_folder = os.path.normpath('../mydata/00_working')
    
    # ファイル情報を収集
    collected_files = []
    for _, row in patients_df.iterrows():
        # 患者フォルダのパスを取得
        dir_path = os.path.join(base_folder, row['path'])
        dirname = os.path.basename(dir_path)
        
        if not os.path.isdir(dir_path):
            print(f"Warning: '{dir_path}' is not a valid directory.")
            continue
        
        # フォルダ内のファイルを探索
        for root, _, files in os.walk(dir_path):
            for file in files:
                ext = os.path.splitext(file)[1].lower()
                if ext in target_extensions:  # 指定された拡張子の場合
                    # 相対パスを計算
                    file_rel_path = os.path.relpath(os.path.join(root, file), base_folder)
                    collected_files.append({
                        "dirname": dirname,
                        "dir_path": row['path'],
                        "filename": file,
                        "ext": ext,
                        "file_path": file_rel_path
                    })
    
    # データフレームに変換
    df_collected = pd.DataFrame(collected_files)
    df_collected.index.name = 'index'  # インデックス列
    df_collected = df_collected.reset_index()
    
    # データフレームをParquet形式で保存
    df_collected.to_parquet(output_path, index=False)
    print(f"Files saved to {output_path}")

# 設定ファイルのパス
config_path = './config2.yaml'

# 患者フォルダ一覧のParquetファイル
patients_parquet = '../01_Process_Exceptions/patients_dirs.parquet'

# 正確に収集したい拡張子のみを指定
target_extensions = {'.pptx', '.docx', '.xlsx'}

# 出力先のParquetファイル
output_path = './new_document_files.parquet'

# 実行
collect_files_by_extensions(config_path, patients_parquet, target_extensions, output_path)

df_new_docs = pd.read_parquet('./new_document_files.parquet')
print(len(df_new_docs))
df_new_docs[:10]

In [None]:
import zipfile
from pptx import Presentation
from docx import Document


def get_unique_folder_path(base_path):
    """
    フォルダパスが重複する場合に連番を付けて一意なパスを返す
    """
    if not os.path.exists(base_path):
        return base_path
    counter = 1
    while os.path.exists(f"{base_path}_{counter}"):
        counter += 1
    return f"{base_path}_{counter}"

def extract_images_from_pptx(pptx_path, extract_dir):
    """Extract images from .pptx files"""
    presentation = Presentation(pptx_path)
    supported_formats = ['bmp', 'gif', 'jpeg', 'jpg', 'png', 'tiff', 'wmf']
    image_count = 0

    for slide in presentation.slides:
        for shape in slide.shapes:
            if shape.shape_type == 13:  # shape_type 13は画像
                try:
                    image = shape.image
                    image_format = image.ext.lower()
                    if image_format in supported_formats:
                        image_bytes = image.blob
                        image_filename = f"image{image_count}.{image_format}"
                        image_path = os.path.join(extract_dir, image_filename)
                        with open(image_path, 'wb') as image_file:
                            image_file.write(image_bytes)
                        image_count += 1
                except ValueError:
                    continue

def extract_images_from_docx(docx_path, extract_dir):
    """Extract images from .docx files"""
    doc = Document(docx_path)
    image_count = 0
    for rel in doc.part.rels.values():
        if "image" in rel.target_ref:
            image = rel.target_part.blob
            image_format = rel.target_ref.split('.')[-1]
            image_filename = f"image{image_count}.{image_format}"
            image_path = os.path.join(extract_dir, image_filename)
            with open(image_path, 'wb') as image_file:
                image_file.write(image)
            image_count += 1

def extract_images_from_xlsx(xlsx_path, extract_dir):
    """Extract images from .xlsx files"""
    with zipfile.ZipFile(xlsx_path, 'r') as zip_ref:
        for file in zip_ref.namelist():
            if file.startswith('xl/media/'):
                extracted_file_path = zip_ref.extract(file, extract_dir)
                image_filename = os.path.basename(extracted_file_path)
                os.rename(
                    extracted_file_path,
                    os.path.join(extract_dir, image_filename)
                )

def process_documents_for_images(parquet_path, base_folder='../mydata/00_working', failed_files_path='./failed_files.txt'):
    """Process .pptx, .docx, .xlsx files to extract images"""
    df_documents = pd.read_parquet(parquet_path)
    failed_files = []  # 開けなかったドキュメントファイルを記録

    for _, row in df_documents.iterrows():
        file_path = os.path.join(base_folder, row['file_path'])
        ext = row['ext'].lower()
        if not os.path.exists(file_path):
            print(f"File not found: {file_path}")
            failed_files.append(file_path)
            continue

        # 抽出用のフォルダを同階層に作成
        extract_folder_name = f"extracted_{row['filename'].split('.')[0]}"
        extract_folder_path = os.path.join(os.path.dirname(file_path), extract_folder_name)
        extract_folder_path = get_unique_folder_path(extract_folder_path)  # 重複防止

        os.makedirs(extract_folder_path, exist_ok=True)

        # ファイル形式に応じて画像を抽出
        try:
            print(f"Processing {file_path}...")
            if ext == '.pptx':
                extract_images_from_pptx(file_path, extract_folder_path)
            elif ext == '.docx':
                extract_images_from_docx(file_path, extract_folder_path)
            elif ext == '.xlsx':
                extract_images_from_xlsx(file_path, extract_folder_path)
            else:
                print(f"Unsupported file type: {file_path}")
        except Exception as e:
            print(f"Failed to process {file_path}: {e}")
            failed_files.append(file_path)

        print(f"Images extracted to: {extract_folder_path}")

    # 開けなかったファイルを保存
    with open(failed_files_path, 'w') as f:
        for failed_file in failed_files:
            f.write(f"{failed_file}\n")

    print(f"Failed files list saved to {failed_files_path}")

# Parquetファイルのパス
parquet_path = './new_document_files.parquet'

# 実行
process_documents_for_images(parquet_path)