# Collect listing data from CA counties (LA area / Bay Area)

In [5]:
import os
import json
import http.client
import time
from pathlib import Path
from datetime import datetime
from urllib.parse import urlencode
from typing import Optional, Dict, List, Any
from datetime import datetime
import pandas as pd
from dotenv import load_dotenv
import json
import pandas as pd
import re
import google.generativeai as genai
import os

from together import Together

# Load environment variables (e.g. RAPIDAPI_KEY, which RapidAPIFetcher reads)
# from the project's config/.env file.
# NOTE(review): the path is relative, so it presumably only resolves when the
# notebook is run from a directory two levels below the project root — confirm.
load_dotenv('../../config/.env')

True

In [6]:

class RapidAPIFetcher:
    """Enhanced fetcher for US Real Estate Listings API with batch fetching support.

    Requests for-sale listings from the RapidAPI ``/for-sale`` endpoint (at most
    200 per request), follows pagination automatically, and saves results as
    JSON plus a flattened CSV under ``<project_root>/backend_services/data/raw``.
    """

    def __init__(self, project_dir_name: str = "realestatepal"):
        """Initialize with API credentials and create the raw-data directory.

        Args:
            project_dir_name: Name of the project root directory. The nearest
                ancestor of the current working directory (inclusive) with this
                name is used; if there is none, the working directory is used.

        Raises:
            ValueError: If ``RAPIDAPI_KEY`` is not set in the environment.
        """
        self.api_key = os.getenv('RAPIDAPI_KEY')
        if not self.api_key:
            raise ValueError("RAPIDAPI_KEY not found in .env file")

        self.api_host = "us-real-estate-listings.p.rapidapi.com"
        self.headers = {
            'x-rapidapi-key': self.api_key,
            'x-rapidapi-host': self.api_host
        }

        # Create data directory inside the project (or the cwd as a fallback,
        # rather than the usually non-writable filesystem root).
        project_root = self._find_project_root(Path.cwd(), project_dir_name)
        self.data_path = project_root / "backend_services" / "data" / "raw"
        self.data_path.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _find_project_root(start: Path, project_dir_name: str) -> Path:
        """Return the nearest directory named *project_dir_name* at or above *start*, else *start*."""
        for candidate in (start, *start.parents):
            if candidate.name == project_dir_name:
                return candidate
        return start

    @staticmethod
    def _empty_result(error: Optional[str] = None) -> Dict[str, Any]:
        """Return an empty batch; ``metadata['error']`` is set only when the request failed."""
        metadata: Dict[str, Any] = {"error": error} if error else {}
        return {"metadata": metadata, "listings": []}

    def fetch_house_data(self,
                        location: str,
                        offset: int = 0,
                        limit: int = 200,
                        property_type: str = "single_family",
                        beds_min: Optional[int] = None,
                        price_min: Optional[int] = None,
                        price_max: Optional[int] = None,
                        days_on: Optional[int] = None,
                        sort: str = "relevance") -> Dict[str, Any]:
        """
        Fetch a single batch of properties from RapidAPI

        Args:
            location: Location string (e.g., "Fulton County", "Atlanta, GA")
            offset: Pagination offset
            limit: Number of results (clamped to 200)
            property_type: Type of property
            beds_min: Minimum bedrooms
            price_min: Minimum price
            price_max: Maximum price
            days_on: Days on market
            sort: Sort order

        Returns:
            Dict with "metadata" and "listings". On any failure (network error,
            non-2xx HTTP status, unparsable body) "listings" is empty and
            "metadata" holds only an "error" string; when the response simply
            has no "listings" key, both are empty.
        """
        # Build query parameters
        params = {
            'location': location,
            'offset': offset,
            'limit': min(limit, 200),  # Ensure we don't exceed API limit
            'property_type': property_type,
            'sort': sort,
            'expand_search_radius': 1
        }

        # Add optional parameters only when provided
        optional_params = {
            'beds_min': beds_min,
            'price_min': price_min,
            'price_max': price_max,
            'days_on': days_on,
        }
        params.update({key: value for key, value in optional_params.items() if value is not None})

        endpoint = f"/for-sale?{urlencode(params)}"

        print(f"Fetching properties in {location} (offset: {offset}, limit: {params['limit']})...")
        conn = http.client.HTTPSConnection(self.api_host)
        try:
            conn.request("GET", endpoint, headers=self.headers)
            response = conn.getresponse()
            data = response.read()

            # Surface HTTP failures (e.g. auth or rate-limit errors) instead of
            # misreporting them as "no properties found".
            if not 200 <= response.status < 300:
                print(f"Error: HTTP {response.status} {response.reason}")
                return self._empty_result(f"HTTP {response.status} {response.reason}")

            result = json.loads(data.decode("utf-8"))

            # Check if we got properties
            if not isinstance(result, dict) or 'listings' not in result:
                print("No properties found in response")
                return self._empty_result()

            properties = result['listings'] or []
            total_count = result.get('totalResultCount', len(properties))
            print(f"Found {len(properties)} properties (Total available: {total_count})")

            # Prepare data with metadata
            return {
                "metadata": {
                    "fetch_time": datetime.now().strftime("%Y%m%d_%H%M%S"),
                    "location": location,
                    "parameters": params,
                    "total_results": total_count,
                    "fetched_count": len(properties)
                },
                "listings": properties
            }

        except Exception as e:
            # Best-effort boundary: report and return an empty batch, keeping the
            # error so callers can tell a failure apart from "no results".
            print(f"Error: {e}")
            return self._empty_result(str(e))
        finally:
            # Always release the socket, including when the request itself raised.
            conn.close()

    def fetch_all_houses(self,
                        location: str,
                        property_type: str = "single_family",
                        beds_min: Optional[int] = None,
                        price_min: Optional[int] = None,
                        price_max: Optional[int] = None,
                        days_on: Optional[int] = None,
                        sort: str = "relevance",
                        delay: float = 0.5) -> Dict[str, Any]:
        """
        Fetch ALL properties by automatically handling pagination

        Stops when a batch is empty, a batch is shorter than the page size, or
        the reported total has been reached. If a batch fails mid-way, the
        listings fetched so far are returned and metadata["error"] is set.

        Args:
            location: Location string
            property_type: Type of property
            beds_min: Minimum bedrooms
            price_min: Minimum price
            price_max: Maximum price
            days_on: Days on market
            sort: Sort order
            delay: Delay between API calls in seconds

        Returns:
            Dict with all combined metadata and listings
        """
        all_listings: List[Dict[str, Any]] = []
        offset = 0
        limit = 200  # API maximum
        total_fetched = 0
        batch_count = 0
        total_available = None
        fetch_error = None

        print(f"\n{'='*60}")
        print(f"Starting batch fetch for: {location}")
        print(f"{'='*60}\n")

        while True:
            batch_count += 1
            print(f"\n--- Batch {batch_count} ---")

            result = self.fetch_house_data(
                location=location,
                offset=offset,
                limit=limit,
                property_type=property_type,
                beds_min=beds_min,
                price_min=price_min,
                price_max=price_max,
                days_on=days_on,
                sort=sort
            )
            result = result or {}
            batch_metadata = result.get('metadata') or {}
            current_batch = result.get('listings') or []

            # An empty batch is either the end of the data or a failed request.
            if not current_batch:
                fetch_error = batch_metadata.get('error')
                if fetch_error:
                    print(f"⚠️ Batch {batch_count} failed ({fetch_error}); stopping with partial results")
                else:
                    print("No more data available")
                break

            # First batch: get total count
            if total_available is None:
                total_available = batch_metadata.get('total_results', 0)
                print(f"\n📊 Total properties available: {total_available}")

            all_listings.extend(current_batch)
            total_fetched += len(current_batch)

            print(f"✅ Batch size: {len(current_batch)}")
            print(f"📈 Total fetched: {total_fetched}")

            # Check if we're done
            if len(current_batch) < limit:
                print("\n✓ Last batch received (less than limit)")
                break

            if total_available and total_fetched >= total_available:
                print("\n✓ All available properties fetched")
                break

            offset += limit
            time.sleep(delay)  # Rate limiting between requests

        print(f"\n{'='*60}")
        print(f"✨ Fetch complete!")
        print(f"📊 Total properties fetched: {total_fetched}")
        print(f"📦 Total batches: {batch_count}")
        print(f"{'='*60}\n")

        metadata: Dict[str, Any] = {
            "fetch_time": datetime.now().strftime("%Y%m%d_%H%M%S"),
            "location": location,
            "parameters": {
                "location": location,
                "property_type": property_type,
                "beds_min": beds_min,
                "price_min": price_min,
                "price_max": price_max,
                "days_on": days_on,
                "sort": sort
            },
            "total_results": total_available or total_fetched,
            "fetched_count": total_fetched,
            "batch_count": batch_count
        }
        if fetch_error:
            # Marks the result as partial so downstream consumers can notice.
            metadata["error"] = fetch_error

        return {"metadata": metadata, "listings": all_listings}

    def save_data(self, data: Dict[str, Any], filename_suffix: str = "complete") -> str:
        """
        Save fetched data to JSON and (best-effort) CSV

        Files are named ``<timestamp>_<location>_<suffix>.json/.csv`` inside
        ``self.data_path``; the CSV is written only when there are listings.

        Args:
            data: Data dictionary with metadata and listings
            filename_suffix: Suffix for filename (e.g., "complete", "batch")

        Returns:
            Path to saved JSON file, or "" if ``data`` has no "listings" key
        """
        if not data or 'listings' not in data:
            print("No data to save")
            return ""

        # Generate filename; tolerate missing metadata or a None location.
        location = (data.get('metadata') or {}).get('location') or 'unknown'
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        location_clean = location.replace(" ", "_").replace(",", "").lower()
        base_name = f"{timestamp}_{location_clean}_{filename_suffix}"

        json_filepath = self.data_path / f"{base_name}.json"
        with open(json_filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

        print(f"✅ JSON saved to: {json_filepath}")

        # CSV export is best-effort: a flattening/write failure must not lose
        # the JSON that was already written.
        if data['listings']:
            try:
                df = pd.json_normalize(data['listings'])
                csv_filepath = self.data_path / f"{base_name}.csv"
                df.to_csv(csv_filepath, index=False)
                print(f"✅ CSV saved to: {csv_filepath}")
            except Exception as e:
                print(f"⚠️ Error saving CSV: {e}")

        return str(json_filepath)

    def fetch_and_save_all(self,
                          location: str,
                          property_type: str = "single_family",
                          beds_min: Optional[int] = None,
                          price_min: Optional[int] = None,
                          price_max: Optional[int] = None,
                          days_on: Optional[int] = None,
                          sort: str = "relevance",
                          delay: float = 0.5) -> str:
        """
        Convenience method to fetch all data and save it

        Args:
            Same as fetch_all_houses

        Returns:
            Path to saved JSON file
        """
        all_data = self.fetch_all_houses(
            location=location,
            property_type=property_type,
            beds_min=beds_min,
            price_min=price_min,
            price_max=price_max,
            days_on=days_on,
            sort=sort,
            delay=delay
        )
        # NOTE(review): listings are saved as returned by the API; no
        # de-duplication across pages is performed here.
        return self.save_data(all_data, "complete")


In [7]:

# ========== Usage example: fetch and store each county separately ==========

fetcher = RapidAPIFetcher()

# Two Los Angeles-area counties (Los Angeles, Orange) and six Bay Area counties
locations = ["Los Angeles County, CA", "Orange County, CA",
             "San Francisco County, CA", "Alameda County, CA",
             "San Mateo County, CA", "Santa Clara County, CA",
             "Marin County, CA", "Contra Costa County, CA"]

# Common search parameters for all counties
search_parameters = {
    "property_type": "single_family",
    "beds_min": 2,
    "price_min": 700000,
    "price_max": 7000000,  # Focus on 700k-7m
    "days_on": 90,
    "sort": "relevance"
}

# Track overall statistics; written out as the run summary JSON below
overall_stats = {
    "fetch_time": datetime.now().strftime("%Y%m%d_%H%M%S"),
    "total_locations": len(locations),
    "search_parameters": search_parameters,
    "location_results": {},
    "total_properties": 0,
    "saved_files": []
}

print(f"\n{'='*70}")
print("🏡 Starting Data Collection for CA Counties")
print(f"📍 Total counties: {len(locations)}")
print(f"💰 Price range: ${search_parameters['price_min']:,} - ${search_parameters['price_max']:,}")
print(f"🛏️  Min bedrooms: {search_parameters['beds_min']}")
print(f"📅 Days on market: {search_parameters['days_on']}")
print(f"{'='*70}")

# Process each county separately; one county's failure does not stop the rest
for i, location in enumerate(locations, 1):
    print(f"\n{'='*50}")
    print(f"📍 Processing County {i}/{len(locations)}: {location}")
    print(f"{'='*50}")

    try:
        location_data = fetcher.fetch_all_houses(
            location=location,
            **search_parameters
        )

        # Tag each listing with the search location that produced it
        for listing in location_data['listings']:
            listing['search_location'] = location

        # Save individual county data, e.g. "Los Angeles County, CA" -> "los_angeles"
        county_name = location.replace(" County, CA", "").replace(" ", "_").lower()
        saved_file = fetcher.save_data(location_data, f"{county_name}_county")

        county_stats = {
            "fetched_count": len(location_data['listings']),
            "total_results": location_data['metadata'].get('total_results', 0),
            "batch_count": location_data['metadata'].get('batch_count', 0),
            "saved_file": saved_file
        }

        overall_stats['location_results'][location] = county_stats
        overall_stats['total_properties'] += county_stats['fetched_count']
        # save_data returns "" when it wrote nothing; recording that would make
        # the file listing below stat the current directory instead.
        if saved_file:
            overall_stats['saved_files'].append(saved_file)

        print(f"\n✅ {location} completed:")
        print(f"   📊 Properties fetched: {county_stats['fetched_count']}")
        print(f"   💾 Saved to: {saved_file}")

    except Exception as e:
        print(f"\n❌ Error processing {location}: {str(e)}")
        overall_stats['location_results'][location] = {
            "error": str(e),
            "fetched_count": 0
        }

# Save overall summary
summary_file = fetcher.data_path / f"{overall_stats['fetch_time']}_CA_counties_summary.json"
with open(summary_file, 'w', encoding='utf-8') as f:
    json.dump(overall_stats, f, indent=2, ensure_ascii=False)

succeeded_count = sum('error' not in r for r in overall_stats['location_results'].values())

# Final summary
print(f"\n{'='*70}")
print("🎉 Data Collection Complete!")
print(f"{'='*70}")
print(f"📊 Overall Statistics:")
print(f"   🏠 Total properties collected: {overall_stats['total_properties']:,}")
print(f"   🗂️  Counties processed: {succeeded_count}/{len(locations)}")
print(f"   💾 Files created: {len(overall_stats['saved_files'])}")
print(f"\n📁 Individual County Results:")

for location, stats in overall_stats['location_results'].items():
    if 'error' not in stats:
        print(f"   • {location}: {stats['fetched_count']:,} properties")
    else:
        print(f"   • {location}: ❌ Error - {stats['error']}")

print(f"\n📋 Summary saved to: {summary_file}")
print(f"📂 All files saved in: {fetcher.data_path}")
print(f"{'='*70}")

# List all saved files for easy reference
print(f"\n📄 Generated Files:")
for file_path in overall_stats['saved_files']:
    file_name = Path(file_path).name
    file_size = Path(file_path).stat().st_size / (1024*1024)  # Convert to MB
    print(f"   • {file_name} ({file_size:.1f} MB)")
print(f"   • {Path(summary_file).name} (Summary)")
print(f"\n💡 Tip: Each county's data is now stored separately, making it easier to manage and upload to GitHub!")


🏡 Starting Data Collection for CA Counties
📍 Total counties: 8
💰 Price range: $700,000 - $7,000,000
🛏️  Min bedrooms: 2
📅 Days on market: 90

📍 Processing County 1/8: Los Angeles County, CA

Starting batch fetch for: Los Angeles County, CA


--- Batch 1 ---
Fetching properties in Los Angeles County, CA (offset: 0, limit: 200)...
Found 200 properties (Total available: 10243)

📊 Total properties available: 10243
✅ Batch size: 200
📈 Total fetched: 200

--- Batch 2 ---
Fetching properties in Los Angeles County, CA (offset: 200, limit: 200)...
Found 200 properties (Total available: 10243)
✅ Batch size: 200
📈 Total fetched: 400

--- Batch 3 ---
Fetching properties in Los Angeles County, CA (offset: 400, limit: 200)...
Found 200 properties (Total available: 10243)
✅ Batch size: 200
📈 Total fetched: 600

--- Batch 4 ---
Fetching properties in Los Angeles County, CA (offset: 600, limit: 200)...
Found 200 properties (Total available: 10243)
✅ Batch size: 200
📈 Total fetched: 800

--- Batch 5 --

# 🔧 Post-Processing: Filter LA County Data

Remove houses priced $5M–$7M (inclusive) from the LA County data to reduce the file size for GitHub upload.


In [None]:
# Filter LA County data to remove 5M-7M houses
import json
import pandas as pd
from pathlib import Path
from datetime import datetime

def filter_la_county_data(original_file: "Path | str | None" = None):
    """
    Filter LA County data to remove houses priced between 5M and 7M (inclusive).

    This reduces file size for easier GitHub upload. The input file is left
    untouched; the filtered listings are written to new, timestamped
    ``*_los_angeles_county_ca_filtered.json`` / ``.csv`` files in the same
    directory. The CSV is regenerated from the filtered listings (it is only
    written when at least one listing remains).

    Args:
        original_file: Path to the raw JSON produced by ``RapidAPIFetcher.save_data``.
            Defaults to the 2025-08-22 LA County snapshot this cell was written for.

    Returns:
        Tuple ``(new_json_file, new_csv_file)`` of ``Path`` objects.
    """
    if original_file is None:
        original_file = "/Users/shuai/Desktop/realestatepal/realestatepal/backend_services/data/raw/20250822_113940_los_angeles_county_ca_los_angeles_county.json"
    original_file = Path(original_file)

    print(f"🔍 Processing: {original_file.name}")
    original_json_size = original_file.stat().st_size / (1024*1024)
    print(f"📁 Original file size: {original_json_size:.1f} MB")

    with open(original_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    listings = data.get('listings') or []
    original_count = len(listings)
    print(f"📊 Original listings count: {original_count:,}")

    # Filter out houses priced 5M-7M (5,000,000 - 7,000,000 inclusive).
    # NOTE(review): assumes list_price is numeric or None in the raw JSON.
    filtered_listings = []
    removed_count = 0
    price_distribution = {'under_1M': 0, '1M_2M': 0, '2M_3M': 0, '3M_4M': 0, '4M_5M': 0,
                          '5M_7M': 0, 'over_7M': 0, 'unknown': 0}

    for listing in listings:
        list_price = listing.get('list_price')

        if list_price is None:
            # Keep listings without price info
            filtered_listings.append(listing)
            price_distribution['unknown'] += 1
        elif 5000000 <= list_price <= 7000000:
            removed_count += 1
            price_distribution['5M_7M'] += 1
        else:
            filtered_listings.append(listing)

            # Track price distribution for remaining houses
            if list_price < 1000000:
                price_distribution['under_1M'] += 1
            elif list_price < 2000000:
                price_distribution['1M_2M'] += 1
            elif list_price < 3000000:
                price_distribution['2M_3M'] += 1
            elif list_price < 4000000:
                price_distribution['3M_4M'] += 1
            elif list_price < 5000000:
                price_distribution['4M_5M'] += 1
            else:
                # Above the removed band; kept, and now counted instead of dropped.
                price_distribution['over_7M'] += 1

    data['listings'] = filtered_listings

    metadata = data.setdefault('metadata', {})
    metadata['filtered_info'] = {
        'filter_applied': True,
        'filter_date': datetime.now().strftime("%Y%m%d_%H%M%S"),
        'filter_reason': 'Remove houses priced 5M-7M to reduce file size',
        'original_count': original_count,
        'filtered_count': len(filtered_listings),
        'removed_count': removed_count,
        # Effective max assumes the fetch was capped at price_max=7,000,000.
        'price_max_after_filter': 4999999
    }

    # Update search parameters to reflect the change
    parameters = metadata.setdefault('parameters', {})
    parameters['price_max'] = 4999999
    parameters['note'] = 'Post-filtered to remove 5M-7M properties'

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    new_json_file = original_file.parent / f"{timestamp}_los_angeles_county_ca_filtered.json"
    new_csv_file = original_file.parent / f"{timestamp}_los_angeles_county_ca_filtered.csv"

    with open(new_json_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)

    if filtered_listings:
        df = pd.json_normalize(filtered_listings)
        df.to_csv(new_csv_file, index=False)

    reduction_pct = (removed_count / original_count) * 100 if original_count else 0.0

    print(f"\n{'='*60}")
    print("🎯 Filtering Results:")
    print(f"{'='*60}")
    print(f"❌ Removed (5M-7M): {removed_count:,} properties")
    print(f"✅ Kept: {len(filtered_listings):,} properties")
    print(f"📉 Reduction: {reduction_pct:.1f}%")

    print(f"\n📊 Price Distribution (Remaining Properties):")
    for price_range, count in price_distribution.items():
        if price_range != '5M_7M' and count > 0:
            print(f"   • {price_range}: {count:,}")

    print(f"\n💾 New Files Created:")
    new_json_size = new_json_file.stat().st_size / (1024*1024)
    print(f"   • JSON: {new_json_file.name} ({new_json_size:.1f} MB)")
    if new_csv_file.exists():
        new_csv_size = new_csv_file.stat().st_size / (1024*1024)
        print(f"   • CSV:  {new_csv_file.name} ({new_csv_size:.1f} MB)")
    else:
        print("   • CSV:  not written (no listings left after filtering)")

    if original_json_size > 0:
        size_reduction = ((original_json_size - new_json_size) / original_json_size) * 100
        print(f"\n📉 File Size Reduction: {size_reduction:.1f}% (from {original_json_size:.1f}MB to {new_json_size:.1f}MB)")

    # GitHub rejects files over 100 MB.
    if new_json_size < 100:
        print("✅ New file size is under 100MB - GitHub compatible!")
    else:
        print("⚠️ File still over 100MB - may need further filtering")

    return new_json_file, new_csv_file

# Execute the filtering
filtered_json, filtered_csv = filter_la_county_data()


In [None]:
# # Additional filtering if file is still too large
# def aggressive_filter_if_needed(json_file_path, target_size_mb=95):
#     """
#     Apply more aggressive filtering if the file is still too large
#     This will remove properties above 3M to further reduce file size
#     """
    
#     file_path = Path(json_file_path)
#     current_size_mb = file_path.stat().st_size / (1024*1024)
    
#     if current_size_mb <= target_size_mb:
#         print(f"✅ File size ({current_size_mb:.1f}MB) is already under target ({target_size_mb}MB)")
#         return json_file_path
    
#     print(f"⚠️ File still too large ({current_size_mb:.1f}MB). Applying aggressive filtering...")
    
#     # Load the current filtered data
#     with open(file_path, 'r', encoding='utf-8') as f:
#         data = json.load(f)
    
#     original_count = len(data['listings'])
#     print(f"📊 Current listings count: {original_count:,}")
    
#     # Filter out houses priced above 3M
#     aggressive_filtered_listings = []
#     removed_count = 0
    
#     for listing in data['listings']:
#         list_price = listing.get('list_price')
        
#         if list_price is None:
#             # Keep listings without price info
#             aggressive_filtered_listings.append(listing)
#         elif list_price > 3000000:
#             # Remove listings above 3M
#             removed_count += 1
#         else:
#             # Keep listings under 3M
#             aggressive_filtered_listings.append(listing)
    
#     # Update the data structure
#     data['listings'] = aggressive_filtered_listings
    
#     # Update metadata
#     if 'filtered_info' not in data['metadata']:
#         data['metadata']['filtered_info'] = {}
    
#     data['metadata']['filtered_info'].update({
#         'aggressive_filter_applied': True,
#         'aggressive_filter_date': datetime.now().strftime("%Y%m%d_%H%M%S"),
#         'aggressive_filter_reason': 'Remove houses above 3M for GitHub compatibility',
#         'count_before_aggressive': original_count,
#         'count_after_aggressive': len(aggressive_filtered_listings),
#         'removed_by_aggressive': removed_count,
#         'price_max_after_aggressive': 3000000
#     })
    
#     # Update search parameters
#     data['metadata']['parameters']['price_max'] = 3000000
#     data['metadata']['parameters']['note'] = 'Aggressively filtered: removed 3M+ properties for GitHub'
    
#     # Create new filename
#     timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
#     aggressive_json_file = file_path.parent / f"{timestamp}_los_angeles_county_ca_github_ready.json"
#     aggressive_csv_file = file_path.parent / f"{timestamp}_los_angeles_county_ca_github_ready.csv"
    
#     # Save aggressively filtered JSON
#     with open(aggressive_json_file, 'w', encoding='utf-8') as f:
#         json.dump(data, f, indent=2, ensure_ascii=False)
    
#     # Save aggressively filtered CSV
#     if aggressive_filtered_listings:
#         df = pd.json_normalize(aggressive_filtered_listings)
#         df.to_csv(aggressive_csv_file, index=False)
    
#     # Check final size
#     final_size_mb = aggressive_json_file.stat().st_size / (1024*1024)
    
#     print(f"\n{'='*60}")
#     print("🔥 Aggressive Filtering Results:")
#     print(f"{'='*60}")
#     print(f"❌ Additional removed (3M+): {removed_count:,} properties")
#     print(f"✅ Final count: {len(aggressive_filtered_listings):,} properties")
#     print(f"📉 Total reduction: {((original_count - len(aggressive_filtered_listings))/original_count)*100:.1f}%")
    
#     print(f"\n💾 GitHub-Ready Files:")
#     print(f"   • JSON: {aggressive_json_file.name} ({final_size_mb:.1f} MB)")
#     print(f"   • CSV:  {aggressive_csv_file.name} ({aggressive_csv_file.stat().st_size / (1024*1024):.1f} MB)")
    
#     if final_size_mb < 100:
#         print("✅ SUCCESS! File is now GitHub compatible!")
#     else:
#         print("⚠️ Still too large - consider further filtering or splitting")
    
#     return aggressive_json_file

# # Check if we need aggressive filtering
# print("\\n" + "="*60)
# print("🔍 Checking if additional filtering is needed...")
# print("="*60)

# try:
#     final_file = aggressive_filter_if_needed(filtered_json)
#     print(f"\\n🎉 Final processed file: {final_file}")
# except Exception as e:
#     print(f"❌ Error in aggressive filtering: {e}")
#     print(f"📁 Using previous filtered file: {filtered_json}")


In [None]:
## One-off fix: remove every house priced 5M–7M from 20250822_113940_los_angeles_county_ca_los_angeles_county.json and its corresponding CSV
