<a href="https://colab.research.google.com/github/ZYF-B/Pytorch_learning/blob/main/LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install datasets

Collecting datasets
  Downloading datasets-2.20.0-py3-none-any.whl.metadata (19 kB)
Collecting pyarrow>=15.0.0 (from datasets)
  Downloading pyarrow-17.0.0-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (3.3 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting requests>=2.32.2 (from datasets)
  Downloading requests-2.32.3-py3-none-any.whl.metadata (4.6 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.4.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.5.0,>=2023.1.0 (from fsspec[http]<=2024.5.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.5.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-2.20.0-py3-none-any.whl (547 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m547.8/547.8 kB[0m [31m23.5 MB/s[0m eta [36m0:00:00

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from datasets import load_dataset
import matplotlib.pyplot as plt
%matplotlib inline


# Seed PyTorch's global RNG so weight initialisation, batch sampling (torch.randint)
# and text sampling (torch.multinomial) start from the same state on every run.
torch.manual_seed(1024)

<torch._C.Generator at 0x7815c43f1fb0>

In [None]:
# Hyperparameters shared by the cells below.
device = 'cpu'
if torch.cuda.is_available():
    device = 'cuda'          # prefer the GPU whenever one is visible

sequence_len = 64            # tokens per training window
batch_size = 128             # windows drawn per batch
eval_iters = 100             # batches averaged per split when estimating the loss
learning_rate = 1e-3         # learning rate handed to the optimizer

In [None]:
# Load the tiny_shakespeare corpus; the first (index 0) `text` entry of the
# train and validation splits is used as that split's text.
# trust_remote_code=True accepts the dataset's loading script up front, so the
# cell no longer stops at an interactive "[y/N]" prompt during Restart & Run All.
# NOTE(review): this executes code downloaded from the Hub; inspect
# https://hf.co/datasets/tiny_shakespeare before trusting it.
raw_datasets = load_dataset('tiny_shakespeare', trust_remote_code=True)
train_data = raw_datasets['train']['text'][0]
val_data = raw_datasets['validation']['text'][0]

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Downloading builder script:   0%|          | 0.00/3.73k [00:00<?, ?B/s]

Downloading readme:   0%|          | 0.00/6.10k [00:00<?, ?B/s]

The repository for tiny_shakespeare contains custom code which must be executed to correctly load the dataset. You can inspect the repository content at https://hf.co/datasets/tiny_shakespeare.
You can avoid this prompt in future by passing the argument `trust_remote_code=True`.

Do you wish to run the custom code? [y/N] y


Downloading data:   0%|          | 0.00/435k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/1 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/1 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/1 [00:00<?, ? examples/s]

In [None]:
class CharTokenizer:
    """Character-level tokenizer with one reserved end-of-text token.

    Every distinct character of the corpus receives an index starting at 1
    (assigned in sorted order); the special token '<|e|>' receives ``end_ind``.
    """

    def __init__(self, data, end_ind=0):
        """Build the vocabulary.

        Args:
            data: a string or an iterable of strings; the vocabulary is the
                set of characters occurring in it.
            end_ind: index reserved for the end token '<|e|>'.

        Raises:
            ValueError: if ``end_ind`` is already taken by a character index,
                which would silently overwrite that character in ``ind2char``.
        """
        # Collect every distinct character of the corpus.
        chars = sorted(set(''.join(data)))
        self.char2ind = {s: i + 1 for i, s in enumerate(chars)}
        if end_ind in self.char2ind.values():
            raise ValueError(
                f"end_ind={end_ind!r} collides with a character index "
                f"(1..{len(chars)}); choose an index outside that range"
            )
        self.char2ind['<|e|>'] = end_ind
        self.ind2char = {v: k for k, v in self.char2ind.items()}
        self.end_ind = end_ind

    def encode(self, x):
        """Map a string to a list of indices, one per character.

        Raises:
            KeyError: if ``x`` contains a character absent from the vocabulary.
        """
        return [self.char2ind[i] for i in x]

    def decode(self, x):
        """Map indices back to tokens.

        Args:
            x: a single ``int`` or an iterable of indices.

        Returns:
            One token string for an ``int`` input, otherwise a list of token
            strings (join them with ``''.join`` to obtain text).
        """
        if isinstance(x, int):
            return self.ind2char[x]
        return [self.ind2char[i] for i in x]

# Build the vocabulary from the training text only.
tokenizer = CharTokenizer(train_data)
# Quick sanity check: encode a short string and inspect the vocabulary.
test_str = 'RES'
# NOTE(review): `re` is also the name of the stdlib regex module; consider renaming.
re = tokenizer.encode(test_str)
print(re)
print(len(tokenizer.char2ind))  # vocabulary size, end token included
# Decode every index in order to display the whole vocabulary as one string.
''.join(tokenizer.decode(range(len(tokenizer.char2ind))))

[31, 18, 32]
66


"<|e|>\n !$&',-.3:;?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"

In [None]:
@torch.no_grad()
def generate(model, context, tokenizer, max_new_tokens=300):
    """Sample a continuation of ``context`` one token at a time.

    Args:
        model: module mapping token ids of shape (1, T) to logits of shape
            (1, T, vocab_size).
        context: tensor of token ids with shape (1, T) used as the prompt.
        tokenizer: object exposing ``end_ind``; sampling stops as soon as that
            index is drawn.
        max_new_tokens: upper bound on the number of sampled tokens.

    Returns:
        list[int]: the prompt ids followed by the sampled ids (the end token
        is included when it was drawn).
    """
    out = context.tolist()[0]
    # Remember the caller's mode so a model that was already in eval mode is
    # not silently switched back to training (which would re-enable dropout).
    was_training = model.training
    model.eval()
    try:
        for _ in range(max_new_tokens):
            # The whole sequence is fed again at every step; only the logits
            # of the last position are used for sampling.
            logits = model(context)                       # (1, T, vs)
            probs = F.softmax(logits[:, -1, :], dim=-1)   # (1, vs)
            ix = torch.multinomial(probs, num_samples=1)  # (1, 1)
            context = torch.concat((context, ix), dim=-1)
            out.append(ix.item())
            if out[-1] == tokenizer.end_ind:
                break
    finally:
        model.train(was_training)
    return out

In [None]:
# Encode each split once into a 1-D tensor of token ids (int64) so batches can
# later be cut out by plain slicing.
train_datas, val_datas = (
    torch.tensor(tokenizer.encode(text), dtype=torch.long)
    for text in (train_data, val_data)
)

In [None]:
def get_batch(split, tokenizer):
    """Draw a random batch of (input, target) windows from one split.

    ``split == 'train'`` reads from ``train_datas``; any other value reads
    from ``val_datas``. Each target window is its input window shifted one
    position to the right. ``tokenizer`` is accepted but not used.

    Returns:
        (x, y): two tensors of shape (batch_size, sequence_len) on ``device``.
    """
    source = val_datas if split != 'train' else train_datas
    # Exclusive upper bound keeps start + sequence_len + 1 within the data.
    starts = torch.randint(len(source) - sequence_len, (batch_size,))
    inputs, targets = [], []
    for start in starts.tolist():
        stop = start + sequence_len
        inputs.append(source[start:stop])
        targets.append(source[start + 1:stop + 1])
    return torch.stack(inputs).to(device), torch.stack(targets).to(device)

In [None]:
@torch.no_grad()
def estimate_loss(model, tokenizer):
    """Estimate the mean cross-entropy on the train and validation splits.

    For each split, ``eval_iters`` random batches are drawn with ``get_batch``
    and their losses averaged, with the model in eval mode.

    Args:
        model: module mapping token ids (B, T) to logits (B, T, vocab_size).
        tokenizer: forwarded to ``get_batch``.

    Returns:
        dict: ``{'train': tensor, 'val': tensor}`` holding scalar mean losses.
    """
    out = {}
    # Remember the caller's mode so evaluating a model that is already in eval
    # mode does not leave it in training mode afterwards.
    was_training = model.training
    model.eval()
    try:
        for split in ['train', 'val']:
            losses = torch.zeros(eval_iters)
            for k in range(eval_iters):
                X, Y = get_batch(split, tokenizer)
                logits = model(X)
                # cross_entropy wants the class dimension second:
                # (B, T, vs) -> (B, vs, T) against targets of shape (B, T).
                loss = F.cross_entropy(logits.transpose(-2, -1), Y)
                losses[k] = loss.item()
            out[split] = losses.mean()
    finally:
        model.train(was_training)
    return out

In [None]:
def train(model, tokenizer, optimizer, max_step=5000, eval_step=200):
    """Run ``max_step`` optimisation steps on random training batches.

    Every ``eval_step`` steps, and again on the final step, the estimated
    train/validation losses are printed before the parameter update.
    """
    final_step = max_step - 1
    for step in range(max_step):
        if step % eval_step == 0 or step == final_step:
            losses = estimate_loss(model, tokenizer=tokenizer)
            print(f"step {step}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

        inputs, targets = get_batch('train', tokenizer=tokenizer)
        # Move the class dimension to position 1, as cross_entropy expects.
        class_first_logits = model(inputs).transpose(-2, -1)
        batch_loss = F.cross_entropy(class_first_logits, targets)

        optimizer.zero_grad(set_to_none=True)
        batch_loss.backward()
        optimizer.step()

In [None]:
class LSTMCell(nn.Module):
    """One LSTM time step built from four linear layers over [input, hidden]."""

    def __init__(self, input_size, hidden_size):
        """Create the gate layers.

        Args:
            input_size: number of features of the per-step input (I).
            hidden_size: number of features of the hidden and cell state (H).
        """
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        combined_size = hidden_size + input_size
        self.forget_gate = nn.Linear(combined_size, hidden_size)     # forget gate
        self.in_gate = nn.Linear(combined_size, hidden_size)         # input gate
        self.new_cell_state = nn.Linear(combined_size, hidden_size)  # candidate cell state
        self.out_gate = nn.Linear(combined_size, hidden_size)        # output gate

    def forward(self, input, state=None):
        """Advance the cell by one step.

        Args:
            input: tensor of shape (B, I).
            state: optional ``(hs, cs)`` pair, each of shape (B, H). When
                omitted, a zero state is created on the input's device with
                the dtype of the cell's parameters.

        Returns:
            ``(hs, cs)``: the new hidden and cell state, each of shape (B, H).
        """
        B = input.shape[0]
        if state is None:
            # Follow the parameter dtype so a model cast to half/bfloat16/double
            # does not receive a float32 state it cannot multiply with.
            state = self.init_state(B, input.device, self.in_gate.weight.dtype)
        hs, cs = state
        combined = torch.concat((input, hs), dim=-1)   # (B, I + H)
        # Cell-state update
        ingate = F.sigmoid(self.in_gate(combined))
        forgetgate = F.sigmoid(self.forget_gate(combined))
        ncs = F.tanh(self.new_cell_state(combined))
        cs = (cs * forgetgate) + (ingate * ncs)
        # Hidden-state update
        outgate = F.sigmoid(self.out_gate(combined))
        hs = F.tanh(cs) * outgate
        return hs, cs

    def init_state(self, B, device, dtype=None):
        """Return an all-zero ``(hs, cs)`` pair, each of shape (B, H).

        Args:
            B: batch size.
            device: device on which to allocate the state.
            dtype: dtype of the state; ``None`` uses PyTorch's default dtype.
        """
        hs = torch.zeros((B, self.hidden_size), device=device, dtype=dtype)
        cs = torch.zeros((B, self.hidden_size), device=device, dtype=dtype)
        return hs, cs

In [None]:
class LSTM(nn.Module):
    """Unrolls a single LSTMCell along the time axis of a batch-first input."""

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.cell = LSTMCell(input_size, hidden_size)

    def forward(self, input, state=None):
        """Run the cell over every time step and collect the hidden states.

        Args:
            input: tensor of shape (B, T, C).
            state: optional initial ``(hs, cs)`` pair, each of shape (B, H),
                handed to the cell at the first step.

        Returns:
            Tensor of shape (B, T, H) holding the hidden state of each step.
        """
        _, steps, _ = input.shape
        hidden_per_step = []
        for t in range(steps):
            step_input = input.select(1, t)        # (B, C)
            state = self.cell(step_input, state)
            hidden_per_step.append(state[0])
        return torch.stack(hidden_per_step, dim=1)

In [None]:
class CharLSTM(nn.Module):
    """Character-level language model: embedding, three LSTM layers, linear head.

    The output of each LSTM layer goes through dropout and then LayerNorm
    before feeding the next layer.
    """

    def __init__(self, vs, emb_size=256, hidden_size=128, dropout=0.4):
        """Build the layers.

        Args:
            vs: vocabulary size (number of embeddings and of output logits).
            emb_size: embedding dimension fed to the first LSTM layer.
            hidden_size: hidden dimension shared by the three LSTM layers.
            dropout: dropout probability applied after each LSTM layer.
        """
        super().__init__()
        self.emb_size = emb_size
        self.hidden_size = hidden_size
        self.emb = nn.Embedding(vs, self.emb_size)
        self.dp = nn.Dropout(dropout)
        self.lstm1 = LSTM(self.emb_size, self.hidden_size)
        self.ln1 = nn.LayerNorm(self.hidden_size)
        self.lstm2 = LSTM(self.hidden_size, self.hidden_size)
        self.ln2 = nn.LayerNorm(self.hidden_size)
        self.lstm3 = LSTM(self.hidden_size, self.hidden_size)
        self.ln3 = nn.LayerNorm(self.hidden_size)
        self.lm = nn.Linear(self.hidden_size, vs)

    def forward(self, x):
        """Map token ids of shape (B, T) to logits of shape (B, T, vs)."""
        embeddings = self.emb(x)   # (B, T, C)
        h = self.ln1(self.dp(self.lstm1(embeddings)))  # (B, T, H)
        h = self.ln2(self.dp(self.lstm2(h)))           # (B, T, H)
        h = self.ln3(self.dp(self.lstm3(h)))           # (B, T, H)
        output = self.lm(h)
        return output

In [None]:
# One output class per vocabulary entry; move the model to the selected device.
model = CharLSTM(vs=len(tokenizer.char2ind)).to(device)
# Display the architecture together with the total parameter count.
(model, sum(param.numel() for param in model.parameters()))

(CharLSTM(
   (emb): Embedding(66, 256)
   (dp): Dropout(p=0.4, inplace=False)
   (lstm1): LSTM(
     (cell): LSTMCell(
       (forget_gate): Linear(in_features=384, out_features=128, bias=True)
       (in_gate): Linear(in_features=384, out_features=128, bias=True)
       (new_cell_state): Linear(in_features=384, out_features=128, bias=True)
       (out_gate): Linear(in_features=384, out_features=128, bias=True)
     )
   )
   (ln1): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
   (lstm2): LSTM(
     (cell): LSTMCell(
       (forget_gate): Linear(in_features=256, out_features=128, bias=True)
       (in_gate): Linear(in_features=256, out_features=128, bias=True)
       (new_cell_state): Linear(in_features=256, out_features=128, bias=True)
       (out_gate): Linear(in_features=256, out_features=128, bias=True)
     )
   )
   (ln2): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
   (lstm3): LSTM(
     (cell): LSTMCell(
       (forget_gate): Linear(in_features=256, out_feat

In [None]:
# Sample from the model before training, starting from the prompt 'def'.
context = torch.tensor([tokenizer.encode('def')], device=device)   # (1, T)
print(*tokenizer.decode(generate(model, context, tokenizer)), sep='')

def?$BmLIvZBuA'ecx zYT$eMVY.wYQgvP!og<|e|>


In [None]:
# Loss of the model before any training step has been taken.
estimate_loss(model=model, tokenizer=tokenizer)

{'train': tensor(4.4365), 'val': tensor(4.4424)}

In [None]:
# Train with AdamW at the configured learning rate; step counts use train()'s defaults.
train(
    model,
    tokenizer,
    optim.AdamW(model.parameters(), lr=learning_rate),
)

step 0: train loss 4.4369, val loss 4.4433
step 200: train loss 1.9862, val loss 2.0627
step 400: train loss 1.8126, val loss 1.9346
step 600: train loss 1.7109, val loss 1.8522
step 800: train loss 1.6626, val loss 1.8120
step 1000: train loss 1.6160, val loss 1.7620
step 1200: train loss 1.5946, val loss 1.7368
step 1400: train loss 1.5605, val loss 1.7071
step 1600: train loss 1.5498, val loss 1.6900
step 1800: train loss 1.5305, val loss 1.6792
step 2000: train loss 1.5201, val loss 1.6667
step 2200: train loss 1.5095, val loss 1.6601
step 2400: train loss 1.5050, val loss 1.6503
step 2600: train loss 1.4994, val loss 1.6436
step 2800: train loss 1.4882, val loss 1.6342
step 3000: train loss 1.4807, val loss 1.6331
step 3200: train loss 1.4795, val loss 1.6315
step 3400: train loss 1.4727, val loss 1.6284
step 3600: train loss 1.4648, val loss 1.6145
step 3800: train loss 1.4601, val loss 1.6076
step 4000: train loss 1.4550, val loss 1.6166
step 4200: train loss 1.4541, val loss 1.

In [None]:
# Sample up to 500 new characters from the trained model, starting from the prompt 'B'.
context = torch.tensor([tokenizer.encode('B')], device=device)   # (1, T)
print(*tokenizer.decode(generate(model, context, tokenizer, max_new_tokens=500)), sep='')

But, is ribour?

QUEEN MARGARET:
And may greater, thy haes, and the fable I
The gates to his majesty?

PETRUCHIO:
How slays now, our night mine brother's doom:
Which they grave betay of well decair of us.

GLOUCESTER:
Sir, the services, the thousand prayer of vistances have
in this daughters; such such what make our answer of the rugh,
And over a than his command by a sent done with
Be joy in second our charity; and by her,
Which he country of mean his most side.

QUEEN ELIZABETH:
I have aman, to
