# Chatbot
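This notebook demonstrates a retrieval-augmented generation (RAG) chatbot over the RDMkit dataset: dense vector search (BGE embeddings stored in Chroma) followed by BGE re-ranking, with Gemini generating the answers. The BM25 keyword stage of the earlier hybrid pipeline has been removed. The final section evaluates the answers with RAGAS.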

In [None]:
!pip install -r /kaggle/input/chatbotcorpus/requirements.txt
!pip install chromadb sentence-transformers
!pip install langchain langchain-core langchain-community
!pip install langchain faiss-cpu whoosh
!pip install rank-bm25
!pip install langchain-huggingface
!pip install -U FlagEmbedding
!pip install whoosh langchain python-dotenv typing_extensions numpy langchain-google-genai langchain-huggingface sentence-transformers langgraph langchain-chroma
!pip install ragas datasets evaluate openai

In [None]:
import os
import json
import shutil
import time
import logging
import hashlib
import threading
import numpy as np
import torch
from tempfile import mkdtemp
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Tuple, Union
from typing_extensions import TypedDict
from concurrent.futures import ThreadPoolExecutor
from enum import Enum

# LangChain imports
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
from langchain_core.runnables import RunnableLambda, RunnablePassthrough, RunnableWithMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain.chains import create_retrieval_chain, create_history_aware_retriever
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_community.retrievers import BM25Retriever
from langchain import hub

# Vector store and embeddings
from langchain_chroma import Chroma
from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI
from langchain_huggingface import HuggingFaceEmbeddings

# Whoosh imports
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT, ID
from whoosh.qparser import QueryParser, MultifieldParser, OrGroup
from whoosh.query import Or

# Other model-specific imports
from sentence_transformers import CrossEncoder
from FlagEmbedding import FlagReranker

from dotenv import load_dotenv

# RAG Pipeline

In [3]:
class RDMkitRAGPipeline:
    """Retrieval-augmented question answering over pre-chunked RDMkit pages.

    Per question the pipeline:
      1. rewrites the question into a standalone query when there is chat history,
      2. retrieves the closest chunks from a Chroma index built with BGE embeddings,
      3. applies a title-priority filter and (when available) BGE cross-encoder re-ranking,
      4. generates the answer with Gemini from the formatted top documents.

    Chat histories are kept in memory, keyed by ``session_id``, in ``self.store``.
    """

    _COLLECTION_NAME = "rdmkit-rag-chroma-test"
    _RERANKER_MODEL_NAME = 'BAAI/bge-reranker-base'
    _logger = logging.getLogger("rdmkit_rag")

    def __init__(self, json_file_path: str = "rdmkit_chunks_cleaned.json"):
        """Load credentials and models; the vector index is built later by ``initialize_vectorstore``.

        Args:
            json_file_path: Path to the JSON list of pre-processed RDMkit chunks.

        Raises:
            ValueError: If ``GOOGLE_API_KEY`` is not available (needed for the Gemini LLM).
        """
        # session_id -> ChatMessageHistory
        self.store = {}

        # Load environment variables (python-dotenv does not override variables that are already set).
        path_env = '/kaggle/input/chatbotcorpus/.env'
        load_dotenv(path_env)

        # Set Google API key for LLM
        self.google_api_key = os.environ.get("GOOGLE_API_KEY")
        if not self.google_api_key:
            raise ValueError("GOOGLE_API_KEY is not set in environment variables or .env file (needed for Gemini LLM).")
        os.environ["GOOGLE_API_KEY"] = self.google_api_key

        # LangChain tracing setup (optional)
        langchain_api_key = os.environ.get("LANGCHAIN_API_KEY_V2")
        if langchain_api_key:
            os.environ["LANGCHAIN_API_KEY"] = langchain_api_key
            os.environ["LANGCHAIN_TRACING_V2"] = "true"
        else:
            os.environ["LANGCHAIN_TRACING_V2"] = "false"

        # Pick the device once so CPU-only hosts do not crash on a hard-coded 'cuda'.
        device = "cuda" if torch.cuda.is_available() else "cpu"

        self.embeddings = HuggingFaceEmbeddings(
            model_name="BAAI/bge-large-en-v1.5",
            model_kwargs={'device': device},
            encode_kwargs={'normalize_embeddings': True},
        )
        self.llm = ChatGoogleGenerativeAI(model="gemini-2.5-pro", temperature=0)

        # Cross-encoder for re-ranking; optional, the pipeline degrades to title-priority ordering.
        self.reranker_model_name = self._RERANKER_MODEL_NAME
        try:
            # Half precision is only useful on GPU.
            self.reranker = FlagReranker(self.reranker_model_name, use_fp16=(device == "cuda"))
            self.reranking_enabled = True
        except Exception as exc:
            self._logger.warning("BGE reranker unavailable (%s); using title-priority ordering instead.", exc)
            self.reranker = None
            self.reranking_enabled = False

        self.vectorstore = None
        self.retriever = None
        self.unified_rag_chain = None
        self.raw_documents: List[Document] = []
        self.json_file_path = json_file_path
        self.corpus_summary = None

        # "<query_hash>:<doc_id>" -> normalised relevance score; FIFO-evicted at cache_size_limit.
        self.relevance_cache = {}
        self.cache_size_limit = 1000

        # Configurable threshold parameters
        self.min_relevance_threshold = 0.2
        self.dynamic_threshold_factor = 0.5
        # Settable via update_reranking_config(); not read by the current pipeline.
        self.max_llm_verification_docs = 10

    def get_session_history(self, session_id: str) -> BaseChatMessageHistory:
        """Return the chat history for ``session_id``, creating an empty one on first use."""
        if session_id not in self.store:
            self.store[session_id] = ChatMessageHistory()
        return self.store[session_id]

    def _infer_category(self, source_file: str) -> str:
        """Infer a category label from a chunk's source path.

        Returns the directory directly below ``pages`` (``pages/<category>/...``), ``'data_file'``
        for paths under ``_data``, otherwise ``'general'``. Both ``/`` and ``\\`` are treated as
        separators so paths produced on another OS are still recognised.
        """
        parts = source_file.replace('\\', '/').split('/')
        if 'pages' in parts:
            idx = parts.index('pages')
            if idx + 1 < len(parts) and not parts[idx + 1].endswith('.md'):
                return parts[idx + 1]
        if '_data' in parts:
            return 'data_file'
        return 'general'

    def _chunk_to_document(self, chunk: Dict[str, Any]) -> Document:
        """Convert one JSON chunk into a Document with Chroma-compatible (scalar) metadata."""
        source_file = chunk.get('file_path') or ''
        content = chunk.get('content') or ''
        # An empty/null file_name falls back to the path's basename.
        doc_title = chunk.get('file_name') or os.path.basename(source_file)

        metadata = {
            'chunk_id': chunk.get('id', ''),
            'source_file': source_file,
            'file_stem': os.path.splitext(os.path.basename(source_file))[0],
            'category': self._infer_category(source_file),
            'doc_title': doc_title,
            'description': '',
            'section_title': '',
            'section_level': 0,
            'content_type': chunk.get('source', 'text'),
            'chunk_index': chunk.get('chunk_index', 0),
            'total_chunks': chunk.get('total_chunks', 1),
            'keywords': '',
            'tags': '',
            'contributors': '',
            'word_count': len(content.split()),
            'char_count': len(content),
            'rdm_lifecycle': '',
        }

        # Chroma only accepts scalar metadata values; stringify anything else.
        clean_metadata = {
            key: value if isinstance(value, (str, int, float, bool)) else str(value)
            for key, value in metadata.items()
        }
        return Document(page_content=content, metadata=clean_metadata)

    def initialize_vectorstore(self) -> Dict[str, Any]:
        """Load the chunk JSON, build the Chroma index and wire up the RAG chain.

        Returns:
            A status dict. On success: ``success=True`` plus ``loaded_docs``, ``total_files``,
            ``categories`` and ``average_chunk_size``. On failure: ``success=False`` with
            ``message`` and ``error`` (errors are reported, not raised).
        """
        try:
            with open(self.json_file_path, 'r', encoding='utf-8') as f:
                chunks = json.load(f)

            if not chunks:
                return {
                    "success": False,
                    "message": "No chunks found in the JSON file",
                    "error": "NoChunksError"
                }

            documents = [self._chunk_to_document(chunk) for chunk in chunks]

            category_counts: Dict[str, int] = {}
            for doc in documents:
                category = doc.metadata['category']
                category_counts[category] = category_counts.get(category, 0) + 1
            total_char_count = sum(doc.metadata['char_count'] for doc in documents)

            self.corpus_summary = {
                "total_files": len({doc.metadata['source_file'] for doc in documents}),
                "total_chunks": len(documents),
                "categories": category_counts,
                "average_chunk_size": total_char_count / len(documents),
            }

            # Positional ids: re-initialising in the same kernel overwrites the existing entries
            # of this collection instead of appending duplicate chunks under fresh random ids.
            # NOTE(review): a shorter corpus loaded later would leave stale tail entries.
            self.vectorstore = Chroma.from_documents(
                documents=documents,
                collection_name=self._COLLECTION_NAME,
                embedding=self.embeddings,
                ids=[f"chunk-{i}" for i in range(len(documents))],
            )
            self.retriever = self.vectorstore.as_retriever(search_kwargs={"k": 10})
            self.raw_documents = documents

            self._setup_unified_retrieval_system()

            return {
                "success": True,
                "message": "RDMkit vector database creation completed from json file",
                "loaded_docs": len(documents),
                "total_files": self.corpus_summary.get('total_files', 0),
                "categories": self.corpus_summary.get('categories', {}),
                "average_chunk_size": self.corpus_summary.get('average_chunk_size', 0)
            }

        except FileNotFoundError:
            return {
                "success": False,
                "message": f"RDMkit JSON file {self.json_file_path} not found",
                "error": "FileNotFoundError"
            }
        except Exception as e:
            import traceback
            print(traceback.format_exc())
            return {
                "success": False,
                "message": f"Error loading RDMkit data: {str(e)}",
                "error": str(e)
            }

    def _setup_unified_retrieval_system(self):
        """Build ``self.unified_rag_chain``: history-aware retrieval + re-ranking + answer generation.

        The chain takes ``{"input", "chat_history"}`` and returns that dict extended with
        ``documents`` (the re-ranked context) and ``answer`` (the generated string).
        """

        retriever_prompt = (
            "Given a chat history and the latest user question which might reference context in the chat history, "
            "formulate a standalone question which can be understood without the chat history. "
            "Do NOT answer the question, just reformulate it if needed and otherwise return it as is."
        )

        contextualize_q_prompt = ChatPromptTemplate.from_messages([
            ("system", retriever_prompt),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
        ])
        # Built once instead of on every call.
        condense_question_chain = contextualize_q_prompt | self.llm | StrOutputParser()

        def unified_retriever_func(input_dict: dict) -> List[Document]:
            """Rewrite the question with the chat history (if any), retrieve, then re-rank."""
            question = input_dict.get("input", "")
            chat_history = input_dict.get("chat_history", [])

            search_question = question
            if chat_history:
                try:
                    rewritten = condense_question_chain.invoke({
                        "input": question,
                        "chat_history": chat_history
                    })
                    # An empty rewrite would retrieve nothing useful; keep the original then.
                    if rewritten and rewritten.strip():
                        search_question = rewritten.strip()
                except Exception as exc:
                    self._logger.warning("Question reformulation failed; using the raw question: %s", exc)

            documents = self._vector_retrieve_documents(search_question, k=10)
            return self._rerank_documents(search_question, documents, top_k=5)

        system_prompt = (
            "You are the RDMkit Assistant, a specialized AI guide for research data management (RDM). You help researchers, data managers, librarians, and institutions implement best practices for research data management throughout the entire data lifecycle.\n\n"
            "Your Expertise Areas:\n"
            "- FAIR Data Principles: Findable, Accessible, Interoperable, Reusable data practices\n"
            "- Data Lifecycle Management: Planning, collecting, processing, analyzing, preserving, sharing, reusing\n"
            "- Domain-Specific Guidance: Life sciences, social sciences, humanities, engineering, etc.\n"
            "- Role-Based Support: Researchers, data managers, IT support, institutional administrators\n"
            "- Tools & Resources: Recommend appropriate tools, standards, and best practices\n"
            "- Compliance & Policies: GDPR, institutional policies, funder requirements\n"
            "- National Resources: Country-specific RDM infrastructure and support\n\n"
            "Response Guidelines:\n\n"
            "1. Direct & Actionable Answers\n"
            "- For \"write/create/generate\" requests: Provide the actual document, template, or content requested\n"
            "- For \"how to\" questions: Provide step-by-step guidance and implementation advice\n"
            "- For informational queries: Give clear, concise explanations with practical context\n"
            "- Always match the response format to what the user is actually asking for\n\n"
            "2. RDMkit-Specific Context\n"
            "- Reference specific RDMkit pages, sections, or tools when available\n"
            "- Use RDM terminology consistently (e.g., \"data lifecycle stages\", \"FAIR principles\")\n"
            "- Mention relevant domain or role-specific considerations\n"
            "- Connect answers to broader RDM workflows and best practices\n\n"
            "3. Professional RDM Tone\n"
            "- Maintain an expert but approachable tone\n"
            "- Use inclusive language for diverse research communities\n"
            "- Acknowledge complexity while providing clear guidance\n"
            "- Be authoritative about established best practices\n\n"
            "4. Contextual Awareness\n"
            "- Adapt recommendations based on research domain (if mentioned)\n"
            "- Consider institutional vs. individual researcher perspectives\n"
            "- Address different levels of RDM maturity (beginner to advanced)\n"
            "- Mention compliance and policy considerations when relevant\n\n"
            "5. Knowledge Boundaries\n"
            "- If the RDMkit context doesn't contain sufficient information, clearly state:\n"
            "  \"This specific topic is not covered in the available RDMkit documentation.\"\n"
            "- Suggest alternative approaches: \"For this question, I recommend consulting [relevant authority/resource]\"\n"
            "- Never make up information outside the provided RDMkit context\n\n"
            "6. RDMkit Integration\n"
            "- Always connect individual practices to the broader RDM ecosystem\n"
            "- Mention how the guidance fits within data lifecycle stages\n"
            "- Reference relevant national or institutional resources when available\n"
            "- Emphasize community and collaborative approaches to RDM\n\n"
            "Use the following pieces of retrieved context to answer the question. If no relevant context is provided, clearly state that the information is not available in the RDMkit knowledge base.\n\n"
            "Context: {context}"
        )

        qa_prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}"),
        ])

        def format_docs(docs: List[Document]) -> str:
            """Render documents as ``[category] section: content`` blocks separated by blank lines."""
            if not docs:
                return "No relevant documents found in the RDMkit knowledge base."

            formatted_context = []
            for doc in docs:
                category = doc.metadata.get('category', 'general')
                section = doc.metadata.get('section_title', '')

                context_piece = f"[{category}]"
                if section:
                    context_piece += f" {section}:"
                context_piece += f" {doc.page_content}"

                formatted_context.append(context_piece)

            return "\n\n".join(formatted_context)

        retriever_runnable = RunnableLambda(unified_retriever_func)

        answer_generation_chain = (
            RunnablePassthrough.assign(
                context=lambda x: format_docs(x["documents"])
            )
            | qa_prompt
            | self.llm
            | StrOutputParser()
        )

        self.unified_rag_chain = RunnablePassthrough.assign(
            documents=retriever_runnable
        ).assign(
            answer=answer_generation_chain
        )

    def query(self, question: str, session_id: str = "default") -> Dict[str, Any]:
        """Answer ``question`` in the context of session ``session_id``'s chat history.

        Returns:
            On success a dict with ``success``, ``question``, ``answer``, ``contexts`` (page texts),
            ``documents_used``, ``citations``, ``categories_used`` and ``rdm_lifecycle_stages``.
            On failure ``success=False`` with ``message``, ``answer=None`` and possibly ``error``.
            The question/answer pair is appended to the session history only on success.
        """
        if self.vectorstore is None or self.unified_rag_chain is None:
            return {
                "success": False,
                "message": "RDMkit vector database not loaded, please initialize first",
                "answer": None
            }

        try:
            history = self.get_session_history(session_id)
            # Pass the message objects themselves so both prompts see real human/AI roles;
            # plain strings would all be rendered as human messages by MessagesPlaceholder.
            chain_input = {
                "input": question,
                "chat_history": list(history.messages)
            }

            result_from_chain = self.unified_rag_chain.invoke(chain_input)

            answer = result_from_chain.get("answer")
            final_docs = result_from_chain.get("documents") or []

            history.add_user_message(question)
            history.add_ai_message(answer)

            return {
                "success": True,
                "question": question,
                "answer": answer,
                "contexts": [doc.page_content for doc in final_docs],
                "documents_used": len(final_docs),
                "citations": self._extract_rdmkit_citations(final_docs),
                "categories_used": self._get_categories_used(final_docs),
                "rdm_lifecycle_stages": self._get_lifecycle_stages(final_docs)
            }

        except Exception as e:
            self._logger.exception("RDMkit query failed")
            return {
                "success": False,
                "message": f"RDMkit query processing error: {str(e)}",
                "answer": None,
                "error": str(e)
            }

    def _rerank_documents(self, query: str, documents: List[Document], top_k: int = 5) -> List[Document]:
        """Select the ``top_k`` most relevant documents.

        Lists of at most ``top_k`` documents are returned unchanged. Otherwise the title-priority
        filter runs first. If the BGE reranker is loaded, its scores are blended with title
        relevance, thresholded and truncated. Without it (or if it fails), title-priority
        order is used.
        """
        if not documents or len(documents) <= top_k:
            return documents

        title_filtered_docs = self._title_priority_filter(query, documents)

        if not self.reranking_enabled or self.reranker is None:
            return title_filtered_docs[:top_k]

        try:
            doc_scores = self._get_bge_reranker_scores_with_cache(query, title_filtered_docs)
            enhanced_scores = self._combine_scores_with_titles(doc_scores)
            filtered_docs_with_scores = self._apply_dynamic_threshold_filtering(enhanced_scores, top_k)
            final_docs = [doc for doc, _ in filtered_docs_with_scores[:top_k]]

            print(f"BGE re-ranking successful. Selected {len(final_docs)} documents.")
            return final_docs

        except Exception as exc:
            self._logger.warning("BGE re-ranking failed; using title-priority order: %s", exc)
            return title_filtered_docs[:top_k]

    def _extract_rdmkit_citations(self, documents: List[Document]) -> List[Dict[str, str]]:
        """Build one citation per distinct Markdown source file, in retrieval order.

        The link is ``https://rdmkit.elixir-europe.org/<file stem>``; parent directories are
        not part of the URL. Sources that are not ``.md`` files get no citation.
        """
        citations = []
        seen_sources = set()

        for doc in documents:
            source_file = doc.metadata.get("source_file", "")
            if not source_file or source_file in seen_sources:
                continue
            seen_sources.add(source_file)

            if not source_file.endswith('.md'):
                continue

            file_name = source_file.replace('\\', '/').rsplit('/', 1)[-1]
            page_name = os.path.splitext(file_name)[0]

            citations.append({
                "source_file": source_file,
                "link": f"https://rdmkit.elixir-europe.org/{page_name}",
                "doc_title": doc.metadata.get("doc_title", ""),
                "section_title": doc.metadata.get("section_title", ""),
                "category": doc.metadata.get("category", ""),
                "content_type": doc.metadata.get("content_type", "text")
            })

        return citations

    def _get_categories_used(self, documents: List[Document]) -> List[str]:
        """Distinct non-empty categories, in order of first appearance (i.e. relevance order)."""
        return list(dict.fromkeys(
            doc.metadata.get("category", "") for doc in documents if doc.metadata.get("category", "")
        ))

    def _get_lifecycle_stages(self, documents: List[Document]) -> List[str]:
        """Distinct lifecycle stages from the comma-separated ``rdm_lifecycle`` metadata, first-seen order."""
        stages: Dict[str, None] = {}
        for doc in documents:
            rdm_lifecycle = doc.metadata.get("rdm_lifecycle", "")
            if rdm_lifecycle:
                for stage in rdm_lifecycle.split(', '):
                    if stage:
                        stages[stage] = None
        return list(stages)

    def _vector_retrieve_documents(self, question: str, k: int = 10) -> List[Document]:
        """Return the ``k`` nearest chunks, each annotated with ``retrieval_source`` and ``title_relevance``."""
        if k <= 0:
            return []

        # Query the store directly so that ``k`` is honoured (the retriever is fixed at k=10).
        vector_docs = self.vectorstore.similarity_search(question, k=k)

        for doc in vector_docs:
            doc.metadata['retrieval_source'] = 'vector'
            doc.metadata['title_relevance'] = self._calculate_title_relevance(question, doc)

        return vector_docs[:k]

    @staticmethod
    def _significant_words(text: str) -> set:
        """Lower-cased word tokens longer than two characters, with punctuation stripped."""
        import re
        # NOTE: ``\w`` includes '_', so an underscore-joined file name stays a single token.
        return {word for word in re.findall(r"\w+", text.lower()) if len(word) > 2}

    def _calculate_title_relevance(self, query: str, document: Document) -> float:
        """Heuristic title/section relevance of ``document`` for ``query``, clipped at 0.

        +0.6 if the query shares a word with ``doc_title``, +0.3 if it shares one with
        ``section_title``. Subtract 0.4 when a generic query (template/guideline/...) hits a
        ``national_resources``/``domain_specific`` document. Subtract 0.5 when a concept
        query hits a ``tool_assembly`` document. The breakdown is stored in
        ``metadata['relevance_score_breakdown']``.
        """
        query_lower = query.lower()
        query_words = self._significant_words(query)

        doc_title_words = self._significant_words(document.metadata.get('doc_title', ''))
        doc_section_words = self._significant_words(document.metadata.get('section_title', ''))
        doc_category = document.metadata.get('category', '')

        title_bonus = 0.6 if query_words & doc_title_words else 0.0
        section_bonus = 0.3 if query_words & doc_section_words else 0.0

        penalty = 0.0

        general_query_terms = ['template', 'guideline', 'best practice', 'example']
        specific_doc_categories = ['national_resources', 'domain_specific']
        if any(term in query_lower for term in general_query_terms) and doc_category in specific_doc_categories:
            penalty += 0.4

        concept_query_terms = ['template', 'policy', 'guideline', 'concept']
        if any(term in query_lower for term in concept_query_terms) and doc_category == 'tool_assembly':
            penalty += 0.5

        final_score = max(0, title_bonus + section_bonus - penalty)

        document.metadata['relevance_score_breakdown'] = {
            'title_bonus': title_bonus,
            'section_bonus': section_bonus,
            'penalty': -penalty,
            'total': final_score
        }

        return final_score

    def _title_priority_filter(self, query: str, documents: List[Document]) -> List[Document]:
        """Order documents by ``title_relevance`` (descending), possibly dropping weak matches.

        If fewer than 30% of the documents have ``title_relevance > 0.1``, all documents are
        kept, title matches first. Otherwise only the title matches are kept. The input list
        is not modified.
        """
        ranked = sorted(documents, key=lambda doc: doc.metadata.get('title_relevance', 0.0), reverse=True)
        high_title_docs = [doc for doc in ranked if doc.metadata.get('title_relevance', 0.0) > 0.1]

        result = ranked if len(high_title_docs) < len(ranked) * 0.3 else high_title_docs

        print(f"filter: {len(documents)} -> {len(result)} documents")
        return result

    def _combine_scores_with_titles(self, doc_scores: List[Tuple[Document, float]]) -> List[Tuple[Document, float]]:
        """Blend reranker and title scores as ``0.8 * reranker + 0.2 * title_relevance``.

        Also records the result in ``metadata['combined_score']``.
        """
        enhanced_scores = []

        for doc, cross_score in doc_scores:
            title_score = doc.metadata.get('title_relevance', 0.0)
            combined_score = cross_score * 0.8 + title_score * 0.2
            doc.metadata['combined_score'] = combined_score
            enhanced_scores.append((doc, combined_score))

        return enhanced_scores

    def format_response_with_citations(self, result: Dict[str, Any]) -> str:
        """Render a ``query()`` result as Markdown-ish text with categories, stages and references."""
        if not result["success"]:
            return f" {result['message']}"

        response = f"\n **Answer:** {result['answer']}\n"

        if result.get("categories_used"):
            response += f"\n **RDMkit Categories:** {', '.join(result['categories_used'])}\n"

        if result.get("rdm_lifecycle_stages"):
            response += f"\n **Data Lifecycle Stages:** {', '.join(result['rdm_lifecycle_stages'])}\n"

        if result.get("documents_used", 0) == 0:
            response += "\n⚠️ **Note:** No relevant documents found in the RDMkit knowledge base for this query.\n"
        elif result.get("citations"):
            response += "\n **RDMkit References:**\n"
            for i, citation in enumerate(result["citations"], 1):
                # doc_title is always present but may be empty; fall back to the file name.
                title = citation.get('doc_title') or citation.get('source_file') or 'Unknown'
                section = citation.get('section_title', '')
                category = citation.get('category', '')

                citation_text = f"[{i}] {title}"
                if section:
                    citation_text += f" - {section}"
                if category:
                    citation_text += f" ({category})"
                citation_text += f" - {citation['link']}\n"

                response += citation_text

        return response

    def get_status(self) -> Dict[str, Any]:
        """Get RDMkit system status (readiness flags, re-ranking mode and corpus summary)."""
        status = {
            "vectorstore_initialized": self.vectorstore is not None,
            "retriever_ready": self.retriever is not None,
            "json_file_path": self.json_file_path,
            "model_loaded": self.llm is not None,
            "reranking_enabled": self.reranking_enabled,
            # Without the reranker, _rerank_documents falls back to title-priority ordering.
            "reranking_method": f"bge-reranker ({self.reranker_model_name})" if self.reranking_enabled else "title-priority (no reranker)"
        }

        if self.corpus_summary:
            status.update({
                "corpus_summary": self.corpus_summary,
                "total_files": self.corpus_summary.get('total_files', 0),
                "total_chunks": self.corpus_summary.get('total_chunks', 0),
                "categories": self.corpus_summary.get('categories', {}),
                "average_chunk_size": self.corpus_summary.get('average_chunk_size', 0)
            })

        return status

    def update_keyword_config(self,
                              keyword_boost_factor: Optional[float] = None,
                              bm25_boost_factor: Optional[float] = None,
                              min_keyword_relevance: Optional[float] = None,
                              keyword_exact_match_bonus: Optional[float] = None):
        """Set keyword/BM25 tuning attributes (``None`` leaves a value unchanged).

        NOTE: these attributes are not read by the current vector-only pipeline; the method
        is kept for compatibility with callers of the earlier BM25 variant.
        """
        if keyword_boost_factor is not None:
            self.keyword_boost_factor = keyword_boost_factor
        if bm25_boost_factor is not None:
            self.bm25_boost_factor = bm25_boost_factor
        if min_keyword_relevance is not None:
            self.min_keyword_relevance = min_keyword_relevance
        if keyword_exact_match_bonus is not None:
            self.keyword_exact_match_bonus = keyword_exact_match_bonus

    @staticmethod
    def _doc_cache_id(doc: Document) -> str:
        """Cache id for a document: its ``chunk_id`` when non-empty, else an MD5 of its content.

        ``chunk_id`` is always present in the metadata but may be ``''``. A plain
        ``metadata.get('chunk_id', default)`` would therefore map every id-less chunk to the same key.
        """
        chunk_id = doc.metadata.get('chunk_id')
        if chunk_id:
            return str(chunk_id)
        return "doc_" + hashlib.md5(doc.page_content.encode('utf-8')).hexdigest()

    def _get_cached_relevance(self, query_hash: str, doc_id: str) -> Optional[float]:
        """Cached score for (query, document), or ``None`` if absent."""
        return self.relevance_cache.get(f"{query_hash}:{doc_id}")

    def _cache_relevance(self, query_hash: str, doc_id: str, score: float):
        """Store a score, evicting the oldest entries once ``cache_size_limit`` is reached."""
        if self.cache_size_limit <= 0:
            return

        cache_key = f"{query_hash}:{doc_id}"
        if cache_key not in self.relevance_cache:
            # Dicts preserve insertion order, so the first key is the oldest (FIFO eviction).
            while self.relevance_cache and len(self.relevance_cache) >= self.cache_size_limit:
                del self.relevance_cache[next(iter(self.relevance_cache))]

        self.relevance_cache[cache_key] = score

    def _get_crossencoder_scores_with_cache(self, query: str, documents: List[Document]) -> List[Tuple[Document, float]]:
        """Score documents with an optional ``self.cross_encoder`` (legacy path, not used by the pipeline).

        ``self.cross_encoder`` is never set by this class. Without it, every uncached document
        gets the fallback score 0.1. NOTE(review): this shares ``relevance_cache`` with the BGE
        path, which stores sigmoid-normalised scores, while this path stores raw scores.
        """
        query_hash = hashlib.md5(query.encode('utf-8')).hexdigest()[:10]

        cached_results = []
        uncached_docs = []
        for doc in documents:
            cached_score = self._get_cached_relevance(query_hash, self._doc_cache_id(doc))
            if cached_score is not None:
                cached_results.append((doc, cached_score))
            else:
                uncached_docs.append(doc)

        if not uncached_docs:
            return cached_results

        cross_encoder = getattr(self, 'cross_encoder', None)
        try:
            if cross_encoder is None:
                raise RuntimeError("no cross_encoder configured")
            scores = [float(s) for s in cross_encoder.predict([(query, doc.page_content) for doc in uncached_docs])]
        except Exception as exc:
            self._logger.warning("Cross-encoder scoring failed; using fallback scores: %s", exc)
            cached_results.extend((doc, 0.1) for doc in uncached_docs)
            return cached_results

        for doc, score in zip(uncached_docs, scores):
            self._cache_relevance(query_hash, self._doc_cache_id(doc), score)
            cached_results.append((doc, score))

        return cached_results

    def _get_bge_reranker_scores_with_cache(self, query: str, documents: List[Document]) -> List[Tuple[Document, float]]:
        """Score documents with the BGE reranker, mapped into (0, 1) by a logistic function.

        Cached scores are returned first, followed by freshly computed ones. Fresh scores also
        go into ``metadata['bge_raw_score']`` / ``['bge_normalized_score']``. If scoring fails,
        every uncached document gets the fallback score 0.1 (exactly once).
        """
        query_hash = hashlib.md5(query.encode('utf-8')).hexdigest()[:10]

        cached_results = []
        uncached_docs = []
        for doc in documents:
            cached_score = self._get_cached_relevance(query_hash, self._doc_cache_id(doc))
            if cached_score is not None:
                cached_results.append((doc, cached_score))
            else:
                uncached_docs.append(doc)

        if not uncached_docs:
            return cached_results

        try:
            # Long chunks are cut to 2000 characters before scoring.
            query_doc_pairs = [[query, doc.page_content[:2000]] for doc in uncached_docs]
            scores = self.reranker.compute_score(query_doc_pairs)
            # A single pair may come back as a bare number.
            if not isinstance(scores, (list, np.ndarray)):
                scores = [scores]
            raw_scores = [float(s) for s in scores]
        except Exception as exc:
            self._logger.warning("BGE reranker scoring failed; using fallback scores: %s", exc)
            cached_results.extend((doc, 0.1) for doc in uncached_docs)
            return cached_results

        for doc, raw_score in zip(uncached_docs, raw_scores):
            # Overflow-free logistic: exp(-log(1 + exp(-x))) == 1 / (1 + exp(-x)).
            norm_score = float(np.exp(-np.logaddexp(0.0, -raw_score)))
            self._cache_relevance(query_hash, self._doc_cache_id(doc), norm_score)
            cached_results.append((doc, norm_score))

            doc.metadata['bge_raw_score'] = raw_score
            doc.metadata['bge_normalized_score'] = norm_score

        return cached_results

    def _apply_dynamic_threshold_filtering(self, doc_scores: List[Tuple[Document, float]], top_k: int) -> List[Tuple[Document, float]]:
        """Keep documents scoring at least ``max(min_relevance_threshold, mean - factor * std)``.

        If fewer than two documents survive, the best three overall are returned instead.
        The result is sorted by score, descending; ``doc_scores`` itself is not modified.
        ``top_k`` is accepted for interface compatibility; truncation is done by the caller.
        """
        if not doc_scores:
            return doc_scores

        scores = [score for _, score in doc_scores]
        dynamic_threshold = max(
            self.min_relevance_threshold,
            float(np.mean(scores)) - self.dynamic_threshold_factor * float(np.std(scores))
        )

        ranked = sorted(doc_scores, key=lambda pair: pair[1], reverse=True)
        filtered_docs = [(doc, score) for doc, score in ranked if score >= dynamic_threshold]

        if len(filtered_docs) < 2:
            filtered_docs = ranked[:3]

        return filtered_docs

    def clear_cache(self):
        """Drop all cached relevance scores."""
        self.relevance_cache.clear()

    def get_cache_stats(self) -> Dict[str, Any]:
        """Report cache size, limit and usage percentage (0% when the limit is not positive)."""
        size = len(self.relevance_cache)
        limit = self.cache_size_limit
        usage_pct = size / limit * 100 if limit > 0 else 0.0
        return {
            "cache_size": size,
            "cache_limit": limit,
            "cache_usage": f"{size}/{limit} ({usage_pct:.1f}%)"
        }

    def update_reranking_config(self,
                                min_relevance_threshold: Optional[float] = None,
                                dynamic_threshold_factor: Optional[float] = None,
                                max_llm_verification_docs: Optional[int] = None):
        """Update re-ranking thresholds; ``None`` leaves a value unchanged."""
        if min_relevance_threshold is not None:
            self.min_relevance_threshold = min_relevance_threshold
        if dynamic_threshold_factor is not None:
            self.dynamic_threshold_factor = dynamic_threshold_factor
        if max_llm_verification_docs is not None:
            self.max_llm_verification_docs = max_llm_verification_docs

# Usage functions

In [4]:
def create_rdmkit_rag_service(json_path: str = "rdmkit_chunks_cleaned.json") -> RDMkitRAGPipeline:
    """Build a ready-to-query RDMkit pipeline.

    Raises:
        RuntimeError: If the vector store could not be initialised from ``json_path``.
    """
    service = RDMkitRAGPipeline(json_path)
    status = service.initialize_vectorstore()
    if status["success"]:
        return service
    raise RuntimeError(f"RDMkit RAG initialization failed: {status['message']}")

In [5]:
def run_rdmkit_cli_interface(json_path: str = "rdmkit_chunks_cleaned.json"):
    """Interactive question/answer loop on stdin.

    Exits on 'exit'/'quit', Ctrl-C, or when no input can be read. The last case matters when
    the notebook runs non-interactively: ``input()`` then raises ``EOFError`` (or IPython's
    ``StdinNotImplementedError``, a ``NotImplementedError``) on every call. Previously that
    error was caught by the generic handler and the loop spun forever.
    """
    print("Initializing RDMkit RAG system...")

    try:
        rag = RDMkitRAGPipeline(json_path)
        init_result = rag.initialize_vectorstore()

        if not init_result["success"]:
            print(f"RDMkit initialization failed: {init_result['message']}")
            return

        print("RDMkit system initialization successful!")
        print(f"Loaded {init_result['loaded_docs']} document chunks from {init_result['total_files']} files")
        print(f"Categories: {init_result['categories']}")
        print(f"Average chunk size: {init_result['average_chunk_size']:.1f} characters")

        status = rag.get_status()
        rerank_method = status.get('reranking_method', 'unknown')
        print(f"Re-ranking method: {rerank_method}")
        print("RDMkit knowledge base Q&A system is ready!")
        print("Enter 'exit' or 'quit' to exit the program\n")

        while True:
            try:
                question = input("Please enter your RDMkit question: ").strip()
            except (KeyboardInterrupt, EOFError, NotImplementedError):
                # Ctrl-C, end of input, or a kernel that cannot read stdin at all.
                print("\n\nThank you for using RDMkit RAG, goodbye!")
                break

            if question.lower() in ['exit', 'quit']:
                print("Thank you for using RDMkit RAG, goodbye!")
                break

            if not question:
                print("Please enter a valid question about research data management")
                continue

            try:
                print("Processing your RDMkit question...")
                result = rag.query(question)
                print(rag.format_response_with_citations(result))
            except KeyboardInterrupt:
                print("\n\nThank you for using RDMkit RAG, goodbye!")
                break
            except Exception as e:
                print(f"Error during processing: {e}")

    except Exception as e:
        print(f"RDMkit system initialization failed: {e}")

# Run

In [None]:
if __name__ == "__main__":
    # Notebook cells run with __name__ == "__main__", so executing this cell starts the
    # interactive loop; it returns when the user types 'exit' or 'quit'.
    run_rdmkit_cli_interface(json_path="/kaggle/input/chatbotcorpus/rdmkit_chunks_cleaned.json")

# RAGAS

In [None]:
import os
import pandas as pd
from datasets import Dataset
from ragas import evaluate
from ragas import SingleTurnSample
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    LLMContextPrecisionWithoutReference
)
from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings
from kaggle_secrets import UserSecretsClient
from datetime import datetime

# The Kaggle secret is exported first; load_dotenv() inside the pipeline will not override it.
os.environ["GOOGLE_API_KEY"] = UserSecretsClient().get_secret("GOOGLE_API_KEY")

results_dir = "/kaggle/working/ragas_evaluation_results"
os.makedirs(results_dir, exist_ok=True)

evaluation_questions = [
    "How do I ensure my sensitive data complies with GDPR?"
]

json_file_path = "/kaggle/input/chatbotcorpus/rdmkit_chunks_cleaned.json"
pipeline = RDMkitRAGPipeline(json_file_path=json_file_path)
# Fail loudly: otherwise every query returns success=False and each question is skipped silently.
init_status = pipeline.initialize_vectorstore()
if not init_status["success"]:
    raise RuntimeError(f"RDMkit RAG initialization failed: {init_status['message']}")

metrics_to_evaluate = [
    faithfulness,
    answer_relevancy,
    LLMContextPrecisionWithoutReference(),
]

ragas_llm = ChatGoogleGenerativeAI(model="gemini-2.5-pro", temperature=0)
ragas_embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")

# Input columns echoed back by RAGAS: the v1 names used here and the v2 names newer ragas
# versions rename them to. None of them is a metric.
NON_METRIC_COLUMNS = {"question", "answer", "contexts", "user_input", "response", "retrieved_contexts"}


def _write_evaluation_report(filepath, index, result, evaluation_scores, generated_at):
    """Write one question's retrieved contexts, generated answer and RAGAS scores to a text file."""
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write("="*80 + "\n")
        f.write(f"RAGAS result {index}\n")
        f.write(f"Time: {generated_at.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write("="*80 + "\n\n")

        f.write("Question:\n")
        f.write("-" * 40 + "\n")
        f.write(f"{result['question']}\n\n")

        f.write("FINAL DOCUMENTS USED FOR ANSWER GENERATION:\n")
        f.write("-" * 40 + "\n")
        contexts = result.get('contexts')
        if contexts and isinstance(contexts, list):
            for idx, context in enumerate(contexts, 1):
                f.write(f"Document {idx}:\n")
                f.write(f"{context}\n")
                f.write("-" * 20 + "\n")
        else:
            f.write("ERROR\n")
        f.write("\n")

        f.write("Answer:\n")
        f.write("-" * 40 + "\n")
        f.write(f"{result['answer']}\n\n")

        f.write("RAGAS:\n")
        f.write("-" * 40 + "\n")
        if "error" not in evaluation_scores:
            for metric_name, score in evaluation_scores.items():
                if metric_name in NON_METRIC_COLUMNS:
                    continue
                if isinstance(score, (int, float)) and not pd.isna(score):
                    f.write(f"{metric_name}: {score:.4f}\n")
                else:
                    f.write(f"{metric_name}: {score}\n")
        else:
            f.write(f"{evaluation_scores['error']}\n")

        f.write("\n" + "="*80 + "\n")


all_qa_data = []

for i, question in enumerate(evaluation_questions, 1):
    print(f"\n{'='*60}")
    print(f" {i}/{len(evaluation_questions)}: {question[:50]}...")
    print('='*60)

    result = pipeline.query(question, session_id=f"ragas-eval-{i}")

    if not result["success"]:
        print(f"Skipped: query failed ({result.get('message')})")
        continue

    if not result["answer"] or not result["contexts"]:
        print("Skipped: empty answer or no retrieved contexts")
        continue

    qa_item = {
        "question": result["question"],
        "answer": result["answer"],
        "contexts": result["contexts"],
    }
    all_qa_data.append(qa_item)

    try:
        single_results = evaluate(
            dataset=Dataset.from_list([qa_item]),
            metrics=metrics_to_evaluate,
            llm=ragas_llm,
            embeddings=ragas_embeddings,
            batch_size=1,
            raise_exceptions=True,
        )
        evaluation_scores = single_results.to_pandas().iloc[0].to_dict()
    except Exception as e:
        evaluation_scores = {"error": str(e)}

    # One timestamp for both the file name and the report header.
    generated_at = datetime.now()
    filename = f"question_{i:02d}_{generated_at.strftime('%Y%m%d_%H%M%S')}.txt"
    _write_evaluation_report(os.path.join(results_dir, filename), i, result, evaluation_scores, generated_at)

    print("done")