# HouseBrain Data Factory 2.0: The Architect's Assembly Line (Parallel Mode)

This notebook is the control center for generating a high-quality, architecturally sound dataset for training the HouseBrain LLM. It is designed for **large-scale, parallel data generation.**

**Our Strategy:**
1.  **Generate a Master Prompt File (Once)**: Run Cell 4 a single time to generate a large list of prompts (e.g., 30,000) and save it to your Google Drive.
2.  **Run Multiple Notebooks in Parallel**: You can open this same notebook using different Google accounts.
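3.  **Process Random Batches**: Each notebook instance reads the master prompt list, shuffles it, and processes the first `NUM_PLANS_TO_GENERATE` prompts, saving the results to a central dataset folder on your Drive.
4.  **Avoid Duplicate Work**: Each output file is named after a hash of its prompt, and the script skips any prompt whose plan file already exists. This lets multiple instances contribute to the same dataset. The check is not a lock: two instances that pick the same prompt at the same moment can both generate it, and the later write replaces the earlier one.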
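
The skip-if-exists check can be sketched on its own as follows. This is a minimal illustration that assumes the naming scheme Cell 3 uses (the first 16 hex characters of the prompt's SHA-1). The helper names `plan_path_for` and `pending_prompts` and the sample prompts are hypothetical and do not appear in the notebook.

```python
import hashlib
import tempfile
from pathlib import Path


def plan_path_for(prompt: str, output_dir: Path) -> Path:
    """Map a prompt to a deterministic output path (same scheme as Cell 3)."""
    digest = hashlib.sha1(prompt.encode()).hexdigest()[:16]
    return output_dir / f"plan_{digest}.json"


def pending_prompts(prompts, output_dir: Path):
    """Return the prompts that do not yet have a plan file in output_dir."""
    return [p for p in prompts if not plan_path_for(p, output_dir).exists()]


# Demo in a throwaway directory standing in for the shared Drive folder.
with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp)
    prompts = ["3BHK villa", "2BHK apartment"]  # hypothetical prompts
    # Pretend another notebook instance already finished the first prompt.
    plan_path_for(prompts[0], out).write_text("{}")
    remaining = pending_prompts(prompts, out)
    print(remaining)
```

Because the file name depends only on the prompt text, every instance computes the same path for the same prompt, so the existence check needs no shared state beyond the output folder itself.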

## Instructions
1.  **Set Your GitHub PAT**: In Cell 1, you will be prompted to enter a GitHub Personal Access Token to clone the repository.
2.  **(First Time Only) Run Cell 4**: After Cell 1 has mounted your Drive and cloned the repository, run Cell 4 to create your master `platinum_prompts.txt` file in Google Drive. You only need to do this once.
3.  **Run the Factory (Cell 3)**: Run cells 1, 2, and 3. You can configure the number of plans you want the current notebook instance to generate in Cell 3.
4.  **Repeat**: Open this notebook with other accounts, mount the same Google Drive, and run Cell 3 again to generate more data in parallel.


In [None]:
# @title ## 1. Setup Environment
# @markdown Mount Google Drive and clone the repository using a secure token.
from google.colab import drive
import os
import getpass
import subprocess

# Mount Google Drive
drive.mount('/content/drive')
print("✅ Google Drive mounted.")

# --- GitHub Setup ---
#@markdown Enter your GitHub Personal Access Token (PAT) with repo access.
GITHUB_TOKEN = getpass.getpass('Enter your GitHub PAT: ')
REPO_URL = f"https://{GITHUB_TOKEN}@github.com/Vinay-O/HouseBrainLLM.git"
REPO_DIR = "/content/HouseBrainLLM"

# Clone the repository
if os.path.exists(REPO_DIR):
    print("Repository already exists. Pulling latest changes...")
    # Pass arguments as a list so no shell is involved; check=True raises on failure
    subprocess.run(["git", "-C", REPO_DIR, "pull"], check=True)
else:
    print("Cloning repository...")
    subprocess.run(["git", "clone", REPO_URL, REPO_DIR], check=True)

print("✅ Repository is ready.")

# --- Install Dependencies ---
#@markdown Install necessary Python packages from the new requirements file.
requirements_path = os.path.join(REPO_DIR, "requirements.txt")
if os.path.exists(requirements_path):
    print("Installing dependencies from requirements.txt...")
    !pip install -q -r {requirements_path}
    print("✅ Dependencies installed.")
else:
    print("⚠️ requirements.txt not found. Installing default packages.")
    !pip install -q pydantic

print("✅ Environment setup complete.")
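
Because the token is embedded in `REPO_URL`, a failed `git clone` run with `check=True` raises a `CalledProcessError` whose message includes the full command, token included. The sketch below shows one possible way to mask the token before anything is printed. `redact` and `run_redacted` are hypothetical helpers, not part of the repository.

```python
import subprocess


def redact(text: str, secret: str) -> str:
    """Replace every occurrence of secret in text with a placeholder."""
    return text.replace(secret, "***") if secret else text


def run_redacted(args, secret: str):
    """Run a command; on failure, raise an error whose message has the secret masked."""
    try:
        return subprocess.run(args, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        message = f"Command {e.cmd!r} failed with exit code {e.returncode}: {e.stderr}"
        # "from None" keeps the original exception (which contains the token)
        # out of the displayed traceback.
        raise RuntimeError(redact(message, secret)) from None
```

In Cell 1 this could be called as `run_redacted(["git", "clone", REPO_URL, REPO_DIR], GITHUB_TOKEN)`. Note that masking only affects what is printed: after a successful clone, git still stores the remote URL, token included, in the repository's `.git/config`.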



In [None]:
# @title ## 2. Configure and Start Ollama Server
# @markdown This cell will download and start the Ollama server, then pull the specified model.
# @markdown **NOTE:** A powerful model like `deepseek-r1:32b` is now recommended for higher quality results. It will be slower but more reliable.

MODEL_NAME = "deepseek-r1:32b" # @param ["deepseek-r1:32b", "llama3:70b-instruct", "qwen2:72b-instruct", "mixtral:instruct"]

# Download and start Ollama
!curl -fsSL https://ollama.com/install.sh | sh
import threading
import subprocess
import time

def run_ollama():
    try:
        subprocess.run("ollama serve", shell=True, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        print(f"Ollama server failed: {e.stderr}")

print("🚀 Starting Ollama server in the background...")
ollama_thread = threading.Thread(target=run_ollama)
ollama_thread.daemon = True
ollama_thread.start()

# Wait for the server to be ready
print("⏳ Waiting for Ollama server to initialize...")
time.sleep(15) # Increased wait time for stability

# Pull the model
print(f"📦 Pulling model: {MODEL_NAME}. This may take a while...")
try:
    process = subprocess.run(
        f"ollama pull {MODEL_NAME}",
        shell=True, check=True, capture_output=True, text=True, timeout=900
    )
    print(f"✅ Model {MODEL_NAME} is ready.")
except subprocess.CalledProcessError as e:
    print(f"Error pulling model: {e.stderr}")
    print("This might happen if the model name is incorrect or the Ollama server is not ready.")
except subprocess.TimeoutExpired:
    print("Timed out while pulling the model. The model might be very large or the connection slow.")


# Verify Ollama is running
!ollama list
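
A fixed `time.sleep(15)` can be longer than needed on a fast runtime and too short on a slow one. One possible alternative is to poll the server until it answers. The sketch below assumes the Ollama address `http://localhost:11434` used elsewhere in this notebook and assumes the server answers a plain GET on its root URL with HTTP 200. `wait_until` and `ollama_is_up` are hypothetical helper names.

```python
import time
import urllib.request


def wait_until(probe, timeout_s: float = 60.0, interval_s: float = 1.0) -> bool:
    """Call probe() repeatedly until it returns True or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        if probe():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)


def ollama_is_up(url: str = "http://localhost:11434/") -> bool:
    """Return True if the server at url answers with HTTP 200 (assumed health signal)."""
    try:
        with urllib.request.urlopen(url, timeout=5) as response:
            return response.status == 200
    except OSError:  # URLError and connection errors are OSError subclasses
        return False
```

With these helpers, the fixed sleep could be replaced by something like `if not wait_until(ollama_is_up, timeout_s=120): raise RuntimeError("Ollama did not start")`, which proceeds as soon as the server responds and fails loudly if it never does.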



In [None]:
# @title ## 3. Run the Data Factory (V3: Assembly Line)
# @markdown This cell implements the **"Assembly Line"** strategy (Plan C).
# @markdown It breaks the complex task of generating a house plan into smaller, more reliable steps (layout, then doors, then windows), and then assembles and validates the final plan in code. Each step has a narrower output format, so a failure can be detected and logged at the stage where it occurs.

import os
import sys
import json
import textwrap
import logging
from pathlib import Path
import urllib.request
import urllib.error
from inspect import getsource
from pydantic import BaseModel, ValidationError
import random
import hashlib
import time
import re

# --- Configuration ---
DRIVE_PROMPT_FILE = "/content/drive/MyDrive/housebrain_prompts/platinum_prompts.txt" #@param {type:"string"}
DRIVE_OUTPUT_DIR = "/content/drive/MyDrive/housebrain_platinum_dataset" #@param {type:"string"}
NUM_PLANS_TO_GENERATE = 100 #@param {type:"integer"}
# This should be the same model that Cell 2 pulled.
MODEL_NAME = "deepseek-r1:32b" # @param ["deepseek-r1:32b", "llama3:70b-instruct", "qwen2:72b-instruct", "mixtral:instruct"]
# --- End Configuration ---


# --- Setup ---
REPO_DIR = "/content/HouseBrainLLM"
if REPO_DIR not in sys.path: sys.path.insert(0, REPO_DIR)
os.chdir(REPO_DIR)
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

try:
    from src.housebrain.schema import HouseOutput, RoomType
    print("✅ Successfully imported HouseBrain schema.")
except ImportError as e:
    print(f"❌ Failed to import HouseBrain schema: {e}")
    raise e

# --- Prompts for Assembly Line ---
VALID_ROOM_TYPES = [e.value for e in RoomType]
VALID_WINDOW_TYPES = ["fixed", "casement", "sliding", "bay"]
VALID_DOOR_TYPES = ["interior", "exterior", "sliding", "pocket"]

STAGE_1_LAYOUT_PROMPT = """You are an expert AI architect. Your task is to generate ONLY the high-level geometric layout for a house based on a user's prompt.

**CRITICAL INSTRUCTIONS:**
1.  Focus ONLY on `levels` and `rooms`.
2.  Rooms MUST have an `id`, `type`, and non-overlapping `bounds`. **BOUNDS MUST NOT OVERLAP.**
3.  The `type` for each room MUST be one of the following valid options: `{valid_room_types}`. Do NOT invent new types or use shorthands.
4.  **Size Constraint**: Rooms must have realistic dimensions. For example, an `entrance` must be at least 40 sqft, a `bathroom` at least 40 sqft, and a `bedroom` at least 120 sqft.
5.  DO NOT include `doors` or `windows`.
6.  Your output MUST be a single, valid JSON object with a root "levels" key.
---
**User Prompt:**
{user_prompt}
---
Now, generate ONLY the JSON for the house layout."""

STAGE_2_DOORS_PROMPT = """You are an AI architect. Given the JSON layout of a house, your task is to generate ONLY a JSON list of Door objects to connect the rooms logically.

**CRITICAL INSTRUCTIONS:**
1.  Your output MUST be a single, valid JSON list `[...]`.
2.  Each object in the list must be a valid Door, with `position`, `width`, `type`, `room1`, and `room2`.
3.  The `type` MUST be one of: `{valid_door_types}`.
4.  `room1` and `room2` MUST be valid room IDs from the provided layout. For an exterior door, you may use an invented ID like "exterior_0" for `room2`.

**Golden Example of a perfect output:**
```json
[
  {{
    "position": {{ "x": 20, "y": 25 }},
    "width": 3.0,
    "type": "interior",
    "room1": "living_room_0",
    "room2": "dining_room_0"
  }}
]
```
---
**House Layout:**
```json
{house_layout}
```
---
Now, generate ONLY the JSON list of Door objects."""

STAGE_3_WINDOWS_PROMPT = """You are an AI architect. Given the JSON layout of a house, your task is to generate ONLY a JSON list of Window objects.

**CRITICAL INSTRUCTIONS:**
1.  Your output MUST be a single, valid JSON list `[...]`.
2.  Each object in the list must be a valid Window, with `position`, `width`, `height`, `type`, and `room_id`.
3.  The `type` MUST be one of: `{valid_window_types}`.
4.  The `room_id` MUST be a valid room ID from the provided layout.
5.  Place windows on walls that would logically be exterior walls.

**Golden Example of a perfect output:**
```json
[
  {{
    "position": {{ "x": 10, "y": 17.5 }},
    "width": 8.0,
    "height": 5.0,
    "type": "sliding",
    "room_id": "living_room_0"
  }}
]
```
---
**House Layout:**
```json
{house_layout}
```
---
Now, generate ONLY the JSON list of Window objects."""

JSON_REPAIR_PROMPT = """The following text is not a valid JSON object. Please fix any syntax errors (like missing commas, brackets, or quotes) and return ONLY the corrected, valid JSON object. Do not add any commentary.

**Broken Text:**
{broken_json}
"""

# --- Core Functions ---
def call_ollama_colab(model_name: str, prompt: str, max_retries=3):
    """POST a prompt to the local Ollama server. Returns the response text, or None after max_retries failures."""
    url = "http://localhost:11434/api/generate"
    # "format": "json" asks Ollama to constrain the model output to valid JSON
    data = {"model": model_name, "prompt": prompt, "stream": False, "format": "json"}
    encoded_data = json.dumps(data).encode('utf-8')
    req = urllib.request.Request(url, data=encoded_data, headers={'Content-Type': 'application/json'})
    for attempt in range(max_retries):
        try:
            with urllib.request.urlopen(req, timeout=900) as response:
                if response.status == 200:
                    return json.loads(response.read().decode('utf-8')).get("response", "")
                logger.error(f"Attempt {attempt + 1}/{max_retries} failed. Ollama returned HTTP {response.status}.")
        except Exception as e:
            logger.error(f"Attempt {attempt + 1}/{max_retries} failed. Error calling Ollama: {e}")
        # Reached only when the attempt did not return: back off, or give up on the last attempt
        if attempt < max_retries - 1: time.sleep(5)
        else: logger.error("Max retries reached. Failing.")
    return None

def repair_json(raw_json_str: str, model_name: str, target_type: type):
    """More robustly finds, parses, and repairs a JSON string."""
    if not raw_json_str: return None
    
    # 1. Aggressive Regex Extraction
    clean_str = None
    if target_type == list:
        match = re.search(r'\[.*\]', raw_json_str, re.DOTALL)
        if match: clean_str = match.group(0)
    else: # dict
        match = re.search(r'\{.*\}', raw_json_str, re.DOTALL)
        if match: clean_str = match.group(0)

    if clean_str:
        try:
            data = json.loads(clean_str)
            if isinstance(data, target_type): return data
        except json.JSONDecodeError:
            pass # Fall through to LLM repair

    # 2. Fallback to LLM Repair
    print(f"Initial parse for type {target_type.__name__} failed. Attempting LLM repair...")
    repair_prompt = JSON_REPAIR_PROMPT.format(broken_json=raw_json_str)
    repaired_str = call_ollama_colab(model_name, repair_prompt)
    if not repaired_str:
        print("❌ JSON repair failed: No response from model.")
        return None
    try:
        data = json.loads(repaired_str)
        if isinstance(data, target_type):
            print(f"✅ JSON successfully repaired to type {target_type.__name__}.")
            return data
        # Handle cases where model wraps list in a dict
        if target_type == list and isinstance(data, dict) and len(data.keys()) == 1:
            for key, value in data.items():
                if isinstance(value, list):
                    print(f"✅ Repaired JSON by extracting list from key '{key}'.")
                    return value
        print(f"❌ Repaired JSON is not of target type {target_type.__name__}.")
        return None
    except json.JSONDecodeError:
        print("❌ JSON parse failed even after LLM repair.")
        return None

def assemble_plan(layout_dict, doors_list, windows_list):
    """Attach each window to its `room_id` room and each door to its `room1` room.

    Malformed levels, rooms, doors, and windows are skipped with a warning instead of raising.
    """
    # Safety wrapper for dict access that tolerates non-dict inputs
    def safe_get(d, key, default=None):
        try:
            if not isinstance(d, dict):
                return default
            return d.get(key, default)
        except Exception:
            return default
    
    # Initialize an empty result with a safe structure
    result = {"levels": []}
    
    # Safely extract and verify the levels
    try:
        if not isinstance(layout_dict, dict):
            print("⚠️ Assembly Error: Layout is not a dictionary. Creating empty layout.")
            return result
            
        levels = safe_get(layout_dict, "levels", [])
        if not isinstance(levels, list):
            print("⚠️ Assembly Error: 'levels' key is not a list. Creating empty layout.")
            return result
            
        result["levels"] = levels
        
        # Create a safe room lookup dictionary
        rooms_by_id = {}
        
        # Process each level and room with complete safety
        for level_idx, level in enumerate(levels):
            try:
                if not isinstance(level, dict):
                    print(f"⚠️ Assembly Warning: Skipping non-dict level at index {level_idx}.")
                    continue
                    
                # Initialize an empty rooms list if needed
                if "rooms" not in level or not isinstance(level["rooms"], list):
                    level["rooms"] = []
                    continue
                    
                # Process each room
                for room_idx, room in enumerate(level["rooms"]):
                    try:
                        if not isinstance(room, dict):
                            print(f"⚠️ Assembly Warning: Skipping non-dict room at index {room_idx}.")
                            continue
                            
                        room_id = safe_get(room, "id")
                        if not isinstance(room_id, str):
                            print(f"⚠️ Assembly Warning: Room has invalid ID: {room_id}")
                            continue
                            
                        # Initialize empty collections for doors and windows
                        if "doors" not in room or not isinstance(room["doors"], list):
                            room["doors"] = []
                        if "windows" not in room or not isinstance(room["windows"], list):
                            room["windows"] = []
                            
                        # Add to lookup
                        rooms_by_id[room_id] = room
                    except Exception as e:
                        print(f"⚠️ Assembly Warning: Error processing room: {str(e)}")
            except Exception as e:
                print(f"⚠️ Assembly Warning: Error processing level: {str(e)}")
    
        # Process windows safely
        if isinstance(windows_list, list):
            for window_idx, window in enumerate(windows_list):
                try:
                    if not isinstance(window, dict):
                        print(f"⚠️ Assembly Warning: Discarding non-dict window at index {window_idx}: {window}")
                        continue
                        
                    room_id = safe_get(window, "room_id")
                    if not isinstance(room_id, str) or room_id not in rooms_by_id:
                        print(f"⚠️ Assembly Warning: Window has invalid room_id: {room_id}")
                        continue
                        
                    # Verify window has required fields
                    required_fields = ["position", "width", "height", "type"]
                    for field in required_fields:
                        if field not in window:
                            print(f"⚠️ Assembly Warning: Window missing required field: {field}")
                            break
                    else:
                        # All checks passed, safe to add
                        rooms_by_id[room_id]["windows"].append(window)
                except Exception as e:
                    print(f"⚠️ Assembly Warning: Error processing window: {str(e)}")
    
        # Process doors safely
        if isinstance(doors_list, list):
            for door_idx, door in enumerate(doors_list):
                try:
                    if not isinstance(door, dict):
                        print(f"⚠️ Assembly Warning: Discarding non-dict door at index {door_idx}: {door}")
                        continue
                        
                    room1_id = safe_get(door, "room1")
                    if not isinstance(room1_id, str) or room1_id not in rooms_by_id:
                        print(f"⚠️ Assembly Warning: Door has invalid room1_id: {room1_id}")
                        continue
                        
                    # Verify door has required fields
                    required_fields = ["position", "width", "type", "room2"]
                    for field in required_fields:
                        if field not in door:
                            print(f"⚠️ Assembly Warning: Door missing required field: {field}")
                            break
                    else:
                        # All checks passed, safe to add
                        rooms_by_id[room1_id]["doors"].append(door)
                except Exception as e:
                    print(f"⚠️ Assembly Warning: Error processing door: {str(e)}")
    except Exception as e:
        print(f"⚠️ Assembly Error: Unexpected error during assembly: {str(e)}")
    
    return result

# --- Execution ---
print("--- Starting Data Factory Run (V3: Assembly Line) ---")
Path(DRIVE_OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
try:
    with open(DRIVE_PROMPT_FILE, 'r') as f:
        all_prompts = [line.strip() for line in f if line.strip()]
    print(f"✅ Found {len(all_prompts)} total prompts in the master list.")
except FileNotFoundError:
    print(f"❌ MASTER PROMPT FILE NOT FOUND at '{DRIVE_PROMPT_FILE}'.")
    all_prompts = []

if all_prompts:
    random.shuffle(all_prompts)
    prompts_to_process = all_prompts[:NUM_PLANS_TO_GENERATE]
    print(f"✅ This run will process a random batch of {len(prompts_to_process)} prompts.")

    for i, prompt_text in enumerate(prompts_to_process):
        print("\n" + "="*50)
        print(f"Processing prompt {i+1}/{len(prompts_to_process)}")
        
        run_name = f"plan_{hashlib.sha1(prompt_text.encode()).hexdigest()[:16]}"
        output_file = Path(DRIVE_OUTPUT_DIR) / f"{run_name}.json"

        if output_file.exists():
            print(f"⏭️ Skipping prompt, output file already exists: {output_file.name}")
            continue
        
        print(textwrap.shorten(prompt_text, width=100, placeholder="..."))
        print("="*50)

        # --- STAGE 1: Generate Layout ---
        print("Running Stage 1: Layout Generation...")
        stage_1_prompt = STAGE_1_LAYOUT_PROMPT.format(user_prompt=prompt_text, valid_room_types=VALID_ROOM_TYPES)
        layout_str = call_ollama_colab(MODEL_NAME, stage_1_prompt)
        layout_dict = repair_json(layout_str, MODEL_NAME, dict)
        if not layout_dict or not layout_dict.get("levels"):
            print("❌ Stage 1 Failed: Could not produce a valid layout dictionary.")
            with open(str(output_file).replace('.json', '_s1_failed.txt'), 'w') as f: f.write(layout_str or "")
            continue
        
        # --- STAGE 2: Generate Doors ---
        print("Running Stage 2: Door Generation...")
        stage_2_prompt = STAGE_2_DOORS_PROMPT.format(house_layout=json.dumps(layout_dict, indent=2), valid_door_types=VALID_DOOR_TYPES)
        doors_str = call_ollama_colab(MODEL_NAME, stage_2_prompt)
        doors_list = repair_json(doors_str, MODEL_NAME, list)
        if doors_list is None: # An empty list is a valid result
            print("❌ Stage 2 Failed: Could not produce a valid list of doors.")
            with open(str(output_file).replace('.json', '_s2_failed.txt'), 'w') as f: f.write(doors_str or "")
            continue

        # --- STAGE 3: Generate Windows ---
        print("Running Stage 3: Window Generation...")
        stage_3_prompt = STAGE_3_WINDOWS_PROMPT.format(house_layout=json.dumps(layout_dict, indent=2), valid_window_types=VALID_WINDOW_TYPES)
        windows_str = call_ollama_colab(MODEL_NAME, stage_3_prompt)
        windows_list = repair_json(windows_str, MODEL_NAME, list)
        if windows_list is None: # An empty list is a valid result
            print("❌ Stage 3 Failed: Could not produce a valid list of windows.")
            with open(str(output_file).replace('.json', '_s3_failed.txt'), 'w') as f: f.write(windows_str or "")
            continue

        # --- STAGE 4: Assemble & Validate ---
        print("Running Stage 4: Assembling and Validating...")
        try:
            assembled_plan_dict = assemble_plan(layout_dict, doors_list, windows_list)
            
            processed_levels = assembled_plan_dict.get("levels", [])
            for level_idx, level in enumerate(processed_levels):
                level['level_number'] = level_idx
            
            total_area_sqft = sum(r['bounds']['width'] * r['bounds']['height'] for l in processed_levels for r in l.get("rooms", []))
            
            final_plan = {
                "input": {"basicDetails": {"prompt": prompt_text, "totalArea": total_area_sqft, "unit": "sqft"}},
                "levels": processed_levels, "total_area": round(total_area_sqft, 2),
                "construction_cost": 0.0, "materials": {}, "render_paths": {}
            }
            
            HouseOutput.model_validate(final_plan)
            with open(output_file, 'w') as f: json.dump(final_plan, f, indent=2)
            print(f"✅ SUCCESS! Saved validated plan to {output_file}")

        except ValidationError as e:
            print(f"❌ Stage 4 Failed: Pydantic validation error - {e}")
            with open(str(output_file).replace('.json', '_s4_failed_validation.json'), 'w') as f: json.dump(assembled_plan_dict, f, indent=2)
        except Exception as e:
            print(f"❌ Stage 4 Failed: An unexpected error occurred - {e}")
            with open(str(output_file).replace('.json', '_s4_failed_exception.json'), 'w') as f: json.dump(assembled_plan_dict, f, indent=2)

    print("\n🎉 Data Factory run complete!")
else:
    print("No prompts to process.")
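
Stage 1 asks the model for non-overlapping room bounds, but Cell 3 itself never checks this geometrically (the `HouseOutput` schema may or may not do so). A check of that kind could be sketched as below. It assumes each room's `bounds` holds `x`, `y`, `width`, and `height`; only `width` and `height` are confirmed by the area calculation in Stage 4, so treat `x` and `y` as assumptions. The helper names and the sample level are hypothetical.

```python
from itertools import combinations


def rects_overlap(a: dict, b: dict) -> bool:
    """True if two axis-aligned rectangles share interior area (touching edges do not count)."""
    return (
        a["x"] < b["x"] + b["width"]
        and b["x"] < a["x"] + a["width"]
        and a["y"] < b["y"] + b["height"]
        and b["y"] < a["y"] + a["height"]
    )


def find_overlaps(level: dict):
    """Return (id, id) pairs for rooms on one level whose bounds overlap."""
    rooms = level.get("rooms", [])
    return [
        (r1["id"], r2["id"])
        for r1, r2 in combinations(rooms, 2)
        if rects_overlap(r1["bounds"], r2["bounds"])
    ]


# Hypothetical level: the kitchen only shares a wall with the living room,
# while the bedroom intrudes into both of the other rooms.
level = {
    "rooms": [
        {"id": "living_room_0", "bounds": {"x": 0, "y": 0, "width": 20, "height": 15}},
        {"id": "kitchen_0", "bounds": {"x": 20, "y": 0, "width": 10, "height": 15}},
        {"id": "bedroom_0", "bounds": {"x": 15, "y": 10, "width": 12, "height": 12}},
    ]
}
print(find_overlaps(level))
```

A check like this could run between Stage 1 and Stage 2, so that a layout with overlapping rooms is rejected before any time is spent generating doors and windows for it.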



In [None]:
# @title ## 4. (One-Time Setup) Generate Master Prompt File
# @markdown This cell uses the `generate_prompts.py` script to create your master prompt file in Google Drive.
# @markdown **You only need to run this cell once.**
# @markdown Once the file is created, Cell 3 will be able to read from it for all future runs.

import os
from pathlib import Path

# --- Configuration ---
#@markdown The desired location in your Google Drive for the master prompt file. This MUST match the path in Cell 3.
DRIVE_PROMPT_FILE = "/content/drive/MyDrive/housebrain_prompts/platinum_prompts.txt" #@param {type:"string"}

#@markdown The total number of prompts to generate for your master list.
NUM_PROMPTS_TO_GENERATE = 30000 #@param {type:"integer"}
# --- End Configuration ---

# --- Execution ---
REPO_DIR = "/content/HouseBrainLLM"
script_path = os.path.join(REPO_DIR, "scripts/generate_prompts.py")

# Ensure the repository is in the correct directory
os.chdir(REPO_DIR)

# Ensure the target directory in Drive exists
Path(DRIVE_PROMPT_FILE).parent.mkdir(parents=True, exist_ok=True)

print(f"Running prompt generation script to create {NUM_PROMPTS_TO_GENERATE} prompts...")
# Quote both paths so that spaces in them do not split the shell command
command = f'python3 "{script_path}" --num-prompts {NUM_PROMPTS_TO_GENERATE} --output-file "{DRIVE_PROMPT_FILE}"'
!{command}

print("\n--- Verification ---")
if Path(DRIVE_PROMPT_FILE).exists():
    print(f"✅ Master prompt file successfully created at: {DRIVE_PROMPT_FILE}")
    print("First 5 prompts in the file:")
    !head -n 5 "{DRIVE_PROMPT_FILE}"
else:
    print(f"❌ ERROR: Master prompt file was not created. Please check for errors above.")



In [None]:
# @title ## 5. (Optional) Download Generated Dataset
# @markdown Run this cell after the data generation is complete to compress and download the entire output folder.

import shutil
import os
from google.colab import files
from datetime import datetime

# Define the source directory in Google Drive. This should match DRIVE_OUTPUT_DIR from Cell 3.
source_dir = "/content/drive/MyDrive/housebrain_platinum_dataset"

# Create a timestamped zip filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
zip_filename = f"housebrain_dataset_{timestamp}.zip"
zip_filepath = f"/content/{zip_filename}"

if os.path.exists(source_dir) and os.listdir(source_dir):
    # Create the zip archive
    print(f"Compressing '{source_dir}' into '{zip_filepath}'...")
    shutil.make_archive(zip_filepath.replace('.zip', ''), 'zip', source_dir)
    print("✅ Compression complete.")

    # Provide a download link
    print(f"\nDownloading '{zip_filename}'...")
    files.download(zip_filepath)
else:
    print(f"❌ ERROR: The source directory '{source_dir}' was not found or is empty. Please ensure the Data Factory ran correctly.")

