In [3]:
import torch
import pandas as pd
from sklearn.model_selection import train_test_split
import time
import re
import copy

dev = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
dev

device(type='cpu')

In [4]:
MAX_LEN = 50
ABC = ['none'] + [w for w in set('abcdefghijklmnopqrstuvwxyz ')]
CHAR_TO_INDEX = {w: i for i, w in enumerate(ABC)}
INDEX_TO_CHAR = {i: w for i, w in enumerate(ABC)}

df = pd.read_csv('data.csv').iloc[:,[-2]]
df = df.dropna(subset=['normalized_text'])

df['text_in'] = [' '.join(re.findall('[\w]+', i)) for i in df['normalized_text']]

df = df.iloc[:, 1:]
df.head()

Unnamed: 0,text_in
0,maggie look whats that
1,lee mur lee mur
2,zee boo zee boo
3,im trying to teach maggie that nature doesnt e...
4,its like an ox only it has a hump and a dewlap...


In [6]:
def convert_to_torch(text):
    output = torch.zeros((len(text), MAX_LEN), dtype=int)
    for i in range(len(text)):
        for j, w in enumerate(text[i]):
            if j >= MAX_LEN:
                break
            output[i, j] = CHAR_TO_INDEX.get(w, CHAR_TO_INDEX['none'])

    return output

### Задание 1

In [None]:
def Сaesar(string, num):
    output = ''
    for c in string:
        if c.isalpha():
            new_num = ord(c) + num
            if new_num > ord('z'):
                new_num -= 26
            output += chr(new_num)
        else:
            output += c
    return output


In [None]:
Сaesar('test', 2)

In [None]:
df['text_out'] = df['text_in'].apply(lambda v: Сaesar(v, 2))
df.head()

In [None]:
train, test = train_test_split(df, test_size=0.2)
train_text = [[c for c in ph] for ph in train['text_out'] if type(ph) is str]
train_label= [[c for c in ph] for ph in train['text_in'] if type(ph) is str]
test_text   = [[c for c in ph] for ph in test['text_out'] if type(ph) is str]
test_label  = [[c for c in ph] for ph in test['text_in'] if type(ph) is str]

X_train= convert_to_torch(train_text)
Y_train= convert_to_torch(train_label)
X_test = convert_to_torch(test_text)
Y_test = convert_to_torch(test_label)


In [None]:
class RNN_Network(torch.nn.Module):
    def __init__(self):
        super(RNN_Network, self).__init__()
        self.embeddings = torch.nn.Embedding(len(ABC), 28)
        self.rnn = torch.nn.RNN(28, 256, batch_first=True)
        self.linear = torch.nn.Linear(256, 28)

    def forward(self, sentences, state=None):
        embds = self.embeddings(sentences)
        out, new_state = self.rnn(embds, state)
        result = self.linear(out)
        return result, new_state


In [None]:
model = RNN_Network().to(dev)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
epochs = 40
loss_best = 10**10
model_best = None


In [None]:
for epoch in range(epochs):
    start = time.time()
    train_loss = 0.
    train_passed = 0
    test_loss = 0
    test_passed = 0

    for i in range(int(len(X_train) / 100)):
        X_batch = X_train[i * 100:(i + 1) * 100].to(dev)
        Y_batch = Y_train[i * 100:(i + 1) * 100].flatten().to(dev)
        model.train()
        optimizer.zero_grad()
        answers, _ = model.forward(X_batch)
        answers = answers.view(-1, len(ABC))
        loss = criterion(answers, Y_batch).to(dev)

        if loss < loss_best:
            model_best = copy.copy(model).to(dev)
            loss_best = loss.to(dev)

        train_loss += loss.item()
        loss.backward()
        optimizer.step()
        train_passed += 1

    with torch.no_grad():
        model.eval()
        answers, _ = model.forward(X_test.to(dev))
        answers = answers.view(-1, len(ABC))
        loss = criterion(answers, Y_test.flatten().to(dev))
        test_loss += loss.item()
        test_passed += 1

    if epoch%1 == 0:
        print(f"Epoch {epoch}. Time: {time.time() - start:.3f}, Train loss: {train_loss / train_passed:.3f}, Test loss: {test_loss / test_passed:.6f}")


In [None]:
loss_best


In [None]:
pred = convert_to_torch([[c for c in s] for s in df['text_out'] if type(s) is str])

pred


In [None]:
df_short = df.loc[:50]
df_short.shape[0]


In [None]:
texts = []
for line in range(df_short.shape[0]):
    str = ''

    for i in model_best(pred.to(dev))[0][line].argmax(dim=1).detach():
        str += INDEX_TO_CHAR[i.item()]

    texts.append(str)

df_short['predict'] = texts
df_short


### Задание 2

In [5]:
df

Unnamed: 0,text_in
0,maggie look whats that
1,lee mur lee mur
2,zee boo zee boo
3,im trying to teach maggie that nature doesnt e...
4,its like an ox only it has a hump and a dewlap...
...,...
11634,too bad we didnt come dressed as popular carto...
11635,yeah mom guess what for a dollar a man sold me...
11636,hows it going bart
11637,maybe you need to play on their sympathies mor...


In [7]:
X = convert_to_torch([[c for c in s] for s in df['text_in'].tolist()])
X

tensor([[ 3,  2, 26,  ...,  0,  0,  0],
        [ 7, 22, 22,  ...,  0,  0,  0],
        [11, 22, 22,  ...,  0,  0,  0],
        ...,
        [ 6, 15, 13,  ...,  0,  0,  0],
        [ 3,  2, 23,  ..., 21,  7, 22],
        [ 2,  6, 21,  ...,  0,  0,  0]])

In [8]:
class Network(torch.nn.Module):
    def __init__(self, ):
        super(Network, self).__init__()
        
        self.embeddings = torch.nn.Embedding(len(ABC), 30)
        self.rnn = torch.nn.RNN(30, 128, batch_first=True)
        self.out = torch.nn.Sequential(
            torch.nn.Linear(128, 64),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.5),
            torch.nn.Linear(64, 28)
        )
    
    def forward(self, sentences, state=None):
        embds = self.embeddings(sentences)
        x, s = self.rnn(embds, state)
        result = self.out(x)
        return result


def generate_sentence(txt_list):
    sentence =  [c for c in txt_list]
    x = torch.zeros((1, len(sentence)), dtype=int).to(dev)

    for j,w in enumerate(sentence):
        if j >= MAX_S_LEN:
            break
        x[0, j] = CHAR_TO_INDEX.get(w, CHAR_TO_INDEX['none'])

    for i in range(MAX_LEN):
        o = model(x)
        w = torch.argmax(o[-1, -1, :], keepdim=True)
        x = torch.cat([x, w.unsqueeze(0)], axis=1)
        ww = INDEX_TO_CHAR[w.item()]
        if ww == 'none':
            break

        sentence.append(ww)

    return ''.join(sentence)


MAX_S_LEN = 100
model = Network().to(dev)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=.0001)

phrase = 'bart i do so much for you'


In [10]:
for ep in range(20):
    start = time.time()
    train_loss = 0.
    train_passed = 0

    for i in range(int(len(X) / 100)):
        batch = X[i * 100:(i + 1) * 100]
        X_batch = batch[:, :-1].to(dev)
        Y_batch = batch[:, 1:].flatten().to(dev)

        optimizer.zero_grad()
        answers = model(X_batch)
        answers = answers.view(-1, len(INDEX_TO_CHAR))
        loss = criterion(answers, Y_batch).to(dev)
        train_loss += loss.item()

        loss.backward()
        optimizer.step()
        train_passed += 1

    if ep%10 == 0:
        print("\nEpoch {}. Time: {:.3f}, Train loss: {:.3f}".format(ep, time.time() - start, train_loss / train_passed))
        s = generate_sentence(phrase)
        print(s)
    else:
        print(f"\rEpoch {ep}, loss: {train_loss / train_passed:.3f}", end='')



Epoch 0. Time: 4.702, Train loss: 1.266
bart i do so much for you the that the think the that in the this it the th
Epoch 9, loss: 1.253
Epoch 10. Time: 3.738, Train loss: 1.251
bart i do so much for you the that the there the that i that i think the th
Epoch 19, loss: 1.239

#### Генерация

In [14]:
generate_sentence('too bad we')

'too bad we that the been the thing the think the thin the th'