<a href="https://colab.research.google.com/github/iim0663418/LLMDataOptimization/blob/main/Apache_Tika_%E6%96%87%E6%9C%AC%E6%8A%BD%E5%8F%96.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Apache Tika

Apache Tika 是一個開源的內容分析工具，它可以從多種文檔格式中提取文本和元數據。雖然它主要用於內容分析，但也可以用來進行文件格式轉換。

In [1]:
!pip install tika

Collecting tika
  Downloading tika-2.6.0.tar.gz (27 kB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: tika
  Building wheel for tika (setup.py) ... done
  Created wheel for tika: filename=tika-2.6.0-py3-none-any.whl size=32623 sha256=664c158cbb4119ca3e78c4c9bb0cf767b776cf9fd93a261def80e9ab0f45674a
  Stored in directory: /root/.cache/pip/wheels/5f/71/c7/b757709531121b1700cffda5b6b0d4aad095fb507ec84316d0
Successfully built tika
Installing collected packages: tika
Successfully installed tika-2.6.0


In [None]:
import os

def create_directories(input_dir="/content/input", output_dir="/content/output"):
    """Create the input and output working directories.

    Args:
        input_dir: Path of the input directory to create.
        output_dir: Path of the output directory to create.

    Missing parent directories are created as well. A directory that
    already exists is left as it is (``exist_ok=True``); the confirmation
    line is printed for each path in either case.
    """
    for directory in (input_dir, output_dir):
        os.makedirs(directory, exist_ok=True)
        print(f"Directory created: {directory}")

# Run the function to create the directories
create_directories()


In [5]:
import os
from multiprocessing import Pool, TimeoutError
from tika import parser

# Error logging
def log_error(file_path, error):
    """Append one line describing a failure to ``error_log.txt``.

    The log file is opened in append mode, UTF-8 encoded, relative to the
    current working directory.

    Args:
        file_path: Identifier of the item that failed (formatted into the line).
        error: The exception or message to record (formatted into the line).
    """
    entry = f"Error processing {file_path}: {error}\n"
    with open("error_log.txt", mode="a", encoding="utf-8") as sink:
        sink.write(entry)

import re

def remove_control_characters(text):
    """Strip non-printable ASCII control characters from *text*.

    Removes U+0000-U+001F and U+007F, except tab, line feed and carriage
    return. Those three are kept because they separate words and lines;
    deleting them would fuse the last word of one line onto the first word
    of the next.

    Args:
        text: The string to clean.

    Returns:
        The string without the removed control characters.
    """
    # \x09 (tab), \x0A (LF) and \x0D (CR) are deliberately left out of the class.
    return re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', text)

def remove_duplicate_punctuation(text):
    """Collapse a run of one repeated punctuation mark into a single mark.

    Only ``.``, ``?``, ``!`` and ``,`` are handled, and only when the same
    character is repeated back to back (``"!!"`` becomes ``"!"``, while
    ``"?!"`` is left alone).
    """
    repeated_mark = re.compile(r'([.?!,])\1+')
    # Keep the first character of each run and drop the repeats.
    return repeated_mark.sub(lambda match: match.group(1), text)

def remove_unwanted_tags(text):
    """Delete page markers and bracketed numbered tags from *text*.

    Two patterns are removed, in this order:
      * page markers of the form "Page X of Y";
      * bracketed tags made of word characters ending in digits, such as
        "[note1]".
    """
    unwanted_patterns = (
        r'Page \d+ of \d+',  # page markers
        r'\[\w+\d+\]',       # bracketed tags ending in a number
    )
    for pattern in unwanted_patterns:
        text = re.sub(pattern, '', text)
    return text

def rejoin_broken_paragraphs(text):
    """Replace isolated newlines with a space, keeping blank-line breaks.

    Assumes paragraphs are separated by two newlines and that a single
    newline is only a line wrap inside a paragraph. A newline is replaced
    only when it is neither preceded nor followed by another newline.
    """
    lone_newline = re.compile(r'(?<!\n)\n(?!\n)')
    return lone_newline.sub(' ', text)

def normalize_whitespace(text):
    """Collapse whitespace runs and remove the space before punctuation.

    Every run of whitespace (including newlines and tabs, since the pattern
    is ``\\s+``) becomes a single space. After that, one whitespace
    character directly before ``? . ! , ; :`` is dropped.
    """
    collapsed = re.sub(r'\s+', ' ', text)
    return re.sub(r'\s([?.!,;:])', r'\1', collapsed)

def fix_encoding_issues(text):
    """Replace common mojibake sequences with plain ASCII punctuation.

    The sequences handled are what UTF-8 "smart" punctuation looks like
    after being decoded as Windows-1252: all of them start with
    U+00E2 U+20AC (rendered "â€").

    Args:
        text: The string to repair.

    Returns:
        The string with the known sequences replaced.
    """
    # Order matters: the three-character sequences must be tried before the
    # bare two-character prefix, which is kept last as a catch-all.
    replacements = (
        ("\u00e2\u20ac\u2122", "'"),    # right single quote / apostrophe
        ("\u00e2\u20ac\u02dc", "'"),    # left single quote
        ("\u00e2\u20ac\u0153", '"'),    # left double quote
        ("\u00e2\u20ac\u009d", '"'),    # right double quote (third byte has no cp1252 glyph)
        ("\u00e2\u20ac\u201c", "-"),    # en dash
        ("\u00e2\u20ac\u201d", "-"),    # em dash
        ("\u00e2\u20ac\u00a6", "..."),  # horizontal ellipsis
        ("\u00e2\u20ac", '"'),          # catch-all for a truncated sequence
    )
    for garbled, replacement in replacements:
        text = text.replace(garbled, replacement)
    return text

def remove_urls_and_emails(text):
    """Remove http(s) URLs and e-mail-like tokens from *text*.

    Args:
        text: The string to clean.

    Returns:
        The string with the matches deleted. Whitespace around a removed
        token is left in place.
    """
    # Require the "://" so ordinary words that merely start with "http"
    # (for example "httpd") are not deleted.
    text = re.sub(r'https?://\S+', '', text)
    # Any whitespace-delimited token containing "@" is treated as an address.
    text = re.sub(r'\S+@\S+', '', text)
    return text

def handle_bullet_points(text):
    """Remove whitespace between a newline and a following bullet marker.

    A bullet marker is ``*`` or ``-``. Because the pattern uses ``\\s*``,
    blank lines directly before a bullet are consumed as well.
    """
    indented_bullet = re.compile(r'\n\s*([*-])')
    return indented_bullet.sub(r'\n\1', text)

def comprehensive_text_cleaning(text):
    """Run every cleaning step over *text* and return the cleaned string.

    The steps are applied in a fixed order, each receiving the output of
    the previous one.

    Args:
        text: The extracted text, or ``None``/empty when nothing was extracted.

    Returns:
        The cleaned text; ``""`` when *text* is ``None`` or empty.
    """
    # The steps are regex/str based and would raise TypeError on None.
    if not text:
        return ""

    pipeline = (
        remove_control_characters,
        remove_duplicate_punctuation,
        normalize_whitespace,
        remove_unwanted_tags,
        rejoin_broken_paragraphs,
        handle_bullet_points,
        fix_encoding_issues,
        remove_urls_and_emails,
    )
    for step in pipeline:
        text = step(text)
    return text

# Extract the text of one file and save it
def extract_text_and_save(args):
    """Extract text from one file with Tika, clean it, and write a .txt file.

    Args:
        args: A ``(input_path, output_folder)`` pair. The output file is
            named after the input file's base name with a ``.txt`` extension.

    Any exception raised during extraction, cleaning or writing is caught
    here, printed, and appended to the error log; it is not re-raised.
    """
    input_path, output_folder = args
    try:
        file_base = os.path.splitext(os.path.basename(input_path))[0]
        # NOTE(review): inputs that share a base name (a.pdf, a.docx) map to
        # the same output file and overwrite each other - confirm acceptable.
        output_path = os.path.join(output_folder, f"{file_base}.txt")

        # Parse the file with Tika. The result may carry no text (missing
        # "content" key or a None value); treat that as empty output
        # instead of failing inside the regex-based cleaning.
        parsed = parser.from_file(input_path) or {}
        text_content = parsed.get("content")

        # Full text clean-up
        cleaned_text = comprehensive_text_cleaning(text_content) if text_content else ""

        # Save the cleaned text
        with open(output_path, "w", encoding="utf-8") as output_file:
            output_file.write(cleaned_text or "")
        print(f"Processed {input_path} -> {output_path}")

    except Exception as e:
        print(f"Error processing {input_path}: {e}")
        log_error(input_path, e)

# Process one batch of files
def process_batch(file_batch, output_folder, timeout):
    """Extract and save every file in *file_batch*, one after another.

    Args:
        file_batch: Iterable of input file paths.
        output_folder: Directory passed through to the extraction step.
        timeout: Accepted so existing callers keep working; this function
            does not enforce it.

    A failure on one file is printed and logged, then processing continues
    with the next file.
    """
    for file_path in file_batch:
        try:
            # Do not forward `timeout` positionally: the second parameter of
            # extract_text_and_save_with_retry is `retries`, so a 300 second
            # timeout would be read as 300 retry attempts.
            extract_text_and_save_with_retry((file_path, output_folder))
        except Exception as e:
            print(f"Error processing {file_path}: {e}")
            log_error(file_path, e)

# Split the file list into batches
def chunk_files(file_list, chunk_size):
    """Yield consecutive slices of *file_list*, each at most *chunk_size* long.

    The final slice holds whatever is left over and may be shorter.
    """
    starts = range(0, len(file_list), chunk_size)
    yield from (file_list[start:start + chunk_size] for start in starts)

# Improved batch-processing function
def batch_process_with_chunks(input_folder, output_folder, num_workers=4, timeout=300, chunk_size=100):
    """Process every regular file in *input_folder* using a worker pool.

    The files are split into batches of *chunk_size*; each batch is handed
    to one pool worker, which runs ``process_batch`` on it.

    Args:
        input_folder: Directory whose regular files are processed
            (subdirectories are skipped, not recursed into).
        output_folder: Directory for the results; created if missing.
        num_workers: Number of worker processes in the pool.
        timeout: Passed through to ``process_batch``.
        chunk_size: Maximum number of files per batch.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_folder, exist_ok=True)

    file_list = [os.path.join(input_folder, filename)
                 for filename in os.listdir(input_folder)
                 if os.path.isfile(os.path.join(input_folder, filename))]

    file_chunks = list(chunk_files(file_list, chunk_size))

    with Pool(processes=num_workers) as pool:
        # Keep the AsyncResult objects so a batch that failed as a whole
        # can be reported instead of disappearing silently.
        pending = [
            (file_chunk,
             pool.apply_async(process_batch, args=(file_chunk, output_folder, timeout)))
            for file_chunk in file_chunks
        ]

        pool.close()
        pool.join()

        for file_chunk, result in pending:
            try:
                # The pool has been joined, so never block here: timeout=0
                # raises immediately if a result is somehow not available.
                result.get(timeout=0)
            except Exception as e:
                batch_label = f"batch starting at {file_chunk[0]}"
                print(f"Error processing {batch_label}: {e}")
                log_error(batch_label, e)

# Extract-and-save with retries
def extract_text_and_save_with_retry(args, retries=3):
    """Call ``extract_text_and_save`` until it succeeds or attempts run out.

    Args:
        args: A ``(input_path, output_folder)`` pair, forwarded unchanged.
        retries: Maximum number of attempts.

    The function returns after the first attempt that does not raise.
    Every failed attempt is printed; the failure of the final attempt is
    also written to the error log.
    """
    input_path, _ = args
    for attempt_number in range(1, retries + 1):
        try:
            extract_text_and_save(args)
        except Exception as e:
            print(f"Attempt {attempt_number} failed for {input_path}: {e}")
            if attempt_number == retries:
                log_error(input_path, e)
        else:
            return

# Batch-process a folder
def batch_process_with_timeout(input_folder, output_folder, num_workers=4, timeout=300):
    """Process every regular file in *input_folder* in parallel, with a wait limit.

    Args:
        input_folder: Directory whose regular files are processed
            (subdirectories are skipped, not recursed into).
        output_folder: Directory for the results; created if missing.
        num_workers: Number of worker processes in the pool.
        timeout: Maximum number of seconds to wait for each file's result
            once that result is being collected.

    A timeout or any other failure is printed and logged, and collection
    moves on to the next file. A timed-out task is not cancelled; it is
    stopped when the pool is torn down on leaving the ``with`` block.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_folder, exist_ok=True)

    file_list = [(os.path.join(input_folder, filename), output_folder)
                 for filename in os.listdir(input_folder)
                 if os.path.isfile(os.path.join(input_folder, filename))]

    with Pool(processes=num_workers) as pool:
        # Submit everything first so the workers actually run in parallel.
        # Calling .get() right after each apply_async would wait for one
        # file to finish before the next is even queued.
        pending = [
            (file_args, pool.apply_async(extract_text_and_save_with_retry, args=(file_args,)))
            for file_args in file_list
        ]

        for file_args, result in pending:
            try:
                result.get(timeout=timeout)
            except TimeoutError:
                print(f"Timeout processing {file_args[0]}")
                log_error(file_args[0], "TimeoutError")
            except Exception as e:
                print(f"Error processing {file_args[0]}: {e}")
                log_error(file_args[0], e)

# Example usage
input_folder = "/content/input"
output_folder = "/content/output"
num_workers = 4  # number of worker processes to use
timeout = 300  # intended per-file processing timeout, in seconds
chunk_size = 100  # number of files per batch

# Start the pool only when this code runs as the main program (true for a
# notebook cell or a script). Under the "spawn" start method the module is
# re-imported in each worker, and an unguarded call would start the whole
# batch again in every child process.
if __name__ == "__main__":
    batch_process_with_chunks(input_folder, output_folder, num_workers, timeout, chunk_size)


Processed /content/input/附件1-113年第1次資料開放工作小組會議紀錄 (1).odt -> /content/output/附件1-113年第1次資料開放工作小組會議紀錄 (1).txt
Processed /content/input/113年第2次資料開放工作小組會議紀錄.odt -> /content/output/113年第2次資料開放工作小組會議紀錄.txt
Processed /content/input/附件1-1111006業務會議紀錄.odt -> /content/output/附件1-1111006業務會議紀錄.txt


In [6]:
import shutil
import os

def zip_output_directory(output_dir="/content/output", zip_file_path="/content/output_archive.zip"):
    """Pack the contents of *output_dir* into a ZIP archive.

    Args:
        output_dir: Directory whose contents are archived.
        zip_file_path: Destination of the archive. A trailing ``.zip`` is
            optional; ``shutil.make_archive`` adds the extension itself.

    Prints the resulting archive path on success, or a notice when
    *output_dir* does not exist (in which case nothing is created).
    """
    # Make sure the output directory exists
    if not os.path.exists(output_dir):
        print(f"Directory {output_dir} does not exist")
        return

    # make_archive expects the name without the extension. Strip only a
    # trailing ".zip"; a ".zip" elsewhere in the path (for example in a
    # directory name) must be left alone.
    base_name, extension = os.path.splitext(zip_file_path)
    if extension != ".zip":
        base_name = zip_file_path

    archive_path = shutil.make_archive(base_name, 'zip', output_dir)
    print(f"Directory {output_dir} has been zipped into {archive_path}")

# Run the packaging step
zip_output_directory()


Directory /content/output has been zipped into /content/output_archive.zip
