<a href="https://colab.research.google.com/github/dineshkumar-2003/Datascience-projects/blob/main/Next_word_prediction_formula.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import math
from collections import Counter


# --- Dataset ---
# Toy corpus: every sentence is split on whitespace and lowercased.
sentences = [
    "i like to eat pizza",
    "i like to eat sushi",
    "he runs to eat pizza",
    "i like to eat dosa",
    "i like to eat pongal",
    "i like to eat idly",
]

# Flat token stream of the whole corpus and the per-word frequencies.
tokens = [w for s in sentences for w in s.lower().split()]
counts = Counter(tokens)

# Vocabulary: the two special tokens first, then each corpus word in sorted order.
vocab = ["<pad>", "<unk>", *sorted(counts)]
vocab_size = len(vocab)
word_to_idx = dict(zip(vocab, range(vocab_size)))
idx_to_word = dict(zip(word_to_idx.values(), word_to_idx.keys()))

print("Vocab:", vocab)

# --- Config ---
embedding_dim = 8
seq_len = 3          # number of context tokens fed to the model
num_heads = 2
hidden_dim = 16
learning_rate = 0.01
epochs = 500

# --- Tokenizer ---
def tokenize(sentence):
    """Turn a whitespace-separated sentence into a list of vocabulary ids.

    Each word is lowercased before being looked up in ``word_to_idx``; a word
    that is missing from the mapping gets the id stored under ``"<unk>"``.
    """
    ids = []
    for raw_word in sentence.split():
        fallback = word_to_idx["<unk>"]
        ids.append(word_to_idx.get(raw_word.lower(), fallback))
    return ids


# Training pairs: a window of `seq_len` token ids and the id that follows it.
data = []
for s in sentences:
    tokens = tokenize(s)
    # Every token from position `seq_len` onward is a target; its context is
    # the `seq_len` tokens immediately before it.
    for start, next_word in enumerate(tokens[seq_len:]):
        context = tokens[start:start + seq_len]
        data.append((context, next_word))

# --- Positional Encoding ---
def get_positional_encoding(seq_len, d_model):
    """Build the sinusoidal positional-encoding table.

    For position ``pos`` and pair index ``k`` the table holds
    ``sin(pos / 10000**(2k / d_model))`` in column ``2k`` and the matching
    cosine in column ``2k + 1``.

    Args:
        seq_len: number of positions (rows).
        d_model: encoding width (columns); an odd width simply has no cosine
            partner for its last sine column.

    Returns:
        ``np.ndarray`` of shape ``(seq_len, d_model)``.
    """
    PE = np.zeros((seq_len, d_model))
    for i in range(0, d_model, 2):
        # `i` is already the even column index 2k, so the exponent 2k/d_model
        # is i/d_model. (Using 2*i/d_model doubled the exponent.)
        div_term = 10000 ** (i / d_model)
        for pos in range(seq_len):
            angle = pos / div_term
            PE[pos, i] = math.sin(angle)
            if i + 1 < d_model:
                PE[pos, i + 1] = math.cos(angle)
    return PE

# --- Attention ---
def softmax(x):
    """Return the softmax of ``x`` along its last axis.

    The maximum is subtracted per row (last axis) before exponentiating.
    Subtracting a single global maximum instead lets a row whose values are
    all far below that maximum underflow to zeros and normalise to NaN.

    Args:
        x: array-like of logits with at least one dimension.

    Returns:
        ``np.ndarray`` of the same shape whose last axis sums to 1.
    """
    x = np.asarray(x, dtype=float)
    shifted = x - np.max(x, axis=-1, keepdims=True)
    e_x = np.exp(shifted)
    return e_x / e_x.sum(axis=-1, keepdims=True)

def scaled_dot_product_attention(Q, K, V):
    """Compute single-head scaled dot-product attention.

    Query/key dot products are divided by the square root of the query's last
    dimension, normalised with ``softmax``, and used to mix the rows of ``V``.
    ``K.T`` is used for the transpose, so the inputs are expected to be 2-D.
    """
    scale = np.sqrt(Q.shape[-1])
    weights = softmax((Q @ K.T) / scale)  # one row of weights per query
    return weights @ V

# --- Multi-head Attention ---
class MultiHeadAttention:
    """Multi-head self-attention with one projection triple per head.

    Each head projects the input from ``d_model`` to ``d_model // num_heads``
    columns for its queries, keys and values. The head outputs are
    concatenated and mixed by the ``(d_model, d_model)`` matrix ``Wo``.
    """

    def __init__(self, d_model, num_heads):
        """Initialise all projection matrices with small random values.

        Args:
            d_model: width of the input and output vectors.
            num_heads: number of attention heads.

        Raises:
            ValueError: if ``d_model`` is not a multiple of ``num_heads``.
                The concatenated heads would then be narrower than ``Wo``
                expects and ``forward`` would fail with a shape error.
        """
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.num_heads = num_heads
        self.depth = d_model // num_heads
        self.Wq = [np.random.randn(d_model, self.depth) * 0.1 for _ in range(num_heads)]
        self.Wk = [np.random.randn(d_model, self.depth) * 0.1 for _ in range(num_heads)]
        self.Wv = [np.random.randn(d_model, self.depth) * 0.1 for _ in range(num_heads)]
        self.Wo = np.random.randn(d_model, d_model) * 0.1

    def forward(self, x):
        """Run self-attention over ``x`` and return the projected result.

        Args:
            x: array whose last dimension is ``d_model``.

        Returns:
            Array with the same leading shape as ``x`` and ``d_model`` columns.
        """
        heads = [
            scaled_dot_product_attention(x @ wq, x @ wk, x @ wv)
            for wq, wk, wv in zip(self.Wq, self.Wk, self.Wv)
        ]
        # Concatenate per-head outputs back to d_model columns before mixing.
        return np.concatenate(heads, axis=-1) @ self.Wo

# --- Feed Forward ---
class FeedForward:
    """Two-layer fully connected network with a ReLU between the layers."""

    def __init__(self, d_model, hidden_dim):
        """Create small random weights and zero biases for both layers."""
        init_scale = 0.1
        self.W1 = init_scale * np.random.randn(d_model, hidden_dim)
        self.b1 = np.zeros(hidden_dim)
        self.W2 = init_scale * np.random.randn(hidden_dim, d_model)
        self.b2 = np.zeros(d_model)

    def forward(self, x):
        """Apply ``relu(x @ W1 + b1) @ W2 + b2``."""
        pre_activation = x @ self.W1 + self.b1
        activated = np.maximum(0, pre_activation)
        return activated @ self.W2 + self.b2

# --- Transformer Block ---
class TransformerBlock:
    """Attention sub-layer followed by a feed-forward sub-layer, each residual.

    No normalisation layer is applied in this block.
    """

    def __init__(self, d_model, num_heads, hidden_dim):
        """Build the attention and feed-forward sub-layers."""
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, hidden_dim)

    def forward(self, x):
        """Return ``x`` after both residual sub-layers; ``x`` is not modified."""
        after_attention = x + self.mha.forward(x)
        return after_attention + self.ffn.forward(after_attention)

# --- Simple Transformer ---
class SimpleTransformer:
    """One-block transformer that scores the next token from the last position."""

    def __init__(self, vocab_size, d_model, seq_len, num_heads, hidden_dim):
        """Create the embedding table, positional table, block and output layer."""
        init_scale = 0.1
        self.embeddings = init_scale * np.random.randn(vocab_size, d_model)
        self.positional_encoding = get_positional_encoding(seq_len, d_model)
        self.block = TransformerBlock(d_model, num_heads, hidden_dim)
        self.output_layer = init_scale * np.random.randn(d_model, vocab_size)

    def forward(self, x_ids):
        """Return the probability distribution over the vocabulary.

        ``x_ids`` indexes the embedding table; the positional table is added to
        the looked-up rows, so the two must be broadcast-compatible. In this
        file the context length always equals ``seq_len``.
        """
        token_vectors = self.embeddings[x_ids]
        hidden_states = self.block.forward(token_vectors + self.positional_encoding)
        # Only the last position's hidden state is projected to logits.
        logits = hidden_states[-1] @ self.output_layer
        return softmax(logits)

# --- Training ---
model = SimpleTransformer(vocab_size, embedding_dim, seq_len, num_heads, hidden_dim)

# Only `model.output_layer` is trained here: the embeddings and the transformer
# block keep their random initial values.
for epoch in range(epochs):
    total_loss = 0
    for context, target in data:
        # Run the block once and keep the last position's hidden state. It
        # feeds both the prediction and the output-layer gradient, so there is
        # no need to evaluate the block a second time. This mirrors what
        # model.forward computes.
        final_token = model.block.forward(
            model.embeddings[context] + model.positional_encoding
        )[-1]
        probs = softmax(final_token @ model.output_layer)

        # Cross-entropy of the target token; the epsilon guards log(0).
        loss = -np.log(probs[target] + 1e-9)
        total_loss += loss

        # Gradient descent (manual): d(loss)/d(logits) is softmax minus one-hot.
        grad_output = probs.copy()
        grad_output[target] -= 1
        grad = np.outer(final_token, grad_output)
        model.output_layer -= learning_rate * grad

    if epoch % 100 == 0:
        print(f"Epoch {epoch}, Loss: {total_loss:.4f}")

# # --- Prediction Function ---
# def predict_next_word(context_words):
#     x_ids = tokenize(" ".join(context_words))
#     probs = model.forward(x_ids)
#     return idx_to_word[np.argmax(probs)]

# # Predict
# predict_next_word(["i", "like", "to", "eat"])

def sample_with_temperature(probs, temperature=1.0):
    """Sample an index from ``probs`` after temperature rescaling.

    Args:
        probs: 1-D array-like of probabilities.
        temperature: ``0`` returns the argmax deterministically. Otherwise the
            log-probabilities are divided by ``temperature`` and renormalised
            before sampling.

    Returns:
        The sampled (or argmax) index into ``probs``.
    """
    if temperature == 0:
        return np.argmax(probs)
    probs = np.asarray(probs, dtype=float)
    scaled = np.log(probs + 1e-9) / temperature
    # Shift by the maximum before exponentiating. With a small temperature
    # every unshifted term underflows to 0, the normalisation becomes
    # 0/0 = NaN, and np.random.choice rejects the probabilities.
    weights = np.exp(scaled - np.max(scaled))
    return np.random.choice(len(probs), p=weights / weights.sum())



def generate_sequence(model, context_words, num_words, temperature=1.0):
    """Extend ``context_words`` by sampling ``num_words`` further words.

    Args:
        model: object whose ``forward(x_ids)`` returns next-token probabilities.
        context_words: iterable of seed words; it is copied, not modified.
        num_words: number of words to append.
        temperature: sampling temperature passed to
            ``sample_with_temperature`` (0.7-1.0 is usually nice).

    Returns:
        The seed words plus the generated words, joined by single spaces.
    """
    generated = list(context_words)

    for _ in range(num_words):
        # Keep the last `seq_len` words, left-padding when there are fewer.
        input_seq = generated[-seq_len:]
        if len(input_seq) < seq_len:
            input_seq = ["<pad>"] * (seq_len - len(input_seq)) + input_seq

        x_ids = tokenize(" ".join(input_seq))
        probs = model.forward(x_ids)
        next_word_idx = sample_with_temperature(probs, temperature=temperature)
        generated.append(idx_to_word[next_word_idx])

    return " ".join(generated)


# Demo: continue a three-word prompt with five sampled words.
seed_words = ["i", "like", "to"]
generated_text = generate_sequence(model, seed_words, num_words=5)
print("Generated:", generated_text)


Vocab: ['<pad>', '<unk>', 'dosa', 'eat', 'he', 'i', 'idly', 'like', 'pizza', 'pongal', 'runs', 'sushi', 'to']
Epoch 0, Loss: 30.7654
Epoch 100, Loss: 17.4240
Epoch 200, Loss: 15.7363
Epoch 300, Loss: 14.6583
Epoch 400, Loss: 13.8757
Generated: i like to pizza dosa pongal eat eat


In [None]:
import numpy as np
import math
from collections import Counter

# --- Dataset ---
# Toy corpus: every sentence is split on whitespace and lowercased.
sentences = [
    "i like to eat pizza",
    "i like to eat sushi",
    "he runs to eat pizza",
    "i like to eat dosa",
    "i like to eat pongal",
    "i like to eat idly",
]

# --- Vocabulary Building ---
# Flat token stream of the corpus, plus the end-of-sentence marker so that it
# receives a vocabulary id.
tokens = [w for s in sentences for w in s.lower().split()]
tokens.append("<eos>")
counts = Counter(tokens)
vocab = ["<pad>", "<unk>", *sorted(counts)]
vocab_size = len(vocab)
word_to_idx = dict(zip(vocab, range(vocab_size)))
idx_to_word = dict(zip(word_to_idx.values(), word_to_idx.keys()))

# --- Config ---
embedding_dim = 8
seq_len = 3  # Now using 3-word context
num_heads = 2
hidden_dim = 16
learning_rate = 0.01
epochs = 1000

# --- Tokenizer ---
def tokenize(sentence):
    """Turn a whitespace-separated sentence into a list of vocabulary ids.

    Each word is lowercased before being looked up in ``word_to_idx``; a word
    that is missing from the mapping gets the id stored under ``"<unk>"``.
    """
    ids = []
    for raw_word in sentence.split():
        fallback = word_to_idx["<unk>"]
        ids.append(word_to_idx.get(raw_word.lower(), fallback))
    return ids

# --- Dataset Preparation ---
# Training pairs: a window of `seq_len` token ids and the id that follows it.
# Each sentence is terminated with <eos> so the model can learn where to stop.
data = []
for s in sentences:
    tokens = [*tokenize(s), word_to_idx["<eos>"]]
    # Every token from position `seq_len` onward is a target; its context is
    # the `seq_len` tokens immediately before it.
    for start, next_word in enumerate(tokens[seq_len:]):
        context = tokens[start:start + seq_len]
        data.append((context, next_word))

# --- Positional Encoding ---
def get_positional_encoding(seq_len, d_model):
    """Build the sinusoidal positional-encoding table.

    For position ``pos`` and pair index ``k`` the table holds
    ``sin(pos / 10000**(2k / d_model))`` in column ``2k`` and the matching
    cosine in column ``2k + 1``.

    Args:
        seq_len: number of positions (rows).
        d_model: encoding width (columns); an odd width simply has no cosine
            partner for its last sine column.

    Returns:
        ``np.ndarray`` of shape ``(seq_len, d_model)``.
    """
    PE = np.zeros((seq_len, d_model))
    for i in range(0, d_model, 2):
        # `i` is already the even column index 2k, so the exponent 2k/d_model
        # is i/d_model. (Using 2*i/d_model doubled the exponent.)
        div_term = 10000 ** (i / d_model)
        for pos in range(seq_len):
            angle = pos / div_term
            PE[pos, i] = math.sin(angle)
            if i + 1 < d_model:
                PE[pos, i + 1] = math.cos(angle)
    return PE

# --- Attention Components ---
def softmax(x):
    """Return the softmax of ``x`` along its last axis.

    The maximum is subtracted per row (last axis) before exponentiating.
    Subtracting a single global maximum instead lets a row whose values are
    all far below that maximum underflow to zeros and normalise to NaN.

    Args:
        x: array-like of logits with at least one dimension.

    Returns:
        ``np.ndarray`` of the same shape whose last axis sums to 1.
    """
    x = np.asarray(x, dtype=float)
    shifted = x - np.max(x, axis=-1, keepdims=True)
    e_x = np.exp(shifted)
    return e_x / e_x.sum(axis=-1, keepdims=True)

def scaled_dot_product_attention(Q, K, V):
    """Compute single-head scaled dot-product attention.

    Query/key dot products are divided by the square root of the query's last
    dimension, normalised with ``softmax``, and used to mix the rows of ``V``.
    ``K.T`` is used for the transpose, so the inputs are expected to be 2-D.
    """
    scale = np.sqrt(Q.shape[-1])
    weights = softmax((Q @ K.T) / scale)  # one row of weights per query
    return weights @ V

class MultiHeadAttention:
    """Multi-head self-attention with one projection triple per head.

    Each head projects the input from ``d_model`` to ``d_model // num_heads``
    columns for its queries, keys and values. The head outputs are
    concatenated and mixed by the ``(d_model, d_model)`` matrix ``Wo``.
    """

    def __init__(self, d_model, num_heads):
        """Initialise all projection matrices with small random values.

        Args:
            d_model: width of the input and output vectors.
            num_heads: number of attention heads.

        Raises:
            ValueError: if ``d_model`` is not a multiple of ``num_heads``.
                The concatenated heads would then be narrower than ``Wo``
                expects and ``forward`` would fail with a shape error.
        """
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.num_heads = num_heads
        self.depth = d_model // num_heads
        self.Wq = [np.random.randn(d_model, self.depth) * 0.1 for _ in range(num_heads)]
        self.Wk = [np.random.randn(d_model, self.depth) * 0.1 for _ in range(num_heads)]
        self.Wv = [np.random.randn(d_model, self.depth) * 0.1 for _ in range(num_heads)]
        self.Wo = np.random.randn(d_model, d_model) * 0.1

    def forward(self, x):
        """Run self-attention over ``x`` and return the projected result.

        Args:
            x: array whose last dimension is ``d_model``.

        Returns:
            Array with the same leading shape as ``x`` and ``d_model`` columns.
        """
        heads = [
            scaled_dot_product_attention(x @ wq, x @ wk, x @ wv)
            for wq, wk, wv in zip(self.Wq, self.Wk, self.Wv)
        ]
        # Concatenate per-head outputs back to d_model columns before mixing.
        return np.concatenate(heads, axis=-1) @ self.Wo

class FeedForward:
    """Two-layer fully connected network with a ReLU between the layers."""

    def __init__(self, d_model, hidden_dim):
        """Create small random weights and zero biases for both layers."""
        init_scale = 0.1
        self.W1 = init_scale * np.random.randn(d_model, hidden_dim)
        self.b1 = np.zeros(hidden_dim)
        self.W2 = init_scale * np.random.randn(hidden_dim, d_model)
        self.b2 = np.zeros(d_model)

    def forward(self, x):
        """Apply ``relu(x @ W1 + b1) @ W2 + b2``."""
        pre_activation = x @ self.W1 + self.b1
        activated = np.maximum(0, pre_activation)
        return activated @ self.W2 + self.b2

class TransformerBlock:
    """Attention sub-layer followed by a feed-forward sub-layer, each residual.

    No normalisation layer is applied in this block.
    """

    def __init__(self, d_model, num_heads, hidden_dim):
        """Build the attention and feed-forward sub-layers."""
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, hidden_dim)

    def forward(self, x):
        """Return ``x`` after both residual sub-layers; ``x`` is not modified."""
        after_attention = x + self.mha.forward(x)
        return after_attention + self.ffn.forward(after_attention)

class SimpleTransformer:
    """One-block transformer that scores the next token from the last position."""

    def __init__(self, vocab_size, d_model, seq_len, num_heads, hidden_dim):
        """Create the embedding table, positional table, block and output layer."""
        init_scale = 0.1
        self.embeddings = init_scale * np.random.randn(vocab_size, d_model)
        self.positional_encoding = get_positional_encoding(seq_len, d_model)
        self.block = TransformerBlock(d_model, num_heads, hidden_dim)
        self.output_layer = init_scale * np.random.randn(d_model, vocab_size)

    def forward(self, x_ids):
        """Return ``(probabilities, last_hidden_state)`` for a context.

        ``x_ids`` indexes the embedding table; the positional table is added to
        the looked-up rows, so the two must be broadcast-compatible. In this
        file the context length always equals ``seq_len``. The second element
        is the block output at the last position, which the training loop
        reuses for its gradient.
        """
        token_vectors = self.embeddings[x_ids]
        hidden_states = self.block.forward(token_vectors + self.positional_encoding)
        final_token = hidden_states[-1]  # Use last token
        probabilities = softmax(final_token @ self.output_layer)
        return probabilities, final_token

# --- Training ---
model = SimpleTransformer(vocab_size, embedding_dim, seq_len, num_heads, hidden_dim)

# Manual SGD over the output layer and the embedding table. The transformer
# block's own weights are not updated in this loop.
for epoch in range(epochs):
    total_loss = 0
    for context, target in data:
        probs, last_token_embedding = model.forward(context)

        # Cross-entropy of the target token; the epsilon guards log(0).
        loss = -np.log(probs[target] + 1e-9)
        total_loss += loss

        # d(loss)/d(logits) is softmax minus one-hot.
        grad_output = probs.copy()
        grad_output[target] -= 1

        # Compute both gradients from the weights used in the forward pass,
        # before any parameter changes. Taking the hidden-state gradient after
        # the output-layer step would mix old activations with new weights.
        grad = np.outer(last_token_embedding, grad_output)
        grad_hidden = grad_output @ model.output_layer.T

        # Update output layer
        model.output_layer -= learning_rate * grad

        # Update embeddings. The same last-position gradient is applied to
        # every context token, which is an approximation that ignores the
        # block's Jacobian. A token that repeats in the context is stepped
        # once per occurrence.
        for word_id in context:
            model.embeddings[word_id] -= learning_rate * grad_hidden

    if epoch % 200 == 0:
        print(f"Epoch {epoch}, Loss: {total_loss:.4f}")

# --- Prediction Utilities ---
def sample_with_temperature(probs, temperature=1.0):
    """Sample an index from ``probs`` after temperature rescaling.

    Args:
        probs: 1-D array-like of probabilities.
        temperature: ``0`` returns the argmax deterministically. Otherwise the
            log-probabilities are divided by ``temperature`` and renormalised
            before sampling.

    Returns:
        The sampled (or argmax) index into ``probs``.
    """
    if temperature == 0:
        return np.argmax(probs)
    probs = np.asarray(probs, dtype=float)
    scaled = np.log(probs + 1e-9) / temperature
    # Shift by the maximum before exponentiating. With a small temperature
    # every unshifted term underflows to 0, the normalisation becomes
    # 0/0 = NaN, and np.random.choice rejects the probabilities.
    weights = np.exp(scaled - np.max(scaled))
    return np.random.choice(len(probs), p=weights / weights.sum())

def generate_sequence(model, context_words, num_words, temperature=0.8):
    """Extend ``context_words`` by sampling up to ``num_words`` further words.

    Generation stops early when ``"<eos>"`` is sampled; the marker itself is
    not appended.

    Args:
        model: object whose ``forward(x_ids)`` returns a
            ``(probabilities, hidden_state)`` pair.
        context_words: iterable of seed words; it is copied, not modified.
        num_words: maximum number of words to append.
        temperature: sampling temperature passed to
            ``sample_with_temperature``.

    Returns:
        The seed words plus the generated words, joined by single spaces.
    """
    generated = list(context_words)
    for _ in range(num_words):
        # Keep the last `seq_len` words, left-padding when there are fewer.
        input_seq = generated[-seq_len:]
        if len(input_seq) < seq_len:
            input_seq = ["<pad>"] * (seq_len - len(input_seq)) + input_seq
        x_ids = tokenize(" ".join(input_seq))
        probs, _hidden = model.forward(x_ids)
        next_word_idx = sample_with_temperature(probs, temperature=temperature)
        next_word = idx_to_word[next_word_idx]
        if next_word == "<eos>":
            break
        generated.append(next_word)
    return " ".join(generated)

def predict_next_word(context_words):
    """Return the single most probable next word for ``context_words``.

    The words are joined, tokenized and passed to the module-level ``model``;
    the highest-probability index is mapped back to its word.
    """
    sentence = " ".join(context_words)
    probs, _hidden = model.forward(tokenize(sentence))
    best_idx = np.argmax(probs)
    return idx_to_word[best_idx]

# --- Example Predictions ---
seed_context = ["i", "like", "to"]
print("Next word after 'i like to':", predict_next_word(seed_context))
print("Generated sequence:", generate_sequence(model, seed_context, num_words=5))
# NOTE(review): "hr" is not in the training vocabulary (possibly a typo for
# "he"), so tokenize maps it to <unk>; the two-word prompt is left-padded.
short_context = ["hr", "runs"]
print("Generated sequence:", generate_sequence(model, short_context, num_words=5))


Epoch 0, Loss: 47.8853
Epoch 200, Loss: 14.9743
Epoch 400, Loss: 13.8410
Epoch 600, Loss: 15.3992
Epoch 800, Loss: 18.4507
Next word after 'i like to': eat
Generated sequence: i like to eat idly
Generated sequence: hr runs eat idly
