In [3]:
import functools
import operator
import os
import time
import json
import joblib
import numpy as np
from keras.layers import Input, LSTM, Dense
from keras.models import Model, load_model
import config
import nltk
nltk.download('all')
from nltk.tokenize import word_tokenize
import math
import extract_features

[nltk_data] Downloading collection 'all'
[nltk_data]    | 
[nltk_data]    | Downloading package abc to /root/nltk_data...
[nltk_data]    |   Unzipping corpora/abc.zip.
[nltk_data]    | Downloading package alpino to /root/nltk_data...
[nltk_data]    |   Unzipping corpora/alpino.zip.
[nltk_data]    | Downloading package averaged_perceptron_tagger to
[nltk_data]    |     /root/nltk_data...
[nltk_data]    |   Unzipping taggers/averaged_perceptron_tagger.zip.
[nltk_data]    | Downloading package averaged_perceptron_tagger_ru to
[nltk_data]    |     /root/nltk_data...
[nltk_data]    |   Unzipping
[nltk_data]    |       taggers/averaged_perceptron_tagger_ru.zip.
[nltk_data]    | Downloading package basque_grammars to
[nltk_data]    |     /root/nltk_data...
[nltk_data]    |   Unzipping grammars/basque_grammars.zip.
[nltk_data]    | Downloading package biocreative_ppi to
[nltk_data]    |     /root/nltk_data...
[nltk_data]    |   Unzipping corpora/biocreative_ppi.zip.
[nltk_data]    | Downloadin

In [None]:
import os
import json
import numpy as np
import torch
import joblib
import nltk
from nltk.translate.bleu_score import sentence_bleu
from nltk.translate.meteor_score import meteor_score

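# Make sure the required NLTK data is downloaded (uncomment if needed).
# Note: meteor_score relies on the WordNet corpus, so 'wordnet' must be available as well.
# nltk.download('punkt')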

# -------------------------------
# Hyperparameters / Configuration
# -------------------------------
# Base latent dimension; the encoder is bidirectional, so its output dim is 2 * latent_dim.
latent_dim = 256
num_encoder_tokens = 2560
num_decoder_tokens = 1500
time_steps_encoder = 20
max_probability = -1      # For beam search (if used)
search_type = 'greedy'    # or 'beam'

# -------------------------------
# Dynamic Paths Setup (relative to the current working directory)
# -------------------------------
# The working directory is treated as the project root.
BASE_DIR = os.getcwd()
# Saved models and tokenizer
MODEL_DIR = os.path.join(BASE_DIR, "model_final_2")
# Testing data directory, with one sub-folder for videos and one for extracted features
TEST_DATA_DIR = os.path.join(BASE_DIR, "data", "testing_data")
VIDEO_DIR, FEAT_TEST_DIR = (
    os.path.join(TEST_DATA_DIR, sub_dir) for sub_dir in ("video", "feat_test")
)

# Files for predictions and ground truth labels (all live inside TEST_DATA_DIR):
PRED_FILE, FINAL_PRED_FILE, TEST_LABEL_FILE = (
    os.path.join(TEST_DATA_DIR, file_name)
    for file_name in ("test_predictions.txt", "FinalTestPredictions.txt", "testing_label.txt")
)

# -------------------------------
# Device selection: prefer CUDA when available, otherwise fall back to the CPU.
# -------------------------------
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# -------------------------------
# Inference Module (Dynamic, PyTorch version)
# -------------------------------
class BiLSTMEncoder(torch.nn.Module):
    """Bidirectional LSTM encoder that summarises a feature sequence into one state.

    Args:
        input_size: Size of each input feature vector.
        latent_dim: Hidden size of each LSTM direction; the returned states have
            ``2 * latent_dim`` features because both directions are concatenated.
        num_layers: Number of stacked LSTM layers (default 1).
    """

    def __init__(self, input_size, latent_dim, num_layers=1):
        super().__init__()
        self.lstm = torch.nn.LSTM(input_size, latent_dim, num_layers=num_layers,
                                  batch_first=True, bidirectional=True)

    def forward(self, x):
        """Encode ``x`` (batch-first sequence) and return the final ``(hidden, cell)`` pair.

        Each returned tensor has shape ``(1, batch, 2 * latent_dim)`` for batched
        input, i.e. it looks like the state of a single-layer unidirectional LSTM
        whose hidden size is ``2 * latent_dim``.
        """
        _, (h, c) = self.lstm(x)
        # h/c stack the final states layer by layer (forward direction, then backward),
        # so the LAST two entries belong to the top layer. Indexing with [-2]/[-1]
        # equals the previous [0]/[1] for num_layers == 1 and picks the correct
        # (top) layer when num_layers > 1.
        # NOTE(review): if weights were trained with num_layers > 1 using the old
        # [0]/[1] indexing, retraining is required — confirm against the training code.
        h_cat = torch.cat((h[-2], h[-1]), dim=-1)
        c_cat = torch.cat((c[-2], c[-1]), dim=-1)
        return h_cat.unsqueeze(0), c_cat.unsqueeze(0)

class Decoder(torch.nn.Module):
    """Token decoder: embedding -> LSTM -> linear projection onto the vocabulary.

    Args:
        vocab_size: Number of token ids; index 0 is the embedding's padding index.
        embed_size: Dimension of the token embeddings.
        hidden_size: Hidden size of the LSTM (and input size of the projection).
        num_layers: Number of stacked LSTM layers (default 1).
        dropout: Accepted for interface compatibility; this block does not use it.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers=1, dropout=0.5):
        super().__init__()
        nn = torch.nn
        # Sub-modules keep their original attribute names and creation order so that
        # saved state_dict keys and seeded initialisation stay the same.
        self.embedding = nn.Embedding(num_embeddings=vocab_size,
                                      embedding_dim=embed_size,
                                      padding_idx=0)
        self.lstm = nn.LSTM(input_size=embed_size,
                            hidden_size=hidden_size,
                            num_layers=num_layers,
                            batch_first=True)
        self.fc = nn.Linear(in_features=hidden_size, out_features=vocab_size)

    def forward(self, x, hidden, cell):
        """Run the decoder over token ids ``x`` starting from state ``(hidden, cell)``.

        Returns:
            ``(logits, hidden, cell)`` where logits are ``(batch, seq_len, vocab_size)``
            and the two states are the LSTM's updated hidden and cell tensors.
        """
        token_vectors = self.embedding(x)  # (batch, seq_len, embed_size)
        previous_state = (hidden, cell)
        lstm_out, (next_hidden, next_cell) = self.lstm(token_vectors, previous_state)
        vocab_logits = self.fc(lstm_out)   # (batch, seq_len, vocab_size)
        return vocab_logits, next_hidden, next_cell

class VideoDescriptionInference(object):
    """Generate captions for pre-extracted test-video features.

    Typical use: construct the object, call ``load_inference_models()`` once,
    then call ``test()`` to caption every video found under the test directory.
    """

    def __init__(self, latent_dim, num_encoder_tokens, num_decoder_tokens,
                 time_steps_encoder, max_probability, model_dir, test_data_dir, search_type,
                 device=None):
        """Store the configuration; no files are read here.

        Args:
            latent_dim: Hidden size of each encoder direction (decoder uses 2x this).
            num_encoder_tokens: Feature dimension of one encoder time step.
            num_decoder_tokens: Decoder vocabulary size.
            time_steps_encoder: Number of time steps each feature array is reshaped to.
            max_probability: Stored for beam search; not used by any method of this class.
            model_dir: Directory holding the tokenizer and the two ``.pth`` weight files.
            test_data_dir: Directory containing the ``video`` and ``feat_test`` folders.
            search_type: ``"greedy"`` selects greedy decoding; anything else uses
                ``beam_search``.
            device: Optional torch device (or device string). When omitted, CUDA is
                used if available, otherwise the CPU — so the class no longer relies
                on a notebook-level ``device`` global being defined.
        """
        self.latent_dim = latent_dim
        self.num_encoder_tokens = num_encoder_tokens
        self.num_decoder_tokens = num_decoder_tokens
        self.time_steps_encoder = time_steps_encoder
        self.max_probability = max_probability
        self.model_dir = model_dir
        self.test_path = test_data_dir
        self.search_type = search_type
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(device)
        self.inf_encoder_model = None
        self.inf_decoder_model = None
        self.tokenizer = None

    def load_inference_models(self):
        """Load the tokenizer and the encoder/decoder weights from ``model_dir``.

        Raises whatever ``open``/``joblib.load``/``torch.load``/``load_state_dict``
        raise when a file is missing or does not match the architecture.
        """
        # SECURITY: joblib.load unpickles the file and can execute arbitrary code.
        # Only load tokenizer files that come from a trusted source.
        tokenizer_path = os.path.join(self.model_dir, "tokenizer1500")
        with open(tokenizer_path, 'rb') as file:
            self.tokenizer = joblib.load(file)

        encoder = BiLSTMEncoder(self.num_encoder_tokens, self.latent_dim).to(self.device)
        decoder = Decoder(self.num_decoder_tokens, embed_size=256,
                          hidden_size=self.latent_dim * 2).to(self.device)

        # SECURITY: torch.load also unpickles by default; only load trusted checkpoints.
        encoder_weights = torch.load(os.path.join(self.model_dir, "encoder_model_LSTM_LSTM.pth"),
                                     map_location=self.device)
        decoder_weights = torch.load(os.path.join(self.model_dir, "decoder_model_LSTM_LSTM.pth"),
                                     map_location=self.device)
        encoder.load_state_dict(encoder_weights)
        decoder.load_state_dict(decoder_weights)

        # Publish the models only after both checkpoints loaded successfully, and
        # switch them to eval mode for inference.
        self.inf_encoder_model = encoder.eval()
        self.inf_decoder_model = decoder.eval()
        print("Inference models loaded.")

    def index_to_word(self):
        """Return an ``index -> word`` dict built from ``tokenizer.word_index``.

        Indices 1 and 2 are always mapped to ``'bos'`` and ``'eos'``, overriding
        whatever the tokenizer holds for them.
        """
        inv_map = {index: word for word, index in self.tokenizer.word_index.items()}
        inv_map[1] = 'bos'
        inv_map[2] = 'eos'
        return inv_map

    def greedy_search(self, feature, max_len=15):
        """Decode one caption by always taking the highest-scoring next token.

        Args:
            feature: Array of shape ``(time_steps_encoder, num_encoder_tokens)``.
            max_len: Maximum number of decoder steps (default 15, the previous
                hard-coded value).

        Returns:
            The caption as a single space-separated string (possibly empty).
        """
        f_tensor = torch.tensor(feature, dtype=torch.float).unsqueeze(0).to(self.device)  # (1, T, D)
        inv_map = self.index_to_word()
        bos_index = self.tokenizer.word_index.get('bos', 1)
        words = []
        with torch.no_grad():
            hidden, cell = self.inf_encoder_model(f_tensor)
            target_seq = torch.tensor([[bos_index]], dtype=torch.long, device=self.device)
            for _ in range(max_len):
                output, hidden, cell = self.inf_decoder_model(target_seq, hidden, cell)
                y_hat = output.squeeze(1).argmax(dim=-1).item()
                if y_hat == 0:
                    # Padding was predicted: emit nothing and try again. The step still
                    # counts towards max_len, the state has advanced, and the previous
                    # token is fed in once more.
                    continue
                word = inv_map.get(y_hat)
                if word is None or word == 'eos':
                    break
                words.append(word)
                target_seq = torch.tensor([[y_hat]], dtype=torch.long, device=self.device)
        return " ".join(words)

    def beam_search(self, feature):
        """Placeholder: beam search is not implemented, so this delegates to greedy search."""
        return self.greedy_search(feature)

    def get_test_data(self):
        """Collect the feature arrays belonging to each test video.

        For every file in ``<test_path>/video`` (visited in sorted order so results
        are reproducible across runs and platforms), loads ``<id>.npy`` and then
        ``<id>_2.npy``, ``<id>_3.npy``, ... from ``<test_path>/feat_test`` until the
        next file in the sequence is missing. Videos without any feature file are
        skipped.

        Returns:
            X_test: list of lists, each inner list contains feature arrays for one video.
            X_test_ids: list of video IDs, aligned with ``X_test``.
        """
        video_dir = os.path.join(self.test_path, "video")
        feat_dir = os.path.join(self.test_path, "feat_test")
        X_test = []
        X_test_ids = []
        # os.listdir returns entries in arbitrary order; sort for deterministic output.
        for filename in sorted(os.listdir(video_dir)):
            video_id = os.path.splitext(filename)[0]
            featList = []
            scene_idx = 1
            while True:
                # The first scene has no numeric suffix; later scenes use "_<n>".
                suffix = "" if scene_idx == 1 else f"_{scene_idx}"
                path = os.path.join(feat_dir, f"{video_id}{suffix}.npy")
                if not os.path.exists(path):
                    break
                featList.append(np.load(path))
                scene_idx += 1
            if featList:
                X_test.append(featList)
                X_test_ids.append(video_id)
        return X_test, X_test_ids

    def test(self):
        """
        Runs inference on test videos and writes predictions to a text file.
        Each line: video_id,[caption1, caption2, ...]  (the list is written as its
        Python repr). The file is ``<test_path>/test_predictions.txt``.
        """
        X_test, X_test_ids = self.get_test_data()
        search = self.greedy_search if self.search_type == "greedy" else self.beam_search
        output_path = os.path.join(self.test_path, "test_predictions.txt")
        with open(output_path, "w") as out_file:
            for video_id, feats in zip(X_test_ids, X_test):
                captions = [
                    search(feat.reshape(self.time_steps_encoder, self.num_encoder_tokens))
                    for feat in feats
                ]
                out_file.write(f"{video_id},{captions}\n")
        print(f"Predictions saved to {output_path}")

# -------------------------------
# Predict Test Code and Evaluation
# -------------------------------
def _load_predictions(pred_file):
    """Parse ``video_id,<list repr>`` lines from ``pred_file`` into ``{video_id: captions}``."""
    import ast  # function-scope import so this block does not depend on the import cell

    predictions = {}
    with open(pred_file, "r") as f:
        for line in f:
            parts = line.strip().split(",", 1)
            if len(parts) != 2:
                continue
            video_id, raw_captions = parts
            # Expecting a string representation of a list, e.g. "['caption1', 'caption2']".
            # ast.literal_eval only accepts Python literals, so — unlike eval() — a
            # tampered predictions file cannot execute code.
            try:
                captions = ast.literal_eval(raw_captions)
            except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
                # Not a literal: keep the raw text as a best-effort single caption.
                captions = raw_captions
            predictions[video_id] = captions
    return predictions


def _load_test_labels(test_label_file):
    """Read ``video_id: caption`` lines into a dict; returns ``{}`` if the file is missing."""
    test_labels = {}
    if not os.path.exists(test_label_file):
        print(f"Test label file not found at {test_label_file}")
        return test_labels
    with open(test_label_file, "r") as f:
        for line in f:
            if ":" in line:
                video_id, caption = line.split(":", 1)
                # A later line for the same video id replaces the earlier one.
                test_labels[video_id.strip()] = caption.strip()
    return test_labels


def _first_caption_tokens(preds):
    """Return the tokens of the first predicted caption, or ``None`` if there is none."""
    if isinstance(preds, str):
        return preds.split()
    if isinstance(preds, (list, tuple)) and len(preds) > 0 and isinstance(preds[0], str):
        return preds[0].split()
    return None


def predict_and_evaluate():
    """Run inference on the test set, score it with BLEU/METEOR, and save the results.

    Steps:
      1. Load the models and write predictions to ``PRED_FILE``.
      2. Read the predictions back and load the ground-truth labels.
      3. Print per-video and average BLEU (uni/bi-gram weights) and METEOR for
         every video that has both a prediction and a label.
      4. Write ``video_id:captions`` lines to ``FINAL_PRED_FILE``.

    Returns nothing; all results are printed or written to disk.
    """
    # Instantiate inference object with dynamic paths
    inference = VideoDescriptionInference(latent_dim, num_encoder_tokens, num_decoder_tokens,
                                          time_steps_encoder, max_probability, MODEL_DIR, TEST_DATA_DIR, search_type)
    inference.load_inference_models()
    inference.test()  # Runs inference and writes predictions to test_predictions.txt

    # Use the shared path constants instead of rebuilding the same paths by hand.
    predictions = _load_predictions(PRED_FILE)
    test_labels = _load_test_labels(TEST_LABEL_FILE)

    # Evaluate predictions (using BLEU and METEOR)
    bleu_scores = []
    meteor_scores = []
    for vid, preds in predictions.items():
        if vid not in test_labels:
            continue
        # Use the first predicted caption from the list (or adjust as needed).
        hypothesis = _first_caption_tokens(preds)
        if hypothesis is None:
            # Previously an empty caption list crashed with AttributeError; there is
            # nothing to score for such a video, so it is skipped.
            continue
        reference = [test_labels[vid].split()]
        bleu = sentence_bleu(reference, hypothesis, weights=(0.5, 0.5, 0, 0))
        meteor = meteor_score(reference, hypothesis)
        bleu_scores.append(bleu)
        meteor_scores.append(meteor)
        print(f"Video {vid}: BLEU = {bleu:.4f}, METEOR = {meteor:.4f}")
    if bleu_scores and meteor_scores:
        print("Average BLEU:", np.mean(bleu_scores))
        print("Average METEOR:", np.mean(meteor_scores))
    else:
        print("No evaluation scores computed.")

    # Write final predictions to a separate file
    with open(FINAL_PRED_FILE, "w") as f:
        for vid, caps in predictions.items():
            f.write(f"{vid}:{caps}\n")
    print(f"Final predictions saved to {FINAL_PRED_FILE}")

# Entry point: run the full predict-and-evaluate pipeline only when this code is
# executed directly (module name is "__main__"), not when it is imported.
if __name__ == "__main__":
    predict_and_evaluate()
