# Unified Preprocessing Pipeline

This notebook walks through the task-based pipeline, which runs ingestion, processing, and training tasks one after another.

## Overview

The task-based pipeline (`run_pipeline.py`) integrates:
1. **Data Ingestion**: Combines the CSV and Parquet files in a directory into a unified dataset
2. **Processing Tasks**: Each task processes a column (e.g., ingredients, cuisine) through normalization, deduplication, and encoding
3. **Training Tasks**: Trains models (e.g., NER, classification) from processed data
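
To make the ingestion stage concrete, here is a minimal sketch of combining several CSV files with different columns into one unified set of rows. It uses only the standard library, so Parquet input is left out (that would need a third-party reader). The function name `combine_csvs` and the sample files are hypothetical and are not part of `run_pipeline.py`.

```python
import csv
import tempfile
from pathlib import Path


def combine_csvs(input_dir):
    """Read every *.csv in input_dir and return (columns, rows) with a unified schema."""
    rows, columns = [], []
    for csv_path in sorted(Path(input_dir).glob("*.csv")):
        with csv_path.open(newline="", encoding="utf-8") as handle:
            for row in csv.DictReader(handle):
                # Track columns in first-seen order across all files
                for col in row:
                    if col not in columns:
                        columns.append(col)
                rows.append(row)
    # Columns missing from a given source file are filled with None
    return columns, [{col: row.get(col) for col in columns} for row in rows]


# Hypothetical sample data: two files with overlapping but different columns
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "a.csv").write_text("ingredients_raw,cuisine_raw\nsalt,italian\n", encoding="utf-8")
    Path(tmp, "b.csv").write_text("ingredients_raw\npepper\n", encoding="utf-8")
    columns, rows = combine_csvs(tmp)

print(columns)
print(rows)
```

The real ingestion step writes a single Parquet file; this sketch only illustrates the schema-unification idea.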

## Task-Based Architecture

### Processing Tasks
Each task defines:
- `input_path`: Source data file
- `output_path`: Final processed output
- `target_column`: Input column to process
- `output_column`: Output column name
- `steps`: List of processing steps (spacy, sbert, encoder, etc.)
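
As a rough illustration of these fields, the sketch below models one processing task as a dataclass. `ProcessingTaskSketch` is a hypothetical stand-in, not the class defined in `pipeline.config`. The field values mirror the `ingredients_pipeline` task printed in the Load Configuration output, and the `type` strings for the deduplication and encoding steps are assumptions based on the step list above.

```python
from dataclasses import dataclass, field


@dataclass
class ProcessingTaskSketch:
    """Hypothetical stand-in for one processing task entry; not the real config class."""
    name: str
    input_path: str
    output_path: str
    target_column: str
    output_column: str
    steps: list = field(default_factory=list)
    enabled: bool = True


ingredients_task = ProcessingTaskSketch(
    name="ingredients_pipeline",
    input_path="./data/intermediate/combined_raw.parquet",
    output_path="./data/processed/ingredients_encoded.parquet",
    target_column="ingredients_raw",
    output_column="ingredients_clean",
    steps=[
        {"name": "normalization", "type": "spacy"},
        {"name": "deduplication", "type": "sbert"},   # assumed type string
        {"name": "encoding", "type": "encoder"},      # assumed type string
    ],
)

print(f"{ingredients_task.name}: {ingredients_task.target_column} -> {ingredients_task.output_column}")
print(f"steps: {[s['name'] for s in ingredients_task.steps]}")
```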

### Training Tasks
Each training task defines:
- `task_type`: Type of model (token_classification, text_classification)
- `input_path`: Training data file
- `model_dir`: Where to save the trained model
- `params`: Training hyperparameters
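
A training task can be pictured the same way. The sketch below is a hypothetical dataclass (`TrainingTaskSketch` is not the real config class) that also rejects a `task_type` outside the two listed above. The `max_epochs` entry in `params` is an invented hyperparameter name, used only for illustration.

```python
from dataclasses import dataclass, field

# The two task types named in the list above
KNOWN_TASK_TYPES = {"token_classification", "text_classification"}


@dataclass
class TrainingTaskSketch:
    """Hypothetical stand-in for one training task entry; not the real config class."""
    name: str
    task_type: str
    input_path: str
    model_dir: str
    params: dict = field(default_factory=dict)
    enabled: bool = True

    def __post_init__(self):
        # Catch typos in task_type early instead of at training time
        if self.task_type not in KNOWN_TASK_TYPES:
            raise ValueError(f"Unknown task_type: {self.task_type!r}")


ner_task = TrainingTaskSketch(
    name="ingredient_ner_model",
    task_type="token_classification",
    input_path="./data/intermediate/combined_raw.parquet",
    model_dir="./models/ingredient_ner_trf/model-best",
    params={"max_epochs": 10},  # hypothetical hyperparameter, for illustration only
)
print(ner_task)
```

Validating in `__post_init__` is one design choice among several; the actual pipeline may validate elsewhere or not at all.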


## Setup


In [1]:
# Setup: make the `pipeline` package importable from this notebook
import sys
from pathlib import Path
# Prepend the parent of the current working directory to sys.path
sys.path.insert(0, str(Path.cwd().parent))


## Load Configuration


In [2]:

from pipeline.config import PipelineConfig
from pathlib import Path

# Load configuration; fail fast because every later cell needs `config`
config_path = Path("../pipeline/config/config.yaml")
if not config_path.exists():
    raise FileNotFoundError(
        f"Config not found at {config_path}. Please ensure the config file exists."
    )

config = PipelineConfig.from_yaml(config_path)
print(f"Loaded config from: {config_path}")

# Global settings and ingestion are optional sections
print("\nGlobal Settings:")
if config.global_settings:
    print(f"  Base dir: {config.global_settings.base_dir}")
    print(f"  Logging level: {config.global_settings.logging_level}")

print("\nIngestion:")
if config.ingestion:
    print(f"  Enabled: {config.ingestion.enabled}")
    print(f"  Input dir: {config.ingestion.input_dir}")
    print(f"  Output file: {config.ingestion.output_file}")

# One entry per processing task
print(f"\nProcessing Tasks ({len(config.tasks)}):")
for task in config.tasks:
    print(f"  - {task.name} ({'enabled' if task.enabled else 'disabled'})")
    print(f"    Input: {task.input_path}")
    print(f"    Output: {task.output_path}")
    print(f"    Column: {task.target_column} -> {task.output_column}")
    print(f"    Steps: {len(task.steps)}")

# One entry per training task
print(f"\nTraining Tasks ({len(config.training_tasks)}):")
for task in config.training_tasks:
    print(f"  - {task.name} ({'enabled' if task.enabled else 'disabled'})")
    print(f"    Type: {task.task_type}")
    print(f"    Input: {task.input_path}")
    print(f"    Model dir: {task.model_dir}")


Loaded config from: ..\pipeline\config\config.yaml

Global Settings:
  Base dir: ./data
  Logging level: INFO

Ingestion:
  Enabled: True
  Input dir: ./data/raw
  Output file: ./data/intermediate/combined_raw.parquet

Processing Tasks (2):
  - ingredients_pipeline (enabled)
    Input: ./data/intermediate/combined_raw.parquet
    Output: ./data/processed/ingredients_encoded.parquet
    Column: ingredients_raw -> ingredients_clean
    Steps: 3
  - cuisine_pipeline (enabled)
    Input: ./data/intermediate/combined_raw.parquet
    Output: ./data/processed/cuisine_encoded.parquet
    Column: cuisine_raw -> cuisine_clean
    Steps: 4

Training Tasks (2):
  - ingredient_ner_model (enabled)
    Type: token_classification
    Input: ./data/intermediate/combined_raw.parquet
    Model dir: ./models/ingredient_ner_trf/model-best
  - cuisine_classification_model (enabled)
    Type: text_classification
    Input: ./data/processed/cuisine_encoded.parquet
    Model dir: ./models/cuisine_cls_trf/model-best

## Example: Run Unified Pipeline

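The cells below summarize the enabled tasks, build the orchestrator's step list with `setup_steps()`, and show how to execute it with `orchestrator.run()`, optionally passing `force=True` to rebuild all artifacts.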


In [3]:
from pipeline.task_orchestrator import TaskBasedOrchestrator
from pipeline.common.logging_setup import setup_logging
import logging

# Setup logging
setup_logging(config.to_dict())
logger = logging.getLogger(__name__)

# Display configuration summary
print("Task-Based Pipeline Configuration:")
print(f"  Ingestion: {'Enabled' if config.ingestion and config.ingestion.enabled else 'Disabled'}")
print(f"  Processing tasks: {len([t for t in config.tasks if t.enabled])} enabled")
print(f"  Training tasks: {len([t for t in config.training_tasks if t.enabled])} enabled")

if config.ingestion:
    print(f"\nIngestion:")
    print(f"  Input: {config.ingestion.input_dir}")
    print(f"  Output: {config.ingestion.output_file}")

if config.tasks:
    print(f"\nProcessing Tasks:")
    for task in config.tasks:
        if task.enabled:
            print(f"  - {task.name}: {task.input_path} -> {task.output_path}")

if config.training_tasks:
    print(f"\nTraining Tasks:")
    for task in config.training_tasks:
        if task.enabled:
            print(f"  - {task.name}: {task.input_path} -> {task.model_dir}")



Task-Based Pipeline Configuration:
  Ingestion: Enabled
  Processing tasks: 2 enabled
  Training tasks: 2 enabled

Ingestion:
  Input: ./data/raw
  Output: ./data/intermediate/combined_raw.parquet

Processing Tasks:
  - ingredients_pipeline: ./data/intermediate/combined_raw.parquet -> ./data/processed/ingredients_encoded.parquet
  - cuisine_pipeline: ./data/intermediate/combined_raw.parquet -> ./data/processed/cuisine_encoded.parquet

Training Tasks:
  - ingredient_ner_model: ./data/intermediate/combined_raw.parquet -> ./models/ingredient_ner_trf/model-best
  - cuisine_classification_model: ./data/processed/cuisine_encoded.parquet -> ./models/cuisine_cls_trf/model-best


In [4]:
# Create the orchestrator and set up its steps
orchestrator = TaskBasedOrchestrator(config)
orchestrator.setup_steps()

print(f"Configured {len(orchestrator.steps)} step(s):")
for i, step in enumerate(orchestrator.steps, 1):
    print(f"  {i}. {step.name}")
    if hasattr(step, 'input_path') and step.input_path:
        print(f"      Input: {step.input_path}")
    if hasattr(step, 'output_path') and step.output_path:
        print(f"      Output: {step.output_path}")


[2025-11-25 16:17:32] INFO pipeline.task_orchestrator.TaskOrchestrator: Setting up task-based pipeline steps...
[2025-11-25 16:17:32] INFO pipeline.task_orchestrator.TaskOrchestrator: Setting up ingestion: ./data/raw -> ./data/intermediate/combined_raw.parquet
[2025-11-25 16:17:32] INFO pipeline.base.RawDataCombiner: Initialized with data_dir=data\raw
[2025-11-25 16:17:32] INFO pipeline.task_orchestrator.TaskOrchestrator: Setting up task: ingredients_pipeline
[2025-11-25 16:17:32] INFO pipeline.task_orchestrator.TaskOrchestrator:   Input: ./data/intermediate/combined_raw.parquet
[2025-11-25 16:17:32] INFO pipeline.task_orchestrator.TaskOrchestrator:   Output: ./data/processed/ingredients_encoded.parquet
[2025-11-25 16:17:32] INFO pipeline.task_orchestrator.TaskOrchestrator:   Target column: ingredients_raw -> ingredients_clean
[2025-11-25 16:17:32] INFO pipeline.task_orchestrator.TaskOrchestrator: Creating step 'normalization' (type: spacy)
[2025-11-25 16:17:33] INFO pipeline.base.Ingr

Configured 9 step(s):
  1. RawDataCombiner
      Input: data\raw
      Output: data\intermediate\combined_raw.parquet
  2. IngredientNormalizer
      Input: data\intermediate\combined_raw.parquet
      Output: data\processed\ingredients_pipeline_step_0_normalization.parquet
  3. DeduplicationStep_SBERT
      Input: data\processed\ingredients_pipeline_step_0_normalization.parquet
      Output: data\processed\ingredients_pipeline_step_1_deduplication.parquet
  4. EncodingStep
      Input: data\processed\ingredients_pipeline_step_1_deduplication.parquet
      Output: data\processed\ingredients_encoded.parquet
  5. CuisinePreprocessing
      Input: data\intermediate\combined_raw.parquet
      Output: data\processed\cuisine_pipeline_step_0_split_lists.parquet
  6. IngredientNormalizer
      Input: data\processed\cuisine_pipeline_step_0_split_lists.parquet
      Output: data\processed\cuisine_pipeline_step_1_normalization.parquet
  7. DeduplicationStep_SBERT
      Input: data\processed\cuisi
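
The step listing above suggests a naming pattern: each intermediate step writes to `<task_name>_step_<index>_<step_name>.parquet`, each step reads the previous step's output, and the last step writes to the task's `output_path`. The sketch below reproduces that chaining. `plan_step_paths` is a hypothetical helper, not the orchestrator's code, and placing intermediates next to the final output is an assumption inferred from the listing.

```python
from pathlib import PurePosixPath


def plan_step_paths(task_name, input_path, output_path, step_names):
    """Return (step_name, input, output) triples for one task.

    Intermediate outputs are assumed to live in the same directory as the
    final output; only the last step writes to output_path itself.
    """
    final_output = PurePosixPath(output_path)
    work_dir = final_output.parent
    current_input = PurePosixPath(input_path)
    last_index = len(step_names) - 1

    plan = []
    for i, step_name in enumerate(step_names):
        if i == last_index:
            out = final_output
        else:
            out = work_dir / f"{task_name}_step_{i}_{step_name}.parquet"
        plan.append((step_name, current_input, out))
        current_input = out  # the next step reads what this step wrote
    return plan


plan = plan_step_paths(
    task_name="ingredients_pipeline",
    input_path="data/intermediate/combined_raw.parquet",
    output_path="data/processed/ingredients_encoded.parquet",
    step_names=["normalization", "deduplication", "encoding"],
)
for step_name, src, dst in plan:
    print(f"{step_name}: {src} -> {dst}")
```

For the ingredients task this yields the same three input/output pairs shown in steps 2 to 4 of the listing, with forward slashes instead of Windows separators.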

In [5]:
# Execute pipeline (uncomment to run)
# orchestrator.run(force=False)

# Or run with force to rebuild all artifacts:
# orchestrator.run(force=True)

print("Pipeline configured. Uncomment orchestrator.run() above to execute.")


Pipeline configured. Uncomment orchestrator.run() above to execute.
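
The cell above only states that `force=True` rebuilds all artifacts. One plausible reading, and an assumption on our part, is that `force=False` skips any step whose output already exists. The sketch below shows that caching behavior with a hypothetical `run_steps` function; it is not the orchestrator's implementation.

```python
import tempfile
from pathlib import Path


def run_steps(steps, force=False):
    """Run (name, output_path, build_fn) steps; skip existing outputs unless force is set."""
    executed = []
    for name, output_path, build_fn in steps:
        if output_path.exists() and not force:
            continue  # artifact already built, reuse it
        build_fn(output_path)
        executed.append(name)
    return executed


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "artifact.txt"
    steps = [("demo_step", out, lambda p: p.write_text("built"))]
    first = run_steps(steps)               # output missing, so the step runs
    second = run_steps(steps)              # output present, so the step is skipped
    forced = run_steps(steps, force=True)  # force rebuilds regardless

print(first, second, forced)
```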


## Inspect Results


In [6]:
import pandas as pd

# Check output files from processing tasks
print("Checking output files from processing tasks:\n")
for task in config.tasks:
    if task.enabled:
        output_path = Path(task.output_path)
        if output_path.exists():
            df = pd.read_parquet(output_path)
            # Show the first value of the output column, if it is present
            has_sample = task.output_column in df.columns and len(df) > 0
            sample = df[task.output_column].iloc[0] if has_sample else "N/A"
            print(f"✓ {task.name}: {output_path}")
            print(f"  Rows: {len(df):,}")
            print(f"  Columns: {list(df.columns)}")
            print("  Sample data:")
            print(f"    {task.output_column}: {sample}")
            print()
        else:
            print(f"✗ {task.name}: {output_path} (not found)")
            print()

# Check training model outputs
if config.training_tasks:
    print("\nChecking training model outputs:\n")
    for task in config.training_tasks:
        if task.enabled:
            model_dir = Path(task.model_dir)
            if model_dir.exists() and any(model_dir.iterdir()):
                print(f"✓ {task.name}: {model_dir} (exists)")
            else:
                print(f"✗ {task.name}: {model_dir} (not found)")


Checking output files from processing tasks:

✗ ingredients_pipeline: data\processed\ingredients_encoded.parquet (not found)

✗ cuisine_pipeline: data\processed\cuisine_encoded.parquet (not found)


Checking training model outputs:

✗ ingredient_ner_model: models\ingredient_ner_trf\model-best (not found)
✗ cuisine_classification_model: models\cuisine_cls_trf\model-best (not found)
