In [None]:
# Model and Tokenizer Initialization
%%capture
!pip install transformers
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Initialize Arabic Bert
tokenizer = AutoTokenizer.from_pretrained("asafaya/bert-base-arabic")
model = AutoModelForSequenceClassification.from_pretrained("asafaya/bert-base-arabic", num_labels=3)

In [None]:
# Initial Data Preparation
import numpy as np
import pandas as pd
import re

# Load the three competition CSV files from the Colab working directory
train_df, test_df, sample_submission = (
    pd.read_csv(csv_path)
    for csv_path in ("/content/train.csv", "/content/test.csv", "/content/sample_submission.csv")
)

# Preview the first rows of each frame, in the order they were loaded
display(train_df.head(), test_df.head(), sample_submission.head())

Unnamed: 0,tweet,class
0,' #علمتني_الحياه أن الذين يعيشون على الأرض ليس...,pos
1,' #ميري_كرسمس كل سنة وانتم طيبين http://t.co/n...,pos
2,' و انتهى مشوار الخواجة ',neg
3,' مش عارف ابتدى مذاكره منين :/ ',neg
4,' @mskhafagi إختصروا الطريق بدلا من إختيار ال...,neg


Unnamed: 0,Id,tweet
0,1,' فينو الاهبل ابن الاهبل '
1,2,' على المصرييييين وجمالهم ربنا يحميهم #MinaAtt...
2,3,' @Kholoudkewan دول كتير اوى ودمهم خفيف العما...
3,4,' انا بعد كده خلى اللى يوعنى بحاجه همضى على...
4,5,' انا هنتحر '


Unnamed: 0,Id,class
0,1,neg
1,2,pos
2,3,neg
3,4,neg
4,5,neg


In [None]:
# Clean-up: remove the quote wrapper, http links, @mentions/#tags and every non-Arabic symbol
def _clean_train_tweet(text):
    """Return `text` reduced to Arabic-block characters and whitespace.

    The steps run in a fixed order: links and @/# tokens are dropped while they are
    still intact, and only afterwards is everything outside U+0600-U+06FF removed.
    Running the function on its own output returns that output unchanged.
    """
    # The raw tweets are wrapped as "' ... '". Strip the wrapper only when it is really
    # there instead of slicing [2:-2] blindly: the slice removed two more characters
    # from each end every time the cell was executed again.
    text = re.sub(r"^'\s|\s'$", '', text)
    text = re.sub(r'http\S+', '', text)
    # NOTE: inside a character class `|` is a literal pipe, so tokens starting with `|`
    # are dropped as well. The pattern is kept as is because the test tweets are
    # cleaned with the very same expression.
    text = re.sub(r'[@|#]\S*', '', text)
    text = re.sub(r'"+', '', text)
    # Keep only characters of the Arabic Unicode block (U+0600-U+06FF) and whitespace
    return re.sub(r'([^\u0600-\u06FF\s]+)', '', text)

# Address the column by name (it was `iloc[:, 0]` before) so the cleaning and the
# tokenization below are guaranteed to work on the same column.
train_df["tweet"] = train_df["tweet"].apply(_clean_train_tweet)

# Tokenize the sentences using bert tokenizer
train_df["bert_tokens"] = train_df["tweet"].apply(lambda x: tokenizer(x).tokens())

In [None]:
# Preview the first five rows of the processed frame (cleaned tweet next to its BERT tokens)
display(train_df.iloc[:5])

Unnamed: 0,tweet,class,bert_tokens
0,أن الذين يعيشون على الأرض ليسوا ملائكة بل بشر...,pos,"[[CLS], ان, الذين, يعيشون, على, الارض, ليسوا, ..."
1,كل سنة وانتم طيبين,pos,"[[CLS], كل, سنة, وانتم, طيب, ##ين, [SEP]]"
2,و انتهى مشوار الخواجة,neg,"[[CLS], و, انتهى, مشوار, الخ, ##واج, ##ة, [SEP]]"
3,مش عارف ابتدى مذاكره منين,neg,"[[CLS], مش, عارف, ابتد, ##ى, مذ, ##اك, ##ره, م..."
4,إختصروا الطريق بدلا من إختيار المنصف ثم الان...,neg,"[[CLS], اخت, ##صروا, الطريق, بدلا, من, اختيار,..."


In [None]:
import random
from sklearn.model_selection import train_test_split

# Data Augmentation (Example: Random Insertion)
def random_insertion(text):
  """Return `text` with some of its own words repeated at random positions.

  The text is split on whitespace. Every word is kept; after each word except the
  last one, with probability 0.3, one of the *other* words of the same text
  (picked uniformly) is inserted. The result is joined with single spaces.
  """
  p_insert = 0.3  # Probability of inserting a word
  tokens = text.split()
  last_position = len(tokens) - 1
  augmented = []
  for position, token in enumerate(tokens):
    augmented.append(token)
    # One random draw per word, including the last one; the position check comes second
    if random.random() < p_insert and position < last_position:
      other_tokens = tokens[:position] + tokens[position + 1:]
      augmented.append(random.choice(other_tokens))
  return " ".join(augmented)

# Train-Test Split with Augmentation
# Seed Python's RNG first: `random_insertion` draws from it, and without a seed every
# run of the notebook would train on a different augmented corpus.
random.seed(2018)
train_df["augmented_tweet"] = train_df["tweet"].apply(random_insertion)

# Split the rows once, then choose the text per side: the training side uses the
# augmented tweets, the held-out side keeps the original tweets so that it is not
# perturbed by the augmentation.
train_rows, validation_rows = train_test_split(train_df, random_state=2018, test_size=0.1)

train_inputs = train_rows["augmented_tweet"].apply(lambda x: tokenizer(x).tokens())
validation_inputs = validation_rows["tweet"].apply(lambda x: tokenizer(x).tokens())
train_labels = train_rows["class"]
validation_labels = validation_rows["class"]

In [None]:
# Encode labels
from sklearn import preprocessing

# The remaining preparation works on the training side of the split
bert_tokens, labels = train_inputs, train_labels

# Fit the label encoder on the training classes, then map them to integer ids
le = preprocessing.LabelEncoder()
le.fit(labels)
Encodedlabels = le.transform(labels)

In [None]:
# Padding and attention mask creation
from keras.preprocessing.sequence import pad_sequences

# Every sequence is padded / truncated to this many token ids
MAX_LEN = 256

# Look up the vocabulary id of each token, then pad (or cut) at the end of each sequence
input_ids = pad_sequences(
    [tokenizer.convert_tokens_to_ids(tokens) for tokens in bert_tokens],
    maxlen=MAX_LEN,
    dtype="long",
    truncating="post",
    padding="post",
)

# Attention mask: 1.0 where the id is greater than 0, 0.0 elsewhere
attention_masks = [[float(token_id > 0) for token_id in row] for row in input_ids]

In [None]:
# Convert to tensors
import torch
from torch.utils.data import TensorDataset, DataLoader

# `input_ids`, `attention_masks` and `Encodedlabels` were built from the training side
# of the earlier train_test_split only. Splitting them a second time would shrink the
# training data again while the hold-out of that first split stayed unused, so all of
# them go into the training set and the existing hold-out becomes the validation set.
train_input_tensor = torch.tensor(input_ids)
train_masks = torch.tensor(attention_masks)
train_label_tensor = torch.tensor(Encodedlabels)

# Encode the hold-out exactly like the training data: ids -> padding -> mask -> label ids.
# `validation_inputs` / `validation_labels` are read but not overwritten, so this cell
# can be executed again without re-running the split.
validation_ids = pad_sequences([tokenizer.convert_tokens_to_ids(x) for x in validation_inputs],
                               maxlen=MAX_LEN, dtype="long", truncating="post", padding="post")
validation_input_tensor = torch.tensor(validation_ids)
validation_masks = torch.tensor([[float(i > 0) for i in seq] for seq in validation_ids])
# Reuse the encoder fitted on the training labels so class ids agree on both sides
validation_label_tensor = torch.tensor(le.transform(validation_labels))

# Select a batch size for training
batch_size = 16

# Create an iterator of our data with torch DataLoader (only the training data is shuffled)
train_data = TensorDataset(train_input_tensor, train_masks, train_label_tensor)
train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True)

validation_data = TensorDataset(validation_input_tensor, validation_masks, validation_label_tensor)
validation_dataloader = DataLoader(validation_data, batch_size=batch_size)

In [None]:
# Optimizer setup
import torch.optim as optim

param_optimizer = list(model.named_parameters())

# Parameters whose name contains one of these fragments are excluded from weight decay.
# 'LayerNorm.weight' is how the transformers BERT modules name the layer-norm scale;
# 'gamma' / 'beta' are kept for checkpoints that still use the older names.
no_decay = ['bias', 'LayerNorm.weight', 'gamma', 'beta']

# torch.optim.AdamW reads the per-group option `weight_decay`. The former key
# `weight_decay_rate` is not an AdamW option, so it was ignored and both groups fell
# back to the optimizer-wide default decay.
optimizer_grouped_parameters = [
    {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)],
     'weight_decay': 0.01},
    {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)],
     'weight_decay': 0.0},
]
optimizer = optim.AdamW(optimizer_grouped_parameters, lr=2e-5)

In [None]:
# Training the model
from tqdm import tqdm, trange

# Function to calculate the accuracy of our predictions vs labels
def flat_accuracy(preds, labels):
    """Return the fraction of rows of `preds` whose arg-max equals the matching label.

    `preds` holds one row of scores per example; `labels` holds the true class ids
    and is flattened before the comparison.
    """
    predicted_classes = np.argmax(preds, axis=1).flatten()
    true_classes = labels.flatten()
    n_correct = (predicted_classes == true_classes).sum()
    return n_correct / len(true_classes)

# Store the loss of every training step for plotting
train_loss_set = []

# Number of training epochs
epochs = 5

# Transfer the model to GPU when one is available, otherwise stay on CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# trange is a tqdm wrapper around the normal python range
for _ in trange(epochs, desc="Epoch"):
    # ---- Training ----
    model.train()
    tr_loss = 0
    nb_tr_examples, nb_tr_steps = 0, 0

    # Train the data for one epoch
    for step, batch in enumerate(train_dataloader):
        b_input_ids, b_input_mask, b_labels = batch
        b_input_ids, b_input_mask, b_labels = b_input_ids.to(device), b_input_mask.to(device), b_labels.to(device)

        # Clear out the gradients accumulated by the previous step
        optimizer.zero_grad()

        # Forward pass; passing `labels` makes the model return the loss
        loss = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask, labels=b_labels)["loss"]
        train_loss_set.append(loss.item())

        # Backward pass
        loss.backward()

        # Update parameters and take a step using the computed gradient
        optimizer.step()

        # Update tracking variables
        tr_loss += loss.item()
        nb_tr_examples += b_input_ids.size(0)
        nb_tr_steps += 1

    # Mean loss per optimisation step of this epoch
    print("Train loss: {}".format(tr_loss / nb_tr_steps))

    # ---- Validation ----
    model.eval()
    eval_loss, eval_accuracy = 0, 0
    nb_eval_steps, nb_eval_examples = 0, 0

    for batch in validation_dataloader:
        b_input_ids, b_input_mask, b_labels = batch
        b_input_ids, b_input_mask, b_labels = b_input_ids.to(device), b_input_mask.to(device), b_labels.to(device)

        with torch.no_grad():
            logits = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask)["logits"]

        logits = logits.detach().cpu().numpy()
        label_ids = b_labels.to('cpu').numpy()

        # Weight each batch by its size: a plain mean of per-batch accuracies gives the
        # examples of a smaller final batch more influence than all the others.
        n_in_batch = len(label_ids)
        eval_accuracy += flat_accuracy(logits, label_ids) * n_in_batch
        nb_eval_examples += n_in_batch
        nb_eval_steps += 1

    # Accuracy over all validation examples of this epoch
    print("Validation Accuracy: {}".format(eval_accuracy / nb_eval_examples))

Epoch:   0%|          | 0/5 [00:00<?, ?it/s]

Train loss: 0.9672854344050089


Epoch:  20%|██        | 1/5 [01:12<04:50, 72.61s/it]

Validation Accuracy: 0.5770833333333333
Train loss: 0.7278365237372262


Epoch:  40%|████      | 2/5 [02:23<03:34, 71.42s/it]

Validation Accuracy: 0.5927083333333333
Train loss: 0.44370113277719136


Epoch:  60%|██████    | 3/5 [03:33<02:21, 70.99s/it]

Validation Accuracy: 0.6083333333333333
Train loss: 0.22046220809930847


Epoch:  80%|████████  | 4/5 [04:44<01:10, 70.89s/it]

Validation Accuracy: 0.5822916666666667
Train loss: 0.11444131706264757


Epoch: 100%|██████████| 5/5 [05:55<00:00, 71.00s/it]

Validation Accuracy: 0.6322916666666667





In [None]:
# Prediction on test data
# The test tweets must be cleaned with the same expressions as the training tweets,
# otherwise the model is asked to classify text that is normalised differently from
# what it was trained on.
test_df["tweet"] = test_df["tweet"].apply(lambda x: re.sub(r'http\S+', '', x))
test_df["tweet"] = test_df["tweet"].apply(lambda x: re.sub(r'[@|#]\S*', '', x))
test_df["tweet"] = test_df["tweet"].apply(lambda x: re.sub(r'"+', '', x))
# Keep only the Arabic Unicode block (U+0600-U+06FF) and whitespace, as done for the
# training tweets. This also removes the quote characters that wrap every raw tweet.
test_df["tweet"] = test_df["tweet"].apply(lambda x: re.sub(r'([^\u0600-\u06FF\s]+)', '', x))

# Tokenize and encode the test data
test_tokens = test_df["tweet"].apply(lambda x: tokenizer(x).tokens())
test_input_ids = [tokenizer.convert_tokens_to_ids(x) for x in test_tokens]
test_input_ids = pad_sequences(test_input_ids, maxlen=MAX_LEN, dtype="long", truncating="post", padding="post")

# Create attention masks for test data: 1.0 where the id is greater than 0, 0.0 elsewhere
test_attention_masks = []

for seq in test_input_ids:
    seq_mask = [float(i > 0) for i in seq]
    test_attention_masks.append(seq_mask)

In [None]:
# Convert test data to tensors
# Wrap the encoded test ids and their masks as tensors
test_inputs, test_masks = torch.tensor(test_input_ids), torch.tensor(test_attention_masks)

# Batch the test set; no shuffle argument is passed to the loader
test_data = TensorDataset(test_inputs, test_masks)
test_dataloader = DataLoader(dataset=test_data, batch_size=batch_size)

In [None]:
# Prediction
# Switch to evaluation mode before running the test batches
model.eval()

# Collect the logits of every batch, in loader order
batch_logits = []
for test_batch in test_dataloader:
    b_input_ids, b_input_mask = (tensor.to(device) for tensor in test_batch)

    with torch.no_grad():
        outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask)

    batch_logits.append(outputs["logits"].detach().cpu().numpy())

# Stack all batches and keep, per row, the index of the largest logit
predictions = np.concatenate(batch_logits, axis=0).argmax(axis=1)

In [None]:
# Create submission file
# Turn the predicted class ids back into the original label strings
predicted_classes = le.inverse_transform(predictions)

# Copy of the sample submission with its "class" column replaced by the predictions
submission = sample_submission.assign(**{"class": predicted_classes})
submission.to_csv(path_or_buf="submission.csv", index=False)