In [1]:
import os
import fitz  # PyMuPDF for PDFs
import pypandoc
import docx
import pytesseract
from pdf2image import convert_from_path
from PIL import Image
from bs4 import BeautifulSoup
import logging
from tqdm import tqdm

# Tell pytesseract which Tesseract executable to invoke.
pytesseract.pytesseract.tesseract_cmd = "/usr/bin/tesseract"

# Send every log record (INFO and above) to a log file; no console handler
# is installed, so nothing is echoed to the terminal/notebook output.
log_file_path = "conversion_log.txt"
logging.basicConfig(
    handlers=[logging.FileHandler(log_file_path)],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)


def convert_to_text(file_path):
    """
    Convert a file to plain text.

    The converter is selected from the file extension (case-insensitive):
    ``.txt`` is read as UTF-8, ``.pdf`` goes through PyMuPDF, ``.docx``
    through python-docx, ``.doc``/``.rtf`` through Pandoc, and ``.html``
    through BeautifulSoup.

    Parameters:
        file_path (str): The path to the input file.

    Returns:
        str: The plain text extracted from the file.

    Raises:
        RuntimeError: If the extension is unsupported or the underlying
            converter fails. The original exception is chained as
            ``__cause__``.
    """
    _, file_extension = os.path.splitext(file_path)
    file_extension = file_extension.lower()

    try:
        if file_extension == '.txt':
            logging.info(f"Reading plain text file: {file_path}")
            with open(file_path, 'r', encoding='utf-8') as file:
                return file.read()

        elif file_extension == '.pdf':
            logging.info(f"Extracting text from PDF: {file_path}")
            # The context manager closes the document even when a page
            # fails to extract, so the file handle is never leaked.
            with fitz.open(file_path) as doc:
                return "".join(page.get_text("text") for page in doc)

        elif file_extension == '.docx':
            logging.info(f"Extracting text from DOCX: {file_path}")
            doc = docx.Document(file_path)
            return "\n".join(paragraph.text for paragraph in doc.paragraphs)

        # '.docx' is handled by the branch above, so it is not listed here.
        # NOTE(review): Pandoc may not ship a reader for legacy binary .doc
        # files; confirm that this branch actually succeeds for them.
        elif file_extension in ('.doc', '.rtf'):
            logging.info(f"Extracting text from {file_extension.upper()} using Pandoc: {file_path}")
            return pypandoc.convert_file(file_path, 'plain')

        elif file_extension == '.html':
            logging.info(f"Extracting text from HTML: {file_path}")
            with open(file_path, 'r', encoding='utf-8') as file:
                soup = BeautifulSoup(file, 'html.parser')
                return soup.get_text()

        else:
            raise ValueError(f"Unsupported file format: {file_extension} in {file_path}")

    except Exception as e:
        # Normalise every failure to RuntimeError (what callers catch) while
        # keeping the original exception attached for debugging.
        raise RuntimeError(f"Failed to convert {file_path} to text. Error: {e}") from e


def extract_from_scanned(file_path):
    """
    Run OCR over every page of a PDF and return the combined text.

    Parameters:
        file_path (str): The path to the PDF file.

    Returns:
        str: The OCR output of all pages, each preceded by a
            ``--- Page N ---`` header line (N starts at 1).
    """
    logging.info(f"Performing OCR on scanned PDF: {file_path}")
    rendered_pages = convert_from_path(file_path)
    # One "header + recognised text" section per page, in page order.
    sections = [
        f"--- Page {number} ---\n{pytesseract.image_to_string(page_image)}\n"
        for number, page_image in enumerate(rendered_pages, start=1)
    ]
    return "".join(sections)


def to_text(file_path):
    """
    Extract text from a file, falling back to OCR for PDFs.

    Regular extraction is attempted first via ``convert_to_text``. If that
    fails or yields no usable text and the file is a PDF, OCR is attempted
    via ``extract_from_scanned``. All failures are logged, never raised.

    Parameters:
        file_path (str): The path to the input file.

    Returns:
        str: The extracted text, or an empty string when nothing could be
            extracted.
    """
    try:
        text = convert_to_text(file_path)

        # Treat whitespace-only output as empty: image-only PDFs can yield
        # nothing but blank lines, and those should go through OCR too.
        if not text.strip():
            raise RuntimeError("No content extracted from file.")
    except RuntimeError as e:
        if file_path.lower().endswith(".pdf"):
            logging.warning(f"Text extraction failed for {file_path}, attempting OCR. Error: {e}")
            try:
                return extract_from_scanned(file_path)
            except Exception as ocr_error:
                logging.error(f"OCR failed for {file_path}. Error: {ocr_error}")
                return ""
        else:
            logging.error(f"Unsupported file or error in conversion: {file_path}. Skipping. Error: {e}")
            return ""
    else:
        # Success path: hand the extracted text back to the caller. Without
        # this return the function would yield None for every file that
        # converted cleanly.
        return text


def convert_directory_to_text(dir_path, mirror_dir):
    """
    Walk a directory tree, convert each file to text, and write the results
    into a directory tree that mirrors the source layout.

    Each output file keeps the source file's base name with a ``.txt``
    extension. Files that yield no text are logged and skipped; errors on
    individual files are logged and do not stop the run.

    Parameters:
        dir_path (str): The path to the input directory.
        mirror_dir (str): The path to the mirror output directory where text files will be saved.
    """
    logging.info(f"Starting conversion. Source directory: {dir_path}, Target mirror directory: {mirror_dir}")

    # Gather every file path up front so the progress bar knows the total.
    pending = [
        os.path.join(folder, name)
        for folder, _, names in os.walk(dir_path)
        for name in names
    ]

    for file_path in tqdm(pending, desc="Converting files", unit="file"):
        source_dir, source_name = os.path.split(file_path)

        # Recreate the file's sub-directory underneath the mirror root.
        target_dir = os.path.join(mirror_dir, os.path.relpath(source_dir, dir_path))
        os.makedirs(target_dir, exist_ok=True)

        # Same base name as the source, with a .txt extension.
        stem = os.path.splitext(source_name)[0]
        target_file_path = os.path.join(target_dir, f"{stem}.txt")

        try:
            text_content = to_text(file_path)
            if not text_content:
                logging.warning(f"No content extracted from {file_path}. Check format and content.")
                continue

            with open(target_file_path, 'w', encoding='utf-8') as text_file:
                text_file.write(text_content)
            logging.info(f"Successfully converted and saved: {file_path} -> {target_file_path}")

        except Exception as e:
            logging.error(f"Error processing file {file_path}. Skipping. Error: {e}")


# Example usage: convert every file under "official-docs" and write the
# resulting .txt files into a mirrored tree under "ez-cache".
convert_directory_to_text("official-docs", "ez-cache")  # replace both paths with your own directories


Converting files:   3%|▎         | 9/258 [01:19<36:29,  8.79s/file]


KeyboardInterrupt: 