# ETL Pipeline: Extract Academic Program Data with Ollama LLM and MongoDB

This notebook demonstrates an ETL (Extract, Transform, Load) pipeline that extracts academic program data from text files using the Ollama LLM and loads the structured data into MongoDB.

**Outline:**
1. Install and Import Required Packages
2. Define Pydantic Models for Institution and Program
3. Configure ETL Pipeline Settings
4. Implement Ollama LLM Client for Data Extraction
5. Set Up MongoDB Manager
6. Define ETL Pipeline Logic
7. Read and Process Text Files
8. Run the ETL Pipeline and Display Summary

## 1. Install and Import Required Packages

Install the necessary Python packages (`pydantic`, `pymongo`, `requests`) and import all required modules for the ETL pipeline.

In [None]:
# Install required packages (if not already installed)
import sys
import subprocess

def install_package(package):
    try:
        __import__(package.split('[')[0])
    except ImportError:
        print(f"Installing {package}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])

packages = ['pydantic', 'pymongo', 'requests']
for pkg in packages:
    install_package(pkg)

# Import all required modules
import os
import json
import requests
from pathlib import Path
from typing import Optional, List, Dict, Any
from datetime import datetime
from pydantic import BaseModel, Field, ConfigDict
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
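
The install cell above detects a missing package by attempting an import. As a rough alternative sketch, the standard library's `importlib.util.find_spec` can report whether a module is resolvable without importing it. The helper name `missing_packages` is hypothetical and not part of the notebook, and it assumes the pip package name matches the importable module name (true for the three packages used here).

```python
from importlib.util import find_spec
from typing import List


def missing_packages(names: List[str]) -> List[str]:
    """Return the requested packages whose top-level module cannot be found."""
    missing = []
    for name in names:
        module_name = name.split("[")[0]  # drop extras such as "pkg[extra]"
        if find_spec(module_name) is None:
            missing.append(name)
    return missing


# Lists whichever of the notebook's dependencies are not installed yet
print(missing_packages(["pydantic", "pymongo", "requests"]))
```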

## 2. Define Pydantic Models for Institution and Program

Create Pydantic models to validate and structure the data for institutions and academic programs.

In [None]:
class Institution(BaseModel):
    name: str = Field(..., description="Official institution name")
    institution_code: Optional[str] = Field(None, description="Unique identifier")
    description: Optional[str] = Field(None, description="Institution overview")
    type: List[str] = Field(default_factory=list, description="Institution types")
    country: str = Field(default="Sri Lanka", description="Country")
    website: Optional[str] = Field(None, description="Website URL")
    recognition: Optional[Dict[str, Any]] = Field(None, description="Accreditation")
    contact_info: Optional[Dict[str, Any]] = Field(None, description="Contact details")
    confidence_score: float = Field(..., ge=0.0, le=1.0, description="Confidence (0-1)")
    model_config = ConfigDict(extra="forbid")

class Program(BaseModel):
    name: str = Field(..., description="Program name")
    program_code: Optional[str] = Field(None, description="Unique identifier")
    description: Optional[str] = Field(None, description="Program overview")
    level: Optional[str] = Field(None, description="Academic level")
    duration: Optional[Dict[str, Any]] = Field(None, description="Duration details")
    delivery_mode: Optional[List[str]] = Field(None, description="Delivery modes")
    fees: Optional[Dict[str, Any]] = Field(None, description="Fee structure")
    eligibility: Optional[Dict[str, Any]] = Field(None, description="Requirements")
    curriculum_summary: Optional[str] = Field(None, description="Curriculum overview")
    specializations: Optional[List[str]] = Field(None, description="Specializations")
    extensions: Optional[Dict[str, Any]] = Field(None, description="Additional data")
    confidence_score: float = Field(..., ge=0.0, le=1.0, description="Confidence (0-1)")
    model_config = ConfigDict(extra="forbid")
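
Both models require `confidence_score` to lie between 0.0 and 1.0, but an LLM may return it as a string or as a percentage. The sketch below shows one possible way to coerce such values before validation. The helper `normalize_confidence` is hypothetical, and treating values between 1 and 100 as percentages is an assumption rather than something the notebook specifies. The fallback of 0.7 mirrors the default the pipeline uses later.

```python
from typing import Any


def normalize_confidence(value: Any, default: float = 0.7) -> float:
    """Coerce a model-reported confidence into the range [0.0, 1.0]."""
    try:
        score = float(value)
    except (TypeError, ValueError):
        return default  # not numeric at all (e.g. None or "high")
    if score != score:  # NaN never equals itself
        return default
    if 1.0 < score <= 100.0:
        score = score / 100.0  # assumption: the model answered in percent
    return min(1.0, max(0.0, score))  # clamp anything still out of range


print(normalize_confidence("0.85"), normalize_confidence(95), normalize_confidence(None))
```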

## 3. Configure ETL Pipeline Settings

Set up configuration variables for the Ollama LLM API, MongoDB connection, and default values for missing fields in the extracted data.

In [None]:
class Config:
    OLLAMA_BASE_URL = "http://localhost:11434"
    OLLAMA_MODEL = "llama3.2"  # Change to your model
    MONGO_URI = "mongodb://localhost:27017/"
    DB_NAME = "education_db"
    INSTITUTION_COLLECTION = "institutions"
    DATA_FOLDER = "data"  # Folder containing text files
    # Realistic defaults for missing fields
    DEFAULT_VALUES = {
        "level": "Undergraduate",
        "delivery_mode": ["On-campus"],
        "duration": {"years": 4, "months": 0},
        "fees": {"currency": "LKR", "amount": "To be determined"},
        "eligibility": {"minimum_qualification": "A-Level or equivalent"}
    }
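
The settings above are hard-coded. If the notebook needs to run against a different Ollama host or database, one option is to read overrides from environment variables and fall back to the same defaults. This is only a sketch: `load_settings` and the environment variable names are hypothetical and chosen to mirror the `Config` attributes.

```python
import os
from typing import Dict, Mapping, Optional


def load_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build pipeline settings, preferring environment overrides over defaults."""
    env = os.environ if env is None else env
    return {
        # Variable names are assumptions; defaults match the Config class
        "OLLAMA_BASE_URL": env.get("OLLAMA_BASE_URL", "http://localhost:11434"),
        "OLLAMA_MODEL": env.get("OLLAMA_MODEL", "llama3.2"),
        "MONGO_URI": env.get("MONGO_URI", "mongodb://localhost:27017/"),
        "DB_NAME": env.get("DB_NAME", "education_db"),
        "DATA_FOLDER": env.get("DATA_FOLDER", "data"),
    }


settings = load_settings({"OLLAMA_MODEL": "mistral"})
print(settings["OLLAMA_MODEL"], settings["DB_NAME"])
```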

## 4. Implement Ollama LLM Client for Data Extraction

Define a client class to interact with the Ollama LLM API and extract academic program data from text content.

In [None]:
class OllamaClient:
    def __init__(self, base_url: str = Config.OLLAMA_BASE_URL, model: str = Config.OLLAMA_MODEL):
        self.base_url = base_url
        self.model = model
        
    def extract_program_data(self, text: str) -> Dict[str, Any]:
        """Extract program data from text using Ollama"""
        # Literal braces in an f-string must be doubled ({{ and }}), including the is_program example
        prompt = f"""You are a data extraction assistant. Analyze the following text and extract academic program information.

Text:
{text}

Extract the following information in JSON format:
- name: Program name (required)
- program_code: Program code if mentioned
- description: Brief description
- level: Academic level (Undergraduate/Postgraduate/Diploma/Certificate)
- duration: Duration in years/months as {{"years": X, "months": Y}}
- delivery_mode: List of delivery modes (On-campus/Online/Hybrid)
- fees: Fee information as {{"currency": "XXX", "amount": "value"}}
- eligibility: Eligibility requirements as {{"minimum_qualification": "..."}}
- curriculum_summary: Brief curriculum overview
- specializations: List of specializations if any
- confidence_score: Your confidence in this extraction (0.0 to 1.0)

If the text does NOT contain program information, respond with: {{"is_program": false}}

Respond ONLY with valid JSON, no additional text."""

        try:
            response = requests.post(
                f"{self.base_url}/api/generate",
                json={
                    "model": self.model,
                    "prompt": prompt,
                    "stream": False,
                    "format": "json"
                },
                timeout=60
            )
            response.raise_for_status()
            result = response.json()
            extracted = json.loads(result['response'])
            return extracted
        except Exception as e:
            print(f"❌ Ollama extraction error: {e}")
            return {"is_program": False, "error": str(e)}
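
The client assumes that the `response` field contains nothing but a JSON object. Some models wrap their answer in extra text, so a more tolerant parser may be useful. The following is a minimal sketch using only the standard library. `parse_llm_json` is a hypothetical helper, and on failure it returns the same `{"is_program": False, "error": ...}` shape that `extract_program_data` uses.

```python
import json
from typing import Any, Dict


def parse_llm_json(raw: str) -> Dict[str, Any]:
    """Parse a JSON object from model output, tolerating surrounding text."""
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} span in the text
        start, end = raw.find("{"), raw.rfind("}")
        if start == -1 or end <= start:
            return {"is_program": False, "error": "no JSON object found"}
        try:
            parsed = json.loads(raw[start:end + 1])
        except json.JSONDecodeError as exc:
            return {"is_program": False, "error": str(exc)}
    if not isinstance(parsed, dict):
        return {"is_program": False, "error": "expected a JSON object"}
    return parsed


print(parse_llm_json('Here is the result: {"name": "BSc in Computing"} Done.'))
```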

## 5. Set Up MongoDB Manager

Create a class to manage MongoDB connections and operations, including inserting institutions and adding programs to the database.

In [None]:
class MongoDBManager:
    def __init__(self, uri: str = Config.MONGO_URI, db_name: str = Config.DB_NAME):
        self.client = MongoClient(uri)
        self.db = self.client[db_name]
        self.institutions = self.db[Config.INSTITUTION_COLLECTION]
        
    def create_institution(self, name: str, **kwargs) -> str:
        """Create institution and return its ID"""
        institution = Institution(
            name=name,
            confidence_score=kwargs.get('confidence_score', 1.0),
            **{k: v for k, v in kwargs.items() if k != 'confidence_score'}
        )
        
        institution_dict = institution.model_dump()
        result = self.institutions.insert_one(institution_dict)
        print(f"✅ Created institution: {name} (ID: {result.inserted_id})")
        return str(result.inserted_id)
    
    def add_program_to_institution(self, institution_id: str, program: Program):
        """Add program to institution using $push"""
        from bson.objectid import ObjectId
        
        program_dict = program.model_dump()
        
        result = self.institutions.update_one(
            {"_id": ObjectId(institution_id)},
            {"$push": {"programs": program_dict}}
        )
        
        # True only when a matching institution was found and updated
        return result.modified_count > 0
    
    def close(self):
        """Close MongoDB connection"""
        self.client.close()
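
Because `add_program_to_institution` uses a plain `$push`, running the pipeline twice against the same institution would append each program twice. One possible guard, sketched here as plain dictionaries, is to match the institution only when no embedded program already has the same name. `build_guarded_push` is a hypothetical helper, and treating the program name as the uniqueness key is an assumption.

```python
from typing import Any, Dict, Tuple


def build_guarded_push(
    institution_id: Any, program: Dict[str, Any]
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (filter, update) documents that push a program only once per name."""
    query = {
        "_id": institution_id,  # pass an ObjectId when calling MongoDB
        # Matches only if no element of `programs` has this name
        "programs.name": {"$ne": program["name"]},
    }
    update = {"$push": {"programs": program}}
    return query, update


query, update = build_guarded_push("placeholder-id", {"name": "BSc in Computing"})
print(query)
print(update)
```

The two documents can be passed to `update_one(query, update)`. With this filter, a `modified_count` of 0 means either that the institution was not found or that a program with that name is already stored.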

## 6. Define ETL Pipeline Logic

Implement the `ETLPipeline` class to orchestrate reading files, extracting data, filling missing fields, and loading the results into MongoDB.

In [None]:
class ETLPipeline:
    def __init__(self):
        self.ollama = OllamaClient()
        self.db = MongoDBManager()
        self.stats = {
            'total_files': 0,
            'processed': 0,
            'skipped': 0,
            'errors': 0,
            'programs_added': 0
        }
    
    def read_text_file(self, filepath: Path) -> str:
        """Read text file content"""
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                return f.read().strip()
        except Exception as e:
            print(f"❌ Error reading {filepath}: {e}")
            return ""
    
    def fill_missing_fields(self, program_data: Dict[str, Any]) -> Dict[str, Any]:
        """Fill missing fields with realistic defaults"""
        for field, default in Config.DEFAULT_VALUES.items():
            if field not in program_data or program_data[field] is None:
                program_data[field] = default
                print(f"  📝 Filled missing field '{field}' with default value")
        return program_data
    
    def process_file(self, filepath: Path, institution_id: str):
        """Process single text file"""
        print(f"\n📄 Processing: {filepath.name}")
        
        text = self.read_text_file(filepath)
        if not text:
            print("  ⚠️  Empty file, skipping...")
            self.stats['skipped'] += 1
            return
        
        # Extract data using Ollama
        print("  🤖 Extracting data with Ollama...")
        extracted = self.ollama.extract_program_data(text)
        
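        # Count extraction failures (already reported by OllamaClient) as errors, not skips
        if 'error' in extracted:
            self.stats['errors'] += 1
            return
        
        # Check if it's program-related
        if not extracted.get('is_program', True):
            print("  ⏭️  Not program-related, skipping...")
            self.stats['skipped'] += 1
            return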
        
        # Fill missing fields
        extracted = self.fill_missing_fields(extracted)
        
        # Ensure required fields
        if 'name' not in extracted or not extracted['name']:
            print("  ❌ No program name found, skipping...")
            self.stats['skipped'] += 1
            return
        
        # Ensure confidence score
        if 'confidence_score' not in extracted:
            extracted['confidence_score'] = 0.7
        
        try:
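            # Create Program object, dropping keys the model does not define
            # (e.g. "is_program"), which extra="forbid" would otherwise reject
            program_fields = {k: v for k, v in extracted.items() if k in Program.model_fields}
            program = Program(**program_fields)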
            
            # Add to MongoDB
            success = self.db.add_program_to_institution(institution_id, program)
            
            if success:
                print(f"  ✅ Added program: {program.name} (confidence: {program.confidence_score:.2f})")
                self.stats['programs_added'] += 1
                self.stats['processed'] += 1
            else:
                print("  ❌ Failed to add program to database")
                self.stats['errors'] += 1
                
        except Exception as e:
            print(f"  ❌ Error creating program: {e}")
            self.stats['errors'] += 1
    
    def run(self):
        """Run the ETL pipeline"""
        print("=" * 70)
        print("ETL PIPELINE: Text Files → Ollama → MongoDB")
        print("=" * 70)
        
        # Step 1: Get institution name from user
        institution_name = input("\n🏛️  Enter institution name: ").strip()
        if not institution_name:
            print("❌ Institution name is required!")
            return
        
        # Step 2: Create institution
        print("\n📋 Creating institution...")
        institution_id = self.db.create_institution(institution_name)
        
        # Step 3: Get all text files
        data_folder = Path(Config.DATA_FOLDER)
        if not data_folder.exists():
            print(f"❌ Data folder '{Config.DATA_FOLDER}' not found!")
            return
        
        text_files = list(data_folder.glob("*.txt"))
        self.stats['total_files'] = len(text_files)
        
        if not text_files:
            print(f"❌ No .txt files found in '{Config.DATA_FOLDER}'")
            return
        
        print(f"\n📚 Found {len(text_files)} text files")
        
        # Step 4: Process each file
        for filepath in text_files:
            self.process_file(filepath, institution_id)
        
        # Step 5: Print summary
        self.print_summary()
        
        # Step 6: Cleanup
        self.db.close()
    
    def print_summary(self):
        """Print pipeline execution summary"""
        print("\n" + "=" * 70)
        print("PIPELINE SUMMARY")
        print("=" * 70)
        print(f"Total files:       {self.stats['total_files']}")
        print(f"Processed:         {self.stats['processed']}")
        print(f"Skipped:           {self.stats['skipped']}")
        print(f"Errors:            {self.stats['errors']}")
        print(f"Programs added:    {self.stats['programs_added']}")
        print("=" * 70)
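
`fill_missing_fields` assigns the objects stored in `Config.DEFAULT_VALUES` directly, so a filled record shares its list and dict defaults with the configuration. A standalone sketch of the same logic that copies each default and reports which fields were filled might look like this. `fill_defaults` is a hypothetical name, and returning the list of filled fields is an addition for illustration.

```python
import copy
from typing import Any, Dict, List, Tuple


def fill_defaults(
    data: Dict[str, Any], defaults: Dict[str, Any]
) -> Tuple[Dict[str, Any], List[str]]:
    """Return a copy of `data` with missing or None fields set to copied defaults."""
    filled = dict(data)  # leave the caller's dictionary untouched
    used: List[str] = []
    for field, default in defaults.items():
        if filled.get(field) is None:
            filled[field] = copy.deepcopy(default)  # avoid sharing mutable defaults
            used.append(field)
    return filled, used


defaults = {"level": "Undergraduate", "delivery_mode": ["On-campus"]}
record, used = fill_defaults({"name": "BSc in Computing", "level": None}, defaults)
print(record)
print(used)
```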

## 7. Read and Process Text Files

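The cell below is an optional, commented-out example that calls `process_file` directly on each `.txt` file in the data folder. `ETLPipeline.run()` already performs these steps, so use it only when you need finer control over which files are processed.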

In [None]:
# Example usage: Read and process all text files in the data folder
# (ETLPipeline.run() already does this; call process_file directly only if you need finer control)
#
# pipeline = ETLPipeline()
# institution_id = pipeline.db.create_institution("Example University")  # placeholder name
# data_folder = Path(Config.DATA_FOLDER)
# for filepath in data_folder.glob('*.txt'):
#     pipeline.process_file(filepath, institution_id)
# pipeline.print_summary()
# pipeline.db.close()
#
# For full pipeline execution, see the next cell.
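
`Path.glob` does not guarantee any particular ordering, so repeated runs may process files in a different sequence. If a stable order matters, for example when comparing logs between runs, the file list can be sorted first. This is a small sketch. `list_text_files` is a hypothetical helper that is not part of the pipeline class.

```python
from pathlib import Path
from typing import List


def list_text_files(folder: str) -> List[Path]:
    """Return the .txt files directly inside `folder`, sorted by path."""
    root = Path(folder)
    if not root.is_dir():
        return []  # missing folder: nothing to process
    return sorted(p for p in root.glob("*.txt") if p.is_file())


# "data" is the default DATA_FOLDER from the Config class
for path in list_text_files("data"):
    print(path.name)
```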

## 8. Run the ETL Pipeline and Display Summary

Execute the ETL pipeline, prompt for the institution name, process all files, and print a summary of the results.

In [None]:
# Run the ETL pipeline interactively
pipeline = ETLPipeline()
pipeline.run()
# The pipeline will prompt for the institution name, process all text files in the data folder, and print a summary.