# Imports

In [None]:
# General imports
import numpy as np
import pandas as pd
from IPython.display import display
import random
import shutil
import os

# Neural network imports
import tensorflow as tf
from tensorflow import keras
from keras import layers
from keras.callbacks import EarlyStopping
from transformers import TFAutoModel, AutoTokenizer
from sklearn.model_selection import GroupKFold
!pip install /kaggle/input/autocorrect/autocorrect-2.6.1.tar
from autocorrect import Speller

# Initializing autocorrect
spell = Speller(lang='en', fast=True)

In [None]:
class CFG:
    """Notebook-wide settings, read everywhere as plain class attributes."""

    # --- Pre-trained backbone and tokenization ---
    pre_trained_model_name = "/kaggle/input/deberta-v3-large/deberta_v3_large/"
    max_length = 1575
    embeddings_len = 1024
    hidden_dropout_prob = 0.0
    attention_probs_dropout_prob = 0.0

    # --- Training schedule ---
    epochs = 12
    batch_size = 4
    learning_rate = 1.5e-4
    weight_decay = 1e-4
    warmup_steps = 100

    # --- Cross-validation and reproducibility ---
    n_splits = 4
    random_seed = 42

    # --- Output ---
    # The file name embeds the epoch count, so it must be defined after `epochs`.
    final_model_path = f'full_model_scaled-{epochs}.keras'

In [None]:
def seed_everything(random_seed):
    """Seed every random number generator used in this notebook.

    Writes ``PYTHONHASHSEED`` into the process environment and then seeds
    NumPy, TensorFlow, Python's ``random`` module and Keras, in that order,
    with the same value.

    Args:
        random_seed: Seed value handed to each generator.
    """
    # NOTE(review): PYTHONHASHSEED is read at interpreter start-up, so this
    # presumably only influences processes launched afterwards - confirm.
    os.environ['PYTHONHASHSEED'] = str(random_seed)

    seeders = (
        np.random.seed,
        tf.random.set_seed,
        random.seed,
        keras.utils.set_random_seed,
    )
    for apply_seed in seeders:
        apply_seed(random_seed)

In [None]:
def move_to_working_folder(source_path, destination_path):
    """Copy ``source_path`` to ``destination_path``.

    Despite the name, the source file is left in place: the work is done by
    ``shutil.copy``, which copies rather than moves.

    Args:
        source_path: Path of the file to copy.
        destination_path: Target file or directory path.
    """
    shutil.copy(src=source_path, dst=destination_path)

# Preprocess Input

In [None]:
# Instruction prefixes placed in front of the prompt question and the summary.
prefix1 = "Think through this step by step: "
prefix2 = "Pay attention to the content and wording: "


def _build_head_mask(input_ids, sep_token_id, pad_token_id):
    """Return a float16 mask marking the span opened by each odd separator.

    A position is 1.0 when an odd number of separator tokens has been seen up
    to and including it, and no padding token has been seen yet; every other
    position is 0.0.

    Args:
        input_ids: 2-D integer NumPy array of token ids, one row per text.
        sep_token_id: Id of the tokenizer's separator token.
        pad_token_id: Id of the tokenizer's padding token.

    Returns:
        A float16 NumPy array with the same shape as ``input_ids``.
    """
    is_sep = input_ids == sep_token_id
    # The separator check takes precedence: a token that is both separator
    # and padding toggles the span instead of ending the row.
    is_pad = (input_ids == pad_token_id) & ~is_sep
    inside_span = np.cumsum(is_sep, axis=1) % 2 == 1
    before_padding = np.cumsum(is_pad, axis=1) == 0
    return (inside_span & before_padding).astype(np.float16)


# This function creates input ids, attention mask and head mask
def preprocess(summary, prompt_question, prompt_text, tokenizer, is_demo=False):
    """Tokenize prompt/summary text and build the matching head mask.

    Each input text is laid out as
    ``prefix1 + question + SEP + prefix2 + spell-corrected summary + SEP + prompt text``
    and tokenized to exactly ``CFG.max_length`` tokens (truncated or padded).
    Special tokens are not added automatically; the separators written into
    the text are the only ones present.

    The head mask is 1.0 from the first separator up to (not including) the
    second one, i.e. over ``prefix2`` and the summary, and 0.0 elsewhere.

    Args:
        summary: A single string when ``is_demo`` is true; otherwise an object
            providing ``.apply`` (e.g. a pandas Series of summaries).
        prompt_question: Question text, combined with ``summary`` via ``+``.
        prompt_text: Prompt text, combined with ``summary`` via ``+``.
        tokenizer: Tokenizer exposing ``sep_token``, ``sep_token_id``,
            ``pad_token_id``, ``__call__`` and ``batch_encode_plus``.
        is_demo: When true, process one example instead of a batch.

    Returns:
        ``[input_ids, attention_mask, head_mask]`` as NumPy arrays; the head
        mask is float16.
    """
    sep = f" {tokenizer.sep_token} "

    # Tokenizer options shared by the single-example and the batch paths.
    encode_kwargs = dict(
        add_special_tokens=False,
        truncation=True,
        padding='max_length',
        return_tensors='tf',
        max_length=CFG.max_length,
        return_attention_mask=True,
    )

    corrected = spell(summary) if is_demo else summary.apply(spell)
    text = prefix1 + prompt_question + sep + prefix2 + corrected + sep + prompt_text

    if is_demo:
        tokenized = tokenizer(text, **encode_kwargs)
    else:
        tokenized = tokenizer.batch_encode_plus(text.tolist(), **encode_kwargs)

    input_ids = tokenized['input_ids'].numpy()
    attention_mask = tokenized['attention_mask'].numpy()

    # Vectorized replacement for a per-token Python loop over the whole batch.
    head_mask = _build_head_mask(
        input_ids, tokenizer.sep_token_id, tokenizer.pad_token_id
    )
    return [input_ids, attention_mask, head_mask]

# Define Model

In [None]:
# Creates a model that wraps the pre-trained model
@keras.utils.register_keras_serializable()
class PreTrainedModel(keras.Model):
    """Keras model wrapping a Hugging Face transformer and its tokenizer.

    The transformer is loaded from ``model_path + "model"`` and the tokenizer
    from ``model_path + "tokenizer"``. Calling the model returns the hidden
    states of the transformer.

    Args:
        model_path: Directory prefix containing the ``model`` and
            ``tokenizer`` sub-folders.
        trainable: Whether the transformer weights are trainable.
        num_layers_to_freeze: When ``trainable`` is true, number of leading
            encoder layers that are kept frozen.
        name: Optional Keras model name.
        **kwargs: Forwarded to ``keras.Model.__init__``.
    """

    def __init__(self, model_path, trainable=False, num_layers_to_freeze=0, name=None, **kwargs):
        super().__init__(name=name, **kwargs)
        self.model_path = model_path
        self.trainable = trainable
        self.num_layers_to_freeze = num_layers_to_freeze

        # Load model and tokenizer. The dropout probabilities are passed as
        # config overrides at load time: the dropout layers are created while
        # the model is being built, so editing `model.config` afterwards
        # would not change them.
        self.model = TFAutoModel.from_pretrained(
            model_path + "model",
            hidden_dropout_prob=CFG.hidden_dropout_prob,
            attention_probs_dropout_prob=CFG.attention_probs_dropout_prob,
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_path + "tokenizer")

        self.model.trainable = self.trainable

        # When fine-tuning, keep the first `num_layers_to_freeze` encoder
        # layers frozen.
        if self.trainable:
            for layer in self.model.layers[0].encoder.layer[:self.num_layers_to_freeze]:
                layer.trainable = False

    def call(self, input_ids, attention_mask):
        """Run the transformer and return all of its hidden states.

        Args:
            input_ids: Token ids passed to the transformer.
            attention_mask: Attention mask passed to the transformer.

        Returns:
            The ``hidden_states`` attribute of the transformer output,
            requested with ``output_hidden_states=True``.
        """
        output = self.model(input_ids=input_ids, attention_mask=attention_mask, output_hidden_states=True)
        return output.hidden_states

    def get_config(self):
        """Return the base config extended with this class's constructor args."""
        config = super().get_config().copy()
        config.update({
            'model_path': self.model_path,
            'trainable': self.trainable,
            'num_layers_to_freeze': self.num_layers_to_freeze
        })
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the model by passing ``config`` as constructor keywords."""
        return cls(**config)
    
@keras.utils.register_keras_serializable()
def build_deberta():
    """Return a ``PreTrainedModel`` named "deberta_layer".

    The model is loaded from ``CFG.pre_trained_model_name`` with the
    constructor's default arguments.
    """
    backbone_path = CFG.pre_trained_model_name
    return PreTrainedModel(backbone_path, name="deberta_layer")

In [None]:
# Define layers for head mask step

@keras.utils.register_keras_serializable()
class ExpandDimsLayer(layers.Layer):
    """Layer that casts its input to float32 and appends a trailing axis."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, inputs):
        """Return ``inputs`` as float32 with one extra last dimension."""
        as_float = tf.cast(inputs, dtype=tf.float32)
        return tf.expand_dims(as_float, axis=-1)

@keras.utils.register_keras_serializable()
class MaskedEmbeddingsLayer(layers.Layer):
    """Layer that multiplies a hidden state element-wise by a mask."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, inputs):
        """Return the product of the two tensors in ``inputs``.

        Args:
            inputs: A pair ``(hidden_state, mask)``.
        """
        embeddings, mask = inputs
        masked = tf.multiply(embeddings, mask)
        return masked

### Define Loss Function

In [None]:
# The loss function
@keras.utils.register_keras_serializable()
def mcrmse(y_true, y_pred):
    """Mean column-wise root mean squared error.

    The squared error is averaged over axis 0, square-rooted per column, and
    the resulting values are averaged over the last axis.

    Args:
        y_true: Target values.
        y_pred: Predicted values.

    Returns:
        The loss as a float16 tensor.
    """
    # NOTE(review): both tensors are cast to float16 before the reduction;
    # confirm that this precision is intended for the loss.
    targets = tf.cast(y_true, tf.float16)
    outputs = tf.cast(y_pred, tf.float16)
    squared_error = tf.square(targets - outputs)
    per_column_rmse = tf.sqrt(tf.reduce_mean(squared_error, axis=0))
    return tf.reduce_mean(per_column_rmse, axis=-1)

# Generate Predictions

In [None]:
def generate_predictions(model, data):
    """Predict content and wording scores for every row of ``data``.

    Args:
        model: Object exposing a Keras-style ``predict(x=..., batch_size=...)``.
        data: Mapping or DataFrame providing ``'input_ids'``,
            ``'attention_mask'``, ``'head_mask'`` and ``'student_id'``.

    Returns:
        A tuple ``(ids, contents, wordings)``: the student ids as a list, and
        two NumPy arrays holding ``exp(prediction) - 3`` for output columns
        0 and 1 respectively.
    """
    predictions = model.predict(
        x=[data['input_ids'], data['attention_mask'], data['head_mask']],
        batch_size=CFG.batch_size,
    )

    # Look ids up by position. Plain `[idx]` on a pandas Series is a label
    # lookup, which fails or misaligns when `data` does not carry a default
    # 0..n-1 index (e.g. a subset of a larger frame). Lists and arrays have
    # no `.iloc` and are indexed directly.
    student_ids = data['student_id']
    by_position = getattr(student_ids, 'iloc', student_ids)
    ids = [by_position[row] for row in range(len(predictions))]

    # NOTE(review): exp(x) - 3 presumably inverts a log(y + 3) scaling applied
    # to the training targets - confirm against the training code.
    contents = np.exp([output[0] for output in predictions]) - 3
    wordings = np.exp([output[1] for output in predictions]) - 3

    return ids, contents, wordings