In [21]:
pip install openprompt

[0mNote: you may need to restart the kernel to use updated packages.


In [22]:
pip install evaluate


[0mNote: you may need to restart the kernel to use updated packages.


In [23]:
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd
from transformers import TrainingArguments, Trainer
import evaluate
from datasets import load_metric


In [24]:
model_type = "XLM-R" #@param ["SinBERT", "Bert", "XLM-R"]
technique = "hate speech" #@param ["humor", "hate speech", "sentiment"]
load_adapter = False #@param {type:"boolean"}
# train = True #@param {type:"boolean"}
unfreeze_model = False #@param {type:"boolean"}
save_adapter = False #@param {type:"boolean"}
oversample_dataset = True #@param {type:"boolean"}
lang_adapter_setting = "none" #@param ["none", "stack", "parallel"]
random_state = 42 #@param
adapter_config = "pfeiffer" #@param ["houlsby", "pfeiffer"]
over_sampling_technique = "ROS" #@param ["", "ROS","ADASYN", "SMOTE", "BorderlineSMOTE"]
sampling_strategy = "1:0.25:0.25" #@param [] {allow-input: true} 
## eg: 1:0.25:0.25 for hate | 0.5 for humor | 1:1:1:1 or 0.5:1:0.5:0.5 or 0.25:1:0.25:0.25 for sentiment

In [25]:
from openprompt.data_utils.text_classification_dataset import PROCESSORS
from datasets import load_dataset


In [27]:
dataset_path="/kaggle/input/cmcs-dataset-task/ompleted_draft - ompleted_draft.csv"
all_data = pd.read_csv(dataset_path)

if (technique == "humor"):
  all_data = all_data[['Sentence', 'Humor']]
elif (technique == "hate speech"):
  all_data = all_data[['Sentence', 'Hate_speech']]
else:
  all_data = all_data[['Sentence', 'Sentiment']]

all_data.columns = ['Sentence', 'Label']
all_data['Label'], uniq = pd.factorize(all_data['Label'])

X = all_data['Sentence'].values.tolist()
y = all_data['Label'].values.tolist()

In [28]:
all_data.groupby("Label").count()


Unnamed: 0_level_0,Sentence
Label,Unnamed: 1_level_1
0,12262
1,348
2,908


In [29]:
uniq

Index(['Not offensive', 'Hate-Inducing', 'Abusive'], dtype='object')

In [30]:
from imblearn.over_sampling import RandomOverSampler, SMOTE, ADASYN, BorderlineSMOTE

In [31]:
if (technique == 'humor'):
    num_labels=2
    id2label={ 0: "Non-humorous", 1: "Humorous"}
elif (technique == 'hate speech'):
    num_labels=3
    id2label={ 0: "Not offensive", 1: "Hate-Inducing", 2: "Abusive"}
else:
    num_labels=4
    id2label={ 0: "Negative", 1: "Neutral", 2: "Positive", 3:"Conflict"}

In [32]:
def apply_oversampling(x, y):

  (unique, counts) = np.unique(y, axis=0, return_counts=True)
  print("Class Distribution Without Oversampling", counts)

  # define oversampling strategy
  if (over_sampling_technique == ""):
    return x, y
  elif (over_sampling_technique == "ROS"):
    if (technique=="humor"):
      oversample = RandomOverSampler(sampling_strategy = float(sampling_strategy))
    elif (technique=="hate speech"):
      sampling_ratio = sampling_strategy.split(":");
      oversample = RandomOverSampler(sampling_strategy = {
          0:int(counts[0]*float(sampling_ratio[0])), 
          1:int(counts[0]*float(sampling_ratio[1])), 
          2:int(counts[0]*float(sampling_ratio[2]))
          })
    elif (technique=="sentiment"):
      sampling_ratio = sampling_strategy.split(":");
      oversample = RandomOverSampler(sampling_strategy = {
          0:int(counts[1]*float(sampling_ratio[0])), 
          1:int(counts[1]*float(sampling_ratio[1])), 
          2:int(counts[1]*float(sampling_ratio[2])),
          3:int(counts[1]*float(sampling_ratio[3]))
          })
  elif (over_sampling_technique == "ADASYN"):
    oversample = ADASYN(sampling_strategy="minority")
  elif (over_sampling_technique == "SMOTE"):
    oversample = SMOTE()
  elif (over_sampling_technique == "BorderlineSMOTE"):
    oversample = BorderlineSMOTE()

  # fit and apply the transform
  X_over, y_over = oversample.fit_resample(x, y)

  (unique, counts) = np.unique(y_over, axis=0, return_counts=True)
  print("Class Distribution After Oversampling", counts)

  return X_over, y_over

In [33]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state = random_state)

In [34]:
if oversample_dataset:
  # apply oversampling
  X_train = np.array(X_train).reshape(-1, 1)
  X_train, y_train = apply_oversampling(X_train, y_train)
  X_train = [x[0] for x in X_train.tolist()]

Class Distribution Without Oversampling [11036   314   816]
Class Distribution After Oversampling [11036  2759  2759]


In [17]:
examples=[]
from openprompt.data_utils import InputExample
for i in range(len(X_train)):
  examples.append(InputExample(
        guid = i,
        text_a =X_train[i],
        label=y_train[i]
    ))

In [18]:

from transformers.models.auto.tokenization_auto import tokenizer_class_from_name

from openprompt.plms.utils import TokenizerWrapper
from typing import List, Dict
from collections import defaultdict

class MLMTokenizerWrapper(TokenizerWrapper):
    add_input_keys = ['input_ids', 'attention_mask', 'token_type_ids']

    @property
    def mask_token(self):
        return self.tokenizer.mask_token

    @property
    def mask_token_ids(self):
        return self.tokenizer.mask_token_id

    @property
    def num_special_tokens_to_add(self):
        if not hasattr(self, '_num_specials'):
            self._num_specials = self.tokenizer.num_special_tokens_to_add()
        return self._num_specials

    def tokenize_one_example(self, wrapped_example, teacher_forcing):
        ''' # TODO doesn't consider the situation that input has two parts
        '''

        wrapped_example, others = wrapped_example

        # for some dataset like SuperGLUE.COPA, the answer requires prediction an span of
        # the input. Or in generation tasks, we need to generate a piece of target_text.
        # In these case, it tokenized to the encoded_tgt_text for future use.
        encoded_tgt_text = []
        if 'tgt_text' in others:
            tgt_text = others['tgt_text']
            if isinstance(tgt_text, str):
                tgt_text = [tgt_text]
            for t in tgt_text:
                encoded_tgt_text.append(self.tokenizer.encode(t, add_special_tokens=False))


        mask_id = 0 # the i-th the mask token in the template.

        encoder_inputs = defaultdict(list)
        for piece in wrapped_example:
            if piece['loss_ids']==1:
                if teacher_forcing: # fill the mask with the tgt task
                    raise RuntimeError("Masked Language Model can't perform teacher forcing training!")
                else:
                    encode_text = [self.mask_token_ids]
                mask_id += 1

            if piece['text'] in self.special_tokens_maps.keys():
                to_replace = self.special_tokens_maps[piece['text']]
                if to_replace is not None:
                    piece['text'] = to_replace
                else:
                    raise KeyError("This tokenizer doesn't specify {} token.".format(piece['text']))

            if 'soft_token_ids' in piece and piece['soft_token_ids']!=0:
                encode_text = [0] # can be replace by any token, since these token will use their own embeddings
            else:
                encode_text = self.tokenizer.encode(piece['text'], add_special_tokens=False)

            encoding_length = len(encode_text)
            encoder_inputs['input_ids'].append(encode_text)
            for key in piece:
                if key not in ['text']:
                    encoder_inputs[key].append([piece[key]]*encoding_length)

        encoder_inputs = self.truncate(encoder_inputs=encoder_inputs)
        # delete shortenable ids
        encoder_inputs.pop("shortenable_ids")
        encoder_inputs = self.concate_parts(input_dict=encoder_inputs)
        encoder_inputs = self.add_special_tokens(encoder_inputs=encoder_inputs)
        # create special input ids
        encoder_inputs['attention_mask'] = [1] *len(encoder_inputs['input_ids'])
        if self.create_token_type_ids:
            encoder_inputs['token_type_ids'] = [0] *len(encoder_inputs['input_ids'])
        # padding
        encoder_inputs = self.padding(input_dict=encoder_inputs, max_len=self.max_seq_length, pad_id_for_inputs=self.tokenizer.pad_token_id)


        if len(encoded_tgt_text) > 0:
            encoder_inputs = {**encoder_inputs, "encoded_tgt_text": encoded_tgt_text}# convert defaultdict to dict
        else:
            encoder_inputs = {**encoder_inputs}
        return encoder_inputs

In [19]:
from statistics import mode
from typing import List, Optional
from transformers.modeling_utils import PreTrainedModel
from transformers.tokenization_utils import PreTrainedTokenizer
from transformers import BertConfig, BertTokenizer, BertModel, BertForMaskedLM, \
                         RobertaConfig, RobertaTokenizer, RobertaModel, RobertaForMaskedLM, \
                         XLMRobertaConfig, XLMRobertaTokenizer, XLMRobertaModel, XLMRobertaForMaskedLM
from collections import namedtuple
from yacs.config import CfgNode

from openprompt.utils.logging import logger


ModelClass = namedtuple("ModelClass", ('config', 'tokenizer', 'model','wrapper'))

_MODEL_CLASSES = {
    'bert': ModelClass(**{
        'config': BertConfig,
        'tokenizer': BertTokenizer,
        'model':BertForMaskedLM,
        'wrapper': MLMTokenizerWrapper,
    }),
    'roberta': ModelClass(**{
        'config': RobertaConfig,
        'tokenizer': RobertaTokenizer,
        'model':RobertaForMaskedLM,
        'wrapper': MLMTokenizerWrapper
    }),
    'xlm': ModelClass(**{
        'config': XLMRobertaConfig,
        'tokenizer': XLMRobertaTokenizer,
        'model': XLMRobertaForMaskedLM,
        'wrapper': MLMTokenizerWrapper
    }),
}


def get_model_class(plm_type: str):
    return _MODEL_CLASSES[plm_type]


def load_plm(model_name, model_path, specials_to_add = None):
    r"""A plm loader using a global config.
    It will load the model, tokenizer, and config simulatenously.

    Args:
        config (:obj:`CfgNode`): The global config from the CfgNode.

    Returns:
        :obj:`PreTrainedModel`: The pretrained model.
        :obj:`tokenizer`: The pretrained tokenizer.
        :obj:`model_config`: The config of the pretrained model.
        :obj:`wrapper`: The wrapper class of this plm.
    """
    model_class = get_model_class(plm_type = model_name)
    model_config = model_class.config.from_pretrained(model_path)
    # you can change huggingface model_config here
    # if 't5'  in model_name: # remove dropout according to PPT~\ref{}
    #     model_config.dropout_rate = 0.0
    if 'gpt' in model_name: # add pad token for gpt
        specials_to_add = ["<pad>"]
        # model_config.attn_pdrop = 0.0
        # model_config.resid_pdrop = 0.0
        # model_config.embd_pdrop = 0.0
    model = model_class.model.from_pretrained(model_path, config=model_config)
    tokenizer = model_class.tokenizer.from_pretrained(model_path)
    wrapper = model_class.wrapper


    model, tokenizer = add_special_tokens(model, tokenizer, specials_to_add=specials_to_add)

    if 'opt' in model_name:
        tokenizer.add_bos_token=False
    return model, tokenizer, model_config, wrapper




def load_plm_from_config(config: CfgNode):
    r"""A plm loader using a global config.
    It will load the model, tokenizer, and config simulatenously.

    Args:
        config (:obj:`CfgNode`): The global config from the CfgNode.

    Returns:
        :obj:`PreTrainedModel`: The pretrained model.
        :obj:`tokenizer`: The pretrained tokenizer.
        :obj:`model_config`: The config of the pretrained model.
        :obj:`model_config`: The wrapper class of this plm.
    """
    plm_config = config.plm
    model_class = get_model_class(plm_type = plm_config.model_name)
    model_config = model_class.config.from_pretrained(plm_config.model_path)
    # you can change huggingface model_config here
    # if 't5'  in plm_config.model_name: # remove dropout according to PPT~\ref{}
    #     model_config.dropout_rate = 0.0
    if 'gpt' in plm_config.model_name: # add pad token for gpt
        if "<pad>" not in config.plm.specials_to_add:
            config.plm.specials_to_add.append("<pad>")
    model = model_class.model.from_pretrained(plm_config.model_path, config=model_config)
    tokenizer = model_class.tokenizer.from_pretrained(plm_config.model_path)
    wrapper = model_class.wrapper
    model, tokenizer = add_special_tokens(model, tokenizer, specials_to_add=config.plm.specials_to_add)
    return model, tokenizer, model_config, wrapper

def add_special_tokens(model: PreTrainedModel,
                       tokenizer: PreTrainedTokenizer,
                       specials_to_add: Optional[List[str]] = None):
    r"""add the special_tokens to tokenizer if the special token
    is not in the tokenizer.

    Args:
        model (:obj:`PreTrainedModel`): The pretrained model to resize embedding
                after adding special tokens.
        tokenizer (:obj:`PreTrainedTokenizer`): The pretrained tokenizer to add special tokens.
        specials_to_add: (:obj:`List[str]`, optional): The special tokens to be added. Defaults to pad token.

    Returns:
        The resized model, The tokenizer with the added special tokens.

    """
    if specials_to_add is None:
        return model, tokenizer
    for token in specials_to_add:
        if "pad" in token.lower():
            if tokenizer.pad_token is None:
                tokenizer.add_special_tokens({'pad_token': token})
                model.resize_token_embeddings(len(tokenizer))
                logger.info("pad token is None, set to id {}".format(tokenizer.pad_token_id))
    return model, tokenizer

In [20]:
plm, tokenizer, model_config, WrapperClass =load_plm("xlm", "xlm-roberta-base")

Downloading:   0%|          | 0.00/615 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/1.04G [00:00<?, ?B/s]

KeyboardInterrupt: 

In [None]:
from openprompt.prompts import MixedTemplate
promptTemplate = MixedTemplate(
    model=plm,
    text = '{"placeholder": "text_a"} {"soft": "It"} {"soft": "was"} {"mask"}.',
    tokenizer = tokenizer,
)

In [None]:
classes = [ 
    "0",
    "1",
    "2",
]

In [None]:
from openprompt.prompts import SoftVerbalizer
promptVerbalizer = SoftVerbalizer(tokenizer, plm, num_classes=3)

In [None]:
from openprompt import PromptForClassification
promptModel = PromptForClassification(
    template = promptTemplate,
    plm = plm,
    verbalizer = promptVerbalizer,
)

In [None]:

MAX_LEN = 128

from openprompt import PromptDataLoader
data_loader = PromptDataLoader(
    dataset = examples,
    tokenizer = tokenizer,
    template = promptTemplate,
    tokenizer_wrapper_class=WrapperClass,
    batch_size=1,
    max_length=MAX_LEN,
    truncation=True,
    padding="max_length"
)

In [None]:
import torch
from transformers import  AdamW, get_linear_schedule_with_warmup
loss_func = torch.nn.CrossEntropyLoss()
optimizer_grouped_parameters = [
    {'params': [p for n,p in promptModel.template.named_parameters() if "raw_embedding" not in n]}
]
optimizer = AdamW(optimizer_grouped_parameters, lr=1e-3)

optimizer_grouped_parameters1 = [
    {'params': promptModel.verbalizer.group_parameters_1, "lr":3e-5},
    {'params': promptModel.verbalizer.group_parameters_2, "lr":3e-4},
]

optimizer1 = AdamW(optimizer_grouped_parameters1)


In [None]:
promptModel=promptModel.cuda()
for epoch in range(2):
    tot_loss = 0
    for step, inputs in enumerate(data_loader):
        inputs = inputs.cuda()
        logits = promptModel(inputs)
        labels = inputs['label']
        loss = loss_func(logits, labels)
        loss.backward()
        tot_loss += loss.item()
        optimizer.step()
        optimizer.zero_grad()
        optimizer1.step()
        optimizer1.zero_grad()
        print(tot_loss/(step+1))

In [None]:
examples_test=[]
from openprompt.data_utils import InputExample
for i in range(len(X_test)):
  examples_test.append(InputExample(
        guid = i,
        text_a =X_test[i],
        label=y_test[i]
    ))
  

In [None]:
data_loader = PromptDataLoader(
    dataset = examples_test,
    tokenizer = tokenizer,
    template = promptTemplate,
    tokenizer_wrapper_class=WrapperClass,
    batch_size=1,
    max_length=MAX_LEN,
    truncation=True,
    padding="max_length"
)

In [None]:
import torch
torch.cuda.empty_cache()
allpreds = []
alllabels = []
promptModel=promptModel.cuda()
for step, inputs in enumerate(data_loader):
    inputs=inputs.cuda();
    logits = promptModel(inputs)    
    labels = inputs['label']
    alllabels.extend(labels.cpu().tolist())
    allpreds.extend(torch.argmax(logits, dim=-1).cpu().tolist())


In [None]:

def compute_metrics(allpreds,alllabels):
    metric1 = load_metric("precision")
    metric2 = load_metric("recall")
    metric3 = load_metric("f1")
    metric4 = load_metric("accuracy")
    
    predictions, labels = allpreds,alllabels
    precision = metric1.compute(predictions=predictions, references=labels, average="weighted")["precision"]
    recall = metric2.compute(predictions=predictions, references=labels, average="weighted")["recall"]
    f1 = metric3.compute(predictions=predictions, references=labels, average="weighted")["f1"]
    accuracy = metric4.compute(predictions=predictions, references=labels)["accuracy"]
    macro_precision = metric1.compute(predictions=predictions, references=labels, average="macro")["precision"]
    macro_recall = metric2.compute(predictions=predictions, references=labels, average="macro")["recall"]
    macro_f1 = metric3.compute(predictions=predictions, references=labels, average="macro")["f1"]
    return {"accuracy":accuracy, "precision": precision, "recall": recall, "f1": f1, "macro_precision": macro_precision, "macro_recall": macro_recall, "macro_f1": macro_f1}

In [None]:
compute_metrics(allpreds,alllabels)

In [None]:
c=0
for i,j in zip(allpreds, alllabels):
  if(i!=j):
    print(c,X_test[c],"---->","predicted :", uniq[i]," actual :",  uniq[j])

  c+=1

In [None]:
c=0
for i,j in zip(allpreds, alllabels):
  if(i==j):
    print(c,X_test[c],"---->","predicted :", uniq[i]," actual :",  uniq[j])
    
  c+=1