In [1]:
import torch
import numpy as np
import argparse
import pickle
import cv2
import os
from collections import Counter
from sklearn.linear_model import LinearRegression
from PIL import Image
# from clip_interrogator import Config, Interrogator
from matplotlib import pyplot as plt
import pdb
import subprocess
import torch.nn.functional as F
import torch
from transformers import AutoProcessor, AutoModelForImageTextToText
from transformers import SegformerImageProcessor, SegformerForSemanticSegmentation
from transformers import AutoProcessor, Blip2ForConditionalGeneration

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Load the benchmark annotations from the pickle shipped with the dataset.
# The context manager guarantees the handle is closed even if unpickling fails.
# NOTE(review): pickle.load can execute arbitrary code; only use a trusted file.
with open("./annotations_public.pkl", 'rb') as annotation_file:
    annotations = pickle.load(annotation_file)

In [None]:
# Runtime configuration: compute device and input/output locations.
# torch.device() only accepts lowercase device types, so the fallback must be
# "cpu" ("CPU" raises RuntimeError on machines without CUDA).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Folder the input .mp4 files are read from, and folder the annotated videos are written to.
video_root = './COOOL_Benchmark/processed_videos/'
output_dir = "./COOOL_Benchmark/processed_videos_midas/"
os.makedirs(output_dir, exist_ok=True)

# Running count of videos visited by the processing loop.
video_num = 0

# Pixels of context added on every side of a box when the padded box fits in the frame.
_HAZARD_CROP_MARGIN = 20

# Keywords matched as substrings against the lowercased class word returned by BLIP-2.
# All entries must be lowercase, otherwise they can never match.
_HUMAN_KEYWORDS = ("human", "person", "man", "woman", "men", "women", "kid")
_ANIMAL_KEYWORDS = ("animal", "dog", "cat", "snake", "bird", "kangaroo", "moose", "deer",
                    "rabbit", "lizard", "cow", "horse", "goose", "duck", "mouse")
_ROAD_OBJECT_KEYWORDS = ("road", "alien")


def _ask_blip(image, prompt, max_new_tokens):
    """Run one prompt through the module-level BLIP-2 model and return the stripped decoded text."""
    inputs = processor_hazard(image, text=prompt, return_tensors="pt").to(device, torch.float16)
    generated_ids = model_hazard.generate(**inputs, max_new_tokens=max_new_tokens)
    return processor_hazard.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()


def _answered_yes(image, question):
    """Return True when the decoded answer to `question` contains "yes" (case-insensitive)."""
    return "yes" in _ask_blip(image, question, 100).lower()


def _describe(image, label):
    """Return the (color, characteristic) completions generated for `label`."""
    appearance_color = _ask_blip(image, f" The color of the {label} ", 20)
    appearance_caption = _ask_blip(image, f" The characteristic of the {label} ", 20)
    return appearance_color, appearance_caption


def detect_hazard(object_cor, frame):
    """Classify one annotated object with BLIP-2 and decide whether it is a hazard.

    The object is cropped from `frame`, BLIP-2 is asked which coarse class it
    belongs to, and a class-specific yes/no question decides the hazard flag.
    Descriptive prompts are only generated once the yes/no answer is positive,
    because their text is used nowhere else.

    Args:
        object_cor: (x1, y1, x2, y2) box in pixel coordinates; negative values
            are clamped to 0.
        frame: image array indexed as [row, column, ...] and passed to
            cv2.cvtColor with COLOR_BGR2RGB (i.e. treated as BGR).

    Returns:
        (is_it_hazard, caption). The caption is "" whenever the object is not
        flagged; both values stay (False, "") for empty crops, empty model
        output and cars.

    Relies on the module-level `processor_hazard`, `model_hazard` and `device`.
    """
    is_it_hazard = False
    caption = ""

    # Negative coordinates would be read as "count from the end" by slicing.
    x1, y1, x2, y2 = (max(value, 0) for value in object_cor)
    frame_height, frame_width = frame.shape[:2]

    # Pad the box only when the padded box lies fully inside the frame;
    # otherwise fall back to the tight box.
    margin = _HAZARD_CROP_MARGIN
    if (y1 - margin >= 0 and y2 + margin <= frame_height and
            x1 - margin >= 0 and x2 + margin <= frame_width):
        cropped_object = frame[y1 - margin:y2 + margin, x1 - margin:x2 + margin]
    else:
        cropped_object = frame[y1:y2, x1:x2]

    # A degenerate or off-screen box gives an empty crop, which cv2.cvtColor rejects.
    if cropped_object.size == 0:
        return is_it_hazard, caption

    prompt0 = "Question: Is this an animal or a car or a human or a flying-object or a floating-object or an alien? Answer:"
    cropped_image = Image.fromarray(cv2.cvtColor(cropped_object, cv2.COLOR_BGR2RGB))  # Convert to PIL Image

    # Only the last word of the answer is used as the class label.
    answer_words = _ask_blip(cropped_image, prompt0, 10).split()
    if not answer_words:
        return is_it_hazard, caption
    generated_text_general = answer_words[-1]
    label_lower = generated_text_general.lower()

    contains_human = any(word in label_lower for word in _HUMAN_KEYWORDS)
    contains_animal = any(word in label_lower for word in _ANIMAL_KEYWORDS)
    contains_flyingobject = "flying-object" in label_lower
    contains_object = any(word in label_lower for word in _ROAD_OBJECT_KEYWORDS)

    # Cars are never flagged. The previous code also asked BLIP-2 about the car's
    # lane, but the only effect of the answer was `is_it_hazard = False`, which is
    # already the value, so that inference call was dropped.
    # NOTE(review): if wrong-way / opposing cars were meant to be hazards, that
    # query has to be reinstated with `is_it_hazard = True`.

    # The checks below are independent on purpose: a label may match several
    # keyword groups, in which case the last positive branch provides the caption.
    if contains_human and _answered_yes(cropped_image, "Question: Is this person crossing the street? Answer:"):
        appearance_caption = _ask_blip(cropped_image, " This person is wearing a", 20)
        caption = str(generated_text_general) + " The person is going to cross the road " + appearance_caption
        is_it_hazard = True

    if contains_animal and _answered_yes(cropped_image, "Question: Is this animal crossing the street? Answer:"):
        appearance_color, appearance_caption = _describe(cropped_image, generated_text_general)
        caption = "It is a "+ str(generated_text_general) + f". The {generated_text_general} is going to cross the road {appearance_color}. {appearance_caption}."
        is_it_hazard = True

    if contains_flyingobject and _answered_yes(cropped_image, "Question: Is this object thrown into the air? Answer:"):
        appearance_color, appearance_caption = _describe(cropped_image, generated_text_general)
        caption = "It is a "+ str(generated_text_general) + f". The {generated_text_general} is thrown to air {appearance_color}. {appearance_caption}."
        is_it_hazard = True

    if contains_object and _answered_yes(cropped_image, "Question: Is this object on the road? Answer:"):
        appearance_color, appearance_caption = _describe(cropped_image, generated_text_general)
        caption = "It is an object on the "+ str(generated_text_general) + f". The object is on the road {appearance_color}. {appearance_caption}."
        is_it_hazard = True

    return is_it_hazard, caption
    

def analyze_hazard_results(hazard_results):
    """Summarise the captions collected for every tracked object.

    Captions are grouped by their first four whitespace-separated words
    (joined with single spaces). For each object the most frequent group is
    selected and the longest caption inside that group is kept.

    Args:
        hazard_results: mapping of object id -> list of caption strings.

    Returns:
        Mapping of object id -> dict with keys "most_repetitive_phrase",
        "count" and "longest_caption". Objects with an empty caption list are
        left out instead of raising.
    """
    object_summary = {}

    for obj_id, captions in hazard_results.items():
        if not captions:
            # Nothing to summarise; most_common(1)[0] would raise IndexError.
            continue

        # Normalised four-word prefix of every caption, in caption order.
        prefixes = [" ".join(caption.split()[:4]) for caption in captions]

        # Ties are resolved in favour of the prefix seen first.
        most_repetitive_phrase, count = Counter(prefixes).most_common(1)[0]

        # Select by the normalised prefix rather than a raw substring test:
        # the substring test misses captions with irregular whitespace and
        # could leave nothing for max() to choose from.
        matching_captions = [
            caption for caption, prefix in zip(captions, prefixes)
            if prefix == most_repetitive_phrase
        ]
        longest_caption = max(matching_captions, key=len)

        # Save results for this object
        object_summary[obj_id] = {
            "most_repetitive_phrase": most_repetitive_phrase,
            "count": count,
            "longest_caption": longest_caption
        }

    return object_summary


# Create the results CSV. Only the header row is written by this cell, so the
# file is closed again before the long-running video loop starts.
with open("results_blip_lp2.csv", 'w') as results_file:
    results_file.write("ID,Driver_State_Changed")
    for i in range(23):
        results_file.write(f",Hazard_Track_{i},Hazard_Name_{i}")
    results_file.write("\n")

# BLIP-2 processor/model used by detect_hazard (read there as module-level names).
processor_hazard = AutoProcessor.from_pretrained("Salesforce/blip2-opt-2.7b")
model_hazard = Blip2ForConditionalGeneration.from_pretrained("Salesforce/blip2-opt-2.7b", torch_dtype=torch.float16).to(device)

# The per-video summaries are pickled into this folder; create it up front.
os.makedirs("hazard_results", exist_ok=True)

# Set to N to skip the first N videos (e.g. to resume an interrupted run).
skip_first_n_videos = 0

for video in sorted(annotations.keys()):
    print("video:", video)
    video_num += 1
    if video_num <= skip_first_n_videos:
        continue

    # Track ids selected by the earlier MiDaS-based step. The file does not
    # change while the video is processed, so it is read once per video.
    # NOTE(review): pickle.load can execute arbitrary code; only use trusted files.
    file_path = f"./unique_ids/unique_ids_{video}.pkl"
    with open(file_path, "rb") as f:
        unique_ids = pickle.load(f)

    hazard_results = {}  # track_id -> captions produced for that track

    video_stream = cv2.VideoCapture(os.path.join(video_root, video+'.mp4'))
    fps = int(video_stream.get(cv2.CAP_PROP_FPS))
    width = int(video_stream.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(video_stream.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for output video
    output_video_path = os.path.join(output_dir, f"{video}_midas_hazard_v8.mp4")
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    frame = 0
    try:
        while video_stream.isOpened():
            ret, frame_image = video_stream.read()
            if not ret:
                break

            # Draw on a copy so overlays of one object never end up inside the
            # crop that is sent to BLIP-2 for another object of the same frame.
            annotated_image = frame_image.copy()

            # Gather BBoxes from annotations
            for ann_type in ['challenge_object']:
                for detection in annotations[video][frame][ann_type]:
                    track_id = detection['track_id']
                    if track_id not in unique_ids:
                        continue
                    x1, y1, x2, y2 = (int(value) for value in detection['bbox'])

                    # Produce a caption for the object; cars are filtered out by detect_hazard.
                    is_hazard, caption = detect_hazard((x1, y1, x2, y2), frame_image)
                    if len(caption) > 1:
                        hazard_results.setdefault(track_id, []).append(caption)

                    # Draw bounding box, object ID and caption on the output frame
                    if is_hazard:
                        cv2.rectangle(annotated_image, (x1, y1), (x2, y2), (0, 255, 0), 2)
                        cv2.putText(annotated_image, f"ID: {track_id}", (x1, y1 - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
                        cv2.putText(annotated_image, str(is_hazard)[0], (x2, y2 + 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)
                        cv2.putText(annotated_image, caption, (x2, y2 + 20),
                                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)

            out.write(annotated_image)
            frame += 1
    finally:
        # Release the capture and the writer even if a frame fails to process.
        video_stream.release()
        out.release()

    # Condense the per-frame captions into one summary per track and store it.
    object_summary = analyze_hazard_results(hazard_results)
    output_file_path = f"hazard_results/hazard_results_{video}_v8.pkl"
    with open(output_file_path, "wb") as pkl_file:
        pickle.dump(object_summary, pkl_file)


In [5]:
# A previous step saved, for every object id, the frame in which the object has
# its brightest color (meaning it is closest to the dashcam in that frame).
# Captions for each object are produced from those frames.
file_path = "./unique_ids/unique_ids_video_0001.pkl"
# Read the pickled mapping back and show it as the cell output.
with open(file_path, "rb") as pkl_handle:
    unique_ids = pickle.load(pkl_handle)
unique_ids

{'1001': 354,
 '431': 324,
 '408': 108,
 '3': 361,
 '6': 0,
 '5': 6,
 '73': 265,
 '79': 89}