In [7]:
import torch
import matplotlib.pyplot as plt
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader, Dataset
import torch.optim as optim
import random
import torch.nn as nn
import pandas as pd
import numpy as np

In [2]:
from google.colab import drive
drive.mount('/content/drive')

data = '/content/drive/My Drive/Colab Notebooks/rnn_dof/dataset(copy).csv'
path = '/content/drive/My Drive/Colab Notebooks/rnn_dof/mapped_words_modified.txt'

Mounted at /content/drive


In [3]:


#model infrastructure
class Linear:

  def __init__(self, fan_in, fan_out, bias=True):
    self.weight = torch.randn((fan_in, fan_out)) / fan_in**0.5 # note: kaiming init
    self.bias = torch.zeros(fan_out) if bias else None

  def __call__(self, x):
    self.out = x @ self.weight
    if self.bias is not None:
      self.out += self.bias
    return self.out

  def parameters(self):
    return [self.weight] + ([] if self.bias is None else [self.bias])

# -----------------------------------------------------------------------------------------------
class BatchNorm1d:

  def __init__(self, dim, eps=1e-5, momentum=0.1):
    self.eps = eps
    self.momentum = momentum
    self.training = True
    # parameters (trained with backprop)
    self.gamma = torch.ones(dim)
    self.beta = torch.zeros(dim)
    # buffers (trained with a running 'momentum update')
    self.running_mean = torch.zeros(dim)
    self.running_var = torch.ones(dim)

  def __call__(self, x):
    # calculate the forward pass
    if self.training:
      if x.ndim == 2:
        dim = 0
      elif x.ndim == 3:
        dim = (0,1)
      xmean = x.mean(dim, keepdim=True) # batch mean
      xvar = x.var(dim, keepdim=True) # batch variance
    else:
      xmean = self.running_mean
      xvar = self.running_var
    xhat = (x - xmean) / torch.sqrt(xvar + self.eps) # normalize to unit variance
    self.out = self.gamma * xhat + self.beta
    # update the buffers
    if self.training:
      with torch.no_grad():
        self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * xmean
        self.running_var = (1 - self.momentum) * self.running_var + self.momentum * xvar
    return self.out

  def parameters(self):
    return [self.gamma, self.beta]

# -----------------------------------------------------------------------------------------------
class Tanh:
  def __call__(self, x):
    self.out = torch.tanh(x)
    return self.out
  def parameters(self):
    return []

# -----------------------------------------------------------------------------------------------
class Embedding:

  def __init__(self, num_embeddings, embedding_dim):
    self.weight = torch.randn((num_embeddings, embedding_dim))

  def __call__(self, IX):
    self.out = self.weight[IX]
    return self.out

  def parameters(self):
    return [self.weight]

# -----------------------------------------------------------------------------------------------
class FlattenConsecutive:

  def __init__(self, n):
    self.n = n

  def __call__(self, x):
    B, T, C = x.shape
    x = x.view(B, T//self.n, C*self.n)
    if x.shape[1] == 1:
      x = x.squeeze(1)
    self.out = x
    return self.out

  def parameters(self):
    return []

# -----------------------------------------------------------------------------------------------
class Sequential:

  def __init__(self, layers):
    self.layers = layers

  def __call__(self, x):
    for layer in self.layers:
      x = layer(x)
    self.out = x
    return self.out

  def parameters(self):
    # get parameters of all layers and stretch them out into one list
    return [p for layer in self.layers for p in layer.parameters()]
##########################################

In [4]:
chars = [
    'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j',
    'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
    'u', 'v', 'w', 'x', 'y', 'z', 'a1', 'b1', 'c1', 'd1',
    'e1', 'f1', 'g1', 'h1', 'i1', 'j1', 'k1', 'l1', 'm1',
    'n1', 'o1', 'p1', 'q1', 'r1', 's1', 't1', 'u1', 'v1', 'w1'
]
stoi = {s:i+1 for i,s in enumerate(chars)}
stoi["."] = 0
itos = {i:s for s,i in stoi.items()}

In [14]:


# Load the text data from a file
with open('/content/drive/My Drive/Colab Notebooks/rnn_dof/mapped_words_modified.txt', 'r', encoding='utf-8') as file:
    text_data = file.read()

# Preprocess the text data
symbols = text_data.split()
vocab = sorted(set(symbols))
symbol_to_idx = {symbol: idx for idx, symbol in enumerate(vocab)}
idx_to_symbol = {idx: symbol for idx, symbol in enumerate(vocab)}

# Create sequences
seq_length = 10
sequences = []
for i in range(len(symbols) - seq_length):
    seq = symbols[i:i+seq_length+1]
    sequences.append(seq)

# Convert sequences to indices
def encode_seq(seq):
    return [symbol_to_idx[symbol] for symbol in seq]

encoded_sequences = [encode_seq(seq) for seq in sequences]

# Create dataset
class SymbolDataset(Dataset):
    def __init__(self, sequences):
        self.sequences = sequences

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        return torch.tensor(self.sequences[idx][:-1]), torch.tensor(self.sequences[idx][1:])

dataset = SymbolDataset(encoded_sequences)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Define the RNN model
class RNNModel(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers):
        super(RNNModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.RNN(embed_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x):
        x = self.embedding(x)
        out, _ = self.rnn(x)
        out = self.fc(out)
        return out

vocab_size = len(vocab)
embed_size = 64
hidden_size = 128
num_layers = 1

model = RNNModel(vocab_size, embed_size, hidden_size, num_layers)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
num_epochs = 5
for epoch in range(num_epochs):
    for inputs, targets in dataloader:
        outputs = model(inputs)
        # Reshape outputs to (batch_size * sequence_length, vocab_size)
        outputs = outputs.view(-1, vocab_size)
        # Reshape targets to (batch_size * sequence_length)
        targets = targets.view(-1)

        loss = criterion(outputs, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()


    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# Save the model
torch.save(model.state_dict(), 'symbol_generation_rnn_model.pth')



Epoch [1/5], Loss: 0.5073
Epoch [2/5], Loss: 0.4412
Epoch [3/5], Loss: 0.4245
Epoch [4/5], Loss: 0.3432
Epoch [5/5], Loss: 0.3543


In [46]:
import torch.nn.functional as F

# Function to generate symbols
def generate_text(model, next_symbols, seq_length):
    sequences = []
    for _ in range(1000):
        start_text = '.'
        symbols = [start_text]
        for _ in range(next_symbols):
            x = torch.tensor([[symbol_to_idx[s] for s in symbols[-seq_length:]]], dtype=torch.long)
            y_pred = model(x)
            y_pred = y_pred[:, -1, :]  # Use the last time step output
            y_pred = F.softmax(y_pred, dim=1)
            last_symbol_id = torch.multinomial(y_pred, num_samples=1).item()
            while last_symbol_id == 0:
                last_symbol_id = torch.multinomial(y_pred, num_samples=1).item()
            symbols.append(idx_to_symbol[last_symbol_id])
        sequences.append(''.join(symbols))
    return sequences

# Example usage
generated_sequences = generate_text(model, 79, seq_length)
for seq in generated_sequences:
    print(seq)


.c1ohpo1b1xxddomoooammmaomnhomllap1q1sm1r1q1n1p1l1aaaaaaaaaaaaaaaaaaaaaaaaaadaaaaaaataaaaaa
.c1ohpp1b1yyddomoooammmaomniplllaaaaaaaaaaaaaaaaaaaaaaaaaaaaomaddomnipmmlat1v1mt1aaaaa
.c1ohrm1zwwddomoooammmaomnipnmlat1v1pu1w1v1q1u1u1aaaaaaomahetpoooammmaomniplllaaaaaaaaaaad
.d1ohrm1zwwddomoooammmaomniplllaaaaaaaaaaaaaaaaaaaaaaaaaaaaadng1aaaaaaaaaaaaaaaaaaa
.d1ohpp1b1xxddomoooammmaomniplllaaaaaaaaaaaaaaaomaeerpoooammmaomniplllaaaaaaaaaaaaa
.f1ohqo1a1xxddomoooammmaomnipmmlas1u1ms1aaaaaaomagetpoooammmaomnhomllak1n1sk1aaaaaaaaaadn
.d1ohqo1a1xxddomoooammmaomnipnmlas1v1pt1v1v1n1u1u1aaaaaaomafdsmoooammmaomniplllaaaaaaaaaaom
.e1ohrl1yvvddomoooammmaomnipmmlat1v1lt1aaaaaaaaaaaaaabng1aaaaam1q1r1j1o1o1aaaaaaaaaaaaaadng1a
.e1ohtj1vuuddomoooammmaomniplllaaaaaaaaaaaaaaaaaaaaaaomaddomllal1o1rl1aaaaaaaaaaaaabn
.e1ohsl1yvvddomoooammmaomniplllaaaaaaaaaaaaomaddomnipmmlat1v1qu1aaaaaaaaaaaaaaaadng1aa
.d1ohr1odcbfdsmoooammmaomnhonllan1q1pm1r1q1n1p1l1aaaaaaaaaaomaddomllak1n1sk1aaaaaaaaadng1aaaaa
.e1oks1ueeajazattt

In [47]:
# Write to a text file
# Split the big string by newlines


# Write the big string to a text file
# Write to a text file
with open('/content/drive/My Drive/Colab Notebooks/rnn_dof/output_text_file.txt', 'w', encoding='utf-8') as file:
    for string in generated_sequences:
        file.write(string + '\n')

print("Strings have been written to 'output_text_file.txt'")

Strings have been written to 'output_text_file.txt'
