# Integration Patterns

This notebook demonstrates patterns for integrating the Self-Critique pipeline into larger, real-world systems, covering asynchronous processing, data persistence, CI/CD workflows, and prompt versioning.

## Learning Objectives

- **Asynchronous Workflows**: Use a task queue like Celery to handle long-running pipeline executions without blocking.
- **Data Persistence**: Store pipeline results in a PostgreSQL database.
- **CI/CD Integration**: Automate testing and quality checks using GitHub Actions.
- **Prompt Versioning**: Manage prompt templates with DVC.

---


## Section 1: Async Pipeline Integration (Celery)

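A pipeline run can take many seconds or minutes, so an API should not execute it inside the request handler. Instead, the handler enqueues a task for a background worker and immediately returns a task ID that the client can use to poll for the result.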


In [None]:
from celery import Celery
import time

# Configure Celery
# In a real app, this would live in a separate config module.
# Redis serves here as both the message broker and the result backend
# (RabbitMQ is a common alternative broker).
celery_app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@celery_app.task
def run_pipeline_async(paper_text: str):
    """Celery task to run the pipeline asynchronously."""
    print(f"Starting pipeline for paper: {paper_text[:50]}...")
    # Simulate a long-running process
    time.sleep(10)
    result = {"summary": "This is the final summary.", "status": "SUCCESS"}
    print("Pipeline finished.")
    return result

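# Example of how an API endpoint might enqueue the task and return immediately
def start_pipeline_job(paper_text: str):
    task = run_pipeline_async.delay(paper_text)
    return {"task_id": task.id, "status": "PENDING"}

# Companion endpoint (illustrative) that lets clients poll by task ID.
# Note: Celery reports any unknown task ID as PENDING as well.
def get_pipeline_job(task_id: str):
    result = celery_app.AsyncResult(task_id)
    response = {"task_id": task_id, "status": result.state}
    if result.successful():
        response["result"] = result.result
    return response

print("✓ Celery task defined. To run a worker: celery -A your_module.celery_app worker -l info")

# Example usage (requires a running Redis broker and a worker):
# job = start_pipeline_job("Attention is all you need...")
# print(get_pipeline_job(job["task_id"]))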
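To try the submit-then-poll contract without running Redis or a Celery worker, the sketch below substitutes a stdlib `ThreadPoolExecutor` for the queue. This is an in-process stand-in, not Celery: jobs are lost if the process exits, and the names `fake_pipeline`, `submit_job`, `job_status`, and `wait_for_job` are hypothetical. The response shape (`task_id` plus `status`) mirrors the task functions above.

```python
import time
import uuid
from concurrent.futures import ThreadPoolExecutor

# In-process stand-in for a Celery worker (NOT Celery): a thread pool plus a
# dict of futures keyed by task ID. Jobs are lost if the process exits.
_executor = ThreadPoolExecutor(max_workers=2)
_jobs = {}  # task_id -> concurrent.futures.Future


def fake_pipeline(paper_text: str) -> dict:
    """Hypothetical placeholder for the real Self-Critique pipeline."""
    time.sleep(0.1)  # simulate a slow run
    return {"summary": f"Summary of: {paper_text[:20]}", "status": "SUCCESS"}


def submit_job(paper_text: str) -> dict:
    """Enqueue a run and return immediately, like start_pipeline_job."""
    task_id = str(uuid.uuid4())
    _jobs[task_id] = _executor.submit(fake_pipeline, paper_text)
    return {"task_id": task_id, "status": "PENDING"}


def job_status(task_id: str) -> dict:
    """Report the job's state; the result is included only after success."""
    future = _jobs.get(task_id)
    if future is None:
        return {"task_id": task_id, "status": "UNKNOWN"}
    if not future.done():
        return {"task_id": task_id, "status": "PENDING"}
    error = future.exception()
    if error is not None:
        return {"task_id": task_id, "status": "FAILURE", "error": str(error)}
    return {"task_id": task_id, "status": "SUCCESS", "result": future.result()}


def wait_for_job(task_id: str, interval: float = 0.05, timeout: float = 5.0) -> dict:
    """Client-side polling loop with a deadline."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = job_status(task_id)
        if status["status"] != "PENDING":
            return status
        time.sleep(interval)
    raise TimeoutError(f"Job {task_id} did not finish within {timeout}s")


job = submit_job("Attention is all you need...")
print(job)
print(wait_for_job(job["task_id"]))
```

One deliberate difference: Celery reports an unknown task ID as `PENDING`, whereas this stand-in returns `UNKNOWN`, which makes mistyped task IDs easier to spot during local testing.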


## Section 2: Database Integration (PostgreSQL)

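Storing results in a database enables historical analysis, caching, and ad hoc querying. Keying each row on a SHA-256 hash of the paper text also means a paper that has already been processed can be detected and skipped.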


In [None]:
import hashlib
import json
import os

from sqlalchemy import create_engine, Column, Integer, String, Text, MetaData, Table
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker

# Database connection. SQLite is the default for this demo; in production, set
# DATABASE_URL to a PostgreSQL URL, e.g. "postgresql+psycopg2://user:password@host:5432/dbname".
DATABASE_URL = os.environ.get("DATABASE_URL", "sqlite:///./test.db")
engine = create_engine(DATABASE_URL)
metadata = MetaData()

# Define the results table
pipeline_results = Table(
    'pipeline_results',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('paper_text_hash', String(64), unique=True),  # SHA-256 hex digest of the input
    Column('summary', Text),
    Column('critique', Text),
    Column('final_summary', Text),
    Column('metrics', Text),  # Metrics serialized as a JSON string
)

# Create the table if it does not exist yet
metadata.create_all(engine)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def save_results(db_session, result_data: dict):
    """Saves pipeline results to the database, skipping papers already stored."""
    paper_hash = hashlib.sha256(result_data['paper_text'].encode('utf-8')).hexdigest()

    insert_stmt = pipeline_results.insert().values(
        paper_text_hash=paper_hash,
        summary=result_data['summary'],
        critique=result_data['critique'],
        final_summary=result_data['final_summary'],
        metrics=json.dumps(result_data['metrics']),
    )
    try:
        db_session.execute(insert_stmt)
        db_session.commit()
        print(f"✓ Results saved to database with hash: {paper_hash}")
    except IntegrityError:
        # The unique constraint on paper_text_hash rejects duplicates,
        # e.g. when this cell is run a second time.
        db_session.rollback()
        print(f"• Results for hash {paper_hash} already exist; skipping insert.")

# Example usage
sample_result = {
    'paper_text': 'This is the paper text.',
    'summary': 'Initial summary.',
    'critique': 'A critique.',
    'final_summary': 'The final, revised summary.',
    'metrics': {'tokens': 500, 'latency': 5.2}
}
with SessionLocal() as db:  # the session is closed when the block exits
    save_results(db, sample_result)
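The unique hash column also makes the table usable as a cache. The sketch below shows a cache-then-compute flow with an upsert, using the stdlib `sqlite3` module so it runs without a database server; the schema is trimmed to the columns the example needs. `summarize_with_cache` and the `run_pipeline` callable are hypothetical. The `INSERT ... ON CONFLICT ... DO UPDATE` statement is supported by both PostgreSQL (9.5+) and SQLite (3.24+), though a PostgreSQL driver such as psycopg2 uses `%s` placeholders instead of `?`.

```python
import hashlib
import json
import sqlite3

# Trimmed-down version of the pipeline_results table (illustrative schema).
SCHEMA = """
CREATE TABLE IF NOT EXISTS pipeline_results (
    id INTEGER PRIMARY KEY,
    paper_text_hash TEXT UNIQUE NOT NULL,
    final_summary TEXT,
    metrics TEXT
)
"""


def paper_hash(paper_text: str) -> str:
    return hashlib.sha256(paper_text.encode("utf-8")).hexdigest()


def get_cached(conn, paper_text):
    """Return the stored result for this paper, or None on a cache miss."""
    row = conn.execute(
        "SELECT final_summary, metrics FROM pipeline_results WHERE paper_text_hash = ?",
        (paper_hash(paper_text),),
    ).fetchone()
    if row is None:
        return None
    return {"final_summary": row[0], "metrics": json.loads(row[1])}


def upsert_result(conn, paper_text, final_summary, metrics):
    """Insert a result, or overwrite the existing row for the same paper."""
    with conn:  # commits on success, rolls back on an exception
        conn.execute(
            """
            INSERT INTO pipeline_results (paper_text_hash, final_summary, metrics)
            VALUES (?, ?, ?)
            ON CONFLICT (paper_text_hash) DO UPDATE SET
                final_summary = excluded.final_summary,
                metrics = excluded.metrics
            """,
            (paper_hash(paper_text), final_summary, json.dumps(metrics)),
        )


def summarize_with_cache(conn, paper_text, run_pipeline):
    """Call the expensive pipeline only on a cache miss (hypothetical helper)."""
    cached = get_cached(conn, paper_text)
    if cached is not None:
        return cached
    result = run_pipeline(paper_text)
    upsert_result(conn, paper_text, result["final_summary"], result["metrics"])
    return {"final_summary": result["final_summary"], "metrics": result["metrics"]}


# Usage with an in-memory database and a fake pipeline that counts its calls
conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
calls = []


def fake_run_pipeline(paper_text):
    calls.append(paper_text)
    return {"final_summary": "The final, revised summary.", "metrics": {"tokens": 500}}


first = summarize_with_cache(conn, "This is the paper text.", fake_run_pipeline)
second = summarize_with_cache(conn, "This is the paper text.", fake_run_pipeline)
print(len(calls))  # 1: the second call was served from the table
```

Choosing an upsert over the insert-and-skip approach in `save_results` is a trade-off: re-running the pipeline after a prompt change overwrites the old row instead of silently keeping stale output.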


## Section 3: CI/CD Pipeline Integration (GitHub Actions)

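A CI/CD pipeline runs checks automatically on every change. The workflow below lints and tests the code on each push and pull request to `main`, then runs a quality-gate job only if the tests pass (`needs: test`). It does not include a deployment stage.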


In [None]:
%%writefile ../../.github/workflows/ci.yaml
name: CI Pipeline

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4

    - name: Set up Python
      uses: actions/setup-python@v5
      with:
        python-version: '3.9'

    - name: Install dependencies
      run: |
        pip install -r requirements.txt
        pip install -r requirements-dev.txt

    - name: Lint with flake8
      # E9/F7: syntax errors, F63: invalid comparisons, F82: undefined names
      run: |
        flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics

    - name: Run unit tests
      run: |
        pytest

  quality-gate:
    runs-on: ubuntu-latest
    needs: test
    steps:
    - uses: actions/checkout@v4
    # ... (steps to run model_evaluation_qa.ipynb and check quality gate)
    - name: Run Quality Gate
      run: echo "Simulating quality gate check... PASSED"
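The `quality-gate` job above only echoes a placeholder. One way to make it enforce something is a small script that reads evaluation metrics from a JSON file and exits non-zero when any metric falls below a threshold, which fails the GitHub Actions step. The metric names, thresholds, and file name (`eval_metrics.json`) below are hypothetical; substitute whatever the evaluation notebook actually produces.

```python
import json
import sys

# Hypothetical metric names and minimum scores; replace with the metrics
# that the evaluation notebook actually reports.
THRESHOLDS = {"faithfulness": 0.80, "coverage": 0.70}


def check_quality_gate(metrics: dict, thresholds: dict) -> list:
    """Return human-readable failures; an empty list means the gate passes."""
    failures = []
    for name, minimum in thresholds.items():
        value = metrics.get(name)
        if value is None:
            failures.append(f"{name}: missing from metrics")
        elif value < minimum:
            failures.append(f"{name}: {value:.3f} < {minimum:.3f}")
    return failures


def main(path: str) -> int:
    """Load metrics from a JSON file and return a process exit code."""
    with open(path, encoding="utf-8") as f:
        metrics = json.load(f)
    failures = check_quality_gate(metrics, THRESHOLDS)
    for failure in failures:
        print(f"FAIL {failure}")
    print("Quality gate " + ("FAILED" if failures else "PASSED"))
    return 1 if failures else 0  # a non-zero exit code fails the CI step


if __name__ == "__main__" and len(sys.argv) > 1:
    sys.exit(main(sys.argv[1]))
```

The `Run Quality Gate` step would then call the script, e.g. `run: python scripts/quality_gate.py eval_metrics.json` (hypothetical path). Returning a list of failures rather than a single boolean keeps the CI log informative when the gate trips.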


## Section 4: DVC Integration for Prompt Versioning

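DVC (Data Version Control) versions prompt templates alongside the code: `dvc add` stores the files in the DVC cache and writes a small `.dvc` pointer file that is committed to Git, so each commit records exactly which prompt versions it used.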


In [None]:
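%%bash
# Shell commands need a %%bash cell (they are not valid Python).
# Run from the repository root; this notebook lives two levels below it.
cd ../..

# 1. Initialize DVC (do this once in your repo)
# dvc init

# 2. Create a directory for prompt templates
mkdir -p prompts
echo "Summarize this paper: {{paper_text}}" > prompts/summary_v1.txt

# 3. Add the prompts directory to DVC tracking
#    (creates prompts.dvc and adds /prompts to .gitignore)
# dvc add prompts

# 4. Commit the DVC pointer file, not the prompts themselves, to Git
# git add prompts.dvc .gitignore
# git commit -m "feat: Add initial prompt templates with DVC"

# 5. Optionally upload the tracked files to a configured DVC remote
# dvc push

echo "✓ Prompt templates created. Run 'dvc add prompts' after each change, then commit prompts.dvc."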
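Once prompts are versioned, the pipeline needs to load a specific template and fill in its `{{paper_text}}` placeholder. The sketch below does this with the standard library; the `<name>_<version>.txt` naming scheme and the helper names are assumptions based on `summary_v1.txt` above. Storing `prompt_fingerprint(template)` with each result (for example in the `metrics` JSON) ties a stored summary to the exact prompt text that produced it. DVC also offers a Python API (`dvc.api.read`) for reading a file at a given Git revision, which is not shown here.

```python
import hashlib
import re
import tempfile
from pathlib import Path

# Matches {{name}} placeholders such as {{paper_text}}, tolerating inner spaces.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def load_prompt(prompts_dir: Path, name: str, version: str) -> str:
    """Read e.g. prompts/summary_v1.txt (assumed <name>_<version>.txt naming)."""
    return (prompts_dir / f"{name}_{version}.txt").read_text(encoding="utf-8")


def render_prompt(template: str, **values: str) -> str:
    """Fill every {{key}} placeholder; raise KeyError if a value is missing."""
    def substitute(match):
        key = match.group(1)
        if key not in values:
            raise KeyError(f"Missing value for placeholder '{key}'")
        return values[key]

    return PLACEHOLDER.sub(substitute, template)


def prompt_fingerprint(template: str) -> str:
    """Short content hash to store next to each result for traceability."""
    return hashlib.sha256(template.encode("utf-8")).hexdigest()[:12]


# Usage against a temporary file with the same content as prompts/summary_v1.txt
with tempfile.TemporaryDirectory() as tmp:
    prompts_dir = Path(tmp)
    (prompts_dir / "summary_v1.txt").write_text(
        "Summarize this paper: {{paper_text}}\n", encoding="utf-8"
    )
    template = load_prompt(prompts_dir, "summary", "v1")

rendered = render_prompt(template, paper_text="Attention is all you need...")
print(rendered)
print(prompt_fingerprint(template))
```

Raising on a missing placeholder, rather than leaving `{{...}}` in the text, keeps a renamed variable in a new prompt version from silently reaching the model.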