In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from transformers import AutoModel,AutoTokenizer,AutoModelForSequenceClassification

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
from dataclasses import dataclass

@dataclass
class config:
    # Hyperparameters
    BATCH_SIZE = 32
    MAX_LENGTH = 64
    NUM_EPOCHS = 30
    B1 = 0.9
    B2 = 0.999
    _LAMBDA = 0.01
    LEARNING_RATE = 2e-5
    LOGFILE = 'logfile.txt'
    DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # DEVICE = 'cpu'

In [3]:
import re
from dadmatools.models.normalizer import Normalizer
from dataclasses import dataclass
from tqdm import tqdm

class Preprocessing:
    def __init__(self):
        self.tokenizer = AutoTokenizer.from_pretrained("HooshvareLab/bert-base-parsbert-uncased")

    def __call__(self,text):
        # Step 1: Normalize
        text = self.normalize(text)
        # # Step 2: Remove English characters
        text = self.remove_english_chars(text)
        # # Step 3: Remove repeatable characters
        # text = self.remove_repeatable_chars(text)
        # # Step 4: Remove Arabic diacritics
        text = self.remove_arabic_diacritics(text)
        # # Step 5: Remove non-Persian characters
        text = self.remove_non_persian_chars(text)
        # # Step 6: Remove hashtags
        text = self.remove_hashtags(text)
        # # Step 7: Remove Persian numerals
        text = self.remove_persian_numerals(text)
        # text = self.remove_hash_symbol(self.lemmatize(text))
        
        
        return text
    
    # Step 1: Normalize
    def normalize(self,text):
        normalizer = Normalizer(
            full_cleaning=True,
        )
        return normalizer.normalize(text)

    def remove_hash_symbol(self,text):
        # Remove hash symbol (#)
        try:
            cleaned_text = text.replace('#', '')
        except:
            cleaned_text = text
        return cleaned_text


    # Step 2: remove any engish character
    def remove_english_chars(self,text):
        english_chars_pattern = re.compile(r'[a-zA-Z]')
        cleaned_text = re.sub(english_chars_pattern, '', text)
        return cleaned_text



    # Step 4: remove arabic diactrics
    def remove_arabic_diacritics(self,text):
        """
            Some common Arabic diacritical marks include:
                Fatha (ً): Represents the short vowel "a" or "u" when placed above a letter.
                Kasra (ٍ): Represents the short vowel "i" when placed below a letter.
                Damma (ٌ): Represents the short vowel "u" when placed above a letter.
                Sukun (ـْ): Indicates the absence of any vowel sound.
                Shadda (ّ): Represents consonant doubling or gemination.
                Tanween (ًٌٍ): Represents the nunation or the "n" sound at the end of a word.
        """

        """
            The regular expression [\u064B-\u065F] represents a character range that covers the Unicode code points for Arabic diacritics.
        """
        # مرحبا بكم <== "مَرْحَبًا بِكُمْ"
        arabic_diacritics_pattern = re.compile(r'[\u064B-\u065F]')
        cleaned_text = re.sub(arabic_diacritics_pattern, '', text)
        return cleaned_text

    # Step 5: remove any non-persian chars
    def remove_non_persian_chars(self,text):
        import emoji
        import regex

        def get_emojis(text):
            emojis = []
            for char in text:
                if regex.match(r'\p{So}', char):
                    emojis.append(char)
            return emojis

        # emojis = get_emojis(text)
        persian_chars_pattern = re.compile(r'[^\u0600-\u06FF\uFB8A\u067E\u0686\u06AF\u200C\u200F]+')
        cleaned_text = re.sub(persian_chars_pattern, ' ', text)
        # cleaned_text += ' '.join(emojis)
        return cleaned_text

    # Step 6: remove # sign from text while keeping the information included into hashtags
    def remove_hashtags(self,text):
        # Regular expression to match hashtags
        hashtag_pattern = r'#\w+'
        
        def extract_and_replace(match):
            # Extract the text from the matched hashtag and remove the '#' sign
            hashtag_text = match.group(0)[1:]
            return hashtag_text
        
        # Use the 're.sub' function with the 'extract_and_replace' function as the replacement
        cleaned_text = re.sub(hashtag_pattern, extract_and_replace, text)
        
        return cleaned_text

    # Step 7: remove persian numeric characters from text
    def remove_persian_numerals(self,text):
        # Define a translation table to map Persian numerals to None (remove them)
        persian_numerals = {
            ord('۰'): None,
            ord('۱'): None,
            ord('۲'): None,
            ord('۳'): None,
            ord('۴'): None,
            ord('۵'): None,
            ord('۶'): None,
            ord('۷'): None,
            ord('۸'): None,
            ord('۹'): None
        }
        # Use str.translate() to remove Persian numerals
        cleaned_text = text.translate(persian_numerals)
        return cleaned_text


In [4]:
# X_train = pd.read_csv('preprocess_data/X_train.csv',encoding='utf-8')
# X_test = pd.read_csv('preprocess_data/X_test.csv',encoding='utf-8')
# y_train = pd.read_csv('preprocess_data/y_train.csv')
# y_test = pd.read_csv('preprocess_data/y_test.csv')
# # X_train = X_train.dropna()
# # X_test = X_test.dropna()
# # y_train = y_train.dropna()
# # y_test = y_test.dropna()

# # X_train = X_train['text'].to_list()
# # X_test = X_test['text'].to_list()

# # y_train = y_train['label'].to_list()
# # y_test = y_test['label'].to_list()
# X_test[-1],y_test[-1]

In [5]:
# train = open('data/Augmented_train.txt',encoding='utf-8').readlines()
# test = open('preprocess_data/test.txt',encoding='utf-8').readlines()


In [6]:
train = open('preprocess_data/train.txt',encoding='utf-8').readlines()
test = open('preprocess_data/test.txt',encoding='utf-8').readlines()
X_train = []
y_train = []
X_test = []
y_test = []
p = Preprocessing()
for item in train:
    X,y = item.split(",")
    X_train.append(p(X.strip()))
    y_train.append(y.rstrip())

for item in test:
    X,y = item.split(",")
    X_test.append(p(X.strip()))
    y_test.append(y.rstrip())


In [7]:
cleaned = pd.read_csv('data/cleaned_emopars.csv')
X_train.extend(cleaned['text'].tolist())
y_train.extend(cleaned['major_emotion'].tolist())

In [8]:
# from tqdm import tqdm
# def write_text_to_file(text,file_path):
#     with open(file_path, 'a',encoding='utf-8') as _file:
#         _file.write(text)
        
# preprocessor = Preprocessing()
# augments = pd.read_csv('data/augmented.csv',encoding='utf-8')
# texts = augments['text'].tolist()
# new_texts = []
# labels = augments['label'].tolist()
# idx = 0
# for text in tqdm(texts):
#     writed = preprocessor(text) + "," +str(labels[idx])+"\n"
#     write_text_to_file(writed,'train.txt')
#     idx+=1

 

In [9]:
cleaned['major_emotion'].value_counts()

ANGER        10530
FEAR          5009
SADNESS       4167
HATRED        4060
HAPPINESS     3753
WONDER        2363
Name: major_emotion, dtype: int64

In [10]:
class TextDataset(torch.utils.data.Dataset):
    def __init__(self, texts, labels, max_length):
        self.texts = texts
        self.labels = labels
        self.tokenizer = AutoTokenizer.from_pretrained("HooshvareLab/bert-base-parsbert-uncased")
        self.max_length = config.MAX_LENGTH
        self.labels_dict = {'SAD': 0, 'HAPPY': 1,'SURPRISE': 2, 'FEAR': 3, 'HATE': 4, 'ANGRY': 5,'OTHER': 6,'HATRED':4,'WONDER':2,'ANGER':5,'SADNESS':0,'HAPPINESS':1}

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        text = str(self.texts[idx])
        # self.tokenizer.add_special_tokens(text)
        label = self.labels[idx]
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            truncation=True,
            max_length=self.max_length,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt'
        )
        inputs = {
            'input_ids': encoding['input_ids'].squeeze(),
            'attention_mask': encoding['attention_mask'].squeeze(),
            'labels': torch.tensor(self.labels_dict[label])
        }
        return inputs

In [11]:
from tqdm import tqdm
from sklearn.metrics import f1_score
import torch.optim.lr_scheduler as lr_scheduler

class Trainer:
    def __init__(self, model,train_loader, val_loader):
        self.model = model.to(config.DEVICE)
        self.train_loader = train_loader
        self.val_loader = val_loader
        # self.test_loader = test_loader
        self.device = config.DEVICE
        self.learning_rate = config.LEARNING_RATE

        # Example initial learning rate
        self.optimizer = torch.optim.AdamW(
            self.model.parameters(),
            lr=config.LEARNING_RATE,
            betas=(config.B1, config.B2),
            weight_decay=config._LAMBDA
        )
        self.scheduler = lr_scheduler.LambdaLR(self.optimizer, lr_lambda=lambda epoch: 1 - epoch/(config.NUM_EPOCHS))
        self.weight = self.calculate_class_weights().to(config.DEVICE)

        self.criterion = nn.CrossEntropyLoss(weight=self.weight)
        
    def calculate_class_weights(self,csv_file='preprocess_data/arman_emo_concat.csv'):
        labels_dict = {'SAD': 0, 'HAPPY': 1,'SURPRISE': 2, 'FEAR': 3, 'HATE': 4, 'ANGRY': 5,'OTHER': 6,}
        data = pd.read_csv(csv_file)
        labels = data['major_emotion']
        class_counts = labels.value_counts()
        total_samples = len(labels)
        class_weights = {}
        for label, count in class_counts.items():
            class_weights[label] = total_samples / (len(class_counts) * count)
        matched_class_weights = {label: class_weights[label] for label in labels_dict}
        class_weights_tensor = torch.tensor(list(matched_class_weights.values()))
        return class_weights_tensor


    def train(self, num_epochs=config.NUM_EPOCHS):
        for epoch in range(num_epochs):
            train_loss, train_acc, train_f1 = self._train_epoch()
            val_loss, val_acc, val_f1 = self._evaluate(self.val_loader)
            print(f"Epoch {epoch+1}/{num_epochs}")
            print(f"Train Loss: {train_loss:.4f} | Train Accuracy: {train_acc:.4f} | Train F1 Score: {train_f1:.4f}")
            print(f"Validation Loss: {val_loss:.4f} | Validation Accuracy: {val_acc:.4f} | Validation F1 Score: {val_f1:.4f}")
            print()

            with open(config.LOGFILE, 'a') as f:
                f.write(f"Epoch {epoch+1}/{num_epochs} | ")
                f.write(f"Train Loss: {train_loss:.4f} | ")
                f.write(f"Train Accuracy: {train_acc:.4f} | ")
                f.write(f"Train F1 Score: {train_f1:.4f} | ")
                f.write(f"Valid Loss: {val_loss:.4f} | ")
                f.write(f"Valid Accuracy: {val_acc:.4f} | ")
                f.write(f"Valid F1 Score: {val_f1:.4f} | ")
                f.write("\n")
                
            checkpoint = {
                        'epoch': epoch + 1,
                        'model_state_dict': self.model.state_dict(),
                        'optimizer_state_dict': self.optimizer.state_dict(),
                        'train_loss': train_loss,
                        'train_acc': train_acc,
                        'train_f1_score': train_f1,
                        'valid_loss': val_loss,
                        'valid_acc': val_acc,
                        'valid_f1_score': val_f1
                    }
            torch.save(checkpoint, f'checkpoints/{epoch+1}checkpoint.pth')
            self.scheduler.step()


    def _train_epoch(self):
        self.model.train()
        total_loss = 0.0
        total_correct = 0
        total_samples = 0
        total_predictions = []
        total_labels = []

        for batch in tqdm(self.train_loader):
            input_ids = batch['input_ids'].to(self.device)
            attention_mask = batch['attention_mask'].to(self.device)
            labels = batch['labels'].to(self.device)

            self.optimizer.zero_grad()

            outputs = self.model(input_ids, attention_mask).logits

            _, predicted = torch.max(outputs, dim=1)

            loss = self.criterion(outputs, labels)
            total_loss += loss.item()

            total_correct += (predicted == labels).sum().item()
            total_samples += labels.size(0)

            total_predictions.extend(predicted.cpu().tolist())
            total_labels.extend(labels.cpu().tolist())

            loss.backward()
            self.optimizer.step()

        average_loss = total_loss / len(self.train_loader)
        accuracy = total_correct / total_samples
        f1 = f1_score(total_labels, total_predictions, average='macro')
        return average_loss, accuracy, f1

    def _evaluate(self, data_loader):
        self.model.eval()
        total_loss = 0.0
        total_correct = 0
        total_samples = 0
        total_predictions = []
        total_labels = []

        with torch.no_grad():
            for batch in tqdm(data_loader):
                input_ids = batch['input_ids'].to(self.device)
                attention_mask = batch['attention_mask'].to(self.device)
                labels = batch['labels'].to(self.device)

                outputs = self.model(input_ids, attention_mask).logits
                
                _, predicted = torch.max(outputs, dim=1)
                loss = self.criterion(outputs, labels)
                total_loss += loss.item()

                total_correct += (predicted == labels).sum().item()
                total_samples += labels.size(0)

                total_predictions.extend(predicted.cpu().tolist())
                total_labels.extend(labels.cpu().tolist())

        average_loss = total_loss / len(data_loader)
        accuracy = total_correct / total_samples
        f1 = f1_score(total_labels, total_predictions, average='macro')

        
        return average_loss, accuracy, f1

In [12]:
class SequenceClassifierFactory:
    def __init__(self):
        pass

    @staticmethod
    def create_model(model_name, num_labels):
        model_name = model_name.lower()
        if model_name == 'parsbert':
            tokenizer = AutoTokenizer.from_pretrained("HooshvareLab/bert-base-parsbert-uncased")
            model = AutoModelForSequenceClassification.from_pretrained("HooshvareLab/bert-base-parsbert-uncased", num_labels=num_labels)
        elif model_name == 'xlm-roberta':
            tokenizer = AutoTokenizer.from_pretrained('checkpoints/models--xlm-roberta-large/snapshots/')
            model = AutoModelForSequenceClassification.from_pretrained('checkpoints/models--xlm-roberta-large/snapshots/', num_labels=num_labels)
        elif model_name == 'xlm-emo':
            tokenizer = AutoTokenizer.from_pretrained("m3hrdadfi/xlm-mlm-17-1280-emotion")
            model = AutoModelForSequenceClassification.from_pretrained("m3hrdadfi/xlm-mlm-17-1280-emotion", num_labels=num_labels)
        else:
            raise ValueError(f"Model '{model_name}' not supported.")
        
        return tokenizer, model

In [13]:

from transformers import AutoConfig , AutoTokenizer , AutoModel

def change_grad(module,_req_grad_=False):
    for param in module.parameters():
        param.requires_grad = _req_grad_
    
num_labels = 7
model_name = 'parsbert'
tokenizer , model = SequenceClassifierFactory().create_model(model_name,num_labels)


Some weights of the model checkpoint at HooshvareLab/bert-base-parsbert-uncased were not used when initializing BertForSequenceClassification: ['cls.predictions.decoder.weight', 'cls.predictions.bias', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.decoder.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.weight', 'cls.seq_relationship.bias']
- This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForSequenceClassifica

In [14]:
train_dataset = TextDataset(X_train,y_train,max_length=64)
test_dataset = TextDataset(X_test,y_test,max_length=64)
train_dataloader = torch.utils.data.DataLoader(train_dataset,batch_size=32,shuffle=True)
test_dataloader = torch.utils.data.DataLoader(test_dataset,batch_size=128,shuffle=False)

In [15]:
torch.cuda.empty_cache()
trainer = Trainer(model=model,train_loader=train_dataloader,val_loader=test_dataloader)
trainer.train(num_epochs=10)

100%|██████████| 1127/1127 [09:05<00:00,  2.07it/s]
100%|██████████| 9/9 [00:08<00:00,  1.12it/s]


Epoch 1/10
Train Loss: 1.7964 | Train Accuracy: 0.2333 | Train F1 Score: 0.2313
Validation Loss: 1.3261 | Validation Accuracy: 0.4926 | Validation F1 Score: 0.4859



100%|██████████| 1127/1127 [09:21<00:00,  2.01it/s]
100%|██████████| 9/9 [00:08<00:00,  1.12it/s]


Epoch 2/10
Train Loss: 1.5726 | Train Accuracy: 0.3175 | Train F1 Score: 0.3312
Validation Loss: 1.1705 | Validation Accuracy: 0.5578 | Validation F1 Score: 0.5305



100%|██████████| 1127/1127 [09:20<00:00,  2.01it/s]
100%|██████████| 9/9 [00:07<00:00,  1.14it/s]


Epoch 3/10
Train Loss: 1.3178 | Train Accuracy: 0.4167 | Train F1 Score: 0.4495
Validation Loss: 1.4311 | Validation Accuracy: 0.5613 | Validation F1 Score: 0.5277



100%|██████████| 1127/1127 [09:21<00:00,  2.01it/s]
100%|██████████| 9/9 [00:07<00:00,  1.18it/s]


Epoch 4/10
Train Loss: 0.9954 | Train Accuracy: 0.5419 | Train F1 Score: 0.5857
Validation Loss: 1.7872 | Validation Accuracy: 0.5387 | Validation F1 Score: 0.5219



100%|██████████| 1127/1127 [09:01<00:00,  2.08it/s]
100%|██████████| 9/9 [00:07<00:00,  1.18it/s]


Epoch 5/10
Train Loss: 0.6518 | Train Accuracy: 0.6916 | Train F1 Score: 0.7318
Validation Loss: 2.1360 | Validation Accuracy: 0.5091 | Validation F1 Score: 0.4833



100%|██████████| 1127/1127 [09:00<00:00,  2.09it/s]
100%|██████████| 9/9 [00:07<00:00,  1.18it/s]


Epoch 6/10
Train Loss: 0.4080 | Train Accuracy: 0.8006 | Train F1 Score: 0.8310
Validation Loss: 2.5437 | Validation Accuracy: 0.4778 | Validation F1 Score: 0.4476



100%|██████████| 1127/1127 [09:00<00:00,  2.08it/s]
100%|██████████| 9/9 [00:07<00:00,  1.18it/s]


Epoch 7/10
Train Loss: 0.2480 | Train Accuracy: 0.8760 | Train F1 Score: 0.8968
Validation Loss: 3.0266 | Validation Accuracy: 0.4796 | Validation F1 Score: 0.4503



100%|██████████| 1127/1127 [09:00<00:00,  2.08it/s]
100%|██████████| 9/9 [00:07<00:00,  1.18it/s]


Epoch 8/10
Train Loss: 0.1762 | Train Accuracy: 0.9160 | Train F1 Score: 0.9288
Validation Loss: 3.0550 | Validation Accuracy: 0.4970 | Validation F1 Score: 0.4777



100%|██████████| 1127/1127 [09:00<00:00,  2.09it/s]
100%|██████████| 9/9 [00:07<00:00,  1.18it/s]


Epoch 9/10
Train Loss: 0.1288 | Train Accuracy: 0.9392 | Train F1 Score: 0.9480
Validation Loss: 3.3261 | Validation Accuracy: 0.4596 | Validation F1 Score: 0.4447



100%|██████████| 1127/1127 [09:00<00:00,  2.09it/s]
100%|██████████| 9/9 [00:07<00:00,  1.18it/s]


Epoch 10/10
Train Loss: 0.1005 | Train Accuracy: 0.9545 | Train F1 Score: 0.9614
Validation Loss: 3.4920 | Validation Accuracy: 0.4396 | Validation F1 Score: 0.4236



In [20]:
def predict(texts,local_ids, model, device, threshold=0.5):
    model.eval()  # Set the model to evaluation mode
    inputs = []  # To store the input tensors
    # anger, sadness, surprise, happiness, fear, disgust, other
    label_dict = {0: 'sadness', 1: 'happiness', 2: 'surprise', 3: 'fear', 4: 'disgust', 5: 'anger', 6: 'other'}
    num_classes = len(label_dict)
    # texts = texts['tweet'].tolist()
    # local_id = texts['local_id'].tolist()
    
    # Tokenize and preprocess the texts
    tokenizer = AutoTokenizer.from_pretrained('HooshvareLab/bert-base-parsbert-uncased')
    max_length = 64
    for text in tqdm(texts):
        encoding = tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            truncation=True,
            max_length=max_length,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt'
        )
        input_ids = encoding['input_ids'].squeeze().to(device)
        attention_mask = encoding['attention_mask'].squeeze().to(device)
        inputs.append({'input_ids': input_ids, 'attention_mask': attention_mask})

    # Make predictions
    with torch.no_grad():
        predictions = []
        prob_matrix = []  # To store the probability distribution for each text
        for input_data in tqdm(inputs):
            input_ids = input_data['input_ids'].unsqueeze(0)  # Add batch dimension
            attention_mask = input_data['attention_mask'].unsqueeze(0)  # Add batch dimension

            outputs = model(input_ids, attention_mask).logits
            predicted_probs = torch.softmax(outputs, dim=1).squeeze().cpu().numpy()
            prob_matrix.append(predicted_probs)
            
            # Apply threshold to determine emotions present in text
            predicted_labels = [label_dict[idx] for idx, prob in enumerate(predicted_probs) if prob > threshold]
            predictions.append(label_dict[torch.argmax(outputs, dim=1).item()])

    csv_data = []
    for local_id, text, primary_emotion, probs in zip(local_ids, texts, predictions, prob_matrix):
        emotion_probs = [1 if prob > threshold else 0 for prob in probs]
        row = [local_id, text, primary_emotion] + emotion_probs
        csv_data.append(row)

    # Create a DataFrame from the CSV data
    columns = ["local_id", "tweet", "primary_emotion", "anger", "sadness", "fear", "happiness", "disgust", "surprise", "other"]
    df = pd.DataFrame(csv_data, columns=columns)
    df.to_csv('checkpoints/predictions_2.csv', index=False)

    return predictions


texts = pd.read_csv('final/data_emotion_without_label.csv',encoding='utf-8')['tweet'].tolist()
local_id = pd.read_csv('final/data_emotion_without_label.csv',encoding='utf-8')['local_id'].tolist()

x = predict(texts=texts,local_ids=local_id,model=model,device='cuda',threshold=0.35)

100%|██████████| 500/500 [00:00<00:00, 1418.10it/s]
100%|██████████| 500/500 [00:05<00:00, 84.42it/s]


In [17]:
"""
Oversampling and Undersampling: Oversampling and undersampling techniques can be used with text data, but there are some challenges. In oversampling, simply duplicating text samples might not be effective, as it can lead to overfitting. Techniques like SMOTE or generating synthetic examples through text augmentation (e.g., paraphrasing) can be more appropriate. For undersampling, randomly removing text samples may result in loss of valuable information. Undersampling methods that consider text properties, like Tomek links, might be more suitable.

Synthetic Data Generation: Generating synthetic examples for text data can be more complex than in tabular data. Techniques like SMOTE might need adaptations to consider the sequential nature of text. Additionally, methods like Word2Vec or GPT-based language models can be used for generating semantically similar but contextually different text samples.

Data Augmentation: Data augmentation methods for text involve introducing variations in text content while preserving meaning. Techniques like synonym replacement, word swapping, and paraphrasing can be used to create augmented versions of the minority class text samples.

Class-Weighted Loss: Class-weighted loss can be applied to text data as well. It considers the importance of each class in the loss calculation during training.

Imbalanced-learn Library: If you're using Python, the imbalanced-learn library offers resampling techniques tailored for imbalanced datasets. While some techniques might need adjustments for text data, the library can still provide a good starting point.

"""

"""
from sklearn.datasets import fetch_20newsgroups
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from imblearn.over_sampling import SMOTE
import nlpaug.augmenter.word as naw

# Load a text dataset (e.g., 20 Newsgroups)
newsgroups = fetch_20newsgroups(subset='all', remove=('headers', 'footers', 'quotes'))

X, y = newsgroups.data, newsgroups.target

# Split the dataset into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Apply TF-IDF vectorization
vectorizer = TfidfVectorizer(max_features=10000)
X_train_tfidf = vectorizer.fit_transform(X_train)

# Apply oversampling using SMOTE
oversampler = SMOTE(random_state=42)
X_train_resampled, y_train_resampled = oversampler.fit_resample(X_train_tfidf, y_train)

# Apply data augmentation using nlpaug
aug = naw.SynonymAug(aug_src='wordnet', aug_max=10)
X_train_augmented = [aug.augment(text) for text in X_train]

# Print sample sizes before and after resampling/augmentation
print("Original class distribution:", {class_label: sum(y_train == class_label) for class_label in set(y_train)})
print("After SMOTE resampling:", {class_label: sum(y_train_resampled == class_label) for class_label in set(y_train_resampled)})
print("After data augmentation:", {class_label: sum(y_train == class_label) for class_label in set(y_train)})

# Now you can use X_train_resampled and y_train_resampled for training


"""

'\nfrom sklearn.datasets import fetch_20newsgroups\nfrom sklearn.model_selection import train_test_split\nfrom sklearn.feature_extraction.text import TfidfVectorizer\nfrom imblearn.over_sampling import SMOTE\nimport nlpaug.augmenter.word as naw\n\n# Load a text dataset (e.g., 20 Newsgroups)\nnewsgroups = fetch_20newsgroups(subset=\'all\', remove=(\'headers\', \'footers\', \'quotes\'))\n\nX, y = newsgroups.data, newsgroups.target\n\n# Split the dataset into training and test sets\nX_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)\n\n# Apply TF-IDF vectorization\nvectorizer = TfidfVectorizer(max_features=10000)\nX_train_tfidf = vectorizer.fit_transform(X_train)\n\n# Apply oversampling using SMOTE\noversampler = SMOTE(random_state=42)\nX_train_resampled, y_train_resampled = oversampler.fit_resample(X_train_tfidf, y_train)\n\n# Apply data augmentation using nlpaug\naug = naw.SynonymAug(aug_src=\'wordnet\', aug_max=10)\nX_train_augmented = [aug.augmen

In [18]:
import re
from typing import Any
import hazm
from dadmatools.models.normalizer import Normalizer
from dataclasses import dataclass
from hazm import Lemmatizer
class Preprocessing:
    def __init__(self):
        pass

    def __call__(self,text):
        # Step 1: Normalize
        text = self.normalize(text)
        text = self.remove_english_chars(text)
        # text = self.remove_repeatable_chars(text)
        text = self.remove_arabic_diacritics(text)
        text = self.remove_non_persian_chars(text)

        text = self.remove_persian_numerals(text)
        text = self.remove_hashtags(text)
        return text
    
    # Step 1: Normalize
    def normalize(self,text):
        normalizer = Normalizer(
            full_cleaning=True,
        )
        return normalizer.normalize(text)

    # Step 2: remove any engish character
    def remove_english_chars(self,text):
        english_chars_pattern = re.compile(r'[a-zA-Z]')
        cleaned_text = re.sub(english_chars_pattern, '', text)
        return cleaned_text

    # Step 3: remove repeatable characters
    def remove_repeatable_chars(self,text):
        return hazm.Normalizer().normalize(text)

    # Step 4: remove arabic diactrics
    def remove_arabic_diacritics(self,text):
        """
            Some common Arabic diacritical marks include:
                Fatha (ً): Represents the short vowel "a" or "u" when placed above a letter.
                Kasra (ٍ): Represents the short vowel "i" when placed below a letter.
                Damma (ٌ): Represents the short vowel "u" when placed above a letter.
                Sukun (ـْ): Indicates the absence of any vowel sound.
                Shadda (ّ): Represents consonant doubling or gemination.
                Tanween (ًٌٍ): Represents the nunation or the "n" sound at the end of a word.
        """

        """
            The regular expression [\u064B-\u065F] represents a character range that covers the Unicode code points for Arabic diacritics.
        """
        # مرحبا بكم <== "مَرْحَبًا بِكُمْ"
        arabic_diacritics_pattern = re.compile(r'[\u064B-\u065F]')
        cleaned_text = re.sub(arabic_diacritics_pattern, '', text)
        return cleaned_text

    # Step 5: remove any non-persian chars
    def remove_non_persian_chars(self,text):
        persian_chars_pattern = re.compile(r'[^\u0600-\u06FF\uFB8A\u067E\u0686\u06AF\u200C\u200F]+')
        cleaned_text = re.sub(persian_chars_pattern, ' ', text)
        return cleaned_text

    # Step 6: remove # sign from text while keeping the information included into hashtags
    def remove_hashtags(self,text):
        # Regular expression to match hashtags
        hashtag_pattern = r'#\w+'
        
        def extract_and_replace(match):
            # Extract the text from the matched hashtag and remove the '#' sign
            hashtag_text = match.group(0)[1:]
            return hashtag_text
        
        # Use the 're.sub' function with the 'extract_and_replace' function as the replacement
        cleaned_text = re.sub(hashtag_pattern, extract_and_replace, text)
        
        return cleaned_text
    

    # Step 7: remove persian numeric characters from text
    def remove_persian_numerals(self,text):
        # Define a translation table to map Persian numerals to None (remove them)
        persian_numerals = {
            ord('۰'): None,
            ord('۱'): None,
            ord('۲'): None,
            ord('۳'): None,
            ord('۴'): None,
            ord('۵'): None,
            ord('۶'): None,
            ord('۷'): None,
            ord('۸'): None,
            ord('۹'): None
        }
        # Use str.translate() to remove Persian numerals
        cleaned_text = text.translate(persian_numerals)
        return cleaned_text


In [19]:
import pandas as pd
from tqdm import tqdm
train_file = pd.read_csv('train_emoPars.csv',encoding='utf-8')
test_file = pd.read_csv('test_emoPars.csv',encoding='utf-8')
p = Preprocessing()
tqdm.pandas()
train_file['text'] = train_file['text'].progress_apply(p)
test_file['text'] = test_file['text'].progress_apply(p)

FileNotFoundError: [Errno 2] No such file or directory: 'train_emoPars.csv'

In [None]:
train_file.to_csv('train_cleaned_emopars.csv')
test_file.to_csv('test_cleaned_emopars.csv')

In [None]:
train_file.to_csv('test_cleaned_emopars.csv')

In [None]:
train_file['texts']

0       من خیلی خودسانسوری می‌کنم تو اینستا هر چی فالو...
1               بعد اتمام جلسه مجلس روند بازار برگشت بورس
2       کاربران توییتر در جریان طوفان توییتری اعتراض ب...
3       وحشی شدن معده بعد از رسیدن به ایران اجتناب ناپ...
4       سحام نیوز بیانیه مشترک عربستان و امارات با پرو...
                              ...                        
2995                    بعد از کرونا یه صفایی بکنیم با هم
2996                              قبلش یه پیتزا بده حداقل
2997    از رفقا می‌خواهم که از بین اپوزیسیون حزب یا سا...
2998    از هر دریچه‌ای؛ چه موازنه قوا با امریکا چه تام...
2999          چند نفر تا حتما هستن میخوام آمار بگیرم بگین
Name: texts, Length: 3000, dtype: object

In [None]:
import pandas as pd
from tqdm import tqdm
y = pd.read_csv('datasets/data_emotion_without_label.csv')
p = Preprocessing()
tqdm.pandas()
y['tweet'] = y['tweet'].progress_apply(p)

  0%|          | 0/500 [00:00<?, ?it/s]

100%|██████████| 500/500 [00:02<00:00, 219.87it/s]


In [None]:
y.to_csv('data_emotion_without_label.csv')

In [None]:
train = pd.read_csv('data/train.csv')
test = pd.read_csv('data/test.csv')

In [None]:
combined = pd.concat([train,test],axis=0)
p = Preprocessing()
tqdm.pandas()
combined['text'] = combined['text'].progress_apply(p)
combined.to_csv('final/arman.csv')

100%|██████████| 7308/7308 [00:18<00:00, 386.83it/s]


In [25]:
x = pd.read_csv('checkpoints/predictions_1.csv')
y = pd.read_csv('res.csv')
y['primary_emotion'] = x['primary_emotion']
y.to_csv('final.csv')