# Closed Captioning
Implements a feature using Langchain's image_captions.py and audio_speech_to_text.py to produce .srt files. This system will provide both subtitles and visual scene descriptions, essentially creating closed captioning.

## Imports

In [35]:
# imports for closed captioning
import cv2
import numpy as np
import os
import transformers
import numpy as np
import ffmpeg
import openai
transformers.logging.set_verbosity_error()

from langchain.document_loaders import AssemblyAIAudioTranscriptLoader
from langchain.document_loaders import ImageCaptionLoader
from langchain.document_loaders.assemblyai import TranscriptFormat

## Object model

In [22]:
class CaptionModel:
    def __init__(self, start_time, end_time, closed_caption):
        self.start_time = start_time
        self.end_time = end_time
        self.closed_caption = closed_caption

    def __str__(self):
        return f"start_time: {self.start_time}, end_time: {self.end_time}, closed_caption: {self.closed_caption}"

## Video model

In [18]:
class VideoModel:
    def __init__(self, start_time, end_time, image_description):
        self.start_time = start_time
        self.end_time = end_time
        self.image_description = image_description

    def __str__(self):
        return f"start_time: {self.start_time}, end_time: {self.end_time}, image_description: {self.image_description}"
    
    def get_start_time(self):
        return self.start_time
    
    def get_end_time(self):
        return self.end_time
    
    def get_image_description(self):
        return self.image_description
    
    def set_start_time(self, start_time):
        self.start_time = start_time

    def set_end_time(self, end_time):
        self.end_time = end_time
    
    def set_image_description(self, image_description):
        self.image_description = image_description

## Audio model

In [7]:
class AudioModel:
    def __init__(self, start_time, end_time, subtitle_text):
        self.start_time = start_time
        self.end_time = end_time
        self.subtitle_text = subtitle_text

    def __str__(self):
        return f"start_time: {self.start_time}, end_time: {self.end_time}, subtitle_text: {self.subtitle_text}"
    
    def get_start_time(self):
        return self.start_time
    
    def get_end_time(self):
        return self.end_time
    
    def get_subtitle_text(self):
        return self.subtitle_text
    
    def set_start_time(self, start_time):
        self.start_time = start_time

    def set_end_time(self, end_time):
        self.end_time = end_time
    
    def set_subtitle_text(self, subtitle_text):
        self.subtitle_text = subtitle_text

## Audio Speech to Text

In [None]:
audio_file = "test_data/test.mp3"

loader = AssemblyAIAudioTranscriptLoader(
    file_path=audio_file, 
    api_key="f50c08e20ecd4544b175953636f0b936", 
    transcript_format=TranscriptFormat.SUBTITLES_SRT
)

docs = loader.load()

def CreateTranscriptModel(doc):
    transcription = doc.strip().split("\n")
    times = transcription[1].split(" --> ")
    start_time = times[0].strip()
    end_time = times[1].strip()

    subtitle_text = ' '.join(transcription[2:]).strip()

    transcript_model = AudioModel(start_time, end_time, subtitle_text)

    return transcript_model

print(CreateTranscriptModel(docs[0].page_content))

## Audio Split Model

In [15]:
def convert_to_mp3(mp4_path):
    (
        ffmpeg
        .input(mp4_path)
        .output("test_data/audio.mp3", format="mp3")
        .run()
    )

def CreateTranscriptModels(doc):
    subtitles = doc.strip().split("\n\n")  # Splitting based on double newline, which separates SRT entries
    models = []

    for subtitle in subtitles:
        lines = subtitle.split("\n")
        if len(lines) >= 3:  # Checking if there are enough lines for an index, timestamp, and text
            times = lines[1].split(" --> ")
            start_time = times[0].strip()
            end_time = times[1].strip()

            subtitle_text = ' '.join(lines[2:]).strip()

            transcript_model = AudioModel(start_time, end_time, subtitle_text)
            models.append(transcript_model)

    return models

loader = AssemblyAIAudioTranscriptLoader(
    file_path="test_data/audio.mp3", 
    api_key="f50c08e20ecd4544b175953636f0b936", 
    transcript_format=TranscriptFormat.SUBTITLES_SRT
)

# Assuming loader.load() returns the full transcript in a single document
docs = loader.load()

# This will now create a list of lists of AudioModel instances
all_audio_models = [CreateTranscriptModels(doc.page_content) for doc in docs]

# Flatten the list if necessary
audio_models = [model for sublist in all_audio_models for model in sublist]

In [21]:
for audio_model in audio_models:
    print(audio_model)

start_time: 00:00:00,570, end_time: 00:00:03,966, subtitle_text: Let's make tea. Duet with me. You read red,
start_time: 00:00:04,068, end_time: 00:00:06,910, subtitle_text: I read green. Can I get you a drink?
start_time: 00:00:09,250, end_time: 00:00:12,878, subtitle_text: Tea or coffee? How do
start_time: 00:00:12,884, end_time: 00:00:13,920, subtitle_text: you take it?
start_time: 00:00:16,050, end_time: 00:00:16,800, subtitle_text: Here.
start_time: 00:00:19,810, end_time: 00:00:22,400, subtitle_text: Great work. Now let's try it again.


## Video Split to Frames

In [None]:
def frame_difference(prev_frame, curr_frame, threshold=30):
    # Compute the absolute difference between the current frame and the previous frame
    diff = cv2.absdiff(prev_frame, curr_frame)
    # Thresholding to get the binary image, where white represents significant difference
    _, thresh = cv2.threshold(diff, threshold, 255, cv2.THRESH_BINARY)
    # If there are any white pixels in thresh, the difference is significant
    return np.any(thresh)

# Initialize the video capture
capture = cv2.VideoCapture('test_data/eng_convo.mp4')
fps = capture.get(cv2.CAP_PROP_FPS)
frame_duration = 1000 / fps

video_models = []

frameNr = 0
ret, prev_frame = capture.read()
prev_frame_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY) if ret else None
prev_start_time = 0
start_time = 0

while ret:
    end_time = prev_start_time
    prev_start_time = capture.get(cv2.CAP_PROP_POS_MSEC)

    ret, frame = capture.read()
    if not ret:
        start_time = end_time + frame_duration
        break
    
    start_time = prev_start_time
    # Convert to grayscale for comparison
    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # Compare with the previous frame
    if frameNr == 0 or frame_difference(prev_frame_gray, frame_gray):
        end_time = capture.get(cv2.CAP_PROP_POS_MSEC)
        cv2.imwrite(f'test_data/output_frames/frame.jpg', frame)
        prev_frame_gray = frame_gray

        # Define the path to the "output_frames" folder
        folder_path = f'test_data/output_frames/'

        # List all .jpg files in the folder
        image_files = [os.path.join(folder_path, file) for file in os.listdir(folder_path) if file.endswith(".jpg")]

        # Create an instance of the ImageCaptionLoader
        loader = ImageCaptionLoader(images=image_files)

        # Load captions for the images
        list_docs = loader.load()

        video_model = VideoModel(start_time, end_time, list_docs[len(list_docs) - 1].page_content.replace("[SEP]", "").strip())
        video_models.append(video_model)

        frameNr += 1

# Release the video capture object
capture.release()

In [32]:
for video_model in video_models:
    print(video_model)

start_time: 0.0, end_time: 40.0, image_description: an image of a kitchen area in a building
start_time: 40.0, end_time: 80.0, image_description: an image of a kitchen in a building


## Creating the SRT file

In [19]:
def milliseconds_to_srt_time(milliseconds):
    seconds, milliseconds = divmod(milliseconds, 1000)
    minutes, seconds = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{int(hours):02}:{int(minutes):02}:{int(seconds):02},{int(milliseconds):03}"

# Using video_models create an SRT file
def CreateSRTFile(video_models):
    file = open("test_data/output.srt", "w")
    count = 1
    for video_model in video_models:
        file.write(str(count) + "\n")
        file.write(str(milliseconds_to_srt_time(video_model.get_start_time())) + " --> " + str(milliseconds_to_srt_time(video_model.get_end_time())) + "\n")
        file.write(video_model.get_image_description() + "\n\n")
        count += 1
    file.close()

CreateSRTFile(video_models)

## Audio and Video Combination

In [33]:
# Assuming the VideoModel class definition is already in scope

caption_data = """
start_time: 0.0, end_time: 40.0, image_description: an image of a kitchen area in a building
start_time: 480.0, end_time: 2240.0, image_description: a young girl in various settings, last seen wearing a gray sweater
start_time: 2320.0, end_time: 9320.0, image_description: footage of a woman in a sweater appears repeatedly throughout this segment
start_time: 12360.0, end_time: 14200.0, image_description: the same image of a woman in a sweater is shown at multiple intervals
start_time: 15960.0, end_time: 22360.0, image_description: multiple instances of a woman holding a cup of coffee
"""

# Parsing the raw caption data and instantiating VideoModel objects
video_models = []
for line in caption_data.strip().split("\n"):
    if not line.strip():
        continue  # Skip empty lines
    parts = line.split(",")
    start_time = float(parts[0].split(":")[1].strip())
    end_time = float(parts[1].split(":")[1].strip())
    image_description = parts[2].split(":")[1].strip()

    video_model = VideoModel(start_time, end_time, image_description)
    video_models.append(video_model)

# Display the created VideoModel objects
for vm in video_models:
    print(vm)

start_time: 0.0, end_time: 40.0, image_description: an image of a kitchen area in a building
start_time: 480.0, end_time: 2240.0, image_description: a young girl in various settings
start_time: 2320.0, end_time: 9320.0, image_description: footage of a woman in a sweater appears repeatedly throughout this segment
start_time: 12360.0, end_time: 14200.0, image_description: the same image of a woman in a sweater is shown at multiple intervals
start_time: 15960.0, end_time: 22360.0, image_description: multiple instances of a woman holding a cup of coffee


In [None]:
def parse_time(s):
    """Converts a time string into seconds."""
    h, m, s = map(float, s.replace(',', '.').split(':'))
    return h * 3600 + m * 60 + s

def find_overlapping_models(video_models, audio_models):
    primary_start = video_models.get_start_time()
    primary_end = video_models.get_end_time()
    overlapping_models = []
    for model in audio_models:
        model_start = parse_time(str(model.get_start_time()))
        model_end = parse_time(str(model.get_end_time()))
        if model_start < primary_end and model_end > primary_start:
            overlapping_models.append(model)
    return overlapping_models

def validate_and_adjust_description(audio_model, video_model):
    # Construct the prompt for OpenAI
    prompt = f"Does the following subtitle make sense: '{audio_model.get_subtitle_text()}' with the image described as '{video_model.get_image_description()}'?"

    # Define openai API key
    openai.api_key = "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"

    # Query OpenAI (ensure your API key and the model are correctly set)
    response = openai.Completion.create(
      engine="text-davinci-003",
      prompt=prompt,
      max_tokens=50
    )

    # Interpret the response (this part might require tuning based on response structure)
    # For simplicity, let's assume if response is not affirmative, we adjust the description
    if 'yes' not in response.choices[0].text.lower():
        # Here, you would implement logic to adjust the image description based on the audio description
        # This could be a complex task depending on the context and might require additional natural language processing
        new_description = "adjusted description based on audio"  # Placeholder
        video_model.set_image_description(new_description)

def create_caption_models(video_models, audio_models):
    caption_models = []

    for video_model in video_models:
        overlapping_audio_models = find_overlapping_models(video_model, audio_models)
        for audio_model in overlapping_audio_models:
            validate_and_adjust_description(audio_model, video_model)
            caption = f"[{video_model.get_image_description()}] {audio_model.get_subtitle_text()}"
            caption_model = CaptionModel(video_model.get_start_time(), audio_model.get_end_time(), caption)
            caption_models.append(caption_model)

    return caption_models  

# Create caption models
caption_models = create_caption_models(video_models, audio_models)

# Now, caption_models contains your finalized caption models

In [None]:
# print all caption models
for caption_model in caption_models:
    print(caption_model)