## Enhanced RAG Framework for Health Condition Queries:
- **Loads and caches knowledge** about conditions, variations, contraindications, and recommendations
- **Extracts and validates** mentioned conditions using similarity matching
- **Retrieves relevant information** from the knowledge graph
- **Generates comprehensive answers** by combining retrieved knowledge with LLM capabilities

In [1]:
import sys
from IPython.utils.capture import capture_output

# Install/upgrade the notebook's dependencies inside capture_output() so the
# installer output is collected on `captured` (see the commented-out prints
# below) rather than shown in the cell output.
with capture_output() as captured:
    %pip install --upgrade langchain-neo4j neo4j python-dotenv
    %pip install --upgrade transformers accelerate torch
    %pip install --upgrade langchain langchain-community
    %pip install --upgrade langchain-openai
    
    # Uncomment the following lines to see the installation process if needed:
#print(captured.stdout)
#print(captured.stderr)

In [2]:
import os
import sys
import json
import re
import time
import difflib
import logging
from typing import Dict, List, Tuple, Optional, Any
from dotenv import load_dotenv
from langchain_neo4j import Neo4jGraph
from langchain_openai import ChatOpenAI  # Add this for OpenRouter integration
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

In [3]:
# Set up logging: INFO level and above, each record prefixed with timestamp,
# logger name and level, emitted through a StreamHandler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
# Shared logger used by the helper functions and classes defined below.
logger = logging.getLogger("health_rag")

# Load environment variables
# NOTE(review): load_dotenv() presumably reads a local .env file - confirm the
# file location expected for this notebook.
load_dotenv()
# Neo4j connection settings. The string defaults for URL and password are
# placeholders, not working values: set the environment variables (or edit
# the defaults) before connecting.
NEO4J_URL = os.getenv("NEO4J_URL", "Insert your Neo4j URL here")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PWD = os.getenv("NEO4J_PWD", "Insert your Neo4j password here")



In [4]:
def safe_parse_json(text):
    """Best-effort extraction of a condition/variation dict from model output.

    Strategies are tried in order; the first one that yields a dict wins:

    1. Parse ``text`` as a complete JSON document.
    2. Scan for the first ``{`` from which a complete JSON object can be
       decoded, so an object surrounded by prose (or followed by other text
       containing braces) is still recovered.
    3. Regex-scrape ``"condition"`` / ``"variation"`` out of malformed JSON.

    Args:
        text: Raw text to parse. ``None``, bytes and other non-string values
            are tolerated and converted to text first.

    Returns:
        dict: The decoded JSON object, or a dict with ``"condition"`` and
        ``"variation"`` keys built by the regex fallback. When nothing can be
        recovered a warning is logged and
        ``{"condition": None, "variation": None}`` is returned.
    """
    if isinstance(text, (bytes, bytearray)):
        text = text.decode("utf-8", errors="replace")
    elif not isinstance(text, str):
        text = "" if text is None else str(text)

    # 1. Direct parsing. A bare JSON list or scalar is not the dict shape
    #    this function otherwise returns, so only objects are accepted.
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    # 2. Embedded object. raw_decode() stops at the end of the first complete
    #    value, unlike a greedy "first { to last }" match, which breaks as
    #    soon as any later text contains a closing brace.
    decoder = json.JSONDecoder()
    brace_at = text.find("{")
    while brace_at != -1:
        try:
            candidate, _ = decoder.raw_decode(text, brace_at)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, dict):
            return candidate
        brace_at = text.find("{", brace_at + 1)

    # 3. Regex fallback for output that only resembles JSON.
    condition_match = re.search(r'"condition"\s*:\s*"([^"]+)"', text)
    if condition_match:
        variation_match = re.search(r'"variation"\s*:\s*"?([^",}]+)"?', text)
        # An unquoted value is captured together with any whitespace before
        # the closing brace; strip it so `null }` is recognised as null.
        variation = variation_match.group(1).strip() if variation_match else ""
        if not variation or variation.lower() == "null":
            variation = None
        return {"condition": condition_match.group(1), "variation": variation}

    # Nothing worked, log it and return empty result
    logger.warning(f"Could not extract JSON from: {text[:200]}...")
    return {"condition": None, "variation": None}

In [5]:
class HealthKnowledgeBase:
   """Knowledge base for health conditions, variations, guidelines, and exercises."""
   
   # Threshold constants
   CONDITION_MATCH_THRESHOLD = 0.7
   VARIATION_MATCH_THRESHOLD = 0.5
   VARIATION_FILTER_THRESHOLD = 0.3
   EXERCISE_MATCH_THRESHOLD = 0.7
   
   def __init__(self, graph):
       self.graph = graph
       self.conditions = {}
       self.variations = {}
       self.guidelines = {}
       self.exercises = {}
       self.last_update = 0
       self.update_interval = 3600
       self.query_cache = {}
       self.cache_timeout = 300
       
   def load_knowledge(self, force=False):
       """Load or refresh the knowledge base."""
       current_time = time.time()
       if not force and self.last_update > 0 and (current_time - self.last_update) < self.update_interval:
           return
           
       logger.info("Loading health conditions, variations, guidelines, and exercises...")
       self.conditions = self._get_conditions()
       self.variations = self._get_variations(self.conditions)
       self.guidelines = self._get_guidelines(self.conditions)
       self.exercises = self._get_exercises()
       self.last_update = current_time
       
       total_conditions = len(self.conditions)
       total_variations = sum(len(vars) for vars in self.variations.values())
       total_guidelines = sum(len(guidelines) for condition, guidelines in self.guidelines.items())
       total_exercises = len(self.exercises)
       logger.info(f"Loaded {total_conditions} conditions with {total_variations} total variations, {total_guidelines} fitness guidelines, and {total_exercises} exercises")
   
   def query_with_cache(self, query, cache_key=None):
       """Execute a query with caching to reduce database load."""
       if cache_key is None:
           cache_key = query
           
       current_time = time.time()
       
       if cache_key in self.query_cache:
           cached_time, cached_result = self.query_cache[cache_key]
           
           if current_time - cached_time < self.cache_timeout:
               return cached_result
       
       result = self.graph.query(query)
       self.query_cache[cache_key] = (current_time, result)
       
       return result
   
   def _get_conditions(self):
       """Retrieve all health conditions from the database."""
       query = """
       MATCH (condition:Resource)-[:rdfs__subClassOf]->(hc:Resource)
       WHERE hc.uri = 'http://example.org/exercise#HealthConditionQuality'
       RETURN condition.uri AS uri, condition.rdfs__label[0] AS label
       """
       
       results = self.query_with_cache(query, "all_conditions")
       conditions = {}
       
       for result in results:
           uri = result.get('uri', '')
           label = result.get('label', '')
           
           if '#' in uri:
               name = uri.split('#')[1].split('Quality')[0]
               conditions[name] = {
                   'uri': uri,
                   'label': label,
                   'name': name,
                   'normalized_name': self._normalize_text(name)
               }
       
       return conditions
   
   def _get_variations(self, conditions):
       """Retrieve variations for all conditions."""
       variations = {}
       
       for condition_name, condition_data in conditions.items():
           condition_uri = condition_data['uri']
           
           query = f"""
           MATCH (variation:Resource)-[:rdfs__subClassOf]->(condition:Resource)
           WHERE condition.uri = '{condition_uri}'
           RETURN variation.uri AS uri, variation.rdfs__label[0] AS label
           """
           
           results = self.query_with_cache(query, f"variations_{condition_name}")
           condition_variations = []
           
           for result in results:
               variation_uri = result.get('uri', '')
               variation_label = result.get('label', '')
               
               variation_name = variation_label
               if ' - ' in variation_label:
                   variation_name = variation_label.split(' - ', 1)[1].strip()
               
               condition_variations.append({
                   'name': variation_name,
                   'uri': variation_uri,
                   'label': variation_label,
                   'normalized_name': self._normalize_text(variation_name)
               })
           
           variations[condition_name] = condition_variations
       
       return variations
   
   def _get_guidelines(self, conditions):
       """Retrieve fitness guidelines for all conditions."""
       guidelines = {}
       
       for condition_name in conditions.keys():
           recommendations = self._get_condition_recommendations(condition_name)
           if recommendations:
               guidelines[condition_name] = recommendations
       
       return guidelines
   
   def _get_condition_recommendations(self, condition_name):
       """Get fitness recommendations for a specific condition."""
       query = f"""
       MATCH (recommendation:Resource)
       WHERE recommendation.uri CONTAINS 'Recommendation_{condition_name}'
       RETURN recommendation.uri AS uri, recommendation.rdfs__label[0] AS label
       """
       
       results = self.query_with_cache(query, f"recommendations_{condition_name}")
       recommendations = []
       
       for result in results:
           rec_uri = result.get('uri', '')
           rec_label = result.get('label', '')
           
           if not rec_uri or not rec_label:
               continue
           
           attributes = self._get_recommendation_attributes(rec_uri)
           
           if attributes:
               recommendations.append({
                   'label': rec_label,
                   'uri': rec_uri,
                   'attributes': attributes
               })
       
       return recommendations
   
   def _get_recommendation_attributes(self, recommendation_uri):
       """Get attributes for a specific recommendation."""
       query = f"""
       MATCH (recommendation:Resource)-[r]->(info:Resource)
       WHERE recommendation.uri = '{recommendation_uri}'
       RETURN info.uri AS info_uri, info.rdfs__label[0] AS info_label
       """
       
       results = self.query_with_cache(query, f"recommendation_attrs_{recommendation_uri}")
       
       attr_results = [{'attr_label': result.get('info_label', '')} for result in results
                       if result.get('info_label', '') and "quantity" not in result.get('info_label', '').lower()]
       
       attribute_types = {
           'Frequency': None,
           'Intensity': None,
           'Duration': None,
           'Type': None,
           'RestPeriod': None,
           'HeartRateZone': None,
           'Progression': None,
           'Supervision': None,
           'Equipment': None
       }
       
       return self.process_attributes(attr_results, attribute_types)
   
   def process_attributes(self, attr_results, attribute_types=None):
       """Process attribute results in a standardized way."""
       if attribute_types is None:
           attribute_types = {
               'Intensity': None,
               'ImpactLevel': None,
               'BalanceRequirement': None,
               'JointInvolvement': [],
               'Equipment': []
           }
       
       attributes = attribute_types.copy()
       
       for attr in attr_results:
           attr_label = attr.get('attr_label', '')
           if not attr_label:
               continue
           
           attr_label_lower = attr_label.lower()
           
           if "intensity" in attr_label_lower:
               attributes['Intensity'] = attr_label.split(' intensity')[0]
           elif "frequency" in attr_label_lower:
               attributes['Frequency'] = attr_label.replace("Frequency:", "").strip()
           elif "duration" in attr_label_lower:
               attributes['Duration'] = attr_label.replace("Duration:", "").strip()
           elif "exercise type" in attr_label_lower:
               attributes['Type'] = attr_label.replace("Exercise type:", "").strip()
           elif "rest period" in attr_label_lower:
               attributes['RestPeriod'] = attr_label.replace("Rest period:", "").strip()
           elif "heartrateZone" in attr_label_lower:
               attributes['HeartRateZone'] = attr_label.replace("HeartRateZone:", "").strip()
           elif "progression" in attr_label_lower:
               attributes['Progression'] = attr_label.replace("Progression:", "").strip()
           elif "supervision" in attr_label_lower:
               attributes['Supervision'] = attr_label.replace("Supervision:", "").strip()
           elif "equipment" in attr_label_lower:
               equipment = attr_label.replace("Equipment:", "").strip()
               if not isinstance(attributes['Equipment'], list):
                   attributes['Equipment'] = []
               if equipment not in attributes['Equipment']:
                   attributes['Equipment'].append(equipment)
           elif "impactlevel" in attr_label_lower:
               attributes['ImpactLevel'] = attr_label.split(' impactlevel')[0]
           elif "balancerequirement" in attr_label_lower:
               attributes['BalanceRequirement'] = attr_label.split(' balancerequirement')[0]
           elif "involvement" in attr_label_lower:
               joint = attr_label.split(' involvement')[0]
               if not isinstance(attributes['JointInvolvement'], list):
                   attributes['JointInvolvement'] = []
               if joint not in attributes['JointInvolvement']:
                   attributes['JointInvolvement'].append(joint)
       
       return attributes
   
   def _get_exercises(self):
       """Retrieve all exercises and their attributes from the database."""
       query = """
       MATCH (exercise:Resource)
       WHERE exercise.uri ENDS WITH 'Exercise' 
       RETURN exercise.uri AS uri, exercise.rdfs__label[0] AS label
       """
       
       results = self.query_with_cache(query, "all_exercises")
       exercises = {}
       
       for result in results:
           uri = result.get('uri', '')
           label = result.get('label', '')
           
           if not uri or not label:
               continue
           
           if '#' in uri:
               name = uri.split('#')[1].replace('Exercise', '')
               
               exercise_type = self._get_exercise_type_by_relationship(uri)
               
               attributes = self._get_exercise_attributes(uri)
               
               exercises[name] = {
                   'uri': uri,
                   'label': label,
                   'type': exercise_type,
                   'name': name,
                   'normalized_name': self._normalize_text(name),
                   'attributes': attributes
               }
       
       exercises = {name: ex for name, ex in exercises.items() 
           if ex['attributes']['Intensity'] is not None 
           or ex['attributes']['ImpactLevel'] is not None
           or ex['attributes']['BalanceRequirement'] is not None
           or len(ex['attributes']['JointInvolvement']) > 0}
       
       return exercises
   
   def _get_exercise_type_by_relationship(self, exercise_uri):
       """Determine the exercise type using relationships instead of properties."""
       query = f"""
       MATCH (exercise:Resource)-[:rdfs__subClassOf]->(superclass:Resource)
       WHERE exercise.uri = '{exercise_uri}'
       RETURN superclass.uri AS uri, superclass.rdfs__label[0] AS label
       """
       
       results = self.query_with_cache(query, f"exercise_type_{exercise_uri}")
       if results and len(results) > 0:
           superclass_uri = results[0].get('uri', '')
           superclass_label = results[0].get('label', '')
           
           if superclass_uri and superclass_label:
               if "Exercise" in superclass_uri or "exercise" in superclass_uri.lower():
                   return superclass_label
       
       type_map = {
           "Strength": "Strength Exercise",
           "Aerobic": "Aerobic Exercise",
           "Anaerobic": "Anaerobic Exercise",
           "Stretching": "Stretching Exercise"
       }
       
       for key, value in type_map.items():
           if key in exercise_uri:
               return value
       
       for exercise_type in ["Strength Exercise", "Aerobic Exercise", "Anaerobic Exercise", "Stretching Exercise"]:
           query = f"""
           MATCH (exercise:Resource)-[r]->(:Resource)-[r2]->(type:Resource)
           WHERE exercise.uri = '{exercise_uri}' 
           AND type.rdfs__label[0] = '{exercise_type}'
           RETURN COUNT(*) > 0 AS has_relation
           """
           
           results = self.query_with_cache(query, f"exercise_type_relation_{exercise_uri}_{exercise_type}")
           if results and len(results) > 0 and results[0].get('has_relation', False):
               return exercise_type
       
       if exercise_uri.endswith('Exercise'):
           attributes = self._get_exercise_attributes(exercise_uri)
           
           if attributes['JointInvolvement'] and len(attributes['JointInvolvement']) >= 2:
               return "Strength Exercise"
           if attributes['Intensity'] == 'low':
               return "Stretching Exercise"
           
       return "Exercise"
   
   def _get_exercise_attributes(self, exercise_uri):
       """Get attributes for a specific exercise using the attribute sets."""
       exercise_name = exercise_uri.split('#')[1].replace('Exercise', '')
       
       query = f"""
       MATCH (attrSet:Resource)
       WHERE attrSet.uri CONTAINS 'AttributeSet_{exercise_name}'
       RETURN attrSet.uri AS uri, attrSet.rdfs__label[0] AS label
       """
       
       results = self.query_with_cache(query, f"attribute_set_{exercise_name}")
       if not results or len(results) == 0:
           return {
               'Intensity': None,
               'ImpactLevel': None,
               'BalanceRequirement': None,
               'JointInvolvement': [],
               'Equipment': []
           }
       
       attr_set_uri = results[0].get('uri')
       
       attr_query = f"""
       MATCH (attrSet:Resource)-[:ns2__refersTo]->(attr:Resource)
       WHERE attrSet.uri = '{attr_set_uri}'
       RETURN attr.uri AS attr_uri, attr.rdfs__label[0] AS attr_label
       """
       
       attr_results = self.query_with_cache(attr_query, f"attributes_{attr_set_uri}")
       
       formatted_attr_results = [{'attr_label': attr.get('attr_label', '')} for attr in attr_results if attr.get('attr_label', '')]
       
       attribute_types = {
           'Intensity': None,
           'ImpactLevel': None,
           'BalanceRequirement': None,
           'JointInvolvement': [],
           'Equipment': []
       }
       
       return self.process_attributes(formatted_attr_results, attribute_types)
   
   def _normalize_text(self, text):
       """Normalize text for better matching."""
       if not text:
           return ""
       normalized = text.lower()
       normalized = re.sub(r'[^\w\s]', ' ', normalized)
       normalized = re.sub(r'\s+', ' ', normalized).strip()
       return normalized
   
   def match_condition(self, condition_text):
       """Match a condition text against known conditions."""
       if not condition_text:
           return None, 0.0
           
       normalized_text = self._normalize_text(condition_text)
       
       for name, data in self.conditions.items():
           if normalized_text == data['normalized_name']:
               return name, 1.0
               
       best_match = None
       best_score = 0.0
       
       for name, data in self.conditions.items():
           score = self._calculate_similarity(normalized_text, data['normalized_name'])
           if score > best_score:
               best_score = score
               best_match = name
       
       if best_score >= self.CONDITION_MATCH_THRESHOLD:
           return best_match, best_score
       else:
           return None, best_score
   
   def match_variation(self, condition_name, variation_text):
       """Match a variation text against variations for a condition."""
       if not variation_text or condition_name not in self.variations:
           return None, 0.0
           
       normalized_text = self._normalize_text(variation_text)
       condition_variations = self.variations[condition_name]
       
       for variation in condition_variations:
           if normalized_text == variation['normalized_name']:
               return variation, 1.0
       
       best_match = None
       best_score = 0.0
       
       for variation in condition_variations:
           score = self._calculate_similarity(normalized_text, variation['normalized_name'])
           if score > best_score:
               best_score = score
               best_match = variation
       
       if best_score >= self.VARIATION_MATCH_THRESHOLD:
           return best_match, best_score
       else:
           return None, best_score
   
   def match_variation_to_condition(self, variation_text):
       """Match a variation text to potential conditions."""
       if not variation_text:
           return []
           
       normalized_text = self._normalize_text(variation_text)
       potential_matches = []
       
       for condition_name, variations in self.variations.items():
           for variation in variations:
               score = self._calculate_similarity(normalized_text, variation['normalized_name'])
               if score >= self.VARIATION_MATCH_THRESHOLD:
                   potential_matches.append({
                       'condition': condition_name,
                       'variation': variation,
                       'score': score
                   })
       
       potential_matches.sort(key=lambda x: x['score'], reverse=True)
       return potential_matches
   
   def get_variations_for_condition(self, condition_name, normalized_text=None):
       """Get variations for a condition with optional filtering by normalized text."""
       if condition_name not in self.variations:
           return []
           
       variations = self.variations[condition_name]
       
       if not normalized_text:
           return variations
       
       filtered_variations = []
       for variation in variations:
           score = self._calculate_similarity(normalized_text, variation['normalized_name'])
           if score >= self.VARIATION_FILTER_THRESHOLD:
               filtered_variations.append((variation, score))
       
       filtered_variations.sort(key=lambda x: x[1], reverse=True)
       
       return [v for v, _ in filtered_variations]
   
   def match_exercise(self, exercise_text):
       """Match an exercise text against known exercises."""
       if not exercise_text:
           return None, 0.0
           
       normalized_text = self._normalize_text(exercise_text)
       
       for name, data in self.exercises.items():
           if normalized_text == data['normalized_name']:
               return name, 1.0
               
       best_match = None
       best_score = 0.0
       
       for name, data in self.exercises.items():
           score = self._calculate_similarity(normalized_text, data['normalized_name'])
           if score > best_score:
               best_score = score
               best_match = name
       
       if best_score >= self.EXERCISE_MATCH_THRESHOLD:
           return best_match, best_score
       else:
           return None, best_score
   
   def _calculate_similarity(self, text1, text2):
       """Calculate similarity between two texts."""
       if not text1 or not text2:
           return 0.0
           
       seq_score = difflib.SequenceMatcher(None, text1, text2).ratio()
       
       words1 = set(text1.split())
       words2 = set(text2.split())
       
       if not words1 or not words2:
           word_score = 0.0
       else:
           intersection = words1.intersection(words2)
           word_score = len(intersection) / max(len(words1), len(words2))
       
       return 0.6 * seq_score + 0.4 * word_score
   
   def get_condition_info(self, condition_name):
       """Get details about a condition."""
       return self.conditions.get(condition_name)
   
   def get_condition_guidelines(self, condition_name):
       """Get fitness guidelines for a condition."""
       return self.guidelines.get(condition_name, [])
   
   def get_variation_count(self, condition_name):
       """Get number of variations for a condition."""
       if condition_name in self.variations:
           return len(self.variations[condition_name])
       return 0
   
   def get_all_condition_names(self):
       """Get a list of all condition names."""
       return list(self.conditions.keys())
   
   def get_all_exercise_names(self):
       """Get a list of all exercise names."""
       return list(self.exercises.keys())
   
   def get_exercise_types(self):
       """Get a list of all exercise types in the system."""
       types = set()
       for name, exercise in self.exercises.items():
           if exercise.get('type') and exercise.get('type') != "Unknown":
               types.add(exercise.get('type'))
       return sorted(list(types))
   
   def get_exercises_by_type(self, exercise_type=None):
       """Get exercises filtered by type."""
       if not exercise_type:
           return self.exercises
       
       filtered_exercises = {}
       for name, exercise in self.exercises.items():
           if exercise.get('type') and exercise_type.lower() in exercise.get('type').lower():
               filtered_exercises[name] = exercise
       
       return filtered_exercises
   
   def get_exercise_details(self, exercise_name):
       """Get detailed information about a specific exercise."""
       return self.exercises.get(exercise_name)
   
   def get_contraindications(self, condition, variation=None, limit=50):
       """Single source of truth for querying contraindications."""
       cache_key = f"contraindications_{condition}_{variation['name'] if variation and isinstance(variation, dict) else 'none'}"
       
       if variation and isinstance(variation, dict):
           query = f"""
           MATCH (avoidSet:Resource)-[:ns2__refersTo]->(attribute:Resource)
           WHERE avoidSet.uri CONTAINS 'AvoidSet_{condition}' 
           AND avoidSet.rdfs__label[0] CONTAINS '{variation['name']}'
           RETURN 
               avoidSet.rdfs__label[0] AS variation_label,
               attribute.rdfs__label[0] AS attribute_to_avoid
           """
       else:
           query = f"""
           MATCH (avoidSet:Resource)-[:ns2__refersTo]->(attribute:Resource)
           WHERE avoidSet.uri CONTAINS 'AvoidSet_{condition}'
           RETURN 
               avoidSet.rdfs__label[0] AS variation_label,
               attribute.rdfs__label[0] AS attribute_to_avoid
           LIMIT {limit}
           """
       
       try:
           records = self.query_with_cache(query, cache_key)
           
           if (not records or len(records) == 0) and variation and isinstance(variation, dict):
               fuzzy_query = f"""
               MATCH (avoidSet:Resource)-[:ns2__refersTo]->(attribute:Resource)
               WHERE avoidSet.uri CONTAINS 'AvoidSet_{condition}'
               AND avoidSet.rdfs__label[0] CONTAINS '{variation['name'].split()[0]}'
               RETURN 
                   avoidSet.rdfs__label[0] AS variation_label,
                   attribute.rdfs__label[0] AS attribute_to_avoid
               """
               
               fuzzy_cache_key = f"contraindications_fuzzy_{condition}_{variation['name'].split()[0]}"
               records = self.query_with_cache(fuzzy_query, fuzzy_cache_key)
               
               if records and len(records) > 0:
                   query = fuzzy_query
           
           if not records or len(records) == 0:
               condition_query = f"""
               MATCH (avoidSet:Resource)-[:ns2__refersTo]->(attribute:Resource)
               WHERE avoidSet.uri CONTAINS 'AvoidSet_{condition}'
               RETURN 
                   avoidSet.rdfs__label[0] AS variation_label,
                   attribute.rdfs__label[0] AS attribute_to_avoid
               LIMIT {limit}
               """
               
               condition_cache_key = f"contraindications_condition_{condition}"
               records = self.query_with_cache(condition_query, condition_cache_key)
               
               if records and len(records) > 0:
                   query = condition_query
           
           organized = {}
           for record in records:
               variation_label = record.get('variation_label', 'Unknown')
               attribute = record.get('attribute_to_avoid', 'Unknown')
               
               if variation_label not in organized:
                   organized[variation_label] = []
                   
               if attribute and attribute not in organized[variation_label]:
                   organized[variation_label].append(attribute)
           
           return organized, query, records
           
       except Exception as e:
           logger.error(f"Error querying contraindications: {str(e)}")
           return {}, f"Query failed due to error: {str(e)}", []
   
   def filter_exercises_by_contraindications(self, contraindications, available_equipment=None):
       """Filter exercises based on contraindications."""
       suitable_exercises = {}
       
       all_avoid_attrs = {
           'Intensity': [],
           'ImpactLevel': [],
           'BalanceRequirement': [],
           'JointInvolvement': []
       }
       
       for variation_label, attributes in contraindications.items():
           for attr in attributes:
               attr_lower = attr.lower()
               if "intensity" in attr_lower:
                   all_avoid_attrs['Intensity'].append(attr_lower)
               elif "impactlevel" in attr_lower:
                   all_avoid_attrs['ImpactLevel'].append(attr_lower)
               elif "balancerequirement" in attr_lower:
                   all_avoid_attrs['BalanceRequirement'].append(attr_lower)
               elif "involvement" in attr_lower:
                   joint = attr.split(' involvement')[0]
                   all_avoid_attrs['JointInvolvement'].append(joint)
       
       for name, exercise in self.exercises.items():
           is_suitable = True
           ex_attrs = exercise['attributes']
           
           if ex_attrs['Intensity'] and any(ex_attrs['Intensity'].lower() in avoid for avoid in all_avoid_attrs['Intensity']):
               is_suitable = False
               
           if ex_attrs['ImpactLevel'] and any(ex_attrs['ImpactLevel'].lower() in avoid for avoid in all_avoid_attrs['ImpactLevel']):
               is_suitable = False
               
           if ex_attrs['BalanceRequirement'] and any(ex_attrs['BalanceRequirement'].lower() in avoid for avoid in all_avoid_attrs['BalanceRequirement']):
               is_suitable = False
               
           for joint in ex_attrs['JointInvolvement']:
               if joint in all_avoid_attrs['JointInvolvement']:
                   is_suitable = False
                   break
           
           if available_equipment and ex_attrs['Equipment'] and len(ex_attrs['Equipment']) > 0:
               has_required_equipment = False
               for eq in ex_attrs['Equipment']:
                   if eq.lower() in [e.lower() for e in available_equipment] or eq.lower() == "none":
                       has_required_equipment = True
                       break
               if not has_required_equipment:
                   is_suitable = False
           
           if is_suitable:
               suitable_exercises[name] = exercise
       
       return suitable_exercises
   
   def get_exercise_plan_data(self, condition_name, variation=None, exercise_type=None):
       """Get organized data for exercise planning for a condition and variation."""
       contraindications, _, _ = self.get_contraindications(condition_name, variation)
       
       suitable_exercises = self.filter_exercises_by_contraindications(contraindications)
       
       if exercise_type:
           suitable_exercises = {name: ex for name, ex in suitable_exercises.items() 
                               if ex.get('type') and exercise_type.lower() in ex.get('type').lower()}
       
       guidelines = self.get_condition_guidelines(condition_name)
       
       exercise_by_type = {}
       for name, ex in suitable_exercises.items():
           ex_type = ex.get('type', 'Other')
           if ex_type not in exercise_by_type:
               exercise_by_type[ex_type] = []
           exercise_by_type[ex_type].append(ex)
       
       return {
           "condition": condition_name,
           "variation": variation.get('name') if variation and isinstance(variation, dict) else variation,
           "contraindications": contraindications,
           "exercises_by_type": exercise_by_type,
           "guidelines": guidelines,
           "exercise_count": len(suitable_exercises)
       }

In [6]:
class SessionContext:
    """Conversation state shared across the components of the query tool.

    Attributes:
        condition: Condition the pending variation choice belongs to.
        variations: Variations stored by an unfiltered lookup.
        filtered_variations: Variations stored by a filtered lookup; when
            non-empty they take precedence over ``variations``.
        previous_response: Last response stored by the caller, if any.
        original_question: Question to answer once a selection arrives.
        debug_mode: Debug flag; survives ``clear()``.
    """

    # Sentence that identifies a response as a variation-selection prompt.
    VARIATION_PROMPT_MARKER = "Which ones would you like information about?"

    def __init__(self):
        self.condition = None
        self.variations = []
        self.filtered_variations = []
        self.previous_response = None
        self.original_question = None
        self.debug_mode = False

    def store_variation_context(self, condition, variations, filtered=False):
        """Remember the condition and the variations offered to the user.

        With ``filtered=True`` only the filtered list is replaced; otherwise
        the regular list is replaced and the filtered list is emptied.
        """
        self.condition = condition
        if filtered:
            self.filtered_variations = variations
        else:
            self.variations = variations
            self.filtered_variations = []

    def clear(self, keep_question=False):
        """Reset the conversation state.

        Args:
            keep_question: When true, ``original_question`` is preserved.

        ``debug_mode`` is never touched by a reset.
        """
        preserved_question = self.original_question if keep_question else None

        self.condition = None
        self.variations = []
        self.filtered_variations = []
        self.previous_response = None
        self.original_question = preserved_question

    def has_variation_selection_context(self):
        """Return True when a condition and at least one variation are stored."""
        return self.condition is not None and bool(self.variations or self.filtered_variations)

    def get_active_variations(self):
        """Return the filtered variations if there are any, else the regular ones."""
        return self.filtered_variations if self.filtered_variations else self.variations

    def is_variation_selection_prompt(self, response):
        """Return True when ``response`` is a variation-selection prompt.

        ``previous_response`` is None until a prompt has been issued, so a
        non-string argument is reported as "not a prompt" instead of raising.
        """
        return isinstance(response, str) and self.VARIATION_PROMPT_MARKER in response

    def set_debug_mode(self, enabled=True):
        """Enable or disable debug mode."""
        self.debug_mode = enabled

In [7]:
# Constants for LLM prompts.
#
# PARSE_PROMPT_TEMPLATE asks the model to extract the condition and optional
# variation from a question and to reply with a single JSON object.
# Placeholders: {question}, {condition_list}, {variation_info}.
# The doubled braces around the JSON example are presumably the escape that
# makes the template engine emit literal braces -- confirm if the template
# format is ever changed.
PARSE_PROMPT_TEMPLATE = """
Extract the health condition and specific variation (if any) from this question: "{question}"

Available health conditions in the database: {condition_list}

Known variations for conditions, the first part is the condition name, and the rest after ":" is the variation names
{variation_info}
IMPORTANT: If the user mentions "Osteopenia", this is a variation of Osteoporosis, not a standalone condition. Take that in consideration.
Respond in this JSON format ONLY, no other text:
{{"condition": "ConditionName", "variation": "Specific variation description or null if none"}}

Notes:
- Match the condition as closely as possible to one in the provided list
- Use the variations list to help identify which condition a specific variation belongs to
- For variation, include only the specific subtype, stage, or characteristic
- If no variation is mentioned, use null for variation
- Return ONLY the JSON, nothing else
"""



# Answer prompt that does NOT include the fitness guidelines: the model only
# receives the contraindications and the list of safe exercises.
# Placeholders: {question}, {condition}, {variation_info},
# {contraindications_info}, {safe_exercises_info}.
ANSWER_PROMPT_TEMPLATE_NoGuidelines = """
You are an experienced health & fitness advisor specializing in exercise adaptation for various health conditions. Use the information provided below to craft a helpful, personalized response.

IMPORTANT PRIORITIES:
1. Create a personalized fitness plan based on your knowledge of the condition and what should be avoided.

2. When responding to questions:
   - If they ask about exercises or workout plans, create appropriate recommendations
   - If they ask about what to avoid, focus on contraindications while explaining WHY these restrictions matter
   - When creating workout plans, ensure they are safe and appropriate

3. Be comprehensive rather than minimal:
   - Explain the reasoning behind recommendations when possible
   - Connect contraindications to specific exercises when appropriate
   - Feel free to elaborate on important points with examples

4. Create engaging, practical responses:
   - Use a friendly, conversational tone
   - Structure your response for readability (bullet points, headers, etc.)
   - Feel free to create detailed plans when they would be helpful
   - Provide specific intensity recommendations, repetitions, sets, or durations when relevant

5. Exercise-specific guidance:
   - When suggesting exercises, explain WHY they're appropriate for the condition
   - Feel free to organize exercises by type, difficulty, or body part when helpful
   - It's fine to be creative with workout structures as long as they avoid contraindications

6. IMPORTANT - ALWAYS cite your Knowledge Graph sources:
   - For each recommendation, explicitly mention which part of the provided data you're using
   - Use phrases like "Based on the contraindication data..." when relevant
   - When recommending specific exercises, cite them as "from our exercise database" or similar
   - Explain your reasoning process - how you selected particular exercises and why they're relevant
   - Be transparent about which parts of your response come directly from the knowledge graph versus general expertise

7. Include a brief reminder about professional guidance when appropriate.

QUESTION:
{question}

CONDITION: {condition}
VARIATION: {variation_info}

DATA EXCERPTS:
- Contraindications (what to avoid):
{contraindications_info}

- Safe Exercises for this condition:
{safe_exercises_info}
"""


# Answer prompt that DOES include the fitness guidelines and tells the model
# to prioritise them.
# Placeholders: {question}, {condition}, {variation_info}, {guidelines_info},
# {contraindications_info}, {safe_exercises_info}.
ANSWER_PROMPT_TEMPLATE = """
You are an experienced health & fitness advisor specializing in exercise adaptation for various health conditions. Use the information provided below to craft a helpful, personalized response.

IMPORTANT PRIORITIES:
1. Pay SPECIAL ATTENTION to the Fitness Guidelines - these contain crucial recommendations specific to the condition and should heavily influence your response.

2. When responding to questions:
   - If they ask about exercises or workout plans, thoroughly incorporate the Fitness Guidelines in your recommendations
   - If they ask about what to avoid, focus on contraindications while explaining WHY these restrictions matter
   - When creating workout plans, ensure they follow ALL the principles in the Fitness Guidelines

3. Be comprehensive rather than minimal:
   - Include ALL relevant guidelines from the provided data
   - Explain the reasoning behind recommendations when possible
   - Connect guidelines to specific exercises when appropriate
   - Feel free to elaborate on important points with examples

4. Create engaging, practical responses:
   - Use a friendly, conversational tone
   - Structure your response for readability (bullet points, headers, etc.)
   - Feel free to create detailed plans when they would be helpful
   - Provide specific intensity recommendations, repetitions, sets, or durations when relevant

5. Exercise-specific guidance:
   - When suggesting exercises, explain WHY they're appropriate for the condition
   - Feel free to organize exercises by type, difficulty, or body part when helpful
   - It's fine to be creative with workout structures as long as they follow the guidelines

6. IMPORTANT - ALWAYS cite your Knowledge Graph sources:
   - For each recommendation, explicitly mention which part of the provided data you're using
   - Use phrases like "According to the Fitness Guidelines in our knowledge graph..." or "Based on the contraindication data..."
   - When recommending specific exercises, cite them as "from our exercise database" or similar
   - Explain your reasoning process - how you selected particular guidelines or exercises and why they're relevant
   - Be transparent about which parts of your response come directly from the knowledge graph versus general expertise

7. Include a brief reminder about professional guidance when appropriate.

QUESTION:
{question}

CONDITION: {condition}
VARIATION: {variation_info}

DATA EXCERPTS:
- Fitness Guidelines (ESSENTIAL information):
{guidelines_info}

- Contraindications (what to avoid):
{contraindications_info}

- Safe Exercises for this condition:
{safe_exercises_info}
"""


class QueryProcessor:
    """Processes user queries using an LLM and the knowledge base.

    A question goes through these stages: the condition (and optional
    variation) is extracted, both are matched against the knowledge base,
    the user is asked to choose when several variations qualify, and the
    LLM is finally prompted with the retrieved contraindications,
    guidelines and safe exercises.
    """

    MAX_VARIATIONS_TO_DISPLAY = 10
    NO_CONDITION_ERROR = "I couldn't identify a specific health condition in your question. Please try asking about a specific condition like Parkinson's Disease, Stroke, Amputees, etc."

    def __init__(self, graph, knowledge_base, llm, context=None):
        """Store the collaborators and build both LLM chains.

        Args:
            graph: Graph handle; stored on the instance.
            knowledge_base: Object providing condition/variation lookups.
            llm: Language model handed to the LLM chains.
            context: Optional shared session context; a fresh
                SessionContext is created when omitted.
        """
        self.graph = graph
        self.kb = knowledge_base
        self.llm = llm
        self.context = context if context is not None else SessionContext()
        self.debug_mode = False

        self.parse_chain = self._create_parse_chain()
        self.answer_chain = self._create_answer_chain()

    def _create_parse_chain(self):
        """Create the LLM chain for parsing questions."""
        prompt_template = PromptTemplate(
            input_variables=["question", "condition_list", "variation_info"],
            template=PARSE_PROMPT_TEMPLATE
        )
        return LLMChain(llm=self.llm, prompt=prompt_template)

    def _create_answer_chain(self):
        """Create the LLM chain for generating context-aware answers."""
        prompt_template = PromptTemplate(
            input_variables=[
                "contraindications_info",
                "guidelines_info",
                "question",
                "condition",
                "variation_info",
                "safe_exercises_info"
            ],
            # Swap in ANSWER_PROMPT_TEMPLATE to also feed the fitness
            # guidelines to the model.
            template=ANSWER_PROMPT_TEMPLATE_NoGuidelines
        )
        return LLMChain(llm=self.llm, prompt=prompt_template)

    def set_debug_mode(self, enabled=True):
        """Enable or disable debug mode to show extracted information."""
        self.debug_mode = enabled
        if self.context:
            self.context.set_debug_mode(enabled)
        logger.info("Debug mode %s", 'enabled' if enabled else 'disabled')

    @staticmethod
    def _is_selection_reply(text):
        """Return True when ``text`` holds only digits, commas and whitespace."""
        return re.match(r'^[\d,\s]+$', text.strip()) is not None

    @staticmethod
    def _describe_variation(variation):
        """Render a variation (None, dict, multi-selection dict or text) for the prompt."""
        if variation is None:
            return "None specified"
        if isinstance(variation, dict):
            if "multiple" in variation:
                names = [v['name'] for v in variation["multiple"]]
                return f"Selected variations: {', '.join(names)}"
            return variation['name']
        return variation

    def _filter_variations(self, condition_name, text):
        """Return the condition's variations matching the normalized ``text``."""
        return self.kb.get_variations_for_condition(condition_name, self.kb._normalize_text(text))

    def _request_variation_choice(self, question, condition_name, variations, subtype=None):
        """Remember the question and return a variation-selection prompt."""
        self.context.original_question = question
        return self._prompt_for_variation_selection(condition_name, variations, subtype)

    def _retrieve_and_answer(self, question, condition, variation, verbose=False):
        """Fetch contraindications, guidelines and safe exercises, then answer."""
        contraindications, _, _ = self.kb.get_contraindications(condition, variation)
        guidelines_info = self.get_guidelines(condition)
        safe_exercises = self.kb.filter_exercises_by_contraindications(contraindications)

        if verbose:
            if contraindications:
                logger.info("Found %d contraindication sets", len(contraindications))
            if guidelines_info:
                logger.info("Found %d guideline recommendations", len(guidelines_info))
            if safe_exercises:
                logger.info("Found %d safe exercises for %s", len(safe_exercises), condition)

        return self.generate_answer(
            question,
            condition,
            variation,
            contraindications,
            guidelines_info,
            safe_exercises
        )

    def _prompt_for_variation_selection(self, condition_name, variations, subtype=None):
        """Generate a prompt asking the user to pick variations.

        The variations are sorted by name into a NEW list: the argument may
        be the knowledge base's own cached list, which must not be reordered.
        The sorted list is what gets stored in the context, so the numbers
        shown to the user line up with later selections.
        """
        ordered = sorted(variations, key=lambda v: v['name'])
        self.context.store_variation_context(condition_name, ordered, filtered=bool(subtype))

        variation_prompt = "\n".join(
            f"{i+1}. {v['name']}"
            for i, v in enumerate(ordered[:self.MAX_VARIATIONS_TO_DISPLAY])
        )

        if subtype:
            intro = f"I found {len(ordered)} variations of {condition_name} related to '{subtype}'. "
        else:
            intro = f"I found {len(ordered)} variations for {condition_name}. "

        response = intro + \
            f"Which ones would you like information about? You can select multiple by entering their numbers separated by commas (e.g., '1,3,5').\n\n{variation_prompt}"

        self.context.previous_response = response
        return response

    def parse_question(self, question):
        """Extract condition and variation from a question via the LLM.

        Returns:
            A ``(condition, variation)`` tuple. Both are None when the input
            is a numeric selection reply, when the model's reply cannot be
            parsed into a JSON object, or when any error occurs.
        """
        if self._is_selection_reply(question):
            return None, None

        condition_list = self.kb.get_all_condition_names()

        variation_info_str = "\n".join(
            f"* {condition_name}: {', '.join(v['name'] for v in variations)}"
            for condition_name, variations in self.kb.variations.items()
            if variations
        )

        try:
            result = self.parse_chain.invoke({
                "question": question,
                "condition_list": ", ".join(condition_list),
                "variation_info": variation_info_str
            })

            text_result = result.get('text', '').strip()

            # Drop a Markdown code fence if the model wrapped its JSON in one.
            if text_result.startswith('```json'):
                text_result = text_result[7:]
            if text_result.endswith('```'):
                text_result = text_result[:-3]

            parsed = safe_parse_json(text_result.strip())
            if not isinstance(parsed, dict):
                logger.warning("Could not parse a JSON object from the model reply")
                return None, None

            if parsed.get('condition'):
                parsed['condition'] = parsed['condition'].replace("'s", "_s")

            condition = parsed.get('condition')
            variation = parsed.get('variation')

            # The model may have returned a variation name as the condition;
            # try to resolve it back to its parent condition.
            if condition and condition not in self.kb.conditions:
                potential_matches = self.kb.match_variation_to_condition(condition)
                if potential_matches:
                    best_match = potential_matches[0]
                    return best_match['condition'], best_match['variation']['name']

            return condition, variation

        except Exception as e:
            logger.error("Error parsing question: %s", e)
            return None, None

    def validate_and_match(self, condition_text, variation_text=None, verbose=False):
        """Validate and match condition and variation against the knowledge base.

        Returns:
            ``(matched_condition, matched_variation, condition_score,
            variation_score)``; the first two are None when nothing matched.
        """
        matched_condition, condition_score = self.kb.match_condition(condition_text)

        if verbose:
            if matched_condition:
                logger.info("Matched condition: %s (score: %.4f)", matched_condition, condition_score)
            else:
                logger.info("Could not match condition: %s (best score: %.4f)", condition_text, condition_score)

        if not matched_condition:
            return None, None, condition_score, 0.0

        matched_variation = None
        variation_score = 0.0

        if variation_text:
            matched_variation, variation_score = self.kb.match_variation(matched_condition, variation_text)

            if verbose and matched_variation:
                logger.info("Matched variation: %s (score: %.4f)", matched_variation['name'], variation_score)
            elif verbose:
                logger.info("Could not match variation: %s (best score: %.4f)", variation_text, variation_score)

        return matched_condition, matched_variation, condition_score, variation_score

    def query_contraindications(self, condition, variation=None, limit=50):
        """Query contraindications; returns ``(records, query)`` from the knowledge base."""
        _, query, records = self.kb.get_contraindications(condition, variation, limit)
        return records, query

    def organize_contraindications(self, records):
        """Group raw contraindication records by variation label.

        A dict is assumed to be organized already and is returned unchanged;
        empty input yields an empty dict.
        """
        if not records:
            return {}

        if isinstance(records, dict):
            return records

        organized = {}
        for record in records:
            variation_label = record.get('variation_label', 'Unknown')
            attribute = record.get('attribute_to_avoid', 'Unknown')

            attributes = organized.setdefault(variation_label, [])
            if attribute and attribute not in attributes:
                attributes.append(attribute)

        return organized

    def format_safe_exercises(self, exercises):
        """Format safe exercises as a bulleted list of their names."""
        if not exercises:
            return "No specific safe exercises found."

        return "\n".join(f"- {name}" for name in exercises)

    def get_guidelines(self, condition):
        """Get fitness guidelines for a condition."""
        return self.kb.get_condition_guidelines(condition)

    def generate_answer(self, question, condition, variation, contraindications_info, guidelines_info, safe_exercises=None):
        """Generate an answer from the retrieved knowledge via the LLM.

        Args:
            question: The user's question.
            condition: Matched condition name.
            variation: None, a variation dict, a ``{"multiple": [...]}``
                selection dict, or plain text.
            contraindications_info: Contraindication data (JSON-serializable).
            guidelines_info: Guideline data (JSON-serializable).
            safe_exercises: Optional dict of exercises keyed by name.

        Returns:
            The model's answer text, with a debug section appended in debug
            mode, or an apology containing the error text on failure.
        """
        variation_info = self._describe_variation(variation)

        if self.debug_mode:
            debug_info = {
                "condition": condition,
                "variation": variation_info,
                "contraindications": contraindications_info,
                "guidelines": guidelines_info
            }
            logger.info("Extracted information for query:\n%s", json.dumps(debug_info, indent=2))

        try:
            if safe_exercises and isinstance(safe_exercises, dict):
                safe_exercises_formatted = self.format_safe_exercises(safe_exercises)
            else:
                safe_exercises_formatted = "No specific safe exercises found."

            input_data = {
                "contraindications_info": json.dumps(contraindications_info, indent=2) if contraindications_info else "No specific contraindication information found.",
                "guidelines_info": json.dumps(guidelines_info, indent=2) if guidelines_info else "No specific fitness guidelines found.",
                "question": question,
                "condition": condition,
                "variation_info": variation_info,
                "safe_exercises_info": safe_exercises_formatted
            }

            if self.debug_mode:
                prompt = self.answer_chain.prompt.format(**input_data)
                print("\n" + "="*80)
                print("FULL PROMPT SENT TO LLM:".center(80))
                print("="*80)
                print(prompt)
                print("="*80 + "\n")

            result = self.answer_chain.invoke(input_data)

            answer_text = result.get('text', '')

            if self.debug_mode:
                debug_section = "\n\n" + "="*80 + "\nDEBUG INFORMATION\n" + "="*80 + "\n"
                debug_section += f"Original Question: {question}\n"
                debug_section += f"Condition: {condition}\n"
                debug_section += f"Variation: {variation_info}\n\n"
                debug_section += "CONTRAINDICATIONS:\n" + json.dumps(contraindications_info, indent=2) + "\n\n"
                debug_section += "GUIDELINES:\n" + json.dumps(guidelines_info, indent=2)

                answer_text += debug_section

            return answer_text

        except Exception as e:
            logger.error("Error generating answer: %s", e)
            return f"I encountered an error while generating your answer. Please try again or rephrase your question. Error: {str(e)}"

    def handle_selection_response(self, selection_text):
        """Handle a numeric selection response from the user.

        Numbers may be separated by commas and/or whitespace; empty pieces
        (e.g. a trailing comma) are ignored and repeated numbers count once.

        Returns:
            The generated answer for the selected variation(s), or an
            explanatory message when the selection cannot be used.
        """
        if not self.context.has_variation_selection_context():
            return "I'm sorry, but I don't have any context for your selection. Please ask a question about a specific health condition first."

        try:
            tokens = [tok for tok in re.split(r'[,\s]+', selection_text) if tok]
            # dict.fromkeys de-duplicates while keeping the user's order.
            selection_numbers = list(dict.fromkeys(int(tok) for tok in tokens))

            variations_list = self.context.get_active_variations()
            if not variations_list:
                return "I'm sorry, but I don't have any variations stored for selection."

            valid_selections = []
            for num in selection_numbers:
                if not 1 <= num <= len(variations_list):
                    return f"Selection number {num} is out of range. Please select numbers between 1 and {len(variations_list)}."
                valid_selections.append(variations_list[num-1])

            if not valid_selections:
                return "No valid selections were made. Please enter numbers corresponding to the variations you're interested in."

            if len(valid_selections) == 1:
                matched_variation = valid_selections[0]
            else:
                matched_variation = {
                    "multiple": valid_selections,
                    "name": ", ".join(v['name'] for v in valid_selections)
                }

            user_question = self.context.original_question if self.context.original_question else selection_text

            return self._retrieve_and_answer(user_question, self.context.condition, matched_variation)

        except Exception as e:
            logger.error("Error handling selection: %s", e)
            return "I encountered an error processing your selection. Please try again with valid numbers separated by commas (e.g., '1,3,5')."

    def process_question(self, question, verbose=False):
        """Process a question and generate an answer.

        Returns either the final answer, a variation-selection prompt, or a
        message explaining why no condition could be resolved.
        """
        self.kb.load_knowledge()

        if self._is_selection_reply(question):
            return self.handle_selection_response(question)

        # Look for "<word> <condition name>" in the question. The condition
        # name is escaped so regex metacharacters in it are matched literally.
        for condition_name in self.kb.conditions:
            subtype_pattern = re.compile(r'(\w+)\s+' + re.escape(condition_name), re.IGNORECASE)
            match = subtype_pattern.search(question)
            if not match:
                continue

            subtype = match.group(1)
            if verbose:
                logger.info("Detected compound term: %s %s", subtype, condition_name)

            filtered_vars = self._filter_variations(condition_name, subtype)
            if filtered_vars:
                return self._request_variation_choice(question, condition_name, filtered_vars, subtype)

        extracted_condition, extracted_variation = self.parse_question(question)

        if verbose:
            logger.info("Extracted condition: %s", extracted_condition)
            logger.info("Extracted variation: %s", extracted_variation)

        matched_condition = None
        matched_variation = None

        if extracted_condition:
            matched_condition, matched_variation, _, _ = self.validate_and_match(
                extracted_condition, extracted_variation, verbose
            )

            if extracted_variation and matched_variation:
                filtered_vars = self._filter_variations(matched_condition, extracted_variation)
                if len(filtered_vars) > 1:
                    return self._request_variation_choice(question, matched_condition, filtered_vars, extracted_variation)

        elif extracted_variation:
            # Only a variation was found: infer the condition from it.
            potential_matches = self.kb.match_variation_to_condition(extracted_variation)

            if verbose:
                logger.info("Found %d potential condition matches for variation '%s'", len(potential_matches), extracted_variation)
                for idx, candidate in enumerate(potential_matches[:3]):
                    logger.info("  Match %d: %s - Score: %.4f", idx + 1, candidate['condition'], candidate['score'])

            if not potential_matches:
                return self.NO_CONDITION_ERROR

            if len(potential_matches) > 1:
                options = [candidate['condition'] for candidate in potential_matches[:3]]
                return f"The variation '{extracted_variation}' could apply to multiple conditions: {', '.join(options)}. Could you please specify which condition you're interested in?"

            best_match = potential_matches[0]
            matched_condition = best_match['condition']
            matched_variation = best_match['variation']

            if verbose:
                logger.info("Selected best condition match: %s", matched_condition)

            filtered_vars = self._filter_variations(matched_condition, extracted_variation)
            if len(filtered_vars) > 1:
                return self._request_variation_choice(question, matched_condition, filtered_vars, extracted_variation)
        else:
            return self.NO_CONDITION_ERROR

        if not matched_condition:
            return f"I couldn't find information about '{extracted_condition or extracted_variation}' in our database. Please try a different condition or rephrase your question."

        if not matched_variation and extracted_variation:
            filtered_vars = self._filter_variations(matched_condition, extracted_variation)
            if filtered_vars:
                return self._request_variation_choice(question, matched_condition, filtered_vars, extracted_variation)

        if not matched_variation and not extracted_variation:
            variations = self.kb.variations.get(matched_condition, [])
            if len(variations) > 1:
                return self._request_variation_choice(question, matched_condition, variations)

        if verbose:
            logger.info("Retrieving information for %s", matched_condition)

        return self._retrieve_and_answer(question, matched_condition, matched_variation, verbose)

In [8]:
def initialize_framework():
    """Initialize all components of the framework.

    Connects to Neo4j, creates the shared session context, the
    OpenRouter-backed LLM, the knowledge base and the query processor, then
    force-loads the knowledge base.

    The OpenRouter API key is read from the ``OPENROUTER_API_KEY``
    environment variable, in line with how the Neo4j credentials are
    configured; the placeholder is only used when the variable is unset.

    Returns:
        ``(graph, kb, processor, context)`` on success, or four ``None``
        values when any step fails (the error is logged).
    """
    try:
        # Connect to Neo4j
        graph = Neo4jGraph(
            url=NEO4J_URL,
            username=NEO4J_USER,
            password=NEO4J_PWD
        )

        # Create session context first so the processor can share it
        context = SessionContext()

        # Keep the secret out of the source: prefer the environment.
        openrouter_api_key = os.getenv("OPENROUTER_API_KEY", "Insert_Your_OpenRouter_API_Key_Here")

        # Create LLM using OpenRouter
        llm = ChatOpenAI(
            model="google/gemini-2.0-flash-exp:free",  # alternative: tngtech/deepseek-r1t-chimera:free
            temperature=0.4,
            openai_api_key=openrouter_api_key,
            openai_api_base="https://openrouter.ai/api/v1",
            max_tokens=4096
        )

        # Create the knowledge base
        kb = HealthKnowledgeBase(graph)

        # Create query processor with the shared context
        processor = QueryProcessor(graph, kb, llm, context)

        # Load knowledge base after everything is set up
        kb.load_knowledge(force=True)

        return graph, kb, processor, context

    except Exception as e:
        # Top-level boundary: report the failure (with traceback) and let
        # the caller decide how to exit.
        logger.error(f"Failed to initialize framework: {str(e)}", exc_info=True)
        return None, None, None, None

In [9]:
def print_welcome():
    """Print the banner, example questions and available commands."""
    rule = "=" * 80
    banner_lines = [
        "",
        rule,
        "ENHANCED HEALTH CONDITION QUERY TOOL".center(80),
        rule,
        "",
        "Ask questions about health conditions:",
        "  - What should I avoid for Parkinson's Disease?",
        "  - What exercises are recommended for Osteoporosis?",
        "  - Tell me about exercise guidelines for Diabetes",
        "  - What to avoid with resting tremor?",
        "",
        "Commands:",
        "  - Type 'exit', 'quit', or 'q' to exit",
        "  - Type 'debug' to toggle display of extracted information",
        "  - Type 'refresh' to refresh the knowledge base",
        rule,
        "",
    ]
    # One print call; the trailing empty entry reproduces the blank line
    # that follows the closing rule.
    print("\n".join(banner_lines))

In [None]:
def _display_answer(answer):
    """Print *answer* framed by the 80-column "ANSWER:" banner."""
    rule = "=" * 80
    print("\n" + rule)
    print("ANSWER:".center(80))
    print(rule)
    print(answer)
    print(rule)


def main():
    """Run the interactive question/answer loop.

    Prints the welcome banner, initializes the framework (exiting with
    status 1 if that fails) and then reads questions from standard input
    until the user quits.

    Recognized commands (case-insensitive, surrounding whitespace ignored):
        exit / quit / q -- leave the loop
        debug           -- toggle the processor's debug mode
        refresh         -- force a reload of the knowledge base

    A reply made up only of digits, commas and whitespace is treated as a
    variation selection when the previous answer was a variation-selection
    prompt; any other input is processed as a new question.

    Ctrl-C and end-of-input both end the loop. Any other exception raised
    while handling a question is logged and reported, and the loop continues.
    """
    print_welcome()

    # The graph handle is returned by the initializer but not needed here.
    _graph, kb, processor, context = initialize_framework()
    if not processor:
        print("Failed to initialize framework. Exiting.")
        sys.exit(1)

    # Set initial modes
    debug_mode = False
    processor.set_debug_mode(debug_mode)

    while True:
        try:
            print("\nEnter your question (or 'exit' to quit):")
            question = input("> ")

            # Normalize once so that e.g. " Exit " is still recognized.
            command = question.strip().lower()

            if command in ('exit', 'quit', 'q'):
                print("Exiting. Goodbye!")
                break

            if command == 'debug':
                debug_mode = not debug_mode
                processor.set_debug_mode(debug_mode)
                print(f"Debug mode {'enabled' if debug_mode else 'disabled'}")
                print("Debug information will show what data was extracted for each query.")
                continue

            if command == 'refresh':
                kb.load_knowledge(force=True)
                print("Knowledge base refreshed.")
                continue

            if not command:
                print("Please enter a valid question.")
                continue

            print("\nProcessing your question... this may take a moment.")

            # A purely numeric reply (e.g. "1, 3") answers a pending
            # variation-selection prompt instead of starting a new question.
            if re.match(r'^[0-9,\s]+$', question) and context.has_variation_selection_context() and \
               context.is_variation_selection_prompt(context.previous_response):
                answer = processor.handle_selection_response(question)
                _display_answer(answer)

                # Reset context after handling selection
                context.clear()
                continue

            context.original_question = question

            answer = processor.process_question(question)
            _display_answer(answer)

            if context.is_variation_selection_prompt(answer):
                # Recover the condition (and optional subtype) from the prompt
                # text so the matching variation list can be stored for the
                # user's follow-up selection.
                # NOTE(review): `(\w+)` captures a single word, so a
                # multi-word condition name would not match these patterns as
                # written -- confirm against the prompt text the processor emits.
                condition_match = re.search(r"I found \d+ variations for (\w+)\.", answer)
                subtype_match = re.search(r"I found \d+ variations of (\w+) related to '(\w+)'", answer)

                if subtype_match:
                    condition_name = subtype_match.group(1)
                    subtype = subtype_match.group(2)

                    normalized_subtype = kb._normalize_text(subtype)
                    filtered_vars = kb.get_variations_for_condition(condition_name, normalized_subtype)

                    filtered_vars.sort(key=lambda v: v['name'])

                    context.store_variation_context(condition_name, filtered_vars, filtered=True)

                elif condition_match:
                    condition_name = condition_match.group(1)

                    # .sort() reorders the list object obtained from
                    # kb.variations in place; kept as in the original.
                    all_variations = kb.variations.get(condition_name, [])
                    all_variations.sort(key=lambda v: v['name'])

                    context.store_variation_context(condition_name, all_variations)

                context.previous_response = answer
            else:
                # Reset context but keep original question if needed for debugging
                context.clear(keep_question=debug_mode)

        except (KeyboardInterrupt, EOFError):
            # input() raises EOFError once stdin is closed. Left to the
            # generic handler below, it would be raised again on every
            # iteration and the loop would never terminate.
            print("\nInterrupted. Exiting.")
            break
        except Exception as e:
            logger.exception("An error occurred: %s", e)
            print(f"\nAn error occurred: {str(e)}")
            print("Please try again with a different question.")

In [11]:
# Entry point: start the interactive session only when this code is executed
# directly (as a script or notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()


                      ENHANCED HEALTH CONDITION QUERY TOOL                      

Ask questions about health conditions:
  - What should I avoid for Parkinson's Disease?
  - What exercises are recommended for Osteoporosis?
  - Tell me about exercise guidelines for Diabetes
  - What to avoid with resting tremor?

Commands:
  - Type 'exit', 'quit', or 'q' to exit
  - Type 'debug' to toggle display of extracted information
  - Type 'refresh' to refresh the knowledge base



  return LLMChain(llm=self.llm, prompt=prompt_template)
2025-05-28 23:02:18,389 - health_rag - INFO - Loading health conditions, variations, guidelines, and exercises...
2025-05-28 23:02:18,722 - health_rag - INFO - Loaded 21 conditions with 145 total variations, 63 fitness guidelines, and 77 exercises
2025-05-28 23:02:18,723 - health_rag - INFO - Debug mode disabled



Enter your question (or 'exit' to quit):
Exiting. Goodbye!
