In [1]:
from dotenv import load_dotenv
load_dotenv("../.env")

True

In [None]:
import os
from io import BytesIO
from pathlib import Path
from dataclasses import asdict, dataclass

import psycopg
from pgvector.psycopg import register_vector
from rich import print

from google import genai
from google.genai import types

import numpy as np
import pandas as pd
import lightgbm as lgb

import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel, AutoImageProcessor
from PIL import Image

## Database

In [None]:
@dataclass
class LocationStatistics:
    subdistrict: str
    district: str
    city: str
    province: str
    listing_count: int
    average_price_idr: float

@dataclass
class RetrievedDocument:
    id: str
    parent_id: str | None
    source: str
    score: float

@dataclass
class Document:
    id: str
    content: str

In [4]:
db_conn = psycopg.connect(os.environ["DB_URI"])
register_vector(db_conn)

In [None]:
def query_locations() -> list[LocationStatistics]:
    with db_conn.cursor() as cur:
        try:
            top_area_sql = """
                SELECT 
                    a.subdistrict,
                    a.district,
                    a.city,
                    a.province,
                    count(*) as	listing_count,
                    avg(h.price) as average_price
                FROM
                    marts_dim_area a
                INNER JOIN
                    marts_fact_houses h ON h.area_sk = a.area_sk
                GROUP BY
                    a.subdistrict, a.district, a.city , a.province
                ORDER BY 
                    listing_count desc
                LIMIT 15
                """
            
            cur.execute(top_area_sql)

            return [LocationStatistics(*x) for x in cur.fetchall()]
        except Exception as e:
            db_conn.rollback()
            print(e)
            
            raise ValueError("Error when querying the database")   

In [None]:
def query_hybrid(text_query: str, text_embedding) -> list[RetrievedDocument]:
    with db_conn.cursor() as cur:
        try:
            nearest_docs_sql = """
                WITH
                bm25_query AS (
                    SELECT 
                        id, 
                        parent_id,
                        'bm25' AS source,
                        paradedb.score(id) AS score
                    FROM
                        houses
                    WHERE
                        content @@@ %(keyword)s 
                    LIMIT 3
                ),
                embedding_query as (
                    SELECT
                        id, 
                        parent_id,
                        'embedding' AS source,
                        1 - (embedding <=> %(embedding)s::vector) AS score
                    FROM
                        houses 
                    ORDER BY
                        score DESC
                    LIMIT 3
                )
                SELECT * FROM bm25_query 
                UNION
                SELECT * FROM embedding_query
                """
            
            cur.execute(nearest_docs_sql, {"keyword": text_query, "embedding": text_embedding})

            return [RetrievedDocument(*x) for x in cur.fetchall()]
        except Exception as e:
            db_conn.rollback()
            print(e)
            
            raise ValueError("Error when querying the database")   

In [66]:
def query_image(image_embedding) -> list[RetrievedDocument]:
    with db_conn.cursor() as cur:
        try:
            images_sql = """
                SELECT
                    id, 
                    parent_id,
                    'embedding' AS source,
                    1 - (embedding <=> %(embedding)s::vector) AS score
                FROM
                    house_images
                ORDER BY
                    score DESC
                LIMIT 3
                """
            
            cur.execute(images_sql, {"embedding": image_embedding})

            results = [RetrievedDocument(*x) for x in cur.fetchall()]

            added_ids = []
            return [x for x in results if x.parent_id not in added_ids]

        except Exception as e:
            db_conn.rollback()
            print(e)
            
            raise ValueError("Error when querying the database")    

In [67]:
def query_houses(ids: list[str]) -> list[Document]:
    with db_conn.cursor() as cur:
        try:
            related_docs_sql = """
                WITH RECURSIVE
                related_houses AS (
                    SELECT
                        id,
                        content
                    FROM
                        houses
                    WHERE
                        id = ANY(%(ids)s)
                    UNION
                        SELECT
                            e.id,
                            e.content
                        FROM
                            houses e
                        INNER JOIN related_houses s ON s.id = e.parent_id
                ) 
                SELECT
                    *
                FROM
                    related_houses
                WHERE
                    length(content) > 50
                """
            
            cur.execute(related_docs_sql, {"ids": ids})

            return [Document(*x) for x in cur.fetchall()]

        except Exception as e:
            db_conn.rollback()
            print(e)
            
            raise ValueError("Error when querying the database")    

In [68]:
def query_house_images(house_id: str) -> list[str]:
    with db_conn.cursor() as cur:
        try:
            related_docs_sql = """
                SELECT
                    file_path
                FROM
                    house_images
                WHERE
                    parent_id = %(house_id)s
                """
            
            cur.execute(related_docs_sql, {"house_id": house_id})

            return [x[0] for x in cur.fetchall()]

        except Exception as e:
            db_conn.rollback()
            print(e)
            
            raise ValueError("Error when querying the database")    

## Embeddings

In [69]:
device = "cuda:0" if torch.cuda.is_available() else "cpu"

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
text_model = AutoModel.from_pretrained("nomic-ai/nomic-embed-text-v1.5", trust_remote_code=True, safe_serialization=True).to(device)
text_model.eval()

processor = AutoImageProcessor.from_pretrained("nomic-ai/nomic-embed-vision-v1.5")
vision_model = AutoModel.from_pretrained("nomic-ai/nomic-embed-vision-v1.5", trust_remote_code=True).to(device)

<All keys matched successfully>


In [70]:
def mean_pooling(model_output, attention_mask):
    token_embeddings = model_output[0]
    input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()

    return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)

In [71]:
def embed_text(text: str):
    encoded_input = tokenizer([text], padding=True, truncation=True, return_tensors="pt").to(device)

    with torch.no_grad():
        model_output = text_model(**encoded_input)

    embeddings = mean_pooling(model_output, encoded_input["attention_mask"])
    embeddings = F.normalize(embeddings, p=2, dim=1)
    
    return embeddings.cpu().numpy()[0].tolist()

In [72]:
def embed_image(image_data):
    image = Image.open(BytesIO(image_data))
    inputs = processor(image, return_tensors="pt").to(device)

    img_emb = vision_model(**inputs).last_hidden_state
    img_embeddings = F.normalize(img_emb[:, 0], p=2, dim=1)
    
    return img_embeddings.detach().cpu().numpy()[0].tolist()

## Prediction Model

In [98]:
price_model = lgb.Booster(model_file="../data/rumah-regressor.txt")
price_model

<lightgbm.basic.Booster at 0x7f8baf0a5d10>

In [129]:
sample_data = {
    'subdistrict': ['Babakanmadang'],
    'luas_tanah': [90],
    'luas_bangunan': [70],
    'jumlah_lantai': [2],
    'tahun_dibangun': [0],
    'daya_listrik': [0],

    'land_building_ratio': [90 / 70],
    'total_beds': [4],
    'total_baths': [2],
    'building_area_floor_ratio': [90 / 2],
}

df_predict = pd.DataFrame(sample_data)
df_predict["subdistrict"] = df_predict["subdistrict"].astype(pd.api.types.CategoricalDtype(price_model.pandas_categorical[0]))
df_predict

np.exp(price_model.predict(df_predict)[0])

1038541501.7866584

## Tools

In [74]:
def top_listing_by_location() -> list[dict[str, str | int | float]]:
    """List locations with the most available house sales listing along with its average price.

    Returns:
        A list of dictionary containing the area subdistrict, district, city and province, along with the number of house for sale and its average price
    """
    
    locations = query_locations()
    return [asdict(x) for x in locations]

In [75]:
def search_by_keyword(query: str) -> list[dict[str, str]]:
    """Search house sale listing using a search query.

    Args:
        query: Search query describing the house information including price, location, number of bedrooms, etc.

    Returns:
        A list of dictionary containing a unique house ID and detailed house description.
    """

    text_embedding = embed_text(query)
    retrieved_documents = query_hybrid(query, text_embedding)
    documents = query_houses([x.id for x in retrieved_documents])

    return [asdict(x) for x in documents]

In [88]:
def search_by_image_id(image_id: str) -> list[dict[str,  str]]:
    """Search house sale listing using an image.

    Args:
        image_id: Unique ID supplied by the user after it is uploaded to the system.

    Returns:
        A list of dictionary containing a unique house ID and detailed house description.
    """
    
    image_path = Path("../data/rumah123/images") / "hos5089888/2022-08-11-02-27-44-5a9a9dd9-93f8-4ec6-aad5-4ffaa1325a70.jpg"
    # image_path = Path("../data/rumah123/images") / image_id
    if not image_path.exists():
        raise ValueError("Image not found")
    
    with open(image_path, "rb") as f:
        image_embedding = embed_image(f.read())

    retrieved_documents = query_image(image_embedding)
    documents = query_houses([x.parent_id for x in retrieved_documents])

    return [asdict(x) for x in documents]

In [89]:
def get_house_images(house_id: str) -> list[str]:
    """Gets the image paths associated with the specified house ID.

    Args:
        house_id: Unique house ID that must starts with "hos".

    Returns:
        A list of file paths to the images.
    """
    
    if not house_id.startswith("hos"):
        raise ValueError("House ID must start with hos")
    
    norm_id = house_id.replace("-desc", "")
    retrieved_documents = query_house_images(norm_id)

    return retrieved_documents

In [90]:
"hos5089888/2022-08-11-02-27-44-5a9a9dd9-93f8-4ec6-aad5-4ffaa1325a70.jpg"

'hos5089888/2022-08-11-02-27-44-5a9a9dd9-93f8-4ec6-aad5-4ffaa1325a70.jpg'

In [None]:
def get_available_subdistricts() -> list[str]:
    """Lists the available subdistrict locations of the house listings. Used for searching for sale properties and predicting property prices.
    
    Returns:
        A list of subdistrict names.
    """
    
    return price_model.pandas_categorical[0]

In [None]:
def predict_house_price(subdistrict: str, land_area: float, building_area: float, num_bedrooms: int=1, num_bathrooms: int=1, num_floors: int=1, year_built: int=0, electricity_rate: float=1300) -> float:
    """Predicts a house price based on its features.

    Args:
        subdistrict: Subdistrict name of the property. This is a required field.
        land_area: Estimated land area of the property in meters squared. This is a required field.
        building_area: Estimated building area on top of the land in meters squared. This is a required field.
        num_bedrooms: Number of bedrooms. The default value is 1.
        num_bathrooms: Number of bathrooms. The default value is 1.
        num_floors: Number of floors. The default value is 1.
        year_built: What year the property is built. The default value is 0.
        electricity_rate: The electrical wattage subscription from electricity provider. The default value is 1300.

    Returns:
        The predicted property price in IDR.
    """

    valid_locations = price_model.pandas_categorical[0]
    if subdistrict not in valid_locations:
        raise ValueError("Invalid subdistrict")

    sample_data = {
        'subdistrict': [subdistrict],
        'luas_tanah': [land_area],
        'luas_bangunan': [building_area],
        'jumlah_lantai': [num_floors],
        'tahun_dibangun': [year_built],
        'daya_listrik': [electricity_rate],

        'land_building_ratio': [land_area / building_area],
        'total_bedrooms': [num_bedrooms],
        'total_bathrooms': [num_bathrooms],
        'building_area_floor_ratio': [building_area / num_floors],
    }

    df_predict = pd.DataFrame(sample_data)
    df_predict["subdistrict"] = df_predict["subdistrict"].astype(pd.api.types.CategoricalDtype(valid_locations))

    predicted = price_model.predict(df_predict)
    return np.exp(predicted[0])

## LLM Chat

In [None]:
client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
config = types.GenerateContentConfig(
    system_instruction="You are a house property salesman.", 
    tools=[
        top_listing_by_location, 
        search_by_keyword, 
        search_by_image_id, 
        get_house_images, 
        get_available_subdistricts, 
        predict_house_price
    ]
)

In [92]:
chat = client.chats.create(model='gemini-2.0-flash', config=config)
chat

<google.genai.chats.Chat at 0x7f8bdd57be50>

In [93]:
response = chat.send_message('get me house images for house with the following ID: hos15910480')
print(response)

In [130]:
response.text

'Okay, I have the image paths for house ID hos15910480. Here they are:\n\n*   hos15910480/2023-12-29-09-50-50-f7e3f823-e631-4655-bfa3-7d06dd827aa3.jpg\n*   hos15910480/2023-12-29-09-50-48-d09f9d7d-b9c0-4513-9f88-dadccd70647d.jpg\n*   hos15910480/2023-12-29-09-50-48-7b675ea7-e60f-46af-bae8-fc332e329fee.jpg\n*   hos15910480/2023-12-29-09-50-45-74e4cdca-4f58-451f-a0b3-bbccbc139fe0.jpg\n*   hos15910480/2023-12-29-09-50-49-0232994c-bf15-4bf2-88e6-a537a6dfc13e.jpg'

In [95]:
response.automatic_function_calling_history

[UserContent(parts=[Part(video_metadata=None, thought=None, code_execution_result=None, executable_code=None, file_data=None, function_call=None, function_response=None, inline_data=None, text='get me house images for house with the following ID: hos15910480')], role='user'),
 Content(parts=[Part(video_metadata=None, thought=None, code_execution_result=None, executable_code=None, file_data=None, function_call=FunctionCall(id=None, args={'house_id': 'hos15910480'}, name='get_house_images'), function_response=None, inline_data=None, text=None)], role='model'),
 Content(parts=[Part(video_metadata=None, thought=None, code_execution_result=None, executable_code=None, file_data=None, function_call=None, function_response=FunctionResponse(id=None, name='get_house_images', response={'result': ['hos15910480/2023-12-29-09-50-50-f7e3f823-e631-4655-bfa3-7d06dd827aa3.jpg', 'hos15910480/2023-12-29-09-50-48-d09f9d7d-b9c0-4513-9f88-dadccd70647d.jpg', 'hos15910480/2023-12-29-09-50-48-7b675ea7-e60f-46af

In [94]:
print(chat.get_history())