In [None]:
import json
import pickle
import numpy as np
import faiss
import re
from dotenv import load_dotenv
import os
from google import genai
from pydantic import BaseModel, Field
from typing import Literal
import json
import os
from sentence_transformers import SentenceTransformer

In [None]:
# Locations of the prebuilt retrieval artifacts, relative to the notebook's
# working directory.
INDEX_PATH = "indexes/reference.index"
META_PATH = "indexes/chunks_metadata.pkl"
CONFIG_PATH = "indexes/index_config.json"
BM25_PATH = "indexes/bm25.pkl"

# SECURITY NOTE: pickle.load can execute arbitrary code embedded in the file.
# Only point these paths at artifacts you built yourself or otherwise trust.

# Sparse retriever; queried through bm25.get_scores(...) further down.
with open(BM25_PATH, "rb") as f:
    bm25 = pickle.load(f)

# Dense vector index queried with index.search(...).
index = faiss.read_index(INDEX_PATH)

# Per-chunk metadata, looked up by the row ids that index.search returns.
with open(META_PATH, "rb") as f:
    chunked_docs = pickle.load(f)

# JSON text is UTF-8 by specification; pin the encoding instead of relying on
# the platform's locale default.
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
    config = json.load(f)

# Queries are embedded with the model named in the index config
# (presumably the same model the index was built with -- TODO confirm).
embedder = SentenceTransformer(config["embedding_model"])

In [None]:
# Pull variables from a local .env file (if any) into the process environment
# before reading the key, so the secret never has to live in the notebook.
load_dotenv()

api_key = os.environ.get("LLM_HW_API_KEY")
if api_key is None or api_key == "":
    # Fail fast here rather than on the first model call.
    raise ValueError("API Key not set")

# Model id passed to every generate_content call below.
model_name = "gemini-2.0-flash"

client = genai.Client(api_key=api_key)
# Short alias: the detectors call model.generate_content(...).
model = client.models

In [None]:
# Whole-file reference corpus; detect_llm iterates over it and reads each
# record's 'file_name' and 'raw_code' / 'text' fields.
# NOTE: pickle.load runs code embedded in the file -- trusted artifacts only.
with open("indexes/documents.pkl", "rb") as corpus_file:
    documents = pickle.load(corpus_file)

# Quick sanity check that the corpus actually loaded.
print("Loaded", len(documents), "documents from corpus")

In [None]:
class PlagiarismResult(BaseModel):
    # Structured-output contract for the LLM-based detectors: its JSON schema
    # is passed to generate_content as `response_json_schema`, and the model's
    # reply is parsed back with `model_validate_json`.
    #
    # Documented with comments rather than a class docstring on purpose:
    # pydantic copies a class docstring into the generated schema's
    # "description", which would change what is sent to the model.

    # Binary decision: 1 = plagiarized/derived, 0 = original.
    verdict: Literal[0, 1] = Field(
        description=(
            "Return 1 if the code is Plagiarized (or derived). "
            "Return 0 if it is Original."
        )
    )

    # Free-text justification produced by the model.
    explanation: str = Field(
        description=(
            "A brief explanation referencing specific retrieved files "
            "if plagiarism is found."
        )
    )

In [None]:
def detect_embedding(
    code_snippet: str,
    top_k: int = 5,
    plagiarism_threshold: float = 0.42
):
    """Flag plagiarism purely from embedding similarity, without an LLM.

    The snippet is embedded with the module-level ``embedder`` and the
    ``top_k`` nearest chunks are fetched from the module-level FAISS
    ``index``. The snippet is flagged when the best score reaches
    ``plagiarism_threshold``.

    Scores are treated as similarities (higher = closer). This assumes the
    index is an inner-product index over normalized vectors -- TODO confirm
    against the index-building code.

    Args:
        code_snippet: Source code to check.
        top_k: Number of nearest chunks to retrieve.
        plagiarism_threshold: Minimum best-match score that counts as
            plagiarism.

    Returns:
        dict with keys:
            ``plagiarism`` (bool), ``max_similarity`` (float, rounded to 4
            places; 0.0 when nothing was retrieved), ``verdict`` (1 or 0,
            mirrors ``plagiarism``) and ``matches`` (list of per-chunk dicts
            with ``similarity``, ``file_name``, ``chunk_type`` and a
            300-character ``matched_text_preview``).
    """
    query_embedding = embedder.encode(
        [code_snippet],
        normalize_embeddings=True
    ).astype("float32")

    D, I = index.search(query_embedding, top_k)

    results = []
    for score, idx in zip(D[0], I[0]):
        # FAISS pads the result with label -1 when the index holds fewer than
        # top_k vectors. A negative label would wrap around and silently
        # report the *last* chunk as a match, so skip the padding.
        if idx < 0:
            continue

        meta = chunked_docs[int(idx)]
        results.append({
            "similarity": float(score),
            "file_name": meta["file_name"],
            "chunk_type": meta["chunk_type"],
            "matched_text_preview": meta["text"][:300]
        })

    # Best score among real hits only (padding scores are excluded above).
    max_sim = max((match["similarity"] for match in results), default=0.0)

    # With no hits at all there is nothing to be similar to, whatever the
    # threshold is.
    plagiarism = bool(results) and max_sim >= plagiarism_threshold

    return {
        "plagiarism": plagiarism,
        "max_similarity": round(max_sim, 4),
        "verdict": int(plagiarism),
        "matches": results
    }

In [None]:
def detect_llm(student_code: str):
    """Direct LLM analysis: put the whole corpus plus the student code in one prompt.

    Every record in the module-level ``documents`` list is rendered as a
    ``--- START FILE / END FILE ---`` section and sent to the model together
    with ``student_code``. No retrieval step is involved.

    Args:
        student_code: Source code to check.

    Returns:
        dict with keys:
            ``verdict`` (1 = plagiarized/derived, 0 = original),
            ``explanation`` (str),
            ``retrieved_chunks`` (always an empty list here; the key is kept
            so the result has the same shape as the retrieval-based
            detectors) and
            ``error`` (bool, True when the model call or response parsing
            failed and the verdict is only a placeholder).
    """
    # This detector retrieves nothing. The name was previously undefined,
    # which made both the success path and the error fallback raise NameError.
    retrieved_chunks = []

    corpus_texts = [
        "--- START FILE: {fname} ---\n{content}\n--- END FILE ---\n".format(
            fname=doc.get('file_name', 'unknown_file'),
            # Prefer the raw source when the record carries it.
            content=doc.get('raw_code', doc.get('text', '')),
        )
        for doc in documents
    ]

    full_repo_context = "\n".join(corpus_texts)

    prompt = f"""
    You are a code plagiarism detection system.
    
    RETRIEVED REFERENCE CODE:
    {full_repo_context}
    
    SUSPICIOUS STUDENT CODE:
    {student_code}
    
    TASK:
    Compare the logic of the Student Code against the Reference Code.
    - If the student code is a clear derivative (same logic, renamed variables, reordered lines), return verdict 1.
    - If the student code uses a different algorithm or logic, return verdict 0.
    """
    try:
        response = model.generate_content(
                model=model_name,
                contents=prompt,
                config={"response_mime_type": "application/json", "response_json_schema": PlagiarismResult.model_json_schema(),}
        )

        # Validate the JSON reply against the schema that was requested.
        structured_output = PlagiarismResult.model_validate_json(response.text) # type: ignore

        result = structured_output.model_dump()
        result["retrieved_chunks"] = retrieved_chunks
        result["error"] = False

        return result

    except Exception as e:
        print(f"LLM Error: {e}")
        # Best-effort fallback so an evaluation loop keeps running. The
        # verdict stays 0 for backward compatibility, but "error" lets callers
        # exclude failed calls instead of counting them as "not plagiarized".
        return {
            "verdict": 0,
            "explanation": f"Error during processing: {str(e)}",
            "retrieved_chunks": retrieved_chunks,
            "error": True
        }

In [None]:
def detect_rag(
    student_code: str,
    top_k: int = 5,
):
    """Dense-retrieval RAG detector: retrieve similar chunks, then ask the LLM.

    The student code is embedded with the module-level ``embedder``, the
    ``top_k`` nearest chunks are fetched from the FAISS ``index``, and those
    chunks are given to the model as reference code for the comparison.

    Args:
        student_code: Source code to check.
        top_k: Number of chunks to retrieve as context.

    Returns:
        dict with keys:
            ``verdict`` (1 = plagiarized/derived, 0 = original),
            ``explanation`` (str),
            ``retrieved_chunks`` (list of dicts with ``similarity``,
            ``file_name``, ``chunk_type``, ``matched_text``) and
            ``error`` (bool, True when the model call or response parsing
            failed and the verdict is only a placeholder).
    """
    query_emb = embedder.encode([student_code], normalize_embeddings=True).astype("float32")

    D, I = index.search(query_emb, top_k)

    retrieved_chunks = []
    context_texts = []
    for score, idx in zip(D[0], I[0]):
        # FAISS pads with label -1 when fewer than top_k neighbours exist.
        # Indexing with it would wrap around to the last chunk and feed an
        # unrelated file to the model, so skip the padding.
        if idx < 0:
            continue

        meta = chunked_docs[int(idx)]
        # Prefer the untransformed source when the chunk carries it.
        code_text = meta.get("original_code", meta["text"])
        retrieved_chunks.append({
            "similarity": float(score),
            "file_name": meta["file_name"],
            "chunk_type": meta["chunk_type"],
            "matched_text": code_text
        })
        context_texts.append(f"File: {meta['file_name']}\n{code_text}\n")

    context = "\n".join(context_texts)

    prompt = f"""
    You are a code plagiarism detection system.
    
    RETRIEVED REFERENCE CODE:
    {context}
    
    SUSPICIOUS STUDENT CODE:
    {student_code}
    
    TASK:
    Compare the logic of the Student Code against the Reference Code.
    - If the student code is a clear derivative (same logic, renamed variables, reordered lines), return verdict 1.
    - If the student code uses a different algorithm or logic, return verdict 0.
    """
    try:
        response = model.generate_content(
                model=model_name,
                contents=prompt,
                config={"response_mime_type": "application/json", "response_json_schema": PlagiarismResult.model_json_schema(),}
        )

        # Validate the JSON reply against the schema that was requested.
        structured_output = PlagiarismResult.model_validate_json(response.text) # type: ignore

        result = structured_output.model_dump()
        result["retrieved_chunks"] = retrieved_chunks
        result["error"] = False

        return result

    except Exception as e:
        print(f"RAG Error: {e}")
        # Best-effort fallback so an evaluation loop keeps running. The
        # verdict stays 0 for backward compatibility, but "error" lets callers
        # exclude failed calls instead of counting them as "not plagiarized".
        return {
            "verdict": 0,
            "explanation": f"Error during processing: {str(e)}",
            "retrieved_chunks": retrieved_chunks,
            "error": True
        }

In [None]:
def detect_hybrid_rag(
    student_code: str,
    top_k: int = 5,
):
    """Hybrid RAG detector: fuse dense and BM25 retrieval, then ask the LLM.

    Two ranked candidate lists of length ``top_k`` are built: one from the
    FAISS ``index`` (dense embeddings) and one from the module-level ``bm25``
    scorer (lower-cased ``\\w+`` tokens). They are merged with reciprocal-rank
    fusion, and the ``top_k`` best fused chunks become the reference context
    for the model.

    Assumes BM25 row ids address the same ``chunked_docs`` list as the FAISS
    labels -- TODO confirm against the index-building code.

    Args:
        student_code: Source code to check.
        top_k: Size of each candidate list and of the final context.

    Returns:
        dict with keys:
            ``verdict`` (1 = plagiarized/derived, 0 = original),
            ``explanation`` (str),
            ``retrieved_chunks`` (list of dicts with ``file_name``,
            ``chunk_type``, ``rrf_score``, ``matched_text``) and
            ``error`` (bool, True when the model call or response parsing
            failed and the verdict is only a placeholder).
    """
    query_emb = embedder.encode([student_code], normalize_embeddings=True).astype("float32")

    # --- Dense leg -------------------------------------------------------
    _, I = index.search(query_emb, k=top_k)

    # FAISS pads with label -1 when fewer than top_k neighbours exist; drop
    # the padding so it can never be fused or wrap around to the last chunk.
    dense_ranks = {int(idx): rank for rank, idx in enumerate(I[0]) if idx >= 0}

    # --- Sparse leg ------------------------------------------------------
    tokenized_query = re.findall(r"\w+", student_code.lower())
    bm25_scores = bm25.get_scores(tokenized_query)

    sparse_indices = np.argsort(bm25_scores)[::-1][:top_k]
    sparse_ranks = {int(idx): rank for rank, idx in enumerate(sparse_indices)}

    # --- Reciprocal-rank fusion -------------------------------------------
    k_constant = 60
    # Rank assigned to a chunk that is absent from one of the two lists. It
    # must be worse than every real rank, so it grows with top_k; for
    # top_k <= 100 this is the original fixed penalty of 100.
    missing_rank = max(100, top_k)

    all_candidates = set(dense_ranks.keys()).union(set(sparse_ranks.keys()))

    fused_scores = {}
    for idx in all_candidates:
        rank_dense = dense_ranks.get(idx, missing_rank)
        rank_sparse = sparse_ranks.get(idx, missing_rank)

        fused_scores[idx] = (1 / (k_constant + rank_dense)) + (1 / (k_constant + rank_sparse))

    top_indices = sorted(fused_scores, key=fused_scores.get, reverse=True)[:top_k] # type: ignore

    retrieved_chunks = []
    context_texts = []
    for idx in top_indices:
        # Ignore ids that do not address a metadata row.
        if 0 <= idx < len(chunked_docs):
            meta = chunked_docs[idx]
            # Prefer the untransformed source when the chunk carries it.
            code_text = meta.get("original_code", meta["text"])
            retrieved_chunks.append({
                "file_name": meta["file_name"],
                "chunk_type": meta["chunk_type"],
                "rrf_score": float(fused_scores[idx]), # Save fusion score
                "matched_text": code_text
            })
            context_texts.append(f"File: {meta['file_name']}\n{code_text}\n")

    context = "\n".join(context_texts)

    prompt = f"""
    You are a code plagiarism detection system.
    
    RETRIEVED REFERENCE CODE:
    {context}
    
    SUSPICIOUS STUDENT CODE:
    {student_code}
    
    TASK:
    Compare the logic of the Student Code against the Reference Code.
    - If the student code is a clear derivative (same logic, renamed variables, reordered lines), return verdict 1.
    - If the student code uses a different algorithm or logic, return verdict 0.
    """
    try:
        response = model.generate_content(
                model=model_name,
                contents=prompt,
                config={"response_mime_type": "application/json", "response_json_schema": PlagiarismResult.model_json_schema(),}
        )

        # Validate the JSON reply against the schema that was requested.
        structured_output = PlagiarismResult.model_validate_json(response.text) # type: ignore

        result = structured_output.model_dump()
        result["retrieved_chunks"] = retrieved_chunks
        result["error"] = False

        return result

    except Exception as e:
        print(f"RAG Error: {e}")
        # Best-effort fallback so an evaluation loop keeps running. The
        # verdict stays 0 for backward compatibility, but "error" lets callers
        # exclude failed calls instead of counting them as "not plagiarized".
        return {
            "verdict": 0,
            "explanation": f"Error during processing: {str(e)}",
            "retrieved_chunks": retrieved_chunks,
            "error": True
        }