In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer

# Hub repository id shared by the model weights and the tokenizer, so the two
# can never drift apart.
MODEL_ID = "mlfoundations-dev/oh-dcft-v3.1-gemini-1.5-pro-qwen"

# Causal LM used later to turn the 5x5 depth patch grid into a clock-hour
# direction. Device placement and dtype are left to the library ("auto").
# NOTE(review): trust_remote_code=True permits running custom code shipped in
# the model repository; confirm the repository is trusted.
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    device_map="auto",
    torch_dtype="auto",
    trust_remote_code=True,
)

# Matching tokenizer from the same checkpoint.
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)

In [None]:
from transformers import pipeline

# Generation settings for the direction query:
#   * return_full_text=False -> only the newly generated text is returned,
#   * max_new_tokens=5       -> the expected answer is a short clock value,
#   * do_sample=False        -> greedy decoding, repeatable output.
generation_settings = {
    "return_full_text": False,
    "max_new_tokens": 5,
    "do_sample": False,
}

# Text-generation pipeline wrapping the model/tokenizer loaded above.
generator = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    **generation_settings,
)

In [5]:
import os
import json
# from langchain_google_genai import GoogleGenerativeAI
# from langchain_core.messages import HumanMessage, SystemMessage

# # Get the Google API key from environment variables
# GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')

# def get_gemini_model():
#     """Initializes the Google Generative AI Gemini model."""
#     return GoogleGenerativeAI(
#         model="gemini-1.5-pro",
#         google_api_key=GOOGLE_API_KEY,  # never hardcode the key; read it from the environment
#         temperature=0.7
#     )

def generate_response(patch_values: dict) -> dict:
    """
    Ask the module-level text-generation pipeline (`generator`) for a
    clock-hour direction describing the clearest path in a depth patch grid.

    The patch values are JSON-encoded and substituted into a fixed instruction
    prompt, which is sent as a single user chat message.

    Args:
        patch_values (dict): Patch values to analyse; must be JSON-serialisable.

    Returns:
        On success, whatever `generator` returns, unmodified (callers in this
        notebook read it as `result[0]['generated_text']`). If the pipeline
        call raises, a dict `{"error": <message>}` is returned instead and the
        message is printed.
    """
    instructions = """
    You are an AI specialized in depth image analysis for navigation. Your task is to process a depth image divided into 5x5 patches, represented as a 2D array of 25 spatial values. Lower values indicate clear paths, while higher values represent obstacles.
    
    ### **Task Instructions:**
    1. **Analyze the patch_values 5x5 matrix:**
       - Process the provided 5x5 matrix of depth values.
       - Identify the largest clear path by locating the darkest (lowest-value) regions.
    
    2. **Determine the Clear Path Direction:**
       - Find the region with the lowest average depth values.
       - Map this clear path to the **clock-hour direction**:
         - **12** → Clear path moving vertically (bottom to top).
         - **11:30** → 15° left from 12.
         - **11** → 15° left from 11:30.
         - **10:30** → 15° left from 11.
         - **10** → 15° left from 10:30.
         - **9:30** → Left-center.
         - **12:30** → 15° right from 12.
         - **1** → 15° right from 12:30.
         - **1:30** → 15° right from 1.
         - **2** → 15° right from 1:30.
         - **2:30** → Right-center.
         - **3** → No clear path exists.
    
    ### **Strict Output Rules:**
    - **Carefully examine the 5x5 matrix and determine the correct clock-hour direction.**
    - **Output must be exactly one value from the list:**  
      **12, 11:30, 11, 10:30, 10, 9:30, 12:30, 1, 1:30, 2, 2:30, 3.**
    - **Print only the number. No words, no explanations, no symbols, no extra text.**
    - **If the answer is 12, return only: `12` (without quotes).**
    - **If the answer is 3, return only: `3` (without quotes).**
    - **Any deviation from this rule is incorrect.**
    
    #### **Depth Patch Values (Input Data)**
    {patch_values}
    """

    # Fill the single placeholder with the JSON form of the patch grid.
    filled_prompt = instructions.format(patch_values=json.dumps(patch_values))

    # Single-turn chat: the whole prompt travels as one user message.
    chat = [{"role": "user", "content": filled_prompt}]

    try:
        return generator(chat)
    except Exception as exc:
        # Report the failure but keep the caller's loop alive.
        failure = f"Error analyzing depth map: {exc}"
        print(failure)
        return {"error": failure}


In [None]:
import os
import numpy as np
from PIL import Image, ImageOps
import io
import tensorflow as tf
import time
import matplotlib.pyplot as plt
from tabulate import tabulate  # Import tabulate for timing output

# Location of the TFLite depth model (a MiDaS v2 export, judging by the file name).
MODEL_PATH = '/kaggle/input/midasv2/keras/default/1/Midas-V2.tflite'
try:
    # Build the interpreter and allocate its tensors before any inference call.
    interpreter = tf.lite.Interpreter(model_path=MODEL_PATH)
    interpreter.allocate_tensors()
    print("Model Loaded")

    # Tensor metadata; the 'index' entries are used later to feed the input
    # and read the output tensor.
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()

    # print("Input Details:", input_details)
    # print("Output Details:", output_details)

except ValueError as e:
    # NOTE(review): only ValueError is handled, and after a failure
    # `interpreter`, `input_details` and `output_details` stay undefined, so
    # later cells fail with NameError. Confirm whether this should re-raise.
    print("Error loading the model:", e)


# Spatial size the image is resized to before inference.
INPUT_HEIGHT = 256
INPUT_WIDTH = 256

# Pixel normalisation: (pixel - INPUT_MEAN) / INPUT_STD maps 0..255 to -1..1.
INPUT_MEAN = 127.5
INPUT_STD = 127.5

# NOTE(review): not referenced by any code visible in this notebook.
MAX_IMAGE_SIZE = 65535  # Maximum allowed size for the image in bytes

# Preprocess image to match model input size and normalize
def preprocess_image(image: Image.Image) -> tuple:
    """Turn a PIL image into a normalised, batched model input.

    Steps:
      1. Convert to RGB up front. JPEG cannot store alpha or palette modes
         (RGBA, LA, P), so encoding such an image directly raises OSError.
      2. Estimate the JPEG-encoded size; when it exceeds 1 MB, re-encode at a
         lower quality and continue with the recompressed image.
      3. Resize to (INPUT_WIDTH, INPUT_HEIGHT), convert to float32 and
         normalise with the module-level INPUT_MEAN / INPUT_STD.
      4. Add a leading batch axis.

    Args:
        image: Source image in any mode that Pillow can convert to RGB.

    Returns:
        tuple: `(image_np, original_size)` where `image_np` is the float32
        array with a leading batch axis and `original_size` is `image.size`
        of the input, i.e. `(width, height)`.
    """
    original_size = image.size  # remembered so the depth map can be scaled back

    # Must happen before any JPEG encoding (see docstring, step 1).
    image = image.convert('RGB')

    # Measure how large the image is once JPEG-encoded.
    probe_io = io.BytesIO()
    image.save(probe_io, format='JPEG')
    size_in_mb = len(probe_io.getvalue()) / (1024 * 1024)

    # If the image size exceeds 1 MB, we compress it
    if size_in_mb > 1.0:
        # Quality stays at 85 up to 2 MB and shrinks proportionally beyond it;
        # the lower clamp keeps huge inputs from truncating to quality 0.
        quality = max(1, int(min(85, 85 * (2.0 / size_in_mb))))
        compressed_io = io.BytesIO()
        image.save(compressed_io, format='JPEG', quality=quality, optimize=True)
        compressed_io.seek(0)
        image = Image.open(compressed_io).convert('RGB')

    # Resize to the model's input size
    image = image.resize((INPUT_WIDTH, INPUT_HEIGHT))

    # Convert the image to a NumPy array and normalize
    image_np = np.array(image).astype(np.float32)
    image_np = (image_np - INPUT_MEAN) / INPUT_STD
    image_np = np.expand_dims(image_np, axis=0)  # add batch dimension

    return image_np, original_size

# Postprocess depth map to match original size
def postprocess_depth(depth: np.ndarray, original_size: tuple) -> Image.Image:
    """Convert a raw depth prediction into an 8-bit image of the original size.

    The prediction is squeezed, min-max normalised to [0, 1], scaled to 0..255
    and resized with Lanczos resampling.

    Args:
        depth: Raw depth output of the model; singleton axes are removed.
        original_size: Target `(width, height)` passed to `Image.resize`.

    Returns:
        Image.Image: The depth map as an 8-bit image resized to `original_size`.
    """
    depth = np.squeeze(depth)

    # Normalize the depth values to the range [0, 1]
    depth_min = depth.min()
    depth_max = depth.max()
    depth_range = depth_max - depth_min

    if depth_range > 0:
        depth_normalized = (depth - depth_min) / depth_range
    else:
        # A constant depth map has no contrast. Dividing by zero here would
        # produce NaN and undefined uint8 values, so emit an all-zero map.
        depth_normalized = np.zeros_like(depth, dtype=np.float32)

    # Convert to a 0-255 range for visualization
    depth_image = (depth_normalized * 255).astype(np.uint8)

    depth_pil = Image.fromarray(depth_image)
    depth_pil = depth_pil.resize(original_size, Image.Resampling.LANCZOS)

    return depth_pil

# Calculate patch values for a grid (5x5 by default)
def calculate_patch_values(depth_image: "Image.Image", grid_size: int = 5) -> dict:
    """Average a 2-D depth image over a `grid_size` x `grid_size` grid of patches.

    Each patch is `height // grid_size` by `width // grid_size` pixels; any
    remainder rows/columns at the bottom/right edge are not included.

    Args:
        depth_image: Single-channel depth image (anything `np.asarray` turns
            into a 2-D array).
        grid_size: Number of patches per side. Defaults to 5.

    Returns:
        dict: Maps the 1-based, row-major patch number
        (`grid_size * row + col + 1`) to the patch mean truncated to `int`.

    Raises:
        ValueError: If the image is not 2-D, `grid_size` is below 1, or the
            image is smaller than the grid in either dimension (which would
            otherwise produce empty patches and a NaN mean).
    """
    depth_array = np.asarray(depth_image)
    if depth_array.ndim != 2:
        raise ValueError(
            f"expected a 2-D depth image, got an array with shape {depth_array.shape}"
        )
    if grid_size < 1:
        raise ValueError(f"grid_size must be at least 1, got {grid_size}")

    h, w = depth_array.shape
    if h < grid_size or w < grid_size:
        raise ValueError(
            f"depth image of size {w}x{h} is too small for a "
            f"{grid_size}x{grid_size} patch grid"
        )

    patch_size_h = h // grid_size
    patch_size_w = w // grid_size

    patch_values = {}
    for i in range(grid_size):
        rows = slice(i * patch_size_h, (i + 1) * patch_size_h)
        for j in range(grid_size):
            cols = slice(j * patch_size_w, (j + 1) * patch_size_w)
            patch_values[grid_size * i + j + 1] = int(np.mean(depth_array[rows, cols]))

    return patch_values

# Generate depth map from image
def generate_depth_from_image(image_filename):
    """Run the full image -> depth map -> patch grid -> LLM direction chain.

    The image is opened and EXIF-transposed, preprocessed, pushed through the
    module-level TFLite `interpreter`, post-processed into a depth image,
    reduced to patch values and handed to `generate_response`. The depth image
    is written to 'output_depth_image.png' (a fixed name, so every call
    overwrites the previous file).

    Args:
        image_filename: Path of the image to process.

    Returns:
        dict: `{'overall_time': seconds, 'analysis': <generate_response result>}`
        on success, or `{'error': <message>}` if any step raises; the error is
        also printed.
    """
    overall_start = time.time()

    try:
        # Per-step durations as [label, seconds]; only used by the disabled
        # debug output further down.
        timings = []

        def record(step_label, started_at):
            timings.append([step_label, time.time() - started_at])

        tick = time.time()
        image = Image.open(image_filename)
        image = ImageOps.exif_transpose(image)
        record("Image Loading", tick)

        tick = time.time()
        input_data, original_size = preprocess_image(image)
        record("Preprocessing", tick)

        tick = time.time()
        interpreter.set_tensor(input_details[0]['index'], input_data)
        interpreter.invoke()
        record("Model Inference", tick)

        tick = time.time()
        output_data = interpreter.get_tensor(output_details[0]['index'])
        depth = output_data[0]  # first (only) item of the batch
        record("Depth Map Extraction", tick)

        tick = time.time()
        depth_image = postprocess_depth(depth, original_size)
        record("Postprocessing", tick)

        tick = time.time()
        patch_values = calculate_patch_values(depth_image)
        record("Patch Value Calculation", tick)

        tick = time.time()
        response = generate_response(patch_values)
        record("Response Generation", tick)

        # The overall time is taken before the PNG is written.
        overall_end = time.time()
        elapsed = overall_end - overall_start
        timings.append(["Overall Request Handling", elapsed])

        # --- Disabled debug output (timing table and side-by-side preview) ---
        # print("\nTiming Information:")
        # print(tabulate(timings, headers=["Step", "Time (seconds)"], tablefmt="grid"))
        # print("\nGenerated Response:")
        # print(response[0]['generated_text'])
        #
        # plt.figure(figsize=(12, 6))
        # plt.subplot(1, 3, 1); plt.imshow(image); plt.title("Original Image"); plt.axis('off')
        # plt.subplot(1, 3, 2); plt.imshow(depth_image, cmap='gray'); plt.title("Depth Image"); plt.axis('off')
        # plt.subplot(1, 3, 3)
        # plt.text(3, 0.5, f"Analysis:\n{response[0]['generated_text']}",
        #          horizontalalignment='center', verticalalignment='center', fontsize=16)
        # plt.title("Analysis: Patch with AI"); plt.axis('off')
        # plt.subplots_adjust(wspace=0.3)
        # plt.show()

        depth_image.save('output_depth_image.png', 'PNG')

        return {'overall_time': elapsed, 'analysis': response}

    except Exception as exc:
        print(f"Error processing {image_filename}: {exc}")
        return {'error': str(exc)}
        

import re
# Clock values the model may answer with. Regex alternation takes the first
# alternative that matches, and the ':' after an hour counts as a word
# boundary, so each "H:30" form must come before its bare "H" prefix;
# otherwise "12:30" would be reported as "12".
_VALID_TIME_PATTERN = re.compile(
    r'\b(12:30|12|11:30|11|10:30|10|9:30|9|1:30|1|2:30|2|3|0)\b'
)


def extract_valid_time(text):
    """Return the first recognised clock value found in `text`.

    Recognised values are 9, 9:30, 10, 10:30, 11, 11:30, 12, 12:30, 1, 1:30,
    2, 2:30, 3 and 0.

    Args:
        text: Raw model output. `None` or an empty string is accepted.

    Returns:
        str | None: The matched value exactly as it appears in `text`, or
        `None` when nothing matches.
    """
    if not text:
        return None

    match = _VALID_TIME_PATTERN.search(text)

    return match.group(0) if match else None

import os
from PIL import Image
import pandas as pd

# Define paths
dataset_path = ""
csv_path = '/kaggle/input/path-for-outdoor-final/outdoor_gt_path.csv'

# The CSV has no header row: column 0 holds the image filename, column 1 the
# ground-truth clock direction.
image_files = pd.read_csv(csv_path, header=None)
gt_value = image_files.iloc[:, 1]
image_files2 = image_files.iloc[:, 0]

# Full path of every image listed in the CSV, in CSV order.
full_paths = [os.path.join(dataset_path, filename) for filename in image_files2]

# out_pred[i] always corresponds to gt_value[i]: exactly one entry is appended
# per image, with '0' as the "no usable prediction" sentinel.
out_pred = []
overall_times = []
total_time = 0
total_images = len(full_paths)

for image_filename in full_paths:
    result = generate_depth_from_image(image_filename)
    prediction = '0'

    if 'error' in result:
        # The depth stage failed (image unreadable, inference error, ...).
        print(f"Error processing {image_filename}: {result['error']}")
    else:
        analysis = result['analysis']
        if isinstance(analysis, dict) and 'error' in analysis:
            # generate_response returns {"error": ...} when the LLM call
            # fails; indexing that dict with [0] would raise KeyError.
            print(f"Error processing {image_filename}: {analysis['error']}")
        else:
            generated_text = analysis[0]['generated_text']
            print(f"Analysis result for {image_filename}: {generated_text}")
            # Fall back to the sentinel when no clock value can be extracted,
            # so the evaluation never receives None.
            prediction = extract_valid_time(generated_text) or '0'
            overall_times.append(result['overall_time'])
            total_time += result['overall_time']

    out_pred.append(prediction)

# Average over the images that were actually timed; failed images contribute
# no time and must not dilute the mean.
avg_response_time = total_time / len(overall_times) if overall_times else 0
print(f"\nAverage Response Time for all images: {avg_response_time:.4f} seconds")
# print(gt_value[:5])


In [None]:
# Show the raw predictions accumulated in `out_pred` by the inference loop.
print(out_pred)

In [None]:
def time_to_minutes(time_str):
    """Convert a clock-direction string such as "11:30" or "2" into minutes.

    Hours 1, 2 and 3 are shifted by 12 so that they sort after 12 o'clock;
    every other hour is used as written.

    Args:
        time_str: "H" or "H:MM" with integer parts.

    Returns:
        int: `hour * 60 + minute` after the shift described above.

    Raises:
        ValueError: If a part of `time_str` is not an integer.
    """
    hour_text, colon, minute_text = time_str.partition(":")
    hour = int(hour_text)
    minute = int(minute_text) if colon else 0

    # Directions right of 12 o'clock (1, 2, 3) continue the scale past 12.
    if hour in (1, 2, 3):
        hour += 12

    return hour * 60 + minute

def calculate_time_difference(gt_val, out_val):
    """Absolute gap, in minutes, between a ground-truth and a predicted direction.

    Both arguments are clock strings understood by `time_to_minutes`.

    Returns:
        int: `|minutes(gt_val) - minutes(out_val)|`.
    """
    expected_minutes = time_to_minutes(gt_val)
    predicted_minutes = time_to_minutes(out_val)
    gap = expected_minutes - predicted_minutes
    return gap if gap >= 0 else -gap

# Evaluate every available (ground truth, prediction) pair instead of a
# hard-coded count, which breaks whenever fewer pairs exist.
size = min(len(gt_value), len(out_pred))
tot_dig = 0
similar_cnt = 0
for i in range(size):
    # str() tolerates numeric CSV cells; a missing prediction (None) is
    # treated as the notebook's '0' "no prediction" sentinel.
    gt_val = str(gt_value.iloc[i]).replace(" ", "")
    out_val = str(out_pred[i] or '0').replace(" ", "")
    time_diff = calculate_time_difference(gt_val, out_val)
    if time_diff == 0:
        similar_cnt += 1
    # 30 clock-minutes correspond to 15 degrees, i.e. 0.5 degree per minute.
    tot_dig += time_diff * 0.5
    # print(f"GT: {gt_val}, Out: {out_val}, Time Difference: {time_diff} minutes")

if size:
    print("MAE:",tot_dig/size,"degree")
    print("Accuracy:",(similar_cnt/size)*100,"%")
else:
    print("No predictions available to evaluate")
