In [None]:
!pip install -q git+https://github.com/MostafaShams5/NeuroMend.git
!pip install -q --upgrade torch torchvision torchaudio
!pip install -q diffusers transformers==4.41.2 accelerate bitsandbytes
!pip install -q umap-learn hdbscan matplotlib seaborn opencv-python
!pip install -q ultralytics qwen-vl-utils kagglehub

import shutil
import os
for path in ["/kaggle/working/neuromend_output", "/kaggle/working/experiment_data", "/kaggle/working/raw_data"]:
    if os.path.exists(path): shutil.rmtree(path)

In [None]:
import torch
import cv2
import numpy as np
from PIL import Image
from tqdm import tqdm
import os
from transformers import AutoProcessor, AutoModelForCausalLM

class NeuroMendEngine:
    """Florence-2 wrapper used to auto-label images and to diagnose image conditions.

    The model and processor for ``MODEL_ID`` are loaded once in ``__init__``
    (float16, moved to ``device``, eval mode). Two operations are exposed:

    * ``label_dataset`` - runs the ``<CAPTION_TO_PHRASE_GROUNDING>`` task on each
      image and writes the returned boxes as YOLO-format ``.txt`` files.
    * ``diagnose`` - runs ``<DETAILED_CAPTION>`` and maps keywords found in the
      caption to a short comma-separated condition string.
    """

    MODEL_ID = "microsoft/Florence-2-large"

    # (keywords, label) pairs, checked in order against the lower-cased caption.
    _DIAGNOSIS_RULES = (
        (("dark", "night", "shadow"), "low light"),
        (("blur", "grainy"), "motion blur"),
        (("rain", "wet"), "rainy"),
    )

    def __init__(self, device="cuda"):
        """Load the Florence-2 model and processor onto ``device``."""
        self.device = device
        self.model = AutoModelForCausalLM.from_pretrained(
            self.MODEL_ID,
            trust_remote_code=True,
            attn_implementation="eager",
            torch_dtype=torch.float16
        ).to(self.device).eval()
        self.processor = AutoProcessor.from_pretrained(self.MODEL_ID, trust_remote_code=True)

    def label_dataset(self, image_paths, target_class, output_dir):
        """Write one YOLO label file per image in which ``target_class`` is grounded.

        Args:
            image_paths: iterable of image file paths.
            target_class: phrase appended to the grounding task prompt.
            output_dir: directory receiving the ``<image stem>.txt`` files
                (created if missing).

        Labelling is best-effort: an image that fails is reported and skipped so
        one bad file does not abort the whole run. Only ``Exception`` is caught,
        so Ctrl-C / kernel interrupt still stops the loop.
        """
        os.makedirs(output_dir, exist_ok=True)
        task_prompt = "<CAPTION_TO_PHRASE_GROUNDING>"
        text_input = f"{task_prompt} {target_class}"

        failed = 0
        for img_path in tqdm(image_paths):
            try:
                self._process_single_label(img_path, text_input, task_prompt, output_dir)
            except Exception as exc:
                failed += 1
                print(f"[label_dataset] skipped {img_path}: {exc!r}")
        if failed:
            print(f"[label_dataset] {failed} image(s) could not be labelled")

    def _process_single_label(self, img_path, text_input, task_prompt, output_dir):
        """Ground ``text_input`` in one image and write its YOLO label file.

        No file is written when the model returns no usable box.
        """
        image = self._load_rgb(img_path)
        generated_text = self._generate_text(image, text_input, max_new_tokens=1024)
        prediction = self.processor.post_process_generation(generated_text, task=task_prompt, image_size=(image.width, image.height))

        yolo_lines = self._to_yolo_lines(prediction[task_prompt]['bboxes'], image.width, image.height)
        if not yolo_lines: return

        with open(self._label_path(output_dir, img_path), "w") as f:
            f.write("\n".join(yolo_lines))

    def diagnose(self, image_path):
        """Caption one image and return the matching condition keywords.

        Returns a comma-separated string built from ``_DIAGNOSIS_RULES``, or
        ``"low visibility"`` when no keyword matches.
        """
        image = self._load_rgb(image_path)
        caption = self._generate_text(image, "<DETAILED_CAPTION>", max_new_tokens=100)
        return self._diagnosis_from_caption(caption)

    def _generate_text(self, image, text, max_new_tokens):
        """Run greedy generation for ``text`` + ``image`` and return the decoded string.

        Special tokens are kept in the decoded text (``skip_special_tokens=False``).
        """
        inputs = self.processor(text=text, images=image, return_tensors="pt").to(self.device, torch.float16)
        with torch.no_grad():
            generated_ids = self.model.generate(
                input_ids=inputs["input_ids"], pixel_values=inputs["pixel_values"],
                max_new_tokens=max_new_tokens, num_beams=1, do_sample=False, use_cache=False
            )
        return self.processor.batch_decode(generated_ids, skip_special_tokens=False)[0]

    @staticmethod
    def _load_rgb(path):
        """Open ``path`` and return an RGB copy, closing the underlying file."""
        with Image.open(path) as src:
            return src.convert("RGB")

    @staticmethod
    def _label_path(output_dir, img_path):
        """Return ``output_dir/<image stem>.txt`` for any image extension.

        Only the final extension is swapped, so a ``.png`` input can never map
        back onto the image file itself.
        """
        stem, _ = os.path.splitext(os.path.basename(img_path))
        return os.path.join(output_dir, stem + ".txt")

    @staticmethod
    def _to_yolo_lines(bboxes, width, height):
        """Convert pixel ``(x1, y1, x2, y2)`` boxes to YOLO ``0 xc yc w h`` lines.

        Boxes are clamped to the image so every normalised value stays within
        [0, 1]; boxes with no area after clamping are dropped.
        """
        lines = []
        for x1, y1, x2, y2 in (bboxes or ()):
            x1, x2 = min(max(x1, 0), width), min(max(x2, 0), width)
            y1, y2 = min(max(y1, 0), height), min(max(y2, 0), height)
            w, h = x2 - x1, y2 - y1
            if w <= 0 or h <= 0:
                continue
            xc, yc = x1 + w/2, y1 + h/2
            lines.append(f"0 {xc/width:.6f} {yc/height:.6f} {w/width:.6f} {h/height:.6f}")
        return lines

    @classmethod
    def _diagnosis_from_caption(cls, caption):
        """Map caption keywords to a comma-separated diagnosis string."""
        caption_lower = caption.lower()
        diagnosis = [
            label
            for keywords, label in cls._DIAGNOSIS_RULES
            if any(keyword in caption_lower for keyword in keywords)
        ]
        return ", ".join(diagnosis) if diagnosis else "low visibility"

def flush():
    """Run a garbage-collection pass, then ask PyTorch to empty its CUDA cache."""
    from gc import collect
    collect()
    torch.cuda.empty_cache()

In [None]:
import kagglehub
import cv2
import numpy as np
from glob import glob
from sklearn.model_selection import train_test_split
import shutil
import os

RAW_DIR = "/kaggle/working/raw_data"
EXP_DIR = "/kaggle/working/experiment_data"

# Tunable experiment parameters.
MAX_IMAGES = 300        # number of source images taken from the dataset
TEST_FRACTION = 0.3     # share of images held out and degraded
SEED = 42               # seeds both the split and the synthetic noise
BLUR_KERNEL_SIZE = 20   # width of the horizontal motion-blur kernel
NOISE_SIGMA = 25        # std-dev of the additive Gaussian noise (pixel units)
DARKEN_FACTOR = 0.6     # brightness multiplier applied after noise

# Always start from empty working directories.
for stale_dir in (RAW_DIR, EXP_DIR):
    if os.path.exists(stale_dir): shutil.rmtree(stale_dir)
os.makedirs(RAW_DIR, exist_ok=True)

ds_path = kagglehub.dataset_download("sachinpatel21/pothole-image-dataset")
# Sort before slicing: walk/glob order depends on the file system, so an
# unsorted [:MAX_IMAGES] could select a different subset on every run.
src_images = sorted(y for x in os.walk(ds_path) for y in glob(os.path.join(x[0], '*.jpg')))[:MAX_IMAGES]

# Re-encode every readable image under a uniform name; unreadable files are
# reported and skipped instead of being silently ignored.
all_images = []
for i, src in enumerate(src_images):
    try:
        img = cv2.imread(src)
    except cv2.error as exc:
        print(f"Skipping {src}: {exc}")
        continue
    if img is None:
        print(f"Skipping unreadable image: {src}")
        continue
    dst = f"{RAW_DIR}/pothole_{i}.jpg"
    if cv2.imwrite(dst, img):
        all_images.append(dst)

if len(all_images) < 2:
    raise RuntimeError(f"Need at least 2 readable images to build a split, found {len(all_images)} under {ds_path}")

train_imgs, test_imgs = train_test_split(all_images, test_size=TEST_FRACTION, random_state=SEED)

train_dir = f"{EXP_DIR}/train_clean"
os.makedirs(train_dir, exist_ok=True)
for p in train_imgs: shutil.copy(p, f"{train_dir}/{os.path.basename(p)}")

test_dir = f"{EXP_DIR}/test_hard"
os.makedirs(test_dir, exist_ok=True)

# Horizontal motion-blur kernel: a single averaging row. It is identical for
# every image, so it is built once outside the loop.
kernel = np.zeros((BLUR_KERNEL_SIZE, BLUR_KERNEL_SIZE))
kernel[(BLUR_KERNEL_SIZE - 1) // 2, :] = np.ones(BLUR_KERNEL_SIZE) / BLUR_KERNEL_SIZE

rng = np.random.default_rng(SEED)

for p in test_imgs:
    img = cv2.imread(p)

    blur = cv2.filter2D(img, -1, kernel)

    # Add the noise in floating point and clip afterwards. Casting the noise
    # itself to uint8 cannot represent its negative half, so it could never
    # darken a pixel and would not be zero-mean.
    noise = rng.normal(0, NOISE_SIGMA, blur.shape)
    noisy = np.clip(blur.astype(np.float32) + noise, 0, 255)

    final = (noisy * DARKEN_FACTOR).astype(np.uint8)

    cv2.imwrite(f"{test_dir}/{os.path.basename(p)}", final)

In [None]:
labeler = NeuroMendEngine()
# Label both splits in place. Only *.jpg files are passed on: the label .txt
# files are written into the same directories, so listing every entry would
# feed them back to the labeler if this cell is run again.
for split_dir in (train_dir, test_dir):
    labeler.label_dataset(sorted(glob(f"{split_dir}/*.jpg")), "pothole", split_dir)
# Drop the model and release cached GPU memory before the next stage.
del labeler
flush()

In [None]:
from ultralytics import YOLO

CFG_DIR = "/kaggle/working/yolo_cfg"
# Rebuild the dataset tree from scratch. A later cell copies syn_* files into
# train/, so a tree left over from an earlier run would contaminate the baseline.
if os.path.exists(CFG_DIR): shutil.rmtree(CFG_DIR)
for s in ['train', 'val']:
    os.makedirs(f"{CFG_DIR}/{s}/images", exist_ok=True)
    os.makedirs(f"{CFG_DIR}/{s}/labels", exist_ok=True)

def copy_data(src, split):
    """Copy the .jpg files of ``src`` into ``<split>/images`` and the .txt files into ``<split>/labels``."""
    for f in os.listdir(src):
        if f.endswith(".jpg"): shutil.copy(f"{src}/{f}", f"{CFG_DIR}/{split}/images/{f}")
        elif f.endswith(".txt"): shutil.copy(f"{src}/{f}", f"{CFG_DIR}/{split}/labels/{f}")

copy_data(train_dir, "train")
copy_data(test_dir, "val")

# Dataset description consumed by the YOLO trainer: one class, 'pothole'.
with open(f"{CFG_DIR}/data.yaml", "w") as f:
    f.write(f"path: {CFG_DIR}\ntrain: train/images\nval: val/images\nnc: 1\nnames: ['pothole']")

# Baseline: train on the clean split only; validation uses the degraded split.
model_a = YOLO("yolov8n.pt") 
res_a = model_a.train(data=f"{CFG_DIR}/data.yaml", epochs=8, imgsz=512, verbose=False, project="/kaggle/working/runs", name="base")
map_a = res_a.box.map50

In [None]:
from neuro_mend import Generator

engine = NeuroMendEngine()

# glob order depends on the file system; sort so the same sample is diagnosed
# on every run, and fail with a clear message if the hard split is empty.
hard_samples = sorted(glob(f"{test_dir}/*.jpg"))
if not hard_samples:
    raise FileNotFoundError(f"No .jpg images found in {test_dir}")
sample_fail = hard_samples[0]
diagnosis = engine.diagnose(sample_fail)

generator = Generator()
synth_out = "/kaggle/working/neuromend_fix"
prompt = f"pothole on road, {diagnosis}, motion blur, night, low quality, dashboard camera"

# Synthesize degraded-looking variants from the clean training images.
clean_sources = sorted(glob(f"{train_dir}/*.jpg"))
generator.synthesize(clean_sources, prompt, synth_out, count=60)
del generator
flush()

# Auto-label the synthetic images in place, then release the labelling model.
engine.label_dataset(sorted(glob(f"{synth_out}/*.jpg")), "pothole", synth_out)
del engine
flush()

In [None]:
# Merge the synthetic set into the YOLO training split. Every file gets a
# "syn_" prefix; images and labels go to their respective sub-directories.
SYNTH_DESTINATIONS = ((".jpg", "images"), (".txt", "labels"))
for entry in os.listdir(synth_out):
    for suffix, subdir in SYNTH_DESTINATIONS:
        if entry.endswith(suffix):
            shutil.copy(f"{synth_out}/{entry}", f"{CFG_DIR}/train/{subdir}/syn_{entry}")

# Retrain from the same pretrained checkpoint on the augmented training split.
model_b = YOLO("yolov8n.pt")
res_b = model_b.train(
    data=f"{CFG_DIR}/data.yaml",
    epochs=8,
    imgsz=512,
    verbose=False,
    project="/kaggle/working/runs",
    name="patch",
)
map_b = res_b.box.map50

In [None]:
import matplotlib.pyplot as plt

# Relative change of the patched model over the baseline, in percent.
# Reported as 0 when the baseline mAP is too small for a meaningful ratio.
imp = ((map_b - map_a) / map_a * 100) if map_a > 0.01 else 0

fig, ax = plt.subplots(figsize=(6,5))
ax.bar(["Baseline", "Neuro-Mend"], [map_a, map_b], color=['#e74c3c', '#2ecc71'])
# The "+" format flag prints the real sign, so a regression shows as "-12%"
# rather than "+-12%".
ax.set_title(f"Performance on Hard Edge Cases (Blur/Rain)\nImprovement: {imp:+.0f}%")
ax.set_ylabel("mAP@50 Accuracy")
ax.set_ylim(0, 1.0)
ax.grid(axis='y', alpha=0.3)
fig.savefig("final_result.png")
plt.show()