In [None]:
!pip install transformers
!pip install datasets
!git clone https://github.com/Zarharan/PersianStanceDetection
!git clone https://github.com/kharazi/persian-stopwords
!pip install hazm
!pip install patool

In [None]:
import os
import csv
import pandas as pd
from tqdm import tqdm
import torch
from torch import nn
from transformers import AdamW
from sklearn.model_selection import train_test_split
import torch
import datasets
from datasets import Dataset
from transformers import AutoTokenizer, AutoModel
from tqdm.notebook import tqdm_notebook
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
import hazm
import patoolib
import numpy as np

In [None]:
# Load the Persian stop-word list (one entry per line). It is stored as a set
# because get_ids tests every token for membership: a set lookup is O(1),
# whereas the previous list forced a linear scan per token.
with open("/content/persian-stopwords/persian", "rt", encoding="utf8") as fi:
    stop_words = set(fi.read().strip().split("\n"))

In [None]:
# patoolib.extract_archive("/content/drive/MyDrive/glove.6B.100d.rar", outdir="/content/")

## Importing GloVe

In [None]:
# Parse the GloVe text file: every line holds a word followed by its
# space-separated vector components. The file is streamed line by line so the
# full text never has to sit in memory alongside the parsed lists, and each
# line is split only once.
vocab, embeddings = [], []
with open("glove.6B.100d.txt", "rt", encoding="utf8") as fi:
    for line in fi:
        if not line.strip():
            # A blank line would otherwise add an empty word and a ragged row.
            continue
        # rstrip() only trims the end, so the leading word is left untouched.
        fields = line.rstrip().split(" ")
        vocab.append(fields[0])
        embeddings.append([float(val) for val in fields[1:]])

In [None]:
# Convert the parsed Python lists to NumPy arrays: vocab_npa holds the words,
# embs_npa holds one embedding row per word.
vocab_npa, embs_npa = np.array(vocab), np.array(embeddings)

In [None]:
# Reserve index 0 for '<pad>' and index 1 for '<unk>'.
# The guard makes the cell idempotent: without it, re-running the cell would
# stack another pair of special tokens/rows on top and shift every word index.
PAD_TOKEN, UNK_TOKEN = "<pad>", "<unk>"
specials_present = (
    len(vocab_npa) >= 2 and vocab_npa[0] == PAD_TOKEN and vocab_npa[1] == UNK_TOKEN
)
if not specials_present:
    pad_emb_npa = np.zeros((1, embs_npa.shape[1]))  # embedding for '<pad>' token.
    unk_emb_npa = np.mean(embs_npa, axis=0, keepdims=True)  # embedding for '<unk>' token.

    vocab_npa = np.insert(vocab_npa, 0, PAD_TOKEN)
    vocab_npa = np.insert(vocab_npa, 1, UNK_TOKEN)

    # insert embeddings for pad and unk tokens at top of embs_npa.
    embs_npa = np.vstack((pad_emb_npa, unk_emb_npa, embs_npa))

# Every vocabulary entry must have exactly one embedding row.
assert len(vocab_npa) == embs_npa.shape[0], "vocabulary and embedding rows are out of sync"

print(vocab_npa[:10])
print(embs_npa.shape)

In [None]:
# Lookup table from each word to its position in vocab_npa.
word_to_index = {word: position for position, word in enumerate(vocab_npa)}

## preprocess data

In [None]:
# Read the article-to-claim CSV from the cloned dataset repository; the first
# CSV column is used as the DataFrame index.
ARTICLE_TO_CLAIM_CSV = "/content/PersianStanceDetection/Dataset/ArticleToClaim.csv"
data = pd.read_csv(ARTICLE_TO_CLAIM_CSV, encoding="utf-8", index_col=0)

data.head()

In [None]:
# Hold out 10% of the rows for testing; the fixed seed keeps the split repeatable.
train_df, test_df = train_test_split(data, random_state=42, test_size=0.1)

In [None]:
# Show how many training rows fall under each stance value.
train_df["Stance"].value_counts()

In [None]:
# Binary target: "Unrelated" is class 0, every other stance is class 1.
maps = {
    "Unrelated": 0,
    "Discuss": 1,
    "Agree": 1,
    "Disagree": 1,
}

In [None]:
# Derive the numeric label from the stance column.
# .assign returns new frames instead of writing a column into the frames handed
# back by train_test_split, which avoids pandas' SettingWithCopyWarning.
train_df = train_df.assign(label=train_df["Stance"].map(maps))
test_df = test_df.assign(label=test_df["Stance"].map(maps))

# A stance that is missing from `maps` would otherwise become a silent missing
# label and only fail much later, during preprocessing or training.
for split_name, split_df in (("train", train_df), ("test", test_df)):
    if split_df["label"].isna().any():
        raise ValueError(f"{split_name} split contains stance values not covered by `maps`")

In [None]:
# Keep only the two text inputs and the target column.
model_columns = ["claim", "body", "label"]
train_df = train_df[model_columns]
test_df = test_df[model_columns]

In [None]:
# Word-level tokenizer from hazm; get_ids uses it to split text into tokens.
tokenizer = hazm.WordTokenizer()

In [None]:
# Wrap both pandas frames as `datasets.Dataset` objects.
train_dataset, test_dataset = (
    Dataset.from_pandas(frame) for frame in (train_df, test_df)
)

In [None]:
# Text normalizer applied to claims and bodies before tokenization. The options
# are collected in one dict so the configuration can be read at a glance.
normalizer_options = {
    "remove_extra_spaces": True,
    "persian_numbers": False,
    "persian_style": True,
    "punctuation_spacing": False,
    "remove_diacritics": True,
    "affix_spacing": True,
    "token_based": True,
}
hazm_normalizer = hazm.Normalizer(**normalizer_options)

In [None]:
def get_ids(txt):
    """Convert a text into a list of vocabulary indices.

    The text is split with the module-level ``tokenizer``. Tokens found in
    ``stop_words`` are dropped. Every remaining token is looked up in
    ``word_to_index``:

    * a known token contributes its index;
    * an unknown token containing U+200C (zero-width non-joiner) is split on
      that character and each piece contributes its own index, or the
      ``"<unk>"`` index when the piece is unknown;
    * any other unknown token contributes the ``"<unk>"`` index.

    Args:
        txt: The text to convert.

    Returns:
        A list of integer indices, in token order.
    """
    ids = []
    for token in tokenizer.tokenize(txt):
        if token in stop_words:
            continue

        token_id = word_to_index.get(token)
        if token_id is not None:
            ids.append(token_id)
            continue

        if "\u200c" not in token:
            ids.append(word_to_index["<unk>"])
            continue

        # Unknown compound: fall back to its zero-width-non-joiner-separated pieces.
        for piece in token.split("\u200c"):
            if piece in word_to_index:
                ids.append(word_to_index[piece])
            else:
                ids.append(word_to_index["<unk>"])
    return ids

In [None]:
def preprocess(record, max_len=512):
    """Turn one raw dataset record into truncated id sequences plus its label.

    Claim and body are normalized with ``hazm_normalizer``; newlines in the
    body are replaced by spaces; in both texts "آ" is replaced by "ا" and "ئ"
    by "ی". The cleaned texts are then converted to vocabulary indices with
    ``get_ids``.

    Args:
        record: Mapping with the keys ``"claim"``, ``"body"`` and ``"label"``.
        max_len: Maximum number of ids kept for the claim and for the body
            (each is truncated independently). Defaults to 512.

    Returns:
        A dict with ``"head_ids"`` (claim ids), ``"body_ids"`` (body ids) and
        ``"label"`` (copied unchanged from the record).
    """
    normal_claim = (
        hazm_normalizer.normalize(record["claim"])
        .replace("آ", "ا")
        .replace("ئ", "ی")
    )
    normal_body = (
        hazm_normalizer.normalize(record["body"])
        .replace("\n", " ")
        .replace("آ", "ا")
        .replace("ئ", "ی")
    )

    head_ids = get_ids(normal_claim)
    body_ids = get_ids(normal_body)

    return {
        "head_ids": head_ids[:max_len],
        "body_ids": body_ids[:max_len],
        "label": record["label"],
    }

In [None]:
# Run preprocess over every record. The original columns are removed, so each
# dataset keeps only the fields that preprocess returns.
train_source_columns = train_dataset.column_names
train_dataset = train_dataset.map(preprocess, remove_columns=train_source_columns)

test_source_columns = test_dataset.column_names
test_dataset = test_dataset.map(preprocess, remove_columns=test_source_columns)

In [None]:
# Shuffle the training examples once, with a fixed seed so the order is repeatable.
train_dataset = train_dataset.shuffle(seed=42)

In [None]:
# Make both datasets return their fields in "torch" format.
for split in (train_dataset, test_dataset):
    split.set_format(type="torch")

In [None]:
class LSTM_Based(nn.Module):
    """Two-tower BiLSTM classifier over a claim (header) and an article body.

    Both inputs share one pretrained embedding table. Each input is encoded by
    its own 2-layer bidirectional LSTM; the final hidden states of the top
    layer (forward and backward) of both encoders are concatenated and passed
    through a small feed-forward head that outputs two logits.

    Args:
        embs_npa: 2-D NumPy array of pretrained embeddings, one row per
            vocabulary entry. Its second dimension sets the LSTM input size, so
            any embedding width works (previously 300 was hard-coded, which
            fails for e.g. 100-dimensional GloVe vectors).
    """

    def __init__(self, embs_npa):
        super(LSTM_Based, self).__init__()
        self.word_embeddings = nn.Embedding.from_pretrained(
            torch.from_numpy(embs_npa).float()
        )
        # Take the LSTM input size from the actual embedding matrix.
        embedding_dim = self.word_embeddings.embedding_dim
        hidden_size = 300

        self.lstm_header = nn.LSTM(
            embedding_dim,
            hidden_size,
            num_layers=2,
            dropout=0.1,
            bidirectional=True,
            batch_first=True,
        )
        self.lstm_body = nn.LSTM(
            embedding_dim,
            hidden_size,
            num_layers=2,
            dropout=0.1,
            bidirectional=True,
            batch_first=True,
        )
        self.dropout = nn.Dropout(p=0.2)
        # Two encoders x two directions x hidden_size features.
        self.fc = nn.Linear(4 * hidden_size, 256)
        self.act_func = nn.ReLU()
        self.label = nn.Linear(256, 2)

    def forward(self, input_seq_header, input_seq_body):
        """Compute class logits for a batch of (header, body) id sequences.

        Args:
            input_seq_header: Integer tensor of header token ids, batch first.
            input_seq_body: Integer tensor of body token ids, batch first.

        Returns:
            Tensor of shape (batch, 2) holding unnormalized class scores.
        """
        # NOTE(review): padded positions are fed through the LSTMs like real
        # tokens; packing the sequences would change the outputs, so it is left as is.
        x_header = self.word_embeddings(input_seq_header)
        x_body = self.word_embeddings(input_seq_body)

        _, (hidden_header, _) = self.lstm_header(x_header)
        _, (hidden_body, _) = self.lstm_body(x_body)

        # h_n is ordered layer by layer, direction by direction, so the last two
        # entries are the forward and backward states of the top layer.
        out1 = torch.cat((hidden_header[-2, :, :], hidden_header[-1, :, :]), dim=1)
        out2 = torch.cat((hidden_body[-2, :, :], hidden_body[-1, :, :]), dim=1)

        out = torch.cat((out1, out2), dim=1)
        out = self.act_func(self.fc(self.dropout(out)))
        out = self.label(self.dropout(out))
        return out

In [None]:
def collate_batch(batch):
    """Merge a list of examples into one padded batch.

    Args:
        batch: Sequence of dicts, each with ``"head_ids"`` and ``"body_ids"``
            (1-D tensors of possibly different lengths) and a ``"label"``.

    Returns:
        A dict with ``"head_ids"`` and ``"body_ids"`` padded with 0 to the
        longest sequence in the batch (batch first), and ``"label"`` as a
        tensor built with ``torch.Tensor`` from the collected labels.
    """
    heads = [example["head_ids"] for example in batch]
    bodies = [example["body_ids"] for example in batch]
    targets = [example["label"] for example in batch]

    pad = torch.nn.utils.rnn.pad_sequence
    return {
        "head_ids": pad(heads, batch_first=True, padding_value=0.0),
        "body_ids": pad(bodies, batch_first=True, padding_value=0.0),
        "label": torch.Tensor(targets),
    }

In [None]:
def validation(model, val_loader, loss_fn, device="cuda", lbl=""):
    """Evaluate ``model`` on ``val_loader`` and print a classification report.

    Args:
        model: Module called as ``model(input_seq_header=..., input_seq_body=...)``.
        val_loader: Iterable of batches with ``"head_ids"``, ``"body_ids"`` and
            ``"label"`` tensors.
        loss_fn: Loss called as ``loss_fn(preds, labels)`` with long labels.
        device: Device the batch tensors are moved to.
        lbl: Unused; kept so existing callers keep working.

    Returns:
        ``(val_loss, accuracy)``: the loss averaged over batches and the
        accuracy over all evaluated examples.

    Note:
        The model is left in eval mode when this function returns.
    """
    val_loss = 0.0
    model.eval()
    reals = []
    preds_list = []
    # No gradients are needed for evaluation; disabling them avoids building
    # the autograd graph for every batch and saves memory.
    with torch.no_grad():
        for batch in val_loader:
            header_ids = batch["head_ids"].to(device)
            body_ids = batch["body_ids"].to(device)
            labels = batch["label"].to(device)
            preds = model(input_seq_header=header_ids, input_seq_body=body_ids)
            loss = loss_fn(preds, labels.to(torch.long))
            val_loss += loss.item()
            reals += labels.cpu().numpy().tolist()
            preds_list += preds.argmax(axis=1).cpu().numpy().tolist()

    val_loss = val_loss / len(val_loader)
    accuracy = accuracy_score(reals, preds_list, normalize=True)
    print(classification_report(reals, preds_list))
    return val_loss, accuracy

In [None]:
def train(
    model, optimizer, loss_fn, train_loader, test_loader, epochs, lbl="", device="cuda"
):
    """Train ``model`` for ``epochs`` passes over ``train_loader``, then evaluate once.

    After each epoch the mean batch loss and the accuracy over that epoch's
    batches are printed. Once all epochs are done, ``validation`` is run on
    ``test_loader`` and its loss and accuracy are printed.

    Args:
        model: Module called as ``model(input_seq_header=..., input_seq_body=...)``.
        optimizer: Optimizer over the model's parameters.
        loss_fn: Loss called as ``loss_fn(preds, labels)`` with long labels.
        train_loader: Iterable of training batches with ``"head_ids"``,
            ``"body_ids"`` and ``"label"`` tensors.
        test_loader: Iterable of evaluation batches with the same keys.
        epochs: Number of passes over ``train_loader``.
        lbl: Not used by the loop itself; forwarded to ``validation``. It comes
            before ``device`` in positional order, so pass ``device`` by keyword.
        device: Device the model and the batch tensors are moved to.

    Returns:
        None. Progress and results are printed.
    """
    model.to(device)

    for epoch in range(epochs):
        # validation() leaves the model in eval mode, so re-enable train mode
        # at the start of every epoch.
        model.train()
        training_loss = 0.0
        t_labels = []
        t_preds = []

        for batch in tqdm_notebook(train_loader):
            optimizer.zero_grad()

            header_ids = batch["head_ids"].to(device)
            body_ids = batch["body_ids"].to(device)
            labels = batch["label"].to(device)

            preds = model(input_seq_header=header_ids, input_seq_body=body_ids)
            loss = loss_fn(preds, labels.to(torch.long))

            loss.backward()
            optimizer.step()

            training_loss += loss.item()
            t_labels += labels.cpu().numpy().tolist()
            t_preds += preds.argmax(axis=1).cpu().numpy().tolist()

        training_loss = training_loss / len(train_loader)
        train_accuracy = accuracy_score(t_labels, t_preds, normalize=True)

        print(
            "Epoch: {},training loss: {:.2f} , train accuracy: {:.2f} ".format(
                epoch, training_loss, train_accuracy
            )
        )

    test_loss, test_accuracy = validation(model, test_loader, loss_fn, device, lbl)

    print("test loss: {:.2f}  test  accuracy: {:.2f}".format(test_loss, test_accuracy))

In [None]:
# Training setup and first training run.
epoch = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LSTM_Based(embs_npa=embs_npa)
optimizer = AdamW(model.parameters(), lr=2e-5)
loss_fn = nn.CrossEntropyLoss()
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=16, collate_fn=collate_batch, drop_last=True
)
# Keep the final partial batch: dropping it would silently leave test examples
# out of the reported loss, accuracy and classification report.
test_loader = torch.utils.data.DataLoader(
    test_dataset, batch_size=4, drop_last=False, collate_fn=collate_batch
)
# `device` must be passed by keyword: the 7th positional parameter of train()
# is `lbl`, so a positional device was ignored and "cuda" was always used.
train(model, optimizer, loss_fn, train_loader, test_loader, epoch, device=device)

In [None]:
# Continue training the same model for another `epoch` epochs.
# `device` is passed by keyword because the 7th positional parameter of train()
# is `lbl`; passed positionally, the device was ignored and "cuda" was used.
train(model, optimizer, loss_fn, train_loader, test_loader, epoch, device=device)