In [None]:
#pip install transformers

In [None]:
#pip install pytorch-transformers

In [None]:
import io
import torch
import string
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm import tqdm, trange
import matplotlib.pyplot as plt
from pytorch_transformers import AdamW
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from keras.preprocessing.sequence import pad_sequences
table = str.maketrans(dict.fromkeys(string.punctuation))
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from transformers import (XLNetConfig, XLNetForSequenceClassification, XLNetTokenizer, XLNetModel)

In [None]:
# Run on the GPU when torch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
n_gpu = torch.cuda.device_count()
# Display the accelerator name. Querying device 0 raises when no GPU is
# present, so only ask for it when CUDA is actually available.
torch.cuda.get_device_name(0) if torch.cuda.is_available() else "cpu"

In [None]:
def legal_data_appending(df):
    """Add a 'Summarized_Content_LegalDetails' column to ``df`` and return it.

    For every row the new column holds the row's 'Legal_Details' text, a single
    space, and the row's 'Summarized_content' text with all ASCII punctuation
    (``string.punctuation``) removed. 'Legal_Details' itself is not cleaned.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the string columns 'Summarized_content' and
        'Legal_Details'.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object, modified in place with the new column.

    Raises
    ------
    KeyError
        If either required column is missing.
    """
    # Build the punctuation-stripping table locally so the function does not
    # depend on a module-level global being defined first.
    punctuation_table = str.maketrans('', '', string.punctuation)
    # Walk the two columns in lockstep instead of materialising one Series per
    # row with iterrows(); the produced strings are the same.
    df['Summarized_Content_LegalDetails'] = [
        legal_details + ' ' + summary.translate(punctuation_table)
        for legal_details, summary in zip(df['Legal_Details'], df['Summarized_content'])
    ]
    return df

In [None]:
# Input CSV. Change the path to switch datasets:
#   tfidf: Thesis - Dataset and Transformations/transform - post text augmentation/lsa_tfidf_augmentation.csv
#   tf:    Thesis - Dataset and Transformations/transform - post text augmentation/lsa_tf_augmentation.csv
# Missing cells are replaced with the placeholder 'No text' before the legal
# details are prepended to the cleaned summary text.
df = legal_data_appending(
    pd.read_csv('/content/drive/MyDrive/BERT Data Files/tfidf_formula_2.csv').fillna('No text')
)
# Show the last rows as a quick sanity check of the combined column.
df.tail(5)

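### Add special tokens (SEP and CLS) at the end of each sentence
Unlike BERT, XLNet places its special tokens at the end of the sequence, so for the single-sentence inputs used here we append the separator (SEP) token followed by the classification (CLS) token to every sentence.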

In [None]:
sentences = df.Summarized_Content_LegalDetails.values
# XLNet's separator and classification tokens are spelled "<sep>" and "<cls>".
# The BERT-style "[SEP]" / "[CLS]" strings are not special tokens in XLNet's
# vocabulary and would be split into ordinary word pieces by the tokenizer.
sentences = [sentence + " <sep> <cls>" for sentence in sentences]
labels = df.Labels.values

### Importing XLNet tokenizer
We use XLNet's SentencePiece tokenizer to split the text into tokens that correspond to entries in XLNet’s vocabulary.

XLNet requires specifically formatted inputs. For each tokenized input sentence, we need to create:

* input ids: a sequence of integers identifying each input token to its index number in the XLNet tokenizer vocabulary
* segment mask: (optional) a sequence of 1s and 0s used to identify whether the input is one sentence or two sentences long. For one sentence inputs, this is simply a sequence of 0s. For two sentence inputs, there is a 0 for each token of the first sentence, followed by a 1 for each token of the second sentence
* attention mask: (optional) a sequence of 1s and 0s, with 1s for all input tokens and 0s for all padding tokens (we’ll detail this in the next paragraph)

In [None]:
# 'xlnet-base-cased' was pretrained on cased text, so the input is not
# lower-cased before tokenization.
tokenizer = XLNetTokenizer.from_pretrained('xlnet-base-cased', do_lower_case=False)

tokenized_texts = [tokenizer.tokenize(sent) for sent in sentences]
print ("Tokenize the first sentence:")
# Index 0 matches the message above and also works for small datasets.
print (tokenized_texts[0])

### Padding and Truncation to Max Sequence Length
XLNet requires our input arrays to be the same size. We address this by first choosing a maximum sentence length, and then padding and truncating our inputs until every input sequence is of the same length.

### Attention Mask
The attention_mask is an optional argument used when batching sequences together. This argument indicates to the model which tokens should be attended to, and which should not.

In [None]:
MAX_LEN = 512
input_ids = [tokenizer.convert_tokens_to_ids(x) for x in tokenized_texts]

# Number of real (non-padding) tokens each sequence keeps after truncation.
# The attention mask is derived from these lengths rather than from the id
# values, because id 0 is a legitimate vocabulary entry (<unk>) in XLNet.
real_lengths = [min(len(ids), MAX_LEN) for ids in input_ids]

# XLNet pads on the left (its tokenizer's padding side is "left") and its
# sequence-classification head summarises the last position, so padding goes
# in front and uses the tokenizer's dedicated pad id.
# NOTE(review): truncating="post" still drops the tail of sequences longer
# than MAX_LEN, including any markers appended to the end of the text.
input_ids = pad_sequences(input_ids, maxlen=MAX_LEN, dtype="long", truncating="post",
                          padding="pre", value=tokenizer.pad_token_id)

# 0.0 for the leading padding positions, 1.0 for the real tokens that follow.
attention_masks = [[0.0] * (MAX_LEN - n_real) + [1.0] * n_real for n_real in real_lengths]

### Train, Validation and Test splitting of dataset

In [None]:
# Split inputs, labels and attention masks in a single call per stage so the
# three arrays are guaranteed to be partitioned with the same row permutation
# (instead of relying on separate calls that repeat the same random_state).

# Stage 1: hold out 20% of all examples as the test set.
(train_inputs, test_inputs,
 train_labels, test_labels,
 train_masks, test_masks) = train_test_split(input_ids, labels, attention_masks,
                                             random_state=42, test_size=0.2)

# Stage 2: hold out 20% of the remaining examples as the validation set.
(train_inputs, validation_inputs,
 train_labels, validation_labels,
 train_masks, validation_masks) = train_test_split(train_inputs, train_labels, train_masks,
                                                   random_state=42, test_size=0.2)

In [None]:
# Report (inputs shape, labels shape) for the train, validation and test
# splits, one line per split.
for split_inputs, split_labels in (
    (train_inputs, train_labels),
    (validation_inputs, validation_labels),
    (test_inputs, test_labels),
):
    print(split_inputs.shape, split_labels.shape)

### Converting to torch tensors
Convert all of our data into torch tensors, the required datatype for our model.

In [None]:
# Convert every split to torch tensors, one kind of array at a time.

# Token ids
train_inputs, validation_inputs, test_inputs = (
    torch.tensor(split) for split in (train_inputs, validation_inputs, test_inputs)
)

# Class labels
train_labels, validation_labels, test_labels = (
    torch.tensor(split) for split in (train_labels, validation_labels, test_labels)
)

# Attention masks
train_masks, validation_masks, test_masks = (
    torch.tensor(split) for split in (train_masks, validation_masks, test_masks)
)

### Defining torch DataLoader
Wrap each split in a `TensorDataset` and iterate over it with a torch `DataLoader`. The loader yields one mini-batch at a time,
so only the current batch has to be moved to the GPU during training rather than the whole dataset.

In [None]:
batch_size = 8

# Bundle each split's (input ids, attention mask, label) tensors into a dataset.
train_data = TensorDataset(train_inputs, train_masks, train_labels)
validation_data = TensorDataset(validation_inputs, validation_masks, validation_labels)
test_data = TensorDataset(test_inputs, test_masks, test_labels)

# Training examples are drawn in random order; validation and test examples
# are visited sequentially.
train_sampler = RandomSampler(train_data)
validation_sampler = SequentialSampler(validation_data)
test_sampler = SequentialSampler(test_data)

# One loader per split, all using the same batch size.
train_dataloader = DataLoader(dataset=train_data, batch_size=batch_size, sampler=train_sampler)
validation_dataloader = DataLoader(dataset=validation_data, batch_size=batch_size, sampler=validation_sampler)
test_dataloader = DataLoader(dataset=test_data, batch_size=batch_size, sampler=test_sampler)

### Loading XLNet pre-trained model
Load `XLNetForSequenceClassification`, the pretrained XLNet model with a single linear classification layer on top.

In [None]:
# One output label per distinct value in the Labels column.
model = XLNetForSequenceClassification.from_pretrained("xlnet-base-cased", num_labels=len(df.Labels.unique()))
# Move the model to the same device the batches are sent to; unlike .cuda(),
# this also works on the CPU fallback.
model.to(device)

In [None]:
# Build two parameter groups for the optimizer: parameters whose name matches
# one of the `no_decay` fragments get no weight decay, all others get 0.01.
param_optimizer = list(model.named_parameters())
# 'gamma'/'beta' are kept for backward compatibility with older naming.
# NOTE(review): XLNet's layer-norm weights are expected to be named
# '...layer_norm.weight' — confirm with model.named_parameters().
no_decay = ['bias', 'gamma', 'beta', 'LayerNorm.weight', 'layer_norm.weight']
# The per-group key must be spelled 'weight_decay': AdamW does not read a
# 'weight_decay_rate' key, so with that spelling no decay is applied at all.
optimizer_grouped_parameters = [
    {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)],
     'weight_decay': 0.01},
    {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)],
     'weight_decay': 0.0}
]

In [None]:
# AdamW over the grouped parameters, with a learning rate of 2e-5.
optimizer = AdamW(params=optimizer_grouped_parameters, lr=2e-5)

### Accuracy calculation

In [None]:
def flat_accuracy(preds, labels):
    """Return the fraction of rows whose highest-scoring class equals the label.

    ``preds`` is a 2-D array of per-class scores (one row per example) and
    ``labels`` is an array holding the true class index of each example; both
    are flattened before being compared element-wise.
    """
    predicted_classes = np.argmax(preds, axis=1).ravel()
    true_classes = labels.ravel()
    n_correct = (predicted_classes == true_classes).sum()
    return n_correct / true_classes.size

### Training and Validation loop


In [None]:
# Per-batch training loss over all epochs, kept for plotting
train_loss_set = []

# Number of training epochs (authors recommend between 2 and 4)
epochs = 3

# trange is a tqdm wrapper around the normal python range
for _ in trange(epochs, desc="Epoch"):

    # ---------------- Training ----------------
    # Set our model to training mode (as opposed to evaluation mode)
    model.train()

    # Running sum of batch losses and number of batches seen this epoch
    tr_loss = 0
    nb_tr_steps = 0

    # Train the data for one epoch
    for batch in train_dataloader:
        # Add batch to the selected device
        batch = tuple(t.to(device) for t in batch)
        # Unpack the inputs from our dataloader
        b_input_ids, b_input_mask, b_labels = batch
        # Clear out the gradients (by default they accumulate)
        optimizer.zero_grad()
        # Forward pass; with labels supplied, the first output is the loss
        outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask, labels=b_labels)
        loss = outputs[0]
        # Read the scalar loss once and reuse it for both trackers
        batch_loss = loss.item()
        train_loss_set.append(batch_loss)
        # Backward pass
        loss.backward()
        # Update parameters and take a step using the computed gradient
        optimizer.step()
        # Update tracking variables
        tr_loss += batch_loss
        nb_tr_steps += 1

    # max(..., 1) keeps an empty dataloader from raising ZeroDivisionError
    print("Train loss: {}".format(tr_loss / max(nb_tr_steps, 1)))

    # ---------------- Validation ----------------
    # Put model in evaluation mode to evaluate on the validation set
    model.eval()

    # Count correct predictions and examples so the accuracy is weighted per
    # example; averaging per-batch accuracies over-weights a smaller final batch.
    nb_eval_correct, nb_eval_examples = 0, 0

    # True and predicted labels of the current epoch's validation pass
    y_true = []
    y_predict = []

    # Evaluate data for one epoch
    for batch in validation_dataloader:
        # Add batch to the selected device
        batch = tuple(t.to(device) for t in batch)
        # Unpack the inputs from our dataloader
        b_input_ids, b_input_mask, b_labels = batch
        # Telling the model not to compute or store gradients, saving memory and speeding up validation
        with torch.no_grad():
            # Forward pass; without labels, the first output is the logits
            output = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask)
            logits = output[0]

        # Move logits and labels to CPU
        logits = logits.detach().cpu().numpy()
        label_ids = b_labels.to('cpu').numpy()

        # Predicted class = index of the largest logit in each row
        batch_predictions = np.argmax(logits, axis=1)
        y_predict.extend(batch_predictions.tolist())
        y_true.extend(label_ids.tolist())

        nb_eval_correct += int(np.sum(batch_predictions == label_ids.flatten()))
        nb_eval_examples += len(label_ids)

    validation_accuracy = nb_eval_correct / max(nb_eval_examples, 1)
    print("Validation Accuracy: {}".format(validation_accuracy))

### Testing loop

In [None]:
# Put model in evaluation mode for the held-out test set
model.eval()

# Count correct predictions and examples so the accuracy is weighted per
# example; averaging per-batch accuracies over-weights a smaller final batch
# and can disagree with metrics computed from y_true_test / y_predict_test.
nb_test_correct, nb_test_examples = 0, 0

# True and predicted labels for every test example, in dataloader order
y_true_test = []
y_predict_test = []
for batch in test_dataloader:
    # Add batch to the selected device
    batch = tuple(t.to(device) for t in batch)
    # Unpack the inputs from our dataloader
    b_input_ids, b_input_mask, b_labels = batch
    # Telling the model not to compute or store gradients, saving memory and speeding up evaluation
    with torch.no_grad():
        # Forward pass; without labels, the first output is the logits
        output = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask)
        logits = output[0]

    # Move logits and labels to CPU
    logits = logits.detach().cpu().numpy()
    label_ids = b_labels.to('cpu').numpy()

    # Predicted class = index of the largest logit in each row
    batch_predictions = np.argmax(logits, axis=1)
    y_predict_test.extend(batch_predictions.tolist())
    y_true_test.extend(label_ids.tolist())

    nb_test_correct += int(np.sum(batch_predictions == label_ids.flatten()))
    nb_test_examples += len(label_ids)

# max(..., 1) keeps an empty dataloader from raising ZeroDivisionError
test_accuracy = nb_test_correct / max(nb_test_examples, 1)
print("Test Accuracy: {}".format(test_accuracy))

In [None]:
from sklearn.metrics import confusion_matrix
def heatconmat(y_true,y_pred):
    """Plot the confusion matrix of ``y_true`` vs ``y_pred`` as an annotated heatmap.

    Parameters
    ----------
    y_true : array-like
        True class label of each example (a list, array or pandas Series).
    y_pred : array-like
        Predicted class label of each example, in the same order.

    Rows of the heatmap are true labels and columns are predicted labels.
    The figure is shown with ``plt.show()``; nothing is returned.
    """
    # Use the sorted union of both label sets for the matrix and for both axes,
    # so tick labels stay aligned even when a class is predicted but never
    # occurs in y_true (or the other way round).
    class_labels = np.unique(np.concatenate([np.asarray(y_true), np.asarray(y_pred)]))
    sns.set_context('talk')
    plt.figure(figsize=(15,12))
    ax = sns.heatmap(confusion_matrix(y_true, y_pred, labels=class_labels),
                     annot=True,
                     fmt='d',
                     cbar=False,
                     cmap='gist_earth_r',
                     xticklabels=class_labels,
                     yticklabels=class_labels)
    ax.set_xlabel('Predicted label')
    ax.set_ylabel('True label')
    plt.show()

In [None]:
# Confusion-matrix heatmap of the test-set predictions.
heatconmat(y_true=pd.Series(y_true_test), y_pred=pd.Series(y_predict_test))

In [None]:
# Per-class precision / recall / F1 on the test set. classification_report is
# already imported in the notebook's import cell, and y_true_test is already a
# list, so neither a second import nor a list() copy is needed.
print(classification_report(y_true_test, y_predict_test))