# Marovi Pipeline Framework Demo

This notebook shows how to combine the Marovi Pipeline Framework with MaroviAPI client services to build document processing workflows.

## Setup

First, let's import the necessary components from the Marovi framework:

In [8]:
# Import core pipeline components
from marovi.pipelines.core import PipelineStep, Pipeline
from marovi.pipelines.context import PipelineContext
from typing import List

# Import API client and schemas
from marovi.api.core.client import MaroviAPI
from marovi.api.custom.schemas import FormatConversionRequest, SummarizationRequest

# Import MaroviAPI steps
from marovi.modules.steps.marovi_api import TranslateStep

import logging

# Configure logging to reduce verbosity
logging.basicConfig(level=logging.WARNING)

## Basic Pipeline Example

Let's start with a simple pipeline whose single step converts each input string to uppercase:

In [9]:
# Create a list of input strings
input_strings = [
    "Hello, world!",
    "This is a test.",
    "Pipelines are fun!"
]

# Define a simple step that converts every string to uppercase
class CapitalizeStep(PipelineStep[str, str]):
    def process(self, inputs: List[str], context: PipelineContext) -> List[str]:
        return [text.upper() for text in inputs]

# Create a pipeline with a single step that capitalizes all strings
capitalize_step = CapitalizeStep(step_id="capitalize")
pipeline = Pipeline(steps=[capitalize_step], name="capitalize_pipeline")

# Create a pipeline context
context = PipelineContext(metadata={"description": "Basic capitalization demo"})

# Run the pipeline
results = pipeline.run(input_strings, context)

# Print the results
print("Pipeline results:")
for i, result in enumerate(results):
    print(f"  [{i}]: {result}")

# Access execution data from the context
print("\nExecution metrics:")
print(f"  Total execution time: {context.get_metric('pipeline_total_execution_time'):.4f} seconds")
print(f"  Steps executed: {context.get_metric('pipeline_steps_executed')}")

2025-05-11 22:28:58 - marovi.pipelines.core - INFO - Initialized capitalize_pipeline pipeline with 1 steps
2025-05-11 22:28:58 - marovi.pipelines.context - INFO - Initialized PipelineContext ctx_1747027738_10c65f340 with metadata: {'description': 'Basic capitalization demo'}
2025-05-11 22:28:58 - marovi.pipelines.core - INFO - Running step 'capitalize' (1/1) with 3 inputs
2025-05-11 22:28:58 - marovi.pipelines.core - INFO - capitalize: Successfully processed 1 items in 0.00s
2025-05-11 22:28:58 - marovi.pipelines.core - INFO - capitalize: Successfully processed 1 items in 0.00s
2025-05-11 22:28:58 - marovi.pipelines.core - INFO - capitalize: Successfully processed 1 items in 0.00s
2025-05-11 22:28:58 - marovi.pipelines.context - INFO - Saved checkpoint to checkpoints/ctx_1747027738_10c65f340_capitalize_pipeline_after_capitalize.json
2025-05-11 22:28:58 - marovi.pipelines.core - INFO - Checkpoint saved: checkpoints/ctx_1747027738_10c65f340_capitalize_pipeline_after_capitalize.json
2025-

## Research Paper Processing Pipeline

Now let's create a more complex pipeline that processes research papers through multiple stages:

1. Convert HTML to Markdown
2. Summarize the content
3. Translate to Spanish
4. Convert back to HTML
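
Conceptually, the four stages above are function composition over a batch of documents: each stage receives the full list produced by the previous one. The sketch below illustrates that idea with plain Python callables. The `run_stages` and `tag` helpers are hypothetical stand-ins, not part of the Marovi framework, and the stages only label the text so the order of application is visible.

```python
from functools import reduce
from typing import Callable, List

# A stage maps a batch of documents to a batch of documents.
Stage = Callable[[List[str]], List[str]]


def run_stages(stages: List[Stage], docs: List[str]) -> List[str]:
    """Feed the whole batch through each stage in order."""
    return reduce(lambda batch, stage: stage(batch), stages, docs)


def tag(label: str) -> Stage:
    """Hypothetical placeholder stage: wraps each document in a label."""
    return lambda batch: [f"{label}({doc})" for doc in batch]


stages = [tag("to_markdown"), tag("summarize"), tag("translate_es"), tag("to_html")]
print(run_stages(stages, ["<h1>Paper</h1>"]))
# ['to_html(translate_es(summarize(to_markdown(<h1>Paper</h1>))))']
```

The real pipeline adds a shared context, metrics, and checkpoints on top of this basic chaining, as the following cells show.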

In [10]:
# Sample research paper HTML content
research_papers = [
    # Paper 1: Machine Learning
    """<article>
      <h1>Advances in Deep Reinforcement Learning</h1>
      <div class="abstract">
        <h2>Abstract</h2>
        <p>Deep reinforcement learning has emerged as a powerful technique for solving complex decision-making problems. This paper presents a novel approach that combines hierarchical learning with transformer architectures to improve sample efficiency and generalization. Our method demonstrates state-of-the-art performance on benchmark environments while requiring 40% fewer training samples.</p>
      </div>
    </article>""",
    
    # Paper 2: Natural Language Processing
    """<article>
      <h1>Efficient Fine-tuning Methods for Large Language Models</h1>
      <div class="abstract">
        <h2>Abstract</h2>
        <p>As large language models grow in size, efficient fine-tuning becomes increasingly important. We investigate parameter-efficient techniques including LoRA, prefix tuning, and prompt tuning. Our experiments show that these methods can achieve comparable performance to full fine-tuning while updating less than 1% of the parameters, significantly reducing computational requirements and carbon footprint.</p>
      </div>
    </article>"""
]

print(f"Loaded {len(research_papers)} sample research papers")

Loaded 2 sample research papers


### Custom Pipeline Steps

Let's create custom steps for each stage of our document processing:

In [11]:
class HTMLToMarkdownStep(PipelineStep[str, str]):
    """Converts HTML to Markdown using the MaroviAPI."""
    
    def __init__(self, step_id: str = "html_to_markdown"):
        super().__init__(step_id=step_id)
        self.client = MaroviAPI()
    
    def process(self, inputs: List[str], context: PipelineContext) -> List[str]:
        results = []
        for html in inputs:
            # Create request object
            request = FormatConversionRequest(
                text=html,
                source_format="html",
                target_format="markdown",
                preserve_structure=True,
                preserve_links=True
            )
            
            # Call the API directly
            response = self.client.custom.convert_format(request)
            
            # Extract converted text
            results.append(response.converted_text)
        
        return results

class SummarizeTextStep(PipelineStep[str, str]):
    """Summarizes text using the MaroviAPI."""
    
    def __init__(self, style: str = "paragraph", max_length: int = 100, step_id: str = "summarize"):
        super().__init__(step_id=step_id)
        self.client = MaroviAPI()
        self.style = style
        self.max_length = max_length
    
    def process(self, inputs: List[str], context: PipelineContext) -> List[str]:
        results = []
        for text in inputs:
            # Create request object
            request = SummarizationRequest(
                text=text,
                style=self.style,
                max_length=self.max_length
            )
            
            # Call the API directly
            response = self.client.custom.summarize(request)
            
            # Extract summary text
            results.append(response.summary)
        
        return results

class MarkdownToHTMLStep(PipelineStep[str, str]):
    """Converts Markdown to HTML using the MaroviAPI."""
    
    def __init__(self, step_id: str = "markdown_to_html"):
        super().__init__(step_id=step_id)
        self.client = MaroviAPI()
    
    def process(self, inputs: List[str], context: PipelineContext) -> List[str]:
        results = []
        for markdown in inputs:
            # Create request object
            request = FormatConversionRequest(
                text=markdown,
                source_format="markdown",
                target_format="html",
                preserve_structure=True,
                preserve_links=True
            )
            
            # Call the API directly
            response = self.client.custom.convert_format(request)
            
            # Extract converted text
            results.append(response.converted_text)
        
        return results
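
Each step above only needs an object that exposes `custom.convert_format` (or `custom.summarize`), so the per-item loop can be exercised without network access by substituting a fake client. The following is a minimal sketch under that assumption: `FakeResponse`, `FakeCustomClient`, and `convert_all` are hypothetical names that mirror the loop in `HTMLToMarkdownStep.process`, and a `SimpleNamespace` stands in for `FormatConversionRequest`.

```python
from dataclasses import dataclass, field
from types import SimpleNamespace
from typing import Any, List


@dataclass
class FakeResponse:
    converted_text: str


@dataclass
class FakeCustomClient:
    """Hypothetical stand-in that records requests instead of calling the API."""
    calls: List[Any] = field(default_factory=list)

    def convert_format(self, request: Any) -> FakeResponse:
        self.calls.append(request)
        return FakeResponse(converted_text=f"[{request.target_format}] {request.text}")


def convert_all(client: Any, inputs: List[str], source_format: str, target_format: str) -> List[str]:
    """Same shape as the process() loops above: one request per input item."""
    results = []
    for text in inputs:
        request = SimpleNamespace(text=text, source_format=source_format, target_format=target_format)
        response = client.custom.convert_format(request)
        results.append(response.converted_text)
    return results


fake_client = SimpleNamespace(custom=FakeCustomClient())
converted = convert_all(fake_client, ["<p>a</p>", "<p>b</p>"], "html", "markdown")
print(converted)                      # ['[markdown] <p>a</p>', '[markdown] <p>b</p>']
print(len(fake_client.custom.calls))  # 2
```

In the notebook's steps the client is created inside `__init__`, so a test along these lines would need to replace `step.client` after construction, or the constructor could be extended to accept a client argument.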

### Building and Running the Pipeline

Now let's assemble the pipeline and run it on our research papers:

In [12]:
# Create the step instances
html_to_markdown_step = HTMLToMarkdownStep()
summarize_step = SummarizeTextStep(style="paragraph", max_length=100)
translate_step = TranslateStep(source_lang="en", target_lang="es", provider="google")
markdown_to_html_step = MarkdownToHTMLStep()

# Create the pipeline
research_pipeline = Pipeline(
    steps=[
        html_to_markdown_step,
        summarize_step,
        translate_step,
        markdown_to_html_step
    ],
    name="research_paper_pipeline"
)

print("Pipeline created with 4 steps")

2025-05-11 22:28:58 - marovi.api.clients.translation - INFO - Initialized TranslationClient with provider=google
2025-05-11 22:28:59 - marovi.modules.steps.marovi_api - INFO - Initialized translate_en_to_es with endpoint translation.translate
2025-05-11 22:28:59 - marovi.pipelines.core - INFO - Initialized research_paper_pipeline pipeline with 4 steps
Pipeline created with 4 steps


In [13]:
# Create a context for our pipeline run
paper_context = PipelineContext(
    metadata={
        "description": "Research paper processing pipeline",
        "version": "1.0",
        "paper_count": len(research_papers)
    }
)

# Run the pipeline
final_outputs = []  # Defined up front so later cells still run if execution fails
try:
    print("Starting pipeline execution...")
    final_outputs = research_pipeline.run(research_papers, paper_context)
    print("Pipeline execution completed successfully!")
except Exception as e:
    import traceback
    print(f"Pipeline execution failed: {e}")
    traceback.print_exc()
    print("Note: This example requires the MaroviAPI client to be configured with valid API credentials.")

2025-05-11 22:28:59 - marovi.pipelines.context - INFO - Initialized PipelineContext ctx_1747027739_10cac4c70 with metadata: {'description': 'Research paper processing pipeline', 'version': '1.0', 'paper_count': 2}
Starting pipeline execution...
2025-05-11 22:28:59 - marovi.pipelines.core - INFO - Running step 'html_to_markdown' (1/4) with 2 inputs
2025-05-11 22:28:59 - marovi.api.clients.custom - INFO - Initialized CustomClient for endpoint=convert_format
2025-05-11 22:28:59 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-05-11 22:28:59 - marovi.pipelines.core - INFO - html_to_markdown: Successfully processed 1 items in 0.93s
2025-05-11 22:29:00 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-05-11 22:29:00 - marovi.pipelines.core - INFO - html_to_markdown: Successfully processed 1 items in 0.83s
2025-05-11 22:29:00 - marovi.pipelines.context - INFO - Saved checkpoint to checkpoi

### Examining Results and Metrics

Let's examine the intermediate and final results, as well as performance metrics:

In [14]:
# Show intermediate results
print("\n===== INTERMEDIATE RESULTS =====")

# Get results after HTML to Markdown conversion
markdown_results = paper_context.get_outputs("html_to_markdown")
print("\n----- MARKDOWN FORMAT (Sample) -----")
if markdown_results:
    sample = markdown_results[0]
    # Truncate long samples to the first 300 characters
    print(sample[:300] + "..." if len(sample) > 300 else sample)
else:
    print("No markdown results available")

# Get results after summarization
summary_results = paper_context.get_outputs("summarize")
print("\n----- SUMMARY (Sample) -----")
if summary_results:
    print(summary_results[0])
else:
    print("No summary results available")

# Get results after translation
translation_results = paper_context.get_outputs("translate_en_to_es")
print("\n----- SPANISH TRANSLATION (Sample) -----")
if translation_results:
    print(translation_results[0])
else:
    print("No translation results available")


===== INTERMEDIATE RESULTS =====

----- MARKDOWN FORMAT (Sample) -----
```markdown
# Efficient Fine-tuning Methods for Large Language Models

## Abstract

As large language models grow in size, efficient fine-tuning becomes increasingly important. We investigate parameter-efficient techniques including LoRA, prefix tuning, and prompt tuning. Our experiments show that t...

----- SUMMARY (Sample) -----
The text discusses the importance of efficient fine-tuning methods for large language models as their size increases. It explores parameter-efficient techniques such as LoRA, prefix tuning, and prompt tuning. The findings from the experiments indicate that these methods can achieve performance levels comparable to full fine-tuning while updating less than 1% of the model's parameters. This approach significantly reduces computational demands and the associated carbon footprint, highlighting the potential for more sustainable and resource-efficient model training practices.

----- SPANISH

In [15]:
# Final HTML results
print("\n----- FINAL HTML (Sample) -----")
if final_outputs:
    sample = final_outputs[0]
    # Truncate long samples to the first 300 characters
    print(sample[:300] + "..." if len(sample) > 300 else sample)
else:
    print("No final HTML results available")


----- FINAL HTML (Sample) -----
```html
<p>El artículo analiza los avances en el aprendizaje por refuerzo profundo, destacando un enfoque novedoso que integra el aprendizaje jerárquico con arquitecturas de transformador. Este método busca mejorar la eficiencia y la generalización de las muestras en la resolución de problemas compl...


In [16]:
# Pipeline metrics
print("\n===== PIPELINE METRICS =====")
print(f"Total execution time: {paper_context.get_metric('pipeline_total_execution_time'):.2f} seconds")

# Individual step metrics
print("\nStep execution times:")
html_md_time = paper_context.get_metric('step_html_to_markdown_execution_time')
summarize_time = paper_context.get_metric('step_summarize_execution_time')
translate_time = paper_context.get_metric('step_translate_en_to_es_execution_time')
md_html_time = paper_context.get_metric('step_markdown_to_html_execution_time')

print(f"  HTML to Markdown: {html_md_time:.2f} seconds" if html_md_time is not None else "  HTML to Markdown: No time recorded")
print(f"  Summarization: {summarize_time:.2f} seconds" if summarize_time is not None else "  Summarization: No time recorded")
print(f"  Translation: {translate_time:.2f} seconds" if translate_time is not None else "  Translation: No time recorded")
print(f"  Markdown to HTML: {md_html_time:.2f} seconds" if md_html_time is not None else "  Markdown to HTML: No time recorded")


===== PIPELINE METRICS =====
Total execution time: 12.11 seconds

Step execution times:
  HTML to Markdown: 1.76 seconds
  Summarization: 6.76 seconds
  Translation: 0.27 seconds
  Markdown to HTML: 3.31 seconds
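
The step timings reported above can be converted into shares of the total to see where the run spends its time. This is a small sketch that uses only the numbers printed in this run. The dictionary and variable names are illustrative.

```python
# Timings copied from the metrics output of this run (seconds).
step_times = {
    "HTML to Markdown": 1.76,
    "Summarization": 6.76,
    "Translation": 0.27,
    "Markdown to HTML": 3.31,
}
total = 12.11

step_sum = sum(step_times.values())

# List steps from slowest to fastest with their share of the total time.
for name, seconds in sorted(step_times.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{name:<18} {seconds:5.2f}s  {seconds / total:6.1%}")

# Time not attributed to any step (framework overhead, checkpoint writes, rounding).
print(f"Unattributed: {total - step_sum:.2f}s")
```

For this run, summarization accounts for a little over half of the total, and the four step times sum to within 0.01 seconds of the pipeline total.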


## Benefits of the Pipeline Framework

This example demonstrates several key advantages of using the Marovi Pipeline Framework:

1. **Modularity**: Each step is self-contained and can be developed and tested independently
2. **Type Safety**: Each step declares its input and output types (for example `PipelineStep[str, str]`), which helps keep data consistent through the pipeline
3. **Checkpointing**: Results are automatically saved to a checkpoint file after each step
4. **Observability**: Execution time is recorded for each step and for the whole pipeline, and is available through the context
5. **Reusability**: Pipeline components can be reconfigured for different workflows
6. **Error Handling**: Robust retry mechanisms and error reporting
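
The retry behaviour listed under Error Handling is not exercised in this notebook. As a rough illustration of the general idea only, the sketch below retries a flaky call with exponential backoff. The `with_retries` helper and its parameters are hypothetical and do not describe how the Marovi framework implements retries.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def with_retries(
    call: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `call` up to `attempts` times, doubling the delay after each failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return call()
        except Exception:
            if attempt == attempts:
                raise  # Out of attempts: surface the last error to the caller
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")


# Demo: a call that fails twice with a transient error, then succeeds.
call_count = {"n": 0}


def flaky() -> str:
    call_count["n"] += 1
    if call_count["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


delays: List[float] = []
result = with_retries(flaky, attempts=3, base_delay=0.5, sleep=delays.append)
print(result, call_count["n"], delays)  # ok 3 [0.5, 1.0]
```

Passing `sleep` as a parameter keeps the demo instant. In real use the default `time.sleep` would apply the delays.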

By combining MaroviAPI services with the Pipeline Framework, a four-stage document processing workflow takes only a few short step classes and a single `Pipeline` definition.