# Demo on BERT encoding and classifying

In [1]:
# packages
import torch
from transformers import *
import numpy as np
import glob
import json

labels_path = "/mnt/sda1/backup/lapCin/Workspace/Cinnamon/Data/Invoice/Phase 3/train/labels"

In [2]:
class TextEncoder:
    _default = {
            "MODELS": [(BertModel,       BertTokenizer,       'bert-base-uncased')
         ],
    }
    def __init__(self):
        self._set_default()
        pass
    
    def _set_default(self):
        for model_class, tokenizer_class, pretrained_weights in self._default["MODELS"]:
            # Load pretrained model/tokenizer
            self.tokenizer = tokenizer_class.from_pretrained(pretrained_weights)
            self.model = model_class.from_pretrained(pretrained_weights)
    
    def encode(self, text, special_tokens=True):
        input_ids = torch.tensor([self.tokenizer.encode(text, add_special_tokens=special_tokens)])  # Add special tokens takes care of adding [CLS], [SEP], <s>... tokens in the right way for each model.
        with torch.no_grad():
            last_hidden_states = self.model(input_ids)[0]  # Models outputs are now tuples
        return last_hidden_states
    
    def compress(self, X, mode="average"):
        return X.mean(dim=1)
    
    def export_pd(self, X):
        return pd.DataFrame(X)
        
    def process_text_list(self, text_list):
        X = [self.encode(text) for text in text_list]
        X = [self.compress(x) for x in X]
        X = torch.cat(X, dim=0).numpy()
        X = self.export_pd(X)
        return X
        
    def process(self, data):
        text_list = [text for (text, C) in data]
        X = self.process_text_list(text_list)
        category = [C for text, C in data]
        
        X["Class"] = category
        return X

In [3]:
# utilities
def parse_json_to_CASIA(label_path):
    CASIA_output = []
    with open(label_path, "r", encoding="utf-8") as f:
        label = json.load(f)
        for info in label['attributes']['_via_img_metadata']['regions']:
            shape_info = info['shape_attributes']
            item_info = info['region_attributes']
            text = item_info.get('label')
            formal_key = item_info.get('formal_key')
            key_type = item_info.get('key_type')
            try:
                x,y,w,h = [shape_info['x'], shape_info['y'], shape_info['width'], shape_info['height']]
                loc = [(x,y), (x+w,y), (x+w,y+h), (x,y+h)]
            except:
                loc = [(x,y) for (x,y) in zip(shape_info['all_points_x'], shape_info['all_points_y'])]
            #print(f"{text}: - KEY: {formal_key} - TYPE: {key_type} - LOC: {x,y,w,h}")
            
            # put into CASIA
            item = {
                'text': text,
                'type': formal_key,
                'key_type': key_type,
                'location': loc,
            }
            CASIA_output.append(item)
    return CASIA_output



``` python
labels_path = "/mnt/sda1/backup/lapCin/Workspace/Cinnamon/Data/Invoice/Phase 3/train/labels"
list_label = glob.glob(labels_path + "/*")


data = []
for label_path in list_label:
    print(f"Processing {label_path}")
    CASIA_output = parse_json_to_CASIA(label_path)
    item = [(item.get("text"), item.get("type")) for item in CASIA_output \
            if item.get("type") not in ["", None, "None"]]
    data.extend(item)
```

In [4]:
from torch.utils.data import Dataset, DataLoader

class TextDataset(Dataset):
    """Text dataset from json"""

    def __init__(self, labels_path, category=None, transform=None, target_transform=None):
        """
        Args:
            csv_file (string): Path to the csv file with annotations.
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        
        self.data, self.category = self.load_data(labels_path, category)
        
        self.transform = transform
        self.target_transform = target_transform
        
    def load_data(self, labels_path, category=None):
        list_label = glob.glob(labels_path + "/*")
        data = []
        for label_path in list_label:
            #print(f"Processing {label_path}")
            CASIA_output = parse_json_to_CASIA(label_path)
            item = [(item.get("text"), item.get("type")) for item in CASIA_output \
                    if item.get("type") not in ["", None, "None"] and item.get('key_type') in "value"]
            if category:
                out = []
                for element in item:
                    for cat in category:
                        if cat in element[1]:
                            out.append(element)
                            continue
                item = out

            data.extend(item)
        
        category = list(np.unique([item[1] for item in data]))
        
        return data, category
    
    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
      
        text, cat = self.data[idx]
        if self.transform:
            text = self.transform(text)
        if self.target_transform:
            cat = self.target_transform(cat)
            
        return text, cat

In [5]:
category = ["name", "date", "type", "quantity", "amount"]

In [6]:
def _target_transform(label, category=category):
    out = [cat in label for cat in category]
    out = torch.tensor([out]).type(torch.FloatTensor)
    return out

def target_transform(label, category=category):
    for cat in category:
        if cat in label:
            return cat

    return ""


trainset = TextDataset(labels_path, category, target_transform=target_transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=1,
                                         shuffle=True, num_workers=1)

In [7]:
trainset.category

['account type',
 'account_name',
 'account_name_kana',
 'account_type',
 'amount_excluding_tax',
 'amount_including_tax',
 'amount_tax',
 'amount_total_including_tax',
 'bank_branch_name',
 'bank_name',
 'bank_name_bank_branch_name_account_name_account_type_account_number',
 'branch_name',
 'company name',
 'company_name',
 'delivery_date',
 'destination_company_department_name',
 'document_name',
 'issued_date',
 'item _unit_amount',
 'item_name',
 'item_quantity',
 'item_quantity_item_unit',
 'item_unit_amount',
 'pay_com_name',
 'payment_date']

category = ['account type',
 'account_name',
 'account_name_kana',
 'account_number',
 'account_type',
 'amount_excluding_tax',
 'amount_including_tax',
 'amount_tax',
 'bank_branch_name',
 'bank_name',
 'branch_address',
 'branch_fax',
 'branch_name',
 'branch_tel',
 'branch_zipcode',
 'car_number',
 'company address',
 'company name',
 'company_address',
 'company_fax',
 'company_name',
 'company_tel',
 'company_zipcode',
 'delivery_date',
 'document_name',
 'document_number',
 'invoice_number',
 'issued_date',
 'item_unit_amount',
 'item_line_number',
 'item_name',
 'item_quantity',
 'item_total_excluding_tax',
 'item_total_including_tax',
 'item_total_tax',
 'item_unit',
 'item_unit_amount',
 'partner_code',
 'pay_com_name',
 'payment_date',
 'table_excluding_tax',
 'table_total_excluding_tax',
 'table_total_including_tax',
 'table_total_tax',
 'tax']

In [8]:
"""
test_text = '日本語（にほんご、にっぽんご[注 1]）は、主に日本国内や日本人同士の間で使用されている言語である。'
EM = TextEncoder()
encoded_text = EM.encode(test_text)
print(test_text, encoded_text.shape)
"""

"\ntest_text = '日本語（にほんご、にっぽんご[注 1]）は、主に日本国内や日本人同士の間で使用されている言語である。'\nEM = TextEncoder()\nencoded_text = EM.encode(test_text)\nprint(test_text, encoded_text.shape)\n"

# Examples

``` python
BERT_MODEL_CLASSES = [BertForSequenceClassification, BertForTokenClassification]
pretrained_weights = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(pretrained_weights)
for model_class in BERT_MODEL_CLASSES:
    # Load pretrained model/tokenizer
    model = model_class.from_pretrained(pretrained_weights)

    # Models can return full list of hidden-states & attentions weights at each layer
    model = model_class.from_pretrained(pretrained_weights,
                                        output_hidden_states=True,
                                        output_attentions=True)
    input_ids = torch.tensor([tokenizer.encode("Let's see all hidden-states and attentions on this text")])
    all_hidden_states, all_attentions = model(input_ids)[-2:]

    # Models are compatible with Torchscript
    model = model_class.from_pretrained(pretrained_weights, torchscript=True)
    traced_model = torch.jit.trace(model, (input_ids,))

    # Simple serialization for models and tokenizers
    model.save_pretrained('/mnt/sda1/code/weights/NLP')  # save
    model = model_class.from_pretrained('/mnt/sda1/code/weights/NLP')  # re-load
    tokenizer.save_pretrained('/mnt/sda1/code/weights/NLP')  # save
    tokenizer = BertTokenizer.from_pretrained('/mnt/sda1/code/weights/NLP')  # re-load
```

In [9]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import tqdm
class BERTSequenceClassfier:
    def __init__(self, nClass=2):
        self.model = BertForSequenceClassification.from_pretrained('/mnt/sda1/code/weights/NLP')
        self.tokenizer = BertTokenizer.from_pretrained('/mnt/sda1/code/weights/NLP')  # re-load
        self.encode_model = BertModel.from_pretrained("bert-base-uncased")
        self.nClass = nClass
        self.design()
        
    def design(self):
        # freeze model
        for param in self.model.parameters(): param.requires_grad = False
        # modifying last layer
        self.model.classifier = nn.Sequential(
            nn.Linear(768, 256),
            #nn.BatchNorm1d(256),
            nn.Dropout(),
            nn.ReLU(),
            nn.Linear(256, 64),
            #nn.BatchNorm1d(64),
            nn.Dropout(),
            nn.ReLU(),
            nn.Linear(64, self.nClass),
            nn.Softmax(),
        )
        for param in self.model.classifier.parameters(): param.requires_grad = True
        #print(self.model)
    
    def encode(self, text, special_tokens=True):
        input_ids = torch.tensor([self.tokenizer.encode(text, add_special_tokens=special_tokens)])  # Add special tokens takes care of adding [CLS], [SEP], <s>... tokens in the right way for each model.
        with torch.no_grad():
            last_hidden_states = self.encode_model(input_ids)[0]  # Models outputs are now tuples
        return last_hidden_states
    
    def compress(self, X, mode="average"):
        return X.mean(dim=1)
        
        
        
    def train(self, dataloader, nb_iter=100):
        criterion = nn.BCELoss()
        optimizer = optim.Adam(self.model.classifier.parameters(), lr=0.0001)
        data_iter = iter(dataloader)
        with tqdm.trange(nb_iter) as t:
            for iteration in t:
            #for texts, labels in dataloader:
                try:
                    texts, labels = next(data_iter)
                except:
                    data_iter = iter(dataloader)
                    texts, labels = next(data_iter)
                    
                try:                  
                    #print([self.tokenizer.encode(text, add_special_tokens=True) for text in texts])
                    #break
                    #input_ids = torch.tensor([self.tokenizer.encode(text, add_special_tokens=True) for text in texts])
                    
                    encode = self.encode(texts[0])
                    input_ids = self.compress(encode)
                    pred = self.model.classifier(input_ids)
                    out = labels[0]
                    #print(pred)
                    #print(out)
                    
                    
                    
                    #out = [cat in labels[0] for cat in category]
                    
                    
                    #out = torch.tensor([out]).type(torch.FloatTensor)
                    #print(pred)
                    #print(out)
                    loss = criterion(pred, out)
                    loss.backward()
                    optimizer.step()

                    t.set_description(f"[{iteration}/{nb_iter}]: {loss.item()}")
                except:
                    print(texts)
                    continue
            
        
    
    def predict(self, text):
        input_ids = torch.tensor([self.tokenizer.encode(text)])
        #print(input_ids)
        return self.model(input_ids)
    

        

In [10]:
#BERTclf = BERTSequenceClassfier(nClass=len(category))
#BERTclf.train(trainloader, nb_iter=10000)

In [11]:
def predict(text):
    encode = BERTclf.encode(text)
    input_ids = BERTclf.compress(encode)
    return BERTclf.model.classifier(input_ids)

In [12]:
"""
test_iter = iter(trainloader)
test_text, label = test_iter.next()
#est_text = '日本語（にほんご、にっぽんご[注 1]）は、主に日本国内や日本人同士の間で使用されている言語である。'
pred = predict(test_text[0])
print("PRED:", pred[0].detach())
idx = torch.argmax(pred[0].detach())
out = category[idx] if idx <len(category) else None
print(test_text,out, label)
print(category)
"""

'\ntest_iter = iter(trainloader)\ntest_text, label = test_iter.next()\n#est_text = \'日本語（にほんご、にっぽんご[注 1]）は、主に日本国内や日本人同士の間で使用されている言語である。\'\npred = predict(test_text[0])\nprint("PRED:", pred[0].detach())\nidx = torch.argmax(pred[0].detach())\nout = category[idx] if idx <len(category) else None\nprint(test_text,out, label)\nprint(category)\n'

In [13]:
import pandas as pd
class BERTSVM:
    def __init__(self, nClass=2):
        self.tokenizer = BertTokenizer.from_pretrained('/mnt/sda1/code/weights/NLP')  # re-load
        self.encode_model = BertModel.from_pretrained("bert-base-uncased")
        self.nClass = nClass
    
    def export_pd(self, X):
        return pd.DataFrame(X)

    
    def encode(self, text, special_tokens=True):
        input_ids = torch.tensor([self.tokenizer.encode(text, add_special_tokens=special_tokens)])  # Add special tokens takes care of adding [CLS], [SEP], <s>... tokens in the right way for each model.
        with torch.no_grad():
            last_hidden_states = self.encode_model(input_ids)[0]  # Models outputs are now tuples
        return last_hidden_states
    
    def compress(self, X, mode="average"):
        return X.mean(dim=1)
    
    
    def process_text_list(self, text_list):
        X = []
        for text in text_list:
            try:
                encode = self.encode(text)
                x = self.compress(encode)
                X.append(x)
            except:
                print(text)
                
        #X = [self.encode(text) for text in text_list]
        #X = [self.compress(x) for x in X]
        X = torch.cat(X, dim=0).numpy()
        X = self.export_pd(X)
        return X
    
    def process(self, data):
        X = []
        category = []
        with tqdm.trange(len(data)) as t:
            for iteration in t:
            #for text, C in tqdm(data):
                text, C = data[iteration]
                try:
                    encode = self.encode(text)
                    x = self.compress(encode)
                    X.append(x)
                    category.append(C)
                except:
                    print(text)
                
        X = torch.cat(X, dim=0).numpy()
        X = self.export_pd(X)                
        X["Class"] = category
        return X
        
        
        
    def train(self, dataloader, nb_iter=100):
        pass
            
        
    
    def predict(self, text):
        pass

In [14]:
out = BERTSVM().process(trainset)

 15%|█▌        | 1422/9284 [00:29<02:35, 50.68it/s]

15


100%|██████████| 9284/9284 [03:08<00:00, 49.21it/s]


In [15]:
from sklearn.svm import SVC

class CLASSIFIER:
    def __init__(self):
        self.load_default_classifier()
        pass
    
    def load_default_classifier(self):
        self.load_svm()
        
    def load_svm(self, kernel='poly', degree=8):
        self.model = SVC(kernel=kernel, degree=degree)
        
    def fit(self, X_train, y_train):
        self.model.probability = True
        self.model.fit(X_train, y_train)
        
    def predict(self, X):
        return self.model.predict(X)
    
    def score(self, X):
        return self.model._predict_proba(X)
    
    def score_log(self, X):
        return self.model._predict_log_proba(X)
    
    def predict_MC(self, X):
        self.decision_function_shape='ovr'
        return self.model.decision_function(X)
    
class REPORT:
    def __init__(self):
        pass
    
    def export_report(self, gt, pred):
        print(classification_report(y_test, y_pred))
        
    def export_confusion_matrix(self, gt, pred):
        print(confusion_matrix(y_test, y_pred))


In [16]:
from sklearn.model_selection import train_test_split
# Split to data and label
X = out.drop('Class', axis=1)
y = out['Class']

# Split train - test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.20)

In [17]:
y_train

356       type
2682      name
505       date
4101      name
4372    amount
         ...  
4555      date
4680      name
3944      date
1484      name
5385    amount
Name: Class, Length: 7426, dtype: object

In [18]:
from time import time
from sklearn.metrics import classification_report, confusion_matrix
classifier = CLASSIFIER()
classifier.fit(X_train, y_train)
s = time()
y_pred = classifier.predict(X_test)
print(f"Elapsed {time()-s} s")
y_score = classifier.score(X_test)
report = REPORT()
report.export_report(y_test, y_pred)
report.export_confusion_matrix(y_test, y_pred)

Elapsed 1.141756296157837 s
              precision    recall  f1-score   support

      amount       0.99      0.98      0.98       379
        date       1.00      0.98      0.99       324
        name       0.98      0.99      0.99       904
    quantity       0.95      0.90      0.93       104
        type       0.94      0.97      0.96       146

    accuracy                           0.98      1857
   macro avg       0.97      0.97      0.97      1857
weighted avg       0.98      0.98      0.98      1857

[[372   0   2   5   0]
 [  0 317   7   0   0]
 [  0   0 895   0   9]
 [  5   0   5  94   0]
 [  0   0   4   0 142]]


In [19]:
def predict(text):
    X = BERTSVM().process([(text, None)])
    X = X.drop('Class', axis=1)
    y = classifier.predict(X)
    return y
    

In [22]:
print(predict("abc"))
print(predict("12/2/2020"))
print(predict("12"))


100%|██████████| 1/1 [00:00<00:00, 43.77it/s]


['name']


100%|██████████| 1/1 [00:00<00:00, 47.64it/s]


['date']


100%|██████████| 1/1 [00:00<00:00, 45.63it/s]

['quantity']





In [24]:
print(predict("15.000.00"))

100%|██████████| 1/1 [00:00<00:00, 40.08it/s]

['amount']



