<a href="https://colab.research.google.com/github/JDonohoe101/MSc-Research-Project-Scripts/blob/main/HateMM_For_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Commands to run in the Google Colab terminal before executing the notebook:
*   pip install openai-whisper
*   pip install torch




In [16]:
# -- Imports --
import whisper
import cv2
import os
import time
import torch
import gc
import shutil
import json
import pandas as pd
import numpy as np
from transformers import BlipProcessor, BlipForConditionalGeneration, logging
from PIL import Image
from moviepy.editor import VideoFileClip, AudioFileClip

In [23]:
# -- Global constants and variables --
# Root folder of the HateMM dataset copy on Google Drive.
# NOTE(review): assumes Drive is already mounted at /content/drive - no mount cell is visible here.
DATASET_ROOT = "/content/drive/MyDrive/MSC RESEARCH PROJECT for COLAB/Datasets/HateMM - Data/7799469"
VIDEO_FOLDER_PATH = f"{DATASET_ROOT}/hate_videos"  # folder holding the videos to process
ANNOTATION_FILE_PATH = f"{DATASET_ROOT}/HateMM_annotation.csv"  # CSV with the dataset annotations

In [3]:
# -- Image caption model initialisation --
logging.set_verbosity_error()  # only surface errors from transformers (hides info & warnings)

# Single source of truth for the checkpoint shared by the processor and the model.
BLIP_CHECKPOINT = "Salesforce/blip-image-captioning-base"

# Use the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# `processor`, `model` and `device` are module-level globals read by the captioning functions.
processor = BlipProcessor.from_pretrained(BLIP_CHECKPOINT)
model = BlipForConditionalGeneration.from_pretrained(BLIP_CHECKPOINT).to(device)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.



preprocessor_config.json:   0%|          | 0.00/287 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/506 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/711k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/4.56k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/990M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/990M [00:00<?, ?B/s]

# **Utility Functions**
*These functions convert each video into text across several modalities: the audio transcript, per-frame image captions, and the dataset annotation label.*


In [27]:
def convert_video_to_audio(video_name: str) -> AudioFileClip | None:
    """
    Fetches the audio track of a video so it can later be written to disk
    and transcribed. Nothing is written or re-encoded here.

    Params: video_name (str)
    Name of the video file; it is loaded through get_video().

    Returns: The `audio` attribute of the loaded video clip. Callers treat a
    value of None as "this video has no audio track".
    """
    #TODO folderise each video such that they will each contain all of its processed info
    return get_video(video_name).audio

def get_video(video_name: str) -> VideoFileClip | None:
    """
    Opens a video from the dataset folder as a moviepy clip so it can be
    processed programmatically.

    Params: video_name (str)
    Name of the video file, relative to VIDEO_FOLDER_PATH.

    Returns: The VideoFileClip built from the joined path. No existence check
    is made here, so whatever VideoFileClip does with a bad path reaches the
    caller (TODO confirm: presumably it raises rather than returning None).
    """
    return VideoFileClip(os.path.join(VIDEO_FOLDER_PATH, video_name))

def get_all_annotations() -> dict:
    """
    Retrieves the csv file of all annotations and converts
    them in to a list.

    Returns: Dictionary of all annotations.
    """
    data = pd.read_csv(ANNOTATION_FILE_PATH)
    annotations_dict = data.to_dict(orient='records')
    return annotations_dict

def get_annotation(video_name: str, annotations: dict) -> str|None:
    """
    Queries the annotation of the specified video and
    returns it as string. May consider returning ALL
    labels instead of just the hate/non-hate label.

    Params:
    video_name (str) - name of the video we are querying.
    annotations (dict) - all annotations of the dataset.

    Returns: Annotation as string or None if record doesn't exist.
    """
    record = next((row for row in annotations if row['video_file_name'] == video_name), None)
    if record:
        return str(record.get('label'))
    return None

def extract_video_frames(video_name: str, sample_rate: int = 90) -> None:
    """
    Samples frames from the given video and saves them as JPEG images in a
    "<video name without extension>_frames" folder inside VIDEO_FOLDER_PATH.

    Params:
    video_name - Name of the video we are processing.
    sample_rate - Save one frame every `sample_rate` frames (frame 0 is
    always saved). Defaults to 90, the previously hard-coded value.

    Raises:
    ValueError - if sample_rate is not a positive integer.
    OSError - if the video could not be opened. The previous implementation
    called exit() here, which in a notebook asks the kernel to exit instead
    of reporting a normal, catchable error.
    """
    if sample_rate < 1:
        raise ValueError(f"sample_rate must be a positive integer, got {sample_rate}")

    path_to_video = os.path.join(VIDEO_FOLDER_PATH, video_name)
    base_name = os.path.splitext(video_name)[0]  # removes extension
    frame_folder = os.path.join(VIDEO_FOLDER_PATH, f"{base_name}_frames")

    cap = cv2.VideoCapture(path_to_video)
    try:
        # Check the video opened before creating any output folder
        if not cap.isOpened():
            raise OSError(f"Error: Could not open video: {path_to_video}")

        os.makedirs(frame_folder, exist_ok=True)

        frame_count = 0  # index of the frame currently being read
        while True:
            success, frame = cap.read()

            # A failed read means the video has ended
            if not success:
                break

            if frame_count % sample_rate == 0:
                frame_filename = os.path.join(frame_folder, f"{base_name}_frame_{frame_count:04d}.jpg")
                cv2.imwrite(frame_filename, frame)

            frame_count += 1
    finally:
        # Always release the capture object, including on errors
        cap.release()

def get_audio_transcript(video_name: str) -> str|list:
    """
    Writes the video's audio track to a temporary mp3 file, transcribes it
    with the Whisper "small" model and returns the transcript text.

    Params: video_name
    Name of the video file.

    Returns: The "text" field of the Whisper transcription result. If the
    video has no audio track, returns the message
    "None: No audio was found in <video_name>" instead.

    The temporary mp3, the audio clip and the Whisper model are released in a
    `finally` block, so a failure while writing or transcribing does not leave
    the audio file on disk or the model in memory.
    """
    # Retrieve the audio track; None means the video has no audio
    audio = convert_video_to_audio(video_name)
    if audio is None:
      return f"None: No audio was found in {video_name}"

    audio_path = os.path.join(VIDEO_FOLDER_PATH, os.path.splitext(video_name)[0])+"_audio.mp3"
    whisper_model = None  # named distinctly from the module-level BLIP `model`
    try:
        audio.write_audiofile(audio_path, logger=None)

        # Load the model and transcribe the audio file that was just written
        whisper_model = whisper.load_model("small")
        result = whisper_model.transcribe(audio_path)
        return result["text"]
    finally:
        # Free the Whisper model and cached CUDA memory after every call,
        # as the original cleanup did
        del whisper_model
        gc.collect()
        torch.cuda.empty_cache()
        if os.path.exists(audio_path):
            os.remove(audio_path)
        # Close the clip's reader explicitly instead of relying on garbage collection
        audio.close()

#TODO put this in the other py file

def generate_caption(processor: BlipProcessor, model: BlipForConditionalGeneration, image_path: str) -> str:
    """
    Uses the BLIP model to generate a textual caption for a video frame.

    Params:
    processor - Image-to-tensor converter.
    model - Pre-trained image captioning model.
    image_path - File path to the selected image.

    Returns:
    The generated caption for the video frame.
    """
    # The context manager closes the image file; convert() yields an
    # in-memory RGB image that stays usable afterwards.
    with Image.open(image_path) as raw_image:
        image = raw_image.convert("RGB")

    # Send the inputs to the device of the model that was passed in, rather
    # than relying on a separate module-level `device` variable.
    inputs = processor(images=image, return_tensors="pt").to(model.device)
    output = model.generate(**inputs)
    return processor.decode(output[0], skip_special_tokens=True)

def output_captions(video_name: str) -> dict:
    """
    Captions every extracted frame of a video, then deletes the frame folder.

    Params:
    video_name - ID of the video being processed.

    Returns:
    Dictionary mapping each frame's file name to its generated caption, in
    frame order. Uses the module-level `processor` and `model`.

    The frame folder is removed even if captioning fails, so an interrupted
    run does not leave stale frames behind.
    """
    captions = {}
    frame_folder = os.path.join(VIDEO_FOLDER_PATH, f"{os.path.splitext(video_name)[0]}_frames")

    try:
        # scandir yields entries in arbitrary order. All frame names share a
        # prefix, so sorting by (length, name) keeps them in numeric order
        # even past the 4-digit zero padding.
        with os.scandir(frame_folder) as entries:
            frames = sorted(
                (entry for entry in entries if entry.is_file()),
                key=lambda entry: (len(entry.name), entry.name),
            )

        for frame in frames:
            caption = generate_caption(processor, model, frame.path)
            print(f"{frame.name} CAPTION: {caption}")
            captions[frame.name] = caption
    finally:
        if os.path.isdir(frame_folder):
            shutil.rmtree(frame_folder)
    return captions

#POSSIBLY REDUNDANT FUNCTION
# def delete_audio(audio_name: str) -> None:
#     """
#     Simple function that removes unnecessary audio files post processing.

#     Params:
#     audio_name - Name of the audio file we wish to delete from the system.
#     """
#     path = os.path.join(VIDEO_FOLDER_PATH, audio_name)
#     os.remove(path)

def export_to_json(total_video_data):
    """
    Writes one JSON file per processed video.

    Params:
    total_video_data - Iterable of per-video dictionaries. Each one must
    contain a "video_name" key, which determines the output file name.

    Files go to an "extracted_video_data" folder inside VIDEO_FOLDER_PATH
    (created if absent) and are named
    "<video name without extension>_extracted_video_data.json". An existing
    file with the same name is overwritten.
    """
    export_dir = os.path.join(VIDEO_FOLDER_PATH, "extracted_video_data")
    os.makedirs(export_dir, exist_ok=True)

    for record in total_video_data:
        stem, _ = os.path.splitext(record["video_name"])
        target = os.path.join(export_dir, f"{stem}_extracted_video_data.json")
        with open(target, "w") as handle:
            json.dump(record, handle, indent=4)




# **Main Loop**
This cell is the high-level entry point of the data pipeline: it runs every extraction step for each video and exports the results to JSON.

In [28]:
def process_videos() -> None:
    """
    This is the main function which extracts multimodal semantic features from
    videos in to prompts that can be fed in to both open and closed
    source LLMs.

    For every video file in VIDEO_FOLDER_PATH it extracts frames, transcribes
    the audio, captions the frames and looks up the annotation. Each video's
    result is exported to JSON as soon as it is ready, so an interruption part
    way through a long run keeps the videos already processed.
    """
    VALID_EXTENSIONS = ('.mp4', '.mov', '.avi', '.mkv') #checking formats avoids trying to process hidden files

    all_annotations_dict = get_all_annotations()

    # Snapshot the listing first: the steps below create files and folders
    # inside VIDEO_FOLDER_PATH, and a directory should not change while it is
    # being iterated. Sorting makes the processing order deterministic.
    with os.scandir(VIDEO_FOLDER_PATH) as entries:
        video_names = sorted(
            entry.name
            for entry in entries
            if entry.is_file() and os.path.splitext(entry.name)[1].lower() in VALID_EXTENSIONS
        )

    # -- Main loop --
    for video_name in video_names:
        # -- Calls to the extraction logic --
        extract_video_frames(video_name)
        video_audio_transcript = get_audio_transcript(video_name)
        print("\nVIDEO: ", video_name)
        print("AUDIO TRANSCRIPT: ",video_audio_transcript,'\n')#debug statement, remove later
        video_captions = output_captions(video_name)
        video_annotation = get_annotation(video_name, all_annotations_dict)
        print("ANNOTATION: ", video_annotation,'\n')

        extracted_video_data = { #Dictionary used to hold all extracted data in a video
            "video_name" : video_name,
            "video_audio_transcript" : video_audio_transcript,
            "video_captions" : video_captions,
            "video_annotation" : video_annotation
        }

        # Export immediately: export_to_json writes one file per video, so the
        # final files match a single export at the end.
        export_to_json([extracted_video_data])
        torch.cuda.empty_cache()

In [29]:
# -- Initial logic --
if __name__ == "__main__":
    # Entry point: runs the full extraction pipeline via process_videos().
    process_videos()

Exception ignored in: <function FFMPEG_AudioReader.__del__ at 0x7874f6105120>
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/moviepy/audio/io/readers.py", line 252, in __del__
    def __del__(self):

KeyboardInterrupt: 



VIDEO:  hate_video_4.mp4
AUDIO TRANSCRIPT:   Fuck jigs. We were these kings. We were the real Jews and the real Egyptians man. Bang bang. 

hate_video_4_frame_0000.jpg CAPTION: a man in a colorful outfit holding a stick
hate_video_4_frame_0090.jpg CAPTION: a man in a colorful outfit holding a stick
hate_video_4_frame_0180.jpg CAPTION: a man in a colorful outfit holding a stick
hate_video_4_frame_0270.jpg CAPTION: a man in a colorful outfit holding a stick
hate_video_4_frame_0360.jpg CAPTION: a man in a colorful outfit holding a stick
ANNOTATION:  Hate 



KeyboardInterrupt: 