In [4]:
# Real Estate Multimodal Search Engine with Gradio
import gradio as gr
import torch
from sentence_transformers import SentenceTransformer
from PIL import Image
import os
import chromadb
from transformers import CLIPProcessor, CLIPModel
import random
import json
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from typing import List, Dict, Optional, Tuple
import uuid

In [None]:
# Load credentials
# Secrets are read from a local JSON file rather than being written into the notebook.
with open("credentials.json", "r", encoding="utf-8") as file:
    credentials = json.load(file)

openai_api_key = credentials["OPENAI_API_KEY"]
# The API base is exported through the environment; presumably the OpenAI client
# used by LangChain picks it up from there -- TODO confirm.
os.environ["OPENAI_API_BASE"] = credentials["OPENAI_API_BASE"]

# Load models
# text_model embeds listing/query text; the CLIP model + processor embed images.
text_model = SentenceTransformer('all-MiniLM-L6-v2')
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# Setup ChromaDB
CHROMA_PATH = "chroma_real_estate"
# exist_ok makes directory creation idempotent and avoids the check-then-create race.
os.makedirs(CHROMA_PATH, exist_ok=True)

chroma_client = chromadb.PersistentClient(path=CHROMA_PATH)
collection = chroma_client.get_or_create_collection(
    name="real_estate_listings",
    metadata={"hnsw:space": "cosine"}  # Using cosine similarity for embeddings
)

# Initialize LangChain LLM
# temperature=0.0 is passed to keep generations as repeatable as the model allows.
llm = ChatOpenAI(
    model_name="gpt-3.5-turbo",
    temperature=0.0,
    openai_api_key=openai_api_key
)

# Global variable to store listing metadata
# Filled by store_listings and read by search_listings; lives only in kernel memory.
listing_metadata = []

def generate_image_path(listing_id: str) -> str:
    """Return the JPEG path for a listing's image, creating the image folder if needed.

    Args:
        listing_id: Identifier used as the file's base name.

    Returns:
        Path of the form ``<CHROMA_PATH>/images/<listing_id>.jpg``. Only the
        parent directory is created here; the image file itself is not written.
    """
    images_dir = os.path.join(CHROMA_PATH, "images")
    # exist_ok replaces the separate os.path.exists test, which could race with
    # another creator between the check and the makedirs call.
    os.makedirs(images_dir, exist_ok=True)
    return os.path.join(images_dir, f"{listing_id}.jpg")

def save_listings_to_txt(listings, filename="Listings.txt"):
    """
    Dump listings to a plain-text file, one "Key: value" line per field.

    Every listing starts with a "Listing N:" header (N counts from 1) and ends
    with a 40-dash separator. Any exception raised while writing is reported on
    stdout instead of being propagated.

    Args:
        listings: List of dictionaries containing listing information.
        filename: Output .txt file name; an existing file is overwritten.
    """
    separator = "\n" + "-" * 40 + "\n\n"
    try:
        with open(filename, "w", encoding="utf-8") as out:
            for position, entry in enumerate(listings, start=1):
                out.write(f"Listing {position}:\n")
                # Lines are emitted one at a time, so a failure part-way through
                # leaves the fields written so far in the file.
                out.writelines(
                    f"{field.capitalize()}: {content}\n"
                    for field, content in entry.items()
                )
                out.write(separator)
        print(f"Saved {len(listings)} listings to {filename}")
    except Exception as e:
        print(f"Error saving to file: {e}")

def generate_real_estate_listings(n: int = 10):
    """
    Generate diverse and realistic real estate listings using the LLM.

    The model is asked for a JSON list. The reply is parsed, written to a text
    file via save_listings_to_txt, and returned. Any failure (LLM call, JSON
    parsing, unexpected reply shape) is printed and yields an empty list.

    Args:
        n: Number of listings to request from the model.

    Returns:
        List of listing dictionaries (the prompt requests the keys "title",
        "description", "price", "location", "bedrooms", "bathrooms",
        "square_footage", "amenities", "neighborhood"), or [] on failure.
    """
    prompt = f"""
    Generate {n} diverse and realistic real estate listings. 
    Each listing should include:
    - Title
    - Description
    - Price
    - Location
    - Number of bedrooms
    - Number of bathrooms
    - Square footage
    - Amenities
    - Neighborhood Description

    Format the output as a JSON list with these exact keys:
    "title", "description", "price", "location", "bedrooms", 
    "bathrooms", "square_footage", "amenities", "neighborhood"
    """
    try:
        response = llm.invoke(prompt)
        json_text = response.content.strip()
        try:
            parsed = json.loads(json_text)
        except json.JSONDecodeError:
            # The reply may be wrapped in Markdown code fences or surrounded by
            # prose; retry on the outermost [...] span before giving up.
            start = json_text.find("[")
            end = json_text.rfind("]")
            if start == -1 or end <= start:
                raise
            parsed = json.loads(json_text[start:end + 1])

        if not isinstance(parsed, list):
            raise ValueError(
                f"expected a JSON list of listings, got {type(parsed).__name__}"
            )

        # Downstream code indexes each listing by key, so keep only JSON objects.
        listings = [item for item in parsed if isinstance(item, dict)]
        save_listings_to_txt(listings)
        return listings
    except Exception as e:
        print(f"Error generating listings: {e}")
        return []

def generate_listing_image(listing: Dict):
    """
    Generate a placeholder image for a listing based on its features.

    The image is a 300x200 RGB canvas with a random light background colour
    and the listing's title, location and price drawn as black text.

    Args:
        listing: Listing dictionary; must contain "title", "location" and "price".

    Returns:
        The PIL image (not saved to disk here).

    Raises:
        KeyError: If one of the required keys is missing from ``listing``.
    """
    # Create a simple placeholder image with text
    from PIL import ImageDraw, ImageFont
    img = Image.new('RGB', (300, 200), color=(random.randint(200, 255),
                                            random.randint(200, 255),
                                            random.randint(200, 255)))
    d = ImageDraw.Draw(img)

    # Prefer Arial; fall back to Pillow's built-in font where it is unavailable.
    try:
        font = ImageFont.truetype("arial.ttf", 15)
    except OSError:
        # Only the "font file could not be read" case is handled; a bare except
        # here would also swallow KeyboardInterrupt and SystemExit.
        font = ImageFont.load_default()

    d.text((10,10), f"{listing['title']}\n{listing['location']}\n${listing['price']}",
           fill=(0,0,0), font=font)
    return img

def store_listings(listings: List[Dict]) -> None:
    """
    Store listings in ChromaDB with text and image embeddings.

    Existing entries in the collection are removed first and the in-memory
    ``listing_metadata`` list is rebuilt from scratch. A listing that fails at
    any step is reported on stdout and skipped; the remaining listings are
    still processed.

    Args:
        listings: List of listing dictionaries. Each needs the keys "title",
            "description", "price", "location", "bedrooms", "bathrooms",
            "square_footage", "amenities" and "neighborhood".
    """
    global listing_metadata

    # Clear existing data by getting all IDs first, then deleting them
    try:
        existing_items = collection.get()
        if existing_items['ids']:
            collection.delete(ids=existing_items['ids'])
    except Exception as e:
        print(f"Error clearing existing data: {e}")

    listing_metadata = []

    for listing in listings:
        try:
            # Generate unique ID for the listing
            listing_id = str(uuid.uuid4())
            img_path = generate_image_path(listing_id)

            # Build the metadata record first: a missing key fails here, before
            # an image is written or a vector is added without matching metadata.
            record = {
                "id": listing_id,
                "title": listing["title"],
                "description": listing["description"],
                "price": listing["price"],
                "location": listing["location"],
                "bedrooms": listing["bedrooms"],
                "bathrooms": listing["bathrooms"],
                "square_footage": listing["square_footage"],
                "amenities": listing["amenities"],
                "neighborhood": listing["neighborhood"],
                "image_path": img_path
            }

            # Generate and save image for the listing
            img = generate_listing_image(listing)
            img.save(img_path)

            # Create combined text for embedding
            combined_text = (
                f"{listing['title']} {listing['description']} "
                f"{listing['location']} {listing['amenities']} "
                f"{listing['neighborhood']}"
            )

            # Generate text embedding
            text_embedding = text_model.encode(combined_text).tolist()

            # Generate image embedding
            inputs = clip_processor(images=img, return_tensors="pt")
            with torch.no_grad():
                image_features = clip_model.get_image_features(**inputs)
            image_embedding = image_features.squeeze().tolist()

            # Combine text and image embeddings (simple average).
            # NOTE(review): zip stops at the shorter vector, and the two models
            # presumably produce vectors of different length in unrelated
            # spaces -- confirm this blend is intended. Kept unchanged so the
            # stored vectors stay comparable with compute_combined_embedding.
            combined_embedding = [
                (t + i)/2 for t, i in zip(text_embedding, image_embedding)
            ]

            # Store in ChromaDB
            collection.add(
                ids=[listing_id],
                embeddings=[combined_embedding],
                documents=[combined_text],
                metadatas=[{
                    "type": "real_estate",
                    "source": "generated"
                }]
            )

            # Store metadata for reference
            listing_metadata.append(record)

        except Exception as e:
            # Look the title up defensively: indexing listing['title'] here would
            # raise again for a listing without a title and abort the whole loop.
            if isinstance(listing, dict):
                title = listing.get("title", "<untitled>")
            else:
                title = repr(listing)
            print(f"Error storing listing {title}: {e}")
        
def compute_combined_embedding(text: str, image: Optional[Image.Image]):
    """
    Build the query vector from a text query and an optional image.

    Without an image the text embedding is returned unchanged. With an image,
    each position is blended as 0.7 * text + 0.3 * image, over as many
    positions as the shorter of the two vectors has.

    Args:
        text: Search query text
        image: Optional PIL Image

    Returns:
        Query embedding as a list of floats. If anything fails, the error is
        printed and the embedding of the empty string is returned instead.
    """
    try:
        text_vec = text_model.encode(text).tolist()

        # Text-only query: nothing to blend.
        if image is None:
            return text_vec

        clip_inputs = clip_processor(images=image, return_tensors="pt")
        with torch.no_grad():
            features = clip_model.get_image_features(**clip_inputs)
        image_vec = features.squeeze().tolist()

        # Weighted average favouring the text side (weights can be tuned).
        blended = []
        for text_value, image_value in zip(text_vec, image_vec):
            blended.append(0.7 * text_value + 0.3 * image_value)
        return blended
    except Exception as e:
        print(f"Error computing embeddings: {e}")
        # Fallback vector: the embedding of an empty string.
        return text_model.encode("").tolist()

def augment_description(listing: Dict, preferences: str) -> str:
    """
    Ask the LLM to rewrite a listing's description for a specific buyer.

    The prompt embeds every listing field plus the buyer's preferences and
    asks for a factual, engaging 2-3 paragraph description.

    Args:
        listing: The listing dictionary
        preferences: Buyer's preferences text

    Returns:
        The model's personalized description, or the listing's original
        description if the LLM call fails (the error is printed).
    """
    agent_prompt = f"""
    You are a real estate agent personalizing a property description for a buyer.
    
    Original listing:
    Title: {listing['title']}
    Location: {listing['location']}
    Price: {listing['price']}
    Bedrooms: {listing['bedrooms']}
    Bathrooms: {listing['bathrooms']}
    Square footage: {listing['square_footage']}
    Amenities: {listing['amenities']}
    Neighborhood: {listing['neighborhood']}
    Description: {listing['description']}
    
    Buyer preferences: {preferences}
    
    Create a personalized description that:
    1. Maintains all factual information from the original listing
    2. Highlights aspects that match the buyer's preferences
    3. Is engaging and appealing to the buyer
    4. Is 2-3 paragraphs long
    
    Personalized description:
    """
    try:
        # Both the call and the attribute access stay inside the try so that
        # either failure falls back to the original description.
        return llm.invoke(agent_prompt).content
    except Exception as e:
        print(f"Error augmenting description: {e}")
        return listing['description']

def search_listings(query: str, image: Optional[Image.Image]):
    """
    Search listings based on text and optional image query.

    The query is embedded, the three nearest entries are fetched from the
    collection, and each hit found in ``listing_metadata`` is rendered with an
    LLM-personalized description.

    Args:
        query: Text search query (also used as the buyer preferences)
        image: Optional image search query

    Returns:
        Tuple of (results text, list of result images). The second element is
        None when there is nothing to show or an error occurred.
    """
    try:
        query = query or ""
        # Nothing to search with: skip the vector query and the LLM calls.
        if not query.strip() and image is None:
            return "Please enter a description or upload an image.", None

        combined_query_embedding = compute_combined_embedding(query, image)
        results = collection.query(
            query_embeddings=[combined_query_embedding],
            n_results=3
        )

        # "ids" holds one list per query embedding, so an empty search is [[]],
        # which is truthy; the inner list is what has to be checked.
        result_ids = results["ids"][0] if results["ids"] else []
        if not result_ids:
            return "No results found.", None

        # Index metadata once instead of scanning the list for every hit.
        listings_by_id = {entry["id"]: entry for entry in listing_metadata}

        matched_images = []
        result_strs = []

        for listing_id in result_ids:
            listing = listings_by_id.get(listing_id)
            if listing is None:
                continue

            # Copy the pixels and close the file so no handle stays open.
            with Image.open(listing["image_path"]) as stored_image:
                matched_images.append(stored_image.copy())

            # Augment description with buyer preferences
            augmented_desc = augment_description(listing, query)

            # Format price as string to avoid formatting issues
            try:
                price_str = "${:,.0f}".format(float(listing['price']))
            except (ValueError, TypeError):
                price_str = str(listing['price'])

            result_strs.append(
                f"🏠 **{listing['title']}**\n"
                f"📍 {listing['location']}\n"
                f"💰 {price_str}\n"
                f"🛏️ {listing['bedrooms']} bed | 🛁 {listing['bathrooms']} bath | 📏 {listing['square_footage']} sqft\n"
                f"\n**Personalized Description:**\n{augmented_desc}\n"
                f"\n**Amenities:** {listing['amenities']}\n"
                f"\n**Neighborhood:** {listing['neighborhood']}"
            )

        # Hits whose metadata is no longer in memory produce no output at all.
        if not result_strs:
            return "No results found.", None

        return "\n\n".join(result_strs), matched_images
    except Exception as e:
        print(f"Error searching listings: {e}")
        return "An error occurred during search.", None

def initialize_database() -> None:
    """Generate 10 listings and load them into the database, logging each step.

    Progress and the final outcome are printed. When generation yields
    nothing, a failure message is printed and nothing is stored.
    """
    print("Generating listings...")
    generated = generate_real_estate_listings(10)

    # Bail out early when the generator produced nothing.
    if not generated:
        print("Failed to generate listings")
        return

    print(f"Generated {len(generated)} listings")
    # Store in database
    print("Storing listings...")
    store_listings(generated)
    print("Database initialized successfully")


# Populate the collection when this cell runs: requests 10 listings from the LLM
# and replaces whatever the collection currently holds.
initialize_database()

Generating listings...
Saved 10 listings to real_estate_listings.txt
Generated 10 listings
Storing listings...
Database initialized successfully


In [5]:
# Gradio UI
# Two tabs: "Search Listings" is wired to search_listings, and "Database Info"
# shows how many listings are currently held in listing_metadata.
with gr.Blocks(title="Real Estate Search") as demo:
    gr.Markdown("""# 🏡 Real Estate Multimodal Search Engine
    Upload listings and search using a description and optional image.""")
    
    with gr.Tab("Search Listings"):
        # Inputs: free-text preferences plus an optional reference image.
        with gr.Row():
            query_input = gr.Textbox(
                label="Buyer Preferences", 
                placeholder="e.g., Modern house with a pool and garden near schools.",
                lines=3
            )
            # type="pil" makes the upload arrive as a PIL image, matching the
            # image parameter of search_listings.
            query_img_input = gr.Image(
                type="pil", 
                label="Upload Preference Image (Optional)"
            )
        search_btn = gr.Button("Search", variant="primary")
        
        # Outputs: formatted result text next to a gallery of listing images.
        with gr.Row():
            # NOTE(review): search_listings emits Markdown-style **bold** markers;
            # a Textbox presumably shows them literally -- confirm whether a
            # Markdown component was intended.
            result_text = gr.Textbox(
                label="Search Results", 
                interactive=False,
                lines=10,
                max_lines=20
            )
            result_gallery = gr.Gallery(
                label="Matching Listings",
                object_fit="contain",
                height="auto"
            )
        
        # search_listings returns (text, images), mapped onto the two outputs in order.
        search_btn.click(
            fn=search_listings,
            inputs=[query_input, query_img_input],
            outputs=[result_text, result_gallery]
        )
    
    with gr.Tab("Database Info"):
        gr.Markdown("### Current Database Status")
        # The initial value is computed once, when the UI is built; the button
        # below re-reads the live count.
        db_status = gr.Textbox(label="Listings Count", value=f"{len(listing_metadata)} listings")
        refresh_btn = gr.Button("Refresh Count")
        
        def update_db_status():
            """Return the current number of listings held in listing_metadata as display text."""
            return f"{len(listing_metadata)} listings"
            
        refresh_btn.click(
            fn=update_db_status,
            inputs=None,
            outputs=db_status
        )

In [6]:
# Start the Gradio app only when this code runs as the main module, not when it
# is imported from elsewhere.
if __name__ == "__main__":
    demo.launch()

* Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.
