In [1]:
pip install spacy

Note: you may need to restart the kernel to use updated packages.


In [2]:
import pdfplumber
import docx
import spacy
from ipywidgets import FileUpload
import io

# Load SpaCy model
nlp = spacy.load("en_core_web_sm")

# File uploader widget
upload_widget = FileUpload(accept='.pdf,.docx,.txt', multiple=False)

# Display widget
display(upload_widget)

# Extract text from PDF
def extract_text_from_pdf(file_stream):
    text = ""
    with pdfplumber.open(file_stream) as pdf:
        for page in pdf.pages:
            text += page.extract_text() or ''
    return text

# Extract text from DOCX
def extract_text_from_docx(file_stream):
    doc = docx.Document(file_stream)
    return '\n'.join([para.text for para in doc.paragraphs])

# Extract text from TXT
def extract_text_from_txt(file_stream):
    return file_stream.read().decode('utf-8')

# Main text extractor
def extract_text(uploaded_file):
    file_info = uploaded_file.value[0]
    filename = file_info['name']
    content = file_info['content']
    ext = filename.split('.')[-1].lower()
    file_stream = io.BytesIO(content)

    if ext == 'pdf':
        return extract_text_from_pdf(file_stream)
    elif ext == 'docx':
        return extract_text_from_docx(file_stream)
    elif ext == 'txt':
        return extract_text_from_txt(file_stream)
    else:
        raise ValueError("Unsupported file type. Please upload a PDF, DOCX, or TXT file.")

# Chunk text using SpaCy
def chunk_text(text, chunk_size=200, overlap=50):
    words = [token.text for token in nlp(text) if not token.is_space]
    chunks = []
    start = 0
    while start < len(words):
        end = start + chunk_size
        chunk = words[start:end]
        chunks.append(' '.join(chunk))
        start += chunk_size - overlap
    return chunks




FileUpload(value=(), accept='.pdf,.docx,.txt', description='Upload')

In [3]:
# Main logic
def process_file():
    if upload_widget.value:
        try:
            raw_text = extract_text(upload_widget)
            chunks = chunk_text(raw_text)
            print(f"✅ Total Chunks: {len(chunks)}\n")
            for i, chunk in enumerate(chunks[:5]):
                print(f"\n--- Chunk {i+1} ---\n{chunk[:500]}...")
        except Exception as e:
            print(f"❌ Error: {e}")
    else:
        print("⚠️ Please upload a file.")

# Run the process
process_file()

⚠️ Please upload a file.


### Insurance BERT model

In [4]:
import os
import pdfplumber
import re
import nltk
from nltk.tokenize import word_tokenize
from transformers import AutoTokenizer, AutoModel
import torch
from tqdm import tqdm

In [5]:
# Load InsuranceBERT model
# Load model directly
from transformers import AutoTokenizer, AutoModel
tokenizer = AutoTokenizer.from_pretrained("llmware/industry-bert-insurance-v0.1")
model = AutoModel.from_pretrained("llmware/industry-bert-insurance-v0.1")

def get_embedding(text):
    tokens = tokenizer(text, padding=True, truncation=True, max_length=512, return_tensors='pt')
    with torch.no_grad():
        output = model(**tokens)
    embeddings = output.last_hidden_state.mean(dim=1)
    return embeddings.squeeze().numpy()

### Processing all docs 

In [1]:
import os
import torch
import faiss
import spacy
import pdfplumber
import docx
import numpy as np
from transformers import AutoTokenizer, AutoModel

# Load spaCy and InsuranceBERT
nlp = spacy.load("en_core_web_sm")
tokenizer = AutoTokenizer.from_pretrained("llmware/industry-bert-insurance-v0.1")
model = AutoModel.from_pretrained("llmware/industry-bert-insurance-v0.1")

# Utility to extract text
def extract_text(file_path):
    ext = file_path.split('.')[-1].lower()
    if ext == 'pdf':
        with pdfplumber.open(file_path) as pdf:
            return ' '.join([page.extract_text() or '' for page in pdf.pages])
    elif ext == 'docx':
        doc = docx.Document(file_path)
        return '\n'.join([p.text for p in doc.paragraphs])
    elif ext == 'txt':
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    else:
        raise ValueError("Unsupported file type")

# Utility to chunk text
def chunk_text(text, chunk_size=200, overlap=50):
    doc = nlp(text)
    words = [token.text for token in doc if not token.is_space]
    chunks = []
    start = 0
    while start < len(words):
        end = start + chunk_size
        chunks.append(' '.join(words[start:end]))
        start += chunk_size - overlap
    return chunks

# Get embedding for each chunk
def get_embedding(text):
    tokens = tokenizer(text, padding=True, truncation=True, max_length=512, return_tensors='pt')
    with torch.no_grad():
        output = model(**tokens)
    return output.last_hidden_state.mean(dim=1).squeeze().numpy()

# Extract policy name from text (heuristic: first few lines)
def extract_policy_name(text):
    lines = text.strip().split('\n')
    for line in lines[:10]:
        if "policy" in line.lower() or "plan" in line.lower():
            return line.strip()
    return "Unknown_Policy"



#### Process FOlder and build FAISS Index

In [2]:
from tqdm import tqdm

def build_faiss_index(folder_path):
    index = faiss.IndexFlatL2(768)
    chunk_metadata = []  # stores (policy_name, file_name, chunk_text)

    for filename in tqdm(os.listdir(folder_path)):
        if filename.endswith(('.pdf', '.docx', '.txt')):
            file_path = os.path.join(folder_path, filename)
            try:
                full_text = extract_text(file_path)
                policy_name = extract_policy_name(full_text)
                chunks = chunk_text(full_text)

                for chunk in chunks:
                    embedding = get_embedding(chunk)
                    index.add(np.array([embedding], dtype='float32'))
                    chunk_metadata.append((policy_name, filename, chunk))

            except Exception as e:
                print(f"⚠️ Error processing {filename}: {e}")

    return index, chunk_metadata



#### Save and load Index + Metadata 

In [3]:
import pickle

# Save index and metadata
def save_index(index, metadata, index_path='faiss_index.index', metadata_path='metadata.pkl'):
    faiss.write_index(index, index_path)
    with open(metadata_path, 'wb') as f:
        pickle.dump(metadata, f)

# Load index and metadata
def load_index(index_path='faiss_index.index', metadata_path='metadata.pkl'):
    index = faiss.read_index(index_path)
    with open(metadata_path, 'rb') as f:
        metadata = pickle.load(f)
    return index, metadata


#### Query and retrieve relevant chunks

In [None]:
def search_index(query, index, metadata, top_k=5):
    query_emb = get_embedding(query).reshape(1, -1).astype('float32')
    D, I = index.search(query_emb, top_k)

    results = []
    for i in I[0]:
        if i < len(metadata):
            results.append(metadata[i])
    return results


: 

In [10]:
"""# Replace with your actual path to the folder containing the 20 policy files
folder_path = r"D:\NLPInsuranceProject\NLPINSURANCE-FINTECHPROJ\policy_pdfs"

# Build FAISS Index and metadata
index, metadata = build_faiss_index(folder_path)

# Save for future use
save_index(index, metadata)
"""

SyntaxError: (unicode error) 'unicodeescape' codec can't decode bytes in position 95-96: malformed \N character escape (1776261985.py, line 1)

#### Load and Query the FAISS Index

In [None]:
# Load saved index and metadata
index, metadata = load_index()

# Sample user query
user_query = "What is the waiting period for pre-existing conditions?"

# Search top 5 relevant chunks
results = search_index(user_query, index, metadata)

# Display results
for i, (policy_name, filename, chunk) in enumerate(results, 1):
    print(f"\n🔹 Result {i}")
    print(f"📄 Policy: {policy_name}")
    print(f"📁 File: {filename}")
    print(f"📜 Text: {chunk[:500]}...")


#### Check Policy name in the database 

In [None]:
# If the user uploads a new PDF
user_uploaded_file = r"C:\Users\Sejal Hanmante\Downloads\healthplus-policydocument.pdf"

# Extract text & policy name
text = extract_text(user_uploaded_file)
policy_name = extract_policy_name(text)

# Check if this policy name exists in your metadata
existing_policy_names = set([meta[0] for meta in metadata])

if policy_name in existing_policy_names:
    print(f"✅ Policy '{policy_name}' exists in the database.")
    # You can now directly query the FAISS index using `search_index`
else:
    print("❌ Policy not found, reprocessing...")
    # Call the same logic as in `build_faiss_index` to chunk and embed on-the-fly


In [None]:
test_queries = {
    "What is the sum insured under this plan?": "Sum Insured",
    "How long is the waiting period for diabetes?": "Waiting Period",
    "What OPD cover is provided?": "OPD Coverage"
}

for query, expected in test_queries.items():
    results = search_index(query, index, metadata)
    print(f"\n🧪 Query: {query}")
    match_found = any(expected.lower() in chunk.lower() for _, _, chunk in results)
    print(f"✅ Match found: {match_found}")


#### Eval Metrics

In [None]:
!pip install scikit-learn

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score
import numpy as np

def evaluate_retrieval(test_queries, index, metadata, top_k=5):
    precision_list = []
    recall_list = []
    hit_at_k = []
    reciprocal_ranks = []

    for query, expected_keyword in test_queries.items():
        results = search_index(query, index, metadata, top_k=top_k)

        retrieved_texts = [chunk.lower() for _, _, chunk in results]
        match_flags = [int(expected_keyword.lower() in text) for text in retrieved_texts]

        # Metrics
        precision = sum(match_flags) / top_k
        recall = 1 if any(match_flags) else 0
        rank = next((i + 1 for i, match in enumerate(match_flags) if match), None)

        precision_list.append(precision)
        recall_list.append(recall)
        hit_at_k.append(recall)
        reciprocal_ranks.append(1 / rank if rank else 0)

        print(f"\n🔍 Query: {query}")
        print(f"✅ Keyword matched in top-{top_k}: {bool(rank)} at rank {rank if rank else '-'}")

    # Aggregate Metrics
    print("\n📊 Evaluation Results:")
    print(f"🔹 Precision@{top_k}: {np.mean(precision_list):.2f}")
    print(f"🔹 Recall@{top_k}: {np.mean(recall_list):.2f}")
    print(f"🔹 F1 Score: {f1_score(recall_list, [1]*len(recall_list)):.2f}")
    print(f"🔹 MRR: {np.mean(reciprocal_ranks):.2f}")
    print(f"🔹 Hit@{top_k}: {np.mean(hit_at_k):.2f}")


In [None]:
index, metadata = load_index()  # Load saved FAISS + meta
evaluate_retrieval(test_queries, index, metadata)