## Single Model Inference (deberta-large)

#### **Private Test Set:**
FB1 metric: **0.7304637538407162**

In [None]:
import codecs
from text_unidecode import unidecode

def replace_encoding_with_utf8(error):
    """Codec error handler that substitutes UTF-8 bytes for unencodable text.

    Args:
        error: The ``UnicodeEncodeError`` raised by the failing encode call.

    Returns:
        A ``(replacement, resume_position)`` tuple: the UTF-8 encoding of the
        offending slice ``error.object[error.start:error.end]`` and the index
        ``error.end`` at which encoding should continue.
    """
    offending_text = error.object[error.start : error.end]
    replacement = offending_text.encode("utf-8")
    return replacement, error.end


def replace_decoding_with_cp1252(error):
    """Codec error handler that decodes undecodable bytes as cp1252.

    Args:
        error: The ``UnicodeDecodeError`` raised by the failing decode call.

    Returns:
        A ``(replacement, resume_position)`` tuple: the cp1252 decoding of the
        offending slice ``error.object[error.start:error.end]`` and the index
        ``error.end`` at which decoding should continue.
    """
    offending_bytes = error.object[error.start : error.end]
    replacement = offending_bytes.decode("cp1252")
    return replacement, error.end


# Register the encoding and decoding error handlers for `utf-8` and `cp1252`.
# The names registered here are the ones passed as `errors=` by
# `resolve_encodings_and_normalize`, so this must run before that function is called.
codecs.register_error("replace_encoding_with_utf8", replace_encoding_with_utf8)
codecs.register_error("replace_decoding_with_cp1252", replace_decoding_with_cp1252)

def resolve_encodings_and_normalize(text):
    """Resolve the encoding problems and normalize the abnormal characters.

    The text is round-tripped through ``raw_unicode_escape`` -> ``utf-8`` ->
    ``cp1252`` -> ``utf-8``; at each step, pieces that cannot be converted are
    handled by the custom error handlers registered with ``codecs`` under the
    names used below. The repaired string is finally passed through
    ``unidecode``.

    Args:
        text: The string to repair.

    Returns:
        The value returned by ``unidecode`` for the repaired string.
    """
    escaped_bytes = text.encode("raw_unicode_escape")
    first_pass = escaped_bytes.decode("utf-8", errors="replace_decoding_with_cp1252")
    cp1252_bytes = first_pass.encode("cp1252", errors="replace_encoding_with_utf8")
    repaired = cp1252_bytes.decode("utf-8", errors="replace_decoding_with_cp1252")
    return unidecode(repaired)

def clean_text(text):
    """Clean a raw essay string before tokenization.

    Non-breaking spaces (``\\xa0``) become regular spaces, next-line characters
    (``\\x85``) become newlines, surrounding whitespace is stripped, and the
    result is passed through ``resolve_encodings_and_normalize``.

    Args:
        text: The raw text.

    Returns:
        The cleaned text.
    """
    stripped = text.replace('\xa0', ' ').replace('\x85', '\n').strip()
    return resolve_encodings_and_normalize(stripped)

In [None]:
import torch
import re
import numpy as np
import random
import pytorch_lightning as pl
from torch.utils.data import Dataset, DataLoader

class TextDataset(Dataset):
    """Dataset that tokenizes cleaned texts and aligns tokens to whitespace words.

    Each item carries the cleaned text, the token ids / attention mask produced
    by ``tokenizer``, and one "word box" per whitespace-delimited word giving
    the range of token indices that overlap that word.

    Args:
        texts: Indexable collection of raw strings.
        tokenizer: Callable returning a mapping with ``input_ids``,
            ``attention_mask`` and ``offset_mapping`` when called with
            ``return_offsets_mapping=True``.
        cfg: Configuration object; stored as ``self.cfg`` but not read here.
    """

    # A "word" is a maximal run of non-whitespace characters. Raw string so
    # that ``\S`` is a regex escape rather than an invalid string escape.
    _WORD_PATTERN = re.compile(r"\S+")

    def __init__(self, texts, tokenizer, cfg):
        self.texts = texts
        self.tokenizer = tokenizer
        self.cfg = cfg

    def __len__(self):
        """Return the number of texts."""
        return len(self.texts)

    def __getitem__(self, index):
        """Build the model inputs for the text at ``index``.

        Returns:
            A dict with keys ``text`` (cleaned string), ``input_ids`` and
            ``attention_mask`` (1-D ``LongTensor``s) and ``word_boxes``
            (``FloatTensor`` with one row ``[first_token, 0, last_token + 1, 1]``
            per word).

        Raises:
            AssertionError: If some word overlaps no token at all.
        """
        text = clean_text(self.texts[index])

        tokens = self.tokenizer(
            text,
            add_special_tokens=True,
            return_offsets_mapping=True,
        )

        input_ids = torch.LongTensor(tokens['input_ids'])
        attention_mask = torch.LongTensor(tokens['attention_mask'])
        offset_mapping = np.array(tokens['offset_mapping'])
        offset_mapping = self.strip_offset_mapping(text, offset_mapping)

        # token slices of words: 1-D intersection-over-union between every
        # word character span (rows) and every token character span (columns).
        woff = self.get_word_offsets(text)
        toff = offset_mapping
        wx1, wx2 = woff.T
        tx1, tx2 = toff.T
        ix1 = np.maximum(wx1[..., None], tx1[None, ...])
        ix2 = np.minimum(wx2[..., None], tx2[None, ...])
        ux1 = np.minimum(wx1[..., None], tx1[None, ...])
        ux2 = np.maximum(wx2[..., None], tx2[None, ...])
        # The epsilon avoids a division by zero for degenerate spans.
        ious = (ix2 - ix1).clip(min=0) / (ux2 - ux1 + 1e-12)
        assert (ious > 0).any(-1).all()

        word_boxes = []
        for row in ious:
            inds = row.nonzero()[0]
            # Box spans from the first to one past the last overlapping token;
            # the second axis is a dummy unit-height dimension.
            word_boxes.append([inds[0], 0, inds[-1] + 1, 1])
        # reshape keeps a (0, 4) shape when the text contains no words.
        word_boxes = torch.FloatTensor(word_boxes).reshape(-1, 4)

        return dict(text=text, input_ids=input_ids, attention_mask=attention_mask, word_boxes=word_boxes)

    def strip_offset_mapping(self, text, offset_mapping):
        """Shrink each token's character span to its first non-whitespace run.

        Args:
            text: The string the offsets refer to.
            offset_mapping: Iterable of ``(start, end)`` character offsets.

        Returns:
            Integer array of shape ``(num_tokens, 2)``. Spans that contain no
            non-whitespace character (including empty spans) are kept as-is.
        """
        stripped = []
        for start, end in offset_mapping:
            match = self._WORD_PATTERN.search(text[start:end])
            if match is None:
                stripped.append((start, end))
            else:
                span_start, span_end = match.span()
                stripped.append((start + span_start, start + span_end))
        return np.array(stripped, dtype=int).reshape(-1, 2)

    def get_word_offsets(self, text):
        """Return the character span of every whitespace-delimited word.

        Args:
            text: The string to split.

        Returns:
            Integer array of shape ``(num_words, 2)`` with ``(start, end)``
            offsets; shape ``(0, 2)`` when ``text`` contains no words.

        Raises:
            AssertionError: If the regex-based words disagree with
                ``text.split()``.
        """
        matches = list(self._WORD_PATTERN.finditer(text))
        words = [match.group() for match in matches]
        assert tuple(words) == tuple(text.split())
        spans = [match.span() for match in matches]
        return np.array(spans, dtype=int).reshape(-1, 2)
    
class CustomCollator(object):
    """Collate function that turns a single dataset sample into a batch of one.

    Args:
        pad_token_id: Value used to pre-fill the ``input_ids`` tensor.
    """

    def __init__(self, pad_token_id):
        self.pad_token_id = pad_token_id

    def __call__(self, samples):
        """Add a leading batch axis to the one sample in ``samples``.

        Args:
            samples: List containing exactly one dict with keys ``text``,
                ``input_ids``, ``attention_mask`` and ``word_boxes``.

        Returns:
            A dict with the same keys; ``input_ids`` and ``attention_mask``
            are ``(1, seq_len)`` long tensors, ``text`` and ``word_boxes`` are
            passed through unchanged.

        Raises:
            AssertionError: If ``samples`` does not hold exactly one sample.
        """
        batch_size = len(samples)
        assert batch_size == 1, f'Only batch_size=1 supported, got batch_size={batch_size}.'

        item = samples[0]
        token_ids = item['input_ids']
        length = len(token_ids)
        batch_shape = (1, length)

        # Pre-fill with the padding id / zeros, then copy the sample in.
        input_ids = torch.full(batch_shape, self.pad_token_id, dtype=torch.long)
        attention_mask = torch.zeros(batch_shape, dtype=torch.long)
        input_ids[0, :length] = token_ids
        attention_mask[0, :length] = item['attention_mask']

        return {
            'text': item['text'],
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'word_boxes': item['word_boxes'],
        }
    
class TextDataModule(pl.LightningDataModule):
    """Lightning data module that serves texts for prediction only.

    Args:
        texts: Collection of raw strings to predict on.
        tokenizer: Tokenizer handed to ``TextDataset``; its ``pad_token_id``
            is used by the collator.
        cfg: Mapping whose ``"val_loader"`` entry holds the keyword arguments
            for the prediction ``DataLoader``.
    """

    def __init__(
        self,
        texts=None,
        tokenizer=None,
        cfg=None,
    ):
        super().__init__()
        self.texts = texts
        self.tokenizer = tokenizer
        self.cfg = cfg

    def setup(self, stage):
        """Create the prediction dataset.

        Args:
            stage: Stage name supplied by the trainer; only ``'predict'`` is
                supported.

        Raises:
            NotImplementedError: For any stage other than ``'predict'``.
        """
        if stage == 'predict':
            self.predict_dataset = TextDataset(self.texts, self.tokenizer, self.cfg)
        else:
            # Still an ``Exception`` subclass, but now says what went wrong.
            raise NotImplementedError(
                f"TextDataModule only supports stage='predict', got stage={stage!r}."
            )

    def predict_dataloader(self):
        """Return the ``DataLoader`` over the prediction dataset."""
        custom_collator = CustomCollator(self.tokenizer.pad_token_id)
        return DataLoader(self.predict_dataset, **self.cfg["val_loader"], collate_fn=custom_collator)

In [None]:
import pytorch_lightning as pl
from transformers import AutoConfig, AutoModel, AutoTokenizer
from transformers import get_polynomial_decay_schedule_with_warmup, get_cosine_schedule_with_warmup

import pandas as pd
import yaml

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.ops import roi_align, nms

def aggregate_tokens_to_words(feat, word_boxes):
    """Pool token-level features into one feature vector per word box.

    Args:
        feat: Token features; assumed to be ``(1, num_tokens, channels)`` —
            TODO confirm against the caller.
        word_boxes: Box tensor passed to ``roi_align`` as the boxes of the
            single batch element.

    Returns:
        The ``roi_align`` output (output size 1, ``aligned=True``) with its
        two trailing axes squeezed away.
    """
    # Put channels second and insert a height axis so the token sequence
    # becomes the width axis of a 2-D feature map.
    feature_map = feat.permute(0, 2, 1).unsqueeze(2)
    pooled = roi_align(feature_map, [word_boxes], 1, aligned=True)
    pooled = pooled.squeeze(-1)
    return pooled.squeeze(-1)

def span_nms(start, end, score, nms_thr):
    """Run non-maximum suppression over 1-D spans.

    Each span is turned into a box ``[start, 0, end, 1]`` so that the 2-D
    ``nms`` routine can be reused.

    Args:
        start: 1-D tensor of span starts.
        end: 1-D tensor of span ends.
        score: 1-D tensor of span scores.
        nms_thr: Threshold forwarded to ``nms``.

    Returns:
        Whatever ``nms`` returns for these boxes (indices of the kept spans).
    """
    y_min = torch.zeros_like(start)
    y_max = torch.ones_like(start)
    boxes = torch.stack((start, y_min, end, y_max), dim=1).float()
    return nms(boxes, score, nms_thr)

def get_pred(col):
    """Build a row-wise function that renders a span as a prediction string.

    Args:
        col: Unused; kept so that existing call sites keep working.

    Returns:
        A function that takes a row exposing ``start`` and ``end`` attributes
        and returns the integers ``start, start + 1, ..., end - 1`` joined by
        single spaces (an empty string when ``end <= start``).
    """
    def row_wise(row):
        # Rows produced by ``DataFrame.apply(axis=1)`` can carry the bounds as
        # floats when all columns are numeric; ``range`` requires integers.
        first, stop = int(row.start), int(row.end)
        return " ".join(map(str, range(first, stop)))
    return row_wise

def make_preds(preds, ids, th=0.303, score_thr=0.5):
    """Decode raw per-word model outputs into a table of predicted spans.

    Args:
        preds: Iterable of ``(obj_pred, reg_pred, cls_pred, eff_pred)`` logit
            tuples, one per text.
        ids: Identifiers paired with ``preds`` via ``zip``.
        th: Threshold forwarded to ``span_nms``.
        score_thr: Minimum combined score a word needs to seed a span.

    Returns:
        A ``DataFrame`` with one row per kept span and the columns ``id``,
        ``start``, ``end``, ``score_discourse_type``, ``discourse_type``,
        ``score_discourse_effectiveness_0``,
        ``score_discourse_effectiveness_1`` and ``predictionstring``. The
        frame is empty (with those columns) when no text yields a span.
    """
    columns = [
        'id',
        'start',
        'end',
        'score_discourse_type',
        'discourse_type',
        'score_discourse_effectiveness_0',
        'score_discourse_effectiveness_1',
        'predictionstring',
    ]
    frames = []

    for text_id, (obj_pred, reg_pred, cls_pred, eff_pred) in zip(ids, preds):
        obj_scores = obj_pred.sigmoid()
        # Regression outputs are log-distances from the word to the span ends.
        offsets = reg_pred.exp()
        cls_scores, cls_labels = cls_pred.sigmoid().max(-1)
        eff_scores = eff_pred.softmax(-1)

        # Geometric mean of the objectness and best class score.
        pr_scores = (obj_scores * cls_scores) ** 0.5
        pos_inds = pr_scores > score_thr
        if not pos_inds.any():
            continue

        pr_score = pr_scores[pos_inds]
        pr_label = cls_labels[pos_inds]
        pr_eff = eff_scores[pos_inds]

        pos_loc = pos_inds.nonzero().flatten()
        min_idx, max_idx = 0, obj_pred.numel() - 1
        start = pos_loc - offsets[pos_inds, 0]
        end = pos_loc + offsets[pos_inds, 1]
        start = start.clamp(min=min_idx, max=max_idx).round().long()
        end = end.clamp(min=min_idx, max=max_idx).round().long()

        # nms
        keep = span_nms(start, end, pr_score, th)
        start = start[keep]
        end = end[keep]
        pr_score = pr_score[keep]
        pr_label = pr_label[keep]
        pr_eff = pr_eff[keep]

        res = dict(
            id=text_id,
            start=start.cpu().numpy(),
            end=end.cpu().numpy(),
            score_discourse_type=pr_score.cpu().numpy(),
            discourse_type=pr_label.cpu().numpy(),
            # Effectiveness classes 0 and 2 are merged into one score.
            # NOTE(review): the meaning of the three classes is not visible here.
            score_discourse_effectiveness_0=pr_eff[:, 0].cpu().numpy() + pr_eff[:, 2].cpu().numpy(),
            score_discourse_effectiveness_1=pr_eff[:, 1].cpu().numpy(),
        )

        res = pd.DataFrame(res).sort_values('start').reset_index(drop=True)
        res['predictionstring'] = res.apply(get_pred(' '), axis=1)

        frames.append(res)

    if not frames:
        # ``pd.concat`` raises on an empty list; return an empty table instead.
        return pd.DataFrame(columns=columns)

    return pd.concat(frames, axis=0).reset_index(drop=True)

class TextModel(pl.LightningModule):
    """Transformer backbone with a linear head producing per-token span logits.

    Args:
        cfg: Configuration object; stored as ``self.cfg``.
        config_path: Path to a file that ``torch.load`` turns into the
            backbone configuration passed to ``AutoModel.from_config``.

    Raises:
        ValueError: If ``config_path`` is not provided.
    """

    def __init__(self, cfg, config_path=None):

        super().__init__()
        self.cfg = cfg

        if config_path is None:
            raise ValueError("config_path is required to build the backbone.")

        # NOTE(review): torch.load relies on pickle; only load config files
        # that come from a trusted source.
        self.config = torch.load(config_path)
        self.backbone = AutoModel.from_config(self.config)

        hidden_size = self.config.hidden_size
        # Head layout, as sliced in ``forward_logits``:
        # 1 objectness + 2 regression + 7 class + 3 effectiveness logits.
        self.fc = nn.Linear(hidden_size, 1 + 2 + 7 + 3)

    def forward(self, inputs):
        """Return the head logits for every token.

        Args:
            inputs: Keyword arguments for the backbone.
        """
        x = self.backbone(**inputs).last_hidden_state
        x = self.fc(x)
        return x

    def forward_logits(self, data):
        """Compute word-level logits for a batch of exactly one text.

        Args:
            data: Dict with ``input_ids``, ``attention_mask``, ``word_boxes``
                and ``text``.

        Returns:
            ``(obj_pred, reg_pred, cls_pred, eff_pred)`` with one row per
            whitespace-delimited word of ``data['text']``.

        Raises:
            AssertionError: If the batch size is not 1 or the number of
                aggregated rows differs from the number of words.
        """
        batch_size = data['input_ids'].size(0)
        assert batch_size == 1, f'Only batch_size=1 supported, got batch_size={batch_size}.'

        inputs = {
            'input_ids': data['input_ids'],
            'attention_mask': data['attention_mask'],
        }

        logits = self(inputs)

        logits = aggregate_tokens_to_words(logits, data['word_boxes'])
        num_words = len(data['text'].split())
        assert logits.size(0) == num_words

        obj_pred = logits[..., 0]
        reg_pred = logits[..., 1:3]
        cls_pred = logits[..., 3:-3]
        eff_pred = logits[..., -3:]

        return obj_pred, reg_pred, cls_pred, eff_pred

    def predict_step(self, data, batch_idx):
        """Return the four word-level logit tensors, detached and on the CPU."""
        obj_pred, reg_pred, cls_pred, eff_pred = self.forward_logits(data)

        return obj_pred.detach().cpu(), reg_pred.detach().cpu(), cls_pred.detach().cpu(), eff_pred.detach().cpu()

In [None]:
def predict(model, datamodule, ids, th=0.303):
    """Run prediction with the module-level ``trainer`` and decode the spans.

    Args:
        model: Model handed to ``trainer.predict``.
        datamodule: Data module providing the prediction dataloader.
        ids: Identifiers paired with the predictions by ``make_preds``.
        th: Threshold forwarded to ``make_preds``; defaults to the value that
            was previously hard-coded here.

    Returns:
        The ``DataFrame`` produced by ``make_preds``.
    """
    preds = trainer.predict(model, datamodule=datamodule)
    return make_preds(preds, ids, th)

def predict_single(model, text, th=0.303):
    """Predict spans for one text using the module-level trainer, tokenizer and cfg.

    Args:
        model: Model handed to ``trainer.predict``.
        text: The raw text to predict on.
        th: Threshold forwarded to ``make_preds``; defaults to the value that
            was previously hard-coded here.

    Returns:
        The ``DataFrame`` produced by ``make_preds``; the text is identified
        as ``'NO_ID'``.
    """
    datamodule = TextDataModule(texts=[text], tokenizer=tokenizer, cfg=cfg)

    preds = trainer.predict(model, datamodule=datamodule)
    return make_preds(preds, ['NO_ID'], th)

In [None]:
# Load the tokenizer, the run configuration and the trained weights.
tokenizer = AutoTokenizer.from_pretrained('/kaggle/input/tlab-dataset/tokenizer/tokenizer')
config_path = '/kaggle/input/tlab-dataset/config.pth'

with open('/kaggle/input/tlab-dataset/deberta_large_025.yml', 'r', encoding='utf-8') as f:
    cfg = yaml.safe_load(f)

trainer = pl.Trainer(logger=False, **cfg['trainer'])

model = TextModel(cfg, config_path)

# map_location='cpu' lets a checkpoint saved on a GPU be read on a host
# without one; load_state_dict copies the values into the model's parameters.
# NOTE(review): torch.load relies on pickle; only load trusted checkpoints.
state_dict = torch.load(
    '/kaggle/input/tlab-dataset/best_deberta_large.ckpt',
    map_location='cpu',
)['state_dict']

model.load_state_dict(state_dict, strict=True)

In [None]:
# Load the corpus and keep one (essay_id, full_text) row per essay of the
# 'Private' test split.
df = pd.read_csv('/kaggle/input/tlab-dataset/persuade_corpus.csv', low_memory=False)
df = (
    df.loc[df['test_split_feedback_1'] == 'Private', ['essay_id', 'full_text']]
    .drop_duplicates('essay_id')
    .reset_index(drop=True)
)

# Only the first 10 essays are used below.
texts = df['full_text'].head(10).tolist()
ids = df['essay_id'].head(10).tolist()

datamodule = TextDataModule(texts=texts, tokenizer=tokenizer, cfg=cfg)

In [None]:
# Predict spans for the selected essays; the bare expression on the last line
# makes the notebook display the resulting table.
pred_df = predict(model, datamodule, ids)
pred_df

In [None]:
# Same prediction path for a single raw text (the first selected essay);
# the rows of the result carry the placeholder id 'NO_ID'.
pred_df = predict_single(model, texts[0])
pred_df