# Required libraries

In [None]:
!huggingface-cli login

In [None]:
!pip install -q transformers accelerate bitsandbytes gradio

In [None]:
!pip install peft

In [None]:
!pip install langchain faiss-cpu sentence-transformers

In [None]:
!pip install -U langchain-community

#Rag Database

In [None]:
from langchain.vectorstores import FAISS
from langchain.embeddings import HuggingFaceEmbeddings

embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

vectorstore = FAISS.load_local(
    "/content/drive/MyDrive/rag_index",
    embeddings=embedding_model,
    allow_dangerous_deserialization=True
)


In [None]:
def get_full_rag_content(query, k=3):
    """Get complete content from RAG for specific queries"""

    print(f"🔍 SEARCHING RAG FOR: '{query}'")
    print("=" * 60)

    try:
        # Search your vectorstore
        results = vectorstore.similarity_search(query, k=k)

        if results:
            print(f"✅ Found {len(results)} relevant documents")

            for i, result in enumerate(results):
                print(f"\n📄 DOCUMENT {i+1}:")
                print("-" * 40)
                print(f"FULL CONTENT ({len(result.page_content)} characters):")
                print(result.page_content)
                print("-" * 40)

                # Check for specific keywords
                keywords_found = []
                search_terms = query.lower().split()

                for term in search_terms:
                    if term in result.page_content.lower():
                        keywords_found.append(term)

                print(f"📊 Keywords found: {keywords_found}")
                print(f"📊 Relevance: {len(keywords_found)}/{len(search_terms)} terms matched")

        else:
            print("❌ No results found for this query")

    except Exception as e:
        print(f"❌ Search error: {e}")

# Test with your specific example
# print("🎯 TESTING SPECIFIC QUERY:")
# get_full_rag_content("zone minutes 42 year old female")

print("\n" + "="*80)

# Test some other relevant queries
test_queries = [
    "exercise recommendations 25 year old",
    # "physical activity middle aged women",
    # "cardio minutes weekly adults",
    # "moderate intensity exercise adults",
    # "150 minutes physical activity"
]

for query in test_queries:
    print(f"\n{'='*80}")
    get_full_rag_content(query, k=2)  # Get top 2 results for each

# Data structure and Memory System

In [None]:
import torch
import json
import os
from datetime import datetime # Import datetime
from typing import Dict, List
import gradio as gr

class SimpleDataManager:
    """Simple file-based storage for user data and recommendations"""

    def __init__(self, data_dir="wellbeing_data"):
        self.data_dir = data_dir
        os.makedirs(data_dir, exist_ok=True)
        os.makedirs(f"{data_dir}/users", exist_ok=True)
        os.makedirs(f"{self.data_dir}/recommendations", exist_ok=True) # Corrected path

    def save_user_week(self, user_id: str, week_data: Dict):
        """Save weekly data for a user"""
        user_file = f"{self.data_dir}/users/{user_id}.json"

        try:
            # Load existing data
            if os.path.exists(user_file):
                with open(user_file, 'r') as f:
                    user_data = json.load(f)
            else:
                user_data = {"user_id": user_id, "weeks": []}

            # Add new week
            user_data["weeks"].append(week_data)

            # Keep only last 8 weeks
            user_data["weeks"] = user_data["weeks"][-8:]

            # Save
            with open(user_file, 'w') as f:
                json.dump(user_data, f, indent=2)

            print(f"✅ Saved week data for user {user_id}")

        except Exception as e:
            print(f"❌ Error saving user data: {str(e)}")

    def get_user_history(self, user_id: str) -> List[Dict]:
        """Get user's weekly history"""
        user_file = f"{self.data_dir}/users/{user_id}.json"

        try:
            if os.path.exists(user_file):
                with open(user_file, 'r') as f:
                    user_data = json.load(f)
                    history = user_data.get("weeks", [])
                    print(f"📊 Retrieved {len(history)} weeks of history for {user_id}")
                    return history
        except Exception as e:
            print(f"❌ Error loading user history: {str(e)}")

        return []

    def save_recommendation(self, user_id: str, week_start: str, recommendation: str):
        """Save LLM recommendation"""
        # Clean week_start for filename (remove invalid characters)
        clean_week = week_start.replace("/", "-").replace(":", "-")
        rec_file = f"{self.data_dir}/recommendations/{user_id}_{clean_week}.json"

        try:
            rec_data = {
                "user_id": user_id,
                "week_start": week_start,
                "recommendation": recommendation,
                "timestamp": datetime.now().isoformat()
            }

            with open(rec_file, 'w') as f:
                json.dump(rec_data, f, indent=2)

            print(f"✅ Saved recommendation for user {user_id}, week {week_start}")

        except Exception as e:
            print(f"❌ Error saving recommendation: {str(e)}")

    def get_last_recommendation(self, user_id: str) -> str:
        """Get user's last recommendation"""
        rec_dir = f"{self.data_dir}/recommendations"

        try:
            # Check if recommendations directory exists
            if not os.path.exists(rec_dir):
                return ""

            # Find latest recommendation file for this user
            user_files = [f for f in os.listdir(rec_dir) if f.startswith(f"{user_id}_") and f.endswith('.json')]

            if user_files:
                # Sort by date and get latest
                user_files.sort(reverse=True)
                latest_file = f"{rec_dir}/{user_files[0]}"

                with open(latest_file, 'r') as f:
                    rec_data = json.load(f)
                    recommendation = rec_data.get("recommendation", "")
                    print(f"📝 Retrieved last recommendation for {user_id} ({len(recommendation)} chars)")
                    return recommendation

        except Exception as e:
            print(f"❌ Error loading last recommendation: {str(e)}")

        return ""

    def get_user_stats(self, user_id: str) -> Dict:
        """Get basic stats about user's data (bonus method)"""
        history = self.get_user_history(user_id)

        if not history:
            return {"total_weeks": 0}

        return {
            "total_weeks": len(history),
            "first_week": history[0].get("week_start", "Unknown"),
            "latest_week": history[-1].get("week_start", "Unknown"),
            "avg_steps": sum(week.get("total_steps", 0) for week in history) // len(history),
            "avg_sleep": sum(week.get("avg_sleep", 0) for week in history) / len(history)
        }

#Well being LLM

In [None]:
import torch
import re
from typing import Dict, List


class WellbeingLLM:
    """Improved LLM system for wellbeing recommendations with better prompt engineering"""

    def __init__(self, base_model_id: str, adapter_path: str, vectorstore_path: str = None):
        self.model = None
        self.tokenizer = None
        self.device = None
        self.vectorstore = None

        # Load vectorstore if path provided
        if vectorstore_path:
            self.load_vectorstore(vectorstore_path)

        self.load_model(base_model_id, adapter_path)

    def load_vectorstore(self, vectorstore_path: str):
        """Load FAISS vectorstore for RAG"""
        try:
            from langchain.vectorstores import FAISS
            from langchain.embeddings import HuggingFaceEmbeddings

            print("📚 Loading knowledge base...")
            embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

            self.vectorstore = FAISS.load_local(
                vectorstore_path,
                embeddings=embedding_model,
                allow_dangerous_deserialization=True
            )
            print("✅ Knowledge base loaded")

        except Exception as e:
            print(f"⚠️ Could not load vectorstore: {e}")
            self.vectorstore = None

    def load_model(self, base_model_id: str, adapter_path: str = None):
        """Load the LLM model with better error handling"""
        try:
            print("🤖 Loading LLM...")

            from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
            from peft import PeftModel

            # Clear GPU memory and check device
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                print(f"🔍 GPU available: {torch.cuda.get_device_name()}")
            else:
                print("🔍 Using CPU")

            # Load tokenizer
            print("📝 Loading tokenizer...")
            self.tokenizer = AutoTokenizer.from_pretrained(base_model_id, trust_remote_code=True)
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token
            print("✅ Tokenizer loaded")

            # Quantization config
            bnb_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_use_double_quant=True,
                bnb_4bit_compute_dtype=torch.bfloat16,
            )
            print("⚙️ Quantization config ready")

            # Load base model
            print("🧠 Loading base model...")
            self.model = AutoModelForCausalLM.from_pretrained(
                base_model_id,
                device_map="auto",
                torch_dtype=torch.float16,  # Faster than float16
                quantization_config=bnb_config,
                trust_remote_code=True,
                low_cpu_mem_usage=True,      # 🚀 NEW: Reduces CPU memory usage
                use_cache=False,             # 🚀 NEW: Faster loading
            )
            print("✅ Base model loaded")

            # Load fine-tuned adapter
            # print("🎯 Loading fine-tuned adapter...")
            # self.model = PeftModel.from_pretrained(self.model, adapter_path)
            self.model.eval()

            # Store device for later use
            self.device = next(self.model.parameters()).device
            print(f"✅ LLM loaded successfully on {self.device}")

        except Exception as e:
            print(f"❌ LLM loading failed: {str(e)}")
            import traceback
            traceback.print_exc()
            self.model = None
            self.tokenizer = None
            self.device = None






    def generate_recommendation(self, current_week: Dict, user_history: List[Dict], last_recommendation: str) -> str:
      """Generate personalized recommendations with single template enforcement"""

      if not self.model or not self.tokenizer:
          print("❌ No model loaded")
          return "Model not available. Please check model loading."

      try:



          # Use the single master template
          query = self._build_master_template(current_week, user_history)


          docs = self.vectorstore.similarity_search(query, k=3)


          context = "\n\n".join(doc.page_content for doc in docs)
          print(f"the given prompt is {query}")


          prompt = (
          f"You are a helpful medical assistant. Use the following context to answer the question.\n\n"
          f"Context:\n{context}\n\n"
          f"Question: {query}\n\nAnswer:"
      )
          inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
          input_len = inputs["input_ids"].shape[1]

          print(f"Model is on: {self.device}")  # Should be 'cuda'


          with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=700,
                temperature=0.7,
                top_p=0.95,
                do_sample=True
            )

          generated_tokens = outputs[0][input_len:]
          answer = self.tokenizer.decode(generated_tokens, skip_special_tokens=True)

          return answer.strip()

      except Exception as e:
          print(f"❌ Generation error: {e}")
          return "Failed to generate response."


    def _build_master_template(self, current_week: Dict, user_history: List[Dict]) -> str:
      """Single master template with RAG-enhanced knowledge"""

      # Extract user data
      total_steps = current_week.get('total_steps', 0)
      zone_minutes = current_week.get('zone_minutes', 0)
      demographics = current_week.get('demographics', {})
      age = demographics.get('age', 'unknown')
      weight = demographics.get('weight', 'unknown')
      height = demographics.get('height', 'unknown')
      sex = demographics.get('sex', 'unknown')
      avg_sleep = current_week.get('avg_sleep', 0)
      diet_type = current_week.get('preferences', {}).get('diet_type', 'No Preference')
      allergies = current_week.get('preferences', {}).get('allergies', [])
      exercise_sessions = current_week.get('exercise_sessions', 0)
      food_data = current_week.get('food_data', {})
      dairy_liters = food_data.get('dairy_liters',0)
      legumes = food_data.get('legumes_grams',0)
      meat = food_data.get('meat_grams',0)
      furits= food_data.get('fruits_grams',0)
      vegtables = food_data.get('vegetables_grams',0)
      grains = food_data.get('grains_grams',0)
      nuts = food_data.get('nuts_seeds_grams',0)
      water = food_data.get('water_liters',0)



      # The master template with RAG integration
      prompt = f"""
A user is a {sex} aged {age} years, with a height of {height} cm and weight of {weight} kg. They walk about {total_steps:,} steps weekly, average {zone_minutes} zone minutes, and do {exercise_sessions} exercise sessions per week. They sleep around {avg_sleep:.1f} hours each night. Their diet preference is {diet_type}, and they are allergic to {', '.join(allergies) if allergies else 'none'}
and their this weekly food consumption is dairy {dairy_liters} liter, legumes {legumes} grams, meat {meat} grams, fruits {furits} grams, vegtables = {vegtables},
 grains {grains} grams, nuts {nuts} grams and water {water} liter .

Based on this profile and the expert knowledge below, provide health and wellness recommendations in the format outlined.
---

**1) Food Recommendation**
- Overall Assessment: [...]
- Areas of Improvements: [...]
- Suggested Meals: [...]

**2) Physical Activity**
- Activity Assessment: [...]
- Zone Minutes and intensity Feedback
- Strength/Cardio Tips: [suggest also exercise for strength and cardio]
- Weekly Goals: [...]

**3) Sleep & Well-being**
- Sleep Review: [...]
- Suggestions: [...]

**4) Weekly Summary**
- Summary: [...]
- Goals: [...]
"""


      return prompt




# Well being system

In [None]:
class WellbeingSystem:
    """Main system that combines data management and LLM"""

    def __init__(self, base_model_id: str, adapter_path: str, vectorstore_path: str = None):
        self.data_manager = SimpleDataManager()
        self.llm = WellbeingLLM(base_model_id, adapter_path, vectorstore_path) # Pass vectorstore_path to LLM
        print("🎯 WellbeingSystem initialized")

    def analyze_and_recommend(self, user_id: str, week_data: Dict) -> str:
        """Main function: analyze weekly data and generate recommendations"""

        print(f"📊 Analyzing data for user: {user_id}")

        try:
            # Get user history and last recommendation
            user_history = self.data_manager.get_user_history(user_id)
            last_recommendation = self.data_manager.get_last_recommendation(user_id)

            print(f"📈 Found {len(user_history)} weeks of history")
            if last_recommendation:
                print(f"📝 Previous recommendation found ({len(last_recommendation)} chars)")

            # Process current week data
            processed_week = self._process_week_data(week_data)

            # Show what data is being sent to LLM
            print(f"📊 Processed data: {processed_week['total_steps']:,} steps, {processed_week['zone_minutes']} zone mins, {processed_week['avg_sleep']:.1f}h sleep")

            # Generate recommendation using LLM
            recommendation = self.llm.generate_recommendation(
                processed_week, user_history, last_recommendation
            )

            # Save data (with error handling)
            try:
                self.data_manager.save_user_week(user_id, processed_week)
                self.data_manager.save_recommendation(user_id, processed_week['week_start'], recommendation)
                print(f"💾 Data saved successfully")
            except Exception as save_error:
                print(f"⚠️ Warning: Could not save data: {save_error}")
                # Continue anyway - return the recommendation even if saving fails

            print(f"✅ Generated recommendation for {user_id} ({len(recommendation)} chars)")
            return recommendation

        except Exception as e:
            print(f"❌ Error in analyze_and_recommend: {str(e)}")
            import traceback
            traceback.print_exc()
            return f"Analysis failed: {str(e)}. Please check your input data."

    def _process_week_data(self, week_data: Dict) -> Dict:
        """Process and standardize weekly data with better validation"""

        try:
            # Parse sleep and mood data with validation
            sleep_hours = week_data.get('sleep_hours', [])
            mood_scores = week_data.get('mood_scores', [])

            # Handle string input (comma-separated values)
            if isinstance(sleep_hours, str):
                try:
                    sleep_hours = [float(x.strip()) for x in sleep_hours.split(',') if x.strip()]
                except ValueError:
                    print("⚠️ Invalid sleep hours format, using default")
                    sleep_hours = []

            if isinstance(mood_scores, str):
                try:
                    mood_scores = [float(x.strip()) for x in mood_scores.split(',') if x.strip()]
                except ValueError:
                    print("⚠️ Invalid mood scores format, using default")
                    mood_scores = []

            # Calculate averages with validation
            avg_sleep = sum(sleep_hours) / len(sleep_hours) if sleep_hours else 0
            avg_mood = sum(mood_scores) / len(mood_scores) if mood_scores else 0

            # 🚀 ADD DEBUG:
            print(f"🔍 DEBUG - Sleep calculation: {sleep_hours} → avg: {avg_sleep}")
            print(f"🔍 DEBUG - Mood calculation: {mood_scores} → avg: {avg_mood}")

            # Enhanced food data processing
            food_data = self._process_food_data(week_data)

            processed_data = {
                'week_start': week_data.get('week_start', datetime.now().strftime('%Y-%m-%d')),
                'total_steps': max(0, int(week_data.get('total_steps', 0))), # Ensure int and non-negative
                'zone_minutes': max(0, int(week_data.get('zone_minutes', 0))), # Ensure int and non-negative
                'exercise_sessions': max(0, int(week_data.get('exercise_sessions', 0))), # Ensure int and non-negative
                'avg_sleep': round(avg_sleep, 1),
                'avg_mood': round(avg_mood, 1),
                'food_data': food_data,
                'timestamp': datetime.now().isoformat(),
                'demographics': week_data.get('demographics', {})
            }

            # Include preferences and targets if present in the input week_data
            if 'preferences' in week_data:
                processed_data['preferences'] = week_data['preferences']
            if 'targets' in week_data:
                processed_data['targets'] = week_data['targets']


            return processed_data

        except Exception as e:
            print(f"❌ Error processing week data: {str(e)}")
            # Return empty or default data structure on error
            return {
                 'week_start': week_data.get('week_start', datetime.now().strftime('%Y-%m-%d')),
                 'total_steps': 0,
                 'zone_minutes': 0,
                 'exercise_sessions': 0,
                 'avg_sleep': 0.0,
                 'avg_mood': 0.0,
                 'food_data': {},
                 'timestamp': datetime.now().isoformat(),
                 'processing_error': str(e)
            }


    def _process_food_data(self, week_data: Dict) -> Dict:
        """Process food data handling both old and new formats"""

        food_data = {}

        # Check for new format first (with units)
        unit_fields = ['dairy_liters', 'water_liters', 'legumes_grams', 'meat_grams',
                      'fruits_grams', 'vegetables_grams', 'grains_grams', 'nuts_seeds_grams']

        has_unit_format = any(field in week_data for field in unit_fields)

        if has_unit_format:
            print("📊 Using new format (with units)")
            # New format with units
            for field in unit_fields:
                value = week_data.get(field, 0)
                try:
                    food_data[field] = max(0, float(value))  # Ensure non-negative
                except (ValueError, TypeError):
                    food_data[field] = 0
                    print(f"⚠️ Invalid {field} value, using 0")
        else:
            print("📊 Using old format (servings)")
            # Old format (for backward compatibility)
            old_fields = ['dairy', 'legumes', 'meat', 'fruits', 'vegetables', 'grains', 'nuts_seeds', 'water_glasses']
            for field in old_fields:
                if field in week_data:
                    try:
                        food_data[field] = max(0, float(week_data[field]))
                    except (ValueError, TypeError):
                        food_data[field] = 0

        return food_data





# Well being app

In [None]:
import gradio as gr
import json
import os
from datetime import datetime
from typing import Dict, List

class WellbeingApp:
    """Complete Gradio interface with targets, history, and preferences"""

    def __init__(self, base_model_id: str, adapter_path: str):
      print("🚀 Initializing Complete WellbeingApp...")
      self.wellbeing_system = WellbeingSystem(
          base_model_id,
          adapter_path,
          vectorstore_path="/content/drive/MyDrive/rag_index"
      )

      # Default targets (same as before)
      self.default_targets = {
          'weekly_steps': 70000,
          'weekly_zone_minutes': 150,
          'daily_sleep_hours': 8.0,
          'weekly_water_liters': 14.0,
          'weekly_fruits_grams': 2100,
          'weekly_vegetables_grams': 3500,
          'weekly_exercise_sessions': 4
      }

    def create_wellbeing_app(self):
        """Create comprehensive Gradio interface with tabs"""

        with gr.Blocks(title="🏥 Complete Wellbeing LLM", theme=gr.themes.Soft()) as demo:

            gr.Markdown("# 🏥 Complete AI Wellbeing System")
            gr.Markdown("### Personalized health recommendations with custom targets and progress tracking")

            # Create tabs for different sections
            with gr.Tabs():

                # TAB 1: MAIN ANALYSIS
                with gr.TabItem("🤖 Get Recommendations", elem_id="main-tab"):
                    self._create_main_analysis_tab()

                # TAB 2: TARGETS CUSTOMIZATION
                with gr.TabItem("🎯 Customize Targets", elem_id="targets-tab"):
                    self._create_targets_tab()

                # TAB 3: HISTORY REVIEW
                with gr.TabItem("📈 Progress History", elem_id="history-tab"):
                    self._create_history_tab()

                # TAB 4: SYSTEM INFO
                # with gr.TabItem("ℹ️ System Info", elem_id="info-tab"):
                #     self._create_info_tab()

        return demo

    def _create_main_analysis_tab(self):
      """Create the main analysis tab with fixed component handling"""

      with gr.Row():
          with gr.Column():

              gr.Markdown("### 🤖 AI Enhancement Options")
              # User Info Section
              gr.Markdown("### 👤 User Information")
              user_id = gr.Textbox(label="👤 User ID", value="user_001", info="Unique identifier for tracking your progress")
              week_start = gr.Textbox(label="📅 Week Start Date", value="2024-01-15", info="Format: YYYY-MM-DD")

              # Personal Details - FIXED: These were not being collected properly
              sex = gr.Dropdown(
                  label="⚧️ Sex",
                  choices=["Male", "Female", "Other", "Prefer not to say"],
                  value="Male",
                  info="Biological sex for health calculations"
              )

              age = gr.Number(
                  label="🎂 Age",
                  value=25,
                  minimum=13,
                  maximum=120,
                  step=1,
                  info="Age in years"
              )

              weight = gr.Number(
                  label="⚖️ Weight (kg)",
                  value=70.0,
                  minimum=30.0,
                  maximum=300.0,
                  step=0.1,
                  info="Current weight in kilograms"
              )

              height = gr.Number(
                  label="📏 Height (cm)",
                  value=175.0,
                  minimum=100.0,
                  maximum=250.0,
                  step=0.1,
                  info="Height in centimeters"
              )

              # Preferences Section
              gr.Markdown("### 🍽️ Meal Preferences & Dietary Restrictions")
              with gr.Row():
                  diet_type = gr.Dropdown(
                      label="🥗 Diet Type",
                      choices=[
                          "No Preference", "Vegetarian", "Vegan", "Pescatarian",
                          "Keto", "Paleo", "Mediterranean", "Low Carb", "Gluten Free", "Dairy Free"
                      ],
                      value="No Preference",
                      info="Select your primary dietary preference"
                  )

              allergies = gr.CheckboxGroup(
                  label="⚠️ Food Allergies & Intolerances",
                  choices=[
                      "Nuts (Tree nuts)", "Peanuts", "Shellfish", "Fish",
                      "Milk/Dairy", "Eggs", "Soy", "Wheat/Gluten",
                      "Sesame", "Lactose Intolerant", "Other"
                  ],
                  info="⚠️ IMPORTANT: Select all allergies and intolerances"
              )

              other_allergies = gr.Textbox(
                  label="🚨 Other Allergies/Restrictions",
                  placeholder="e.g., tomatoes, specific medications, religious restrictions...",
                  info="Specify any other dietary restrictions or allergies"
              )

              # Activity Section
              gr.Markdown("### 🏃‍♂️ Physical Activity")
              total_steps = gr.Number(label="Total Steps (Week)", value=65000, info="Your current target will be shown")
              zone_minutes = gr.Number(label="Zone Minutes (Week)", value=120, info="Your current target will be shown")
              exercise_sessions = gr.Number(label="Exercise Sessions", value=3, info="Your current target will be shown")

              # Sleep & Mood Section
              gr.Markdown("### 😴 Sleep & Mood")
              sleep_hours = gr.Textbox(
                  label="Sleep Hours (7 days)",
                  value="7.5,6.8,8.2,7.0,6.5,8.5,7.8",
                  info="Enter daily sleep hours separated by commas"
              )
              mood_scores = gr.Textbox(
                  label="Mood Scores (7 days)",
                  value="4,3.5,5,3,4,2.5,5",
                  info="Rate your daily mood from 1-5, separated by commas"
              )

              # Food Section
              gr.Markdown("### 🍎 Weekly Nutrition Intake")
              gr.Markdown("*Enter your total consumption for the entire week*")

              with gr.Row():
                  dairy = gr.Number(label="🥛 Dairy (liters)", value=3.5, info="Milk, yogurt, cheese")
                  water_liters = gr.Number(label="💧 Water (liters)", value=48, info="Your current target will be shown")

              with gr.Row():
                  fruits = gr.Number(label="🍎 Fruits (grams)", value=2100, info="Your current target will be shown")
                  vegetables = gr.Number(label="🥬 Vegetables (grams)", value=3500, info="Your current target will be shown")

              with gr.Row():
                  legumes = gr.Number(label="🫘 Legumes (grams)", value=350, info="Beans, lentils, peas")
                  nuts_seeds = gr.Number(label="🥜 Nuts/Seeds (grams)", value=210, info="Almonds, walnuts, chia seeds")

              with gr.Row():
                  meat = gr.Number(label="🍗 Meat (grams)", value=700, info="Chicken, beef, fish")
                  grains = gr.Number(label="🌾 Grains (grams)", value=2800, info="Rice, bread, pasta, oats")

              # Action buttons
              with gr.Row():
                  analyze_btn = gr.Button("🤖 Get AI Recommendations", variant="primary", size="lg")
                  load_targets_btn = gr.Button("📊 Load My Targets", variant="secondary")

          with gr.Column():
              main_output = gr.Markdown("""
  Suggestions
  """)

      # FIXED: Store components with correct order including demographics
      self.main_components = {
          'user_id': user_id,
          'week_start': week_start,
          'sex': sex,           # ADDED: Missing demographic fields
          'age': age,           # ADDED
          'weight': weight,     # ADDED
          'height': height,     # ADDED
          'total_steps': total_steps,
          'zone_minutes': zone_minutes,
          'exercise_sessions': exercise_sessions,
          'sleep_hours': sleep_hours,
          'mood_scores': mood_scores,
          'dairy_liters': dairy,
          'legumes_grams': legumes,
          'meat_grams': meat,
          'fruits_grams': fruits,
          'vegetables_grams': vegetables,
          'grains_grams': grains,
          'nuts_seeds_grams': nuts_seeds,
          'water_liters': water_liters,
          'diet_type': diet_type,
          'allergies': allergies,
          'other_allergies': other_allergies,
          'output': main_output
      }

      # FIXED: Connect button with correct inputs order
      analyze_btn.click(
          fn=self.analyze_wellbeing_with_preferences,
          inputs=[
              user_id, week_start, sex, age, weight, height,  # Demographics in correct order
              total_steps, zone_minutes, exercise_sessions,
              sleep_hours, mood_scores, dairy, legumes, meat,
              fruits, vegetables, grains, nuts_seeds, water_liters,
              diet_type, allergies, other_allergies
          ],
          outputs=[main_output]
      )

      load_targets_btn.click(
          fn=self.load_user_targets,
          inputs=[user_id],
          outputs=[main_output]
      )

    def analyze_wellbeing_with_preferences(self, user_id, week_start, sex, age, weight, height,
                                     total_steps, zone_minutes, exercise_sessions,
                                     sleep_hours, mood_scores, dairy_liters, legumes_grams, meat_grams,
                                     fruits_grams, vegetables_grams, grains_grams, nuts_seeds_grams, water_liters,
                                     diet_type, allergies, other_allergies):
      """Enhanced analysis with agent support, custom targets, and user demographics"""

      try:
          # Input validation
          if not user_id or not user_id.strip():
              return "❌ Error: Please enter a valid User ID"

          if not week_start:
              return "❌ Error: Please enter a week start date"

          # Validate demographic data
          if age is not None and (age < 13 or age > 120):
              return "❌ Error: Please enter a valid age (13-120 years)"

          if weight is not None and (weight < 30 or weight > 300):
              return "❌ Error: Please enter a valid weight (30-300 kg)"

          if height is not None and (height < 100 or height > 250):
              return "❌ Error: Please enter a valid height (100-250 cm)"

          # Load user's custom targets
          user_targets = self.get_user_targets(user_id.strip())

          # Calculate BMI if weight and height are provided
          bmi = None
          if weight and height:
              height_m = height / 100  # Convert cm to meters
              bmi = round(weight / (height_m ** 2), 1)

          # Prepare enhanced data with demographics
          week_data = {
              'user_id': user_id.strip(),
              'week_start': week_start,
              'demographics': {
                  'sex': sex,
                  'age': int(age) if age else None,
                  'weight': float(weight) if weight else None,
                  'height': float(height) if height else None,
                  'bmi': bmi
              },
              'total_steps': max(0, int(total_steps)) if total_steps else 0,
              'zone_minutes': max(0, int(zone_minutes)) if zone_minutes else 0,
              'exercise_sessions': max(0, int(exercise_sessions)) if exercise_sessions else 0,
              'sleep_hours': sleep_hours,
              'mood_scores': mood_scores,
              'dairy_liters': max(0, float(dairy_liters)) if dairy_liters else 0,
              'legumes_grams': max(0, float(legumes_grams)) if legumes_grams else 0,
              'meat_grams': max(0, float(meat_grams)) if meat_grams else 0,
              'fruits_grams': max(0, float(fruits_grams)) if fruits_grams else 0,
              'vegetables_grams': max(0, float(vegetables_grams)) if vegetables_grams else 0,
              'grains_grams': max(0, float(grains_grams)) if grains_grams else 0,
              'nuts_seeds_grams': max(0, float(nuts_seeds_grams)) if nuts_seeds_grams else 0,
              'water_liters': max(0, float(water_liters)) if water_liters else 0,
              'preferences': {
                  'diet_type': diet_type,
                  'allergies': allergies if allergies else [],
                  'other_allergies': other_allergies.strip() if other_allergies else ""
              },
              'targets': user_targets
          }

          recommendation = self.wellbeing_system.analyze_and_recommend(user_id.strip(), week_data)

          # Enhanced output with custom targets and demographics
          output = self._format_analysis_output_with_targets(week_data, recommendation, user_targets)

          return output

      except Exception as e:
          print(f"❌ Analysis error: {str(e)}")
          import traceback
          traceback.print_exc()
          return f"❌ Error: {str(e)}\n\nPlease check your input format and try again."


    def _create_targets_tab(self):
        """Create the targets customization tab"""

        gr.Markdown("## 🎯 Customize Your Personal Health Targets")
        gr.Markdown("### Set personalized goals based on your fitness level, health conditions, and personal objectives")

        with gr.Row():
            with gr.Column():
                gr.Markdown("### 👤 User Selection")
                target_user_id = gr.Textbox(label="👤 User ID", value="user_001", info="Enter the user ID to customize targets for")

                gr.Markdown("### 🏃‍♂️ Activity Targets")
                target_weekly_steps = gr.Number(
                    label="Weekly Steps Target",
                    value=70000,
                    info="Default: 70,000 (10,000 daily). Adjust based on fitness level."
                )
                target_zone_minutes = gr.Number(
                    label="Weekly Zone Minutes Target",
                    value=150,
                    info="Default: 150 minutes. WHO recommends 150+ minutes moderate exercise."
                )
                target_exercise_sessions = gr.Number(
                    label="Weekly Exercise Sessions Target",
                    value=4,
                    info="Default: 4 sessions. Include both cardio and strength training."
                )

                gr.Markdown("### 😴 Sleep & Recovery Targets")
                target_daily_sleep = gr.Number(
                    label="Daily Sleep Hours Target",
                    value=8.0,
                    info="Default: 8 hours. Range: 7-9 hours for most adults."
                )

                with gr.Row():
                    save_targets_btn = gr.Button("💾 Save My Targets", variant="primary", size="lg")
                    load_current_targets_btn = gr.Button("📊 Load Current Targets", variant="secondary")
                    reset_defaults_btn = gr.Button("🔄 Reset to Defaults", variant="secondary")


        # Store target components
        self.target_components = {
            'user_id': target_user_id,
            'weekly_steps': target_weekly_steps,
            'zone_minutes': target_zone_minutes,
            'exercise_sessions': target_exercise_sessions,
            'daily_sleep': target_daily_sleep,
        }

        # Connect buttons
        save_targets_btn.click(
            fn=self.save_user_targets,
            inputs=list(self.target_components.values())[:-1],
            # outputs=[targets_output]
        )

        load_current_targets_btn.click(
            fn=self.load_user_targets_display,
            inputs=[target_user_id],
            outputs=list(self.target_components.values()) # Update all target number boxes and output
        )

        reset_defaults_btn.click(
            fn=self.reset_to_defaults,
            outputs=list(self.target_components.values())
        )


    def _create_history_tab(self):
        """Create the history review tab"""

        gr.Markdown("## 📈 Progress History & Analytics")
        gr.Markdown("### Track your health journey and see improvements over time")

        with gr.Row():
            with gr.Column():
                gr.Markdown("### 👤 User Selection")
                history_user_id = gr.Textbox(label="👤 User ID", value="user_001", info="Enter user ID to view history")

                gr.Markdown("### 📊 Analysis Options")
                analysis_type = gr.Radio(
                    label="📈 View Type",
                    choices=[
                        "📊 Complete Progress Summary",
                        "📈 Weekly Trends Analysis",
                        "🎯 Target Achievement Report",
                        "📋 Raw Data Export",
                        "🏆 Achievement Milestones"
                    ],
                    value="📊 Complete Progress Summary",
                    info="Choose what type of analysis you want to see"
                )

                weeks_to_show = gr.Slider(
                    label="📅 Weeks to Include",
                    minimum=1,
                    maximum=12,
                    value=4,
                    step=1,
                    info="How many recent weeks to analyze"
                )

                with gr.Row():
                    view_history_btn = gr.Button("📊 View Progress", variant="primary", size="lg")
                    export_data_btn = gr.Button("💾 Export Data", variant="secondary")
                    delete_history_btn = gr.Button("🗑️ Clear History", variant="stop")



        # Store history components
        self.history_components = {
            'user_id': history_user_id,
            'analysis_type': analysis_type,
            'weeks_to_show': weeks_to_show,
            # 'output': history_output
        }

    def _format_preferences_display(self, preferences: dict) -> str:
        """Format preferences for nice display"""

        lines = ["### 🍽️ Your Dietary Profile:"]

        # Diet type
        if preferences.get('diet_type', 'No Preference') != "No Preference":
            lines.append(f"- **Diet:** {preferences['diet_type']}")

        # Allergies (most important!)
        if preferences.get('allergies'):
            allergy_list = ", ".join(preferences['allergies'])
            lines.append(f"- **⚠️ ALLERGIES:** {allergy_list}")

        if preferences.get('other_allergies'):
            lines.append(f"- **⚠️ OTHER RESTRICTIONS:** {preferences['other_allergies']}")

        if len(lines) == 1:  # Only the header
            lines.append("- No specific dietary preferences set")

        return "\n".join(lines)



    def _format_analysis_output_with_targets(self, week_data, recommendation, user_targets):
      """Format analysis output including custom targets"""

      daily_steps = week_data['total_steps'] / 7
      daily_water = week_data['water_liters'] / 7

      # Safely get avg_sleep and avg_mood with defaults
      avg_sleep = week_data.get('avg_sleep', 0)
      avg_mood = week_data.get('avg_mood', 0)

      # Compare against custom targets
      targets_comparison = f"""
  ### 🎯 Target Performance:
  - **Steps:** {week_data['total_steps']:,} / {user_targets['weekly_steps']:,} {'✅' if week_data['total_steps'] >= user_targets['weekly_steps'] else '⚠️'} ({((week_data['total_steps'] / user_targets['weekly_steps']) * 100):.0f}%)
  - **Zone Minutes:** {week_data['zone_minutes']} / {user_targets['weekly_zone_minutes']} {'✅' if week_data['zone_minutes'] >= user_targets['weekly_zone_minutes'] else '⚠️'} ({((week_data['zone_minutes'] / user_targets['weekly_zone_minutes']) * 100):.0f}%)
  - **Sleep:** {avg_sleep:.1f}h / {user_targets['daily_sleep_hours']}h {'✅' if avg_sleep >= user_targets['daily_sleep_hours'] else '⚠️'}
  - **Water:** {week_data['water_liters']:.1f}L / {user_targets['weekly_water_liters']}L {'✅' if week_data['water_liters'] >= user_targets['weekly_water_liters'] else '⚠️'}
"""

      preferences_display = self._format_preferences_display(week_data['preferences'])

      return f"""# 🏥 Personalized Wellbeing Analysis & Recommendations

  ## 👤 User: {week_data['user_id']} | 📅 Week: {week_data['week_start']}

  {preferences_display}

  {targets_comparison}

  ### 📊 Your Weekly Data Summary:
  **🏃‍♂️ Activity:**
  - **Steps:** {week_data['total_steps']:,} ({daily_steps:.0f}/day)
  - **Zone Minutes:** {week_data['zone_minutes']}
  - **Exercise Sessions:** {week_data['exercise_sessions']}

  **😴 Sleep & Mood:**
  - **Sleep:** {week_data['sleep_hours']} hours daily
  - **Mood:** {week_data['mood_scores']}/10 daily scores

  **🍎 Nutrition:**
  - 🥛 **Dairy:** {week_data['dairy_liters']:.1f}L | 💧 **Water:** {week_data['water_liters']:.1f}L ({daily_water:.1f}L/day)
  - 🍎 **Fruits:** {week_data['fruits_grams']:.0f}g | 🥬 **Vegetables:** {week_data['vegetables_grams']:.0f}g
  - 🫘 **Legumes:** {week_data['legumes_grams']:.0f}g | 🍗 **Meat:** {week_data['meat_grams']:.0f}g
  - 🌾 **Grains:** {week_data['grains_grams']:.0f}g | 🥜 **Nuts/Seeds:** {week_data['nuts_seeds_grams']:.0f}g

  ---

  ## 🤖 AI Recommendations
  *Personalized based on your data, preferences, allergies, and custom targets*

  {recommendation}
  """

    def load_user_targets(self, user_id):
        """Load user targets and format for display in main tab"""
        try:
            if not user_id or not user_id.strip():
                return "❌ Please enter a valid User ID"

            targets = self.get_user_targets(user_id.strip())

            return f"""# 📊 Your Current Targets Loaded

## 🎯 Targets for {user_id}:

### 🏃‍♂️ Activity Targets:
- **Weekly Steps:** {targets['weekly_steps']:,}
- **Zone Minutes:** {targets['weekly_zone_minutes']}
- **Exercise Sessions:** {targets['weekly_exercise_sessions']}

### 😴 Recovery Target:
- **Daily Sleep:** {targets['daily_sleep_hours']} hours

### 🍎 Nutrition Targets:
- **Weekly Water:** {targets['weekly_water_liters']} liters
- **Weekly Fruits:** {targets['weekly_fruits_grams']:,} grams
- **Weekly Vegetables:** {targets['weekly_vegetables_grams']:,} grams

*These targets will be used to evaluate your progress when you get AI recommendations.*

**💡 Tip:** Go to the 'Customize Targets' tab to modify these values if needed.
"""

        except Exception as e:
            return f"❌ Error loading targets: {str(e)}"

    def save_user_targets(self, user_id, weekly_steps, zone_minutes, exercise_sessions,
                          daily_sleep, weekly_water):
        """Saves user's custom targets to a file"""
        if not user_id or not user_id.strip():
            return "❌ Error: Please enter a valid User ID to save targets."

        targets_file = f"{self.wellbeing_system.data_manager.data_dir}/targets_{user_id.strip()}.json"

        try:
            targets_data = {
                'weekly_steps': int(weekly_steps) if weekly_steps else self.default_targets['weekly_steps'],
                'weekly_zone_minutes': int(zone_minutes) if zone_minutes else self.default_targets['weekly_zone_minutes'],
                'weekly_exercise_sessions': int(exercise_sessions) if exercise_sessions else self.default_targets['weekly_exercise_sessions'],
                'daily_sleep_hours': float(daily_sleep) if daily_sleep else self.default_targets['daily_sleep_hours'],
                'weekly_water_liters': float(weekly_water) if weekly_water else self.default_targets['weekly_water_liters'],
              }

            with open(targets_file, 'w') as f:
                json.dump(targets_data, f, indent=2)

            return f"✅ Targets saved successfully for user '{user_id.strip()}'."

        except Exception as e:
            return f"❌ Error saving targets: {str(e)}"

    def get_user_targets(self, user_id: str) -> Dict:
        """Loads user's custom targets, or returns defaults"""
        targets_file = f"{self.wellbeing_system.data_manager.data_dir}/targets_{user_id.strip()}.json"

        if os.path.exists(targets_file):
            try:
                with open(targets_file, 'r') as f:
                    targets = json.load(f)
                    # Ensure all keys from default are present (for backward compatibility)
                    full_targets = self.default_targets.copy()
                    full_targets.update(targets)
                    return full_targets
            except Exception as e:
                print(f"⚠️ Warning: Could not load targets for user '{user_id.strip()}': {str(e)}. Using defaults.")
                return self.default_targets
        else:
            return self.default_targets

    def load_user_targets_display(self, user_id):
        """Loads user targets and formats for display in the targets tab"""
        if not user_id or not user_id.strip():
             return "❌ Error: Please enter a valid User ID to load targets.", *[gr.Number(value=self.default_targets[key]) for key in self.default_targets] # Return defaults to update UI

        targets = self.get_user_targets(user_id.strip())

        output_message = f"✅ Loaded targets for user '{user_id.strip()}'."

        # Return the message and the target values to update the Gradio Number components
        return (
            output_message,
            targets.get('weekly_steps', self.default_targets['weekly_steps']),
            targets.get('weekly_zone_minutes', self.default_targets['weekly_zone_minutes']),
            targets.get('weekly_exercise_sessions', self.default_targets['weekly_exercise_sessions']),
            targets.get('daily_sleep_hours', self.default_targets['daily_sleep_hours']),
            targets.get('weekly_water_liters', self.default_targets['weekly_water_liters']),
            )

    def reset_to_defaults(self):
        """Resets the target input fields to default values"""
        return (
            "🎯 Resetting targets to defaults.",
            self.default_targets['weekly_steps'],
            self.default_targets['weekly_zone_minutes'],
            self.default_targets['weekly_exercise_sessions'],
            self.default_targets['daily_sleep_hours'],
            self.default_targets['weekly_water_liters'],
        )









# Main execution

In [None]:
# Uncomment to run:
app = WellbeingApp(base_model_id="ContactDoctor/Bio-Medical-Llama-3-8B", adapter_path="AnjaliNV/WellBeing_LLM")
demo = app.create_wellbeing_app()
demo.launch(share=True, debug=True)

# Fine Tuning

In [None]:
import os
import json
from datasets import load_dataset, Dataset
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer, DataCollatorForLanguageModeling, BitsAndBytesConfig
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training, PeftModel
import torch # Import torch to check for CUDA

# Load your dataset from JSONL
data_path = "/content/diet_finetune_examples.jsonl"
data = []
with open(data_path, "r") as f:
    for line in f:
        item = json.loads(line)
        user_msg = next(msg["content"] for msg in item["messages"] if msg["role"] == "user")
        assistant_msg = next(msg["content"] for msg in item["messages"] if msg["role"] == "assistant")
        prompt = f"<|user|>\n{user_msg}\n<|assistant|>\n{assistant_msg}"
        data.append({"text": prompt})

# Convert to Hugging Face Dataset
dataset = Dataset.from_list(data)
dataset = dataset.train_test_split(test_size=0.1)

# Load tokenizer and base model
model_id = "ContactDoctor/Bio-Medical-Llama-3-8B"
tokenizer = AutoTokenizer.from_pretrained(model_id)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# --- Use BitsAndBytesConfig for 4-bit loading ---
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
)

model = AutoModelForCausalLM.from_pretrained(
    model_id,
    quantization_config=bnb_config, # Use quantization_config instead of load_in_8bit
    device_map="auto",
    torch_dtype=torch.bfloat16, # Use bfloat16 for compute dtype
    low_cpu_mem_usage=True,
)

model = prepare_model_for_kbit_training(model)

# Apply LoRA PEFT configuration
lora_config = LoraConfig(
    r=16,
    lora_alpha=32,
    target_modules=["q_proj", "v_proj"],
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM"
)
model = get_peft_model(model, lora_config)

# Tokenization function
def tokenize(batch):
    return tokenizer(batch["text"], padding="max_length", truncation=True, max_length=512)

tokenized_datasets = dataset.map(tokenize, batched=True)

# Training arguments
training_args = TrainingArguments(
    output_dir="./diet-finetuned-model",
    per_device_train_batch_size=1,  # Keep reduced batch size
    per_device_eval_batch_size=1,   # Keep reduced batch size
    num_train_epochs=3,
    logging_dir="./logs",
    save_total_limit=2,
    learning_rate=2e-4,
    fp16=False, # Set to False when using bfloat16 or 4-bit
    bf16=True, # Enable bfloat16 if supported by GPU
    save_strategy="epoch",
    report_to="none",
    gradient_checkpointing=True, # Keep gradient checkpointing enabled
)

data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

# Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["test"],
    tokenizer=tokenizer,
    data_collator=data_collator
)

# Start fine-tuning
trainer.train()

# Save final LoRA adapter
model.save_pretrained("AnjaliNV/WellBeing_LLM")
tokenizer.save_pretrained("AnjaliNV/WellBeing_LLM")

In [None]:
adapter_path = "AnjaliNV/WellBeing_LLM"
model.push_to_hub(adapter_path, use_auth_token=True)
tokenizer.push_to_hub(adapter_path, use_auth_token=True)

#test

In [None]:
import gradio as gr
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel
import torch
from langchain.vectorstores import FAISS
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.schema import Document

# === Load FAISS vector store ===
embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
vectorstore = FAISS.load_local(
    "/content/drive/MyDrive/rag_index",
    embeddings=embedding_model,
    allow_dangerous_deserialization=True
)

# === Load LLaMA 3 model with LoRA adapter ===
base_model_id = "ContactDoctor/Bio-Medical-Llama-3-8B"
adapter_path = "AnjaliNV/WellBeing_LLM"

tokenizer = AutoTokenizer.from_pretrained(base_model_id)
base_model = AutoModelForCausalLM.from_pretrained(
    base_model_id,
    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
    device_map="auto"
)
model = PeftModel.from_pretrained(base_model, adapter_path)
model.eval()

# === Define RAG-based response generation ===
def generate_with_rag(query, temperature=0.7, max_tokens=256, k=3):
    # Step 1: Retrieve relevant docs
    docs = vectorstore.similarity_search(query, k=k)
    context = "\n\n".join(doc.page_content for doc in docs)

    # Step 2: Construct the prompt
    prompt = (
        f"You are a helpful medical assistant. Use the following context to answer the question.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {query}\n\nAnswer:"
    )

    # Step 3: Tokenize and generate
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    input_len = inputs["input_ids"].shape[1]

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_tokens,
            temperature=temperature,
            top_p=0.95,
            do_sample=True
        )

    # Step 4: Decode only the generated answer
    generated_tokens = outputs[0][input_len:]
    answer = tokenizer.decode(generated_tokens, skip_special_tokens=True)

    return answer.strip()

# === Gradio UI ===
iface = gr.Interface(
    fn=generate_with_rag,
    inputs=[
        gr.Textbox(lines=4, label="Enter your medical question"),
        gr.Slider(0.1, 1.0, value=0.7, label="Temperature"),
        gr.Slider(64, 1024, step=64, value=256, label="Max Tokens"),
        gr.Slider(1, 5, step=1, value=3, label="Top K Documents")
    ],
    outputs=gr.Textbox(label="Model Response"),
    title="RAG-Powered BioMedical LLaMA 3 Assistant"
)

iface.launch(share=True, debug=True)
