In [1]:
import os
import re
import json
import pandas as pd
from pathlib import Path
from typing import List, Dict
import PyPDF2
from docx import Document
from bs4 import BeautifulSoup
import nltk
from nltk.tokenize import sent_tokenize

import nltk

# Download required NLTK data
try:
    nltk.data.find('tokenizers/punkt_tab')
except LookupError:
    print("Downloading required NLTK data...")
    nltk.download('punkt_tab')
    nltk.download('punkt')

from nltk.tokenize import sent_tokenize

In [2]:
class DataCollector:
    """Collect and extract text from various file formats"""
    
    def __init__(self, data_directory: str):
        self.data_directory = Path(data_directory)
        self.collected_data = []
    
    def extract_from_pdf(self, file_path: str) -> Dict:
        """Extract text from PDF files"""
        try:
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                text = ""
                for page in pdf_reader.pages:
                    text += page.extract_text()
                
                return {
                    'text': text,
                    'source': file_path,
                    'type': 'pdf',
                    'metadata': {
                        'pages': len(pdf_reader.pages),
                        'filename': Path(file_path).name
                    }
                }
        except Exception as e:
            print(f"Error extracting PDF {file_path}: {e}")
            return None
    
    def extract_from_docx(self, file_path: str) -> Dict:
        """Extract text from Word documents"""
        try:
            doc = Document(file_path)
            text = "\n".join([paragraph.text for paragraph in doc.paragraphs])
            
            return {
                'text': text,
                'source': file_path,
                'type': 'docx',
                'metadata': {
                    'paragraphs': len(doc.paragraphs),
                    'filename': Path(file_path).name
                }
            }
        except Exception as e:
            print(f"Error extracting DOCX {file_path}: {e}")
            return None
    
    def extract_from_txt(self, file_path: str) -> Dict:
        """Extract text from plain text files"""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                text = file.read()
                
                return {
                    'text': text,
                    'source': file_path,
                    'type': 'txt',
                    'metadata': {
                        'filename': Path(file_path).name
                    }
                }
        except Exception as e:
            print(f"Error extracting TXT {file_path}: {e}")
            return None
    
    def extract_from_csv(self, file_path: str, text_column: str = 'text') -> List[Dict]:
        """Extract text from CSV files (e.g., clinical trial data)"""
        try:
            df = pd.read_csv(file_path)
            results = []
            
            for idx, row in df.iterrows():
                if text_column in df.columns:
                    results.append({
                        'text': str(row[text_column]),
                        'source': file_path,
                        'type': 'csv',
                        'metadata': {
                            'row_index': idx,
                            'filename': Path(file_path).name,
                            'additional_data': row.to_dict()
                        }
                    })
            return results
        except Exception as e:
            print(f"Error extracting CSV {file_path}: {e}")
            return []
    
    def extract_from_json(self, file_path: str, text_field: str = 'text') -> List[Dict]:
        """Extract text from JSON files (e.g., FHIR data)"""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                data = json.load(file)
                results = []
                
                # Handle both single object and array of objects
                if isinstance(data, list):
                    for idx, item in enumerate(data):
                        if text_field in item:
                            results.append({
                                'text': str(item[text_field]),
                                'source': file_path,
                                'type': 'json',
                                'metadata': {
                                    'index': idx,
                                    'filename': Path(file_path).name,
                                    'additional_data': item
                                }
                            })
                else:
                    if text_field in data:
                        results.append({
                            'text': str(data[text_field]),
                            'source': file_path,
                            'type': 'json',
                            'metadata': {
                                'filename': Path(file_path).name,
                                'additional_data': data
                            }
                        })
                
                return results
        except Exception as e:
            print(f"Error extracting JSON {file_path}: {e}")
            return []
    
    def collect_all_documents(self) -> List[Dict]:
        """Collect documents from all supported formats"""
        for file_path in self.data_directory.rglob('*'):
            if file_path.is_file():
                suffix = file_path.suffix.lower()
                
                if suffix == '.pdf':
                    result = self.extract_from_pdf(str(file_path))
                    if result:
                        self.collected_data.append(result)
                
                elif suffix == '.docx':
                    result = self.extract_from_docx(str(file_path))
                    if result:
                        self.collected_data.append(result)
                
                elif suffix == '.txt':
                    result = self.extract_from_txt(str(file_path))
                    if result:
                        self.collected_data.append(result)
                
                elif suffix == '.csv':
                    results = self.extract_from_csv(str(file_path))
                    self.collected_data.extend(results)
                
                elif suffix == '.json':
                    results = self.extract_from_json(str(file_path))
                    self.collected_data.extend(results)
        
        print(f"Collected {len(self.collected_data)} documents")
        return self.collected_data


In [3]:
class TextCleaner:
    """Clean and preprocess extracted text"""
    
    @staticmethod
    def remove_html_tags(text: str) -> str:
        """Remove HTML tags from text"""
        soup = BeautifulSoup(text, 'html.parser')
        return soup.get_text()
    
    @staticmethod
    def remove_special_characters(text: str) -> str:
        """Remove special characters while preserving medical terms"""
        # Keep alphanumeric, spaces, periods, commas, hyphens, parentheses
        text = re.sub(r'[^a-zA-Z0-9\s\.\,\-\(\)]', ' ', text)
        return text
    
    @staticmethod
    def remove_extra_whitespace(text: str) -> str:
        """Remove extra whitespace and blank lines"""
        text = re.sub(r'\s+', ' ', text)
        text = re.sub(r'\n\s*\n', '\n\n', text)
        return text.strip()
    
    @staticmethod
    def remove_page_numbers(text: str) -> str:
        """Remove page numbers and headers/footers"""
        # Remove standalone numbers that might be page numbers
        text = re.sub(r'\n\d+\n', '\n', text)
        return text
    
    @staticmethod
    def standardize_medical_terms(text: str) -> str:
        """Standardize common autism-related abbreviations"""
        replacements = {
            r'\bASD\b': 'autism spectrum disorder',
            r'\bDSM\b': 'Diagnostic and Statistical Manual',
            r'\bADOS\b': 'Autism Diagnostic Observation Schedule',
            r'\bM-CHAT\b': 'Modified Checklist for Autism in Toddlers',
            r'\bICD\b': 'International Classification of Diseases'
        }
        
        for pattern, replacement in replacements.items():
            text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
        
        return text
    
    def clean_text(self, text: str, expand_abbreviations: bool = False) -> str:
        """Apply all cleaning steps"""
        text = self.remove_html_tags(text)
        text = self.remove_page_numbers(text)
        text = self.remove_extra_whitespace(text)
        text = self.remove_special_characters(text)
        
        # Optionally expand medical abbreviations
        if expand_abbreviations:
            text = self.standardize_medical_terms(text)
        
        return text
    
    def clean_documents(self, documents: List[Dict], expand_abbreviations: bool = False) -> List[Dict]:
        """Clean all collected documents"""
        for doc in documents:
            doc['cleaned_text'] = self.clean_text(doc['text'], expand_abbreviations)
        
        print(f"Cleaned {len(documents)} documents")
        return documents


In [4]:
class TextChunker:
    """Chunk text into smaller, manageable pieces"""
    
    def __init__(self, chunk_size: int = 500, overlap: int = 100):
        self.chunk_size = chunk_size  # characters per chunk
        self.overlap = overlap  # overlap between chunks
    
    def chunk_by_sentences(self, text: str, doc_metadata: Dict) -> List[Dict]:
        """Chunk text by sentences with semantic boundaries"""
        sentences = sent_tokenize(text)
        chunks = []
        current_chunk = ""
        chunk_id = 0
        
        for sentence in sentences:
            if len(current_chunk) + len(sentence) <= self.chunk_size:
                current_chunk += " " + sentence
            else:
                if current_chunk:
                    chunks.append({
                        'chunk_id': chunk_id,
                        'text': current_chunk.strip(),
                        'char_count': len(current_chunk.strip()),
                        'metadata': doc_metadata.copy()
                    })
                    chunk_id += 1
                current_chunk = sentence
        
        # Add the last chunk
        if current_chunk:
            chunks.append({
                'chunk_id': chunk_id,
                'text': current_chunk.strip(),
                'char_count': len(current_chunk.strip()),
                'metadata': doc_metadata.copy()
            })
        
        return chunks
    
    def chunk_with_overlap(self, text: str, doc_metadata: Dict) -> List[Dict]:
        """Chunk text with sliding window overlap"""
        chunks = []
        start = 0
        chunk_id = 0
        
        while start < len(text):
            end = start + self.chunk_size
            chunk_text = text[start:end]
            
            # Try to break at sentence boundary
            if end < len(text):
                last_period = chunk_text.rfind('.')
                if last_period > self.chunk_size * 0.5:
                    end = start + last_period + 1
                    chunk_text = text[start:end]
            
            if chunk_text.strip():  # Only add non-empty chunks
                chunks.append({
                    'chunk_id': chunk_id,
                    'text': chunk_text.strip(),
                    'char_count': len(chunk_text.strip()),
                    'metadata': doc_metadata.copy()
                })
                chunk_id += 1
            
            start = end - self.overlap
            
            # Prevent infinite loop
            if start >= len(text):
                break
        
        return chunks
    
    def chunk_by_paragraphs(self, text: str, doc_metadata: Dict) -> List[Dict]:
        """Chunk text by paragraphs"""
        paragraphs = text.split('\n\n')
        chunks = []
        chunk_id = 0
        
        for para in paragraphs:
            para = para.strip()
            if para and len(para) > 50:  # Minimum paragraph length
                chunks.append({
                    'chunk_id': chunk_id,
                    'text': para,
                    'char_count': len(para),
                    'metadata': doc_metadata.copy()
                })
                chunk_id += 1
        
        return chunks
    
    def chunk_documents(self, documents: List[Dict], method: str = 'sentences') -> List[Dict]:
        """Chunk all documents using specified method"""
        all_chunks = []
        
        for doc in documents:
            text = doc.get('cleaned_text', doc.get('text', ''))
            
            # Skip empty documents
            if not text or len(text) < 50:
                continue
            
            metadata = {
                'source': doc['source'],
                'type': doc['type'],
                **doc.get('metadata', {})
            }
            
            if method == 'sentences':
                chunks = self.chunk_by_sentences(text, metadata)
            elif method == 'paragraphs':
                chunks = self.chunk_by_paragraphs(text, metadata)
            else:  # overlap method
                chunks = self.chunk_with_overlap(text, metadata)
            
            all_chunks.extend(chunks)
        
        print(f"Created {len(all_chunks)} chunks from {len(documents)} documents")
        return all_chunks


In [5]:
def save_chunks_to_json(chunks: List[Dict], output_file: str):
    """Save processed chunks to JSON file"""
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(chunks, f, indent=2, ensure_ascii=False)
    print(f"Saved {len(chunks)} chunks to {output_file}")


def save_chunks_to_jsonl(chunks: List[Dict], output_file: str):
    """Save processed chunks to JSONL file (one JSON object per line)"""
    with open(output_file, 'w', encoding='utf-8') as f:
        for chunk in chunks:
            f.write(json.dumps(chunk, ensure_ascii=False) + '\n')
    print(f"Saved {len(chunks)} chunks to {output_file}")


def print_statistics(chunks: List[Dict]):
    """Print statistics about the processed chunks"""
    if not chunks:
        print("No chunks to analyze")
        return
    
    total_chunks = len(chunks)
    avg_length = sum(chunk['char_count'] for chunk in chunks) / total_chunks
    min_length = min(chunk['char_count'] for chunk in chunks)
    max_length = max(chunk['char_count'] for chunk in chunks)
    
    # Count by document type
    type_counts = {}
    for chunk in chunks:
        doc_type = chunk['metadata'].get('type', 'unknown')
        type_counts[doc_type] = type_counts.get(doc_type, 0) + 1
    
    print("\n" + "="*50)
    print("PROCESSING STATISTICS")
    print("="*50)
    print(f"Total chunks: {total_chunks}")
    print(f"Average chunk length: {avg_length:.0f} characters")
    print(f"Min chunk length: {min_length} characters")
    print(f"Max chunk length: {max_length} characters")
    print("\nChunks by document type:")
    for doc_type, count in type_counts.items():
        print(f"  {doc_type}: {count}")
    print("="*50)


In [7]:
def main():
    """Main execution function"""
    
    # ========== CONFIGURATION ==========
    DATA_DIR = "./"  # Change to your data directory
    OUTPUT_JSON_FILE = "./processed_chunks.json"
    OUTPUT_JSONL_FILE = "./processed_chunks.jsonl"
    
    # Chunking parameters
    CHUNK_SIZE = 5000  # characters per chunk
    OVERLAP = 100  # overlap between chunks
    CHUNKING_METHOD = 'sentences'  # Options: 'sentences', 'overlap', 'paragraphs'
    
    # Cleaning parameters
    EXPAND_ABBREVIATIONS = False  # Set to True to expand medical abbreviations
    
    # ========== STEP 1: COLLECT DOCUMENTS ==========
    print("="*60)
    print("STEP 1: COLLECTING DOCUMENTS")
    print("="*60)
    
    collector = DataCollector(DATA_DIR)
    documents = collector.collect_all_documents()
    print(documents)
    
    
    if not documents:
        print("\n❌ No documents found! Please check your data directory.")
        print(f"   Looking in: {os.path.abspath(DATA_DIR)}")
        return
    
    print(f"✓ Successfully collected {len(documents)} documents\n")
    
    # ========== STEP 2: CLEAN DOCUMENTS ==========
    print("="*60)
    print("STEP 2: CLEANING DOCUMENTS")
    print("="*60)
    
    cleaner = TextCleaner()
    cleaned_docs = cleaner.clean_documents(documents, expand_abbreviations=EXPAND_ABBREVIATIONS)
    print(f"✓ Successfully cleaned {len(cleaned_docs)} documents\n")
    
    # ========== STEP 3: CHUNK DOCUMENTS ==========
    print("="*60)
    print("STEP 3: CHUNKING DOCUMENTS")
    print("="*60)
    
    chunker = TextChunker(chunk_size=CHUNK_SIZE, overlap=OVERLAP)
    chunks = chunker.chunk_documents(cleaned_docs, method=CHUNKING_METHOD)
    
    if not chunks:
        print("\n❌ No chunks created! Check your documents.")
        return
    
    print(f"✓ Successfully created {len(chunks)} chunks\n")
    
    # ========== STEP 4: SAVE PROCESSED DATA ==========
    print("="*60)
    print("STEP 4: SAVING PROCESSED DATA")
    print("="*60)
    
    save_chunks_to_json(chunks, OUTPUT_JSON_FILE)
    save_chunks_to_jsonl(chunks, OUTPUT_JSONL_FILE)
    print()
    
    # ========== STEP 5: PRINT STATISTICS ==========
    print_statistics(chunks)
    
    # ========== COMPLETION MESSAGE ==========
    print("\n✅ DATA PREPARATION COMPLETE!")
    print(f"\nOutput files:")
    print(f"  - JSON format: {os.path.abspath(OUTPUT_JSON_FILE)}")
    print(f"  - JSONL format: {os.path.abspath(OUTPUT_JSONL_FILE)}")
    print("\nNext step: Generate embeddings and store in vector database")


if __name__ == "__main__":
    main()


STEP 1: COLLECTING DOCUMENTS
Collected 18 documents
✓ Successfully collected 18 documents

STEP 2: CLEANING DOCUMENTS
Cleaned 18 documents
✓ Successfully cleaned 18 documents

STEP 3: CHUNKING DOCUMENTS
Created 104 chunks from 18 documents
✓ Successfully created 104 chunks

STEP 4: SAVING PROCESSED DATA
Saved 104 chunks to ./processed_chunks.json
Saved 104 chunks to ./processed_chunks.jsonl


PROCESSING STATISTICS
Total chunks: 104
Average chunk length: 4432 characters
Min chunk length: 640 characters
Max chunk length: 4998 characters

Chunks by document type:
  docx: 27
  pdf: 77

✅ DATA PREPARATION COMPLETE!

Output files:
  - JSON format: e:\Users\Prajj\Documents\7th Sem\RM\Project\processed_chunks.json
  - JSONL format: e:\Users\Prajj\Documents\7th Sem\RM\Project\processed_chunks.jsonl

Next step: Generate embeddings and store in vector database


In [8]:
import numpy as np
print(np.__version__)
import faiss
print("FAISS imported successfully")

1.26.4
FAISS imported successfully


In [9]:
import json
import numpy as np
from pathlib import Path
from typing import List, Dict, Tuple
from tqdm import tqdm
import pickle

# Sentence Transformers for embeddings
from sentence_transformers import SentenceTransformer

# FAISS for vector storage and similarity search
import faiss

# Optional: For better progress tracking
import warnings
warnings.filterwarnings('ignore')

  from .autonotebook import tqdm as notebook_tqdm


In [10]:
class EmbeddingConfig:
    """Configuration for embedding generation and vector database"""
    
    # Input/Output paths
    INPUT_JSONL_FILE = "./processed_chunks.jsonl"
    OUTPUT_EMBEDDINGS_FILE = "./embeddings.pkl"
    OUTPUT_FAISS_INDEX = "./faiss_index.bin"
    OUTPUT_METADATA_FILE = "./chunk_metadata.json"
    
    # Embedding model configuration
    EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"  # Fast, 384 dimensions
    # Alternative models:
    # "sentence-transformers/all-mpnet-base-v2"  # Better quality, 768 dimensions
    # "pritamdeka/BioBERT-mnli-snli-scinli-scitail-mednli-stsb"  # Medical domain
    
    # Processing configuration
    BATCH_SIZE = 32  # Number of chunks to embed at once
    MAX_CHUNKS = None  # Set to a number to limit chunks for testing, None for all
    
    # Vector database configuration
    USE_GPU = False  # Set to True if you have FAISS GPU installed
    DIMENSION = 384  # Must match the embedding model output dimension

In [11]:
def load_chunks_from_jsonl(file_path: str, max_chunks: int = None) -> Tuple[List[str], List[Dict]]:
    """
    Load text chunks and metadata from JSONL file
    
    Returns:
        texts: List of text strings to embed
        metadata: List of metadata dictionaries
    """
    texts = []
    metadata = []
    
    print(f"Loading chunks from {file_path}...")
    
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for idx, line in enumerate(f):
                if max_chunks and idx >= max_chunks:
                    break
                
                try:
                    chunk = json.loads(line.strip())
                    texts.append(chunk['text'])
                    metadata.append({
                        'chunk_id': chunk.get('chunk_id', idx),
                        'source': chunk['metadata'].get('source', 'unknown'),
                        'type': chunk['metadata'].get('type', 'unknown'),
                        'char_count': chunk.get('char_count', len(chunk['text'])),
                        'filename': chunk['metadata'].get('filename', 'unknown')
                    })
                except json.JSONDecodeError:
                    print(f"Warning: Skipping malformed JSON at line {idx}")
                    continue
        
        print(f"✓ Loaded {len(texts)} text chunks")
        return texts, metadata
    
    except FileNotFoundError:
        print(f"❌ Error: File '{file_path}' not found!")
        print("Please run the data preparation script first.")
        return [], []


In [12]:
class EmbeddingGenerator:
    """Generate embeddings using Sentence Transformers"""
    
    def __init__(self, model_name: str, device: str = 'cpu'):
        """
        Initialize the embedding model
        
        Args:
            model_name: Name or path of the sentence transformer model
            device: 'cpu' or 'cuda' for GPU
        """
        print(f"Loading embedding model: {model_name}")
        self.model = SentenceTransformer(model_name, device=device)
        self.dimension = self.model.get_sentence_embedding_dimension()
        print(f"✓ Model loaded successfully")
        print(f"  Embedding dimension: {self.dimension}")
        print(f"  Device: {self.model.device}")
    
    def generate_embeddings(self, texts: List[str], batch_size: int = 32) -> np.ndarray:
        """
        Generate embeddings for a list of texts
        
        Args:
            texts: List of text strings to embed
            batch_size: Number of texts to process at once
        
        Returns:
            embeddings: Numpy array of shape (n_texts, embedding_dim)
        """
        print(f"\nGenerating embeddings for {len(texts)} texts...")
        print(f"Batch size: {batch_size}")
        
        # Generate embeddings with progress bar
        embeddings = self.model.encode(
            texts,
            batch_size=batch_size,
            show_progress_bar=True,
            convert_to_numpy=True,
            normalize_embeddings=True  # Normalize for cosine similarity
        )
        
        print(f"✓ Generated embeddings with shape: {embeddings.shape}")
        return embeddings
    
    def save_embeddings(self, embeddings: np.ndarray, output_path: str):
        """Save embeddings to disk using pickle"""
        print(f"\nSaving embeddings to {output_path}...")
        with open(output_path, 'wb') as f:
            pickle.dump(embeddings, f)
        print(f"✓ Embeddings saved successfully")


In [13]:
class VectorDatabase:
    """FAISS vector database for similarity search"""
    
    def __init__(self, dimension: int, use_gpu: bool = False):
        """
        Initialize FAISS index
        
        Args:
            dimension: Dimension of the embedding vectors
            use_gpu: Whether to use GPU acceleration
        """
        self.dimension = dimension
        self.use_gpu = use_gpu
        self.index = None
        self.metadata = []
        
        print(f"\nInitializing FAISS index...")
        print(f"  Dimension: {dimension}")
        print(f"  GPU enabled: {use_gpu}")
    
    def create_index(self, embeddings: np.ndarray, metadata: List[Dict]):
        """
        Create FAISS index from embeddings
        
        Args:
            embeddings: Numpy array of embeddings (n_samples, dimension)
            metadata: List of metadata dictionaries for each embedding
        """
        print(f"\nCreating FAISS index with {len(embeddings)} vectors...")
        
        # Create index - using IndexFlatIP for inner product (cosine similarity with normalized vectors)
        self.index = faiss.IndexFlatIP(self.dimension)
        
        # Optionally use GPU
        if self.use_gpu:
            try:
                res = faiss.StandardGpuResources()
                self.index = faiss.index_cpu_to_gpu(res, 0, self.index)
                print("✓ GPU acceleration enabled")
            except Exception as e:
                print(f"Warning: GPU initialization failed: {e}")
                print("Falling back to CPU")
        
        # Add vectors to index
        self.index.add(embeddings.astype('float32'))
        self.metadata = metadata
        
        print(f"✓ Index created successfully")
        print(f"  Total vectors in index: {self.index.ntotal}")
    
    def save_index(self, index_path: str, metadata_path: str):
        """Save FAISS index and metadata to disk"""
        print(f"\nSaving vector database...")
        
        # Save FAISS index
        if self.use_gpu:
            # Convert GPU index to CPU before saving
            cpu_index = faiss.index_gpu_to_cpu(self.index)
            faiss.write_index(cpu_index, index_path)
        else:
            faiss.write_index(self.index, index_path)
        
        print(f"✓ FAISS index saved to: {index_path}")
        
        # Save metadata
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(self.metadata, f, indent=2, ensure_ascii=False)
        
        print(f"✓ Metadata saved to: {metadata_path}")
    
    def load_index(self, index_path: str, metadata_path: str):
        """Load FAISS index and metadata from disk"""
        print(f"Loading vector database from {index_path}...")
        
        # Load FAISS index
        self.index = faiss.read_index(index_path)
        
        if self.use_gpu:
            try:
                res = faiss.StandardGpuResources()
                self.index = faiss.index_cpu_to_gpu(res, 0, self.index)
                print("✓ GPU acceleration enabled")
            except Exception as e:
                print(f"Warning: GPU not available: {e}")
        
        # Load metadata
        with open(metadata_path, 'r', encoding='utf-8') as f:
            self.metadata = json.load(f)
        
        print(f"✓ Loaded {self.index.ntotal} vectors")
        print(f"✓ Loaded {len(self.metadata)} metadata entries")
    
    def search(self, query_embedding: np.ndarray, k: int = 5) -> Tuple[List[float], List[Dict]]:
        """
        Search for k most similar vectors
        
        Args:
            query_embedding: Query vector (1, dimension)
            k: Number of results to return
        
        Returns:
            scores: Similarity scores
            results: Metadata of matching chunks
        """
        if self.index is None:
            raise ValueError("Index not created or loaded!")
        
        # Ensure query is 2D array
        if len(query_embedding.shape) == 1:
            query_embedding = query_embedding.reshape(1, -1)
        
        # Search
        scores, indices = self.index.search(query_embedding.astype('float32'), k)
        
        # Get metadata for results
        results = [self.metadata[idx] for idx in indices[0]]
        
        return scores[0].tolist(), results


In [14]:
def main():
    """Main execution pipeline"""
    
    # Load configuration
    config = EmbeddingConfig()
    
    print("="*70)
    print("AUTISM RAG PIPELINE - EMBEDDING GENERATION & VECTOR DATABASE CREATION")
    print("="*70)
    
    # ========== STEP 1: LOAD CHUNKS ==========
    print("\n" + "="*70)
    print("STEP 1: LOADING PROCESSED CHUNKS")
    print("="*70)
    
    texts, metadata = load_chunks_from_jsonl(
        config.INPUT_JSONL_FILE,
        max_chunks=config.MAX_CHUNKS
    )
    
    if not texts:
        print("\n❌ No chunks loaded. Exiting.")
        return
    
    # ========== STEP 2: GENERATE EMBEDDINGS ==========
    print("\n" + "="*70)
    print("STEP 2: GENERATING EMBEDDINGS")
    print("="*70)
    
    # Initialize embedding generator
    device = 'cuda' if config.USE_GPU else 'cpu'
    generator = EmbeddingGenerator(config.EMBEDDING_MODEL, device=device)
    
    # Generate embeddings
    embeddings = generator.generate_embeddings(texts, batch_size=config.BATCH_SIZE)
    
    # Save embeddings
    generator.save_embeddings(embeddings, config.OUTPUT_EMBEDDINGS_FILE)
    
    # ========== STEP 3: CREATE VECTOR DATABASE ==========
    print("\n" + "="*70)
    print("STEP 3: CREATING VECTOR DATABASE")
    print("="*70)
    
    # Initialize vector database
    vector_db = VectorDatabase(
        dimension=generator.dimension,
        use_gpu=config.USE_GPU
    )
    
    # Create index
    vector_db.create_index(embeddings, metadata)
    
    # Save index
    vector_db.save_index(config.OUTPUT_FAISS_INDEX, config.OUTPUT_METADATA_FILE)
    
    # ========== SUMMARY ==========
    print("\n" + "="*70)
    print("SUMMARY")
    print("="*70)
    print(f"✓ Processed {len(texts)} text chunks")
    print(f"✓ Generated embeddings: {embeddings.shape}")
    print(f"✓ Created vector database with {vector_db.index.ntotal} vectors")
    print(f"\nOutput files:")
    print(f"  - Embeddings: {config.OUTPUT_EMBEDDINGS_FILE}")
    print(f"  - FAISS Index: {config.OUTPUT_FAISS_INDEX}")
    print(f"  - Metadata: {config.OUTPUT_METADATA_FILE}")
    print("\n✅ EMBEDDING GENERATION COMPLETE!")
    print("\nNext step: Implement RAG query pipeline")
    print("="*70)


if __name__ == "__main__":
    main()


AUTISM RAG PIPELINE - EMBEDDING GENERATION & VECTOR DATABASE CREATION

STEP 1: LOADING PROCESSED CHUNKS
Loading chunks from ./processed_chunks.jsonl...
✓ Loaded 104 text chunks

STEP 2: GENERATING EMBEDDINGS
Loading embedding model: sentence-transformers/all-MiniLM-L6-v2
✓ Model loaded successfully
  Embedding dimension: 384
  Device: cpu

Generating embeddings for 104 texts...
Batch size: 32


Batches: 100%|██████████| 4/4 [00:04<00:00,  1.08s/it]

✓ Generated embeddings with shape: (104, 384)

Saving embeddings to ./embeddings.pkl...
✓ Embeddings saved successfully

STEP 3: CREATING VECTOR DATABASE

Initializing FAISS index...
  Dimension: 384
  GPU enabled: False

Creating FAISS index with 104 vectors...
✓ Index created successfully
  Total vectors in index: 104

Saving vector database...
✓ FAISS index saved to: ./faiss_index.bin
✓ Metadata saved to: ./chunk_metadata.json

SUMMARY
✓ Processed 104 text chunks
✓ Generated embeddings: (104, 384)
✓ Created vector database with 104 vectors

Output files:
  - Embeddings: ./embeddings.pkl
  - FAISS Index: ./faiss_index.bin
  - Metadata: ./chunk_metadata.json

✅ EMBEDDING GENERATION COMPLETE!

Next step: Implement RAG query pipeline





In [15]:
def test_search(query_text: str, k: int = 5):
    """
    Test the vector database with a sample query.
    
    Args:
        query_text: Text to search for
        k: Number of results to return
    """
    print(f"\nTesting search with query: '{query_text}'")
    print("="*70)
    
    # Load configuration
    config = EmbeddingConfig()
    
    # Load embedding model
    print("Loading embedding model...")
    model = SentenceTransformer(config.EMBEDDING_MODEL)
    
    # Load vector database
    print("Loading vector database...")
    vector_db = VectorDatabase(dimension=config.DIMENSION, use_gpu=config.USE_GPU)
    vector_db.load_index(config.OUTPUT_FAISS_INDEX, config.OUTPUT_METADATA_FILE)
    
    # Generate query embedding
    print("Generating query embedding...")
    query_embedding = model.encode([query_text], normalize_embeddings=True)
    
    # Search
    print(f"Searching for top {k} results...\n")
    scores, results = vector_db.search(query_embedding, k=k)

    print(results)
    
    # Display results
    print("SEARCH RESULTS:")
    print("="*70)
    for idx, (score, result) in enumerate(zip(scores, results), 1):
        print(f"\nResult {idx}:")
        print(f"  Score:      {score:.4f}")
        print(f"  Source:     {result['filename']}")
        print(f"  Type:       {result['type']}")
        print(f"  Chunk ID:   {result['chunk_id']}")
        print(f"  Text:")
        print(result.get('text', '[Text Not Found]'))
        print("-"*70)


# Example usage:
test_search("What does parents do for autism spectrum disorder?", k=3)



Testing search with query: 'What does parents do for autism spectrum disorder?'
Loading embedding model...
Loading vector database...

Initializing FAISS index...
  Dimension: 384
  GPU enabled: False
Loading vector database from ./faiss_index.bin...
✓ Loaded 104 vectors
✓ Loaded 104 metadata entries
Generating query embedding...
Searching for top 3 results...

[{'chunk_id': 0, 'source': 'papers\\video.pdf', 'type': 'pdf', 'char_count': 4913, 'filename': 'video.pdf'}, {'chunk_id': 2, 'source': 'papers\\video.pdf', 'type': 'pdf', 'char_count': 3899, 'filename': 'video.pdf'}, {'chunk_id': 0, 'source': 'papers\\video_4.pdf', 'type': 'pdf', 'char_count': 4937, 'filename': 'video_4.pdf'}]
SEARCH RESULTS:

Result 1:
  Score:      0.6128
  Source:     video.pdf
  Type:       pdf
  Chunk ID:   0
  Text:
[Text Not Found]
----------------------------------------------------------------------

Result 2:
  Score:      0.6060
  Source:     video.pdf
  Type:       pdf
  Chunk ID:   2
  Text:
[Text 

In [16]:
import requests

class RAGPipeline:
    def __init__(self, embedding_model_name: str, faiss_index_path: str, metadata_path: str, gemini_api_key: str, use_gpu=False):
        # Load embedding model
        self.model = SentenceTransformer(embedding_model_name)
        
        # Load FAISS index
        if use_gpu:
            res = faiss.StandardGpuResources()
            index_cpu = faiss.read_index(faiss_index_path)
            self.index = faiss.index_cpu_to_gpu(res, 0, index_cpu)
        else:
            self.index = faiss.read_index(faiss_index_path)
        
        # Load metadata
        with open(metadata_path, 'r', encoding='utf-8') as f:
            self.metadata = json.load(f)
        
        # Store Gemini API key
        self.gemini_api_key = gemini_api_key
    
    def embed_query(self, query: str) -> np.ndarray:
        # Generate normalized query embedding
        embedding = self.model.encode([query], normalize_embeddings=True)
        return embedding
    
    def retrieve(self, query_embedding: np.ndarray, top_k: int = 5):
        # Search FAISS index for top_k similar chunks
        scores, indices = self.index.search(query_embedding.astype('float32'), top_k)
        results = []
        for score, idx in zip(scores[0], indices[0]):
            chunk_info = self.metadata[idx]
            chunk_info['score'] = float(score)
            results.append(chunk_info)
        return results
    
    def generate_prompt(self, query:str, retrieved_chunks:list) -> str:
        # Format the context and query into a prompt string
        context = "\n\n".join([f"{i+1}. {chunk['text']}" for i, chunk in enumerate(retrieved_chunks)])
        prompt = (f"Use the following autism-related information to answer the question below:\n\n"
                  f"{context}\n\n"
                  f"Question: {query}\n"
                  f"Answer:")
        return prompt
    
    def query(self, user_query: str, top_k: int = 5) -> str:
        # Embed user query
        query_emb = self.embed_query(user_query)
        
        # Retrieve top-k context chunks
        retrieved = self.retrieve(query_emb, top_k)
        
        # Generate prompt for Gemini LLM
        prompt = self.generate_prompt(user_query, retrieved)
        
        # Call Gemini API for answer generation
        response_text = self.call_gemini_api(prompt)
        
        return response_text, retrieved
    
    def call_gemini_api(self, prompt: str) -> str:
        """
        Call Gemini LLM API with prompt and return the generated text
        
        Note: You must replace the URL, headers, and request details based on Gemini API docs.
        """
        url = "https://api.gemini.example/v1/generate"  # Replace with actual Gemini API URL
        
        headers = {
            "Authorization": f"Bearer {self.gemini_api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "prompt": prompt,
            "max_tokens": 512,
            "temperature": 0.7,
            # Add other Gemini API parameters as needed
        }
        
        try:
            response = requests.post(url, headers=headers, json=payload)
            response.raise_for_status()
            data = response.json()
            # Adjust below according to Gemini API response format
            generated_text = data.get("text") or data.get("generated_text") or ""
            return generated_text.strip()
        except Exception as e:
            print(f"Error calling Gemini API: {e}")
            return "Error: Unable to generate response at this time."



In [None]:
import os
os.environ["GEMINI_API_KEY"] = "YOUR_GEMINI_API_KEY_HERE"

In [18]:
import json
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
from google import genai  # Google Gemini API client

# Initialize Gemini client (API key loaded from environment variable GEMINI_API_KEY)
client = genai.Client()

In [None]:
class GeminiConfig:
    VECTOR_INDEX_PATH = "./faiss_index.bin"
    METADATA_PATH = "./chunk_metadata.json"
    EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
    GEMINI_MODEL = "gemini-2.5-flash"  # Updated to valid model name
    TOP_K = 3


In [26]:
class VectorDB:
    def __init__(self, index_path: str, metadata_path: str):
        self.index = faiss.read_index(index_path)
        with open(metadata_path, 'r', encoding='utf-8') as f:
            self.metadata = json.load(f)

    def search(self, query_embedding: np.ndarray, k: int):
        if len(query_embedding.shape) == 1:
            query_embedding = query_embedding.reshape(1, -1)
        scores, indices = self.index.search(query_embedding.astype('float32'), k)
        results = []
        for idx in indices[0]:
            results.append(self.metadata[idx])
        return scores[0], results

In [27]:
embedding_model = SentenceTransformer(GeminiConfig.EMBEDDING_MODEL_NAME)

In [28]:
def call_gemini_api(prompt: str) -> str:
    response = client.models.generate_content(
        model=GeminiConfig.GEMINI_MODEL,
        contents=prompt,
        # Optional: disable "thinking" feature to speed up responses
        config=genai.types.GenerateContentConfig(
            thinking_config=genai.types.ThinkingConfig(thinking_budget=0)
        )
    )
    return response.text

In [29]:
def rag_query_with_gemini(query_text: str):
    print("\nEncoding query...")
    query_embedding = embedding_model.encode([query_text], normalize_embeddings=True)

    print(f"Searching vector database for top {GeminiConfig.TOP_K} relevant chunks...")
    vector_db = VectorDB(GeminiConfig.VECTOR_INDEX_PATH, GeminiConfig.METADATA_PATH)
    scores, retrieved_chunks = vector_db.search(query_embedding, GeminiConfig.TOP_K)

    print("\nRetrieved chunks and scores:")
    context = ""
    for i, (score, chunk) in enumerate(zip(scores, retrieved_chunks), 1):
        print(f"Chunk {i} - Score: {score:.4f}, Source: {chunk['filename']}")
        chunk_text = chunk.get('text', '')  # Ensure chunk text exists in metadata
        print(f"Text preview: {chunk_text[:300]}...\n")
        context += chunk_text + "\n"

    prompt = f"Use the following context to answer the question:\n{context}\nQuestion: {query_text}\nAnswer:"

    print("Calling Gemini API for answer generation...")
    answer = call_gemini_api(prompt)

    print("\n----- RAG Answer (Gemini) -----")
    print(answer.strip())
    print("-------------------------------")

    return answer


In [30]:
if __name__ == "__main__":
    user_question = "What are early signs of autism in toddlers?"
    rag_query_with_gemini(user_question)


Encoding query...
Searching vector database for top 3 relevant chunks...

Retrieved chunks and scores:
Chunk 1 - Score: 0.5559, Source: video.pdf
Text preview: ...

Chunk 2 - Score: 0.5541, Source: video_4.pdf
Text preview: ...

Chunk 3 - Score: 0.5460, Source: video_2.pdf
Text preview: ...

Calling Gemini API for answer generation...


ClientError: 404 NOT_FOUND. {'error': {'code': 404, 'message': 'models/gemini-1.5-flash is not found for API version v1beta, or is not supported for generateContent. Call ListModels to see the list of available models and their supported methods.', 'status': 'NOT_FOUND'}}