<a href="https://colab.research.google.com/github/PaulNjinu254/Updated-Seq2Seq/blob/main/Copy_of_Seq2Seq.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
"""
Title: Character-level recurrent sequence-to-sequence model
Author: [fchollet](https://twitter.com/fchollet)
Date created: 2017/09/29
Last modified: 2023/11/22
Description: Character-level recurrent sequence-to-sequence model.
Accelerator: GPU
"""

"""
## Introduction

This example demonstrates how to implement a basic character-level
recurrent sequence-to-sequence model. We apply it to translating
short English sentences into short French sentences,
character-by-character. Note that it is fairly unusual to
do character-level machine translation, as word-level
models are more common in this domain.

**Summary of the algorithm**

- We start with input sequences from a domain (e.g. English sentences)
    and corresponding target sequences from another domain
    (e.g. French sentences).
- An encoder LSTM turns input sequences into two state vectors
    (we keep the last LSTM state and discard the outputs).
- A decoder LSTM is trained to turn the target sequences into
    the same sequence but offset by one timestep in the future,
    a training process called "teacher forcing" in this context.
    It uses as initial state the state vectors from the encoder.
    Effectively, the decoder learns to generate `targets[t+1...]`
    given `targets[...t]`, conditioned on the input sequence.
- In inference mode, when we want to decode unknown input sequences, we:
    - Encode the input sequence into state vectors
    - Start with a target sequence of size 1
        (just the start-of-sequence character)
    - Feed the state vectors and 1-char target sequence
        to the decoder to produce predictions for the next character
    - Sample the next character using these predictions
        (we simply use argmax).
    - Append the sampled character to the target sequence
    - Repeat until we generate the end-of-sequence character or we
        hit the character limit.
"""

"""
## Setup
"""

# ==============================
# Imports
# ==============================
import numpy as np
import keras
import os
from pathlib import Path
import requests
import zipfile
import io

# ==============================
# Download and extract dataset with User-Agent spoof
# ==============================
url = "http://www.manythings.org/anki/fra-eng.zip"
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) "
                  "Chrome/115.0 Safari/537.36"
}

print("Downloading dataset...")
r = requests.get(url, headers=headers, timeout=60)  # timeout so a stalled download cannot hang the cell
r.raise_for_status()

os.makedirs("data", exist_ok=True)
with zipfile.ZipFile(io.BytesIO(r.content)) as z:
    z.extractall("data")

print("Dataset downloaded and extracted.")

data_path = os.path.join("data", "fra.txt")

# ==============================
# Configuration
# ==============================
batch_size = 64      # Batch size for training
epochs = 100         # Number of epochs to train for
latent_dim = 256     # Latent dimensionality of the encoding space
num_samples = 10000  # Number of samples to train on

# ==============================
# Prepare the data
# ==============================
input_texts = []
target_texts = []
input_characters = set()
target_characters = set()

with open(data_path, "r", encoding="utf-8") as f:
    lines = f.read().split("\n")

for line in lines[: min(num_samples, len(lines) - 1)]:
    input_text, target_text, _ = line.split("\t")
    target_text = "\t" + target_text + "\n"  # start & end tokens
    input_texts.append(input_text)
    target_texts.append(target_text)
    input_characters.update(list(input_text))
    target_characters.update(list(target_text))

input_characters = sorted(list(input_characters))
target_characters = sorted(list(target_characters))

num_encoder_tokens = len(input_characters)
num_decoder_tokens = len(target_characters)
max_encoder_seq_length = max(len(txt) for txt in input_texts)
max_decoder_seq_length = max(len(txt) for txt in target_texts)

print("Number of samples:", len(input_texts))
print("Number of unique input tokens:", num_encoder_tokens)
print("Number of unique output tokens:", num_decoder_tokens)
print("Max sequence length for inputs:", max_encoder_seq_length)
print("Max sequence length for outputs:", max_decoder_seq_length)

input_token_index = {char: i for i, char in enumerate(input_characters)}
target_token_index = {char: i for i, char in enumerate(target_characters)}

encoder_input_data = np.zeros(
    (len(input_texts), max_encoder_seq_length, num_encoder_tokens), dtype="float32"
)
decoder_input_data = np.zeros(
    (len(input_texts), max_decoder_seq_length, num_decoder_tokens), dtype="float32"
)
decoder_target_data = np.zeros(
    (len(input_texts), max_decoder_seq_length, num_decoder_tokens), dtype="float32"
)

for i, (input_text, target_text) in enumerate(zip(input_texts, target_texts)):
    for t, char in enumerate(input_text):
        encoder_input_data[i, t, input_token_index[char]] = 1.0
    encoder_input_data[i, t + 1 :, input_token_index[" "]] = 1.0
    for t, char in enumerate(target_text):
        decoder_input_data[i, t, target_token_index[char]] = 1.0
        if t > 0:
            decoder_target_data[i, t - 1, target_token_index[char]] = 1.0
    decoder_input_data[i, t + 1 :, target_token_index[" "]] = 1.0
    decoder_target_data[i, t:, target_token_index[" "]] = 1.0

# ==============================
# Build the model
# ==============================
encoder_inputs = keras.Input(shape=(None, num_encoder_tokens))
encoder = keras.layers.LSTM(latent_dim, return_state=True)
encoder_outputs, state_h, state_c = encoder(encoder_inputs)
encoder_states = [state_h, state_c]

decoder_inputs = keras.Input(shape=(None, num_decoder_tokens))
decoder_lstm = keras.layers.LSTM(latent_dim, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(decoder_inputs, initial_state=encoder_states)
decoder_dense = keras.layers.Dense(num_decoder_tokens, activation="softmax")
decoder_outputs = decoder_dense(decoder_outputs)

model = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

# ==============================
# Train the model
# ==============================
model.compile(optimizer="rmsprop", loss="categorical_crossentropy", metrics=["accuracy"])
model.fit(
    [encoder_input_data, decoder_input_data],
    decoder_target_data,
    batch_size=batch_size,
    epochs=epochs,
    validation_split=0.2,
)
model.save("s2s_model.keras")

# ==============================
# Inference models
# ==============================
model = keras.models.load_model("s2s_model.keras")

encoder_inputs = model.input[0]
encoder_outputs, state_h_enc, state_c_enc = model.layers[2].output
encoder_model = keras.Model(encoder_inputs, [state_h_enc, state_c_enc])

decoder_inputs = model.input[1]
decoder_state_input_h = keras.Input(shape=(latent_dim,))
decoder_state_input_c = keras.Input(shape=(latent_dim,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

decoder_lstm = model.layers[3]
decoder_outputs, state_h_dec, state_c_dec = decoder_lstm(
    decoder_inputs, initial_state=decoder_states_inputs
)
decoder_states = [state_h_dec, state_c_dec]
decoder_dense = model.layers[4]
decoder_outputs = decoder_dense(decoder_outputs)

decoder_model = keras.Model(
    [decoder_inputs] + decoder_states_inputs, [decoder_outputs] + decoder_states
)

reverse_input_char_index = {i: char for char, i in input_token_index.items()}
reverse_target_char_index = {i: char for char, i in target_token_index.items()}

# ==============================
# Decode function
# ==============================
def decode_sequence(input_seq):
    states_value = encoder_model.predict(input_seq, verbose=0)
    target_seq = np.zeros((1, 1, num_decoder_tokens))
    target_seq[0, 0, target_token_index["\t"]] = 1.0

    decoded_sentence = ""
    while True:
        output_tokens, h, c = decoder_model.predict([target_seq] + states_value, verbose=0)
        sampled_token_index = np.argmax(output_tokens[0, -1, :])
        sampled_char = reverse_target_char_index[sampled_token_index]
        decoded_sentence += sampled_char

        if sampled_char == "\n" or len(decoded_sentence) > max_decoder_seq_length:
            break

        target_seq = np.zeros((1, 1, num_decoder_tokens))
        target_seq[0, 0, sampled_token_index] = 1.0
        states_value = [h, c]

    return decoded_sentence

# ==============================
# Test decoding
# ==============================
for seq_index in range(20):
    input_seq = encoder_input_data[seq_index : seq_index + 1]
    decoded_sentence = decode_sequence(input_seq)
    print("-")
    print("Input sentence:", input_texts[seq_index])
    print("Decoded sentence:", decoded_sentence)


Downloading dataset...
Dataset downloaded and extracted.
Number of samples: 10000
Number of unique input tokens: 70
Number of unique output tokens: 91
Max sequence length for inputs: 14
Max sequence length for outputs: 59
Epoch 1/100
125/125 ━━━━━━━━━━━━━━━━━━━━ 46s 348ms/step - accuracy: 0.7053 - loss: 1.5669 - val_accuracy: 0.7175 - val_loss: 1.0727
Epoch 2/100
125/125 ━━━━━━━━━━━━━━━━━━━━ 45s 357ms/step - accuracy: 0.7463 - loss: 0.9729 - val_accuracy: 0.7183 - val_loss: 0.9866
Epoch 3/100
125/125 ━━━━━━━━━━━━━━━━━━━━ 84s 370ms/step - accuracy: 0.7629 - loss: 0.8627 - val_accuracy: 0.7502 - val_loss: 0.8693
Epoch 4/100
125/125 ━━━━━━━━━━━━━━━━━━━━ 79s 346ms/step - accuracy: 0.7868 - loss: 0.7744 - val_accuracy: 0.7775 - val_loss: 0.7724
Epoch 5/100
125/125 ━━━━━━━━━━━━━━━━━━━━ 44s 355ms/step - accuracy: 0.8020 - los

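The one-timestep offset used for teacher forcing in the cell above can be seen on a toy example. This is a minimal sketch, not part of the original notebook: the three-character target and the `one_hot` helper are hypothetical, and padding is left as zeros here, whereas the notebook pads with the space character. It only shows that the decoder target is the decoder input shifted left by one step.

```python
import numpy as np

# Hypothetical toy target: start token "\t", one character, end token "\n".
target_text = "\ta\n"
chars = sorted(set(target_text))
index = {c: i for i, c in enumerate(chars)}


def one_hot(text, length, index):
    """Return a (length, vocab) one-hot matrix for `text`; unused rows stay zero."""
    out = np.zeros((length, len(index)), dtype="float32")
    for t, c in enumerate(text):
        out[t, index[c]] = 1.0
    return out


steps = len(target_text)
decoder_input = one_hot(target_text, steps, index)        # "\t", "a", "\n"
decoder_target = one_hot(target_text[1:], steps, index)   # "a", "\n", (empty)

# At every step t, the target equals the decoder input of step t + 1.
assert np.array_equal(decoder_target[:-1], decoder_input[1:])
print(decoder_target.argmax(axis=1)[:2])
```

The final row of `decoder_target` is all zeros because nothing follows the end token.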
In [20]:
# Imports
import os
import zipfile
import pickle
from pathlib import Path
from PIL import Image
import torch
import torch.nn as nn
import torchvision.transforms as transforms

# Google Drive mount
from google.colab import drive
print("[INFO] Mounting Google Drive...")
drive.mount('/content/drive', force_remount=False)

# Paths
MODEL_ZIP_PATH = "/content/drive/MyDrive/pretrained_model.zip"
EXTRACT_DIR = "/content/pretrained_model"
VOCAB_PATH = "/content/drive/MyDrive/vocab.pkl"
IMAGES_DIR = "/content/drive/MyDrive/Graduation_Photos"
OUTPUT_PATH = "/content/drive/MyDrive/captions_results.txt"

# Vocabulary class
class Vocabulary(object):
    def __init__(self):
        self.word2idx = {}
        self.idx2word = {}
        self.idx = 0

    def add_word(self, word):
        if word not in self.word2idx:
            self.word2idx[word] = self.idx
            self.idx2word[self.idx] = word
            self.idx += 1

    def __call__(self, word):
        return self.word2idx.get(word, self.word2idx.get('<unk>', 0))

    def __len__(self):
        return len(self.word2idx)

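    def get_word(self, idx):
        # Named get_word because the instance attribute `idx2word` (a dict set in
        # __init__) would shadow a method of the same name and make it uncallable.
        return self.idx2word.get(idx, '<unk>')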

# Extract model zip
os.makedirs(EXTRACT_DIR, exist_ok=True)
if MODEL_ZIP_PATH and os.path.exists(MODEL_ZIP_PATH):
    print(f"[INFO] Extracting model zip {MODEL_ZIP_PATH} -> {EXTRACT_DIR} ...")
    with zipfile.ZipFile(MODEL_ZIP_PATH, 'r') as z:
        z.extractall(EXTRACT_DIR)
else:
    print(f"[INFO] Model zip not found at {MODEL_ZIP_PATH}. If your encoder/decoder are already extracted, ensure they are in {EXTRACT_DIR}.")

print("[INFO] Extract dir contents:", os.listdir(EXTRACT_DIR))

# Load vocabulary (robust to different pickle formats)
print("[INFO] Loading vocabulary from", VOCAB_PATH)
with open(VOCAB_PATH, 'rb') as f:
    raw_vocab = pickle.load(f)

# Normalize vocab into an object with word2idx and idx2word attributes
if isinstance(raw_vocab, Vocabulary):
    vocab_obj = raw_vocab
elif isinstance(raw_vocab, dict):
    # try to detect common formats
    if 'word2idx' in raw_vocab and 'idx2word' in raw_vocab:
        vocab_obj = Vocabulary()
        vocab_obj.word2idx = raw_vocab['word2idx']
        vocab_obj.idx2word = raw_vocab['idx2word']
    elif 'stoi' in raw_vocab and 'itos' in raw_vocab:
        vocab_obj = Vocabulary()
        vocab_obj.word2idx = raw_vocab['stoi']
        # build idx2word
        vocab_obj.idx2word = {i: w for i, w in enumerate(raw_vocab['itos'])}
    elif 'itos' in raw_vocab:
        # itos = list index->word
        itos = raw_vocab['itos']
        vocab_obj = Vocabulary()
        vocab_obj.idx2word = {i: w for i, w in enumerate(itos)}
        vocab_obj.word2idx = {w: i for i, w in enumerate(itos)}
    else:
        try:
            # detect if keys are ints
            if all(isinstance(k, int) for k in raw_vocab.keys()):
                vocab_obj = Vocabulary()
                vocab_obj.idx2word = {int(k): v for k, v in raw_vocab.items()}
                vocab_obj.word2idx = {v: int(k) for k, v in raw_vocab.items()}
            else:
                # assume it's word->idx
                vocab_obj = Vocabulary()
                vocab_obj.word2idx = {k: int(v) for k, v in raw_vocab.items()}
                vocab_obj.idx2word = {int(v): k for k, v in raw_vocab.items()}
        except Exception:
            raise RuntimeError("Unrecognized vocab.pkl format. Inspect the pickle content.")
else:
    raise RuntimeError("Unrecognized vocab.pkl type. Expected Vocabulary object or dict.")

print(f"[INFO] Vocab loaded. Size = {len(vocab_obj)} words")

# Helper for idx->word
def idx_to_word(idx):
    # vocab_obj stores idx2word mapping as dict
    return vocab_obj.idx2word.get(int(idx), '<unk>')

# Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("[INFO] Device:", device)

# Model definitions
import torchvision.models as models

class EncoderCNN(nn.Module):
    def __init__(self, embed_size=256):
        super().__init__()
        # using resnet152 feature extractor
        resnet = models.resnet152(weights=models.ResNet152_Weights.IMAGENET1K_V2)
        for param in resnet.parameters():
            param.requires_grad = False
        modules = list(resnet.children())[:-1]
        self.resnet = nn.Sequential(*modules)
        self.linear = nn.Linear(resnet.fc.in_features, embed_size)
        self.bn = nn.BatchNorm1d(embed_size, momentum=0.01)

    def forward(self, images):
        with torch.no_grad():
            features = self.resnet(images)
        features = features.view(features.size(0), -1)
        features = self.bn(self.linear(features))
        return features

class DecoderRNN(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.num_layers = num_layers
        self.hidden_size = hidden_size

    def forward(self, features, captions):
        embeddings = self.embed(captions[:, :-1])
        inputs = torch.cat((features.unsqueeze(1), embeddings), dim=1)
        hiddens, _ = self.lstm(inputs)
        outputs = self.linear(hiddens)
        return outputs

    def sample(self, features, states=None, max_len=20):
        """Generate captions for given image features using greedy search."""
        sampled_ids = []
        inputs = features.unsqueeze(1)
        for _ in range(max_len):
            hiddens, states = self.lstm(inputs, states)
            outputs = self.linear(hiddens.squeeze(1))
            _, predicted = outputs.max(1)
            sampled_ids.append(predicted.item())
            if idx_to_word(predicted.item()) == '<end>':
                break
            inputs = self.embed(predicted).unsqueeze(1)
        return sampled_ids

# Load trained model weights
print("[INFO] Loading pretrained models...")
encoder_path = os.path.join(EXTRACT_DIR, "encoder-5-3000.pkl")
decoder_path = os.path.join(EXTRACT_DIR, "decoder-5-3000.pkl")

embed_size = 256
hidden_size = 512
vocab_size = len(vocab_obj)

encoder = EncoderCNN(embed_size).to(device)
decoder = DecoderRNN(embed_size, hidden_size, vocab_size).to(device)

encoder.load_state_dict(torch.load(encoder_path, map_location=device))
decoder.load_state_dict(torch.load(decoder_path, map_location=device))

encoder.eval()
decoder.eval()
print("[INFO] Models loaded successfully.")

# Image preprocessing
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.485, 0.456, 0.406),
                         std=(0.229, 0.224, 0.225))
])

# Caption generation function
def generate_caption(image_path):
    image = Image.open(image_path).convert("RGB")
    image_tensor = transform(image).unsqueeze(0).to(device)

    with torch.no_grad():
        features = encoder(image_tensor)
        sampled_ids = decoder.sample(features)
    words = [idx_to_word(word_id) for word_id in sampled_ids]
    # remove <start> and <end> tokens
    words = [w for w in words if w not in ["<start>", "<end>"]]
    return " ".join(words)

# Run captioning on images
results = []
image_files = [f for f in os.listdir(IMAGES_DIR) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]

for img_name in image_files:
    img_path = os.path.join(IMAGES_DIR, img_name)
    caption = generate_caption(img_path)
    print(f"{img_name} -> {caption}")
    results.append(f"{img_name}\t{caption}")

# Save results
with open(OUTPUT_PATH, "w") as f:
    for line in results:
        f.write(line + "\n")

print(f"[INFO] Captions saved to {OUTPUT_PATH}")

[INFO] Mounting Google Drive...
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
[INFO] Extracting model zip /content/drive/MyDrive/pretrained_model.zip -> /content/pretrained_model ...
[INFO] Extract dir contents: ['encoder-5-3000.pkl', 'decoder-5-3000.pkl']
[INFO] Loading vocabulary from /content/drive/MyDrive/vocab.pkl
[INFO] Vocab loaded. Size = 9956 words
[INFO] Device: cpu
[INFO] Loading pretrained models...


Downloading: "https://download.pytorch.org/models/resnet152-f82ba261.pth" to /root/.cache/torch/hub/checkpoints/resnet152-f82ba261.pth
100%|██████████| 230M/230M [00:03<00:00, 78.2MB/s]


[INFO] Models loaded successfully.
IMG_20240313_173320.jpg -> a bunch of different types of scissors on a table .
IMG_9837.JPG -> a person holding a frisbee in his mouth .
njinu 1.jpg -> a man wearing a hat and tie with a hat .
IMG-20231123-WA0010.jpg -> a man with a hat and a tie in his hand .
IMG20231124092344.jpg -> a man wearing a suit and tie with a hat .
DSC_2026.JPG -> a man holding a baby in a blanket on a bed .
IMG_20231124_085132.jpg -> a man and woman in a green dress holding a frisbee .
IMG-20231123-WA0015.jpg -> a man with a hat and a tie in his hand .
IMG-20231123-WA0002.jpg -> a group of people standing in front of a large crowd .
[INFO] Captions saved to /content/drive/MyDrive/captions_results.txt
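
The post-processing step in `generate_caption` (mapping sampled ids to words and dropping the special tokens) can be tried without loading the model. This is a minimal sketch, assuming a hypothetical five-word vocabulary and made-up ids. Unlike the notebook, which filters `<start>` and `<end>` wherever they appear, this version also stops at the first `<end>`.

```python
def ids_to_caption(sampled_ids, idx2word, unk="<unk>"):
    """Convert token ids to a caption string, stopping at the first <end>."""
    words = []
    for token_id in sampled_ids:
        word = idx2word.get(int(token_id), unk)
        if word == "<end>":
            break
        if word != "<start>":
            words.append(word)
    return " ".join(words)


# Hypothetical vocabulary, for illustration only.
toy_idx2word = {0: "<start>", 1: "<end>", 2: "a", 3: "man", 4: "smiling"}
caption = ids_to_caption([0, 2, 3, 4, 1, 3], toy_idx2word)
print(caption)
```

Ids that are missing from the mapping fall back to `<unk>`, mirroring the `idx_to_word` helper in the cell above.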


In [None]:
# Running It with Keras Instead of PyTorch
'''
If you have a PyTorch implementation but want to run it in Keras:

**Steps**

- Model architecture conversion
    - Identify the layers and structure in the PyTorch `model.py`.
    - Recreate the architecture in Keras using `tf.keras.layers` equivalents
        (e.g. `nn.Linear` → `Dense`, `nn.Conv2d` → `Conv2D`, `nn.LSTM` → `LSTM`).
    - Keep parameter sizes identical so the weights can be mapped one-to-one.
- Weight conversion
    - PyTorch and Keras store weights in different file formats and tensor layouts.
    - One route is to export the PyTorch model to ONNX with `torch.onnx.export`
        and convert the result to TensorFlow with `onnx-tf`. (`tf2onnx` goes the
        other way, from TensorFlow to ONNX.) The converted model is a TensorFlow
        graph rather than an editable stack of Keras layers.
    - Alternatively, load the checkpoint with `torch.load()`, read the tensors
        from the state dict, and assign them to the Keras layers with
        `layer.set_weights()`, transposing where layouts differ (for example,
        `nn.Linear` stores its weight as `(out_features, in_features)` while
        `Dense` expects `(in_features, out_features)`).
- Tokenizer and data preprocessing
    - Replace PyTorch text/image preprocessing (`torchvision.transforms`,
        custom tokenizers) with Keras equivalents (`tf.image`, `Tokenizer`,
        `TextVectorization`).
- Training and inference adjustments
    - `model.eval()` in PyTorch switches layers such as dropout and batch
        normalization to inference behavior. In Keras, `model.predict()` (or
        `model(x, training=False)`) already runs in inference mode.
    - Batching is handled by `tf.data` pipelines instead of the PyTorch `DataLoader`.
'''


'''
**Translating between Japanese and English**

Steps:

- Use a Japanese tokenizer such as MeCab or SentencePiece, because Japanese
    text does not separate words with spaces.
- Prepare a parallel corpus (e.g. JESC, Tatoeba, or the Kyoto Free
    Translation Task dataset).
- Train a Seq2Seq or Transformer model (Hugging Face's MarianMT or T5 work well).
- For inference, ensure proper preprocessing:
    - Japanese → tokenization (subwords)
    - English → detokenization

**Advanced machine translation methods**

- Attention mechanisms (Bahdanau, Luong).
- Transformers (Vaswani et al., 2017): the architecture behind models such as
    BERT, GPT, and MarianMT. For translation, encoder-decoder variants such as
    MarianMT are the usual choice.
- Multilingual models: a single model trained on multiple languages (mBART, mT5).
- Pre-trained models with fine-tuning: start with a general MT model, then
    fine-tune on your specific domain.

**Generating images from text (the opposite of captioning)**

Techniques:

- Diffusion models (e.g. Stable Diffusion, DALL·E 2, Midjourney).
- GANs (StackGAN, AttnGAN: text-conditioned image generation).
- CLIP + diffusion (guiding generation with text embeddings).
- Neural rendering (NeRF-based, though mainly 3D).

Basic flow for text-to-image:

- Encode the text into a vector representation (BERT/CLIP encoder).
- Feed it into the generative model (diffusion or GAN).
- Generate image pixels conditioned on the text embedding.
'''
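
For the manual weight-conversion route in the notes above, the main pitfall is tensor layout. The sketch below uses NumPy only, with randomly generated stand-in arrays (no real checkpoint is loaded, and the `linear_to_dense` helper is hypothetical). `nn.Linear` stores its weight as `(out_features, in_features)` and computes `x @ W.T + b`, while a Keras `Dense` kernel is `(in_features, out_features)` and computes `x @ kernel + bias`, so the weight has to be transposed before it is passed to `set_weights()`.

```python
import numpy as np

rng = np.random.default_rng(0)
in_features, out_features = 4, 3

# Stand-ins for tensors read from a state dict, e.g. "linear.weight" and
# "linear.bias" converted to NumPy. The values are random; this is an assumption
# made for illustration only.
torch_weight = rng.normal(size=(out_features, in_features)).astype("float32")
torch_bias = rng.normal(size=(out_features,)).astype("float32")


def linear_to_dense(weight, bias):
    """Return [kernel, bias] in the layout that Dense.set_weights() expects."""
    return [np.ascontiguousarray(weight.T), bias]


kernel, bias = linear_to_dense(torch_weight, torch_bias)

x = rng.normal(size=(2, in_features)).astype("float32")
torch_style = x @ torch_weight.T + torch_bias  # what nn.Linear computes
keras_style = x @ kernel + bias                # what Dense computes
assert np.allclose(torch_style, keras_style)
print(kernel.shape)
```

Recurrent layers need more care: a PyTorch `nn.LSTM` keeps two bias vectors per layer (`bias_ih` and `bias_hh`), which would be summed to form the single Keras bias, so it is worth comparing outputs numerically after any conversion.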