In [None]:
#pip install peft

In [None]:
import os
import argparse
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from transformers import Blip2Processor, Blip2ForConditionalGeneration
from PIL import Image
import random
import numpy as np
from tqdm import tqdm
import json

torch.cuda.empty_cache()

def seed_everything(seed):
    """
    Seed every random number generator this notebook relies on.

    Seeds Python's `random`, NumPy and PyTorch (CPU and current CUDA device),
    exports PYTHONHASHSEED, and switches cuDNN to deterministic mode with
    auto-tuning (benchmark) disabled.

    Args:
        seed (int): Seed value shared by all generators.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Every seeding function below takes the same single integer argument.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
        seed_fn(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

def print_trainable_parameters(model):
    """
    Print how many of the model's parameters are trainable.

    Counts the elements of every parameter returned by
    `model.named_parameters()` and, separately, those whose `requires_grad`
    flag is set, then prints both totals and the trainable percentage.

    Args:
        model: Any object exposing `named_parameters()` (e.g. a torch.nn.Module).

    Returns:
        None. The summary is written to stdout.
    """
    trainable_params = 0
    all_param = 0
    for _, param in model.named_parameters():
        count = param.numel()
        all_param += count
        if param.requires_grad:
            trainable_params += count

    # A module without any parameters would otherwise raise ZeroDivisionError.
    trainable_pct = 100 * trainable_params / all_param if all_param else 0.0
    print(
        f"trainable params: {trainable_params} || all params: {all_param} || trainable%: {trainable_pct}"
    )

In [None]:
# # Specify which BLIP-2 model you want
# model_name = "Salesforce/blip2-flan-t5-xl"  # You can choose other variants like blip2-flan-t5-xl

# # Download the processor and model
# processor = Blip2Processor.from_pretrained(model_name)
# model = Blip2ForConditionalGeneration.from_pretrained(model_name, torch_dtype=torch.float16)

# # If you have a GPU, you can move the model to it
# device = "cuda" if torch.cuda.is_available() else "cpu"
# model.to(device)

In [None]:
# Locally cached snapshot directory of the BLIP-2 checkpoint, resolved under the user's home.
default_path = os.path.expanduser("~/.cache/huggingface/hub/models--Salesforce--blip2-flan-t5-xl/snapshots/0eb0d3b46c14c1f8c7680bca2693baafdb90bb28/")
#pretrain_path = os.path.join("./snapshot", f"best_model")

# The processor and the model are both read from the same snapshot directory.
processor = Blip2Processor.from_pretrained(default_path)
model = Blip2ForConditionalGeneration.from_pretrained(
    default_path,
    torch_dtype=torch.float16,
    device_map="cuda",
)

# Report the device the first model parameter lives on.
device = next(model.parameters()).device
print(f"Model is loaded on: {device}")

In [None]:
# Bare expression: the notebook renders the model's repr as this cell's output.
model

In [None]:
class VQADataset(Dataset):
    """Image / question / answer samples prepared for BLIP-2 VQA fine-tuning."""

    def __init__(self, annotations_file, image_dir, processor, max_length=32, add_qa = True):
        """
        Dataset for VQA fine-tuning

        Args:
            annotations_file (str): Path to a JSON file holding a dictionary whose
                values each provide the keys 'query', 'path' and 'answer'
            image_dir (str): Directory that each sample's 'path' is joined onto
            processor (Blip2Processor): BLIP-2 processor
            max_length (int): Length the tokenized answer is padded/truncated to
            add_qa (bool): When True, wrap each question as
                "Question: <query>. Answer:" before tokenization
        """
        self.image_dir = image_dir
        self.processor = processor
        self.max_length = max_length

        # Load the entire JSON file as a dictionary
        with open(annotations_file, 'r') as f:
            data = json.load(f)

        # Only the values are used; the dictionary keys are ignored.
        self.samples = []
        for item in data.values():
            question = item['query']
            if add_qa:
                question = "Question: "+ question + ". Answer:"

            self.samples.append({
                'image_path': item['path'],
                'question': question,
                'answer': item['answer']
            })

    def __len__(self):
        """Return the number of loaded samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """
        Build the model inputs and loss labels for sample `idx`.

        Returns:
            dict with the processed `inputs` object, the raw `question` and
            `answer` strings, and the `pixel_values`, `input_ids`,
            `attention_mask` and `labels` tensors (batch dimension removed).
        """
        item = self.samples[idx]
        image_path = os.path.join(self.image_dir, f"{item['image_path']}")

        # Close the file handle as soon as the RGB copy has been made.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')

        # Process inputs
        # NOTE(review): no max_length is given here, so the padded length is whatever
        # the processor's tokenizer defaults to — confirm this is intended.
        inputs = self.processor(
            images=image,
            text=item['question'],
            padding="max_length",
            return_tensors="pt"
        )

        # Process targets. Truncation keeps every label tensor at exactly
        # `max_length` tokens; without it an over-long answer yields a longer
        # tensor and batches can no longer be stacked.
        target = self.processor(
            text=item['answer'],
            padding="max_length",
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt"
        )

        # Remove batch dimension
        for k, v in inputs.items():
            inputs[k] = v.squeeze(0)

        labels = target.input_ids.squeeze(0)
        # Set padding tokens to -100 to ignore them in loss
        labels[labels == self.processor.tokenizer.pad_token_id] = -100

        return {
            "inputs" : inputs,
            "question" : item['question'],
            "answer" : item['answer'],
            "pixel_values": inputs.pixel_values,
            "input_ids": inputs.input_ids,
            "attention_mask": inputs.attention_mask,
            "labels": labels
        }

In [None]:
from peft import LoraConfig, get_peft_model

# LoRA adapter configuration: rank 16, alpha 32, dropout 0.05, applied to the
# modules listed in target_modules, with bias handling set to "none".
lora_config = LoraConfig(
    task_type="SEQ_2_SEQ_LM",
    r=16,
    lora_alpha=32,
    lora_dropout=0.05,
    target_modules=["v", "q"],
    bias="none",
)

# Wrap the loaded model with the adapters and report the trainable share.
peft_model = get_peft_model(model, lora_config)
print_trainable_parameters(peft_model)

In [None]:
def train(model, args):
    """
    Fine-tune `model` on the VQA data described by `args` and validate each epoch.

    Uses the notebook-level `processor` to build the datasets. After every epoch
    the model and processor are saved to `<args.output_dir>/best_model` whenever
    the mean validation loss improves.

    Args:
        model: Model to train; called with `input_ids`, `pixel_values`,
            `attention_mask` and `labels`, and expected to return an object
            with a `.loss` attribute.
        args: Configuration object providing `seed`, `train_image_dir`,
            `train_annotations`, `val_image_dir`, `val_annotations`,
            `max_length`, `batch_size`, `num_workers`, `learning_rate`,
            `weight_decay`, `num_epochs`, `clip_grad_norm` and `output_dir`.

    Returns:
        (train_losses, val_losses): per-batch loss values (floats) collected
        across all epochs for training and validation respectively.
    """
    seed_everything(args.seed)

    train_dataset = VQADataset(
        image_dir=args.train_image_dir,
        annotations_file=args.train_annotations,
        processor=processor,
        max_length=args.max_length
    )

    val_dataset = VQADataset(
        image_dir=args.val_image_dir,
        annotations_file=args.val_annotations,
        processor=processor,
        max_length=args.max_length
    )

    train_dataloader = DataLoader(
        train_dataset,
        batch_size=args.batch_size,
        shuffle=True,
        num_workers=args.num_workers
    )

    val_dataloader = DataLoader(
        val_dataset,
        batch_size=args.batch_size,
        shuffle=False,
        num_workers=args.num_workers
    )

    # Take the device from the model itself instead of relying on a notebook global.
    model_device = next(model.parameters()).device

    # Only parameters that require gradients are optimized and clipped.
    trainable_params = [p for p in model.parameters() if p.requires_grad]
    optimizer = optim.AdamW(trainable_params, lr=args.learning_rate, weight_decay=args.weight_decay)
    total_steps = len(train_dataloader) * args.num_epochs
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=total_steps)

    def compute_loss(batch):
        """Move one batch to the model's device and return the model's loss."""
        outputs = model(
            input_ids=batch['input_ids'].to(model_device),
            pixel_values=batch['pixel_values'].to(model_device),
            # Questions are padded, so the mask is passed in training as well as
            # validation; otherwise padding tokens are attended only while training.
            attention_mask=batch['attention_mask'].to(model_device),
            labels=batch['labels'].to(model_device),
        )
        return outputs.loss

    best_val_loss = float('inf')
    train_losses = []
    val_losses = []

    for epoch in range(args.num_epochs):
        model.train()
        train_loss = 0.0

        # Iterate the progress bar itself so the displayed loss tracks the batch being processed.
        train_pbar = tqdm(train_dataloader, desc=f"Epoch {epoch+1}/{args.num_epochs} [Train]")
        for batch in train_pbar:
            loss = compute_loss(batch)

            optimizer.zero_grad()
            loss.backward()

            if args.clip_grad_norm > 0:
                nn.utils.clip_grad_norm_(trainable_params, args.clip_grad_norm)

            optimizer.step()
            scheduler.step()

            loss_value = float(loss.item())
            train_losses.append(loss_value)
            train_loss += loss_value
            train_pbar.set_postfix({"loss": loss_value})

        # max(..., 1) guards against an empty dataloader.
        train_loss /= max(len(train_dataloader), 1)

        model.eval()
        val_loss = 0.0
        val_pbar = tqdm(val_dataloader, desc=f"Epoch {epoch+1}/{args.num_epochs} [Val]")
        # No gradients are needed for validation; skipping them saves memory and time.
        with torch.no_grad():
            for batch in val_pbar:
                loss_value = float(compute_loss(batch).item())
                val_losses.append(loss_value)
                val_loss += loss_value
                val_pbar.set_postfix({"loss": loss_value})

        val_loss /= max(len(val_dataloader), 1)

        print(f"Epoch {epoch+1}/{args.num_epochs} - Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            print(f"Saving best model with validation loss: {val_loss:.4f}")
            best_model_dir = os.path.join(args.output_dir, "best_model")
            model.save_pretrained(best_model_dir)
            processor.save_pretrained(best_model_dir)
    return train_losses, val_losses

In [None]:
import os
class Args:
    """
    Plain configuration container for training and inference settings.

    Every option has a default; any of them can be replaced by passing it as a
    keyword argument, e.g. `Args(batch_size=2, num_epochs=1)`.
    """

    def __init__(self, **overrides):
        """
        Args:
            **overrides: Optional replacements for the default attribute values.

        Raises:
            TypeError: If an override names an option that does not exist.
        """
        # General parameters
        self.seed = 47
        self.output_dir = "./peft_t_aug"

        # Data parameters
        self.train_image_dir = "datasets_peft"
        self.train_annotations = "datasets_peft/augmented.json"
        self.val_image_dir = "datasets_peft"
        self.val_annotations = "datasets_peft/test_label.json"
        self.max_length = 32

        # Training parameters
        self.num_epochs = 10
        self.batch_size = 8
        self.learning_rate = 5e-4
        self.weight_decay = 0.01
        self.clip_grad_norm = 1.0
        self.num_workers = 1
        self.save_every = 1

        # Inference parameters
        # NOTE(review): model_path points at a different directory than output_dir — confirm.
        self.model_path = "peft_blip2flant5_vqa_finetuned/best_model"
        self.test_image = "datasets_peft/bedroom/8_cuarto.jpg"
        self.test_question = "Question: How to reach the bed? Answer:"
        self.num_beams = 5
        self.do_sample = False
        self.top_p = 0.9
        self.temperature = 1.0
        self.repetition_penalty = 1.0

        # Apply caller-supplied overrides; reject typos instead of silently adding attributes.
        for name, value in overrides.items():
            if not hasattr(self, name):
                raise TypeError(f"Args got an unexpected option {name!r}")
            setattr(self, name, value)

# For training
train_args = Args()

# Create the output directory named in the configuration, so the directory
# that exists is always the one training saves into.
os.makedirs(train_args.output_dir, exist_ok=True)

run3_train_losses, run3_val_losses = train(peft_model, train_args)

In [None]:
# Destination for the per-batch training losses (one value per line).
file_path = "./peft_t_aug/Train_loss.txt"

# Write every recorded loss followed by a newline.
with open(file_path, 'w') as file:
    file.writelines(f"{item}\n" for item in run3_train_losses)

print(f"List saved to {file_path}")

In [None]:
# Destination for the per-batch validation losses (one value per line).
file_path = "./peft_t_aug/Val_loss.txt"

# Write every recorded loss followed by a newline.
with open(file_path, 'w') as file:
    file.writelines(f"{item}\n" for item in run3_val_losses)

print(f"List saved to {file_path}")

In [None]:

import matplotlib.pyplot as plt


args = Args()

# Derive the x axes from the recorded losses; fixed ranges only fit one specific
# combination of dataset size, batch size and epoch count and fail otherwise.
train_steps = range(1, len(run3_train_losses) + 1)
val_steps = range(1, len(run3_val_losses) + 1)

plt.figure(figsize=(10, 5))
plt.plot(train_steps, run3_train_losses, label="Training Loss", marker="o")
plt.plot(val_steps, run3_val_losses, label="Validation Loss", marker="o")
plt.xlabel("Iterations")
plt.ylabel("Loss")
plt.title("Training and Validation Loss")
plt.legend()
plt.grid()

# Save the figure next to the other training artefacts.
loss_plot_path = os.path.join(args.output_dir, "training_loss_plot.png")
plt.savefig(loss_plot_path)
print(f"Loss plot saved at {loss_plot_path}")

In [None]:
# Import required packages
import numpy as np
import matplotlib.pyplot as plt
from bert_score import score
import pandas as pd
import torch
import re
import spacy
from collections import Counter

# Load spaCy for linguistic analysis
# The functions below call `.similarity()` and `.has_vector` on objects produced by
# this pipeline. NOTE(review): presumably the "md" model is chosen because it ships
# word vectors — confirm the model is installed in the environment.
nlp = spacy.load('en_core_web_md')

def build_directional_synonym_dictionary():
    """
    Build the directional vocabulary used by the directional scoring functions.

    Terms are organised as category -> synonym group -> list of terms, where all
    terms in one group are treated as functionally equivalent.

    Returns:
        tuple: (directional_dict, flat_dict)
            directional_dict maps each category name to its {group_name: terms} mapping.
            flat_dict maps every group name directly to the same list of terms,
            without the category level.
    """
    directional_dict = {
        # Basic directional terms with synonym groups
        'basic_directions': {
            'left_group': ['left', 'leftward', 'leftside', 'to the left'],
            'right_group': ['right', 'rightward', 'rightside', 'to the right'],
            'front_group': ['front', 'ahead', 'forward', 'straight ahead', 'directly ahead',
                            'in front', 'straight forward', 'onward'],
            'back_group': ['back', 'behind', 'backward', 'rear', 'backwards'],
            'up_group': ['up', 'upward', 'upper', 'above', 'upstairs'],
            'down_group': ['down', 'downward', 'lower', 'below', 'downstairs'],
            'center_group': ['center', 'middle', 'central'],
        },

        # Movement verbs with expanded synonym groups
        'movement_verbs': {
            'walk_group': ['walk', 'move', 'go', 'proceed', 'head', 'continue', 'step',
                           'advance', 'progress', 'press on', 'forge ahead'],
            'turn_group': ['turn', 'rotate', 'pivot', 'veer', 'swing', 'swerve', 'wheel'],
            'follow_group': ['follow', 'take', 'use', 'pursue'],
            'find_group': ['find', 'locate', 'look for', 'seek', 'spot', 'identify'],
            'enter_exit_group': ['enter', 'exit', 'access', 'leave', 'go in', 'go out'],
            'pass_group': ['pass', 'bypass', 'go past', 'move past', 'overtake'],
            'sit_group': ['sit', 'take a seat', 'be seated'],
        },

        # Relative positions with synonym groups
        'relative_positions': {
            'left_side_group': ['on your left', 'to your left', 'on the left', 'to the left side'],
            'right_side_group': ['on your right', 'to your right', 'on the right', 'to the right side'],
            'front_side_group': ['in front of you', 'directly in front', 'straight ahead of you',
                                 'directly ahead', 'straight in front'],
            'back_side_group': ['behind you', 'to your rear', 'at your back'],
            'near_group': ['near', 'close to', 'nearby', 'adjacent to', 'next to', 'beside'],
            'beyond_group': ['beyond', 'past', 'further than', 'after'],
            'between_group': ['between', 'amid', 'among', 'in between'],
            'along_group': ['along', 'alongside', 'by', 'across'],
        },

        # Compound directions with expanded synonym groups
        'compound_directions': {
            'turn_left_group': ['turn left', 'make a left', 'make a left turn', 'veer left',
                                'take a left', 'take a left turn', 'hang a left', 'bear left'],
            'turn_right_group': ['turn right', 'make a right', 'make a right turn', 'veer right',
                                 'take a right', 'take a right turn', 'hang a right', 'bear right'],
            'go_straight_group': ['go straight', 'continue straight', 'proceed straight',
                                  'head straight', 'walk straight', 'move straight ahead',
                                  'move forward', 'advance forward'],
            'turn_around_group': ['turn around', 'turn back', 'rotate 180 degrees', 'make a u-turn'],
            'walk_toward_group': ['walk toward', 'move toward', 'proceed toward', 'head toward',
                                  'go toward', 'approach', 'walk to', 'move to'],
            'move_right_group': ['move right', 'go right', 'head right', 'proceed right'],
            'move_left_group': ['move left', 'go left', 'head left', 'proceed left'],
        },

        # Landmark references often used with directions
        'landmark_references': {
            'counter_group': ['counter', 'bar counter', 'bar', 'desk'],
            'door_group': ['door', 'doorway', 'entrance', 'exit', 'gateway'],
            'seating_group': ['seat', 'chair', 'stool', 'bench', 'seating area', 'seating section'],
            'wall_group': ['wall', 'side wall', 'back wall', 'front wall'],
            'room_group': ['room', 'area', 'section', 'space', 'hall'],
            'table_group': ['table', 'desk', 'workspace'],
            'stage_group': ['stage', 'platform', 'podium'],
            'aisle_group': ['aisle', 'corridor', 'pathway', 'walkway'],
        },

        # Navigation action synonyms
        'navigation_actions': {
            'proceed_group': ['proceed', 'continue', 'advance', 'progress', 'move on',
                              'go ahead', 'move forward', 'press on', 'forge ahead'],
            'navigate_group': ['navigate', 'guide', 'steer', 'pilot', 'direct', 'maneuver'],
        }
    }

    # Drop the category level: group name -> terms (the lists are shared, not copied).
    flat_dict = {
        group_name: terms
        for group_dict in directional_dict.values()
        for group_name, terms in group_dict.items()
    }

    return directional_dict, flat_dict

def detect_directional_terms_with_spacy(text, flat_dict, nlp):
    """
    Find directional terms in `text` and the synonym group each belongs to.

    Detection runs in two stages:
      1. Multi-word dictionary terms are matched as substrings of the lowercased
         text; each match is blanked out so a later multi-word term cannot match
         the same characters again.
      2. Each token of the lowercased text is matched exactly against the
         dictionary. Only when no group contains the token is it compared, via
         vector similarity, with the single-word terms of every group; the first
         group exceeding its threshold wins.

    Each token contributes at most one (term, group) pair.

    Args:
        text (str): Text to analyse.
        flat_dict (dict): Mapping of group name -> list of terms.
        nlp: Callable returning a spaCy-like object that is iterable over tokens
            (with `.text`) and exposes `.has_vector` and `.similarity()`.

    Returns:
        list[tuple[str, str]]: (term, group_name) pairs in detection order.
    """
    doc = nlp(text.lower())
    found_terms = []

    # First check for multi-word terms
    text_lower = text.lower()
    for group_name, terms in flat_dict.items():
        for term in terms:
            if ' ' in term and term in text_lower:
                found_terms.append((term, group_name))
                # Mark the text to avoid double-counting among multi-word terms
                text_lower = text_lower.replace(term, ' '*len(term))

    # Parse each distinct word only once per call; parsing is the expensive part.
    parsed_words = {}

    def parse_word(word):
        if word not in parsed_words:
            parsed_words[word] = nlp(word)
        return parsed_words[word]

    def similarity_threshold(group_name):
        # Lower threshold for these specific groups
        if group_name in ['move_right_group', 'move_left_group', 'go_straight_group']:
            return 0.75
        # NOTE(review): group names built by build_directional_synonym_dictionary
        # all end in '_group', so the 0.8 fallback is effectively unused.
        if group_name.startswith('turn_') or group_name.endswith('_group'):
            return 0.85
        return 0.8

    for token in doc:
        term = token.text

        # Exact dictionary hits take priority and are recorded exactly once.
        exact_group = next(
            (group_name for group_name, terms in flat_dict.items() if term in terms),
            None,
        )
        if exact_group is not None:
            found_terms.append((term, exact_group))
            continue

        token_doc = parse_word(term)
        if not token_doc.has_vector:
            continue

        # Fuzzy fallback: compare against the single-word terms of every group.
        fuzzy_group = None
        for group_name, terms in flat_dict.items():
            threshold = similarity_threshold(group_name)
            for dict_term in terms:
                if ' ' in dict_term:
                    continue
                dict_doc = parse_word(dict_term)
                if dict_doc.has_vector and token_doc.similarity(dict_doc) > threshold:
                    fuzzy_group = group_name
                    break
            if fuzzy_group is not None:
                break

        if fuzzy_group is not None:
            found_terms.append((term, fuzzy_group))

    return found_terms

def has_directional_conflict(ref_text, cand_text):
    """
    Decide whether the candidate gives a direction opposite to the reference.

    A pair of opposing terms conflicts when one text contains only the first
    term of the pair and the other text contains only the second. Single words
    are matched on word boundaries, phrases as plain substrings. A few fixed
    phrase combinations are checked as well.

    Args:
        ref_text: Reference (ground truth) text
        cand_text: Candidate (prediction) text

    Returns:
        True if a conflicting direction is found, otherwise False
    """
    ref_lower = ref_text.lower()
    cand_lower = cand_text.lower()

    def has_word(word, text):
        # Whole-word match
        return re.search(r'\b' + re.escape(word) + r'\b', text) is not None

    def has_phrase(phrase, text):
        # Plain substring match
        return phrase in text

    def is_contradicted(first, second, contains):
        ref_first, ref_second = contains(first, ref_lower), contains(second, ref_lower)
        cand_first, cand_second = contains(first, cand_lower), contains(second, cand_lower)
        ref_only_first = ref_first and not ref_second
        ref_only_second = ref_second and not ref_first
        cand_only_first = cand_first and not cand_second
        cand_only_second = cand_second and not cand_first
        return (ref_only_first and cand_only_second) or (ref_only_second and cand_only_first)

    # Single-word opposites
    word_opposites = (
        ('left', 'right'),
        ('front', 'back'),
        ('ahead', 'behind'),
        ('forward', 'backward'),
        ('up', 'down'),
    )
    if any(is_contradicted(a, b, has_word) for a, b in word_opposites):
        return True

    # Compound directional opposites
    phrase_opposites = (
        ('turn left', 'turn right'),
        ('on your left', 'on your right'),
        ('to your left', 'to your right'),
    )
    if any(is_contradicted(a, b, has_phrase) for a, b in phrase_opposites):
        return True

    # Turn around (180°) in the reference vs. a 360° rotation in the candidate
    if 'turn around' in ref_lower and '360' in cand_lower:
        return True

    # Mirrored phrase pairs: front vs back position, behind vs in front
    mirrored_phrases = (
        ('at the back', 'at the front'),
        ('behind you', 'in front of'),
    )
    for one, other in mirrored_phrases:
        if one in ref_lower and other in cand_lower:
            return True
        if other in ref_lower and one in cand_lower:
            return True

    return False

def extract_directional_sequence(doc):
    """
    Extract the movement verbs of a parsed text together with their directions.

    Tokens are visited in the order `doc` yields them. For every token whose
    lemma is a known movement verb, the texts of its direct children with
    dependency label 'prep' or 'dobj' are joined with spaces to form the
    direction (an empty string when there are none).

    Args:
        doc: Iterable of spaCy-like tokens exposing `lemma_`, `children`,
            and, on the children, `dep_` and `text`.

    Returns:
        list[tuple[str, str]]: (verb_lemma, direction) pairs in token order.
    """
    directional_verbs = {'walk', 'move', 'go', 'turn', 'proceed', 'head', 'continue', 'enter'}

    sequence = []
    for token in doc:
        if token.lemma_ in directional_verbs:
            direction = ' '.join(child.text for child in token.children
                                 if child.dep_ in ('prep', 'dobj'))
            sequence.append((token.lemma_, direction))

    return sequence



def calculate_directional_flow_similarity(ref_text, cand_text, nlp):
    """
    Calculate similarity in directional flow/sequence between two texts using spaCy

    Scoring steps, in order:
      * 0.0 when `has_sequence_conflict` reports a conflict.
      * 0.2 when both texts have at least two movement steps and their first
        two verbs differ.
      * 0.1 when the first two verbs agree but the first two directions are
        genuinely swapped (both non-empty, not identical, each contained in the
        other text's opposite position). Identical or empty directions are not
        treated as a swap.
      * 0.5 when either text has no movement steps.
      * Otherwise the mean, over the aligned steps, of the average of verb
        similarity and direction similarity (0 when a vector is missing).

    Args:
        ref_text: Reference text
        cand_text: Candidate text
        nlp: spaCy NLP model

    Returns:
        Float from 0.0 to 1.0 indicating flow similarity
    """
    # Zero score for sequence conflicts
    if has_sequence_conflict(ref_text, cand_text, nlp):
        return 0.0

    # Extract directional sequences from the parsed texts
    ref_sequence = extract_directional_sequence(nlp(ref_text.lower()))
    cand_sequence = extract_directional_sequence(nlp(cand_text.lower()))

    if len(ref_sequence) >= 2 and len(cand_sequence) >= 2:
        ref_verbs = [verb for verb, _ in ref_sequence[:2]]
        cand_verbs = [verb for verb, _ in cand_sequence[:2]]

        # Hard penalty when the first two actions differ
        if ref_verbs != cand_verbs:
            return 0.2

        ref_dirs = [direction for _, direction in ref_sequence[:2]]
        cand_dirs = [direction for _, direction in cand_sequence[:2]]

        # An empty string is a substring of everything and equal directions are
        # trivially "contained" in each other, so both cases are excluded.
        directions_swapped = (
            all(ref_dirs)
            and ref_dirs != cand_dirs
            and ref_dirs[0] in cand_dirs[1]
            and ref_dirs[1] in cand_dirs[0]
        )
        if directions_swapped:
            return 0.1

    # If no sequences found, return neutral score
    if not ref_sequence or not cand_sequence:
        return 0.5

    # Parse each distinct verb/direction string only once.
    parsed = {}

    def vector_similarity(left, right):
        for text in (left, right):
            if text not in parsed:
                parsed[text] = nlp(text)
        left_doc, right_doc = parsed[left], parsed[right]
        if left_doc.has_vector and right_doc.has_vector:
            return left_doc.similarity(right_doc)
        return 0

    # Compare the sequences step by step; zip stops at the shorter one.
    step_scores = []
    for (ref_verb, ref_dir), (cand_verb, cand_dir) in zip(ref_sequence, cand_sequence):
        verb_sim = vector_similarity(ref_verb, cand_verb)
        dir_sim = vector_similarity(ref_dir, cand_dir)
        step_scores.append((verb_sim + dir_sim) / 2)

    # Normalize by the number of compared steps
    return sum(step_scores) / len(step_scores)

def has_sequence_conflict(ref_text, cand_text, nlp):
    """
    Check if there are conflicting sequence orders in the instructions

    A conflict is reported only when all of the following hold:
      * at least one of the texts contains a temporal marker
        ('then', 'after', 'next', 'first', 'second'; substring match),
      * both texts contain at least two movement steps, and
      * the first two verbs of the candidate are the first two verbs of the
        reference in swapped order (and the two orders actually differ).

    Args:
        ref_text: Reference (ground truth) text
        cand_text: Candidate (prediction) text
        nlp: spaCy NLP model

    Returns:
        Boolean indicating if sequence order conflicts exist
    """
    temporal_markers = ('then', 'after', 'next', 'first', 'second')

    ref_lower = ref_text.lower()
    cand_lower = cand_text.lower()

    # Without a temporal marker no conflict is ever reported, so skip parsing.
    has_temporal = any(marker in ref_lower or marker in cand_lower
                       for marker in temporal_markers)
    if not has_temporal:
        return False

    ref_sequence = extract_directional_sequence(nlp(ref_lower))
    cand_sequence = extract_directional_sequence(nlp(cand_lower))

    if len(ref_sequence) < 2 or len(cand_sequence) < 2:
        return False

    ref_verbs = [verb for verb, _ in ref_sequence[:2]]
    cand_verbs = [verb for verb, _ in cand_sequence[:2]]

    # A repeated verb (e.g. go, go) reads the same reversed; that is agreement,
    # not a reversal, so the two orders must actually differ.
    return ref_verbs != cand_verbs and ref_verbs == cand_verbs[::-1]

def calculate_enhanced_directional_score(ref_text, cand_text, directional_dict, flat_dict, nlp):
    """
    Score how well the candidate's directional content matches the reference.

    The score combines weighted overlap of detected synonym groups (0.4),
    directional flow similarity (0.3), whole-text spaCy similarity (0.1) and a
    fixed boost when both texts use the same move-left/right or turn-left/right
    group. The result is clamped to [0, 1].

    Args:
        ref_text: Reference (ground truth) text
        cand_text: Candidate (prediction) text
        directional_dict: Mapping of category -> {group_name: terms}
        flat_dict: Mapping of group_name -> terms
        nlp: spaCy NLP model

    Returns:
        tuple: (score, ref_terms, cand_terms, matched_groups). When a
        directional or sequence conflict is detected the result is
        (0.0, [], [], []); when the reference has no directional terms it is
        (1.0, [], [], []).
    """
    # Any conflict short-circuits to a zero score.
    conflict = (has_directional_conflict(ref_text, cand_text)
                or has_sequence_conflict(ref_text, cand_text, nlp))
    if conflict:
        return 0.0, [], [], []

    # (term, group) pairs detected in each text
    ref_hits = detect_directional_terms_with_spacy(ref_text, flat_dict, nlp)
    cand_hits = detect_directional_terms_with_spacy(cand_text, flat_dict, nlp)

    # Nothing directional to get wrong in the reference
    if not ref_hits:
        return 1.0, [], [], []

    ref_terms = [hit[0] for hit in ref_hits]
    cand_terms = [hit[0] for hit in cand_hits]
    ref_groups = [hit[1] for hit in ref_hits]
    cand_groups = [hit[1] for hit in cand_hits]

    # Occurrences per semantic group, and the union of groups seen in either text
    ref_group_counts = Counter(ref_groups)
    cand_group_counts = Counter(cand_groups)
    all_groups = set(ref_group_counts.keys()) | set(cand_group_counts.keys())

    # Functionally equivalent phrasing: "move right"/"go right" and
    # "turn left"/"take a left turn" style pairs share a group.
    move_groups = ('move_right_group', 'move_left_group')
    turn_groups = ('turn_left_group', 'turn_right_group')
    move_go_equivalence = any(g in ref_groups and g in cand_groups for g in move_groups)
    turn_take_equivalence = any(g in ref_groups and g in cand_groups for g in turn_groups)
    special_case_boost = 0.3 if (move_go_equivalence or turn_take_equivalence) else 0.0

    # Relative importance of each category
    category_weights = {
        'basic_directions': 1.0,
        'compound_directions': 1.5,
        'relative_positions': 0.9,
        'movement_verbs': 1.2,
        'landmark_references': 0.7,
        'navigation_actions': 1.1,
    }

    # Reverse lookup: group name -> category name
    group_to_category = {
        group_name: category
        for category, group_dict in directional_dict.items()
        for group_name in group_dict
    }

    total_weight = 0
    matched_weight = 0
    matched_groups = []

    for group in all_groups:
        # Unknown groups are treated as basic directions
        category = group_to_category.get(group, 'basic_directions')
        weight = category_weights.get(category, 1.0)

        # 50% more weight for the groups that triggered an equivalence
        boosted = ((move_go_equivalence and group in move_groups)
                   or (turn_take_equivalence and group in turn_groups))
        if boosted:
            weight *= 1.5

        ref_count = ref_group_counts[group]
        cand_count = cand_group_counts[group]
        overlap = min(ref_count, cand_count)

        total_weight += weight * max(ref_count, cand_count)
        if overlap > 0:
            matched_weight += weight * overlap
            matched_groups.append(group)

    similarity = matched_weight / total_weight if total_weight > 0 else 0.0

    # Flow similarity of the movement sequences
    flow_bonus = calculate_directional_flow_similarity(ref_text, cand_text, nlp)

    # Whole-text semantic similarity
    semantic_similarity = nlp(ref_text).similarity(nlp(cand_text))

    # Combine all components, including the special case boost
    final_score = 0.4 * similarity + 0.3 * flow_bonus + 0.1 * semantic_similarity + special_case_boost

    # Extra boost for very short, functionally equivalent instructions
    is_short_pair = len(ref_text.split()) <= 3 and len(cand_text.split()) <= 4
    if is_short_pair and special_case_boost > 0:
        final_score += 0.1

    # Clamp to [0, 1]
    final_score = max(0.0, min(1.0, final_score))

    return final_score, ref_terms, cand_terms, matched_groups


def enhanced_directional_weighted_bertscore(refs, cands, directional_weight, model_name="microsoft/deberta-xlarge-mnli"):
    """
    Blend standard BERTScore F1 with the spaCy-based enhanced directional score

    Each pair's blended value is
    (1 - directional_weight) * BERT F1 + directional_weight * directional score.
    Directional and sequence conflicts are recorded per pair but do not
    change the blended value.

    Args:
        refs: List of reference (ground truth) texts
        cands: List of candidate (prediction) texts, paired with refs by position
        directional_weight: Weight to give directional terms (0.0-1.0)
        model_name: Model identifier passed to BERTScore

    Returns:
        DataFrame with one row per pair (scores, conflict flags, directional
        terms and matched semantic groups)
    """
    print(f"Computing Enhanced Directional-Weighted BERTScore with {model_name} and spaCy...")

    # Category -> group -> terms mapping, plus the flat lookup used by the scorer
    directional_dict, flat_dict = build_directional_synonym_dictionary()

    # Vocabulary summary: per-category counts and the first two groups of each
    print("\nEnhanced Directional Dictionary:")
    term_total = 0
    group_total = 0
    for category, groups in directional_dict.items():
        n_terms = sum(map(len, groups.values()))
        term_total += n_terms
        group_total += len(groups)
        print(f"  {category}: {len(groups)} groups, {n_terms} terms")
        for group_name, terms in list(groups.items())[:2]:
            ellipsis = '...' if len(terms) > 3 else ''
            print(f"    {group_name}: {', '.join(terms[:3])}{ellipsis}")

    print(f"  Total: {term_total} directional terms in {group_total} semantic groups")

    # Standard BERTScore for the whole batch, converted to numpy arrays
    bert_p, bert_r, bert_f1 = (
        tensor.numpy()
        for tensor in score(cands, refs, lang="en", model_type=model_name, verbose=True)
    )

    # Per-pair accumulators
    dir_scores, blended_scores = [], []
    dir_conflicts, seq_conflicts = [], []
    ref_terms_per_pair, cand_terms_per_pair, groups_per_pair = [], [], []

    for idx, (ref, cand) in enumerate(zip(refs, cands)):
        # Directional score (spaCy-based) and the terms/groups behind it
        pair_score, pair_ref_terms, pair_cand_terms, pair_groups = calculate_enhanced_directional_score(
            ref, cand, directional_dict, flat_dict, nlp
        )
        dir_scores.append(pair_score)
        ref_terms_per_pair.append(pair_ref_terms)
        cand_terms_per_pair.append(pair_cand_terms)
        groups_per_pair.append(pair_groups)

        # Conflict flags: opposing directions first, then ordering of steps
        dir_conflicts.append(has_directional_conflict(ref, cand))
        seq_conflicts.append(has_sequence_conflict(ref, cand, nlp))

        # Linear blend of BERT F1 and the directional score
        blended_scores.append((1 - directional_weight) * bert_f1[idx] + directional_weight * pair_score)

    # Assemble the per-pair table
    results_df = pd.DataFrame({
        'Pair_ID': range(1, len(refs) + 1),
        'Ground Truth': refs,
        'Prediction': cands,
        'BERT_Precision': bert_p,
        'BERT_Recall': bert_r,
        'BERT_F1': bert_f1,
        'Enhanced_Directional_Score': dir_scores,
        'Directional_Conflict': dir_conflicts,
        'Weighted_F1': blended_scores,
        'Ref_Directional_Terms': list(map(', '.join, ref_terms_per_pair)),
        'Cand_Directional_Terms': list(map(', '.join, cand_terms_per_pair)),
        'Matched_Semantic_Groups': list(map(', '.join, groups_per_pair)),
        'Sequence_Conflict': seq_conflicts
    })

    # Batch-level summary
    print(f"\nAverage BERT F1 Score: {np.mean(bert_f1):.4f}")
    print(f"Average Enhanced Directional Score: {np.mean(dir_scores):.4f}")
    print(f"Average Weighted F1 Score: {np.mean(blended_scores):.4f}")
    print(f"Number of directional conflicts detected: {sum(dir_conflicts)}")

    return results_df

def visualize_results(results_df, directional_weight):
    """
    Plot BERT F1, directional score and weighted F1 side by side for every pair

    Weighted-F1 bars are drawn in red for pairs flagged with a directional
    conflict. Dashed horizontal lines mark the mean of each metric. If any
    pair has a directional or a sequence conflict, the affected Pair_IDs are
    listed in a note at the bottom of the figure (the two kinds of conflict
    are reported independently of each other).

    Args:
        results_df: DataFrame with results; must contain the columns
            'Pair_ID', 'BERT_F1', 'Enhanced_Directional_Score', 'Weighted_F1',
            'Directional_Conflict' and 'Sequence_Conflict'
        directional_weight: Weight given to directional terms (shown in the title)

    Returns:
        None (displays visualization)
    """
    # Boolean masks for the two conflict kinds; IDs come from Pair_ID so the
    # note always matches the x tick labels
    dir_conflict_mask = results_df['Directional_Conflict'].astype(bool)
    seq_conflict_mask = results_df['Sequence_Conflict'].astype(bool)
    dir_conflict_ids = results_df.loc[dir_conflict_mask, 'Pair_ID'].tolist()
    seq_conflict_ids = results_df.loc[seq_conflict_mask, 'Pair_ID'].tolist()

    plt.figure(figsize=(15, 7))

    # Grouped bars: one group of three per pair, weighted F1 colour-coded by conflict
    bar_colors = ['red' if conflict else 'skyblue' for conflict in dir_conflict_mask]
    x = np.arange(len(results_df))
    width = 0.3

    plt.bar(x - width, results_df['BERT_F1'], width=width, label='BERT F1', color='skyblue', alpha=0.7)
    plt.bar(x, results_df['Enhanced_Directional_Score'], width=width, label='Enhanced Directional Score', color='green', alpha=0.7)
    plt.bar(x + width, results_df['Weighted_F1'], width=width, label='Weighted F1', color=bar_colors, alpha=0.7)

    # Average score lines
    avg_bert_f1 = results_df['BERT_F1'].mean()
    avg_dir_score = results_df['Enhanced_Directional_Score'].mean()
    avg_weighted_f1 = results_df['Weighted_F1'].mean()

    plt.axhline(y=avg_bert_f1, color='skyblue', linestyle='--', label=f'Avg BERT F1: {avg_bert_f1:.4f}')
    plt.axhline(y=avg_dir_score, color='green', linestyle='--', label=f'Avg Directional: {avg_dir_score:.4f}')
    plt.axhline(y=avg_weighted_f1, color='red', linestyle='--', label=f'Avg Weighted F1: {avg_weighted_f1:.4f}')

    plt.xlabel('Pair ID')
    plt.ylabel('Score')
    plt.title(f'Enhanced BERTScore with Directional Weighting (Weight={directional_weight})')
    plt.xticks(x, results_df['Pair_ID'])
    plt.ylim(0, 1)
    plt.legend()
    plt.grid(axis='y', linestyle='--', alpha=0.7)

    # Conflict note: each kind is listed whenever it occurs, so sequence
    # conflicts are reported even when there are no directional conflicts
    conflict_notes = []
    if dir_conflict_ids:
        conflict_notes.append(f"Directional conflicts: {dir_conflict_ids}")
    if seq_conflict_ids:
        conflict_notes.append(f"Sequence conflicts: {seq_conflict_ids}")

    if conflict_notes:
        plt.figtext(0.5, 0.01, "\n".join(conflict_notes),
                    ha="center", fontsize=10, bbox={"facecolor": "orange", "alpha": 0.2, "pad": 5})
        # Keep a strip free at the bottom so the note does not cover the x-axis label
        plt.tight_layout(rect=(0, 0.1, 1, 1))
    else:
        plt.tight_layout()

    plt.show()


def analyze_directional_differences(results_df, n=3):
    """
    Analyze pairs with the largest differences between BERT F1 and directional scores

    The absolute difference is computed on a derived frame, so the DataFrame
    passed in is left untouched (no 'Score_Difference' column is added to it).

    Args:
        results_df: DataFrame with BERTScore results; must contain 'Pair_ID',
            'Ground Truth', 'Prediction', 'BERT_F1', 'Enhanced_Directional_Score',
            'Ref_Directional_Terms', 'Cand_Directional_Terms',
            'Matched_Semantic_Groups' and 'Directional_Conflict'
        n: Number of examples to show

    Returns:
        None (prints analysis)
    """
    # assign() returns a new frame, keeping this print-only helper free of side effects
    score_gap = (results_df['BERT_F1'] - results_df['Enhanced_Directional_Score']).abs()
    largest_diff = (
        results_df
        .assign(Score_Difference=score_gap)
        .sort_values(by='Score_Difference', ascending=False)
        .head(n)
    )

    print(f"\nTop {n} Pairs with Largest Differences Between BERT F1 and Directional Scores:")
    for _, row in largest_diff.iterrows():
        print(f"Pair {row['Pair_ID']} (Difference: {row['Score_Difference']:.4f}):")
        print(f"  Ground Truth: {row['Ground Truth']}")
        print(f"  Prediction: {row['Prediction']}")
        print(f"  BERT F1: {row['BERT_F1']:.4f}, Enhanced Directional Score: {row['Enhanced_Directional_Score']:.4f}")
        print(f"  Ground Truth Directional Terms: {row['Ref_Directional_Terms']}")
        print(f"  Prediction Directional Terms: {row['Cand_Directional_Terms']}")
        print(f"  Matched Semantic Groups: {row['Matched_Semantic_Groups']}")
        print(f"  Has Directional Conflict: {row['Directional_Conflict']}")
        print()

def evaluate_navigation_instructions(ground_truths, predictions, directional_weight, *, visualize=False, analyze=False):
    """
    Main function to evaluate navigation instructions

    Runs the directional-weighted BERTScore, prints the per-pair score table
    and optionally plots and/or analyzes the results.

    Args:
        ground_truths: List of ground truth instructions
        predictions: List of predicted instructions
        directional_weight: Weight to give directional terms
        visualize: If True, plot the results with visualize_results
            (off by default, matching the previous behaviour)
        analyze: If True, print the pairs where BERT F1 and the directional
            score disagree most via analyze_directional_differences
            (off by default, matching the previous behaviour)

    Returns:
        DataFrame with evaluation results
    """
    print("Evaluating Navigation Instructions with Directional-Weighted BERTScore\n")

    # Evaluate with directional weighting
    results = enhanced_directional_weighted_bertscore(
        ground_truths,
        predictions,
        directional_weight=directional_weight
    )

    # Display detailed results
    print("\nDetailed Evaluation Results:")
    # NOTE: this changes the pandas display option for the rest of the session,
    # so later displays of the returned frame show full text
    pd.set_option('display.max_colwidth', None)
    summary_columns = ['Pair_ID', 'BERT_F1', 'Enhanced_Directional_Score', 'Weighted_F1', 'Directional_Conflict']
    print(results[summary_columns].to_string(index=False))

    # Optional follow-up steps (previously only available by un-commenting code)
    if visualize:
        visualize_results(results, directional_weight)

    if analyze:
        analyze_directional_differences(results)

    return results

In [None]:
# Load the base BLIP-2 checkpoint from the local Hugging Face cache (fp16, on
# CUDA) and wrap it with the PEFT adapter stored under trained_model_path.
# NOTE(review): an earlier cell already loads the same checkpoint as `model`;
# loading it again here presumably keeps a second copy on the GPU - confirm
# this is intended.
from peft import PeftModel, PeftConfig
default_path = os.path.expanduser("~/.cache/huggingface/hub/models--Salesforce--blip2-flan-t5-xl/snapshots/0eb0d3b46c14c1f8c7680bca2693baafdb90bb28/")
base_model = Blip2ForConditionalGeneration.from_pretrained(default_path, torch_dtype=torch.float16, device_map="cuda")
trained_model_path = "./peft_t_aug/best_model/"
# Adapter configuration; printed for inspection in a later cell
config = PeftConfig.from_pretrained(trained_model_path)
trained_model_w_peft = PeftModel.from_pretrained(base_model, trained_model_path)
# Processor comes from the base checkpoint directory, not the adapter directory
processor = Blip2Processor.from_pretrained(default_path)
#device = next(trained_model_w_peft.parameters()).device
#print(f"Model is loaded on: {device}")

In [None]:
# Sample image for the single-example generation check; converted to 3-channel RGB
img_path = 'datasets_peft/bar/bar_0008.jpg'
image = Image.open(img_path).convert('RGB')

In [None]:
# Use the GPU when available, otherwise the CPU; this rebinds the notebook-level `device`
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Single-image sanity check: ask the adapter-wrapped model one question about
# `image` and print the decoded answer.
prompt = "Question: How to find the seating area. Answer:"
trained_model_w_peft.to(device)
trained_model_w_peft.eval()
# base_model.to(device)
# base_model.eval()
# Processor outputs are moved to `device` in float16 (the base model was loaded with torch_dtype=torch.float16)
inputs = processor(image, text=prompt, return_tensors="pt").to(device, torch.float16)
#print(inputs)
# Beam search without sampling (num_beams=5, do_sample=False); the same
# generation settings are used in evaluate_model
generated_ids = trained_model_w_peft.generate(**inputs,
    max_new_tokens=64,
    num_beams=5,
    min_length=1,
    do_sample=False,
    temperature=1.0,
    repetition_penalty=1,
    no_repeat_ngram_size=2
                                         
)
# Decode the first (only) sequence and drop special tokens / surrounding whitespace
generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
print(generated_text)

In [None]:
# Debug output for the single-image generation: inspect what went into and
# came out of generate().

# Shape of the tokenized prompt (falls back to a message if the processor returned no input_ids)
print(f"Input shape: {inputs.input_ids.shape if hasattr(inputs, 'input_ids') else 'No input_ids'}")

# Raw generated token IDs, before decoding
print(f"Generated IDs: {generated_ids}")

# Number of tokens in the first generated sequence
print(f"Generated IDs length: {len(generated_ids[0])}")

In [None]:
# Dump the base model configuration and the PEFT adapter configuration for inspection
print(f"Base model config: {base_model.config}")
print(f"PEFT config: {config.__dict__}")

In [None]:
def evaluate_model(model, dataloader):
    """
    Generate an answer for every sample in `dataloader` and collect the results

    Uses the notebook-level `processor` (decoding) and `device`. Every sample
    of a batch is decoded and recorded, so batch sizes above 1 are handled.

    Args:
        model: Model exposing eval() and generate(...)
        dataloader: Iterable of batches; each batch is a mapping with
            'inputs' (processor output exposing `pixel_values` and `input_ids`,
            optionally `attention_mask`), 'question' and 'answer'
            (sequences of strings, one entry per sample)

    Returns:
        Tuple (all_result, ground_truth, predictions):
            all_result: list of [question, answer, generated_text] per sample
            ground_truth: list of reference answers
            predictions: list of generated answers, aligned with ground_truth
    """
    all_result = []
    ground_truth = []
    predictions = []
    model.eval()
    for data in dataloader:
        inputs = data['inputs'].to(device, torch.float16)

        model_inputs = {
            'pixel_values': inputs.pixel_values,
            'input_ids': inputs.input_ids,
        }
        # Forward the padding mask when the processor produced one; without it
        # generate() treats padded prompt positions as real tokens
        attention_mask = getattr(inputs, 'attention_mask', None)
        if attention_mask is not None:
            model_inputs['attention_mask'] = attention_mask

        generated_ids = model.generate(
            **model_inputs,
            max_new_tokens=64,
            num_beams=5,
            min_length=1,
            do_sample=False,
            temperature=1.0,
            repetition_penalty=1,
            no_repeat_ngram_size=2
        )
        generated_texts = processor.batch_decode(generated_ids, skip_special_tokens=True)

        # Record every sample of the batch, not just the first one
        for question, answer, generated_text in zip(data['question'], data['answer'], generated_texts):
            generated_text = generated_text.strip()
            ground_truth.append(answer)
            predictions.append(generated_text)
            print("%30s |label|  %30s  |prediction|  %30s"%(question, answer, generated_text))
            all_result.append([question, answer, generated_text])
    return all_result, ground_truth, predictions
# Evaluation data: images under datasets_peft with annotations from
# datasets_peft/test_label.json, preprocessed with the notebook-level processor.
val_dataset = VQADataset(
        image_dir="datasets_peft",
        annotations_file="datasets_peft/test_label.json",
        processor=processor,
        max_length=32
    )
# One sample per batch, in dataset order (no shuffling)
val_dataloader = DataLoader(
    val_dataset,
    batch_size=1,
    shuffle=False,
    num_workers=1
)

In [None]:
# Run the adapter-wrapped model over the evaluation loader; keeps the result rows plus aligned answers/predictions
all_result, ground_truth, predictions = evaluate_model(trained_model_w_peft,val_dataloader)

In [None]:
# Score the predictions against the reference answers, weighting BERT F1 and the directional score equally (0.5)
bert_results = evaluate_navigation_instructions(ground_truth, predictions, directional_weight=0.5)