# DATASET

https://huggingface.co/datasets/ccdv/patent-classification

In [None]:
%%capture

import tensorflow as tf
import torch

device = torch.device('cuda')

!pip install transformers
!pip install datasets

from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Experiment configuration.
# num_classes is used later to size the classification head of the model.
num_classes = 14
folder_name = 'data_2'

# Google Drive folder the CSV files are read from and the model is saved to.
PATH = f'/content/drive/MyDrive/Masterthesis/MixText/data/{folder_name}/'

In [None]:
from transformers import AutoTokenizer
import pandas as pd

# Tokenizer for the 'anferico/bert-for-patents' checkpoint, lower-casing input.
# ('bert-base-uncased' was noted as an alternative checkpoint.)
tokenizer = AutoTokenizer.from_pretrained(
    'anferico/bert-for-patents',
    do_lower_case=True,
)

# Load the training split (the CSV has no header row) and drop rows that
# contain missing values.
train_patents = pd.read_csv(PATH + 'train.csv', header=None).dropna()
train_patents.head()

In [None]:
import textwrap
import random

# Wrap printed text at 80 columns so long patents stay readable.
wrapper = textwrap.TextWrapper(width=80)

# Column 2 is the column that is tokenized as patent text further down.
patent_examples = train_patents[2]

# Print one randomly picked patent, framed by blank lines.
for _ in range(1):
  sample_idx = random.choice(patent_examples.index)
  sample_text = patent_examples[sample_idx]

  print('')
  print(wrapper.fill(sample_text))
  print('')

# BERT MODEL

In [None]:
# Tokenize a single training patent (row position 200) to inspect the pieces
# the tokenizer produces.
text = train_patents.iloc[200][2]
tokens = tokenizer.tokenize(text)

# Show at most the first 512 tokens, joined by spaces and wrapped.
preview = ' '.join(tokens[:512])
print(wrapper.fill(preview))

In [None]:
# Encode every training patent into token ids (special tokens included) and
# remember the encoded length of each one.
# NOTE(review): texts are encoded at full length here; the later
# pad_sequences(..., truncating='post') call cuts long sequences, which
# presumably drops the trailing special token -- confirm this is intended.
input_ids = []
lengths = []

print('Tokenizing patents...')

for n_done, sen in enumerate(train_patents[2]):
  # Progress message every 1,000 patents (n_done equals len(input_ids) here).
  if n_done % 1000 == 0:
    print('  Read {:,} patents.'.format(n_done))

  encoded_sent = tokenizer.encode(str(sen), add_special_tokens=True)

  input_ids.append(encoded_sent)
  lengths.append(len(encoded_sent))

print('DONE.')

In [None]:
# Display the first rows of the training frame again.
train_patents.head()

In [None]:
from keras.utils import np_utils

# Integer class ids are taken from column 0 of the training frame.
labels_num = train_patents[0].to_numpy().astype(int)
print(labels_num)

# One-hot encode the class ids; the resulting 2-D array is what the split,
# the tensors and the dataloaders below carry as `labels`.
labels = np_utils.to_categorical(labels_num)
print(labels)
print(labels.shape)

In [None]:
from keras.preprocessing.sequence import pad_sequences

# Fixed sequence length used for both the training and the test inputs.
MAX_LEN = 256

# Bring every id sequence to exactly MAX_LEN entries: shorter ones are
# right-padded with 0, longer ones are cut at the end.
input_ids = pad_sequences(
    input_ids,
    maxlen=MAX_LEN,
    dtype='long',
    value=0,
    padding='post',
    truncating='post',
)

In [None]:
# Attention mask per sequence: 1 where the token id is positive (a real
# token), 0 where it is the padding id 0.
attention_masks = [
    [int(token_id > 0) for token_id in sent]
    for sent in input_ids
]

In [None]:
from sklearn.model_selection import train_test_split

# Split ids, masks and labels together so all three share the same shuffle.
# test_size=1 is an absolute count: exactly one example is held out.
# NOTE(review): validation accuracy on a single example is not informative --
# confirm that training on (almost) everything is the intent.
(train_inputs, validation_inputs,
 train_masks, validation_masks,
 train_labels, validation_labels) = train_test_split(
    input_ids, attention_masks, labels,
    random_state=1, test_size=1)

train_inputs.shape

In [None]:
# Turn the split arrays/lists into torch tensors -- training side first,
# then the validation side.
train_inputs = torch.tensor(train_inputs)
train_masks = torch.tensor(train_masks)
train_labels = torch.tensor(train_labels)

validation_inputs = torch.tensor(validation_inputs)
validation_masks = torch.tensor(validation_masks)
validation_labels = torch.tensor(validation_labels)

In [None]:
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler

# Number of examples per batch for both loaders built in this cell.
batch_size = 1

# Training batches are drawn in random order.
train_data = TensorDataset(train_inputs, train_masks, train_labels)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)

# Validation batches are read in their stored order.
validation_data = TensorDataset(validation_inputs, validation_masks, validation_labels)
validation_sampler = SequentialSampler(validation_data)
validation_dataloader = DataLoader(validation_data, sampler=validation_sampler, batch_size=batch_size)

In [None]:
from transformers import BertForSequenceClassification, AdamW, BertConfig

# Load the pretrained patent checkpoint with a classification head of
# num_classes + 1 outputs; attentions and hidden states are not returned.
model = BertForSequenceClassification.from_pretrained(
    'anferico/bert-for-patents',
    num_labels = num_classes + 1,
    output_attentions = False,
    output_hidden_states = False
)

# Place the model on the same `device` the batches are moved to, instead of
# hard-coding CUDA. The trailing ';' suppresses the printed module repr.
model.to(device);

In [None]:
# AdamW over all model parameters with learning rate 2e-5 and epsilon 1e-8.
optimizer = AdamW(model.parameters(), lr=2e-5, eps=1e-8)

In [None]:
from transformers import get_linear_schedule_with_warmup

# Number of passes over the training data.
epochs = 5

# The scheduler is stepped once per training batch, so the schedule spans
# (batches per epoch) * (epochs) steps.
total_steps = epochs * len(train_dataloader)

# Linear learning-rate schedule without a warm-up phase.
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=total_steps,
)

In [None]:
import numpy as np

def flat_accuracy(preds, labels):
  """Return the fraction of rows whose predicted class matches the label.

  Args:
    preds: 2-D array-like of per-class scores, one row per example.
    labels: either class indices (shape (n,) or (n, 1)) or one-hot /
      per-class rows with more than one column, one row per example.

  Returns:
    Accuracy as a float in [0, 1].
  """
  preds = np.asarray(preds)
  labels = np.asarray(labels)

  pred_flat = np.argmax(preds, axis=1).flatten()

  # One-hot labels must be reduced to class indices first; flattening them
  # directly would compare class ids against 0/1 entries.
  if labels.ndim > 1 and labels.shape[-1] > 1:
    labels_flat = np.argmax(labels, axis=1).flatten()
  else:
    labels_flat = labels.flatten()

  return np.sum(pred_flat == labels_flat) / len(labels_flat)

In [None]:
import time
import datetime

def format_time(elapsed):
  """Format a duration given in seconds as an h:mm:ss string.

  The value is rounded to whole seconds before formatting.
  """
  whole_seconds = int(round(elapsed))
  delta = datetime.timedelta(seconds=whole_seconds)
  return str(delta)

In [None]:
import random
import torch

# Seed every random number generator in use so a run can be repeated.
seed_val = 42

random.seed(seed_val)
np.random.seed(seed_val)
torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)

# Average training loss of each finished epoch.
loss_values = []

for epoch_i in range(0, epochs):
  # ----- training pass -----
  t0 = time.time()

  total_loss = 0

  model.train()

  for step, batch in enumerate(train_dataloader):
    # Progress line every 100 batches (skipping the very first batch).
    if step % 100 == 0 and not step == 0:
      elapsed = format_time(time.time() - t0)

      print(' Batch {:>5,} of {:>5,}. Elapsed: {:}.'.format(step, len(train_dataloader), elapsed))

    # Each batch is (input ids, attention mask, labels).
    b_input_ids, b_input_mask, b_labels = tuple(t.to(device) for t in batch)

    # Clear gradients left over from the previous step.
    model.zero_grad()

    # Passing labels makes the model return the loss as its first output.
    outputs = model(b_input_ids,
                    token_type_ids = None,
                    attention_mask = b_input_mask,
                    labels = b_labels
                    )

    loss = outputs[0]

    total_loss += loss.item()

    loss.backward()

    # Clip the gradient norm to 1.0 before the optimizer step.
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

    optimizer.step()

    # Advance the learning-rate schedule once per batch.
    scheduler.step()

  avg_train_loss = total_loss / len(train_dataloader)

  loss_values.append(avg_train_loss)

  print('')
  # '.2f' prints two decimals; the former '2f' spec only set a minimum width.
  print('Avg train loss: {0:.2f}'.format(avg_train_loss))

  # ----- validation pass -----
  print('')
  print('Runtime validation...')

  t0 = time.time()

  model.eval()

  eval_accuracy = 0
  nb_eval_steps = 0

  for batch in validation_dataloader:
    b_input_ids, b_input_mask, b_labels = tuple(t.to(device) for t in batch)

    # No labels are passed here, so the first output is the logits.
    with torch.no_grad():
      outputs = model(b_input_ids,
                      token_type_ids=None,
                      attention_mask=b_input_mask
                      )

    logits = outputs[0].detach().cpu().numpy()
    label_ids = b_labels.to('cpu').numpy()

    eval_accuracy += flat_accuracy(logits, label_ids)
    nb_eval_steps += 1

  # Mean of the per-batch accuracies.
  print(' Accuracy: {0:.2f}'.format(eval_accuracy/nb_eval_steps))

print('')
print('Training complete')


In [None]:
# Persist the fine-tuned weights (state dict only) into the data folder.
torch.save(model.state_dict(), PATH + 'bert_model.pt')

In [None]:
# Debug output: labels of the last batch that was processed above.
print(b_labels)

In [None]:
# Load the test split (no header row) and drop rows with missing values.
test_patents = pd.read_csv(PATH + 'test.csv', header=None).dropna()
test_patents.head()

In [None]:
from sys import float_repr_style  # NOTE(review): unused here; looks like an accidental auto-import

# Encode every test patent, truncated to MAX_LEN tokens (special tokens
# included).
test_input_ids = []

for sen in test_patents[2]:

  # Progress is counted on the test list being built, not on the training
  # array `input_ids`.
  if ((len(test_input_ids) % 20000) == 0):
    print(' Read {:,} patents.'.format(len(test_input_ids)))

  encoded_sent = tokenizer.encode(
      str(sen),
      add_special_tokens = True,
      max_length = MAX_LEN,
      truncation = True,  # request truncation explicitly instead of relying on the implicit fallback
  )

  test_input_ids.append(encoded_sent)

print('Done')

# One-hot encode the class ids found in column 0 of the test frame.
test_labels_num = test_patents[0].to_numpy().astype(int)
test_labels = np_utils.to_categorical(test_labels_num)

# Right-pad (and, if still needed, right-truncate) to exactly MAX_LEN ids.
test_input_ids = pad_sequences(test_input_ids, maxlen=MAX_LEN,
                               dtype='long', truncating='post', padding='post')

# 1 for real tokens, 0 for padding -- integer masks, as for the training data.
test_attention_masks = [[int(token_id > 0) for token_id in seq]
                        for seq in test_input_ids]

test_inputs = torch.tensor(test_input_ids)
test_masks = torch.tensor(test_attention_masks)
test_labels = torch.tensor(test_labels)

# Batch size used for prediction.
batch_size = 32

# Test batches are read in their stored order.
test_data = TensorDataset(test_inputs, test_masks, test_labels)
test_sampler = SequentialSampler(test_data)
test_dataloader = DataLoader(test_data, sampler=test_sampler, batch_size=batch_size)

In [None]:
# Report the size of the test set (not of the training array).
print('Predicting labels for {:,} test patents...'.format(len(test_dataloader.dataset)))

model.eval()

# Per-batch logits and labels, collected as numpy arrays.
predictions , true_labels = [], []

t0 = time.time()

for (step, batch) in enumerate(test_dataloader):

  batch = tuple(t.to(device) for t in batch)

  # Progress line every 100 batches, measured against the test loader.
  if step % 100 == 0 and not step == 0:
    elapsed = format_time(time.time() - t0)

    print(' Batch {:>5,} of {:>5,}. Elapsed: {:}.'.format(step, len(test_dataloader), elapsed))

  # Each batch is (input ids, attention mask, labels).
  b_input_ids, b_input_mask, b_labels = batch

  # No labels are passed, so the first output is the logits.
  with torch.no_grad():
    outputs = model(b_input_ids, token_type_ids=None,
                    attention_mask=b_input_mask)

  logits = outputs[0]

  logits = logits.detach().cpu().numpy()
  label_ids = b_labels.to('cpu').numpy()

  predictions.append(logits)
  true_labels.append(label_ids)

print('    DONE.')

In [None]:
# Stack the per-batch arrays along the first axis into one array each.
predictions = np.concatenate(predictions, axis=0)
true_labels = np.concatenate(true_labels, axis=0)

In [None]:
# Peek at the first ten label rows.
true_labels[0:10]

In [None]:
# Peek at the first ten rows of predicted logits.
predictions[0:10]

In [None]:
# Reduce every row of logits / label rows to the index of its largest entry.
pred = [np.argmax(logit_row) for logit_row in predictions]
true = [np.argmax(label_row) for label_row in true_labels]

print(true)
print(pred)

In [None]:
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score

# Compute each score once and reuse it in the printed lines below.
macro_f1 = f1_score(true, pred, average='macro')
micro_f1 = f1_score(true, pred, average='micro')
acc = accuracy_score(true, pred)

# Compact "macro/micro" summary in percent, one decimal each.
print(round(macro_f1*100,1), "/", round(micro_f1*100,1), sep='')

print('Test Accuracy: %.3f' %acc)
print('F1 score macro: ', macro_f1)
print('F1 score micro: ', micro_f1)