In [1]:
# Initialise the Spark session with the configuration this pipeline needs.
import glob, os, shutil, sys

# Respect a SPARK_HOME already set in the environment; otherwise fall back to the
# Homebrew Spark 4.0.0 install path this notebook was written against.
SPARK_HOME = os.environ.setdefault("SPARK_HOME", "/opt/homebrew/Cellar/apache-spark/4.0.0/libexec")

# The kernel may have no pip-installed `pyspark` (this cell previously failed with
# ModuleNotFoundError). In that case import the copy bundled under SPARK_HOME/python,
# together with its py4j zip -- the same path setup `findspark.init()` performs.
try:
    import pyspark
except ModuleNotFoundError:
    _spark_python = os.path.join(SPARK_HOME, "python")
    _py4j_zips = sorted(glob.glob(os.path.join(_spark_python, "lib", "py4j-*.zip")))
    sys.path[:0] = [_spark_python, *_py4j_zips]
    import pyspark
from pyspark.sql import SparkSession

# java.base packages opened to unnamed modules via --add-opens (one flag per package).
_OPENED_JAVA_PACKAGES = [
    "java.lang",
    "java.lang.invoke",
    "java.lang.reflect",
    "java.io",
    "java.net",
    "java.nio",
    "java.util",
    "java.util.concurrent",
    "java.util.concurrent.atomic",
    "sun.nio.ch",
    "sun.nio.cs",
    "sun.security.action",
    "sun.util.calendar",
]
os.environ['JAVA_TOOL_OPTIONS'] = " ".join(
    f"--add-opens=java.base/{package}=ALL-UNNAMED" for package in _OPENED_JAVA_PACKAGES
)

# Put Spark's bin/ first on PATH, using the platform separator. Prepend only once so
# re-running this cell does not keep growing PATH, and never add an empty entry.
_spark_bin = os.path.join(SPARK_HOME, "bin")
_current_path = os.environ.get("PATH", "")
if _spark_bin not in _current_path.split(os.pathsep):
    os.environ["PATH"] = os.pathsep.join([_spark_bin, _current_path]) if _current_path else _spark_bin

spark = (
    SparkSession.builder
    .appName("DrugPipeline")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.execution.arrow.pyspark.enabled", "false")
    .config("spark.driver.extraJavaOptions", "-Djava.security.manager=allow")
    .config("spark.executor.extraJavaOptions", "-Djava.security.manager=allow")
    .config("spark.sql.execution.pyspark.udf.faulthandler.enabled", "true")
    .config("spark.python.worker.faulthandler.enabled", "true")
    .getOrCreate()
)


ModuleNotFoundError: No module named 'pyspark'

In [None]:
# Load the raw medicine catalogue: first row is the header, column types inferred by Spark.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("data/Medicine_Details.csv")
)
df.printSchema()
df.show(n=5, truncate=False)

In [None]:
from pyspark.sql.functions import col, lower, concat_ws, udf

# (source column, lower-cased column) pairs. The order here fixes the order in which
# the cleaned columns are added and concatenated into text_for_embedding.
_TEXT_COLUMN_PAIRS = [
    ("Composition", "composition_clean"),
    ("Uses", "uses_clean"),
    ("Side_effects", "side_effects_clean"),
]

columns = df.columns
df_clean = df
for source_name, clean_name in _TEXT_COLUMN_PAIRS:
    # Only lower-case columns that actually exist in the CSV.
    if source_name in columns:
        df_clean = df_clean.withColumn(clean_name, lower(col(source_name)))

# Build the embedding text from whichever cleaned columns are present.
text_cols = [
    col(clean_name)
    for _source, clean_name in _TEXT_COLUMN_PAIRS
    if clean_name in df_clean.columns
]

df_clean = df_clean.withColumn("text_for_embedding", concat_ws(" ", *text_cols))
df_clean.show(n=5, truncate=False)

In [None]:
# Use a model from the sentence-transformers library to build a text-embedding UDF
from sentence_transformers import SentenceTransformer
from pyspark.sql.types import ArrayType, FloatType

# Lazily created SentenceTransformer. It is instantiated on first call of embed_text
# (i.e. inside the process that runs the UDF), not when this cell executes.
# NOTE(review): how often it is re-created depends on how Spark ships this function
# to Python workers -- confirm if model load time matters.
_model = None

def embed_text(text):
    """Embed one string with all-MiniLM-L6-v2; intended to be wrapped as a Spark UDF.

    Args:
        text: The string to embed; may be None.

    Returns:
        The embedding as a list of Python floats. For None, empty, or
        whitespace-only input an all-zero vector is returned whose length
        matches the model's output dimension, so every row has the same width.
    """
    global _model
    if _model is None:
        # Force CPU to avoid crashes on the MPS backend.
        _model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2", device="cpu")
    if text is None or text.strip() == "":
        # Take the width from the model instead of hard-coding it; fall back to 384
        # (the original hard-coded width) if the model reports no dimension.
        # NOTE(review): an all-zero vector has no direction, so cosine distances to it
        # are not meaningful; consider filtering such rows before indexing.
        return [0.0] * (_model.get_sentence_embedding_dimension() or 384)
    return _model.encode(text).tolist()

# Wrap embed_text as a UDF returning array<float>, attach the vectors, preview, and persist.
embed_udf = udf(embed_text, returnType=ArrayType(FloatType()))
df_with_embeddings = df_clean.withColumn(
    "embedding", embed_udf(col("text_for_embedding"))
)
df_with_embeddings.select("Medicine Name", "embedding").show(n=5, truncate=False)
df_with_embeddings.write.parquet("processed_drugs.parquet", mode="overwrite")

# ChromaDB

In [None]:
# ChromaDB Integration Setup
import chromadb
from chromadb.config import Settings
from typing import List, Dict, Any

def setup_chromadb_client(persist_directory: str = "./chroma_db"):
    """Open (or create) an on-disk ChromaDB store.

    Args:
        persist_directory: Folder where ChromaDB keeps its files.

    Returns:
        A ``chromadb.PersistentClient`` rooted at ``persist_directory``.
    """
    return chromadb.PersistentClient(path=persist_directory)

def create_collections(client):
    """Fetch or create the four cosine-space collections used by the pipeline.

    Args:
        client: A ChromaDB client exposing ``get_or_create_collection``.

    Returns:
        Dict mapping collection name to collection handle, in the order
        drugs_main, drugs_side_effects, drugs_composition, drugs_reviews.
    """
    # (collection name, human-readable description stored in its metadata)
    collection_specs = (
        ("drugs_main", "Main drugs collection for semantic search"),
        ("drugs_side_effects", "Side effects analysis"),
        ("drugs_composition", "Chemical composition similarity"),
        ("drugs_reviews", "Review analytics"),
    )
    return {
        name: client.get_or_create_collection(
            name=name,
            metadata={"hnsw:space": "cosine", "description": description},
        )
        for name, description in collection_specs
    }

# Open the on-disk store, then make sure all four collections exist.
client = setup_chromadb_client()
collections = create_collections(client=client)

print("ChromaDB setup functions loaded successfully!")

In [None]:
# Reload the embeddings persisted by the embedding cell from Parquet.
df_with_embeddings = spark.read.format("parquet").load("processed_drugs.parquet")

print("Loaded DataFrame schema:")
df_with_embeddings.printSchema()

total_loaded = df_with_embeddings.count()
print(f"Total medicines loaded: {total_loaded}")

In [None]:
# Data Transformation Functions
def _row_value(row, key, default):
    """Return ``row[key]``, or ``default`` when the column is absent or its value is falsy."""
    try:
        value = row[key]
    except (KeyError, ValueError):
        # pyspark Row raises KeyError/ValueError (subclasses) for unknown fields; dicts raise KeyError.
        return default
    return value or default


def _new_vector_payload():
    """Return an empty payload for a collection that stores embeddings and documents."""
    return {'ids': [], 'embeddings': [], 'metadatas': [], 'documents': []}


def spark_df_to_chromadb_format(df_with_embeddings):
    """Convert a Spark DataFrame with embeddings into per-collection ChromaDB payloads.

    Optional text/review columns ("Medicine Name", "Composition", "Uses",
    "Side_effects", "Manufacturer", the three "... Review %" columns and
    "Image URL") may be missing or null; they fall back to "" or 0, mirroring the
    cleaning cell that only processes columns that exist. "embedding" and
    "text_for_embedding" are required.

    Args:
        df_with_embeddings: DataFrame (anything with ``collect()`` returning rows
            indexable by column name) holding the embedded medicines.

    Returns:
        Tuple ``(main_data, side_effects_data, composition_data, reviews_data)``.
        The first three are dicts of parallel lists under 'ids', 'embeddings',
        'metadatas' and 'documents'; ``reviews_data`` has only 'ids' and
        'metadatas'. Side-effects/composition entries are created only for rows
        whose text is non-blank.
    """
    print("Collecting data from Spark DataFrame...")
    # collect() materialises every row on the driver.
    medicines_data = df_with_embeddings.collect()

    main_data = _new_vector_payload()
    side_effects_data = _new_vector_payload()
    composition_data = _new_vector_payload()
    reviews_data = {'ids': [], 'metadatas': []}

    print(f"Processing {len(medicines_data)} medicines...")

    for i, row in enumerate(medicines_data):
        suffix = f"{i:06d}"
        name = _row_value(row, "Medicine Name", "")
        composition = _row_value(row, "Composition", "")
        uses = _row_value(row, "Uses", "")
        side_effects = _row_value(row, "Side_effects", "")
        manufacturer = _row_value(row, "Manufacturer", "")
        excellent = _row_value(row, "Excellent Review %", 0)
        average = _row_value(row, "Average Review %", 0)
        poor = _row_value(row, "Poor Review %", 0)
        # NOTE(review): the side-effects and composition collections reuse this
        # combined-text embedding rather than an embedding of their own document text.
        embedding = row["embedding"]

        # Main collection: every row.
        main_data['ids'].append(f"medicine_{suffix}")
        main_data['embeddings'].append(embedding)
        main_data['documents'].append(row["text_for_embedding"])
        main_data['metadatas'].append({
            "medicine_name": name,
            "composition": composition,
            "uses": uses,
            "side_effects": side_effects,
            "manufacturer": manufacturer,
            "excellent_review": excellent,
            "average_review": average,
            "poor_review": poor,
            "image_url": _row_value(row, "Image URL", ""),
        })

        # Side-effects collection: only rows with non-blank side-effects text.
        if side_effects.strip():
            side_effects_data['ids'].append(f"side_effects_{suffix}")
            side_effects_data['embeddings'].append(embedding)
            side_effects_data['documents'].append(side_effects)
            side_effects_data['metadatas'].append({
                "medicine_name": name,
                "composition": composition,
                "manufacturer": manufacturer,
            })

        # Composition collection: only rows with non-blank composition text.
        if composition.strip():
            composition_data['ids'].append(f"composition_{suffix}")
            composition_data['embeddings'].append(embedding)
            composition_data['documents'].append(composition)
            composition_data['metadatas'].append({
                "medicine_name": name,
                "manufacturer": manufacturer,
                "uses": uses,
            })

        # Reviews collection: metadata only; weighted score excellent*3 + average*2 + poor*1.
        reviews_data['ids'].append(f"review_{suffix}")
        reviews_data['metadatas'].append({
            "medicine_name": name,
            "excellent_review": excellent,
            "average_review": average,
            "poor_review": poor,
            "total_score": excellent * 3 + average * 2 + poor * 1,
        })

    print("Data transformation completed:")
    print(f"- Main collection: {len(main_data['ids'])} items")
    print(f"- Side effects collection: {len(side_effects_data['ids'])} items") 
    print(f"- Composition collection: {len(composition_data['ids'])} items")
    print(f"- Reviews collection: {len(reviews_data['ids'])} items")

    return main_data, side_effects_data, composition_data, reviews_data

# Flatten the reloaded DataFrame into the four per-collection payloads.
chroma_payloads = spark_df_to_chromadb_format(df_with_embeddings)
main_data, side_effects_data, composition_data, reviews_data = chroma_payloads

print("Data transformation functions loaded successfully!")

In [None]:
# Batch Insert Functions
def batch_insert_to_collection(collection, data, batch_size=1000):
    """Write a prepared payload to a ChromaDB collection in fixed-size batches.

    Uses ``upsert`` rather than ``add``. Ids are deterministic and the client is
    persistent, so re-running the notebook with ``add`` would not refresh records
    whose ids already exist. ``upsert`` updates them in place and creates new ones.

    Args:
        collection: ChromaDB collection (anything with an ``upsert(**kwargs)`` method).
        data: Dict of parallel lists with 'ids' and 'metadatas', and optionally
            'embeddings' and 'documents'.
        batch_size: Maximum number of records sent per ``upsert`` call.

    Raises:
        ValueError: If ``batch_size`` is 0 (raised by ``range``).
    """
    total_items = len(data['ids'])
    total_batches = (total_items - 1) // batch_size + 1
    has_embeddings = bool(data.get('embeddings'))

    for batch_number, start in enumerate(range(0, total_items, batch_size), start=1):
        end = min(start + batch_size, total_items)
        batch = {
            'ids': data['ids'][start:end],
            'metadatas': data['metadatas'][start:end],
        }
        if has_embeddings:
            batch['embeddings'] = data['embeddings'][start:end]
            batch['documents'] = data['documents'][start:end] if 'documents' in data else None
        else:
            # Metadata-only payloads (e.g. reviews) get a placeholder document. With no
            # explicit embeddings, ChromaDB embeds it using the collection's embedding function.
            batch['documents'] = ["Review summary only"] * len(batch['ids'])

        collection.upsert(**batch)
        print(f"  Inserted batch {batch_number}/{total_batches}")

def populate_all_collections(collections, main_data, side_effects_data, composition_data, reviews_data):
    """Load each prepared payload into its ChromaDB collection, in a fixed order.

    Args:
        collections: Mapping of collection name to collection handle.
        main_data, side_effects_data, composition_data: Payload dicts with
            'ids', 'embeddings', 'metadatas' and 'documents'.
        reviews_data: Payload dict with at least 'ids' and 'metadatas'.
    """
    targets = [
        ('drugs_main', main_data),
        ('drugs_side_effects', side_effects_data),
        ('drugs_composition', composition_data),
        ('drugs_reviews', reviews_data),
    ]
    for collection_name, payload in targets:
        print(f"Populating {collection_name} collection...")
        if collection_name == 'drugs_reviews':
            # Forward only ids + metadatas so the batch writer takes its no-embeddings path.
            payload = {'ids': payload['ids'], 'metadatas': payload['metadatas']}
        batch_insert_to_collection(collections[collection_name], payload)

    print("All collections populated successfully!")


populate_all_collections(collections, main_data, side_effects_data, composition_data, reviews_data)

print("Batch insert functions loaded successfully!")

In [None]:
# Query Interface Functions
# One shared encoder for query-time embeddings, pinned to CPU and using the same
# checkpoint name as the embedding UDF.
model = SentenceTransformer(model_name_or_path="sentence-transformers/all-MiniLM-L6-v2", device="cpu")

def setup_query_model():
    """Return the module-level ``model`` so every query reuses one loaded encoder."""
    return model

def search_similar_medicines(collection, query_text, query_model, n_results=5):
    """Return the ``n_results`` entries in ``collection`` nearest to ``query_text``.

    Args:
        collection: ChromaDB collection to query.
        query_text: Free-text query.
        query_model: Encoder whose ``encode(text)`` result supports ``tolist()``.
        n_results: Number of neighbours to request.

    Returns:
        The raw ChromaDB query result, including metadatas, documents and distances.
    """
    embedded_query = query_model.encode(query_text)
    return collection.query(
        query_embeddings=[embedded_query.tolist()],
        n_results=n_results,
        include=["metadatas", "documents", "distances"],
    )

def find_medicine_alternatives(collections, medicine_name, query_model, n_results=5):
    """Find medicines whose composition resembles that of the best match for ``medicine_name``.

    The name is matched against ``drugs_main``. The matched medicine's composition
    text is then used as the query against ``drugs_composition``.

    Args:
        collections: Mapping with at least 'drugs_main' and 'drugs_composition'.
        medicine_name: Free-text medicine name or description to look up.
        query_model: Encoder whose ``encode(text)`` result supports ``tolist()``.
        n_results: Number of alternatives to return.

    Returns:
        The ChromaDB query result from the composition collection, or None when no
        medicine matched or the matched medicine has no composition text.
    """
    # Embed with query_model rather than passing query_texts: query_texts would use the
    # collection's default embedding function and ignore the model passed in here.
    name_embedding = query_model.encode(medicine_name).tolist()
    main_results = collections['drugs_main'].query(
        query_embeddings=[name_embedding],
        n_results=1,
        include=["metadatas"],
    )

    matches = main_results['metadatas']
    if not matches or not matches[0]:
        return None

    composition = matches[0][0].get('composition', "")
    if not composition:
        # No composition text to search with, so there is nothing meaningful to compare.
        return None

    return search_similar_medicines(
        collections['drugs_composition'],
        composition,
        query_model,
        n_results,
    )

def analyze_side_effects_similarity(collections, side_effect_query, query_model, n_results=10):
    """Return entries of ``drugs_side_effects`` nearest to ``side_effect_query``.

    Args:
        collections: Mapping containing the 'drugs_side_effects' collection.
        side_effect_query: Free-text description of side effects.
        query_model: Encoder forwarded to ``search_similar_medicines``.
        n_results: Number of neighbours to request.

    Returns:
        The raw ChromaDB query result.
    """
    side_effects_collection = collections['drugs_side_effects']
    return search_similar_medicines(
        side_effects_collection, side_effect_query, query_model, n_results=n_results
    )

def get_top_reviewed_medicines(collections, n_results=10):
    """Return the ``n_results`` review records with the highest ``total_score``.

    Args:
        collections: Mapping containing the 'drugs_reviews' collection.
        n_results: How many records to keep (a plain slice bound).

    Returns:
        List of ``(id, metadata)`` tuples, highest score first. Ties keep the
        order in which the collection returned them.
    """
    reviews = collections['drugs_reviews'].get(include=["metadatas"])
    ranked = list(zip(reviews['ids'], reviews['metadatas']))
    # Ranking is done client-side over every stored review record.
    ranked.sort(key=lambda pair: pair[1]['total_score'], reverse=True)
    return ranked[:n_results]

# Bind the shared encoder under the name the demo cell passes to the query helpers.
query_model = setup_query_model()

print("Query functions loaded successfully!")

In [None]:
# Demo Queries - Test ChromaDB Integration
def demo_queries(collections, query_model):
    """Print a walkthrough of the query helpers against the populated collections.

    Five scenarios are printed to stdout:
    1. A free-text symptom search on ``drugs_main``.
    2. Composition-based alternatives.
    3. A side-effects search.
    4. The top-reviewed list.
    5. A manufacturer-themed free-text search.

    Reported similarity is ``1 - distance``; the collections are created with cosine distance.

    Args:
        collections: Mapping of collection name to ChromaDB collection.
        query_model: Encoder passed through to the search helpers.
    """
    print("\n=== ChromaDB Query Demo ===")

    # 1. Search for pain relief medicines
    print("\n1. 🔍 Searching for pain relief medicines:")
    pain_results = search_similar_medicines(
        collections['drugs_main'],
        "pain relief headache fever",
        query_model,
        3
    )
    for i, metadata in enumerate(pain_results['metadatas'][0]):
        distance = pain_results['distances'][0][i]
        print(f"   {i+1}. {metadata['medicine_name']} (similarity: {1-distance:.3f})")
        print(f"      Uses: {metadata['uses'][:100]}...")
        print(f"      Manufacturer: {metadata['manufacturer']}")
        print()

    # 2. Find alternatives for a specific medicine
    print("\n2. 💊 Finding alternatives for Paracetamol-like medicines:")
    alternatives = find_medicine_alternatives(
        collections,
        "paracetamol acetaminophen",
        query_model,
        3
    )
    if alternatives:
        for i, metadata in enumerate(alternatives['metadatas'][0]):
            distance = alternatives['distances'][0][i]
            # drugs_composition metadata holds only medicine_name/manufacturer/uses; the
            # composition text itself is stored as the document, so read it from there.
            composition = metadata.get('composition') or alternatives['documents'][0][i]
            print(f"   {i+1}. {metadata['medicine_name']} (similarity: {1-distance:.3f})")
            print(f"      Composition: {composition}")
            print(f"      Manufacturer: {metadata['manufacturer']}")
            print()

    # 3. Analyze side effects
    print("\n3. ⚠️ Medicines with nausea side effects:")
    nausea_results = analyze_side_effects_similarity(
        collections,
        "nausea vomiting stomach upset",
        query_model,
        3
    )
    for i, metadata in enumerate(nausea_results['metadatas'][0]):
        distance = nausea_results['distances'][0][i]
        print(f"   {i+1}. {metadata['medicine_name']} (similarity: {1-distance:.3f})")
        print(f"      Composition: {metadata['composition']}")
        print()

    # 4. Top reviewed medicines
    print("\n4. ⭐ Top reviewed medicines:")
    top_reviewed = get_top_reviewed_medicines(collections, 5)
    for i, (_review_id, metadata) in enumerate(top_reviewed):
        print(f"   {i+1}. {metadata['medicine_name']} - Score: {metadata['total_score']}")
        print(f"      Reviews: {metadata['excellent_review']}% excellent, {metadata['average_review']}% average, {metadata['poor_review']}% poor")
        print()

    # 5. Search by manufacturer
    # NOTE(review): stored embeddings come from composition/uses/side-effects text only,
    # so manufacturer names are not part of what this semantic search matches against.
    print("\n5. 🏭 Medicines from specific manufacturers:")
    pharma_results = search_similar_medicines(
        collections['drugs_main'],
        "Pfizer pharmaceutical company",
        query_model,
        3
    )
    for i, metadata in enumerate(pharma_results['metadatas'][0]):
        distance = pharma_results['distances'][0][i]
        print(f"   {i+1}. {metadata['medicine_name']} - {metadata['manufacturer']} (similarity: {1-distance:.3f})")
        print(f"      Uses: {metadata['uses'][:80]}...")
        print()

# Execute every demo scenario against the collections populated above.
demo_queries(collections=collections, query_model=query_model)