In [None]:
!pip install pypdf python-dotenv langchain sentence-transformers faiss-cpu hnswlib annoy
!pip install -U sentence-transformers

In [None]:
import torch
# Report whether PyTorch can see a CUDA device in this runtime.
print("GPU available:", torch.cuda.is_available())

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive.
# NOTE(review): the PDF loaded later is read from /content/OSHA4472.pdf, which is not
# under this mount point — confirm whether the Drive mount is actually needed.
drive.mount('/content/drive')

In [None]:
from pypdf import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Load PDF
def load_pdf(file_path):
    """Extract the text of every page of a PDF and return it as one string.

    Args:
        file_path: Path of the PDF file, passed straight to ``PdfReader``.

    Returns:
        The text of all pages in page order, separated by newlines. A page
        that yields no text contributes an empty string.
    """
    reader = PdfReader(file_path)
    # Join pages with a newline so the last word of one page is not glued to
    # the first word of the next; `or ""` keeps the join safe when a page
    # yields nothing.
    return "\n".join(page.extract_text() or "" for page in reader.pages)

# Split text into chunks
def split_text(text, chunk_size=500, chunk_overlap=50):
    """Break ``text`` into overlapping chunks with a recursive character splitter.

    Args:
        text: The string to split.
        chunk_size: Forwarded as the splitter's ``chunk_size`` (lengths are
            measured with ``len``).
        chunk_overlap: Forwarded as the splitter's ``chunk_overlap``.

    Returns:
        Whatever ``RecursiveCharacterTextSplitter.split_text`` returns for ``text``.
    """
    splitter_options = {
        "chunk_size": chunk_size,
        "chunk_overlap": chunk_overlap,
        "length_function": len,
    }
    return RecursiveCharacterTextSplitter(**splitter_options).split_text(text)

# Process the PDF
file_path = "/content/OSHA4472.pdf"  # Change if your filename is different
text = load_pdf(file_path)
chunks = split_text(text)

# Fail early with a clear message: with no chunks, chunks[0] below would raise
# a bare IndexError (this happens when no text could be extracted from the PDF).
if not chunks:
    raise ValueError(f"No text chunks were produced from {file_path!r}")

print(f"Loaded {len(chunks)} chunks")
print("Sample chunk:\n", chunks[0][:200] + "...")  # Show first 200 chars of first chunk

In [None]:
import numpy as np
from sentence_transformers import SentenceTransformer
import time
import faiss

# Load the sentence-embedding model used for both chunks and queries
model = SentenceTransformer('all-MiniLM-L6-v2')

# Embed every chunk; the timer covers encoding plus the float32 conversion
start_time = time.time()
embeddings = np.asarray(
    model.encode(chunks, show_progress_bar=True),
    dtype='float32',  # FAISS requires float32
)
end_time = time.time()

print(f"Generated {len(embeddings)} embeddings in {end_time-start_time:.2f} seconds")
print(f"Embedding dimension: {embeddings.shape[1]}")
print(f"Embeddings dtype: {embeddings.dtype}")

# Flat L2 FAISS index over the same vectors (intended for later comparison)
dimension = embeddings.shape[1]
faiss_index = faiss.IndexFlatL2(dimension)
faiss_index.add(embeddings)

# Persist the embedding matrix and the FAISS index
np.save('/content/embeddings.npy', embeddings)
faiss.write_index(faiss_index, '/content/faiss_flat.index')

# Text database: chunk position -> chunk text
text_database = dict(enumerate(chunks))

In [None]:
import hnswlib
import numpy as np

# Reload the embedding matrix that the embedding step saved to disk
embeddings = np.load('/content/embeddings.npy')
num_vectors = len(embeddings)
dim = embeddings.shape[1]

# Build the HNSW index over cosine distance, sized for exactly the vectors we have
hnsw_index = hnswlib.Index(space='cosine', dim=dim)  # Using cosine similarity
hnsw_index.init_index(max_elements=num_vectors, ef_construction=200, M=16)

# Insert every embedding, labelled with its row number
hnsw_index.add_items(embeddings, np.arange(num_vectors))

# Query-time parameter: ef should exceed the number of neighbors requested per query
hnsw_index.set_ef(50)

# Persist the index
hnsw_index.save_index('/content/hnsw_index.bin')

# Create retrieval function
def hnsw_retrieve(query, k=3):
    """Return the ``k`` chunks nearest to ``query`` according to the HNSW index.

    Args:
        query: Query string; embedded with the notebook-level ``model``.
        k: Number of neighbors to request from ``hnsw_index``.

    Returns:
        A list of ``(chunk_text, score)`` tuples in the order returned by the
        index. ``score`` is a cosine similarity (higher is better), the same
        kind of score that ``annoy_retrieve`` reports.
    """
    query_embedding = model.encode([query])
    labels, distances = hnsw_index.knn_query(query_embedding, k=k)
    # The index is built with space='cosine', for which hnswlib reports the
    # distance 1 - cosine similarity. Convert it so both retrievers expose the
    # same kind of score instead of one distance and one similarity.
    return [(text_database[int(label)], 1.0 - float(distance))
            for label, distance in zip(labels[0], distances[0])]

# Test retrieval
# The timer wraps the whole hnsw_retrieve call, so the reported time covers
# embedding the query as well as the index lookup.
test_query = "What are the electrical safety requirements?"
start_time = time.time()
hnsw_results = hnsw_retrieve(test_query)
hnsw_time = time.time() - start_time

print(f"HNSW Retrieval Time: {hnsw_time:.4f} seconds")
print("Top results:")
# Show only the first 200 characters of each retrieved chunk.
for i, (result, score) in enumerate(hnsw_results):
    print(f"\nResult {i+1} (Score: {score:.3f}):")
    print(result[:200] + "...")

In [None]:
from annoy import AnnoyIndex
import time

# Build an Annoy index over the same embeddings, one item per row
annoy_index = AnnoyIndex(dim, 'angular')  # Angular distance ≈ cosine similarity
for item_id, vector in enumerate(embeddings):
    annoy_index.add_item(item_id, vector)

# Build the forest (more trees = more precision)
num_trees = 50
annoy_index.build(num_trees)

# Persist the index
annoy_index.save('/content/annoy_index.ann')

# Create retrieval function (plain Annoy lookup; no product quantization is applied)
def annoy_retrieve(query, k=3):
    """Return the ``k`` chunks nearest to ``query`` according to the Annoy index.

    Args:
        query: Query string; embedded with the notebook-level ``model``.
        k: Number of neighbors to request from ``annoy_index``.

    Returns:
        A ``(results, retrieval_time)`` pair. ``results`` is a list of
        ``(chunk_text, cosine_similarity)`` tuples in the order returned by the
        index. ``retrieval_time`` is the seconds spent in the Annoy lookup
        only; embedding the query is not included.
    """
    query_embedding = model.encode([query])[0]

    # perf_counter is monotonic and high-resolution, which matters for a lookup
    # that can finish in well under a millisecond.
    start_time = time.perf_counter()
    neighbor_ids, angular_distances = annoy_index.get_nns_by_vector(
        query_embedding, k, include_distances=True)
    retrieval_time = time.perf_counter() - start_time

    # Annoy's angular distance d satisfies d**2 = 2 * (1 - cos), so cos = 1 - d**2 / 2.
    results = [(text_database[neighbor_id], 1 - (distance ** 2) / 2)
               for neighbor_id, distance in zip(neighbor_ids, angular_distances)]
    return results, retrieval_time

# Test retrieval with the same query as the HNSW test.
# annoy_time is the value measured inside annoy_retrieve, around the index lookup only.
test_query = "What are the electrical safety requirements?"
annoy_results, annoy_time = annoy_retrieve(test_query)

print(f"\nANNOY Retrieval Time: {annoy_time:.4f} seconds")
print("Top results:")
# Show only the first 200 characters of each retrieved chunk.
for i, (result, score) in enumerate(annoy_results):
    print(f"\nResult {i+1} (Score: {score:.3f}):")
    print(result[:200] + "...")

# Product Quantization (PQ) layout, for illustration only.
# NOTE: nothing below quantizes or modifies the embeddings or the Annoy index; it only
# computes how `dim` would divide into sub-vectors and prints that layout.
num_subvectors = 8  # Number of product quantizers
pq_dim = dim // num_subvectors  # Dimensions per quantizer

print(f"\nSimulating PQ with {num_subvectors} subvectors of {pq_dim} dimensions each")

In [None]:
# Benchmark queries used by the timing evaluation and the accuracy calculation below.
test_queries = [
    "What are the electrical safety requirements?",
    "How should workers handle cement dust exposure?",
    "What PPE is required for steel welding?",
    "Emergency procedures for chemical spills",
    "Scaffolding safety guidelines"
]

In [None]:
def evaluate_method(method_func, queries):
    """Run a retrieval function over ``queries`` and collect texts and mean latency.

    Every call is timed end-to-end here (query embedding included), so all
    methods are measured the same way regardless of any timing they do
    internally.

    Args:
        method_func: Callable taking a query string. It may return either a
            list of ``(text, score)`` tuples, or a ``(results, elapsed)`` pair
            whose first element is such a list.
        queries: Iterable of query strings.

    Returns:
        A ``(results, mean_time)`` pair: one list of result texts per query,
        and the mean wall-clock seconds per call (``0.0`` when ``queries`` is
        empty).
    """
    times = []
    results = []

    for query in queries:
        start = time.perf_counter()
        outcome = method_func(query)
        elapsed = time.perf_counter() - start

        # Unwrap a (results, elapsed) pair. The self-reported time is dropped in
        # favor of the external measurement so that methods stay comparable.
        if isinstance(outcome, tuple) and len(outcome) == 2 and isinstance(outcome[0], list):
            outcome = outcome[0]

        times.append(elapsed)
        results.append([r[0] for r in outcome])  # Just the text results

    mean_time = float(np.mean(times)) if times else 0.0
    return results, mean_time

# Evaluate both methods
# Note: this rebinds hnsw_results / annoy_results (previously lists of (text, score)
# tuples from the single-query tests) to one list of result texts per test query.
hnsw_results, hnsw_avg_time = evaluate_method(hnsw_retrieve, test_queries)
annoy_results, annoy_avg_time = evaluate_method(annoy_retrieve, test_queries)

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Create ground truth using exact cosine similarity
def get_ground_truth(query, k=3):
    """Return the texts of the ``k`` chunks most similar to ``query`` by exact search.

    Args:
        query: Query string; embedded with the notebook-level ``model``.
        k: Number of top chunks to return (defaults to 3).

    Returns:
        A list of chunk texts ordered from highest to lowest cosine similarity
        against every row of ``embeddings``. Empty when ``k`` is not positive.
    """
    if k <= 0:
        return []
    query_embed = model.encode([query])
    scores = cosine_similarity(query_embed, embeddings)[0]
    # argsort is ascending: reverse it and keep the first k for best-first order.
    top_indices = np.argsort(scores)[::-1][:k]
    return [text_database[int(i)] for i in top_indices]

# Calculate accuracy metrics
def calculate_accuracy(method_results):
    """Mean fraction of exact-search chunks that a retrieval method recovered.

    Args:
        method_results: One list of retrieved chunk texts per entry of the
            notebook-level ``test_queries``, in the same order.

    Returns:
        The mean, over queries, of ``|retrieved & truth| / |truth|``, where
        both sides are sets of distinct chunk texts. Returns ``0.0`` when
        there is nothing to score.
    """
    accuracies = []
    for query, method_result in zip(test_queries, method_results):
        # Compare distinct texts on both sides: dividing the set overlap by the
        # raw list length would cap the score below 1 whenever the ground truth
        # contains duplicate chunk texts.
        truth = set(get_ground_truth(query))
        if not truth:
            continue
        accuracies.append(len(set(method_result) & truth) / len(truth))
    return float(np.mean(accuracies)) if accuracies else 0.0

# Score each method's per-query result texts against the exact cosine-similarity ground truth.
hnsw_accuracy = calculate_accuracy(hnsw_results)
annoy_accuracy = calculate_accuracy(annoy_results)

In [None]:
import matplotlib.pyplot as plt

# Data for plotting
methods = ['HNSW', 'ANNOY']
times = [hnsw_avg_time, annoy_avg_time]
accuracies = [hnsw_accuracy, annoy_accuracy]

# Create subplots
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

# Time comparison
ax1.bar(methods, times, color=['blue', 'orange'])
ax1.set_title('Average Retrieval Time (seconds)')
ax1.set_ylabel('Seconds per query')
# Leave 10% headroom above the tallest bar; skip when every time is zero,
# because a (0, 0) range is degenerate.
if max(times) > 0:
    ax1.set_ylim(0, max(times)*1.1)

# Accuracy comparison
ax2.bar(methods, accuracies, color=['blue', 'orange'])
ax2.set_title('Accuracy (Match with Ground Truth)')
ax2.set_ylabel('Fraction of ground-truth chunks retrieved')
ax2.set_ylim(0, 1)

plt.tight_layout()
plt.show()

# Print numerical results; values are padded to the header's column widths so the table lines up
print(f"\n{'Metric':<20} {'HNSW':<10} {'ANNOY':<10}")
print(f"{'Avg Time (s)':<20} {hnsw_avg_time:<10.4f} {annoy_avg_time:<10.4f}")
print(f"{'Accuracy':<20} {hnsw_accuracy:<10.2f} {annoy_accuracy:<10.2f}")

In [None]:
def _print_ranked_results(label, results, elapsed):
    """Print one method's ranked results, truncating each chunk to 300 characters."""
    print(f"\n{label} Results (Time: {elapsed:.4f}s):")
    for i, (result, score) in enumerate(results):
        print(f"\nRank {i+1} (Score: {score:.3f}):")
        print(result[:300] + "..." if len(result) > 300 else result)


def compare_retrieval(query, k=3):
    """Print a side-by-side comparison of HNSW and ANNOY retrieval for one query.

    Both methods are timed end-to-end with the same clock (query embedding
    included), so the reported times and the speed ratio are comparable.

    Args:
        query: Query string passed to both retrieval functions.
        k: Number of results requested from each method.

    Returns:
        None. All output is printed.
    """
    print(f"\nQuery: '{query}'\n")
    print("="*50)

    # HNSW Retrieval
    start_time = time.perf_counter()
    hnsw_results = hnsw_retrieve(query, k=k)
    hnsw_time = time.perf_counter() - start_time
    _print_ranked_results("HNSW", hnsw_results, hnsw_time)

    # ANNOY Retrieval: its self-reported lookup-only time is ignored in favor
    # of the same end-to-end measurement used for HNSW.
    start_time = time.perf_counter()
    annoy_results, _ = annoy_retrieve(query, k=k)
    annoy_time = time.perf_counter() - start_time
    _print_ranked_results("ANNOY", annoy_results, annoy_time)

    # Comparison
    print("\n" + "="*50)
    print(f"\nComparison Summary:")
    if hnsw_time <= 0 or annoy_time <= 0:
        # Avoid dividing by a zero timing.
        print("- Timings were too small to compare")
    elif hnsw_time < annoy_time:
        print(f"- HNSW was {annoy_time/hnsw_time:.1f}x faster")
    else:
        print(f"- ANNOY was {hnsw_time/annoy_time:.1f}x faster")

    # Check if top results match (only possible when both methods returned something)
    if not hnsw_results or not annoy_results:
        print("- At least one method returned no results")
    elif hnsw_results[0][0] == annoy_results[0][0]:
        print("- Both methods agreed on the top result")
    else:
        print("- Methods disagreed on the top result")

# Example usage:


In [None]:
# Demo with a query that is not part of test_queries; k is left at its default.
compare_retrieval("What are the requirements for working at heights?")

In [None]:
# Demo with the same text as test_queries[1]; k is left at its default.
compare_retrieval("How should workers handle cement dust exposure?")

In [None]:
# Demo with the same text as test_queries[3]; k is left at its default.
compare_retrieval("Emergency procedures for chemical spills")

In [None]:
# Demo with the same text as test_queries[2]; k is left at its default.
compare_retrieval("What PPE is required for steel welding?")

In [None]:
# Demo with the same text as test_queries[4]; k is left at its default.
compare_retrieval("Scaffolding safety guidelines")