# Resnik Phenotype Similarity (Simplified)

このノートブックは、036_resnik_phenotype_similarity.ipynbをシンプルで可読性の高い実装にリファクタリングしたものです。
- クラスを使わず関数ベースで実装
- 煩雑な部分を簡素化
- アノテーション（Genotype, Sex, Life stage）を考慮したResnik similarity

In [None]:
# プロジェクトルートに移動
import os
from pathlib import Path

print(f"Current directory: {os.getcwd()}")

# Climb towards the filesystem root until the working directory contains a
# LICENSE file; that directory is treated as the project root.
while not Path("LICENSE").exists():
    if Path.cwd() == Path.cwd().parent:
        # At the filesystem root another chdir("../") is a no-op, so stop
        # with a clear error instead of looping forever.
        raise FileNotFoundError("LICENSE not found in any parent directory")
    os.chdir("../")

print(f"Project root: {os.getcwd()}")

In [None]:
import json
import math
import re
import pandas as pd
import numpy as np
from pathlib import Path
from collections import defaultdict, Counter
import pickle

## 1. MPOオントロジーの読み込み

In [None]:
def parse_mpo_obo(obo_file_path: str) -> dict:
    """Parse an MPO OBO file into term, name and hierarchy lookups.

    Args:
        obo_file_path: Path to the OBO file (read as UTF-8).

    Returns:
        A dict with three entries:
        - "terms": term id -> {"id", "name", "is_a", "is_obsolete"}
        - "name_to_id": term name -> term id
        - "hierarchy": term id -> set of direct parent ids (a defaultdict)

        Obsolete terms and stanzas lacking an id or a name are skipped.
    """
    with open(obo_file_path, "r", encoding="utf-8") as f:
        content = f.read()

    terms = {}  # MP_ID -> {name, is_a, ...}
    name_to_id = {}  # name -> MP_ID
    hierarchy = defaultdict(set)  # child_id -> {parent_ids}

    # Everything before the first "[Term]" is the file header.
    for block in content.split("[Term]")[1:]:
        term_data = {"is_a": [], "is_obsolete": False}

        for raw_line in block.strip().split("\n"):
            line = raw_line.strip()
            if not line:
                continue

            # A new stanza header (e.g. "[Typedef]") ends this term. Without
            # this check its id/name tags would overwrite the term's own.
            if line.startswith("[") and line.endswith("]"):
                break

            if ":" not in line:
                continue

            key, value = line.split(":", 1)
            key = key.strip()
            value = value.strip()

            if key == "id":
                term_data["id"] = value
            elif key == "name":
                term_data["name"] = value
            elif key == "is_a":
                # Drop the trailing "! comment" and any "{...}" modifier so
                # only the parent id remains.
                parent_id = value.split(" !")[0].split("{")[0].strip()
                term_data["is_a"].append(parent_id)
            elif key == "is_obsolete":
                term_data["is_obsolete"] = value.lower() == "true"

        # Keep only complete, non-obsolete terms.
        if "id" in term_data and "name" in term_data and not term_data["is_obsolete"]:
            term_id = term_data["id"]
            terms[term_id] = term_data
            name_to_id[term_data["name"]] = term_id

            # Record the direct is_a parents.
            for parent_id in term_data["is_a"]:
                hierarchy[term_id].add(parent_id)

    return {
        "terms": terms,
        "name_to_id": name_to_id,
        "hierarchy": hierarchy
    }


# Parse the MPO ontology; mpo_data is the shared lookup used by later cells
mpo_data = parse_mpo_obo("data/ontology/mp.obo")
print(f"Parsed {len(mpo_data['terms'])} MP terms")

## 2. 表現型頻度とIC値の計算

In [None]:
def calculate_phenotype_frequencies(symbol_mptermname_path: str) -> dict:
    """Count how many genes are annotated with each phenotype string.

    Args:
        symbol_mptermname_path: Path to a JSON file mapping a gene symbol to
            a list of phenotype strings. Read as UTF-8, the standard JSON
            encoding, rather than the platform default.

    Returns:
        Dict of stripped phenotype string -> number of occurrences. Empty or
        whitespace-only entries and genes with no phenotype list are ignored.
    """
    with open(symbol_mptermname_path, "r", encoding="utf-8") as f:
        symbol_mptermname = json.load(f)

    phenotype_counts = Counter()

    for phenotypes in symbol_mptermname.values():
        # `or ()` tolerates genes whose value is null / an empty list.
        for phenotype in phenotypes or ():
            name = phenotype.strip()
            if name:
                phenotype_counts[name] += 1

    return dict(phenotype_counts)


def calculate_information_content(phenotype_frequencies: dict) -> dict:
    """Compute the Information Content (IC) of every phenotype.

    IC is -ln(p), where p is the phenotype's share of all observations.

    Args:
        phenotype_frequencies: Dict of phenotype -> occurrence count.

    Returns:
        Dict of phenotype -> IC. A phenotype with zero probability gets 0.0,
        as does every phenotype when the total count is not positive.
    """
    total_observations = sum(phenotype_frequencies.values())

    # With no observations every probability is undefined; use the same 0.0
    # fallback as for a zero probability instead of dividing by zero.
    if total_observations <= 0:
        return {phenotype: 0.0 for phenotype in phenotype_frequencies}

    ic_scores = {}

    for phenotype, frequency in phenotype_frequencies.items():
        probability = frequency / total_observations
        ic_scores[phenotype] = -math.log(probability) if probability > 0 else 0.0

    return ic_scores


# Compute phenotype frequencies and IC values (both reused by later cells)
phenotype_frequencies = calculate_phenotype_frequencies("data/annotation/symbol_mptermname.json")
ic_scores = calculate_information_content(phenotype_frequencies)

print(f"Found {len(phenotype_frequencies)} unique phenotypes")
print(f"Total phenotype observations: {sum(phenotype_frequencies.values())}")

# Show the five most frequent phenotypes as a sanity check
top_phenotypes = sorted(phenotype_frequencies.items(), key=lambda x: x[1], reverse=True)[:5]
print("\nTop 5 most frequent phenotypes:")
for phenotype, count in top_phenotypes:
    print(f"  {count:4d}: {phenotype}")

## 3. 表現型の解析とアノテーション分離

In [None]:
def parse_phenotype_annotations(phenotype: str) -> tuple:
    """Split a phenotype string into its base term and its annotations.

    The expected shape is "base term (Annotation, Annotation, ...)". Each
    recognised annotation is filed under "genotype", "sex" or "life_stage";
    unrecognised entries are dropped.

    Returns:
        (base_term, annotations). When there is no trailing parenthesised
        group, the stripped input and an empty dict are returned.
    """
    text = phenotype.strip()

    # Shape: "base phenotype (annotations)"
    found = re.match(r"^(.+?)\s*\(([^)]+)\)$", text)
    if found is None:
        return text, {}

    # Annotation value -> the category it belongs to.
    category_of = {
        "Homo": "genotype",
        "Hetero": "genotype",
        "Hemi": "genotype",
        "Male": "sex",
        "Female": "sex",
        "Early": "life_stage",
        "Late": "life_stage",
        "Embryo": "life_stage",
        "Interval": "life_stage",
    }

    annotations = {}
    # Multiple annotations are comma separated.
    for raw_token in found.group(2).strip().split(","):
        token = raw_token.strip()
        category = category_of.get(token)
        if category is not None:
            annotations[category] = token

    return found.group(1).strip(), annotations


# Smoke test: print the parsed base term and annotations for sample phenotypes
test_phenotypes = [
    "preweaning lethality, complete penetrance (Homo, Early)",
    "abnormal skin morphology (Homo, Early)",
    "hyperactivity (Homo, Early)"
]

print("表現型解析テスト:")
for phenotype in test_phenotypes:
    base, annotations = parse_phenotype_annotations(phenotype)
    print(f"  {phenotype}")
    print(f"    基本: '{base}'")
    print(f"    アノテーション: {annotations}")

## 4. オントロジー階層とLCA計算

In [None]:
def get_ancestors(term_id: str, hierarchy: dict) -> set:
    """Collect every ancestor reachable from a term via parent links.

    Args:
        term_id: Term to start from. It is not included in the result unless
            it is reachable from itself.
        hierarchy: Dict of term id -> collection of direct parent ids.

    Returns:
        Set of all direct and indirect parent ids.
    """
    seen = set()
    pending = [term_id]

    while pending:
        node = pending.pop()
        # Only parents not met before need to be recorded and explored.
        fresh = set(hierarchy.get(node, set())) - seen
        seen |= fresh
        pending.extend(fresh)

    return seen


def calculate_term_depth(term_id: str, hierarchy: dict, depth_cache: dict = None) -> int:
    """Return the length of the longest parent chain above a term.

    A term with no recorded parents has depth 0.

    Args:
        term_id: Term whose depth is wanted.
        hierarchy: Dict of term id -> collection of direct parent ids.
        depth_cache: Optional memo dict of term id -> depth. It is filled in
            place, so passing the same dict across calls reuses results.
    """
    cache = {} if depth_cache is None else depth_cache

    if term_id not in cache:
        parents = hierarchy.get(term_id)
        if parents:
            # One level below the deepest parent.
            cache[term_id] = max(
                calculate_term_depth(parent, hierarchy, cache) + 1
                for parent in parents
            )
        else:
            cache[term_id] = 0

    return cache[term_id]


def find_lowest_common_ancestor(term_a: str, term_b: str, mpo_data: dict) -> str | None:
    """2つのtermの最下位共通祖先（LCA）を見つける"""
    
    # 名前からIDに変換
    id_a = mpo_data["name_to_id"].get(term_a, term_a)
    id_b = mpo_data["name_to_id"].get(term_b, term_b)
    
    if id_a not in mpo_data["terms"] or id_b not in mpo_data["terms"]:
        return None
    
    # 両方の祖先を取得
    ancestors_a = get_ancestors(id_a, mpo_data["hierarchy"])
    ancestors_a.add(id_a)
    
    ancestors_b = get_ancestors(id_b, mpo_data["hierarchy"])
    ancestors_b.add(id_b)
    
    # 共通祖先
    common_ancestors = ancestors_a & ancestors_b
    
    if not common_ancestors:
        return None
    
    # 最も深い（具体的な）共通祖先を選択
    depth_cache = {}
    lca = max(common_ancestors, key=lambda x: calculate_term_depth(x, mpo_data["hierarchy"], depth_cache))
    
    return lca


# Smoke test: print the LCA name for a few term pairs
print("LCA計算テスト:")
test_pairs = [
    ("cellular phenotype", "abnormal cell morphology"),
    ("abnormal heart morphology", "abnormal blood vessel morphology")
]

for term_a, term_b in test_pairs:
    lca_id = find_lowest_common_ancestor(term_a, term_b, mpo_data)
    if lca_id:
        lca_name = mpo_data["terms"][lca_id]["name"]
        print(f"  '{term_a}' & '{term_b}' -> LCA: {lca_name}")
    else:
        print(f"  '{term_a}' & '{term_b}' -> LCA: None")

## 5. Resnik Similarity計算

In [None]:
def get_base_phenotype_ic(base_phenotype: str, ic_scores: dict) -> float:
    """Return the largest IC among the annotated variants of a base phenotype.

    Every key of ic_scores is reduced to its base term (annotations removed)
    and compared with base_phenotype. The result is never below 0.0, which is
    also what is returned when nothing matches.
    """
    matching_ics = (
        ic
        for phenotype, ic in ic_scores.items()
        # Compare on the base term only, ignoring the annotation suffix.
        if parse_phenotype_annotations(phenotype)[0] == base_phenotype
    )
    return max([0.0, *matching_ics])


def calculate_string_similarity(term_a: str, term_b: str) -> float:
    """Score two term names by their wording, as a fallback similarity.

    Two names of the form "abnormal <word> morphology" score exactly 1.0.
    Otherwise the score is the Jaccard index of the lower-cased word sets
    plus 0.3 for every shared word among 'abnormal', 'morphology',
    'increased' and 'decreased', capped at 3.0. If either name has no words
    the score is 0.0.
    """
    # Both names follow the "abnormal XXX morphology" template.
    morphology_pattern = r'^abnormal\s+\w+\s+morphology$'
    if all(re.match(morphology_pattern, term) for term in (term_a, term_b)):
        return 1.0

    # Word-level Jaccard index.
    tokens_a = set(term_a.lower().split())
    tokens_b = set(term_b.lower().split())
    if not (tokens_a and tokens_b):
        return 0.0

    shared = tokens_a & tokens_b
    score = len(shared) / len(tokens_a | tokens_b)

    # Bonus for each shared keyword (adds 0.0 when none is shared).
    keywords = {'abnormal', 'morphology', 'increased', 'decreased'}
    score += len(shared & keywords) * 0.3

    return min(score, 3.0)


def calculate_base_resnik_similarity(term_a: str, term_b: str, mpo_data: dict, ic_scores: dict) -> float:
    """Resnik-style similarity between two base phenotype terms.

    Identical terms score their own IC. Otherwise the score is the larger of
    the IC of the lowest common ancestor (0.0 when there is none) and the
    wording-based similarity of the two names.
    """
    if term_a == term_b:
        return get_base_phenotype_ic(term_a, ic_scores)

    # Ontology route: IC of the lowest common ancestor, when one exists.
    shared_ancestor = find_lowest_common_ancestor(term_a, term_b, mpo_data)
    if shared_ancestor is None:
        ontology_score = 0.0
    else:
        ancestor_name = mpo_data["terms"][shared_ancestor]["name"]
        ontology_score = get_base_phenotype_ic(ancestor_name, ic_scores)

    # Wording route, used as a fallback signal.
    lexical_score = calculate_string_similarity(term_a, term_b)

    # The stronger of the two signals wins.
    return max(ontology_score, lexical_score)


# Smoke test: print the base Resnik similarity for a few term pairs.
# term_a / term_b stay bound to the last pair after the loop.
print("基本Resnik similarity テスト:")
test_pairs = [
    ("preweaning lethality, complete penetrance", "abnormal heart morphology"),
    ("abnormal skin morphology", "abnormal kidney morphology"),
    ("abnormal spleen morphology", "abnormal kidney morphology"),
    ("abnormal heart morphology", "abnormal blood vessel morphology")

]

for term_a, term_b in test_pairs:
    similarity = calculate_base_resnik_similarity(term_a, term_b, mpo_data, ic_scores)
    print(f"  '{term_a}' & '{term_b}' -> {similarity:.4f}")

In [None]:
# Ad-hoc inspection of the LCA and its IC for the current term_a / term_b.
lca_id = find_lowest_common_ancestor(term_a, term_b, mpo_data)
print(lca_id)
# Reset both values so that, when no LCA exists, the prints below show
# None / 0.0 instead of failing or reusing a name from an earlier cell.
lca_name = None
ontology_similarity = 0.0
if lca_id is not None:
    lca_name = mpo_data["terms"][lca_id]["name"]
    ontology_similarity = get_base_phenotype_ic(lca_name, ic_scores)
print(lca_name)
print(ontology_similarity)

In [None]:
# Display the IC table as the cell output for manual inspection
ic_scores

## 6. アノテーション類似度計算

In [None]:
# Partial-similarity tables keyed by the unordered pair of values, so a
# lookup works whichever way round the two values are given.
_GENOTYPE_PARTIAL_SIMILARITY = {
    frozenset(("Hetero", "Homo")): 0.1,
    frozenset(("Hemi", "Homo")): 0.05,
    frozenset(("Hemi", "Hetero")): 0.05,
}

_LIFE_STAGE_PARTIAL_SIMILARITY = {
    frozenset(("Early", "Interval")): 0.1,
    frozenset(("Early", "Late")): 0.02,
    frozenset(("Early", "Embryo")): 0.02,
    frozenset(("Interval", "Late")): 0.2,
    frozenset(("Interval", "Embryo")): 0.02,
    frozenset(("Late", "Embryo")): 0.02,
}


def get_annotation_partial_similarity(annotation_type: str, value_a: str, value_b: str) -> float:
    """Return the partial similarity between two different annotation values.

    Args:
        annotation_type: "genotype", "sex" or "life_stage".
        value_a: First annotation value.
        value_b: Second annotation value.

    Returns:
        The tabulated similarity for the pair, independent of argument order.
        0.0 for "sex" (Male vs Female are treated as fully different), for
        unknown annotation types and for pairs not in the table.
    """
    pair = frozenset((value_a, value_b))

    if annotation_type == "genotype":
        return _GENOTYPE_PARTIAL_SIMILARITY.get(pair, 0.0)

    if annotation_type == "life_stage":
        return _LIFE_STAGE_PARTIAL_SIMILARITY.get(pair, 0.0)

    return 0.0


def calculate_annotation_similarity(annotations_a: dict, annotations_b: dict) -> float:
    """Weighted similarity between two annotation dicts.

    Genotype, sex and life stage carry weights 0.5, 0.3 and 0.2. Only the
    types present on at least one side take part. Each contributes 1.0 for
    equal values, the partial similarity for differing values, and 0.01 when
    only one side has the annotation. The result is the weighted mean, or 1.0
    when neither side has any annotation.
    """
    weighted_types = (("genotype", 0.5), ("sex", 0.3), ("life_stage", 0.2))

    weight_total = 0
    weighted_score = 0

    for kind, weight in weighted_types:
        in_a = kind in annotations_a
        in_b = kind in annotations_b
        if not (in_a or in_b):
            continue

        weight_total += weight

        if not (in_a and in_b):
            # Present on one side only: a small, non-zero credit.
            factor = 0.01
        elif annotations_a[kind] == annotations_b[kind]:
            factor = 1.0
        else:
            factor = get_annotation_partial_similarity(
                kind, annotations_a[kind], annotations_b[kind]
            )

        weighted_score += weight * factor

    if weight_total == 0:
        return 1.0  # neither side is annotated

    return weighted_score / weight_total


# Smoke test: print the annotation similarity for a few annotation pairs
print("アノテーション類似度テスト:")
test_cases = [
    ({"genotype": "Homo", "life_stage": "Early"}, {"genotype": "Homo", "life_stage": "Early"}),
    ({"genotype": "Homo", "life_stage": "Early"}, {"genotype": "Hetero", "life_stage": "Early"}),
    ({"genotype": "Homo", "sex": "Male"}, {"genotype": "Homo", "sex": "Female"})
]

for ann_a, ann_b in test_cases:
    similarity = calculate_annotation_similarity(ann_a, ann_b)
    print(f"  {ann_a} & {ann_b} -> {similarity:.4f}")

## 7. 統合Resnik Similarity

In [None]:
def calculate_resnik_similarity(phenotype_a: str, phenotype_b: str, mpo_data: dict, ic_scores: dict) -> float:
    """Annotation-aware Resnik similarity between two phenotype strings.

    Identical strings score their own IC (0.0 when absent from ic_scores).
    Otherwise the base-term similarity and the annotation similarity are
    blended, with the base term dominating: the annotation share is 0.001,
    0.01 or 0.02 depending on whether the base similarity is below 0.01,
    below 0.1, or higher.
    """
    if phenotype_a == phenotype_b:
        return ic_scores.get(phenotype_a, 0.0)

    # Separate each phenotype into its base term and its annotations.
    base_a, annotations_a = parse_phenotype_annotations(phenotype_a)
    base_b, annotations_b = parse_phenotype_annotations(phenotype_b)

    base_score = calculate_base_resnik_similarity(base_a, base_b, mpo_data, ic_scores)
    annotation_score = calculate_annotation_similarity(annotations_a, annotations_b)

    # The less the base terms have in common, the less annotations matter.
    if base_score < 0.01:
        base_weight, annotation_weight = 0.999, 0.001
    elif base_score < 0.1:
        base_weight, annotation_weight = 0.99, 0.01
    else:
        base_weight, annotation_weight = 0.98, 0.02

    return base_weight * base_score + annotation_weight * annotation_score


# Smoke test: print the combined similarity and its components for sample pairs
print("統合Resnik similarity テスト:")
test_phenotype_pairs = [
    ("preweaning lethality, complete penetrance (Homo, Early)", "abnormal heart morphology (Homo, Early)"),
    ("abnormal skin morphology (Homo, Early)", "abnormal kidney morphology (Homo, Early)"),
    ("abnormal spleen morphology (Homo, Early)", "abnormal kidney morphology (Homo, Early)"),
    ("hyperactivity (Homo, Early)", "hyperactivity (Hetero, Early)")  # same base phenotype, different genotype
]

print("\n結果:")
for phenotype_a, phenotype_b in test_phenotype_pairs:
    similarity = calculate_resnik_similarity(phenotype_a, phenotype_b, mpo_data, ic_scores)
    
    # Recompute the two components so they can be shown next to the total
    base_a, ann_a = parse_phenotype_annotations(phenotype_a)
    base_b, ann_b = parse_phenotype_annotations(phenotype_b)
    base_sim = calculate_base_resnik_similarity(base_a, base_b, mpo_data, ic_scores)
    ann_sim = calculate_annotation_similarity(ann_a, ann_b)
    
    print(f"\n📍 類似度: {similarity:.4f}")
    print(f"   {phenotype_a}")
    print(f"   vs")
    print(f"   {phenotype_b}")
    print(f"   基本類似度: {base_sim:.4f}, アノテーション類似度: {ann_sim:.4f}")
    
    # Occurrence counts of the two full phenotype strings (0 when unseen)
    freq_a = phenotype_frequencies.get(phenotype_a, 0)
    freq_b = phenotype_frequencies.get(phenotype_b, 0)
    print(f"   頻度: {freq_a} vs {freq_b}")

## 8. JaccardとResnikの比較

In [None]:
def jaccard_similarity(phenotype_a: str, phenotype_b: str) -> float:
    """Exact-match baseline: 1.0 for identical strings, otherwise 0.0."""
    is_same = phenotype_a == phenotype_b
    return float(is_same)


# Side-by-side comparison of the exact-match baseline and the Resnik score
print("Jaccard vs Resnik Similarity 比較")
print("=" * 60)

comparison_pairs = [
    ("preweaning lethality, complete penetrance (Homo, Early)", "abnormal heart morphology (Homo, Early)"),
    ("abnormal skin morphology (Homo, Early)", "abnormal kidney morphology (Homo, Early)"),
    ("abnormal spleen morphology (Homo, Early)", "abnormal kidney morphology (Homo, Early)"),
    ("hyperactivity (Homo, Early)", "hyperactivity (Homo, Early)")  # identical phenotype
]

for phenotype_a, phenotype_b in comparison_pairs:
    jaccard_sim = jaccard_similarity(phenotype_a, phenotype_b)
    resnik_sim = calculate_resnik_similarity(phenotype_a, phenotype_b, mpo_data, ic_scores)
    
    print(f"\n📍 {phenotype_a}")
    print(f"   vs")
    print(f"   {phenotype_b}")
    print(f"   📊 Jaccard: {jaccard_sim:.4f}  |  Resnik: {resnik_sim:.4f}")
    
    # Label the outcome: low score for a non-match, notable score for a
    # non-match, or an exact match
    if jaccard_sim == 0.0 and resnik_sim < 0.1:
        print(f"   ✅ 改善成功: 非関連表現型の類似度が適切に低い")
    elif jaccard_sim == 0.0 and resnik_sim >= 0.1:
        print(f"   🔍 注目: 関連性のある表現型として検出")
    elif jaccard_sim == 1.0:
        print(f"   🎯 同一表現型: IC値を活用")

print(f"\n🎯 Resnik Similarity の利点:")
print(f"   1. 頻出表現型（preweaning lethality等）の偏りを軽減")
print(f"   2. オントロジー階層を考慮した意味的類似度")
print(f"   3. アノテーション情報を適切に考慮")
print(f"   4. 関連性のある表現型間で適切な類似度を提供")

## 9. 結果の保存

In [None]:
# Persist the results
output_dir = Path("data/resnik_similarity_simplified")
# parents=True so the cell also works when the "data" directory is missing.
output_dir.mkdir(parents=True, exist_ok=True)

# 1. MPO data
with open(output_dir / "mpo_data.pkl", "wb") as f:
    pickle.dump(mpo_data, f)

# 2. IC scores
with open(output_dir / "ic_scores.json", "w", encoding="utf-8") as f:
    json.dump(ic_scores, f, indent=2)

# 3. Phenotype frequencies
with open(output_dir / "phenotype_frequencies.json", "w", encoding="utf-8") as f:
    json.dump(phenotype_frequencies, f, indent=2)

# 4. In-memory lookup of the main entry points (not written to disk)
similarity_functions = {
    "calculate_resnik_similarity": calculate_resnik_similarity,
    "parse_phenotype_annotations": parse_phenotype_annotations,
    "calculate_base_resnik_similarity": calculate_base_resnik_similarity,
    "calculate_annotation_similarity": calculate_annotation_similarity
}

print(f"Simplified Resnik similarity data saved to {output_dir}/")
print(f"Files saved:")
for file_path in output_dir.glob("*"):
    print(f"  - {file_path.name}")

print(f"\n✅ 簡略化実装完了!")
print(f"主要関数:")
print(f"  - calculate_resnik_similarity(): メイン関数")
print(f"  - parse_phenotype_annotations(): 表現型解析")
print(f"  - calculate_base_resnik_similarity(): 基本類似度")
print(f"  - calculate_annotation_similarity(): アノテーション類似度")