In [None]:
%%capture
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

!pip install pip3-autoremove
!pip install torch torchvision torchaudio xformers --index-url https://download.pytorch.org/whl/cu124
!pip install unsloth
!pip install --upgrade transformers==4.52.4
!pip install kagglehub --upgrade
!pip install keybert

In [None]:
from unsloth import FastVisionModel # FastLanguageModel for LLMs
from datasets import load_dataset
from keybert import KeyBERT
from PIL import Image

import torch
import json
import os

In [None]:
model, tokenizer = FastVisionModel.from_pretrained(
    "unsloth/Llama-3.2-11B-Vision-Instruct",
    load_in_4bit = True, # Use 4bit to reduce memory use. False for 16bit LoRA.
    use_gradient_checkpointing = "unsloth", # True or "unsloth" for long context
)

***Check tokenizer***

In [None]:
print("Tokenizer type:", type(tokenizer))

In [None]:
print(tokenizer.__class__)

***Dataset***

In [None]:
dataset = load_dataset("unsloth/Radiology_mini", split="train")

In [None]:
dataset

In [None]:
dataset[0]["image"]

In [None]:
dataset[0]["image_id"]

In [None]:
dataset[0]["cui"]

In [None]:
dataset[0]["caption"]

```python
{ "role": "user",
          "content" : [
            {"type" : "text",  "text"  : instruction}]
        },
        { "role" : "assistant",
          "content" : [
            {"type" : "text",  "text"  : sample["caption"]},
            { "type": "text", "text": "{{keywords}}" },
            {"type": "image", "image": sample["image"]}
          ]
        },
    ]
}
```

In [None]:
# Tạo thư mục lưu kết quả
output_jsonl_path = "/kaggle/working/radiology_conversations.jsonl"
image_dir = "/kaggle/working/images"
os.makedirs(image_dir, exist_ok=True)

In [None]:
instruction = (
    "Hãy đưa ra câu trả lời chính xác và chi tiết nhất cho truy vấn của người dùng. "
    "Trả lời dựa trên thông tin hình ảnh phù hợp đã tìm thấy, "
    "nhưng đừng hiển thị các từ khóa đã được phân tích."
)


def convert_to_conversation_for_training(image_path, caption, instruction):
    """Format cho việc training/lưu trữ data"""
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": instruction}
            ]
        },
        {
            "role": "assistant", 
            "content": [
                {"type": "text", "text": caption},
                {"type": "image", "image": image_path}
            ]
        },
    ]
    return {"messages": conversation}

def convert_to_conversation_for_inference(instruction, image_path=None):
    """Format cho việc inference với model"""
    conversation = [
        {
            "role": "user",
            "content": instruction
        }
    ]
    return {"messages": conversation}

# Ghi lần lượt vào file
with open(output_jsonl_path, "w", encoding="utf-8") as f_out:
    for i, sample in enumerate(dataset):
        image_id = sample["image_id"]
        caption = sample["caption"]
        image = sample["image"]

        # Lưu ảnh
        image_path = os.path.join(image_dir, f"{image_id}.png")
        image.save(image_path)

        # OPTIONAL: Trích xuất từ khóa
        keywords = kw_model.extract_keywords(
            caption,
            keyphrase_ngram_range=(1, 2),
            stop_words='english',
            top_n=3
        )
        # print(f"[{i}] Caption:", caption)
        # print("→ Keywords:", [kw for kw, _ in keywords])

        # # Format sample
        # convo = convert_to_conversation_for_training(image_path, caption, instruction)
        # convo["keywords"] = [kw for kw, _ in keywords]

        keyword_list = [kw for kw, score in keywords]
        print(f"[{i}] Caption: {caption[:100]}...")
        print(f"→ Keywords: {keyword_list}")
        
        # Format sample
        convo = convert_to_conversation_for_training(image_path, caption, instruction)
        convo["keywords"] = keyword_list
        convo["image_id"] = image_id

        # Ghi ra file
        f_out.write(json.dumps(convo, ensure_ascii=False) + "\n")

print(f"\nĐã lưu {len(dataset)} conversation vào: {output_jsonl_path}")

In [None]:
# from difflib import SequenceMatcher

# # Load dữ liệu từ file jsonl
# def load_conversation_json(jsonl_path):
#     dataset = []
#     with open(jsonl_path, "r", encoding="utf-8") as f:
#         for line in f:
#             dataset.append(json.loads(line.strip()))
#     return dataset

# # Tìm kiếm theo độ tương đồng keyword
# class SimpleSearchEngine:
#     def __init__(self, dataset):
#         self.dataset = dataset

#     def search(self, query):
#         def match_score(query, keywords):
#             return SequenceMatcher(None, query, " ".join(keywords)).ratio()

#         best = max(self.dataset, key=lambda sample: match_score(query, sample["keywords"]))
#         caption = best["messages"][1]["content"][0]["text"]
#         image_path = best["messages"][1]["content"][1]["image"]
#         image = Image.open(image_path)

#         return {
#             "caption": caption,
#             "image": image
#         }

In [None]:
# Tìm kiếm theo độ tương đồng keyword
class ImprovedSearchEngine:
    def __init__(self, dataset):
        self.dataset = dataset
    
    def calculate_similarity(self, query, keywords):
        """Tính độ tương đồng giữa query và keywords"""
        query_lower = query.lower().strip()
        max_score = 0
        
        for keyword in keywords:
            keyword_lower = keyword.lower().strip()
            
            # Exact match
            if query_lower == keyword_lower:
                return 1.0
            
            # Substring match
            if query_lower in keyword_lower or keyword_lower in query_lower:
                score = 0.8
            else:
                # Sequence similarity
                score = SequenceMatcher(None, query_lower, keyword_lower).ratio()
            
            max_score = max(max_score, score)
        
        return max_score
    
    def search(self, query, top_k=1):
        """Tìm kiếm theo query"""
        scored_results = []
        
        for sample in self.dataset:
            keywords = sample.get("keywords", [])
            score = self.calculate_similarity(query, keywords)
            
            # Lấy thông tin từ messages
            assistant_content = sample["messages"][1]["content"]
            caption = assistant_content[0]["text"]
            image_path = assistant_content[1]["image"]
            
            scored_results.append({
                "score": score,
                "caption": caption,
                "image_path": image_path,
                "keywords": keywords,
                "image_id": sample.get("image_id", "unknown")
            })
        
        # Sort theo score
        scored_results.sort(key=lambda x: x["score"], reverse=True)
        
        if top_k == 1:
            return scored_results[0] if scored_results else None
        else:
            return scored_results[:top_k]

In [None]:
# Load dataset và khởi tạo search engine
print("\nLoading processed dataset...")
dataset_path = "/kaggle/working/radiology_conversations.jsonl"  # sửa lại path đúng nếu khác
dataset = load_conversation_json(dataset_path)
search_engine = SimpleSearchEngine(dataset)

In [None]:
print(type(sample["image"]))

In [None]:
# FastVisionModel.for_inference(model) # Enable for inference!

# #image = dataset[0]["image"]
# #instruction = "You are an expert radiographer. Describe accurately what you see in this image."

# query = "cardiac thrombi"
# retrieved = search_engine.search(query)

# instruction = (
#     "Dưới đây là một hình ảnh y tế. Vui lòng phân tích nội dung hình ảnh và đưa ra câu trả lời phù hợp. "
#     "Không hiển thị từ khóa đã được phân tích."
# )
# #image = Image.open(retrieved["image"]).convert("RGB")

# from PIL import Image

# # Nếu retrieved["image"] là đường dẫn → load ảnh
# if isinstance(retrieved["image"], str):
#     image = Image.open(retrieved["image"]).convert("RGB")

# # Nếu nó là ảnh kiểu PngImageFile → convert lại
# elif isinstance(retrieved["image"], Image.Image):
#     image = retrieved["image"].convert("RGB")
# else:
#     raise TypeError("Không xác định được định dạng ảnh!")

# sample = {
#     "caption": retrieved["caption"],  # từ search_engine ra
#     #"image": retrieved["image"]       # PIL.Image.Image object
#     "image": image
# }

# formatted = convert_to_conversation(
#     caption=sample["caption"],
#     image_path=sample["image"],
#     instruction=instruction
# )
# messages = formatted["messages"]

# user_prompt = [msg for msg in messages if msg["role"] == "user"]
# input_text = tokenizer.apply_chat_template(user_prompt, add_generation_prompt=True)
# #image = sample["image"]

# inputs = tokenizer(
#     image=sample["image"],         # PIL.Image.Image object
#     text=input_text,               # generated by apply_chat_template
#     add_special_tokens=False,
#     return_tensors="pt"
# ).to("cuda")

# from transformers import TextStreamer
# text_streamer = TextStreamer(tokenizer, skip_prompt = True)
# _ = model.generate(**inputs, streamer = text_streamer, max_new_tokens = 128,
#                    use_cache = True, temperature = 1.5, min_p = 0.1)

In [None]:
FastVisionModel.for_inference(model)

def analyze_medical_image(query):
    """Main function để phân tích hình ảnh y tế"""
    print(f"\n{'='*50}")
    print(f"Searching for: '{query}'")
    print(f"{'='*50}")
    
    # Tìm kiếm ảnh phù hợp
    retrieved = search_engine.search(query)
    
    if not retrieved:
        print("No matching images found!")
        return None
    
    print(f"Found match with score: {retrieved['score']:.3f}")
    print(f"Image ID: {retrieved['image_id']}")
    print(f"Keywords: {retrieved['keywords']}")
    print(f"Original caption: {retrieved['caption'][:100]}...")
    
    # Load image
    image_path = retrieved["image_path"]
    if not os.path.exists(image_path):
        print(f"Image file not found: {image_path}")
        return None
    
    try:
        image = Image.open(image_path).convert("RGB")
        print(f"Image loaded: {image.size}")
    except Exception as e:
        print(f"Error loading image: {e}")
        return None
    
    # Instruction cho analysis
    analysis_instruction = (
        "Dưới đây là một hình ảnh y tế về {}. "
        "Vui lòng phân tích chi tiết nội dung hình ảnh và đưa ra nhận xét chuyên môn. "
        "Mô tả những gì bạn quan sát được một cách chính xác và khoa học."
    ).format(query)
    
    # Chuẩn bị input cho model
    formatted = convert_to_conversation_for_inference(analysis_instruction)
    messages = formatted["messages"]
    
    # Apply chat template
    input_text = tokenizer.apply_chat_template(
        messages, 
        add_generation_prompt=True
    )
    
    # Tokenize
    inputs = tokenizer(
        image=image,
        text=input_text,
        add_special_tokens=False,
        return_tensors="pt"
    ).to("cuda")
    
    # Generate với streaming
    print(f"\n{'='*50}")
    print("MEDICAL IMAGE ANALYSIS:")
    print(f"{'='*50}")
    
    text_streamer = TextStreamer(tokenizer, skip_prompt=True)
    
    try:
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                streamer=text_streamer,
                max_new_tokens=256,
                use_cache=True,
                temperature=0.7,  # Lower temperature for medical analysis
                min_p=0.1,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id
            )
        
        print(f"\n{'='*50}")
        
        return {
            "query": query,
            "retrieved": retrieved,
            "image": image,
            "outputs": outputs
        }
        
    except Exception as e:
        print(f"Error during generation: {e}")
        return None

In [None]:
test_queries = [
        "cardiac thrombi",
        "lung infection", 
        "brain tumor",
        "chest x-ray"
    ]
    
for query in test_queries:
    result = analyze_medical_image(query)
    if result:
        print(f"\nSuccessfully analyzed: {query}")
    else:
        print(f"\nFailed to analyze: {query}")
    print("\n" + "="*80 + "\n")

***Load Model LoRA***

In [None]:
import kagglehub

# Đăng nhập (chỉ cần làm 1 lần)
kagglehub.login()

#Load model về notebook
model_path = kagglehub.model_download(
    handle="giahuytranviet/llama3-lora-vision/pyTorch/default"
)

print("Model directory:", model_path)

In [None]:
from unsloth import FastVisionModel
from transformers import TextStreamer

# Load LoRA đã fine-tune
model, tokenizer = FastVisionModel.from_pretrained(
    model_name = model_path,  # thư mục bạn đã lưu
    load_in_4bit = True,         # hoặc False nếu bạn không dùng quantization
    device_map="auto",
)
FastVisionModel.for_inference(model)  # Bắt buộc để chuyển về chế độ dự đoán

In [None]:
from datasets import load_dataset

dataset = load_dataset("unsloth/Radiology_mini", split="train")

In [None]:
dataset[0]["caption"]

In [None]:
dataset[1]["image"]

In [None]:
image = dataset[1]["image"]
instruction = "Bạn là một chuyên gia X-quang. Hãy mô tả những gì bạn thấy trong ảnh này."

messages = [
    {"role": "user", "content": [
        {"type": "image"},
        {"type": "text", "text": instruction}
    ]}
]
input_text = tokenizer.apply_chat_template(messages, add_generation_prompt = True)
inputs = tokenizer(
    image,
    input_text,
    add_special_tokens = False,
    return_tensors = "pt",
).to("cuda")

from transformers import TextStreamer
text_streamer = TextStreamer(tokenizer, skip_prompt = True)
_ = model.generate(**inputs, streamer = text_streamer, max_new_tokens = 1280,
                   use_cache = True, temperature = 1.5, min_p = 0.1)