In [1]:
import time
from datetime import datetime
import numpy as np
from IPython.display import Video
from PIL import Image
from tqdm.notebook import tqdm

import cv2
import torch
import torchvision.transforms as T
from torchvision.transforms.functional import InterpolationMode
from transformers import AutoModel, AutoTokenizer, BitsAndBytesConfig

from groq import Groq

In [2]:
# check cuda availability and set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device: {device}")

Device: cuda


In [3]:
# set video path
video_path = "demo videos/L01.5 Simple Properties of Probabilities - MIT OpenCourseWare.mp4"

Video(video_path)

## Part 1: Preprocess video

In [4]:
DISPLAY_WIDTH = 640
DISPLAY_HEIGHT = 480

OUTPUT_WIDTH = 1920
OUTPUT_HEIGHT = 480
OUTPUT_FPS = 30

MOTION_THRESHOLD = 5
PIXELS_COUNT_THRESHOLD = 500

### FarneBack Optical Flow

In [5]:
save_output = False

In [6]:
# load video
cap = cv2.VideoCapture(video_path)

if not cap.isOpened():
    print("Error opening video file")
    exit()

# read first frame
ret, frame1 = cap.read()
if not ret:
    cap.release()
    print("Error reading first frame")
    exit()

#Resize frame
frame1 = cv2.resize(frame1, (DISPLAY_WIDTH, DISPLAY_HEIGHT))

# coonvert first frame to grayscale mode
prvs = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)

# save output
if save_output:
    out = cv2.VideoWriter('output_farneback_opt_flow.mp4', cv2.VideoWriter_fourcc(*'mp4v'), OUTPUT_FPS, (OUTPUT_WIDTH, OUTPUT_HEIGHT)) # use "mp4v" for .mp4 files and "XVID" for .avi files

while cap.isOpened():

    # read next frame
    ret, frame2 = cap.read()
    if not ret:
        break

    # resize frame
    frame2 = cv2.resize(frame2, (DISPLAY_WIDTH, DISPLAY_HEIGHT))

    # convert frame to grayscale
    frame2_gray = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)

    # compute optical flow
    flow = cv2.calcOpticalFlowFarneback(prvs, frame2_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)

    # calculate magnitude and angle of optical flow
    mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    # create motion binary mask
    motion_mask = mag > MOTION_THRESHOLD

    # calculate number of pixels with motion
    motion_pixels = np.sum(motion_mask)

    # detect motion when number of motion pixels exceeds certain threshold
    if motion_pixels > PIXELS_COUNT_THRESHOLD:
        print(f"Motion detected, with flow magnitude: {np.mean(mag[motion_mask])}")

    # create a frame copy to draw optical flow vector field
    vector_field = frame2.copy()

    # set paparams for drawing optical flow vector field
    step = 16
    h, w = frame2_gray.shape

    for y in range(0, h, step):
        for x in range(0, w, step):
            # get the flow vector at (x,y)
            fx, fy = flow[y, x]

            # draw the arrowed line representing the flow vector
            end_point = (int(x + fx), int(y + fy))
            cv2.arrowedLine(vector_field, (x, y), end_point, (0, 255, 0), 1, tipLength=0.5)

    # convert the binary image mask to bgr mode
    motion_mask_bgr = cv2.cvtColor(motion_mask.astype(np.uint8) * 255, cv2.COLOR_GRAY2BGR)

    # stack images horizontally for display
    display_frame = np.hstack((frame2, vector_field, motion_mask_bgr))

    # add text for average motion magnitude
    cv2.rectangle(display_frame, (0, 0), (DISPLAY_WIDTH, DISPLAY_HEIGHT//10), (0, 0, 0), -1)
    cv2.putText(display_frame, f"Avg. Motion Magnitude: {np.mean(mag[motion_mask]):.2f}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.8, color=(255, 255, 255), thickness=2)

    # display frame
    cv2.imshow('FarneBack Optical Flow', display_frame)

    # upadate previous frame for next iteration
    prvs = frame2_gray.copy()

    # save output
    if save_output:
        out.write(display_frame)

    # exit if 'ESC' key is pressed
    if cv2.waitKey(30) & 0xFF == 27:
        break

# relsearce resources
cap.release()
if save_output:
    out.release()
cv2.destroyAllWindows()

  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


Motion detected, with flow magnitude: 26.312284469604492
Motion detected, with flow magnitude: 27.659997940063477
Motion detected, with flow magnitude: 28.271286010742188
Motion detected, with flow magnitude: 24.164230346679688
Motion detected, with flow magnitude: 22.104902267456055
Motion detected, with flow magnitude: 17.64073371887207
Motion detected, with flow magnitude: 21.623214721679688
Motion detected, with flow magnitude: 19.745525360107422
Motion detected, with flow magnitude: 23.459720611572266
Motion detected, with flow magnitude: 22.313190460205078
Motion detected, with flow magnitude: 23.93301773071289
Motion detected, with flow magnitude: 28.29143714904785
Motion detected, with flow magnitude: 29.950002670288086
Motion detected, with flow magnitude: 32.1051139831543
Motion detected, with flow magnitude: 32.241817474365234
Motion detected, with flow magnitude: 30.46117401123047
Motion detected, with flow magnitude: 32.4863166809082
Motion detected, with flow magnitude: 3

### Reduce frame count using motion detection

In [5]:
# create frame list
frames_list = []

# load video to process
cap = cv2.VideoCapture(video_path)

# get the first frame
ret, frame1 = cap.read()
if not ret:
    print("failed to read the video.")
    cap.relsease()
    exit()

# resize_frame
frame1 = cv2.resize(frame1, (DISPLAY_WIDTH, DISPLAY_HEIGHT))

# add the first frame to the list
frames_list.append(("00:00", frame1))

# convert the first frame to grayscale
prvs = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)

# get video frame count and initialize progress bar
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
pbar = tqdm(total=frame_count, desc="Processing frames")

while cap.isOpened():
    pbar.update(1)

    # read next frame
    ret, frame2 = cap.read()
    if not ret:
        print("failed to read frame")
        break

    # get current frame timestamp
    frame_time_sec = np.round(cap.get(cv2.CAP_PROP_POS_MSEC) / 1000, 1).astype(int)
    minutes, seconds = divmod(frame_time_sec, 60) # same as using frame_time_sec//60 and frame_time_sec%60
    time_formatted = f"{int(minutes)}:{int(seconds):02d}"

    # resize frame
    frame2 = cv2.resize(frame2, (DISPLAY_WIDTH, DISPLAY_HEIGHT))

    # convert frame to grayscale
    frame2_gray = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)

    # compute optical flow
    flow = cv2.calcOpticalFlowFarneback(prvs, frame2_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)

    # calculate magnitude and angle of optical flow
    mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    # create motion binary mask
    motion_mask = mag > MOTION_THRESHOLD

    # calculate number of pixels with motion
    motion_pixels = np.sum(motion_mask)

    # detect motion when number of motion pixels exceeds certain threshold
    if motion_pixels > PIXELS_COUNT_THRESHOLD:
        frame2_rgb = cv2.cvtColor(frame2, cv2.COLOR_BGR2RGB)
        frames_list.append((time_formatted, frame2_rgb))

    # update previous frame for next iteration
    prvs = frame2_gray.copy()

# relsearce resources
cap.release()

Processing frames:   0%|          | 0/19908 [00:00<?, ?it/s]

failed to read frame


In [6]:
# check frame reduction results

# load video
cap = cv2.VideoCapture(video_path)

# get the length of the video in frames count
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

print(f"Original video frames count: {frame_count}\nReduced video frames count: {len(frames_list)}")

Original video frames count: 19908
Reduced video frames count: 271


In [8]:
def play_video(frames, fps=30):

    # set delay between frames
    delay = int(1000/fps)

    for frame in frames:
        # convert frame to BGR mode
        frame_bgr = cv2.cvtColor(frame[1], cv2.COLOR_RGB2BGR)
        cv2.imshow("Video", frame_bgr)

        # press "ESC" to exit
        if cv2.waitKey(delay) & 0xFF == 27:
            break

    cv2.destroyAllWindows()

In [10]:
# play reduced video
play_video(frames_list, fps=30)

### Reduce video to 1 FPS

In [7]:
# calculate timestamp difference between frames
def calculate_timestamp_diff(time1, time2):

    # define timestamp format
    timestamp_format = "%M:%S"

    # convert timestamps to datetime objects
    time1 = datetime.strptime(time1, timestamp_format)
    time2 = datetime.strptime(time2, timestamp_format)

    # calculate timestamp difference
    timestamp_diff = abs((time2 - time1).total_seconds())

    return timestamp_diff

In [8]:
# reduce video to 1 fps

# initialize new frame list
frames_list_1_fps = [frames_list[0]]

for frame in frames_list[1:]:
    timestamp_diff = calculate_timestamp_diff(frame[0], frames_list_1_fps[-1][0])

    if timestamp_diff >= 1:
        frames_list_1_fps.append(frame)

In [9]:
print(f"Previous reduced video frames count: {len(frames_list)}\nNew reduced video frames count: {len(frames_list_1_fps)}")

Previous reduced video frames count: 271
New reduced video frames count: 85


In [27]:
# play reduced video
play_video(frames_list_1_fps, fps=30)

## Part 2: Video Captioning

### Mono-internVL

In [10]:
# Define mean and standard deviation values for ImageNet normalization
IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)

In [11]:
def build_transform(input_size):
    """
    Build a transform that resizes the image to the given size and normalizes it
    to the range [-1, 1] using the ImageNet mean and standard deviation.

    Args:
        input_size (int): The size to resize the image to.

    Returns:
        A callable that takes in an image and applies the specified transform.
    """
    transform = T.Compose([
        T.Lambda(lambda img: img.convert('RGB') if img.mode != 'RGB' else img),  # Ensure image is in RGB mode
        T.Resize((input_size, input_size), interpolation=InterpolationMode.BICUBIC),  # Resize to target size
        T.ToTensor(),  # Convert image to tensor
        T.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD)  # Normalize using ImageNet mean and std
    ])
    return transform

In [12]:
def find_closest_aspect_ratio(aspect_ratio, target_ratios, width, height, image_size):
    """
    Find the closest aspect ratio to the given aspect ratio in the list of target_ratios.

    Args:
        aspect_ratio (float): The aspect ratio to find the closest match to.
        target_ratios (list): A list of tuples containing the target aspect ratios.
        width (int): The width of the original image.
        height (int): The height of the original image.
        image_size (int): The size of the image to resize to.

    Returns:
        A tuple containing the aspect ratio that is closest to the given aspect ratio.
    """
    best_ratio_diff = float('inf')
    best_ratio = (1, 1)
    area = width * height
    # Iterate through each ratio, finding the closest match
    for ratio in target_ratios:
        target_aspect_ratio = ratio[0] / ratio[1]
        ratio_diff = abs(aspect_ratio - target_aspect_ratio)
        if ratio_diff < best_ratio_diff:
            best_ratio_diff = ratio_diff
            best_ratio = ratio
        elif ratio_diff == best_ratio_diff:
            if area > 0.5 * image_size * image_size * ratio[0] * ratio[1]:
                best_ratio = ratio
    return best_ratio

In [13]:
def dynamic_preprocess(image, min_num=1, max_num=12, image_size=448, use_thumbnail=False):
    """
    Dynamically preprocess an image by splitting it into tiles of size (image_size, image_size)
    and optionally adding a thumbnail of size (image_size, image_size).

    Args:
        image (PIL.Image): The image to preprocess.
        min_num (int, optional): The minimum number of tiles to split the image into. Defaults to 1.
        max_num (int, optional): The maximum number of tiles to split the image into. Defaults to 12.
        image_size (int, optional): The size of the tiles to split the image into. Defaults to 448.
        use_thumbnail (bool, optional): Whether to add a thumbnail of size (image_size, image_size) to the end of the list of
            tiles. Defaults to False.

    Returns:
        A list of PIL.Image objects, where each object is a tile of size (image_size, image_size) or a thumbnail of size
            (image_size, image_size) if use_thumbnail is True.
    """
    orig_width, orig_height = image.size
    aspect_ratio = orig_width / orig_height

    # calculate the existing image aspect ratio
    target_ratios = set(
        (i, j) for n in range(min_num, max_num + 1) for i in range(1, n + 1) for j in range(1, n + 1) if
        i * j <= max_num and i * j >= min_num)
    target_ratios = sorted(target_ratios, key=lambda x: x[0] * x[1])

    # find the closest aspect ratio to the target
    target_aspect_ratio = find_closest_aspect_ratio(
        aspect_ratio, target_ratios, orig_width, orig_height, image_size)

    # calculate the target width and height
    target_width = image_size * target_aspect_ratio[0]
    target_height = image_size * target_aspect_ratio[1]
    blocks = target_aspect_ratio[0] * target_aspect_ratio[1]

    # resize the image
    resized_img = image.resize((target_width, target_height))
    processed_images = []
    for i in range(blocks):
        box = (
            (i % (target_width // image_size)) * image_size,
            (i // (target_width // image_size)) * image_size,
            ((i % (target_width // image_size)) + 1) * image_size,
            ((i // (target_width // image_size)) + 1) * image_size
        )
        # split the image
        split_img = resized_img.crop(box)
        processed_images.append(split_img)
    assert len(processed_images) == blocks
    if use_thumbnail and len(processed_images) != 1:
        thumbnail_img = image.resize((image_size, image_size))
        processed_images.append(thumbnail_img)
    return processed_images

In [14]:
def load_image(image, input_size=448, max_num=12):
    """
    Loads and processes an image from a file.

    Args:
        image: Image as ndarray.
        input_size (int, optional): The size to which the image will be resized. Defaults to 448.
        max_num (int, optional): The maximum number of image tiles to process. Defaults to 12.

    Returns:
        torch.Tensor: A tensor containing the processed pixel values of the image.
    """
    image = Image.fromarray(image)
    transform = build_transform(input_size=input_size)
    images = dynamic_preprocess(image, image_size=input_size, use_thumbnail=True, max_num=max_num)
    pixel_values = [transform(image) for image in images]
    pixel_values = torch.stack(pixel_values)
    return pixel_values

In [15]:
path = 'OpenGVLab/Mono-InternVL-2B'
quantization_config = BitsAndBytesConfig(load_in_8bit=True)  # You can also use load_in_4bit=True
model = AutoModel.from_pretrained(
    path,
    torch_dtype=torch.bfloat16,
    low_cpu_mem_usage=True,
    quantization_config=quantization_config,
    trust_remote_code=True).eval()
tokenizer = AutoTokenizer.from_pretrained(path, trust_remote_code=True, use_fast=False)





Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [16]:
generation_config = dict(max_new_tokens=256, do_sample=True)

In [17]:
# generate caption for each frame
captions_list = []
for frame in tqdm(frames_list_1_fps[::3]):
    pixel_values = load_image(frame[1], max_num=12).to(torch.bfloat16).cuda()

    prompt = "<image>\nDescribe the image and what is contains, concisely. Start with 'This frame depicts'"
    with torch.no_grad():
        response = model.chat(tokenizer, pixel_values, prompt, generation_config)

    captions_list.append((frame[0], response))

  0%|          | 0/29 [00:00<?, ?it/s]



In [18]:
captions_list[:10]

[('00:00',
  'This frame depicts a sequence of events related to expressions of the atomic axioms, likely from a physics or symbolic algebra course. The content of the image contains mathematical expressions and their corresponding values describing how certain events relate to the properties of atoms.\n\nHere is a summary of the equations and their interpretations:\n\n1. **P(A) ≥ 0**: This states the probability or probability mass associated with an event happening at atom A.\n**Interpretation:** This indicates there is a 100% chance that atom A will have a positive value.\n\n2. **P(Ω) = 1**: This denotes the probability that state Ω (possibly involving some state) exists and is true.\n**Interpretation:** This means atom Ω is guaranteed to exist and the condition associated with atom Ω is true.\n\n3. **For disjoint events:** Then the formulas for how disjoint events intersect are used in the equation.\n**P( ΝU B ) = P( Ν ) + P( B )**: **Interpretation:** Here, disjoint events (in thi

## Video Q&A / Summarization

In [19]:
from dotenv import load_dotenv

load_dotenv()

True

In [20]:
import os

groq_api_key = os.getenv("GROQ_API_KEY")

In [21]:
client = Groq(api_key = groq_api_key)

In [22]:
# respond to query based on captions

def generate_response(captions, query, groq_client):
    captions_list_to_text = "/n".join(["[" + cap[0] + "] " + cap[1] for cap in captions])  # [00:00] caption 0\n[00:01] caption 1 ...
    chat_completion = groq_client.chat.completions.create(
        messages = [
            {
                "role": "user",
                "content": f"Based on the following chronologically sorted frame captions, where each line has a timestamp and a frame caption, {query}: {captions_list_to_text}",
            }
        ],
        model = "llama-3.1-70b-versatile"
    )
    return chat_completion.choices[0].message.content

In [23]:
query = "Summarize this educational video into chapters and maintain the chronological order"
answer = generate_response(captions_list, query, client)

In [24]:
print(answer)

The video appears to be an educational presentation on simple consequences of axioms, focusing on various mathematical concepts. Here's a summary of the video in chronological order, divided into chapters:

**Chapter 1: Introduction to Axioms (00:00 - 00:37)**

The video starts by introducing axioms related to atomic expressions, probability, and disjoint events. It explains the basic concepts of axioms and how they are used to describe atomic behavior.

**Chapter 2: Axioms and Consequences (00:37 - 2:10)**

This chapter delves deeper into the axioms and their consequences. It discusses the first and second axioms, which state that the probability of an event is greater than or equal to 0 and less than or equal to 1, respectively.

**Chapter 3: Disjoint Events and Axioms (2:10 - 2:58)**

This chapter explains disjoint events and how they relate to axioms. It discusses the concept of disjoint events and how they can be represented using mathematical equations.

**Chapter 4: Geometric Co

In [None]:
query = "Generate a test question"
answer = generate_response(captions_list, query, client)
print(answer)

Test question: 

**Question 1.** Suppose you have three disjoint events A, B, and C. The probability of A, P(A), is 0.4, the probability of B, P(B), is 0.3, and the probability of C, P(C), is 0.5. What is the probability of the union of A, B, and C, P(A U B U C)?

**A)** 0.2
**B)** 0.4
**C)** 1.2
**D)** 0.7


In [27]:
query = "Generate three test questions with answers"
answer = generate_response(captions_list, query, client)
print(answer)

Here are three test questions with answers based on the provided frame captions:

**Test Question 1**
What is the interpretation of the equation P(A) ≥ 0 in the context of atomic axioms?

A) There is a 0% chance that atom A will have a positive value.
B) There is a 100% chance that atom A will have a positive value.
C) There is a 50% chance that atom A will have a positive value.
D) There is a negative chance that atom A will have a positive value.

**Answer:** B) There is a 100% chance that atom A will have a positive value.

**Test Question 2**
What is the interpretation of the equation P(Ω) = 1 in the context of atomic axioms?

A) There is a 0% chance that atom Ω will exist and the condition associated with atom Ω is true.
B) There is a 100% chance that atom Ω will exist and the condition associated with atom Ω is true.
C) There is a 50% chance that atom Ω will exist and the condition associated with atom Ω is true.
D) There is a negative chance that atom Ω will exist and the condit