<a href="https://colab.research.google.com/github/git4sudo/pharmaAI/blob/main/pharma_rag_chunks.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
import pandas as pd
import json
import uuid
from typing import List, Dict, Any
import re
import os
from google.colab import drive

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
csv_file_path='/content/drive/MyDrive/pharmadata/Cleaned_MID_csv_copy.csv'

In [7]:
if os.path.exists(csv_file_path):
    print(f"✅ Found CSV file: {os.path.basename(csv_file_path)}")
    print(f"📁 Full path: {csv_file_path}")

    # Get file size
    file_size = os.path.getsize(csv_file_path)
    print(f"📊 File size: {file_size} bytes ({file_size/1024:.1f} KB)")
else:
    print(f"❌ CSV file not found at: {csv_file_path}")
    print("💡 Make sure the file path is correct and the file exists in your Google Drive")
    exit()

print(f"🎯 Using CSV file: {csv_file_path}")

✅ Found CSV file: Cleaned_MID_csv_copy.csv
📁 Full path: /content/drive/MyDrive/pharmadata/Cleaned_MID_csv_copy.csv
📊 File size: 699360895 bytes (682969.6 KB)
🎯 Using CSV file: /content/drive/MyDrive/pharmadata/Cleaned_MID_csv_copy.csv


In [8]:
class PharmaceuticalRAGConverter:
    def __init__(self, csv_file_path):
        self.df = pd.read_csv(csv_file_path)
        # Clean column names
        self.df.columns = self.df.columns.str.strip()
        self.processed_chunks = []
        print(f"✅ Loaded CSV with {len(self.df)} rows and {len(self.df.columns)} columns")
        print(f"📋 Columns: {list(self.df.columns)}")

    def clean_text(self, text: str) -> str:
        if pd.isna(text) or text == '':
            return ""
        text = str(text)
        text = ' '.join(text.split())
        text = re.sub(r'[^\w\s\-\.\(\)\[\]\,\;\:]', ' ', text)
        text = re.sub(r'\s+', ' ', text).strip()
        return text

    def create_focused_chunks(self, row: pd.Series, row_idx: int) -> List[Dict[str, Any]]:
        drug_name = self.clean_text(row.get('Name', ''))
        contains = self.clean_text(row.get('Contains', ''))
        chunks = []

        base_metadata = {
            "source_type": "CSV_pharmaceutical_database",
            "drug_name": drug_name,
            "active_ingredient": contains,
            "row_index": row_idx
        }

        # 1. Basic Information Chunk
        basic_info_parts = []
        if drug_name:
            basic_info_parts.append(f"{drug_name}")
        if contains:
            basic_info_parts.append(f"contains {contains}")

        # Get product info from different possible columns
        product_info_columns = ['ProductInfo', 'ProductUsage', 'ProductUse']
        product_info = ""
        for col in product_info_columns:
            if col in row:
                product_info = self.clean_text(row.get(col, ''))
                if product_info:
                    break

        if product_info:
            basic_info_parts.append(f"Product details: {product_info}")

        if basic_info_parts:
            text_chunk = ". ".join(basic_info_parts)
            chunks.append({
                "text_chunk": text_chunk,
                "metadata": {
                    **base_metadata,
                    "source_id": f"drug_db_{drug_name.lower().replace(' ', '_')}_basic_{row_idx}",
                    "topic": "basic_information"
                }
            })

        # 2. Usage and Dosage Chunk
        usage_parts = []

        # Check different possible column names for usage info
        usage_columns = ['HowToUse', 'HowToUse', 'Usage', 'Dosage']
        for col in usage_columns:
            if col in row:
                usage_info = self.clean_text(row.get(col, ''))
                if usage_info:
                    usage_parts.append(f"Instructions: {usage_info}")
                    break

        if usage_parts:
            text_chunk = ". ".join(usage_parts)
            chunks.append({
                "text_chunk": text_chunk,
                "metadata": {
                    **base_metadata,
                    "source_id": f"drug_db_{drug_name.lower().replace(' ', '_')}_usage_{row_idx}",
                    "topic": "dosage_and_usage"
                }
            })

        # 3. Safety and Side Effects Chunk
        safety_parts = []

        side_effects_columns = ['SideEffects', 'SideEffect', 'Warnings']
        for col in side_effects_columns:
            if col in row:
                side_effects = self.clean_text(row.get(col, ''))
                if side_effects:
                    safety_parts.append(f"Side Effects: {side_effects}")
                    break

        safety_columns = ['SafetyAdvice', 'Safety', 'Precautions']
        for col in safety_columns:
            if col in row:
                safety_advice = self.clean_text(row.get(col, ''))
                if safety_advice:
                    safety_parts.append(f"Safety Advice: {safety_advice}")
                    break

        habit_columns = ['Chemical_Habit_Forming', 'HabitForming', 'Habit_Forming']
        for col in habit_columns:
            if col in row:
                habit_forming = self.clean_text(row.get(col, ''))
                if habit_forming and habit_forming.upper() not in ['FALSE', 'NO', '0']:
                    safety_parts.append(f"Habit Forming: {habit_forming}")
                    break

        if safety_parts:
            text_chunk = ". ".join(safety_parts)
            chunks.append({
                "text_chunk": text_chunk,
                "metadata": {
                    **base_metadata,
                    "source_id": f"drug_db_{drug_name.lower().replace(' ', '_')}_safety_{row_idx}",
                    "topic": "safety_and_side_effects"
                }
            })

        # 4. Classification Chunk
        classification_parts = []

        therapeutic_columns = ['Therapeutic_Class', 'TherapeuticClass', 'Therapeut_Class']
        for col in therapeutic_columns:
            if col in row:
                therapeutic_class = self.clean_text(row.get(col, ''))
                if therapeutic_class:
                    classification_parts.append(f"Therapeutic Class: {therapeutic_class}")
                    break

        action_columns = ['Action_Class', 'ActionClass', 'Action__Class']
        for col in action_columns:
            if col in row:
                action_class = self.clean_text(row.get(col, ''))
                if action_class:
                    classification_parts.append(f"Action Class: {action_class}")
                    break

        if classification_parts:
            text_chunk = ". ".join(classification_parts)
            chunks.append({
                "text_chunk": text_chunk,
                "metadata": {
                    **base_metadata,
                    "source_id": f"drug_db_{drug_name.lower().replace(' ', '_')}_classification_{row_idx}",
                    "topic": "drug_classification"
                }
            })

        return chunks

    def process_csv_to_rag(self) -> List[Dict[str, Any]]:
        all_chunks = []

        for idx, row in self.df.iterrows():
            try:
                chunks = self.create_focused_chunks(row, idx)
                for chunk in chunks:
                    if chunk["text_chunk"].strip():
                        all_chunks.append(chunk)
            except Exception as e:
                print(f"⚠️ Error processing row {idx}: {e}")
                continue

        self.processed_chunks = all_chunks
        print(f"✅ Processed {len(all_chunks)} chunks")
        return all_chunks

    def preview_chunks(self, num_chunks: int = 3):
        if not self.processed_chunks:
            print("❌ No processed chunks available.")
            return

        print(f"\n📋 Preview of first {min(num_chunks, len(self.processed_chunks))} chunks:")
        print("=" * 80)

        for i, chunk in enumerate(self.processed_chunks[:num_chunks]):
            print(f"\n🔸 Chunk {i+1}:")
            print(f"Text: {chunk['text_chunk']}")
            print(f"Metadata: {json.dumps(chunk['metadata'], indent=2)}")
            print("-" * 60)


In [9]:
converter = PharmaceuticalRAGConverter(csv_file_path)

✅ Loaded CSV with 148272 rows and 14 columns
📋 Columns: ['Name', 'Contains', 'ProductIntroduction', 'ProductUses', 'ProductBenefits', 'SideEffect', 'HowToUse', 'HowWorks', 'QuickTips', 'SafetyAdvice', 'Chemical_Class', 'Habit_Forming', 'Therapeutic_Class', 'Action_Class']


In [10]:
print("\n🔄 Processing CSV to RAG format...")
rag_chunks = converter.process_csv_to_rag()


🔄 Processing CSV to RAG format...
✅ Processed 593000 chunks


In [11]:
converter.preview_chunks(3)


📋 Preview of first 3 chunks:

🔸 Chunk 1:
Text: andol 0.5mg tablet. contains haloperidol (0.5mg)
Metadata: {
  "source_type": "CSV_pharmaceutical_database",
  "drug_name": "andol 0.5mg tablet",
  "active_ingredient": "haloperidol (0.5mg)",
  "row_index": 0,
  "source_id": "drug_db_andol_0.5mg_tablet_basic_0",
  "topic": "basic_information"
}
------------------------------------------------------------

🔸 Chunk 2:
Text: Instructions: take this medicine in the dose and duration as advised by your doctor. swallow it as a whole. do not chew, crush or break it. andol 0.5mg tablet may be taken with or without food, but it is better to take it at a fixed time.
Metadata: {
  "source_type": "CSV_pharmaceutical_database",
  "drug_name": "andol 0.5mg tablet",
  "active_ingredient": "haloperidol (0.5mg)",
  "row_index": 0,
  "source_id": "drug_db_andol_0.5mg_tablet_usage_0",
  "topic": "dosage_and_usage"
}
------------------------------------------------------------

🔸 Chunk 3:
Text: Side Effects:

In [12]:
output_data = {
    "total_chunks": len(rag_chunks),
    "source_file": os.path.basename(csv_file_path),
    "chunks": rag_chunks
}


In [13]:
json_filename = f"pharmaceutical_rag_data.json"
with open(json_filename, 'w', encoding='utf-8') as f:
    json.dump(output_data, f, indent=2, ensure_ascii=False)

In [14]:
jsonl_filename = f"pharmaceutical_rag_data.jsonl"
with open(jsonl_filename, 'w', encoding='utf-8') as f:
    for chunk in rag_chunks:
        json.dump(chunk, f, ensure_ascii=False)
        f.write('\n')

In [15]:
print(f"- {json_filename} (Complete JSON)")
print(f"- {jsonl_filename} (JSONL format)")

- pharmaceutical_rag_data.json (Complete JSON)
- pharmaceutical_rag_data.jsonl (JSONL format)


In [16]:
print(f"\n📊 Processing Statistics:")
print(f"- Total drugs processed: {len(converter.df)}")
print(f"- Total chunks created: {len(rag_chunks)}")
print(f"- Average chunks per drug: {len(rag_chunks)/len(converter.df):.1f}")


📊 Processing Statistics:
- Total drugs processed: 148272
- Total chunks created: 593000
- Average chunks per drug: 4.0


In [17]:
topic_counts = {}
for chunk in rag_chunks:
    topic = chunk['metadata']['topic']
    topic_counts[topic] = topic_counts.get(topic, 0) + 1

In [18]:
print(f"\n📋 Chunks by Topic:")
for topic, count in topic_counts.items():
    print(f"- {topic}: {count} chunks")


📋 Chunks by Topic:
- basic_information: 148272 chunks
- dosage_and_usage: 148197 chunks
- safety_and_side_effects: 148259 chunks
- drug_classification: 148272 chunks
