In [1]:
import os
import sys
import json
import subprocess
from google.colab import drive
from google.colab import userdata

# ==========================================
# 1. MOUNT DRIVE & SETUP PATHS
# ==========================================
# Every path below lives on Google Drive, so the drive is mounted first.
drive.mount('/content/drive')

# Project constants: the parent folder on Drive, the repository name,
# and the repository's full path (parent folder + name).
PROJECTS_FOLDER = '/content/drive/MyDrive/Projects/'
REPO_NAME = "Image-Editing-by-Natural-Language-Constraints"
PROJECT_PATH = os.path.join(PROJECTS_FOLDER, REPO_NAME)

# Create the parent folder on first use; nothing happens when it already exists.
if not os.path.exists(PROJECTS_FOLDER):
    os.makedirs(PROJECTS_FOLDER)

# ==========================================
# 2. SECURE GIT CONFIGURATION
# ==========================================
try:
    # Load secrets
    config_str = userdata.get('GIT_CONFIG')
    git_config = json.loads(config_str)

    USER_EMAIL = git_config['email']
    USER_NAME = git_config['name']
    GITHUB_USERNAME = git_config['username']
    GITHUB_TOKEN = userdata.get('GITHUB_TOKEN')

    # Configure Git
    !git config --global user.email "$USER_EMAIL"
    !git config --global user.name "$USER_NAME"

    print(f"✅ Identity Loaded: {USER_NAME}")

except Exception as e:
    print(f"⚠️ Git Config Warning: {e}")
    print("   (You can still run the code, but you won't be able to commit/push.)")

# ==========================================
# 3. CLONE OR PULL REPO
# ==========================================
%cd {PROJECTS_FOLDER}

if not os.path.exists(REPO_NAME):
    print(f"📂 Cloning {REPO_NAME}...")
    !git clone https://{GITHUB_USERNAME}:{GITHUB_TOKEN}@github.com/{GITHUB_USERNAME}/{REPO_NAME}.git
else:
    print(f"🔄 Repository exists. Pulling updates...")
    %cd {REPO_NAME}
    !git pull

# ==========================================
# 4. INSTALL DEPENDENCIES (If Missing)
# ==========================================
# Two packages are probed by importing them; if either import fails, the whole
# dependency set is installed with the kernel's own interpreter.
try:
    import segment_anything
    import torchmetrics
except ImportError:
    print("⏳ Installing libraries (SAM, Diffusers, TorchMetrics, etc.)...")
    pip_command = [sys.executable, "-m", "pip", "install", "-q"]
    pip_command += [
        "git+https://github.com/facebookresearch/segment-anything.git",
        "transformers",
        "diffusers",
        "accelerate",
        "scipy",
        "safetensors",
        "torchmetrics",
    ]
    subprocess.check_call(pip_command)
    print("✅ Installation complete.")
else:
    print("✅ Libraries already installed.")

# ==========================================
# 5. FINAL PATH CONFIG & VERIFICATION
# ==========================================
# Directory names that are left out of the overview printed below.
SKIPPED_DIRS = {".git", "__pycache__"}

# Force working directory to the repo root
if os.path.exists(PROJECT_PATH):
    os.chdir(PROJECT_PATH)

    # Add repo to Python path so 'import src.parser' works
    if PROJECT_PATH not in sys.path:
        sys.path.append(PROJECT_PATH)

    print(f"\n✅ Ready! Working Directory: {os.getcwd()}")

    # Optional: Show structure to confirm
    print("\n📂 Project Files:")
    for root, dirs, files in os.walk(".", topdown=True):
        # Prune in place: with topdown=True, os.walk only descends into what is left in
        # `dirs`, so the skipped trees are never traversed at all. Matching exact names
        # also keeps unrelated folders such as ".github" visible.
        dirs[:] = [d for d in dirs if d not in SKIPPED_DIRS]

        # Depth below the repo root = number of path separators in the relative path.
        level = root.count(os.sep)
        indent = ' ' * 4 * level
        print(f"{indent}{os.path.basename(root)}/")
        for f in files[:5]: # Limit to 5 files per folder to keep it clean
            print(f"{indent}    {f}")
else:
    print(f"❌ Critical Error: Repo folder not found at {PROJECT_PATH}")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ Identity Loaded: Adar Shapira
/content/drive/MyDrive/Projects
🔄 Repository exists. Pulling updates...
/content/drive/MyDrive/Projects/Image-Editing-by-Natural-Language-Constraints
Already up to date.
⏳ Installing libraries (SAM, Diffusers, TorchMetrics, etc.)...
✅ Installation complete.

✅ Ready! Working Directory: /content/drive/MyDrive/Projects/Image-Editing-by-Natural-Language-Constraints

📂 Project Files:
./
    groundingdino_swint_ogc.pth
    groundingdino_swint_ogc.pth.2
    sam_vit_b_01ec64.pth
    sam_vit_h_4b8939.pth.2
    sam_vit_h_4b8939.pth
    data/
    outputs/
    notebooks/
        train_brain.ipynb
        main_pipeline.ipynb
        results_and_comparisons.ipynb
        .ipynb_checkpoints/
    src/
        __init__.py
        config.py
        instruction.py
        parser.py
        attention.py
        .ipynb_checkpoints/
    evaluation/

In [2]:
import gc
import torch

def hard_flush():
    """Drop the notebook's large pipeline variables and release cached GPU memory.

    Removes the module-level names 'pipeline', 'res', 'mask', 'cnet_img' and 'orig'
    when they exist, runs the garbage collector, and empties the CUDA cache.
    Returns None.
    """
    print("🧹 Flushing GPU Memory...")

    # pop() with a default is a no-op for names that were never defined.
    namespace = globals()
    for name in ('pipeline', 'res', 'mask', 'cnet_img', 'orig'):
        namespace.pop(name, None)

    gc.collect()
    torch.cuda.empty_cache()
    if torch.cuda.is_available():
        torch.cuda.ipc_collect()
    print("✅ GPU Memory Cleared.")

hard_flush()

🧹 Flushing GPU Memory...
✅ GPU Memory Cleared.


In [3]:
# ==============================================================================
# 🚀 CELL 3: INITIALIZE PIPELINE (UPDATED FOR COMPLEX PROMPTS)
# ==============================================================================
import os
import sys
import pickle
import re
import numpy as np
import scipy.ndimage
from PIL import Image, ImageFilter
from google.colab import drive
from sentence_transformers import SentenceTransformer
from diffusers import StableDiffusionXLControlNetInpaintPipeline, AutoencoderKL, ControlNetModel
from transformers import DPTImageProcessor, DPTForDepthEstimation
import gc
import torch

# --- NEW: NLP LIBRARIES ---
# Installs run through the kernel's own interpreter (sys.executable) so the packages
# land in the environment this notebook is running in, and check_call raises when an
# install fails instead of carrying on silently.
import subprocess

SPACY_MODEL = "en_core_web_sm"

try:
    import spacy
except ImportError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "spacy"])
    subprocess.check_call([sys.executable, "-m", "spacy", "download", SPACY_MODEL])
    import spacy

# Load NLP Model (skipped on re-runs when `nlp` is already in the namespace)
if 'nlp' not in globals():
    try:
        nlp = spacy.load(SPACY_MODEL)
    except OSError:
        # spacy.load raises OSError when the model package is not installed;
        # download it once and retry. Any other error is a real problem and propagates.
        subprocess.check_call([sys.executable, "-m", "spacy", "download", SPACY_MODEL])
        nlp = spacy.load(SPACY_MODEL)

def extract_target_noun(prompt):
    """Extract the physical object a prompt refers to, with key modifiers.

    The prompt is parsed with the module-level spaCy pipeline `nlp`. The head noun
    is the first token whose dependency is dobj/pobj/nsubjpass (or a NOUN that is
    the ROOT); if there is none, the last NOUN in the prompt is used.

    Modifiers kept in front of the head noun:
      * spatial adjectives (dependency ``amod``): left, right, top, bottom
      * possessors (dependency ``poss``): man, woman — e.g. "the man's hat"

    Returns:
        str: e.g. "left cat" or "man hat"; the literal "object" when the prompt
        contains no usable noun.
    """
    doc = nlp(prompt)

    # Find the main noun: first object-like token in document order.
    root_token = None
    for token in doc:
        if token.dep_ in ("dobj", "pobj", "nsubjpass") or (token.dep_ == "ROOT" and token.pos_ == "NOUN"):
            root_token = token
            break
    if not root_token:
        nouns = [t for t in doc if t.pos_ == "NOUN"]
        root_token = nouns[-1] if nouns else None

    if not root_token:
        return "object"

    spatial_words = {"left", "right", "top", "bottom", "man's", "woman's"}
    owner_words = {"man", "woman"}

    target_tokens = []
    for child in root_token.children:
        word = child.text.lower()
        if child.dep_ == "amod" and word in spatial_words:
            target_tokens.append(child.text)
        elif child.dep_ == "poss" and word in owner_words:
            # A possessor is attached as `poss` with the "'s" split into its own
            # token, so the amod check above can never match it.
            # NOTE(review): relies on the parser's English label scheme — confirm.
            target_tokens.append(child.text)
    target_tokens.append(root_token.text)

    # Strip any possessive marker that survived inside a token's text.
    return " ".join(target_tokens).replace("'s", "") # e.g., "left cat", "man hat"

def parse_prompt_logic(text):
    """Split an edit instruction into (segmentation target, generation prompt).

    Handles, after lower-casing and trimming the text:
      * "change X's color to C"   -> (X, "C X")
      * "replace X with Y", "swap X with/for Y", "change X to Y",
        "turn X into Y"           -> (X, Y)
    Articles (the/a/an) are removed from X and the whitespace they leave behind
    is collapsed, so "the cat on the left" becomes "cat on left".

    Anything else falls back to `extract_target_noun`, with the whole
    (lower-cased) text used as the generation prompt.

    Returns:
        tuple[str, str]: (target, generation_prompt)
    """
    text = text.lower().strip()

    def _strip_articles(phrase):
        # split()/join removes the doubled and leading spaces left by the removal
        return " ".join(re.sub(r"\b(the|a|an)\b", "", phrase).split())

    # PATTERN 1: "Change [object]'s [property] to [value]"
    # e.g., "Change the blanket's color to blue" -> Target: "blanket", Gen: "blue blanket"
    color_match = re.search(r"change\s+(.*?)(?:'s)?\s+color\s+to\s+(.*)", text)
    if color_match:
        obj, new_color = color_match.groups()
        obj = _strip_articles(obj)
        return obj, f"{new_color} {obj}"

    # PATTERN 2: "Replace/Swap [target] with [description]"
    # Order matters: the first pattern that matches wins.
    patterns = [
        r"replace\s+(.*?)\s+with\s+(.*)",
        r"swap\s+(.*?)\s+(?:with|for)\s+(.*)",
        r"change\s+(.*?)\s+to\s+(.*)",
        r"turn\s+(.*?)\s+into\s+(.*)"
    ]
    for p in patterns:
        match = re.search(p, text)
        if match:
            target, new_desc = match.groups()
            return _strip_articles(target), new_desc

    # Fallback to standard extraction
    return extract_target_noun(text), text

# Project Configuration
PROJECT_PATH = "/content/drive/MyDrive/Projects/Image-Editing-by-Natural-Language-Constraints"

# 1. THE AI BRAIN LOADER
class DynamicConfig:
    """Predicts per-prompt editing parameters from a sentence embedding.

    Three models are cached on the class after the first load: a sentence
    embedder, a regressor (continuous parameters) and a classifier (discrete
    choices). `infer` is the only entry point callers need.
    """

    # Lazily populated by load_models(); None means "not loaded yet".
    _embedder = None
    _regressor = None
    _classifier = None

    @classmethod
    def load_models(cls):
        """Load the embedder and both pickled predictors; a no-op once all are loaded.

        The models are assigned to the class only after every load succeeded, so a
        failure part-way (e.g. a missing .pkl) is retried on the next call instead
        of leaving the class half-initialised.
        """
        if all(m is not None for m in (cls._embedder, cls._regressor, cls._classifier)):
            return

        if not os.path.exists('/content/drive'): drive.mount('/content/drive')
        print("🧠 Loading AI Brain from Drive...")
        embedder = SentenceTransformer('all-MiniLM-L6-v2')
        # NOTE(review): pickle.load can execute arbitrary code from the file —
        # only load .pkl files you produced yourself.
        with open(f"{PROJECT_PATH}/brain_regressor_hybrid.pkl", "rb") as f: regressor = pickle.load(f)
        with open(f"{PROJECT_PATH}/brain_classifier_hybrid.pkl", "rb") as f: classifier = pickle.load(f)
        cls._embedder, cls._regressor, cls._classifier = embedder, regressor, classifier

    @staticmethod
    def infer(prompt):
        """Build the editing configuration for one prompt.

        Args:
            prompt: the user's edit instruction.

        Returns:
            dict with the keys strength, guidance_scale, controlnet_scale,
            dilate_pixels, blur_radius, use_controlnet, mask_strategy
            ("standard" | "box" | "inverse"), detected_target and
            generation_prompt.
        """
        DynamicConfig.load_models()

        detected_target, generation_prompt = parse_prompt_logic(prompt)
        print(f"🧠 Logic Split -> Target: '{detected_target}' | Gen Prompt: '{generation_prompt}'")

        # Use the full prompt for brain context, but the specific target for segmentation
        vec = DynamicConfig._embedder.encode([f"{prompt} | target: {detected_target}"])
        cont = DynamicConfig._regressor.predict(vec)[0]   # five continuous outputs, read by position below
        disc = DynamicConfig._classifier.predict(vec)[0]  # [use_controlnet flag, mask strategy index]
        mask_map = {0: "standard", 1: "box", 2: "inverse"}

        config = {
            "strength": float(cont[0]),
            "guidance_scale": float(cont[1]),
            "controlnet_scale": float(cont[2]),
            "dilate_pixels": int(cont[3]),
            "blur_radius": int(cont[4]),
            "use_controlnet": bool(disc[0]),
            "mask_strategy": mask_map[int(disc[1])],
            "detected_target": detected_target,
            "generation_prompt": generation_prompt
        }

        # Background edits: segment the foreground subject and invert its mask.
        if "background" in detected_target.lower():
            config["mask_strategy"] = "inverse"
            # Whole-word, case-insensitive match: a plain substring test would also
            # fire on words such as "romantic" or "human" and miss "Man".
            mentions_person = re.search(r"\b(?:man|woman)\b", prompt.lower()) is not None
            config["detected_target"] = "person" if mentions_person else "subject"

        print(f"   ⚙️ Config: {config['mask_strategy'].upper()} | Str: {config['strength']:.2f} | CNet: {config['controlnet_scale']:.2f}")
        return config

# 2. THE MAIN PIPELINE
class ControllableEditPipeline:
    """Prompt-driven image editor: segment a target, then inpaint it with SDXL + depth ControlNet.

    Per-prompt parameters (strength, guidance, mask strategy, ...) come from
    `DynamicConfig.infer`; the mask comes from the project's `Segmenter` when it
    can be imported.
    """

    # Padding in pixels added on each side of the object's bounding box ("box" strategy).
    BOX_PADDING = 40

    def __init__(self, device="cuda"):
        """Load the depth estimator, ControlNet, VAE, inpainting pipeline and segmenter.

        Args:
            device: torch device string the models run on.
        """
        self.device = device
        print("🚀 Initializing SDXL Components...")
        self.depth_estimator = DPTForDepthEstimation.from_pretrained("Intel/dpt-hybrid-midas").to(device)
        self.feature_extractor = DPTImageProcessor.from_pretrained("Intel/dpt-hybrid-midas")
        self.controlnet = ControlNetModel.from_pretrained("diffusers/controlnet-depth-sdxl-1.0", torch_dtype=torch.float16)
        vae = AutoencoderKL.from_pretrained("madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16)

        self.pipe = StableDiffusionXLControlNetInpaintPipeline.from_pretrained(
            "diffusers/stable-diffusion-xl-1.0-inpainting-0.1",
            controlnet=self.controlnet, vae=vae, torch_dtype=torch.float16
        )
        if str(device).startswith("cuda"):
            # Model offloading places each sub-model on the GPU only while it runs.
            # The pipeline is deliberately NOT moved with .to(device) first: that would
            # put every weight in VRAM at once, the very peak offloading is meant to avoid.
            self.pipe.enable_model_cpu_offload()
        else:
            self.pipe.to(device)

        # The segmenter is optional: without it edit() returns the input unchanged.
        try:
            from src.segmentation import Segmenter
            self.segmenter = Segmenter(device=device)
            print("✅ Segmenter Integrated.")
        except Exception as e:
            print(f"⚠️ Segmenter Not Found: {e}")
            self.segmenter = None

    def get_depth_map(self, image, mask):
        """Estimate a depth map for `image` as an 8-bit grayscale PIL image.

        Args:
            image: PIL image to analyse.
            mask: optional PIL mask. When given, depth stays sharp where the mask is
                bright and is replaced by a Gaussian-blurred copy elsewhere.

        Returns:
            PIL.Image: depth scaled to 0..255 at the size of `image`.
        """
        inputs = self.feature_extractor(images=image, return_tensors="pt").to(self.device)
        with torch.no_grad():
            outputs = self.depth_estimator(**inputs)
            pred = torch.nn.functional.interpolate(outputs.predicted_depth.unsqueeze(1), size=image.size[::-1], mode="bicubic", align_corners=False)

        # Min-max normalise; a perfectly flat prediction would otherwise divide by zero.
        lo, hi = pred.min(), pred.max()
        if hi > lo:
            pred = (pred - lo) / (hi - lo)
        else:
            pred = torch.zeros_like(pred)

        depth_np = np.squeeze((pred * 255.0).cpu().numpy().astype(np.uint8))
        depth_img = Image.fromarray(depth_np)
        if mask:
            depth_img = Image.composite(depth_img, depth_img.filter(ImageFilter.GaussianBlur(20)), mask.convert("L").resize(image.size))
        return depth_img

    def _build_mask(self, proc_img, detect_target, cfg):
        """Segment `detect_target` in `proc_img` and return the edit mask as a PIL image."""
        if hasattr(self.segmenter, 'detect_and_segment'):
            m = self.segmenter.detect_and_segment(proc_img, detect_target)
        else:
            m = self.segmenter.predict(proc_img, [detect_target])

        # Accept tuple results (first element), tensors, and stacks of masks.
        if isinstance(m, tuple): m = m[0]
        m = np.squeeze(m.cpu().numpy() if hasattr(m, 'cpu') else m)
        if m.ndim > 2: m = np.max(m, axis=0)

        if cfg["mask_strategy"] == "box":
            # Replace the object's shape with its padded bounding box.
            rows, cols = np.any(m, axis=1), np.any(m, axis=0)
            if np.any(rows):
                y1, y2 = np.where(rows)[0][[0, -1]]
                x1, x2 = np.where(cols)[0][[0, -1]]
                height, width = m.shape  # bounds come from the mask itself, not a fixed 1024
                pad = self.BOX_PADDING
                m[:] = 0
                m[max(0, y1 - pad):min(height, y2 + pad), max(0, x1 - pad):min(width, x2 + pad)] = 1.0
        elif cfg["mask_strategy"] == "inverse":
            print("   🔄 Inverting Mask (Background Mode)...")
            m = 1.0 - m

        if cfg["dilate_pixels"] > 0:
            m = scipy.ndimage.binary_dilation(m, iterations=int(cfg["dilate_pixels"]*2))

        # NOTE(review): assumes mask values are 0/1 (bool or float) at this point — confirm
        # against the Segmenter's output.
        return Image.fromarray((m * 255).astype(np.uint8))

    def edit(self, image, prompt):
        """Apply the edit described by `prompt` to `image`.

        Args:
            image: input PIL image (any size; processed at 1024x1024).
            prompt: natural-language edit instruction.

        Returns:
            tuple: (result image, mask, config). When no mask could be produced the
            original `image` is returned untouched together with ``None`` as mask.
        """
        gc.collect(); torch.cuda.empty_cache()

        # 1. AI Inference
        cfg = DynamicConfig.infer(prompt)
        detect_target = cfg["detected_target"]
        final_prompt = cfg["generation_prompt"]

        # 2. Prep Image
        proc_img = image.resize((1024, 1024), Image.LANCZOS)

        # 3. Masking (best effort: any failure leaves mask as None)
        mask = None
        if self.segmenter and detect_target:
            print(f"🔎 Segmenting: '{detect_target}'")
            try:
                mask = self._build_mask(proc_img, detect_target, cfg)
            except Exception as e:
                print(f"⚠️ Mask Error: {e}")

        if not mask: return image, None, cfg

        # 4. Generation
        # NOTE(review): cfg["blur_radius"] is predicted but never applied to the mask —
        # confirm whether mask feathering was intended here.
        cnet_img = self.get_depth_map(proc_img, mask)
        scale = cfg["controlnet_scale"] if cfg["use_controlnet"] else 0.0

        out = self.pipe(
            prompt=final_prompt, image=proc_img, mask_image=mask, control_image=cnet_img,
            controlnet_conditioning_scale=scale, strength=cfg["strength"],
            guidance_scale=cfg["guidance_scale"], num_inference_steps=30
        ).images[0]

        return out.resize(image.size, Image.LANCZOS), mask.resize(image.size), cfg

# Re-Initialize
pipeline = ControllableEditPipeline()
print("✅ Pipeline Ready.")

Flax classes are deprecated and will be removed in Diffusers v1.0.0. We recommend migrating to PyTorch classes or pinning your version of Diffusers.
Flax classes are deprecated and will be removed in Diffusers v1.0.0. We recommend migrating to PyTorch classes or pinning your version of Diffusers.


🚀 Initializing SDXL Components...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]

`torch_dtype` is deprecated! Use `dtype` instead!
The config attributes {'decay': 0.9999, 'inv_gamma': 1.0, 'min_decay': 0.0, 'optimization_step': 37000, 'power': 0.6666666666666666, 'update_after_step': 0, 'use_ema_warmup': False} were passed to UNet2DConditionModel, but are not expected and will be ignored. Please verify your config.json configuration file.


Loading Grounded-SAM (Lite Version)...
✅ Grounded-SAM (Lite) loaded.
✅ Segmenter Integrated.
✅ Pipeline Ready.


In [4]:
print("🧪 Running Component Verification...")

# Names of the components whose smoke test raised; drives the final summary line.
failed_components = []

# 1. Test Brain (Pass prompt ONLY)
test_cfg = DynamicConfig.infer("remove the cat")
assert test_cfg['dilate_pixels'] > 10, "Brain Error: Logic check failed."
print("  - Brain: OK")

# 2. Test Segmenter
test_img = Image.new('RGB', (1024, 1024), color='red')
try:
    _ = pipeline.segmenter.detect_and_segment(test_img, "object")
    print("  - Segmenter: OK")
except Exception as e:
    failed_components.append("Segmenter")
    print(f"  - Segmenter: FAILED ({e})")

# 3. Test Depth
try:
    _ = pipeline.get_depth_map(test_img, None)
    print("  - Depth Estimator: OK")
except Exception as e:
    # Report the reason, same as for the segmenter, instead of hiding it.
    failed_components.append("Depth Estimator")
    print(f"  - Depth Estimator: FAILED ({e})")

# Only claim success when every check above actually passed.
if failed_components:
    print(f"❌ Verification failed for: {', '.join(failed_components)}")
else:
    print("✨ All components verified.")

🧪 Running Component Verification...
🧠 Loading AI Brain from Drive...
🧠 Logic Split -> Target: 'cat' | Gen Prompt: 'remove the cat'
   ⚙️ Config: STANDARD | Str: 1.00 | CNet: 0.00
  - Brain: OK
   -> Searching for: 'object.'...
  - Segmenter: OK
  - Depth Estimator: OK
✨ All components verified.


In [5]:
# ==============================================================================
# 🚀 FINAL BATCH TEST CELL (12 TASKS - FULL LOGGING)
# ==============================================================================
import os
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
from tqdm import tqdm
import gc

# 1. SETUP METRICS
try:
    import lpips
    from skimage.metrics import structural_similarity as ssim_func
except ImportError:
    os.system("pip install -q lpips scikit-image")
    import lpips
    from skimage.metrics import structural_similarity as ssim_func

from transformers import CLIPProcessor, CLIPModel

class MetricEvaluator:
    """Scores an edit with CLIP (prompt agreement), LPIPS and SSIM (similarity to the original)."""

    def __init__(self, device="cuda"):
        """Load the CLIP model/processor and the LPIPS (AlexNet) network onto `device`."""
        print("📊 Loading Metrics (CLIP, LPIPS)...")
        self.device = device
        clip_name = "openai/clip-vit-base-patch32"
        self.clip_model = CLIPModel.from_pretrained(clip_name).to(device)
        self.clip_processor = CLIPProcessor.from_pretrained(clip_name)
        self.lpips_loss = lpips.LPIPS(net='alex').to(device)

    def get_clip_score(self, image, prompt):
        """Return CLIP's image-text logit for (image, prompt), divided by 100."""
        batch = self.clip_processor(text=[prompt], images=image, return_tensors="pt", padding=True)
        batch = batch.to(self.device)
        with torch.no_grad():
            logits = self.clip_model(**batch).logits_per_image
        return logits.item() / 100.0

    def get_lpips_score(self, img1, img2):
        """Return the LPIPS distance between two images as a Python float."""
        first, second = (lpips.im2tensor(np.array(img)).to(self.device) for img in (img1, img2))
        with torch.no_grad():
            distance = self.lpips_loss(first, second)
        return distance.item()

    def get_ssim_score(self, img1, img2, mask=None):
        """Return SSIM of the grayscale images.

        With a mask, the score is instead the mean of the per-pixel SSIM map over
        the pixels where the mask is exactly 0 (when there are any).
        """
        gray1, gray2 = (np.array(img.convert("L")) for img in (img1, img2))
        score, ssim_map = ssim_func(gray1, gray2, full=True, data_range=255)
        if not mask:
            return score

        background = (np.array(mask.convert("L")) == 0).astype(float)
        background_pixels = np.sum(background)
        if background_pixels > 0:
            score = np.sum(ssim_map * background) / background_pixels
        return score

evaluator = MetricEvaluator()

# 2. DEFINE THE TASKS (3 images x 4 prompts = 12 tasks)
# Prompts are grouped per image and flattened into one {"file", "prompt"} dict per task,
# in the order they are listed.
TASKS = [
    {"file": file_name, "prompt": prompt}
    for file_name, prompts in (
        # --- IMAGE 1: CAT ---
        ("test_cat.jpg", (
            "Swap the left cat with a newborn baby",
            "Remove the right cat",
            "Change the blanket's color to blue",
            "Change the left cat's color to black",
        )),
        # --- IMAGE 2: APPLE ---
        ("test_green_apple.jpg", (
            "Replace the apple with a fresh orange",
            "Change the apple's color to red",
            "Change the background to a wooden table in a library",
            "Add the apple a bite mark",
        )),
        # --- IMAGE 3: MAN ---
        ("test_vacation_man.jpg", (
            "Change the background to snowy mountains",
            "Swap the man with a woman",
            "Change the man's shirt color to red",
            "Replace the man's hat with a gold crown",
        )),
    )
    for prompt in prompts
]

# 3. RUN BATCH
# Runs every task through the pipeline, scores the result, logs the brain's decisions,
# and shows an Original / Mask / Result figure per task.
results = []
print(f"\n🚀 Starting {len(TASKS)}-Task Batch Run...")

for task in TASKS:
    if not os.path.exists(task["file"]):
        print(f"⚠️ Skipping {task['file']} (Not found)")
        continue

    print(f"\n👉 Processing: '{task['prompt']}'")
    gc.collect(); torch.cuda.empty_cache()

    # The context manager closes the file handle as soon as the pixels are copied.
    with Image.open(task["file"]) as src:
        orig = src.convert("RGB").resize((1024, 1024))

    try:
        # UNPACK 3 VALUES: Image, Mask, and Config
        res, mask, cfg = pipeline.edit(orig, task["prompt"])

        if mask is None:
            # edit() returns the untouched input when no mask could be built; scoring it
            # would log a perfect-similarity row for an edit that never happened.
            print(f"⚠️ Skipping metrics: no mask produced for target '{cfg['detected_target']}'")
            continue

        # Calculate Metrics
        clip = evaluator.get_clip_score(res, task["prompt"])
        lpips_val = evaluator.get_lpips_score(orig, res)
        ssim_val = evaluator.get_ssim_score(orig, res, mask)

        # LOG EVERYTHING
        results.append({
            "Original": task["file"],
            "Prompt": task["prompt"],
            "Target": cfg["detected_target"],
            # METRICS
            "CLIP": round(clip, 3),
            "SSIM": round(ssim_val, 3),
            "LPIPS": round(lpips_val, 3),
            # BRAIN DECISIONS
            "Mask": cfg["mask_strategy"],
            "Str": f"{cfg['strength']:.2f}",
            "Guidance": f"{cfg['guidance_scale']:.1f}",
            "CNet_Scale": f"{cfg['controlnet_scale']:.2f}",
            "Use_CNet": cfg["use_controlnet"],
            "Dilate": cfg["dilate_pixels"],
            "Blur": cfg["blur_radius"]
        })

        # Visualize
        fig, ax = plt.subplots(1, 3, figsize=(12, 4))
        ax[0].imshow(orig); ax[0].set_title("Original")
        ax[1].imshow(mask, cmap="gray"); ax[1].set_title(f"Target: {cfg['detected_target']}")
        ax[2].imshow(res); ax[2].set_title(f"Result (CLIP: {clip:.2f})")
        for a in ax: a.axis("off")
        plt.show()
        plt.close(fig)  # release the figure; otherwise one stays in memory per task

    except Exception as e:
        # Best effort: one failing task must not abort the rest of the batch.
        print(f"❌ Error: {e}")

# 4. SAVE RESULTS
df = pd.DataFrame(results)
display(df)
if df.empty:
    # Do not overwrite an earlier results file with an empty one.
    print("⚠️ No results were produced; nothing saved.")
else:
    df.to_csv(os.path.join(PROJECT_PATH, "final_batch_results.csv"), index=False)
    print("💾 Results saved to Drive.")

Output hidden; open in https://colab.research.google.com to view.

In [None]:
# ==========================================
# 8. SAVE CHANGES TO GITHUB
# ==========================================
import os
import subprocess

# 1. Custom Commit Message (Edit this in the form on the right ->)
COMMIT_MESSAGE = "Fixed pipeline: Switched to DiffusionPipeline and Active Inpainting" #@param {type:"string"}


def _run_git(*args):
    """Run `git <args>` in the current directory and report whether it succeeded.

    Arguments are passed as a list (no shell), so a commit message containing quotes
    or `$` is handed to git verbatim. Git's output is echoed to the cell.

    Returns:
        bool: True when git exited with status 0.
    """
    proc = subprocess.run(["git", *args], capture_output=True, text=True)
    output = (proc.stdout + proc.stderr).strip()
    if output:
        print(output)
    return proc.returncode == 0


print(f"🚀 Saving changes to GitHub...")

# 2. Ensure Identity is Set
# (Uses variables from Cell 1. If they are lost, it tries to reload them)
try:
    if 'USER_EMAIL' not in globals():
        from google.colab import userdata
        import json
        git_config = json.loads(userdata.get('GIT_CONFIG'))
        USER_EMAIL = git_config['email']
        USER_NAME = git_config['name']

    _run_git("config", "--global", "user.email", USER_EMAIL)
    _run_git("config", "--global", "user.name", USER_NAME)
except Exception as e:
    print(f"⚠️ Warning: Could not verify Git identity ({e}). Proceeding...")

# 3. Add & Commit
print("📦 Staging all files...")
# NOTE(review): `git add .` stages everything that .gitignore does not exclude; the repo
# folder listing shows large .pth checkpoints — confirm they are ignored before pushing.
_run_git("add", ".")

print(f"📝 Committing: '{COMMIT_MESSAGE}'")
if not _run_git("commit", "-m", COMMIT_MESSAGE):
    # git exits non-zero when there is nothing to commit; earlier commits may still
    # be waiting to be pushed, so carry on.
    print("ℹ️ No new commit was created (see git output above).")

# 4. Push
print("⬆️ Pushing to origin...")
# The exit status decides the message: a shell `!git push` never raises, so a
# try/except around it would report success even when the push was rejected.
if _run_git("push"):
    print("\n✅ Successfully pushed to GitHub!")
else:
    print(f"\n❌ Push failed.")
    print("   Tip: If you see a '403' error, verify your GITHUB_TOKEN in Colab Secrets has 'Repo' permissions.")

🚀 Saving changes to GitHub...
