### Uncomment and install the following  

In [29]:
# !pip install transformers
# !pip install torch torchvision torchaudio
# !pip install accelerate==0.26.0
# !pip install openai

# !pip install -U langchain langchain-openai
# !pip install pdf2image
# !pip install psycopg2
# !pip install protobuf
# !pip install sentencepiece
# !pip install tiktoken==0.1.1
# !pip install python-dotenv

In [4]:
import os
import torch
from typing import List, Dict, Tuple
from transformers import AutoModelForSequenceClassification
import json
import logging
from dataclasses import dataclass
from pathlib import Path
# from poppler import Poppler
from PIL import Image
import base64
from pdf2image import convert_from_path
from langchain_openai import ChatOpenAI
import tempfile



  from .autonotebook import tqdm as notebook_tqdm


**Input your own API keys. Note: the OpenAI API key is required.**

In [None]:
# Credentials and tracing settings read by LangChain / OpenAI.
# Fill in your own values between the quotes before running the notebook.
os.environ.update({
    'LANGCHAIN_TRACING_V2': '',
    'LANGCHAIN_ENDPOINT': '',
    'LANGCHAIN_API_KEY': '',
    "OPENAI_API_KEY": '',
})


**Automated Concept-to-Standard Mapping with NLI Models**
This notebook demonstrates an approach to automatically map educational concepts to relevant Common Core Standards using advanced Natural Language Inference (NLI) techniques. The workflow is designed to process annotated algebra textbooks and evaluate their alignment with Common Core standards in a robust and automated manner.

***Key Components:***

1. Concept Extraction: OpenAI's Vision Model (GPT-4o) is utilized to extract key concepts from textbook chapters.
2. Cluster Alignment: Extracted concepts are initially compared to broader Common Core clusters using the NLI model. Concepts meeting a threshold similarity are further analyzed.
3. Standard Evaluation: For clusters that surpass the threshold, the standards within them are compared against the extracted concepts to identify the most relevant standards.
4. Evaluation Against Ground Truth: The identified standards are compared to the textbook’s ground-truth annotations, enabling performance evaluation of the model.

***Resources***

* Link to the annotated textbook: https://flexbooks.ck12.org/cbook/ck-12-algebra-i-concepts/
* The PDF of the textbook can be found in the Data folder.



**PDFProcessor Class**
The PDFProcessor class handles the conversion of PDF documents into images and their optimization for further processing.

***Key Attributes:***

1. dpi:

 * The resolution for converting PDF pages to images. Default is 200 DPI.
 * Higher DPI ensures better image quality but increases processing time and memory usage.

2. logger:

 * A logger to record the processing progress and errors.

***Key Methods:***

1. pdf_to_images(pdf_path: str) -> List[str]:
   * Converts each page of a PDF into a JPEG image.
   * Optimizes the images and encodes them in Base64 format for easy transmission or embedding.
   * Steps:
       1. Uses the convert_from_path function to generate images from the PDF.
       2. Each image is optimized using the optimize_image method.
       3. Saves the optimized image to a temporary directory and encodes it to Base64 using the encode_image method.
       4. Logs progress for each processed page.
   * Returns a list of Base64-encoded strings representing the PDF pages.
2. optimize_image(image: Image.Image) -> Image.Image:
   * Resizes the image to fit within a maximum dimension (default: 2000 pixels) while maintaining aspect ratio.
   * Uses the Lanczos resampling filter for high-quality resizing.
3. encode_image(image_path: str) -> str:
   * Converts an image file to a Base64 string, suitable for embedding or sharing.

In [None]:

class PDFProcessor:
    """Convert PDF pages into size-limited, base64-encoded JPEG images.

    Attributes:
        dpi: Resolution passed to ``convert_from_path`` when rendering pages.
        max_dimension: Upper bound, in pixels, for the longer side of a page
            image; larger pages are scaled down proportionally.
        logger: Logger used for progress and error messages.
    """

    def __init__(self, dpi: int = 200, max_dimension: int = 2000):
        """Store the rendering settings and set up logging.

        Args:
            dpi: Resolution used when converting PDF pages to images.
            max_dimension: Largest allowed width/height of an optimized image.
        """
        self.dpi = dpi
        self.max_dimension = max_dimension
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def pdf_to_images(self, pdf_path: str) -> List[str]:
        """Convert PDF pages to base64-encoded images.

        Args:
            pdf_path: Path of the PDF file to convert.

        Returns:
            One base64 string (JPEG data) per page, in page order.

        Raises:
            Exception: Any error raised during conversion is logged and re-raised.
        """
        try:
            self.logger.info(f"Converting PDF: {pdf_path}")
            images = convert_from_path(
                pdf_path,
                dpi=self.dpi,
                fmt='JPEG',
                thread_count=4
            )
            total_pages = len(images)

            base64_images = []

            # Pages are written to a throw-away directory so they can be
            # re-read as JPEG bytes; the directory is removed on exit.
            with tempfile.TemporaryDirectory() as temp_dir:
                for i, image in enumerate(images):
                    image = self.optimize_image(image)
                    temp_path = os.path.join(temp_dir, f'page_{i}.jpg')
                    image.save(temp_path, 'JPEG', quality=85)
                    base64_images.append(self.encode_image(temp_path))
                    self.logger.info(f"Processed page {i+1}/{total_pages}")

            return base64_images

        except Exception as e:
            self.logger.error(f"Error processing PDF: {str(e)}")
            raise

    # The PIL annotations are quoted so they are not evaluated when the class
    # is defined; the runtime behaviour is unchanged.
    def optimize_image(self, image: "Image.Image", max_dimension: int = None) -> "Image.Image":
        """Shrink an image so its longer side fits within a pixel limit.

        Args:
            image: Image to optimize.
            max_dimension: Optional per-call limit; defaults to
                ``self.max_dimension``.

        Returns:
            The original image when it already fits (or has no pixels),
            otherwise a Lanczos-resampled copy with the same aspect ratio.
        """
        limit = self.max_dimension if max_dimension is None else max_dimension
        longest_side = max(image.size[0], image.size[1])

        # Nothing to do for images that already fit; this also avoids a
        # division by zero for an empty (0 x 0) image.
        if longest_side <= limit:
            return image

        ratio = limit / longest_side
        # Clamp to 1 px so extreme aspect ratios never produce a zero dimension.
        new_size = tuple(max(1, int(dim * ratio)) for dim in image.size)
        return image.resize(new_size, Image.Resampling.LANCZOS)

    @staticmethod
    def encode_image(image_path: str) -> str:
        """Read a file and return its contents as a base64 (UTF-8) string."""
        with open(image_path, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode('utf-8')


**ConceptAnalyzer Class**
The ConceptAnalyzer class uses OpenAI’s language models to extract key concepts from documents and process metadata.

***Key Attributes:***

1. pdf_processor:
    * An instance of the PDFProcessor class to handle PDF-to-image conversion.
2. model:
    * The OpenAI language model (e.g., gpt-4o-mini) for analyzing the extracted images and metadata.
3. logger:
    * A logger to record progress and handle errors during concept analysis.
    
***Key Methods:***

1. analyze_document(pdf_path: str, metadata: Dict = None) -> Dict:
    * The main method to process a PDF document and extract key concepts.
    * Steps:
           1. Metadata Handling: Initializes metadata (e.g., grade, domain, file name) if not provided.
           2. PDF Conversion: Uses PDFProcessor.pdf_to_images to convert the PDF into Base64-encoded images.
           3. Concept Extraction: Calls _process_concepts to send the images and metadata to the OpenAI model for concept extraction.
           4. Result Saving: Saves the analysis result as a JSON file in a results directory next to the PDF.
    * Returns the analysis result as a dictionary.
2. _process_concepts(base64_images: List[str], metadata: Dict) -> Dict:
    * Prepares a request for the OpenAI model, including metadata and the Base64-encoded images.
    * Sends the request to the model and parses the response into a structured JSON format.
    * Handles possible errors in the response (e.g., invalid JSON) and attempts to clean and parse it.
3. _get_output_path(pdf_path: str) -> str:
    * Constructs a path for saving the analysis results. Results are saved as a JSON file in a subdirectory named results.
4. get_analysis_result(pdf_path: str) -> Dict:
    * Retrieves previously saved analysis results, if available.

In [None]:

class ConceptAnalyzer:
    """Extract math concepts from a PDF by sending its page images to a chat model.

    Attributes:
        pdf_processor: ``PDFProcessor`` used to turn the PDF into base64 images.
        model: ``ChatOpenAI`` client that receives the prompt and images.
        logger: Logger used for progress and error messages.
    """

    def __init__(self, openai_api_key: str, model_name: str = "gpt-4o-mini", dpi: int = 200):
        """Create the PDF processor and the chat model client.

        Args:
            openai_api_key: API key handed to ``ChatOpenAI``.
            model_name: Name of the chat model to use.
            dpi: Resolution used when rendering PDF pages.
        """
        self.pdf_processor = PDFProcessor(dpi=dpi)
        self.model = ChatOpenAI(
            model=model_name,
            max_tokens=4096,
            temperature=0,
            openai_api_key=openai_api_key
        )
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def analyze_document(self, pdf_path: str, metadata: Dict = None) -> Dict:
        """Analyze concepts in a PDF document and save the result as JSON.

        Args:
            pdf_path: Path of the PDF to analyze.
            metadata: Optional dict with ``grade``, ``domain`` and
                ``file_name``. Missing keys fall back to ``'unknown'`` /
                the PDF's base name, so a partial dict is accepted.

        Returns:
            The parsed model response (see ``_process_concepts``).

        Raises:
            Exception: Any error is logged and re-raised.
        """
        try:
            # Merge caller metadata over the defaults so the prompt never
            # hits a missing key.
            defaults = {
                'grade': 'unknown',
                'domain': 'unknown',
                'file_name': os.path.basename(pdf_path)
            }
            metadata = {**defaults, **(metadata or {})}

            self.logger.info(
                f"Starting analysis for document: {metadata['file_name']}")

            # Convert PDF to images
            base64_images = self.pdf_processor.pdf_to_images(pdf_path)

            # Process concepts
            concepts_result = self._process_concepts(base64_images, metadata)

            # Save results
            output_path = self._get_output_path(pdf_path)
            os.makedirs(os.path.dirname(output_path), exist_ok=True)

            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(concepts_result, f, indent=2)

            self.logger.info(f"Analysis completed and saved to: {output_path}")
            return concepts_result

        except Exception as e:
            self.logger.error(f"Error analyzing document: {str(e)}")
            raise

    def _process_concepts(self, base64_images: List[str], metadata: Dict) -> Dict:
        """Send the prompt plus page images to the model and parse its reply.

        Args:
            base64_images: Base64-encoded JPEG pages.
            metadata: Dict providing ``grade``, ``domain`` and ``file_name``
                for the prompt.

        Returns:
            The parsed JSON reply, or a dict with ``error``, an empty
            ``concepts`` list and ``raw_response`` when parsing fails.
        """
        content = [
            {
                "type": "text",
                "text": f"""
You are an advanced AI assistant tasked with extracting key math concepts from a document. The document was
uploaded with the following metadata:
1. **Grade level:** {metadata['grade']}
2. **Domain/Module:** {metadata['domain']}
3. **File name:** {metadata['file_name']}

Your goals are:
1. Identify the **concepts** covered in the document (e.g., algebra, calculus, geometry).
2. For each concept, extract its name and provide a concise description based on the content of the document.
3. Optionally infer the significance of the grade level, domain/module, or file name if it helps clarify the
concepts or content.

### Output Schema:
{{
"concepts": [
  {{
    "name": "string (name of the concept)",
    "description": "string (description of the concept)"
  }}
]
}}

Note, make sure you return valid json format with no escape problems"""
            }
        ]

        # Add images to content
        for base64_image in base64_images:
            content.append({
                "type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}
            })

        # Get model response
        message = [{"role": "user", "content": content}]
        response = self.model.invoke(message)

        return self._parse_model_response(response.content)

    def _parse_model_response(self, raw_content: str) -> Dict:
        """Parse the model reply as JSON, tolerating fences and stray text.

        Strategy, from least to most invasive:
          1. parse the reply as-is;
          2. strip markdown code fences, cut out the outermost ``{...}`` and
             parse that unchanged (keeps valid escapes such as ``\\"``);
          3. only if that fails, drop backslash escapes and try once more.

        Returns:
            The parsed JSON, or a structured error dict when nothing parses.
        """
        try:
            # First attempt: direct JSON parsing
            return json.loads(raw_content)
        except json.JSONDecodeError:
            pass

        try:
            # Second attempt: remove markdown code block indicators
            cleaned_content = raw_content.strip()
            cleaned_content = cleaned_content.replace(
                '```json', '').replace('```', '')

            # Find the JSON object boundaries
            start_idx = cleaned_content.find('{')
            end_idx = cleaned_content.rfind('}') + 1

            if start_idx == -1 or end_idx <= start_idx:
                self.logger.error(
                    f"Could not find valid JSON in response: {cleaned_content}")
                raise ValueError("No valid JSON found in response")

            json_str = cleaned_content[start_idx:end_idx]

            try:
                # Parse the extracted object untouched first: removing
                # escapes from well-formed JSON would corrupt it.
                return json.loads(json_str)
            except json.JSONDecodeError:
                # Last resort: replace escaped quotes and strip backslashes
                json_str = json_str.replace('\\"', '"')
                json_str = json_str.replace('\\n', ' ')
                json_str = json_str.replace('\\', '')
                return json.loads(json_str)

        except Exception as e:
            self.logger.error(
                f"Error parsing response: {str(e)}\nResponse content: {raw_content}")
            # Return a structured error response
            return {
                "error": "Failed to parse response",
                "concepts": [],
                "raw_response": raw_content
            }

    def _get_output_path(self, pdf_path: str) -> str:
        """Return ``<pdf dir>/results/<pdf stem>_concepts.json`` for a PDF path."""
        base_path = Path(pdf_path)
        return str(base_path.parent/"results" / f"{base_path.stem}_concepts.json")

    def get_analysis_result(self, pdf_path: str) -> Dict:
        """Load a previously saved analysis for ``pdf_path``.

        Returns:
            The saved JSON content, or ``None`` when no result file exists.
        """
        output_path = self._get_output_path(pdf_path)
        if os.path.exists(output_path):
            with open(output_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        return None

In [None]:
@dataclass
class NLIResult:
    """Class probabilities from one NLI comparison, plus the two compared texts."""
    # Probability assigned to each of the three NLI classes.
    entailment: float
    neutral: float
    contradiction: float
    # The compared texts; NLIProcessor.process_pair stores the premise in
    # text1 and the hypothesis in text2.
    text1: str
    text2: str

    @property
    def max_score(self) -> float:
        """Return the entailment probability.

        Despite the name, this is not the maximum over the three classes:
        it is the entailment score, which callers in this notebook compare
        against their threshold and use as the sort key.
        """
        return self.entailment


**NLIProcessor Class**
The NLIProcessor class is a utility designed to leverage a pre-trained Natural Language Inference (NLI) model to evaluate the semantic relationship between two text inputs. It specifically helps map educational concepts to relevant clusters or standards within the context of Common Core alignment.

***Key Attributes:***

**model_name:**
The name of the pre-trained NLI model used. By default, it uses "ynie/xlnet-large-cased-snli_mnli_fever_anli_R1_R2_R3-nli", a large, fine-tuned model for various NLI tasks.

**max_length:**
Specifies the maximum token length for inputs. Long texts are truncated to fit this limit.

**threshold:**
A configurable similarity threshold (default: 0.7). Only pairs whose entailment score meets or exceeds this threshold are considered relevant.

**tokenizer and model:**
The tokenizer and model are initialized using Hugging Face's transformers library to process and infer relationships between input pairs.

***device:***
Automatically uses a GPU if available, otherwise falls back to CPU, ensuring optimal performance.

***Key Methods:***

***process_pair***(premise: str, hypothesis: str) -> NLIResult:
* Compares a single "premise" (e.g., a concept description) with a "hypothesis" (e.g., a cluster or standard).
* Tokenizes the input pair, runs it through the NLI model, and calculates probabilities for three categories:
 * Entailment: How strongly the hypothesis follows from the premise.
 * Neutral: No strong relationship between the premise and hypothesis.
 * Contradiction: The hypothesis contradicts the premise.
* Outputs an NLIResult object containing these probabilities and the original texts.

***process_concept_against_clusters***(concept_description: str, clusters: List[Cluster]) -> List[Tuple[Cluster, NLIResult]]:

* Compares a concept description against a list of clusters.
* For each cluster, it evaluates the semantic similarity (entailment score) between the concept and the cluster’s description (cluster.clustername).
* Filters and sorts results based on the threshold and alignment strength.

***process_concept_against_standards***(concept_description: str, standards: List[Standard]) -> List[Tuple[Standard, NLIResult]]:
* Similar to process_concept_against_clusters, but compares the concept description against individual standards instead of clusters.
* Evaluates the semantic alignment with each standard’s description (standard.standarddescription).

In [4]:
from transformers import XLNetTokenizer
class NLIProcessor:
    """Score premise/hypothesis pairs with a pre-trained NLI model.

    Attributes:
        model_name: Hugging Face identifier of the NLI checkpoint.
        max_length: Token length inputs are truncated/padded to.
        threshold: Minimum entailment score for a pair to count as a match.
        tokenizer, model: Tokenizer and sequence-classification model.
        device: ``cuda`` when available, otherwise ``cpu``.
    """

    def __init__(self,
                 model_name: str = "ynie/xlnet-large-cased-snli_mnli_fever_anli_R1_R2_R3-nli",
                 threshold: float = 0.7,
                 max_length: int = 256):
        """Load the tokenizer and model and move the model to the device.

        Args:
            model_name: Checkpoint to load.
            threshold: Minimum entailment score for a match.
            max_length: Maximum token length of an encoded pair.
        """
        self.model_name = model_name
        self.max_length = max_length
        self.threshold = threshold  # Configurable threshold for relationship strength

        # Initialize logger
        self.logger = logging.getLogger(__name__)

        # Load model and tokenizer
        self.logger.info(f"Loading NLI model: {model_name}")
        self.tokenizer = XLNetTokenizer.from_pretrained(
            model_name,
            use_fast=True,
            model_max_length=self.max_length
        )
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
        # Move model to GPU if available
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model.to(self.device)
        self.model.eval()  # Set to evaluation mode

    def process_pair(self, premise: str, hypothesis: str) -> 'NLIResult':
        """Process a single premise-hypothesis pair.

        Args:
            premise: Text treated as the premise.
            hypothesis: Text treated as the hypothesis.

        Returns:
            An ``NLIResult`` with the softmax probabilities and both texts.

        Raises:
            Exception: Any error is logged and re-raised.
        """
        try:
            # Tokenize
            inputs = self.tokenizer.encode_plus(
                premise,
                hypothesis,
                max_length=self.max_length,
                return_token_type_ids=True,
                truncation=True,
                padding='max_length',
                return_tensors='pt'
            )

            # Move inputs to same device as model
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            # Get predictions
            with torch.no_grad():
                outputs = self.model(**inputs)
                probabilities = torch.softmax(outputs.logits, dim=1)[0]

            # NOTE(review): indices 0/1/2 are read as entailment / neutral /
            # contradiction. Confirm this label order against the model's
            # config before switching to another checkpoint.
            return NLIResult(
                entailment=probabilities[0].item(),
                neutral=probabilities[1].item(),
                contradiction=probabilities[2].item(),
                text1=premise,
                text2=hypothesis
            )

        except Exception as e:
            self.logger.error(f"Error processing NLI pair: {str(e)}")
            raise

    def _rank_matches(self, concept_description: str, candidates, text_of) -> list:
        """Score every candidate and keep those at or above the threshold.

        Args:
            concept_description: Premise text.
            candidates: Objects to compare against.
            text_of: Callable returning the hypothesis text of a candidate.

        Returns:
            ``(candidate, NLIResult)`` tuples sorted by descending score.
        """
        results = []
        for candidate in candidates:
            result = self.process_pair(concept_description, text_of(candidate))
            # Logged at debug level instead of printed so large runs do not
            # flood the notebook output.
            self.logger.debug("NLI result: %s", result)
            if result.max_score >= self.threshold:
                results.append((candidate, result))

        return sorted(results, key=lambda x: x[1].max_score, reverse=True)

    def process_concept_against_clusters(self,
                                      concept_description: str,
                                      clusters: List['Cluster']) -> List[Tuple['Cluster', 'NLIResult']]:
        """Process a concept description against multiple clusters.

        Each cluster's ``clustername`` is used as the hypothesis.
        """
        return self._rank_matches(
            concept_description, clusters, lambda cluster: cluster.clustername)

    def process_concept_against_standards(self,
                                       concept_description: str,
                                       standards: List['Standard']) -> List[Tuple['Standard', 'NLIResult']]:
        """Process a concept description against multiple standards.

        Each standard's ``standarddescription`` is used as the hypothesis.
        """
        return self._rank_matches(
            concept_description, standards, lambda standard: standard.standarddescription)


**DataProcessor Class**  
The DataProcessor class manages and verifies PDF files required for processing, ensuring all necessary files are present and providing metadata about them.

***Key Attributes:***

1. `base_dir`:
    * The base directory containing PDF files. Default: `"Data"`.

***Key Methods:***

1. `get_chapter_files() -> List[Path]`:
    * Retrieves all chapter files matching the pattern `"chapter *.pdf"`.
    * Sorts the files numerically by chapter number.

2. `get_test_file() -> Path`:
    * Retrieves the test file (`test_file.pdf`) from the base directory.
    * Raises a `FileNotFoundError` if the file does not exist.

3. `verify_files() -> Tuple[bool, str]`:
    * Ensures all chapters (1–12) and the test file are present.
    * Returns:
        - `True, "All files verified successfully"` if all files are valid.
        - `False, "Missing chapters: [...]"` if any chapters are missing.

4. `get_file_info() -> Dict`:
    * Returns metadata for all relevant files:
        - Test file: Path, size, last modified timestamp.
        - Chapters: Number, path, size, and last modified timestamp for each file.


In [6]:
import os
from pathlib import Path
from typing import Dict, List, Tuple

class DataProcessor:
    """Locate and describe the chapter PDFs and the test PDF in a directory."""

    def __init__(self, base_dir: str = "Data"):
        """
        Initialize the DataProcessor with the base directory path.

        Args:
            base_dir (str): Base directory containing the PDF files
        """
        # Honour the caller's directory; previously the argument was ignored
        # and 'Data' was always used.
        self.base_dir = Path(base_dir)

    @staticmethod
    def _chapter_number(path: Path):
        """Return the number in a ``chapter <n>.pdf`` name, or None if absent."""
        parts = path.stem.split()
        try:
            return int(parts[1])
        except (IndexError, ValueError):
            return None

    def get_chapter_files(self) -> List[Path]:
        """Get all chapter PDF files sorted numerically.

        Files that match ``chapter *.pdf`` but carry no chapter number
        (e.g. ``chapter intro.pdf``) are skipped instead of raising.
        """
        numbered = []
        for candidate in self.base_dir.glob("chapter *.pdf"):
            number = self._chapter_number(candidate)
            if number is not None:
                numbered.append((number, candidate))

        # Sort chapters numerically (1, 2, 3... 10, 11, 12 instead of 1, 10, 11...)
        numbered.sort(key=lambda pair: pair[0])
        return [path for _, path in numbered]

    def get_test_file(self) -> Path:
        """Get the test file path.

        Raises:
            FileNotFoundError: If ``test_file.pdf`` is not in the base directory.
        """
        test_file = self.base_dir / "test_file.pdf"
        if not test_file.exists():
            raise FileNotFoundError(f"Test file not found in {self.base_dir}")
        return test_file

    def verify_files(self, expected_chapters: int = 12) -> Tuple[bool, str]:
        """
        Verify that all necessary files exist and are accessible.

        Args:
            expected_chapters (int): Chapters 1..expected_chapters must exist.

        Returns:
            Tuple[bool, str]: (success status, message)
        """
        try:
            chapter_files = self.get_chapter_files()
            # Called for its check only: raises when the test file is missing.
            self.get_test_file()

            found = {self._chapter_number(f) for f in chapter_files}
            missing_chapters = set(range(1, expected_chapters + 1)) - found

            if missing_chapters:
                return False, f"Missing chapters: {sorted(missing_chapters)}"

            return True, "All files verified successfully"

        except Exception as e:
            return False, f"Verification failed: {str(e)}"

    def get_file_info(self) -> Dict:
        """
        Get information about all PDF files in the directory.

        Returns:
            Dict: Dictionary containing file information
        """
        chapter_files = self.get_chapter_files()
        test_file = self.get_test_file()

        def describe(path: Path) -> Dict:
            # One stat() call per file supplies both size and mtime.
            stats = path.stat()
            return {
                "path": str(path),
                "size": stats.st_size,
                "last_modified": stats.st_mtime
            }

        return {
            "test_file": describe(test_file),
            "chapters": [
                {"number": self._chapter_number(f), **describe(f)}
                for f in chapter_files
            ]
        }

In [None]:
# Initialize the processor
processor = DataProcessor()

# Verify all files exist
status, message = processor.verify_files()
print(f"Status: {status}")
print(f"Message: {message}")

# Locate the test file (raises FileNotFoundError when it is missing)
test_file = processor.get_test_file()
print(f"Test file: {test_file}")

# Get all chapter files
chapter_files = processor.get_chapter_files()
print("\nChapter files:")
for chapter in chapter_files:
    print(f"- {chapter}")

# Get detailed file information
file_info = processor.get_file_info()
print("\nFile information:")
# The path shown here is the test file's path, so label it accordingly.
print(f"Test file path: {file_info['test_file']['path']}")
print(f"Number of chapters: {len(file_info['chapters'])}")

Status: True
Message: All files verified successfully
Test file: Data/test_file.pdf

Chapter files:
- Data/chapter 1.pdf
- Data/chapter 2.pdf
- Data/chapter 3.pdf
- Data/chapter 4.pdf
- Data/chapter 5.pdf
- Data/chapter 6.pdf
- Data/chapter 7.pdf
- Data/chapter 8.pdf
- Data/chapter 9.pdf
- Data/chapter 10.pdf
- Data/chapter 11.pdf
- Data/chapter 12.pdf

File information:
Test file path: Data/test_file.pdf
Number of chapters: 12


### **StandardsComparator Class**

The `StandardsComparator` class facilitates mapping educational concepts to clusters and standards using a Natural Language Inference (NLI) model and database queries. It supports comparisons against ground truth for performance evaluation.

---

#### **Key Attributes:**

1. `nli`:
   * An instance of an NLI processor for semantic comparison between concepts, clusters, and standards.
   
2. `query_executor`:
   * A utility to execute SQL queries for retrieving data from the database.

3. `threshold`:
   * The minimum similarity score for a concept-cluster or concept-standard match (default: `0.7`).

4. `logger`:
   * A logger to track progress and handle errors during processing.

---

#### **Key Methods:**

1. **`get_domains_for_grade(grade_id: int) -> List[Domain]`**:
   * Retrieves all domains associated with a specific grade from the database.
   * **Returns:** A list of `Domain` objects.

2. **`get_clusters_for_domain(domain_id: int) -> List[Cluster]`**:
   * Fetches all clusters for a given domain.
   * **Returns:** A list of `Cluster` objects.

3. **`get_standards_for_cluster(cluster_id: int) -> List[Standard]`**:
   * Retrieves all standards for a specific cluster.
   * **Returns:** A list of `Standard` objects.

4. **`match_concepts_to_clusters(concepts: List[Dict], clusters: List[Cluster]) -> List[Tuple[Dict, Cluster, float]]`**:
   * Matches extracted concepts to clusters using NLI.
   * **Steps:**
       - Compares each concept (name + description) to cluster names.
       - Retains matches with a similarity score above the threshold.
       - Sorts results by score in descending order.
   * **Returns:** A list of (concept, cluster, score) tuples.

5. **`match_concepts_to_standards(concepts: List[Dict], standards: List[Standard]) -> List[Tuple[Dict, Standard, float]]`**:
   * Matches extracted concepts to standards using NLI.
   * **Steps:** Similar to `match_concepts_to_clusters`, but matches against standards' descriptions.
   * **Returns:** A list of (concept, standard, score) tuples.

6. **`compare_to_ground_truth(matched_standards: List[Tuple[Dict, Standard, float]], ground_truth_standards: List[int]) -> Dict`**:
   * Compares matched standards to a ground truth set of standard IDs.
   * **Metrics:**
       - Precision: Proportion of correctly matched standards out of all matches.
       - Recall: Proportion of ground truth standards correctly identified.
       - F1 Score: Harmonic mean of precision and recall.
   * **Returns:** A dictionary with metrics (`precision`, `recall`, `f1`) and counts (`true_positives`, `false_positives`, `false_negatives`).

---

#### **Workflow Summary:**
1. Retrieve domains, clusters, and standards for a specific grade using database queries.
2. Match extracted concepts to clusters and standards using semantic similarity (NLI).
3. Filter results by threshold and sort by relevance.
4. Evaluate performance against annotated ground truth standards using precision, recall, and F1 score.

This class integrates data querying, semantic analysis, and evaluation, making it a comprehensive tool for educational concept mapping.


In [6]:


# Update the StandardsComparator methods to use the new query executor


import logging
from typing import List, Dict, Tuple
import pandas as pd
from dataclasses import dataclass

@dataclass
class Standard:
    """One standard; fields follow the columns read in get_standards_for_cluster."""
    standardid: int
    standardcode: str
    standarddescription: str
    clusterid: int
    stadid: int

@dataclass
class Cluster:
    """One cluster; fields follow the columns read in get_clusters_for_domain."""
    clusterid: int
    clustername: str
    clustertype: str
    domainid: int

@dataclass
class Domain:
    """One domain; fields follow the columns read in get_domains_for_grade."""
    domainid: int
    domainname: str
    domain_abb: str
    gradeid: int

class StandardsComparator:
    """Map extracted concepts to clusters/standards and score the matches.

    Attributes:
        nli: Object exposing ``process_pair(premise, hypothesis)`` whose
            result has a ``max_score`` attribute.
        query_executor: Object exposing ``execute_query(query, params)``.
        threshold: Minimum score for a concept/target pair to be kept.
        logger: Logger used for error messages.
    """

    def __init__(self, nli_processor, query_executor, threshold: float = 0.7 ):
        """
        Initialize the standards comparator.
        
        Args:
            nli_processor: NLI model processor instance
            query_executor: Executor used to run the SQL queries
            threshold: Threshold for relationship strength
        """
        self.nli = nli_processor
        self.threshold = threshold
        self.query_executor = query_executor
        self.logger = logging.getLogger(__name__)
    
    def get_domains_for_grade(self, grade_id: int) -> List[Domain]:
        """Get all domains for a given grade.

        Raises:
            Exception: Any query error is logged and re-raised.
        """
        # Column order matches the Domain field order, so row[1] is the name
        # and row[3] the grade (previously these two were swapped).
        query = """
        SELECT domainid, domainname, domain_abb, gradeid
        FROM domains
        where gradeid = %s
        """
        
        try:
            results = self.query_executor.execute_query(query, (grade_id,))
            return [Domain(
                domainid=row[0],
                domainname=row[1],
                domain_abb=row[2],
                gradeid=row[3]
            ) for row in results]
        except Exception as e:
            self.logger.error(f"Error fetching domains: {str(e)}")
            raise

    def get_clusters_for_domain(self, domain_id: int) -> List[Cluster]:
        """Get all clusters for a given domain.

        Raises:
            Exception: Any query error is logged and re-raised.
        """
        query = """
            SELECT clusterid, clustername, clustertype, domainid
            FROM clusters
            WHERE domainid = %s
        """
        try:
            results = self.query_executor.execute_query(query, (domain_id,))
            return [Cluster(
                clusterid=row[0],
                clustername=row[1],
                clustertype=row[2],
                domainid=row[3]
            ) for row in results]
        except Exception as e:
            self.logger.error(f"Error fetching clusters: {str(e)}")
            raise

    def get_standards_for_cluster(self, cluster_id: int) -> List[Standard]:
        """Get all standards for a given cluster.

        Raises:
            Exception: Any query error is logged and re-raised.
        """
        query = """
            SELECT standardid, standardcode, standarddescription, clusterid, stadid
            FROM standards
            WHERE clusterid = %s
        """
        
        try:
            results = self.query_executor.execute_query(query, (cluster_id,))
            return [Standard(
                standardid=row[0],
                standardcode=row[1],
                standarddescription=row[2],
                clusterid=row[3],
                stadid=row[4]
            ) for row in results]
        except Exception as e:
            self.logger.error(f"Error fetching standards: {str(e)}")
            raise

    def _match_concepts(self, concepts: List[Dict], targets: list, text_of) -> list:
        """Score every concept against every target and keep strong matches.

        Args:
            concepts: Concept dicts with 'name' and 'description'.
            targets: Objects to compare against.
            text_of: Callable returning the hypothesis text of a target.

        Returns:
            (concept, target, score) tuples at or above the threshold,
            sorted by descending score.
        """
        matches = []

        for concept in concepts:
            concept_text = f"{concept['name']}: {concept['description']}"

            for target in targets:
                result = self.nli.process_pair(concept_text, text_of(target))

                if result.max_score >= self.threshold:
                    matches.append((concept, target, result.max_score))

        return sorted(matches, key=lambda x: x[2], reverse=True)

    def match_concepts_to_clusters(
        self,
        concepts: List[Dict],
        clusters: List[Cluster]
    ) -> List[Tuple[Dict, Cluster, float]]:
        """
        Match extracted concepts to clusters using NLI.
        
        Args:
            concepts: List of concept dictionaries with 'name' and 'description'
            clusters: List of Cluster objects
            
        Returns:
            List of (concept, cluster, score) tuples above threshold
        """
        return self._match_concepts(
            concepts, clusters, lambda cluster: cluster.clustername)

    def match_concepts_to_standards(
        self,
        concepts: List[Dict],
        standards: List[Standard]
    ) -> List[Tuple[Dict, Standard, float]]:
        """
        Match extracted concepts to standards using NLI.
        
        Args:
            concepts: List of concept dictionaries
            standards: List of Standard objects
            
        Returns:
            List of (concept, standard, score) tuples above threshold
        """
        return self._match_concepts(
            concepts, standards, lambda standard: standard.standarddescription)

    def compare_to_ground_truth(
        self,
        matched_standards: List[Tuple[Dict, Standard, float]],
        ground_truth_standards: List[int]
    ) -> Dict:
        """
        Compare matched standards to ground truth standards.
        
        Args:
            matched_standards: List of (concept, standard, score) tuples
            ground_truth_standards: List of correct standard IDs
            
        Returns:
            Dictionary with precision, recall, and F1 metrics
        """
        # Work on sets of IDs so a standard matched by several concepts
        # counts once.
        matched_ids = set(standard.standardid for _, standard, _ in matched_standards)
        ground_truth_ids = set(ground_truth_standards)
        
        true_positives = len(matched_ids.intersection(ground_truth_ids))
        false_positives = len(matched_ids - ground_truth_ids)
        false_negatives = len(ground_truth_ids - matched_ids)
        
        # Each ratio falls back to 0 when its denominator is 0.
        precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
        recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        
        return {
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'true_positives': true_positives,
            'false_positives': false_positives,
            'false_negatives': false_negatives
        }

**DatabaseConnection and DBQueryExecutor Classes**

These classes manage database connections and execute queries, providing a robust interface for database operations in Python.

---

**DatabaseConnection Class**

The `DatabaseConnection` class handles establishing, maintaining, and closing connections to a PostgreSQL database.

***Key Attributes:***
1. `params`:
   * Connection parameters, including database name, user, password, host, and port.
2. `conn`:
   * The active database connection object (`psycopg2.extensions.connection`).
3. `cur`:
   * The active database cursor object (`psycopg2.extensions.cursor`).
4. `logger`:
   * A logger to track connection events and errors.

***Key Methods:***

1. **`connect()`**:
   * Establishes a connection to the database and initializes the cursor.
   * Logs success or raises an error if the connection fails.

2. **`disconnect()`**:
   * Closes the active database connection and cursor.
   * Logs success or raises an error if disconnection fails.

3. **`__enter__()`**:
   * Enables the class to be used as a context manager (`with` statement).
   * Automatically calls `connect()` when entering the block.

4. **`__exit__()`**:
   * Automatically calls `disconnect()` when exiting a `with` block.

---

**DBQueryExecutor Class**

The `DBQueryExecutor` class uses an instance of `DatabaseConnection` to execute SQL queries and retrieve results.

***Key Attributes:***
1. `db`:
   * An instance of `DatabaseConnection` to manage the database connection lifecycle.
2. `logger`:
   * A logger to track query execution events and errors.

***Key Methods:***

1. **`execute_query(query: str, params: tuple = None) -> list`**:
   * Executes a provided SQL query with optional parameters.
   * **Steps**:
       1. Uses the `DatabaseConnection` instance as a context manager to ensure the connection is properly opened and closed.
       2. Executes the query and retrieves the results.
   * **Returns:** A list of rows fetched from the database.
   * Logs success or raises an error if query execution fails.

---

**Workflow Summary:**

1. **DatabaseConnection**:
   - Manages the lifecycle of a database connection.
   - Allows the use of a `with` statement for clean and automatic resource management.

2. **DBQueryExecutor**:
   - Executes SQL queries using the database connection.
   - Fetches and returns query results, ensuring robust error handling.



In [11]:
import psycopg2
from psycopg2 import Error
from typing import Optional
import logging

class DatabaseConnection:
    """Owns the lifecycle of one psycopg2 connection to a PostgreSQL database.

    Can be driven manually with ``connect()`` / ``disconnect()`` or used as a
    context manager. The ``with`` form yields the raw psycopg2 connection,
    commits when the block finishes without an exception, and always
    disconnects afterwards.
    """

    def __init__(self,
                 dbname: str = "capstone_db",
                 user: str = "postgres",
                 password: str = "mysecretpassword",
                 host: str = "localhost",
                 port: str = "5432"):
        """
        Store the connection parameters; no connection is opened here.

        The defaults point at a database on localhost. Pass real credentials
        explicitly (for example from environment variables) instead of editing
        these defaults.

        Args:
            dbname: Database name
            user: Database user
            password: Database password
            host: Database host
            port: Database port
        """
        self.params = {
            "dbname": dbname,
            "user": user,
            "password": password,
            "host": host,
            "port": port
        }
        # Both stay None until connect() succeeds and are reset by disconnect().
        self.conn: Optional[psycopg2.extensions.connection] = None
        self.cur: Optional[psycopg2.extensions.cursor] = None
        self.logger = logging.getLogger(__name__)

    def connect(self):
        """Open the connection and create a cursor on it.

        Raises:
            psycopg2.Error: logged and re-raised when connecting fails.
        """
        try:
            self.conn = psycopg2.connect(**self.params)
            self.cur = self.conn.cursor()
            self.logger.info("Database connection established successfully")
        except Error as e:
            self.logger.error(f"Error connecting to database: {str(e)}")
            raise

    def disconnect(self):
        """Close the cursor and the connection, then forget both.

        Safe to call when nothing is open. The connection is closed even if
        closing the cursor fails, and the stored references are always
        cleared so a later call cannot act on stale handles.

        Raises:
            psycopg2.Error: logged and re-raised when closing fails.
        """
        try:
            try:
                if self.cur is not None:
                    self.cur.close()
            finally:
                # A failing cursor close must not leave the connection open.
                if self.conn is not None:
                    self.conn.close()
            self.logger.info("Database connection closed successfully")
        except Error as e:
            self.logger.error(f"Error closing database connection: {str(e)}")
            raise
        finally:
            self.cur = None
            self.conn = None

    def __enter__(self):
        """Context manager entry point: connect and hand back the connection."""
        self.connect()
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit point: commit on success, then disconnect.

        When the block raised, nothing is committed; closing the connection
        discards the pending transaction. Exceptions from the block are never
        suppressed.
        """
        try:
            if exc_type is None and self.conn is not None:
                self.conn.commit()
        finally:
            # Always release the connection, even if the commit itself failed.
            self.disconnect()
        return False
        
class DBQueryExecutor:
    """Runs SQL statements through a ``DatabaseConnection`` and returns the rows."""

    def __init__(self, db_connection: "DatabaseConnection"):
        """
        Initialize query executor with database connection.

        Args:
            db_connection: DatabaseConnection instance; it is entered as a
                context manager once per executed query.
        """
        self.db = db_connection
        self.logger = logging.getLogger(__name__)

    def execute_query(self, query: str, params: Optional[tuple] = None) -> list:
        """
        Execute a query and return results.

        Args:
            query: SQL query string
            params: Query parameters passed to ``cursor.execute`` (None when
                the query has no placeholders)

        Returns:
            List of fetched rows; an empty list when the statement produces
            no result set.

        Raises:
            psycopg2.Error: logged and re-raised when execution fails.
        """
        try:
            with self.db as conn:
                with conn.cursor() as cur:
                    cur.execute(query, params)
                    # description is None for statements that return no rows;
                    # calling fetchall() in that case would raise.
                    if cur.description is None:
                        return []
                    return cur.fetchall()
        except Error as e:
            self.logger.error(f"Error executing query: {str(e)}")
            raise

**Here I initialize the database connection. For reproducibility, make sure you provide the appropriate database configuration.**

In [12]:
# Initialize database connection.
# Each setting is read from the standard libpq environment variable when it is
# set, so real credentials do not have to be written into the notebook. The
# literals are only fallbacks that keep the previous local setup working.
db = DatabaseConnection(
    dbname=os.getenv("PGDATABASE", "capstone_db"),
    user=os.getenv("PGUSER", "postgres"),
    password=os.getenv("PGPASSWORD", "mysecretpassword"),
    host=os.getenv("PGHOST", "localhost"),
    port=os.getenv("PGPORT", "5432"),
)
query_executor = DBQueryExecutor(db)



### **Code Explanation: Initializing and Using NLIProcessor and StandardsComparator**

This snippet demonstrates how to initialize the necessary components and retrieve domains for specific grades using the `StandardsComparator`.

---

#### **Key Components:**

1. **`nli_processor`**:
   * An instance of the `NLIProcessor` class.
   * Responsible for evaluating semantic relationships between concepts, clusters, and standards using Natural Language Inference (NLI).

2. **`standards_comparator`**:
   * An instance of the `StandardsComparator` class.
   * Combines NLI capabilities (`nli_processor`) with database querying (`query_executor`).
   * Facilitates mapping concepts to standards and clusters while supporting grade-specific filtering.

3. **`domains_to_be_checked`**:
   * A list to store domains retrieved for grades 6 through 12.


In [None]:
# Build the NLI processor and the comparator that pairs it with database access.
nli_processor = NLIProcessor()
standards_comparator = StandardsComparator(nli_processor, query_executor)

# One entry per grade, for grades 6 through 12 (range end is exclusive).
domains_to_be_checked = [
    standards_comparator.get_domains_for_grade(grade_id)
    for grade_id in range(6, 13)
]


INFO:__main__:Loading NLI model: ynie/xlnet-large-cased-snli_mnli_fever_anli_R1_R2_R3-nli
INFO:__main__:Database connection established successfully
INFO:__main__:Database connection closed successfully
INFO:__main__:Database connection established successfully
INFO:__main__:Database connection closed successfully
INFO:__main__:Database connection established successfully
INFO:__main__:Database connection closed successfully
INFO:__main__:Database connection established successfully
INFO:__main__:Database connection closed successfully
INFO:__main__:Database connection established successfully
INFO:__main__:Database connection closed successfully
INFO:__main__:Database connection established successfully
INFO:__main__:Database connection closed successfully
INFO:__main__:Database connection established successfully
INFO:__main__:Database connection closed successfully


### **Code Explanation: Retrieving Clusters for Domains**

This snippet retrieves clusters for all domains across grades 6 to 12 and stores them.

---

1. **Input:**
   - `domains_to_be_checked`: A list of domains grouped by grade (from previous processing).

2. **Process:**
   - Iterates through each grade's domains.
   - For each domain, calls `get_clusters_for_domain` to fetch associated clusters from the database.

3. **Output:**
   - `cluster_matches`: A flat list with one entry per domain (the clusters fetched for that domain), in the order the grades were processed.

---

**Result:**  
Clusters are now retrieved and organized for further analysis.


In [None]:
# Flatten grade -> domain and fetch the clusters of every domain, keeping order.
cluster_matches = [
    standards_comparator.get_clusters_for_domain(grade_domain.domainid)
    for grade_domains in domains_to_be_checked
    for grade_domain in grade_domains
]


### **Workflow: Using DataProcessor and ConceptAnalyzer**

The following demonstrates how to initialize and use the `DataProcessor` and `ConceptAnalyzer` classes to process educational materials, using the chapter files stored in the data location that is already defined in the `DataProcessor` class.

---

**Steps:**

1. **Initialize the Classes**:
   * `DataProcessor` is used to manage and validate PDF files (e.g., chapters).
   * `ConceptAnalyzer` extracts key concepts from the files using OpenAI’s language models.
   ```python
   processor = DataProcessor()
   concept_analyzer = ConceptAnalyzer(openai_api_key=os.getenv('OPENAI_API_KEY'))
   ```


In [None]:

# Initialize both classes
processor = DataProcessor()
concept_analyzer = ConceptAnalyzer(openai_api_key=os.getenv('OPENAI_API_KEY'))

# Process test file
# Use the names defined in this cell so it runs on a fresh kernel.
# NOTE(review): get_chapter_files() is defined outside this cell; the str()
# conversion is kept from the original call - confirm it returns a single path.
test_file = processor.get_chapter_files()
concepts = concept_analyzer.analyze_document(str(test_file))



INFO:__main__:Starting analysis for document: test_file.pdf
INFO:__main__:Converting Chapter 1: test_file.pdf
INFO:__main__:Processed page 1/12 of Chapter 1
INFO:__main__:Processed page 2/12 of Chapter 1
INFO:__main__:Processed page 3/12 of Chapter 1
INFO:__main__:Processed page 4/12 of Chapter 1
INFO:__main__:Processed page 5/12 of Chapter 1
INFO:__main__:Processed page 6/12 of Chapter 1
INFO:__main__:Processed page 7/12 of Chapter 1
INFO:__main__:Processed page 8/12 of Chapter 1
INFO:__main__:Processed page 9/12 of Chapter 1
INFO:__main__:Processed page 10/12 of Chapter 1
INFO:__main__:Processed page 11/12 of Chapter 1
INFO:__main__:Processed page 12/12 of Chapter 1
INFO:__main__:Completed processing for Chapter 1
INFO:__main__:Converting Chapter 2: test_file.pdf
INFO:__main__:Processed page 1/15 of Chapter 2
INFO:__main__:Processed page 2/15 of Chapter 2
INFO:__main__:Processed page 3/15 of Chapter 2
INFO:__main__:Processed page 4/15 of Chapter 2
INFO:__main__:Processed page 5/15 of

### **Code Explanation: Matching Concepts to Clusters**

This snippet matches extracted concepts to clusters using the `StandardsComparator`.

---

1. **Input:**
   - `concepts['concepts']`: A list of extracted concepts.
   - `cluster_matches`: Clusters retrieved for all domains.

2. **Process:**
   - Iterates through each set of clusters.
   - Calls `match_concepts_to_clusters` to compare concepts with clusters using NLI.
   - Appends the matches to `cluster_matches_collector`.

3. **Output:**
   - `cluster_matches_collector`: A list of concept-to-cluster matches, grouped by the original clusters.

---

In [29]:
# Match the extracted concepts against every set of clusters.
# The per-iteration result is never bound to `cluster_matches`, so the input
# list stays intact and this cell can be re-run safely.
cluster_matches_collector = [
    standards_comparator.match_concepts_to_clusters(concepts['concepts'], clusters)
    for clusters in cluster_matches
]

### **Code Explanation: Matching Concepts to Standards**

This snippet refines the matching process by linking concepts to relevant standards within clusters.

---

1. **Input:**
   - `cluster_matches_collector[0]`: The first element of `cluster_matches_collector`, i.e. the (concept, cluster, score) matches produced for the first set of clusters.
   - `standards`: Standards retrieved for each cluster.

2. **Process:**
   - Iterates through the matched concepts and clusters.
   - Retrieves standards for each cluster using `get_standards_for_cluster`.
   - Matches the concept to the standards using `match_concepts_to_standards`.
   - Appends the matches to `all_standard_matches`.

3. **Output:**
   - `all_standard_matches`: A comprehensive list of concept-to-standard matches.


In [45]:
# NOTE(review): only cluster_matches_collector[0] is consumed here - confirm
# that the remaining entries are meant to be skipped.
all_standard_matches = []
for matched_concept, matched_cluster, _match_score in cluster_matches_collector[0]:
    # Look up the standards of the matched cluster, then score this one concept against them.
    cluster_standards = standards_comparator.get_standards_for_cluster(matched_cluster.clusterid)
    all_standard_matches.extend(
        standards_comparator.match_concepts_to_standards([matched_concept], cluster_standards)
    )

INFO:__main__:Database connection established successfully
INFO:__main__:Database connection closed successfully


### **Code Explanation: Evaluating Concept-to-Standard Matches**

This snippet evaluates the performance of the matching process by comparing the predicted matches against the ground truth standards.

---

1. **Input:**
   - `ground_truth_standards`: A list of correct standard codes (from the aforementioned annotated textbook).
   - `all_standard_matches`: Predicted matches between concepts and standards.

2. **Process:**
   - Calls `compare_to_ground_truth` to calculate evaluation metrics:
     - **Precision**: Proportion of correctly matched standards out of all predicted matches.
     - **Recall**: Proportion of ground truth standards correctly identified.
     - **F1 Score**: Harmonic mean of precision and recall.
   - Metrics are stored in the `metrics` dictionary.

3. **Output:**
   - Prints evaluation metrics (precision, recall, F1 score).

---

**Result:**
Performance metrics provide insight into the accuracy and relevance of the concept-to-standard matching process, enabling further optimization or validation.


In [None]:
# Standard codes taken from the annotated textbook. Order and repeated codes
# are kept exactly as annotated.
ground_truth_standards = [
    "6.EE.A.2", "6.EE.A.2", "6.EE.B.6", "6.EE.B.8",
    "6.EE.C.9", "6.EE.A.1", "HS.N-RN.A.1", "8.EE.A.1",
    "8.EE.A.2", "6.EE.A.2", "6.EE.A.2", "6.EE.A.4",
    "HS.A-SSE.A.1", "7.EE.A.1", "7.EE.A.2", "7.EE.B.3",
    "6.EE.B.5", "7.EE.B.4", "HS.F-IF.A.1", "HS.F-IF.A.2",
    "HS.F-IF.B.4", "HS.F-IF.C.8", "HS.F-BF.A.1", "HS.F-BF.A.1",
    "HS.F-BF.A.1", "8.F.A.1", "HS.F-IF.C.9", "HS.A-REI.D.10",
    "HS.F-IF.C.7", "HS.F-IF.C.7", "8.EE.B.5", "HS.F-LE.A.3",
    "HS.F-LE.A.2", "6.SP.B.5.A", "HS.A-CED.A.1", "6.SP.B.5.B",
]

# Score the predicted concept-to-standard matches against the annotations.
metrics = standards_comparator.compare_to_ground_truth(all_standard_matches, ground_truth_standards)

print("Evaluation Metrics:")
# (label shown, key in the metrics dict), printed in this order.
for metric_label, metric_key in (
    ("Precision", "precision"),
    ("Recall", "recall"),
    ("F1 Score", "f1"),
):
    print(f"{metric_label}: {metrics[metric_key]:.2f}")



Evaluation Metrics:
Recall: 0.89
Precision: 0.73
F1 Score: 0.80
