## IR question

In [None]:
from transformers import AutoTokenizer, AutoModel
import torch
import torchvision.transforms as T
from PIL import Image

from torchvision.transforms.functional import InterpolationMode


IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)


def build_transform(input_size):
    MEAN, STD = IMAGENET_MEAN, IMAGENET_STD
    transform = T.Compose([
        T.Lambda(lambda img: img.convert('RGB') if img.mode != 'RGB' else img),
        T.Resize((input_size, input_size), interpolation=InterpolationMode.BICUBIC),
        T.ToTensor(),
        T.Normalize(mean=MEAN, std=STD)
    ])
    return transform


def find_closest_aspect_ratio(aspect_ratio, target_ratios, width, height, image_size):
    best_ratio_diff = float('inf')
    best_ratio = (1, 1)
    area = width * height
    for ratio in target_ratios:
        target_aspect_ratio = ratio[0] / ratio[1]
        ratio_diff = abs(aspect_ratio - target_aspect_ratio)
        if ratio_diff < best_ratio_diff:
            best_ratio_diff = ratio_diff
            best_ratio = ratio
        elif ratio_diff == best_ratio_diff:
            if area > 0.5 * image_size * image_size * ratio[0] * ratio[1]:
                best_ratio = ratio
    return best_ratio


def dynamic_preprocess(image, min_num=1, max_num=6, image_size=448, use_thumbnail=False):
    orig_width, orig_height = image.size
    aspect_ratio = orig_width / orig_height

    # calculate the existing image aspect ratio
    target_ratios = set(
        (i, j) for n in range(min_num, max_num + 1) for i in range(1, n + 1) for j in range(1, n + 1) if
        i * j <= max_num and i * j >= min_num)
    target_ratios = sorted(target_ratios, key=lambda x: x[0] * x[1])

    # find the closest aspect ratio to the target
    target_aspect_ratio = find_closest_aspect_ratio(
        aspect_ratio, target_ratios, orig_width, orig_height, image_size)

    # calculate the target width and height
    target_width = image_size * target_aspect_ratio[0]
    target_height = image_size * target_aspect_ratio[1]
    blocks = target_aspect_ratio[0] * target_aspect_ratio[1]

    # resize the image
    resized_img = image.resize((target_width, target_height))
    processed_images = []
    for i in range(blocks):
        box = (
            (i % (target_width // image_size)) * image_size,
            (i // (target_width // image_size)) * image_size,
            ((i % (target_width // image_size)) + 1) * image_size,
            ((i // (target_width // image_size)) + 1) * image_size
        )
        # split the image
        split_img = resized_img.crop(box)
        processed_images.append(split_img)
    assert len(processed_images) == blocks
    if use_thumbnail and len(processed_images) != 1:
        thumbnail_img = image.resize((image_size, image_size))
        processed_images.append(thumbnail_img)
    return processed_images


def load_image(image_file, input_size=448, max_num=6):
    image = Image.open(image_file).convert('RGB')
    transform = build_transform(input_size=input_size)
    images = dynamic_preprocess(image, image_size=input_size, use_thumbnail=True, max_num=max_num)
    pixel_values = [transform(image) for image in images]
    pixel_values = torch.stack(pixel_values)
    return pixel_values


path = "OpenGVLab/InternVL-Chat-V1-5"
# If you have an 80G A100 GPU, you can put the entire model on a single GPU.
model = AutoModel.from_pretrained(
    path,
    torch_dtype=torch.bfloat16,
    low_cpu_mem_usage=True,
    trust_remote_code=True, cache_dir="/scratch365/kguo2/TRANS_cache/").eval().cuda()
# Otherwise, you need to set device_map='auto' to use multiple GPUs for inference.
# import os
# os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
# model = AutoModel.from_pretrained(
#     path,
#     torch_dtype=torch.bfloat16,
#     low_cpu_mem_usage=True,
#     trust_remote_code=True,
#     device_map='auto').eval()

tokenizer = AutoTokenizer.from_pretrained(path, trust_remote_code=True)
# set the max number of tiles in `max_num`
pixel_values = load_image('./examples/image1.jpg', max_num=6).to(torch.bfloat16).cuda()

generation_config = dict(
    num_beams=1,
    max_new_tokens=512,
    do_sample=False,
)

# single-round single-image conversation
question = "请详细描述图片" # Please describe the picture in detail
response = model.chat(tokenizer, pixel_values, question, generation_config)
print(question, response)

# # multi-round single-image conversation
# question = "请详细描述图片" # Please describe the picture in detail
# response, history = model.chat(tokenizer, pixel_values, question, generation_config, history=None, return_history=True)
# print(question, response)

# question = "请根据图片写一首诗" # Please write a poem according to the picture
# response, history = model.chat(tokenizer, pixel_values, question, generation_config, history=history, return_history=True)
# print(question, response)

# # multi-round multi-image conversation
# pixel_values1 = load_image('./examples/image1.jpg', max_num=6).to(torch.bfloat16).cuda()
# pixel_values2 = load_image('./examples/image2.jpg', max_num=6).to(torch.bfloat16).cuda()
# pixel_values = torch.cat((pixel_values1, pixel_values2), dim=0)

# question = "详细描述这两张图片" # Describe the two pictures in detail
# response, history = model.chat(tokenizer, pixel_values, question, generation_config, history=None, return_history=True)
# print(question, response)

# question = "这两张图片的相同点和区别分别是什么" # What are the similarities and differences between these two pictures
# response, history = model.chat(tokenizer, pixel_values, question, generation_config, history=history, return_history=True)
# print(question, response)

# # batch inference (single image per sample)
# pixel_values1 = load_image('./examples/image1.jpg', max_num=6).to(torch.bfloat16).cuda()
# pixel_values2 = load_image('./examples/image2.jpg', max_num=6).to(torch.bfloat16).cuda()
# image_counts = [pixel_values1.size(0), pixel_values2.size(0)]
# pixel_values = torch.cat((pixel_values1, pixel_values2), dim=0)

# questions = ["Describe the image in detail."] * len(image_counts)
# responses = model.batch_chat(tokenizer, pixel_values,
#                              image_counts=image_counts,
#                              questions=questions,
#                              generation_config=generation_config)
# for question, response in zip(questions, responses):
#     print(question)
#     print(response)


## Data preprocessing

In [1]:
import os
os.makedirs("./data/mol_figures/step2/", exist_ok=True)

In [2]:
import pandas as pd
import os
# Define the functional groups for questions that require iterating over different groups
# Alcohol,Phenol,Ketone,Acid,Aldehyde,Ester,Aromatic,Nitrile,Isocyanate,Amine,Amide,Ether,Sulfide,Halide,Alkyl,Nitric oxide,Acetaldehyde,Benzyl,Iodine,Ethyl,Hydroxymethyl ,Nitro ,Benzene,Benzaldehyde,Alkyne ,C-O Stretching,C=O Stretching,O-H Stretching,N-H Stretching,triple bond C-H Stretching:,Degree_Unsaturation,Saturation

functional_groups = [
    "Alcohol", "Phenol", "Ketone", "Acid", "Aldehyde", "Ester", "Nitrile",
    "Isocyanate", "Amine", "Ether", "Sulfide", "Halide"]

data = pd.read_csv("./data/mol_figures/step2.csv")
# Create a new DataFrame to store the generated questions and answers
columns = ["Molecule Index", "SMILES","cls", "Formula", "Question", "Answer"]
results_df = pd.DataFrame(columns=columns)

# Generate questions and answers based on the template provided
for _, row in data.iterrows():
    #   Q31: O-H Stretching question
    oh_question = "Does the IR spectrum contains broad absorption peak of O-H stretching around 3300 cm⁻1?"
    oh_answer = "Yes" if row['O-H Stretching'] == 1 else "No"
    qclass = "IR spectrum peak analysis"
    results_df.loc[len(results_df)] = [row['Molecule Index'], row['SMILES'], qclass, row['Formula'], oh_question, oh_answer]

    # Q32: alkyl question
    akl_question = f"Does the IR spectrum contains sharp absorption peak of Alkyl stretching around 2900 cm⁻1?"
    akl_answer = "Yes" if row['Alkyl'] == 1 else "No"
    qclass = "IR spectrum peak analysis" 
    results_df.loc[len(results_df)] = [row['Molecule Index'], row['SMILES'], qclass,row['Formula'], akl_question, akl_answer]

    #Q33 C=O Stretching question
    co_question = f" Does the IR spectrum contains strong, sharp peak of C=O stretching around 1700 cm⁻¹?"
    co_answer = "Yes" if row['C=O Stretching'] == 1 else "No"
    qclass = "IR spectrum peak analysis" 
    results_df.loc[len(results_df)] = [row['Molecule Index'], row['SMILES'], qclass,row['Formula'], co_question, co_answer]

    #Q34 N-H Stretching question
    nh_question = f"Does the IR spectrum contains broad absorption peak of N-H stretching around 3200-3600 cm⁻1?"
    nh_answer = "Yes" if row['N-H Stretching'] == 1 else "No"
    qclass = "IR spectrum peak analysis"
    results_df.loc[len(results_df)] = [row['Molecule Index'], row['SMILES'],qclass, row['Formula'], nh_question, nh_answer]

    #Q35 triple bond C-H Stretching question
    tbch_question = f"Does the IR spectrum contains weak absorption peak of triple bond C-H stretching around 2260-2100 cm⁻1?"
    tbch_answer = "Yes" if row['triple bond C-H Stretching:'] == 1 else "No"
    qclass = "IR spectrum peak analysis"
    results_df.loc[len(results_df)] = [row['Molecule Index'], row['SMILES'],qclass, row['Formula'], tbch_question, tbch_answer]


    # Q17: Functional group presence question
    for group in functional_groups:
        # Q1: Basic functional group presence question
        question = f" Examine the IR spectrum to determine if the molecule could potentially contain specific functional groups: {group}? Look for the presence of characteristic absorption bands and analyze the wavenumbers and intensities of these peaks. This analysis will help identify the functional groups and key structural features within the molecule."
        answer = "Yes" if row[group] == 1 else "No"
        qclass = "IR spectrum structure elucidation"
        results_df.loc[len(results_df)] = [row['Molecule Index'], row['SMILES'],qclass, row['Formula'], question, answer]


results_df.to_csv("./data/mol_figures/step2/IR_questions.csv", index=False)


## MASS question

In [7]:
import pandas as pd

functional_groups = [
    "Alcohol", "Phenol", "Ketone", "Acid", "Aldehyde", "Ester", "Nitrile",
    "Isocyanate", "Amine", "Ether", "Sulfide", "Halide"]
data = pd.read_csv("./data/mol_figures/step2.csv")
# Create a new DataFrame to store the generated questions and answers
columns = ["Molecule Index", "SMILES","cls", "Formula", "Question", "Answer"]
results_df = pd.DataFrame(columns=columns)

# Generate questions and answers based on the template provided
for _, row in data.iterrows():
        for group in functional_groups:
        # Q1: Basic functional group presence question
            question = f"Examine the MS spectrum to determine if the molecule could potentially contain specific fragments:{group}. Look into the number of fragments observed and analyze the differences between the larger fragments. This analysis will help identify the presence of key structural features within the molecule?"
            answer = "Yes" if row[group] == 1 else "No"
            qclass = "MASS spectrum structure elucidation"
            results_df.loc[len(results_df)] = [row['Molecule Index'], row['SMILES'],qclass, row['Formula'], question, answer]

results_df.to_csv("./data/mol_figures/step2/MASS_questions.csv", index=False)


In [8]:
results_df['Question'].iloc[0]

'Examine the MS spectrum to determine if the molecule could potentially contain specific fragments:Alcohol. Look into the number of fragments observed and analyze the differences between the larger fragments. This analysis will help identify the presence of key structural features within the molecule?'

In [None]:
import pandas as pd
iteration = 3
data = pd.read_csv("./data/mol_figures/step2/MASS_questions.csv")

ratios = {
        'MASS spectrum structure elucidation': 1,
    }
for i in range(0, iteration):
    total_samples = 100
    samples_per_class = {clss: int(total_samples * ratio) for clss, ratio in ratios.items()}
    print(samples_per_class)

    sampled_data = pd.DataFrame()
    for clss, n_samples in samples_per_class.items():
        print(clss)
        sampled_class_data = data[data['cls'] == clss].sample(n=n_samples)
        sampled_data = pd.concat([sampled_data, sampled_class_data])

    if len(sampled_data) < total_samples:
        additional_samples = data[~data.index.isin(sampled_data.index)].sample(n=total_samples - len(sampled_data), random_state=42)
        sampled_data = pd.concat([sampled_data, additional_samples])
    # os.makedirs('./data/mol_figures/mol_understanding', exist_ok=True)
    sampled_data.to_csv(f'./data/mol_figures/step2/MASS_sampled_questions_answers_{i}.csv', index=False)
print("Sampled data saved to 'MASS_sampled_questions_answers.csv'")

## Data sampling

In [4]:
import pandas as pd
iteration = 3
data = pd.read_csv("./data/mol_figures/step2/IR_questions.csv")
ratios = {
        'IR spectrum peak analysis': 0.8,
        'IR spectrum structure elucidation': 0.2,
    }
for i in range(0, iteration):
    total_samples = 100
    samples_per_class = {clss: int(total_samples * ratio) for clss, ratio in ratios.items()}
    print(samples_per_class)

    sampled_data = pd.DataFrame()
    for clss, n_samples in samples_per_class.items():
        print(clss)
        sampled_class_data = data[data['cls'] == clss].sample(n=n_samples)
        sampled_data = pd.concat([sampled_data, sampled_class_data])


    if len(sampled_data) < total_samples:
        additional_samples = data[~data.index.isin(sampled_data.index)].sample(n=total_samples - len(sampled_data), random_state=42)
        sampled_data = pd.concat([sampled_data, additional_samples])
    # os.makedirs('./data/mol_figures/mol_understanding', exist_ok=True)
    sampled_data.to_csv(f'./data/mol_figures/step2/IR_sampled_questions_answers_{i}.csv', index=False)
    print("Sampled data saved to 'IR_sampled_questions_answers.csv'")

{'IR spectrum peak analysis': 80, 'IR spectrum structure elucidation': 20}
IR spectrum peak analysis
IR spectrum structure elucidation
Sampled data saved to 'IR_sampled_questions_answers.csv'
{'IR spectrum peak analysis': 80, 'IR spectrum structure elucidation': 20}
IR spectrum peak analysis
IR spectrum structure elucidation
Sampled data saved to 'IR_sampled_questions_answers.csv'
{'IR spectrum peak analysis': 80, 'IR spectrum structure elucidation': 20}
IR spectrum peak analysis
IR spectrum structure elucidation
Sampled data saved to 'IR_sampled_questions_answers.csv'


In [6]:
import pandas as pd
iteration = 3
data = pd.read_csv("./data/mol_figures/step2/MASS_questions.csv")

ratios = {
        'MASS spectrum structure elucidation': 1,
    }
for i in range(0, iteration):
    total_samples = 100
    samples_per_class = {clss: int(total_samples * ratio) for clss, ratio in ratios.items()}
    print(samples_per_class)

    sampled_data = pd.DataFrame()
    for clss, n_samples in samples_per_class.items():
        print(clss)
        sampled_class_data = data[data['cls'] == clss].sample(n=n_samples)
        sampled_data = pd.concat([sampled_data, sampled_class_data])


    if len(sampled_data) < total_samples:
        additional_samples = data[~data.index.isin(sampled_data.index)].sample(n=total_samples - len(sampled_data), random_state=42)
        sampled_data = pd.concat([sampled_data, additional_samples])
    # os.makedirs('./data/mol_figures/mol_understanding', exist_ok=True)
    sampled_data.to_csv(f'./data/mol_figures/step2/IR_sampled_questions_answers_{i}.csv', index=False)
    print("Sampled data saved to 'IR_sampled_questions_answers.csv'")

{'MASS spectrum structure elucidation': 100}
MASS spectrum structure elucidation
Sampled data saved to 'IR_sampled_questions_answers.csv'
{'MASS spectrum structure elucidation': 100}
MASS spectrum structure elucidation
Sampled data saved to 'IR_sampled_questions_answers.csv'
{'MASS spectrum structure elucidation': 100}
MASS spectrum structure elucidation
Sampled data saved to 'IR_sampled_questions_answers.csv'


## zero-shotting learning

In [9]:
sampled_data['Question'].iloc[0]

'Examine the MS spectrum to determine if the molecule could potentially contain specific fragments:Alcohol. Look into the number of fragments observed and analyze the differences between the larger fragments. This analysis will help identify the presence of key structural features within the molecule?'

In [6]:
import openai
import anthropic
from openai import OpenAI
import os
import pandas as pd
import base64
from transformers import AutoProcessor, LlavaForConditionalGeneration
from transformers import pipeline
from PIL import Image
from transformers import InstructBlipProcessor, InstructBlipForConditionalGeneration
import torch
from utils import *
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoModel
from transformers.generation import GenerationConfig
import torchvision.transforms as T
from torchvision.transforms.functional import InterpolationMode


# Set your OpenAI API key
os.environ['OPENAI_API_KEY'] = ''
os.environ['ANTHROPIC_API_KEY'] = ''
cache_dir = ""

sys_prompt = """
    As an expert organic chemist, your task is to analyze and determine the potential structures that can be derived from a given molecular MASS spectrum image.\
    Utilize your knowledge to systematically explore and identify plausible structural configurations based on the MASS spectrum image provided and answer the question.\
    Analyze the problem step-by-step internally, but do not include the analysis in your output. Provide only a very short answer with the exact result. Respond with ‘Yes’ or ‘No’ to the question.
    example: output:Yes
    """

def encode_image(image_path):
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode('utf-8')
    
def analyze_image_with_prompt_gpt(image_paths, prompt_text, sys_prompt=sys_prompt):
    # Encode the image to base64
    api_key = os.getenv("OPENAI_API_KEY")
    base64_images = [encode_image(image_path) for image_path in image_paths]
    client = OpenAI()
    
    messages=[
        {
          "role": "user",
          "content": [
            {
              "type": "text",
              "text": sys_prompt + "\n" + prompt_text
            },
          ],
        }
      ]
    
    for base64_image in base64_images:
        messages[0]['content'].append(
            {
                "type": "image_url",
                "image_url": {
                    "url": f"data:image/png;base64,{base64_image}"
                }
            }
        )
        
    response = client.chat.completions.create(
      model="gpt-4o",
      messages=messages,
      max_tokens=500,
      temperature=0.7
    )
    
    gpt_responses = response.choices[0]
    
    # Return only the GPT model's responses
    return gpt_responses

def analyze_image_with_prompt_claude(image_paths, prompt_text, sys_prompt=sys_prompt):
    
    # Encode the image to base64
    api_key = ''
    client = anthropic.Anthropic(api_key=api_key)
    base64_image = encode_image(image_path)
    
    message = client.messages.create(
    model="claude-3-opus-20240229",
    max_tokens=1024,
    messages=[
        {
            "role": "user",
            "content": [
                {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": 'image/png',
                        "data": base64_image,
                    },
                },
                {
                    "type": "text",
                    "text": "Describe this image."
                }
            ],
        }
    ],
    )
  
    return message


def analyze_image_with_prompt_llava_8B(model, processor, image_path, prompt_text, sys_prompt=sys_prompt):
    images = Image.open(image_path)

    prompt = (f"<|start_header_id|>system<|end_header_id|>\n\n{sys_prompt}<|eot_id|>"
        f"<|start_header_id|>user<|end_header_id|>\n\n<image>\n{prompt_text}<|eot_id|>"
              "<|start_header_id|>assistant<|end_header_id|>\n\n")
    inputs = processor(prompt, images, return_tensors='pt').to(0, torch.float16)
    outputs = model.generate(**inputs, max_new_tokens=1024, do_sample=False)
    len_tokens = len(prompt.split())
    generated_text = processor.decode(outputs[0][len(inputs['input_ids'][0]):], skip_special_tokens=True)
    return generated_text
    
def analyze_image_with_prompt_instructBlip(model, processor, image_path, prompt_text, sys_prompt=sys_prompt):
    if image_path is not None:
        images = Image.open(image_path).convert("RGB")
        prompt = prompt_text
        inputs = processor(images=images, text='You are an expert in organic chemistry, based on the provided image, answer the following question. ' + prompt, return_tensors="pt").to("cuda")
    else:
        prompt = prompt_text
        inputs = processor(text=sys_prompt + prompt, return_tensors="pt").to("cuda")
    outputs = model.generate(
            **inputs,
            do_sample=False,
            num_beams=5,
            max_length=512,
            min_length=1,
            top_p=0.9,
            repetition_penalty=1.5,
            length_penalty=1.0,
            temperature=1,
    )
    generated_text = processor.decode(outputs[0], skip_special_tokens=True).strip()
    return generated_text

def analyze_image_with_prompt_Qwen(model, tokenizer, image_path, prompt_text, sys_prompt=sys_prompt):
    query = tokenizer.from_list_format([
        {'image': image_path},
        {'text': sys_prompt+prompt_text},
    ])
    response, history = model.chat(tokenizer, query=query, history=None)
    return response
    # # 2nd dialogue turn
    # response, history = model.chat(tokenizer, '输出"击掌"的检测框', history=history)
    # print(response)
    # # <ref>击掌</ref><box>(517,508),(589,611)</box>
    # image = tokenizer.draw_bbox_on_latest_picture(response, history)
    # if image:
    #   image.save('1.jpg')
    # else:
    #   print("no box")

def analyze_image_with_prompt_InternVL(model, tokenizer, image_path, prompt_text, sys_prompt=sys_prompt):
    IMAGENET_MEAN = (0.485, 0.456, 0.406)
    IMAGENET_STD = (0.229, 0.224, 0.225)
    
    
    def build_transform(input_size):
        MEAN, STD = IMAGENET_MEAN, IMAGENET_STD
        transform = T.Compose([
            T.Lambda(lambda img: img.convert('RGB') if img.mode != 'RGB' else img),
            T.Resize((input_size, input_size), interpolation=InterpolationMode.BICUBIC),
            T.ToTensor(),
            T.Normalize(mean=MEAN, std=STD)
        ])
        return transform
    
    
    def find_closest_aspect_ratio(aspect_ratio, target_ratios, width, height, image_size):
        best_ratio_diff = float('inf')
        best_ratio = (1, 1)
        area = width * height
        for ratio in target_ratios:
            target_aspect_ratio = ratio[0] / ratio[1]
            ratio_diff = abs(aspect_ratio - target_aspect_ratio)
            if ratio_diff < best_ratio_diff:
                best_ratio_diff = ratio_diff
                best_ratio = ratio
            elif ratio_diff == best_ratio_diff:
                if area > 0.5 * image_size * image_size * ratio[0] * ratio[1]:
                    best_ratio = ratio
        return best_ratio
    
    
    def dynamic_preprocess(image, min_num=1, max_num=6, image_size=448, use_thumbnail=False):
        orig_width, orig_height = image.size
        aspect_ratio = orig_width / orig_height
    
        # calculate the existing image aspect ratio
        target_ratios = set(
            (i, j) for n in range(min_num, max_num + 1) for i in range(1, n + 1) for j in range(1, n + 1) if
            i * j <= max_num and i * j >= min_num)
        target_ratios = sorted(target_ratios, key=lambda x: x[0] * x[1])
    
        # find the closest aspect ratio to the target
        target_aspect_ratio = find_closest_aspect_ratio(
            aspect_ratio, target_ratios, orig_width, orig_height, image_size)
    
        # calculate the target width and height
        target_width = image_size * target_aspect_ratio[0]
        target_height = image_size * target_aspect_ratio[1]
        blocks = target_aspect_ratio[0] * target_aspect_ratio[1]
    
        # resize the image
        resized_img = image.resize((target_width, target_height))
        processed_images = []
        for i in range(blocks):
            box = (
                (i % (target_width // image_size)) * image_size,
                (i // (target_width // image_size)) * image_size,
                ((i % (target_width // image_size)) + 1) * image_size,
                ((i // (target_width // image_size)) + 1) * image_size
            )
            # split the image
            split_img = resized_img.crop(box)
            processed_images.append(split_img)
        assert len(processed_images) == blocks
        if use_thumbnail and len(processed_images) != 1:
            thumbnail_img = image.resize((image_size, image_size))
            processed_images.append(thumbnail_img)
        return processed_images
    
    
    def load_image(image_file, input_size=448, max_num=6):
        image = Image.open(image_file).convert('RGB')
        transform = build_transform(input_size=input_size)
        images = dynamic_preprocess(image, image_size=input_size, use_thumbnail=True, max_num=max_num)
        pixel_values = [transform(image) for image in images]
        pixel_values = torch.stack(pixel_values)
        return pixel_values
    
    
    
    # set the max number of tiles in `max_num`
    pixel_values = load_image(image_path, max_num=6).to(torch.bfloat16).cuda()
    
    generation_config = dict(
        num_beams=1,
        max_new_tokens=1024,
        do_sample=False,
    )
    
    # single-round single-image conversation
    question = sys_prompt + prompt_text
    response = model.chat(tokenizer, pixel_values, question, generation_config)
    return response
    

def analyze_image_with_prompt(model_name, image_path, prompt_text, sys_prompt=sys_prompt, model=None, processor=None):
    if 'gpt-4o' in model_name:
        return analyze_image_with_prompt_gpt(image_path, prompt_text, sys_prompt=sys_prompt)
    elif "claude" in model_name:
        return analyze_image_with_prompt_claude(image_path, prompt_text, sys_prompt=sys_prompt)
    elif "llava" in model_name:
        return analyze_image_with_prompt_llava_8B(model, processor, image_path, prompt_text, sys_prompt=sys_prompt)
    elif "instructBlip" in model_name:
        return analyze_image_with_prompt_instructBlip(model, processor, image_path, prompt_text, sys_prompt=sys_prompt)
    elif 'Qwen' in model_name:
        return analyze_image_with_prompt_Qwen(model, processor, image_path, prompt_text, sys_prompt=sys_prompt)
    elif 'InternVL' in model_name:
        return analyze_image_with_prompt_InternVL(model, processor, image_path, prompt_text, sys_prompt=sys_prompt)
    else:
        raise ValueError("Invalid model name. Choose 'gpt' or 'claude'.")
    
    
def is_open_source(model_name):
    if 'claude' in model_name or 'gemini' in model_name or 'gpt' in model_name:
        return False
    return True

In [None]:

iteration = 3
model_names = ['llava', 'instructBlip-7B', 'instructBlip-13B', 'Qwen-VL-Chat']
for model_name in model_names:
    if is_open_source(model_name):
        if 'llava' in model_name:
            model = LlavaForConditionalGeneration.from_pretrained(
                model_paths[model_name], 
                torch_dtype=torch.float16, 
                low_cpu_mem_usage=True, 
                cache_dir=cache_dir,
                ).to(0)

            processor = AutoProcessor.from_pretrained(model_paths[model_name])
        elif 'instructBlip' in model_name:
            model = InstructBlipForConditionalGeneration.from_pretrained(model_paths[model_name], cache_dir=cache_dir).to("cuda")
            processor = InstructBlipProcessor.from_pretrained(model_paths[model_name], cache_dir=cache_dir) 
        elif 'Qwen' in model_name:
            processor = AutoTokenizer.from_pretrained(model_paths[model_name], trust_remote_code=True, cache_dir=cache_dir)
            model = AutoModelForCausalLM.from_pretrained(model_paths[model_name], device_map="cuda", trust_remote_code=True, cache_dir=cache_dir).eval()
        elif 'InternVL' in model_name:
            os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
            model = AutoModel.from_pretrained(
                model_paths[model_name],
                torch_dtype=torch.bfloat16,
                low_cpu_mem_usage=True,
                trust_remote_code=True,
                cache_dir=cache_dir,
                device_map='auto').eval()

            tokenizer = AutoTokenizer.from_pretrained(path, trust_remote_code=True, cache_dir=cache_dir)
        else:
            model = None
            processor=None
            
    for i in range(0, iteration):
        data_frame = pd.DataFrame(columns=["question", "cls", "Answer", "Generated Response"])
        data = pd.read_csv(f"./data/mol_figures/step2/IR_sampled_questions_answers_{i}.csv")
        for _, row in data.iterrows():
            try:
                index_parts = row['Molecule Index'].split('_')
                index_parts = [part.strip() for part in index_parts]
                # image_path = "/home/kguo2/PycharmProjects/spectrumLM/data/mol_figures/5th_specs/Problem_21_1.png"
                image_path = f'./data/mol_figures/{index_parts[0]}th_specs/Problem {index_parts[1]}_1.png'

                generated_response = analyze_image_with_prompt(model_name, image_path, row['Question'], sys_prompt=sys_prompt, model=model, processor=processor)
                # print(generated_response)
                data_frame.loc[len(data_frame)] = [row['Question'], row['cls'], row['Answer'], generated_response]
                
            except Exception as e:
                print(e)
                continue
        data_frame.to_csv(f'./data/mol_figures/step2/IR_{model_name}_generated_responses_{i}.csv', index=False)
    
    del model, processor

111

In [4]:
from sklearn.metrics import accuracy_score, f1_score
import re

def evaluate_responses(df):
    categories = ['IR spectrum peak analysis', 'IR spectrum structure elucidation']
    results = {}
    
    for category in categories:
        # Filter data for the current category
        category_data = df[df['cls'] == category]
        
        # Compute accuracy
        answer = [1 if ans == 'Yes' else 0 for ans in category_data['Answer']]
        generated_response = [1 if 'yes' in ans.lower() else 0 if 'no' in ans.lower() else -1 for ans in category_data['Generated Response']]
        accuracy = accuracy_score(answer, generated_response)
        
        # Compute F1 score
        f1 = f1_score(answer, generated_response, average='macro')
        
        results[category] = {'Accuracy': accuracy, 'F1 Score': f1}
    
    return results

In [None]:
import numpy as np
import pandas as pd
models = ['llava', 'instructBlip-7B', 'instructBlip-13B', 'Qwen-VL-Chat']

for model in models:
    for i in range(0, 3):
        sa_f1 = []
        sa_acc = []
        ar_acc = []
        ar_f1 = []
        fg_acc = []
        fg_f1 = []
        matchs = []
        print(model)
        generated_answers = pd.read_csv(f'./data/mol_figures/step2/IR_{model}_generated_responses_{i}.csv')
        results = evaluate_responses(generated_answers)
        print(results)
        sa_f1.append(results['IR spectrum peak analysis']['F1 Score'])
        sa_acc.append(results['IR spectrum peak analysis']['Accuracy'])
        ar_f1.append(results['IR spectrum structure elucidation']['F1 Score'])
        ar_acc.append(results['IR spectrum structure elucidation']['Accuracy'])
        sa_f1 = np.array(sa_f1)
        sa_acc = np.array(sa_acc)
        ar_acc = np.array(ar_acc)
        ar_f1 = np.array(ar_f1)
    print(sa_f1,sa_acc,ar_acc,ar_f1)
    # print(sa_f1.mean(),sa_acc.mean(),ar_acc.mean(),ar_f1.mean(),fg_acc.mean(),fg_f1.mean(),matchs.mean())
    # print(sa_f1.std(),sa_acc.std(),ar_acc.std(),ar_f1.std(),fg_acc.std(),fg_f1.std(),matchs.std())

## COT IR

In [None]:
sys_prompt = """
    As an expert organic chemist, your task is to analyze and determine the potential structures that can be derived from a given molecular spectrum image.\
    Utilize your knowledge to systematically explore and identify plausible structural configurations based on the spectrum image provided and answer the question.\"\
    please think step by step and provide the answer"""

def get_gpt_response(model_name, prompt):
    openai.api_key = os.getenv("OPENAI_API_KEY")
    client = OpenAI()
    response = client.chat.completions.create(
        model=model_name,
        messages=[
            {"role": "system", "content": "You are an expert organic chemist."},
            {"role": "user", "content": prompt}
        ],
        max_tokens=150,
        temperature=0.7
    ).choices[0].message.content
    return response

iteration = 3
data_frame = pd.DataFrame(columns=["question", "cls", "cot", "Answer", "Generated Response"])
model_names = ['instructBlip-13B', 'Qwen-VL-Chat']
for model_name in model_names:
    if is_open_source(model_name):
        if 'llava' in model_name:
            model = LlavaForConditionalGeneration.from_pretrained(
                model_paths[model_name], 
                torch_dtype=torch.float16, 
                low_cpu_mem_usage=True, 
                cache_dir=cache_dir,
                ).to(0)

            processor = AutoProcessor.from_pretrained(model_paths[model_name])
        elif 'instructBlip' in model_name:
            model = InstructBlipForConditionalGeneration.from_pretrained(model_paths[model_name], cache_dir=cache_dir).to("cuda")
            processor = InstructBlipProcessor.from_pretrained(model_paths[model_name], cache_dir=cache_dir) 
        elif 'Qwen' in model_name:
            processor = AutoTokenizer.from_pretrained(model_paths[model_name], trust_remote_code=True, cache_dir=cache_dir)
            model = AutoModelForCausalLM.from_pretrained(model_paths[model_name], device_map="cuda", trust_remote_code=True, cache_dir=cache_dir).eval()
        elif 'InternVL' in model_name:
            os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
            model = AutoModel.from_pretrained(
                model_paths[model_name],
                torch_dtype=torch.bfloat16,
                low_cpu_mem_usage=True,
                trust_remote_code=True,
                cache_dir=cache_dir,
                device_map='auto').eval()

            tokenizer = AutoTokenizer.from_pretrained(path, trust_remote_code=True, cache_dir=cache_dir)
        else:
            model = None
            processor=None
    for i in range(iteration):
        data = pd.read_csv(f"./data/mol_figures/step2/IR_sampled_questions_answers_{i}.csv")
        # data = data.sample(n=1)
        for _, row in data.iterrows():
            try:
                index_parts = row['Molecule Index'].split('_')
                index_parts = [part.strip() for part in index_parts]
                image_path = f'./data/mol_figures/{index_parts[0]}th_specs/Problem {index_parts[1]}_1.png'

                generated_cot_response = analyze_image_with_prompt(model_name, image_path, row['Question'], sys_prompt=sys_prompt, model=model, processor=processor)
                cot_prompt = f"Based on the following reasoning: {generated_cot_response} {row['Question']} Provide only a very short answer with the exact result. Respond with 'Yes' or 'No' to the question."
                if is_open_source(model_name):
                    generated_response = analyze_image_with_prompt(model_name, image_path, cot_prompt, 'You are an expert organic chemist.', model=model, processor=processor)
                else:
                    generated_response = get_gpt_response(model_name, cot_prompt)

                data_frame.loc[len(data_frame)] = [row['Question'], row['cls'], generated_cot_response,row['Answer'], generated_response]

            except Exception as e:
                print(e)
                continue
    
        data_frame.to_csv(f'./data/mol_figures/step2/IR_cot_{model_name}_generated_responses_{i}.csv', index=False)
    del model, processor

In [None]:
import numpy as np
import pandas as pd
model_names = ['llava', 'instructBlip-7B', 'instructBlip-13B', 'Qwen-VL-Chat']

for model_name in model_names:
    print(model_name)
    for i in range(0, 3):
        sa_f1 = []
        sa_acc = []
        ar_acc = []
        ar_f1 = []
        fg_acc = []
        fg_f1 = []
        matchs = []

        generated_answers = pd.read_csv(f'./data/mol_figures/step2/IR_cot_{model_name}_generated_responses_{i}.csv')
        results = evaluate_responses(generated_answers)
        # print(results)
        sa_f1.append(results['IR spectrum peak analysis']['F1 Score'])
        sa_acc.append(results['IR spectrum peak analysis']['Accuracy'])
        ar_f1.append(results['IR spectrum structure elucidation']['F1 Score'])
        ar_acc.append(results['IR spectrum structure elucidation']['Accuracy'])
        sa_f1 = np.array(sa_f1)
        sa_acc = np.array(sa_acc)
        ar_acc = np.array(ar_acc)
        ar_f1 = np.array(ar_f1)
    print(sa_f1.mean(),ar_f1.mean())
    print(sa_f1.std(),ar_f1.std())
    print(sa_acc.mean(),ar_acc.mean())
    print(sa_acc.std(),ar_acc.std())
    print(0.8*sa_f1.mean() + 0.2*ar_f1.mean())
    print(0.8*sa_acc.mean() + 0.2*ar_acc.mean())
    print()

## MASS zero-shot

In [None]:
iteration = 3
model_names = ['llava','instructBlip-7B', 'instructBlip-13B', 'Qwen-VL-Chat']
for model_name in model_names:
    if is_open_source(model_name):
        if 'llava' in model_name:
            model = LlavaForConditionalGeneration.from_pretrained(
                model_paths[model_name], 
                torch_dtype=torch.float16, 
                low_cpu_mem_usage=True, 
                cache_dir=cache_dir,
                ).to(0)

            processor = AutoProcessor.from_pretrained(model_paths[model_name])
        elif 'instructBlip' in model_name:
            model = InstructBlipForConditionalGeneration.from_pretrained(model_paths[model_name], cache_dir=cache_dir).to("cuda")
            processor = InstructBlipProcessor.from_pretrained(model_paths[model_name], cache_dir=cache_dir) 
        elif 'Qwen' in model_name:
            processor = AutoTokenizer.from_pretrained(model_paths[model_name], trust_remote_code=True, cache_dir=cache_dir)
            model = AutoModelForCausalLM.from_pretrained(model_paths[model_name], device_map="cuda", trust_remote_code=True, cache_dir=cache_dir).eval()
        elif 'InternVL' in model_name:
            os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
            model = AutoModel.from_pretrained(
                model_paths[model_name],
                torch_dtype=torch.bfloat16,
                low_cpu_mem_usage=True,
                trust_remote_code=True,
                cache_dir=cache_dir,
                device_map='auto').eval()

            tokenizer = AutoTokenizer.from_pretrained(path, trust_remote_code=True, cache_dir=cache_dir)
        else:
            model = None
            processor=None
    for i in range(0, iteration):
        data_frame = pd.DataFrame(columns=["question", "cls", "Answer", "Generated Response"])
        data = pd.read_csv(f"./data/mol_figures/step2/MASS_sampled_questions_answers_{i}.csv")
        for _, row in data.iterrows():
            try:
                index_parts = row['Molecule Index'].split('_')
                index_parts = [part.strip() for part in index_parts]
                # image_path = "/home/kguo2/PycharmProjects/spectrumLM/data/mol_figures/5th_specs/Problem_21_1.png"
                image_path = f'./data/mol_figures/{index_parts[0]}th_specs/Problem {index_parts[1]}_2.png'
                
                generated_response = analyze_image_with_prompt(model_name, image_path, row['Question'], sys_prompt='You are an expert in organic chemistry. ', model=model, processor=processor)
                data_frame.loc[len(data_frame)] = [row['Question'], row['cls'], row['Answer'], generated_response]
                
            except Exception as e:
                print(e)
                continue
        data_frame.to_csv(f'./data/mol_figures/step2/MASS_{model_name}_generated_responses_{i}.csv', index=False)

In [1]:
from sklearn.metrics import accuracy_score, f1_score
import re

def evaluate_responses(df):
    categories = ['MASS spectrum structure elucidation']
    results = {}
    
    for category in categories:
        # Filter data for the current category
        category_data = df[df['cls'] == category]
        
        # Compute accuracy
        answer = [1 if ans == 'Yes' else 0 for ans in category_data['Answer']]
        generated_response = [1 if 'Yes' in ans else 0 if 'No' in ans else -1 for ans in category_data['Generated Response']]
        accuracy = accuracy_score(answer, generated_response)
        
        # Compute F1 score
        f1 = f1_score(answer, generated_response, average='macro')
        
        results[category] = {'Accuracy': accuracy, 'F1 Score': f1}
    
    return results

In [None]:
import numpy as np
import pandas as pd

models = ['llava','instructBlip-7B', 'instructBlip-13B', 'Qwen-VL-Chat']
for model in models:
    print()
    print(model)

    sa_f1 = []
    sa_acc = []
    ar_f1 = []
    ar_acc = []
    for i in range(0, 3):
        generated_answers = pd.read_csv(f'./data/mol_figures/step2/MASS_{model}_generated_responses_{i}.csv')
        results = evaluate_responses(generated_answers)
        print(results)
        ar_f1.append(results['MASS spectrum structure elucidation']['F1 Score'])
        ar_acc.append(results['MASS spectrum structure elucidation']['Accuracy'])

    ar_acc = np.array(ar_acc)
    ar_f1 = np.array(ar_f1)

    print(ar_acc.mean())
    print(ar_acc.std())
    print("f1 mean",(ar_f1.mean()))
    print("f1 std",(ar_f1.std() ))
    print("acc mean",(ar_acc.mean())) 
    print("acc std",(ar_acc.std() ))

## cot MASS

In [None]:
sys_prompt = """
    As an expert organic chemist, your task is to analyze and determine the potential structures that can be derived from a given molecular spectrum image.\
    Utilize your knowledge to systematically explore and identify plausible structural configurations based on the spectrum image provided and answer the question.\"\
    please think step by step and provide the answer"""

iteration = 3
data_frame = pd.DataFrame(columns=["question", "cls", "cot", "Answer", "Generated Response"])
model_names = ['llava', 'instructBlip-7B', 'instructBlip-13B', 'Qwen-VL-Chat']
for model_name in model_names:
    if is_open_source(model_name):
        if 'llava' in model_name:
            model = LlavaForConditionalGeneration.from_pretrained(
                model_paths[model_name], 
                torch_dtype=torch.float16, 
                low_cpu_mem_usage=True, 
                cache_dir=cache_dir,
                ).to(0)

            processor = AutoProcessor.from_pretrained(model_paths[model_name])
        elif 'instructBlip' in model_name:
            model = InstructBlipForConditionalGeneration.from_pretrained(model_paths[model_name], cache_dir=cache_dir).to("cuda")
            processor = InstructBlipProcessor.from_pretrained(model_paths[model_name], cache_dir=cache_dir) 
        elif 'Qwen' in model_name:
            processor = AutoTokenizer.from_pretrained(model_paths[model_name], trust_remote_code=True, cache_dir=cache_dir)
            model = AutoModelForCausalLM.from_pretrained(model_paths[model_name], device_map="cuda", trust_remote_code=True, cache_dir=cache_dir).eval()
        elif 'InternVL' in model_name:
            os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
            model = AutoModel.from_pretrained(
                model_paths[model_name],
                torch_dtype=torch.bfloat16,
                low_cpu_mem_usage=True,
                trust_remote_code=True,
                cache_dir=cache_dir,
                device_map='auto').eval()

            tokenizer = AutoTokenizer.from_pretrained(path, trust_remote_code=True, cache_dir=cache_dir)
        else:
            model = None
            processor=None
    for i in range(iteration):
        data = pd.read_csv(f"./data/mol_figures/step2/MASS_sampled_questions_answers_{i}.csv")
        # data = data.sample(n=1)
        for _, row in data.iterrows():
            try:
                index_parts = row['Molecule Index'].split('_')
                index_parts = [part.strip() for part in index_parts]
                image_path = f'./data/mol_figures/{index_parts[0]}th_specs/Problem {index_parts[1]}_1.png'
                generated_cot_response = analyze_image_with_prompt(model_name, image_path, row['Question'], sys_prompt=sys_prompt, model=model, processor=processor)
                cot_prompt = f"Based on the following reasoning: {generated_cot_response} {row['Question']} Provide only a very short answer with the exact result. Respond with 'Yes' or 'No' to the question."
                if is_open_source(model_name):
                    generated_response = analyze_image_with_prompt(model_name, image_path, cot_prompt, 'You are an expert organic chemist.', model=model, processor=processor)
                else:
                    generated_response = get_gpt_response(model_name, cot_prompt)

                data_frame.loc[len(data_frame)] = [row['Question'], row['cls'], generated_cot_response,row['Answer'], generated_response]

            except Exception as e:
                print(e)
                continue
    
        data_frame.to_csv(f'./data/mol_figures/step2/MASS_cot_{model_name}_generated_responses_{i}.csv', index=False)
    del model, processor

In [None]:
import numpy as np
model_names = ['llava', 'instructBlip-7B', 'instructBlip-13B', 'Qwen-VL-Chat']
for i in range(0, 3):
    for model_name in model_names:
        ar_acc = []
        ar_f1 = []
        print(model_name)
        generated_answers = pd.read_csv(f'./data/mol_figures/step2/MASS_cot_{model_name}_generated_responses_{i}.csv')
        results = evaluate_responses(generated_answers)
        print(results)
        ar_f1.append(results['MASS spectrum structure elucidation']['F1 Score'])
        ar_acc.append(results['MASS spectrum structure elucidation']['Accuracy'])

        ar_acc = np.array(ar_acc)
        ar_f1 = np.array(ar_f1)
    print(ar_acc,ar_f1)