# Orchestrator Notebook
This notebook runs all the pipeline processes in sequence.



## Project root setup

This section ensures the notebook runs modules like `python -m src...` by setting CWD and sys.path.


In [1]:
import os, sys, pathlib

def _resolve_project_root(start: pathlib.Path, marker: pathlib.Path) -> pathlib.Path:
    """Walk upwards from `start` until a directory containing `marker` is found."""
    for candidate in (start, *start.parents):
        if (candidate / marker).exists():
            return candidate
    return start

START_DIR = pathlib.Path(os.getcwd()).resolve()
NOTEBOOK_MARKER = pathlib.Path("src")
ROOT = _resolve_project_root(START_DIR, NOTEBOOK_MARKER)

if START_DIR != ROOT:
    os.chdir(ROOT)

# Add the project root to sys.path (so 'import src.*' works)
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

print("Notebook start dir:", START_DIR)
print("CWD:", os.getcwd())
print("sys.executable:", sys.executable)


Notebook start dir: /home/paloli/Documentos/MDS/ADSDB/ADSDB-Project/src
CWD: /home/paloli/Documentos/MDS/ADSDB/ADSDB-Project
sys.executable: /home/paloli/Documentos/MDS/ADSDB/ADSDB-Project/.venv/bin/python3.13


## Configuration and modules definition

Defines each module in order to be executed. The order is the following:
1. MinIO and ChromaDB managers
2. Data ingestion
3. Landing zone
4. Formatted zone
5. Trusted zone
6. Exploitation zone

In [2]:
# --- Configuration ---
from pathlib import Path
import os


MODULES = [
    # --------- Managers ---------
    {
        "name": "MinIO manager",
        "module": "src.common.minio_manager",  
        "args": []
    },
    {
        "name": "ChromaDB manager",
        "module": "src.common.chroma_client", 
        "args": []
    },

    # --------- Data ingestion ---------
    {
        "name": "Text ingestion",
        "module": "src.data_management.data_ingestion.text_ingestion",
        "args": []
    },
    {
        "name": "Image ingestion",
        "module": "src.data_management.data_ingestion.image_ingestion",
        "args": []
    },
    {
        "name": "Video ingestion",
        "module": "src.data_management.data_ingestion.video_ingestion",
        "args": []
    },

    # --------- Landing zone --------- 
    {
        "name": "Landing zone",
        "module": "src.data_management.landing_zone.landing_zone",
        "args": []
    },


    # --------- Formatted zone --------- 
    {
        "name": "Formatted text",
        "module": "src.data_management.formatted_zone.formatted_text",
        "args": []
    },
    {
        "name": "Formatted images",
        "module": "src.data_management.formatted_zone.formatted_images",
        "args": []
    },
    {
        "name": "Formatted videos",
        "module": "src.data_management.formatted_zone.formatted_videos",
        "args": []
    },


    # --------- Trusted zone --------- 
    {
        "name": "Trusted text",
        "module": "src.data_management.trusted_zone.trusted_text",
        "args": []
    },
    {
        "name": "Trusted images",
        "module": "src.data_management.trusted_zone.trusted_images",
        "args": []
    },
    {
        "name": "Trusted videos",
        "module": "src.data_management.trusted_zone.trusted_videos",
        "args": []
    }, 

    # --------- Exploitation zone ---------
    {
        "name": "Exploitation text",
        "module": "src.data_management.exploitation_zone.exploitation_text",
        "args": []
    },
    {
        "name": "Exploitation images",
        "module": "src.data_management.exploitation_zone.exploitation_images",
        "args": []
    },
    {
        "name": "Exploitation videos",
        "module": "src.data_management.exploitation_zone.exploitation_videos",
        "args": []
    },

    # --------- Tasks ---------
    {
        "name": "Same-Modality Task",
        "module": "src.multi_modal_tasks.same_modality_task",
        "args": []
    },
    {
        "name": "Multi-Modality Task",
        "module": "src.multi_modal_tasks.multi_modality_task",
        "args": []
    },
    {
        "name": "Generative Task",
        "module": "src.multi_modal_tasks.generative_task",
        "args": []
    }
]


## Module selection
Use variable <i>RUN_ONLY</i> to whitelist the pipeline steps you actually want to execute—list their names or zero-based indexes and any unspecified steps will be skipped—while variable <i>SKIP_STEPS</i> provides the opposite filter, letting you exclude steps by name or index even if they would otherwise run; if both filters remove every module the notebook raises an error, so make sure at least one module slips through. If you want to use integers, here is the list of processes:

0. MinIO manager
1. ChromaDB manager
2. Text ingestion
3. Image ingestion
4. Video ingestion
5. Landing zone
6. Formatted text
7. Formatted images
8. Formatted videos
9. Trusted text
10. Trusted images
11. Trusted videos
12. Exploitation text
13. Exploitation images
14. Exploitation videos
15. Same-Modality Task
16. Multi-Modality Task
17. Generative Task

In [6]:
from typing import Iterable, Sequence, Union, Tuple, Set

# Configure which modules should run.
# Leave RUN_ONLY and SKIP_STEPS empty to execute every module.
RUN_ONLY: Sequence[Union[str, int]] = [16]  # e.g. ['Image ingestion', 3]
SKIP_STEPS: Sequence[Union[str, int]] = []  # e.g. ['Formatted videos', 5]

def _divide_targets(targets: Iterable[Union[str, int]]) -> Tuple[Set[str], Set[int]]:
    names: Set[str] = set()
    indexes: Set[int] = set()
    for target in targets:
        if isinstance(target, str):
            names.add(target.strip())
        elif isinstance(target, int):
            indexes.add(target)
        elif target is None:
            continue
        else:
            raise TypeError(f'Unsupported selector type: {type(target)!r}')
    return names, indexes

def _should_include(idx: int, name: str, only_names: Set[str], only_idx: Set[int], skip_names: Set[str], skip_idx: Set[int]) -> bool:
    if only_names or only_idx:
        if name not in only_names and idx not in only_idx:
            return False
    if name in skip_names or idx in skip_idx:
        return False
    return True

only_names, only_idx = _divide_targets(RUN_ONLY)
skip_names, skip_idx = _divide_targets(SKIP_STEPS)

MODULES_TO_RUN = [
    step for idx, step in enumerate(MODULES)
    if _should_include(idx, step['name'], only_names, only_idx, skip_names, skip_idx)
]

if not MODULES_TO_RUN:
    raise ValueError('The actual configuration does not run any module. Please set RUN_ONLY and SKIP_STEPS correctly.')

print("Modules available:")
for idx, step in enumerate(MODULES):
    marker = 'RUN ' if step in MODULES_TO_RUN else 'SKIP'
    print(f"  [{idx:02d}] {marker} {step['name']}")

print("To be executed:", ', '.join(step['name'] for step in MODULES_TO_RUN))


Modules available:
  [00] SKIP MinIO manager
  [01] SKIP ChromaDB manager
  [02] SKIP Text ingestion
  [03] SKIP Image ingestion
  [04] SKIP Video ingestion
  [05] SKIP Landing zone
  [06] SKIP Formatted text
  [07] SKIP Formatted images
  [08] SKIP Formatted videos
  [09] SKIP Trusted text
  [10] SKIP Trusted images
  [11] SKIP Trusted videos
  [12] SKIP Exploitation text
  [13] SKIP Exploitation images
  [14] SKIP Exploitation videos
  [15] SKIP Same-Modality Task
  [16] RUN  Multi-Modality Task
  [17] SKIP Generative Task
To be executed: Multi-Modality Task


## Helpers to run modules

In [7]:
# --- Helpers to run modules ---
import subprocess, sys, shlex, time

def run_module(module: str, args=None, env=None, check=True, timeout=None, cwd=None):
    """
    Run a Python module with `python -m <module> [args...]` and stream output
    """

    if args is None: args = []
    cmd = [sys.executable, "-u", "-m", module, *map(str, args)]
    print("\n$", " ".join(shlex.quote(c) for c in cmd)) # shows command to be executed in console

    env2 = os.environ.copy()
    if env: env2.update(env)

    start = time.time()
    rc = subprocess.run(cmd, env=env2, cwd=cwd, check=False, timeout=timeout).returncode
    if check and rc != 0:
        raise subprocess.CalledProcessError(rc, cmd)
    print(f"[done in {time.time()-start:.1f}s]")
    return rc



## Run the pipeline

In [8]:
from datetime import datetime

print("=== Pipeline start:", datetime.now().isoformat(), "===")

for step in MODULES_TO_RUN:
    print(f"\n=== Step: {step['name']} ===")
    run_module(
        module = step["module"],
        args = step.get("args", []),
        check = True,
        timeout = None
    )
print("\n=== Pipeline completed successfully ===")

=== Pipeline start: 2025-10-28T14:31:39.952336 ===

=== Step: Multi-Modality Task ===

$ /home/paloli/Documentos/MDS/ADSDB/ADSDB-Project/.venv/bin/python3.13 -u -m src.multi_modal_tasks.multi_modality_task
Resultado 1
ID: 5a314379905b59df-0
Distancia: 0.7096
Metadatos: {'split_id': 0, 'source_key': 'trusted/text_data/full_format_recipes.json', 'created_at': 1761579972, 'file_hash': '5a314379905b59df', 'idx': 2284, 'lang': 'en'}
Documento:
slow-roasted lamb shoulder with pancetta 8 garlic cloves, peeled 4 ounces pancetta (italian bacon), diced 3 tablespoons chopped fresh rosemary 2 teaspoons cracked black pepper 1 4-pound rolled boned lamb shoulder (about 7 pounds before boning), excess fat trimmed drop garlic down feed tube of mini processor; chop finely. scrape down bowl sides. add pancetta, rosemary, and pepper; blend to coarse paste. using small sharp knife, make 1/2-inch-deep slits all over lamb and fill each with pancetta paste. spread any remaining paste over outside of lamb. pla