In [2]:
import json
import zipfile
import os
import random
from torch.optim.lr_scheduler import StepLR

In [3]:
# def read_json_from_zip(zip_path, json_filename):
#     with zipfile.ZipFile(zip_path, 'r') as zip_ref:
#         with zip_ref.open(json_filename) as json_file:
#             data = json_file.read().decode('utf-8')
#             json_data = json.loads(data)
#             return json_data
        
def read_json_from_folder(folder_path, json_filename):
    file_path = os.path.join(folder_path, json_filename)
    with open(file_path, 'r', encoding='utf-8') as json_file:
        json_data = json.load(json_file)
    return json_data

In [4]:
recipe = read_json_from_folder("/kaggle/input/recipe","full_format_recipes.json")

In [5]:
filtered_data = [
        'Recipe for ' + x['title']+ ' | ' + ' '.join(x['directions']) for x in recipe
                                                                            if 'title' in x
                                                                            and x['title'] is not None
                                                                            and 'directions' in x
                                                                            and x['directions'] is not None
]

In [19]:
import re
import string

def pad_punctuation(s):
    s = re.sub(f"([{string.punctuation}])", r' \1 ', s)
    s = re.sub(' +', ' ', s)
    return s



import torch
from torch.utils.data import Dataset, DataLoader
from collections import Counter

class TextDataset(Dataset):
    def __init__(self, texts, vocab, seq_length):
        self.texts = texts
        self._dict = vocab
        self.vocab = lambda x:vocab.get(x)
        self.seq_length = seq_length
        self.revocab = {value: key for key, value in vocab.items()}
        self.token2word = lambda x :self.revocab.get(x)

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        tokens = self.texts[idx].split()
        token_ids = [self.vocab(token) if token in self._dict else self.vocab('<unk>') for token in tokens]
        token_ids = token_ids[:self.seq_length] + [self.vocab('<pad>')] * max(0, self.seq_length - len(token_ids))
        return torch.tensor(token_ids)

def build_vocab(texts, max_tokens=10000):
    counter = Counter()
    for text in texts:
        counter.update(text.split())
    sorted_vocab = sorted(counter.items(), key=lambda x: x[1], reverse=True)[:max_tokens-2]
    vocab_list = ['<pad>', '<unk>'] + [item[0] for item in sorted_vocab]
    vocab_dict = {word: idx for idx, word in enumerate(vocab_list)}
    return vocab_dict


In [20]:
text_data = [pad_punctuation(x) for x in filtered_data]
vocab = build_vocab(text_data, max_tokens=10000)
dataset = TextDataset(text_data, vocab, seq_length=200)
# data_loader = DataLoader(dataset, batch_size=32, shuffle=True)

In [21]:
from torch.nn.utils.rnn import pad_sequence

def prepare_inputs(batch):
    x = batch[:, :-1]
    y = batch[:, 1:]
    return x, y

class MyCollate:
    def __call__(self, batch):
        batch = pad_sequence(batch, batch_first=True, padding_value=vocab.get('<pad>'))
        return prepare_inputs(batch)

collate_fn = MyCollate()
data_loader = DataLoader(dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)


In [22]:
for x, y in data_loader:
    print("Input (x):", x)
    print("Target (y):", y)
    print("x shape:", x.shape)
    print("y shape:", y.shape)
    break

Input (x): tensor([[  26,   16, 4629,  ..., 1012,    8,   11],
        [  26,   16, 1148,  ...,    0,    0,    0],
        [  26,   16, 1582,  ...,    0,    0,    0],
        ...,
        [  26,   16, 9098,  ...,    0,    0,    0],
        [  26,   16, 1189,  ...,    0,    0,    0],
        [  26,   16,    1,  ...,  723,   60, 5730]])
Target (y): tensor([[  16, 4629,   13,  ...,    8,   11,   23],
        [  16, 1148, 2470,  ...,    0,    0,    0],
        [  16, 1582,    4,  ...,    0,    0,    0],
        ...,
        [  16, 9098,  897,  ...,    0,    0,    0],
        [  16, 1189,  803,  ...,    0,    0,    0],
        [  16,    1, 9169,  ...,   60, 5730,    5]])
x shape: torch.Size([32, 199])
y shape: torch.Size([32, 199])


In [39]:
import torch
import torch.nn as nn
import torch.optim as optim

class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
#         self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.lstm = nn.GRU(embedding_dim, hidden_dim, batch_first=True,bidirectional=True)
        self.dropout = nn.Dropout(.2)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        self.softmax = nn.Softmax(dim=-1)
        
        
    

    def _initialize_weights(self):
        # Glorot initialization for embedding and fully connected layer
        nn.init.xavier_uniform_(self.embedding.weight)
        nn.init.xavier_uniform_(self.fc.weight)
        if self.fc.bias is not None:
            nn.init.zeros_(self.fc.bias)

        # Glorot initialization for LSTM input-hidden and hidden-hidden weights
        for name, param in self.lstm.named_parameters():
            if 'weight_ih' in name or 'weight_hh' in name:
                nn.init.xavier_uniform_(param.data)
            elif 'bias' in name:
                # Initialize biases to zero, except for forget gate bias
                param.data.fill_(0)
                if 'bias_ih' in name:
                    n = param.size(0)
                    param.data[n//4:n//2].fill_(1)  # Set forget gate bias to 1
                
    def forward(self, x):
        x = self.embedding(x)
        x,_ = self.lstm(x)
        x = self.dropout(x)
        x = self.fc(x)
        x = self.softmax(x)
        return x

In [40]:

vocab_size = 10000
embedding_dim = 150
hidden_dim = 128
output_dim = 10000

model = LSTMModel(vocab_size, embedding_dim, hidden_dim, output_dim)
model._initialize_weights()

In [41]:
# Loss function and optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=.03)
model.to("cuda")
scheduler = StepLR(optimizer, step_size=5, gamma=0.1)

In [None]:
epochs = 25
loss_list = []
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for batch in data_loader:
        inputs, targets = batch
        inputs, targets = inputs.to("cuda"), targets.to("cuda")
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_fn(outputs.view(-1, outputs.shape[-1]), targets.reshape(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    average_loss = total_loss / len(data_loader)
    loss_list.append(average_loss)
    print(f"Epoch {epoch+1}/{epochs}, Loss: {average_loss:.4f}")
    scheduler.step()

Epoch 1/25, Loss: 8.9445
Epoch 2/25, Loss: 8.9393


In [None]:
import torch
import torch.nn as nn
import numpy as np

class TextGenerator:
    def __init__(self, model, index_to_word, top_k=10):
        self.model = model
        self.index_to_word = index_to_word
        self.word_to_index = {word: index for index, word in enumerate(index_to_word)}
        self.top_k = top_k

    def sample_from(self, probs, temperature):
        probs = probs.cpu().numpy()  # Convert to numpy array
        probs = probs ** (1 / temperature)
        probs = probs / np.sum(probs)
        return np.random.choice(len(probs), p=probs), probs

    def generate(self, start_prompt, max_tokens, temperature):
        start_tokens = [self.word_to_index.get(x, 1) for x in start_prompt.split()]
        sample_token = None
        info = []

        self.model.eval()  # Set the model to evaluation mode

        with torch.no_grad():
            while len(start_tokens) < max_tokens and sample_token != 0:
                x = torch.tensor([start_tokens]).long()
                if torch.cuda.is_available():
                    x = x.cuda()

                y = self.model(x)
                y = y[0, -1, :]  # Get the last token's predictions
                y = nn.functional.softmax(y, dim=-1)

                sample_token, probs = self.sample_from(y, temperature)
                info.append({'prompt': start_prompt, 'word_probs': probs})
                start_tokens.append(sample_token)
                start_prompt = start_prompt + ' ' + self.index_to_word.get(sample_token)

        print(f"\ngenerated text:\n{start_prompt}\n")
        return info

    def on_epoch_end(self, epoch, logs=None):
        self.generate("recipe for", max_tokens=100, temperature=1.0)

In [None]:
index_to_word = dataset.revocab

generator = TextGenerator(model, index_to_word)

In [38]:

generator.generate("recipe for grilled apple", max_tokens=100, temperature=0.3)

# for epoch in range(5):
#     generator.on_epoch_end(epoch)


generated text:
recipe for grilled apple apéritif Spanish North cellophane tostada Sushi Shrimp Omelet prickly pounding jícama aerates collects Crumbs seep winery coats Ball Need farmer dente scrub Pilsner Argentina mineral Savory when Another concentrate Overlap Saute preparation 105°F peats canning 28 caraway rancid twist begin church Alsatian themselves Crisp Blended cannellini capon Instead layered mitt drumette littleneck pastrami 101 mocha y snap reconstituted period Remember boil Bosc Ale candied red rather think peels coral Jumbo Cubes Gather Straddle 15minutes creases Pizzaiola ziplock overmix brew Cioppino rösti varietal lacquered concentrate wingtips container slice Rest freekeh adequate Fontina chunks—to syrups Shank overblend Wilt



[{'prompt': 'recipe for grilled apple',
  'word_probs': array([9.973049e-05, 9.973049e-05, 2.795009e-03, ..., 9.973049e-05,
         9.973049e-05, 9.973049e-05], dtype=float32)},
 {'prompt': 'recipe for grilled apple apéritif',
  'word_probs': array([9.9732155e-05, 9.9732155e-05, 2.7773038e-03, ..., 9.9732155e-05,
         9.9732155e-05, 9.9732155e-05], dtype=float32)},
 {'prompt': 'recipe for grilled apple apéritif Spanish',
  'word_probs': array([1.2028263e-03, 9.9877223e-05, 1.0638614e-04, ..., 9.9877223e-05,
         9.9877223e-05, 9.9877223e-05], dtype=float32)},
 {'prompt': 'recipe for grilled apple apéritif Spanish North',
  'word_probs': array([1.0025557e-04, 9.9731798e-05, 2.7810370e-03, ..., 9.9731798e-05,
         9.9731798e-05, 9.9731798e-05], dtype=float32)},
 {'prompt': 'recipe for grilled apple apéritif Spanish North cellophane',
  'word_probs': array([9.9730416e-05, 9.9730416e-05, 2.7953829e-03, ..., 9.9730416e-05,
         9.9730416e-05, 9.9730416e-05], dtype=float32)}

In [133]:
index_to_word.get(1)

'<unk>'