## Load image

In [None]:
!pip install --upgrade transformers
import os
from PIL import Image
from transformers import AutoProcessor, Blip2ForConditionalGeneration
import torch

# 1. Load the BLIP-2 processor and model from the "Salesforce/blip2-opt-2.7b" checkpoint.
processor = AutoProcessor.from_pretrained("Salesforce/blip2-opt-2.7b")
model = Blip2ForConditionalGeneration.from_pretrained(
    "Salesforce/blip2-opt-2.7b", 
    torch_dtype=torch.float16,  # request half-precision (fp16) weights
    device_map="auto"  # presumably lets the library pick device placement -- confirm in the transformers docs
)

In [None]:
# 2. Folder of frames to classify.
img_dir = '/home/DL_MILS/MILS_Final/AVA_Dataset/road/train/road_0003'
# Sorted so the batch order is reproducible (os.listdir order is arbitrary).
img_files = sorted(f for f in os.listdir(img_dir) if f.endswith('.jpg'))
if not img_files:
    # Fail early with a clear message instead of an obscure processor error.
    raise FileNotFoundError(f"No .jpg images found in {img_dir}")

# 3. Load every frame as RGB. convert() returns an independent in-memory copy,
#    so the underlying file can be closed as soon as the `with` block ends.
images = []
for img_file in img_files:
    with Image.open(os.path.join(img_dir, img_file)) as img:
        images.append(img.convert('RGB'))

# 4. Prompt sent with every image (edit as needed).
# question = "Is there any risk in this image?"
question = "Question: Is this road risky?(answer 1:risky, answer 0:not risky) Answer:"

# 5. Batched inference: one forward pass over all frames of the folder.
max_new_tokens = 1
inputs = processor(images=images, text=[question]*len(images), return_tensors="pt", padding=True).to(model.device)
with torch.no_grad():
    generated_ids = model.generate(**inputs, max_new_tokens=max_new_tokens)

# Depending on the transformers version, generate() may return the prompt
# tokens in front of the newly generated ones. The prompt itself contains the
# word "risky", so decoding it would make every frame look risky. With
# max_new_tokens == 1 the last column is always exactly the generated token.
generated_ids = generated_ids[:, -max_new_tokens:]
generated_texts = processor.batch_decode(generated_ids, skip_special_tokens=True)

# 6. Per-frame answer parsing. The folder is flagged (1) as soon as any single
#    frame is labeled 1.
def extract_label(text):
    """Map one decoded model answer to a binary risk label.

    Args:
        text: Decoded model output. It may be just the answer (e.g. "1",
            "risky") or the full prompt followed by the answer.

    Returns:
        1 if the answer says the road is risky, otherwise 0. Empty or
        unrecognized answers count as 0.
    """
    # Keep only what follows the last "Answer:" so that an echoed prompt
    # (which itself contains the word "risky") cannot influence the label.
    answer = text.rpartition('Answer:')[2]
    # Lower-case and collapse whitespace so "Not   Risky" is handled too.
    answer = ' '.join(answer.lower().split())
    if answer.startswith('1'):
        return 1
    # Explicit negative answers must win over the bare substring "risky".
    if answer.startswith('0') or 'not risky' in answer:
        return 0
    return 1 if 'risky' in answer else 0

# Parse every frame's answer, then flag the folder if at least one frame was flagged.
labels = list(map(extract_label, generated_texts))
video_label = int(any(labels))
print(f"Folder {os.path.basename(img_dir)} prediction(1:risk, 0:safe): {video_label} ")

In [None]:
# 2. Dataset roots to scan. The loop further down treats every sub-directory
#    of these roots as one folder of frames.
#    (Single-folder variant, kept commented out for reference.)
# img_dir = '/home/DL_MILS/MILS_Final/AVA_Dataset/road/train/road_0003'
# img_files = [f for f in os.listdir(img_dir) if f.endswith('.jpg')]
main_dirs = [
    '/home/DL_MILS/MILS_Final/AVA_Dataset/road/train',
    '/home/DL_MILS/MILS_Final/AVA_Dataset/freeway/train'
]
# 3. Image loading is done per folder inside the loop below.
# images = [Image.open(os.path.join(img_dir, f)).convert('RGB') for f in img_files]

# 4. Prompt sent with every image (edit as needed).
# question = "Is there any risk in this image?"
question = "Question: Is this road risky?(answer 1:risky, answer 0:not risky) Answer:"

# 5. Batched inference is done per folder inside the loop below.
# inputs = processor(images=images, text=[question]*len(images), return_tensors="pt", padding=True).to(model.device)
# with torch.no_grad():
#     generated_ids = model.generate(**inputs, max_new_tokens=1)
#     generated_texts = processor.batch_decode(generated_ids, skip_special_tokens=True)

# 6. Per-frame answer parsing. How the per-frame labels are combined into one
#    folder label is decided in the loop that follows this function.
def extract_label(text):
    """Map one decoded model answer to a binary risk label.

    Args:
        text: Decoded model output. It may be just the answer (e.g. "1",
            "risky") or the full prompt followed by the answer.

    Returns:
        1 if the answer says the road is risky, otherwise 0. Empty or
        unrecognized answers count as 0.
    """
    # Keep only what follows the last "Answer:" so that an echoed prompt
    # (which itself contains the word "risky") cannot influence the label.
    answer = text.rpartition('Answer:')[2]
    # Lower-case and collapse whitespace so "Not   Risky" is handled too.
    answer = ' '.join(answer.lower().split())
    if answer.startswith('1'):
        return 1
    # Explicit negative answers must win over the bare substring "risky".
    if answer.startswith('0') or 'not risky' in answer:
        return 0
    return 1 if 'risky' in answer else 0

# Classify every frame folder under each dataset root and print one label per folder.
max_new_tokens = 1
for main_dir in main_dirs:
    for folder in sorted(os.listdir(main_dir)):
        folder_path = os.path.join(main_dir, folder)
        if not os.path.isdir(folder_path):
            continue
        # Sorted so the batch order is reproducible (os.listdir order is arbitrary).
        img_files = sorted(f for f in os.listdir(folder_path) if f.endswith('.jpg'))
        if not img_files:
            # Nothing to classify in this folder.
            continue
        # convert() returns an independent in-memory copy, so each file can be
        # closed as soon as its `with` block ends.
        images = []
        for img_file in img_files:
            with Image.open(os.path.join(folder_path, img_file)) as img:
                images.append(img.convert('RGB'))
        # Batched inference: one forward pass over all frames of the folder.
        inputs = processor(images=images, text=[question]*len(images), return_tensors="pt", padding=True).to(model.device)
        with torch.no_grad():
            generated_ids = model.generate(**inputs, max_new_tokens=max_new_tokens)
        # Depending on the transformers version, generate() may return the
        # prompt tokens in front of the new ones. The prompt contains the word
        # "risky", so decoding it would make every frame look risky. With
        # max_new_tokens == 1 the last column is exactly the generated token.
        generated_ids = generated_ids[:, -max_new_tokens:]
        generated_texts = processor.batch_decode(generated_ids, skip_special_tokens=True)
        labels = [extract_label(t) for t in generated_texts]
        # NOTE(review): the active rule flags a folder only when ALL frames are
        # labeled 1; the alternatives below (any frame / majority vote) are
        # kept for experimentation -- confirm which rule is intended.
        video_label = 1 if all(labels) else 0
        # video_label = 1 if any(labels) else 0
        # video_label = 1 if sum(labels) / len(labels) >= 0.5 else 0
        print(f"Folder {folder} prediction(1:risk, 0:safe): {video_label}")