このコード内のプロセス
テキストからmecabで形態素解析
解析後のtokenからentityのaliasを抽出
aliasー＞entityにマッピング
抽出したentityと病名とentityの解析結果を比較して病名候補を出力

In [1]:
import pandas as pd
import pyspark
from pyspark.sql.functions import col, countDistinct, when, broadcast, count, row_number, substring,udf,dense_rank,format_string,first,lit, to_date, transform,collect_set
from pyspark.sql.window import Window
import unicodedata
from pyspark.sql.types import StringType
import numpy as np
import pyspark.sql.functions as F
from typing import Optional, List, Tuple
from pyspark.sql import SparkSession


# Sparkセッションがまだ存在しない場合に作成
spark = SparkSession.builder \
    .appName("LoadParquetFile") \
    .config("spark.executor.memory", "32g") \
    .config("spark.driver.memory", "16g") \
    .config("spark.sql.shuffle.partitions", "500") \
    .config("spark.sql.files.maxPartitionBytes", "32m") \
    .getOrCreate()


alias2entity_path = "/Users/takami.soshi/Documents/GitHub/KGLLM_v2/analytics/KG/cytoscape/alias2entity.csv"
dis_entity_data_path ="/Users/takami.soshi/Documents/GitHub/KGLLM_v2/analytics/project/shosin_disease_pred/test/top_aliases_by_main_name_raw_grouped.parquet"

# --- MeCab helpers（そのまま利用） ---
import MeCab
MECAB_DIC_PATH = "/opt/homebrew/lib/mecab/dic/mecab-ipadic-neologd"
_mecab_tagger = None  # lazy singleton

def _get_tagger() -> MeCab.Tagger:
    global _mecab_tagger
    if _mecab_tagger is None:
        _mecab_tagger = MeCab.Tagger(f"-Owakati -d {MECAB_DIC_PATH}")
    return _mecab_tagger

def mecab_tokenize(text: Optional[str]) -> List[str]:
    """Return wakati tokens for a single string without using Spark."""
    if not text:
        return []
    try:
        parsed = _get_tagger().parse(text)
        return parsed.strip().split() if parsed else []
    except Exception:
        return []


def extract_disease_candidates_from_text_weighted(
    text: str,
    spark: SparkSession,
    *,
    # 既存オブジェクトorパス（どちらかでOK）
    alias_df: Optional[pd.DataFrame] = None,
    alias2entity_path: Optional[str] = None,
    dis_entity_df=None,
    dis_entity_data_path: Optional[str] = None,
    # ハイパーパラメータ（disease_countを強め気味の初期値）
    rank_decay: float = 0.85,
    kg_boost: float = 1.15,
    beta: float = 2.0,     # prior寄与の線形強度
    k1: float = 60.0,      # priorのスケール
    gamma_prior: float = 1.5,  # prior寄与の非線形ブースト(>1で強化)
) -> Tuple:
    """
    テキストをMeCabで分かち→alias→diseaseに写像し、重み付きスコアで候補疾患を返す。

    Returns:
        (candidates_df, mapped_entities, tokens)

        candidates_df: Spark DataFrame
            columns: disease, final_score, match_score, prior, match_count,
                     matches, alias_total_cnt, disease_count
        mapped_entities: List[str]  # テキストから得たエンティティ（重複除去済）
        tokens: List[str]           # MeCab分かち書きトークン
    """
    # --- 入力ロード ---
    if alias_df is None:
        if not alias2entity_path:
            raise ValueError("alias2entity_path か alias_df のいずれかを指定してください。")
        alias_df = pd.read_csv(alias2entity_path)

    if dis_entity_df is None:
        if not dis_entity_data_path:
            raise ValueError("dis_entity_data_path か dis_entity_df のいずれかを指定してください。")
        dis_entity_df = spark.read.parquet(dis_entity_data_path)

    # --- tokenize → alias→entity マッピング ---
    tokens = mecab_tokenize(text)

    alias_to_entity = (
        alias_df.dropna(subset=["alias", "Name_mapped"])
                .drop_duplicates(subset=["alias"])
                .set_index("alias")["Name_mapped"]
                .to_dict()
    )

    mapped_entities = [alias_to_entity[t] for t in tokens if t in alias_to_entity]
    mapped_entities = list(dict.fromkeys(mapped_entities))  # 重複除去＆順序保持

    # マッチ対象が空なら空DF返す
    if not mapped_entities:
        schema = """
            disease string,
            final_score double,
            match_score double,
            prior double,
            match_count int,
            matches array<string>,
            alias_total_cnt long,
            disease_count long
        """
        empty_df = spark.createDataFrame([], schema=schema)
        return empty_df, mapped_entities, tokens

    mapped_lit = F.array(*[F.lit(x) for x in mapped_entities])

    # --- 一致抽出 ---
    df = (
        dis_entity_df
        .withColumn("alias_arr", F.expr("transform(aliases_ranked, x -> x.alias)"))
        .withColumn("matches", F.array_intersect(F.col("alias_arr"), mapped_lit))
        .withColumn("match_count", F.size("matches"))
        .filter(F.col("match_count") > 0)
    )

    # --- rank×KG 重み：UDFなしaggregateで match_score ---
    expr_match_score = f"""
    aggregate(
      aliases_ranked,
      0D,
      (acc, x) ->
        acc + IF(
                array_contains(matches, x.alias),
                pow({rank_decay}, x.rank - 1) * IF(x.KG_flag, {kg_boost}, 1.0),
                0D
              )
    )
    """
    df = df.withColumn("match_score", F.expr(expr_match_score))

    # --- disease_count prior と最終スコア（γで非線形強化） ---
    df = (
        df.withColumn(
            "prior",
            (F.col("disease_count") / (F.lit(k1) + F.col("disease_count"))).cast("double")
        )
        .withColumn(
            "final_score",
            F.col("match_score") * F.pow((1 + F.lit(beta) * F.col("prior")), F.lit(gamma_prior))
        )
    )

    candidates = (
        df.select(
            "disease","final_score","match_score","prior","match_count",
            "matches","alias_total_cnt","disease_count"
        )
        .orderBy(
            F.col("final_score").desc(),
            F.col("match_count").desc(),
            F.col("alias_total_cnt").desc_nulls_last()
        )
    )

    return candidates, mapped_entities, tokens


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/19 16:37:09 WARN Utils: Your hostname, Donuts-NM3044-2508.local, resolves to a loopback address: 127.0.0.1; using 10.10.35.188 instead (on interface en0)
25/11/19 16:37:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/19 16:37:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/19 16:37:10 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/11/19 16:37:10 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/11/19 16:37:10 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


In [None]:
# どちらか：パスで渡す


sample_text ="""
医療DX加算　2025･6算定分動脈硬化性疾患発症予測・脂質管理目標設定アプリ
（医師・医療従事者向け）
一次予防　高リスク
まず生活習慣の改善を行った後、薬物療法の適用を考慮する
管理目標値
LDL-C	＜	120	mg/dL
Non-HDL-C	＜	150	mg/dL
TG	＜	150	mg/dL(空腹時)
＜	175	mg/dL(随時)
HDL-C	≧	40	mg/dL

血糖、HbA1cを測定する

脂質系の検査を行う

検診の結果を持ってこられましたが、肥満があり、高血圧、高脂血症、糖尿病があります。まずは体重減少に努めていただくのが大事で、仕事は介護職という事で体を動かしているようですので、そうすると毎日100Kcal減量していただくというのが基本だろうと思います。それで1ヶ月ほど様子をみせていただき、今日のところは血糖とコレステロールも測らせていただきましょう。家庭血圧も測っていただき、血圧の結果を見て降圧剤の調整も行いたいと思います。1ヶ月に1回ずつ拝見したいと思います。

コレステロールの方は計算してみると、血糖値が高いためLDLコレステロール120未満という値が出ました。今日もう1回再検してみて、その結果をもってコレステロールの薬を考えたいと思います。家庭血圧測定をしていただき、今日は血糖値についての薬を出しましょう。
以上中安ひ記載

（以下幾田記載）
血圧測定指導、100Kcal減量指導（外食編）を行った。
（以上幾田記載）


"""
cands, ents, toks = extract_disease_candidates_from_text_weighted(
    text=sample_text,
    spark=spark,
    alias2entity_path=alias2entity_path,
    dis_entity_data_path=dis_entity_data_path,
    # 調整ノブ（必要なら）
    beta=2.0, k1=40.0, gamma_prior=2.0, rank_decay=0.85, kg_boost=1.15
)

cands.show(50,truncate=False)
print(ents)
print(toks)


+------------------------------+------------------+------------------+------------------+-----------+----------------------------------------+---------------+-------------+
|disease                       |final_score       |match_score       |prior             |match_count|matches                                 |alias_total_cnt|disease_count|
+------------------------------+------------------+------------------+------------------+-----------+----------------------------------------+---------------+-------------+
|内頚動脈狭窄症                |18.275578567925482|2.0983872435440425|0.9755799755799756|4          |[高血圧, 糖尿病, 動脈硬化症, 高脂質血症]|736            |1598         |
|頚動脈硬化症                  |17.598255888338755|2.0166666757256833|0.9770246984491672|3          |[高血圧, 動脈硬化症, 糖尿病]            |2081           |1701         |
|高脂血症                      |17.360666727835465|1.9338299384496538|0.9981112475210123|4          |[高脂質血症, 高血圧, 糖尿病, 検診]      |5510           |21138        |
|２型糖尿病             

In [11]:
soap_path ="/Users/takami.soshi/Documents/GitHub/disease_pred/preprocessed_soap"
cht_dis_path ="/Users/takami.soshi/Documents/GitHub/disease_pred/really_final_chart_patient_disease_with_icd.parquet"

soap_df = spark.read.parquet(soap_path)
cht_dis_df = spark.read.parquet(cht_dis_path).limit(10000)

In [15]:
soap_df = soap_df.join(cht_dis_df, on="chart_id", how="left_semi")
cht_dis_df = cht_dis_df.join(soap_df, on="chart_id", how="left_semi")

In [19]:
from tqdm import tqdm
# 1. 参照データを事前にロード（ループ内での再ロードを防ぐため）
# パスは変数が定義されている前提です
alias_df_pd = pd.read_csv(alias2entity_path)
dis_entity_df_spark = spark.read.parquet(dis_entity_data_path).cache() # 高速化のためキャッシュ
# 2. soap_dfをDriverに収集（データ量が大きい場合は注意）
soap_rows = soap_df.select("chart_id", "soap_text").collect()
results = []
# 3. 各行に対して関数を実行
for row in tqdm(soap_rows):
    cid = row.chart_id
    text = row.soap_text
    
    # テキストがない場合はスキップまたは空リスト
    if not text:
        continue
    # 関数を実行
    cands_df, _, _ = extract_disease_candidates_from_text_weighted(
        text=text,
        spark=spark,
        alias_df=alias_df_pd,
        dis_entity_df=dis_entity_df_spark,
        # パラメータはノートブックの例に準拠（必要に応じて調整してください）
        beta=2.0, k1=40.0, gamma_prior=2.0, rank_decay=0.85, kg_boost=1.15
    )
    
    # 上位50件を取得（関数内で既にfinal_score順にソートされています）
    top_50_rows = cands_df.select("disease", "final_score").limit(50).collect()
    
    # 結果をリスト形式に変換
    cands_list = [{"disease": r.disease, "final_score": r.final_score} for r in top_50_rows]
    
    results.append({
        "chart_id": cid,
        "candidates": cands_list
    })
# 確認
print(f"処理完了数: {len(results)}")
if results:
    print(results[0])

100%|██████████| 9981/9981 [19:53<00:00,  8.36it/s]                             

処理完了数: 9541
{'chart_id': 30884675, 'candidates': [{'disease': '鼻中隔弯曲症', 'final_score': 24.51811343559644}, {'disease': '嗅覚障害', 'final_score': 23.951823632522512}, {'disease': '通年性アレルギー性鼻炎', 'final_score': 22.181140563770562}, {'disease': '鼻閉', 'final_score': 21.68134496309376}, {'disease': '鼻副鼻腔腫瘍', 'final_score': 20.420821729107384}, {'disease': '慢性副鼻腔炎', 'final_score': 19.477450018607914}, {'disease': 'うっ血性鼻炎', 'final_score': 19.185372238607783}, {'disease': '慢性鼻炎', 'final_score': 18.781722567854125}, {'disease': '好酸球性副鼻腔炎', 'final_score': 18.66517348722773}, {'disease': '急性副鼻腔炎', 'final_score': 17.33466757444597}, {'disease': '耳管狭窄症', 'final_score': 17.318210692708327}, {'disease': '鼻炎', 'final_score': 17.048999494962825}, {'disease': '鼻汁', 'final_score': 16.852330659312344}, {'disease': '感染型気管支喘息', 'final_score': 16.52198765682412}, {'disease': '歯性上顎洞炎', 'final_score': 16.4951414767674}, {'disease': '喘息性気管支炎', 'final_score': 16.485849754359837}, {'disease': '咽頭扁桃炎', 'final_score': 




In [20]:
import json
# 保存先のパス
save_path = "disease_prediction_results.json"
# JSONとして保存
with open(save_path, 'w', encoding='utf-8') as f:
    json.dump(results, f, indent=4, ensure_ascii=False)
print(f"結果を保存しました: {save_path}")

結果を保存しました: disease_prediction_results.json


In [23]:
results

[{'chart_id': 30884675,
  'candidates': [{'disease': '鼻中隔弯曲症', 'final_score': 24.51811343559644},
   {'disease': '嗅覚障害', 'final_score': 23.951823632522512},
   {'disease': '通年性アレルギー性鼻炎', 'final_score': 22.181140563770562},
   {'disease': '鼻閉', 'final_score': 21.68134496309376},
   {'disease': '鼻副鼻腔腫瘍', 'final_score': 20.420821729107384},
   {'disease': '慢性副鼻腔炎', 'final_score': 19.477450018607914},
   {'disease': 'うっ血性鼻炎', 'final_score': 19.185372238607783},
   {'disease': '慢性鼻炎', 'final_score': 18.781722567854125},
   {'disease': '好酸球性副鼻腔炎', 'final_score': 18.66517348722773},
   {'disease': '急性副鼻腔炎', 'final_score': 17.33466757444597},
   {'disease': '耳管狭窄症', 'final_score': 17.318210692708327},
   {'disease': '鼻炎', 'final_score': 17.048999494962825},
   {'disease': '鼻汁', 'final_score': 16.852330659312344},
   {'disease': '感染型気管支喘息', 'final_score': 16.52198765682412},
   {'disease': '歯性上顎洞炎', 'final_score': 16.4951414767674},
   {'disease': '喘息性気管支炎', 'final_score': 16.485849754359837},


In [27]:
# 読み込み
with open(save_path, 'r', encoding='utf-8') as f:
    results = json.load(f)
print(f"読み込み完了: {len(results)} 件")

# 1. 正解データを辞書化 (chart_id -> [disease_names])
# cht_dis_df が Spark DataFrame の場合、collectして辞書にします
gt_rows = cht_dis_df.select("chart_id", "combined_disease_names").collect()
gt_map = {row.chart_id: row.combined_disease_names for row in gt_rows}
matched_ranks = []
unmatched_count_total = 0
total_ground_truth_diseases = 0

# --- デバッグ用コード ---
if results:
    sample_res_id = results[0]["chart_id"]
    print(f"results[0] chart_id: {sample_res_id} (Type: {type(sample_res_id)})")
else:
    print("results is empty")
print("\n--- cht_dis_df sample ---")
cht_dis_df.select("chart_id").show(5)
print(f"cht_dis_df schema: {cht_dis_df.schema['chart_id'].dataType}")
if gt_map:
    sample_gt_id = next(iter(gt_map))
    print(f"gt_map key sample: {sample_gt_id} (Type: {type(sample_gt_id)})")
    
# 3. 評価ループ (修正版)
for res in results:
    # chart_id を文字列に変換して検索キーにする
    cid = str(res["chart_id"])
    
    # 予測された病名リスト
    predicted_diseases = [item["disease"] for item in res["candidates"]]
    
    # 正解病名リストを取得 (キーは文字列)
    true_diseases = gt_map.get(cid, [])
    
    # 正解データがない場合はスキップ
    if not true_diseases:
        continue
        
    for true_disease in true_diseases:
        total_ground_truth_diseases += 1
        
        if true_disease in predicted_diseases:
            rank = predicted_diseases.index(true_disease) + 1
            matched_ranks.append(rank)
        else:
            unmatched_count_total += 1
# 4. 指標計算と表示
avg_rank = sum(matched_ranks) / len(matched_ranks) if matched_ranks else 0.0
print(f"=== 評価結果 ===")
print(f"マッチした病名の平均順位: {avg_rank:.2f}")
print(f"マッチしなかった正解病名の総数: {unmatched_count_total} / {total_ground_truth_diseases}")
if total_ground_truth_diseases > 0:
    coverage = (len(matched_ranks) / total_ground_truth_diseases * 100)
    print(f"カバー率 (Recall的な指標): {coverage:.2f}%")
else:
    print("カバー率 (Recall的な指標): N/A")

読み込み完了: 9541 件


                                                                                

results[0] chart_id: 30884675 (Type: <class 'int'>)

--- cht_dis_df sample ---


                                                                                

+--------+
|chart_id|
+--------+
|31676538|
|32532443|
|32660836|
|30900947|
|30893659|
+--------+
only showing top 5 rows
cht_dis_df schema: StringType()
gt_map key sample: 31676538 (Type: <class 'str'>)
=== 評価結果 ===
マッチした病名の平均順位: 12.63
マッチしなかった正解病名の総数: 21864 / 35485
カバー率 (Recall的な指標): 38.39%
