In [12]:
from __future__ import print_function
import sys
import os
import pickle
import skimage.io as io
import cv2
from PIL import Image
import torch

from scenedetect.video_manager import VideoManager
from scenedetect.scene_manager import SceneManager
from scenedetect.stats_manager import StatsManager
from scenedetect.video_stream import VideoStream

from scenedetect.detectors.content_detector import ContentDetector
from scenedetect.detectors.threshold_detector import ThresholdDetector
from typing import List, Tuple


# Project root, read from the PROJECT_DIRECTORY environment variable (KeyError
# if it is not set). Paths in this notebook are built by plain string
# concatenation, so the value is assumed to end with a path separator.
BASE_DIR=os.environ['PROJECT_DIRECTORY']
sys.path.append(BASE_DIR+'software_utils/')

# The project packages may not be importable depending on where the notebook is
# launched from. The imports stay best-effort (the notebook keeps running), but
# the failure is reported instead of being swallowed, so that a later NameError
# on one of these classes can be traced back to its real cause.
try:
    from models.image_captioner import ImageCaptioner
    from models.video_captioner import VideoCaptioner
    from models.encoderCNN import EncoderCNN
except ImportError as import_error:
    print(f"Warning: could not import the model classes: {import_error}", file=sys.stderr)

try:
    from software_utils.create_transformer import create_transformer
    from software_utils.vocabulary import Vocabulary
except ImportError as import_error:
    print(f"Warning: could not import the software_utils helpers: {import_error}", file=sys.stderr)

In [13]:
# --- Paths -------------------------------------------------------------------
root_path=BASE_DIR
# Vocabulary pickles: both are anchored on BASE_DIR so that loading them does
# not depend on the notebook's current working directory.
coco_vocab_path=BASE_DIR+'Data/processed/coco_vocab.pkl'
msrvtt_vocab_path=BASE_DIR+'Data/processed/msrvtt_vocab.pkl'
# Checkpoint paths are relative to root_path and must be joined with it
# before loading.
ic_model_path='models/image_model/image_caption-model11-20-0.1309-5.0.pkl'
vc_model_path='models/video_model/video_caption-model11-110-0.3354-5.0.pkl'

# --- Model hyper-parameters ----------------------------------------------------
base_model='resnet152'
im_embedding_size=2048
vid_embedding_size=2048
embed_size=256
hidden_size=512
num_frames=40
max_caption_length=35
ic_rnn_type='lstm'
# NOTE(review): the VideoCaptioner in this notebook is built with
# rnn_type='lstm' and its checkpoint loads with all keys matched, so 'gru'
# here looks stale. Left unchanged in case other code reads it - confirm.
vc_rnn_type='gru'
im_res=224

In [14]:
def _read_pickle(path):
    """Open the file at `path` in binary mode and return the unpickled object."""
    with open(path, 'rb') as handle:
        return pickle.load(handle)


# Load the two vocabularies from the pickle files named in the configuration.
# pickle can execute arbitrary code while loading: only use trusted files.
msrvtt_vocab = _read_pickle(msrvtt_vocab_path)
coco_vocab = _read_pickle(coco_vocab_path)

In [15]:
# Image preprocessing callable; it is applied to PIL frames before encoding.
transformer = create_transformer()
# CNN feature extractor built on the backbone named in the configuration.
encoder = EncoderCNN(base_model)

In [16]:
# Build the video captioning model from the configured sizes and the
# msrvtt vocabulary (its length plus the ids of its start/end words).
# NOTE(review): rnn_type is hard-coded to 'lstm' here while the configuration
# cell sets vc_rnn_type='gru'. The recorded load_state_dict output reports all
# keys matched for this LSTM model, so the config value looks stale - confirm
# before switching this call to use vc_rnn_type.
video_captioner = VideoCaptioner(
            vid_embedding_size,
            embed_size,
            hidden_size,
            len(msrvtt_vocab),
            rnn_type='lstm',
            start_id=msrvtt_vocab.word2idx[msrvtt_vocab.start_word],
            end_id=msrvtt_vocab.word2idx[msrvtt_vocab.end_word]
)

Selected RNN Type is lstm


In [17]:
# Read the video-captioning checkpoint from under the project root. Without
# CUDA the stored tensors are remapped to the CPU while loading.
vc_checkpoint_file = root_path + vc_model_path
if not torch.cuda.is_available():
    vc_checkpoint = torch.load(vc_checkpoint_file, map_location='cpu')
else:
    print("Cuda is available")
    vc_checkpoint = torch.load(vc_checkpoint_file)
# The model weights are stored under the 'params' key of the checkpoint.
video_captioner.load_state_dict(vc_checkpoint['params'])

Cuda is available


  vc_checkpoint = torch.load(root_path + vc_model_path)


<All keys matched successfully>

In [18]:
# Reload the checkpoint and, when CUDA is available, move both networks to the
# GPU. The checkpoint path is relative to the project root, so it is prefixed
# with root_path; a bare relative path would only resolve when the notebook's
# working directory happens to be the project root.
if torch.cuda.is_available():
    print("Cuda is available")
    vc_checkpoint = torch.load(root_path + vc_model_path)
    encoder.cuda()
    video_captioner.cuda()
else:
    # No GPU: remap the stored tensors to the CPU while loading.
    vc_checkpoint = torch.load(root_path + vc_model_path, map_location='cpu')

Cuda is available


  vc_checkpoint = torch.load(vc_model_path)


In [19]:
# Restore the captioner weights stored under the checkpoint's 'params' key.
video_captioner.load_state_dict(vc_checkpoint['params'])
# Switch both networks to evaluation mode for inference.
encoder.eval()
video_captioner.eval()

VideoCaptioner(
  (inp): Linear(in_features=2048, out_features=256, bias=True)
  (inp_dropout): Dropout(p=0.2, inplace=False)
  (inp_bn): BatchNorm1d(256, eps=1e-05, momentum=0.01, affine=True, track_running_stats=True)
  (embed): Embedding(14748, 256)
  (rnn): LSTM(256, 512, batch_first=True)
  (out): Linear(in_features=512, out_features=14748, bias=True)
)

In [20]:
# Pick the compute device: the GPU when CUDA is usable, otherwise the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print(device)

cuda


In [21]:
# Function to capture live video and generate captions
def live_scene_caption(camera_index: int = 0):
    """Caption a live camera feed and display it with the caption overlaid.

    Frames are read from the camera, preprocessed with the notebook-level
    ``transformer`` and collected into a window of ``num_frames`` frames. Each
    time a window is full it is passed through ``encoder`` and
    ``video_captioner`` (beam search, ``beam_size=5``); the best caption is
    decoded with ``msrvtt_vocab`` and drawn on every displayed frame until the
    next window produces a new one. The loop ends when the camera stops
    delivering frames or when 'q' is pressed in the preview window.

    Scene-change detection is not wired in: captions are produced per fixed
    window of ``num_frames`` frames rather than per detected scene.

    Args:
        camera_index: Index of the capture device handed to cv2.VideoCapture.

    Raises:
        RuntimeError: If the camera cannot be opened.
    """
    cap = cv2.VideoCapture(camera_index)
    if not cap.isOpened():
        raise RuntimeError(f"Unable to access camera with index {camera_index}")

    # Run on the GPU only when one is available, so the function also works on
    # CPU-only machines.
    run_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    font = cv2.FONT_HERSHEY_SIMPLEX

    frame_count = 0          # frames read from the camera so far
    frames_in_window = 0     # slots of vid_array already filled
    current_caption = None   # most recent caption, redrawn on every frame
    # Window buffer; 224x224 is assumed to match the transformer's output
    # resolution - TODO confirm against create_transformer.
    vid_array = torch.zeros((num_frames, 3, 224, 224), device=run_device)

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # OpenCV delivers BGR; convert to RGB for PIL processing
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Append the preprocessed frame to the current window. A frame that
            # fails preprocessing is reported and skipped without losing a slot.
            try:
                frame_pil = Image.fromarray(frame_rgb).convert('RGB')
                vid_array[frames_in_window] = transformer(frame_pil).to(run_device)
                frames_in_window += 1
            except Exception as e:
                print(f"Error processing frame at index {frame_count}: {e}")

            # A full window is available: encode it and predict a caption
            if frames_in_window == num_frames:
                # Inference only, so skip autograd bookkeeping
                with torch.no_grad():
                    vid_embeddings = encoder(vid_array).to(run_device)
                    encoded_captions = video_captioner.predict(vid_embeddings, beam_size=5).cpu().numpy().astype(int)
                captions = [msrvtt_vocab.decode(caption, clean=True, join=True) for caption in encoded_captions]
                if captions:
                    current_caption = captions[0]
                # Start a new window; every slot is overwritten before reuse
                frames_in_window = 0

            # Overlay the latest description on the frame
            if current_caption is not None:
                cv2.putText(frame, current_caption, (10, 50), font, 1, (255, 0, 255), 2, cv2.LINE_AA)

            # Display the video feed
            cv2.imshow('Live Video Feed', frame)

            frame_count += 1

            # Press 'q' to exit
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    finally:
        # Always free the camera and close the preview window
        cap.release()
        cv2.destroyAllWindows()

In [22]:
# Run the live captioning loop on camera index 0; it returns when the camera
# stops delivering frames or 'q' is pressed in the preview window.
live_scene_caption(camera_index=0)
