In [None]:
'''
navsea-org-best-text-embedding-3-large:

PPBE
NAVSEA ORGANIZATION
JCIDS
FLEET MAINTENANCE POLICY
SYSTEM ENGINEERING OVERVIEW
EVM
ACQUISITION MILESTONES AND PHASES
WARFARE CENTERS
COST ESTIMATION
'''

In [None]:
''' Stable backup Python files '''

In [None]:
# RAGInitializer.py (Test)
from enum import Enum
from dataclasses import dataclass
from typing import Dict, Tuple, Any
from ModelManager import ModelManager
from ComputeResourceManager import ComputeResourceManager

class LLMType(Enum):
    """Kinds of language model (LLM) that a RAGConfig can select."""

    # Each value is the string RAGConfig.to_dict() emits under "selected_llm_type".
    GPT = "gpt"
    OLLAMA = "ollama"

class EmbeddingType(Enum):
    """Kinds of embedding scheme that a RAGConfig can select."""

    # Each value is the string RAGConfig.to_dict() emits under
    # "selected_embedding_scheme".
    GPT = "gpt"
    OLLAMA = "ollama"
    SENTENCE_TRANSFORMER = "sentence_transformer"

@dataclass
class RAGConfig:
    """User selections needed to initialize the RAG components."""

    env_path: str  # handed to ModelManager(...) by initialize_rag_components
    llm_type: LLMType
    embedding_type: EmbeddingType
    llm_index: int  # forwarded as select_llm
    embedding_index: int  # forwarded as select_embed

    def to_dict(self) -> Dict[str, str]:
        """Return the LLM/embedding type selections as a plain-string dict.

        Only the two type selections are included; the indices and the env
        path are passed to ModelManager separately.
        """
        selections = dict(
            selected_llm_type=self.llm_type.value,
            selected_embedding_scheme=self.embedding_type.value,
        )
        return selections

def initialize_rag_components(config: RAGConfig) -> Tuple[Any, Any, int, Any, Any, Any]:
    """Initialize RAG components using ModelManager's existing methods.

    Builds a ModelManager from ``config.env_path``, obtains compute settings
    from ComputeResourceManager, and delegates the actual loading to
    ``ModelManager.validate_and_load_models``.

    Args:
        config: Selections for the LLM and the embedding model.

    Returns:
        A 6-tuple ``(model, embeddings, dimensions, selected_llm,
        selected_embedding_model, model_manager)``. The first five values are
        whatever ``validate_and_load_models`` returned; the last is the
        ModelManager instance created here so callers can reuse it.

    Raises:
        Exception: If any step fails. The original error is attached as the
            cause so the full traceback is preserved.
    """
    try:
        model_manager = ModelManager(config.env_path)
        compute_settings = ComputeResourceManager().get_compute_settings()

        # Use existing validate_and_load_models method
        (
            model,
            embeddings,
            dimensions,
            selected_llm,
            selected_embedding_model,
        ) = model_manager.validate_and_load_models(
            config=config.to_dict(),
            select_llm=config.llm_index,
            select_embed=config.embedding_index,
            resource_manager=compute_settings,
        )
    except Exception as e:
        raise Exception(f"Failed to initialize RAG components: {e}") from e

    return model, embeddings, dimensions, selected_llm, selected_embedding_model, model_manager

In [None]:
# ChunkingInitializer.py (Test)
import sys
from typing import Any, List, Optional

from langchain_core.documents import Document
from sentence_transformers import SentenceTransformer

from ChunkingManager import ChunkingMethod, process_document
from OCREnhancedPDFLoader import OCREnhancedPDFLoader
from TextPreprocessor import TextPreprocessor

class ChunkingInitializer:
    """Orchestrates document processing workflow including OCR, preprocessing, and chunking.

    The constructor stores the settings on the instance; :meth:`process`
    forwards them to ``process_document``.
    """

    def __init__(self,
                 source_path: str,
                 chunking_method: ChunkingMethod = ChunkingMethod.PAGE,
                 enable_preprocessing: bool = False,
                 chunk_size: int = 500,
                 chunk_overlap: int = 50,
                 similarity_threshold: float = 0.85,
                 model_name: Optional[str] = None,
                 embedding_model: Optional[Any] = None):
        """
        Initialize chunking processor with configuration parameters.

        Args:
            source_path: Path given to OCREnhancedPDFLoader and process_document.
            chunking_method: Chunking strategy forwarded to process_document.
            enable_preprocessing: Whether TextPreprocessor is applied to the
                loaded documents; also forwarded to process_document.
            chunk_size: Forwarded to process_document.
            chunk_overlap: Forwarded to process_document.
            similarity_threshold: Forwarded to process_document.
            model_name: Forwarded to process_document.
            embedding_model: Forwarded to process_document. When falsy, a
                default is chosen by :meth:`_setup_default_embedding`.
        """
        self.source_path = source_path
        self.chunking_method = chunking_method
        self.enable_preprocessing = enable_preprocessing
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.similarity_threshold = similarity_threshold
        self.model_name = model_name
        self.embedding_model = embedding_model or self._setup_default_embedding()

    def _setup_default_embedding(self) -> Any:
        """Return a default embedding model for semantic chunking, else None."""
        if self.chunking_method == ChunkingMethod.SEMANTIC:
            return SentenceTransformer("multi-qa-mpnet-base-dot-v1")
        return None

    def _load_documents(self) -> List[Document]:
        """Load documents with OCR enhancement.

        Raises:
            Exception: Any loader error is printed and then re-raised unchanged.
        """
        try:
            print("Loading documents with OCR enhancement...")
            loader = OCREnhancedPDFLoader(self.source_path)
            documents = loader.load()
            print(f"Loaded {len(documents)} documents with OCR enhancement")
            return documents
        except Exception as e:
            print(f"Error loading documents: {e}")
            raise

    def _preprocess_documents(self, documents: List[Document]) -> List[Document]:
        """Return copies of ``documents`` tagged with their preprocessing status.

        When preprocessing is enabled each page's text is run through
        TextPreprocessor; otherwise the text is copied as-is. In both cases a
        ``"preprocessing"`` metadata key ("applied" or "skipped") is added.

        Raises:
            Exception: Any error is printed and then re-raised unchanged.
        """
        try:
            if self.enable_preprocessing:
                print("Preprocessing documents...")
                preprocessor = TextPreprocessor()
                return [
                    Document(
                        page_content=preprocessor.preprocess(doc.page_content),
                        metadata={**doc.metadata, "preprocessing": "applied"}
                    ) for doc in documents
                ]
            else:
                print("Skipping preprocessing...")
                return [
                    Document(
                        page_content=doc.page_content,
                        metadata={**doc.metadata, "preprocessing": "skipped"}
                    ) for doc in documents
                ]
        except Exception as e:
            print(f"Error in document preprocessing: {e}")
            raise

    def process(self) -> List[Document]:
        """Execute the complete document processing pipeline.

        Returns:
            The chunks returned by ``process_document``.

        Raises:
            Exception: Any error is printed and then re-raised unchanged.
        """
        try:
            # Load and preprocess documents.
            # NOTE(review): the result of this step is not handed to
            # process_document, which receives source_path instead and
            # presumably loads the file again. The calls are kept so load and
            # preprocessing errors still surface here - confirm whether the
            # step is still needed.
            raw_documents = self._load_documents()
            self._preprocess_documents(raw_documents)

            # Process documents using specified chunking method
            documents = process_document(
                source_path=self.source_path,
                method=self.chunking_method,
                enable_preprocessing=self.enable_preprocessing,
                chunk_size=self.chunk_size,
                chunk_overlap=self.chunk_overlap,
                similarity_threshold=self.similarity_threshold,
                model_name=self.model_name,
                embedding_model=self.embedding_model
            )

            print(f"Processed {len(documents)} document chunks")
            return documents

        except Exception as e:
            print(f"Error in document processing pipeline: {e}")
            raise


In [None]:
# DatastoreInitializer.py (Test)
from enum import Enum
from typing import Optional, Any
from pinecone import Pinecone, ServerlessSpec
from PineconeManager import PineconeManager

class StorageType(Enum):
    """Storage modes understood by DatastoreInitializer.

    The integer value of the chosen member is what gets passed on to
    PineconeManager.setup_datastore.
    """

    PINECONE_NEW = 0       # triggers index creation before the datastore is set up
    PINECONE_ADD = 1
    PINECONE_EXISTING = 2
    LOCAL_STORAGE = 3

class DatastoreInitializer:
    """Manages datastore setup and configuration for vector storage."""

    def __init__(self,
                 doc_name: str,
                 pinecone_api_key: str,
                 dimensions: int,
                 embedding_model: Any):
        """Initialize datastore manager with configuration.

        No Pinecone connection is made here; the client and manager are
        created by :meth:`initialize_pinecone`, :meth:`setup_datastore`, or
        :meth:`initialize`.

        Args:
            doc_name: Prefix of the generated index name.
            pinecone_api_key: Key passed to the Pinecone client and manager.
            dimensions: Passed to PineconeManager.
            embedding_model: Passed to PineconeManager and used as the suffix
                of the generated index name.
        """
        self.doc_name = doc_name
        self.pinecone_api_key = pinecone_api_key
        self.dimensions = dimensions
        self.embedding_model = embedding_model
        self.pinecone_client = None
        self.manager = None

    def initialize_pinecone(self):
        """Initialize Pinecone client and manager.

        Raises:
            RuntimeError: If the client or manager cannot be created; the
                original error is attached as the cause.
        """
        try:
            self.pinecone_client = Pinecone(api_key=self.pinecone_api_key)
            self.manager = PineconeManager(
                self.pinecone_client,
                self.pinecone_api_key,
                self.dimensions,
                self.embedding_model
            )
        except Exception as e:
            raise RuntimeError(f"Failed to initialize Pinecone: {e}") from e

    def _get_index_name(self) -> str:
        """Return the lower-cased ``"<doc_name>-<embedding_model>"`` index name."""
        return f"{self.doc_name}-{self.embedding_model}".lower()

    def setup_datastore(self,
                       storage_type: StorageType,
                       documents: Optional[list] = None,
                       embeddings: Optional[Any] = None) -> Any:
        """Set up and return configured datastore.

        Args:
            storage_type: Storage mode; ``PINECONE_NEW`` creates the index first.
            documents: Forwarded to PineconeManager.setup_datastore.
            embeddings: Forwarded to PineconeManager.setup_datastore.

        Returns:
            Whatever PineconeManager.setup_datastore returns.

        Raises:
            RuntimeError: If any step fails; the original error is attached
                as the cause.
        """
        try:
            # Initialize Pinecone if not already done
            if self.pinecone_client is None:
                self.initialize_pinecone()

            index_name = self._get_index_name()
            print(f"Active Index: {index_name}")

            # Handle new Pinecone index creation
            if storage_type == StorageType.PINECONE_NEW:
                spec = ServerlessSpec(cloud='aws', region='us-east-1')
                self.manager.setup_index(index_name, spec)

            # Set up datastore using manager
            datastore = self.manager.setup_datastore(
                storage_type.value,
                documents,
                embeddings,
                index_name
            )

            return datastore

        except Exception as e:
            raise RuntimeError(f"Failed to setup datastore: {e}") from e

    def initialize(self, storage_type: StorageType) -> Any:
        """Initialize and configure datastore.

        Unlike :meth:`setup_datastore`, this always creates a fresh client and
        manager, takes no documents, and re-raises the original exception
        (after printing it) instead of wrapping it.

        Args:
            storage_type: Storage mode; ``PINECONE_NEW`` creates the index first.

        Returns:
            Whatever PineconeManager.setup_datastore returns.
        """
        try:
            # Setup Pinecone client and manager
            self.pinecone_client = Pinecone(api_key=self.pinecone_api_key)
            self.manager = PineconeManager(
                self.pinecone_client,
                self.pinecone_api_key,
                self.dimensions,
                self.embedding_model
            )

            # Configure index
            index_name = self._get_index_name()
            print(f"Active Index: {index_name}")

            # Create new index if needed. The member names must match the
            # StorageType enum (PINECONE_NEW / PINECONE_EXISTING); the previous
            # StorageType.NEW / StorageType.EXISTING do not exist.
            if storage_type == StorageType.PINECONE_NEW:
                spec = ServerlessSpec(cloud='aws', region='us-east-1')
                self.manager.setup_index(index_name, spec)

            # No documents for an existing index; an empty list otherwise.
            documents = None if storage_type == StorageType.PINECONE_EXISTING else []

            # Setup and return datastore.
            # NOTE(review): self.embedding_model is passed in the position
            # where setup_datastore() above passes `embeddings` - confirm
            # PineconeManager accepts it there.
            return self.manager.setup_datastore(
                storage_type.value,
                documents,
                self.embedding_model,
                index_name
            )

        except Exception as e:
            print(f"Datastore initialization failed: {e}")
            raise

In [None]:
# QuestionInitializer.py (Test)
from typing import List, Dict, Optional, Any
import time
from ScoringMetric import ScoringMetric
from QuestionAnsweringPipeline import QuestionAnsweringPipeline
from TemplateManager import TemplateManager
from GroundTruthManager import GroundTruthManager

class QuestionInitializer:
    """Runs the question-answering workflow: template lookup, optional ground truth, pipeline execution."""

    def __init__(self,
                 datastore: Any,
                 model: Any,
                 embedding_model: Any,
                 template_path: str = "templates.json",
                 ground_truth_path: str = "ground_truth.json"):
        """Store the models and file paths and build the helper components.

        A TemplateManager is created from ``template_path`` and a
        ScoringMetric from ``embedding_model``; the ground-truth file is only
        opened later, when ground truth is requested.
        """
        self.datastore = datastore
        self.model = model
        self.embedding_model = embedding_model
        self.template_path = template_path
        self.ground_truth_path = ground_truth_path

        # Helper components shared by every process_questions() call.
        self.template_manager = TemplateManager(template_path)
        self.scoring_metric = ScoringMetric(embedding_model)

    def _load_template(self, template_name: str = "default") -> str:
        """Fetch the named template from the template manager and echo it."""
        loaded = self.template_manager.get_template(template_name)
        print(f"Loaded template: {loaded}")
        return loaded

    def _get_ground_truth(self,
                         questions: List[str],
                         use_ground_truth: bool = False) -> Optional[Dict[str, str]]:
        """Map each question to its ground-truth answer.

        Returns None when ground truth is not requested, or when loading it
        fails (a warning is printed in that case).
        """
        if use_ground_truth:
            try:
                manager = GroundTruthManager(self.ground_truth_path)
                answers = {}
                for question in questions:
                    answers[question] = manager.get_answer(question)
                return answers
            except Exception as e:
                print(f"Warning: Failed to load ground truth: {e}")
                return None
        return None

    def process_questions(self,
                         questions: List[str],
                         use_ground_truth: bool = False,
                         template_name: str = "default") -> float:
        """Answer ``questions`` with the pipeline and return the elapsed seconds.

        Only the ``answer_questions`` call is timed; template and ground-truth
        loading and pipeline construction happen before the clock starts.

        Raises:
            RuntimeError: If any step fails.
        """
        try:
            prompt_template = self._load_template(template_name)
            reference_answers = self._get_ground_truth(questions, use_ground_truth)

            qa_pipeline = QuestionAnsweringPipeline(
                self.datastore,
                self.model,
                prompt_template,
                self.scoring_metric,
                embeddings=self.embedding_model,
            )

            started_at = time.time()
            qa_pipeline.answer_questions(questions, reference_answers)
            return time.time() - started_at

        except Exception as e:
            raise RuntimeError(f"Failed to process questions: {e}")

In [None]:
''' Driver cells that use the full functionality of the stable backup Python files '''

In [None]:
# RAGInitializer Driver
from RAGInitializer import LLMType, EmbeddingType, RAGConfig, initialize_rag_components

'''
Available choices for language models (LLMs)
GPT: 
    0 - gpt-4o
Ollama:
    0 - llama3.1:8b-instruct-q5_K_M
    1 - llama3.2:latest
    2 - mistral-nemo:12b-instruct-2407-q5_K_M

Available choices for embedding models
GPT:
    0 - text-embedding-3-small
    1 - text-embedding-3-large
Ollama:
    0 - nomic-embed-text
    1 - mxbai-embed-large
    2 - all-minilm
    3 - snowflake-arctic-embed
Sentence Transformer:
    0 - all-MiniLM-L6-v2
    1 - all-MiniLM-L12-v2
    2 - all-mpnet-base-v2
    3 - all-distilbert-base-v2
    4 - multi-qa-mpnet-base-dot-v1
'''

# Initialize with configuration
# Build the configuration consumed by initialize_rag_components().
config = RAGConfig(
    env_path = r"C:\Users\docsp\Desktop\AI_ML_Folder\Python_Practice_Folder\Natural_Language_Processing\EDQP_RAG_Model\env_variables.env",
    llm_type=LLMType.OLLAMA, # GPT or OLLAMA
    embedding_type=EmbeddingType.SENTENCE_TRANSFORMER, # OLLAMA, GPT, or SENTENCE_TRANSFORMER
    llm_index=1,  # index within the chosen LLM type (see the listing in this cell)
    embedding_index=2  # index within the chosen embedding type (see the listing in this cell)
)

# The names bound here are module-level and are read by the later driver cells.
model, embeddings, dimensions, selected_llm, selected_embedding_model, model_manager = initialize_rag_components(config)
# The confirmation is printed only when all three values are truthy.
if model and embeddings and dimensions:
    print("\nRAG components successfully initialized.")
    print(f"Model: {selected_llm}")
    print(f"Embeddings: {selected_embedding_model}")

In [None]:
# ChunkingInitializer Driver

# sys is imported here so the error path below works even when this cell is
# run without the cell that defines ChunkingInitializer.
import sys

from ChunkingInitializer import ChunkingInitializer
# ChunkingMethod is imported from ChunkingManager, the same module the other
# cells (and ChunkingInitializer itself) take it from, so the enum members
# compare equal inside the processor.
from ChunkingManager import ChunkingMethod

# selected_llm and selected_embedding_model come from the RAGInitializer driver cell.
processor = ChunkingInitializer(
    source_path=r"C:\Users\docsp\Desktop\AI_ML_Folder\Python_Practice_Folder\Natural_Language_Processing\Source_Documents\MCDP_1_Warfighting.pdf",
    chunking_method=ChunkingMethod.PAGE,
    enable_preprocessing=False,
    model_name=selected_llm,
    embedding_model=selected_embedding_model
)

# Process documents
try:
    documents = processor.process()

    # Output results (first 200 characters of each chunk)
    for doc in documents:
        print(f"Chunk metadata: {doc.metadata}")
        print(f"Chunk content: {doc.page_content[:200]}...")

except Exception as e:
    print(f"Processing failed: {e}")
    sys.exit(1)

if documents:
    print("\nDocument processing completed successfully.")

In [None]:
# DatastoreInitializer Driver

# sys is imported here so the error path below works even when this cell is
# run without an earlier cell that happens to import it.
import sys

from DatastoreInitializer import DatastoreInitializer, StorageType

# model_manager, dimensions and selected_embedding_model come from the
# RAGInitializer driver cell.
datastore_manager = DatastoreInitializer(
    doc_name='test-datastore',
    pinecone_api_key=model_manager.get_pinecone_api_key(),
    dimensions=dimensions,
    embedding_model=selected_embedding_model
)

try:
    # Set up datastore; `documents` comes from the ChunkingInitializer driver cell.
    datastore = datastore_manager.setup_datastore(
        storage_type=StorageType.PINECONE_EXISTING,
        documents=documents,
        embeddings=embeddings
    )
except Exception as e:
    print(f"Error: {e}")
    sys.exit(1)

In [None]:
# QuestionInitializer Driver

from QuestionInitializer import QuestionInitializer

# QuestionInitializer Driver
# Build the question processor from the datastore and models created by the
# earlier driver cells. Note this rebinds `processor`, which the chunking
# driver cell also uses.
processor = QuestionInitializer(
    datastore=datastore,
    model=model,
    embedding_model=selected_embedding_model
)

# Define questions
template_name = "detailed"  # default, short, or detailed
questions = ["According to the text, what is the definition of war?",
             "What does the text say about uncertainty in warfare?",
             "How does the text define the spectrum of conflict? What are the various factors involved?",
             "What are centers of gravity and critical vulnerabilities in warfare?"
            ]

try:
    # Process questions and get execution time
    processing_time = processor.process_questions(
        questions=questions,
        use_ground_truth=False,
        template_name=template_name  # name of the template to load
    )
    print(f"Total processing time: {processing_time:.2f} seconds")
    
except Exception as e:
    # Failures are reported but do not stop the notebook.
    print(f"Error: {e}")

In [None]:
''' Full backup files are stable and functional. '''

In [None]:
# Model and embeddings selection (Stable)

import warnings
warnings.filterwarnings('ignore')
from ModelManager import ModelManager
from ComputeResourceManager import ComputeResourceManager

# Load environment variables, model, and embeddings, including API keys for OpenAI and Pinecone (Test)
# Path to the .env file handed to ModelManager.
env_path = r"C:\Users\docsp\Desktop\AI_ML_Folder\Python_Practice_Folder\Natural_Language_Processing\EDQP_RAG_Model\env_variables.env"
model_manager = ModelManager(env_path)
# Compute settings; passed to model_manager.load_model() further down in this cell.
resource_manager = ComputeResourceManager().get_compute_settings()

'''
Available choices for language models (LLMs)
GPT: 
    0 - gpt-4o
Ollama:
    0 - llama3.1:8b-instruct-q5_K_M
    1 - llama3.2:latest
    2 - mistral-nemo:12b-instruct-2407-q5_K_M

Available choices for embedding models
GPT:
    0 - text-embedding-3-small
    1 - text-embedding-3-large
Ollama:
    0 - nomic-embed-text
    1 - mxbai-embed-large
    2 - all-minilm
    3 - snowflake-arctic-embed
Sentence Transformer:
    0 - all-MiniLM-L6-v2
    1 - all-MiniLM-L12-v2
    2 - all-mpnet-base-v2
    3 - all-distilbert-base-v2
    4 - multi-qa-mpnet-base-dot-v1
'''

# Configuration settings for selecting LLM and embedding scheme
# Type-level selections; the keys are looked up in model_manager.llm_choices
# and model_manager.embedding_choices below.
config = {
    "selected_llm_type": "ollama",  # Options: "gpt" or "ollama"
    "selected_embedding_scheme": "gpt"  # Options: "ollama", "gpt", or "sentence_transformer"
}
# Select specific indices from llm_choices and embedding_choices
select_llm = 1
select_embed = 1

# Validate user selections for LLM type and embedding scheme
model_manager.validate_selection(config["selected_llm_type"], model_manager.llm_choices.keys())
model_manager.validate_selection(config["selected_embedding_scheme"], model_manager.embedding_choices.keys())

# Select the LLM and embedding model based on configuration and indices
# NOTE(review): for "gpt" the llm_choices entry is used as-is (select_llm is
# ignored), while other types are indexed - presumably the "gpt" entry is a
# single model name rather than a list; confirm against ModelManager.
selected_llm = (
    model_manager.llm_choices[config["selected_llm_type"]]
    if config["selected_llm_type"] == "gpt"
    else model_manager.llm_choices[config["selected_llm_type"]][select_llm]
)
selected_embedding_model = model_manager.embedding_choices[config["selected_embedding_scheme"]][select_embed]

# Load the selected language model and embeddings model

model = model_manager.load_model(config["selected_llm_type"], selected_llm, resource_manager)
embeddings = model_manager.load_embeddings(config["selected_embedding_scheme"], selected_embedding_model)

# Dynamically determine the dimensions of the embeddings for compatibility with the index
dimensions = model_manager.determine_embedding_dimensions(embeddings)

# Output confirmation of selected options
print(f"\nSelected LLM: {selected_llm}")
print(f"Selected Embedding Scheme: {config['selected_embedding_scheme']}")
print(f"Selected Embedding Model: {selected_embedding_model}")
print(f"Embedding dimensions: {dimensions}")

In [None]:
# OCR and Chunking (Stable)

import warnings
import sys
warnings.filterwarnings('ignore')
warnings.filterwarnings("ignore", category=DeprecationWarning)
from OCREnhancedPDFLoader import OCREnhancedPDFLoader
from TextPreprocessor import TextPreprocessor
from ChunkingManager import ChunkingMethod, process_document
from langchain_core.documents import Document
from sentence_transformers import SentenceTransformer

# Parameters to specify the file path and preprocessing settings.
# source_path is the location of the PDF file for OCR and NLP processing.
source_path = r"C:\Users\docsp\Desktop\AI_ML_Folder\Python_Practice_Folder\Natural_Language_Processing\Source_Documents\ED_Basic_Course_Guide\3.1.3_Program_Funding.pdf"
enable_preprocessing = False  # Flag to enable or skip document preprocessing; set to True only for text documents

# Load text documents from the specified PDF file and enhance with OCR.
# Attempts to load the document and perform OCR; exits if an error occurs.
try:
    print("Loading documents with OCR enhancement...")
    loader = OCREnhancedPDFLoader(source_path)
    text_documents = loader.load()
    print(f"Loaded {len(text_documents)} documents with OCR enhancement")
except Exception as e:
    print(f"Error loading documents: {e}")
    sys.exit(1)

# Process documents based on the enable_preprocessing flag.
# Applies preprocessing to each document or skips it; either way each copy is
# tagged with a "preprocessing" metadata key ("applied" or "skipped").
# NOTE(review): processed_documents is not used later in this cell -
# process_document() below is given source_path instead. Confirm whether this
# step is still needed.
try:
    if enable_preprocessing:
        print("Preprocessing documents...")
        preprocess = TextPreprocessor()
        processed_documents = [
            Document(
                page_content=preprocess.preprocess(doc.page_content),
                metadata={**doc.metadata, "preprocessing": "applied"}
            ) for doc in text_documents
        ]
    else:
        print("Skipping preprocessing...")
        processed_documents = [
            Document(
                page_content=doc.page_content,
                metadata={**doc.metadata, "preprocessing": "skipped"}
            ) for doc in text_documents
        ]
except Exception as e:
    print(f"Error in document processing: {e}")
    sys.exit(1)

# Select the chunking method and process the document.
# The chunking method and parameters control how documents are split into smaller parts for analysis.
chunking_method = ChunkingMethod.PAGE  # Choose page-based (.PAGE) or semantic-based (.SEMANTIC) chunking

# NOTE(review): embedding_model is assigned here but never passed on; the
# process_document() call below uses selected_embedding_model regardless of
# embedding_method. Confirm which one is intended.
embedding_method = "semantic"
if embedding_method == "semantic":
    embedding_model = SentenceTransformer("multi-qa-mpnet-base-dot-v1")
elif embedding_method == "page":
    embedding_model = selected_embedding_model

# selected_llm and selected_embedding_model come from the model-selection cell.
documents = process_document(
    source_path=source_path,
    method=chunking_method,
    enable_preprocessing=enable_preprocessing,
    chunk_size=500,
    chunk_overlap=50,
    similarity_threshold=0.85,
    model_name=selected_llm,
    embedding_model=selected_embedding_model
)

# Output the processed documents (first 200 characters of each chunk)
print(f"Processed {len(documents)} document chunks")
for doc in documents:
    print(f"Chunk metadata: {doc.metadata}")
    print(f"Chunk content: {doc.page_content[:200]}...")

In [None]:
# Datastore and Pinecone index setup (Stable)

import sys
import warnings
warnings.filterwarnings('ignore')
warnings.filterwarnings("ignore", category=DeprecationWarning)
from pinecone import Pinecone, ServerlessSpec
from PineconeManager import PineconeManager

# Define constants
# Storage-mode codes; the selected one is passed to manager.setup_datastore().
PINECONE_NEW = 0
PINECONE_ADD = 1
PINECONE_EXISTING = 2
LOCAL_STORAGE = 3

# Configure new Pinecone index, add to an existing index, or use an existing index.
doc_name = 'edqp-test'
pinecone_api_key = model_manager.get_pinecone_api_key()
data_storage = PINECONE_NEW

# When reusing an existing index no documents are passed on; this overwrites
# the `documents` produced by the chunking cell.
if data_storage == PINECONE_EXISTING:
    documents = None

try:
    # Initialize Pinecone client and PineconeManager
    pc = Pinecone(api_key=pinecone_api_key)
    manager = PineconeManager(pc, pinecone_api_key, dimensions, selected_embedding_model)

    # Index name is "<doc_name>-<embedding model>", lower-cased
    index_name = f"{doc_name}-{selected_embedding_model}".lower()
    print(f"Active Index: {index_name}")

    # Set up Pinecone index if required
    if data_storage == PINECONE_NEW:
        spec = ServerlessSpec(cloud='aws', region='us-east-1')
        index = manager.setup_index(index_name, spec)

    # Set up the datastore
    datastore = manager.setup_datastore(data_storage, documents, embeddings, index_name)
except Exception as e:
    # Any failure is reported and stops the cell.
    print(f"Error: {e}")
    sys.exit(1)

In [None]:
# Answer the questions (Stable)

from ScoringMetric import ScoringMetric
from QuestionAnsweringPipeline import QuestionAnsweringPipeline
from TemplateManager import TemplateManager
from GroundTruthManager import GroundTruthManager

# Initialize TemplateManager with a JSON file and retrieve a specific template
template_manager = TemplateManager("templates.json")
template = template_manager.get_template("default")
print(template)

# List to store questions for model to answer
# NOTE(review): the last question contains misspellings ("committment",
# "expedniture"). They are left untouched because the strings are sent to the
# model and used as ground-truth lookup keys - confirm before correcting.
questions = ["What is the definition of life-cycle cost?",
             "Compare and contrast the analogy and parametric methods of cost estimation.",
             "Compare and contrast total ownership cost and life-cycle cost.",
             "What is learning curve theory?",
             "Define the terms budget authority, committment, obligation, expedniture, and outlay."
             ]

# Retrieve ground truth answers for the questions
ground_truth_available = False
if ground_truth_available:
    ground_truth_manager = GroundTruthManager("ground_truth.json")
    ground_truth = {q: ground_truth_manager.get_answer(q) for q in questions}
else:
    ground_truth = None

# Initialize ScoringMetric with the selected embedding model
scoring_metric = ScoringMetric(selected_embedding_model)

# Initialize question and answer pipeline
pipeline = QuestionAnsweringPipeline(datastore, model, template, scoring_metric)

# Answer questions
# NOTE(review): the return value is formatted as seconds below, so
# answer_questions() presumably returns the elapsed time as a number - confirm
# against QuestionAnsweringPipeline.
model_output = pipeline.answer_questions(questions, ground_truth)
print(f"Total processing time: {model_output:.2f} seconds")
