In [1]:
import time
import math
import numpy as np
import torch
from torch import nn,optim
import torch.nn.functional as F
import sys
import zipfile

In [29]:
# Device string handed to `.to(device)` and to tensor constructors in later cells.
device='cpu'

In [48]:
def load_data_jay_lyrics(zip_path=r'F:\study\ml\ebooks3\6\jaychou_lyrics.txt.zip',
                         member='jaychou_lyrics.txt', max_chars=10000):
    """Read the lyrics corpus from a zip archive and index it per character.

    Args:
        zip_path: Path of the zip archive. The default is the location this
            notebook was written against; pass your own path to run elsewhere.
        member: Name of the UTF-8 text file inside the archive.
        max_chars: Keep only the first ``max_chars`` characters (counted after
            line breaks have been replaced by spaces). ``None`` keeps the
            whole text.

    Returns:
        A tuple ``(corpus_indices, char_to_idx, idx_to_char, vocab_size)``:
        the corpus as a list of integer indices, the char -> index dict, the
        index -> char list, and the number of distinct characters.
    """
    with zipfile.ZipFile(zip_path) as zif:
        with zif.open(member) as f:
            corpus_chars = f.read().decode('utf-8')
    # Line breaks carry no information for the character model; turn them into spaces.
    corpus_chars = corpus_chars.replace('\n', ' ').replace('\r', ' ')
    if max_chars is not None:
        corpus_chars = corpus_chars[:max_chars]

    # Sort the vocabulary: the iteration order of a set of strings changes
    # between interpreter runs, which would give every run a different
    # character <-> index mapping.
    idx_to_char = sorted(set(corpus_chars))
    char_to_idx = {char: i for i, char in enumerate(idx_to_char)}
    vocab_size = len(char_to_idx)
    corpus_indices = [char_to_idx[char] for char in corpus_chars]

    return corpus_indices, char_to_idx, idx_to_char, vocab_size

In [49]:
# Index the corpus; vocab_size sizes the RNN inputs and the lookups are used when sampling text.
corpus_indices,char_to_idx,idx_to_char,vocab_size=load_data_jay_lyrics()

In [50]:
# Recurrent layer with default depth; each time step takes a vector of width vocab_size.
num_hiddens=256
rnn_layer=nn.RNN(input_size=vocab_size,hidden_size=num_hiddens)

In [51]:
# Show the layer's repr to confirm its input and hidden sizes.
rnn_layer

RNN(1027, 256)

In [52]:
# Shape check: feed a random (num_steps, batch_size, vocab_size) tensor through
# rnn_layer without supplying an initial state.
num_steps=35
batch_size=2
state=None
X=torch.rand(num_steps,batch_size,vocab_size)
Y,state_new=rnn_layer(X,state)
# Prints the output shape, the length of the returned state along its first
# axis, and the shape of the first entry along that axis.
print(Y.shape,len(state_new),state_new[0].shape)

torch.Size([35, 2, 256]) 1 torch.Size([2, 256])


In [53]:
# Same layer type as rnn_layer, but with two stacked recurrent layers.
num_hiddens=256
rnn_layer2=nn.RNN(input_size=vocab_size,hidden_size=num_hiddens,num_layers=2)

In [36]:
# Repeat the shape check with the two-layer RNN; the printed state length is 2
# here (see the output below), versus 1 for the single-layer case.
num_steps=35
batch_size=2
state=None
X=torch.rand(num_steps,batch_size,vocab_size)
Y,state_new=rnn_layer2(X,state)
print(Y.shape,len(state_new),state_new[0].shape)

torch.Size([35, 2, 256]) 2 torch.Size([2, 256])


In [37]:
# Two stacked LSTM layers with the same input and hidden sizes as the RNN layers above.
num_hiddens=256
rnn_layer3=nn.LSTM(input_size=vocab_size,hidden_size=num_hiddens,num_layers=2)

In [38]:
# Repeat the shape check with the LSTM.
num_steps=35
batch_size=2
state=None
X=torch.rand(num_steps,batch_size,vocab_size)
Y,state_new=rnn_layer3(X,state)
print('X shape :',X.shape)
# nn.LSTM returns its state as a (hidden, cell) pair, so len(state_new) is the
# pair length and state_new[0] is the 3-D hidden-state tensor (see output below).
print(Y.shape,len(state_new),state_new[0].shape)

X shape : torch.Size([35, 2, 1027])
torch.Size([35, 2, 256]) 2 torch.Size([2, 2, 256])


In [54]:
def one_hot(x,n_class,dtype=torch.float32):
    """One-hot encode a vector of class indices.

    Args:
        x: Tensor of class indices; it is cast to integer type first, so a
            float tensor holding whole numbers is accepted.
        n_class: Width of each encoded row.
        dtype: dtype of the returned tensor.

    Returns:
        Tensor with ``x.shape[0]`` rows and ``n_class`` columns, on the same
        device as ``x``, with a single 1 per row at the column given by ``x``.
    """
    labels = x.long()
    encoded = torch.zeros(
        (labels.size(0), n_class), dtype=dtype, device=labels.device
    )
    # scatter_ writes in place and returns the same tensor.
    return encoded.scatter_(1, labels.view(-1, 1), 1)

In [55]:
def to_onehot(x,n_class):
    """One-hot encode a 2-D index tensor column by column.

    Args:
        x: Tensor indexed as ``x[:, i]``; each column is encoded separately.
        n_class: Width of each one-hot row.

    Returns:
        A list with one entry per column of ``x``; entry ``i`` is
        ``one_hot(x[:, i], n_class)``.
    """
    per_column = []
    for col in range(x.shape[1]):
        per_column.append(one_hot(x[:, col], n_class))
    return per_column

In [56]:
class RNNModel(nn.Module):
    """Character-level language model: a recurrent layer plus a linear read-out."""

    def __init__(self,rnn_layer,vocab_size):
        """Wrap ``rnn_layer`` and add a dense layer projecting to ``vocab_size`` scores.

        Args:
            rnn_layer: Recurrent module exposing ``hidden_size`` and
                ``bidirectional`` attributes (e.g. ``nn.RNN`` / ``nn.LSTM``).
            vocab_size: Number of distinct characters; used as the one-hot
                width and as the output width of the dense layer.
        """
        super().__init__()
        self.rnn = rnn_layer
        # A bidirectional layer emits both directions side by side, doubling the feature width.
        num_directions = 2 if rnn_layer.bidirectional else 1
        self.hidden_size = rnn_layer.hidden_size * num_directions
        self.vocab_size = vocab_size
        self.dense = nn.Linear(self.hidden_size, vocab_size)
        self.state = None

    def forward(self,inputs,state):
        """Run one pass over a batch of index sequences.

        Args:
            inputs: Index tensor; it is split along dim 1 and every slice is
                one-hot encoded, so dim 1 becomes the time axis fed to the
                recurrent layer.
            state: Initial recurrent state, or ``None``.

        Returns:
            ``(output, state)``: ``output`` holds one row of ``vocab_size``
            scores per (time step, sequence) pair, flattened into 2-D; ``state``
            is whatever the recurrent layer returned (also kept on ``self.state``).
        """
        steps = torch.stack(to_onehot(inputs, self.vocab_size))
        # Y keeps the time axis first, with the hidden features on the last
        # axis; the state is returned by the wrapped layer unchanged.
        Y, self.state = self.rnn(steps, state)
        flat = Y.view(-1, Y.shape[-1])
        return self.dense(flat), self.state

In [57]:
def predict_rnn_pytorch(prefix,num_chars,model,vocab_size,device,idx_to_char,char_to_idx):
    """Greedily generate text that continues ``prefix``.

    The prefix characters are fed one at a time to warm up the recurrent
    state; after that the highest-scoring character of each step becomes the
    next input.

    Args:
        prefix: Non-empty seed string; every character must be a key of
            ``char_to_idx``.
        num_chars: Number of characters to generate after the prefix.
        model: Callable ``model(X, state) -> (Y, state)`` where ``X`` is a
            1x1 index tensor and ``Y`` holds a single row of scores.
        vocab_size: Not used by this function; kept so existing calls work.
        device: Device on which the input tensor and the state are placed.
        idx_to_char: Sequence mapping an index to its character.
        char_to_idx: Mapping from a character to its index.

    Returns:
        The prefix followed by ``num_chars`` generated characters.
    """
    state = None
    output = [char_to_idx[prefix[0]]]
    # Generation never back-propagates; without no_grad every step would add to
    # an autograd graph chained through the carried state.
    with torch.no_grad():
        for t in range(num_chars + len(prefix) - 1):
            X = torch.tensor([output[-1]], device=device).view(1, 1)
            if state is not None:
                if isinstance(state, tuple):
                    # Tuple state (two tensors, e.g. the pair an LSTM returns).
                    state = (state[0].to(device), state[1].to(device))
                else:
                    state = state.to(device)
            (Y, state) = model(X, state)
            if t < len(prefix) - 1:
                # Still consuming the prefix: ignore the prediction.
                output.append(char_to_idx[prefix[t + 1]])
            else:
                output.append(int(Y.argmax(dim=1).item()))
    return ''.join(idx_to_char[i] for i in output)

In [58]:
# Wrap the single-layer RNN and sample 10 characters after the prefix. No
# training cell has run at this point, so the generated text is arbitrary.
model=RNNModel(rnn_layer,vocab_size).to(device)
predict_rnn_pytorch('分开',10,model,vocab_size,device,idx_to_char,char_to_idx)

'分开司惚味着出出出语惚味'

In [62]:
# Input with 3 rows and 1 column: to_onehot walks the columns, so the result
# is a one-element list holding a 3-row one-hot matrix.
a=torch.Tensor(np.array([[1],[2],[3]]))
b=to_onehot(a,vocab_size)
b

[tensor([[0., 1., 0.,  ..., 0., 0., 0.],
         [0., 0., 1.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.]])]

In [63]:
# Stacking the list adds a leading axis with one entry per list element.
torch.stack(b)

tensor([[[0., 1., 0.,  ..., 0., 0., 0.],
         [0., 0., 1.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.]]])

In [64]:
# Input with 1 row and 3 columns: one list entry per column, each a single-row
# one-hot matrix.
a=torch.Tensor(np.array([[1,2,3]]))
b=to_onehot(a,vocab_size)
b

[tensor([[0., 1., 0.,  ..., 0., 0., 0.]]),
 tensor([[0., 0., 1.,  ..., 0., 0., 0.]]),
 tensor([[0., 0., 0.,  ..., 0., 0., 0.]])]

In [66]:
def data_iter_consecutive(corpus_indices,batch_size,num_steps,device=None):
    """Yield consecutive ``(X, Y)`` mini-batches for a character language model.

    The corpus is cut into ``batch_size`` equally long rows; successive batches
    take adjacent windows of ``num_steps`` columns, so row ``r`` of one batch
    continues where row ``r`` of the previous batch stopped.

    Args:
        corpus_indices: Sequence of integer character indices.
        batch_size: Number of rows (parallel sequences) per batch.
        num_steps: Number of time steps per batch.
        device: Target device; when ``None``, CUDA is used if available,
            otherwise the CPU.

    Yields:
        ``(X, Y)`` float32 tensors of shape ``(batch_size, num_steps)``, where
        ``Y`` is ``X`` shifted one position to the right in the corpus.
    """
    if device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # Everything below must run whether or not a device was passed in; nesting
    # it under the branch above makes the generator empty for explicit devices.
    corpus_indices = torch.tensor(corpus_indices, dtype=torch.float32, device=device)
    data_len = len(corpus_indices)
    batch_len = data_len // batch_size
    # Drop the tail that does not fill a whole row.
    indices = corpus_indices[0:batch_size * batch_len].view(batch_size, batch_len)
    # The "- 1" leaves room for the one-step-ahead targets of the last window.
    epoch_size = (batch_len - 1) // num_steps
    for step in range(epoch_size):
        begin = step * num_steps
        X = indices[:, begin:begin + num_steps]
        Y = indices[:, begin + 1:begin + num_steps + 1]
        yield X, Y

In [67]:
def grad_clipping(params,theta,device=None):
    """Rescale gradients in place so their global L2 norm is at most ``theta``.

    Args:
        params: Iterable of parameters. It may be a one-shot generator such as
            ``model.parameters()``; it is walked only once here.
        theta: Maximum allowed L2 norm over all gradients together.
        device: Device on which the norm is accumulated. When ``None``, the
            device of the first gradient is used.

    Parameters whose ``grad`` is ``None`` are skipped. Nothing is returned.
    """
    # Collect the gradients up front: a generator would be exhausted by the
    # norm pass and leave nothing for the scaling pass.
    grads = [p.grad for p in params if p.grad is not None]
    if not grads:
        return
    norm_device = device if device is not None else grads[0].device
    sq_norm = torch.zeros(1, device=norm_device)
    for g in grads:
        sq_norm += (g.detach() ** 2).sum().to(sq_norm.device)
    norm = sq_norm.sqrt().item()
    if norm > theta:
        scale = theta / norm
        with torch.no_grad():
            for g in grads:
                g.mul_(scale)

In [None]:
def train_and_predict_rnn_pytorch(model,num_hiddens,vocab_size,device,corpus_indices,idx_to_char,char_to_idx,num_epochs,num_steps,lr,
                                 clipping_theta,batch_size,pred_period,pred_len,prefixes):
    """Train ``model`` on consecutive mini-batches and periodically sample text.

    Args:
        model: Module called as ``model(X, state) -> (output, state)``.
        num_hiddens: Not used by this function; kept so existing calls work.
        vocab_size: Vocabulary size, forwarded to ``predict_rnn_pytorch``.
        device: Device the model and the batches are placed on.
        corpus_indices: Corpus as a sequence of integer character indices.
        idx_to_char: Sequence mapping an index to its character.
        char_to_idx: Mapping from a character to its index.
        num_epochs: Number of passes over the corpus.
        num_steps: Time steps per mini-batch.
        lr: Learning rate for the Adam optimizer.
        clipping_theta: Upper bound on the global gradient norm.
        batch_size: Sequences per mini-batch.
        pred_period: Print perplexity and samples every ``pred_period`` epochs.
        pred_len: Number of characters generated per sample.
        prefixes: Seed strings used for the samples.

    Raises:
        ValueError: If an epoch produces no mini-batch at all.
    """
    loss = nn.CrossEntropyLoss()
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    state = None
    for epoch in range(num_epochs):
        l_sum, n, start = 0.0, 0, time.time()
        data_iter = data_iter_consecutive(corpus_indices, batch_size, num_steps, device)
        for X, Y in data_iter:
            if state is not None:
                # Cut the graph at the batch boundary so back-propagation only
                # covers the current mini-batch while the state values carry over.
                if isinstance(state, tuple):
                    state = (state[0].detach(), state[1].detach())
                else:
                    state = state.detach()
            (output, state) = model(X, state)
            # Flatten the targets time-step-major (all sequences of step 0,
            # then step 1, ...) before comparing them with the rows of output.
            y = torch.transpose(Y, 0, 1).contiguous().view(-1)
            l = loss(output, y.long())
            optimizer.zero_grad()
            l.backward()
            # parameters() is a one-shot generator; hand over a list so the
            # clipping helper can walk the parameters more than once.
            grad_clipping(list(model.parameters()), clipping_theta)
            optimizer.step()
            l_sum += l.item() * y.shape[0]
            n += y.shape[0]

        if n == 0:
            raise ValueError(
                'data_iter_consecutive produced no mini-batches; check the corpus '
                'length against batch_size and num_steps'
            )
        try:
            perplexity = math.exp(l_sum / n)
        except OverflowError:
            perplexity = float('inf')
        if (epoch + 1) % pred_period == 0:
            print('epoch %d, perplexity %f, time %.2f sec' % (
                epoch + 1, perplexity, time.time() - start))
            for prefix in prefixes:
                print(' -', predict_rnn_pytorch(
                    prefix, pred_len, model, vocab_size, device,
                    idx_to_char, char_to_idx))
        
        
            
    
    