In [None]:
# model

import math
import torch
import torch.nn as nn
import torch.nn.functional as F

class FactorizedEmbedding(nn.Module):
    """Lookup table that maps integer token ids to dense vectors.

    Despite the name, no factorization is performed: the module wraps a
    single ``nn.Embedding`` of shape ``(vocab_size, embed_dim)``.

    Args:
        vocab_size: Number of distinct token ids.
        embed_dim: Size of each embedding vector.
        max_sequence_length: Stored on the instance; ``forward`` does not
            use it.
    """

    def __init__(self, vocab_size, embed_dim, max_sequence_length=300):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        # Plain bookkeeping attributes, kept for callers that inspect them.
        self.max_sequence_length = max_sequence_length
        self.embed_dim = embed_dim
        self.vocab_size = vocab_size

    def forward(self, input):
        """Return the embedding vectors for the integer ids in ``input``."""
        return self.embedding(input)

class EncoderLayer(nn.Module):
    """Encoder block: self-attention then feed-forward, each post-normalised.

    Every sub-layer is applied as ``norm(residual + dropout(sublayer(x)))``.
    Inputs and outputs are batch-first, ``(batch, seq, embed_dim)``.

    Args:
        embed_dim: Feature size of the input and output.
        num_heads: Number of attention heads.
        hidden_dim: Width of the hidden feed-forward layer.
        dropout: Dropout probability, used inside the attention module and on
            each sub-layer output.
    """

    def __init__(self, embed_dim, num_heads, hidden_dim, dropout):
        super().__init__()
        # batch_first=True: the rest of the file handles tensors as
        # (batch, seq, dim). With the seq-first default the attention would
        # run across the batch axis and mix unrelated samples. The flag does
        # not change any parameter name, so saved state_dicts still load.
        self.self_attn = nn.MultiheadAttention(
            embed_dim, num_heads, dropout=dropout, batch_first=True
        )
        # The feed-forward output is squashed by a Sigmoid before the
        # residual add.
        self.ffn = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.LeakyReLU(),
            nn.Linear(hidden_dim, embed_dim),
            nn.Sigmoid(),
        )
        # One LayerNorm (a single set of affine parameters) is shared by both
        # sub-layers; the attribute name is part of the state_dict layout.
        self.norm = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply the block to ``x`` of shape ``(batch, seq, embed_dim)``.

        Returns:
            A tensor with the same shape as ``x``.
        """
        # Self-attention sub-layer
        residual = x
        x, _ = self.self_attn(x, x, x)
        x = self.norm(residual + self.dropout(x))

        # Feed-forward sub-layer
        residual = x
        x = self.ffn(x)
        x = self.norm(residual + self.dropout(x))

        return x

class Encoder(nn.Module):
    """A stack of ``num_layers`` ``EncoderLayer`` blocks applied in sequence.

    Args:
        num_layers: Number of encoder blocks.
        embed_dim: Feature size passed to every block.
        num_heads: Number of attention heads in every block.
        hidden_dim: Feed-forward hidden width in every block.
        dropout: Dropout probability in every block.
    """

    def __init__(self, num_layers, embed_dim, num_heads, hidden_dim, dropout):
        super().__init__()
        blocks = []
        for _ in range(num_layers):
            blocks.append(EncoderLayer(embed_dim, num_heads, hidden_dim, dropout))
        self.layers = nn.ModuleList(blocks)

    def forward(self, x):
        """Feed ``x`` through every block in order and return the result."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden)
        return hidden

class StochasticDecoderLayer(nn.Module):
    """Decoder block that injects Gaussian noise after the feed-forward step.

    Three sub-layers run in order: self-attention, cross-attention over
    ``encoder_output``, and a feed-forward network. Each is followed by a
    residual add and a LayerNorm. In the last sub-layer standard-normal noise
    is added on top of the residual sum, in training and in evaluation mode
    alike. Tensors are batch-first, ``(batch, seq, embed_dim)``.

    Args:
        embed_dim: Feature size of the input and output.
        num_heads: Number of attention heads.
        hidden_dim: Width of the hidden feed-forward layer.
        dropout: Dropout probability.
    """

    def __init__(self, embed_dim, num_heads, hidden_dim, dropout):
        super().__init__()
        # batch_first=True matches the (batch, seq, dim) layout used in the
        # rest of the file; it does not change any parameter name.
        self.self_attn = nn.MultiheadAttention(
            embed_dim, num_heads, dropout=dropout, batch_first=True
        )
        self.cross_attn = nn.MultiheadAttention(
            embed_dim, num_heads, dropout=dropout, batch_first=True
        )
        self.ffn = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.LeakyReLU(),
            nn.Linear(hidden_dim, embed_dim),
            nn.Sigmoid(),
        )
        # A single LayerNorm is shared by all three sub-layers; the attribute
        # name is part of the state_dict layout.
        self.norm = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, encoder_output):
        """Apply the block.

        Args:
            x: Decoder states, ``(batch, tgt_len, embed_dim)``.
            encoder_output: Memory attended to by the cross-attention,
                ``(batch, src_len, embed_dim)``.

        Returns:
            A tensor with the same shape as ``x``.
        """
        # Self-attention
        residual = x
        x, _ = self.self_attn(x, x, x)
        x = self.norm(residual + self.dropout(x))

        # Cross-attention
        residual = x
        x, _ = self.cross_attn(x, encoder_output, encoder_output)
        x = self.norm(residual + self.dropout(x))

        # Feed-forward + stochastic noise. The feed-forward output must be
        # part of the sum: previously it was only used for its shape, so the
        # FFN had no effect on the result and never received gradients.
        residual = x
        x = self.ffn(x)
        x = residual + self.dropout(x) + torch.randn_like(x)
        x = self.norm(x)

        return x

class DecoderLayer(nn.Module):
    """Decoder block: self-attention, cross-attention and feed-forward.

    Each sub-layer is applied as ``normN(residual + dropout(sublayer(x)))``
    with its own LayerNorm. Tensors are batch-first,
    ``(batch, seq, embed_dim)``.

    Args:
        embed_dim: Feature size of the input and output.
        num_heads: Number of attention heads.
        hidden_dim: Width of the hidden feed-forward layer.
        dropout: Dropout probability.
    """

    def __init__(self, embed_dim, num_heads, hidden_dim, dropout):
        super().__init__()
        # batch_first=True matches the (batch, seq, dim) layout used in the
        # rest of the file. With the seq-first default the attention would run
        # across the batch axis. Parameter names are unaffected.
        self.self_attn = nn.MultiheadAttention(
            embed_dim, num_heads, dropout=dropout, batch_first=True
        )
        self.cross_attn = nn.MultiheadAttention(
            embed_dim, num_heads, dropout=dropout, batch_first=True
        )
        # The feed-forward output passes through a ReLU before the residual
        # add, so this sub-layer can only add non-negative values.
        self.ffn = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.LeakyReLU(),
            nn.Linear(hidden_dim, embed_dim),
            nn.ReLU(),
        )
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)
        self.norm3 = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, encoder_output, tgt_mask=None):
        """Apply the block.

        Args:
            x: Decoder states, ``(batch, tgt_len, embed_dim)``.
            encoder_output: Memory for the cross-attention,
                ``(batch, src_len, embed_dim)``.
            tgt_mask: Optional mask forwarded to the self-attention as
                ``key_padding_mask``: shape ``(batch, tgt_len)``, where True
                marks key positions to ignore. It is a padding mask, not a
                causal (look-ahead) mask.

        Returns:
            A tensor with the same shape as ``x``.
        """
        # Self-attention
        residual = x
        attn_output, _ = self.self_attn(x, x, x, key_padding_mask=tgt_mask)
        x = self.norm1(residual + self.dropout(attn_output))

        # Cross-attention
        residual = x
        cross_attn_output, _ = self.cross_attn(x, encoder_output, encoder_output)
        x = self.norm2(residual + self.dropout(cross_attn_output))

        # Feed-forward
        residual = x
        x = self.ffn(x)
        x = self.norm3(residual + self.dropout(x))

        return x

class DecoderStochastic(nn.Module):
    """A stack of ``StochasticDecoderLayer`` blocks with length truncation.

    Args:
        num_layers: Number of blocks in the stack.
        embed_dim: Feature size passed to every block.
        num_heads: Number of attention heads in every block.
        hidden_dim: Feed-forward hidden width in every block.
        dropout: Dropout probability in every block.
        max_output_length: Upper bound on the size of dimension 1 kept after
            each block.
    """

    def __init__(self, num_layers, embed_dim, num_heads, hidden_dim, dropout, max_output_length):
        super().__init__()
        blocks = []
        for _ in range(num_layers):
            blocks.append(StochasticDecoderLayer(embed_dim, num_heads, hidden_dim, dropout))
        self.layers = nn.ModuleList(blocks)
        self.max_output_length = max_output_length

    def forward(self, x, encoder_output):
        """Run every block on ``x``, cropping dimension 1 after each one.

        The result is cropped to at most ``max_output_length`` entries along
        dimension 1; shorter inputs are not padded.
        """
        limit = self.max_output_length
        hidden = x
        for block in self.layers:
            hidden = block(hidden, encoder_output)[:, :limit]
        return hidden
    
class Decoder(nn.Module):
    """A stack of ``DecoderLayer`` blocks with a fixed output length.

    Args:
        num_layers: Number of blocks in the stack.
        embed_dim: Feature size passed to every block.
        num_heads: Number of attention heads in every block.
        hidden_dim: Feed-forward hidden width in every block.
        dropout: Dropout probability in every block.
        max_output_length: Size that dimension 1 is cropped or zero-padded to
            after every block.
    """

    def __init__(self, num_layers, embed_dim, num_heads, hidden_dim, dropout, max_output_length):
        super().__init__()
        blocks = []
        for _ in range(num_layers):
            blocks.append(DecoderLayer(embed_dim, num_heads, hidden_dim, dropout))
        self.layers = nn.ModuleList(blocks)
        self.max_output_length = max_output_length

    def forward(self, x, encoder_output):
        """Run every block, forcing dimension 1 to ``max_output_length``.

        After each block the result is cropped to ``max_output_length`` along
        dimension 1 and, if shorter, extended with zeros. The zero rows are
        fed to the following block like any other position.
        """
        target_len = self.max_output_length
        hidden = x
        for block in self.layers:
            # Crop to the maximum length
            hidden = block(hidden, encoder_output)[:, :target_len]

            # Zero-pad up to the maximum length
            shortfall = target_len - hidden.size(1)
            if shortfall > 0:
                filler = hidden.new_zeros(hidden.size(0), shortfall, hidden.size(2))
                hidden = torch.cat((hidden, filler), dim=1)

        return hidden

class Transformer(nn.Module):
    """Sequence-to-sequence model with a three-stage decoder.

    The source ids are embedded and encoded. The target ids are embedded once
    and that same embedding is the query input of all three decoder stages:
    ``decoder_1`` attends to the encoder output, ``decoder_stochastic``
    attends to the output of ``decoder_1``, and ``decoder_2`` attends to the
    output of ``decoder_stochastic``. A linear layer followed by a softmax
    turns the result into per-position probabilities over the vocabulary.

    Args:
        word_embedd_size: Vocabulary size (embedding rows and output classes).
        encoder_embed_dim: Embedding size on the encoder side.
        decoder_embed_dim: Embedding size on the decoder side.
        num_encoder_layers: Number of encoder blocks.
        num_decoder_layers: Total decoder depth. ``decoder_1`` and
            ``decoder_2`` always have 6 blocks each; the stochastic stage gets
            ``num_decoder_layers - 12`` blocks (none if that is <= 0).
        num_heads: Number of attention heads.
        hidden_dim: Feed-forward hidden width.
        dropout: Dropout probability.
        max_output_length: Length the target is padded/cropped to, and the
            generation limit.
    """

    # Source sequences are right-padded with id 0 (or cropped) to this length.
    max_input_length = 50

    def __init__(self, word_embedd_size, encoder_embed_dim, decoder_embed_dim, num_encoder_layers, num_decoder_layers, num_heads, hidden_dim, dropout, max_output_length):
        super().__init__()
        self.max_output_length = max_output_length
        self.embedding_encoder = FactorizedEmbedding(word_embedd_size, encoder_embed_dim)
        self.embedding_decoder = FactorizedEmbedding(word_embedd_size, decoder_embed_dim)
        self.encoder = Encoder(num_encoder_layers, encoder_embed_dim, num_heads, hidden_dim, dropout)
        self.decoder_stochastic = DecoderStochastic(num_decoder_layers-12, decoder_embed_dim, num_heads, hidden_dim, dropout, max_output_length)
        self.decoder_1 = Decoder(6, decoder_embed_dim, num_heads, hidden_dim, dropout, max_output_length)
        self.decoder_2 = Decoder(6, decoder_embed_dim, num_heads, hidden_dim, dropout, max_output_length)
        self.fc = nn.Linear(decoder_embed_dim, word_embedd_size)

    def _decode(self, tgt, encoder_output):
        """Return vocabulary probabilities for the target ids ``tgt``.

        ``tgt`` is right-padded with id 0 (or cropped) to
        ``max_output_length`` before it is embedded.
        """
        tgt = F.pad(tgt, (0, self.max_output_length - tgt.size(1)), value=0)
        decoder_embedded = self.embedding_decoder(tgt)
        decoder_output = self.decoder_1(decoder_embedded, encoder_output)
        decoder_output = self.decoder_stochastic(decoder_embedded, decoder_output)
        decoder_output = self.decoder_2(decoder_embedded, decoder_output)
        # Softmax turns the scores into a probability distribution
        return F.softmax(self.fc(decoder_output), dim=-1)

    @staticmethod
    def _mask_positions(output, mask):
        """Zero the probabilities at every position where ``mask`` is 0.

        ``mask`` is ``(batch, seq)``. Positions beyond the end of ``mask``
        are treated as masked.
        """
        keep = torch.zeros(output.shape[:2], dtype=torch.bool, device=output.device)
        width = min(mask.size(1), output.size(1))
        keep[:, :width] = mask[:, :width].to(output.device) != 0
        # The tensor holds probabilities, so masked entries become 0.0;
        # -inf is not a probability and turns any downstream loss into NaN.
        return output.masked_fill(~keep.unsqueeze(2), 0.0)

    def _generate(self, encoder_output, start_token=0, end_token=0):
        """Greedily decode token ids for the first sequence in the batch."""
        memory = encoder_output[:1]
        # Start from the start token and extend one token at a time
        generated_tokens = [start_token]
        with torch.no_grad():
            while len(generated_tokens) < self.max_output_length:
                # Feed the whole prefix, built on the same device as the model
                prefix = torch.tensor([generated_tokens], dtype=torch.long, device=memory.device)
                probs = self._decode(prefix, memory)
                # Read the distribution at the last real (non-padded) position
                # and take the most probable token
                next_token = int(torch.argmax(probs[0, len(generated_tokens) - 1]))
                generated_tokens.append(next_token)
                # Stop once the end token is produced
                if next_token == end_token:
                    break
        return generated_tokens

    def forward(self, src, tgt=None, tgt_mask=None, pad_mask=None):
        """Run the model in teacher-forcing or generation mode.

        Args:
            src: Source token ids, ``(batch, src_len)``.
            tgt: Target token ids, ``(batch, tgt_len)``. When omitted the
                model generates tokens itself.
            tgt_mask: Optional ``(batch, seq)`` mask; output positions where
                it is 0 are zeroed. Only used when ``tgt`` is given.
            pad_mask: Optional ``(batch, seq)`` mask, applied the same way.

        Returns:
            With ``tgt``: probabilities of shape
            ``(batch, max_output_length, vocab)``. Without ``tgt``: a list of
            int token ids for the first source sequence, beginning with the
            start token 0 and ending at the first generated 0 or at
            ``max_output_length`` tokens.
        """
        src = F.pad(src, (0, self.max_input_length - src.size(1)), value=0)
        encoder_embedded = self.embedding_encoder(src)
        encoder_output = self.encoder(encoder_embedded)

        if tgt is None:
            return self._generate(encoder_output)

        output = self._decode(tgt, encoder_output)
        if tgt_mask is not None:
            output = self._mask_positions(output, tgt_mask)
        if pad_mask is not None:
            output = self._mask_positions(output, pad_mask)
        return output

In [None]:
# tools

from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import re
import numpy as np
from torch.nn.utils.rnn import pad_sequence

PAD_TOKEN = 0

def load_vocab(file_path):
    """Read a UTF-8 vocabulary file and return its lines as a list.

    Line terminators are removed; a line's position in the list is the token
    id that the rest of the file associates with it.
    """
    with open(file_path, 'r', encoding='utf-8') as handle:
        return handle.read().splitlines()

def Trans_text(text, vocab, unk_token='2'):
    """Convert ``text`` to an array of token ids, one id per element.

    Each element of ``text`` (each character, for a string) is mapped to the
    index of its first occurrence in ``vocab``. Elements missing from
    ``vocab`` map to 1 if they are a single space and to 2 otherwise.

    Args:
        text: Iterable of symbols to encode, typically a string.
        vocab: Sequence of known symbols; the position is the token id.
        unk_token: Accepted for backward compatibility and ignored; unknown
            symbols always map to id 2.

    Returns:
        A 1-D ``numpy`` array of dtype ``int64``, empty for empty ``text``.
    """
    space_id = 1
    unk_id = 2

    # Build the symbol -> id table once rather than scanning the vocabulary
    # list for every character. setdefault keeps the first occurrence, which
    # is what list.index would return.
    token_ids = {}
    for index, symbol in enumerate(vocab):
        token_ids.setdefault(symbol, index)

    encoded = [
        token_ids[symbol] if symbol in token_ids
        else (space_id if symbol == " " else unk_id)
        for symbol in text
    ]
    return np.array(encoded, dtype=np.int64).flatten()

def encoder_input(src, encoder_vocab_path):
    """Encode the source text ``src`` into token ids.

    The vocabulary file at ``encoder_vocab_path`` is read on every call and
    ``src`` is converted with ``Trans_text``.
    """
    return Trans_text(src, load_vocab(encoder_vocab_path))

def decoder_input(tgt, decoder_vocab_path):
    """Encode the target text ``tgt`` into token ids.

    The vocabulary file at ``decoder_vocab_path`` is read on every call and
    ``tgt`` is converted with ``Trans_text``.
    """
    return Trans_text(tgt, load_vocab(decoder_vocab_path))

# Novel text dataset
class NovelDataset(Dataset):
    """Split a novel into consecutive 49-character chunks for training.

    ``'|'`` is appended to the text. Sample ``idx`` pairs chunk ``idx``
    followed by ``'§'`` (the source) with ``'|'`` followed by chunk
    ``idx + 1`` (the target). Both strings are tokenised with the vocabulary
    at the module-level ``embedd_vocab_path``, which is looked up when a
    sample is fetched.

    Args:
        text: The full novel text.
    """

    # Number of characters per chunk.
    CHUNK_SIZE = 49

    def __init__(self, text):
        text = text + "|"
        self.textLen = len(text)
        self.count = len(text) // self.CHUNK_SIZE
        self.src_text = text[:self.count * self.CHUNK_SIZE]
        self.tgt_text = text[self.CHUNK_SIZE:]

    def __len__(self):
        # The last full chunk has no complete successor, hence count - 1.
        # Clamp at 0: a text shorter than one chunk used to give -1, which
        # makes len() raise ValueError.
        return max(self.count - 1, 0)

    def __getitem__(self, idx):
        """Return ``(input_tokens, target_tokens)`` as int64 numpy arrays.

        Raises:
            IndexError: If ``idx`` is outside ``[0, len(self))``.
        """
        if not 0 <= idx < len(self):
            raise IndexError(f"index {idx} out of range for {len(self)} samples")

        start = idx * self.CHUNK_SIZE
        stop = start + self.CHUNK_SIZE
        src_data = self.src_text[start:stop] + '§'
        # '|' is the start/end symbol
        tgt = '|' + self.tgt_text[start:stop]

        # Tokenise both sides with the shared vocabulary
        input_tokens = encoder_input(src_data, embedd_vocab_path)
        target_tokens = decoder_input(tgt, embedd_vocab_path)

        input_tokens = np.array(input_tokens, dtype=np.int64)
        target_tokens = np.array(target_tokens, dtype=np.int64)

        return input_tokens, target_tokens

def collate_fn(data):
    """Batch ``(input_tokens, target_tokens)`` pairs into padded tensors.

    Args:
        data: A sequence of 2-tuples of token-id sequences.

    Returns:
        ``(padded_inputs, padded_targets)``: two ``torch.long`` tensors of
        shape ``(batch, longest_sequence)``, right-padded with 0. No padding
        mask is returned.
    """

    def _pad(sequences):
        # Convert each sequence to a long tensor, then pad to equal length
        tensors = [torch.tensor(seq, dtype=torch.long) for seq in sequences]
        return pad_sequence(tensors, batch_first=True)

    src_seqs, tgt_seqs = zip(*data)
    return _pad(src_seqs), _pad(tgt_seqs)

def create_tgt_mask(tgt):
    """Build a boolean mask with the same shape as ``tgt``.

    For a 2-D ``tgt`` of shape ``(batch, seq_len)`` the result is False in
    columns 0 and 1 and True in every later column, identically for each row.

    NOTE(review): this is not a ``(seq_len, seq_len)`` causal (look-ahead)
    attention mask, even though the name suggests one. Confirm the intended
    semantics with the callers.
    """
    length = tgt.size(1)

    # Start from an all-False mask shaped like the target
    mask = torch.zeros_like(tgt, dtype=torch.bool)

    # Upper-triangular part of a single row of ones: False at index 0, True
    # afterwards. It is written into columns 1 .. length-1.
    row = torch.ones(1, length - 1, dtype=torch.bool)
    mask[:, 1:length] = torch.triu(row, diagonal=1)

    return mask

# Draw the loss curve as a line chart
def plot_loss(loss_values):
    """Plot ``loss_values`` as a labelled line chart and display it.

    Args:
        loss_values: Sequence of loss values, one point per entry.
    """
    plt.plot(loss_values, label='Training Loss')
    plt.title('Training Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

In [None]:
# 參數初始化

# def weights_init(m):
#     if isinstance(m, nn.Linear):
#         nn.init.xavier_uniform_(m.weight.data)
#         if m.bias is not None:
#             nn.init.constant_(m.bias.data, 0)

# model.apply(weights_init)

In [None]:
import os
from torch.cuda.amp import autocast, GradScaler

# Model hyperparameters
word_embedd_size = 9713
encoder_embed_dim = 768
decoder_embed_dim = 768
num_encoder_layers = 6
num_decoder_layers = 18
num_heads = 16
hidden_dim = 1024
dropout = 0.1
max_output_length = 50

# Training environment
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# torch.cuda.amp autocast / GradScaler only apply on CUDA; disable them
# explicitly elsewhere.
use_amp = device.type == 'cuda'

# Model and gradient scaler
model = Transformer(word_embedd_size, encoder_embed_dim, decoder_embed_dim, num_encoder_layers, num_decoder_layers, num_heads, hidden_dim, dropout, max_output_length)
model = model.to(device)
scaler = GradScaler(enabled=use_amp)

# Training parameters, optimizer, criterion
embedd_vocab_path = os.path.join("vocab", "vocabularyDel.txt")
batch_size = 4
num_epochs = 20
learning_rate = 0.0005
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
# The model returns softmax probabilities, so the loss is the negative
# log-likelihood of their logarithm. CrossEntropyLoss would apply a second
# softmax on top.
criterion = torch.nn.NLLLoss()
loss_values = []

# Collect the paths (relative to folder_path) of every file in the data folder
folder_path = 'WebNovelTxt'
file_paths_list = []
for root, dirs, files in os.walk(folder_path):
    for file in files:
        file_path = os.path.relpath(os.path.join(root, file), folder_path)
        file_paths_list.append(file_path)

# Checkpoint directory; created up front so torch.save cannot fail on it
save_dir = 'save'
os.makedirs(save_dir, exist_ok=True)

step = 0

# Training
model.train()
for epoch in range(num_epochs):
    total_loss = 0.0
    num_batches = 0

    # Load the data one file at a time
    for i in file_paths_list:
        with open(os.path.join(folder_path, i), 'r', encoding='utf-8') as file:
            novel_data = file.read()

        # Create the Dataset and DataLoader
        dataset = NovelDataset(novel_data)

        # A file with fewer than two chunks yields no (source, target) pair.
        if dataset.count >= 2:
            dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

            for input_ids, target_ids in dataloader:
                input_ids = input_ids.to(device)
                target_ids = target_ids.to(device)

                # Clear the gradients
                optimizer.zero_grad()

                # Mixed precision to reduce memory use
                with autocast(enabled=use_amp):
                    # Forward pass.
                    # The output mask is deliberately not passed: it would
                    # blank the probabilities at the masked positions and
                    # corrupt the loss.
                    # NOTE(review): the decoder input and the labels are the
                    # same unshifted sequence and the decoder has no causal
                    # attention mask, so the model can copy its input.
                    # Confirm the intended teacher-forcing setup.
                    output = model(input_ids, target_ids, tgt_mask=None, pad_mask=None)

                    # The model pads the target to a fixed length; pad (or
                    # crop) the labels the same way so the shapes line up.
                    labels = F.pad(target_ids, (0, output.size(1) - target_ids.size(1)), value=PAD_TOKEN)

                    # Loss over class indices: (batch * seq, vocab) vs (batch * seq,)
                    log_probs = torch.log(output.float().clamp_min(1e-9))
                    loss = criterion(log_probs.reshape(-1, log_probs.size(-1)), labels.reshape(-1))

                scaler.scale(loss).backward()
                # Gradient clipping must see the real gradients, so undo the
                # loss scaling first.
                scaler.unscale_(optimizer)
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                scaler.step(optimizer)
                scaler.update()
                total_loss += loss.item()
                num_batches += 1

        # One step per processed file
        step += 1
        if step % 10 == 0:
            torch.save(model.state_dict(), os.path.join(save_dir, 'step' + str(step) + 'state_dict.pt'))
    torch.save(model.state_dict(), os.path.join(save_dir, str(epoch) + 'state_dict.pt'))

    # Guard against an epoch without any batch (e.g. an empty data folder)
    avg_loss = total_loss / num_batches if num_batches else float('nan')
    loss_values.append(avg_loss)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")

# Draw the loss curve
plot_loss(loss_values)

In [None]:
# Load saved model parameters

# Model hyperparameters (must match the ones the checkpoint was trained with)
word_embedd_size = 9713
encoder_embed_dim = 768
decoder_embed_dim = 768
num_encoder_layers = 6
num_decoder_layers = 18
num_heads = 16
hidden_dim = 1024
dropout = 0.1
max_output_length = 50
# Forward slashes work on Windows as well as on POSIX systems
embedd_vocab_path = "vocab/vocabularyDel.txt"

# Pick the device first so the checkpoint can be mapped onto it
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Build the model
model = Transformer(word_embedd_size, encoder_embed_dim, decoder_embed_dim, num_encoder_layers, num_decoder_layers, num_heads, hidden_dim, dropout, max_output_length)
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine
model.load_state_dict(torch.load('step700000state_dict.pt', map_location=device))

model = model.to(device)

In [None]:
# Load trained model weights (if not already loaded)
# model.load_state_dict(torch.load('trained_model.pth'))

# Evaluation mode
model.eval()

# Vocabulary text file (forward slashes work on Windows and POSIX)
vocab_file = "vocab/vocabularyDel.txt"

# Dictionary from token id (as a string) to word. The lines are read with
# load_vocab, the same reader used for encoding, so ids and words stay
# aligned and whitespace-only entries such as " " are kept intact.
vocab = {str(index): word for index, word in enumerate(load_vocab(vocab_file))}

# Example sentence
input_word = '我是範例句'

# Convert to tokens
input_tokens = encoder_input(input_word, embedd_vocab_path)
input_tokens = np.array(input_tokens, dtype=np.int64)
src = torch.tensor(input_tokens, dtype=torch.long)
src = src.reshape(1, -1)

# Required input length
desired_seq_length = 50

# Pad inputs that are too short, truncate inputs that are too long
if src.shape[1] < desired_seq_length:
    # Too short: pad on the right with id 0
    padding_length = desired_seq_length - src.shape[1]
    src = F.pad(src, (0, padding_length), value=0)
elif src.shape[1] > desired_seq_length:
    # Too long: keep the last desired_seq_length tokens
    src = src[:, -desired_seq_length:]

src = src.to(device)
# Maximum number of tokens to generate. The model returns one distribution
# per position up to its own fixed output length, so this must not exceed it.
generate_length = 50

# Start token for the target sequence
start_token = torch.tensor([[0]], dtype=torch.long).to(device)

# Generated token sequence, one (1, 1) tensor per token
generated_sequence = [start_token]

with torch.no_grad():
    while len(generated_sequence) < generate_length:
        # Feed the whole prefix generated so far, not just the last token
        decoder_input = torch.cat(generated_sequence, dim=1).long().to(device)
        token_probs = model(src, tgt=decoder_input)

        # The model pads its output to a fixed length; the prediction for the
        # next token sits at the last real (non-padded) position.
        next_token_probs = token_probs[:, len(generated_sequence) - 1, :]
        next_token = torch.multinomial(next_token_probs, 1)
        generated_sequence.append(next_token)
        # Stop when the end symbol is produced
        if next_token.item() == 0:
            break

# Convert the generated tokens back to text
generated_text = ''.join(vocab[str(token.item())] for token in generated_sequence)
print('------------------------------文章------------------------------')
print(generated_text)