<a href="https://colab.research.google.com/github/hogara-ichika/senior_thesis/blob/main/qwen%2Bbert.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install transformers
!pip install datasets
!pip install evaluate
!pip install git+https://github.com/huggingface/accelerate
!pip install fugashi
!pip install ipadic
!pip install unidic-lite
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import classification_report,confusion_matrix, ConfusionMatrixDisplay
from imblearn.over_sampling import BorderlineSMOTE
from imblearn.under_sampling import RandomUnderSampler
from imblearn.pipeline import Pipeline
from sklearn.metrics import roc_curve, auc, RocCurveDisplay

In [None]:
import torch
from transformers import pipeline
import time # 処理時間の計測用

# -------------------------------------------------------------------------
# 1. LLMパイプラインの初期化（関数外で一度だけ行う）
# -------------------------------------------------------------------------
def initialize_pipeline(model_name="Qwen/Qwen2.5-3B-Instruct"):
    """
    モデルを指定して、Hugging Faceのpipelineを初期化します。
    """
    #print(f"--- {model_name} の読み込みを開始します... ---")
    try:
        pipe = pipeline(
            "text-generation",
            model=model_name,
            torch_dtype="auto", # FP16/BF16を自動選択
            device_map="auto" # GPUを自動割り当て
        )
        #print("--- パイプラインの準備が完了しました。---")
        return pipe
    except Exception as e:
        #print(f"--- パイプラインの初期化に失敗: {e} ---")
        return None



In [None]:
# -------------------------------------------------------------------------
# 2. プロンプトを生成し、LLMを実行する関数
# -------------------------------------------------------------------------
def create_chat_messages(data_list):
    """
    診察データ（リスト）を受け取り、LLMに渡すための
    「チャット履歴（messages）のリスト」を1件分作成して返します。
    """

    # --- 2-1. プロンプトのテンプレートを定義 ---

    # システムプロンプト (役割、指示、ルール)
    system_prompt = """
# 役割
あなたは医師のアシスタントです。患者の問診データを、診察用の自然なサマリー文章に変換してください。

# 指示
以下の「入力データ」を「ルールの定義」に従って、自然な日本語の文章に要約してください。「例文」を参考にして、同じような形式で出力してください。

# ルールの定義
* 0: その動作はしない又は使用しない
* 1: その動作は問題ない
* 2: その動作はやや困難
* 3: その動作は非常に困難
"""

    # メッセージの骨組み（例文を含む）
    messages = [
        {"role": "system", "content": system_prompt},

        # --- 例文1 ---※ここを変えてください
        {"role": "user", "content": "### 入力データ\n* シャンプー: 1\n* ドライヤー: 2\n* つり革: 2\n* 携帯電話: 1\n* 書字: 1"},
        {"role": "assistant", "content": "シャンプーは使用可能なようですが、ドライヤーの使用とつり革の使用はやや困難なようです。\nしかし携帯電話の使用と書字については問題ないようです。"},

        # --- 例文2 ---※ここを変えてください
        {"role": "user", "content": "### 入力データ\n* シャンプー: 3\n* ドライヤー: 3\n* つり革: 3\n* 携帯電話: 2\n* 書字: 1"},
        {"role": "assistant", "content": "シャンプーの使用、ドライヤーの使用、つり革の使用は非常に困難なようです。\nまた携帯電話の使用はやや困難なようですが、書字に関しては問題ないようです。"},
    ]

    # --- 2-2. 受け取ったデータから本番用の入力文字列を生成 ---
    # data_dict = {"シャンプー": 2, "ドライヤー": 3, ...}
    # を
    # "* シャンプー: 2\n* ドライヤー: 3\n..." という文字列に変換

    item = ["シャンプー","ドライヤー","つり革","携帯電話","書字"]

    data_lines = []
    for i, value in enumerate(data_list):
        data_lines.append(f"* {item[i]}: {value}")

    # \n (改行) で結合
    user_input_text = "### 入力データ\n" + "\n".join(data_lines)

    # --- 2-3. メッセージリストに本番データを追加 ---
    messages.append({"role": "user", "content": user_input_text})

    return messages


In [None]:
def generate_prompt(llm_pipe, all_patient_data):
  all_messages_list = [create_chat_messages(data) for data in all_patient_data]
  try:
    results = llm_pipe(
        all_messages_list,
        batch_size=16,
        max_new_tokens=512,
        do_sample=True,
        temperature=0.7,
        top_p=0.95,
        eos_token_id=llm_pipe.tokenizer.eos_token_id
    )



  except Exception as e:
    print(f"--- バッチ処理中にエラーが発生しました: {e} ---")
  output = []
  for result in results:
    output.append(result[0]["generated_text"][-1]["content"].strip())

  return output


In [None]:
from collections import Counter
df = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/data/machine_learning_TOS.csv', encoding='ms932', sep=',', skiprows=0)
t = df['Diagnosis'].values
features = df.iloc[:, 5:10]
cvt1 = {'非常に困難': 3, 'やや困難': 2, '問題なし': 1, 'しない': 0}
feature = []

for i, f in features.iterrows():
    num_vector = [cvt1[f['シャンプー']], cvt1[f['ドライヤー']], cvt1[f['つり革']],
                  cvt1[f['携帯電話']], cvt1[f['書字']]]
    feature.append(num_vector)

X_train, X_test, y_train, y_test = train_test_split(feature, t, stratify=t, random_state=42)

X_train = np.array(X_train)
y_train = np.array(y_train)

print(f"オーバーサンプリング前のクラス分布: {Counter(y_train)}")
print(f"オーバーサンプリング前の X_train の形状: {X_train.shape}")

rus = RandomUnderSampler(random_state=42)
X_test, y_test = rus.fit_resample(X_test, y_test)

smote = BorderlineSMOTE(random_state=42)

# 2. fit_resample を使ってオーバーサンプリングを実行
#    X_train と Y_train を渡し、新しい X と Y を受け取ります
X_train, y_train = smote.fit_resample(X_train, y_train)


# --- 結果の確認 ---
print("\n--- オーバーサンプリング後 ---")
print(f"オーバーサンプリング後のクラス分布: {Counter(y_train)}")
print(f"オーバーサンプリング後の X_train の形状: {X_train.shape}")

In [None]:
#かなり時間かかるので注意
llm_pipe = initialize_pipeline()
X_test=generate_prompt(llm_pipe,X_test)
texts=generate_prompt(llm_pipe,X_train)
t=y_train

In [None]:
import torch
import numpy as np
from datasets import Dataset, DatasetDict
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

# --- 2. データセットの準備 ---

# (1) データを「訓練用」と「検証用」に分割
# （データが少ないため、記事の「テスト用」は省略し、「検証用」で代用します）
# （データが多い場合は、train_test_splitを2回使い、訓練/検証/テストに分けてください）
SEED = 42
train_texts, val_texts, train_labels, val_labels = train_test_split(
    texts, t, test_size=0.2, random_state=SEED, stratify=t
)

# (2) Hugging FaceのDataset形式に変換
# Pythonの辞書(dict)としてデータを作成
train_data = {'text': train_texts, 'label': train_labels}
val_data = {'text': val_texts, 'label': val_labels}

# Datasetオブジェクトを作成
train_dataset_raw = Dataset.from_dict(train_data)
val_dataset_raw = Dataset.from_dict(val_data)

# (3) 訓練・検証データセットをまとめる
dataset = DatasetDict({
    'train': train_dataset_raw,
    'validation': val_dataset_raw
})

print("データセットの準備完了:")
print(dataset)

# --- 3. トークナイザの準備 ---
model_ckpt = "cl-tohoku/bert-base-japanese"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)

# --- 4. トークン化関数の定義と実行 ---
def tokenize(batch):
    # padding=True: バッチ内の最大長に合わせてパディング
    # truncation=True: モデルの最大長を超える場合はカット
    return tokenizer(batch["text"], padding=True, truncation=True)

# .map() を使ってデータセット全体をトークン化
# （記事と異なり、labelは既に含まれているのでtokenizerの結果をそのまま返す）
dataset_encoded = dataset.map(tokenize, batched=True)

# Trainerで使うために、不要な'text'カラムを削除
dataset_encoded = dataset_encoded.remove_columns(["text"])

# PyTorchテンソル形式に設定
dataset_encoded.set_format("torch")

print("\nトークン化後のデータセット:")
print(dataset_encoded)

# 使いやすいように変数に代入
small_train_dataset = dataset_encoded['train']
small_valid_dataset = dataset_encoded['validation']

In [None]:
# --- 5. モデルの読み込み ---

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# ★ クラス数を 2 に設定
num_labels = 2
batch_size = 4 # データが少ないのでバッチサイズを小さめに設定
model_name = "my-text-classification-bert" # モデル名（任意）

model = (AutoModelForSequenceClassification
         .from_pretrained(model_ckpt, num_labels=num_labels)
         .to(device))

# --- 6. 評価関数の定義 ---
# (記事と全く同じ)
def compute_metrics(pred):
    preds, labels = pred
    preds = preds.argmax(-1)

    # F1スコアのaverageを 'binary' に変更（2クラス分類の場合）
    f1 = f1_score(labels, preds, average="binary")
    acc = accuracy_score(labels, preds)
    return {"accuracy": acc, "f1": f1}

In [None]:
from transformers import TrainingArguments, Trainer

# --- 7. 学習パラメータの設定 ---

# 1エポックあたりのステップ数を計算
# (訓練データ数 / バッチサイズ) の切り上げ
steps_per_epoch = (len(small_train_dataset) + batch_size - 1) // batch_size

batch_size = 4
model_name = "my-text-classification-bert"

training_args = TrainingArguments(
    output_dir=model_name,
    num_train_epochs=3,
    learning_rate=2e-5,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    weight_decay=0.01,
    eval_steps=steps_per_epoch,     # 1エポック終了ごとに評価
    logging_steps=steps_per_epoch,  # 1エポック終了ごとにログ記録
    disable_tqdm=False,
    log_level="error",
    report_to="none"
)

# --- 8. Trainerの初期化と学習開始 ---
# （ここは変更なし）
trainer = Trainer(
    model=model,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=small_train_dataset,
    eval_dataset=small_valid_dataset,
    tokenizer=tokenizer
)

print("\n--- 学習を開始します ---")
trainer.train()
print("--- 学習が完了しました ---")

In [None]:
# (1) pipelineを作成
# 'pipeline'が未定義というエラーを解消
pipe = pipeline("text-classification", model=model, tokenizer=tokenizer, device=model.device.index)

# --- 1. 'pipe' を使って X_test のテキストを予測 ---
# (pipe は前のステップで作成・import済みと仮定)
# 'pipe' が 'pipeline' is not defined エラーになる場合は、
# from transformers import pipeline
# をこのセルの先頭で実行してください。

print("推論を実行中...")
pred_results = pipe(X_test)
print("推論完了。")

# --- 2. 予測結果を 0 と 1 のリストに変換 ---
# pipeの出力は {'label': 'LABEL_1', 'score': 0.99} のような形式です。
# 'LABEL_1' を 1 に、'LABEL_0' を 0 に変換します。

def convert_label_to_int(label_str):
    """'LABEL_1' を 1 に、'LABEL_0' を 0 に変換"""
    return 1 if label_str == 'LABEL_1' else 0

y_preds = [convert_label_to_int(result['label']) for result in pred_results]

# --- 3. 混同行列の計算と描画 ---
# 正解ラベルと予測ラベルを比較
print(f"正解ラベル (y_test): {y_test}")
print(f"予測ラベル (y_preds): {y_preds}")

# 混同行列を計算
# normalize="true" にすると、割合(%)で表示されます
cm = confusion_matrix(y_test, y_preds)
cm_custom = np.array([[cm[1,1], cm[1,0]],
                      [cm[0,1], cm[0,0]]])
labels = ['Positive', 'Negative']

# 混同行列を描画
fig, ax = plt.subplots(figsize=(6, 6))
disp = ConfusionMatrixDisplay(
    confusion_matrix=cm_custom,
    display_labels=labels
)

# グラフのカスタマイズ (記事のコードに準拠)
disp.plot(cmap="Blues")
plt.title("Confusion Matrix")
plt.show()

In [None]:
from sklearn.metrics import classification_report

# 1. クラスの表示名を定義します (混同行列で使ったものと同じ)
# (0 = Negative, 1 = Positive)
target_names = ["Negative (0)", "Positive (1)"]

# 2. レポートを生成
# 必要なのは y_test (正解) と y_preds (予測) だけです
report = classification_report(
    y_test,
    y_preds,
    target_names=target_names
)

# 3. レポートを出力
print(report)