In [None]:
from itertools import islice
from rich import print as rprint  # Import rich's print with a distinct name
from typing import Dict, List, Optional, Tuple, Any

def pp(data: Any, limit: int = 5, title: str | None = None):
    """
    Intelligently pretty-prints any Python object with color and a limit for collections.

    - For dictionaries and lists, it shows the first `limit` items.
    - For other types, it prints the object directly.
    - Uses the 'rich' library for beautiful, syntax-highlighted output.

    Args:
        data: The Python object to print (dict, list, str, int, etc.).
        limit: The maximum number of items to display for collections.
        title: An optional title for the output header.
    """
    
    # --- Print a clear, consistent title ---
    if title:
        rprint(f"[bold inverse cyan] {title} [/bold inverse cyan]")

    # === Case 1: Handle Dictionaries ===
    if isinstance(data, dict):
        total_items = len(data)
        if total_items > limit:
            rprint(f"--- Displaying the first [bold]{limit}[/bold] of [bold]{total_items}[/bold] dictionary items ---")
            # Use islice for a memory-efficient slice of the dictionary's items
            display_obj = dict(islice(data.items(), limit))
        else:
            rprint(f"--- Displaying all [bold]{total_items}[/bold] dictionary items ---")
            display_obj = data
        rprint(display_obj)

    # === Case 2: Handle Lists, Tuples, Sets (Collections) ===
    elif isinstance(data, (list, tuple, set)):
        total_items = len(data)
        if total_items > limit:
            rprint(f"--- Displaying the first [bold]{limit}[/bold] of [bold]{total_items}[/bold] items ---")
            # islice also works perfectly on lists and other iterables
            display_obj = list(islice(data, limit))
        else:
            rprint(f"--- Displaying all [bold]{total_items}[/bold] items ---")
            display_obj = data
        rprint(display_obj)
        
    # === Case 3: Handle all other data types ===
    else:
        rprint(f"--- Displaying object of type: [bold]{type(data).__name__}[/bold] ---")
        rprint(data)

## 📓 Notebooks: Enabling Interactive `tqdm` Progress Bars

When working in Jupyter notebooks, you'll want to see clean, interactive progress bars for long-running tasks. However, the standard setup can lead to a couple of common problems. This guide shows you the professional way to fix it.

### The Problem

You might run into one of two issues:

1.  **Messy, multi-line output:** Using the standard `from tqdm import tqdm` will print a new progress bar on every update, flooding your output cell.
2.  **An `ImportError`:** Following the best practice of using `from tqdm.notebook import tqdm` might give you this error:
    ```
    ImportError: IProgress not found. Please update jupyter and ipywidgets.
    ```

This happens because the beautiful, interactive notebook widgets require a special library (`ipywidgets`) and a Jupyter extension that aren't installed by default.

### The Solution: Install and Enable `ipywidgets`

Follow these steps in your terminal at the root of the project to permanently fix this.

#### Step 1: Add `ipywidgets` as a Dev Dependency

The `ipywidgets` library is a tool for your development environment, not a core dependency of the final application. We'll use the `--dev` flag to add it correctly.

```bash
rye add --dev ipywidgets
rye sync
```

#### Step 2: Enable the Jupyter Extension

Installing the package isn't always enough: on older Jupyter setups you also need to tell Jupyter to activate its interactive components. We use `rye run` to ensure the command executes within our project's managed environment. Newer releases of `ipywidgets` and Jupyter generally enable the extension automatically, and the `nbextension` subcommand may not exist there; if the command is not found, skip this step.

```bash
rye run jupyter nbextension enable --py widgetsnbextension
```

#### Step 3: Restart Your Jupyter Kernel! 🔄

This is the most important step! Your currently running notebook doesn't know about the new extension yet.
In your notebook interface (e.g., VS Code or JupyterLab), click **Kernel > Restart Kernel**.
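
After the restart, a quick check can confirm that the kernel sees the widget library. The sketch below is not part of the project: `notebook_widgets_available` is a hypothetical helper name, and it only tests whether `ipywidgets` is importable from the current kernel. It does not verify that the front end actually renders widgets.

```python
from importlib.util import find_spec


def notebook_widgets_available() -> bool:
    """Return True if the `ipywidgets` package is importable in this kernel."""
    # find_spec returns None when the top-level package cannot be located.
    return find_spec("ipywidgets") is not None


if notebook_widgets_available():
    print("ipywidgets found: `from tqdm.notebook import tqdm` should work.")
else:
    print("ipywidgets missing: re-run the install step, then restart the kernel.")
```

If the check reports the package as missing even after `rye sync`, the notebook is probably attached to a kernel outside the project's managed environment.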

In [None]:
import logging
# Configure professional logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

from tqdm.notebook import tqdm
# --- Create a custom handler to integrate logging with tqdm ---
# This ensures log messages don't break the progress bar.
class TqdmLoggingHandler(logging.Handler):
    def __init__(self, level=logging.NOTSET):
        super().__init__(level)

    def emit(self, record):
        try:
            msg = self.format(record)
            # Use tqdm's built-in safe writer
            tqdm.write(msg)
            self.flush()
        except Exception:
            self.handleError(record)

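# --- Configure logging to use our new handler ---
# basicConfig above installed a default stream handler; we replace it with the
# tqdm-aware handler and re-apply the same message format.
log = logging.getLogger()
log.setLevel(logging.INFO)

# Clear any existing handlers to avoid duplicate messages
if log.hasHandlers():
    log.handlers.clear()

# Add our custom tqdm handler with the format configured earlier
handler = TqdmLoggingHandler()
handler.setFormatter(
    logging.Formatter("%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
)
log.addHandler(handler)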

## 📂 Phase 2a: Data Preparation - Unzipping the Archives

Before we can analyze the financial documents, we need to extract them from their compressed archives. This section describes the initial data preparation script, which unzips all the source files and creates a master index for our project.

### The Goal

The source data is provided as a directory full of `.zip` files. Each zip file is named after a company's KRS number (e.g., `0000001132.zip`) and contains one or more financial documents for that company.

The purpose of this script is to:
1.  Scan a specified directory for all `.zip` archives.
2.  Unzip each archive into its own dedicated folder to keep the data organized.
3.  Create a comprehensive JSON lookup map, `file_map.json`, that connects each KRS number to the list of all file paths that were extracted for it.

### How the Script Works

The script (`your_script_name.py`) performs the following actions:

1.  **Sets the Source Path:** It starts with a `Path` variable that you must configure to point to your data directory:
    ```python
    ZIP_DATA_PATH = Path("/path/to/your/zip/files-directory/")
    ```

2.  **Scans for Zips:** It walks through the `ZIP_DATA_PATH` and identifies all files ending with `.zip`.

3.  **Extracts in a Loop:** It iterates through each zip file one by one. For a file named `0000001132.zip`, it will:
    *   Create a new directory named `0000001132/` in the same location.
    *   Extract all the contents of the zip file into this new directory.

4.  **Builds the Map:** During the loop, it constructs a Python dictionary. The key is the KRS number (derived from the zip filename, e.g., `"0000001132"`), and the value is a list of the full paths to every file that was just extracted.

5.  **Saves the Output:** Once all zip files have been processed, the script saves the entire dictionary to a file named `file_map.json` in the project's root directory. This JSON file will be the primary input for the next phase of our project.
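
To make the target format concrete, here is a small sketch of the map's shape and its JSON round trip. The file names under `0000001132/` are invented for illustration; only the KRS-number key and the placeholder directory come from the description above.

```python
import json

# Hypothetical example: one KRS number mapped to the files extracted for it.
example_map = {
    "0000001132": [
        "/path/to/your/zip/files-directory/0000001132/report_2022.pdf",
        "/path/to/your/zip/files-directory/0000001132/report_2023.pdf",
    ],
}

# Serialize the way the script saves it, then load it back.
serialized = json.dumps(example_map, indent=4)
restored = json.loads(serialized)

print(serialized)
print(restored == example_map)
```

Paths are stored as plain strings because `pathlib.Path` objects are not JSON-serializable; they can be converted back to `Path` after loading.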

### 🚀 Performance Enhancement: A Task for You! (TODO)

The current script processes the zip files sequentially, one after another. This is simple and reliable, but it can be slow if you have hundreds or thousands of files.

This task, unzipping files, is what's known as an **"embarrassingly parallel" problem**. Each zip file can be processed completely independently of the others. This makes it a perfect candidate for **multiprocessing**.

**Your Challenge:**
Refactor the `unzip_and_map_files` function to use Python's `multiprocessing` module (specifically, a `Pool` of workers) to unzip the files in parallel. This will dramatically speed up the data preparation step, especially on multi-core machines.

**Hints:**
*   You'll want to use `Pool.map()` or `Pool.imap_unordered()`.
*   The function that the pool executes should handle a single zip file (`_unzip_single_file` is already a great starting point).
*   Think about how you will collect the results from all the parallel processes to build the final `directory_file_map` dictionary.
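
If `Pool.imap_unordered()` is new to you, the toy sketch below shows the general pattern without solving the challenge: a top-level worker returns a `(key, value)` pair, and the parent process collects the pairs into a dictionary as they arrive. `word_length`, `lengths_in_parallel`, and the sample words are made up for illustration. The `if __name__ == "__main__":` guard matters when this runs as a script on platforms that start fresh worker processes.

```python
from multiprocessing import Pool
from typing import Dict, List, Tuple


def word_length(word: str) -> Tuple[str, int]:
    """Toy worker: return the input together with its result."""
    return word, len(word)


def lengths_in_parallel(words: List[str], processes: int = 2) -> Dict[str, int]:
    """Run the worker across a pool and gather results into a dict."""
    results: Dict[str, int] = {}
    with Pool(processes=processes) as pool:
        # Results arrive in completion order, so each one carries its own key.
        for word, length in pool.imap_unordered(word_length, words):
            results[word] = length
    return results


if __name__ == "__main__":
    print(lengths_in_parallel(["alpha", "beta", "gamma"]))
```

Returning the key alongside the value is the design choice to notice: with unordered results, the parent cannot rely on position to tell which input produced which output.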

In [None]:
import json
from pathlib import Path
import zipfile



def _unzip_single_file(
    zip_path: Path,
) -> Optional[List[Path]]:
    """
    Unzips a single zip file into a directory named after the zip file.

    For example, 'my_files.zip' will be extracted to a new 'my_files/' directory
    in the same location.

    Args:
        zip_path: The path to the .zip file.

    Returns:
        A list of paths to all the files inside the new directory,
        or None if an error occurs.
    """
    # Create the destination directory path (e.g., /path/to/file.zip -> /path/to/file)
    destination_dir = zip_path.with_suffix('')
    
    try:
        # Create the directory; exist_ok=True prevents errors if it already exists
        destination_dir.mkdir(parents=True, exist_ok=True)
        # logging.info(f"Extracting '{zip_path.name}' to '{destination_dir.name}/'")
        
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(destination_dir)
            
        # After extraction, find all *files* inside the new directory (recursively)
        # We use rglob to find files in subdirectories too.
        extracted_files = [p for p in destination_dir.rglob('*') if p.is_file()]
        
        return extracted_files
        
    except zipfile.BadZipFile:
        logging.error(f"Error: '{zip_path.name}' is not a valid or is a corrupted zip file. Skipping.")
        return None
    except Exception as e:
        logging.error(f"An unexpected error occurred with '{zip_path.name}': {e}")
        return None


def unzip_and_map_files(
    zip_data_path: Path,
) -> Dict[str, List[str]]:
    """
    Finds all .zip files in a directory, unzips each one into its own folder,
    and returns a dictionary mapping each KRS number to the extracted files.

    Args:
        zip_data_path: The path to the directory containing the .zip files.

    Returns:
        A dictionary where keys are KRS numbers (the zip file names without
        the extension) and values are lists of the extracted file paths as strings.
    """
    if not zip_data_path.is_dir():
        logging.error(f"Error: Provided path is not a directory: {zip_data_path}")
        return {}
    
    # Find all .zip files in the directory
    zip_files = list(zip_data_path.glob("*.zip"))
    if not zip_files:
        logging.warning(f"No .zip files found in '{zip_data_path}'")
        return {}
    
    pp(zip_files, title="Zip Files Found", limit=5)

    logging.info(f"Found {len(zip_files)} zip files to process.")
    
    # The final dictionary to hold our results
    directory_file_map: Dict[str, List[str]] = {}
    
    for zip_path in tqdm(zip_files, desc="Unzipping files"):
        result = _unzip_single_file(zip_path)
        krs_number = zip_path.stem
        
        # Only add to the map if unzipping was successful
        if result:
            file_list = result
            # We store paths as strings for easier serialization (e.g., to JSON)
            directory_file_map[krs_number] = [str(f) for f in file_list]
            
    return directory_file_map

ZIP_DATA_PATH = Path("/mnt/e/Workspace/PKDData/2Download/FinanceKRS_Sampled_PWR/")
file_map = unzip_and_map_files(ZIP_DATA_PATH)

#! add file_map.json to .gitignore - you do not want to commit this file
FILE_MAP = "file_map.json"
logging.info(f"Found {len(file_map)} directories after unzipping. Generating lookup map {FILE_MAP} at the root level.")
with open(f"../{FILE_MAP}", "w") as f:
    json.dump(file_map, f, indent=4)

## 📊 Phase 2b: Extracting and Parsing PDF Tables

This is the core data engineering phase of our project. Here, we take the file paths from our prepared data map and tackle the significant challenge of extracting structured, clean tables from unstructured PDF documents.

The process is broken down into a clear, multi-step pipeline for each PDF file.

### Step 1: Loading the File Map

The entire process begins with the master index we created in the previous step, `file_map.json`. This file is our single source of truth for the locations of all financial documents.

The first action in our script or notebook is to load this map and convert the string paths back into `pathlib.Path` objects, which are easier and safer to work with in Python.

In [None]:
FILE_MAP = "file_map.json"
with open(f"../{FILE_MAP}", "r") as f:
    file_map = json.load(f)
# convert string paths back to Path objects
file_map = {k: [Path(p) for p in v] for k, v in file_map.items()}

### Step 2: The Table Extraction and Parsing Pipeline

For any given PDF, we apply a two-stage process to turn its visual tables into data we can inspect.

#### A. Raw Extraction (`extract_tables_from_pdf`)

This is the first-pass extraction.

*   **Input:** A single `Path` object pointing to a PDF file.
*   **Tool:** It uses the `pdfplumber` library to open the PDF and scan each page for anything that looks like a table.
*   **Output:** A list of "raw" tables. This output is often messy and reflects the challenges of PDF parsing:
    *   Columns are frequently merged into a single cell.
    *   A single logical row might be split across multiple lines (`\n`) within one cell.
    *   The data is not yet structured or cleaned.
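
A first cleaning pass over such raw output could look like the sketch below. It is not the project's parser: `normalize_raw_table` is a hypothetical helper and the sample table is invented. It assumes the shape `pdfplumber` returns from `extract_tables()`, namely a list of rows whose cells are strings or `None`.

```python
from typing import List, Optional

RawTable = List[List[Optional[str]]]


def normalize_raw_table(table: RawTable) -> List[List[str]]:
    """Replace None cells with "" and collapse in-cell line breaks to spaces."""
    cleaned: List[List[str]] = []
    for row in table:
        # split() with no arguments breaks on any whitespace, including "\n".
        cleaned_row = [" ".join(cell.split()) if cell is not None else "" for cell in row]
        # Drop rows that are empty after cleaning.
        if any(cleaned_row):
            cleaned.append(cleaned_row)
    return cleaned


# Invented sample mimicking typical raw output.
raw = [
    ["Item", "2023\n(PLN)", None],
    ["Net\nrevenue", "1 200", "1 050"],
    [None, "", None],
]
print(normalize_raw_table(raw))
```

This only handles the easy cases (missing cells and stray line breaks). Columns that were merged into a single cell need document-specific logic, which is where most of the difficulty lies.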

#### B. Displaying the Results (`display_tables`)

The final function is for visualization and validation.

*   **Input:** The list of tables extracted from the PDF.
*   **Tool:** It uses the powerful `rich` library to render the clean data.
*   **Output:** A series of beautifully formatted, human-readable tables printed to the console or notebook output. This allows us to immediately inspect the results and verify if the parsing logic was successful for a given document.


In [None]:

import pdfplumber
from rich.console import Console
from rich.table import Table

console = Console()
ExtractedTable = List[List[Optional[str]]]

def extract_tables_from_pdf(pdf_path: Path) -> List[ExtractedTable]:
    """
    Extracts all tables from a single PDF file using pdfplumber.

    Args:
        pdf_path: The file path to the PDF document.

    Returns:
        A list of tables found in the document. Each table is a 2D list
        of strings. Returns an empty list if no tables are found or if an error occurs.
    """
    if not pdf_path.is_file():
        logging.error(f"File not found or is not a file: {pdf_path}")
        return []

    tables: List[ExtractedTable] = []
    try:
        with pdfplumber.open(pdf_path) as pdf:
            # We iterate through each page and extract tables
            for page in pdf.pages:
                # .extract_tables() is a robust method that finds all tables on a page
                extracted_page_tables = page.extract_tables()
                if extracted_page_tables:
                    tables.extend(extracted_page_tables)
    except Exception as e:
        logging.error(f"Could not process PDF file '{pdf_path.name}'. Reason: {e}")
        return []

    return tables


def display_tables(tables: List[ExtractedTable]):
    """Uses rich to render a list of tables in the console."""
    if not tables:
        console.print("[yellow]No tables were found in this document.[/yellow]")
        return

    console.print(f"[bold green]Successfully extracted {len(tables)} tables.[/bold green]\n")

    for i, table_data in enumerate(tables):
        if not table_data:
            continue

        # Create a rich Table object for beautiful printing
        table_display = Table(title=f"Table #{i + 1}", show_header=True, header_style="bold magenta")
        
        # Assume the first row is the header
        header = table_data[0]
        # Clean header: handle None and convert all to string
        cleaned_header = [str(h) if h is not None else f"Col_{j}" for j, h in enumerate(header)]

        for column_header in cleaned_header:
            table_display.add_column(column_header)
        
        # Add the data rows
        for row in table_data[1:]:
            # Clean row: convert all cells to string to satisfy rich Table
            cleaned_row = [str(cell) if cell is not None else "" for cell in row]
            table_display.add_row(*cleaned_row)
        
        console.print(table_display)
        console.print("\n" + "="*80 + "\n")

# Pick the first entry from file_map for a quick test.
# This is throwaway code for testing purposes only.
# TODO: add check if the pdf_paths are really pdf files
pdf_paths = next(iter(file_map.values()))
tables = extract_tables_from_pdf(pdf_paths[0])  # Example usage with the first PDF path
display_tables(tables)

### 💥 KNOCKDOWN: Your First Real-World Encounter

So, you've built a parser. You've seen it work on one, maybe two, documents. You've cleaned up a messy table and felt the satisfaction of turning chaos into order.

Now, it's time for a dose of reality.

![A person hitting a brick wall](https://i.etsystatic.com/13687543/r/il/433c6a/1098404604/il_570xN.1098404604_eiv9.jpg)

Take a moment. Go into the `data/` directory and open one of the more complex, multi-page PDFs *manually*. Don't run any code. Just scroll through it. Look at the tables that are split across pages. Notice the footnotes, the weirdly merged headers, the inconsistent column spacing.

Now ask yourself: **Do you really believe it's worth cleaning this mess from this point onward?**

The script you wrote is an amazing learning tool, but in the real world, you won't be dealing with ten documents. You'll have **thousands**, each one formatted with its own unique, frustrating quirks. Building a custom parser for every single variation is not just difficult; it's an unwinnable battle.

This is a critical lesson in software engineering. A great engineer doesn't just solve the problem; they find the *right tool* for the problem. Your custom parser was an excellent exercise in understanding the deep complexity of the task.

Now, it's time to go to the internet and find a better tool. Your next mission is to research and evaluate existing solutions, from open-source libraries to cloud-based AI services, that are specifically designed to handle this level of document complexity at scale.

Good luck.

---

### Solution to multiprocessing

**Advanced:** Why did we have to create a separate `unzip_worker` function when it does so little? To find out, check what error you get when you replace the line:

```python
results_iterator = pool.imap_unordered(unzip_worker, zip_files)
```

with:

```python
results_iterator = pool.imap_unordered(lambda zip_path: (zip_path.stem, _unzip_single_file(zip_path)), zip_files)
```

Can you write the lambda function on your own?

In [None]:
from multiprocessing import Pool, cpu_count

def unzip_worker(zip_path: Path) -> Tuple[str, Optional[List[Path]]]:
    return (zip_path.stem, _unzip_single_file(zip_path))

def unzip_and_map_files_parallel(
    zip_data_path: Path,
) -> Dict[str, List[str]]:
    """
    Finds all .zip files in a directory, unzips them in parallel, each into
    its own folder, and returns a dictionary mapping each KRS number to the
    extracted files.

    Args:
        zip_data_path: The path to the directory containing the .zip files.

    Returns:
        A dictionary where keys are KRS numbers (the zip file names without
        the extension) and values are lists of the extracted file paths as strings.
    """
    if not zip_data_path.is_dir():
        logging.error(f"Error: Provided path is not a directory: {zip_data_path}")
        return {}
    
    # Find all .zip files in the directory
    zip_files = list(zip_data_path.glob("*.zip"))
    if not zip_files:
        logging.warning(f"No .zip files found in '{zip_data_path}'")
        return {}
    
    pp(zip_files, title="Zip Files Found", limit=5)

    logging.info(f"Found {len(zip_files)} zip files to process.")
    
    # The final dictionary to hold our results
    directory_file_map: Dict[str, List[str]] = {}
    # Replace the sequential for loop with a multiprocessing Pool
    # Use all available CPU cores for maximum speed
    num_processes = cpu_count()

    # The 'with' statement ensures the pool of processes is properly closed
    with Pool(processes=num_processes) as pool:
        # Use pool.imap_unordered for efficiency. It applies the worker function
        # to each item in zip_files and yields results as they are completed.
        # This allows the tqdm progress bar to update in real-time.
        
        # We wrap the imap_unordered call with tqdm to create the progress bar.
        # `total=len(zip_files)` tells tqdm the total number of tasks.
        results_iterator = pool.imap_unordered(unzip_worker, zip_files)
        
        for krs_number, file_list in tqdm(results_iterator, total=len(zip_files), desc="Unzipping files"):
            # The worker always returns a (krs_number, file_list) tuple;
            # file_list is None when that archive failed to unzip.
            if file_list:
                # Store paths as strings for easier serialization (e.g., to JSON)
                directory_file_map[krs_number] = [str(f) for f in file_list]
    logging.info("Parallel processing complete.")

    return directory_file_map

ZIP_DATA_PATH = Path("/mnt/e/Workspace/PKDData/2Download/FinanceKRS_Sampled_PWR/")
file_map = unzip_and_map_files_parallel(ZIP_DATA_PATH)


### The `multiprocessing` and `lambda` Rule

A common pitfall in Python's `multiprocessing` is an error such as `AttributeError: Can't get local object '<locals>.<lambda>'` (the exact exception type and wording depend on the Python version). This error occurs when you try to pass a `lambda` function to a worker pool. Here is a compact explanation of why this happens.

**The "Why" in one sentence:** A `lambda` function cannot be sent to another process because it is **anonymous** and **local**; a worker process needs a **named, top-level function** that it can reliably import and find.

#### The Core Problem: Process Isolation & Pickling

1.  **Separate Processes:** `multiprocessing` creates brand-new, isolated processes with their own memory. They do not share local variables with the main process.
2.  **Sending Instructions:** To send a task to a worker process, Python must **serialize** (or "pickle") the function and its arguments into a byte stream.

#### `lambda` vs. `def`: The Pickling Test

This is where the two types of functions fundamentally differ.

| Characteristic | `lambda` (Inside a function) | `def` (Top-Level Function) |
| :--- | :--- | :--- |
| **Analogy** | A temporary "sticky note" | A named "book" in a library |
| **Scope** | Local & Anonymous | Global & Named |
| **Picklable?** | ❌ **No** | ✅ **Yes** |
| **Reason** | The worker process receives instructions to find a nameless, local "sticky note" that doesn't exist in its own separate memory space. | The worker process is told to "import the `module` and find the function named `unzip_worker`," which is a reliable, findable address. |

#### The Error Message Decoded

The error `AttributeError: Can't get local object ... <lambda>` is Python's direct way of saying: "I was sent to a new process and told to find this local, anonymous function, but it doesn't exist here."

> **The Golden Rule:** When using `multiprocessing`, any function passed as a task to a worker **must be a named function defined at the top level of a module**.
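
You can see the difference without starting any worker processes, because `pickle` is what `multiprocessing` relies on to ship a task to a worker. The sketch below is illustrative only: `double` and `is_picklable` are hypothetical names, and the exception raised for a `lambda` differs between Python versions, so both likely types are caught.

```python
import pickle


def double(x: int) -> int:
    """A named, top-level function: pickled by reference to its qualified name."""
    return 2 * x


def is_picklable(obj: object) -> bool:
    """Return True if `pickle.dumps` accepts the object."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        # The exception type and message vary across Python versions.
        return False


print("named function:", is_picklable(double))
print("lambda:", is_picklable(lambda x: 2 * x))
```

Note that `pickle` does not serialize the body of `double`. It stores only the module and function name, which is why the worker must be able to look that name up on its side.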