In [18]:
import requests
import uuid

In [19]:
EMMBED_SERVICE_URL = "http://localhost:8005"
DATABASE_SERVICE_URL = "http://localhost:8002"
DATA_FILE = './data/math_week06_S2025.txt'

In [20]:
with open(DATA_FILE, 'r', encoding='utf-8') as file:
    data = file.read().split("\n\n")
data

['## Write a website with content in Website content part below\n### Tech stack\n- using NextJS for page\n  - Best practices for university education UI\n  - friendly with user\n- in the right bottom, implement a chat box connecting with agent by websocket service, open websocket connection then call API in\n- using .env to store URL API and secret key if we have',
 '\n## Website content',
 'UPP014 S2 25 W6 Writing in Practice\nNguồn: https://utas.shorthandstories.com/2025_S2_UPP014_06_wjlkf-ds/index.html#group-section-Referencing-ZBD4DL0ywE',
 "Welcome to Module 6 What you will learn: The role of integrated sources in your writing. How to integrate sources into academic writing. How to apply APA style referencing for in-text citations. How to establish an APA style reference list. Learning Outcomes addressed: ILO2: Communicate using academic conventions.ILO3: Evaluate and integrate academic sources. Links to Assessment: This modules content, readings, and discussion directly supports 

In [21]:
name = DATA_FILE.split("/")[-1].split("_")
payload_build ={
        "content":None,
        "subject":name[0],
        "title":DATA_FILE,
        "week": name[1],
        "chunk_id":"" , 
    }
payload_build


{'content': None,
 'subject': 'math',
 'title': './data/math_week06_S2025.txt',
 'week': 'week06',
 'chunk_id': ''}

In [22]:
class ProcessingPipeline:
    def __init__(self):
        self.documents = []
        self.payloads = []
    
    def embed_text(self, text: str) -> list:
        try:
            response = requests.post(
                EMMBED_SERVICE_URL + "/embed", 
                json={"text": text},
                timeout=30
            )
            response.raise_for_status()
            return response.json()["embedding"]
        except Exception as e:
            print(f"❌ Lỗi embed: {e}")
            raise

    def upsert_document(self, payload: dict) -> bool:
        try:
            # ✅ Wrap payload trong "points" array
            request_data = {
                "points": [payload]  # API mong đợi array of points
            }
            
            response = requests.post(
                DATABASE_SERVICE_URL + "/upsert", 
                json=request_data,  # Gửi request_data thay vì payload
                timeout=30
            )
            response.raise_for_status()
            return response.json().get("success", False)
        except Exception as e:
            print(f"❌ Lỗi upsert: {e}")
            return False
    
    def process(self, data):
        print(f"🚀 Bắt đầu xử lý {len(data)} documents...")
        
        # Bước 1: Tạo embeddings và payloads
        for i, document in enumerate(data):
            try:
                print(f"📄 Processing {i+1}/{len(data)}: {str(document)[:50]}...")
                
                embedding = self.embed_text(str(document))
                
                payload = {
                    "id": str(uuid.uuid4()),
                    "vector": embedding,
                    "payload": {
                        "content": str(document),
                        "subject": payload_build["subject"],
                        "title": payload_build["title"],
                        "week": payload_build["week"],
                        "chunk_id": i, 
                    }
                }

                
                self.payloads.append(payload)
                
            except Exception as e:
                print(f"❌ Lỗi xử lý document {i+1}: {e}")
                continue
        
        # Bước 2: Upsert từng payload
        print(f"\n💾 Bắt đầu upsert {len(self.payloads)} payloads...")
        success_count = 0
        
        for i, payload in enumerate(self.payloads):
            try:
                success = self.upsert_document(payload)
                if success:
                    success_count += 1
                    print(f"✅ Upsert {i+1}/{len(self.payloads)} thành công")
                else:
                    print(f"❌ Upsert {i+1}/{len(self.payloads)} thất bại")
            except Exception as e:
                print(f"❌ Lỗi upsert {i+1}: {e}")
        
        print(f"\n📊 Hoàn thành: {success_count}/{len(self.payloads)} documents thành công")
        return success_count

In [23]:
# ✅ Đúng
pipeline = ProcessingPipeline()
result = pipeline.process(data)

🚀 Bắt đầu xử lý 10 documents...
📄 Processing 1/10: ## Write a website with content in Website content...
📄 Processing 2/10: 
## Website content...
📄 Processing 3/10: UPP014 S2 25 W6 Writing in Practice
Nguồn: https:/...
📄 Processing 4/10: Welcome to Module 6 What you will learn: The role ...
📄 Processing 5/10: 
(World Health Organisation, n.d.)
World Health Or...
📄 Processing 6/10: 
Italicise the title. Use sentence case (capitalis...
📄 Processing 7/10: 
Sources that stand alone (e.g., TV series, film, ...
📄 Processing 8/10: 
Review your Module 4 and Module 5 paragraphs and ...
📄 Processing 9/10: ...
📄 Processing 10/10: Wed like to remind you of the Module 1 discussion ...

💾 Bắt đầu upsert 10 payloads...
✅ Upsert 1/10 thành công
✅ Upsert 2/10 thành công
✅ Upsert 3/10 thành công
✅ Upsert 4/10 thành công
✅ Upsert 5/10 thành công
✅ Upsert 6/10 thành công
✅ Upsert 7/10 thành công
✅ Upsert 8/10 thành công
✅ Upsert 9/10 thành công
✅ Upsert 10/10 thành công

📊 Hoàn thành: 10/10 documents thà

In [24]:
class SearchPipeline:
    def __init__(self):
        self.embed_url = EMMBED_SERVICE_URL
        self.db_url = DATABASE_SERVICE_URL
    
    def embed_query(self, text: str) -> list:
        """Convert text to embedding vector"""
        response = requests.post(
            f"{self.embed_url}/embed", 
            json={"text": text},
            timeout=30
        )
        response.raise_for_status()
        return response.json()["embedding"]
    
    def vector_search(self, query_vector: list, limit: int = 5, score_threshold: float = 0.7):
        """Search using vector"""
        response = requests.post(
            f"{self.db_url}/search", 
            json={
                "query_vector": query_vector,
                "limit": limit,
                "score_threshold": score_threshold
            },
            timeout=30
        )
        response.raise_for_status()
        return response.json()
    
    def search(self, query_text: str, limit: int = 5, score_threshold: float = 0.7):
        """End-to-end search from text query"""
        print(f"🔍 Searching: '{query_text}'")
        
        # Embed query
        query_vector = self.embed_query(query_text)
        print(f"✅ Query embedded: {len(query_vector)} dimensions")
        
        # Search
        results = self.vector_search(query_vector, limit, score_threshold)
        print(f"✅ Found {results['total_found']} results")
        
        return results
    
    def display_results(self, results):
        """Display search results nicely"""
        if not results or not results.get("results"):
            print("❌ Không tìm thấy kết quả nào")
            return
        
        print(f"\n📊 KẾT QUẢ SEARCH ({results['total_found']} documents):")
        print("=" * 60)
        
        for i, result in enumerate(results["results"]):
            print(f"\n🔸 Kết quả {i+1}:")
            print(f"   📊 Score: {result['score']:.3f}")
            print(f"   📄 Text: {result['payload']['text'][:200]}...")
            print(f"   👤 User: {result['payload'].get('user_id', 'N/A')}")
            print(f"   📁 File: {result['payload'].get('file_id', 'N/A')}")
            print(f"   📖 Page: {result['payload'].get('page', 'N/A')}")

# Sử dụng
searcher = SearchPipeline()

# Test search
search_query = "Giới thiệu chung về dịch vụ dọn dẹp buồng phòng"
results = searcher.search(search_query, limit=3, score_threshold=0.5)
searcher.display_results(results)


🔍 Searching: 'Giới thiệu chung về dịch vụ dọn dẹp buồng phòng'
✅ Query embedded: 512 dimensions
✅ Found 0 results
❌ Không tìm thấy kết quả nào


In [25]:
import re
from collections import Counter
from typing import Dict, List
import math

class BM25Encoder:
    def __init__(self, k1=1.2, b=0.75):
        self.k1 = k1
        self.b = b
        self.vocabulary = {}
        self.doc_freqs = {}
        self.idf = {}
        self.doc_len = []
        self.avgdl = 0
        
    def tokenize(self, text: str) -> List[str]:
        """Simple tokenization"""
        # Convert to lowercase and split by non-alphanumeric characters
        tokens = re.findall(r'\b\w+\b', text.lower())
        return tokens
    
    def fit(self, corpus: List[str]):
        """Fit BM25 on corpus"""
        nd = len(corpus)
        doc_freqs = {}
        
        for document in corpus:
            tokens = self.tokenize(document)
            self.doc_len.append(len(tokens))
            
            # Count unique tokens in document
            unique_tokens = set(tokens)
            for token in unique_tokens:
                doc_freqs[token] = doc_freqs.get(token, 0) + 1
        
        self.doc_freqs = doc_freqs
        self.avgdl = sum(self.doc_len) / len(self.doc_len)
        
        # Calculate IDF
        for token, freq in doc_freqs.items():
            self.idf[token] = math.log((nd - freq + 0.5) / (freq + 0.5))
    
    def encode(self, text: str) -> Dict[int, float]:
        """Encode text to sparse vector"""
        tokens = self.tokenize(text)
        token_counts = Counter(tokens)
        
        sparse_vector = {}
        
        for token, count in token_counts.items():
            if token in self.idf:
                # Get token index (create vocabulary on the fly)
                if token not in self.vocabulary:
                    self.vocabulary[token] = len(self.vocabulary)
                
                token_idx = self.vocabulary[token]
                
                # BM25 score calculation
                idf = self.idf[token]
                tf = count
                doc_len = len(tokens)
                
                score = idf * (tf * (self.k1 + 1)) / (tf + self.k1 * (1 - self.b + self.b * doc_len / self.avgdl))
                
                if score > 0:
                    sparse_vector[token_idx] = score
        
        return sparse_vector

# Global BM25 encoder
bm25_encoder = BM25Encoder()

In [26]:
class HybridProcessingPipeline:
    def __init__(self, batch_size=5):
        self.documents = []
        self.payloads = []
        self.batch_size = batch_size
        self.bm25_encoder = BM25Encoder()
        self.corpus_fitted = False
    
    def embed_text(self, text: str) -> list:
        """Get dense embedding"""
        try:
            response = requests.post(
                EMMBED_SERVICE_URL + "/embed", 
                json={"text": text},
                timeout=30
            )
            response.raise_for_status()
            return response.json()["embedding"]
        except Exception as e:
            print(f"❌ Lỗi embed: {e}")
            raise
    
    def create_sparse_vector(self, text: str) -> Dict[int, float]:
        """Create BM25 sparse vector"""
        if not self.corpus_fitted:
            print("⚠️ BM25 chưa được fit trên corpus")
            return {}
        
        return self.bm25_encoder.encode(text)
    
    def fit_bm25(self, corpus: List[str]):
        """Fit BM25 on the corpus"""
        print(f"🔧 Fitting BM25 on {len(corpus)} documents...")
        self.bm25_encoder.fit(corpus)
        self.corpus_fitted = True
        print("✅ BM25 fitted successfully")
    
    def upsert_batch(self, payloads_batch: list) -> bool:
        """Upsert batch với hybrid vectors"""
        try:
            request_data = {"points": payloads_batch}
            
            response = requests.post(
                DATABASE_SERVICE_URL + "/upsert", 
                json=request_data,
                timeout=60
            )
            response.raise_for_status()
            return response.json().get("success", False)
        except Exception as e:
            print(f"❌ Lỗi upsert batch: {e}")
            return False
    
    def process(self, data):
        print(f"🚀 Bắt đầu xử lý {len(data)} documents với Hybrid Search...")
        
        # Bước 1: Fit BM25 trên toàn bộ corpus
        corpus = [str(doc) for doc in data]
        self.fit_bm25(corpus)
        
        # Bước 2: Tạo embeddings và sparse vectors
        for i, document in enumerate(data):
            try:
                print(f"📄 Processing {i+1}/{len(data)}: {str(document)[:50]}...")
                
                # Dense embedding
                dense_vector = self.embed_text(str(document))
                
                # Sparse vector (BM25)
                sparse_vector = self.create_sparse_vector(str(document))
                
                # Tạo payload với cả 2 loại vector
                payload = {
                    "id": str(uuid.uuid4()),
                    "vector": {
                        "dense_vector": dense_vector,
                        "bm25_sparse_vector": {
                            "indices": list(sparse_vector.keys()),
                            "values": list(sparse_vector.values())
                        }
                    },
                    "payload": {
                        "text": str(document),
                        "user_id": "test_user",
                        "title": "Test Title",
                        "file_id": "test_file_123",
                        "source": str(uuid.uuid4()),
                        "page": 1
                    }
                }
                
                self.payloads.append(payload)
                
            except Exception as e:
                print(f"❌ Lỗi xử lý document {i+1}: {e}")
                continue
        
        # Bước 3: Upsert theo batch
        print(f"\n💾 Bắt đầu upsert {len(self.payloads)} payloads...")
        success_count = 0
        
        for i in range(0, len(self.payloads), self.batch_size):
            batch = self.payloads[i:i + self.batch_size]
            batch_num = i // self.batch_size + 1
            
            try:
                print(f"🔄 Upsert batch {batch_num}: {len(batch)} documents...")
                success = self.upsert_batch(batch)
                
                if success:
                    success_count += len(batch)
                    print(f"✅ Batch {batch_num} thành công ({len(batch)} documents)")
                else:
                    print(f"❌ Batch {batch_num} thất bại")
                    
            except Exception as e:
                print(f"❌ Lỗi upsert batch {batch_num}: {e}")
        
        print(f"\n📊 Hoàn thành: {success_count}/{len(self.payloads)} documents thành công")
        return success_count

# Sử dụng
hybrid_pipeline = HybridProcessingPipeline()
result = hybrid_pipeline.process(data)

🚀 Bắt đầu xử lý 10 documents với Hybrid Search...
🔧 Fitting BM25 on 10 documents...
✅ BM25 fitted successfully
📄 Processing 1/10: ## Write a website with content in Website content...
📄 Processing 2/10: 
## Website content...
📄 Processing 3/10: UPP014 S2 25 W6 Writing in Practice
Nguồn: https:/...
📄 Processing 4/10: Welcome to Module 6 What you will learn: The role ...
📄 Processing 5/10: 
(World Health Organisation, n.d.)
World Health Or...
📄 Processing 6/10: 
Italicise the title. Use sentence case (capitalis...
📄 Processing 7/10: 
Sources that stand alone (e.g., TV series, film, ...
📄 Processing 8/10: 
Review your Module 4 and Module 5 paragraphs and ...
📄 Processing 9/10: ...
📄 Processing 10/10: Wed like to remind you of the Module 1 discussion ...

💾 Bắt đầu upsert 10 payloads...
🔄 Upsert batch 1: 5 documents...
✅ Batch 1 thành công (5 documents)
🔄 Upsert batch 2: 5 documents...
✅ Batch 2 thành công (5 documents)

📊 Hoàn thành: 10/10 documents thành công


In [9]:
class HybridSearcher:
    def __init__(self):
        self.embed_url = EMMBED_SERVICE_URL
        self.db_url = DATABASE_SERVICE_URL
        self.bm25_encoder = BM25Encoder()
        self.corpus_fitted = False
    
    def fit_bm25(self, corpus: List[str]):
        """Fit BM25 for search"""
        self.bm25_encoder.fit(corpus)
        self.corpus_fitted = True
    
    def embed_query(self, text: str) -> list:
        """Get dense embedding for query"""
        response = requests.post(
            f"{self.embed_url}/embed", 
            json={"text": text},
            timeout=30
        )
        response.raise_for_status()
        return response.json()["embedding"]
    
    def create_query_sparse_vector(self, text: str) -> Dict[int, float]:
        """Create sparse vector for query"""
        if not self.corpus_fitted:
            print("⚠️ BM25 chưa được fit. Chỉ sử dụng dense search.")
            return {}
        return self.bm25_encoder.encode(text)
    
    def hybrid_search(self, 
                     query_text: str, 
                     limit: int = 5, 
                     dense_weight: float = 0.7,
                     sparse_weight: float = 0.3):
        """
        Hybrid search kết hợp dense và sparse vectors
        
        Args:
            query_text: Text query
            limit: Số kết quả trả về
            dense_weight: Trọng số cho semantic search (0-1)
            sparse_weight: Trọng số cho keyword search (0-1)
        """
        print(f"🔍 Hybrid Search: '{query_text}'")
        print(f"   📊 Dense weight: {dense_weight}, Sparse weight: {sparse_weight}")
        
        try:
            # 1. Tạo dense vector
            dense_vector = self.embed_query(query_text)
            
            # 2. Tạo sparse vector
            sparse_vector = self.create_query_sparse_vector(query_text)
            
            # 3. Chuẩn bị query cho Qdrant
            if sparse_vector:
                # Hybrid search với cả dense và sparse
                query_data = {
                    "prefetch": [
                        {
                            "query": dense_vector,
                            "using": "dense_vector",
                            "limit": limit * 2  # Lấy nhiều hơn để fusion
                        },
                        {
                            "query": {
                                "indices": list(sparse_vector.keys()),
                                "values": list(sparse_vector.values())
                            },
                            "using": "bm25_sparse_vector", 
                            "limit": limit * 2
                        }
                    ],
                    "query": {
                        "fusion": "rrf"  # Reciprocal Rank Fusion
                    },
                    "limit": limit
                }
            else:
                # Chỉ dense search
                query_data = {
                    "query": dense_vector,
                    "using": "dense_vector",
                    "limit": limit
                }
            
            # 4. Gửi request
            response = requests.post(
                f"{self.db_url}/search",
                json=query_data,
                timeout=30
            )
            response.raise_for_status()
            
            results = response.json()
            
            # 5. Display results
            self.display_hybrid_results(results, query_text)
            
            return results
            
        except Exception as e:
            print(f"❌ Lỗi hybrid search: {e}")
            # Fallback to simple dense search
            return self.fallback_dense_search(query_text, limit)
    
    def fallback_dense_search(self, query_text: str, limit: int):
        """Fallback to simple dense search"""
        print("🔄 Fallback to dense search...")
        
        dense_vector = self.embed_query(query_text)
        
        response = requests.post(
            f"{self.db_url}/search",
            json={
                "query_vector": dense_vector,
                "limit": limit,
                "score_threshold": 0.5
            },
            timeout=30
        )
        response.raise_for_status()
        return response.json()
    
    def display_hybrid_results(self, results, query_text):
        """Display hybrid search results"""
        if not results or not results.get("results"):
            print("❌ Không tìm thấy kết quả nào")
            return
        
        print(f"\n🎯 HYBRID SEARCH RESULTS cho: '{query_text}'")
        print("=" * 70)
        
        for i, result in enumerate(results["results"]):
            print(f"\n🔸 Kết quả {i+1}:")
            print(f"   📊 Hybrid Score: {result['score']:.3f}")
            print(f"   📄 Text: {result['payload']['text'][:200]}...")
            print(f"   👤 User: {result['payload'].get('user_id', 'N/A')}")
            print(f"   📁 File: {result['payload'].get('file_id', 'N/A')}")

# Sử dụng Hybrid Search
searcher = HybridSearcher()

# Nếu bạn có corpus để fit BM25
# searcher.fit_bm25(data)  # Fit trên corpus đã upsert

# Test hybrid search
test_queries = [
    "Giới thiệu chung về dịch vụ dọn dẹp buồng phòng",
    "quy trình dọn dẹp",
    "thiết bị an toàn",
    "hướng dẫn sử dụng"
]

for query in test_queries:
    print(f"\n{'='*80}")
    results = searcher.hybrid_search(
        query, 
        limit=3, 
        dense_weight=0.7,  # 70% semantic
        sparse_weight=0.3  # 30% keyword
    )
    print(f"{'='*80}")


🔍 Hybrid Search: 'Giới thiệu chung về dịch vụ dọn dẹp buồng phòng'
   📊 Dense weight: 0.7, Sparse weight: 0.3
⚠️ BM25 chưa được fit. Chỉ sử dụng dense search.
❌ Lỗi hybrid search: 422 Client Error: Unprocessable Entity for url: http://localhost:8002/search
🔄 Fallback to dense search...

🔍 Hybrid Search: 'quy trình dọn dẹp'
   📊 Dense weight: 0.7, Sparse weight: 0.3
⚠️ BM25 chưa được fit. Chỉ sử dụng dense search.
❌ Lỗi hybrid search: 422 Client Error: Unprocessable Entity for url: http://localhost:8002/search
🔄 Fallback to dense search...

🔍 Hybrid Search: 'thiết bị an toàn'
   📊 Dense weight: 0.7, Sparse weight: 0.3
⚠️ BM25 chưa được fit. Chỉ sử dụng dense search.
❌ Lỗi hybrid search: 422 Client Error: Unprocessable Entity for url: http://localhost:8002/search
🔄 Fallback to dense search...

🔍 Hybrid Search: 'hướng dẫn sử dụng'
   📊 Dense weight: 0.7, Sparse weight: 0.3
⚠️ BM25 chưa được fit. Chỉ sử dụng dense search.
❌ Lỗi hybrid search: 422 Client Error: Unprocessable Entity for url

In [10]:
# 1. Upsert data với hybrid vectors
hybrid_pipeline = HybridProcessingPipeline()
hybrid_pipeline.process(data)

# 2. Search với hybrid
searcher = HybridSearcher()
searcher.fit_bm25(data)  # Fit BM25 trên corpus

# 3. Test search
results = searcher.hybrid_search(
    "Giới thiệu chung về dịch vụ dọn dẹp buồng phòng",
    limit=5,
    dense_weight=0.7,   # 70% semantic similarity
    sparse_weight=0.3   # 30% keyword matching
)

🚀 Bắt đầu xử lý 3 documents với Hybrid Search...
🔧 Fitting BM25 on 3 documents...
✅ BM25 fitted successfully
📄 Processing 1/3: - Giới thiệu chung
Dịch vụ Dọn dẹp Buồng phòng của...
📄 Processing 2/3: 
- Hướng dẫn sử dụng
Hướng dẫn đặt dịch vụ dọn dẹp...
📄 Processing 3/3: 
- Lưu ý
- Được chọn cùng lúc nhiều loại phòng, nh...

💾 Bắt đầu upsert 3 payloads...
🔄 Upsert batch 1: 3 documents...
✅ Batch 1 thành công (3 documents)

📊 Hoàn thành: 3/3 documents thành công
🔍 Hybrid Search: 'Giới thiệu chung về dịch vụ dọn dẹp buồng phòng'
   📊 Dense weight: 0.7, Sparse weight: 0.3
❌ Lỗi hybrid search: 422 Client Error: Unprocessable Entity for url: http://localhost:8002/search
🔄 Fallback to dense search...
