In [None]:
from transformers import AutoProcessor, BitsAndBytesConfig, LlavaNextForConditionalGeneration
import torch
from datasets import load_dataset
from pprint import pprint

In [None]:
# Hub checkpoint used to build the processor.
MODEL_ID = "llava-hf/llava-v1.6-mistral-7b-hf"
# Local directory the model weights are read from (used instead of MODEL_ID).
# NOTE(review): absolute, machine-specific path — presumably the fine-tuned
# checkpoint; confirm and consider making it configurable.
LOCAL_CHECKPOINT_DIR = "/home/vaibhav/LLMs-TimeSeries/software/Model_Dataset_v2_LA_10_mtf"

processor = AutoProcessor.from_pretrained(MODEL_ID)

# 4-bit NF4 quantization with float16 as the compute dtype.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
)

# Load the quantized model from the local checkpoint directory.
# Pass MODEL_ID as the first argument instead to load the Hub checkpoint.
model = LlavaNextForConditionalGeneration.from_pretrained(
    LOCAL_CHECKPOINT_DIR,
    quantization_config=quantization_config,
    torch_dtype=torch.float16,
)

In [None]:
# Evaluation data: the "test3" split of the Hub dataset below.
dataset = load_dataset(
    "VaibhavMal/AirQualty_imageConv_2",
    split="test3",
)
# Alternative kept for reference (train split of the earlier dataset):
# dataset_train = load_dataset("VaibhavMal/AirQualty_imageConv", split="train")

# Show the dataset summary first, then the first record.
print(dataset)
first_record = dataset[0]
print(first_record)

In [None]:
import re

def Extract_num(llm_output):
    """Extract the integer that follows the closing instruction tag.

    The decoded model response echoes the prompt, so the prediction is the
    first run of digits right after the closing tag. Both the backslash form
    ``[\\INST]`` (what the prompt in this notebook emits) and the standard
    forward-slash form ``[/INST]`` are accepted.

    Args:
        llm_output: Decoded response text (prompt followed by the answer).

    Returns:
        The extracted number as an ``int``, or ``0`` when no closing tag
        followed by digits is found. Note that ``0`` is therefore ambiguous
        between "model answered 0" and "nothing could be parsed".
    """
    # [\\/] matches either a literal backslash or a forward slash before INST.
    match = re.search(r'\[[\\/]INST\]\s*(\d+)', llm_output)

    if match is None:
        print("Not Found")
        return 0

    number = int(match.group(1))
    print(f"Extracted number: {number}")
    return number

In [None]:
from PIL import Image

# (query column, image column) to read for each supported image encoding.
# Column names are kept exactly as in the original lookups, including the
# "image_spectogram" spelling.
IMAGE_COLUMNS = {
    "scalogram": ("query_scalogram", "image_scalogram"),
    "mtf": ("query_mtf", "image_mtf"),
    "spectrogram": ("query_spectrogram", "image_spectogram"),
}

ground_truth = []   # integer labels taken from the "answers" column
output_result = []  # integers parsed from the model responses (0 = not parsed)
image_type = "mtf"
max_output_token = 4096  # generation budget per sample; loop-invariant

# Fail fast on an unsupported encoding instead of hitting a NameError (or
# silently reusing stale variables from a previous run) inside the loop.
if image_type not in IMAGE_COLUMNS:
    raise ValueError(
        f"Unknown image_type {image_type!r}; expected one of {sorted(IMAGE_COLUMNS)}"
    )
query_column, image_column = IMAGE_COLUMNS[image_type]

for i in range(len(dataset)):
    # Fetch the row once; every dataset[i] access re-reads the whole record.
    example = dataset[i]
    en_query = example[query_column]
    image = example[image_column]
    ground_truth.append(int(example["answers"]))

    # "[\\INST]" produces the same runtime text as the former "[\INST]" but
    # without the invalid "\I" escape sequence that newer Pythons warn about.
    # NOTE(review): the standard Mistral closing tag is "[/INST]"; the
    # backslash form is kept because Extract_num parses it — confirm which
    # form the checkpoint was trained with before changing it.
    prompt = f"[INST] <image>\n{en_query} [\\INST]"

    # Keyword arguments keep text/images from being swapped if the processor's
    # positional parameter order differs between transformers versions.
    inputs = processor(text=prompt, images=image, return_tensors="pt").to("cuda:0")
    output = model.generate(**inputs, max_new_tokens=max_output_token)
    response = processor.decode(output[0], skip_special_tokens=True)
    print(response)
    output_result.append(Extract_num(response))


In [None]:
# Dump the collected labels and predictions, then confirm that every dataset
# row contributed exactly one label.
for collected in (ground_truth, output_result):
    print(collected)
print(len(dataset) == len(ground_truth))

In [None]:
import math

# Absolute error for every (label, prediction) pair that was actually produced.
# zip() stops at the shorter list, so a partially completed inference run is
# scored on the finished samples only.
# NOTE: Extract_num returns 0 when no number could be parsed, so unparsed
# responses are scored as a prediction of 0.
abs_errors = [abs(truth - pred) for truth, pred in zip(ground_truth, output_result)]

# Normalise by the number of scored pairs rather than len(dataset): the sums
# only cover the scored pairs, so dividing by the dataset size would
# understate the error after an interrupted run.
n_scored = len(abs_errors)
if n_scored == 0:
    raise ValueError("No predictions to score; run the inference cell first.")

mae_error = sum(abs_errors) / n_scored
rmse_error = math.sqrt(sum(err ** 2 for err in abs_errors) / n_scored)

print(mae_error)
print(rmse_error)



In [None]:
# image = dataset[3]["image"]
# en_query = dataset[3]["query"]

# prompt = f"[INST] <image>\n{en_query} [\INST]"
# max_output_token = 4096
# inputs = processor(prompt, image, return_tensors="pt").to("cuda:0")
# output = model.generate(**inputs, max_new_tokens=max_output_token)
# response = processor.decode(output[0], skip_special_tokens=True)
# response

In [None]:
# image = dataset[0]["image"]
# en_query = "Analyze the image and predict what kind of time series transformation it could have been?"
# print(en_query)

# prompt = f"[INST] <image>\n{en_query} [\INST]"
# max_output_token = 4096
# inputs = processor(prompt, image, return_tensors="pt").to("cuda:0")
# output = model.generate(**inputs, max_new_tokens=max_output_token)
# response = processor.decode(output[0], skip_special_tokens=True)
# response