# ***תרגיל 5 של הפרוייקט***

**Load Data and Basic Setup**

In [1]:
import pandas as pd
import numpy as np

# Load the dataset
df = pd.read_csv("/content/train-filtered_question_level.csv")

# Remove duplicate questions
df = df.drop_duplicates(subset=["question"], keep="first")

# Extract text and difficulty levels
texts = df["question"].astype(str).tolist()
levels = df["level"].tolist()


**Balancing Dataset (Undersampling to Minority Class)**

In [2]:
# import pandas as pd

# # 1. נגדיר את גודל היעד לפי המחלקה הקטנה ביותר (Hard)
# target_size = 15657

# # 2. נבצע דגימה מכל מחלקה בנפרד
# df_hard = df[df['level'] == 'hard']
# # כאן אנחנו לא עושים sample כי זה כבר הגודל שאנחנו רוצים

# df_medium_downsampled = df[df['level'] == 'medium'].sample(n=target_size, random_state=42)
# df_easy_downsampled = df[df['level'] == 'easy'].sample(n=target_size, random_state=42)

# # 3. נחבר את שלושתן יחד
# df_balanced = pd.concat([df_hard, df_medium_downsampled, df_easy_downsampled])

# # 4. נערבב את הדאטה (חשוב מאוד!)
# df_balanced = df_balanced.sample(frac=1, random_state=42).reset_index(drop=True)

# # בדיקת תוצאה
# print("התפלגות חדשה:")
# print(df_balanced['level'].value_counts())

In [3]:
import pandas as pd

# 1. הגדרת גודל המדגם לכל קטגוריה (היפר-פרמטר של שלב הניסויים)
# התחלה עם 1000 מכל אחת עוזרת לבדיקה מהירה ("מתחילים בקטן" לפי ההוראות)
target_size = 7000

# 2. דגימה מאוזנת מכל הקטגוריות בבת אחת
# groupby מבטיח שנתייחס לכל רמת קושי בנפרד
df_balanced = df.groupby('level').apply(lambda x: x.sample(n=target_size, random_state=42)).reset_index(drop=True)

# 3. ערבוב הדאטה (Shuffle) - קריטי בלמידה עמוקה!
# כדי שה-Batch לא יכיל רק מחלקה אחת בזמן האימון, מה שיהרוס את הלמידה
df_balanced = df_balanced.sample(frac=1, random_state=42).reset_index(drop=True)

# בדיקת התפלגות כפי שנדרש בשלב ה-EDA ובניתוח הדאטה
print("התפלגות חדשה ומאוזנת:")
print(df_balanced['level'].value_counts())

התפלגות חדשה ומאוזנת:
level
easy      7000
medium    7000
hard      7000
Name: count, dtype: int64


  df_balanced = df.groupby('level').apply(lambda x: x.sample(n=target_size, random_state=42)).reset_index(drop=True)


# **שלב 1**

# **א**

**Choosing Maximum Sequence Length (Documentation)**

In [4]:
!pip install tensorflow



In [5]:
import numpy as np
import torch
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# 1. חילוץ הטקסטים
texts = df_balanced['question'].astype(str).tolist()

# 2. אתחול והתאמת הטוקנייזר על כל המילים (בלי הגבלה שרירותית של 20,000)
tokenizer = Tokenizer(oov_token="<OOV>")
tokenizer.fit_on_texts(texts)

# 3. הגדרת VOCAB_SIZE האמיתי (קריטי למטריצת ה-Embedding)
VOCAB_SIZE = len(tokenizer.word_index) + 1
print(f"Actual Vocabulary size: {VOCAB_SIZE}")

# 4. המרה לרצפים וביצוע Padding
sequences = tokenizer.texts_to_sequences(texts)
MAX_LEN = int(np.percentile([len(seq) for seq in sequences], 95)) # אורך שחוסם 95% מהמשפטים
X = pad_sequences(sequences, maxlen=MAX_LEN, padding='post')


Actual Vocabulary size: 36543


**Padding and Truncation**

In [6]:
import torch
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split

# 1. הגדרת המילון (כפי שעשית)
label_dict = {'easy': 0, 'medium': 1, 'hard': 2}
y_integers = df_balanced['level'].map(label_dict).values

# 2. חלוקה מרובדת (Stratify) כדי לשמור על איזון באחוזים
# חלוקה ראשונה: מוציאים 15% לטסט סופי
X_train_val, X_test, y_train_val, y_test = train_test_split(
    X, y_integers,
    test_size=0.15,
    random_state=42,
    stratify=y_integers # מבטיח איזון
)

# חלוקה שנייה: פיצול היתרה לאימון וולידציה (15% מהסך הכל)
X_train, X_val, y_train, y_val = train_test_split(
    X_train_val, y_train_val,
    test_size=0.176, # 0.15 / 0.85
    random_state=42,
    stratify=y_train_val # מבטיח איזון
)

print(f"Train size: {len(X_train)} | Val size: {len(X_val)} | Test size: {len(X_test)}")

Train size: 14708 | Val size: 3142 | Test size: 3150


In [7]:
# המרה ל-Tensors
# X הוא LongTensor כי הוא מכיל אינדקסים של מילים
# y הוא LongTensor כי CrossEntropyLoss מצפה לאינדקסים של מחלקות
train_ds = TensorDataset(torch.LongTensor(X_train), torch.LongTensor(y_train))
val_ds   = TensorDataset(torch.LongTensor(X_val), torch.LongTensor(y_val))
test_ds  = TensorDataset(torch.LongTensor(X_test), torch.LongTensor(y_test))

# יצירת Loaders
# shuffle=True רק באימון כדי שהמודל לא ילמד את סדר השאלות
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
val_loader   = DataLoader(val_ds, batch_size=32, shuffle=False)
test_loader  = DataLoader(test_ds, batch_size=32, shuffle=False)

print("PyTorch DataLoaders are ready!")

PyTorch DataLoaders are ready!


# **ב**

**ניסוי 1**

In [8]:
from gensim.models import Word2Vec

# הכנת המשפטים לאימון (רשימה של רשימות מילים)
sentences_for_w2v = [text.split() for text in texts]

# אימון Word2Vec - לומד את הקשרים בין המילים בדאטה שלך
w2v_model = Word2Vec(sentences=sentences_for_w2v, vector_size=100, window=5, min_count=1, sg=1)

# יצירת מטריצת המשקולות (הגשר בין Word2Vec ל-PyTorch)
EMBED_DIM = 100
# VOCAB_SIZE צריך להיות שווה ל- len(tokenizer.word_index) + 1
embedding_matrix = torch.zeros((VOCAB_SIZE, EMBED_DIM))

for word, i in tokenizer.word_index.items():
    if i < VOCAB_SIZE:
        if word in w2v_model.wv:
            embedding_matrix[i] = torch.tensor(w2v_model.wv[word].copy())
        else:
            # מילים שלא קיימות ב-W2V מקבלות ערך אקראי קטן
            embedding_matrix[i] = torch.randn(EMBED_DIM) * 0.1

EMBED_MAT = embedding_matrix # נשמור את זה בשם ברור לניסויים

In [9]:
class DifficultyModel(torch.nn.Module):
    def __init__(self, vocab_size, embed_dim, weights=None, is_frozen=False):
        super().__init__()
        if weights is not None:
            self.embedding = torch.nn.Embedding.from_pretrained(weights, freeze=is_frozen)
        else:
            self.embedding = torch.nn.Embedding(vocab_size, embed_dim)

        self.classifier = torch.nn.Sequential(
            torch.nn.Linear(embed_dim, 64),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.2),
            torch.nn.Linear(64, 3) # 3 קטגוריות
        )

    def forward(self, x):
        x = self.embedding(x).mean(dim=1) # Pooling
        return self.classifier(x)

def train_model(model, train_loader, val_loader, epochs=5):
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = torch.nn.CrossEntropyLoss()

    for epoch in range(epochs):
        model.train()
        for texts_batch, labels_batch in train_loader:
            optimizer.zero_grad()
            loss = criterion(model(texts_batch), labels_batch)
            loss.backward()
            optimizer.step()

    # חישוב דיוק סופי
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for texts_batch, labels_batch in val_loader:
            _, predicted = torch.max(model(texts_batch), 1)
            total += labels_batch.size(0)
            correct += (predicted == labels_batch).sum().item()
    return 100 * correct / total

**Embedding Layer מאומן מאפס**

In [10]:
model_scratch = DifficultyModel(VOCAB_SIZE, EMBED_DIM)
acc_scratch = train_model(model_scratch, train_loader, val_loader)
print(f"Accuracy (Scratch): {acc_scratch:.2f}%")

Accuracy (Scratch): 47.55%


**The Frozen model**

In [11]:
# ניסוי 2א: Word2Vec קפוא (Frozen)
model_frozen = DifficultyModel(VOCAB_SIZE, EMBED_DIM, weights=embedding_matrix, is_frozen=True)
acc_frozen = train_model(model_frozen, train_loader, val_loader)
print(f"Accuracy (W2V Frozen): {acc_frozen:.2f}%")

Accuracy (W2V Frozen): 50.73%


**Fine-tuned**

In [12]:
# ניסוי 2ב: Word2Vec מכוונן (Fine-tuned)
model_tuned = DifficultyModel(VOCAB_SIZE, EMBED_DIM, weights=embedding_matrix, is_frozen=False)
acc_tuned = train_model(model_tuned, train_loader, val_loader)
print(f"Accuracy (W2V Fine-tuned): {acc_tuned:.2f}%")

Accuracy (W2V Fine-tuned): 49.08%


# **שלב 2**

**Basic Settings**

In [13]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


# =========================
# סעיף א: ארכיטקטורת המודל
# =========================
class FlexibleModel(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, output_dim, n_layers,
                 model_type='LSTM', bidirectional=False, dropout_p=0.0,
                 embedding_matrix=None, is_frozen=False):
        super(FlexibleModel, self).__init__()

        self.dropout = nn.Dropout(dropout_p)

        # 1. יצירת שכבת ה-Embedding הבסיסית
        self.embedding = nn.Embedding(vocab_size, embed_dim)

        # 2. טיפול ב-Word2Vec (אם נשלחה מטריצה)
        if embedding_matrix is not None:
            # טעינת המטריצה המוכנה
            self.embedding.weight = nn.Parameter(torch.tensor(embedding_matrix, dtype=torch.float32))

            # קביעה האם להקפיא (Frozen) או לאפשר שינוי (Fine-tuned)
            self.embedding.weight.requires_grad = not is_frozen
        else:
            # מצב "מאומן מאפס": אין מטריצה, לכן המשקולות אקראיות וחייבות להיות פתוחות לאימון
            self.embedding.weight.requires_grad = True

        # --- שאר חלקי המודל כפי שהגדרת ---
        rnn_dropout = dropout_p if n_layers > 1 else 0.0
        if model_type == 'RNN':
            self.rnn = nn.RNN(embed_dim, hidden_dim, n_layers, batch_first=True,
                              bidirectional=bidirectional, dropout=rnn_dropout)
        else:
            self.rnn = nn.LSTM(embed_dim, hidden_dim, n_layers, batch_first=True,
                               bidirectional=bidirectional, dropout=rnn_dropout)

        num_directions = 2 if bidirectional else 1
        self.fc = nn.Linear(hidden_dim * num_directions, output_dim)

    def forward(self, text):
        embedded = self.dropout(self.embedding(text))
        output, hidden = self.rnn(embedded)
        if isinstance(hidden, tuple): hidden = hidden[0]
        if self.rnn.bidirectional:
            hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        else:
            hidden = hidden[-1, :, :]
        return self.fc(hidden)

# ==========================================
# סעיף ב: תהליך האימון - שמירת היסטוריה מלאה
# ==========================================
def run_experiment(model, train_loader, val_loader, epochs=5, lr=0.001):
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    history = {'train_loss': [], 'val_loss': [], 'val_acc': []}

    for epoch in range(epochs):
        # אימון
        model.train()
        total_train_loss = 0.0

        for texts, labels in train_loader:
            texts, labels = texts.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(texts)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_train_loss += loss.item()

        # ולידציה
        model.eval()
        total_val_loss = 0.0
        correct = 0

        with torch.no_grad():
            for texts, labels in val_loader:
                texts, labels = texts.to(device), labels.to(device)

                outputs = model(texts)
                loss = criterion(outputs, labels)
                total_val_loss += loss.item()
                correct += (outputs.argmax(1) == labels).sum().item()

        avg_train_loss = total_train_loss / len(train_loader)
        avg_val_loss = total_val_loss / len(val_loader)
        acc = correct / len(val_loader.dataset)

        history['train_loss'].append(avg_train_loss)
        history['val_loss'].append(avg_val_loss)
        history['val_acc'].append(acc)

        print(f"Epoch {epoch+1}/{epochs} | Train Loss: {avg_train_loss:.4f} | "
              f"Val Loss: {avg_val_loss:.4f} | Val Acc: {acc:.4f}")

    return history


# ======================
# סעיף ג: הערכת ביצועים
# ======================
def print_evaluation_section_c(model, loader):
    print("\n" + "=" * 30)
    print("סעיף ג: הערכת ביצועים")
    print("=" * 30)

    model.to(device)
    model.eval()

    all_preds, all_labels = [], []
    with torch.no_grad():
        for texts, labels in loader:
            texts = texts.to(device)
            outputs = model(texts)

            preds = outputs.argmax(1).cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(labels.numpy())

    print(classification_report(all_labels, all_preds, target_names=['Easy', 'Medium', 'Hard']))

    cm = confusion_matrix(all_labels, all_preds)
    plt.figure(figsize=(4, 3))
    sns.heatmap(
        cm, annot=True, fmt='d', cmap='Blues',
        xticklabels=['Easy', 'Medium', 'Hard'],
        yticklabels=['Easy', 'Medium', 'Hard']
    )
    plt.title("Confusion Matrix")
    plt.show()


# ======================
# גרפים: Loss ו-Accuracy
# ======================
def plot_training_history(history, model_name="Model"):
    epochs = range(1, len(history['train_loss']) + 1)

    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    plt.plot(epochs, history['train_loss'], label='Training Loss')
    plt.plot(epochs, history['val_loss'], label='Validation Loss')
    plt.title(f'{model_name} - Loss (Error) Curve')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(epochs, history['val_acc'], label='Validation Accuracy')
    plt.title(f'{model_name} - Accuracy Curve')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.tight_layout()
    plt.show()


In [14]:
def decode_text(text_tensor, tokenizer):
    # שימוש במיפוי של Keras (index_word)
    # i.item() > 0 כי 0 הוא בדרך כלל ה-Padding
    index_to_word = tokenizer.index_word
    words = [index_to_word.get(i.item(), "") for i in text_tensor if i.item() > 0]
    return " ".join(words).strip()

def print_misclassifications(model, dataloader, device, tokenizer, label_map, target_true='Medium', target_pred='Hard', num_examples=5):
    model.eval()
    results = []

    # מיפוי שמות לקטגוריות (למשל 'Medium' -> 1)
    inv_label_map = {v: k for k, v in label_map.items()}
    true_idx = inv_label_map[target_true]
    pred_idx = inv_label_map[target_pred]

    with torch.no_grad():
        for texts, labels in dataloader:
            texts, labels = texts.to(device), labels.to(device)
            outputs = model(texts)
            preds = outputs.argmax(1)

            for i in range(len(labels)):
                if labels[i].item() == true_idx and preds[i].item() == pred_idx:
                    # שימוש ב-decode_text החדש שמתאים לטוקנייזר שלך
                    original_text = decode_text(texts[i], tokenizer)
                    results.append(original_text)

                if len(results) >= num_examples:
                    break
            if len(results) >= num_examples:
                break

    print(f"\n--- דוגמאות של {target_true} שסווגו בטעות כ-{target_pred} ---")
    if not results:
        print("לא נמצאו דוגמאות כאלו בניסוי הנוכחי.")
    else:
        for j, text in enumerate(results):
            print(f"{j+1}) {text}\n")

**RNN - Regular embedding**

In [None]:
# ==========================================
# ניסוי מעודכן: שליטה מלאה בפרמטרים
# ==========================================
MODEL_TYPE = 'RNN'          # 'RNN' או 'LSTM'
IS_BIDIRECTIONAL = False
DROPOUT_P = 0.0
LEARNING_RATE = 0.001
EPOCHS = 15                  # מספר איטרציות מקסימלי (העצירה המוקדמת תעצור לפני)

# --- שליטה על גודל המודל (Model Size) ---
HIDDEN_DIM = 128              # אופציה להקטין את המודל ל64
N_LAYERS = 1                 # שכבה אחת בלבד (פחות פרמטרים)

# --- שליטה על Batch Size ---
BATCH_SIZE = 32              # גודל קבוצה קטן יותר (עוזר לעיבוד מדויק יותר ולפעמים למניעת Overfitting)

# --- פרמטרים לעצירה מוקדמת (Early Stopping) ---
PATIENCE = 3                 # כמה איטרציות לחכות בלי שיפור ב-Val Loss לפני שעוצרים

# הגדרות לניסוי מכוונן
IS_FROZEN = False

model_experiment = FlexibleModel(
    vocab_size=36543,
    embed_dim=100,
    hidden_dim=128,
    output_dim=3,
    n_layers=1,
    model_type='RNN',
    bidirectional=False,
    #embedding_matrix=EMBED_MAT, # אותה מטריצה בדיוק
    #is_frozen=IS_FROZEN,        # פתיחה לעדכונים
    dropout_p=0.0
)

history_tuned = run_experiment(model_tuned, train_loader, val_loader, epochs=15)
print_evaluation_section_c(model_tuned, test_loader)

# ב. הרצת האימון
print(f"--- מריץ ניסוי: {MODEL_TYPE} | Bi={IS_BIDIRECTIONAL} | Dropout={DROPOUT_P} | LR={LEARNING_RATE} ---")
history = run_experiment(
    model_experiment,
    train_loader,
    val_loader,
    epochs=EPOCHS,
    lr=LEARNING_RATE
)

# ג. הצגת תוצאות
print_evaluation_section_c(model_experiment, test_loader)
plot_training_history(history, model_name=f"{MODEL_TYPE} (Dropout={DROPOUT_P})")

Epoch 1/15 | Train Loss: 1.0351 | Val Loss: 1.0096 | Val Acc: 0.4153
Epoch 2/15 | Train Loss: 0.9970 | Val Loss: 0.9974 | Val Acc: 0.4427


In [None]:
# הגדרת המפה לפי הסדר של המחלקות אצלך
label_map = {0: 'Easy', 1: 'Medium', 2: 'Hard'}

# קריאה לפונקציה
print_misclassifications(
    model_experiment,
    test_loader,
    device,
    tokenizer,   # הטוקנייזר שהגדרת קודם
    label_map,
    target_true='Hard',
    target_pred='Medium',
    num_examples=5
)