# Educational Data ETL Pipeline with LLM Extraction

This notebook implements a production-ready ETL pipeline that extracts structured educational data from text files using an LLM (served locally by Ollama), validates it with Pydantic, and stores it in MongoDB. Each section demonstrates one key part of the pipeline, with code and explanations.

## 1. Configuration and Directory Setup

Define and initialize the configuration for the ETL pipeline, including input, output, and log directories. This section uses a dataclass for centralized configuration and ensures all directories exist.

In [None]:
from dataclasses import dataclass
from pathlib import Path
import os, sys

@dataclass
class Config:
    """Centralized configuration for the ETL pipeline.

    Directory fields left as ``None`` are derived from ``base_dir``;
    explicitly supplied directories are respected. All three directories
    are created on construction if they do not exist yet.

    ``mongodb_uri`` is resolved from the ``MONGODB_URI`` environment
    variable when not passed explicitly, so credentials never need to be
    committed with the notebook. Without the variable a local, unauthenticated
    MongoDB instance is assumed.
    """
    # Project root: "/content" on Google Colab, otherwise the current directory.
    base_dir: Path = Path("/content" if 'google.colab' in sys.modules else os.getcwd())
    # None means "derive from base_dir" (see __post_init__).
    input_dir: Path = None
    logs_dir: Path = None
    output_dir: Path = None
    # None means "read MONGODB_URI from the environment" (see __post_init__).
    mongodb_uri: str = None
    database_name: str = "edu_platform"
    ollama_base_url: str = "http://localhost:11434"
    ollama_model: str = "llama3"
    ollama_temperature: float = 0.1
    ollama_timeout: int = 300
    chunk_max_chars: int = 3000
    chunk_overlap: int = 200
    confidence_threshold: float = 0.6
    max_retries: int = 2
    retry_delay: int = 5

    def __post_init__(self):
        """Fill in derived defaults and make sure the working directories exist."""
        # Accept plain strings as well as Path objects.
        self.base_dir = Path(self.base_dir)
        self.input_dir = self._resolve_dir(self.input_dir, "bci_lk")
        self.logs_dir = self._resolve_dir(self.logs_dir, "logs")
        self.output_dir = self._resolve_dir(self.output_dir, "output")
        if self.mongodb_uri is None:
            # Secrets belong in the environment, not in source control.
            self.mongodb_uri = os.environ.get("MONGODB_URI", "mongodb://localhost:27017")
        for dir_path in (self.input_dir, self.logs_dir, self.output_dir):
            dir_path.mkdir(exist_ok=True, parents=True)

    def _resolve_dir(self, value, default_name):
        """Return *value* as a Path, or ``base_dir / default_name`` when unset."""
        if value is None:
            return self.base_dir / default_name
        return Path(value)

# Module-level configuration shared by every later cell.
# Constructing it also creates the input/log/output directories on disk.
config = Config()
# NOTE: the printed repr includes every field, mongodb_uri among them; do not
# share this cell's output if the URI carries credentials.
print("Config initialized:", config)

## 2. Logging Initialization

Set up logging to both file and console, and verify that logging output works as expected.

In [None]:
import logging
from datetime import datetime

def setup_logging():
    """Return the shared ``etl_notebook`` logger, configuring it on first use.

    On the first call a timestamped log file is created under
    ``config.logs_dir`` (DEBUG and above) and a console handler is attached
    (INFO and above). Later calls return the already configured logger
    untouched, so re-running the notebook cell neither duplicates output
    nor opens additional log files.

    Returns:
        logging.Logger: the configured ``etl_notebook`` logger.
    """
    logger = logging.getLogger("etl_notebook")
    logger.setLevel(logging.DEBUG)
    # Bail out before creating any handler: FileHandler opens its file
    # eagerly, so building one "just in case" would leak a descriptor and
    # leave an empty log file behind on every re-run.
    if logger.handlers:
        return logger

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
    log_file = config.logs_dir / f'pipeline_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'

    # Explicit UTF-8 so non-ASCII institution names log correctly regardless
    # of the platform's default encoding.
    file_handler = logging.FileHandler(log_file, encoding="utf-8")
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger

# Module-level logger used by all pipeline components defined in later cells.
logger = setup_logging()
# Smoke test: the message should appear on the console and in the log file.
logger.info("Logging initialized!")

## 3. Pydantic Schema Definitions

Define the Pydantic models for Institution, Program, and ExtractionResult. Validate example data to demonstrate schema usage.

In [None]:
try:
    from pydantic import BaseModel, Field, ConfigDict
except ImportError:
    import sys, subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "pydantic"])
    from pydantic import BaseModel, Field, ConfigDict
from typing import Optional, List, Dict, Any
from datetime import datetime

# Schema for one extracted institution.
# (Documented with comments rather than a class docstring so the generated
# JSON schema of the model is left unchanged.)
class Institution(BaseModel):
    # Required fields: name and confidence_score; everything else is optional.
    name: str = Field(..., description="Official institution name")
    institution_code: Optional[str] = Field(None, description="Unique identifier")
    description: Optional[str] = Field(None, description="Institution overview")
    type: List[str] = Field(default_factory=list, description="Institution types")
    country: str = Field(default="Sri Lanka", description="Country")
    website: Optional[str] = Field(None, description="Website URL")
    # Free-form dictionaries: their inner structure is not validated.
    recognition: Optional[Dict[str, Any]] = Field(None, description="Accreditation")
    contact_info: Optional[Dict[str, Any]] = Field(None, description="Contact details")
    # Must lie in the closed interval [0.0, 1.0].
    confidence_score: float = Field(..., ge=0.0, le=1.0, description="Confidence (0-1)")
    # Unknown keys are rejected, so an unexpected field fails validation.
    model_config = ConfigDict(extra="forbid")

# Schema for one extracted academic program.
# (Documented with comments rather than a class docstring so the generated
# JSON schema of the model is left unchanged.)
class Program(BaseModel):
    # Required fields: name and confidence_score; everything else is optional.
    name: str = Field(..., description="Program name")
    program_code: Optional[str] = Field(None, description="Unique identifier")
    description: Optional[str] = Field(None, description="Program overview")
    level: Optional[str] = Field(None, description="Academic level")
    # Free-form dictionaries: their inner structure is not validated.
    duration: Optional[Dict[str, Any]] = Field(None, description="Duration details")
    delivery_mode: Optional[List[str]] = Field(None, description="Delivery modes")
    fees: Optional[Dict[str, Any]] = Field(None, description="Fee structure")
    eligibility: Optional[Dict[str, Any]] = Field(None, description="Requirements")
    curriculum_summary: Optional[str] = Field(None, description="Curriculum overview")
    specializations: Optional[List[str]] = Field(None, description="Specializations")
    # Catch-all container for data that has no dedicated field.
    extensions: Optional[Dict[str, Any]] = Field(None, description="Additional data")
    # Must lie in the closed interval [0.0, 1.0].
    confidence_score: float = Field(..., ge=0.0, le=1.0, description="Confidence (0-1)")
    # Unknown keys are rejected, so an unexpected field fails validation.
    model_config = ConfigDict(extra="forbid")

# Top-level record for one processed source: one institution, its programs,
# and provenance information.
# (Documented with comments rather than a class docstring so the generated
# JSON schema of the model is left unchanged.)
class ExtractionResult(BaseModel):
    institution: Institution
    programs: List[Program] = Field(default_factory=list)
    raw_text: str = Field(..., description="Original text")
    # Defaults to the (naive, local-time) moment the object is created.
    extraction_timestamp: datetime = Field(default_factory=datetime.now)
    source_file: Optional[str] = Field(None, description="Source filename")
    # Unlike the nested models, extra top-level keys are accepted and kept.
    model_config = ConfigDict(extra="allow")

# Example validation: build each model with only its required fields and
# nest them in an ExtractionResult; a validation error here would mean the
# schema definitions above are inconsistent.
test_inst = Institution(name="Test University", confidence_score=0.95)
test_prog = Program(name="BSc Computer Science", confidence_score=0.9)
test_result = ExtractionResult(institution=test_inst, programs=[test_prog], raw_text="Sample text")
print("Validated ExtractionResult:", test_result)

## 4. Ollama LLM Client Usage

Instantiate the OllamaClient, test the connection, and demonstrate making a sample prompt call to the LLM. This section shows how to interact with the LLM for data extraction.

In [None]:
try:
    import requests
except ImportError:
    import sys, subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "requests"])
    import requests
import re, json

class OllamaClient:
    """Thin client for a local Ollama server's ``/api/generate`` endpoint.

    Connection settings (base URL, model, temperature, timeout) are read
    from the module-level ``config`` object.
    """

    def __init__(self):
        self.base_url = config.ollama_base_url
        self.model = config.ollama_model
        self.api_url = f"{self.base_url}/api/generate"

    def call(self, prompt: str) -> dict:
        """Send *prompt* to the model and return the JSON object it produced.

        Args:
            prompt: Complete prompt text (instructions plus content).

        Returns:
            The decoded JSON object from the model's reply.

        Raises:
            requests.RequestException: on network errors, timeouts, or a
                non-2xx HTTP status.
            ValueError: if the reply contains no decodable JSON object
                (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        """
        response = requests.post(
            self.api_url,
            json={
                "model": self.model,
                "prompt": prompt,
                "stream": False,
                # Sampling parameters must live inside "options"; a
                # top-level "temperature" key is not part of the generate
                # request and would be ignored by the server.
                "options": {
                    "temperature": config.ollama_temperature,
                    "num_predict": 4096,
                    "top_k": 40,
                    "top_p": 0.9,
                },
            },
            timeout=config.ollama_timeout,
        )
        response.raise_for_status()
        return self._parse_json_response(response.json().get("response", ""))

    @staticmethod
    def _parse_json_response(raw_text: str) -> dict:
        """Extract and decode the JSON object embedded in a model reply.

        Markdown code fences (with or without a ``json`` tag) are removed
        first; then the span from the first ``{`` to the last ``}`` is
        decoded, which tolerates chatter before or after the object.
        """
        cleaned = re.sub(r'```(?:json)?\s*', '', raw_text or "")
        match = re.search(r'\{.*\}', cleaned, re.DOTALL)
        parsed = json.loads(match.group(0) if match else cleaned.strip())
        if not isinstance(parsed, dict):
            # Callers index the result by key; fail here with a clear
            # message instead of with a confusing TypeError downstream.
            raise ValueError(f"Expected a JSON object from the LLM, got {type(parsed).__name__}")
        return parsed

    def test_connection(self) -> bool:
        """Return True when the Ollama server answers ``/api/tags`` with HTTP 200."""
        try:
            response = requests.get(f"{self.base_url}/api/tags", timeout=5)
        except requests.RequestException:
            # Server down or unreachable; anything else (e.g. KeyboardInterrupt)
            # should propagate rather than be reported as "not connected".
            return False
        return response.status_code == 200

# Module-level client; the connection check only queries /api/tags and does
# not run the model.
ollama = OllamaClient()
print("Ollama connection:", "OK" if ollama.test_connection() else "FAILED")

# Example LLM call (replace with real prompt for actual use).
# The prompt is wrapped in single quotes so the embedded JSON double quotes
# do not terminate the string.
# sample_prompt = '{"institution": {"name": "Test U", "confidence_score": 1.0}, "programs": []}'
# print(ollama.call(sample_prompt))

## 5. Text Chunking and Cleaning

Use the TextChunker class to clean and split a sample text into chunks, demonstrating the chunking logic used in the pipeline.

In [None]:
import re
class TextChunker:
    """Clean raw text and split it into overlapping chunks for the LLM.

    ``max_chars`` is the target chunk size and ``overlap`` the number of
    trailing characters repeated at the start of the next chunk; both are
    read from the module-level ``config`` object.
    """

    def __init__(self):
        self.max_chars = config.chunk_max_chars
        self.overlap = config.chunk_overlap

    def chunk_text(self, text: str):
        """Split *text* into chunks of roughly ``max_chars`` characters.

        Text that already fits is returned as a single chunk. Otherwise
        paragraphs (separated by a blank line) are packed greedily;
        a paragraph that is itself too long is packed sentence by sentence,
        and a sentence that is still too long is cut at ``max_chars``.
        Each new chunk starts with the last ``overlap`` characters of the
        previous one, so a chunk may exceed ``max_chars`` by about that much.

        Returns:
            list[str]: the stripped chunks, in document order.
        """
        if len(text) <= self.max_chars:
            return [text]
        chunks, current = [], ""
        for para in text.split('\n\n'):
            for piece in self._split_paragraph(para):
                if len(current) + len(piece) <= self.max_chars:
                    current += piece
                elif current.strip():
                    chunks.append(current.strip())
                    current = self._overlap_tail(current) + piece
                else:
                    current = piece
        if current.strip():
            chunks.append(current.strip())
        return chunks

    def _split_paragraph(self, para: str):
        """Return *para* as pieces no longer than ``max_chars`` (plus separator)."""
        if len(para) <= self.max_chars:
            return [para + '\n\n']
        limit = max(self.max_chars, 1)
        pieces = []
        # Split *after* each ". " so sentences keep their own punctuation
        # and nothing has to be re-appended (which used to add a stray
        # ". " to the final sentence).
        for sentence in re.split(r'(?<=\. )', para):
            # Hard-cut sentences that alone exceed the limit; empty
            # sentences yield no pieces.
            pieces.extend(sentence[i:i + limit] for i in range(0, len(sentence), limit))
        if pieces:
            # Keep the paragraph boundary so the next paragraph is not glued on.
            pieces[-1] += '\n\n'
        return pieces

    def _overlap_tail(self, chunk: str) -> str:
        """Return the part of *chunk* to repeat at the start of the next chunk."""
        # chunk[-0:] would be the whole chunk, so zero overlap needs its own case.
        return chunk[-self.overlap:] if self.overlap > 0 else ""

    def clean_text(self, text: str) -> str:
        """Normalise whitespace and drop unsupported characters.

        Within a paragraph every whitespace run becomes a single space and
        characters other than word characters, whitespace and
        ``. , : ; - ( ) [ ] / &`` are removed. Paragraph breaks (one or more
        blank lines) are preserved as exactly one blank line so that
        ``chunk_text`` can still split on paragraph boundaries.
        """
        cleaned = []
        for para in re.split(r'\n\s*\n', text):
            para = re.sub(r'\s+', ' ', para)
            para = re.sub(r'[^\w\s\.\,\:\;\-\(\)\[\]\/\&]', '', para).strip()
            if para:
                cleaned.append(para)
        return '\n\n'.join(cleaned)

# Demo: clean a short sample and chunk it. The sample is far shorter than the
# default chunk_max_chars, so chunk_text returns it as a single chunk.
chunker = TextChunker()
sample_text = """This is a sample educational program.\n\nIt covers multiple topics.\n\nThe duration is 3 years.\n\n"""
cleaned = chunker.clean_text(sample_text)
chunks = chunker.chunk_text(cleaned)
print("Chunks:", chunks)

## 6. Data Validation and Confidence Scoring

Validate a sample extraction result using DataValidator, and normalize confidence scores with ConfidenceScorer. This ensures extracted data meets schema and quality requirements.

In [None]:
class DataValidator:
    """Schema and confidence checks for extracted data."""

    @staticmethod
    def validate_extraction(data: dict, source_file: Optional[str] = None):
        """Validate *data* against ``ExtractionResult`` and return the model.

        Args:
            data: Mapping with ``institution``, ``programs`` and ``raw_text``.
                It is not modified.
            source_file: Optional source filename recorded on the result.

        Returns:
            The validated ``ExtractionResult``.

        Raises:
            pydantic.ValidationError: if *data* does not match the schema.
        """
        # Work on a shallow copy so the caller's dict is not silently given
        # a 'source_file' key.
        payload = dict(data)
        if source_file:
            payload['source_file'] = source_file
        result = ExtractionResult(**payload)
        logger.info(f"Validation passed: {result.institution.name}")
        logger.info(f"Programs: {len(result.programs)}")
        return result

    @staticmethod
    def validate_confidence(result: ExtractionResult) -> bool:
        """Check confidence scores against ``config.confidence_threshold``.

        Returns False (and logs a warning) when the institution score is
        below the threshold. Low-confidence programs are only reported in
        the log; they do not affect the return value.
        """
        min_conf = config.confidence_threshold
        if result.institution.confidence_score < min_conf:
            logger.warning(f"Low institution confidence: {result.institution.confidence_score}")
            return False
        low_progs = [p.name for p in result.programs if p.confidence_score < min_conf]
        if low_progs:
            logger.warning(f"Low confidence programs: {len(low_progs)}")
        return True

class ConfidenceScorer:
    """Scale down model-reported confidence scores using simple heuristics."""

    def normalize_confidence(self, result: ExtractionResult) -> ExtractionResult:
        """Rewrite every confidence score on *result* in place and return it."""
        institution = result.institution
        institution.confidence_score = self._normalize_institution(institution)
        for prog in result.programs:
            prog.confidence_score = self._normalize_program(prog)
        return result

    def _normalize_institution(self, inst: Institution) -> float:
        """Return the institution score after penalties, rounded to 3 places."""
        score = inst.confidence_score
        # Each entry: (does the penalty apply?, multiplier). All conditions
        # are evaluated against the unmodified input before any scaling.
        penalties = (
            (score < config.confidence_threshold, 0.9),
            (not inst.website, 0.95),
            (not inst.institution_code, 0.98),
        )
        for applies, factor in penalties:
            if applies:
                score *= factor
        return round(min(score, 1.0), 3)

    def _normalize_program(self, prog: Program) -> float:
        """Return the program score after penalties, rounded to 3 places."""
        score = prog.confidence_score
        if score < config.confidence_threshold:
            score *= 0.85
        # 3% is deducted for each of the key descriptive fields that is empty.
        key_fields = (prog.level, prog.duration, prog.curriculum_summary)
        missing = sum(1 for value in key_fields if not value)
        score *= (1.0 - 0.03 * missing)
        return round(min(score, 1.0), 3)

# Example usage: validate a minimal payload, normalize its scores, then run
# the confidence check.
validator = DataValidator()
scorer = ConfidenceScorer()

# The program's score (0.5) is below the default confidence_threshold (0.6),
# so the low-confidence warning path is exercised.
sample_data = {
    'institution': {'name': 'Test U', 'confidence_score': 0.7},
    'programs': [{'name': 'BSc CS', 'confidence_score': 0.5}],
    'raw_text': 'test'
}
result = validator.validate_extraction(sample_data)
result = scorer.normalize_confidence(result)
# Return value is ignored here; the call is made for its log output.
validator.validate_confidence(result)
print("Normalized institution confidence:", result.institution.confidence_score)

## 7. MongoDB Writer Operations

Connect to MongoDB, insert a sample ExtractionResult, and retrieve database statistics using MongoDBWriter. This demonstrates database integration for storing extracted data.

In [None]:
try:
    from pymongo import MongoClient, errors
    from pymongo.server_api import ServerApi
except ImportError:
    import sys, subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "pymongo[srv]"])
    from pymongo import MongoClient, errors
    from pymongo.server_api import ServerApi
from datetime import datetime

class MongoDBWriter:
    """Persist extraction results to MongoDB.

    Connects using ``config.mongodb_uri`` / ``config.database_name`` and
    writes to four collections: ``institutions``, ``programs``,
    ``raw_documents`` and ``extraction_logs``. Can be used as a context
    manager, in which case the connection is closed on exit.
    """

    def __init__(self):
        """Connect, verify the server is reachable and ensure indexes exist.

        Raises:
            pymongo.errors.PyMongoError: if the server cannot be reached or
                an index cannot be created. The client is closed first.
        """
        self.client = MongoClient(
            config.mongodb_uri,
            server_api=ServerApi('1'),
            serverSelectionTimeoutMS=5000
        )
        try:
            # MongoClient connects lazily; ping forces a round trip so a bad
            # URI or unreachable server fails here rather than on first write.
            self.client.admin.command('ping')
            self.db = self.client[config.database_name]
            self.institutions = self.db.institutions
            self.programs = self.db.programs
            self.raw_documents = self.db.raw_documents
            self.extraction_logs = self.db.extraction_logs
            self._create_indexes()
        except Exception:
            # Do not leak the client (and its background threads) when
            # construction fails half-way.
            self.client.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def _create_indexes(self):
        """Create the lookup indexes used by the pipeline (idempotent)."""
        self.institutions.create_index("name")
        # Unique only among documents that actually carry a string code.
        # The partial filter already excludes documents without the field;
        # MongoDB rejects an index that specifies both `sparse` and
        # `partialFilterExpression`, so `sparse` must not be passed.
        self.institutions.create_index(
            "institution_code", unique=True,
            partialFilterExpression={"institution_code": {"$type": "string"}}
        )
        self.programs.create_index("institution_id")
        self.programs.create_index("name")
        self.programs.create_index("level")

    def write_extraction(self, result: ExtractionResult) -> dict:
        """Insert the institution, its programs, the raw text and a log entry.

        The inserts are independent operations (no transaction): a failure
        part-way leaves the earlier documents in place.

        Args:
            result: Validated extraction result to store.

        Returns:
            dict with ``institution_id``, ``program_ids`` (list) and
            ``raw_document_id``.

        Raises:
            pymongo.errors.PyMongoError: if any insert fails, e.g. a
                duplicate ``institution_code``.
        """
        inst_data = result.institution.model_dump()
        inst_data['inserted_at'] = datetime.now()
        # Leave the key out entirely when there is no code, so the document
        # stays outside the unique partial index.
        if inst_data.get('institution_code') is None:
            inst_data.pop('institution_code', None)
        inst_id = self.institutions.insert_one(inst_data).inserted_id

        prog_ids = []
        for program in result.programs:
            prog_data = program.model_dump()
            prog_data['institution_id'] = inst_id
            prog_data['inserted_at'] = datetime.now()
            prog_ids.append(self.programs.insert_one(prog_data).inserted_id)

        raw_result = self.raw_documents.insert_one({
            'institution_id': inst_id,
            # Only the first 5000 characters of the source text are stored.
            'raw_text': result.raw_text[:5000],
            'source_file': result.source_file,
            'extraction_timestamp': result.extraction_timestamp,
            'inserted_at': datetime.now()
        })

        program_scores = [p.confidence_score for p in result.programs]
        self.extraction_logs.insert_one({
            'institution_id': inst_id,
            'source_file': result.source_file,
            'programs_count': len(result.programs),
            'institution_confidence': result.institution.confidence_score,
            'avg_program_confidence': sum(program_scores) / len(program_scores) if program_scores else 0,
            'timestamp': datetime.now(),
            'status': 'success'
        })
        return {'institution_id': inst_id, 'program_ids': prog_ids, 'raw_document_id': raw_result.inserted_id}

    def get_statistics(self):
        """Return the document count of each pipeline collection."""
        return {
            'institutions': self.institutions.count_documents({}),
            'programs': self.programs.count_documents({}),
            'raw_documents': self.raw_documents.count_documents({}),
            'extraction_logs': self.extraction_logs.count_documents({})
        }

    def close(self):
        """Close the underlying MongoDB client."""
        self.client.close()

# Example usage: store the result validated in the previous section.
# Errors are only printed so the notebook keeps running without a database;
# the connection is closed whether or not the write succeeds.
mongo = None
try:
    mongo = MongoDBWriter()
    inserted = mongo.write_extraction(result)
    print("Inserted IDs:", inserted)
    print("DB stats:", mongo.get_statistics())
except Exception as e:
    print("MongoDB error:", e)
finally:
    if mongo is not None:
        mongo.close()

## 8. Prompt Template Construction

Show how to construct system and user prompts for LLM extraction using the provided templates. This ensures the LLM receives clear, schema-aligned instructions.

In [None]:
# Instructions and output schema sent ahead of every chunk. This string is
# concatenated as-is and never passed through str.format, which is why its
# literal JSON braces need no escaping.
SYSTEM_PROMPT = """You are a structured data extraction engine for educational institutions.\n\nCRITICAL RULES:\n1. Output ONLY valid JSON - no markdown, no code blocks, no explanations\n2. Follow the JSON schema EXACTLY\n3. Do NOT guess or hallucinate information\n4. If data is missing, OMIT the field\n5. Preserve original wording\n6. Assign honest confidence_score (0.0-1.0)\n\nCONFIDENCE GUIDELINES:\n- 1.0: Explicitly stated, clear\n- 0.8-0.9: Clearly stated, minor ambiguity\n- 0.6-0.7: Implied or partially stated\n- 0.4-0.5: Inferred from context\n- 0.0-0.3: Highly uncertain\n\nOUTPUT FORMAT:\n{\n  \"institution\": {\n    \"name\": \"...\",\n    \"description\": \"...\",\n    \"type\": [\"...\"],\n    \"country\": \"Sri Lanka\",\n    \"website\": \"...\",\n    \"confidence_score\": 0.0-1.0\n  },\n  \"programs\": [\n    {\n      \"name\": \"...\",\n      \"description\": \"...\",\n      \"level\": \"...\",\n      \"duration\": {...},\n      \"curriculum_summary\": \"...\",\n      \"confidence_score\": 0.0-1.0\n    }\n  ]\n}\n"""

# Per-chunk part of the prompt; {content} is the only placeholder and is
# filled with str.format.
USER_PROMPT_TEMPLATE = """Extract structured educational data from the following content.\n\nSource content:\n<<<\n{content}\n>>>\n\nReturn ONLY the JSON object. No explanations. No markdown.\n"""

# Example prompt construction: system prompt first, then the formatted user
# part; only the first 400 characters are printed.
sample_content = "Sample university offers a BSc in Computer Science."
prompt = SYSTEM_PROMPT + USER_PROMPT_TEMPLATE.format(content=sample_content)
print(prompt[:400] + "...\n[truncated]")

## 9. Extraction Pipeline Execution

Run the ExtractionPipeline on a sample text file, demonstrating the full ETL process for a single file. This section ties together all previous components.

In [None]:
import traceback
class ExtractionPipeline:
    """End-to-end extraction for a single text file.

    Wires together the LLM client, chunker, validator, confidence scorer
    and, when a database is reachable, the MongoDB writer.
    """

    def __init__(self):
        self.ollama = OllamaClient()
        self.chunker = TextChunker()
        self.validator = DataValidator()
        self.scorer = ConfidenceScorer()
        try:
            self.mongo = MongoDBWriter()
        except Exception as e:
            # The pipeline still works without a database; results are
            # simply not persisted.
            logger.warning(f"MongoDB not available: {e}")
            self.mongo = None

    def process_file(self, file_path, save_to_db=True):
        """Extract, validate and optionally store the data found in *file_path*.

        Args:
            file_path: Path of a UTF-8 text file.
            save_to_db: Write the result to MongoDB when a connection exists.

        Returns:
            The validated ``ExtractionResult``, or ``None`` when no
            institution could be extracted or any step failed (the error
            is logged, never raised).
        """
        logger.info(f"Processing: {file_path}")
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                text = f.read()
            text = self.chunker.clean_text(text)
            institution_data, all_programs = self._extract_from_chunks(self.chunker.chunk_text(text))
            if not institution_data:
                logger.error("No institution data extracted")
                return None
            merged_data = {'institution': institution_data, 'programs': all_programs, 'raw_text': text[:5000]}
            result = self.validator.validate_extraction(merged_data, str(file_path))
            result = self.scorer.normalize_confidence(result)
            # NOTE(review): the outcome of the confidence check is only
            # logged; a low-confidence result is still saved and returned.
            # Confirm that this is intended.
            self.validator.validate_confidence(result)
            if save_to_db and self.mongo:
                inserted = self.mongo.write_extraction(result)
                logger.info(f"Saved to MongoDB: {inserted['institution_id']}")
            return result
        except Exception as e:
            # Top-level boundary for one file: log with traceback and report
            # failure to the caller instead of aborting a batch run.
            logger.exception(f"Pipeline failed: {e}")
            return None

    def _extract_from_chunks(self, chunks):
        """Query the LLM once per chunk and merge the answers.

        Returns:
            tuple: ``(institution_data, programs)`` where ``institution_data``
            is the first non-empty institution object returned (or ``None``)
            and ``programs`` is the concatenation of all program lists.
        """
        institution_data = None
        all_programs = []
        for chunk in chunks:
            prompt = SYSTEM_PROMPT + USER_PROMPT_TEMPLATE.format(content=chunk)
            try:
                response = self.ollama.call(prompt)
            except Exception as e:
                logger.error(f"LLM call failed: {e}")
                continue
            # A malformed answer for one chunk must not sink the whole file,
            # so anything that is not the expected shape is skipped.
            if not isinstance(response, dict):
                logger.warning(f"Ignoring non-object LLM response of type {type(response).__name__}")
                continue
            candidate = response.get('institution')
            if not institution_data and isinstance(candidate, dict) and candidate:
                institution_data = candidate
            programs = response.get('programs')
            if isinstance(programs, list):
                all_programs.extend(programs)
        return institution_data, all_programs

    def close(self):
        """Close the MongoDB connection, if one was opened."""
        if self.mongo is not None:
            self.mongo.close()
            self.mongo = None

# Example usage (replace 'sample.txt' with a real file path)
# pipeline = ExtractionPipeline()
# result = pipeline.process_file(config.input_dir / 'sample.txt')

## 10. Batch Processing of Files

Use BatchProcessor to process multiple files with retry logic, and display a summary of results. This enables robust, large-scale ETL runs.

In [None]:
import time
class BatchProcessor:
    """Run a pipeline over many files with retries and progress logging.

    ``results`` maps ``'success'`` / ``'failed'`` to lists of file paths and
    accumulates across calls to ``process_all``.
    """

    def __init__(self, pipeline):
        self.pipeline = pipeline
        self.results = {'success': [], 'failed': []}

    def process_all(self, files, save_to_db=True):
        """Process every file in *files* and log a summary.

        Args:
            files: Iterable of file paths (any iterable, not only a list).
            save_to_db: Forwarded to ``pipeline.process_file``.

        Returns:
            dict: ``self.results`` after this run.
        """
        files = list(files)
        total = len(files)
        logger.info(f"BATCH PROCESSING: {total} files")
        start_time = time.time()
        # Counted per run: self.results accumulates across runs, so using it
        # for the progress rate would be wrong from the second run onwards.
        run_success = 0
        for idx, file_path in enumerate(files, 1):
            logger.info(f"[{idx}/{total}] Processing: {file_path}")
            result = self._process_with_retry(file_path, save_to_db)
            if result:
                run_success += 1
                self.results['success'].append(str(file_path))
                logger.info("Success")
            else:
                self.results['failed'].append(str(file_path))
                logger.info("Failed")
            success_rate = run_success / idx * 100
            logger.info(f"Progress: {idx}/{total} ({success_rate:.1f}% success)")
        self._print_summary(time.time() - start_time)
        return self.results

    def _process_with_retry(self, file_path, save_to_db):
        """Try a file up to ``config.max_retries + 1`` times.

        Returns the first truthy pipeline result, or ``None`` when every
        attempt failed. Waits ``config.retry_delay`` seconds between attempts.
        """
        for attempt in range(config.max_retries + 1):
            retries_left = attempt < config.max_retries
            try:
                result = self.pipeline.process_file(file_path, save_to_db)
            except Exception as e:
                logger.error(f"Attempt {attempt + 1} failed: {e}")
            else:
                if result:
                    return result
                if retries_left:
                    logger.warning(f"Retry {attempt + 1}/{config.max_retries}")
            if retries_left:
                time.sleep(config.retry_delay)
        return None

    def _print_summary(self, elapsed):
        """Log totals for all recorded results and the elapsed time of this run."""
        success_count = len(self.results['success'])
        failed = self.results['failed']
        total = success_count + len(failed)
        logger.info(f"BATCH PROCESSING COMPLETE\nSummary: Total: {total}, Success: {success_count}, Failed: {len(failed)}, Time: {elapsed:.1f}s")
        if failed:
            logger.info("Failed files: " + ", ".join(failed))

# Example usage (uncomment to run on all .txt files)
# pipeline = ExtractionPipeline()
# files = list(config.input_dir.glob('*.txt'))
# batch = BatchProcessor(pipeline)
# batch.process_all(files)

## 11. Main Pipeline Execution Example

This cell defines `main()`, which checks the Ollama connection, collects the input files, and processes a single test file; batch processing of all files can be enabled by uncommenting one line. It demonstrates a full pipeline run from start to finish.

In [None]:
def main():
    """Run the pipeline on one test file from ``config.input_dir``.

    Aborts early (with an error log) when the Ollama server is unreachable
    or no ``*.txt`` input file exists. The MongoDB connection opened by the
    pipeline is closed before returning, whatever the outcome.
    """
    logger.info("Educational Data ETL Pipeline Started")
    ollama = OllamaClient()
    if not ollama.test_connection():
        logger.error("Cannot connect to Ollama. Please start the server: ollama serve")
        return
    logger.info("Ollama connected")
    pipeline = ExtractionPipeline()
    try:
        # Sorted so that "the first file" is the same on every run.
        files = sorted(config.input_dir.glob("*.txt"))
        logger.info(f"Found {len(files)} files to process")
        if not files:
            logger.error(f"No files found in {config.input_dir}")
            return
        logger.info("Processing single file for testing...")
        test_result = pipeline.process_file(files[0], save_to_db=True)
        if test_result:
            logger.info(f"Test successful! Institution: {test_result.institution.name}, Programs: {len(test_result.programs)}")
        # To process all files, uncomment:
        # BatchProcessor(pipeline).process_all(files, save_to_db=True)
        logger.info("Pipeline execution completed")
    finally:
        # The pipeline owns a MongoDB client (None when the DB was unreachable).
        mongo = getattr(pipeline, "mongo", None)
        if mongo is not None:
            mongo.close()

# Example usage (uncomment to run)
# main()