In [1]:
import numpy as np
from dotenv import load_dotenv, find_dotenv
import os
import json
import chromadb
from langchain_huggingface import HuggingFaceEmbeddings
from llama_index.core import (
    VectorStoreIndex, 
    SimpleDirectoryReader,
    StorageContext,
    Settings
)
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.embeddings.langchain import LangchainEmbedding
from llama_index.core.response_synthesizers import ResponseMode
from llama_index.llms.gemini import Gemini
from llama_index.core.schema import QueryBundle
from llama_index.core.output_parsers import PydanticOutputParser
from llama_index.core.prompts import PromptTemplate
from pydantic import BaseModel, Field
from typing import List

_ = load_dotenv(find_dotenv())

# Initialize Gemini
os.environ["GEMINI_API_KEY"] = os.getenv('GEMINI_API_KEY')
llm = Gemini(model="models/gemini-2.0-flash", temperature=1)

# Initialize embedding model
lc_embed_model = HuggingFaceEmbeddings(
    model_name="intfloat/multilingual-e5-small"
)
embed_model = LangchainEmbedding(lc_embed_model)
Settings.embed_model = embed_model

# Initialize ChromaDB
# chroma_client = chromadb.PersistentClient(path="./chromadb")

  llm = Gemini(model="models/gemini-2.0-flash", temperature=1)


In [2]:
chroma_client = chromadb.PersistentClient(path="../chromadb")

def load_database():
    """Connect database with ChromaDB backend."""
    chroma_collection = chroma_client.get_collection("unit1_db")
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    
    index = VectorStoreIndex.from_vector_store(
        vector_store=vector_store,
        embed_model=embed_model,
    )
    return index


In [None]:
# from llama_index.core.prompts import PromptTemplate
# from llama_index.core.output_parsers import PydanticOutputParser
# from prototype.config.settings import llm
# from prototype.schemas.quiz import QuizQuestion

# def generate_questions_batch(chunks, n_questions):
#     """Generates multiple questions from given chunks of text"""
#     combined_content = "\n\n".join(chunks)
#     output_parser = PydanticOutputParser(QuizQuestion)
    
#     prompt_template = PromptTemplate(
#         template="""
#             Generate {n_questions} different multiple-choice questions based on the following content.
#             Make sure questions cover different aspects and concepts from the content.
#             Content: {content}
            
#             For each question, provide output in exactly this format:
#             {format_instructions}
            
#             Generate exactly {n_questions} questions, with each question separated by two newlines.
#             DO NOT return a JSON array. Return each question in the exact format specified above.
#         """
#     )
    
#     prompt = prompt_template.format(
#         content=combined_content,
#         n_questions=n_questions,
#         format_instructions=output_parser.get_format_string()
#     )
    
#     response = llm.complete(prompt)
#     return parse_questions(response.text, output_parser, n_questions)

# def generate_explanation(question, correct_answer):
#     """Generates explanation for a question"""
#     template = PromptTemplate(
#         template="Provide an explanation for the following question and answer:\n\nQuestion: {question}\nCorrect Answer: {correct_answer}\n\nExplanation:"
#     )
#     prompt = template.format(question=question, correct_answer=correct_answer)
#     response = llm.complete(prompt)
#     return response.text

# def parse_questions(response_text, output_parser, n_questions):
#     """Parse generated questions from response"""
#     questions_data = []
#     question_texts = response_text.split("\n\n")
    
#     for q_text in question_texts:
#         if not q_text.strip():
#             continue
            
#         q_text = q_text.strip()
#         if q_text.startswith("```json"):
#             q_text = q_text[7:]
#         if q_text.endswith("```"):
#             q_text = q_text[:-3]
            
#         try:
#             result = output_parser.parse(q_text)
#             questions_data.append(format_question(result))
            
#             if len(questions_data) >= n_questions:
#                 break
                
#         except Exception as e:
#             print(f"Error parsing question: {e}")
#             continue
            
#     return questions_data[:n_questions]

# def format_question(result):
#     """Format parsed question data"""
#     question = result.question
#     correct_answer = result.correct_answer
#     options = ["Option A", "Option B", "Option C", "Option D"]
#     answers = [result.option_a, result.option_b, result.option_c, result.option_d]
    
#     pre_answer = ['A) ', 'B) ', 'C) ', 'D) ']
#     formatted_question = question + '\n' + " ".join([pre + " " + answer for pre, answer in zip(pre_answer, answers)])
    
#     correct_option = options[answers.index(correct_answer)]
#     explanation = generate_explanation(question, correct_answer)
    
#     return (formatted_question, options, correct_option, explanation)

In [9]:
from typing import List, Dict
from llama_index.core.schema import Node, NodeWithScore, QueryBundle, MediaResource
from llama_index.core.retrievers import BaseRetriever
from elasticsearch import Elasticsearch
import numpy as np
import torch
import json
from llama_index.core.vector_stores.types import VectorStoreQuery

class SemanticSearchRetriever(BaseRetriever):
    """Retriever specialized for semantic search using embeddings (vector search)"""
    def __init__(self, vector_index, embed_model, top_k=4):
        self.vector_index = vector_index
        self.embed_model = embed_model
        self.top_k = top_k
        
    def _retrieve(self, query_str: str) -> List[NodeWithScore]:
        # Add query prefix for better embedding
        modified_query = f"query: {query_str}"
        
        # Generate query embedding
        query_embedding = self.embed_model.get_text_embedding(modified_query)
        
        # Perform vector search
        vector_results = self.vector_index.vector_store.query(
            query=VectorStoreQuery(
                query_embedding=query_embedding,
                similarity_top_k=self.top_k
            )
        )
        
        return [
            NodeWithScore(node=node, score=score)
            for node, score in zip(vector_results.nodes, vector_results.similarities or [])
        ]
    
class SemanticReranker:
    """Reranker using semantic similarity"""
    def __init__(self, embed_model):
        self.embed_model = embed_model
        
    def compute_similarity(self, query_embedding: List[float], doc_embedding: List[float]) -> float:
        try:
            # Convert to torch tensors
            query_emb = torch.tensor(query_embedding).reshape(1, -1)
            doc_emb = torch.tensor(doc_embedding).reshape(1, -1)
            
            # Calculate similarity
            similarity = torch.nn.functional.cosine_similarity(query_emb, doc_emb, dim=1)

            # Convert to float
            return float(similarity.item())
        except Exception as e:
            print(f"Error computing similarity: {str(e)}")
            return 0.0

class KeywordRetriever:
    """Retriever specialized for keyword-based search using Elasticsearch"""
    
    def __init__(self, es_host: str, index_name: str, top_k: int = 4):
        try:
            self.es_client = Elasticsearch(es_host)
            if not self.es_client.ping():
                raise ConnectionError("Could not connect to Elasticsearch")
        except Exception as e:
            raise ConnectionError(f"Error connecting to Elasticsearch: {str(e)}")
            
        self.index_name = index_name
        self.top_k = top_k
            
    def retrieve(self, query_str: str) -> List[Dict]:
        """Tìm kiếm trên Elasticsearch với full-text search."""
        try:
            search_query = {
                "size": self.top_k,
                "query": {
                    "match": {
                        "content": {
                            "query": query_str,
                            "boost": 1.0
                        }
                    }
                }
            }
            
            results = self.es_client.search(
                index=self.index_name,
                body=search_query,
            )
            
            return results["hits"]["hits"]
        except Exception as e:
            print(f"Error during Elasticsearch search: {str(e)}")
            return []

    def retrieve_by_id(self, chunk_ids: List[str]) -> List[Dict]:
        """Truy xuất các chunks liên quan bằng ID."""
        if not chunk_ids:
            return []
        
        try:
            search_query = {
                "size": len(chunk_ids),
                "query": {
                    "terms": {
                        "id": chunk_ids
                    }
                }
            }
            
            results = self.es_client.search(
                index=self.index_name,
                body=search_query,
            )
            
            return results["hits"]["hits"]
        except Exception as e:
            print(f"Error retrieving related chunks: {str(e)}")
            return []

class HybridSearchRetriever(BaseRetriever):
    """Retriever kết hợp Elasticsearch (BM25) và Semantic Search (vector search)."""

    def __init__(
        self,
        es_host: str,
        index_name: str,
        embed_model,
        initial_top_k: int = 20,
        final_top_k: int = 4
    ):
        self.keyword_retriever = KeywordRetriever(
            es_host=es_host,
            index_name=index_name,
            top_k=initial_top_k
        )
        self.reranker = SemanticReranker(embed_model)
        self.embed_model = embed_model
        self.final_top_k = final_top_k
        
    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        # Step 1: Initial retrieval with keywords
        bm25_results = self.keyword_retriever.retrieve(query_bundle.query_str)
        
        # Step 2: Prepare for reranking
        modified_query = f"query: {query_bundle.query_str}"
        query_embedding = self.embed_model.get_text_embedding(modified_query)
        
        reranked_results = []
        
        # Step 3: Rerank using semantic similarity
        for hit in bm25_results:
            content = hit['_source']['content']
            
            # Get document embedding
            doc_embedding = self.embed_model.get_text_embedding(content)
            
            # Calculate semantic similarity
            semantic_score = self.reranker.compute_similarity(
                query_embedding,
                doc_embedding
            )
            
            # Combine BM25 and semantic scores
            es_score = hit['_score']
            final_score = (semantic_score + es_score) / 2
            
            node = Node(
                text_resource=MediaResource(text=content),
                metadata={
                    "es_score": es_score,
                    "semantic_score": semantic_score
                }
            )
            reranked_results.append({
                'node': node,
                'score': final_score
            })

        # Step 4: Sort by final score
        sorted_results = sorted(
            reranked_results,
            key=lambda x: x['score'],
            reverse=True
        )
        
        # Get initial top_k results
        top_k_results = sorted_results[:self.final_top_k]
        
        # # Step 1.5: Lấy danh sách related_chunks từ metadata
        # related_chunk_ids = set()
        # for hit in bm25_results:
        #     node_content = json.loads(hit['_source']['metadata']['_node_content'])
        #     metadata = node_content.get("metadata", {})
        #     related_chunks = metadata.get('related_chunks', "")
        #     if related_chunks:
        #         related_chunk_ids.update(related_chunks.split(", "))

        # # Truy vấn thêm related_chunks nếu có
        # related_chunks = self.keyword_retriever.retrieve_by_id(list(related_chunk_ids))
        
        # # Gộp kết quả
        # all_results = bm25_results + related_chunks
        
        # Step 5: Get related chunks from top_k results
        existing_chunk_ids = set()
        related_chunk_ids = set()
        for result in top_k_results:
            hit = result['node'].metadata.get('es_hit', {})
            try:
                chunk_id = hit['_source']['id']
                existing_chunk_ids.add(chunk_id)
                
                # Get related chunks
                node_content = json.loads(hit['_source']['metadata']['_node_content'])
                metadata = node_content.get("metadata", {})
                related_chunks = metadata.get('related_chunks', "")
                if related_chunks:
                    related_chunk_ids.update(related_chunks.split(", "))
            except (json.JSONDecodeError, KeyError, TypeError):
                continue
            
        # Remove any related_chunk_ids that are already in top_k_results
        related_chunk_ids = related_chunk_ids - existing_chunk_ids

        # Retrieve and process related chunks
        final_results = top_k_results
        if related_chunk_ids:
            related_chunks = self.keyword_retriever.retrieve_by_id(list(related_chunk_ids))
            
            # Convert related chunks to NodeWithScore format
            for hit in related_chunks:
                content = hit['_source']['content']
                node = Node(
                    text_resource=MediaResource(text=content),
                    metadata={
                        "es_score": hit['_score'],
                        "is_related": True
                    }
                )
                final_results.append({
                    'node': node,
                    'score': hit['_score']  # Use ES score for related chunks
                })

        # Return all results without limiting
        return [
            NodeWithScore(node=item['node'], score=item['score'])
            for item in final_results
        ]

def get_relevant_chunk(
    vector_index,
    embed_model,
    es_host: str,
    es_index_name: str,
    question_context: str,
    top_k: int = 3
) -> List[str]:
    """Truy xuất các đoạn văn bản liên quan bằng hybrid search (BM25 + Semantic Search)."""
    
    # Tạo hybrid retriever
    hybrid_retriever = HybridSearchRetriever(
        es_host=es_host,
        index_name=es_index_name,
        # vector_index=vector_index,
        embed_model=embed_model,
        initial_top_k=10,
        final_top_k=top_k
    )
    
    # Truy vấn kết quả
    results = hybrid_retriever._retrieve(QueryBundle(question_context))

    # Trả về các đoạn văn bản phù hợp
    return [node.node.text_resource.text for node in results]

In [13]:
prompt = "Unit 1"
import nest_asyncio
import asyncio
from googletrans import Translator

nest_asyncio.apply()

async def translate_to_english(text):
    translator = Translator()
    translation = await translator.translate(text, src="vi", dest="en")
    return translation.text

# Chạy mà không gặp lỗi event loop
prompt = asyncio.run(translate_to_english(prompt))


In [15]:
index = load_database()
text_chunks = get_relevant_chunk(vector_index=index, 
                                 embed_model=embed_model,
                                 es_host="http://localhost:9200",
                                 es_index_name="vocabulary",
                                 question_context=prompt, 
                                 top_k=4
                                 )
# questions_data = generate_questions_batch(text_chunks, 3)

  if not self.es_client.ping():
  results = self.es_client.search(


In [16]:
text_chunks

['passage: Unit: 1. Life stories we admire\nSection: Getting Started\nType: list\n3 Find words and a phrase in 1 with the following meanings.\n1 a _________ descriptions of things that have happened\n2 d_________ the end of somebody’s life\n3 d_________ to giving time, attention, etc. to something\n4 y_________ the period of time when a person is young',
 'passage: Unit: 1. Life stories we admire\nSection: Getting Started\nType: text\nACTIVITY 4\nAim: To help Ss identify the past simple and the past continuous.\n•\tTell Ss to read the summary and check understanding.\n•\tAsk Ss to complete the sentences, using words and phrases from the conversation in 1.\n•\tCheck answers as a class.\n•\tElicit the verb tenses, i.e. past simple and past continuous.\nKey: 1. wrote\t2. was working\t3. was killed\t4. was doing her duty\t5. kept',
 'passage: Unit: 1. Life stories we admire\nSection: Getting Started\nType: text\nThis unit includes:\nLANGUAGE\nPronunciation\nDiphthongs /eɪ/ and /aʊ/\nVocabu

In [84]:
import chromadb
from elasticsearch import Elasticsearch

# Kết nối Elasticsearch
es = Elasticsearch("http://localhost:9200")

# Kết nối ChromaDB
collection = chroma_client.get_collection(name="unit1_db")

# Lấy tất cả dữ liệu từ ChromaDB
data = collection.get(include=["documents", "metadatas"]) 

# Trích xuất ids và documents
doc_ids = data["ids"]
documents = data["documents"]
metadatas = data.get("metadatas", [{}] * len(doc_ids))  # Nếu không có metadata, dùng dict rỗng

# Đưa vào Elasticsearch
for doc_id, content, metadata in zip(doc_ids, documents, metadatas):
    es.index(index="vocabulary", id=doc_id, body={"content": content, "metadata": metadata})

print("Dữ liệu đã đồng bộ vào Elasticsearch!")


  es.index(index="vocabulary", id=doc_id, body={"content": content, "metadata": metadata})


Dữ liệu đã đồng bộ vào Elasticsearch!


In [None]:
def sync_chromadb_to_es():
    """Đồng bộ dữ liệu từ ChromaDB sang Elasticsearch, hỗ trợ cập nhật nếu đã tồn tại."""
    
    # Lấy dữ liệu từ ChromaDB (lưu ý không cần 'ids' trong include)
    data = collection.get(include=["documents", "metadatas"])
    
    # Trích xuất thông tin
    doc_ids = data["ids"]   # IDs luôn có sẵn
    documents = data["documents"]
    metadatas = data.get("metadatas", [{}] * len(doc_ids))  # Metadata nếu có
    
    # Danh sách dữ liệu để đẩy vào Elasticsearch
    actions = []
    
    for doc_id, text, metadata in zip(doc_ids, documents, metadatas):
        # Kiểm tra nếu tài liệu đã tồn tại
        if es.exists(index="vocabulary", id=doc_id):
            action = {
                "_op_type": "update",
                "_index": "vocabulary",
                "_id": doc_id,
                "doc": {"content": text, "metadata": metadata}
            }
        else:
            action = {
                "_op_type": "index",
                "_index": "vocabulary",
                "_id": doc_id,
                "content": text,
                "metadata": metadata
            }
        
        actions.append(action)
    
    # Thực hiện cập nhật hàng loạt với Bulk API để tối ưu hiệu suất
    if actions:
        bulk(es, actions)
        print(f"Đã đồng bộ {len(actions)} tài liệu vào Elasticsearch.")
    else:
        print("Không có tài liệu mới để đồng bộ.")

# Chạy đồng bộ
sync_chromadb_to_es()
