### ELECTRA : Efficiently Learning an Encodeer that classifies Token Replacement Accurately

- [ELECTRA: Pre-training Text Encoders as Discriminators Rather Than Generators, ICLR’20](https://arxiv.org/abs/2003.10555)
- 작성자 : 220200013 이해중


기존의 BERT, GPT-2 모델과 달리 **다른 형태(GAN : generative adversial network과 유사)** 로 pre-trained 된 모델로 ELECTRA 학습을 위한 전체 모델 구조는 Genrator와 Discriminator로, 두 개의 네트워크를 필요로 한다.

- Generator : `BERT`에서 사용된 마스크 언어 모델(**M**asked **L**anguage **M**odel)과 동일하게 구성됨 (언어의 특성을 잘 학습할 수 있도록 함)
- Discriminator : Discriminator는 입력 토큰 시퀀스에 대해서 각 토큰이 **original**인지 **replaced**인지 binary classification Task

#### Generator 

1. 입력 $\bf{x}$ = $[x_1, x_2, ... , x_n]$에 대해서 마스킹할 위치의 집합 $\bf{m}$ = $[m_1, m_2, m_3, ... , m_k]$을 결정한다.
    - 모든 마스킹 위치는 $1$과 $n$ 사이의 정수로, 다음과 같이 uniform distribution을 사용하여 할당할 수 있다.
    - $m_i \sim uniform (1, n) \text{ for } i = 1 \text{ to } k $
    - 마스킹할 $k$ 수는 보통 $\frac{15}{100}\times n$을 사용한다. (전체 토큰의 15%)
    
    
    
2. 결정한 위치에 있는 입력 토큰을 $[MASK]$로 치환한다.
    - 이 과정을 $\bf{x}^{masked} = REPLACE$ $(\bf{x}, \bf{m},$ $[MASK] )$로 표현
3. 마스킹 된 입력 $\bf{x}^{masked}$에 대해서 **generator**는 아래와 같이 원래의 토큰이 무엇인지 예측할 수 있다.
    - 이 과정을 다음과 같이 표현할 수 있다. ($t$ 번째 토큰에 대한 예측)
    
$$
p_G\left(x_t \mid \bf{x}^{masked}\right)=\exp \left(e\left(x_t\right)^T h_G(\bf{x}^{masked})_t\right) / \sum_{x^{\prime}} \exp \left(e\left(x^{\prime}\right)^T h_G(\bf{x}^{masked})_t\right)
$$

 위 식에서 $e(\cdot)$는 임베딩을 의미하며, 위 식은 language model의 출력 레이어와 임베딩 레이어의 가중치를 공유(weight sharing)하겠다는 의미

4. 최종적으로 아래와 같은 MLM loss로 학습한다.

$$
\mathcal{L}_{\mathrm{MLM}}\left(\mathbf{x}, \theta_G\right)=\mathbb{E}\left(\sum_{i \in \mathbf{m}}-\log p_G\left(x_i \mid \mathbf{x}^{\text {masked }}\right)\right)
$$

![image.png](https://github.com/DeepHaeJoong/SGU_2022_NLP/blob/master/image/electra_01.png?raw=true)

####  Discriminator

Discriminator는 입력 토큰 시퀀스에 대해서 각 토큰이 **original**인지 **replaced**인지 binary classification으로 학습하며, 구체적인 학습 매커니즘은 다음과 같다.

1. Generator를 이용해서 마스킹 된 입력 토큰을 예측하게 된다. (Generator의 학습 메커니즘 1~3 단계 해당됨)
2. Generator에서 마스킹할 위치의 집합 $\bf{m}$에 해당되는 위치의 토큰을 $[MASK]$가 아닌 generator의 softmax 분포 $p_{G}(x_t|\bf{x})$d에 대해 샘플링한 토큰으로 치환(corrupt)함
    - Original input : [**'the'** , 'chef', **'cooked'** , 'the', 'meal']
    - Masked input (Input for generator) : [**[MASK]**, 'chef', **[MASK]**, 'the', 'meal']
    - Input (Input for discriminator) : [$\color{blue}{\text{'the'}}$, 'chef', $\color{red}{\text{'ate'}}$, 'the', 'meal']
    
        - 첫 번째 단어 'the'는 샘플링했는데 원래 입력 토큰 'the'와 동일하게 출력된 경우
        - 세 번째 단어 'ate'는 샘플링했는데 원래 입력 토큰 'cooked'와 다르게 출력된 경우
        
    - 이러한 치환 과정은 다음과 같이 정리할 수 있다.
    
$$
\begin{gathered}
\mathbf{x}^{\text {corrupt }}=\mathrm{REPLACE}(\mathbf{x}, \mathbf{m}, \hat{\mathbf{x}}) \\
\hat{\mathbf{x}} \sim p_G\left(x_i \mid \mathbf{x}^{\text {masked }}\right) \text { for } i \in \mathbf{m}
\end{gathered}
$$

3. 치환된 입력 $\bf{x}^{corrupt}$에 대해서 discriminator는 아래와 같이 각 토큰이 원래 입력과 동일한지 치환된 것인지 예측하게 된다.

    - Target class 
        - Original : 이 위치에 해당하는 토큰은 원본 문장의 토큰과 같은 것
        - Replaced : 이 위치에 해당하는 토큰은 generator에 의해 변형된 것
        
    - 이런 과정을 구학적으로 표현하면 다음과 같다. ($t$번째 토큰에 대한 예측)
$$
D\left(\mathbf{x}^{\text {corrupt }}, t\right)=\operatorname{sigmoid}\left(w^T h_D\left(\mathbf{x}^{\text {corrupt }}\right)_t\right)
$$

4. 최종적으로 Discriminator의 Loss는 다음과 같다.

$$
\begin{aligned}
& \mathcal{L}_{D i s c}\left(\mathbf{x}, \theta_D\right) \\
& =\mathbb{E}\left(\sum_{t=1}^n-1\left(x_t^{\text {corrupt }}=x_t\right) \log D\left(\mathbf{x}^{\text {corrupt }}, t\right)-1\left(x_t^{\text {corrupt }} \neq x_t\right) \log \left(1-D\left(\mathbf{x}^{\text {corrupt }}, t\right)\right)\right. \\
& ) \\
&
\end{aligned}
$$

최종적으로 ELECTRA는 대용량 corpus $\mathcal{X}$에 대해서 generator loss와 discriminator loss의 합을 최소화하는 방향으로 학습하게 된다. $\lambda = 50$으로 설정했다고 한다.

$$
\min _{\theta_G, \theta_D} \sum_{\mathbf{x} \in \mathcal{X}} \mathcal{L}_{\mathrm{MLM}}\left(\mathbf{x}, \theta_G\right)+\lambda \mathcal{L}_{\text {Disc }}\left(\mathbf{x}, \theta_D\right)
$$

Generator 에서 샘플링 과정이 있기 때문에 discriminator loss는 generator로 역전파 되지 않으며, 위의 구조로 pre-training을 마친 뒤에 generator는 버리고 discriminator만 취해서 downstream task으로 fine-tuning을 진행한다.

### code

The following example uses reformer-pytorch, which is available to be pip installed.

In [2]:
#!pip install electra-pytorch
#!pip install reformer_pytorch

#### Electra class 정의

In [6]:
import math
from functools import reduce
from collections import namedtuple

import torch
from torch import nn
import torch.nn.functional as F

# constants

# Electra class는 loss, MLM loss, ..등의 다양한 값들을 tuple type으로 return 함  
Results = namedtuple('Results', [
    'loss',
    'mlm_loss',
    'disc_loss',
    'gen_acc',
    'disc_acc',
    'disc_labels',
    'disc_predictions'
])

# helpers

def log(t, eps=1e-9):
    return torch.log(t + eps)

def gumbel_noise(t):
    noise = torch.zeros_like(t).uniform_(0, 1)
    return -log(-log(noise))

def gumbel_sample(t, temperature = 1.):
    return ((t / temperature) + gumbel_noise(t)).argmax(dim=-1)

def prob_mask_like(t, prob):
    return torch.zeros_like(t).float().uniform_(0, 1) < prob

def mask_with_tokens(t, token_ids):
    init_no_mask = torch.full_like(t, False, dtype=torch.bool)
    mask = reduce(lambda acc, el: acc | (t == el), token_ids, init_no_mask)
    return mask

def get_mask_subset_with_prob(mask, prob):
    batch, seq_len, device = *mask.shape, mask.device
    max_masked = math.ceil(prob * seq_len)

    num_tokens = mask.sum(dim=-1, keepdim=True)
    mask_excess = (mask.cumsum(dim=-1) > (num_tokens * prob).ceil())
    mask_excess = mask_excess[:, :max_masked]

    rand = torch.rand((batch, seq_len), device=device).masked_fill(~mask, -1e9)
    _, sampled_indices = rand.topk(max_masked, dim=-1)
    sampled_indices = (sampled_indices + 1).masked_fill_(mask_excess, 0)

    new_mask = torch.zeros((batch, seq_len + 1), device=device)
    new_mask.scatter_(-1, sampled_indices, 1)
    return new_mask[:, 1:].bool()

In [23]:
class HiddenLayerExtractor(nn.Module):
    def __init__(self, net, layer = -2):
        super().__init__()
        self.net = net
        self.layer = layer

        self.hidden = None
        self.hook_registered = False

    def _find_layer(self):
        if type(self.layer) == str:
            modules = dict([*self.net.named_modules()])
            return modules.get(self.layer, None)
        elif type(self.layer) == int:
            children = [*self.net.children()]
            return children[self.layer]
        return None

    def _hook(self, _, __, output):
        self.hidden = output

    def _register_hook(self):
        layer = self._find_layer()
        assert layer is not None, f'hidden layer ({self.layer}) not found'
        handle = layer.register_forward_hook(self._hook)
        self.hook_registered = True

    def forward(self, x):
        if self.layer == -1:
            return self.net(x)

        if not self.hook_registered:
            self._register_hook()

        _ = self.net(x)
        hidden = self.hidden
        self.hidden = None
        assert hidden is not None, f'hidden layer {self.layer} never emitted an output'
        return hidden

# main electra class

class Electra(nn.Module):
    def __init__(
        self,
        generator,
        discriminator,
        *,
        num_tokens = None,
        discr_dim = -1,
        discr_layer = -1,
        mask_prob = 0.15,
        replace_prob = 0.85,
        random_token_prob = 0.,
        mask_token_id = 2,
        pad_token_id = 0,
        mask_ignore_token_ids = [],
        disc_weight = 50.,
        gen_weight = 1.,
        temperature = 1.):
        super().__init__()

        self.generator = generator
        self.discriminator = discriminator

        if discr_dim > 0:
            self.discriminator = nn.Sequential(
                HiddenLayerExtractor(discriminator, layer = discr_layer),
                nn.Linear(discr_dim, 1)
            )

        # mlm related probabilities
        self.mask_prob = mask_prob
        self.replace_prob = replace_prob

        self.num_tokens = num_tokens
        self.random_token_prob = random_token_prob

        # token ids
        self.pad_token_id = pad_token_id
        self.mask_token_id = mask_token_id
        self.mask_ignore_token_ids = set([*mask_ignore_token_ids, pad_token_id])

        # sampling temperature
        self.temperature = temperature

        # loss weights
        self.disc_weight = disc_weight
        self.gen_weight = gen_weight


    def forward(self, input, **kwargs):
        b, t = input.shape

        replace_prob = prob_mask_like(input, self.replace_prob)

        # do not mask [pad] tokens, or any other tokens in the tokens designated to be excluded ([cls], [sep])
        # also do not include these special tokens in the tokens chosen at random
        no_mask = mask_with_tokens(input, self.mask_ignore_token_ids)
        mask = get_mask_subset_with_prob(~no_mask, self.mask_prob)

        # get mask indices
        mask_indices = torch.nonzero(mask, as_tuple=True)

        # mask input with mask tokens with probability of `replace_prob` (keep tokens the same with probability 1 - replace_prob)
        masked_input = input.clone().detach()

        # set inverse of mask to padding tokens for labels
        gen_labels = input.masked_fill(~mask, self.pad_token_id)

        # clone the mask, for potential modification if random tokens are involved
        # not to be mistakened for the mask above, which is for all tokens, whether not replaced nor replaced with random tokens
        masking_mask = mask.clone()

        # if random token probability > 0 for mlm
        if self.random_token_prob > 0:
            assert self.num_tokens is not None, 'Number of tokens (num_tokens) must be passed to Electra for randomizing tokens during masked language modeling'

            random_token_prob = prob_mask_like(input, self.random_token_prob)
            random_tokens = torch.randint(0, self.num_tokens, input.shape, device=input.device)
            random_no_mask = mask_with_tokens(random_tokens, self.mask_ignore_token_ids)
            random_token_prob &= ~random_no_mask
            masked_input = torch.where(random_token_prob, random_tokens, masked_input)

            # remove random token prob mask from masking mask
            masking_mask = masking_mask & ~random_token_prob

        # [mask] input
        masked_input = masked_input.masked_fill(masking_mask * replace_prob, self.mask_token_id)

        # get generator output and get mlm loss
        logits = self.generator(masked_input, **kwargs)

        mlm_loss = F.cross_entropy(
            logits.transpose(1, 2),
            gen_labels,
            ignore_index = self.pad_token_id
        )

        # use mask from before to select logits that need sampling
        sample_logits = logits[mask_indices]

        # sample
        sampled = gumbel_sample(sample_logits, temperature = self.temperature)

        # scatter the sampled values back to the input
        disc_input = input.clone()
        disc_input[mask_indices] = sampled.detach()

        # generate discriminator labels, with replaced as True and original as False
        disc_labels = (input != disc_input).float().detach()

        # get discriminator predictions of replaced / original
        non_padded_indices = torch.nonzero(input != self.pad_token_id, as_tuple=True)

        # get discriminator output and binary cross entropy loss
        disc_logits = self.discriminator(disc_input, **kwargs)
        disc_logits = disc_logits.reshape_as(disc_labels)

        disc_loss = F.binary_cross_entropy_with_logits(
            disc_logits[non_padded_indices],
            disc_labels[non_padded_indices]
        )

        # gather metrics
        with torch.no_grad():
            gen_predictions = torch.argmax(logits, dim=-1)
            disc_predictions = torch.round((torch.sign(disc_logits) + 1.0) * 0.5)
            gen_acc = (gen_labels[mask] == gen_predictions[mask]).float().mean()
            disc_acc = 0.5 * (disc_labels[mask] == disc_predictions[mask]).float().mean() + 0.5 * (disc_labels[~mask] == disc_predictions[~mask]).float().mean()

        # return weighted sum of losses
        return Results(self.gen_weight * mlm_loss + self.disc_weight * disc_loss, mlm_loss, disc_loss, gen_acc, disc_acc, disc_labels, disc_predictions)

#### generator 및 discriminator instance 정의  

- `reformer_pytorch` library에서 제공되는 "Efficient Transformer" 모델을 사용함


(1) instantiate the generator and discriminator, making sure that the generator is roughly a quarter to a half of the size of the discriminator : (일반적으로 generator의 크기는 discriminator 크기의 1/4배로 설정)

In [21]:
from reformer_pytorch import ReformerLM

In [18]:
generator = ReformerLM(
    num_tokens = 20000,
    emb_dim = 128,
    dim = 256,              # smaller hidden dimension
    heads = 4,              # less heads
    ff_mult = 2,            # smaller feed forward intermediate dimension
    dim_head = 64,
    depth = 12,
    max_seq_len = 1024
)

In [19]:
discriminator = ReformerLM(
    num_tokens = 20000,
    emb_dim = 128,
    dim = 1024,
    dim_head = 64,
    heads = 16,
    depth = 12,
    ff_mult = 4,
    max_seq_len = 1024
)

#### generator와 discriminator의 embedding 설정

(2) weight tie the token and positional embeddings of generator and discriminator

In [20]:
generator.token_emb = discriminator.token_emb
generator.pos_emb = discriminator.pos_emb

#### instantiate electra 

(3) instantiate electra


In [24]:
trainer = Electra(
    generator,
    discriminator,
    discr_dim = 1024,           # the embedding dimension of the discriminator
    discr_layer = 'reformer',   # the layer name in the discriminator, whose output would be used for predicting token is still the same or replaced
    mask_token_id = 2,          # the token id reserved for masking
    pad_token_id = 0,           # the token id for padding
    mask_prob = 0.15,           # masking probability for masked language modeling
    mask_ignore_token_ids = []  # ids of tokens to ignore for mask modeling ex. (cls, sep)
)

#### train 과정 예시

In [25]:
data = torch.randint(0, 20000, (1, 1024))

results = trainer(data)
results.loss.backward()

In [26]:
results

Results(loss=tensor(47.5575, grad_fn=<AddBackward0>), mlm_loss=tensor(9.9434, grad_fn=<NllLoss2DBackward0>), disc_loss=tensor(0.7523, grad_fn=<BinaryCrossEntropyWithLogitsBackward0>), gen_acc=tensor(0.), disc_acc=tensor(0.5189), disc_labels=tensor([[0., 0., 0.,  ..., 0., 0., 0.]]), disc_predictions=tensor([[1., 0., 0.,  ..., 1., 1., 1.]]))

### Reference 

- https://github.com/lucidrains/electra-pytorch
- https://github.com/lucidrains/reformer-pytorch