# Symptom Analysis

This notebook analyzes the symptom data in four parts, followed by validation checks on the cleaned dataset:
1. **Symptom Mapping Quality** - How well Mayo Clinic symptoms map to the standardized vocabulary
2. **Duplicate Detection** - Find redundant symptoms across datasets
3. **Merge Impact Analysis** - What happens when we consolidate duplicates
4. **Normalizer Configuration** - The typo, plural, and synonym rules currently applied

---

In [1]:
import pandas as pd
import json
import sys
from pathlib import Path
from difflib import SequenceMatcher
import re

# Resolve the project root: when the kernel starts inside `notebooks/` the
# root is one level up; otherwise the working directory itself is the root.
_cwd = Path.cwd()
if _cwd.name == 'notebooks':
    project_root = _cwd.parent
else:
    project_root = _cwd

# Put the root first on sys.path so the project-local `utils` package below
# can be imported.
sys.path.insert(0, str(project_root))

from utils.symptom_normalizer import normalize_symptom, TYPO_MAP, SYNONYM_MAP, PLURAL_MAP

print(f"Project root: {project_root}")

Project root: c:\Users\henry\Desktop\Programming\Python\Multimodal_Diagnosis


## 1. Symptom Mapping Quality Analysis

Analyzes how well the `mayo_clinic_symptoms` in the template map to the standardized vocabulary.

In [2]:
# Load vocabulary and template
symptom_vocab_path = project_root / "data" / "symptom_vocabulary.json"
template_path = project_root / "data" / "rare_diseases_symptoms_template.json"

# Pass the encoding explicitly: without it `open()` falls back to the
# platform's locale encoding, which is not UTF-8 on every system.
with open(symptom_vocab_path, encoding="utf-8") as f:
    VOCABULARY = json.load(f)

# Case-insensitive helpers: the set answers "is this symptom known?", the
# dict maps a lowercased name back to the spelling stored in the vocabulary.
VOCAB_SET = {s.lower() for s in VOCABULARY}
VOCAB_LOOKUP = {s.lower(): s for s in VOCABULARY}

with open(template_path, encoding="utf-8") as f:
    template = json.load(f)

print(f"Vocabulary size: {len(VOCABULARY)} symptoms")
print(f"Template diseases: {len(template)} diseases")

Vocabulary size: 454 symptoms
Template diseases: 135 diseases


In [3]:
def tokenize(text: str) -> set:
    """Split text into a set of lowercase word tokens.

    Every character that is neither a word character nor whitespace is
    replaced by a space before splitting, so punctuation never ends up
    inside a token.
    """
    without_punct = re.sub(r'[^\w\s]', ' ', text.lower())
    return {token for token in without_punct.split()}

def similarity_score(a: str, b: str) -> float:
    """Score how alike two strings are, from 0.0 to 1.0.

    The score blends two measures: the Jaccard overlap of the two token
    sets (weight 0.6) and the character-level ``SequenceMatcher`` ratio of
    the lowercased strings (weight 0.4). If either string yields no tokens
    the score is 0.0.
    """
    left, right = tokenize(a), tokenize(b)
    if not (left and right):
        return 0.0

    union_size = len(left | right)
    shared = len(left & right)
    token_overlap = shared / union_size if union_size > 0 else 0
    char_ratio = SequenceMatcher(None, a.lower(), b.lower()).ratio()

    return 0.6 * token_overlap + 0.4 * char_ratio

def find_best_match(symptom: str, top_n: int = 3) -> list:
    """Return the closest vocabulary entries for ``symptom``.

    Args:
        symptom: Free-text symptom name to look up.
        top_n: Maximum number of fuzzy candidates to return.

    Returns:
        A list of ``(vocabulary_entry, score)`` tuples, best first. A
        case-insensitive exact hit short-circuits to a single entry with
        score 1.0; otherwise the ``top_n`` highest ``similarity_score``
        results are returned (an empty list if the vocabulary is empty).
    """
    # Clean the input once and use the same string for both lookups, so
    # stray leading/trailing whitespace cannot drag down the fuzzy score.
    # NOTE(review): the input is not passed through normalize_symptom first,
    # so known typos take the fuzzy path — confirm this is intended.
    symptom_clean = symptom.lower().strip()

    # Exact match
    if symptom_clean in VOCAB_SET:
        return [(VOCAB_LOOKUP[symptom_clean], 1.0)]

    # Fuzzy match against every vocabulary entry
    scored = [
        (vocab_symptom, similarity_score(symptom_clean, vocab_symptom))
        for vocab_symptom in VOCABULARY
    ]
    # The sort is stable, so equally scored entries keep vocabulary order.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_n]

In [4]:
# Collect all mayo_clinic_symptoms
all_mayo_symptoms = set()
for info in template.values():
    # `or []` also covers entries where the key is present but null.
    all_mayo_symptoms.update(info.get("mayo_clinic_symptoms") or [])

total_mayo = len(all_mayo_symptoms)
print(f"Total unique mayo_clinic_symptoms: {total_mayo}")


def _pct_of_total(count: int) -> float:
    """Share of all unique Mayo symptoms as a percentage (0.0 when there are none)."""
    return 100 * count / total_mayo if total_mayo else 0.0


# Categorize by match quality
exact_matches = []
good_matches = []  # >= 0.7 similarity
poor_matches = []  # 0.4 - 0.7
no_matches = []    # < 0.4

for symptom in sorted(all_mayo_symptoms):
    matches = find_best_match(symptom)
    best_match, best_score = matches[0]

    if best_score == 1.0:
        exact_matches.append(symptom)
    elif best_score >= 0.7:
        good_matches.append((symptom, best_match, best_score))
    elif best_score >= 0.4:
        poor_matches.append((symptom, best_match, best_score))
    else:
        no_matches.append((symptom, best_match, best_score))

usable_count = len(exact_matches) + len(good_matches)

print("\n--- MATCH QUALITY SUMMARY ---")
print(f"EXACT MATCHES (100%):     {len(exact_matches):4d} ({_pct_of_total(len(exact_matches)):.1f}%)")
print(f"GOOD MATCHES (70-99%):    {len(good_matches):4d} ({_pct_of_total(len(good_matches)):.1f}%)")
print(f"POOR MATCHES (40-69%):    {len(poor_matches):4d} ({_pct_of_total(len(poor_matches)):.1f}%)")
print(f"NO MATCH (<40%):          {len(no_matches):4d} ({_pct_of_total(len(no_matches)):.1f}%)")
print(f"\nUSABLE (exact + good):    {usable_count:4d} ({_pct_of_total(usable_count):.1f}%)")

Total unique mayo_clinic_symptoms: 767

--- MATCH QUALITY SUMMARY ---
EXACT MATCHES (100%):      141 (18.4%)
GOOD MATCHES (70-99%):      12 (1.6%)
POOR MATCHES (40-69%):     306 (39.9%)
NO MATCH (<40%):           308 (40.2%)

USABLE (exact + good):     153 (19.9%)


In [5]:
# List every symptom without a usable vocabulary match, highest score first
# within each bucket.
print("--- POOR MATCHES (Need Manual Mapping) ---")
for source_symptom, closest, score in sorted(poor_matches, key=lambda row: row[2], reverse=True):
    print(f'  [{int(score*100):3d}%] "{source_symptom}" -> "{closest}"')

print("\n--- NO MATCH (Need to Add to Vocabulary) ---")
for source_symptom, closest, score in sorted(no_matches, key=lambda row: row[2], reverse=True):
    print(f'  [{int(score*100):3d}%] "{source_symptom}" (closest: "{closest}")')

--- POOR MATCHES (Need Manual Mapping) ---
  [ 69%] "loss of consiousness" -> "loss of consciousness"
  [ 68%] "nausea and vomitting" -> "nausea and vomiting"
  [ 68%] "loss of apetite" -> "loss of appetite"
  [ 68%] "ringing in ears" -> "ringing in ear"
  [ 68%] "progressive muscle weakness" -> "muscle weakness"
  [ 68%] "swelling of ankles and legs" -> "swelling of ankles"
  [ 67%] "sudden swelling of the scrotum" -> "swelling of scrotum"
  [ 66%] "no menstrual periods" -> "long menstrual periods"
  [ 65%] "unexplained weight gain" -> "weight gain"
  [ 65%] "unexplained weight loss" -> "weight loss"
  [ 65%] "swelling in ankles" -> "swelling of ankles"
  [ 65%] "changes in skin color" -> "skin color changes"
  [ 65%] "swelling of legs" -> "swelling of ankles"
  [ 64%] "swollen lymph glands" -> "swollen lymph nodes"
  [ 64%] "fast heart beat" -> "fast heart rate"
  [ 64%] "unintentional weight loss" -> "weight loss"
  [ 64%] "swelling of hands" -> "swelling of ankles"
  [ 64%] "lack o

---

## 2. Duplicate Detection Across Datasets

Finds symptoms that normalize to the same canonical form across different datasets.

In [6]:
# Lowercase names of columns that are not symptom indicators (labels and
# demographics); the analyses below skip them when picking symptom columns.
NON_SYMPTOM_COLS = {
    'diseases', 'disease_category', 'symptoms',
    'age', 'age_group', 'gender', 'sex',
    'weight', 'height', 'bmi', 'occupation',
}

def analyze_csv_duplicates(filepath: Path) -> dict:
    """Report symptom columns in a CSV that collapse to the same normalized name.

    Only the header is read. Columns listed in ``NON_SYMPTOM_COLS``
    (compared on their lowercased name) are ignored; the rest are grouped
    by ``normalize_symptom``. A summary is printed, listing at most ten
    duplicate groups.

    Args:
        filepath: Location of the CSV file to inspect.

    Returns:
        Mapping of normalized name -> original column names, restricted to
        names shared by more than one column. Empty when the file is
        missing or nothing collides.
    """
    banner = '=' * 60
    print(f"\n{banner}")
    print(f"Analyzing: {filepath.name}")
    print(banner)

    if not filepath.exists():
        print("  File not found")
        return {}

    # nrows=0 yields an empty frame that still carries the column names.
    header_only = pd.read_csv(filepath, nrows=0)
    cols = header_only.columns.tolist()
    print(f'Total columns: {len(cols)}')

    symptom_cols = [name for name in cols if name.lower() not in NON_SYMPTOM_COLS]
    print(f'Symptom columns: {len(symptom_cols)}')

    # Bucket the original column names under their normalized form.
    normalized_map = {}
    for name in symptom_cols:
        normalized_map.setdefault(normalize_symptom(name), []).append(name)

    # Keep only the buckets that hold more than one column.
    duplicates = {}
    for norm, members in normalized_map.items():
        if len(members) > 1:
            duplicates[norm] = members

    if not duplicates:
        print('\nNo duplicate symptoms found!')
    else:
        print(f'\nDuplicate symptom groups: {len(duplicates)}')
        preview = list(duplicates.items())[:10]
        for norm, cols_list in preview:
            print(f'  "{norm}": {cols_list}')
        if len(duplicates) > 10:
            print(f'  ... and {len(duplicates) - 10} more groups')

    print(f'\nCould reduce from {len(symptom_cols)} to {len(normalized_map)} unique symptoms')
    return duplicates

In [7]:
# Run the duplicate analysis on every dataset that is present on disk and
# collect the results by file name.
files = [
    project_root / 'data' / 'raw' / 'symptoms' / 'Disease and symptoms dataset.csv',
    project_root / 'data' / 'processed' / 'symptoms' / 'symptoms_to_disease_cleaned.csv',
    project_root / 'data' / 'processed' / 'symptoms' / 'symptoms_augmented_with_demographics.csv',
]

# Files that do not exist are skipped and do not appear in the summary.
all_duplicates = {
    dataset_path.name: analyze_csv_duplicates(dataset_path)
    for dataset_path in files
    if dataset_path.exists()
}

summary_rule = '=' * 60
print(f"\n{summary_rule}")
print("OVERALL SUMMARY")
print(summary_rule)
for name, dups in all_duplicates.items():
    print(f"  {name}: {len(dups)} duplicate groups")


Analyzing: Disease and symptoms dataset.csv


Total columns: 376
Symptom columns: 375

No duplicate symptoms found!

Could reduce from 375 to 375 unique symptoms

Analyzing: symptoms_to_disease_cleaned.csv
Total columns: 377
Symptom columns: 375

No duplicate symptoms found!

Could reduce from 375 to 375 unique symptoms

Analyzing: symptoms_augmented_with_demographics.csv
Total columns: 458
Symptom columns: 454

No duplicate symptoms found!

Could reduce from 454 to 454 unique symptoms

OVERALL SUMMARY
  Disease and symptoms dataset.csv: 0 duplicate groups
  symptoms_to_disease_cleaned.csv: 0 duplicate groups
  symptoms_augmented_with_demographics.csv: 0 duplicate groups


---

## 3. Merge Impact Analysis

Shows exactly what would change if we merge duplicate columns:
- How many rows have differences between duplicate columns
- Which diseases would be affected
- The overall data preservation rate

In [8]:
def analyze_merge_impact(filepath: Path):
    """Print what merging duplicate symptom columns of a CSV would change.

    Symptom columns (everything whose lowercased name is not in
    ``NON_SYMPTOM_COLS``) are grouped by ``normalize_symptom``. For every
    group with more than one column, the rows where the columns disagree
    are counted and, when a ``diseases`` column exists, the three most
    affected diseases are listed. The per-group results are displayed as a
    table, followed by a printed summary.

    Args:
        filepath: Location of the CSV file to analyze.

    Returns:
        None. All results are printed or displayed.
    """
    print(f"\n{'='*70}")
    print(f"MERGE IMPACT ANALYSIS: {filepath.name}")
    print(f"{'='*70}")

    if not filepath.exists():
        print("  File not found")
        return

    # Load data
    print("Loading dataset...")
    df = pd.read_csv(filepath)
    print(f"Loaded {len(df):,} rows, {len(df.columns)} columns")

    # Find duplicates: bucket column names under their normalized form.
    normalized_map = {}
    for col in df.columns:
        if col.lower() in NON_SYMPTOM_COLS:
            continue
        normalized_map.setdefault(normalize_symptom(col), []).append(col)

    duplicates = {k: v for k, v in normalized_map.items() if len(v) > 1}

    if not duplicates:
        print("No duplicate columns found!")
        return

    print(f"\nFound {len(duplicates)} duplicate groups to analyze")
    print("-" * 70)

    # total_gained counts (row, group) conflicts; any_conflict marks each row
    # at most once, so the reported row count can never exceed len(df).
    total_gained = 0
    any_conflict = pd.Series(False, index=df.index)
    results = []

    for canonical, cols in duplicates.items():
        subset = df[cols]

        all_zero_mask = subset.sum(axis=1) == 0
        all_one_mask = subset.min(axis=1) == 1
        # A row is "mixed" when the group's columns are neither all 0 nor
        # all 1. The same mask drives the count and the disease lookup.
        mixed_mask = ~(all_zero_mask | all_one_mask)
        mixed = int(mixed_mask.sum())

        if mixed > 0:
            # Get affected diseases
            if 'diseases' in df.columns:
                affected = df.loc[mixed_mask, 'diseases'].value_counts().head(3)
                affected_str = ', '.join(f"{d} ({c})" for d, c in affected.items())
            else:
                affected_str = "N/A"

            results.append({
                'canonical': canonical,
                'columns': len(cols),
                'mixed_rows': mixed,
                'affected': affected_str
            })
            total_gained += mixed
            any_conflict |= mixed_mask

    # Display as table
    if results:
        results_df = pd.DataFrame(results).sort_values('mixed_rows', ascending=False)
        display(results_df)

    conflict_rows = int(any_conflict.sum())
    # Guard the percentage against a file that has a header but no rows.
    conflict_pct = 100 * conflict_rows / len(df) if len(df) else 0.0

    print(f"\n{'='*70}")
    print("SUMMARY")
    print(f"{'='*70}")
    print(f"  Total rows in dataset: {len(df):,}")
    print(f"  Rows with conflicting values: {conflict_rows:,} ({conflict_pct:.2f}%)")
    print("\n  Merge strategy: MAX (if any column = 1, result = 1)")
    print(f"  Effect: We GAIN {total_gained:,} symptom signals (no information lost)")
    print("\n  This is SAFE - merging only adds information, never removes it.")

In [9]:
# Run the merge-impact analysis on the augmented dataset; nothing runs when
# the file is absent.
augmented_path = project_root.joinpath(
    'data', 'processed', 'symptoms', 'symptoms_augmented_with_demographics.csv'
)
if augmented_path.exists():
    analyze_merge_impact(augmented_path)


MERGE IMPACT ANALYSIS: symptoms_augmented_with_demographics.csv
Loading dataset...
Loaded 223,944 rows, 458 columns
No duplicate columns found!


---

## 4. Normalizer Configuration

Shows the current normalization rules being applied.

In [10]:
# Print each normalization table sorted by key. The first heading has no
# leading blank line; the later ones do.
rule_tables = [
    ("=== TYPO CORRECTIONS ===", TYPO_MAP),
    ("\n=== PLURAL TO SINGULAR ===", PLURAL_MAP),
    ("\n=== SYNONYM MAPPINGS ===", SYNONYM_MAP),
]

for heading, mapping in rule_tables:
    print(heading)
    for source, target in sorted(mapping.items()):
        print(f"  '{source}' -> '{target}'")

=== TYPO CORRECTIONS ===
  'apetite' -> 'appetite'
  'burpin' -> 'burping'
  'dizzy' -> 'dizziness'
  'fefver' -> 'fever'
  'itchness' -> 'itchiness'
  'lack of apetite' -> 'loss of appetite'
  'loss of apetite' -> 'loss of appetite'
  'loss of consiousness' -> 'loss of consciousness'
  'nausea and vomitting' -> 'nausea and vomiting'
  'neusea' -> 'nausea'
  'numbess' -> 'numbness'
  'paleness' -> 'pallor'
  'regurgitation.1' -> 'regurgitation'
  'ringing in ears' -> 'ringing in ear'
  'slowhealing' -> 'slow healing'
  'stiffeness' -> 'stiffness'
  'thirsty' -> 'thirst'
  'tireness' -> 'tiredness'
  'vomitting' -> 'vomiting'
  'weakeness' -> 'weakness'

=== PLURAL TO SINGULAR ===
  'bloody stools' -> 'bloody stool'
  'body aches' -> 'body ache'
  'chills' -> 'chills'
  'headaches' -> 'headache'
  'irregular heartbeats' -> 'irregular heartbeat'
  'muscle aches' -> 'muscle ache'
  'nosebleeds' -> 'nosebleed'
  'numbness in arms' -> 'numbness in arm'
  'numbness in legs' -> 'numbness in l

---

## Next Steps

To clean the datasets, run the utility scripts below, then re-run the validation cells that follow to confirm the result:

```bash
# Clean vocabulary files
python scripts/cleanup_symptom_vocabulary.py --apply

# Clean dataset columns
python scripts/cleanup_symptom_datasets.py --apply
```

In [11]:
# Load the cleaned dataset (replace path if needed)
cleaned_data_path = project_root / 'data' / 'processed' / 'symptoms' / 'symptoms_augmented_with_demographics.csv'

if cleaned_data_path.exists():
    df_final = pd.read_csv(cleaned_data_path)
    # Filter metadata columns on the lowercased name, the same way the
    # duplicate and merge analyses do.
    current_cols = set(c for c in df_final.columns if c.lower() not in NON_SYMPTOM_COLS)

    # Check for alignment. VOCAB_SET holds lowercased names, so compare on
    # the lowercased column name instead of the raw one.
    unrecognized = {c for c in current_cols if c.lower() not in VOCAB_SET}

    print("--- VOCABULARY ALIGNMENT ---")
    print(f"Total symptom columns: {len(current_cols)}")
    if unrecognized:
        print(f"❌ Found {len(unrecognized)} columns NOT in vocabulary:")
        for col in sorted(unrecognized):
            print(f"  - {col}")
    else:
        print("✅ Success: All columns perfectly match the standardized vocabulary.")
else:
    # Say so explicitly: the cells below read `df_final` and `current_cols`
    # and would otherwise fail with an unexplained NameError.
    print(f"⚠️ Cleaned dataset not found: {cleaned_data_path}")
    print("   The validation cells below need it; generate the file and re-run this cell.")

--- VOCABULARY ALIGNMENT ---
Total symptom columns: 454
✅ Success: All columns perfectly match the standardized vocabulary.


In [12]:
# "Symptom density": how many symptom columns are set on each row.
symptom_counts = df_final.loc[:, list(current_cols)].sum(axis=1)
empty_rows = symptom_counts.eq(0).sum()
empty_pct = 100*empty_rows/len(df_final)

print("--- DATA INTEGRITY ---")
print(f"Rows with zero symptoms: {empty_rows} ({empty_pct:.2f}%)")
print(f"Average symptoms per row: {symptom_counts.mean():.2f}")
print(f"Max symptoms in one row:  {symptom_counts.max()}")

# Rows without any symptom are only reported here, not removed.
if empty_rows > 0:
    print(f"⚠️ Warning: Found {empty_rows} rows with no symptoms. Consider removing these before training.")

--- DATA INTEGRITY ---
Rows with zero symptoms: 0 (0.00%)
Average symptoms per row: 5.33
Max symptoms in one row:  12


In [13]:
# Group rows by symptom profile and check whether different diseases share one.
# A profile is the row's symptom values joined into one string, with the
# columns taken in sorted order.
symptom_list = sorted(current_cols)
profile = df_final[symptom_list].astype(str).agg(''.join, axis=1)

# Group on the standalone Series rather than a temporary column, so df_final
# is never modified, even if this cell fails part-way through.
collisions = df_final['diseases'].groupby(profile).nunique()
colliding_profiles = collisions[collisions > 1]

print("--- REDUNDANCY & COLLISION CHECK ---")
if not colliding_profiles.empty:
    print(f"❌ Found {len(colliding_profiles)} symptom profiles shared by multiple diseases.")
    # Show an example of a collision
    sample_profile = colliding_profiles.index[0]
    diseases = df_final.loc[profile == sample_profile, 'diseases'].unique()
    print(f"Example Collision: {diseases} share the same symptoms.")
else:
    print("✅ Success: Every disease has a unique symptom fingerprint.")

--- REDUNDANCY & COLLISION CHECK ---
❌ Found 11379 symptom profiles shared by multiple diseases.
Example Collision: ['parkinson disease' 'brain cancer' 'anemia' 'multiple myeloma'] share the same symptoms.


In [14]:
# Check which diseases ended up with very few active symptoms.
# The threshold is defined once so the filter and both messages stay in sync.
MIN_SYMPTOMS_PER_DISEASE = 4

# Per disease: the number of symptom columns that are set on at least one row.
disease_symptom_stats = df_final.groupby('diseases')[symptom_list].sum().gt(0).sum(axis=1)
weak_coverage = disease_symptom_stats[disease_symptom_stats < MIN_SYMPTOMS_PER_DISEASE]

print("--- SYMPTOM COVERAGE PER DISEASE ---")
print(f"Total diseases: {len(disease_symptom_stats)}")
if not weak_coverage.empty:
    print(f"⚠️ {len(weak_coverage)} diseases have < {MIN_SYMPTOMS_PER_DISEASE} symptoms. They may be hard to classify:")
    for disease, count in weak_coverage.items():
        print(f"  - {disease}: {count} symptoms")
else:
    print(f"✅ All diseases have at least {MIN_SYMPTOMS_PER_DISEASE} distinct symptoms mapped.")

--- SYMPTOM COVERAGE PER DISEASE ---
Total diseases: 667
⚠️ 35 diseases have < 4 symptoms. They may be hard to classify:
  - birth trauma: 3 symptoms
  - carcinoid syndrome: 2 symptoms
  - cryptorchidism: 3 symptoms
  - diabetes: 2 symptoms
  - edward syndrome: 2 symptoms
  - esophageal varices: 2 symptoms
  - fetal alcohol syndrome: 3 symptoms
  - g6pd enzyme deficiency: 2 symptoms
  - granuloma inguinale: 2 symptoms
  - high blood pressure: 2 symptoms
  - hpv: 3 symptoms
  - huntington disease: 2 symptoms
  - hypergammaglobulinemia: 2 symptoms
  - hyperlipidemia: 3 symptoms
  - insulin overdose: 2 symptoms
  - intussusception: 3 symptoms
  - lichen planus: 3 symptoms
  - lymphogranuloma venereum: 3 symptoms
  - obesity: 3 symptoms
  - omphalitis: 3 symptoms
  - oral leukoplakia: 3 symptoms
  - otosclerosis: 3 symptoms
  - pelvic fistula: 3 symptoms
  - pemphigus: 3 symptoms
  - raynaud disease: 3 symptoms
  - reactive arthritis: 2 symptoms
  - spherocytosis: 3 symptoms
  - testicular