## Download a YouTube video

In [16]:
!pip install pytube

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pytube
  Downloading pytube-12.1.2-py3-none-any.whl (57 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.0/57.0 KB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pytube
Successfully installed pytube-12.1.2


In [17]:
from pytube import YouTube

In [18]:
def Download(link):
    """
    Download the stream that pytube reports as the highest resolution one for a YouTube video.

    :param link: URL of the YouTube video
    :return: the value returned by the stream's download() call (pytube documents it as the
             path of the saved file), or None when the download failed
    """
    # Building the YouTube object and picking the stream stay outside the try block,
    # exactly as before, so that an invalid link still surfaces as an exception.
    stream = YouTube(link).streams.get_highest_resolution()
    try:
        saved_path = stream.download(output_path="")
    except Exception as error:
        # Report the cause and stop here: the success message must not follow a failure.
        print(f"An error has occurred: {error}")
        return None
    print("Download is completed successfully")
    return saved_path

In [19]:
# Download the highest-resolution stream of this video with the Download helper.
link = "https://www.youtube.com/watch?v=HSPYgwP9R84"
Download(link)

Download is completed successfully


## Object detection

In [None]:
!pip install transformers

In [21]:
import cv2
import numpy as np
import torch.cuda
from torchvision import transforms
from transformers import CLIPProcessor, CLIPModel
import matplotlib.pyplot as plt
import matplotlib.patches as pltpatches
from tqdm.notebook import tqdm
import threading
import time

from google.colab.patches import cv2_imshow
from IPython.display import clear_output, Image

In [22]:
def frame_to_tensor(frame: np.ndarray):
    """
    Convert a video frame into a tensor with torchvision's ToTensor transform.
    :param frame: the frame of the video
    :return: the result of applying transforms.ToTensor() to the frame
    """
    # A fresh transform is built on every call, then applied straight away.
    return transforms.ToTensor()(frame)

In [23]:
def get_frame_patches(frame: np.ndarray, patch_size):
    """
    Cut a frame into a grid of square, non-overlapping patches.
    :param frame: the frame of the video
    :param patch_size: side of each patch; it is also the step between two patches
    :return: the unfolded tensor holding the patches (its shape is printed before returning)
    """
    tensor = frame_to_tensor(frame).data
    # Three chained unfolds:
    #   dim 0 -> windows of 3 entries with step 3 (adds the leading batch-like dimension)
    #   dim 1 -> vertical windows of patch_size rows
    #   dim 2 -> horizontal windows of patch_size columns
    patches = (
        tensor
        .unfold(0, 3, 3)
        .unfold(1, patch_size, patch_size)
        .unfold(2, patch_size, patch_size)
    )
    print(f"Shape of the patches = {patches.shape}")
    return patches

In [24]:
def load_model(model_id="openai/clip-vit-base-patch32"):
    """
    Load a pretrained CLIP model with its processor and move the model to the available device.
    :param model_id: id of the model to load
    :return: a (model, processor, device) tuple, where device is the string 'cuda' or 'cpu'
    """
    clip_processor = CLIPProcessor.from_pretrained(model_id)
    clip_model = CLIPModel.from_pretrained(model_id)
    # Use the GPU whenever torch reports one, otherwise stay on the CPU.
    if torch.cuda.is_available():
        target = 'cuda'
    else:
        target = 'cpu'
    clip_model.to(target)
    return clip_model, clip_processor, target

In [25]:
def run_inference(model, processor, device, prompt, patches, patch_size, window, stride):
    """
    Function to run the model over sliding windows of patches and get the similarity scores
    :param model: the Visual Transformer to be run
    :param processor: the processor associated to the Transformer to run
    :param device: the hardware devoted to run the model
    :param prompt: the text compared with every window; one score per window is read with
                   .item(), so a single prompt is expected
    :param patches: the patches drawn from the frame, indexed as
                    patches[0, row, column] -> (3, patch_size, patch_size)
    :param patch_size: the size of the patches
    :param window: the side, in patches, of the square window seen by the model
    :param stride: the step, in patches, between two consecutive windows
    :return: a (rows, columns) tensor with one score per patch, normalised to [0, 1];
             all zeros when every patch ends up with the same score
    """
    rows, cols = patches.shape[1], patches.shape[2]
    scores = torch.zeros(rows, cols)
    # The counter starts at one, so the divisor below is (number of windows + 1);
    # it also keeps patches that no window visits from dividing by zero.
    runs = torch.ones(rows, cols)
    if scores.numel() == 0:
        # nothing to score (the frame is smaller than one patch)
        return scores

    side = patch_size * window
    # Inference only: do not record the autograd graph of every forward pass.
    with torch.no_grad():
        for Y in range(0, rows - window + 1, stride):
            for X in range(0, cols - window + 1, stride):
                # (window, window, 3, patch, patch) block of neighbouring patches
                patch_batch = patches[0, Y:Y+window, X:X+window]
                # Stitch the patches into one (height, width, 3) image:
                # axes become (patch row, pixel row, patch column, pixel column, channel).
                big_patch = patch_batch.permute(0, 3, 1, 4, 2).reshape(side, side, 3).float()
                # we preprocess the image and class label with the CLIP processor
                inputs = processor(
                    images=big_patch,  # big patch image sent to CLIP
                    return_tensors="pt",  # tell CLIP to return pytorch tensor
                    text=prompt,  # class label sent to CLIP
                    padding=True
                ).to(device)  # move to device if possible

                # calculate and retrieve similarity score
                score = model(**inputs).logits_per_image.item()
                # sum up the similarity scores of every window covering a patch
                scores[Y:Y+window, X:X+window] += score
                # count how many windows covered each patch
                runs[Y:Y+window, X:X+window] += 1
    # calculate average scores
    scores /= runs
    # clip scores: repeatedly drop everything below the mean
    for _ in range(3):
        scores = torch.clamp(scores - scores.mean(), min=0)
    # normalize scores; a flat map has no range to normalise and would give 0/0 = NaN
    span = scores.max() - scores.min()
    if span > 0:
        scores = (scores - scores.min()) / span
    else:
        scores = torch.zeros_like(scores)
    return scores

In [26]:
def get_box(scores, patch_size, threshold):
    """
    Function to compute the pixel bounding box enclosing every patch scoring above the threshold
    :param scores: 2-dimensional (rows, columns) grid with one score per patch
    :param patch_size: the size of the patches, used to convert patch indices into pixels
    :param threshold: a patch is part of the detection when its score is strictly greater
    :return: (x_min, y_min, width, height) in pixels; (0, 0, 0, 0) when no patch is detected
    """
    detection = torch.as_tensor(scores) > threshold
    # one (row, column) pair per detected patch
    hits = torch.nonzero(detection)
    if hits.numel() == 0:
        # nothing above the threshold: report an empty box instead of failing on min()/max()
        return 0, 0, 0, 0
    hit_rows, hit_cols = hits[:, 0], hits[:, 1]
    # find box corners (the +1 makes the upper bound exclusive)
    y_min, y_max = hit_rows.min().item(), hit_rows.max().item() + 1
    x_min, x_max = hit_cols.min().item(), hit_cols.max().item() + 1
    # convert from patch co-ords to pixel co-ords
    y_min *= patch_size
    y_max *= patch_size
    x_min *= patch_size
    x_max *= patch_size
    # calculate box height and width
    height = y_max - y_min
    width = x_max - x_min
    return x_min, y_min, width, height

In [31]:
def detect(model, processor, device, prompts, frame, patch_size=64, window=3, stride=1, threshold=0.5):
    """
    Function to detect the objects in the frame. For every prompt it scores the patches of the
    frame and, when something is detected, stores the enclosing bounding box.
    Nothing is plotted or drawn here.
    :param model: model to run for the inference
    :param processor: processor associated to the model
    :param device: the hardware used to run the inference
    :param prompts: the objects to find in the frame (an iterable of texts)
    :param frame: the specified frame
    :param patch_size: the size of the patches
    :param window: the amount of patches to search in simultaneously
    :param stride: the step, in patches, between two consecutive windows
    :param threshold: minimum normalised score for a patch to be part of a detection
    :return: list of (x, y, width, height) tuples, one per prompt that produced a detection;
             prompts without a detection add nothing, so the list can be shorter than prompts
    """
    # NOTE(review): in this notebook the frames come from OpenCV, which yields BGR images;
    # CLIP presumably expects RGB - confirm whether a conversion is needed before scoring.
    # build image patches for detection (done once, shared by every prompt)
    frame_patches = get_frame_patches(frame, patch_size)
    bounding_box_list = []
    # process image through object detection steps
    for prompt in tqdm(prompts):
        scores = run_inference(model, processor, device, prompt, frame_patches, patch_size, window, stride)
        # skip the prompt when no patch passes the threshold: there is no box to compute
        if not bool((scores > threshold).any()):
            continue
        x, y, width, height = get_box(scores, patch_size, threshold)
        if width > 0 and height > 0:
            bounding_box_list.append((x, y, width, height))
    return bounding_box_list

## Online Object Detection

In [28]:
def show_video_and_detect(input_file_path, prompts):
    """
    Function to show the video frame by frame in the notebook output (through cv2_imshow).
    When the space key is reported by cv2.waitKey the detection algorithm is run on the
    current frame with the specified prompts; 'q' stops the playback.
    @param: input_file_path path of the video to be shown
    @param: prompts the texts describing the objects to detect
    """
    # Show the video
    capture = cv2.VideoCapture(input_file_path)
    try:
        frame_width = capture.get(cv2.CAP_PROP_FRAME_WIDTH)
        frame_height = capture.get(cv2.CAP_PROP_FRAME_HEIGHT)
        fps = np.ceil(capture.get(cv2.CAP_PROP_FPS))
        print(f"fps:{fps:.2f}, frame width: {frame_width}, frame height: {frame_height}")

        model, processor, device = load_model()

        while capture.isOpened():
            ret, frame = capture.read()
            # no more frames (or the read failed): stop the loop
            if not ret:
                break

            clear_output(wait=True)
            cv2_imshow(frame)
            # Press Q on keyboard to exit
            key = cv2.waitKey(25)
            if key & 0xFF == ord('q'):
                break
            elif key == 32:  # 32 is the key code of the space bar
                t0 = time.time()
                boxes = detect(model, processor, prompts=prompts, device=device, frame=frame)
                t1 = time.time()
                print(f"Time for detection = {t1-t0}")
                # surface the result, which would otherwise be lost
                print(f"Detected boxes (x, y, width, height) = {boxes}")
                # wait for any key before resuming the playback
                cv2.waitKey()
    finally:
        # When everything is done, or when something failed, release the video capture
        # object and close all the frames so that the resources are never leaked.
        capture.release()
        cv2.destroyAllWindows()

In [None]:
# Play the stored clip and look for a single prompt whenever the playback is paused.
# NOTE(review): the absolute /content path looks Colab-specific - adjust it when running elsewhere.
show_video_and_detect("/content/The Devil Wears Prada (45) Movie CLIP - Andy Gets a Makeover (2006) HD.mp4", prompts=["black t-shirt"])

##  Offline object detection


In [1]:
!pip install pafy

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pafy
  Downloading pafy-0.5.5-py2.py3-none-any.whl (35 kB)
Installing collected packages: pafy
Successfully installed pafy-0.5.5


In [2]:
!pip install youtube_dl==2020.12.7

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting youtube_dl==2020.12.7
  Downloading youtube_dl-2020.12.7-py2.py3-none-any.whl (1.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m40.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: youtube_dl
Successfully installed youtube_dl-2020.12.7


In [47]:
import pafy
import random
import youtube_dl
import cv2 as cv
from tqdm import tqdm

In [78]:
# Get the video metadata (nothing is downloaded) and list the 360p mp4 formats of known size
ydl_opts = {}
ydl = youtube_dl.YoutubeDL(ydl_opts)
info_dict = ydl.extract_info("https://www.youtube.com/watch?v=HSPYgwP9R84", download=False)
# 'formats' may be missing from the metadata: fall back to an empty list instead of iterating None
formats = info_dict.get('formats') or []
for f in formats:
  if f.get('format_note') == '360p' and f.get('ext') == 'mp4' and f.get('filesize') is not None:
    print(f)

[youtube] HSPYgwP9R84: Downloading webpage
{'format_id': '396', 'url': 'https://rr2---sn-5hne6nzk.googlevideo.com/videoplayback?expire=1679949538&ei=gqohZJvnCpDM1gLDzKqoCA&ip=35.234.175.150&id=o-ANCpNXcBtqoaF_3scD28GWFgLXuX75uWBogwkk31ZbZO&itag=396&aitags=133%2C134%2C135%2C136%2C137%2C160%2C242%2C243%2C244%2C247%2C248%2C278%2C394%2C395%2C396%2C397%2C398%2C399&source=youtube&requiressl=yes&mh=6q&mm=31%2C26&mn=sn-5hne6nzk%2Csn-5goeenes&ms=au%2Conr&mv=m&mvi=2&pl=20&vprv=1&mime=video%2Fmp4&ns=MzaF3RbycjJCPicdbmsayDUM&gir=yes&clen=3911216&dur=159.534&lmt=1617694467598556&mt=1679927595&fvip=1&keepalive=yes&fexp=24007246&c=WEB&txp=5531432&n=VRY32ZrMH7RDwHineF&sparams=expire%2Cei%2Cip%2Cid%2Caitags%2Csource%2Crequiressl%2Cvprv%2Cmime%2Cns%2Cgir%2Cclen%2Cdur%2Clmt&sig=AOq0QJ8wRQIhALL_lr8tfMZU8uuoSBJ-lHvpA1IaIIoL-bToPawfiPqWAiAzPS--n5ew4EcA0JE5zlDtjglKBcvy8dznhhf1MMbOlw%3D%3D&lsparams=mh%2Cmm%2Cmn%2Cms%2Cmv%2Cmvi%2Cpl&lsig=AG3C_xAwRAIgar02HsWVxO92XCqhVlyXD6KDJZqqXUOuH4XDMY2hmcwCIBrJ1C13bAEn1xEx5

In [95]:
def detect_objects(video_url, prompts, frame_per_second=5, patch_size=64, window=3, stride=1, threshold=0.5):
  """
  Function to detect the specified objects in frames sampled from the video
  @param: video_url is the link to the video to analyse
  @param: prompts is a list of texts describing the objects to detect
  @param: frame_per_second is the number of frames to analyse in a second
  @param: patch_size, window, stride and threshold are forwarded to detect()
  @return: three lists aligned by index: the sampled frames, a file name for each frame
           (the frames are not written to disk) and the bounding boxes found in each frame
  """
  # Get the video metadata (nothing is downloaded)
  ydl = youtube_dl.YoutubeDL({})
  info_dict = ydl.extract_info(video_url, download=False)

  # Instantiate the lists
  images_list = []
  filenames_list = []
  bounding_box_list = []

  model, processor, device = load_model()

  # 'formats' may be missing from the metadata: iterate over nothing in that case
  formats = info_dict.get('formats') or []
  print("Obtaining frames")
  for f in formats:
    # Only the 360p webm formats of known size are analysed
    if not (f.get('format_note') == '360p' and f.get('ext') == 'webm' and f.get('filesize') is not None):
      continue
    # Get the url
    url = f.get('url')

    # Define how many frames to skip between each analysis.
    # The step is at least 1: a step of 0 (requested rate above the source rate) would
    # seek back to the same frame forever. An unknown source rate also gives a step of 1.
    source_fps = f.get('fps') or frame_per_second
    skip_frames = max(1, int(source_fps / frame_per_second))

    cap = cv.VideoCapture(url)
    current_frame = 0
    t0 = time.time()
    try:
      # Till the end of the video
      while True:
        print(f'Computing frame number {len(images_list)}')
        # Get the frame
        ret, frame = cap.read()
        if not ret:
          break
        # Define a name for the frame
        filename = r"video_shot" + str(len(images_list)) + ".png"
        # Append the frame and the filename in the lists
        images_list.append(frame)
        filenames_list.append(filename)
        bounding_boxes = detect(model,
                                processor,
                                prompts=prompts,
                                device=device,
                                frame=frame,
                                patch_size=patch_size,
                                window=window,
                                stride=stride,
                                threshold=threshold)
        bounding_box_list.append(bounding_boxes)

        # Skip some frames by seeking to the next frame to analyse
        current_frame += skip_frames
        cap.set(cv.CAP_PROP_POS_FRAMES, current_frame)
        if cv.waitKey(30) & 0xFF == ord('q'):
          break
    finally:
      # Always give the stream back, even when the detection fails
      cap.release()
    elapsed = time.time() - t0
    print("Saved {} images with format {} and resolution {} in {:.4} seconds ({:.4} minutes)".format(len(images_list), f.get('ext', None), f.get('format_note',None), elapsed, elapsed / 60))
    # If a valid format has been found and analysed stop here, otherwise another
    # matching format would append the same frames a second time
    if len(images_list) > 0:
      break
  # Return the lists
  return images_list, filenames_list, bounding_box_list

In [123]:
def show_frames(images, row_dim=4, num_of_images=None):
  """
  Function to show the frames in the list on a grid of subplots
  @params: images is the list of images
  @params: row_dim is the number of images to show in a row
  @param: num_of_images indicates how many images to show; None, a non-positive value or a
          value larger than the list means all of them
  """
  if num_of_images is None or num_of_images <= 0 or num_of_images > len(images):
    num_of_images = len(images)
  if num_of_images == 0:
    # nothing to draw (and images[0] below would fail)
    return

  # Just enough rows to hold every image (ceiling division, no extra empty row)
  n_rows = (num_of_images + row_dim - 1) // row_dim
  # squeeze=False keeps axs 2-dimensional even with a single row or column, so the
  # [row, column] indexing below always works.
  # NOTE(review): the figure height is derived from the width of the first image and not
  # from the number of rows - confirm that this is the intended sizing.
  fig, axs = plt.subplots(n_rows, row_dim, figsize=(20, len(images[0][0]) // row_dim), squeeze=False)

  # NOTE(review): frames read with OpenCV are BGR while imshow presumably expects RGB -
  # confirm whether the colours need to be swapped before displaying.
  for i in tqdm(range(num_of_images)):
    ax = axs[i // row_dim, i % row_dim]
    ax.imshow(images[i])
    ax.set_title('Frame: {}'.format(i))

  # Hide the unused cells at the end of the last row
  for j in range(num_of_images, n_rows * row_dim):
    axs[j // row_dim, j % row_dim].axis('off')

  plt.show()

In [125]:
def show_frames_with_bounding_box(images_list, bounding_boxes, row_dim=6, num_of_images=None):
  """
  Function to show the frames in the list with the found bounding boxes drawn on them.
  The boxes are drawn on copies: the frames in images_list are left untouched.
  @params: images_list is the list of images
  @params: bounding_boxes is a list of bounding box associated with the images.
            For each image there is a list of tuples: (x, y, width, height)
  @params: row_dim is the number of images to show in a row
  @param: num_of_images indicates how many images to show; None, a non-positive value or a
          value larger than the list means all of them
  """
  if num_of_images is None or num_of_images <= 0 or num_of_images > len(images_list):
    num_of_images = len(images_list)
  if num_of_images == 0:
    # nothing to draw (and images_list[0] below would fail)
    return

  # Just enough rows to hold every image (ceiling division, no extra empty row)
  n_rows = (num_of_images + row_dim - 1) // row_dim
  # squeeze=False keeps axs 2-dimensional even with a single row or column
  fig, axs = plt.subplots(n_rows, row_dim, figsize=(20, len(images_list[0][0]) // row_dim), squeeze=False)

  for i in tqdm(range(num_of_images)):
    frame = images_list[i]
    # a frame without an entry in bounding_boxes is shown without boxes
    boxes = bounding_boxes[i] if i < len(bounding_boxes) else []
    # If there is at least one box
    if len(boxes) > 0:
      # cv2.rectangle draws in place: work on a copy of the pixels, because copying
      # only the list would still modify the caller's frames
      frame = frame.copy()
      for bb in boxes:
        cv2.rectangle(frame,
                      (bb[0], bb[1]),                  # top-left corner (x, y)
                      (bb[0]+bb[2], bb[1]+bb[3]),      # opposite corner (x + width, y + height)
                      [random.randint(128, 255), random.randint(128, 255), random.randint(128, 255)]) # random color
    ax = axs[i // row_dim, i % row_dim]
    ax.imshow(frame)
    ax.set_title('Frame: {}'.format(i))

  # Hide the unused cells at the end of the last row
  for j in range(num_of_images, n_rows * row_dim):
    axs[j // row_dim, j % row_dim].axis('off')

  plt.show()

In [None]:
# Sample one frame per second of the video and look for the two prompts in each of them,
# scoring 4x4 windows of 64-pixel patches. The three returned lists are aligned by frame.
images_list, image_names_list, bounding_box_list = detect_objects(
    video_url="https://www.youtube.com/watch?v=HSPYgwP9R84",
    prompts=["a small lamp made of white glass", "black single-breasted jacket"], 
    frame_per_second=1, 
    patch_size=64, 
    window=4, 
    stride=1, 
    threshold=0.5)

In [128]:
# Sanity check: how many frames were collected by detect_objects
print(f'The number of frames is {len(images_list)}')

The number of frames is 160


In [None]:
# Show every collected frame, four per row
show_frames(images=images_list, row_dim=4, num_of_images=None)

In [None]:
# Show every collected frame, four per row, with the bounding boxes found for it
show_frames_with_bounding_box(images_list=images_list, bounding_boxes=bounding_box_list, row_dim=4, num_of_images=None)