In [None]:
### NOTEBOOK 1: 01_geocode.ipynb (CORRECTED PATHS) ###
#
# GOAL: Run all expensive API calls (OpenAI & Google Maps)
# to create two clean, geocoded "failsafe" files.
#
# 1. Processes `Platform A` via REVERSE Geocoding.
# 2. Processes `Platform B` via AI Parse + FORWARD Geocoding.
#
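# NOTE: Paths are relative to the notebook's working directory, assuming:
# - Notebook location: <project_root>\notebooks  (the working directory)
# - .env location:     <project_root>             (i.e. `..` from the notebook)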
#

import pandas as pd
import numpy as np
import json
import re
import time
import os
import sys
from pathlib import Path
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
import warnings

# API & Data Handling
from openai import OpenAI
import googlemaps
from dotenv import load_dotenv
from thefuzz import fuzz

# Suppress warnings: drop every warning notebook-wide and switch off pandas'
# chained-assignment (SettingWithCopy) check.
warnings.simplefilter('ignore')
pd.set_option('mode.chained_assignment', None)

# Register tqdm's `progress_apply` on pandas objects (used by the AI parse step).
tqdm.pandas()

print("--- 01_geocode.ipynb ---")

In [None]:
# ## Step 1: Setup (API Clients & File Paths)
# ---

print("Step 1: Setting up clients and paths...")

# --- Path Definitions ---
# This is the project root folder (where your .env is)
PROJECT_ROOT = Path(r"..")
DATA_DIR = PROJECT_ROOT / "data"

# Load API keys from the project root directory
load_dotenv(dotenv_path=PROJECT_ROOT / ".env")

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
GOOGLE_MAPS_API_KEY = os.getenv("GOOGLE_MAPS_API_KEY")

# Name any missing keys up front (never print the key values themselves).
_missing_keys = [
    name
    for name, value in (("OPENAI_API_KEY", OPENAI_API_KEY), ("GOOGLE_MAPS_API_KEY", GOOGLE_MAPS_API_KEY))
    if not value
]
if _missing_keys:
    print(f"Warning: missing API key(s) {', '.join(_missing_keys)} (looked in {PROJECT_ROOT / '.env'} and the environment).")

# Initialize API Clients independently, so one failing does not stop the other
# from being created. Both names are bound to None first: after a failed init
# they are defined (None) rather than undefined, so later code fails on a
# clear attribute access instead of a NameError.
gmaps_client = None
openai_client = None
try:
    gmaps_client = googlemaps.Client(key=GOOGLE_MAPS_API_KEY)
except Exception as e:
    print(f"Warning: Could not initialize Google Maps client. {e}")
try:
    openai_client = OpenAI(api_key=OPENAI_API_KEY)
except Exception as e:
    print(f"Warning: Could not initialize OpenAI client. {e}")
if gmaps_client is not None and openai_client is not None:
    print("Google Maps and OpenAI clients configured successfully.")

# Input files (in data/raw)
RAW_R123_PATH = DATA_DIR / "raw" / "platform_a_raw.json"
RAW_PLATFORM_B_PATH = DATA_DIR / "raw" / "platform_b_raw.json"

# Output "Failsafe" files (in data/processed)
PROCESSED_DIR = DATA_DIR / "processed"
PROCESSED_DIR.mkdir(parents=True, exist_ok=True) # Ensure dir exists

R123_GEOCODED_PATH = PROCESSED_DIR / "platform_a_geocoded.csv"
PLATFORM_B_AI_PARSED_PATH = PROCESSED_DIR / "platform_b_parsed.csv"
PLATFORM_B_GEOCODED_PATH = PROCESSED_DIR / "platform_b_geocoded.csv"

print("Paths defined and API keys loaded from project root.\n")

In [None]:
# ---
# ## Step 2: Helper Functions (Loaders & Cleaners)
# ---

print("Step 2: Defining helper functions...")

def load_json_lines(file_path):
    """Load a JSON-lines file into a DataFrame, skipping malformed lines.

    Args:
        file_path: Path (or path string) of a file with one JSON value per line.

    Returns:
        A DataFrame built from the successfully decoded lines, or ``None`` if
        the file is missing or cannot be read. Blank lines are ignored; the
        number of undecodable lines is printed instead of being dropped
        silently.
    """
    file_path = Path(file_path)  # accept plain strings too; `.name` is used below
    data = []
    skipped = 0
    try:
        # utf-8-sig strips a leading BOM; with plain utf-8 the BOM stays on the
        # first line and json.loads rejects it, losing the first record.
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            for line in f:
                if not line.strip():
                    continue  # blank lines are not records
                try:
                    data.append(json.loads(line))
                except json.JSONDecodeError:
                    skipped += 1
        print(f"Loaded {len(data)} records from {file_path.name}")
        if skipped:
            print(f"⚠️ Skipped {skipped} malformed line(s) in {file_path.name}")
        return pd.DataFrame(data)
    except FileNotFoundError:
        print(f"❌ ERROR: File not found at {file_path}")
        return None
    except Exception as e:
        print(f"❌ ERROR loading {file_path.name}: {e}")
        return None

def clean_r123_description(desc):
    """Normalise whitespace in a `Platform A` description.

    Every run of whitespace (spaces, tabs, newlines) collapses to one space and
    the ends are trimmed. Missing values (``None``/NaN, e.g. from records that
    lack the key) and text that is empty after cleaning become ``np.nan``, so
    they stay missing instead of turning into the literal string ``"nan"``.
    """
    if desc is None or (isinstance(desc, float) and np.isnan(desc)):
        return np.nan
    cleaned = re.sub(r'\s+', ' ', str(desc)).strip()
    return cleaned if cleaned else np.nan

def clean_platform_b_description(desc):
    """Normalise a `Platform B` description into a single line of text.

    * list values are joined with single spaces;
    * the superscript-two character (as in "m²") is removed;
    * every run of whitespace (spaces, tabs, newlines, carriage returns)
      collapses to one space and the ends are trimmed.

    Missing values (``None``/NaN) and text that is empty after cleaning
    become ``np.nan``.
    """
    if desc is None or (isinstance(desc, float) and np.isnan(desc)):
        return np.nan
    if isinstance(desc, list):
        desc = ' '.join(map(str, desc))
    cleaned = re.sub(r'\s+', ' ', str(desc).replace('\u00b2', '')).strip()
    return cleaned if cleaned else np.nan

def clean_platform_b_id(id_str):
    """Turn a hyphen-separated `Platform B` ID into space-separated words.

    The value is coerced with ``str``, and each ``-`` becomes one space
    (consecutive hyphens therefore leave consecutive spaces). Leading and
    trailing whitespace is then trimmed. The result is the ``id`` field of the
    AI parse prompt.
    """
    pieces = str(id_str).split('-')
    return ' '.join(pieces).strip()

def stringify_specs(specs_dict):
    """Serialise a listing's ``specs`` field to a string for CSV storage.

    dicts and lists are written as JSON, keeping non-ASCII text readable
    (e.g. "m²"), so the column can be read back with ``json.loads``.
    ``None``/NaN become ``np.nan`` (an empty CSV cell) instead of the strings
    "None"/"nan". Any other value is passed through ``str``.
    """
    if isinstance(specs_dict, (dict, list)):
        return json.dumps(specs_dict, ensure_ascii=False)
    if specs_dict is None or (isinstance(specs_dict, float) and np.isnan(specs_dict)):
        return np.nan
    return str(specs_dict)

print("Helper functions defined.\n")

In [None]:
# ---
# ## Step 3: `Platform A` - Path 1 (Reverse Geocoding)
# ---

print("---")
print("## Step 3: `Platform A` - Path 1 (Reverse Geocoding)")
print("Logic: Use existing lat/lon to get a clean address and zipcode.")

def geocode_reverse_google(lat_lon_tuple):
    """Look up the best address match for one coordinate pair.

    Args:
        lat_lon_tuple: ``(lat, lon)``, passed unchanged to
            ``gmaps_client.reverse_geocode``.

    Returns:
        dict with ``geo_address``, ``zipcode`` (first ``postal_code``
        component, else ``""``) and ``geo_confidence`` (``location_type``,
        else ``"UNKNOWN"``) from the first result. Returns ``{}`` when there is
        no result or any error occurs; errors are printed to stderr.
    """
    time.sleep(0.02) # Rate limiting
    try:
        matches = gmaps_client.reverse_geocode(lat_lon_tuple)
        if not matches:
            return {}
        best = matches[0]
        zip_code = next(
            (
                part.get("short_name", "")
                for part in best.get("address_components", [])
                if "postal_code" in part.get("types", [])
            ),
            "",
        )
        return {
            "geo_address": best.get("formatted_address", ""),
            "zipcode": zip_code,
            "geo_confidence": best["geometry"].get("location_type", "UNKNOWN"),
        }
    except Exception as exc:
        print(f"Google API Error (Reverse): {exc}", file=sys.stderr)
        return {}

def run_batch_reverse_geocoding(df, lat_col, lon_col):
    """Reverse geocode each distinct (lat, lon) pair in parallel and left-join
    the results back onto ``df``.

    Args:
        df: Listings frame holding the coordinate columns.
        lat_col, lon_col: Names of the latitude / longitude columns. Values are
            coerced to numbers; unparseable ones become NaN and are skipped.

    Returns:
        A new DataFrame: ``df`` with numeric coordinate columns plus whatever
        columns ``geocode_reverse_google`` returns. Row order is preserved.
        Rows without coordinates, or whose lookup failed, get NaN in those
        columns. The caller's ``df`` is not modified.
    """
    # Work on a copy so the caller's frame keeps its original coordinate values.
    df = df.assign(**{
        lat_col: pd.to_numeric(df[lat_col], errors='coerce'),
        lon_col: pd.to_numeric(df[lon_col], errors='coerce'),
    })

    # Each distinct coordinate pair is geocoded once and shared by all its rows.
    unique_coords = list(
        df[[lat_col, lon_col]].dropna().drop_duplicates().itertuples(index=False, name=None)
    )

    print(f"Starting batch REVERSE geocoding for {len(unique_coords)} unique coordinates...")

    results_cache = {}
    with ThreadPoolExecutor(max_workers=10) as executor:
        future_to_coord = {executor.submit(geocode_reverse_google, coord): coord for coord in unique_coords}

        for future in tqdm(as_completed(future_to_coord), total=len(unique_coords), desc="Reverse Geocoding"):
            coord = future_to_coord[future]
            try:
                results_cache[coord] = future.result()
            except Exception as e:
                print(f"Geocoding worker error: {e}")
                results_cache[coord] = {}

    print("Batch reverse geocoding complete.")

    # Build geo_df from explicit records so the key columns always exist, even
    # when lookups return {}. DataFrame.from_dict(orient="index") yields a
    # frame with no key columns if every lookup fails, which breaks the merge.
    rows = [
        {**(result or {}), lat_col: lat, lon_col: lon}
        for (lat, lon), result in results_cache.items()
    ]
    if not rows:
        return df  # no usable coordinates: nothing to merge
    geo_df = pd.DataFrame(rows)

    return df.merge(geo_df, on=[lat_col, lon_col], how="left")

def process_platform_a():
    """Run the `Platform A` pipeline end to end and write its failsafe CSV.

    Steps: load the raw JSON-lines file, clean ``description``/``specs``/
    ``price``, reverse-geocode ``latitude``/``longitude``, tag ``source``, and
    save to ``R123_GEOCODED_PATH``.

    Returns:
        The geocoded DataFrame, or ``None`` when the raw file could not be
        loaded or contained no records.
    """
    print("\n--- Processing `Platform A` (Reverse Geocoding) ---")
    df_r123 = load_json_lines(RAW_R123_PATH)
    if df_r123 is None:
        return None
    if df_r123.empty:
        # A frame built from zero records has no columns, so the column
        # accesses below would raise KeyError.
        print("⚠️ No `Platform A` records loaded; nothing to geocode.")
        return None

    # CRITICAL FIX: Clean description text *before* saving
    print("Cleaning `Platform A` text fields...")
    df_r123['description'] = df_r123['description'].apply(clean_r123_description)
    df_r123['specs'] = df_r123['specs'].apply(stringify_specs)
    df_r123['price'] = pd.to_numeric(df_r123['price'], errors='coerce')

    # Run REVERSE Geocoding
    df_r123 = run_batch_reverse_geocoding(df_r123, 'latitude', 'longitude')
    df_r123['source'] = 'Platform A'

    # Save the failsafe file
    df_r123.to_csv(R123_GEOCODED_PATH, index=False, encoding='utf-8-sig')
    print("✅ `Platform A` processing complete. Failsafe saved to:")
    print(f"{R123_GEOCODED_PATH}\n")
    return df_r123

In [None]:
# ---
# ## Step 4: `Platform B` - Path 2 (AI Parse + Forward Geocoding)
# ---

print("---")
print("## Step 4: `Platform B` - Path 2 (AI Parse + Forward Geocoding)")
print("Logic: Use a 'One-Shot' AI call to parse text fields, then forward geocode.")

# --- AI "One-Shot" Parser ---
# System prompt for get_ai_geocoding_string. It asks the model to reduce a
# listing's id/address/description to one comma-separated location string;
# the caller lower-cases the reply. The prompt already asks for the city
# (Bandung), and geocode_forward_google also appends
# ", Bandung, Jawa Barat, Indonesia", so the final query may name Bandung twice.
AI_PROMPT_PLATFORM_B_PARSE = """
You are an expert Indonesian real estate data analyst. Your task is to synthesize the best possible geocoding string from the provided data.
1.  Analyze the `id`, `address`, and `description`.
2.  Prioritize specific housing complexes (e.g., "Podomoro Park") or street names.
3.  Include the main sub-district (kecamatan) and city (Bandung).
4.  Remove junk like "rumah dijual", "harga", etc.
5.  **Output *only* the final, clean, comma-separated string.**
"""
def get_ai_geocoding_string(row):
    """Ask the LLM to condense one `Platform B` listing into a geocodable string.

    Args:
        row: A listing row exposing ``id_clean``, ``address`` and
            ``description_clean``.

    Returns:
        The model's reply, stripped and lower-cased, or the sentinel
        ``"api_error"`` when the call fails or returns no text.
    """
    time.sleep(0.5) # Rate limiting

    def _field(value):
        # Missing values arrive as None/NaN from pandas: send "" rather than the
        # literal text "nan". json.dumps quotes and escapes the value, so quotes
        # or newlines inside listing text cannot break the field layout.
        if value is None or (isinstance(value, float) and np.isnan(value)):
            value = ""
        return json.dumps(str(value), ensure_ascii=False)

    user_input = (
        f"- id: {_field(row['id_clean'])}\n"
        f"- address: {_field(row['address'])}\n"
        f"- description: {_field(row['description_clean'])}"
    )

    try:
        response = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": AI_PROMPT_PLATFORM_B_PARSE},
                {"role": "user", "content": user_input}
            ],
            temperature=0.0
        )
        content = response.choices[0].message.content
        if content is None:
            print("AI Parse Error: empty response content")
            return "api_error"
        return content.strip().lower()
    except Exception as e:
        print(f"AI Parse Error: {e}")
        return "api_error"

# --- Forward Geocoding ---
def geocode_forward_google(address):
    """Forward geocode one location string, scoped to Bandung, Indonesia.

    The query is the string plus ", Bandung, Jawa Barat, Indonesia", restricted
    to country ``ID``.

    Args:
        address: Location string (normally an AI-produced ``master_geo_string``).

    Returns:
        dict with ``latitude``, ``longitude``, ``geo_address``, ``zipcode`` and
        ``geo_confidence`` from the first result. Returns ``{}`` when the input
        has nothing to look up, no result comes back, or the call fails;
        errors are printed to stderr.
    """
    # Skip inputs that carry no location: missing values, blank strings, and the
    # "api_error" sentinel that get_ai_geocoding_string returns on failure.
    # Sending them would query only the fixed suffix, likely yielding a
    # city-level match that looks like a real result.
    if address is None or (isinstance(address, float) and np.isnan(address)):
        return {}
    query = str(address).strip()
    if not query or query == "api_error":
        return {}

    time.sleep(0.02) # Rate limiting
    try:
        resp = gmaps_client.geocode(
            f"{query}, Bandung, Jawa Barat, Indonesia",
            language="en",
            components={'country': 'ID'},
        )
        if not resp:
            return {}

        result = resp[0]
        location = result["geometry"]["location"]
        confidence = result["geometry"].get("location_type", "UNKNOWN")
        formatted_address = result.get("formatted_address", "")
        postcode = next(
            (
                comp.get("short_name", "")
                for comp in result.get("address_components", [])
                if "postal_code" in comp.get("types", [])
            ),
            "",
        )

        return {
            "latitude": location["lat"],
            "longitude": location["lng"],
            "geo_address": formatted_address,
            "zipcode": postcode,
            "geo_confidence": confidence
        }
    except Exception as exc:
        print(f"Google API Error (Forward): {exc}", file=sys.stderr)
        return {}

def run_batch_forward_geocoding(df, address_column):
    """Forward geocode each distinct value of ``address_column`` in parallel and
    left-join the results back onto ``df``.

    Each unique non-null address is looked up once (10 worker threads) via
    ``geocode_forward_google``, and rows sharing an address share its result.

    Returns:
        A new DataFrame: ``df`` plus the returned geocode columns, with row
        order preserved. Rows with a null address or a failed lookup get NaN
        in those columns.
    """
    unique_addrs = df[address_column].dropna().unique()
    print(f"Starting batch FORWARD geocoding for {len(unique_addrs)} unique addresses...")

    results_cache = {}
    with ThreadPoolExecutor(max_workers=10) as executor:
        future_to_addr = {executor.submit(geocode_forward_google, addr): addr for addr in unique_addrs}

        for future in tqdm(as_completed(future_to_addr), total=len(unique_addrs), desc="Forward Geocoding"):
            addr = future_to_addr[future]
            try:
                results_cache[addr] = future.result()
            except Exception as e:
                print(f"Geocoding worker error: {e}")
                results_cache[addr] = {}

    print("Batch forward geocoding complete.")

    # Build geo_df from explicit records so the address key column always
    # exists. DataFrame.from_dict(orient="index") returns an empty frame when
    # every lookup yields {}, leaving an empty integer "index" column as the
    # merge key and breaking the merge.
    rows = [{**(result or {}), address_column: addr} for addr, result in results_cache.items()]
    if not rows:
        return df.copy()  # nothing to look up
    geo_df = pd.DataFrame(rows)

    return df.merge(geo_df, on=address_column, how="left")

def process_platform_b(reuse_ai_parse=False):
    """Run the `Platform B` pipeline: AI parse, then forward geocoding.

    The AI step writes ``PLATFORM_B_AI_PARSED_PATH``. Geocoding always reads
    that CSV back and writes ``PLATFORM_B_GEOCODED_PATH``.

    Args:
        reuse_ai_parse: If True and ``PLATFORM_B_AI_PARSED_PATH`` already
            exists, skip the raw load and the per-row LLM calls and geocode the
            saved parse instead. Defaults to False (always re-parse).

    Returns:
        The geocoded DataFrame, or ``None`` when the raw file could not be
        loaded or contained no records.
    """
    print("\n--- Processing `Platform B` (AI Parse + Forward Geocoding) ---")

    if reuse_ai_parse and PLATFORM_B_AI_PARSED_PATH.exists():
        print("Reusing existing AI parse results from:")
        print(f"{PLATFORM_B_AI_PARSED_PATH}\n")
    else:
        df_platform_b = load_json_lines(RAW_PLATFORM_B_PATH)
        if df_platform_b is None:
            return None
        if df_platform_b.empty:
            # A frame built from zero records has no columns, so the column
            # accesses below would raise KeyError.
            print("⚠️ No `Platform B` records loaded; nothing to parse or geocode.")
            return None

        # CRITICAL FIX: Clean text fields *before* AI and saving
        print("Cleaning `Platform B` text fields...")
        df_platform_b['description_clean'] = df_platform_b['description'].apply(clean_platform_b_description)
        df_platform_b['id_clean'] = df_platform_b['id'].apply(clean_platform_b_id)
        df_platform_b['specs'] = df_platform_b['specs'].apply(stringify_specs)
        df_platform_b['price'] = pd.to_numeric(df_platform_b['price'], errors='coerce')

        # --- AI Parser Step ---
        print("Running 'One-Shot' AI Parser for `Platform B`...")
        df_platform_b['master_geo_string'] = df_platform_b.progress_apply(get_ai_geocoding_string, axis=1)

        # CRITICAL FAILSAFE: Save AI output *before* geocoding
        df_platform_b.to_csv(PLATFORM_B_AI_PARSED_PATH, index=False, encoding='utf-8-sig')
        print("Saved AI parse results (internal failsafe) to:")
        print(f"{PLATFORM_B_AI_PARSED_PATH}\n")

    # --- Geocoding Step ---
    # Always geocode from the saved CSV, so fresh and reused parses take the same path.
    df_platform_b_parsed = pd.read_csv(PLATFORM_B_AI_PARSED_PATH)
    print("Loaded AI parsed file, proceeding with geocoding...")

    df_platform_b_geocoded = run_batch_forward_geocoding(df_platform_b_parsed, "master_geo_string")
    df_platform_b_geocoded['source'] = 'Platform B'

    # Save the final failsafe file
    df_platform_b_geocoded.to_csv(PLATFORM_B_GEOCODED_PATH, index=False, encoding='utf-8-sig')
    print("✅ `Platform B` processing complete. Failsafe saved to:")
    print(f"{PLATFORM_B_GEOCODED_PATH}\n")
    return df_platform_b_geocoded

print("Defined `Platform B` AI-Parse + forward geocoding pipeline.\n")

In [None]:
# ---
# ## Step 5: MAIN EXECUTION
# ---

if __name__ == "__main__":
    print("==========================================")
    print("### STARTING 01_geocode.ipynb ###")
    print("==========================================")

    start_time = time.time()

    # Run Path 1, then Path 2. Each returns None when its raw input could not
    # be loaded, so keep the results and report failures instead of always
    # announcing success.
    pipeline_results = {
        "Platform A": process_platform_a(),
        "Platform B": process_platform_b(),
    }
    failed = [name for name, result in pipeline_results.items() if result is None]

    end_time = time.time()
    print("------------------------------------------")
    if failed:
        print(f"⚠️ 01_geocode.ipynb finished, but produced no output for: {', '.join(failed)}")
    else:
        print("✅✅✅ 01_geocode.ipynb COMPLETE! ✅✅✅")
    print(f"Total time: {(end_time - start_time) / 60:.2f} minutes.")
    # Platform A writes one file; Platform B writes two (AI parse + geocoded).
    print("Failsafe file locations:")
    for failsafe_path in (R123_GEOCODED_PATH, PLATFORM_B_AI_PARSED_PATH, PLATFORM_B_GEOCODED_PATH):
        print(f"  {failsafe_path}")

Geocoding Process Summary (`Platform B`: AI parse + forward geocoding)

AI Parse (9,680 listings)
- Parsed all 9,680 listings (id, address, description).

Unique Addresses (5,567)
- Produced 9,680 geo strings.
- Found 5,567 unique addresses.
- The other 4,113 listings reused a geo string already produced for an earlier listing (e.g., 10 listings all → "podomoro park, bojongsoang, bandung").

API Calls (5,567)
- Called Google Maps API 5,567 times instead of 9,680.
- Each unique address geocoded once.

Map Results Back (9,680 listings)
- Results mapped back to all 9,680 listings.
- Duplicates received the same coordinates.

In Short
- All 9,680 listings processed and geocoded.
- Saved 4,113 redundant API calls.
