#### Environment initialization

In [None]:
import os
import sys

# The presence of KAGGLE_URL_BASE is used as the marker for "running on Kaggle".
if os.environ.get('KAGGLE_URL_BASE') is not None:
    print('Running on Kaggle so initializing the environment...')

    # Read back later through os.getenv('__KGLTBX_ENVIRONMENT', ...) in the parameters cell.
    os.environ['__KGLTBX_ENVIRONMENT'] = 'kaggle'
    # Make the libraries attached as Kaggle datasets importable.
    sys.path.extend([
        '/kaggle/input/kaggle-toolbox',
        '/kaggle/input/lib-textstat',
    ])

#### Imports

In [None]:
import functools
import os
import typing as t
from pathlib import Path

import kaggle_toolbox.features.generation as features
import kaggle_toolbox.nlp.features as text_features
import numpy as np
import pandas as pd
import torch
from catboost import CatBoostRegressor
from kaggle_toolbox.data import DatasetItem, Movable
from kaggle_toolbox.device import CUDADevice
from kaggle_toolbox.ensembling import EnsemblingStrategy, MeanEnsemblingStrategy
from kaggle_toolbox.environment import Environment
from kaggle_toolbox.layers import SqueezeDim
from kaggle_toolbox.nlp.transformer import Backbone, Model as TransformerModel, StandardModel, \
    MeanPooler, AttentionHeadPooler, TakeNthSqueezer, ConcatSqueezer, get_tokenizer_for_backbone, \
    Tokenizer, TokenizerResult, TokenizerResultCollator, seed_everything
from kaggle_toolbox.prediction import PredDict
from kaggle_toolbox.predictor import StandardPredictor
from kaggle_toolbox.progress import NotebookProgressBar
from kaggle_toolbox.typing import DynamicDict
from textstat import textstat
from torch.utils.data import Dataset as TorchDataset, default_collate as default_collate_fn
from transformers.data.data_collator import DataCollatorWithPadding
from transformers.utils.generic import PaddingStrategy
from transformers.utils.logging import set_verbosity_error as set_transformers_verbosity_error


# Presumably registers the `progress_apply` method on pandas objects (it is used by
# `Dataset.sort_by_tokenizer_input_len`) — confirm against kaggle_toolbox.
NotebookProgressBar.attach_to_pandas()
# Raise the `transformers` logging threshold to errors only.
set_transformers_verbosity_error()

#### Collator

In [None]:
_X = t.TypeVar('_X', bound=Movable)


class DatasetItemCollator(t.Generic[_X]):
    """Merge a list of ``DatasetItem`` objects into one batched ``DatasetItem``.

    The ``id`` and ``x`` fields of the items are collated independently, each by its own
    callable.

    Args:
        x_collate_fn: Turns the list of per-item ``x`` payloads into a single payload.
        id_collate_fn: Turns the list of per-item id lists into a single id list.
            Defaults to torch's ``default_collate``.
    """

    def __init__(
            self,
            x_collate_fn: t.Callable[[t.List[_X]], _X],
            id_collate_fn: t.Callable[[t.List[t.List[str]]], t.List[str]] = default_collate_fn):
        self._x_collate_fn = x_collate_fn
        self._id_collate_fn = id_collate_fn

    def __call__(self, item_list: t.List[DatasetItem[_X]]) -> DatasetItem[_X]:
        """Collate ``item_list`` into a single ``DatasetItem``."""
        id_batch = []
        x_batch = []
        for item in item_list:
            id_batch.append(item.id)
            x_batch.append(item.x)

        collated_id = self._id_collate_fn(id_batch)
        collated_x = self._x_collate_fn(x_batch)
        return DatasetItem(id=collated_id, x=collated_x)

#### Dataset

In [None]:
class Dataset(TorchDataset[DatasetItem[TokenizerResult]]):
    """Torch dataset that tokenizes the ``full_text`` column of a dataframe on access.

    Every item is a ``DatasetItem`` whose ``id`` is a one-element list holding the row's
    ``text_id`` (as a string) and whose ``x`` is the tokenizer output for the row's text.

    Args:
        df: Frame with at least the ``text_id`` and ``full_text`` columns.
        tokenizer: Tokenizer used to encode the text of each row.
        max_len: Value forwarded to the tokenizer as ``max_len``.
    """

    def __init__(
            self,
            df: pd.DataFrame,
            tokenizer: Tokenizer,
            max_len: int):
        # Private copy with a fresh 0..n-1 index: the caller's frame is never mutated.
        self._df = df.copy().reset_index(drop=True)
        self._tokenizer = tokenizer
        self._max_len = max_len

    def _get_tokenizer_input(self, row: DynamicDict) -> str:
        """Return the text of ``row`` that is fed to the tokenizer."""
        return row.get_typed_or_raise('full_text', str)

    def sort_by_tokenizer_input_len(self) -> None:
        """Reorder the rows by ascending length of their tokenizer input.

        The length is the number of characters of the raw text. It is stored in the helper
        column ``_tok_input_len``. Previously the text itself was stored there, which made
        the rows sort lexicographically instead of by length.
        """
        self._df['_tok_input_len'] = self._df.progress_apply(
            lambda row: len(self._get_tokenizer_input(DynamicDict(t.cast(t.Dict[str, t.Any], row)))),
            axis=1)
        self._df = self._df.sort_values('_tok_input_len')

    def __len__(self) -> int:
        return len(self._df)

    def __getitem__(self, idx: int) -> DatasetItem[TokenizerResult]:
        # Positional access, so it keeps working after `sort_by_tokenizer_input_len`
        # has shuffled the index labels.
        row = self._df.iloc[idx]

        tokenizer_input = self._get_tokenizer_input(DynamicDict(t.cast(t.Dict[str, t.Any], row)))
        text_id = str(row['text_id'])

        tokenizer_result = self._tokenizer.tokenize(
            tokenizer_input, max_len=self._max_len)

        return DatasetItem(
            id=[text_id],
            x=tokenizer_result)

#### Parameters

In [None]:
# Name of the runtime environment. The environment-initialization cell exports
# `__KGLTBX_ENVIRONMENT=kaggle` when running on Kaggle; otherwise this falls back to 'laptop'.
ENVIRONMENT = os.getenv('__KGLTBX_ENVIRONMENT', 'laptop')
# Used below to pick a per-environment value for each parameter.
_env = Environment(ENVIRONMENT)

# Columns predicted for every essay, in the order they appear in the output CSV.
TARGET_LIST = [
    'cohesion',
    'syntax',
    'vocabulary',
    'phraseology',
    'grammar',
    'conventions',
]

SEED = 42
# Number of per-fold checkpoints that are averaged for every level-1 and level-2 model.
NUM_FOLDS = 5
DEVICE = CUDADevice()
# Passed to the tokenizer as `max_len`.
MAX_LEN = 1024
# NOTE(review): no `colab` value here although NUM_WORKERS has one — confirm how
# `Environment.param` behaves for an environment that is not listed.
BATCH_SIZE = _env.param(
    kaggle=8,
    # colab=4,
    laptop=2)
NUM_WORKERS = _env.param(kaggle=2, colab=2, laptop=4)

# Directory layout. Every path below is derived from ROOT_DIR.
ROOT_DIR = _env.param(
    kaggle=Path('/kaggle'),
    laptop=Path('/kaggle'))
DATA_DIR = _env.param(
    kaggle=ROOT_DIR / 'input',
    laptop=ROOT_DIR / 'data')
MODEL_DIR = _env.param(
    kaggle=DATA_DIR,
    laptop=ROOT_DIR / 'models')
FP_ELL_DATASET_DIR = _env.param(
    kaggle=DATA_DIR / 'feedback-prize-english-language-learning',
    laptop=DATA_DIR / 'fp-ell')
# The competition test set is scored on Kaggle; the training set is used for local runs.
INPUT_CSV_PATH = _env.param(
    kaggle=FP_ELL_DATASET_DIR / 'test.csv',
    laptop=FP_ELL_DATASET_DIR / 'train.csv')
OUTPUT_CSV_PATH = _env.param(
    kaggle=ROOT_DIR / 'working/submission.csv',
    laptop=ROOT_DIR / 'submission/submission.csv')

# Backbone identifier and the location its checkpoint is loaded from:
# a local dataset directory on Kaggle, the hub name on the laptop.
BACKBONE = 'microsoft/deberta-v3-base'
BACKBONE_PATH = _env.param(
    kaggle=str(DATA_DIR / 'deberta-v3-base/deberta-v3-base'),
    laptop='microsoft/deberta-v3-base')


In [None]:
# Report which GPU the notebook is running on.
print(f'GPU model: {DEVICE.get_name()}')

#### Pinning the seed

In [None]:
# Seed the random number generators with the fixed SEED (which generators are covered is
# decided by kaggle_toolbox's `seed_everything`).
seed_everything(seed=SEED)

#### Data loading

In [None]:
def _read_data(max_rows: t.Optional[int] = None) -> pd.DataFrame:
    """Read the input CSV located at ``INPUT_CSV_PATH``.

    Args:
        max_rows: When given, keep only the first ``max_rows`` rows (handy for quick local
            runs). ``None`` keeps every row.

    Returns:
        The loaded frame, possibly truncated to ``max_rows`` rows.
    """
    df = pd.read_csv(INPUT_CSV_PATH)
    if max_rows is None:
        return df
    return df.iloc[:max_rows]


# The 60-row cap is a local debugging aid only. On Kaggle every test row must be predicted,
# otherwise the submission file is incomplete.
all_df = _read_data(max_rows=None if ENVIRONMENT == 'kaggle' else 60)
all_df.head(3)

#### Model builders

##### Cohesion

In [None]:
def _build_cohesion_fold_model(fold: int) -> TransformerModel[TokenizerResult]:
    """Build the level-1 cohesion model and load the weights saved for ``fold``.

    The model is a ``StandardModel`` made of the backbone loaded from ``BACKBONE_PATH``
    (with ``zero_out_dropout=True``), a ``TakeNthSqueezer``, an ``AttentionHeadPooler`` and a
    ``LayerNorm -> Linear(1) -> SqueezeDim`` head.

    Args:
        fold: Fold index substituted into the checkpoint file name.

    Returns:
        The model with the fold's state dict loaded.
    """
    backbone = Backbone.from_huggingface_checkpoint(BACKBONE_PATH, zero_out_dropout=True)
    model = StandardModel(
        backbone=backbone,
        squeezer=TakeNthSqueezer(),
        pooler=AttentionHeadPooler(backbone.out_dim_size),
        dnn=torch.nn.Sequential(
            torch.nn.LayerNorm(backbone.out_dim_size),
            torch.nn.Linear(backbone.out_dim_size, 1),
            SqueezeDim(),
        ))
    checkpoint_path = MODEL_DIR / f'fp-ell-transformer-training-cohesion/cohesion-v1-layer_norm-ep_4-valfreq_0p25-pooler_att-full-fold_{fold}.pt'
    # Deserialize onto the CPU. Without `map_location`, torch restores each tensor onto the
    # device it was saved from, which requires that device and keeps an extra copy of the
    # weights on it. `load_state_dict` copies the values into the model's own parameters.
    model.load_state_dict(torch.load(checkpoint_path, map_location='cpu'))
    return model

##### Syntax

In [None]:
def _build_syntax_fold_model(fold: int) -> TransformerModel[TokenizerResult]:
    """Build the level-1 syntax model and load the weights saved for ``fold``.

    The model is a ``StandardModel`` made of the backbone loaded from ``BACKBONE_PATH``
    (with ``zero_out_dropout=True``), a ``TakeNthSqueezer``, a ``MeanPooler`` and a
    ``LayerNorm -> Linear(1) -> SqueezeDim`` head.

    Args:
        fold: Fold index substituted into the checkpoint file name.

    Returns:
        The model with the fold's state dict loaded.
    """
    backbone = Backbone.from_huggingface_checkpoint(BACKBONE_PATH, zero_out_dropout=True)
    model = StandardModel(
        backbone=backbone,
        squeezer=TakeNthSqueezer(),
        pooler=MeanPooler(),
        dnn=torch.nn.Sequential(
            torch.nn.LayerNorm(backbone.out_dim_size),
            torch.nn.Linear(backbone.out_dim_size, 1),
            SqueezeDim(),
        ))
    checkpoint_path = MODEL_DIR / f'fp-ell-transformer-training-syntax/syntax-v1-layer_norm-ep_3-valfreq_0p25-full-fold_{fold}.pt'
    # Deserialize onto the CPU. Without `map_location`, torch restores each tensor onto the
    # device it was saved from, which requires that device and keeps an extra copy of the
    # weights on it. `load_state_dict` copies the values into the model's own parameters.
    model.load_state_dict(torch.load(checkpoint_path, map_location='cpu'))
    return model

##### Vocabulary

In [None]:
def _build_vocabulary_fold_model(fold: int) -> TransformerModel[TokenizerResult]:
    """Build the level-1 vocabulary model and load the weights saved for ``fold``.

    The model is a ``StandardModel`` made of the backbone loaded from ``BACKBONE_PATH``
    (with ``zero_out_dropout=True``), a ``TakeNthSqueezer``, a ``MeanPooler`` and a
    ``LayerNorm -> Linear(1) -> SqueezeDim`` head.

    Args:
        fold: Fold index substituted into the checkpoint file name.

    Returns:
        The model with the fold's state dict loaded.
    """
    backbone = Backbone.from_huggingface_checkpoint(BACKBONE_PATH, zero_out_dropout=True)
    model = StandardModel(
        backbone=backbone,
        squeezer=TakeNthSqueezer(),
        pooler=MeanPooler(),
        dnn=torch.nn.Sequential(
            torch.nn.LayerNorm(backbone.out_dim_size),
            torch.nn.Linear(backbone.out_dim_size, 1),
            SqueezeDim(),
        ))
    checkpoint_path = MODEL_DIR / f'fp-ell-transformer-training-vocabulary/vocabulary-v1-layer_norm-ep_3-valfreq_0p25-std_init-fold_{fold}.pt'
    # Deserialize onto the CPU. Without `map_location`, torch restores each tensor onto the
    # device it was saved from, which requires that device and keeps an extra copy of the
    # weights on it. `load_state_dict` copies the values into the model's own parameters.
    model.load_state_dict(torch.load(checkpoint_path, map_location='cpu'))
    return model

##### Phraseology

In [None]:
def _build_phraseology_fold_model(fold: int) -> TransformerModel[TokenizerResult]:
    """Build the level-1 phraseology model and load the weights saved for ``fold``.

    The model is a ``StandardModel`` made of the backbone loaded from ``BACKBONE_PATH``
    (with ``zero_out_dropout=True``), a ``TakeNthSqueezer``, a ``MeanPooler`` and a
    ``LayerNorm -> Linear(1) -> SqueezeDim`` head.

    Args:
        fold: Fold index substituted into the checkpoint file name.

    Returns:
        The model with the fold's state dict loaded.
    """
    backbone = Backbone.from_huggingface_checkpoint(BACKBONE_PATH, zero_out_dropout=True)
    model = StandardModel(
        backbone=backbone,
        squeezer=TakeNthSqueezer(),
        pooler=MeanPooler(),
        dnn=torch.nn.Sequential(
            torch.nn.LayerNorm(backbone.out_dim_size),
            torch.nn.Linear(backbone.out_dim_size, 1),
            SqueezeDim(),
        ))
    checkpoint_path = MODEL_DIR / f'fp-ell-transformer-training-phraseology/phraseology-v1-layer_norm-ep_3-valfreq_0p25-std_init-full-fold_{fold}.pt'
    # Deserialize onto the CPU. Without `map_location`, torch restores each tensor onto the
    # device it was saved from, which requires that device and keeps an extra copy of the
    # weights on it. `load_state_dict` copies the values into the model's own parameters.
    model.load_state_dict(torch.load(checkpoint_path, map_location='cpu'))
    return model

##### Grammar

In [None]:
def _build_grammar_fold_model(fold: int) -> TransformerModel[TokenizerResult]:
    """Build the level-1 grammar model and load the weights saved for ``fold``.

    Unlike the other level-1 builders in this notebook, this one uses
    ``ConcatSqueezer([9, 10, 11, 12])``, and its ``LayerNorm -> Linear(1) -> SqueezeDim``
    head is sized for four times the backbone output dimension. The backbone is loaded from
    ``BACKBONE_PATH`` (with ``zero_out_dropout=True``) and pooled with a ``MeanPooler``.

    Args:
        fold: Fold index substituted into the checkpoint file name.

    Returns:
        The model with the fold's state dict loaded.
    """
    backbone = Backbone.from_huggingface_checkpoint(BACKBONE_PATH, zero_out_dropout=True)
    model = StandardModel(
        backbone=backbone,
        squeezer=ConcatSqueezer([9, 10, 11, 12]),
        pooler=MeanPooler(),
        dnn=torch.nn.Sequential(
            # Four squeezed entries are concatenated, hence the 4x wider head.
            torch.nn.LayerNorm(backbone.out_dim_size * 4),
            torch.nn.Linear(backbone.out_dim_size * 4, 1),
            SqueezeDim(),
        ))
    checkpoint_path = MODEL_DIR / f'fp-ell-transformer-training-grammar/grammar-v1-lnorm-ep_4-valfreq_0p25-sqzr_cat_9_to_12-full-fold_{fold}.pt'
    # Deserialize onto the CPU. Without `map_location`, torch restores each tensor onto the
    # device it was saved from, which requires that device and keeps an extra copy of the
    # weights on it. `load_state_dict` copies the values into the model's own parameters.
    model.load_state_dict(torch.load(checkpoint_path, map_location='cpu'))
    return model

##### Conventions

In [None]:
def _build_conventions_fold_model(fold: int) -> TransformerModel[TokenizerResult]:
    """Build the level-1 conventions model and load the weights saved for ``fold``.

    The model is a ``StandardModel`` made of the backbone loaded from ``BACKBONE_PATH``
    (with ``zero_out_dropout=True``), a ``TakeNthSqueezer``, a ``MeanPooler`` and a
    ``LayerNorm -> Linear(1) -> SqueezeDim`` head.

    Args:
        fold: Fold index substituted into the checkpoint file name.

    Returns:
        The model with the fold's state dict loaded.
    """
    backbone = Backbone.from_huggingface_checkpoint(BACKBONE_PATH, zero_out_dropout=True)
    model = StandardModel(
        backbone=backbone,
        squeezer=TakeNthSqueezer(),
        pooler=MeanPooler(),
        dnn=torch.nn.Sequential(
            torch.nn.LayerNorm(backbone.out_dim_size),
            torch.nn.Linear(backbone.out_dim_size, 1),
            SqueezeDim(),
        ))
    checkpoint_path = MODEL_DIR / f'fp-ell-transformer-training-conventions/conventions-v1-layer_norm-ep_3-valfreq_0p25-full-fold_{fold}.pt'
    # Deserialize onto the CPU. Without `map_location`, torch restores each tensor onto the
    # device it was saved from, which requires that device and keeps an extra copy of the
    # weights on it. `load_state_dict` copies the values into the model's own parameters.
    model.load_state_dict(torch.load(checkpoint_path, map_location='cpu'))
    return model

#### Entrypoint

In [None]:
def _predict_by_model(df: pd.DataFrame, model_builder: t.Callable[[], TransformerModel[TokenizerResult]]) -> PredDict:
    """Run a single model over every row of ``df``.

    Args:
        df: Frame with the ``text_id`` and ``full_text`` columns consumed by ``Dataset``.
        model_builder: Zero-argument callable returning the model to predict with.

    Returns:
        The predictions produced by ``StandardPredictor.predict``.
    """
    def _flatten_id_lists(id_list_batch):
        # Every dataset item carries its id as a one-element list; concatenate them.
        return sum(id_list_batch, [])

    # Padding is left to the collator below, so the tokenizer itself does not pad.
    tokenizer = get_tokenizer_for_backbone(
        backbone=BACKBONE,
        checkpoint=BACKBONE_PATH,
        padding_strategy=PaddingStrategy.DO_NOT_PAD)

    model = model_builder()
    collator = DatasetItemCollator(
        id_collate_fn=_flatten_id_lists,
        x_collate_fn=TokenizerResultCollator(DataCollatorWithPadding(tokenizer.tokenizer)))
    predictor = StandardPredictor(
        model=model,
        batch_size=BATCH_SIZE,
        num_workers=NUM_WORKERS,
        collator=collator,
        device=DEVICE,
        progress_bar=NotebookProgressBar())

    return predictor.predict(Dataset(df, tokenizer=tokenizer, max_len=MAX_LEN))


def _predict(
        df: pd.DataFrame,
        model_builder_list: t.List[t.Callable[[], TransformerModel[TokenizerResult]]],
        ensembling_strategy: EnsemblingStrategy) -> PredDict:
    """Predict ``df`` with every model in turn and ensemble the results.

    Args:
        df: Frame handed to each model's prediction run.
        model_builder_list: Zero-argument callables, one per model; each model is built
            right before it is used.
        ensembling_strategy: Combines the per-model predictions into one result.

    Returns:
        The ensembled predictions.
    """
    per_model_pred_list = []
    for build_model in model_builder_list:
        per_model_pred_list.append(_predict_by_model(df=df, model_builder=build_model))
    return ensembling_strategy.ensemble(per_model_pred_list)


def _build_lvl1_score_df(
        df: pd.DataFrame,
        target: str,
        fold_model_builder: t.Callable[..., TransformerModel[TokenizerResult]]) -> pd.DataFrame:
    """Predict one target with all of its fold models and return the mean score per text.

    Args:
        df: Frame with the texts to score.
        target: Target name; the score column is called ``<target>_lvl1_score``.
        fold_model_builder: Callable accepting a ``fold`` keyword and returning the model
            of that fold.

    Returns:
        A frame with the columns ``text_id`` and ``<target>_lvl1_score``.
    """
    pred_dict = _predict(
        df=df,
        model_builder_list=[
            functools.partial(fold_model_builder, fold=fold)
            for fold in range(NUM_FOLDS)
        ],
        ensembling_strategy=MeanEnsemblingStrategy())
    return pd.DataFrame([
        {'text_id': text_id, f'{target}_lvl1_score': score}
        for text_id, (score,) in pred_dict.items()
    ])


# Every score frame is computed from the same, not yet merged `all_df`.
cohesion_df = _build_lvl1_score_df(all_df, 'cohesion', _build_cohesion_fold_model)
syntax_df = _build_lvl1_score_df(all_df, 'syntax', _build_syntax_fold_model)
vocabulary_df = _build_lvl1_score_df(all_df, 'vocabulary', _build_vocabulary_fold_model)
phraseology_df = _build_lvl1_score_df(all_df, 'phraseology', _build_phraseology_fold_model)
grammar_df = _build_lvl1_score_df(all_df, 'grammar', _build_grammar_fold_model)
conventions_df = _build_lvl1_score_df(all_df, 'conventions', _build_conventions_fold_model)

# Attach the level-1 scores to the input rows, one target at a time and in TARGET_LIST order.
for _lvl1_score_df in (cohesion_df, syntax_df, vocabulary_df, phraseology_df, grammar_df, conventions_df):
    all_df = all_df.merge(_lvl1_score_df, left_on='text_id', right_on='text_id')

In [None]:
# Names of the level-1 score columns, one per target, in TARGET_LIST order.
_LVL1_SCORE_FEATURE_LIST = [f'{target}_lvl1_score' for target in TARGET_LIST]
# Feature generators applied by `build_features`.
# NOTE(review): the order of this list presumably determines the column order of the feature
# matrix passed positionally to the level-2 regressors, and the `Div` features refer to
# earlier features by name — do not reorder without confirming against the training notebook.
_FEATURE_GENERATOR_LIST = [
    # Score-based: pairwise distances, mean and standard deviation of the level-1 scores.
    *features.L1Distance.pairwise_from_feature_list(_LVL1_SCORE_FEATURE_LIST),
    features.Mean(name='lvl1_mean', feature_list=_LVL1_SCORE_FEATURE_LIST),
    features.Stdev(name='lvl1_std', feature_list=_LVL1_SCORE_FEATURE_LIST),
    # Custom simple: occurrence counts of punctuation and whitespace substrings.
    text_features.SubstrCount(name='num_commas', substr=','),
    text_features.SubstrCount(name='num_dots', substr='.'),
    text_features.SubstrCount(name='num_colons', substr=':'),
    text_features.SubstrCount(name='num_semicolons', substr=';'),
    text_features.SubstrCount(name='num_ellipsis', substr='...'),
    text_features.SubstrCount(name='num_newlines', substr='\n'),
    text_features.SubstrCount(name='num_spaces', substr=' '),
    # TextStat simple: raw counts computed by textstat.
    text_features.Func(name='syllable_count', func=textstat.syllable_count),
    text_features.Func(name='lexicon_count', func=functools.partial(textstat.lexicon_count, removepunct=True)),
    text_features.Func(name='char_count', func=functools.partial(textstat.char_count, ignore_spaces=True)),
    text_features.Func(name='letter_count', func=functools.partial(textstat.letter_count, ignore_spaces=True)),
    text_features.Func(name='polysyllabcount', func=functools.partial(textstat.polysyllabcount)),
    text_features.Func(name='monosyllabcount', func=functools.partial(textstat.monosyllabcount)),
    # Custom complex: the substring counts above divided by `char_count`.
    features.Div(name='ratio_commas', lhs_feature='num_commas', rhs_feature='char_count'),
    features.Div(name='ratio_dots', lhs_feature='num_dots', rhs_feature='char_count'),
    features.Div(name='ratio_colons', lhs_feature='num_colons', rhs_feature='char_count'),
    features.Div(name='ratio_semicolons', lhs_feature='num_semicolons', rhs_feature='char_count'),
    features.Div(name='ratio_ellipsis', lhs_feature='num_ellipsis', rhs_feature='char_count'),
    features.Div(name='ratio_newlines', lhs_feature='num_newlines', rhs_feature='char_count'),
    features.Div(name='ratio_spaces', lhs_feature='num_spaces', rhs_feature='char_count'),
    # TextStat complex: readability indices and reading time computed by textstat.
    text_features.Func(name='flesch_reading_ease', func=textstat.flesch_reading_ease),
    text_features.Func(name='flesch_kincaid_grade', func=textstat.flesch_kincaid_grade),
    text_features.Func(name='gunning_fog', func=textstat.gunning_fog),
    text_features.Func(name='smog_index', func=textstat.smog_index),
    text_features.Func(name='automated_readability_index', func=textstat.automated_readability_index),
    text_features.Func(name='coleman_liau_index', func=textstat.coleman_liau_index),
    text_features.Func(name='linsear_write_formula', func=textstat.linsear_write_formula),
    text_features.Func(name='dale_chall_readability_score', func=textstat.dale_chall_readability_score),
    text_features.Func(name='text_standard', func=functools.partial(textstat.text_standard, float_output=True)),  # type: ignore
    text_features.Func(name='spache_readability', func=textstat.spache_readability),
    text_features.Func(name='mcalpine_eflaw', func=textstat.mcalpine_eflaw),
    text_features.Func(name='reading_time', func=functools.partial(textstat.reading_time, ms_per_char=14.69)),
]

def build_features(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` extended with the generated feature columns.

    The generators in ``_FEATURE_GENERATOR_LIST`` are run over the ``full_text`` column,
    seeded with the level-1 score columns that are already present in ``df``.

    Args:
        df: Frame holding ``full_text`` and one ``<target>_lvl1_score`` column per target.

    Returns:
        A new frame; the input frame is left untouched.
    """
    result_df = df.copy()

    # The score-based generators read the level-1 scores from this initial dictionary.
    seed_feature_arrays = {}
    for target in TARGET_LIST:
        score_column = f'{target}_lvl1_score'
        seed_feature_arrays[score_column] = result_df[score_column].values

    generated_feature_arrays = text_features.generate_text_features(
        generator_list=_FEATURE_GENERATOR_LIST,
        text_seq=result_df['full_text'].tolist(),
        init_feature_array_dict=seed_feature_arrays)  # type: ignore

    for column_name, column_values in generated_feature_arrays.items():
        result_df[column_name] = column_values
    return result_df


all_df = build_features(all_df)
# Everything except the identifier, the raw text and the target columns is a feature.
# The target columns must be excluded explicitly: they are present when the input CSV
# carries ground-truth labels (train.csv on the laptop) or when this cell is re-run after
# the level-2 predictions were written. Letting them through would leak the labels and
# shift the column layout the level-2 regressors receive.
_NON_FEATURE_COLUMNS = {'text_id', 'full_text', *TARGET_LIST}
lvl1_features = all_df[[col for col in all_df.columns if col not in _NON_FEATURE_COLUMNS]].values

In [None]:
def predict_lvl2(feature_arr: np.ndarray, target: str) -> np.ndarray:
    """Predict ``target`` with the level-2 CatBoost models and average over the folds.

    Args:
        feature_arr: Feature matrix passed unchanged to every fold regressor.
        target: Target name substituted into the model file name.

    Returns:
        The element-wise mean of the ``NUM_FOLDS`` per-fold predictions.
    """
    def _predict_fold(fold: int) -> np.ndarray:
        model_path = MODEL_DIR / f'fp-ell-models-boosting/lvl2-catboost-{target}-cv1-fold_{fold}.cbm'
        regressor = CatBoostRegressor(task_type='GPU').load_model(str(model_path))
        return regressor.predict(feature_arr)

    fold_pred_arr = np.stack([_predict_fold(fold) for fold in range(NUM_FOLDS)], axis=0)
    return fold_pred_arr.mean(axis=0)

In [None]:
# These targets get their final value from the level-2 regressors.
for _lvl2_target in ('cohesion', 'syntax', 'vocabulary', 'phraseology', 'grammar'):
    all_df[_lvl2_target] = predict_lvl2(lvl1_features, target=_lvl2_target)
# 'conventions' has no level-2 step here: its level-1 score is used as the final value.
all_df['conventions'] = all_df['conventions_lvl1_score']

# Keep only the identifier and the six targets for the output file.
pred_df = all_df[['text_id'] + TARGET_LIST]

In [None]:
# Show the first rows of the final predictions as the cell output.
pred_df.head(3)

In [None]:
# Create the destination directory first: `to_csv` does not create missing parent
# directories, and the laptop output path is not guaranteed to exist.
Path(OUTPUT_CSV_PATH).parent.mkdir(parents=True, exist_ok=True)
pred_df.to_csv(OUTPUT_CSV_PATH, index=False)