In [23]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import torch
import os
import re

# device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
df = pd.read_csv("funda_data_11_06_2024.csv")
df = df[["descrip", "price"]]

In [24]:
import torch
from torch.nn.functional import mse_loss
from transformers import BertTokenizer, BertModel
from transformers import BertConfig
import torch.nn as nn
from typing import Literal

def rmse_loss(input, target):
    loss = mse_loss(input, target)
    return torch.sqrt(loss)

class RegressorBERT(torch.nn.Module):
    def __init__(self, 
                 model_name: str, 
                 aggregation_method: Literal["sum", "mean"] = "mean", 
                 hidden_layers:list = [-1], 
                 device: torch.device=torch.device("cpu"), 
                 freeze_bert: bool|list = True, 
                 dense_layers:list = []) -> None:
        
        super(RegressorBERT, self).__init__()

        # set up internal attributes
        self.aggregation_method = aggregation_method
        self.hidden_layers = hidden_layers
        self.device = device

        # load up given BERT model and freeze all or specific layers
        self._load_bert(model_name)
        if freeze_bert:
            self._freeze_bert_layers(freeze_bert)

        # creaete chain of dense and ReLU layers based on given list
        dense_layers = [layer for layer_size in dense_layers for layer in (nn.Linear(*layer_size, device=self.device), nn.ReLU())]

        # 1 guaranteed dense layer between BERT and output layer 
        self.regressor_layers = torch.nn.Sequential(
            torch.nn.Linear(self.bert.config.hidden_size, 128, device=self.device),
            torch.nn.ReLU(),
            *dense_layers,
            torch.nn.Linear(128, 1, device=self.device)
        )

    # TODO freeze specific layers
    def _freeze_bert_layers(self, freeze_bert: bool|list) -> None:
        for parameter in self.bert.parameters():
            parameter.requires_grad = False

    def _load_bert(self, model_name: str) -> None:
        configuration = BertConfig.from_pretrained(model_name, output_attentions=True, output_hidden_states=True)
        self.bert = BertModel.from_pretrained(model_name, config=configuration).to(self.device)

    def _pool_tensors(self, x: torch.Tensor) -> torch.Tensor:
        if x.shape[0] > 1: # multiple layers
            match self.aggregation_method:
                case "sum":
                    x = torch.sum(x, dim=0)
                case "mean":
                    x = torch.mean(x, dim=0)

        return torch.mean(x, dim=1)
    
    def forward(self, input_ids: torch.Tensor, attention_mask: torch.Tensor, token_type_ids: torch.Tensor, **kwargs):
        output = self.bert(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids).hidden_states
        # resulting shape -> (input_size, num_layers, seq_len, emb_dim)
        output = torch.stack([output[layer] for layer in self.hidden_layers], dim=0)
        output = self._pool_tensors(output)
        output = self.regressor_layers(output)

        # compute loss
        price = kwargs["price"]
        if isinstance(price, torch.Tensor):
            price = price.unsqueeze(1).type(torch.float32)
        else:
            price = torch.as_tensor(price, dtype=torch.float32).unsqueeze(1)
            
        loss = rmse_loss(output, price)

        return {"loss": loss, "logits": output}

In [25]:
model_name = "google-bert/bert-base-multilingual-cased"

tokenizer = BertTokenizer.from_pretrained(model_name, return_tensors="pt", truncation=True, max_length=512, padding="max_length")
model = RegressorBERT(model_name=model_name, hidden_layers=[-1,-2], device=torch.device("cpu"))



In [27]:
tokens = tokenizer(df["descrip"][:2].tolist(), truncation=True, max_length=512, padding="longest", return_tensors="pt")
outputs = model(**tokens, price=df["price"][:2].tolist()) # unsqueeze to add extra dimension corresponding to batches
print(outputs)

{'loss': tensor(425000.0938, grad_fn=<SqrtBackward0>), 'logits': tensor([[-0.1015],
        [-0.1197]], grad_fn=<AddmmBackward0>)}


In [28]:
# def rmse_loss(input, target):
#     loss = mse_loss(input, target)
#     return torch.sqrt(loss)

def compute_metrics(pred):
    logits, value = pred
    return {"rmse": mse_loss(value, logits)}

In [29]:
from datasets import Dataset
from transformers import DataCollatorWithPadding

def tokenize(x):
    return tokenizer(x["descrip"], max_length=512, truncation=True)

dataset = Dataset.from_pandas(df.iloc[:150])
dataset = dataset.map(tokenize, batched=True)
dataset = dataset.train_test_split(0.2)
dataset = dataset.rename_column("descrip", "text")
dataset = dataset.remove_columns("text")

data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
# dataloader = DataLoader(dataset, batch_size=3, shuffle=True, num_workers=2)

Map: 100%|██████████| 150/150 [00:00<00:00, 154.43 examples/s]


In [30]:
os.environ["CUDA_VISIBLE_DEVICES"]=""
from transformers import TrainingArguments, Trainer

training_args = TrainingArguments(output_dir="regressor_bert", 
                                  learning_rate=1e-2, 
                                  num_train_epochs=10, 
                                  weight_decay=0.01, 
                                  eval_strategy="epoch", 
                                  save_strategy="no", 
                                  # load_best_model_at_end=True, 
                                  push_to_hub=False,
                                  remove_unused_columns=True,
                                  label_names=["price"]
                                  )

trainer = Trainer(model=model,
                  tokenizer=tokenizer,
                  args=training_args,
                  data_collator=data_collator,
                  train_dataset=dataset["train"],
                  eval_dataset=dataset["test"],
                #   compute_metrics=compute_metrics
                  )
trainer.train()

  1%|▏         | 2/150 [00:11<13:46,  5.59s/it]

KeyboardInterrupt: 

In [None]:
def test_model(sample_text, sample_price):
    tokens = tokenizer(sample_text, truncation=True, max_length=512, padding="longest", return_tensors="pt")
    outputs = model(**tokens, price=[sample_price])

    print(outputs)

test_model(df.iloc[0]["descrip"], df.iloc[0]["price"])

{'loss': tensor(229257.6250, grad_fn=<SqrtBackward0>), 'logits': tensor([[654257.6250]], grad_fn=<AddmmBackward0>)}
