In [5]:
from datasets import load_dataset, DatasetDict, Dataset

In [6]:
from transformers import (
    AutoTokenizer,
    AutoConfig,
    AutoModelForSequenceClassification,
    DataCollatorWithPadding,
    TrainingArguments,
    Trainer
)
import torch.nn as nn
from sklearn.metrics import roc_auc_score
from sklearn.metrics import accuracy_score
from peft import PeftModel, PeftConfig, get_peft_model, LoraConfig

RuntimeError: Failed to import transformers.trainer because of the following error (look up to see its traceback):
Failed to import transformers.integrations.integration_utils because of the following error (look up to see its traceback):
Failed to import transformers.modeling_utils because of the following error (look up to see its traceback):
module 'sympy.printing' has no attribute 'str'

In [7]:
import logging
# Module-level logger plus root logging configuration: INFO level, lines
# formatted as "<time> <LEVEL>:<message>".
# NOTE(review): datefmt uses %I (12-hour clock) without %p, so AM/PM is not shown.
logger = logging.getLogger(__name__)
logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.INFO, datefmt='%I:%M:%S')

In [8]:
import torch
import numpy as np
import evaluate

AttributeError: module 'sympy.printing' has no attribute 'str'

In [None]:
# Checkpoint identifier passed to from_pretrained() for both the model and the tokenizer.
modelName = 'google-bert/bert-base-uncased'

In [None]:
# Mappings between class indices and human-readable sentiment names.
id2label = {0: "Negative", 1: "Positive"}
label2id = {"Negative": 0, "Positive": 1}

# Load the pretrained checkpoint as a two-label sequence classifier.
model = AutoModelForSequenceClassification.from_pretrained(
    modelName, num_labels=2, id2label=id2label, label2id=label2id
)

In [None]:
# Display the model's repr.
model

In [None]:
# Load the train/test splits from the two parquet files.
# presumably 'datasets/imdbstfd' is a local directory holding them — verify the path.
data_files = {'train': 'train.parquet', 'test': 'test.parquet'}
dataset = load_dataset('datasets/imdbstfd', data_files=data_files)

In [None]:
# Display the "label" column of the test split.
# NOTE(review): this shows the whole column; consider slicing it for a shorter output.
dataset['test']["label"]

In [None]:
# Check which container type load_dataset returned.
type(dataset)

In [None]:
# Tokenizer matching the pretrained checkpoint named by modelName.
tokenizer = AutoTokenizer.from_pretrained(modelName)

In [9]:
# Turn off gradients for every parameter of the base model, logging each name.
for name, param in model.base_model.named_parameters():
    param.requires_grad_(False)
    logger.info(name)

# Then log the parameter names of the complete model.
for name, param in model.named_parameters():
    logger.info(name)

NameError: name 'model' is not defined

In [10]:
# Turn gradients back on for base-model parameters whose name contains "pooler";
# all other parameters keep their current requires_grad setting.
for name, param in model.base_model.named_parameters():
    if "pooler" not in name:
        continue
    param.requires_grad_(True)
    logger.info(name)

NameError: name 'model' is not defined

In [11]:
# Inspect the classification head's weight matrix.
model.classifier.weight

NameError: name 'model' is not defined

In [12]:
# Inspect the pooler's dense-layer weight matrix.
model.base_model.pooler.dense.weight

NameError: name 'model' is not defined

In [13]:
# Check whether the word-embedding weights currently require gradients.
model.base_model.embeddings.word_embeddings.weight.requires_grad

NameError: name 'model' is not defined

In [14]:
def tokenize_function(examples):
    """Tokenize a batch of examples, for use with ``Dataset.map(batched=True)``.

    Args:
        examples: Mapping with a ``"text"`` entry holding the raw strings of
            the batch.

    Returns:
        The tokenizer output for the batch, truncated to at most 512 tokens
        per text. No padding is applied here, so each sequence keeps its own
        length.
    """
    # Truncate from the left, so over-long texts lose tokens from their start.
    # Note that this sets an attribute on the shared notebook-level tokenizer.
    tokenizer.truncation_side = "left"

    # No return_tensors: the batch is unpadded and therefore ragged, which does
    # not fit a regular NumPy array. Plain Python lists are returned instead.
    return tokenizer(
        examples["text"],
        truncation=True,
        max_length=512,
    )


In [None]:
# If the tokenizer has no padding token, add one and resize the model's
# token embeddings to the new tokenizer length.
if tokenizer.pad_token is None:
    tokenizer.add_special_tokens({'pad_token': '[PAD]'})
    model.resize_token_embeddings(len(tokenizer))

# Apply tokenize_function to the dataset in batches.
tokenized_dataset = dataset.map(tokenize_function, batched=True)

In [None]:
# Display the tokenized dataset's summary.
tokenized_dataset

In [None]:
# Collator that pads each batch with the tokenizer; handed to the Trainer later.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
data_collator

In [None]:
# Sanity check: classify a few hand-written reviews with the model as loaded,
# before any fine-tuning.
text_list = [
    "It was good",
    "Not a fan, don't recommend",
    "Better than the first one.",
    "This is not worth watching even once",
    "This one is a pass.",
]

print("Untrained model predictions")
print("---------------------------")

for text in text_list:
    inputs = tokenizer.encode(text, return_tensors="pt")
    logits = model(inputs).logits
    # Index of the largest logit, mapped to its label name below.
    predictions = torch.argmax(logits)

    print(f"{text} - {id2label[predictions.tolist()]}")

In [None]:
# Metric objects from the `evaluate` library; compute_metrics calls their .compute().
accuracy = evaluate.load("accuracy")
auc_score = evaluate.load("roc_auc")

def compute_metrics(eval_pred):
    """Compute accuracy and ROC AUC for a two-class evaluation pass.

    Args:
        eval_pred: Pair ``(predictions, labels)``. ``predictions`` holds the
            raw logits with one row per example; ``labels`` holds the
            reference class ids.

    Returns:
        dict with keys ``"Accuracy"`` and ``"AUC"``, each rounded to three
        decimals.
    """
    predictions, labels = eval_pred
    logits = np.asarray(predictions)

    # Softmax with the row maximum subtracted first. The result is
    # mathematically the same, but exp() can no longer overflow to inf
    # (inf / inf would turn the scores into NaN).
    exp_shifted = np.exp(logits - logits.max(axis=-1, keepdims=True))
    probabilities = exp_shifted / exp_shifted.sum(axis=-1, keepdims=True)

    # Column 1 is the score of class id 1, used as the ranking score for AUC.
    positive_class_probs = probabilities[:, 1]
    auc = np.round(
        auc_score.compute(prediction_scores=positive_class_probs,
                          references=labels)['roc_auc'],
        3,
    )

    predicted_classes = np.argmax(logits, axis=1)
    acc = np.round(
        accuracy.compute(predictions=predicted_classes,
                         references=labels)['accuracy'],
        3,
    )

    return {"Accuracy": acc, "AUC": auc}

In [None]:
# Optimisation hyperparameters.
lr=2e-4
batch_size=16
num_epochs=2

# Logging, evaluation and checkpointing all happen once per epoch.
training_args=TrainingArguments(
    output_dir=modelName + "-IMDB-retrain",
    learning_rate=lr,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=num_epochs,
    logging_strategy="epoch",
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,

    # Gradients are accumulated over 4 batches per optimizer step.
    gradient_accumulation_steps=4,
    warmup_steps=2
)

In [None]:
import os
import wandb


# Set WANDB_MODE to "offline", then start a Weights & Biases run.
os.environ["WANDB_MODE"] = "offline"
wandb.init(
    project="my-awesome-project")

In [None]:
# Wire the model, arguments, tokenized splits, collator and metric function
# into a Trainer; the test split serves as the evaluation set.
trainer=Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["test"],
    processing_class=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics
)

In [None]:
# trainer.train()

In [None]:
# Load a saved checkpoint from a local directory and switch it to evaluation mode.
# NOTE(review): this path differs from training_args.output_dir — confirm which
# run produced the files under ./models.
modelFT = AutoModelForSequenceClassification.from_pretrained(
    "./models/bert-base-uncased-IMDB-retrain", num_labels=2, id2label=id2label, label2id=label2id
)
modelFT.eval()

In [None]:
# Check the concrete class of the loaded model.
type(modelFT)

In [None]:
# Inspect the loaded model's pooler dense-layer weights.
modelFT.base_model.pooler.dense.weight

In [None]:
# Inspect the loaded model's classification-head weights.
modelFT.classifier.weight

In [None]:
# Tokenizer loaded from the same local directory as modelFT.
tokenizerFT = AutoTokenizer.from_pretrained("./models/bert-base-uncased-IMDB-retrain", use_fast=True)

In [None]:
# Score the first 40 test reviews with modelFT, collecting the class
# probabilities, the predicted class and the reference label for each.
probList = []
labelList = []
indexList = []

# Fetch the 40 rows once, rather than indexing the full label column inside the loop.
evalRows = dataset['test'][:40]

# Inference only: no autograd graph is needed.
with torch.no_grad():
    for prompt, label in zip(evalRows["text"], evalRows["label"]):
        inputs = tokenizerFT(prompt, return_tensors="pt", truncation=True, max_length=512)
        # Pass the whole encoding (attention mask included), not only input_ids.
        outputs = modelFT(**inputs)
        out = nn.Softmax(dim=1)(outputs["logits"])
        probList.append(out.detach().numpy())
        outMax = torch.argmax(out)
        indexList.append(np.array(outMax))
        labelList.append(np.array(label))
        # outMax is the predicted class index and `out` holds softmax probabilities.
        logger.info(f"Pred: {outMax}, Label: {label}, Probs: {out}")

In [None]:
# Accuracy of the predicted class indices against the collected labels.
accuracy_score(labelList, indexList)


In [None]:
# Displays the bound `modules` method itself; the method is not called here.
modelFT.modules

In [None]:
# Flatten the classification head's weight matrix and print the
# mean ± 3, 2 and 1 standard-deviation intervals of its values.
layer = 'classifier'
fcWeight = modelFT._modules.get(layer).weight
print(fcWeight.shape)
flatWeight = fcWeight.reshape(1, -1)[0].cpu().detach().numpy()
for k in (3, 2, 1):
    print(flatWeight.mean() - k * flatWeight.std(), flatWeight.mean() + k * flatWeight.std())

In [None]:
def layerSearch(layer: str, module=None) -> torch.Tensor:
    """Return the parameter whose name contains ``layer``.

    Args:
        layer: Substring to look for in the parameter names.
        module: Object exposing ``named_parameters()``. Defaults to the
            notebook-level ``model``.

    Returns:
        The matching parameter. If several names contain ``layer``, the last
        one in iteration order is returned.

    Raises:
        ValueError: If no parameter name contains ``layer``.
    """
    source = model if module is None else module
    matches = [param for name, param in source.named_parameters() if layer in name]
    if not matches:
        # Fail with a clear message instead of an UnboundLocalError.
        raise ValueError(f"no parameter name contains {layer!r}")
    return matches[-1]

# Example lookup using a full parameter name.
layerSearch("bert.encoder.layer.11.output.LayerNorm.bias")

In [None]:
# Zero the classifier weights that fall inside [pruneLower, pruneUpper] and
# multiply the remaining weights by `scale`.
# NOTE: modelFT is edited in place, so running this cell again rescales the
# already-modified weights once more.
scale = 2
# presumably taken from a mean ± k·std printout of the classifier weights — confirm.
pruneLower = -0.025227365069440566
pruneUpper = 0.025481238044449128
state_dict = modelFT._modules.get("classifier").state_dict()

with torch.no_grad():
    for name, param in state_dict.items():
        # Only entries whose name contains "weight" are modified.
        if "weight" not in name:
            continue

        inBand = (param >= pruneLower) & (param <= pruneUpper)

        # Zeroed entries stay zero when multiplied, so one where() followed by
        # the scaling gives the same result as zeroing and then rescaling the
        # non-zero entries.
        transformed_param = torch.where(inBand, torch.zeros_like(param), param) * scale

        # The state_dict tensor shares storage with the module's parameter, so
        # this in-place copy changes modelFT itself.
        param.copy_(transformed_param)

modelFT._modules.get("classifier").weight

In [None]:
# Score the first 5 test reviews with modelFT, collecting the class
# probabilities, the predicted class and the reference label for each.
probList = []
labelList = []
indexList = []

# Fetch the 5 rows once, rather than indexing the full label column inside the loop.
evalRows = dataset['test'][:5]

# Inference only: no autograd graph is needed.
with torch.no_grad():
    for prompt, label in zip(evalRows["text"], evalRows["label"]):
        inputs = tokenizerFT(prompt, return_tensors="pt", truncation=True, max_length=512)

        # Pass the whole encoding (attention mask included), not only input_ids.
        outputs = modelFT(**inputs)
        out = nn.Softmax(dim=1)(outputs["logits"])
        outMax = torch.argmax(out)

        probList.append(out[0].tolist())
        indexList.append(outMax.item())
        labelList.append(label)

        # outMax is the predicted class index and `out` holds softmax probabilities.
        logger.info(f"Pred: {outMax}, Label: {label}, Probs: {out}")

In [None]:
# View the collected reference labels as an array.
np.array(labelList)

In [None]:
# Rebind probList from a list to an array, then show column 1 (class id 1).
probList = np.array(probList)
probList[:,1]

In [None]:
# ROC AUC of the class-1 probabilities against the reference labels.
# NOTE(review): roc_auc_score needs both classes present in the labels; confirm
# the small sample collected for labelList contains both.
roc_auc_score(np.array(labelList), probList[:, 1])

# accuracy_score(labelList, indexList)
# # probList[:3]

In [None]:
from scipy import stats
from scipy.stats import logistic, norm
import matplotlib.pyplot as plt

# Compare the distribution of the flattened classifier weights with a fitted
# normal and a fitted logistic curve, and report the one-sample
# Kolmogorov-Smirnov statistic of each fit.
# NOTE(review): flatWeight came from Tensor.numpy(), which shares memory with a
# CPU tensor, so it may reflect in-place weight edits made after it was
# created — confirm this is the snapshot you intend to plot.
features = flatWeight
sortedFeatures = np.sort(np.asarray(features))
featMean = features.mean()
featStd = features.std()

# A logistic distribution with scale s has standard deviation s*pi/sqrt(3);
# convert so the fitted curve has the same spread as the data.
logisticScale = featStd * np.sqrt(3) / np.pi

normFit = norm(loc=featMean, scale=featStd)
logisticFit = logistic(loc=featMean, scale=logisticScale)

plt.hist(features, bins='auto', density=True)
plt.plot(sortedFeatures, normFit.pdf(sortedFeatures), label='normal fit')
plt.plot(sortedFeatures, logisticFit.pdf(sortedFeatures), label='logistic fit')
plt.legend(fontsize=7)

# Pass the CDF as a callable: given an array, kstest runs a two-sample test
# against those values instead of a goodness-of-fit test.
normStat = stats.kstest(sortedFeatures, normFit.cdf).statistic
logisticStat = stats.kstest(sortedFeatures, logisticFit.cdf).statistic

plt.title(
    '\nNorm statistic: ' + str(normStat) +
    '\nLog statistic: ' + str(logisticStat), fontsize=7)