In [28]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.feature_extraction.text import TfidfVectorizer
from lightgbm import LGBMClassifier
from tqdm import tqdm
import joblib
import os
from copy import deepcopy
from datetime import datetime
from scipy.sparse import hstack, csr_matrix
from lightgbm import LGBMClassifier, early_stopping
from sklearn.metrics import f1_score, precision_score, recall_score, hamming_loss, label_ranking_average_precision_score

In [29]:
# ------------------ Load Base Data ------------------
df = pd.read_csv("Data_Final.csv")
df.columns = df.columns.str.strip()
df = df.dropna(subset=["SampleName", "MasterTestCode"])

In [None]:
# ------------------ Feature Engineering: Final with Time & Count Weights + SampleName Text ------------------
# Ensure binary columns
for col in ["FeBase", "Destruct", "IsLarge"]:
    df[col] = df[col].fillna(0).astype(int)

# Load dependency info
strong_deps = pd.read_csv("strong_test_dependencies.csv")
strong_counts = strong_deps.groupby("Test1").size().reset_index(name="StrongDepCount")
df = df.merge(strong_counts, left_on="MasterTestCode", right_on="Test1", how="left")
df["StrongDepCount"] = df["StrongDepCount"].fillna(0)

# Mean physical attributes by MasterTestCode
avg_phys = df.groupby("MasterTestCode")[["FeBase", "Destruct", "IsLarge"]].mean().reset_index()
df = df.merge(avg_phys, on="MasterTestCode", suffixes=("", "_mean"))

# --- Time-based Features ---
df["MaxDate"] = pd.to_datetime(df["MaxDate"], errors="coerce")
now = pd.Timestamp.now()
df["TestAgeDays"] = (now - df["MaxDate"]).dt.days.clip(lower=1)
df["TimeWeight"] = 1 / df["TestAgeDays"]

# --- Count-based Features ---
df["TestImportance"] = df["TestCount"] * df["TimeWeight"]
df["LogTotalCount"] = np.log1p(df["TotalCount"])

# --- Weighted Feature Columns ---
df["WF_FeBase"] = df["FeBase"] * df["TestImportance"]
df["WF_Destruct"] = df["Destruct"] * df["TestImportance"]
df["WF_IsLarge"] = df["IsLarge"] * df["TestImportance"]
df["WF_StrongDep"] = df["StrongDepCount"] * df["TestImportance"]

# --- Group by SampleName ---
sample_features = df.groupby("SampleName").agg({
    "MasterTestCode": lambda x: " ".join(str(i) for i in x.dropna()),
    "WF_FeBase": "mean",
    "WF_Destruct": "mean",
    "WF_IsLarge": "mean",
    "WF_StrongDep": "mean",
    "FeBase_mean": "mean",
    "Destruct_mean": "mean",
    "IsLarge_mean": "mean",
    "LogTotalCount": "mean"
})

sample_features.rename(columns={
    "WF_FeBase": "FeBase_weighted",
    "WF_Destruct": "Destruct_weighted",
    "WF_IsLarge": "IsLarge_weighted",
    "WF_StrongDep": "StrongDepCount_weighted"
}, inplace=True)

# --- TF-IDF of SampleName (for similarity like پیچ m24 vs m42) ---
sample_features["SampleName"] = sample_features.index.astype(str)
sample_vectorizer = TfidfVectorizer(analyzer='word', ngram_range=(1, 2))
X_sample_name_text = sample_vectorizer.fit_transform(sample_features["SampleName"])

print("✅ فیچرهای نهایی با زمان، تکرار و متن SampleName ساخته شدند.")

In [None]:
# ------------------ Filter Rare Labels Before Training ------------------
label_counts = df.groupby("MasterTestCode")["SampleName"].nunique()
threshold = 20  # آستانه: کمتر از 20 نمونه
rare_labels = label_counts[label_counts < threshold].index

print(f"✅ تعداد برچسب‌های نادر شناسایی‌شده: {len(rare_labels)}")
print(f"✅ تعداد رکوردهای قبل از فیلترینگ: {len(df)}")

# فیلتر کردن داده‌ها
df_filtered = df[~df["MasterTestCode"].isin(rare_labels)].copy()

print(f"✅ تعداد رکوردهای باقیمانده پس از فیلترینگ: {len(df_filtered)}")
print(f"✅ درصد کاهش داده: {100 * (1 - len(df_filtered)/len(df)):.2f}%")

# جایگزین کردن دیتافریم اصلی با نسخه فیلترشده
df = df_filtered

In [None]:
# بررسی نمونه‌هایی که بعد از فیلترینگ بدون برچسب مانده‌اند
empty_samples = df.groupby("SampleName")["MasterTestCode"].apply(lambda x: len(set(x))==0)
empty_samples = empty_samples[empty_samples].index.tolist()
if empty_samples:
    print(f"⚠️ هشدار: {len(empty_samples)} نمونه بدون برچسب پس از فیلترینگ باقی مانده است.")
    # حذف نمونه‌های بدون برچسب
    df = df[~df["SampleName"].isin(empty_samples)]
    print(f"✅ نمونه‌های بدون برچسب حذف شدند. تعداد رکوردهای نهایی: {len(df)}")

In [None]:
# ------------------ Vectorize SampleName ------------------
# حذف نمونه‌های تکراری و تنظیم index روی SampleName
df_unique = df.drop_duplicates(subset="SampleName").set_index("SampleName")

# اطمینان از هم‌ترازی کامل بین df و sample_features روی SampleName
common_samples = df_unique.index.intersection(sample_features.index)

# فیلتر کردن هر دو جدول
df_grouped = df_unique.loc[common_samples]
sample_features = sample_features.loc[common_samples]

# ساخت ویژگی متنی از نام نمونه‌ها (SampleName) با TF-IDF
vectorizer = TfidfVectorizer(analyzer="word", ngram_range=(1, 2), max_features=100)
X_sample_name_text = vectorizer.fit_transform(df_grouped.index.astype(str))

print(f"✅ X_sample_name_text ساخته شد. شکل: {X_sample_name_text.shape}")

In [None]:
# ------------------ ویژگی‌های عددی نهایی ------------------
from scipy.sparse import csr_matrix

X_numeric = sample_features[[ 
    "FeBase_weighted", "Destruct_weighted", "IsLarge_weighted",
    "StrongDepCount_weighted",
    "FeBase_mean", "Destruct_mean", "IsLarge_mean",
    "LogTotalCount"
]].fillna(0).values

X_numeric_sparse = csr_matrix(X_numeric)

In [None]:
# ------------------ Prepare Labels ------------------
grouped = df.groupby("SampleName")["MasterTestCode"].apply(set).reset_index()
mlb = MultiLabelBinarizer()
y_all = mlb.fit_transform(grouped["MasterTestCode"])

In [None]:
# ------------------ ترکیب نهایی همه ویژگی‌ها ------------------
X_all = hstack([X_sample_name_text, X_numeric_sparse])
print(f"🔢 شکل نهایی فیچرها: {X_all.shape}")

In [None]:
# ------------------ تقسیم‌بندی ------------------
X_train, X_test, y_train, y_test = train_test_split(X_all, y_all, test_size=0.1, random_state=42)

In [None]:
# ------------------ حذف کلاس‌های نادر با positiveهای خیلی کم ------------------
MIN_POS_COUNT = 60

# ------------------ محاسبه تعداد positive برای هر کلاس ------------------
label_pos_counts = y_all.sum(axis=0)
all_classes = np.array(mlb.classes_)

# حذف کلاس‌هایی که تعداد مثبت‌هاشون خیلی کمه
MIN_POS_COUNT = 80
valid_mask = label_pos_counts >= MIN_POS_COUNT
print(f"✅ تعداد کلاس‌های معتبر پس از فیلتر: {valid_mask.sum()} از {y_all.shape[1]}")

# فیلتر کردن بر اساس mask
y_all_filtered = y_all[:, valid_mask]
mlb_filtered = deepcopy(mlb)
mlb_filtered.classes_ = all_classes[valid_mask]

# ------------------ تنظیم min_data_in_leaf ------------------
avg_positives_per_class = np.mean(label_pos_counts[valid_mask])
min_data_leaf_value = max(10, int(avg_positives_per_class * 0.05))
print(f"🔧 مقدار تنظیم‌شده min_data_in_leaf: {min_data_leaf_value}")

In [None]:
# ------------------ آموزش مدل‌ها ------------------
models = []
print("🚀 شروع آموزش مدل‌ها...")

for i in tqdm(range(y_all_filtered.shape[1]), desc="آموزش مدل‌ها", unit="کلاس"):
    X_train, X_val, y_train_i, y_val_i = train_test_split(
        X_all, y_all_filtered[:, i], test_size=0.2, random_state=42
    )

    model = LGBMClassifier(
        n_estimators=300,
        learning_rate=0.05,
        random_state=42,
        n_jobs=-1,
        is_unbalance=True,
        min_data_in_leaf=min_data_leaf_value
    )

    model.fit(
        X_train, y_train_i,
        eval_set=[(X_val, y_val_i)],
        eval_metric="binary_logloss",
        callbacks=[early_stopping(stopping_rounds=20)],
    )

    models.append(model)

# ------------------ ذخیره مدل‌ها و ابزارها ------------------
os.makedirs("model_output_new", exist_ok=True)
joblib.dump(models, "model_output_new/lightgbm_models.pkl")
joblib.dump(vectorizer, "model_output_new/vectorizer.pkl")
joblib.dump(mlb_filtered, "model_output_new/label_binarizer.pkl")

print("✅ آموزش مدل و ذخیره‌سازی با موفقیت انجام شد.")


In [None]:
# تقسیم داده برای کل برچسب‌ها (فقط یک‌بار)
X_train_all, X_test_all, y_train_all, y_test_all = train_test_split(
    X_all, y_all_filtered, test_size=0.2, random_state=42
)

# پیش‌بینی برای هر مدل روی X_test_all
y_pred = np.zeros_like(y_test_all)
y_scores = np.zeros_like(y_test_all, dtype=float)

for i, model in enumerate(models):
    y_pred[:, i] = model.predict(X_test_all)
    try:
        y_scores[:, i] = model.predict_proba(X_test_all)[:, 1]
    except:
        y_scores[:, i] = 0.0  # اگر predict_proba نده (مثلاً مدل خراب باشه)

# محاسبه شاخص‌ها
micro_f1 = f1_score(y_test_all, y_pred, average='micro')
micro_precision = precision_score(y_test_all, y_pred, average='micro')
micro_recall = recall_score(y_test_all, y_pred, average='micro')
lrap_score = label_ranking_average_precision_score(y_test_all, y_scores)
hamming = hamming_loss(y_test_all, y_pred)

# خروجی نهایی
results = {
    "Micro F1": micro_f1,
    "Micro Precision": micro_precision,
    "Micro Recall": micro_recall,
    "LRAP": lrap_score,
    "Hamming Loss": hamming
}

print("✅ نتایج ارزیابی مدل:")
for k, v in results.items():
    print(f"{k}: {v:.4f}")