# **lib**

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
!pip install hazm
import hazm
!pip install plotly
import plotly.graph_objects as go
import re
!pip install clean-text
from cleantext import clean

# **Distribution and Summary without Preprocessing**

In [None]:
def distribution(df):
    """Plot the class distribution and the sentence-length histograms of ``df``.

    Lengths are counted in whitespace-separated tokens.

    Side effect: the columns ``premise_length`` and ``hypothesis_length`` are
    added to ``df`` in place.
    """
    # One bar per distinct value of the 'label' column.
    plt.figure(figsize=(8, 6))
    sns.countplot(x='label', data=df)
    plt.title('Distribution of Classes')
    plt.xlabel('Class')
    plt.ylabel('Count')
    plt.show()

    def count_tokens(value):
        # str() so that non-string cells (e.g. NaN) are still countable.
        return len(str(value).split())

    # (text column, derived length column, histogram title)
    panels = (
        ('premise', 'premise_length',
         'Distribution of Premise Sentence Lengths'),
        ('hypothesis', 'hypothesis_length',
         'Distribution of Hypothesis Sentence Lengths'),
    )
    for text_col, length_col, _ in panels:
        df[length_col] = df[text_col].apply(count_tokens)

    # Side-by-side histograms: premise on the left, hypothesis on the right.
    plt.figure(figsize=(12, 6))
    for position, (_, length_col, heading) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        sns.histplot(df[length_col], bins=30, kde=True)
        plt.title(heading)
        plt.xlabel('Number of Tokens')
        plt.ylabel('Count')

    plt.tight_layout()
    plt.show()

def summary(df):
    """Print descriptive statistics of the premise and hypothesis lengths.

    The ``premise_length`` / ``hypothesis_length`` columns are used when they
    already exist on ``df``. Otherwise the whitespace-token counts are
    computed from the ``premise`` / ``hypothesis`` columns on the fly, so the
    function no longer fails with ``KeyError`` when the length columns have
    not been added beforehand. ``df`` itself is never modified.
    """
    def lengths_for(text_col, length_col):
        if length_col in df.columns:
            return df[length_col]
        counts = df[text_col].apply(lambda x: len(str(x).split()))
        # Same series name as the precomputed column, so the printed report
        # looks the same in both cases.
        return counts.rename(length_col)

    premise_length_stats = lengths_for('premise', 'premise_length').describe()
    hypothesis_length_stats = lengths_for('hypothesis', 'hypothesis_length').describe()

    print("Premise Length Statistics:\n", premise_length_stats)
    print("Hypothesis Length Statistics:\n", hypothesis_length_stats)



# **Preprocessing Function**

In [None]:
# # Preprocessing
def cleanhtml(raw_html):
    """Return ``raw_html`` with every ``<...>`` span removed.

    The match is non-greedy and ``.`` does not cross line breaks, so a tag
    that spans several lines is left untouched.
    """
    tag_pattern = re.compile('<.*?>')
    return tag_pattern.sub('', raw_html)


# Compiled once instead of on every call to cleaning().
# Character class of emoji, pictographs, directional-isolate marks and other
# symbols that are stripped from the text. U+200C (zero-width non-joiner) is
# deliberately NOT part of the class, so it is preserved.
_WEIRD_PATTERN = re.compile("["
    u"\U0001F600-\U0001F64F"  # emoticons
    u"\U0001F300-\U0001F5FF"  # symbols & pictographs
    u"\U0001F680-\U0001F6FF"  # transport & map symbols
    u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
    u"\U00002702-\U000027B0"
    u"\U000024C2-\U0001F251"
    u"\U0001f926-\U0001f937"
    u'\U00010000-\U0010ffff'
    u"\u200d"
    u"\u2640-\u2642"
    u"\u2600-\u2B55"
    u"\u23cf"
    u"\u23e9"
    u"\u231a"
    u"\u3030"
    u"\ufe0f"
    u"\u2069"
    u"\u2066"
    u"\u2068"
    u"\u2067"
    "]+", flags=re.UNICODE)

# Raw string: "\s" in a plain string literal is an invalid escape sequence.
_WHITESPACE_RUN = re.compile(r"\s+")


def cleaning(text):
    """Return a cleaned copy of ``text``.

    Steps, in order:
      1. strip surrounding whitespace;
      2. ``cleantext.clean``: fix unicode, lowercase, remove line breaks,
         URLs, e-mail addresses, phone numbers and currency symbols
         (numbers, digits and punctuation are kept);
      3. remove ``<...>`` spans via ``cleanhtml``;
      4. remove the emoji / symbol / directional-mark characters matched by
         ``_WEIRD_PATTERN``;
      5. remove ``#`` characters and collapse whitespace runs to one space.

    The result is stripped once more because steps 3-5 can leave a single
    space at either end.
    """
    text = text.strip()

    # regular cleaning
    text = clean(text,
        fix_unicode=True,
        to_ascii=False,
        lower=True,
        no_line_breaks=True,
        no_urls=True,
        no_emails=True,
        no_phone_numbers=True,
        no_numbers=False,
        no_digits=False,
        no_currency_symbols=True,
        no_punct=False,
        replace_with_url="",
        replace_with_email="",
        replace_with_phone_number="",
        replace_with_number="",
        replace_with_digit="0",
        replace_with_currency_symbol="",
    )

    # cleaning htmls
    text = cleanhtml(text)

    # NOTE: hazm normalization is intentionally disabled here:
    # normalizer = hazm.Normalizer()
    # text = normalizer.normalize(text)

    # removing weird patterns (emoji, pictographs, directional marks)
    text = _WEIRD_PATTERN.sub('', text)

    # removing hashtags and extra spaces
    text = text.replace("#", "")
    text = _WHITESPACE_RUN.sub(" ", text)

    return text.strip()

def _filter_by_word_count(data, length_col, minlim, maxlim):
    """Keep rows with ``minlim < data[length_col] <= maxlim``; reset the index."""
    in_range = (data[length_col] > minlim) & (data[length_col] <= maxlim)
    return data[in_range].reset_index(drop=True)


def _plot_word_count_histogram(word_counts, title):
    """Show a plotly histogram of ``word_counts`` under ``title``."""
    fig = go.Figure()
    fig.add_trace(go.Histogram(x=word_counts))
    fig.update_layout(
        title_text=title,
        xaxis_title_text='Word Count',
        yaxis_title_text='Frequency',
        bargap=0.2,
        bargroupgap=0.2)
    fig.show()


def _plot_value_counts(data, column, title, xaxis_title):
    """Show a plotly bar chart with one bar per distinct value of ``column``."""
    # groupby sorts its keys, so index and counts are aligned and ordered.
    counts = data.groupby(column)[column].count()
    fig = go.Figure()
    fig.add_trace(go.Bar(
        x=counts.index.tolist(),
        y=counts.tolist(),
        text=counts.tolist(),
        textposition='auto'
    ))
    fig.update_layout(
        title_text=title,
        xaxis_title_text=xaxis_title,
        yaxis_title_text='Frequency',
        bargap=0.2,
        bargroupgap=0.2)
    fig.show()


def preprocess(df, minlim=3, maxlim=256):
    """Clean a premise/hypothesis/label frame and return the cleaned copy.

    Pipeline:
      1. keep the ``premise``, ``hypothesis`` and ``label`` columns and print
         diagnostics about missing values;
      2. drop rows with a missing value, then drop rows whose premise is a
         duplicate and rows whose hypothesis is a duplicate (first kept);
      3. keep only rows whose hazm word count is in ``(minlim, maxlim]`` for
         both sentences, and plot diagnostic charts;
      4. run ``cleaning`` on both sentences and apply the same word-count
         filter to the cleaned text;
      5. plot the label distribution.

    Args:
        df: frame with at least ``premise``, ``hypothesis``, ``label``
            columns. It is not modified.
        minlim: exclusive lower bound on the word count (default 3, i.e.
            sentences need at least 4 tokens to be kept).
        maxlim: inclusive upper bound on the word count (default 256).

    Returns:
        A new DataFrame with columns ``premise``, ``hypothesis`` (both
        cleaned) and ``label``.
    """
    # .copy() so later column assignments never touch the caller's frame.
    data = df[['premise', "hypothesis", "label"]].copy()

    # DataFrame.info() prints its own report and returns None, so it is
    # called directly rather than through print().
    print('data information')
    df.info()
    print('\n')

    print('missing values stats')
    print(data.isnull().sum(), '\n')

    # Show up to five rows with a missing value for each column.
    print('some missing values')
    for column in ('premise', 'hypothesis', 'label'):
        print(data[data[column].isnull()].iloc[:5], '\n')

    data = data.dropna(subset=['premise', 'hypothesis', 'label'])
    # NOTE(review): duplicates are dropped per column, not per
    # (premise, hypothesis) pair, so a premise that is paired with several
    # hypotheses keeps only its first row - confirm this is intended.
    data = data.drop_duplicates(subset=['premise'], keep='first')
    data = data.drop_duplicates(subset=['hypothesis'], keep='first')
    data = data.reset_index(drop=True)

    # Same diagnostics again, after dropping rows.
    print('data information')
    data.info()
    print('\n')

    print('missing values stats')
    print(data.isnull().sum(), '\n')

    print('some missing values')
    print(data[data['premise'].isnull()].iloc[:5], '\n')

    # Length of each sentence in hazm word tokens.
    for column in ('premise', 'hypothesis'):
        data[f'{column}_len_by_words'] = data[column].apply(
            lambda t: len(hazm.word_tokenize(t)))

    for column in ('premise', 'hypothesis'):
        lengths = data[f'{column}_len_by_words']
        print(f'{column}:\nMin: {lengths.min()} \tMax: {lengths.max()}')

    # Keep only rows whose word count lies in (minlim, maxlim].
    data = _filter_by_word_count(data, 'premise_len_by_words', minlim, maxlim)
    data = _filter_by_word_count(data, 'hypothesis_len_by_words', minlim, maxlim)

    _plot_word_count_histogram(
        data['premise_len_by_words'],
        'Distribution of word counts for premise')
    _plot_word_count_histogram(
        data['hypothesis_len_by_words'],
        'Distribution of word counts for hypothesis')

    # uniques
    unique_rates = list(sorted(data['premise'].unique()))
    print(f'We have #{len(unique_rates)}: {unique_rates}')

    unique_rates = list(sorted(data['hypothesis'].unique()))
    print(f'We have #{len(unique_rates)}: {unique_rates}')

    # NOTE(review): these two charts draw one bar per distinct sentence and
    # their titles mention "rate"/"comments"; they look like leftovers from
    # another dataset. Output is kept unchanged - confirm whether they are
    # still wanted.
    _plot_value_counts(
        data, 'premise', 'Distribution of rate within comments', 'Rate')
    _plot_value_counts(
        data, 'hypothesis', 'Distribution of rate within comments', 'Rate')

    print("Cleaning premise...")
    data['cleaned_premise'] = data['premise'].apply(cleaning)
    data['cleaned_premise_len_by_words'] = data['cleaned_premise'].apply(
        lambda t: len(hazm.word_tokenize(t)))
    # Cleaning can shorten a sentence, so the length filter is applied again.
    # (Previously the out-of-range branch returned the length itself instead
    # of a missing value, so nothing was ever dropped here.)
    data = _filter_by_word_count(
        data, 'cleaned_premise_len_by_words', minlim, maxlim)

    print("Cleaning hypothesis ...")
    data['cleaned_hypothesis'] = data['hypothesis'].apply(cleaning)
    data['cleaned_hypothesis_len_by_words'] = data['cleaned_hypothesis'].apply(
        lambda t: len(hazm.word_tokenize(t)))
    data = _filter_by_word_count(
        data, 'cleaned_hypothesis_len_by_words', minlim, maxlim)

    # Keep only the cleaned text under the original column names.
    data = data[['cleaned_premise', 'cleaned_hypothesis', 'label']].rename(
        columns={'cleaned_premise': 'premise',
                 'cleaned_hypothesis': 'hypothesis'})

    # Visual check of class balance; no re-balancing is performed.
    _plot_value_counts(
        data, 'label',
        'Distribution of label within comments [DATA]', 'Label')

    return data

# **Preprocessing Train Dataset**

In [None]:
# Train split: load the tab-separated raw file, inspect it, clean it and
# write the cleaned frame to cleaned_train.csv.
df_train = pd.read_csv('Train-word.csv', delimiter='\t')
# DataFrame.info() prints its report itself and returns None, so wrapping it
# in print() would add a stray "None" line.
df_train.info()
distribution(df_train)
summary(df_train)
data_train = preprocess(df_train)
data_train.to_csv("cleaned_train.csv", index=False)

# **Preprocessing Test Dataset**

In [None]:
 # Test
# Test split: load the tab-separated raw file, inspect it, clean it and
# write the cleaned frame to cleaned_test.csv.
df_test = pd.read_csv('Test-word.csv', delimiter='\t')
# DataFrame.info() prints its report itself and returns None, so wrapping it
# in print() would add a stray "None" line.
df_test.info()
distribution(df_test)
summary(df_test)
data_test = preprocess(df_test)
data_test.to_csv("cleaned_test.csv", index=False)

# **Preprocessing Validation Dataset**

In [None]:
# Validation split: load the tab-separated raw file, inspect it, clean it and
# write the cleaned frame to cleaned_val.csv.
df_val = pd.read_csv('Val-word.csv', delimiter='\t')
# DataFrame.info() prints its report itself and returns None, so wrapping it
# in print() would add a stray "None" line.
df_val.info()
distribution(df_val)
summary(df_val)
data_val = preprocess(df_val)
data_val.to_csv("cleaned_val.csv", index=False)

# **Fine Tuning the ParsBERT**

In [None]:
pip install transformers torch datasets

In [None]:
import pandas as pd
from transformers import AutoTokenizer, Trainer, TrainingArguments, AutoModelForSequenceClassification
from datasets import Dataset, DatasetDict
import torch

# Load the tokenizer and model
model_name = 'HooshvareLab/bert-base-parsbert-uncased'
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Label encoding, kept identical to the mapping used in the "tuning model"
# section of this notebook.
# NOTE(review): confirm that e/n/c are the actual label values in the CSVs.
label_mapping = {'e': 0, 'n': 1, 'c': 2}

# The classification head must have one output per class (was hard-coded
# to 2, which does not match the three-way mapping above).
model = AutoModelForSequenceClassification.from_pretrained(
    model_name, num_labels=len(label_mapping))


def _with_integer_labels(frame):
    """Return ``frame`` with the ``label`` column encoded as integers.

    Numeric labels are left untouched; anything else is mapped through
    ``label_mapping``. Raises ValueError when a label is not in the mapping,
    because a missing label cannot be converted to a tensor later on.
    """
    if pd.api.types.is_numeric_dtype(frame['label']):
        return frame
    encoded = frame['label'].map(label_mapping)
    if encoded.isnull().any():
        unknown = sorted(set(frame.loc[encoded.isnull(), 'label'].astype(str)))
        raise ValueError(f"labels not covered by label_mapping: {unknown}")
    return frame.assign(label=encoded.astype(int))


# Read the CSV files
train_df = _with_integer_labels(pd.read_csv('cleaned_train.csv'))
val_df = _with_integer_labels(pd.read_csv('cleaned_val.csv'))
test_df = _with_integer_labels(pd.read_csv('cleaned_test.csv'))

# Convert DataFrame to Dataset
train_dataset = Dataset.from_pandas(train_df)
val_dataset = Dataset.from_pandas(val_df)
test_dataset = Dataset.from_pandas(test_df)

# Function to tokenize the dataset (premise and hypothesis as a sentence pair)
def tokenize_function(examples):
    return tokenizer(examples['premise'], examples['hypothesis'], padding=True, truncation=True)

# Tokenize datasets
tokenized_train_dataset = train_dataset.map(tokenize_function, batched=True)
tokenized_val_dataset = val_dataset.map(tokenize_function, batched=True)
tokenized_test_dataset = test_dataset.map(tokenize_function, batched=True)


def _model_columns(dataset):
    """Columns exposed to PyTorch, limited to those the dataset really has.

    ``token_type_ids`` is included when the tokenizer produced it: it marks
    which tokens belong to the premise and which to the hypothesis, and was
    previously dropped by the explicit column list.
    """
    wanted = ('input_ids', 'token_type_ids', 'attention_mask', 'label')
    return [name for name in wanted if name in dataset.column_names]


# Set format for PyTorch
for _dataset in (tokenized_train_dataset, tokenized_val_dataset, tokenized_test_dataset):
    _dataset.set_format(type='torch', columns=_model_columns(_dataset))

# Create a DatasetDict
datasets = DatasetDict({
    'train': tokenized_train_dataset,
    'val': tokenized_val_dataset,
    'test': tokenized_test_dataset
})

# Define training arguments
training_args = TrainingArguments(
    output_dir='./results',          # output directory
    num_train_epochs=3,              # total number of training epochs
    per_device_train_batch_size=8,   # batch size for training
    per_device_eval_batch_size=8,    # batch size for evaluation
    warmup_steps=500,                # number of warmup steps for learning rate scheduler
    weight_decay=0.01,               # strength of weight decay
    logging_dir='./logs',            # directory for storing logs
    logging_steps=10,
)

# Create Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=datasets['train'],
    eval_dataset=datasets['val'],
    tokenizer=tokenizer
)

# Start training
trainer.train()


# **tuning model**

In [None]:
!pip install transformers torch pandas scikit-learn

In [None]:
import pandas as pd
from transformers import AutoTokenizer
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
import torch
# Read the three cleaned splits from disk.
train_data, val_data, test_data = (
    pd.read_csv(csv_path)
    for csv_path in ('cleaned_train.csv', 'cleaned_val.csv', 'cleaned_test.csv')
)

# Print each split's column index as a quick sanity check of the schema.
for split_frame in (train_data, val_data, test_data):
    print(split_frame.columns)

# Tokenizer of the checkpoint that is fine-tuned in this section.
tokenizer = AutoTokenizer.from_pretrained("HooshvareLab/bert-fa-base-uncased")

# Tokenize data
def tokenize_data(data, tokenizer):
    """Encode the premise/hypothesis pairs of ``data`` with ``tokenizer``.

    The two sentences are passed as ``text`` and ``text_pair`` so that the
    tokenizer inserts the separator itself and can mark the two segments,
    instead of receiving one string glued together with a literal
    ``" [SEP] "``. Missing cells are encoded as empty strings rather than
    the text ``"nan"``.

    Args:
        data: frame with ``premise`` and ``hypothesis`` columns.
        tokenizer: callable following the Hugging Face tokenizer call
            convention ``tokenizer(text, text_pair, **options)``.

    Returns:
        Whatever ``tokenizer`` returns for the whole split: padded to the
        longest pair, truncated at 512 tokens, as PyTorch tensors.
    """
    premises = data['premise'].fillna('').astype(str).tolist()
    hypotheses = data['hypothesis'].fillna('').astype(str).tolist()
    return tokenizer(
        premises,
        hypotheses,
        padding=True,
        truncation=True,
        max_length=512,
        return_tensors='pt'
    )

train_encodings = tokenize_data(train_data, tokenizer)
val_encodings = tokenize_data(val_data, tokenizer)
test_encodings = tokenize_data(test_data, tokenizer)

# Ensure labels are mapped to integers
label_mapping = {'e': 0, 'n': 1, 'c': 2}  # Replace this with the actual mapping


def _encode_labels(data, split_name):
    """Return the ``label`` column of ``data`` as a list of integer ids.

    Raises ValueError when a label is not a key of ``label_mapping``.
    ``Series.map`` would otherwise turn such labels into NaN silently, and
    the failure would only surface later, when the label is converted to a
    tensor.
    """
    encoded = data['label'].map(label_mapping)
    if encoded.isnull().any():
        unknown = sorted(set(data.loc[encoded.isnull(), 'label'].astype(str)))
        raise ValueError(
            f"{split_name}: labels not covered by label_mapping: {unknown}")
    return encoded.astype(int).tolist()


train_labels = _encode_labels(train_data, 'train')
val_labels = _encode_labels(val_data, 'val')
test_labels = _encode_labels(test_data, 'test')


In [None]:
import torch

class CustomDataset(torch.utils.data.Dataset):
    """Map-style dataset that pairs tokenizer encodings with labels.

    ``encodings`` is a mapping from field name to an indexable tensor whose
    first dimension runs over the examples; ``labels`` is a sequence with one
    integer label per example.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # The number of examples is defined by the label sequence.
        return len(self.labels)

    def __getitem__(self, idx):
        # Copy each field's row so the caller never aliases the stored tensors.
        sample = {}
        for field, tensor in self.encodings.items():
            sample[field] = tensor[idx].clone().detach()
        sample['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample

# Wrap each split's encodings and labels in a CustomDataset.
train_dataset, val_dataset, test_dataset = (
    CustomDataset(split_encodings, split_labels)
    for split_encodings, split_labels in (
        (train_encodings, train_labels),
        (val_encodings, val_labels),
        (test_encodings, test_labels),
    )
)


In [None]:
# Pretrained encoder with a freshly initialised 3-way classification head.
model = AutoModelForSequenceClassification.from_pretrained(
    "HooshvareLab/bert-fa-base-uncased",
    num_labels=3,
)

# Hyper-parameters for the run; evaluation happens at the end of every epoch.
training_options = dict(
    output_dir='./results',
    evaluation_strategy='epoch',
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
)
training_args = TrainingArguments(**training_options)

# Trainer wired to the training split and the validation split.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
)


In [None]:
# Fine-tune the model on train_dataset with the settings in training_args
# (3 epochs, with val_dataset evaluated after each epoch).
trainer.train()


In [None]:
# Evaluate on the validation split and display the returned metrics;
# the bare assignment alone produces no visible output.
eval_results = trainer.evaluate(eval_dataset=val_dataset)
print(eval_results)



In [None]:
# Run the fine-tuned model over the test split.
predictions = trainer.predict(test_dataset)

# Reference labels are the encoded test labels; the predicted class is the
# index of the largest score along the last axis.
true_labels = test_labels
predicted_labels = predictions.predictions.argmax(axis=-1)

# Accuracy
accuracy = accuracy_score(true_labels, predicted_labels)
print(f"Test Accuracy: {accuracy}")

# F1, averaged over classes weighted by their support
f1 = f1_score(true_labels, predicted_labels, average='weighted')
print(f"F1 Score: {f1}")

# Confusion matrix (rows: true class, columns: predicted class)
conf_matrix = confusion_matrix(true_labels, predicted_labels)
print(f"Confusion Matrix:\n{conf_matrix}")