<a href="https://colab.research.google.com/github/Pmilivojevic/PyTorch/blob/main/LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
from torch import nn
import string
import random
# !pip install unidecode
import unidecode
from torch.utils.tensorboard import SummaryWriter

# Run on the GPU when PyTorch reports one is available, otherwise on the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

Collecting unidecode
  Downloading Unidecode-1.3.7-py3-none-any.whl (235 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 235.5/235.5 kB 4.7 MB/s eta 0:00:00
Installing collected packages: unidecode
Successfully installed unidecode-1.3.7


In [None]:
# Vocabulary: every character in string.printable, identified by its position.
all_characters = string.printable
n_caracters = len(all_characters)

# Read the corpus through a context manager so the file handle is closed as
# soon as the text is in memory. The encoding is spelled out (as the other
# dataset reads in this notebook do) instead of relying on the platform
# default. unidecode then transliterates the text to ASCII.
with open(
    '/content/drive/MyDrive/ColabNotebooks/PyTorch/Dataset/names.txt',
    encoding='utf8'
) as names_file:
  fl = unidecode.unidecode(names_file.read())

In [None]:
class RNN(nn.Module):
  """Character-level model: embedding -> stacked LSTM -> linear projection.

  The network is stepped one character at a time; the caller carries the
  (hidden, cell) state from one call to the next.

  Args:
    input_size: Number of distinct input symbols (embedding rows).
    hidden_size: Width of the embedding and of every LSTM layer.
    num_layers: Number of stacked LSTM layers.
    output_size: Number of scores produced per step.
  """

  def __init__(self, input_size, hidden_size, num_layers, output_size):
    super().__init__()

    self.hidden_size = hidden_size
    self.num_layers = num_layers

    self.embed = nn.Embedding(input_size, hidden_size)
    self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
    self.fc = nn.Linear(hidden_size, output_size)

  def forward(self, x, hidden, cell):
    """Advance the model by a single time step.

    Args:
      x: 1-D tensor of symbol indices, one per batch element.
      hidden: LSTM hidden state, (num_layers, batch, hidden_size).
      cell: LSTM cell state, same shape as `hidden`.

    Returns:
      (scores, (hidden, cell)) where scores is (batch, output_size) and the
      state tuple is the updated LSTM state.
    """
    out = self.embed(x)
    # The LSTM is batch_first, so insert a sequence axis of length one.
    out, (hidden, cell) = self.lstm(out.unsqueeze(1), (hidden, cell))
    # Collapse the length-one sequence axis before the projection.
    out = self.fc(out.reshape(out.shape[0], -1))

    return out, (hidden, cell)

  def init_hidden(self, batch_size):
    """Return fresh all-zero (hidden, cell) states for `batch_size` sequences.

    The states are allocated with the device and dtype of the model's own
    parameters, so they always match the module regardless of any
    module-level `device` setting.
    """
    reference = self.fc.weight
    shape = (self.num_layers, batch_size, self.hidden_size)
    hidden = reference.new_zeros(shape)
    cell = reference.new_zeros(shape)

    return hidden, cell

In [None]:
class Generator():
  """Trains a character-level LSTM on a text corpus and samples text from it.

  By default the corpus and the vocabulary are the module-level `fl` and
  `all_characters`; either can be replaced per instance through the
  constructor. `train()` must run before `generate()` because it creates
  `self.rnn`.
  """

  def __init__(self, text=None, characters=None):
    """Store the data sources and the training hyper-parameters.

    Args:
      text: Corpus to sample training chunks from. When None, the
        module-level `fl` is looked up each time it is needed.
      characters: Vocabulary string; a character's position is its index.
        When None, the module-level `all_characters` is used.
    """
    super().__init__()

    self._text = text
    self._characters = characters

    self.chunk_len = 250
    self.num_epochs = 5000
    self.batch_size = 1
    self.print_every = 50
    self.hidden_size = 256
    self.num_layers = 2
    self.lr = 0.003

  @property
  def _corpus(self):
    """Corpus in use: the per-instance override or the global `fl`."""
    return fl if self._text is None else self._text

  @property
  def _vocabulary(self):
    """Vocabulary in use: the override or the global `all_characters`."""
    return all_characters if self._characters is None else self._characters

  def char_tensor(self, string):
    """Encode `string` as a 1-D long tensor of vocabulary indices.

    Raises:
      ValueError: If a character is not part of the vocabulary.
    """
    vocabulary = self._vocabulary

    return torch.tensor(
        [vocabulary.index(char) for char in string], dtype=torch.long
    )

  def get_rand_batch(self):
    """Draw a random training batch from the corpus.

    Returns:
      (inputs, targets): long tensors of shape (batch_size, chunk_len);
      targets is inputs shifted one character to the right in the corpus.

    Raises:
      ValueError: If the corpus has fewer than chunk_len + 1 characters.
    """
    text = self._corpus
    # A sample spans chunk_len + 1 characters (inputs plus the shifted
    # targets) and random.randint is inclusive on both ends, so this is the
    # last start index that still yields a full-length chunk.
    last_start = len(text) - self.chunk_len - 1
    if last_start < 0:
      raise ValueError(
          f'corpus has {len(text)} characters but a chunk needs '
          f'{self.chunk_len + 1}'
      )

    text_input = torch.zeros(self.batch_size, self.chunk_len, dtype=torch.long)
    text_target = torch.zeros(self.batch_size, self.chunk_len, dtype=torch.long)

    for i in range(self.batch_size):
      # Draw a fresh chunk per row so a batch is not one sample repeated.
      start_idx = random.randint(0, last_start)
      chunk = self.char_tensor(text[start_idx:start_idx + self.chunk_len + 1])
      text_input[i] = chunk[:-1]
      text_target[i] = chunk[1:]

    return text_input, text_target

  def generate(self, initial_str="A", predict_len=100, temperature=0.85):
    """Sample `predict_len` characters that continue `initial_str`.

    Args:
      initial_str: Non-empty prompt; all but its last character prime the
        hidden state, the last one is the first input of the sampling loop.
      predict_len: Number of characters to sample.
      temperature: Divides the scores before sampling; lower values make
        the most likely characters even more likely.

    Returns:
      `initial_str` followed by the sampled characters.
    """
    vocabulary = self._vocabulary
    initial_input = self.char_tensor(initial_str)
    pieces = [initial_str]

    # Sampling needs no gradients, and it always feeds one sequence at a
    # time, so the state is sized for a batch of one whatever batch_size
    # training uses.
    with torch.no_grad():
      hidden, cell = self.rnn.init_hidden(batch_size=1)

      for p in range(len(initial_str) - 1):
        _, (hidden, cell) = self.rnn(
            initial_input[p].view(1).to(device), hidden, cell
        )

      last_char = initial_input[-1]

      for _ in range(predict_len):
        output, (hidden, cell) = self.rnn(
            last_char.view(1).to(device), hidden, cell
        )
        # softmax yields the same distribution as a normalised exp() but
        # cannot overflow for large scores or small temperatures.
        output_dist = torch.softmax(output.view(-1) / temperature, dim=0)
        top_char = torch.multinomial(output_dist, 1).item()
        predicted_char = vocabulary[top_char]
        pieces.append(predicted_char)
        last_char = self.char_tensor(predicted_char)

    return ''.join(pieces)

  def train(self):
    """Create `self.rnn` and train it on random chunks of the corpus.

    Each epoch processes one random batch. The per-character loss is logged
    to TensorBoard every epoch; every `print_every` epochs the loss is
    printed together with a generated sample, unless that sample occurs
    verbatim in the corpus.
    """
    n_chars = len(self._vocabulary)
    self.rnn = RNN(
        n_chars,
        self.hidden_size,
        self.num_layers,
        n_chars
    ).to(device)

    optimizer = torch.optim.Adam(self.rnn.parameters(), lr=self.lr)
    criterion = nn.CrossEntropyLoss()
    writer = SummaryWriter('runs/names0')

    print('=> Start training!!!!')

    try:
      for epoch in range(1, self.num_epochs + 1):
        inputs, targets = self.get_rand_batch()
        inputs = inputs.to(device)
        targets = targets.to(device)
        hidden, cell = self.rnn.init_hidden(self.batch_size)

        self.rnn.zero_grad()
        loss = 0

        # Step through the chunk one character at a time, summing the loss.
        for c in range(self.chunk_len):
          out, (hidden, cell) = self.rnn(inputs[:, c], hidden, cell)
          loss += criterion(out, targets[:, c])

        loss.backward()
        optimizer.step()
        mean_loss = loss.item() / self.chunk_len

        if epoch % self.print_every == 0:
          print(f'Loss: {mean_loss}')
          pred = self.generate()
          if pred not in self._corpus:
            print(pred)

        writer.add_scalar('Training loss', mean_loss, global_step=epoch)
    finally:
      # Flush and release the event file even if training is interrupted.
      writer.close()

In [None]:
# Build a generator with its default hyper-parameters and run the training
# loop, which creates the model and periodically prints the loss and a sample.
gennames = Generator()
gennames.train()

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import spacy
import pandas as pd
from sklearn.model_selection import train_test_split
# !python -m spacy download de_core_news_sm
# !pip install -U torchtext==0.6
from torchtext.data import Field, TabularDataset, BucketIterator, Iterator
from torchtext.datasets import Multi30k

# Run on the GPU when PyTorch reports one is available, otherwise on the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
def _read_lines(path):
  """Return the UTF-8 text file at `path` split on newline characters.

  The file is read inside a context manager so its handle is closed before
  the function returns.
  """
  with open(path, encoding='utf8') as text_file:
    return text_file.read().split('\n')


# English and German sides of the Europarl corpus, one list entry per line.
english_txt = _read_lines(
    '/content/drive/MyDrive/ColabNotebooks/PyTorch/Datasets/europarl-v7.de-en.en'
)

german_txt = _read_lines(
    '/content/drive/MyDrive/ColabNotebooks/PyTorch/Datasets/europarl-v7.de-en.de'
)

In [None]:
# Pair the first 1000 lines of each corpus, one column per language.
raw_data = {
    'English': english_txt[:1000],
    'German': german_txt[:1000],
}

df = pd.DataFrame(raw_data, columns=['English', 'German'])

# Hold out 20% of the rows as the test split.
train, test = train_test_split(df, test_size=0.2)

# Both splits are written in the same layout: one JSON record per line.
_json_lines = {'orient': 'records', 'lines': True}

train.to_json(
    '/content/drive/MyDrive/ColabNotebooks/PyTorch/Datasets/train_de.json',
    **_json_lines
)
test.to_json(
    '/content/drive/MyDrive/ColabNotebooks/PyTorch/Datasets/test_de.json',
    **_json_lines
)

In [None]:
# Load the small English and German spaCy pipelines; the tokenize_* helpers
# in this cell use only their tokenizers.
spacy_eng = spacy.load('en_core_web_sm')
spacy_ger = spacy.load('de_core_news_sm')

def tokenize_eng(text):
  """Return the token strings the English spaCy tokenizer finds in `text`."""
  doc = spacy_eng.tokenizer(text)
  return [token.text for token in doc]

def tokenize_ger(text):
  """Return the token strings the German spaCy tokenizer finds in `text`."""
  doc = spacy_ger.tokenizer(text)
  return [token.text for token in doc]

# Both languages get the same preprocessing (vocabulary-backed, lower-cased
# token sequences); only the tokenizer differs.
english = Field(sequential=True, use_vocab=True, tokenize=tokenize_eng, lower=True)
german = Field(sequential=True, use_vocab=True, tokenize=tokenize_ger, lower=True)

# JSON key -> (attribute name on each example, Field that processes it).
fields = {
    'English': ('eng', english),
    'German': ('ger', german),
}

# Read back the JSON-lines splits written to the Datasets folder.
train_data, test_data = TabularDataset.splits(
    path='/content/drive/MyDrive/ColabNotebooks/PyTorch/Datasets',
    train='train_de.json',
    test='test_de.json',
    format='json',
    fields=fields,
)

In [None]:
# Vocabularies are built from the training split only: at most 10,000 tokens
# per language, each seen at least twice.
english.build_vocab(train_data, max_size=10000, min_freq=2)
german.build_vocab(train_data, max_size=10000, min_freq=2)

# TabularDataset provides no sort_key of its own, and the non-training
# iterator returned by BucketIterator.splits sorts its examples by default.
# Without an explicit key it would have to compare Example objects directly,
# which they do not support. Sorting by English sentence length also groups
# similar-length examples, which reduces padding.
train_iterator, test_iterator = BucketIterator.splits(
    (train_data, test_data),
    batch_size=32,
    sort_key=lambda example: len(example.eng),
    device=device
)

In [None]:
# Sanity check: walk the training iterator once and print every batch object.
for batch in train_iterator:
  print(batch)

In [None]:
# Fresh fields for the Multi30k experiment (same preprocessing as before).
english = Field(sequential=True, use_vocab=True, tokenize=tokenize_eng, lower=True)
german = Field(sequential=True, use_vocab=True, tokenize=tokenize_ger, lower=True)

# exts and fields are given in the same order: German (.de) first, then
# English (.en).
train_data, val_data, test_data = Multi30k.splits(
    # root='/content/drive/MyDrive/ColabNotebooks/PyTorch/Datasets',
    exts=(".de", ".en"),
    fields=(german, english)
)
# The validation split was originally bound to the misspelled name
# `vaL_data`; keep that spelling as an alias so code written against it still
# works.
vaL_data = val_data

FileNotFoundError: ignored

In [None]:
# Build both vocabularies from whatever `train_data` currently holds: at most
# 10,000 tokens per language, each seen at least twice.
english.build_vocab(train_data, max_size=10000, min_freq=2)
german.build_vocab(train_data, max_size=10000, min_freq=2)

In [None]:
# spacy_en = spacy.load('en_core_web_sm')
# spacy_ger = spacy.load('de_core_news_sm')

# def tokenize(text):
#   return [tok.text for tok in spacy_en.tokenizer(text)]

# quote = Field(sequential=True, use_vocab=True, tokenize=tokenize, lower=True)
# score = Field(sequential=False, use_vocab=False)

# fields = {'quote': ('q', quote), 'score': ('s', score)}

In [None]:
# Load the train/test JSON files from the torchtext dataset folder, parsed
# according to the current `fields` mapping.
train_data, test_data = TabularDataset.splits(
    path='/content/drive/MyDrive/ColabNotebooks/PyTorch/Datasets/torchtext',
    train='train.json',
    test='test.json',
    format='json',
    fields=fields,
)

In [None]:
# NOTE(review): in the visible notebook `quote` is defined only inside the
# commented-out cell, so that cell has to be restored and run before this one.
quote.build_vocab(train_data, max_size=10000, min_freq=1)

# TabularDataset provides no sort_key of its own, and the non-training
# iterator returned by BucketIterator.splits sorts its examples by default.
# Give it an explicit key (quote length, stored under the `q` attribute) so
# that sort is well defined.
train_iterator, test_iterator = BucketIterator.splits(
    (train_data, test_data),
    batch_size=2,
    sort_key=lambda example: len(example.q),
    device=device
)

In [None]:
# Sanity check: print the `q` and `s` tensors of every training batch.
for batch in train_iterator:
  print(batch.q)
  print(batch.s)

tensor([[10, 27],
        [21, 29],
        [ 4,  7],
        [ 3, 26],
        [ 6, 18],
        [11,  2],
        [17, 25],
        [ 4,  1],
        [ 3,  1],
        [30,  1],
        [28,  1],
        [ 5,  1],
        [13,  1],
        [ 2,  1],
        [ 9,  1],
        [23,  1]])
tensor([1, 0])
tensor([[33],
        [19],
        [24],
        [14],
        [15],
        [34],
        [32],
        [31],
        [16],
        [20],
        [22],
        [12],
        [ 5],
        [ 8]])
tensor([1])
