## Setup

In [None]:
# Mount Google Drive at /content/drive; the video path and the Flask UI
# folder used later in this notebook both live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install sentence-transformers
!pip install flask-ngrok
!pip install pyngrok
# !pip install ffmpeg-python
# !pip3 install SpeechRecognition
# !apt install libasound2-dev portaudio19-dev libportaudio2 libportaudiocpp0 ffmpeg
# !pip3 install PyAudio

Collecting sentence-transformers
  Downloading sentence-transformers-2.2.2.tar.gz (85 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m86.0/86.0 kB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting transformers<5.0.0,>=4.6.0 (from sentence-transformers)
  Downloading transformers-4.31.0-py3-none-any.whl (7.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.4/7.4 MB[0m [31m18.9 MB/s[0m eta [36m0:00:00[0m
Collecting sentencepiece (from sentence-transformers)
  Downloading sentencepiece-0.1.99-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m41.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting huggingface-hub>=0.4.0 (from sentence-transformers)
  Downloading huggingface_hub-0.16.4-py3-none-any.whl (268 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m268.8/268.8 kB

In [None]:
import os
import re
import cv2
import numpy as np
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.applications import efficientnet
from tensorflow.keras.layers import TextVectorization

from sentence_transformers import SentenceTransformer
from IPython.display import HTML, Audio
from google.colab.output import eval_js
from base64 import b64decode
import pickle
# from scipy.io.wavfile import read as wav_read
# import io
# import ffmpeg
# from scipy.io.wavfile import write
# import speech_recognition as sr

# r = sr.Recognizer()
# Sentence-BERT encoder; used further down to embed generated captions and
# user queries before comparing them with cosine similarity.
sbert_model = SentenceTransformer('bert-base-nli-mean-tokens')

# Seed the NumPy and TensorFlow global RNGs. The train/validation split
# below shuffles with np.random, so it depends on this seed.
seed = 111
np.random.seed(seed)
tf.random.set_seed(seed)

Downloading (…)821d1/.gitattributes:   0%|          | 0.00/391 [00:00<?, ?B/s]

Downloading (…)_Pooling/config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Downloading (…)8d01e821d1/README.md:   0%|          | 0.00/3.95k [00:00<?, ?B/s]

Downloading (…)d1/added_tokens.json:   0%|          | 0.00/2.00 [00:00<?, ?B/s]

Downloading (…)01e821d1/config.json:   0%|          | 0.00/625 [00:00<?, ?B/s]

Downloading (…)ce_transformers.json:   0%|          | 0.00/122 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/438M [00:00<?, ?B/s]

Downloading (…)nce_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

Downloading (…)821d1/tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/399 [00:00<?, ?B/s]

Downloading (…)8d01e821d1/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)1e821d1/modules.json:   0%|          | 0.00/229 [00:00<?, ?B/s]

## Downloading the Flickr8K dataset

In [None]:
# Download the Flickr8k image archive and the caption-text archive, unpack
# both quietly into the working directory, then delete the zip files.
!wget -q https://github.com/jbrownlee/Datasets/releases/download/Flickr8k/Flickr8k_Dataset.zip
!wget -q https://github.com/jbrownlee/Datasets/releases/download/Flickr8k/Flickr8k_text.zip
!unzip -qq Flickr8k_Dataset.zip
!unzip -qq Flickr8k_text.zip
!rm Flickr8k_Dataset.zip Flickr8k_text.zip

In [None]:
# Folder that image file names from the caption file are joined onto.
IMAGES_PATH = "Flicker8k_Dataset"
# Target (height, width) for resized images; also the CNN input shape.
IMAGE_SIZE = (299, 299)
# Vocabulary cap for TextVectorization and width of the decoder's output layer.
VOCAB_SIZE = 10000
# Fixed token length of vectorized captions; captions with more whitespace
# tokens than this are discarded when the caption file is loaded.
SEQ_LENGTH = 25
# Embedding width passed to the encoder and decoder blocks.
EMBED_DIM = 512
# Feed-forward width passed to the encoder (dense_dim) and decoder (ff_dim).
FF_DIM = 512
# Batch size used when building the tf.data pipelines.
BATCH_SIZE = 64
# Number of training epochs; also feeds the warm-up step computation.
EPOCHS = 1
AUTOTUNE = tf.data.AUTOTUNE

## Preparing the dataset

In [None]:
def load_captions_data(filename):
    """Load captions from a tab-separated file and map them to image paths.

    Each non-blank line is expected to look like ``<image>#<n>\\t<caption>``.
    The image name (text before ``#``) is joined onto ``IMAGES_PATH``.

    A caption with fewer than 5 or more than ``SEQ_LENGTH`` whitespace tokens
    marks its image as skipped: later captions of that image are ignored and
    the image is removed from the returned mapping. Captions of that image
    that were already appended to ``text_data`` stay there.

    Args:
        filename: Path to the caption file.

    Returns:
        Tuple ``(caption_mapping, text_data)`` where ``caption_mapping`` maps
        image path -> list of ``"<start> ... <end>"`` captions and
        ``text_data`` is the flat list of all accepted captions.

    Raises:
        ValueError: If a non-blank line contains no tab separator.
    """
    caption_mapping = {}
    text_data = []
    images_to_skip = set()

    # Explicit encoding so parsing does not depend on the platform default;
    # iterate lazily instead of materialising the whole file first.
    with open(filename, encoding="utf-8") as caption_file:
        for line in caption_file:
            line = line.rstrip("\n")
            if not line.strip():
                # Tolerate blank / trailing lines instead of failing on unpack.
                continue

            # Split on the first tab only, so a tab inside a caption is kept.
            img_name, caption = line.split("\t", 1)
            img_name = img_name.split("#")[0]
            img_name = os.path.join(IMAGES_PATH, img_name.strip())

            tokens = caption.strip().split()
            if len(tokens) < 5 or len(tokens) > SEQ_LENGTH:
                images_to_skip.add(img_name)
                continue

            if img_name.endswith("jpg") and img_name not in images_to_skip:
                caption = "<start> " + caption.strip() + " <end>"
                text_data.append(caption)
                caption_mapping.setdefault(img_name, []).append(caption)

    # Drop images that had at least one rejected caption.
    for img_name in images_to_skip:
        caption_mapping.pop(img_name, None)

    return caption_mapping, text_data

def train_val_split(caption_data, train_size=0.8, shuffle=True):
    """Partition a caption mapping into training and validation mappings.

    Args:
        caption_data: Dict mapping image name -> captions.
        train_size: Fraction of the images assigned to the training part.
        shuffle: When true, the image order is shuffled in place with
            ``np.random.shuffle`` before splitting.

    Returns:
        Tuple ``(training_data, validation_data)`` of dicts.
    """
    image_names = list(caption_data)
    if shuffle:
        np.random.shuffle(image_names)

    # Index of the first validation image.
    split_at = int(len(caption_data) * train_size)

    training_data = dict(
        (name, caption_data[name]) for name in image_names[:split_at]
    )
    validation_data = dict(
        (name, caption_data[name]) for name in image_names[split_at:]
    )
    return training_data, validation_data

# Parse the caption file and split it exactly once. Running these statements
# twice re-parses the file and reshuffles with an already-advanced RNG state,
# so the resulting split would no longer be the one the seed alone determines.
captions_mapping, text_data = load_captions_data("Flickr8k.token.txt")

train_data, valid_data = train_val_split(captions_mapping)
print("Number of training samples: ", len(train_data))
print("Number of validation samples: ", len(valid_data))

Number of training samples:  6114
Number of validation samples:  1529
Number of training samples:  6114
Number of validation samples:  1529


## Vectorizing the text data


In [None]:
def custom_standardization(input_string):
    """Lowercase the input and delete every character listed in ``strip_chars``."""
    # Character class built from the module-level strip_chars, regex-escaped.
    strip_pattern = "[{}]".format(re.escape(strip_chars))
    return tf.strings.regex_replace(
        tf.strings.lower(input_string), strip_pattern, ""
    )

# Punctuation removed from captions during standardization. "<" and ">" are
# taken out of the set so the "<start>" / "<end>" markers survive.
# The backslash is spelled "\\": a bare "\]" is an invalid escape sequence
# that newer Python versions warn about; the resulting string is unchanged.
strip_chars = "!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~"
strip_chars = strip_chars.replace("<", "").replace(">", "")

# Text vectorizer: custom standardization, integer token ids, vocabulary
# capped at VOCAB_SIZE, every caption padded/truncated to SEQ_LENGTH tokens.
vectorization = TextVectorization(
    max_tokens=VOCAB_SIZE,
    output_mode="int",
    output_sequence_length=SEQ_LENGTH,
    standardize=custom_standardization,
)
# Fit the vocabulary on the accepted captions.
vectorization.adapt(text_data)

# Random augmentation pipeline; applied only by read_train_image.
image_augmentation = keras.Sequential(
    [
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.2),
        layers.RandomContrast(0.3),
    ]
)

In [None]:
def decode_and_resize(img_path, size=IMAGE_SIZE):
    """Read a JPEG file, decode it to 3 channels and resize it.

    Args:
        img_path: Path of the JPEG file.
        size: Target ``(height, width)``; defaults to ``IMAGE_SIZE``.

    Returns:
        The resized image tensor.
    """
    img = tf.io.read_file(img_path)
    img = tf.image.decode_jpeg(img, channels=3)
    # Honour the caller-supplied size rather than always using IMAGE_SIZE.
    img = tf.image.resize(img, size)
    return img

def read_train_image(img_path, size=IMAGE_SIZE):
    """Load a training image: decode, resize, then apply random augmentation.

    Args:
        img_path: Path of the JPEG file.
        size: Target ``(height, width)``; forwarded to ``decode_and_resize``.

    Returns:
        The augmented image as a float32 tensor.
    """
    # Forward `size` so the parameter actually takes effect.
    img = decode_and_resize(img_path, size)
    # The augmentation model works on batches: add a batch axis, then drop it.
    img = image_augmentation(tf.expand_dims(img, 0))[0]
    img = tf.image.convert_image_dtype(img, tf.float32)
    return img

def read_valid_image(img_path, size=IMAGE_SIZE):
    """Load a validation image: decode and resize, with no augmentation.

    Args:
        img_path: Path of the JPEG file.
        size: Target ``(height, width)``; forwarded to ``decode_and_resize``.

    Returns:
        The image as a float32 tensor.
    """
    # Forward `size` so the parameter actually takes effect.
    img = decode_and_resize(img_path, size)
    img = tf.image.convert_image_dtype(img, tf.float32)
    return img

def make_dataset(images, captions, split="train"):
    """Build a batched ``tf.data`` pipeline of (image, vectorized captions).

    Args:
        images: Image paths.
        captions: Captions for each image, in the same order as ``images``.
        split: ``"train"`` selects the augmenting reader; any other value
            selects the plain validation reader.

    Returns:
        A zipped, batched, shuffled and prefetched dataset.
    """
    image_reader = read_train_image if split == "train" else read_valid_image

    img_dataset = tf.data.Dataset.from_tensor_slices(images)
    img_dataset = img_dataset.map(image_reader, num_parallel_calls=AUTOTUNE)

    cap_dataset = tf.data.Dataset.from_tensor_slices(captions)
    cap_dataset = cap_dataset.map(vectorization, num_parallel_calls=AUTOTUNE)

    paired = tf.data.Dataset.zip((img_dataset, cap_dataset))
    # shuffle() comes after batch(), so whole batches are what get shuffled.
    return paired.batch(BATCH_SIZE).shuffle(256).prefetch(AUTOTUNE)

# list_path = '/content/drive/MyDrive/FinalYearProject/shareable/list_dump/main/files.pkl'
# mlist_path = '/content/drive/MyDrive/FinalYearProject/shareable/list_dump/main/mfile.pkl'
# imglist_path = '/content/drive/MyDrive/FinalYearProject/shareable/list_dump/img_list/imgfile.pkl'
# imglist_pathload = '/content/drive/MyDrive/FinalYearProject/shareable/list_dump/img_list/mimgfile.pkl'

# def dump_clist(clist):
#     with open(list_path, 'wb') as file:
#         pickle.dump(clist, file)

# def load_clist(mlist_path=mlist_path):
#     with open(mlist_path, 'rb') as file:
#         clist = pickle.load(file)
#     return clist

# def dump_imglist(imglist):
#     with open(imglist_path, 'wb') as file:
#         pickle.dump(imglist, file)

# def load_imglist():
#     with open(imglist_pathload, 'rb') as file:
#         clist = pickle.load(file)
#     return clist

# Image paths are the dict keys, caption lists the dict values.
train_images, train_captions = list(train_data), list(train_data.values())
train_dataset = make_dataset(train_images, train_captions, split="train")

valid_image_paths, valid_captions = list(valid_data), list(valid_data.values())
valid_dataset = make_dataset(valid_image_paths, valid_captions, split="valid")

In [None]:
def get_cnn_model():
    """Return a frozen EfficientNetB0 feature extractor.

    The classification head is omitted and the final feature map is reshaped
    to ``(-1, channels)`` per image, i.e. a sequence of feature vectors.
    """
    backbone = efficientnet.EfficientNetB0(
        include_top=False,
        weights="imagenet",
        input_shape=(*IMAGE_SIZE, 3),
    )
    # The backbone is not trained.
    backbone.trainable = False

    feature_map = backbone.output
    num_channels = feature_map.shape[-1]
    feature_sequence = layers.Reshape((-1, num_channels))(feature_map)
    return keras.models.Model(backbone.input, feature_sequence)


class TransformerEncoderBlock(layers.Layer):
    """Encoder block: layer norm -> dense projection -> self-attention -> layer norm.

    ``dense_dim`` is stored but not used by any sub-layer in this block.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim, self.dense_dim, self.num_heads = (
            embed_dim,
            dense_dim,
            num_heads,
        )
        # Sub-layers are created in the same order as before so that
        # auto-generated layer names stay the same.
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.0
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.dense_1 = layers.Dense(embed_dim, activation="relu")

    def call(self, inputs, training, mask=None):
        """Encode ``inputs``; ``mask`` is accepted but not used."""
        projected = self.dense_1(self.layernorm_1(inputs))

        self_attended = self.attention_1(
            query=projected,
            value=projected,
            key=projected,
            attention_mask=None,
            training=training,
        )
        # Residual connection around the attention, then normalisation.
        return self.layernorm_2(projected + self_attended)


class PositionalEmbedding(layers.Layer):
    """Sum of scaled token embeddings and learned position embeddings."""

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        # Token embeddings are multiplied by sqrt(embed_dim).
        self.embed_scale = tf.math.sqrt(tf.cast(embed_dim, tf.float32))

    def call(self, inputs):
        """Embed token ids and add an embedding of each position index."""
        seq_len = tf.shape(inputs)[-1]
        position_ids = tf.range(start=0, limit=seq_len, delta=1)
        scaled_tokens = self.token_embeddings(inputs) * self.embed_scale
        return scaled_tokens + self.position_embeddings(position_ids)

    def compute_mask(self, inputs, mask=None):
        """Mask is true wherever the token id is non-zero."""
        return tf.math.not_equal(inputs, 0)


class TransformerDecoderBlock(layers.Layer):
    """Decoder block that turns token ids plus encoder output into token probabilities.

    Pipeline: positional embedding -> causal self-attention -> cross-attention
    over the encoder output -> feed-forward network -> softmax over
    ``VOCAB_SIZE`` tokens.

    Args:
        embed_dim: Embedding width (attention key size and embedding size).
        ff_dim: Hidden width of the feed-forward network.
        num_heads: Number of heads for both attention layers.
    """

    def __init__(self, embed_dim, ff_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.ff_dim = ff_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.1
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.1
        )
        self.ffn_layer_1 = layers.Dense(ff_dim, activation="relu")
        self.ffn_layer_2 = layers.Dense(embed_dim)

        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()

        # Use the constructor's embed_dim (not the module-level constant) so
        # the embedding width always matches the residual additions in call().
        self.embedding = PositionalEmbedding(
            embed_dim=embed_dim, sequence_length=SEQ_LENGTH, vocab_size=VOCAB_SIZE
        )
        self.out = layers.Dense(VOCAB_SIZE, activation="softmax")

        self.dropout_1 = layers.Dropout(0.3)
        self.dropout_2 = layers.Dropout(0.5)
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, training, mask=None):
        """Predict next-token probabilities for every input position.

        Args:
            inputs: Token-id tensor.
            encoder_outputs: Encoder output attended to by the second
                attention layer.
            training: Forwarded to attention and dropout layers.
            mask: Optional boolean mask over token positions. When omitted,
                only the causal mask is applied.

        Returns:
            Softmax probabilities over the vocabulary for each position.
        """
        inputs = self.embedding(inputs)
        causal_mask = self.get_causal_attention_mask(inputs)

        # Defaults for the mask-less case; without them both names were
        # unbound below whenever `mask` was None.
        padding_mask = None
        combined_mask = causal_mask

        if mask is not None:
            padding_mask = tf.cast(mask[:, :, tf.newaxis], dtype=tf.int32)
            combined_mask = tf.cast(mask[:, tf.newaxis, :], dtype=tf.int32)
            # A position is attendable only if both masks allow it.
            combined_mask = tf.minimum(combined_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=combined_mask,
            training=training,
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
            training=training,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        ffn_out = self.ffn_layer_1(out_2)
        ffn_out = self.dropout_1(ffn_out, training=training)
        ffn_out = self.ffn_layer_2(ffn_out)

        ffn_out = self.layernorm_3(ffn_out + out_2, training=training)
        ffn_out = self.dropout_2(ffn_out, training=training)
        preds = self.out(ffn_out)
        return preds

    def get_causal_attention_mask(self, inputs):
        """Return an int32 lower-triangular mask tiled across the batch.

        Entry ``[b, i, j]`` is 1 when ``i >= j`` and 0 otherwise.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Tile multiples: (batch_size, 1, 1).
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)


class ImageCaptioningModel(keras.Model):
    """Captioning model wiring a CNN feature extractor, an encoder and a decoder.

    Training and evaluation loop over ``num_captions_per_image`` captions per
    image, indexing the caption tensor along axis 1.
    """

    def __init__(
        self, cnn_model, encoder, decoder, num_captions_per_image=5,
    ):
        super().__init__()
        self.cnn_model = cnn_model
        self.encoder = encoder
        self.decoder = decoder
        # Running means reported by train_step / test_step.
        self.loss_tracker = keras.metrics.Mean(name="loss")
        self.acc_tracker = keras.metrics.Mean(name="accuracy")
        self.num_captions_per_image = num_captions_per_image

    def calculate_loss(self, y_true, y_pred, mask):
        """Masked mean of the per-position loss.

        Positions where ``mask`` is false contribute nothing; the sum is
        divided by the number of unmasked positions.
        """
        # self.loss is presumably the loss object given to compile() and is
        # expected to return one value per position — confirm against compile().
        loss = self.loss(y_true, y_pred)
        mask = tf.cast(mask, dtype=loss.dtype)
        loss *= mask
        return tf.reduce_sum(loss) / tf.reduce_sum(mask)

    def calculate_accuracy(self, y_true, y_pred, mask):
        """Fraction of unmasked positions whose argmax prediction equals the target."""
        accuracy = tf.equal(y_true, tf.argmax(y_pred, axis=2))
        accuracy = tf.math.logical_and(mask, accuracy)
        accuracy = tf.cast(accuracy, dtype=tf.float32)
        mask = tf.cast(mask, dtype=tf.float32)
        return tf.reduce_sum(accuracy) / tf.reduce_sum(mask)

    def _compute_caption_loss_and_acc(self, img_embed, batch_seq, training=True):
        """Run encoder and decoder for one caption per image; return (loss, accuracy)."""
        encoder_out = self.encoder(img_embed, training=training)
        # Teacher forcing: the decoder sees tokens [:-1] and must predict [1:].
        batch_seq_inp = batch_seq[:, :-1]
        batch_seq_true = batch_seq[:, 1:]
        # Token id 0 is treated as padding and excluded from loss/accuracy.
        mask = tf.math.not_equal(batch_seq_true, 0)
        batch_seq_pred = self.decoder(
            batch_seq_inp, encoder_out, training=training, mask=mask
        )
        loss = self.calculate_loss(batch_seq_true, batch_seq_pred, mask)
        acc = self.calculate_accuracy(batch_seq_true, batch_seq_pred, mask)
        return loss, acc

    def train_step(self, batch_data):
        """One optimisation step per caption index for the given batch."""
        batch_img, batch_seq = batch_data
        batch_loss = 0
        batch_acc = 0

        # Image features are computed once and reused for every caption.
        img_embed = self.cnn_model(batch_img)

        for i in range(self.num_captions_per_image):
            with tf.GradientTape() as tape:
                loss, acc = self._compute_caption_loss_and_acc(
                    img_embed, batch_seq[:, i, :], training=True
                )

                batch_loss += loss
                batch_acc += acc

            # Collected after the forward pass, inside the loop; only the
            # encoder and decoder variables are updated.
            train_vars = (
                self.encoder.trainable_variables + self.decoder.trainable_variables
            )

            grads = tape.gradient(loss, train_vars)

            # Gradients are applied separately for each caption index.
            self.optimizer.apply_gradients(zip(grads, train_vars))

        # Accuracy is averaged over the captions; the loss stays a sum.
        batch_acc /= float(self.num_captions_per_image)
        self.loss_tracker.update_state(batch_loss)
        self.acc_tracker.update_state(batch_acc)

        return {"loss": self.loss_tracker.result(), "acc": self.acc_tracker.result()}

    def test_step(self, batch_data):
        """Evaluate one batch: same loop as train_step but without gradient updates."""
        batch_img, batch_seq = batch_data
        batch_loss = 0
        batch_acc = 0

        img_embed = self.cnn_model(batch_img)

        for i in range(self.num_captions_per_image):
            loss, acc = self._compute_caption_loss_and_acc(
                img_embed, batch_seq[:, i, :], training=False
            )

            batch_loss += loss
            batch_acc += acc

        # Accuracy is averaged over the captions; the loss stays a sum.
        batch_acc /= float(self.num_captions_per_image)

        self.loss_tracker.update_state(batch_loss)
        self.acc_tracker.update_state(batch_acc)

        return {"loss": self.loss_tracker.result(), "acc": self.acc_tracker.result()}

    @property
    def metrics(self):
        """The two running-mean trackers."""
        return [self.loss_tracker, self.acc_tracker]


# Assemble the captioning model: frozen CNN backbone, a single-head encoder
# block and a two-head decoder block.
cnn_model = get_cnn_model()
encoder = TransformerEncoderBlock(embed_dim=EMBED_DIM, dense_dim=FF_DIM, num_heads=1)
decoder = TransformerDecoderBlock(embed_dim=EMBED_DIM, ff_dim=FF_DIM, num_heads=2)
caption_model = ImageCaptioningModel(
    cnn_model=cnn_model, encoder=encoder, decoder=decoder
)

Downloading data from https://storage.googleapis.com/keras-applications/efficientnetb0_notop.h5


## Model training

In [None]:
# The decoder's output layer applies softmax, hence from_logits=False.
# reduction="none" leaves reduction to the model's calculate_loss, which
# multiplies the loss by a padding mask before averaging.
cross_entropy = keras.losses.SparseCategoricalCrossentropy(
    from_logits=False, reduction="none"
)

class LRSchedule(keras.optimizers.schedules.LearningRateSchedule):
    """Linear warm-up followed by a constant learning rate.

    For ``step < warmup_steps`` the rate is
    ``post_warmup_learning_rate * step / warmup_steps``; afterwards it is
    ``post_warmup_learning_rate``.

    Args:
        post_warmup_learning_rate: Rate used once warm-up has finished.
        warmup_steps: Number of warm-up steps.
    """

    def __init__(self, post_warmup_learning_rate, warmup_steps):
        super().__init__()
        self.post_warmup_learning_rate = post_warmup_learning_rate
        self.warmup_steps = warmup_steps

    def __call__(self, step):
        """Return the learning rate for optimizer step ``step``."""
        global_step = tf.cast(step, tf.float32)
        warmup_steps = tf.cast(self.warmup_steps, tf.float32)
        warmup_progress = global_step / warmup_steps
        warmup_learning_rate = self.post_warmup_learning_rate * warmup_progress
        return tf.cond(
            global_step < warmup_steps,
            lambda: warmup_learning_rate,
            lambda: self.post_warmup_learning_rate,
        )

    def get_config(self):
        """Return the constructor arguments so the schedule can be serialized."""
        return {
            "post_warmup_learning_rate": self.post_warmup_learning_rate,
            "warmup_steps": self.warmup_steps,
        }


# Total optimizer-schedule horizon: batches per epoch times epochs.
num_train_steps = len(train_dataset) * EPOCHS
# Warm up over the first 1/15 of those steps.
num_warmup_steps = num_train_steps // 15
lr_schedule = LRSchedule(post_warmup_learning_rate=1e-4, warmup_steps=num_warmup_steps)

caption_model.compile(optimizer=keras.optimizers.Adam(lr_schedule), loss=cross_entropy)

# Train, evaluating on the validation dataset after each epoch.
caption_model.fit(
    train_dataset,
    epochs=EPOCHS,
    validation_data=valid_dataset,
)



<keras.callbacks.History at 0x7c2e9b3ed180>

## Check sample predictions

## Function for videos

In [None]:
# Token id -> token string, taken from the fitted vectorizer.
vocab = vectorization.get_vocabulary()
index_lookup = dict(enumerate(vocab))
# One less than SEQ_LENGTH because the decoder input drops the last position.
max_decoded_sentence_length = SEQ_LENGTH - 1
valid_images = list(valid_data)


def _caption_rgb_frame(rgb_frame):
    """Greedy-decode a caption for a single RGB image array."""
    resized = tf.image.resize(rgb_frame, IMAGE_SIZE)
    sample_img = tf.image.convert_image_dtype(resized, tf.float32)

    features = caption_model.cnn_model(tf.expand_dims(sample_img, 0))
    encoded_img = caption_model.encoder(features, training=False)

    decoded_caption = "<start> "
    for i in range(max_decoded_sentence_length):
        tokenized_caption = vectorization([decoded_caption])[:, :-1]
        mask = tf.math.not_equal(tokenized_caption, 0)
        predictions = caption_model.decoder(
            tokenized_caption, encoded_img, training=False, mask=mask
        )
        sampled_token = index_lookup[np.argmax(predictions[0, i, :])]
        # The vocabulary token has no leading space; comparing against
        # " <end>" never matched, so decoding always ran to the length limit.
        if sampled_token == "<end>":
            break
        decoded_caption += " " + sampled_token

    decoded_caption = decoded_caption.replace("<start> ", "")
    return decoded_caption.replace(" <end>", "").strip()


def generate_caption_vid(video_path, frame_interval=300):
    """Caption every ``frame_interval``-th frame of a video.

    Args:
        video_path: Path of the video file.
        frame_interval: Caption one frame out of this many (default 300).

    Returns:
        List of ``{"Caption": str, "Timestamp": float}`` dicts, where the
        timestamp is the frame number divided by the video's FPS. The list is
        empty when the video cannot be opened or has fewer than
        ``frame_interval`` frames.

    Raises:
        ValueError: If a frame must be timestamped but the FPS reported by
            OpenCV is not positive.
    """
    caption_list = []

    cap = cv2.VideoCapture(video_path)
    # Keep the fractional FPS (e.g. 29.97); truncating skews timestamps.
    fps = cap.get(cv2.CAP_PROP_FPS)

    print('Processing video...')

    frame_count = 0
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1
            if frame_count % frame_interval != 0:
                continue

            if fps <= 0:
                raise ValueError(
                    "Cannot compute timestamps: FPS unavailable for {}".format(video_path)
                )

            # OpenCV yields BGR frames; the model was trained on RGB images
            # decoded by TensorFlow, so convert before captioning.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            caption_list.append(
                {
                    "Caption": _caption_rgb_frame(rgb_frame),
                    "Timestamp": frame_count / fps,
                }
            )
    finally:
        # Always free the capture handle, even if captioning fails.
        cap.release()

    print('Done')
    # dump_clist(caption_list)
    return caption_list


In [None]:
# Caption the sample video once; the Flask helper getTS() further down
# searches this list (cap_listl) for the best-matching scene.
video_path = '/content/drive/MyDrive/FinalYearProject/UI/FlaskApp/static/vid_1.mp4'
cap_listl = generate_caption_vid(video_path)


Processing video...


In [None]:
# cap_listl = load_clist()

In [None]:
cap_listl

[{'Caption': 'a dog is running through the water', 'Timestamp': 10.0},
 {'Caption': 'a man in a blue shirt is wearing a blue shirt and a blue shirt and a blue shirt and a blue shirt is',
  'Timestamp': 20.0},
 {'Caption': 'a man in a blue shirt is holding a red shirt and a red shirt and a woman in a red shirt and a',
  'Timestamp': 30.0},
 {'Caption': 'a man in a blue shirt is jumping on a red shirt',
  'Timestamp': 40.0},
 {'Caption': 'a man in a blue shirt is holding a red and a red and a red and a red and a red shirt is',
  'Timestamp': 50.0},
 {'Caption': 'a man in a blue shirt is jumping a red shirt is jumping on a red and a red and a red and white',
  'Timestamp': 60.0},
 {'Caption': 'a dog is jumping a white dog is jumping on a field',
  'Timestamp': 70.0},
 {'Caption': 'a man in a white shirt is running on a field',
  'Timestamp': 80.0},
 {'Caption': 'a man in a blue shirt is jumping on a red shirt',
  'Timestamp': 90.0}]

## Function for images

In [None]:
def generate_caption_img(img_path):
    """Generate a caption for one image file by greedy decoding.

    Args:
        img_path: Path of the image file.

    Returns:
        Single-entry dict ``{img_path: caption}``.

    Raises:
        ValueError: If OpenCV cannot read the file.
    """
    bgr_img = cv2.imread(img_path)
    if bgr_img is None:
        # cv2.imread signals failure by returning None rather than raising.
        raise ValueError("Could not read image: {}".format(img_path))

    # OpenCV loads BGR; the model was trained on RGB images decoded by
    # TensorFlow, so convert before feeding the network.
    rgb_img = cv2.cvtColor(bgr_img, cv2.COLOR_BGR2RGB)

    resized = tf.image.resize(rgb_img, IMAGE_SIZE)
    sample_img = tf.image.convert_image_dtype(resized, tf.float32)

    features = caption_model.cnn_model(tf.expand_dims(sample_img, 0))
    encoded_img = caption_model.encoder(features, training=False)

    decoded_caption = "<start> "
    for i in range(max_decoded_sentence_length):
        tokenized_caption = vectorization([decoded_caption])[:, :-1]
        mask = tf.math.not_equal(tokenized_caption, 0)
        predictions = caption_model.decoder(
            tokenized_caption, encoded_img, training=False, mask=mask
        )
        sampled_token = index_lookup[np.argmax(predictions[0, i, :])]
        # The vocabulary token has no leading space; comparing against
        # " <end>" never matched, so decoding always ran to the length limit.
        if sampled_token == "<end>":
            break
        decoded_caption += " " + sampled_token

    decoded_caption = decoded_caption.replace("<start> ", "")
    decoded_caption = decoded_caption.replace(" <end>", "").strip()

    return {img_path: decoded_caption}

In [None]:
# `caption_dict` only exists inside generate_caption_img, so evaluating it
# here raised NameError. Build the caption list that the /agallery12 route
# reads as `imgcaplist` instead.
# NOTE(review): the folder is assumed to be the Flask image-upload folder
# (APP_ROOT/images) and to contain only images — confirm.
gallery_dir = '/content/drive/MyDrive/FinalYearProject/UI/IntegratedUI/intui/images'
gallery_files = sorted(os.listdir(gallery_dir)) if os.path.isdir(gallery_dir) else []
imgcaplist = [
    generate_caption_img(os.path.join(gallery_dir, name)) for name in gallery_files
]
imgcaplist

NameError: ignored

## Finding the best matching image caption

In [None]:
def cosine(u, v):
    """Cosine similarity of two vectors.

    Args:
        u: First vector.
        v: Second vector.

    Returns:
        ``dot(u, v) / (|u| * |v|)``, or ``0.0`` when either vector has zero
        norm (instead of NaN plus a runtime warning).
    """
    norm_product = np.linalg.norm(u) * np.linalg.norm(v)
    if norm_product == 0:
        return 0.0
    return np.dot(u, v) / norm_product

def find_similarity(sbert_model, generated_sent, user_sent_embed):
    """Cosine similarity between a sentence and a precomputed query embedding.

    ``generated_sent`` is encoded with ``sbert_model``; ``user_sent_embed``
    is used as given.
    """
    caption_embedding = sbert_model.encode([generated_sent])[0]
    similarity = cosine(caption_embedding, user_sent_embed)
    return similarity

def find_image_path(img_cap_list, user_sent):
    """Return the image path whose caption is most similar to ``user_sent``.

    Args:
        img_cap_list: Iterable of dicts; the first ``path -> caption`` entry
            of each dict is used.
        user_sent: Query sentence.

    Returns:
        The best-matching path, or ``''`` when no caption scores above 0.
        The result is also printed.
    """
    # Encode the query once, outside the loop.
    query_embedding = sbert_model.encode([user_sent])[0]

    best_score, best_path = 0, ''
    for entry in img_cap_list:
        img_path, caption = list(entry.items())[0]
        score = find_similarity(sbert_model, caption, query_embedding)
        if score > best_score:
            best_score, best_path = score, img_path

    print(best_path)
    return best_path

## Functions to find the best matching scene

In [None]:
# cosine() and find_similarity() are defined once, in the
# "best matching image caption" section earlier in this notebook, and are
# reused here. Redefining them in this cell would silently shadow those
# definitions and let the two copies drift apart.

def find_scene(sbert_model, user_sent, caption_list):
    """Return the timestamp of the caption most similar to ``user_sent``.

    Args:
        sbert_model: Sentence encoder exposing ``encode``.
        user_sent: Query sentence.
        caption_list: Iterable of dicts with ``'Caption'`` and ``'Timestamp'``.

    Returns:
        Timestamp of the best match, or ``0`` when no caption scores above 0.
    """
    # The query embedding is computed a single time, before the loop.
    query_embedding = sbert_model.encode([user_sent])[0]

    best_similarity, best_timestamp = 0, 0
    for entry in caption_list:
        caption = entry['Caption']
        candidate_timestamp = entry['Timestamp']

        similarity = find_similarity(sbert_model, caption, query_embedding)
        if similarity > best_similarity:
            best_similarity, best_timestamp = similarity, candidate_timestamp

    return best_timestamp

## Running Flask Server

In [None]:
from flask import Flask, jsonify, render_template, request
from flask_ngrok import run_with_ngrok
from pyngrok import ngrok
!ngrok authtoken ************************

In [None]:
%cd /content/drive/MyDrive/FinalYearProject/UI/IntegratedUI/intui

In [None]:
# final flask code
import os
from flask import Flask, request, render_template, send_from_directory

# Flask application; run_with_ngrok (flask_ngrok) wraps it — presumably so
# app.run() also opens an ngrok tunnel; confirm against flask_ngrok.
app = Flask(__name__)
run_with_ngrok(app)

# Root folder of the UI project on Drive; the upload handlers below create
# their 'videos/' and 'images/' folders under it.
APP_ROOT = '/content/drive/MyDrive/FinalYearProject/UI/IntegratedUI/intui'

print(F'ROOOT: {APP_ROOT}')

def getTS(description):
    """Return the timestamp in ``cap_listl`` whose caption best matches ``description``."""
    # cap_listl and sbert_model are notebook-level globals.
    return find_scene(sbert_model, description, cap_listl)

@app.route("/")
def index():
    """Render the landing page template (home.html)."""
    return render_template("home.html")

@app.route("/uploadvid")
def viduploadindex():
    """Render the video upload form (upload1.html)."""
    return render_template("upload1.html")


@app.route("/uploadimg")
def imguploadindex():
    """Render the image upload form (upload2.html)."""
    return render_template("upload2.html")


@app.route("/upload__vid", methods=["POST"])
def upload1():
    """Save uploaded video files under ``APP_ROOT/videos``.

    Every file in the ``file`` form field is stored under its base name.
    Renders ``upload1.html`` with ``image_name`` set to the last saved file
    name, or ``None`` when nothing was saved.
    """
    target = os.path.join(APP_ROOT, 'videos')
    print(target)
    # Create the folder when missing; an existing folder is not an error.
    os.makedirs(target, exist_ok=True)

    uploads = request.files.getlist("file")
    print(uploads)

    # Stays None when the request carried no usable file (it used to be
    # unbound, which crashed the render below).
    filename = None
    for upload in uploads:
        print("{} is the file name".format(upload.filename))
        # Keep only the final path component so a client-supplied name such
        # as "../../x" cannot escape the upload folder.
        safe_name = os.path.basename((upload.filename or "").replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            continue
        filename = safe_name
        destination = os.path.join(target, filename)
        print("Accept incoming file:", filename)
        print("Save it to:", destination)
        upload.save(destination)

    # return send_from_directory("images", filename, as_attachment=True)
    return render_template("upload1.html", image_name=filename)


@app.route("/upload__img", methods=["POST"])
def upload2():
    """Save uploaded image files under ``APP_ROOT/images``.

    Every file in the ``file`` form field is stored under its base name.
    Renders ``upload2.html`` with ``image_name`` set to the last saved file
    name, or ``None`` when nothing was saved.
    """
    target = os.path.join(APP_ROOT, 'images')
    print(target)
    # Create the folder when missing; an existing folder is not an error.
    os.makedirs(target, exist_ok=True)

    uploads = request.files.getlist("file")
    print(uploads)

    # Stays None when the request carried no usable file (it used to be
    # unbound, which crashed the render below).
    filename = None
    for upload in uploads:
        print("{} is the file name".format(upload.filename))
        # Keep only the final path component so a client-supplied name such
        # as "../../x" cannot escape the upload folder.
        safe_name = os.path.basename((upload.filename or "").replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            continue
        filename = safe_name
        destination = os.path.join(target, filename)
        print("Accept incoming file:", filename)
        print("Save it to:", destination)
        upload.save(destination)

    # return send_from_directory("images", filename, as_attachment=True)
    return render_template("upload2.html", image_name=filename)


@app.route('/upload__vid/<filename>')
def send_image1(filename):
    """Serve ``filename`` from the ``videos`` directory."""
    # NOTE(review): "videos" is a relative path; it presumably resolves to the
    # same folder upload1 writes to (APP_ROOT/videos) — confirm.
    return send_from_directory("videos", filename)


@app.route('/upload__img/<filename>')
def send_image2(filename):
    """Serve ``filename`` from the ``images`` directory."""
    # NOTE(review): "images" is a relative path; it presumably resolves to the
    # same folder upload2 writes to (APP_ROOT/images) — confirm.
    return send_from_directory("images", filename)


@app.route('/gallery1')
def get_gallery1():
    """Render the video gallery with the names of all uploaded videos."""
    # List the folder the upload handler writes to, and tolerate it not
    # existing yet (no upload so far) instead of failing the request.
    video_dir = os.path.join(APP_ROOT, 'videos')
    vid_names = os.listdir(video_dir) if os.path.isdir(video_dir) else []
    print(f'image name path vid: {vid_names}')
    return render_template("index.html", vid_names=vid_names)


@app.route('/gallery2')
def get_gallery2():
    """Render the image gallery with no search result selected."""
    # List the folder the upload handler writes to, and tolerate it not
    # existing yet (no upload so far) instead of failing the request.
    image_dir = os.path.join(APP_ROOT, 'images')
    image_names = os.listdir(image_dir) if os.path.isdir(image_dir) else []
    # print(image_names)
    found_img_name = ""
    return render_template("gallery.html", image_names=image_names, found_img_name = found_img_name,flag1 = 0)


@app.route('/agallery11', methods=["POST","GET"])
def find_video():
    """Search the captioned video for the scene matching a description.

    On POST with a non-empty ``inpDesc`` form field, renders ``index.html``
    with ``val`` set to the best-matching timestamp. On GET, or when the
    description is missing, renders the plain gallery without ``val``.
    """
    video_dir = os.path.join(APP_ROOT, 'videos')
    vid_names = os.listdir(video_dir) if os.path.isdir(video_dir) else []

    description = request.form.get("inpDesc") if request.method == "POST" else None
    if not description:
        # Nothing to search for; `description` used to be unbound on GET.
        return render_template("index.html", vid_names=vid_names)

    print(f'Image Description: {description}')
    val = getTS(description)

    return render_template("index.html", vid_names=vid_names, val=val)


@app.route('/agallery12', methods=["POST","GET"])
def find_image():
    """Search the captioned images for the one matching a description.

    On POST with a non-empty ``inpDesc`` form field, renders ``gallery.html``
    with ``found_img_name`` set to the best match and ``flag1 = 1``. On GET,
    or when the description is missing, renders the plain gallery
    (``found_img_name = ""``, ``flag1 = 0``).
    """
    image_dir = os.path.join(APP_ROOT, 'images')
    image_names = os.listdir(image_dir) if os.path.isdir(image_dir) else []
    # print(image_names)

    img_desc = request.form.get("inpDesc") if request.method == "POST" else None
    if not img_desc:
        # Nothing to search for; `img_desc` used to be unbound on GET.
        return render_template("gallery.html", image_names=image_names, found_img_name="", flag1=0)

    print(f'Image Description: {img_desc}')

    # `imgcaplist` is a notebook-level list of {image_path: caption} dicts
    # that must exist before the server starts.
    found_img_path = find_image_path(imgcaplist, img_desc)
    found_img_name = os.path.basename(found_img_path)
    print(f'Found img name: {found_img_name}')

    return render_template("gallery.html", image_names=image_names, found_img_name = found_img_name, flag1 = 1)


# Start the Flask server when this cell is executed as the main module.
if __name__ == "__main__":
    app.run()