In [5]:

# ---------- بخش 1: تنظیمات اولیه ----------
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from sklearn.preprocessing import LabelEncoder
from lightgbm import LGBMClassifier
import mlflow
from mlflow.models.signature import infer_signature
import joblib
import warnings
warnings.filterwarnings('ignore')

# نصب وابستگی‌ها
!pip install pandas pyarrow scikit-learn mlflow lightgbm confluent-kafka pyngrok --quiet

# نصب confluent-kafka برای رفع خطا
try:
    from confluent_kafka import Consumer
except ImportError:
    !pip install confluent-kafka
    from confluent_kafka import Consumer

# برای Airflow (در محیط غیر-Colab استفاده شود)
try:
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from datetime import datetime, timedelta
except ImportError:
    print("Airflow در Colab قابل اجرا نیست. DAG برای محیط Airflow تعریف شده است.")

# ---------- بخش 2: اتصال به Google Drive ----------
from google.colab import drive
drive.mount('/content/drive')

# ---------- بخش 3: تابع دریافت داده از Kafka ----------
def fetch_kafka_data():
    conf = {
    'bootstrap.servers': '0.tcp.ngrok.io:12345',  # آدرس ngrok
    'group.id': 'formation_damage_group',
    'auto.offset.reset': 'earliest'
}
    try:
        consumer = Consumer(conf)
        consumer.subscribe(['formation_damage_topic'])
        msg = consumer.poll(timeout=10.0)
        if msg is None:
            print("هیچ داده جدیدی از Kafka دریافت نشد.")
            return None
        data = pd.read_json(msg.value().decode('utf-8'))
        print("داده‌های جدید از Kafka دریافت شد.")
        return data
    except Exception as e:
        print(f"خطا در دریافت داده از Kafka: {str(e)}")
        return None
    finally:
        consumer.close()

# ---------- بخش 4: خواندن و پیش‌پردازش دیتاست ----------
def preprocess_data(file_path=None, kafka_data=None):
    columns_to_use = ['Formation', 'Fluid_Type', 'Completion_Type',
                      'Temperature_C', 'Pressure_psi', 'Permeability_mD',
                      'Porosity_pct', 'Damage_Type']

    # خواندن داده‌های اولیه یا جدید
    if file_path:
        df = pd.read_parquet(file_path, columns=columns_to_use)
        print("دیتاست اولیه با موفقیت بارگیری شد.")
        print(f"ابعاد اولیه دیتاست: {df.shape}")
    elif kafka_data is not None:
        df = kafka_data[columns_to_use]
        print("داده‌های جدید از Kafka بارگیری شد.")
        print(f"ابعاد داده جدید: {df.shape}")
    else:
        raise ValueError("هیچ منبعی برای داده مشخص نشده است.")

    # نمایش نمونه داده
    print("\nنمونه داده:")
    display(df.head(3))

    # پردازش مقادیر گمشده
    initial_count = len(df)
    df = df.dropna(subset=['Damage_Type', 'Permeability_mD', 'Porosity_pct'])
    print(f"\nتعداد رکوردهای حذف شده: {initial_count - len(df)}")
    print(f"ابعاد جدید دیتاست: {df.shape}")

    # کدگذاری ستون هدف
    label_encoder = LabelEncoder()
    df['Damage_Type'] = label_encoder.fit_transform(df['Damage_Type'])

    # انتخاب ستون‌های کیفی
    categorical_cols = ['Formation', 'Fluid_Type', 'Completion_Type']

    X = df.drop(columns=['Damage_Type'])
    y = df['Damage_Type']

    # نمونه‌گیری تصادفی برای داده‌های بزرگ
    if len(df) > 100000:
        sample_size = min(100000, len(df))
        X = X.sample(n=sample_size, random_state=42)
        y = y.loc[X.index]

    return X, y, label_encoder, categorical_cols

# ---------- بخش 5: آموزش و ارزیابی مدل ----------
def train_model(X, y, categorical_cols, previous_run_id=None):
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    print(f"\nتقسیم داده:")
    print(f"داده آموزش: {X_train.shape[0]} نمونه")
    print(f"داده آزمون: {X_test.shape[0]} نمونه")

    # تنظیم MLflow
    !mlflow server --host 0.0.0.0 --port 5000 --backend-store-uri /tmp/mlflow &>./mlflow.log &
    !sleep 3
    mlflow.set_tracking_uri("http://localhost:5000")
    mlflow.set_experiment("FormationDamageFast")

    # پارامترهای مدل
    model_params = {
        'n_estimators': 200,
        'max_depth': 7,
        'learning_rate': 0.1,
        'subsample': 0.8,
        'colsample_bytree': 0.8,
        'random_state': 42,
        'n_jobs': -1,
        'verbose': -1,
        'categorical_feature': categorical_cols  # مدیریت ستون‌های کتگوریکال
    }

    with mlflow.start_run():
        model = LGBMClassifier(**model_params)
        model.fit(X_train, y_train)

        # ارزیابی
        train_pred = model.predict(X_train)
        test_pred = model.predict(X_test)

        metrics = {
            'train_accuracy': accuracy_score(y_train, train_pred),
            'test_accuracy': accuracy_score(y_test, test_pred)
        }

        # مقایسه با مدل قبلی
        if previous_run_id:
            try:
                previous_model = mlflow.lightgbm.load_model(f"runs:/{previous_run_id}/model")
                previous_accuracy = accuracy_score(y_test, previous_model.predict(X_test))
                metrics['accuracy_improvement'] = metrics['test_accuracy'] - previous_accuracy
                print(f"بهبود دقت نسبت به مدل قبلی: {metrics['accuracy_improvement']:.4f}")
            except:
                print("مدل قبلی یافت نشد.")
                metrics['accuracy_improvement'] = 0.0

        # ثبت در MLflow
        signature = infer_signature(X_train, model.predict(X_train))
        mlflow.log_params(model_params)
        mlflow.log_metrics(metrics)
        mlflow.lightgbm.log_model(model, "model", signature=signature)

        print("\nنتایج نهایی:")
        print(f"دقت آموزش: {metrics['train_accuracy']:.4f}")
        print(f"دقت آزمون: {metrics['test_accuracy']:.4f}")
        print("\nگزارش کامل طبقه‌بندی:")
        print(classification_report(y_test, test_pred, target_names=label_encoder.classes_))

        return model, metrics, mlflow.active_run().info.run_id

# ---------- بخش 6: ذخیره‌سازی مدل و LabelEncoder ----------
def save_artifacts(model, label_encoder, metrics):
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    model_path = f"fast_formation_damage_model_{timestamp}.pkl"
    encoder_path = f"label_encoder_{timestamp}.pkl"

    joblib.dump(model, model_path, compress=3)
    joblib.dump(label_encoder, encoder_path)
    !cp "{model_path}" "/content/drive/MyDrive/"
    !cp "{encoder_path}" "/content/drive/MyDrive/"

    print(f"\nمدل در مسیر زیر ذخیره شد: /content/drive/MyDrive/{model_path}")
    print(f"LabelEncoder در مسیر زیر ذخیره شد: /content/drive/MyDrive/{encoder_path}")

# ---------- بخش 7: تعریف DAG برای Airflow (برای محیط غیر-Colab) ----------
try:
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }

    with DAG(
        'formation_damage_retraining',
        default_args=default_args,
        description='Retraining Formation Damage Model every 7 days',
        schedule_interval=timedelta(days=7),
        start_date=datetime(2025, 5, 22),
        catchup=False,
    ) as dag:

        def airflow_fetch_data():
            kafka_data = fetch_kafka_data()
            if kafka_data is not None:
                X, y, label_encoder, categorical_cols = preprocess_data(kafka_data=kafka_data)
                return X, y, label_encoder, categorical_cols
            else:
                file_path = '/content/drive/MyDrive/formation_damage_optimized.parquet'
                return preprocess_data(file_path=file_path)

        def airflow_train_model(ti):
            X, y, label_encoder, categorical_cols = ti.xcom_pull(task_ids='fetch_data')
            previous_run_id = ti.xcom_pull(task_ids='train_model', key='run_id') if ti.xcom_pull(task_ids='train_model') else None
            model, metrics, run_id = train_model(X, y, categorical_cols, previous_run_id)
            ti.xcom_push(key='run_id', value=run_id)
            return model, label_encoder, metrics

        def airflow_save_artifacts(ti):
            model, label_encoder, metrics = ti.xcom_pull(task_ids='train_model')
            save_artifacts(model, label_encoder, metrics)

        task_fetch = PythonOperator(
            task_id='fetch_data',
            python_callable=airflow_fetch_data,
            provide_context=True
        )

        task_train = PythonOperator(
            task_id='train_model',
            python_callable=airflow_train_model,
            provide_context=True
        )

        task_save = PythonOperator(
            task_id='save_artifacts',
            python_callable=airflow_save_artifacts,
            provide_context=True
        )

        task_fetch >> task_train >> task_save
except:
    print("DAG فقط برای نمایش ساختار تعریف شده و در محیط Airflow قابل اجرا است.")

# ---------- بخش 8: اجرای اولیه برای تست در Colab ----------
if __name__ == "__main__":
    try:
        # تلاش برای دریافت داده از Kafka
        kafka_data = fetch_kafka_data()
        if kafka_data is None:
            print("دریافت داده از Kafka ناموفق بود. استفاده از فایل پاراکت...")
            file_path = '/content/drive/MyDrive/formation_damage_optimized.parquet'
            X, y, label_encoder, categorical_cols = preprocess_data(file_path=file_path)
        else:
            X, y, label_encoder, categorical_cols = preprocess_data(kafka_data=kafka_data)

        # آموزش مدل
        model, metrics, run_id = train_model(X, y, categorical_cols)

        # ذخیره‌سازی
        save_artifacts(model, label_encoder, metrics)

        # راه‌اندازی ngrok برای MLflow
        !pip install pyngrok --quiet
        from pyngrok import ngrok
        public_url = ngrok.connect(5000)
        print(f"MLflow URL: {public_url}")

        from IPython.display import IFrame
        IFrame(src="http://localhost:5000", width="100%", height=600)
    except Exception as e:
        print(f"خطا در اجرای اولیه: {str(e)}")

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/3.8 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.6/3.8 MB[0m [31m16.4 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━[0m [32m3.3/3.8 MB[0m [31m46.8 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.8/3.8 MB[0m [31m35.8 MB/s[0m eta [36m0:00:00[0m
[?25hAirflow در Colab قابل اجرا نیست. DAG برای محیط Airflow تعریف شده است.
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
DAG فقط برای نمایش ساختار تعریف شده و در محیط Airflow قابل اجرا است.
هیچ داده جدیدی از Kafka دریافت نشد.
دریافت داده از Kafka ناموفق بود. استفاده از فایل پاراکت...
دیتاست اولیه با موفقیت بارگیری شد.
ابعاد اولیه دیتاست: (10500000, 8)

نمونه داده:


Unnamed: 0,Formation,Fluid_Type,Completion_Type,Temperature_C,Pressure_psi,Permeability_mD,Porosity_pct,Damage_Type
0,Dolomite,Oil-Based,Liner,103.802612,2508.131348,382.382812,22.198469,Filtration Problem
1,Shale,Water-Based,Liner,328.736755,17978.121094,1586.557373,3.340905,Corrosion Cracking
2,Dolomite,Water-Based,Perforated,80.249832,5548.513184,425.555878,11.398964,Corrosion Cracking



تعداد رکوردهای حذف شده: 0
ابعاد جدید دیتاست: (10500000, 8)

تقسیم داده:
داده آموزش: 80000 نمونه
داده آزمون: 20000 نمونه

نتایج نهایی:
دقت آموزش: 0.4374
دقت آزمون: 0.1239

گزارش کامل طبقه‌بندی:
                        precision    recall  f1-score   support

           Clay & Iron       0.09      0.07      0.08      1917
      Completion Issue       0.10      0.12      0.11      1927
    Corrosion Cracking       0.21      0.21      0.21      2415
       Drilling Damage       0.10      0.10      0.10      1846
              Emulsion       0.11      0.11      0.11      1899
    Filtration Problem       0.11      0.12      0.12      1895
 Fluid Incompatibility       0.19      0.15      0.17      2521
            Fluid Loss       0.10      0.10      0.10      1885
Rock/Fluid Interaction       0.10      0.11      0.10      1815
     Ultra-Clean Fluid       0.11      0.12      0.12      1880

              accuracy                           0.12     20000
             macro avg       0.12   

ERROR:pyngrok.process.ngrok:t=2025-05-22T07:57:55+0000 lvl=eror msg="failed to reconnect session" obj=tunnels.session err="authentication failed: Usage of ngrok requires a verified account and authtoken.\n\nSign up for an account: https://dashboard.ngrok.com/signup\nInstall your authtoken: https://dashboard.ngrok.com/get-started/your-authtoken\r\n\r\nERR_NGROK_4018\r\n"


خطا در اجرای اولیه: The ngrok process errored on start: authentication failed: Usage of ngrok requires a verified account and authtoken.\n\nSign up for an account: https://dashboard.ngrok.com/signup\nInstall your authtoken: https://dashboard.ngrok.com/get-started/your-authtoken\r\n\r\nERR_NGROK_4018\r\n.
