In [10]:
#!pip uninstall frozendict

In [11]:
#pip install "frozendict>=2.3.4"

In [12]:
# ================================
# Install the correct frozendict version
# ================================
#!pip install --force-reinstall frozendict==1.2

# ================================
# Import libraries
# ================================
import pandas as pd
import spacy
import nltk

# Make sure to import Mapping correctly
from collections.abc import Mapping

# Import from experta
from experta import *

# ================================
# Example code to check everything works
# ================================

print("All imports were successful!")

# Example: Download NLTK data if needed
nltk.download('punkt')


All imports were successful!


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [None]:
import pandas as pd
import spacy
import nltk
from experta import KnowledgeEngine, Fact, Rule, DefFacts

# Load NLP model
nlp = spacy.load("en_core_web_sm")

# ---------- Load CSV ----------
df = pd.read_csv('Medical Diagnosis Expert System.csv')

# ---------- Combine symptoms and precautions ----------
disease_data = {}

for _, row in df.iterrows():
    disease = row['disease']
    symptoms = [s.strip() for s in row['symptoms'].split(',')]
    precautions = [p.strip() for p in str(row['precautions']).split(',') if p]

    if disease not in disease_data:
        disease_data[disease] = {'symptoms': set(), 'precautions': set()}
    disease_data[disease]['symptoms'].update(symptoms)
    disease_data[disease]['precautions'].update(precautions)

# Convert sets to sorted lists
for disease in disease_data:
    disease_data[disease]['symptoms'] = sorted(disease_data[disease]['symptoms'])
    disease_data[disease]['precautions'] = sorted(disease_data[disease]['precautions'])

# ---------- NLP preprocessing ----------
def preprocess_input(user_input):
    nltk.download('punkt', quiet=True)
    nltk.download('wordnet', quiet=True)
    nltk.download('averaged_perceptron_tagger', quiet=True)
    doc = nlp(user_input.lower().replace('_', ' '))
    return [token.lemma_ for token in doc if token.is_alpha]

# ---------- Experta Facts ----------
class Symptom(Fact):
    name = str

class Diagnosis(Fact):
    disease = str

# ---------- Dynamic Experta engine ----------
class DynamicDiagnosisEngine(KnowledgeEngine):
    def __init__(self, disease_data):
        super().__init__()
        self.disease_data = disease_data
        self.match_counts = {}

    def declare_symptoms(self, symptoms):
        for symptom in symptoms:
            self.declare(Symptom(name=symptom))

    @DefFacts()
    def initial_facts(self):
        yield Fact(started=True)

    def add_dynamic_rules(self):
        for disease, info in self.disease_data.items():
            symptoms = info['symptoms']

            def make_rule(disease=disease, symptoms=symptoms):
                @Rule(*(Symptom(name=s) for s in symptoms))
                def rule(self):
                    self.match_counts[disease] = len(symptoms)
                    self.declare(Diagnosis(disease=disease))
                return rule

            setattr(self.__class__, f'rule_{disease.replace(" ", "_")}', make_rule())

# ---------- Run Experta engine ----------
def run_experta_engine(processed_symptoms, disease_data):
    engine = DynamicDiagnosisEngine(disease_data)
    engine.add_dynamic_rules()
    engine.reset()
    engine.declare_symptoms(processed_symptoms)
    engine.run()
    if engine.match_counts:
        best_disease = max(engine.match_counts, key=engine.match_counts.get)
        return best_disease
    return None

# ---------- Simple symptom-matching approach ----------
def match_disease_by_symptoms(processed_symptoms, disease_data):
    match_scores = {}
    for disease, info in disease_data.items():
        match_count = len(set(processed_symptoms) & set(info['symptoms']))
        if match_count > 0:
            match_scores[disease] = match_count
    if match_scores:
        best_disease = max(match_scores, key=match_scores.get)
        return best_disease
    return None

def get_precautions(disease_name):
    return disease_data.get(disease_name, {}).get('precautions', [])

# ---------- Interactive Main Flow ----------
def main():
    print("\nMedical Diagnosis Expert System")
    print("--------------------------------")
    print("Describe your symptoms (e.g., 'I have fever and headache'):")

    while True:
        user_input = input("\n> ").strip()

        if user_input.lower() in ['exit', 'quit']:
            print("Goodbye!")
            break

        if not user_input:
            print("Please enter your symptoms.")
            continue

        symptoms = preprocess_input(user_input)

        print("\n[Diagnosis Results]")
        print("-------------------")

        # Try Experta engine first
        experta_disease = run_experta_engine(symptoms, disease_data)
        if experta_disease:
            precautions = get_precautions(experta_disease)
            print(f"Diagnosis: {experta_disease}")
            print("\nRecommended Precautions:")
            for i, precaution in enumerate(precautions, 1):
                print(f"{i}. {precaution}")
        else:
            # Fallback to simple matching
            simple_disease = match_disease_by_symptoms(symptoms, disease_data)
            if simple_disease:
                precautions = get_precautions(simple_disease)
                print(f"Diagnosis: {simple_disease}")
                print("\nRecommended Precautions:")
                for i, precaution in enumerate(precautions, 1):
                    print(f"{i}. {precaution}")
            else:
                print("No matching diagnosis found for your symptoms.")
                print("Please consult a healthcare professional.")

        print("\nEnter more symptoms or type 'exit' to quit.")

if __name__ == "__main__":
    main()


Medical Diagnosis Expert System
--------------------------------
Describe your symptoms (e.g., 'I have fever and headache'):

> I have a headache and dizziness

[Diagnosis Results]
-------------------
Diagnosis: Hypertension 

Recommended Precautions:
1. get proper sleep
2. meditation
3. reduce stress
4. salt baths

Enter more symptoms or type 'exit' to quit.

> gg

[Diagnosis Results]
-------------------
No matching diagnosis found for your symptoms.
Please consult a healthcare professional.

Enter more symptoms or type 'exit' to quit.

> jjj

[Diagnosis Results]
-------------------
No matching diagnosis found for your symptoms.
Please consult a healthcare professional.

Enter more symptoms or type 'exit' to quit.

> I

[Diagnosis Results]
-------------------
No matching diagnosis found for your symptoms.
Please consult a healthcare professional.

Enter more symptoms or type 'exit' to quit.

> head

[Diagnosis Results]
-------------------
No matching diagnosis found for your symptoms

In [14]:
import unittest


class TestMedicalDiagnosis(unittest.TestCase):
    def setUp(self):
        self.disease_data = {
            "Fungal infection": {
                "symptoms": ["itching", "skin rash", "peeling skin"],
                "precautions": ["keep dry", "use antifungal cream"]
            },
            "Common Cold": {
                "symptoms": ["cough", "fever", "sore throat"],
                "precautions": ["rest", "drink fluids"]
            }
        }

    def test_diagnosis_without_disease_name(self):
        print("\n=== Testing diagnosis without disease name ===")
        user_input = "I have itching, red skin rash, and peeling skin"

        print(f"\nInput: '{user_input}'")
        processed_symptoms = preprocess_input(user_input)
        print(f"Extracted symptoms: {processed_symptoms}")

        self.assertNotIn("fungal", processed_symptoms)
        self.assertNotIn("infection", processed_symptoms)

        diagnosis = match_disease_by_symptoms(processed_symptoms, self.disease_data)
        print(f"\nDiagnosis: {diagnosis}")
        self.assertEqual(diagnosis, "Fungal infection")

        precautions = get_precautions(diagnosis, self.disease_data)
        print(f"Precautions: {precautions}")
        self.assertEqual(precautions, ["keep dry", "use antifungal cream"])

    def test_no_matching_symptoms(self):
        print("\n=== Testing no matching symptoms ===")
        user_input = "I have a headache and dizziness"

        print(f"\nInput: '{user_input}'")
        processed_symptoms = preprocess_input(user_input)
        print(f"Extracted symptoms: {processed_symptoms}")

        diagnosis = match_disease_by_symptoms(processed_symptoms, self.disease_data)
        print(f"\nDiagnosis: {diagnosis}")
        self.assertIsNone(diagnosis)

if __name__ == "__main__":
    # Create a test suite
    suite = unittest.TestLoader().loadTestsFromTestCase(TestMedicalDiagnosis)

    # Run tests with verbose output
    unittest.TextTestRunner(verbosity=2).run(suite)

test_diagnosis_without_disease_name (__main__.TestMedicalDiagnosis.test_diagnosis_without_disease_name) ... ERROR
test_no_matching_symptoms (__main__.TestMedicalDiagnosis.test_no_matching_symptoms) ... ok

ERROR: test_diagnosis_without_disease_name (__main__.TestMedicalDiagnosis.test_diagnosis_without_disease_name)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-14-886c0e732e2b>", line 32, in test_diagnosis_without_disease_name
    precautions = get_precautions(diagnosis, self.disease_data)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: get_precautions() takes 1 positional argument but 2 were given

----------------------------------------------------------------------
Ran 2 tests in 0.027s

FAILED (errors=1)



=== Testing diagnosis without disease name ===

Input: 'I have itching, red skin rash, and peeling skin'
Extracted symptoms: ['I', 'have', 'itching', 'red', 'skin', 'rash', 'and', 'peel', 'skin']

Diagnosis: Fungal infection

=== Testing no matching symptoms ===

Input: 'I have a headache and dizziness'
Extracted symptoms: ['I', 'have', 'a', 'headache', 'and', 'dizziness']

Diagnosis: None
