
# （NLP下游任务）基于Fnet的文本生成

基于 Transformer 的模型在理解和处理序列方面表现出色，因为它们利用了一种称为“自注意力”的机制。这涉及仔细检查每个标记以辨别其与序列中每个其他标记的关系。尽管自注意力很有效，但它的缺点在于计算成本。对于长度为 N 的序列，自注意力需要 N^2 次操作，从而导致二次缩放。这在计算上可能很昂贵且耗时，尤其是对于长句子，这会对序列长度施加限制，例如标准 BERT 模型中的 512 个标记约束。

已经出现了许多方法来解决二次缩放的计算效率低下问题。解决这一挑战的最新创新是 FNet，它完全取代了自注意力层。FNet 引入了一种替代机制，与传统的自注意力范式不同，旨在实现处理序列的可比或增强的性能。在本文中，我们将重点介绍使用 Pytorch 在 Python 中实现用于文本生成的 FNet 架构。

## **FNet**


The Transformer architecture is renowned for its dominance in natural language processing (NLP). It uses a core component, the attention mechanism, which connects input tokens by weighing their relevance to each other. While various studies have probed the Transformer and its attention sublayers, the computational cost of self-attention remains a challenge, particularly for long sequences.

In response to this challenge, a recent innovation, FNet, introduces a novel approach by replacing the self-attention layer entirely. Instead of self-attention, FNet utilizes simpler token mixing mechanisms, such as parameterized matrix multiplications and, remarkably, the Fourier transform. Unlike traditional self-attention, the Fourier transform has no parameters yet achieves comparable performance, scaling efficiently to long sequences due to the Fast Fourier transform (FFT) algorithm.

In [11]:
import torch
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

cuda


使用 wikitext 语料库来训练数据。
![data](https://img2.imgtp.com/2024/05/30/FsscINxY.png)

In [35]:
from datasets import load_dataset
datasets = load_dataset('wikitext','wikitext-2-raw-v1')

## 清理数据

Decalare 一个 preprocess_text 函数将
- 将所有单词变为小写
- 删除任何特殊字符
- 替换任何多个空格

使用 map 函数执行上述预处理

使用 filter 函数仅保留长度大于 20 的数据

In [36]:
import re


def preprocess_text(sentence):
	# lowering the sentence and storing in text vaiable
	text = sentence['text'].lower()
	# removing other than characters and punctuations
	text = re.sub('[^a-z?!.,]', ' ', text)
	text = re.sub('\s\s+', ' ', text) # removing double spaces
	sentence['text'] = text
	return sentence


datasets['train'] = datasets['train'].map(preprocess_text)
datasets['test'] = datasets['test'].map(preprocess_text)
datasets['validation'] = datasets['validation'].map(preprocess_text)

datasets['train'] = datasets['train'].filter(lambda x: len(x['text']) > 20)
datasets['test'] = datasets['test'].filter(lambda x: len(x['text']) > 20)
datasets['validation'] = datasets['validation'].filter(
	lambda x: len(x['text']) > 20)


## tokenizer

使用 hugging face 中预先训练的 tokenizer。代码使用 AutoTokenizer.from_pretrained 加载预先训练的 tokenizer (distilbert-base-uncased-finetuned-sst-2-english)。
声明一个 tokenizer 函数来标记输入。此函数将一个句子作为输入，使用加载的 tokenizer 对其进行标记，并返回标记后的句子。
代码使用数据集库中的 map 函数对测试数据集中的输入句子进行标记。然后使用 remove_columns 方法删除原始文本列，仅留下标记后的输入。
然后，可以在模型训练或评估期间使用此 DataLoader 迭代标记和填充的输入序列批次。DataCollat​​orWithPadding 确保每个批次中的序列都填充到该批次中最长的序列的长度。

In [37]:
from torch.utils.data import DataLoader
from transformers import DataCollatorWithPadding
from transformers import AutoTokenizer

checkpoint = "distilbert-base-uncased-finetuned-sst-2-english"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)


# Tokenizer
def tokenize(sentence):
	sentence = tokenizer(sentence['text'], truncation=True)
	return sentence


tokenized_inputs = datasets['test'].map(tokenize)
tokenized_inputs = tokenized_inputs.remove_columns(['text'])


# DataCollator
batch = 16
data_collator = DataCollatorWithPadding(
	tokenizer=tokenizer, padding=True, return_tensors="pt")
dataloader = DataLoader(
	tokenized_inputs, batch_size=batch, collate_fn=data_collator)


## 嵌入位置编码

创建两个类

- 位置编码负责生成 Transformer 模型中使用的位置编码。
- PositionalEmbedding类将 token 作为输入并首先嵌入。然后将其与位置编码相结合，这对于在 Transformer 模型中捕获顺序信息至关重要。

In [38]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.fft as fft
import numpy as np
import pandas as pd

class PositionalEncoding(torch.nn.Module):


	def __init__(self, d_model, max_sequence_length):
		super().__init__()
		self.d_model = d_model
		self.max_sequence_length = max_sequence_length
		self.positional_encoding = self.create_positional_encoding().to(device)

	def create_positional_encoding(self):

		# Initialize positional encoding matrix
		positional_encoding = np.zeros((self.max_sequence_length, self.d_model))

		# Calculate positional encoding for each position and each dimension
		for pos in range(self.max_sequence_length):
			for i in range(0, self.d_model, 2):
				# Apply sin to even indices in the array; indices in Python start at 0 so i is even.
				positional_encoding[pos, i] = np.sin(pos / (10000 ** ((2 * i) / self.d_model)))

				if i + 1 < self.d_model:
					# Apply cos to odd indices in the array; we add 1 to i because indices in Python start at 0.
					positional_encoding[pos, i + 1] = np.cos(pos / (10000 ** ((2 * i) / self.d_model)))

		# Convert numpy array to PyTorch tensor and return it
		return torch.from_numpy(positional_encoding).float()

	def forward(self, x):
		expanded_tensor = torch.unsqueeze(self.positional_encoding, 0).expand(x.size(0), -1, -1).to(device)

		return x.to(device) + expanded_tensor[:,:x.size(1), :]

class PositionalEmbedding(nn.Module):
    def __init__(self, sequence_length, vocab_size, embed_dim):
        super(PositionalEmbedding, self).__init__()
        self.token_embeddings = nn.Embedding(vocab_size, embed_dim)
        self.position_embeddings = PositionalEncoding(embed_dim,sequence_length)

    def forward(self, inputs):
        embedded_tokens = self.token_embeddings(inputs).to(device)
        embedded_positions = self.position_embeddings(embedded_tokens).to(device)
        return embedded_positions.to(device)


创建 FNet 编码器

下面的类根据 Fnet 架构实现 Fnet 编码器

此编码器层将傅里叶变换作为处理输入序列的关键组件。

将傅里叶变换应用于输入序列，并将结果的实部添加到原始输入。

随后进行层规范化和密集投影，最后的结果再次进行规范化。

Initialization （__init__ 方法）：


构造函数使用 embed_dim（嵌入维度）和 density_dim（中间密集层的维度）等参数初始化编码器层。定义一个 nn.Sequential 块（self.dense_proj），由两个线性层组成，中间有 ReLU 激活，用于将输入投影到不同的维度。创建了两个 nn.LayerNorm 实例（self.layernorm_1 和 self.layernorm_2），每个实例在前向传递中的特定操作之后应用。


Forward Pass（前向方法）：


前向方法将输入作为输入，代表编码器输入。对输入应用傅里叶变换（fft.fft2），提取结果的实部（fft_result.real.float()）。
将原始输入添加到傅里叶变换结果的实部，并应用层归一化（self.layernorm_1）以获得proj_input。将中间密集投影（self.dense_proj）应用于proj_input，并将结果添加到proj_input。应用最终层归一化（self.layernorm_2），并返回结果。


In [39]:
class FNetEncoder(nn.Module):

    def __init__(self,embed_dim, dense_dim):
        super(FNetEncoder,self).__init__()
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.dense_proj = nn.Sequential(nn.Linear(self.embed_dim,self.dense_dim), nn.ReLU(), nn.Linear(self.dense_dim,self.embed_dim))

        self.layernorm_1 = nn.LayerNorm(self.embed_dim)
        self.layernorm_2 = nn.LayerNorm(self.embed_dim)

    def forward(self,inputs):

        fft_result = fft.fft2(inputs)

        #taking real part
        fft_real = fft_result.real.float()

        proj_input = self.layernorm_1 (inputs + fft_real)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input +proj_output)


第 7 步：创建 FnetDecoder

decoder基于transformer

解码器层采用多种注意机制、层规范化和密集投影来捕获依赖关系并通过解码过程转换信息。

第一个多头注意将传入解码器的输入作为其查询、键和值的输入。

第二个多头注意将第一个多头注意的输入作为其查询向量，将编码器输出作为其键和值向量。

每个步骤后使用层规范化有助于稳定和规范中间表示。

初始化（__init__ 方法）：

构造函数使用 embed_dim（嵌入维度）、dense_dim（中间密集层的维度）和 num_heads（注意头的数量）等参数初始化解码器层。

创建了 nn.MultiheadAttention 的两个实例（self.attention_1 和 self.attention_2），每个实例的输入和输出维度均为 embed_dim，num_heads 为 num_heads。定义一个 nn.Sequential 块 (self.dense_proj)，它由两个线性层组成，中间有 ReLU 激活，用于将输出投影到不同的维度。创建三个 nn.LayerNorm 实例 (self.layernorm_1、self.layernorm_2 和 self.layernorm_3)，每个实例在前向传递中的特定操作之后应用。

前向传递（前向方法）：

前向方法将输入（解码器输入）、encoder_outputs（编码器的输出）和可选掩码作为输入。
​使用 nn.Transformer.generate_square_subsequent_mask 生成因果掩码，以防止关注未来的标记。此掩码应用于第一个注意机制 (self.attention_1)。
第一个注意力机制 (self.attention_1) 关注解码器输入（输入）并应用层规范化 (self.layernorm_1)。结果添加到原始输入以形成 out_1。
如果提供了掩码（在训练期间可用），第二个注意机制 (self.attention_2) 将使用键填充掩码 (key_padding_mask) 的注意应用于编码器输出 (encoder_outputs)。否则，它将执行不带任何掩码的注意。
将结果添加到 out_1，并应用层规范化 (self.layernorm_2) 以获得 out_2。
将中间密集投影 (self.dense_proj) 应用于 out_2，并将结果添加到 out_2。应用最终层规范化 (self.layernorm_3)，并返回结果。

In [40]:
class FNetDecoder(nn.Module):

    def __init__(self,embed_dim,dense_dim,num_heads):
        super(FNetDecoder,self).__init__()
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads

        self.attention_1 = nn.MultiheadAttention(embed_dim,num_heads,batch_first=True)
        self.attention_2 = nn.MultiheadAttention(embed_dim,num_heads,batch_first=True)

        self.dense_proj = nn.Sequential(nn.Linear(embed_dim, dense_dim),nn.ReLU(),nn.Linear(dense_dim, embed_dim))

        self.layernorm_1 = nn.LayerNorm(embed_dim)
        self.layernorm_2 = nn.LayerNorm(embed_dim)
        self.layernorm_3 = nn.LayerNorm(embed_dim)

    def forward(self, inputs, encoder_outputs, mask=None):
        causal_mask = nn.Transformer.generate_square_subsequent_mask(inputs.size(1)).to(device)

        attention_output_1, _ = self.attention_1(inputs, inputs, inputs, attn_mask=causal_mask)
        out_1 = self.layernorm_1(inputs + attention_output_1)

        if mask != None:
            attention_output_2, _ = self.attention_2(out_1, encoder_outputs, encoder_outputs, key_padding_mask =torch.transpose(mask, 0, 1).to(device))
        else:
            attention_output_2, _ = self.attention_2(out_1, encoder_outputs, encoder_outputs)
            out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)


In [41]:
class FNetModel(nn.Module):
	def __init__(self, max_length, vocab_size, embed_dim, latent_dim, num_heads):
		super(FNetModel, self).__init__()

		self.encoder_inputs = PositionalEmbedding(max_length,vocab_size, embed_dim)
		self.encoder1 = FNetEncoder(embed_dim, latent_dim)
		self.encoder2 = FNetEncoder(embed_dim, latent_dim)
		self.encoder3 = FNetEncoder(embed_dim, latent_dim)
		self.encoder4 = FNetEncoder(embed_dim, latent_dim)


		self.decoder_inputs = PositionalEmbedding(max_length,vocab_size, embed_dim)
		self.decoder1 = FNetDecoder(embed_dim, latent_dim, num_heads)
		self.decoder2 = FNetDecoder(embed_dim, latent_dim, num_heads)
		self.decoder3 = FNetDecoder(embed_dim, latent_dim, num_heads)
		self.decoder4 = FNetDecoder(embed_dim, latent_dim, num_heads)


		self.dropout = nn.Dropout(0.5)
		self.dense = nn.Linear(embed_dim, vocab_size)

	def encoder(self,encoder_inputs):
		x_encoder = self.encoder_inputs(encoder_inputs)
		x_encoder = self.encoder1(x_encoder)
		x_encoder = self.encoder2(x_encoder)
		x_encoder = self.encoder3(x_encoder)
		x_encoder = self.encoder4(x_encoder)
		return x_encoder

	def decoder(self,decoder_inputs,encoder_output,att_mask):
		x_decoder = self.decoder_inputs(decoder_inputs)
		x_decoder = self.decoder1(x_decoder, encoder_output,att_mask) ## HERE for inference
		x_decoder = self.decoder2(x_decoder, encoder_output,att_mask) ## HERE for inference
		x_decoder = self.decoder3(x_decoder, encoder_output,att_mask) ## HERE for inference
		x_decoder = self.decoder4(x_decoder, encoder_output,att_mask) ## HERE for inference
		decoder_outputs = self.dense(x_decoder)

		return decoder_outputs

	def forward(self, encoder_inputs, decoder_inputs,att_mask = None):
		encoder_output = self.encoder(encoder_inputs)
		decoder_output = self.decoder(decoder_inputs,encoder_output,att_mask=None)
		return decoder_output


超参数

In [42]:

MAX_LENGTH = 512
VOCAB_SIZE = len(tokenizer.vocab)
EMBED_DIM = 256
LATENT_DIM = 100
NUM_HEADS = 4

fnet_model = FNetModel(MAX_LENGTH, VOCAB_SIZE, EMBED_DIM, LATENT_DIM, NUM_HEADS).to(device)


1. **定义优化器和损失函数**：使用Adam优化器来更新模型参数，并使用交叉熵损失（CrossEntropyLoss）作为损失函数。

2. **训练模型**：模型被训练10个epoch（完整遍历训练数据集10次）。

3. **批次处理**：使用dataloader将训练数据集按批次进行迭代处理。

4. **提取输入和目标序列**：
    - `encoder_inputs_tensor`：输入序列
    - `decoder_inputs_tensor`：目标序列，其中decoder输入序列右移一个位置，用于teacher forcing技术（在序列生成任务中使用的策略）。

5. **应用注意力掩码（attention mask）**：对输入序列应用掩码，以处理填充（padding），掩码中有效token设为True，填充token设为False。

6. **零梯度**：使用`optimizer.zero_grad()`将优化器的梯度置零，为新的反向传播（backward pass）做准备。

7. **生成预测**：使用模型（fnet_model）基于encoder和decoder输入生成预测（outputs）。

8. **掩码目标序列**：创建目标序列的掩码版本，将填充位置设置为-100，以排除这些位置对损失的贡献。

9. **计算损失**：计算模型输出和掩码目标序列之间的交叉熵损失。

10. **累积损失**：将损失累积到`train_loss`变量中。

11. **反向传播**：使用`loss.backward()`进行反向传播，计算梯度。

12. **更新优化器**：使用`optimizer.step()`更新优化器。

![lossup](https://img2.imgtp.com/2024/05/30/1shvjduz.png)

![loss](https://img2.imgtp.com/2024/05/30/7roo3vta.png)

In [43]:
# # Define your optimizer and loss function
optimizer = torch.optim.Adam(fnet_model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=0)

epochs = 4
for epoch in range(epochs):
	train_loss = 0
	for batch in dataloader:
		encoder_inputs_tensor = batch['input_ids'][:,:-1].to(device)
		decoder_inputs_tensor = batch['input_ids'][:,1:].to(device)

		att_mask = batch['attention_mask'][:,:-1].to(device).to(dtype=bool)
		optimizer.zero_grad()
		outputs = fnet_model(encoder_inputs_tensor, decoder_inputs_tensor,att_mask)
		decoder_inputs_tensor.masked_fill(batch['attention_mask'][:,1:].ne(1).to(device), -100).to(device)

		loss = criterion(outputs.reshape(-1, VOCAB_SIZE), decoder_inputs_tensor.reshape(-1))
		train_loss = train_loss + loss.item()
		loss.backward()
		optimizer.step()
	print (f" epoch: {epoch}, train_loss : {train_loss}")


 epoch: 0, train_loss : 602.0846122503281
 epoch: 1, train_loss : 161.70500153303146
 epoch: 2, train_loss : 42.63783755898476
 epoch: 3, train_loss : 9.68971714284271


In [47]:
MAX_LENGTH = 100

def decode_sentence(input_sentence, fnet_model):
    fnet_model.eval()

    with torch.no_grad():
        tokenized_input_sentence = torch.tensor(tokenizer(preprocess_text(input_sentence)['text'])['input_ids']).to(device)#
        tokenzied_target_sentence = torch.tensor([101]).to(device) # '[CLS]' token
        current_text = preprocess_text(input_sentence)['text']
        for i in range(MAX_LENGTH):
            predictions = fnet_model(tokenized_input_sentence[:-1].unsqueeze(0),tokenzied_target_sentence.unsqueeze(0))
            predicted_index = torch.argmax(predictions[0, -1, :]).item()
            predicted_token = tokenizer.decode(predicted_index)
            if predicted_token == "[SEP]":  # Assuming [end] is the end token
              break
            current_text += " "+ predicted_token
            tokenized_target_sentence = torch.cat([tokenzied_target_sentence, torch.tensor([predicted_index]).to(device)], 0).to(device)
            tokenized_input_sentence = torch.tensor(tokenizer(current_text)['input_ids']).to(device)
        return current_text
decode_sentence({'text': 'hello? can u hear me,'}, fnet_model)

'hello? can u hear me, coaches ##cap tank ##cap republic influence republic signed republic republic influence placed ##cap placed influence republic ##cap ##cap coaches signed placed placed republic ##cap placed ##cap coaches ##cap coaches ##cap ##cap placed ##cap placed placed ##cap coaches ##cap ##cap placed ##cap placed placed ##cap placed ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap ##cap'