In [2]:
!pip install emoji

Collecting emoji
  Downloading emoji-2.14.1-py3-none-any.whl.metadata (5.7 kB)
Downloading emoji-2.14.1-py3-none-any.whl (590 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m590.6/590.6 kB[0m [31m10.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: emoji
Successfully installed emoji-2.14.1


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
import re
import os
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer, PorterStemmer
from nltk.tokenize import TweetTokenizer
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, classification_report
from imblearn.over_sampling import RandomOverSampler
import emoji
import kagglehub

nltk.download(['punkt', 'stopwords', 'wordnet', 'omw-1.4'], quiet=True)
STOPWORDS = set(stopwords.words('english'))
lemmatizer = WordNetLemmatizer()
stemmer = PorterStemmer()
# stopwords  و lemmatizer: برای تبدیل کلمات به ریشه اونها 
tokenizer = TweetTokenizer()

# برای ذخیره ی بردار میانگین و انحراف معیار کلماتی که در امبدینگ نیستن
class EnhancedVocab(dict):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.unk_vector = None
        self.unk_std = None

def twitter_preprocessor(text):
    # تبدیل ایموجی
    text = emoji.demojize(text, delimiters=(" ", " "))

    # حذف لینک‌ها و نام کاربری‌ها
    text = re.sub(r'http\S+|@\w+', '', text)

    # حفظ هشتگ‌ها (فقط حذف #)
    text = re.sub(r'#(\w+)', r'\1', text)

    # حذف کاراکترهای خاص (به جز تک آپوستروف)
    text = re.sub(r"[^a-zA-Z\s']", '', text)

    # تصحیح تکرار کاراکترها
    text = re.sub(r'(.)\1{3,}', r'\1', text)

    # همه کلمات به صورت حروف کوچک نوشته شوند
    tokens = tokenizer.tokenize(text.lower())

    processed_tokens = []
    for token in tokens:
        if token not in STOPWORDS and len(token) > 2:
            lem = lemmatizer.lemmatize(token)
            stem = stemmer.stem(lem)
            processed_tokens.append(stem)
    # اگر دیگه کلمه ای جز هیچ کدوم از اینها نبود ، جز حالت "طبیعی" در نظر بگیره
    return ' '.join(processed_tokens) if processed_tokens else 'neutral'

dataset_path = kagglehub.dataset_download("pashupatigupta/emotion-detection-from-text")
data = pd.read_csv(os.path.join(dataset_path, "tweet_emotions.csv"))
data['clean_text'] = data['content'].apply(twitter_preprocessor)

top_labels = data['sentiment'].value_counts().nlargest(6).index
data = data[data['sentiment'].isin(top_labels)].copy()
final_label_mapping = {label: idx for idx, label in enumerate(top_labels)}
data['label'] = data['sentiment'].map(final_label_mapping)

ros = RandomOverSampler(random_state=42)
X_res, y_res = ros.fit_resample(
    data[['clean_text']],
    data['label']
)
data_balanced = pd.DataFrame({
    'clean_text': X_res['clean_text'],
    'label': y_res
})

# خب حالا دیتاست با 6 کلاس بعد از پیش پردازش ها اماده شد

GLOVE_DIM = 100  
GLOVE_FILE = f"glove.twitter.27B.{GLOVE_DIM}d.txt"

if not os.path.exists(GLOVE_FILE):
    print("Downloading Twitter-specific GloVe embeddings...")
    os.system("wget https://nlp.stanford.edu/data/glove.twitter.27B.zip")
    os.system("unzip -o glove.twitter.27B.zip")

print(f"Loading Twitter GloVe ({GLOVE_DIM}d)...")
glove_embeddings = {}
valid_vectors = []

with open(GLOVE_FILE, 'r', encoding='utf-8') as f:
    for line in f:
        try:
            values = line.split()
            word = values[0]
            vector = np.array(values[1:], dtype='float32')
            if len(vector) == GLOVE_DIM:
                glove_embeddings[word] = vector
                valid_vectors.append(vector)
        except:
            continue

all_vectors = np.array(valid_vectors)
print(f"Loaded {len(glove_embeddings)} vectors with dimension {GLOVE_DIM}")

# تکنیک لحاظ تعداد تکرار کلمات  
def build_twitter_vocab(texts, min_freq=2):
    counter = Counter()
    for text in texts:
        counter.update(text.split())

# محاسبه انحراف معیار و میانگین برای تخمین کلمات ناشناختته
    mean_vector = np.mean(all_vectors, axis=0)
    std_vector = np.std(all_vectors, axis=0)

    vocab = EnhancedVocab()
    vocab.update({
        "[PAD]": 0,
        "[UNK]": 1,
        "[CLS]": 2
    })

    current_idx = 3
    for word, freq in counter.items():
        if freq >= min_freq and word not in vocab:
            vocab[word] = current_idx
            current_idx += 1

    vocab.unk_vector = mean_vector
    vocab.unk_std = std_vector

    return vocab


vocab = build_twitter_vocab(data_balanced['clean_text'], min_freq=2)
vocab_size = len(vocab)
print(f"Vocabulary Size: {vocab_size}")

# مقداردهی کلمات ناشناخته با انحراف معیار و میانگین کل کلمات
embedding_matrix = np.zeros((vocab_size, GLOVE_DIM))
for word, idx in vocab.items():
    if word in glove_embeddings:
        embedding_matrix[idx] = glove_embeddings[word]
    elif idx not in [0, 1, 2]:
        embedding_matrix[idx] = np.random.normal(
            loc=vocab.unk_vector,
            scale=vocab.unk_std,
            size=(GLOVE_DIM,)
        )

seq_lengths = data_balanced['clean_text'].apply(lambda x: len(x.split()))
MAX_LEN = int(np.percentile(seq_lengths, 95))
print(f"Optimal Sequence Length: {MAX_LEN}")

# حال تبدیل کلمات به توکن و عدد
def text_to_sequence(text, vocab, max_len=MAX_LEN):
    tokens = text.split()[:max_len-1]
    sequence = [vocab["[CLS]"]] + [
        vocab.get(token, vocab["[UNK]"]) for token in tokens
    ]
    padding = [vocab["[PAD]"]] * (max_len - len(sequence))
    return sequence + padding if len(sequence) < max_len else sequence[:max_len]

data_balanced['sequence'] = data_balanced['clean_text'].apply(
    lambda x: text_to_sequence(x, vocab, MAX_LEN)
)


# تقسیم بندی دیتاست
X_train_valid, X_test, y_train_valid, y_test = train_test_split(
    np.stack(data_balanced['sequence']),
    data_balanced['label'].values,
    test_size=0.15,
    stratify=data_balanced['label'],
    random_state=42
)


X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_valid,
    y_train_valid,
    test_size=0.1765,
    stratify=y_train_valid,
    random_state=42
)


class TweetDataset(torch.utils.data.Dataset):
    def __init__(self, sequences, labels, augment=False):
        self.sequences = sequences
        self.labels = labels
        self.augment = augment

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        seq = self.sequences[idx]
        label = self.labels[idx]
 # اگر فعال بود، به صورت رندوم ، 90% توکن های موجود در جمله رو نگه میداره 
 # و 10 % رو حذف میکنه و به جاش پادینگ میزاره تا طول جمله با حداکثر برسه
        if self.augment and np.random.rand() > 0.5:
            mask = np.random.rand(len(seq)) > 0.1
            seq = seq[mask]
            seq = np.pad(seq, (0, MAX_LEN - len(seq)),
                         mode='constant', constant_values=vocab["[PAD]"])

        return {
            'sequence': torch.tensor(seq, dtype=torch.long),
            'label': torch.tensor(label, dtype=torch.long)
        }

BATCH_SIZE = 64
train_dataset = TweetDataset(X_train, y_train, augment=True)
valid_dataset = TweetDataset(X_valid, y_valid)
test_dataset = TweetDataset(X_test, y_test)

train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True, pin_memory=True
)
valid_loader = torch.utils.data.DataLoader(
    valid_dataset, batch_size=BATCH_SIZE, shuffle=False, pin_memory=True
)
test_loader = torch.utils.data.DataLoader(
    test_dataset, batch_size=BATCH_SIZE, shuffle=False, pin_memory=True
)

class TwitterTransformer(nn.Module):
    def __init__(self, vocab_size, embedding_dim=100, hidden_dim=256,
                 num_layers=4, num_heads=4, num_classes=6,
                 max_len=MAX_LEN, dropout=0.3):
        super().__init__()

        self.embedding = nn.Embedding(
            vocab_size, embedding_dim, padding_idx=vocab["[PAD]"]
        )
        self.embedding.weight.data.copy_(torch.from_numpy(embedding_matrix))
        self.embedding.weight.requires_grad = False

        self.pos_embedding = nn.Parameter(torch.randn(1, max_len, embedding_dim)) # اهمیت موقعیت قرار گیری کلمات not good
        self.dropout = nn.Dropout(dropout)

        self.encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=embedding_dim,
                nhead=num_heads,
                dim_feedforward=hidden_dim,
                dropout=dropout,
                activation='gelu',
                batch_first=True
            ),
            num_layers=num_layers
        )

        self.classifier = nn.Sequential(
            nn.LayerNorm(embedding_dim),
            nn.Linear(embedding_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, num_classes)
        )

    def forward(self, x):
        batch_size, seq_len = x.size()

        token_emb = self.embedding(x)
        pos_emb = self.pos_embedding[:, :seq_len, :]
        x = self.dropout(token_emb + pos_emb)

        padding_mask = (x == self.embedding.weight[vocab["[PAD]"]]).all(dim=-1)

        x = self.encoder(x, src_key_padding_mask=padding_mask)

        cls_output = x[:, 0, :]
        logits = self.classifier(cls_output)
        return logits

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = TwitterTransformer(
    vocab_size=vocab_size,
    embedding_dim=GLOVE_DIM,
    hidden_dim=256,
    num_layers=4,
    num_heads=4,
    num_classes=len(final_label_mapping),
    max_len=MAX_LEN,
    dropout=0.3
).to(device)

class_counts = np.bincount(data_balanced['label'])
class_weights = torch.tensor(1. / np.sqrt(class_counts), dtype=torch.float).to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights)

optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-3)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='max', patience=2, factor=0.5, verbose=True
)

NUM_EPOCHS = 50
best_f1 = 0
patience = 4
no_improve = 0

for epoch in range(NUM_EPOCHS):
    model.train()
    train_loss = 0
    for batch in train_loader:
        optimizer.zero_grad()
        sequences = batch['sequence'].to(device)
        labels = batch['label'].to(device)

        outputs = model(sequences)
        loss = criterion(outputs, labels)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        train_loss += loss.item()

    model.eval()
    all_preds = []
    all_labels = []
    val_loss = 0
    with torch.no_grad():
        for batch in valid_loader:
            sequences = batch['sequence'].to(device)
            labels = batch['label'].to(device)

            outputs = model(sequences)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    train_loss /= len(train_loader)
    val_loss /= len(valid_loader)
    f1 = f1_score(all_labels, all_preds, average='weighted')
    scheduler.step(f1)

    print(f'\nEpoch {epoch+1:02}')
    print(f'Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}')
    print(classification_report(
        all_labels, all_preds,
        target_names=final_label_mapping.keys(),
        digits=4
    ))

    if f1 > best_f1:
        best_f1 = f1
        no_improve = 0
        torch.save(model.state_dict(), 'best_twitter_model.pth')
    else:
        no_improve += 1
        if no_improve >= patience:
            print(f"\nEarly stopping after {patience} epochs without improvement")
            break

model.load_state_dict(torch.load('best_twitter_model.pth'))
model.eval()
all_preds = []
all_labels = []
with torch.no_grad():
    for batch in test_loader:
        sequences = batch['sequence'].to(device)
        labels = batch['label'].to(device)
        outputs = model(sequences)
        preds = torch.argmax(outputs, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

test_accuracy = accuracy_score(all_labels, all_preds)
print(f'\nFinal Test Accuracy: {test_accuracy:.4f}')

inv_label_mapping = {v: k for k, v in final_label_mapping.items()}

test_samples = [
    "OMG just got tickets for the concert!!! 😍 #excited",
    "This service is terrible! Worst experience ever 😠",
    "Feeling so anxious about the interview tomorrow...",
    "Lost my pet today. I'm completely heartbroken 💔",
    "What a beautiful morning! 🌞 #blessed",
    "lol that's hilarious 😂"
]

for text in test_samples:
    processed = twitter_preprocessor(text)
    seq = text_to_sequence(processed, vocab, MAX_LEN)
    seq_tensor = torch.tensor(seq).unsqueeze(0).to(device)

    with torch.no_grad():
        output = model(seq_tensor)
        prob = torch.softmax(output, dim=1)
        pred = torch.argmax(prob).item()

    print(f"\nText: {text}")
    print(f"Processed: {processed}")
    print(f"Predicted Emotion: {inv_label_mapping[pred]} ({prob.max().item():.2f})")
    print("="*60)


Downloading Twitter-specific GloVe embeddings...
Loading Twitter GloVe (100d)...
Loaded 1193513 vectors with dimension 100
Vocabulary Size: 13446
Optimal Sequence Length: 13


  output = torch._nested_tensor_from_mask(



Epoch 01
Train Loss: 1.7466 | Val Loss: 1.6622
              precision    recall  f1-score   support

     neutral     0.3142    0.4938    0.3840      1296
       worry     0.3399    0.1721    0.2285      1296
   happiness     0.2699    0.2562    0.2629      1296
     sadness     0.2308    0.0023    0.0046      1296
        love     0.4693    0.5363    0.5005      1296
    surprise     0.1942    0.3534    0.2506      1296

    accuracy                         0.3023      7776
   macro avg     0.3030    0.3023    0.2719      7776
weighted avg     0.3030    0.3023    0.2719      7776


Epoch 02
Train Loss: 1.6921 | Val Loss: 1.6194
              precision    recall  f1-score   support

     neutral     0.3077    0.4383    0.3616      1296
       worry     0.2178    0.2060    0.2117      1296
   happiness     0.3515    0.2840    0.3141      1296
     sadness     0.3216    0.4653    0.3803      1296
        love     0.5738    0.4498    0.5043      1296
    surprise     0.1841    0.1088   

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))



Epoch 04
Train Loss: 1.6685 | Val Loss: 1.6005
              precision    recall  f1-score   support

     neutral     0.3027    0.5355    0.3867      1296
       worry     0.2788    0.3796    0.3215      1296
   happiness     0.2990    0.5895    0.3968      1296
     sadness     0.4741    0.1412    0.2176      1296
        love     0.6280    0.3596    0.4573      1296
    surprise     0.3429    0.0093    0.0180      1296

    accuracy                         0.3358      7776
   macro avg     0.3876    0.3358    0.2997      7776
weighted avg     0.3876    0.3358    0.2997      7776


Epoch 05
Train Loss: 1.6596 | Val Loss: 1.5977
              precision    recall  f1-score   support

     neutral     0.2787    0.6767    0.3948      1296
       worry     0.2988    0.3248    0.3113      1296
   happiness     0.2978    0.4313    0.3523      1296
     sadness     0.5270    0.0980    0.1653      1296
        love     0.6172    0.4105    0.4930      1296
    surprise     0.1708    0.0316   

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
import re
import os
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer, PorterStemmer
from nltk.tokenize import TweetTokenizer
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, classification_report
from imblearn.over_sampling import RandomOverSampler
import emoji
import kagglehub

nltk.download(['punkt', 'stopwords', 'wordnet', 'omw-1.4'], quiet=True)
STOPWORDS = set(stopwords.words('english'))
lemmatizer = WordNetLemmatizer()
stemmer = PorterStemmer()
tokenizer = TweetTokenizer()

class EnhancedVocab(dict):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.unk_vector = None
        self.unk_std = None

def twitter_preprocessor(text):
    text = emoji.demojize(text, delimiters=(" ", " "))
    text = re.sub(r'http\S+|@\w+', '', text)
    text = re.sub(r'#(\w+)', r'\1', text)
    text = re.sub(r"[^a-zA-Z\s']", '', text)
    text = re.sub(r'(.)\1{3,}', r'\1', text)

    tokens = tokenizer.tokenize(text.lower())
    processed_tokens = []
    for token in tokens:
        if token not in STOPWORDS and len(token) > 2:
            lem = lemmatizer.lemmatize(token)
            stem = stemmer.stem(lem)
            processed_tokens.append(stem)

    return ' '.join(processed_tokens) if processed_tokens else 'neutral'

dataset_path = kagglehub.dataset_download("pashupatigupta/emotion-detection-from-text")
data = pd.read_csv(os.path.join(dataset_path, "tweet_emotions.csv"))
data['clean_text'] = data['content'].apply(twitter_preprocessor)

top_labels = data['sentiment'].value_counts().nlargest(6).index
data = data[data['sentiment'].isin(top_labels)].copy()
final_label_mapping = {label: idx for idx, label in enumerate(top_labels)}
data['label'] = data['sentiment'].map(final_label_mapping)

ros = RandomOverSampler(random_state=42)
X_res, y_res = ros.fit_resample(data[['clean_text']], data['label'])
data_balanced = pd.DataFrame({'clean_text': X_res['clean_text'], 'label': y_res})

GLOVE_DIM = 100
GLOVE_FILE = f"glove.twitter.27B.{GLOVE_DIM}d.txt"

if not os.path.exists(GLOVE_FILE):
    print("Downloading GloVe embeddings...")
    os.system("wget https://nlp.stanford.edu/data/glove.twitter.27B.zip")
    os.system("unzip -o glove.twitter.27B.zip")

print(f"Loading GloVe ({GLOVE_DIM}d)...")
glove_embeddings = {}
valid_vectors = []

with open(GLOVE_FILE, 'r', encoding='utf-8') as f:
    for line in f:
        try:
            values = line.split()
            word = values[0]
            vector = np.array(values[1:], dtype='float32')
            if len(vector) == GLOVE_DIM:
                glove_embeddings[word] = vector
                valid_vectors.append(vector)
        except:
            continue

all_vectors = np.array(valid_vectors)

def build_twitter_vocab(texts, min_freq=2):
    counter = Counter()
    for text in texts:
        counter.update(text.split())

    mean_vector = np.mean(all_vectors, axis=0)
    std_vector = np.std(all_vectors, axis=0)

    vocab = EnhancedVocab()
    vocab.update({
        "[PAD]": 0,
        "[UNK]": 1,
        "[CLS]": 2
    })

    current_idx = 3
    for word, freq in counter.items():
        if freq >= min_freq and word not in vocab:
            vocab[word] = current_idx
            current_idx += 1

    vocab.unk_vector = mean_vector
    vocab.unk_std = std_vector

    return vocab

vocab = build_twitter_vocab(data_balanced['clean_text'], min_freq=2)
vocab_size = len(vocab)
print(f"Vocabulary Size: {vocab_size}")

embedding_matrix = np.zeros((vocab_size, GLOVE_DIM))
for word, idx in vocab.items():
    if word in glove_embeddings:
        embedding_matrix[idx] = glove_embeddings[word]
    elif idx not in [0, 1, 2]:
        embedding_matrix[idx] = np.random.normal(
            loc=vocab.unk_vector,
            scale=vocab.unk_std,
            size=(GLOVE_DIM,)
        )

seq_lengths = data_balanced['clean_text'].apply(lambda x: len(x.split()))
MAX_LEN = int(np.percentile(seq_lengths, 95))
print(f"Optimal Sequence Length: {MAX_LEN}")

def text_to_sequence(text, vocab, max_len=MAX_LEN):
    tokens = text.split()[:max_len-1]
    sequence = [vocab["[CLS]"]] + [
        vocab.get(token, vocab["[UNK]"]) for token in tokens
    ]
    padding = [vocab["[PAD]"]] * (max_len - len(sequence))
    return sequence + padding if len(sequence) < max_len else sequence[:max_len]

data_balanced['sequence'] = data_balanced['clean_text'].apply(
    lambda x: text_to_sequence(x, vocab, MAX_LEN)
)

X_train_valid, X_test, y_train_valid, y_test = train_test_split(
    np.stack(data_balanced['sequence']),
    data_balanced['label'].values,
    test_size=0.15,
    stratify=data_balanced['label'],
    random_state=42
)

X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_valid,
    y_train_valid,
    test_size=0.1765,
    stratify=y_train_valid,
    random_state=42
)

class TweetDataset(torch.utils.data.Dataset):
    def __init__(self, sequences, labels, augment=False):
        self.sequences = sequences
        self.labels = labels
        self.augment = augment

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        seq = self.sequences[idx]
        label = self.labels[idx]

        if self.augment and np.random.rand() > 0.5:
            mask = np.random.rand(len(seq)) > 0.1
            seq = seq[mask]
            seq = np.pad(seq, (0, MAX_LEN - len(seq)),
                         mode='constant', constant_values=vocab["[PAD]"])
        return {
            'sequence': torch.tensor(seq, dtype=torch.long),
            'label': torch.tensor(label, dtype=torch.long)
        }

BATCH_SIZE = 64
train_dataset = TweetDataset(X_train, y_train, augment=True)
valid_dataset = TweetDataset(X_valid, y_valid)
test_dataset = TweetDataset(X_test, y_test)

train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True, pin_memory=True
)
valid_loader = torch.utils.data.DataLoader(
    valid_dataset, batch_size=BATCH_SIZE, shuffle=False, pin_memory=True
)
test_loader = torch.utils.data.DataLoader(
    test_dataset, batch_size=BATCH_SIZE, shuffle=False, pin_memory=True
)

class EnhancedTwitterTransformer(nn.Module):
    def __init__(self, vocab_size, embedding_dim=100, hidden_dim=256,
                 num_layers=4, num_heads=4, num_classes=6,
                 max_len=MAX_LEN, dropout=0.3):
        super().__init__()

        self.embedding = nn.Embedding(
            vocab_size, embedding_dim, padding_idx=vocab["[PAD]"]
        )
        self.embedding.weight.data.copy_(torch.from_numpy(embedding_matrix))
        self.embedding.weight.requires_grad = True  # امکان فاین‌تیونینگ جزئی

        self.pos_embedding = nn.Parameter(torch.zeros(1, max_len, embedding_dim))
        nn.init.trunc_normal_(self.pos_embedding, std=0.02)

        self.dropout = nn.Dropout(dropout)

        encoder_layers = nn.TransformerEncoderLayer(
            d_model=embedding_dim,
            nhead=num_heads,
            dim_feedforward=hidden_dim,
            dropout=dropout,
            activation='gelu',
            batch_first=True,
            norm_first=True
        )
        self.encoder = nn.TransformerEncoder(
            encoder_layers,
            num_layers=num_layers,
            norm=nn.LayerNorm(embedding_dim)
        )

        self.classifier = nn.Sequential(
            nn.LayerNorm(embedding_dim),
            nn.Linear(embedding_dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout/2),
            nn.Linear(hidden_dim, num_classes)
        )

        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            nn.init.xavier_uniform_(module.weight)
            if module.bias is not None:
                nn.init.constant_(module.bias, 0)
        elif isinstance(module, nn.LayerNorm):
            nn.init.constant_(module.bias, 0)
            nn.init.constant_(module.weight, 1.0)

    def forward(self, x):
        batch_size, seq_len = x.size()

        token_emb = self.embedding(x)
        pos_emb = self.pos_embedding[:, :seq_len, :]
        x = self.dropout(token_emb + pos_emb)

        padding_mask = (x == self.embedding.weight[vocab["[PAD]"]]).all(dim=-1)
        x = self.encoder(x, src_key_padding_mask=padding_mask)

        cls_output = x[:, 0, :]
        logits = self.classifier(cls_output)
        return logits

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
MODEL_SAVE_PATH = "twitter_emotion_model.pth"

# بارگذاری یا ایجاد مدل
if os.path.exists(MODEL_SAVE_PATH):
    print("Loading pre-trained model...")
    model = EnhancedTwitterTransformer(
        vocab_size=vocab_size,
        embedding_dim=GLOVE_DIM,
        hidden_dim=256,
        num_layers=4,
        num_heads=4,
        num_classes=len(final_label_mapping),
        max_len=MAX_LEN,
        dropout=0.2
    ).to(device)
    model.load_state_dict(torch.load(MODEL_SAVE_PATH))

    for param in model.parameters():
        param.requires_grad = True

    optimizer = optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-3)
else:
    print("Initializing new model...")
    model = EnhancedTwitterTransformer(
        vocab_size=vocab_size,
        embedding_dim=GLOVE_DIM,
        hidden_dim=256,
        num_layers=4,
        num_heads=4,
        num_classes=len(final_label_mapping),
        max_len=MAX_LEN,
        dropout=0.2
    ).to(device)
    optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-3)

class_counts = np.bincount(data_balanced['label'])
class_weights = torch.tensor(1. / np.sqrt(class_counts), dtype=torch.float).to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights)

scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='max',
    patience=3,
    factor=0.2,
    verbose=True,
    min_lr=1e-6
)

NUM_EPOCHS = 50
best_f1 = 0
patience = 6
no_improve = 0

for epoch in range(NUM_EPOCHS):
    model.train()
    train_loss = 0
    correct = 0
    total = 0

    for batch in train_loader:
        optimizer.zero_grad()
        sequences = batch['sequence'].to(device)
        labels = batch['label'].to(device)

        outputs = model(sequences)
        loss = criterion(outputs, labels)
        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()

        train_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

    train_acc = 100. * correct / total
    train_loss /= len(train_loader)

    model.eval()
    all_preds = []
    all_labels = []
    val_loss = 0
    correct_val = 0
    total_val = 0

    with torch.no_grad():
        for batch in valid_loader:
            sequences = batch['sequence'].to(device)
            labels = batch['label'].to(device)

            outputs = model(sequences)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            _, predicted = outputs.max(1)
            total_val += labels.size(0)
            correct_val += predicted.eq(labels).sum().item()

            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    val_acc = 100. * correct_val / total_val
    val_loss /= len(valid_loader)
    f1 = f1_score(all_labels, all_preds, average='weighted')
    scheduler.step(f1)

    print(f'\nEpoch {epoch+1:02}')
    print(f'Train Loss: {train_loss:.4f} | Acc: {train_acc:.2f}%')
    print(f'Val Loss: {val_loss:.4f} | Acc: {val_acc:.2f}% | F1: {f1:.4f}')
    print(classification_report(
        all_labels, all_preds,
        target_names=final_label_mapping.keys(),
        digits=4
    ))

    if f1 > best_f1:
        best_f1 = f1
        no_improve = 0
        torch.save(model.state_dict(), MODEL_SAVE_PATH)
        print(f"Model saved with F1: {f1:.4f}")
    else:
        no_improve += 1
        if no_improve >= patience:
            print(f"\nEarly stopping after {patience} epochs without improvement")
            break

model.load_state_dict(torch.load(MODEL_SAVE_PATH))
model.eval()
all_preds = []
all_labels = []
with torch.no_grad():
    for batch in test_loader:
        sequences = batch['sequence'].to(device)
        labels = batch['label'].to(device)

        outputs = model(sequences)
        _, predicted = outputs.max(1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

test_accuracy = accuracy_score(all_labels, all_preds)
print(f'\nFinal Test Accuracy: {test_accuracy:.4f}')

inv_label_mapping = {v: k for k, v in final_label_mapping.items()}

test_samples = [
    "OMG just got tickets for the concert!!! 😍 #excited",
    "This service is terrible! Worst experience ever 😠",
    "Feeling so anxious about the interview tomorrow...",
    "Lost my pet today. I'm completely heartbroken 💔",
    "What a beautiful morning! 🌞 #blessed",
    "lol that's hilarious 😂"
]

for text in test_samples:
    processed = twitter_preprocessor(text)
    seq = text_to_sequence(processed, vocab, MAX_LEN)
    seq_tensor = torch.tensor(seq).unsqueeze(0).to(device)

    with torch.no_grad():
        output = model(seq_tensor)
        prob = torch.softmax(output, dim=1)
        pred = torch.argmax(prob).item()

    print(f"\nText: {text}")
    print(f"Processed: {processed}")
    print(f"Predicted Emotion: {inv_label_mapping[pred]} ({prob.max().item():.2f})")
    print("="*60)


Loading GloVe (100d)...
Vocabulary Size: 13446
Optimal Sequence Length: 13
Loading pre-trained model...





Epoch 01
Train Loss: 0.5781 | Acc: 78.56%
Val Loss: 1.5825 | Acc: 60.55% | F1: 0.5952
              precision    recall  f1-score   support

     neutral     0.4485    0.3565    0.3972      1296
       worry     0.4275    0.3480    0.3837      1296
   happiness     0.5983    0.6528    0.6244      1296
     sadness     0.5851    0.6528    0.6171      1296
        love     0.7244    0.7485    0.7362      1296
    surprise     0.7594    0.8742    0.8128      1296

    accuracy                         0.6055      7776
   macro avg     0.5905    0.6055    0.5952      7776
weighted avg     0.5905    0.6055    0.5952      7776

Model saved with F1: 0.5952

Epoch 02
Train Loss: 0.5733 | Acc: 78.78%
Val Loss: 1.5654 | Acc: 60.79% | F1: 0.5987
              precision    recall  f1-score   support

     neutral     0.4544    0.3727    0.4095      1296
       worry     0.4334    0.3565    0.3912      1296
   happiness     0.6026    0.6528    0.6267      1296
     sadness     0.5849    0.6381    0