## Requirements

To run this notebook, you will need:

- **YOLO Model**: A trained YOLO model file (e.g., `best.pt`).  
    - The path to the model is specified by the `model_path` variable.

- **Directory with XMLs**:  
    - This should contain annotation files in XML format (e.g., Pascal VOC format).
    - The path is set in `DIRS["xml_src_dir"]` (default: `texts`).

- **Directory with Images**:  
    - This should contain the source images (e.g., `.jpg` files) to be processed.
    - The path is set in `DIRS["images_src_dir"]` (default: `downloaded_images`).

Make sure these files and directories exist and are correctly referenced in the `DIRS` dictionary before running the pipeline.

## Required Scripts for the Pipeline

This notebook relies on several custom Python scripts to process images and annotations. Make sure the following scripts are available in your working directory or Python path:

- **process_yolo.py**  
    Handles running the YOLO model on images and saving the detection results as JSON.

- **filter_picture_descriptions.py**  
    Filters XML annotation files based on YOLO detection results, removing or updating picture description regions.

- **trim_images.py**  
    Crops images according to regions detected by YOLO (e.g., regions labelled "Obrázek", Czech for "picture").

- **process_descriptions.py**  
    Matches cropped images with text descriptions using a CLIP model.  
    **Requires:**  
    - `cut_text.py`

Ensure all these scripts (and their dependencies) are present before running the notebook pipeline.

In [None]:
# Directory layout for the complete pipeline.
# The two source directories hold the input data; every other entry is a
# working directory located beneath one common root.
_WORK_ROOT = "pipeline/test"

DIRS = dict(
    # Input data
    xml_src_dir="texts",
    images_src_dir="downloaded_images",
    # Sample copied from the input data
    sample_images_dir=f"{_WORK_ROOT}/images",
    sample_texts_dir=f"{_WORK_ROOT}/xmls",
    # YOLO detection output
    yolo_jsons=f"{_WORK_ROOT}/yolo_out",
    yolo_visualizations=f"{_WORK_ROOT}/visualizations",
    # Intermediate artifacts
    filtered_xmls_dir=f"{_WORK_ROOT}/filtered_xmls",
    cropped_images_dir=f"{_WORK_ROOT}/cropped_images",
    # Final results
    output_dir=f"{_WORK_ROOT}/output",
    output_jsons_dir=f"{_WORK_ROOT}/output_jsons",
)

# Weights file handed to the YOLO constructor.
model_path = "best.pt"

In [None]:
# Cell to clear all pipeline directories
import os
import shutil
from IPython.display import display, Markdown

# Clear all directories in DIRS
display(Markdown("## 🧹 Clearing Pipeline Directories"))

# Keys of DIRS that point at the original input data; these are never emptied.
# The same tuple drives both the skip check and the summary count.
SOURCE_DIR_KEYS = ("xml_src_dir", "images_src_dir")

# Track statistics
cleared_files = 0
created_dirs = 0
processed_dirs = 0

# Process each directory in DIRS
for dir_key, dir_path in DIRS.items():
    # Skip source directories to avoid accidentally deleting source data
    if dir_key in SOURCE_DIR_KEYS:
        display(Markdown(f"⏭️ Skipping source directory: `{dir_key}` ({dir_path})"))
        continue

    processed_dirs += 1
    display(Markdown(f"🗑️ Processing: `{dir_key}` ({dir_path})"))

    # Missing directory: create it and move on.
    if not os.path.exists(dir_path):
        try:
            os.makedirs(dir_path, exist_ok=True)
        except OSError as e:
            display(Markdown(f"  ❌ Error creating directory: {str(e)}"))
        else:
            created_dirs += 1
            display(Markdown("  ✅ Created new directory"))
        continue

    # Listing happens inside the try block so that a path which exists but is
    # not a readable directory is reported instead of aborting the whole cell.
    try:
        entries = os.listdir(dir_path)
        files = [n for n in entries if os.path.isfile(os.path.join(dir_path, n))]
        dirs = [n for n in entries if os.path.isdir(os.path.join(dir_path, n))]

        if not (files or dirs):
            display(Markdown("  ✅ Directory already empty"))
            continue

        # Remove all contents
        for file_name in files:
            os.unlink(os.path.join(dir_path, file_name))
            cleared_files += 1
        for subdir_name in dirs:
            shutil.rmtree(os.path.join(dir_path, subdir_name))
            cleared_files += 1  # Count directory as one item for simplicity

        display(Markdown(f"  ✅ Cleared {len(files)} files and {len(dirs)} subdirectories"))
    except OSError as e:
        display(Markdown(f"  ❌ Error: {str(e)}"))

# Display summary
display(Markdown(f"""
## 📊 Summary
- Directories processed: {processed_dirs} (excluding source directories)
- Files/directories removed: {cleared_files}
- New directories created: {created_dirs}

All pipeline directories have been prepared for a fresh run.
"""))

In [None]:
import os
import shutil
import random


# Number of images to draw for the test run.
SAMPLE_SIZE = 100

# List all jpg images in the source directory
all_images = [f for f in os.listdir(DIRS["images_src_dir"]) if f.lower().endswith('.jpg')]

# random.sample raises ValueError when asked for more items than the population
# holds, so cap the request at the number of images actually available.
sample_size = min(SAMPLE_SIZE, len(all_images))
if sample_size < SAMPLE_SIZE:
    print(f"Warning: only {len(all_images)} images available, requested {SAMPLE_SIZE}")

# Randomly select the sample
selected_images = random.sample(all_images, sample_size)

# Make sure the destinations exist even if the clearing cell was not run first.
os.makedirs(DIRS["sample_images_dir"], exist_ok=True)
os.makedirs(DIRS["sample_texts_dir"], exist_ok=True)

print(f"Copying {len(selected_images)} randomly selected images and their XML files")

for img_file in selected_images:
    # The XML annotation shares the image's base name.
    img_id = os.path.splitext(img_file)[0]
    xml_file = f"{img_id}.xml"
    src_img_path = os.path.join(DIRS["images_src_dir"], img_file)
    src_xml_path = os.path.join(DIRS["xml_src_dir"], xml_file)
    dst_img_path = os.path.join(DIRS["sample_images_dir"], img_file)
    dst_xml_path = os.path.join(DIRS["sample_texts_dir"], xml_file)

    # Copy image
    shutil.copy2(src_img_path, dst_img_path)

    # Copy corresponding xml if exists
    if os.path.exists(src_xml_path):
        shutil.copy2(src_xml_path, dst_xml_path)
    else:
        print(f"Warning: XML file not found for image {img_file}")

print(f"Successfully copied files to {DIRS['sample_images_dir']} and {DIRS['sample_texts_dir']}")

In [None]:
import os
from ultralytics import YOLO

# Build the YOLO model from the weights file at `model_path`; the resulting
# `model` object is handed to the detection step.
model = YOLO(model_path)

In [None]:

import process_yolo

def process_images_with_xml(images_dir, xml_dir, output_dir, model):
    """Run the detection model on each ``.jpg`` image that has a matching XML file.

    An image ``<name>.jpg`` in ``images_dir`` is processed only when
    ``<name>.xml`` exists in ``xml_dir``; otherwise a warning is printed and
    the image is skipped.

    Args:
        images_dir: Directory scanned for ``.jpg`` files (extension matched
            case-insensitively).
        xml_dir: Directory expected to hold the same-named ``.xml`` files.
        output_dir: Passed through to ``process_yolo.process_image_with_model``.
        model: Model object passed through to
            ``process_yolo.process_image_with_model``.

    Returns:
        None. The JSON path reported by each call is printed.
    """
    for image_name in os.listdir(images_dir):
        if not image_name.lower().endswith('.jpg'):
            continue

        stem, _ext = os.path.splitext(image_name)
        annotation_path = os.path.join(xml_dir, stem + ".xml")

        if os.path.exists(annotation_path):
            # Only the second element of the returned triple (the JSON path) is used here.
            _, json_path, _ = process_yolo.process_image_with_model(
                model=model,
                image_path=os.path.join(images_dir, image_name),
                output_dir=output_dir,
                create_visualization=True,
            )
            print(f"JSON results saved to: {json_path}")
        else:
            print(f"Warning: XML file not found for image {image_name}, skipping.")

# Run detection over the sampled images (those with an XML in the sampled texts
# directory), using DIRS["yolo_jsons"] as the output directory.
process_images_with_xml(DIRS["sample_images_dir"], DIRS["sample_texts_dir"], DIRS["yolo_jsons"], model)

In [None]:
# Import the picture description filtering module
import filter_picture_descriptions
from IPython.display import display, Markdown, Image

# Setup logging for notebook environment
filter_picture_descriptions.setup_logging(use_notebook=True)

# Run the filtering process with our directory structure
result = filter_picture_descriptions.run_filter_descriptions(
    json_dir=DIRS["yolo_jsons"],              # YOLO detection JSONs for the sampled images
    xml_dir=DIRS["sample_texts_dir"],         # XML files copied alongside the sampled images
    output_dir=DIRS["filtered_xmls_dir"],     # Output directory for filtered XMLs
    iou_threshold=0.00005,                    # IoU threshold for matching
    # NOTE(review): progress updates are disabled here, while the cropping and
    # CLIP steps enable them - confirm which is intended.
    show_progress=False
)

# Display results as markdown
if "error" in result:
    display(Markdown(f"## ❌ Error\n{result['error']}"))
else:
    # Build the report from unindented lines: Markdown treats text indented by
    # four spaces as a code block, which would suppress the headings and bullets.
    report_lines = [
        "## Picture Description Filtering Results",
        "",
        f"- **Processed Files:** {result['processed_files']} file pairs",
        f"- **Files with Matches:** {result['files_with_matches']}",
        f"- **Files Copied Without Filtering:** {result['copied_without_filtering']}",
        "",
        "### Matching Statistics:",
        f"- Total picture description regions: {result['total_json_regions']}",
        f"- Matched regions: {result['total_matches']}",
    ]
    # The match percentage is optional in the result.
    if 'match_percentage' in result:
        report_lines.append(f"- Match percentage: {result['match_percentage']:.2f}%")
    report_lines += [
        f"- Text regions removed from XML: {result['regions_removed']}",
        "",
        "### Processing Details:",
        f"- IoU threshold used: {result['iou_threshold']}",
        f"- Processing time: {result['elapsed_time']:.2f} seconds",
        "",
        f"Filtered XML files are available in: `{result['output_dir']}`",
    ]
    display(Markdown("\n".join(report_lines)))

In [None]:
# Import the image cropping module
import trim_images

# Setup logging for notebook environment
trim_images.setup_logging(use_notebook=True)

# Run the cropping process with our directory structure
result = trim_images.run_crop_images(
    jsons_dir=DIRS["yolo_jsons"],              # YOLO detection JSONs
    images_dir=DIRS["sample_images_dir"],      # Sampled source images
    output_dir=DIRS["cropped_images_dir"],     # Where cropped images will go
    target_label="Obrázek",                    # Label to look for
    show_progress=True                         # Show progress updates
)

# Display results as markdown
if "error" in result:
    display(Markdown(f"## ❌ Error\n{result['error']}"))
else:
    # Build the report from unindented lines: Markdown treats text indented by
    # four spaces as a code block, which would suppress the headings and bullets.
    report_lines = [
        "## Image Cropping Results",
        "",
        f"- **Processed Files:** {result['files_processed']} of {result['total_files_found']} JSON files",
        f"- **Files with Errors:** {result['files_with_errors']}",
        f"- **Crops Created:** {result['crops_created']} images",
        f"- **Label Used:** \"{result['target_label']}\"",
        f"- **Processing Time:** {result['elapsed_time']:.2f} seconds",
        "",
        f"All cropped images were saved to `{result['output_dir']}`",
    ]
    display(Markdown("\n".join(report_lines)))

In [None]:
# Import the process descriptions module
import glob
import process_descriptions
import shutil

# Set up logging for notebook environment
process_descriptions.setup_logging(use_notebook=True, log_to_file=True)

# Report how many entries each input directory holds before the slow step starts
required_dirs = {
    "filtered_jsons_dir": DIRS["yolo_jsons"],
    "cropped_images_dir": DIRS["cropped_images_dir"],
    "filtered_texts_no_desc_dir": DIRS["filtered_xmls_dir"]
}

for path in required_dirs.values():
    file_count = len(glob.glob(os.path.join(path, "*")))
    # An empty (or missing) input directory gets a warning marker rather than a tick.
    marker = "✓" if file_count else "⚠️"
    display(Markdown(f"{marker} Found {file_count} files in `{path}`"))

# Let's add a notification that this might take a while
display(Markdown("## ⚙️ Running CLIP model - this may take several minutes..."))

# Run the processing function with our directory structure
result = process_descriptions.run_process_descriptions(
    json_dir=DIRS["yolo_jsons"],               # YOLO detection JSONs
    images_dir=DIRS["cropped_images_dir"],     # Cropped images
    texts_dir=DIRS["filtered_xmls_dir"],       # Filtered XMLs
    output_dir=DIRS["output_dir"],             # Where output files will go
    # NOTE(review): the visualizations directory is passed as the "original
    # images" directory - confirm this is intended.
    original_images_dir=DIRS["yolo_visualizations"],
    similarity_threshold=0.25,                 # Threshold for text matching
    max_lines_context=3,                       # Context lines around matches
    max_ids=100,                               # Process at most 100 IDs
    model_name="M-CLIP",                       # Previously tried: "ViT-B/32"
    top_k=3,
    best_only=True,                            # Only use best matching text
    show_progress=True,                        # Show progress bars
    verbose=False,                             # Disable verbose output for cleaner logs
    output_jsons_dir=DIRS["output_jsons_dir"]  # Where output JSONs will go
)

# Display results as markdown
if "error" in result:
    display(Markdown(f"## ❌ Error\n{result['error']}"))
else:
    summary = result['summary']
    config = result['config']
    # Build the report from unindented lines: Markdown treats text indented by
    # four spaces as a code block, which would suppress the headings and bullets.
    report_lines = [
        "## CLIP Text-Image Matching Results",
        "",
        "### Processing Summary:",
        f"- Total IDs processed: {summary['total_ids']}",
        f"- Successful matches: {summary['successful_ids']}",
        f"- Success rate: {summary['success_rate']:.1f}%",
        f"- Total images processed: {summary['total_images_processed']}",
        f"- Images below threshold: {summary['images_below_threshold']}",
        "",
        "### Configuration:",
        f"- Model used: {config['model']} on {config['device']}",
        f"- Similarity threshold: {config['similarity_threshold']}",
        f"- Max lines context: {config['max_lines_context']}",
        "",
        "### Performance:",
        f"- Total processing time: {summary['elapsed_time']:.2f} seconds",
        f"- Average time per ID: {summary['average_time_per_id']:.2f} seconds",
    ]
    display(Markdown("\n".join(report_lines)))

    # Show a sample image if any were successful
    successful_results = [r for r in result['details'] if r.get('success', False)]
    if successful_results:
        sample = successful_results[0]
        display(Markdown(f"### Sample Result: ID {sample['id']}"))
        sample_path = os.path.join(DIRS["output_dir"], sample['output_file'])
        if os.path.exists(sample_path):
            display(Image(filename=sample_path, width=800))
            display(Markdown(f"- Context blocks: {sample['context_blocks']}"))
            display(Markdown(f"- Processing time: {sample['time']:.2f} seconds"))
        else:
            display(Markdown(f"Image file not found: {sample_path}"))
    else:
        display(Markdown("No successful results to display"))