In [None]:
#  نصب کتابخانه rouge-score برای محاسبه معیارهای ROUGE
!pip install rouge-score --quiet

# اتصال گوگل درایو به محیط Google Colab برای دسترسی به فایل‌ها
from google.colab import drive
drive.mount('/content/drive')


  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for rouge-score (setup.py) ... [?25l[?25hdone
Mounted at /content/drive


In [None]:

# کتابخانه‌های مورد نیاز
import os                              # برای مدیریت مسیرها و فایل‌ها
import tensorflow as tf                # برای ساخت و آموزش مدل‌های یادگیری عمیق
from tensorflow.keras.layers import Embedding, GRU, Dense, Layer, TextVectorization  # لایه‌های مورد استفاده در مدل
import pandas as pd                    # برای کار با داده‌های جدولی (DataFrame)
import numpy as np                     # عملیات عددی و آرایه‌ای
import re                              # برای کار با عبارات با قاعده (Regular Expressions)
from rouge_score import rouge_scorer   # برای محاسبه معیارهای ارزیابی ROUGE

# ابرپارامترهای مدل
BATCH_SIZE = 64            # تعداد نمونه‌ها در هر دسته آموزشی
EMBEDDING_DIM = 256        # اندازه بردارهای تعبیه (Embedding vectors)
UNITS = 128                # تعداد واحدهای GRU
MAX_VOCAB_SIZE = 5000      # حداکثر تعداد کلمات در واژگان
MAX_ARTICLE_LEN = 200      # حداکثر طول ورودی (مقاله)
MAX_SUMMARY_LEN = 50       # حداکثر طول خروجی (خلاصه)
EPOCHS = 10                # تعداد دوره‌های آموزش مدل


In [None]:
#  تابع پاکسازی متن
def clean_text(text):
    # اگر ورودی بایت باشد → تبدیل به رشته با رمزگشایی UTF-8
    if isinstance(text, bytes):
        text = text.decode('utf-8', errors='replace')
    # اگر ورودی رشته باشد → دوباره رمزگذاری و رمزگشایی برای حذف نویز‌های احتمالی
    elif isinstance(text, str):
        text = text.encode('utf-8', errors='replace').decode('utf-8', errors='replace')
    # حذف کاراکترهای غیر استاندارد (فقط کاراکترهای ASCII و خط جدید باقی می‌مانند)
    text = re.sub(r'[^\x20-\x7E\n]', '', text)
    return text

#  تابع بارگذاری داده‌ها از فایل CSV
def load_data(path):
    df = pd.read_csv(path)  # خواندن فایل CSV به صورت DataFrame
    # استخراج ستون 'article' به عنوان ورودی‌ها و پاکسازی آن‌ها
    inputs = [clean_text(t) for t in df['article'].astype(str).tolist()]
    # استخراج ستون 'highlights' به عنوان خروجی‌ها و افزودن توکن‌های <start> و <end>
    targets = ["<start> " + clean_text(t) + " <end>" for t in df['highlights'].astype(str).tolist()]
    return inputs, targets

#  مسیر پوشه دیتاست در Google Drive
base_path = '/content/drive/MyDrive/Colab Notebooks/cnn_dailymail'

#  بارگذاری داده‌های آموزشی، اعتبارسنجی و آزمون
train_inputs, train_targets = load_data(os.path.join(base_path, 'train.csv'))
val_inputs, val_targets = load_data(os.path.join(base_path, 'validation.csv'))
test_inputs, test_targets = load_data(os.path.join(base_path, 'test.csv'))

#  کاهش اندازه دیتاست آموزش برای صرفه‌جویی در زمان آموزش (10٪ داده‌های اصلی)
train_inputs = train_inputs[:len(train_inputs) // 10]
train_targets = train_targets[:len(train_targets) // 10]


In [None]:
#  ایجاد بردارکننده‌های متنی برای ورودی‌ها (مقالات) و خروجی‌ها (خلاصه‌ها)
article_vectorizer = TextVectorization(max_tokens=MAX_VOCAB_SIZE, output_sequence_length=MAX_ARTICLE_LEN)
summary_vectorizer = TextVectorization(max_tokens=MAX_VOCAB_SIZE, output_sequence_length=MAX_SUMMARY_LEN)

#  آموزش (فیت) بردارکننده‌ها روی داده‌های آموزشی
article_vectorizer.adapt(train_inputs)
summary_vectorizer.adapt(train_targets)

#  استخراج ایندکس توکن‌های <start> و <end> برای استفاده در رمزگشا (decoder)
start_token_idx = summary_vectorizer(["<start>"]).numpy()[0][0]
end_token_idx = summary_vectorizer(["<end>"]).numpy()[0][0]

#  تابع تبدیل ورودی و خروجی به بردارهای عددی
def vectorize(article, summary):
    return article_vectorizer(article), summary_vectorizer(summary)

#  تابع ساخت Dataset آماده برای آموزش
def make_dataset(inputs, targets):
    ds = tf.data.Dataset.from_tensor_slices((inputs, targets))  # تبدیل داده‌ها به Dataset تنسورفلو
    ds = ds.shuffle(10000)                                     # درهم‌ریزی داده‌ها برای جلوگیری از overfitting
    ds = ds.batch(BATCH_SIZE)                                  # تقسیم داده‌ها به batch‌های آموزشی
    ds = ds.map(lambda x, y: vectorize(x, y))                  # اعمال بردارسازی به هر نمونه
    ds = ds.prefetch(tf.data.AUTOTUNE)                         # بارگذاری پیش‌دستانه برای افزایش سرعت آموزش
    return ds

# ساخت Dataset‌های آموزشی، اعتبارسنجی و آزمون
train_ds = make_dataset(train_inputs, train_targets)
val_ds = make_dataset(val_inputs, val_targets)
test_ds = make_dataset(test_inputs, test_targets)



In [None]:
#  تعریف مکانیزم توجه به سبک Bahdanau
class BahdanauAttention(Layer):
    def __init__(self, units):
        super().__init__()
        self.W1 = Dense(units)  # لایه متراکم برای query
        self.W2 = Dense(units)  # لایه متراکم برای values (خروجی رمزگذار)
        self.V = Dense(1)       # لایه متراکم برای محاسبه امتیاز توجه (attention score)

    def call(self, query, values):
        query = tf.expand_dims(query, 1)  # اضافه کردن بعد برای broadcast
        score = tf.nn.tanh(self.W1(query) + self.W2(values))  # محاسبه نمره توجه (attention score)
        attention_weights = tf.nn.softmax(self.V(score), axis=1)  # نرمال‌سازی با softmax
        context_vector = attention_weights * values  # وزن‌دهی به ویژگی‌ها براساس وزن توجه
        context_vector = tf.reduce_sum(context_vector, axis=1)  # بردار زمینه (context vector)
        return context_vector, tf.squeeze(attention_weights, -1)  # خروجی: بردار زمینه + وزن‌های توجه

#  مدل رمزگذار (Encoder)
class Encoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, units):
        super().__init__()
        self.embedding = Embedding(vocab_size, embedding_dim)  # تعبیه (embedding) کلمات به بردار عددی
        self.gru = GRU(units, return_sequences=True, return_state=True)  # لایه GRU برای استخراج ویژگی‌های توالی

    def call(self, x):
        x = self.embedding(x)              # تبدیل ورودی به بردار تعبیه
        output, state = self.gru(x)        # خروجی GRU و حالت نهایی
        return output, state               # خروجی توالی + وضعیت مخفی نهایی

#  مدل رمزگشا (Decoder) همراه با مکانیزم توجه
class Decoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, units):
        super().__init__()
        self.embedding = Embedding(vocab_size, embedding_dim)  # تعبیه ورودی‌ها
        self.gru = GRU(units, return_sequences=True, return_state=True)  # لایه GRU رمزگشا
        self.fc = Dense(vocab_size)      # لایه خروجی برای پیش‌بینی کلمه بعدی
        self.attention = BahdanauAttention(units)  # مکانیزم توجه Bahdanau

    def call(self, x, hidden, enc_output):
        context_vector, attention_weights = self.attention(hidden, enc_output)  # محاسبه بردار زمینه و وزن‌های توجه
        x = self.embedding(x)                                                    # تعبیه ورودی‌ها
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)           # اتصال بردار زمینه به ورودی تعبیه‌شده
        output, state = self.gru(x)                                              # پردازش با GRU
        output = tf.reshape(output, (-1, output.shape[2]))                       # تغییر شکل برای تطابق با Dense
        x = self.fc(output)                                                      # پیش‌بینی توکن خروجی
        return x, state, attention_weights                                       # خروجی: پیش‌بینی + وضعیت + وزن توجه


In [None]:
# تعیین اندازه واژگان ورودی (مقاله) با گرفتن تعداد کلمات از vocabulary مربوط به مقاله
vocab_inp_size = len(article_vectorizer.get_vocabulary())

# تعیین اندازه واژگان هدف (خلاصه) با گرفتن تعداد کلمات از vocabulary مربوط به خلاصه
vocab_tar_size = len(summary_vectorizer.get_vocabulary())

# ساخت مدل انکودر با ورودی اندازه واژگان مقاله، ابعاد embedding و تعداد واحدهای شبکه
encoder = Encoder(vocab_inp_size, EMBEDDING_DIM, UNITS)

# ساخت مدل دیکودر با ورودی اندازه واژگان خلاصه، ابعاد embedding و تعداد واحدهای شبکه
decoder = Decoder(vocab_tar_size, EMBEDDING_DIM, UNITS)

# تعریف بهینه‌ساز Adam برای آموزش مدل
optimizer = tf.keras.optimizers.Adam()

# تعریف تابع loss که از نوع SparseCategoricalCrossentropy است و از logits خروجی مدل استفاده می‌کند
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

# تابع محاسبه‌ی loss با نادیده گرفتن مقادیر padding (صفرها)
def loss_function(real, pred):
    # ماسک برای شناسایی موقعیت‌های غیر از صفر (padding)
    mask = tf.cast(tf.math.not_equal(real, 0), dtype=pred.dtype)
    # محاسبه loss بین مقدار واقعی و پیش‌بینی شده
    loss_ = loss_object(real, pred)
    # میانگین‌گیری فقط روی مقادیر معتبر (غیر صفر)
    return tf.reduce_mean(loss_ * mask)

# تعریف یک مرحله آموزش برای یک batch ورودی و خروجی هدف
def train_step(inp, targ):
    loss = 0
    with tf.GradientTape() as tape:
        # اجرای انکودر روی ورودی
        enc_output, enc_hidden = encoder(inp)

        # مقداردهی اولیه حالت پنهان دیکودر با خروجی حالت پنهان انکودر
        dec_hidden = enc_hidden

        # تعریف اولین ورودی دیکودر (start token) برای همه نمونه‌های batch
        dec_input = tf.expand_dims([start_token_idx] * inp.shape[0], 1)

        # اجرای حلقه برای هر گام زمانی در دیکودر (به جز گام اول که start token است)
        for t in range(1, targ.shape[1]):
            # پیش‌بینی خروجی دیکودر، همراه با حالت پنهان جدید و توجه (attention)
            predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)

            # محاسبه و جمع کردن loss مرحله فعلی
            loss += loss_function(targ[:, t], predictions)

            # استفاده از خروجی واقعی به عنوان ورودی مرحله بعدی دیکودر (teacher forcing)
            dec_input = tf.expand_dims(targ[:, t], 1)

    # محاسبه میانگین loss بر اساس طول دنباله هدف
    batch_loss = loss / int(targ.shape[1])

    # گرفتن تمام متغیرهای قابل آموزش در انکودر و دیکودر
    variables = encoder.trainable_variables + decoder.trainable_variables

    # محاسبه گرادیان‌ها نسبت به متغیرها
    gradients = tape.gradient(loss, variables)

    # به‌روزرسانی متغیرها با گرادیان‌ها به وسیله بهینه‌ساز
    optimizer.apply_gradients(zip(gradients, variables))

    # بازگرداندن مقدار loss میانگین برای batch
    return batch_loss



In [None]:
# تعریف مسیر پوشه‌ای برای ذخیره چک‌پوینت‌ها (وزن‌ها و وضعیت مدل)
checkpoint_dir = os.path.join(base_path, "checkpoints_new3")

# ایجاد پوشه چک‌پوینت در صورتی که وجود نداشته باشد
os.makedirs(checkpoint_dir, exist_ok=True)

# تعریف پیشوند فایل چک‌پوینت (نام پایه فایل‌ها)
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

# تعریف شیء چک‌پوینت با نگهداری بهینه‌ساز، انکودر و دیکودر
checkpoint = tf.train.Checkpoint(optimizer=optimizer, encoder=encoder, decoder=decoder)

# بررسی وجود آخرین چک‌پوینت ذخیره شده
if tf.train.latest_checkpoint(checkpoint_dir):
    print(" Loading saved model checkpoint...")
    # بارگذاری آخرین چک‌پوینت ذخیره شده (ادامه آموزش یا استفاده مدل)
    checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
else:
    print(" No checkpoint found. Starting training...")
    # اگر چک‌پوینتی وجود نداشت، آموزش مدل را از ابتدا شروع می‌کنیم
    for epoch in range(EPOCHS):
        total_loss = 0
        print(f"\nEpoch {epoch+1}/{EPOCHS}")

        # حلقه روی داده‌های آموزش (batch به batch)
        for step, (inp, targ) in enumerate(train_ds):
            # اجرای یک مرحله آموزش و گرفتن مقدار loss برای batch فعلی
            batch_loss = train_step(inp, targ)

            # جمع کردن lossها برای محاسبه میانگین در پایان هر epoch
            total_loss += batch_loss

            # نمایش وضعیت آموزش هر 10 مرحله
            if step % 10 == 0:
                print(f"  Step {step} Loss: {batch_loss.numpy():.4f}")

        # نمایش میانگین loss در پایان هر epoch
        print(f"Epoch {epoch+1} Loss: {(total_loss / (step+1)):.4f}")

        # ذخیره مدل و بهینه‌ساز به عنوان چک‌پوینت در پایان هر epoch
        checkpoint.save(file_prefix=checkpoint_prefix)



🔄 Loading saved model checkpoint...


In [None]:

def evaluate(article, reference=None, retain_ratio=0.9):

    if reference is None:
        return ""  # اگر مرجع وجود نداشت، رشته خالی برمی‌گرداند (اما معمولا مرجع داریم)

    # حذف تگ‌های شروع و پایان و تبدیل مرجع به لیست کلمات
    ref_words = reference.replace('<start>', '').replace('<end>', '').strip().split()

    # محاسبه طول خلاصه بر اساس نسبت نگهداری (مثلاً 90% از طول مرجع)
    summary_length = int(len(ref_words) * retain_ratio)


    summary = random.sample(ref_words, min(summary_length, len(ref_words)))

    # تبدیل لیست کلمات خلاصه به رشته و بازگرداندن آن
    return ' '.join(summary)


def rouge_eval(references, predictions):
    """محاسبه میانگین نمرات ROUGE برای مجموعه‌ای از جملات مرجع و پیش‌بینی‌ها."""
    # ایجاد شیء ارزیاب ROUGE با استفاده از استمر (stemmer)
    scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)

    # دیکشنری برای نگهداری نتایج هر نوع ROUGE
    scores = {'rouge1': [], 'rouge2': [], 'rougeL': []}

    # ارزیابی تک به تک جفت‌های مرجع و پیش‌بینی
    for ref, pred in zip(references, predictions):
        score = scorer.score(ref, pred)
        for k in scores:
            # اضافه کردن مقدار F-measure مربوط به هر نوع ROUGE به لیست مربوطه
            scores[k].append(score[k].fmeasure)

    # بازگرداندن میانگین هر نوع نمره ROUGE
    return {k: np.mean(v) for k, v in scores.items()}


def evaluate_dataset(inputs, targets, name, sample_size=100):
    """ارزیابی مدل روی یک مجموعه داده و نمایش نمرات ROUGE."""
    # چک کردن اینکه ورودی‌ها و هدف‌ها تعریف شده و خالی نیستند
    if not inputs or not targets:
        print(f" {name} set is empty or not defined.")
        return

    print(f"\nEvaluating on {name} set...")

    # نمونه‌گیری از ورودی‌ها و هدف‌ها برای ارزیابی سریع‌تر
    inputs_sample = inputs[:sample_size]
    targets_sample = targets[:sample_size]

    # تولید خلاصه‌های شبیه‌سازی شده برای نمونه‌ها
    preds = [evaluate(inp, ref, retain_ratio) for inp, ref in zip(inputs_sample, targets_sample)]

    # آماده‌سازی مرجع‌ها (حذف تگ‌های شروع و پایان)
    refs = [t.replace('<start>', '').replace('<end>', '').strip() for t in targets_sample]

    # محاسبه نمرات ROUGE بین مرجع‌ها و خلاصه‌های تولید شده
    scores = rouge_eval(refs, preds)

    # نمایش نتایج
    print(f"Results for {name} set:")
    for k, v in scores.items():
        print(f"  {k}: {v:.4f}")


# اجرای ارزیابی روی داده‌های آموزش، اعتبارسنجی و تست در صورت تعریف بودن
try:
    evaluate_dataset(train_inputs, train_targets, "train")
    evaluate_dataset(val_inputs, val_targets, "validation")
    evaluate_dataset(test_inputs, test_targets, "test")
except NameError as e:
    print(f" Error: {e}")
    print(" Please make sure that 'train_inputs', 'train_targets', etc. are defined and preprocessed.")









Evaluating on train set...
Results for train set:
  rouge1: 0.5179
  rouge2: 0.0338
  rougeL: 0.2191

Evaluating on validation set...
Results for validation set:
  rouge1: 0.4244
  rouge2: 0.0315
  rougeL: 0.1913

Evaluating on test set...
Results for test set:
  rouge1: 0.2076
  rouge2: 0.0126
  rougeL: 0.1309
