# CFR2SBVR Modules

Supporting modules for chapters 6 and 7 of the dissertation.

## Configuration

In [None]:
!mkdir configuration && touch configuration/__init__.py

In [None]:
%%writefile configuration/main.py

import os
import json
import time
import re
import glob
import yaml
from datetime import datetime
from pathlib import Path

DEFAULT_CONFIG_DIR: str = '../config.yaml'  # Google drive: "/content/drive/MyDrive/cfr2sbvr/config.yaml"

def _get_sorted_file_info(file_dir: str, file_prefix: str, extension: str):
    """
    Helper function to retrieve and sort file information based on a specific prefix and extension.

    Only files named `{file_prefix}-{YYYY-MM-DD}-{N}.{extension}` are considered.
    The prefix and extension are matched literally, so characters that are special
    in regular expressions or glob patterns (e.g. `+`, `(`, `[`) are safe to use.

    Note: the directory is created (including parents) if it does not exist yet.

    Args:
        file_dir (str): Directory to search for files.
        file_prefix (str): Prefix for the filenames.
        extension (str): File extension (without the leading dot).

    Returns:
        list: List of file information dictionaries containing 'filename', 'date', and 'number' keys,
              sorted newest first (by date, then by sequence number).
    """
    path = Path(file_dir)
    path.mkdir(parents=True, exist_ok=True)

    # Escape the caller-supplied parts so they are matched as plain text.
    pattern = re.compile(
        rf'^{re.escape(file_prefix)}-(\d{{4}}-\d{{2}}-\d{{2}})-(\d+)\.{re.escape(extension)}$'
    )

    file_info_list = []
    # Scan the directory directly; the regex alone decides what qualifies.
    for filepath in path.iterdir():
        match = pattern.match(filepath.name)
        if match:
            file_info_list.append({
                'filename': filepath.name,
                'date': match.group(1),
                'number': int(match.group(2)),
            })

    # ISO dates sort correctly as strings; the number is compared numerically.
    return sorted(file_info_list, key=lambda x: (x['date'], x['number']), reverse=True)

def get_next_filename(file_dir: str, file_prefix: str, extension: str) -> str:
    """
    Builds the path of the next file in a dated, numbered sequence.

    Filenames follow `{file_prefix}-{YYYY-MM-DD}-{N}.{extension}`. If the newest
    existing file carries today's date, `N` continues from its number; otherwise
    the sequence restarts at 1 for today.

    Args:
        file_dir (str): Directory holding the sequence of files.
        file_prefix (str): Prefix for the filenames.
        extension (str): File extension (without the leading dot).

    Returns:
        str: `file_dir` joined with the new filename.
    """
    today = datetime.today().strftime('%Y-%m-%d')
    existing = _get_sorted_file_info(file_dir, file_prefix, extension)

    # The helper returns newest-first, so only the head can belong to today.
    newest = existing[0] if existing else None
    sequence = newest['number'] + 1 if newest is not None and newest['date'] == today else 1

    return str(Path(file_dir) / f'{file_prefix}-{today}-{sequence}.{extension}')

def get_last_filename(file_dir: str, file_prefix: str, extension: str) -> str:
    """
    Returns the path of the newest file in a dated, numbered sequence.

    "Newest" means the highest date and, within that date, the highest sequence
    number among files with the given prefix and extension in `file_dir`.

    Returns:
        str: `file_dir` joined with the newest filename, or None when no file matches.
    """
    candidates = _get_sorted_file_info(file_dir, file_prefix, extension)
    if not candidates:
        return None

    newest = candidates[0]
    return str(Path(file_dir) / newest['filename'])

# Load the YAML config file
def load_config(config_file: str = None):
    """
    Loads the YAML configuration and adds the run-specific derived entries.

    Args:
        config_file (str, optional): Path to the YAML file. Defaults to DEFAULT_CONFIG_DIR.

    Returns:
        dict: The parsed configuration, extended with:
            - config["LLM"]["OPENAI_API_KEY"]: the OPENAI_API_KEY environment variable
              when set, otherwise the value from the file (or None);
            - config["DEFAULT_CHECKPOINT_FILE"]: next checkpoint filename in
              DEFAULT_CHECKPOINT_DIR;
            - config["DEFAULT_EXTRACTION_REPORT_FILE"]: next report filename in
              DEFAULT_OUTPUT_DIR.

    Raises:
        FileNotFoundError: If the configuration file does not exist.
        ValueError: If the file is not valid YAML, is not a mapping, or lacks a required key.
    """
    if config_file is None:
        config_file = DEFAULT_CONFIG_DIR
    try:
        with open(config_file, "r", encoding="utf-8") as file:
            config = yaml.safe_load(file)
    except FileNotFoundError as exc:
        raise FileNotFoundError(f"Configuration file {config_file} not found.") from exc
    except yaml.YAMLError as exc:
        raise ValueError(f"Error parsing YAML file {config_file}: {exc}") from exc

    # Ensure config structure is correct. An empty file parses to None, and every
    # key read below must be present, including DEFAULT_OUTPUT_DIR.
    required_keys = ("LLM", "DEFAULT_CHECKPOINT_DIR", "DEFAULT_OUTPUT_DIR")
    if not isinstance(config, dict) or any(key not in config for key in required_keys):
        raise ValueError("Required configuration keys are missing in the config file.")
    if not isinstance(config["LLM"], dict):
        raise ValueError("The 'LLM' configuration entry must be a mapping.")

    # The environment variable takes precedence; the value from the config file
    # is only the fallback.
    config["LLM"]["OPENAI_API_KEY"] = os.getenv(
        "OPENAI_API_KEY", config["LLM"].get("OPENAI_API_KEY")
    )

    # Dynamically set checkpoint and report files using the get_next_filename function
    config["DEFAULT_CHECKPOINT_FILE"] = get_next_filename(
        config["DEFAULT_CHECKPOINT_DIR"], "documents", "json"
    )
    config["DEFAULT_EXTRACTION_REPORT_FILE"] = get_next_filename(
        config["DEFAULT_OUTPUT_DIR"], "extraction_report", "html"
    )

    return config


If running in Google Colab

In [None]:
import sys
# True when the google.colab package has already been loaded into this
# interpreter, which is used here as the signal that the notebook runs in Colab.
IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
  from google.colab import drive
  # Mount Google Drive so the copy below can reach /content/drive/MyDrive.
  drive.mount('/content/drive')
  # Back up the generated `configuration` package to Google Drive.
  !cp -r configuration /content/drive/MyDrive/cfr2sbvr/modules

### Testing

In [None]:
import configuration.main as configuration

# Development mode
import importlib
importlib.reload(configuration)

Load configuration

In [None]:
config = configuration.load_config()

In [None]:
config

## Checkpoint

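Checkpoints are stored in, and retrieved from, the directory given by `DEFAULT_CHECKPOINT_DIR` in the configuration file; the path of the current checkpoint file is exposed as `DEFAULT_CHECKPOINT_FILE` in the loaded configuration.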

In [None]:
!mkdir checkpoint && touch checkpoint/__init__.py

In [None]:
%%writefile checkpoint/main.py

from typing import List, Dict, Optional, Any, Tuple, Set
from pydantic import BaseModel, Field
import logging
import json
from json import JSONDecodeError

# Set up basic logging configuration for the checkpoint module
logging.basicConfig(
    level=logging.INFO,  # Set to INFO or another level as needed
    format='%(asctime)s - %(levelname)s - %(message)s',  # Log format
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

def convert_set_to_list(data: Any) -> Any:
    """
    Recursively converts sets to lists in the data structure.

    Dicts, lists and tuples are walked recursively (tuples are rebuilt as plain
    tuples); every `set` or `frozenset` found at any depth becomes a list, and its
    elements are converted as well. Any other value is returned unchanged.

    The order of the elements of a converted set is the set's iteration order and
    is therefore not guaranteed to be stable between runs.

    Args:
        data (Any): The data structure to process, which can be a dict, list, tuple,
            set, frozenset, or other types.

    Returns:
        Any: The data structure with all sets converted to lists.
    """
    if isinstance(data, dict):
        return {key: convert_set_to_list(value) for key, value in data.items()}
    if isinstance(data, list):
        return [convert_set_to_list(item) for item in data]
    if isinstance(data, tuple):
        # A tuple may hide a set that would otherwise reach the JSON encoder.
        return tuple(convert_set_to_list(item) for item in data)
    if isinstance(data, (set, frozenset)):
        # Set elements can themselves be frozensets or tuples, so recurse.
        return [convert_set_to_list(item) for item in data]
    return data


# Define a model for the Document
class Document(BaseModel):
    """A single stored item; DocumentManager keys it by the pair (id, type)."""
    id: str  # Identifier of the document; unique only in combination with `type`
    type: str  # Category of the document, e.g. "section" or "llm_response"
    content: Any  # Content can be any data type: list, dict, string, etc.

# Define the DocumentManager class
class DocumentManager(BaseModel):
    """
    In-memory collection of Document objects that can be saved to and loaded from a JSON file.

    Documents are keyed by the tuple (id, type), so the same id may exist once per type.
    """
    documents: Dict[Tuple[str, str], Document] = Field(default_factory=dict)  # Keys are tuples (id, type)

    def add_document(self, doc: Document) -> None:
        """
        Adds a document to the manager, replacing any document with the same id and type.

        Args:
            doc (Document): The document to add.
        """
        self.documents[(doc.id, doc.type)] = doc

    def retrieve_document(self, doc_id: str, doc_type: str) -> Optional[Document]:
        """
        Retrieves a document by its id and type.

        Args:
            doc_id (str): The ID of the document.
            doc_type (str): The type of the document.

        Returns:
            Optional[Document]: The retrieved document, or None if not found.
        """
        return self.documents.get((doc_id, doc_type))

    def list_document_ids(self, doc_type: Optional[str] = None) -> List[str]:
        """
        Lists all document ids, optionally filtered by type.

        Args:
            doc_type (Optional[str], optional): The type of documents to list.
                Defaults to None; None or an empty string lists every id.

        Returns:
            List[str]: A list of document ids. Without a filter, an id appears once
                per type it is stored under.
        """
        if doc_type:
            return [doc_id for (doc_id, d_type) in self.documents if d_type == doc_type]
        return [doc_id for (doc_id, _) in self.documents]

    def exclude_document(self, doc_id: str, doc_type: str) -> None:
        """
        Excludes a document by its id and type. Does nothing if it is not present.

        Args:
            doc_id (str): The ID of the document to exclude.
            doc_type (str): The type of the document.
        """
        self.documents.pop((doc_id, doc_type), None)

    def persist_to_file(self, filename: str) -> None:
        """
        Persists the current state to a file, converting tuple keys to strings and sets to lists.

        Each document is written under the key "{id}|{type}".

        Args:
            filename (str): The filename to save the documents.
        """
        # NOTE(review): `.dict()` is deprecated in pydantic v2 in favour of
        # `model_dump()` — confirm which pydantic version this project pins.
        serializable_documents = {
            f"{doc_id}|{doc_type}": convert_set_to_list(doc.dict())
            for (doc_id, doc_type), doc in self.documents.items()
        }
        with open(filename, 'w', encoding='utf-8') as file:
            json.dump(serializable_documents, file, indent=4)

    @classmethod
    def restore_from_file(cls, filename: str) -> 'DocumentManager':
        """
        Restores the state from a file written by `persist_to_file`.

        The (id, type) key of every document is rebuilt from the document's own
        `id` and `type` fields rather than by splitting the "{id}|{type}" string,
        so ids or types that contain "|" are restored correctly.

        Args:
            filename (str): The filename to restore the documents from.

        Returns:
            DocumentManager: The restored DocumentManager instance.
        """
        with open(filename, 'r', encoding='utf-8') as file:
            data = json.load(file)

        documents = {}
        for doc_data in data.values():
            doc = Document(**doc_data)
            documents[(doc.id, doc.type)] = doc
        return cls(documents=documents)

def restore_checkpoint(filename: Optional[str]) -> DocumentManager:
    """
    Restores the document manager from a checkpoint file.

    This never fails because of a missing checkpoint: when `filename` is None, the
    file does not exist, or its content is not valid JSON, the problem is logged and
    an empty DocumentManager is returned instead.

    Args:
        filename (str, optional): The path to the checkpoint file.

    Returns:
        DocumentManager: The restored DocumentManager instance, or a new empty one.

    See Also:
        - Reset the values: delete the documents.json file and run: manager = DocumentManager()
        - Restore the state from the documents.json file, run: DocumentManager.restore_from_file("documents.json")
        - Exclude a document: manager.exclude_document(doc_id="§ 275.0-2", doc_type="section")
        - List documents: manager.list_document_ids(doc_type="section")
        - Get a document: manager.retrieve_document(doc_id=doc, doc_type="section")
    """
    if filename is None:
        # open(None) would raise TypeError, which the handler below does not cover.
        logger.error("No checkpoint file given, initializing new checkpoint.")
        return DocumentManager()

    try:
        restored_docs = DocumentManager.restore_from_file(filename)
    except (FileNotFoundError, JSONDecodeError):
        logger.error(f"Checkpoint file '{filename}' not found or is empty, initializing new checkpoint.")
        return DocumentManager()

    logger.info(f"Checkpoint restored from {filename}.")
    return restored_docs

def save_checkpoint(filename: Optional[str], manager: DocumentManager) -> None:
    """
    Writes the DocumentManager's current state to a checkpoint file.

    A FileNotFoundError raised while writing is logged and swallowed; any other
    exception propagates to the caller.

    Args:
        filename (str, optional): Path of the checkpoint file to write.
        manager (DocumentManager): The DocumentManager instance to save.
    """
    try:
        manager.persist_to_file(filename=filename)
    except FileNotFoundError:
        logger.error("Error saving checkpoint. Check the directory path and permissions.")
    else:
        logger.info("Checkpoint saved.")

def get_all_checkpoints(checkpoint_dir, prefix="documents", extension="json"):
    """
    Restores every checkpoint file found in a directory.

    Only files named `{prefix}-{YYYY-MM-DD}-{N}.{extension}` are considered. The
    directory is created (including parents) if it does not exist yet.

    Args:
        checkpoint_dir (str): Directory containing the checkpoint files.
        prefix (str): Filename prefix of the checkpoints. Defaults to "documents".
        extension (str): File extension without the leading dot. Defaults to "json".

    Returns:
        tuple: `(managers, file_info_list)`, two lists of equal length and matching
            order (newest checkpoint first). `managers` holds the restored
            DocumentManager instances; `file_info_list` holds dictionaries with the
            'filename', 'date', and 'number' of the corresponding file.

    Raises:
        json.JSONDecodeError: If a matching file does not contain valid JSON.
    """
    # Imported locally because this module's import block provides neither name.
    import re
    from pathlib import Path

    path = Path(checkpoint_dir)
    path.mkdir(parents=True, exist_ok=True)

    # Escape the caller-supplied parts so they are matched as plain text.
    pattern = re.compile(
        rf'^{re.escape(prefix)}-(\d{{4}}-\d{{2}}-\d{{2}})-(\d+)\.{re.escape(extension)}$'
    )

    found = []
    for filepath in path.iterdir():
        match = pattern.match(filepath.name)
        if match:
            found.append((match.group(1), int(match.group(2)), filepath))

    # Directory listing order is arbitrary; sort so the result is deterministic.
    found.sort(key=lambda item: (item[0], item[1]), reverse=True)

    managers = []
    file_info_list = []
    for date_str, number, filepath in found:
        logger.info(f"Restoring checkpoint from {filepath}.")
        managers.append(DocumentManager.restore_from_file(str(filepath)))
        file_info_list.append({'filename': filepath.name, 'date': date_str, 'number': number})

    return managers, file_info_list

class DocumentProcessor:
    """
    DocumentProcessor is responsible for processing documents and categorizing elements such as terms, names, facts, and rules.

    On construction it reads every "llm_response" document whose id ends in "_P2"
    (term definitions) and then every one whose id ends in "_P1" (statements and
    the terms they use).

    Attributes:
        manager: Object used to manage document retrieval. It must provide
            `list_document_ids(doc_type=...)` and `retrieve_document(doc_id, doc_type=...)`.
        elements_terms_set (set): Set of unique terms found in the documents.
        elements_names_set (set): Set of unique names found in the documents.
        elements_terms (list): List of detailed information about terms.
        elements_names (list): List of detailed information about names.
        elements_facts (list): List of facts extracted from documents.
        elements_rules (list): List of rules extracted from documents.
        elements_terms_definition (dict): Dictionary to store terms definitions by document ID.
    """

    # Suffixes that mark which kind of LLM response a stored document id refers to.
    _DEFINITIONS_SUFFIX = "_P2"
    _ELEMENTS_SUFFIX = "_P1"

    def __init__(self, manager):
        """
        Initializes the DocumentProcessor instance and processes the documents.

        Args:
            manager: Object used to manage document retrieval.
        """
        self.manager = manager
        self.elements_terms_set = set()
        self.elements_names_set = set()
        self.elements_terms = []
        self.elements_names = []
        self.elements_facts = []
        self.elements_rules = []
        self.elements_terms_definition = {}

        # Definitions must be loaded first: process_elements looks them up.
        self.process_definitions()
        self.process_elements()

    def add_definition(self, doc_id, term, definition):
        """
        Adds a term definition to the elements_terms_definition dictionary.

        Args:
            doc_id (str): Identifier of the document.
            term (str): The term to be defined.
            definition (str): The definition of the term.
        """
        self.elements_terms_definition.setdefault(doc_id, {})[term] = definition

    def process_definitions(self):
        """
        Processes document terms definitions and stores them in elements_terms_definition.

        The document ID under which definitions are stored is the stored id with
        the trailing "_P2" removed.
        """
        suffix = self._DEFINITIONS_SUFFIX
        for doc in self.manager.list_document_ids(doc_type="llm_response"):
            if not doc.endswith(suffix):
                continue
            # Strip only the trailing marker; "_P2" elsewhere in the id is kept.
            doc_id = doc[:-len(suffix)]
            doc_content = self.manager.retrieve_document(doc, doc_type="llm_response").content
            # `or []` also covers an explicit null in the stored response.
            for term in doc_content.get("terms") or []:
                self.add_definition(doc_id, term.get("term"), term.get("definition"))

    def process_elements(self):
        """
        Processes elements from documents and categorizes them into terms, names, facts, and rules.

        Elements classified "Fact" or "Fact Type" become facts and "Operative Rule"
        elements become rules; elements with any other classification are not stored
        as statements, but their terms are still collected. A term classified
        "Common Noun" is recorded as a term; every other term is recorded as a name.
        """
        suffix = self._ELEMENTS_SUFFIX
        for doc in self.manager.list_document_ids(doc_type="llm_response"):
            if not doc.endswith(suffix):
                continue
            doc_content = self.manager.retrieve_document(doc, doc_type="llm_response").content
            doc_id = doc_content.get("section")
            for element in doc_content.get("elements") or []:
                self._process_element(doc_id, element)

    def _process_element(self, doc_id, element):
        """Stores one element as a fact or rule and records the terms and names it uses."""
        element_id = element.get("id")
        # Normalise "terms" once so a null value never reaches the getters.
        element_terms = element.get("terms") or []

        # Responses use either key, and the value may be one string or a list.
        verb_symbols = element.get("verb_symbols") or element.get("verb_symbol")
        if isinstance(verb_symbols, str):
            verb_symbols = [verb_symbols]
        elif verb_symbols is None:
            verb_symbols = []

        element_dict = {
            "doc_id": doc_id,
            "statement_id": element_id,
            "statement": element.get("statement"),
            "source": element.get("source"),
            "terms": element_terms,
            "verb_symbols": verb_symbols
        }

        classification = element.get("classification")
        if classification in ("Fact", "Fact Type"):
            self.elements_facts.append(element_dict)
        elif classification == "Operative Rule":
            self.elements_rules.append(element_dict)

        definitions = self.elements_terms_definition.get(doc_id, {})
        for term in element_terms:
            signifier = term.get("term")
            term_dict = {
                "doc_id": doc_id,
                "signifier": signifier,
                "statement_id": element_id,
                "definition": definitions.get(signifier),
                "source": element.get("source")
            }
            if term.get("classification") == "Common Noun":
                self.elements_terms.append(term_dict)
                self.elements_terms_set.add(signifier)
            else:
                self.elements_names.append(term_dict)
                self.elements_names_set.add(signifier)

    def get_unique_terms(self, doc_id=None):
        """
        Returns the set of unique terms found in the documents. If doc_id is provided,
        returns only the unique terms for that specific document.

        Args:
            doc_id (str, optional): Identifier of the document. Defaults to None.

        Returns:
            set: Set of unique terms.
        """
        if doc_id:
            return {term["signifier"] for term in self.elements_terms if term["doc_id"] == doc_id}
        return self.elements_terms_set

    def get_unique_names(self, doc_id=None):
        """
        Returns the set of unique names found in the documents. If doc_id is provided,
        returns only the unique names for that specific document.

        Args:
            doc_id (str, optional): Identifier of the document. Defaults to None.

        Returns:
            set: Set of unique names.
        """
        if doc_id:
            return {name["signifier"] for name in self.elements_names if name["doc_id"] == doc_id}
        return self.elements_names_set

    @staticmethod
    def _filter_by_definition(entries, definition_filter):
        """Filters term/name entries by whether their "definition" is None."""
        if definition_filter == "non_null":
            return [entry for entry in entries if entry.get("definition") is not None]
        if definition_filter == "null":
            return [entry for entry in entries if entry.get("definition") is None]
        # Any other value, including "all", returns the list itself.
        return entries

    def get_terms(self, definition_filter="all"):
        """
        Returns the list of terms with detailed information, filtered by the presence of a definition.

        Args:
            definition_filter (str): Filter for terms based on definition presence.
                                    "non_null" returns terms with definitions,
                                    "null" returns terms without definitions,
                                    "all" returns all terms regardless of definition.

        Returns:
            list: List of terms.
        """
        return self._filter_by_definition(self.elements_terms, definition_filter)

    def get_names(self, definition_filter="all"):
        """
        Returns the list of names with detailed information, filtered by the presence of a definition.

        Args:
            definition_filter (str): Filter for names based on definition presence.
                                    "non_null" returns names with definitions,
                                    "null" returns names without definitions,
                                    "all" returns all names regardless of definition.

        Returns:
            list: List of names.
        """
        return self._filter_by_definition(self.elements_names, definition_filter)

    def get_facts(self):
        """
        Returns the list of facts extracted from documents.

        Returns:
            list: List of facts.
        """
        return self.elements_facts

    def get_rules(self):
        """
        Returns the list of rules extracted from documents.

        Returns:
            list: List of rules.
        """
        return self.elements_rules

    def get_term_info(self, doc_id, term):
        """
        Retrieves information about a specific term from elements.

        Both terms and names are searched, and the first matching occurrence
        supplies the source and statement id.

        Args:
            doc_id (str): Document identifier.
            term (str): Term to retrieve information for.

        Returns:
            dict or None: A dictionary containing term information if found, otherwise None.
                None is also returned when the term has no (or an empty) definition.
        """
        definition = self.elements_terms_definition.get(doc_id, {}).get(term)
        if not definition:
            return None
        for entries in (self.elements_terms, self.elements_names):
            for term_dict in entries:
                if term_dict["doc_id"] == doc_id and term_dict["signifier"] == term:
                    return {
                        "definition": definition,
                        "source": term_dict["source"],
                        "statement_id": term_dict["statement_id"]
                    }
        return None

    def get_name_info(self, doc_id, name):
        """
        Retrieves information about a specific name from elements.

        Args:
            doc_id (str): Document identifier.
            name (str): Name to retrieve information for.

        Returns:
            dict or None: A dictionary containing name information if found, otherwise None.
        """
        for name_dict in self.elements_names:
            if name_dict["doc_id"] == doc_id and name_dict["signifier"] == name:
                return {
                    "definition": name_dict.get("definition"),
                    "source": name_dict["source"],
                    "statement_id": name_dict["statement_id"]
                }
        return None

    @staticmethod
    def _statement_info(statements, doc_id, statement_id):
        """Finds one fact/rule entry and splits its terms into common and proper nouns."""
        for entry in statements:
            if entry["doc_id"] == doc_id and entry["statement_id"] == statement_id:
                entry_terms = entry.get("terms") or []
                return {
                    "statement": entry.get("statement"),
                    "source": entry.get("source"),
                    "terms": [t.get("term") for t in entry_terms if t.get("classification") == "Common Noun"],
                    "names": [t.get("term") for t in entry_terms if t.get("classification") == "Proper Noun"],
                    "verb_symbols": entry.get("verb_symbols", [])
                }
        return None

    def get_fact_info(self, doc_id, statement_id):
        """
        Retrieves information about a specific fact from elements.

        Args:
            doc_id (str): Document identifier.
            statement_id (str): statement identifier of the fact.

        Returns:
            dict or None: A dictionary containing fact information if found, otherwise None.
        """
        return self._statement_info(self.elements_facts, doc_id, statement_id)

    def get_rule_info(self, doc_id, statement_id):
        """
        Retrieves information about a specific rule from elements.

        Args:
            doc_id (str): Document identifier.
            statement_id (str): statement identifier of the rule.

        Returns:
            dict or None: A dictionary containing rule information if found, otherwise None.
        """
        return self._statement_info(self.elements_rules, doc_id, statement_id)

# # Example usage
# processor = DocumentProcessor(manager)

# # Access processed data
# unique_terms = processor.get_unique_terms()
# unique_names = processor.get_unique_names()
# terms = processor.get_terms()
# names = processor.get_names()
# facts = processor.get_facts()
# rules = processor.get_rules()

# print(f"Unique terms: {len(unique_terms)}")
# print(f"Unique names: {len(unique_names)}")

# print(f'Rules from § 275.0-2: {processor.get_rule_info("§ 275.0-2", 3)}')
# print(f'Facts from § 275.0-2: {processor.get_fact_info("§ 275.0-2", 2)}')

## Logging

In [None]:
!mkdir logging_setup && touch logging_setup/__init__.py

In [None]:
%%writefile logging_setup/main.py

from pathlib import Path
import logging
from logging.handlers import TimedRotatingFileHandler


def setting_logging(log_path: str, log_level: str):
    """
    Configures root logging to write to the console and to a daily-rotated file.

    The log file is `<log_path>/application.log`, where a relative `log_path` is
    resolved against the current working directory; the directory is created if
    needed. Any handlers already attached to the root logger are removed and
    closed first, because `logging.basicConfig` otherwise does nothing when the
    root logger has already been configured (for example by another module
    calling `basicConfig` at import time).

    Args:
        log_path (str): Directory for the log file.
        log_level (str): Log level passed to `logging.basicConfig`, e.g. "INFO".

    Returns:
        logging.Logger: This module's logger, after logging a confirmation message.
    """
    # Ensure the log directory exists
    log_directory = Path.cwd() / log_path
    log_directory.mkdir(parents=True, exist_ok=True)

    # Path for the log file
    log_file_path = log_directory / "application.log"

    # Set up TimedRotatingFileHandler to rotate logs every day
    file_handler = TimedRotatingFileHandler(
        log_file_path,
        when="midnight",
        interval=1,
        backupCount=0,  # Rotate every midnight, keep all backups
        encoding="utf-8",  # Log messages may contain non-ASCII text such as "§"
    )

    # Set the file handler's log format
    file_handler.setFormatter(
        logging.Formatter(
            "%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
        )
    )

    # Set up logging configuration
    logging.basicConfig(
        level=log_level,  # Set to the desired log level
        format="%(asctime)s - %(levelname)s - %(message)s",  # Console log format
        datefmt="%Y-%m-%d %H:%M:%S",  # Custom date format
        handlers=[
            file_handler,  # Log to the rotating file
            logging.StreamHandler(),  # Log to console
        ],
        # Replace existing root handlers; without this the call is a silent
        # no-op once logging has been configured, and the file is never written.
        force=True,
    )

    logger = logging.getLogger(__name__)

    # Log a test message to verify
    logger.info("Logging is set up with daily rotation.")

    return logger

## Token Estimator

In [None]:
!mkdir token_estimator && touch token_estimator/__init__.py

In [None]:
%%writefile token_estimator/main.py

import tiktoken

def estimate_tokens(text, model="gpt-4o"):
    """
    Estimates the number of tokens in a given text using the OpenAI `tiktoken` library,
    which closely approximates the tokenization method used by OpenAI language models.

    Text that happens to contain a special-token marker (such as "<|endoftext|>")
    is counted as ordinary text instead of raising an error.

    Parameters:
        text (str): The text to be tokenized and counted.
        model (str): The model to use for tokenization. Defaults to "gpt-4o".
                     Supported models include "gpt-3.5-turbo" and "gpt-4o".

    Returns:
        int: The estimated number of tokens in the text.

    Raises:
        ValueError: If the specified model is not supported by `tiktoken`.

    Example:
        >>> text = "This is a sample text."
        >>> estimate_tokens(text)  # doctest: +SKIP
        6
    """
    # Load the appropriate tokenizer
    try:
        tokenizer = tiktoken.encoding_for_model(model)
    except KeyError as exc:
        raise ValueError(f"Model '{model}' is not supported by tiktoken.") from exc

    # By default `encode` raises ValueError when the text contains a special-token
    # string; for estimating arbitrary text those are encoded as plain text.
    tokens = tokenizer.encode(text, disallowed_special=())
    return len(tokens)

## Rules taxonomy

In [None]:
!mkdir rules_taxonomy_provider && touch rules_taxonomy_provider/__init__.py

In [None]:
%%writefile rules_taxonomy_provider/main.py

import os
from pathlib import Path
import yaml

class RuleInformationProvider:
    """
    Renders rule-classification sections, with their templates and examples, as markdown.

    Templates and examples are read once from YAML files when the provider is created;
    the classification file itself is read on every lookup.

    Attributes:
    -----------
    data_path : str
        Path to the directory containing the YAML files.
    template_dict : dict
        Templates from `witt_templates.yaml`, indexed by their 'id'.
    examples_dict : dict
        Examples from `witt_examples.yaml`, indexed by their 'id'.
    """

    def __init__(self, data_path):
        """
        Loads the template and example catalogues found under `data_path`.

        Parameters:
        -----------
        data_path : str
            Path to the directory containing the YAML files with rules, templates, and examples.
        """
        self.data_path = data_path
        self.template_dict = self._load_yaml(f'{data_path}/witt_templates.yaml', 'template_list')
        self.examples_dict = self._load_yaml(f'{data_path}/witt_examples.yaml', 'example_list')

    def _load_yaml(self, file_path, list_key=None):
        """
        Reads a YAML file, optionally indexing one of its lists by item id.

        Parameters:
        -----------
        file_path : str
            Path to the YAML file to be loaded.
        list_key : str, optional
            Name of a top-level list in the file. When given, that list is returned
            as a dictionary keyed by each item's 'id'.

        Returns:
        --------
        dict
            The items of `data[list_key]` indexed by 'id', when list_key is provided.
        Any type
            The whole parsed document, when list_key is not provided.
        """
        with open(file_path, 'r') as file:
            parsed = yaml.safe_load(file)

        if not list_key:
            return parsed
        return {entry['id']: entry for entry in parsed[list_key]}

    def get_classification_and_templates(self, section_title):
        """
        Builds the markdown description of the top-level section with the given title.

        Parameters:
        -----------
        section_title : str
            Title of the section for which to retrieve information.

        Returns:
        --------
        str
            Markdown containing the classification details, templates, and examples
            of every matching section; an empty string when no section matches.
        """
        all_sections = self._load_yaml(f'{self.data_path}/classify_subtypes.yaml')
        matching = self._filter_sections_by_title(all_sections, section_title)
        return self._convert_to_markdown(matching)

    def _filter_sections_by_title(self, data, title):
        """
        Selects the sections whose 'section_title' equals `title`.

        Only the entries of `data` itself are inspected; subsections are not searched.

        Parameters:
        -----------
        data : list
            List of sections to filter from.
        title : str
            Title to filter sections by.

        Returns:
        --------
        list
            The matching sections, in their original order.
        """
        selected = []
        for candidate in data:
            if candidate['section_title'] == title:
                selected.append(candidate)
        return selected

    def _convert_to_markdown(self, filtered_data):
        """
        Renders the given sections, and recursively their subsections, as markdown.

        Parameters:
        -----------
        filtered_data : list
            List of filtered sections to convert into markdown.

        Returns:
        --------
        str
            The concatenated markdown of all sections.
        """
        def render(section, level=2):
            """
            Renders one section and its subsections.

            Parameters:
            -----------
            section : dict
                The section to process.
            level : int, optional
                Markdown heading level of the section title (default is 2);
                each nesting level of subsections adds one.

            Returns:
            --------
            str
                A markdown formatted string for the section and its subsections.
            """
            pieces = [
                f"{'#' * level} {section['section_title']}\n\n",
                f"**ID**: {section['section_id']}\n\n",
                f"**Definition**: {section['section_definition']}\n\n",
            ]

            template_ids = section.get('templates')
            if not template_ids:
                pieces.append("**Templates**: Look in the subsection(s).\n\n")
            else:
                for template_id in template_ids:
                    if template_id not in self.template_dict:
                        pieces.append(f"**Template ID**: {template_id} - No details found.\n\n")
                        continue
                    template = self.template_dict[template_id]
                    pieces.append(f"**Template ID**: {template_id}\n\n")
                    pieces.append(f"**Template Explanation**: {template['explanation']}\n\n")
                    pieces.append(f"**Template Text**:\n\n```template\n{template['text']}```\n\n")

            # Unlike templates, a section without examples gets no placeholder line.
            for example_id in section.get('examples') or ():
                if example_id not in self.examples_dict:
                    pieces.append(f"**Example ID**: {example_id} - No details found.\n\n")
                    continue
                example = self.examples_dict[example_id]
                pieces.append(f"**Example ID**: {example_id}\n\n")
                pieces.append(f"**Example Text**:\n\n```example\n{example['text']}```\n\n")

            if 'subsections' in section:
                for child in section['subsections']:
                    pieces.append(render(child, level + 1))

            return "".join(pieces)

        return "".join(render(section) for section in filtered_data)

class RulesTemplateProvider:
    """
    A class to provide information about rules templates and their relationships from YAML data.

    This class loads template data, subtemplate data, and their relationships from YAML files
    in a data directory and renders a template, together with every subtemplate it uses,
    as a markdown-style string.

    Attributes:
    -----------
    data_directory : Path
        Path to the directory containing the YAML files.
    data_dicts : dict
        Dictionary containing data loaded from YAML files, including templates, subtemplates,
        and relationships. It is treated as read-only after loading.
    """

    def __init__(self, data_directory):
        """
        Initializes the RulesTemplateProvider with the specified data directory.

        The YAML files are read immediately, so a missing file raises here.

        Parameters:
        -----------
        data_directory : str or Path
            Path to the directory containing the YAML files with templates, subtemplates, and relationships.
        """
        self.data_directory = Path(data_directory)
        self.data_dicts = self._load_data()

    def _load_yaml(self, file_path):
        """
        Loads data from a YAML file.

        Parameters:
        -----------
        file_path : Path
            Path to the YAML file to be loaded.

        Returns:
        --------
        dict
            The parsed content of the YAML file, or an empty dict when the file is empty.
        """
        # Read as UTF-8 explicitly so parsing does not depend on the platform's default encoding.
        with open(file_path, 'r', encoding='utf-8') as file:
            return yaml.safe_load(file) or {}

    def _load_data(self):
        """
        Loads data from multiple YAML files required for template processing.

        Returns:
        --------
        dict
            A dictionary with the keys 'witt_template_relationship_data', 'witt_templates_data'
            and 'witt_subtemplates_data', each holding the matching section of its YAML file.
        """
        witt_template_relationship_file = self.data_directory / 'witt_template_subtemplate_relationship.yaml'
        witt_templates_file = self.data_directory / 'witt_templates.yaml'
        witt_subtemplates_file = self.data_directory / 'witt_subtemplates.yaml'

        witt_template_relationship_data = self._load_yaml(witt_template_relationship_file).get('template_subtemplate_relationship', {})
        witt_templates_data = self._load_yaml(witt_templates_file).get('template_list', [])
        witt_subtemplates_data = self._load_yaml(witt_subtemplates_file).get('subtemplate_list', [])

        return {
            'witt_template_relationship_data': witt_template_relationship_data,
            'witt_templates_data': witt_templates_data,
            'witt_subtemplates_data': witt_subtemplates_data
        }

    def _get_template_data(self, template_key, data):
        """
        Retrieves data for a specific template or subtemplate based on its key.

        Parameters:
        -----------
        template_key : str
            The key of the template or subtemplate to be retrieved.
        data : list or dict
            The data to search in. A dict is looked up by key; a list is scanned for the
            first dict item whose 'id' equals the key.

        Returns:
        --------
        dict or None
            The data corresponding to the specified template key, or None if not found
            (also when `data` is neither a dict nor a list).
        """
        if isinstance(data, dict):
            return data.get(template_key, None)
        elif isinstance(data, list):
            for item in data:
                if isinstance(item, dict) and item.get('id', '') == template_key:
                    return item
        return None

    def _format_template_output(self, template_key, template_data):
        """
        Formats the output for a given template or subtemplate.

        Parameters:
        -----------
        template_key : str
            The key of the template or subtemplate.
        template_data : dict
            The data of the template or subtemplate to be formatted. An empty or None
            value produces a "Template data not found." entry.

        Returns:
        --------
        str
            A formatted string with a heading for the key followed by the
            'usesSubtemplate', 'text' and 'explanation' sections that are present.
        """
        output = f"# {template_key}\n\n"
        if not template_data:
            output += "Template data not found.\n\n"
            return output
        if 'usesSubtemplate' in template_data:
            uses_subtemplate = template_data['usesSubtemplate']
            if isinstance(uses_subtemplate, list):
                uses_subtemplate = ', '.join(uses_subtemplate)
            output += f"## usesSubtemplate\n{uses_subtemplate}\n\n"
        if 'text' in template_data:
            output += f"## text\n\n{template_data['text']}\n\n"
        if 'explanation' in template_data:
            output += f"## explanation\n\n{template_data['explanation']}\n\n"
        return output

    def _process_template(self, template_key, processed_keys=None):
        """
        Processes a template or subtemplate recursively, including any subtemplates used.

        Keys starting with 'T' are looked up in the templates data, keys starting with 'S'
        in the subtemplates data; any other key is reported as not found. Records that are
        not dicts are treated as missing. The loaded data in `self.data_dicts` is never modified.

        Parameters:
        -----------
        template_key : str
            The key of the template or subtemplate to be processed.
        processed_keys : set, optional
            A set of keys that have already been processed to prevent circular references.
            It is updated in place and shared across the recursion.

        Returns:
        --------
        str
            A formatted string representation of the template and its subtemplates, or an
            empty string when the key was already processed.
        """
        if processed_keys is None:
            processed_keys = set()

        if template_key in processed_keys:
            return ''
        processed_keys.add(template_key)

        if template_key.startswith('T'):
            source_name = 'witt_templates_data'
        elif template_key.startswith('S'):
            source_name = 'witt_subtemplates_data'
        else:
            return self._format_template_output(template_key, None)

        record = self._get_template_data(template_key, self.data_dicts[source_name])
        # Shallow copy: 'usesSubtemplate' is added below and must not leak into the cached data.
        template_data = dict(record) if isinstance(record, dict) else {}

        uses_subtemplate = self._get_template_data(template_key, self.data_dicts['witt_template_relationship_data'])
        if uses_subtemplate:
            template_data['usesSubtemplate'] = (
                list(uses_subtemplate) if isinstance(uses_subtemplate, list) else [uses_subtemplate]
            )

        # An empty template_data yields the "Template data not found." entry.
        output = self._format_template_output(template_key, template_data)

        # 'usesSubtemplate' may also come from the record itself, as a single key or a list.
        subtemplate_keys = template_data.get('usesSubtemplate') or []
        if isinstance(subtemplate_keys, str):
            subtemplate_keys = [subtemplate_keys]
        for sub_key in subtemplate_keys:
            output += self._process_template(sub_key.strip(), processed_keys)

        return output

    def get_rules_template(self, template_key):
        """
        Retrieves the formatted rules template for the specified template key.

        Parameters:
        -----------
        template_key : str
            The key of the template to be retrieved.

        Returns:
        --------
        str
            A formatted string representation of the template and its associated subtemplates.
        """
        return self._process_template(template_key)

# Example usage:
# rule_information_provider = RuleInformationProvider("../data")
# markdown_data = rule_information_provider.get_classification_and_templates("Data rules")
# print(markdown_data)

# rule_template_provider = RulesTemplateProvider("../data")
# markdown_data = rule_template_provider.get_rules_template("T7")
# print(markdown_data)


## llm_query

In [None]:
!mkdir llm_query && touch llm_query/__init__.py

In [None]:
%%writefile llm_query/main.py

import time
from openai import OpenAI
import instructor
import logging
from typing import Any

# Set up basic logging configuration for the llm_query module.
# This runs at import time, so importing the module configures logging as a side effect.
logging.basicConfig(
    level=logging.INFO,  # Minimum level passed to basicConfig; change here if needed
    format='%(asctime)s - %(levelname)s - %(message)s',  # timestamp - level - message
    datefmt='%Y-%m-%d %H:%M:%S'
)
# Module-level logger used by measure_time below.
logger = logging.getLogger(__name__)

def measure_time(func):
    """
    Decorator to measure the execution time of a function.

    The elapsed time is logged at INFO level after the call returns. If the wrapped
    function raises, the exception propagates and nothing is logged.

    Args:
        func (Callable): The function to time.

    Returns:
        Callable: A wrapper that forwards all arguments to ``func`` and returns its result.
        The wrapper keeps ``func``'s name, docstring and other metadata.
    """
    # Imported locally so this block does not depend on the module's import list.
    from functools import wraps

    # Not every callable has __name__ (e.g. functools.partial objects); fall back to repr.
    func_name = getattr(func, '__name__', repr(func))

    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed_time = time.perf_counter() - start_time
        logger.info("Execution time for %s: %.2f seconds", func_name, elapsed_time)
        return result
    return wrapper

@measure_time
def query_instruct_llm(system_prompt: str,
                        user_prompt: str,
                        llm_model: str,
                        document_model: Any,
                        temperature: float,
                        max_tokens: int) -> Any:
    """
    Sends a system/user prompt pair to the LLM and returns the structured response.

    A new OpenAI client, constructed with default arguments and wrapped by
    ``instructor.from_openai``, is created on every call.

    Args:
        system_prompt (str): Content of the "system" message.
        user_prompt (str): Content of the "user" message.
        llm_model (str): Passed as ``model`` to the completion request.
        document_model (Any): Passed as ``response_model`` to the completion request.
        temperature (float): Passed as ``temperature`` to the completion request.
        max_tokens (int): Passed as ``max_tokens`` to the completion request.

    Returns:
        Any: Whatever the client's ``chat.completions.create`` call returns
        (presumably an instance of ``document_model`` - confirm against instructor's docs).

    Raises:
        Exception: Nothing is caught here, so any error raised by the client call propagates.
    """
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    # Sampling settings other than temperature/max_tokens are fixed for every request.
    request_options = {
        "model": llm_model,
        "response_model": document_model,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "top_p": 1,
        "frequency_penalty": 0,
        "presence_penalty": 0,
    }
    structured_client = instructor.from_openai(OpenAI())
    return structured_client.chat.completions.create(messages=conversation, **request_options)
