In [1]:
import json
import sys
from os import listdir
from os.path import isfile, join
import re
import string
import pandas as pd
import numpy as np

import time
from tqdm import tqdm
from typing import List


import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.utils.data import DataLoader, TensorDataset
# from keras.preprocessing.sequence import pad_sequences

import transformers
from transformers import AutoTokenizer, AutoModel, utils
from transformers import AutoTokenizer, AutoModelForSequenceClassification,Trainer, TrainingArguments
from datasets import Dataset
transformers.logging.set_verbosity_error()
utils.logging.set_verbosity_error()  # Suppress standard warnings

from bertviz import model_view, head_view


from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression


import eli5
from eli5.lime import TextExplainer
# from captum.attr import IntegratedGradients
# import matplotlib.pyplot as plt


from wordcloud import WordCloud
from PIL import Image
import matplotlib.pyplot as plt

## Data

In [2]:
# Sentence-level splits, loaded in train / test / validation order.
sen_train_data, sen_test_data, sen_val_data = map(
    pd.read_csv,
    (
        '../sentence_split/train_sentence_data.csv',
        '../sentence_split/test_sentence_data.csv',
        '../sentence_split/val_sentence_data.csv',
    ),
)

In [3]:
# Document-level data used by the document classifier further down.
doc_train_data = pd.read_csv("../complete_sentence/train_processed_data.csv")
# Fixed: the held-out frame used to be read from the *train* CSV, so every
# later evaluation on doc_test_* scored the model on its own training data.
# NOTE(review): the file name is assumed to mirror the train file's naming --
# confirm that test_processed_data.csv exists in ../complete_sentence/.
doc_test_data = pd.read_csv("../complete_sentence/test_processed_data.csv")

In [4]:
# The File_id column is not used anywhere below, so remove it from every split.
sen_train_data = sen_train_data.drop(columns=['File_id'])
sen_test_data = sen_test_data.drop(columns=['File_id'])
sen_val_data = sen_val_data.drop(columns=['File_id'])

In [5]:
def get_20(data, n=20):
    """Return a small two-class subset of ``data``.

    Keeps the first ``n`` rows whose ``Status`` equals 1, followed by the
    first ``n`` rows whose ``Status`` equals 0. Row order and index labels
    are preserved inside each group. When a class has fewer than ``n`` rows,
    all of its rows are kept.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame containing a ``Status`` column.
    n : int, default 20
        Maximum number of rows taken per class.

    Returns
    -------
    pandas.DataFrame
        Status-1 rows first, then Status-0 rows.

    Raises
    ------
    ValueError
        If ``n`` is negative (a negative slice bound would silently drop
        rows from the end instead of limiting the count).
    """
    if n < 0:
        raise ValueError("n must be non-negative, got {}".format(n))
    positives = data.loc[data['Status'] == 1].iloc[:n]
    negatives = data.loc[data['Status'] == 0].iloc[:n]
    return pd.concat([positives, negatives])

# Shrink every sentence-level split to the small subset returned by get_20.
sen_train_data, sen_test_data, sen_val_data = (
    get_20(split) for split in (sen_train_data, sen_test_data, sen_val_data)
)

In [6]:
# Same subsetting for the document-level frames.
doc_train_data, doc_test_data = (
    get_20(frame) for frame in (doc_train_data, doc_test_data)
)

In [7]:
# Plain Python lists of document texts and their labels for the pipelines below.
doc_train_text, doc_train_status = (
    doc_train_data['Paper_text'].values.tolist(),
    doc_train_data['Status'].values.tolist(),
)

doc_test_text, doc_test_status = (
    doc_test_data['Paper_text'].values.tolist(),
    doc_test_data['Status'].values.tolist(),
)

In [8]:
# Shuffle the rows of every split (frac=1 keeps all rows). A fixed seed makes
# the row order -- and therefore everything fitted on it -- reproducible
# across "Restart Kernel -> Run All"; previously each run shuffled differently.
sen_train_data = sen_train_data.sample(frac=1, random_state=42)
sen_test_data = sen_test_data.sample(frac=1, random_state=42)
sen_val_data = sen_val_data.sample(frac=1, random_state=42)

## SVM CLASSIFIER

In [9]:
class CustomEmbedding(BaseEstimator, TransformerMixin):
    """scikit-learn transformer that turns texts into mean-pooled BERT embeddings.

    The model and tokenizer are loaded in ``__init__``. ``fit`` is a no-op;
    ``transform`` embeds the inputs one at a time and returns one embedding
    (a list of floats) per input.
    """

    def __init__(self):
        self.model_name = "sentence-transformers/bert-base-nli-mean-tokens"
        # output_attentions=True is kept so the loaded model still returns
        # attention values to anyone who calls self.model directly; the
        # embedding code below does not use them.
        self.model = AutoModel.from_pretrained(self.model_name, output_attentions=True)
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        print('\n>>>>>>>init() called.\n')

    def fit(self, X, y = None):
        """Nothing is learned here; return ``self`` so the pipeline can continue."""
        print('\n>>>>>>>fit() called.\n')
        return self

    def mean_pooling(self, model_output, attention_mask):
        """Average token embeddings over dimension 1, weighting each token by its mask.

        Parameters
        ----------
        model_output : sequence
            Model output whose first element holds the token embeddings
            (assumed to be (batch, tokens, hidden) -- TODO confirm).
        attention_mask : torch.Tensor
            Mask that is expanded to the shape of the token embeddings.

        Returns
        -------
        torch.Tensor
            Masked sum divided by the mask sum. The divisor is clamped to at
            least 1e-9 so an all-zero mask cannot divide by zero.
        """
        token_embeddings = model_output[0]  # first element contains all token embeddings
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)

    def bert(self, text):
        """Embed one text and return its pooled embedding as a list of floats.

        The text is tokenized once, truncated to
        ``max_position_embeddings - 2`` tokens, and the same encoding
        (ids plus attention mask) feeds both the model and the pooling step.
        """
        encoded_input = self.tokenizer(
            text,
            padding=True,
            truncation=True,
            max_length=self.model.config.max_position_embeddings - 2,
            return_tensors='pt',
        )

        # Inference only: skipping autograd bookkeeping saves memory and time.
        with torch.no_grad():
            outputs = self.model(**encoded_input)

        sentence_embeddings = self.mean_pooling(outputs, encoded_input['attention_mask'])
        return sentence_embeddings.numpy()[0].tolist()

    def transform(self, X, y = None):
        """Embed every item of ``X`` in order.

        Returns
        -------
        list of list of float
            Exactly one embedding per input item.

        Raises
        ------
        ValueError
            If any item cannot be embedded. Stopping early and returning a
            shorter list would silently misalign the embeddings with the
            caller's samples and labels, so the failure is reported instead.
        """
        embeddings = []
        for position, text in enumerate(X):
            try:
                embeddings.append(self.bert(text))
            except Exception as exc:
                raise ValueError(
                    "Could not embed input #{}: {!r}".format(position, text)
                ) from exc
        return embeddings
        

In [10]:
# Sentence-level classifier: BERT sentence embeddings followed by an RBF SVC.
# probability=True is required because predict_proba is used further down.
pipe2 = Pipeline(
    steps=[
        ('Bert Embeddings', CustomEmbedding()),  # instantiating calls __init__ (loads the model)
        ('Support Vector Classifier', SVC(kernel='rbf', probability=True)),
    ]
)



>>>>>>>init() called.



In [11]:
# Training sentences and their labels as plain Python lists.
sen_text, sen_label = (
    sen_train_data['Sentence'].values.tolist(),
    sen_train_data['Status'].values.tolist(),
)

In [12]:
# Fit the sentence-level pipeline (embedding step, then the SVC) on the training sentences.
pipe2.fit(sen_text, sen_label)


>>>>>>>fit() called.



Pipeline(steps=[('Bert Embeddings', CustomEmbedding()),
                ('Support Vector Classifier', SVC(probability=True))])

In [13]:
# Score the fitted sentence pipeline on the test split (last expression is displayed).
sen_test_text, sen_test_lab = sen_test_data['Sentence'], sen_test_data['Status']
pipe2.score(sen_test_text, sen_test_lab)

0.5

In [14]:
# Explain the sentence pipeline's prediction for one example sentence.
target = ['Reject', 'Accept']
doc = 'however, models that make use of this strategy eventually fail after a certain level of complexity (e'

pipe2_exp = TextExplainer(random_state=42)
pipe2_exp.fit(doc=doc, predict_proba=pipe2.predict_proba)
pipe2_exp.show_prediction(target_names=target)



Contribution?,Feature
0.094,Highlighted in text (sum)
-0.065,<BIAS>


In [15]:
def print_prediction_1(doc):
    """Print pipe2's predicted probability for ``doc``, one "prob label" line per class."""
    labels = ['Reject', 'Accept']
    probabilities = pipe2.predict_proba([doc])[0]
    # Walk both sequences in lockstep, stopping at the shorter one.
    for idx in range(min(len(labels), len(probabilities))):
        print("{:.3f} {}".format(probabilities[idx], labels[idx]))

In [16]:
# Display the feature weights of the explainer fitted above, labelled with the names in `target`.
pipe2_exp.explain_weights(target_names=target)



Weight?,Feature
0.065,<BIAS>
0.057,a
0.047,eventually
0.029,of this
0.027,strategy
0.026,complexity e
0.026,however
0.016,after
0.014,make
0.008,models


In [17]:
# Class labels exposed by the fitted sentence pipeline.
print(pipe2.classes_) 
# Metrics recorded by the explainer after fitting (the output shows mean_KL_divergence and score).
print(pipe2_exp.metrics_ )

[0 1]
{'mean_KL_divergence': 0.001042355565567589, 'score': 0.7774797108309447}


In [18]:
class Padding_2(BaseEstimator, TransformerMixin):
    """Turn each document into a fixed-length vector of per-sentence predictions.

    A document is split on ".", every non-blank fragment is classified with
    ``pipe``, and the resulting label sequence is truncated or padded to
    exactly ``max_length`` entries.

    Parameters
    ----------
    pipe : estimator
        Fitted sentence-level classifier exposing ``predict``.
    max_length : int, default 400
        Length of every output vector.
    pad_value : int, default -1
        Filler appended when a document has fewer than ``max_length`` sentences.
    """

    def __init__(self, pipe, max_length=400, pad_value=-1):
        self.pipe = pipe
        self.max_length = max_length
        self.pad_value = pad_value
        print('\n>>>>>>>init() called.\n')

    @property
    def mxlenght(self):
        # Backward-compatible alias for the original (misspelled) attribute name.
        return self.max_length

    @mxlenght.setter
    def mxlenght(self, value):
        self.max_length = value

    def fit(self, X, y = None):
        """Nothing is learned here; return ``self`` so the pipeline can continue."""
        print('\n>>>>>>>fit() called.\n')
        return self

    def transform(self, X, y = None):
        """Return one list of ``max_length`` values per document in ``X``."""
        embeddings = []
        for document in tqdm(X):
            # split(".") leaves empty / whitespace-only fragments (for example
            # after the final period); classifying those adds cost and
            # meaningless labels, so they are skipped.
            sentences = [part for part in document.split(".") if part.strip()]
            # A document without any sentence text becomes pure padding
            # instead of being sent to the classifier.
            pred = self.pipe.predict(sentences).tolist() if sentences else []
            pred = pred[:self.max_length]
            pred.extend([self.pad_value] * (self.max_length - len(pred)))
            embeddings.append(pred)
        return embeddings

In [19]:
# Document-level classifier: per-sentence predictions from pipe2, padded to a
# fixed length, feed a logistic regression.
pipe3 = Pipeline(
    steps=[
        ('Documnet Embeddings', Padding_2(pipe2)),  # instantiating calls __init__
        ('Logistic Regression', LogisticRegression(solver='lbfgs')),
    ]
)

# Fit on the training documents, then display the score on the test documents.
pipe3.fit(doc_train_text, doc_train_status)
pipe3.score(doc_test_text, doc_test_status)


>>>>>>>init() called.


>>>>>>>fit() called.



 22%|███████████████████████▏                                                                               | 9/40 [07:27<25:42, 49.75s/it]


KeyboardInterrupt: 

In [None]:
def print_prediction_2(doc):
    """Echo ``doc``, then print pipe3's probability for each class as "prob label" lines."""
    print(doc)
    labels = ['Reject', 'Accept']
    probabilities = pipe3.predict_proba([doc])[0]
    # Walk both sequences in lockstep, stopping at the shorter one.
    for idx in range(min(len(labels), len(probabilities))):
        print("{:.3f} {}".format(probabilities[idx], labels[idx]))

In [None]:
target = ['Reject', 'Accept']

# The first training document is the example to explain; its title labels the plots.
doc = doc_train_data.iloc[0]['Paper_text']
ti = doc_train_data.iloc[0]['Title']

# NOTE(review): n_samples=10 is presumably kept tiny because every pipe3
# prediction is slow -- confirm it is enough for a meaningful explanation.
pipe3_exp = TextExplainer(n_samples=10, random_state=42)
pipe3_exp.fit(doc=doc, predict_proba=pipe3.predict_proba)

# pipe3_exp.show_prediction(target_names= target)

In [None]:
weights = pipe3_exp.explain_weights(top=None)

# Tally how often each feature name occurs among the positively and the
# negatively weighted features of the first target.
# NOTE(review): only occurrence counts are stored; the weight values themselves
# are ignored -- confirm that this is what the word clouds should display.
positivie_words = {}
for fw in weights.targets[0].feature_weights.pos:
    positivie_words[fw.feature] = positivie_words.get(fw.feature, 0) + 1

negative_words = {}
for fw in weights.targets[0].feature_weights.neg:
    negative_words[fw.feature] = negative_words.get(fw.feature, 0) + 1

In [None]:
# Word cloud of the positively weighted features, titled with the paper's title.
wc = WordCloud(
    background_color="white",
    width=1000,
    height=1000,
    relative_scaling=0.5,
    normalize_plurals=False,
).generate_from_frequencies(positivie_words)
plt.imshow(wc)
# Fixed: `title` was never defined (NameError); the paper title is stored in `ti`.
plt.title(ti, fontsize=13)

In [None]:
# Word cloud of the negatively weighted features, titled with the paper's title.
wc = WordCloud(
    background_color="white",
    width=1000,
    height=1000,
    relative_scaling=0.5,
    normalize_plurals=False,
).generate_from_frequencies(negative_words)
plt.imshow(wc)
plt.title(ti, fontsize=13)