In [None]:
!pip install tensorflow numpy pillow

import numpy as np
import os
import pickle
import tensorflow as tf
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Dropout, add
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.utils import to_categorical
from PIL import Image, UnidentifiedImageError

# --- Hyperparameters shared by data preparation, model building and training ---
# Length every caption prefix is padded to; also the width of the model's text input.
MAX_CAPTION_LENGTH = 100
# Passed to Tokenizer(num_words=...) when the captions are tokenized.
VOCAB_SIZE = 10000
# Output dimension of the Embedding layer in build_model().
EMBEDDING_DIM = 128
# Number of units of the LSTM layer in build_model().
LSTM_UNITS = 128
# Requested batch size for model.fit(); the pipeline caps it at the number of samples.
BATCH_SIZE = 2
# Number of training epochs passed to model.fit().
EPOCHS = 20

from google.colab import files

# Start from a clean workspace: delete any 'images' and 'models' folders left in
# the current directory (notebook shell commands), then recreate them empty.
!rm -rf images
!rm -rf models
os.makedirs('images', exist_ok=True)
os.makedirs('models', exist_ok=True)

def upload_file(prompt, required_name):
    """Ask for an upload until a file with the expected name is received.

    Prints ``prompt`` once, then keeps calling ``files.upload()`` until the
    returned mapping contains ``required_name``. After every attempt that
    lacks that name an error message and a retry request are printed.

    Args:
        prompt: Text shown before the first upload dialog.
        required_name: Exact file name that must be among the uploaded files.

    Returns:
        The mapping returned by the first ``files.upload()`` call that
        contained ``required_name``.
    """
    print(prompt)
    while True:
        received = files.upload()
        if required_name in received:
            return received
        # Wrong file name: explain and loop back to a fresh upload dialog.
        print(f"Error: File must be named exactly '{required_name}'")
        print("Please upload again:")

# Collect the two inputs interactively; each call repeats until a file with
# exactly the required name has been uploaded.
# NOTE(review): presumably files.upload() stores the uploads in the current
# working directory (the unzip command below relies on that) — confirm.
upload_file("1. Please upload your captions.txt file:", "captions.txt")
upload_file("\n2. Please upload your images (as ZIP file):", "images.zip")

# Extract the archive into images/ (-o overwrites existing files without asking).
print("\nUnzipping images...")
!unzip -o images.zip -d images/
print("\nFiles in images directory:", os.listdir('images'))

# Absolute paths used by the rest of the pipeline.
# NOTE(review): IMAGE_DIR points at images/images, i.e. it assumes the archive
# itself contains a top-level "images" folder — verify against the uploaded ZIP.
BASE_DIR = '/content'
IMAGE_DIR = os.path.join(BASE_DIR, "images", "images")
CAPTION_FILE = os.path.join(BASE_DIR, "captions.txt")
MODEL_SAVE_PATH = os.path.join(BASE_DIR, "models", "image_captioning_model.h5")
TOKENIZER_SAVE_PATH = os.path.join(BASE_DIR, "models", "tokenizer.pkl")

def load_captions(caption_file):
    """Parse an ``image_id,caption`` text file into a dict of wrapped captions.

    Every line is split on its first comma only, so captions may themselves
    contain commas. Lines without a comma are skipped. Each caption is
    stripped, lower-cased and wrapped as ``'startseq <caption> endseq'``.

    A trailing ``.jpg`` / ``.jpeg`` / ``.png`` (any letter case) is removed
    from the image id. ``extract_features`` keys its result by file name
    without extension, so ids written as file names would otherwise never
    match a feature vector and their captions would be dropped silently.

    Args:
        caption_file: Path of the UTF-8 encoded captions file.

    Returns:
        dict mapping each image id to the list of its wrapped captions, in
        file order.
    """
    image_exts = ('.jpg', '.jpeg', '.png')
    captions = {}
    with open(caption_file, 'r', encoding='utf-8') as f:
        for line in f:
            img_id, sep, caption = line.strip().partition(',')
            if not sep:
                # No comma at all: not an "id,caption" line.
                continue
            img_id = img_id.strip()
            # Only strip real image extensions so ids that merely contain a
            # dot (e.g. "photo.1") are left untouched.
            if img_id.lower().endswith(image_exts):
                img_id = os.path.splitext(img_id)[0]
            wrapped = 'startseq ' + caption.strip().lower() + ' endseq'
            captions.setdefault(img_id, []).append(wrapped)
    return captions

def extract_features(image_dir):
    """Compute one VGG16 feature vector for every image in ``image_dir``.

    Files whose name does not end in ``.jpg``, ``.jpeg`` or ``.png`` (any
    letter case) are ignored. Each image is converted to RGB, resized to
    224x224, run through VGG16's own ``preprocess_input`` and fed to an
    ImageNet-weighted VGG16 without its classifier head and with global
    average pooling. The flattened output is stored per image.

    Images that cannot be opened or decoded are skipped with a printed
    warning instead of being dropped silently; any other error propagates.

    Args:
        image_dir: Directory containing the image files.

    Returns:
        dict mapping file name without extension to a 1-D feature array.
    """
    # Local import: the file's import section does not provide this name.
    from tensorflow.keras.applications.vgg16 import preprocess_input

    model = VGG16(weights='imagenet', include_top=False, pooling='avg')
    features = {}
    # Sorted so the result does not depend on directory listing order.
    for img_name in sorted(os.listdir(image_dir)):
        if not img_name.lower().endswith(('.jpg', '.jpeg', '.png')):
            continue
        img_path = os.path.join(image_dir, img_name)
        try:
            # The context manager closes the underlying file handle.
            with Image.open(img_path) as raw:
                img = raw.convert('RGB').resize((224, 224))
        except (UnidentifiedImageError, OSError, ValueError) as err:
            print(f"Skipping unreadable image '{img_name}': {err}")
            continue
        # The ImageNet weights expect VGG16's own input normalisation, not a
        # plain division by 255.
        batch = preprocess_input(np.expand_dims(img_to_array(img), axis=0))
        key = os.path.splitext(img_name)[0]
        features[key] = model.predict(batch, verbose=0).flatten()
    return features

def prepare_all_data(captions, features, tokenizer, vocab_size=None):
    """Expand every caption into (image feature, prefix, next word) samples.

    For a caption encoded as token ids ``t0 .. tn`` one sample is produced
    per position ``i >= 1``: the image feature, the prefix ``t0 .. t(i-1)``
    padded to ``MAX_CAPTION_LENGTH``, and the one-hot encoding of ``ti``.
    Captions whose image id has no entry in ``features`` are skipped.

    Note that the targets are dense one-hot rows, so memory grows with
    (number of samples) x (vocabulary size).

    Args:
        captions: dict mapping image id to a list of caption strings.
        features: dict mapping image id to that image's feature vector.
        tokenizer: Fitted tokenizer providing ``texts_to_sequences`` and
            ``word_index``.
        vocab_size: Number of output classes. Defaults to
            ``len(tokenizer.word_index) + 1`` so the function no longer
            depends on a module-level ``vocab_size`` having been set.

    Returns:
        ``([image_features, padded_prefixes], one_hot_targets)`` as NumPy
        arrays.
    """
    if vocab_size is None:
        vocab_size = len(tokenizer.word_index) + 1

    X1, X2, y = [], [], []
    for img_id, caps in captions.items():
        if img_id not in features:
            continue
        feature = features[img_id]
        for cap in caps:
            seq = tokenizer.texts_to_sequences([cap])[0]
            # Position 0 has no preceding context, so start at 1.
            for i in range(1, len(seq)):
                X1.append(feature)
                X2.append(pad_sequences([seq[:i]], maxlen=MAX_CAPTION_LENGTH)[0])
                y.append(to_categorical(seq[i], num_classes=vocab_size))
    return [np.array(X1), np.array(X2)], np.array(y)

def build_model(num_words=None, feature_dim=512, max_length=None):
    """Build and compile the merge-style captioning network.

    The image branch projects the feature vector with a ReLU dense layer;
    the text branch embeds the padded token prefix and runs it through an
    LSTM. Both branches are added element-wise, passed through a 128-unit
    ReLU dense layer and a softmax over the vocabulary.

    Args:
        num_words: Size of the embedding input and of the softmax output.
            Defaults to the module-level ``vocab_size``.
        feature_dim: Length of the image feature vector (512 by default,
            the size the pipeline reshapes its features to).
        max_length: Length of the padded token input. Defaults to
            ``MAX_CAPTION_LENGTH``.

    Returns:
        A compiled Keras ``Model`` with inputs ``[image_features, tokens]``,
        trained with categorical cross-entropy and Adam, reporting accuracy.
    """
    if num_words is None:
        num_words = vocab_size  # set at module level by the pipeline
    if max_length is None:
        max_length = MAX_CAPTION_LENGTH

    # Image branch. Its width must equal the LSTM output width because the
    # two branches are merged by element-wise addition.
    input1 = Input(shape=(feature_dim,))
    fe1 = Dense(LSTM_UNITS, activation='relu')(input1)

    # Text branch.
    input2 = Input(shape=(max_length,))
    se1 = Embedding(num_words, EMBEDDING_DIM)(input2)
    se2 = LSTM(LSTM_UNITS)(se1)

    # Decoder head.
    decoder1 = add([fe1, se2])
    decoder2 = Dense(128, activation='relu')(decoder1)
    outputs = Dense(num_words, activation='softmax')(decoder2)

    model = Model(inputs=[input1, input2], outputs=outputs)
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

def generate_caption_beam_search(model, tokenizer, photo_feature, max_length=30, beam_index=3):
    """Generate a caption for one image with beam search.

    Starting from ``startseq``, the ``beam_index`` best partial captions
    (ranked by summed log-probability) are extended one token at a time.
    A beam that has produced ``endseq`` is frozen: it is carried over
    unchanged and no longer extended, so tokens after the end marker cannot
    distort its score. The search stops once every beam is finished or a
    beam reaches ``max_length`` tokens.

    Args:
        model: Model whose ``predict([photo_feature, tokens], verbose=0)``
            returns next-word probabilities.
        tokenizer: Fitted tokenizer providing ``word_index`` and
            ``index_word``; ``word_index`` must contain ``'startseq'``.
        photo_feature: Image feature batch passed unchanged to the model.
        max_length: Length the token input is padded to and maximum number
            of tokens per caption (including ``startseq``). It has to match
            the sequence length the model was built with.
        beam_index: Beam width; values below 1 are treated as 1.

    Returns:
        The best caption as a string, without the start/end markers.

    Raises:
        KeyError: If ``'startseq'`` is not in the tokenizer vocabulary.
    """
    beam_width = max(1, int(beam_index))
    start_id = tokenizer.word_index['startseq']
    end_id = tokenizer.word_index.get('endseq')

    # Each beam is (token ids, cumulative log-probability); best beam last.
    beams = [([start_id], 0.0)]

    for _step in range(max_length - 1):
        if all(seq[-1] == end_id for seq, _score in beams):
            break
        candidates = []
        for seq, score in beams:
            if seq[-1] == end_id:
                # Finished caption: keep it in the race without extending it.
                candidates.append((seq, score))
                continue
            padded = pad_sequences([seq], maxlen=max_length)
            preds = model.predict([photo_feature, padded], verbose=0)[0]
            for w in np.argsort(preds)[-beam_width:]:
                # The epsilon guards against log(0).
                step_score = float(np.log(preds[w] + 1e-10))
                candidates.append((seq + [int(w)], score + step_score))
        beams = sorted(candidates, key=lambda cand: cand[1])[-beam_width:]

    # Decode token by token so the markers are matched as whole words only.
    words = []
    for idx in beams[-1][0]:
        word = tokenizer.index_word.get(idx, '')
        if word == 'endseq':
            break
        if word and word != 'startseq':
            words.append(word)
    return ' '.join(words)

if __name__ == "__main__":
    # End-to-end pipeline: load data, fit the tokenizer, train the model,
    # save the artefacts and caption one image as a smoke test.
    tf.config.set_visible_devices([], 'GPU')  # hide all GPUs from TensorFlow
    print("\n=== STARTING PIPELINE ===")

    captions = load_captions(CAPTION_FILE)
    features = extract_features(IMAGE_DIR)

    all_captions = [cap for caps in captions.values() for cap in caps]
    # filters='' keeps every character of the captions during tokenization.
    tokenizer = Tokenizer(num_words=VOCAB_SIZE, oov_token="<OOV>", filters='')
    tokenizer.fit_on_texts(all_captions)
    # Module-level name: build_model() reads it as a global.
    vocab_size = len(tokenizer.word_index) + 1

    X_train, y_train = prepare_all_data(captions, features, tokenizer)

    # Fail with a clear message instead of an obscure Keras error.
    if len(y_train) == 0:
        raise RuntimeError(
            "No training samples were produced: check that the image ids in "
            f"'{CAPTION_FILE}' match the image files in '{IMAGE_DIR}'."
        )

    actual_batch_size = min(BATCH_SIZE, len(y_train))
    # A single sample cannot be split into a training and a validation part.
    validation_split = 0.2 if len(y_train) > 1 else 0.0

    model = build_model()
    model.fit(X_train, y_train, batch_size=actual_batch_size, epochs=EPOCHS, verbose=2,
              validation_split=validation_split)

    model.save(MODEL_SAVE_PATH)
    with open(TOKENIZER_SAVE_PATH, 'wb') as f:
        pickle.dump(tokenizer, f)

    print("\n=== TESTING ===")
    # Only consider images whose features were actually extracted, so the
    # feature lookup below cannot fail for an image that was skipped earlier.
    test_images = [
        f for f in os.listdir(IMAGE_DIR)
        if f.lower().endswith(('.jpg', '.jpeg', '.png'))
        and os.path.splitext(f)[0] in features
    ]
    if test_images:
        print(f"Testing on: {test_images[0]}")
        # Reuse the stored feature vector; -1 adapts to its actual length.
        photo_feature = features[os.path.splitext(test_images[0])[0]].reshape((1, -1))

        caption = generate_caption_beam_search(model, tokenizer, photo_feature, max_length=MAX_CAPTION_LENGTH, beam_index=5)
        print("\nGenerated Caption:", caption)
    else:
        print("No test images available")

    print("\n=== PROCESS COMPLETED ===")