In [None]:
!pip install tqdm numpy scikit-learn networkx bs4 matplotlib requests rank_bm25 openai faiss-cpu python-dotenv

In [None]:
from tqdm import tqdm
import numpy as np
import networkx as nx
from sklearn.metrics.pairwise import cosine_similarity
import matplotlib.pyplot as plt
from bs4 import BeautifulSoup
import os

from rank_bm25 import BM25Okapi
from openai import OpenAI
import faiss
from dotenv import load_dotenv

from utils import *



In [None]:
# Ids of the course pages on certification.adobe.com; each id is turned into a Course below.
course_numbers = [
    1043, 1045, 1046, 1047, 1048, 1049, 1050, 1054, 1055, 1056,
    1057, 1058, 1059, 1060, 1061, 1062, 1063, 1064, 1065, 1066,
    1067, 1068, 1069, 1221
]

courses = []
certificates = []
certificate_htmls_location = 'certificate_htmls'

# os.listdir returns entries in arbitrary order. The lists built here are later turned into
# positional document ids, so sort the file names to keep those ids stable across runs.
for html in tqdm(sorted(os.listdir(certificate_htmls_location))):
    certificate = Certificate(f'{certificate_htmls_location}/{html}')
    certificates.append(certificate)

# One Course per id, built from the course page URL.
for n in tqdm(course_numbers):
    new_course = Course(f'https://certification.adobe.com/courses/{n}')
    courses.append(new_course)



  0%|          | 0/54 [00:00<?, ?it/s]

100%|██████████| 54/54 [00:01<00:00, 32.25it/s]
100%|██████████| 24/24 [00:11<00:00,  2.02it/s]


In [None]:
# documents  - text of every source; the position in this list is the document id
# doc2source - reverse lookup from a document's text to the object that produced it
documents: list[str] = []
doc2source: dict[str, Source] = {}

# Courses first, then certificates. to_text() is called once per source so the list entry
# and the dictionary key are guaranteed to be the very same string.
# NOTE: two sources with identical text share one dictionary key (the later one wins).
for course in courses:
    course_text = course.to_text()
    documents.append(course_text)
    doc2source[course_text] = course

for cert in certificates:
    cert_text = cert.to_text()
    documents.append(cert_text)
    doc2source[cert_text] = cert


In [None]:
# Read OPENAI_API_KEY from the environment (load_dotenv also picks up a local .env file).
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")

# Cache used by get_embedding: text -> embedding, so each distinct text is embedded once.
tag2embed = {}

# Keyword search: BM25 over whitespace-split document texts.
tokenized_corpus = [text.split() for text in documents]
bm25 = BM25Okapi(tokenized_corpus)

# Semantic search: client for the OpenAI embeddings endpoint.
client = OpenAI(api_key=openai_api_key)

def get_embedding(text):
    """
    Return the embedding of ``text``, memoised in the module-level ``tag2embed`` dict.

    On a cache miss the text is printed, embedded with the OpenAI model
    "text-embedding-ada-002" through the module-level ``client``, stored in the cache
    and returned.

    :param text: Text to embed; also used as the cache key.
    :return: The ``embedding`` field of the first item in the API response.
    """
    try:
        return tag2embed[text]
    except KeyError:
        pass  # not cached yet - fall through to the API call

    print("New embed for:", text)
    response = client.embeddings.create(input=text, model="text-embedding-ada-002")
    vector = response.data[0].embedding
    tag2embed[text] = vector
    return vector

# FAISS works on float32 matrices; without an explicit dtype NumPy would store plain
# Python floats as float64.
embeddings = np.array([get_embedding(doc) for doc in documents], dtype=np.float32)
d = embeddings.shape[1]  # Embedding dimension
index = faiss.IndexFlatL2(d)  # Row i of the index corresponds to documents[i]
index.add(embeddings)

In [None]:
# Tag Search
#
# One FAISS L2 index per tag field. Row i of every index holds the embedding of that field
# for documents[i], so an id returned by a tag index is directly usable as a document id.
sources_in_doc_order = [doc2source[doc] for doc in documents]

type_e = np.array([get_embedding(src.type) for src in sources_in_doc_order], dtype=np.float32)
level_e = np.array([get_embedding(src.level) for src in sources_in_doc_order], dtype=np.float32)
category_e = np.array([get_embedding(src.category) for src in sources_in_doc_order], dtype=np.float32)
job_e = np.array([get_embedding(src.job_role) for src in sources_in_doc_order], dtype=np.float32)

# Tag name -> FAISS index over the embeddings of that tag's values.
tag_embeddings = {
    "type": faiss.IndexFlatL2(type_e.shape[1]),
    "level": faiss.IndexFlatL2(level_e.shape[1]),
    "category": faiss.IndexFlatL2(category_e.shape[1]),
    "job": faiss.IndexFlatL2(job_e.shape[1])
}

# Fill the indexes: a search against an empty index only yields the placeholder id -1.
for tag_name, tag_matrix in (
    ("type", type_e),
    ("level", level_e),
    ("category", category_e),
    ("job", job_e),
):
    tag_embeddings[tag_name].add(tag_matrix)

# NOTE: the document-level `index` is deliberately NOT re-created in this cell; doing so
# would replace the populated semantic index with an empty one.

In [None]:
from collections import defaultdict

def cosine_similarity(vec1, vec2):
    """
    Cosine similarity between two 1-D vectors.

    Note: this definition shadows ``cosine_similarity`` imported from
    ``sklearn.metrics.pairwise`` at the top of the notebook. It takes two single
    vectors and returns one scalar.

    :param vec1: First vector (1-D array-like of numbers).
    :param vec2: Second vector, same length as ``vec1``.
    :return: dot(vec1, vec2) / (||vec1|| * ||vec2||), or 0.0 when either vector has
             zero norm (a plain division would produce NaN there).
    """
    denominator = np.linalg.norm(vec1) * np.linalg.norm(vec2)
    if denominator == 0:
        # Direction is undefined for a zero vector; treat it as "no similarity".
        return 0.0
    return np.dot(vec1, vec2) / denominator

def rrf_fusion(results, k=60):
    """
    Fuse several rankings with Reciprocal Rank Fusion (RRF).

    Every ranking contributes 1 / (k + rank) to the score of each document it lists;
    a document's final score is the sum of its contributions.

    :param results: Dict of {method_name: {doc_id: rank_position}}
    :param k: Constant added to every rank before taking the reciprocal (default: 60).
    :return: List of (document_id, RRF score), highest score first.
    """
    fused = {}
    for rankings in results.values():
        for doc_id, position in rankings.items():
            fused[doc_id] = fused.get(doc_id, 0) + 1 / (k + position)

    ordered = list(fused.items())
    ordered.sort(key=lambda pair: pair[1], reverse=True)
    return ordered

def weighted_fusion(results, weights={"BM25": 0.5, "Vector": 0.5}):
    """
    Performs weighted sum fusion for multiple ranked lists.

    Each rank position is converted to a reciprocal-rank score (1 / rank, so rank 1
    scores 1.0) and multiplied by the weight of the method that produced it; a
    document's final score is the sum over all methods that returned it.

    :param results: Dict of {method_name: {doc_id: rank_position}}, ranks starting at 1.
    :param weights: Dict of {method_name: weight}, e.g., {"BM25": 0.5, "Vector": 0.5}.
                    The default dict is only read, never modified.
    :return: Sorted list of (document_id, weighted_score), highest score first.
    :raises KeyError: If a method in ``results`` has no entry in ``weights``.
    """
    # defaultdict(float) creates an accumulator instance whose missing keys start at 0.0.
    hybrid_results = defaultdict(float)
    for method, ranked_docs in results.items():
        weight = weights[method]
        for doc_id, rank in ranked_docs.items():
            hybrid_results[doc_id] += weight / rank

    sorted_results = sorted(hybrid_results.items(), key=lambda x: x[1], reverse=True)
    return sorted_results

def tag_search(query: str, tag_embeddings: dict[str, faiss.IndexFlatL2], top_n=10) -> dict[int, int]:
    """
    Search every tag index with the query embedding and fuse the per-tag rankings.

    The "type" tag always gets weight 1.0; every other tag is weighted by the cosine
    similarity between the query embedding and the embedding of the tag's name. Weights
    are normalised to sum to 1 (when their sum is positive) and the per-tag rankings are
    combined with weighted_fusion.

    :param query: Free-text query; embedded with get_embedding.
    :param tag_embeddings: Dict of {tag_name: FAISS index over that tag's embeddings}.
    :param top_n: Number of hits requested from each tag index.
    :return: {doc_id: rank_position} of the fused ranking, ranks starting at 1 - the same
             format keyword_search and semantic_search return.
    """
    query_vector = get_embedding(query)  # embed once, reuse for search and for tag weights
    # FAISS expects a float32 matrix of shape (n_queries, dim).
    query_embedding = np.array(query_vector, dtype=np.float32).reshape(1, -1)
    tag_results = {}
    tag_weights = {}

    for tag, tag_index in tag_embeddings.items():
        # Compute tag relevance weight
        if tag == "type":
            tag_weight = 1.0  # Hardcoded priority for `type` tags
        else:
            tag_weight = cosine_similarity(query_vector, get_embedding(tag))

        tag_weights[tag] = tag_weight  # Store weight for weighted fusion

        # Perform FAISS search for this tag
        _, vector_top_n = tag_index.search(query_embedding, top_n)

        # Convert FAISS result to {doc_id: rank_position}, ranks starting at 1.
        # FAISS reports id -1 for result slots it could not fill; those are not documents.
        tag_results[tag] = {
            int(doc_id): rank
            for rank, doc_id in enumerate(vector_top_n[0], start=1)
            if doc_id >= 0
        }

    # Normalize tag weights so they sum to 1
    total_weight = sum(tag_weights.values())
    if total_weight > 0:
        tag_weights = {k: v / total_weight for k, v in tag_weights.items()}

    # Fuse tag results using weighted fusion -> list of (doc_id, score), best first
    fused = weighted_fusion(tag_results, tag_weights)

    # Callers expect rank positions, not fused scores, so convert list order to ranks.
    return {doc_id: rank for rank, (doc_id, _score) in enumerate(fused, start=1)}


def keyword_search(query: str, bm25: BM25Okapi, top_n=10) -> dict[int, int]:
    """
    Rank documents for ``query`` with BM25.

    The query is split on whitespace, scored against every document by ``bm25`` and the
    ``top_n`` highest-scoring document positions are returned.

    :param query: Free-text query.
    :param bm25: Object exposing ``get_scores(tokens)`` with one score per document.
    :param top_n: Maximum number of documents to return.
    :return: {doc_id: rank_position}, rank 1 being the highest score.
    """
    scores = bm25.get_scores(query.split())
    ascending = np.argsort(scores)
    best_first = ascending[::-1][:top_n]  # highest score first, truncated to top_n
    return dict(zip(best_first.tolist(), range(1, len(best_first) + 1)))

def semantic_search(query: str, index: faiss.IndexFlatL2, top_n=10) -> dict[int, int]:
    """
    Perform FAISS vector search and return ranked results.

    :param query: Free-text query; embedded with get_embedding.
    :param index: FAISS index to search; returned ids are row positions in that index.
    :param top_n: Maximum number of hits to request.
    :return: {doc_id: rank_position}, ranks starting at 1 for the closest vector.
    """
    # FAISS expects a float32 matrix of shape (n_queries, dim).
    query_embedding = np.array(get_embedding(query), dtype=np.float32).reshape(1, -1)
    _, vector_top_n = index.search(query_embedding, top_n)  # Retrieve top vector matches

    # FAISS reports id -1 for result slots it could not fill (index smaller than top_n);
    # dropping them keeps placeholder ids from being treated as documents.
    return {
        int(doc_id): rank
        for rank, doc_id in enumerate(vector_top_n[0], start=1)
        if doc_id >= 0
    }

def hybrid_search(query, bm25: BM25Okapi, index: faiss.IndexFlatL2, tag_embeddings: dict[str, faiss.IndexFlatL2], top_n=10, k=60, bm25_weight = 0, tag_weight=0.9):
    """
    Perform hybrid search by weighted fusion of keyword, vector and tag rankings.

    :param query: Free-text query.
    :param bm25: BM25 index used by keyword_search.
    :param index: FAISS index used by semantic_search.
    :param tag_embeddings: Dict of {tag_name: FAISS index} used by tag_search.
    :param top_n: Number of hits requested from each individual retriever; the fused
                  list is the union of their hits and can therefore be longer.
    :param k: Currently unused by the weighted fusion; kept so existing calls keep
              working (it is the constant rrf_fusion would take).
    :param bm25_weight: Weight of the BM25 ranking.
    :param tag_weight: Weight of the tag ranking. The vector ranking receives the
                       remainder, 1 - bm25_weight - tag_weight.
    :return: List of (document_text, fused_score), highest score first.
    """
    bm25_results = keyword_search(query, bm25, top_n=top_n)
    vector_results = semantic_search(query, index, top_n=top_n)
    tag_results = tag_search(query, tag_embeddings, top_n=top_n)

    weights = {"BM25": bm25_weight, "Vector": 1 - bm25_weight - tag_weight, "Tag": tag_weight}
    results = weighted_fusion({"BM25": bm25_results, "Vector": vector_results, "Tag": tag_results}, weights)

    # Skip ids that do not address a document. A negative id (FAISS uses -1 as a
    # placeholder) would otherwise silently index `documents` from the end.
    return [(documents[i], score) for i, score in results if 0 <= i < len(documents)]

# Example run: one query through the hybrid pipeline, hits listed best first.
query = "What courses are good for beginner programmers?"
results = hybrid_search(query, bm25, index, tag_embeddings)

for rank, (doc, score) in enumerate(results, start=1):
    # Map the document text back to the object it was generated from.
    source = doc2source[doc]
    print(f"{rank}. { source.display} (type: {source.type}) (Score: {score:.4f})")


{-1: 10}


TypeError: There are no type variables left in collections.defaultdict[int]