In [1]:
import os
import time
import json
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError

# --- Mock PDF Processing Functions (from previous example) ---
def _read_pdf_to_markdown(pdf_filepath):
    """
    Mocks a PDF-to-Markdown conversion.

    The file is never opened; only its base name is used. A base name
    containing "article_5" takes the slow path (log line, 1.2 s pause);
    every other file pauses for 0.2 s. Returns a canned Markdown string
    that embeds the base name.
    """
    filename = os.path.basename(pdf_filepath)
    is_slow_document = "article_5" in filename

    if not is_slow_document:
        # Ordinary file: short pause standing in for a normal conversion.
        time.sleep(0.2)
        return f"# Markdown Content for {filename}\n\nThis is the simulated markdown content."

    # Slow file: pause long enough to exceed a 1-second caller timeout.
    print(f"  [Markdown Converter] Simulating long Markdown conversion for {filename}...")
    time.sleep(1.2)
    return f"# Markdown Content for {filename}\n\nThis is a simulated long markdown conversion result."

def _read_pdf_plain_text(pdf_filepath):
    """
    Mocks plain-text extraction from a PDF (the fallback reader).

    The file is never opened; after a 0.1 s pause a canned sentence
    embedding the file's base name is returned.
    """
    _, filename = os.path.split(pdf_filepath)
    time.sleep(0.1)  # Stand-in for a quick read.
    return f"Plain Text Content for {filename}. This is the fallback text."

# --- New Mock XML Processing Function ---
def _read_xml_file(xml_filepath):
    """
    Mocks XML file processing.

    The file is never parsed; after a 0.1 s pause a fixed XML snippet
    containing the file's base name is returned. A real implementation
    would parse the document (e.g. with lxml or xml.etree.ElementTree)
    and extract the relevant data.
    """
    filename = os.path.basename(xml_filepath)
    time.sleep(0.1)  # Stand-in for XML parsing time.
    return (
        "<processed_xml>"
        f"<file>{filename}</file>"
        "<data>Extracted data from XML.</data>"
        "</processed_xml>"
    )

# --- File Specific Processing Task Functions ---

def _process_pdf_task(pdf_filepath, markdown_timeout_seconds):
    """
    Converts one PDF to Markdown under a time limit, falling back to plain text.

    The Markdown conversion (_read_pdf_to_markdown) runs in a dedicated
    single-worker thread pool so this function can stop waiting for it after
    `markdown_timeout_seconds`. If the conversion times out or raises, the
    plain-text reader (_read_pdf_plain_text) is used instead.

    Args:
        pdf_filepath: Path of the PDF to process.
        markdown_timeout_seconds: Maximum number of seconds to wait for the
            Markdown conversion (passed to Future.result(timeout=...)).

    Returns:
        A dict with the keys:
          "content":      Markdown text, plain text, or an error placeholder.
          "status":       "success", "timeout_fallback", "error_fallback",
                          or "critical_error" (plain-text fallback failed too).
          "message":      Human-readable description of what happened.
          "content_type": "markdown", "plain_text", or "error".

    Note:
        A timed-out conversion cannot be interrupted; its thread keeps running
        in the background until it finishes and its result is discarded.
    """
    base_name = os.path.basename(pdf_filepath)
    markdown_content = None
    status = "success"
    message = "Processed with Markdown"

    # The executor is deliberately NOT used as a context manager: leaving a
    # `with` block calls shutdown(wait=True), which blocks until the running
    # conversion finishes and would make the timeout below pointless.
    markdown_executor = ThreadPoolExecutor(max_workers=1)
    try:
        markdown_future = markdown_executor.submit(_read_pdf_to_markdown, pdf_filepath)
        try:
            markdown_content = markdown_future.result(timeout=markdown_timeout_seconds)
            print(f"  [PDF Logic] Markdown conversion successful for {base_name}.")
        except TimeoutError:
            # TimeoutError is the name imported from concurrent.futures at the
            # top of the file (distinct from the builtin before Python 3.11).
            print(f"  [PDF Logic] Timeout: Markdown conversion for {base_name} took too long ({markdown_timeout_seconds}s). Falling back to plain text.")
            status = "timeout_fallback"
            message = "Markdown conversion timed out, fell back to plain text"
        except Exception as e:
            print(f"  [PDF Logic] Error during Markdown conversion for {base_name}: {e}. Falling back to plain text.")
            status = "error_fallback"
            message = f"Markdown conversion failed: {e}, fell back to plain text"
    finally:
        # Release the executor without waiting so the fallback starts right
        # away; a conversion still in flight finishes on its own thread.
        markdown_executor.shutdown(wait=False)

    if markdown_content is None:
        try:
            final_content = _read_pdf_plain_text(pdf_filepath)
            print(f"  [PDF Logic] Successfully read plain text for {base_name}.")
            content_type = "plain_text"
        except Exception as e:
            print(f"  [PDF Logic] Critical Error: Could not read plain text for {base_name}: {e}.")
            status = "critical_error"
            message = f"Failed to read plain text: {e}"
            final_content = "Error: Could not process file."
            content_type = "error"
    else:
        final_content = markdown_content
        content_type = "markdown"

    return {
        "content": final_content,
        "status": status,
        "message": message,
        "content_type": content_type
    }

def _process_xml_task(xml_filepath):
    """
    Runs the XML reader on one file and wraps the outcome in a result dict.

    Returns a dict with "content", "status", "message" and "content_type".
    On success the content is whatever _read_xml_file returned and the
    status is "success"; if the reader raises, the exception is logged and
    reported with status "error" and content_type "error" instead of
    propagating.
    """
    filename = os.path.basename(xml_filepath)
    print(f"  [XML Logic] Processing XML file: {filename}...")

    try:
        xml_payload = _read_xml_file(xml_filepath)
    except Exception as err:
        # Failure path: describe the error in the result rather than raising.
        print(f"  [XML Logic] Error processing XML for {filename}: {err}.")
        return {
            "content": f"Error processing XML: {err}",
            "status": "error",
            "message": f"XML processing failed: {err}",
            "content_type": "error",
        }

    return {
        "content": xml_payload,
        "status": "success",
        "message": "Processed XML content",
        "content_type": "xml",
    }

# --- Generic Worker Function (submitted to ThreadPoolExecutor) ---
def _generic_file_worker(filepath, output_dir, processing_task_func, **logic_kwargs):
    """
    Runs one processing task on one file and persists a JSON summary.

    `processing_task_func(filepath, **logic_kwargs)` must return a dict;
    its "content", "status", "message" and "content_type" entries (with
    placeholder defaults when absent) are condensed into a summary that is
    written to `<output_dir>/<base name>.json`. Only the content length is
    stored, not the content itself.

    Returns {"filepath", "status", "data"} on success. Any exception raised
    while processing or saving is caught and reported as
    {"filepath", "status": "worker_error", "message"}.
    """
    filename = os.path.basename(filepath)
    print(f"Processing {filename} using {processing_task_func.__name__}...")

    try:
        os.makedirs(output_dir, exist_ok=True)

        # Delegate the real work to the task-specific function.
        outcome = processing_task_func(filepath, **logic_kwargs)

        content = outcome.get("content", "No content")
        task_status = outcome.get("status", "unknown")
        summary = {
            "filename": filename,
            "status": task_status,
            "message": outcome.get("message", "No message"),
            "content_type": outcome.get("content_type", "unknown"),
            "processed_chars": len(content),
        }

        # Persist the summary next to the other results.
        target_path = os.path.join(output_dir, filename + ".json")
        with open(target_path, 'w', encoding='utf-8') as sink:
            json.dump(summary, sink, indent=2)
        print(f"  [Generic Worker] Saved result for {filename}.")
    except Exception as err:
        print(f"  [Generic Worker] Error processing or saving result for {filename}: {err}")
        return {"filepath": filepath, "status": "worker_error", "message": str(err)}

    return {"filepath": filepath, "status": task_status, "data": summary}

# --- Concurrent File Processor Class ---
class ConcurrentFileProcessor:
    """
    Fans a per-file processing task out over a ThreadPoolExecutor.

    The output directory is created on construction; `max_workers` is
    handed to ThreadPoolExecutor unchanged (None lets it pick its default).
    """

    def __init__(self, output_dir="processed_files", max_workers=None):
        self.output_dir = output_dir
        self.max_workers = max_workers
        os.makedirs(self.output_dir, exist_ok=True)

    def process_files_concurrently(self, filepaths, processing_task_func, **logic_kwargs):
        """
        Processes every path in `filepaths` concurrently.

        Each file is submitted as
        `_generic_file_worker(path, self.output_dir, processing_task_func, **logic_kwargs)`.
        Returns the list of worker results in completion order (not input
        order). An exception escaping a worker is recorded as an entry
        with status "unhandled_error" rather than raised.
        """
        print("\n--- Starting Concurrent File Processing (ThreadPoolExecutor) ---")
        print(f"Using processing logic: {processing_task_func.__name__}")
        print(f"Logic arguments: {logic_kwargs}")
        started_at = time.time()
        collected = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            # Remember which path each future belongs to for error reporting.
            path_by_future = {}
            for path in filepaths:
                job = pool.submit(
                    _generic_file_worker,
                    path,
                    self.output_dir,
                    processing_task_func,
                    **logic_kwargs,
                )
                path_by_future[job] = path

            for job in as_completed(path_by_future):
                source_path = path_by_future[job]
                try:
                    collected.append(job.result())
                except Exception as exc:
                    print(f'{source_path} generated an unhandled exception: {exc}')
                    collected.append({"filepath": source_path, "status": "unhandled_error", "message": str(exc)})

        elapsed = time.time() - started_at
        print(f"Concurrent processing finished in {elapsed:.2f} seconds.")
        return collected

# # --- Sequential File Processor (for comparison) ---
# class SequentialFileProcessor:
#     def __init__(self, output_dir="processed_files"):
#         self.output_dir = output_dir
#         os.makedirs(output_dir, exist_ok=True)

#     def _sequential_file_worker(self, filepath, output_dir, processing_logic_func, **logic_kwargs):
#         base_name = os.path.basename(filepath)
#         print(f"Processing {base_name} sequentially using {processing_logic_func.__name__}...")
#         try:
#             os.makedirs(output_dir, exist_ok=True)
#             logic_result = processing_logic_func(filepath, **logic_kwargs)
            
#             final_content = logic_result.get("content", "No content")
#             status = logic_result.get("status", "unknown")
#             message = logic_result.get("message", "No message")
#             content_type = logic_result.get("content_type", "unknown")

#             processed_data = {
#                 "filename": base_name,
#                 "status": status,
#                 "message": message,
#                 "content_type": content_type,
#                 "processed_chars": len(final_content)
#             }
            
#             output_filepath = os.path.join(output_dir, base_name + ".json")
#             with open(output_filepath, 'w', encoding='utf-8') as f_out:
#                 json.dump(processed_data, f_out, indent=2)
#             print(f"  [Sequential Worker] Saved result for {base_name}.")
#             return {"filepath": filepath, "status": status, "data": processed_data}
#         except Exception as e:
#             print(f"  [Sequential Worker] Error processing or saving result for {base_name}: {e}")
#             return {"filepath": filepath, "status": "worker_error", "message": str(e)}

#     def process_files_sequentially(self, filepaths, processing_logic_func, **logic_kwargs):
#         print("\n--- Starting Sequential File Processing ---")
#         start_time = time.time()
#         results = []
#         for filepath in filepaths:
#             results.append(self._sequential_file_worker(filepath, self.output_dir, processing_logic_func, **logic_kwargs))
#         end_time = time.time()
#         print(f"Sequential processing finished in {end_time - start_time:.2f} seconds.")
#         return results


In [2]:

# --- Demonstration ---
if __name__ == "__main__":
    # Demo driver: creates throwaway PDF/XML inputs, runs both concurrent
    # demos, and removes everything it created -- even if a demo raises.
    dummy_pdf_dir = "dummy_pdfs"
    dummy_xml_dir = "dummy_xmls"
    dummy_pdf_filepaths = []
    dummy_xml_filepaths = []

    try:
        # Create dummy PDF files
        os.makedirs(dummy_pdf_dir, exist_ok=True)
        for i in range(10):
            filepath = os.path.join(dummy_pdf_dir, f"article_{i}.pdf")
            # Record the path before writing so cleanup still finds a
            # half-written file if the write fails.
            dummy_pdf_filepaths.append(filepath)
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(f"This is the content of PDF article {i}. It has some data related to research. DOI: 10.1234/data.{i}")

        # Create dummy XML files
        os.makedirs(dummy_xml_dir, exist_ok=True)
        for i in range(10):
            filepath = os.path.join(dummy_xml_dir, f"metadata_{i}.xml")
            dummy_xml_filepaths.append(filepath)
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(f"<root><id>{i}</id><title>Metadata for {i}</title></root>")

        # --- DEMO 1: Process PDFs concurrently with timeout ---
        print("\n\n===== DEMO 1: Processing PDFs with Markdown Timeout =====")
        concurrent_pdf_processor = ConcurrentFileProcessor(max_workers=4)
        concurrent_pdf_processor.process_files_concurrently(
            dummy_pdf_filepaths,
            processing_task_func=_process_pdf_task,
            markdown_timeout_seconds=1.0 # Specific argument for PDF logic
        )

        # --- DEMO 2: Process XMLs concurrently ---
        print("\n\n===== DEMO 2: Processing XMLs =====")
        concurrent_xml_processor = ConcurrentFileProcessor(max_workers=4)
        concurrent_xml_processor.process_files_concurrently(
            dummy_xml_filepaths,
            processing_task_func=_process_xml_task # No specific kwargs needed for XML logic
        )

        # # --- DEMO 3: Sequential PDF Processing (for comparison) ---
        # # (requires the commented-out SequentialFileProcessor to be restored)
        # print("\n\n===== DEMO 3: Sequential PDF Processing =====")
        # seq_pdf_processor = SequentialFileProcessor()
        # seq_pdf_processor.process_files_sequentially(
        #     dummy_pdf_filepaths,
        #     processing_logic_func=_process_pdf_task,
        #     markdown_timeout_seconds=3.0
        # )
    finally:
        # Clean up dummy files and directories. Only files this script
        # created are removed, and a directory is removed only once empty,
        # so unrelated content is left in place instead of raising.
        for fp in dummy_pdf_filepaths + dummy_xml_filepaths:
            if os.path.exists(fp):
                os.remove(fp)
        for dummy_dir in (dummy_pdf_dir, dummy_xml_dir):
            if os.path.isdir(dummy_dir) and not os.listdir(dummy_dir):
                os.rmdir(dummy_dir)

        # Clean up processed_files directory (the processors' default
        # output_dir); every file in it is removed.
        if os.path.exists("processed_files"):
            for name in os.listdir("processed_files"):
                os.remove(os.path.join("processed_files", name))
            os.rmdir("processed_files")



===== DEMO 1: Processing PDFs with Markdown Timeout =====

--- Starting Concurrent File Processing (ThreadPoolExecutor) ---
Using processing logic: _process_pdf_task
Logic arguments: {'markdown_timeout_seconds': 1.0}
Processing article_0.pdf using _process_pdf_task...
Processing article_1.pdf using _process_pdf_task...
Processing article_2.pdf using _process_pdf_task...
Processing article_3.pdf using _process_pdf_task...
  [PDF Logic] Markdown conversion successful for article_1.pdf.
  [PDF Logic] Markdown conversion successful for article_0.pdf.
  [PDF Logic] Markdown conversion successful for article_2.pdf.
  [PDF Logic] Markdown conversion successful for article_3.pdf.
  [Generic Worker] Saved result for article_0.pdf.
Processing article_4.pdf using _process_pdf_task...
  [Generic Worker] Saved result for article_2.pdf.
Processing article_5.pdf using _process_pdf_task...
  [Generic Worker] Saved result for article_1.pdf.
Processing article_6.pdf using _process_pdf_task...
  [Gener